Accumulators are like global variables in a Spark application. In the real world, accumulators are used as counters to keep track of something at the application level. Accumulators serve a very similar purpose to counters in MapReduce.
Example
Let’s say we have a sales dataset and we want to count the number of “bad” records. We consider a record to be bad when the sale_quantity column is 0, because a sale record with 0 quantity doesn’t make sense.
First, we register a Long accumulator named badRecords.
scala> val badRecords = spark.sparkContext.longAccumulator("badRecords")
badRecords: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 28, name: Some(badRecords), value: 0)
We can call value on the accumulator, and initially it returns 0.
scala> badRecords.value
res2: Long = 0
Let’s now create a quick list of sale records and create a dataframe out of the data. You can see that records 2 and 5 have 0 quantity, so we consider them “bad” records.
scala> val data = Seq(
     |   (1, "Shirt", 20, 2),
     |   (2, "Pants", 30, 0),
     |   (3, "Skirt", 40, 3),
     |   (4, "Hat", 10, 4),
     |   (5, "Shirt", 20, 0)
     | )
data: Seq[(Int, String, Int, Int)] = List((1,Shirt,20,2), (2,Pants,30,0), (3,Skirt,40,3), (4,Hat,10,4), (5,Shirt,20,0))

scala> val columns = Seq("sale_id", "sale_item", "sale_price", "sale_quantity")
columns: Seq[String] = List(sale_id, sale_item, sale_price, sale_quantity)

scala> val df = data.toDF(columns:_*)
df: org.apache.spark.sql.DataFrame = [sale_id: int, sale_item: string ... 2 more fields]
Here we call foreach() on our dataframe, and whenever we encounter 0 quantity in a sales record, we call add() to increment the badRecords accumulator by 1. When we then print the value of the accumulator, we get 2, which is correct.
scala> df.foreach(row => {
     |   if(row.getInt(3) == 0) badRecords.add(1)
     | }
     | )

scala> badRecords.value
res4: Long = 2
When to use accumulators?
Now that we know what accumulators do, let’s look at the correct way to use them. If you notice, we updated the accumulator inside foreach(). foreach() is an action, and action functions are the right place to use accumulators.
We have a separate post on foreach(); click here to learn more about it.
foreach() is meant for updating the state of external variables and systems, i.e. it causes side effects, which makes it an appropriate place to update accumulators. Moreover, foreach() is an action function and not a transformation function, and hence it is the correct place to manipulate accumulators.
When NOT to use accumulators?
Accumulators should not be used inside transformation functions like map(); doing so can have unintended consequences.
Spark can rerun a task in a few instances –
- When a task encounters an exception, Spark retries it; by default a task is attempted up to 4 times before the job fails.
- If an executor crashes, Spark will re-execute the tasks that were running on it.
- If a task is running slow, Spark can launch another copy of the task; this is called speculative execution. Spark only takes the result from the copy that completes first.
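For reference, the retry and speculation behaviors above are controlled by Spark configuration. Below is a minimal, hypothetical setup showing the relevant settings with their default values; in spark-shell you would pass these with --conf instead of building a SparkSession yourself.

import org.apache.spark.sql.SparkSession

// Hypothetical example: the settings that govern task retries and speculative
// execution, shown here with their default values.
val spark = SparkSession.builder()
  .appName("accumulator-demo")               // hypothetical app name
  .config("spark.task.maxFailures", "4")     // number of task failures tolerated before the job fails (default 4)
  .config("spark.speculation", "false")      // set to "true" to enable speculative execution (default false)
  .getOrCreate()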
When a task is re-executed, it runs all the transformation functions in the task again. An accumulator that was already updated by the first execution of the task gets updated again, causing duplication in the accumulator’s result.
For this reason, always put accumulator-related code in action functions like foreach(). Spark will not complain at compile time or at runtime when you update an accumulator inside a transformation function like map(), so make sure to keep this point in mind when you deal with accumulators.
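To make the risk concrete, here is a minimal sketch of the anti-pattern, not taken from the example above: the sample RDD, the zeroQuantity accumulator, and the two actions are hypothetical. Because the RDD is not cached, the second action re-runs the map() and the accumulator counts the same bad records twice.

// Hypothetical sketch of the anti-pattern: updating an accumulator inside map().
// Assumes a SparkSession named `spark`, e.g. the one available in spark-shell.
val zeroQuantity = spark.sparkContext.longAccumulator("zeroQuantity")

val quantities = spark.sparkContext.parallelize(Seq(2, 0, 3, 4, 0))

// The accumulator is updated inside a transformation -- this is the mistake.
val checked = quantities.map { q =>
  if (q == 0) zeroQuantity.add(1)
  q
}

checked.count()   // first action: zeroQuantity.value is now 2
checked.collect() // RDD is not cached, so map() runs again and the value becomes 4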