My last few posts have focused on Redshift and Athena, two specialized tools for managing and querying Big Data. But there’s a meme that’s been floating around for at least a few years that you should just use Postgres for anything data-related. It may not provide all of the features and capabilities of a dedicated tool, but it’s one less thing to learn and manage. Should this advice also apply to your data warehouse?
The answer is … maybe. Postgres is an extremely capable database, and modern hardware provides an amazing amount of performance. And, to be honest, most analytics workloads aren’t really that big. As I said in an earlier post, the 100,000,000 rows of my “clickstream” dataset can fit quite easily on my laptop, in a Postgres database; it seems like “big data” but it isn’t.
There isn’t necessarily a cutoff between “small data” and “big data.” In the past, I’ve written “big data style” queries on half-billion-row tables using a read replica from an OLTP system. It’s mostly a matter of breaking the OLTP mindset.
What is the OLTP mindset?
Online transaction processing (OLTP) is all about minimizing the number of rows touched by a query. Your fintech might have a million customers, but each of them has only a few accounts, and each account might average a few dozen transactions per month. When a customer logs in, the queries to retrieve their account information execute quickly.
By comparison, a typical decision-support query might look at all of your customers’ transactions over the past month, to find those with transaction amounts over $10,000. This requires a different approach to query optimization: you will scan all, or a significant fraction, of the rows in potentially large tables.
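Concretely, such a query might look like the following sketch (the table and column names are hypothetical, not taken from any particular system):

```sql
-- Find last month's transactions over $10,000. Unlike an OLTP lookup,
-- this touches a large fraction of the table's rows.
select account_id, txn_timestamp, amount
from transactions
where txn_timestamp >= date_trunc('month', current_date - interval '1 month')
  and txn_timestamp <  date_trunc('month', current_date)
  and amount > 10000;
```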
While database systems like Redshift are optimized for these sorts of queries, Postgres needs some help. OLTP data is typically indexed by primary key, with possibly a few other indexes to support common access paths; such indexes don’t help with decision-support queries. OLTP data is also highly normalized, while decision support data might replicate fields from one table to another, or even create duplicate copies of entire tables.
Overcoming row-oriented storage
Redshift, Parquet, and ORC share one common feature: they store each column’s data independently. This means that, to find all transactions above $10,000, they only need to read the disk blocks that hold the “amount” field. This field might take 16 bytes, so if you have 1,000,000,000 rows in your table, finding those transactions means scanning 16 GB. Postgres, by comparison, stores all of the fields together, so has to read every block of the table to identify those transactions. Even if you have a minimal transaction table – transaction ID, account ID, timestamp, transaction type, and amount – that can easily exceed 100 GB.
There are two primary ways to simulate columnar data storage. The first is a “covering index”: an index that contains all of the fields used by the query. You would never create an index in an OLTP database that just holds the transaction date and amount, because that’s not a way that any rational OLTP application would access the data. But it might be useful in a decision support database (or it might not – see below).
Alternatively, you can create a separate table that just contains the columns that you use, with a foreign-key reference back to the parent table. This is especially appropriate for data such as user profiles, that contain many fields that are potentially useful but not queried. For example, you might care about the postal code and country for your users, but not any of their other address data. Extracting those two columns into their own table will reduce the number of disk blocks scanned, at the cost of a join for the rare times when you need that additional data.
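As a sketch of both techniques, with hypothetical table and column names:

```sql
-- A "covering" index: holds every column the query needs, so Postgres
-- can (in principle) satisfy the query with an index-only scan.
create index idx_txn_timestamp_amount
    on transactions (txn_timestamp, amount);

-- Alternatively, extract the frequently-queried columns into a narrow
-- table, keyed back to the parent table.
create table user_locations as
    select user_id, postal_code, country
    from user_profiles;

alter table user_locations add primary key (user_id);
```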
Indexes

The first thing to understand about indexes is that they probably won’t be used – especially indexes that exist to facilitate joins. With large tables, Postgres usually decides to scan the tables and perform a hash or sort-merge join, even if the index covers all of the join keys and any other columns used by the query.

Where an index may help is if it contains the columns commonly used for predicates. In this case, pick columns that are selective: product ID is a good candidate, gender isn’t. Postgres can use multiple indexes for a single query, so if you don’t always use the same columns in your WHERE clauses it may be better to create separate indexes (as always, verify this with testing).
If you will be updating rows (versus an append-only table), you should add an index on the table’s primary key: that’s one place you don’t want table scans! And if you’ve partitioned your table, note that the primary key must include the partitioning column, even if it’s not part of the logical key.
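For example, a partitioned transaction table (hypothetical schema) must fold the partitioning column into the primary key:

```sql
-- txn_id alone is the logical key, but on a partitioned table the
-- primary key must also include the partitioning column.
create table transactions (
    txn_id        bigint not null,
    account_id    bigint not null,
    txn_timestamp timestamp not null,
    txn_type      text not null,
    amount        numeric(16,2) not null,
    primary key (txn_id, txn_timestamp)
)
partition by range (txn_timestamp);
```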
Partitioning

Partitioning is perhaps the best way to turn Postgres – or any other OLTP database – into a decision-support engine. You may have five years of data in your transaction table, but most queries won’t look at all of that data; more likely, you’ll query just a single month.
You might think that you could accomplish this with an index on the timestamp column. However, in my experience Postgres often ignores such an index if it doesn’t think it will be sufficiently selective. Indeed, I’ve seen Postgres ignore an index for a “count” query that could be satisfied just by reading the index!
To reliably reduce the number of rows scanned by date-range queries, you need to partition the table by the date/timestamp column. When Postgres processes a query that includes a literal WHERE condition on the partitioning column, it looks only at the partitions that hold the specified value(s). Note that I highlighted literal: if you join on the date column, or select it via subquery, then Postgres doesn’t have enough information to restrict the partitions it reads.
To partition data in Postgres, you first define the base table and then define individual partitions of that table. Each partition must specify a non-overlapping range of values, and you can’t split a partition once it’s created. This means that partition maintenance becomes an ongoing task: if you partition your transaction table by month, then each month you will need to add a new partition to the table.
This is also a strength: if you only care about the last year’s worth of data, then you can drop partitions when they’re no longer relevant. This is much faster than deleting the individual rows from a consolidated table.
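A minimal sketch, assuming a transactions table already partitioned by range on its timestamp column (all names are hypothetical):

```sql
-- One partition per month; each new month needs a new partition.
create table transactions_2023_06
    partition of transactions
    for values from ('2023-06-01') to ('2023-07-01');

create table transactions_2023_07
    partition of transactions
    for values from ('2023-07-01') to ('2023-08-01');

-- Retiring old data is a fast metadata operation, not a row-by-row delete.
drop table transactions_2022_06;
```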
You can, additionally, define an index on the partition column. This may be useful if your partitions cover a relatively wide range (such as a month), while your queries cover a much narrower range (such as a day or range of hours). I will, however, admit to being mystified by the execution plans that I’ve seen when doing this: they appear to scan both the table and the index. That said, I have also seen a performance improvement in those queries with this “belt and suspenders” approach.
Populating Your Database
Getting your data into a Postgres database is the real challenge: unlike Redshift, you can’t use an external schema to read from S3, nor does it understand data formats such as Parquet. Here are some of the techniques that I’ve used, with pros and cons:
Glue job with Postgres connection
I want to like Glue. I really do. But in this case, Glue gives you about 30% of what you need.
What Glue gives you is an easy way to define a database connection, and to use that connection to write a DynamicFrame to a pre-existing table. And it performs writes in parallel, potentially improving throughput.
What Glue doesn’t give you is any way to work with database tables other than reading and writing them: creating those tables in the first place, truncating them before writing data, or creating indexes. True, you can retrieve the connection details, so it is possible to have your driver script perform these operations, but I’ve always turned to different tools instead.
As for parallel writes: if you have a lot of workers then you can easily overload your database, decreasing throughput.
And finally, to add insult to injury, Glue uses discrete INSERT statements, rather than a bulk upload. If your table has many millions of rows, this will run up the DPU hours for your job (at $0.44 per worker per hour), or cause your job to time out and not complete.
COPY

COPY is the old standby for getting data into a Postgres database (and, for that matter, into a Redshift database as well). With a self-hosted server, it can load data from files on the database filesystem. With any server, it can load data from a stream managed by your program and supported by your database driver, or from standard input when using the psql command-line tool.
There are, however, several big drawbacks to COPY:
- It only accepts delimited text or a Postgres-specific binary format. If your data is in Parquet, you’ll need to transform it into CSV or another delimited format before loading.
- Your data must be parsable into the column types defined by the table. This is particularly an issue with date/time columns.
- COPY fails on any error; unlike Redshift, it does not report individual rows that failed.
- You’ll need to write code to perform the COPY. This might be a Lambda, or a Glue script, or something else. On the positive side, this code can transform the raw data so that it avoids parsing errors.
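For reference, a basic invocation might look like this (table and file names are hypothetical):

```sql
-- Server-side COPY: the file must live on the database host's filesystem,
-- so this form only works on a self-hosted server.
copy transactions from '/data/transactions.csv' with (format csv, header true);

-- Client-side alternative: psql's \copy meta-command streams the file
-- from the client machine over the connection.
\copy transactions from 'transactions.csv' with (format csv, header true)
```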
The aws_s3 extension

As an alternative to COPY, RDS provides the aws_s3 extension. As far as I can tell, this extension performs a COPY under the covers, so all of the caveats above apply to it. However, it might offer higher performance, because the data moves directly from S3 to RDS without an intermediary.
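A minimal sketch of the extension in use (bucket, key, and table names are placeholders; check the RDS documentation for the exact signature and IAM setup):

```sql
-- One-time setup on the RDS instance.
create extension if not exists aws_s3 cascade;

-- Import a CSV file from S3 directly into an existing table.
select aws_s3.table_import_from_s3(
    'transactions',               -- target table
    '',                           -- column list (empty = all columns)
    '(format csv, header true)',  -- COPY options
    aws_commons.create_s3_uri('my-bucket', 'exports/transactions.csv', 'us-east-1')
);
```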
Lambda

If you’re going to write code to handle your database updates (and you are, like it or not), then Lambda is often the best place to host that code. First, because deployment on Lambda is relatively easy, and doesn’t require an always-on server. And second, because Lambda can be triggered by external events, such as data arriving via Kinesis or an S3 file drop.
The primary drawback to Lambda is that a single invocation can only run for 15 minutes. Anything that runs longer will be killed by the execution environment, without warning. In many cases, this is not an issue, especially when triggering Lambda from a streaming source such as Kinesis, because it’s given a limited amount of data per invocation.
However, if you’re uploading a large amount of data, such as a file from S3, you might exceed the timeout. In this case you can still use your Lambda as a trigger, but invoke a longer-running task using the Elastic Container Service (ECS) or AWS Batch.
Logical replication

If your mainline database servers are also Postgres, then you can use logical replication to copy updates to your decision-support server. Logical replication differs from streaming replication, which is used to manage standby servers: streaming replication replicates an entire server, with all databases and tables. Logical replication, by comparison, lets you specify groups of tables as a single publication. You can define multiple publications on the same source server, each with its own set of tables.
On the subscriber side, you can receive updates from multiple publications, published by different source servers. This lets your decision support database combine data from multiple line-of-business systems.
There are several caveats to logical replication. A big one is that tables and columns are matched by name. So if you publish tables with the same name from two different servers, they’ll both feed into the same subscriber table. If you’re capturing append-only log data, this is OK; if the two tables contain data with overlapping primary keys, you’re in for a world of hurt. And if they have different columns, they won’t replicate at all. You can work around this by using different databases on the subscriber to receive the data, then combining that data into the decision-support database.
My bigger concern with logical replication is that there are a lot of places where it can be very fragile. For example, if your producer adds a column to a table, then replication fails until your subscriber(s) add the same column.
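A minimal publication/subscription setup might look like this (server, database, and table names are hypothetical, and the subscriber must already have matching table definitions):

```sql
-- On the line-of-business (publisher) server:
create publication txn_pub for table transactions, accounts;

-- On the decision-support (subscriber) server:
create subscription txn_sub
    connection 'host=oltp.example.com dbname=fintech user=replicator'
    publication txn_pub;
```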
Database Migration Service

In the same vein as logical replication, you can use the AWS Database Migration Service (DMS) to extract data from a variety of relational and non-relational database systems, as well as CSV files stored in S3, and write it to Postgres or other destinations. Along the way, you can perform a limited number of transformations, such as renaming tables or columns, or creating new columns based on expressions that involve existing columns.
DMS uses a replication instance to do the work. This is an always-on EC2 instance (or serverless equivalent) that runs tasks to retrieve and store data. These tasks can be “full load” tasks that retrieve an entire table, or “change data capture” tasks that replicate transactional updates.
Comparing performance

To give you a sense of Postgres performance, I took three queries from my previous posts and ran them in both Postgres and Redshift. My assumption was that you’d pick Postgres to save money, so I went with an “entry level” configuration for both systems.
For Redshift, this was a two-node dc2.large cluster. This has 4 virtual CPUs and 30 GB of RAM, with 320 GB of attached SSD storage. It will cost $0.50/hour ($365/month) in US regions. Although you can go with a single-node Redshift deployment, in my opinion that’s only appropriate for development; Redshift is about parallel execution, and that needs multiple nodes.
For Postgres, I went with a db.m6g.xlarge instance type, with 200 GB of GP3 EBS storage. This instance type provides 4 virtual CPUs and 16 GB of RAM, and costs $0.318/hour on-demand ($256/month including storage). In the real world, you might go with a smaller, less expensive instance, but I wanted to be at least competitive in terms of performance.
To minimize the effect of the disk cache on query times, I rebooted the RDS instance / Redshift cluster between queries.
Query #1: single-table aggregation
This is the sort of query that benefits tremendously from parallel execution, so it clearly favors Redshift. Postgres does have the ability to perform parallel scans, with the default number of workers based on the number of virtual CPUs; you can increase that value via an RDS parameter group, which I didn’t do. I tried adding a covering index to the query, but it didn’t change the execution plan: a table scan followed by aggregation.
```sql
select productid, sum(quantity) as units_added
from "public"."add_to_cart"
group by productid
order by units_added desc
limit 10;
```
Query #2: single-table aggregation by date range
This query highlights the benefits of partitioning your data, as well as the possible benefit of an index.
As you can see from the results, using an index dramatically sped up the query time. However, this was highly dependent on the date range. The specified range represented 5% of the total rows in the table; increasing the range to 7.5% caused Postgres to fall back to the same plan as the unindexed query. Partitioning was a more consistent way to improve performance.
```sql
select productid, sum(quantity) as units_added
from "public"."add_to_cart"
where "timestamp" between to_timestamp('2023-06-20 19:00:00', 'YYYY-MM-DD HH24:MI:SS')
                      and to_timestamp('2023-06-20 21:00:00', 'YYYY-MM-DD HH24:MI:SS')
group by productid
order by units_added desc
limit 10;
```
| Configuration | Time |
|---|---|
| Postgres, unindexed | 6.923 seconds |
| Postgres, indexed | 0.319 seconds |
| Postgres, partitioned | 2.268 seconds |
| Postgres, partitioned and indexed | 0.400 seconds |
Query #3: two-table join
This query highlights the limitations of a row-oriented database: it touches every row in the two joined tables, so must read every disk block. I experimented with various indexes, including one that covered all columns used by the query; Postgres chose a table-scan followed by sort-merge join every time.
```sql
select count(distinct user_id) as users_with_abandoned_carts
from (
    select atc.userid as user_id,
           max(atc."timestamp") as max_atc_timestamp,
           max(cc."timestamp") as max_cc_timestamp
    from "public"."add_to_cart" atc
    left join "public"."checkout_complete" cc on cc.userid = atc.userid
    group by atc.userid
) as sub
where max_atc_timestamp > max_cc_timestamp
   or max_cc_timestamp is null;
```
I added a second timing column to the results, which shows the time for a second run of the query (ie, the benefit of caching). For Redshift, I disabled the query cache before running the second query. Redshift’s time decreased significantly; Postgres’s less so, perhaps because it didn’t have enough RAM to cache all pages of the table.
| Configuration | First run | Second run |
|---|---|---|
| Postgres | 1 minute 41.738 seconds | 1 minute 24.619 seconds |
| Redshift | 28.741 seconds | 9.953 seconds |
If you’re just getting started with a data warehouse, then you can potentially save money by using Postgres on one of the “burstable” instance types, versus a single-node Redshift cluster. However, you’ll give up features such as the ability to read non-CSV data files. And if you’re working with high-volume data sources, such as clickstream data, then there really is no alternative to a purpose-built “big data” system.