Real-Time Analytics Using SQL on Streaming Data with Apache Kafka and Rockset

January 16, 2019


This post offers a how-to guide to real-time analytics using SQL on streaming data with Apache Kafka and Rockset, using the Rockset Kafka Connector, a Kafka Connect Sink.

Kafka is commonly used by many organizations to handle their real-time data streams. We will show how Rockset integrates with Kafka to ingest and index our fast-moving event data, enabling us to build operational apps and live dashboards on top of Rockset. We will use a simulated event stream of orders on an e-commerce site for this example.


Setting up Your Environment

We’ll provide all the steps you'll need to connect Kafka and Rockset, and run some simple ad hoc queries and visualizations.

Kafka Cluster

If you already have a Kafka cluster ready and data flowing through it, then you can skip this portion of the guide. Otherwise, set up a Kafka cluster and verify it is running. A Kafka quickstart tutorial can be found here. A single-node Kafka cluster is sufficient for the example in this blog, although you may want a multi-node cluster for further work with Kafka and Rockset.


All the code used in this blog is available under kafka-rockset-integration in the Rockset recipes repository. The Python code provided will simulate e-commerce order events and write them to Kafka. The following steps will guide you in downloading the code and setting up the Python environment.

git clone
cd recipes/kafka-rockset-integration/

Create and activate Python virtual environment rockset-kafka-demo and install all the Python dependencies.

python3 -m virtualenv rockset-kafka-demo
source rockset-kafka-demo/bin/activate
pip install -r requirements.txt

Open in your favorite editor and update the following configuration parameters. You will need to enter your Rockset API key.

# Kafka Configuration
KAFKA_TOPIC = 'orders'
KAFKA_BOOTSTRAP_SERVER = ['localhost:9092']

# Rockset Configuration
ROCKSET_API_KEY = '' # Create API Key -

Creating a Collection in Rockset

We will use the rock CLI tool to manage and query our data in Rockset.

Installing the rock CLI tool has already been done in as part of the pip install -r requirements.txt step above. Alternatively, you can install the rock CLI tool with the pip3 install rockset command.

Configure the rock CLI client with your Rockset API key.

rock configure --api_key <YOUR-API-KEY>

Create a Rockset collection named orders.

rock create collection orders \
--event-time-field=InvoiceDate \

The --event-time-field=InvoiceDate option instructs Rockset to treat a document’s InvoiceDate as its _event_time, a special field used to handle time-series data efficiently.

Read more about special fields in Rockset and working with event data. Users working with event data in Rockset can set time-based retention policies on their data.

Connecting Rockset to Kafka

Kafka Connect, an open-source component of Apache Kafka, is a framework for connecting Kafka with external systems such as databases, key-value stores, search indexes, and file systems. Rockset provides Kafka Connect for Rockset, a Kafka Connect Sink that helps load data from Kafka into a Rockset collection.

Kafka Connect for Rockset can be run in standalone or distributed mode. For the scope of this blog, the following steps explain how to set up Kafka Connect for Rockset in standalone mode.


Clone the repo

git clone
cd kafka-connect-rockset

Build a maven artifact

mvn package

This will build the jar in the /target directory. Its name will be kafka-connect-rockset-[VERSION]-SNAPSHOT-jar-with-dependencies.jar.

Open the file ./config/, which contains the configuration required for Kafka Connect. Update the following configuration values.


Open the file ./config/ to configure Rockset-specific properties.

topics - This is the list of comma-separated Kafka topics that should be watched by this Rockset Kafka Connector


rockset.collection - The Rockset connector will write data into this collection


rockset.apikey - Use the API Key of your Rockset account


Start Kafka Rockset Connect and keep the terminal open to monitor progress.

$KAFKA_HOME/bin/ ./config/ ./config/

Refer to the documentation to set up Kafka Connect for Rockset in distributed mode and for other configuration information.

Ingesting Data from Kafka into Rockset

Make sure all the components (Kafka cluster, Kafka Connect for Rockset) are up and running.

Verify Zookeeper is up.

jps -l | grep org.apache.zookeeper.server
[process_id] org.apache.zookeeper.server.quorum.QuorumPeerMain

Verify Kafka is up.

jps -l | grep kafka.Kafka
[process_id] kafka.Kafka

Verify Kafka Connect for Rockset is up.

curl localhost:8083/connectors

Start writing new data into Kafka. will generate 1 to 300 orders every second across multiple e-commerce customers and countries. (We borrowed the product set from this e-commerce data set and randomly generate orders for this example from this set.)


Writing records into Kafka. Kafka Server - localhost:9092, Topic - orders
100 records are written
200 records are written
300 records are written
400 records are written
500 records are written
600 records are written

Long output ahead

The JSON data for an example order containing two different products is shown below.

  "InvoiceNo": 14,
  "InvoiceDate": 1547523082,
  "CustomerID": 10140,
  "Country": "India",
  "StockCode": 3009,
  "Description": "HAND WARMER RED POLKA DOT",
  "Quantity": 6,
  "UnitPrice": 1.85
  "InvoiceNo": 14,
  "InvoiceDate": 1547523082,
  "CustomerID": 10140,
  "Country": "India",
  "StockCode": 3008,
  "Description": "HAND WARMER UNION JACK",
  "Quantity": 2,
  "UnitPrice": 1.85

In your terminal where Kafka Connect for Rockset is running, observe that the Rockset sink is active and documents are being ingested into your Rockset collection.

[2019-01-08 17:33:44,801] INFO Added doc: {"Invoi... (rockset.RocksetClientWrapper:37)
[2019-01-08 17:33:44,802] INFO Added doc: {"Invoi... (rockset.RocksetClientWrapper:37)
[2019-01-08 17:33:44,838] INFO Added doc: {"Invoi... (rockset.RocksetClientWrapper:37)
Long output ahead

Running SQL Queries in Rockset

The orders collection is ready for querying as soon as data ingestion is started. We will show some common queries you can run on the e-commerce data.

Open a SQL command line.

rock sql

Let’s try a simple query.

FROM orders

You should see the output of the above query in a tabular format. You will notice the special fields _id and _event_time in the output, as mentioned earlier in the blog.

Let’s dig into the data and get some insights. Note that the results you get will be different from these, because the order data is generated randomly and continuously.

Highest Selling Products


SELECT Description, SUM(Quantity) as QuantitiesSold
FROM "orders" 
GROUP BY Description
ORDER By QuantitiesSold DESC


| Description                        | QuantitiesSold   |
| ASSORTED COLOUR BIRD ORNAMENT      | 87786            |
| WHITE METAL LANTERN                | 65821            |
| CREAM CUPID HEARTS COAT HANGER     | 43978            |
| RED WOOLLY HOTTIE WHITE HEART.     | 43260            |
Time: 0.423s

Number of Orders by Country


SELECT Country, COUNT(DISTINCT InvoiceNo) as TotalOrders
FROM "orders" 
GROUP BY Country
ORDER By TotalOrders DESC;


| Country        | TotalOrders   |
| United States  | 4762          |
| India          | 3304          |
| China          | 3242          |
| United Kingdom | 1610          |
| Canada         | 1524          |
Time: 0.395s

Minute-By-Minute Sales

We’ve used very familiar SQL constructs like aggregate functions, GROUP BY, and ORDER BY up till now. Let’s try subqueries written using the WITH clause to show total sales observed every minute.


    SELECT InvoiceNo, FORMAT_TIMESTAMP('%H:%M', DATETIME(_event_time)) as Minute, SUM(UnitPrice) as OrderValue
    FROM "orders"
    GROUP BY InvoiceNo, _event_time
SELECT Minute, CEIL(SUM(OrderValue)) as TotalSale
ORDER BY Minute;


| Minute           | TotalSale   |
| 2019-01-08 11:52 | 40261.0     |
| 2019-01-08 11:53 | 66759.0     |
| 2019-01-08 11:54 | 72043.0     |
| 2019-01-08 11:55 | 56221.0     |
Time: 0.451s

You can easily perform other ad hoc SQL queries on the data at any time. The script will keep streaming the orders data continuously. You can stream as much as you want to get more data written into Rockset collection.

Joining Kafka Event Data with CSV Data in S3

Let's say that we have customer data from another source that we want to join with our orders data for analysis. With Rockset, we can easily ingest data from a range of data sources and combine it with our Kafka stream using SQL.

The kafka-rockset-integration directory contains a customers.csv file containing the CustomerID and AcquisitionSource of each customer. We will store this data on how customers were acquired in Amazon S3 and create a Rockset collection from it.

head customers.csv 


Upload the customers.csv file to S3 following these instructions for setting up an S3 bucket. From your S3 source, create a Rockset collection named customers, based on customers.csv.

Sales by Customer Acquisition Source

Join the real-time orders data coming from Kafka with customer acquisition data to determine the total sales by customer acquisition source. The following query demonstrates an inner join on the CustomerID field between the orders collection, from Kafka, and the customers collection, from S3.


SELECT C.AcquisitionSource, CEIL(SUM(O.UnitPrice)) as TotalSale
FROM customers AS C JOIN orders as O on O.CustomerID = Cast(C.CustomerID AS integer)
GROUP BY C.AcquisitionSource


| AcquisitionSource   | TotalSale   |
| AffiliateReferral   | 45779.0     |
| PaidSearch          | 42668.0     |
| OrganicSearch       | 41467.0     |
| Email               | 37040.0     |
| SocialMedia         | 36509.0     |
| Display             | 34516.0     |


Now that we’ve run some SQL queries on the order data in Rockset, let’s extend our example. The provided code includes a script, which translates Rockset query results into graphical widgets. The script uses the Dash library to plot the results graphically. (It is possible to use data visualization tools like Tableau, Apache Superset, Redash, and Grafana as well.)



Open a dashboard in your browser.

You can see some dashboard widgets plotted using Dash over the Rockset data.

dashboard 1

dashboard 2

dashboard 3

dashboard 4


With Rockset, you can build apps without writing complex ETL pipelines. Rockset continuously syncs new data as it lands in your data sources without the need for a fixed schema. The Kafka-Rockset integration outlined above allows you to build operational apps and live dashboards quickly and easily, using SQL on real-time event data streaming through Kafka.

Visit our Kafka solutions page for more information on building real-time dashboards and APIs on Kafka event streams.