Skip to content

Kafka .NET Driver

The Confluent.Kafka library provides a high-performance .NET client based on librdkafka.


Installation

NuGet Package Manager

Install-Package Confluent.Kafka

.NET CLI

dotnet add package Confluent.Kafka

With Avro Serialization

dotnet add package Confluent.SchemaRegistry.Serdes.Avro

Producer

Basic Producer

using Confluent.Kafka;

// Minimal configuration: only the broker address is set.
var config = new ProducerConfig { BootstrapServers = "localhost:9092" };

using var producer = new ProducerBuilder<string, string>(config).Build();

// Build the record first, then await the delivery result for it.
var message = new Message<string, string> { Key = "key", Value = "value" };
var delivery = await producer.ProduceAsync("events", message);

Console.WriteLine($"Delivered to {delivery.TopicPartitionOffset}");

Reliable Producer

// Producer settings grouped by concern.
var config = new ProducerConfig
{
    BootstrapServers = "localhost:9092",

    // Delivery guarantees
    Acks = Acks.All,
    EnableIdempotence = true,
    MaxInFlight = 5,

    // Retry policy
    MessageSendMaxRetries = 10,
    RetryBackoffMs = 100,

    // Batching and compression
    LingerMs = 5,
    BatchSize = 16384,
    CompressionType = CompressionType.Lz4
};

var builder = new ProducerBuilder<string, string>(config);

// Print the reason of every error the client reports through the handler.
builder.SetErrorHandler((_, error) => Console.WriteLine($"Error: {error.Reason}"));

using var producer = builder.Build();

Fire and Forget

// Hand the message to the producer without awaiting it; the callback receives the delivery report.
var message = new Message<string, string> { Key = "key", Value = "value" };

producer.Produce("events", message, report =>
{
    // Nothing to report when the delivery succeeded.
    if (report.Error.Code == ErrorCode.NoError)
    {
        return;
    }

    Console.WriteLine($"Delivery failed: {report.Error.Reason}");
});

// Flush before exit
var flushTimeout = TimeSpan.FromSeconds(10);
producer.Flush(flushTimeout);

Transactional Producer

// A TransactionalId is set so the producer can use the transaction API below.
var config = new ProducerConfig
{
    BootstrapServers = "localhost:9092",
    TransactionalId = "my-transactional-id",
    EnableIdempotence = true
};

using var producer = new ProducerBuilder<string, string>(config).Build();

// Must be called once before the first BeginTransaction.
var initTimeout = TimeSpan.FromSeconds(30);
producer.InitTransactions(initTimeout);

// The two records that are written inside one transaction.
var first = new Message<string, string> { Value = "msg1" };
var second = new Message<string, string> { Value = "msg2" };

try
{
    producer.BeginTransaction();

    await producer.ProduceAsync("topic1", first);
    await producer.ProduceAsync("topic2", second);

    producer.CommitTransaction();
}
catch (Exception)
{
    // Abort the open transaction, then let the original exception propagate.
    producer.AbortTransaction();
    throw;
}

Consumer

Basic Consumer

using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "my-group",
    AutoOffsetReset = AutoOffsetReset.Earliest
};

using var consumer = new ConsumerBuilder<string, string>(config).Build();

consumer.Subscribe("events");

var cts = new CancellationTokenSource();

// Ctrl+C cancels the token so the blocking Consume call below can return.
Console.CancelKeyPress += (_, e) =>
{
    e.Cancel = true; // keep the process alive so the finally block can run
    cts.Cancel();
};

try
{
    while (true)
    {
        // Blocks until a message arrives or the token is cancelled.
        var result = consumer.Consume(cts.Token);
        Console.WriteLine($"Received: {result.Message.Value}");
    }
}
catch (OperationCanceledException)
{
    // Expected once the token has been cancelled.
}
finally
{
    // Close on every exit path, not only on cancellation.
    consumer.Close();
}

Manual Commit

// Auto-commit is disabled; offsets are committed explicitly after processing.
var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "my-group",
    EnableAutoCommit = false,
    AutoOffsetReset = AutoOffsetReset.Earliest
};

using var consumer = new ConsumerBuilder<string, string>(config).Build();
consumer.Subscribe("events");

var pollTimeout = TimeSpan.FromMilliseconds(100);

while (true)
{
    var record = consumer.Consume(pollTimeout);

    // Consume returned nothing within the timeout; poll again.
    if (record != null)
    {
        // Process message
        ProcessMessage(record.Message);

        // Commit offset
        consumer.Commit(record);
    }
}

Batch Processing

// Accumulates consumed records until a batch is ready to be processed.
var batch = new List<ConsumeResult<string, string>>();

while (true)
{
    var result = consumer.Consume(TimeSpan.FromMilliseconds(100));

    if (result != null)
    {
        batch.Add(result);
    }

    // Process when the batch is full, or when a poll came back empty and records are pending.
    if (batch.Count >= 100 || (batch.Count > 0 && result == null))
    {
        ProcessBatch(batch);

        // A batch can span several partitions. Committing only the last record
        // would commit a single partition, so commit the next offset to read
        // (last offset in the batch + 1) for every partition present.
        var offsets = batch
            .GroupBy(r => r.TopicPartition)
            .Select(g => new TopicPartitionOffset(g.Key, g.Last().Offset + 1))
            .ToList();

        consumer.Commit(offsets);
        batch.Clear();
    }
}

Admin Client

Create Topic

using Confluent.Kafka.Admin;

var config = new AdminClientConfig { BootstrapServers = "localhost:9092" };

using var adminClient = new AdminClientBuilder(config).Build();

// Describe the topic to create: name, partition count and replication factor.
var spec = new TopicSpecification
{
    Name = "my-topic",
    NumPartitions = 3,
    ReplicationFactor = 3
};

// CreateTopicsAsync takes a collection, so wrap the single specification in an array.
await adminClient.CreateTopicsAsync(new[] { spec });

List Topics

// Request metadata with a 10 second timeout and print one line per topic.
var metadataTimeout = TimeSpan.FromSeconds(10);
var clusterMetadata = adminClient.GetMetadata(metadataTimeout);

foreach (var entry in clusterMetadata.Topics)
{
    var partitionCount = entry.Partitions.Count;
    Console.WriteLine($"Topic: {entry.Topic}, Partitions: {partitionCount}");
}

Configuration Reference

Producer

Property Description Default
Acks Acknowledgments All
MessageSendMaxRetries Retry count 2147483647
BatchSize Batch bytes 1000000
LingerMs Batch delay 5
EnableIdempotence Idempotent false
CompressionType Compression None

Consumer

Property Description Default
GroupId Consumer group Required
AutoOffsetReset Reset behavior Latest
EnableAutoCommit Auto commit true
SessionTimeoutMs Session timeout 45000
MaxPollIntervalMs Max poll interval 300000

Serialization

JSON Serialization

using System.Text.Json;

/// <summary>
/// Serializes values of type <typeparamref name="T"/> to UTF-8 encoded JSON bytes.
/// </summary>
public class JsonSerializer<T> : ISerializer<T>
{
    /// <summary>Converts <paramref name="data"/> to its UTF-8 JSON representation.</summary>
    /// <param name="data">The value to serialize.</param>
    /// <param name="context">Serialization context supplied by the caller; not used here.</param>
    /// <returns>The JSON bytes produced by System.Text.Json.</returns>
    public byte[] Serialize(T data, SerializationContext context)
        // Fully qualified: this generic class shares its name with System.Text.Json's static JsonSerializer.
        => System.Text.Json.JsonSerializer.SerializeToUtf8Bytes(data);
}

/// <summary>
/// Deserializes UTF-8 encoded JSON bytes into values of type <typeparamref name="T"/>.
/// </summary>
public class JsonDeserializer<T> : IDeserializer<T>
{
    /// <summary>Parses <paramref name="data"/> as JSON.</summary>
    /// <param name="data">The raw payload bytes.</param>
    /// <param name="isNull">True when the payload is null rather than empty.</param>
    /// <param name="context">Serialization context supplied by the caller; not used here.</param>
    /// <returns>
    /// The deserialized value, or the default value of <typeparamref name="T"/>
    /// when <paramref name="isNull"/> is true.
    /// </returns>
    public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
    {
        // A null payload has no bytes to parse; handing the empty span to the
        // JSON parser would throw, so map it to default(T) instead.
        if (isNull)
        {
            return default!;
        }

        // Fully qualified to distinguish System.Text.Json's static class from a
        // generic JsonSerializer<T> declared alongside this type.
        return System.Text.Json.JsonSerializer.Deserialize<T>(data);
    }
}

// Usage
// `using` disposes the producer when the enclosing scope ends.
using var producer = new ProducerBuilder<string, MyEvent>(config)
    .SetValueSerializer(new JsonSerializer<MyEvent>())
    .Build();

Avro Serialization

using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;

var schemaRegistryConfig = new SchemaRegistryConfig
{
    Url = "http://localhost:8081"
};

// Both the registry client and the producer are disposable. `using` releases
// them at scope end in reverse order: the producer first, then the client it depends on.
using var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);

using var producer = new ProducerBuilder<string, MyEvent>(config)
    .SetValueSerializer(new AvroSerializer<MyEvent>(schemaRegistry))
    .Build();

Error Handling

// `using` disposes the producer when the enclosing scope ends.
using var producer = new ProducerBuilder<string, string>(config)
    .SetErrorHandler((_, error) =>
    {
        // Log fatal and non-fatal client errors separately.
        if (error.IsFatal)
        {
            Console.WriteLine($"Fatal error: {error.Reason}");
            // Shut down
        }
        else
        {
            Console.WriteLine($"Error: {error.Reason}");
        }
    })
    .Build();

try
{
    // The delivery result is not needed here; a failed produce surfaces as ProduceException.
    await producer.ProduceAsync("events",
        new Message<string, string> { Value = "message" });
}
catch (ProduceException<string, string> ex)
{
    Console.WriteLine($"Produce failed: {ex.Error.Reason}");

    if (ex.Error.Code == ErrorCode.Local_MsgTimedOut)
    {
        // Retry logic
    }
}

Graceful Shutdown

var cts = new CancellationTokenSource();

// Signalled once the consume loop has finished and the consumer is closed.
var stopped = new ManualResetEventSlim(false);

// Ctrl+C: cancel the loop but keep the process alive so cleanup can run.
Console.CancelKeyPress += (_, e) =>
{
    e.Cancel = true;
    cts.Cancel();
};

// Process exit: the process may terminate as soon as this handler returns, so
// cancel the loop and then wait (bounded) until the consumer has been closed.
AppDomain.CurrentDomain.ProcessExit += (_, _) =>
{
    cts.Cancel();
    stopped.Wait(TimeSpan.FromSeconds(10));
};

using var consumer = new ConsumerBuilder<string, string>(config).Build();
consumer.Subscribe("events");

try
{
    while (!cts.Token.IsCancellationRequested)
    {
        var result = consumer.Consume(cts.Token);
        ProcessMessage(result.Message);
    }
}
catch (OperationCanceledException)
{
    // Expected during shutdown
}
finally
{
    consumer.Close();

    // Release the ProcessExit handler if it is waiting for the loop to finish.
    stopped.Set();
}

Dependency Injection

// Program.cs
// Register one producer instance for the whole application.
// The service provider argument is not needed by the factory, so it is discarded.
builder.Services.AddSingleton<IProducer<string, string>>(_ =>
    new ProducerBuilder<string, string>(
        new ProducerConfig { BootstrapServers = "localhost:9092" })
    .Build());

// Service
/// <summary>
/// Publishes key/value messages to the "events" topic through an injected producer.
/// </summary>
public class EventService
{
    private readonly IProducer<string, string> _producer;

    /// <summary>Creates the service around the given producer.</summary>
    /// <param name="producer">The producer used for every publish call.</param>
    public EventService(IProducer<string, string> producer) => _producer = producer;

    /// <summary>Produces one message to the "events" topic and awaits its delivery.</summary>
    /// <param name="key">The message key.</param>
    /// <param name="value">The message value.</param>
    public async Task PublishAsync(string key, string value)
    {
        var message = new Message<string, string> { Key = key, Value = value };
        await _producer.ProduceAsync("events", message);
    }
}