Take a look at the execution plan below. Currently, when you print the executed plan, you can see that Spark is using a Sort Merge Join.
scala> dfJoined.queryExecution.executedPlan
res3: org.apache.spark.sql.execution.SparkPlan =
*(3) SortMergeJoin [id1#3], [id2#8], Inner
:- *(1) Sort [id1#3 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id1#3, 200)
:     +- LocalTableScan [id1#3]
+- *(2) Sort [id2#8 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id2#8, 200)
      +- LocalTableScan [id2#8]
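For reference, a dfJoined like the one in this plan could have been built as follows (a minimal sketch; only the column names id1 and id2 come from the plan, the data itself is assumed — in spark-shell, toDF and $ are available because spark.implicits._ is imported automatically):

val df1 = Seq(1, 2, 3).toDF("id1")
val df2 = Seq(2, 3, 4).toDF("id2")
val dfJoined = df1.join(df2, $"id1" === $"id2")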
But you would prefer a Shuffle Hash Join, because you know your datasets are evenly distributed and the smaller side can be hashed without causing an Out of Memory exception. This is the plan you want to see instead:
scala> dfJoined.queryExecution.executedPlan
res4: org.apache.spark.sql.execution.SparkPlan =
ShuffledHashJoin [id1#3], [id2#8], Inner, BuildRight
:- Exchange hashpartitioning(id1#3, 200)
:  +- LocalTableScan [id1#3]
+- Exchange hashpartitioning(id2#8, 200)
   +- LocalTableScan [id2#8]
There are 3 important conditions that must be met before Spark chooses to perform a Shuffled Hash Join:
spark.sql.join.preferSortMergeJoin
Make sure spark.sql.join.preferSortMergeJoin is set to false. It defaults to true, which is why Spark picked the Sort Merge Join above.
spark.conf.set("spark.sql.join.preferSortMergeJoin", false)
spark.sql.autoBroadcastJoinThreshold
To perform a Shuffle Hash Join, the individual partitions of the build side must be small enough to build a hash table from; otherwise you will run into an Out Of Memory exception. Somewhat counter-intuitively, spark.sql.autoBroadcastJoinThreshold plays a role here: Spark only considers building per-partition hash tables when the smaller side's estimated size is below the threshold multiplied by the number of shuffle partitions.
Set spark.sql.autoBroadcastJoinThreshold to a very small number, so that a Broadcast Hash Join (which Spark prefers whenever one side fits under the threshold) is ruled out.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 2)
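As a rough sanity check (a sketch, assuming Spark 2.x join selection internals and the default of 200 shuffle partitions), you can compute the effective per-side size cap yourself:

val threshold     = spark.conf.get("spark.sql.autoBroadcastJoinThreshold").toLong // 2, set above
val numPartitions = spark.conf.get("spark.sql.shuffle.partitions").toInt          // 200 by default
// The smaller side's estimated size must stay below this product
// for Spark to consider building per-partition hash tables at all:
val localHashMapCap = threshold * numPartitions // 400 bytes in this example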
Dataset sizes
Spark expects the smaller dataset to be at least 3 times smaller than the other (bigger) dataset. So if your datasets are roughly equal in size, you will not get a Shuffle Hash Join, no matter how small they both are.
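Putting all three conditions together, here is a minimal spark-shell sketch (the data itself is an assumption for illustration, chosen so the right side is at least 3 times smaller than the left):

spark.conf.set("spark.sql.join.preferSortMergeJoin", false)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 2)

// The right side is more than 3x smaller than the left,
// so it can be picked as the hashed (build) side.
val dfBig   = Seq(1, 2, 3, 4, 5, 6, 7, 8).toDF("id1")
val dfSmall = Seq(2, 3).toDF("id2")

val dfJoined = dfBig.join(dfSmall, $"id1" === $"id2")
dfJoined.queryExecution.executedPlan // look for ShuffledHashJoin ... BuildRight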