Live Dashboards on Streaming Data - A Tutorial Using Amazon Kinesis and Rockset

We live in a world where diverse systems (social networks, monitoring, stock exchanges, websites, IoT devices) continuously generate large volumes of data in the form of events, captured in systems like Apache Kafka and Amazon Kinesis. One can perform a wide variety of analyses on these event streams, such as aggregations, filtering, or sampling, either at the record level or over sliding time windows. In this blog post, I will show how Rockset can serve a live dashboard that surfaces analytics on real-time Twitter data ingested into Rockset from a Kinesis stream.

Setting up a Kinesis Stream

The Python code snippet below shows how to create a Kinesis stream programmatically. This can also be achieved through the AWS Console or the AWS CLI.

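import boto3

kinesis = boto3.client('kinesis')  # requires AWS credentials to be present in env
kinesis.create_stream(StreamName='twitter-stream', ShardCount=5)
# create_stream returns before the stream is usable; block until it is ACTIVE
kinesis.get_waiter('stream_exists').wait(StreamName='twitter-stream')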

Writing Tweets to Kinesis

Here, I will be using the Tweepy module to fetch tweets through Twitter's streaming search API. This API lets me specify a list of terms to track (e.g., “music”, “facebook”, “apple”). You need a Twitter developer account to get access to the Twitter Streaming API. The TweetListener below is a StreamListener that is notified whenever a tweet arrives. For each tweet, it writes the tweet data to the Kinesis stream under a random partition key, so tweets are spread across the stream's 5 shards.

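import random
import string

import boto3
# Tweepy 3.x API: StreamListener was removed in Tweepy 4.0
from tweepy import OAuthHandler, Stream, StreamListener

# twitter api credentials
access_token = ...
access_token_secret = ...
consumer_key = ...
consumer_secret = ...

# random 8-character partition keys spread the records across the stream's shards
chars = string.ascii_letters + string.digits
size = 8

class TweetListener(StreamListener):
    def __init__(self, stream_name):
        super().__init__()
        self.kinesis = boto3.client('kinesis')
        self.stream_name = stream_name

    def on_data(self, data):
        # data is the raw JSON string of one message from the stream (usually a tweet)
        record = {
            'Data': data,
            'PartitionKey': ''.join(random.choice(chars) for _ in range(size)),
        }
        self.kinesis.put_records(Records=[record], StreamName=self.stream_name)
        return True  # keep the connection open

auth = OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)

stream = Stream(auth, TweetListener("twitter-stream"))
search_terms = ["music", "facebook", "apple"]
stream.filter(track=search_terms)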
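The listener only picks a random partition key; Kinesis decides which shard receives each record. Per the Kinesis documentation, the partition key is hashed with MD5 into a 128-bit integer, and the record goes to the shard whose hash key range contains that value. The sketch below simulates this locally, assuming the 5 shards split the hash key space into equal contiguous ranges (the usual layout right after creating a stream with a fixed ShardCount). The helper names are hypothetical, and the code makes no AWS calls.

```python
import hashlib
import random
import string
from collections import Counter

HASH_SPACE = 2 ** 128  # MD5 digests are interpreted as 128-bit unsigned integers


def shard_for_key(partition_key: str, shard_count: int) -> int:
    """Index of the shard whose (assumed equal-width) hash key range
    contains the MD5 hash of partition_key."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    hash_value = int.from_bytes(digest, "big")
    range_width = HASH_SPACE // shard_count
    # min() folds the remainder at the very top of the space into the last shard
    return min(hash_value // range_width, shard_count - 1)


def random_partition_key(size: int = 8) -> str:
    """Random alphanumeric key, like the one TweetListener generates."""
    chars = string.ascii_letters + string.digits
    return "".join(random.choice(chars) for _ in range(size))


if __name__ == "__main__":
    counts = Counter(shard_for_key(random_partition_key(), 5) for _ in range(10_000))
    print(sorted(counts.items()))  # each count should land near 2,000
```

Because MD5 output is close to uniform, random keys give each shard a roughly equal share of the write load, which is the point of not reusing a fixed key.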

Connecting Kinesis to Rockset

The following snippet shows how to create a collection in Rockset backed by a Kinesis stream. Note: you first need to create an Integration (an object that represents your AWS credentials) and set up permissions on the Kinesis stream that allow Rockset to perform the necessary read operations on it.

from rockset import Client

rs = Client(api_key=...)

# the Integration holds the AWS credentials Rockset uses to read the stream
aws_integration = rs.Integration.retrieve(...)
sources = [
    rs.Source.kinesis(
        stream_name="twitter-stream",
        integration=aws_integration)]
twitter_kinesis_demo = rs.Collection.create("twitter-kinesis-demo", sources=sources)

Alternatively, collections can be created from the Rockset console, as shown below.

[Image: creating a Kinesis-backed collection in the Rockset console]
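Whichever way the collection is created, the Integration's AWS credentials need read access to the stream. As a sketch, the function below builds an IAM policy document granting the four standard Kinesis consumer read actions (ListShards, DescribeStream, GetShardIterator, GetRecords) on a single stream. The function name, region, and account ID are placeholders, and the assumption that these four actions are exactly what Rockset needs should be checked against Rockset's integration documentation before attaching the policy.

```python
import json


def kinesis_read_policy(region: str, account_id: str, stream_name: str) -> dict:
    """Build an IAM policy document allowing read access to one Kinesis stream.

    Assumption: these four actions cover what a Kinesis consumer such as
    Rockset requires; verify against Rockset's documentation.
    """
    # Kinesis stream ARNs have the form arn:aws:kinesis:<region>:<account>:stream/<name>
    stream_arn = f"arn:aws:kinesis:{region}:{account_id}:stream/{stream_name}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "kinesis:ListShards",
                    "kinesis:DescribeStream",
                    "kinesis:GetShardIterator",
                    "kinesis:GetRecords",
                ],
                "Resource": stream_arn,
            }
        ],
    }


if __name__ == "__main__":
    # Placeholder region and account ID, for illustration only
    policy = kinesis_read_policy("us-west-2", "123456789012", "twitter-stream")
    print(json.dumps(policy, indent=2))
```

Scoping the Resource to the one stream, rather than `*`, keeps the credentials from reading any other stream in the account.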

Building the Live Dashboard

Now that I have a producer writing tweets to a Kinesis stream and a collection to ingest them into Rockset, I can build a dashboard on top of this collection. My dashboard has two views.

Tweets View

The first view displays analytics on all the tweets coming into Rockset and has 3 panels, each of which makes its own query to Rockset.

[Image: the Tweets view of the live dashboard]

Live Tweets

The Live Tweets panel constantly refreshes to show the latest tweets appearing in the collection. At a fixed refresh interval, it runs the query below, which selects the fields shown in the feed and filters out tweets older than one minute.

SELECT t.timestamp_ms,
       t.created_at AS created_at,
       t.text AS text,
       t.user.screen_name AS screen_name
FROM "twitter-kinesis-demo" t
WHERE CAST(timestamp_ms AS INT) > UNIX_MILLIS(current_timestamp() - minutes(1))
ORDER BY timestamp_ms DESC
LIMIT 100;
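Twitter's v1.1 streaming payloads carry timestamp_ms as a string of epoch milliseconds, which is why the query casts it to an integer before comparing it with UNIX_MILLIS(...). To make the query's logic concrete, the sketch below applies the same one-minute filter, newest-first ordering, projection, and limit to an in-memory list of tweet dicts. The function name and sample data are hypothetical; this mirrors the SQL and is not how Rockset executes it.

```python
import time
from typing import List, Optional


def live_tweets(tweets: List[dict], window_ms: int = 60_000, limit: int = 100,
                now_ms: Optional[int] = None) -> List[dict]:
    """Keep tweets newer than now - window_ms, newest first, at most `limit`."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)  # current time in epoch milliseconds
    cutoff = now_ms - window_ms
    # timestamp_ms arrives as a string, so cast it just like the SQL does
    recent = [t for t in tweets if int(t["timestamp_ms"]) > cutoff]
    recent.sort(key=lambda t: int(t["timestamp_ms"]), reverse=True)
    # Project only the fields the feed displays
    return [
        {
            "timestamp_ms": t["timestamp_ms"],
            "created_at": t["created_at"],
            "text": t["text"],
            "screen_name": t["user"]["screen_name"],
        }
        for t in recent[:limit]
    ]


if __name__ == "__main__":
    now = 1_600_000_100_000  # fixed "current time" for a reproducible example
    sample = [
        {"timestamp_ms": str(now - 5_000), "created_at": "-", "text": "new",
         "user": {"screen_name": "a"}},
        {"timestamp_ms": str(now - 120_000), "created_at": "-", "text": "old",
         "user": {"screen_name": "b"}},
    ]
    print([t["text"] for t in live_tweets(sample, now_ms=now)])  # ['new']
```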

Top Hashtags

The Top Hashtags panel shows the trending hashtags, i.e., those that appeared in the most tweets in the last hour, along with their tweet counts. The query first collects all hashtags from the last hour, lowercased, into a temporary relation latest_hashtags defined in a WITH clause; the main query then groups by hashtag and orders by tweet_count to obtain the trending hashtags.

WITH latest_hashtags AS
  (SELECT lower(ht.text) AS hashtag
   FROM "twitter-kinesis-demo" t,
        unnest(t.extended_tweet.entities.hashtags) ht
   WHERE CAST(t.timestamp_ms AS INT) > UNIX_MILLIS(current_timestamp() - hours(1)))
SELECT count(hashtag) AS tweet_count,
       hashtag
FROM latest_hashtags
GROUP BY hashtag
ORDER BY tweet_count DESC
LIMIT 10;
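The same computation can be expressed as an in-memory Python sketch, which may help when checking the query against sample data: lowercase every hashtag from tweets in the last hour (the WITH clause), then count and rank them (the GROUP BY, ORDER BY, and LIMIT). Like the query, it reads hashtags from extended_tweet.entities.hashtags; in this sketch, tweets without that field simply contribute no hashtags. The function name and sample data are hypothetical.

```python
from collections import Counter


def top_hashtags(tweets, now_ms, window_ms=3_600_000, limit=10):
    """Return up to `limit` (tweet_count, hashtag) pairs for lowercased hashtags
    seen in tweets from the last `window_ms` milliseconds, most frequent first."""
    cutoff = now_ms - window_ms
    counts = Counter()
    for t in tweets:
        if int(t["timestamp_ms"]) <= cutoff:
            continue  # outside the one-hour window
        # Same path as the SQL's UNNEST; missing fields yield no hashtags here
        hashtags = t.get("extended_tweet", {}).get("entities", {}).get("hashtags", [])
        counts.update(ht["text"].lower() for ht in hashtags)
    return [(n, tag) for tag, n in counts.most_common(limit)]
```

Lowercasing before counting is what lets "#Music" and "#music" collapse into a single trend.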

Incoming Tweets

The last panel is a chart that shows the rate at which users are tweeting. Every 2 seconds, the panel queries the number of tweets received in the previous 2 seconds and plots that count as a new data point.

SELECT count(*)
FROM "twitter-kinesis-demo"
WHERE cast(timestamp_ms AS INT) > unix_millis(current_timestamp() - seconds(2));
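On the dashboard side, the chart is fed by re-running the count query on a fixed 2-second timer and appending each result as a new point. A minimal sketch of that loop follows. Here run_count_query is a hypothetical callable standing in for whatever client call executes the SQL above, which keeps the sketch independent of any particular Rockset client; the clock and sleep functions are injectable so the loop can be exercised without real waiting.

```python
import time
from typing import Callable, List, Tuple


def poll_tweet_rate(
    run_count_query: Callable[[], int],
    samples: int,
    interval_s: float = 2.0,
    clock: Callable[[], float] = time.time,
    sleep: Callable[[float], None] = time.sleep,
) -> List[Tuple[float, int]]:
    """Collect `samples` (timestamp, tweet_count) points, one every interval_s.

    run_count_query is a hypothetical callable that runs the
    "tweets in the last 2 seconds" query and returns the count.
    """
    points = []
    for i in range(samples):
        points.append((clock(), run_count_query()))
        if i < samples - 1:
            sleep(interval_s)  # wait for the next refresh tick
    return points
```

Keeping the polling interval equal to the query's 2-second window means each tweet is counted in roughly one data point: a shorter interval would count some tweets twice, and a longer one would skip some.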

Hashtags View

The second view displays analytics on tweets with a specific hashtag and also has 3 panels: Live Tweets, Related Hashtags, and Influencers. As in the first view, each panel makes its own query to Rockset; the difference is that every query is narrowed to a selected hashtag of interest (music in the queries below).

[Image: the Hashtags view of the live dashboard]

Influencers

Since we have narrowed our analysis to a single hashtag, it is interesting to see who the most influential users around this topic are. We define influencers as the users with the highest follower counts among those tweeting the hashtag of interest.

SELECT t.user.screen_name AS screen_name,
       MAX(t.user.followers_count) AS fc
FROM "twitter-kinesis-demo" t
WHERE 'music' IN
    (SELECT hashtags.text
     FROM unnest(t.entities.hashtags) hashtags)
GROUP BY t.user.screen_name
ORDER BY fc DESC
LIMIT 5;
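The Influencers logic can be mirrored in plain Python as a sanity check: keep tweets whose entities.hashtags include the hashtag of interest, record each user's follower count, and return the five largest. This sketch keeps the highest follower count seen per user, so a user whose count changed between tweets appears only once. Matching is exact and case-sensitive, as in the query. The function name and sample data are hypothetical.

```python
import heapq


def influencers(tweets, hashtag="music", limit=5):
    """Return up to `limit` (screen_name, followers_count) pairs for users who
    tweeted `hashtag`, ordered by follower count, highest first."""
    followers = {}
    for t in tweets:
        tags = {ht["text"] for ht in t.get("entities", {}).get("hashtags", [])}
        if hashtag not in tags:
            continue  # tweet does not carry the hashtag of interest
        user = t["user"]
        name = user["screen_name"]
        # Keep the largest follower count seen for each user
        followers[name] = max(followers.get(name, 0), user["followers_count"])
    return heapq.nlargest(limit, followers.items(), key=lambda kv: kv[1])
```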

Related Hashtags

This panel is similar to the Top Hashtags panel in the Tweets view. It shows the hashtags that co-occur most often with our hashtag of interest.

SELECT hashtags.text AS hashtag,
       count(*) AS occurrence_count
FROM "twitter-kinesis-demo" t,
     unnest(t.entities.hashtags) hashtags
WHERE 'music' IN
    (SELECT ht.text
     FROM unnest(t.entities.hashtags) ht)
  AND hashtags.text != 'music'
GROUP BY hashtags.text
ORDER BY occurrence_count DESC
LIMIT 10;
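The query's co-occurrence logic can also be written as an in-memory Python sketch: for each tweet that carries the hashtag of interest, count every other hashtag on that tweet, then rank. As in the SQL, matching is case-sensitive and the selected hashtag itself is excluded from the results. The function name and sample data are hypothetical.

```python
from collections import Counter


def related_hashtags(tweets, hashtag="music", limit=10):
    """Count hashtags that appear on the same tweets as `hashtag`, excluding
    `hashtag` itself; return (hashtag, occurrence_count) pairs, most common first."""
    counts = Counter()
    for t in tweets:
        tags = [ht["text"] for ht in t.get("entities", {}).get("hashtags", [])]
        if hashtag in tags:
            # Every other hashtag on this tweet co-occurs with the selected one
            counts.update(tag for tag in tags if tag != hashtag)
    return counts.most_common(limit)
```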

Live Tweets

The Live Tweets panel is very similar to the one in the Tweets view. The only difference is an additional filter that keeps only tweets containing our hashtag of interest, the same filter used by the other two panels in the Hashtags view.

Where to Go from Here

I built this example live dashboard to illustrate real-time analytics on data from Kinesis streams, but Rockset also supports Kafka as a streaming source, along with standard visualization tools such as Tableau, Apache Superset, Redash, and Grafana.

You can refer to the full source code for this example here, if you are interested in building on streaming data using Rockset and Kinesis. Happy building!
