AWS Glue Incremental Lake ETL
PySpark Glue job for incremental raw-to-curated lake ingestion with schema validation, quarantine routing, deduplication, and catalog registration. Configure S3 paths and partition columns.
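The S3 paths and partition column reach the script as Glue job arguments. The sketch below shows one way to assemble them for a run. It assumes the `s3://{{BUCKET}}/raw/...` and `s3://{{BUCKET}}/curated/...` layout from the script's docstring; the helper names `build_job_arguments` and `start_run` and the sample values are hypothetical. `--job-bookmark-option` is a standard Glue special parameter, and boto3's `start_job_run` is only called if you invoke `start_run` yourself.

```python
"""Hypothetical helpers for launching the Glue job with its parameters."""


def build_job_arguments(
    bucket: str, entity: str, database: str, partition_column: str
) -> dict[str, str]:
    """Return the Arguments map for one job run.

    Glue hands each key to the script as a command-line flag, so every key
    carries a leading "--". JOB_NAME is omitted because Glue passes it
    to the script automatically.
    """
    return {
        "--SOURCE_PATH": f"s3://{bucket}/raw/{entity}/",
        "--TARGET_PATH": f"s3://{bucket}/curated/{entity}/",
        "--DATABASE_NAME": database,
        "--TABLE_NAME": f"{entity}_curated",
        "--BOOKMARK_KEY": partition_column,
        # Glue special parameter: without it, the transformation_ctx-based
        # read re-processes every file instead of only new ones.
        "--job-bookmark-option": "job-bookmark-enable",
    }


def start_run(job_name: str, arguments: dict[str, str]) -> str:
    """Start a run via boto3 (needs AWS credentials) and return its run id."""
    import boto3  # imported lazily so build_job_arguments works without boto3

    response = boto3.client("glue").start_job_run(JobName=job_name, Arguments=arguments)
    return response["JobRunId"]


if __name__ == "__main__":
    # Sample values are placeholders for illustration only.
    for key, value in build_job_arguments("my-lake-bucket", "orders", "lake_db", "ingestion_date").items():
        print(key, value)
```

The same keys can instead be set once as default arguments on the job in the console, in which case per-run arguments only need to override what changes.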
AWS Glue · Intermediate · Python
Code preview
115 lines. Replace {{PLACEHOLDERS}} with your environment values, then deploy to your stack.
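Substituting the placeholders by hand is error-prone, because a single missed `{{...}}` token only fails at run time. A minimal sketch of a local substitution step follows, assuming the script is saved as a text file and that tokens are upper-case names such as `{{BUCKET}}` or `{{PK_COLUMN}}`; the function name `fill_placeholders` is hypothetical.

```python
import re

# Matches tokens such as {{BUCKET}} or {{PK_COLUMN}}; group 1 is the name.
PLACEHOLDER = re.compile(r"\{\{([A-Z0-9_]+)\}\}")


def fill_placeholders(template: str, values: dict[str, str]) -> str:
    """Replace every {{NAME}} token, refusing to leave any unfilled."""
    missing = sorted(set(PLACEHOLDER.findall(template)) - set(values))
    if missing:
        raise ValueError(f"no value for placeholders: {', '.join(missing)}")
    return PLACEHOLDER.sub(lambda m: values[m.group(1)], template)


if __name__ == "__main__":
    line = 'PK_COLUMNS = ["{{PK_COLUMN}}"]'
    print(fill_placeholders(line, {"PK_COLUMN": "order_id"}))
```

The script's own docstring contains the literal token `{{PLACEHOLDERS}}`, so this helper will report it as missing unless that docstring line is edited first or the token is mapped to itself.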
"""
AWS GLUE INCREMENTAL LAKE ETL JOB
Deploy as an AWS Glue ETL script. Replace {{PLACEHOLDERS}} before running.
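Glue Job Parameters (set in console or workflow):
  --SOURCE_PATH    s3://{{BUCKET}}/raw/{{ENTITY}}/
  --TARGET_PATH    s3://{{BUCKET}}/curated/{{ENTITY}}/
  --DATABASE_NAME  {{GLUE_DATABASE}}
  --TABLE_NAME     {{ENTITY}}_curated
  --BOOKMARK_KEY   {{PARTITION_COLUMN}}   (used as the partition column)
  --job-bookmark-option job-bookmark-enable
      Glue special parameter; required for the incremental read in step 1.
      The bookmark advances only when the script calls job.commit().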
"""
import sys
from datetime import datetime
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, TimestampType
# ── Resolve job parameters ───────────────────────────────────────────────────
args = getResolvedOptions(
    sys.argv,
    [
        "JOB_NAME",
        "SOURCE_PATH",
        "TARGET_PATH",
        "DATABASE_NAME",
        "TABLE_NAME",
        "BOOKMARK_KEY",
    ],
)
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
SOURCE_PATH = args["SOURCE_PATH"]
TARGET_PATH = args["TARGET_PATH"]
DATABASE_NAME = args["DATABASE_NAME"]
TABLE_NAME = args["TABLE_NAME"]
PARTITION_COL = args["BOOKMARK_KEY"] # e.g. ingestion_date
# ── CONFIGURATION: business rules ─────────────────────────────────────────────
PK_COLUMNS = ["{{PK_COLUMN}}"] # e.g. ["order_id"]
REQUIRED_COLUMNS = ["{{PK_COLUMN}}", "{{DATE_COLUMN}}", "{{AMOUNT_COLUMN}}"]
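QUARANTINE_PATH = TARGET_PATH.replace("/curated/", "/quarantine/")
# Guard: if TARGET_PATH has no "/curated/" segment, replace() is a no-op and
# quarantined rows would be appended to the curated path itself.
if QUARANTINE_PATH == TARGET_PATH:
    raise ValueError("TARGET_PATH must contain '/curated/' to derive QUARANTINE_PATH")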
# ── 1. Read new raw-zone files incrementally (requires job bookmarks enabled) ──
dyf = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": [SOURCE_PATH], "recurse": True},
    format="parquet",
    transformation_ctx="raw_source",  # bookmark state is tracked under this key
)
df = dyf.toDF()
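# ── 2. Validate required columns & quarantine bad records ─────────────────────
# Flag each required column per row: 1 if null, 0 otherwise.
for col_name in REQUIRED_COLUMNS:
    df = df.withColumn(
        f"_null_{col_name}",
        F.when(F.col(col_name).isNull(), 1).otherwise(0),
    )
# Python's built-in sum() folds the flag Columns into one Column expression;
# a row is valid only when no required column is null.
df = df.withColumn(
    "_is_valid",
    F.when(
        sum(F.col(f"_null_{c}") for c in REQUIRED_COLUMNS) == 0, True
    ).otherwise(False),
)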
valid_df = df.filter(F.col("_is_valid")).drop("_is_valid", *[f"_null_{c}" for c in REQUIRED_COLUMNS])
invalid_df = df.filter(~F.col("_is_valid"))
if invalid_df.count() > 0:
    # Invalid rows keep their _null_* flags, recording which column failed.
    invalid_df.write.mode("append").parquet(
        # ... download full template for remaining code
About this template
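Step 2 of the preview can be checked without a Spark cluster by modeling it on plain Python dicts. The sketch below is illustrative only: `split_valid_invalid` and the sample column names (stand-ins for {{PK_COLUMN}}, {{DATE_COLUMN}}, {{AMOUNT_COLUMN}}) are hypothetical, and it mirrors the preview's logic rather than the full template.

```python
from typing import Any

Row = dict[str, Any]


def split_valid_invalid(rows: list[Row], required: list[str]) -> tuple[list[Row], list[Row]]:
    """Plain-Python model of step 2: valid means no required column is null.

    Valid rows come back unchanged (the helper flags are dropped, as in
    valid_df). Invalid rows keep _null_<col> flags (1 = null) and
    _is_valid=False, as invalid_df does before the quarantine write.
    A missing key is treated as null here; in Spark, referencing a missing
    column fails the job with an AnalysisException instead.
    """
    valid: list[Row] = []
    invalid: list[Row] = []
    for row in rows:
        flags = {f"_null_{c}": int(row.get(c) is None) for c in required}
        if sum(flags.values()) == 0:
            valid.append(dict(row))
        else:
            invalid.append({**row, **flags, "_is_valid": False})
    return valid, invalid


if __name__ == "__main__":
    sample = [
        {"order_id": 1, "order_date": "2026-07-01", "amount": 9.5},
        {"order_id": 2, "order_date": None, "amount": 3.0},
    ]
    good, bad = split_valid_invalid(sample, ["order_id", "order_date", "amount"])
    print(len(good), "valid;", len(bad), "quarantined")
```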
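The deduplication step is not visible in the preview. One common rule is to keep the most recent record per primary key; the sketch below models that rule in plain Python under a loudly stated assumption: a hypothetical ordering column (`updated_at` in the example) decides which duplicate wins. In PySpark the same idea is usually written with `row_number()` over a `Window` partitioned by the key and ordered by that column descending, keeping row 1. This is not necessarily the rule the full template uses.

```python
from typing import Any

Row = dict[str, Any]


def dedupe_latest(rows: list[Row], pk_columns: list[str], order_column: str) -> list[Row]:
    """Keep one row per primary key: the one with the largest order_column.

    Ties keep the first row seen. (With Spark's row_number(), ties are
    broken arbitrarily unless the ordering includes a unique column.)
    """
    latest: dict[tuple, Row] = {}
    for row in rows:
        key = tuple(row[c] for c in pk_columns)
        current = latest.get(key)
        if current is None or row[order_column] > current[order_column]:
            latest[key] = row
    return list(latest.values())


if __name__ == "__main__":
    batch = [
        {"order_id": 1, "updated_at": "2026-07-01", "amount": 5},
        {"order_id": 2, "updated_at": "2026-07-01", "amount": 7},
        {"order_id": 1, "updated_at": "2026-07-02", "amount": 6},
    ]
    for row in dedupe_latest(batch, ["order_id"], "updated_at"):
        print(row)
```

Because bookmarks limit each run to newly arrived files, deduplication applied only to the incoming batch cannot remove a key that an earlier run already wrote to the curated zone; that needs a merge against existing curated data.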
aws glue · pyspark · data lake · incremental
Created: Jul 2, 2026
Updated: Jul 2, 2026