February 1, 2021

A Cartesian Product join (a.k.a. Shuffle-and-Replication Nested Loop join) works very similarly to a Broadcast Nested Loop join, except that the dataset is not broadcasted.

Shuffle-and-Replication does not mean a "true" shuffle, as in records with the same keys being sent to the same partition. Instead, entire partitions of the dataset are sent over, or replicated, to all the partitions for a full cross or nested-loop join.
Example
We are setting spark.sql.autoBroadcastJoinThreshold to -1 to disable broadcast joins.
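If you are following along, the threshold can be turned off before running the join. A minimal sketch, assuming an active SparkSession bound to the name spark (as in spark-shell):

```scala
// Disable broadcast joins by setting the auto-broadcast threshold to -1.
// Assumes an active SparkSession named `spark`, as provided by spark-shell.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
```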
scala> spark.conf.get("spark.sql.join.preferSortMergeJoin")
res1: String = true

scala> spark.conf.get("spark.sql.autoBroadcastJoinThreshold")
res2: String = -1

scala> val data1 = Seq(10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50)
data1: Seq[Int] = List(10, 20, 20, 30, 40, 10, 40, 20, 20, 20, 20, 50)

scala> val df1 = data1.toDF("id1")
df1: org.apache.spark.sql.DataFrame = [id1: int]

scala> val data2 = Seq(30, 20, 40, 50)
data2: Seq[Int] = List(30, 20, 40, 50)

scala> val df2 = data2.toDF("id2")
df2: org.apache.spark.sql.DataFrame = [id2: int]
Note that here we are performing a non-equi join operation.
scala> val dfJoined = df1.join(df2, $"id1" >= $"id2")
dfJoined: org.apache.spark.sql.DataFrame = [id1: int, id2: int]
Looking at the plan that will be executed, we can see that CartesianProduct is used.
scala> dfJoined.queryExecution.executedPlan
res3: org.apache.spark.sql.execution.SparkPlan =
CartesianProduct (id1#3 >= id2#8)
:- LocalTableScan [id1#3]
+- LocalTableScan [id2#8]

scala> dfJoined.show
+---+---+
|id1|id2|
+---+---+
| 20| 20|
| 20| 20|
| 30| 30|
| 30| 20|
| 40| 30|
| 40| 20|
| 40| 40|
| 40| 30|
| 40| 20|
| 40| 40|
| 20| 20|
| 20| 20|
| 20| 20|
| 20| 20|
| 50| 30|
| 50| 20|
| 50| 40|
| 50| 50|
+---+---+
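For an unconditional cross join, Spark also exposes an explicit Dataset API. A brief sketch reusing df1 and df2 from the example above:

```scala
// crossJoin is the explicit Dataset API for a cartesian product:
// every row of df1 is paired with every row of df2, with no join condition.
val dfCross = df1.crossJoin(df2)
// dfCross.count() returns 48 (12 rows in df1 times 4 rows in df2)
```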
Stages involved in a Cartesian Product Join
This join is executed in a single stage. Even though this join is also called Shuffle-and-Replication, it does not involve a "true" shuffle, as in records with the same keys being sent to the same partition. Instead, entire partitions of the dataset are sent over, or replicated, to all the partitions for a full cross or nested-loop join.
Inner workings of a Cartesian Product Join
- Both datasets are read. All partitions from one of the datasets are sent to all partitions of the other dataset.
- Once partitions from both datasets are available on one side, a nested loop join is performed.
- If there are N records in one dataset and M records in the other dataset, the nested loop is performed over N * M record pairs.
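The steps above can be sketched in plain Scala. This is only an illustration of the nested-loop semantics over N * M record pairs, not Spark's actual implementation:

```scala
// Plain-Scala illustration of a nested-loop join over N * M record pairs.
val left  = Seq(10, 20, 30)   // N = 3 records
val right = Seq(20, 30)       // M = 2 records

// Every left record is compared against every right record (3 * 2 = 6 pairs);
// only the pairs satisfying the non-equi condition l >= r are kept.
val joined = for {
  l <- left
  r <- right
  if l >= r
} yield (l, r)
// joined == Seq((20,20), (30,20), (30,30))
```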
When does Cartesian Product Join work?
- Works on both equi and non-equi joins
- Works only on inner-like joins
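Note that on Spark 2.x, a query that implicitly requires a cartesian product fails by default unless cross joins are allowed via the spark.sql.crossJoin.enabled property (its default changed to true starting with Spark 3.0). A minimal sketch, assuming an active SparkSession named spark:

```scala
// Allow implicit cartesian products on Spark 2.x
// (spark.sql.crossJoin.enabled defaults to true from Spark 3.0 onwards).
spark.conf.set("spark.sql.crossJoin.enabled", "true")
```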
When doesn't Cartesian Product Join work?
- Doesn't work on non-inner-like joins
- This is a very expensive join algorithm. Expect load on the network, as entire partitions are moved across the network.
- High possibility of an Out of Memory exception.
Interested in learning about Shuffle Sort Merge join in Spark? – Click here.