
Generating a Heat Map with Twitter data using Pipeline Designer – Part 3

  • Richard Hall
    With more than 10 years of data integration consulting experience (many of which were spent implementing Talend), Richard really knows his stuff. He’s provided solutions for companies on 6 of the 7 continents and has consulted across many different market verticals. Richard is a keen advocate of open source software, which is one of the reasons he first joined Talend in 2012. He is also a firm believer in engaging developers in “cool ways”, which is why he looks for opportunities to demonstrate Talend’s uses with technologies found around the home. Things like hooking his Sonos sound system to Twitter via Talend, getting Google Home to call Talend web services, and controlling his TV with Talend calling universal plug and play services, are a handful of examples. Prior to 2019, Richard had been running his own business providing Talend solutions. During that time he became a prominent contributor on the Talend Community, providing examples of how to solve business problems as well as how to do some of the cool stuff mentioned above. In 2019 he was invited to return to Talend as the Technical Community Manager.

If you have worked through part 1 and part 2 of this series of blogs, there are only a few more steps to carry out before you can see the end-to-end flow of data and create your Heatmap. If you have not read the first two blogs, the links to them are above. Although these blogs have been quite lengthy, I have tried to make sure that readers of any level of experience can follow along. Since Pipeline Designer is a new product, I felt it made sense to be as explicit as possible.

In this final blog I will cover putting your Pipeline together and running it. Once you have finished, I hope you will be able to extrapolate from this example to build Pipelines for your own use cases, or to extend this one for your own purposes.

But now, it's time to build your Pipeline....

Building the Pipeline

By now we should have all of our Datasets created and be ready to use them. I will describe each Processor we are using as shown in the Pipeline screenshot that can be seen here. However, first we need to create our Pipeline.

  1. To create the Pipeline, select the "Pipelines" link in the left sidebar (in a red box in the image below), then click on the "Add Pipeline" button (in the blue box below).


  2. When the Pipeline screen opens, we need to give it a name (where the blue box in the screenshot below is), we need to select a "Remote Engine" (the drop down in the red box), and we need to add our "Source" (click on the "Add Source" + symbol surrounded by the green box). 



  3. You will see the next screen pop up. This is where we can select our Dataset to use as a source. Select the required Dataset (I've selected "Twitter Data" as seen below) and click on "Select Dataset" (surrounded by the red box).


    When the pop-up screen disappears, we will be presented with the layout below.


  4. If you went through the process of retrieving the sample data for our Dataset (described here), you will see the "Data Sample" shown in the green box below. This will be massively useful in our debugging while we are building this Pipeline. 




  5. Now it is time to add our first Processor. The first Processor we will add is a "Window" Processor. These are described here. Windows allow us to chunk our streaming data into mini-batches. This lets us build batch and stream Pipelines in the same way, and it also gives us control over how the stream of data will be batched. Take a look at the link I gave you above to see more about the different ways in which you can use Windows.

    To add a "Window" Processor simply click on the green "+" symbol (surrounded by the orange box below) and an "Add a processor" pop-up box will appear. Select the "Window" Processor and the box will disappear, leaving your new Processor on your Pipeline.

  6. Once the "Window" Processor is on your Pipeline, select it and you will see the configuration side panel showing the config options. I have chosen to leave the defaults shown in the blue and red boxes. They essentially create a batch of 5 seconds' worth of data, every 5 seconds. This is known as a "Window train", the simplest type of batching.
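
    If it helps to picture what the "Window" Processor is doing, the rough Python sketch below batches a stream into fixed 5-second windows. This is purely an illustration of that default batching behaviour, not Pipeline Designer code; the stream, records and timings are all hypothetical.

    import time

    def tumbling_windows(stream, window_seconds=5):
        # Collect records into fixed-length batches: every 5 seconds the
        # current batch is emitted and a new, empty one is started.
        batch = []
        window_end = time.time() + window_seconds
        for record in stream:
            if time.time() >= window_end:
                yield batch
                batch = []
                window_end = time.time() + window_seconds
            batch.append(record)

    # Hypothetical usage: 'tweet_stream' would be an endless iterator of Tweets
    # for five_second_batch in tumbling_windows(tweet_stream):
    #     process(five_second_batch)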



  7. Our next Processor is the "Field Selector". We click on the green "+" symbol to the right of our "Window" Processor (surrounded by the red box below), then select the "Field Selector" option. This will then appear in our Pipeline.


  8. The "Field Selector" Processor is used to filter, reorganise and rename fields. This component is described here. We will not be removing any fields; we will simply be changing the order and renaming a couple. As seen in the screenshot below, the first one we are going to deal with is the "id" field. We will keep its name of "id" (in the blue box), but it will be selected first, therefore changing its position in the document. Its path is ".id", which is the avpath of the field.



    After each field has been configured, click the "New Element" button to create a new field. Once all fields are created, click the "Save" button. The following fields need to be configured:

    Field Name      Path
    id              .id
    gps_coords      .gps_coords
    bounding_box    .geo_bounding_box
    created_date    .created_at
    text            .text
    type            .type

    Once all of the fields have been configured and the Processor has been saved, your Pipeline's Data Preview will look like the example below.


    Notice that the input and output are different. This is an example of the usefulness of having the Data Preview while you are configuring your Pipeline.
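
    To make the effect of the "Field Selector" concrete, here is a small Python sketch of the same select/rename/reorder operation applied to a single record. The sample record and its values are hypothetical; the field mapping is taken from the table above.

    # Hypothetical incoming record, shaped like the Twitter Dataset sample
    tweet = {
        "created_at": "2019-05-01T10:15:00Z",
        "id": 1123456789,
        "text": "Hello from London",
        "gps_coords": {"latitude": 51.5074, "longitude": -0.1278},
        "geo_bounding_box": None,
        "type": "tweet",
    }

    # (new field name, source field) pairs, matching the table above
    field_map = [
        ("id", "id"),
        ("gps_coords", "gps_coords"),
        ("bounding_box", "geo_bounding_box"),
        ("created_date", "created_at"),
        ("text", "text"),
        ("type", "type"),
    ]

    # Build the output record in the new order, under the new names
    selected = {new_name: tweet[source] for new_name, source in field_map}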

  9. Our next Processor is the "Filter" Processor. This is used for filtering data into two sets. It is described here. As with other Processors, we click on the green "+" symbol to the right of the last Processor we added, then select the Processor required from the pop-up. In this case, we are selecting the "Filter" Processor.



  10. This "Filter" is going to be used to ensure that all data being sent to our Elasticsearch instance has GPS data. As soon as this Processor is dropped into the Pipeline, it will have one filter element waiting to be configured. We will need two filter elements. To add a second, we click on the "New Element" button.


    As you can see, there are 4 properties to configure per element. The table below lists the settings I am using for both fields being used to filter.

    Field Path          Apply a function first    Operator    Value
    .gps_coords         NONE                      !=          NONE
    .bounding_box[0]    NONE                      !=          NONE

    All of the "Field Path" values follow the avpath specification.




    Once both of our filter elements are configured, we need to set the "Select rows that match" option to "ANY". This means that as long as at least one of the filter elements is true, the data can proceed. Once that is done, select "Save".
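
    The logic of this Processor boils down to a simple "or" between the two elements. The short Python sketch below mirrors the settings in the table above; the sample records are hypothetical.

    def keep(record):
        # .gps_coords != NONE
        has_point = record.get("gps_coords") is not None
        # .bounding_box[0] != NONE
        bbox = record.get("bounding_box") or []
        has_bbox = len(bbox) > 0 and bbox[0] is not None
        # "Select rows that match" = ANY: either element is enough
        return has_point or has_bbox

    records = [
        {"id": 1, "gps_coords": {"latitude": 51.5, "longitude": -0.1}, "bounding_box": None},
        {"id": 2, "gps_coords": None, "bounding_box": None},
    ]
    kept = [r for r in records if keep(r)]          # record 1: carries on down the Pipeline
    rejected = [r for r in records if not keep(r)]  # record 2: would go to the S3 Bucket Dataset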

  11. You will notice that there are two outputs for the "Filter" Processor. There is an output for the data filtered in (the data we want) with a blue line, and an output for the data filtered out (the data we do not want) with a yellow line. We are adding our "S3 Bucket" Dataset as the Destination for our rejects here. The data we will be gathering for this Pipeline should always have GPS data, so we shouldn't see much, if anything, going to S3. However, we may wish to add more filter elements in the future to filter out Tweets based on other logic, so this is useful to have in place.

    Click on the "+" symbol above "Add Destination" which is in the box linked to the yellow output from the "Filter" Processor (surrounded by the blue box below). This will reveal a pop-up showing available Datasets. We need to select the "S3 Bucket" Dataset, then click on "Select Dataset".



  12. When we configure this Dataset in the configuration sidebar on the right, we just need to make one change to the defaults. We need to change the "Merge Output" slider to be set to on, as seen below. This will mean that the data will be appended for every run of the Pipeline.
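
    If you ever want to confirm whether any rejects have actually landed in the bucket, a quick listing with boto3 will do it. The bucket name and prefix below are placeholders; substitute the values behind your own "S3 Bucket" Dataset.

    import boto3

    s3 = boto3.client("s3")
    # Placeholder bucket/prefix: use the ones configured for your S3 Bucket Dataset
    response = s3.list_objects_v2(Bucket="my-pipeline-rejects", Prefix="twitter-rejects/")
    for obj in response.get("Contents", []):
        print(obj["Key"], obj["Size"])   # nothing printed means no rejects so far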



  13. The next Processor to be added is the "Python" one. This allows us to use the power of Python to carry out some changes to our data before it is sent to Elasticsearch. 

    To do this, click on the green "+" symbol along the blue "Filter" output line (surrounded by the blue box) and select the "Python" Processor.



  14. When we configure this Processor in the configuration sidebar on the right, we need to carry out two tasks. The first is simply to ensure the "Map type" is set to "Map". The second task is to set the "Python Code". I have included the code to use after the image below.



    The following code needs to be copied and pasted into the "Python Code" section of the configuration window.

    # Here you can define your custom MAP transformations on the input
    # The input record is available as the "input" variable
    # The output record is available as the "output" variable
    # The record columns are available as defined in your input/output schema
    # The return statement is added automatically to the generated code,
    # so there's no need to add it here
    
    # Code Sample :
    
    # output['col1'] = input['col1'] + 1234
    # output['col2'] = "The " + input['col2'] + ":"
    # output['col3'] = CustomTransformation(input['col3'])
    
    output = json.loads("{}")
    output['latitude']=None 
    output['longitude']=None
    output['location']=None
    output['id']=input['id']
    output['created_date']=input['created_date']
    output['text']=input['text']
    output['type']=input['type']
    
    if input['gps_coords']==None:
        output['latitude']=(input['bounding_box'][0]['latitude']+input['bounding_box'][1]['latitude']+input['bounding_box'][2]['latitude']+input['bounding_box'][3]['latitude'])/4
        output['longitude']=(input['bounding_box'][0]['longitude']+input['bounding_box'][1]['longitude']+input['bounding_box'][2]['longitude']+input['bounding_box'][3]['longitude'])/4
        output['location']=str(output['latitude'])+','+str(output['longitude'])    	
    else:        
        output['latitude']=input['gps_coords']['latitude'] 
        output['longitude']=input['gps_coords']['longitude']
        output['location']=str(output['latitude'])+','+str(output['longitude'])    	
    
    
    The code above creates a new field called "location" and also sets the "latitude" and "longitude" values. If a bounding box has supplied our only GPS data, we take the average "latitude" and "longitude" of its four corners. The "location" is a comma-separated string value made up of "latitude" and "longitude". The other fields are output along with these computed values.

    Once this is done, simply click the "Save" button.
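
    As a quick sanity check of the averaging above, here is the same calculation on a hypothetical bounding box (four corners placed roughly around central London):

    # Hypothetical bounding box: four corners, as Twitter supplies them
    bounding_box = [
        {"latitude": 51.48, "longitude": -0.22},
        {"latitude": 51.48, "longitude": -0.07},
        {"latitude": 51.55, "longitude": -0.07},
        {"latitude": 51.55, "longitude": -0.22},
    ]

    latitude = sum(corner["latitude"] for corner in bounding_box) / 4    # ~51.515
    longitude = sum(corner["longitude"] for corner in bounding_box) / 4  # ~-0.145
    location = str(latitude) + "," + str(longitude)                      # "51.515,-0.145"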

  15. Once the "Python" Processor has been saved, you will see how it has changed the input record (surrounded by the blue box) to the output record (surrounded by the red box).



  16. The last step we need to carry out is to add our Destination. To do this, click on the "+" symbol above "Add Destination" (surrounded by the red box), then select your "ElasticSearch Tweets" Dataset. Click "Select Dataset".


    Your Pipeline is now created and ready to run.

 

Running the Pipeline and Generating the Heatmap

The last part to this, and the part you have been waiting for, is to join all of the dots and run this project. So far we have created a Talend Route to collect the Twitter data and send it to an AWS Kinesis Stream, built a Pipeline to consume that data and send it to Elasticsearch, created the Elasticsearch service, and configured the required Elasticsearch index and mapping using Kibana. Before we can see the Heatmap, we need to switch on all of the components, get some data into Elasticsearch and then create our Heatmap. This section will talk you through each of those steps.

  1. The first thing we need to do is make sure that our AWS Kinesis Stream and Elasticsearch service are running. When I described the building of each of these, we got them to a point where they were all live. Check to see that these are live.
  2. Once they are live, we need to go back to our Talend Route. This was configured to use Twitter credentials. So long as those credentials are still valid, you should be able to simply turn the Route on.

    Open the Route and click the "Run" button (surrounded by the blue box). In the screenshot below the Route is running. You will notice the "Kill" button (surrounded by the red box). You can use that to stop it. However, don't stop it before you have managed to get some data into Elasticsearch.




  3. We now need to check that our Kinesis Stream is populating. To do this, log into AWS and go to the Kinesis Stream you created earlier. You should see the screen below.



    We need to switch to the "Monitoring" tab to see what is happening. Click on the "Monitoring" tab (surrounded by a red box above).

  4. On the "Monitoring" tab, scroll down until you see the "Put Record (Bytes) - Sum" chart. That is the chart surrounded by the red box below. If you can see a line like the one in the chart below, which starts at about the time you started your Talend Route, you are good to go.
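
    If you prefer code to the console, you can also verify that records are flowing with a few lines of boto3. The stream name and region below are placeholders; substitute the ones you used when you created the Kinesis Stream in part 1.

    import boto3

    # Placeholders: use your own stream name and region
    kinesis = boto3.client("kinesis", region_name="eu-west-1")

    summary = kinesis.describe_stream_summary(StreamName="twitter-stream")
    print(summary["StreamDescriptionSummary"]["StreamStatus"])   # expect "ACTIVE"

    # Peek at the oldest records on the first shard
    shard_id = kinesis.list_shards(StreamName="twitter-stream")["Shards"][0]["ShardId"]
    iterator = kinesis.get_shard_iterator(
        StreamName="twitter-stream",
        ShardId=shard_id,
        ShardIteratorType="TRIM_HORIZON",
    )["ShardIterator"]
    print(len(kinesis.get_records(ShardIterator=iterator, Limit=5)["Records"]))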

  5. We are now ready to switch on our Pipeline. Load your Pipeline and start it by clicking on the start button which is surrounded by the green box below.



  6. The Pipeline might take a minute or two to start up. Once it does, you will see some stats being generated in the right-hand "Pipeline Details" panel (surrounded by the green box). As soon as you see some data here, your Pipeline is working and sending data to Elasticsearch. When you want to switch off the Pipeline, click on the stop button (surrounded by the blue box). As with the Talend Route, you might not want to do this yet. The more data you have, the better your Heatmap will look. I waited until I had about 100,000 records.



  7. We now need to load Kibana. This can be done by loading the Kibana URL shown here. Load that URL and you should see the following screen. If not, click on the "Discover" link; if you have not used Elasticsearch for anything before, it will show you this screen.

    EDIT: I realised while proofreading (and after I had finished with the screenshots) that you may need to click on the "Management" option to get this screen.



    In the "Index pattern" box (surrounded by the red box above), add "heatmap_index". Then click on the "Next step" button (surrounded by the blue box above). "heatmap_index" is the name of the index we created in the previous blog.

  8. On the next screen, simply press the "Create index pattern" button (surrounded by the green box below).



  9. The next screen shows the "Index Pattern" created. You do not need to do anything here.

  10. If you click on the "Discover" link (surrounded by the green box in the image below), you will see the data that has been sent to Elasticsearch with the index "heatmap_index".
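
    You can also check the index directly over the Elasticsearch REST API rather than through Kibana. The endpoint below is a placeholder for your own domain, and this assumes your Elasticsearch access policy allows unsigned requests from your IP (as in the earlier setup); otherwise the requests would need to be signed.

    import requests

    # Placeholder: your Elasticsearch endpoint from the previous blog
    ES_ENDPOINT = "https://your-es-domain.eu-west-1.es.amazonaws.com"

    # How many Tweets have made it into the index so far?
    print(requests.get(ES_ENDPOINT + "/heatmap_index/_count").json()["count"])

    # Pull back a couple of documents to eyeball the location field
    hits = requests.get(ES_ENDPOINT + "/heatmap_index/_search?size=2").json()["hits"]["hits"]
    for hit in hits:
        print(hit["_source"]["location"], hit["_source"]["text"])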



  11. Click on the "Visualize" link (surrounded by the green box in the image below) and you will see the "Visualize" screen. Click on the "Create a visualization" button.



  12. The next screen gives you a choice of visualization types. Although we want a Heatmap, we do not want the "Heat Map" option. Click on the "Coordinate Map" option.



  13. On the next screen, simply click on "heatmap_index" (surrounded by the red box below).



  14. You will be presented with the following screen. Click on the "Value" selector (surrounded by the red box below).



  15. Ensure that "Count" is selected for the "Aggregation" (surrounded by the blue box below) and click on "Geo Coordinates" (surrounded by the red box below).



  16. Once the "Geo Coordinates" section is expanded, select "Geohash" (surrounded by the blue box below) and select "location" for the field (surrounded by the red box below).



  17. The final step before we can see the Heatmap is to set the "Options". Click on the "Options" tab (surrounded by the red box below), select a "Map type" of "Heatmap" (surrounded by the blue box), set a "Cluster size" using the slider (anything will do, you can tweak this later) and set the "Layers" to "road_map" (surrounded by the orange box).



    Once the above has been set, you are ready to start the Heatmap.

  18. Below is an example of the sort of result you might get once you start the Heatmap. To start it, click on the "Start" button (surrounded by the red box). You will see the map produced with the data that Elasticsearch has up to the point at which you started it. If you want it to refresh periodically, you can set this using the controls surrounded by the purple box. You can also zoom in and out of the map using the controls surrounded by the green box. If you want to tweak the look of the map, simply have a play around with the "Options" panel we edited in step 17.

 

Hopefully you have found these blogs useful and they have revealed a few possibilities for how you can use Pipeline Designer. If you have any questions about the blogs, please feel free to add comments. If you have any questions about Pipeline Designer, log on to the Talend Community and go to the Pipeline Designer board. This board is monitored by several Talend employees who have been working with Pipeline Designer, and it is also viewed by fellow Community members who may be able to help you out.

 
