Generating a Heat Map with Twitter data using Pipeline Designer – Part 2

  • Richard Hall
    With more than 10 years of data integration consulting experience (many of which have been spent implementing Talend), Richard really knows his stuff. He's provided solutions for companies on 6 of the 7 continents and has consulted across many different market verticals. Richard is a keen advocate of open source software, which is one of the reasons he first joined Talend in 2012. He is also a firm believer in engaging developers in "cool ways", which is why he looks for opportunities to demonstrate Talend's uses with technologies found around the home. Things like hooking his Sonos sound system to Twitter via Talend, getting Google Home to call Talend web services, and controlling his TV with Talend calling universal plug and play services are a handful of examples. Prior to 2019, Richard had been running his own business providing Talend solutions. During that time he became a prominent contributor on Talend Community, providing examples of how to solve business problems as well as how to do some of the cool stuff mentioned above. In 2019 he was invited to return to Talend as the Technical Community Manager.

First of all, sorry about the delay in getting the second part of this blog written up. Things are moving very fast at Talend at the moment, which is great, but it has led to a delay in getting this second part out. I've been spending quite a bit of time working with the Pipeline Designer Summer 2019 release. Some really exciting additions are coming to Pipeline Designer, but we will look into those at a later date. For now, we will start configuring our AWS data targets and setting up the beginning of our Pipeline. For those of you who may be reading this without having seen part 1 of this blog series, take a look here.

So, in the last blog we got to the point of having an AWS Kinesis Stream being fed by a Talend Route consuming data from Twitter. AWS Kinesis will be our Pipeline's source of data. Before we can configure our Pipeline, we need to configure our targets for the data.

Configuring AWS Data Targets

There will be two AWS targets for our Pipeline. One of them will be an Elasticsearch Service, so that we can analyse our collected data and create a heat map there. The other target will be an AWS S3 Bucket. This will be used to collect any rejected data from our Pipeline.  

Configuring an AWS S3 Bucket

First of all, I want to point out that this bucket is not expected to be used very much (given how this project is configured), but it will be useful if we want to extend the functionality to include some extra data filtering. In the Pipeline we have a filter component which filters out messages without suitable GPS data. Since our data is being acquired based on GPS parameters, nothing should end up being filtered out. However, we still need a target for any data that is filtered out, and for that we are using an Amazon S3 Bucket.

Creating an Amazon S3 Bucket is possibly the easiest thing to set up within AWS. As such, I will simply point to the AWS documentation and give you the details I am using for my bucket so that you can follow the section where I configure the Pipeline to use this bucket. The AWS documentation for this is here.

The S3 Bucket that I created is called "rh-tweets".
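
If you prefer scripting to clicking around the console, below is a minimal sketch of the same step using boto3. It assumes your AWS credentials are already configured locally; the bucket name and region are simply the ones I am using (bucket names are globally unique, so you will need to pick your own).

    import boto3

    # eu-west-2 is the London region; change this to whichever region you are using.
    s3 = boto3.client("s3", region_name="eu-west-2")

    # Bucket names are globally unique, so "rh-tweets" will already be taken - use your own name.
    s3.create_bucket(
        Bucket="rh-tweets",
        CreateBucketConfiguration={"LocationConstraint": "eu-west-2"},
    )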

 

Configuring an AWS Elasticsearch Service

The first thing I want to point out is that you do not need to use Elasticsearch with AWS. You can use your own server or anyone else's server (if you have permission). I am simply using the service provided by AWS because it is quick and easy for me to set up from scratch. There are cost implications for using AWS services, and for these blogs I am choosing the cheapest configurations I can within AWS to achieve my goal. I am pointing this out because these configurations will not be the most efficient, and if you want to build something on top of what you have learnt in these tutorials, you will likely have to put a bit more thought into the amount of data you will be processing and the performance you will require.

As with the previous blog, I will point to third party documentation when it comes to configuring tools other than Talend. However, for this you will need to follow some of the steps I am taking in order for your Elasticsearch Service to work with everything else in this tutorial. So if you are going to follow the AWS documentation, please pay attention to the steps below. Once you are happy you understand everything, feel free to play around and extend this if required.

 

  1. Log into your AWS console and go to the Services page. Click on "Elasticsearch Service", which is surrounded by the red box below.



  2. On the next screen click on the "Create a new domain" button. This is surrounded by the red box below.
  3. On the next screen select the "Development and testing" radio button for "Deployment type". This is surrounded by the green box. We are doing this to make this service as open as possible for our development and learning purposes. If you were building something for a production system, you would clearly not use this. 

    For the "Elasticsearch version" we are using "6.3". At present, this is the highest version that is supported by Pipeline Designer. This is surrounded by the red box.

    Once these have been selected, click the "Next" button, which is surrounded by the blue box.
  4. Now we need to configure our domain. I have chosen "rhall" for my "Elasticsearch domain name" (surrounded by the green box), but you can put whatever you like here.

    For the "Instance type" (surrounded by the blue box) I chose "t2.small.elasticsearch". This should be OK for our purposes here, but you may wish to go a bit bigger in a production environment.

    For the "Number of instances" (surrounded by the red box) I selected 1. Again, you may wish to play with this for your versions.

    Everything else on this page I have left as standard. It is entirely up to you if you want to tweak these settings, but you will not need to for this tutorial.


    As mentioned above, none of the settings below have been modified from the defaults. All you need to do here is click on "Next" (at the bottom of the page).


  5. When setting up access to our server, we are really cutting a few security corners. This IS NOT a configuration I would recommend at all, but it is useful for the purposes of this demonstration. It enables us to play around and develop this environment quickly without having to deal with complicated security settings. It is NOT recommended for anything but a sandpit environment. Please keep in mind that, with this configuration, any third party who gets hold of your server details can flood it with data.

    OK, with that warning out of the way, we can go back to the configuration. I have set the "Network Configuration" to "Public access" (surrounded by the red box).


    Next you need to click on "Select a template" and choose the "Allow open access to the domain" option (surrounded by the green box).



    You will be asked to tick a warning box which will pop up, warning you of pretty much what I stated above. Do this and click "OK".


    Once you have clicked the security warning above, you should be left with a screen like below. Click "Next".


  6. The final configuration screen simply lists your choices. You do not need to make any changes here. Simply click on "Confirm".
  7. We should now be left with the Elasticsearch Service dashboard, showing instances that are up and running or loading. Your instance will stay with a "Domain status" of "Loading" for approximately 10 minutes. Once everything is loaded, the screen will look similar to the one below.

    In the screenshot below you will notice two boxes: a red one and a blue one. These surround two important URLs. The red box surrounds the Elasticsearch endpoint, which we will need later for the Pipeline Designer Elasticsearch Connection. The blue box surrounds the Kibana URL. We will use Kibana in the next step, where we configure Elasticsearch with the data mapping we need for our data. Once that is done, we are ready to move to Pipeline Designer.
  8. Copy the Kibana URL (surrounded by the blue box above) and load the page in a web browser. You will see the screen below.

    Click on the "Dev Tools" towards the bottom in the left sidebar.

  9. The "Dev Tools" screen looks like below. We need to add a mapping here. Essentially what this screen does is make it easy to send API messages to Elasticsearch. You can do everything here using a third party REST API tool, but it is just easier to do it here.



    You will notice the following code in the screenshot above....
    PUT heatmap_index
    {  
       "settings":{  
          "number_of_shards":1
       },
       "mappings":{  
          "sm_loc":{  
             "properties":{  
                "latitude":{  
                   "type":"double"
                },
                "longitude":{  
                   "type":"double"
                },
                "location":{  
                   "type":"geo_point"
                },
                "id":{  
                   "type":"text"
                },
                "created_date":{  
                   "type":"text"
                },
                "text":{  
                   "type":"text"
                },
                "type":{  
                   "type":"text"
                }
             }
          }
       }
    }
    
    Essentially, this identifies the elements that will be supplied by the Pipeline we build next and assigns a data type to each. Most of these would normally be inferred automatically by Elasticsearch; however, there is one field we need to ensure is typed correctly: "location", which must be mapped as a "geo_point".

    By loading the above code and clicking on the green triangle next to "PUT heatmap_index", we are creating an index called "heatmap_index" with mapping information for the data we will be sending ("sm_loc"). You can see more about this here. If you would rather script this step than use Kibana, see the sketch after this list.

    Once we have got to this point, our Elasticsearch Service is ready to start consuming data.
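
The same mapping can be created with any REST client instead of Kibana Dev Tools. Below is a minimal sketch using Python's requests library; it assumes the open-access domain configuration described above, and you will need to substitute your own Elasticsearch endpoint.

    import requests

    # Replace this with the Elasticsearch endpoint shown in your AWS console.
    ENDPOINT = "https://YOUR-ELASTICSEARCH-ENDPOINT"

    # The same index settings and "sm_loc" mapping as the Kibana Dev Tools example above.
    mapping = {
        "settings": {"number_of_shards": 1},
        "mappings": {
            "sm_loc": {
                "properties": {
                    "latitude": {"type": "double"},
                    "longitude": {"type": "double"},
                    "location": {"type": "geo_point"},
                    "id": {"type": "text"},
                    "created_date": {"type": "text"},
                    "text": {"type": "text"},
                    "type": {"type": "text"},
                },
            },
        },
    }

    # Equivalent of "PUT heatmap_index" in Kibana Dev Tools.
    response = requests.put(f"{ENDPOINT}/heatmap_index", json=mapping)
    print(response.status_code, response.json())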

 

Creating a Pipeline to process our Data in AWS Kinesis

We have finally got to the point where we can start playing with Pipeline Designer. However, first we may need to install a Pipeline Designer Remote Engine.

Installing a Pipeline Designer Remote Engine

Depending on how you are using Pipeline Designer, there will be different steps you need to take here. If you are using the Free Trial then you shouldn't need to worry about installing or configuring a Pipeline Designer Remote Engine and can move on to the next section. You may have been using Pipeline Designer for a while and have already installed a Remote Engine. If so, you can move on as well. But should you need one, some of my colleagues have put together some step-by-step videos on how to do this. You have the following choices of installation...

Remote Engine for Pipeline Designer - The most basic local install

Remote Engine for Pipeline Designer Setup in AWS - An AWS install

Remote Engine for Pipeline Designer with AWS EMR Run Profile - An AWS EMR install

For my purposes here, I have chosen the basic install. However, this project will work with any of the above.

The Pipeline

OK, we are ready to start building. First of all, I will share a screenshot of the Pipeline I have built. For the remainder of this blog and the first part of the final blog (yes, there is one more after this one) I will describe how this is built.

As can be seen, this Pipeline is made up of 7 components. There are 3 Datasets ("Twitter Data", "S3 Bucket" and "ElasticSearch Tweets"), which need to be created after suitable Connections have been created for them, and then there are the 4 processors ("Window", "FieldSelector", "Filter" and "Python").

I will start by detailing how each of the Datasets and Connections are constructed, then I will demonstrate how each of the components are added to the Pipeline.

Creating the new Connections and Datasets

We have 3 new Connections and Datasets to set up in Pipeline Designer: a Connection and a Dataset for each of AWS Kinesis, AWS S3 and Elasticsearch. These are all pretty straightforward, but they refer to values we created while building the collateral in both this blog and the previous one. You will need to be aware of this and remember where to find your own values, since you will not necessarily be able to simply copy the values I am using (particularly if you have modified any names).

In order to reduce the amount of repeated information, I will cover the initial stages for creating all Connections here. It is the same for every type of Connection, so there is no need to demonstrate this 3 times.

 

  1. Inside Pipeline Designer, click on the "Connections" link in the left sidebar (surrounded by the red box), then press the "Add Connection" button (surrounded by the blue box).


  2. This will reveal the standard Connection form as seen below.


    Depending upon what you select, this form will change. Every Connection will need a "Name" (Red box), a "Remote Engine" (Green box) and a "Type" (Orange box). The "Description" (Blue box) can be useful, but is not essential. Once you have selected a "Type", further fields will appear to help you configure the Connection correctly for that "Type".

    Notice the "Add Dataset" button at the bottom of the screen. Once the Connections have been created, we will re-open each Connection and create a Dataset using this button. It saves on a tiny bit of configuration assigning the Dataset to a Connection.


Amazon S3 Bucket Connection and Dataset

For the S3 Bucket Connection we will need our AWS Access Key and our AWS Secret Access Key. If you are unsure of how to find these, take a quick look here.

  1. Fill in the "Name", "Remote Engine" and "Type" as below (you don't have to call yours the same as mine and your Remote Engine will be from a list available to you), and you will see a "Specify Credentials" slider appear. Click on it to switch it on.


  2. You now need to add your "Access Key" and "Secret Key" (the red and blue boxes). Once you have done this, you can test them by clicking on the "Check Connection" button. 


    If your details are valid, you will see a popup notification like the one below.

    If everything is OK, click on the "Validate" button to complete configuration.

  3. We now need to create the Dataset. Reopen the Connection we have just created and click on the "Add Dataset" button. You will see a screen which looks like below. This image has all of the settings already applied. I will go through this underneath the image.



    You will see that the "Connection" has already been set because you initiated this from the Connection that is used. You simply need to set the "Name", the "Bucket", the "Object" and the "Format". If you recall, I set my "Bucket" to be called "rh-tweets". Set this to whatever you called yours. The "Object" refers to the name of data. Think of this like a folder name. The "Format" should be set to "Avro". This will correspond to the Avro schema we talked about in this blog

    Once everything has been set, simply click on "Validate" and the Dataset is ready to be used.

Amazon Kinesis Connection and Dataset

As with the S3 Connection, for the Amazon Kinesis Connection we will also need our AWS Access Key and our AWS Secret Access Key. 

  1. Fill in the "Name", "Remote Engine" and "Type" as below, and you will see a "Specify Credentials" slider appear. Click on it to switch it on.



  2. You now need to add your "Access Key" and "Secret Key" (the red and blue boxes). Once you have done this, you can test them by clicking on the "Check Connection" button. 


    If everything is set up OK and your test works, click on the "Validate" button.

  3. We now need to create the Dataset. As before, reopen the Connection we have just created and click on the "Add Dataset" button. You will see a screen which looks like below. 



    Again the "Connection" has already been set because you initiated this from the Connection that is used. You simply need to set the "Name", the "AWS Region", the "Stream", the "Value Format" and the "Avro Schema".

    The region I created the Kinesis Stream in is London. AWS uses region codes which do not necessarily correspond to the names you see in the console (London, for example, is "eu-west-2"). To make this easier for you, here is a link to the codes that AWS uses.

    The "Stream" I created in the previous blog was called "twitter" and the "Value Format" is Avro. The "Avro Schema" I am using is the schema used in this blog. To make it easier, I will include it below...

    {  
       "type":"record",
       "name":"outer_record_1952535649249628006",
       "namespace":"org.talend",
       "fields":[  
          {  
             "name":"geo_bounding_box",
             "type":{  
                "type":"array",
                "items":{  
                   "type":"record",
                   "name":"gps_coordinates",
    	       "namespace":"",
                   "fields":[  
                      {  
                         "name":"latitude",
                         "type":[
                         	"null",  
                            "double"
                         ]
                      },
                      {  
                         "name":"longitude",
                         "type":[  
                            "null",  
                            "double"
                         ]
                      }
                   ]
                }
             }
          },
          {  
             "name":"gps_coords",
             "type":["null","gps_coordinates"]
          },
          {  
             "name":"created_at",
             "type":[ 
             	"null", 
                "string"
             ]
          },
          {  
             "name":"text",
             "type":[  
                "null", 
                "string"
             ]
          },
          {  
             "name":"id",
             "type":[  
                "null", 
                "string"
             ]
          },
          {  
             "name":"type",
             "type":[  
                "null", 
                "string"
             ]
          }
       ]
    }
    
  4. There is one more very important step for this Dataset that will help us when building our Pipeline. It is REALLY useful for Datasets used as sources (as this one will be) to have a sample of the data they will be delivering, because as you build the Pipeline you can see exactly what will happen to the data. In order to get a sample of our data, there are a couple of things we need to do. The first thing is to ensure that our Kinesis Stream is live. The configuration of the Kinesis Stream can be seen here.

  5. Once the Kinesis Stream is live, we need to send Twitter data to it using the Talend Route we created in this blog. Simply load the Route and start it running.

  6. Once a few records have been loaded into our Kinesis Stream, we can start retrieving the sample for our Dataset. (If you want to double-check from outside Pipeline Designer that records really are arriving, see the sketch after this list.) Assuming that we are still looking at our Dataset after having copied the Avro schema into it, click on the "View Sample" button (this is where the "Refresh Sample" button, surrounded by the red box below, is located).


    Once the sample has been obtained (this might take a minute or so), you will see the data at the bottom of the screen. This can be seen inside the blue box above. You can explore this data and refresh it with new data if you choose.

    Once you are happy, simply click on the "Validate" button and the Dataset is ready to be used.
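
If you want to confirm that the Route really is feeding the stream before pulling a sample, a quick boto3 check along these lines can help. This is a minimal sketch assuming the stream is called "twitter" in eu-west-2, as mine is; the record payloads are Avro-encoded binary, so don't expect readable text.

    import boto3

    kinesis = boto3.client("kinesis", region_name="eu-west-2")  # eu-west-2 = London

    # Confirm the stream is live.
    summary = kinesis.describe_stream_summary(StreamName="twitter")
    print(summary["StreamDescriptionSummary"]["StreamStatus"])  # should print "ACTIVE"

    # Read whatever has arrived on the first shard since the start of the retention window.
    shards = kinesis.list_shards(StreamName="twitter")["Shards"]
    iterator = kinesis.get_shard_iterator(
        StreamName="twitter",
        ShardId=shards[0]["ShardId"],
        ShardIteratorType="TRIM_HORIZON",
    )["ShardIterator"]

    records = kinesis.get_records(ShardIterator=iterator)["Records"]
    print(f"{len(records)} record(s) on the first shard")  # each Data value is Avro-encoded binary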

 

Elasticsearch Connection and Dataset

For the Elasticsearch Connection we are not concerned with AWS credentials since we have left this open as explained here. So this Connection is pretty easy to configure.

 

  1. Simply fill out the same details required for the other Connections and use the endpoint given to you here for the "Nodes*" field (in orange). Click the "Check Connection" button and, if everything is OK, click the "Validate" button.


  2. Finally we need to create the Dataset. Reopen the Connection as before and click on the "Add Dataset" button. You will see a screen which looks like below. 


    Simply add a "Name", the "Index" and the "Type" here. The "Index" and "Type" are briefly spoken about here. Ensure that if you have done something different when configuring your index and mapping for Elasticsearch, that you put the appropriate values here.


We are now ready to start putting our Pipeline together.

In the final part of this blog series, I will describe how this Pipeline is built, how to run it and how we can create a Heatmap to display the data that has been processed.
