How We Use Rockset's Real-Time Analytics to Debug Distributed Systems
December 21, 2021
Jonathan Kula was a software engineering intern at Rockset in 2021. He is currently studying computer science and education at Stanford University, with a particular focus on systems engineering.
Rockset takes in, or ingests, many terabytes of data a day on average. To process this volume of data, we at Rockset distribute our ingest framework across many different units of computation, some to coordinate (coordinators) and some to actually download and ready your data for indexing in Rockset (workers).
Running a distributed system like this, of course, comes with its fair share of challenges. One such challenge is backtracing when something goes wrong. We have a pipeline that moves data forward from your sources to your collections in Rockset, but if something breaks within this pipeline, we need to make sure that we know where and how it broke.
The process of debugging such an issue used to be slow and painful, involving searching through the logs of each individual worker process. When we found a stack trace, we needed to ensure it belonged to the task we were interested in, and we didn’t have a natural way to sort through and filter by account, collection and other features of the task. From there, we would have to conduct additional searching to find which coordinator handed out the task, and so on.
This was an area we needed to improve on. We needed to be able to quickly filter and discover which worker process was working on which tasks, both currently and historically, so that we could debug and resolve ingest issues quickly and efficiently.
We needed to answer two questions: one, how do we get live information from our highly distributed system, and two, how do we get historical information about what has occurred within our system in the past, even once our system has finished processing a given task?
Our custom-built ingest coordination system assigns sources, each associated with a collection, to individual coordinators. These coordinators keep two things in memory: how much of a source has been ingested, and the current status of each task. For example, if your data is hosted in S3, the coordinator would keep track of information like which keys have been fully ingested into Rockset, which are in process and which keys we still need to ingest. This data is used to create small tasks that our army of worker processes can take on.
To ensure that we don’t lose our place if our coordinators crash or die, we frequently write checkpoint data to S3 that coordinators can pick up and re-use when they restart. However, this checkpoint data doesn’t give information about currently running tasks; rather, it just gives a new coordinator a starting point when it comes back online.
We needed to expose the in-memory data structures somehow, and how better than through good ol’ HTTP? We already expose an HTTP health endpoint on all our coordinators so we can quickly know if they die and can confirm that new coordinators have spun up. We reused this existing framework to serve requests to our coordinators on their own private network. These requests expose currently running ingest tasks and allow our engineers to filter by account, collection and source.
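To make the idea concrete, here is a minimal sketch of what such an endpoint could look like, using only Python's standard library. It is not Rockset's actual implementation: the `RunningTask` fields, the `/tasks` path, the query parameter names and the sample data are all assumptions made for illustration.

```python
import json
from dataclasses import dataclass, asdict
from http.server import BaseHTTPRequestHandler
from urllib.parse import urlparse, parse_qs


@dataclass
class RunningTask:
    # Hypothetical fields; the real in-memory structures are not described.
    task_id: str
    account: str
    collection: str
    source: str
    worker: str


# Stand-in for a coordinator's in-memory view of running tasks (made-up data).
RUNNING_TASKS = [
    RunningTask("t-1", "acme", "orders", "s3", "worker-7"),
    RunningTask("t-2", "acme", "clicks", "kafka", "worker-3"),
    RunningTask("t-3", "globex", "orders", "s3", "worker-7"),
]

FILTERABLE = ("account", "collection", "source")


def filter_tasks(tasks, **criteria):
    """Return the tasks that match every supplied criterion."""
    wanted = {k: v for k, v in criteria.items() if k in FILTERABLE and v is not None}
    return [t for t in tasks if all(getattr(t, k) == v for k, v in wanted.items())]


class TaskHandler(BaseHTTPRequestHandler):
    """Serves GET /tasks?account=...&collection=...&source=... as JSON."""

    def do_GET(self):
        url = urlparse(self.path)
        if url.path != "/tasks":
            self.send_error(404)
            return
        query = parse_qs(url.query)
        criteria = {k: query[k][0] for k in FILTERABLE if k in query}
        matches = filter_tasks(RUNNING_TASKS, **criteria)
        body = json.dumps([asdict(t) for t in matches]).encode("utf-8")
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)
```

Binding the handler with `http.server.HTTPServer(("127.0.0.1", 8080), TaskHandler).serve_forever()` would serve it locally; keeping the filtering in a plain function means it can be exercised without starting a server.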
However, we don’t keep track of tasks forever; once they complete, we note the work that task accomplished and record that into our checkpoint data, and then discard all the details we no longer need. These are details that, however unnecessary to our normal operation, would be invaluable when debugging ingest problems we notice later. We need a way to retain these details that doesn’t rely on keeping them in memory (as we don’t want to run out of memory), keeps costs low, and offers an easy way to query and filter the data (even with the enormous number of tasks we create). S3 is a natural choice for storing this information durably and cheaply, but it doesn’t offer an easy way to query or filter that data, and doing so manually is slow. Now, if only there were a product that could take in new data from S3 in real time, and make it instantly available and queryable. Hmmm.
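As a rough sketch of the "write the details to S3 before discarding them" step, the snippet below turns a finished task into a JSON document and hands it to an uploader. The record fields, the `ingest-task-logs/` key layout and the `put` callable are all hypothetical; in practice `put` might wrap an S3 client's upload call, but here it is left abstract so the example runs without any cloud dependency.

```python
import json
from datetime import datetime, timezone


def completed_task_record(task_id, account, collection, source, worker,
                          status, finished_at, error=None):
    """Build a JSON-serializable record of a finished task (hypothetical fields)."""
    return {
        "task_id": task_id,
        "account": account,
        "collection": collection,
        "source": source,
        "worker": worker,
        "status": status,
        "finished_at": finished_at.isoformat(),
        "error": error,
    }


def object_key(record):
    """Group records by completion date so a day's logs share a prefix."""
    day = record["finished_at"][:10]  # the YYYY-MM-DD part of the ISO timestamp
    return f"ingest-task-logs/{day}/{record['task_id']}.json"


def archive(record, put):
    """Serialize the record and pass it to `put(key, body_bytes)`.

    `put` is an assumed callable standing in for an object-store upload.
    """
    key = object_key(record)
    put(key, json.dumps(record, sort_keys=True).encode("utf-8"))
    return key


# Usage with a dict standing in for the bucket:
bucket = {}
record = completed_task_record(
    "t-1", "acme", "orders", "s3", "worker-7", "failed",
    datetime(2021, 12, 21, 8, 30, tzinfo=timezone.utc), error="timeout",
)
archive(record, bucket.__setitem__)
```

One JSON document per task keeps each write independent, so the coordinator can drop the task from memory as soon as the upload succeeds.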
Ah ha! Rockset!
We ingest our own logs back into Rockset, which turns them into queryable objects using Smart Schema. We use this to find, in real time, logs and details we would otherwise discard. In fact, Rockset’s ingest times for our own logs are fast enough that we often search through Rockset to find these events rather than spend time querying the aforementioned HTTP endpoints on our coordinators.
Of course, this requires that ingest be working correctly, which is perhaps a problem if we’re debugging ingest problems. So, in addition to this, we built a tool that can pull the logs directly from S3 as a fallback when we need it.
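The core of such a fallback tool can be quite small. The sketch below is an illustration rather than the actual tool: it assumes each archived object is a single JSON document, that the caller supplies the objects as `(key, raw_bytes)` pairs (for example from an S3 listing), and that field names like `account` and `collection` exist in the records. All of those are assumptions.

```python
import json


def search_archived_logs(objects, **criteria):
    """Yield (key, record) for records whose fields equal every criterion.

    `objects` is any iterable of (key, raw_bytes) pairs; how they are
    fetched from storage is deliberately left out of this sketch.
    """
    for key, raw in objects:
        try:
            record = json.loads(raw)
        except ValueError:
            continue  # skip unreadable objects rather than abort the search
        if not isinstance(record, dict):
            continue  # only JSON objects can be matched on fields
        if all(record.get(field) == value for field, value in criteria.items()):
            yield key, record


# Usage with made-up objects standing in for downloaded logs:
objects = [
    ("logs/t-1.json", b'{"task_id": "t-1", "account": "acme", "collection": "orders"}'),
    ("logs/t-2.json", b'{"task_id": "t-2", "account": "globex", "collection": "orders"}'),
    ("logs/bad.json", b"not json"),
]
for key, record in search_archived_logs(objects, account="acme"):
    print(key, record["task_id"])
```

This is a linear scan, so it is far slower than querying an index, which is exactly why it serves as a fallback and not the primary path.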
This problem was only solvable, and solvable elegantly, because Rockset already handles so many of the hard problems we would otherwise have run into. To reiterate in simple terms, all we had to do was push some key data to S3 to be able to powerfully and quickly query information about our entire, hugely distributed ingest system: hundreds of thousands of records, queryable in a matter of milliseconds. No need to bother with database schemas or connection limits, transactions or failed inserts, additional recording endpoints or slow databases, race conditions or version mismatching. Something as simple as pushing data into S3 and setting up a collection in Rockset has unlocked for our engineering team the power to debug an entire distributed system with data going as far back as they would find useful.
This power isn’t something we keep for just our own engineering team. It can be yours too!
“Something is elegant if it is two things at once: unusually simple and surprisingly powerful.” — Matthew E. May, business author, interviewed by blogger and VC Guy Kawasaki