S3 Sink Connector¶
Stream Kafka events to Amazon S3 for data lake storage, analytics, and long-term archival.
Overview¶
The S3 Sink Connector writes Kafka records to S3 objects, supporting:
- Multiple output formats (Parquet, Avro, JSON)
- Time-based and field-based partitioning
- Exactly-once delivery through deterministic partitioning and offset-based file naming
- Automatic file rotation by record count or time interval
Installation¶
The S3 Sink Connector is included in Confluent Platform. For standalone installation:
# Download from Confluent Hub
confluent-hub install confluentinc/kafka-connect-s3:latest
# Or manually download
curl -O https://packages.confluent.io/archive/7.5/kafka-connect-s3-7.5.0.zip
unzip kafka-connect-s3-7.5.0.zip -d /usr/share/kafka/plugins/
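After restarting the Connect worker, you can confirm the plugin was picked up through the Connect REST API (the worker is assumed to listen on localhost:8083 here):
# List installed connector plugins and check for the S3 sink
curl -s http://localhost:8083/connector-plugins | grep -i s3sink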
Basic Configuration¶
{
"name": "s3-sink",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": "3",
"topics": "events",
"s3.bucket.name": "data-lake-bucket",
"s3.region": "us-east-1",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"flush.size": "10000",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner"
}
}
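To deploy this configuration, save it to a file and submit it to the Kafka Connect REST API (the worker address localhost:8083 and the file name are assumptions):
# Create the connector from the JSON above, saved as s3-sink.json
curl -X POST -H "Content-Type: application/json" \
  --data @s3-sink.json \
  http://localhost:8083/connectors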
AWS Configuration¶
IAM Credentials¶
{
"aws.access.key.id": "${secrets:aws/access-key}",
"aws.secret.access.key": "${secrets:aws/secret-key}"
}
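The ${secrets:...} placeholders are resolved by a Kafka Connect config provider rather than stored in plain text. The snippet below is a minimal sketch that registers the standard FileConfigProvider under the name secrets in the worker properties; with this provider the placeholder form is ${secrets:&lt;properties-file&gt;:&lt;key&gt;}, and the file path shown is illustrative:
# Register a config provider named "secrets" in the Connect worker properties (path is illustrative)
cat >> /etc/kafka/connect-distributed.properties <<'EOF'
config.providers=secrets
config.providers.secrets.class=org.apache.kafka.common.config.provider.FileConfigProvider
EOF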
IAM Role (Recommended)¶
When running on EC2 or EKS, use instance profiles or IAM roles for service accounts:
{
"s3.credentials.provider.class": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain"
}
Required IAM Permissions¶
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:GetObject",
"s3:ListBucket",
"s3:DeleteObject",
"s3:GetBucketLocation"
],
"Resource": [
"arn:aws:s3:::data-lake-bucket",
"arn:aws:s3:::data-lake-bucket/*"
]
}
]
}
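If you manage IAM with the AWS CLI, the policy can be created directly from the document above (the policy name and file name are illustrative):
# Create the IAM policy from the JSON document above, saved as s3-sink-policy.json
aws iam create-policy \
  --policy-name kafka-connect-s3-sink \
  --policy-document file://s3-sink-policy.json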
Output Formats¶
Parquet¶
Columnar format optimized for analytics workloads.
{
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"parquet.codec": "snappy"
}
| Codec | Compression | Speed | Use Case |
|---|---|---|---|
| snappy | Medium | Fast | General purpose |
| gzip | High | Slow | Storage optimization |
| zstd | High | Medium | Balanced |
| none | None | Fastest | Raw data |
Avro¶
Schema-aware binary format.
{
"format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
"avro.codec": "snappy"
}
JSON¶
Human-readable format for debugging or simple integrations.
{
"format.class": "io.confluent.connect.s3.format.json.JsonFormat"
}
Partitioning¶
Time-Based Partitioning¶
Organize data by time for efficient querying.
{
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"locale": "en-US",
"timezone": "UTC",
"partition.duration.ms": "3600000"
}
Output path: topics/events/year=2024/month=01/day=15/hour=10/events+0+0000000000.parquet
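To spot-check the layout once the connector has flushed a few files, list the prefix with the AWS CLI (bucket name taken from the basic configuration above):
# List objects written into a single hourly partition
aws s3 ls s3://data-lake-bucket/topics/events/year=2024/month=01/day=15/hour=10/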
Field-Based Partitioning¶
Partition by record field values.
{
"partitioner.class": "io.confluent.connect.storage.partitioner.FieldPartitioner",
"partition.field.name": "region,event_type"
}
Output path: topics/events/region=us-east/event_type=click/events+0+0000000000.parquet
Default Partitioning¶
Simple topic-partition structure.
{
"partitioner.class": "io.confluent.connect.storage.partitioner.DefaultPartitioner"
}
Output path: topics/events/partition=0/events+0+0000000000.parquet
File Rotation¶
By Record Count¶
{
"flush.size": "10000"
}
Commits a new file after every 10,000 records written to a given output partition.
By Time¶
{
"rotate.schedule.interval.ms": "3600000"
}
Rotates files every hour based on wall-clock time. (rotate.interval.ms, used later on this page, rotates based on record timestamps instead.)
By Part Size¶
{
"s3.part.size": "26214400"
}
Uploads a multipart chunk whenever the buffer reaches 25 MB (26214400 bytes). s3.part.size bounds the upload part size and per-partition memory use; final object size is still governed by flush.size and the rotation settings.
Schema Configuration¶
With Schema Registry¶
{
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
Without Schema¶
{
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false"
}
Error Handling¶
{
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dlq-s3-sink",
"errors.deadletterqueue.topic.replication.factor": 3,
"errors.deadletterqueue.context.headers.enable": true,
"errors.log.enable": true
}
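Records routed to the dead letter queue can be inspected with the console consumer; with context headers enabled, the failure reason travels in the record headers (the broker address localhost:9092 is an assumption, and header printing requires a reasonably recent Kafka version):
# Read DLQ records, including the error context headers added by Connect
kafka-console-consumer --bootstrap-server localhost:9092 \
  --topic dlq-s3-sink --from-beginning \
  --property print.headers=true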
Performance Tuning¶
Parallelism¶
{
"tasks.max": "6"
}
Set tasks.max to at most the total number of partitions across the consumed topics; tasks beyond that count remain idle.
Buffer Settings¶
{
"s3.part.size": "52428800",
"flush.size": "50000",
"rotate.interval.ms": "600000"
}
Retry Configuration¶
{
"s3.retry.backoff.ms": "200",
"s3.part.retries": "3"
}
Complete Example¶
{
"name": "events-to-s3",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"tasks.max": "6",
"topics": "events,transactions",
"s3.bucket.name": "analytics-data-lake",
"s3.region": "us-east-1",
"s3.credentials.provider.class": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain",
"format.class": "io.confluent.connect.s3.format.parquet.ParquetFormat",
"parquet.codec": "snappy",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
"path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
"locale": "en-US",
"timezone": "UTC",
"partition.duration.ms": "3600000",
"flush.size": "50000",
"rotate.interval.ms": "600000",
"s3.part.size": "52428800",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "dlq-s3-sink",
"errors.deadletterqueue.topic.replication.factor": 3
}
}
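After submitting the connector, verify that it and all of its tasks are RUNNING via the Connect REST API (worker address is an assumption):
# Check connector and task state
curl -s http://localhost:8083/connectors/events-to-s3/status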
S3 Object Lifecycle¶
Configure S3 lifecycle rules for cost optimization:
{
"Rules": [
{
"ID": "MoveToGlacier",
"Status": "Enabled",
"Filter": {"Prefix": "topics/"},
"Transitions": [
{"Days": 90, "StorageClass": "GLACIER"},
{"Days": 365, "StorageClass": "DEEP_ARCHIVE"}
]
}
]
}
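A rule set like this can be applied with the AWS CLI (the file name is illustrative):
# Apply the lifecycle rules above to the bucket, saved as lifecycle.json
aws s3api put-bucket-lifecycle-configuration \
  --bucket analytics-data-lake \
  --lifecycle-configuration file://lifecycle.json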
Integration with Analytics¶
AWS Athena¶
Query S3 data directly:
CREATE EXTERNAL TABLE events (
event_id STRING,
event_type STRING,
`timestamp` BIGINT,  -- reserved word in Athena DDL, so it must be backtick-quoted
user_id STRING
)
PARTITIONED BY (year STRING, month STRING, day STRING, hour STRING)
STORED AS PARQUET
LOCATION 's3://analytics-data-lake/topics/events/';
MSCK REPAIR TABLE events;
AWS Glue¶
Point an AWS Glue crawler at the connector's output prefix (for example s3://analytics-data-lake/topics/) to discover table schemas automatically. Setting schema.compatibility on the connector controls how schema changes are handled when files are written, which keeps the output predictable for the crawler:
{
"schema.compatibility": "FULL"
}
Troubleshooting¶
| Issue | Cause | Solution |
|---|---|---|
| Access Denied | IAM permissions | Verify S3 bucket policy and IAM role |
| Slow uploads | Small flush size | Increase flush.size |
| Many small files | Frequent rotation | Increase rotate.interval.ms |
| Schema errors | Registry mismatch | Verify converter configuration |
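For most of these issues, the task-level status from the Connect REST API includes the underlying stack trace, and a failed task can be restarted in place once the cause is fixed (worker address and connector name follow the earlier examples):
# Show task state and any error trace for the connector
curl -s http://localhost:8083/connectors/s3-sink/status
# Restart a failed task (task 0 here) without recreating the connector
curl -X POST http://localhost:8083/connectors/s3-sink/tasks/0/restart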
Related Documentation¶
- Connectors Overview - All connectors
- Kafka Connect - Connect framework
- Converters - Serialization formats