How Rockset Enables SQL-Based Rollups for Streaming Data

August 31, 2021

Until Now: The Slow Crawl from Batch to Real-Time Analytics

The world is moving from batch to real-time analytics, but it's been moving at a crawl. Apache Kafka has made acquiring real-time data more mainstream, yet only a small sliver of companies are turning nightly batch analytics into real-time dashboards with alerts and automatic anomaly detection. The majority are still draining streaming data into a data lake or a warehouse and running batch analytics on it. That’s because traditional OLTP systems and data warehouses are ill-equipped to power real-time analytics easily or efficiently. OLTP systems aren't built to handle the scale of real-time streams or to serve complex analytics. Warehouses struggle to serve fresh data and lack the speed and compute efficiency that real-time analytics demands, which makes them prohibitively complex and expensive for the job.

Rockset: Real-time Analytics Built for the Cloud

Rockset is doing for real-time analytics what Snowflake did for batch. Rockset is a real-time analytics database in the cloud that uses an indexing approach to deliver low-latency analytics at scale. It eliminates the cost and complexity around data preparation, performance tuning and operations, helping to accelerate the movement from batch to real-time analytics.

The latest Rockset release, SQL-based rollups, has made real-time analytics on streaming data a lot more affordable and accessible. Anyone who knows SQL, the lingua franca of analytics, can now roll up, transform, enrich and aggregate real-time data at massive scale.

In the rest of this blog post, I’ll go into more detail on what’s changed with this release, how we implemented rollups and why we think this is crucial to expediting the real-time analytics movement.

A Quick Primer on Indexing in Rockset

Rockset allows users to connect real-time data sources — data streams (Kafka, Kinesis), OLTP databases (DynamoDB, MongoDB, MySQL, PostgreSQL) and also data lakes (S3, GCS) — using built-in connectors. When you point Rockset at an OLTP database like MySQL, Postgres, DynamoDB, or MongoDB, Rockset will first do a full copy and then cut over to the CDC stream automatically. All these connectors are real-time connectors, so new data added to the source or INSERTS/UPDATES/DELETES in upstream databases will be reflected in Rockset within 1-2 seconds. All data will be indexed in real-time, and Rockset’s distributed SQL engine will leverage the indexes and provide sub-second query response times.

But until this release, all these data sources involved indexing the incoming raw data on a record-by-record basis. For example, if you connected a Kafka stream to Rockset, then every Kafka message would get fully indexed and the Kafka topic would be turned into a fully typed, fully indexed SQL table. That is sufficient for some use cases. However, for many use cases at huge volumes — such as a Kafka topic that streams tens of TBs of data every day — it becomes prohibitively expensive to index the raw data stream and then calculate the desired metrics downstream at query processing time.

Opening the Streaming Gates with Rollups

With SQL-based rollups, Rockset allows you to define any metric you want to track in real-time, across any number of dimensions, simply using SQL. The rollup SQL acts as a standing query and continuously runs on incoming data. All the metrics will be accurate up to the second. You can use all the power and flexibility of SQL to express the complex expressions that define your metrics.

The rollup SQL will typically be of the form:

SELECT 
    dimension1, 
    dimension2, 
    ... <more dimensions> ..., 
    agg_function1(measure1), 
    agg_function2(measure2), 
    ... <more measures> ...
FROM 
    _input 
GROUP BY 
    dimension1, 
    dimension2,
    .... <rest of the dimensions> ...

You can also optionally use WHERE clauses to filter out data. Since only the aggregated data is now ingested and indexed into Rockset, this technique reduces the compute and storage required to track real-time metrics by a few orders of magnitude. The resulting aggregated data will get indexed in Rockset as usual, so you should expect really fast queries on top of these aggregated dimensions for any type of slicing/dicing analysis you want to run.
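
As a concrete illustration of the shape above, here is a minimal rollup that counts page views per country, device and minute, using a WHERE clause to filter out bot traffic. The stream fields (country, device, is_bot, view_time, duration_ms) are made up purely for this sketch:

SELECT
    country,
    device,
    EXTRACT(hour from view_time) as view_hour,
    EXTRACT(minute from view_time) as view_min,
    COUNT(*) as view_count,
    SUM(duration_ms) as total_duration_ms
FROM
    _input
WHERE
    is_bot = false
GROUP BY
    country,
    device,
    view_hour,
    view_min

Only one aggregated row per (country, device, hour, minute) combination is stored, no matter how many raw view events arrive within that minute.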

SQL-Based Rollups Are 🔥

Maintaining real-time metrics on simple aggregation functions such as SUM() or COUNT() is fairly straightforward. Any bean-counting software can do this. You simply apply the rollup SQL to incoming data, transform each new record into a metric increment/decrement command (as sketched below), and off you go. But things get really interesting when you need a much more complex SQL expression to define your metric.
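
To make that concrete, here is a minimal sketch of the simple case over a hypothetical orders stream with merchant and amount fields; the comments describe how each incoming record maps to increments on its group:

SELECT
    merchant,
    COUNT(*) as order_count,    -- each incoming record: order_count += 1
    SUM(amount) as total_amount -- each incoming record: total_amount += amount
FROM
    _input
GROUP BY
    merchant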

Take a look at the error_rate and error_rate_arcsinh [1] metrics in the following real-world example:

SELECT
    merchant,
    operation,
    event_date,
    EXTRACT(hour from event_date) as event_hour,
    EXTRACT(minute from event_date) as event_min,
    COUNT(*) as event_count,
    (CASE
        WHEN count(*) = 0 THEN 0
        ELSE sum(error_flag) * 1.0 / count(*)
     END) AS error_rate,
    LOG10(
        (CASE
            WHEN count(*) = 0 THEN 0
            ELSE sum(error_flag) * 1.0 / count(*)
         END)
        + SQRT(POWER(CASE
                        WHEN count(*) = 0 THEN 0
                        ELSE sum(error_flag) * 1.0 / count(*)
                    END, 2) + 1)
    ) AS error_rate_arcsinh
FROM 
    _input
GROUP BY
    merchant,
    operation,
    event_date,
    event_hour,
    event_min

Maintaining error_rate and error_rate_arcsinh in real-time is not so simple. These expressions don’t easily decompose into simple increments or decrements that can be maintained in real-time. So, how does Rockset support this, you may wonder? If you look closely at those two SQL expressions, you will realize that both metrics are doing basic arithmetic on top of two simple aggregate metrics: count(*) and sum(error_flag). So, if we maintain those two base aggregate metrics in real-time and plug the arithmetic expression in at query time, we can always report the complex metric defined by the user in real-time.

When asked to maintain such complex real-time metrics, Rockset automatically splits the rollup SQL into two parts:

  • Part 1: the set of base aggregate metrics that actually need to be maintained at data ingestion time. In the example above, these base aggregate metrics are count(*) and sum(error_flag). For the sake of understanding, assume these metrics are tracked as _count and _sum_error_flag respectively.
count(*) as _count
sum(error_flag) as _sum_error_flag
  • Part 2: the set of expressions that need to be applied on top of the pre-calculated base aggregate metrics at query time. In the example above, the expression for error_rate would look as follows (the corresponding rewrite for error_rate_arcsinh is sketched after this list).
(CASE
    WHEN _count = 0 THEN 0
    ELSE _sum_error_flag * 1.0 / _count
 END) AS error_rate
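
Following the same pattern, the query-time rewrite for error_rate_arcsinh would look roughly like the sketch below. This is for intuition only, not Rockset's exact internal representation; it uses the same two base aggregates, _count and _sum_error_flag:

LOG10(
    (CASE
        WHEN _count = 0 THEN 0
        ELSE _sum_error_flag * 1.0 / _count
     END)
    + SQRT(POWER(CASE
                    WHEN _count = 0 THEN 0
                    ELSE _sum_error_flag * 1.0 / _count
                END, 2) + 1)
) AS error_rate_arcsinh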

So, now you can use the full breadth and flexibility available in SQL to construct the metrics that you want to maintain in real-time, which in turn makes real-time analytics accessible to your entire team. No need to learn some archaic domain-specific language or fumble with complex YAML configs to achieve this. You already know how to use Rockset because you know how to use SQL.

Accurate Metrics in the Face of Dupes and Latecomers

Rockset's real-time data connectors guarantee exactly-once semantics for streaming sources such as Kafka or Kinesis out of the box. So, transient hiccups or reconnects are not going to affect the accuracy of your real-time metrics. This is an important requirement that should not be overlooked while implementing a real-time analytical solution.

Even more important is how you handle out-of-order and late arrivals, which are very common in data streams. Thankfully, Rockset’s indexes are fully mutable at the field level, unlike systems such as Apache Druid that seal older segments, which makes updating those segments really expensive. So, late and out-of-order arrivals are trivially simple to deal with in Rockset. When those events arrive, Rockset processes them and updates the required metrics exactly as though the events had arrived in order and on time. This eliminates a ton of operational complexity for you while ensuring that your metrics are always accurate.

Now: The Fast Flight from Batch to Real-Time Analytics

You can’t introduce streaming data into a stack that was built for batch. You need a database that can easily handle large-scale streaming data while continuing to deliver low-latency analytics. Now, with Rockset, we’re able to ease the transition from batch to real-time analytics with an affordable and accessible solution. There’s no need to learn a new query language, massage data pipelines to minimize latency, or throw a lot of compute at a batch-based system to get incrementally better performance. We’re making the move from batch to real-time analytics as simple as constructing a SQL query.

You can learn more about this release in a live interview we did with Tudor Bosman, Rockset’s Chief Architect.

Embedded content: https://youtu.be/bu5MRzd8d-0

References:

[1] If you are wondering who needs to maintain inverse hyperbolic sine functions on error rates, then clearly you haven’t met an econometrician lately.
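
For reference, the inverse hyperbolic sine of a value x is defined as

$$\text{arcsinh}(x) = \ln\left(x + \sqrt{x^2 + 1}\right)$$

which is exactly the x + sqrt(x^2 + 1) shape computed inside the LOG10(...) call in the rollup example above. Since that example uses LOG10 rather than the natural logarithm, the stored value is the arcsinh of the error rate scaled by a constant factor of 1/ln(10); the shape of the transformation is unchanged.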

Applied econometricians often transform variables to make the interpretation of empirical results easier, to approximate a normal distribution, to reduce heteroskedasticity, or to reduce the effect of outliers. Taking the logarithm of a variable has long been a popular such transformation.

One problem with taking the logarithm of a variable is that it does not allow retaining zero-valued observations because ln(0) is undefined. But economic data often include meaningful zero-valued observations, and applied econometricians are typically loath to drop those observations for which the logarithm is undefined. Consequently, researchers have often resorted to ad hoc means of accounting for this when taking the natural logarithm of a variable, such as adding 1 to the variable prior to its transformation (MaCurdy and Pencavel, 1986).

In recent years, the inverse hyperbolic sine (or arcsinh) transformation has grown in popularity among applied econometricians because (i) it is similar to a logarithm, and (ii) it allows retaining zero-valued (and even negative-valued) observations (Burbidge et al., 1988; MacKinnon and Magee, 1990; Pence, 2006).

Source: https://marcfbellemare.com/wordpress/wp-content/uploads/2019/02/BellemareWichmanIHSFebruary2019.pdf