注目の投稿

Talend、Q2 2020 Forrester WaveTMのEnterprise Data Fabricリーダーの一社に選ばれる

詳細情報
注目の投稿

Talend Winter’20で、データからインテリジェンスを引き出す

詳細情報
注目の投稿

Pipeline Designerの紹介:データ統合の革新

詳細情報
注目の投稿

オンプレミスからクラウドにデータを移行する方法:Amazon S3

詳細情報
注目の投稿

データガバナンス戦略を構築するために重要な5つの考慮事項

詳細情報

データレイク構築で陥りやすい3つの落とし穴とその回避方法

| 2018年8月8日 | Big Data/Data Lake

最近、北米大手銀行のIT部門のSVP(シニアバイスプレジデント)とデジタルトランスフォーメーション戦略について話す機会がありました。その中で、ビッグデータやデジタルトランスフォーメーションに対するアプローチが絶え間なく進化しているという話が印象的でした。市場に新しく登場してきたテクノロジーの機能をビジネスに生かすには、新たな軸足やアプローチが必要です。データとアナリティクスの成長を維持/拡張できる俊敏性の高いアーキテクチャーを使用することが、これまで以上に重要になっています。ここでは、データレイクの構築で陥りやすい3つの落とし穴と、その回避方法について説明したいと思います。 「取り込みツールさえあればよい」 データレイクを構築すれば、あらゆる課題を解決できると思われがちです。確かに、データの格納場所ができることは成果と言えます。多くの場合、最初に課題となるのがデータの取り込みです。データレイクに流れ込む種類も量も莫大なデータの収集/取り込みに対処する必要があり、データさえ収集できれば簡単に目標を達成できると考え、データ取り込みソリューションを購入します。データのキャプチャーと収集は可能になりますが、これで問題が解決したのではありません。一時的には問題ないかもしれませんが、真の取り組みはこれからです。 データをデータレイクに格納することが始まりでしかないことは、すぐに明らかになります。多くのプロジェクトは、「データスワンプ(沼)」の問題により失敗します。これは、データレイクが構造を持たず、品質が低いうえに、人材も不足し、実際にデータがどこから来たのかトレースすることもできない状況を指します。生データは、そのままでは有用性が低く、データから質の高いアナリティクスを行うには、まずデータを処理し、クレンジングし、変換する必要があります。これが、2つ目の落とし穴につながります。 データレイクのハンドコーディング We have had many blogs in the past on this, but you can’t emphasize this topic enough. It’s strikingly true that hand coding may look promising from the initial deployment costs, but the maintenance costs can increase by upwards of 200%. The …

Read Article

Apache Beamを使用したデータ処理ジョブの開発 – ストリーミングパイプライン

| 2018年8月7日 | Open Source Streaming Data

前回のブログでは、Apache Beamを使ったデータ処理ジョブの開発について紹介しました。今回は、現代のビッグデータ処理で非常にニーズの大きなストリーミングデータの処理について説明します。 バッチとストリーミングの主な違いは、入力データソースのタイプです。データセットが限られていて(サイズが巨大でも)、処理中に更新されない場合は、バッチパイプラインを使用する可能性が高くなります。この場合、入力ソースはファイル、データベーステーブル、オブジェクトストレージ内のオブジェクトなど、何でもかまいません。もう一度強調しますが、バッチ処理では、処理の期間全体でデータが変更可能であり、入力レコードの数は一定です。この点に注意しなければならないのは、ファイルについても、常にファイルの追加や変更が行われるとデータストリームが無限になる可能性があるためです。その場合は、データを処理するためにストリーミングアプローチを適用する必要があります。したがって、データが限られていて不変であることがわかっている場合は、バッチ処理パイプラインを開発する必要があります。 データセットが無制限(継続的に到着)/可変の場合は、処理がより複雑になります。ソースの例としては、メッセージシステム(Apache Kafkaなど)、ディレクトリー内の新しいファイル(Webサーバーログなど)、その他のリアルタイムデータを収集するシステム(IoTセンサーなど)といったものがあります。これらすべての情報源に共通しているのは、常に新しいデータを待たなければならないという点です。もちろん、データを(時間ごとまたはデータサイズごとに)バッチに分割し、分割ごとにバッチ処理することも可能です。しかし、一部の関数については、すべての消費データセットに適用し、そのためのパイプラインを丸ごと作るのが困難です。幸いなことに、この種のデータ処理に簡単に対処できるストリーミングエンジンとして、Apache Spark、Apache Flink、Apache Apex、Google DataFlowを使用できます。これらはすべてApache Beamによってサポートされ、コードを変更することなく、異なるエンジンで同じパイプラインを実行できます。さらに、最小限の変更でバッチ処理でもストリーミングモードでも同じパイプラインを使用できます。入力パイプラインを正しく設定するだけで、即座に使用できます。バッチジョブをストリーミングジョブに書き換えていた頃から、このような機能があれば素晴らしいだろうと考えていました。 理屈はさておき、例を使用して最初のストリーミングコードを記述していきましょう。Kafka(無制限のソース)からデータを読み込み、簡単なデータ処理を実行し、結果をKafkaにも書き戻します。 リアルタイムで到着する地図上のいくつかのオブジェクトの地理座標(XとY)の無限の流れ(この例では、オブジェクトは車だとしましょう)があり、特定地域にあるものだけを選択したい場合を考えます。つまり、我々はKafkaトピックからテキストデータを消費し、それを解析し、指定された制限でフィルタリングし、別のKafkaトピックに書き戻す必要があります。Apache Beamを利用してこれを実現する方法を見ていきましょう。 それぞれのKafkaメッセージには、次の形式のテキストデータが含まれています。 id,x,y このとき: id – オブジェクトの一意のID x, y – 上の座標(整数) 形式に注意し、有効でない場合はスキップします。 パイプラインの作成 前回のブログでのバッチ処理と同じ方法でパイプラインを作成します。 Pipeline pipeline = Pipeline.create(options); Optionsオブジェクトを詳細に指定することで、コマンドラインオプションをパイプラインに渡すことができます。詳しくはGithubを参照してください。 次に、Kafkaの入力トピックからデータを読み込みます。すでに述べたように、Apache BeamはさまざまなIOコネクターを提供しています。KafkaIOもその1つです。したがって、指定されたKafkaトピックからの着信メッセージを消費し、それらをさらに次のステップに伝播する新しい無制限のPTransformを作成します。 pipeline.apply( KafkaIO.<Long, String>read() .withBootstrapServers(options.getBootstrap()) .withTopic(options.getInputTopic()) .withKeyDeserializer(LongDeserializer.class) .withValueDeserializer(StringDeserializer.class)) デフォルトでは、KafkaIOは消費されるすべてのメッセージをKafkaRecordオブジェクトにカプセル化します。ただし、次の変換は新しく作成されるDoFnオブジェクトによってペイロード(文字列値)を取得するだけです。 .apply( ParDo.of( new DoFn<KafkaRecord<Long, String>, String>() …

Read Article

効果的なTalendジョブ設計レビューの実施 – 入門編

どこの開発チームでも、一般的なプラクティスとしてコードレビューが実施されています(少なくとも、そうあるべきです)。コードレビューは、複数の開発者が記述されたコードを調べ、品質と正確さを向上させるために、その設計、実装、構造について議論するプロセスです。正式な手法、あるいは、より簡易な手法(ペアプログラミングなど)のどちらを実施するにせよ、実稼働前に欠陥や不足を見つけるために、コードレビューは効果的であることが証明されています。 さらに、コードレビューを実施することで、チーム内で確立されたベストプラクティスに沿って全員が作業を進めることができます。このチーム内でのコラボレーションにより、途中で新しいベストプラクティスを特定することも容易になります。それだけでなく、定期的にコードレビューを行うことで一定レベルの情報共有が実現され、すべての開発者がお互いから学ぶ機会が得られます。これは、経験の浅い開発者にとって特に有効ですが、上級の開発者もこのプロセスから学ぶことがあります Talendはコードジェネレーターなので、開発者が実際にコードを記述することはありませんが、その裏で、ジョブの開発中には行単位のコーディングで多くの要素を共通しています。すなわち、Talendは非常に柔軟なプラットフォームであり、開発者は様々な方法でジョブを構築できるのです。ジョブの設計、設定、オーケストレーションのフローをレビューするだけであっても、コードレビューを実施することのメリットを100%得ることができます。 Talendジョブレビューの「意義」 Talendジョブレビューの目的を一語で表すとすれば、「品質」でしょう。戦略的には、長期的にはジョブレビューによって当然のことながら開発者のスキルが向上し、ベストプラクティスが洗練されます。つまり、将来のジョブについては、ジョブレビュー前の段階からパフォーマンスが改善され、欠陥が減少し、そして保守が容易になるのです。 Talendジョブ設計レビューのメリットについて、上記の説明だけでは確信を持てない方もいるでしょう。開発者はそれなりのプライドをもってジョブを構築しているので、反応にばらつきがあるのも当然です。しかし、レビューのメリットに注目し、お互いから学び、全員のスキルを向上させる場として前向きに捉えることが重要です。レビューに参加している間は、関与する開発者に対して常に思いやりと敬意を持つべきです。チームによっては、正式なチーム全体のレビューよりもペアで行うレビューの方が効果的な場合もあります。ペアで実施する場合でも、オフラインではなく直接顔を合わせてのレビューの実施を推奨します。そのほうが、レビューのコラボレーションから得られるものが多いのです。実利をとることが重要です。コードレビュープロセスを改善する方法を提案するようにチームに求めましょう 定性的側面と定量的側面 Talendジョブをレビューするときには、質と量の両方から考えることが重要です。 質の面では、ベストプラクティスの適用と、それを採用することが必要です。Talendの推奨するベストプラクティスをまだ読まれていない場合は、ぜひ目を通してください。現在、4回シリーズのブログ記事で紹介しています。 Talendの「ジョブ設計パターン」とベストプラクティス - 第1回 Job Design Patterns & Best Practices PART 2(英語) Job Design Patterns & Best Practices PART 3(英語) Job Design Patterns & Best Practices PART 4(英語) ベストプラクティスに関するこれらの記事では、効果的なジョブ設計の定性的側面について説明しています。読みやすく、書きやすく、保守しやすいジョブを設計する方法について、推奨事項を紹介しています。さらに、より良いジョブを構築するために事前に知っておくべき、以下のような基本事項についても述べています。 機能 再利用可能性 拡張性 一貫性 パフォーマンス その他 これらの基本事項をうまく選択してバランスを取ることが成功の秘訣です。 成功するジョブ設計パターンの方法論と実践に関する後続シリーズ(現在のところ2回分)も参考にしてください。 Successful Methodologies …

Read Article

Apache SparkとTalend:パフォーマンスと調整

| 2018年4月12日 | ビッグデータ統合

このブログシリーズでは、すでに2つの記事でTalendとApache Sparkに関して説明してきました。 このブログシリーズのこれまでの記事をまだ読んでいない場合は、最初に第1部「TalendとApache Spark:技術的な手引きと概要」と第2部「Talend Sparkジョブ vs. spark-submitの構成:2つの違いとは?」をお読みください。 Apache Sparkに関するシリーズの最初の2回は、Talendとspark-submitの類似点、およびTalendのSparkジョブで使用可能な構成オプションの概要について説明しました。 このブログでは、Apache Sparkのパフォーマンスと調整について説明します。これは、Talendユーザーに限らず、Apache Sparkを使用しているほとんどのユーザーに共通の議論です。最初のSparkジョブを開発して実行するときには、常に次のような疑問が浮かびます。 Sparkジョブにはいくつのエクゼキューターを割り当てる必要があるのか? 各エグゼキューターに必要なメモリー量は? コアをいくつ使用する必要があるのか? 一部のSparkジョブは10GB程度のデータを処理するのに何時間もかかるが、この問題をどうやって解決できるか? このブログでは、これらの質問を取り上げ、答えと洞察を提供します。その前に、このブログで使用されるいくつかの重要な概念を紹介します。 パーティション:パーティションは分散データセットの一部です。デフォルトのHDFSブロックサイズで作成されます。Sparkはパーティションを利用してデータセットを並列処理します。 タスク:タスクは、エクゼキューター内で実行できる作業単位です。 コア:コアは、エクゼキューター内で実行できるSpark内の並列タスクの数を決定する、CPU内の処理単位です。 Sparkエグゼキューター:ワーカーノード上で開始され、メモリーまたはディスク内でジョブのサブミットを実行するプロセスです。 アプリケーションマスター:各YARNアプリケーションは、リソースマネージャーからリソースを要求する責任を持つアプリケーションマスタープロセスをスピンアップします。リソースが割り当てられると、プロセスはノードマネージャーと連携して、ノードマネージャー内で必要なコンテナーを起動します。 Sparkの調整 最初に、Talend内のApache Sparkジョブを調整する方法を検討しましょう。前述のように、Talend Sparkジョブには、[Spark Configuration]タブがあり、ここで調整プロパティを設定できます。Talendでは、これはデフォルトでは常にオフになっています。 このセクションには、アプリケーションのマスターとエグゼキューターが使用するメモリーとコア、およびジョブが要求するエグゼキューターの数を設定するオプションがあります。このセクションで値を指定する際の主な疑問は、「アプリケーションマスターやエグゼキューターがパフォーマンスを向上させるために必要なコア数またはメモリー数をどのように決定するのか」ということです。 Sparkジョブのコア数を選択する方法 この時点では、先に進む前に考慮しなければならない要素がいくつかあります。 データセットのサイズ ジョブが完了する必要がある時間枠 ジョブが実行している処理とアクション これらの要素を念頭に置いて、パフォーマンスを最大化するようにジョブを構成し始めることができます。まずアプリケーションマスターの調整から始めましょう。アプリケーションマスターの場合、リソースのオーケストレーションを行うだけで、処理は行わないため、デフォルト値をそのまま使用できます。つまり、メモリーやコアの値を大きくする必要はありません。 次のステップは、エクゼキューター用にメモリーとコアを構成することです。ここでの主な問題は、エクゼキューター、メモリー、コアをいくつ使うべきかということです。その答えを見つけるために、それぞれに32コアと120GBのメモリーを使用する6つのワーカーノードを持つHadoopクラスターがあるとします。おそらく頭に浮かぶ最初の考えは、エグゼキューターごとに持つことができる同時タスクが多いほど、パフォーマンスが向上するということです。これについて調べると、Hadoopディストリビューションのパフォーマンスチューニングガイド(Clouderaの例はこちら)では、1エクゼキューターあたり5コアを超えるとHDFS I/Oが低下することがわかります。したがって、高いパフォーマンスのためのコアの最適値は5です。 次に、いくつのエグゼキューターを起動したいのかを見てみましょう。コアとノードの数に基づいて、この数を簡単に判断できます。前述したように、5コアがエグゼキューターごとに使用するのに最適な数です。さて、ノードあたりの32個のコアのそれぞれから、ノードで実行されているオペレーティングシステムとHadoopデーモンで必要とされているために、ジョブに使用できないものを削除する必要があります。Hadoopクラスター管理ツールはすでにこれを行っているので、ノードあたりのSparkジョブに使用できるコアの数を簡単に判断できます。 この計算を行った後、使用可能なノードあたり30コアが残っているとしましょう。5コアがエグゼキューターあたりの最適な数であるとすでに決定したので、ノードあたり最大6つのエグゼキューターを実行できることを意味します。簡単に特定できましたね! 最後に、使用可能なメモリー量を計算します。上記のハードウェア仕様に基づいて、ノードごとに120GBのメモリーがあることがわかりますが、コアについて説明したときに述べたように、オペレーティングシステムが一部を使用する必要があるため、ジョブ用にそのメモリーをすべて使用できません。ここでも、Hadoopクラスター管理ツールは、メモリーのうちどれだけをジョブに使用できるかを判断できます。オペレーティングシステムとHadoopデーモンに2GBのメモリーが必要な場合、Sparkジョブに使用するために118GBのメモリーが残ります。ノードごとに6つのエクゼキューターを持つことができると決定済みなので、エクゼキューターごとに最大約20GBのメモリーを使用できることになります。ただし、これは100%正しいわけではありません。各エクゼキューターが持つメモリーオーバーヘッドも計算する必要があるためです。前のブログで、オーバーヘッドのデフォルトは384MBであると述べました。これを20GBから差し引くと、1エグゼキューターあたり最大19GBを指定できると言えます。 クラスターリソースの動的割り当て vs. 固定割り当て 上記の数値は、Sparkジョブ内のクラスターリソースの固定または動的割り当てに使用できます。両者の違いは動的割り当てです。動的割り当てでは、使用されるエクゼキューターの最初の数、それほどワークロードがない場合にジョブが使用できる最低限のエクゼキューター、より多くの処理能力が必要な場合の最大数を指定できます。ジョブのためにクラスターのすべてのリソースを使うことができれば素晴らしいですが、その処理能力をクラスター上で実行される他のジョブと共有する必要があります。そのため、Talend Sparkジョブの調整を検討するために先に定義した要因を検討する際に、要件として特定したものに基づいて、最大値の何パーセントを使用可能か決定します。 ジョブを構成したので、次は実際にジョブを実行しましょう。上記で定義した最大設定でもSparkジョブが完了までに時間がかかることがわかった場合は、最大のパフォーマンスを引き出すために、ジョブに戻り、さらにいくつかの設定を確認する必要があります。 Sparkのパフォーマンス まず、Sparkのジョブで2つのテーブルを結合しましょう。Sparkジョブの最適化を開始する前に検討した要因の1つは、データセットのサイズです。テーブルのサイズを確認し、1つが50GBで、もう1つが100MBであると判断したら、Talendコンポーネントのレプリケート結合を利用しているかどうかを確認する必要があります。 …

Read Article

Apache Sparkのパーティショニングの基礎知識

| 2018年3月5日 | Developer Step-by-Step ビッグデータ統合

Apache SparkのResilient Distributed Dataset(RDD)は、サイズが大きすぎて1つのノードに収まらないため、複数のノードに分割する必要があるさまざまなデータの集合です。Apache Sparkは自動的にRDDをパーティションに分割し、複数のノードに分散します。この操作は遅延評価され(たとえば、アクションがトリガーされるまで実行を開始しないことで管理性が高まり、計算量が低減するため、結果的に最適化と速度が向上します)、変換はDirected Acyclic Graph(DAG)として格納されます。したがって、RDDに対して何らかのアクションが実行されると、Apache SparkがDAGを再計算します。 Apache Sparkのパーティションの特性を理解しておくことで、パフォーマンスの向上、デバッグ、およびエラー処理が容易になります。 以下に、パーティション分割の基本情報をいくつか紹介します。 Sparkクラスター内の各ノードには、1つ以上のパーティションが含まれています。 Sparkで使用されるパーティションの数は設定が可能で、少なすぎると同時実行性の低下、データの偏り(データスキュー)、不適切なリソース利用の原因となり、多すぎるとタスクスケジューリングの所要時間が実際の実行時間より長くなるなどの問題が発生します。デフォルトでは、すべてのexecutorノード上のコアの総数に設定されています。 Sparkのパーティションが複数のマシンにまたがることはありません。 同じパーティション内のタプルは、同じマシン上にあることが保証されています。 Sparkはパーティションごとに1つのタスクを割り当て、各Workerは一度に1つのタスクを処理できます。 Apache Sparkでのハッシュパーティショニングとレンジパーティショニング Apache Sparkは「ハッシュパーティショニング」と「レンジパーティショニング」の2種類のパーティショニングをサポートしています。データ内のキーの分散方法または配列方法や、データに対して実行するアクションに応じて、適切な手法を選択します。以下のような多くの要因が、パーティショニングの選択に影響を与えます。 利用可能なリソース — タスクを実行できるコアの数。 外部のデータソース — ローカルコレクション、Cassandraテーブル、またはHDFSファイルのサイズによってパーティション数が決まります。 RDDの派生に使用される変換 — RDDが別のRDDから派生する際にパーティションの数を決定するためのルールが多数存在します。 Apache Sparkの使用に際しては、留意すべき点がいくつかあります。このブログでは、ビジネスデータ、そのデータのキー、Spark処理における物理リソース、そして何よりもネットワーク、CPU、メモリーを完全に認識しておくことの重要性について説明します。 Apache Sparkのパーティショニングでよく見られる問題には次のようなものがあります。 データの偏り(データスキュー)とシャッフルブロック Apache Sparkのデフォルトのパーティション分割ではデータの偏りが発生し、その結果、データ集約操作中のシャッフルや単一のエグゼキューターのメモリー不足に関連した問題が発生する可能性があります。 この例では、「key-a」のパーティション内のデータ量が大きいため、Exec-5のタスクの完了には他の5つのタスクよりもはるかに長い時間を要します。もう1つの重要なのは、Sparkのシャッフルブロックは2GB以下でなければならないという点です(内部的にByteBuffer抽象化ではMAX_SIZEが2GBに設定されているため)。たとえば、集約、結合、キャッシュ操作などの操作を実行している場合、Sparkシャッフルが発生し、パーティションの数が少ないことやデータスキューが原因でシャッフルブロックの問題が発生する可能性があります。したがって、シャッフルによるMAX_SIZE制限の違反に関連するエラーが発生し始めた場合には、データの偏りが原因であることがわかります。 賢明なパーティショニング では、どうすればデータの偏りとシャッフルブロックを回避できるのでしょうか。非常に重要なのは、メモリープレッシャーを管理し、エグゼキューターのノードでリソースをフル活用できるよう、賢明な方法でパーティション分割を行うことです。そのためには、データのサイズ、タイプ、分散方法を常に把握しておく必要があります。覚えておくべきベストプラクティスは次のとおりです。 ドライバーに負荷をかけないように、またエグゼキューター上でタスクが適切に実行されるように、reduceByKeyやaggregateByKeyなどのアクションの正しい演算子を理解し選択します。 いくつかの大規模で分割不可なファイルでデータを受け取った場合、InputFormatによるパーティション分割では各パーティションに大量のレコードを配置される可能性があります。しかし、利用可能なすべてのコアを活用するのに十分なパーティションは生成されません。この場合、データのロード後に多数のパーティションを使用する再パーティションを呼び出すことで、後続の操作でクラスターのCPUをより多く利用できるようになります。 また、データが偏っている場合は、負荷を均等に分散できる適切なキーを使用して再パーティション化することも推奨されます。 Talendは、選択された適切なキーに基づいて、再パーティション化に必要なtPartitionコンポーネントを提供します。 最適なパーティション数を算出するには Apache Sparkは、RDDの各パーティションに対して1つの同時タスクしか実行できず、その最大数はクラスター内のコア数(またはその2~3倍)になります。したがって、「最適な」数のパーティションを選択するためには、一般的に最低でも並列処理用のエクゼキューターの数と同数のパーティションが必要です。この数値を算出するには、sc.defaultParallelismを呼び出します。パーティションの最大サイズは、最終的にはエグゼキューターの利用可能なメモリーによって決まります。 適切な再パーティション化キーを使用してデータを均等に分散することが不可能な場合もあります。そこで、新しい「偽の」キーを追加し、現在のキーと一緒に使用することでデータを均等に分散させるソルトなどの方法を使用します。次に例を示します。 …

Read Article

Talend Sparkジョブ vs. spark-submitの構成:2つの違いとは?

| 2018年2月21日 | Hadoop Streaming Data ビッグデータ統合

前回のブログ、「TalendとApache Spark:技術的な手引きと概要」では、Talend Sparkジョブとspark-submitの対応について説明しました。このブログ記事では、Apache spark-submitとの比較でTalend Sparkの構成を引き続き評価していきます。最初に、Talend Sparkジョブでの[Spark Configuration]タブのオプションをspark-submitに引数として渡す先にマッピングする方法を検討し、それらの使用について説明します。 コマンドの違い お使いの環境でApache Sparkジョブ(Sparkが正常に機能することを確認するために使用される、Hadoopクラスターでデフォルトとして提供されるApache Sparkサンプルジョブなど)を実行するときは、次のコマンドを使用します。 export HADOOP_CONF_DIR=XXX./bin/spark-submit –class org.apache.spark.examples.SparkPi –master yarn –deploy-mode client –executor-memory 5G –num-executors 10 /path/to/examples.jar 1000 上記の2つのコマンドは、spark-submitジョブがクラスター構成ファイルを読み込むディレクトリーを設定します。次に、Sparkサンプルジョブを実行するために、10のエクゼキューターと5Gのメモリーを使用して、クライアントモードによりYARNクラスター上でSparkを実行するspark-submitコマンドを発行します。 次に、同じSparkサンプルジョブがTalendでどのように実行されるのかを見てみましょう。TalendでSparkサンプルジョブ(上記のようなもの)を実行すると、すべてのSpark構成情報が実行タブ内の次のタブに入力されます。 ここでいくつか疑問が生まれます。Talendに入力した情報は、Sparkジョブを実行するために端末に入力した情報にどのように対応するのか? どのくらいの数のエグゼキューターとメモリーを要求したのかを、どうやって知ることができるのか? トラブルシューティングについてはどうか? これらの質問すべてに答えていきます。 まず、このブログで使用されるspark-submitのオプションをいくつか紹介します。Apache Sparkドキュメントによると、これらはspark-submitスクリプトに渡すことができる一般的なオプションです。 –class:これは、Sparkアプリケーションの主なエントリーポイントです。 –master:このオプションでは、SparkマスターをスタンドアロンのSparkとして使用するか、YARN上でSparkを使用するかを指定します。 –deploy-mode:前のブログで述べたように、これは利用可能な2つのYARNモードに移り、Sparkドライバーの展開方法を詳述します。 –conf:このオプションでは、ジョブに使用させる追加のSpark構成(たとえば、spark.executor.userClassPathFirst=true)を渡します。 –application-jar:これは、Apache Sparkが実行するSparkのコンパイル済みコードを配置した場所のパスを指します。 –application-arguments:このオプションでは、Sparkコードに固有の引数を渡します。 では、Talend Sparkジョブ内で上記のオプションがどのように使用されるのかを見てみましょう。実行タブの[Spark Configuration]タブでは、設定可能なさまざまなオプションが論理的に次のカテゴリに分類されています。 クラスターのバージョン 構成 認証 調整 …

Read Article

TalendとApache Spark:技術的な手引きと概要

| 2017年9月15日 | Developer Step-by-Step Hadoop ビッグデータ統合

Talendのカスタマーサクセスアーキテクトチームに加わる前の数年間は、サポートエンジニアとして、Apache SparkでのTalendの機能についてお客様からよく質問を受けました。Sparkについて話すとき、最初に頭に浮かぶのはspark-submitコマンドです。これはSparkジョブをサブミットするために使用するものです。そのため、TalendのSparkジョブと通常のspark-submitの対応についての疑問が自然に生じます。このブログでは、提供されているさまざまなApache Sparkモード、Talendで使用されるモード、およびTalendとApache Sparkの連携について説明します。 Apache Sparkジョブについて Apache Sparkでは、2種類のジョブをサブミットできす。そのうちの1つはSpark Batchで、もう1つはSpark Streamingです。Spark Batchはバッチ処理モデルで動作します。このモデルでは、一定期間にわたって収集されたデータセットがSparkエンジンに送信され、処理されます。 他方のSpark Streamingはストリーミングモデルで動作し、データがSparkエンジンに1つずつ送信され、処理がリアルタイムで行われます。Talendは両方のジョブタイプをサポートしており、それぞれのタイプ向けにSparkジョブを作成できます。Talend Studioでは、ライセンスに応じて、Spark Batchジョブを作成するための「ビッグデータバッチ」とSpark Streamingジョブを作成するための「ビッグデータストリーミング」のオプションを使用できます。 TalendとApache Sparkについて、さらに検討 先に進む前に、このブログで使用される重要な概念をいくつか紹介します。 Sparkドライバー:アプリケーションをSparkマスターに送り、Spark Contextを作成・実行します。 Sparkマスター:Sparkドライバーの定義に従ってYARNからリソースを要求し、ジョブを実行するホストを見つけます。 Sparkエグゼキューター:ワーカーノード上で開始され、メモリーまたはディスク内でジョブのサブミットを実行するプロセスです。 はじめに、spark-submitまたはTalendを使用してSparkジョブがどのように機能するかについて、いくつかのコンテキストで見ていきます。Sparkのジョブには、Sparkのジョブを設定・調整する「ドライバー」が常にあります。この場合、Sparkドライバーは、接続に使用するSparkマスターやSparkエグゼキューターに割り当てるメモリー量など、ジョブが使用する構成をセットアップします。したがって、Talendは、Sparkジョブを設定・調整するSparkドライバーが常に存在するという前提の下で、spark-submitと同等の機能を果たします。 これで、Hadoopクラスター内からspark-submitを実行したときに、一部の構成情報がクラスター構成ファイルから取得されます。Talend Studioは常にHadoopクラスター上にあるわけではないので、使用できる設定を認識できるようにするために、Studioでのジョブ内でこの情報を提供する必要があります。 Sparkジョブで行われるデータ変換は、Talendでは、spark-submitプロシージャーが使用されるときに行われるものと同一のジョブのコンパイル時に行われます。spark-submitと同様に、Talendもジョブを上記のように定義された「ドライバー」として開始しますが、ジョブはドライバー内ではなく、クラスターレベルのSparkエグゼキューター上で実行されます。ジョブが開始されると、TalendはHadoopクラスターレベルで発生しているイベントを監視して、ジョブの進行状況を確認することでジョブを監視します。これは、ユーザーがspark-submitを使用する場合と似ています。 spark-submitまたはTalendジョブのいずれかを使用してSparkにジョブをサブミットする場合、Hadoopクラスター構成に応じて3つのモードが提供されます。Sparkドキュメントでは、以下の3つのモードがあります(http://spark.apache.org/docs/latest/cluster-overview.html)。 1. スタンドアロン:このモードでは、SparkドライバーがジョブをサブミットするSparkマスターと、ジョブを処理するためにクラスター上で実行されるSparkエグゼキューターがあります。 2. YARNクライアントモード:ここでは、各ジョブに割り当てられたSparkワーカーデーモンがYARNフレームワーク内で開始・停止されます。上記で説明したSparkドライバーは、Talendジョブを実行しているのと同じシステム上で実行されます。 3. YARNクラスターモード:SparkマスターとSparkエグゼキューターはYARNフレームワーク内で実行されます。これらはジョブと共に開始・停止します。この場合、SparkドライバーもHadoopクラスターレベルのYARN内で動作します。 Sparkが提供するモードを定義したので、次にTalendが提供する機能を見ていきましょう。Talendでサポートされるモードは次のとおりです。 1. ローカル:これを選択すると、ジョブはSparkフレームワークをローカルでスピンアップして、ジョブを実行します。ローカルマシンが、Sparkマスターとして、そしてデータ変換を実行するためのSparkエグゼキューターとしても使われます。 2. スタンドアロン:このモードでは、上でも定義されているように、TalendはHadoopクラスターで定義されているSparkマスターに接続してからジョブを実行します。 3. YARNクライアントモード:上記で定義したように、Talend StudioはSparkの「ドライバー」を実行して、ジョブの開始場所からジョブのオーケストレーションを行い、次にオーケストレーションをYARNフレームワークに送信してリソースの実行と割り当てを行います。これは、Hortonworks、Cloudera、MapR、Amazon EMRなどのHadoopディストリビューションで使用できる選択肢です。 4. YARNクラスター:このモードは現在、Talend内のHDInsightとCloudera Altusでのみサポートされています。前述のとおり、このモードでは、TalendはHadoopクラスターレベルのYARN内でSparkの「ドライバー」を実行します。 TalendとApache …

Read Article

LambdaからKappaまで:リアルタイムビッグデータアーキテクチャーのガイド

| 2017年8月28日 | Streaming Data ビッグデータ統合

今日では、リアルタイムビッグデータアーキテクチャーを選べるようになっています。現在の選択肢はLambdaだけではありません。このブログシリーズでは、これら2つの選択肢について説明し、関連するユースケースで比較します。リアルタイムプロジェクトに適したアーキテクチャーを選択する方法について紹介します。 リアルタイムの要件 アーキテクチャーのトピックに入る前に、ビッグデータシナリオにおけるリアルタイムデータ処理システムの要件のいくつかについて検討しましょう。 これらの要件の中でも、データが動いているという点は最も明白なものです。言い換えれば、データは連続的で無制限です。重要なのは、このデータをいつ分析するかということです。現在のデータのスナップショットに対する回答を探している場合、または特定の低レイテンシー要件がある場合は、リアルタイムのシナリオを検討しているでしょう。 さらに、多くの場合はビジネス上の期限に対応しなければなりません。結局のところ、リアルタイム分析の期限を守らなくても影響がないのであれば、バッチ処理のプロセスでもかまいません。これらの影響は、完全な障害から単なるサービスの低下まで多様です。 ここで問題となっているのはビッグデータであることから、データのボリューム、速度そしておそらく種類の限界をも押し広げることが期待されています。 リアルタイムデータ処理は、スケーラビリティ、フォールトトレランス、予測可能性、ストリームの不完全性に対する回復力などの品質を必要とし、拡張可能でなければなりません。 新データ時代の新アーキテクチャー この必要性に取り組むために、新しいアーキテクチャーが生まれました。つまり、「必要は発明の母」です。 Nathan MarzによるLambdaアーキテクチャーは、今日のリアルタイムデータ処理で最も一般的なアーキテクチャーの1つです。低レイテンシーの読み取りと更新を直線的にスケーラブルかつフォールトトレラントな方法で処理するように設計されています。 システムに到着するデータストリームは、バッチレイヤーとスピードレイヤーの両方に二重に供給されます。 バッチレイヤーは、生データを到着時に保存し、消費のためにバッチビューを計算します。当然のことながら、バッチ処理は一定の時間間隔で発生し、長期間存続します。データの範囲は数時間から数年まで及びます。 スピードレイヤーは、リアルタイムビューを計算してバッチビューを補完するために使用されます。 どのようなクエリーでも、バッチビューとリアルタイムビューの両方からデータを取得することで、全体像を把握できます。クエリーは両方の長所を利用します。バッチビューはより複雑で高価なルールで処理され、データクオリティにより優れ、スキューがより少ないかもしれません。一方、リアルタイムビューは可能な限り新しいデータへのアクセスを提供します。時間が経過するにつれて、リアルタイムデータは期限切れになり、バッチビューのデータに置き換えられます。 このアーキテクチャーのもう1つのメリットは、コードまたは式が変更された場合に、同じ受信データを再生して新しいビューを生成できることです。 このアーキテクチャーの最大の欠点は、バッチレイヤーとスピードレイヤーの両方を生成するために、2つの異なる(そして、おそらく複雑な)システムを維持する必要があることです。幸いなことに、Spark Streaming(抽象化レイヤー)やTalend(Spark BatchおよびStreamingコードジェネレーター)を使うことで、操作上の負担は残りますが、問題ははるかに少なくなります。 次に、Kappaアーキテクチャーについて説明します。 Kappaアーキテクチャーは、Jay Krepsによって最初に記述されました。データをストリームとして処理することのみに焦点を当てています。ユースケースが当てはまる場合を除き、Lambdaアーキテクチャーの代替にはなりません。このアーキテクチャーでは、受信データはリアルタイムレイヤーを介してストリーミングされ、その結果はクエリー用のサービングレイヤーに配置されます。 その目的は、リアルタイムのデータ処理と連続的な再処理の両方に単一のストリーム処理エンジンで対応することです。つまり、再処理はストリームから発生することになります。そのためには、受信データストリームを丸ごと、または特定の位置から(非常に高速で)再生できることが必要です。コードに変更があると、2番目のストリームプロセスが最新のリアルタイムエンジンを介して以前のデータをすべて再生し、サービングレイヤーに格納されているデータを置き換えます。 このアーキテクチャーは、Lambdaアーキテクチャーのようにバッチレイヤーとスピードレイヤーそれぞれのコードベースを管理するのではなく、コードベースを1つだけにすることで単純化を図ります。さらに、クエリーは、バッチビューとスピードビューに対してではなく、単一のサービング場所を検索するだけで済みます。 このアーキテクチャーの複雑さは、重複するイベントの処理、イベントの相互参照、順序操作の維持など、通常はバッチ処理で簡単に実行できるデータ処理をストリームで実行する必要があることに、主として起因します。 単独ですべてに対応できない場合もある Lambdaアーキテクチャーには多くのリアルタイムユースケースが適合しますが、Kappaアーキテクチャーについて同じことは言えません。バッチ分析とストリーミング分析が同じ場合は、Kappaを使用するのが最善の解決策です。ただし、場合によっては、バッチウィンドウで完全なデータセットにアクセスすると特定の最適化が起こるため、Lambdaの方がパフォーマンスに優れ、実装が簡単になる可能性があります。 また、バッチアルゴリズムとストリーミングのアルゴリズムがまったく異なる結果を生成する、非常に複雑な状況(機械学習モデル、エキスパートシステム、またはリアルタイムで異なる方法で実行する必要がある本質的に非常に高価な操作を使用する)もあります。そのような場合には、Lambdaを使用する必要があります。 以上、最も人気のある2つのリアルタイムデータ処理アーキテクチャーについて取り上げました。このシリーズの次回の記事では、各アーキテクチャーについてさらに詳しく説明し、具体的なユースケースと、よく見られるテクノロジーについて検討します。 参考文献: 「How to beat the CAP theorem」(Nathan Marz) http://nathanmarz.com/blog/how-to-beat-the-cap-theorem.html 「Questioning the Lambda Architecture」(Jay Kreps) https://www.oreilly.com/ideas/questioning-the-lambda-architecture 「Big Data」(Nathan Marz、James …

Read Article

Informatica PowerCenter開発者の視点で考えるTalendの機能

| 2017年6月16日 | Database Integration Developer Step-by-Step ETL / ELT

先日Talendに入社したカスタマーサクセスアーキテクトのSundaramです。Talendによるデータ戦略を管理するためのアーキテクチャーのガイドラインとベストプラクティスをお客様に提供しています。Talendに入社する前にデータウェアハウスの実装を何件か担ったことがありましたが、そのような場合にはETLツールとしてInformatica PowerCenterが一般的に採用されていました。あるテクノロジーから別のテクノロジーへの移行は、大きな課題となることがあります。しかし、PowerCenterでのやり方をTalendにそのまま「複製」するのではなく、一歩引いて、Talendの仕組みや機能、PowerCenterとの違いを理解することが重要です。このブログでは、Informaticaから移行して先端的な統合プラットフォームを導入した経験をご紹介し、皆さんがInformaticaからTalendに移行する際の作業を最小限に抑えるうえでの参考にしていただきたいと思います。 Talend vs. Informatica PowerCenter:両者の違いについて どちらのツールも本質的にはデータをソースからターゲットに移行するという同じ機能を果たしますが、その実現方法が異なっています。それぞれの方法にはそれぞれのメリットがあります。ETLジョブを設計する前に、これらの方法の長所と短所を理解することが重要です。 まず、どちらのツールもグラフィカルユーザーインターフェイスを提供し、ソースからデータを抽出してトランスフォーメーションを行い、ターゲットにロードしますが、実装が異なることを理解する必要があります。TalendはネイティブJavaコードを生成するので、どこでも実行できます。一方、PowerCenterは、Informatica独自のエンジンが実行に使用するメタデータを生成してRDBMSリポジトリに格納します。 Talendがコードジェネレーターであることを理解しておくことが重要です。つまり、ETL(独自のスタンドアロンサーバーで実行)エンジンとしても、ELT(ターゲットサーバーでネイティブに実行)エンジンとして実行できます。Talendによって生成されたJavaコードは、Javaをサポートするプラットフォームであれば、どこでも(データセンターのサーバーでも、クラウド上でも、またはラップトップ上でも)実行できます。どちらのプラットフォームもデータ統合に必要なタスクの大半を処理するコンポーネントを提供していますが、カスタマイズが必要な場合もあります。そのためにカスタムコーディングが行われますが、このプロセスがPowerCenterを使用する際のやっかいで非効率的な点です。一方、Talend Open Studioの場合はお客様独自のカスタムコンポーネントをJavaで構築し、面倒なく統合できます。これらが、データ統合ジョブを設計する際に重要となる考慮事項です。 ダウンロード >> Talend Open Studio for Data Integration ジョブの設計方法 ジョブの構築方法が異なる点も重要です。PowerCenterの場合は、まずマッピング(本質的には「データフロー」)を開発します。この段階で、ソースとターゲットの間のマッピングとトランスフォーメーションロジックが定義されます。マッピングが検証され、そのメタデータがリポジトリに保存されると、セッションとワークフロー(「プロセスフロー」)が作成されます。その後、ソースとターゲットのオブジェクトへの物理接続が割り当てられ、タスクが実行順に並べ替えられ、エラー処理/通知手順が実装されます。 Talendでは、データフローとプロセスフローの両方がシームレスに実装されます。「データフロー」を実装する特定機能を提供するさまざまなコンポーネントを使用して、「プロセスフロー」を定義するジョブを構築します。「プロセスフロー」は、特定のスキーマに基づく「行」を使用するコンポーネント間の「トリガー」と「データフロー」を使用して実装されます。 理解しやすいように、PowerCenterとTalendの概念を対応させて説明します。 Informatica PowerCenter Talend Studio 説明 リポジトリ プロジェクトリポジトリ PowerCenterリポジトリとTalendプロジェクトリポジトリには、再利用可能なメタデータオブジェクト(ジョブ、DB接続、スキーマ定義など)が含まれます。Talendの場合は、独自のソースコード管理システムを使用するのではなく、SVNまたはGITのソースコード管理システムにシームレスに統合されます。 フォルダー フォルダー フォルダーは、機能に基づいてオブジェクトを整理するのに役立ちます。PowerCenterの場合はサブフォルダが許可されませんが、Talendでは許可されます。 ワークフロー ジョブ ワークフロー/ジョブは、すべての接続と依存関係が定義されたETLプロセスフローを実装します。Talendの場合、ジョブはプロセスフローとデータフローの両方を表します。 ワークレット/再利用可能セッション ジョブレット ワークフロー/ジョブで再利用可能な一連のタスクの組み合わせです。エラー処理、通知、繰り返し可能プロセスなどの再利用可能コードに使用できます。 セッションとマッピング コンポーネント PowerCenterでは、接続、ファイルの場所、エラー処理がセッションで個別に定義されます。一方Talendでは、マッピングとセッションの機能が結合され、コンポーネントまたはプロセス/データフローによりリンクされたコンポーネントセットに実装されます。 トランスフォーメーション コンポーネント Talendにはコンポーネントの大規模ライブラリーがあり、これがさまざまなトランスフォーメーションをサポートします。たとえば、頻繁に使用されるコンポーネントのtMapは、Informaticaの式、ルックアップ、ルーター、ジョイナーのトランスフォーメーションを組み合わせたものとなります。 ソースとターゲット …

Read Article