SQL API for Real-Time Kafka Analytics in 3 Steps
January 17, 2020
In this blog we will set up a real-time SQL API on Kafka using AWS Lambda and Rockset.
At the time of writing (in early 2020) the San Francisco 49ers are doing remarkably well! To honor their success, we will focus on answering the following question.
What are the most popular hashtags in tweets that mentioned the 49ers in the last 20 minutes?
Because Twitter moves fast, we will only look at very recent tweets.
We will also show how to make the same API and query work for any of the teams in the NFL.
There are 3 steps to get started.
- Load the Twitter Stream into Rockset.
- Get a fast SQL Table
- Power your API
Step 0: Setup
This tutorial assumes you have already set up a Rockset Account, an AWS account (for the lambda), and a Twitter Development Account (for the twitter stream).
If you already have data you want to build an API on skip to Step 1.
The Kafka set up instructions for the remainder of this tutorial are for Confluent Kafka. Follow the directions below to set it up if you don’t have a Kafka installation to experiment with.
Optional: Set up Confluent Kafka locally
First you have to set up Confluent Hub Platform, the Confluent Hub client and the Confluent CLI.
# Start up a local version of Kafka.
confluent local start
You should see several services start. Navigate to the Kafka UI at http://localhost:9021/ to make sure it works properly.
Setting up the Twitter Connector
Install Kafka Connect Twitter as well as the Rockset Kafka Connector.
confluent-hub install jcustenborder/kafka-connect-twitter:0.3.33
confluent-hub install rockset/kafka-connect-rockset:1.2.1
# restart Kafka if you are running locally
confluent local stop
confluent local start
Now navigate to the Kafka UI, select your cluster, and create a topic called "rockset-kafka". Then add the Twitter connector to your cluster, and point them both to the topic you just made. You will need to put in your twitter-dev account information for the Twitter Connector. Subscribe to the following topics to see football related results.
49ers Broncos packers Raiders Giants Redskins MiamiDolphins Buccaneers Seahawks Nyjets Ravens Bengals Titans HoustanTexans Dallascowboys Browns Colts AtlantaFalcons Vikings Lions Patriots RamsNFL steelers Jaguars BuffaloBills Chiefs Saints AZCardinals Panthers Eagles ChicagoBears Chargers NFL SportsNews American football Draft ESPN
Click create. You should now see many tweets flowing through the topic "rockset-kafka".
Troubleshooting: if you have issues, double check your credentials. You can also check the Kafka logs.
# Locally
confluent local log connect
Step 1: Load the Kafka Stream into Rockset
Setting up the Rockset Connector
- If you haven’t already, install the Rockset Kafka Connector on your Kafka cluster, and point it to “rockset-kafka”
- Log in to Rockset Console and navigate to the Create Kafka Integration at Manage > Integrations > Kafka.
- Name your integration, set the content type to AVRO, and add a topic called "rockset-kafka".
- Follow the Kafka connector installation instructions on the Integration Details page that you land on when you finish creating the integration.
- There are further instructions at https://docs.rockset.com/apache-kafka/.
Optional: Local Kafka If you are using a local standalone Kafka, skip to Step 3 of the instructions on the installation page. Set the Schema Registry Instance to be http://localhost:8081 (which is the default port of the schema-registry service) and click "Download Rockset Sink Connector Properties" to get a properties file that should give you all of the variables you need to set up to use Rockset with Kafka.
Here is a sample properties file that should look similar to yours:
name=twitter-kafka
connector.class=rockset.RocksetSinkConnector
tasks.max=10
topics=rockset-kafka
rockset.task.threads=5
rockset.apiserver.url=https://api.rs2.usw2.rockset.com
rockset.integration.key=kafka://<secret>@api.rs2.usw2.rockset.com
format=AVRO
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
On the Kafka UI connect page, select your cluster and click Add Connector. Select the RocksetSinkConnector. Finally, fill in all of the properties from the connect-rockset-sink.properties file you downloaded from the Rockset Console.
Once you are done, click Create. You are now ready to create a Kafka Collection in Rockset!
Creating a Kafka Collection in Rockset
Navigate to the Rockset Console and go to Collections > Create Collection > Kafka. Select the integration you created in the previous step and click Start.
Name your collection, and select the topic "rockset-kafka". Wait for a few moments and you should see documents coming into the preview. This means your connectors are configured correctly! Go ahead and set a retention policy and create your collection.
Congratulations! You have successfully created a Kafka collection in Rockset.
Step 2: Get a fast SQL Table.
Our driving question is: What are the most popular hashtags in tweets that mentioned the 49ers in the last 20 minutes?
Let's explore the data a little bit in the Rockset Console and develop our query.
If we look on the left side of the console, we see that the twitter data is a stream of huge json objects nested in complex ways. The fields we care about most to answer our driving question are:
- entities.user_mentions (an array of mentions for a particular tweet)
- entities.hashtags (an array of hashtags in a particular tweet)
We also care about the following two system fields
_id
(unique field for each tweet)_event_time
(time that a tweet was ingested into Rockset)
To answer our question, we want to find all tweets where at least one user_mention is @49ers. Then we need to aggregate over all hashtags to count how many of each one there is.
But how do we filter and aggregate over nested arrays? Enter the UNNEST
command. Unnest takes a single row in our table that contains an array (of length n) and multiplies it into n rows that each contain an element of the original array.
We thus want to unnest mentions and hashtags, and filter by mentions.
SELECT
h.hashtags.text hashtag
FROM
commons."twitter-kafka" t,
unnest(t.entities.hashtags hashtags) h,
unnest(t.entities.mentions mentions) mt
WHERE
mt.mentions.screen_name = '49ers'
LIMIT 10;
This returns all of the hashtags in tweets mentioning the 49ers. Let’s aggregate over the hashtags, calculate a count, then sort by the count.
SELECT
h.hashtags.text hashtag,
count(t._id) "count"
FROM
commons."twitter-kinesis" t,
unnest(t.entities.user_mentions mentions) mt,
unnest(t.entities.hashtags hashtags) h
WHERE
mt.mentions.screen_name = '49ers'
GROUP BY
h.hashtags.text
ORDER BY
"count" DESC;
Finally we filter to only include tweets in the last 20 minutes. Furthermore, we replace '49ers' with a Rockset Query Parameter. This will enable us to use different query parameters from the Rockset REST API.
SELECT
h.hashtags.text hashtag,
count(t._id) "count",
UNIX_MILLIS(MAX(t._event_time)) time
FROM
commons."twitter-kinesis" t,
unnest(t.entities.user_mentions mentions) mt,
unnest(t.entities.hashtags hashtags) h
WHERE
t."_event_time" > CURRENT_TIMESTAMP() - INTERVAL 20 minute
and mt.mentions.screen_name = :team
GROUP BY
h.hashtags.text
ORDER BY
"count" DESC;
Go 49ers!
Step 3: Power your API
We can already execute queries over HTTP with Rockset’s REST API. The problem is that using the REST API requires you to pass a secret API key. If we are creating a public dashboard, we don’t want to reveal our API key. We also don’t want to expose our Rockset account to a DOS attack, and control account expenses.
The solution is to use an AWS Lambda to hide the API key and to set a Reserve Concurrency to limit the amount of compute we have to use. In the following section, we will go through the process of writing a Node.js lambda using the Rockset Node.js API.
Writing a lambda
Writing the lambda is quick and dirty using the rockset-node-client v2. We just run the query on the query route and pass in the query string parameters. This should execute the desired query with the desired parameters. Rockset automatically handles escaping parameters for us, so we don’t need to worry about that.
- Make sure you have nodejs 12.x and npm installed
- In an empty directory, run
npm i rockset
- Paste the lambda code below into
index.js
in the root of the directory
const rocksetConfigure = require("rockset").default;
const APIKEY = process.env.ROCKSET_APIKEY;
const API_ENDPOINT = "https://api.rs2.usw2.rockset.com";
const client = rocksetConfigure(APIKEY, API_ENDPOINT);
const wrapResult = res => ({
statusCode: 200,
headers: {
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Credentials": true
},
body: JSON.stringify(res),
isBase64Encoded: false
});
const wrapErr = mes => ({
statusCode: 400,
errorMessage: JSON.stringify(mes),
isBase64Encoded: false
});
const query = `
SELECT
h.hashtags.text hashtag,
Count(t._id) "count",
UNIX_MILLIS(MAX(t._event_time)) time
FROM
commons."twitter-kinesis" t,
unnest(t.entities.user_mentions mentions) mt,
unnest(t.entities.hashtags hashtags) h
WHERE
t."_event_time" > CURRENT_TIMESTAMP() - INTERVAL 20 minute
and mt.mentions.screen_name = :team
GROUP BY
h.hashtags.text
ORDER BY
"count" DESC;
`;
exports.handler = async (event, context) =>
event.queryStringParameters && event.queryStringParameters.param
? client.queries
.query({
sql: {
query,
parameters: [
{
name: "team",
type: "string",
value: event.queryStringParameters.param
}
]
}
})
.then(wrapResult)
: wrapErr("Need parameters for query");
However, we need to package the rockset dependency along with the lambda code in order to deploy to aws. In order to do so, we zip our whole directory, including node_modules
, and use that to deploy.
Our final directory structure looks like:
├── index.js
├── node_modules
└── package-lock.json
Deploying our lambda to AWS
- First we will create a lambda. Go to your AWS console, go to the lambda tool.
- Click "Create Function", then click "From Scratch"
- Select a runtime of Node.js 12.x.
- Upload the zip we created in the previous step.
- Add an environment variable called ROCKSET_APIKEY, and set it to your API key
-
Execute the lambda with the following test json.
{ "queryStringParameters": { "param": "49ers" } }
-
You should see a green run with an output similar to the following.
{ "statusCode":200, "headers":{ "Access-Control-Allow-Origin":"*", "Access-Control-Allow-Credentials":true }, "body":"{\"results\":[{\"text\":\"to-mah-to\", ..." . . . }
- Finally, make sure to set a Reserve Concurrency to limit costs associated with the lambda.
Congratulations! If the test JSON returned the correct output, you set up your lambda correctly. We can now move on to configuring an API gateway to use this lambda.
Configure an API Gateway
Next we will configure an API Gateway to receive api requests and pass them to our lambda. There are several tutorials that describe this process in detail.
Once you have set up your api, you can send a request directly from your browser, or use codepen. Just modify the lambda URL to be your lambda and you should see some result tweets!
Conclusion
In this tutorial, we created a SQL API on Kafka in 3 easy steps that let us slice the Twitter Streaming API in real time.
See the final result below!