• Loading Data
  • Adding a Data Source


This page covers how to use a Kafka topic as a data source in Rockset. This involves:

  • Creating an integration - Securely send data from your Kafka cluster into Rockset.
  • Creating a collection - Continuously sync your data from a Kafka topic into Rockset in real-time.

To set up your Kafka cluster, use one of the following processes:

Confluent Cloud

The Confluent Cloud integration is managed entirely by Rockset and does not require any infrastructure setup on the Kafka side.

Create a Confluent Cloud integration

Follow the steps below to create an integration between Rockset and your Confluent Cloud cluster:

  1. Identify your bootstrap server URL. You can find this on Confluent Cloud by navigating to Cluster settings > Identification:

Cluster Settings

  2. Identify your Confluent Cloud cluster’s API key and secret. You can create a new API key or use an existing one from the API keys tab under Data integration:

API Keys

Note: There are two places to create an API key. Do not use the "Cloud API keys" option in the right-hand navigation menu; those keys are for RBAC setup on Confluent Cloud.

Cloud API keys

  3. Input these fields into the Kafka integration creation form and save your integration:

Add Integration

Note: There is a known issue with Confluent Cloud. Due to a delay in Confluent API key propagation, an authentication error may occur if you use a key immediately after creating it:

Authentication Error

Wait at least 3-5 minutes before retrying to allow time for the new API key to become effective on the Confluent side. Reach out to Rockset if your retry still fails due to authentication.

Optional: If you expect to ingest topics that use the Avro data format into a Rockset collection, the integration must also be configured with schema registry settings. Avro schemas are stored on a schema registry server, and Rockset requires access to the schema registry to process Avro messages from a Kafka topic. Currently, schema registry support is limited to the Avro data format. A Kafka integration configured with a schema registry can be used to ingest from topics serving either JSON or Avro format messages (mixing JSON and Avro messages within the same topic is not supported). A Kafka integration without a schema registry configured can only be used to ingest from topics serving JSON format messages.

To configure schema registry with Confluent Cloud:

Schema Registry 1 Schema Registry 2

Use the schema registry URL, key, and secret from above when creating the Kafka integration on Rockset.
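If you want to confirm the schema registry credentials work before saving the integration, you can query the Schema Registry REST API directly. The endpoint URL, key, and secret below are placeholders for your own values:

```shell
# List the subjects registered in the schema registry.
# A successful response returns a JSON array of subject names;
# a 401 response indicates the key/secret pair is wrong.
curl -u "<schema-registry-key>:<schema-registry-secret>" \
  https://<schema-registry-endpoint>/subjects
```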

Create a collection from a Confluent Cloud integration

Follow the steps below to create a collection:

  1. Navigate to the Collections tab of the Rockset Console to create a collection from a Kafka source.

  2. Add a source which maps to a single Kafka topic. The required inputs are:

  • Topic name
  • Starting offset
  3. Specify the Starting Offset. This determines where in the topic Rockset will begin ingesting data. It can be either the earliest offset or the latest offset of the topic.
  • Earliest: The collection will contain all of the data starting from the beginning of the topic. Choose this option if you require all of the existing data in the Kafka topic.
  • Latest: The collection will contain data starting from the end of the topic, which means Rockset will only ingest records arriving after the collection is created. Choose this option to avoid extra data ingestion costs if earlier data is not necessary for your use case.
  4. Specify the Format of messages in the Kafka topic. Currently, Rockset supports processing JSON and Avro message formats. For a Rockset collection to process Avro messages from a topic, the corresponding Kafka integration on Rockset must be configured with valid schema registry settings.

New Collection

After you create a collection backed by a Kafka topic, you should be able to see data flowing from your Kafka topic into the new collection. Rockset continuously ingests objects from your topic source and updates your collection automatically as new objects are added.


If ACLs are configured on the Kafka service account associated with the API key/secret pair used to create the integration backing the collection, make sure the proper permissions are set. Otherwise, you may see the following permission errors during collection preview:

Topic Authentication Error Consumer Groups Error

For example, you can use the Confluent CLI to configure permissions as follows:

> confluent kafka acl create --allow --service-account <service-account-id> --operation DESCRIBE --topic <topic>
> confluent kafka acl create --allow --service-account <service-account-id> --operation READ --topic <topic>
> confluent kafka acl create --allow --service-account <service-account-id> --operation DESCRIBE --prefix --consumer-group rockset
> confluent kafka acl create --allow --service-account <service-account-id> --operation READ --prefix --consumer-group rockset

Rockset uses a consumer group ID prefixed with "rockset", so the commands above ensure the appropriate permissions are set for that consumer group ID pattern. Confirm that the permissions are set:

> confluent kafka acl list
    Principal    | Permission | Operation | ResourceType | ResourceName  | PatternType
  User:sa-abcdef | ALLOW      | READ      | TOPIC        | <topic>       | LITERAL
  User:sa-abcdef | ALLOW      | DESCRIBE  | TOPIC        | <topic>       | LITERAL
  User:sa-abcdef | ALLOW      | READ      | GROUP        | rockset       | PREFIXED
  User:sa-abcdef | ALLOW      | DESCRIBE  | GROUP        | rockset       | PREFIXED

Metadata Access

Rockset automatically stores the following Kafka message metadata fields for you:

  • key with type bytes
  • topic with type string
  • partition with type int
  • offset with type int
  • timestamp with type int
  • header, which is a nested structure including key-value pairs

These fields are all stored in the _meta.kafka field, which can be accessed as shown below:

Kafka Metadata Collection
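For example, assuming a collection named kafka_events (a hypothetical name for this sketch), the metadata fields can be selected directly in a query:

```sql
-- Read the Kafka message metadata alongside the document fields.
SELECT
    _meta.kafka.topic,
    _meta.kafka.partition,
    _meta.kafka.offset,
    _meta.kafka.timestamp
FROM
    kafka_events;
```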

If you don't want to retain all the metadata fields, you can use a SQL transformation to drop them. For example, the following transformation keeps only the source timestamp within the _meta field:

SELECT {'kafka': {'timestamp': _input._meta.kafka.timestamp}} AS _meta, * EXCEPT(_meta)
FROM _input

Kafka Metadata With Only Timestamp

Supporting Kafka Tombstone

A tombstone is a special type of record in Kafka that has a valid key and a null value. It is widely used with log compaction, where older messages holding the same record key are dropped from the topic. Rockset supports this behavior with a little additional work. To enable tombstones, use a SQL transformation to replace the content of _id, the special Rockset field for document identification, with the Kafka message key.

For example, consider a sample collection created with the following SQL transformation:

SELECT CAST(_input._meta.kafka.key AS string) AS _id, * EXCEPT(_id)
FROM _input

Kafka Tombstone Demo Before

When we produce an empty record with key 20 into the source topic, the corresponding document is automatically deleted from the Rockset collection:

Kafka Tombstone Demo After
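For reference, a tombstone like the one above can be produced with kcat (formerly kafkacat). The broker address and topic name are placeholders, and the -Z flag, which sends an empty value as NULL, is assumed to be available in your kcat version:

```shell
# Produce a record with key "20" and a NULL value (a tombstone).
# -K: sets ":" as the key/value delimiter; -Z sends the empty value as NULL.
echo "20:" | kcat -P -b <broker-host>:9092 -t <topic> -K: -Z
```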


Please reach out to us if the limitations above are blocking your use case. For now, you can still use the integration with the same Kafka Connect setup used for Confluent Platform and open source Apache Kafka.

Confluent Platform and Apache Kafka

To connect to Confluent Platform or Apache Kafka, you will need to have a Kafka Connect installation.

How it works

You can use Kafka Connect to send data from one or more Kafka topics into Rockset collections. Kafka Connect is a separate component from your Kafka brokers and typically runs on a separate set of nodes. Kafka Connect is the recommended mechanism for reading from and writing to Kafka topics in a reliable, fault-tolerant, and scalable manner.

A Kafka Connect installation can be configured with different types of connector plugins:

  • Source connector plugin for writing data from an external data source into a Kafka topic
  • Sink connector plugin for writing from a Kafka topic into an external data source

Rockset provides a sink connector plugin that can be installed into a Kafka Connect cluster and can send JSON and Avro data from Kafka topics to Rockset.

How it works

Set up Kafka Connect

As described above, in order to connect Kafka topics to Rockset, you must have a Kafka Connect installation that is connected to your Kafka broker cluster.

  • If you are using Confluent Platform, it may already come with a Kafka Connect installation. In that case, you can skip to the section that describes installing the Rockset sink connector plugin below.
  • If you are using Apache Kafka, you must set up and operate your own Kafka Connect installation.

Prototype (Standalone Mode)

To quickly connect an existing Kafka cluster to Rockset, you can run a Kafka Connect process in standalone mode. This is good for getting started quickly but is not recommended for production. You can run the following steps on your local machine, in Docker, or on a cloud VM instance.

  1. Download the latest version of Apache Kafka binary distribution and extract it to your local directory.
  2. Download the Rockset Kafka Connect plugin.

Finally, you can edit the file $KAFKA_HOME/config/ and modify the following properties.
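The exact file name and property values depend on your installation; a minimal standalone worker configuration typically includes properties along these lines (all values here are placeholders):

```properties
# Kafka brokers the Connect worker reads from.
bootstrap.servers=<broker-host>:9092

# Directory containing the Rockset sink connector jar.
plugin.path=/path/to/plugins

# Where the standalone worker stores its offsets.
offset.storage.file.filename=/tmp/connect.offsets
```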


We are now ready to run standalone Kafka Connect. You can generate the configuration for forwarding specific topics from Kafka into Rockset by setting up a Kafka integration.

Once you have this configuration, you can create a file containing it in $KAFKA_HOME/config/ and run Kafka Connect in standalone mode as follows:

$KAFKA_HOME/bin/ $KAFKA_HOME/config/ $KAFKA_HOME/config/

Production (Distributed Mode)

If you want to run Kafka Connect in production, you must run it in distributed mode. This may require setting up separate cloud VM instances or Docker containers that work together to ensure data is sent from Kafka topics into Rockset in a fault-tolerant and scalable manner. You will need to download the Rockset Kafka Connect plugin separately onto each node of the Kafka Connect distributed cluster.

  • If you are using the official docker images, you can install the above connector into it as described here.
  • If you are using VMs and setting up Kafka Connect from the official binaries on them, you can follow the instructions here.

The Rockset Kafka Connect plugin can be configured using the Kafka Connect REST endpoint. The cURL command to do this can be obtained at the end of setting up a Kafka integration.
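The generated command follows the shape of the standard Kafka Connect REST API for creating a connector. The host, connector name, and configuration body below are placeholders; port 8083 is the default Kafka Connect REST port:

```shell
# Create the Rockset sink connector on a distributed Kafka Connect cluster.
curl -X POST -H "Content-Type: application/json" \
  --data '{"name": "<connector-name>", "config": {<configuration-from-the-rockset-console>}}' \
  http://<connect-host>:8083/connectors
```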

Running Kafka with SSL

If you have configured your Kafka cluster to use SSL, you must specify additional properties as documented in the Kafka SSL configuration. Rockset is a sink connector, so it uses a Kafka consumer to read from your topics, and you must add the corresponding consumer properties to your Kafka Connect worker properties file. The sample values shown are placeholders that you must replace with the appropriate ones from your Kafka cluster.
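As a sketch, the consumer-side SSL properties typically look like the following. The consumer. prefix applies the settings to the sink connector's consumer, and every value shown is a placeholder:

```properties
# Use SSL when the sink connector's consumer talks to the brokers.
consumer.security.protocol=SSL

# Truststore holding the CA certificate(s) that signed the broker certificates.
consumer.ssl.truststore.location=/path/to/kafka.client.truststore.jks
consumer.ssl.truststore.password=<truststore-password>

# Keystore for client authentication, if the brokers require it.
consumer.ssl.keystore.location=/path/to/kafka.client.keystore.jks
consumer.ssl.keystore.password=<keystore-password>
consumer.ssl.key.password=<key-password>
```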


Download the Rockset Kafka Connect plugin

Navigate to the release page of the plugin's GitHub repository. Download kafka-connect-rockset-VERSION-jar-with-dependencies.jar, where VERSION is the current latest version. Note that only version 1.1.0 or later is supported.

Create a Kafka Connect based integration

Create a new Kafka integration using the Rockset console by navigating to Integrations > Add Integration > Apache Kafka. You can then name your integration and configure your data format and topics. The only currently supported data formats are JSON and Avro.

Integration details page

After saving the integration, you will see the configuration for the Rockset sink plugin that can be used to configure a Kafka Connect cluster in standalone or distributed mode.

Create a collection from a Kafka Connect based integration

To create a collection, you must pick an integration that has already been successfully set up. During collection creation, you may choose one or more Kafka topics that you want to sink into the collection.

You can create a collection from a Kafka source in the Collections tab of the Rockset Console.

Create Kafka Collection

Note: These operations can also be performed using any of the following:

Configuration Reference

  • name - Specifies the name of the plugin currently running. We use the name of the integration for this, but that is not required.
  • connector.class - Used by the Kafka Connect framework to instantiate the plugin correctly.
  • tasks.max - The maximum number of sink tasks that can run concurrently. Used by the Kafka Connect framework to tune the performance of the plugin.
  • topics - The topics the sink plugin consumes documents from.
  • rockset.task.threads - The number of threads the plugin runs per task.
  • rockset.apiserver.url - The URL of the API server the plugin writes documents to.
  • rockset.integration.key - Used by Rockset's internal services to make sure your data is tied to the correct integration and thus lands in the correct collections.
  • format - Specifies the format the plugin expects your data to conform to.
  • key.converter - Converter class for key data from Kafka Connect. This controls the converter used to deserialize data from Kafka, which differs depending on whether JSON or Avro is specified as the data format.
  • value.converter - Converter class for value data from Kafka Connect. This controls the converter used to deserialize data from Kafka, which differs depending on whether JSON or Avro is specified as the data format.
  • key.converter.schemas.enable (JSON only) - Whether the JSON data in Kafka has a schema associated with it. This is false in our case, as Rockset does not require a fixed schema for your data.
  • value.converter.schemas.enable (JSON only) - Whether the JSON data in Kafka has a schema associated with it. This is false in our case, as Rockset does not require a fixed schema for your data.
  • key.converter.schema.registry.url (Avro only) - The schema registry instances used to look up schemas for key data from Kafka Connect. For more information, see the Confluent documentation.
  • value.converter.schema.registry.url (Avro only) - The schema registry instances used to look up schemas for value data from Kafka Connect. For more information, see the Confluent documentation.
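Putting the reference above together, a complete sink configuration might look like the following properties file. Every value is a placeholder (the numeric values are illustrative) to be replaced with the output generated by the Rockset console:

```properties
# Sketch of a JSON-format Rockset sink connector configuration.
name=<integration-name>
connector.class=<connector-class-from-the-rockset-console>
tasks.max=10
topics=<topic1>,<topic2>
rockset.task.threads=5
rockset.apiserver.url=<apiserver-url>
rockset.integration.key=<integration-key>
format=JSON

# Standard Kafka Connect JSON converters, without embedded schemas.
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
```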