Take a look at the execution plan below. When you print the executed plan, you can see that Spark is currently using a Sort Merge Join.

scala> dfJoined.queryExecution.executedPlan

res3: org.apache.spark.sql.execution.SparkPlan =
*(3) SortMergeJoin [id1#3], [id2#8], Inner
:- *(1) Sort [id1#3 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(id1#3, 200)
:     +- LocalTableScan [id1#3]
+- *(2) Sort [id2#8 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(id2#8, 200)
      +- LocalTableScan [id2#8]

But suppose you would rather have Spark use a Shuffle Hash Join, because you know your data is evenly distributed and one side can be hashed without running into an out-of-memory error. Once the conditions below are met, the executed plan looks like this:

scala> dfJoined.queryExecution.executedPlan

res4: org.apache.spark.sql.execution.SparkPlan =
ShuffledHashJoin [id1#3], [id2#8], Inner, BuildRight
:- Exchange hashpartitioning(id1#3, 200)
:  +- LocalTableScan [id1#3]
+- Exchange hashpartitioning(id2#8, 200)
   +- LocalTableScan [id2#8]

Three conditions must be met before Spark chooses to perform a Shuffled Hash Join.


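Sort Merge Join preference

Make sure spark.sql.join.preferSortMergeJoin is set to false. It defaults to true, which tells Spark to favor a Sort Merge Join over a Shuffled Hash Join.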

spark.conf.set("spark.sql.join.preferSortMergeJoin", false)



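Partition size

To perform a Shuffle Hash Join, each partition of the build side must be small enough to build a hash table in memory; otherwise the job can fail with an out-of-memory error. Spark estimates this rather than measuring it: the build side's estimated size must be less than spark.sql.autoBroadcastJoinThreshold multiplied by the number of shuffle partitions (spark.sql.shuffle.partitions, 200 by default). A side whose size is at or under the threshold itself is broadcast instead, so for tiny test data like the local tables above, set spark.sql.autoBroadcastJoinThreshold to a very small number to rule out a Broadcast Hash Join.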

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 2)
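To see why the threshold has to be small but not too small, here is a minimal plain-Scala sketch of the two size checks, modeled on Spark 2.x's join selection. It is a simplified model, not a Spark API: the object and method names are hypothetical, and the sizes stand in for Spark's estimated plan sizes in bytes.

```scala
// Simplified, hypothetical model of the size checks that decide between a
// broadcast hash join and a shuffled hash join (modeled on Spark 2.x).
// Not a Spark API: sizes are estimated plan sizes in bytes.
object PartitionSizeCheck {
  // A side at or under the threshold is broadcast, which Spark considers first.
  def canBroadcast(sizeInBytes: Long, threshold: Long): Boolean =
    sizeInBytes >= 0 && sizeInBytes <= threshold

  // The average partition (size / partitions) must stay under the threshold,
  // written as a multiplication to avoid integer division.
  def canBuildLocalHashMap(sizeInBytes: Long, threshold: Long, numShufflePartitions: Int): Boolean =
    sizeInBytes < threshold * numShufflePartitions

  // True when the side is too big to broadcast but small enough to hash per partition.
  def inShuffledHashJoinWindow(sizeInBytes: Long, threshold: Long, numShufflePartitions: Int): Boolean =
    !canBroadcast(sizeInBytes, threshold) &&
      canBuildLocalHashMap(sizeInBytes, threshold, numShufflePartitions)
}

// Threshold of 2 bytes and the default 200 shuffle partitions:
// the window is 3 to 399 bytes.
val threshold = 2L
val partitions = 200
println(PartitionSizeCheck.inShuffledHashJoinWindow(1L, threshold, partitions))   // false: broadcast instead
println(PartitionSizeCheck.inShuffledHashJoinWindow(100L, threshold, partitions)) // true
println(PartitionSizeCheck.inShuffledHashJoinWindow(400L, threshold, partitions)) // false: 400 is not < 400
```

Under this model, the 2-byte setting only works because the example tables are tiny. With the default threshold of 10 MB and 200 partitions, a build side between roughly 10 MB and 2 GB already falls inside the window.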


Dataset size

Spark expects the smaller (build) side to be at least 3 times smaller than the other, bigger side. So if your datasets are roughly equal in size, even if both are small, Spark will not choose a Shuffle Hash Join.
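The dataset size rule comes down to a single comparison, `build * 3 <= other`. Below is a minimal plain-Scala sketch of it, modeled on Spark 2.x's check for an inner join and assuming the other two conditions already hold. The object and method names are hypothetical, this is not a Spark API, and the sizes stand in for Spark's estimated plan sizes in bytes.

```scala
// Simplified, hypothetical model of the "at least 3 times smaller" rule
// (modeled on Spark 2.x). Not a Spark API: sizes are estimated bytes.
object DatasetSizeRule {
  // The build side must be at least 3 times smaller than the other side.
  def muchSmaller(buildBytes: Long, otherBytes: Long): Boolean =
    buildBytes * 3 <= otherBytes

  // Which side could be hashed: the right side is checked first, then the left.
  def buildSide(leftBytes: Long, rightBytes: Long): Option[String] =
    if (muchSmaller(rightBytes, leftBytes)) Some("BuildRight")
    else if (muchSmaller(leftBytes, rightBytes)) Some("BuildLeft")
    else None
}

println(DatasetSizeRule.buildSide(300L, 100L)) // Some(BuildRight): 100 * 3 <= 300
println(DatasetSizeRule.buildSide(100L, 400L)) // Some(BuildLeft): 100 * 3 <= 400
println(DatasetSizeRule.buildSide(300L, 150L)) // None: sizes are too close
```

The last call shows the case described above: neither side is 3 times smaller than the other, so no side qualifies as the build side and Spark falls back to a Sort Merge Join.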

