How to Build a Notification System That Actually Gets Read
Build a multi-channel notification system that routes alerts to Slack, email, and SMS based on severity — with rate limiting, digests, and escalation so people do not ignore your alerts.

Your pipeline sends alerts. Nobody reads them. The Slack channel has 200 unread messages, all saying "Pipeline completed successfully." When something actually breaks, the real alert is buried.
This is alert fatigue. And it is the reason most notification systems fail — not because they cannot send messages, but because they send too many of the wrong ones.
This guide builds a notification system that routes alerts by severity, rate-limits noise, digests low-priority updates, and escalates critical failures. By the end, your alerts will be read because they will be worth reading.
# The Notification Architecture
```mermaid
flowchart TD
    E["Event\n(pipeline output)"] --> R["Rules Engine\n(severity + routing)"]
    R -->|Critical| S["Slack #critical\n+ SMS"]
    R -->|Warning| SL["Slack #alerts"]
    R -->|Info| D["Digest\n(daily summary)"]
    R -->|Success| L["Log Only"]
    S --> RL["Rate Limiter"]
    SL --> RL
    RL --> Send["Deliver"]
```
Events come in. The rules engine decides where they go. Rate limiting prevents flooding. Digests batch low-priority items into a single daily message.
# What You Will Need
```bash
pip install requests schedule
```
- requests — HTTP client for Slack and webhook calls
- schedule — lightweight task scheduling for digests
# Step 1: Define Alert Severity
Every alert should have a severity level. Without this, all alerts look the same and all get ignored.
```python
from enum import Enum
from dataclasses import dataclass, field
from datetime import datetime


class Severity(Enum):
    CRITICAL = "critical"  # Pipeline failed, data loss risk
    WARNING = "warning"    # Something unusual, might need attention
    INFO = "info"          # Status update, progress tracking
    SUCCESS = "success"    # Pipeline completed normally


@dataclass
class Alert:
    """A structured alert with all routing metadata."""
    title: str
    message: str
    severity: Severity
    source: str  # Which pipeline or system
    timestamp: datetime = field(default_factory=datetime.now)
    context: dict = field(default_factory=dict)  # Extra data for debugging

    @property
    def emoji(self):
        return {
            Severity.CRITICAL: "🔴",
            Severity.WARNING: "🟡",
            Severity.INFO: "🔵",
            Severity.SUCCESS: "🟢",
        }[self.severity]
```
# Creating Alerts
```python
# From a pipeline
alert = Alert(
    title="Daily sales pipeline failed",
    message="Database connection refused after 3 retries",
    severity=Severity.CRITICAL,
    source="sales_pipeline",
    context={"stage": "extract", "retries": 3, "error": "ConnectionRefused"},
)

# Informational
alert = Alert(
    title="Pipeline completed",
    message="Processed 4,521 orders in 12.3 seconds",
    severity=Severity.SUCCESS,
    source="sales_pipeline",
    context={"rows": 4521, "duration_seconds": 12.3},
)
```
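The `field(default_factory=...)` pattern in the `Alert` dataclass is worth pausing on: it gives every alert its own fresh timestamp and context dict, rather than one shared value evaluated when the class is defined. A minimal sketch (with a hypothetical `Event` class standing in for `Alert`):

```python
from dataclasses import dataclass, field
from datetime import datetime


# Why Alert uses field(default_factory=...): each instance gets its own
# timestamp and its own dict, instead of sharing one created at class
# definition time.
@dataclass
class Event:
    name: str
    created: datetime = field(default_factory=datetime.now)
    context: dict = field(default_factory=dict)


a = Event("first")
b = Event("second")
a.context["key"] = "value"

print(b.context)               # {} — mutating a.context did not touch b
print(a.created <= b.created)  # True — timestamps are per-instance
```

A plain `context: dict = {}` would be rejected by `dataclass` precisely because a single shared dict is almost never what you want.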
# Step 2: Notification Channels
# Slack Integration
```python
import requests
import os
import logging

logger = logging.getLogger("notifications")


class SlackNotifier:
    """Send notifications to Slack channels via webhooks."""

    def __init__(self, webhooks=None):
        self.webhooks = webhooks or {
            "critical": os.environ.get("SLACK_WEBHOOK_CRITICAL"),
            "alerts": os.environ.get("SLACK_WEBHOOK_ALERTS"),
            "info": os.environ.get("SLACK_WEBHOOK_INFO"),
        }

    def send(self, alert, channel="alerts"):
        """Send an alert to a Slack channel."""
        webhook_url = self.webhooks.get(channel)
        if not webhook_url:
            logger.warning(f"No Slack webhook configured for channel: {channel}")
            return False

        payload = {
            "blocks": [
                {
                    "type": "header",
                    "text": {
                        "type": "plain_text",
                        "text": f"{alert.emoji} {alert.title}",
                    },
                },
                {
                    "type": "section",
                    "text": {
                        "type": "mrkdwn",
                        "text": (
                            f"*Source:* {alert.source}\n"
                            f"*Time:* {alert.timestamp.strftime('%Y-%m-%d %H:%M:%S')}\n"
                            f"*Message:* {alert.message}"
                        ),
                    },
                },
            ],
        }

        # Add context fields if present (Slack caps section fields at 10)
        if alert.context:
            fields = []
            for key, value in list(alert.context.items())[:8]:
                fields.append({
                    "type": "mrkdwn",
                    "text": f"*{key}:* {value}",
                })
            payload["blocks"].append({
                "type": "section",
                "fields": fields,
            })

        response = requests.post(webhook_url, json=payload, timeout=10)
        if response.status_code == 200:
            logger.info(f"Slack alert sent to #{channel}: {alert.title}")
            return True
        else:
            logger.error(f"Slack delivery failed: {response.status_code}")
            return False
```
# Email Notifications
```python
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart


class EmailNotifier:
    """Send notification emails."""

    def __init__(self, smtp_host=None, smtp_port=587, username=None, password=None):
        self.smtp_host = smtp_host or os.environ.get("SMTP_HOST")
        self.smtp_port = smtp_port
        self.username = username or os.environ.get("SMTP_USERNAME")
        self.password = password or os.environ.get("SMTP_PASSWORD")

    def send(self, alert, recipients):
        """Send an alert via email."""
        msg = MIMEMultipart("alternative")
        msg["Subject"] = f"[{alert.severity.value.upper()}] {alert.title}"
        msg["From"] = self.username
        msg["To"] = ", ".join(recipients)

        # Plain text version
        text = (
            f"Alert: {alert.title}\n"
            f"Severity: {alert.severity.value}\n"
            f"Source: {alert.source}\n"
            f"Time: {alert.timestamp.strftime('%Y-%m-%d %H:%M:%S')}\n\n"
            f"{alert.message}\n"
        )

        # HTML version
        context_rows = "".join(
            f"<tr><td><strong>{k}</strong></td><td>{v}</td></tr>"
            for k, v in alert.context.items()
        )
        html = f"""
        <h2>{alert.emoji} {alert.title}</h2>
        <table>
          <tr><td><strong>Severity</strong></td><td>{alert.severity.value}</td></tr>
          <tr><td><strong>Source</strong></td><td>{alert.source}</td></tr>
          <tr><td><strong>Time</strong></td><td>{alert.timestamp.strftime('%Y-%m-%d %H:%M:%S')}</td></tr>
          {context_rows}
        </table>
        <p>{alert.message}</p>
        """

        msg.attach(MIMEText(text, "plain"))
        msg.attach(MIMEText(html, "html"))

        with smtplib.SMTP(self.smtp_host, self.smtp_port) as server:
            server.starttls()
            server.login(self.username, self.password)
            server.send_message(msg)
        logger.info(f"Email sent to {len(recipients)} recipients: {alert.title}")
```
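One detail of the email builder above that is easy to get backwards: in a `multipart/alternative` message, mail clients render the last part they support, so the plain-text fallback must be attached before the HTML. A small self-contained check of that ordering:

```python
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

# Order matters in multipart/alternative: clients render the LAST part they
# can display, so attach plain text first and HTML second.
msg = MIMEMultipart("alternative")
msg["Subject"] = "[CRITICAL] Daily sales pipeline failed"
msg.attach(MIMEText("plain-text fallback", "plain"))
msg.attach(MIMEText("<h2>HTML version</h2>", "html"))

parts = [part.get_content_type() for part in msg.get_payload()]
print(parts)  # ['text/plain', 'text/html']
```

If the parts were reversed, most clients would show the plain-text version even when they support HTML.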
# Step 3: Routing Rules
Define where each alert goes based on severity and source:
```mermaid
flowchart LR
    A["Alert"] --> RE["Rules\nEngine"]
    RE -->|"CRITICAL\n(any source)"| C["Slack #critical\n+ Email\n+ SMS"]
    RE -->|"WARNING\n(production)"| W["Slack #alerts\n+ Email"]
    RE -->|"WARNING\n(dev)"| WD["Slack #dev"]
    RE -->|"INFO"| I["Daily Digest"]
    RE -->|"SUCCESS"| S["Log Only"]
```
```python
class NotificationRouter:
    """Route alerts to the right channels based on rules."""

    def __init__(self):
        self.slack = SlackNotifier()
        self.email = EmailNotifier()
        self.digest = DigestCollector()  # Defined in Step 5 below

        # Routing rules, checked in order — the first match wins
        self.rules = [
            {
                "match": lambda a: a.severity == Severity.CRITICAL,
                "actions": [
                    lambda a: self.slack.send(a, channel="critical"),
                    lambda a: self.email.send(a, recipients=["[email protected]"]),
                ],
            },
            {
                "match": lambda a: a.severity == Severity.WARNING,
                "actions": [
                    lambda a: self.slack.send(a, channel="alerts"),
                ],
            },
            {
                "match": lambda a: a.severity == Severity.INFO,
                "actions": [
                    lambda a: self.digest.add(a),
                ],
            },
            {
                "match": lambda a: a.severity == Severity.SUCCESS,
                "actions": [
                    lambda a: logger.info(f"Success: {a.source} — {a.message}"),
                ],
            },
        ]

    def route(self, alert):
        """Route an alert through the first matching rule."""
        matched = False
        for rule in self.rules:
            if rule["match"](alert):
                for action in rule["actions"]:
                    try:
                        action(alert)
                    except Exception as e:
                        logger.error(f"Notification action failed: {e}")
                matched = True
                break
        if not matched:
            logger.warning(f"No routing rule matched alert: {alert.title}")
```
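The first-match-wins behaviour is the heart of the router, so here it is in isolation. This is a stripped-down sketch of the same pattern; the severity strings and destinations are illustrative, not part of the classes above:

```python
# Minimal first-match rules engine, mirroring NotificationRouter.route:
# rules are checked in order, and the first matching rule wins.

def route(severity, rules):
    """Return the destination of the first rule whose predicate matches."""
    for predicate, destination in rules:
        if predicate(severity):
            return destination
    return "unrouted"  # Mirrors the "no rule matched" warning path


rules = [
    (lambda s: s == "critical", "slack:#critical + email"),
    (lambda s: s == "warning", "slack:#alerts"),
    (lambda s: s == "info", "digest"),
    (lambda s: s == "success", "log"),
]

print(route("critical", rules))  # slack:#critical + email
print(route("info", rules))      # digest
print(route("unknown", rules))   # unrouted
```

Ordering rules from most to least severe means a later, broader rule can never shadow a critical one.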
# Step 4: Rate Limiting
Prevent flooding channels when multiple pipelines fail at once.
```python
from collections import defaultdict
from datetime import datetime, timedelta


class RateLimiter:
    """Prevent notification flooding."""

    def __init__(self, max_per_minute=5, max_per_hour=30):
        self.max_per_minute = max_per_minute
        self.max_per_hour = max_per_hour
        self.history = defaultdict(list)

    def should_send(self, channel):
        """Check if sending to this channel is allowed."""
        now = datetime.now()

        # Clean entries older than the largest window
        self.history[channel] = [
            t for t in self.history[channel]
            if t > now - timedelta(hours=1)
        ]

        # Check both rate limits
        recent_minute = sum(
            1 for t in self.history[channel] if t > now - timedelta(minutes=1)
        )
        recent_hour = len(self.history[channel])

        if recent_minute >= self.max_per_minute:
            logger.warning(f"Rate limited ({channel}): {recent_minute}/min limit reached")
            return False
        if recent_hour >= self.max_per_hour:
            logger.warning(f"Rate limited ({channel}): {recent_hour}/hour limit reached")
            return False
        return True

    def record_send(self, channel):
        """Record that a notification was sent."""
        self.history[channel].append(datetime.now())
```
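The sliding-window check is easier to reason about with fixed timestamps. Here is the same per-minute logic as a standalone function, with the clock injected so the behaviour is deterministic (the `allowed` helper is illustrative, not part of `RateLimiter`):

```python
from datetime import datetime, timedelta

# Standalone demonstration of the sliding-window check used by RateLimiter.
# Timestamps are passed in explicitly so the result is deterministic.

def allowed(history, now, max_per_minute=5):
    """Count sends in the last minute and compare against the cap."""
    recent = sum(1 for t in history if t > now - timedelta(minutes=1))
    return recent < max_per_minute


now = datetime(2024, 1, 1, 12, 0, 0)
history = [now - timedelta(seconds=s) for s in (5, 10, 20, 30, 40)]

print(allowed(history, now))                         # False: 5 sends in the window
print(allowed(history[:3], now))                     # True: only 3 recent sends
print(allowed(history, now + timedelta(minutes=2)))  # True: the window has slid past
```

Because the window slides rather than resetting on a fixed boundary, a burst cannot sneak through by straddling the top of a minute.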
```python
class RateLimitedRouter(NotificationRouter):
    """Router with rate limiting applied."""

    def __init__(self):
        super().__init__()
        self.limiter = RateLimiter(max_per_minute=5, max_per_hour=30)

    def route(self, alert):
        """Route with rate limiting — critical alerts bypass limits."""
        if alert.severity == Severity.CRITICAL:
            # Critical alerts always go through
            super().route(alert)
            return

        channel = alert.severity.value
        if self.limiter.should_send(channel):
            super().route(alert)
            self.limiter.record_send(channel)
        else:
            # Divert to the digest instead of dropping the alert
            self.digest.add(alert)
            logger.info(f"Rate limited — alert added to digest: {alert.title}")
```
# Step 5: Daily Digests
Batch low-priority updates into a single daily summary.
```python
import json


class DigestCollector:
    """Collect alerts for daily digest delivery."""

    def __init__(self, digest_file="digest_buffer.json"):
        self.digest_file = digest_file
        self._load()

    def _load(self):
        """Load existing digest buffer."""
        try:
            with open(self.digest_file, "r") as f:
                self.buffer = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            self.buffer = []

    def _save(self):
        """Persist digest buffer to disk."""
        with open(self.digest_file, "w") as f:
            json.dump(self.buffer, f, default=str)

    def add(self, alert):
        """Add an alert to the digest buffer."""
        self.buffer.append({
            "title": alert.title,
            "message": alert.message,
            "severity": alert.severity.value,
            "source": alert.source,
            "timestamp": alert.timestamp.isoformat(),
        })
        self._save()

    def build_digest(self):
        """Build a formatted digest from buffered alerts."""
        if not self.buffer:
            return None

        # Group by source
        by_source = defaultdict(list)
        for item in self.buffer:
            by_source[item["source"]].append(item)

        severity_emoji = {"critical": "🔴", "warning": "🟡", "info": "🔵", "success": "🟢"}
        lines = [f"📋 *Daily Pipeline Digest* — {datetime.now().strftime('%Y-%m-%d')}\n"]
        for source, items in by_source.items():
            lines.append(f"\n*{source}* ({len(items)} events)")
            for item in items[-5:]:  # Last 5 per source
                emoji = severity_emoji.get(item["severity"], "⚪")
                lines.append(f"  {emoji} {item['title']}")
            if len(items) > 5:
                lines.append(f"  _...and {len(items) - 5} more_")
        return "\n".join(lines)

    def send_and_clear(self, slack_notifier, channel="info"):
        """Send the digest and clear the buffer."""
        digest_text = self.build_digest()
        if not digest_text:
            logger.info("No items in digest — skipping")
            return
        webhook_url = slack_notifier.webhooks.get(channel)
        if webhook_url:
            requests.post(webhook_url, json={"text": digest_text}, timeout=10)
        item_count = len(self.buffer)
        self.buffer = []
        self._save()
        logger.info(f"Digest sent: {item_count} items")
```
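The grouping step inside `build_digest` is the part that turns a flat buffer into a readable summary. A stripped-down sketch of just that step, with illustrative items:

```python
from collections import defaultdict

# Standalone sketch of the grouping step in build_digest: bucket buffered
# items by source, then summarise each bucket on one line.

items = [
    {"source": "sales_pipeline", "title": "Processed 4,521 orders"},
    {"source": "sales_pipeline", "title": "Processed 4,498 orders"},
    {"source": "inventory_sync", "title": "Synced 312 SKUs"},
]

by_source = defaultdict(list)
for item in items:
    by_source[item["source"]].append(item)

lines = [f"{source} ({len(group)} events)" for source, group in by_source.items()]
print(lines)  # ['sales_pipeline (2 events)', 'inventory_sync (1 events)']
```

Grouping by source means a noisy pipeline collapses into one header line rather than dominating the whole digest.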
# Scheduling the Digest
```python
import schedule
import time


def run_digest():
    """Send the daily digest."""
    digest = DigestCollector()
    slack = SlackNotifier()
    digest.send_and_clear(slack, channel="info")


# Send digest every day at 8 AM
schedule.every().day.at("08:00").do(run_digest)

while True:
    schedule.run_pending()
    time.sleep(60)
```
# Step 6: Escalation Chains
If a critical alert goes unacknowledged, escalate to the next person.
```python
class EscalationChain:
    """Escalate alerts through a chain of contacts."""

    def __init__(self):
        self.chains = {
            "default": [
                # wait_minutes is measured from the start of the escalation
                {"method": "slack", "channel": "critical", "wait_minutes": 0},
                {"method": "email", "contact": "[email protected]", "wait_minutes": 10},
                {"method": "email", "contact": "[email protected]", "wait_minutes": 30},
            ],
        }
        self.pending_escalations = {}

    def start_escalation(self, alert, chain_name="default"):
        """Begin escalation for a critical alert."""
        chain = self.chains.get(chain_name, self.chains["default"])
        escalation_id = f"{alert.source}_{alert.timestamp.isoformat()}"
        self.pending_escalations[escalation_id] = {
            "alert": alert,
            "chain": chain,
            "current_step": 0,
            "started_at": datetime.now(),
            "acknowledged": False,
        }
        # Send first notification immediately
        self._send_step(escalation_id)
        return escalation_id

    def acknowledge(self, escalation_id):
        """Mark an escalation as acknowledged — stop further steps."""
        if escalation_id in self.pending_escalations:
            self.pending_escalations[escalation_id]["acknowledged"] = True
            logger.info(f"Escalation acknowledged: {escalation_id}")

    def check_escalations(self):
        """Check if any pending escalations need the next step. Call this periodically."""
        now = datetime.now()
        for esc_id, esc in self.pending_escalations.items():
            if esc["acknowledged"]:
                continue
            chain = esc["chain"]
            current = esc["current_step"]
            if current >= len(chain) - 1:
                continue  # Already at the last step
            next_step = chain[current + 1]
            elapsed = (now - esc["started_at"]).total_seconds() / 60
            if elapsed >= next_step["wait_minutes"]:
                esc["current_step"] += 1
                self._send_step(esc_id)

    def _send_step(self, escalation_id):
        """Send notification for the current escalation step."""
        esc = self.pending_escalations[escalation_id]
        step = esc["chain"][esc["current_step"]]
        alert = esc["alert"]
        # Dispatch to your SlackNotifier / EmailNotifier here based on step["method"]
        logger.warning(
            f"Escalation step {esc['current_step'] + 1}/{len(esc['chain'])}: "
            f"{step['method']} for {alert.title}"
        )
```
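Because each step's `wait_minutes` is cumulative from the start of the escalation, you can compute which contacts should have been notified at any point in time. A standalone sketch of that timing rule (the `due_steps` helper and contact names are illustrative):

```python
# Standalone sketch of the escalation timing rule: wait_minutes counts from
# when the escalation started, so each step fires once enough total time has
# passed without an acknowledgement.

chain = [
    {"contact": "slack #critical", "wait_minutes": 0},
    {"contact": "[email protected]", "wait_minutes": 10},
    {"contact": "[email protected]", "wait_minutes": 30},
]


def due_steps(chain, elapsed_minutes):
    """Return the contacts that should have been notified by now."""
    return [
        step["contact"] for step in chain
        if elapsed_minutes >= step["wait_minutes"]
    ]


print(due_steps(chain, 0))   # ['slack #critical']
print(due_steps(chain, 15))  # ['slack #critical', '[email protected]']
print(due_steps(chain, 45))  # all three contacts
```

An acknowledgement at minute 12 would stop the chain before the manager is paged at minute 30, which is exactly the incentive you want: respond early, escalate less.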
# Wiring It Together
```python
class NotificationSystem:
    """Complete notification system with routing, rate limiting, and digests."""

    def __init__(self):
        self.router = RateLimitedRouter()
        self.escalation = EscalationChain()

    def notify(self, title, message, severity, source, **context):
        """Send a notification through the system."""
        alert = Alert(
            title=title,
            message=message,
            severity=severity,
            source=source,
            context=context,
        )
        self.router.route(alert)
        if severity == Severity.CRITICAL:
            self.escalation.start_escalation(alert)
        return alert
```
# Usage in a pipeline
```python
notifications = NotificationSystem()

try:
    result = run_pipeline()  # Your existing pipeline entry point
    notifications.notify(
        title="Sales pipeline completed",
        message=f"Processed {result['rows']} orders",
        severity=Severity.SUCCESS,
        source="sales_pipeline",
        rows=result["rows"],
        duration=result["duration"],
    )
except Exception as e:
    notifications.notify(
        title="Sales pipeline failed",
        message=str(e),
        severity=Severity.CRITICAL,
        source="sales_pipeline",
        error_type=type(e).__name__,
    )
```
# What This Replaces
| Old approach | New approach |
|---|---|
| `print("Pipeline done")` | Structured alerts with severity |
| All alerts to one Slack channel | Routing by severity and source |
| 200 unread messages | Rate limiting + digests |
| Nobody notices critical failures | Escalation chains |
| "I think it ran yesterday?" | Success/failure notifications with context |
| Checking logs manually | Alerts come to you |
# Common Alert Design Mistakes
| Mistake | Why it fails | Fix |
|---|---|---|
| Alerting on every success | Alert fatigue — nothing seems important | Log successes, alert on failures |
| No severity levels | Cannot prioritise | Use CRITICAL / WARNING / INFO / SUCCESS |
| Same channel for everything | Important alerts get buried | Route by severity |
| No rate limiting | One broken pipeline floods channel | Cap per-minute and per-hour |
| Alert without context | "Pipeline failed" — which one? How? | Include source, stage, error, timestamp |
| No escalation | Nobody sees the 2 AM alert | Escalation chains with increasing urgency |
# Next Steps
Start with Slack notifications on pipeline failures — that alone is a significant improvement over checking logs. Add severity levels next, then rate limiting when alert volume grows.
The digest pattern is particularly valuable: collect all the "it worked fine" messages into one daily summary so your alert channels stay clean for things that actually need attention.
For building the pipelines that feed into this notification system, see How to Design Data Pipelines for Reliable Reporting. For making pipelines self-healing before they even need to alert, see How to Build Self-Healing Data Pipelines.
Automation services include building monitoring and alerting infrastructure for production pipeline systems.
Get in touch to discuss setting up notifications for your data systems.