How to Build a Notification System That Actually Gets Read


Build a multi-channel notification system that routes alerts to Slack, email, and SMS based on severity — with rate limiting, digests, and escalation so people do not ignore your alerts.

Your pipeline sends alerts. Nobody reads them. The Slack channel has 200 unread messages, all saying "Pipeline completed successfully." When something actually breaks, the real alert is buried.

This is alert fatigue. And it is the reason most notification systems fail — not because they cannot send messages, but because they send too many of the wrong ones.

This guide builds a notification system that routes alerts by severity, rate-limits noise, digests low-priority updates, and escalates critical failures. By the end, your alerts will be read because they will be worth reading.

# The Notification Architecture

mermaid
flowchart TD
  E["Event\n(pipeline output)"] --> R["Rules Engine\n(severity + routing)"]
  R -->|Critical| S["Slack #critical\n+ SMS"]
  R -->|Warning| SL["Slack #alerts"]
  R -->|Info| D["Digest\n(daily summary)"]
  R -->|Success| L["Log Only"]
  S --> RL["Rate Limiter"]
  SL --> RL
  RL --> Send["Deliver"]

Events come in. The rules engine decides where they go. Rate limiting prevents flooding. Digests batch low-priority items into a single daily message.

# What You Will Need

bash
pip install requests schedule
  • requests — HTTP client for Slack and webhook calls
  • schedule — lightweight task scheduling for digests

# Step 1: Define Alert Severity

Every alert should have a severity level. Without this, all alerts look the same and all get ignored.

python
from enum import Enum
from dataclasses import dataclass, field
from datetime import datetime


class Severity(Enum):
    CRITICAL = "critical"   # Pipeline failed, data loss risk
    WARNING = "warning"     # Something unusual, might need attention
    INFO = "info"           # Status update, progress tracking
    SUCCESS = "success"     # Pipeline completed normally


@dataclass
class Alert:
    """A structured alert with all routing metadata."""

    title: str
    message: str
    severity: Severity
    source: str                          # Which pipeline or system
    timestamp: datetime = field(default_factory=datetime.now)
    context: dict = field(default_factory=dict)  # Extra data for debugging

    @property
    def emoji(self):
        return {
            Severity.CRITICAL: "🔴",
            Severity.WARNING: "🟡",
            Severity.INFO: "🔵",
            Severity.SUCCESS: "🟢",
        }[self.severity]

# Creating Alerts

python
# From a pipeline
alert = Alert(
    title="Daily sales pipeline failed",
    message="Database connection refused after 3 retries",
    severity=Severity.CRITICAL,
    source="sales_pipeline",
    context={"stage": "extract", "retries": 3, "error": "ConnectionRefused"},
)

# Informational
alert = Alert(
    title="Pipeline completed",
    message="Processed 4,521 orders in 12.3 seconds",
    severity=Severity.SUCCESS,
    source="sales_pipeline",
    context={"rows": 4521, "duration_seconds": 12.3},
)

# Step 2: Notification Channels

# Slack Integration

python
import requests
import os
import logging

logger = logging.getLogger("notifications")

class SlackNotifier:
    """Send notifications to Slack channels via webhooks."""

    def __init__(self, webhooks=None):
        self.webhooks = webhooks or {
            "critical": os.environ.get("SLACK_WEBHOOK_CRITICAL"),
            "alerts": os.environ.get("SLACK_WEBHOOK_ALERTS"),
            "info": os.environ.get("SLACK_WEBHOOK_INFO"),
        }

    def send(self, alert, channel="alerts"):
        """Send an alert to a Slack channel."""
        webhook_url = self.webhooks.get(channel)
        if not webhook_url:
            logger.warning(f"No Slack webhook configured for channel: {channel}")
            return False

        payload = {
            "blocks": [
                {
                    "type": "header",
                    "text": {
                        "type": "plain_text",
                        "text": f"{alert.emoji} {alert.title}",
                    },
                },
                {
                    "type": "section",
                    "text": {
                        "type": "mrkdwn",
                        "text": (
                            f"*Source:* {alert.source}\n"
                            f"*Time:* {alert.timestamp.strftime('%Y-%m-%d %H:%M:%S')}\n"
                            f"*Message:* {alert.message}"
                        ),
                    },
                },
            ],
        }

        # Add context fields if present
        if alert.context:
            fields = []
            for key, value in list(alert.context.items())[:8]:
                fields.append({
                    "type": "mrkdwn",
                    "text": f"*{key}:* {value}",
                })
            payload["blocks"].append({
                "type": "section",
                "fields": fields,
            })

        try:
            response = requests.post(webhook_url, json=payload, timeout=10)
        except requests.RequestException as e:
            logger.error(f"Slack delivery failed: {e}")
            return False

        if response.ok:
            logger.info(f"Slack alert sent to #{channel}: {alert.title}")
            return True
        logger.error(f"Slack delivery failed: {response.status_code}")
        return False
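Because the notifier just posts JSON, the Block Kit payload can be sanity-checked without a webhook. A standalone sketch — `DemoAlert` and `build_payload` are illustrative stand-ins mirroring the structure above, not part of the class:

```python
from dataclasses import dataclass, field
from datetime import datetime

@dataclass
class DemoAlert:
    # Minimal stand-in with just the fields the payload uses
    title: str
    message: str
    source: str
    emoji: str = "🟡"
    timestamp: datetime = field(default_factory=datetime.now)

def build_payload(alert):
    """Build the same Block Kit structure SlackNotifier.send posts."""
    return {
        "blocks": [
            {
                "type": "header",
                "text": {"type": "plain_text", "text": f"{alert.emoji} {alert.title}"},
            },
            {
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": (
                        f"*Source:* {alert.source}\n"
                        f"*Time:* {alert.timestamp.strftime('%Y-%m-%d %H:%M:%S')}\n"
                        f"*Message:* {alert.message}"
                    ),
                },
            },
        ]
    }

payload = build_payload(DemoAlert("Disk nearly full", "92% used on /data", "etl_host"))
print(payload["blocks"][0]["text"]["text"])  # → 🟡 Disk nearly full
```

Asserting on this structure in a unit test catches formatting regressions before they reach a real channel.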

# Email Notifications

python
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

class EmailNotifier:
    """Send notification emails."""

    def __init__(self, smtp_host=None, smtp_port=587, username=None, password=None):
        self.smtp_host = smtp_host or os.environ.get("SMTP_HOST")
        self.smtp_port = smtp_port
        self.username = username or os.environ.get("SMTP_USERNAME")
        self.password = password or os.environ.get("SMTP_PASSWORD")

    def send(self, alert, recipients):
        """Send an alert via email."""
        msg = MIMEMultipart("alternative")
        msg["Subject"] = f"[{alert.severity.value.upper()}] {alert.title}"
        msg["From"] = self.username
        msg["To"] = ", ".join(recipients)

        # Plain text
        text = (
            f"Alert: {alert.title}\n"
            f"Severity: {alert.severity.value}\n"
            f"Source: {alert.source}\n"
            f"Time: {alert.timestamp.strftime('%Y-%m-%d %H:%M:%S')}\n\n"
            f"{alert.message}\n"
        )

        # HTML
        context_rows = "".join(
            f"<tr><td><strong>{k}</strong></td><td>{v}</td></tr>"
            for k, v in alert.context.items()
        )
        html = f"""
        <h2>{alert.emoji} {alert.title}</h2>
        <table>
            <tr><td><strong>Severity</strong></td><td>{alert.severity.value}</td></tr>
            <tr><td><strong>Source</strong></td><td>{alert.source}</td></tr>
            <tr><td><strong>Time</strong></td><td>{alert.timestamp.strftime('%Y-%m-%d %H:%M:%S')}</td></tr>
            {context_rows}
        </table>
        <p>{alert.message}</p>
        """

        msg.attach(MIMEText(text, "plain"))
        msg.attach(MIMEText(html, "html"))

        with smtplib.SMTP(self.smtp_host, self.smtp_port) as server:
            server.starttls()
            server.login(self.username, self.password)
            server.send_message(msg)

        logger.info(f"Email sent to {len(recipients)} recipients: {alert.title}")
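The architecture diagram also routes critical alerts to SMS. The carrier integration is provider-specific (Twilio, MessageBird, or an internal gateway all work), but the message formatting can be sketched provider-independently — `sms_body` is a hypothetical helper, not part of any provider's API:

```python
def sms_body(severity, source, title, max_len=160):
    """Compress an alert into a single SMS-sized line."""
    text = f"[{severity.upper()}] {source}: {title}"
    # Truncate with an ellipsis if the alert would not fit in one segment
    return text if len(text) <= max_len else text[: max_len - 1] + "…"

print(sms_body("critical", "sales_pipeline", "Daily sales pipeline failed"))
# → [CRITICAL] sales_pipeline: Daily sales pipeline failed
```

Keeping the body inside one 160-character segment avoids multi-part messages, which some carriers reorder or drop.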

# Step 3: Routing Rules

Define where each alert goes based on severity and source:

mermaid
flowchart LR
  A["Alert"] --> RE["Rules\nEngine"]
  RE -->|"CRITICAL\n(any source)"| C["Slack #critical\n+ Email\n+ SMS"]
  RE -->|"WARNING\n(production)"| W["Slack #alerts\n+ Email"]
  RE -->|"WARNING\n(dev)"| WD["Slack #dev"]
  RE -->|"INFO"| I["Daily Digest"]
  RE -->|"SUCCESS"| S["Log Only"]

python
class NotificationRouter:
    """Route alerts to the right channels based on rules."""

    def __init__(self):
        self.slack = SlackNotifier()
        self.email = EmailNotifier()
        self.digest = DigestCollector()  # Defined in Step 5

        # Define routing rules
        self.rules = [
            {
                "match": lambda a: a.severity == Severity.CRITICAL,
                "actions": [
                    lambda a: self.slack.send(a, channel="critical"),
                    lambda a: self.email.send(a, recipients=["[email protected]"]),
                ],
            },
            {
                "match": lambda a: a.severity == Severity.WARNING,
                "actions": [
                    lambda a: self.slack.send(a, channel="alerts"),
                ],
            },
            {
                "match": lambda a: a.severity == Severity.INFO,
                "actions": [
                    lambda a: self.digest.add(a),
                ],
            },
            {
                "match": lambda a: a.severity == Severity.SUCCESS,
                "actions": [
                    lambda a: logger.info(f"Success [{a.source}]: {a.message}"),
                ],
            },
        ]

    def route(self, alert):
        """Route an alert through matching rules."""
        matched = False
        for rule in self.rules:
            if rule["match"](alert):
                for action in rule["actions"]:
                    try:
                        action(alert)
                    except Exception as e:
                        logger.error(f"Notification action failed: {e}")
                matched = True
                break

        if not matched:
            logger.warning(f"No routing rule matched alert: {alert.title}")
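The first-match-wins behaviour of `route` is easy to unit test with stub actions; a stripped-down sketch using plain dicts in place of `Alert` objects:

```python
sent = []  # Records what each stub action "delivered"

rules = [
    {
        "match": lambda a: a["severity"] == "critical",
        "actions": [
            lambda a: sent.append(("slack-critical", a["title"])),
            lambda a: sent.append(("email", a["title"])),
        ],
    },
    {
        "match": lambda a: a["severity"] == "warning",
        "actions": [lambda a: sent.append(("slack-alerts", a["title"]))],
    },
]

def route(alert):
    """Run the actions of the first matching rule only."""
    for rule in rules:
        if rule["match"](alert):
            for action in rule["actions"]:
                action(alert)
            return True
    return False

route({"severity": "critical", "title": "DB down"})
print(sent)  # → [('slack-critical', 'DB down'), ('email', 'DB down')]
```

Swapping the stub lambdas for real notifier calls gives you the production router; swapping them for list appends gives you a fast test.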

# Step 4: Rate Limiting

Prevent flooding channels when multiple pipelines fail at once.

python
from collections import defaultdict
from datetime import datetime, timedelta

class RateLimiter:
    """Prevent notification flooding."""

    def __init__(self, max_per_minute=5, max_per_hour=30):
        self.max_per_minute = max_per_minute
        self.max_per_hour = max_per_hour
        self.history = defaultdict(list)

    def should_send(self, channel):
        """Check if sending to this channel is allowed."""
        now = datetime.now()

        # Clean old entries
        self.history[channel] = [
            t for t in self.history[channel]
            if t > now - timedelta(hours=1)
        ]

        # Check rate limits
        recent_minute = sum(
            1 for t in self.history[channel] if t > now - timedelta(minutes=1)
        )
        recent_hour = len(self.history[channel])

        if recent_minute >= self.max_per_minute:
            logger.warning(f"Rate limited ({channel}): {recent_minute}/min limit reached")
            return False

        if recent_hour >= self.max_per_hour:
            logger.warning(f"Rate limited ({channel}): {recent_hour}/hour limit reached")
            return False

        return True

    def record_send(self, channel):
        """Record that a notification was sent."""
        self.history[channel].append(datetime.now())


class RateLimitedRouter(NotificationRouter):
    """Router with rate limiting applied."""

    def __init__(self):
        super().__init__()
        self.limiter = RateLimiter(max_per_minute=5, max_per_hour=30)

    def route(self, alert):
        """Route with rate limiting — critical alerts bypass limits."""
        if alert.severity == Severity.CRITICAL:
            # Critical alerts always go through
            super().route(alert)
            return

        channel = alert.severity.value  # Limits are bucketed per severity, not per Slack channel
        if self.limiter.should_send(channel):
            super().route(alert)
            self.limiter.record_send(channel)
        else:
            # Silently add to digest instead
            self.digest.add(alert)
            logger.info(f"Rate limited — alert added to digest: {alert.title}")
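To see the limiter in action, the class above can be exercised directly (repeated here, minus logging, so the snippet runs on its own) with a deliberately low per-minute cap:

```python
from collections import defaultdict
from datetime import datetime, timedelta

class RateLimiter:
    """Prevent notification flooding (same logic as above)."""

    def __init__(self, max_per_minute=5, max_per_hour=30):
        self.max_per_minute = max_per_minute
        self.max_per_hour = max_per_hour
        self.history = defaultdict(list)

    def should_send(self, channel):
        now = datetime.now()
        # Drop entries older than an hour, then count recent sends
        self.history[channel] = [
            t for t in self.history[channel] if t > now - timedelta(hours=1)
        ]
        recent_minute = sum(
            1 for t in self.history[channel] if t > now - timedelta(minutes=1)
        )
        if recent_minute >= self.max_per_minute:
            return False
        if len(self.history[channel]) >= self.max_per_hour:
            return False
        return True

    def record_send(self, channel):
        self.history[channel].append(datetime.now())

limiter = RateLimiter(max_per_minute=2)
decisions = []
for _ in range(4):
    if limiter.should_send("alerts"):
        limiter.record_send("alerts")
        decisions.append("sent")
    else:
        decisions.append("limited")

print(decisions)  # → ['sent', 'sent', 'limited', 'limited']
```

The third and fourth sends hit the per-minute cap, which is exactly the point at which `RateLimitedRouter` would divert them to the digest.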

# Step 5: Daily Digests

Batch low-priority updates into a single daily summary.

python
import json

class DigestCollector:
    """Collect alerts for daily digest delivery."""

    def __init__(self, digest_file="digest_buffer.json"):
        self.digest_file = digest_file
        self._load()

    def _load(self):
        """Load existing digest buffer."""
        try:
            with open(self.digest_file, "r") as f:
                self.buffer = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            self.buffer = []

    def _save(self):
        """Persist digest buffer to disk."""
        with open(self.digest_file, "w") as f:
            json.dump(self.buffer, f, default=str)

    def add(self, alert):
        """Add an alert to the digest buffer."""
        self.buffer.append({
            "title": alert.title,
            "message": alert.message,
            "severity": alert.severity.value,
            "source": alert.source,
            "timestamp": alert.timestamp.isoformat(),
        })
        self._save()

    def build_digest(self):
        """Build a formatted digest from buffered alerts."""
        if not self.buffer:
            return None

        # Group by source
        by_source = defaultdict(list)
        for item in self.buffer:
            by_source[item["source"]].append(item)

        lines = [f"📋 *Daily Pipeline Digest* — {datetime.now().strftime('%Y-%m-%d')}\n"]

        severity_emoji = {"critical": "🔴", "warning": "🟡", "info": "🔵", "success": "🟢"}
        for source, items in by_source.items():
            lines.append(f"\n*{source}* ({len(items)} events)")
            for item in items[-5:]:  # Last 5 per source
                emoji = severity_emoji.get(item["severity"], "")
                lines.append(f"  {emoji} {item['title']}")

            if len(items) > 5:
                lines.append(f"  _...and {len(items) - 5} more_")

        return "\n".join(lines)

    def send_and_clear(self, slack_notifier, channel="info"):
        """Send the digest and clear the buffer."""
        digest_text = self.build_digest()
        if not digest_text:
            logger.info("No items in digest — skipping")
            return

        webhook_url = slack_notifier.webhooks.get(channel)
        if webhook_url:
            requests.post(webhook_url, json={"text": digest_text}, timeout=10)

        item_count = len(self.buffer)
        self.buffer = []
        self._save()
        logger.info(f"Digest sent: {item_count} items")
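The grouping step at the heart of `build_digest` can be checked in isolation with a hand-built buffer (items use the same shape that `add` writes to disk):

```python
from collections import defaultdict

buffer = [
    {"title": "Orders loaded", "severity": "info", "source": "sales_pipeline"},
    {"title": "Cache warmed", "severity": "info", "source": "sales_pipeline"},
    {"title": "Feed refreshed", "severity": "info", "source": "inventory_pipeline"},
]

by_source = defaultdict(list)
for item in buffer:
    by_source[item["source"]].append(item)

# One digest header per source, matching the "(N events)" lines above
counts = {source: len(items) for source, items in by_source.items()}
print(counts)  # → {'sales_pipeline': 2, 'inventory_pipeline': 1}
```

Grouping by source keeps the digest scannable: a reader sees at a glance which pipeline was noisy rather than a flat chronological list.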

# Scheduling the Digest

python
import schedule
import time

def run_digest():
    """Send the daily digest."""
    digest = DigestCollector()
    slack = SlackNotifier()
    digest.send_and_clear(slack, channel="info")

# Send digest every day at 8 AM
schedule.every().day.at("08:00").do(run_digest)

while True:
    schedule.run_pending()
    time.sleep(60)

# Step 6: Escalation Chains

If a critical alert goes unacknowledged, escalate to the next person.

python
class EscalationChain:
    """Escalate alerts through a chain of contacts."""

    def __init__(self):
        self.chains = {
            "default": [
                {"method": "slack", "channel": "critical", "wait_minutes": 0},
                {"method": "email", "contact": "[email protected]", "wait_minutes": 10},
                {"method": "email", "contact": "[email protected]", "wait_minutes": 30},
            ],
        }
        self.pending_escalations = {}

    def start_escalation(self, alert, chain_name="default"):
        """Begin escalation for a critical alert."""
        chain = self.chains.get(chain_name, self.chains["default"])
        escalation_id = f"{alert.source}_{alert.timestamp.isoformat()}"

        self.pending_escalations[escalation_id] = {
            "alert": alert,
            "chain": chain,
            "current_step": 0,
            "started_at": datetime.now(),
            "acknowledged": False,
        }

        # Send first notification immediately
        self._send_step(escalation_id)
        return escalation_id

    def acknowledge(self, escalation_id):
        """Mark an escalation as acknowledged — stop further steps."""
        if escalation_id in self.pending_escalations:
            self.pending_escalations[escalation_id]["acknowledged"] = True
            logger.info(f"Escalation acknowledged: {escalation_id}")

    def check_escalations(self):
        """Check if any pending escalations need the next step."""
        now = datetime.now()

        for esc_id, esc in self.pending_escalations.items():
            if esc["acknowledged"]:
                continue

            chain = esc["chain"]
            current = esc["current_step"]

            if current >= len(chain) - 1:
                continue

            next_step = chain[current + 1]
            elapsed = (now - esc["started_at"]).total_seconds() / 60

            if elapsed >= next_step["wait_minutes"]:
                esc["current_step"] += 1
                self._send_step(esc_id)

    def _send_step(self, escalation_id):
        """Send notification for the current escalation step."""
        esc = self.pending_escalations[escalation_id]
        step = esc["chain"][esc["current_step"]]
        alert = esc["alert"]

        # In production, dispatch here via SlackNotifier / EmailNotifier based on
        # step["method"]; this sketch logs the step instead.
        logger.warning(
            f"Escalation step {esc['current_step'] + 1}/{len(esc['chain'])}: "
            f"{step['method']} for {alert.title}"
        )
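The elapsed-time comparison that drives `check_escalations` can be sketched as a pure function, which makes the timing logic trivial to test — `due_step` is a hypothetical helper, not a method of the class:

```python
from datetime import datetime, timedelta

chain = [
    {"method": "slack", "wait_minutes": 0},
    {"method": "email", "wait_minutes": 10},
    {"method": "email", "wait_minutes": 30},
]

def due_step(started_at, now, chain):
    """Index of the furthest step whose wait time has elapsed."""
    elapsed = (now - started_at).total_seconds() / 60
    step = 0
    for i, s in enumerate(chain):
        if elapsed >= s["wait_minutes"]:
            step = i
    return step

started = datetime(2024, 1, 1, 2, 0)
print(due_step(started, started + timedelta(minutes=12), chain))  # → 1
print(due_step(started, started + timedelta(minutes=45), chain))  # → 2
```

Twelve minutes in, the 10-minute email step is due but the 30-minute one is not; after 45 minutes the whole chain has fired. `check_escalations` itself needs to run periodically, for example from the same `schedule` loop that sends the digest.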

# Wiring It Together

python
class NotificationSystem:
    """Complete notification system with routing, rate limiting, and digests."""

    def __init__(self):
        self.router = RateLimitedRouter()
        self.escalation = EscalationChain()

    def notify(self, title, message, severity, source, **context):
        """Send a notification through the system."""
        alert = Alert(
            title=title,
            message=message,
            severity=severity,
            source=source,
            context=context,
        )

        self.router.route(alert)

        if severity == Severity.CRITICAL:
            self.escalation.start_escalation(alert)

        return alert


# Usage in a pipeline
notifications = NotificationSystem()

try:
    result = run_pipeline()
    notifications.notify(
        title="Sales pipeline completed",
        message=f"Processed {result['rows']} orders",
        severity=Severity.SUCCESS,
        source="sales_pipeline",
        rows=result["rows"],
        duration=result["duration"],
    )
except Exception as e:
    notifications.notify(
        title="Sales pipeline failed",
        message=str(e),
        severity=Severity.CRITICAL,
        source="sales_pipeline",
        error_type=type(e).__name__,
    )

# What This Replaces

| Old approach | New approach |
| --- | --- |
| `print("Pipeline done")` | Structured alerts with severity |
| All alerts to one Slack channel | Routing by severity and source |
| 200 unread messages | Rate limiting + digests |
| Nobody notices critical failures | Escalation chains |
| "I think it ran yesterday?" | Success/failure notifications with context |
| Checking logs manually | Alerts come to you |

# Common Alert Design Mistakes

| Mistake | Why it fails | Fix |
| --- | --- | --- |
| Alerting on every success | Alert fatigue — nothing seems important | Log successes, alert on failures |
| No severity levels | Cannot prioritise | Use CRITICAL / WARNING / INFO / SUCCESS |
| Same channel for everything | Important alerts get buried | Route by severity |
| No rate limiting | One broken pipeline floods the channel | Cap per-minute and per-hour sends |
| Alert without context | "Pipeline failed" — which one? How? | Include source, stage, error, timestamp |
| No escalation | Nobody sees the 2 AM alert | Escalation chains with increasing urgency |

# Next Steps

Start with Slack notifications on pipeline failures — that alone is a significant improvement over checking logs. Add severity levels next, then rate limiting when alert volume grows.

The digest pattern is particularly valuable: collect all the "it worked fine" messages into one daily summary so your alert channels stay clean for things that actually need attention.

For building the pipelines that feed into this notification system, see How to Design Data Pipelines for Reliable Reporting. For making pipelines self-healing before they even need to alert, see How to Build Self-Healing Data Pipelines.

Automation services include building monitoring and alerting infrastructure for production pipeline systems.

Get in touch to discuss setting up notifications for your data systems.
