How to add an index column with a unique incrementing value to a Spark DataFrame?
Let’s say we have a DataFrame like below.
+---------+-------+---------------+
|  Project|   Name|Cost_To_Project|
+---------+-------+---------------+
|Ingestion|  Jerry|           1000|
|Ingestion|   Arya|           2000|
|Ingestion|  Emily|           3000|
|       ML|  Riley|           9000|
|       ML|Patrick|           1000|
|       ML| Mickey|           8000|
|Analytics| Donald|           1000|
|Ingestion|   John|           1000|
|Analytics|  Emily|           8000|
|Analytics|   Arya|          10000|
|       BI| Mickey|          12000|
|       BI| Martin|           5000|
+---------+-------+---------------+
We would like to add an index column with a unique incrementing value like below.
+---------+-------+---------------+-----+
|  Project|   Name|Cost_To_Project|index|
+---------+-------+---------------+-----+
|Ingestion|  Jerry|           1000|    0|
|Ingestion|   Arya|           2000|    1|
|Ingestion|  Emily|           3000|    2|
|       ML|  Riley|           9000|    3|
|       ML|Patrick|           1000|    4|
|       ML| Mickey|           8000|    5|
|Analytics| Donald|           1000|    6|
|Ingestion|   John|           1000|    7|
|Analytics|  Emily|           8000|    8|
|Analytics|   Arya|          10000|    9|
|       BI| Mickey|          12000|   10|
|       BI| Martin|           5000|   11|
+---------+-------+---------------+-----+
There are a few options to implement this use case in Spark. Let’s see them one by one.
Option 1 – Using monotonically_increasing_id function
Spark comes with a function named monotonically_increasing_id which generates a unique, monotonically increasing 64-bit number for each record in the DataFrame.
val data = Seq(
  ("Ingestion", "Jerry", 1000),
  ("Ingestion", "Arya", 2000),
  ("Ingestion", "Emily", 3000),
  ("ML", "Riley", 9000),
  ("ML", "Patrick", 1000),
  ("ML", "Mickey", 8000),
  ("Analytics", "Donald", 1000),
  ("Ingestion", "John", 1000),
  ("Analytics", "Emily", 8000),
  ("Analytics", "Arya", 10000),
  ("BI", "Mickey", 12000),
  ("BI", "Martin", 5000))

import spark.sqlContext.implicits._
import org.apache.spark.sql.functions.monotonically_increasing_id

val df = data.toDF("Project", "Name", "Cost_To_Project")
df.show()

val dfIndex = df.withColumn("index", monotonically_increasing_id())
dfIndex.show()

+---------+-------+---------------+-----+
|  Project|   Name|Cost_To_Project|index|
+---------+-------+---------------+-----+
|Ingestion|  Jerry|           1000|    0|
|Ingestion|   Arya|           2000|    1|
|Ingestion|  Emily|           3000|    2|
|       ML|  Riley|           9000|    3|
|       ML|Patrick|           1000|    4|
|       ML| Mickey|           8000|    5|
|Analytics| Donald|           1000|    6|
|Ingestion|   John|           1000|    7|
|Analytics|  Emily|           8000|    8|
|Analytics|   Arya|          10000|    9|
|       BI| Mickey|          12000|   10|
|       BI| Martin|           5000|   11|
+---------+-------+---------------+-----+
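The index values come out consecutive here because, in this run, the whole example DataFrame sits in a single partition. As a quick check (a small sketch, not part of the original example), you can print the partition count of the underlying RDD:

// Number of partitions backing the DataFrame; with a single partition,
// monotonically_increasing_id() generates consecutive values starting at 0.
println(df.rdd.getNumPartitions)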
Let’s now see what happens when the DataFrame has more than one partition. We repartition the data into 4 partitions and apply the monotonically_increasing_id function again.
You can see in the output below that the index is not sequential anymore. This is because the ID generated by monotonically_increasing_id is guaranteed to be monotonically increasing and unique, but not consecutive. The current implementation puts the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits.
val dfRepartitionIndex = df.repartition(4)
  .withColumn("monotonically_increasing_id", monotonically_increasing_id())
dfRepartitionIndex.show()

+---------+-------+---------------+---------------------------+
|  Project|   Name|Cost_To_Project|monotonically_increasing_id|
+---------+-------+---------------+---------------------------+
|Analytics| Donald|           1000|                          0|
|Analytics|   Arya|          10000|                          1|
|Ingestion|   Arya|           2000|                          2|
|       ML| Mickey|           8000|                 8589934592|
|Ingestion|  Jerry|           1000|                 8589934593|
|       ML|  Riley|           9000|                 8589934594|
|Ingestion|  Emily|           3000|                17179869184|
|Analytics|  Emily|           8000|                17179869185|
|       BI| Martin|           5000|                17179869186|
|Ingestion|   John|           1000|                25769803776|
|       BI| Mickey|          12000|                25769803777|
|       ML|Patrick|           1000|                25769803778|
+---------+-------+---------------+---------------------------+
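To make the bit layout concrete, here is a minimal sketch (not from the original post) that decodes each generated ID back into its partition ID and its record number within the partition; the column names partition_id and record_in_partition are just for illustration.

import org.apache.spark.sql.functions.{col, expr, lit}

// Upper 31 bits hold the partition ID, lower 33 bits hold the record number
// within the partition, so shifting and masking recovers both parts.
dfRepartitionIndex
  .withColumn("partition_id", expr("shiftright(`monotonically_increasing_id`, 33)"))
  .withColumn("record_in_partition",
    col("monotonically_increasing_id").bitwiseAND(lit((1L << 33) - 1)))
  .show()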
Option 2 – Using row_number function
Here we use the row_number window function over a global ordering of the DataFrame. This gives us a sequential, unique, incrementing index starting at 1.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

val windowSpec = Window.orderBy("Name")
df.withColumn("index", row_number().over(windowSpec)).show()

+---------+-------+---------------+-----+
|  Project|   Name|Cost_To_Project|index|
+---------+-------+---------------+-----+
|Ingestion|   Arya|           2000|    1|
|Analytics|   Arya|          10000|    2|
|Analytics| Donald|           1000|    3|
|Ingestion|  Emily|           3000|    4|
|Analytics|  Emily|           8000|    5|
|Ingestion|  Jerry|           1000|    6|
|Ingestion|   John|           1000|    7|
|       BI| Martin|           5000|    8|
|       ML| Mickey|           8000|    9|
|       BI| Mickey|          12000|   10|
|       ML|Patrick|           1000|   11|
|       ML|  Riley|           9000|   12|
+---------+-------+---------------+-----+
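Two things to keep in mind with this option: a window without a partitionBy clause moves all the data to a single partition, so it can be expensive on large DataFrames, and the output above is ordered by Name rather than by the original row order. If you want a sequential index in the original order, starting at 0, one variation (a sketch using the same functions, not from the original post; dfSequential and mono_id are illustrative names) is to order the window by a temporary monotonically_increasing_id column:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{monotonically_increasing_id, row_number}

// Order the window by a temporary monotonically increasing column so the
// sequential index follows the DataFrame's original order, then drop it.
val dfSequential = df
  .withColumn("mono_id", monotonically_increasing_id())
  .withColumn("index", row_number().over(Window.orderBy("mono_id")) - 1)
  .drop("mono_id")
dfSequential.show()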
Option 3 – Using zipWithIndex function
We can convert the DataFrame to an RDD and apply the zipWithIndex function. Collecting the result gives an Array in which each record appears as a Row paired with its index.
This seems like overkill when you don’t otherwise need to work with RDDs, and you still have to unnest each Row afterwards to get back the individual columns (a sketch of that conversion follows the output below).
df.rdd.zipWithIndex.collect

res39: Array[(org.apache.spark.sql.Row, Long)] = Array(
  ([Ingestion,Jerry,1000],0), ([Ingestion,Arya,2000],1), ([Ingestion,Emily,3000],2),
  ([ML,Riley,9000],3), ([ML,Patrick,1000],4), ([ML,Mickey,8000],5),
  ([Analytics,Donald,1000],6), ([Ingestion,John,1000],7), ([Analytics,Emily,8000],8),
  ([Analytics,Arya,10000],9), ([BI,Mickey,12000],10), ([BI,Martin,5000],11))
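If you do want the result back as a DataFrame, you have to rebuild the Rows and the schema yourself. A minimal sketch of that unnesting step (rddWithIndex, schemaWithIndex and dfZipIndex are illustrative names):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StructField, StructType}

// Append the index to each Row and extend the original schema with a LongType column.
val rddWithIndex = df.rdd.zipWithIndex.map { case (row, index) =>
  Row.fromSeq(row.toSeq :+ index)
}
val schemaWithIndex = StructType(df.schema.fields :+ StructField("index", LongType, nullable = false))
val dfZipIndex = spark.createDataFrame(rddWithIndex, schemaWithIndex)
dfZipIndex.show()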
Which option to use?
Use monotonically_increasing_id if you don’t need the index to be sequential.
Use row_number if you need the index to be sequential.
Use zipWithIndex if you are already working with RDDs and don’t want to convert them to a DataFrame.