Kafka Cluster Management

Internal mechanisms for cluster coordination, metadata management, and administrative operations.


Controller Architecture

Controller Responsibilities

Responsibility             Description
Broker registration        Track broker membership and liveness
Leader election            Elect partition leaders on failure, shutdown, or request
Partition assignment       Assign partition replicas to brokers
Topic management           Create, delete, and modify topics
Configuration management   Store and distribute configurations
Metadata propagation       Distribute cluster metadata to brokers
Client quota management    Store and distribute client quota settings (brokers enforce them)
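In KRaft mode the active controller is the leader of the Raft quorum that replicates the metadata log. A minimal bash sketch for finding it, assuming the `LeaderId:` line that `kafka-metadata-quorum.sh ... describe --status` prints in recent Kafka versions; the helper name `active_controller` is hypothetical:

```shell
# Hypothetical helper: read `kafka-metadata-quorum.sh ... describe --status`
# output on stdin and print the node ID of the quorum leader (the active
# controller). Exits non-zero if no LeaderId line is found.
active_controller() {
  awk '$1 == "LeaderId:" { print $2; found = 1 } END { exit !found }'
}

# Usage (requires a running cluster):
#   kafka-metadata-quorum.sh --bootstrap-server kafka:9092 describe --status \
#     | active_controller
```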

Metadata Management

Cluster Metadata Topic

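In KRaft mode, all cluster metadata is stored as an ordered log of records in the single-partition __cluster_metadata topic. The controller quorum replicates this log using Raft, and brokers fetch it to keep their view of the cluster up to date.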

Metadata Record Types

Record Type              Description
RegisterBrokerRecord     Broker joins the cluster
UnregisterBrokerRecord   Broker is removed from the cluster
TopicRecord              Topic created
PartitionRecord          Partition created (replicas, ISR, leader)
PartitionChangeRecord    Leader or ISR change
ConfigRecord             Configuration change
ClientQuotaRecord        Quota configuration change
ProducerIdsRecord        Producer ID block allocated to a broker
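Each record type above shows up in the decoded metadata log. A minimal bash sketch that tallies record types, assuming the `kafka-dump-log.sh --cluster-metadata-decoder` output of recent versions, where each payload is printed as JSON with an upper-case `"type"` field such as `"PARTITION_RECORD"`; the helper name `count_record_types` is hypothetical:

```shell
# Hypothetical helper: count metadata record types in decoded dump-log output.
# Reads stdin and prints "<TYPE> <COUNT>" lines sorted by type name.
count_record_types() {
  LC_ALL=C grep -o '"type":"[A-Z_]*"' \
    | cut -d'"' -f4 \
    | LC_ALL=C sort \
    | uniq -c \
    | awk '{ print $2, $1 }'
}

# Usage (adjust the path to your metadata log directory):
#   kafka-dump-log.sh --cluster-metadata-decoder \
#     --files /var/kafka-logs/__cluster_metadata-0/00000000000000000000.log \
#     | count_record_types
```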

Inspecting Metadata

# Dump the metadata log with decoded records
kafka-dump-log.sh --cluster-metadata-decoder \
  --files /var/kafka-logs/__cluster_metadata-0/00000000000000000000.log

# Describe the metadata quorum (leader, epoch, voters, observers)
kafka-metadata-quorum.sh --bootstrap-server kafka:9092 describe --status

# List brokers (the node tree layout differs between Kafka versions; run "ls /" first)
kafka-metadata-shell.sh --snapshot /var/kafka-logs/__cluster_metadata-0/00000000000000000000.log \
  ls /brokers

# Show topic details
kafka-metadata-shell.sh --snapshot /var/kafka-logs/__cluster_metadata-0/00000000000000000000.log \
  ls /topics/my-topic

Broker Registration

Registration Flow

Broker States

State                 Description
FENCED                Registered but not yet active, or session expired; cannot lead partitions
UNFENCED              Active; can lead partitions and serve requests
CONTROLLED_SHUTDOWN   Shutting down gracefully while leadership is moved away
SHUTDOWN              Removed from the cluster
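The states above can be read as a small state machine driven by the controller. The following bash function is a toy model for illustration only, not Kafka code; the function name and the event labels (`caught_up`, `session_expired`, `shutdown_requested`, `shutdown_complete`) are hypothetical:

```shell
# Toy model (hypothetical, not Kafka source) of the broker states above.
# next_state CURRENT_STATE EVENT prints the resulting state.
next_state() {
  case "$1:$2" in
    FENCED:caught_up)                      echo UNFENCED ;;            # broker caught up with the metadata log
    UNFENCED:session_expired)              echo FENCED ;;              # no heartbeat within the session timeout
    UNFENCED:shutdown_requested)           echo CONTROLLED_SHUTDOWN ;; # graceful shutdown begins
    CONTROLLED_SHUTDOWN:shutdown_complete) echo SHUTDOWN ;;            # leadership moved, broker stops
    *)                                     echo "$1" ;;                # any other event: no change
  esac
}
```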

Heartbeat Configuration

# Broker heartbeat interval (default 2000)
broker.heartbeat.interval.ms=2000

# Session timeout: the controller fences a broker that sends no heartbeat
# within this window (default 9000)
broker.session.timeout.ms=18000
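The ratio of the two settings gives roughly how many consecutive heartbeats a broker can miss before it is fenced: with the values above, 18000 / 2000 = 9. A trivial bash helper (hypothetical name) for checking other combinations:

```shell
# Hypothetical helper: whole heartbeat intervals that fit in one session timeout.
# Args: broker.heartbeat.interval.ms broker.session.timeout.ms
heartbeats_per_session() {
  local interval_ms=$1 session_ms=$2
  echo $(( session_ms / interval_ms ))
}

heartbeats_per_session 2000 18000   # 9 (values above)
heartbeats_per_session 2000 9000    # 4 (default session timeout; integer division of 4.5)
```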

Leader Election

Election Triggers

Trigger               Description
Broker failure        Leader broker becomes unavailable
Controlled shutdown   Broker initiates graceful shutdown
Manual election       Administrator triggers an election
Preferred leader      Automatic rebalancing back to the preferred replica
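The preferred leader is the first replica in a partition's assignment. A minimal bash/awk sketch that lists partitions currently led by some other broker, i.e. the ones a preferred election would move. It assumes the tab-separated `Key: value` partition lines printed by `kafka-topics.sh --describe` (the exact field set varies by version); the helper name `non_preferred_leaders` is hypothetical:

```shell
# Hypothetical helper: read `kafka-topics.sh --describe` output on stdin and
# print "topic partition current_leader preferred_leader" for every partition
# whose leader is not the first broker in its Replicas list.
non_preferred_leaders() {
  awk -F'\t' '
    {
      topic = partition = leader = replicas = ""
      for (i = 1; i <= NF; i++) {
        n = split($i, kv, ": ")
        if (n < 2) continue
        if (kv[1] == "Topic") topic = kv[2]
        else if (kv[1] == "Partition") partition = kv[2]
        else if (kv[1] == "Leader") leader = kv[2]
        else if (kv[1] == "Replicas") replicas = kv[2]
      }
      # Skip the per-topic summary line, which has no Partition field.
      if (partition == "" || replicas == "") next
      split(replicas, r, ",")
      if (leader != r[1]) print topic, partition, leader, r[1]
    }'
}

# Usage:
#   kafka-topics.sh --bootstrap-server kafka:9092 --describe --topic my-topic \
#     | non_preferred_leaders
```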

Election Process

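The basic rule of a clean election fits in a few lines: walk the replica assignment in order and pick the first replica that is both alive and in the ISR; only when unclean election is allowed may a live but out-of-sync replica be chosen. This bash function is an illustrative model of that rule, not Kafka's implementation; the function names and the space-separated list format are assumptions:

```shell
# Hypothetical helper: succeed if space-separated list $1 contains word $2.
contains() { case " $1 " in *" $2 "*) return 0 ;; esac; return 1; }

# Illustrative model (not Kafka code) of partition leader election.
# Args: "REPLICAS" "ISR" "LIVE" [unclean]   (space-separated broker IDs)
# Prints the new leader, or returns 1 if no eligible replica exists.
elect_leader() {
  local replicas=$1 isr=$2 live=$3 unclean=${4:-false}
  local b
  # Clean election: first replica in assignment order that is alive and in the ISR.
  for b in $replicas; do   # unquoted on purpose: split the list into IDs
    if contains "$live" "$b" && contains "$isr" "$b"; then echo "$b"; return 0; fi
  done
  # Unclean election: any live replica, even one that may be missing data.
  if [ "$unclean" = true ]; then
    for b in $replicas; do
      if contains "$live" "$b"; then echo "$b"; return 0; fi
    done
  fi
  return 1
}
```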
Preferred Leader Election

# Trigger preferred leader election
kafka-leader-election.sh --bootstrap-server kafka:9092 \
  --election-type preferred \
  --all-topic-partitions

# For specific topic
kafka-leader-election.sh --bootstrap-server kafka:9092 \
  --election-type preferred \
  --topic my-topic

# Unclean election (data loss risk)
kafka-leader-election.sh --bootstrap-server kafka:9092 \
  --election-type unclean \
  --topic my-topic --partition 0
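For many partitions at once, `kafka-leader-election.sh` also accepts `--path-to-json-file` with a file of the form `{"partitions": [{"topic": ..., "partition": ...}]}`. A small bash helper (hypothetical name `election_json`) that writes such a file for one topic:

```shell
# Hypothetical helper: print a kafka-leader-election.sh JSON file listing
# the given partitions of one topic.
# Args: TOPIC PARTITION...
election_json() {
  local topic=$1
  shift
  local entries=() p
  for p in "$@"; do
    entries+=("{\"topic\": \"$topic\", \"partition\": $p}")
  done
  # Join entries with commas inside a subshell so IFS is not changed globally.
  (IFS=,; printf '{"partitions": [%s]}\n' "${entries[*]}")
}

# Usage:
#   election_json my-topic 0 1 2 > election.json
#   kafka-leader-election.sh --bootstrap-server kafka:9092 \
#     --election-type preferred --path-to-json-file election.json
```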

Partition Assignment

Initial Assignment

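When creating a topic, the controller assigns partition replicas to brokers considering:
- Rack awareness: replicas of a partition are spread across racks when brokers set broker.rack
- Even spread: replicas and preferred leaders are distributed round-robin across brokers, starting from a random broker

The default placement does not consider current broker load or existing assignments; use partition reassignment to rebalance existing partitions.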

# Create topic with specific assignment
kafka-topics.sh --bootstrap-server kafka:9092 \
  --create \
  --topic my-topic \
  --replica-assignment 1:2:3,2:3:1,3:1:2
# Format: comma-separated partitions (partition 0 first), each a colon-separated
# list of broker IDs; the first broker listed is the preferred leader
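The assignment string above is a simple rotation: each partition's replica list is the broker list shifted by one position. A bash sketch (hypothetical helper `gen_assignment`) that builds such a string for any partition count and replication factor; it is plain round-robin without rack awareness, not Kafka's own placement logic:

```shell
# Hypothetical helper: build a --replica-assignment value by rotating the
# broker list one position per partition. No rack awareness.
# Args: PARTITIONS REPLICATION_FACTOR BROKER_ID...
gen_assignment() {
  local partitions=$1 rf=$2
  shift 2
  local brokers=("$@")
  local n=${#brokers[@]}
  if (( rf > n )); then
    echo "replication factor $rf exceeds broker count $n" >&2
    return 1
  fi
  local parts=() p r
  for ((p = 0; p < partitions; p++)); do
    local replicas=()
    for ((r = 0; r < rf; r++)); do
      replicas+=("${brokers[(p + r) % n]}")
    done
    # Replicas of one partition are colon-separated.
    parts+=("$(IFS=:; echo "${replicas[*]}")")
  done
  # Partitions are comma-separated.
  (IFS=,; echo "${parts[*]}")
}

gen_assignment 3 3 1 2 3   # 1:2:3,2:3:1,3:1:2
```

It can then feed the create command directly, e.g. `--replica-assignment "$(gen_assignment 3 3 1 2 3)"`.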

Reassignment

# Generate reassignment plan
cat > topics.json << 'EOF'
{
  "topics": [{"topic": "my-topic"}],
  "version": 1
}
EOF

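kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
  --topics-to-move-json-file topics.json \
  --broker-list "1,2,3,4" \
  --generate
# Save the "Proposed partition reassignment configuration" JSON from the
# output as reassignment.json; keep the current assignment for rollback

# Execute plan, throttling replication traffic to 100000000 bytes/s (~100 MB/s)
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
  --reassignment-json-file reassignment.json \
  --throttle 100000000 \
  --execute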

# Verify completion (also removes the replication throttle once all moves finish)
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
  --reassignment-json-file reassignment.json \
  --verify
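The plan does not have to come from `--generate`; a reassignment file lists each partition with its target replicas in the form `{"version":1,"partitions":[{"topic":...,"partition":...,"replicas":[...]}]}`. A bash sketch (hypothetical helper `reassignment_json`) that emits the file for moving a single partition:

```shell
# Hypothetical helper: print a reassignment JSON file that moves one
# partition to the given replica list (first broker = preferred leader).
# Args: TOPIC PARTITION BROKER_ID...
reassignment_json() {
  local topic=$1 partition=$2
  shift 2
  local replicas
  replicas=$(IFS=,; echo "$*")   # join remaining args with commas
  printf '{"version":1,"partitions":[{"topic":"%s","partition":%s,"replicas":[%s]}]}\n' \
    "$topic" "$partition" "$replicas"
}

# Usage:
#   reassignment_json my-topic 0 2 3 4 > reassignment.json
#   kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
#     --reassignment-json-file reassignment.json --execute
```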

Configuration Management

Dynamic Configuration

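Dynamic configurations can be changed without restarting brokers. Only broker settings whose update mode is per-broker or cluster-wide can be changed this way; read-only settings still require a restart. Topic-level settings can always be changed at runtime.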

# Broker configuration
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type brokers \
  --entity-name 1 \
  --alter \
  --add-config log.cleaner.threads=4

# Topic configuration
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type topics \
  --entity-name my-topic \
  --alter \
  --add-config retention.ms=86400000

# Client quota: cap produce throughput for user producer-user at 10 MiB/s (10485760 bytes/s) per broker
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type users \
  --entity-name producer-user \
  --alter \
  --add-config producer_byte_rate=10485760
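Raw values such as `retention.ms=86400000` (one day) and `producer_byte_rate=10485760` (10 MiB/s) are easier to audit when derived from familiar units. Two trivial bash helpers (hypothetical names) for computing them:

```shell
# Hypothetical helper: days -> milliseconds (for retention.ms and similar).
days_to_ms() { echo $(( $1 * 24 * 60 * 60 * 1000 )); }

# Hypothetical helper: MiB/s -> bytes/s (for producer_byte_rate / consumer_byte_rate).
mib_to_bytes() { echo $(( $1 * 1024 * 1024 )); }

days_to_ms 1      # 86400000
days_to_ms 7      # 604800000
mib_to_bytes 10   # 10485760
```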

Configuration Precedence

Level                        Description
Per-topic                    Highest priority, topic-specific
Per-broker                   Broker-specific override
Cluster-wide dynamic         Dynamic default for all brokers
Static (server.properties)   File-based configuration
Default                      Built-in defaults
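Resolution works top-down: the first level that defines a value wins. A toy bash model of that lookup, where each argument is the value at one level in the table's order and an empty string means "not set". The helper name is hypothetical, and note that a topic-level name (e.g. `retention.ms`) differs from its broker-level default (`log.retention.ms`); on a real cluster, `kafka-configs.sh --describe --all` lists the effective values:

```shell
# Toy model (hypothetical, not Kafka code) of configuration precedence.
# Args, highest priority first: per-topic, per-broker, cluster-wide dynamic,
# static (server.properties), built-in default. Empty string = not set.
effective_config() {
  local value
  for value in "$@"; do
    if [ -n "$value" ]; then
      echo "$value"
      return 0
    fi
  done
  return 1
}

# Topic and per-broker overrides unset; the cluster-wide dynamic default applies.
effective_config "" "" 259200000 "" 604800000   # 259200000
```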

Controlled Shutdown

Graceful Shutdown Process

Shutdown Configuration

# Enable controlled shutdown
controlled.shutdown.enable=true

# Maximum retries
controlled.shutdown.max.retries=3

# Retry backoff
controlled.shutdown.retry.backoff.ms=5000

Cluster Health Monitoring

Key Metrics

Metric                      Description                            Alert Threshold
ActiveControllerCount       Active controllers (cluster-wide sum)  ≠ 1
OfflinePartitionsCount      Offline partitions                     > 0
UnderReplicatedPartitions   Under-replicated partitions            > 0
GlobalPartitionCount        Total partitions                       Growing unexpectedly
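The fixed thresholds in the table translate directly into a check. A bash sketch, assuming you already collect the metric values (for example over JMX) and pass in cluster-wide totals; the function name and argument order are hypothetical. GlobalPartitionCount is left out because its threshold is a trend rather than a fixed value:

```shell
# Hypothetical alert check for the thresholds in the table.
# Args (cluster-wide totals):
#   $1 sum of ActiveControllerCount     (kafka.controller:type=KafkaController,name=ActiveControllerCount)
#   $2 OfflinePartitionsCount           (kafka.controller:type=KafkaController,name=OfflinePartitionsCount)
#   $3 sum of UnderReplicatedPartitions (kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions)
# Prints one line per alert; returns 1 if any alert fired.
check_health() {
  local active=$1 offline=$2 urp=$3 status=0
  if (( active != 1 )); then echo "ALERT ActiveControllerCount=$active (expected 1)"; status=1; fi
  if (( offline > 0 )); then echo "ALERT OfflinePartitionsCount=$offline"; status=1; fi
  if (( urp > 0 )); then echo "ALERT UnderReplicatedPartitions=$urp"; status=1; fi
  return "$status"
}
```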

Health Check Commands

# Check the active controller (leader of the KRaft controller quorum)
kafka-metadata-quorum.sh --bootstrap-server kafka:9092 \
  describe --status | grep LeaderId

# Check offline partitions
kafka-topics.sh --bootstrap-server kafka:9092 \
  --describe --unavailable-partitions

# Check under-replicated
kafka-topics.sh --bootstrap-server kafka:9092 \
  --describe --under-replicated-partitions
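Both partition checks above print only problem partitions, so a script can treat any output as a failure. A minimal bash sketch for cron jobs or monitoring probes; the helper name `fail_if_any` and its message format are hypothetical:

```shell
# Hypothetical helper: pass stdin through and return 1 if it contained any
# non-blank line. Arg: a label used in the failure message (stderr).
fail_if_any() {
  local label=$1 output
  output=$(cat)
  if [ -n "$(printf '%s' "$output" | tr -d '[:space:]')" ]; then
    printf '%s\n' "$output"
    echo "CHECK FAILED: $label" >&2
    return 1
  fi
  return 0
}

# Usage:
#   kafka-topics.sh --bootstrap-server kafka:9092 --describe --unavailable-partitions \
#     | fail_if_any "offline partitions"
#   kafka-topics.sh --bootstrap-server kafka:9092 --describe --under-replicated-partitions \
#     | fail_if_any "under-replicated partitions"
```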