Kafka Sink Connector (Java)

The Batch.sh Kafka Sink Connector lets you stream your events directly to batch.sh with minimal configuration. Use this guide to get started.

The Kafka sink connector funnels records sent over specified topics to the batch.sh collector service.

Batch offers the ability to tee any events produced within a Kafka cluster up to a remote collector that can optionally analyze those events and map them to a configured schema. These events can then be queried and replayed at target destinations.

Installation

Prerequisites

Kafka Versions

  • Internally tested with Kafka 2.2.0 and 2.2.1 (on AWS MSK)

    • newer versions of Kafka up through 2.5.x should work; older versions are not tested

  • Built for use with Kafka Connect API 2.5.x

    • should be compatible with Connect API 2.0.x - 2.5.0 (see the version check after this list)
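
If you are not sure which Connect API version your workers are running, the root endpoint of the Connect REST API reports it. A quick check, assuming the REST API is listening on the default port 8083:

curl KAFKA_CONNECT_REST_API_HOST:8083/
# responds with something like {"version":"2.5.0","commit":"...","kafka_cluster_id":"..."}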

Getting the Kafka Sink Connector

Confluent

  1. Download the connector ZIP archive from:

  2. Follow the Confluent Platform guide on installing custom connectors using that file (see the sketch after this list)
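
As a rough sketch (the archive filename below is a placeholder for whatever you downloaded), the Confluent Hub client can also install a connector from a local ZIP archive:

# filename is a placeholder; point this at the archive you downloaded in step 1
confluent-hub install ./batchsh-kafka-sink-connector.zip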

Open Source Apache Kafka

  1. Download the connector JAR file from the official releases

  2. Copy the connector JAR file to the plugin.path on your Connect worker nodes; this path is specified in your worker properties (see: https://docs.confluent.io/current/connect/userguide.html#installing-kconnect-plugins). A sketch of this step follows this list.
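
A minimal sketch of that step, assuming /usr/local/share/kafka/plugins as the plugin directory and a placeholder JAR name:

# excerpt from the worker properties on each Connect node
plugin.path=/usr/local/share/kafka/plugins

# copy the downloaded connector JAR (filename is a placeholder) into that directory,
# then restart the Connect worker so it discovers the new plugin
cp batchsh-kafka-sink-connector.jar /usr/local/share/kafka/plugins/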

Configuring the Kafka Sink Connector

Kafka Connect offers two modes of operation: distributed and standalone. Each mode has its own configuration values that are passed along with the values needed by the connector itself. See: https://docs.confluent.io/current/connect/userguide.html#running-workers

Distributed Mode

Distributed mode is configured through the Connect REST API.

Example:

curl -X POST \
    KAFKA_CONNECT_REST_API_HOST:8083/connectors \
    -H 'Content-Type: application/json' \
    -H 'Accept: application/json' \
    -d '{"name": "FooBarBatchSh", "config": {
    "connector.class": "sh.batch.kafka.BatchSinkConnector",
    "tasks.max": "5",
    "topics": "topic1,topic2",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "batch.token": "2494b6fd-74dc-45a3-9c7c-fc0d81e9591c",
}}'
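
Once the POST is accepted, the same REST API can confirm that the connector and its tasks are running. For the example above:

curl KAFKA_CONNECT_REST_API_HOST:8083/connectors/FooBarBatchSh/status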

Standalone Mode

Standalone mode is configured through a properties file that is passed as an argument to the Kafka Connect CLI.

Example:

Create this file somewhere on your host, for example at /config/batchsh-connector.properties:

name=FooBarBatchSh
connector.class=sh.batch.kafka.BatchSinkConnector
batch.token=2494b6fd-74dc-45a3-9c7c-fc0d81e9591c
tasks.max=5
topics=topic1,topic2
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter

Then use the Kafka Connect CLI tools to start the connector:

bin/connect-standalone /path/to/worker.properties /config/batchsh-connector.properties
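
The worker.properties file above is the standard Connect worker configuration rather than anything specific to this connector. A minimal sketch for a standalone worker (the broker address and paths are examples, adjust them to your environment):

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
offset.storage.file.filename=/tmp/connect.offsets
plugin.path=/usr/local/share/kafka/plugins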

Confluent

Within Confluent there is a menu-driven setup that shows all the configuration values that can be passed to the connector. The keys and values are the same as in the other modes.

Configuration Values

Our goal with our connectors is to get you up and running as swiftly as possible. The following configuration values are the ones strictly necessary to start the batch.sh connector. Other parameters that can be adjusted for the Kafka Connect cluster or the workers themselves can be found in the Connect documentation.

name

Unique name for the connector. Attempting to register again with the same name will fail.

connector.class

sh.batch.kafka.BatchSinkConnector

tasks.max

Maximum number of tasks to use for this connector. Tasks act as consumer threads for Connect workers, so tune this setting in line with a typical consumer group for the configured topic(s).
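
For example (the partition counts here are hypothetical), a tasks.max above the total partition count of the configured topics adds no extra parallelism, since the tasks consume as part of a single consumer group:

# topic1 and topic2 have 6 partitions each in this hypothetical setup,
# so 12 tasks is the most that can do useful work in parallel
tasks.max=12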

batch.token

The ID of the batch.sh collection that will be the destination for the events. The collection ID allows the collector service to map the data to a known schema, model it, and store it.

Do not share this UUID around. It is used to uniquely identify an individual collection and should be treated like a secret. All production uses of this connector should use secure transport whenever possible.
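
If you would rather not keep the token in plain text inside the connector configuration, Kafka Connect's built-in config providers can resolve it from an external file at runtime. A sketch using FileConfigProvider (the file paths are examples):

# worker properties
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider

# connector configuration: the token is read from /etc/kafka/batchsh-secrets.properties,
# which contains a line of the form batch.token=<your collection token>
batch.token=${file:/etc/kafka/batchsh-secrets.properties:batch.token}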

topics

A comma-separated list of topics to use as input for this connector.

topics.regex

Regular expression giving topics to consume. Under the hood, the regex is compiled to a java.util.regex.Pattern.

topics or topics.regex may be specified, but not both!
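
For example, a hypothetical set of topics named orders-us, orders-eu, and so on could all be fed to the connector with a single pattern:

# matches every topic whose name starts with "orders-"
topics.regex=orders-.*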

key.converter AND value.converter

org.apache.kafka.connect.converters.ByteArrayConverter

The batch.sh sink connector was designed from the start as a pass-through connector. If the Kafka Connect runtime performs its own data conversion, the resulting byte array sent to the collector is no longer the untouched data originally sent by the producer. No matter what data type your records are (Avro, Protobuf, etc.), ALWAYS choose the ByteArray converter and let the collector resolve the data with its own schemas.
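
Note that converters set in the connector configuration override whatever defaults the worker defines, so the two lines below are safe (and recommended) to set per connector even when the worker itself defaults to Avro or JSON conversion:

key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter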

Error Handling

The Kafka Sink Connector retries errors indefinitely. It prioritizes message delivery, so if records cannot be processed for some reason it will by default block and keep retrying, roughly once a minute, until records can be sent again. Because Kafka Connect is highly configurable, it is possible to tune some of this behavior by introducing dead letter queues or modifying the other error parameters, but our recommendation is to leave these unset. Since the connector is a pass-through for raw data, the only processing errors expected under normal operation come from a remote collector outage; in that scenario, letting the connector retry until the collector service is back online is recommended.
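
For reference, the dead letter queue behavior mentioned above is controlled by the standard Kafka Connect error-handling properties. A sketch of what such a configuration could look like (the DLQ topic name is a placeholder), even though leaving these unset remains our recommendation:

errors.tolerance=all
errors.deadletterqueue.topic.name=batchsh-connector-dlq
errors.deadletterqueue.topic.replication.factor=3
errors.retry.timeout=300000
errors.retry.delay.max.ms=60000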

Logs

If your configured connector is unhealthy or is not sending data properly, the first step is to check the Kafka Connect logs. The location of these logs varies by setup; in our test environments, each Connect host runs its own instance and keeps its own logs.

Here is an example of some logs printed by our example connector in Confluent:

[2020-10-04 16:58:27,837] INFO [Consumer clientId=connector-consumer-UniqueName-0, groupId=connect-UniqueName] Cluster ID: oRLHturITV6x2CA8VRw_jg (org.apache.kafka.clients.Metadata)
[2020-10-04 16:58:27,837] INFO [Consumer clientId=connector-consumer-UniqueName-0, groupId=connect-UniqueName] Discovered group coordinator broker:29092 (id: 2147483646 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-10-04 16:58:27,838] INFO [Consumer clientId=connector-consumer-UniqueName-0, groupId=connect-UniqueName] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)

Our example connector is identified in the logs by its given name, UniqueName.
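
Because the name also appears in the consumer client id and group id (as in the lines above), grepping for it is a quick way to isolate a single connector's activity. A sketch, assuming a hypothetical log path:

# the log path varies by installation; this one is only an example
grep "UniqueName" /var/log/kafka/connect.log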

Metrics

The Kafka Sink Connector sends a snapshot of metrics for the connector and its workers every 2 minutes. We do this to provide health and uptime stats on the batch.sh dashboard. The metrics reporter runs independently of the main connector class and does not need to be working in order for the connector to keep streaming events. Any log messages from the connector that mention sending a "heartbeat" refer to the payload of stats sent by this reporter. As with the connector itself, the only configurable value for the metrics reporter/collector is batch.token. Errors such as the following indicate you may have an invalid batch.token configured:

[2020-10-04 17:00:28,734] ERROR error while pushing heartbeat: {"status":404,"message":"","errors":"error retrieving collection: error while trying to fetch collection from cache: not found"} (sh.batch.kafka.BatchSinkMetricsReporter)

If you see this message, double check that your token is correct by verifying the collection through the batch.sh dashboard.
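
You can also read back the configuration the Connect worker currently holds for the connector and confirm that the batch.token matches the collection shown in the dashboard (the connector name here follows the earlier FooBarBatchSh example):

curl KAFKA_CONNECT_REST_API_HOST:8083/connectors/FooBarBatchSh/config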
