Kafka Backup and Restore

Strategies for backing up and restoring Kafka data and configurations.


Backup Strategies Overview

Strategy            RPO         RTO       Use Case
MirrorMaker 2       Near-zero   Minutes   Cross-DC DR
Sink Connector      Minutes     Hours     Archival, analytics
Filesystem backup   Hours       Hours     Point-in-time recovery

MirrorMaker 2 Replication

Architecture

(Diagram: MirrorMaker 2 replication from the source cluster to the target cluster.)

Configuration

# mm2.properties
clusters=source,target

source.bootstrap.servers=source-kafka:9092
target.bootstrap.servers=target-kafka:9092

# Replication flow
source->target.enabled=true
source->target.topics=.*

# Exclude internal topics
source->target.topics.exclude=.*[\-\.]internal,.*\.replica,__.*

# Replication factor for topics created on the target
replication.factor=3

# How often to check the source for new topics
refresh.topics.interval.seconds=30

# Sync consumer offsets
sync.group.offsets.enabled=true
sync.group.offsets.interval.seconds=60

# Heartbeats
emit.heartbeats.enabled=true
emit.heartbeats.interval.seconds=5

# Checkpoints for offset translation
emit.checkpoints.enabled=true
emit.checkpoints.interval.seconds=60

Starting MirrorMaker 2

# Dedicated MirrorMaker 2 cluster (recommended)
connect-mirror-maker.sh mm2.properties

# Alternatively, deploy the MM2 connectors (MirrorSourceConnector,
# MirrorCheckpointConnector, MirrorHeartbeatConnector) on an existing
# distributed Connect cluster via its REST API
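
Once the flow is running, mirrored topics appear on the target cluster prefixed with the source cluster alias (the default replication policy). A quick verification sketch, using the host names from the configuration above and an assumed orders topic:

# List mirrored topics on the target; the default policy prefixes
# them with the source alias, e.g. source.orders
kafka-topics.sh --bootstrap-server target-kafka:9092 --list | grep '^source\.'

# Consume a few records from a mirrored topic to confirm data flow
kafka-console-consumer.sh --bootstrap-server target-kafka:9092 \
  --topic source.orders --from-beginning --max-messages 5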

Sink Connector Backup

Backup to Object Storage

Use the Kafka Connect S3 sink connector to archive topic data to object storage.

{
  "name": "s3-backup-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "4",
    "topics": "orders,events,transactions",
    "s3.region": "us-east-1",
    "s3.bucket.name": "kafka-backup",
    "s3.part.size": "5242880",
    "flush.size": "10000",
    "rotate.interval.ms": "3600000",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
    "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH",
    "locale": "en-US",
    "timezone": "UTC",
    "timestamp.extractor": "RecordField",
    "timestamp.field": "timestamp"
  }
}
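
The connector is created by POSTing this JSON to a running Connect worker's REST interface (a sketch; the worker host and port are assumptions):

# Save the config above as s3-backup-sink.json, then register it
curl -X POST http://connect:8083/connectors \
  -H "Content-Type: application/json" \
  -d @s3-backup-sink.json

# Confirm the connector and its tasks are RUNNING
curl http://connect:8083/connectors/s3-backup-sink/status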

Directory Structure

s3://kafka-backup/
└── topics/
    └── orders/
        └── year=2024/
            └── month=01/
                └── day=15/
                    └── hour=10/
                        ├── orders+0+0000000000.avro
                        ├── orders+1+0000000000.avro
                        └── orders+2+0000000000.avro
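
Restoring from this archive means replaying the objects back into Kafka, for example with an S3 source connector or a custom producer. Pulling down a time range for inspection is straightforward with the AWS CLI (paths match the layout above):

# Download one hour of archived records for the orders topic
aws s3 cp --recursive \
  s3://kafka-backup/topics/orders/year=2024/month=01/day=15/hour=10/ \
  ./restore/orders/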

Filesystem Backup

Log Segment Backup

#!/bin/bash
# backup-kafka-logs.sh

KAFKA_LOG_DIR="/var/kafka-logs"
BACKUP_DIR="/backup/kafka/$(date +%Y%m%d_%H%M%S)"
TOPICS="orders events transactions"

mkdir -p "$BACKUP_DIR"

for topic in $TOPICS; do
  echo "Backing up topic: $topic"

  # Find all partition directories
  for partition_dir in "$KAFKA_LOG_DIR"/${topic}-*; do
    if [ -d "$partition_dir" ]; then
      partition=$(basename "$partition_dir")
      mkdir -p "$BACKUP_DIR/$partition"

      # Copy closed segments only; the active segment is the one with
      # the highest base offset (segment file names sort lexicographically)
      active_segment=$(ls "$partition_dir"/*.log 2>/dev/null | sort | tail -1)

      for segment in "$partition_dir"/*.log; do
        segment_name=$(basename "$segment" .log)
        # Skip the active segment and segments marked for deletion
        if [ "$segment" != "$active_segment" ] && \
           [ ! -f "$partition_dir/$segment_name.log.deleted" ]; then
          cp "$segment" "$BACKUP_DIR/$partition/"
          cp "$partition_dir/$segment_name.index" "$BACKUP_DIR/$partition/" 2>/dev/null
          cp "$partition_dir/$segment_name.timeindex" "$BACKUP_DIR/$partition/" 2>/dev/null
        fi
      done
    fi
  done
done

# Create manifest
echo "Backup completed at $(date)" > "$BACKUP_DIR/manifest.txt"
kafka-topics.sh --bootstrap-server localhost:9092 --describe >> "$BACKUP_DIR/manifest.txt"

# Compress backup
tar -czf "${BACKUP_DIR}.tar.gz" -C "$(dirname "$BACKUP_DIR")" "$(basename "$BACKUP_DIR")"
rm -rf "$BACKUP_DIR"

echo "Backup saved to: ${BACKUP_DIR}.tar.gz"

Restore from Filesystem

#!/bin/bash
# restore-kafka-logs.sh

BACKUP_FILE="$1"
KAFKA_LOG_DIR="/var/kafka-logs"
RESTORE_TEMP="/tmp/kafka-restore"

# Stop Kafka broker first
# systemctl stop kafka

# Extract backup
mkdir -p "$RESTORE_TEMP"
tar -xzf "$BACKUP_FILE" -C "$RESTORE_TEMP"

# Copy to Kafka log directory (the archive contains a timestamped
# top-level directory, so partition dirs sit one level down)
for partition_dir in "$RESTORE_TEMP"/*/*-*; do
  partition=$(basename "$partition_dir")
  target_dir="$KAFKA_LOG_DIR/$partition"

  mkdir -p "$target_dir"
  cp "$partition_dir"/* "$target_dir/"

  # Fix ownership
  chown -R kafka:kafka "$target_dir"
done

# Start Kafka broker
# systemctl start kafka

rm -rf "$RESTORE_TEMP"

Configuration Backup

Backup Cluster Configuration

#!/bin/bash
# backup-kafka-config.sh

BOOTSTRAP_SERVER="localhost:9092"
BACKUP_DIR="/backup/kafka-config/$(date +%Y%m%d_%H%M%S)"

mkdir -p "$BACKUP_DIR"

# Backup topic configurations
echo "Backing up topic configurations..."
kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVER --describe > "$BACKUP_DIR/topics.txt"

for topic in $(kafka-topics.sh --bootstrap-server $BOOTSTRAP_SERVER --list); do
  kafka-configs.sh --bootstrap-server $BOOTSTRAP_SERVER \
    --entity-type topics --entity-name "$topic" --describe \
    >> "$BACKUP_DIR/topic-configs.txt"
done

# Backup broker configurations
echo "Backing up broker configurations..."
kafka-configs.sh --bootstrap-server $BOOTSTRAP_SERVER \
  --entity-type brokers --describe --all > "$BACKUP_DIR/broker-configs.txt"

# Backup ACLs
echo "Backing up ACLs..."
kafka-acls.sh --bootstrap-server $BOOTSTRAP_SERVER --list > "$BACKUP_DIR/acls.txt"

# Backup consumer groups
echo "Backing up consumer group offsets..."
for group in $(kafka-consumer-groups.sh --bootstrap-server $BOOTSTRAP_SERVER --list); do
  kafka-consumer-groups.sh --bootstrap-server $BOOTSTRAP_SERVER \
    --group "$group" --describe >> "$BACKUP_DIR/consumer-groups.txt"
done

# Backup SCRAM credentials (users only, not passwords)
echo "Backing up SCRAM users..."
kafka-configs.sh --bootstrap-server $BOOTSTRAP_SERVER \
  --entity-type users --describe > "$BACKUP_DIR/scram-users.txt"

echo "Configuration backup saved to: $BACKUP_DIR"

Restore Configuration

#!/bin/bash
# restore-kafka-config.sh

BOOTSTRAP_SERVER="localhost:9092"
BACKUP_DIR="$1"

# Restore topic configurations
echo "Restoring topic configurations..."
# Parse kafka-configs.sh --describe output, e.g.
#   Dynamic configs for topic orders are:
#     retention.ms=604800000 sensitive=false synonyms={...}
while IFS= read -r line; do
  if [[ $line == *"configs for topic"* ]]; then
    topic=$(echo "$line" | sed -n 's/.*for topic \([^ ]*\).*/\1/p')
  elif [[ $line == *"="* ]] && [[ -n "$topic" ]]; then
    # Keep only the key=value pair; drop sensitive/synonyms metadata
    config=$(echo "$line" | awk '{print $1}')
    kafka-configs.sh --bootstrap-server $BOOTSTRAP_SERVER \
      --entity-type topics --entity-name "$topic" \
      --alter --add-config "$config"
  fi
done < "$BACKUP_DIR/topic-configs.txt"

# Note: ACLs need to be restored manually with kafka-acls.sh
echo "ACLs must be restored manually. See: $BACKUP_DIR/acls.txt"

Consumer Offset Backup

Export Offsets

#!/bin/bash
# export-offsets.sh

BOOTSTRAP_SERVER="localhost:9092"
OUTPUT_FILE="consumer-offsets-$(date +%Y%m%d).json"

echo "[" > "$OUTPUT_FILE"

first=true
for group in $(kafka-consumer-groups.sh --bootstrap-server $BOOTSTRAP_SERVER --list); do
  # Process substitution (not a pipe) keeps the loop in the current
  # shell, so changes to $first persist across iterations
  while read -r line; do
    topic=$(echo "$line" | awk '{print $2}')
    partition=$(echo "$line" | awk '{print $3}')
    offset=$(echo "$line" | awk '{print $4}')

    # Skip headers, blank lines, and rows with no committed offset
    [[ "$offset" =~ ^[0-9]+$ ]] || continue

    if [[ "$first" == "true" ]]; then
      first=false
    else
      echo "," >> "$OUTPUT_FILE"
    fi

    echo "  {\"group\": \"$group\", \"topic\": \"$topic\", \"partition\": $partition, \"offset\": $offset}" >> "$OUTPUT_FILE"
  done < <(kafka-consumer-groups.sh --bootstrap-server $BOOTSTRAP_SERVER \
    --group "$group" --describe 2>/dev/null)
done

echo "]" >> "$OUTPUT_FILE"

Restore Offsets

#!/bin/bash
# restore-offsets.sh

BOOTSTRAP_SERVER="localhost:9092"
GROUP=$1
TOPIC=$2
OFFSET_FILE=$3

# Group must have no active members
kafka-consumer-groups.sh --bootstrap-server $BOOTSTRAP_SERVER \
  --group "$GROUP" \
  --reset-offsets \
  --from-file "$OFFSET_FILE" \
  --execute
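
Converting the JSON export into the CSV that --from-file expects is a one-liner with jq (group name illustrative):

# Extract one group's offsets as topic,partition,offset CSV
jq -r '.[] | select(.group == "orders-processor") | "\(.topic),\(.partition),\(.offset)"' \
  consumer-offsets-20240115.json > offsets.csv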

Disaster Recovery Procedures

Recovery Checklist

  1. Assess damage
     - Identify failed components
     - Determine data loss extent
  2. Restore infrastructure
     - Deploy replacement brokers
     - Configure networking
  3. Restore data
     - Apply latest backup
     - Verify data integrity
  4. Restore configuration
     - Topics and partitions
     - ACLs and quotas
  5. Restore consumer offsets
     - Reset to known good position
  6. Verify recovery
     - Test producer connectivity
     - Test consumer connectivity
     - Verify data flow

Failover to DR Cluster

#!/bin/bash
# failover-to-dr.sh

DR_BOOTSTRAP="dr-kafka:9092"
PRIMARY_BOOTSTRAP="primary-kafka:9092"

echo "=== Kafka DR Failover ==="

# 1. Stop MirrorMaker
echo "Stopping MirrorMaker..."
# systemctl stop kafka-mirrormaker

# 2. Verify DR cluster health
echo "Checking DR cluster health..."
kafka-broker-api-versions.sh --bootstrap-server $DR_BOOTSTRAP

# 3. Check replication lag (should be minimal)
echo "Checking replication status..."
kafka-consumer-groups.sh --bootstrap-server $DR_BOOTSTRAP \
  --describe --all-groups

# 4. Update DNS/load balancer to point to DR
echo "Update DNS to point to DR cluster"

# 5. Verify applications can connect
echo "Failover complete. Verify application connectivity."