Perils of Partitioning


Partitioning is one of the easiest ways to improve the performance of your data lake, because it reduces the amount of data scanned. But implementing partitions can be surprisingly challenging, as can their effective use. In this post I look at several of the issues that you should consider when partitioning your data.

What is partitioning?

Partitioning is a way to organize your data so that the path to the data (in S3 terms, the “prefix”) contains information that can be used to reduce the number of files processed by a query. For example, consider the following two files:

sales_data/2024/02/07/23bf644b36e55a84.parquet
sales_data/2024/01/12/3ef3e9ade331514c.parquet

As you can see, the file keys incorporate a date: 2024-02-07 for the first, and 2024-01-12 for the second. There may be tens of thousands of such files in your S3 repository, and without partitioning your Athena queries would have to read all of them, every time. But if you tell Athena about these embedded values, then you can write a query that reads one of these files but not the other:

select  customer_id,
        sum(amount) as total_sales
from    sales_data
where   year = '2024'
and     month = '02'
group   by 1
order   by 2 desc
limit   10;

You can use partition fields anywhere in your query, not just in the WHERE clause. And you don’t need to specify all (or any!) of the fields in your WHERE clause. For example, to compare your year-over-year February sales:

select  year,
        sum(amount) as total_sales
from    sales_data
where   month = '02'
group   by 1
order   by 1;

If you don’t specify any partition fields in your WHERE clause, then Athena scans all of the data associated with the table.

Defining a table with partitions

This is easy: add the PARTITIONED BY clause to your CREATE TABLE statement, and specify the fields that represent partition values.

CREATE EXTERNAL TABLE example (
    -- data field definitions
)
PARTITIONED BY ( 
    year                    string, 
    month                   string,
    day                     string
)
-- SerDe configuration, location, and table properties

Simple, but how does Athena know which S3 prefixes correspond to which partition values? There are two ways to do this.

The first is that you maintain a list of partitions in the Glue Data Catalog. You can use the CreatePartition API from a Lambda or other program, the ALTER TABLE ADD PARTITION or MSCK REPAIR TABLE statements from within Athena, or let a Glue Crawler identify the partitions for you. When you use explicit partitions, Athena makes a “pre-query” to the Glue Data Catalog to find those partitions that apply to the query.
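
For example, a Lambda that registers each new day’s partition via the CreatePartition API might look like the following sketch (the database, table, and location names are assumptions; the layout matches the unadorned keys shown earlier):

```python
from datetime import date

def partition_input(day: date, base_location: str) -> dict:
    # Build the Glue PartitionInput for one day of the unadorned
    # year/month/day layout; the Parquet formats match the example table.
    y, m, d = f"{day.year:04d}", f"{day.month:02d}", f"{day.day:02d}"
    return {
        "Values": [y, m, d],
        "StorageDescriptor": {
            "Location": f"{base_location}/{y}/{m}/{d}/",
            "InputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat",
            "OutputFormat": "org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat",
            "SerdeInfo": {
                "SerializationLibrary": "org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"
            },
        },
    }

def register_partition(database: str, table: str, day: date, base_location: str) -> None:
    # Register one partition via the Glue CreatePartition API.
    import boto3  # imported here so partition_input stays testable without AWS
    boto3.client("glue").create_partition(
        DatabaseName=database,
        TableName=table,
        PartitionInput=partition_input(day, base_location),
    )
```

A scheduled EventBridge rule that invokes this once a day, just before midnight UTC, keeps the partition list a step ahead of the data.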

The alternative is partition projection, in which you define rules for each of the partition values, and a template for S3 paths based on those rules:

TBLPROPERTIES (
    'projection.enabled'        = 'true',
    'storage.location.template' = 's3://com-example-mybucket/example/${year}/${month}/${day}/',
    'projection.year.type'      = 'integer',
    'projection.year.range'     = '1970,2038',
    'projection.year.digits'    = '4',
    'projection.month.type'     = 'integer',
    'projection.month.range'    = '1,12',
    'projection.month.digits'   = '2',
    'projection.day.type'       = 'integer',
    'projection.day.range'      = '1,31',
    'projection.day.digits'     = '2'
);
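
To see why this is cheaper, here is a rough Python model of what projection lets Athena do: compute candidate prefixes directly from the rules, with no catalog lookup. This illustrates the idea; it is not Athena’s actual implementation.

```python
from datetime import date, timedelta

# Mirrors the storage.location.template from the table properties above
TEMPLATE = "s3://com-example-mybucket/example/{year:04d}/{month:02d}/{day:02d}/"

def projected_prefixes(start: date, end: date) -> list[str]:
    # Enumerate the S3 prefixes the projection rules imply for a date
    # range; no call to the Glue Data Catalog is required.
    out, d = [], start
    while d <= end:
        out.append(TEMPLATE.format(year=d.year, month=d.month, day=d.day))
        d += timedelta(days=1)
    return out
```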

Partition projection can be much more performant if you have a large number of partitions, because it doesn’t require calls to the Glue Data Catalog. In my experience, those calls add several seconds to every query against a highly partitioned table.

Hive-style vs unadorned partition values

The last quirk of partitioning that I want to call out is that there are two ways to specify partitions. Above, I showed what I call “unadorned” partition values. There’s also “Hive-style” values, which combine the field name and value:

sales_data/year=2024/month=02/day=07/23bf644b36e55a84.parquet
sales_data/year=2024/month=01/day=12/3ef3e9ade331514c.parquet

Hive-style partition schemes are more amenable to Glue crawlers. With unadorned values, the crawler names the partition fields partition_0, partition_1, and so on.

When you use CREATE TABLE AS or INSERT INTO from Athena, it writes Hive-style partition names, as do the default writers for Glue. However, in my experience unadorned values are more common. As you’ve seen in my prior posts, CloudTrail logs use this format. By default, Kinesis Firehose produces files with unadorned partitions, as do many of the third-party services that I’ve worked with.

With the intro done, let’s move on to the issues.

Issue #1: do your queries actually use partitions?

Here’s a slight variation on my earlier query: it removes the predicate on year and month, and so finds the all-time top 10 customers. To do this, it must scan the entire dataset.

select  customer_id,
        sum(amount) as total_sales
from    sales_data
group   by 1
order   by 2 desc
limit   10;

In my experience, these types of queries are distressingly common, which is why I made it the top issue. The simplest solution, of course, is to rewrite the query to use an appropriate partition field. After all, when it comes to customer sales, “what have you done for me lately” is usually more important than “what did you do in the past 10 years.”

If you can’t use partitioning, and the query is run frequently – perhaps it’s part of a dashboard – then the best solution is to create a summary table that’s updated on a regular basis.

How granular this table must be will depend on your needs: if you break down the data by date range as well as total, then a daily “fact” table will be required. But if it’s just a case of running a five-minute query once a day rather than each time a dashboard is refreshed, by all means create a table with just the columns customer_id and total_sales.
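
As a sketch of that approach (the table and workgroup names are assumptions, and a production job would drop or swap the table rather than create it fresh), the daily rebuild might look like:

```python
# The rebuild query, in the same shape as the dashboard query above,
# but materialized once a day instead of on every refresh.
REFRESH_SQL = """
create table customer_sales_summary
with (format = 'PARQUET') as
select  customer_id,
        sum(amount) as total_sales
from    sales_data
group   by 1
"""

def refresh_summary(workgroup: str = "primary") -> str:
    # Submit the rebuild to Athena; returns the query execution id.
    import boto3  # imported here so the SQL stays importable without AWS
    athena = boto3.client("athena")
    resp = athena.start_query_execution(QueryString=REFRESH_SQL, WorkGroup=workgroup)
    return resp["QueryExecutionId"]
```

Note that Athena’s CTAS won’t overwrite an existing table, so a scheduled job would drop the previous table (or write a fresh one and rename) before running this.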

Issue #2: does your data organization reflect your access patterns?

The CloudTrail filename structure has six partitions: organization, account, region, year, month, and day:

s3://com-example-mybucket/AWSLogs/o-x92e8b2muz/123456789012/CloudTrail/us-east-1/2023/04/09/123456789012_CloudTrail_us-east-1_20230409T0000Z_zfMuDKoxabwgrBkb.json.gz

This naming structure results in poor performance if most of your queries involve date ranges, as Athena must first identify all S3 prefixes that end with those dates, and then use each prefix to select the associated files. This may translate to dozens or hundreds of calls to S3. And while each of these calls executes quickly, it adds up.

If possible, your partition hierarchy should reflect your most common access patterns. For most real-world data, that means organized by date, usually year/month/day. If you have control over the pipeline that writes data to S3 – for example, a Kinesis Firehose – then it’s a simple matter to configure that pipeline to write data in the desired hierarchy.
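
With Firehose, for example, the delivery stream’s S3 destination settings can embed the hierarchy directly; something like the following (the prefixes are illustrative, and note that !{timestamp} is the file’s arrival time, which is exactly the caveat Issue #5 below discusses):

```
Prefix:            sales_data/!{timestamp:yyyy/MM/dd}/
ErrorOutputPrefix: sales_data_errors/!{firehose:error-output-type}/
```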

If you don’t have control over the source data pipeline, then it’s worth investing in a Lambda that restructures the data for use. This Lambda can be simple, responding to a new file on S3 by copying that file to a different location that has the desired hierarchy. Or it can be complex, aggregating multiple source files into one destination file.
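
Here’s a sketch of the simple version, assuming the source keys contain a yyyy/mm/dd run (the default Firehose layout); the destination prefix is an assumption:

```python
import re
import urllib.parse

DATE_RUN = re.compile(r"(\d{4})/(\d{2})/(\d{2})")

def destination_key(source_key: str, prefix: str = "restructured") -> str:
    # Rewrite a key containing a yyyy/mm/dd run into a clean
    # year/month/day hierarchy under `prefix` (an illustrative name).
    match = DATE_RUN.search(source_key)
    if match is None:
        raise ValueError(f"no date found in key: {source_key}")
    year, month, day = match.groups()
    filename = source_key.rsplit("/", 1)[-1]
    return f"{prefix}/{year}/{month}/{day}/{filename}"

def handler(event, context):
    # S3-triggered Lambda: copy each new object into the date hierarchy.
    import boto3  # imported here so destination_key stays testable without AWS
    s3 = boto3.client("s3")
    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        key = urllib.parse.unquote_plus(record["s3"]["object"]["key"])
        s3.copy_object(
            Bucket=bucket,
            CopySource={"Bucket": bucket, "Key": key},
            Key=destination_key(key),
        )
```

Make sure the trigger is scoped to the source prefix only, or the copy into the destination prefix will re-trigger the Lambda.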

Issue #3: are your partitions too granular?

This is a point that I keep repeating: small files are Kryptonite for Athena. It may be tempting to partition your data by more than just date. For example, with retail sales data, you might think of adding the store ID as a partition, to improve single-store queries. If you have only a few stores with tens or hundreds of thousands of transactions a day, that may be fine. If you have hundreds or thousands of stores, with a relatively small number of transactions in each, not so much.

With that said, if you write a lot of queries that select a particular store (or group of stores), then it’s again worthwhile to create another table that organizes data by store, preferably aggregating per-store records by year or month to ensure a reasonable file size.

Issue #4: do your partitions cover all of your data?

As I mentioned earlier, Athena has two ways to identify the partitions in your data: an explicit list of partitions stored in the Glue Data Catalog, or partition projection. If you write a query involving partition fields, but Athena can’t find or construct that partition, then it won’t read any files and the query won’t return any results.

This is most often a problem with explicit partitions, because you must update the list every time there’s a new “directory” in your data. If you’re partitioning by date, that has to happen every day. Often, this becomes a job for a Glue crawler.

Partition projection, however, isn’t immune to the problem of missing partitions. In my examples, I use a very wide range of years: 1970 to 2038. This introduces some inefficiency if your queries don’t specify the year, but that should be rare. On the other hand, if you set a range of, say, 2019 to 2023, which covers all of your existing data, but then forget to update it before New Year’s Day, your queries can’t ask for data from 2024. (I should note here that Athena supports a date partition type, which can use a range that’s relative to the day the query is run; I tend to prefer explicit values out of habit.)
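
If you do want the relative range, it might look like the following sketch; verify the property names against the current partition projection documentation before relying on them:

```sql
TBLPROPERTIES (
    'projection.enabled'        = 'true',
    'storage.location.template' = 's3://com-example-mybucket/example/${dt}/',
    'projection.dt.type'        = 'date',
    'projection.dt.format'      = 'yyyy/MM/dd',
    'projection.dt.range'       = 'NOW-5YEARS,NOW'
);
```

Note that this replaces the three string partition columns with a single dt column whose values render as the yyyy/MM/dd path segment.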

Another place where partitions can be missed is in enumerated values. Using CloudTrail as an example, you must explicitly list values for the account number and region partitions; there’s no way to generate them from a formula. If you add a new account, or AWS adds a new region, you won’t see those entries until you update the table definition (CloudTrail is especially prone to this problem because it puts account and region at the top of the partition hierarchy).

Issue #5: do your partitions reflect the actual data?

Date-based partitioning is pretty common; most business-analytics queries are restricted to a range of dates. But consider the case where your S3 data is built by Kinesis Firehose, from events written to a Kinesis Stream. Firehose is happy to partition your data by date … but by default it’s the date when the file was written. So if an event from 11:59 PM gets written to a file at 12:01 AM, and your query just aggregates by partition date, that event will be counted against the wrong day.

Often, this doesn’t matter. In business, there’s a big difference between financial analysts, who are allowed to use floating point numbers and pencils with erasers, and accountants, who aren’t. If you’re building a dashboard of sales data, then it probably doesn’t matter which day a particular sale appears on (at least until your top salesperson doesn’t see their month-end “whale” showing up when they expected it).

Unfortunately, the only way that you can have valid partitions from an accounting perspective is to repartition the data using the actual record timestamp. In other words, reading the files. And at this point you might find to your dismay that the timestamps in your source data aren’t consistent. Some might be ISO-8601 strings (with or without timezone), some might be Java “millis since epoch” values. Some might be missing entirely, or have bizarre values that come from some source system misinterpreting data from another source system.

Conclusion

To restate my introduction, partitioning can improve query performance by reducing the number of prefixes scanned, and therefore the number of files read. But it’s not a silver bullet, and blindly creating partitions may in fact decrease performance. Ultimately, it’s the number and size of the files that Athena reads that determines performance.

And for what it’s worth, I think partitioning by date – and nothing else – is the correct choice in almost every case.