Kafka Java Client¶
The official Apache Kafka Java client provides native protocol support with full feature coverage. This is the reference implementation for Kafka client behavior.
Client Information¶
| Field | Value |
|---|---|
| Library | org.apache.kafka:kafka-clients |
| Repository | github.com/apache/kafka |
| Documentation | kafka.apache.org/documentation |
| Package | Maven Central |
| Current Version | 4.1.x (as of 2025) |
| Maintainer | Apache Software Foundation |
| License | Apache License 2.0 |
History¶
The Java client is the reference implementation, developed as part of the core Apache Kafka project. Originally, Kafka shipped Scala-based clients (the "old" producer and consumer). A rewritten Java producer arrived during the 0.8.x series and became the recommended producer in Kafka 0.8.2 (February 2015). The new Java consumer followed in Kafka 0.9.0 (November 2015), beginning the deprecation of the Scala clients. Because the Java client is developed in lockstep with the broker, it supports new Kafka features immediately upon release, from exactly-once semantics (0.11.0) through every subsequent protocol enhancement; Kafka Streams (introduced in 0.10.0) builds directly on it.
Version Compatibility¶
| Client Version | Minimum Kafka Broker | Recommended Broker |
|---|---|---|
| 4.1.x | 2.1+ | 3.0+ |
| 4.0.x | 2.1+ | 3.0+ |
| 3.9.x | 0.10.0+ | 3.0+ |
| 3.7.x | 0.10.0+ | 3.0+ |
Installation¶
Maven¶
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.7.0</version>
</dependency>
Gradle¶
implementation 'org.apache.kafka:kafka-clients:3.7.0'
With Schema Registry¶
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>7.5.0</version>
</dependency>

<!-- Confluent Maven repository -->
<repositories>
    <repository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url>
    </repository>
</repositories>
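With the serializer on the classpath, the producer points at the registry through the `schema.registry.url` property. A minimal sketch, assuming a registry reachable at `http://schema-registry:8081` (adjust host and port for your environment):

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Avro values are serialized and their schemas registered with Schema Registry
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        io.confluent.kafka.serializers.KafkaAvroSerializer.class.getName());
props.put("schema.registry.url", "http://schema-registry:8081"); // assumed endpoint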
Producer¶
Basic Producer¶
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record = new ProducerRecord<>(
                    "orders",        // topic
                    "order-123",     // key
                    "{\"id\": 123}"  // value
            );

            // Asynchronous send with callback
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    System.err.println("Send failed: " + exception.getMessage());
                } else {
                    System.out.printf("Sent to partition %d offset %d%n",
                            metadata.partition(), metadata.offset());
                }
            });
        }
    }
}
Production Producer Configuration¶
Properties props = new Properties();
// Connection
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092,kafka-3:9092");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "order-service-producer");
// Serialization
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Durability
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Retries
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
// Batching
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536);
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864);
// Compression
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
Synchronous Send¶
try {
    RecordMetadata metadata = producer.send(record).get();
    System.out.printf("Sent to %s-%d@%d%n",
            metadata.topic(), metadata.partition(), metadata.offset());
} catch (InterruptedException e) {
    // get() blocks, so the waiting thread can be interrupted
    Thread.currentThread().interrupt();
} catch (ExecutionException e) {
    if (e.getCause() instanceof RetriableException) {
        // Retry logic
    } else {
        // Non-retriable error
        throw e;
    }
}
Send with Headers¶
ProducerRecord<String, String> record = new ProducerRecord<>("orders", "key", "value");
record.headers()
        .add("correlation-id", "abc-123".getBytes(StandardCharsets.UTF_8))
        .add("source", "order-service".getBytes(StandardCharsets.UTF_8));
producer.send(record);
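On the consuming side, headers ride along with each record. A short sketch of reading them back (header names match the producer example above):

import org.apache.kafka.common.header.Header;

// Inside the poll loop, for each ConsumerRecord<String, String> record:
Header correlation = record.headers().lastHeader("correlation-id");
if (correlation != null) {
    String correlationId = new String(correlation.value(), StandardCharsets.UTF_8);
    System.out.println("correlation-id: " + correlationId);
}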
Custom Partitioner¶
public class OrderPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();

        if (key == null) {
            // Random spread for null keys
            return ThreadLocalRandom.current().nextInt(numPartitions);
        }

        // Custom logic: route by region prefix
        String keyStr = (String) key;
        if (keyStr.startsWith("US-")) {
            return 0;
        } else if (keyStr.startsWith("EU-")) {
            return 1;
        }

        // Default: hash-based. Mask the sign bit instead of Math.abs(),
        // which returns a negative value for Integer.MIN_VALUE
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

// Usage
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, OrderPartitioner.class.getName());
Consumer¶
Basic Consumer¶
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processors");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("orders"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("Received: key=%s value=%s partition=%d offset=%d%n",
                            record.key(), record.value(), record.partition(), record.offset());
                }
            }
        }
    }
}
Production Consumer Configuration¶
Properties props = new Properties();
// Connection
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1:9092,kafka-2:9092,kafka-3:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processors");
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "order-processor-1");
// Deserialization
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// Offset management
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Session management
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 45000);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 15000);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
// Fetch configuration
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
// Assignment strategy
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
        CooperativeStickyAssignor.class.getName());
Manual Offset Commit¶
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Arrays.asList("orders"));
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            processRecord(record);
        }
        // Synchronous commit after processing the batch
        consumer.commitSync();
    }
}
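`commitSync()` blocks the poll loop on every batch. A common variant (a sketch of the commitAsync-with-final-commitSync pattern; `log` and `processRecord` are assumed helpers) commits asynchronously in steady state and performs one blocking commit at shutdown:

try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            processRecord(record);
        }
        // Non-blocking commit; a failed commit is superseded by the next one
        consumer.commitAsync((offsets, exception) -> {
            if (exception != null) {
                log.warn("Async commit failed for {}", offsets, exception);
            }
        });
    }
} finally {
    try {
        consumer.commitSync(); // Final blocking commit before closing
    } finally {
        consumer.close();
    }
}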
Per-Partition Commit¶
while (running) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (TopicPartition partition : records.partitions()) {
        List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
        for (ConsumerRecord<String, String> record : partitionRecords) {
            processRecord(record);
        }
        // Commit this partition's offset (+1: the next offset to read)
        long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
        consumer.commitSync(Map.of(partition, new OffsetAndMetadata(lastOffset + 1)));
    }
}
Rebalance Listener¶
consumer.subscribe(Arrays.asList("orders"), new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println("Partitions revoked: " + partitions);
// Commit pending offsets before rebalance
consumer.commitSync();
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.println("Partitions assigned: " + partitions);
// Initialize state for new partitions
}
@Override
public void onPartitionsLost(Collection<TopicPartition> partitions) {
System.out.println("Partitions lost: " + partitions);
// Handle unexpected partition loss (cooperative rebalancing)
}
});
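A common refinement commits only the offsets the application has actually processed, rather than whatever `commitSync()` happens to see. A sketch, assuming the poll loop maintains a `currentOffsets` map (one entry per partition, pointing at the next offset to read):

// Updated by the poll loop after processing each record:
// currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
//         new OffsetAndMetadata(record.offset() + 1));
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new ConcurrentHashMap<>();

consumer.subscribe(Arrays.asList("orders"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Commit exactly what was processed before giving up ownership
        consumer.commitSync(currentOffsets);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {}
});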
Graceful Shutdown¶
public class GracefulConsumer {
    private final AtomicBoolean running = new AtomicBoolean(true);
    private KafkaConsumer<String, String> consumer;

    public void consume() {
        try {
            consumer.subscribe(Arrays.asList("orders"));
            while (running.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                processRecords(records);
                consumer.commitSync();
            }
        } catch (WakeupException e) {
            // Expected during shutdown; rethrow only if we were not shutting down
            if (running.get()) throw e;
        } finally {
            consumer.close();
        }
    }

    public void shutdown() {
        running.set(false);
        consumer.wakeup(); // Interrupts a blocking poll() from another thread
    }
}

// Register the shutdown hook on the wrapper, not the KafkaConsumer itself
GracefulConsumer app = new GracefulConsumer();
Runtime.getRuntime().addShutdownHook(new Thread(app::shutdown));
Admin Client¶
Create Topics¶
import org.apache.kafka.clients.admin.*;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");

try (AdminClient admin = AdminClient.create(props)) {
    NewTopic topic = new NewTopic("orders", 6, (short) 3)
            .configs(Map.of(
                    "retention.ms", "604800000",  // 7 days
                    "cleanup.policy", "delete"
            ));
    CreateTopicsResult result = admin.createTopics(Collections.singleton(topic));
    result.all().get(); // Wait for completion
}
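Topic configs can also be changed after creation via `incrementalAlterConfigs`. A sketch that lowers the retention of the `orders` topic created above (the one-day value is illustrative):

try (AdminClient admin = AdminClient.create(props)) {
    ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, "orders");
    AlterConfigOp setRetention = new AlterConfigOp(
            new ConfigEntry("retention.ms", "86400000"),  // 1 day
            AlterConfigOp.OpType.SET);
    admin.incrementalAlterConfigs(Map.of(topicResource, List.of(setRetention)))
            .all().get();
}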
Describe Topics¶
try (AdminClient admin = AdminClient.create(props)) {
    DescribeTopicsResult result = admin.describeTopics(Arrays.asList("orders"));
    Map<String, TopicDescription> descriptions = result.allTopicNames().get();

    for (TopicDescription desc : descriptions.values()) {
        System.out.println("Topic: " + desc.name());
        for (TopicPartitionInfo partition : desc.partitions()) {
            System.out.printf("  Partition %d: leader=%d replicas=%s isr=%s%n",
                    partition.partition(),
                    partition.leader().id(),
                    partition.replicas(),
                    partition.isr());
        }
    }
}
List Consumer Groups¶
try (AdminClient admin = AdminClient.create(props)) {
    ListConsumerGroupsResult result = admin.listConsumerGroups();
    Collection<ConsumerGroupListing> groups = result.all().get();
    for (ConsumerGroupListing group : groups) {
        System.out.println("Group: " + group.groupId());
    }
}
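The admin client can also compute consumer lag by comparing a group's committed offsets against the partitions' end offsets. A sketch, reusing the `order-processors` group id from the consumer examples:

try (AdminClient admin = AdminClient.create(props)) {
    Map<TopicPartition, OffsetAndMetadata> committed =
            admin.listConsumerGroupOffsets("order-processors")
                 .partitionsToOffsetAndMetadata().get();

    // Latest end offset for every partition the group has committed
    Map<TopicPartition, OffsetSpec> latest = new HashMap<>();
    committed.keySet().forEach(tp -> latest.put(tp, OffsetSpec.latest()));
    Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> ends =
            admin.listOffsets(latest).all().get();

    committed.forEach((tp, offset) ->
            System.out.printf("%s lag=%d%n", tp, ends.get(tp).offset() - offset.offset()));
}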
Transactions¶
Transactional Producer¶
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-processor-txn");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();

try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("orders", "key1", "value1"));
    producer.send(new ProducerRecord<>("audit", "key1", "order created"));
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // Fatal errors - the producer must be closed
    producer.close();
} catch (KafkaException e) {
    // Transient error - abort and retry the transaction
    producer.abortTransaction();
}
Consume-Transform-Produce¶
// Consumer configuration
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "processor");
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
// Producer configuration
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "processor-txn");
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
producer.initTransactions();
consumer.subscribe(Arrays.asList("input"));
while (running) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    if (!records.isEmpty()) {
        producer.beginTransaction();
        try {
            for (ConsumerRecord<String, String> record : records) {
                String transformed = transform(record.value());
                producer.send(new ProducerRecord<>("output", record.key(), transformed));
            }
            // Commit the consumed offsets within the same transaction
            producer.sendOffsetsToTransaction(
                    getOffsetsToCommit(records),
                    consumer.groupMetadata()
            );
            producer.commitTransaction();
        } catch (Exception e) {
            producer.abortTransaction();
            // After aborting, seek the consumer back to its last committed
            // offsets so the aborted batch is reprocessed
        }
    }
}
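`getOffsetsToCommit` above is left undefined; one plausible implementation (a sketch) maps each partition in the batch to the offset after its last record:

private static Map<TopicPartition, OffsetAndMetadata> getOffsetsToCommit(
        ConsumerRecords<String, String> records) {
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    for (TopicPartition partition : records.partitions()) {
        List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
        long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
        // Commit the offset *after* the last processed record
        offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
    }
    return offsets;
}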
Error Handling¶
Producer Error Handling¶
producer.send(record, (metadata, exception) -> {
    if (exception == null) {
        // Success
        return;
    }
    if (exception instanceof RetriableException) {
        // Network issues, leader election - producer retries automatically
        log.warn("Retriable error, producer will retry: {}", exception.getMessage());
    } else if (exception instanceof AuthorizationException) {
        // Permission issue - requires intervention
        log.error("Authorization failed", exception);
        shutdown();
    } else {
        // Non-retriable error (e.g. record too large) - route to a dead letter queue
        log.error("Send failed", exception);
        sendToDeadLetterQueue(record, exception);
    }
});
// Note: SerializationException is thrown synchronously from send() itself,
// before the record is enqueued, so it never reaches this callback.
Consumer Error Handling¶
while (running.get()) {
    try {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            try {
                processRecord(record);
            } catch (ProcessingException e) {
                handleProcessingError(record, e);
            }
        }
        consumer.commitSync();
    } catch (WakeupException e) {
        if (running.get()) throw e;
    } catch (RecordDeserializationException e) {
        // Poison pill - seek past the bad record and continue
        log.error("Deserialization error at {} offset {}", e.topicPartition(), e.offset(), e);
        consumer.seek(e.topicPartition(), e.offset() + 1);
    } catch (AuthorizationException e) {
        log.error("Authorization error", e);
        shutdown();
    }
}
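The `handleProcessingError` helper above is application code, not a Kafka API. One common convention (a sketch; the `.DLQ` topic suffix, header names, and the `dlqProducer` field are all assumptions) forwards the failed record to a dead letter topic with the failure context in headers:

void handleProcessingError(ConsumerRecord<String, String> record, Exception error) {
    ProducerRecord<String, String> dlqRecord = new ProducerRecord<>(
            record.topic() + ".DLQ", record.key(), record.value());
    dlqRecord.headers()
            .add("error-class", error.getClass().getName().getBytes(StandardCharsets.UTF_8))
            .add("source-partition", String.valueOf(record.partition()).getBytes(StandardCharsets.UTF_8))
            .add("source-offset", String.valueOf(record.offset()).getBytes(StandardCharsets.UTF_8));
    dlqProducer.send(dlqRecord); // A separate producer dedicated to the DLQ
}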
Testing¶
Embedded Kafka (Spring)¶
@EmbeddedKafka(partitions = 3, topics = {"orders"})
class KafkaIntegrationTest {

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Test
    void testProduceConsume() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                embeddedKafka.getBrokersAsString());
        // ... test implementation
    }
}
Testcontainers¶
@Testcontainers
class KafkaContainerTest {

    @Container
    static KafkaContainer kafka = new KafkaContainer(
            DockerImageName.parse("confluentinc/cp-kafka:7.5.0")
    );

    @Test
    void testWithRealKafka() throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test", "key", "value")).get();
        }
    }
}
MockProducer¶
@Test
void testProducerLogic() {
    MockProducer<String, String> mockProducer = new MockProducer<>(
            true,  // autoComplete: sends succeed immediately
            new StringSerializer(),
            new StringSerializer()
    );

    MyService service = new MyService(mockProducer);
    service.processOrder(new Order("123"));

    List<ProducerRecord<String, String>> records = mockProducer.history();
    assertEquals(1, records.size());
    assertEquals("orders", records.get(0).topic());
}
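MockConsumer¶
`MockConsumer` is the consumer-side counterpart. A minimal sketch (topic and record values follow the earlier examples); records must be added after assignment and beginning offsets are set:

@Test
void testConsumerLogic() {
    MockConsumer<String, String> mockConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
    TopicPartition tp = new TopicPartition("orders", 0);

    mockConsumer.assign(Collections.singletonList(tp));
    mockConsumer.updateBeginningOffsets(Map.of(tp, 0L));
    mockConsumer.addRecord(new ConsumerRecord<>("orders", 0, 0L, "order-123", "{\"id\": 123}"));

    ConsumerRecords<String, String> records = mockConsumer.poll(Duration.ofMillis(100));
    assertEquals(1, records.count());
}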
Related Documentation¶
- Producer Development - Producer patterns
- Consumer Development - Consumer patterns
- Transactions - Transaction patterns