Kafka Java Client
The official Apache Kafka Java client provides producer, consumer, and admin APIs for JVM applications.
Installation
Maven
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.6.0</version>
</dependency>
Gradle
implementation 'org.apache.kafka:kafka-clients:3.6.0'
Producer
Basic Producer
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class BasicProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringSerializer");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record =
                new ProducerRecord<>("events", "key", "value");
            producer.send(record, (metadata, exception) -> {
                if (exception == null) {
                    System.out.printf("Sent to partition %d offset %d%n",
                        metadata.partition(), metadata.offset());
                } else {
                    exception.printStackTrace();
                }
            });
        }
    }
}
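send() is asynchronous and returns a Future<RecordMetadata>. When delivery confirmation is needed before continuing, you can block on that future; a minimal sketch, reusing the producer and record from the example above:

// Blocking send: get() waits for the broker acknowledgment and throws an
// ExecutionException wrapping any delivery failure.
RecordMetadata metadata = producer.send(record).get();
System.out.printf("Acked at %s-%d@%d%n",
    metadata.topic(), metadata.partition(), metadata.offset());

This trades throughput for per-record confirmation, so it is usually reserved for low-volume or strictly ordered sends.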
Reliable Producer
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
// Reliability settings
props.put(ProducerConfig.ACKS_CONFIG, "all");                        // wait for all in-sync replicas
props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);         // retry until delivery.timeout.ms expires
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);           // no duplicates on internal retries
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);  // max allowed with idempotence; ordering preserved

// Performance settings
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);        // max bytes per partition batch
props.put(ProducerConfig.LINGER_MS_CONFIG, 5);             // wait up to 5 ms to fill a batch
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");  // compress whole batches
Transactional Producer
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");
// Idempotence is implied (and required) when transactional.id is set
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

Producer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();

try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("topic1", "key", "value1"));
    producer.send(new ProducerRecord<>("topic2", "key", "value2"));
    producer.commitTransaction();
} catch (ProducerFencedException e) {
    // Another producer with the same transactional.id took over; do not abort
    throw e;
} catch (Exception e) {
    producer.abortTransaction();
    throw e;
} finally {
    producer.close();
}
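The most common transactional pattern is exactly-once consume-transform-produce, where consumer offsets are committed atomically with the produced records. A sketch, assuming a consumer subscribed to an input topic with enable.auto.commit=false; the "output-topic" name and the toUpperCase transform are placeholders:

// Requires: java.util.{Map, HashMap}, org.apache.kafka.common.TopicPartition,
// org.apache.kafka.clients.consumer.OffsetAndMetadata
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
producer.beginTransaction();
try {
    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
    for (ConsumerRecord<String, String> record : records) {
        producer.send(new ProducerRecord<>("output-topic",
            record.key(), record.value().toUpperCase()));
        // Track the offset of the *next* record to read per partition
        offsets.put(new TopicPartition(record.topic(), record.partition()),
            new OffsetAndMetadata(record.offset() + 1));
    }
    // Offsets become visible only if the transaction commits
    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
    producer.commitTransaction();
} catch (Exception e) {
    producer.abortTransaction();
}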
Consumer
Basic Consumer
import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class BasicConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        try (Consumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("events"));
            while (true) {
                ConsumerRecords<String, String> records =
                    consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d key=%s value=%s%n",
                        record.offset(), record.key(), record.value());
                }
            }
        }
    }
}
Manual Commit
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("events"));

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            processRecord(record);
        }
        // Synchronous commit after processing; blocks until the broker confirms
        consumer.commitSync();
    }
} finally {
    consumer.close();
}
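commitSync() blocks on every batch. A common variant (a sketch, assuming a volatile running flag flipped by a shutdown hook) commits asynchronously on the hot path and falls back to one final synchronous commit on shutdown:

try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            processRecord(record);
        }
        // Non-blocking commit; failures are reported via the callback
        consumer.commitAsync((offsets, exception) -> {
            if (exception != null) {
                System.err.println("Commit failed for " + offsets + ": " + exception);
            }
        });
    }
} finally {
    try {
        consumer.commitSync();  // final blocking commit so no progress is lost
    } finally {
        consumer.close();
    }
}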
At-Least-Once with Retry
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        boolean processed = false;
        int attempts = 0;
        while (!processed && attempts < 3) {
            try {
                processRecord(record);
                processed = true;
            } catch (Exception e) {
                attempts++;
                if (attempts >= 3) {
                    // Give up and park the record for later inspection
                    sendToDeadLetterQueue(record);
                }
            }
        }
    }
    // Commit only after every record was processed or dead-lettered
    consumer.commitSync();
}
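Both processRecord and sendToDeadLetterQueue are left undefined above. One possible shape for the dead-letter helper, assuming a dedicated dlqProducer field and a hypothetical "events.dlq" topic (both are assumptions, not part of the client API):

// Hypothetical helper: republish the failed record, preserving key and value.
// Headers carrying the failure cause are often added here as well.
void sendToDeadLetterQueue(ConsumerRecord<String, String> record) {
    dlqProducer.send(new ProducerRecord<>("events.dlq", record.key(), record.value()));
}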
Admin Client
Create Topic
import org.apache.kafka.clients.admin.*;
import java.util.Collections;
import java.util.Properties;

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

try (AdminClient admin = AdminClient.create(props)) {
    // 3 partitions, replication factor 3 (requires at least 3 brokers)
    NewTopic topic = new NewTopic("my-topic", 3, (short) 3);
    admin.createTopics(Collections.singleton(topic)).all().get();
}
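NewTopic also accepts per-topic configuration overrides; a sketch setting retention and cleanup policy (the seven-day value is an arbitrary example):

// Keys are the standard topic-level config names
NewTopic configured = new NewTopic("my-topic", 3, (short) 3)
    .configs(Map.of(
        "retention.ms", "604800000",   // 7 days
        "cleanup.policy", "delete"));
admin.createTopics(Collections.singleton(configured)).all().get();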
Describe Topics
DescribeTopicsResult result = admin.describeTopics(Collections.singletonList("my-topic"));
// topicNameValues() replaces values(), which was deprecated in 3.1
TopicDescription description = result.topicNameValues().get("my-topic").get();
System.out.println("Partitions: " + description.partitions().size());

for (TopicPartitionInfo partition : description.partitions()) {
    System.out.printf("Partition %d: leader=%d replicas=%s%n",
        partition.partition(),
        partition.leader().id(),
        partition.replicas());
}
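The same client covers the rest of the topic lifecycle; for example, listing and deleting topics (a sketch reusing the admin instance from above):

// List all non-internal topic names
Set<String> names = admin.listTopics().names().get();
System.out.println(names);

// Delete a topic; the future completes when the controller accepts the deletion
admin.deleteTopics(Collections.singleton("my-topic")).all().get();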
Configuration Reference
Producer
| Property | Description | Default |
|----------|-------------|---------|
| acks | Acknowledgment level | all (since 3.0; previously 1) |
| retries | Retry count | 2147483647 |
| batch.size | Maximum batch size in bytes | 16384 |
| linger.ms | Time to wait filling a batch (ms) | 0 |
| buffer.memory | Total producer buffer memory in bytes | 33554432 |
| enable.idempotence | Idempotent producer | true (since 3.0; previously false) |
| transactional.id | Transaction ID | null |
Consumer
| Property | Description | Default |
|----------|-------------|---------|
| group.id | Consumer group ID | null (required for subscribe) |
| auto.offset.reset | Where to start when no committed offset exists | latest |
| enable.auto.commit | Commit offsets automatically | true |
| auto.commit.interval.ms | Auto-commit interval (ms) | 5000 |
| max.poll.records | Maximum records returned per poll | 500 |
| session.timeout.ms | Group session timeout (ms) | 45000 |
| heartbeat.interval.ms | Heartbeat interval (ms) | 3000 |
Serialization
JSON with Jackson
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class JsonSerializer<T> implements Serializer<T> {
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, T data) {
        if (data == null) {
            return null;  // Kafka convention: null payloads (tombstones) stay null
        }
        try {
            return mapper.writeValueAsBytes(data);
        } catch (Exception e) {
            throw new SerializationException("Failed to serialize JSON", e);
        }
    }
}
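A matching deserializer needs the target class passed in explicitly, since the generic type is erased at runtime. A sketch:

import org.apache.kafka.common.serialization.Deserializer;

public class JsonDeserializer<T> implements Deserializer<T> {
    private final ObjectMapper mapper = new ObjectMapper();
    private final Class<T> targetType;

    public JsonDeserializer(Class<T> targetType) {
        this.targetType = targetType;
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;  // tombstone
        }
        try {
            return mapper.readValue(data, targetType);
        } catch (Exception e) {
            throw new SerializationException("Failed to deserialize JSON", e);
        }
    }
}

Because it takes a constructor argument, pass an instance to the KafkaConsumer constructor rather than configuring it by class name.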
Avro with Schema Registry
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>7.5.0</version>
</dependency>

This artifact is hosted in Confluent's repository (https://packages.confluent.io/maven/), not Maven Central, so add that repository to your build.

props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");
Error Handling
Producer Errors
producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        if (exception instanceof RetriableException) {
            // The producer already retried internally; reaching the callback
            // means retries were exhausted (delivery.timeout.ms elapsed)
            log.warn("Delivery failed after retries", exception);
        } else {
            // Non-retriable error, e.g. record too large or authorization failure
            log.error("Fatal error", exception);
            // Handle: dead letter queue, alerting, etc.
        }
    }
});
Consumer Errors
try {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    // process records
} catch (WakeupException e) {
    // Shutdown signal raised by consumer.wakeup(); exit the poll loop
} catch (AuthorizationException e) {
    // Fatal - exit
    throw e;
} catch (KafkaException e) {
    // Log and continue
    log.error("Kafka error", e);
}
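WakeupException is raised when another thread calls consumer.wakeup(), the only KafkaConsumer method that is safe to call concurrently. A common graceful-shutdown sketch, assuming the poll loop runs on the current thread:

final Thread pollThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    consumer.wakeup();        // makes a blocked poll() throw WakeupException
    try {
        pollThread.join();    // wait for the loop to commit and close cleanly
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}));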