Skip to content

Kafka Connect Operations

Operational guidance for running Kafka Connect in production.


Monitoring

Key Metrics

Metric Description Alert
connector-count Active connectors ≠ expected
task-count Running tasks ≠ expected
connector-startup-failure-total Startup failures > 0
source-record-poll-total Records polled Anomaly
sink-record-send-total Records sent Anomaly
offset-commit-failure-total Commit failures > 0
deadletterqueue-produce-total DLQ records > 0

JMX MBeans

kafka.connect:type=connector-metrics,connector={name}
kafka.connect:type=connector-task-metrics,connector={name},task={id}
kafka.connect:type=source-task-metrics,connector={name},task={id}
kafka.connect:type=sink-task-metrics,connector={name},task={id}

Health Check

# Cluster info
curl http://connect:8083/

# List connectors
curl http://connect:8083/connectors

# Check connector status
curl http://connect:8083/connectors/my-connector/status

REST API Operations

Create Connector

curl -X POST http://connect:8083/connectors \
  -H "Content-Type: application/json" \
  -d @config.json

Update Configuration

curl -X PUT http://connect:8083/connectors/my-connector/config \
  -H "Content-Type: application/json" \
  -d @updated-config.json

Pause/Resume

# Pause
curl -X PUT http://connect:8083/connectors/my-connector/pause

# Resume
curl -X PUT http://connect:8083/connectors/my-connector/resume

Restart

# Restart connector
curl -X POST http://connect:8083/connectors/my-connector/restart

# Restart specific task
curl -X POST http://connect:8083/connectors/my-connector/tasks/0/restart

Delete

curl -X DELETE http://connect:8083/connectors/my-connector

Scaling

Add Workers

Start additional worker processes with same group.id:

# On new node
connect-distributed.sh config/connect-distributed.properties

Tasks automatically rebalance across workers.

Increase Tasks

curl -X PUT http://connect:8083/connectors/my-connector/config \
  -H "Content-Type: application/json" \
  -d '{
    "connector.class": "...",
    "tasks.max": "6"
  }'

Troubleshooting

Common Issues

Issue Symptom Solution
Task failed FAILED status Check logs, fix config, restart
Connector not starting UNASSIGNED tasks Check worker logs
High lag Slow processing Scale tasks/workers
Memory issues OOM errors Increase heap, reduce batch size

Check Task Status

curl http://connect:8083/connectors/my-connector/status | jq
{
  "name": "my-connector",
  "connector": {"state": "RUNNING", "worker_id": "worker1:8083"},
  "tasks": [
    {"id": 0, "state": "RUNNING", "worker_id": "worker1:8083"},
    {"id": 1, "state": "FAILED", "worker_id": "worker2:8083", "trace": "..."}
  ]
}

View Worker Logs

# Check Connect worker logs
tail -f /var/log/kafka-connect/connect.log | grep ERROR

Configuration Management

Validate Before Deploy

curl -X PUT http://connect:8083/connector-plugins/MyConnector/config/validate \
  -H "Content-Type: application/json" \
  -d @config.json

Export Configuration

curl http://connect:8083/connectors/my-connector/config > backup.json

Internal Topics

Topic Purpose Cleanup
connect-offsets Source offsets Compacted
connect-configs Configurations Compacted
connect-status Status updates Compacted

Verify topics exist with proper replication:

kafka-topics.sh --bootstrap-server kafka:9092 --describe \
  --topic connect-offsets