Kafka

This page covers how to use a Kafka topic as a data source in Rockset. This involves:

  • Creating an integration - Securely send data from your Kafka cluster into Rockset.
  • Creating a collection - Continuously sync your data from a Kafka topic into Rockset in real time.

To set up your Kafka cluster, use one of the following processes:

#Confluent Cloud

The Confluent Cloud integration is managed entirely by Rockset and does not require any infrastructure setup on the Kafka side.

#Create a Confluent Cloud integration

Follow the steps below to create an integration between Rockset and your Confluent Cloud cluster:

  1. Identify your bootstrap server URL. You can find this on Confluent Cloud by navigating to Cluster settings > Identification:

Cluster Settings

  2. Identify your Confluent Cloud cluster’s API key and secret. You can create a new API key or use an existing one from the API keys tab under Data integration:

API Keys

Note: There are two places to create an API key. Do not use the "Cloud API keys" option in the right-hand navigation; those keys are for RBAC setup on Confluent Cloud.

Cloud API keys

  3. Input these fields into the Kafka integration creation form and save your integration:

Add Integration

Note: There is a known issue with Confluent Cloud. Due to a delay in Confluent API key propagation, an authentication error may occur if you use an API key immediately after creating it:

Authentication Error

Wait at least 3-5 minutes before retrying to allow the new API key to take effect on the Confluent side. Reach out to Rockset if the retry still fails due to authentication.
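If you want to confirm that the credentials themselves are valid independently of Rockset, you can test them with a standard Kafka client. The snippet below is a minimal sketch that assumes Confluent Cloud's usual SASL_SSL/PLAIN authentication; the bracketed values are placeholders for your own bootstrap server, API key, secret, and topic:

# client.properties - hedged sketch for verifying Confluent Cloud credentials (placeholders in <>)
bootstrap.servers=<BOOTSTRAP_SERVER_URL>
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<API_KEY>" password="<API_SECRET>";

# Read a single record to confirm the key can consume from the topic
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server <BOOTSTRAP_SERVER_URL> --consumer.config client.properties --topic <TOPIC_NAME> --from-beginning --max-messages 1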

#Create a collection from a Confluent Cloud integration

Follow the steps below to create a collection:

  1. Navigate to the Collections tab of the Rockset Console to create a collection from a Kafka source.

  2. Add a source which maps to a single Kafka topic. The required inputs are:

  • Topic name
  • Starting offset

  3. Specify the Starting Offset. This will determine where in the topic Rockset will begin ingesting data from. This can be either the earliest offset or the latest offset of the topic:
  • Earliest: The collection will contain all of the data starting from the beginning of the topic. Choose this option if you require all of the existing data in the Kafka topic.
  • Latest: The collection will contain data starting from the end of the topic, which means Rockset will only ingest records arriving after the collection is created. Choose this option to avoid extra data ingestion costs if earlier data is not necessary for your use case.

New Collection

After you create a collection backed by a Kafka topic, you should be able to see data flowing from your Kafka topic into the new collection. Rockset continuously ingests objects from your topic source and updates your collection automatically as new objects are added.
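Beyond browsing the collection in the console, a quick way to verify ingestion is to query the collection over the Rockset REST API. The sketch below is illustrative only: it assumes a collection named kafka_collection in the commons workspace, and the API server host and API key are placeholders for your own values:

# Hedged sketch: query the new collection to confirm documents are arriving (placeholders in <>)
curl -X POST https://<ROCKSET_API_SERVER>/v1/orgs/self/queries \
  -H "Authorization: ApiKey <ROCKSET_API_KEY>" \
  -H "Content-Type: application/json" \
  -d '{"sql": {"query": "SELECT * FROM commons.kafka_collection LIMIT 10"}}'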

#Limitations

The first release of the Confluent Cloud Connector has several limitations which we wish to address in the next couple of months:

  • Only JSON format is supported. We are planning to add Avro in the next release.
  • Changes to a topic's partition count are not detected automatically. For example, if your topic's partitions increase from 10 to 20, data produced on the 10 new partitions will not be ingested. Contact Support if you need to increase your topic partition count.

Please reach out to us if the above limitations are blockers for your use case. For now, you can still connect using the same Kafka Connect setup used for Confluent Platform and open source Apache Kafka.

#Confluent Platform and Apache Kafka

To connect to Confluent Platform or Apache Kafka, you will need to have a Kafka Connect installation.

#How it works

You can use Kafka Connect to send data from one or more Kafka topics into Rockset collections. Kafka Connect is a separate component from your Kafka brokers and typically runs on a separate set of nodes. Kafka Connect is the recommended mechanism for reading from and writing to Kafka topics in a reliable, fault-tolerant, and scalable manner.

A Kafka Connect installation can be configured with different types of connector plugins:

  • Source connector plugin for writing data from an external data source into a Kafka topic
  • Sink connector plugin for writing data from a Kafka topic into an external system

Rockset has a sink connector plugin that can be installed into a Kafka Connect cluster and can send JSON and Avro data from Kafka topics to Rockset.

How it works

#Set up Kafka Connect

As described above, in order to connect Kafka topics to Rockset, you must have a Kafka Connect installation that is connected to your Kafka broker cluster.

  • If you are using Confluent Platform, it may already come with a Kafka Connect installation. In that case, you can skip to the section that describes installing the Rockset sink connector plugin below.
  • If you are using Apache Kafka, you must set up and operate your own Kafka Connect installation.

#Prototype (Standalone Mode)

To quickly connect an existing Kafka cluster to Rockset, you can run a Kafka Connect process in standalone mode. Note that this is good for getting started quickly but is not recommended for production. You can run the following steps on your local machine, in Docker, or on a cloud VM instance; a shell sketch of the download steps follows the list below.

  1. Download the latest version of Apache Kafka binary distribution and extract it to your local directory.
  2. Download the Rockset Kafka Connect plugin.
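On a Linux or macOS machine, the two download steps might look like the sketch below. The Kafka and Scala versions and the plugin download URL are placeholders; substitute the releases you actually want (see the plugin download section later on this page):

# Download and extract an Apache Kafka binary distribution (replace the version placeholders)
curl -O https://downloads.apache.org/kafka/<KAFKA_VERSION>/kafka_<SCALA_VERSION>-<KAFKA_VERSION>.tgz
tar -xzf kafka_<SCALA_VERSION>-<KAFKA_VERSION>.tgz
export KAFKA_HOME=$PWD/kafka_<SCALA_VERSION>-<KAFKA_VERSION>

# Download the Rockset Kafka Connect plugin jar from the plugin's GitHub releases page
curl -L -O <PLUGIN_RELEASE_URL>/kafka-connect-rockset-<VERSION>-jar-with-dependencies.jar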

Finally, edit the file $KAFKA_HOME/config/connect-standalone.properties and modify the following properties:

# Kafka brokers the Connect worker should talk to
bootstrap.servers=broker1,broker2,broker3
# Path to the Rockset sink connector jar downloaded earlier
plugin.path=/path/to/kafka-connect-rockset-VERSION-jar-with-dependencies.jar

We are now ready to run Kafka Connect in standalone mode. You can generate the configuration for forwarding specific topics from Kafka into Rockset by setting up a Kafka integration.

Once you have this configuration, you can create a file containing that configuration in $KAFKA_HOME/config/connect-rockset-sink.properties and run the Kafka Connect standalone worker as follows:

$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-rockset-sink.properties
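For reference, the connect-rockset-sink.properties file contains the fields described in the Configuration Reference at the bottom of this page. The sketch below is illustrative only and assumes a JSON-format integration; the connector class, topics, API server URL, and integration key are placeholders that should be copied from the configuration the Rockset console generates for your integration:

# connect-rockset-sink.properties - illustrative sketch; copy the real values from the Rockset console
name=my-rockset-sink
connector.class=<ROCKSET_SINK_CONNECTOR_CLASS_FROM_CONSOLE>
tasks.max=10
topics=<TOPIC_1>,<TOPIC_2>
rockset.task.threads=5
rockset.apiserver.url=<ROCKSET_API_SERVER_URL>
rockset.integration.key=<INTEGRATION_KEY_FROM_CONSOLE>
format=JSON
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false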

#Production (Distributed Mode)

If you want to run Kafka Connect in production, you must run it in distributed mode. This may require setting up separate cloud VM instances or Docker containers that work together to ensure data is sent from Kafka topics into Rockset in a fault-tolerant and scalable manner. You will need to download the Rockset Kafka Connect plugin separately onto each node of the Kafka Connect distributed cluster.

  • If you are using the official Docker images, you can install the above connector into them as described here.
  • If you are using VMs and setting up Kafka Connect from the official binaries on them, you can follow the instructions here.

In distributed mode, the Rockset Kafka Connect plugin is configured through the Kafka Connect REST endpoint. The cURL command to do this can be obtained at the end of setting up a Kafka integration.
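The exact command is generated for you by the console, but as a rough sketch it is a standard Kafka Connect REST call: an HTTP POST of the connector name and configuration to a Connect worker. In the example below the worker address and every configuration value are placeholders; use the command from your integration's setup page:

# Illustrative only - the Rockset console generates the exact command and values for your integration
curl -X POST http://<CONNECT_WORKER_HOST>:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "my-rockset-sink",
    "config": {
      "connector.class": "<ROCKSET_SINK_CONNECTOR_CLASS_FROM_CONSOLE>",
      "tasks.max": "10",
      "topics": "<TOPIC_1>,<TOPIC_2>",
      "rockset.task.threads": "5",
      "rockset.apiserver.url": "<ROCKSET_API_SERVER_URL>",
      "rockset.integration.key": "<INTEGRATION_KEY_FROM_CONSOLE>",
      "format": "JSON",
      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter.schemas.enable": "false",
      "value.converter.schemas.enable": "false"
    }
  }'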

#Running Kafka with SSL

If you have configured your Kafka cluster to use SSL, you have to specify additional properties in the Connect properties file, as documented in the Kafka SSL configuration. The Rockset sink connector uses a Kafka consumer to read from your topics, so you have to add the following properties to either the connect-standalone.properties or connect-distributed.properties file. The sample values shown below are placeholders; replace them with the appropriate values from your Kafka cluster.

# Brokers reachable over the SSL listener
consumer.bootstrap.servers=kafka1:9093
consumer.security.protocol=SSL
# Truststore containing the CA certificate(s) used to verify the brokers
consumer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
consumer.ssl.truststore.password=test1234
# Keystore and key credentials (needed only if the brokers require client authentication)
consumer.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
consumer.ssl.keystore.password=test1234
consumer.ssl.key.password=test1234

#Download the Rockset Kafka Connect plugin

Navigate to the release page of the plugin's GitHub repository. Download kafka-connect-rockset-VERSION-jar-with-dependencies.jar, where VERSION is the current latest version. Note that only version 1.1.0 or later is supported.

#Create a Kafka Connect based integration

Create a new Kafka integration using the Rockset console by navigating to Integrations > Add Integration > Apache Kafka. You can then name your integration and configure your data format and topics. The only currently supported data formats are JSON and Avro.

Integration details page

After saving the integration, you will see the configuration for the Rockset sink plugin that can be used to configure a Kafka Connect cluster in standalone or distributed mode.

#Create a collection from a Kafka Connect based integration

To create a collection, you must pick an integration that has already been successfully set up. During collection creation, you may choose one or more Kafka topics that you want to sink into the collection.

You can create a collection from a Kafka source in the Collections tab of the Rockset Console.

Create Kafka Collection

Note: These operations can also be performed using any of the following:

#Configuration Reference

Field | Description
----- | -----------
name | Specifies the name of the plugin currently running. We use the name of the integration for this, but that is not required.
connector.class | Used by the Kafka Connect framework to instantiate our plugin correctly.
tasks.max | The maximum number of sink tasks that can run concurrently. This is used by the Kafka Connect framework to tune the performance of our plugin.
topics | The topics we want the sink plugin to consume documents from.
rockset.task.threads | The number of threads the plugin runs per task.
rockset.apiserver.url | The URL of the API server the plugin will write the documents to.
rockset.integration.key | Used by our internal services to make sure your data is tied to the correct integration and thus lands in the correct collections.
format | Specifies the format our plugin expects your data to conform to.
key.converter | Converter class for key data from Kafka Connect. This controls the converter used to deserialize data from Kafka, and differs depending on whether JSON or Avro is specified as the data format.
value.converter | Converter class for value data from Kafka Connect. This controls the converter used to deserialize data from Kafka, and differs depending on whether JSON or Avro is specified as the data format.
key.converter.schemas.enable (JSON only) | Whether the JSON data in Kafka has a schema associated with it. This is false in our case, as Rockset does not require a fixed schema for your data.
value.converter.schemas.enable (JSON only) | Whether the JSON data in Kafka has a schema associated with it. This is false in our case, as Rockset does not require a fixed schema for your data.
key.converter.schema.registry.url (Avro only) | The schema registry instances that will be used to look up schemas for key data from Kafka Connect. For more information, see the Confluent documentation.
value.converter.schema.registry.url (Avro only) | The schema registry instances that will be used to look up schemas for value data from Kafka Connect. For more information, see the Confluent documentation.