Airflow Production ETL DAG
Production-grade Airflow DAG with task groups, SQL operators, row-count validation, SLA metrics, retries, and alerting hooks. Plug in connection IDs, cron schedule, and merge SQL for your entity.
Airflow · Advanced · Python
Code preview
118 lines. Replace {{PLACEHOLDERS}} with your environment values, then deploy to your stack.
"""
AIRFLOW PRODUCTION ETL DAG TEMPLATE
Replace {{PLACEHOLDERS}} and deploy to your Airflow DAGs folder.
Required Airflow Variables:
- {{SOURCE_CONN_ID}} : connection to source system
- {{TARGET_SCHEMA}} : destination schema
"""
from __future__ import annotations
from datetime import datetime, timedelta
from airflow import DAG
from airflow.models import Variable
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.utils.task_group import TaskGroup
# ── CONFIGURATION: plug in your values ───────────────────────────────────────
# Every "{{...}}" string below is a template placeholder to replace before
# deploying. SLA_HOURS is parsed with int() at import time, so this file will
# not import until that placeholder has been replaced with an integer.

# Pipeline identity
DAG_ID = "{{DAG_ID}}"                # e.g. fct_orders_daily
OWNER = "{{OWNER_TEAM}}"             # e.g. data-platform
ENTITY = "{{ENTITY_NAME}}"           # e.g. orders

# Scheduling
SCHEDULE = "{{CRON_SCHEDULE}}"       # e.g. "0 6 * * *"
SLA_HOURS = int("{{SLA_HOURS}}")     # e.g. 8

# Source / destination
SOURCE_CONN = "{{SOURCE_CONN_ID}}"   # e.g. postgres_analytics
TARGET_CONN = "{{TARGET_CONN_ID}}"   # e.g. snowflake_default
TARGET_SCHEMA = "{{TARGET_SCHEMA}}"  # e.g. analytics.marts

# Per-task defaults handed to the DAG via default_args.
DEFAULT_ARGS = dict(
    owner=OWNER,
    depends_on_past=False,
    email=["{{ALERT_EMAIL}}"],
    email_on_failure=True,   # alert when a task fails...
    email_on_retry=False,    # ...but stay quiet on individual retries
    retries=3,
    retry_delay=timedelta(minutes=5),
    execution_timeout=timedelta(hours=2),
)
def validate_row_counts(**context):
    """Fail the task when the loaded partition has fewer rows than expected.

    Counts the rows of ``{TARGET_SCHEMA}.fct_{ENTITY}`` for the run's ``ds``
    through the ``TARGET_CONN`` connection. It then compares the count with the
    ``{DAG_ID}_min_rows`` Airflow Variable, which defaults to 1000 when unset.
    Customize ``count_sql`` for your entity before deploying.

    Args:
        **context: Airflow task context; only ``ds`` is read.

    Returns:
        The validated row count. PythonOperator pushes return values to
        XCom by default.

    Raises:
        ValueError: if the row count is below the configured minimum. The
            exception fails the task and so blocks downstream tasks.
    """
    import logging

    from airflow.hooks.base import BaseHook

    logical_date = context["ds"]
    logging.info("Validating load for %s on %s", ENTITY, logical_date)
    min_rows = int(Variable.get(f"{DAG_ID}_min_rows", default_var=1000))

    # get_hook() resolves the provider-specific DB-API hook for the
    # connection type configured on TARGET_CONN.
    hook = BaseHook.get_connection(TARGET_CONN).get_hook()
    # Replace {{LOAD_DATE_COLUMN}} with the column that identifies a run's
    # partition. NOTE(review): %(ds)s is pyformat binding; adjust it if your
    # DB driver uses a different paramstyle.
    count_sql = (
        f"SELECT COUNT(*) FROM {TARGET_SCHEMA}.fct_{ENTITY} "
        "WHERE {{LOAD_DATE_COLUMN}} = %(ds)s"
    )
    record = hook.get_first(count_sql, parameters={"ds": logical_date})
    # COUNT(*) returns exactly one row; guard anyway in case a driver returns none.
    row_count = int(record[0]) if record else 0

    if row_count < min_rows:
        raise ValueError(
            f"Row-count validation failed for {ENTITY} on {logical_date}: "
            f"{row_count} rows < required minimum {min_rows}"
        )
    logging.info("Validation passed: row count %s >= %s", row_count, min_rows)
    return row_count
def send_sla_metric(**context):
    """Measure how long after its scheduled time this run reached this task.

    The DAG documentation promises data within ``SLA_HOURS`` hours of
    schedule. The scheduled time is taken to be the run's
    ``data_interval_end``, the point at which Airflow triggers a scheduled
    run. The measurement is taken when this task executes, so it should run
    at the end of the pipeline.

    Args:
        **context: Airflow task context; only ``data_interval_end`` is read.

    Returns:
        Hours elapsed since the scheduled time, which the observability stack
        can consume from XCom. Returns ``None`` when the run has no data
        interval.
    """
    import logging
    from datetime import timezone

    scheduled_at = context.get("data_interval_end")
    if scheduled_at is None:
        # Some runs (e.g. certain manual triggers) may have no data
        # interval, leaving no schedule to measure against.
        logging.info("SLA check skipped for %s: run has no data interval", DAG_ID)
        return None

    # Airflow supplies timezone-aware UTC datetimes in the task context.
    elapsed = datetime.now(timezone.utc) - scheduled_at
    elapsed_hours = elapsed.total_seconds() / 3600

    if elapsed_hours <= SLA_HOURS:
        logging.info("SLA check: pipeline %s completed within %s hours", DAG_ID, SLA_HOURS)
    else:
        logging.warning(
            "SLA missed: pipeline %s completed %.2f hours after schedule (SLA: %s hours)",
            DAG_ID,
            elapsed_hours,
            SLA_HOURS,
        )
    return elapsed_hours
# DAG definition. Only the start/end marker tasks are visible in this excerpt.
# The remaining tasks and their dependencies are presumably defined further
# down in the full template, which is not shown here.
with DAG(
    dag_id=DAG_ID,
    default_args=DEFAULT_ARGS,
    description=f"Production ETL for {ENTITY}",
    schedule=SCHEDULE,
    # NOTE(review): naive datetime. Airflow interprets it in the configured
    # default timezone; consider a timezone-aware start_date.
    start_date=datetime(2026, 1, 1),
    catchup=False,  # do not create runs for past intervals missed since start_date
    max_active_runs=1,  # at most one active run of this DAG at a time
    tags=["production", "{{DOMAIN}}", ENTITY],
    # Markdown shown on this DAG's page in the Airflow UI.
    doc_md=f"""
## {ENTITY} Daily Pipeline
- **Owner:** {OWNER}
- **SLA:** data available within {SLA_HOURS} hours of schedule
- **Source:** {SOURCE_CONN}
- **Target:** {TARGET_SCHEMA}.fct_{ENTITY}
""",
) as dag:
    # No-op marker tasks; presumably wired as the first and last steps in the
    # omitted remainder of the template.
    start = EmptyOperator(task_id="start")
    end = EmptyOperator(task_id="end")
// ... download the full template for the remaining code
About this template
Production-grade Airflow DAG with task groups, SQL operators, row-count validation, SLA metrics, retries, and alerting hooks. Plug in connection IDs, cron schedule, and merge SQL for your entity.
airflow · dag · etl · orchestration · production
Reviews
Our on-call runbook got simpler
We adapted the Airflow DAG template for nightly inventory loads. Retries, SLA checks, and row-count validation were already wired in — we just swapped in our SQL.
Marcus Chen · Data Engineer · RetailFlow
Downloads: 89
Reviews: 1
Rating: 5.0
Created: Jul 2, 2026
Updated: Jul 2, 2026