# Kafka Testing Guide
Testing Kafka applications requires strategies for unit testing business logic, integration testing with Kafka, and end-to-end testing of complete flows. This guide covers testing approaches, tools, and best practices.
## Testing Pyramid

| Level | Scope | Speed | Kafka |
|---|---|---|---|
| Unit | Business logic | Fast | Mocked |
| Integration | Kafka interaction | Medium | Embedded/Container |
| E2E | Complete system | Slow | Real cluster |
## Unit Testing

### Mock Producer

`MockProducer` (from kafka-clients) records every send in memory, so tests can assert on produced records without a broker.
```java
public class MockProducerExample {

    @Test
    void testOrderProducer() {
        // Create a mock producer that completes sends immediately
        MockProducer<String, Order> mockProducer = new MockProducer<>(
            true, // autoComplete
            new StringSerializer(),
            new JsonSerializer<Order>()
        );

        // Create the service under test with the mock
        OrderService orderService = new OrderService(mockProducer);

        // Execute
        Order order = new Order("order-123", "customer-456", 99.99);
        orderService.publishOrder(order);

        // Verify the records captured by the mock
        List<ProducerRecord<String, Order>> records = mockProducer.history();
        assertEquals(1, records.size());
        assertEquals("orders", records.get(0).topic());
        assertEquals("order-123", records.get(0).key());
        assertEquals(order, records.get(0).value());
    }

    @Test
    void testProducerWithCallback() {
        MockProducer<String, String> mockProducer = new MockProducer<>(
            false, // manual completion
            new StringSerializer(),
            new StringSerializer()
        );

        AtomicBoolean callbackCalled = new AtomicBoolean(false);
        mockProducer.send(
            new ProducerRecord<>("topic", "key", "value"),
            (metadata, exception) -> callbackCalled.set(true)
        );

        // Complete the pending send
        mockProducer.completeNext();

        assertTrue(callbackCalled.get());
    }

    @Test
    void testProducerFailure() {
        MockProducer<String, String> mockProducer = new MockProducer<>(
            false,
            new StringSerializer(),
            new StringSerializer()
        );

        AtomicReference<Exception> caughtException = new AtomicReference<>();
        mockProducer.send(
            new ProducerRecord<>("topic", "key", "value"),
            (metadata, exception) -> caughtException.set(exception)
        );

        // Simulate a broker-side failure for the pending send
        mockProducer.errorNext(new RuntimeException("Broker unavailable"));

        assertNotNull(caughtException.get());
        assertEquals("Broker unavailable", caughtException.get().getMessage());
    }
}
```
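The tests inject the mock through the service constructor. The guide doesn't show `OrderService` itself, so here is a minimal sketch of what the tests assume (the topic name and `getOrderId()` match the assertions above; the rest is illustrative):

```java
public class OrderService {

    private static final String TOPIC = "orders";
    private final Producer<String, Order> producer;

    // Accepting the Producer interface (not KafkaProducer) is what makes
    // MockProducer injectable in tests
    public OrderService(Producer<String, Order> producer) {
        this.producer = producer;
    }

    public void publishOrder(Order order) {
        // Key by order ID so all events for one order land on the same partition
        producer.send(new ProducerRecord<>(TOPIC, order.getOrderId(), order));
    }
}
```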
### Mock Consumer

`MockConsumer` plays scripted records back through the Consumer API, so polling logic runs deterministically without a broker.
```java
public class MockConsumerExample {

    @Test
    void testOrderConsumer() {
        // Create mock consumer
        MockConsumer<String, Order> mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);

        // Create the service under test with the mock; the repository is
        // mocked so the save can be verified below
        OrderRepository orderRepository = mock(OrderRepository.class);
        OrderProcessor processor = new OrderProcessor(mockConsumer, orderRepository);

        // Assign a partition and set its beginning offset
        TopicPartition tp = new TopicPartition("orders", 0);
        mockConsumer.assign(List.of(tp));
        mockConsumer.updateBeginningOffsets(Map.of(tp, 0L));

        // Add test records
        Order order = new Order("order-123", "customer-456", 99.99);
        mockConsumer.addRecord(new ConsumerRecord<>("orders", 0, 0, "order-123", order));

        // Process
        processor.processNextBatch();

        // Verify processing occurred
        verify(orderRepository).save(order);
    }

    @Test
    void testConsumerRebalance() {
        MockConsumer<String, String> mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);

        List<TopicPartition> assigned = new ArrayList<>();
        List<TopicPartition> revoked = new ArrayList<>();

        mockConsumer.subscribe(List.of("topic"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                revoked.addAll(partitions);
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                assigned.addAll(partitions);
            }
        });

        // Set offsets, then trigger a simulated rebalance
        TopicPartition tp = new TopicPartition("topic", 0);
        mockConsumer.updateBeginningOffsets(Map.of(tp, 0L));
        mockConsumer.rebalance(List.of(tp));

        assertEquals(1, assigned.size());
        assertEquals(tp, assigned.get(0));
    }
}
```
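The first test assumes an `OrderProcessor` that drains one poll and hands each record to the repository. An illustrative sketch of such a class (not shown elsewhere in this guide):

```java
public class OrderProcessor {

    private final Consumer<String, Order> consumer;
    private final OrderRepository orderRepository;

    public OrderProcessor(Consumer<String, Order> consumer, OrderRepository orderRepository) {
        this.consumer = consumer;
        this.orderRepository = orderRepository;
    }

    public void processNextBatch() {
        // One poll; MockConsumer returns the records queued via addRecord()
        ConsumerRecords<String, Order> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, Order> record : records) {
            orderRepository.save(record.value());
        }
        consumer.commitSync();
    }
}
```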
### Testing Business Logic in Isolation
```java
@ExtendWith(MockitoExtension.class) // required for @Mock / @InjectMocks to take effect
public class OrderProcessingServiceTest {

    @Mock
    private KafkaTemplate<String, OrderEvent> kafkaTemplate;

    @Mock
    private OrderRepository orderRepository;

    @InjectMocks
    private OrderProcessingService service;

    @Test
    void shouldProcessOrderSuccessfully() {
        // Given
        Order order = new Order("order-123", "customer-456", 99.99);
        when(kafkaTemplate.send(anyString(), anyString(), any()))
            .thenReturn(CompletableFuture.completedFuture(null));

        // When
        OrderResult result = service.processOrder(order);

        // Then
        assertTrue(result.isSuccess());
        // With Mockito, either every argument is a matcher or none is,
        // so the literal values are wrapped in eq()
        verify(kafkaTemplate).send(eq("order-events"), eq("order-123"), any(OrderCreatedEvent.class));
        verify(orderRepository).save(order);
    }

    @Test
    void shouldHandleKafkaFailure() {
        // Given
        Order order = new Order("order-123", "customer-456", 99.99);
        when(kafkaTemplate.send(anyString(), anyString(), any()))
            .thenReturn(CompletableFuture.failedFuture(new RuntimeException("Kafka error")));

        // When
        OrderResult result = service.processOrder(order);

        // Then
        assertFalse(result.isSuccess());
        verify(orderRepository, never()).save(any());
    }
}
```
## Integration Testing

### Embedded Kafka (Spring)

spring-kafka-test can start an in-process broker via `@EmbeddedKafka`, keeping integration tests self-contained within the JVM.
```java
@SpringBootTest
@EmbeddedKafka(
    partitions = 3,
    brokerProperties = {
        "listeners=PLAINTEXT://localhost:9092", // fixed port for clarity; prefer a random port in real suites
        "auto.create.topics.enable=true"
    },
    topics = {"orders", "order-events"}
)
public class KafkaIntegrationTest {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Autowired
    private KafkaTemplate<String, Order> kafkaTemplate;

    @Autowired
    private OrderService orderService;

    @Test
    void testProduceAndConsume() throws Exception {
        // Given
        Order order = new Order("order-123", "customer-456", 99.99);

        // When
        kafkaTemplate.send("orders", order.getOrderId(), order).get();

        // Then - wait for the listener to process the record
        await().atMost(Duration.ofSeconds(10))
            .untilAsserted(() -> {
                Order processed = orderService.getOrder("order-123");
                assertNotNull(processed);
                assertEquals(99.99, processed.getAmount());
            });
    }

    @Test
    void testConsumerGroupRebalance() throws Exception {
        // Start a second consumer in the same group
        // (createConsumer() and the two message counters are helpers assumed
        // to exist elsewhere in the test class)
        Consumer<String, Order> secondConsumer = createConsumer();
        secondConsumer.subscribe(List.of("orders"));

        // Send messages across partitions
        for (int i = 0; i < 100; i++) {
            kafkaTemplate.send("orders", "key-" + i, new Order("order-" + i, "cust", 10.0));
        }

        // Verify distribution: both consumers should have received messages
        await().atMost(Duration.ofSeconds(30))
            .untilAsserted(() -> {
                assertTrue(firstConsumerMessageCount.get() > 0);
                assertTrue(secondConsumerMessageCount.get() > 0);
            });
    }
}
```
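The first test assumes a listener that persists incoming orders so `orderService.getOrder()` can find them. A minimal sketch of such a listener (the `saveOrder()` method and group ID are hypothetical; only the topic name comes from the test above):

```java
@Component
public class OrderListener {

    private final OrderService orderService;

    public OrderListener(OrderService orderService) {
        this.orderService = orderService;
    }

    @KafkaListener(topics = "orders", groupId = "order-service")
    public void onOrder(ConsumerRecord<String, Order> record) {
        // Persist the order so the test's getOrder() lookup succeeds
        orderService.saveOrder(record.value());
    }
}
```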
### Spring Kafka Test Utilities
```java
@SpringBootTest
@EmbeddedKafka(topics = {"test-topic"}) // declare the topic so it exists before consuming
public class ConsumerRecordTest {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Test
    void testWithConsumerRecords() {
        // Build a consumer wired to the embedded broker (group, auto-commit, broker)
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(
            "test-group", "true", embeddedKafka);
        DefaultKafkaConsumerFactory<String, String> cf =
            new DefaultKafkaConsumerFactory<>(consumerProps);
        Consumer<String, String> consumer = cf.createConsumer();
        embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "test-topic");

        // Produce
        Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);
        KafkaTemplate<String, String> template = new KafkaTemplate<>(
            new DefaultKafkaProducerFactory<>(producerProps));
        template.send("test-topic", "key", "value");

        // Consume and verify
        ConsumerRecord<String, String> record =
            KafkaTestUtils.getSingleRecord(consumer, "test-topic");
        assertEquals("key", record.key());
        assertEquals("value", record.value());

        consumer.close();
    }
}
```
## Testcontainers

Testcontainers runs a real broker in Docker, trading slower startup for production-like fidelity.

### Basic Setup
```java
@Testcontainers
public class KafkaTestcontainersTest {

    // @Testcontainers starts and stops static @Container fields automatically,
    // so no manual start() call is needed
    @Container
    static KafkaContainer kafka = new KafkaContainer(
        DockerImageName.parse("confluentinc/cp-kafka:7.5.0")
    );

    private Producer<String, String> producer;
    private Consumer<String, String> consumer;

    @BeforeEach
    void createClients() {
        producer = new KafkaProducer<>(Map.of(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(),
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class
        ));

        consumer = new KafkaConsumer<>(Map.of(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(),
            ConsumerConfig.GROUP_ID_CONFIG, "test-group",
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class
        ));
    }

    @Test
    void testProduceConsume() throws Exception {
        String topic = "test-topic";

        // Create topic
        try (AdminClient admin = AdminClient.create(Map.of(
                AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers()))) {
            admin.createTopics(List.of(new NewTopic(topic, 1, (short) 1))).all().get();
        }

        // Produce
        producer.send(new ProducerRecord<>(topic, "key", "value")).get();

        // Consume
        consumer.subscribe(List.of(topic));
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
        assertEquals(1, records.count());
        assertEquals("value", records.iterator().next().value());
    }

    @AfterEach
    void closeClients() {
        producer.close();
        consumer.close();
    }
}
```
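For Spring Boot applications, the container's dynamically mapped address can be wired into the application context with `@DynamicPropertySource` instead of hard-coding bootstrap servers. A minimal sketch:

```java
@SpringBootTest
@Testcontainers
class SpringKafkaContainerTest {

    @Container
    static KafkaContainer kafka = new KafkaContainer(
        DockerImageName.parse("confluentinc/cp-kafka:7.5.0"));

    @DynamicPropertySource
    static void kafkaProperties(DynamicPropertyRegistry registry) {
        // Point spring-kafka at the container before the context starts
        registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
    }
}
```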
### With Schema Registry
```java
@Testcontainers
public class SchemaRegistryIntegrationTest {

    // Explicit shared network so Schema Registry can reach the broker;
    // without it, getNetwork() on the Kafka container returns null
    static Network network = Network.newNetwork();

    @Container
    static KafkaContainer kafka = new KafkaContainer(
        DockerImageName.parse("confluentinc/cp-kafka:7.5.0")
    )
        .withNetwork(network)
        .withNetworkAliases("kafka");

    @Container
    static GenericContainer<?> schemaRegistry = new GenericContainer<>(
        DockerImageName.parse("confluentinc/cp-schema-registry:7.5.0")
    )
        .withNetwork(network)
        .withExposedPorts(8081)
        .withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry")
        // 9092 is the broker's listener inside the container network
        .withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://kafka:9092")
        .dependsOn(kafka);

    @Test
    void testAvroProduceConsume() {
        String schemaRegistryUrl = "http://" + schemaRegistry.getHost() + ":" +
            schemaRegistry.getMappedPort(8081);

        // Producer with Avro
        Producer<String, Order> producer = new KafkaProducer<>(Map.of(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(),
            "schema.registry.url", schemaRegistryUrl,
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class
        ));

        Order order = Order.newBuilder()
            .setOrderId("order-123")
            .setAmount(99.99)
            .build();
        producer.send(new ProducerRecord<>("orders", order.getOrderId(), order));
        producer.flush();

        // Consumer with Avro
        Consumer<String, Order> consumer = new KafkaConsumer<>(Map.of(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers(),
            "schema.registry.url", schemaRegistryUrl,
            ConsumerConfig.GROUP_ID_CONFIG, "test",
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest",
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class,
            "specific.avro.reader", "true"
        ));
        consumer.subscribe(List.of("orders"));

        ConsumerRecords<String, Order> records = consumer.poll(Duration.ofSeconds(10));
        assertEquals(1, records.count());
        assertEquals("order-123", records.iterator().next().value().getOrderId());
    }
}
```
### Reusable Container Module
```java
public class KafkaTestModule {

    private static final Network NETWORK = Network.newNetwork();

    // Singleton containers: @Container only works with @Testcontainers on the
    // test class itself, so a shared module starts the containers once in a
    // static initializer instead; Ryuk removes them when the JVM exits
    public static final KafkaContainer KAFKA = new KafkaContainer(
        DockerImageName.parse("confluentinc/cp-kafka:7.5.0")
    )
        .withNetwork(NETWORK)
        .withNetworkAliases("kafka");

    public static final GenericContainer<?> SCHEMA_REGISTRY = new GenericContainer<>(
        DockerImageName.parse("confluentinc/cp-schema-registry:7.5.0")
    )
        .withNetwork(NETWORK)
        .withExposedPorts(8081)
        .withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema-registry")
        .withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://kafka:9092")
        .dependsOn(KAFKA);

    static {
        KAFKA.start();
        SCHEMA_REGISTRY.start();
    }

    public static String getBootstrapServers() {
        return KAFKA.getBootstrapServers();
    }

    public static String getSchemaRegistryUrl() {
        return "http://" + SCHEMA_REGISTRY.getHost() + ":" +
            SCHEMA_REGISTRY.getMappedPort(8081);
    }

    public static void createTopic(String topic, int partitions) {
        try (AdminClient admin = AdminClient.create(Map.of(
                AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getBootstrapServers()))) {
            admin.createTopics(List.of(new NewTopic(topic, partitions, (short) 1)))
                .all().get(30, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new RuntimeException("Failed to create topic", e);
        }
    }
}
```
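Tests can then pull connection details from the module without managing container lifecycle themselves. For example (the test class and topic name are illustrative):

```java
class OrderTopicTest {

    @Test
    void createsAndUsesTopic() {
        // Containers are already running thanks to the module's static initializer
        KafkaTestModule.createTopic("orders-test", 3);

        try (Producer<String, String> producer = new KafkaProducer<>(Map.of(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaTestModule.getBootstrapServers(),
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class))) {
            producer.send(new ProducerRecord<>("orders-test", "key", "value"));
            producer.flush();
        }
    }
}
```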
## Testing Patterns

### Consumer Test Pattern
```java
@Test
void testConsumerProcessesMessages() throws Exception { // latch.await() is interruptible
    // Setup
    String topic = "test-orders";
    createTopic(topic, 3);

    CountDownLatch latch = new CountDownLatch(10);
    List<Order> processedOrders = Collections.synchronizedList(new ArrayList<>());

    // Start consumer (kafkaConfig and createProducer() are helpers assumed
    // to exist in the test class)
    OrderConsumer orderConsumer = new OrderConsumer(
        kafkaConfig,
        order -> {
            processedOrders.add(order);
            latch.countDown();
        }
    );
    orderConsumer.start();

    // Produce test messages
    try (Producer<String, Order> producer = createProducer()) {
        for (int i = 0; i < 10; i++) {
            Order order = new Order("order-" + i, "customer", 10.0 * i);
            producer.send(new ProducerRecord<>(topic, order.getOrderId(), order));
        }
        producer.flush();
    }

    // Wait for processing
    assertTrue(latch.await(30, TimeUnit.SECONDS));
    assertEquals(10, processedOrders.size());

    // Cleanup
    orderConsumer.stop();
}
```
### Exactly-Once Testing
```java
@Test
void testExactlyOnceSemantics() throws Exception {
    String inputTopic = "input";
    String outputTopic = "output";
    createTopic(inputTopic, 1);
    createTopic(outputTopic, 1);

    // Configure transactional producer
    Properties producerProps = new Properties();
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
    producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-txn");
    producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    // Configure read_committed consumer; earliest reset is needed because
    // the records are produced before the consumer subscribes
    Properties consumerProps = new Properties();
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
    consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

    try (Producer<String, String> producer = new KafkaProducer<>(producerProps);
         Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps)) {

        producer.initTransactions();

        // Committed transaction
        producer.beginTransaction();
        producer.send(new ProducerRecord<>(outputTopic, "key", "value1"));
        producer.send(new ProducerRecord<>(outputTopic, "key", "value2"));
        producer.commitTransaction();

        // Aborted transaction
        producer.beginTransaction();
        producer.send(new ProducerRecord<>(outputTopic, "key", "aborted"));
        producer.abortTransaction();

        // Consumer should only see committed messages; the aborted record is invisible
        consumer.subscribe(List.of(outputTopic));
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
        assertEquals(2, records.count());
    }
}
```
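The test above only checks visibility. In a real exactly-once pipeline, the consumed offsets are committed inside the same transaction as the produced records. A sketch of that loop (`transform()`, the `running` flag, and the output topic are illustrative; `sendOffsetsToTransaction` and `groupMetadata` are the actual client APIs):

```java
void runPipeline(Consumer<String, String> consumer,
                 Producer<String, String> producer,
                 String outputTopic) {
    producer.initTransactions();
    while (running) {
        ConsumerRecords<String, String> batch = consumer.poll(Duration.ofMillis(250));
        if (batch.isEmpty()) continue;

        producer.beginTransaction();
        try {
            Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            for (ConsumerRecord<String, String> rec : batch) {
                producer.send(new ProducerRecord<>(outputTopic, rec.key(), transform(rec.value())));
                // Commit position = last consumed offset + 1
                offsets.put(new TopicPartition(rec.topic(), rec.partition()),
                    new OffsetAndMetadata(rec.offset() + 1));
            }
            // Offsets commit atomically with the produced records
            producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
            producer.commitTransaction();
        } catch (KafkaException e) {
            producer.abortTransaction();
            // After an abort, rewind the consumer (seek) before retrying the batch
        }
    }
}
```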
### Error Handling Testing
```java
@Test
void testDeadLetterQueueOnFailure() {
    String mainTopic = "orders";
    String dlqTopic = "orders.dlq";
    createTopic(mainTopic, 1);
    createTopic(dlqTopic, 1);

    // Consumer that rejects invalid messages; the consumer implementation
    // is expected to route failed records to the DLQ
    List<Order> processedOrders = Collections.synchronizedList(new ArrayList<>());
    OrderConsumer consumer = new OrderConsumer(config, order -> {
        if (order.getAmount() < 0) {
            throw new InvalidOrderException("Negative amount");
        }
        processedOrders.add(order);
    });
    consumer.start();

    // Send a mix of valid and invalid messages
    try (Producer<String, Order> producer = createProducer()) {
        producer.send(new ProducerRecord<>(mainTopic, "order-1", new Order("order-1", "cust", 100)));
        producer.send(new ProducerRecord<>(mainTopic, "order-2", new Order("order-2", "cust", -50))); // invalid
        producer.send(new ProducerRecord<>(mainTopic, "order-3", new Order("order-3", "cust", 200)));
        producer.flush();
    }

    // Wait for processing
    await().atMost(Duration.ofSeconds(30))
        .untilAsserted(() -> {
            // Valid orders processed
            assertEquals(2, processedOrders.size());

            // Invalid order routed to the DLQ (createDlqConsumer() is an assumed helper)
            try (Consumer<String, Order> dlqConsumer = createDlqConsumer()) {
                dlqConsumer.subscribe(List.of(dlqTopic));
                ConsumerRecords<String, Order> dlqRecords = dlqConsumer.poll(Duration.ofSeconds(5));
                assertEquals(1, dlqRecords.count());
                assertEquals("order-2", dlqRecords.iterator().next().value().getOrderId());
            }
        });
}
```
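The test assumes the consumer forwards failing records to the DLQ. A minimal sketch of that error path inside the consumer (the `handler` and `dlqProducer` fields and the header name are illustrative; the `.dlq` suffix matches the test above):

```java
private void handleRecord(ConsumerRecord<String, Order> record) {
    try {
        handler.accept(record.value());
    } catch (Exception e) {
        // Forward the failing record to <topic>.dlq, preserving key and value,
        // and attach the error message as a header for later diagnosis
        ProducerRecord<String, Order> dlqRecord =
            new ProducerRecord<>(record.topic() + ".dlq", record.key(), record.value());
        dlqRecord.headers().add("x-error-message",
            e.getMessage().getBytes(StandardCharsets.UTF_8));
        dlqProducer.send(dlqRecord);
    }
}
```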
## Contract Testing

### Schema Compatibility Testing
```java
@Test
void testSchemaBackwardCompatibility() throws Exception {
    String schemaRegistryUrl = KafkaTestModule.getSchemaRegistryUrl();
    SchemaRegistryClient client = new CachedSchemaRegistryClient(schemaRegistryUrl, 100);

    // Register initial schema
    String subject = "orders-value";
    Schema v1 = new Schema.Parser().parse("""
        {
          "type": "record",
          "name": "Order",
          "fields": [
            {"name": "orderId", "type": "string"},
            {"name": "amount", "type": "double"}
          ]
        }
        """);
    client.register(subject, new AvroSchema(v1));

    // Enforce backward compatibility for the subject
    client.updateCompatibility(subject, "BACKWARD");

    // New schema with an optional field (backward compatible)
    Schema v2 = new Schema.Parser().parse("""
        {
          "type": "record",
          "name": "Order",
          "fields": [
            {"name": "orderId", "type": "string"},
            {"name": "amount", "type": "double"},
            {"name": "currency", "type": "string", "default": "USD"}
          ]
        }
        """);

    // Should succeed
    assertDoesNotThrow(() -> client.register(subject, new AvroSchema(v2)));

    // Breaking change: renaming a field is not backward compatible
    Schema v3Breaking = new Schema.Parser().parse("""
        {
          "type": "record",
          "name": "Order",
          "fields": [
            {"name": "orderId", "type": "string"},
            {"name": "total", "type": "double"}
          ]
        }
        """);

    // Should be rejected by the registry
    assertThrows(RestClientException.class,
        () -> client.register(subject, new AvroSchema(v3Breaking)));
}
```
## Best Practices

### Test Configuration

| Practice | Recommendation |
|---|---|
| Isolation | Each test should use unique topics/groups (see the sketch below) |
| Cleanup | Delete topics after tests, or rely on throwaway containers |
| Timeouts | Set generous but bounded timeouts for async assertions |
| Logging | Enable test-level logging to aid debugging |
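For the isolation row, a tiny helper that generates unique names per test is usually enough. A sketch (the class and method names are illustrative):

```java
import java.util.UUID;

final class TestNames {
    private TestNames() {}

    // Unique topic and group names prevent state leaking between tests
    // that share a broker
    static String uniqueTopic(String prefix) {
        return prefix + "-" + UUID.randomUUID();
    }

    static String uniqueGroup(String prefix) {
        return prefix + "-group-" + UUID.randomUUID();
    }
}

// Usage inside a test:
// String topic = TestNames.uniqueTopic("orders");
// createTopic(topic, 3);
```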
### Test Organization

| Practice | Recommendation |
|---|---|
| Unit tests | Test business logic with mocks |
| Integration tests | Test Kafka interaction with containers |
| Contract tests | Validate schema compatibility |
| E2E tests | Full system tests (use sparingly) |

### Test Performance

| Practice | Recommendation |
|---|---|
| Container reuse | Use singleton containers across test classes (see the sketch below) |
| Topic reuse | Avoid creating a new topic for every test; topic creation is slow |
| Parallel execution | Be careful with shared resources |
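One common way to share a single broker across all test classes is an abstract base class whose static container is started once and never stopped by JUnit (Testcontainers' Ryuk sidecar removes it when the JVM exits). A sketch (the class name is illustrative):

```java
public abstract class SharedKafkaTest {

    static final KafkaContainer KAFKA = new KafkaContainer(
        DockerImageName.parse("confluentinc/cp-kafka:7.5.0"));

    static {
        // Started once for the whole JVM and reused by every subclass
        KAFKA.start();
    }
}
```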