Kafka Sink Connector (Java)
The Batch.sh Kafka Sink Connector lets you stream your events directly to batch.sh with minimal configuration. Use this guide to help you get started.
The Kafka sink connector funnels records sent over specified topics to the batch.sh collector service.
Batch offers the ability to tee any events produced within a Kafka cluster up to a remote collector that can optionally analyze those events and map them to a configured schema. These events can then be queried and replayed at target destinations.
Installation
Prerequisites
Kafka Versions
Internally tested with Kafka 2.2.0 and 2.2.1 (on AWS MSK)
Newer versions of Kafka up through 2.5.x should work; older versions are not tested
Built for use with Kafka Connect API 2.5.x
Should be compatible with Connect API 2.0.x - 2.5.0
Getting the Kafka Sink Connector
Confluent
Download the connector ZIP archive from the official releases
Follow the Confluent Platform guide on installing custom connectors using that file
Open Source Apache Kafka
Download the connector JAR file from the official releases
Copy the connector JAR file to the plugin.path on your Connect worker nodes; this path is specified in your worker properties (see: https://docs.confluent.io/current/connect/userguide.html#installing-kconnect-plugins), as in the example below.
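As a rough sketch, the relevant worker properties entry might look like the following; the directory path is only an illustration and should match wherever you placed the JAR:

```properties
# Excerpt from a Connect worker properties file (path is illustrative)
plugin.path=/usr/local/share/kafka/plugins
```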
Configuring the Kafka Sink Connector
Kafka Connect offers two modes of operation: distributed and standalone. Each mode has its own configuration values that are passed along with the specific values needed by the connector. See: https://docs.confluent.io/current/connect/userguide.html#running-workers
Distributed Mode
Distributed mode is configured through the Kafka Connect REST API.
Example:
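A minimal sketch of registering the connector by POSTing its configuration to a Connect worker's REST endpoint; the worker host/port, connector name, topic, and token below are placeholders to replace with your own values:

```bash
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "batchsh-sink",
    "config": {
      "connector.class": "sh.batch.kafka.BatchSinkConnector",
      "tasks.max": "1",
      "batch.token": "YOUR-COLLECTION-TOKEN",
      "topics": "example-topic",
      "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
      "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
    }
  }'
```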
Standalone Mode
Standalone mode is configured through a properties file that is passed as an argument to the Kafka Connect CLI.
Example:
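A minimal sketch of the standalone connector properties file; the connector name, topic, and token are placeholders:

```properties
name=batchsh-sink
connector.class=sh.batch.kafka.BatchSinkConnector
tasks.max=1
batch.token=YOUR-COLLECTION-TOKEN
topics=example-topic
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
```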
Create this file somewhere on your host, for example at /config/batchsh-connector.properties
Then use the Kafka Connect CLI tools to start the connector:
bin/connect-standalone /path/to/worker.properties /config/batchsh-connector.properties
Confluent
Within Confluent there is a menu-driven setup that shows all the possible configuration values that can be passed to the connector. The keys and values are the same as those described below.
Configuration Values
Our goal with our connectors is to get you up and running as swiftly as possible. The following configuration values are the ones that are strictly necessary to start the batch.sh connector. Other parameters can be adjusted for the Kafka Connect cluster or the workers themselves; these can be found in the Connect documentation.
name
Unique name for the connector. Attempting to register again with the same name will fail.
connector.class
sh.batch.kafka.BatchSinkConnector
tasks.max
Maximum number of tasks to use for this connector. Tasks act as consumer threads for Connect workers so tune this setting to be in line with a typical consumer group for the configured topic(s).
batch.token
The ID of the batch.sh collection that will be the destination for the events. The collection ID allows the collector service to map the data to a known schema, model it, and store it.
Do not share this UUID around. It is used to uniquely identify an individual collection and should be treated like a secret. All production uses of this connector should use secure transport whenever possible.
topics
A comma-separated list of topics to use as input for this connector.
topics.regex
Regular expression giving topics to consume. Under the hood, the regex is compiled to a java.util.regex.Pattern. Only one of topics or topics.regex should be specified.
Either topics or topics.regex may be specified, but not both!
key.converter AND value.converter
org.apache.kafka.connect.converters.ByteArrayConverter
The batch.sh sink connector was designed from the start as a pass-through connector. If the Kafka Connect runtime performs its own data conversion, the resulting byte array sent to the collector is no longer the unaltered data originally sent by the producer. No matter what data type your records are (Avro, Protobuf, etc.), ALWAYS choose the ByteArray converter and let the collector resolve the data with its own schemas, as in the snippet below.
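As an illustration, a configuration fragment using topics.regex together with the required converters might look like this; the topic naming pattern is hypothetical:

```properties
# Consume every topic whose name starts with "orders-" (pattern is illustrative)
topics.regex=orders-.*
# Always pass raw bytes through to the collector
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
```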
Error Handling
The Kafka Sink Connector attempts to retry errors indefinitely. It prioritizes message delivery, so if for some reason records cannot be processed, by default it will block sending records, retrying about once a minute until records can be sent again. Due to the configurable nature of Kafka Connect, it is possible to tune some of this behavior by introducing dead letter queues or modifying the other error parameters, but our recommendation is to leave these unset. Because the connector is a pass-through for raw data, the only time it is expected to see processing errors under normal operation is during a remote collector outage. In that scenario, letting the connector retry until the collector service is back online is recommended.
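For reference, the standard Kafka Connect error-handling properties mentioned above look like the sketch below; per the recommendation, they are best left unset for this connector, and the dead letter queue topic name is hypothetical:

```properties
# Standard Kafka Connect error-handling settings (shown for reference only)
errors.tolerance=all
errors.deadletterqueue.topic.name=dlq-batchsh-sink
errors.deadletterqueue.context.headers.enable=true
errors.log.enable=true
errors.log.include.messages=true
```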
Logs
If your configured connector is unhealthy or is not sending data properly, the first step would be to check the Kafka Connect logs. The location of these logs will vary by setup, but for our test environments the Connect hosts have their own instances.
Here is an example of some logs printed by our example connector in Confluent:
Our specific connector is identified in the logs through its given name.
Metrics
The Kafka Sink Connector sends a snapshot of metrics for the connector and its workers every 2 minutes. We do this in an effort to provide health and uptime stats on the batch.sh dashboard. The metrics reporter runs independently of the main connector class and does not necessarily need to be working properly in order for the connector to keep streaming events. Any log messages from the connector that deal with sending a "heartbeat" are referring to the payload of stats sent by this reporter. Like the connector itself, the only configuration variable for the metrics reporter/collector is the batch.token. Errors such as the following indicate you may have an invalid batch.token configured:
If you see this message, double check that your token is correct by verifying the collection through the batch.sh dashboard.