How to Develop a Data Processing Job Using Apache Beam

  • Alexey Romanenko
    Alexey Romanenko is an Open Source Engineer at Talend (France) with more than 15 years of experience in software development. During his career, he has worked on a range of projects, including high-load web services, web search engines, and cloud storage. He has also developed and presented a course on Hadoop/Cloud technologies for students. Recently, he joined the Apache Beam project as a new contributor. He spends his spare time with his family and likes to play ice hockey.
  • April 23, 2018

 

This blog post is part 1 of a series of blog posts on Apache Beam.

Are you familiar with Apache Beam? If not, don’t be ashamed. As one of the latest projects at the Apache Software Foundation, first released in June 2016, Apache Beam is still relatively new in the data processing world. In fact, it wasn’t until recently, when I started to work closely with Apache Beam, that I loved to learn and learned to love everything about it.

Apache Beam is a unified programming model that provides an easy way to implement batch and streaming data processing jobs and run them on any of the supported execution engines, using a set of different IOs. Sounds promising but still confusing? This is why I decided to launch a series of blog posts on Apache Beam. In this post, and in the following ones, I’ll show concrete examples and highlight several use cases of data processing jobs using Apache Beam.

Our topic for today is batch processing. Let’s take the following example: You work for a car dealership and want to analyze car sales over a given period of time (e.g. how many cars of each brand were sold?). This means that our data set is bounded (finite amount of data) and it won’t be updated (the sales happened in the past). In this case, we can rely on a batch process to analyze our data.

As input data, we have text logs of sold cars in the following format:

id,brand_name,model_name,sales_number

For example:
1,Toyota,Prius,3
2,Nissan,Sentra,2
3,Ford,Fusion,4

Before we start implementing our first Beam application, we need to be aware of a few core ideas that are used throughout. There are three main concepts in Beam: Pipeline, PCollection, and PTransform.

  • Pipeline encapsulates the workflow of your entire data processing task from start to finish.
  • PCollection is a distributed dataset abstraction that Beam uses to transfer data between PTransforms.
  • PTransform is a process that operates on input data (an input PCollection) and produces output data (an output PCollection). Usually, the first and the last PTransforms represent a way to read and write data, which can be bounded (batch processing) or unbounded (streaming processing).

To simplify things, we can consider a Pipeline as a DAG (directed acyclic graph) that represents your whole workflow, with PTransforms as the nodes (which transform the data) and PCollections as the edges of this graph. More information can be found in the Beam Programming Guide.
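
To make the graph picture a little more concrete, here is a minimal plain-Java sketch. It is not Beam code: each "transform" is modeled as a function from one in-memory list to another, and chaining two of them gives a tiny linear pipeline. The class name LinearPipelineSketch and the helpers mapEach and brands are made up for this illustration, and real PCollections are distributed datasets rather than Java lists.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical illustration only: lists play the role of PCollections (edges),
// functions over lists play the role of PTransforms (nodes).
final class LinearPipelineSketch {

    // Lifts a per-element function into a "transform" over a whole list.
    static <A, B> Function<List<A>, List<B>> mapEach(Function<A, B> fn) {
        return input -> input.stream().map(fn).collect(Collectors.toList());
    }

    // Two chained transforms: split each line into fields, then keep the brand.
    static List<String> brands(List<String> lines) {
        Function<List<String>, List<String[]>> splitFields = mapEach(line -> line.split(","));
        Function<List<String[]>, List<String>> pickBrand = mapEach(fields -> fields[1]);
        return splitFields.andThen(pickBrand).apply(lines);
    }

    public static void main(String[] args) {
        System.out.println(brands(Arrays.asList("1,Toyota,Prius,3", "2,Nissan,Sentra,2")));
        // prints [Toyota, Nissan]
    }
}
```

The output of one step is the only thing the next step sees, which is the same property that lets Beam treat each PTransform as an independent node in the graph.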

Now, let’s get back to our example and implement our first pipeline, which will process the provided data set.

Creating a pipeline

First, just create a new pipeline:

Pipeline pipeline = Pipeline.create();

Then, let’s create a new PTransform using the pipeline.apply() method, which will read data from a text file and create a new PCollection of strings. To do this, we use one of the IOs already implemented in Beam - TextIO. TextIO allows you to read from and write to text files line by line. It has many other features, such as working with different file systems, supporting file patterns, and streaming of files. For more information, see the Apache Beam documentation.

pipeline.apply(TextIO.read().from("/path/to/input/file"))


The output of this PTransform is a new instance of PCollection<String>, where every entry of the collection is one line of text from the input file.

Since we want the total number of sales per brand as a result, we must group the sales accordingly. Therefore, the next step is to parse every line and create a key/value pair where the key is the brand name and the value is the number of sales. It’s worth mentioning that the output PCollection of the previous PTransform is the input PCollection for this one.


In this step, we use a built-in Beam PTransform called MapElements to create a new key/value pair for every input entry, using a provided implementation of the SimpleFunction class.
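
As a rough illustration of what such a function might do for each line, here is a plain-Java sketch of the parsing logic on its own. It is not the original listing: SaleLineParser is a hypothetical name, and a standard Map.Entry stands in for Beam's KV type so that the snippet needs nothing beyond the JDK. Rejecting malformed lines with an exception is also an assumption made for this sketch.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

// Hypothetical helper: parses one log line of the form
// id,brand_name,model_name,sales_number into a (brand, sales) pair.
final class SaleLineParser {

    static Map.Entry<String, Integer> parse(String line) {
        String[] fields = line.trim().split(",");
        if (fields.length < 4) {
            // Assumption: malformed lines are rejected rather than skipped.
            throw new IllegalArgumentException("Expected 4 comma-separated fields: " + line);
        }
        String brand = fields[1].trim();                // key: brand name
        int sales = Integer.parseInt(fields[3].trim()); // value: number of sales
        return new SimpleImmutableEntry<>(brand, sales);
    }

    public static void main(String[] args) {
        Map.Entry<String, Integer> pair = parse("1,Toyota,Prius,3");
        System.out.println(pair.getKey() + " -> " + pair.getValue());
        // prints Toyota -> 3
    }
}
```

In a Beam pipeline, the same body would sit inside the apply method of a SimpleFunction<String, KV<String, Integer>> passed to MapElements.via(...), returning KV.of(brand, sales) instead of a Map.Entry.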

We then group the number of sales by brand using another Beam transform, GroupByKey. As output, we get a PCollection of key/value pairs where the key is the brand name and the value is an iterable collection of sales for that brand.

.apply(GroupByKey.<String, Integer>create())
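
Conceptually, the grouping step turns a flat collection of (brand, sales) pairs into one entry per brand. The following is a small in-memory analogy in plain Java, assuming the pairs are represented as Map.Entry values. GroupByBrandSketch is a hypothetical name; Beam performs this grouping in a distributed way and makes no promise about the order of keys or values.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory analogy of grouping values by key.
final class GroupByBrandSketch {

    static Map<String, List<Integer>> group(List<Map.Entry<String, Integer>> pairs) {
        Map<String, List<Integer>> grouped = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> pair : pairs) {
            // Create the per-brand list on first sight, then append the value.
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
        }
        return grouped;
    }
}
```

For the pairs (Toyota, 3), (Nissan, 2), (Toyota, 4), this yields Toyota mapped to [3, 4] and Nissan mapped to [2].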

Now we are ready to sum up the car sales per brand using a ParDo transform with our own processing function.
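
The per-brand summing logic can be sketched in plain Java as follows. This is an approximation of what the processing function would do for one grouped element, not the original listing. BrandTotalFormatter is a hypothetical name, and the "brand: total" output format is inferred from the sample report shown later in this post.

```java
// Hypothetical helper: sums all sales for one brand and formats a report line.
final class BrandTotalFormatter {

    static String format(String brand, Iterable<Integer> sales) {
        int total = 0;
        for (int amount : sales) {
            total += amount;
        }
        return brand + ": " + total;
    }

    public static void main(String[] args) {
        System.out.println(format("Toyota", java.util.Arrays.asList(3, 4)));
        // prints Toyota: 7
    }
}
```

In Beam, this logic would live in a DoFn method annotated with @ProcessElement, taking a KV<String, Iterable<Integer>> as input and emitting the formatted String as its output element.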

 

To finalize the pipeline, we apply another IO transform that takes the PCollection of strings and writes it to a text file:

.apply(TextIO.write().to("/path/to/output/dir").withoutSharding());

The last thing we need to do is run the pipeline we created:

pipeline.run();

Looks quite easy, doesn’t it? This is the power of Apache Beam, which lets you create complex data processing pipelines with a minimal amount of code.

Those of you familiar with Hadoop may have noticed that this pipeline resembles something:

  • It reads and parses text data line by line creating new key/value pairs (Map)
  • Then groups these key/values by key (GroupBy)
  • Finally, it iterates over all values of one key applying some user function (Reduce)

Yes, that’s true - this simple pipeline could be implemented as a classic MapReduce job! But compare how much simpler and clearer it looks in Beam (despite being in Java!). And if we decide to extend the pipeline by adding another transform, it won’t become much more complicated.

Building and running a pipeline

As I mentioned before, a Beam pipeline can be run on different runners (processing engines):

  • Direct Runner
  • Apache Apex
  • Apache Flink
  • Apache Gearpump
  • Apache Spark
  • Google Cloud Dataflow

To do this, we just need to add the corresponding dependency to our Maven or Gradle project configuration. The good thing is that we don’t have to adjust or rewrite the pipeline code to run it on each runner. Even better, we don’t have to recompile our jars if all required runner dependencies were already included - we just need to choose which runner to use and that’s it!

Direct Runner is a local runner which is usually used to test your pipeline. When using Java, you must specify your dependency on the Direct Runner in your pom.xml.

<dependency>
   <groupId>org.apache.beam</groupId>
   <artifactId>beam-runners-direct-java</artifactId>
   <version>2.4.0</version>
   <scope>runtime</scope>
</dependency>

After that, compile your project:
# mvn clean package

And run your pipeline on the Direct Runner:
# mvn exec:java -Dexec.mainClass=org.apache.beam.tutorial.analytic.SalesPerCarsBrand -Pdirect-runner -Dexec.args="--runner=DirectRunner"

For example, if our input file contains the following data:
# cat /tmp/beam/cars_sales_log
 1,Toyota,Prius,3
 2,Nissan,Sentra,2
 1,Toyota,Yaris,4
 3,Ford,Fusion,5
 3,Ford,Kuga,3

Then the final result will look like this:
# cat /tmp/beam/cars_sales_report
Toyota: 7
Nissan: 2
Ford: 8
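
As a sanity check on these numbers, the whole computation can be reproduced in a few lines of plain Java. This sketch does not use Beam. SalesReportCheck is a hypothetical name, and a LinkedHashMap is used only so that the brands come out in first-seen order, which happens to match the report above (a distributed runner does not guarantee any particular order).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-machine equivalent of the pipeline, for checking results.
final class SalesReportCheck {

    static List<String> report(List<String> lines) {
        // Parse each line and accumulate sales per brand (map + group + reduce in one pass).
        Map<String, Integer> totals = new LinkedHashMap<>();
        for (String line : lines) {
            String[] fields = line.trim().split(",");
            totals.merge(fields[1], Integer.parseInt(fields[3]), Integer::sum);
        }
        // Format one "brand: total" line per brand.
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : totals.entrySet()) {
            result.add(entry.getKey() + ": " + entry.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "1,Toyota,Prius,3",
            "2,Nissan,Sentra,2",
            "1,Toyota,Yaris,4",
            "3,Ford,Fusion,5",
            "3,Ford,Kuga,3");
        report(lines).forEach(System.out::println);
        // prints:
        // Toyota: 7
        // Nissan: 2
        // Ford: 8
    }
}
```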

The list of all supported runners, along with instructions on how to use them, can be found on this page.

Finally, all the code for this example is published in this GitHub repository: https://github.com/aromanenko-dev/beam-tutorial

In the next part of this blog post series, I will talk about streaming data processing in Beam. I’ll take another example of a data analytics task, this time with an unbounded data source, and we will see what Beam provides for this case.
