Kafka Serialization
Kafka transmits messages as byte arrays. Serializers convert objects to bytes for producers, and deserializers convert bytes back to objects for consumers. This guide covers serialization formats, Schema Registry integration, and best practices.
Serialization Overview
Data Flow
Producer (object) → Serializer → byte[] → Kafka broker → byte[] → Deserializer → Consumer (object)
| Format | Schema | Type Safety | Size | Human Readable |
|--------|--------|-------------|------|----------------|
| String | None | None | Large | Yes |
| JSON | Optional | Runtime | Large | Yes |
| Avro | Required | Compile-time | Compact | No |
| Protobuf | Required | Compile-time | Compact | No |
| Byte Array | None | None | Minimal | No |
Built-in Serializers
StringSerializer
Properties props = new Properties();
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("topic", "key", "value"));
ByteArraySerializer
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
Producer<byte[], byte[]> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("topic", keyBytes, valueBytes));
Available Serializers
| Type | Serializer | Deserializer |
|------|------------|--------------|
| String | StringSerializer | StringDeserializer |
| Byte Array | ByteArraySerializer | ByteArrayDeserializer |
| Integer | IntegerSerializer | IntegerDeserializer |
| Long | LongSerializer | LongDeserializer |
| Double | DoubleSerializer | DoubleDeserializer |
| Float | FloatSerializer | FloatDeserializer |
| Short | ShortSerializer | ShortDeserializer |
| UUID | UUIDSerializer | UUIDDeserializer |
| Void | VoidSerializer | VoidDeserializer |
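All of these classes live in org.apache.kafka.common.serialization, and each serializer has a matching deserializer for the consumer side. A minimal consumer sketch using the built-in string deserializers (topic and group names are illustrative):
Properties props = new Properties();
props.put("bootstrap.servers", "kafka:9092");
props.put("group.id", "string-consumers");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("topic"));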
JSON Serialization
Jackson Serializer
import java.io.IOException;
import java.util.Map;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;

public class JsonSerializer<T> implements Serializer<T> {
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public byte[] serialize(String topic, T data) {
        if (data == null) return null;
        try {
            return objectMapper.writeValueAsBytes(data);
        } catch (JsonProcessingException e) {
            throw new SerializationException("Error serializing JSON message", e);
        }
    }
}

public class JsonDeserializer<T> implements Deserializer<T> {
    private final ObjectMapper objectMapper = new ObjectMapper();
    private Class<T> targetType;

    @Override
    @SuppressWarnings("unchecked")
    public void configure(Map<String, ?> configs, boolean isKey) {
        // The target type is supplied through a custom config key (see usage below)
        targetType = (Class<T>) configs.get("json.value.type");
    }

    @Override
    public T deserialize(String topic, byte[] data) {
        if (data == null) return null;
        try {
            return objectMapper.readValue(data, targetType);
        } catch (IOException e) {
            throw new SerializationException("Error deserializing JSON message", e);
        }
    }
}
Usage
// Producer
props.put("value.serializer", JsonSerializer.class);
// Consumer
props.put("value.deserializer", JsonDeserializer.class);
props.put("json.value.type", Order.class);
Spring Kafka JSON Support
@Configuration
public class KafkaConfig {

    @Bean
    public ProducerFactory<String, Order> producerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Spring Kafka's JsonSerializer (org.springframework.kafka.support.serializer)
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(config);
    }

    @Bean
    public ConsumerFactory<String, Order> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        config.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Order.class);
        config.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.domain");
        return new DefaultKafkaConsumerFactory<>(config);
    }
}
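With the factories defined, sends usually go through a KafkaTemplate. A minimal sketch of the remaining wiring, as an additional bean in the same configuration class (bean name is illustrative):
@Bean
public KafkaTemplate<String, Order> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

// Injected wherever messages are produced; the JsonSerializer converts Order to bytes
kafkaTemplate.send("orders", order.getOrderId(), order);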
Avro Serialization
Schema Definition
{
  "type": "record",
  "name": "Order",
  "namespace": "com.example.avro",
  "fields": [
    {"name": "orderId", "type": "string"},
    {"name": "customerId", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "currency", "type": "string", "default": "USD"},
    {"name": "createdAt", "type": {"type": "long", "logicalType": "timestamp-millis"}},
    {
      "name": "items",
      "type": {
        "type": "array",
        "items": {
          "type": "record",
          "name": "OrderItem",
          "fields": [
            {"name": "productId", "type": "string"},
            {"name": "quantity", "type": "int"},
            {"name": "price", "type": "double"}
          ]
        }
      }
    }
  ]
}
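The specific record classes used below (Order, OrderItem) are typically generated from this .avsc file, for example by the avro-maven-plugin. When code generation is not an option, the schema can also be loaded at runtime; a small sketch (file path is illustrative):
Schema schema = new Schema.Parser().parse(new File("order.avsc"));
GenericRecord record = new GenericData.Record(schema);
record.put("orderId", "order-123"); // field names must match the schema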
Avro with Schema Registry
Properties props = new Properties();
props.put("bootstrap.servers", "kafka:9092");
props.put("schema.registry.url", "http://schema-registry:8081");
props.put("key.serializer", StringSerializer.class);
props.put("value.serializer", KafkaAvroSerializer.class);
Producer<String, Order> producer = new KafkaProducer<>(props);
Order order = Order.newBuilder()
    .setOrderId("order-123")
    .setCustomerId("customer-456")
    .setAmount(99.99)
    .setCurrency("USD")
    .setCreatedAt(System.currentTimeMillis())
    .setItems(List.of(
        OrderItem.newBuilder()
            .setProductId("product-789")
            .setQuantity(2)
            .setPrice(49.99)
            .build()
    ))
    .build();
producer.send(new ProducerRecord<>("orders", order.getOrderId(), order));
Consumer with Avro
Properties props = new Properties();
props.put("bootstrap.servers", "kafka:9092");
props.put("schema.registry.url", "http://schema-registry:8081");
props.put("group.id", "order-consumers");
props.put("key.deserializer", StringDeserializer.class);
props.put("value.deserializer", KafkaAvroDeserializer.class);
props.put("specific.avro.reader", "true"); // Use generated classes
Consumer<String, Order> consumer = new KafkaConsumer<>(props);
Generic Avro Records
For dynamic schemas:
props.put("specific.avro.reader", "false"); // Use GenericRecord
Consumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
consumer.subscribe(List.of("orders"));
while (running) {
    ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, GenericRecord> record : records) {
        GenericRecord order = record.value();
        String orderId = order.get("orderId").toString();
        double amount = (double) order.get("amount");
    }
}
Protobuf Serialization
Proto Definition
syntax = "proto3";
package com.example.proto;
option java_package = "com.example.proto";
option java_outer_classname = "OrderProtos";
message Order {
  string order_id = 1;
  string customer_id = 2;
  double amount = 3;
  string currency = 4;
  int64 created_at = 5;
  repeated OrderItem items = 6;
}

message OrderItem {
  string product_id = 1;
  int32 quantity = 2;
  double price = 3;
}
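Generated Protobuf classes can serialize themselves independently of Kafka, which is handy for tests; a quick sketch:
Order order = Order.newBuilder().setOrderId("order-123").build();
byte[] bytes = order.toByteArray();       // serialize to the wire format
Order parsed = Order.parseFrom(bytes);    // throws InvalidProtocolBufferException on bad input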
Protobuf with Schema Registry
Properties props = new Properties();
props.put("bootstrap.servers", "kafka:9092");
props.put("schema.registry.url", "http://schema-registry:8081");
props.put("key.serializer", StringSerializer.class);
props.put("value.serializer", KafkaProtobufSerializer.class);
Producer<String, Order> producer = new KafkaProducer<>(props);
Order order = Order.newBuilder()
    .setOrderId("order-123")
    .setCustomerId("customer-456")
    .setAmount(99.99)
    .setCurrency("USD")
    .setCreatedAt(System.currentTimeMillis())
    .addItems(OrderItem.newBuilder()
        .setProductId("product-789")
        .setQuantity(2)
        .setPrice(49.99)
        .build())
    .build();
producer.send(new ProducerRecord<>("orders", order.getOrderId(), order));
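On the consumer side, KafkaProtobufDeserializer can return the generated type directly. A sketch, assuming Confluent's specific.protobuf.value.type setting selects the target class:
Properties props = new Properties();
props.put("bootstrap.servers", "kafka:9092");
props.put("schema.registry.url", "http://schema-registry:8081");
props.put("group.id", "order-consumers");
props.put("key.deserializer", StringDeserializer.class);
props.put("value.deserializer", KafkaProtobufDeserializer.class);
props.put("specific.protobuf.value.type", Order.class.getName()); // assumption: Confluent config key
Consumer<String, Order> consumer = new KafkaConsumer<>(props);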
Schema Registry Integration
Schema Registry Architecture
(Diagram: producers register schemas with the Schema Registry and embed the returned schema ID in each message; consumers fetch the schema by ID to deserialize.)
Configuration
// Producer
props.put("schema.registry.url", "http://schema-registry:8081");
props.put("auto.register.schemas", "true"); // Automatic schema registration
props.put("value.subject.name.strategy",
"io.confluent.kafka.serializers.subject.TopicRecordNameStrategy");
// Consumer
props.put("schema.registry.url", "http://schema-registry:8081");
props.put("specific.avro.reader", "true");
Subject Naming Strategies
| Strategy | Subject Name | Use Case |
|----------|--------------|----------|
| TopicNameStrategy | {topic}-value | One schema per topic |
| RecordNameStrategy | {record.name} | Schema per record type |
| TopicRecordNameStrategy | {topic}-{record.name} | Multiple schemas per topic |
Schema Evolution
Compatibility Types
| Type | Allowed Changes |
|------|-----------------|
| BACKWARD | Delete fields, add optional fields |
| FORWARD | Add fields, delete optional fields |
| FULL | Add optional fields, delete optional fields |
| NONE | No compatibility checking; any change is accepted |
Backward Compatible Evolution
// Version 1
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "orderId", "type": "string"},
    {"name": "amount", "type": "double"}
  ]
}

// Version 2 - Add optional field with a default (BACKWARD compatible)
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "orderId", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "currency", "type": "string", "default": "USD"}
  ]
}
Forward Compatible Evolution
// Version 1
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "orderId", "type": "string"},
    {"name": "amount", "type": "double"},
    {"name": "notes", "type": ["null", "string"], "default": null}
  ]
}

// Version 2 - Remove optional field (FORWARD compatible)
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "orderId", "type": "string"},
    {"name": "amount", "type": "double"}
  ]
}
Setting Compatibility
# Set compatibility for a subject
curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" \
--data '{"compatibility": "BACKWARD"}' \
http://schema-registry:8081/config/orders-value
Custom Serializers
Implementing a Custom Serializer
public class OrderSerializer implements Serializer<Order> {
    private final ObjectMapper mapper = new ObjectMapper();
    private boolean isKey;

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        this.isKey = isKey;
        // Apply any configuration
    }

    @Override
    public byte[] serialize(String topic, Order order) {
        if (order == null) return null;
        try {
            return mapper.writeValueAsBytes(order);
        } catch (JsonProcessingException e) {
            throw new SerializationException("Error serializing Order", e);
        }
    }

    @Override
    public byte[] serialize(String topic, Headers headers, Order order) {
        // Add metadata to headers if needed
        headers.add("serializer.version", "1".getBytes());
        return serialize(topic, order);
    }

    @Override
    public void close() {
        // Cleanup resources if needed
    }
}
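Registering the custom serializer works the same way as for the built-ins:
props.put("value.serializer", OrderSerializer.class);
A matching custom deserializer is needed on the consumer side to reverse the format.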
Topic-Specific Serialization
public class DelegatingSerializer implements Serializer<Object> {
    private final Map<String, Serializer<?>> serializers = new HashMap<>();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // Choose a serializer per topic
        serializers.put("orders", new OrderSerializer());
        serializers.put("payments", new PaymentSerializer());
        serializers.put("events", new JsonSerializer<>());
    }

    @Override
    @SuppressWarnings("unchecked")
    public byte[] serialize(String topic, Object data) {
        // Fall back to generic JSON for topics without a dedicated serializer
        Serializer<Object> serializer =
            (Serializer<Object>) serializers.getOrDefault(topic, new JsonSerializer<>());
        return serializer.serialize(topic, data);
    }
}
Error Handling
Deserialization Errors
// Use ErrorHandlingDeserializer to catch errors
props.put("value.deserializer", ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_FUNCTION, MyFailedDeserializationHandler.class);
public class MyFailedDeserializationHandler implements Function<FailedDeserializationInfo, Order> {
    @Override
    public Order apply(FailedDeserializationInfo info) {
        log.error("Failed to deserialize message from topic {}: {}",
            info.getTopic(), info.getException().getMessage());
        // Return null or a default value
        return null;
    }
}
Schema Mismatch Handling
try {
    // Deserialization errors surface from poll(), not from value()
    Order order = consumer.poll(Duration.ofMillis(100)).iterator().next().value();
} catch (SerializationException e) {
    if (e.getCause() instanceof SchemaException) {
        // Schema incompatibility
        log.error("Schema mismatch: {}", e.getMessage());
        // Update the consumer or handle gracefully
    }
}
Serialization Overhead
| Format | Serialization Speed | Message Size | CPU Usage |
|--------|---------------------|--------------|-----------|
| Byte Array | Fastest | Smallest | Lowest |
| Protobuf | Fast | Small | Low |
| Avro | Fast | Small | Low |
| JSON | Slow | Large | High |
Optimization Tips
// Reuse ObjectMapper (thread-safe)
private static final ObjectMapper MAPPER = new ObjectMapper();
// Increase the producer's buffer for unsent records
props.put("buffer.memory", 67108864); // 64 MB
// Enable compression for large messages
props.put("compression.type", "lz4");
Best Practices
| Scenario | Recommended Format |
|----------|--------------------|
| Strict schema enforcement | Avro or Protobuf |
| Cross-language compatibility | Avro or Protobuf |
| Debugging / human readable | JSON |
| Maximum performance | Protobuf or custom |
| Flexible schema | JSON |
Schema Management
| Practice | Recommendation |
|----------|----------------|
| Version schemas | Use Schema Registry |
| Enforce compatibility | Set a compatibility mode per subject |
| Test evolution | Validate changes before deployment |
| Document schemas | Include field descriptions |
Error Handling
| Practice | Recommendation |
|----------|----------------|
| Handle null | Check for null before serialization |
| Catch errors | Use ErrorHandlingDeserializer |
| Log failures | Include topic, partition, offset |
| DLQ | Send undeserializable messages to a DLQ (see the sketch below) |
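A minimal DLQ sketch for the last practice: consume raw bytes so poll() never fails on a bad payload, attempt typed deserialization manually, and forward failures (the topic name orders-dlq, consumerProps, and producerProps are illustrative):
Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps); // ByteArray deserializers
Producer<byte[], byte[]> dlqProducer = new KafkaProducer<>(producerProps); // ByteArray serializers
JsonDeserializer<Order> orderDeserializer = new JsonDeserializer<>();
orderDeserializer.configure(Map.of("json.value.type", Order.class), false);

consumer.subscribe(List.of("orders"));
for (ConsumerRecord<byte[], byte[]> record : consumer.poll(Duration.ofMillis(100))) {
    try {
        Order order = orderDeserializer.deserialize(record.topic(), record.value());
        // ... process order ...
    } catch (SerializationException e) {
        // Route the undecodable payload, with its original key, to the dead-letter topic
        dlqProducer.send(new ProducerRecord<>("orders-dlq", record.key(), record.value()));
    }
}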