Elasticsearch Sink Connector

Stream Kafka events to Elasticsearch for full-text search, log analytics, and real-time dashboards.


Overview

The Elasticsearch Sink Connector writes Kafka records to Elasticsearch indices, supporting:

  • Automatic index creation and mapping
  • Document ID generation from record keys
  • Bulk indexing for performance
  • Schema-less and schema-aware modes

Installation

# Confluent Hub
confluent-hub install confluentinc/kafka-connect-elasticsearch:latest

# Manual installation
curl -O https://packages.confluent.io/archive/7.5/kafka-connect-elasticsearch-7.5.0.zip
unzip kafka-connect-elasticsearch-7.5.0.zip -d /usr/share/kafka/plugins/
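
After installing, restart the Connect worker and confirm the plugin was picked up. The worker address below matches the one used in the Monitoring section; adjust it for your deployment:

# List installed connector plugins and filter for Elasticsearch
curl http://connect:8083/connector-plugins | grep -i elasticsearch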

Basic Configuration

{
  "name": "elasticsearch-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "3",
    "topics": "events",

    "connection.url": "http://elasticsearch:9200",
    "type.name": "_doc",

    "key.ignore": "false",
    "schema.ignore": "true"
  }
}
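
Save the JSON above to a file and register the connector through the Connect REST API (the worker address is an assumption, consistent with the Monitoring section):

# Create the connector (assumes the config is saved as elasticsearch-sink.json)
curl -X POST -H "Content-Type: application/json" \
  --data @elasticsearch-sink.json \
  http://connect:8083/connectors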

Connection Settings

Single Node

{
  "connection.url": "http://elasticsearch:9200"
}

Cluster

{
  "connection.url": "http://es1:9200,http://es2:9200,http://es3:9200"
}

Authentication

{
  "connection.url": "https://elasticsearch:9200",
  "connection.username": "${secrets:es/username}",
  "connection.password": "${secrets:es/password}"
}
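
The ${secrets:...} placeholders are resolved by a Connect ConfigProvider registered under the name secrets. A minimal sketch of the worker properties using Kafka's built-in FileConfigProvider (the provider name and file layout here are assumptions; any ConfigProvider implementation works):

# Worker properties (sketch): register a config provider named "secrets"
config.providers=secrets
config.providers.secrets.class=org.apache.kafka.common.config.provider.FileConfigProvider
# ${secrets:es/username} then resolves the key "username" from the properties file "es"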

SSL/TLS

{
  "elastic.security.protocol": "SSL",
  "elastic.https.ssl.keystore.location": "/path/to/keystore.jks",
  "elastic.https.ssl.keystore.password": "${secrets:ssl/keystore-password}",
  "elastic.https.ssl.truststore.location": "/path/to/truststore.jks",
  "elastic.https.ssl.truststore.password": "${secrets:ssl/truststore-password}"
}
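
If the truststore does not exist yet, it can be built from the cluster's CA certificate with keytool (alias, paths, and password below are placeholders):

# Import the Elasticsearch CA certificate into a JKS truststore
keytool -importcert -alias es-ca -file ca.crt \
  -keystore /path/to/truststore.jks -storepass changeit -noprompt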

Index Configuration

Index Naming

{
  "topics": "events",
  "index.name.format": "${topic}"
}

Records from the events topic are written to the events index.

Custom Index Names

{
  "topics": "user-events,system-events",
  "index.name.format": "logs-${topic}"
}
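
Records from user-events write to logs-user-events, and records from system-events write to logs-system-events.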

Time-Based Indices

{
  "index.name.format": "${topic}-${timestamp}",
  "index.name.format.datetime.pattern": "yyyy-MM-dd"
}

Creates indices like events-2024-01-15.
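
The daily indices can be confirmed as they appear using the standard cat indices API:

# List the connector's time-based indices
curl 'http://elasticsearch:9200/_cat/indices/events-*?v'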


Document ID

From Record Key

{
  "key.ignore": "false"
}

Uses the Kafka record key as the Elasticsearch document _id.

Auto-Generated

{
  "key.ignore": "true"
}

Elasticsearch generates document IDs automatically.
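
Note that with auto-generated IDs, a retried write after a transient failure can index the same record twice; key-based IDs (key.ignore=false) keep writes idempotent because a re-delivered record overwrites the same document.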

From Record Field

Use Single Message Transforms to copy a value field into the record key and unwrap it, so the field value becomes the document ID:

{
  "transforms": "extractId,extractKey",
  "transforms.extractId.type": "org.apache.kafka.connect.transforms.ValueToKey",
  "transforms.extractId.fields": "event_id",
  "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
  "transforms.extractKey.field": "event_id"
}

ValueToKey alone would set a single-field struct as the key; ExtractField$Key unwraps it to the plain event_id value.

Schema Handling

Schema-less Mode

For schema-less JSON data:

{
  "schema.ignore": "true",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false"
}
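
In this mode each record value is indexed as-is. For example, a plain JSON value like the following (an illustrative payload matching the mapping shown later) becomes the document source:

{"event_id": "e-123", "event_type": "click", "user_id": "u-42", "message": "button clicked"}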

Schema-Aware Mode

With Schema Registry:

{
  "schema.ignore": "false",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081"
}
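
With schema.ignore set to false, the connector derives the Elasticsearch mapping from each record's Connect schema (here, the Avro schema from Schema Registry) instead of relying on dynamic mapping.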

Mapping Configuration

Dynamic Mapping

Elasticsearch infers mappings automatically from the data types of incoming documents. This is convenient, but the first value seen fixes a field's type, so later documents with a different type for the same field are rejected.

Explicit Mapping

Create the index mapping before starting the connector:

PUT /events
{
  "mappings": {
    "properties": {
      "event_id": {"type": "keyword"},
      "event_type": {"type": "keyword"},
      "timestamp": {"type": "date", "format": "epoch_millis"},
      "user_id": {"type": "keyword"},
      "message": {"type": "text"},
      "metadata": {"type": "object", "dynamic": true}
    }
  }
}
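
Verify the mapping took effect before starting the connector:

# Inspect the index mapping
curl http://elasticsearch:9200/events/_mapping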

Bulk Operations

Batch Settings

{
  "batch.size": "2000",
  "max.buffered.records": "20000",
  "linger.ms": "1000"
}

Property              Description                                    Default
batch.size            Records per bulk request                       2000
max.buffered.records  Maximum records buffered in memory             20000
linger.ms             Time in ms to wait for more records per batch  1

Flush Settings

{
  "flush.timeout.ms": "180000",
  "max.in.flight.requests": "5"
}

Error Handling

Retry Configuration

{
  "max.retries": "5",
  "retry.backoff.ms": "100"
}
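
Between attempts the connector backs off, typically doubling the wait starting from retry.backoff.ms, so a transient Elasticsearch outage does not turn into a tight retry loop.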

Dead Letter Queue

{
  "errors.tolerance": "all",
  "errors.deadletterqueue.topic.name": "dlq-elasticsearch-sink",
  "errors.deadletterqueue.topic.replication.factor": 3,
  "errors.deadletterqueue.context.headers.enable": true
}
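
With context headers enabled, the failure reason travels in the record headers, so failed records can be inspected straight from the DLQ topic (the broker address is an assumption):

# Read the DLQ, printing error-context headers alongside each record
kafka-console-consumer --bootstrap-server broker:9092 \
  --topic dlq-elasticsearch-sink --from-beginning \
  --property print.headers=true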

Drop Invalid Records

{
  "behavior.on.malformed.documents": "warn",
  "behavior.on.null.values": "delete"
}

Behavior  Description
fail      Stop the connector on error
warn      Log a warning and skip the record
ignore    Silently skip the record
delete    Delete the document (when the record value is null)

Performance Tuning

Parallelism

{
  "tasks.max": "6"
}

Set tasks.max to at most the total number of partitions across the consumed topics; tasks beyond that sit idle.
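
To check the partition count of a topic (the broker address is an assumption):

# Show partition layout for the events topic
kafka-topics --bootstrap-server broker:9092 --describe --topic events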

Bulk Optimization

{
  "batch.size": "5000",
  "max.buffered.records": "50000",
  "linger.ms": "5000",
  "max.in.flight.requests": "10"
}

Compression

{
  "compression.type": "gzip"
}

Complete Example

{
  "name": "events-to-elasticsearch",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "6",
    "topics": "events,logs",

    "connection.url": "https://es1:9200,https://es2:9200,https://es3:9200",
    "connection.username": "${secrets:es/username}",
    "connection.password": "${secrets:es/password}",

    "type.name": "_doc",
    "key.ignore": "false",
    "schema.ignore": "true",

    "index.name.format": "${topic}-${timestamp}",
    "index.name.format.datetime.pattern": "yyyy-MM-dd",

    "batch.size": "5000",
    "max.buffered.records": "50000",
    "linger.ms": "5000",
    "flush.timeout.ms": "180000",

    "max.retries": "5",
    "retry.backoff.ms": "100",

    "behavior.on.malformed.documents": "warn",
    "behavior.on.null.values": "delete",

    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",

    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "dlq-elasticsearch-sink",
    "errors.deadletterqueue.topic.replication.factor": 3
  }
}
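
Deploy or update this connector idempotently with a PUT to the config endpoint (the request body is the inner config object only, without the name wrapper; the worker address is an assumption):

# Create or update the connector in one call
curl -X PUT -H "Content-Type: application/json" \
  --data @events-to-elasticsearch-config.json \
  http://connect:8083/connectors/events-to-elasticsearch/config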

Index Lifecycle Management

Configure ILM policy in Elasticsearch:

PUT _ilm/policy/logs-policy
{
  "policy": {
    "phases": {
      "hot": {
        "actions": {
          "rollover": {"max_size": "50gb", "max_age": "1d"}
        }
      },
      "warm": {
        "min_age": "7d",
        "actions": {
          "shrink": {"number_of_shards": 1},
          "forcemerge": {"max_num_segments": 1}
        }
      },
      "delete": {
        "min_age": "30d",
        "actions": {"delete": {}}
      }
    }
  }
}
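
The policy only applies to indices that reference it. One way to attach it to the connector's indices is an index template (the template name and patterns below are illustrative; rollover additionally requires a write alias):

PUT _index_template/logs-template
{
  "index_patterns": ["events-*", "logs-*"],
  "template": {
    "settings": {"index.lifecycle.name": "logs-policy"}
  }
}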

Monitoring

Connector Metrics

# Check connector status
curl http://connect:8083/connectors/elasticsearch-sink/status

# Check task status
curl http://connect:8083/connectors/elasticsearch-sink/tasks/0/status
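
If a task reports FAILED, the status output includes the stack trace, and the task can be restarted in place:

# Restart a failed task
curl -X POST http://connect:8083/connectors/elasticsearch-sink/tasks/0/restart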

Elasticsearch Metrics

# Index stats
curl http://elasticsearch:9200/events/_stats

# Cluster health
curl http://elasticsearch:9200/_cluster/health

Troubleshooting

Issue                  Cause                Solution
Connection refused     Network/firewall     Verify Elasticsearch is reachable from the Connect workers
Authentication failed  Invalid credentials  Check the username/password
Mapping conflict       Type mismatch        Review the index mapping
Bulk rejection         Write queue full     Reduce batch size or add Elasticsearch nodes
High latency           Slow indexing        Tune the bulk settings