Kafka Cluster Management
Internal mechanisms for cluster coordination, metadata management, and administrative operations.
Controller Architecture

Controller Responsibilities
| Responsibility | Description |
|---|---|
| Broker registration | Track broker membership and liveness |
| Leader election | Elect new partition leaders when a leader fails |
| Partition assignment | Assign partition replicas to brokers |
| Topic management | Create, delete, and modify topics |
| Configuration management | Store and distribute configurations |
| Metadata propagation | Distribute cluster metadata to brokers |
| Client quota management | Store producer/consumer quota settings (brokers enforce them) |
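In KRaft mode, all cluster metadata is stored as an ordered log of records in the single-partition __cluster_metadata topic, which the controller quorum replicates using Raft. The main record types are: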

| Record Type | Description |
|---|---|
| RegisterBrokerRecord | Broker joins the cluster |
| UnregisterBrokerRecord | Broker leaves the cluster |
| TopicRecord | Topic created |
| PartitionRecord | Partition assignment |
| PartitionChangeRecord | Leader/ISR change |
| ConfigRecord | Configuration change |
| ClientQuotaRecord | Quota configuration |
| ProducerIdsRecord | Producer ID allocation |
```bash
# Dump the decoded records in the metadata log
kafka-dump-log.sh --cluster-metadata-decoder \
  --files /var/kafka-logs/__cluster_metadata-0/00000000000000000000.log
# Describe the controller quorum (current leader, epoch, voters)
kafka-metadata-quorum.sh --bootstrap-server kafka:9092 describe --status
# Browse brokers, topics, and configs interactively (ls, cd, cat at the >> prompt)
kafka-metadata-shell.sh --snapshot /var/kafka-logs/__cluster_metadata-0/00000000000000000000.log
# Show topic details
kafka-topics.sh --bootstrap-server kafka:9092 --describe --topic my-topic
```
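Because every change is appended as a record, the current metadata state is the result of replaying the log in order. The sketch below illustrates that idea for broker membership only. It assumes a made-up text format (one "RecordType brokerId" pair per line) and a hypothetical replay_brokers function; real records are binary and carry many more fields.

```bash
# replay_brokers (hypothetical helper): read "<RecordType> <brokerId>" lines
# (a made-up text format) from stdin and print the broker IDs still
# registered after replaying every record in order.
replay_brokers() {
  local brokers="" remaining type id b
  while read -r type id; do
    case "$type" in
      RegisterBrokerRecord)
        brokers="$brokers $id" ;;
      UnregisterBrokerRecord)
        # Rebuild the set without the departing broker
        remaining=""
        for b in $brokers; do
          [ "$b" = "$id" ] || remaining="$remaining $b"
        done
        brokers=$remaining ;;
      *) ;;  # other record types do not change broker membership
    esac
  done
  # One ID per line, ascending, with re-registrations collapsed
  for b in $brokers; do echo "$b"; done | sort -nu
}

printf '%s\n' \
  "RegisterBrokerRecord 1" \
  "RegisterBrokerRecord 2" \
  "TopicRecord my-topic" \
  "RegisterBrokerRecord 3" \
  "UnregisterBrokerRecord 2" | replay_brokers
```

Replaying the five sample records leaves brokers 1 and 3 registered.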
Broker Registration
Registration Flow

Broker States
| State | Description |
|---|---|
| FENCED | Broker registered but not yet active |
| UNFENCED | Broker active and can accept requests |
| CONTROLLED_SHUTDOWN | Broker shutting down gracefully |
| SHUTDOWN | Broker removed from cluster |
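The states above form a small state machine. The following sketch encodes the transitions described in this section. The event names (caught_up, session_expired, shutdown_requested, shutdown_complete) and the next_state function are hypothetical labels rather than Kafka protocol messages, and real brokers track more internal states.

```bash
# next_state (hypothetical helper): print the next broker state for a
# (state, event) pair, or fail on a transition this sketch does not allow.
next_state() {
  case "$1:$2" in
    FENCED:caught_up)                      echo UNFENCED ;;
    UNFENCED:session_expired)              echo FENCED ;;
    UNFENCED:shutdown_requested)           echo CONTROLLED_SHUTDOWN ;;
    CONTROLLED_SHUTDOWN:shutdown_complete) echo SHUTDOWN ;;
    *) echo "invalid transition: $1 on $2" >&2; return 1 ;;
  esac
}

# Walk a broker through its normal lifecycle
state=FENCED
for event in caught_up shutdown_requested shutdown_complete; do
  state=$(next_state "$state" "$event") || break
  echo "$event -> $state"
done
```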
Heartbeat Configuration
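```properties
# Interval between broker heartbeats to the active controller (default: 2000)
broker.heartbeat.interval.ms=2000
# Session timeout: the controller fences a broker if no heartbeat arrives
# within this window (default: 9000)
broker.session.timeout.ms=18000
```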
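With these values one session spans nine heartbeat intervals, so a broker is fenced only after several consecutive heartbeats go missing. A minimal sketch of the controller-side check, assuming millisecond timestamps and a hypothetical should_fence helper; treating a gap exactly equal to the timeout as still alive is also an assumption.

```bash
HEARTBEAT_INTERVAL_MS=2000
SESSION_TIMEOUT_MS=18000

# should_fence (hypothetical helper): <last-heartbeat-ms> <now-ms>
# Succeeds if the gap since the last heartbeat exceeds the session timeout.
should_fence() {
  local last_heartbeat_ms=$1 now_ms=$2
  [ $((now_ms - last_heartbeat_ms)) -gt "$SESSION_TIMEOUT_MS" ]
}

# Heartbeat intervals that fit in one session timeout
echo "heartbeat intervals per session: $((SESSION_TIMEOUT_MS / HEARTBEAT_INTERVAL_MS))"

# 19000 ms without a heartbeat exceeds the 18000 ms session
if should_fence 1000 20000; then echo "broker would be fenced"; fi
```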
Leader Election
Election Triggers
| Trigger | Description |
|---|---|
| Broker failure | Leader broker becomes unavailable |
| Controlled shutdown | Broker initiates graceful shutdown |
| Manual election | Administrator triggers election |
| Preferred leader | Automatic rebalancing to preferred replica |
Election Process
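A simplified sketch of how a new leader can be chosen for one partition: walk the replicas in assignment order and take the first one that is both live and in the ISR. Only an unclean election may fall back to a live replica outside the ISR, which can lose acknowledged records. The elect_leader and contains helpers and their space-separated list arguments are hypothetical.

```bash
# contains (hypothetical helper): succeeds if word $1 is in the
# space-separated list $2
contains() {
  case " $2 " in
    *" $1 "*) return 0 ;;
    *) return 1 ;;
  esac
}

# elect_leader (hypothetical helper): <replicas> <isr> <live-brokers> [unclean]
# Lists are space-separated broker IDs; replicas are in assignment order.
# Prints the chosen leader, or -1 if no replica is eligible.
elect_leader() {
  local replicas=$1 isr=$2 live=$3 mode=${4:-clean} r
  # Clean election: first live replica that is also in the ISR
  for r in $replicas; do
    if contains "$r" "$live" && contains "$r" "$isr"; then
      echo "$r"
      return 0
    fi
  done
  # Unclean election: fall back to any live replica, even outside the ISR
  if [ "$mode" = unclean ]; then
    for r in $replicas; do
      if contains "$r" "$live"; then
        echo "$r"
        return 0
      fi
    done
  fi
  echo "-1"
  return 1
}

# Broker 1 (the old leader) is down; broker 2 is the next in-sync replica
elect_leader "1 2 3" "1 2 3" "2 3"
```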

Preferred Leader Election
```bash
# Trigger preferred leader election for all partitions
kafka-leader-election.sh --bootstrap-server kafka:9092 \
  --election-type preferred \
  --all-topic-partitions
# For a single partition (--topic and --partition must be given together)
kafka-leader-election.sh --bootstrap-server kafka:9092 \
  --election-type preferred \
  --topic my-topic --partition 0
# Unclean election: may elect an out-of-sync replica (data loss risk)
kafka-leader-election.sh --bootstrap-server kafka:9092 \
  --election-type unclean \
  --topic my-topic --partition 0
```
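The preferred leader is the first broker in a partition's replica list, so a preferred election only changes partitions whose current leader is some other broker. The sketch below lists those partitions from kafka-topics.sh --describe output. It assumes the tab-separated "Partition:", "Leader:" and "Replicas:" fields that the tool prints, and non_preferred_leaders is a hypothetical helper.

```bash
# non_preferred_leaders (hypothetical helper): read `kafka-topics.sh --describe`
# output on stdin and print partitions whose leader is not the first replica.
non_preferred_leaders() {
  awk -F'\t' '
    /Partition: / {
      topic = ""; part = ""; leader = ""; replicas = ""
      # Fields look like "Leader: 3"; split each one into name and value
      for (i = 1; i <= NF; i++) {
        split($i, kv, ": ")
        if (kv[1] == "Topic")     topic = kv[2]
        if (kv[1] == "Partition") part = kv[2]
        if (kv[1] == "Leader")    leader = kv[2]
        if (kv[1] == "Replicas")  replicas = kv[2]
      }
      split(replicas, r, ",")
      if (leader != r[1])
        printf "%s %s leader=%s preferred=%s\n", topic, part, leader, r[1]
    }'
}

# Usage against a live cluster (not run here):
#   kafka-topics.sh --bootstrap-server kafka:9092 --describe | non_preferred_leaders
printf 'Topic: my-topic\tTopicId: x\tPartitionCount: 2\n\tTopic: my-topic\tPartition: 0\tLeader: 1\tReplicas: 1,2,3\tIsr: 1,2,3\n\tTopic: my-topic\tPartition: 1\tLeader: 3\tReplicas: 2,3,1\tIsr: 3,1\n' |
  non_preferred_leaders
```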
Partition Assignment
Initial Assignment
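When a topic is created without an explicit assignment, the controller places replicas automatically, considering:
- Rack awareness (spread each partition's replicas across racks when broker.rack is set)
- Even spread (distribute the new topic's partitions and leaders across brokers)
- Existing assignments (creating a topic never moves partitions of other topics)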
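```bash
# Create a topic with an explicit replica assignment
# (cannot be combined with --partitions or --replication-factor)
kafka-topics.sh --bootstrap-server kafka:9092 \
  --create \
  --topic my-topic \
  --replica-assignment 1:2:3,2:3:1,3:1:2
# Format: comma-separated partitions (partition 0 first); within each partition,
# colon-separated replica broker IDs, the first being the preferred leader.
# This example creates 3 partitions with replication factor 3.
```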
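To see how such a string can be built, here is a simplified round-robin placement: replica r of partition p goes to broker number (p + r) mod n in the given broker list. This is an illustration only; Kafka's own placement also randomizes the starting broker and accounts for racks. round_robin_assignment is a hypothetical helper.

```bash
# round_robin_assignment (hypothetical helper):
#   <partitions> <replication-factor> <broker-id>...
# Prints a --replica-assignment string: partitions separated by commas,
# replica broker IDs separated by colons.
round_robin_assignment() {
  local partitions=$1 rf=$2
  shift 2
  local n=$# out="" p=0 r part idx id
  if [ "$rf" -gt "$n" ]; then
    echo "replication factor $rf exceeds broker count $n" >&2
    return 1
  fi
  while [ "$p" -lt "$partitions" ]; do
    part=""
    r=0
    while [ "$r" -lt "$rf" ]; do
      # Each partition's replica list starts one broker later than the previous
      idx=$(( (p + r) % n + 1 ))   # positional parameters are 1-based
      eval "id=\${$idx}"
      part="${part:+$part:}$id"
      r=$((r + 1))
    done
    out="${out:+$out,}$part"
    p=$((p + 1))
  done
  echo "$out"
}

round_robin_assignment 3 3 1 2 3
```

For three brokers, three partitions, and replication factor 3, this prints 1:2:3,2:3:1,3:1:2, the same assignment as the kafka-topics.sh example.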
Reassignment
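```bash
# 1. Generate a reassignment plan for the topics listed in topics.json
cat > topics.json << 'EOF'
{
  "topics": [{"topic": "my-topic"}],
  "version": 1
}
EOF
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
  --topics-to-move-json-file topics.json \
  --broker-list "1,2,3,4" \
  --generate
# 2. Save the "Proposed partition reassignment configuration" from the output as
#    reassignment.json (keep the current assignment it prints for rollback), then
#    execute it with a replication throttle in bytes/second (100000000 ≈ 100 MB/s)
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
  --reassignment-json-file reassignment.json \
  --throttle 100000000 \
  --execute
# 3. Verify completion; once the reassignment is done, this also removes the throttle
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
  --reassignment-json-file reassignment.json \
  --verify
```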
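Instead of using the --generate proposal, reassignment.json can also be written by hand. Its format is {"version":1,"partitions":[{"topic":...,"partition":...,"replicas":[...]}]}, where the first replica listed becomes the preferred leader. A sketch of a hypothetical helper that builds the file from partition=brokers arguments; no JSON escaping is needed because Kafka topic names are limited to letters, digits, '.', '_' and '-'.

```bash
# make_reassignment_json (hypothetical helper):
#   <topic> <partition>=<id,id,...>...
# e.g. make_reassignment_json my-topic 0=1,2,4 1=2,3,4 > reassignment.json
make_reassignment_json() {
  local topic=$1 entries="" spec partition replicas
  shift
  for spec in "$@"; do
    partition=${spec%%=*}   # text before the first "="
    replicas=${spec#*=}     # text after the first "="
    entries="${entries:+$entries,}{\"topic\":\"$topic\",\"partition\":$partition,\"replicas\":[$replicas]}"
  done
  printf '{"version":1,"partitions":[%s]}\n' "$entries"
}

make_reassignment_json my-topic 0=1,2,4 1=2,3,4
```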
Configuration Management
Dynamic Configuration
Topic configurations, client quotas, and broker configurations marked as dynamically updatable can be changed at runtime without restarting brokers.
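```bash
# Per-broker override (broker 1)
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type brokers \
  --entity-name 1 \
  --alter \
  --add-config log.cleaner.threads=4
# Cluster-wide dynamic default for all brokers
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type brokers \
  --entity-default \
  --alter \
  --add-config log.cleaner.threads=4
# Topic configuration (retention.ms=86400000 keeps data for 1 day)
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type topics \
  --entity-name my-topic \
  --alter \
  --add-config retention.ms=86400000
# Client quota for a user (producer_byte_rate in bytes/second; 10485760 = 10 MiB/s)
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type users \
  --entity-name producer-user \
  --alter \
  --add-config producer_byte_rate=10485760
```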
Configuration Precedence
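Levels are listed from highest to lowest priority; the highest level that sets a value wins.

| Level | Description |
|---|---|
| Per-topic | Highest priority, topic-specific |
| Per-broker | Dynamic override for one broker |
| Cluster-wide dynamic | Dynamic default for all brokers |
| Static (server.properties) | File-based configuration, read at startup |
| Default | Built-in defaults (lowest priority) |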
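The lookup order can be sketched as a walk from the highest level down that returns the first level holding a value. resolve_config is hypothetical and treats the five levels as holding one logical setting; for a topic-level config such as retention.ms, the broker levels hold the corresponding broker setting (log.retention.ms).

```bash
# resolve_config (hypothetical helper):
#   <per-topic> <per-broker> <cluster-default> <static> <built-in>
# Pass "" for a level where the setting is not configured. Prints the winning
# value and the level it came from; fails if no level sets it.
resolve_config() {
  local level value
  [ "$#" -eq 5 ] || return 2
  for level in per-topic per-broker cluster-wide-dynamic static default; do
    value=$1
    shift
    if [ -n "$value" ]; then
      echo "$value ($level)"
      return 0
    fi
  done
  return 1
}

# Illustrative values: no topic or per-broker override, but a cluster-wide
# dynamic default, which beats the value in server.properties
resolve_config "" "" 172800000 259200000 604800000
```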
Controlled Shutdown
Graceful Shutdown Process

Shutdown Configuration
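```properties
# Enable controlled shutdown (default: true)
controlled.shutdown.enable=true
# Maximum retries when a controlled shutdown attempt fails (default: 3)
controlled.shutdown.max.retries=3
# Backoff between retries (default: 5000)
controlled.shutdown.retry.backoff.ms=5000
```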
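The interplay of max.retries and retry.backoff.ms can be sketched as a bounded retry loop. This assumes max.retries counts total attempts and that the broker then shuts down without a controlled handoff; controlled_shutdown and fake_attempt are hypothetical stand-ins for the request the broker sends to the controller.

```bash
# controlled_shutdown (hypothetical helper):
#   <max-attempts> <backoff-ms> <attempt-command...>
# Runs the attempt command until it succeeds or the attempts run out,
# sleeping between failed attempts (whole seconds only, for portability).
controlled_shutdown() {
  local max_attempts=$1 backoff_ms=$2 attempt=1
  shift 2
  while [ "$attempt" -le "$max_attempts" ]; do
    if "$@"; then
      echo "controlled shutdown succeeded on attempt $attempt"
      return 0
    fi
    echo "attempt $attempt failed" >&2
    if [ "$attempt" -lt "$max_attempts" ]; then
      sleep "$((backoff_ms / 1000))"
    fi
    attempt=$((attempt + 1))
  done
  echo "attempts exhausted; shutting down without a controlled handoff" >&2
  return 1
}

# Hypothetical stand-in for the request to the controller: fails once, then succeeds
calls=0
fake_attempt() {
  calls=$((calls + 1))
  [ "$calls" -ge 2 ]
}

# Backoff of 0 ms so the demo does not wait
controlled_shutdown 3 0 fake_attempt
```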
Cluster Health Monitoring
Key Metrics
| Metric | Description | Alert Threshold |
|---|---|---|
| ActiveControllerCount | Whether this node is the active controller (0 or 1) | Sum across the cluster ≠ 1 |
| OfflinePartitionsCount | Partitions with no available leader | > 0 |
| UnderReplicatedPartitions | Partitions whose ISR is smaller than the replica set | > 0 |
| GlobalPartitionCount | Total partitions in the cluster | Growing unexpectedly |
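A minimal alerting sketch that applies the thresholds above. It assumes the metric values have already been collected (for example over JMX); check_cluster_health is hypothetical, takes ActiveControllerCount summed across nodes, and leaves out GlobalPartitionCount because that alert depends on a trend rather than a single value.

```bash
# check_cluster_health (hypothetical helper):
#   <ActiveControllerCount summed> <OfflinePartitionsCount> <UnderReplicatedPartitions>
# Prints one line per breached threshold; fails if any alert fired.
check_cluster_health() {
  local active=$1 offline=$2 urp=$3 alerts=0
  if [ "$active" -ne 1 ]; then
    echo "ALERT: ActiveControllerCount=$active (expected exactly 1)"
    alerts=$((alerts + 1))
  fi
  if [ "$offline" -gt 0 ]; then
    echo "ALERT: OfflinePartitionsCount=$offline"
    alerts=$((alerts + 1))
  fi
  if [ "$urp" -gt 0 ]; then
    echo "ALERT: UnderReplicatedPartitions=$urp"
    alerts=$((alerts + 1))
  fi
  [ "$alerts" -eq 0 ]
}

check_cluster_health 1 0 2 || echo "cluster needs attention"
```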
Health Check Commands
```bash
# Check the active controller (LeaderId in the output)
kafka-metadata-quorum.sh --bootstrap-server kafka:9092 describe --status
# Check offline partitions (no available leader)
kafka-topics.sh --bootstrap-server kafka:9092 \
  --describe --unavailable-partitions
# Check under-replicated partitions
kafka-topics.sh --bootstrap-server kafka:9092 \
  --describe --under-replicated-partitions
```
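For scripted checks (cron jobs, deployment gates), the describe output can be turned into a count and an exit status. This sketch assumes each affected partition appears on its own line containing "Partition: ", as kafka-topics.sh prints it; count_problem_partitions is a hypothetical helper.

```bash
# count_problem_partitions (hypothetical helper): count partition lines in
# kafka-topics.sh describe output read from stdin; fail if there are any.
count_problem_partitions() {
  local n
  # grep -c prints 0 (and exits 1) when nothing matches
  n=$(grep -c 'Partition: ' || true)
  echo "$n"
  [ "$n" -eq 0 ]
}

# Usage against a live cluster (not run here):
#   kafka-topics.sh --bootstrap-server kafka:9092 --describe \
#     --under-replicated-partitions | count_problem_partitions
printf '\tTopic: my-topic\tPartition: 0\tLeader: 1\tReplicas: 1,2,3\tIsr: 1,2\n' |
  count_problem_partitions || echo "under-replicated partitions found"
```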