In this post we are going to create a Spark UDF which converts temperature from Fahrenheit to Celsius.
Here is our data. We have day and temperature in Fahrenheit.
val data = Seq(
  ("Sunday", 50),
  ("Monday", 60),
  ("Tuesday", 65),
  ("Wednesday", 70),
  ("Thursday", 85),
  ("Friday", 25),
  ("Saturday", 15))

import spark.sqlContext.implicits._
val df = data.toDF("Day", "Temp_Fahrenheit")
df.show()

+---------+---------------+
|      Day|Temp_Fahrenheit|
+---------+---------------+
|   Sunday|             50|
|   Monday|             60|
|  Tuesday|             65|
|Wednesday|             70|
| Thursday|             85|
|   Friday|             25|
| Saturday|             15|
+---------+---------------+
Create a UDF
A UDF is nothing but a function. Here we have a function which takes in a temperature in Fahrenheit as a Double and converts it to Celsius. (The code multiplies by 0.55, a rough approximation of the exact factor 5/9.)
val fahrenheitToCelcius = (fahrenheit: Double) => {
  (fahrenheit - 32) * 0.55
}
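Because the UDF body is just an ordinary Scala function, you can sanity-check it outside Spark before registering it. A minimal sketch (plain Scala, no Spark; the exactFtoC name is ours for comparison, not part of the post):

```scala
// The post's function, unchanged: 0.55 approximates the exact factor 5/9.
val fahrenheitToCelcius = (fahrenheit: Double) => { (fahrenheit - 32) * 0.55 }

// Hypothetical exact version, added here only to show the drift.
val exactFtoC = (fahrenheit: Double) => (fahrenheit - 32) * 5.0 / 9.0

println(fahrenheitToCelcius(50)) // 9.9  (matches the Sunday row in the output)
println(exactFtoC(50))           // 10.0 (the exact conversion)
```

The gap between the two grows as temperatures move away from 32 °F, which is worth knowing before using the approximation in real data.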
Register a UDF
Here we register the UDF. This turns the function we created into a UDF that can be used in Spark. As the REPL output shows, udf returns a UserDefinedFunction.
val ftoCUDF = udf(fahrenheitToCelcius)

ftoCUDF: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,DoubleType,Some(List(DoubleType)))
Use the UDF
Next we can use the UDF like any other Spark function.
df.withColumn("Temp_Celcius", ftoCUDF(col("Temp_Fahrenheit"))).show

+---------+---------------+-------------------+
|      Day|Temp_Fahrenheit|       Temp_Celcius|
+---------+---------------+-------------------+
|   Sunday|             50|                9.9|
|   Monday|             60| 15.400000000000002|
|  Tuesday|             65| 18.150000000000002|
|Wednesday|             70| 20.900000000000002|
| Thursday|             85| 29.150000000000002|
|   Friday|             25|-3.8500000000000005|
| Saturday|             15| -9.350000000000001|
+---------+---------------+-------------------+
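The long decimals in Temp_Celcius are ordinary Double floating-point noise, not a UDF problem: 0.55 cannot be represented exactly in binary. A plain-Scala sketch (no Spark) of one way to round the result; inside Spark you could instead wrap the column in the built-in round function:

```scala
// Same arithmetic the UDF performs for 60 °F; the trailing digits come
// from the binary representation of 0.55.
val raw = (60 - 32) * 0.55 // 15.400000000000002

// Round to one decimal place via BigDecimal.
val rounded = BigDecimal(raw).setScale(1, BigDecimal.RoundingMode.HALF_UP).toDouble
println(rounded) // 15.4
```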
UDFs with Spark SQL
Using UDFs with Spark SQL is slightly different from a syntax standpoint. Here we use spark.udf.register to register the UDF; it can then be called in the SELECT just like any other function.
spark.udf.register("ftoCUDF", fahrenheitToCelcius)
df.createOrReplaceTempView("days_temp")
spark.sql("select Day, Temp_Fahrenheit, ftoCUDF(Temp_Fahrenheit) from days_temp").show(false)
Full code
import org.apache.spark.sql.functions.{col, udf}

val fahrenheitToCelcius = (fahrenheit: Double) => {
  (fahrenheit - 32) * 0.55
}
val ftoCUDF = udf(fahrenheitToCelcius)

val data = Seq(
  ("Sunday", 50),
  ("Monday", 60),
  ("Tuesday", 65),
  ("Wednesday", 70),
  ("Thursday", 85),
  ("Friday", 25),
  ("Saturday", 15))

import spark.sqlContext.implicits._
val df = data.toDF("Day", "Temp_Fahrenheit")
df.show()

df.withColumn("Temp_Celcius", ftoCUDF(col("Temp_Fahrenheit"))).show

// with SQL
spark.udf.register("ftoCUDF", fahrenheitToCelcius)
df.createOrReplaceTempView("days_temp")
spark.sql("select Day, Temp_Fahrenheit, ftoCUDF(Temp_Fahrenheit) from days_temp").show(false)