S3 Sink Connector

Stream Kafka events to Amazon S3 for data lake storage, analytics, and long-term archival.


Overview

The S3 Sink Connector writes Kafka records to S3 objects, supporting:

  • Multiple output formats (Parquet, Avro, JSON)
  • Time-based and field-based partitioning
  • Exactly-once delivery when used with a deterministic partitioner
  • Automatic file rotation by size, time, or record count

Installation

The S3 Sink Connector is included in Confluent Platform. For standalone installation:

# Download from Confluent Hub
confluent-hub install confluentinc/kafka-connect-s3:latest

# Or manually download
curl -O https://packages.confluent.io/archive/7.5/kafka-connect-s3-7.5.0.zip
unzip kafka-connect-s3-7.5.0.zip -d /usr/share/kafka/plugins/

Basic Configuration

{
  "name": "s3-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "3",
    "topics": "events",

    "s3.bucket.name": "data-lake-bucket",
    "s3.region": "us-east-1",

    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "flush.size": "10000",

    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner"
  }
}
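
To deploy this configuration, POST it to the Kafka Connect REST API. A minimal sketch, assuming the JSON above is saved as s3-sink.json and a Connect worker is listening on localhost:8083 (adjust host and port for your deployment):

# Register the connector with the Connect worker
curl -X POST -H "Content-Type: application/json" \
  --data @s3-sink.json \
  http://localhost:8083/connectors

# List registered connectors to confirm it was created
curl -s http://localhost:8083/connectors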

AWS Configuration

IAM Credentials

{
  "aws.access.key.id": "${secrets:aws/access-key}",
  "aws.secret.access.key": "${secrets:aws/secret-key}"
}

When running on EC2 or EKS, prefer instance profiles or IAM Roles for Service Accounts (IRSA) over static keys:

{
  "s3.credentials.provider.class": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain"
}
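
To check which identity the default provider chain resolves on the Connect worker host, a quick sanity check with the AWS CLI (the chain searches environment variables, the shared credentials file, and instance/container metadata, in that order):

# Shows the account and principal the default credentials chain resolves to
aws sts get-caller-identity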

Required IAM Permissions

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:PutObject",
        "s3:GetObject",
        "s3:ListBucket",
        "s3:DeleteObject",
        "s3:GetBucketLocation"
      ],
      "Resource": [
        "arn:aws:s3:::data-lake-bucket",
        "arn:aws:s3:::data-lake-bucket/*"
      ]
    }
  ]
}
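
One way to attach this policy to the role assumed by the Connect workers is with the AWS CLI; the role name connect-s3-sink-role and the file name s3-sink-policy.json below are placeholders for illustration:

# Attach the policy above as an inline policy (role and file names are examples)
aws iam put-role-policy \
  --role-name connect-s3-sink-role \
  --policy-name s3-sink-access \
  --policy-document file://s3-sink-policy.json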

Output Formats

Parquet

Columnar format optimized for analytics workloads.

{
  "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
  "parquet.codec": "snappy"
}

Codec    Compression  Speed    Use Case
snappy   Medium       Fast     General purpose
gzip     High         Slow     Storage optimization
zstd     High         Medium   Balanced
none     None         Fastest  Raw data

Avro

Schema-aware binary format.

{
  "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
  "avro.codec": "snappy"
}

JSON

Human-readable format for debugging or simple integrations.

{
  "format.class": "io.confluent.connect.s3.format.json.JsonFormat"
}

Partitioning

Time-Based Partitioning

Organize data by time for efficient querying.

{
  "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
  "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
  "locale": "en-US",
  "timezone": "UTC",
  "partition.duration.ms": "3600000"
}

Output path: topics/events/year=2024/month=01/day=15/hour=10/events+0+0000000000.parquet
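
To spot-check that objects are landing under the expected time-based prefixes, list the bucket with the AWS CLI (bucket and topic names follow the examples above):

# List committed objects under the time-based layout
aws s3 ls s3://data-lake-bucket/topics/events/ --recursive | head -n 20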

Field-Based Partitioning

Partition by record field values.

{
  "partitioner.class": "io.confluent.connect.storage.partitioner.FieldPartitioner",
  "partition.field.name": "region,event_type"
}

Output path: topics/events/region=us-east/event_type=click/events+0+0000000000.parquet

Default Partitioning

Simple topic-partition structure.

{
  "partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner"
}

Output path: topics/events/partition=0/events+0+0000000000.parquet


File Rotation

By Record Count

{
  "flush.size": "10000"
}

Commits a new S3 object after every 10,000 records per topic partition.

By Time

{
  "rotate.schedule.interval.ms": "3600000"
}

Commits a new S3 object every hour of wall-clock time. (rotate.schedule.interval.ms rotates on the system clock, while rotate.interval.ms rotates based on record timestamps.)

By Upload Part Size

{
  "s3.part.size": "26214400"
}

Uploads a multipart part once the buffered data reaches 25 MB. Note that s3.part.size sets the S3 multipart upload part size rather than the final object size; object rotation is still governed by flush.size and the rotate intervals.


Schema Configuration

With Schema Registry

{
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "http://schema-registry:8081",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081"
}

Without Schema

{
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false"
}

Error Handling

{
  "errors.tolerance": "all",
  "errors.deadletterqueue.topic.name": "dlq-s3-sink",
  "errors.deadletterqueue.topic.replication.factor": 3,
  "errors.deadletterqueue.context.headers.enable": true,
  "errors.log.enable": true
}
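
Records routed to the dead letter queue can be inspected with the console consumer; with context headers enabled, the headers carry the original topic, partition, offset, and error details. A sketch assuming a broker at localhost:9092:

# Inspect failed records and their error context headers
kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic dlq-s3-sink \
  --from-beginning \
  --property print.headers=true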

Performance Tuning

Parallelism

{
  "tasks.max": "6"
}

Set tasks.max to the total number of partitions across the configured topics for maximum throughput; tasks beyond the partition count remain idle.
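
To see how many partitions a topic actually has before sizing tasks.max, describe it with the Kafka CLI (the broker address is an assumption):

# Shows the partition count for the events topic
kafka-topics --bootstrap-server localhost:9092 --describe --topic events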

Buffer Settings

{
  "s3.part.size": "52428800",
  "flush.size": "50000",
  "rotate.interval.ms": "600000"
}

Retry Configuration

{
  "s3.retry.backoff.ms": "200",
  "s3.part.retries": "3"
}

Complete Example

{
  "name": "events-to-s3",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "6",
    "topics": "events,transactions",

    "s3.bucket.name": "analytics-data-lake",
    "s3.region": "us-east-1",
    "s3.credentials.provider.class": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",

    "format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
    "parquet.codec": "snappy",

    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "locale": "en-US",
    "timezone": "UTC",
    "partition.duration.ms": "3600000",

    "flush.size": "50000",
    "rotate.interval.ms": "600000",
    "s3.part.size": "52428800",

    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",

    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "dlq-s3-sink",
    "errors.deadletterqueue.topic.replication.factor": 3
  }
}
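
After submitting the connector, its health can be verified through the Connect REST API; a sketch assuming the worker runs on localhost:8083:

# Check connector and task state (should report RUNNING)
curl -s http://localhost:8083/connectors/events-to-s3/status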

S3 Object Lifecycle

Configure S3 lifecycle rules for cost optimization:

{
  "Rules": [
    {
      "ID": "MoveToGlacier",
      "Status": "Enabled",
      "Filter": {"Prefix": "topics/"},
      "Transitions": [
        {"Days": 90, "StorageClass": "GLACIER"},
        {"Days": 365, "StorageClass": "DEEP_ARCHIVE"}
      ]
    }
  ]
}
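
The lifecycle rules can be applied with the AWS CLI; assuming the JSON above is saved as lifecycle.json:

# Apply the lifecycle configuration to the bucket
aws s3api put-bucket-lifecycle-configuration \
  --bucket analytics-data-lake \
  --lifecycle-configuration file://lifecycle.json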

Integration with Analytics

AWS Athena

Query S3 data directly:

CREATE EXTERNAL TABLE events (
  event_id STRING,
  event_type STRING,
  `timestamp` BIGINT,
  user_id STRING
)
PARTITIONED BY (year STRING, month STRING, day STRING, hour STRING)
STORED AS PARQUET
LOCATION 's3://analytics-data-lake/topics/events/';

MSCK REPAIR TABLE events;
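
Queries can also be submitted from the command line; a sketch using the AWS CLI, where the default database and the athena-results/ output prefix are assumptions:

# Run an example aggregation against the partitioned table
aws athena start-query-execution \
  --query-string "SELECT event_type, count(*) FROM events WHERE year = '2024' GROUP BY event_type" \
  --query-execution-context Database=default \
  --result-configuration OutputLocation=s3://analytics-data-lake/athena-results/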

AWS Glue

A Glue crawler can discover the table schema automatically from the Parquet files the connector writes. On the connector side, set an explicit schema compatibility mode so evolving schemas produce files the crawler can reconcile:

{
  "schema.compatibility": "FULL"
}
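
A crawler pointed at the connector's output prefix then populates the Glue Data Catalog; a sketch with the AWS CLI, where the crawler name, IAM role, and database are placeholders:

# Create a crawler over the connector's output path (names are examples)
aws glue create-crawler \
  --name events-crawler \
  --role GlueCrawlerRole \
  --database-name analytics \
  --targets '{"S3Targets":[{"Path":"s3://analytics-data-lake/topics/events/"}]}'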

Troubleshooting

Issue             Cause               Solution
Access Denied     IAM permissions     Verify the S3 bucket policy and IAM role
Slow uploads      Small flush size    Increase flush.size
Many small files  Frequent rotation   Increase rotate.interval.ms
Schema errors     Registry mismatch   Verify converter configuration
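
When a task fails, its stack trace appears in the status output, and the task can be restarted once the underlying issue is fixed; a sketch assuming the worker runs on localhost:8083:

# Show the failure trace for the connector's tasks
curl -s http://localhost:8083/connectors/s3-sink/status

# Restart a failed task (task 0 shown as an example)
curl -X POST http://localhost:8083/connectors/s3-sink/tasks/0/restart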