This page covers how to use a Kafka topic as a data source in Rockset. This involves:

  • Creating an Integration - Securely send data from your Kafka cluster on Confluent Platform or open source Apache Kafka into Rockset.
  • Creating a Collection - Continuously sync your data from a Kafka topic into Rockset in real time.

🚧

To connect to Confluent Platform or Apache Kafka, you will need to have a Kafka Connect installation.

How it works

You can use Kafka Connect to send data from one or more Kafka topics into Rockset collections. Kafka Connect is a separate component from your Kafka brokers and typically runs on its own set of nodes. It is the recommended mechanism for reading from and writing to Kafka topics in a reliable, fault-tolerant, and scalable manner.

A Kafka Connect installation can be configured with different types of connector plugins:

  • Source connector plugin for writing data from an external data source into a Kafka topic
  • Sink connector plugin for writing data from a Kafka topic into an external system

Rockset provides a sink connector plugin that can be installed into a Kafka Connect cluster and can send JSON and Avro data from Kafka topics to Rockset.

Set up Kafka Connect

As described above, in order to connect Kafka topics to Rockset, you must have a Kafka Connect installation that is connected to your Kafka broker cluster.

  • If you are using Confluent Platform, it may already come with a Kafka Connect installation. In that case, you can skip to the section that describes installing the Rockset sink connector plugin below.
  • If you are using Apache Kafka, you must set up and operate your own Kafka Connect installation.

📘

As you progress through this section, refer to the Kafka Configuration Reference to tune properties, as needed.

Prototype (Standalone Mode)

To quickly connect an existing Kafka cluster to Rockset, you can run a Kafka Connect process in standalone mode. This is a good way to get started quickly, but it is not recommended for production. You can run the following steps on your local machine, in Docker, or on a cloud VM instance.

  1. Download the latest version of the Apache Kafka binary distribution and extract it to a local directory.
  2. Download the Rockset Kafka Connect plugin.

Finally, edit the file $KAFKA_HOME/config/connect-standalone.properties and set the following properties.

bootstrap.servers=broker1,broker2,broker3
plugin.path=/path/to/kafka-connect-rockset-VERSION-jar-with-dependencies.jar

You are now ready to run Kafka Connect in standalone mode. You can generate the configuration for forwarding specific topics from Kafka into Rockset by setting up a Kafka integration.

Once you have this configuration, save it to the file $KAFKA_HOME/config/connect-rockset-sink.properties and run Kafka Connect in standalone mode as follows:

$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-rockset-sink.properties
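
As a rough illustration of what the sink configuration file might contain, the sketch below writes a sample connect-rockset-sink.properties. Every value is a placeholder, and the connector class name, topic names, and API server host are assumptions made for illustration. The configuration generated by the Rockset console for your integration is the authoritative version.

```shell
# Sketch: write a sample sink configuration for standalone mode.
# All values are placeholders; prefer the console-generated configuration.
SINK_PROPS="${SINK_PROPS:-connect-rockset-sink.properties}"

cat > "$SINK_PROPS" <<'EOF'
name=my-kafka-integration
# Assumed class name; confirm it against the console-generated configuration.
connector.class=rockset.RocksetSinkConnector
tasks.max=4
# Hypothetical topic names.
topics=orders,payments
rockset.task.threads=5
rockset.apiserver.url=https://API_SERVER_HOST
rockset.integration.key=YOUR_INTEGRATION_KEY
format=JSON
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
EOF

echo "Wrote $SINK_PROPS"
```

Set SINK_PROPS to $KAFKA_HOME/config/connect-rockset-sink.properties to write the file where the standalone command above expects it.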

Production (Distributed Mode)

To run Kafka Connect in production, you must run it in distributed mode. This may require setting up separate cloud VM instances or Docker containers that work together to ensure data is sent from Kafka topics into Rockset in a fault-tolerant and scalable manner. You must separately download the Rockset Kafka Connect plugin onto each node of the distributed Kafka Connect cluster.

  • If you are using the official Docker images, you can install the above connector into them as described here.
  • If you are using VMs and setting up Kafka Connect from the official binaries on them, you can follow the instructions here.

In distributed mode, you configure the Rockset Kafka Connect plugin through the Kafka Connect REST endpoint. The cURL command to do this is provided at the end of setting up a Kafka integration.
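
For orientation, registering a connector through the Kafka Connect REST API generally means sending a POST request to the /connectors path of a worker. The sketch below shows the general shape only. The connector class name, topic names, and the register_connector helper are assumptions made for illustration, and the command generated by the Rockset console should be preferred.

```shell
# Sketch: register a sink connector through the Kafka Connect REST API.
# All values in the JSON are placeholders.
CONNECTOR_JSON="${CONNECTOR_JSON:-rockset-sink.json}"

cat > "$CONNECTOR_JSON" <<'EOF'
{
  "name": "my-kafka-integration",
  "config": {
    "connector.class": "rockset.RocksetSinkConnector",
    "tasks.max": "4",
    "topics": "orders,payments",
    "rockset.apiserver.url": "https://API_SERVER_HOST",
    "rockset.integration.key": "YOUR_INTEGRATION_KEY",
    "format": "JSON",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}
EOF

# Hypothetical helper: POST the JSON file ($2) to a Connect worker URL ($1).
register_connector() {
  curl -sS -X POST -H "Content-Type: application/json" \
    --data @"$2" "$1/connectors"
}
```

With a worker listening on the default REST port, the call would look like `register_connector http://localhost:8083 rockset-sink.json`.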

Running Kafka with SSL

If your Kafka cluster uses SSL, you must specify additional properties in the Kafka Connect worker properties file, as documented in the Kafka SSL configuration. The Rockset plugin is a sink connector, so it reads from Kafka through a Kafka consumer. Add the following properties, with the consumer. prefix, to either the connect-standalone.properties or the connect-distributed.properties file. The sample values shown below are placeholders; replace them with the appropriate values for your Kafka cluster.

consumer.bootstrap.servers=kafka1:9093
consumer.security.protocol=SSL
consumer.ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
consumer.ssl.truststore.password=test1234
consumer.ssl.keystore.location=/var/private/ssl/kafka.client.keystore.jks
consumer.ssl.keystore.password=test1234
consumer.ssl.key.password=test1234

Download the Rockset Kafka Connect plugin

Navigate to the releases page of the plugin's GitHub repository. Download kafka-connect-rockset-VERSION-jar-with-dependencies.jar, where VERSION is the latest version. Note that only version 1.1.0 or later is supported.

Create a Kafka Connect based integration

Create a new Kafka integration using the Rockset console by navigating to Integrations > Add Integration > Kafka. You can then name your integration and configure your data format and topics. The only currently supported data formats are JSON and Avro.

Integration details page

After saving the integration, you will see the configuration for the Rockset sink plugin that can be used to configure a Kafka Connect cluster in standalone or distributed mode.

Create a collection from a Kafka Connect based integration

To create a collection, you must pick an integration that has already been successfully set up. During collection creation, you may choose one or more Kafka topics that you want to sink into the collection.

You can create a collection from a Kafka source in the Collections tab of the Rockset Console.

Create Kafka Collection

💡

These operations can also be performed using the Rockset client libraries, the Rockset API, or the Rockset CLI.

Configuration Reference

Important Properties

Field | Description
name | Specifies the name of the plugin currently running. We use the name of the integration for this, but that is not required.
connector.class | Used by the Kafka Connect framework to instantiate our plugin correctly.
tasks.max | The maximum number of sink tasks that can be run concurrently. This is used by the Kafka Connect framework to tune the performance of our plugin.
topics | The topics we want the sink plugin to consume documents from.
rockset.task.threads | The number of threads the plugin runs per task.
rockset.apiserver.url | The URL of the API server the plugin will write the documents to.
rockset.integration.key | Used by our internal services to make sure your data is tied to the correct integration and thus lands in the correct collections.
format | Specifies the format our plugin expects your data to conform to.
key.converter | Converter class for key data from Kafka Connect. This controls the converter we use to deserialize data from Kafka. The converter is different depending on whether JSON or Avro is specified as the data format.
value.converter | Converter class for value data from Kafka Connect. This controls the converter we use to deserialize data from Kafka. The converter is different depending on whether JSON or Avro is specified as the data format.
key.converter.schemas.enable (JSON only) | Whether the JSON data in Kafka has a schema associated with it. This is false in our case, as Rockset does not require a fixed schema for your data.
value.converter.schemas.enable (JSON only) | Whether the JSON data in Kafka has a schema associated with it. This is false in our case, as Rockset does not require a fixed schema for your data.
key.converter.schema.registry.url (Avro only) | The schema registry instances that will be used to look up schemas for key data from Kafka Connect. For more information, see the Confluent documentation.
value.converter.schema.registry.url (Avro only) | The schema registry instances that will be used to look up schemas for value data from Kafka Connect. For more information, see the Confluent documentation.

Starting Offset

The default starting offset is latest, meaning Kafka Connect starts reading from the end of the topic. To start from the earliest offset instead, set consumer.auto.offset.reset to earliest in the Kafka Connect consumer configuration. There are two ways to do this:

  1. Override at the worker level, which affects all connectors on that worker. To do this, add the following to your connect-distributed.properties: consumer.auto.offset.reset=earliest

  2. Override at the connector level (Apache Kafka 2.3 or later only). To do this, first set connector.client.config.override.policy=All in the worker configuration. If using Docker, set CONNECT_CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY: 'All'. Once set, add the following to the configuration of the connector you want to change: "consumer.override.auto.offset.reset":"earliest"

🚧

For both starting offset options, you will need to restart the Kafka Connect process to pick up the configuration change(s).
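
The worker-level override can be scripted so that repeated runs do not leave duplicate entries in the properties file. The following is a minimal sketch, assuming the worker file is reachable at the path in WORKER_PROPS; the set_prop helper is hypothetical and not part of Kafka.

```shell
# Sketch: idempotently set key=value in a .properties file.
# set_prop is a hypothetical helper, not a Kafka tool.
set_prop() {
  file="$1"; key="$2"; value="$3"
  touch "$file"
  # Keep every line whose key (the text before the first '=') differs.
  awk -F= -v k="$key" '$1 != k' "$file" > "$file.tmp"
  # Append the new setting and swap the file into place.
  printf '%s=%s\n' "$key" "$value" >> "$file.tmp"
  mv "$file.tmp" "$file"
}

WORKER_PROPS="${WORKER_PROPS:-connect-distributed.properties}"
set_prop "$WORKER_PROPS" consumer.auto.offset.reset earliest
```

The same helper could set connector.client.config.override.policy=All when you use the connector-level option instead.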

Tuning Kafka Connect Properties

Below are some tips for tuning Kafka Connect and Rockset sink to help saturate batches going to Rockset:

  1. connector.client.config.override.policy=All
    This needs to be added to the Kafka Connect worker properties to allow overrides for each connector. You can narrow it down to consumer overrides only, but it is generally safe to set it to All. If this is the only connector you are running, you can instead adjust the worker properties directly by using the consumer. prefix. For more information on this parameter, see the Kafka Connect worker configuration documentation.

  2. consumer.override.fetch.min.bytes=100000
    This goes in the Rockset Sink connector configuration. Try setting this to 500,000 (500 kB) and adjust up or down, as needed. For more information on this parameter, see the Kafka consumer configuration documentation.

  3. consumer.override.fetch.max.wait.ms=500
    This goes in the Rockset Sink connector configuration. While 500 is the default, you may need to adjust it so that fetches reach the minimum bytes needed for decent batching. Increasing this value helps throughput but can also increase data latency. For more information on this parameter, see the Kafka consumer configuration documentation.

  4. consumer.override.max.poll.records=1000
    Batches are capped at 500 documents by default (the Kafka default for max.poll.records), so we recommend increasing this to 1,000 to start and potentially going up to 5,000, if needed. For more information on this parameter, see the Kafka consumer configuration documentation.
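
Taken together, the connector-level tips above might look like the following fragment. This is a sketch with starting values only; the file name is hypothetical, and in distributed mode the same keys would go into the connector's JSON configuration rather than a properties file.

```shell
# Sketch: starting values for the consumer overrides discussed above.
# The file name is hypothetical; append these lines to your sink configuration.
TUNING_PROPS="${TUNING_PROPS:-rockset-sink-tuning.properties}"

cat > "$TUNING_PROPS" <<'EOF'
# Wait for roughly 500 kB per fetch, or until fetch.max.wait.ms elapses.
consumer.override.fetch.min.bytes=500000
consumer.override.fetch.max.wait.ms=500
# Let each poll return up to 1,000 records.
consumer.override.max.poll.records=1000
EOF

# The overrides only apply if the worker allows them.
echo "Also set connector.client.config.override.policy=All in the worker properties."
```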