Kafka Cluster Management

Operational procedures for managing Apache Kafka clusters.


Cluster Operations Overview

[Diagram: cluster operations overview]


Adding Brokers

Pre-Addition Checklist

  • [ ] New broker has same Kafka version
  • [ ] Network connectivity to all existing brokers
  • [ ] Sufficient disk space
  • [ ] Proper configuration (broker.id, listeners, etc.)
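
The configuration item in the checklist can be partly automated. The following is a minimal sketch, assuming a Java-properties-style server.properties file; the helper name missing_keys and the choice of keys to check are illustrative and not part of Kafka.

```bash
# Print every required key that is absent from a properties file.
# missing_keys is a hypothetical helper, not a Kafka tool.
missing_keys() {
  local file=$1 key
  shift
  for key in "$@"; do
    # Split each line on "=", trim the key, and compare exactly.
    # Commented-out lines never match because their key starts with "#".
    awk -F= -v k="$key" '
      { gsub(/^[ \t]+|[ \t]+$/, "", $1) }
      $1 == k { found = 1 }
      END { exit !found }
    ' "$file" || echo "$key"
  done
}
```

For example, `missing_keys /etc/kafka/server.properties node.id listeners advertised.listeners log.dirs` prints nothing when all four keys are set.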

Addition Process

  1. Configure the new broker
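# server.properties
# (comments must be on their own line; .properties files do not support trailing comments)
# Unique ID (ZooKeeper mode)
broker.id=4
# Unique ID (KRaft mode; if broker.id is also set, both must match)
node.id=4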

# Controller connection
controller.quorum.voters=1@ctrl1:9093,2@ctrl2:9093,3@ctrl3:9093

# Listeners
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://broker4:9092

# Log directories
log.dirs=/var/kafka-logs

# Rack (if applicable)
broker.rack=rack-2
  2. Start the broker
# Format storage (KRaft mode, first time only)
kafka-storage.sh format -t <cluster-id> -c /etc/kafka/server.properties

# Start broker
kafka-server-start.sh /etc/kafka/server.properties
  3. Verify registration
# Check broker is registered
kafka-broker-api-versions.sh --bootstrap-server broker4:9092

# Check cluster membership (every live broker is listed with its id)
kafka-broker-api-versions.sh --bootstrap-server kafka:9092 | grep '(id: '
  4. Reassign partitions to new broker
# Generate reassignment plan including new broker
cat > topics.json << 'EOF'
{
  "topics": [{"topic": "topic1"}, {"topic": "topic2"}],
  "version": 1
}
EOF

kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
  --topics-to-move-json-file topics.json \
  --broker-list "1,2,3,4" \
  --generate

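# Save the JSON printed under "Proposed partition reassignment configuration" as reassignment.json

# Execute reassignment (throttled to 100 MB/s)
# After completion, run the same command with --verify instead of --execute to remove the throttle
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
  --reassignment-json-file reassignment.json \
  --throttle 100000000 \
  --execute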
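
The reassignment runs asynchronously, so a script usually polls --verify before moving on. Below is a minimal sketch of a completion check that reads the --verify output on stdin. The helper name reassignment_complete is hypothetical, and the matched phrases ("still in progress", "failed") are assumptions about the tool's output that may differ between Kafka versions.

```bash
# Succeed only if the --verify output reports no partition as in progress or failed.
# reassignment_complete is a hypothetical helper, not a Kafka tool.
reassignment_complete() {
  local output
  output=$(cat)
  # Empty input means the verify command printed nothing, so do not report success.
  [ -n "$output" ] || return 1
  ! printf '%s\n' "$output" | grep -Eq 'still in progress|failed'
}
```

Typical use would be to pipe the --verify command into it, for example `kafka-reassign-partitions.sh ... --verify | reassignment_complete && echo "done"`.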

Removing Brokers

Pre-Removal Checklist

  • [ ] All partitions have replicas on other brokers
  • [ ] Reassignment completed successfully
  • [ ] No under-replicated partitions

Decommission Process

  1. Move all partitions off the broker
# Generate plan without the broker to remove (broker 4)
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
  --topics-to-move-json-file topics.json \
  --broker-list "1,2,3" \
  --generate

# Save the proposed plan from the output as reassignment.json, then execute
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
  --reassignment-json-file reassignment.json \
  --throttle 100000000 \
  --execute

# Verify completion
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
  --reassignment-json-file reassignment.json \
  --verify
  2. Verify no partitions remain
# Check broker has no partitions
kafka-log-dirs.sh --bootstrap-server kafka:9092 \
  --describe --broker-list 4
  3. Graceful shutdown
# Stop broker gracefully (controlled shutdown)
kafka-server-stop.sh

# Or
kill -TERM <kafka-pid>
  4. Verify removal
# Check the broker no longer appears among the live brokers
kafka-broker-api-versions.sh --bootstrap-server kafka:9092 | grep '(id: '
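
The "verify no partitions remain" step can be turned into a numeric check. This is a sketch that counts partition entries in the JSON printed by kafka-log-dirs.sh. The helper name count_partitions is hypothetical, and the `"partition":"<topic>-<n>"` field layout is an assumption about the tool's JSON output.

```bash
# Count partition entries in kafka-log-dirs.sh --describe output read from stdin.
# count_partitions is a hypothetical helper; a result of 0 means the broker holds no replicas.
count_partitions() {
  # grep exits non-zero when nothing matches; "|| true" keeps that from counting as an error.
  { grep -o '"partition":"[^"]*"' || true; } | wc -l | tr -d ' '
}
```

For example, `kafka-log-dirs.sh --bootstrap-server kafka:9092 --describe --broker-list 4 | count_partitions` should print 0 before the broker is shut down.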

Rolling Upgrades

Upgrade Process

[Diagram: rolling upgrade process]

Step-by-Step

  1. Prepare for upgrade
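# Before upgrade, pin the current version in server.properties
inter.broker.protocol.version=3.6
# Deprecated since Kafka 3.0 and ignored when inter.broker.protocol.version is 3.0 or higher
log.message.format.version=3.6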
  2. Upgrade each broker
#!/bin/bash
# upgrade-broker.sh: upgrade one broker, then wait for its replicas to rejoin the ISR

BROKER="${1:?usage: upgrade-broker.sh <broker-host>}"

echo "Upgrading $BROKER..."

# Stop broker
ssh "$BROKER" "sudo systemctl stop kafka"

# Install new version (-y avoids an interactive prompt over ssh)
ssh "$BROKER" "sudo yum install -y kafka-3.7.0"  # Or appropriate command

# Start broker
ssh "$BROKER" "sudo systemctl start kafka"

# Wait until the broker answers API requests again
until kafka-broker-api-versions.sh --bootstrap-server "$BROKER:9092" > /dev/null 2>&1; do
  sleep 5
done

echo "Waiting for ISR to recover..."
while true; do
  # An empty result only counts if the command itself succeeded
  if URP=$(kafka-topics.sh --bootstrap-server "$BROKER:9092" \
      --describe --under-replicated-partitions 2>/dev/null) && [ -z "$URP" ]; then
    echo "ISR recovered"
    break
  fi
  sleep 10
done
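  3. After all brokers upgraded
# Update protocol version in server.properties
# (KRaft clusters finalize the upgrade through metadata.version with kafka-features.sh instead)
inter.broker.protocol.version=3.7
# Deprecated since Kafka 3.0 and ignored when inter.broker.protocol.version is 3.0 or higher
log.message.format.version=3.7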
  4. Rolling restart to apply
for broker in broker1 broker2 broker3; do
  ./upgrade-broker.sh "$broker"
done
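
The ISR recovery loop in upgrade-broker.sh waits indefinitely, which can stall a rolling upgrade if a broker never catches up. One way to bound the wait is a generic retry helper. This is a sketch only; wait_until and its argument order are hypothetical and not part of Kafka.

```bash
# Retry a command until it succeeds or the timeout (in seconds) elapses.
# Usage: wait_until <timeout-seconds> <interval-seconds> <command> [args...]
# Returns 0 as soon as the command succeeds, 1 if the deadline passes first.
wait_until() {
  local timeout=$1 interval=$2
  shift 2
  local deadline=$(( $(date +%s) + timeout ))
  until "$@"; do
    if [ "$(date +%s)" -ge "$deadline" ]; then
      return 1
    fi
    sleep "$interval"
  done
}
```

With the under-replicated-partition check wrapped in a function (say, a hypothetical isr_recovered), the script could call `wait_until 600 10 isr_recovered || exit 1` and stop the rollout instead of hanging.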

Partition Reassignment

Generate Reassignment Plan

# Create topics file
cat > topics.json << 'EOF'
{
  "topics": [
    {"topic": "orders"},
    {"topic": "events"}
  ],
  "version": 1
}
EOF

# Generate plan
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
  --topics-to-move-json-file topics.json \
  --broker-list "1,2,3,4" \
  --generate
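
The --generate command prints both the current and the proposed assignment to stdout; only the proposed one belongs in reassignment.json. The sketch below extracts it. The helper name extract_proposed_plan is hypothetical, and it assumes the output contains a heading line starting with "Proposed partition reassignment configuration" followed by the JSON on a single line.

```bash
# Print the line that follows the "Proposed partition reassignment configuration" heading.
# extract_proposed_plan is a hypothetical helper; it reads --generate output on stdin.
extract_proposed_plan() {
  awk '/^Proposed partition reassignment configuration/ { getline; print; exit }'
}
```

For example, append `| extract_proposed_plan > reassignment.json` to the --generate command. Keeping the "Current partition replica assignment" JSON in a separate file is also worthwhile, since it serves as the rollback plan.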

Custom Reassignment

{
  "version": 1,
  "partitions": [
    {"topic": "orders", "partition": 0, "replicas": [1,2,3]},
    {"topic": "orders", "partition": 1, "replicas": [2,3,4]},
    {"topic": "orders", "partition": 2, "replicas": [3,4,1]}
  ]
}
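
The custom plan above follows a round-robin pattern: partition p starts at broker index p and takes the next brokers in order, wrapping around. A sketch of a generator for that pattern follows; gen_reassignment is a hypothetical helper, and real plans may also need to account for racks and existing data placement.

```bash
# Emit a round-robin reassignment plan as single-line JSON.
# Usage: gen_reassignment <topic> <partition-count> <replication-factor> <broker-id>...
gen_reassignment() {
  local topic=$1 partitions=$2 rf=$3
  shift 3
  local brokers=("$@")
  local n=${#brokers[@]}
  local p r replicas sep=""
  # Each replica of a partition must live on a different broker.
  if [ "$rf" -gt "$n" ]; then
    echo "replication factor $rf exceeds broker count $n" >&2
    return 1
  fi
  printf '{"version":1,"partitions":['
  for (( p = 0; p < partitions; p++ )); do
    replicas=""
    for (( r = 0; r < rf; r++ )); do
      # Replica r of partition p goes to broker index (p + r) mod n.
      replicas+="${replicas:+,}${brokers[$(( (p + r) % n ))]}"
    done
    printf '%s{"topic":"%s","partition":%d,"replicas":[%s]}' "$sep" "$topic" "$p" "$replicas"
    sep=","
  done
  printf ']}\n'
}
```

Running `gen_reassignment orders 3 3 1 2 3 4` reproduces the three-partition plan shown above, with replicas [1,2,3], [2,3,4], and [3,4,1].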

Execute with Throttling

# Start reassignment with throttle (100 MB/s)
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
  --reassignment-json-file reassignment.json \
  --throttle 100000000 \
  --execute

# Monitor progress
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
  --reassignment-json-file reassignment.json \
  --verify

# Adjust throttle if needed (--additional allows re-running --execute while the reassignment is in progress)
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
  --reassignment-json-file reassignment.json \
  --additional \
  --throttle 200000000 \
  --execute

# Remove throttle after completion (--verify clears it once the reassignment has finished)
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
  --reassignment-json-file reassignment.json \
  --verify
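
The --throttle value is given in bytes per second, so 100000000 corresponds to the 100 MB/s noted above (decimal megabytes). A small conversion helper avoids miscounting zeros; mb_per_sec_to_throttle is a hypothetical name used for illustration.

```bash
# Convert a rate in MB/s (decimal megabytes) to the bytes/s value that --throttle expects.
mb_per_sec_to_throttle() {
  echo $(( $1 * 1000 * 1000 ))
}
```

For example, `--throttle "$(mb_per_sec_to_throttle 200)"` passes 200000000.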

Leader Election

Preferred Leader Election

# All partitions
kafka-leader-election.sh --bootstrap-server kafka:9092 \
  --election-type preferred \
  --all-topic-partitions

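# Specific topic (--topic requires --partition, so list the topic's partitions in a JSON file)
# election.json: {"partitions": [{"topic": "my-topic", "partition": 0}, {"topic": "my-topic", "partition": 1}]}
kafka-leader-election.sh --bootstrap-server kafka:9092 \
  --election-type preferred \
  --path-to-json-file election.json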

# Specific partition
kafka-leader-election.sh --bootstrap-server kafka:9092 \
  --election-type preferred \
  --topic my-topic \
  --partition 0
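
kafka-leader-election.sh also accepts a --path-to-json-file option that names the partitions to elect, which is convenient for a subset of partitions. The sketch below builds that file for every partition of one topic. The helper name election_json is hypothetical, and the partition count must be supplied by the caller.

```bash
# Emit the JSON for --path-to-json-file covering partitions 0..N-1 of a topic.
# Usage: election_json <topic> <partition-count>
election_json() {
  local topic=$1 partitions=$2 p sep=""
  printf '{"partitions":['
  for (( p = 0; p < partitions; p++ )); do
    printf '%s{"topic":"%s","partition":%d}' "$sep" "$topic" "$p"
    sep=","
  done
  printf ']}\n'
}
```

For example, `election_json my-topic 12 > election.json` followed by `kafka-leader-election.sh --bootstrap-server kafka:9092 --election-type preferred --path-to-json-file election.json`.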

Unclean Leader Election

Data Loss Risk

Unclean leader election allows an out-of-sync replica to become leader. Any messages that replica had not yet copied from the previous leader are lost.

kafka-leader-election.sh --bootstrap-server kafka:9092 \
  --election-type unclean \
  --topic my-topic \
  --partition 0

Increasing Partitions

# Increase partition count (cannot decrease)
kafka-topics.sh --bootstrap-server kafka:9092 \
  --alter \
  --topic my-topic \
  --partitions 24

Key-Based Ordering

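Increasing partitions changes the key-to-partition mapping, because the default partitioner hashes the key modulo the partition count. Existing keys may route to different partitions, so per-key ordering is not preserved across the change.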
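
The effect of a partition-count change on key routing can be illustrated with any hash-modulo scheme. The sketch below uses cksum (CRC) purely as a stand-in hash; Kafka's default partitioner uses murmur2, so the partition numbers printed here will not match a real cluster. partition_for is a hypothetical helper.

```bash
# Map a key to a partition as hash(key) mod partition-count.
# cksum is a stand-in hash for illustration only (Kafka uses murmur2).
partition_for() {
  local key=$1 partitions=$2 crc
  # cksum prints "<crc> <byte-count>"; keep only the CRC.
  crc=$(printf '%s' "$key" | cksum | cut -d' ' -f1)
  echo $(( crc % partitions ))
}
```

Comparing `partition_for user-42 12` with `partition_for user-42 24` for a handful of keys shows that some keys keep their partition while others move, which is why consumers relying on per-key ordering need a migration plan before the count is raised.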


Topic Management

Create Topic

kafka-topics.sh --bootstrap-server kafka:9092 \
  --create \
  --topic new-topic \
  --partitions 12 \
  --replication-factor 3 \
  --config retention.ms=604800000 \
  --config cleanup.policy=delete

Delete Topic

# Delete topic (data is permanently removed)
kafka-topics.sh --bootstrap-server kafka:9092 \
  --delete \
  --topic old-topic

Modify Configuration

# Add/update config
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type topics \
  --entity-name my-topic \
  --alter \
  --add-config retention.ms=86400000

# Remove config (revert to default)
kafka-configs.sh --bootstrap-server kafka:9092 \
  --entity-type topics \
  --entity-name my-topic \
  --alter \
  --delete-config retention.ms
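
retention.ms is expressed in milliseconds, which makes values such as 604800000 (7 days) and 86400000 (1 day) hard to read at a glance. A small conversion helper, sketched here under the hypothetical name days_to_ms, makes the intent explicit.

```bash
# Convert a number of days to milliseconds for retention.ms.
days_to_ms() {
  echo $(( $1 * 24 * 60 * 60 * 1000 ))
}
```

For example, `--add-config "retention.ms=$(days_to_ms 1)"` sets one day of retention.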

Health Verification

Post-Operation Checks

#!/bin/bash
# verify-cluster-health.sh

BOOTSTRAP="kafka:9092"

echo "=== Cluster Health Check ==="

# Check broker connectivity
echo "Checking broker connectivity..."
if kafka-broker-api-versions.sh --bootstrap-server "$BOOTSTRAP" > /dev/null 2>&1; then
  echo "✓ Broker connectivity OK"
else
  echo "✗ Broker connectivity FAILED"
  exit 1
fi

# Check offline partitions
OFFLINE=$(kafka-topics.sh --bootstrap-server $BOOTSTRAP \
  --describe --unavailable-partitions 2>/dev/null | grep -c "Topic:")
echo "Offline partitions: $OFFLINE"
if [ "$OFFLINE" -gt 0 ]; then
  echo "✗ CRITICAL: Offline partitions detected"
  exit 2
fi

# Check under-replicated partitions
URP=$(kafka-topics.sh --bootstrap-server $BOOTSTRAP \
  --describe --under-replicated-partitions 2>/dev/null | grep -c "Topic:")
echo "Under-replicated partitions: $URP"
if [ "$URP" -gt 0 ]; then
  echo "⚠ WARNING: Under-replicated partitions"
fi

echo "=== Health Check Complete ==="
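
When the health check reports under-replicated partitions, a per-topic breakdown helps locate the problem. The sketch below summarizes `kafka-topics.sh --describe` output read from stdin. The helper name urp_by_topic is hypothetical, and it assumes each partition line starts with "Topic: <name>" after leading whitespace.

```bash
# Count partition lines per topic in kafka-topics.sh --describe output.
# Prints "<topic> <count>" pairs sorted by topic name.
urp_by_topic() {
  awk '$1 == "Topic:" { count[$2]++ } END { for (t in count) print t, count[t] }' | sort
}
```

For example, `kafka-topics.sh --bootstrap-server kafka:9092 --describe --under-replicated-partitions | urp_by_topic`.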