VIRTUAL TRAINING : Step-by-Step to Big Data Integration

Building Ingestion Pipelines for IoT Data with Talend Data Streams

  • Benoît Barranco
    Benoît Barranco is a Customer Success Architect at Talend. He brings years of experience working with customers and partners across multiple industries, helping them shape solution architectures that turn them into data-driven businesses. In recent years, Benoît has focused on analytics, big data, data science and the Internet of Things while working at IBM and PPG Industries. Outside of the office, he likes spending time with his family (wife and twins), playing rugby, kitesurfing, skiing, snowboarding and building electronics projects.
  • June 01, 2018

Introduction

This month, Talend released a new product called Talend Data Streams. Talend Data Streams was designed for data scientists, analysts and engineers to make streaming data integration faster, easier and more accessible. I was incredibly excited when it was finally released on the AWS Marketplace and have been testing out a few use cases.

Today, I want to walk you through a simple use case of building ingestion pipelines for IoT data. I’m going to show you how to connect to your Kafka queue from Talend Data Streams, collect data from an IoT device, transform that raw data and then store it in an S3 bucket. Let’s dive in!

Pre-requisites

If you want to build your pipelines along with me, here's what you'll need:

Let's Get Started: IoT with Talend Data Streams

Kafka

Network setup

I’m going to start with the network setup. Here I have an Amazon Web Services EC2 Windows instance and I've installed Apache Zookeeper and Kafka using the default settings and ports (Zookeeper: 2181; Kafka: 9092) as mentioned in this article: Install & setup Kafka on Windows.

A couple of things to remember as you are setting up your Kafka network:

  • On the Kafka machine, make sure that all firewalls are turned off.
  • On the AWS Management Console, make sure that you've created inbound rules that allow all TCP and UDP traffic from your Data Streams AMI (using the security group ID of your Data Streams AMI); a boto3 sketch of these rules follows below.
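If you'd rather script those inbound rules than click through the console, here's a minimal boto3 sketch of the same idea. The region and both security group IDs are placeholders for your own values.

import boto3

ec2 = boto3.client("ec2", region_name="eu-west-1")  # assumed region

KAFKA_SG = "sg-0123456789kafka"          # security group of the Kafka EC2 instance (placeholder)
DATA_STREAMS_SG = "sg-0123456789stream"  # security group of the Data Streams AMI (placeholder)

# Allow all TCP and all UDP traffic coming from the Data Streams security group.
ec2.authorize_security_group_ingress(
    GroupId=KAFKA_SG,
    IpPermissions=[
        {"IpProtocol": "tcp", "FromPort": 0, "ToPort": 65535,
         "UserIdGroupPairs": [{"GroupId": DATA_STREAMS_SG}]},
        {"IpProtocol": "udp", "FromPort": 0, "ToPort": 65535,
         "UserIdGroupPairs": [{"GroupId": DATA_STREAMS_SG}]},
    ],
)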

Launch Kafka 

Run Zookeeper then Kafka as described in the article:

  • Launch Zookeeper
zkserver
  • Launch Kafka
.\bin\windows\kafka-server-start.bat .\config\server.properties

Zookeeper and Kafka up and running

Create Kafka topic

To finish up, let's create a Kafka topic using the command-line tools. I'm going to name the topic "mytopic".

kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mytopic

Your Kafka broker is now up and running!
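If you want to sanity-check the broker and topic before wiring up anything else, a tiny producer script does the trick. This is just an illustrative sketch using the kafka-python package (not part of the setup above); the broker address is a placeholder for your Kafka EC2 host.

from kafka import KafkaProducer

# Connect to the broker (replace the placeholder host with your Kafka EC2 public DNS).
producer = KafkaProducer(bootstrap_servers="ec2-xx-xx-xx-xx.compute.amazonaws.com:9092")

# Send one message in the same "temperature;humidity;distance" CSV shape used later on.
producer.send("mytopic", b"23;45;112")
producer.flush()
producer.close()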

Setting Up Your S3 Bucket

Create your S3 bucket

Now, we need somewhere to put the incoming data. Let's create that place using Amazon S3. To get started, log in to your AWS account and look for the S3 service in the "Storage" category.

AWS Management Console

Create a new bucket from the user interface.

Next, fill in your bucket name and select a region.

Leave the default settings by clicking Next twice, then click "Create Bucket".
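If you prefer to script this step, the same bucket can be created with boto3. A minimal sketch, assuming a bucket name and region of your choice (both placeholders):

import boto3

s3 = boto3.client("s3", region_name="eu-west-1")  # assumed region

# Create the bucket; for us-east-1, omit CreateBucketConfiguration entirely.
s3.create_bucket(
    Bucket="my-iot-datastreams-bucket",
    CreateBucketConfiguration={"LocationConstraint": "eu-west-1"},
)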

Create an IAM user for S3 access

In order to access the bucket from a third-party application, we need to create an IAM user that has access to S3 and generate an Access Key ID and Secret Access Key. This is a critical setup step in this use case. Here's how to get it done.

From the AWS management console, look for IAM.

Select the Users section and click on Add user.

Choose a username, tick the "Programmatic access" box, then click "Next".

To make this easy, we will use an existing policy that grants full access to S3. To do this, select "Attach existing policies" and check AmazonS3FullAccess (you can change the policy settings afterward).

Make sure your setup is correct and click on "Create User".

Now write down your Access Key ID and click on "Show" to see your Secret Access Key (you will only be able to see it once).
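Before plugging these keys into Data Streams, it's worth confirming that they can actually write to the bucket. A quick illustrative check with boto3, where the keys and bucket name are placeholders for your own:

import boto3

# Build an S3 client with the new IAM user's credentials (placeholders below).
s3 = boto3.client(
    "s3",
    aws_access_key_id="YOUR_ACCESS_KEY_ID",
    aws_secret_access_key="YOUR_SECRET_ACCESS_KEY",
    region_name="eu-west-1",
)

# Write a small test object and confirm the bucket is reachable.
s3.put_object(Bucket="my-iot-datastreams-bucket", Key="test/hello.txt", Body=b"hello from the new IAM user")
print(s3.list_objects_v2(Bucket="my-iot-datastreams-bucket")["KeyCount"])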

IoT device: Raspberry Pi & Arduino Uno

Set up your device

I have a Raspberry Pi with internet connectivity over Wi-Fi; I've set it up using this tutorial: Install Raspbian using NOOBS. An Arduino Uno is connected to the Raspberry Pi over serial. It has one RGB LED, a temperature and humidity sensor and a distance sensor (if you are interested in learning more about it, contact me and I'll share my setup with you).

The Arduino Uno reads temperature, humidity and distance values from the sensors, and the RGB LED color changes based on the measured distance.

Send Sensor Data to Kafka

The Raspberry Pi acts as a cloud gateway and a hub that collects sensor values from the Arduino. I'm using Node-RED (bundled with Raspbian) on the Pi to read the sensor values from serial and send them to the Kafka broker.

Node-red Flow on the Raspberry Pi
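I built my gateway as a Node-RED flow, but the same logic is easy to picture in plain Python. Here's an illustrative sketch (not my actual flow) using pyserial and kafka-python; the serial port, baud rate and broker address are assumptions you'd adapt to your own setup.

import serial
from kafka import KafkaProducer

# Read "temperature;humidity;distance" lines from the Arduino over serial.
ser = serial.Serial("/dev/ttyACM0", 9600, timeout=5)

# Forward each reading to the Kafka topic (replace the placeholder broker address).
producer = KafkaProducer(bootstrap_servers="ec2-xx-xx-xx-xx.compute.amazonaws.com:9092")

while True:
    line = ser.readline().decode("utf-8").strip()  # e.g. "23;45;112"
    if line:
        producer.send("mytopic", line.encode("utf-8"))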

Talend Data Streams

As a reminder, if you don't have your own Data Streams AMI, follow this tutorial: Getting Started on Talend Data Streams.

This edition of Talend Data Streams is free of charge; you only pay for AWS storage and compute consumption.

Create Connections

When your Data Streams AMI is up and running, you can access it using its public DNS.

On the login screen, use "admin" as the username and your AMI ID (found in the EC2 console) as the password.

Now select the Connections section and click on Add Connection.

Let's first create our Kafka connection. Give it a name, select Kafka as the type, fill in your broker DNS and port, then click on Check Connection and Validate.

Now create the S3 bucket connection, check the connection and Validate.

Create your Dataset

Click on Datasets, then Add Dataset. From there, select the Kafka connection we've just created, enter the topic the IoT device is sending data to (mytopic), choose the value format (CSV) and the field delimiter (;), then click on View Sample.
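For reference, here's what a raw record on my topic looks like, using made-up sensor values; with CSV as the format and ; as the delimiter, Data Streams splits it into three generic fields:

23;45;112   ->   field0 = "23" (temperature), field1 = "45" (humidity), field2 = "112" (distance)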

Do the same for your Amazon S3 instance.

Create a Pipeline

Now that our source and target are set, it's time to create a Data Streams pipeline. Here's how to do that:

  • Click on Pipeline and press ADD PIPELINE.
 
  • Now edit the name of your pipeline.

  • Now on the canvas, click on "Create Source".

  • Select the dataset we created on top of our Kafka broker and press Select Dataset.

Now you have your first source defined on your canvas, and you can press the refresh button to retrieve new data from Kafka.

 

Now, click on the green + button next to your Kafka component and select a Python Row processor.

The Python processor is used to rename the columns, change data types and create an extra field based on the value of the distance sensor. Copy and paste the Python code below and click on Save.

# Note: input, output and outputList are provided by the Data Streams Python Row processor.
output = json.loads("{}")

# The CSV fields arrive as strings, so convert the distance reading before comparing it.
distance = int(input['field2'])

if distance <= 20:
    led = "Red"
elif distance > 80:
    led = "Green"
else:
    led = "Blue"

# Rename the generic CSV fields, cast the sensor readings to integers and add the LED color.
output['Temperature'] = int(input['field0'])
output['Humidity'] = int(input['field1'])
output['Distance'] = distance
output['LedColor'] = led

outputList.append(output)
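With the sample payload shown earlier (23;45;112), the processor emits a record along these lines (illustrative values only): {"Temperature": 23, "Humidity": 45, "Distance": 112, "LedColor": "Green"}.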

Let's now add a sink. I'm going to use the S3 bucket connection I created earlier. Click on Create sink in the canvas.

Select the IoT dataset from Amazon S3.

Now that we are all set, press the run button on the top to run your pipeline.

Just like that, we've built our first Talend Data Streams pipeline: it reads from Kafka, uses a Python Row processor to transform the data, and stores the result on Amazon S3.

In my next blog post, we will dig deeper into Talend Data Streams components and capabilities by extending this pipeline into a real-time anomaly detection model for the humidity sensor, using the Window component and calculating the Z-score for each sensor value in a Python processor.
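As a quick preview, the Z-score itself is just z = (x - mean) / standard deviation over a window of recent readings. A rough Python illustration of the idea (not the processor code from the upcoming post):

import statistics

def z_score(window, value):
    # Z-score of a new reading relative to a window of previous readings.
    mean = statistics.mean(window)
    stdev = statistics.pstdev(window)
    return 0.0 if stdev == 0 else (value - mean) / stdev

readings = [44, 45, 46, 44, 45]
print(z_score(readings, 71))  # a large |z| flags a potential humidity anomaly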

Happy Streaming!
