Broadcast Nested Loop Join works by broadcasting one of the two datasets in its entirety and then running a nested loop to join the data. Essentially, every record from dataset 1 is compared against every record from dataset 2.
As you might guess, Broadcast Nested Loop Join is not a preferred strategy and can be quite slow. It works for both equi and non-equi joins, and it is the strategy Spark typically picks by default when you have a non-equi join.
Example
We leave both spark.sql.join.preferSortMergeJoin and spark.sql.autoBroadcastJoinThreshold at their default values.
scala> spark.conf.get("spark.sql.join.preferSortMergeJoin")
res0: String = true

scala> spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
res1: String = 10485760
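The threshold above is in bytes: 10485760 is 10 × 1024 × 1024, i.e. 10 MB. As a rough mental model, a side qualifies for automatic broadcasting when its estimated size is known (non-negative) and does not exceed this threshold, and setting the threshold to -1 disables automatic broadcasting. The sketch below is a simplified model in plain Scala, so it can be pasted into the REPL without a Spark session; `BroadcastThreshold` and `fitsBroadcastThreshold` are hypothetical names of our own, not Spark APIs.

```scala
// Hypothetical helper: a simplified model of the size check applied before
// automatically broadcasting one side of a join. Not a Spark API.
object BroadcastThreshold {
  // Default value of spark.sql.autoBroadcastJoinThreshold, in bytes (10 MB).
  val DefaultThreshold: Long = 10L * 1024 * 1024

  // A side qualifies when its estimated size is non-negative and does not
  // exceed the threshold. A threshold of -1 therefore disables automatic
  // broadcasting, because no non-negative size is <= -1.
  def fitsBroadcastThreshold(estimatedSizeInBytes: Long,
                             thresholdInBytes: Long = DefaultThreshold): Boolean =
    estimatedSizeInBytes >= 0 && estimatedSizeInBytes <= thresholdInBytes
}

println(BroadcastThreshold.DefaultThreshold)                          // 10485760
println(BroadcastThreshold.fitsBroadcastThreshold(4L * 1024 * 1024))  // true
println(BroadcastThreshold.fitsBroadcastThreshold(64L * 1024 * 1024)) // false
println(BroadcastThreshold.fitsBroadcastThreshold(1024L, -1L))        // false
```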
scala> val data1 = Seq(10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50)
data1: Seq[Int] = List(10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50)

scala> val df1 = data1.toDF("id1")
df1: org.apache.spark.sql.DataFrame = [id1: int]

scala> val data2 = Seq(30, 20, 40, 50)
data2: Seq[Int] = List(30, 20, 40, 50)

scala> val df2 = data2.toDF("id2")
df2: org.apache.spark.sql.DataFrame = [id2: int]
Note that here we are performing a non-equi join: the join condition uses >= rather than equality.
scala> val dfJoined = df1.join(df2, $"id1" >= $"id2")
dfJoined: org.apache.spark.sql.DataFrame = [id1: int, id2: int]
Looking at the physical plan that will be executed, we can see that BroadcastNestedLoopJoin is used.
scala> dfJoined.queryExecution.executedPlan
res2: org.apache.spark.sql.execution.SparkPlan =
BroadcastNestedLoopJoin BuildRight, Inner, (id1#3 >= id2#8)
:- LocalTableScan [id1#3]
+- BroadcastExchange IdentityBroadcastMode
   +- LocalTableScan [id2#8]

scala> dfJoined.show
+---+---+
|id1|id2|
+---+---+
| 20| 20|
| 20| 20|
| 30| 30|
| 30| 20|
| 40| 30|
| 40| 20|
| 40| 40|
| 40| 30|
| 40| 20|
| 40| 40|
| 20| 20|
| 20| 20|
| 20| 20|
| 20| 20|
| 50| 30|
| 50| 20|
| 50| 40|
| 50| 50|
+---+---+
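To see what the plan above computes, here is a plain-Scala sketch (no Spark involved) of the same inner non-equi join over `data1` and `data2`. The outer loop streams `data1` and the inner loop walks the whole "broadcast" copy of `data2`, matching the `BuildRight` side in the plan. It illustrates the technique only and is not Spark's own implementation; `NestedLoopExample` and `innerJoin` are hypothetical names. For this small example the rows come out in the same order as the `show` output above, but Spark does not guarantee row order in general.

```scala
// Plain-Scala illustration of a nested loop join; no Spark required.
object NestedLoopExample {
  val data1: Seq[Int] = Seq(10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50) // streamed side (id1)
  val data2: Seq[Int] = Seq(30, 20, 40, 50)                                 // broadcast side (id2)

  // Inner nested loop join: every streamed value is compared with every
  // broadcast value, and the pair is kept when the condition holds.
  def innerJoin(streamed: Seq[Int], broadcast: Seq[Int])
               (condition: (Int, Int) => Boolean): Seq[(Int, Int)] =
    for {
      id1 <- streamed
      id2 <- broadcast
      if condition(id1, id2)
    } yield (id1, id2)

  // Same non-equi condition as the DataFrame example: id1 >= id2.
  val joined: Seq[(Int, Int)] = innerJoin(data1, data2)(_ >= _)
}

NestedLoopExample.joined.foreach { case (id1, id2) => println(s"$id1 | $id2") }
println(NestedLoopExample.joined.size) // 18
```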
Stages involved in a Broadcast Nested Loop Join
Broadcast Nested Loop Join involves neither a shuffle nor a sort. The smaller of the two datasets is broadcast to all executors, and each partition of the larger dataset is then joined against it with a nested loop, so every record from dataset 1 is compared against every record from dataset 2.
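The absence of a shuffle can be sketched as follows: the streamed side stays split into its partitions, each partition sees the same read-only copy of the broadcast side, and each partition is joined independently, with no rows moving between partitions. The partition count of 3 and the `joinPartition` helper are illustrative assumptions; in Spark the copy is shipped to executors as a broadcast rather than held as a local value.

```scala
// Illustrative model of the "broadcast + nested loop, no shuffle" layout.
// Partitions are simulated with plain collections; this is not Spark code.
object NoShuffleSketch {
  val streamed: Seq[Int] = Seq(10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50)
  val broadcastSide: Seq[Int] = Seq(30, 20, 40, 50)

  // Hypothetical split of the streamed side into 3 partitions of 4 rows.
  val partitions: List[Seq[Int]] = streamed.grouped(4).toList

  // Each "task" joins only its own partition against the full broadcast copy.
  def joinPartition(partition: Seq[Int], small: Seq[Int]): Seq[(Int, Int)] =
    for (l <- partition; r <- small if l >= r) yield (l, r)

  // Nothing is exchanged between partitions, so no shuffle is needed.
  val perPartition: List[Seq[(Int, Int)]] = partitions.map(p => joinPartition(p, broadcastSide))

  // Concatenating the per-partition outputs gives the complete join result.
  val result: Seq[(Int, Int)] = perPartition.flatten
}

println(NoShuffleSketch.partitions.size)          // 3
println(NoShuffleSketch.perPartition.map(_.size)) // List(4, 7, 7)
println(NoShuffleSketch.result.size)              // 18
```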
Internal workings of Broadcast Nested Loop Join
There are 2 phases in a Broadcast Nested Loop Join.
Broadcast phase
- The smaller dataset is broadcast to all executors processing the bigger dataset.
- In a right outer join, the left side is broadcast.
- In a left outer, left semi, left anti or existence join, the right side is broadcast.
- In an inner-like join, either side can be broadcast.
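The rules above can be summarised as a small lookup. One way to read them: the side whose rows must all be preserved (the right side in a right outer join, the left side in the left-preserving types) is streamed, and the other side is broadcast. `JoinKind`, `BroadcastLeft` and `BroadcastRight` below are our own simplified stand-ins, not Spark classes, and they cover only the join types listed above.

```scala
// Simplified, hypothetical model of the build-side rules listed above.
// These types are illustrative stand-ins, not Spark classes.
object BuildSideRules {
  sealed trait JoinKind
  case object Inner extends JoinKind // inner-like
  case object Cross extends JoinKind // inner-like
  case object LeftOuter extends JoinKind
  case object RightOuter extends JoinKind
  case object LeftSemi extends JoinKind
  case object LeftAnti extends JoinKind
  case object Existence extends JoinKind

  sealed trait BroadcastSide
  case object BroadcastLeft extends BroadcastSide
  case object BroadcastRight extends BroadcastSide

  // Which side(s) may be broadcast for a given join type.
  def broadcastableSides(kind: JoinKind): Set[BroadcastSide] = kind match {
    case Inner | Cross => Set(BroadcastLeft, BroadcastRight)
    case RightOuter    => Set(BroadcastLeft)
    case LeftOuter | LeftSemi | LeftAnti | Existence => Set(BroadcastRight)
  }
}

import BuildSideRules._

println(broadcastableSides(RightOuter)) // Set(BroadcastLeft)
println(broadcastableSides(LeftAnti))   // Set(BroadcastRight)
```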
Nested Loop Join phase
- Once the dataset has been broadcast, every record from one dataset is checked against every record from the other dataset in a nested loop.
- Since this join handles non-equi conditions, it cannot rely on sorted keys to stop scanning early the way Sort Merge Join does; for each record, the loop generally goes through the entire broadcast dataset.
- Note that no sort is involved in this join.
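The nested loop phase can be sketched for a left outer join, one of the types where the right side is broadcast. For each streamed row the loop visits every broadcast row, because with a non-equi condition such as `>=` there is no sorted order telling it when to stop; a streamed row with no match is emitted once with a missing (`None`) right value. The comparison counter is only there to show that the work is always |left| × |right|. This is a hypothetical illustration (`LeftOuterNestedLoop` is our own name), not Spark's implementation.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical left outer nested loop join over plain collections.
object LeftOuterNestedLoop {
  // Returns the joined rows and the number of condition evaluations performed.
  def leftOuterJoin(streamed: Seq[Int], broadcast: IndexedSeq[Int])
                   (condition: (Int, Int) => Boolean): (List[(Int, Option[Int])], Int) = {
    val out = ArrayBuffer.empty[(Int, Option[Int])]
    var comparisons = 0
    for (l <- streamed) {
      var matched = false
      // No early exit: every broadcast row is checked for every streamed row.
      for (r <- broadcast) {
        comparisons += 1
        if (condition(l, r)) {
          out += ((l, Some(r)))
          matched = true
        }
      }
      // Left outer semantics: keep an unmatched streamed row with no right value.
      if (!matched) out += ((l, None))
    }
    (out.toList, comparisons)
  }
}

val (rows, comparisons) =
  LeftOuterNestedLoop.leftOuterJoin(Seq(10, 20, 30), Vector(30, 20, 40, 50))(_ >= _)
rows.foreach(println) // prints (10,None), (20,Some(20)), (30,Some(30)), (30,Some(20)), one per line
println(comparisons)  // 12
```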
When does Broadcast Nested Loop Join work?
- Works for both equi and non-equi joins
- Works for all join types
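When does Broadcast Nested Loop Join not work?
- This join is slow, because every record on one side is compared with every record on the other.
- It does not work when the dataset being broadcast is too large: the broadcast side is collected on the driver and a full copy is kept in memory on every executor, so an oversized side can lead to Out Of Memory exceptions.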
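The slowness can be made concrete: a nested loop evaluates the join condition once per pair of records, so the number of evaluations grows with the product of the two dataset sizes. For the example above, with 12 rows in df1 and 4 rows in df2:

```latex
\text{comparisons} = |D_1| \times |D_2|, \qquad 12 \times 4 = 48
```

By the same formula, a hypothetical join of $10^7$ rows against $10^5$ broadcast rows would already require $10^{12}$ condition evaluations.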