Kafka Sink Connector (Java)
The Batch.sh Kafka Sink Connector lets you stream your events directly to batch.sh with minimal configuration. Use this guide to help you get started.
The Kafka sink connector funnels records sent over specified topics to the batch.sh collector service.
Batch offers the ability to tee any events produced within a Kafka cluster up to a remote collector that can optionally analyze those events and map them to a configured schema. These events can then be queried and replayed at target destinations.
- Internally tested with Kafka 2.2.0 and 2.2.1 (on AWS MSK)
  - Newer versions of Kafka up through 2.5.x should work; older versions are not tested.
- Built for use with Kafka Connect API 2.5.x
  - Should be compatible with Connect API 2.0.x - 2.5.0.
To install the connector on Confluent Platform:

1. Download the connector ZIP archive from:
2. Follow Confluent Platform's guide on installing custom connectors using that file.

To install the connector manually on your own Kafka Connect cluster:

1. Download the connector JAR file.
2. Copy the connector JAR file to the plugin.path on your Connect worker nodes; this path is specified in your worker configuration (see the example below).
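As a minimal sketch (the directory paths here are assumptions for illustration, not fixed values), the plugin path is declared in the worker properties file that Kafka Connect reads at startup:

```
# connect-distributed.properties (or connect-standalone.properties)
# Directories Kafka Connect scans for connector plugins at startup
plugin.path=/usr/share/java,/opt/kafka-connect/plugins
```

After copying the JAR into one of these directories, restart the Connect workers so the plugin is picked up.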
Kafka Connect offers two modes of operation: distributed and standalone. Each mode has its own configuration values that can be passed along with the values needed by the connector itself. See: https://docs.confluent.io/current/connect/userguide.html#running-workers
Distributed mode is configured through the Kafka Connect REST API.
Example:
```
curl -X POST \
  KAFKA_CONNECT_REST_API_HOST:8083/connectors \
  -H 'Content-Type: application/json' \
  -H 'Accept: application/json' \
  -d '{"name": "FooBarBatchSh", "config": {
    "connector.class": "sh.batch.kafka.BatchSinkConnector",
    "tasks.max": "5",
    "topics": "topic1,topic2",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "batch.token": "2494b6fd-74dc-45a3-9c7c-fc0d81e9591c"
  }}'
```
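Once the POST succeeds, you can confirm the connector was registered by querying the same REST API; the host below is the same placeholder used above:

```
# Lists all registered connectors; "FooBarBatchSh" should appear in the response
curl KAFKA_CONNECT_REST_API_HOST:8083/connectors
```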
Standalone mode is configured through a properties file that is passed as an argument to the Kafka Connect CLI.
Example:
Create this file somewhere on your host, for example at /config/batchsh-connector.properties:
```
name=FooBarBatchSh
connector.class=sh.batch.kafka.BatchSinkConnector
batch.token=2494b6fd-74dc-45a3-9c7c-fc0d81e9591c
tasks.max=5
topics=topic1,topic2
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
```
Then use the Kafka Connect CLI tools to start the connector:

```
bin/connect-standalone /path/to/worker.properties /config/batchsh-connector.properties
```
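For reference, a minimal standalone worker.properties might look like the sketch below; the broker address, offset file location, and plugin directory are illustrative assumptions, not required values:

```
# Kafka brokers the Connect worker talks to
bootstrap.servers=localhost:9092

# Worker-level default converters (the connector config above also sets these per connector)
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter

# Where standalone mode stores consumer offsets between restarts
offset.storage.file.filename=/tmp/connect.offsets

# Directory containing the batch.sh connector JAR
plugin.path=/opt/kafka-connect/plugins
```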
Within Confluent Control Center there is a menu-driven setup that shows all the possible configuration values that can be passed to the connector. The keys and values are the same as above.

1. Navigate through the Control Center to the desired Kafka Connect cluster.
2. Select Add connector.
3. Select the BatchSinkConnector.
4. Fill in our sample configuration. The batch.token is very important! It uniquely identifies your collection.
5. Confirm the settings.
6. Success!
Our goal with our connectors is to get you up and running as swiftly as possible. The following configuration values are the ones that are strictly necessary to start the batch.sh connector. Other parameters that can be adjusted for the Kafka Connect cluster or the workers themselves can be found in the Connect documentation.
- name: Unique name for the connector. Attempting to register again with the same name will fail.
- connector.class: Must be sh.batch.kafka.BatchSinkConnector.
- tasks.max: Maximum number of tasks to use for this connector. Tasks act as consumer threads for Connect workers, so tune this setting to be in line with a typical consumer group for the configured topic(s).
- batch.token: The ID of the batch.sh collection that will be the destination for the events. The collection ID allows the collector service to map the data to a known schema, model it, and store it. Do not share this UUID around: it uniquely identifies an individual collection and should be treated like a secret. All production uses of this connector should use secure transport whenever possible.
- topics: A comma-separated list of topics to use as input for this connector.
- topics.regex: Regular expression giving topics to consume. Under the hood, the regex is compiled to a java.util.regex.Pattern. Either topics or topics.regex may be specified, but not both!
- key.converter and value.converter: Must be org.apache.kafka.connect.converters.ByteArrayConverter. The batch.sh sink connector was designed from the start as a pass-through connector. If the kafka-connect runtime tries to do its own data conversion, the resulting byte array sent to the collector is no longer the untainted data sent originally by the producer. No matter what data type your records are (Avro, Protobuf, etc.), ALWAYS choose the ByteArray converter and let the collector resolve the data with its own schemas (see the sketch below).
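As an illustrative sketch (the Avro defaults below are an assumption about a typical worker setup, not something this connector requires), connector-level converter settings take precedence over worker-level defaults, so setting the ByteArray converter on the connector keeps records as raw bytes even on clusters that normally use schema-aware converters:

```
# Worker-level defaults elsewhere in the cluster might look like this (assumed example):
#   key.converter=io.confluent.connect.avro.AvroConverter
#   value.converter=io.confluent.connect.avro.AvroConverter

# Connector-level settings override those defaults, preserving the original bytes end to end
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
```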
The Kafka Sink Connector retries errors indefinitely. It prioritizes message delivery, so if for some reason records cannot be processed, it will by default block and keep retrying, roughly once a minute, until records can be sent again. Due to the configurable nature of Kafka Connect, it is possible to tune some of this behavior by introducing dead letter queues or modifying the other error parameters (sketched below), but our recommendation is to leave these unset. Since the connector is a pass-through for raw data, the only time processing errors are expected under normal operation is during a remote collector outage. In that scenario, letting the connector retry until the collector service is back online is recommended.
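For completeness, here is a sketch of the standard Kafka Connect error-handling properties that could be added to the connector configuration; the dead letter queue topic name is illustrative, and as noted above we recommend leaving these unset:

```
# Tolerate (skip) failed records instead of blocking the task
errors.tolerance=all

# Route failed records to a dead letter queue topic (topic name is an assumption)
errors.deadletterqueue.topic.name=batchsh-dlq

# Log failure details to the Connect worker log
errors.log.enable=true
errors.log.include.messages=true
```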
If your configured connector is unhealthy or is not sending data properly, the first step would be to check the Kafka Connect logs. The location of these logs will vary by setup, but for our test environments the Connect hosts have their own instances.
Here is an example of some logs printed by our example connector in Confluent:
```
[2020-10-04 16:58:27,837] INFO [Consumer clientId=connector-consumer-UniqueName-0, groupId=connect-UniqueName] Cluster ID: oRLHturITV6x2CA8VRw_jg (org.apache.kafka.clients.Metadata)
[2020-10-04 16:58:27,837] INFO [Consumer clientId=connector-consumer-UniqueName-0, groupId=connect-UniqueName] Discovered group coordinator broker:29092 (id: 2147483646 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-10-04 16:58:27,838] INFO [Consumer clientId=connector-consumer-UniqueName-0, groupId=connect-UniqueName] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
```
Our specific connector is identified in logs through its given name.
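In addition to the logs, the Connect REST API reports the state of the connector and its tasks; the host and connector name below are the same placeholders used earlier in this guide:

```
# Shows the RUNNING/FAILED state of the connector and each task, with stack traces for failures
curl KAFKA_CONNECT_REST_API_HOST:8083/connectors/FooBarBatchSh/status
```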
The Kafka Sink Connector sends a snapshot of metrics for the connector and its workers every 2 minutes. We do this in an effort to provide health and uptime stats on the batch.sh dashboard. The metrics reporter runs independently of the main connector class and does not necessarily need to be working properly for the connector to keep streaming events. Any log messages from the connector that deal with sending a "heartbeat" refer to the payload of stats sent by this reporter. Like the connector itself, the only configuration variable for the metrics reporter/collector is the batch.token. Errors such as the following indicate you may have an invalid batch.token configured:

```
[2020-10-04 17:00:28,734] ERROR error while pushing heartbeat: {"status":404,"message":"","errors":"error retrieving collection: error while trying to fetch collection from cache: not found"} (sh.batch.kafka.BatchSinkMetricsReporter)
```

If you see this message, double-check that your token is correct by verifying the collection through the batch.sh dashboard.