Aggregating Files in your Data Lake – Part 3


Several years ago, I wrote the post Friends Don’t Let Friends Use JSON (in their data lakes). The crux of that post was that JSON doesn’t provide enough information about the data, requiring that knowledge to be built into every query. Yet I’ve seemingly ignored this advice, aggregating the raw CloudTrail logs into a JSON file. Isn’t that being a bit inconsistent?

In this post I’m going to take the next step, reading a month’s worth of JSON files and writing them out in Parquet format.

Do We Need to Do This?

One of my key arguments for not storing JSON is that every query has to know the details of the data. But with CloudTrail, that’s a given anyway: the CloudTrail event representing an EC2 RunInstances operation is dramatically different from the event representing a Secrets Manager GetSecretValue call. Queries that dig into the details of an event already need to know the structure of that event’s data.

Moreover, attempting to extract every possible field from every possible CloudTrail event is a fool’s errand. AWS adds new events, or enhancements to existing events, on a near-daily basis. And if you did extract each individual field, you’d have an extremely sparse file, which would not make for efficient storage.

Finally, in terms of performance, a daily aggregation is acceptable: query times are measured in seconds rather than minutes. Going from high single-digit seconds to low single-digit seconds isn’t going to measurably improve anybody’s life.

With that said, this series of posts has never been just about CloudTrail; it’s about any data source that’s bursty and results in lots of small files. And even with CloudTrail data, we can decide on some rules that make Parquet useful:

  • Aggregate a month’s worth of data at a time
  • In general, Parquet works best when you can store your data in large files. But against that, you want to be able to run your queries in parallel, and although Parquet is a “splittable” format, Athena seems to work best when it deals with multiple files. You may also want to partition your files such that you have a “rolling” active set, from which you drop files that are no longer needed.

    In my opinion, monthly aggregation meets all of these criteria for CloudTrail. You will have enough events to make an efficiently-sized file (at least with an active AWS environment), and enough parallelism to let Athena shine (especially with a few years of data). If you’re aggregating a different data source, you might make a different decision.

  • Store variant fields as stringified JSON
  • Many of the top-level fields in a CloudTrail event are simple scalars, common to all events. Where events differ is in the contents of a half-dozen nested objects, such as requestParameters and responseElements. Rather than attempt to extract fields from those nested structures, we’ll simply write them as stringified JSON; queries will have to parse that JSON to extract event-specific values.

    I’ll note here that the AWS-provided CloudTrailInputFormat does the same thing, at least for the requestParameters and responseElements fields.
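The round trip for these variant fields needs nothing but the standard library: nested objects are stringified on write, and the consumer parses them back on read. A minimal sketch (the event and column names here are illustrative, not the post’s actual schema):

```python
import json

# a trimmed, fabricated CloudTrail event; requestParameters varies per event type
event = {
    "eventID": "12345678-aaaa-bbbb-cccc-1234567890ab",
    "requestParameters": {"bucketName": "example-bucket", "key": "example.txt"},
}

# on write: flatten the variant field into a JSON string column
row = {
    "event_id": event["eventID"],
    "request_parameters": json.dumps(event["requestParameters"]),
}

# on read: the query (or downstream consumer) parses the string as needed
params = json.loads(row["request_parameters"])
print(params["bucketName"])  # -> example-bucket
```

The cost of this design is a parse at query time; the benefit is that the file’s schema never has to change when AWS adds a new event type.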

Implement as a Second Aggregation Step

Based on the performance numbers that I highlighted in my last post, it should be clear that we can’t simply process raw files into Parquet, at least using Lambda. And while we could switch from Lambda to a service that doesn’t have timeouts, such as ECS (Elastic Container Service) or AWS Batch, I think there are some definite benefits to using a second aggregation step:

  • Failures won’t disrupt a long-running task
  • If it takes three minutes for a single day’s aggregation, then an entire month will take 90 minutes. A failure halfway through means wasted money, and potentially more time spent debugging. Worse, it may encourage attempting to recover from failures, or writing partial results, which goes against my resiliency principles.

  • The business wants partial months
  • Waiting a month for up-to-date data is a non-starter in just about every place I’ve worked. So you might be running the monthly aggregation every night, using month-to-date data. Re-aggregating the same source files over and over makes me cringe.

  • You can produce multiple aggregations
  • The daily aggregated files contain all of the source CloudTrail data. You may not want all of that data for your aggregations (for example, I’ve dropped UserAgent). But then you might find that you do, in fact, need something that you’ve dropped. Or you have a different business user that wants a different view of the data (again, it’s not just CloudTrail I’m talking about). It will be much faster to implement this using daily aggregated files rather than going back to the source.

Preserve the Result of the First Step

At this point you might be wondering: should you keep the output from the first step? Perhaps the second step should delete those files once it has processed them?

In my opinion, no, because this violates the principle of idempotency: if you run the transform and it deletes its source data, then you can’t run it again without regenerating that source data.

Storage on S3 is cheap. You can transition the daily aggregated files to infrequent access storage and pay just over a cent per gigabyte/month; or go to Glacier, and pay a fraction of that. If you do feel that the first-stage aggregation should have a limited life, then I recommend deleting these files using a life-cycle rule that runs long after the transform.
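As a sketch of that life-cycle rule, here is a hypothetical configuration expressed as the dict you would pass to boto3’s put_bucket_lifecycle_configuration (the prefix, bucket, and day counts are placeholders; tune them so expiration falls long after the monthly transform runs):

```python
# Hypothetical life-cycle rules for the daily-aggregate prefix.
# Day counts are placeholders: transition once the monthly transform no
# longer reads the files, and expire well after any plausible re-run.
LIFECYCLE = {
    "Rules": [{
        "ID": "age-out-daily-aggregates",
        "Filter": {"Prefix": "cloudtrail-daily/"},
        "Status": "Enabled",
        "Transitions": [{"Days": 30, "StorageClass": "STANDARD_IA"}],
        "Expiration": {"Days": 400},
    }]
}

# applied with, e.g.:
# boto3.client("s3").put_bucket_lifecycle_configuration(
#     Bucket="my-aggregation-bucket", LifecycleConfiguration=LIFECYCLE)
```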

What Library/Language?

Parquet originated in the Hadoop ecosystem, which is written in Java. Does it therefore make sense to write this second stage in Java, using the official Parquet library?

I’m going to answer by saying that I’ve written a Java program to generate Parquet. The only how-to that I could find online directed the reader to first write the output in Avro, then use an Avro-Parquet translation library. Instead, I spent several days digging through the Parquet source code, figured out how to do it directly, and promised myself that I would never do that again. It’s been several years, so things might be easier now, but I suspect the library has simply deprecated more functionality and my hard-won knowledge is no longer useful.

Instead, I’m sticking with Python and using PyArrow, a Python wrapper over the Apache Arrow library. Arrow provides in-memory data manipulation; it also supports a large number of file formats, allowing it to be a single dependency for data conversion. PyArrow also integrates with the NumPy and Pandas libraries, letting you use Arrow to read and write your files, and those libraries to manipulate them.

PyArrow does have its drawbacks, which I’ll call out later.

Transformation

The core of this Lambda function looks a lot like the daily aggregation function: it finds a list of files, accumulates their individual records until a predefined limit is reached, and then writes the output. It differs in that the predefined limit is a row count rather than an output file size, because file size isn’t easily determined until a Parquet file is actually written.
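As a sketch of that accumulate-and-flush loop (the function name and row limit are illustrative, not the post’s actual code):

```python
ROW_LIMIT = 1_000_000  # illustrative; tune to your data volume

def flush_chunks(records, row_limit=ROW_LIMIT):
    """Yield lists of at most row_limit records; each list becomes one
    Parquet file. Row count stands in for file size, which isn't known
    until the file is actually written."""
    chunk = []
    for rec in records:
        chunk.append(rec)
        if len(chunk) >= row_limit:
            yield chunk
            chunk = []
    if chunk:  # don't lose a partial final chunk
        yield chunk

# e.g. 10 records with a limit of 4 produce files of 4, 4, and 2 rows
sizes = [len(c) for c in flush_chunks(range(10), row_limit=4)]
print(sizes)  # -> [4, 4, 2]
```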

There’s another significant change from the daily version: I transform field names from camelCase to snake_case. Athena, like any SQL database, very much wants case-insensitive names. If you use a Glue crawler to create the table definition, it will simply lowercase the source key name, producing an identifier that’s hard to read. Since I need to do explicit record transformation anyway, it makes sense to perform that extra step at the same time (note, however, that the embedded objects continue to use camel case).
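The transformation table below hardcodes both spellings, but the renaming itself can be sketched with a regular expression; the tricky part is keeping all-caps runs like “ID” together as one word:

```python
import re

def camel_to_snake(name):
    """eventID -> event_id, requestParameters -> request_parameters."""
    # insert an underscore between a lower-case letter or digit and the
    # upper-case letter that follows it, then lower-case the result;
    # this keeps runs like "ID" from splitting into "i_d"
    return re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', name).lower()

print(camel_to_snake('sharedEventID'))      # -> shared_event_id
print(camel_to_snake('requestParameters'))  # -> request_parameters
```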

Schema

One of the key features of Parquet is that each file has a schema that describes the columns it holds. PyArrow provides helper functions to define these fields, making for a compact description:

SCHEMA = pa.schema([
    pa.field('event_id', pa.string()),
    pa.field('request_id', pa.string()),
    pa.field('shared_event_id', pa.string()),
    pa.field('event_time', pa.timestamp('ms')),
    # ...
    pa.field('resources', pa.string()),
])

Most of the fields in the data record are stored as strings, which reflects the original data (not a lot of numbers in CloudTrail). The one exception is event_time (eventTime in the source), which holds an ISO-8601 timestamp. Converting it to a native timestamp type will permit temporal queries without the need to transform at query time.

Transformation Function

The transformation function takes a JSON object from the source data and turns it into a Python dict that matches the PyArrow schema. Along the way it parses the event timestamp and converts sub-objects into stringified JSON.

My implementation is table-driven: for each field in the output, I specify the source field name, the destination field name, and a Python function that transforms the source field value. For example, the event ID is a string in the source and a string in the destination, so I use the Python str() function as a “null” transform. By comparison, the resources field is a nested object, so I use json.dumps() to stringify it.

TRANSFORM = [
    # src field name            dst field name              transform fn
    #------------------------------------------------------------------
    ('eventID',                 'event_id',                 str),
    ('requestID',               'request_id',               str),
    ('sharedEventID',           'shared_event_id',          str),
    ('eventTime',               'event_time',               datetime.fromisoformat),
    # ...
    ('resources',               'resources',                json.dumps),
]

With this table, the actual transformation function is quite short: if the source record has a particular field, it applies the specified function to it. If not, it stores None, which becomes a null in the Parquet file.

def transform_record(src_rec):
    """Transform one raw CloudTrail event (a JSON string) into a dict
       that matches the PyArrow schema."""
    rec = json.loads(src_rec)
    xformed = {}
    for src_key, dst_key, xform_fn in TRANSFORM:
        value = rec.get(src_key)
        # explicit None check, so legitimate falsy values aren't dropped
        xformed[dst_key] = xform_fn(value) if value is not None else None
    return xformed
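To make the behavior concrete, here’s a self-contained re-run of the pieces above with a two-field table and a trimmed, fabricated event. One portability note folded in: datetime.fromisoformat() only accepts CloudTrail’s trailing “Z” from Python 3.11 onward, so the sketch wraps it for older runtimes:

```python
import json
from datetime import datetime

def parse_event_time(s):
    # fromisoformat() rejects a trailing 'Z' before Python 3.11;
    # substituting the explicit offset keeps the transform portable
    return datetime.fromisoformat(s.replace('Z', '+00:00'))

TRANSFORM = [
    ('eventID',   'event_id',   str),
    ('eventTime', 'event_time', parse_event_time),
    ('resources', 'resources',  json.dumps),
]

def transform_record(src_rec):
    rec = json.loads(src_rec)
    xformed = {}
    for src_key, dst_key, xform_fn in TRANSFORM:
        value = rec.get(src_key)
        xformed[dst_key] = xform_fn(value) if value is not None else None
    return xformed

# a fabricated event with no "resources" field, which therefore becomes None
src = json.dumps({"eventID": "abc-123", "eventTime": "2023-06-15T12:34:56Z"})
print(transform_record(src))
```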

I like table-driven transformations, because they provide a lot of flexibility. Adding or removing fields from the transform is a matter of adding or deleting entries in the table. If you need to dig into arbitrary objects for your source fields, you can use JSONPath expressions. You can write a custom transformation function if built-in functions aren’t sufficient. And you could also put the type information in the transformation table and generate the PyArrow schema from it (I didn’t do that here to maintain clarity, but would in a real-world transform).

The risk with table-driven transforms is that you try to encapsulate too much functionality in the table. It’s hard to say what “too much” means, but if you don’t understand the transformation table a week after writing it, it’s time to step back and simplify.

Writing the File

PyArrow makes writing output easy:

table = pa.Table.from_pylist(records, schema=SCHEMA)
pq.write_table(table, s3_url, compression='SNAPPY')

The first line transforms a list of Python dicts into Arrow in-memory structures, using the defined schema. The second line writes that file directly to S3; no need to stage it locally.

Deployment: PyArrow’s “Big” Problem

While PyArrow makes it easy to read and write different file formats, it’s not so easy to deploy on Lambda. The first reason is that it exceeds Lambda’s 50 MB (compressed) bundle size. You can’t simply package a build directory and deploy it.

You can solve this with Lambda Layers, but that reveals the second problem with PyArrow: because it includes a native library, you must match both the Python version and the processor architecture of the Lambda that uses it. To do this reliably, you need to use one of Amazon’s published Docker images, as I describe in this post (and also in the README for the example code).

To be honest, I think it’s a lot of effort, and the only real benefit is that you’ll be able to tweak the Lambda’s code using the Console – something that’s not a great idea anyway. So, instead, I use a Lambda Container Image for this Lambda.

FROM amazon/aws-lambda-python:3.11

RUN pip install boto3 pyarrow

COPY lambda.py ${LAMBDA_TASK_ROOT}

CMD [ "lambda.lambda_handler" ]

Conclusion

This has been a long series of posts, and you may be wondering exactly what results these aggregations have. To answer that question, I ran an Athena query to find the top 10 calls by API name:

SELECT event_name, count(*)
FROM "default"."cloudtrail_monthly" 
GROUP BY 1
ORDER BY 2 desc
limit 10;

The source data was our 7,703,861 CloudTrail events from 2023. I ran this query against the events stored as raw CloudTrail log files, the daily NDJSON aggregation, and the monthly Parquet aggregation. Here are the results:

                                  Number of Files   Data Scanned   Query Time
  Raw CloudTrail Logs             1,673,712         3.09 GB        2 min 46.505 sec
  Aggregated by Day (NDJSON)      367               1.99 GB        7.889 sec
  Aggregated by Month (Parquet)   38                4.46 MB        1.495 sec

Measured against the improvement over the raw files, the Parquet version is only modestly faster than the NDJSON version – but it scans three orders of magnitude less data. Since data scanned is the key cost component for Athena queries, that alone may be sufficient to argue for multi-step aggregation.