
Creating Avro schemas for Pipeline Designer with Pipeline Designer

  • Richard Hall
    With more than 10 years of data integration consulting experience (much of it spent implementing Talend), Richard really knows his stuff. He’s provided solutions for companies on 6 of the 7 continents and has consulted across many different market verticals. Richard is a keen advocate of open source software, which is one of the reasons he first joined Talend in 2012. He is also a firm believer in engaging developers in “cool ways”, which is why he looks for opportunities to demonstrate Talend’s uses with technologies found around the home: hooking his Sonos sound system to Twitter via Talend, getting Google Home to call Talend web services, and controlling his TV with Talend calling Universal Plug and Play services, to name a handful. Prior to 2019, Richard ran his own business providing Talend solutions. During that time he became a prominent contributor on Talend Community, providing examples of how to solve business problems as well as how to do some of the cool stuff mentioned above. In 2019 he was invited to return to Talend as the Technical Community Manager.

I have had the privilege of playing with and following the progress of Pipeline Designer for a while now, and I am really excited about this new tool. If you haven’t seen it yet, then don’t delay and get your free trial now… actually, maybe read this blog first ;-)

Pipeline Designer is an incredibly intuitive, web-based, batch and stream processing integration tool. The real genius of this tool is that batch and stream processing “Pipelines” can be built in exactly the same way. This might not sound that significant, but consider the differences between processing a batch of data with a start and an end, and processing an ongoing stream of data with a known start but no known end. Without going into too much detail, just contemplate what you would need to take into account when computing aggregates over batch versus streaming data sources. But batch and stream processing is not the only cool thing about this product. There is also the schema-on-read and data preview functionality. I won’t go into too much detail reiterating what my colleague Stephanie Yanaga has discussed here, but the schema functionality is particularly special. In most cases you do not need to understand how it works, but in a couple of cases it is really useful to have an insight into it. This blog discusses those cases and focuses on how to work with the tool in the most efficient way.

The schema functionality is largely provided by Apache Avro. In several scenarios this is completely hidden from users unless they choose to output their data in Avro format. However, there are a couple of scenarios where you need to supply your data in Avro format; two examples are pushing JSON data to Kafka and Kinesis. Unluckily for me (luckily for you?), I had to work this out having never heard of Avro before. My use case was consuming Twitter data to display it on a geographical heatmap, which I will elaborate on in a future blog. The problem I had was that I could retrieve the Twitter data in JSON format really easily, but I didn’t know how to convert it into Avro format. I then discovered a nice way of getting Pipeline Designer to do the majority of the work for you. I will explain…

Pipeline Designer supports several connection types, many of which I am sure you have come across: Amazon Kinesis, Amazon S3, Database, Elasticsearch, HDFS, Kafka, Salesforce and the Test Connection. It is the Test Connection which is really useful to us here. With this connection, we can simulate the consumption of JSON data, and the Avro schema is calculated for us. We can then create a Pipeline which outputs this data to an Amazon S3 bucket or an HDFS filesystem, open the resulting file and take a copy of the schema we need to use.

 

Building a Pipeline to generate your Avro schema

The above is a massively simplified description of what you need to do. There are other steps involved, particularly if you want to use the schema to create an Avro string in Java. This section of the blog will describe in detail how you can achieve the above for your JSON data.

  1. Open up Pipeline Designer and click on the “Connections” option on the menu on the left-hand side. Then click the “Add Connection” button at the top.

  2. You will be presented with a screen like the one shown in the simple example below. Add a “Name”, “Description”, select a “Remote Engine” and select “Test Connection” as the “Type”. Click “Validate” once completed.

  3. You now need to create a Dataset which uses the Test Connection you have just created. Click on the “Datasets” left menu option and then “Add Dataset”.

  4. In the “Add a New Dataset” screen, fill out the “Name” you wish to use, the “Connection” type (the one we have just created), and select JSON for the “Format”.

  5. Below the “Dataset” option, you will see the “Values” box. This is where we will add our example JSON. I will go into a bit more detail about the JSON we need to use later on in this blog. Once this is done, click on “Validate”.

  6. In this example we will be using an Amazon S3 bucket as the target for the Avro file. You can use HDFS if you prefer, but Amazon S3 is just easier and quicker for me to use. If you do not know how to set up an Amazon S3 bucket, take a look here.

  7. Once you have created your S3 bucket, we need to create a connection to it. Click on “Add Connection” as demonstrated in step 1.

  8. In the “Add a New Connection” screen, add a “Name” for your S3 Connection, select a “Remote Engine”, select the “Type” (Amazon S3) and fill out your access credentials (make sure you keep yours hidden as I have; otherwise these credentials could cost you a lot of money in the wrong hands). Check the connection by clicking on the “Check Connection” button. If all is OK, then click the “Validate” button.
  9. We now need to create our S3 bucket dataset. To do this, repeat step 3 and click on “Add Dataset”.

  10. In the “Add a New Dataset” screen, select the “Name” of your dataset, select the “Connection” type (the Connection we have just created), select the “Bucket” you have created in S3, decide upon an “Object” name (essentially a folder as far as we are concerned here) and select the “Format” to be “Avro”. Click on “Validate” once completed.


    We now have the source and target created. The next step is to create our Pipeline.

  11. Go to the left-hand side menu and select “Pipelines”. Then click on “Add Pipeline”.



  12. You will see the Pipeline Designer development window open up. This is where we can build a Pipeline, and this one will be a very easy one. First, set your Pipeline name. This is done where the red box is in the screenshot. Next, select the Run Profile. This is done where the pink box is. Add your source by clicking on the “Add Source” option in the development screen and selecting the “Dummy JSON File” source we created earlier (where the green box is). This will reveal an “Add Destination” option on the development screen. Click on this and select your S3 bucket target (where the blue box is).


    At the bottom of the screen you will see the data preview area. This is where (orange box) you can look at your data while building your Pipeline.

    Once everything is built and looking similar to the above, click on the “Save” button (in the yellow box).

  13. Once the Pipeline is built, we are ready to run it. Click on the Run button (in the red box) and you should see stats starting to generate where the blue box is. This might take between 30 and 40 seconds to start up.



  14. Once the Pipeline has finished running, we are ready to take a look at our Avro file.

    Go to your Amazon account (or HDFS filesystem if you have tried this using HDFS) and find the file that was generated. You can see the file that I generated in my S3 bucket in the screenshot below.


    Download this file and open it.

  15. Once the file is opened, you will see something like the screenshot below.



    We are interested in the text between “Objavro.schema<” and “avro.codec”. This is our Avro schema. We *may* need to go through some extra processes, but I will talk about these (and the details on the example JSON that I mentioned in step 5) in the following section.
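
As an aside, if you would rather not pick the schema out of the raw file by hand, the Avro Java library can do it for you. Below is a minimal sketch, assuming the downloaded file has been saved locally as pipeline-output.avro (a name I have made up for this example); the DataFileReader simply exposes the writer schema that is embedded in the container file header.

import java.io.File;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

public class ExtractAvroSchema {
   public static void main(String[] args) throws Exception {
      // The Avro file downloaded from the S3 bucket (hypothetical file name)
      File avroFile = new File("pipeline-output.avro");

      // The container file header embeds the schema the writer used
      try (DataFileReader<GenericRecord> reader =
              new DataFileReader<GenericRecord>(avroFile, new GenericDatumReader<GenericRecord>())) {
         Schema schema = reader.getSchema();
         // Print the schema as pretty-printed JSON so it can be copied and edited
         System.out.println(schema.toString(true));
      }
   }
}

Either way, what you end up with is the schema text that the rest of this blog works with.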

 

JSON Data Format

An example of the JSON we will be working with here (and in my next blog on Pipeline Designer) can be seen below.

 

{  
   "geo_bounding_box":[  
      {  
         "latitude":{  
            "double":36.464085
         },
         "longitude":{  
            "double":139.74277
         }
      },
      {  
         "latitude":{  
            "double":36.464085
         },
         "longitude":{  
            "double":139.74277
         }
      },
      {  
         "latitude":{  
            "double":36.464085
         },
         "longitude":{  
            "double":139.74277
         }
      },
      {  
         "latitude":{  
            "double":36.464085
         },
         "longitude":{  
            "double":139.74277
         }
      }
   ],
   "gps_coords":{  
      "latitude":{  
         "double":36.464085
      },
      "longitude":{  
         "double":139.74277
      }
   },
   "created_at":{  
      "string":"Fri Apr 26 14:08:41 BST 2019"
   },
   "text":{  
      "string":"Hello World"
   },
   "id":{  
      "string":"1121763025612988417"
   },
   "type":{  
      "string":"tweet"
   }
}

This is a rather generic schema that is built to hold rough bounding box location data (essentially GPS coordinates forming a box covering an area), exact GPS coordinates, a timestamp, some text, a record ID and a type. The purpose of this is to allow several sources of real-time data with GPS information to be processed by a Pipeline. The example above is using Twitter data.

The eagle-eyed amongst you will notice that the JSON above does not match the JSON in the screenshot I presented in step 5. The reason for this is that Pipeline Designer uses Python to serialize and deserialize the JSON to and from Avro format. Python does not need the “type” of the data to precede the value when dealing with potentially nullable fields, whereas Java does. I have written another blog explaining the differences between Java and Python Avro serialization here. Since the objective of this project is to serialize the JSON using Java to be deserialized by Pipeline Designer, the schema I am using must accommodate both Java and Python. FYI Python can interpret the JSON above, but Pipeline Designer will generate an Avro schema which is not suitable for Java. So, the JSON that is used in this example is changed to remove the type prefixes. Like so…

{  
   "geo_bounding_box":[  
      {  
         "latitude":36.464085,
         "longitude":139.74277
      },
      {  
         "latitude":36.464085,
         "longitude":139.74277
      },
      {  
         "latitude":36.464085,
         "longitude":139.74277
      },
      {  
         "latitude":36.464085,
         "longitude":139.74277
      }
   ],
   "gps_coords":{  
      "latitude":36.464085,
      "longitude":139.74277
   },
   "created_at":"Fri Apr 26 14:08:41 BST 2019",
   "text":"Hello World",
   "id":"1121763025612988417",
   "type":"tweet"
}
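
As a quick illustration of where those type prefixes come from, here is a minimal plain-Java sketch (the one-field schema is made up purely for this example): when a field is a nullable union, Avro’s Java JSON encoder wraps the value in the name of the union branch, which is exactly the shape of the first JSON sample above.

import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonEncoder;

public class JsonUnionDemo {
   public static void main(String[] args) throws Exception {
      // A single nullable string field, just to demonstrate the union handling
      Schema schema = new Schema.Parser().parse(
         "{\"type\":\"record\",\"name\":\"demo\",\"fields\":[" +
         "{\"name\":\"text\",\"type\":[\"null\",\"string\"]}]}");

      GenericRecord record = new GenericData.Record(schema);
      record.put("text", "Hello World");

      ByteArrayOutputStream out = new ByteArrayOutputStream();
      JsonEncoder encoder = EncoderFactory.get().jsonEncoder(schema, out);
      new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
      encoder.flush();

      // Prints {"text":{"string":"Hello World"}} - the union branch name wraps the value
      System.out.println(out.toString());
   }
}

That wrapped form is what the Java side expects, which is why the schema we build needs to work for both the Java producer and Pipeline Designer’s Python-based processing.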
 

The type-free JSON above will generate an Avro schema similar to the one below…

{  
   "type":"record",
   "name":"outer_record_1952535649249628006",
   "namespace":"org.talend",
   "fields":[  
      {  
         "name":"geo_bounding_box",
         "type":{  
            "type":"array",
            "items":{  
               "type":"record",
               "name":"subrecord_1062623557832651415",
               "namespace":"",
               "fields":[  
                  {  
                     "name":"latitude",
                     "type":[  
                        "double",
                        "null"
                     ]
                  },
                  {  
                     "name":"longitude",
                     "type":[  
                        "double",
                        "null"
                     ]
                  }
               ]
            }
         }
      },
      {  
         "name":"gps_coords",
         "type":"subrecord_1062623557832651415"
      },
      {  
         "name":"created_at",
         "type":[  
            "string",
            "null"
         ]
      },
      {  
         "name":"text",
         "type":[  
            "string",
            "null"
         ]
      },
      {  
         "name":"id",
         "type":[  
            "string",
            "null"
         ]
      },
      {  
         "name":"type",
         "type":[  
            "string",
            "null"
         ]
      }
   ]
}
 

The autogenerated “record types” will almost certainly be named differently from the above. In the blog I previously wrote about this subject, I describe how the above schema may need to be adjusted to make it as bulletproof as possible.

First, let’s look at the type named “subrecord_1062623557832651415”. This is used in two places: the “geo_bounding_box” array items and the “gps_coords” field. Both use exactly the same structure, so let’s give the type a more meaningful name. I have chosen “gps_coordinates” and have changed this in both locations.

Since we have changed the above record type to give it a meaningful name, let’s do the same with the name of the outer record as well. This is called “outer_record_1952535649249628006”; I’ll change it to “geo_data_object”.

These changes can be seen below…

{  
   "type":"record",
   "name":"geo_data_object",
   "namespace":"org.talend",
   "fields":[  
      {  
         "name":"geo_bounding_box",
         "type":{  
            "type":"array",
            "items":{  
               "type":"record",
               "name":"gps_coordinates",
               "namespace":"",
               "fields":[  
                  {  
                     "name":"latitude",
                     "type":[  
                        "double",
                        "null"
                     ]
                  },
                  {  
                     "name":"longitude",
                     "type":[  
                        "double",
                        "null"
                     ]
                  }
               ]
            }
         }
      },
      {  
         "name":"gps_coords",
         "type":"gps_coordinates"
      },
      {  
         "name":"created_at",
         "type":[  
            "string",
            "null"
         ]
      },
      {  
         "name":"text",
         "type":[  
            "string",
            "null"
         ]
      },
      {  
         "name":"id",
         "type":[  
            "string",
            "null"
         ]
      },
      {  
         "name":"type",
         "type":[  
            "string",
            "null"
         ]
      }
   ]
}
 

Once the above changes have been made, there are a couple more changes required to enable nullable fields when serializing and deserializing using Java and Python. You should notice that there are fields where the “type” has two options inside square brackets. These are known as “unions”, and they essentially tell Avro which data types to expect. Pipeline Designer has output these with the data type (string or double) preceding “null”. We need to swap these around so that “null” comes first: in Avro, a union field’s default value has to match the first type in the union, so putting “null” first is the conventional way to declare a field that can be null. These changes can be seen below.

The other change that is needed is as follows. The "gps_coords" key has a "type" of "gps_coordinates". This value could potentially be null, so to allow that we need to turn its type into a union with "null" as a possible type. This change can also be seen below.

{  
   "type":"record",
   "name":"geo_data_object",
   "namespace":"org.talend",
   "fields":[  
      {  
         "name":"geo_bounding_box",
         "type":{  
            "type":"array",
            "items":{  
               "type":"record",
               "name":"gps_coordinates",
               "namespace":"",
               "fields":[  
                  {  
                     "name":"latitude",
                     "type":[  
                        "null",
                        "double"
                     ]
                  },
                  {  
                     "name":"longitude",
                     "type":[  
                        "null",
                        "double"
                     ]
                  }
               ]
            }
         }
      },
      {  
         "name":"gps_coords",
         "type":["null",
                 "gps_coordinates"
                ]
      },
      {  
         "name":"created_at",
         "type":[  
            "null",
            "string"
         ]
      },
      {  
         "name":"text",
         "type":[  
            "null",
            "string"
         ]
      },
      {  
         "name":"id",
         "type":[  
            "null",
            "string"
         ]
      },
      {  
         "name":"type",
         "type":[  
            "null",
            "string"
         ]
      }
   ]
}
 

The above change is VERY important, so don’t forget that one.

Once you have got to this point, you are ready to start building/collecting and serializing your JSON using Talend Studio.
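
If you want to sanity-check the finished schema before moving to Talend Studio, the sketch below may help. It is plain Java with the Avro library (not the Studio job itself, and the file names are ones I have made up): it parses the schema, builds one sample record, and writes it to a local Avro container file. If any of the hand edits above broke the schema, the Schema.Parser call will fail straight away.

import java.io.File;
import java.util.Arrays;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;

public class SerializeGeoData {
   public static void main(String[] args) throws Exception {
      // Fails fast here if the hand-edited schema is no longer valid Avro
      Schema schema = new Schema.Parser().parse(new File("geo_data_object.avsc"));

      // The named type "gps_coordinates" is shared by the array items and the
      // (now nullable) gps_coords field, so we can reuse it for both
      Schema coordsSchema = schema.getField("geo_bounding_box")
                                  .schema()          // the array type
                                  .getElementType(); // the gps_coordinates record

      GenericRecord coords = new GenericData.Record(coordsSchema);
      coords.put("latitude", 36.464085);
      coords.put("longitude", 139.74277);

      GenericRecord record = new GenericData.Record(schema);
      record.put("geo_bounding_box", Arrays.asList(coords, coords, coords, coords));
      record.put("gps_coords", coords);
      record.put("created_at", "Fri Apr 26 14:08:41 BST 2019");
      record.put("text", "Hello World");
      record.put("id", "1121763025612988417");
      record.put("type", "tweet");

      // Write a local Avro container file as a simple end-to-end check
      try (DataFileWriter<GenericRecord> writer =
              new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(schema))) {
         writer.create(schema, new File("sample_tweet.avro"));
         writer.append(record);
      }
   }
}

In the real project the serialized records would of course be pushed to Amazon Kinesis rather than written to a local file, but that is a topic for the next blog.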

The next blog I will write on this subject will demonstrate how to use the above schema to serialize Tweets with location data, send them to Amazon Kinesis, consume and process the data using Pipeline Designer, and then send that data to Elasticsearch so that a real-time heatmap of worldwide Twitter data can be generated.
