Cassandra Sink Connector

Stream Kafka events to Apache Cassandra tables using the DataStax Kafka Connector.


Overview

The Cassandra Sink Connector writes Kafka records to Cassandra tables, supporting:

  • Automatic table mapping from topic structure
  • Multiple consistency levels
  • TTL configuration
  • Batch writes for performance
  • Effectively-once results: delivery is at-least-once, and idempotent upserts make replayed records overwrite the same rows

Installation

Download

# Download connector
curl -O https://downloads.datastax.com/kafka/kafka-connect-cassandra-sink.tar.gz
tar -xzf kafka-connect-cassandra-sink.tar.gz

# Copy to Connect plugin path
cp -r kafka-connect-cassandra-sink /usr/share/kafka/plugins/

Docker

services:
  kafka-connect:
    image: confluentinc/cp-kafka-connect:7.5.0
    environment:
      CONNECT_PLUGIN_PATH: /usr/share/kafka/plugins
    volumes:
      - ./kafka-connect-cassandra-sink:/usr/share/kafka/plugins/cassandra-sink

Basic Configuration

{
  "name": "cassandra-sink",
  "config": {
    "connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
    "tasks.max": "3",
    "topics": "events",

    "contactPoints": "cassandra1,cassandra2,cassandra3",
    "loadBalancing.localDc": "datacenter1",
    "port": "9042",

    "topic.events.keyspace.table.mapping": "analytics.events_by_time",
    "topic.events.keyspace.table.consistencyLevel": "LOCAL_QUORUM"
  }
}
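
A definition like the one above is normally submitted to the Kafka Connect REST API with a POST to /connectors. The following is a minimal sketch using only the Python standard library. The worker address http://connect:8083 is the one used in the Health Check section, the helper name build_create_request is hypothetical, and the definition is trimmed to the connection settings for brevity.

```python
import json
import urllib.request

CONNECT_URL = "http://connect:8083"  # assumed worker address; adjust for your deployment


def build_create_request(definition: dict, base_url: str = CONNECT_URL) -> urllib.request.Request:
    """Build (but do not send) a POST /connectors request for a connector definition."""
    body = json.dumps(definition).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Trimmed copy of the basic configuration; add the topic mapping keys before use.
definition = {
    "name": "cassandra-sink",
    "config": {
        "connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
        "tasks.max": "3",
        "topics": "events",
        "contactPoints": "cassandra1,cassandra2,cassandra3",
        "loadBalancing.localDc": "datacenter1",
        "port": "9042",
    },
}

request = build_create_request(definition)

# Sending requires a reachable Connect worker, so it is left commented out:
# with urllib.request.urlopen(request) as response:
#     print(response.status, response.read().decode("utf-8"))
```

Building the request separately from sending it keeps the payload easy to inspect or log before it reaches the worker.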

Connection Settings

| Property | Description | Default |
|---|---|---|
| contactPoints | Cassandra contact points | Required |
| port | CQL native port | 9042 |
| loadBalancing.localDc | Local datacenter for routing | Required |
| maxConcurrentRequests | Max concurrent requests | 500 |
| maxNumberOfRecordsInBatch | Batch size | 32 |
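
Because two of the settings above have no default, a pre-flight check before submitting a configuration can catch omissions early. This is a small sketch, not part of the connector; the function name missing_required is hypothetical.

```python
# Settings marked "Required" in the connection settings table.
REQUIRED_SETTINGS = ("contactPoints", "loadBalancing.localDc")


def missing_required(config: dict) -> list:
    """Return the required settings that are absent or blank in a connector config."""
    return [key for key in REQUIRED_SETTINGS if not str(config.get(key, "")).strip()]


incomplete = {"contactPoints": "cassandra1,cassandra2,cassandra3", "port": "9042"}
print(missing_required(incomplete))
```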

Authentication

{
  "auth.provider": "PLAIN",
  "auth.username": "cassandra_user",
  "auth.password": "cassandra_password"
}

SSL/TLS

{
  "ssl.provider": "JDK",
  "ssl.hostnameValidation": true,
  "ssl.keystore.path": "/path/to/keystore.jks",
  "ssl.keystore.password": "keystore_password",
  "ssl.truststore.path": "/path/to/truststore.jks",
  "ssl.truststore.password": "truststore_password"
}

Table Mapping

Simple Mapping

Map topic fields directly to table columns.

Kafka Message:

{
  "event_id": "abc123",
  "event_type": "click",
  "timestamp": 1705312800000,
  "user_id": "user456"
}

Cassandra Table:

CREATE TABLE analytics.events_by_time (
    event_date date,
    event_time timestamp,
    event_id text,
    event_type text,
    user_id text,
    PRIMARY KEY ((event_date), event_time, event_id)
) WITH CLUSTERING ORDER BY (event_time DESC);

Configuration:

{
  "topic.events.keyspace.table.mapping": "analytics.events_by_time",
  "topic.events.analytics.events_by_time.mapping": "event_id=value.event_id, event_type=value.event_type, event_time=value.timestamp, user_id=value.user_id, event_date=now()"
}
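
The mapping value is a comma-separated list of column=source pairs, so it can be checked against the table definition before deployment. The sketch below assumes that no source expression contains a comma; parse_mapping and unmapped_columns are hypothetical helper names, not connector APIs.

```python
def parse_mapping(mapping: str) -> dict:
    """Split 'col=source, col=source' into a {column: source} dictionary."""
    result = {}
    for pair in mapping.split(","):
        pair = pair.strip()
        if not pair:
            continue
        column, _, source = pair.partition("=")
        result[column.strip()] = source.strip()
    return result


def unmapped_columns(mapping: dict, table_columns: list) -> list:
    """Return table columns that no mapping entry writes to."""
    return [column for column in table_columns if column not in mapping]


mapping = parse_mapping(
    "event_id=value.event_id, event_type=value.event_type, "
    "event_time=value.timestamp, user_id=value.user_id, event_date=now()"
)
table_columns = ["event_date", "event_time", "event_id", "event_type", "user_id"]

print(mapping["event_time"])
print(unmapped_columns(mapping, table_columns))
```

An empty result from unmapped_columns matters most for primary key columns, since Cassandra rejects writes that leave any of them unset.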

Mapping Functions

| Function | Description | Example |
|---|---|---|
| value.field | Extract from value | value.user_id |
| key.field | Extract from key | key.partition_key |
| header.field | Extract from header | header.trace_id |
| now() | Current timestamp | event_time=now() |

Time-Based Partitioning

{
  "topic.events.analytics.events_by_time.mapping": "event_date=value.timestamp:toDate, event_time=value.timestamp, event_id=value.event_id"
}
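
To see which partition a record lands in, it helps to work out the date that corresponds to its epoch-millisecond timestamp. The sketch below assumes the date conversion truncates to the calendar day in UTC, which is an assumption about the conversion rather than documented behavior. It uses the timestamp from the sample message in Simple Mapping.

```python
from datetime import date, datetime, timezone


def epoch_millis_to_date(millis: int) -> date:
    """Convert epoch milliseconds to the UTC calendar date (assumed partition value)."""
    return datetime.fromtimestamp(millis / 1000, tz=timezone.utc).date()


partition_date = epoch_millis_to_date(1705312800000)
print(partition_date.isoformat())  # 2024-01-15
```

All events from the same UTC day share one partition under this scheme, so very high event rates may call for a finer bucket than a day.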

Consistency Levels

| Level | Description | Use Case |
|---|---|---|
| LOCAL_ONE | One replica in local DC | High throughput, lower durability |
| LOCAL_QUORUM | Majority in local DC | Balanced durability/performance |
| QUORUM | Majority across all DCs | Strong consistency |
| ALL | All replicas | Maximum durability |

{
  "topic.events.keyspace.table.consistencyLevel": "LOCAL_QUORUM"
}
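
"Majority" here means floor(RF / 2) + 1 replicas, where RF is the replication factor in scope. As a rough illustration, the sketch below computes how many replica acknowledgements each level needs; the function names are hypothetical and the two-datacenter layout is an example.

```python
def quorum(replication_factor: int) -> int:
    """Majority of replicas: floor(RF / 2) + 1."""
    return replication_factor // 2 + 1


def acks_required(level: str, local_rf: int, total_rf: int) -> int:
    """Replica acknowledgements needed for a write at the given consistency level."""
    if level == "LOCAL_ONE":
        return 1
    if level == "LOCAL_QUORUM":
        return quorum(local_rf)
    if level == "QUORUM":
        return quorum(total_rf)
    if level == "ALL":
        return total_rf
    raise ValueError(f"unsupported consistency level: {level}")


# Example layout: two datacenters with RF=3 each, so 6 replicas in total.
for level in ("LOCAL_ONE", "LOCAL_QUORUM", "QUORUM", "ALL"):
    print(level, acks_required(level, local_rf=3, total_rf=6))
```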

TTL Configuration

Automatically expire records after a specified time.

{
  "topic.events.keyspace.table.ttl": "86400",
  "topic.events.keyspace.table.ttlTimeUnit": "SECONDS"
}

| Time Unit | Description |
|---|---|
| SECONDS | TTL in seconds |
| MINUTES | TTL in minutes |
| HOURS | TTL in hours |
| DAYS | TTL in days |
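
Cassandra stores TTLs in seconds, so it can be useful to check what a ttl and ttlTimeUnit pair amounts to. This sketch converts the pair and rejects values above Cassandra's 20-year maximum TTL (630,720,000 seconds); ttl_in_seconds is a hypothetical helper.

```python
SECONDS_PER_UNIT = {"SECONDS": 1, "MINUTES": 60, "HOURS": 3600, "DAYS": 86400}
MAX_TTL_SECONDS = 630_720_000  # Cassandra's upper bound: 20 years


def ttl_in_seconds(ttl: str, unit: str) -> int:
    """Convert a ttl / ttlTimeUnit pair to seconds and validate the range."""
    seconds = int(ttl) * SECONDS_PER_UNIT[unit]
    if not 0 <= seconds <= MAX_TTL_SECONDS:
        raise ValueError(f"TTL of {seconds}s is outside 0..{MAX_TTL_SECONDS}")
    return seconds


print(ttl_in_seconds("86400", "SECONDS"))  # 86400 (one day)
print(ttl_in_seconds("365", "DAYS"))       # 31536000 (one year)
```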

Error Handling

Dead Letter Queue

{
  "errors.tolerance": "all",
  "errors.deadletterqueue.topic.name": "dlq-cassandra-sink",
  "errors.deadletterqueue.topic.replication.factor": 3,
  "errors.deadletterqueue.context.headers.enable": true
}
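
With context headers enabled, Kafka Connect attaches headers prefixed with __connect.errors. to each dead-lettered record, describing where it came from and why it failed. The sketch below summarizes them, assuming the headers arrive as a list of (name, bytes) pairs, the shape commonly exposed by Python Kafka clients. The sample values are illustrative only.

```python
DLQ_HEADER_PREFIX = "__connect.errors."


def summarize_dlq_headers(headers: list) -> dict:
    """Collect Connect error-context headers into a dict keyed by the unprefixed name."""
    summary = {}
    for name, raw in headers:
        if name.startswith(DLQ_HEADER_PREFIX) and raw is not None:
            summary[name[len(DLQ_HEADER_PREFIX):]] = raw.decode("utf-8", errors="replace")
    return summary


# Illustrative headers, not captured from a real failure.
sample_headers = [
    ("__connect.errors.topic", b"events"),
    ("__connect.errors.partition", b"2"),
    ("__connect.errors.offset", b"1042"),
    ("__connect.errors.exception.class.name", b"java.lang.IllegalArgumentException"),
    ("trace_id", b"abc"),
]

summary = summarize_dlq_headers(sample_headers)
print(f"{summary['topic']}[{summary['partition']}]@{summary['offset']}: "
      f"{summary['exception.class.name']}")
```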

Retry Configuration

{
  "maxNumberOfRecordsInBatch": 32,
  "queryExecutionTimeout": 30,
  "connectionPoolLocalSize": 4
}

Performance Tuning

Batch Settings

{
  "maxNumberOfRecordsInBatch": 100,
  "maxConcurrentRequests": 1000
}

Task Parallelism

{
  "tasks.max": "6"
}

Recommendation: Set tasks.max equal to the number of topic partitions for maximum parallelism. Each partition is consumed by only one task at a time, so tasks beyond the partition count stay idle.
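
Since each partition is consumed by a single sink task at a time, the useful parallelism is capped by the partition count. A small sketch of that arithmetic, assuming the connector starts as many tasks as tasks.max allows (task_utilization is a hypothetical helper):

```python
def task_utilization(tasks_max: int, partitions: int) -> dict:
    """Estimate how many tasks receive partitions and how many sit idle."""
    active = min(tasks_max, partitions)
    return {"active": active, "idle": max(0, tasks_max - partitions)}


print(task_utilization(tasks_max=6, partitions=6))
print(task_utilization(tasks_max=8, partitions=6))
```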

Connection Pool

{
  "connectionPoolLocalSize": 4,
  "connectionPoolRemoteSize": 2
}

Complete Example

{
  "name": "events-to-cassandra",
  "config": {
    "connector.class": "com.datastax.oss.kafka.sink.CassandraSinkConnector",
    "tasks.max": "6",
    "topics": "events",

    "contactPoints": "cass1.example.com,cass2.example.com,cass3.example.com",
    "loadBalancing.localDc": "datacenter1",
    "port": "9042",

    "auth.provider": "PLAIN",
    "auth.username": "${secrets:cassandra/username}",
    "auth.password": "${secrets:cassandra/password}",

    "ssl.provider": "JDK",
    "ssl.truststore.path": "/etc/kafka-connect/ssl/truststore.jks",
    "ssl.truststore.password": "${secrets:ssl/truststore-password}",

    "topic.events.keyspace.table.mapping": "analytics.events_by_time",
    "topic.events.analytics.events_by_time.mapping": "event_date=value.timestamp:toDate, event_time=value.timestamp, event_id=value.event_id, event_type=value.event_type, user_id=value.user_id, payload=value.payload",
    "topic.events.keyspace.table.consistencyLevel": "LOCAL_QUORUM",
    "topic.events.keyspace.table.ttl": "365",
    "topic.events.keyspace.table.ttlTimeUnit": "DAYS",

    "maxNumberOfRecordsInBatch": 100,
    "maxConcurrentRequests": 1000,

    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "dlq-cassandra-sink",
    "errors.deadletterqueue.topic.replication.factor": 3
  }
}

Monitoring

Connector Metrics

| Metric | Description |
|---|---|
| connector-metrics/record-send-rate | Records sent per second |
| connector-metrics/record-error-rate | Error rate |
| connector-metrics/batch-size-avg | Average batch size |

Health Check

# Check connector status
curl http://connect:8083/connectors/cassandra-sink/status

# Check task status
curl http://connect:8083/connectors/cassandra-sink/tasks/0/status
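
The connector status endpoint returns JSON with a state for the connector and for each task, so a health check can be scripted. The sketch below inspects an already-parsed response; the sample payload is illustrative and unhealthy_components is a hypothetical helper.

```python
def unhealthy_components(status: dict) -> list:
    """List the connector and tasks whose state is anything other than RUNNING."""
    problems = []
    if status.get("connector", {}).get("state") != "RUNNING":
        problems.append("connector")
    for task in status.get("tasks", []):
        if task.get("state") != "RUNNING":
            problems.append(f"task-{task.get('id')}")
    return problems


# Illustrative response from GET /connectors/cassandra-sink/status
sample_status = {
    "name": "cassandra-sink",
    "connector": {"state": "RUNNING", "worker_id": "connect:8083"},
    "tasks": [
        {"id": 0, "state": "RUNNING", "worker_id": "connect:8083"},
        {"id": 1, "state": "FAILED", "worker_id": "connect:8083"},
    ],
    "type": "sink",
}

print(unhealthy_components(sample_status))
```

A failed task usually includes a trace field with the stack trace, which is a good first stop before consulting the troubleshooting table.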

Troubleshooting

| Issue | Cause | Solution |
|---|---|---|
| Connection timeout | Network/firewall | Verify connectivity to contact points |
| Authentication failed | Invalid credentials | Check username/password |
| WriteTimeoutException | Slow cluster | Increase queryExecutionTimeout |
| High error rate | Schema mismatch | Verify table mapping |