Kafka Rust Client¶
The rdkafka library provides a high-performance Rust client built on librdkafka with native async/await support. This guide covers installation, configuration, and usage patterns for Rust applications.
Client Information¶
| Library | rdkafka |
| Repository | github.com/fede1024/rust-rdkafka |
| Documentation | docs.rs/rdkafka |
| Package | crates.io |
| Current Version | 0.38.x (as of 2025) |
| Maintainer | Federico Giraud and community |
| License | MIT |
| Base | librdkafka (C library via FFI) |
History¶
The rust-rdkafka library was created by Federico Giraud in 2016, providing Rust bindings to librdkafka through the Foreign Function Interface (FFI). It was designed to provide idiomatic Rust APIs while leveraging the performance and reliability of librdkafka. Async/await support via the Tokio runtime was added in 2019. The library remains at version 0.x, indicating ongoing API evolution, though the core APIs are stable and widely used in production. It is the de facto standard Kafka client for the Rust ecosystem.
Feature Flags¶
The library supports various cargo features:
| Feature | Description |
|---|---|
cmake-build |
Build librdkafka from source |
ssl |
Enable SSL/TLS support |
sasl |
Enable SASL authentication |
tokio |
Async runtime support (default) |
smol |
Alternative async runtime |
Version Compatibility¶
| Client Version | librdkafka | Minimum Rust | Tokio |
|---|---|---|---|
| 0.38.x | 2.10.x+ | 1.70+ | 1.x |
| 0.36.x | 2.3.x+ | 1.70+ | 1.x |
| 0.34.x | 2.2.x+ | 1.65+ | 1.x |
Alternative: kafka-rust¶
A pure Rust implementation without librdkafka dependency (less maintained):
| Library | kafka |
| Repository | github.com/kafka-rust/kafka-rust |
| Note | Pure Rust but less feature-complete and less maintained |
External Resources¶
Installation¶
Cargo.toml¶
[dependencies]
rdkafka = { version = "0.36", features = ["cmake-build"] }
tokio = { version = "1", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
With SSL Support¶
[dependencies]
rdkafka = { version = "0.36", features = ["cmake-build", "ssl", "sasl"] }
Native Dependencies¶
The library requires librdkafka. The cmake-build feature compiles librdkafka from source. For system librdkafka:
[dependencies]
rdkafka = "0.36"
# macOS
brew install librdkafka
# Ubuntu/Debian
apt-get install librdkafka-dev
# RHEL/CentOS
yum install librdkafka-devel
Producer¶
Basic Producer¶
use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;
#[tokio::main]
async fn main() {
let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", "kafka:9092")
.set("client.id", "order-service")
.create()
.expect("Failed to create producer");
let topic = "orders";
let key = "order-123";
let payload = r#"{"id": 123, "amount": 99.99}"#;
let delivery_result = producer
.send(
FutureRecord::to(topic)
.key(key)
.payload(payload),
Duration::from_secs(5),
)
.await;
match delivery_result {
Ok((partition, offset)) => {
println!("Delivered to partition {} at offset {}", partition, offset);
}
Err((err, _)) => {
eprintln!("Delivery failed: {:?}", err);
}
}
}
Production Producer Configuration¶
use rdkafka::config::ClientConfig;
use rdkafka::producer::FutureProducer;
let producer: FutureProducer = ClientConfig::new()
// Connection
.set("bootstrap.servers", "kafka-1:9092,kafka-2:9092,kafka-3:9092")
.set("client.id", "order-service-producer")
// Durability
.set("acks", "all")
.set("enable.idempotence", "true")
// Retries
.set("retries", "2147483647")
.set("delivery.timeout.ms", "120000")
.set("max.in.flight.requests.per.connection", "5")
// Batching
.set("batch.size", "65536")
.set("linger.ms", "10")
.set("queue.buffering.max.messages", "100000")
.set("queue.buffering.max.kbytes", "1048576")
// Compression
.set("compression.type", "lz4")
.create()
.expect("Failed to create producer");
Send with Headers¶
use rdkafka::message::{Header, OwnedHeaders};
let headers = OwnedHeaders::new()
.insert(Header {
key: "correlation-id",
value: Some("abc-123"),
})
.insert(Header {
key: "source",
value: Some("order-service"),
});
let delivery_result = producer
.send(
FutureRecord::to("orders")
.key("order-123")
.payload(r#"{"id": 123}"#)
.headers(headers),
Duration::from_secs(5),
)
.await;
Async Producer with Error Handling¶
use rdkafka::error::KafkaError;
use rdkafka::producer::FutureProducer;
use std::time::Duration;
struct ProducerService {
producer: FutureProducer,
}
impl ProducerService {
fn new(bootstrap_servers: &str) -> Result<Self, KafkaError> {
let producer = ClientConfig::new()
.set("bootstrap.servers", bootstrap_servers)
.set("client.id", "order-service")
.set("acks", "all")
.set("enable.idempotence", "true")
.create()?;
Ok(Self { producer })
}
async fn send(
&self,
topic: &str,
key: &str,
payload: &str,
) -> Result<(i32, i64), KafkaError> {
let record = FutureRecord::to(topic).key(key).payload(payload);
match self.producer.send(record, Duration::from_secs(5)).await {
Ok((partition, offset)) => {
println!("Delivered to partition {} at offset {}", partition, offset);
Ok((partition, offset))
}
Err((err, _)) => {
eprintln!("Delivery failed: {:?}", err);
Err(err)
}
}
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let service = ProducerService::new("kafka:9092")?;
service.send("orders", "123", r#"{"id": 123}"#).await?;
Ok(())
}
ThreadedProducer for Fire-and-Forget¶
use rdkafka::config::ClientConfig;
use rdkafka::producer::{BaseRecord, ThreadedProducer};
let producer: ThreadedProducer<_> = ClientConfig::new()
.set("bootstrap.servers", "kafka:9092")
.create()
.expect("Failed to create producer");
// Fire-and-forget (callback-based)
producer
.send(
BaseRecord::to("orders")
.key("order-123")
.payload(r#"{"id": 123}"#),
)
.unwrap_or_else(|e| {
eprintln!("Failed to send: {:?}", e.0);
});
// Wait for all messages to be delivered
producer.flush(Duration::from_secs(10));
Consumer¶
Basic Consumer¶
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{StreamConsumer, Consumer};
use rdkafka::message::Message;
#[tokio::main]
async fn main() {
let consumer: StreamConsumer = ClientConfig::new()
.set("bootstrap.servers", "kafka:9092")
.set("group.id", "order-processors")
.set("auto.offset.reset", "earliest")
.set("enable.auto.commit", "false")
.create()
.expect("Failed to create consumer");
consumer
.subscribe(&["orders"])
.expect("Failed to subscribe");
loop {
match consumer.recv().await {
Ok(message) => {
let key = message.key().map(|k| String::from_utf8_lossy(k));
let payload = message.payload().map(|p| String::from_utf8_lossy(p));
println!(
"Received: key={:?} value={:?} partition={} offset={}",
key,
payload,
message.partition(),
message.offset()
);
consumer.commit_message(&message, rdkafka::consumer::CommitMode::Async)
.expect("Failed to commit");
}
Err(e) => {
eprintln!("Consumer error: {:?}", e);
}
}
}
}
Production Consumer Configuration¶
use rdkafka::config::ClientConfig;
use rdkafka::consumer::StreamConsumer;
let consumer: StreamConsumer = ClientConfig::new()
// Connection
.set("bootstrap.servers", "kafka-1:9092,kafka-2:9092,kafka-3:9092")
.set("group.id", "order-processors")
.set("client.id", "order-processor-1")
// Offset management
.set("enable.auto.commit", "false")
.set("auto.offset.reset", "earliest")
// Session management
.set("session.timeout.ms", "45000")
.set("heartbeat.interval.ms", "15000")
.set("max.poll.interval.ms", "300000")
// Fetch configuration
.set("fetch.min.bytes", "1")
.set("fetch.max.bytes", "52428800")
.set("max.partition.fetch.bytes", "1048576")
// Assignment strategy
.set("partition.assignment.strategy", "cooperative-sticky")
// Isolation level
.set("isolation.level", "read_committed")
.create()
.expect("Failed to create consumer");
Stream-based Consumer¶
use rdkafka::consumer::StreamConsumer;
use futures::StreamExt;
#[tokio::main]
async fn main() {
let consumer: StreamConsumer = ClientConfig::new()
.set("bootstrap.servers", "kafka:9092")
.set("group.id", "order-processors")
.set("enable.auto.commit", "false")
.create()
.expect("Failed to create consumer");
consumer.subscribe(&["orders"]).expect("Failed to subscribe");
let mut message_stream = consumer.stream();
while let Some(message) = message_stream.next().await {
match message {
Ok(msg) => {
if let Some(payload) = msg.payload() {
let value = String::from_utf8_lossy(payload);
println!("Received: {}", value);
process_message(&value).await;
consumer
.commit_message(&msg, rdkafka::consumer::CommitMode::Async)
.expect("Failed to commit");
}
}
Err(e) => {
eprintln!("Stream error: {:?}", e);
}
}
}
}
async fn process_message(value: &str) {
// Process message
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
Consumer with Graceful Shutdown¶
use rdkafka::consumer::{Consumer, StreamConsumer};
use tokio::signal;
use futures::StreamExt;
#[tokio::main]
async fn main() {
let consumer: StreamConsumer = ClientConfig::new()
.set("bootstrap.servers", "kafka:9092")
.set("group.id", "order-processors")
.create()
.expect("Failed to create consumer");
consumer.subscribe(&["orders"]).expect("Failed to subscribe");
let mut message_stream = consumer.stream();
loop {
tokio::select! {
_ = signal::ctrl_c() => {
println!("Shutting down...");
break;
}
message = message_stream.next() => {
match message {
Some(Ok(msg)) => {
process_message(&msg).await;
consumer.commit_message(&msg, rdkafka::consumer::CommitMode::Async)
.expect("Failed to commit");
}
Some(Err(e)) => {
eprintln!("Error: {:?}", e);
}
None => break,
}
}
}
}
println!("Consumer stopped");
}
async fn process_message(msg: &rdkafka::message::BorrowedMessage<'_>) {
if let Some(payload) = msg.payload() {
let value = String::from_utf8_lossy(payload);
println!("Processing: {}", value);
}
}
Batch Processing with Buffer¶
use rdkafka::consumer::StreamConsumer;
use futures::StreamExt;
use tokio::time::{Duration, timeout};
async fn consume_batch(
consumer: &StreamConsumer,
batch_size: usize,
timeout_duration: Duration,
) -> Vec<rdkafka::message::OwnedMessage> {
let mut batch = Vec::with_capacity(batch_size);
let mut message_stream = consumer.stream();
let batch_future = async {
while batch.len() < batch_size {
if let Some(Ok(msg)) = message_stream.next().await {
batch.push(msg.detach());
} else {
break;
}
}
batch
};
timeout(timeout_duration, batch_future)
.await
.unwrap_or_else(|_| batch)
}
#[tokio::main]
async fn main() {
let consumer: StreamConsumer = ClientConfig::new()
.set("bootstrap.servers", "kafka:9092")
.set("group.id", "order-processors")
.set("enable.auto.commit", "false")
.create()
.expect("Failed to create consumer");
consumer.subscribe(&["orders"]).expect("Failed to subscribe");
loop {
let batch = consume_batch(&consumer, 500, Duration::from_secs(1)).await;
if !batch.is_empty() {
println!("Processing batch of {} messages", batch.len());
for msg in &batch {
process_owned_message(msg).await;
}
// Commit last message
if let Some(last_msg) = batch.last() {
consumer
.commit_consumer_state(rdkafka::consumer::CommitMode::Async)
.expect("Failed to commit");
}
}
}
}
async fn process_owned_message(msg: &rdkafka::message::OwnedMessage) {
if let Some(payload) = msg.payload() {
let value = String::from_utf8_lossy(payload);
println!("Processed: {}", value);
}
}
Manual Partition Assignment¶
use rdkafka::consumer::{Consumer, BaseConsumer};
use rdkafka::topic_partition_list::TopicPartitionList;
use rdkafka::Offset;
let consumer: BaseConsumer = ClientConfig::new()
.set("bootstrap.servers", "kafka:9092")
.set("group.id", "order-processors")
.create()
.expect("Failed to create consumer");
// Assign specific partitions
let mut tpl = TopicPartitionList::new();
tpl.add_partition("orders", 0);
tpl.add_partition("orders", 1);
consumer.assign(&tpl).expect("Failed to assign partitions");
// Seek to specific offset
let mut tpl_seek = TopicPartitionList::new();
tpl_seek.add_partition_offset("orders", 0, Offset::Offset(1000))
.expect("Failed to add partition");
consumer.seek_partitions(tpl_seek, Duration::from_secs(5))
.expect("Failed to seek");
Error Handling¶
Producer Error Handling¶
use rdkafka::error::{KafkaError, RDKafkaErrorCode};
async fn send_with_error_handling(
producer: &FutureProducer,
topic: &str,
key: &str,
payload: &str,
) -> Result<(i32, i64), String> {
let record = FutureRecord::to(topic).key(key).payload(payload);
match producer.send(record, Duration::from_secs(5)).await {
Ok((partition, offset)) => Ok((partition, offset)),
Err((err, _)) => {
match err {
KafkaError::MessageProduction(code) => {
match code {
RDKafkaErrorCode::MessageSizeTooLarge => {
eprintln!("Message too large, sending to DLQ");
Err("Message too large".to_string())
}
RDKafkaErrorCode::TopicAuthorizationFailed => {
eprintln!("Authorization failed");
Err("Not authorized".to_string())
}
_ => {
eprintln!("Production error: {:?}", code);
Err(format!("Production error: {:?}", code))
}
}
}
_ => {
eprintln!("Kafka error: {:?}", err);
Err(format!("Kafka error: {:?}", err))
}
}
}
}
}
Consumer Error Handling¶
use rdkafka::error::KafkaError;
use rdkafka::consumer::ConsumerContext;
use rdkafka::client::ClientContext;
// Custom context for error handling
struct CustomContext;
impl ClientContext for CustomContext {}
impl ConsumerContext for CustomContext {
fn commit_callback(&self, result: rdkafka::error::KafkaResult<()>, _offsets: &rdkafka::TopicPartitionList) {
match result {
Ok(_) => println!("Offset committed successfully"),
Err(e) => eprintln!("Commit error: {:?}", e),
}
}
}
#[tokio::main]
async fn main() {
let context = CustomContext;
let consumer: StreamConsumer<CustomContext> = ClientConfig::new()
.set("bootstrap.servers", "kafka:9092")
.set("group.id", "order-processors")
.create_with_context(context)
.expect("Failed to create consumer");
consumer.subscribe(&["orders"]).expect("Failed to subscribe");
loop {
match consumer.recv().await {
Ok(msg) => {
match process_message(&msg).await {
Ok(_) => {
consumer
.commit_message(&msg, rdkafka::consumer::CommitMode::Async)
.unwrap_or_else(|e| eprintln!("Commit error: {:?}", e));
}
Err(e) => {
eprintln!("Processing error: {:?}", e);
// Send to DLQ
send_to_dlq(&msg).await;
// Still commit to move forward
consumer
.commit_message(&msg, rdkafka::consumer::CommitMode::Async)
.unwrap_or_else(|e| eprintln!("Commit error: {:?}", e));
}
}
}
Err(e) => {
eprintln!("Consumer error: {:?}", e);
}
}
}
}
async fn process_message(msg: &rdkafka::message::BorrowedMessage<'_>) -> Result<(), String> {
// Process message
Ok(())
}
async fn send_to_dlq(msg: &rdkafka::message::BorrowedMessage<'_>) {
// Send to dead letter queue
}
Retry with Exponential Backoff¶
use tokio::time::{sleep, Duration};
async fn send_with_retry(
producer: &FutureProducer,
topic: &str,
key: &str,
payload: &str,
max_retries: u32,
) -> Result<(i32, i64), KafkaError> {
let mut attempt = 0;
loop {
let record = FutureRecord::to(topic).key(key).payload(payload);
match producer.send(record, Duration::from_secs(5)).await {
Ok((partition, offset)) => return Ok((partition, offset)),
Err((err, _)) => {
attempt += 1;
if attempt >= max_retries {
return Err(err);
}
// Exponential backoff
let backoff = Duration::from_millis(100 * 2u64.pow(attempt));
let backoff = backoff.min(Duration::from_secs(30));
eprintln!("Retry attempt {} after {:?}", attempt, backoff);
sleep(backoff).await;
}
}
}
}
Testing¶
Mock Producer¶
#[cfg(test)]
mod tests {
use std::sync::{Arc, Mutex};
struct MockProducer {
messages: Arc<Mutex<Vec<(String, String, String)>>>,
}
impl MockProducer {
fn new() -> Self {
Self {
messages: Arc::new(Mutex::new(Vec::new())),
}
}
async fn send(&self, topic: &str, key: &str, payload: &str) -> Result<(), String> {
self.messages
.lock()
.unwrap()
.push((topic.to_string(), key.to_string(), payload.to_string()));
Ok(())
}
fn get_messages(&self) -> Vec<(String, String, String)> {
self.messages.lock().unwrap().clone()
}
}
#[tokio::test]
async fn test_order_service() {
let mock = MockProducer::new();
mock.send("orders", "123", r#"{"id": 123}"#)
.await
.expect("Failed to send");
let messages = mock.get_messages();
assert_eq!(messages.len(), 1);
assert_eq!(messages[0].0, "orders");
assert_eq!(messages[0].1, "123");
}
}
Integration Testing¶
#[cfg(test)]
mod integration_tests {
use super::*;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::producer::FutureProducer;
use tokio::time::Duration;
async fn setup_kafka() -> (FutureProducer, StreamConsumer) {
let bootstrap_servers = "localhost:9092";
let producer = ClientConfig::new()
.set("bootstrap.servers", bootstrap_servers)
.create()
.expect("Failed to create producer");
let consumer = ClientConfig::new()
.set("bootstrap.servers", bootstrap_servers)
.set("group.id", "test")
.set("auto.offset.reset", "earliest")
.create()
.expect("Failed to create consumer");
(producer, consumer)
}
#[tokio::test]
#[ignore] // Run with: cargo test -- --ignored
async fn test_produce_consume() {
let (producer, consumer) = setup_kafka().await;
consumer.subscribe(&["test"]).expect("Failed to subscribe");
// Produce
let record = FutureRecord::to("test")
.key("key")
.payload("value");
producer
.send(record, Duration::from_secs(5))
.await
.expect("Failed to produce");
// Consume
let msg = consumer.recv().await.expect("Failed to consume");
assert_eq!(
msg.payload().map(|p| String::from_utf8_lossy(p)),
Some(std::borrow::Cow::from("value"))
);
}
}
Property-based Testing with Proptest¶
#[cfg(test)]
mod proptests {
use proptest::prelude::*;
proptest! {
#[test]
fn test_message_serialization(
id in 0u64..1000000,
amount in 0.0f64..1000000.0
) {
let json = serde_json::json!({
"id": id,
"amount": amount
});
let serialized = serde_json::to_string(&json).unwrap();
let deserialized: serde_json::Value = serde_json::from_str(&serialized).unwrap();
assert_eq!(deserialized["id"], id);
assert_eq!(deserialized["amount"], amount);
}
}
}
Tokio Integration¶
Multi-threaded Runtime¶
use rdkafka::consumer::StreamConsumer;
use tokio::runtime::Runtime;
use futures::StreamExt;
fn main() {
let runtime = Runtime::new().unwrap();
runtime.block_on(async {
let consumer: StreamConsumer = ClientConfig::new()
.set("bootstrap.servers", "kafka:9092")
.set("group.id", "order-processors")
.create()
.expect("Failed to create consumer");
consumer.subscribe(&["orders"]).expect("Failed to subscribe");
let mut message_stream = consumer.stream();
while let Some(message) = message_stream.next().await {
match message {
Ok(msg) => {
tokio::spawn(async move {
process_message_async(&msg).await;
});
}
Err(e) => eprintln!("Error: {:?}", e),
}
}
});
}
async fn process_message_async(msg: &rdkafka::message::BorrowedMessage<'_>) {
// Async processing
}
Concurrent Consumer Pool¶
use rdkafka::consumer::StreamConsumer;
use futures::StreamExt;
use std::sync::Arc;
#[tokio::main]
async fn main() {
let num_consumers = 4;
let mut handles = Vec::new();
for i in 0..num_consumers {
let handle = tokio::spawn(async move {
let consumer: StreamConsumer = ClientConfig::new()
.set("bootstrap.servers", "kafka:9092")
.set("group.id", "order-processors")
.set("client.id", &format!("consumer-{}", i))
.create()
.expect("Failed to create consumer");
consumer.subscribe(&["orders"]).expect("Failed to subscribe");
let mut message_stream = consumer.stream();
while let Some(message) = message_stream.next().await {
match message {
Ok(msg) => {
process_message(&msg).await;
consumer
.commit_message(&msg, rdkafka::consumer::CommitMode::Async)
.expect("Failed to commit");
}
Err(e) => eprintln!("Error: {:?}", e),
}
}
});
handles.push(handle);
}
// Wait for all consumers
for handle in handles {
handle.await.unwrap();
}
}
async fn process_message(msg: &rdkafka::message::BorrowedMessage<'_>) {
// Process message
}
Related Documentation¶
- Producer Development - Producer patterns
- Consumer Development - Consumer patterns
- Transactions - Transaction patterns