Kafka Python Driver

The confluent-kafka-python library provides a high-performance Python client for Apache Kafka, built on top of the librdkafka C library.


Installation

pip install confluent-kafka

With Avro Support

pip install "confluent-kafka[avro]"

With Schema Registry

pip install "confluent-kafka[schemaregistry]"
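
Verify the Installation

To confirm which client and bundled librdkafka versions are installed, the module exposes version helpers:

import confluent_kafka

print(confluent_kafka.version())     # client version as (str, int)
print(confluent_kafka.libversion())  # bundled librdkafka version as (str, int)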

Producer

Basic Producer

from confluent_kafka import Producer

conf = {
    'bootstrap.servers': 'localhost:9092'
}

producer = Producer(conf)

def delivery_callback(err, msg):
    if err:
        print(f'Delivery failed: {err}')
    else:
        print(f'Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}')

producer.produce('events', key='key', value='value', callback=delivery_callback)
producer.flush()

Reliable Producer

from confluent_kafka import Producer

conf = {
    'bootstrap.servers': 'localhost:9092',
    'acks': 'all',                                # wait for all in-sync replicas
    'retries': 10,                                # retry transient send failures
    'retry.backoff.ms': 100,                      # delay between retries
    'enable.idempotence': True,                   # duplicate-free, in-order delivery per partition
    'max.in.flight.requests.per.connection': 5,   # the maximum allowed with idempotence
    'linger.ms': 5,                               # small delay to allow batching
    'batch.size': 16384,                          # maximum batch size in bytes
    'compression.type': 'lz4'                     # compress batches on the wire
}

producer = Producer(conf)

Async Producer with Polling

from confluent_kafka import Producer
import time

producer = Producer({'bootstrap.servers': 'localhost:9092'})

def delivery_callback(err, msg):
    if err:
        print(f'Error: {err}')

# Produce messages
for i in range(1000):
    producer.produce('events', value=f'message-{i}', callback=delivery_callback)

    # Periodically poll for callbacks
    producer.poll(0)

# Wait for all messages to be delivered
producer.flush()
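
Handling a Full Local Queue

produce() is asynchronous and only enqueues the message into a local buffer; if the buffer fills faster than delivery reports drain it, the call raises BufferError. A minimal sketch of the usual back-pressure handling (serve callbacks with poll(), then retry):

from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:9092'})

def produce_with_backpressure(topic, value):
    while True:
        try:
            producer.produce(topic, value=value)
            return
        except BufferError:
            # Local queue is full: serve delivery reports to free space, then retry
            producer.poll(1.0)

for i in range(1000000):
    produce_with_backpressure('events', f'message-{i}')

producer.flush()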

Consumer

Basic Consumer

from confluent_kafka import Consumer, KafkaException

conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest'
}

consumer = Consumer(conf)
consumer.subscribe(['events'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)

        if msg is None:
            continue

        if msg.error():
            raise KafkaException(msg.error())

        print(f'Received: key={msg.key()} value={msg.value().decode("utf-8")}')
finally:
    consumer.close()

Manual Commit

from confluent_kafka import Consumer

conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'enable.auto.commit': False,
    'auto.offset.reset': 'earliest'
}

consumer = Consumer(conf)
consumer.subscribe(['events'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)

        if msg is None:
            continue

        if msg.error():
            continue

        # Process message
        process(msg)

        # Commit after processing
        consumer.commit(asynchronous=False)
finally:
    consumer.close()
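
commit() with no arguments, as above, commits the consumer's current offsets for all assigned partitions. It can also take the message itself to commit just that message's position (offset + 1):

# Commit only this message's position (offset + 1)
consumer.commit(message=msg, asynchronous=False)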

Batch Processing

from confluent_kafka import Consumer

conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'enable.auto.commit': False,
    'max.poll.interval.ms': 300000
}

consumer = Consumer(conf)
consumer.subscribe(['events'])

try:
    while True:
        messages = consumer.consume(num_messages=100, timeout=1.0)

        if not messages:
            continue

        batch = []
        for msg in messages:
            if msg.error():
                continue
            batch.append(msg.value())

        # Process batch
        process_batch(batch)

        # Commit after batch
        consumer.commit(asynchronous=False)
finally:
    consumer.close()

Admin Client

Create Topic

from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({'bootstrap.servers': 'localhost:9092'})

new_topic = NewTopic('my-topic', num_partitions=3, replication_factor=3)
futures = admin.create_topics([new_topic])

for topic, future in futures.items():
    try:
        future.result()
        print(f'Topic {topic} created')
    except Exception as e:
        print(f'Failed to create topic {topic}: {e}')
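
NewTopic also accepts a config dict of per-topic setting overrides, passed as strings. A brief sketch, assuming a 7-day retention override is wanted:

from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({'bootstrap.servers': 'localhost:9092'})

new_topic = NewTopic(
    'my-topic',
    num_partitions=3,
    replication_factor=3,
    config={'retention.ms': '604800000'}  # 7 days
)
admin.create_topics([new_topic])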

List Topics

from confluent_kafka.admin import AdminClient

admin = AdminClient({'bootstrap.servers': 'localhost:9092'})

metadata = admin.list_topics(timeout=10)

for topic in metadata.topics.values():
    print(f'Topic: {topic.topic}, Partitions: {len(topic.partitions)}')

Avro Serialization

Producer with Avro

from confluent_kafka import Producer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer

schema_registry_conf = {'url': 'http://localhost:8081'}
schema_registry_client = SchemaRegistryClient(schema_registry_conf)

schema_str = """
{
    "type": "record",
    "name": "Event",
    "fields": [
        {"name": "id", "type": "string"},
        {"name": "timestamp", "type": "long"},
        {"name": "data", "type": "string"}
    ]
}
"""

avro_serializer = AvroSerializer(schema_registry_client, schema_str)

producer_conf = {'bootstrap.servers': 'localhost:9092'}
producer = Producer(producer_conf)

def delivery_callback(err, msg):
    if err:
        print(f'Delivery failed: {err}')

event = {'id': '123', 'timestamp': 1699900000, 'data': 'test'}

producer.produce(
    topic='events',
    key='key',
    value=avro_serializer(event, SerializationContext('events', MessageField.VALUE)),
    callback=delivery_callback
)
producer.flush()

Consumer with Avro

from confluent_kafka import Consumer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer

schema_registry_client = SchemaRegistryClient({'url': 'http://localhost:8081'})
avro_deserializer = AvroDeserializer(schema_registry_client)

consumer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest'
}

consumer = Consumer(consumer_conf)
consumer.subscribe(['events'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            continue

        event = avro_deserializer(msg.value(), SerializationContext(msg.topic(), MessageField.VALUE))
        print(f'Received: {event}')
finally:
    consumer.close()

Configuration Reference

Producer

Property              Description                          Default
acks                  Broker acknowledgments to wait for   -1 (all)
retries               Send retry count                     2147483647
batch.size            Maximum batch size (bytes)           1000000
linger.ms             Batching delay (ms)                  5
enable.idempotence    Idempotent, in-order delivery        false
compression.type      Compression codec                    none

Consumer

Property                Description                                Default
group.id                Consumer group id                          required
auto.offset.reset       Start position with no committed offset    latest
enable.auto.commit      Commit offsets automatically               true
max.poll.interval.ms    Maximum delay between polls (ms)           300000
session.timeout.ms      Group session timeout (ms)                 45000
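
Pulled together, a consumer configuration using these properties might look like this (values are illustrative, not recommendations):

conf = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group',               # required before subscribe()
    'auto.offset.reset': 'earliest',      # read from the start when no committed offset exists
    'enable.auto.commit': False,          # commit manually after processing
    'max.poll.interval.ms': 300000,       # allow up to 5 minutes between polls
    'session.timeout.ms': 45000           # group liveness timeout
}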

Error Handling

from confluent_kafka import KafkaException, KafkaError

while True:
    try:
        msg = consumer.poll(timeout=1.0)

        if msg is None:
            continue

        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                # End of partition (reported only when enable.partition.eof is set)
                continue
            elif msg.error().code() == KafkaError.UNKNOWN_TOPIC_OR_PART:
                # Topic doesn't exist
                raise KafkaException(msg.error())
            else:
                raise KafkaException(msg.error())

        # Process message
        process(msg)

    except KafkaException as e:
        print(f'Kafka error: {e}')
    except Exception as e:
        print(f'Error: {e}')

Graceful Shutdown

import signal
from confluent_kafka import Consumer

running = True

def shutdown(signum, frame):
    global running
    running = False

signal.signal(signal.SIGINT, shutdown)
signal.signal(signal.SIGTERM, shutdown)

consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'my-group'
})
consumer.subscribe(['events'])

try:
    while running:
        msg = consumer.poll(timeout=1.0)
        if msg and not msg.error():
            process(msg)
finally:
    consumer.close()
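
The producer side of a shutdown is simpler: flush() blocks until outstanding messages are delivered or the timeout expires, returning the count of messages still queued:

from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:9092'})

# ... produce messages ...

# On shutdown, wait up to 10 seconds for outstanding deliveries
remaining = producer.flush(10)
if remaining > 0:
    print(f'{remaining} messages were not delivered')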