How to convert RDD to DataFrame and Dataset in Spark?


Let’s create an RDD first. Below we create an RDD with some sample sales data.

scala>   val data = Seq(
     |     (1, "Shirt", 20, 2),
     |     (2, "Pants", 30, 0),
     |     (3, "Skirt", 40, 3),
     |     (4, "Hat", 10, 4),
     |     (5, "Shirt", 20, 0)
     |   )
data: Seq[(Int, String, Int, Int)] = List((1,Shirt,20,2), (2,Pants,30,0), (3,Skirt,40,3), (4,Hat,10,4), (5,Shirt,20,0))

scala> val rdd = sc.parallelize(data)
rdd: org.apache.spark.rdd.RDD[(Int, String, Int, Int)] = ParallelCollectionRDD[23] at parallelize at <console>:39


RDD to DataFrame

Creating DataFrame without schema

Using toDF() to convert RDD to DataFrame

scala> import spark.implicits._
import spark.implicits._

scala> val df1 = rdd.toDF()
df1: org.apache.spark.sql.DataFrame = [_1: int, _2: string ... 2 more fields]
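
With no column names supplied, Spark falls back to positional names. A quick sanity check, as it would look in a typical spark-shell session:

scala> df1.columns
res0: Array[String] = Array(_1, _2, _3, _4)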

Using createDataFrame() to convert RDD to DataFrame

scala> val df2 = spark.createDataFrame(rdd)
df2: org.apache.spark.sql.DataFrame = [_1: int, _2: string ... 2 more fields]
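
createDataFrame infers the schema by reflection from the element type, so it accepts any RDD of Products. With an RDD of case class instances the field names become the column names; a minimal sketch using a hypothetical Sale case class:

scala> case class Sale(id: Int, item: String, price: Int, quantity: Int)
defined class Sale

scala> spark.createDataFrame(rdd.map(t => Sale(t._1, t._2, t._3, t._4))).columns
res1: Array[String] = Array(id, item, price, quantity)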


Creating DataFrame with schema

Using toDF() with column names

scala> val df_colname = rdd.toDF("sale_id","sale_item","sale_price", "sale_quantity")
df_colname: org.apache.spark.sql.DataFrame = [sale_id: int, sale_item: string ... 2 more fields]
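
printSchema confirms the new column names. Note that because the types are inferred from the Scala tuple, the Int columns come out non-nullable here, unlike the explicit schema below where nullable defaults to true:

scala> df_colname.printSchema
root
 |-- sale_id: integer (nullable = false)
 |-- sale_item: string (nullable = true)
 |-- sale_price: integer (nullable = false)
 |-- sale_quantity: integer (nullable = false)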

To create a DataFrame with an explicit schema using createDataFrame(), we first need to define the schema and then convert the RDD to an RDD[Row].

Creating a schema using StructType

scala> import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

scala> val schema = StructType(
     |     StructField("sale_id", IntegerType) ::
     |     StructField("sale_item", StringType) ::
     |     StructField("sale_price", IntegerType) ::
     |     StructField("sale_quantity", IntegerType) :: Nil)
schema: org.apache.spark.sql.types.StructType = StructType(StructField(sale_id,IntegerType,true), StructField(sale_item,StringType,true), StructField(sale_price,IntegerType,true), StructField(sale_quantity,IntegerType,true))
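
The same schema can also be built fluently with StructType's add method, where nullable likewise defaults to true; an equivalent sketch:

scala> val schema2 = new StructType().
     |   add("sale_id", IntegerType).
     |   add("sale_item", StringType).
     |   add("sale_price", IntegerType).
     |   add("sale_quantity", IntegerType)
schema2: org.apache.spark.sql.types.StructType = StructType(StructField(sale_id,IntegerType,true), StructField(sale_item,StringType,true), StructField(sale_price,IntegerType,true), StructField(sale_quantity,IntegerType,true))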

Convert RDD to RDD[Row]

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> val rowRDD = rdd.map(columns => Row(columns._1, columns._2, columns._3, columns._4))
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[24] at map at <console>:43
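
Since the RDD already holds tuples (which are Products), the same mapping can be written more compactly with Row.fromTuple; a minimal equivalent sketch:

scala> val rowRDD2 = rdd.map(Row.fromTuple)
rowRDD2: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[...] at map at <console>:...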


Pass the RDD[Row] and the schema to createDataFrame() to create the DataFrame.

scala> val df2_schema = spark.createDataFrame(rowRDD, schema)
df2_schema: org.apache.spark.sql.DataFrame = [sale_id: int, sale_item: string ... 2 more fields]

scala> df2_schema.printSchema
root
 |-- sale_id: integer (nullable = true)
 |-- sale_item: string (nullable = true)
 |-- sale_price: integer (nullable = true)
 |-- sale_quantity: integer (nullable = true)

scala> df2_schema.show
+-------+---------+----------+-------------+
|sale_id|sale_item|sale_price|sale_quantity|
+-------+---------+----------+-------------+
|      1|    Shirt|        20|            2|
|      2|    Pants|        30|            0|
|      3|    Skirt|        40|            3|
|      4|      Hat|        10|            4|
|      5|    Shirt|        20|            0|
+-------+---------+----------+-------------+

RDD to Dataset

Creating Dataset without schema

scala> val ds = spark.createDataset(rdd)
ds: org.apache.spark.sql.Dataset[(Int, String, Int, Int)] = [_1: int, _2: string ... 2 more fields]

scala> ds.printSchema
root
 |-- _1: integer (nullable = false)
 |-- _2: string (nullable = true)
 |-- _3: integer (nullable = false)
 |-- _4: integer (nullable = false)

scala> ds.show
+---+-----+---+---+
| _1|   _2| _3| _4|
+---+-----+---+---+
|  1|Shirt| 20|  2|
|  2|Pants| 30|  0|
|  3|Skirt| 40|  3|
|  4|  Hat| 10|  4|
|  5|Shirt| 20|  0|
+---+-----+---+---+
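
Since spark.implicits._ is already in scope, the same Dataset can also be produced directly with toDS(); a one-line sketch:

scala> val ds2 = rdd.toDS()
ds2: org.apache.spark.sql.Dataset[(Int, String, Int, Int)] = [_1: int, _2: string ... 2 more fields]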


Creating Dataset with schema

First, create a case class with the same structure as the sales data. Make sure the field names match the column names; Spark's encoder uses the column names to map the data.

scala> case class Sales(sale_id: Int, sale_item: String, sale_price: Int, sale_quantity: Int)
defined class Sales

as[Sales] encodes each row of the DataFrame as a Sales object.

scala> val ds_schema = df2_schema.as[Sales]
ds_schema: org.apache.spark.sql.Dataset[Sales] = [sale_id: int, sale_item: string ... 2 more fields]

scala> ds_schema.printSchema
root
 |-- sale_id: integer (nullable = true)
 |-- sale_item: string (nullable = true)
 |-- sale_price: integer (nullable = true)
 |-- sale_quantity: integer (nullable = true)

scala> ds_schema.show
+-------+---------+----------+-------------+
|sale_id|sale_item|sale_price|sale_quantity|
+-------+---------+----------+-------------+
|      1|    Shirt|        20|            2|
|      2|    Pants|        30|            0|
|      3|    Skirt|        40|            3|
|      4|      Hat|        10|            4|
|      5|    Shirt|        20|            0|
+-------+---------+----------+-------------+
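
As an aside, the Row-and-schema detour isn't strictly required to end up with a Dataset[Sales]: mapping the RDD to Sales and calling toDS() (or spark.createDataset) yields the same typed Dataset, which then supports compile-time-checked lambdas. A minimal sketch:

scala> val ds_direct = rdd.map(t => Sales(t._1, t._2, t._3, t._4)).toDS()
ds_direct: org.apache.spark.sql.Dataset[Sales] = [sale_id: int, sale_item: string ... 2 more fields]

scala> ds_direct.filter(_.sale_quantity > 0).count
res2: Long = 3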

