Live Dashboards on Streaming Data - A Tutorial Using Amazon Kinesis and Rockset
December 20, 2018
We live in a world where diverse systems—social networks, monitoring, stock exchanges, websites, IoT devices—all continuously generate volumes of data in the form of events, captured in systems like Apache Kafka and Amazon Kinesis. One can perform a wide variety of analyses, like aggregations, filtering, or sampling, on these event streams, either at the record level or over sliding time windows. In this blog, I will show how Rockset can serve a live dashboard, which surfaces analytics on real-time Twitter data ingested into Rockset from a Kinesis stream.
Setting up a Kinesis Stream
The Python code snippet below shows how to create a Kinesis stream programmatically. This can also be achieved through the AWS Console or the AWS CLI.
import boto3
kinesis = boto3.client('kinesis') # requires AWS credentials to be present in env
kinesis.create_stream(StreamName='twitter-stream', ShardCount=5)
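Since create_stream is asynchronous, the stream takes a moment to become ACTIVE before it will accept writes. Optionally, boto3's built-in waiter can block until the stream is ready:
# block until the newly created stream exists and is ACTIVE
kinesis.get_waiter('stream_exists').wait(StreamName='twitter-stream')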
Writing Tweets to Kinesis
Here, I will be using the Tweepy module to fetch tweets through a streaming search API. This API allows me to specify a list of terms that I want to include in my search (e.g. “music”, “facebook”, “apple”). You need a Twitter developer account in order to access the Twitter Streaming API. Here, I have a StreamListener, which is notified whenever a tweet arrives. Upon receiving a tweet, it writes the tweet data to the Kinesis stream, using a random partition key to spread records across the stream's 5 shards.
import random
import string

import boto3
from tweepy import OAuthHandler, Stream
from tweepy.streaming import StreamListener

# twitter api credentials
access_token = ...
access_token_secret = ...
consumer_key = ...
consumer_secret = ...

class TweetListener(StreamListener):
    def __init__(self, stream_name):
        super().__init__()
        self.kinesis = boto3.client('kinesis')
        self.stream_name = stream_name

    def on_data(self, data):
        # write the raw tweet JSON to Kinesis under a random partition key,
        # which spreads records across the stream's shards
        partition_key = ''.join(random.choice(string.ascii_letters) for _ in range(8))
        record = {'Data': data.encode('utf-8'), 'PartitionKey': partition_key}
        self.kinesis.put_records(Records=[record], StreamName=self.stream_name)
        return True  # keep the stream open

auth = OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
stream = Stream(auth, TweetListener("twitter-stream"))

search_terms = ["music", "facebook", "apple"]
stream.filter(track=search_terms)
Connecting Kinesis to Rockset
The following snippet shows how to create a collection in Rockset, backed by a Kinesis stream. Note: You need to create an Integration (an object that represents your AWS credentials) and set up the relevant permissions on the Kinesis stream so that Rockset can read from it.
from rockset import Client, Q, F

rs = Client(api_key=...)
aws_integration = rs.Integration.retrieve(...)

sources = [
    rs.Source.kinesis(
        stream_name="twitter-stream",
        integration=aws_integration)]

twitter_kinesis_demo = rs.Collection.create("twitter-kinesis-demo", sources=sources)
Alternatively, collections can be created from the Rockset console, as shown below.
Building the Live Dashboard
Now that I have a producer writing tweets to a Kinesis stream and a collection to ingest them into Rockset, I can build a dashboard on top of this collection. My dashboard has two views.
Tweets View
The first view displays analytics on all the tweets coming into Rockset and has 3 panels, each of which makes its own query to Rockset.
Live Tweets
The Live Tweets panel constantly refreshes to show the latest tweets appearing in the collection. At a fixed refresh interval, a query fetches tweets from the last minute, selecting only the fields needed for the feed and filtering out tweets older than a minute; a sketch of this polling loop follows the query below.
SELECT t.timestamp_ms,
t.created_at AS created_at,
t.text AS text,
t.user.screen_name AS screen_name
FROM "twitter-kinesis-demo" t
WHERE CAST(timestamp_ms AS INT) > UNIX_MILLIS(current_timestamp() - minutes(1))
ORDER BY timestamp_ms DESC
LIMIT 100;
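To drive the refresh, the dashboard simply reruns this query on a timer. Below is a minimal sketch of such a polling loop; it reuses the rs client created earlier and assumes an rs.sql() method that accepts a raw SQL string and returns rows (the exact query call may differ across Rockset client versions), and REFRESH_SECONDS is an illustrative name.
import time

REFRESH_SECONDS = 5  # illustrative refresh interval for the panel

LIVE_TWEETS_SQL = """
SELECT t.timestamp_ms,
       t.created_at AS created_at,
       t.text AS text,
       t.user.screen_name AS screen_name
FROM "twitter-kinesis-demo" t
WHERE CAST(timestamp_ms AS INT) > UNIX_MILLIS(current_timestamp() - minutes(1))
ORDER BY timestamp_ms DESC
LIMIT 100
"""

while True:
    # assumption: rs.sql(...) runs a SQL string and returns result rows;
    # check your Rockset client version for the exact query API
    for row in rs.sql(LIVE_TWEETS_SQL):
        print(row)
    time.sleep(REFRESH_SECONDS)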
Top Hashtags
The Top Hashtags panel shows the hashtags that appeared in the most tweets over the last hour, along with the associated tweet count. In this query, all hashtags appearing in the last hour are collected into a temporary relation latest_hashtags using a WITH clause. The main query then groups latest_hashtags by hashtag and orders by tweet_count to obtain the trending hashtags.
WITH latest_hashtags AS
(SELECT lower(ht.text) AS hashtag
FROM "twitter-kinesis-demo" t,
unnest(t.extended_tweet.entities.hashtags) ht
WHERE CAST(t.timestamp_ms AS INT) > UNIX_MILLIS(current_timestamp() - hours(1)))
SELECT count(hashtag) AS tweet_count,
hashtag
FROM latest_hashtags
GROUP BY hashtag
ORDER BY tweet_count DESC
LIMIT 10;
Incoming Tweets
The last panel is a chart showing the rate at which users are tweeting. Every 2 seconds, we query for the number of tweets that arrived in the preceding 2 seconds and plot the resulting data points; a sketch of this collection loop follows the query below.
SELECT count(*)
FROM "twitter-kinesis-demo"
WHERE cast(timestamp_ms AS INT) > unix_millis(current_timestamp() - seconds(2));
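Below is a minimal sketch of how these data points could be collected, reusing the rs client as before; it reruns the query above every 2 seconds and keeps a rolling window of (timestamp, count) pairs for the chart. The rs.sql() call and the row access by alias are assumptions about the client API, and the window size of 300 points is an illustrative choice.
import time
from collections import deque
from datetime import datetime

TWEET_RATE_SQL = """
SELECT count(*) AS tweet_count
FROM "twitter-kinesis-demo"
WHERE cast(timestamp_ms AS INT) > unix_millis(current_timestamp() - seconds(2))
"""

points = deque(maxlen=300)  # rolling window of chart data points (illustrative size)

while True:
    # assumption: rs.sql(...) returns rows addressable by column alias
    row = rs.sql(TWEET_RATE_SQL)[0]
    points.append((datetime.utcnow(), row['tweet_count']))
    time.sleep(2)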
Hashtags View
The second view displays analytics on tweets with a specific hashtag and also has 3 panels: Live Tweets, Related Hashtags, and Influencers. Each panel in the dashboard makes a query to Rockset. This is very similar to the first dashboard view but narrows the analytics to a selected hashtag of interest.
Influencers
As we have narrowed our analysis to a single hashtag, it would be interesting to see who the most influential users are around this topic. For this, we define influencers as users with the highest number of followers who are tweeting the hashtag of interest.
SELECT t.user.screen_name,
t.user.followers_count AS fc
FROM "twitter-kinesis-demo" t
WHERE 'music' IN
(SELECT hashtags.text
FROM unnest(t.entities.hashtags) hashtags)
GROUP BY t.user.screen_name,
         t.user.followers_count
ORDER BY t.user.followers_count DESC
LIMIT 5;
Related Hashtags
This section is somewhat similar to the Top Hashtags panel we saw in the Tweets view of the dashboard. It shows the hashtags that most often co-occur with our hashtag of interest.
SELECT hashtags.text AS hashtag,
       count(*) AS occurrence_count
FROM "twitter-kinesis-demo" t,
unnest(t.entities.hashtags) hashtags
WHERE 'music' IN
(SELECT ht.text
FROM unnest(t.entities.hashtags) ht)
AND hashtags.text != 'music'
GROUP BY hashtags.text
ORDER BY occurrence_count DESC
LIMIT 10;
Live Tweets
The Live Tweets panel is very similar to the one we saw in the Tweets view of the dashboard. The only difference is an additional filter that selects only those tweets containing our hashtag of interest, as sketched below. I already used this filter for the other two panels in the Hashtags view.
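For completeness, here is a sketch of what that filtered query could look like: it is simply the Live Tweets query from the Tweets view combined with the hashtag filter used in the two panels above, wrapped in a Python string so the hashtag of interest can be substituted in.
HASHTAG = 'music'  # hashtag of interest selected in the dashboard

FILTERED_LIVE_TWEETS_SQL = """
SELECT t.timestamp_ms,
       t.created_at AS created_at,
       t.text AS text,
       t.user.screen_name AS screen_name
FROM "twitter-kinesis-demo" t
WHERE CAST(t.timestamp_ms AS INT) > UNIX_MILLIS(current_timestamp() - minutes(1))
  AND '{hashtag}' IN
    (SELECT ht.text
     FROM unnest(t.entities.hashtags) ht)
ORDER BY t.timestamp_ms DESC
LIMIT 100
""".format(hashtag=HASHTAG)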
Where to Go from Here
While I created this example live dashboard to illustrate how real-time analytics can be performed on data from Kinesis streams, Rockset also supports Kafka as a streaming source and works with standard visualization tools like Tableau, Apache Superset, Redash, and Grafana.
You can refer to the full source code for this example here, if you are interested in building on streaming data using Rockset and Kinesis. Happy building!