Real-Time Analytics Using SQL on Streaming Data with Apache Kafka and Rockset
January 16, 2019
This post offers a how-to guide to real-time analytics using SQL on streaming data with Apache Kafka and Rockset, via the Rockset Kafka Connector, a Kafka Connect sink.
Kafka is commonly used by many organizations to handle their real-time data streams. We will show how Rockset integrates with Kafka to ingest and index our fast-moving event data, enabling us to build operational apps and live dashboards on top of Rockset. We will use a simulated event stream of orders on an e-commerce site for this example.
Setting up Your Environment
We’ll provide all the steps you'll need to connect Kafka and Rockset, and run some simple ad hoc queries and visualizations.
Kafka Cluster
If you already have a Kafka cluster ready and data flowing through it, then you can skip this portion of the guide. Otherwise, set up a Kafka cluster and verify it is running. A Kafka quickstart tutorial can be found here. A single-node Kafka cluster is sufficient for the example in this blog, although you may want a multi-node cluster for further work with Kafka and Rockset.
Python
All the code used in this blog is available under kafka-rockset-integration in the Rockset recipes repository. The Python code provided will simulate e-commerce order events and write them to Kafka. The following steps will guide you in downloading the code and setting up the Python environment.
git clone git@github.com:rockset/recipes.git
cd recipes/kafka-rockset-integration/
Create and activate the Python virtual environment rockset-kafka-demo and install all the Python dependencies.
python3 -m virtualenv rockset-kafka-demo
source rockset-kafka-demo/bin/activate
pip install -r requirements.txt
Open config.py in your favorite editor and update the following configuration parameters. You will need to enter your Rockset API key.
# Kafka Configuration
KAFKA_TOPIC = 'orders'
KAFKA_BOOTSTRAP_SERVER = ['localhost:9092']
# Rockset Configuration
ROCKSET_API_KEY = '' # Create API Key - https://console.rockset.com/manage/apikeys
ROCKSET_API_SERVER = 'https://api.rs2.usw2.rockset.com'
Creating a Collection in Rockset
We will use the rock CLI tool to manage and query our data in Rockset.
The rock CLI tool was already installed as part of the pip install -r requirements.txt step above. Alternatively, you can install it with the pip3 install rockset command.
Configure the rock CLI client with your Rockset API key.
rock configure --api_key <YOUR-API-KEY>
Create a Rockset collection named orders.
rock create collection orders \
--event-time-field=InvoiceDate \
--event-time-format=seconds_since_epoch
The --event-time-field=InvoiceDate option instructs Rockset to treat a document's InvoiceDate as its _event_time, a special field used to handle time-series data efficiently. Read more about special fields in Rockset and working with event data. Users working with event data in Rockset can set time-based retention policies on their data.
Connecting Rockset to Kafka
Kafka Connect, an open-source component of Apache Kafka, is a framework for connecting Kafka with external systems such as databases, key-value stores, search indexes, and file systems. Rockset provides Kafka Connect for Rockset, a Kafka Connect Sink that helps load data from Kafka into a Rockset collection.
Kafka Connect for Rockset can be run in standalone or distributed mode. For the purposes of this blog, the following steps explain how to set up Kafka Connect for Rockset in standalone mode.
Build
Clone the repo
git clone https://github.com/rockset/kafka-connect-rockset.git
cd kafka-connect-rockset
Build the Maven artifact
mvn package
This will build the jar in the target directory. Its name will be kafka-connect-rockset-[VERSION]-SNAPSHOT-jar-with-dependencies.jar.
Open the file ./config/connect-standalone.properties, which contains the configuration required for Kafka Connect, and update the following configuration values.
bootstrap.servers=localhost:9092
plugin.path=/path/to/kafka-connect-rockset-[VERSION]-SNAPSHOT-jar-with-dependencies.jar
Open the file ./config/connect-rockset-sink.properties to configure the Rockset-specific properties.
topics - the comma-separated list of Kafka topics that this Rockset Kafka Connector should watch
topics=orders
rockset.collection - the Rockset collection the connector will write data into
rockset.collection=orders
rockset.apikey - the API key of your Rockset account
Run
Start Kafka Connect for Rockset and keep the terminal open to monitor progress.
$KAFKA_HOME/bin/connect-standalone.sh ./config/connect-standalone.properties ./config/connect-rockset-sink.properties
Refer to the documentation to set up Kafka Connect for Rockset in distributed mode and for other configuration information.
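If you later move to distributed mode, connectors are registered through the Kafka Connect REST API rather than a properties file passed on the command line. As a rough illustration only, a registration call from Python might look like the sketch below; the connector.class value is an assumption, so confirm it against the kafka-connect-rockset README before using it.
import requests  # pip install requests

# Standard Kafka Connect REST API call; the Connect worker listens on port 8083 by default.
connector = {
    'name': 'rockset-sink',
    'config': {
        'connector.class': 'rockset.RocksetSinkConnector',  # assumed class name -- check the repo README
        'tasks.max': '1',
        'topics': 'orders',
        'rockset.collection': 'orders',
        'rockset.apikey': '<YOUR-API-KEY>',
    },
}

resp = requests.post('http://localhost:8083/connectors', json=connector)
resp.raise_for_status()
print(resp.json())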
Ingesting Data from Kafka into Rockset
Make sure all the components (Kafka cluster, Kafka Connect for Rockset) are up and running.
Verify Zookeeper is up.
jps -l | grep org.apache.zookeeper.server
[process_id] org.apache.zookeeper.server.quorum.QuorumPeerMain
Verify Kafka is up.
jps -l | grep kafka.Kafka
[process_id] kafka.Kafka
Verify Kafka Connect for Rockset is up.
curl localhost:8083/connectors
["rockset-sink"]
Start writing new data into Kafka. The write_data_into_kafka.py script will generate 1 to 300 orders every second across multiple e-commerce customers and countries. (The product set is borrowed from this e-commerce data set, and orders for this example are generated randomly from it.)
python write_data_into_kafka.py
Writing records into Kafka. Kafka Server - localhost:9092, Topic - orders
100 records are written
200 records are written
300 records are written
400 records are written
500 records are written
600 records are written
...
The JSON data for an example order containing two different products is shown below.
{
"InvoiceNo": 14,
"InvoiceDate": 1547523082,
"CustomerID": 10140,
"Country": "India",
"StockCode": 3009,
"Description": "HAND WARMER RED POLKA DOT",
"Quantity": 6,
"UnitPrice": 1.85
}
{
"InvoiceNo": 14,
"InvoiceDate": 1547523082,
"CustomerID": 10140,
"Country": "India",
"StockCode": 3008,
"Description": "HAND WARMER UNION JACK",
"Quantity": 2,
"UnitPrice": 1.85
}
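The actual generator lives in write_data_into_kafka.py in the repo. To give a sense of what it does, here is a minimal sketch of a producer loop that emits records of this shape, assuming the kafka-python client; the field values below are placeholders.
import json
import random
import time

from kafka import KafkaProducer  # pip install kafka-python

KAFKA_TOPIC = 'orders'
KAFKA_BOOTSTRAP_SERVER = ['localhost:9092']

# Serialize each order dict to JSON before it is written to the topic.
producer = KafkaProducer(
    bootstrap_servers=KAFKA_BOOTSTRAP_SERVER,
    value_serializer=lambda doc: json.dumps(doc).encode('utf-8'),
)

invoice_no = 0
while True:
    invoice_no += 1
    # Each invoice carries one or more line items, mirroring the JSON shown above.
    for _ in range(random.randint(1, 3)):
        producer.send(KAFKA_TOPIC, {
            'InvoiceNo': invoice_no,
            'InvoiceDate': int(time.time()),  # seconds since epoch, matching --event-time-format
            'CustomerID': random.randint(10000, 10150),
            'Country': random.choice(['India', 'China', 'United States', 'United Kingdom', 'Canada']),
            'StockCode': random.randint(3000, 3050),
            'Description': 'HAND WARMER RED POLKA DOT',  # placeholder product description
            'Quantity': random.randint(1, 10),
            'UnitPrice': 1.85,
        })
    time.sleep(1)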
In your terminal where Kafka Connect for Rockset is running, observe that the Rockset sink is active and documents are being ingested into your Rockset collection.
[2019-01-08 17:33:44,801] INFO Added doc: {"Invoi... (rockset.RocksetClientWrapper:37)
[2019-01-08 17:33:44,802] INFO Added doc: {"Invoi... (rockset.RocksetClientWrapper:37)
[2019-01-08 17:33:44,838] INFO Added doc: {"Invoi... (rockset.RocksetClientWrapper:37)
...
Running SQL Queries in Rockset
The orders collection is ready for querying as soon as data ingestion starts. We will show some common queries you can run on the e-commerce data.
Open a SQL command line.
rock sql
Let’s try a simple query.
SELECT *
FROM orders
LIMIT 10;
You should see the output of the above query in a tabular format. You will notice the special fields _id and _event_time in the output, as mentioned earlier in the blog.
Let’s dig into the data and get some insights. Note that the results you get will be different from these, because the order data is generated randomly and continuously.
Highest Selling Products
Query
SELECT Description, SUM(Quantity) as QuantitiesSold
FROM "orders"
GROUP BY Description
ORDER BY QuantitiesSold DESC
LIMIT 5;
Output
+------------------------------------+------------------+
| Description | QuantitiesSold |
|------------------------------------+------------------|
| ASSORTED COLOUR BIRD ORNAMENT | 87786 |
| WHITE METAL LANTERN | 65821 |
| WHITE HANGING HEART T-LIGHT HOLDER | 65319 |
| CREAM CUPID HEARTS COAT HANGER | 43978 |
| RED WOOLLY HOTTIE WHITE HEART. | 43260 |
+------------------------------------+------------------+
Time: 0.423s
Number of Orders by Country
Query
SELECT Country, COUNT(DISTINCT InvoiceNo) as TotalOrders
FROM "orders"
GROUP BY Country
ORDER BY TotalOrders DESC;
Output
+----------------+---------------+
| Country | TotalOrders |
|----------------+---------------|
| United States | 4762 |
| India | 3304 |
| China | 3242 |
| United Kingdom | 1610 |
| Canada | 1524 |
+----------------+---------------+
Time: 0.395s
Minute-By-Minute Sales
We’ve used familiar SQL constructs like aggregate functions, GROUP BY, and ORDER BY so far. Let’s try a subquery written using the WITH clause to show the total sales observed every minute.
Query
WITH X AS (
SELECT InvoiceNo, FORMAT_TIMESTAMP('%H:%M', DATETIME(_event_time)) as Minute, SUM(UnitPrice) as OrderValue
FROM "orders"
GROUP BY InvoiceNo, _event_time
)
SELECT Minute, CEIL(SUM(OrderValue)) as TotalSale
FROM X
GROUP BY Minute
ORDER BY Minute;
Output
+------------------+-------------+
| Minute | TotalSale |
|------------------+-------------|
| 2019-01-08 11:52 | 40261.0 |
| 2019-01-08 11:53 | 66759.0 |
| 2019-01-08 11:54 | 72043.0 |
| 2019-01-08 11:55 | 56221.0 |
+------------------+-------------+
Time: 0.451s
You can easily perform other ad hoc SQL queries on the data at any time. The write_data_into_kafka.py script will keep streaming order data continuously; let it run as long as you want to get more data written into the Rockset collection.
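If you prefer querying from Python rather than the rock CLI, you can also send SQL to Rockset's REST query endpoint. The sketch below runs the highest-selling-products query that way; the endpoint path and payload shape are assumptions based on the Rockset API documentation, so verify them before relying on this.
import requests  # pip install requests

ROCKSET_API_KEY = '<YOUR-API-KEY>'
ROCKSET_API_SERVER = 'https://api.rs2.usw2.rockset.com'

query = '''
SELECT Description, SUM(Quantity) as QuantitiesSold
FROM "orders"
GROUP BY Description
ORDER BY QuantitiesSold DESC
LIMIT 5
'''

# Assumed query endpoint and payload shape -- check the Rockset API docs.
response = requests.post(
    ROCKSET_API_SERVER + '/v1/orgs/self/queries',
    headers={'Authorization': 'ApiKey ' + ROCKSET_API_KEY},
    json={'sql': {'query': query}},
)
response.raise_for_status()

for row in response.json()['results']:
    print(row['Description'], row['QuantitiesSold'])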
Joining Kafka Event Data with CSV Data in S3
Let's say that we have customer data from another source that we want to join with our orders data for analysis. With Rockset, we can easily ingest data from a range of data sources and combine it with our Kafka stream using SQL.
The kafka-rockset-integration directory contains a customers.csv file with the CustomerID and AcquisitionSource of each customer. We will store this data on how customers were acquired in Amazon S3 and create a Rockset collection from it.
head customers.csv
CustomerID,AcquisitionSource
10000,Display
10001,AffiliateReferral
10002,OrganicSearch
10003,OrganicSearch
10004,Display
10005,SocialMedia
10006,OrganicSearch
10007,SocialMedia
10008,AffiliateReferral
Upload the customers.csv file to S3 following these instructions for setting up an S3 bucket. From your S3 source, create a Rockset collection named customers, based on customers.csv.
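If you'd rather script the upload than use the AWS console, a short boto3 call is enough; the bucket name below is a placeholder.
import boto3  # pip install boto3

# Upload the CSV to your own bucket; 'my-rockset-demo-bucket' is a placeholder name.
s3 = boto3.client('s3')
s3.upload_file('customers.csv', 'my-rockset-demo-bucket', 'customers.csv')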
Sales by Customer Acquisition Source
Join the real-time orders data coming from Kafka with the customer acquisition data to determine the total sales by customer acquisition source. The following query demonstrates an inner join on the CustomerID field between the orders collection, from Kafka, and the customers collection, from S3.
Query
SELECT C.AcquisitionSource, CEIL(SUM(O.UnitPrice)) as TotalSale
FROM customers AS C JOIN orders AS O ON O.CustomerID = CAST(C.CustomerID AS integer)
GROUP BY C.AcquisitionSource
ORDER BY TotalSale DESC;
Output
+---------------------+-------------+
| AcquisitionSource | TotalSale |
|---------------------+-------------|
| AffiliateReferral | 45779.0 |
| PaidSearch | 42668.0 |
| OrganicSearch | 41467.0 |
| Email | 37040.0 |
| SocialMedia | 36509.0 |
| Display | 34516.0 |
+---------------------+-------------+
Visualize
Now that we’ve run some SQL queries on the order data in Rockset, let’s extend our example. The provided code includes a visualize.py script, which translates Rockset query results into graphical widgets. The script uses the Dash library to plot the results graphically. (You could use data visualization tools like Tableau, Apache Superset, Redash, or Grafana as well.)
Run visualize.py.
python visualize.py
Open the dashboard in your browser.
http://127.0.0.1:8050/
You can see some dashboard widgets plotted using Dash over the Rockset data.
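To give a feel for what visualize.py does without reproducing it here, the following is a minimal Dash app sketch in the same spirit, using the pre-2.0 Dash module layout that was current at the time; the rows are hard-coded where the real script would use Rockset query results.
import dash
import dash_core_components as dcc
import dash_html_components as html

# Hard-coded rows for illustration; the real script would fill these from a Rockset SQL query.
rows = [
    {'Country': 'United States', 'TotalOrders': 4762},
    {'Country': 'India', 'TotalOrders': 3304},
    {'Country': 'China', 'TotalOrders': 3242},
]

app = dash.Dash(__name__)
app.layout = html.Div([
    html.H3('Number of Orders by Country'),
    dcc.Graph(
        id='orders-by-country',
        figure={
            'data': [{
                'type': 'bar',
                'x': [r['Country'] for r in rows],
                'y': [r['TotalOrders'] for r in rows],
            }],
        },
    ),
])

if __name__ == '__main__':
    app.run_server(port=8050)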
Conclusion
With Rockset, you can build apps without writing complex ETL pipelines. Rockset continuously syncs new data as it lands in your data sources without the need for a fixed schema. The Kafka-Rockset integration outlined above allows you to build operational apps and live dashboards quickly and easily, using SQL on real-time event data streaming through Kafka.
Visit our Kafka solutions page for more information on building real-time dashboards and APIs on Kafka event streams.