Pipeline SLA Freshness Monitor

Python SLA monitor that checks table freshness and row volume against configured thresholds, with Snowflake integration and Slack webhook alerting. Define SLAs per critical pipeline.

Data ArchitectureIntermediatePython

Code preview

140 lines

Replace {{PLACEHOLDERS}} with your environment values, then deploy to your stack.

"""
PIPELINE SLA FRESHNESS MONITOR
Checks whether critical tables/pipelines met freshness SLAs and emits alerts.

Usage:
  python pipeline_sla_monitor.py
  python pipeline_sla_monitor.py --dry-run

Configure SLAS list below or load from YAML.
"""

from __future__ import annotations

import argparse
import os
from dataclasses import dataclass
from datetime import datetime, timezone

# ── CONFIGURATION: plug in your warehouse connection env vars ───────────────
WAREHOUSE_CONN = {
    "account": os.getenv("SNOWFLAKE_ACCOUNT", "{{SNOWFLAKE_ACCOUNT}}"),
    "user": os.getenv("SNOWFLAKE_USER", "{{SNOWFLAKE_USER}}"),
    "password": os.getenv("SNOWFLAKE_PASSWORD", "{{SNOWFLAKE_PASSWORD}}"),
    "warehouse": os.getenv("SNOWFLAKE_WAREHOUSE", "{{WAREHOUSE_NAME}}"),
    "database": os.getenv("SNOWFLAKE_DATABASE", "{{DATABASE}}"),
    "schema": os.getenv("SNOWFLAKE_SCHEMA", "{{SCHEMA}}"),
}

ALERT_WEBHOOK = os.getenv("SLA_ALERT_WEBHOOK", "{{SLACK_WEBHOOK_URL}}")


@dataclass
class PipelineSLA:
    name: str
    table: str
    timestamp_column: str
    sla_hours: int
    owner: str
    min_row_count: int = 1


# ── Define SLAs for your critical pipelines ───────────────────────────────────
SLAS: list[PipelineSLA] = [
    PipelineSLA(
        name="{{PIPELINE_1_NAME}}",           # e.g. orders_daily
        table="{{DATABASE}}.{{SCHEMA}}.fct_orders",
        timestamp_column="_loaded_at",
        sla_hours=int("{{SLA_HOURS_1}}"),     # e.g. 6
        owner="{{OWNER_TEAM}}",
        min_row_count=int("{{MIN_ROWS_1}}"),  # e.g. 10000
    ),
    PipelineSLA(
        name="{{PIPELINE_2_NAME}}",
        table="{{DATABASE}}.{{SCHEMA}}.fct_customers",
        timestamp_column="_loaded_at",
        sla_hours=int("{{SLA_HOURS_2}}"),
        owner="{{OWNER_TEAM}}",
        min_row_count=int("{{MIN_ROWS_2}}"),
    ),
]


def get_connection():
    """Replace with your warehouse client (snowflake-connector-python, etc.)."""
    import snowflake.connector  # pip install snowflake-connector-python

    return snowflake.connector.connect(**WAREHOUSE_CONN)


def check_sla(conn, sla: PipelineSLA) -> dict:
    query = f"""
        SELECT
            COUNT(*) AS row_count,
            MAX({sla.timestamp_column}) AS last_loaded_at,
            DATEDIFF('hour', MAX({sla.timestamp_column}), CURRENT_TIMESTAMP()) AS hours_stale
        FROM {sla.table}
    """
    cursor = conn.cursor()
    cursor.execute(query)
    row_count, last_loaded_at, hours_stale = cursor.fetchone()

// ... download full template for remaining code

About this template

Python SLA monitor that checks table freshness and row volume against configured thresholds, with Snowflake integration and Slack webhook alerting. Define SLAs per critical pipeline.

slafreshnessmonitoringalerting

Reviews

SLA monitor was a quick win

Dropped the pipeline SLA script into our daily checks. We caught a late-arriving feed twice last month before downstream dashboards broke.

Ryan O'Brien · Data Platform Lead · InsureCore

Downloads67
Reviews1
Rating4.0
CreatedJul 2, 2026
UpdatedJul 2, 2026
Login to review