Building a Live Dashboard on Streaming Data Using Amazon Kinesis and Rockset

We live in a world where diverse systems—social networks, monitoring, stock exchanges, websites, IoT devices—all continuously generate volumes of data in the form of events. One can perform a wide variety of analyses, like aggregations, filtering, or sampling, on these event streams, either at the record level or over sliding time windows. In this blog, I will show how Rockset can serve a live dashboard, which surfaces analytics on real-time Twitter data ingested into Rockset from an Amazon Kinesis stream.

Setting up a Kinesis Stream

The Python code snippet below shows how to create a Kinesis stream programmatically. This can also be achieved through the AWS Console or the AWS CLI.

import boto3
kinesis = boto3.client('kinesis') # requires AWS credentials to be present in env
kinesis.create_stream(StreamName='twitter-stream', ShardCount=5)
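
The new stream takes a moment to transition to the ACTIVE state before it will accept writes. As a small sketch, boto3's built-in stream_exists waiter (which polls DescribeStream until the stream is ACTIVE) can be used to block until then:

# block until the stream is ACTIVE and ready to accept records
waiter = kinesis.get_waiter('stream_exists')
waiter.wait(StreamName='twitter-stream')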

Writing Tweets to Kinesis

Here, I will be using the Tweepy module to fetch tweets through its streaming search API. This API lets me specify a list of terms to include in my search (e.g. “music”, “facebook”, “apple”). You need a Twitter developer account to get access to the Twitter Streaming API. I register a StreamListener that is notified whenever a new tweet arrives. Upon receiving a tweet, the listener writes the tweet data to the Kinesis stream under a randomly generated partition key, which spreads the tweets across the stream's 5 shards.

import random
import string

import boto3
from tweepy import OAuthHandler, Stream, StreamListener

# twitter api credentials
access_token=...
access_token_secret=...
consumer_key=...
consumer_secret=...

class TweetListener(StreamListener):
    def __init__(self, stream_name):
        super().__init__()
        self.kinesis = boto3.client('kinesis')
        self.stream_name = stream_name

    def on_data(self, data):
        record = {}
        record['Data'] = data
        # a random partition key spreads tweets across the stream's shards
        record['PartitionKey'] = ''.join(random.choice(string.ascii_letters) for _ in range(10))
        self.kinesis.put_records(Records=[record], StreamName=self.stream_name)
        return True  # keep the stream connection open

auth=OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)

stream=Stream(auth, TweetListener("twitter-stream"))
search_terms=["music", "facebook", "apple"]
stream.filter(track=search_terms)
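
Before moving on, it can be worth confirming that tweets are actually landing in the stream. A quick spot-check sketch that reads a few records back from the first shard (starting at TRIM_HORIZON, i.e. the oldest available record):

import boto3

kinesis = boto3.client('kinesis')
shards = kinesis.describe_stream(StreamName='twitter-stream')['StreamDescription']['Shards']
iterator = kinesis.get_shard_iterator(
    StreamName='twitter-stream',
    ShardId=shards[0]['ShardId'],
    ShardIteratorType='TRIM_HORIZON')['ShardIterator']
records = kinesis.get_records(ShardIterator=iterator, Limit=5)['Records']
for r in records:
    print(r['Data'][:140])   # raw tweet JSON bytes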

Connecting Kinesis to Rockset

The following snippet shows how to create a collection in Rockset, backed by a Kinesis stream. Note: you need to create an Integration (an object that represents your AWS credentials) and set up permissions on the Kinesis stream that allow Rockset to read from it.

from rockset import Client, Q, F
rs=Client(api_key=...)

aws_integration=rs.Integration.retrieve(...)
sources=[
    rs.Source.kinesis(
        stream_name="twitter-stream",
        integration=aws_integration)]
twitter_kinesis_demo=rs.Collection.create("twitter-kinesis-demo", sources=sources)
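
Once the collection is created, it is worth verifying that documents are flowing in before building the dashboard. A minimal sketch, assuming the legacy Rockset Python client, where rs.sql accepts a Q query and the returned cursor exposes results():

# fetch a handful of documents to confirm tweets are being ingested
docs = rs.sql(Q('SELECT * FROM "twitter-kinesis-demo" LIMIT 5')).results()
for doc in docs:
    print(doc.get('text'))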

Alternatively, collections can be created from the Rockset console, as shown below.

[Screenshot: creating a Kinesis-backed collection in the Rockset console]

Building the Live Dashboard

Now that I have a producer writing tweets to a Kinesis stream and a collection to ingest them into Rockset, I can build a dashboard on top of this collection. My dashboard has two views.

Tweets View

The first view displays analytics on all the tweets coming into Rockset and has 3 panels, each of which makes its own query to Rockset.

[Screenshot: Tweets view of the dashboard]

Live Tweets

The Live Tweets panel constantly refreshes to show the latest tweets appearing in the collection. A query is issued at a fixed refresh interval to fetch tweets posted within the last minute. Here, I select the fields needed for the feed and filter out tweets older than a minute.

SELECT t.timestamp_ms,
   t.created_at AS created_at,
   t.text AS text,
   t.user.screen_name AS screen_name
FROM "twitter-kinesis-demo" t
WHERE CAST(t.timestamp_ms AS INT) > UNIX_MILLIS(current_timestamp() - minutes(1))
ORDER BY t.timestamp_ms DESC
LIMIT 100;
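
How the panel triggers this query is up to the dashboard frontend. As a rough sketch, a Python loop that reruns the query every few seconds might look like the following (reusing the rs client and Q from earlier; render_live_tweets is a hypothetical placeholder for however the dashboard redraws the panel):

import time

LIVE_TWEETS_SQL = '''
SELECT t.timestamp_ms, t.created_at, t.text, t.user.screen_name AS screen_name
FROM "twitter-kinesis-demo" t
WHERE CAST(t.timestamp_ms AS INT) > UNIX_MILLIS(current_timestamp() - minutes(1))
ORDER BY t.timestamp_ms DESC
LIMIT 100
'''

while True:
    tweets = rs.sql(Q(LIVE_TWEETS_SQL)).results()  # rerun the panel query
    render_live_tweets(tweets)                     # hypothetical UI update
    time.sleep(5)                                  # refresh interval in seconds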

Top Hashtags

The Top Hashtags panel shows trending hashtags: the ones that appeared in the most tweets over the last hour, along with the associated tweet count. In this query, all hashtags appearing in the last hour are collected into a temporary relation latest_hashtags using a WITH clause. The main query then groups latest_hashtags by hashtag and orders by tweet_count to obtain the trending hashtags.

WITH latest_hashtags AS
  (SELECT lower(ht.text) AS hashtag
   FROM "twitter-kinesis-demo" t,
        unnest(t.extended_tweet.entities.hashtags) ht
   WHERE CAST(t.timestamp_ms AS INT) > UNIX_MILLIS(current_timestamp() - hours(1)))
SELECT count(hashtag) AS tweet_count,
       hashtag
FROM latest_hashtags
GROUP BY hashtag
ORDER BY tweet_count DESC
LIMIT 10;

Incoming Tweets

The last panel is a chart which shows the rate at which users are tweeting. We obtain data points for the number of incoming tweets every 2 seconds and plot them in a chart.

SELECT count(*)
FROM "twitter-kinesis-demo"
WHERE CAST(timestamp_ms AS INT) > UNIX_MILLIS(current_timestamp() - seconds(2));
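
To turn these counts into a chart, the dashboard records one (timestamp, count) point per poll. A short sketch along the same lines as the Live Tweets loop, again assuming the rs client and Q from earlier:

import time
from datetime import datetime

INCOMING_SQL = '''
SELECT count(*) AS tweet_count
FROM "twitter-kinesis-demo"
WHERE CAST(timestamp_ms AS INT) > UNIX_MILLIS(current_timestamp() - seconds(2))
'''

chart_points = []   # (timestamp, count) pairs plotted by the chart
while True:
    rows = rs.sql(Q(INCOMING_SQL)).results()
    chart_points.append((datetime.utcnow(), rows[0]['tweet_count']))
    time.sleep(2)   # one data point every 2 seconds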

Hashtags View

The second view displays analytics on tweets with a specific hashtag and also has 3 panels: Live Tweets, Related Hashtags, and Influencers. Each panel in the dashboard makes a query to Rockset. This is very similar to the first dashboard view but narrows the analytics to a selected hashtag of interest.

[Screenshot: Hashtags view of the dashboard]

Influencers

As we have narrowed our analysis to a single hashtag, it would be interesting to see who the most influential users are around this topic. For this, we define influencers as users with the highest number of followers who are tweeting the hashtag of interest.

SELECT t.user.screen_name,
       t.user.followers_count AS fc
FROM "twitter-kinesis-demo" t
WHERE 'music' IN
    (SELECT hashtags.text
     FROM unnest(t.entities.hashtags) hashtags)
GROUP BY t.user.screen_name,
         t.user.followers_count
ORDER BY t.user.followers_count DESC
LIMIT 5;
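
The hashtag is hard-coded as 'music' in these examples; the actual dashboard substitutes whichever hashtag the user has selected. One way to do that without string interpolation is Rockset's named query parameters through the REST query API. A hedged sketch (the endpoint URL below assumes the usw2 region; adjust it for your account):

import requests

ROCKSET_API_KEY = '...'
QUERY_URL = 'https://api.rs2.usw2.rockset.com/v1/orgs/self/queries'

sql = '''
SELECT t.user.screen_name, t.user.followers_count AS fc
FROM "twitter-kinesis-demo" t
WHERE :hashtag IN (SELECT ht.text FROM unnest(t.entities.hashtags) ht)
GROUP BY t.user.screen_name, t.user.followers_count
ORDER BY t.user.followers_count DESC
LIMIT 5
'''

resp = requests.post(
    QUERY_URL,
    headers={'Authorization': 'ApiKey ' + ROCKSET_API_KEY},
    json={'sql': {'query': sql,
                  'parameters': [{'name': 'hashtag', 'type': 'string', 'value': 'music'}]}})
influencers = resp.json()['results']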

Related Hashtags

This section is similar to the Top Hashtags panel we saw in the Tweets view of the dashboard. It shows the hashtags that most often co-occur with our hashtag of interest.

SELECT hashtags.text AS hashtag,
       count(*) AS occurrence_count
FROM "twitter-kinesis-demo" t,
     unnest(t.entities.hashtags) hashtags
WHERE 'music' IN
    (SELECT ht.text
     FROM unnest(t.entities.hashtags) ht)
  AND hashtags.text != 'music'
GROUP BY hashtags.text
ORDER BY occurrence_count DESC
LIMIT 10;

Live Tweets

The Live Tweets panel is very similar to the one we saw in the Tweets view of the dashboard. The only difference is an additional filter that restricts the results to tweets containing our hashtag of interest; it is the same filter already used in the other two panels of the Hashtags view, as shown in the sketch below.
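
For completeness, the filtered query might look like the following, combining the Live Tweets query from the Tweets view with the hashtag filter used above (hashtag again hard-coded as 'music'):

SELECT t.timestamp_ms,
       t.created_at AS created_at,
       t.text AS text,
       t.user.screen_name AS screen_name
FROM "twitter-kinesis-demo" t
WHERE CAST(t.timestamp_ms AS INT) > UNIX_MILLIS(current_timestamp() - minutes(1))
  AND 'music' IN
    (SELECT ht.text
     FROM unnest(t.entities.hashtags) ht)
ORDER BY t.timestamp_ms DESC
LIMIT 100;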

Where to Go from Here

While I created this simple dashboard to illustrate how live analytics could be performed on data from Kinesis streams, Rockset supports millisecond-latency SQL that powers more complex, responsive dashboards as well.

You can refer to the full source code for this example here, if you are interested in building on streaming data using Rockset and Kinesis. Happy building!
