Let's create an RDD first. Below we create an RDD with some sample sales data.
scala> val data = Seq(
     |   (1, "Shirt", 20, 2),
     |   (2, "Pants", 30, 0),
     |   (3, "Skirt", 40, 3),
     |   (4, "Hat", 10, 4),
     |   (5, "Shirt", 20, 0)
     | )
data: Seq[(Int, String, Int, Int)] = List((1,Shirt,20,2), (2,Pants,30,0), (3,Skirt,40,3), (4,Hat,10,4), (5,Shirt,20,0))

scala> val rdd = sc.parallelize(data)
rdd: org.apache.spark.rdd.RDD[(Int, String, Int, Int)] = ParallelCollectionRDD[23] at parallelize at <console>:39
RDD to DataFrame
Creating DataFrame without schema
Using toDF() to convert RDD to DataFrame
scala> import spark.implicits._
import spark.implicits._

scala> val df1 = rdd.toDF()
df1: org.apache.spark.sql.DataFrame = [_1: int, _2: string ... 2 more fields]
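Because no column names were supplied, Spark falls back to the positional names _1 through _4. If you want friendlier names without rebuilding the DataFrame, one option is to rename columns afterwards; a minimal sketch (the new names below are our own choice, not part of the original example):

// Default columns are _1.._4; rename a couple of them in place
val df1_renamed = df1.withColumnRenamed("_1", "sale_id")
                     .withColumnRenamed("_2", "sale_item")
df1_renamed.printSchema()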
Using createDataFrame() to convert RDD to DataFrame
scala> val df2 = spark.createDataFrame(rdd)
df2: org.apache.spark.sql.DataFrame = [_1: int, _2: string ... 2 more fields]
Creating DataFrame with schema
Using toDF() with column names
scala> val df_colname = rdd.toDF("sale_id", "sale_item", "sale_price", "sale_quantity")
df_colname: org.apache.spark.sql.DataFrame = [sale_id: int, sale_item: string ... 2 more fields]
To use createDataFrame() to create a DataFrame with a schema, we need to define the schema first and then convert the RDD to an RDD of type Row.
Creating schema using StructType
scala> import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, DoubleType}
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, DoubleType}

scala> val schema = StructType(
     |   StructField("sale_id", IntegerType) ::
     |   StructField("sale_item", StringType) ::
     |   StructField("sale_price", IntegerType) ::
     |   StructField("sale_quantity", IntegerType) :: Nil)
schema: org.apache.spark.sql.types.StructType = StructType(StructField(sale_id,IntegerType,true), StructField(sale_item,StringType,true), StructField(sale_price,IntegerType,true), StructField(sale_quantity,IntegerType,true))
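As a side note, the same schema can also be built by chaining StructType.add, which some find easier to read. This sketch is equivalent to the definition above:

// Equivalent schema built with StructType.add (fields default to nullable = true)
val schema2 = new StructType()
  .add("sale_id", IntegerType)
  .add("sale_item", StringType)
  .add("sale_price", IntegerType)
  .add("sale_quantity", IntegerType)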
Convert RDD to RDD[Row]
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> val rowRDD = rdd.map(columns => Row(columns._1, columns._2, columns._3, columns._4))
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[24] at map at <console>:43
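Since each element of the RDD is a tuple (a Product), the same mapping can be sketched more compactly with Row.fromTuple, which builds a Row from any Product:

// Alternative: let Row.fromTuple expand each tuple into a Row
val rowRDD2 = rdd.map(t => Row.fromTuple(t))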
Pass the RDD[Row] and the schema to createDataFrame() to create the DataFrame.
scala> val df2_schema = spark.createDataFrame(rowRDD, schema)
df2_schema: org.apache.spark.sql.DataFrame = [sale_id: int, sale_item: string ... 2 more fields]

scala> df2_schema.printSchema
root
 |-- sale_id: integer (nullable = true)
 |-- sale_item: string (nullable = true)
 |-- sale_price: integer (nullable = true)
 |-- sale_quantity: integer (nullable = true)

scala> df2_schema.show
+-------+---------+----------+-------------+
|sale_id|sale_item|sale_price|sale_quantity|
+-------+---------+----------+-------------+
|      1|    Shirt|        20|            2|
|      2|    Pants|        30|            0|
|      3|    Skirt|        40|            3|
|      4|      Hat|        10|            4|
|      5|    Shirt|        20|            0|
+-------+---------+----------+-------------+
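The conversion also works in the other direction: a DataFrame exposes its data as an RDD[Row] through the rdd method. A quick sketch using df2_schema from above:

// Back from DataFrame to RDD[Row]; fields can be read by name or by position
val backToRdd = df2_schema.rdd
backToRdd.map(row => row.getAs[String]("sale_item")).collect()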
RDD to Dataset
Creating Dataset without schema
scala> val ds = spark.createDataset(rdd)
ds: org.apache.spark.sql.Dataset[(Int, String, Int, Int)] = [_1: int, _2: string ... 2 more fields]

scala> ds.printSchema
root
 |-- _1: integer (nullable = false)
 |-- _2: string (nullable = true)
 |-- _3: integer (nullable = false)
 |-- _4: integer (nullable = false)

scala> ds.show
+---+-----+---+---+
| _1|   _2| _3| _4|
+---+-----+---+---+
|  1|Shirt| 20|  2|
|  2|Pants| 30|  0|
|  3|Skirt| 40|  3|
|  4|  Hat| 10|  4|
|  5|Shirt| 20|  0|
+---+-----+---+---+
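With spark.implicits._ already in scope, the same Dataset of tuples can also be created directly from the RDD via toDS(); this sketch is equivalent to the createDataset call above:

// toDS() comes from spark.implicits._ and yields a Dataset[(Int, String, Int, Int)]
val ds2 = rdd.toDS()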
Creating Dataset with schema
First create a case class with the same structure as the sales data. Make sure the field names match the column names; the Spark encoder uses the column names to map the data.
scala> case class Sales(sale_id: Int, sale_item: String, sale_price: Int, sale_quantity: Int)
defined class Sales
as[Sales] will encode the rows of the DataFrame as Sales objects.
scala> val ds_schema = df2_schema.as[Sales]
ds_schema: org.apache.spark.sql.Dataset[Sales] = [sale_id: int, sale_item: string ... 2 more fields]

scala> ds_schema.printSchema
root
 |-- sale_id: integer (nullable = true)
 |-- sale_item: string (nullable = true)
 |-- sale_price: integer (nullable = true)
 |-- sale_quantity: integer (nullable = true)

scala> ds_schema.show
+-------+---------+----------+-------------+
|sale_id|sale_item|sale_price|sale_quantity|
+-------+---------+----------+-------------+
|      1|    Shirt|        20|            2|
|      2|    Pants|        30|            0|
|      3|    Skirt|        40|            3|
|      4|      Hat|        10|            4|
|      5|    Shirt|        20|            0|
+-------+---------+----------+-------------+
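Alternatively, the typed Dataset can be built straight from the original RDD by mapping each tuple to a Sales instance, skipping the intermediate DataFrame. A sketch using the same Sales case class:

// Map tuples to the case class, then create a Dataset[Sales] directly
val salesRDD = rdd.map { case (id, item, price, qty) => Sales(id, item, price, qty) }
val ds_direct = spark.createDataset(salesRDD)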