Distributed Aggregation Queries - A Rockset Intern Story

I first met the Rockset team when they were just four people in a small office in San Francisco. I was struck by their experience and friendliness and, most importantly, by their willingness to spend a lot of time mentoring me. I knew very little about Rockset’s technologies and didn’t know what to expect from such an agile early-stage startup, but I decided to join the team for a summer internship anyway.

I Was Rockset's First Ever Intern

Since I didn’t have much experience with software engineering, I was interested in touching as many different pieces as I could to get a feel for what I might be interested in. The team was very accommodating of this—since I was the first and only intern, I had a lot of freedom to explore different areas of the Rockset stack. I spent a week working on the Python client, a week working on the Java ingestion code, and a week working on the C++ SQL backend.

There is always a lot of work to be done at a startup, so I had the opportunity to work on whatever was needed and interesting to me. I decided to delve into the SQL backend, and started working on the query compiler and execution system. A lot of the work I did over the summer ended up being focused on aggregation queries, and in this blog post I will dive deeper into how aggregation queries are executed in Rockset. We’ll first talk about serial execution of simple and complex aggregation queries, and then explore ways to distribute the workload to improve time and space efficiency.

Serial Execution of Aggregation Queries

Let’s say we have a table ratings, where each row consists of a user, a restaurant, an entree and that user’s rating of that entree at that restaurant.

The aggregation query select restaurant, avg(rating) from ratings group by restaurant computes the average rating of each restaurant. (See here for more info on the GROUP BY notation.)

A straightforward way to execute this computation would be to traverse the rows in the table and build a hash map from restaurant to a (sum, count) pair, representing the sum and count of all the ratings seen so far. Then, we can traverse each entry of the map and add (restaurant, sum/count) to the set of returned results. Indeed, for simple and low-memory aggregations, this single computation stage suffices. However, with more complex queries, we’ll need multiple computation stages.
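
As a rough illustration, the single-stage approach above might look like the following Python sketch. The sample rows and the function name are made up for this example; this is not Rockset's actual C++ implementation.

```python
from collections import defaultdict

def average_rating_by_restaurant(rows):
    """rows: iterable of (user, restaurant, entree, rating) tuples."""
    # Hash map from restaurant to a running [sum, count] pair.
    state = defaultdict(lambda: [0.0, 0])
    for _user, restaurant, _entree, rating in rows:
        acc = state[restaurant]
        acc[0] += rating
        acc[1] += 1
    # Traverse the map once more to turn each (sum, count) into an average.
    return {r: total / count for r, (total, count) in state.items()}

# Hypothetical sample data, not the table from the original post.
ratings = [
    (1, "Taqueria", "burrito", 4),
    (2, "Taqueria", "taco", 5),
    (3, "Noodle Bar", "ramen", 3),
]
result = average_rating_by_restaurant(ratings)
```

Note that the map holds one entry per restaurant until the very end, which is what makes the memory footprint proportional to the size of the output.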

Suppose we wanted to compute not just the average rating of each restaurant, but also the breakdown of that average rating by entree. The SQL query for that would be select restaurant, entree, avg(rating) from ratings group by rollup(restaurant, entree). (See our docs and this tutorial for more info on the ROLLUP notation.)

Executing this query is very similar to executing the previous one, except now we have to construct the key(s) for the hash map differently. The example query has three distinct groupings: (), (restaurant) and (restaurant, entree). For each row in the table, we create three hash keys, one for each grouping. A hash key is generated by hashing together an identifier for which grouping it corresponds to and the values of the columns in the grouping. We now have two computation stages: first, computing the hash keys, and second, using the hash keys to build a hash map that keeps track of the running sum and count (similar to the first query). Going forward, we’ll call them the hashing and aggregation stages, respectively.
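
The two stages can be sketched in Python as below. This is only an illustration under some assumptions: the key is kept as a (grouping id, column values) tuple and Python's dict does the actual hashing, whereas a real engine would likely hash these into a compact value. The sample rows and function names are hypothetical.

```python
from collections import defaultdict

# The three groupings produced by rollup(restaurant, entree).
GROUPINGS = [(), ("restaurant",), ("restaurant", "entree")]

def hashing_stage(rows):
    """Emit one (key, rating) pair per grouping for every input row."""
    for row in rows:
        for grouping_id, columns in enumerate(GROUPINGS):
            # The key combines the grouping identifier with the column values.
            key = (grouping_id, tuple(row[c] for c in columns))
            yield key, row["rating"]

def aggregation_stage(keyed_ratings):
    """Keep a running [sum, count] per key, then compute the averages."""
    state = defaultdict(lambda: [0.0, 0])
    for key, rating in keyed_ratings:
        state[key][0] += rating
        state[key][1] += 1
    return {key: total / count for key, (total, count) in state.items()}

# Hypothetical sample data.
ratings = [
    {"user": 1, "restaurant": "Taqueria", "entree": "burrito", "rating": 4},
    {"user": 2, "restaurant": "Taqueria", "entree": "taco", "rating": 5},
    {"user": 3, "restaurant": "Noodle Bar", "entree": "ramen", "rating": 3},
]
rollup = aggregation_stage(hashing_stage(ratings))
```

Including the grouping identifier in the key keeps, say, the overall average under () from colliding with a per-restaurant average.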

So far, we’ve made the assumption that the whole table is stored on the same machine and all computation is done on the same machine. However, Rockset uses a distributed design where data is partitioned and stored on multiple leaf nodes and queries are executed on multiple aggregator nodes.

Reducing Query Latency Using Partial Aggregations in Rockset

Let’s say there are three leaf machines (L1, L2, L3) and three aggregators (A1, A2, A3). (See this blog post for details on the Aggregator Leaf Tailer architecture.) The straightforward solution would be to have all three leaves send their data to a single aggregator, say A1, and have A1 execute the hashing and aggregation stages. Note that we can reduce the computation time by having the leaves run the hashing stages in parallel and send the results to the aggregator, which will then only have to run the aggregation stage.

We can further reduce the computation time by having each leaf node run a “partial” aggregation stage on the data it has and send that result to the aggregator, which can then finish the aggregation stage. In concrete terms, if a single leaf contains multiple rows with the same hash key, it doesn’t need to send all of them to an aggregator—it can compute the sum and count of those rows and only send that. In our example, if the rows corresponding to users 4 and 8 are both stored on the same leaf, that leaf doesn’t need to send both rows to the aggregator. This decreases the serialization and communication load and parallelizes some of the aggregation computation.
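
A minimal Python sketch of this idea follows, assuming each leaf has already run the hashing stage and holds (key, rating) pairs. The data and function names are invented for illustration.

```python
from collections import defaultdict

def partial_aggregate(rows):
    """Runs on a leaf: collapse local rows into one (sum, count) per key."""
    partial = defaultdict(lambda: [0.0, 0])
    for key, rating in rows:
        partial[key][0] += rating
        partial[key][1] += 1
    return {key: (s, c) for key, (s, c) in partial.items()}

def final_aggregate(partials):
    """Runs on the aggregator: merge the partial states and finish."""
    merged = defaultdict(lambda: [0.0, 0])
    for partial in partials:
        for key, (s, c) in partial.items():
            merged[key][0] += s
            merged[key][1] += c
    return {key: s / c for key, (s, c) in merged.items()}

# Hypothetical rows held by three leaves.
leaf_rows = [
    [("Taqueria", 4), ("Taqueria", 5)],
    [("Taqueria", 3), ("Noodle Bar", 3)],
    [("Noodle Bar", 5)],
]
partials = [partial_aggregate(rows) for rows in leaf_rows]
averages = final_aggregate(partials)
```

The leaves send (sum, count) pairs rather than local averages, because averages of different-sized groups cannot be merged correctly without their counts.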

Figure: partial aggregations

A crude analysis tells us that for sufficiently large datasets, this will usually decrease the computation time, but it’s easy to see that partial aggregations improve some queries more than others. The performance of the query select count(*) from ratings will drastically improve, since instead of sending all the rows to the aggregator and counting them there, each leaf will count the number of rows it has and the aggregator will only need to sum them up. The crux of the query is run in parallel and the serialization load is drastically decreased. In contrast, the performance of the query select user, avg(rating) from ratings group by user won’t improve at all (it will actually get worse due to overhead): every user in our example appears in only one row, so no leaf has two rows with the same hash key and the partial aggregation stages won’t actually accomplish anything.
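
To make the contrast concrete, the sketch below counts how many partial states the leaves would send for each kind of query, using made-up data in which every user appears exactly once. The helper name and the data layout are assumptions for illustration only.

```python
def states_sent(leaf_rows, key_of):
    """Total partial states sent: one per distinct key on each leaf."""
    return sum(len({key_of(row) for row in rows}) for rows in leaf_rows)

# Hypothetical (user, rating) rows spread over three leaves.
leaves = [
    [(1, 4), (2, 5)],
    [(3, 3), (4, 3)],
    [(5, 5), (6, 2)],
]
total_rows = sum(len(rows) for rows in leaves)

# count(*): every row falls into the single global group ().
count_star = states_sent(leaves, key_of=lambda row: ())

# group by user: each row has its own key, so nothing collapses.
per_user = states_sent(leaves, key_of=lambda row: row[0])
```

Here count(*) sends one state per leaf (3 in total), while the per-user query still sends all 6 rows' worth of state and pays for the extra stage on top.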

Reducing Memory Requirements Using Distributed Aggregations in Rockset

We’ve talked about reducing the execution time, but what about the memory usage? Aggregation queries are especially space-intensive, because the aggregation stage cannot run in a streaming fashion. It must see all the input data before being able to finalize any output row, and therefore must store the entire hash map (which takes as much space as the whole output) until the end. If the output is too large to be stored on a single machine, the machine will run out of memory and crash. Partial aggregations don’t help with this problem, but running the aggregation stage in a distributed fashion does. Namely, we can run the aggregation stage on multiple aggregators concurrently and distribute the data among them in a consistent manner, so that all rows with the same hash key reach the same aggregator.

Figure: distributed aggregation

To decide which aggregator to send a row of data to, the leaves could simply take the hash key modulo the number of available aggregators. Because every leaf uses the same rule, all rows with a given hash key end up on the same aggregator. Each aggregator then executes the aggregation stage on the data it receives, and we combine the results from all aggregators to get the final result. This way, the hash map is distributed over all three aggregators, so we can compute aggregations that are roughly three times as large (assuming the hash keys are spread evenly). The more machines we have, the larger the aggregation we can compute.
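
The routing rule can be sketched in Python as follows. This is a simplified, single-process illustration: the choice of zlib.crc32 as the hash, the in-memory "inboxes", and the sample data are all assumptions of this sketch rather than details of Rockset's system.

```python
import zlib
from collections import defaultdict

NUM_AGGREGATORS = 3

def aggregator_for(key):
    # Use a stable hash so that every leaf routes a given key the same way.
    # (Python's built-in hash() of a str can differ between processes.)
    return zlib.crc32(repr(key).encode("utf-8")) % NUM_AGGREGATORS

def route(leaf_rows):
    """Each leaf sends every (key, rating) row to the aggregator for its key."""
    inboxes = [[] for _ in range(NUM_AGGREGATORS)]
    for rows in leaf_rows:
        for key, rating in rows:
            inboxes[aggregator_for(key)].append((key, rating))
    return inboxes

def aggregate(inbox):
    """Runs on one aggregator: it only stores the keys routed to it."""
    state = defaultdict(lambda: [0.0, 0])
    for key, rating in inbox:
        state[key][0] += rating
        state[key][1] += 1
    return {key: s / c for key, (s, c) in state.items()}

# Hypothetical rows held by three leaves.
leaf_rows = [
    [("Taqueria", 4), ("Taqueria", 5)],
    [("Taqueria", 3), ("Noodle Bar", 3)],
    [("Noodle Bar", 5)],
]
per_aggregator = [aggregate(inbox) for inbox in route(leaf_rows)]

# Keys never overlap between aggregators, so merging is a plain union.
final = {}
for result in per_aggregator:
    final.update(result)
```

Because each key lives on exactly one aggregator, the final merge does no further arithmetic; it only concatenates the per-aggregator results.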

My Rockset Internship - A Great Opportunity to Experience Startup Life

Interning at Rockset gave me the opportunity to design and implement a lot of the features we’ve talked about, and to learn (at a high level) how a SQL compiler and execution system are designed. With the mentorship of the Rockset team, I was able to push these features into production within a week of implementing them, and see how quickly and effectively aggregation queries ran.

Beyond the technical aspects, it was very interesting to see how an agile, early-stage startup like Rockset functions on a day-to-day and month-to-month basis. For someone like me who’d never been at such a small startup before, the experience taught me a lot of intangible skills that I’m sure will be incredibly useful wherever I end up. The size of the startup made for an open and collegial atmosphere, which allowed me to gain experiences beyond a traditional software engineering role. For instance, since the engineers at Rockset are also the ones in charge of customer service, I could listen in on any of those conversations and be included in discussions about how to more effectively serve customers. I was also exposed to a lot of the broader company strategy, so I could learn about how startups like Rockset plan and execute longer-term growth goals.

For someone who loves food like I do, there’s no shortage of options in San Mateo. Rockset caters lunch from a different local restaurant each day, and once a week the whole team goes out for lunch together. The office is just a ten-minute walk from the Caltrain station, which makes commuting to the office much easier. Besides having a bunch of fun people to work with, we had off-sites every month while I was at Rockset (my favorite was archery).

If you’re interested in challenges similar to the ones discussed in this blog post, I hope you’ll consider applying to join the team at Rockset!
