Real-Time Analytics Using SQL on Streaming Data with Apache Kafka and Rockset

January 16, 2019

,
Register for
Index Conference

Hear talks on search and AI from engineers at Netflix, DoorDash, Uber and more.

This post offers a how-to guide to real-time analytics using SQL on streaming data with Apache Kafka and Rockset, using the Rockset Kafka Connector, a Kafka Connect Sink.

Kafka is commonly used by many organizations to handle their real-time data streams. We will show how Rockset integrates with Kafka to ingest and index our fast-moving event data, enabling us to build operational apps and live dashboards on top of Rockset. We will use a simulated event stream of orders on an e-commerce site for this example.

kafka-rockset

Setting up Your Environment

We’ll provide all the steps you'll need to connect Kafka and Rockset, and run some simple ad hoc queries and visualizations.

Kafka Cluster

If you already have a Kafka cluster ready and data flowing through it, then you can skip this portion of the guide. Otherwise, set up a Kafka cluster and verify it is running. A Kafka quickstart tutorial can be found here. A single-node Kafka cluster is sufficient for the example in this blog, although you may want a multi-node cluster for further work with Kafka and Rockset.

Python

All the code used in this blog is available under kafka-rockset-integration in the Rockset recipes repository. The Python code provided will simulate e-commerce order events and write them to Kafka. The following steps will guide you in downloading the code and setting up the Python environment.

git clone git@github.com:rockset/recipes.git
cd recipes/kafka-rockset-integration/

Create and activate Python virtual environment rockset-kafka-demo and install all the Python dependencies.

python3 -m virtualenv rockset-kafka-demo
source rockset-kafka-demo/bin/activate
pip install -r requirements.txt

Open config.py in your favorite editor and update the following configuration parameters. You will need to enter your Rockset API key.

# Kafka Configuration
KAFKA_TOPIC = 'orders'
KAFKA_BOOTSTRAP_SERVER = ['localhost:9092']

# Rockset Configuration
ROCKSET_API_KEY = '' # Create API Key - https://console.rockset.com/manage/apikeys
ROCKSET_API_SERVER = 'https://api.rs2.usw2.rockset.com'

Creating a Collection in Rockset

We will use the rock CLI tool to manage and query our data in Rockset.

Installing the rock CLI tool has already been done in as part of the pip install -r requirements.txt step above. Alternatively, you can install the rock CLI tool with the pip3 install rockset command.

Configure the rock CLI client with your Rockset API key.

rock configure --api_key <YOUR-API-KEY>

Create a Rockset collection named orders.

rock create collection orders \
--event-time-field=InvoiceDate \
--event-time-format=seconds_since_epoch

The --event-time-field=InvoiceDate option instructs Rockset to treat a document’s InvoiceDate as its _event_time, a special field used to handle time-series data efficiently.

Read more about special fields in Rockset and working with event data. Users working with event data in Rockset can set time-based retention policies on their data.

Connecting Rockset to Kafka

Kafka Connect, an open-source component of Apache Kafka, is a framework for connecting Kafka with external systems such as databases, key-value stores, search indexes, and file systems. Rockset provides Kafka Connect for Rockset, a Kafka Connect Sink that helps load data from Kafka into a Rockset collection.

Kafka Connect for Rockset can be run in standalone or distributed mode. For the scope of this blog, the following steps explain how to set up Kafka Connect for Rockset in standalone mode.

Build

Clone the repo

git clone https://github.com/rockset/kafka-connect-rockset.git
cd kafka-connect-rockset

Build a maven artifact

mvn package

This will build the jar in the /target directory. Its name will be kafka-connect-rockset-[VERSION]-SNAPSHOT-jar-with-dependencies.jar.

Open the file ./config/connect-standalone.properties, which contains the configuration required for Kafka Connect. Update the following configuration values.

bootstrap.servers=localhost:9092
plugin.path=/path/to/kafka-connect-rockset-[VERSION]-SNAPSHOT-jar-with-dependencies.jar

Open the file ./config/connect-rockset-sink.properties to configure Rockset-specific properties.

topics - This is the list of comma-separated Kafka topics that should be watched by this Rockset Kafka Connector

topics=orders

rockset.collection - The Rockset connector will write data into this collection

rockset.collection=orders

rockset.apikey - Use the API Key of your Rockset account

Run

Start Kafka Rockset Connect and keep the terminal open to monitor progress.

$KAFKA_HOME/bin/connect-standalone.sh ./config/connect-standalone.properties ./config/connect-rockset-sink.properties

Refer to the documentation to set up Kafka Connect for Rockset in distributed mode and for other configuration information.

Ingesting Data from Kafka into Rockset

Make sure all the components (Kafka cluster, Kafka Connect for Rockset) are up and running.

Verify Zookeeper is up.

jps -l | grep org.apache.zookeeper.server
[process_id] org.apache.zookeeper.server.quorum.QuorumPeerMain

Verify Kafka is up.

jps -l | grep kafka.Kafka
[process_id] kafka.Kafka

Verify Kafka Connect for Rockset is up.

curl localhost:8083/connectors
["rockset-sink"]

Start writing new data into Kafka. write_data_into_kafka.py will generate 1 to 300 orders every second across multiple e-commerce customers and countries. (We borrowed the product set from this e-commerce data set and randomly generate orders for this example from this set.)

python write_data_into_kafka.py 

Writing records into Kafka. Kafka Server - localhost:9092, Topic - orders
100 records are written
200 records are written
300 records are written
400 records are written
500 records are written
600 records are written

...
Long output ahead

The JSON data for an example order containing two different products is shown below.

{
  "InvoiceNo": 14,
  "InvoiceDate": 1547523082,
  "CustomerID": 10140,
  "Country": "India",
  "StockCode": 3009,
  "Description": "HAND WARMER RED POLKA DOT",
  "Quantity": 6,
  "UnitPrice": 1.85
}
{
  "InvoiceNo": 14,
  "InvoiceDate": 1547523082,
  "CustomerID": 10140,
  "Country": "India",
  "StockCode": 3008,
  "Description": "HAND WARMER UNION JACK",
  "Quantity": 2,
  "UnitPrice": 1.85
}

In your terminal where Kafka Connect for Rockset is running, observe that the Rockset sink is active and documents are being ingested into your Rockset collection.

[2019-01-08 17:33:44,801] INFO Added doc: {"Invoi... (rockset.RocksetClientWrapper:37)
[2019-01-08 17:33:44,802] INFO Added doc: {"Invoi... (rockset.RocksetClientWrapper:37)
[2019-01-08 17:33:44,838] INFO Added doc: {"Invoi... (rockset.RocksetClientWrapper:37)
...
Long output ahead

Running SQL Queries in Rockset

The orders collection is ready for querying as soon as data ingestion is started. We will show some common queries you can run on the e-commerce data.

Open a SQL command line.

rock sql

Let’s try a simple query.

SELECT *
FROM orders
LIMIT 10;

You should see the output of the above query in a tabular format. You will notice the special fields _id and _event_time in the output, as mentioned earlier in the blog.

Let’s dig into the data and get some insights. Note that the results you get will be different from these, because the order data is generated randomly and continuously.

Highest Selling Products

Query

SELECT Description, SUM(Quantity) as QuantitiesSold
FROM "orders" 
GROUP BY Description
ORDER By QuantitiesSold DESC
LIMIT 5;

Output

+------------------------------------+------------------+
| Description                        | QuantitiesSold   |
|------------------------------------+------------------|
| ASSORTED COLOUR BIRD ORNAMENT      | 87786            |
| WHITE METAL LANTERN                | 65821            |
| WHITE HANGING HEART T-LIGHT HOLDER | 65319            |
| CREAM CUPID HEARTS COAT HANGER     | 43978            |
| RED WOOLLY HOTTIE WHITE HEART.     | 43260            |
+------------------------------------+------------------+
Time: 0.423s

Number of Orders by Country

Query

SELECT Country, COUNT(DISTINCT InvoiceNo) as TotalOrders
FROM "orders" 
GROUP BY Country
ORDER By TotalOrders DESC;

Output

+----------------+---------------+
| Country        | TotalOrders   |
|----------------+---------------|
| United States  | 4762          |
| India          | 3304          |
| China          | 3242          |
| United Kingdom | 1610          |
| Canada         | 1524          |
+----------------+---------------+
Time: 0.395s

Minute-By-Minute Sales

We’ve used very familiar SQL constructs like aggregate functions, GROUP BY, and ORDER BY up till now. Let’s try subqueries written using the WITH clause to show total sales observed every minute.

Query

WITH X AS (
    SELECT InvoiceNo, FORMAT_TIMESTAMP('%H:%M', DATETIME(_event_time)) as Minute, SUM(UnitPrice) as OrderValue
    FROM "orders"
    GROUP BY InvoiceNo, _event_time
)
SELECT Minute, CEIL(SUM(OrderValue)) as TotalSale
FROM X
GROUP BY Minute
ORDER BY Minute;

Output

+------------------+-------------+
| Minute           | TotalSale   |
|------------------+-------------|
| 2019-01-08 11:52 | 40261.0     |
| 2019-01-08 11:53 | 66759.0     |
| 2019-01-08 11:54 | 72043.0     |
| 2019-01-08 11:55 | 56221.0     |
+------------------+-------------+
Time: 0.451s

You can easily perform other ad hoc SQL queries on the data at any time. The write_data_into_kafka.py script will keep streaming the orders data continuously. You can stream as much as you want to get more data written into Rockset collection.

Joining Kafka Event Data with CSV Data in S3

Let's say that we have customer data from another source that we want to join with our orders data for analysis. With Rockset, we can easily ingest data from a range of data sources and combine it with our Kafka stream using SQL.

The kafka-rockset-integration directory contains a customers.csv file containing the CustomerID and AcquisitionSource of each customer. We will store this data on how customers were acquired in Amazon S3 and create a Rockset collection from it.

head customers.csv 

CustomerID,AcquisitionSource
10000,Display
10001,AffiliateReferral
10002,OrganicSearch
10003,OrganicSearch
10004,Display
10005,SocialMedia
10006,OrganicSearch
10007,SocialMedia
10008,AffiliateReferral

Upload the customers.csv file to S3 following these instructions for setting up an S3 bucket. From your S3 source, create a Rockset collection named customers, based on customers.csv.

Sales by Customer Acquisition Source

Join the real-time orders data coming from Kafka with customer acquisition data to determine the total sales by customer acquisition source. The following query demonstrates an inner join on the CustomerID field between the orders collection, from Kafka, and the customers collection, from S3.

Query

SELECT C.AcquisitionSource, CEIL(SUM(O.UnitPrice)) as TotalSale
FROM customers AS C JOIN orders as O on O.CustomerID = Cast(C.CustomerID AS integer)
GROUP BY C.AcquisitionSource
ORDER BY TotalSale DESC

Output

+---------------------+-------------+
| AcquisitionSource   | TotalSale   |
|---------------------+-------------|
| AffiliateReferral   | 45779.0     |
| PaidSearch          | 42668.0     |
| OrganicSearch       | 41467.0     |
| Email               | 37040.0     |
| SocialMedia         | 36509.0     |
| Display             | 34516.0     |
+---------------------+-------------+

Visualize

Now that we’ve run some SQL queries on the order data in Rockset, let’s extend our example. The provided code includes a visualize.py script, which translates Rockset query results into graphical widgets. The script uses the Dash library to plot the results graphically. (It is possible to use data visualization tools like Tableau, Apache Superset, Redash, and Grafana as well.)

Run visualize.py.

python visualize.py

Open a dashboard in your browser.

http://127.0.0.1:8050/

You can see some dashboard widgets plotted using Dash over the Rockset data.

dashboard 1

dashboard 2

dashboard 3

dashboard 4

Conclusion

With Rockset, you can build apps without writing complex ETL pipelines. Rockset continuously syncs new data as it lands in your data sources without the need for a fixed schema. The Kafka-Rockset integration outlined above allows you to build operational apps and live dashboards quickly and easily, using SQL on real-time event data streaming through Kafka.

Visit our Kafka solutions page for more information on building real-time dashboards and APIs on Kafka event streams.