AWS Glue Incremental Lake ETL

PySpark Glue job for incremental raw-to-curated lake ingestion, with schema validation, quarantine routing, deduplication, and catalog registration. S3 paths and the partition column are set through job parameters.
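As a rough illustration of that configuration, the job parameters named in the script's docstring could be assembled as a plain dictionary. This is only a sketch: `build_job_arguments` and the sample values are hypothetical and not part of the template. The `--job-bookmark-option` key is Glue's built-in bookmark switch, added here on the assumption that the job is meant to run with bookmarks enabled.

```python
def build_job_arguments(bucket: str, entity: str, database: str, partition_column: str) -> dict:
    """Assemble the Glue job arguments listed in the script's docstring."""
    return {
        "--SOURCE_PATH": f"s3://{bucket}/raw/{entity}/",
        "--TARGET_PATH": f"s3://{bucket}/curated/{entity}/",
        "--DATABASE_NAME": database,
        "--TABLE_NAME": f"{entity}_curated",
        "--BOOKMARK_KEY": partition_column,
        # Built-in Glue parameter; bookmarks stay disabled unless this is set.
        "--job-bookmark-option": "job-bookmark-enable",
    }


# Hypothetical values, for illustration only.
arguments = build_job_arguments("example-lake", "orders", "analytics", "ingestion_date")
for key, value in arguments.items():
    print(key, value)
```

A dictionary of this shape maps string keys to string values, so it could be pasted into the console's job parameters or passed along when a run is started programmatically.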

AWS Glue · Intermediate · Python

Code preview (115 lines in the full template)

Replace {{PLACEHOLDERS}} with your environment values, then deploy to your stack.
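Before deploying, it can help to confirm that no `{{...}}` markers are left in the edited script. The check below is a minimal sketch, assuming placeholders always use upper-case names inside double braces as they do in this template. `find_unreplaced_placeholders` is a hypothetical helper, not something the template ships.

```python
import re
from typing import List

# Matches template markers such as {{BUCKET}} or {{PK_COLUMN}}.
PLACEHOLDER_RE = re.compile(r"\{\{([A-Z0-9_]+)\}\}")


def find_unreplaced_placeholders(script_text: str) -> List[str]:
    """Return the distinct placeholder names still present, in first-seen order."""
    seen = {}
    for match in PLACEHOLDER_RE.finditer(script_text):
        seen.setdefault(match.group(1), None)
    return list(seen)


sample = (
    'PK_COLUMNS = ["{{PK_COLUMN}}"]\n'
    'REQUIRED_COLUMNS = ["{{PK_COLUMN}}", "{{DATE_COLUMN}}"]'
)
print(find_unreplaced_placeholders(sample))  # ['PK_COLUMN', 'DATE_COLUMN']
```

Running the same function over the full script text and refusing to deploy when the list is non-empty is one way to catch a forgotten value early.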

"""
AWS GLUE INCREMENTAL LAKE ETL JOB
Deploy as an AWS Glue ETL script. Replace {{PLACEHOLDERS}} before running.

Glue Job Parameters (set in console or workflow):
  --SOURCE_PATH       s3://{{BUCKET}}/raw/{{ENTITY}}/
  --TARGET_PATH       s3://{{BUCKET}}/curated/{{ENTITY}}/
  --DATABASE_NAME     {{GLUE_DATABASE}}
  --TABLE_NAME        {{ENTITY}}_curated
  --BOOKMARK_KEY      {{PARTITION_COLUMN}}
"""

import sys
from datetime import datetime

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, TimestampType

# ── Resolve job parameters ───────────────────────────────────────────────────
args = getResolvedOptions(
    sys.argv,
    [
        "JOB_NAME",
        "SOURCE_PATH",
        "TARGET_PATH",
        "DATABASE_NAME",
        "TABLE_NAME",
        "BOOKMARK_KEY",
    ],
)

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

SOURCE_PATH = args["SOURCE_PATH"]
TARGET_PATH = args["TARGET_PATH"]
DATABASE_NAME = args["DATABASE_NAME"]
TABLE_NAME = args["TABLE_NAME"]
PARTITION_COL = args["BOOKMARK_KEY"]  # e.g. ingestion_date

# ── CONFIGURATION: business rules ─────────────────────────────────────────────
PK_COLUMNS = ["{{PK_COLUMN}}"]           # e.g. ["order_id"]
REQUIRED_COLUMNS = ["{{PK_COLUMN}}", "{{DATE_COLUMN}}", "{{AMOUNT_COLUMN}}"]
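# The quarantine prefix is derived from the target path. Fail fast when the target
# has no "/curated/" segment; otherwise replace() would return the path unchanged
# and invalid rows would be written into the curated zone.
if "/curated/" not in TARGET_PATH:
    raise ValueError(f"TARGET_PATH must contain '/curated/': {TARGET_PATH}")
QUARANTINE_PATH = TARGET_PATH.replace("/curated/", "/quarantine/")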

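# ── 1. Read incremental from raw zone (Glue bookmark enabled) ────────────────
# Bookmarks track already-processed S3 objects through transformation_ctx. They
# take effect only when the job runs with --job-bookmark-option job-bookmark-enable
# and the script calls job.commit() once processing has finished.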
dyf = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": [SOURCE_PATH], "recurse": True},
    format="parquet",
    transformation_ctx="raw_source",
)
df = dyf.toDF()

# ── 2. Schema validation & quarantine bad records ────────────────────────────
for col_name in REQUIRED_COLUMNS:
    df = df.withColumn(
        f"_null_{col_name}",
        F.when(F.col(col_name).isNull(), 1).otherwise(0),
    )

# A row is valid only when none of its required columns is null.
df = df.withColumn(
    "_is_valid",
    sum(F.col(f"_null_{c}") for c in REQUIRED_COLUMNS) == 0,
)

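null_flag_cols = [f"_null_{c}" for c in REQUIRED_COLUMNS]
valid_df = df.filter(F.col("_is_valid")).drop("_is_valid", *null_flag_cols)
# Invalid rows keep their _null_* flags so the quarantine output shows which check failed.
invalid_df = df.filter(~F.col("_is_valid"))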

if invalid_df.count() > 0:
    invalid_df.write.mode("append").parquet(

# ... (preview truncated here; the remaining code is not shown)
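The null checks in step 2 reference each required column with `F.col(...)`, which Spark rejects with an analysis error if the column is absent from the incoming frame. A small pre-check on the column names can turn that into a clearer failure. The sketch below is plain Python and makes no claim about how the full template handles this. `missing_required_columns` and the sample column names are hypothetical.

```python
from typing import Iterable, List


def missing_required_columns(available: Iterable[str], required: Iterable[str]) -> List[str]:
    """Return the required column names that are absent, preserving their order."""
    available_set = set(available)
    return [name for name in required if name not in available_set]


# In the Glue job this could be called as
# missing_required_columns(df.columns, REQUIRED_COLUMNS); plain lists are used here.
incoming_columns = ["order_id", "order_date", "customer_id"]  # hypothetical schema
required_columns = ["order_id", "order_date", "amount"]       # hypothetical rules

missing = missing_required_columns(incoming_columns, required_columns)
if missing:
    print(f"Missing required columns: {missing}")
```

Whether a missing column should abort the run or route the whole batch to quarantine is a design choice left to the deployment. The helper only reports what is absent.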

About this template


Tags: aws glue, pyspark, data lake, incremental
Downloads: 45
Reviews: 0
Rating: -
Created: Jul 2, 2026
Updated: Jul 2, 2026