Kafka Cluster Management
Operational procedures for managing Apache Kafka clusters.
Cluster Operations Overview
Adding Brokers
Pre-Addition Checklist
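- [ ] New broker runs the same Kafka version as the existing brokers
- [ ] Network connectivity to all existing brokers and to the controller quorum
- [ ] Sufficient disk space for the partitions it will receive
- [ ] Configuration prepared (unique node.id, listeners, controller quorum settings)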
Addition Process
- Configure the new broker
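# server.properties
# Comments must be on their own line: in .properties files a trailing
# "# ..." after a value becomes part of that value
# KRaft mode: this node runs as a broker only
process.roles=broker
# Unique ID (KRaft uses node.id; broker.id is the ZooKeeper-mode setting)
node.id=4
# Controller connection
controller.quorum.voters=1@ctrl1:9093,2@ctrl2:9093,3@ctrl3:9093
controller.listener.names=CONTROLLER
# Listeners
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://broker4:9092
# CONTROLLER must map to the protocol the controllers listen with (PLAINTEXT here)
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
# Log directories
log.dirs=/var/kafka-logs
# Rack (if applicable)
broker.rack=rack-2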
- Start the broker
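# Format storage (KRaft mode, first time only). <cluster-id> must be the
# existing cluster's ID, e.g. cluster.id in meta.properties on a current node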
kafka-storage.sh format -t <cluster-id> -c /etc/kafka/server.properties
# Start broker
kafka-server-start.sh /etc/kafka/server.properties
- Verify registration
# Check broker is registered
kafka-broker-api-versions.sh --bootstrap-server broker4:9092
# Check cluster membership (brokers are listed as Observer in the KRaft quorum)
kafka-metadata-quorum.sh --bootstrap-server kafka:9092 describe --replication
- Reassign partitions to new broker
# Generate reassignment plan including new broker
cat > topics.json << 'EOF'
{
"topics": [{"topic": "topic1"}, {"topic": "topic2"}],
"version": 1
}
EOF
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
--topics-to-move-json-file topics.json \
--broker-list "1,2,3,4" \
--generate
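# --generate only prints plans: save the "Proposed partition reassignment
# configuration" JSON as reassignment.json and keep the "Current partition
# replica assignment" JSON as a rollback plan
# Execute reassignment (--throttle is in bytes/sec; 100000000 is about 100 MB/s)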
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
--reassignment-json-file reassignment.json \
--throttle 100000000 \
--execute
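The --generate step prints two JSON documents (the current assignment and the proposed one) but does not write reassignment.json itself. A minimal sketch for capturing both, assuming the output layout of recent Kafka releases, where each plan is a single line of JSON on the first non-blank line after its header; extract_proposed_plan and extract_current_assignment are hypothetical helper names, not part of Kafka.

```shell
#!/usr/bin/env bash
# Hypothetical helpers (not part of Kafka). Both read the output of
# kafka-reassign-partitions.sh --generate on stdin and print the first
# non-blank line after the matching header (assumed to be one line of JSON).
extract_proposed_plan() {
  awk 'found && NF { print; exit } /^Proposed partition reassignment configuration/ { found = 1 }'
}

# The current assignment is worth keeping as a rollback plan.
extract_current_assignment() {
  awk 'found && NF { print; exit } /^Current partition replica assignment/ { found = 1 }'
}

# Usage:
#   kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
#     --topics-to-move-json-file topics.json --broker-list "1,2,3,4" \
#     --generate > generate-output.txt
#   extract_current_assignment < generate-output.txt > rollback.json
#   extract_proposed_plan < generate-output.txt > reassignment.json
```

Keeping rollback.json means the original placement can be restored by running --execute with that file if the new layout causes trouble.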
Removing Brokers
Pre-Removal Checklist
- [ ] All partitions on the broker have been reassigned to the remaining brokers
- [ ] Reassignment verified as complete (--verify)
- [ ] No under-replicated partitions
Decommission Process
- Move all partitions off the broker
# Generate plan without the broker to remove (broker 4).
# topics.json must list every topic with a replica on broker 4
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
--topics-to-move-json-file topics.json \
--broker-list "1,2,3" \
--generate
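# Execute (reassignment.json holds the "Proposed partition reassignment
# configuration" JSON from the --generate output)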
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
--reassignment-json-file reassignment.json \
--throttle 100000000 \
--execute
# Verify completion (also removes the throttle once all partitions are done)
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
--reassignment-json-file reassignment.json \
--verify
- Verify no partitions remain
# Check broker has no partitions: every "partitions" list in the output should be empty
kafka-log-dirs.sh --bootstrap-server kafka:9092 \
--describe --broker-list 4
- Graceful shutdown
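# Stop broker gracefully (controlled shutdown); run on broker 4's host.
# kafka-server-stop.sh sends SIGTERM to the Kafka server processes it finds there
kafka-server-stop.sh
# Or signal only this broker's process
kill -TERM <kafka-pid>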
- Verify removal
# Check broker 4 is no longer listed among live brokers (no output expected)
kafka-broker-api-versions.sh --bootstrap-server kafka:9092 | grep -F "(id: 4 "
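For decommissioning, topics.json has to name every topic with at least one replica on the broker being removed, internal topics such as __consumer_offsets included; a topic left out keeps its replicas on that broker. A sketch that derives the file from kafka-topics.sh --describe output, assuming the tab-separated "Topic: ... Partition: ... Replicas: ..." partition lines printed by recent releases; topics_on_broker_json is a hypothetical name.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not part of Kafka): read `kafka-topics.sh --describe`
# output on stdin and print a topics.json naming every topic that has a
# replica on broker $1. Assumes partition lines such as
#   Topic: orders  Partition: 0  Leader: 1  Replicas: 1,2,4  Isr: 1,2,4
topics_on_broker_json() {
  local broker_id="$1"
  awk -v id="$broker_id" '
    # Partition lines carry both "Partition:" and "Replicas:" fields;
    # the per-topic summary line (PartitionCount, ReplicationFactor) does not.
    /Partition:/ && /Replicas:/ {
      topic = ""; replicas = ""
      for (i = 1; i < NF; i++) {
        if ($i == "Topic:") topic = $(i + 1)
        if ($i == "Replicas:") replicas = $(i + 1)
      }
      n = split(replicas, r, ",")
      for (j = 1; j <= n; j++) {
        if (r[j] == id) { hit[topic] = 1; break }
      }
    }
    END { for (t in hit) print t }' \
  | sort \
  | awk 'BEGIN { printf "{\"topics\": [" }
         { printf "%s{\"topic\": \"%s\"}", (NR > 1 ? ", " : ""), $0 }
         END { print "], \"version\": 1}" }'
}

# Usage:
#   kafka-topics.sh --bootstrap-server kafka:9092 --describe \
#     | topics_on_broker_json 4 > topics.json
```

Because --generate reshuffles every partition of the listed topics across the remaining brokers, limiting topics.json to the affected topics also limits how much data moves.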
Rolling Upgrades
Upgrade Process
Step-by-Step
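- Prepare for upgrade (ZooKeeper mode): pin the inter-broker protocol to the version currently running so upgraded and not-yet-upgraded brokers stay compatible. KRaft clusters skip this step.
# Before upgrade, set in server.properties
inter.broker.protocol.version=3.6
# log.message.format.version is deprecated since Kafka 3.0 and ignored when
# inter.broker.protocol.version is 3.0 or higher, so it is not set here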
- Upgrade each broker
#!/bin/bash
# upgrade-broker.sh <broker-host> [bootstrap-servers]
set -euo pipefail
BROKER=${1:?"usage: upgrade-broker.sh <broker-host> [bootstrap-servers]"}
# Query the cluster through the remaining brokers while this one restarts
BOOTSTRAP=${2:-kafka:9092}
echo "Upgrading $BROKER..."
# Stop broker (SIGTERM triggers a controlled shutdown)
ssh "$BROKER" "sudo systemctl stop kafka"
# Install new version (-y: there is no terminal over ssh to answer prompts)
ssh "$BROKER" "sudo yum install -y kafka-3.7.0" # Or appropriate command
# Start broker
ssh "$BROKER" "sudo systemctl start kafka"
# Wait until the broker answers API requests again
until kafka-broker-api-versions.sh --bootstrap-server "$BROKER:9092" > /dev/null 2>&1; do
  sleep 10
done
# Wait for the ISR to recover. A failed query counts as "not recovered",
# never as zero under-replicated partitions.
echo "Waiting for ISR to recover..."
until URP=$(kafka-topics.sh --bootstrap-server "$BOOTSTRAP" \
    --describe --under-replicated-partitions) && [ -z "$URP" ]; do
  sleep 10
done
echo "ISR recovered"
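- After all brokers are upgraded and stable (ZooKeeper mode), raise the protocol version in every broker's server.properties
# Update protocol version
inter.broker.protocol.version=3.7
- Rolling restart to apply (restart only; the new version is already installed)
for broker in broker1 broker2 broker3; do
  ssh "$broker" "sudo systemctl restart kafka"
  # Wait until the URP query succeeds and returns nothing before the next broker
  until URP=$(kafka-topics.sh --bootstrap-server kafka:9092 \
      --describe --under-replicated-partitions) && [ -z "$URP" ]; do
    sleep 10
  done
done
- KRaft mode: no protocol edit or second restart is needed; finalize the upgrade by raising the metadata version
kafka-features.sh --bootstrap-server kafka:9092 upgrade --metadata 3.7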
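In ZooKeeper mode, the inter.broker.protocol.version change has to be made in server.properties on every broker before the restart. A minimal sketch of an idempotent edit that replaces an existing uncommented key=value (or key:value) line, or appends one if the key is missing; set_prop is a hypothetical helper, and it assumes values contain no backslash escapes.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not part of Kafka): set KEY=VALUE in a .properties file.
# Replaces every uncommented "KEY=..." or "KEY:..." line and appends KEY=VALUE
# if none exists. Comment lines and other keys are left untouched.
set_prop() {
  local file="$1" key="$2" value="$3" tmp status
  tmp="$(mktemp)" || return 1
  awk -v k="$key" -v v="$value" '
    index($0, k) == 1 && substr($0, length(k) + 1) ~ /^[ \t]*[=:]/ {
      print k "=" v; done = 1; next
    }
    { print }
    END { if (!done) print k "=" v }' "$file" > "$tmp" && cat "$tmp" > "$file"
  # Writing back with cat (not mv) keeps the original file's owner and mode.
  status=$?
  rm -f "$tmp"
  return "$status"
}

# Usage, per broker (assumes bash on the remote side and write access):
#   ssh broker1 "$(declare -f set_prop); set_prop /etc/kafka/server.properties inter.broker.protocol.version 3.7"
```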
Partition Reassignment
Generate Reassignment Plan
# Create topics file
cat > topics.json << 'EOF'
{
"topics": [
{"topic": "orders"},
{"topic": "events"}
],
"version": 1
}
EOF
# Generate plan
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
--topics-to-move-json-file topics.json \
--broker-list "1,2,3,4" \
--generate
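Custom Reassignment
Save a hand-written plan as reassignment.json. The first broker in each replicas list becomes the partition's preferred leader.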
{
"version": 1,
"partitions": [
{"topic": "orders", "partition": 0, "replicas": [1,2,3]},
{"topic": "orders", "partition": 1, "replicas": [2,3,4]},
{"topic": "orders", "partition": 2, "replicas": [3,4,1]}
]
}
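The plan above places each partition's replicas on consecutive brokers, starting one broker later for each partition, which spreads preferred leaders evenly. A sketch that generates that pattern for any partition count; round_robin_plan is a hypothetical helper, and the layout is rack-unaware, so it may differ from what --generate proposes.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not part of Kafka): print a reassignment plan that puts
# partition p on brokers[p], brokers[p+1], ... (wrapping around). Rack-unaware.
# Args: topic partition-count replication-factor broker-id...
round_robin_plan() {
  local topic="$1" partitions="$2" rf="$3"
  shift 3
  local brokers=("$@") n=$# p r replicas sep=""
  if [ "$rf" -gt "$n" ]; then
    echo "replication factor $rf exceeds broker count $n" >&2
    return 1
  fi
  printf '{\n  "version": 1,\n  "partitions": [\n'
  for ((p = 0; p < partitions; p++)); do
    replicas=""
    for ((r = 0; r < rf; r++)); do
      # Comma-separate after the first replica
      replicas+="${replicas:+,}${brokers[(p + r) % n]}"
    done
    printf '%s    {"topic": "%s", "partition": %d, "replicas": [%s]}' \
      "$sep" "$topic" "$p" "$replicas"
    sep=$',\n'
  done
  printf '\n  ]\n}\n'
}

# Usage: reproduces the plan above
#   round_robin_plan orders 3 3 1 2 3 4 > reassignment.json
```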
Execute with Throttling
# Start reassignment with throttle (100 MB/s)
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
--reassignment-json-file reassignment.json \
--throttle 100000000 \
--execute
# Monitor progress
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
--reassignment-json-file reassignment.json \
--verify
# Adjust throttle while the reassignment runs: re-run --execute with
# --additional and the new rate
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
--reassignment-json-file reassignment.json \
--additional \
--throttle 200000000 \
--execute
# Remove throttle after completion: --verify clears the throttles once every partition in the plan has finished
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
--reassignment-json-file reassignment.json \
--verify
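Since --verify clears the throttle only after every partition has finished, it is usually polled. A sketch of such a poll, assuming the "Reassignment of partition ... is still in progress" wording printed by recent releases (check it against your version); reassignment_complete and wait_for_reassignment are hypothetical names, and the 30-second interval is arbitrary.

```shell
#!/usr/bin/env bash
# Hypothetical helpers (not part of Kafka).

# Reads --verify output on stdin. Succeeds only if at least one partition
# status line is present and none reports "still in progress"; empty or
# error output is treated as "not complete".
reassignment_complete() {
  local out
  out="$(cat)"
  if printf '%s\n' "$out" | grep -q 'still in progress'; then
    return 1
  fi
  printf '%s\n' "$out" | grep -q '^Reassignment of partition '
}

# Polls --verify every 30 seconds, giving up after $3 checks (default 120).
wait_for_reassignment() {
  local plan="$1" bootstrap="${2:-kafka:9092}" tries="${3:-120}" i
  for ((i = 0; i < tries; i++)); do
    if kafka-reassign-partitions.sh --bootstrap-server "$bootstrap" \
        --reassignment-json-file "$plan" --verify | reassignment_complete; then
      return 0
    fi
    sleep 30
  done
  echo "reassignment still running after $tries checks" >&2
  return 1
}

# Usage:
#   wait_for_reassignment reassignment.json kafka:9092
```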
Leader Election
Preferred Leader Election
# All partitions
kafka-leader-election.sh --bootstrap-server kafka:9092 \
--election-type preferred \
--all-topic-partitions
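# Specific topic: --topic requires --partition, so list the topic's
# partitions in a JSON file instead
cat > election.json << 'EOF'
{"partitions": [{"topic": "my-topic", "partition": 0}, {"topic": "my-topic", "partition": 1}]}
EOF
kafka-leader-election.sh --bootstrap-server kafka:9092 \
--election-type preferred \
--path-to-json-file election.json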
# Specific partition
kafka-leader-election.sh --bootstrap-server kafka:9092 \
--election-type preferred \
--topic my-topic \
--partition 0
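kafka-leader-election.sh accepts --path-to-json-file with a {"partitions": [{"topic": ..., "partition": ...}]} document, which is how to target every partition of one topic. A sketch that writes that document for partitions 0 to N-1, assuming the partition count is already known (for example from PartitionCount in kafka-topics.sh --describe); election_json is a hypothetical name.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not part of Kafka): print the --path-to-json-file
# document covering partitions 0..count-1 of one topic.
election_json() {
  local topic="$1" count="$2" p sep=""
  printf '{"partitions": ['
  for ((p = 0; p < count; p++)); do
    printf '%s{"topic": "%s", "partition": %d}' "$sep" "$topic" "$p"
    sep=", "
  done
  printf ']}\n'
}

# Usage:
#   election_json my-topic 12 > election.json
#   kafka-leader-election.sh --bootstrap-server kafka:9092 \
#     --election-type preferred --path-to-json-file election.json
```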
Unclean Leader Election
Data Loss Risk
Unclean leader election lets a replica that is not in the ISR become leader. Any records that replica had not yet copied are lost. Use it only when no in-sync replica can be brought back.
kafka-leader-election.sh --bootstrap-server kafka:9092 \
--election-type unclean \
--topic my-topic \
--partition 0
Increasing Partitions
# Increase partition count (cannot decrease)
kafka-topics.sh --bootstrap-server kafka:9092 \
--alter \
--topic my-topic \
--partitions 24
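Key-Based Ordering
Increasing partitions changes the key-to-partition mapping, because the default partitioner picks a partition from a hash of the key modulo the partition count. New records for an existing key may go to a different partition than its earlier records, so per-key ordering is not guaranteed across the change.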
Topic Management
Create Topic
kafka-topics.sh --bootstrap-server kafka:9092 \
--create \
--topic new-topic \
--partitions 12 \
--replication-factor 3 \
--config retention.ms=604800000 \
--config cleanup.policy=delete
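retention.ms is in milliseconds: 604800000 is 7 days, and the 86400000 used under Modify Configuration is 1 day. A tiny hypothetical helper makes such values easier to read and check.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not part of Kafka): convert whole days to the
# millisecond value retention.ms expects (1 day = 24 * 60 * 60 * 1000 ms).
days_to_ms() {
  case "$1" in
    ''|*[!0-9]*) echo "days_to_ms: expected a whole number of days" >&2; return 1 ;;
  esac
  echo $(( $1 * 24 * 60 * 60 * 1000 ))
}

# Usage:
#   kafka-topics.sh --bootstrap-server kafka:9092 --create --topic new-topic \
#     --partitions 12 --replication-factor 3 \
#     --config retention.ms="$(days_to_ms 7)"
```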
Delete Topic
# Delete topic (data is permanently removed; requires delete.topic.enable=true
# on the brokers, which is the default)
kafka-topics.sh --bootstrap-server kafka:9092 \
--delete \
--topic old-topic
Modify Configuration
# Add/update config
kafka-configs.sh --bootstrap-server kafka:9092 \
--entity-type topics \
--entity-name my-topic \
--alter \
--add-config retention.ms=86400000
# Remove config (revert to default)
kafka-configs.sh --bootstrap-server kafka:9092 \
--entity-type topics \
--entity-name my-topic \
--alter \
--delete-config retention.ms
Health Verification
Post-Operation Checks
#!/bin/bash
# verify-cluster-health.sh
BOOTSTRAP="kafka:9092"
echo "=== Cluster Health Check ==="
# Check broker connectivity
echo "Checking broker connectivity..."
if kafka-broker-api-versions.sh --bootstrap-server "$BOOTSTRAP" > /dev/null 2>&1; then
  echo "✓ Broker connectivity OK"
else
  echo "✗ Broker connectivity FAILED"
  exit 1
fi
# Check offline partitions (no available leader). A failed query is an
# error, not a count of zero.
if ! OFFLINE_OUT=$(kafka-topics.sh --bootstrap-server "$BOOTSTRAP" \
    --describe --unavailable-partitions 2>/dev/null); then
  echo "✗ Could not query offline partitions"
  exit 1
fi
OFFLINE=$(printf '%s\n' "$OFFLINE_OUT" | grep -c "Partition:")
echo "Offline partitions: $OFFLINE"
if [ "$OFFLINE" -gt 0 ]; then
  echo "✗ CRITICAL: Offline partitions detected"
  exit 2
fi
# Check under-replicated partitions
if ! URP_OUT=$(kafka-topics.sh --bootstrap-server "$BOOTSTRAP" \
    --describe --under-replicated-partitions 2>/dev/null); then
  echo "✗ Could not query under-replicated partitions"
  exit 1
fi
URP=$(printf '%s\n' "$URP_OUT" | grep -c "Partition:")
echo "Under-replicated partitions: $URP"
if [ "$URP" -gt 0 ]; then
  echo "⚠ WARNING: Under-replicated partitions"
fi
echo "=== Health Check Complete ==="
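The health script above checks offline and under-replicated partitions. kafka-topics.sh also has --under-min-isr-partitions, which lists partitions whose ISR is smaller than min.insync.replicas; producers using acks=all are rejected on those partitions, so a check could reasonably treat them as critical. A sketch, where count_under_min_isr and the KAFKA_TOPICS override variable are hypothetical (the override mainly exists so the function can be exercised without a cluster).

```shell
#!/usr/bin/env bash
# Hypothetical helper (not part of Kafka): print how many partitions have
# fewer in-sync replicas than their min.insync.replicas setting.
# KAFKA_TOPICS may name a specific kafka-topics.sh; defaults to a PATH lookup.
# Returns 2 if the query itself fails, so a broken query is not read as "0".
count_under_min_isr() {
  local bootstrap="$1" out
  out="$("${KAFKA_TOPICS:-kafka-topics.sh}" --bootstrap-server "$bootstrap" \
    --describe --under-min-isr-partitions 2>/dev/null)" || return 2
  # One output line per affected partition; grep -c prints 0 when none match.
  printf '%s\n' "$out" | grep -c 'Partition:' || true
}

# Usage inside verify-cluster-health.sh:
#   if ! UNDER_MIN_ISR=$(count_under_min_isr "$BOOTSTRAP"); then
#     echo "✗ Could not query under-min-ISR partitions"; exit 1
#   fi
#   echo "Under-min-ISR partitions: $UNDER_MIN_ISR"
#   if [ "$UNDER_MIN_ISR" -gt 0 ]; then
#     echo "✗ CRITICAL: acks=all producers are rejected on these partitions"
#     exit 2
#   fi
```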
Related Documentation
- Operations Overview - Operations guide
- Monitoring - Metrics and alerting
- Backup and Restore - DR procedures
- Maintenance - Routine maintenance