Kafka Broker Internals¶
This document describes the internal architecture of a Kafka broker—the threading model, request processing pipeline, storage layer, and coordination mechanisms that enable high-throughput, low-latency message handling.
Architecture Overview¶
A Kafka broker is a multi-threaded server built around six core subsystems:
| Subsystem | Responsibility |
|---|---|
| Network Layer | Accept connections, read requests, write responses |
| Request Handler Pool | Execute request logic (produce, fetch, metadata) |
| Log Manager | Manage topic partitions, segments, retention |
| Replica Manager | Leader/follower replication, ISR management |
| Group Coordinator | Consumer group membership, offset commits |
| Transaction Coordinator | Exactly-once semantics, transaction state |
Network Architecture¶
Threading Model¶
Kafka uses a reactor pattern with distinct thread pools for network I/O and request processing:
| Thread Pool | Count | Configuration | Role |
|---|---|---|---|
| Acceptor | 1 per listener | Fixed | Accept new TCP connections |
| Network Processors | 3 (default) | num.network.threads | Read requests, write responses (NIO) |
| Request Handlers | 8 (default) | num.io.threads | Execute request logic |
Request Processing Pipeline¶
Request Types¶
| API Key | Request | Handler |
|---|---|---|
| 0 | Produce | ReplicaManager.appendRecords() |
| 1 | Fetch | ReplicaManager.fetchMessages() |
| 2 | ListOffsets | ReplicaManager.fetchOffsetForTimestamp() |
| 3 | Metadata | MetadataCache.getTopicMetadata() |
| 8 | OffsetCommit | GroupCoordinator.handleCommitOffsets() |
| 9 | OffsetFetch | GroupCoordinator.handleFetchOffsets() |
| 10 | FindCoordinator | Find group/transaction coordinator |
| 11 | JoinGroup | GroupCoordinator.handleJoinGroup() |
| 12 | Heartbeat | GroupCoordinator.handleHeartbeat() |
| 14 | SyncGroup | GroupCoordinator.handleSyncGroup() |
| 19 | CreateTopics | Controller via forwarding |
Socket Configuration¶
# Network processor threads (NIO selectors)
num.network.threads=3
# Request handler threads (business logic)
num.io.threads=8
# Maximum queued requests before blocking
queued.max.requests=500
# Socket buffers
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
# Maximum request size
socket.request.max.bytes=104857600
# Connection limits
max.connections=2147483647
max.connections.per.ip=2147483647
Connection Handling¶
Each client connection is assigned to a network processor using round-robin. The processor handles all I/O for that connection using NIO selectors.
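A minimal sketch of this split, assuming one blocking acceptor thread and NIO-selector processors (class names and structure are illustrative, not the broker's actual code):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

final class Acceptor implements Runnable {
    private final ServerSocketChannel serverChannel;
    private final Processor[] processors;
    private int next = 0; // round-robin index

    Acceptor(int port, Processor[] processors) throws IOException {
        this.serverChannel = ServerSocketChannel.open();
        this.serverChannel.bind(new InetSocketAddress(port));
        this.processors = processors;
    }

    @Override public void run() {
        try {
            while (true) {
                SocketChannel client = serverChannel.accept();   // blocking accept
                processors[next].assign(client);                 // hand the socket to a processor
                next = (next + 1) % processors.length;           // round-robin assignment
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}

final class Processor implements Runnable {
    private final Queue<SocketChannel> newConnections = new ConcurrentLinkedQueue<>();
    private final Selector selector;

    Processor() throws IOException { this.selector = Selector.open(); }

    void assign(SocketChannel channel) {
        newConnections.add(channel);
        selector.wakeup();  // break out of select() so the new socket gets registered
    }

    @Override public void run() {
        try {
            while (true) {
                SocketChannel c;
                while ((c = newConnections.poll()) != null) {
                    c.configureBlocking(false);
                    c.register(selector, SelectionKey.OP_READ);
                }
                selector.select(300);
                for (SelectionKey key : selector.selectedKeys()) {
                    if (key.isValid() && key.isReadable()) {
                        // read request bytes here and queue them for the request handler pool
                    }
                }
                selector.selectedKeys().clear();
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
```

The key design point is that the acceptor never touches request bytes; it only hands sockets off, so a slow client can only stall the processor that owns it.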
Log Subsystem¶
The log subsystem manages durable storage of messages. Each partition is an ordered, immutable sequence of records stored as log segments.
Log Structure¶
/var/kafka-logs/
├── my-topic-0/ # Partition directory
│ ├── 00000000000000000000.log # Segment file (messages)
│ ├── 00000000000000000000.index # Offset index
│ ├── 00000000000000000000.timeindex # Time index
│ ├── 00000000000012345678.log # Next segment
│ ├── 00000000000012345678.index
│ ├── 00000000000012345678.timeindex
│ ├── leader-epoch-checkpoint # Leader epoch history
│ └── partition.metadata # Topic ID mapping
├── my-topic-1/
│ └── ...
└── __consumer_offsets-0/ # Internal topic
└── ...
Segment Files¶
Each segment consists of three files:
| File | Extension | Purpose |
|---|---|---|
| Log | .log | Record batches (actual data) |
| Offset Index | .index | Sparse offset → file position mapping |
| Time Index | .timeindex | Sparse timestamp → offset mapping |
Segment naming uses the base offset (first offset in segment) zero-padded to 20 digits.
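For example, assuming a segment whose first record has offset 12345678, its file names can be derived like this (illustrative snippet, not broker code):

```java
// Segment file names are the zero-padded (20-digit) base offset plus an extension.
final class SegmentNames {
    public static void main(String[] args) {
        long baseOffset = 12_345_678L;
        System.out.println(String.format("%020d.log", baseOffset));       // 00000000000012345678.log
        System.out.println(String.format("%020d.index", baseOffset));     // 00000000000012345678.index
        System.out.println(String.format("%020d.timeindex", baseOffset)); // 00000000000012345678.timeindex
    }
}
```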
Record Batch Format¶
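As a rough reference, the current (v2, magic value 2) record batch written to the .log file begins with a header of approximately the following fields:

```
baseOffset: int64            // absolute offset of the first record in the batch
batchLength: int32
partitionLeaderEpoch: int32
magic: int8                  // 2 for the current format
crc: int32                   // covers everything after this field
attributes: int16            // compression codec, timestamp type, transactional/control flags
lastOffsetDelta: int32
baseTimestamp: int64
maxTimestamp: int64
producerId: int64
producerEpoch: int16
baseSequence: int32
records: [Record]            // record count followed by the individual (possibly compressed) records
```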
Offset Index¶
The offset index is a sparse index mapping offsets to file positions. Not every offset is indexed—entries are added every index.interval.bytes (default 4KB).
Lookup algorithm (a code sketch follows the list):
- Binary search index for largest offset ≤ target
- Sequential scan from that position in log file
- Return record at target offset
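A minimal sketch of that lookup, assuming the sparse index is held in memory as (offset, file position) pairs (the IndexEntry type and method are illustrative, not Kafka's API):

```java
import java.util.List;

final class OffsetIndexLookup {
    record IndexEntry(long offset, long filePosition) {}

    // Binary search for the largest indexed offset <= target; the caller then
    // scans the log file forward from the returned position to the exact offset.
    static long filePositionFor(long targetOffset, List<IndexEntry> sparseIndex) {
        int lo = 0, hi = sparseIndex.size() - 1;
        long position = 0L; // start of segment if no smaller offset is indexed
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (sparseIndex.get(mid).offset() <= targetOffset) {
                position = sparseIndex.get(mid).filePosition();
                lo = mid + 1; // keep looking for a larger qualifying entry
            } else {
                hi = mid - 1;
            }
        }
        return position;
    }
}
```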
Time Index¶
The time index maps timestamps to offsets, enabling time-based lookups:
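From a client's perspective this is what offsetsForTimes() relies on: the broker answers the underlying ListOffsets request using the .timeindex files. A hedged sketch (topic name and one-hour lookback are illustrative):

```java
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

final class SeekByTime {
    // Position a consumer at the first record with a timestamp >= one hour ago.
    static void seekToOneHourAgo(KafkaConsumer<String, String> consumer) {
        TopicPartition tp = new TopicPartition("orders", 0);
        long target = System.currentTimeMillis() - 3_600_000L;
        Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(Map.of(tp, target));
        OffsetAndTimestamp oat = result.get(tp);
        if (oat != null) {
            consumer.seek(tp, oat.offset());
        }
    }
}
```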
Segment Lifecycle¶
Roll conditions (new segment created when any is true):
| Condition | Configuration | Default |
|---|---|---|
| Size limit | log.segment.bytes | 1 GB |
| Time limit | log.roll.ms / log.roll.hours | 7 days |
| Index full | log.index.size.max.bytes | 10 MB |
| Offset overflow | Relative offset exceeds 32-bit | ~2 billion records |
Log Retention¶
| Policy | Configuration | Behavior |
|---|---|---|
| Time-based | log.retention.hours (168 default) | Delete segments older than threshold |
| Size-based | log.retention.bytes (-1 default) | Delete oldest segments when partition exceeds size |
| Compaction | log.cleanup.policy=compact | Keep only latest value per key |
# Retention settings
log.retention.hours=168 # 7 days
log.retention.bytes=-1 # No size limit
log.retention.check.interval.ms=300000 # Check every 5 minutes
# Segment settings
log.segment.bytes=1073741824 # 1 GB segments
log.roll.ms=604800000 # Roll after 7 days
# Compaction
log.cleanup.policy=delete # or compact, or delete,compact
log.cleaner.enable=true
log.cleaner.threads=1
log.cleaner.min.cleanable.ratio=0.5
Log Compaction¶
Compaction retains only the last value for each key, enabling "table" semantics:
Tombstones: A record with a null value marks its key for deletion. Tombstones are retained for delete.retention.ms (24 hours default) before removal.
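A small sketch of how this looks from the producer side (topic, key, and value are illustrative):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

final class CompactionExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // After compaction, only the latest value for "user-42" survives.
            producer.send(new ProducerRecord<>("user-profiles", "user-42", "{\"tier\":\"gold\"}"));
            // Null value = tombstone: the key is removed entirely once
            // delete.retention.ms has elapsed after compaction.
            producer.send(new ProducerRecord<>("user-profiles", "user-42", null));
        }
    }
}
```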
Replica Manager¶
The Replica Manager handles partition replication, leader election, and ISR (In-Sync Replicas) management.
Replica Types¶
| Type | Role | Responsibilities |
|---|---|---|
| Leader | Primary | Handle produce/fetch, maintain ISR |
| Follower | Secondary | Fetch from leader, catch up |
| Observer | Read-only (KIP-392) | Fetch without joining ISR |
ISR (In-Sync Replicas)¶
A replica is in the ISR if it:
- Has fetched up to the leader's log end offset within replica.lag.time.max.ms (30 seconds default)
- Is not offline
High Watermark¶
The high watermark (HW) is the offset up to which all ISR replicas have replicated. Consumers can only read up to the HW.
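As a minimal sketch of the rule (illustrative names, not broker code), the leader advances the HW to the smallest log end offset among ISR members:

```java
import java.util.Collection;

final class HighWatermark {
    // The HW is the smallest log end offset (LEO) across the ISR;
    // records below it are considered committed and visible to consumers.
    static long compute(Collection<Long> isrLogEndOffsets) {
        return isrLogEndOffsets.stream().mapToLong(Long::longValue).min().orElse(0L);
    }
}
```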
Leader Epoch¶
Leader epochs prevent log divergence after leader changes:
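One place epochs are visible on disk is the leader-epoch-checkpoint file shown in the log directory layout above. Its contents look roughly like this (annotations added for readability; the actual file contains only the numbers):

```
0          <- format version
2          <- number of entries
0 0        <- epoch 0 started at offset 0
5 12345    <- epoch 5 started at offset 12345
```

Followers use this epoch history (via the OffsetForLeaderEpoch exchange) to find where their log diverges from the new leader's and truncate accordingly.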
Fetch Protocol¶
Followers fetch from leaders using the same Fetch API as consumers:
Configuration:
# Replica fetcher threads per source broker
num.replica.fetchers=1
# Wait time for fetch when no data
replica.fetch.wait.max.ms=500
# Maximum bytes per fetch
replica.fetch.max.bytes=1048576
# Minimum bytes before responding
replica.fetch.min.bytes=1
# ISR lag threshold
replica.lag.time.max.ms=30000
Request Purgatory¶
The purgatory holds delayed requests waiting for conditions to be satisfied. This enables efficient handling of requests with timeout semantics.
Delayed Operations¶
| Operation | Condition | Example |
|---|---|---|
| DelayedProduce | acks=all waiting for ISR | Produce with acks=all |
| DelayedFetch | Not enough data available | Consumer fetch min.bytes |
| DelayedJoin | Waiting for all members | Consumer group join |
| DelayedHeartbeat | Waiting for session timeout | Member liveness check |
Purgatory Architecture¶
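The purgatory is organized around operations that know how to check their own completion condition. A minimal sketch of that pattern, loosely modeled on the broker's DelayedOperation abstraction (simplified, not actual Kafka code):

```java
import java.util.concurrent.atomic.AtomicBoolean;

abstract class DelayedOperation {
    private final long deadlineMs;
    private final AtomicBoolean completed = new AtomicBoolean(false);

    DelayedOperation(long delayMs) {
        this.deadlineMs = System.currentTimeMillis() + delayMs;
    }

    /** Check whether the completion condition already holds (ISR caught up, enough bytes, ...). */
    abstract boolean tryComplete();

    /** Runs exactly once when the operation completes, whether satisfied or expired. */
    abstract void onComplete();

    /** Extra handling when the operation expires before the condition is met. */
    abstract void onExpiration();

    /** Complete at most once; returns true only for the caller that won the race. */
    final boolean forceComplete() {
        if (completed.compareAndSet(false, true)) {
            onComplete();
            return true;
        }
        return false;
    }

    final boolean isExpired() {
        return System.currentTimeMillis() >= deadlineMs;
    }
}
```

Operations are parked in watch lists keyed by partition or group; whenever relevant state changes (a follower fetches, new data arrives, a member joins), the watchers re-run tryComplete(), and the timer expires whatever is left over.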
Produce Acknowledgment Flow (acks=all)¶
Fetch Wait Flow¶
Timer Wheel¶
Kafka uses a hierarchical timing wheel for efficient timeout management:
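A simplified, single-level sketch of the bucketing idea (the broker's implementation is hierarchical and more sophisticated; this is illustrative only):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class SimpleTimingWheel {
    private final long tickMs;                   // time span covered by one bucket
    private final List<Deque<Runnable>> buckets; // one task queue per bucket
    private long currentTimeMs;                  // always rounded down to a tick boundary

    SimpleTimingWheel(long tickMs, int wheelSize, long startMs) {
        this.tickMs = tickMs;
        this.buckets = new ArrayList<>(wheelSize);
        for (int i = 0; i < wheelSize; i++) buckets.add(new ArrayDeque<>());
        this.currentTimeMs = startMs - (startMs % tickMs);
    }

    /** Schedule a task; returns false if it is already due within the current tick. */
    boolean add(Runnable task, long expirationMs) {
        if (expirationMs < currentTimeMs + tickMs) return false;
        // Tasks further out than wheelSize ticks wrap around here; a hierarchical
        // wheel instead hands them to a coarser overflow wheel.
        int bucket = (int) ((expirationMs / tickMs) % buckets.size());
        buckets.get(bucket).add(task);
        return true;
    }

    /** Advance the clock and run every task in buckets that have now expired. */
    void advanceTo(long timeMs) {
        while (currentTimeMs + tickMs <= timeMs) {
            currentTimeMs += tickMs;
            Deque<Runnable> bucket = buckets.get((int) ((currentTimeMs / tickMs) % buckets.size()));
            Runnable task;
            while ((task = bucket.poll()) != null) task.run();
        }
    }
}
```

Insertion and expiry are O(1) per task regardless of how many timeouts are pending, which is why this scales better than a priority-queue timer for millions of delayed requests.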
Group Coordinator¶
Each broker hosts a Group Coordinator responsible for consumer groups whose __consumer_offsets partition leader resides on that broker.
Group Coordinator Assignment¶
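The assignment rule hashes the group id into one of the __consumer_offsets partitions; whichever broker leads that partition is the group's coordinator. A sketch of the computation (mirroring the broker's hash-then-mod rule):

```java
final class CoordinatorLookup {
    // Mask keeps the hash non-negative (Math.abs would fail for Integer.MIN_VALUE).
    static int coordinatorPartitionFor(String groupId, int offsetsTopicPartitionCount) {
        return (groupId.hashCode() & 0x7fffffff) % offsetsTopicPartitionCount;
    }

    public static void main(String[] args) {
        // With the default of 50 partitions, a given group id always maps to the same partition.
        System.out.println(coordinatorPartitionFor("my-group", 50));
    }
}
```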
Consumer Group State Machine¶
| State | Description |
|---|---|
| Empty | No active members, may have committed offsets |
| PreparingRebalance | Waiting for members to join |
| CompletingRebalance | Waiting for leader to sync assignments |
| Stable | Normal operation, members consuming |
| Dead | No members, offsets expired |
Rebalance Protocol¶
Offset Storage¶
Consumer offsets are stored in __consumer_offsets (50 partitions by default):
Key: (group_id, topic, partition)
Value: (offset, metadata, commit_timestamp)
Example:
Key: ("my-group", "orders", 0)
Value: (offset=12345, metadata="", timestamp=1703001600000)
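A hedged sketch of the client side of this: commitSync() sends an OffsetCommit request, and the coordinator appends a record like the one above to __consumer_offsets (topic and offset values are illustrative):

```java
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

final class CommitExample {
    static void commitProgress(KafkaConsumer<String, String> consumer) {
        // By convention the committed offset is the next offset to consume.
        consumer.commitSync(Map.of(
                new TopicPartition("orders", 0),
                new OffsetAndMetadata(12346L)));
    }
}
```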
Configuration:
# __consumer_offsets topic settings
offsets.topic.num.partitions=50
offsets.topic.replication.factor=3
offsets.retention.minutes=10080 # 7 days
# Group coordinator settings
group.initial.rebalance.delay.ms=3000
group.max.session.timeout.ms=1800000
group.min.session.timeout.ms=6000
Transaction Coordinator¶
The Transaction Coordinator manages exactly-once semantics for transactional producers.
Transaction State Storage¶
Transaction state is stored in __transaction_state (50 partitions by default):
Key: transactional.id
Value: (producer_id, producer_epoch, state, partitions, timeout)
Transaction States¶
Transaction Flow¶
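From a producer's point of view the flow is driven by the transactional API; each call below maps to coordinator traffic (initTransactions() to InitProducerId, the first send() per partition to AddPartitionsToTxn, commit or abort to EndTxn). A sketch with illustrative topic and transactional.id and simplified error handling:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

final class TransactionalSend {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("transactional.id", "payments-service-1");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();              // registers with the transaction coordinator
            try {
                producer.beginTransaction();
                producer.send(new ProducerRecord<>("payments", "order-1", "charged"));
                producer.commitTransaction();         // coordinator writes commit markers
            } catch (Exception e) {
                producer.abortTransaction();          // coordinator writes abort markers
            }
        }
    }
}
```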
Internal Topics¶
| Topic | Partitions | Purpose |
|---|---|---|
| __consumer_offsets | 50 | Consumer group offsets |
| __transaction_state | 50 | Transaction coordinator state |
| __cluster_metadata | 1 | KRaft metadata log |
__consumer_offsets and __transaction_state use log compaction, and the internal topics have dedicated replication and retention settings:
# Consumer offsets
offsets.topic.replication.factor=3
offsets.topic.num.partitions=50
offsets.retention.minutes=10080
# Transaction state
transaction.state.log.replication.factor=3
transaction.state.log.num.partitions=50
transaction.state.log.min.isr=2
Fetch Sessions (KIP-227)¶
Fetch sessions reduce bandwidth by sending only changed partition data:
Without Fetch Sessions¶
Request: [partition list + fetch positions] → 100+ bytes per partition
Response: [partition list + data] → repeated metadata
With Fetch Sessions¶
First Request: [full partition list]
First Response: [sessionId, data]
Subsequent: [sessionId only, or delta]
Response: [changed partitions only]
Benefits:
- 50%+ reduction in fetch request size
- Lower CPU for metadata processing
- Better performance with many partitions
Memory Architecture¶
Kafka relies heavily on the OS page cache rather than JVM heap:
Memory Recommendations¶
| Component | Memory Source | Sizing |
|---|---|---|
| JVM Heap | Configured | 4-8 GB typically sufficient |
| Page Cache | Remaining RAM | As much as possible |
| Direct Buffers | Off-heap | Network I/O buffers |
# JVM settings (in kafka-server-start.sh)
export KAFKA_HEAP_OPTS="-Xms6g -Xmx6g"
# For a 32 GB server:
# - 6 GB JVM heap
# - 26 GB available for page cache
Zero-Copy Transfer¶
Kafka uses the sendfile() system call for zero-copy transfers from the page cache to the network:
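In JVM terms this is FileChannel.transferTo(), which maps to sendfile() on Linux. A hedged illustration (the path and socket handling are illustrative):

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class ZeroCopySend {
    // Stream a segment file to a client socket without copying bytes through user space.
    static void sendSegment(SocketChannel socket) throws IOException {
        try (FileChannel segment = FileChannel.open(
                Path.of("/var/kafka-logs/my-topic-0/00000000000000000000.log"),
                StandardOpenOption.READ)) {
            long position = 0;
            long remaining = segment.size();
            while (remaining > 0) {
                long sent = segment.transferTo(position, remaining, socket); // sendfile() under the hood
                position += sent;
                remaining -= sent;
            }
        }
    }
}
```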
Broker Startup and Recovery¶
When a broker starts—especially after a crash—it must recover its state before serving requests. Understanding this process is critical for capacity planning and incident response.
Startup Sequence¶
Log Recovery Process¶
On startup, each partition's log must be recovered:
Clean vs Unclean Shutdown¶
| Shutdown Type | Detection | Recovery Behavior |
|---|---|---|
| Clean | .kafka_cleanshutdown marker exists | Skip log scanning, fast startup |
| Unclean | No marker (crash, kill -9, power loss) | Full log recovery, validate all segments |
The clean shutdown marker is written during controlled shutdown and deleted on startup.
Index Recovery¶
Indexes (.index and .timeindex) can become corrupted or out of sync:
| Scenario | Detection | Action |
|---|---|---|
| Missing index file | File not found | Rebuild from log |
| Truncated index | Size mismatch | Rebuild from log |
| Corrupted entries | Validation failure | Rebuild from log |
| Index ahead of log | Offset beyond log end | Truncate index |
Index rebuild cost:
| Partition Size | Approximate Rebuild Time |
|---|---|
| 1 GB | 5-15 seconds |
| 10 GB | 30-90 seconds |
| 100 GB | 5-15 minutes |
Many Partitions = Slow Startup
Index rebuilding happens sequentially per partition. A broker with 1000 partitions requiring index rebuild can take 30+ minutes to start.
Log Truncation¶
After a leader failure, the new leader may have a different log end. Followers must truncate divergent entries:
Why truncation happens:
- Old leader accepted writes that weren't fully replicated
- Old leader crashed before replicating to new leader
- New leader was elected with fewer records
- Followers must match new leader's log
Data Loss During Truncation
Records truncated from followers were never fully committed (not in ISR). With acks=all, producers would have received errors. With acks=1, producers may believe these records were committed.
Corrupted Segment Handling¶
| Corruption Type | Location | Recovery Action |
|---|---|---|
| Invalid magic byte | Segment header | Truncate segment, lose subsequent data |
| CRC mismatch | Record batch | Truncate at corruption point |
| Incomplete record | End of segment | Truncate incomplete bytes |
| Invalid offset sequence | Mid-segment | Truncate at invalid offset |
Corruption indicators in logs:
WARN Found invalid message ... Truncating log
ERROR Corrupt index found ... rebuilding
WARN Log recovery ... truncated X bytes from segment
Recovery Configuration¶
# Skip recovery for clean shutdown (default: true)
# Set to false to always validate on startup
log.recovery.per.partition.enabled=true
# Number of threads for log recovery
num.recovery.threads.per.data.dir=1
# Increase for faster recovery (uses more CPU/IO)
# num.recovery.threads.per.data.dir=4
# Unclean leader election (data loss risk)
unclean.leader.election.enable=false
Startup Time Factors¶
| Factor | Impact | Mitigation |
|---|---|---|
| Partition count | Linear increase | Fewer partitions per broker |
| Unclean shutdown | Full log scan required | Graceful shutdown procedures |
| Corrupted indexes | Rebuild time per partition | Fast storage, monitoring |
| Log segment size | Larger = slower scan | Smaller segments (trade-off: more files) |
| Storage type | HDD much slower than SSD | Use SSDs |
| Recovery threads | More threads = faster | Increase num.recovery.threads.per.data.dir |
Monitoring Recovery¶
# Watch broker logs during startup
tail -f /var/log/kafka/server.log | grep -E "(Loading|Loaded|recovery|truncat)"
# Expected log sequence:
# Loading logs
# Recovering segment ...
# Loaded log for partition ...
# Recovery complete
JMX metrics during startup:
| Metric | Meaning |
|---|---|
| kafka.log:type=LogManager,name=LogsLoadedPerSec | Partitions loaded per second |
| kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions | Partitions still catching up |
Best Practices for Fast Recovery¶
- Use controlled shutdown - Always prefer graceful shutdown over kill -9
- Monitor disk health - Corrupted sectors cause segment corruption
- Size partitions appropriately - Smaller partitions = faster recovery
- Use SSDs - 10x+ faster recovery than HDDs
- Increase recovery threads - For brokers with many partitions
- Regular restarts - Validates recovery process works before emergencies
Metrics Reference¶
Network Metrics¶
| Metric | Description |
|---|---|
| kafka.network:type=RequestChannel,name=RequestQueueSize | Pending requests |
| kafka.network:type=RequestChannel,name=ResponseQueueSize | Pending responses |
| kafka.network:type=Processor,name=IdlePercent | Per-processor idle fraction (low values mean saturation) |
| kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent | Average idle fraction across network processors |
Log Metrics¶
| Metric | Description |
|---|---|
| kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs | Flush rate |
| kafka.log:type=Log,name=Size | Partition size |
| kafka.log:type=Log,name=NumLogSegments | Segment count |
Replica Metrics¶
| Metric | Description |
|---|---|
| kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions | Partitions below RF |
| kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount | Partitions below min ISR |
| kafka.server:type=ReplicaManager,name=IsrShrinksPerSec | ISR shrink rate |
| kafka.server:type=ReplicaManager,name=IsrExpandsPerSec | ISR expand rate |
Purgatory Metrics¶
| Metric | Description |
|---|---|
| kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Produce | Delayed produces |
| kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Fetch | Delayed fetches |
Related Documentation¶
- Broker Overview - KRaft, configuration, lifecycle
- Replication - Replication protocol details
- Storage Engine - Log segment internals
- Performance Internals - Optimization