Handling Out-of-Order Data in Real-Time Analytics Applications
April 15, 2022
This is the second post in a series by Rockset's CTO Dhruba Borthakur on Designing the Next Generation of Data Systems for Real-Time Analytics. We'll be publishing more posts in the series in the near future, so subscribe to our blog so you don't miss them!
Posts published so far in the series:
- Why Mutability Is Essential for Real-Time Data Analytics
- Handling Out-of-Order Data in Real-Time Analytics Applications
- Handling Bursty Traffic in Real-Time Analytics Applications
- SQL and Complex Queries Are Needed for Real-Time Analytics
- Why Real-Time Analytics Requires Both the Flexibility of NoSQL and Strict Schemas of SQL Systems
Companies everywhere have upgraded, or are currently upgrading, to a modern data stack, deploying a cloud native event-streaming platform to capture a variety of real-time data sources.
So why are their analytics still crawling through in batches instead of real time?
It’s probably because their analytics database lacks the features necessary to deliver data-driven decisions accurately in real time. Mutability is the most important capability, but close behind, and intertwined, is the ability to handle out-of-order data.
Out-of-order data consists of time-stamped events that, for a number of reasons, arrive after the initial data stream has been ingested by the receiving database or data warehouse.
In this blog post, I’ll explain why mutability is a must-have for handling out-of-order data, the three reasons why out-of-order data has become such an issue today and how a modern mutable real-time analytics database handles out-of-order events efficiently, accurately and reliably.
The Challenge of Out-of-Order Data
Streaming data has been around since the early 1990s under many names — event streaming, event processing, event stream processing (ESP), etc. Machine sensor readings, stock prices and other time-ordered data are gathered and transmitted to databases or data warehouses, which physically store them in time-series order for fast retrieval or analysis. In other words, events that are close in time are written to adjacent disk clusters or partitions.
Ever since there has been streaming data, there has been out-of-order data. The sensor transmitting the real-time location of a delivery truck could go offline because of a dead battery or the truck traveling out of wireless network range. A web clickstream could be interrupted if the website or event publisher crashes or has internet problems. That clickstream data would need to be re-sent or backfilled, potentially after the ingesting database has already stored it.
Transmitting out-of-order data is not the issue. Most streaming platforms will resend data until they receive an acknowledgment from the receiving database that it has successfully written the data. That is called at-least-once semantics.
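As a rough illustration, at-least-once delivery boils down to a resend loop that retries until the downstream system acknowledges the write. The `send_event` helper below is a hypothetical placeholder, not any particular platform's API.

```python
import time

def deliver_at_least_once(event, send_event, max_backoff_s=30):
    """Resend an event until the downstream database acknowledges it.

    `send_event` is a hypothetical callable that returns True once the
    receiver confirms a durable write. Because a retry can follow a lost
    acknowledgment, the same event may be delivered more than once -- the
    defining trade-off of at-least-once semantics.
    """
    backoff = 1
    while True:
        if send_event(event):          # ack received: the write is durable
            return
        time.sleep(backoff)            # transient failure: back off and retry
        backoff = min(backoff * 2, max_backoff_s)
```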
The issue is how the downstream database stores updates and late-arriving data. Traditional transactional databases, such as Oracle or MySQL, were designed with the assumption that data would need to be continuously updated to maintain accuracy. Consequently, operational databases are almost always fully mutable so that individual records can be easily updated at any time.
Immutability and Updates: Costly and Risky for Data Accuracy
By contrast, most data warehouses, both on-premises and in the cloud, are designed with immutable data in mind, storing data to disk permanently as it arrives. All updates are appended rather than written over existing data records.
This has some benefits. It prevents accidental deletions, for one. For analytics, the key boon of immutability is that it enables data warehouses to accelerate queries by caching data in fast RAM or SSDs without worry that the source data on disk has changed and become out of date.
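A toy sketch of the append-only model just described, not any particular warehouse's storage format: every incoming record, including a correction to an earlier one, is appended to the end of the log rather than overwriting anything in place.

```python
class AppendOnlyLog:
    """Toy stand-in for an immutable warehouse segment: writes only append."""

    def __init__(self):
        self._records = []                 # existing entries are never modified

    def ingest(self, record):
        self._records.append(record)       # updates arrive as new appended rows

    def scan(self):
        return list(self._records)         # safe to cache: past entries never change

log = AppendOnlyLog()
log.ingest({"truck_id": 7, "ts": "2022-04-15T10:00:00Z", "lat": 40.74})
# A late correction for the same truck and timestamp is appended, not overwritten:
log.ingest({"truck_id": 7, "ts": "2022-04-15T10:00:00Z", "lat": 40.75})
```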
(Diagram credit: Martin Fowler, Retroactive Event)
However, immutable data warehouses are challenged by out-of-order time-series data since no updates or changes can be inserted into the original data records.
In response, immutable data warehouse makers were forced to create workarounds. One method used by Snowflake, Apache Druid and others is called copy-on-write. When events arrive late, the data warehouse writes the new data and rewrites already-written adjacent data in order to store everything correctly to disk in the right time order.
Another poor solution to deal with updates in an immutable data system is to keep the original data in Partition A (see diagram above) and write late-arriving data to a different location, Partition B. The application, and not the data system, has to keep track of where all linked-but-scattered records are stored, as well as any resulting dependencies. This practice is called referential integrity, and it ensures that the relationships between the scattered rows of data are created and used as defined. Because the database does not provide referential integrity constraints, the onus is on the application developer(s) to understand and abide by these data dependencies.
Both workarounds have significant problems. Copy-on-write requires a significant amount of processing power and time — tolerable when updates are few but intolerably costly and slow as the amount of out-of-order data rises. For example, if 1,000 records are stored within an immutable blob and an update needs to be applied to a single record within that blob, the system would have to read all 1,000 records into a buffer, update the record and write all 1,000 records back to a new blob on disk — and delete the old blob. This is hugely inefficient, expensive and time-wasting. It can rule out real-time analytics on data streams that occasionally receive data out-of-order.
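To make that cost concrete, here is a hedged sketch of the copy-on-write pattern described above: updating one record inside an immutable blob means reading the whole blob, rewriting it with the change, and replacing the old blob. The blob layout is invented for illustration and is not how any particular warehouse stores data.

```python
def copy_on_write_update(blob, key, new_value):
    """Rewrite an entire immutable blob to change a single record.

    `blob` is a list of (key, value) pairs standing in for ~1,000 records
    stored together on disk. Even a one-record update pays the full cost of
    reading and rewriting every record in the blob.
    """
    new_blob = []
    for k, v in blob:                      # read all records into a buffer
        new_blob.append((k, new_value) if k == key else (k, v))
    return new_blob                        # written to disk as a brand-new blob;
                                           # the old blob is then deleted

old_blob = [(i, f"reading-{i}") for i in range(1000)]
new_blob = copy_on_write_update(old_blob, key=42, new_value="corrected-reading")
```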
Using referential integrity to keep track of scattered data has its own issues. Queries must be double-checked to ensure they are pulling data from the right locations, or they run the risk of data errors. Just imagine the overhead and confusion for an application developer when accessing the latest version of a record. The developer must write code that inspects multiple partitions, de-duplicates and merges the contents of the same record from multiple partitions before using it in the application. This significantly hinders developer productivity. Attempting any query optimizations such as data-caching also becomes much more complicated and riskier when updates to the same record are scattered in multiple places on disk.
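The developer-side burden looks roughly like this: application code has to look up the same record in every partition where a version might live, then de-duplicate and keep only the newest copy before using it. The partition names and version field below are hypothetical.

```python
def latest_version(record_key, partitions):
    """Fetch a record that may be scattered across partitions and keep the
    freshest copy, using a per-record version number to de-duplicate.

    `partitions` maps a hypothetical partition name to a dict of records;
    in a real application each lookup would be a separate query.
    """
    candidates = [
        part[record_key] for part in partitions.values() if record_key in part
    ]
    if not candidates:
        return None
    return max(candidates, key=lambda rec: rec["version"])   # newest copy wins

partitions = {
    "partition_a": {"order-17": {"status": "shipped",   "version": 1}},
    "partition_b": {"order-17": {"status": "delivered", "version": 2}},  # late update
}
print(latest_version("order-17", partitions))   # {'status': 'delivered', 'version': 2}
```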
The Problem with Immutability Today
All of the above problems were manageable when out-of-order updates were few and speed less important. However, the environment has become much more demanding for three reasons:
1. Explosion in Streaming Data
Before Kafka, Spark and Flink, streaming came in two flavors: Business Event Processing (BEP) and Complex Event Processing (CEP). BEP provided simple monitoring and instant triggers for SOA-based systems management and early algorithmic stock trading. CEP was slower but deeper, combining disparate data streams to answer more holistic questions.
BEP and CEP shared three characteristics:
- They were offered by large enterprise software vendors.
- They were on-premises.
- They were unaffordable for most companies.
Then a new generation of event-streaming platforms emerged. Many (Kafka, Spark and Flink) were open source. Most were cloud native (Amazon Kinesis, Google Cloud Dataflow) or were commercially adapted for the cloud (Kafka ⇒ Confluent, Spark ⇒ Databricks). And they were cheaper and easier to start using.
This democratized stream processing and enabled many more companies to begin tapping into their pent-up supplies of real-time data. Companies that were previously locked out of BEP and CEP began to harvest website user clickstreams, IoT sensor data, cybersecurity and fraud data, and more.
Companies also began to embrace change data capture (CDC) in order to stream updates from operational databases — think Oracle, MongoDB or Amazon DynamoDB — into their data warehouses. Companies also started appending additional related time-stamped data to existing datasets, a process called data enrichment. Both CDC and data enrichment boosted the accuracy and reach of their analytics.
As all of this data is time-stamped, it can potentially arrive out of order. This influx of out-of-order events puts heavy pressure on immutable data warehouses, whose workarounds were never built to handle this volume.
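As a simple illustration of why this matters, the sketch below flags events whose timestamps fall behind the highest timestamp seen so far; CDC updates and enrichment records routinely fail this check, and that is exactly the traffic immutable systems handle poorly. The event shape is made up for the example.

```python
def split_out_of_order(events):
    """Partition a stream into in-order and late events by event timestamp."""
    in_order, late = [], []
    high_watermark = float("-inf")
    for event in events:                      # events in arrival order
        if event["ts"] >= high_watermark:
            high_watermark = event["ts"]
            in_order.append(event)
        else:
            late.append(event)                # arrived after newer data was ingested
    return in_order, late

stream = [{"ts": 100, "v": "a"}, {"ts": 105, "v": "b"}, {"ts": 101, "v": "late-cdc-update"}]
print(split_out_of_order(stream))
```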
2. Evolution from Batch to Real-Time Analytics
When companies first deployed cloud native stream publishing platforms along with the rest of the modern data stack, they were fine if the data was ingested in batches and if query results took many minutes.
However, as my colleague Shruti Bhat points out, the world is going real time. To avoid disruption by cutting-edge rivals, companies are embracing e-commerce customer personalization, interactive data exploration, automated logistics and fleet management, and anomaly detection to prevent cybercrime and financial fraud.
These real- and near-real-time use cases dramatically narrow the time windows for both data freshness and query speeds while amping up the risk of data errors. Supporting them requires an analytics database capable of ingesting both raw data streams and out-of-order data within seconds and returning accurate results in less than a second.
The workarounds employed by immutable data warehouses either ingest out-of-order data too slowly (copy-on-write) or in a complicated way (referential integrity) that slows query speeds and creates significant data accuracy risk. Besides creating delays that rule out real-time analytics, these workarounds also create extra cost.
3. Real-Time Analytics Is Mission Critical
Today’s disruptors are not only data-driven but are using real-time analytics to put competitors in the rear-view mirror. This can be an e-commerce website that boosts sales through personalized offers and discounts, an online e-sports platform that keeps players engaged through instant, data-optimized player matches or a construction logistics service that ensures concrete and other materials reach builders on time.
The flip side, of course, is that complex real-time analytics is now absolutely vital to a company’s success. Data must be fresh, correct and up to date so that queries are error-free. As incoming data streams spike, ingesting that data must not slow down your ongoing queries. And databases must promote, not detract from, the productivity of your developers. That is a tall order, but it is especially difficult when your immutable database uses clumsy hacks to ingest out-of-order data.
How Mutable Analytics Databases Solve Out-of-Order Data
The solution is simple and elegant: a mutable cloud native real-time analytics database. Late-arriving events are simply written to the portion of the database where they would have landed had they arrived on time in the first place.
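In contrast to the workarounds above, a mutable store can simply upsert the record at its natural key, so a late event lands where an on-time event would have. Below is a minimal sketch, not any product's implementation, using a dictionary keyed by (entity, timestamp) with field names invented for the example.

```python
class MutableStore:
    """Toy mutable store: late or corrected events overwrite in place."""

    def __init__(self):
        self._rows = {}                                  # keyed by (entity_id, event_ts)

    def upsert(self, entity_id, event_ts, fields):
        key = (entity_id, event_ts)
        self._rows.setdefault(key, {}).update(fields)    # update individual fields

    def get(self, entity_id, event_ts):
        return self._rows.get((entity_id, event_ts))

store = MutableStore()
store.upsert("truck-7", 1650000000, {"lat": 40.74, "lon": -73.99})
# A late-arriving correction for the same moment simply overwrites in place:
store.upsert("truck-7", 1650000000, {"lat": 40.75})
print(store.get("truck-7", 1650000000))   # {'lat': 40.75, 'lon': -73.99}
```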
In the case of Rockset, a real-time analytics database that I helped create, individual fields in a data record can be natively updated, overwritten or deleted. There is no need for expensive and slow copy-on-writes, a la Apache Druid, or kludgy segregated dynamic partitions.
Rockset goes beyond other mutable real-time databases, though. Rockset not only continuously ingests data but can also "roll up" the data as it is being generated. Aggregating data with SQL as it is ingested greatly reduces the amount of data stored (5-150x) as well as the compute needed for queries (boosting performance 30-100x). This frees users from managing slow, expensive ETL pipelines for their streaming data.
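To show the idea behind ingest-time rollups (this is an illustrative sketch, not Rockset's actual implementation), the snippet below aggregates raw events into per-minute buckets as they arrive, so only the much smaller aggregates need to be stored and queried.

```python
from collections import defaultdict

def rollup_on_ingest(events, bucket_seconds=60):
    """Aggregate raw events into coarse time buckets as they are ingested.

    Storing one row per bucket instead of one row per raw event is what
    shrinks storage and query-time compute; the event shape is illustrative.
    """
    buckets = defaultdict(lambda: {"count": 0, "total": 0.0})
    for event in events:
        bucket = event["ts"] - event["ts"] % bucket_seconds
        buckets[bucket]["count"] += 1
        buckets[bucket]["total"] += event["value"]
    return dict(buckets)

raw = [{"ts": 1650000005, "value": 2.0}, {"ts": 1650000030, "value": 3.0},
       {"ts": 1650000065, "value": 1.5}]
print(rollup_on_ingest(raw))   # two one-minute buckets instead of three raw rows
```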
We also combined the underlying RocksDB storage engine with our Aggregator-Leaf-Tailer (ALT) architecture so that our indexes are instantly, fully mutable. That ensures all data, even freshly-ingested out-of-order data, is available for accurate, ultra-fast (sub-second) queries.
Rockset’s ALT architecture also separates the tasks of storage and compute. This ensures smooth scalability if there are bursts of data traffic, including backfills and other out-of-order data, and prevents query performance from being impacted.
Finally, RocksDB’s compaction algorithms automatically merge old and updated data records. This ensures that queries access the latest, correct version of data. It also prevents data bloat that would hamper storage efficiency and query speeds.
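Conceptually, compaction in an LSM-style engine like RocksDB merges overlapping files and keeps only the newest version of each key. The sketch below shows that merge rule on plain Python lists; it is a simplification of the real algorithm, with the record shapes invented for the example.

```python
def compact(runs):
    """Merge sorted runs of (key, sequence_number, value) entries, keeping only
    the newest version of each key -- a simplified view of LSM compaction.
    """
    latest = {}
    for run in runs:                         # older runs first, newer runs last
        for key, seq, value in run:
            if key not in latest or seq > latest[key][0]:
                latest[key] = (seq, value)   # newer sequence number wins
    # One merged, sorted run replaces the inputs; stale versions are dropped.
    return sorted((k, seq, v) for k, (seq, v) in latest.items())

old_run = [("sensor-1", 1, 20.0), ("sensor-2", 2, 18.5)]
new_run = [("sensor-1", 3, 21.5)]            # late correction for sensor-1
print(compact([old_run, new_run]))           # sensor-1 keeps only the newest value
```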
In other words, a mutable real-time analytics database designed like Rockset provides high raw data ingestion speeds and the native ability to update and backfill records with out-of-order data, all without creating additional cost, data error risk, or extra work for developers and data engineers. This supports the mission-critical real-time analytics required by today’s data-driven disruptors.
In future blog posts, I’ll describe other must-have capabilities of real-time analytics databases, such as handling bursty data traffic and complex queries. Or, you can skip ahead and watch my recent talk at the Hive on Designing the Next Generation of Data Systems for Real-Time Analytics, available below.
Embedded content: https://www.youtube.com/watch?v=NOuxW_SXj5M
Dhruba Borthakur is CTO and co-founder of Rockset and is responsible for the company's technical direction. He was an engineer on the database team at Facebook, where he was the founding engineer of the RocksDB data store. Earlier at Yahoo, he was one of the founding engineers of the Hadoop Distributed File System. He was also a contributor to the open source Apache HBase project.
Rockset is the real-time analytics database in the cloud for modern data teams. Get faster analytics on fresher data, at lower costs, by exploiting indexing over brute-force scanning.