Avro Three Ways

by
Tags: ,
Category:

In my previous post I recommended using Avro for file storage in a data lake. It has the benefits of compact storage and a schema in every file that tells you what data it holds. But how do you create those files?

If you’re using Spark, you can use the spark-avro package. But Spark is often overkill, especially if your goal is to transform individual files as they drop into a staging area. As it turns out, writing Avro files directly isn’t difficult, and the Avro libraries are relatively small (so can fit into a Lambda deployment bundle, unlike the Parquet libraries).

This post looks at three ways to write Avro files: the Apache Java library, the Apache Python library, and the fastavro library for Python. With each, I show how to write a sample file, and call out any of the quirks that might trip you up. If you’d like to try the example code, you’ll find it here.

The Schema

The core of an Avro file is its schema, which defines the structure of the records in the file. Each file has a single schema; you can’t mix data of different types in the same file.

Avro provides a limited set of “primitive” types: strings, various types of numbers, byte arrays, and nulls. Things like fixed-with decimal fields are “logical” types, which are based on one of the primitive types. For example, timestamps are stored using the long primitive type, holding a count of milliseconds since the Unix epoch.

Avro also supports nested records and arrays, as well as union types (which can be viewed as a way to support mixed types, but is more often used to identify nullable fields: a union of a base data type with the null type).

My sample data is simple: a user activity event that you might get from an eCommerce site.

{
  "eventType": "checkoutComplete",
  "eventId": "dae0e6cc-19e7-4669-b850-9861af09a2f6",
  "timestamp": "2021-08-03 05:11:24.044",
  "userId": "bdb4fd7e-9ddb-469a-8e1e-f2c88bfbaa51",
  "itemsInCart": 1,
  "totalValue": 23.25
}

While this is a simple event, it highlights two of the problems with storing data in JSON:

  • There’s no native representation for dates and times. In this case, I’m using a string representation that’s almost-but-not-quite ISO-8601: it uses a space to separate date and time, and doesn’t have a timezone. If you just store the file in a data lake as-is, every query has to apply its own logic to deal with that inconsistency.
  • Monetary values need to be represented exactly, but most JSON parsers turn numbers into a floating-point value. Avro gives us the opportunity to write the value with a fixed number of decimal places, even if the source data omits decimal places completely.

Avro schemas are typically represented in JSON:

{
  "namespace": "com.chariotsolutions.example.avro",
  "type": "record",
  "name": "CheckoutComplete",
  "fields": [
    {
      "name": "eventType",
      "type": "string"
    },
    {
      "name": "eventId",
      "type": "string"
    },
    {
      "name": "timestamp",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    },
    {
      "name": "userId",
      "type": "string"
    },
    {
      "name": "itemsInCart",
      "type": "int"
    },
    {
      "name": "totalValue",
      "type": {
        "type": "bytes",
        "logicalType": "decimal",
        "precision": 16,
        "scale": 2
      }
    }
  ]
}

This schema shows a mix of primitive and logical types. To keep it simple, I did not use the uuid logical type for the various ID fields. Note that the decimal type is stored using an arbitrary-length byte array that holds the unscaled binary value — just like Java’s BigDecimal type.

With the schema in hand, let’s move on to create Avro data files.

Java: Apache Avro

My Java example uses the Jackson object mapper to parse JSON source files. This library can be configured to read numeric values as java.math.BigDecimal values, rather than floating-point:

ObjectMapper mapper = new ObjectMapper();
mapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);

There are, however, a few quirks with its behavior. First is that it doesn’t process all numeric fields as BigDecimal: if your source data doesn’t have a decimal point, it will parse as an integral type (Integer, Long, or java.long.BigInteger depending on size). There’s also no way to limit the scale (the number of digits to the right of the decimal point) of the parsed value.

To solve both of these problems, I convert whatever Jackson gives me into a String, and then parse that to create a BigDecimal value. I explicitly set the sale of the result to fit our schema, and use “banker’s rounding” in case the source has more decimal places than expected:

BigDecimal totalValue = new BigDecimal(data.get("totalValue").toString())
                        .setScale(2, RoundingMode.HALF_EVEN);

In the case of a timestamp, we need to turn that into a java.time.Instant, from which we get the millis-since-epoch value to store in the file. This requires a custom date/time parser, which recognizes the not-quite-ISO-8601 format of the source file:

private final static DateTimeFormatter TIMESTAMP_PARSER 
                     = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")
                       .withZone(ZoneOffset.UTC);

Next, I load the Avro schema (which I stored on the filesystem so that it could be shared between examples):

Schema schema;
try (InputStream in = new FileInputStream(schemaFile))
{
    schema = new Schema.Parser().parse(in);
}

OK, we’re ready to start writing the data. And here is where things get really challenging, because the Avro docs don’t show how to write logical types!

However, the Java implementation provides a tool to generate helper classes that read and write Avro records (and since the example uses a fixed schema, using those classes would actually be a better solution, but I’m trying to capture the bigger picture). So I generated the helper class from my schema, and discovered the classes that perform these conversions.

For fixed-precision numeric values, you use org.apache.avro.Conversions.DecimalConversion; for timestamps org.apache.avro.data.TimeConversions.TimestampMillisConversion. So the first step is to create instances of those classes (while not documented as thread-safe, they have no member variables, so I’m holding as a static variable):

private final static TimestampMillisConversion  TIMESTAMP_CONVERTER = new TimestampMillisConversion();

private final static DecimalConversion          DECIMAL_CONVERTER = new DecimalConversion();

To use these converters, you call the appropriate conversion function with the source value and information about the logical type that you’re converting into:

TIMESTAMP_CONVERTER.toLong(timestamp, schema, timestampLT);

DECIMAL_CONVERTER.toBytes(totalValue, schema, totalValueLT);

And to get the logical types for those calls, you can turn to the parsed schema:

LogicalType timestampLT = schema.getField("timestamp").schema().getLogicalType();

LogicalType totalValueLT = schema.getField("totalValue").schema().getLogicalType();

With all that out of the way, loop through the source records and write your Avro file!

DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(schema);
try (FileInputStream fis = new FileInputStream(inputFile);
     InputStreamReader isr = new InputStreamReader(fis, StandardCharsets.UTF_8);
     BufferedReader in = new BufferedReader(isr);
     DataFileWriter<GenericRecord> writer = new DataFileWriter<GenericRecord>(datumWriter))
{
    writer.create(schema, outputFile);

    String line = null;
    while ((line = in.readLine()) != null)
    {
        // you’ll have to click on the example to see the rest

The benefits of the Java implementation are that it’s performant and portable. The drawback is that – like many Java programs – there is a lot of ceremony involved.

Python: Apache Avro

By comparison, the Python implementations are extremely simple, but not without their quirks. I’ll start with the “official” implementation from Apache. This is written in pure Python, which means that it can be deployed into any environment.

As I said, the main loop is simple:

with open(schema_file) as f:
    schema = avro.schema.parse(f.read())
    
with DataFileWriter(open(avro_file, "wb"), DatumWriter(), schema) as writer:
    with open(json_file, "r") as f:
        for line in f.readlines():
            writer.append(transform_record(line))

The transform_record() function is where things get interesting. It’s passed a JSON string from the source file, parses it, and returns a dict for DataFileWriter:

def transform_record(json_str):
    data = json.loads(json_str, parse_float=str, parse_int=str)
    return {
        "eventType":    data['eventType'],
        "eventId":      data['eventId'],
        "timestamp":    dateutil.parser.parse(data['timestamp']).replace(tzinfo=timezone.utc),
        "userId":       data['userId'],
        "itemsInCart":  int(data['itemsInCart']),
        "totalValue":   decimal.Decimal(data['totalValue']).shift(2).to_integral_value(decimal.ROUND_HALF_EVEN)
    }

By default, the Python json module parses numerics into one of the standard Python numeric classes: anything that looks like an integer becomes int, and anything with a decimal place becomes float. Since I don’t want this, I tell json.loads() to maintain them as strings via the optional parse_int and parse_float arguments. Then I can parse those strings into the exact form that I want.

To accurately represent a monetary value, I parse the totalValue as a Python Decimal. Unlike Java’s BigDecimal, you can’t specify the scale of a Python Decimal: it uses whatever scale it finds in the parsed string (so “123.45” has scale of 2, but “123.450” has scale of 3).

That’s not great, but the Avro library makes it worse: it applies the scale defined for the column, completely ignoring the scale of the original number. So if my source is “123.4”, the Avro library will store the value as 12.34. This seems like a bug to me, but I didn’t see it in the issue tracker, so apparently is expected behavior. It’s the developer’s responsibility to make a compatible value.

To do that, shift the desired number of digits from the right of the decimal point to the left, and then give Avro the integral portion of that shifted value. For example, my field has a defined scale of 2, so 123.4 becomes 12340, and 123.456 becomes 12345.6. As with Java, I use banker’s rounding to deal with any remaining fractional component. The library applies the column’s scale when writing, so my numbers end up as the desired values when reading. It’s not pretty, but it works.

Timestamp parsing is also quirky. I can’t use the built-in datetime.fromisoformat() method, since the strings in the file aren’t in ISO-8601 format. So instead I turn to the third-party dateutil library, which has flexible parsing (it recognizes the field’s format out of the box). However, it returns a “naive” datetime, so I must explicitly set its timezone to UTC. With that done, DataFileWriter properly converts the value into milliseconds and stores it in the file.

This Python code is certainly simpler than the Java code, but still requires that you transform logical types into primitive. And since it’s written in pure Python, it will be slower than a transform that can use an underlying C library.

Python: Fast Avro

That brings me to the fastavro library. It advertises itself as 10x faster than the Apache library. But what I like most about it is that it does the Right Thing™ with decimal numbers.

Unlike the libraries that we’ve seen earlier, which handle a record at a time, fastavro processes a list of records:

with open(schema_file) as f:
    schema = fastavro.parse_schema(json.load(f))

with open(json_file, "r") as f:
    transformed = [transform_record(line) for line in f.readlines()]
    
with open(avro_file, 'wb') as f:
    fastavro.writer(f, schema, transformed)

As you can see, the transform_record() function is still around. It looks a lot like the previous example. What’s different is that it doesn’t need to do any tricks with the Decimal value; fastavro applies the proper scale:

def transform_record(json_str):
    data = json.loads(json_str, parse_float=str, parse_int=str)
    return {
        "eventType":    data['eventType'],
        "eventId":      data['eventId'],
        "timestamp":    dateutil.parser.parse(data['timestamp']).replace(tzinfo=timezone.utc),
        "userId":       data['userId'],
        "itemsInCart":  int(data['itemsInCart']),
        "totalValue":   decimal.Decimal(data['totalValue'])
    }

The one thing that concerned me about the fastavro library is that it uses a native library. As anyone who’s deployed a Lambda that includes the psycopg-binary package knows, this can require some build tricks. I was pleasantly surprised, then, when I built a deployment package with fastavro on my Ubuntu laptop, and deployed it to Lambda without a problem – including deployment on an ARM runtime, even though my laptop is x86.

Conclusion

If you have incoming data in JSON, any of these libraries should make it easy for you to convert that data into Avro for storage in your data lake. My first choice is the fastavro library in Python, because it offers the easiest development workflow.

 


 

Can we help you?

Ready to transform your business with customized data engineering solutions? Chariot Solutions is your trusted partner. Our consultants specialize in managing software and data complexities, tailoring solutions to your unique needs. Explore our data engineering offerings or reach out today to discuss your project.