PySpark runs Python transformations on Spark's distributed JVM engine
Lazy evaluation: builds a plan, executes only on .count(), .write(), .show()
Data skew on join keys is the #1 cause of production stalls at 99% completion
Use broadcast joins for small tables; salt join keys when both sides are large
Native pyspark.sql.functions are 5x–20x faster than Python UDFs
Set spark.sql.shuffle.partitions based on data size, not the default 200
Plain-English First
Imagine you need to count every word in every book ever printed. Doing it yourself, one book at a time, would take lifetimes. Now imagine you hire a thousand librarians, split the books between them, and each one counts their pile simultaneously — then a coordinator tallies the final results. PySpark is that coordinator. Your Python code describes the counting rules; Spark figures out how to split the work across dozens of machines without you micromanaging who reads which shelf. The trick is that Spark doesn't actually start reading until you demand the final answer — which means it can plan the most efficient reading route before anyone opens a single book.
A fintech team I worked with spent three weeks tuning a PySpark job that aggregated transaction records for daily risk reports. It ran fine on 10 million rows in staging. At 2 billion rows in production, it silently stalled for six hours, then crashed the cluster with a java.lang.OutOfMemoryError: GC overhead limit exceeded. The root cause wasn't bad code — it was a single unpartitioned join that forced every executor to shuffle 400GB of data through a single node. One line of misunderstood API destroyed a week's worth of cluster credits.
PySpark sits at the intersection of Python's ecosystem and Apache Spark's distributed execution engine. That's a powerful combination, but it's also a trap for developers who treat it like pandas with a bigger machine. Spark doesn't run your Python the way you think it does. Your DataFrame transformations are lazy. Your joins can silently cause catastrophic data skew. Your UDFs are serialized across a JVM boundary in a way that can cut throughput by 10x. Understanding the execution model isn't academic — it's the difference between a job that completes in 8 minutes and one that runs your cloud bill into the thousands.
After this article, you'll know how to configure a SparkSession for production, write transformations that respect Spark's lazy evaluation model, tune partitioning to eliminate shuffle bottlenecks, debug skewed joins using the Spark UI, and write Spark-native aggregations instead of Python UDFs that strangle your executors. Concrete patterns. Runnable code. The failure modes that textbooks skip.
SparkSession Setup and the Execution Model You Must Understand First
Most tutorials show you spark = SparkSession.builder.getOrCreate() and move on. That's like showing someone a car key and skipping the part about combustion engines. Before you write a single transformation, you need to understand what Spark actually does with your code — because it doesn't run it.
Spark uses lazy evaluation. Every transformation you write — filter, select, join, groupBy — builds a logical query plan. Nothing executes until you call an action: show(), count(), write(), collect(). This is why you can chain twenty transformations and Spark will optimize the entire chain before touching a single byte of data. The Catalyst optimizer reorders predicates, prunes unused columns, and sometimes rewrites your join strategy entirely. It's genuinely impressive — until you start debugging and wonder why your print statements inside a map never fire.
The DAG (Directed Acyclic Graph) is Spark's execution blueprint. Each action triggers a job, which splits into stages wherever a shuffle is required, and stages split into tasks that run in parallel across executor cores. Shuffles are expensive because they require data to move across the network between executors. Every wide transformation — groupBy, join, distinct, repartition — causes a shuffle. Narrow transformations — filter, select, withColumn — don't. This distinction drives every performance decision you'll make in production.
SparkSessionConfig.py (Python)
# io.thecodeforge — Python tutorial
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType, LongType

# Production SparkSession — never use defaults for anything beyond local testing.
# spark.sql.shuffle.partitions defaults to 200, which is catastrophically wrong
# for both small datasets (200 near-empty tasks) and massive ones (OOM per task).
spark = (
    SparkSession.builder
    .appName("transaction-risk-aggregator")
    .config("spark.sql.shuffle.partitions", "400")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.autoBroadcastJoinThreshold", "50mb")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("WARN")

transaction_schema = StructType([
    StructField("transaction_id", StringType(), nullable=False),
    StructField("account_id", StringType(), nullable=False),
    StructField("merchant_id", StringType(), nullable=False),
    StructField("amount_usd", DoubleType(), nullable=False),
    StructField("transaction_ts", TimestampType(), nullable=False),
    StructField("risk_score", DoubleType(), nullable=True),
])

transaction_df = (
    spark.read
    .schema(transaction_schema)
    .option("mergeSchema", "false")
    .parquet("/data/transactions/year=2024/month=01/")
)

high_risk_transactions = (
    transaction_df
    .filter(F.col("risk_score") > 0.85)
    .filter(F.col("amount_usd") > 100.0)
    .select("transaction_id", "account_id", "merchant_id", "amount_usd", "risk_score")
    .withColumn(
        "risk_tier",
        F.when(F.col("risk_score") > 0.95, F.lit("CRITICAL"))
         .when(F.col("risk_score") > 0.85, F.lit("HIGH"))
         .otherwise(F.lit("MEDIUM")),
    )
)

high_risk_transactions.explain(mode="formatted")
record_count = high_risk_transactions.count()
print(f"High-risk transactions in January 2024: {record_count:}")
Production Trap: inferSchema Costs You Two Full Scans
Using .option('inferSchema', 'true') on a CSV or JSON source triggers a full dataset scan just to guess column types — before your actual job even starts. On a 500GB dataset, I've seen this add 40 minutes to job startup time. Always define your schema explicitly with StructType. It also prevents silent type coercion bugs where '2024-01-15' gets read as a string instead of a date, corrupting every downstream date calculation without a single error.
Production Insight
Lazy evaluation means your code can look correct but fail at the .count() line.
The physical plan shown by explain() is your only way to verify Catalyst actually used your filters.
Always run explain(mode="formatted") on any DataFrame before promoting code to production — one missing predicate pushdown doubles scan time.
Key Takeaway
Spark transformations build a plan; actions execute it.
Never confuse the two — a .count() after twenty chained .filter() calls triggers all twenty at once.
Use explain() to confirm your transformations are being optimized the way you expect.
Joins, Shuffles, and the Data Skew Problem That Tanks Production Jobs
Joins are where production PySpark jobs go to die. Not because joins are bad — because developers write them without thinking about what Spark has to do physically to execute them. When you join two DataFrames, Spark needs to get matching keys onto the same executor. That means shuffling data across the network. On a well-distributed dataset, this is fine. On a skewed dataset — where one key represents 40% of your data — one executor gets buried while the other 99 sit idle. Your job appears to be 99% complete for six hours, then either finishes three days late or crashes.
The most common skew pattern I see in production is joining on customer_id or merchant_id in transactional data. Real-world data isn't uniform. Your top merchant processes ten thousand times more transactions than your median merchant. When you join your transactions table against a merchant metadata table on merchant_id, all records for that top merchant route to a single executor. I've personally watched this kill a Spark job at a payments company at 11pm on a Friday — the job had run fine for months, then the top merchant's volume doubled during a flash sale and suddenly one task ran for four hours while 199 tasks completed in two minutes.
Spark 3.x Adaptive Query Execution (AQE) helps here, but it's not magic. You still need to understand broadcast joins, salting strategies, and when to break a complex join into multiple simpler stages.
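Here is a minimal sketch of both mitigations. The DataFrame and column names (transactions_df, merchants_df, merchant_id) are illustrative, and the salt count is a starting point to tune per workload, not a recommendation:

from pyspark.sql import functions as F

# Option 1 — broadcast join: only when the lookup side comfortably fits in executor memory.
joined = transactions_df.join(F.broadcast(merchants_df), on="merchant_id", how="left")

# Option 2 — salted join: both sides too large to broadcast.
NUM_SALTS = 20  # illustrative; tune to how widely you want the hot key spread

# Add a random salt to every row on the large (skewed) side.
salted_tx = transactions_df.withColumn("salt", (F.rand() * NUM_SALTS).cast("int"))

# Explode the smaller side so every (merchant_id, salt) combination has a matching row.
salted_merchants = merchants_df.withColumn(
    "salt", F.explode(F.array([F.lit(i) for i in range(NUM_SALTS)]))
)

joined_salted = (
    salted_tx
    .join(salted_merchants, on=["merchant_id", "salt"], how="left")
    .drop("salt")
)

joined_salted.explain(mode="formatted")  # confirm the join strategy before burning cluster hours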
Measured on the production pipeline described above: before salting, one partition held 818M rows (40% of the dataset); after salting, the stddev/mean ratio of partition sizes dropped to roughly 0.9% — a healthy, even distribution.
The Classic Bug: Silent Cartesian Join from a Missing Join Condition
If you accidentally call df1.join(df2) without specifying the 'on' parameter — or if your join key column has the same name on both sides but you pass it as a string list and one side has it as a different type — Spark may produce a cartesian product instead of an error. 2B rows × 50K merchants = 100 trillion output rows. Your job won't fail immediately; it'll appear to run forever while quietly filling every disk on the cluster. Always call .explain() before running a join in production and verify the join type in the physical plan says 'BroadcastHashJoin' or 'SortMergeJoin', never 'CartesianProduct'.
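One defensive setting worth knowing — a sketch, with merchant_lookup and small_reference_df as illustrative names: disabling implicit cross joins makes a missing join condition fail fast instead of running forever.

# With implicit cartesian products disabled, the first action on this join raises
# an AnalysisException ("Detected implicit cartesian product") instead of quietly
# writing trillions of rows to disk.
spark.conf.set("spark.sql.crossJoin.enabled", "false")

accidental = transaction_df.join(merchant_lookup)  # <- missing on="merchant_id"
accidental.count()  # fails fast here, at the action

# A genuinely intended cartesian product must then be spelled out explicitly:
deliberate = merchant_lookup.crossJoin(small_reference_df)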
Production Insight
Data skew on join keys is the single most common cause of production job stalls.
AQE can split skewed partitions, but it only helps post-shuffle — the root cause remains.
Salting adds ~10% overhead to the small table but prevents 100x slowdowns on the large.
Key Takeaway
Broadcast joins eliminate shuffle when one side fits in executor memory.
If broadcast isn't possible, salt the join key to spread the hot key across multiple partitions.
Always call explain() and verify the join strategy in the physical plan.
Aggregations Without UDFs: Why Native Spark Functions Outperform Python 10x
Here's what I see constantly from developers coming from pandas: they hit a transformation that's slightly complex, they can't immediately find the built-in Spark function, and they write a Python UDF. It feels natural. It works in testing. Then it hits production at scale and your job takes four times longer than it should.
The reason is the JVM boundary. Spark's execution engine runs on the JVM. Your Python UDFs run in a separate Python process on each executor. For every batch of rows, Spark has to serialize data from JVM memory, ship it across a local socket to the Python process, execute your Python code, serialize the results back, and deserialize them into JVM memory. This round-trip happens millions of times. I've measured a trivially simple string transformation running 8x slower as a Python UDF than as a native Spark SQL function call.
The fix is to learn pyspark.sql.functions deeply. It covers 95% of what you'd ever want to do with a UDF. Window functions handle running totals, rankings, and lag/lead calculations. Higher-order functions (transform, filter, aggregate) handle array and map columns. When you genuinely need custom logic that has no Spark equivalent, use Pandas UDFs (also called vectorized UDFs) — they batch rows into pandas DataFrames using Apache Arrow, which eliminates the per-row serialization cost and typically runs within 2x of native Spark performance.
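A side-by-side sketch of the three options, reusing the transaction schema from earlier; the masking rule itself is purely illustrative:

import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.functions import udf, pandas_udf
from pyspark.sql.types import StringType

# Slow: plain Python UDF — every value is serialized across the JVM/Python boundary.
mask_udf = udf(lambda s: s[:4] + "****" if s else None, StringType())
slow = transaction_df.withColumn("masked_account", mask_udf("account_id"))

# Fast: native functions — stays inside the JVM and remains visible to Catalyst.
fast = transaction_df.withColumn(
    "masked_account", F.concat(F.substring("account_id", 1, 4), F.lit("****"))
)

# Middle ground: Pandas UDF — rows are batched through Apache Arrow (requires pyarrow);
# reserve it for logic with no native equivalent.
@pandas_udf(StringType())
def mask_series(s: pd.Series) -> pd.Series:
    return s.str.slice(0, 4) + "****"

vectorized = transaction_df.withColumn("masked_account", mask_series("account_id"))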
Senior Shortcut: Use explain() to Catch Window Function Spills Before They Hit Prod
Window functions without a partitionBy clause — like Window.orderBy('timestamp') with no partitionBy — force ALL data into a single partition for sorting. On 2 billion rows this causes a guaranteed OOM or a massive disk spill. Call .explain('extended') on any DataFrame using window functions and scan the physical plan for 'Sort' nodes that aren't preceded by an 'Exchange hashpartitioning'. If you see a sort without a preceding partition exchange on a large dataset, you're about to have a very bad time. Always include partitionBy in production window specs.
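A minimal sketch of a properly partitioned window spec, using the transaction columns from earlier (the running-total logic is illustrative):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Partitioned window: each account is sorted independently, in parallel across executors.
running_window = (
    Window.partitionBy("account_id")
          .orderBy("transaction_ts")
          .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

with_running_total = transaction_df.withColumn(
    "running_amount", F.sum("amount_usd").over(running_window)
)

# The Sort node in the plan should be preceded by 'Exchange hashpartitioning(account_id, ...)'.
with_running_total.explain("extended")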
Production Insight
Python UDFs cross the JVM boundary per row — that's 5x–20x slower than native functions.
Pandas UDFs batch rows via Apache Arrow, reducing overhead to ~2x of native.
Always check pyspark.sql.functions before reaching for a UDF — 95% of cases have a built-in equivalent.
Key Takeaway
Native Spark functions run inside the JVM; Catalyst can optimize them.
UDFs are opaque to Catalyst and incur serialization cost.
Use native F.* functions first; only fall back to Pandas UDFs when genuinely needed.
Writing to Production Storage: Partition Strategy, Output Modes, and Avoiding the Small Files Problem
Writing Spark output correctly is just as important as reading and processing correctly, and it's where I see the most rookie mistakes land in production. The most insidious one: after all your careful processing, you write out with the default partition count — or worse, you repartition(1) because you want a single output file — and you've just created either thousands of tiny 1KB files or one massive unparallelizable blob.
The small files problem is real and painful. HDFS and object stores like S3 weren't designed for millions of tiny files. Each file carries metadata overhead. AWS S3 LIST operations are expensive and slow. Downstream Spark jobs reading your output have to open one file handle per partition file — if you wrote 10,000 partitions with 3 tasks each, your downstream reader opens 30,000 files just to start processing. I've seen a single poorly-partitioned write turn a downstream job's startup time from 8 seconds to 12 minutes.
The right approach is intentional: partition your output by the dimensions your downstream queries actually filter on, target 100-500MB per output file, and use coalesce (not repartition) when you need to reduce partition count without a shuffle. For streaming or incremental pipelines, understand the difference between overwrite, append, and the Delta Lake / Iceberg merge patterns — because overwrite on partitioned data can silently delete partitions you didn't intend to touch.
Never Do This: mode('overwrite') Without Dynamic Partition Overwrite
If you write partitioned parquet with .write.mode('overwrite').partitionBy('report_date') and partitionOverwriteMode is set to 'static' (the default), Spark deletes the ENTIRE output directory tree before writing — not just the partitions present in your current DataFrame. Processing January 15th data will silently delete January 1st through 14th. This has caused real data loss in pipelines I've had to recover. Always set spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic') when doing incremental writes to partitioned tables.
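Putting the section together, a hedged sketch of the safe incremental-write pattern — the output path and the report_date derivation are illustrative:

from pyspark.sql import functions as F

# Dynamic mode: only the partitions present in this DataFrame are replaced.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

daily_risk = transaction_df.withColumn("report_date", F.to_date("transaction_ts"))

(
    daily_risk
    .coalesce(200)                        # fewer, larger files without a shuffle; aim for 100-500MB each
    .write
    .mode("overwrite")                    # with dynamic mode, only the dates being written are replaced
    .partitionBy("report_date")
    .parquet("/data/reports/daily_risk/")  # illustrative path
)

# If you need a bounded number of files per report_date directory, use
# repartition("report_date") (a shuffle) before the write instead of coalesce().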
Production Insight
Small output files (under 64MB) increase downstream read times by up to 60x.
Always partition by filterable columns and target 100-500MB per file.
Use coalesce() to reduce partition count without a shuffle; repartition() only when redistribution is needed.
Key Takeaway
Partition output by query dimensions, not arbitrary columns.
Target 100-500MB per file for optimal storage and read performance.
Set partitionOverwriteMode to dynamic before writing — static mode silently deletes partitions.
Debugging Production PySpark Jobs Using the Spark UI and Metrics
When your job fails — not if, when — you need to know exactly where to look. The Spark UI is your primary diagnostic tool. It exposes everything: DAG visualization, stage details, task metrics, executor memory, shuffle read/write, and GC time. Most engineers never open it until something breaks. You should be reading it during development.
The key tabs
Jobs: Shows each action (count, write) and the DAG of stages.
Stages: For each stage, you see the number of tasks, duration distribution, and input/output sizes. The task duration histogram is your fastest indicator of data skew — if one bar is 100x longer than the rest, you have a problem.
Storage: Shows cached DataFrames. If cache memory usage is unexpectedly high or low, you may have wasted memory or evictions.
Executors: Memory per executor, GC time, and tasks completed. High GC time (>20% of task time) indicates memory pressure.
SQL: Physical plan of each query. Look for 'Sort' without preceding 'Exchange' — that's a window function without partitionBy. Look for 'CartesianProduct' — that's a missing join condition.
A critical metric often missed: Shuffle Read Size / Records per task. If one task reads 10GB while others read <100MB, you have data skew. AQE can help split skewed partitions at runtime, but you must enable spark.sql.adaptive.skewJoin.enabled=true and set a reasonable threshold (e.g., 256mb).
Another essential setting for debugging: spark.sql.adaptive.logLevel=TRACE — this logs every AQE decision like partition coalescing or skew splitting. In a recent incident, that log revealed a partition was not being split because the skew threshold was set to 256MB but the actual skewed partition was 249MB. Lesson: round down your thresholds.
Finally, shuffle spill metrics tell you when data exceeds executor memory. If you see 'Shuffle Spill (Memory)' and 'Shuffle Spill (Disk)' in task metrics, your partitions are too large. Increase partition count or executor memory. Spilling to disk is a last resort — it slows jobs by orders of magnitude.
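The AQE settings referenced above, as they would look in session config — a sketch; the values are starting points, not universal defaults:

# A partition is split as skewed when it exceeds both this threshold and
# skewedPartitionFactor times the median partition size.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "200m")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")

# Target size AQE aims for when coalescing small post-shuffle partitions.
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "256m")

# Log every AQE decision (plan changes, coalescing, skew splits) for retrospective debugging.
spark.conf.set("spark.sql.adaptive.logLevel", "TRACE")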
SparkUI: The 'Stages' Tab is Your Single Pane of Debugging Glass
The Stages tab tells you everything: task duration histogram, input size, shuffle read/write, GC time, and spill metrics. Spend 10 minutes here during development and you'll save hours of debugging later. The DAG visualization also shows you which stages can be pipelined and where shuffles occur — invaluable for understanding the cost of each transformation.
Production Insight
Check GC Time in task metrics: >20% means memory pressure — increase executor memory or partition count.
Check Shuffle Spill: any spill to disk indicates partitions too large.
AQE logs (level TRACE) show exactly why a partition was or was not split.
Key Takeaway
The Spark UI Stages tab is the first place to look when a job fails or stalls.
Task duration histogram reveals data skew instantly.
Enable spark.eventLog and spark.sql.adaptive.logLevel=TRACE in production for retrospective debugging.
Production incident post-mortem — severity: high
Unpartitioned Join Busts Cluster Budget
Symptom
Job runs fine on 10M rows in staging. At 2B rows in production, it hangs at 99% completion for hours, then dies with java.lang.OutOfMemoryError: GC overhead limit exceeded. Spark UI shows one task reading 400GB of shuffle data while 199 tasks process <1GB each.
Assumption
The team assumed Spark's Catalyst optimizer would automatically choose a broadcast join because the merchant lookup table was only 50MB. They didn't realize join key data types mismatched (String vs Integer), which forced a SortMergeJoin.
Root cause
The merchant_id column was a String in the transactions table but an Integer in the lookup. Spark cannot broadcast when data types differ, so it fell back to SortMergeJoin. The skewed distribution of merchant IDs (top merchant = 40% of rows) concentrated all data on one partition.
Fix
1) Cast the join key to the same type: lookup.withColumn("merchant_id", col("merchant_id").cast("string"))
2) Explicitly F.broadcast(lookup) after type alignment
3) Enable AQE: spark.sql.adaptive.skewJoin.enabled=true as a safety net
4) Add a healthcheck that monitors partition size distribution via Spark UI.
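In code, the fix from steps 1-3 looks roughly like this (DataFrame names are illustrative):

from pyspark.sql import functions as F

# 1) Align the join key type with the transactions table.
merchant_lookup_cast = merchant_lookup.withColumn(
    "merchant_id", F.col("merchant_id").cast("string")
)

# 2) Broadcast explicitly now that the types match and the lookup is only ~50MB.
risk_joined = transactions_df.join(
    F.broadcast(merchant_lookup_cast), on="merchant_id", how="left"
)

# 3) Safety net for future skew — not a substitute for the type fix.
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

risk_joined.explain(mode="formatted")  # verify BroadcastHashJoin in the physical plan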
Key lesson
Always cast join keys to the same data type before the join — type mismatches silently disable broadcast optimizations.
Never rely on autoBroadcastJoinThreshold alone; always call explain() and verify the join strategy in the physical plan.
Monitor per-task input size in the Spark UI Stages tab — a 100x variance between tasks is a guaranteed skew problem.
Enable AQE with skew join handling as a safety net, but don't treat it as a substitute for correct join design.
Production debug guide: diagnose and fix the four most common PySpark production failures.

Symptom 01 — Job stuck at 99% completion for >30 minutes after all other tasks finished
Fix: Check Spark UI → Stages tab → task duration histogram. If one task runs >100x longer than the median, you have data skew. Click the slow task and note 'Shuffle Read Size / Records' — if one partition has 10x more data, apply salting or a broadcast join.

Symptom 02 — java.lang.OutOfMemoryError: Java heap space or GC overhead limit exceeded
Fix: Check Spark UI → Executors tab → 'Max Memory' usage. If >80%, increase memory per executor or add executors so each one holds less data. Also check spark.sql.shuffle.partitions — the default 200 may create partitions >500MB. Increase partitions to ~data_size_in_MB/200.

Symptom 03 — Job runs fine in staging but fails with stage retries or shuffle errors
Fix: Look for 'FetchFailedException' in logs — this indicates network issues or an executor container kill. Check spark.shuffle.service.enabled (should be true for dynamic allocation). Increase spark.network.timeout to 600s. Verify executor-to-executor connectivity is open (no firewall blocking).

Symptom 04 — PySpark job runs 5x slower than expected; CPU utilization <50%
Fix: Open Spark UI → SQL tab → physical plan. Look for 'BroadcastHashJoin' vs 'SortMergeJoin' — if you expected a broadcast but see sort merge, check data type compatibility. Also check for 'PythonUDF' nodes in the plan — each UDF adds serialization overhead. Replace them with native F.* functions.
Quick Debug Cheat Sheet: PySpark Production Issues — immediate commands and fixes for the most common PySpark job failures
Job stuck at 99% (data skew suspected)
Immediate action
Open Spark UI → Stages tab → click the stuck stage → view task list. Sort by Duration descending.
Commands
In Spark UI on the driver node: navigate to port 4040 or set spark.ui.port in config.
Check task duration distribution: click 'Show metrics' → 'Shuffle Read Size / Records'. If one task has >10x data than others, note partition ID.
Fix now
If lookup table is small (<200MB), add explicit F.broadcast(lookup) to the join. If both sides large, apply salting (random salt on join key). Set spark.sql.adaptive.skewJoin.enabled=true for future runs.
OOM / GC overhead limit exceeded
Immediate action
Check Executors tab in Spark UI for memory usage per executor.
Commands
`spark.executor.memory=8g` — increase from default 1g. Also increase `spark.executor.memoryOverhead=2g`.
`spark.sql.shuffle.partitions=4000` — increase if data volume >200GB. Use formula: total_input_size_MB / 150.
Fix now
Enable AQE: spark.sql.adaptive.coalescePartitions.enabled=true and spark.sql.adaptive.advisoryPartitionSizeInBytes=256mb. This allows Spark to merge small partitions automatically.
Shuffle fetch failure / stage retries
Immediate action
Check executor logs for 'FetchFailedException' and 'Connection refused'.
Commands
`spark.shuffle.service.enabled=true` — enables external shuffle service for dynamic allocation.
`spark.network.timeout=600s` — increases default timeout from 120s to avoid false failures during shuffle.
Job slower than expected (Python UDF overhead)
Immediate action
Check the SQL tab in the Spark UI → physical plan. Look for 'PythonUDF' nodes.
Commands
`df.explain('formatted')` — prints the physical plan. If you see 'PythonUDF', identify the column.
Replace with native function: use `F.col`, `F.when`, `F.regexp_extract` instead of `udf(lambda ...)`.
Fix now
If a UDF is unavoidable, convert it to a Pandas UDF: decorate the function with @pandas_udf(return_type) (or the legacy functionType=PandasUDFType.SCALAR) and ensure Apache Arrow is installed (pip install pyarrow).
UDF Performance Comparison

| Aspect | Python UDF (udf decorator) | Pandas UDF (pandas_udf) | Native Spark Function (F.*) |
|---|---|---|---|
| Serialization overhead | Per-row Python ↔ JVM round-trip | Batch Arrow serialization | None — runs inside JVM |
| Typical throughput vs native | 5x–20x slower | 1.5x–3x slower | Baseline (fastest) |
| Use case fit | Legacy code only — avoid | Custom ML scoring, complex regex | 95% of production transformations |
| Null handling | Must handle None explicitly in Python | Must handle NaN/None in pandas | Built-in null propagation |
| Pushdown / optimization | No — opaque to Catalyst optimizer | No — opaque to Catalyst optimizer | Yes — Catalyst can optimize |
| Debugging experience | Stack traces cross JVM/Python boundary | Pandas exceptions surface clearly | Clear Spark plan errors |
| When AQE helps | No — bottleneck is serialization | Partially — batching helps | Yes — full AQE benefits apply |
| Type safety | Runtime type errors only | Runtime type errors only | Schema checked when Catalyst analyzes the plan |
Key takeaways
1. Spark transformations are lazy; action calls trigger execution.
2. Data skew on join keys is the #1 cause of production stalls.
3. Broadcast joins eliminate shuffle when one side fits in memory; salting fixes skew when both sides are large.
4. Native Spark functions are 5x-20x faster than Python UDFs; use Pandas UDFs only when necessary.
5. Partition output by filterable dimensions; target 100-500MB per file.
6. Use the Spark UI Stages tab to diagnose skew and memory issues.
7. Always call explain() to verify the physical plan before running in production.
8. Enable AQE with skew join and coalescing as a safety net, but don't rely on it to fix design flaws.
Common mistakes to avoid (8 patterns)
Calling .collect() on a large DataFrame to 'check the results'
Symptom
AnalysisException or silent OOM as all distributed data is pulled into the driver's heap. On 10GB+ DataFrames, this kills the driver process.
Fix
Use .show(20), .limit(100).toPandas(), or .describe() instead. They leave data on the executors and only transfer a sample.
Using spark.sql.shuffle.partitions=200 (default) for datasets over 100GB
Symptom
Each post-shuffle partition becomes 500MB+, causing executor OOM, task retries, and excessive GC.
Fix
Set partitions to ceil(dataset_size_in_MB / 200). Enable AQE with adaptive coalescing so Spark merges small partitions automatically.
Writing a Python UDF for string transformations like uppercasing, trimming, or regex matching
Symptom
Throughput drops 8x–15x with no benefit. The same transformation using native functions is nearly instant.
Fix
Use pyspark.sql.functions equivalents: F.upper(), F.trim(), F.regexp_extract(), F.regexp_replace(). Check docs before reaching for udf().
Caching a DataFrame that's only used once
Symptom
Wastes executor memory, causes cache evictions for DataFrames you actually need cached, adds serialization overhead.
Fix
Cache only when a DataFrame is referenced by two or more downstream actions in the same job. Call .unpersist() explicitly after done.
Using repartition() when you only need coalesce()
Symptom
repartition() always causes a full shuffle across the network. If you're just reducing partition count, this is wasted I/O.
Fix
Use coalesce() to merge existing partitions without network I/O. repartition() is only needed when you also need to redistribute by a key or increase partition count.
Forgetting to cast join keys to the same data type
Symptom
Spark cannot broadcast or use optimized join strategies when key types mismatch. Falls back to SortMergeJoin, causing unnecessary shuffle and possible skew.
Fix
Always cast both sides of the join key to the same string type: df1.withColumn("key", col("key").cast("string")). Verify with explain().
Using .option('inferSchema', 'true') on CSV/JSON in production
Symptom
Double scan of entire dataset before the actual job starts; silent type coercion bugs when mixed data types are present.
Fix
Define schema explicitly using StructType. It's zero-cost to write and prevents production data corruption.
Setting spark.sql.shuffle.partitions to a value without considering AQE coalescing
Symptom
With AQE coalescing disabled, a hand-tuned partition count goes stale as data grows: too many partitions add scheduling overhead and tiny output files, too few cause per-task OOM.
Fix
Enable AQE and set advisoryPartitionSizeInBytes to your target file size. Let Spark decide the final partition count.
Interview Questions on This Topic
Q01 of 05 · Senior
You have a PySpark job where one groupBy stage consistently runs at 99% completion for two hours while all other tasks finish in five minutes. What's happening, how do you confirm it, and what are your mitigation options given both sides of the join are too large to broadcast?
ANSWER
This is classic data skew on the groupBy key. Confirm by opening the Spark UI → Stages tab → click the stuck stage → view task duration histogram. One task will have much higher 'Shuffle Read Size / Records'. Mitigation: (1) Enable AQE with spark.sql.adaptive.skewJoin.enabled=true — but that only helps post-shuffle; it doesn't fix the root cause. (2) Apply salting: add a random salt to the skewed key (e.g., concat(key, '_', floor(rand() * 20))) and explode the smaller side correspondingly. (3) If possible, break the operation into multiple steps — first aggregate on the skewed key's non-skewed portion separately, then union. (4) Check if the key distribution is truly skewed or if you have a data quality issue (e.g., a null key causing all nulls to land in one partition).
Q02 of 05 · Senior
When would you choose a SortMergeJoin over a BroadcastHashJoin in a production Spark pipeline, and what signals in the Spark UI would tell you that your current join strategy is wrong?
ANSWER
SortMergeJoin is chosen when both sides of the join are too large to broadcast (exceeding spark.sql.autoBroadcastJoinThreshold, default 10MB). It sorts both sides by the join key and then merges — this requires a full shuffle on both sides. BroadcastHashJoin is faster but limited by memory. In the Spark UI SQL tab, if you see 'SortMergeJoin' but expected a broadcast, check: (1) Are both sides large? (2) Are key types matching? A type mismatch forces SortMergeJoin. (3) Is spark.sql.autoBroadcastJoinThreshold high enough? If the plan shows 'Exchange' nodes before the join, that's the shuffle — that's expected for SortMergeJoin. However, if you see a single 'Exchange' node disproportionately large for one side, you have a problem. Also check 'Shuffle Read Size' in the Stages tab — if one side's shuffle reads are uneven, you have skew. Wrong strategy signals: high shuffle read/write times, high GC time on executors, and the join stage being the longest stage by far.
Q03 of 05 · Senior
Explain lazy evaluation in Spark. Can you give an example where lazy evaluation causes a bug that only manifests in production?
ANSWER
Lazy evaluation means transformations are recorded in a DAG but not executed until an action is called. The production bug this hides: every action re-executes the full lineage from the source unless the intermediate result is cached. Example: df = spark.read.parquet(...); df_filtered = df.filter(...); df_filtered.show(); df_filtered.count() — both actions re-read the entire parquet dataset and re-apply the filter, because nothing was cached. On small staging data the double read is invisible; at production scale it doubles processing time and cost, and if the source data changed between the two actions the results can even disagree. The fix is to use cache() or checkpoint() whenever a DataFrame is reused across multiple actions.
Q04 of 05 · Senior
What is the impact of data skew on a Spark SQL join, and how does AQE help?
ANSWER
Data skew occurs when key values are unevenly distributed, causing one or few partitions to hold a disproportionate amount of data. During a join, the skewed partition forces its executor to process far more data than others, leading to long tail tasks and potentially OOM. AQE (Adaptive Query Execution) in Spark 3.x can detect skewed partitions based on the shuffle statistics. When spark.sql.adaptive.skewJoin.enabled=true, AQE can split a skewed partition into smaller sub-partitions and join them separately, balancing the load. However, AQE can only split partitions that exceed the threshold after the shuffle write; it cannot fix skew in the original data distribution before the join. You still need to handle initial data skew via salting or broader loading strategies.
Q05 of 05 · Senior
You need to run a weekly aggregation that sums transactions per merchant. The merchants table has 50K rows for 2M merchants — but 10% of the transactions have a null merchant_id. How would you handle this?
ANSWER
First, investigate why merchant_id is null — data quality issue? If it's expected (e.g., anonymous purchases), decide whether to exclude or group as 'unknown'. For the aggregation, handle nulls explicitly: df.groupBy(F.coalesce(F.col("merchant_id"), F.lit("unknown")).alias("merchant_id")).agg(F.sum("amount").alias("total")). If the nulls are distributed evenly across partitions, no skew issue. But if many nulls exist, they all hash to the same default partition, causing skew on that key. Solution: use a random salt for the null key: withColumn("merchant_key", F.when(F.col("merchant_id").isNull(), F.concat(F.lit("unknown_"), (F.rand() * 10).cast("int"))).otherwise(F.col("merchant_id"))). Then aggregate on that grouped key and later sum the unknowns together. Also check the Spark UI for skew on the null partition.
Frequently Asked Questions (5)
01 · What is the difference between repartition() and coalesce() in PySpark?
repartition() causes a full shuffle to redistribute data evenly across the specified number of partitions. It can increase or decrease partition count. coalesce() only reduces the number of partitions by merging existing partitions without a full shuffle — it's an optimization (narrow transformation). Use coalesce() when you need fewer partitions and are okay with some imbalance; use repartition() when you need even distribution or want to increase partitions.
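For illustration, using the DataFrame built earlier in the article:

fewer_files  = high_risk_transactions.coalesce(50)              # narrow: merges partitions, no shuffle
even_spread  = high_risk_transactions.repartition(400)          # wide: full shuffle, even distribution
keyed_spread = high_risk_transactions.repartition("account_id")  # shuffle that co-locates rows by key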
02 · How do I handle null keys in a join that cause skew?
Null keys all hash to the same partition, causing severe skew. Options: (1) Filter out nulls before the join if they are invalid. (2) Replace null with a salted default key: concat("unknown_", floor(rand()*10)) to spread nulls across multiple partitions. (3) Use a separate join for null keys if they need special handling. Enable AQE skew join as a safety net.
03 · When should I use a window function vs a groupBy aggregation?
Use window functions when you need to compute values per group without collapsing rows — e.g., running totals, ranks, or comparing current row to previous within the same group. Use groupBy when you want to aggregate rows into a single row per group (sum, count, avg). Window functions preserve row count; groupBy reduces it.
04 · How can I debug a PySpark job that runs fine on small data but fails on large data?
Common causes: (1) Data skew that only appears at scale — check Spark UI task duration histogram. (2) Memory issues from larger data — increase executor memory or partitions. (3) Serialization/deserialization overhead from UDFs — check physical plan for PythonUDF. (4) Join key type mismatches that cause full shuffle — verify join plan with explain(). (5) Small files problem from default partition settings — target larger file sizes.
05 · What is the optimal file size for parquet in Spark?
100-500MB per file. This balances parallelism (enough files for efficient reading) and metadata overhead (not too many small files). For S3, larger files reduce LIST request costs. Use AQE to control partition size via spark.sql.adaptive.advisoryPartitionSizeInBytes (e.g., 256mb).