Building Real-Time IoT Data Pipelines with Python and Talend Data Streams

  • Benoit Barranco
    Benoît Barranco is a Customer Success Architect at Talend. He brings years of experience working with customers and partners across multiple industries, helping them shape their solution architectures to become data-driven businesses. In recent years, Benoit has focused on analytics, big data, data science, and the Internet of Things while working at IBM and PPG Industries. Outside of the office, he likes spending time with his family (wife and twins), playing rugby, kitesurfing, skiing, snowboarding, and building electronics projects.
  • June 07, 2018

Introduction

I was incredibly excited when Talend finally released a new product called Talend Data Streams, which you can access for free on the AWS Marketplace. From day one, I have been testing out a few use cases.

<<Download Talend Data Streams for AWS Now>>

Following my previous post on Data Streams, today I want to walk you through a simple anomaly detection pipeline for IoT data. I'm going to show you how to connect to your Kafka queue from Talend Data Streams, collect data from an IoT device, transform the data, create a streaming window, and build a Z-Score model with a Python processor to detect anomalies in humidity readings.

Pre-requisites

If you want to build your pipelines along with me, here's what you'll need:

  • A Kafka cluster up and running, with a topic for the sensor data
  • An IoT device sending humidity readings (I use a Raspberry Pi and an Arduino Uno, but any device simulator will do)
  • A Talend Data Streams AMI running on AWS

Let's Get Started: IoT with Talend Data Streams

Kafka

In my previous post, I described how to install and run Kafka and how to create a topic. Feel free to reference that post if you need more detail on using Kafka here. In this post, I'll focus on Talend Data Streams itself.
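
As a quick refresher, here's a minimal sketch of creating the topic programmatically with the kafka-python package rather than Kafka's CLI scripts. This isn't something the pipeline itself needs, and the broker address is a placeholder to replace with your own:

#Minimal topic-creation sketch using kafka-python (pip install kafka-python)
#The broker address is a placeholder; "humidity" is the topic used later in this post
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers="your-broker-dns:9092")
admin.create_topics([NewTopic(name="humidity", num_partitions=1, replication_factor=1)])
admin.close()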

IoT device: Raspberry Pi & Arduino Uno

I will be using the same device as in my previous post, but this time we will analyze humidity values. As I mentioned before, you could use any device simulator.

Humidity sensor attached to the Arduino, with the Raspberry Pi reading sensor values from serial.
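
If you don't have this hardware at hand, a few lines of Python can stand in for the device. Here's a minimal simulator sketch using the kafka-python package; the two-field temperature;humidity payload is an assumption for illustration, so match it to whatever your topic actually carries:

#Hypothetical device simulator: publishes a semicolon-delimited reading every second
#The two-field "temperature;humidity" layout is an assumption; match your real schema
import random
import time
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers="your-broker-dns:9092")
while True:
    temperature = random.randint(18, 30)
    humidity = random.randint(30, 90)
    producer.send("humidity", ("%d;%d" % (temperature, humidity)).encode("utf-8"))
    time.sleep(1)  #the real device sends one value per second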

Anomaly detection: Z-Score model

Anomaly detection

Anomaly detection is a technique used to identify unusual patterns that do not conform to expected behavior, called outliers. It has many applications in business, from intrusion detection (identifying strange patterns in network traffic that could signal a hack) to system health monitoring (spotting a malignant tumor in an MRI scan), and from fraud detection in credit card transactions to fault detection in operating environments.

One of the most popular and simplest methods for outlier detection in the IoT world is the Z-Score.

You can forget about importing NumPy, SciPy, or scikit-learn, since Data Streams can only access the Python 2.7 standard library for now (see the Python 2.7 documentation). But this is not a problem we can't fix. Let's start with a statistics lesson and calculate the Z-Score manually.

Z-Score

The z-score or standard score of an observation is a metric that indicates how many standard deviations a data point is from the sample’s mean, assuming a Gaussian (normal) distribution. This makes z-score a parametric method.

Let’s do the Math

After making the appropriate transformations to the selected feature space of the dataset (in our case, creating a sliding window), the z-score of any data point can be calculated with the following expression:

$$z = \frac{x - m}{s}$$

Here, x is the sensor value, m is the mean of the sensor values within the window, and s is the standard deviation.

So now, what is the standard deviation? The simple definition, according to MathIsFun, is that the standard deviation is a measure of how spread out numbers are. It can be calculated with the following expression:

$$s = \sqrt{\frac{1}{N}\sum_{i=1}^{N}(x_i - m)^2}$$

Here, $x_i$ is the series of sensor values within the defined window and N is the number of values within the window.

Coming back to the Z-Score with a simple example, the goal is to find how many standard deviations a value is from the mean. 

In this example, the mean is 1.4 and the standard deviation is 0.15, so the value 1.7 is 2 standard deviations away from the mean and has a z-score of 2. Similarly, 1.85 has a z-score of 3. So, to convert a value to a Z-Score, first subtract the mean, then divide by the standard deviation.

In IoT use cases, we usually consider that an anomaly is detected when the sensor value falls outside the -2s to +2s range. Z-score is a simple yet powerful method to get rid of outliers in data if you are dealing with parametric distributions in a low-dimensional feature space.
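
To make the arithmetic concrete, here's a quick standard-library-only check of the example above; the 0.15 standard deviation is implied by the text, since 1.7 sits exactly 2 standard deviations above the mean of 1.4:

#Stdlib-only sanity check of the worked example: mean = 1.4, standard deviation = 0.15
def zscore(x, m, s):
    return (x - m) / s

print(round(zscore(1.7, 1.4, 0.15), 2))   #2.0 -> two standard deviations above the mean
print(round(zscore(1.85, 1.4, 0.15), 2))  #3.0 -> outside the +/-2 range, so an anomaly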

Data Streams: Getting Started

I’ll assume for this part that you have a Data Streams AMI up and running. If you don’t, follow this tutorial: Getting Started on Talend Data Streams.

When your Data Streams AMI is up and running, you can access it using its public DNS. On the login screen, use Admin as the user and the AMI ID as the password.

Additionally, you can have a look at the Talend Data Streams Documentation.

Create a Connection

Let's create a connection to the Kafka cluster that I used in my previous blog post. Select the Connections section and click on ADD CONNECTION.

Create a Kafka connection: give it a name, set the type to Kafka, fill in your broker DNS with the port, then click on CHECK CONNECTION and VALIDATE.

Create a Dataset

Click on DATASETS, then ADD DATASET, and select the Kafka connection we've just created. Next, enter the topic where the IoT device is sending data (humidity), choose the value format (CSV), specify the field delimiter (semicolon), and click on VIEW SAMPLE.

Data Streams Pipeline

Create your Pipeline

Now click on Pipelines and press the ADD PIPELINE button.

Rename your Pipeline (we have chosen PiedPiper for this example).

On the canvas, click on Create source, select the dataset we've just created, and click on SELECT DATASET.

Our PiedPiper Pipeline is now ready to ingest data from the Kafka queue.

Data Preparation

Since the incoming IoT messages are quite raw at this point, let's convert the current value types from string to number. Click on the green + sign next to the Kafka component and select the Type Converter processor.

Let's convert all our fields to Integer. To do that, select the first field (.field0) and change the output type to Integer. To change the type of the remaining fields, click on NEW ELEMENT. Once you have done this for all of the fields, click on SAVE.
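
For intuition, here's roughly what the CSV dataset plus the Type Converter are doing, expressed as plain Python; the two-field message is illustrative, since your topic may carry more fields:

#Plain-Python sketch of the semicolon split plus string-to-integer conversion
raw_message = "23;45"                              #illustrative payload
fields = [int(v) for v in raw_message.split(";")]  #field0, field1 as integers
print(fields)                                      #[23, 45]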

If you look at the Data Preview at the bottom you’ll see the new output.

Next to the Type Converter processor on your canvas, click on the green + sign and add a Window processor. Remember, in order to calculate a Z-Score, we need to create a processing window.

Now let's set up our window. My Arduino sends sensor values every second, and I want to create a Fixed Time window that contains roughly 20 values, so I'll set Window duration = Window slide length = 20000 ms. Don't forget to click SAVE.
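
Because the duration equals the slide length, this is a tumbling window: every reading lands in exactly one non-overlapping 20-second bucket. If that's not intuitive, this little sketch shows the same grouping logic (an illustration only, not Data Streams internals):

#Tumbling-window illustration: duration == slide length, so buckets never overlap
from collections import defaultdict

WINDOW_MS = 20000
windows = defaultdict(list)
for t in range(0, 60000, 1000):         #one reading per second for a minute
    windows[t // WINDOW_MS].append(40)  #bucket id = timestamp // window size
print([len(v) for v in windows.values()])  #[20, 20, 20]: three windows of 20 readings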

Since I'm only interested in humidity, which I know is in field1, I'll make things easier for myself later by converting the humidity row values in my window into a list of values (an array, in Python terms) by aggregating on the field1 (humidity) data. To do this, add an Aggregate processor next to the Window processor. Within the Aggregate processor, choose .field1 as your Field and List as the Operation (since you will be aggregating field1 into a list).

On the Aggregate processor click on the preview button to see the list.

Calculate the Z-Score

In order to create a more advanced transformation, we need to use the Python processor, so next to the Aggregate processor, add a Python Row processor.

Remember we can’t import non-standard Python libraries yet, so we need to break up the Z-Score calculation into a few steps.

Even if the code below is simple and self-explanatory, let me sum up the different steps:

  • Calculate the average humidity within the window
  • Find the number of sensor values within the window
  • Calculate the variance
  • Calculate the standard deviation
  • Calculate Z-Score
#Import standard Python libraries (json and math are both in the 2.7 standard library)
import json
import math

#Average function
def mean(numbers):
    return float(sum(numbers)) / max(len(numbers), 1)

#Initialize the variance accumulator
std = 0

#Initialize the output record
output = json.loads("{}")

#Average humidity value for the window
avg = mean(input['humidity'])

#Length of the window
mylist = input['humidity']
lon = len(mylist)

#x100 to work around Python 2 integer division
lon100 = 100 / lon

#Sum of squared deviations from the mean (numerator of the variance)
for i in range(len(mylist)):
    std = std + math.pow(mylist[i] - avg, 2)

#Calculate the standard deviation
stdev = math.sqrt(lon100 * std / 100)

#Copy all sensor values within the window (a copy, so the input list isn't mutated)
myZscore = list(input['humidity'])

#Calculate the Z-Score for each sensor value within the window
for j in range(len(myZscore)):
    myZscore[j] = (myZscore[j] - avg) / stdev

#Output the results
output['HumidityAvg'] = avg
output['stdev'] = stdev
output['Zscore'] = myZscore

Change the Map type from FLATMAP to MAP, click on the four arrows to open up the Python editor, paste the code above, and click SAVE. In the Data Preview, you can see what we've calculated in the Python processor: the average humidity, standard deviation, and Z-Score array for the current window.

If you open up the Z-Score array, you'll see the Z-score for each sensor value.

Let's go one step further and select only the anomalies. If you remember, anomalies are Z-scores that fall outside the -2s to +2s band; for this demo we'll use -0.88 and +0.88.

Next to the Python processor, add a Normalize processor to flatten the Python array into records. In the Column to normalize field, type Zscore, select the Is list option, then save.

Now add a FilterRow processor. The product doesn't yet allow us to filter on a range of values, so we will filter on the absolute value of the Zscore being greater than 0.88; we test the absolute value because a Z-score can be negative.
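
Taken together, the Normalize and FilterRow steps behave like this plain-Python snippet; the record values here are made up, and the 0.88 threshold is the one we just chose:

#Plain-Python equivalent of Normalize + FilterRow: flatten the list, keep |z| > 0.88
record = {'HumidityAvg': 45.2, 'stdev': 3.1, 'Zscore': [0.1, -1.2, 2.3, 0.5, -0.95]}
anomalies = [z for z in record['Zscore'] if abs(z) > 0.88]
print(anomalies)  #[-1.2, 2.3, -0.95]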

The last output shows that 14 records out of the 50 are anomalies.

Conclusion

There you go! We've built our first Anomaly Detection Pipeline with Data Streams. It reads from Kafka, uses the Type Converter, Aggregate, and Window processors to transform our raw data, and then uses a Python Row processor to calculate the standard deviation, average, and Z-Score for each individual humidity sensor reading. Then we've filtered out normal values and kept the anomalies.

The next step, of course, would be to act on the detected anomalies, for example by turning off a system before it breaks down.

Again, Happy Streaming!
