# Kafka Connect Connectors

Connector guides for common Kafka Connect integrations.

## Connector Overview
| Connector | Type | Use Case |
|---|---|---|
| Cassandra Sink | Sink | Persist events to Cassandra |
| S3 Sink | Sink | Data lake ingestion |
| Elasticsearch Sink | Sink | Search indexing |
| HTTP Source | Source | REST API ingestion |
| File Source | Source | Log file streaming |
## Sink Connectors

### Cassandra Sink

Persist Kafka events to Apache Cassandra tables with the DataStax Apache Kafka Connector. Per-topic settings use a `topic.<topic>.<keyspace>.<table>.*` prefix; the mapping below assumes the `analytics.events` table has `id` and `payload` columns (substitute your own schema).

```json
{
  "name": "cassandra-sink",
  "config": {
    "connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
    "tasks.max": "3",
    "topics": "events",
    "contactPoints": "cassandra1,cassandra2,cassandra3",
    "loadBalancing.localDc": "datacenter1",
    "topic.events.analytics.events.mapping": "id=key, payload=value",
    "topic.events.analytics.events.consistencyLevel": "LOCAL_QUORUM"
  }
}
```
### S3 Sink

Stream events to Amazon S3 in Parquet, Avro, or JSON format.

```json
{
  "name": "s3-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "3",
    "topics": "events",
    "s3.bucket.name": "data-lake",
    "s3.region": "us-east-1",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "flush.size": "10000"
  }
}
```
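As written, the config above is incomplete: the S3 sink requires a `storage.class`, and `TimeBasedPartitioner` fails validation without its companion settings. A sketch of the additions commonly needed (values here are illustrative — hourly partitions, Hive-style paths):

```json
{
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "partition.duration.ms": "3600000",
  "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
  "locale": "en-US",
  "timezone": "UTC",
  "timestamp.extractor": "Record"
}
```

`timestamp.extractor=Record` partitions by the Kafka record timestamp rather than wall-clock time at write, which keeps late-arriving events in their correct time bucket.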
### Elasticsearch Sink

Index events for search and analytics. With `key.ignore=true`, document IDs are derived from topic+partition+offset, so each Kafka record maps to a distinct document. `type.name` applies only to Elasticsearch 6.x and earlier; leave it unset on 7.x+, where mapping types are removed.

```json
{
  "name": "elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "3",
    "topics": "events",
    "connection.url": "http://elasticsearch:9200",
    "type.name": "_doc",
    "key.ignore": "true",
    "schema.ignore": "true"
  }
}
```
## Source Connectors

### HTTP Source

Poll REST APIs and stream responses to Kafka.

```json
{
  "name": "http-source",
  "config": {
    "connector.class": "io.confluent.connect.http.HttpSourceConnector",
    "tasks.max": "1",
    "http.url": "https://api.example.com/events",
    "http.method": "GET",
    "http.headers": "Authorization: Bearer ${token}",
    "kafka.topic": "api-events",
    "http.timer.interval.ms": "60000"
  }
}
```
### File Source

Stream log files to Kafka topics. `FileStreamSourceConnector` ships with Apache Kafka as a demonstration connector; it reads a single file with a single task and is not intended for production use (consider a dedicated log shipper or the SpoolDir connector instead).

```json
{
  "name": "file-source",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "tasks.max": "1",
    "file": "/var/log/application.log",
    "topic": "application-logs"
  }
}
```
## Common Configuration

### Error Handling

Dead letter queues apply to sink connectors only: with `errors.tolerance=all`, records that fail conversion or transformation are routed to the DLQ topic instead of killing the task.

```json
{
  "errors.tolerance": "all",
  "errors.deadletterqueue.topic.name": "dlq-connector-name",
  "errors.deadletterqueue.topic.replication.factor": 3,
  "errors.deadletterqueue.context.headers.enable": true,
  "errors.log.enable": true,
  "errors.log.include.messages": true
}
```
### Exactly-Once (Kafka 3.3+)

These connector-level settings apply to source connectors; with `exactly.once.support=required`, the connector is rejected unless the cluster can actually provide exactly-once delivery.

```json
{
  "exactly.once.support": "required",
  "transaction.boundary": "poll"
}
```
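Exactly-once source support must also be switched on for every worker in the Connect cluster before the connector-level settings above take effect; a minimal worker-properties sketch:

```properties
# connect-distributed.properties (worker-side, all workers in the cluster)
exactly.once.source.support=enabled
```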
### Schema Registry

```json
{
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "http://schema-registry:8081",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081"
}
```
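Where no Schema Registry is available, the JSON converter built into Kafka Connect is a common alternative (a sketch; with `schemas.enable=true` each record embeds its own schema, at the cost of larger messages):

```json
{
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false"
}
```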
## Connector Management

### Create Connector

```shell
curl -X POST http://connect:8083/connectors \
  -H "Content-Type: application/json" \
  -d @connector-config.json
```
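`POST /connectors` returns 409 if a connector with that name already exists. For idempotent create-or-update, `PUT /connectors/{name}/config` works in one call; note its body is just the `config` object, without the `{name, config}` wrapper (file name here is illustrative):

```shell
# Create the connector if absent, otherwise update its config in place
curl -X PUT http://connect:8083/connectors/cassandra-sink/config \
  -H "Content-Type: application/json" \
  -d @connector-config-only.json
```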
### List Connectors

```shell
curl http://connect:8083/connectors
```
### Check Status

```shell
curl http://connect:8083/connectors/my-connector/status
```
### Restart Connector

```shell
curl -X POST http://connect:8083/connectors/my-connector/restart
```
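A plain restart only bounces the connector instance, not its tasks. Since Kafka 3.0 (KIP-745), the restart endpoint accepts query parameters to restart failed tasks in the same call:

```shell
# Restart the connector instance plus any of its tasks in FAILED state
curl -X POST "http://connect:8083/connectors/my-connector/restart?includeTasks=true&onlyFailed=true"
```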
### Delete Connector

```shell
curl -X DELETE http://connect:8083/connectors/my-connector
```
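To stop processing temporarily without losing offsets or configuration, the REST API also provides pause and resume:

```shell
# Pause: tasks stop consuming/producing but the connector and its state remain
curl -X PUT http://connect:8083/connectors/my-connector/pause

# Resume: tasks pick up where they left off
curl -X PUT http://connect:8083/connectors/my-connector/resume
```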
## Related Documentation
- Kafka Connect - Connect framework
- Kafka Connect Concepts - Conceptual overview
- Schema Registry - Schema management