Kafka Connect
Kafka Connect is a framework for streaming data between Apache Kafka and external systems using pre-built or custom connectors.
Overview
Kafka Connect eliminates the need to write custom integration code for common data sources and sinks. The framework handles:
- Parallelization and scaling
- Offset management and delivery guarantees (including exactly-once for source connectors)
- Schema integration with Schema Registry
- Fault tolerance and automatic recovery
- Standardized monitoring and operations

Architecture
Components
| Component | Description |
| --- | --- |
| Worker | JVM process that executes connectors and tasks |
| Connector | Plugin that defines how to connect to an external system |
| Task | Unit of work; connectors are divided into tasks for parallelism |
| Converter | Serializes/deserializes data between Connect and Kafka |
| Transform | Modifies records in-flight (Single Message Transforms) |
Worker Architecture
Each worker is a JVM process that loads connector plugins and runs the connectors and tasks assigned to it; in distributed mode, workers sharing the same group.id coordinate through Kafka to distribute that work.
Data Flow
Source connectors pull records from external systems; the records pass through transforms and a converter before being written to Kafka. Sink connectors read from Kafka and push records through the same pipeline in reverse.
Deployment Modes
Standalone Mode
Single worker process—suitable for development and simple use cases.
# Start standalone worker
connect-standalone.sh \
  config/connect-standalone.properties \
  config/file-source.properties \
  config/file-sink.properties
Standalone properties:
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Offset storage (local file)
offset.storage.file.filename=/tmp/connect.offsets
# REST API
rest.port=8083
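The connector properties files referenced by the standalone command above might look like the following minimal sketch, assuming the FileStream example connectors are on the plugin path; the file paths and topic name are placeholders.
# config/file-source.properties
name=file-source
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
file=/tmp/input.txt
topic=file-events
# config/file-sink.properties
name=file-sink
connector.class=org.apache.kafka.connect.file.FileStreamSinkConnector
tasks.max=1
file=/tmp/output.txt
topics=file-events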
| Characteristic | Standalone Mode |
| --- | --- |
| Workers | Single process |
| Offset storage | Local file |
| Fault tolerance | None |
| Scaling | Not supported |
| Use case | Development, testing |
Distributed Mode
Multiple workers forming a cluster—required for production.
# Start distributed worker (on each node)
connect-distributed.sh config/connect-distributed.properties
Distributed properties:
bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092
# Group coordination
group.id=connect-cluster
# Offset storage (Kafka topics)
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=25
# Config storage
config.storage.topic=connect-configs
config.storage.replication.factor=3
# Status storage
status.storage.topic=connect-status
status.storage.replication.factor=3
status.storage.partitions=5
# Converters
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081
# REST API
rest.advertised.host.name=connect-worker-1
rest.port=8083
| Characteristic | Distributed Mode |
| --- | --- |
| Workers | Multiple processes (cluster) |
| Offset storage | Kafka topic (connect-offsets) |
| Fault tolerance | Automatic task redistribution |
| Scaling | Add workers to scale |
| Use case | Production |
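Once a worker has been started on each node with the same group.id, cluster membership can be confirmed through the REST API. A quick check, assuming a worker is reachable at connect-worker-1:8083:
# Returns the worker version, commit, and kafka_cluster_id
curl http://connect-worker-1:8083/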
Internal Topics
| Topic | Purpose | Recommended Config |
| --- | --- | --- |
| connect-offsets | Source connector offsets | RF=3, partitions=25, compacted |
| connect-configs | Connector configurations | RF=3, partitions=1, compacted |
| connect-status | Connector/task status | RF=3, partitions=5, compacted |
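Workers create these topics automatically when they have permission to do so; in locked-down clusters they can be pre-created with kafka-topics.sh. A sketch assuming brokers reachable at kafka1:9092 (all three topics should be compacted):
kafka-topics.sh --bootstrap-server kafka1:9092 --create --topic connect-offsets \
  --partitions 25 --replication-factor 3 --config cleanup.policy=compact
kafka-topics.sh --bootstrap-server kafka1:9092 --create --topic connect-configs \
  --partitions 1 --replication-factor 3 --config cleanup.policy=compact
kafka-topics.sh --bootstrap-server kafka1:9092 --create --topic connect-status \
  --partitions 5 --replication-factor 3 --config cleanup.policy=compact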
Connector Configuration
Creating Connectors via REST API
# Create a Cassandra Sink connector
curl -X POST http://connect:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "cassandra-sink",
    "config": {
      "connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
      "tasks.max": "1",
      "topics": "events",
      "contactPoints": "cassandra1,cassandra2,cassandra3",
      "loadBalancing.localDc": "datacenter1",
      "port": "9042",
      "topic.events.keyspace.table.mapping": "events.events_by_time",
      "topic.events.keyspace.table.consistencyLevel": "LOCAL_QUORUM"
    }
  }'
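After submitting the connector, it is worth confirming that the connector and its task reach the RUNNING state; a follow-up check against the same worker:
# Returns connector state plus per-task state, e.g. RUNNING, PAUSED, or FAILED
curl http://connect:8083/connectors/cassandra-sink/status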
Common Configuration Properties
| Property | Description |
| --- | --- |
| name | Unique connector name |
| connector.class | Fully qualified connector class |
| tasks.max | Maximum number of tasks |
| key.converter | Key converter (overrides worker default) |
| value.converter | Value converter (overrides worker default) |
| transforms | Comma-separated list of transforms |
| errors.tolerance | Error handling: none, all |
| errors.deadletterqueue.topic.name | Dead letter queue topic |
Connector Lifecycle
Connectors and tasks report their state through the status endpoint; common states include UNASSIGNED, RUNNING, PAUSED, and FAILED.
REST API Reference
Connector Management
| Endpoint | Method | Description |
| --- | --- | --- |
| /connectors | GET | List all connectors |
| /connectors | POST | Create connector |
| /connectors/{name} | GET | Get connector info |
| /connectors/{name} | DELETE | Delete connector |
| /connectors/{name}/config | GET | Get connector config |
| /connectors/{name}/config | PUT | Update connector config |
| /connectors/{name}/status | GET | Get connector status |
| /connectors/{name}/restart | POST | Restart connector |
| /connectors/{name}/pause | PUT | Pause connector |
| /connectors/{name}/resume | PUT | Resume connector |
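As a usage sketch, the pause, resume, and restart endpoints above map directly onto curl calls; the worker host and connector name are placeholders:
curl -X PUT http://connect:8083/connectors/cassandra-sink/pause
curl -X PUT http://connect:8083/connectors/cassandra-sink/resume
curl -X POST http://connect:8083/connectors/cassandra-sink/restart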
Task Management
| Endpoint | Method | Description |
| --- | --- | --- |
| /connectors/{name}/tasks | GET | List tasks |
| /connectors/{name}/tasks/{id}/status | GET | Get task status |
| /connectors/{name}/tasks/{id}/restart | POST | Restart task |
Cluster and Plugins
| Endpoint | Method | Description |
| --- | --- | --- |
| / | GET | Cluster info |
| /connector-plugins | GET | List installed plugins |
| /connector-plugins/{plugin}/config/validate | PUT | Validate config |
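The validate endpoint checks a candidate configuration and reports per-field errors before a connector is created. A sketch using the FileStreamSinkConnector plugin as an illustrative example:
curl -X PUT http://connect:8083/connector-plugins/FileStreamSinkConnector/config/validate \
  -H "Content-Type: application/json" \
  -d '{
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "topics": "file-events",
    "file": "/tmp/output.txt"
  }'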
Converters
Converters serialize and deserialize data between Connect's internal format and Kafka.
Available Converters
| Converter | Format | Schema Support |
| --- | --- | --- |
| JsonConverter | JSON | Optional (schemas.enable) |
| AvroConverter | Avro binary | Yes (Schema Registry) |
| ProtobufConverter | Protobuf binary | Yes (Schema Registry) |
| JsonSchemaConverter | JSON with schema | Yes (Schema Registry) |
| StringConverter | Plain string | No |
| ByteArrayConverter | Raw bytes | No |
Converter Configuration
# JSON without schemas (simple)
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
# Avro with Schema Registry (production)
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081
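Converters can also be overridden per connector, as the configuration table above notes. A hypothetical sink that reads plain JSON while the worker default is Avro:
{
  "name": "json-sink",
  "config": {
    "connector.class": "...",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}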
Converter Selection Guide
| Use Case | Recommended Converter |
| --- | --- |
| Development/debugging | JsonConverter (schemas.enable=false) |
| Production with schema evolution | AvroConverter or ProtobufConverter |
| Existing JSON consumers | JsonConverter or JsonSchemaConverter |
| Maximum compatibility | StringConverter (manual serialization) |
→ Converters Guide
Single Message Transforms (SMTs)
SMTs modify records as they flow through Connect, which is useful for simple transformations that don't require custom code.
| Transform | Description |
| --- | --- |
| InsertField | Add field with static or metadata value |
| ReplaceField | Rename, include, or exclude fields |
| MaskField | Replace field value with a valid null value for its type |
| ValueToKey | Copy fields from value to key |
| ExtractField | Extract single field from struct |
| SetSchemaMetadata | Set schema name and version |
| TimestampRouter | Route to topic based on timestamp |
| RegexRouter | Route to topic based on regex |
| Flatten | Flatten nested structures |
| Cast | Cast field to different type |
| HeaderFrom | Copy field to header |
| InsertHeader | Add static header |
| DropHeaders | Remove headers |
| Filter | Drop records matching predicate |
Example: chain two transforms to stamp a processing time and route records to a new topic:
{
  "name": "my-connector",
  "config": {
    "connector.class": "...",
    "transforms": "addTimestamp,route",
    "transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.addTimestamp.timestamp.field": "processed_at",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "(.*)_raw",
    "transforms.route.replacement": "$1_processed"
  }
}
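The Filter transform is usually paired with a predicate so that only matching records are dropped. A sketch with illustrative names and topic pattern:
{
  "name": "my-connector",
  "config": {
    "connector.class": "...",
    "transforms": "dropInternal",
    "transforms.dropInternal.type": "org.apache.kafka.connect.transforms.Filter",
    "transforms.dropInternal.predicate": "isInternal",
    "predicates": "isInternal",
    "predicates.isInternal.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
    "predicates.isInternal.pattern": "internal-.*"
  }
}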

→ Transforms Guide
Error Handling
Error Tolerance
{
  "name": "my-connector",
  "config": {
    "connector.class": "...",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "my-connector-dlq",
    "errors.deadletterqueue.topic.replication.factor": 3,
    "errors.deadletterqueue.context.headers.enable": true,
    "errors.log.enable": true,
    "errors.log.include.messages": true
  }
}
| Configuration | Description |
| --- | --- |
| errors.tolerance=none | Fail on first error (default) |
| errors.tolerance=all | Log errors and continue |
| errors.deadletterqueue.topic.name | Topic for failed records |
| errors.deadletterqueue.context.headers.enable | Include error context in headers |
| errors.log.enable | Log errors to Connect log |
| errors.log.include.messages | Include record content in logs |
Dead Letter Queue
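The dead letter queue applies to sink connectors: records that fail conversion or transformation are produced to the configured topic, with error details carried in headers when context headers are enabled. Failed records can then be inspected with the console consumer; a sketch assuming the DLQ topic configured above:
kafka-console-consumer.sh --bootstrap-server kafka1:9092 \
  --topic my-connector-dlq --from-beginning \
  --property print.headers=true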

→ Error Handling Guide
Exactly-Once Delivery
Kafka Connect supports exactly-once semantics for source connectors (Kafka 3.3+).
Source Connector EOS
# Worker configuration (every worker in the cluster)
exactly.once.source.support=enabled

# Connector configuration (transactions are used automatically once worker support is enabled)
# transaction.boundary: poll (default), connector, or interval
transaction.boundary=poll
# optional: fail the connector if exactly-once cannot be guaranteed
exactly.once.support=required
| Transaction Boundary | Behavior |
| --- | --- |
| poll | Transaction per poll() call |
| connector | Connector defines boundaries |
| interval | Transaction every N milliseconds |
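At the connector level, the transaction boundary can also be set in the connector configuration submitted over REST; a sketch with placeholder names, where transaction.boundary.interval.ms only applies to the interval boundary:
{
  "name": "eos-source",
  "config": {
    "connector.class": "...",
    "exactly.once.support": "required",
    "transaction.boundary": "interval",
    "transaction.boundary.interval.ms": "60000"
  }
}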
Sink Connector EOS
Sink connectors achieve exactly-once through idempotent or transactional writes to the external system:
| Strategy | Implementation |
| --- | --- |
| Upsert | Use primary key for idempotent updates |
| Deduplication | Track processed offsets in sink |
| Transactions | Commit offsets atomically with the sink write |
→ Exactly-Once Guide
Monitoring
Key Metrics
| Metric | Description | Alert Threshold |
| --- | --- | --- |
| connector-count | Number of connectors | Expected count |
| task-count | Number of running tasks | Expected count |
| connector-startup-failure-total | Connector startup failures | > 0 |
| task-startup-failure-total | Task startup failures | > 0 |
| source-record-poll-total | Records polled by source | Depends on workload |
| sink-record-send-total | Records sent by sink | Depends on workload |
| offset-commit-failure-total | Offset commit failures | > 0 |
| deadletterqueue-produce-total | Records sent to DLQ | > 0 (investigate) |
JMX MBeans
kafka.connect:type=connector-metrics,connector={connector}
kafka.connect:type=connector-task-metrics,connector={connector},task={task}
kafka.connect:type=source-task-metrics,connector={connector},task={task}
kafka.connect:type=sink-task-metrics,connector={connector},task={task}
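These MBeans are only visible if the worker JVM exposes JMX. The Kafka launch scripts honor the JMX_PORT environment variable, so a minimal (unauthenticated, local-only) setup might look like:
# Expose JMX on port 9999 before starting the worker
export JMX_PORT=9999
connect-distributed.sh config/connect-distributed.properties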
→ Operations Guide
Scaling
Horizontal Scaling
Add workers to the Connect cluster to distribute load. When a worker joins or leaves, the cluster rebalances connectors and tasks across the remaining workers.

Task Parallelism
Increase tasks.max for connectors that support parallelism:
{
  "name": "jdbc-source",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "10",
    "table.whitelist": "orders,customers,products"
  }
}
| Connector Type | Parallelism Model |
| --- | --- |
| HTTP Source | One task per endpoint (typically) |
| File Source | One task per file or directory |
| S3 Sink | Tasks share topic partitions |
| Cassandra Sink | Tasks share topic partitions |
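Because tasks.max is ordinary connector configuration, it can be raised at runtime through PUT /connectors/{name}/config (the full configuration must be resubmitted, not just the changed key); a sketch for the JDBC example above:
curl -X PUT http://connect:8083/connectors/jdbc-source/config \
  -H "Content-Type: application/json" \
  -d '{
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "20",
    "table.whitelist": "orders,customers,products"
  }'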