
Data Integration

Patterns for integrating Apache Kafka with data systems across the enterprise.


Integration Approaches

| Approach | Description | Use Case |
| --- | --- | --- |
| Kafka Connect | Declarative connectors | Standard integrations |
| Custom Producers/Consumers | Application code | Complex logic (see the sketch below) |
| Kafka Streams | Stream processing | Transformations |
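
For cases where Connect's declarative model is too restrictive, a hand-written producer gives full control over keying, batching, and error handling. A minimal sketch, assuming a local broker and a String-keyed events topic (both illustrative):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class EventProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        props.put("enable.idempotence", "true"); // avoid duplicates on producer retries

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Key by entity id so related events land on the same partition
            producer.send(new ProducerRecord<>("events", "order-42", "{\"type\":\"created\"}"));
        }
    }
}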

ETL vs ELT with Kafka

Traditional ETL

(Diagram: scheduled batch extract from source systems → transform in a staging area → load into the data warehouse.)

Kafka-Based Streaming ETL

(Diagram: sources publish continuously to Kafka; a stream processor transforms records in flight; sinks consume the transformed topic.)
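
A minimal Kafka Streams sketch of this transform-in-flight step; the raw-events and clean-events topic names and the local broker address are assumptions for illustration:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import java.util.Properties;

public class StreamingEtl {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streaming-etl");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> raw =
            builder.stream("raw-events", Consumed.with(Serdes.String(), Serdes.String()));
        raw.filter((k, v) -> v != null && !v.isBlank()) // transform: drop empty payloads
           .mapValues(String::trim)                     // transform: normalize payload
           .to("clean-events", Produced.with(Serdes.String(), Serdes.String()));

        new KafkaStreams(builder.build(), props).start();
    }
}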

ELT Pattern

(Diagram: sources publish to Kafka; raw events are loaded unchanged into the warehouse or lake; transformations run inside the target system.)


Streaming Pipeline Architecture

(Diagram: source connectors → Kafka topics → stream processing → sink connectors feeding downstream systems.)


Integration Patterns

Fan-Out Pattern

One source feeds multiple sinks for different use cases.

(Diagram: a single source topic consumed independently by multiple sink systems.)
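
Fan-out needs no extra infrastructure: each sink runs in its own consumer group and independently receives every record. A sketch of one such sink, with broker, group, and topic names illustrative:

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class AnalyticsSink {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        props.put("group.id", "analytics-sink");          // a distinct group per sink = a full copy of the stream
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("events"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                records.forEach(r -> System.out.printf("analytics: %s=%s%n", r.key(), r.value()));
            }
        }
    }
}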

Fan-In Pattern

Multiple sources aggregate into unified topics.

(Diagram: several source topics merged into one unified topic.)
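
One way to realize fan-in is a small Kafka Streams topology subscribed to all source topics, writing the merged stream to the unified topic; the regional topic names here are hypothetical:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import java.util.List;
import java.util.Properties;

public class OrdersFanIn {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "orders-fan-in");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption

        StreamsBuilder builder = new StreamsBuilder();
        // Subscribing to a collection of topics merges them into one stream
        builder.stream(List.of("orders-eu", "orders-us", "orders-apac"),
                       Consumed.with(Serdes.String(), Serdes.String()))
               .to("orders", Produced.with(Serdes.String(), Serdes.String()));

        new KafkaStreams(builder.build(), props).start();
    }
}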

Enrichment Pattern

Enrich events with reference data.

(Diagram: an event stream joined with reference data to produce an enriched output topic.)
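
In Kafka Streams, enrichment is commonly a KStream–KTable join: the reference data is materialized as a table and each event is joined with the latest value for its key. A sketch assuming both topics share the same key and the customers topic is compacted (all names illustrative):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import java.util.Properties;

public class OrderEnricher {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-enricher");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> orders =
            builder.stream("orders", Consumed.with(Serdes.String(), Serdes.String()));
        // Compacted topic of customer records, keyed the same way as orders
        KTable<String, String> customers =
            builder.table("customers", Consumed.with(Serdes.String(), Serdes.String()));

        orders.join(customers, (order, customer) -> order + "|" + customer) // attach reference data
              .to("orders-enriched", Produced.with(Serdes.String(), Serdes.String()));

        new KafkaStreams(builder.build(), props).start();
    }
}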


Kafka Connect Patterns

Source Connector Patterns

| Pattern | Description | Example |
| --- | --- | --- |
| Polling | Periodically fetch data | HTTP Source |
| Push | Receive pushed events | MQTT Source |
| Log tailing | Stream log files | FileStream Source |
| Message bridge | Bridge message systems | JMS Source |

Sink Connector Patterns

| Pattern | Description | Example |
| --- | --- | --- |
| Upsert | Insert or update records | Cassandra Sink |
| Append | Append-only writes | S3 Sink |
| Index | Update search index | Elasticsearch Sink |
| Time-partitioned | Partition by time | S3 Sink with partitioner |

Exactly-Once Sink Pattern

(Diagram: the sink writes records and their source offsets atomically, so reprocessing after a failure produces no duplicates.)
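
One common realization is to keep consumer offsets in the sink's own store and update them in the same transaction as the data, seeking back to the stored offsets on restart. A condensed sketch against a relational sink; the JDBC URL, table names, and topic are hypothetical:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.sql.*;
import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class ExactlyOnceSink {
    public static void main(String[] args) throws SQLException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        props.put("group.id", "jdbc-sink");
        props.put("enable.auto.commit", "false");         // offsets live in the sink DB, not in Kafka
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        Connection db = DriverManager.getConnection("jdbc:postgresql://localhost/sink"); // hypothetical DSN
        db.setAutoCommit(false);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("events"));
            // On startup, read the offsets table and consumer.seek(...) to resume exactly (omitted)
            while (true) {
                for (ConsumerRecord<String, String> r : consumer.poll(Duration.ofSeconds(1))) {
                    // Data write and offset write share one DB transaction
                    try (PreparedStatement ins = db.prepareStatement(
                             "INSERT INTO events(k, v) VALUES (?, ?)");
                         PreparedStatement off = db.prepareStatement(
                             "UPDATE offsets SET pos = ? WHERE topic = ? AND part = ?")) {
                        ins.setString(1, r.key());
                        ins.setString(2, r.value());
                        ins.executeUpdate();
                        off.setLong(1, r.offset() + 1);   // next offset to consume
                        off.setString(2, r.topic());
                        off.setInt(3, r.partition());     // assumes a pre-seeded offsets row per partition
                        off.executeUpdate();
                    }
                    db.commit(); // atomic: record and offset land together, or neither does
                }
            }
        }
    }
}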


Data Lake Integration

S3 Sink Configuration

{
  "name": "s3-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "3",
    "topics": "events",
    "s3.bucket.name": "data-lake",
    "s3.region": "us-east-1",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "partition.duration.ms": "3600000",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "rotate.interval.ms": "600000",
    "flush.size": "10000"
  }
}
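
With these settings, an output file is committed once it accumulates 10,000 records (flush.size) or once the 10-minute rotation interval elapses (rotate.interval.ms), whichever is reached first, and the time-based partitioner buckets records into the hourly paths shown below. The JSON is submitted to the Kafka Connect REST API to create the connector.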

Resulting S3 Structure

s3://data-lake/topics/events/
  year=2024/
    month=01/
      day=15/
        hour=10/
          events+0+0000000000.parquet
          events+1+0000000000.parquet
        hour=11/
          events+0+0000000100.parquet

Cassandra Integration

Cassandra Sink Configuration

{
  "name": "cassandra-sink",
  "config": {
    "connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
    "tasks.max": "3",
    "topics": "events",
    "contactPoints": "cassandra1,cassandra2,cassandra3",
    "loadBalancing.localDc": "datacenter1",
    "port": "9042",
    "topic.events.keyspace.table.mapping": "analytics.events_by_time",
    "topic.events.keyspace.table.consistencyLevel": "LOCAL_QUORUM",
    "topic.events.keyspace.table.ttlTimeUnit": "DAYS",
    "topic.events.keyspace.table.ttl": "365"
  }
}

Target Table Design

CREATE TABLE analytics.events_by_time (
    event_date date,
    event_time timestamp,
    event_id uuid,
    event_type text,
    payload text,
    PRIMARY KEY ((event_date), event_time, event_id)
) WITH CLUSTERING ORDER BY (event_time DESC);
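
Partitioning by event_date bounds each partition to a single day of events, and the descending clustering order makes "most recent events for a day" reads sequential; the connector mapping above writes directly into this layout.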

Best Practices

Schema Management

| Practice | Rationale |
| --- | --- |
| Use Schema Registry | Ensure compatibility across the pipeline |
| Prefer Avro or Protobuf | Compact, schema evolution support |
| Version schemas explicitly | Track changes, enable rollback |

Error Handling

| Strategy | Implementation |
| --- | --- |
| Dead letter queue | Route failed records to a DLQ topic (errors.deadletterqueue.topic.name) for investigation |
| Error tolerance | Configure errors.tolerance=all so bad records are skipped instead of failing the task |
| Monitoring | Alert on DLQ growth |

Performance

| Optimization | Configuration |
| --- | --- |
| Batching | Increase batch.size and linger.ms (see the sketch below) |
| Compression | Use compression.type=lz4 |
| Parallelism | Increase tasks.max |
| Partitioning | Align with downstream partitioning |
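
The producer-side settings from the table, gathered in one place; the values are illustrative starting points, not recommendations for every workload:

import org.apache.kafka.clients.producer.ProducerConfig;
import java.util.Properties;

public class TunedProducerConfig {
    public static Properties props() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption: local broker
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);       // larger batches amortize request overhead
        props.put(ProducerConfig.LINGER_MS_CONFIG, 20);           // wait up to 20 ms to fill a batch
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); // cheap CPU cost, good throughput
        return props;
    }
}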