In this post we will see how to implement a batch processing pipeline that moves data from Google Cloud Storage to Google Big Query using Cloud Dataflow.
Cloud Dataflow is a fully managed data processing service on Google Cloud Platform. The Apache Beam SDK lets us develop both BATCH and STREAM processing pipelines. We program our ETL/ELT flow with Beam and run it on Cloud Dataflow using the Dataflow Runner.
In this post, we will code the pipeline in Apache Beam and run it on Google Cloud Dataflow.
Code for this post can be found here.
Dataflow vs Apache Beam
People often get confused about what Apache Beam is and what Cloud Dataflow is. Before writing a pipeline, it is important to understand the difference between the two.
Apache Beam is an open source framework to create Data processing pipelines (BATCH as well as STREAM processing). The pipeline is then executed by one of Beam’s supported distributed processing back-ends, which include Apache Apex, Apache Flink, Apache Spark, and Google Cloud Dataflow.
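To follow along, the project needs the Beam core SDK, the Direct and Dataflow runners, and the Google Cloud Platform IO module (which provides BigQueryIO) on the classpath. A minimal Maven dependency sketch is shown below; the version is only a placeholder, so use whichever Beam release you are targeting.

<!-- Versions are placeholders; use the Beam release you are targeting -->
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>2.22.0</version>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-direct-java</artifactId>
    <version>2.22.0</version>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
    <version>2.22.0</version>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>2.22.0</version>
</dependency>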
Benefits of Cloud Dataflow
- Horizontal autoscaling of worker nodes
- Fully Managed Service
- Monitor the pipeline anytime during its execution
- Reliable and consistent processing
What is Google Cloud Storage?
Google Cloud Storage is a service for storing your objects. An object is an immutable piece of data consisting of a file of any format. You store objects in containers called buckets. All buckets are associated with a project. You can compare GCS buckets with Amazon S3 buckets.
What is Big Query?
Big Query is a highly scalable, cost-effective data warehouse solution on Google Cloud Platform.
Benefits of Big Query
- Analyze petabytes of data using ANSI SQL queries.
- Access data and share insights with ease
- More secure platform that scales with your needs
Batch processing from Google Cloud Storage to Big Query
Architecture Design
This is what the pipeline flow will look like. Here, the source is a Google Cloud Storage bucket and the sink is Big Query, the data warehouse offering on Google Cloud Platform.
As you can see in the screenshot above, this is how the data in the Google Cloud Storage bucket looks. We have data in the form of JSON files, which we will push into Big Query.
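For illustration, a record inside one of those JSON files could look like the following (the field names are made up for this example; the pipeline below does not depend on any particular schema):

{"order_id": "1001", "customer": "acme", "amount": 249.99, "order_date": "2020-07-01"}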
Initiate and Configure the Pipeline
The very first step is to set up the pipeline configuration. We have to specify which machine type the pipeline will use, in which region the pipeline will execute, and so on.
We can program our pipeline in Java or Python. First, we set up the DataflowPipelineOptions object, where we define the configuration of our pipeline.
We have used Direct Runner to execute and test the pipeline locally.
options.setRunner(DirectRunner.class);
Once we have tested it locally, we can replace the Direct Runner with the Dataflow Runner. That's all we need to deploy our pipeline on Cloud Dataflow.
options.setRunner(DataflowRunner.class);
Apart from this, we also need to pass other configurations to the pipeline, like the project id, maximum number of worker nodes, temp location, staging location, worker machine type, the region where our pipeline will be deployed, etc.
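Instead of hard-coding these values, Beam can also pick them up from the command line. A small sketch of that pattern, using the standard PipelineOptionsFactory.fromArgs call, is shown below; flags such as --project, --region, --tempLocation, --stagingLocation, --maxNumWorkers and --workerMachineType map to the same setters used in the next snippet.

// Parse pipeline configuration from command-line arguments instead of hard-coding it
DataflowPipelineOptions options = PipelineOptionsFactory
        .fromArgs(args)
        .withValidation()
        .as(DataflowPipelineOptions.class);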
Create Pipeline
After passing all the configurations to the DataflowPipelineOptions object, we create our Pipeline object. Refer to the snippet below for a closer look.
public class StorageToBQBatchPipeline {

    public static void main(String[] args) {

        /*
         * Initialize Pipeline Configurations
         */
        DataflowPipelineOptions options = PipelineOptionsFactory
                .as(DataflowPipelineOptions.class);
        options.setRunner(DirectRunner.class);
        options.setProject("");            // GCP project id
        options.setStreaming(false);       // this is a batch pipeline, so streaming mode is not needed
        options.setTempLocation("");       // gs:// path for temporary files
        options.setStagingLocation("");    // gs:// path for staged binaries
        options.setRegion("");             // region where the pipeline runs
        options.setMaxNumWorkers(1);
        options.setWorkerMachineType("n1-standard-1");

        Pipeline pipeline = Pipeline.create(options);
Processing Data from Source (Google Cloud Storage)
Once we create the pipeline object, we read the data from the source, which is a Google Cloud Storage bucket.
/*
 * Read files from GCS buckets
 */
PCollection<ReadableFile> data = pipeline
        .apply(FileIO.match().filepattern("gs://dump_*"))
        .apply(FileIO.readMatches());

/*
 * Create Tuple Tags to process passed as well as failed records while parsing in ParDo functions
 */
final TupleTag<KV<String, String>> mapSuccessTag = new TupleTag<KV<String, String>>() {
    private static final long serialVersionUID = 1L;
};
final TupleTag<KV<String, String>> mapFailedTag = new TupleTag<KV<String, String>>() {
    private static final long serialVersionUID = 1L;
};

PCollectionTuple mapTupleObj = data.apply(
        ParDo.of(new MapTransformation(mapSuccessTag, mapFailedTag))
                .withOutputTags(mapSuccessTag, TupleTagList.of(mapFailedTag)));
PCollection<KV<String, String>> map = mapTupleObj.get(mapSuccessTag)
        .setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
FileIO.match().filepattern("gs://dump_*")
FileIO is a connector built into the Apache Beam SDK that lets you read files from GCS.
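The file pattern can be made as specific as needed; for example, to read only JSON files under a particular prefix (the bucket and path here are hypothetical):

FileIO.match().filepattern("gs://my-dump-bucket/json/*.json")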
We have used a ParDo function to first convert the FileIO objects into key-value pair objects, as you can see below.
Using tuple tags, we make sure that only successfully processed records move on to the next step. If we face any kind of exception while processing a record, the failed record is emitted with the failed tuple tag and can be handled separately.
/*
 * Convert File to KV Object
 */
private static class MapTransformation extends DoFn<FileIO.ReadableFile, KV<String, String>> {

    static final long serialVersionUID = 1L;

    TupleTag<KV<String, String>> successTag;
    TupleTag<KV<String, String>> failedTag;

    public MapTransformation(TupleTag<KV<String, String>> successTag,
            TupleTag<KV<String, String>> failedTag) {
        this.successTag = successTag;
        this.failedTag = failedTag;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        FileIO.ReadableFile f = c.element();
        String fileName = f.getMetadata().resourceId().toString();
        String fileData = null;
        try {
            fileData = f.readFullyAsUTF8String();
            c.output(successTag, KV.of(fileName, fileData));
        } catch (Exception e) {
            c.output(failedTag, KV.of(e.getMessage(), fileName));
        }
    }
}
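The snippets here only consume the success tag. One common way to handle the failed tag, sketched below under the assumption of a hypothetical error path in GCS, is to format the failed records as text and write them out as a dead-letter file using Beam's MapElements and TextIO transforms:

// Failed records carry (exception message, file name); format them as text
// and write them to a dead-letter location so they can be inspected later.
mapTupleObj.get(mapFailedTag)
        .setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
        .apply(MapElements
                .into(TypeDescriptors.strings())
                .via((KV<String, String> kv) -> kv.getValue() + " failed with: " + kv.getKey()))
        .apply(TextIO.write().to("gs://my-dump-bucket/errors/map-failures"));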
Pushing Data to Destination (Google Big Query)
At this step, we can clean each key-value pair and apply any kind of transformation the use case or requirement calls for. In this case, we are pushing the records directly to Big Query.
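If such cleaning were needed, it would simply be one more transform between the read and the write. As a small illustrative sketch (not part of the pipeline in this post), empty files could be dropped before conversion using Beam's Filter transform:

// Keep only key/value pairs whose file content is non-empty
PCollection<KV<String, String>> nonEmpty = map
        .apply(Filter.by((KV<String, String> kv) ->
                kv.getValue() != null && !kv.getValue().trim().isEmpty()));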
Before pushing the records into BQ, we first have to convert the key-value pairs into Big Query TableRow objects. See the snippet below for that.
/*
 * Convert KV to Table Row
 */
private static class TableRowTransformation extends DoFn<KV<String, String>, TableRow> {

    static final long serialVersionUID = 1L;

    TupleTag<TableRow> successTag;
    TupleTag<TableRow> failedTag;

    public TableRowTransformation(TupleTag<TableRow> successTag,
            TupleTag<TableRow> failedTag) {
        this.successTag = successTag;
        this.failedTag = failedTag;
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        try {
            KV<String, String> kvObj = c.element();
            TableRow tableRow = new TableRow();
            tableRow.set(kvObj.getKey(), kvObj.getValue());
            c.output(successTag, tableRow);
        } catch (Exception e) {
            TableRow tableRow = new TableRow();
            c.output(failedTag, tableRow);
        }
    }
}
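The snippet above defines the DoFn but does not show how it is wired into the pipeline or where the rowObj collection used in the next step comes from. A sketch of that wiring, mirroring the tuple-tag pattern used for MapTransformation (the variable names here are assumptions), could look like this:

// Tuple tags for rows that convert cleanly and rows that fail conversion
final TupleTag<TableRow> rowSuccessTag = new TupleTag<TableRow>() {
    private static final long serialVersionUID = 1L;
};
final TupleTag<TableRow> rowFailedTag = new TupleTag<TableRow>() {
    private static final long serialVersionUID = 1L;
};

// Convert the KV pairs to TableRow objects; rowObj feeds the BigQuery write below
PCollectionTuple rowTupleObj = map.apply(
        ParDo.of(new TableRowTransformation(rowSuccessTag, rowFailedTag))
                .withOutputTags(rowSuccessTag, TupleTagList.of(rowFailedTag)));
PCollection<TableRow> rowObj = rowTupleObj.get(rowSuccessTag)
        .setCoder(TableRowJsonCoder.of());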
Once we have converted the objects into TableRow objects, we can push the records into the table using the built-in Big Query connector in the Apache Beam SDK.
As we can see, the BQ connector gives us a bunch of options. We have to pass the name of the table where the records will be saved.
/*
 * Push records to BQ.
 */
rowObj.apply(BigQueryIO.writeTableRows()
        .to(options.getOutputTable()) // fully qualified table, e.g. project:dataset.table
        .ignoreUnknownValues()
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

pipeline.run().waitUntilFinish();
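One thing to note: getOutputTable() is not part of DataflowPipelineOptions, so the pipeline would need a small custom options interface along the lines of the sketch below (the option name is an assumption). Also, with CREATE_IF_NEEDED the BigQuery connector generally needs a table schema supplied via withSchema so it can create the table if it does not exist.

// Custom pipeline options adding an output table parameter,
// passed on the command line as --outputTable=project:dataset.table
public interface StorageToBQOptions extends DataflowPipelineOptions {

    @Description("Fully qualified BigQuery output table, e.g. project:dataset.table")
    String getOutputTable();

    void setOutputTable(String value);
}

The main method would then create the options with PipelineOptionsFactory.fromArgs(args).withValidation().as(StorageToBQOptions.class).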
Once the pipeline gets deployed, we can see the monitoring details on the right-hand side. As you can see in the images below, the configuration that we passed to the pipeline is visible there.
As Dataflow is a managed offering on Google Cloud Platform, we can also define the autoscaling algorithm for the pipeline.
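For example, throughput-based autoscaling can be requested through the worker pool options on the Dataflow runner; the worker ceiling below is just an illustration:

// Scale the worker pool based on throughput, up to a maximum of 5 workers
options.setAutoscalingAlgorithm(
        DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.THROUGHPUT_BASED);
options.setMaxNumWorkers(5);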
The monitoring section lets us know how many worker machines are currently in use, the CPU utilization of the pipeline, and so on.