How to Develop a Data Processing Job Using Apache Beam – Streaming Pipelines

  • Alexey Romanenko
    Alexey Romanenko is an Open Source Engineer at Talend (France) with more than 15 years of experience in software development. During his career, he has worked on a range of projects, including high-load web services, web search engines and cloud storage. He has also developed and presented a course on Hadoop/Cloud technologies for students. Recently, he joined the Apache Beam project as a new contributor. He spends his spare time with his family and likes to play ice hockey.

In our last blog post, we talked about developing data processing jobs using Apache Beam. This time we are going to talk about one of the most in-demand topics in the modern Big Data world: processing streaming data.

The principal difference between batch and streaming is the type of input data source. When your data set is limited (even if it is huge in terms of size) and it is not updated while it is being processed, you would likely use a batch pipeline. The input source, in this case, can be anything: files, database tables, objects in object storage, and so on. I want to underline one more time that, with batching, we assume that the data is immutable for the whole processing time and that the number of input records is constant. Why should we pay attention to this? Because even with files we can have an unlimited data stream when files are continually added or changed. In that case, we have to apply a streaming approach to work with the data. So, if we know that our data is limited and immutable, then we need to develop a batch pipeline.

Things get more complicated when our data set is unlimited (continuously arriving) and/or mutable. Examples of such sources include messaging systems (like Apache Kafka), new files in a directory (web server logs) or some other system collecting real-time data (like IoT sensors). The common theme among all of these sources is that we always have to wait for new data. Of course, we can split our data into batches (by time or by data size) and process every split in a batch way, but it would be quite difficult to apply some functions across all consumed data sets and to build a whole pipeline this way. Luckily, there are several streaming engines that allow us to cope with this type of data processing easily: Apache Spark, Apache Flink, Apache Apex and Google Cloud Dataflow. All of them are supported by Apache Beam, and we can run the same pipeline on different engines without any code changes. Moreover, we can use the same pipeline in batch or in streaming mode with minimal changes: one just needs to set the input source properly and voilà, everything works out of the box! Just like magic! I would have dreamed of this a while ago, when I was rewriting my batch jobs into streaming ones.

So, enough theory - it’s time to take an example and write our first streaming code. We are going to read some data from Kafka (unbounded source), perform some simple data processing and write results back to Kafka as well.

Let's suppose we have an unlimited stream of geo-coordinates (X and Y) of some objects on a map (for this example, let's say the objects are cars) which arrives in real time, and we want to select only those that are located inside a specified area. In other words, we have to consume text data from a Kafka topic, parse it, filter it by the specified limits and write it back into another Kafka topic. Let’s see how we can do this with the help of Apache Beam.

Every Kafka message contains text data in the following format:
id,x,y

where:
  id – unique id of the object,
  x, y - coordinates on the map (integers).

We also need to handle records that do not match this format: invalid records are skipped.
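To make the format handling concrete, here is a minimal sketch in plain Java (no Beam dependency) of how one "id,x,y" message could be parsed while rejecting malformed records. The class name ObjectPosition, the choice of String for the id and the whitespace trimming are assumptions made for illustration; they are not taken from the example repository.

```java
import java.util.Optional;

/** Hypothetical value object for one parsed "id,x,y" message (illustration only). */
final class ObjectPosition {
    final String id; // assumption: the id is kept as text
    final int x;
    final int y;

    private ObjectPosition(String id, int x, int y) {
        this.id = id;
        this.x = x;
        this.y = y;
    }

    /** Parses "id,x,y"; returns Optional.empty() for any malformed record. */
    static Optional<ObjectPosition> parse(String line) {
        if (line == null) {
            return Optional.empty();
        }
        // A limit of -1 keeps trailing empty fields, so "1,2,3," is seen as four fields
        String[] parts = line.split(",", -1);
        if (parts.length != 3 || parts[0].trim().isEmpty()) {
            return Optional.empty();
        }
        try {
            int x = Integer.parseInt(parts[1].trim());
            int y = Integer.parseInt(parts[2].trim());
            return Optional.of(new ObjectPosition(parts[0].trim(), x, y));
        } catch (NumberFormatException e) {
            // Non-integer coordinates: treat the record as invalid so it can be skipped
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("car-1,10,20").isPresent());  // prints "true"
        System.out.println(parse("car-1,ten,20").isPresent()); // prints "false"
    }
}
```

Returning an empty Optional instead of throwing keeps one bad message from failing the whole job, which matters in a pipeline that is meant to run continuously.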

Creating a pipeline

Much like in our previous blog post, where we did batch processing, we create a pipeline in the same way:

Pipeline pipeline = Pipeline.create(options);


We can extend the Options object to pass command-line options into the pipeline. Please see the whole example on GitHub for more details.
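As a rough idea of what such an options object might look like, here is a sketch of a custom Beam options interface. The getter names are inferred from the calls used in the snippets of this post (getBootstrap, getInputTopic, getOutputTopic, getCoordX, getCoordY); the interface name FilterObjectsOptions, the descriptions and all default values are assumptions, so the actual code in the repository may differ.

```java
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;

/** Hypothetical options interface; names of getters follow the calls used in this post. */
public interface FilterObjectsOptions extends PipelineOptions {

    @Description("Kafka bootstrap servers")
    @Default.String("localhost:9092") // assumed default
    String getBootstrap();
    void setBootstrap(String value);

    @Description("Kafka topic to read from")
    @Default.String("input") // assumed default
    String getInputTopic();
    void setInputTopic(String value);

    @Description("Kafka topic to write filtered records to")
    @Default.String("output") // assumed default
    String getOutputTopic();
    void setOutputTopic(String value);

    @Description("Limit of the area along the X axis")
    @Default.Integer(100) // assumed default
    Integer getCoordX();
    void setCoordX(Integer value);

    @Description("Limit of the area along the Y axis")
    @Default.Integer(100) // assumed default
    Integer getCoordY();
    void setCoordY(Integer value);
}
```

With an interface like this, the options would typically be created with PipelineOptionsFactory.fromArgs(args).withValidation().as(FilterObjectsOptions.class), and Beam maps each getter to a command-line flag of the same name, for example --inputTopic for getInputTopic().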

Then, we have to read data from the Kafka input topic. As stated before, Apache Beam already provides a number of different IO connectors, and KafkaIO is one of them. Therefore, we create a new unbounded PTransform which consumes arriving messages from the specified Kafka topic and propagates them to the next step:

pipeline.apply(
    KafkaIO.<Long, String>read()
        .withBootstrapServers(options.getBootstrap())
        .withTopic(options.getInputTopic())
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class))


By default, KafkaIO wraps every consumed message in a KafkaRecord object. The next transform, however, needs only the payload (the string value), so it extracts it with a newly created DoFn object:

.apply(
    ParDo.of(
        new DoFn<KafkaRecord<Long, String>, String>() {
            @ProcessElement
            public void processElement(ProcessContext processContext) {
                KafkaRecord<Long, String> record = processContext.element();
                processContext.output(record.getKV().getValue());
            }
        }
    )
)


After this step, it is time to filter the records (see the initial task stated above), but before we do that, we have to parse each string value according to the defined format. Both the parsing and the check can be encapsulated in one function object, which is then used by Beam’s built-in Filter transform.

.apply(
    "FilterValidCoords",
    Filter.by(new FilterObjectsByCoordinates(
        options.getCoordX(), options.getCoordY()))
)
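The filter function itself is not shown in this post, so the following is only a sketch of the kind of logic it could contain, written in plain Java so that it runs without Beam. It assumes that the two constructor arguments are exclusive upper limits and that the area starts at (0, 0); both are assumptions, as is the class name InAreaFilter. In the real pipeline, the class would implement Beam's SerializableFunction<String, Boolean> (with an apply method) rather than java.util.function.Predicate.

```java
import java.io.Serializable;
import java.util.function.Predicate;

/** Hypothetical stand-in for the filter function: keeps records inside [0, maxX) x [0, maxY). */
final class InAreaFilter implements Predicate<String>, Serializable {
    private final int maxX;
    private final int maxY;

    InAreaFilter(int maxX, int maxY) {
        this.maxX = maxX;
        this.maxY = maxY;
    }

    @Override
    public boolean test(String line) {
        // Expected format: id,x,y (a limit of -1 keeps trailing empty fields)
        String[] parts = line.split(",", -1);
        if (parts.length != 3) {
            return false; // wrong number of fields: skip the record
        }
        try {
            int x = Integer.parseInt(parts[1].trim());
            int y = Integer.parseInt(parts[2].trim());
            // Assumed semantics: the area spans from the origin up to the given limits
            return x >= 0 && x < maxX && y >= 0 && y < maxY;
        } catch (NumberFormatException e) {
            return false; // non-integer coordinates: skip the record
        }
    }

    public static void main(String[] args) {
        InAreaFilter filter = new InAreaFilter(100, 100);
        System.out.println(filter.test("7,10,20"));  // prints "true"
        System.out.println(filter.test("7,150,20")); // prints "false"
    }
}
```

Returning false for malformed input means that invalid records are dropped by the same transform that applies the area check, so no separate validation step is needed.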


Then, we have to prepare the filtered messages to be written back to Kafka by creating a new key/value pair with Beam’s built-in KV class, which can be used across different IO connectors, including KafkaIO.

.apply(
    "ExtractPayload",
    ParDo.of(
        new DoFn<String, KV<String, String>>() {
            @ProcessElement
            public void processElement(ProcessContext c) throws Exception {
                // Wrap each filtered record in a key/value pair with a constant key
                c.output(KV.of("filtered", c.element()));
            }
        }
    )
)


The final transformation writes the messages into Kafka, so we simply use KafkaIO.write(), the sink implementation, for this purpose. As with reading, we have to configure this transform with some required options, such as the Kafka bootstrap servers, the output topic name and the serializers for the key and value.

.apply(
    "WriteToKafka",
    KafkaIO.<String, String>write()
        .withBootstrapServers(options.getBootstrap())
        .withTopic(options.getOutputTopic())
        .withKeySerializer(org.apache.kafka.common.serialization.StringSerializer.class)
        .withValueSerializer(org.apache.kafka.common.serialization.StringSerializer.class)
);


In the end, we just run our pipeline as usual:

pipeline.run();


This time it may seem a bit more complicated than in the previous blog post but, as one can easily notice, we didn’t do anything specific to make our pipeline streaming-compatible. That is entirely the responsibility of the Apache Beam data model implementation, which makes it very easy for Beam users to switch between batch and streaming processing.

Building and running a pipeline

Let’s add the required dependencies to make it possible to use Beam KafkaIO:

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-kafka</artifactId>
  <version>2.4.0</version>
</dependency>

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>1.1.0</version>
</dependency>

Then, just build a jar and run it with DirectRunner to test how it works:

# mvn clean package
# mvn exec:java -Dexec.mainClass=org.apache.beam.tutorial.analytic.FilterObjects -Pdirect-runner -Dexec.args="--runner=DirectRunner"

If needed, we can add other arguments used in the pipeline with the help of the “exec.args” option. Also, make sure that your Kafka servers are available and properly specified before running the Beam pipeline. Lastly, the Maven command will launch the pipeline and run it until it is stopped manually (optionally, it is possible to specify a maximum running time). This means that data will be processed continuously, in streaming mode.

As usual, all the code for this example is published in this GitHub repository.

Happy streaming!
