Creating Real-Time Anomaly Detection with AWS and Talend Cloud Pipeline Designer


Thanks for continuing to read all of our streaming data use cases during my exploration of Pipeline Designer. For the last article of this series, I wanted to walk you through a complete IoT integration scenario using a low-consumption device and leveraging only cloud services.

In my previous posts, I’ve used a Raspberry Pi and some sensors as my main devices. This single-board computer is pretty powerful, and you can install a light version of Linux on it as well. But in real life, enterprises will more likely use system-on-chip (SoC) devices such as Arduino boards, PLCs or the ESP8266. These SoCs are less powerful, consume less energy and are mostly programmed in C, C++ or Python. I’ll be using an ESP8266, which has embedded Wi-Fi and some GPIO pins to attach sensors. If you want to know more about IoT hardware, have a look at my last article, “Everything You Need to Know About IoT: Hardware”.

Our use case is straightforward. First, the IoT device will send sensor values to Amazon Web Services (AWS) IoT using MQTT. Then we will create a rule in AWS IoT to redirect the device payload to a Kinesis stream. Next, from Pipeline Designer, we will connect to the Kinesis stream and transform our raw data using standard components. Finally, with the Python processor, we will create an anomaly detection model using Z-score, and all anomalies will be stored in HDFS.


If you want to build your pipelines along with me, here’s what you’ll need:

  • An Amazon Web Services (AWS) account
  • AWS IoT service
  • AWS Kinesis streaming service
  • AWS EMR cluster (version 5.11.1 and Hadoop 2.7.X) on the same VPC and Subnet as your Remote Engine for Pipelines.
  • A Talend Cloud account and access to Pipeline Designer (you can create and use a trial account: Try Pipeline Designer)
  • An IoT device (can be replaced by any IoT data simulator)

High-Level Architecture

Currently, Talend Pipeline Designer doesn’t feature an MQTT connector. To get around this, you’ll find below a sample architecture that leverages Pipeline Designer to ingest IoT data in real time and store it in a Hadoop cluster.

Preparing Your IoT Device

As mentioned previously, I’m using an ESP8266, also called NodeMCU, which has been programmed to:

  • Connect to a Wi-Fi hotspot
  • Connect securely to AWS IoT broker using the MQTT protocol
  • Read distance, temperature and humidity sensor values every second
  • Publish the sensor values over MQTT to the topic IoT

If you are interested in how to develop an MQTT client on the ESP8266, take a look at this link. Alternatively, you could use any device simulator.
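If you don’t have a device at hand, a minimal Python simulator can stand in for the ESP8266. The sketch below is only an illustration: the field order (humidity placed in the second field so it lands in field1 later in the pipeline) is an assumption, and the actual publishing step would be done with an MQTT client such as paho-mqtt over TLS, using the certificate files downloaded from AWS IoT.

```python
import random

def read_sensors():
    # Simulated readings: distance (cm), humidity (%), temperature (C).
    # Humidity sits in the second field so it ends up in field1 downstream
    # (an assumption matching the aggregation step later in this article).
    return (random.randint(5, 200), random.randint(30, 70), random.randint(18, 30))

def build_payload(distance, humidity, temperature):
    # Semicolon-delimited CSV, as configured for the Kinesis dataset later on.
    return "%d;%d;%d" % (distance, humidity, temperature)

payload = build_payload(*read_sensors())
# A real device would now publish `payload` to the "IoT" topic once per second.
```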

IoT Infrastructure: AWS IoT and Kinesis



The AWS IoT service is a secure, managed MQTT broker. In this first step, I’ll walk you through registering your device and generating the public/private keys and CA certificate needed to connect securely.

First, log in to your Amazon Web Services account and look for IoT. Then, select IoT Core in the list.

Register your connected thing. From the left-hand side menu click on “Manage”, select “Things” and click on “Create”.

Now, select “Create a single thing” from your list of options (alternatively, you can select “Create many things” for bulk registration of things).

Now give your thing a name (you can also create device types, groups and other searchable attributes). For this example, let’s keep default settings and click on next.

Now let’s secure device authentication using “One-click certificate creation”. Click on “Create a Certificate”.

Download all the files; they have to be stored on the edge device and used by the MQTT client to connect securely to AWS IoT. Click on “Activate”, then “Done”.

In order to allow our device to publish messages and subscribe to topics, we need to attach a policy from the menu. Click on “Secure” and select “Policies”, then click on “Create”.

Give a name to the policy. In “Action”, start typing “iot” and select “iot:*” to allow all actions, tick the “Allow” box below and click on “Create”.
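For reference, a permissive policy document of this kind, allowing all IoT actions on all resources, looks like the JSON below. This is fine for a demo but too broad for production, where you would scope the actions and resources down.

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "iot:*",
      "Resource": "*"
    }
  ]
}
```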

Let’s attach this policy to a certificate. From the left menu, click on “Secure”, select “Certificates” and click on the certificate of your thing.

If you have multiple certificates, click on “Things” to make sure it is the right certificate. Next, click on “Actions” and select “Attach Policy”.

Select the policy we’ve just created and click on “Attach”.

Your thing is now registered and can connect, publish messages and subscribe to topics securely! Let’s test it (it’s now time to turn on the ESP).

Testing Your IoT Connection in AWS

From the menu, click on Test, select “Subscribe to a topic”, type IoT as the topic and click on “Subscribe to Topic”.

You can see that sensor data is being sent to the IoT topic.

Setting Up AWS Kinesis

On your AWS console search for “Kinesis” and select it.

Click on “Create data stream”.

Give your stream a name and select 1 shard to start out. Later on, if you add more devices, you’ll need to increase the number of shards. Next, click on “Create Kinesis stream”.

OK, now we are all set on the Kinesis side. Let’s return to AWS IoT: on the left menu, click on “Act” and press “Create”.

Name your rule, select all the attributes by typing “*” and filter on the topic IoT.

Scroll down and click on “Add Action” and select “Sends messages to an Amazon Kinesis Stream”. Then, click “Configure action” at the bottom of the page.

Select the stream you’ve previously created, then use an existing role, or create a new one, that AWS IoT can use to write to Kinesis. Click on “Add action” and then “Create Rule”.

We are all set at this point: the sensor data collected from the device through MQTT will be redirected to the Kinesis stream, which will be the input source for our Pipeline Designer pipeline.
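To sanity-check the rule before building the pipeline, you can read a few records straight from the stream. The sketch below is a rough example, assuming the boto3 library, an IAM user with Kinesis read access, and a hypothetical stream name and region; the parsing helper simply mirrors the semicolon-delimited payload.

```python
def parse_record(data):
    # Split a semicolon-delimited sensor payload into integer fields.
    return [int(v) for v in data.strip().split(";")]

def dump_latest_records(stream_name="iot-stream", region="us-east-1"):
    # Hypothetical stream name and region; requires boto3 and IAM credentials
    # with Kinesis read access.
    import boto3
    kinesis = boto3.client("kinesis", region_name=region)
    shard = kinesis.describe_stream(StreamName=stream_name)["StreamDescription"]["Shards"][0]
    it = kinesis.get_shard_iterator(StreamName=stream_name,
                                    ShardId=shard["ShardId"],
                                    ShardIteratorType="LATEST")["ShardIterator"]
    records = kinesis.get_records(ShardIterator=it)["Records"]
    return [parse_record(r["Data"].decode("utf-8")) for r in records]
```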

Cloud Data Lake: AWS EMR

Currently, with Talend Cloud Pipeline Designer, you can use HDFS and Spark on YARN only with an EMR cluster. In this part, I’ll describe how to provision a cluster and how to set up the Remote Engine for Pipelines to use HDFS as a data source in our pipelines.

Provision your EMR cluster

Continuing on your AWS Console, look for EMR.

Click on “Create cluster”.

Next, go to advanced options.

Let’s choose a release that is fully compatible with Pipeline Designer: 5.11.1 or below will do. Then select the components of your choice (Hadoop, Spark, Livy, Zeppelin and Hue in my case). We are almost there, but don’t click on “Next” just yet.

In “Edit software settings”, we are going to edit core-site.xml when the cluster is provisioned, in order to use the specific compression codecs required by Pipeline Designer and to allow root impersonation.

Paste the following code to the config:



[
  {
    "Classification": "core-site",
    "Properties": {
      "io.compression.codecs": ",,,",
      "hadoop.proxyuser.root.hosts": "*",
      "hadoop.proxyuser.root.groups": "*"
    }
  }
]

On the next step, select the same VPC and subnet as your Remote Engine for Pipelines and click “Next”. Then, name your cluster and click “Next”.

Select an EC2 key pair and go with default settings for the rest and click on “Create Cluster”.  After a few minutes, your cluster should be up and running.

Talend Pipeline Designer and EMR set up

Still on your AWS Console, look for EC2.

You will find 3 new instances with blank names that we need to rename. Then, by looking at the security groups, you can identify which one is the master node.

Now we need to connect to the master node through SSH (check that your client computer can reach port 22; if not, add an inbound security rule to allow your IP). Because we need to retrieve the Hadoop config files, I’m using Cyberduck (alternatively, use FileZilla or any tool that supports SFTP). Use the EC2 DNS for the server, hadoop as the user and the related EC2 key pair to connect.

Download the Hadoop config files (hdfs-site.xml, mapred-site.xml, yarn-site.xml, core-site.xml) from the EMR master node, located in the folder /etc/hadoop/conf.

Before uploading the files to the Remote Engine for Pipelines, edit them as follows:

  • core-site.xml: Edit the property io.compression.codecs with the value ,,,
  • hdfs-site.xml: Add the property dfs.client.use.datanode.hostname with the value true
  • yarn-site.xml: Edit the property yarn.timeline-service.enabled with the value false
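As an example, the hdfs-site.xml addition is a standard Hadoop property block (the other two files are edited the same way, changing the value of an existing property instead of adding one):

```xml
<property>
  <name>dfs.client.use.datanode.hostname</name>
  <value>true</value>
</property>
```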

Now connect as the Hadoop user to your Master node using a terminal and SSH.

Create the /user/talend folder in HDFS:

hadoop fs -mkdir -p /user/talend

hadoop fs -chown -R talend:talend /user/talend

Create two folders in the location of your choice and give them root:root ownership.

hadoop fs -mkdir -p /talend/deps/pdesigner

hadoop fs -mkdir -p /talend/deps/runtime

hadoop fs -chown -R root:root /talend/deps

Now connect as the centos user to your Remote Engine for Pipelines using a terminal and SSH.

Export the following environment variables:

  • HDFS_DSS_DEPENDENCIES_PATH to /talend/deps/pdesigner
  • HDFS_RUNTIME_DEPENDENCIES_PATH to /talend/deps/runtime
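For example (add these lines to the engine user’s profile if you want them to survive a new session):

```shell
export HDFS_DSS_DEPENDENCIES_PATH=/talend/deps/pdesigner
export HDFS_RUNTIME_DEPENDENCIES_PATH=/talend/deps/runtime
```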

Keep this SSH connection and your terminal open as we will use it again.

Now, using your favorite SFTP tool, connect to your Remote Engine for Pipelines EC2 machine as the ec2-user (allow your client to access port 22). If you don’t have a Remote Engine for Pipelines yet, follow these steps to deploy one: Installing Remote Engine for Pipelines.


Navigate to /opt/talend/data/etc/Hadoop and upload the Hadoop config files that you’ve edited earlier.

Go back to the terminal that is connected through SSH to the Remote Engine for Pipelines and execute the script located in /opt/talend/emr.

Now restart Livy by executing this command:

cd /opt/talend && docker-compose restart livy


The last step is to allow all traffic between Pipeline Designer and your EMR cluster. To do so, create security rules allowing all inbound traffic on both sides, for the Pipeline Designer and EMR security group IDs.


Pipeline Designer: IoT Streaming pipeline

Now it’s time to finalize our real-time anomaly detection pipeline that uses Z-score. This pipeline is based on my previous article, so if you want to understand the math behind the scenes, you should read that article.

All the infrastructure is in place and the required setup is done, so we can now start building pipelines. Log on to Talend Cloud and open Pipeline Designer.

Create Your Data Sources and Add Datasets

In this part, we will create two data sources:

  1. Our Kinesis Input Stream
  2. HDFS using our EMR cluster

From the landing page, select “Connections” on the left-hand side menu and click on “ADD CONNECTION”.

Give a name to your connection, select your Remote Engine and, for the type, select “Amazon Kinesis” in the drop-down box.

Now use an IAM user that has access to Kinesis with an access key. Fill in the connection fields with the access key and secret, click on “Check connection”, then click on “Validate”. Now, from the left-hand side menu, select “Datasets” and click on “ADD DATASET”.

Give your dataset a name and, from the drop-down box, select the Kinesis connection we’ve created before. Select the region of your Kinesis stream, then your stream; choose CSV for the format and Semicolon for the delimiter. Once that is done, click on “View Sample”, then “Validate”.

Our input data source is set up and our samples are ready to be used in our pipeline. Let’s create our output data source connection: on the left-hand side menu, select “CONNECTIONS”, click on “ADD CONNECTION” and give a name to your connection. Then select your Remote Engine and “HDFS” for the type, use “talend” as the user name and click on “Check Connection”. If it connects successfully, click on “Validate”.


That should do it for now; we will create the dataset within the pipeline. But before going further, make sure that the Remote Engine for Pipelines can reach the EMR master and slave nodes (add an inbound network security rule on the EMR EC2 machines to allow all traffic from the Remote Engine for Pipelines security group), or you will not be able to read from and write to the EMR cluster.

Build your Pipeline

From the left-hand side menu, select “Pipelines” and click on “Add Pipeline”.

In the pipeline canvas, click “Create source”, select your Kinesis stream and click on “Select Dataset”.

Back on the pipeline canvas, you can see the sample data at the bottom. As you’ve noticed, incoming IoT messages are really raw at this point, so let’s convert the current value types (string) to numbers: click on the green + sign next to the Kinesis component and select the Type Converter processor.

Let’s convert all our fields to “Integer”. To do that, select the first field (.field0) and change the output type to Integer. To change the field type on the next fields, click on NEW ELEMENT. Once you have done this for all fields, click on SAVE. 

Next to the Type Converter processor on your canvas, click on the green + sign and add a Window processor; in order to calculate a Z-score, we need to define a processing window.

Now let’s set up our window. My ESP8266 sends sensor values every second, and I want to create a fixed-time window that contains roughly 20 values, so I’ll choose Window duration = Window slide length = 20000 ms. Don’t forget to click Save.

Since I’m only interested in humidity, which I know is in field1, I’ll make things easier for myself later by converting the humidity row values in my window into a list of values (an array in Python) by aggregating on the field1 (humidity) data. To do this, add an Aggregate processor next to the Window processor. Within the Aggregate processor, choose .field1 as the field and List as the operation (since you will be aggregating field1 into a list).

The next step is to calculate the Z-score for the humidity values. In order to create a more advanced transformation, we need to use the Python processor, so add a Python Row processor next to the Aggregate processor.

Change the Map type from FLATMAP to MAP, click on the four arrows to open the Python editor, paste the code below and click SAVE. In the Data Preview, you can see what we’ve calculated in the Python processor: the average humidity, the standard deviation, the Z-score array and the humidity values for the current window.

Even if the code below is simple and self-explanatory, let me sum up the different steps:

  • Calculate the average humidity within the window
  • Find the number of sensor values within the window
  • Calculate the variance
  • Calculate the standard deviation
  • Calculate Z-Score
  • Output Humidity Average, Standard Deviation, Zscore and Humidity values.
#Import standard Python libraries
import math

#Average function
def mean(numbers):
    return float(sum(numbers)) / max(len(numbers), 1)

#Initialize variables
std = 0

#Load input list (field1 carries the humidity list built by the Aggregate processor)
mylist = input['field1']

#Average value for the window
avg = mean(mylist)

#Length of the window
lon = len(mylist)

#x100 in order to work around Python integer division
lon100 = 100 / lon

#Calculate variance
for i in range(len(mylist)):
    std = std + math.pow(mylist[i] - avg, 2)

#Calculate standard deviation
stdev = math.sqrt(lon100 * std / 100)

#Re-import all sensor values within the window (copied, so the raw values are kept)
myZscore = list(input['field1'])

#Calculate Z-score for each sensor value within the window
for j in range(len(myZscore)):
    myZscore[j] = (myZscore[j] - avg) / stdev

#Output results
output['avg'] = avg
output['stdev'] = stdev
output['Zscore'] = myZscore
output['humidity'] = input['field1']

If you open up the Z-Score array, you’ll see Z-score for each sensor value.

Next to the Python processor, add a Normalize processor to flatten the Python array into records: in the column to normalize, type Zscore, select the “Is list” option, then save.

Let’s now recalculate the initial humidity value from the sensor. To do that, we will add a Python processor and write the code below:

#Output results
output = input

#Invert the Z-score formula: humidity = Zscore * stdev + avg
output['humidity'] = input['Zscore'] * input['stdev'] + input['avg']
Don’t forget to change the Map type to MAP and click Save. Let’s go one step further and select only the anomalies. If you had a look at my previous article, anomalies are Z-scores that fall outside the -2 to +2 standard deviation range; in our case the range is around -1.29 and +1.29. Now add a FilterRow processor. The product doesn’t yet allow us to filter on a range of values, so we will filter on records whose absolute Z-score is greater than 1.29; we test on the absolute value because a Z-score can be negative.
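The filter logic itself boils down to a one-line test; here is a plain-Python sketch of it, using the 1.29 threshold from this example:

```python
def is_anomaly(zscore, threshold=1.29):
    # A reading is an anomaly when its Z-score falls outside [-threshold, +threshold];
    # testing the absolute value covers both tails at once.
    return abs(zscore) > threshold

zscores = [0.1, -0.4, 1.5, -1.31, 0.0]
anomalies = [z for z in zscores if is_anomaly(z)]
# anomalies -> [1.5, -1.31]
```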

The last output shows 5 records that are anomalies out of the 50 sample records. Let’s now store those anomalies in HDFS: click on “Create a Sink” on the canvas and click on “Add Dataset”. Set it up as shown below and click on Validate.

You will end up with an error message; don’t worry, it’s just a warning because Pipeline Designer cannot fetch a sample of a file that has not been created yet. We are now all set, so let’s run the pipeline by clicking on the play button at the top.

Let’s stop the pipeline and have a look at our cluster. Using Hue on EMR you can easily browse HDFS; go to user/Hadoop/anomalies.csv. Each partition file contains the records that are anomalies for each processing window.

There you go! We’ve built our anomaly detection pipeline with Pipeline Designer, reading sensor values from an SoC-based IoT device and using only cloud services. The beauty of Pipeline Designer is that we accomplished all of this without writing any code (apart from the Z-score calculation); I’ve only used the beautiful web UI.

To sum up, we’ve read data from Kinesis, used the Type Converter, Aggregate and Window processors to transform our raw data, and then a Python Row processor to calculate the standard deviation, average and Z-score for each individual humidity sensor reading. Then we’ve filtered out normal values and stored the anomalies in HDFS on an EMR cluster.

That was the last article of this series. Stay tuned, I’ll write the next episodes when new features come up. Again, Happy Streaming!
