How to force Spark to use Shuffle Hash Join when it defaults to Sort Merge Join?


Take a look at the execution plan below. When you print the executed plan, you can see that Spark is using a Sort Merge Join.

scala> dfJoined.queryExecution.executedPlan

res3: org.apache.spark.sql.execution.SparkPlan =
*(3) SortMergeJoin [id1#3], [id2#8], Inner
:- *(1) Sort [id1#3 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id1#3, 200)
:     +- LocalTableScan [id1#3]
+- *(2) Sort [id2#8 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id2#8, 200)
      +- LocalTableScan [id2#8]


But you would prefer Spark to use a Shuffle Hash Join because you know your dataset is evenly distributed and one side is small enough to be hashed without causing an Out of Memory exception. With the properties below in place, the executed plan looks like this:

scala> dfJoined.queryExecution.executedPlan

res4: org.apache.spark.sql.execution.SparkPlan =
ShuffledHashJoin [id1#3], [id2#8], Inner, BuildRight
:- Exchange hashpartitioning(id1#3, 200)
:  +- LocalTableScan [id1#3]
+- Exchange hashpartitioning(id2#8, 200)
   +- LocalTableScan [id2#8]

There are 3 important conditions that must be met before Spark chooses to perform a Shuffle Hash Join.

spark.sql.join.preferSortMergeJoin

Make sure spark.sql.join.preferSortMergeJoin is set to false.

spark.conf.set("spark.sql.join.preferSortMergeJoin", false)

 

spark.sql.autoBroadcastJoinThreshold

To perform a Shuffle Hash Join, the individual partitions of the build side should be small enough to fit in a hash table; otherwise you will run into an Out Of Memory exception.

Set spark.sql.autoBroadcastJoinThreshold to a very small number so that Spark does not pick a Broadcast Hash Join instead.

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 2)

 

Dataset sizes

Spark expects the smaller dataset to be at least 3 times smaller than the other (bigger) dataset. So if your datasets are close in size, even when both of them are small, Spark will not choose a Shuffle Hash Join.
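Putting the conditions above together, here is a minimal end-to-end sketch for a spark-shell session. It assumes an active SparkSession named spark (Spark 2.x); the dataset sizes and column names id1/id2 are illustrative, not from the original example.

```scala
// Assumes spark-shell, where `spark` is the active SparkSession.
import spark.implicits._

// Illustrative datasets: the build side should be at least
// 3x smaller than the probe side.
val dfSmall = (1 to 100).toDF("id2")
val dfLarge = (1 to 10000).toDF("id1")

// 1. Do not prefer Sort Merge Join.
spark.conf.set("spark.sql.join.preferSortMergeJoin", false)

// 2. Keep the broadcast threshold very small so Spark does not
//    pick a Broadcast Hash Join instead.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 2)

val dfJoined = dfLarge.join(dfSmall, $"id1" === $"id2")

// Inspect the physical plan; when all three conditions are met,
// it should show ShuffledHashJoin instead of SortMergeJoin.
dfJoined.queryExecution.executedPlan
```

Note that the threshold also feeds into the hash-build size check, so whether the plan actually flips to ShuffledHashJoin depends on your table statistics and shuffle partition count.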

Big Data In Real World
We are a group of Big Data engineers who are passionate about Big Data and related Big Data technologies. We have designed, developed, deployed and maintained Big Data applications ranging from batch to real time streaming big data platforms. We have seen a wide range of real world big data problems, implemented some innovative and complex (or simple, depending on how you look at it) solutions.
