Kafka Python Client¶
The confluent-kafka-python library provides a high-performance Python client built on librdkafka. This guide covers installation, configuration, and usage patterns.
Client Information¶
| Library | confluent-kafka |
| Repository | github.com/confluentinc/confluent-kafka-python |
| Documentation | docs.confluent.io/platform/current/clients/confluent-kafka-python |
| Package | PyPI |
| Current Version | 2.12.x (as of 2025) |
| Maintainer | Confluent |
| License | Apache License 2.0 |
| Base | librdkafka (C library) |
History¶
The confluent-kafka-python library was first released by Confluent in 2016 as a Python wrapper around librdkafka using C extensions. It was created to provide significantly higher performance than pure-Python alternatives like kafka-python. The library provides a Pythonic API while delegating the heavy lifting to the battle-tested librdkafka implementation. In 2019, Confluent added Schema Registry integration with support for Avro, Protobuf, and JSON Schema serializers. The library remains the recommended Python client for production Kafka deployments.
Alternative: kafka-python (Pure Python)¶
For environments where native compilation is problematic (serverless, some containers), the pure-Python kafka-python library is a popular alternative:
| Library | kafka-python |
| Repository | github.com/dpkp/kafka-python |
| Documentation | kafka-python.readthedocs.io |
| Package | PyPI |
| Current Version | 2.2.x (as of 2025) |
| Maintainer | Dana Powers (dpkp) |
| License | Apache License 2.0 |
| Base | Pure Python (no native dependencies) |
kafka-python is designed to mirror the official Java client API while incorporating Pythonic features like consumer iterators. It supports consumer groups, transactions, all compression types (gzip, LZ4, Snappy, Zstandard), and message headers. With 5.9k GitHub stars and use by 35,000+ projects, it's the most popular pure-Python option.
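For illustration, a minimal sketch of kafka-python's iterator-style API (the broker address localhost:9092, the orders topic, and the group id below are assumptions for the example):

from kafka import KafkaConsumer, KafkaProducer

# Producer: send one message and block until the broker acknowledges it
producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('orders', key=b'order-123', value=b'{"id": 123}').get(timeout=10)
producer.flush()

# Consumer: iterate over messages as they arrive
consumer = KafkaConsumer(
    'orders',
    bootstrap_servers='localhost:9092',
    group_id='order-processors',
    auto_offset_reset='earliest',
)
for message in consumer:
    print(message.topic, message.partition, message.offset, message.value)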
Client Comparison¶
| Feature | confluent-kafka | kafka-python |
|---|---|---|
| Implementation | librdkafka (C) | Pure Python |
| Performance | Highest | Lower |
| Installation | Requires build tools | pip install only |
| Serverless | Difficult | Easy |
| Transactions | ✅ | ✅ |
| Compression | All types | All types |
| Consumer groups | ✅ | ✅ |
Version Compatibility¶
| Client Version | librdkafka | Minimum Kafka Broker |
|---|---|---|
| 2.12.x | 2.12.x | 0.8.0+ |
| 2.10.x | 2.10.x | 0.8.0+ |
| 2.6.x | 2.6.x | 0.8.0+ |
Installation¶
pip¶
pip install confluent-kafka
With Avro Support¶
pip install confluent-kafka[avro]
With Schema Registry¶
pip install confluent-kafka[schemaregistry]
Poetry¶
poetry add confluent-kafka
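After installing, you can confirm which client and librdkafka versions are in use; version() and libversion() each return a version string plus an integer form:

import confluent_kafka

# Version of the Python client and of the bundled librdkafka
print(confluent_kafka.version())     # (version string, version int)
print(confluent_kafka.libversion())  # (version string, version int)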
Producer¶
Basic Producer¶
from confluent_kafka import Producer

config = {
    'bootstrap.servers': 'kafka:9092',
    'client.id': 'order-service'
}

producer = Producer(config)

def delivery_callback(err, msg):
    if err:
        print(f'Delivery failed: {err}')
    else:
        print(f'Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}')

# Asynchronous send
producer.produce(
    topic='orders',
    key='order-123',
    value='{"id": 123, "amount": 99.99}',
    callback=delivery_callback
)

# Trigger delivery reports
producer.poll(0)

# Wait for all messages to be delivered
producer.flush()
Production Configuration¶
config = {
    # Connection
    'bootstrap.servers': 'kafka-1:9092,kafka-2:9092,kafka-3:9092',
    'client.id': 'order-service-producer',

    # Durability
    'acks': 'all',
    'enable.idempotence': True,

    # Retries
    'retries': 2147483647,
    'delivery.timeout.ms': 120000,
    'max.in.flight.requests.per.connection': 5,

    # Batching
    'batch.size': 65536,
    'linger.ms': 10,
    'queue.buffering.max.messages': 100000,
    'queue.buffering.max.kbytes': 1048576,

    # Compression
    'compression.type': 'lz4',

    # Error handling (error_callback is defined in the Error Handling section)
    'error_cb': error_callback,
}

producer = Producer(config)
Send with Headers¶
producer.produce(
    topic='orders',
    key='order-123',
    value='{"id": 123}',
    headers=[
        ('correlation-id', b'abc-123'),
        ('source', b'order-service'),
    ],
    callback=delivery_callback
)
Synchronous Send¶
from confluent_kafka import KafkaException

def sync_produce(producer, topic, key, value):
    """Synchronous produce with error handling."""
    result = {'delivered': False, 'error': None, 'metadata': None}

    def callback(err, msg):
        if err:
            result['error'] = err
        else:
            result['delivered'] = True
            result['metadata'] = {
                'topic': msg.topic(),
                'partition': msg.partition(),
                'offset': msg.offset()
            }

    producer.produce(topic, key=key, value=value, callback=callback)
    producer.flush()

    if result['error']:
        raise KafkaException(result['error'])
    return result['metadata']
Context Manager Pattern¶
from contextlib import contextmanager
from confluent_kafka import Producer

@contextmanager
def kafka_producer(config):
    producer = Producer(config)
    try:
        yield producer
    finally:
        producer.flush()

# Usage
with kafka_producer(config) as producer:
    producer.produce('orders', key='123', value='data')
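The same idea works on the consumer side. A minimal sketch (the kafka_consumer helper is illustrative, not part of the library, and consumer_config is assumed to be a dict like the one shown in the Consumer section below) that guarantees close() runs so the group can rebalance promptly:

from contextlib import contextmanager
from confluent_kafka import Consumer

@contextmanager
def kafka_consumer(config, topics):
    # Illustrative helper: subscribe on entry, always close on exit
    consumer = Consumer(config)
    consumer.subscribe(topics)
    try:
        yield consumer
    finally:
        consumer.close()

# Usage
with kafka_consumer(consumer_config, ['orders']) as consumer:
    msg = consumer.poll(timeout=1.0)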
Consumer¶
Basic Consumer¶
from confluent_kafka import Consumer, KafkaError, KafkaException

config = {
    'bootstrap.servers': 'kafka:9092',
    'group.id': 'order-processors',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
}

consumer = Consumer(config)
consumer.subscribe(['orders'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                print(f'End of partition {msg.partition()}')
            else:
                raise KafkaException(msg.error())
        else:
            print(f'Received: key={msg.key()} value={msg.value()}')
            process_message(msg)
            consumer.commit(asynchronous=False)
finally:
    consumer.close()
Production Configuration¶
config = {
    # Connection
    'bootstrap.servers': 'kafka-1:9092,kafka-2:9092,kafka-3:9092',
    'group.id': 'order-processors',
    'client.id': 'order-processor-1',

    # Offset management
    'enable.auto.commit': False,
    'auto.offset.reset': 'earliest',

    # Session management
    'session.timeout.ms': 45000,
    'heartbeat.interval.ms': 15000,
    'max.poll.interval.ms': 300000,

    # Fetch configuration
    'fetch.min.bytes': 1,
    'fetch.max.bytes': 52428800,
    'max.partition.fetch.bytes': 1048576,

    # Assignment strategy
    'partition.assignment.strategy': 'cooperative-sticky',

    # Callbacks
    'error_cb': error_callback,
    'stats_cb': stats_callback,
    'statistics.interval.ms': 60000,
}

consumer = Consumer(config)
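The config above assumes an error_callback (shown in the Error Handling section) and a stats_callback. librdkafka invokes stats_cb with its statistics payload as a JSON string every statistics.interval.ms; one possible minimal sketch, printing a few of the top-level fields:

import json

def stats_callback(stats_json_str):
    """Called every statistics.interval.ms with librdkafka statistics as JSON."""
    stats = json.loads(stats_json_str)
    # A few commonly watched top-level fields from the statistics payload
    print(f"client={stats.get('name')} "
          f"queued_msgs={stats.get('msg_cnt')} "
          f"tx={stats.get('tx')} rx={stats.get('rx')}")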
Batch Processing¶
def consume_batch(consumer, batch_size=100, timeout=1.0):
    """Consume messages in batches."""
    messages = consumer.consume(num_messages=batch_size, timeout=timeout)
    valid_messages = []
    for msg in messages:
        if msg is None:
            continue
        if msg.error():
            handle_error(msg.error())
        else:
            valid_messages.append(msg)
    return valid_messages

# Usage
while running:
    batch = consume_batch(consumer, batch_size=500)
    if batch:
        process_batch(batch)
        consumer.commit(asynchronous=False)
Rebalance Callback¶
def on_assign(consumer, partitions):
    print(f'Assigned: {partitions}')
    # Initialize state for partitions

def on_revoke(consumer, partitions):
    print(f'Revoked: {partitions}')
    # Commit offsets, cleanup state
    consumer.commit(asynchronous=False)

def on_lost(consumer, partitions):
    print(f'Lost: {partitions}')
    # Handle unexpected partition loss

consumer.subscribe(
    ['orders'],
    on_assign=on_assign,
    on_revoke=on_revoke,
    on_lost=on_lost
)
Manual Partition Assignment¶
from confluent_kafka import TopicPartition, OFFSET_BEGINNING

# Assign specific partitions (no consumer group)
consumer.assign([
    TopicPartition('orders', 0),
    TopicPartition('orders', 1),
])

# Seek to specific offset
consumer.seek(TopicPartition('orders', 0, 1000))

# Seek to beginning
consumer.seek(TopicPartition('orders', 0, OFFSET_BEGINNING))
Graceful Shutdown¶
import signal
import sys

running = True

def shutdown_handler(signum, frame):
    global running
    running = False

signal.signal(signal.SIGINT, shutdown_handler)
signal.signal(signal.SIGTERM, shutdown_handler)

consumer = Consumer(config)
consumer.subscribe(['orders'])

try:
    while running:
        msg = consumer.poll(timeout=1.0)
        if msg and not msg.error():
            process_message(msg)
            consumer.commit()
except Exception as e:
    print(f'Error: {e}')
finally:
    print('Closing consumer...')
    consumer.close()
    sys.exit(0)
Admin Client¶
Create Topics¶
from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({'bootstrap.servers': 'kafka:9092'})

topics = [
    NewTopic(
        topic='orders',
        num_partitions=6,
        replication_factor=3,
        config={
            'retention.ms': '604800000',
            'cleanup.policy': 'delete'
        }
    )
]

futures = admin.create_topics(topics)

for topic, future in futures.items():
    try:
        future.result()
        print(f'Topic {topic} created')
    except Exception as e:
        print(f'Failed to create topic {topic}: {e}')
Describe Topics¶
from confluent_kafka.admin import AdminClient

admin = AdminClient({'bootstrap.servers': 'kafka:9092'})

topics = admin.list_topics(timeout=10)

for topic in topics.topics.values():
    print(f'Topic: {topic.topic}')
    for partition in topic.partitions.values():
        print(f'  Partition {partition.id}: leader={partition.leader}')
List Consumer Groups¶
from confluent_kafka.admin import AdminClient

admin = AdminClient({'bootstrap.servers': 'kafka:9092'})

groups = admin.list_consumer_groups()
result = groups.result()

for group in result.valid:
    print(f'Group: {group.group_id} (state={group.state})')
Describe Consumer Group¶
from confluent_kafka.admin import AdminClient

admin = AdminClient({'bootstrap.servers': 'kafka:9092'})

group_metadata = admin.describe_consumer_groups(['order-processors'])

for group_id, future in group_metadata.items():
    try:
        group = future.result()
        print(f'Group: {group.group_id}')
        print(f'State: {group.state}')
        for member in group.members:
            print(f'  Member: {member.member_id}')
            print(f'  Client: {member.client_id}')
    except Exception as e:
        print(f'Error: {e}')
Schema Registry Integration¶
Avro Producer¶
from confluent_kafka import SerializingProducer
from confluent_kafka.serialization import StringSerializer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer

schema_registry_conf = {'url': 'http://schema-registry:8081'}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)

value_schema_str = """
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "customer_id", "type": "string"}
  ]
}
"""

avro_serializer = AvroSerializer(
    schema_registry_client,
    value_schema_str,
    lambda obj, ctx: obj  # to_dict function
)

producer_conf = {
    'bootstrap.servers': 'kafka:9092',
    'key.serializer': StringSerializer('utf_8'),
    'value.serializer': avro_serializer
}

producer = SerializingProducer(producer_conf)

order = {
    'id': 'order-123',
    'amount': 99.99,
    'customer_id': 'cust-456'
}

producer.produce(topic='orders', key='order-123', value=order)
producer.flush()
Avro Consumer¶
from confluent_kafka import DeserializingConsumer
from confluent_kafka.serialization import StringDeserializer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer

schema_registry_conf = {'url': 'http://schema-registry:8081'}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)

avro_deserializer = AvroDeserializer(
    schema_registry_client,
    from_dict=lambda obj, ctx: obj
)

consumer_conf = {
    'bootstrap.servers': 'kafka:9092',
    'group.id': 'order-processors',
    'key.deserializer': StringDeserializer('utf_8'),
    'value.deserializer': avro_deserializer,
    'auto.offset.reset': 'earliest'
}

consumer = DeserializingConsumer(consumer_conf)
consumer.subscribe(['orders'])

while True:
    msg = consumer.poll(1.0)
    if msg and not msg.error():
        order = msg.value()
        print(f"Order: {order['id']} - ${order['amount']}")
Error Handling¶
Error Callback¶
from confluent_kafka import KafkaError

def error_callback(err):
    """Handle Kafka errors."""
    if err.code() == KafkaError._ALL_BROKERS_DOWN:
        print('All brokers are down')
    elif err.code() == KafkaError._AUTHENTICATION:
        print('Authentication failed')
    elif err.code() == KafkaError.TOPIC_AUTHORIZATION_FAILED:
        print('Topic authorization failed')
    else:
        print(f'Error: {err}')

config = {
    'bootstrap.servers': 'kafka:9092',
    'error_cb': error_callback
}
Delivery Error Handling¶
def delivery_callback(err, msg):
    if err is None:
        return

    # Check if retriable
    if err.retriable():
        print(f'Retriable error: {err}')
        # Producer will retry automatically
    elif err.code() == KafkaError.MSG_SIZE_TOO_LARGE:
        print('Message too large, sending to error topic')
        send_to_error_topic(msg)
    elif err.code() == KafkaError.TOPIC_AUTHORIZATION_FAILED:
        print('Authorization failed')
        raise Exception(f'Authorization error: {err}')
    else:
        print(f'Fatal error: {err}')
        raise Exception(f'Delivery failed: {err}')
Consumer Error Handling¶
from confluent_kafka import KafkaError, KafkaException

while running:
    try:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue

        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition - not an error
                continue
            elif msg.error().code() == KafkaError.UNKNOWN_TOPIC_OR_PART:
                print('Topic does not exist')
                continue
            else:
                raise KafkaException(msg.error())

        # Process message
        try:
            process_message(msg)
        except ProcessingError as e:
            send_to_dead_letter_queue(msg, e)

        consumer.commit()
    except KafkaException as e:
        print(f'Kafka error: {e}')
        if not e.args[0].retriable():
            raise
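The snippets above call send_to_error_topic and send_to_dead_letter_queue, which are application code rather than library functions. A minimal sketch of such a helper (the dlq_producer instance, the orders.dlq topic name, and the header names are assumptions) that republishes a failed message with failure metadata:

from confluent_kafka import Producer

dlq_producer = Producer({'bootstrap.servers': 'kafka:9092'})

def send_to_dead_letter_queue(msg, error, dlq_topic='orders.dlq'):
    """Republish a failed message to a dead-letter topic with failure metadata."""
    headers = list(msg.headers() or [])
    headers.extend([
        ('dlq-error', str(error).encode('utf-8')),
        ('dlq-original-topic', msg.topic().encode('utf-8')),
        ('dlq-original-partition', str(msg.partition()).encode('utf-8')),
        ('dlq-original-offset', str(msg.offset()).encode('utf-8')),
    ])
    dlq_producer.produce(dlq_topic, key=msg.key(), value=msg.value(), headers=headers)
    dlq_producer.flush()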
Async/Await Pattern¶
With asyncio¶
import asyncio
import threading
from confluent_kafka import Producer, KafkaException

class AsyncKafkaProducer:
    def __init__(self, config):
        self.producer = Producer(config)
        self.loop = asyncio.get_event_loop()
        self._closed = False
        # Background thread serves delivery reports so awaited futures resolve
        self._poll_thread = threading.Thread(target=self._poll_loop, daemon=True)
        self._poll_thread.start()

    def _poll_loop(self):
        while not self._closed:
            self.producer.poll(0.1)

    async def produce(self, topic, key, value):
        future = self.loop.create_future()

        def callback(err, msg):
            if err:
                self.loop.call_soon_threadsafe(future.set_exception, KafkaException(err))
            else:
                self.loop.call_soon_threadsafe(future.set_result, msg)

        self.producer.produce(topic, key=key, value=value, callback=callback)
        return await future

    async def flush(self):
        # flush() blocks, so run it off the event loop
        await self.loop.run_in_executor(None, self.producer.flush)
        self._closed = True

# Usage
async def main():
    producer = AsyncKafkaProducer(config)
    await producer.produce('orders', 'key', 'value')
    await producer.flush()

asyncio.run(main())
Testing¶
MockProducer Pattern¶
class MockMessage:
    """Minimal stand-in for confluent_kafka.Message used in delivery callbacks."""
    def __init__(self, topic, partition, offset, key, value):
        self._topic, self._partition, self._offset = topic, partition, offset
        self._key, self._value = key, value

    def topic(self): return self._topic
    def partition(self): return self._partition
    def offset(self): return self._offset
    def key(self): return self._key
    def value(self): return self._value

class MockProducer:
    def __init__(self):
        self.messages = []

    def produce(self, topic, key=None, value=None, callback=None, headers=None):
        msg = {'topic': topic, 'key': key, 'value': value, 'headers': headers}
        self.messages.append(msg)
        if callback:
            callback(None, MockMessage(topic, 0, 0, key, value))

    def flush(self):
        pass

    def poll(self, timeout):
        pass

# Usage in tests
def test_order_producer():
    mock = MockProducer()
    service = OrderService(producer=mock)
    service.create_order({'id': '123'})

    assert len(mock.messages) == 1
    assert mock.messages[0]['topic'] == 'orders'
Integration Testing¶
import pytest
from confluent_kafka import Producer, Consumer
from testcontainers.kafka import KafkaContainer

@pytest.fixture(scope='module')
def kafka_container():
    with KafkaContainer() as kafka:
        yield kafka

def test_produce_consume(kafka_container):
    config = {'bootstrap.servers': kafka_container.get_bootstrap_server()}

    # Produce
    producer = Producer(config)
    producer.produce('test', key='key', value='value')
    producer.flush()

    # Consume
    consumer_config = {**config, 'group.id': 'test', 'auto.offset.reset': 'earliest'}
    consumer = Consumer(consumer_config)
    consumer.subscribe(['test'])

    msg = consumer.poll(10.0)
    assert msg is not None
    assert msg.value() == b'value'
    consumer.close()
Related Documentation¶
- Producer Development - Producer patterns
- Consumer Development - Consumer patterns
- Schema Registry - Schema management