Kafka Connect

Kafka Connect is a framework for streaming data between Apache Kafka and external systems using pre-built or custom connectors.


Overview

Kafka Connect eliminates the need to write custom integration code for common data sources and sinks. The framework handles:

  • Parallelization and scaling
  • Offset management and delivery guarantees (at-least-once by default, exactly-once where supported)
  • Schema integration with Schema Registry
  • Fault tolerance and automatic recovery
  • Standardized monitoring and operations

[Diagram: Kafka Connect overview]


Architecture

Components

Component Description
Worker JVM process that executes connectors and tasks
Connector Plugin that defines how to connect to an external system
Task Unit of work; connectors are divided into tasks for parallelism
Converter Serializes/deserializes data between Connect and Kafka
Transform Modifies records in-flight (Single Message Transforms)

Worker Architecture

[Diagram: worker architecture]

Data Flow

[Diagram: data flow through a Connect pipeline]


Deployment Modes

Standalone Mode

Single worker process—suitable for development and simple use cases.

# Start standalone worker
connect-standalone.sh \
  config/connect-standalone.properties \
  config/file-source.properties \
  config/file-sink.properties

Standalone properties:

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Offset storage (local file)
offset.storage.file.filename=/tmp/connect.offsets

# REST API
rest.port=8083

Characteristic Standalone Mode
Workers Single process
Offset storage Local file
Fault tolerance None
Scaling Not supported
Use case Development, testing

Distributed Mode

Multiple workers forming a cluster—required for production.

# Start distributed worker (on each node)
connect-distributed.sh config/connect-distributed.properties

Distributed properties:

bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092

# Group coordination
group.id=connect-cluster

# Offset storage (Kafka topics)
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=25

# Config storage
config.storage.topic=connect-configs
config.storage.replication.factor=3

# Status storage
status.storage.topic=connect-status
status.storage.replication.factor=3
status.storage.partitions=5

# Converters
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081

# REST API
rest.advertised.host.name=connect-worker-1
rest.port=8083

Characteristic Distributed Mode
Workers Multiple processes (cluster)
Offset storage Kafka topic (connect-offsets)
Fault tolerance Automatic task redistribution
Scaling Add workers to scale
Use case Production

Internal Topics

Topic Purpose Recommended Config
connect-offsets Source connector offsets RF=3, partitions=25, compacted
connect-configs Connector configurations RF=3, partitions=1, compacted
connect-status Connector/task status RF=3, partitions=5, compacted
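
The workers create these topics automatically if they have permission to do so. To pre-create them instead (for example, when topic auto-creation is restricted), a minimal sketch with kafka-topics.sh, using the replication factors and partition counts from the table above:

# Pre-create the Connect internal topics (all three must be compacted)
kafka-topics.sh --bootstrap-server kafka1:9092 --create \
  --topic connect-offsets --partitions 25 --replication-factor 3 \
  --config cleanup.policy=compact
kafka-topics.sh --bootstrap-server kafka1:9092 --create \
  --topic connect-configs --partitions 1 --replication-factor 3 \
  --config cleanup.policy=compact
kafka-topics.sh --bootstrap-server kafka1:9092 --create \
  --topic connect-status --partitions 5 --replication-factor 3 \
  --config cleanup.policy=compact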

Connector Configuration

Creating Connectors via REST API

# Create a Cassandra Sink connector
curl -X POST http://connect:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "cassandra-sink",
    "config": {
      "connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
      "tasks.max": "1",
      "topics": "events",
      "contactPoints": "cassandra1,cassandra2,cassandra3",
      "loadBalancing.localDc": "datacenter1",
      "port": "9042",
      "topic.events.keyspace.table.mapping": "events.events_by_time",
      "topic.events.keyspace.table.consistencyLevel": "LOCAL_QUORUM"
    }
  }'

Common Configuration Properties

Property Description
name Unique connector name
connector.class Fully qualified connector class
tasks.max Maximum number of tasks
key.converter Key converter (overrides worker default)
value.converter Value converter (overrides worker default)
transforms Comma-separated list of transforms
errors.tolerance Error handling: none, all
errors.deadletterqueue.topic.name Dead letter queue topic

Connector Lifecycle

[Diagram: connector lifecycle states]


REST API Reference

Connector Management

Endpoint Method Description
/connectors GET List all connectors
/connectors POST Create connector
/connectors/{name} GET Get connector info
/connectors/{name} DELETE Delete connector
/connectors/{name}/config GET Get connector config
/connectors/{name}/config PUT Update connector config
/connectors/{name}/status GET Get connector status
/connectors/{name}/restart POST Restart connector
/connectors/{name}/pause PUT Pause connector
/connectors/{name}/resume PUT Resume connector
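
For example, checking and restarting the Cassandra sink connector created earlier (worker host and connector name as in that example):

# Check connector and task state
curl -s http://connect:8083/connectors/cassandra-sink/status

# Restart the connector; newer Connect versions also accept ?includeTasks=true to restart its tasks
curl -X POST http://connect:8083/connectors/cassandra-sink/restart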

Task Management

Endpoint Method Description
/connectors/{name}/tasks GET List tasks
/connectors/{name}/tasks/{id}/status GET Get task status
/connectors/{name}/tasks/{id}/restart POST Restart task

Cluster Information

Endpoint Method Description
/ GET Cluster info
/connector-plugins GET List installed plugins
/connector-plugins/{plugin}/config/validate PUT Validate config
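
Validating a configuration before creating a connector catches missing or malformed properties early. A sketch, assuming the DataStax Cassandra sink plugin from the earlier example is installed (the {plugin} path segment is the connector class name):

# Validate a connector configuration without creating the connector
curl -X PUT http://connect:8083/connector-plugins/CassandraSinkConnector/config/validate \
  -H "Content-Type: application/json" \
  -d '{
    "connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
    "tasks.max": "1",
    "topics": "events"
  }'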

Converters

Converters serialize and deserialize data between Connect's internal data format and the byte arrays stored in Kafka.

Available Converters

Converter Format Schema Support
JsonConverter JSON Optional (schemas.enable)
AvroConverter Avro binary Yes (Schema Registry)
ProtobufConverter Protobuf binary Yes (Schema Registry)
JsonSchemaConverter JSON with schema Yes (Schema Registry)
StringConverter Plain string No
ByteArrayConverter Raw bytes No

Converter Configuration

# JSON without schemas (simple)
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

# Avro with Schema Registry (production)
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081

Converter Selection Guide

Use Case Recommended Converter
Development/debugging JsonConverter (schemas.enable=false)
Production with schema evolution AvroConverter or ProtobufConverter
Existing JSON consumers JsonConverter or JsonSchemaConverter
Maximum compatibility StringConverter (manual serialization)
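
Converters can also be overridden per connector rather than cluster-wide, using the key.converter/value.converter properties listed earlier. A sketch against the REST API (PUT replaces the whole configuration, so the connector's other settings must be included as well; they are abbreviated here):

# Switch one connector's value converter to schemaless JSON
curl -X PUT http://connect:8083/connectors/cassandra-sink/config \
  -H "Content-Type: application/json" \
  -d '{
    "connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
    "tasks.max": "1",
    "topics": "events",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }'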

Converters Guide


Single Message Transforms (SMTs)

SMTs modify records as they flow through Connect—useful for simple transformations without custom code.

Built-in Transforms

Transform Description
InsertField Add field with static or metadata value
ReplaceField Rename, include, or exclude fields
MaskField Mask a field with a valid null value for its type (or a specified replacement)
ValueToKey Copy fields from value to key
ExtractField Extract single field from struct
SetSchemaMetadata Set schema name and version
TimestampRouter Route to topic based on timestamp
RegexRouter Route to topic based on regex
Flatten Flatten nested structures
Cast Cast field to different type
HeaderFrom Copy field to header
InsertHeader Add static header
DropHeaders Remove headers
Filter Drop records matching predicate

Transform Configuration

{
  "name": "my-connector",
  "config": {
    "connector.class": "...",
    "transforms": "addTimestamp,route",

    "transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.addTimestamp.timestamp.field": "processed_at",

    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "(.*)_raw",
    "transforms.route.replacement": "$1_processed"
  }
}

Transform Chain

[Diagram: transform chain]

Transforms Guide


Error Handling

Error Tolerance

{
  "name": "my-connector",
  "config": {
    "connector.class": "...",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "my-connector-dlq",
    "errors.deadletterqueue.topic.replication.factor": 3,
    "errors.deadletterqueue.context.headers.enable": true,
    "errors.log.enable": true,
    "errors.log.include.messages": true
  }
}

Configuration Description
errors.tolerance=none Fail on first error (default)
errors.tolerance=all Log errors and continue
errors.deadletterqueue.topic.name Topic for failed records
errors.deadletterqueue.context.headers.enable Include error context in headers
errors.log.enable Log errors to Connect log
errors.log.include.messages Include record content in logs

Dead Letter Queue

[Diagram: dead letter queue flow]

Error Handling Guide


Exactly-Once Delivery

Kafka Connect supports exactly-once semantics for source connectors (Kafka 3.3+).

Source Connector EOS

# Worker configuration
exactly.once.source.support=enabled

# Source connector configuration (transactions are used automatically)
transaction.boundary=poll  # or connector, interval

Transaction Boundary Behavior
poll Transaction per poll() call
connector Connector defines boundaries
interval Transaction every N milliseconds
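
Because an exactly-once source connector writes to Kafka transactionally, downstream consumers only get the exactly-once benefit if they skip aborted records. For example, with the console consumer (topic name is illustrative):

# Read only committed records from a topic written by an EOS source connector
kafka-console-consumer.sh --bootstrap-server kafka1:9092 \
  --topic events \
  --consumer-property isolation.level=read_committed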

Sink Connector EOS

Sink connectors achieve effectively exactly-once delivery through idempotent or transactional writes to the external system:

Strategy Implementation
Upsert Use primary key for idempotent updates
Deduplication Track processed offsets in sink
Transactions Commit offset with sink transaction

Exactly-Once Guide


Monitoring

Key Metrics

Metric Description Alert Threshold
connector-count Number of connectors Expected count
task-count Number of running tasks Expected count
connector-startup-failure-total Connector startup failures > 0
task-startup-failure-total Task startup failures > 0
source-record-poll-total Records polled by source Depends on workload
sink-record-send-total Records sent by sink Depends on workload
offset-commit-failure-total Offset commit failures > 0
deadletterqueue-produce-total Records sent to DLQ > 0 (investigate)

JMX MBeans

kafka.connect:type=connector-metrics,connector={connector}
kafka.connect:type=connector-task-metrics,connector={connector},task={task}
kafka.connect:type=source-task-metrics,connector={connector},task={task}
kafka.connect:type=sink-task-metrics,connector={connector},task={task}
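
One way to read these MBeans from the command line, assuming JMX is exposed on the worker via the standard JMX_PORT environment variable (port 9999 is an arbitrary choice, and the JmxTool class name varies across Kafka versions):

# Expose JMX when starting the worker
export JMX_PORT=9999
connect-distributed.sh config/connect-distributed.properties

# From another shell, sample sink task metrics
kafka-run-class.sh kafka.tools.JmxTool \
  --jmx-url service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi \
  --object-name 'kafka.connect:type=sink-task-metrics,connector=cassandra-sink,task=0'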

Operations Guide


Scaling

Horizontal Scaling

Add workers to the Connect cluster to distribute load:

[Diagram: adding workers to scale the Connect cluster]

Task Parallelism

Increase tasks.max for connectors that support parallelism:

{
  "name": "jdbc-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "10",
    "table.whitelist": "orders,customers,products"
  }
}

Connector Type Parallelism Model
HTTP Source One task per endpoint (typically)
File Source One task per file or directory
S3 Sink Tasks share topic partitions
Cassandra Sink Tasks share topic partitions
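
tasks.max can be raised on a running connector by updating its configuration through the REST API; the resulting rebalance spreads the new tasks across workers. A sketch for the JDBC source above (PUT replaces the whole configuration, so include the existing settings):

# Scale the JDBC source to 10 tasks
curl -X PUT http://connect:8083/connectors/jdbc-source/config \
  -H "Content-Type: application/json" \
  -d '{
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "10",
    "table.whitelist": "orders,customers,products"
  }'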