Airflow Production ETL DAG

Production-grade Airflow DAG with task groups, SQL operators, row-count validation, SLA metrics, retries, and alerting hooks. Plug in connection IDs, cron schedule, and merge SQL for your entity.

Airflow · Advanced · Python

Code preview

118 lines

Replace {{PLACEHOLDERS}} with your environment values, then deploy to your stack.

"""
AIRFLOW PRODUCTION ETL DAG TEMPLATE
Replace {{PLACEHOLDERS}} and deploy to your Airflow DAGs folder.

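Placeholders to fill in (see the CONFIGURATION section), including:
  - {{SOURCE_CONN_ID}}  : Airflow connection ID of the source system
  - {{TARGET_SCHEMA}}   : destination schema

Optional Airflow Variable:
  - <DAG_ID>_min_rows   : minimum row count for post-load validation (default 1000)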
"""

from __future__ import annotations

from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.utils.task_group import TaskGroup

# ── CONFIGURATION: plug in your values ───────────────────────────────────────
# Every "{{...}}" token below is a template placeholder: substitute a real
# value before deploying. The trailing comment shows the expected form.
DAG_ID = "{{DAG_ID}}"  # e.g. fct_orders_daily
OWNER = "{{OWNER_TEAM}}"  # e.g. data-platform
SCHEDULE = "{{CRON_SCHEDULE}}"  # e.g. "0 6 * * *"
SOURCE_CONN = "{{SOURCE_CONN_ID}}"  # e.g. postgres_analytics
TARGET_CONN = "{{TARGET_CONN_ID}}"  # e.g. snowflake_default
TARGET_SCHEMA = "{{TARGET_SCHEMA}}"  # e.g. analytics.marts
ENTITY = "{{ENTITY_NAME}}"  # e.g. orders
# Parsed eagerly: int() raises ValueError at import if the placeholder is
# still present, so an unconfigured template fails fast.
SLA_HOURS = int("{{SLA_HOURS}}")  # e.g. 8

# Default arguments handed to the DAG (default_args=DEFAULT_ARGS below).
DEFAULT_ARGS = dict(
    owner=OWNER,
    depends_on_past=False,
    # Alerting: e-mail on final failure, stay quiet on individual retries.
    email=["{{ALERT_EMAIL}}"],
    email_on_failure=True,
    email_on_retry=False,
    # Retry policy and the maximum runtime allowed for a task attempt.
    retries=3,
    retry_delay=timedelta(minutes=5),
    execution_timeout=timedelta(hours=2),
)


def validate_row_counts(row_count_sql: str | None = None, **context):
    """Post-load validation - customize SQL for your entity.

    Runs ``row_count_sql`` against ``TARGET_CONN`` and compares the returned
    count with the ``{DAG_ID}_min_rows`` Airflow Variable (default 1000).
    Raising on a short count fails the task, which blocks downstream tasks.

    Args:
        row_count_sql: Query whose first row's first column is the loaded row
            count, typically supplied through the operator's ``op_kwargs``
            (which PythonOperator renders with Jinja, so ``{{ ds }}`` can scope
            the count to the logical date). When omitted, no check runs and a
            warning is logged rather than a misleading "passed" message.
        **context: Airflow task context; ``ds`` is read for logging.

    Raises:
        ValueError: if the query returns no row, NULL, or fewer than
            ``min_rows`` rows.
    """
    import logging

    logical_date = context["ds"]
    logging.info("Validating load for %s on %s", ENTITY, logical_date)
    min_rows = int(Variable.get(f"{DAG_ID}_min_rows", default_var=1000))

    if not row_count_sql:
        # Never report success for a check that did not run: a silent
        # "Validation passed" would hide empty or partial loads.
        logging.warning(
            "No row_count_sql configured for %s; row-count validation skipped "
            "(expected >= %s rows)",
            ENTITY,
            min_rows,
        )
        return

    # Imported lazily to keep DAG-file parsing light.
    from airflow.hooks.base import BaseHook

    hook = BaseHook.get_connection(TARGET_CONN).get_hook()
    row = hook.get_first(row_count_sql)
    # A COUNT(*) query yields exactly one row. Treat "no row" or NULL as zero
    # so a misconfigured query fails the check instead of passing.
    row_count = int(row[0]) if row and row[0] is not None else 0

    if row_count < min_rows:
        raise ValueError(
            f"Row-count validation failed for {ENTITY} on {logical_date}: "
            f"{row_count} rows < minimum {min_rows}"
        )
    logging.info("Validation passed: row count %s >= %s", row_count, min_rows)


def send_sla_metric(**context):
    """Emit SLA metric to your observability stack.

    Measures the hours between the run's scheduled time
    (``data_interval_end`` from the task context) and the moment this task
    executes. It then logs whether that is within ``SLA_HOURS``. The result
    is only logged, never raised, so an SLA breach does not fail the run.
    Replace or extend the log calls with your metrics client.

    NOTE(review): for manually triggered runs, ``data_interval_end`` may not
    match wall-clock trigger time; confirm this is the reference point you
    want.
    """
    import logging
    import time

    scheduled_at = context.get("data_interval_end")
    if scheduled_at is None:
        # Without a reference time we cannot claim the SLA was met.
        logging.warning(
            "SLA check: no data_interval_end for pipeline %s; cannot measure "
            "against %s-hour SLA",
            DAG_ID,
            SLA_HOURS,
        )
        return

    # Compare epoch seconds so naive/aware or pendulum/stdlib datetime types
    # never need to be mixed in arithmetic.
    elapsed_hours = (time.time() - scheduled_at.timestamp()) / 3600
    if elapsed_hours > SLA_HOURS:
        logging.warning(
            "SLA breach: pipeline %s reached this point %.2f hours after "
            "schedule (SLA %s hours)",
            DAG_ID,
            elapsed_hours,
            SLA_HOURS,
        )
    else:
        logging.info(
            "SLA check: pipeline %s completed within %s hours (%.2f hours "
            "after schedule)",
            DAG_ID,
            SLA_HOURS,
            elapsed_hours,
        )


with DAG(
    dag_id=DAG_ID,
    default_args=DEFAULT_ARGS,
    description=f"Production ETL for {ENTITY}",
    schedule=SCHEDULE,
    start_date=datetime(2026, 1, 1),
    catchup=False,
    max_active_runs=1,
    tags=["production", "{{DOMAIN}}", ENTITY],
    doc_md=f"""
    ## {ENTITY} Daily Pipeline
    - **Owner:** {OWNER}
    - **SLA:** data available within {SLA_HOURS} hours of schedule
    - **Source:** {SOURCE_CONN}
    - **Target:** {TARGET_SCHEMA}.fct_{ENTITY}
    """,
) as dag:

    start = EmptyOperator(task_id="start")
    end = EmptyOperator(task_id="end")


// ... download full template for remaining code

About this template

Production-grade Airflow DAG with task groups, SQL operators, row-count validation, SLA metrics, retries, and alerting hooks. Plug in connection IDs, cron schedule, and merge SQL for your entity.

airflow · dag · etl · orchestration · production

Reviews

Our on-call runbook got simpler

We adapted the Airflow DAG template for nightly inventory loads. Retries, SLA checks, and row-count validation were already wired in — we just swapped in our SQL.

Marcus Chen · Data Engineer · RetailFlow

Downloads: 89
Reviews: 1
Rating: 5.0
Created: Jul 2, 2026
Updated: Jul 2, 2026
Login to review