Kafka Transactional Producer¶
Transactions enable atomic writes to multiple partitions and exactly-once semantics when combined with transactional consumers.
Overview¶
All writes within a transaction succeed or fail atomically.
Configuration¶
Producer¶
# Required for transactions
transactional.id=my-app-instance-1
enable.idempotence=true # Implied
# Transaction timeout
transaction.timeout.ms=60000
Consumer (for EOS)¶
# Read only committed transactions
isolation.level=read_committed
enable.auto.commit=false
Transaction API¶
Basic Pattern¶
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-processor-1");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// Initialize once on startup
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(new ProducerRecord<>("orders", key, value1));
producer.send(new ProducerRecord<>("audit", key, value2));
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException e) {
// Fatal - must close
producer.close();
} catch (KafkaException e) {
// Recoverable - abort and retry
producer.abortTransaction();
}
Read-Process-Write Pattern¶
producer.initTransactions();
consumer.subscribe(Collections.singletonList("input"));
while (running) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
if (records.isEmpty()) continue;
producer.beginTransaction();
try {
for (ConsumerRecord<String, String> record : records) {
String output = process(record.value());
producer.send(new ProducerRecord<>("output", record.key(), output));
}
// Commit offsets atomically with output
producer.sendOffsetsToTransaction(
getOffsets(records),
consumer.groupMetadata()
);
producer.commitTransaction();
} catch (KafkaException e) {
producer.abortTransaction();
}
}
Transaction States¶
| State | Description |
|---|---|
Empty |
No active transaction |
Ongoing |
Transaction in progress |
PrepareCommit |
Committing |
PrepareAbort |
Aborting |
Complete |
Finished |
Error Handling¶
| Exception | Severity | Action |
|---|---|---|
ProducerFencedException |
Fatal | Close producer, another instance active |
OutOfOrderSequenceException |
Fatal | Close producer, state corruption |
KafkaException |
Recoverable | Abort transaction, retry |
Zombie Fencing¶
When a producer with the same transactional.id starts, the old producer is fenced:
try {
producer.commitTransaction();
} catch (ProducerFencedException e) {
// Another instance took over
log.error("Fenced by new producer instance");
producer.close();
System.exit(1);
}
Performance Considerations¶
| Aspect | Impact |
|---|---|
| Latency | +10-50ms per transaction commit |
| Throughput | 30-50% lower than non-transactional |
| Complexity | Higher error handling requirements |
Batch multiple records per transaction to amortize overhead.
Related Documentation¶
- Producer Guide - Producer overview
- Idempotence - Idempotent producer
- Exactly-Once Semantics - EOS deep dive
- Transaction APIs - Protocol details