Integrating Real-Time IoT data with Python and Talend Pipeline Designer


Following my previous post on Pipeline Designer, today I want to walk you through a simple use case: an anomaly-detection pipeline for IoT data. I’m going to show you how to connect to your Kafka queue from Pipeline Designer, collect data from an IoT device, transform the data, create a streaming window and build a Z-Score model with a Python processor to detect anomalies in humidity readings.


If you want to build your pipelines along with me, here’s what you’ll need:

  • A running Kafka instance (if you don’t have one, have a look at this article: Install & setup Kafka on Windows)
  • An AWS account
  • A Talend Cloud account and access to Pipeline Designer (you can create and use a trial account: Try Pipeline Designer)
  • An IoT device (can be replaced by any IoT data simulator)

Let’s Get Started: IoT with Talend Pipeline Designer


In my previous post, I described how to install, run and create a topic in Kafka. Feel free to reference that post if you need more detail on using Kafka here. In this post, I’ll focus on Talend Pipeline Designer itself.

IoT device: Raspberry Pi & Arduino Uno

I will be using the same device as my previous post, but this time we will analyze humidity values. As I mentioned before, you could use any device simulator.

Humidity sensor attached to the Arduino, with the Raspberry Pi reading the sensor value from serial.

Anomaly detection: Z-Score model

Anomaly detection

Anomaly detection is a technique used to identify unusual patterns that do not conform to expected behavior, called outliers. It has many applications in business, from intrusion detection (identifying strange patterns in network traffic that could signal a hack) to system health monitoring (spotting a malignant tumor in an MRI scan), and from fraud detection in credit card transactions to fault detection in operating environments.

One of the simplest and most popular methods for outlier detection in the IoT world is the Z-Score.

You can forget about importing NumPy, SciPy or scikit-learn, since Data Streams can only access the Python 2.7 standard library for now (see the Python 2.7 documentation). This is not a problem we can’t fix. Let’s start with a statistics lesson and calculate the Z-Score manually.


The z-score or standard score of an observation is a metric that indicates how many standard deviations a data point is from the sample’s mean, assuming a Gaussian (normal) distribution. This makes z-score a parametric method.

Let’s do the Math

After making the appropriate transformations to the selected feature space of the dataset (in our case, creating a sliding window), the z-score of any data point can be calculated with the following expression:

z = (x − m) / s

where x is the sensor value, m is the mean of the sensor values within the window and s is the standard deviation.


So what is the standard deviation? The simple definition, according to MathIsFun, is that the standard deviation is a measure of how spread out numbers are. It can be calculated with the following expression:

s = √( Σ (xᵢ − m)² / N )

where xᵢ is the series of sensor values within the defined window and N is the number of values within the window.
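As a sanity check, these two formulas fit in a few lines of plain Python using only the standard library (matching the processor’s constraints); the sample readings below are made up for illustration:

```python
import math

def mean(numbers):
    # arithmetic mean, guarding against an empty window
    return float(sum(numbers)) / max(len(numbers), 1)

def stdev(numbers):
    # population standard deviation: square root of the mean squared deviation
    m = mean(numbers)
    variance = sum((x - m) ** 2 for x in numbers) / float(len(numbers))
    return math.sqrt(variance)

window = [40, 42, 41, 43, 39, 41]  # illustrative humidity readings
print(mean(window))             # 41.0
print(round(stdev(window), 2))  # 1.29
```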



Coming back to the Z-Score with a simple example, the goal is to find how many standard deviations a value is from the mean.

In this example, the value 1.7 is 2 standard deviations away from the mean of 1.4, so 1.7 has a z-score of 2. Similarly, 1.85 has a z-score of 3. So, to convert a value to a Z-Score: first subtract the mean, then divide by the standard deviation.
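In code, that conversion is a one-liner; plugging in the example’s mean of 1.4 (the implied standard deviation is 0.15):

```python
def z_score(x, m, s):
    # subtract the mean, then divide by the standard deviation
    return (x - m) / s

print(round(z_score(1.7, 1.4, 0.15), 6))   # 2.0
print(round(z_score(1.85, 1.4, 0.15), 6))  # 3.0
```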

In IoT use cases, we usually consider that an anomaly is detected when the sensor value is outside the -2s and +2s range. Z-score is a simple yet powerful method to get rid of outliers in data if you are dealing with parametric distributions in a low-dimensional feature space.

Pipeline Designer: Getting Started

I’ll assume for this part that you have a Talend Cloud account, access to Pipeline Designer, and a Remote Engine for Pipelines up and running. If you don’t, follow this tutorial: Getting Started With Talend Cloud Pipeline Designer.

Additionally, you can have a look at the Talend Pipeline Designer User Guide

Create a Connection

Let’s create a connection to the Kafka cluster that I used in my previous blog post: select the Connections section and click on Add Connection.

Create a Kafka connection: give it a name, choose Kafka as the type, fill in your broker DNS with the port, click on CHECK CONNECTION, then VALIDATE.

Create a Dataset

Click on DATASETS, then ADD DATASET, and select the Kafka connection we’ve just created. Next, enter the topic the IoT device is sending data to (humidity), choose the value format (CSV), specify the field delimiter (semicolon) and click on VIEW SAMPLE.

Pipeline Designer: Pipeline

Create your Pipeline

Now click on Pipelines and press the ADD PIPELINE button.

Rename your Pipeline (we have chosen PiedPiper for this example).

On the canvas, click on Create source, select the dataset we’ve just created and click on SELECT DATASET.

Our PiedPiper Pipeline is now ready to ingest data from the Kafka queue.

Data Preparation

Since the incoming IoT messages are quite raw at this point, let’s convert the current value types (string) to numbers: click on the green + sign next to the Kafka component and select the Type Converter processor.

Let’s convert all our fields to “Integer”. To do that, select the first field (.field0) and change the output type to Integer. To change the field type on the next fields, click on NEW ELEMENT. Once you have done this for all of the fields, click on SAVE.

If you look at the Data Preview at the bottom you’ll see the new output.

Next to the Type Converter processor on your canvas, click on the green + sign and add a Window processor. Remember, in order to calculate a Z-Score, we need to create a processing window.

Now let’s set up our window. My Arduino sends a sensor value every second, and I want to create a Fixed Time window that contains roughly 20 values, so I’ll choose Window duration = Window slide length = 20000 ms. Don’t forget to click Save.

Since I’m only interested in humidity, which I know is in field1, I’ll make things easier for myself later by converting the humidity row values in my window into a list of values (an array in Python) by aggregating the field1 (humidity) data. To do this, add an Aggregate processor next to the Window processor. Within the Aggregate processor, choose .field1 as your Field and List as the Operation (since you will be aggregating field1 into a list).

On the Aggregate processor click on the preview button to see the list.
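Conceptually, the Window plus Aggregate steps turn the window’s individual rows into one record carrying a list. In plain Python the reshaping looks like this (the row dictionaries are hypothetical stand-ins for the real records; field1 holds the humidity):

```python
# hypothetical rows collected during one 20-second window
window_rows = [
    {'field0': 25, 'field1': 41},
    {'field0': 25, 'field1': 42},
    {'field0': 26, 'field1': 58},
]

# aggregate .field1 with the List operation: collect the values into one list
humidity_list = [row['field1'] for row in window_rows]
print(humidity_list)  # [41, 42, 58]
```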

Calculate the Z-Score

In order to create a more advanced transformation, we need to use the Python processor, so next to the Aggregate processor add a Python Row processor.

Remember we can’t import non-standard Python libraries yet, so we need to break up the Z-Score calculation into a few steps.

Even if the code below is simple and self-explanatory, let me sum up the different steps:

  • Calculate the average humidity within the window
  • Find the number of sensor values within the window
  • Calculate the variance
  • Calculate the standard deviation
  • Calculate Z-Score
#Import standard Python libraries
import json
import math

#Average function
def mean(numbers):
    return float(sum(numbers)) / max(len(numbers), 1)

#Initialize variables
std = 0

#Load input list (.field1 holds the humidity values aggregated over the window)
mylist = input['field1']
output = json.loads("{}")

#Average value for the window
avg = mean(mylist)

#Length of the window
lon = len(mylist)

#x100 in order to work around Python 2 integer division
lon100 = 100 / lon

#Calculate variance
for i in range(len(mylist)):
    std = std + math.pow(mylist[i] - avg, 2)

#Calculate standard deviation
stdev = math.sqrt(lon100 * std / 100)

#Re-import all sensor values within the window
myZscore = list(mylist)

#Calculate Z-Score for each sensor value within the window
for j in range(len(myZscore)):
    myZscore[j] = (myZscore[j] - avg) / stdev

#Output results
output['avg'] = avg
output['stdev'] = stdev
output['Zscore'] = myZscore

Change the Map type from FLATMAP to MAP, click on the four arrows to open the Python editor, paste the code above and click SAVE. In the Data Preview, you can see what we’ve calculated in the Python processor: the average humidity, the standard deviation and the Z-Score array for the current window.

If you open up the Z-Score array, you’ll see Z-score for each sensor value.


Let’s go one step further and select only the anomalies. If you remember, anomalies are Z-Scores outside the -2s and +2s range, in our case -0.88 and +0.88.

Next to the Python processor, add a Normalize processor to flatten the Python array into individual records: in the column to normalize, type Zscore, select the Is list option, then save.


Now add a FilterRow processor. The product doesn’t yet let us filter on a range of values, so we will keep the records whose absolute Z-Score is greater than 0.88; we test the absolute value because a Z-Score can be negative.
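In plain Python, the Normalize plus Filter steps amount to the following (the z-score values here are hypothetical; 0.88 is the threshold used above):

```python
zscores = [0.12, -1.05, 0.95, -0.30, 1.40, -0.88, 0.20]  # hypothetical z-scores from one window

# keep only records whose absolute z-score exceeds the threshold
anomalies = [z for z in zscores if abs(z) > 0.88]
print(anomalies)  # [-1.05, 0.95, 1.4]
```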

The last output shows that 14 records out of the 50 are anomalies.


There you go! We’ve built our first anomaly-detection pipeline with Talend Cloud Pipeline Designer: it reads from Kafka, uses the Type Converter, Window and Aggregate processors to transform our raw data, and then a Python Row processor to calculate the average, standard deviation and Z-Score for each individual humidity sensor reading. Then we’ve filtered out normal values and kept the anomalies.

The next step, of course, would be to act on the detected anomalies, for example turning off a system before it breaks down.

Again, Happy Streaming!
