Apache Beamを使用したデータ処理ジョブの開発 – ストリーミングパイプライン

前回のブログでは、Apache Beamを使ったデータ処理ジョブの開発について紹介しました。今回は、現代のビッグデータ処理で非常にニーズの大きなストリーミングデータの処理について説明します。

バッチストリーミングの主な違いは、入力データソースのタイプです。データセットが限られていて(サイズが巨大でも)、処理中に更新されない場合は、バッチパイプラインを使用する可能性が高くなります。この場合、入力ソースはファイル、データベーステーブル、オブジェクトストレージ内のオブジェクトなど、何でもかまいません。もう一度強調しますが、バッチ処理では、処理の期間全体でデータが変更可能であり、入力レコードの数は一定です。この点に注意しなければならないのは、ファイルについても、常にファイルの追加や変更が行われるとデータストリームが無限になる可能性があるためです。その場合は、データを処理するためにストリーミングアプローチを適用する必要があります。したがって、データが限られていて不変であることがわかっている場合は、バッチ処理パイプラインを開発する必要があります。

データセットが無制限(継続的に到着)/可変の場合は、処理がより複雑になります。ソースの例としては、メッセージシステム(Apache Kafkaなど)、ディレクトリー内の新しいファイル(Webサーバーログなど)、その他のリアルタイムデータを収集するシステム(IoTセンサーなど)といったものがあります。これらすべての情報源に共通しているのは、常に新しいデータを待たなければならないという点です。もちろん、データを(時間ごとまたはデータサイズごとに)バッチに分割し、分割ごとにバッチ処理することも可能です。しかし、一部の関数については、すべての消費データセットに適用し、そのためのパイプラインを丸ごと作るのが困難です。幸いなことに、この種のデータ処理に簡単に対処できるストリーミングエンジンとして、Apache SparkApache FlinkApache ApexGoogle DataFlowを使用できます。これらはすべてApache Beamによってサポートされ、コードを変更することなく、異なるエンジンで同じパイプラインを実行できます。さらに、最小限の変更でバッチ処理でもストリーミングモードでも同じパイプラインを使用できます。入力パイプラインを正しく設定するだけで、即座に使用できます。バッチジョブをストリーミングジョブに書き換えていた頃から、このような機能があれば素晴らしいだろうと考えていました。

理屈はさておき、例を使用して最初のストリーミングコードを記述していきましょう。Kafka(無制限のソース)からデータを読み込み、簡単なデータ処理を実行し、結果をKafkaにも書き戻します。

リアルタイムで到着する地図上のいくつかのオブジェクトの地理座標(XとY)の無限の流れ(この例では、オブジェクトは車だとしましょう)があり、特定地域にあるものだけを選択したい場合を考えます。つまり、我々はKafkaトピックからテキストデータを消費し、それを解析し、指定された制限でフィルタリングし、別のKafkaトピックに書き戻す必要があります。Apache Beamを利用してこれを実現する方法を見ていきましょう。

それぞれのKafkaメッセージには、次の形式のテキストデータが含まれています。
id,x,y

このとき:
id – オブジェクトの一意のID
x, y – 上の座標(整数)

形式に注意し、有効でない場合はスキップします。

パイプラインの作成

前回のブログでのバッチ処理と同じ方法でパイプラインを作成します。

Pipeline pipeline = Pipeline.create(options);

Optionsオブジェクトを詳細に指定することで、コマンドラインオプションをパイプラインに渡すことができます。詳しくはGithubを参照してください。

次に、Kafkaの入力トピックからデータを読み込みます。すでに述べたように、Apache BeamはさまざまなIOコネクターを提供しています。KafkaIOもその1つです。したがって、指定されたKafkaトピックからの着信メッセージを消費し、それらをさらに次のステップに伝播する新しい無制限のPTransformを作成します。

pipeline.apply(
    KafkaIO.<Long, String>read()
        .withBootstrapServers(options.getBootstrap())
        .withTopic(options.getInputTopic())
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializer(StringDeserializer.class))

デフォルトでは、KafkaIOは消費されるすべてのメッセージをKafkaRecordオブジェクトにカプセル化します。ただし、次の変換は新しく作成されるDoFnオブジェクトによってペイロード(文字列値)を取得するだけです。

.apply(
    ParDo.of(
        new DoFn<KafkaRecord<Long, String>, String>() {
            @ProcessElement
            public void processElement(ProcessContext processContext) {
                KafkaRecord<Long, String> record = processContext.element();
                processContext.output(record.getKV().getValue());
            }
        }
    )
)

このステップの後、レコードをフィルタリングします(上記の最初のタスクを参照)。その前に、定義されたフォーマットに従って文字列値を解析する必要があります。これにより、Beam内部変換Filterで使用される単一の機能オブジェクトにカプセル化できます。

.apply(
    "FilterValidCoords",
    Filter.by(new FilterObjectsByCoordinates(
        options.getCoordX(), options.getCoordY()))
)

次に、Kafkaに書き戻すためのフィルタリングされたメッセージを準備します。そのために、KafkaIOを含むさまざまなIOコネクターで使用できる内部Beam KVクラスを使用して、新しいキー/値ペアを作成します。

.apply(
    "ExtractPayload",
    ParDo.of(
        new DoFn<String, KV<String, String>>() {
           @ProcessElement
           public void processElement(ProcessContext c) throws Exception {
                c.output(KV.of("filtered", c.element()));
           }
        }
    )
)

最後の変換は、Kafkaにメッセージを書き込むために必要なのです。そのため、これらの目的のために単純にKafkaIO.write()(シンクの実装)を使用します。読み込みについては、Kafkaブートストラップサーバー、出力トピック名、キー/値のシリアライザーなど、いくつかの必須オプションを使用して、この変換を構成する必要があります。

.apply(
    "WriteToKafka",
    KafkaIO.<String, String>write()
        .withBootstrapServers(options.getBootstrap())
        .withTopic(options.getOutputTopic())
        .withKeySerializer(org.apache.kafka.common.serialization.StringSerializer.class)
        .withValueSerializer(org.apache.kafka.common.serialization.StringSerializer.class)
);

最後に、パイプラインを通常どおり実行します。

pipeline.run();

今回の操作は、前回よりも若干複雑に思えるかもしれませんが、パイプラインストリーミング互換にするために特別なことは何も行っていません。それはApache Beamデータモデル実装が担っているので、Beamユーザーにとってバッチ処理とストリーミング処理の切り替えが非常に簡単になっています。

パイプラインの構築と実行

Beam KafkaIOを使用できるようにするために必要な依存関係を追加しましょう。

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-kafka</artifactId>
  <version>2.4.0</version>
</dependency>

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>1.1.0</version>
</dependency>

次に、jarを作成してDirectRunnerで実行し、動作をテストします。

# mvn clean package
# mvn exec:java -Dexec.mainClass=org.apache.beam.tutorial.analytic.FilterObjects -Pdirect-runner -Dexec.args=”–runner=DirectRunner”

必要に応じて、「exec.args」オプションを使用して、パイプラインで使用されている他の引数を追加できます。また、Beamパイプラインを実行する前に、Kafkaサーバーが使用可能で適切に指定されていることを確認します。最後に、Mavenコマンドがパイプラインを起動し、手動で終了するまで永続的に実行します(任意に最大実行時間を指定することもできます)。つまり、データがストリーミングモードで継続的に処理されます。

いつものように、この例で使用したコードはGithubリポジトリで公開されています。

ストリーミングの成功をお祈りしています!

ディスカッションに参加

0 Comments

Leave a Reply