How to Build a Notification System That Actually Gets Read

· 9 min read · Automation

Build a multi-channel notification system that routes alerts to Slack, email, and SMS based on severity — with rate limiting, digests, and escalation so people do not ignore your alerts.

Your pipeline sends alerts. Nobody reads them. The Slack channel has 200 unread messages, all saying “Pipeline completed successfully.” When something actually breaks, the real alert is buried.

This is alert fatigue. And it is the reason most notification systems fail — not because they cannot send messages, but because they send too many of the wrong ones.

This guide builds a notification system that routes alerts by severity, rate-limits noise, digests low-priority updates, and escalates critical failures. By the end, your alerts will be read because they will be worth reading.

Who This Is For

  • Engineers whose Slack channels are flooded with meaningless pipeline alerts
  • On-call teams who need to know when something genuinely requires human attention
  • Vibe coders who built automations that run unattended and need smart alerting when things go wrong
  • Managers who want to know about problems before their customers or stakeholders do

You need basic Python knowledge. The patterns here work with any messaging system (Slack, email, Teams, PagerDuty) — the guide focuses on the routing and filtering logic.

The Notification Architecture

Events come in. The rules engine decides where they go. Rate limiting prevents flooding. Digests batch low-priority items into a single daily message.

What You Will Need

pip install requests schedule
  • requests — HTTP client for Slack and webhook calls
  • schedule — lightweight task scheduling for digests

Step 1: Define Alert Severity

Every alert should have a severity level. Without this, all alerts look the same and all get ignored.

from enum import Enum
from dataclasses import dataclass, field
from datetime import datetime


class Severity(Enum):
    CRITICAL = "critical"   # Pipeline failed, data loss risk
    WARNING = "warning"     # Something unusual, might need attention
    INFO = "info"           # Status update, progress tracking
    SUCCESS = "success"     # Pipeline completed normally


@dataclass
class Alert:
    """A structured alert with all routing metadata."""

    title: str
    message: str
    severity: Severity
    source: str                          # Which pipeline or system
    timestamp: datetime = field(default_factory=datetime.now)
    context: dict = field(default_factory=dict)  # Extra data for debugging

    @property
    def emoji(self):
        # Plain-text labels rather than real emoji, so they display the same in Slack, email, and logs
        return {
            Severity.CRITICAL: "[CRIT]",
            Severity.WARNING: "[WARN]",
            Severity.INFO: "[INFO]",
            Severity.SUCCESS: "[OK]",
        }[self.severity]

Creating Alerts

# From a pipeline
alert = Alert(
    title="Daily sales pipeline failed",
    message="Database connection refused after 3 retries",
    severity=Severity.CRITICAL,
    source="sales_pipeline",
    context={"stage": "extract", "retries": 3, "error": "ConnectionRefused"},
)

# Informational
alert = Alert(
    title="Pipeline completed",
    message="Processed 4,521 orders in 12.3 seconds",
    severity=Severity.SUCCESS,
    source="sales_pipeline",
    context={"rows": 4521, "duration_seconds": 12.3},
)
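Because the Severity values are strings, the enum has no built-in ordering, so a rule such as "WARNING or worse" cannot simply use >=. A minimal sketch, assuming a hypothetical SEVERITY_RANK mapping maintained next to the enum (the enum is repeated so the block runs on its own):

```python
from enum import Enum


class Severity(Enum):
    CRITICAL = "critical"
    WARNING = "warning"
    INFO = "info"
    SUCCESS = "success"


# Hypothetical ranking: a higher number means more urgent
SEVERITY_RANK = {
    Severity.SUCCESS: 0,
    Severity.INFO: 1,
    Severity.WARNING: 2,
    Severity.CRITICAL: 3,
}


def at_least(severity, threshold):
    """Return True if `severity` is as urgent as `threshold` or more."""
    return SEVERITY_RANK[severity] >= SEVERITY_RANK[threshold]


print(at_least(Severity.CRITICAL, Severity.WARNING))  # True
print(at_least(Severity.INFO, Severity.WARNING))      # False
```

In a routing rule this would read "match": lambda a: at_least(a.severity, Severity.WARNING), which catches both WARNING and CRITICAL alerts.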

Step 2: Notification Channels

Slack Integration

import requests
import os
import logging

logger = logging.getLogger("notifications")

class SlackNotifier:
    """Send notifications to Slack channels via webhooks."""

    def __init__(self, webhooks=None):
        self.webhooks = webhooks or {
            "critical": os.environ.get("SLACK_WEBHOOK_CRITICAL"),
            "alerts": os.environ.get("SLACK_WEBHOOK_ALERTS"),
            "info": os.environ.get("SLACK_WEBHOOK_INFO"),
        }

    def send(self, alert, channel="alerts"):
        """Send an alert to a Slack channel."""
        webhook_url = self.webhooks.get(channel)
        if not webhook_url:
            logger.warning(f"No Slack webhook configured for channel: {channel}")
            return False

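        payload = {
            # Fallback text: Slack shows this in notifications and in clients that cannot render blocks
            "text": f"{alert.emoji} {alert.title}",
            "blocks": [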
                {
                    "type": "header",
                    "text": {
                        "type": "plain_text",
                        "text": f"{alert.emoji} {alert.title}",
                    },
                },
                {
                    "type": "section",
                    "text": {
                        "type": "mrkdwn",
                        "text": (
                            f"*Source:* {alert.source}\n"
                            f"*Time:* {alert.timestamp.strftime('%Y-%m-%d %H:%M:%S')}\n"
                            f"*Message:* {alert.message}"
                        ),
                    },
                },
            ],
        }

        # Add context fields if present
        if alert.context:
            fields = []
            for key, value in list(alert.context.items())[:8]:
                fields.append({
                    "type": "mrkdwn",
                    "text": f"*{key}:* {value}",
                })
            payload["blocks"].append({
                "type": "section",
                "fields": fields,
            })

        response = requests.post(webhook_url, json=payload, timeout=10)
        if response.status_code == 200:
            logger.info(f"Slack alert sent to #{channel}: {alert.title}")
            return True
        else:
            logger.error(f"Slack delivery failed: {response.status_code}")
            return False
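Slack documents a 150-character limit for header block text, and a title that embeds a long exception message can exceed it, in which case the payload may be rejected. A minimal sketch, assuming hypothetical truncate and build_header_block helpers that send() could use instead of building the header inline:

```python
HEADER_LIMIT = 150  # Slack's documented maximum length for header block text


def truncate(text, limit):
    """Shorten text to at most `limit` characters, marking any cut with "..."."""
    if len(text) <= limit:
        return text
    return text[: limit - 3] + "..."


def build_header_block(label, title, limit=HEADER_LIMIT):
    """Build a Slack header block whose text stays within the length limit."""
    return {
        "type": "header",
        "text": {
            "type": "plain_text",
            "text": truncate(f"{label} {title}", limit),
        },
    }


block = build_header_block("[CRIT]", "Daily sales pipeline failed: " + "x" * 300)
print(len(block["text"]["text"]))  # 150
```

The same truncate helper can guard the message section, which Slack caps at 3,000 characters.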

Email Notifications

import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from html import escape

class EmailNotifier:
    """Send notification emails."""

    def __init__(self, smtp_host=None, smtp_port=587, username=None, password=None):
        self.smtp_host = smtp_host or os.environ.get("SMTP_HOST")
        self.smtp_port = smtp_port
        self.username = username or os.environ.get("SMTP_USERNAME")
        self.password = password or os.environ.get("SMTP_PASSWORD")

    def send(self, alert, recipients):
        """Send an alert via email."""
        msg = MIMEMultipart("alternative")
        msg["Subject"] = f"[{alert.severity.value.upper()}] {alert.title}"
        msg["From"] = self.username
        msg["To"] = ", ".join(recipients)

        # Plain text
        text = (
            f"Alert: {alert.title}\n"
            f"Severity: {alert.severity.value}\n"
            f"Source: {alert.source}\n"
            f"Time: {alert.timestamp.strftime('%Y-%m-%d %H:%M:%S')}\n\n"
            f"{alert.message}\n"
        )

        # HTML: escape every value, since error messages often contain "<" or ">"
        context_rows = "".join(
            f"<tr><td><strong>{escape(str(k))}</strong></td><td>{escape(str(v))}</td></tr>"
            for k, v in alert.context.items()
        )
        html = f"""
        <h2>{alert.emoji} {escape(alert.title)}</h2>
        <table>
            <tr><td><strong>Severity</strong></td><td>{alert.severity.value}</td></tr>
            <tr><td><strong>Source</strong></td><td>{escape(alert.source)}</td></tr>
            <tr><td><strong>Time</strong></td><td>{alert.timestamp.strftime('%Y-%m-%d %H:%M:%S')}</td></tr>
            {context_rows}
        </table>
        <p>{escape(alert.message)}</p>
        """

        msg.attach(MIMEText(text, "plain"))
        msg.attach(MIMEText(html, "html"))

        with smtplib.SMTP(self.smtp_host, self.smtp_port) as server:
            server.starttls()
            server.login(self.username, self.password)
            server.send_message(msg)

        logger.info(f"Email sent to {len(recipients)} recipients: {alert.title}")
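The routing rules in the next step hardcode recipient addresses, which means a code change whenever the on-call rota changes. One alternative, sketched with a hypothetical ALERT_EMAIL_RECIPIENTS environment variable holding a comma-separated list of addresses:

```python
import os


def recipients_from_env(var_name="ALERT_EMAIL_RECIPIENTS", environ=os.environ):
    """Parse a comma-separated list of addresses from an environment variable.

    Blank entries and surrounding whitespace are ignored, so
    " a@example.com, ,b@example.com " yields two addresses.
    """
    raw = environ.get(var_name, "")
    return [addr.strip() for addr in raw.split(",") if addr.strip()]


# Example with an explicit mapping instead of the real environment
print(recipients_from_env(environ={"ALERT_EMAIL_RECIPIENTS": "oncall@example.com, lead@example.com"}))
# ['oncall@example.com', 'lead@example.com']
```

A rule action could then call self.email.send(a, recipients=recipients_from_env()). It is worth skipping the send when the list comes back empty rather than attempting an SMTP delivery with no recipients.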

Step 3: Routing Rules

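Define where each alert goes. Each rule pairs a match function with a list of actions, and route() stops at the first rule that matches. The rules below match on severity, but a match function can inspect any field of the alert, including its source: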

class NotificationRouter:
    """Route alerts to the right channels based on rules."""

    def __init__(self):
        self.slack = SlackNotifier()
        self.email = EmailNotifier()
        self.digest = DigestCollector()

        # Define routing rules
        self.rules = [
            {
                "match": lambda a: a.severity == Severity.CRITICAL,
                "actions": [
                    lambda a: self.slack.send(a, channel="critical"),
                    lambda a: self.email.send(a, recipients=["[email protected]"]),
                ],
            },
            {
                "match": lambda a: a.severity == Severity.WARNING,
                "actions": [
                    lambda a: self.slack.send(a, channel="alerts"),
                ],
            },
            {
                "match": lambda a: a.severity == Severity.INFO,
                "actions": [
                    lambda a: self.digest.add(a),
                ],
            },
            {
                "match": lambda a: a.severity == Severity.SUCCESS,
                "actions": [
                    lambda a: logger.info(f"Success: {a.source}: {a.message}"),
                ],
            },
        ]

    def route(self, alert):
        """Route an alert through matching rules."""
        matched = False
        for rule in self.rules:
            if rule["match"](alert):
                for action in rule["actions"]:
                    try:
                        action(alert)
                    except Exception as e:
                        logger.error(f"Notification action failed: {e}")
                matched = True
                break

        if not matched:
            logger.warning(f"No routing rule matched alert: {alert.title}")
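Because route() stops at the first matching rule, a source-specific rule has to come before the generic severity rules or it will never fire. A self-contained sketch with a stand-in Alert, plain strings in place of the Severity enum, and actions that record calls instead of sending; the billing source and the billing-oncall destination are hypothetical:

```python
from dataclasses import dataclass


@dataclass
class Alert:
    title: str
    severity: str
    source: str


sent = []  # records (destination, title) instead of calling Slack or email

rules = [
    # Source-specific rule first: billing warnings and failures go to the billing team
    {
        "match": lambda a: a.source == "billing" and a.severity in ("critical", "warning"),
        "actions": [lambda a: sent.append(("billing-oncall", a.title))],
    },
    # Generic severity rule afterwards
    {
        "match": lambda a: a.severity == "warning",
        "actions": [lambda a: sent.append(("alerts", a.title))],
    },
]


def route(alert):
    """Run the actions of the first matching rule only."""
    for rule in rules:
        if rule["match"](alert):
            for action in rule["actions"]:
                action(alert)
            return True
    return False


route(Alert("Refund job slow", "warning", "billing"))
route(Alert("Export job slow", "warning", "reporting"))
print(sent)  # [('billing-oncall', 'Refund job slow'), ('alerts', 'Export job slow')]
```

In NotificationRouter the equivalent is self.rules.insert(0, rule), so the new rule is checked before the severity rules.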

Step 4: Rate Limiting

Prevent flooding channels when multiple pipelines fail at once.

from collections import defaultdict
from datetime import datetime, timedelta

class RateLimiter:
    """Prevent notification flooding."""

    def __init__(self, max_per_minute=5, max_per_hour=30):
        self.max_per_minute = max_per_minute
        self.max_per_hour = max_per_hour
        self.history = defaultdict(list)

    def should_send(self, channel):
        """Check if sending to this channel is allowed."""
        now = datetime.now()

        # Clean old entries
        self.history[channel] = [
            t for t in self.history[channel]
            if t > now - timedelta(hours=1)
        ]

        # Check rate limits
        recent_minute = sum(
            1 for t in self.history[channel] if t > now - timedelta(minutes=1)
        )
        recent_hour = len(self.history[channel])

        if recent_minute >= self.max_per_minute:
            logger.warning(f"Rate limited ({channel}): {recent_minute}/min limit reached")
            return False

        if recent_hour >= self.max_per_hour:
            logger.warning(f"Rate limited ({channel}): {recent_hour}/hour limit reached")
            return False

        return True

    def record_send(self, channel):
        """Record that a notification was sent."""
        self.history[channel].append(datetime.now())


class RateLimitedRouter(NotificationRouter):
    """Router with rate limiting applied."""

    def __init__(self):
        super().__init__()
        self.limiter = RateLimiter(max_per_minute=5, max_per_hour=30)

    def route(self, alert):
        """Route with rate limiting — critical alerts bypass limits."""
        if alert.severity == Severity.CRITICAL:
            # Critical alerts always go through
            super().route(alert)
            return

        channel = alert.severity.value
        if self.limiter.should_send(channel):
            super().route(alert)
            self.limiter.record_send(channel)
        else:
            # Silently add to digest instead
            self.digest.add(alert)
            logger.info(f"Rate limited — alert added to digest: {alert.title}")
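The limiter calls datetime.now() internally, which makes it awkward to unit-test. A sketch of the same per-minute and per-hour check, assuming you are happy to pass the current time in explicitly and to keep timestamps in a deque (oldest first) so expired entries can be dropped from the left. SlidingWindowLimiter is a hypothetical name, not part of the code above:

```python
from collections import defaultdict, deque
from datetime import datetime, timedelta


class SlidingWindowLimiter:
    """Per-channel limits over the last minute and the last hour."""

    def __init__(self, max_per_minute=5, max_per_hour=30):
        self.max_per_minute = max_per_minute
        self.max_per_hour = max_per_hour
        self.history = defaultdict(deque)  # channel -> send times, oldest first

    def should_send(self, channel, now):
        times = self.history[channel]
        # Drop entries at least one hour old; they no longer count toward either limit
        while times and times[0] <= now - timedelta(hours=1):
            times.popleft()
        last_minute = sum(1 for t in times if t > now - timedelta(minutes=1))
        return last_minute < self.max_per_minute and len(times) < self.max_per_hour

    def record_send(self, channel, now):
        self.history[channel].append(now)


limiter = SlidingWindowLimiter(max_per_minute=2, max_per_hour=3)
start = datetime(2024, 1, 1, 9, 0)
for i in range(3):
    now = start + timedelta(seconds=10 * i)
    if limiter.should_send("warning", now):
        limiter.record_send("warning", now)
print(len(limiter.history["warning"]))  # 2: the third send hit the per-minute cap
```

Passing now explicitly lets a test step through minutes and hours without sleeping; production code would pass datetime.now().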

Step 5: Daily Digests

Batch low-priority updates into a single daily summary.

import json

class DigestCollector:
    """Collect alerts for daily digest delivery."""

    def __init__(self, digest_file="digest_buffer.json"):
        self.digest_file = digest_file
        self._load()

    def _load(self):
        """Load existing digest buffer."""
        try:
            with open(self.digest_file, "r") as f:
                self.buffer = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            self.buffer = []

    def _save(self):
        """Persist digest buffer to disk."""
        with open(self.digest_file, "w") as f:
            json.dump(self.buffer, f, default=str)

    def add(self, alert):
        """Add an alert to the digest buffer."""
        self.buffer.append({
            "title": alert.title,
            "message": alert.message,
            "severity": alert.severity.value,
            "source": alert.source,
            "timestamp": alert.timestamp.isoformat(),
        })
        self._save()

    def build_digest(self):
        """Build a formatted digest from buffered alerts."""
        if not self.buffer:
            return None

        # Group by source
        by_source = defaultdict(list)
        for item in self.buffer:
            by_source[item["source"]].append(item)

        lines = [f"Daily Pipeline Digest -- {datetime.now().strftime('%Y-%m-%d')}\n"]

        # Map stored severity strings back to the same labels Alert.emoji uses
        severity_label = {"critical": "[CRIT]", "warning": "[WARN]", "info": "[INFO]", "success": "[OK]"}

        for source, items in by_source.items():
            lines.append(f"\n*{source}* ({len(items)} events)")
            for item in items[-5:]:  # Last 5 per source
                label = severity_label.get(item["severity"], "[--]")
                lines.append(f"  {label} {item['title']}")

            if len(items) > 5:
                lines.append(f"  _...and {len(items) - 5} more_")

        return "\n".join(lines)

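    def send_and_clear(self, slack_notifier, channel="info"):
        """Send the digest, then clear the buffer only if delivery succeeded."""
        digest_text = self.build_digest()
        if not digest_text:
            logger.info("No items in digest, skipping")
            return

        webhook_url = slack_notifier.webhooks.get(channel)
        if not webhook_url:
            # Keep the buffer so nothing is lost until a webhook is configured
            logger.warning(f"No Slack webhook configured for channel: {channel}")
            return

        response = requests.post(webhook_url, json={"text": digest_text}, timeout=10)
        if response.status_code != 200:
            # Keep the buffer so the next run retries delivery
            logger.error(f"Digest delivery failed: {response.status_code}")
            return

        item_count = len(self.buffer)
        self.buffer = []
        self._save()
        logger.info(f"Digest sent: {item_count} items")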
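Both the pipelines and the digest job write digest_buffer.json, and _load quietly starts an empty buffer if the file is half-written, so a crash mid-write can lose the day's items. One mitigation, sketched as a hypothetical atomic_write_json helper that _save could call: write to a temporary file in the same directory, then swap it into place with os.replace, which the Python docs describe as atomic on POSIX when source and target are on the same filesystem. This protects against torn writes, not against two processes overwriting each other's updates; that needs a lock or a real queue.

```python
import json
import os
import tempfile


def atomic_write_json(path, data):
    """Write JSON so readers see either the old file or the new one, never a partial file."""
    directory = os.path.dirname(os.path.abspath(path))
    # The temp file must live in the same directory (same filesystem) for os.replace to be atomic
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(data, f, default=str)
            f.flush()
            os.fsync(f.fileno())  # make sure the bytes reach disk before the swap
        os.replace(tmp_path, path)
    except BaseException:
        os.unlink(tmp_path)  # clean up the temp file if anything failed before the swap
        raise


# Example in a throwaway directory
with tempfile.TemporaryDirectory() as d:
    target = os.path.join(d, "digest_buffer.json")
    atomic_write_json(target, [{"title": "Pipeline completed", "severity": "info"}])
    with open(target) as f:
        print(json.load(f))
```

Inside DigestCollector, _save would become atomic_write_json(self.digest_file, self.buffer).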

Scheduling the Digest

import schedule
import time

def run_digest():
    """Send the daily digest."""
    digest = DigestCollector()
    slack = SlackNotifier()
    digest.send_and_clear(slack, channel="info")

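# Send the digest every day at 08:00 (the local time of the machine running this script)
schedule.every().day.at("08:00").do(run_digest)

# This loop never returns, so run it as its own long-lived process, separate from your pipelines
while True:
    schedule.run_pending()
    time.sleep(60)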

Step 6: Escalation Chains

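If a critical alert goes unacknowledged, escalate to the next person. The escalation state below is held in memory, so a long-running process has to call check_escalations() periodically (every minute is plenty); a pipeline process that exits after its run will never advance the chain.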

class EscalationChain:
    """Escalate alerts through a chain of contacts."""

    def __init__(self):
        # wait_minutes counts from when the escalation started, not from the previous step
        self.chains = {
            "default": [
                {"method": "slack", "channel": "critical", "wait_minutes": 0},
                {"method": "email", "contact": "[email protected]", "wait_minutes": 10},
                {"method": "email", "contact": "[email protected]", "wait_minutes": 30},
            ],
        }
        self.pending_escalations = {}

    def start_escalation(self, alert, chain_name="default"):
        """Begin escalation for a critical alert."""
        chain = self.chains.get(chain_name, self.chains["default"])
        escalation_id = f"{alert.source}_{alert.timestamp.isoformat()}"

        self.pending_escalations[escalation_id] = {
            "alert": alert,
            "chain": chain,
            "current_step": 0,
            "started_at": datetime.now(),
            "acknowledged": False,
        }

        # Send first notification immediately
        self._send_step(escalation_id)
        return escalation_id

    def acknowledge(self, escalation_id):
        """Mark an escalation as acknowledged — stop further steps."""
        if escalation_id in self.pending_escalations:
            self.pending_escalations[escalation_id]["acknowledged"] = True
            logger.info(f"Escalation acknowledged: {escalation_id}")

    def check_escalations(self):
        """Check if any pending escalations need the next step."""
        now = datetime.now()

        for esc_id, esc in self.pending_escalations.items():
            if esc["acknowledged"]:
                continue

            chain = esc["chain"]
            current = esc["current_step"]

            if current >= len(chain) - 1:
                continue

            next_step = chain[current + 1]
            elapsed = (now - esc["started_at"]).total_seconds() / 60

            if elapsed >= next_step["wait_minutes"]:
                esc["current_step"] += 1
                self._send_step(esc_id)

    def _send_step(self, escalation_id):
        """Log the current escalation step (placeholder: this does not deliver anything yet)."""
        esc = self.pending_escalations[escalation_id]
        step = esc["chain"][esc["current_step"]]
        alert = esc["alert"]

        logger.warning(
            f"Escalation step {esc['current_step'] + 1}/{len(esc['chain'])}: "
            f"{step['method']} for {alert.title}"
        )
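As written, _send_step only logs which step is due. A minimal sketch of actually delivering a step, assuming a hypothetical dispatch table that maps each chain entry's "method" to a sender function; in the full system those senders would wrap SlackNotifier.send and EmailNotifier.send. For brevity a plain string stands in for the Alert object, and the addresses are placeholders:

```python
import logging

logger = logging.getLogger("notifications")


def send_step(step, alert, senders):
    """Deliver one escalation step using the sender registered for its method.

    step    -- a chain entry, e.g. {"method": "email", "contact": "...", "wait_minutes": 10}
    senders -- maps a method name to a function(step, alert)
    Returns True if a sender was found and ran, False otherwise.
    """
    sender = senders.get(step["method"])
    if sender is None:
        logger.error(f"No sender registered for escalation method: {step['method']}")
        return False
    sender(step, alert)
    return True


# Stand-in senders that record calls; real ones would call SlackNotifier / EmailNotifier
delivered = []
senders = {
    "slack": lambda step, alert: delivered.append(("slack", step["channel"], alert)),
    "email": lambda step, alert: delivered.append(("email", step["contact"], alert)),
}

send_step({"method": "email", "contact": "lead@example.com", "wait_minutes": 10}, "Sales pipeline failed", senders)
send_step({"method": "pager", "contact": "team-pager", "wait_minutes": 30}, "Sales pipeline failed", senders)
print(delivered)  # [('email', 'lead@example.com', 'Sales pipeline failed')]
```

Since the router already posts CRITICAL alerts to the critical Slack channel, a real sender for step 0 would duplicate that message; dropping the wait_minutes 0 Slack entry from the chain is one way to avoid it.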

Wiring It Together

class NotificationSystem:
    """Complete notification system with routing, rate limiting, and digests."""

    def __init__(self):
        self.router = RateLimitedRouter()
        self.escalation = EscalationChain()

    def notify(self, title, message, severity, source, **context):
        """Send a notification through the system."""
        alert = Alert(
            title=title,
            message=message,
            severity=severity,
            source=source,
            context=context,
        )

        self.router.route(alert)

        if severity == Severity.CRITICAL:
            self.escalation.start_escalation(alert)

        return alert


# Usage in a pipeline
notifications = NotificationSystem()

try:
    result = run_pipeline()  # your own pipeline entry point, returning a dict with "rows" and "duration"
except Exception as e:
    notifications.notify(
        title="Sales pipeline failed",
        message=str(e),
        severity=Severity.CRITICAL,
        source="sales_pipeline",
        error_type=type(e).__name__,
    )
    raise  # re-raise so the scheduler or CI job still records the run as failed
else:
    notifications.notify(
        title="Sales pipeline completed",
        message=f"Processed {result['rows']} orders",
        severity=Severity.SUCCESS,
        source="sales_pipeline",
        rows=result["rows"],
        duration=result["duration"],
    )
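If several pipelines need the same try/except, the pattern can be wrapped once. A sketch using contextlib.contextmanager, assuming a notify callable that takes the same keyword arguments as NotificationSystem.notify; pipeline_alerts is a hypothetical name, a stub stands in for notify, and a plain string stands in for the severity:

```python
from contextlib import contextmanager


@contextmanager
def pipeline_alerts(notify, source, failure_severity="critical"):
    """Send an alert if the wrapped block raises, then re-raise the exception."""
    try:
        yield
    except Exception as e:
        notify(
            title=f"{source} failed",
            message=str(e),
            severity=failure_severity,
            source=source,
            error_type=type(e).__name__,
        )
        raise


calls = []


def fake_notify(**kwargs):
    calls.append(kwargs)


try:
    with pipeline_alerts(fake_notify, "sales_pipeline"):
        raise ConnectionRefusedError("Database connection refused after 3 retries")
except ConnectionRefusedError:
    pass

print(calls[0]["title"], calls[0]["error_type"])  # sales_pipeline failed ConnectionRefusedError
```

With the real system this would look like: with pipeline_alerts(notifications.notify, "sales_pipeline", Severity.CRITICAL): result = run_pipeline().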

What This Replaces

Old approach | New approach
print("Pipeline done") | Structured alerts with severity
All alerts to one Slack channel | Routing by severity and source
200 unread messages | Rate limiting + digests
Nobody notices critical failures | Escalation chains
“I think it ran yesterday?” | Success/failure notifications with context
Checking logs manually | Alerts come to you

Common Alert Design Mistakes

Mistake | Why it fails | Fix
Alerting on every success | Alert fatigue: nothing seems important | Log successes, alert on failures
No severity levels | Cannot prioritise | Use CRITICAL / WARNING / INFO / SUCCESS
Same channel for everything | Important alerts get buried | Route by severity
No rate limiting | One broken pipeline floods the channel | Cap sends per minute and per hour
Alert without context | “Pipeline failed”: which one? How? | Include source, stage, error, timestamp
No escalation | Nobody sees the 2 AM alert | Escalation chains with increasing urgency

Next Steps

Start with Slack notifications on pipeline failures — that alone is a significant improvement over checking logs. Add severity levels next, then rate limiting when alert volume grows.

The digest pattern is particularly valuable: collect all the “it worked fine” messages into one daily summary so your alert channels stay clean for things that actually need attention.

For building the pipelines that feed into this notification system, see How to Design Data Pipelines for Reliable Reporting. For making pipelines self-healing before they even need to alert, see How to Build Self-Healing Data Pipelines.

Automation services include building monitoring and alerting infrastructure for production pipeline systems.

Get in touch to discuss setting up notifications for your data systems.
