Skip to content

Topic Configuration

Topic configuration controls per-topic behavior for retention, compaction, compression, and storage. Topic-level settings override broker defaults.


Configuration Hierarchy

Topic configuration follows a precedence hierarchy:

uml diagram


Managing Topic Configuration

Create Topic with Configuration

kafka-topics.sh --bootstrap-server kafka:9092 \
  --create \
  --topic events \
  --partitions 12 \
  --replication-factor 3 \
  --config retention.ms=86400000 \
  --config cleanup.policy=delete \
  --config compression.type=lz4 \
  --config min.insync.replicas=2

Modify Topic Configuration

# Add or update configuration
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type topics \
  --entity-name events \
  --alter \
  --add-config retention.ms=172800000,max.message.bytes=10485760

# Remove configuration (revert to default)
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type topics \
  --entity-name events \
  --alter \
  --delete-config retention.ms

View Topic Configuration

# Show all topic configurations
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type topics \
  --entity-name events \
  --describe

# Show topic details including configuration
kafka-topics.sh --bootstrap-server kafka:9092 \
  --describe \
  --topic events

Retention Configuration

Time-Based Retention

# Retain messages for 24 hours
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type topics \
  --entity-name events \
  --alter \
  --add-config retention.ms=86400000
Setting Default Description
retention.ms 604800000 (7 days) Time to retain messages
retention.bytes -1 (unlimited) Size limit per partition
segment.ms 604800000 (7 days) Time before rolling segment

Size-Based Retention

# Retain 10GB per partition
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type topics \
  --entity-name events \
  --alter \
  --add-config retention.bytes=10737418240

Size Retention Behavior

retention.bytes applies per partition, not per topic. A topic with 12 partitions and retention.bytes=10GB can retain up to 120GB total.

Combined Retention

When both time and size limits are set, messages are deleted when either limit is exceeded:

# Delete when older than 7 days OR when partition exceeds 50GB
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type topics \
  --entity-name events \
  --alter \
  --add-config retention.ms=604800000,retention.bytes=53687091200

Cleanup Policies

Delete Policy

kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type topics \
  --entity-name events \
  --alter \
  --add-config cleanup.policy=delete

Segments are deleted when: - Segment age exceeds retention.ms - Partition size exceeds retention.bytes - Segment is closed (newer segment exists)

Compact Policy

Log compaction retains the latest value for each key:

kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type topics \
  --entity-name user-profiles \
  --alter \
  --add-config cleanup.policy=compact

uml diagram

Compaction Settings

Setting Default Description
min.cleanable.dirty.ratio 0.5 Minimum dirty/total ratio to trigger
min.compaction.lag.ms 0 Minimum age before compacting
max.compaction.lag.ms MAX_LONG Maximum age before forced compaction
delete.retention.ms 86400000 Time to retain tombstones
segment.ms 604800000 Time before rolling active segment
# Configure compaction behavior
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type topics \
  --entity-name user-profiles \
  --alter \
  --add-config min.cleanable.dirty.ratio=0.3,\
min.compaction.lag.ms=3600000,\
delete.retention.ms=86400000

Compact + Delete Policy

Combines compaction with time-based deletion:

kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type topics \
  --entity-name changelog \
  --alter \
  --add-config cleanup.policy=compact,delete

With this policy: - Records are compacted (latest value per key retained) - Compacted segments are deleted after retention.ms


Compression

Topic-Level Compression

kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type topics \
  --entity-name events \
  --alter \
  --add-config compression.type=zstd
Value Description
producer Use producer's compression (default)
uncompressed Store uncompressed
gzip GZIP compression
snappy Snappy compression
lz4 LZ4 compression
zstd Zstandard compression

Compression Behavior

compression.type Producer Compression Stored As
producer Any Producer's choice
lz4 none LZ4 (broker compresses)
lz4 gzip LZ4 (broker recompresses)
uncompressed lz4 Uncompressed (broker decompresses)

Performance Consideration

Setting topic compression different from producer compression causes broker CPU overhead for recompression. Let producer control compression when possible (compression.type=producer).


Message Size

kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type topics \
  --entity-name large-events \
  --alter \
  --add-config max.message.bytes=10485760
Setting Default Description
max.message.bytes 1048588 (~1MB) Maximum record batch size
message.timestamp.type CreateTime CreateTime or LogAppendTime
message.timestamp.difference.max.ms MAX_LONG Max timestamp deviation

Coordinated Size Limits

Increasing max.message.bytes requires coordinating with: - Producer: max.request.size - Consumer: max.partition.fetch.bytes - Broker: message.max.bytes, replica.fetch.max.bytes


Replication

ISR Configuration

kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type topics \
  --entity-name critical-events \
  --alter \
  --add-config min.insync.replicas=2
Setting Default Description
min.insync.replicas 1 Minimum replicas for acks=all
unclean.leader.election.enable false Allow out-of-sync leader

Durability Patterns

Replication Factor min.insync.replicas Tolerates
3 1 2 broker failures (data risk)
3 2 1 broker failure (safe)
5 3 2 broker failures (safe)

min.insync.replicas=1

With min.insync.replicas=1, a single replica can acknowledge writes. If that replica fails before replication, data is lost even with acks=all.


Segment Configuration

kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type topics \
  --entity-name events \
  --alter \
  --add-config segment.bytes=536870912,segment.ms=3600000
Setting Default Description
segment.bytes 1073741824 (1GB) Segment file size
segment.ms 604800000 (7 days) Time before rolling
segment.index.bytes 10485760 Index file size
segment.jitter.ms 0 Random jitter for segment rolling

Index Settings

Setting Default Description
index.interval.bytes 4096 Bytes between index entries
flush.messages MAX_LONG Messages before fsync
flush.ms MAX_LONG Time before fsync

Segment Rolling

Active segments are not eligible for deletion or compaction. Smaller segments enable faster cleanup but increase file handle usage.


Timestamps

kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type topics \
  --entity-name events \
  --alter \
  --add-config message.timestamp.type=LogAppendTime
Value Description
CreateTime Producer-set timestamp (default)
LogAppendTime Broker-set timestamp on append

Timestamp Validation

# Reject messages with timestamps too far from broker time
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type topics \
  --entity-name events \
  --alter \
  --add-config message.timestamp.difference.max.ms=86400000

Follower Replication

kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type topics \
  --entity-name events \
  --alter \
  --add-config follower.replication.throttled.replicas=0:1,0:2,1:1,1:2
Setting Description
follower.replication.throttled.replicas Replicas subject to replication throttle
leader.replication.throttled.replicas Leaders subject to replication throttle

Throttling is used during partition reassignment to limit network impact.


Configuration Reference

All Topic Settings

Setting Default Description
cleanup.policy delete delete, compact, or both
compression.type producer Compression algorithm
delete.retention.ms 86400000 Tombstone retention
file.delete.delay.ms 60000 Delay before file deletion
flush.messages MAX_LONG Messages before fsync
flush.ms MAX_LONG Time before fsync
index.interval.bytes 4096 Index interval
max.compaction.lag.ms MAX_LONG Max time before compaction
max.message.bytes 1048588 Maximum message size
message.timestamp.type CreateTime Timestamp type
message.timestamp.difference.max.ms MAX_LONG Max timestamp difference
min.cleanable.dirty.ratio 0.5 Compaction trigger ratio
min.compaction.lag.ms 0 Min time before compaction
min.insync.replicas 1 Minimum ISR
preallocate false Preallocate segment files
retention.bytes -1 Size retention per partition
retention.ms 604800000 Time retention
segment.bytes 1073741824 Segment size
segment.index.bytes 10485760 Index file size
segment.jitter.ms 0 Segment roll jitter
segment.ms 604800000 Segment roll time
unclean.leader.election.enable false Allow unclean election