Generating a Heat Map with Twitter data using Pipeline Designer – Part 1

  • Richard Hall
    With more than 10 years of data integration consulting experience (much of it spent implementing Talend), Richard really knows his stuff. He has provided solutions for companies on six of the seven continents and has consulted across many different market verticals. Richard is a keen advocate of open source software, which is one of the reasons he first joined Talend in 2012. He is also a firm believer in engaging developers in “cool ways”, which is why he looks for opportunities to demonstrate Talend’s uses with technologies found around the home: hooking his Sonos sound system to Twitter via Talend, getting Google Home to call Talend web services, and controlling his TV with Talend calling universal plug and play services, to name a handful of examples. Prior to 2019, Richard ran his own business providing Talend solutions. During that time he became a prominent contributor on Talend Community, providing examples of how to solve business problems as well as how to do some of the cool stuff mentioned above. In 2019 he was invited to return to Talend as the Technical Community Manager.

For me, the most exciting thing about Pipeline Designer is the way that it makes working with streaming data easy. Traditionally this has required a completely different way of thinking if you have come from a "batch" world. So when Pipeline Designer was released, the first thing I wanted to do was to find a good streaming data source and do something fun and interesting with the data.

Twitter was my first choice of streaming data. The data is easy to acquire, constantly being produced and is not limited to a specific genre or domain. The challenge I set myself was to build a solution using Talend products to acquire and process the Twitter data, and AWS tools to store and present the data. This is the first of a couple of blogs I will write to demonstrate exactly how I have taken the Twitter data, processed it and presented it in a heat map.

 

The tools and services I will be using for this project are as follows...

Twitter  - To supply the data

Talend ESB - To retrieve the data, serialize it into an Apache Avro schema, and send it to AWS Kinesis

AWS Kinesis - To collect and stream the data to Talend Pipeline Designer

Talend Pipeline Designer - To process the data and output it to an AWS Elasticsearch Service

AWS Elasticsearch Service - To analyse the data and produce the heat map
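
Putting these together, the end-to-end data flow for the whole project looks like this:

Twitter → Talend ESB Route (JSON serialized to Avro) → AWS Kinesis → Talend Pipeline Designer → AWS Elasticsearch Service (heat map)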

 

In this first blog, I will focus on acquiring the data from Twitter, serializing it to Apache Avro and sending it to AWS Kinesis.

 

Creating a Twitter App

The first thing we need to do is to configure a developer account with Twitter. I could spend ages taking screenshots of how I did this, only for Twitter to change the process in a few months' time. So instead of doing that, I will give you the objectives you need to achieve with Twitter and list where you can get the information using Twitter's own documentation.

The first step is to apply for a Twitter developer account.

The second step is to create a Twitter App and generate tokens for your app. Once you've read the linked page and followed a few links from there, you may still be a little confused about the settings you need to set. So this short step-by-step guide should fill in the blanks. Hopefully the gist of this will not change too much if Twitter do evolve their developer environment.

  1. Click on the "Create an app" button to reveal the following screen. The fields you need to populate here are the "App name", the "Application description", and the "Website URL". You can use essentially anything you want for these values, and the "Website URL" can be completely made up. However, the "App name" must be unique.




    The bottom half of the "Create an app" form can be seen below. Here, only the application description is required, and that is all I have filled in. You can then click on "Create".



  2. Assuming that everything you entered is OK, you will see the next screen. If there was a problem, you will need to fix it before you can get here.



    At the top of the screen you will see a link with the title "Keys and tokens". Click that.

  3. This is where you can configure your keys and tokens. These are the whole point of going through this process of creating a Twitter app. They will be used as Context Variable values for the Route we will create.



    The "API key" and the "API secret key" will already be generated. You can regenerate these if you wish. However, before you are finished with this process you will need to create your "Access token & Access token secret". Click on the "Create" button to generate these. Once finished, you will see the screen below.



    Copy these keys and tokens ready to be used later. These MUST be kept secret; otherwise you run the risk of somebody being able to attack your Twitter account.

 

Configuring an AWS Kinesis Stream

If the Twitter data stream is the source for this subset of the project, an AWS Kinesis stream is the target. Since it always makes sense to get your source and target configured before working on the "bit in the middle", we will configure our AWS Kinesis stream before I get to the Talend ESB Route, which will join the dots. First you will need an AWS account. As with the section on configuring Twitter, I will point you to official documentation on this here.

The next thing you need to do is to create your Kinesis Stream. For this I will point you towards the AWS documentation, but I will also share some screenshots of what I did to configure mine. It is pretty straightforward, and hopefully those of you who already have AWS accounts will be fully configured in the time it takes you to read this section.

 

  1. Click on the "Services" link (in the blue box) at the top of the AWS dashboard to reveal the screen you see below. Then click on the "Kinesis" link (in the red box).



  2. Then select your region (where the yellow box is). I have selected London here. Once you have selected that, click on the "Create data stream" button (in the red box).



  3. We now have to fill out the "Kinesis stream name" (the red box), the "Number of shards" (the blue box) and click on "Create Kinesis stream" (the green box). I have chosen 1 shard for this project as that is all it will need for the data we will be processing. 


  4. Our Kinesis stream is now configured. Remember the stream name and the region for when we create the Talend Route. We will also need this information when we build our Pipeline in the next blog. If you would rather create the stream programmatically, see the sketch below.
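
For those who prefer code to console clicks, the same stream can be created with the AWS SDK for Java (the same libraries the Route will use later). This is just a minimal sketch, not part of the project itself; the region, stream name and placeholder credentials are example values, so swap in your own.

package example;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;

public class CreateKinesisStream {

    public static void main(String[] args) {
        // Build a client for the region the stream should live in ("eu-west-2" is London)
        AmazonKinesis kinesis = AmazonKinesisClientBuilder.standard()
                .withRegion("eu-west-2")
                .withCredentials(new AWSStaticCredentialsProvider(
                        new BasicAWSCredentials("YOUR_ACCESS_KEY", "YOUR_SECRET_KEY")))
                .build();

        // Create the stream with a single shard, as chosen above
        kinesis.createStream("twitter_stream", 1);
    }
}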

Now that we have our source and target configured, we can look at building our Talend Route.

 

Creating a Talend Route to send Tweet data to our Kinesis Stream

There are 4 processes we need to carry out with this Talend Route:

  1. Retrieve a stream of Tweets from Twitter
  2. Convert those Tweets into a JSON String
  3. Serialize that JSON String into Apache Avro
  4. Send the Apache Avro data message to our AWS Kinesis stream

Before I started writing this blog, I realised that Avro serialization was a potentially massive subject. I have written a couple of blogs on this already and will be referring to those blogs in this section, as the JSON format I shall be using here is the format I used in them. I will link to them when they are needed, but you can also see the links below in case you are interested in getting an understanding before we get to that point in this project.

The second link is where I talk about the JSON format we shall be using here.

As far as this Talend Route is concerned, it is pretty simple. It consists of 3 components and 2 code routines. I will start by explaining the code routines. Once they are explained, we have all of the pieces we need and can just fit them together in a few easy steps.

 

Code Routines

There are two code routines I use with this Talend Route. 

I have re-used a code routine from the Talend Pipeline Designer – Avro schema considerations when working with Java and Python blog. This will be used unchanged, so I won't go over it in detail again. This routine is called "AVROUtils"; a brief sketch of what it does is included below for convenience.
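
If you don't have that blog to hand, the two "AVROUtils" methods used in this project essentially parse an Avro schema and convert a JSON String into Avro binary. The following is only a sketch of what that boils down to, written against the standard Apache Avro Java API; the routine in the linked blog is the canonical version and may differ in detail.

package routines;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonDecoder;

public class AVROUtils {

    // The schema is held statically so it can be set once (in cConfig_1) and reused
    static Schema schema = null;

    // Parse and store the Avro schema used for all subsequent conversions
    public static void setSchema(String schemaJson) {
        schema = new Schema.Parser().parse(schemaJson);
    }

    // Convert a JSON String (matching the stored schema) into Avro binary bytes
    public static byte[] jsonToAvroWithoutSchema(String json) {
        try {
            GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
            JsonDecoder decoder = DecoderFactory.get().jsonDecoder(schema, json);
            GenericRecord record = reader.read(null, decoder);

            ByteArrayOutputStream out = new ByteArrayOutputStream();
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
            encoder.flush();
            return out.toByteArray();
        } catch (IOException e) {
            throw new RuntimeException("Could not serialize JSON to Avro", e);
        }
    }
}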

In order to send messages to AWS Kinesis I have created a new code routine. I will describe this routine here.

 

AWSKinesisUtils

The "AWSKinesisUtils" routine is a relatively basic routine which simply creates a connection to our AWS Kinesis stream and sends messages to it. The routine can be seen below.

Create a routine by the same name and add the following code....

 

package routines;

import java.nio.ByteBuffer;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.PutRecordRequest;
import com.amazonaws.services.kinesis.model.PutRecordResult;

public class AWSKinesisUtils {

    // The credentials and client are held statically so that they can be set up
    // once (in the cConfig component) and reused for every message the Route sends
    static BasicAWSCredentials credentials = null;
    static AmazonKinesis amazonKinesis = null;

    /**
     * setAWSCredentials: set the "credentials" static object
     *
     * {talendTypes} String
     *
     * {Category} User Defined
     *
     * {param} string("JHGSAHJTR%^%TRYJSJ") accessKey: The AWS Access Key.
     * {param} string("JHGSAHJHGFHR$%£RTIUTTR%^%TRYJSJ") secretKey: The AWS Secret Key.
     *
     * {example} setAWSCredentials("JHGSAHJTR%^%TRYJSJ","JHGSAHJHGFHR$%£RTIUTTR%^%TRYJSJ")
     */
    public static void setAWSCredentials(String accessKey, String secretKey) {
        credentials = new BasicAWSCredentials(accessKey, secretKey);
    }

    /**
     * createAmazonKinesisConnection: creates and stores the "amazonKinesis" static object
     *
     * {talendTypes} String
     *
     * {Category} User Defined
     *
     * {param} string("EU_WEST_2") region: The AWS Region.
     *
     * {example} createAmazonKinesisConnection("EU_WEST_2")
     */
    public static void createAmazonKinesisConnection(String region) {
        amazonKinesis = AmazonKinesisClientBuilder
                .standard().withRegion(region)
                .withCredentials(new AWSStaticCredentialsProvider(credentials))
                .build();
    }

    /**
     * putMessage: adds a message to a Kinesis stream
     *
     * {talendTypes} String
     *
     * {Category} User Defined
     *
     * {param} byte[]("#EA132EA") rawMessage: A message as a byte array.
     * {param} string("twitter_stream") streamName: The AWS Stream Name.
     *
     * {example} putMessage(myByteArray, "twitter_stream")
     */
    public static void putMessage(byte[] rawMessage, String streamName) {
        // Build the request; the partition key only matters for streams with more
        // than one shard, so a fixed value is fine for our single-shard stream
        PutRecordRequest putRecordRequest = new PutRecordRequest();
        putRecordRequest.setStreamName(streamName);
        putRecordRequest.setPartitionKey("filler");
        putRecordRequest.withData(ByteBuffer.wrap(rawMessage));
        PutRecordResult putRecordResult = amazonKinesis.putRecord(putRecordRequest);
    }
}
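
To see how these methods fit together before we get to the Route: the first two are called once (in the cConfig component described later) and putMessage is then called once per serialized message, like so.

routines.AWSKinesisUtils.setAWSCredentials(context.AWSAccessKeyID, context.AWSSecretAccessKey);
routines.AWSKinesisUtils.createAmazonKinesisConnection(context.AWSRegion);

// ...then, for each Avro-serialized message...
routines.AWSKinesisUtils.putMessage(rawMessage, context.AWSKinesisStream);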

This routine requires the following Java libraries. Some of these are packaged with Talend Studio 7.1; unfortunately, not all of them are, so I will give sources for the ones that are missing. You may find alternative versions packaged with Talend, but they are not guaranteed to work. The Jars I am listing are guaranteed to work.

All of the Jars are listed below. The Jars that are not included with Talend are listed as links to where they can be obtained...

Jar                                  Present in Talend v7.1
httpcore-4.4.9.jar                   Yes
httpclient-4.5.5.jar                 Yes
jackson-core-2.9.8.jar               No
jackson-databind-2.9.8.jar           No
jackson-annotations-2.9.8.jar        No
aws-java-sdk-core-1.11.333.jar       No
aws-java-sdk-kinesis-1.11.333.jar    No
aws-java-sdk-1.11.333.jar            No
jackson-dataformat-cbor-2.9.8.jar    No
joda-time-2.9.jar                    Yes
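
If you would rather pull the missing Jars from Maven Central, the coordinates below should correspond to the versions in the table. This mapping is my own rather than from the original list of links, so do double-check the version numbers:

com.fasterxml.jackson.core:jackson-core:2.9.8
com.fasterxml.jackson.core:jackson-databind:2.9.8
com.fasterxml.jackson.core:jackson-annotations:2.9.8
com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.9.8
com.amazonaws:aws-java-sdk-core:1.11.333
com.amazonaws:aws-java-sdk-kinesis:1.11.333
com.amazonaws:aws-java-sdk:1.11.333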

 

Once this routine has been created and the Jars have been added to it, we are ready to go. Remember to also create the "AVROUtils" routine mentioned earlier, if you have not done so already.

 

Talend Route

As I mentioned earlier, this is a pretty simple route. The layout can be seen in the screenshot below.

 

Context Variables

Before discussing the components numbered above, I will briefly talk about the Context Variables that you will need for this. The code displayed in the next section will refer to several Context Variables which must be configured before they are used. The Context Variables used by this Route can be seen below.



You will notice that some of the values have been blanked out. This is because I do not want anyone to be "borrowing" my resources :-)

Each of the Context Variables I use is explained below.

Name                 Description
ConsumerKey          The Twitter Consumer Key created here
ConsumerSecret       The Twitter Consumer Secret created here
AccessToken          The Twitter Access Token created here
AccessTokenSecret    The Twitter Access Token Secret created here
Locations            A set of GPS coordinates to specify a bounding box location. -180, -90; 180, 90 specifies the whole world.
AWSAccessKeyID       The AWS Access Key created here
AWSSecretAccessKey   The AWS Secret Access Key created here
AWSRegion            The AWS region in which your stream is located. Check here
AWSKinesisStream     The AWS Kinesis stream we set up here

 

Component Configuration

The configuration of each of the components used in the Route is detailed below.

  1. cConfig_1
    This component is used to set up an AWS Kinesis connection and to set the Avro schema to be used. You can simply copy the code below, but since you may want to play around with this project to achieve a slightly different outcome, I will also explain what is taking place.

    routines.AVROUtils.setSchema("{  \r\n   \"type\":\"record\",\r\n   \"name\":\"geo_data_object\",\r\n   \"namespace\":\"org.talend\",\r\n   \"fields\":[  \r\n      {  \r\n         \"name\":\"geo_bounding_box\",\r\n         \"type\":{  \r\n            \"type\":\"array\",\r\n            \"items\":{  \r\n               \"type\":\"record\",\r\n               \"name\":\"gps_coordinates\",\r\n               \"namespace\":\"\",\r\n               \"fields\":[  \r\n                  {  \r\n                     \"name\":\"latitude\",\r\n                     \"type\":[  \r\n                        \"null\",\r\n                        \"double\"\r\n                     ]\r\n                  },\r\n                  {  \r\n                     \"name\":\"longitude\",\r\n                     \"type\":[  \r\n                        \"null\",\r\n                        \"double\"\r\n                     ]\r\n                  }\r\n               ]\r\n            }\r\n         }\r\n      },\r\n      {  \r\n         \"name\":\"gps_coords\",\r\n         \"type\":[\"null\",\"gps_coordinates\"]\r\n      },\r\n      {  \r\n         \"name\":\"created_at\",\r\n         \"type\":[  \r\n            \"null\",\r\n            \"string\"\r\n         ]\r\n      },\r\n      {  \r\n         \"name\":\"text\",\r\n         \"type\":[  \r\n            \"null\",\r\n            \"string\"\r\n         ]\r\n      },\r\n      {  \r\n         \"name\":\"id\",\r\n         \"type\":[  \r\n            \"null\",\r\n            \"string\"\r\n         ]\r\n      },\r\n      {  \r\n         \"name\":\"type\",\r\n         \"type\":[  \r\n            \"null\",\r\n            \"string\"\r\n         ]\r\n      }\r\n   ]\r\n}");
    
    
    routines.AWSKinesisUtils.setAWSCredentials(context.AWSAccessKeyID, context.AWSSecretAccessKey);
    routines.AWSKinesisUtils.createAmazonKinesisConnection(context.AWSRegion);


    The first block of code is where the Avro schema required to serialize the JSON is set. The schema that we will be using for this project is the schema that I described (and showed how to generate) here. The difference between the schema built in the linked blog and the code you can see above is that the schema text has been "escaped" for use in Java. A nice tool for doing that can be found here.
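
    For readability, here is the same schema unescaped:

    {
      "type": "record",
      "name": "geo_data_object",
      "namespace": "org.talend",
      "fields": [
        {
          "name": "geo_bounding_box",
          "type": {
            "type": "array",
            "items": {
              "type": "record",
              "name": "gps_coordinates",
              "namespace": "",
              "fields": [
                { "name": "latitude", "type": ["null", "double"] },
                { "name": "longitude", "type": ["null", "double"] }
              ]
            }
          }
        },
        { "name": "gps_coords", "type": ["null", "gps_coordinates"] },
        { "name": "created_at", "type": ["null", "string"] },
        { "name": "text", "type": ["null", "string"] },
        { "name": "id", "type": ["null", "string"] },
        { "name": "type", "type": ["null", "string"] }
      ]
    }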

    The next two lines of code are used to configure the AWS credentials and to create an Amazon Kinesis connection. All of the code in this component makes use of the routines described above.

     

  2. cMessagingEndpoint_1
    This component is configured using both the "Basic settings" and "Advanced settings". The cMessagingEndpoint component allows us to use any of the Apache Camel Components. With this component, we are using the Twitter Apache Camel Component. The following screenshot shows the "Basic settings".


    Notice that the URI is actually a piece of Java code generating a String that makes use of Context Variables. The endpoint that is used can be copied from below.
    "twitter://streaming/filter?consumerKey="+context.ConsumerKey+"&consumerSecret="+context.ConsumerSecret+"&accessToken="+context.AccessToken+"&accessTokenSecret="+context.AccessTokenSecret+"&locations="+context.Locations
    
    The above endpoint will return a Twitter stream of messages which have location data within the boundary specified by the Locations context variable. For this project, I have set the boundary to be the whole world.

    The next screenshot shows the "Advanced settings" of this component.

    Here we simply click on the green plus button and select "twitter". This is used to add the appropriate library for the Camel Component we wish to use.


  3. cProcessor_1
    This component is used to dissect the content of each message from the cMessagingEndpoint. Each message will hold Twitter data in a Twitter4J object. This data is extracted and built into a JSON object which matches the Avro schema we set in the cConfig_1 component. Since the cProcessor component exists purely to hold Java code, I will not take a screenshot of it. Instead, I will simply show the Java that is used in its "Code" section.

    The code below should be commented well enough for you to figure out what is going on. However, I will summarise here. The first thing that is done is to retrieve the Twitter4J Status object from the Apache Camel Exchange object. The rest of the code is used to build a JSONObject which holds the data that we require to meet the Avro schema that we are working to. The fields that are retrieved from Twitter4J Status are the Place, GeoLocation, CreatedAt, Text and Id fields. These are all explained in the Twitter4J documentation linked above.

    After the JSONObject has been created, it is printed to the output window as a String so that we can see it looks OK (this can be commented out later). It is then serialized using the AVROUtils routine and sent to our AWS Kinesis stream using our AWSKinesisUtils routine.

    Code
    //Get access to the Twitter4j Status object from the Exchange
    twitter4j.Status tweet = exchange.getIn().getBody(twitter4j.Status.class);
    
    //Create new JSON object
    JSONObject json = new JSONObject();
    
    //Create the Geo Bounding Box JSON Array
    JSONArray jsonGeoBoundingBox = new JSONArray();
    
    if(tweet.getPlace()!=null){
    	if(tweet.getPlace().getBoundingBoxCoordinates().length==1){
    		
    		for(int i = 0; i<tweet.getPlace().getBoundingBoxCoordinates()[0].length; i++){
    			JSONObject point = new JSONObject();
    			JSONObject lat = new JSONObject();
    			lat.put("double", tweet.getPlace().getBoundingBoxCoordinates()[0][i].getLatitude());
    			JSONObject lon = new JSONObject();
    			lon.put("double", tweet.getPlace().getBoundingBoxCoordinates()[0][i].getLongitude());
    			point.put("latitude",lat);
    			point.put("longitude",lon);
    			jsonGeoBoundingBox.put(point);
    		}
    		
    	}
    }
    
    //Add the Geo Bounding Box to the JSON object
    json.put("geo_bounding_box",jsonGeoBoundingBox);
    
    //Create a JSON object to hold Coords 
    JSONObject jsonCoords = new JSONObject();
    
    //If the Tweet has a GeoLocation add to the gps_coords object
    if(tweet.getGeoLocation()!=null){
    	JSONObject lat2 = new JSONObject();
    	JSONObject lon2 = new JSONObject();
    	lat2.put("double", tweet.getGeoLocation().getLatitude());
    	jsonCoords.put("latitude", lat2);	
    	lon2.put("double", tweet.getGeoLocation().getLongitude());
    	jsonCoords.put("longitude", lon2);
    	JSONObject coordComplexType = new JSONObject();
    	coordComplexType.put("gps_coordinates",jsonCoords);
    	json.put("gps_coords",coordComplexType);
    }else{ //Otherwise add a null gps_coords object
    	json.put("gps_coords",JSONObject.NULL);
    }
    
    //Add a created_at object
    JSONObject createdAt = new JSONObject();
    createdAt.put("string", tweet.getCreatedAt());
    json.put("created_at", createdAt);
    
    //Add a text object
    JSONObject text = new JSONObject();
    text.put("string", tweet.getText());
    json.put("text", text);
    
    //Add a type object
    JSONObject type = new JSONObject();
    type.put("string", "tweet");
    json.put("type", type);
    
    //Add an id object
    JSONObject id = new JSONObject();
    id.put("string", tweet.getId()+"");
    json.put("id", id);
    
    //Print the String JSON to the output window
    System.out.println(json.toString());
    
    //Serialize the JSON to an AVRO byte array
    byte[] rawMessage = routines.AVROUtils.jsonToAvroWithoutSchema(json.toString());
    
    //Send the AVRO byte array to Kinesis
    routines.AWSKinesisUtils.putMessage(rawMessage, context.AWSKinesisStream);
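
    To give you a feel for what this code produces, here is an illustrative example of the JSON printed to the output window for a Tweet that has a bounding box but no exact GPS point (all values are made up):

    {
      "geo_bounding_box": [
        {"latitude": {"double": 51.261318}, "longitude": {"double": -0.563}},
        {"latitude": {"double": 51.686031}, "longitude": {"double": -0.563}},
        {"latitude": {"double": 51.686031}, "longitude": {"double": 0.28036}},
        {"latitude": {"double": 51.261318}, "longitude": {"double": 0.28036}}
      ],
      "gps_coords": null,
      "created_at": {"string": "Mon Apr 01 12:00:00 BST 2019"},
      "text": {"string": "Hello from London!"},
      "id": {"string": "1112692398958957568"},
      "type": {"string": "tweet"}
    }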
    

 

Once you have got to this point, you are ready to look at the next stage, which is to write the Pipeline to consume the data. I will cover this in my next blog. However, since you now have a streaming source which can be easily consumed by Pipeline Designer, maybe you can give it a try and see what you can produce with this data.

If you have any questions related to this blog, please feel free to raise them below. I will check periodically to ensure that I answer as many questions as I can. Alternatively, you can raise your questions in the Pipeline Designer board on Talend Community.
