Pipeline SLA Freshness Monitor
Python SLA monitor that checks table freshness and row volume against configured thresholds, with Snowflake integration and Slack webhook alerting. Define SLAs per critical pipeline.
Data Architecture · Intermediate · Python
Code preview
140 lines. Replace {{PLACEHOLDERS}} with your environment values, then deploy to your stack.
"""
PIPELINE SLA FRESHNESS MONITOR
Checks whether critical tables/pipelines met freshness SLAs and emits alerts.
Usage:
python pipeline_sla_monitor.py
python pipeline_sla_monitor.py --dry-run
Configure the SLAS list below, or load it from YAML.
"""
from __future__ import annotations
import argparse
import os
from dataclasses import dataclass
from datetime import datetime, timezone
# ── CONFIGURATION: warehouse connection settings, read from env vars ────────
# Each row is (connection keyword, environment variable, template placeholder).
# The placeholder is used verbatim whenever the environment variable is unset.
WAREHOUSE_CONN = {
    conn_key: os.getenv(env_name, placeholder)
    for conn_key, env_name, placeholder in (
        ("account", "SNOWFLAKE_ACCOUNT", "{{SNOWFLAKE_ACCOUNT}}"),
        ("user", "SNOWFLAKE_USER", "{{SNOWFLAKE_USER}}"),
        ("password", "SNOWFLAKE_PASSWORD", "{{SNOWFLAKE_PASSWORD}}"),
        ("warehouse", "SNOWFLAKE_WAREHOUSE", "{{WAREHOUSE_NAME}}"),
        ("database", "SNOWFLAKE_DATABASE", "{{DATABASE}}"),
        ("schema", "SNOWFLAKE_SCHEMA", "{{SCHEMA}}"),
    )
}

# Alert webhook URL; SLA_ALERT_WEBHOOK overrides the template placeholder.
ALERT_WEBHOOK = os.getenv("SLA_ALERT_WEBHOOK", "{{SLACK_WEBHOOK_URL}}")
@dataclass
class PipelineSLA:
    """Freshness and row-volume expectations for one monitored table.

    Instances are plain configuration records; the class performs no
    validation of its own.
    """

    # Identifier for the pipeline (e.g. "orders_daily").
    name: str
    # Table reference interpolated into the FROM clause of the check query,
    # so it must be a trusted, fully qualified identifier.
    table: str
    # Column whose MAX() is taken as the last load time in the check query.
    timestamp_column: str
    # Freshness budget in hours.
    # NOTE(review): presumably compared against the query's hours_stale value;
    # the comparison is not visible here, so confirm.
    sla_hours: int
    # Owning team or contact.
    # NOTE(review): presumably surfaced in alerts; confirm against the caller.
    owner: str
    # Minimum expected row count; defaults to 1.
    # NOTE(review): presumably checked against COUNT(*) of the table; confirm.
    min_row_count: int = 1
# ── SLA definitions: one PipelineSLA entry per critical pipeline ─────────────
# Replace every {{PLACEHOLDER}} before running; the int(...) conversions raise
# ValueError while a numeric placeholder is still unreplaced.
SLAS: list[PipelineSLA] = [
    # First pipeline.
    PipelineSLA(
        name="{{PIPELINE_1_NAME}}",  # e.g. orders_daily
        table="{{DATABASE}}.{{SCHEMA}}.fct_orders",
        timestamp_column="_loaded_at",
        sla_hours=int("{{SLA_HOURS_1}}"),  # e.g. 6
        owner="{{OWNER_TEAM}}",
        min_row_count=int("{{MIN_ROWS_1}}"),  # e.g. 10000
    ),
    # Second pipeline.
    PipelineSLA(
        name="{{PIPELINE_2_NAME}}",
        table="{{DATABASE}}.{{SCHEMA}}.fct_customers",
        timestamp_column="_loaded_at",
        sla_hours=int("{{SLA_HOURS_2}}"),
        owner="{{OWNER_TEAM}}",
        min_row_count=int("{{MIN_ROWS_2}}"),
    ),
]
def get_connection():
    """Open and return a Snowflake connection built from ``WAREHOUSE_CONN``.

    Replace with your warehouse client (snowflake-connector-python, etc.).

    Returns:
        The connection object returned by ``snowflake.connector.connect``.

    Raises:
        ImportError: If ``snowflake-connector-python`` is not installed.
    """
    # Imported here rather than at module level, so the dependency is only
    # required when a connection is actually opened.
    import snowflake.connector  # pip install snowflake-connector-python

    return snowflake.connector.connect(**WAREHOUSE_CONN)
def check_sla(conn, sla: PipelineSLA) -> dict:
query = f"""
SELECT
COUNT(*) AS row_count,
MAX({sla.timestamp_column}) AS last_loaded_at,
DATEDIFF('hour', MAX({sla.timestamp_column}), CURRENT_TIMESTAMP()) AS hours_stale
FROM {sla.table}
"""
cursor = conn.cursor()
cursor.execute(query)
row_count, last_loaded_at, hours_stale = cursor.fetchone()
# ... download the full template for the remaining code
About this template
Python SLA monitor that checks table freshness and row volume against configured thresholds, with Snowflake integration and Slack webhook alerting. Define SLAs per critical pipeline.
sla · freshness · monitoring · alerting
Reviews
SLA monitor was a quick win
Dropped the pipeline SLA script into our daily checks. We caught a late-arriving feed twice last month before downstream dashboards broke.
Ryan O'Brien · Data Platform Lead · InsureCore
Downloads: 67
Reviews: 1
Rating: 4.0
Created: Jul 2, 2026
Updated: Jul 2, 2026