Skip to content

Schema Formats

Schema Registry supports multiple serialization formats for data contracts in Kafka topics.


Format Comparison

Feature Avro Protobuf JSON Schema
Binary encoding Yes Yes No
Schema evolution Excellent Good Limited
Human readable No No Yes
Code generation Yes Yes Optional
Default values Yes Yes Yes
Backward compatibility Native Native Manual
Size efficiency High Highest Low

Avro

Apache Avro provides compact binary serialization with strong schema evolution support.

Schema Definition

{
  "type": "record",
  "name": "User",
  "namespace": "com.example.events",
  "fields": [
    {
      "name": "id",
      "type": "string",
      "doc": "Unique user identifier"
    },
    {
      "name": "email",
      "type": "string"
    },
    {
      "name": "created_at",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    },
    {
      "name": "status",
      "type": {
        "type": "enum",
        "name": "UserStatus",
        "symbols": ["ACTIVE", "INACTIVE", "PENDING"]
      },
      "default": "PENDING"
    },
    {
      "name": "metadata",
      "type": ["null", {
        "type": "map",
        "values": "string"
      }],
      "default": null
    }
  ]
}

Avro Types

Type Description Example
null No value null
boolean True/false true
int 32-bit signed 42
long 64-bit signed 1234567890
float 32-bit IEEE 754 3.14
double 64-bit IEEE 754 3.14159265
bytes Byte sequence Binary data
string UTF-8 string "hello"
array Ordered collection [1, 2, 3]
map Key-value pairs {"key": "value"}
record Named fields Complex type
enum Enumeration "ACTIVE"
fixed Fixed-size bytes UUID, hash
union Type alternatives ["null", "string"]

Logical Types

{
  "type": "record",
  "name": "Transaction",
  "fields": [
    {
      "name": "amount",
      "type": {
        "type": "bytes",
        "logicalType": "decimal",
        "precision": 10,
        "scale": 2
      }
    },
    {
      "name": "transaction_date",
      "type": {
        "type": "int",
        "logicalType": "date"
      }
    },
    {
      "name": "transaction_time",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    },
    {
      "name": "transaction_id",
      "type": {
        "type": "fixed",
        "name": "uuid",
        "size": 16,
        "logicalType": "uuid"
      }
    }
  ]
}
Logical Type Underlying Type Description
decimal bytes/fixed Arbitrary precision decimal
date int Days from Unix epoch
time-millis int Milliseconds from midnight
time-micros long Microseconds from midnight
timestamp-millis long Milliseconds from epoch
timestamp-micros long Microseconds from epoch
uuid string/fixed UUID string or 16 bytes

Java Producer with Avro

Properties props = new Properties();
props.put("bootstrap.servers", "kafka:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://schema-registry:8081");

KafkaProducer<String, User> producer = new KafkaProducer<>(props);

User user = User.newBuilder()
    .setId("user-123")
    .setEmail("[email protected]")
    .setCreatedAt(System.currentTimeMillis())
    .setStatus(UserStatus.ACTIVE)
    .build();

producer.send(new ProducerRecord<>("users", user.getId(), user));

Python Consumer with Avro

from confluent_kafka import Consumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer

schema_registry_client = SchemaRegistryClient({'url': 'http://schema-registry:8081'})
avro_deserializer = AvroDeserializer(schema_registry_client)

consumer = Consumer({
    'bootstrap.servers': 'kafka:9092',
    'group.id': 'my-consumer-group',
    'value.deserializer': avro_deserializer
})

consumer.subscribe(['users'])

while True:
    msg = consumer.poll(1.0)
    if msg is not None:
        user = msg.value()
        print(f"User: {user['id']}, Email: {user['email']}")

Protobuf

Protocol Buffers provide efficient binary serialization with strong typing and code generation.

Schema Definition

syntax = "proto3";

package com.example.events;

import "google/protobuf/timestamp.proto";

message User {
  string id = 1;
  string email = 2;
  google.protobuf.Timestamp created_at = 3;
  UserStatus status = 4;
  map<string, string> metadata = 5;

  enum UserStatus {
    USER_STATUS_UNSPECIFIED = 0;
    USER_STATUS_ACTIVE = 1;
    USER_STATUS_INACTIVE = 2;
    USER_STATUS_PENDING = 3;
  }
}

message UserEvent {
  string event_id = 1;
  EventType event_type = 2;
  User user = 3;
  google.protobuf.Timestamp event_time = 4;

  enum EventType {
    EVENT_TYPE_UNSPECIFIED = 0;
    EVENT_TYPE_CREATED = 1;
    EVENT_TYPE_UPDATED = 2;
    EVENT_TYPE_DELETED = 3;
  }
}

Protobuf Types

Type Description Default Value
double 64-bit float 0.0
float 32-bit float 0.0
int32 Variable-length signed 0
int64 Variable-length signed 0
uint32 Variable-length unsigned 0
uint64 Variable-length unsigned 0
sint32 ZigZag encoded signed 0
sint64 ZigZag encoded signed 0
fixed32 Fixed 4 bytes unsigned 0
fixed64 Fixed 8 bytes unsigned 0
sfixed32 Fixed 4 bytes signed 0
sfixed64 Fixed 8 bytes signed 0
bool Boolean false
string UTF-8 string ""
bytes Byte sequence empty

Java Producer with Protobuf

Properties props = new Properties();
props.put("bootstrap.servers", "kafka:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer");
props.put("schema.registry.url", "http://schema-registry:8081");

KafkaProducer<String, User> producer = new KafkaProducer<>(props);

User user = User.newBuilder()
    .setId("user-123")
    .setEmail("[email protected]")
    .setCreatedAt(Timestamps.fromMillis(System.currentTimeMillis()))
    .setStatus(UserStatus.USER_STATUS_ACTIVE)
    .build();

producer.send(new ProducerRecord<>("users", user.getId(), user));

JSON Schema

JSON Schema provides human-readable validation with broad tooling support.

Schema Definition

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "$id": "com.example.events.User",
  "type": "object",
  "title": "User",
  "required": ["id", "email"],
  "properties": {
    "id": {
      "type": "string",
      "description": "Unique user identifier"
    },
    "email": {
      "type": "string",
      "format": "email"
    },
    "created_at": {
      "type": "string",
      "format": "date-time"
    },
    "status": {
      "type": "string",
      "enum": ["ACTIVE", "INACTIVE", "PENDING"],
      "default": "PENDING"
    },
    "age": {
      "type": "integer",
      "minimum": 0,
      "maximum": 150
    },
    "tags": {
      "type": "array",
      "items": {
        "type": "string"
      }
    },
    "metadata": {
      "type": "object",
      "additionalProperties": {
        "type": "string"
      }
    }
  }
}

JSON Schema Types

Type Description Validation Keywords
string Text minLength, maxLength, pattern, format
number Floating point minimum, maximum, multipleOf
integer Whole number minimum, maximum, multipleOf
boolean True/false -
object Key-value properties, required, additionalProperties
array Ordered list items, minItems, maxItems, uniqueItems
null No value -

Format Validation

Format Description
date-time ISO 8601 date-time
date ISO 8601 date
time ISO 8601 time
email Email address
hostname Hostname
ipv4 IPv4 address
ipv6 IPv6 address
uri URI
uuid UUID

Java Producer with JSON Schema

Properties props = new Properties();
props.put("bootstrap.servers", "kafka:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer");
props.put("schema.registry.url", "http://schema-registry:8081");
props.put("json.fail.invalid.schema", "true");

KafkaProducer<String, User> producer = new KafkaProducer<>(props);

User user = new User();
user.setId("user-123");
user.setEmail("[email protected]");
user.setStatus("ACTIVE");

producer.send(new ProducerRecord<>("users", user.getId(), user));

Schema Registry API

Register Schema

# Register Avro schema
curl -X POST http://schema-registry:8081/subjects/users-value/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{
    "schemaType": "AVRO",
    "schema": "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"email\",\"type\":\"string\"}]}"
  }'

# Register Protobuf schema
curl -X POST http://schema-registry:8081/subjects/users-value/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{
    "schemaType": "PROTOBUF",
    "schema": "syntax = \"proto3\"; message User { string id = 1; string email = 2; }"
  }'

# Register JSON Schema
curl -X POST http://schema-registry:8081/subjects/users-value/versions \
  -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  -d '{
    "schemaType": "JSON",
    "schema": "{\"type\":\"object\",\"properties\":{\"id\":{\"type\":\"string\"},\"email\":{\"type\":\"string\"}}}"
  }'

Get Schema

# Get latest schema
curl http://schema-registry:8081/subjects/users-value/versions/latest

# Get specific version
curl http://schema-registry:8081/subjects/users-value/versions/1

# Get schema by ID
curl http://schema-registry:8081/schemas/ids/1

List Subjects

curl http://schema-registry:8081/subjects

Serializer Configuration

Avro Serializer

Property Description Default
schema.registry.url Registry URL Required
auto.register.schemas Auto-register schemas true
use.latest.version Use latest schema version false
avro.use.logical.type.converters Use logical type converters false

Protobuf Serializer

Property Description Default
schema.registry.url Registry URL Required
auto.register.schemas Auto-register schemas true
reference.subject.name.strategy Reference naming strategy -

JSON Schema Serializer

Property Description Default
schema.registry.url Registry URL Required
auto.register.schemas Auto-register schemas true
json.fail.invalid.schema Fail on invalid schema false
json.oneof.for.nullables Use oneOf for nullable false