How Rockset Supports Kinesis Shard Autoscaling to Handle Varying Throughputs
March 4, 2022
Amazon Kinesis is a platform for ingesting real-time events from IoT devices, POS systems, and applications, which produce many kinds of events that need real-time analysis. Many Rockset users pair Kinesis with Rockset because Rockset offers a highly scalable way to run real-time analytics on these events with sub-second latency, without worrying about schema. Rockset can also scale intelligently with the capabilities of a Kinesis stream, providing a seamless high-throughput experience for our customers while optimizing cost.
Background on Amazon Kinesis
Image Source: https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html
A Kinesis stream is composed of shards, and each shard has a sequence of data records. A shard can be thought of as a data pipe, where the ordering of events is preserved. See Amazon Kinesis Data Streams Terminology and Concepts for more information.
Throughput and Latency
Throughput is a measure of the amount of data transferred between source and destination per unit of time. A Kinesis stream with a single shard cannot scale beyond a certain limit because of the ordering guarantees provided by a shard; AWS documents a per-shard write limit of 1 MB per second or 1,000 records per second. To manage high throughput requirements when multiple applications write to a Kinesis stream, it makes sense to increase the number of shards configured for the stream so that different applications can write to different shards in parallel. Latency can be reasoned about similarly: a single shard accumulating events from multiple sources will increase the end-to-end latency of delivering messages to consumers.
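As a rough illustration of why shard count drives throughput, the sketch below estimates the minimum number of shards for a target write load. It assumes the per-shard write limits that AWS documents (1 MB per second, taken here as 1 MiB, or 1,000 records per second). The class and method names are hypothetical and are not part of any Kinesis or Rockset API.

```java
// Sketch only: ShardEstimator and its constants are illustrative names.
final class ShardEstimator {
    // Assumed per-shard write limits, based on the documented Kinesis quotas.
    static final long MAX_BYTES_PER_SECOND = 1024L * 1024L;
    static final long MAX_RECORDS_PER_SECOND = 1_000L;

    /** Returns the smallest shard count that satisfies both write limits (at least 1). */
    static long shardsNeeded(long bytesPerSecond, long recordsPerSecond) {
        long byBytes = ceilDiv(bytesPerSecond, MAX_BYTES_PER_SECOND);
        long byRecords = ceilDiv(recordsPerSecond, MAX_RECORDS_PER_SECOND);
        return Math.max(1L, Math.max(byBytes, byRecords));
    }

    // Integer division rounded up, for non-negative values.
    private static long ceilDiv(long value, long divisor) {
        return (value + divisor - 1) / divisor;
    }
}
```

Under these assumptions, producers writing 5 MiB per second as 2,000 records per second would need at least 5 shards, because the byte limit dominates the record limit.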
Capacity Modes
When creating a Kinesis stream, you choose one of two capacity modes, which determine how its shards are configured:
- Provisioned capacity mode: In this mode, the number of Kinesis shards is user configured. Kinesis will create as many shards as specified by the user.
- On-demand capacity mode: In this mode, Kinesis responds to the incoming throughput to adjust the shard count.
With this as the background, let’s explore the implications.
Cost
In provisioned capacity mode, AWS Kinesis charges customers by the shard hour, so the greater the number of shards, the greater the cost. If shard utilization is expected to be high with a certain number of shards, it makes sense to statically define the number of shards for a Kinesis stream. However, if the traffic pattern is more variable, it may be more cost-effective to let Kinesis scale shards based on throughput by configuring the Kinesis stream with on-demand capacity mode.
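To make the utilization argument concrete, here is a minimal sketch that compares the shard-hours consumed by a stream fixed at its peak requirement with the shard-hours consumed when the shard count follows demand. It deliberately counts shard-hours only and does not model actual AWS charges, which depend on the capacity mode and on pricing dimensions not covered here. The class name and the hourly demand figures are hypothetical.

```java
// Sketch only: compares shard-hours, not actual AWS charges.
final class ShardHours {
    /** Shard-hours consumed when the stream is fixed at the peak hourly requirement. */
    static long fixedAtPeak(int[] shardsNeededPerHour) {
        int peak = 0;
        for (int needed : shardsNeededPerHour) {
            peak = Math.max(peak, needed);
        }
        return (long) peak * shardsNeededPerHour.length;
    }

    /** Shard-hours consumed when the shard count follows the hourly requirement. */
    static long trackingDemand(int[] shardsNeededPerHour) {
        long total = 0;
        for (int needed : shardsNeededPerHour) {
            total += needed;
        }
        return total;
    }

    /** Fraction of the peak-provisioned shard-hours that is actually needed. */
    static double utilization(int[] shardsNeededPerHour) {
        long fixed = fixedAtPeak(shardsNeededPerHour);
        return fixed == 0 ? 0.0 : (double) trackingDemand(shardsNeededPerHour) / fixed;
    }
}
```

For an illustrative demand of 2, 2, 8, and 2 shards over four hours, a stream fixed at the peak uses 32 shard-hours while only 14 are needed. Low utilization like this is the kind of variable traffic pattern where a shard count that adapts to throughput may be the better fit.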
AWS Kinesis with Rockset
Shard Discovery and Ingestion
Before we explore ingesting data from Kinesis into Rockset, let’s recap what a Rockset collection is. A collection is a container of documents that is typically ingested from a source. Users can run analytical queries in SQL against this collection. A typical configuration consists of mapping a Kinesis stream to a Rockset Collection.
While configuring a Rockset collection for a Kinesis stream, it is not necessary to specify which shards should be ingested into the collection. The Rockset collection automatically discovers the shards that are part of the stream and comes up with a blueprint for generating ingestion jobs. Based on this blueprint, Rockset coordinates ingestion jobs that read data from each Kinesis shard into the Rockset system. Within the Rockset system, the ordering of events within each shard is preserved, while ingestion across shards is parallelized wherever possible.
If the Kinesis shards are created statically, and just once during stream initialization, it is straightforward to create ingestion jobs for each shard and run these in parallel. These ingestion jobs can also be long-running, potentially for the lifetime of the stream, and would continually move data from the assigned shards to the Rockset collection. If, however, shards can grow or shrink in number, in response to either throughput (as in the case of on-demand capacity mode) or user reconfiguration (for example, resetting the shard count for a stream configured in provisioned capacity mode), managing ingestion is not as straightforward.
Shards That Wax and Wane
Resharding in Kinesis refers to an existing shard being split or two shards being merged into a single shard. When a Kinesis shard is split, it generates two child shards from a single parent shard. When two Kinesis shards are merged, they generate a single child shard that has two parents. In both cases, the child shard maintains a back pointer, or reference, to its parent shard(s). Using the ListShards API, we can discover these shards and their relationships.
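The parent references can be modeled with a small value class. The sketch below is a simplified, hypothetical view of one shard entry: the two parent fields mirror the ParentShardId and AdjacentParentShardId fields that Kinesis reports for a shard, but the ShardInfo class itself is an assumption made for illustration, not an AWS SDK or Rockset type.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: a simplified stand-in for one shard entry returned by ListShards.
final class ShardInfo {
    final String shardId;
    final String parentShardId;         // null for a shard with no parent
    final String adjacentParentShardId; // non-null only for the child of a merge

    ShardInfo(String shardId, String parentShardId, String adjacentParentShardId) {
        this.shardId = shardId;
        this.parentShardId = parentShardId;
        this.adjacentParentShardId = adjacentParentShardId;
    }

    /** Returns the parent shard IDs: none, one (child of a split), or two (child of a merge). */
    List<String> parentIds() {
        List<String> ids = new ArrayList<>();
        if (parentShardId != null) {
            ids.add(parentShardId);
        }
        if (adjacentParentShardId != null) {
            ids.add(adjacentParentShardId);
        }
        return ids;
    }
}
```

With this model, the two children of a split each report the same single parent, while the child of a merge reports both of the shards it replaced.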
Choosing a Data Structure
Let’s go a little below the surface into the world of engineering. Why can we not hold all shards in a flat list and start ingestion jobs for all of them in parallel? Remember what we said about shards maintaining events in order. This ordering guarantee must be honored across shard generations, too. In other words, we cannot process a child shard before its parent shard(s) have been processed. The astute reader might already be thinking about a hierarchical data structure like a tree or a DAG (directed acyclic graph). Indeed, we choose a DAG rather than a tree, because a node in a tree cannot have multiple parents, whereas a merged shard has two. Each node in our DAG refers to a shard. The blueprint we referred to earlier takes the form of this DAG.
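One way to picture the blueprint is a map from shard ID to a node that holds references to its parent nodes. The sketch below builds such a DAG from a map of shard IDs to parent IDs. All names here are hypothetical, and the choice to ignore a parent ID that is no longer listed (for example, because the parent shard has aged out of the stream) is an assumption of this sketch rather than a description of Rockset's implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch only: ShardDag and Node are illustrative names.
final class ShardDag {
    static final class Node {
        private final String shardId;
        private final List<Node> parents = new ArrayList<>();

        Node(String shardId) {
            this.shardId = shardId;
        }

        String getShardId() {
            return shardId;
        }

        List<Node> getParents() {
            return parents;
        }
    }

    /** Builds one node per shard and links each node to the parents that are still listed. */
    static Map<String, Node> build(Map<String, List<String>> parentIdsByShardId) {
        Map<String, Node> nodes = new LinkedHashMap<>();
        for (String shardId : parentIdsByShardId.keySet()) {
            nodes.put(shardId, new Node(shardId));
        }
        for (Map.Entry<String, List<String>> entry : parentIdsByShardId.entrySet()) {
            Node child = nodes.get(entry.getKey());
            for (String parentId : entry.getValue()) {
                Node parent = nodes.get(parentId);
                if (parent != null) { // Assumption: skip parents that are no longer listed.
                    child.getParents().add(parent);
                }
            }
        }
        return nodes;
    }
}
```

Because edges always point from a child to the shard(s) it replaced, the structure cannot contain a cycle, and a merged shard naturally ends up with two parent nodes.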
Putting the Blueprint Into Action
Now we are ready to schedule ingestion jobs by referring to the DAG, aka the blueprint. Traversing a DAG in an order that respects its dependencies is achieved via a common technique known as topological sorting. There is one caveat, however. Though a topological sort yields an order that does not violate dependency relationships, that order is strictly sequential, and we can optimize a little further. If a child shard has two parent shards, we cannot process the child shard until both parent shards are fully processed. But there is no dependency relationship between those two parent shards. So, to optimize processing throughput, we can schedule ingestion jobs for those two parent shards to run in parallel. This yields the following algorithm:
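// Collects into `output` the shards that are ready for ingestion: unprocessed
// shards whose parents have all been fully processed.
void schedule(Node current, Set<Node> output) {
    if (processed(current)) {
        return; // Already fully ingested; nothing to schedule.
    }
    boolean hasUnprocessedParent = false;
    for (Node parent : current.getParents()) {
        if (!processed(parent)) {
            hasUnprocessedParent = true;
            // Walk up the DAG to find the earliest shards that are still pending.
            schedule(parent, output);
        }
    }
    if (!hasUnprocessedParent) {
        // Every parent is done, so this shard can be ingested now.
        output.add(current);
    }
}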
The above algorithm results in a set of shards that can be processed in parallel. As new shards get created on Kinesis or existing shards get merged, we periodically poll Kinesis for the latest shard information so we can modify our processing state and spawn new ingestion jobs, or wind down existing ingestion jobs as needed.
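To see the algorithm at work, the following self-contained sketch wraps the same scheduling logic in a small class and applies it to a shard that is split and whose children are later merged. The IngestionPlanner and Node classes, the in-memory set of processed shard IDs, and the runnable helper are all assumptions made for this illustration and are not Rockset's actual classes.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Sketch only: IngestionPlanner and Node are illustrative names.
final class IngestionPlanner {
    static final class Node {
        final String shardId;
        final List<Node> parents = new ArrayList<>();

        Node(String shardId, Node... parents) {
            this.shardId = shardId;
            for (Node parent : parents) {
                this.parents.add(parent);
            }
        }

        List<Node> getParents() {
            return parents;
        }
    }

    // Assumption: processing state is tracked as a set of fully ingested shard IDs.
    private final Set<String> processedShardIds = new HashSet<>();

    void markProcessed(Node node) {
        processedShardIds.add(node.shardId);
    }

    boolean processed(Node node) {
        return processedShardIds.contains(node.shardId);
    }

    // Same logic as the algorithm in the article.
    void schedule(Node current, Set<Node> output) {
        if (processed(current)) {
            return;
        }
        boolean hasUnprocessedParent = false;
        for (Node parent : current.getParents()) {
            if (!processed(parent)) {
                hasUnprocessedParent = true;
                schedule(parent, output);
            }
        }
        if (!hasUnprocessedParent) {
            output.add(current);
        }
    }

    /** Returns the IDs of the shards that can be ingested in parallel right now. */
    Set<String> runnable(List<Node> leaves) {
        Set<Node> ready = new LinkedHashSet<>();
        for (Node leaf : leaves) {
            schedule(leaf, ready);
        }
        Set<String> ids = new LinkedHashSet<>();
        for (Node node : ready) {
            ids.add(node.shardId);
        }
        return ids;
    }
}
```

Suppose shard s1 is split into s2 and s3, which are later merged into s4. Starting from the leaf s4 with nothing processed, only s1 is runnable. Once s1 is marked processed, s2 and s3 become runnable together, and s4 becomes runnable only after both of them are done.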
Keeping the House Manageable
At some point, the shards get deleted by the retention policy set on the stream. We can clean up the shard processing information we have cached accordingly so that we can keep our state management in check.
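A minimal sketch of this cleanup, assuming the cached state is a set of processed shard IDs and that each poll of the stream yields the set of shard IDs it still lists. The ShardStateCache class and its methods are hypothetical names used only for illustration.

```java
import java.util.HashSet;
import java.util.Set;

// Sketch only: ShardStateCache is an illustrative name.
final class ShardStateCache {
    private final Set<String> processedShardIds = new HashSet<>();

    void markProcessed(String shardId) {
        processedShardIds.add(shardId);
    }

    boolean isProcessed(String shardId) {
        return processedShardIds.contains(shardId);
    }

    int size() {
        return processedShardIds.size();
    }

    /**
     * Drops cached state for shards that the stream no longer lists.
     * Returns the number of entries removed.
     */
    int prune(Set<String> currentlyListedShardIds) {
        int before = processedShardIds.size();
        processedShardIds.retainAll(currentlyListedShardIds);
        return before - processedShardIds.size();
    }
}
```

Running the prune step after each poll keeps the cache bounded by the number of shards the stream currently reports, rather than by every shard that has ever existed.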
To Sum Up
We have seen how Kinesis uses the concept of shards to maintain event ordering and at the same time provide means to scale them out/in in response to throughput or user reconfiguration. We have also seen how Rockset responds to this practically in lockstep to keep up with the throughput requirements, providing our customers a seamless experience. By supporting on-demand capacity mode with Kinesis data streams, Rockset ingestion also allows our customers to benefit from any cost savings offered by this mode.
If you are interested in learning more or contributing to the discussion on this topic, please join the Rockset Community. Happy sharding!