
Talend Pipeline Designer – Avro schema considerations when working with Java and Python


  • Richard Hall

When working with Talend Pipeline Designer, you may come across situations where you need to write an Avro schema. Go to the Apache Avro site to read more about this serialization technology. This blog has come about thanks to some experiences I’ve had working with Avro schemas and getting them to serialize correctly using Python and Java. Spoiler alert: they don’t necessarily work the same way in the two languages.

Since Pipeline Designer processes Avro using Python, we need to ensure that our Avro schemas work with Python. However, I felt it important to demonstrate Pipeline Designer being used with Talend ESB, which is a Java tool. It was here that I spotted the differences in behavior. Rather than let you struggle through the same confusion I had, I figured this blog would be a nice prequel to the blog series I was planning when I stumbled upon this. It includes examples of the issues I found, as well as useful code to serialize JSON to Avro using both Python and Java.

The differences between serializing JSON to Avro when using Java and Python

Let’s say you wish to build an Avro schema to serialize the following JSON using Java, and then supply the result to Talend Pipeline Designer for processing…

{ "name": "Richard", "age": 40, "siblings": 2 }

It’s a very basic example of a person record, with “name”, “age” and the number of “siblings” the person has. A very basic Avro schema for the above can be seen below….

{
  "type" : "record",
  "name" : "Demo",
  "namespace" : "com.talend.richard.demo",
  "fields" : [ {
    "name" : "name",
    "type" : "string"
  }, {
    "name" : "age",
    "type" : "int"
  }, {
    "name" : "siblings",
    "type" : "int"
  } ]
}

This schema identifies the “name” field as expecting a string, and both the “age” and “siblings” fields as expecting an int. This Avro schema, used to serialize the JSON above, will work exactly the same way in both Java and Python. Awesome…so what is the problem? The problem is that sometimes your JSON may not hold every value you initially specified. If your JSON MUST hold every value, then it is right for serialization to fail. But that will seldom be the case.
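If you want to see what this serialization actually produces, the Avro specification’s binary encoding is simple enough to sketch by hand: ints and longs are zigzag-encoded varints, strings are a length prefix followed by UTF-8 bytes, and a record is just its fields concatenated in schema order. Here is a stdlib-only Python sketch for this particular schema (a teaching aid I’ve put together, not the avro library):

```python
def zigzag(n: int) -> int:
    # Map signed to unsigned so small magnitudes stay small: 0,-1,1,-2 -> 0,1,2,3
    return (n << 1) ^ (n >> 63)

def varint(n: int) -> bytes:
    # Little-endian base-128 varint; the high bit of each byte flags continuation
    out = bytearray()
    while True:
        b = n & 0x7F
        n >>= 7
        if n:
            out.append(b | 0x80)
        else:
            out.append(b)
            return bytes(out)

def encode_long(n: int) -> bytes:
    return varint(zigzag(n))

def encode_string(s: str) -> bytes:
    raw = s.encode("utf-8")
    return encode_long(len(raw)) + raw   # length prefix, then UTF-8 bytes

def encode_basic_record(rec: dict) -> bytes:
    # A record is just its fields concatenated in schema order -- no field markers
    return (encode_string(rec["name"])
            + encode_long(rec["age"])
            + encode_long(rec["siblings"]))
```

Encoding { "name": "Richard", "age": 40, "siblings": 2 } this way yields just ten bytes, which is a big part of Avro’s appeal.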

 

What if your JSON is missing some values, like this…

{ "name": "Richard", "age": null, "siblings": null }

Notice that both “age” and “siblings” are set to “null”. The Avro schema that we have just specified will not handle this for either Java or Python. An easy fix is to add “Unions” to the fields that can be null. A Union is essentially a type option: we can specify several permitted types for a single field. The important thing to remember is that “null” is always listed first in the Union.
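On the wire, a Union adds one small thing: the encoder first writes the zero-based index of the branch being used, then the branch’s own encoding (which, for “null”, is nothing at all). A stdlib-only sketch for a ["null", "int"] field, using the zigzag varint encoding the Avro spec describes:

```python
def _zigzag(n: int) -> int:
    return (n << 1) ^ (n >> 63)

def _varint(n: int) -> bytes:
    out = bytearray()
    while True:
        b = n & 0x7F
        n >>= 7
        if n:
            out.append(b | 0x80)
        else:
            out.append(b)
            return bytes(out)

def encode_long(n: int) -> bytes:
    return _varint(_zigzag(n))

def encode_optional_int(value):
    # A ["null", "int"] union is written as the zero-based branch index
    # (itself a zigzag varint), followed by the branch's own encoding.
    if value is None:
        return encode_long(0)                   # branch 0 = "null": index only, no payload
    return encode_long(1) + encode_long(value)  # branch 1 = "int": index, then the int
```

So a null “age” costs a single byte, while an “age” of 40 costs two.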

 

So, the Avro schema might look like below for a JSON String where only “name” is essential, but all other fields are optional …

{
  "type" : "record",
  "name" : "Demo",
  "namespace" : "com.talend.richard.demo",
  "fields" : [ {
    "name" : "name",
    "type" : "string"
  }, {
    "name" : "age",
    "type" : [ "null", "int"] 
  }, {
    "name" : "siblings",
    "type" : [ "null", "int" ]
  } ]
}

 

The changes are the Union types added to the “age” and “siblings” fields. This is important. As I said, this will work when being processed using Python. However, to get this to work with Java, you need to slightly alter the JSON to give each of the optional fields a type. Examples that would work with Java can be seen below…

 

{ "name": "Richard", "age": {"int":40}, "siblings":{"int": 3} }

 

…if values are supplied, or this…

 

{ "name": "Richard", "age": {"int":40}, "siblings": null }

 

…if “name” and “age” have values, but “siblings” is null.

 

However, if we take the examples of JSON above and try to process them using Python, we will get an error. Python will see the “types” specified (“int”, “string”, etc.) as objects in their own right. Since they are not represented in the Avro schema, this causes problems. Java, on the other hand, understands that these indicate the “type” of the value that follows.

Now, if we are using Python to serialize our JSON, we can also completely omit optional fields. Given the last Avro schema we looked at, all of the following JSON Strings would be acceptable when serializing using Python…

{ "name": "Richard", "age": 40, "siblings": 3 }
{ "name": "Richard", "age": 40, "siblings": null }
{ "name": "Richard", "age": 40 }
{ "name": "Richard"}
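The reason the shorter forms work is worth spelling out: when a writer resolves each schema field from a Python dict, a missing key can simply come back as None (for example via dict.get), and None satisfies the “null” branch of the Union. The sketch below illustrates that resolution step for our schema (this is my own illustrative code, not the avro library’s internals):

```python
# Field list mirroring the Avro schema above: (field name, union branches)
FIELDS = [
    ("name", ["string"]),            # required: no "null" branch
    ("age", ["null", "int"]),        # optional via union
    ("siblings", ["null", "int"]),   # optional via union
]

def resolve_fields(datum: dict) -> dict:
    resolved = {}
    for name, union in FIELDS:
        value = datum.get(name)      # missing key -> None, no KeyError
        if value is None and "null" not in union:
            # A required field with no value is a genuine schema violation
            raise ValueError(f"field {name!r} is required")
        resolved[name] = value
    return resolved
```

With this resolution in place, { "name": "Richard" } and { "name": "Richard", "age": null, "siblings": null } come out identically.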

 

This, unfortunately, is not the case for Java. Given the examples I have shown above (factoring in the “type” requirements for Java), only …

 

{ "name": "Richard", "age": {"int":40}, "siblings":{"int": 3} }

 

…and…

 

{ "name": "Richard", "age": {"int":40}, "siblings": null }

 

…will work when serializing using Java.

 

Let’s distill the above into a set of rules.

  1. If ALL objects in a JSON String will always be supplied, a basic Avro schema with no Unions will work equally well for both Python and Java.
  2. If there will be optional values (but the keys will still be supplied), the Avro schema will need to include Unions for the optional values. No further changes are needed for the JSON when being serialized by Python. When being serialized by Java, a “type” is required before the value, but ONLY for those objects where Unions are used. The changes required for Java will cause Python to fail when serializing.
  3. If there are optional objects (no “key” or “value”), this will ONLY work with Python.

 

So, what does this mean for working with Pipeline Designer? Essentially, if you are serializing your JSON using Python, once you have it working in Python, it will almost certainly be readable by Pipeline Designer. Python serialization for Pipeline Designer is always going to be relatively straightforward. However, many Talend developers will want to serialize their JSON using Java. What do we need to consider?

 

  1. All object keys in the JSON MUST be supplied regardless of whether there is data.
  2. Where Unions are used in the Avro schema, we must supply a “type” before the value in the JSON (if there is a value, otherwise “null” will suffice).
  3. When listing types for a Union, “null” should always come first.
  4. Even though the Avro data will be consumed by Python, the presence of the “type” information in the JSON before serialization will not cause problems when Python de-serializes the resulting Avro.
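If you need to move the same JSON between the two conventions, the Java-style type wrappers can be stripped mechanically. Below is a stdlib-only helper I put together for this blog; the single-key-dict heuristic is an assumption that works for the schemas shown here, but would misfire on a record that legitimately contains a map whose only key is a type name like "int":

```python
import json

# Avro primitive type names that can appear as Java-style union wrappers
AVRO_TYPES = {"int", "long", "float", "double", "string", "boolean", "bytes"}

def unwrap_java_unions(obj):
    # Convert Java-style {"int": 40} wrappers to plain values for Python.
    if isinstance(obj, dict):
        if len(obj) == 1:
            (k, v), = obj.items()
            if k in AVRO_TYPES:
                return unwrap_java_unions(v)   # drop the wrapper, keep the value
        return {k: unwrap_java_unions(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [unwrap_java_unions(x) for x in obj]
    return obj                                  # plain values (incl. None) pass through

java_json = '{ "name": "Richard", "age": {"int": 40}, "siblings": null }'
python_json = json.dumps(unwrap_java_unions(json.loads(java_json)))
```

Running the Java-flavored example through this gives JSON that Python will happily serialize against the Union schema.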

 

Why not try this out for yourself with some pre-built code? 

It’s great having the rules, but there will be times when you just want to try it out for yourself. This is why I am including some Python code to serialize and deserialize JSON, and some Java methods with a bit of code to do the same. Much of what I learnt in writing this blog came from serializing using Java, saving the byte array to a file, then deserializing using Python…and vice versa. Give it a go.

 

Python Code 

Below is some code which will serialize JSON into Avro using Python. You will need to change the file paths to point to your own schemas and JSON files. The code below will serialize the JSON without including the Avro schema in the output. 

import io
import json
import avro.io
import avro.schema

def deserialize_json(avro_bytes):
    schema = avro.schema.Parse(avro_schema)
    reader = avro.io.DatumReader(schema)
    bytes_reader = io.BytesIO(avro_bytes)
    decoder = avro.io.BinaryDecoder(bytes_reader)
    # De-serialize the Avro bytes back into a dict, then dump to a JSON String
    deserialized_json = reader.read(decoder)
    json_str = json.dumps(deserialized_json)
    return json_str

def serialize_json(json_data):
    schema = avro.schema.Parse(avro_schema)
    # Create Avro encoder
    writer = avro.io.DatumWriter(schema)
    bytes_writer = io.BytesIO()
    encoder = avro.io.BinaryEncoder(bytes_writer)
    # Serialize JSON
    writer.write(json.loads(json_data), encoder)
    return bytes_writer.getvalue()

#Read schema file into String
filename = '/Users/richardhall/Documents/Avro.txt'
f = open(filename, "r")
avro_schema = f.read()
f.close()
print('Avro Schema')
print(avro_schema)

#Read JSON file into String
filename = '/Users/richardhall/Documents/PYTHON_JSON.txt'
f = open(filename, "r")
input_json = f.read()
f.close()
print('Python JSON')
print(input_json)

#Read the schema
schema = avro.schema.Parse(avro_schema)

#Serialize Python JSON
output_bytes = serialize_json(input_json)
print('Serialized Python JSON')
print(output_bytes.decode("latin-1"))

#Write serialized content to file
filename = '/Users/richardhall/Documents/serialized_python.txt'
f = open(filename, 'w+b')
binary_format = bytearray(output_bytes)
f.write(binary_format)
f.close()

#De-serialize JSON
output_str = deserialize_json(output_bytes)
print('De-Serialized Python JSON')
print(output_str)

#Read JAVA serialized file into byte array - Comment out if not present
filename = '/Users/richardhall/Documents/serialized_json.txt'
f = open(filename, "rb")
binary_read = f.read()
f.close()
print('Serialized JAVA JSON')
print(binary_read.decode("latin-1"))


#De-serialize JSON from file - Comment out if not present
output_str = deserialize_json(binary_read)
print('De-Serialized Java JSON')
print(output_str)
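To round out the picture, here is a stdlib-only decoder for the basic (no-Union) record layout, handy for inspecting the bytes the script above writes to file. As with the byte-level sketches earlier, this is a teaching aid rather than a replacement for the avro library:

```python
import io

def read_varint(buf: io.BytesIO) -> int:
    # Read a little-endian base-128 varint, then undo the zigzag mapping
    shift, result = 0, 0
    while True:
        b = buf.read(1)[0]
        result |= (b & 0x7F) << shift
        if not (b & 0x80):
            break
        shift += 7
    return (result >> 1) ^ -(result & 1)

def decode_basic_record(data: bytes) -> dict:
    # Fields arrive in schema order: string "name", int "age", int "siblings"
    buf = io.BytesIO(data)
    name_len = read_varint(buf)
    name = buf.read(name_len).decode("utf-8")
    return {"name": name, "age": read_varint(buf), "siblings": read_varint(buf)}
```

Point it at the contents of serialized_python.txt (for the no-Union schema) and you can check the field values byte by byte.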

Java Code 

Below is some code which will serialize JSON into Avro using Java. There are a couple of parts to this code: a routine containing some static methods, and some code making use of those methods. This has been written to be run using a tJava component in a Talend Job, but it can be run in any Java IDE.

The routine below requires the Apache Avro libraries shown in its import statements (org.apache.avro.*). These can all be found packaged with Talend Studio 7.1. Alternative versions may work, but this has not been tested with other libraries…

The routine using these libraries can be seen below…

 

package routines;


import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonEncoder;



/*
 * user specification: the function's comment should contain keys as follows: 1. write about the function's comment.but
 * it must be before the "{talendTypes}" key.
 * 
 * 2. {talendTypes} 's value must be talend Type, it is required . its value should be one of: String, char | Character,
 * long | Long, int | Integer, boolean | Boolean, byte | Byte, Date, double | Double, float | Float, Object, short |
 * Short
 * 
 * 3. {Category} define a category for the Function. it is required. its value is user-defined .
 * 
 * 4. {param} 's format is: {param} <type>[(<default value or example value>)] <name>[ : <comment>]
 * 
 * <type> 's value should be one of: string, int, list, double, object, boolean, long, char, date. <name> 's value is the
 * Function's parameter name. the {param} is optional. so if you write the Function without the parameters, the {param}
 * isn't added. you can have many parameters for the Function.
 * 
 * 5. {example} gives a example for the Function. it is optional.
 */
public class AVROUtils {

    
	//Static schema variable
    static String schemaStr = null;
    

    /**
     * setSchema: sets the schemaStr variable.
     * 
     * 
     * {Category} User Defined
     * 
     * {param} string("{\"type\":\"record\",\"name\":\"Demo\",\"namespace\":\"com.talend.richard.demo\"}") schema: 
     * The schema to be set.
     * 
     * {example} setSchema("{\"type\":\"record\",\"name\":\"Demo\",\"namespace\":\"com.talend.richard.demo\"}") 
     */

    public static void setSchema(String schema){
    	schemaStr = schema;
    	
    }


    
    /**
     * jsonToAvroWithSchema: Serialize a JSON String to an Avro byte array including a copy of the schema in the
     * serialized output
     *	 
     * {talendTypes} byte[]
     * 
     * {Category} User Defined
     * 
     * {param} string("{ \"name\": {\"string\":\"Richard\"}}") json: The JSON to be serialized
     * 
     * {example} jsonToAvroWithSchema("{ \"name\": {\"string\":\"Richard\"}}") .
     */
    public static byte[] jsonToAvroWithSchema(String json) throws IOException {
        InputStream input = null;
        DataFileWriter writer = null;
        ByteArrayOutputStream output = null;
        try {
            Schema schema = new Schema.Parser().parse(schemaStr);
            DatumReader reader = new GenericDatumReader(schema);
            input = new ByteArrayInputStream(json.getBytes());
            output = new ByteArrayOutputStream();
            DataInputStream dis = new DataInputStream(input);
            writer = new DataFileWriter(new GenericDatumWriter());
            writer.create(schema, output);
            Decoder decoder = DecoderFactory.get().jsonDecoder(schema, dis);
            GenericRecord datum;
            while (true) {
                try {
                    datum = reader.read(null, decoder);
                } catch (EOFException eofe) {
                    break;
                }
                writer.append(datum);
            }
            writer.flush();
            writer.close();
               
            return output.toByteArray();
        } finally {
            try { input.close(); } catch (Exception e) { }
        }
    }


    
    /**
     * avroToJsonWithSchema: De-Serialize a JSON String from an Avro byte array including a copy of the schema 
     * inside the serialized Avro output
     *  
     * {talendTypes} String
     * 
     * {Category} User Defined
     * 
     * {param} byte[](bytes) avro: The Avro byte array to be de-serialized
     * 
     * {example} avroToJsonWithSchema(bytes) .
     */
    public static String avroToJsonWithSchema(byte[] avro) throws IOException {
    	
        boolean pretty = false;
        GenericDatumReader reader = null;
        JsonEncoder encoder = null;
        ByteArrayOutputStream output = null;
        try {
            reader = new GenericDatumReader();
            InputStream input = new ByteArrayInputStream(avro);
            DataFileStream streamReader = new DataFileStream(input, reader);
            output = new ByteArrayOutputStream();
            Schema schema = streamReader.getSchema();
            DatumWriter writer = new GenericDatumWriter(schema);
            encoder = EncoderFactory.get().jsonEncoder(schema, output, pretty);
            for (GenericRecord datum : streamReader) {
                writer.write(datum, encoder);
            }
            streamReader.close();
            encoder.flush();
            output.flush();
            return new String(output.toByteArray());
        } finally {
            try { if (output != null) output.close(); } catch (Exception e) { }
        }
    }

    
    /**
     * avroToJsonWithoutSchema: De-Serialize a JSON String from an Avro byte array without a copy of the schema in 
     * the serialized Avro output
     *  
     * {talendTypes} String
     * 
     * {Category} User Defined
     * 
     * {param} byte[](bytes) avro: The Avro byte array to be de-serialized
     * 
     * {example} avroToJsonWithoutSchema(bytes) .
     */
    public static String avroToJsonWithoutSchema(byte[] avro) throws IOException {
        boolean pretty = false;
        GenericDatumReader reader = null;
        JsonEncoder encoder = null;
        ByteArrayOutputStream output = null;
        try {
        	Schema schema = new Schema.Parser().parse(schemaStr);
            reader = new GenericDatumReader(schema);
            InputStream input = new ByteArrayInputStream(avro);
            output = new ByteArrayOutputStream();
            DatumWriter writer = new GenericDatumWriter(schema);
            encoder = EncoderFactory.get().jsonEncoder(schema, output, pretty);
            Decoder decoder = DecoderFactory.get().binaryDecoder(input, null);
            GenericRecord datum;
            while (true) {
                try {
                    datum = reader.read(null, decoder);
                } catch (EOFException eofe) {
                    break;
                }
                writer.write(datum, encoder);
            }
            encoder.flush();
            output.flush();
            return new String(output.toByteArray());
        } finally {
            try {
                if (output != null) output.close();
            } catch (Exception e) {
            }
        }
    }
    
    
    /**
     * jsonToAvroWithoutSchema: Serialize a JSON String to an Avro byte array excluding a copy of the schema in 
     * the serialized Avro output
     *
     *  
     * {talendTypes} byte[]
     * 
     * {Category} User Defined
     * 
     * {param} string("{ \"name\": {\"string\":\"Richard\"}}") json: The JSON to be serialized
     * 
     * {example} jsonToAvroWithoutSchema("{ \"name\": {\"string\":\"Richard\"}}") .
     */
    public static byte[] jsonToAvroWithoutSchema(String json) throws IOException {
        InputStream input = null;
        GenericDatumWriter writer = null;
        Encoder encoder = null;
        ByteArrayOutputStream output = null;
        try {
            Schema schema = new Schema.Parser().parse(schemaStr);
            DatumReader reader = new GenericDatumReader(schema);
            input = new ByteArrayInputStream(json.getBytes());
            output = new ByteArrayOutputStream();
            DataInputStream dis = new DataInputStream(input);
            writer = new GenericDatumWriter(schema);
            Decoder decoder = DecoderFactory.get().jsonDecoder(schema, dis);
            encoder = EncoderFactory.get().binaryEncoder(output, null);
            GenericRecord datum;
            while (true) {
                try {
                    datum = reader.read(null, decoder);
                } catch (EOFException eofe) {
                    break;
                }
                writer.write(datum, encoder);
            }
            encoder.flush();
            return output.toByteArray();
        } finally {
            try { input.close(); } catch (Exception e) { }
        }
    }
}

 

The code below shows how you can use the methods above in a tJava component. You will need to change the file paths to point to your own schemas and JSON files. This code will serialize the JSON without including the Avro schema in the output.

 

//Set Avro schema
String avro_schema = "";
String avroFilePath = "/Users/richardhall/Documents/Avro.txt";
  
try{
	avro_schema = new String ( java.nio.file.Files.readAllBytes( java.nio.file.Paths.get(avroFilePath) ) );
}catch (IOException e){
	e.printStackTrace();
}
System.out.println("Avro Schema");
System.out.println(avro_schema);
routines.AVROUtils.setSchema(avro_schema);

//Set the JSON configured for JAVA
String json = "";
String jsonFilePath = "/Users/richardhall/Documents/JAVA_JSON.txt";    
try{
	json = new String ( java.nio.file.Files.readAllBytes( java.nio.file.Paths.get(jsonFilePath) ) );
}catch (IOException e){
	e.printStackTrace();
}
System.out.println("JSON");
System.out.println(json);

//Serialize the JSON
byte[] byteArray = routines.AVROUtils.jsonToAvroWithoutSchema(json);
System.out.println("Serialized JSON");
System.out.println(new String(byteArray));

//Write the Serialized JSON to a file
String serializedFileJava = "/Users/richardhall/Documents/serialized_json.txt"; 
java.io.FileOutputStream fos = new java.io.FileOutputStream(serializedFileJava);
fos.write(byteArray);
fos.close();

//De-serialize the JSON
String deserializedJson = routines.AVROUtils.avroToJsonWithoutSchema(byteArray);
System.out.println("De-Serialized JSON");
System.out.println(deserializedJson);

//Load the serialized JSON file produced by Python -- Comment out if not present
String serializedFilePython = "/Users/richardhall/Documents/serialized_python.txt"; 
java.io.File file = new java.io.File(serializedFilePython);
byte[] serializedJsonFromPythonFileBytes = new byte[(int) file.length()]; 

java.io.FileInputStream fis = new java.io.FileInputStream(file);
fis.read(serializedJsonFromPythonFileBytes); //read file into bytes[]
fis.close();

//De-serialize the JSON file produced by Python -- Comment out if not present
deserializedJson = routines.AVROUtils.avroToJsonWithoutSchema(serializedJsonFromPythonFileBytes);
System.out.println("De-Serialized JSON from Python");
System.out.println(deserializedJson);    

 

Running the code above (Python and Java) will allow you to build an Avro schema and test it against both Python and Java. It will also allow you to serialize using one language and deserialize using the other. This should help you deal with Avro issues before you get anywhere near trying to use it with Talend Pipeline Designer.
