If your organization is working with lots of data, you might be leveraging Spark to distribute computation. You might also have some or all of your data in a Snowflake data warehouse.
In a situation like this, you might have to expose the data in Snowflake to the processes that run on Spark. This is made possible by the Snowflake Connector for Spark.
In this post, we will see what the Snowflake Connector for Spark is and how to use it to connect to Snowflake and access Snowflake data from your Spark cluster.
In one of our previous posts, we showed how to use the SnowSQL client to work with Snowflake. Check out that post here if you are interested.
Architecture
The Spark connector can be used to both:
- Populate a Spark DataFrame from a table (or query) in Snowflake.
- Write the contents of a Spark DataFrame to a table in Snowflake (a short write example appears near the end of this post).
Here is how the Snowflake Connector for Spark works: Spark communicates with Snowflake through the Snowflake JDBC driver, which the connector uses under the hood.
The intermediary data exchanged between Spark and Snowflake can be staged internally on Snowflake, or externally in Azure Blob Storage or Amazon S3. Snowflake recommends keeping the data staged internally on Snowflake.
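If you do opt for external staging, the connector takes a tempdir option pointing at the transfer location. Below is a minimal sketch, assuming a hypothetical S3 bucket named my-spark-snowflake-stage and AWS credentials already configured in the cluster's Hadoop configuration:

// Minimal sketch of external staging on S3. The bucket name is
// hypothetical, and AWS credentials are assumed to already be set in
// Spark's Hadoop configuration (e.g. fs.s3a.access.key / fs.s3a.secret.key).
val externalStageOptions = Map(
  "tempdir" -> "s3a://my-spark-snowflake-stage/tmp" // external transfer location
)
// Merge these with the regular connection properties (defined later in
// this post) when reading or writing through the connector.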
Spark Connector Installation
We need two packages for the Snowflake Connector for Spark to work: snowflake-jdbc and spark-snowflake.
We are going to see how to connect to Snowflake from a Spark shell.
In the spark-shell command below, we specify the Maven coordinates for the two packages. Spark will download the packages and make the connector libraries available in our Spark session.
spark-shell --packages net.snowflake:snowflake-jdbc:3.13.22,net.snowflake:spark-snowflake_2.12:2.11.0-spark_3.3 --master "local[2]"
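The same coordinates work outside the shell as well. For example, if you package your application as a JAR, you can pass them to spark-submit (the JAR name below is hypothetical):

spark-submit --packages net.snowflake:snowflake-jdbc:3.13.22,net.snowflake:spark-snowflake_2.12:2.11.0-spark_3.3 --master "local[2]" my-snowflake-app.jar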
Connect to Snowflake
To connect to Snowflake, we create a simple Map with connection properties like the account URL, login and password.
We also specify the database, schema and warehouse for the table we are going to query later.
import org.apache.spark.sql._
import net.snowflake.spark.snowflake.Utils.SNOWFLAKE_SOURCE_NAME

//
// Configure your Snowflake environment
//
var sfOptions = Map(
  "sfURL" -> "xyz-abc.snowflakecomputing.com",
  "sfUser" -> "bigdatainrealworld",
  "sfPassword" -> "password",
  "sfDatabase" -> "COVID19_EPIDEMIOLOGICAL_DATA",
  "sfSchema" -> "PUBLIC",
  "sfWarehouse" -> "COMPUTE_WH"
)
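One note on the snippet above: hardcoding the password in code is risky. A safer option is to read it from an environment variable instead. Here is a small sketch, assuming a hypothetical variable named SNOWFLAKE_PASSWORD exported before starting the shell:

// Sketch: avoid hardcoding the password by reading it from an
// environment variable. SNOWFLAKE_PASSWORD is a hypothetical name.
val sfPassword = sys.env.getOrElse("SNOWFLAKE_PASSWORD",
  sys.error("SNOWFLAKE_PASSWORD is not set"))

var sfOptions = Map(
  "sfURL" -> "xyz-abc.snowflakecomputing.com",
  "sfUser" -> "bigdatainrealworld",
  "sfPassword" -> sfPassword,
  "sfDatabase" -> "COVID19_EPIDEMIOLOGICAL_DATA",
  "sfSchema" -> "PUBLIC",
  "sfWarehouse" -> "COMPUTE_WH"
)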
You can find the account URL by logging in to your Snowflake account.
Query data from Snowflake
Now that we have defined the connection properties, we can define a DataFrame pointing to a table in Snowflake.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

//
// Create a DataFrame from a Snowflake table
//
val df: DataFrame = sqlContext.read
  .format(SNOWFLAKE_SOURCE_NAME)
  .options(sfOptions)
  .option("dbtable", "ECDC_GLOBAL")
  .load()
When we execute the above code, we can see that the DataFrame schema reflects the schema of the table in Snowflake. Spark is able to infer the schema from Snowflake without us having to define it explicitly.
sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@38fdbc9
df: org.apache.spark.sql.DataFrame = [COUNTRY_REGION: string, CONTINENTEXP: string ... 9 more fields]
The COVID-related dataset we are working with is available in Snowflake for free. For more information on how to load this dataset, check one of our previous posts here.
Now that we have a DataFrame representing the data in the Snowflake table, let's run a simple GROUP BY query on the table. With the query option, the connector pushes the SQL down to Snowflake for execution.
val df2: DataFrame = sqlContext.read
  .format(SNOWFLAKE_SOURCE_NAME)
  .options(sfOptions)
  .option("query", "SELECT COUNTRY_REGION, SUM(CASES) AS Cases FROM ECDC_GLOBAL GROUP BY COUNTRY_REGION")
  .load()

scala> df2.show(10)
+-------------------+---------+
|     COUNTRY_REGION|    CASES|
+-------------------+---------+
|        Afghanistan|  49273.0|
|            Albania|  48530.0|
|            Algeria|  92102.0|
|            Andorra|   7338.0|
|             Angola|  16188.0|
|           Anguilla|     10.0|
|Antigua and Barbuda|    148.0|
|          Argentina|1498160.0|
|            Armenia| 148682.0|
|              Aruba|   5049.0|
+-------------------+---------+
only showing top 10 rows
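The connector works in the other direction too. As a sketch, here is how you could save the aggregated DataFrame back to a Snowflake table (the target table name CASES_BY_COUNTRY is hypothetical):

import org.apache.spark.sql.SaveMode

// Sketch: write the aggregated results back to a Snowflake table.
// The target table name CASES_BY_COUNTRY is hypothetical; Overwrite
// replaces any existing contents of the table.
df2.write
  .format(SNOWFLAKE_SOURCE_NAME)
  .options(sfOptions)
  .option("dbtable", "CASES_BY_COUNTRY")
  .mode(SaveMode.Overwrite)
  .save()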
That is it. Working with Spark and Snowflake together is made easy by the Snowflake Connector for Spark.