Rockset real-time analytics software architecture with co-founder and CTO Dhruba Borthakur

Hear Rockset's CTO and co-founder, Dhruba Borthakur, talk through Rockset's architecture. In this video, Dhruba reviews how data is ingested, stored, and queried in Rockset, and why it is designed to be simple to use, fast, and scalable.

Show Notes

Dhruba Borthakur:

Hi, my name is Dhruba. I'm the co-founder and CTO at Rockset. Today, I'm planning to do a whiteboarding session to describe the internal architecture of Rockset. I'm going to give an overview of the architecture, and I'm going to talk a lot about how data is indexed, how data is stored, and how data is served in queries. I'll also do some follow-up whiteboarding sessions after this, which will cover specifics about the query engine, fault tolerance, how data connectors scale up and scale down, and other things. But today I'm going to talk mostly about the overall architecture of our system. So what is Rockset? Rockset is a real-time analytics indexing database that lets you run analytical queries that are fast and low latency. You can also scale it up and scale it down at your convenience. In today's world, data mostly comes from many different sources.

We all know that data might be coming in from an event stream, or you might have already accumulated a lot of data in your databases, like transactional databases, which you bring in via CDC or some other format. Data could also be coming in from bulk storage, like S3 or Google Cloud Storage, and other historical data stores. And all this data usually comes in some kind of semi-structured format. The goal of Rockset is: how can you take this data and create a system that you can easily query to find insights or intelligence in it? And how can you take action on this data? On the right side of the platform is essentially a SQL-based API that you can use to query this data.

So data comes in from these sources, semi-structured, and you can use SQL to query it. The goals of this architecture are, first, that it has to be fast. The queries have to be fast, and it should give you the best price-performance for that speed. Second, it has to scale to large sizes of data, hundreds of terabytes or more. And third, it has to be simple to use. So as we walk through this architecture, you can evaluate how it does on these three things, and I'll also explain, from a very broad perspective, how it addresses each of them. So, let me draw this picture. The part that I'm going to fill in today is essentially this part, from here to there. This is what Rockset is made of.

So, what happens when data comes in from these data sources? All this data goes into a piece of code called the tailers. The tailers are essentially tailing data. You can also write data using the write API: just like a database has a write call or an update or insert call, you can write data to a tailer from your application. But if you already have data in these streams, you can set up the tailers to ingest all that data. The tailers read this data, and then they write it into a distributed log. So why is the distributed log important here? It's important because it makes your data durable.

It can tolerate lots of heavy writes. So you can write, let's say, 100 MB a second or 500 MB a second of incoming data and persist it in the distributed log. As soon as it is persisted, you return success to the application, because now you know that you'll never lose this data. It is stored durably in the distributed log.
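The write path just described, where an append is acknowledged only after it is persisted, can be sketched in a few lines. This is a minimal single-node stand-in, not Rockset's distributed log: the class name DurableLog, the JSON-lines file format, and treating fsync as the durability point are all assumptions for illustration. A leaf would consume the log from some offset, which read_from imitates.

```python
import json
import os
import tempfile


class DurableLog:
    """Hypothetical single-node stand-in for the distributed log.

    append() returns only after the record has been flushed and fsync'ed,
    mirroring "acknowledge the write once it is persisted".
    """

    def __init__(self, path: str) -> None:
        self.path = path
        self.next_offset = 0

    def append(self, record: dict) -> int:
        entry = json.dumps({"offset": self.next_offset, "record": record})
        with open(self.path, "a", encoding="utf-8") as f:
            f.write(entry + "\n")
            f.flush()
            os.fsync(f.fileno())  # persisted before we acknowledge
        offset = self.next_offset
        self.next_offset += 1
        return offset  # returning the offset plays the role of the ack

    def read_from(self, offset: int) -> list:
        """What a leaf might do: tail the log starting at a given offset."""
        if not os.path.exists(self.path):
            return []
        with open(self.path, encoding="utf-8") as f:
            entries = [json.loads(line) for line in f]
        return [e for e in entries if e["offset"] >= offset]


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as d:
        log = DurableLog(os.path.join(d, "log.jsonl"))
        log.append({"user": "a", "event": "click"})
        log.append({"user": "b", "event": "view"})
        print(log.read_from(1))
```

A real distributed log replicates each append across machines before acknowledging; the single fsync here only marks where that acknowledgment boundary sits.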

And then the second part of this architecture is: how do you make this data queryable? I'm going to draw this in red. There are these leaf processes, called the leaves. The leaf processes take data from the distributed log and index it, so this is the tier used for indexing. They index the data and put it in a format that's easy to query. The data in the distributed log is stored in a way that's easy to persist quickly and stream, but the data in the leaves is indexed so that it is optimized for low-latency queries. So this is where the data is stored and indexed. And then there's another set of processes, called the aggregators. Queries come in from the application, and the aggregators get data from the leaves and return the results of the query.

So you can see that there are tailers, there are leaves, and there are aggregators. This is why it's popularly called the ALT architecture: Aggregator, Leaf, Tailer. This is an architecture that we developed when we were building a lot of large-scale data applications at Facebook. What's unique about this architecture is that it's designed for real time, and I'm going to explain how it differs from previous generations, like the Lambda architectures or streaming architectures that you might be familiar with. There are three different tiers. The tailers are used for ingesting data. The leaves store this data so that it can be queried fast, so that's the storage tier. And then there's the aggregator tier, which is used for serving queries. In real-time systems, there are usually a lot of writes coming in, or bursty traffic. To handle bursty traffic, the tailers have to scale up and down.

Nothing needs to change on the leaves and aggregators. So this is a great way to spin up more compute to handle bursty traffic. If your total data size increases, you just grow the number of leaves; you don't have to grow the tailer or aggregator tiers. And if your query volume or your query complexity increases, you spin up more aggregators, and you don't have to touch the other two tiers. So this is a completely disaggregated architecture: bursty writes are handled here and don't impact your queries, and bursty or spiking queries don't impact the speed at which data is coming in. What we do is separate the storage from the query compute and from the ingest compute. This is important for real-time systems, because you need disaggregation between ingest compute and query compute as well.
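One way to see the disaggregation is that each tier is sized from its own load signal. The sketch below is purely illustrative: the per-node capacities, the TierMetrics fields, and the plan function are hypothetical and do not describe how Rockset actually autoscales. It only shows that a write burst changes the tailer count while the leaf and aggregator counts stay put.

```python
import math
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class TierMetrics:
    ingest_mb_per_s: float  # write throughput arriving at the tailers
    data_size_gb: float     # total indexed data held by the leaves
    queries_per_s: float    # query load arriving at the aggregators


@dataclass
class Capacity:
    # Hypothetical per-node capacities, chosen only for illustration.
    mb_per_s_per_tailer: float = 50.0
    gb_per_leaf: float = 500.0
    qps_per_aggregator: float = 200.0


def plan(metrics: TierMetrics, cap: Optional[Capacity] = None) -> Dict[str, int]:
    """Size each tier from its own load signal only (at least one node each)."""
    cap = cap or Capacity()
    return {
        "tailers": max(1, math.ceil(metrics.ingest_mb_per_s / cap.mb_per_s_per_tailer)),
        "leaves": max(1, math.ceil(metrics.data_size_gb / cap.gb_per_leaf)),
        "aggregators": max(1, math.ceil(metrics.queries_per_s / cap.qps_per_aggregator)),
    }


if __name__ == "__main__":
    steady = TierMetrics(ingest_mb_per_s=100, data_size_gb=2000, queries_per_s=400)
    burst = TierMetrics(ingest_mb_per_s=500, data_size_gb=2000, queries_per_s=400)
    print(plan(steady))  # {'tailers': 2, 'leaves': 4, 'aggregators': 2}
    print(plan(burst))   # {'tailers': 10, 'leaves': 4, 'aggregators': 2}
```

In a coupled design, the same burst would force you to add nodes that also hold data and serve queries; here only the ingest tier grows.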

So if you're currently using a data system other than Rockset, you can ask it this question: if I want to do real-time analytics, does my data system separate ingest compute from query compute? That's the best way to make sure that queries don't affect ingestion and ingestion doesn't affect queries. That's the very high-level view. Now I'm going to dive a little deeper into the leaf, which is where data is stored. The storage engine that we use for this data is RocksDB. So what is RocksDB? RocksDB is an open source storage engine. It was built way back in 2013, I think, when we were part of the team at Facebook. So it has been around for almost nine or 10 years.

In the very early days, it was used to power a lot of real-time data applications at Facebook, including spam detection and real-time ad placement, those kinds of use cases. Over time, RocksDB has become very popular among many other systems, like Flink and other data systems. We use it to index the data that is coming in from our data sources. So how does the index work, and why is it different? We use RocksDB to build something called a Converged Index. So what is a Converged Index? A Converged Index is a way to take semi-structured JSON and build multiple indexes on this data. The main indexes that we build are a column index, an inverted index, and a row index, and I'll explain why we need each of them.

We also build some secondary indexes, like a type index and a range index. So why do we need to build all these indexes? Because we want to make this system simple for you to use, which is why we build all of them by default on the entire data set. This comes back to simplicity. It's not just about speed and scale; simplicity is a big part of what we have built into Rockset. You don't need to create indexes by issuing a command; everything is indexed. Data is column indexed, just like in a warehouse such as Redshift or Snowflake, so queries that run fast on those systems also run equally fast on Rockset when they use the column index.

There's also an inverted index, like in Elasticsearch or Lucene. For highly selective queries, where the result is a small subset of the data, the system can use the inverted index. The row index is essentially like MongoDB, Postgres, or DocumentDB: given a primary key, you can find all the fields of the record. With the type index, the type of every field in the incoming data set is also indexed, so you can make queries like: find me all records where the field called age is a string instead of an integer. The system remembers this. And then there's a range index for numeric types, so you can do fast range queries on the data. This is what I mean by a Converged Index. The reason we build a Converged Index is that it's one of the important pieces for making real-time analytics happen.
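To make the five index types concrete, here is a toy, in-memory version built from a few JSON-like documents. It is a sketch under simplifying assumptions (top-level fields only, plain Python dicts instead of RocksDB key-value pairs, hypothetical function names), not Rockset's on-disk layout.

```python
import bisect
from collections import defaultdict
from typing import Any, Dict, List, Set

SCALARS = (str, int, float, bool, type(None))


def build_converged_index(docs: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Build row, column, inverted, type, and range indexes over top-level fields."""
    row = {}                                          # _id -> whole document
    column = defaultdict(dict)                        # field -> {_id: value}
    inverted = defaultdict(lambda: defaultdict(set))  # field -> value -> {_id}
    types = defaultdict(lambda: defaultdict(set))     # field -> type name -> {_id}
    ranges = defaultdict(list)                        # field -> [(number, _id)]
    for doc in docs:
        doc_id = doc["_id"]
        row[doc_id] = doc
        for field, value in doc.items():
            if field == "_id":
                continue
            column[field][doc_id] = value
            types[field][type(value).__name__].add(doc_id)
            if isinstance(value, SCALARS):
                inverted[field][value].add(doc_id)
            if isinstance(value, (int, float)) and not isinstance(value, bool):
                ranges[field].append((value, doc_id))
    for entries in ranges.values():
        entries.sort(key=lambda e: e[0])  # sorted by value for binary search
    return {"row": row, "column": column, "inverted": inverted,
            "type": types, "range": ranges}


def range_query(index: Dict[str, Any], field: str, lo: float, hi: float) -> Set[str]:
    """Return the _ids whose numeric `field` lies in [lo, hi]."""
    entries = index["range"].get(field, [])
    values = [v for v, _ in entries]
    start = bisect.bisect_left(values, lo)
    end = bisect.bisect_right(values, hi)
    return {doc_id for _, doc_id in entries[start:end]}


if __name__ == "__main__":
    docs = [
        {"_id": "1", "name": "Ann", "age": 34},
        {"_id": "2", "name": "Bob", "age": "41"},  # age arrived as a string
        {"_id": "3", "name": "Cy", "age": 27},
    ]
    idx = build_converged_index(docs)
    print(idx["type"]["age"]["str"])        # {'2'}: records where age is a string
    print(range_query(idx, "age", 30, 40))  # {'1'}
```

Every field lands in every applicable index at ingest time, which is the trade the talk describes: more work when writing so that no index has to be declared before querying.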

What happens is that the application making SQL queries doesn't have to tell us which index to use. The system automatically determines which index to use. If it's a large aggregation, it will use a column scan. But if it's a highly selective query, it will automatically decide to use the inverted index. Again, I'm explaining this not because you have to understand it to write your queries, but because this is what the system does automatically under the covers.
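As a rough illustration of that decision, the hypothetical planner below picks between an inverted-index lookup and a column scan based on estimated selectivity. It is not Rockset's optimizer: the 1% threshold, the Query shape, and estimating matches from posting-list sizes are all assumptions made for the sketch.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional, Tuple


@dataclass
class Query:
    filter_field: Optional[str] = None  # e.g. user_id in WHERE user_id = 42
    filter_value: Any = None


def choose_access_path(
    query: Query,
    posting_list_sizes: Dict[Tuple[str, Any], int],
    total_docs: int,
    threshold: float = 0.01,  # hypothetical cut-off: at most 1% of documents match
) -> str:
    """Pick an access path without any index hint from the caller."""
    if query.filter_field is None:
        return "column_scan"  # no filter: e.g. an aggregation over every row
    matches = posting_list_sizes.get((query.filter_field, query.filter_value), 0)
    selectivity = matches / total_docs if total_docs else 0.0
    return "inverted_index" if selectivity <= threshold else "column_scan"


if __name__ == "__main__":
    sizes = {("user_id", 42): 3, ("country", "US"): 600_000}
    print(choose_access_path(Query("user_id", 42), sizes, 1_000_000))    # inverted_index
    print(choose_access_path(Query("country", "US"), sizes, 1_000_000))  # column_scan
```

The point is only that the choice is made from statistics the system already has, so the SQL the application sends stays the same either way.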

Now, how do you make this data durable? And how do you make it elastic, since it's a cloud-native system? How does this architecture help with elasticity and with the scale of data that you have? Here, we actually don't use plain RocksDB; we use something called RocksDB Cloud, which is an open source project as well. It has the same API as RocksDB and the same performance guarantees as RocksDB. But it also persists your data to cloud storage. Let me explain how. We are talking about the leaf here. Let's say this is the RocksDB instance where the leaf is storing this data. When it stores data in the RocksDB instance, it also uploads the SST files to S3.

RocksDB keeps the data in a set of SST files, and RocksDB Cloud also persists those SST files to cloud storage; on AWS, that's S3. So let's assume that this machine dies. We don't lose this data, because it is persisted in S3. This is another beauty of running a cloud-native solution: you don't need to replicate the same data for it to be durable. If you're using, let's say, Apache Druid, or Hadoop or Hive or some other system that you have installed yourself, or even Elasticsearch, you'd have to replicate your data multiple times for durability. You make three copies, and the reason you make three copies is so that if one machine dies, you don't lose your data.
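The durability idea in the last two paragraphs (immutable files written locally, also uploaded to object storage, and a replacement machine rebuilding from there) can be sketched as follows. This is a hypothetical illustration, not the RocksDB Cloud API: a local directory stands in for the S3 bucket, and the tab-separated "SST-like" files are a simplification of real SST files.

```python
import shutil
import tempfile
from pathlib import Path
from typing import Dict, Optional


class CloudBackedStore:
    """Hypothetical sketch of the RocksDB Cloud idea, not its real API.

    Every flush writes an immutable, sorted "SST-like" file locally and then
    copies it to an object-store stand-in (a directory here; S3 in the talk).
    Keys and values must not contain tabs or newlines in this toy format.
    """

    def __init__(self, local_dir: Path, bucket_dir: Path) -> None:
        self.local_dir = local_dir
        self.bucket_dir = bucket_dir
        local_dir.mkdir(parents=True, exist_ok=True)
        bucket_dir.mkdir(parents=True, exist_ok=True)
        self._seq = 0

    def flush(self, kv: Dict[str, str]) -> str:
        name = f"{self._seq:06d}.sst"
        self._seq += 1
        path = self.local_dir / name
        path.write_text("\n".join(f"{k}\t{v}" for k, v in sorted(kv.items())))
        shutil.copy2(path, self.bucket_dir / name)  # the "upload" step
        return name

    @classmethod
    def recover(cls, new_local_dir: Path, bucket_dir: Path) -> "CloudBackedStore":
        """A replacement leaf rebuilds its local files from object storage."""
        store = cls(new_local_dir, bucket_dir)
        files = sorted(bucket_dir.glob("*.sst"))
        for f in files:
            shutil.copy2(f, new_local_dir / f.name)
        store._seq = len(files)
        return store

    def get(self, key: str) -> Optional[str]:
        # Newer files shadow older ones, so search from newest to oldest.
        for path in sorted(self.local_dir.glob("*.sst"), reverse=True):
            for line in path.read_text().splitlines():
                k, v = line.split("\t", 1)
                if k == key:
                    return v
        return None


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as d:
        root = Path(d)
        leaf = CloudBackedStore(root / "leaf-1", root / "bucket")
        leaf.flush({"age": "34", "zip": "94107"})
        leaf.flush({"age": "35"})
        shutil.rmtree(root / "leaf-1")  # the machine dies; its local disk is gone
        replacement = CloudBackedStore.recover(root / "leaf-2", root / "bucket")
        print(replacement.get("age"), replacement.get("zip"))  # 35 94107
```

Only one copy lives on compute; durability comes from the object store, so extra local replicas become an optional performance choice.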

Whereas in Rockset, you don't need to make multiple copies for durability. Durability is handled by S3. You can keep one copy of the data, stored in S3, which makes price-performance far better. You can still have replicas if you want, but those are for performance reasons, not for durability reasons. So this is one unique part of the architecture. It's a very cloud-native architecture. So how do you scale this up? The data here is organized in the form of micro shards. There are no big shards; instead, there are thousands of micro shards in every table, or collection. Let's say there's a collection with a thousand micro shards, and there are four leaves serving this collection.

Each leaf might get 250 micro shards to serve. When a query comes in, the aggregator contacts these leaves, they do their part of the processing, and if there are more aggregations to be done, the aggregators do that aggregation and return the results to the application. Now let's say you need to add more CPU to the system, because your query volume is increasing or you need to scale up your system further. You don't need to do anything other than say: instead of four leaves, I want to use eight leaves. Rockset has a concept called virtual instances. That's the product concept, but internally, we talk about leaves. So you can add four more leaves to the system, and you can see how this will work. I'm going to remove this for a second.

What happens is that, let's say you added four more leaf nodes here. The 250 micro shards on each of the original four leaves get redistributed among these four newly provisioned machines. By the way, everything is provisioned on a Kubernetes cluster, so I'm talking about Kubernetes pods. So that's how we scale up. Of course, this is a stateful system, so there are a lot of hidden or custom things that we do. We have custom metrics controllers and other ways of scaling Kubernetes pods, because this is a stateful service, but I'm handwaving over that for this discussion. So the 250 micro shards on each original leaf get split, and with eight leaves, each one ends up serving 125. Queries automatically know that instead of talking to only those four leaf nodes, they now need to talk to the additional ones as well. So you can see how simple it is for a user to scale up or scale down based on their resource needs.
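The 1,000-shard, four-to-eight-leaf example can be worked through directly. Below is a minimal sketch, assuming round-robin initial placement and a "move only the surplus" rebalancing rule; both are illustrative choices, not a description of Rockset's actual shard placement or its Kubernetes controllers.

```python
from typing import Dict, List

Placement = Dict[int, List[int]]  # leaf id -> micro shard ids


def assign(num_shards: int, num_leaves: int) -> Placement:
    """Initial round-robin placement of micro shards onto leaves."""
    placement: Placement = {leaf: [] for leaf in range(num_leaves)}
    for shard in range(num_shards):
        placement[shard % num_leaves].append(shard)
    return placement


def rebalance(placement: Placement, new_num_leaves: int) -> Placement:
    """Resize the leaf set to leaves 0..new_num_leaves-1, moving few shards.

    Surviving leaves keep as many of their shards as their new target allows;
    only the surplus (plus shards of removed leaves) moves.
    """
    num_shards = sum(len(s) for s in placement.values())
    base, extra = divmod(num_shards, new_num_leaves)
    target = {leaf: base + (1 if leaf < extra else 0) for leaf in range(new_num_leaves)}
    new: Placement = {leaf: list(placement.get(leaf, [])) for leaf in target}
    pool: List[int] = []
    for leaf, shards in placement.items():
        if leaf not in target:  # leaf removed on scale-down
            pool.extend(shards)
    for leaf, want in target.items():
        surplus = len(new[leaf]) - want
        if surplus > 0:
            pool.extend(new[leaf][-surplus:])
            del new[leaf][-surplus:]
    for leaf, want in target.items():
        need = want - len(new[leaf])  # never negative after surplus removal
        new[leaf].extend(pool[:need])
        del pool[:need]
    return new


if __name__ == "__main__":
    four = assign(1000, 4)      # 250 micro shards per leaf
    eight = rebalance(four, 8)  # 125 per leaf; 500 shards move in total
    print({leaf: len(s) for leaf, s in eight.items()})
```

Because the data is already cut into many small shards, scaling is a matter of reassigning whole shards rather than re-splitting data.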

So again, this is mostly about scaling: how can you scale to large volumes of data? The other thing that's needed for real-time analytics is speed. How can you make queries fast? Your real-time analytical queries might be powering, say, real-time personalization for users who are browsing your website. They're looking at items on your website, and you need to give them personalized recommendations. So this part of Rockset's architecture is optimized for low latency. Let me write it here. And I'll explain why that is. You've already seen that when a query comes in, unlike traditional data systems, which are partitioned, this works very much like a search engine.

Think about Google Search. When you send a query to Google Search, it has to look through maybe a few billion records and give you results within a fraction of a second. How does it do that? It does it because of its search-engine architecture. This part is what is popularly called a scatter-gather query architecture, which means that when a query comes in, the aggregators fan out to the leaf nodes to run parts of the query, because every leaf node holds a portion of the data. So when even a simple query comes in, it has to go to all eight of these leaf nodes, each leaf node does its portion of the query, and the results are returned.

The good part of this, the part that's interesting for speed, is this: let's say each leaf node has eight CPUs, and there are a thousand leaf nodes. You can use thousands of CPUs to serve one single query. This is very important for analytics, because analytical queries sometimes need a lot of compute, and you need to parallelize the compute for a single query. I'm not talking about two different queries together using all the CPUs in your cluster. So if you're using another existing analytical system that's not Rockset, you can ask that system: how much compute can a single query use? I mean a single query, not separate queries coming in parallel. Here, a single query can leverage all the CPUs on the leaf nodes.
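A scatter-gather average can be sketched with a thread pool standing in for the fan-out from an aggregator to its leaves. This is a hypothetical illustration, not Rockset's query engine: each "leaf" is just a list of rows, and in CPython the threads won't give CPU parallelism for pure-Python work, so only the fan-out and merge structure carries over. Note that leaves return partial sums and counts, because averaging per-leaf averages would be wrong when leaves hold different numbers of rows.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List, Optional, Tuple

Rows = List[Dict[str, Any]]


def leaf_partial_avg(rows: Rows, field: str) -> Tuple[float, int]:
    """Scatter step: each leaf computes a partial (sum, count) over its own data."""
    values = [r[field] for r in rows
              if isinstance(r.get(field), (int, float)) and not isinstance(r.get(field), bool)]
    return float(sum(values)), len(values)


def scatter_gather_avg(leaves: List[Rows], field: str) -> Optional[float]:
    """Gather step: the aggregator fans out to every leaf, then merges partials."""
    with ThreadPoolExecutor(max_workers=max(1, len(leaves))) as pool:
        partials = list(pool.map(lambda rows: leaf_partial_avg(rows, field), leaves))
    total = sum(s for s, _ in partials)
    count = sum(c for _, c in partials)
    return total / count if count else None


if __name__ == "__main__":
    leaves = [
        [{"age": 30}, {"age": 40}],  # leaf 0
        [{"age": 20}],               # leaf 1
        [{"name": "no age"}],        # leaf 2 has no matching values
    ]
    print(scatter_gather_avg(leaves, "age"))  # 30.0
```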

So that could be hundreds or thousands of CPUs serving one query. And if it's a computation-heavy analytical query, because you can parallelize it a hundred or a thousand ways, the total latency is low. So how is data distributed among these leaf nodes? I probably should have explained this before talking about scatter-gather. All the documents that come in have a field called _id. It's kind of the primary key: the unique identifier for the document. The reason there is a primary key for each document is that the system is built for mutability. For real-time analytics, you cannot afford a long pipeline or ETL that transforms data, duplicates it, and then updates it, because those ETL processes add data latency to your data system.

This is why Rockset is built for mutability. Every document in Rockset is mutable and is identified by its _id. If a new write comes in for the same _id, you can overwrite the document or overwrite parts of the document, and that's designed to be efficient; I'll explain why. The _id is hashed to one of these leaf nodes. Let's say a document comes in; based on its _id, it might land on this node. A different document comes in with a different _id, and it might land on this node. So all the data is spread across all the leaf nodes.
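Hash-based placement by _id can be sketched as a two-step route: _id to micro shard, then micro shard to leaf. The two-step split, the SHA-256 hash, and the 1,000-shard count are assumptions for illustration; the talk only says the _id is hashed to a leaf. A stable hash is used because Python's built-in hash() for strings is salted per process.

```python
import hashlib
from collections import Counter
from typing import List

NUM_MICRO_SHARDS = 1000  # hypothetical, matching the example earlier in the talk


def micro_shard_for(doc_id: str, num_shards: int = NUM_MICRO_SHARDS) -> int:
    """Stable hash of _id -> micro shard."""
    digest = hashlib.sha256(doc_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_shards


def leaf_for(doc_id: str, shard_to_leaf: List[int]) -> int:
    """Route a document: _id -> micro shard -> whichever leaf owns that shard."""
    return shard_to_leaf[micro_shard_for(doc_id, len(shard_to_leaf))]


if __name__ == "__main__":
    shard_to_leaf = [shard % 8 for shard in range(NUM_MICRO_SHARDS)]  # 8 leaves
    counts = Counter(leaf_for(f"user-{i}", shard_to_leaf) for i in range(10_000))
    print(sorted(counts.items()))  # roughly 1,250 documents per leaf
```

With this split, adding leaves only changes the shard_to_leaf table; the _id-to-shard mapping, and therefore where an update for a given _id goes, stays fixed.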

This is why writes are very fast: all the leaf nodes participate in handling the incoming data, so you have less chance of hot spots than in traditional databases. And queries can leverage all the CPUs, because a query needs to look at some documents here, some documents there, and so on. So again, this is how the system is optimized for low-latency queries. The focus is: how can you run complex SQL queries in a few milliseconds or a few seconds?

Now let me talk about mutability and the _id for a second. Mutability works this way. Let's say there's a document here with an _id, an age, and a zip code, so I'm just talking about three fields in this document. Inside the leaf, it is stored in the RocksDB database. All these leaves store data in RocksDB, and RocksDB is a key-value store, so the only thing it understands is keys and values. Let's say the _id is stored here, and the age value is stored here. I'm simplifying; I'll explain the Converged Index in a separate whiteboarding session later. And the zip code is stored here. Now when somebody says, I'm sending you the same _id with a different age, only that specific key gets updated in RocksDB; it doesn't need to rewrite the entire document.

You see what I'm saying? This works because we separate out the different fields, and this also applies to nested objects and arrays. Every element of an array is indexed like this, so you can update one specific element of an array without having to rewrite the document. Take, for example, documents that are 5 kilobytes in size, where once in a while you need to update one part of a nested object. Many other systems are going to read the entire document, update a small part of it, and write it back, which leads to high write amplification. In Rockset, only the key you're updating gets updated. So it's very tuned for mutability, and mutability is needed for real-time analytics; otherwise, your data latency goes up. What we measure in Rockset is data latency.
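The per-field storage described above can be sketched by flattening a nested document into one key per scalar, keyed by (_id, path), so that updating one field or one array element is a single put. This is a simplified model under stated assumptions: a Python dict stands in for RocksDB, the key shape is hypothetical, and replacing whole sub-objects or shrinking arrays (which would also need deletes) is not handled.

```python
from typing import Any, Dict, Iterator, Sequence, Tuple, Union

PathPart = Union[str, int]
Key = Tuple[str, Tuple[PathPart, ...]]  # (_id, path to one scalar)


def flatten(doc_id: str, value: Any, path: Tuple[PathPart, ...] = ()) -> Iterator[Tuple[Key, Any]]:
    """Yield one key-value pair per scalar; dict keys and array positions form the path."""
    if isinstance(value, dict):
        for k, v in value.items():
            yield from flatten(doc_id, v, path + (k,))
    elif isinstance(value, list):
        for i, v in enumerate(value):
            yield from flatten(doc_id, v, path + (i,))
    else:
        yield (doc_id, path), value


class FieldStore:
    """A dict standing in for RocksDB; `writes` counts key-value puts."""

    def __init__(self) -> None:
        self.kv: Dict[Key, Any] = {}
        self.writes = 0

    def put_document(self, doc: Dict[str, Any]) -> None:
        body = {k: v for k, v in doc.items() if k != "_id"}
        for key, value in flatten(doc["_id"], body):
            self.kv[key] = value
            self.writes += 1

    def update_field(self, doc_id: str, path: Sequence[PathPart], value: Any) -> None:
        # Partial update of one scalar: a single put, no read-modify-write of the document.
        self.kv[(doc_id, tuple(path))] = value
        self.writes += 1


if __name__ == "__main__":
    store = FieldStore()
    store.put_document({"_id": "u1", "age": 34, "zip": "94107",
                        "tags": ["a", "b"], "address": {"city": "SF"}})
    store.update_field("u1", ["age"], 35)
    print(store.writes)  # 6: five puts for the document, one for the update
```

A whole-document store would instead read, modify, and rewrite every byte of the document on each such update, which is the write amplification the talk refers to.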

Data latency is the time between when data is written and when it is made queryable. Our goal is an average data latency of one to two seconds, which means that as soon as you write data, it'll be queryable within a second or two. We also focus a lot on query latency, which is the time between when you issue a query and when the results are sent back to the application. We focus on both, because for real time, both are important. In a non-real-time system, like a warehouse, you might focus only on query latency, because you want your queries to be fast. But for real time, we focus a lot on data latency as well, so that you need less pipelining and fewer ETLs when you use the system for real-time analytics.

The last thing I want to mention today is something called Rollups. I'll do a deeper dive into the query engine in a later session.

So, what are Rollups, and why are they useful? Sometimes, as a user, you want the flexibility to decide whether it's best to spend compute when you are querying, or when data is coming into the system. Take this record again. Let's say the only query you want to make on these types of records is to compute the average age of the people being recorded here. If the only thing you want is the average age, you don't need to store every individual record in your database. You can store an aggregate in your database, which your queries can just fetch at query time. But how is it implemented in Rockset?

Let me explain a little bit using this average age. Let's say somebody wants to calculate the average of age. The only thing they need to do in Rockset is specify it using a SQL rollup query. All rollups are expressed in the form of SQL. It's like a standing query that runs as data comes in. For the average, the system automatically decides how to take the SQL query and split it into two parts internally. One part is called the ingest query. Again, this is not something you have to do yourself; I'm talking about what happens under the covers. And the other part is the wrap query.

So the system creates an ingest query, which runs when data is being deposited by the tailers. In this case, because it's an average, the two things that need to be stored in the database are a sum and a count. So the Rockset system is intelligent enough to take the incoming data and store the sum and the total count of the objects that are coming in. The wrap query is what runs at query time: it takes the sum, divides it by the count, and returns you the average. So you can see how this gives you flexibility for real-time analytics. The flexibility is that the CPU needed for summing and counting is spent when data is coming in.

And the actual calculation of the average happens when queries come in. So depending on your query volume and the complexity of your queries, you can set up some Rollups, and the remaining part gets done as part of the query. The system also handles data exactly once, because with Rollups, it's super important for us to make sure that each record is processed exactly once. Take, for example, data coming in from an event stream, let's say a Kafka event stream. Every record that comes in carries the Kafka offset where we picked up the data, and that offset is used to uniquely identify the record all the way through to the leaf. So even if the same record comes in twice, which can happen because our system has recovery mechanisms and other things, the leaf knows that it should handle it exactly once and not redo the work for that record.
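The rollup split and the offset-based deduplication described in the last three paragraphs can be put together in one small sketch: an ingest-time half that folds each record into a running sum and count, a query-time half that divides, and a per-partition "last applied offset" that makes redelivered records no-ops. The class and method names are hypothetical, and it assumes offsets arrive in order within a partition; it is not Rockset's rollup implementation.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class AvgAgeRollup:
    """Hypothetical rollup state for something like AVG(age).

    The ingest half keeps only a running sum and count; the query-time half
    divides them. Per-partition Kafka offsets make replays idempotent.
    """

    total: float = 0.0
    count: int = 0
    applied: Dict[int, int] = field(default_factory=dict)  # partition -> last applied offset

    def ingest(self, partition: int, offset: int, record: dict) -> bool:
        """Ingest-time work: fold one record into sum and count, exactly once."""
        if offset <= self.applied.get(partition, -1):
            return False  # a redelivered record, e.g. after recovery; skip it
        self.total += record["age"]
        self.count += 1
        # Assumes in-order offsets per partition; a real system would also
        # persist the aggregate and the offset together, atomically.
        self.applied[partition] = offset
        return True

    def query_avg(self) -> Optional[float]:
        """Query-time work: divide the stored sum by the stored count."""
        return self.total / self.count if self.count else None


if __name__ == "__main__":
    rollup = AvgAgeRollup()
    rollup.ingest(0, 0, {"age": 30})
    rollup.ingest(0, 1, {"age": 40})
    rollup.ingest(0, 1, {"age": 40})  # same offset delivered twice: ignored
    rollup.ingest(1, 0, {"age": 20})
    print(rollup.query_avg())  # 30.0
```

Without the offset check, the duplicate delivery would be counted twice and the average would silently drift, which is why exactly-once handling matters more for rollups than for raw records keyed by _id.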

There are many other things that we can talk about in more detail, especially related to recovery. What happens when the leaves die? How do you recover when the tailers die? We can also deep dive into the data connectors, how we have architected them to scale up and scale down, and how they recover from failures. Those are things that I will cover in future whiteboarding sessions. I hope you liked this session, and I'm sure there will be far more questions in your mind. So to ask me questions, please go to and send in your questions. I'll try to answer many of them personally, if I can. Thank you. Thanks a lot. It's great fun having you here. Bye.
