Broadcast Joins — Eliminating Shuffles for Small Tables
Learn how PySpark broadcast joins work, when to use them, how to control the broadcast threshold, and the performance gains they provide.
What You'll Learn
- What a broadcast join is and how it works
- When to use broadcast joins vs regular joins
- How to force a broadcast with broadcast()
- How to check if Spark is auto-broadcasting
- The broadcast size threshold and how to tune it
The Problem: Shuffle Joins Are Expensive
In Lesson 15, we joined orders (large) with customers (small). In Lesson 20, we learned that joins trigger shuffles — both tables get redistributed across the cluster by the join key.
But think about it: if the customers table has only 5,000 rows (a few MB), why shuffle 100 million order rows? It's like moving an entire library to find one book.
The Solution: Send the Small Table Everywhere
A broadcast join copies the small table to every executor. Then each executor can join its local partition of the large table with the complete small table — no shuffle needed.
Regular join:
Orders (100M rows) ──shuffle──┐
├── Join
Customers (5K rows) ─shuffle──┘
Broadcast join:
Orders (100M rows) ─── no shuffle ─── Join
Customers (5K rows) ── copied to every executor
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast, col
spark = SparkSession.builder.appName("BroadcastJoin").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
# Large table: 100,000 orders
orders_data = [(i, i % 100, f"product_{i % 50}", i * 10.0) for i in range(100000)]
orders = spark.createDataFrame(orders_data, ["order_id", "customer_id", "product", "amount"])
# Small table: 100 customers
customers_data = [(i, f"Customer_{i}", f"City_{i % 10}") for i in range(100)]
customers = spark.createDataFrame(customers_data, ["customer_id", "name", "city"])
print(f"Orders: {orders.count()} rows")
print(f"Customers: {customers.count()} rows")
Using broadcast()
# Force broadcast of the small table
result = orders.join(broadcast(customers), on="customer_id", how="inner")
result.explain()
Expected Output (Physical Plan)
*(2) BroadcastHashJoin [customer_id], [customer_id], Inner, BuildRight
:- *(2) Scan ExistingRDD [order_id, customer_id, product, amount]
+- BroadcastExchange HashedRelationBroadcastMode(List(customer_id))
+- *(1) Scan ExistingRDD [customer_id, name, city]
Look for BroadcastHashJoin — that confirms the broadcast is happening. There's no Exchange hashpartitioning (shuffle) on the orders side. Only the small customers table is broadcast.
Compare this to a regular join:
# Without broadcast — both sides shuffle
result_regular = orders.join(customers, on="customer_id", how="inner")
result_regular.explain()
You'd see SortMergeJoin with Exchange hashpartitioning on BOTH sides — two shuffles instead of zero.
Auto-Broadcasting
Spark automatically broadcasts tables smaller than spark.sql.autoBroadcastJoinThreshold (default: 10 MB):
# Check the threshold
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold")) # 10485760 (10 MB)
If your customers table is under 10 MB, Spark broadcasts it automatically — you don't need broadcast(). But there are cases where you should be explicit:
# Force broadcast even if Spark doesn't detect the table is small
# (e.g., after a complex transformation, Spark may overestimate the size)
result = orders.join(broadcast(customers), on="customer_id")
# Increase the auto-broadcast threshold for larger lookup tables
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50 * 1024 * 1024) # 50 MB
# Disable auto-broadcasting entirely
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
When to Use Broadcast Joins
Use broadcast when:
- One table is small enough to fit in executor memory (typically < 100 MB)
- The small table is a lookup/dimension table (products, countries, categories)
- You're joining a large fact table with a small dimension table
Don't use broadcast when:
- Both tables are large — broadcasting a 10 GB table to 100 executors means 1 TB of network transfer
- The small table changes frequently — it's broadcast once per query
- Memory is tight — each executor holds a full copy
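A quick back-of-envelope check helps apply these rules before broadcasting. This is a pure-Python sketch with illustrative numbers; actual in-memory size depends on Spark's encoding and should be confirmed against the real data:

```python
def estimated_broadcast_mb(num_rows: int, avg_row_bytes: int) -> float:
    """Rough in-memory size of a table, for comparison against the broadcast limit."""
    return num_rows * avg_row_bytes / (1024 * 1024)

# 5,000 customers at ~200 bytes per row: well under the 10 MB default threshold
print(round(estimated_broadcast_mb(5_000, 200), 2))  # ~0.95 MB — safe to broadcast

# 50 million rows at ~200 bytes per row: roughly 9.5 GB — do NOT broadcast
print(round(estimated_broadcast_mb(50_000_000, 200)))
```

If the estimate lands anywhere near the ~100 MB rule of thumb, measure properly (e.g., by checking the table's size on disk) before forcing a broadcast.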
Verifying Broadcast in the Spark UI
After running a broadcast join, check the Spark UI:
- Click on the job → the stage
- Look at the task metrics
- Shuffle Read should be zero (or very small) — that's the proof the broadcast worked
- You'll see a "broadcast" stage that's usually very fast (< 1 second for small tables)
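You can also verify programmatically by inspecting the plan text instead of the UI. A small helper sketch: `explain()` prints to stdout, so we capture it with `redirect_stdout` and search for the join node (the capture helper assumes a live DataFrame; the string check below runs standalone):

```python
import io
from contextlib import redirect_stdout


def captured_plan(df) -> str:
    """Capture df.explain() output (which prints to stdout) as a string."""
    buf = io.StringIO()
    with redirect_stdout(buf):
        df.explain()
    return buf.getvalue()


def plan_uses_broadcast(plan_text: str) -> bool:
    """True if the physical plan contains a broadcast hash join."""
    return "BroadcastHashJoin" in plan_text


# Plan fragment like the one shown earlier in this lesson
plan = """*(2) BroadcastHashJoin [customer_id], [customer_id], Inner, BuildRight
:- *(2) Scan ExistingRDD [order_id, customer_id, product, amount]
+- BroadcastExchange HashedRelationBroadcastMode(List(customer_id))"""
print(plan_uses_broadcast(plan))  # True
```

A plan containing SortMergeJoin instead would fail this check, signaling that the broadcast did not kick in.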
Multiple Broadcast Joins
You can broadcast multiple small tables in the same query:
# Join orders with both products AND customers — broadcast both
# (assumes a small `products` lookup table keyed by product_id exists)
result = orders \
.join(broadcast(products), on="product_id") \
.join(broadcast(customers), on="customer_id")
Each small table is broadcast independently. The orders table is never shuffled.
Common Mistakes
- Broadcasting a table that's too large. If you broadcast() a 5 GB table, Spark sends 5 GB to every executor. With 50 executors, that's 250 GB of network transfer — worse than a shuffle. You'll also get out-of-memory errors on executors.
- Not checking if auto-broadcast kicked in. Before manually adding broadcast(), check explain() — Spark might already be broadcasting. Adding broadcast() to an already-auto-broadcast join is harmless but unnecessary.
- Forgetting that broadcast happens per query. If you run 100 queries that each broadcast the same lookup table, it's broadcast 100 times. For repeated use, cache the broadcast table first.
- Using broadcast with left/full outer joins incorrectly. In a left join, only the right side can be broadcast: orders.join(broadcast(customers), how="left") works, but broadcast(orders).join(customers, how="left") won't broadcast, because the left side of a left join can't be broadcast.
Key Takeaways
- Broadcast joins copy a small table to every executor, eliminating the shuffle on the large table.
- Spark auto-broadcasts tables under 10 MB. Use broadcast() to force it for larger tables or when Spark's size estimate is wrong.
- Broadcast joins turn expensive shuffle joins into fast, local joins — the single biggest join optimization.
- Only broadcast tables that fit in executor memory (typically < 100 MB).
- Check explain() for BroadcastHashJoin to verify the broadcast is happening.
Next Lesson
Broadcast joins solve the "small table" problem. But what about joins where both tables are large and the data is unevenly distributed? In Lesson 26: Data Skew, we'll tackle the hardest performance problem in Spark.