Broadcast variables are variables that are available in all executors executing the Spark application. These variables are already cached and ready to be used by tasks executing as part of the application. Broadcast variables are sent to the executors only once, and they are then available to all tasks executing in those executors.
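Here is a minimal sketch of the API, assuming an active SparkSession named spark (the variable names are illustrative):

val lookup = Map("NY" -> "New York")               // small, read-only lookup data
val bLookup = spark.sparkContext.broadcast(lookup) // shipped to each executor once
val fullName = bLookup.value.getOrElse("NY", "unknown") // tasks read it via .value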
When to use broadcast variables?
Let’s say you are working with an employee dataset. In the employee dataset you have a column to represent the state. The state is represented with 2-letter notation, e.g. NY for New York.
Now you want the output to print the employee name and the state, but you want the full name of the state as opposed to the 2-letter notation.
There is a traditional way to solve this problem: maintain a small dataset that maps the 2-letter state codes to full names and join it with the employee dataset on the 2-letter state key (see the sketch after the list below). This will most certainly get the output you are looking for.
But there are a few needless complications with the above approach.
- The join triggers a shuffle, which is expensive for a big dataset
- A lot of data gets transferred over the network
- The shuffle increases the execution time of the job
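For contrast, here is a rough sketch of the join-based approach. It assumes an active SparkSession named spark; the employees DataFrame and the column names are made up for illustration:

import spark.implicits._

// Hypothetical employee data with 2-letter state codes
val employees = Seq(
  ("John", "Software Engineer", "CA"),
  ("Emily", "Developer", "NY")
).toDF("name", "role", "state")

// Small lookup dataset mapping 2-letter codes to full names
val stateLookup = Seq(("CA", "California"), ("NY", "New York")).toDF("code", "fullName")

// The join produces the desired output, but it can trigger a shuffle
val result = employees
  .join(stateLookup, employees("state") === stateLookup("code"))
  .select(employees("name"), employees("role"), stateLookup("fullName"))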
Solution
Instead of using a join, form a Map (key-value pairs) of 2-letter state codes to full state names and broadcast the Map. Spark will serialize the data and make the Map available to all executors. The tasks can then do a simple lookup from the 2-letter code to the full state name instead of a join to get to the output.
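As a quick sketch of the idea on an RDD (the full DataFrame walkthrough appears in the Example section below; the names here are illustrative, assuming an active SparkSession named spark):

val stateMap = Map("NY" -> "New York", "IL" -> "Illinois", "CA" -> "California")
val bStateMap = spark.sparkContext.broadcast(stateMap)

// (name, stateCode) pairs; the lookup happens locally on each executor,
// so no shuffle is triggered
val employees = spark.sparkContext.parallelize(Seq(("John", "CA"), ("Emily", "NY")))
val withFullNames = employees.map { case (name, code) =>
  (name, bStateMap.value.getOrElse(code, code)) // fall back to the code if unmapped
}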
When to NOT use broadcast variables
Use broadcast variables on smaller, lookup-style data and not on big datasets. The size of the data you are broadcasting should be in MBs, not GBs.
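If you are not sure how big your lookup data actually is, one way to sanity-check it is Spark's SizeEstimator utility. A minimal sketch (treat the number as an estimate):

import org.apache.spark.util.SizeEstimator

val states = Map("NY" -> "New York", "IL" -> "Illinois", "CA" -> "California")
// Estimated in-memory footprint of the object, in bytes
val bytes = SizeEstimator.estimate(states)
println(s"Estimated size: $bytes bytes")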
How does Spark handle broadcast variables?
When Spark sees the use of a broadcast variable in your code, it serializes the data and sends it to all executors involved in your application. The broadcast variable is cached on the executor side, and all tasks in the application have access to the data in the broadcast variable.
Assume you have 10 executors and your application executes 100 tasks in total. The broadcast variable will be sent to the 10 executors once each, as opposed to 100 times (once per task). That’s a tenfold decrease in the amount of data that would have been transferred had we not used the broadcast variable.
Example
spark.sparkContext.broadcast(states) broadcasts the data to all executors.
Here is how we retrieve the data from the broadcast variable: val stateName = bStates.value.get(state).get
scala> val states = Map(("NY","New York"),("IL","Illinois"),("CA","California"))
states: scala.collection.immutable.Map[String,String] = Map(NY -> New York, IL -> Illinois, CA -> California)

scala> val bStates = spark.sparkContext.broadcast(states)
bStates: org.apache.spark.broadcast.Broadcast[scala.collection.immutable.Map[String,String]] = Broadcast(0)

scala> val data = Seq(
     |   ("John","Software Engineer","CA"),
     |   ("Jerry","Project Manager","IL"),
     |   ("Emily","Developer","NY")
     | )
data: Seq[(String, String, String)] = List((John,Software Engineer,CA), (Jerry,Project Manager,IL), (Emily,Developer,NY))

scala> import spark.sqlContext.implicits._
import spark.sqlContext.implicits._

scala> val columns = Seq("name","role","state")
columns: Seq[String] = List(name, role, state)

scala> val df = data.toDF(columns:_*)
df: org.apache.spark.sql.DataFrame = [name: string, role: string ... 1 more field]

scala> df.show(false)
+-----+-----------------+-----+
|name |role             |state|
+-----+-----------------+-----+
|John |Software Engineer|CA   |
|Jerry|Project Manager  |IL   |
|Emily|Developer        |NY   |
+-----+-----------------+-----+

scala> val df2 = df.map(row => {
     |   val state = row.getString(2)
     |   val stateName = bStates.value.get(state).get
     |   (row.getString(0), row.getString(1), stateName)
     | }).toDF(columns:_*)
df2: org.apache.spark.sql.DataFrame = [name: string, role: string ... 1 more field]

scala> df2.show(false)
+-----+-----------------+----------+
|name |role             |state     |
+-----+-----------------+----------+
|John |Software Engineer|California|
|Jerry|Project Manager  |Illinois  |
|Emily|Developer        |New York  |
+-----+-----------------+----------+