
Kafka Broker Internals

This document describes the internal architecture of a Kafka broker—the threading model, request processing pipeline, storage layer, and coordination mechanisms that enable high-throughput, low-latency message handling.


Architecture Overview

A Kafka broker is a multi-threaded server built around several core subsystems:

(UML diagram: broker subsystem overview)

Subsystem Responsibility
Network Layer Accept connections, read requests, write responses
Request Handler Pool Execute request logic (produce, fetch, metadata)
Log Manager Manage topic partitions, segments, retention
Replica Manager Leader/follower replication, ISR management
Group Coordinator Consumer group membership, offset commits
Transaction Coordinator Exactly-once semantics, transaction state

Network Architecture

Threading Model

Kafka uses a reactor pattern with distinct thread pools for network I/O and request processing:

(UML diagram: network and request-handler thread pools)

Thread Pool          Count / Configuration             Role
Acceptor             1 per listener (fixed)            Accept new TCP connections
Network Processors   num.network.threads (default 3)   Read requests, write responses (NIO)
Request Handlers     num.io.threads (default 8)        Execute request logic

Request Processing Pipeline

(UML diagram: request processing pipeline)

Request Types

API Key Request Handler
0 Produce ReplicaManager.appendRecords()
1 Fetch ReplicaManager.fetchMessages()
2 ListOffsets ReplicaManager.fetchOffsetForTimestamp()
3 Metadata MetadataCache.getTopicMetadata()
8 OffsetCommit GroupCoordinator.handleCommitOffsets()
9 OffsetFetch GroupCoordinator.handleFetchOffsets()
10 FindCoordinator Find group/transaction coordinator
11 JoinGroup GroupCoordinator.handleJoinGroup()
12 Heartbeat GroupCoordinator.handleHeartbeat()
14 SyncGroup GroupCoordinator.handleSyncGroup()
19 CreateTopics Controller via forwarding

Socket Configuration

# Network processor threads (NIO selectors)
num.network.threads=3

# Request handler threads (business logic)
num.io.threads=8

# Maximum queued requests before blocking
queued.max.requests=500

# Socket buffers
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400

# Maximum request size
socket.request.max.bytes=104857600

# Connection limits
max.connections=2147483647
max.connections.per.ip=2147483647

Connection Handling

Each client connection is assigned to a network processor using round-robin. The processor handles all I/O for that connection using NIO selectors.

(UML diagram: connection-to-processor assignment)
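
A minimal sketch of that hand-off, with illustrative names (this is not the broker's actual Acceptor/Processor code):

import java.nio.channels.SocketChannel;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative only: a simplified acceptor that spreads new connections across
// network processors in round-robin order; the chosen processor then owns all
// NIO selector work for that connection.
class RoundRobinAcceptor {
    private final List<Processor> processors;        // one per num.network.threads
    private final AtomicLong counter = new AtomicLong();

    RoundRobinAcceptor(List<Processor> processors) {
        this.processors = processors;
    }

    void onAccept(SocketChannel channel) {
        // Pick the next processor in round-robin order.
        int index = (int) (counter.getAndIncrement() % processors.size());
        processors.get(index).enqueueNewConnection(channel);
    }

    interface Processor {
        void enqueueNewConnection(SocketChannel channel);
    }
}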


Log Subsystem

The log subsystem manages durable storage of messages. Each partition is an ordered, immutable sequence of records stored as log segments.

Log Structure

/var/kafka-logs/
├── my-topic-0/                        # Partition directory
│   ├── 00000000000000000000.log       # Segment file (messages)
│   ├── 00000000000000000000.index     # Offset index
│   ├── 00000000000000000000.timeindex # Time index
│   ├── 00000000000012345678.log       # Next segment
│   ├── 00000000000012345678.index
│   ├── 00000000000012345678.timeindex
│   ├── leader-epoch-checkpoint        # Leader epoch history
│   └── partition.metadata             # Topic ID mapping
├── my-topic-1/
│   └── ...
└── __consumer_offsets-0/              # Internal topic
    └── ...

Segment Files

Each segment consists of three files:

File Extension Purpose
Log .log Record batches (actual data)
Offset Index .index Sparse offset → file position mapping
Time Index .timeindex Sparse timestamp → offset mapping

Segment naming uses the base offset (first offset in segment) zero-padded to 20 digits.
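
For example (a tiny illustrative snippet, not broker code), the segment whose first record has offset 12345678 gets these file names:

public class SegmentNaming {
    public static void main(String[] args) {
        // Segment files are named after the base offset, zero-padded to 20 digits.
        long baseOffset = 12_345_678L;
        System.out.println(String.format("%020d.log", baseOffset));       // 00000000000012345678.log
        System.out.println(String.format("%020d.index", baseOffset));     // 00000000000012345678.index
        System.out.println(String.format("%020d.timeindex", baseOffset)); // 00000000000012345678.timeindex
    }
}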

Record Batch Format

(UML diagram: record batch format)

Offset Index

The offset index is a sparse index mapping offsets to file positions. Not every offset is indexed—entries are added every index.interval.bytes (default 4KB).

(UML diagram: offset index structure)

Lookup algorithm:

  1. Binary search index for largest offset ≤ target
  2. Sequential scan from that position in log file
  3. Return record at target offset
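
A simplified Java sketch of steps 1 and 2, holding the sparse index as in-memory arrays (the broker memory-maps the .index file, but the search is the same idea):

import java.util.Arrays;

// Illustrative sparse-index lookup: binary-search for the largest indexed
// offset <= target, then the caller scans the log file forward from the
// returned file position until it reaches the exact target offset.
class OffsetIndexLookup {
    private final long[] offsets;    // indexed offsets, ascending
    private final int[] positions;   // byte position in the .log file for each entry

    OffsetIndexLookup(long[] offsets, int[] positions) {
        this.offsets = offsets;
        this.positions = positions;
    }

    /** Returns the file position to start scanning from for targetOffset. */
    int lookup(long targetOffset) {
        int idx = Arrays.binarySearch(offsets, targetOffset);
        if (idx < 0) {
            idx = -idx - 2;                      // insertion point - 1 = largest entry <= target
        }
        return idx < 0 ? 0 : positions[idx];     // target before first entry: scan from start
    }

    public static void main(String[] args) {
        OffsetIndexLookup index = new OffsetIndexLookup(
                new long[]{0, 43, 91, 130}, new int[]{0, 4096, 8192, 12288});
        System.out.println(index.lookup(100));   // 8192: scan forward from the entry for offset 91
    }
}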

Time Index

The time index maps timestamps to offsets, enabling time-based lookups:

(UML diagram: time index structure)

Segment Lifecycle

(UML diagram: segment lifecycle)

Roll conditions (new segment created when any is true):

Condition Configuration Default
Size limit log.segment.bytes 1 GB
Time limit log.roll.ms / log.roll.hours 7 days
Index full log.index.size.max.bytes 10 MB
Offset overflow Relative offset exceeds 32-bit ~2 billion records

Log Retention

Policy Configuration Behavior
Time-based log.retention.hours (168 default) Delete segments older than threshold
Size-based log.retention.bytes (-1 default) Delete oldest segments when partition exceeds size
Compaction log.cleanup.policy=compact Keep only latest value per key

# Retention settings
log.retention.hours=168              # 7 days
log.retention.bytes=-1               # No size limit
log.retention.check.interval.ms=300000  # Check every 5 minutes

# Segment settings
log.segment.bytes=1073741824         # 1 GB segments
log.roll.ms=604800000                # Roll after 7 days

# Compaction
log.cleanup.policy=delete            # or compact, or delete,compact
log.cleaner.enable=true
log.cleaner.threads=1
log.cleaner.min.cleanable.ratio=0.5

Log Compaction

Compaction retains only the last value for each key, enabling "table" semantics:

(UML diagram: log compaction)

Tombstones: A record with a null value marks its key for deletion. Tombstones are retained for delete.retention.ms (24 hours by default) before being removed.
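
For example, a key in a compacted topic can be deleted with the standard Java producer by sending a null value (the topic and key names below are just placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TombstoneExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // A null value is a tombstone: after compaction (and delete.retention.ms),
            // all records for this key are removed from the log.
            producer.send(new ProducerRecord<>("user-profiles", "user-42", null));
        }
    }
}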


Replica Manager

The Replica Manager handles partition replication, leader election, and ISR (In-Sync Replicas) management.

Replica Types

Type Role Responsibilities
Leader Primary Handle produce/fetch, maintain ISR
Follower Secondary Fetch from leader, catch up
Observer Read-only (Confluent Platform extension) Replicates the log without joining the ISR

ISR (In-Sync Replicas)

A replica is in the ISR if it:

  1. Has fetched up to the leader's log end offset within replica.lag.time.max.ms (30 seconds default)
  2. Is not offline

(UML diagram: ISR membership, shrink and expand)
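
A minimal sketch of that check, using made-up class and field names rather than the broker's actual code: a follower stays in the ISR while its last caught-up time is within replica.lag.time.max.ms of now.

import java.util.List;
import java.util.stream.Collectors;

// Illustrative ISR membership check: a follower remains in sync as long as its
// fetches reached the leader's log end offset within replica.lag.time.max.ms.
class IsrCheck {
    static class Replica {
        final int brokerId;
        final long lastCaughtUpTimeMs;   // last time a fetch reached the leader's LEO

        Replica(int brokerId, long lastCaughtUpTimeMs) {
            this.brokerId = brokerId;
            this.lastCaughtUpTimeMs = lastCaughtUpTimeMs;
        }
    }

    static List<Replica> inSync(List<Replica> followers, long nowMs, long replicaLagTimeMaxMs) {
        return followers.stream()
                .filter(r -> nowMs - r.lastCaughtUpTimeMs <= replicaLagTimeMaxMs)
                .collect(Collectors.toList());
    }
}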

High Watermark

The high watermark (HW) is the offset up to which all ISR replicas have replicated. Consumers can only read up to the HW.

(UML diagram: high watermark advancement)
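
A worked sketch of the rule (simplified; the broker tracks this incrementally rather than recomputing it): the HW is the minimum log end offset across the ISR.

import java.util.Map;

// Illustrative high-watermark computation: the HW advances to the smallest log
// end offset (LEO) among the in-sync replicas, so consumers never see records
// that could still be lost on leader failover.
public class HighWatermark {
    public static void main(String[] args) {
        Map<String, Long> isrLogEndOffsets = Map.of(
                "leader", 105L,
                "follower-1", 103L,
                "follower-2", 101L);

        long hw = isrLogEndOffsets.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(0L);

        System.out.println("HW = " + hw);   // 101: offsets 0..100 are visible to consumers
    }
}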

Leader Epoch

Leader epochs prevent log divergence after leader changes:

(UML diagram: leader epochs)

Fetch Protocol

Followers fetch from leaders using the same Fetch API as consumers:

(UML diagram: follower fetch protocol)

Configuration:

# Replica fetcher threads per source broker
num.replica.fetchers=1

# Wait time for fetch when no data
replica.fetch.wait.max.ms=500

# Maximum bytes per fetch
replica.fetch.max.bytes=1048576

# Minimum bytes before responding
replica.fetch.min.bytes=1

# ISR lag threshold
replica.lag.time.max.ms=30000

Request Purgatory

The purgatory holds delayed requests waiting for conditions to be satisfied. This enables efficient handling of requests with timeout semantics.

Delayed Operations

Operation Condition Example
DelayedProduce acks=all waiting for ISR Produce with acks=all
DelayedFetch Not enough data available Consumer fetch min.bytes
DelayedJoin Waiting for all members Consumer group join
DelayedHeartbeat Waiting for session timeout Member liveness check
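
The operations above share one contract. The sketch below models it on the broker's DelayedOperation methods (tryComplete / onComplete / onExpiration); the class bodies are illustrative, not the actual implementation.

// Illustrative contract behind delayed operations: the purgatory calls
// tryComplete() whenever a watched key changes (e.g. the HW advances); if the
// condition is still unmet when the timeout fires, onExpiration() runs instead.
abstract class DelayedOperation {
    final long timeoutMs;

    DelayedOperation(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    /** Check the completion condition; complete and return true if it is satisfied. */
    abstract boolean tryComplete();

    /** Runs exactly once when the operation completes successfully. */
    abstract void onComplete();

    /** Runs if the timeout elapses before the condition is satisfied. */
    abstract void onExpiration();
}

// Example: a delayed produce with acks=all completes once the partition's
// high watermark reaches the last offset of the appended batch.
class DelayedProduceSketch extends DelayedOperation {
    private final long requiredOffset;
    private final java.util.function.LongSupplier highWatermark;

    DelayedProduceSketch(long timeoutMs, long requiredOffset,
                         java.util.function.LongSupplier highWatermark) {
        super(timeoutMs);
        this.requiredOffset = requiredOffset;
        this.highWatermark = highWatermark;
    }

    @Override
    boolean tryComplete() {
        if (highWatermark.getAsLong() >= requiredOffset) {
            onComplete();
            return true;
        }
        return false;
    }

    @Override
    void onComplete() {
        // Send the success response to the producer here.
    }

    @Override
    void onExpiration() {
        // Send a REQUEST_TIMED_OUT error to the producer here.
    }
}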

Purgatory Architecture

(UML diagram: purgatory architecture)

Produce Acknowledgment Flow (acks=all)

(UML diagram: produce acknowledgment flow with acks=all)

Fetch Wait Flow

(UML diagram: fetch wait flow)

Timer Wheel

Kafka uses a hierarchical timing wheel for efficient timeout management:

(UML diagram: hierarchical timing wheel)
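
As a sketch only, here is a minimal single-level wheel. Kafka stacks several such wheels hierarchically so that timeouts longer than one rotation overflow into a coarser wheel, which this simplified version omits.

import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative single-level timing wheel: tasks are bucketed by expiration tick
// and the wheel advances one tick at a time, expiring a whole bucket at once.
class TimingWheelSketch {
    private final long tickMs;
    private final Deque<Runnable>[] buckets;
    private long currentTick;

    @SuppressWarnings("unchecked")
    TimingWheelSketch(long tickMs, int wheelSize) {
        this.tickMs = tickMs;
        this.buckets = new Deque[wheelSize];
        for (int i = 0; i < wheelSize; i++) buckets[i] = new ArrayDeque<>();
    }

    /** Schedule a task to run delayMs from now (must fit within one wheel rotation). */
    void schedule(Runnable task, long delayMs) {
        long expirationTick = currentTick + Math.max(1, delayMs / tickMs);
        buckets[(int) (expirationTick % buckets.length)].add(task);
    }

    /** Advance one tick and run everything that expired in that bucket. */
    void advance() {
        currentTick++;
        Deque<Runnable> bucket = buckets[(int) (currentTick % buckets.length)];
        Runnable task;
        while ((task = bucket.poll()) != null) task.run();
    }
}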


Group Coordinator

Each broker hosts a Group Coordinator responsible for consumer groups whose __consumer_offsets partition leader resides on that broker.

Group Coordinator Assignment

(UML diagram: group coordinator assignment)
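
The owning partition is derived from the group id. A sketch of that hashing scheme (the broker masks the sign bit of the hash before taking the modulo):

public class CoordinatorPartition {
    // The group's offsets (and therefore its coordinator) live on the
    // __consumer_offsets partition selected by hashing the group id.
    static int partitionFor(String groupId, int offsetsTopicNumPartitions) {
        return (groupId.hashCode() & 0x7fffffff) % offsetsTopicNumPartitions;
    }

    public static void main(String[] args) {
        // With the default offsets.topic.num.partitions=50:
        System.out.println("my-group -> partition " + partitionFor("my-group", 50));
    }
}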

Consumer Group State Machine

(UML diagram: consumer group state machine)

State Description
Empty No active members, may have committed offsets
PreparingRebalance Waiting for members to join
CompletingRebalance Waiting for leader to sync assignments
Stable Normal operation, members consuming
Dead No members, offsets expired

Rebalance Protocol

(UML diagram: rebalance protocol)

Offset Storage

Consumer offsets are stored in __consumer_offsets (50 partitions by default):

Key: (group_id, topic, partition)
Value: (offset, metadata, commit_timestamp)

Example:
Key:   ("my-group", "orders", 0)
Value: (offset=12345, metadata="", timestamp=1703001600000)

Configuration:

# __consumer_offsets topic settings
offsets.topic.num.partitions=50
offsets.topic.replication.factor=3
offsets.retention.minutes=10080  # 7 days

# Group coordinator settings
group.initial.rebalance.delay.ms=3000
group.max.session.timeout.ms=1800000
group.min.session.timeout.ms=6000

Transaction Coordinator

The Transaction Coordinator manages exactly-once semantics for transactional producers.

Transaction State Storage

Transaction state is stored in __transaction_state (50 partitions by default):

Key: transactional.id
Value: (producer_id, producer_epoch, state, partitions, timeout)

Transaction States

(UML diagram: transaction states)

Transaction Flow

(UML diagram: transaction flow)


Internal Topics

Topic Partitions Purpose
__consumer_offsets 50 Consumer group offsets
__transaction_state 50 Transaction coordinator state
__cluster_metadata 1 KRaft metadata log

The __consumer_offsets and __transaction_state topics use log compaction and have their own replication and retention settings:

# Consumer offsets
offsets.topic.replication.factor=3
offsets.topic.num.partitions=50
offsets.retention.minutes=10080

# Transaction state
transaction.state.log.replication.factor=3
transaction.state.log.num.partitions=50
transaction.state.log.min.isr=2

Fetch Sessions (KIP-227)

Fetch sessions reduce bandwidth by sending only changed partition data:

Without Fetch Sessions

Request:  [full partition list + fetch positions] → per-partition metadata sent on every request
Response: [full partition list + data] → metadata repeated in every response

With Fetch Sessions

First Request:  [full partition list]
First Response: [sessionId, data]

Subsequent:     [sessionId only, or delta]
Response:       [changed partitions only]

Benefits:

  • Significantly smaller fetch requests when few partitions change between polls
  • Lower CPU for metadata processing
  • Better performance with many partitions

Memory Architecture

Kafka relies heavily on the OS page cache rather than JVM heap:

(UML diagram: memory architecture)

Memory Recommendations

Component Memory Source Sizing
JVM Heap Configured 4-8 GB typically sufficient
Page Cache Remaining RAM As much as possible
Direct Buffers Off-heap Network I/O buffers

# JVM settings (in kafka-server-start.sh)
export KAFKA_HEAP_OPTS="-Xms6g -Xmx6g"

# For a 32 GB server:
# - 6 GB JVM heap
# - 26 GB available for page cache

Zero-Copy Transfer

Kafka uses the sendfile() system call for zero-copy transfer from the page cache to the network socket:

(UML diagram: zero-copy transfer)
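
In the JVM this surfaces as FileChannel.transferTo(), which maps to sendfile() on Linux. The sketch below is a simplified illustration; note that zero-copy applies only to plaintext listeners, since TLS requires the bytes to pass through user space for encryption.

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Illustrative zero-copy send: FileChannel.transferTo() lets the kernel stream
// page-cache bytes straight to the socket without copying into the JVM heap.
class ZeroCopySend {
    static long sendSegmentRange(Path segment, long position, long count,
                                 SocketChannel socket) throws IOException {
        try (FileChannel log = FileChannel.open(segment, StandardOpenOption.READ)) {
            long transferred = 0;
            while (transferred < count) {
                long sent = log.transferTo(position + transferred, count - transferred, socket);
                if (sent <= 0) break;        // socket buffer full or end of file
                transferred += sent;
            }
            return transferred;
        }
    }
}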


Broker Startup and Recovery

When a broker starts—especially after a crash—it must recover its state before serving requests. Understanding this process is critical for capacity planning and incident response.

Startup Sequence

(UML diagram: startup sequence)

Log Recovery Process

On startup, each partition's log must be recovered:

(UML diagram: log recovery process)

Clean vs Unclean Shutdown

Shutdown Type Detection Recovery Behavior
Clean .kafka_cleanshutdown marker exists Skip log scanning, fast startup
Unclean No marker (crash, kill -9, power loss) Full log recovery, validate all segments

The clean shutdown marker is written during controlled shutdown and deleted on startup.
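
A trivial way to illustrate the check (the marker file name is .kafka_cleanshutdown; the directory below is just the example log directory used earlier in this document):

import java.nio.file.Files;
import java.nio.file.Path;

public class CleanShutdownCheck {
    public static void main(String[] args) {
        // The broker writes this marker into each log directory on controlled shutdown
        // and removes it during startup; its absence triggers full log recovery.
        Path marker = Path.of("/var/kafka-logs/.kafka_cleanshutdown");
        System.out.println(Files.exists(marker) ? "clean shutdown" : "unclean: full recovery");
    }
}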

Index Recovery

Indexes (.index and .timeindex) can become corrupted or out of sync:

Scenario Detection Action
Missing index file File not found Rebuild from log
Truncated index Size mismatch Rebuild from log
Corrupted entries Validation failure Rebuild from log
Index ahead of log Offset beyond log end Truncate index

Index rebuild cost:

Partition Size Approximate Rebuild Time
1 GB 5-15 seconds
10 GB 30-90 seconds
100 GB 5-15 minutes

Many Partitions = Slow Startup

With the default num.recovery.threads.per.data.dir=1, index rebuilding runs one partition at a time per data directory. A broker with 1000 partitions requiring index rebuilds can take 30+ minutes to start.

Log Truncation

After a leader failure, the new leader may have a different log end. Followers must truncate divergent entries:

(UML diagram: log truncation after leader change)

Why truncation happens:

  1. Old leader accepted writes that weren't fully replicated
  2. Old leader crashed before replicating to new leader
  3. New leader was elected with fewer records
  4. Followers must match new leader's log

Data Loss During Truncation

Records truncated from followers were never committed: they were never replicated to the full ISR, so they remained above the high watermark. With acks=all, producers would have received errors for them. With acks=1, producers may believe these records were committed.

Corrupted Segment Handling

Corruption Type Location Recovery Action
Invalid magic byte Segment header Truncate segment, lose subsequent data
CRC mismatch Record batch Truncate at corruption point
Incomplete record End of segment Truncate incomplete bytes
Invalid offset sequence Mid-segment Truncate at invalid offset

Corruption indicators in logs:

WARN  Found invalid message ... Truncating log
ERROR Corrupt index found ... rebuilding
WARN  Log recovery ... truncated X bytes from segment

Recovery Configuration

# Recovery is skipped automatically after a clean shutdown;
# only an unclean shutdown (missing marker) triggers full log recovery

# Number of threads for log recovery
num.recovery.threads.per.data.dir=1

# Increase for faster recovery (uses more CPU/IO)
# num.recovery.threads.per.data.dir=4

# Unclean leader election (data loss risk)
unclean.leader.election.enable=false

Startup Time Factors

Factor Impact Mitigation
Partition count Linear increase Fewer partitions per broker
Unclean shutdown Full log scan required Graceful shutdown procedures
Corrupted indexes Rebuild time per partition Fast storage, monitoring
Log segment size Larger = slower scan Smaller segments (trade-off: more files)
Storage type HDD much slower than SSD Use SSDs
Recovery threads More threads = faster Increase num.recovery.threads.per.data.dir

Monitoring Recovery

# Watch broker logs during startup
tail -f /var/log/kafka/server.log | grep -E "(Loading|Loaded|recovery|truncat)"

# Expected log sequence:
# Loading logs
# Recovering segment ...
# Loaded log for partition ...
# Recovery complete

JMX metrics during startup:

Metric Meaning
kafka.log:type=LogManager,name=LogsLoadedPerSec Partitions loaded per second
kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions Partitions still catching up

Best Practices for Fast Recovery

  1. Use controlled shutdown - Always prefer graceful shutdown over kill -9
  2. Monitor disk health - Corrupted sectors cause segment corruption
  3. Size partitions appropriately - Smaller partitions = faster recovery
  4. Use SSDs - 10x+ faster recovery than HDDs
  5. Increase recovery threads - For brokers with many partitions
  6. Regular restarts - Validates recovery process works before emergencies

Metrics Reference

Network Metrics

Metric Description
kafka.network:type=RequestChannel,name=RequestQueueSize Pending requests
kafka.network:type=RequestChannel,name=ResponseQueueSize Pending responses
kafka.network:type=Processor,name=IdlePercent Per-processor idle time fraction
kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent Average processor idle

Log Metrics

Metric Description
kafka.log:type=LogFlushStats,name=LogFlushRateAndTimeMs Flush rate and latency
kafka.log:type=Log,name=Size Partition size
kafka.log:type=Log,name=NumLogSegments Segment count

Replica Metrics

Metric Description
kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions Partitions below RF
kafka.server:type=ReplicaManager,name=UnderMinIsrPartitionCount Partitions below min ISR
kafka.server:type=ReplicaManager,name=IsrShrinksPerSec ISR shrink rate
kafka.server:type=ReplicaManager,name=IsrExpandsPerSec ISR expand rate

Purgatory Metrics

Metric Description
kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Produce Delayed produces
kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=Fetch Delayed fetches