Kafka Diagnosis
Diagnostic procedures for identifying and isolating Apache Kafka issues.
Diagnostic Workflow

Connectivity Checks
Basic Connectivity
# Test TCP connectivity
nc -zv kafka-host 9092
# Test with timeout
timeout 5 bash -c 'cat < /dev/null > /dev/tcp/kafka-host/9092' && echo "OK" || echo "FAILED"
# DNS resolution
dig kafka-host
nslookup kafka-host
Broker API Versions
# List supported API versions (verifies connectivity and protocol)
kafka-broker-api-versions.sh --bootstrap-server kafka:9092
# Output shows:
# ApiVersion(apiKey=0, minVersion=0, maxVersion=12) -- Produce
# ApiVersion(apiKey=1, minVersion=0, maxVersion=13) -- Fetch
# ...
SSL/TLS Connectivity
# Test SSL handshake
openssl s_client -connect kafka:9093 -CAfile ca-cert.pem
# Verify certificate
openssl s_client -connect kafka:9093 -showcerts
# Check certificate expiry
# </dev/null closes stdin so s_client exits right after the handshake instead of
# waiting for interactive input; stderr is discarded to keep only the certificate.
openssl s_client -connect kafka:9093 </dev/null 2>/dev/null | \
  openssl x509 -noout -dates
SASL Authentication
# Test with SASL/PLAIN
kafka-broker-api-versions.sh --bootstrap-server kafka:9093 \
--command-config client.properties
# client.properties:
# security.protocol=SASL_SSL
# sasl.mechanism=PLAIN
# sasl.jaas.config=...
Cluster State Verification
Controller Status
# KRaft mode - check controller quorum (leader, epoch, voters, observers)
# kafka-metadata-quorum.sh queries the live cluster, so no snapshot file is needed.
kafka-metadata-quorum.sh --bootstrap-server kafka:9092 \
  describe --status
# Check active controller (the quorum leader reported in the status output)
kafka-metadata-quorum.sh --bootstrap-server kafka:9092 \
  describe --status | grep -i leader
Broker Status
# List all brokers
# NOTE(review): stock Apache Kafka distributions do not appear to ship a
# kafka-metadata.sh script (the snapshot browser is kafka-metadata-shell.sh) —
# confirm the tool name and the --command option against the installed version.
kafka-metadata.sh --snapshot /var/kafka-logs/__cluster_metadata-0/*.log \
--command "brokers"
# Check each broker
for broker in kafka1 kafka2 kafka3; do
echo -n "$broker: "
timeout 5 kafka-broker-api-versions.sh --bootstrap-server $broker:9092 \
> /dev/null 2>&1 && echo "OK" || echo "FAILED"
done
Partition Status
# Under-replicated partitions
kafka-topics.sh --bootstrap-server kafka:9092 \
--describe --under-replicated-partitions
# Unavailable partitions
kafka-topics.sh --bootstrap-server kafka:9092 \
--describe --unavailable-partitions
# Under min-ISR partitions
kafka-topics.sh --bootstrap-server kafka:9092 \
--describe --under-min-isr-partitions
# Describe specific topic
kafka-topics.sh --bootstrap-server kafka:9092 \
--describe --topic my-topic
ISR Verification
# Check ISR for all partitions
kafka-topics.sh --bootstrap-server kafka:9092 --describe | \
grep -E "Topic:|Leader:|Isr:"
# Monitor ISR changes
watch -n 5 'kafka-topics.sh --bootstrap-server kafka:9092 \
--describe --under-replicated-partitions'
Data Flow Testing
Producer Test
# Send test messages
echo "test-message-$(date +%s)" | kafka-console-producer.sh \
--bootstrap-server kafka:9092 \
--topic test-topic
# Send with key
kafka-console-producer.sh --bootstrap-server kafka:9092 \
--topic test-topic \
--property "parse.key=true" \
--property "key.separator=:" << EOF
key1:value1
key2:value2
EOF
# Performance test producer
kafka-producer-perf-test.sh \
--topic test-topic \
--num-records 10000 \
--record-size 1024 \
--throughput -1 \
--producer-props bootstrap.servers=kafka:9092
Consumer Test
# Consume from beginning
kafka-console-consumer.sh --bootstrap-server kafka:9092 \
--topic test-topic \
--from-beginning \
--max-messages 10
# Consume with timestamps
kafka-console-consumer.sh --bootstrap-server kafka:9092 \
--topic test-topic \
--from-beginning \
--property print.timestamp=true \
--property print.key=true
# Performance test consumer
kafka-consumer-perf-test.sh \
--bootstrap-server kafka:9092 \
--topic test-topic \
--messages 10000 \
--threads 1
End-to-End Latency Test
# Measure end-to-end latency
kafka-run-class.sh kafka.tools.EndToEndLatency \
kafka:9092 \
test-topic \
10000 \
all \
1024
Replication Verification
# Verify replica lag
kafka-replica-verification.sh \
--broker-list kafka1:9092,kafka2:9092,kafka3:9092 \
--topic-white-list ".*"
# Check log end offsets
kafka-run-class.sh kafka.tools.GetOffsetShell \
--bootstrap-server kafka:9092 \
--topic my-topic
Consumer Group Diagnosis
Group State
# List all groups
kafka-consumer-groups.sh --bootstrap-server kafka:9092 --list
# Group details
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--describe --group my-group
# Group state
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--describe --group my-group --state
# Member details
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--describe --group my-group --members --verbose
Consumer Lag Analysis
# Lag per partition
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--describe --group my-group
# Sample output:
# GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
# my-group my-topic 0 1000 1050 50
# my-group my-topic 1 2000 2000 0
Lag Interpretation
| Lag Behavior | Meaning | Action |
|--------------|---------|--------|
| Lag = 0 | Consumer caught up | Normal |
| Lag constant | Keeping pace | Normal |
| Lag growing | Falling behind | Scale consumers |
| Lag very large | Significantly behind | Investigate, reset offset |
JMX Metrics Collection
# Enable JMX
export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote \
-Dcom.sun.management.jmxremote.port=9999 \
-Dcom.sun.management.jmxremote.authenticate=false \
-Dcom.sun.management.jmxremote.ssl=false"
# Query metrics with JMX tool
kafka-run-class.sh kafka.tools.JmxTool \
--jmx-url service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi \
--object-name kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec
Key Metrics to Check
| Metric | Description | Alert Threshold |
|--------|-------------|-----------------|
| UnderReplicatedPartitions | Partitions with missing replicas | > 0 |
| OfflinePartitionsCount | Partitions with no leader | > 0 |
| ActiveControllerCount | Controllers in cluster | ≠ 1 |
| RequestQueueSize | Queued requests | Growing |
| TotalTimeMs | Request latency | P99 > 100ms |
System Resource Checks
# CPU usage
top -b -n 1 | head -20
# Memory usage
free -m
vmstat 1 5
# Disk I/O
iostat -x 1 5
# Network
netstat -s | grep -i error
ss -s
Disk Space
# Log directories
kafka-log-dirs.sh --bootstrap-server kafka:9092 \
--describe --broker-list 0,1,2
# Filesystem usage
df -h /var/kafka-logs
# Directory sizes
du -sh /var/kafka-logs/*
Log Segment Verification
Check Log Integrity
# Verify log segment
kafka-dump-log.sh \
--files /var/kafka-logs/my-topic-0/00000000000000000000.log \
--verify-index-only
# Deep verification
kafka-dump-log.sh \
--files /var/kafka-logs/my-topic-0/00000000000000000000.log \
--deep-iteration
# Index sanity check
kafka-dump-log.sh \
--files /var/kafka-logs/my-topic-0/00000000000000000000.log \
--index-sanity-check
View Log Contents
# Print log entries
kafka-dump-log.sh \
--files /var/kafka-logs/my-topic-0/00000000000000000000.log \
--print-data-log
# Print log entries with an explicit --max-message-size
# NOTE(review): nothing in this command selects an offset range; --max-message-size
# presumably bounds the record size the tool will handle — confirm against the
# kafka-dump-log.sh help output.
kafka-dump-log.sh \
--files /var/kafka-logs/my-topic-0/00000000000000000000.log \
--print-data-log \
--max-message-size 1000
Health Check Script
#!/bin/bash
# kafka-health-check.sh
# Usage: kafka-health-check.sh [bootstrap-server]
# The first argument is the bootstrap server; it falls back to localhost:9092.
BOOTSTRAP_SERVER="${1:-localhost:9092}"
# Starts at 0 (no problem recorded yet).
EXIT_CODE=0
# Opening banner: one printf call emits each argument on its own line.
banner_rule="=========================================="
printf '%s\n' \
  "$banner_rule" \
  "Kafka Cluster Health Check" \
  "Timestamp: $(date)" \
  "Bootstrap: $BOOTSTRAP_SERVER" \
  "$banner_rule"
# Function to check and report
# Compares an observed value with the expected one and prints the verdict.
# Globals:   EXIT_CODE (read, written) - raised to 1 on a mismatch, but only when
#            no failure has been recorded yet, so a critical 2 is never downgraded.
# Arguments: $1 - label of the check
#            $2 - observed value
#            $3 - expected value
# Outputs:   one "[OK] ..." or "[FAIL] ..." line on stdout
check() {
  local name=$1
  local result=$2
  local expected=$3
  if [[ "$result" == "$expected" ]]; then
    echo "[OK] $name: $result"
  else
    echo "[FAIL] $name: $result (expected: $expected)"
    # Same guard the partition checks use: keep the most severe code seen so far.
    if [[ "${EXIT_CODE:-0}" -eq 0 ]]; then
      EXIT_CODE=1
    fi
  fi
}
# Broker connectivity
# Every later check talks to the same bootstrap server, so an unreachable broker
# ends the run immediately with exit code 2.
echo ""
echo "--- Connectivity ---"
# Quoted so an empty or whitespace-containing value stays a single argument.
if kafka-broker-api-versions.sh --bootstrap-server "$BOOTSTRAP_SERVER" > /dev/null 2>&1; then
  echo "[OK] Broker connectivity"
else
  echo "[FAIL] Broker connectivity"
  EXIT_CODE=2
  exit "$EXIT_CODE"
fi
# Partition health: offline, under-replicated and under-min-ISR partitions.

# Counts the partition lines kafka-topics.sh prints for one --describe filter.
# Globals:   BOOTSTRAP_SERVER (read)
# Arguments: $1 - filter flag, e.g. --unavailable-partitions
# Outputs:   a single integer on stdout (number of lines containing "Topic:")
count_flagged_partitions() {
  local filter_flag=$1
  local matches
  # grep -c prints the count even when it is 0, but exits 1 in that case.
  # Appending `|| echo "0"` therefore produced the two-line value "0<newline>0",
  # which made the numeric comparisons fail and reported healthy clusters as
  # broken. `|| true` only neutralizes the exit status.
  matches=$(kafka-topics.sh --bootstrap-server "$BOOTSTRAP_SERVER" \
    --describe "$filter_flag" 2>/dev/null | grep -c "Topic:" || true)
  # Fall back to 0 if grep itself produced nothing.
  printf '%s\n' "${matches:-0}"
}

# Offline partitions
echo ""
echo "--- Partition Health ---"
OFFLINE=$(count_flagged_partitions --unavailable-partitions)
if [ "$OFFLINE" -eq 0 ]; then
  echo "[OK] Offline partitions: 0"
else
  echo "[CRIT] Offline partitions: $OFFLINE"
  EXIT_CODE=2
fi

# Under-replicated partitions
UNDER_REP=$(count_flagged_partitions --under-replicated-partitions)
if [ "$UNDER_REP" -eq 0 ]; then
  echo "[OK] Under-replicated partitions: 0"
else
  echo "[WARN] Under-replicated partitions: $UNDER_REP"
  # Warnings raise the exit code to 1 only when nothing worse was recorded.
  if [ "${EXIT_CODE:-0}" -eq 0 ]; then
    EXIT_CODE=1
  fi
fi

# Under min-ISR partitions
UNDER_ISR=$(count_flagged_partitions --under-min-isr-partitions)
if [ "$UNDER_ISR" -eq 0 ]; then
  echo "[OK] Under-MinISR partitions: 0"
else
  echo "[WARN] Under-MinISR partitions: $UNDER_ISR"
  if [ "${EXIT_CODE:-0}" -eq 0 ]; then
    EXIT_CODE=1
  fi
fi
# Consumer groups
# Informational only: reports how many groups are listed in the "Empty" state.
echo ""
echo "--- Consumer Groups ---"
# grep -c already prints "0" when nothing matches (and exits 1), so the former
# `|| echo "0"` appended a second "0" line to the value. `|| true` keeps the
# single count and just neutralizes the exit status.
EMPTY_GROUPS=$(kafka-consumer-groups.sh --bootstrap-server "$BOOTSTRAP_SERVER" \
  --list --state | grep -c "Empty" || true)
# Fall back to 0 if grep itself produced nothing.
EMPTY_GROUPS=${EMPTY_GROUPS:-0}
echo "[INFO] Empty consumer groups: $EMPTY_GROUPS"
# Closing banner, then terminate with the accumulated status.
# One printf call emits each argument on its own line (the first is the blank line).
printf '%s\n' \
  "" \
  "==========================================" \
  "Health check completed with exit code: $EXIT_CODE" \
  "=========================================="
exit $EXIT_CODE