Websockets feeding Kinesis

by
Tags: , ,
Category:

We recently explored a project to retrieve data from a third-party service. They didn’t offer any push capabilities such as writing to a Kafka or Kinesis stream, or even a web-hook. But they did offer a WebSocket interface, so we explored whether we could use that as our streaming source. We didn’t go that route, but I was intrigued by the idea enough to make a proof-of-concept.

What are WebSockets?

WebSockets appeared in 2009, to provide a way for servers to push information to web browsers. Before that time, browsers would have to poll their servers, typically using background AJAX requests from JavaScript. With WebSockets, the browser and server establish a long-lived connection and can send arbitrary messages back and forth on this connection at any time.

A WebSocket connection starts out as an HTTP GET request to the server, with an Upgrade header that requests the WebSocket connection. If the server supports WebSockets, it responds with a normal HTTP response (with status code 101), but expects all subsequent communication on the underlying TCP connection to follow the WebSockets protocol.

Typical use-cases include chat services (eg, Slack), collaborative documents (eg, Google Docs), and real-time dashboards. That last case was the one that applied to our client: the third-party server provided a WebSocket interface for telemetry data that would be displayed by a browser. But that data would also be useful for longer-term analytics queries, provided that it could be routed into a data lake.

Implementation: Python program running as an ECS service

The client program

While WebSockets originated as a browser technology, it’s just a protocol on top of a TCP socket; there are plenty of non-browser implementations. I chose the websockets library for Python for this example: it has a long history (going back to 2013), lots of users (based on the number of GitHub stars), and is actively maintained.

The actual program is quite small (the full example is here): it connects to the server, then enters a loop that reads messages and writes them to Kinesis:

with ws_client.connect(server_url) as websocket:
    logger.info(f"opened connection {websocket.id} to {websocket.remote_address}")
    for msg in websocket:
        logger.debug(msg)
        kinesis_client.put_record(StreamName=stream_name, PartitionKey=str(random.random()), Data=msg.encode('utf-8'))
    logger.info("connection closed by server")

As a proof-of-concept rather than production code, I made several simplifications. The first is that I use a synchronous WebSocket client, rather than the asynchronous client that is one of the key features of this library. Similarly, I use the Kinesis PutRecord API, which sends one record at a time, rather than PutRecords, which sends a batch but must handle failures of individual records within that batch.

These choices do introduce some limitations on performance, which I’ll describe below.

Partition Keys

You must provide a partition key when you write to Kinesis. In multi-shard streams, the partition key is used to select the shard that receives the record; all records with the same partition key go to the same shard.

Depending on your uses for the data, some partition keys may make more sense than others. For example, if you want to do streaming aggregation on the raw data, then you should pick the field you want to aggregate by (such as user ID).

If you’re just using Kinesis as a way to get data into a data lake, however, a random partition key is best since it distributes data to all of the shards equally. My example program uses Python’s random module to generate random numbers, which is simple and sufficient.

Deployment

Since a WebSocket connection is long-lived, AWS Lambda is the wrong choice of platform (in a departure from my usual posts). Instead, I implemented the client as a containerized Python program, deployed to an ECS (Elastic Container Service) cluster, with an ECS Service resource to restart it on any failure.

One characteristic of deploying to ECS is that you need a way to talk to the Internet, both to communicate once the task has been launched, and also when launching the task. The standard solution is to deploy into private subnets, with a NAT Gateway to provide access to the Internet. However, depending on how much data you move through this connection, that could be quite expensive: you’ll pay data transfer both to read records from the server and to write them to Kinesis.

I think that a better approach is to deploy your container into a public subnet, with a public IP address. You’ll pay $0.005/hour for each IP address, but you won’t pay data transfer charges. At least, that’s what I’ve gleaned from the Byzantine pricing rules, but I haven’t looked at my bill to confirm.

Potential issues and how to work-around

Any time that you have one process talking to another, you run the risk that they will stop communicating – or worse, miscommunicate. In this section I look at some of the more common issues, and ways to work around them.

Client can’t keep up with server

This is, in my opinion, the most common issue. It’s why we didn’t use WebSockets for that client: they were dealing with extremely high-volume data.

My example client can theoretically write 1,000 messages per second to Kinesis. But each call to PutRecord takes 20-30 milliseconds, so the actual limit is around 50 records per second.

It’s important to remember that this is a steady-state limit: if your source sends a baseline of 30 records per second, but bursts to 60 or even 100, then the simple client is fine: There are enough network buffers along the way to absorb bursts. But sooner or later those buffers will be exhausted as well, and the server will abort the connection.

If your volume is high enough, you’ll need to switch to the batch-oriented PutRecords API call. As I mentioned above, this API can return failure notices for individual records, which you must attempt to send with the next batch. This also introduces the possibility that records will be accepted by Kinesis in a different order than they were written to the WebSocket, so you can’t rely on the order of records within a Kinesis shard.

Connection loss

TCP connections do not live forever, and they fail for a variety of reasons. Perhaps the data provider regularly cycles their servers to apply patches (you hope they do!); perhaps they have a time limit on how long the connection remains open, or perhaps your corporate firewall does; perhaps there are long periods without any data, and the TCP connection times out (or, my favorite, a NAT removes the connection from its internal tables). The list goes on, and your client has to be prepared for it.

And how your client prepares for connection loss depends on what options the server provides to reestablish that connection. With luck, it will accept a transaction ID or timestamp when establishing the connection. In this case, your client must store the most recent value that it’s processed, so that it can tell the server where to restart.

If the server doesn’t provide this capability – if it’s just sending a raw stream of events – then you’ll have to use redundant clients, so that if one fails, the others will still be reading data. In such an environment, you should plan to cycle your clients based on the expected lifetime of a connection. For example, if you know that you can’t expect more than an hour out of one connection, start three clients that are offset by 20 minutes. This, of course, increases the load on the server, and may increase the cost of your data.

Duplicated data

If you have concurrent clients, you will have duplicated data. You might have duplicated data even without concurrent clients. To properly process that data, you must identify the key field(s), and use post-processing to eliminate the duplicates.

Missing data

The biggest risk with extracting data via WebSocket is that the data provider probably intended it to feed a real-time dashboard. So they might not provide all of the actual data. Or they might provide summary records. Before using a WebSocket interface as your source of record, you need to do due diligence to determine whether it is, in fact, a source of record.

Conclusion

While the source pushes messages into a WebSocket, the overall pipeline behaves more like a “pull” system in that most of the failure modes are client-based. This drives pipeline designs to be resilient, and likely redundant, especially if there’s no way to recover messages once lost. And monitoring, important for any data pipeline, becomes critical here, if only to alert you that your data is not complete.