PySpark & ETL
How Spark actually runs your job, the RDD/DataFrame/Dataset split, lazy evaluation, shuffles and skew, window functions, Delta Lake & ACID, CDC, and — the part interviewers care about for a Test Automation Engineer — how you validate a pipeline. Short explanations, short examples.
01 How Spark Runs a Job
Spark is a distributed engine: one driver plans the work and many executors run it in parallel across the cluster. Understanding this split explains almost every performance and correctness question.
- Driver — runs your main, builds the logical plan, and schedules tasks. The SparkSession lives here.
- Executors — JVM processes on worker nodes that run tasks and hold cached data in memory.
- Partition — a chunk of the data; one task processes one partition. Parallelism = number of partitions you can run at once.
Spark is lazy: transformations only build a plan; nothing runs until an action asks for a result.
The Catalyst optimizer rewrites that logical plan (predicate pushdown, column pruning, join reordering) and Tungsten generates efficient bytecode — which is exactly why you stay on DataFrames instead of hand-rolling RDDs.
02 RDD vs DataFrame vs Dataset
Three abstractions, in order of how much Spark can optimize for you:
| API | What it is | When |
|---|---|---|
| RDD | Low-level distributed collection of objects. Full control, no Catalyst optimization, no schema. | Rarely — custom partitioning or unstructured data only. |
| DataFrame | Distributed table with a schema (named, typed columns). Optimized by Catalyst. The default in PySpark. | Almost always. |
| Dataset | DataFrame + compile-time type safety. JVM only (Scala/Java). | N/A in Python — PySpark has no typed Dataset. |
df = (spark.read
      .schema(schema)  # never inferSchema in prod
      .parquet("s3://lake/events/"))
df.filter(df.country == "DE").select("user_id", "amount")

03 Transformations & Actions
Every DataFrame operation is one of two kinds:
- Transformations (select, filter, join, groupBy, withColumn) are lazy — they return a new DataFrame and add to the plan.
- Actions (count, collect, show, write, take) trigger execution of the whole plan.
Transformations are further split by how data moves:
- Narrow (filter, map, select) — each output partition depends on one input partition. No data movement; cheap.
- Wide (groupBy, join, distinct, orderBy) — rows must be regrouped across partitions. Triggers a shuffle; expensive.
collect() pulls every row to the driver and can OOM it. In tests, assert on aggregates or a bounded limit(...).collect(), never on a full collect() of production-scale data.
04 Shuffling, Partitions & Joins
A shuffle redistributes data across executors for a wide transformation — it writes intermediate data to disk and reads it back over the network. It is the single most expensive thing Spark does, so most tuning is about avoiding or shrinking it.
Repartition vs Coalesce
- repartition(n) — full shuffle to exactly n partitions (can increase or decrease); use to fix skew or raise parallelism.
- coalesce(n) — merges partitions without a full shuffle; only decreases. Use before writing to avoid thousands of tiny files.
Broadcast join
If one side of a join is small (default < 10 MB, tunable), Spark broadcasts it to every executor so the join happens locally — no shuffle of the big table.
from pyspark.sql.functions import broadcast
big.join(broadcast(small_dim), "product_id", "left")
# forces a broadcast hash join, skipping the shuffle

spark.sql.shuffle.partitions (default 200) sets how many partitions each shuffle produces. Too high → tiny tasks & overhead; too low → huge tasks & spill. Adaptive Query Execution now tunes this at runtime.
05 Data Skew & Salting
Skew = a few keys hold most of the rows (one hot user_id, a NULL join key, a default value). One task gets a giant partition and the whole stage waits on it — you see 199 tasks finish in seconds and 1 run for an hour.
Fixes, in order of preference:
- Adaptive Query Execution (AQE) — enable spark.sql.adaptive.skewJoin.enabled; Spark splits skewed partitions automatically. First thing to reach for.
- Broadcast the small side if it fits — no shuffle, so no skew.
- Salting — append a random suffix to the hot key on both sides to spread it across partitions, then strip it after the join.
- Isolate — handle NULL/sentinel keys separately and union the results.
from pyspark.sql.functions import concat, lit, rand, floor
N = 8
big_salted = big.withColumn("k", concat("user_id", lit("_"),
                                        floor(rand() * N).cast("string")))
# explode the small side into N salted copies, then join on k

06 Window Functions
Window functions compute across a set of related rows without collapsing them (unlike groupBy, which reduces to one row per group). Ideal for ranking, running totals, and period-over-period deltas.
- partitionBy — the group the function runs within (e.g. per region).
- orderBy — the sequence inside the partition (e.g. by date).
- The function — row_number(), rank(), lag()/lead(), or an aggregate like sum().
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, sum as _sum
w = Window.partitionBy("region").orderBy("date")
df.withColumn("running_total", _sum("sales").over(w))
# de-dupe: keep the latest row per key
latest = (df.withColumn("rn", row_number().over(
              Window.partitionBy("id").orderBy(df.ts.desc())))
            .filter("rn = 1").drop("rn"))

To de-duplicate, take row_number() over partitionBy(key).orderBy(ts.desc()) and keep rn = 1.
07 Schema & Data Quality — the QA Mindset
This is where a Test Automation Engineer earns the role: a pipeline that runs green can still emit wrong data. You test the data, not just the code.
Pin the schema
Always pass an explicit schema. inferSchema costs an extra pass over the data and can infer different types when the data changes between runs, which makes it a flaky-test factory.
The checks interviewers expect you to name
- Completeness — no unexpected NULLs in required columns.
- Uniqueness — primary keys aren’t duplicated.
- Row-count / reconciliation — output count reconciles with source (no silent drops from an inner join).
- Range / domain — amount >= 0, status in an allowed set.
- Referential — every foreign key resolves to a dimension row.
- Freshness — the latest partition is recent enough.
from pyspark.sql.functions import col, count, when
# null audit: one pass that yields a null count per column
bad = df.select([
    count(when(col(c).isNull(), c)).alias(c) for c in df.columns
]).collect()[0]
# duplicate audit: ids that appear more than once
dupes = df.groupBy("id").count().filter("count > 1").count()
assert dupes == 0, f"{dupes} duplicate ids"

08 Delta Lake & ACID
Plain Parquet on object storage has no transactions — a half-finished write leaves readers seeing partial data. Delta Lake (and Iceberg/Hudi) add a transaction log on top to bring ACID to the lake.
| Property | Meaning |
|---|---|
| Atomicity | A write fully commits or not at all — no partial files visible. |
| Consistency | Schema is enforced on write; bad writes are rejected. |
| Isolation | Concurrent readers/writers don’t see each other’s in-progress data (snapshot isolation). |
| Durability | Once committed, it survives crashes. |
What Delta buys you:
- MERGE / upserts — update & delete historical rows, not just append.
- Time travel — query a past version (VERSION AS OF / TIMESTAMP AS OF); great for diffing a regression.
- Schema enforcement & evolution — reject mismatches, or opt in to mergeSchema.
-- idempotent upsert (the CDC workhorse)
MERGE INTO target t USING updates u ON t.id = u.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *

Diff VERSION AS OF the previous good version against the current one to isolate exactly which rows changed.
09 Incremental Processing & CDC
Goal: process only what changed since the last run instead of reprocessing everything — the difference between a 5-minute and a 5-hour job.
- Watermark column (simplest) — read where updated_at > last_run_ts. Misses hard deletes.
- Log-based CDC (e.g. Debezium) — reads the database’s transaction log and emits every insert, update, and delete as an event. The reliable standard.
You make incremental loads safe by making them idempotent: a MERGE keyed on the primary key, so re-running the same batch produces the same table — no duplicates. That idempotency is itself a test case.
Use >= with dedupe at the watermark boundary, or you lose rows that landed in the same second as the boundary; and a pure updated_at filter never sees deletes.
10 ETL vs ELT
- ETL — Extract, Transform, Load. Clean and shape data before it lands in the warehouse.
- ELT — Extract, Load, Transform. Load raw first, transform in the warehouse with its own compute (dbt on Snowflake/BigQuery/Databricks).
Trend: ELT dominates because cloud warehouses make compute cheap and elastic, and keeping the raw layer lets you re-derive everything downstream. A typical lake is layered bronze (raw) → silver (cleaned) → gold (aggregated), and you put data-quality gates between the layers.
11 Performance & Reliability
The levers you should be able to list:
- cache() / persist() — materialize a DataFrame reused many times; don’t cache a one-shot.
- Partition pruning — partition by a column you filter on (e.g. date) so Spark skips whole directories.
- Predicate & projection pushdown — columnar formats (Parquet) let Spark read only needed columns/row-groups; prefer them over CSV/JSON.
- Adaptive Query Execution (AQE) — coalesces shuffle partitions, flips to broadcast joins, and splits skew at runtime.
- Avoid Python UDFs — they are opaque to Catalyst and serialize row-by-row; use built-in pyspark.sql.functions, or a pandas/Arrow UDF if you must.
- Small-files problem — coalesce before writing and compact periodically.
12 Testing a PySpark Pipeline
Pipelines are testable like any other code — the trick is structuring them so you can.
- Pure transform functions — keep logic in functions that take a DataFrame and return a DataFrame; no I/O inside. Now you can unit-test them on tiny in-memory inputs.
- Local SparkSession — a session-scoped local[*] fixture; build inputs with spark.createDataFrame.
- DataFrame equality — use chispa (assert_df_equality) or the built-in assertDataFrameEqual (Spark 3.5+) instead of comparing collected lists — it handles column order, nullability, and gives readable diffs.
- Data-quality / contract tests — Great Expectations or dbt tests running the completeness/uniqueness/range checks as a gate in CI.
import pytest
from pyspark.sql import SparkSession
from chispa import assert_df_equality
# dedupe_latest is the transform under test; import it from your pipeline module

@pytest.fixture(scope="session")
def spark():
    return SparkSession.builder.master("local[*]").getOrCreate()

def test_dedupe_keeps_latest(spark):
    src = spark.createDataFrame(
        [(1, "2024-01-01"), (1, "2024-02-01")], ["id", "ts"])
    expected = spark.createDataFrame([(1, "2024-02-01")], ["id", "ts"])
    assert_df_equality(dedupe_latest(src), expected, ignore_row_order=True)

13 Rapid-Fire Q&A
Why is Spark lazy?
Transformations only build a plan; deferring until an action lets Catalyst optimize the whole DAG (pushdown, pruning, join reorder) before anything runs.
Narrow vs wide transformation?
Narrow = output partition depends on one input partition, no movement (filter, select). Wide = rows regroup across partitions, triggering a shuffle (groupBy, join).
What is a shuffle and why care?
Redistributing data across executors via disk + network for a wide op — the most expensive operation, so tuning is mostly about avoiding or shrinking it.
repartition vs coalesce?
repartition(n) = full shuffle to exactly n (up or down); coalesce(n) = merge down with no full shuffle. Coalesce before writing to avoid tiny files.
How do you fix a skewed join?
Enable AQE skew-join, broadcast the small side if it fits, or salt the hot key on both sides; isolate NULL/sentinel keys separately.
When does Spark broadcast a join?
When one side is under spark.sql.autoBroadcastJoinThreshold (~10MB); it ships that side to every executor so the join is local — no shuffle of the big table.
RDD vs DataFrame vs Dataset in PySpark?
DataFrame is the default (schema + Catalyst). RDD is low-level with no optimizer. Dataset is JVM-only — PySpark has no typed Dataset.
How do you de-duplicate keeping the latest row?
row_number() over partitionBy(key).orderBy(ts desc), keep rn = 1.
What does Delta Lake add over Parquet?
ACID transactions, MERGE/upserts, time travel, and schema enforcement via a transaction log — Parquet alone has none of these.
Why avoid a Python UDF?
Catalyst can’t optimize it and it serializes row-by-row; prefer built-in functions, or an Arrow/pandas UDF when unavoidable.
How do you validate a pipeline’s output?
Schema + completeness, uniqueness, row-count reconciliation, range/domain, and referential checks — via Great Expectations or assertions, gating CI.
How do you unit-test a transformation?
Keep transforms as pure DataFrame→DataFrame functions, feed small createDataFrame inputs on a local SparkSession, and compare with chispa / assertDataFrameEqual.