Kafka .NET Client
The Confluent.Kafka library provides a high-performance .NET client built on librdkafka. This guide covers installation, configuration, and usage patterns for .NET applications.
History
The .NET Kafka client was originally released as RdKafka in 2016 by Andreas Heider, providing P/Invoke bindings to librdkafka. Confluent took over the project and rebranded it as Confluent.Kafka in 2017, investing in improved APIs and documentation. The library uses Platform Invocation Services (P/Invoke) to call librdkafka, which is bundled as native binaries for Windows, Linux, and macOS. The library supports .NET Standard 2.0+, .NET Framework 4.6.2+, and .NET Core 2.0+, making it suitable for both legacy and modern .NET applications.
| Platform | Architecture | Notes |
|----------|--------------|-------|
| Windows | x64, x86 | Native libraries bundled |
| Linux | x64, arm64 | Native libraries bundled |
| macOS | x64, arm64 | Native libraries bundled |
Version Compatibility
| Client Version | librdkafka | .NET | .NET Framework |
|----------------|------------|------|----------------|
| 2.12.x | 2.12.x | 6.0+ | 4.6.2+ |
| 2.10.x | 2.10.x | 6.0+ | 4.6.2+ |
| 2.6.x | 2.6.x | 6.0+ | 4.6.2+ |
| 2.3.x | 2.3.x | 6.0+ | 4.6.2+ |
Installation
NuGet Package Manager
Install-Package Confluent.Kafka
.NET CLI
dotnet add package Confluent.Kafka
PackageReference
<PackageReference Include="Confluent.Kafka" Version="2.12.0" />
With Schema Registry
dotnet add package Confluent.SchemaRegistry
dotnet add package Confluent.SchemaRegistry.Serdes.Avro
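These Schema Registry packages are not exercised elsewhere in this guide. A minimal producer sketch using Avro serialization, assuming a registry at http://schemaregistry:8081 and an Order class generated from an Avro schema (both hypothetical):
using Confluent.Kafka;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;
var schemaRegistryConfig = new SchemaRegistryConfig { Url = "http://schemaregistry:8081" };
using var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);
var producerConfig = new ProducerConfig { BootstrapServers = "kafka:9092" };
// AvroSerializer registers/looks up the schema in Schema Registry on first use
using var producer = new ProducerBuilder<string, Order>(producerConfig)
    .SetValueSerializer(new AvroSerializer<Order>(schemaRegistry))
    .Build();
await producer.ProduceAsync("orders", new Message<string, Order>
{
    Key = "order-123",
    Value = new Order { Id = 123, Amount = 99.99 }
});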
Producer
Basic Producer
using Confluent.Kafka;
using System;
using System.Threading.Tasks;
class Program
{
static async Task Main(string[] args)
{
var config = new ProducerConfig
{
BootstrapServers = "kafka:9092",
ClientId = "order-service"
};
using (var producer = new ProducerBuilder<string, string>(config).Build())
{
try
{
var result = await producer.ProduceAsync("orders",
new Message<string, string>
{
Key = "order-123",
Value = "{\"id\": 123, \"amount\": 99.99}"
});
Console.WriteLine($"Delivered to {result.TopicPartitionOffset}");
}
catch (ProduceException<string, string> e)
{
Console.WriteLine($"Delivery failed: {e.Error.Reason}");
}
}
}
}
Production Producer Configuration
var config = new ProducerConfig
{
// Connection
BootstrapServers = "kafka-1:9092,kafka-2:9092,kafka-3:9092",
ClientId = "order-service-producer",
// Durability
Acks = Acks.All,
EnableIdempotence = true,
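// Note: idempotence requires Acks.All and at most 5 in-flight requests (both satisfied here)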
// Retries
MessageSendMaxRetries = int.MaxValue,
RequestTimeoutMs = 30000,
MessageTimeoutMs = 120000,
MaxInFlight = 5,
// Batching
BatchSize = 65536,
LingerMs = 10,
QueueBufferingMaxMessages = 100000,
QueueBufferingMaxKbytes = 1048576,
// Compression
CompressionType = CompressionType.Lz4,
// Error handling
EnableDeliveryReports = true,
DeliveryReportFields = "all"
};
Asynchronous Send
using var producer = new ProducerBuilder<string, string>(config).Build();
// Callback-based send: higher throughput than awaiting each message
// (omit the delivery handler for true fire-and-forget, at the cost of losing delivery feedback)
producer.Produce("orders",
new Message<string, string> { Key = "key", Value = "value" },
deliveryHandler: (deliveryReport) =>
{
if (deliveryReport.Error.Code != ErrorCode.NoError)
{
Console.WriteLine($"Delivery failed: {deliveryReport.Error.Reason}");
}
else
{
Console.WriteLine($"Delivered to {deliveryReport.TopicPartitionOffset}");
}
});
// Async/await pattern
var deliveryResult = await producer.ProduceAsync("orders",
new Message<string, string> { Key = "key", Value = "value" });
Console.WriteLine($"Delivered to partition {deliveryResult.Partition} at offset {deliveryResult.Offset}");
Message Headers
var message = new Message<string, string>
{
Key = "order-123",
Value = "{\"id\": 123}",
Headers = new Headers
{
{ "correlation-id", Encoding.UTF8.GetBytes("abc-123") },
{ "source", Encoding.UTF8.GetBytes("order-service") }
}
};
await producer.ProduceAsync("orders", message);
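On the consuming side, headers can be read back from a ConsumeResult. A brief sketch, where cr is assumed to be a ConsumeResult<string, string> obtained from consumer.Consume (as in the consumer examples below):
// Read the last value written for a header key
if (cr.Message.Headers != null &&
    cr.Message.Headers.TryGetLastBytes("correlation-id", out var correlationBytes))
{
    var correlationId = Encoding.UTF8.GetString(correlationBytes);
    Console.WriteLine($"Correlation ID: {correlationId}");
}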
Custom Partitioner
// Confluent.Kafka exposes custom partitioning through a delegate
// (SetDefaultPartitioner / SetPartitioner), not an IPartitioner interface.
var config = new ProducerConfig
{
BootstrapServers = "kafka:9092"
};
var producer = new ProducerBuilder<string, string>(config)
.SetDefaultPartitioner((topic, partitionCount, keyData, keyIsNull) =>
{
if (keyIsNull)
{
return Random.Shared.Next(partitionCount);
}
var key = Encoding.UTF8.GetString(keyData);
// Route by region prefix
if (key.StartsWith("US-")) return 0;
if (key.StartsWith("EU-")) return 1;
// Default: hash-based (string.GetHashCode is not stable across
// processes; use a stable hash if placement must survive restarts)
return Math.Abs(key.GetHashCode()) % partitionCount;
})
.Build();
Producer Service with Dependency Injection
public interface IKafkaProducer
{
Task<DeliveryResult<string, string>> SendAsync(string topic, string key, string value);
}
public class KafkaProducer : IKafkaProducer, IDisposable
{
private readonly IProducer<string, string> _producer;
private readonly ILogger<KafkaProducer> _logger;
public KafkaProducer(IConfiguration configuration, ILogger<KafkaProducer> logger)
{
_logger = logger;
var config = new ProducerConfig
{
BootstrapServers = configuration["Kafka:BootstrapServers"],
ClientId = configuration["Kafka:ClientId"],
Acks = Acks.All,
EnableIdempotence = true,
CompressionType = CompressionType.Lz4
};
_producer = new ProducerBuilder<string, string>(config)
.SetErrorHandler((_, error) =>
{
_logger.LogError("Producer error: {Error}", error.Reason);
})
.Build();
}
public async Task<DeliveryResult<string, string>> SendAsync(string topic, string key, string value)
{
try
{
var result = await _producer.ProduceAsync(topic,
new Message<string, string> { Key = key, Value = value });
_logger.LogInformation(
"Message delivered to {Topic} [{Partition}] at offset {Offset}",
result.Topic, result.Partition, result.Offset);
return result;
}
catch (ProduceException<string, string> ex)
{
_logger.LogError(ex, "Failed to deliver message: {Reason}", ex.Error.Reason);
throw;
}
}
public void Dispose()
{
_producer?.Flush(TimeSpan.FromSeconds(10));
_producer?.Dispose();
}
}
// Startup.cs / Program.cs
services.AddSingleton<IKafkaProducer, KafkaProducer>();
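For illustration, the registered service could be consumed from an ASP.NET Core minimal-API endpoint; the route, the Order type, and the use of System.Text.Json here are assumptions, not part of the service above:
// Hypothetical endpoint resolving IKafkaProducer from DI
app.MapPost("/orders", async (Order order, IKafkaProducer producer) =>
{
    await producer.SendAsync("orders", order.Id, JsonSerializer.Serialize(order));
    return Results.Accepted();
});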
Consumer
Basic Consumer
using Confluent.Kafka;
using System;
class Program
{
static void Main(string[] args)
{
var config = new ConsumerConfig
{
BootstrapServers = "kafka:9092",
GroupId = "order-processors",
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = false
};
using (var consumer = new ConsumerBuilder<string, string>(config).Build())
{
consumer.Subscribe("orders");
try
{
while (true)
{
var cr = consumer.Consume(TimeSpan.FromMilliseconds(100));
if (cr != null)
{
Console.WriteLine($"Received: {cr.Message.Key} = {cr.Message.Value}");
Console.WriteLine($"Partition: {cr.Partition}, Offset: {cr.Offset}");
consumer.Commit(cr);
}
}
}
catch (ConsumeException e)
{
Console.WriteLine($"Error: {e.Error.Reason}");
}
finally
{
consumer.Close();
}
}
}
}
Production Consumer Configuration
var config = new ConsumerConfig
{
// Connection
BootstrapServers = "kafka-1:9092,kafka-2:9092,kafka-3:9092",
GroupId = "order-processors",
ClientId = "order-processor-1",
// Offset management
EnableAutoCommit = false,
AutoOffsetReset = AutoOffsetReset.Earliest,
// Session management
SessionTimeoutMs = 45000,
HeartbeatIntervalMs = 15000,
MaxPollIntervalMs = 300000,
// Fetch configuration
FetchMinBytes = 1,
FetchMaxBytes = 52428800,
MaxPartitionFetchBytes = 1048576,
// Assignment strategy
PartitionAssignmentStrategy = PartitionAssignmentStrategy.CooperativeSticky,
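// Cooperative sticky rebalancing is incremental: only reassigned partitions are revoked, not the whole assignment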
// Performance
IsolationLevel = IsolationLevel.ReadCommitted
};
Async Consumer Pattern
public class ConsumerService : BackgroundService
{
private readonly ILogger<ConsumerService> _logger;
private readonly IConsumer<string, string> _consumer;
public ConsumerService(IConfiguration configuration, ILogger<ConsumerService> logger)
{
_logger = logger;
var config = new ConsumerConfig
{
BootstrapServers = configuration["Kafka:BootstrapServers"],
GroupId = "order-processors",
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = false
};
_consumer = new ConsumerBuilder<string, string>(config)
.SetErrorHandler((_, e) => _logger.LogError("Consumer error: {Reason}", e.Reason))
.Build();
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
// Consume blocks the calling thread; yield first so host startup is not held up
await Task.Yield();
_consumer.Subscribe("orders");
try
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
var cr = _consumer.Consume(stoppingToken);
if (cr != null)
{
await ProcessMessageAsync(cr.Message, stoppingToken);
_consumer.Commit(cr);
}
}
catch (ConsumeException ex)
{
_logger.LogError(ex, "Consume error: {Reason}", ex.Error.Reason);
}
}
}
finally
{
_consumer.Close();
}
}
private async Task ProcessMessageAsync(Message<string, string> message, CancellationToken cancellationToken)
{
_logger.LogInformation("Processing message: {Key} = {Value}", message.Key, message.Value);
await Task.Delay(100, cancellationToken); // Simulate processing
}
public override void Dispose()
{
_consumer?.Dispose();
base.Dispose();
}
}
// Program.cs
services.AddHostedService<ConsumerService>();
Batch Processing
var batch = new List<ConsumeResult<string, string>>();
var batchSize = 500;
var timeout = TimeSpan.FromSeconds(1);
while (!cancellationToken.IsCancellationRequested)
{
batch.Clear();
var deadline = DateTime.UtcNow.Add(timeout);
while (batch.Count < batchSize && DateTime.UtcNow < deadline)
{
var remaining = deadline - DateTime.UtcNow;
if (remaining <= TimeSpan.Zero) break;
var cr = consumer.Consume(remaining);
if (cr != null)
{
batch.Add(cr);
}
}
if (batch.Count > 0)
{
await ProcessBatchAsync(batch);
// A batch may span several partitions; committing only the last
// ConsumeResult would commit just that one partition. Commit the
// next offset for every partition seen in the batch instead.
var offsets = batch
.GroupBy(r => r.TopicPartition)
.Select(g => new TopicPartitionOffset(g.Key, g.Max(r => r.Offset.Value) + 1));
consumer.Commit(offsets);
}
}
Rebalance Callbacks
var consumer = new ConsumerBuilder<string, string>(config)
.SetPartitionsAssignedHandler((c, partitions) =>
{
_logger.LogInformation("Partitions assigned: {Partitions}",
string.Join(", ", partitions));
// Initialize state for new partitions
})
.SetPartitionsRevokedHandler((c, partitions) =>
{
_logger.LogInformation("Partitions revoked: {Partitions}",
string.Join(", ", partitions));
// Commit current positions before the partitions are reassigned
// (the handler's offset arguments may be unset, so commit the
// consumer's current positions instead)
try
{
c.Commit();
}
catch (KafkaException ex)
{
_logger.LogError(ex, "Commit error during rebalance");
}
})
.SetPartitionsLostHandler((c, partitions) =>
{
_logger.LogWarning("Partitions lost: {Partitions}",
string.Join(", ", partitions));
// Handle unexpected partition loss
})
.Build();
Manual Partition Assignment
using Confluent.Kafka;
var consumer = new ConsumerBuilder<string, string>(config).Build();
// Assign specific partitions
var partitions = new[]
{
new TopicPartition("orders", 0),
new TopicPartition("orders", 1)
};
consumer.Assign(partitions);
// Seek to specific offset
consumer.Seek(new TopicPartitionOffset("orders", 0, 1000));
// Seek to beginning
consumer.Seek(new TopicPartitionOffset("orders", 0, Offset.Beginning));
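Assign also accepts offsets directly, combining assignment and positioning in one call:
// Equivalent to Assign followed by Seek
consumer.Assign(new TopicPartitionOffset("orders", 0, 1000));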
Admin Client
Create Topics
using Confluent.Kafka.Admin;
using System.Collections.Generic;
var config = new AdminClientConfig
{
BootstrapServers = "kafka:9092"
};
using var admin = new AdminClientBuilder(config).Build();
var topicSpec = new TopicSpecification
{
Name = "orders",
NumPartitions = 6,
ReplicationFactor = 3,
Configs = new Dictionary<string, string>
{
{ "retention.ms", "604800000" },
{ "cleanup.policy", "delete" }
}
};
try
{
await admin.CreateTopicsAsync(new[] { topicSpec });
Console.WriteLine("Topic created successfully");
}
catch (CreateTopicsException ex)
{
Console.WriteLine($"Failed to create topic: {ex.Results[0].Error.Reason}");
}
Describe Topics
var metadata = admin.GetMetadata("orders", TimeSpan.FromSeconds(10));
foreach (var topic in metadata.Topics)
{
Console.WriteLine($"Topic: {topic.Topic}");
foreach (var partition in topic.Partitions)
{
Console.WriteLine($" Partition {partition.PartitionId}:");
Console.WriteLine($" Leader: {partition.Leader}");
Console.WriteLine($" Replicas: {string.Join(", ", partition.Replicas)}");
Console.WriteLine($" ISR: {string.Join(", ", partition.InSyncReplicas)}");
}
}
Delete Topics
try
{
await admin.DeleteTopicsAsync(new[] { "old-topic" });
Console.WriteLine("Topic deleted successfully");
}
catch (DeleteTopicsException ex)
{
Console.WriteLine($"Failed to delete topic: {ex.Results[0].Error.Reason}");
}
List Consumer Groups
var groupsResult = await admin.ListConsumerGroupsAsync(
new ListConsumerGroupsOptions { RequestTimeout = TimeSpan.FromSeconds(10) });
foreach (var group in groupsResult.Valid)
{
Console.WriteLine($"Group: {group.GroupId} (State: {group.State})");
}
Describe Consumer Group
var groupInfo = await admin.DescribeConsumerGroupsAsync(
new[] { "order-processors" },
new DescribeConsumerGroupsOptions { RequestTimeout = TimeSpan.FromSeconds(10) });
foreach (var group in groupInfo.ConsumerGroupDescriptions)
{
Console.WriteLine($"Group: {group.GroupId}");
Console.WriteLine($"State: {group.State}");
foreach (var member in group.Members)
{
Console.WriteLine($" Member: {member.ConsumerId}");
Console.WriteLine($" Client: {member.ClientId}");
}
}
Error Handling
Producer Error Handling
var producer = new ProducerBuilder<string, string>(config)
.SetErrorHandler((_, error) =>
{
_logger.LogError("Producer error: {Code} - {Reason}", error.Code, error.Reason);
if (error.IsFatal)
{
_logger.LogCritical("Fatal producer error - requires restart");
}
if (error.IsLocalError)
{
_logger.LogWarning("Local error - may be transient");
}
})
.Build();
// Delivery error handling
try
{
var result = await producer.ProduceAsync("orders",
new Message<string, string> { Key = "key", Value = "value" });
}
catch (ProduceException<string, string> ex)
{
if (ex.Error.Code == ErrorCode.TopicAuthorizationFailed)
{
_logger.LogError("Authorization failed for topic");
}
else if (ex.Error.Code == ErrorCode.MsgSizeTooLarge)
{
_logger.LogError("Message too large");
await SendToDeadLetterQueueAsync(ex.DeliveryResult.Message);
}
else if (!ex.Error.IsFatal)
{
// Non-fatal errors are generally transient and safe to retry
_logger.LogWarning("Transient error: {Reason}", ex.Error.Reason);
}
else
{
_logger.LogError(ex, "Fatal delivery error");
throw;
}
}
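The MsgSizeTooLarge branch above calls SendToDeadLetterQueueAsync, which is not defined in this guide. A minimal sketch, where _dlqProducer is a hypothetical, separately configured producer:
// Hypothetical helper assumed by the snippet above
private async Task SendToDeadLetterQueueAsync(Message<string, string> message)
{
    // _dlqProducer: an IProducer<string, string> for the DLQ topic; note the
    // DLQ topic needs a larger max.message.bytes to accept oversized messages
    await _dlqProducer.ProduceAsync("orders-dlq", message);
}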
Consumer Error Handling
while (!stoppingToken.IsCancellationRequested)
{
try
{
var cr = consumer.Consume(stoppingToken);
if (cr == null) continue;
try
{
await ProcessMessageAsync(cr.Message);
consumer.Commit(cr);
}
catch (Exception ex)
{
_logger.LogError(ex, "Processing error for message at offset {Offset}", cr.Offset);
// Send to dead letter queue, copying the original headers
// (Headers has no copy constructor, so copy entries explicitly)
var dlqHeaders = new Headers
{
{ "original-topic", Encoding.UTF8.GetBytes(cr.Topic) },
{ "original-partition", BitConverter.GetBytes(cr.Partition.Value) },
{ "original-offset", BitConverter.GetBytes(cr.Offset.Value) },
{ "error", Encoding.UTF8.GetBytes(ex.Message) }
};
if (cr.Message.Headers != null)
{
foreach (var header in cr.Message.Headers)
{
dlqHeaders.Add(header.Key, header.GetValueBytes());
}
}
await deadLetterProducer.ProduceAsync("orders-dlq",
new Message<string, string>
{
Key = cr.Message.Key,
Value = cr.Message.Value,
Headers = dlqHeaders
});
// Still commit to move forward
consumer.Commit(cr);
}
}
catch (ConsumeException ex)
{
_logger.LogError(ex, "Consume error: {Code} - {Reason}", ex.Error.Code, ex.Error.Reason);
if (ex.Error.Code == ErrorCode.TopicAuthorizationFailed)
{
throw; // Fatal - can't recover
}
}
catch (KafkaException ex)
{
_logger.LogError(ex, "Kafka error: {Reason}", ex.Error.Reason);
if (ex.Error.IsFatal)
{
throw;
}
}
}
Testing
Mock Producer
public class MockProducer<TKey, TValue> : IProducer<TKey, TValue>
{
public List<Message<TKey, TValue>> Messages { get; } = new();
public Handle Handle => throw new NotImplementedException();
public string Name => "MockProducer";
public Task<DeliveryResult<TKey, TValue>> ProduceAsync(
string topic,
Message<TKey, TValue> message,
CancellationToken cancellationToken = default)
{
Messages.Add(message);
var result = new DeliveryResult<TKey, TValue>
{
TopicPartitionOffset = new TopicPartitionOffset(topic, 0, 0),
Message = message,
Status = PersistenceStatus.Persisted
};
return Task.FromResult(result);
}
public void Produce(string topic, Message<TKey, TValue> message,
Action<DeliveryReport<TKey, TValue>> deliveryHandler = null)
{
Messages.Add(message);
deliveryHandler?.Invoke(new DeliveryReport<TKey, TValue>
{
TopicPartitionOffset = new TopicPartitionOffset(topic, 0, 0),
Message = message,
Status = PersistenceStatus.Persisted
});
}
public int Flush(TimeSpan timeout) => 0;
public void Flush(CancellationToken cancellationToken = default) { }
public int Poll(TimeSpan timeout) => 0;
public void Dispose() { }
// Remaining IProducer<TKey, TValue> members omitted for brevity; implement
// or stub them (e.g. throw NotImplementedException) so the class compiles
}
Unit Testing with xUnit
public class OrderServiceTests
{
[Fact]
public async Task CreateOrder_ShouldSendMessageToKafka()
{
// Arrange
var mockProducer = new MockProducer<string, string>();
var service = new OrderService(mockProducer);
var order = new Order { Id = "123", Amount = 99.99m };
// Act
await service.CreateOrderAsync(order);
// Assert
Assert.Single(mockProducer.Messages);
var message = mockProducer.Messages[0];
Assert.Equal("123", message.Key);
var orderData = JsonSerializer.Deserialize<Order>(message.Value);
Assert.Equal(99.99m, orderData.Amount);
}
[Fact]
public async Task CreateOrder_WithHeaders_ShouldIncludeCorrelationId()
{
// Arrange
var mockProducer = new MockProducer<string, string>();
var service = new OrderService(mockProducer);
// Act
await service.CreateOrderAsync(new Order { Id = "123" }, correlationId: "abc-123");
// Assert
var message = mockProducer.Messages[0];
Assert.NotNull(message.Headers);
var header = message.Headers.FirstOrDefault(h => h.Key == "correlation-id");
Assert.NotNull(header);
Assert.Equal("abc-123", Encoding.UTF8.GetString(header.GetValueBytes()));
}
}
Integration Testing
public class KafkaIntegrationTests : IClassFixture<KafkaFixture>
{
private readonly KafkaFixture _fixture;
public KafkaIntegrationTests(KafkaFixture fixture)
{
_fixture = fixture;
}
[Fact]
public async Task ProduceConsume_ShouldWorkEndToEnd()
{
// Arrange
var producerConfig = new ProducerConfig
{
BootstrapServers = _fixture.BootstrapServers
};
var consumerConfig = new ConsumerConfig
{
BootstrapServers = _fixture.BootstrapServers,
GroupId = "test",
AutoOffsetReset = AutoOffsetReset.Earliest
};
// Act - Produce
using var producer = new ProducerBuilder<string, string>(producerConfig).Build();
await producer.ProduceAsync("test",
new Message<string, string> { Key = "key", Value = "value" });
// Act - Consume
using var consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
consumer.Subscribe("test");
var cr = consumer.Consume(TimeSpan.FromSeconds(10));
// Assert
Assert.NotNull(cr);
Assert.Equal("value", cr.Message.Value);
}
}
public class KafkaFixture : IDisposable
{
// Use Testcontainers or Docker Compose for integration tests
public string BootstrapServers { get; } = "localhost:9092";
public void Dispose()
{
// Cleanup
}
}
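As the fixture comment suggests, Testcontainers can supply a real broker. A sketch of a container-backed fixture using the Testcontainers.Kafka package (assumed to be referenced; the API shown is Testcontainers', not Confluent's):
using Testcontainers.Kafka;
using Xunit;
public sealed class KafkaContainerFixture : IAsyncLifetime
{
    // Starts a single-broker Kafka container for the test run
    private readonly KafkaContainer _container = new KafkaBuilder().Build();
    public string BootstrapServers => _container.GetBootstrapAddress();
    public Task InitializeAsync() => _container.StartAsync();
    public Task DisposeAsync() => _container.DisposeAsync().AsTask();
}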