Kafka CLI Tools¶
Command-line tools for Apache Kafka administration and operations.
Tool Overview¶
| Tool | Purpose |
|---|---|
kafka-topics.sh |
Topic management |
kafka-consumer-groups.sh |
Consumer group management |
kafka-configs.sh |
Configuration management |
kafka-acls.sh |
ACL management |
kafka-reassign-partitions.sh |
Partition reassignment |
kafka-leader-election.sh |
Leader election |
kafka-metadata.sh |
KRaft metadata inspection |
kafka-dump-log.sh |
Log segment inspection |
kafka-topics.sh¶
List Topics¶
# List all topics
kafka-topics.sh --bootstrap-server kafka:9092 --list
# List with details
kafka-topics.sh --bootstrap-server kafka:9092 --describe
Create Topic¶
kafka-topics.sh --bootstrap-server kafka:9092 \
--create \
--topic my-topic \
--partitions 12 \
--replication-factor 3
# With configuration
kafka-topics.sh --bootstrap-server kafka:9092 \
--create \
--topic my-topic \
--partitions 12 \
--replication-factor 3 \
--config retention.ms=604800000 \
--config cleanup.policy=compact
Describe Topic¶
# Single topic
kafka-topics.sh --bootstrap-server kafka:9092 \
--describe --topic my-topic
# Show under-replicated partitions
kafka-topics.sh --bootstrap-server kafka:9092 \
--describe --under-replicated-partitions
# Show unavailable partitions
kafka-topics.sh --bootstrap-server kafka:9092 \
--describe --unavailable-partitions
Alter Topic¶
# Increase partitions (cannot decrease)
kafka-topics.sh --bootstrap-server kafka:9092 \
--alter --topic my-topic --partitions 24
Delete Topic¶
kafka-topics.sh --bootstrap-server kafka:9092 \
--delete --topic my-topic
kafka-consumer-groups.sh¶
List Consumer Groups¶
kafka-consumer-groups.sh --bootstrap-server kafka:9092 --list
# With state
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--list --state
Describe Consumer Group¶
# Show members and offsets
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--describe --group my-group
# Show only members
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--describe --group my-group --members
# Show verbose member details
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--describe --group my-group --members --verbose
# Show state
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--describe --group my-group --state
Reset Offsets¶
# Reset to earliest (dry run)
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--group my-group \
--reset-offsets \
--to-earliest \
--topic my-topic \
--dry-run
# Reset to earliest (execute)
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--group my-group \
--reset-offsets \
--to-earliest \
--topic my-topic \
--execute
# Reset to specific offset
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--group my-group \
--reset-offsets \
--to-offset 1000 \
--topic my-topic:0 \
--execute
# Reset to timestamp
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--group my-group \
--reset-offsets \
--to-datetime "2024-01-15T10:00:00.000" \
--all-topics \
--execute
# Reset by shifting
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--group my-group \
--reset-offsets \
--shift-by -100 \
--topic my-topic \
--execute
Delete Consumer Group¶
# Group must be empty (no active members)
kafka-consumer-groups.sh --bootstrap-server kafka:9092 \
--delete --group my-group
kafka-configs.sh¶
Describe Configuration¶
# Broker config
kafka-configs.sh --bootstrap-server kafka:9092 \
--entity-type brokers --entity-name 1 --describe
# All brokers
kafka-configs.sh --bootstrap-server kafka:9092 \
--entity-type brokers --describe --all
# Topic config
kafka-configs.sh --bootstrap-server kafka:9092 \
--entity-type topics --entity-name my-topic --describe
# Client quotas
kafka-configs.sh --bootstrap-server kafka:9092 \
--entity-type users --entity-name my-user --describe
Alter Configuration¶
# Add/update broker config
kafka-configs.sh --bootstrap-server kafka:9092 \
--entity-type brokers --entity-name 1 \
--alter --add-config log.cleaner.threads=4
# Add/update topic config
kafka-configs.sh --bootstrap-server kafka:9092 \
--entity-type topics --entity-name my-topic \
--alter --add-config retention.ms=86400000
# Delete config (revert to default)
kafka-configs.sh --bootstrap-server kafka:9092 \
--entity-type topics --entity-name my-topic \
--alter --delete-config retention.ms
Client Quotas¶
# Set producer quota
kafka-configs.sh --bootstrap-server kafka:9092 \
--entity-type users --entity-name my-user \
--alter --add-config producer_byte_rate=1048576
# Set consumer quota
kafka-configs.sh --bootstrap-server kafka:9092 \
--entity-type users --entity-name my-user \
--alter --add-config consumer_byte_rate=2097152
kafka-acls.sh¶
List ACLs¶
# All ACLs
kafka-acls.sh --bootstrap-server kafka:9092 --list
# For specific topic
kafka-acls.sh --bootstrap-server kafka:9092 \
--list --topic my-topic
# For specific principal
kafka-acls.sh --bootstrap-server kafka:9092 \
--list --principal User:my-user
Add ACLs¶
# Producer access
kafka-acls.sh --bootstrap-server kafka:9092 \
--add \
--allow-principal User:producer-app \
--operation Write \
--operation Describe \
--topic my-topic
# Consumer access
kafka-acls.sh --bootstrap-server kafka:9092 \
--add \
--allow-principal User:consumer-app \
--operation Read \
--operation Describe \
--topic my-topic \
--group my-group
# Wildcard topic access
kafka-acls.sh --bootstrap-server kafka:9092 \
--add \
--allow-principal User:admin \
--operation All \
--topic '*'
Remove ACLs¶
kafka-acls.sh --bootstrap-server kafka:9092 \
--remove \
--allow-principal User:producer-app \
--operation Write \
--topic my-topic
kafka-reassign-partitions.sh¶
Generate Reassignment Plan¶
# Create topics JSON file
cat > topics.json << 'EOF'
{
"topics": [
{"topic": "my-topic"}
],
"version": 1
}
EOF
# Generate plan
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
--topics-to-move-json-file topics.json \
--broker-list "1,2,3,4" \
--generate
Execute Reassignment¶
# Save generated plan to file, then execute
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
--reassignment-json-file reassignment.json \
--execute
# With throttle
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
--reassignment-json-file reassignment.json \
--throttle 50000000 \
--execute
Verify Reassignment¶
kafka-reassign-partitions.sh --bootstrap-server kafka:9092 \
--reassignment-json-file reassignment.json \
--verify
kafka-leader-election.sh¶
# Preferred leader election for all partitions
kafka-leader-election.sh --bootstrap-server kafka:9092 \
--election-type preferred \
--all-topic-partitions
# For specific topic
kafka-leader-election.sh --bootstrap-server kafka:9092 \
--election-type preferred \
--topic my-topic
# Unclean election (data loss risk)
kafka-leader-election.sh --bootstrap-server kafka:9092 \
--election-type unclean \
--topic my-topic \
--partition 0
kafka-metadata.sh (KRaft)¶
# Describe cluster
kafka-metadata.sh --snapshot /var/kafka-logs/__cluster_metadata-0/*.log \
--command "describe"
# List brokers
kafka-metadata.sh --snapshot /var/kafka-logs/__cluster_metadata-0/*.log \
--command "brokers"
# Show topic details
kafka-metadata.sh --snapshot /var/kafka-logs/__cluster_metadata-0/*.log \
--command "topic" --topic-name my-topic
kafka-dump-log.sh¶
# Dump log segment
kafka-dump-log.sh --files /var/kafka-logs/my-topic-0/00000000000000000000.log \
--print-data-log
# Dump index
kafka-dump-log.sh --files /var/kafka-logs/my-topic-0/00000000000000000000.index
# Verify indexes
kafka-dump-log.sh --files /var/kafka-logs/my-topic-0/00000000000000000000.log \
--index-sanity-check
Console Producer/Consumer¶
Console Producer¶
kafka-console-producer.sh --bootstrap-server kafka:9092 \
--topic my-topic
# With key
kafka-console-producer.sh --bootstrap-server kafka:9092 \
--topic my-topic \
--property "parse.key=true" \
--property "key.separator=:"
Console Consumer¶
# From beginning
kafka-console-consumer.sh --bootstrap-server kafka:9092 \
--topic my-topic \
--from-beginning
# With keys
kafka-console-consumer.sh --bootstrap-server kafka:9092 \
--topic my-topic \
--property print.key=true \
--property print.timestamp=true
# Specific partition and offset
kafka-console-consumer.sh --bootstrap-server kafka:9092 \
--topic my-topic \
--partition 0 \
--offset 100
# Max messages
kafka-console-consumer.sh --bootstrap-server kafka:9092 \
--topic my-topic \
--max-messages 10
Related Documentation¶
- Operations - Operations guide
- Configuration - Configuration reference
- Monitoring - Monitoring guide