
Kafka Producer Partitioning

Partitioning determines which partition receives each record. The partitioning strategy affects message ordering, load distribution, and consumer parallelism.


Partitioning Overview

(Diagram: partition selection flow. The producer uses the record's explicit partition if set, otherwise key-based hashing via the configured partitioner, otherwise sticky assignment for keyless records.)

Why Partitioning Matters

| Aspect | Impact |
|---|---|
| Ordering | Records with the same key go to the same partition, so they stay ordered |
| Parallelism | More partitions enable more consumers |
| Load balancing | Even distribution prevents hot partitions |
| Locality | Related records can be co-located |

Default Partitioner

Algorithm

(Diagram: default partitioner decision flow. If the record specifies a partition, use it; if a key is present, hash it with murmur2; otherwise apply sticky assignment.)

Key-Based Partitioning

When a key is provided, the default partitioner uses murmur2 hashing.

// Same key always goes to same partition
producer.send(new ProducerRecord<>("orders", "user-123", order1));  // Partition X
producer.send(new ProducerRecord<>("orders", "user-123", order2));  // Partition X
producer.send(new ProducerRecord<>("orders", "user-456", order3));  // Partition Y

Hash calculation:

partition = (murmur2(keyBytes) & 0x7fffffff) % numPartitions

| Property | Guarantee |
|---|---|
| Deterministic | Same key → same partition (given the same partition count) |
| Distribution | Generally uniform across partitions |
| Ordering | Records with the same key maintain order |
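
The same calculation can be reproduced on the client to predict where a given key will land, for example to verify that related keys are co-located. A minimal sketch, assuming keys are serialized with StringSerializer (UTF-8 bytes) and using org.apache.kafka.common.utils.Utils from kafka-clients:

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;

public class PartitionPredictor {
    // Mirrors the default partitioner's keyed path:
    // murmur2(keyBytes), masked to a non-negative value, modulo the partition count
    public static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        return (Utils.murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // e.g. check which of 9 partitions "user-123" maps to
        System.out.println(partitionFor("user-123", 9));
    }
}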

Sticky Partitioning (Kafka 2.4+)

For null keys, sticky partitioning improves batching efficiency.

(Diagram: sticky partitioner behavior. Keyless records go to a single partition until the batch fills or linger.ms expires, then the partitioner switches to another partition.)

| Behavior | Round-Robin (Pre-2.4) | Sticky (2.4+) |
|---|---|---|
| Record distribution | Even | Bursty per partition, even over time |
| Batch efficiency | Poor | Excellent |
| Network utilization | Higher (more, smaller requests) | Lower |
| Latency | Higher (small batches) | Lower (full batches) |
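
Sticky partitioning pays off most when the producer's batching settings give it room to fill batches before switching partitions. A minimal sketch with illustrative values (the broker address, topic name, and specific numbers are placeholders, not recommendations):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// Larger batches and a short linger let the sticky partitioner fill a batch
// on one partition before it moves to the next
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    for (int i = 0; i < 1000; i++) {
        // Null key: records accumulate on the current sticky partition
        producer.send(new ProducerRecord<>("metrics", null, "sample-" + i));
    }
}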

Custom Partitioner

Implementation Interface

public interface Partitioner extends Configurable, Closeable {
    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key (or null if no key)
     * @param keyBytes Serialized key (or null)
     * @param value The value (or null if no value)
     * @param valueBytes Serialized value (or null)
     * @param cluster Current cluster metadata
     * @return The partition number
     */
    int partition(String topic, Object key, byte[] keyBytes,
                  Object value, byte[] valueBytes, Cluster cluster);
}

Region-Based Partitioner

public class RegionPartitioner implements Partitioner {
    private Map<String, List<Integer>> regionPartitions;

    @Override
    public void configure(Map<String, ?> configs) {
        // US: partitions 0-2, EU: 3-5, APAC: 6-8
        regionPartitions = Map.of(
            "US", List.of(0, 1, 2),
            "EU", List.of(3, 4, 5),
            "APAC", List.of(6, 7, 8)
        );
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                        Object value, byte[] valueBytes, Cluster cluster) {
        Order order = (Order) value;
        String region = order.getRegion();

        List<Integer> partitions = regionPartitions.getOrDefault(
            region, List.of(0, 1, 2, 3, 4, 5, 6, 7, 8));

        // Hash within the region's partitions; guard against null keys,
        // which would otherwise throw an NPE in murmur2
        int hash = keyBytes != null ? Utils.murmur2(keyBytes) : 0;
        int index = (hash & 0x7fffffff) % partitions.size();
        return partitions.get(index);
    }

    @Override
    public void close() {}
}
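
Because RegionPartitioner ignores the cluster metadata, it can be exercised directly in a unit test without a broker. A sketch; the Order constructor and the assertion style are placeholders:

import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.common.Cluster;

RegionPartitioner partitioner = new RegionPartitioner();
partitioner.configure(Map.of());

Order order = new Order("order-42", "EU");   // hypothetical constructor
byte[] keyBytes = "user-123".getBytes(StandardCharsets.UTF_8);

int partition = partitioner.partition(
    "orders", "user-123", keyBytes, order, null, Cluster.empty());

// EU orders should land on one of partitions 3-5
assert partition >= 3 && partition <= 5;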

Priority Partitioner

public class PriorityPartitioner implements Partitioner {
    // High priority: partitions 0-1, Normal: 2-7, Low: 8-9
    // (assumes the topic has exactly 10 partitions)

    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                        Object value, byte[] valueBytes, Cluster cluster) {
        Event event = (Event) value;

        int hash = keyBytes != null ? Utils.murmur2(keyBytes) : 0;
        int hashAbs = hash & 0x7fffffff;

        switch (event.getPriority()) {
            case HIGH:
                return hashAbs % 2;        // Partitions 0-1
            case NORMAL:
                return 2 + (hashAbs % 6);  // Partitions 2-7
            case LOW:
                return 8 + (hashAbs % 2);  // Partitions 8-9
            default:
                return hashAbs % cluster.partitionCountForTopic(topic);
        }
    }

    @Override
    public void close() {}
}
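
The priority layout only has an effect if consumers are wired to it, for example by assigning a dedicated consumer the high-priority partition range explicitly instead of relying on group subscription. A sketch, assuming the topic is named events and consumerProps stands for an already-built consumer configuration:

import java.util.List;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// Dedicated high-priority consumer reads only partitions 0-1
KafkaConsumer<String, String> highPriorityConsumer = new KafkaConsumer<>(consumerProps);
highPriorityConsumer.assign(List.of(
    new TopicPartition("events", 0),
    new TopicPartition("events", 1)));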

Configuration

partitioner.class=com.example.RegionPartitioner
# Additional partitioner-specific config, passed through to the partitioner's configure()
region.partition.mapping=US:0-2,EU:3-5,APAC:6-8
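
The same settings can be applied programmatically; the producer hands its full config map, including non-standard entries such as region.partition.mapping, to the partitioner's configure() method, so the mapping could be parsed there instead of hardcoded. A sketch (bootstrap and serializer settings are assumed to be set elsewhere in props):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

Properties props = new Properties();
// ... bootstrap.servers and serializers ...
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RegionPartitioner.class.getName());
// Forwarded to RegionPartitioner.configure(Map<String, ?> configs)
props.put("region.partition.mapping", "US:0-2,EU:3-5,APAC:6-8");

KafkaProducer<String, Order> producer = new KafkaProducer<>(props);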

Explicit Partition Selection

Direct Partition Specification

// Send to specific partition (bypasses partitioner)
ProducerRecord<String, String> record = new ProducerRecord<>(
    "orders",    // topic
    2,           // partition (explicit)
    "key",       // key
    "value"      // value
);
producer.send(record);

Use Cases for Explicit Partitioning

| Use Case | Rationale |
|---|---|
| Ordered replay | Replay events to the same partition (see the sketch below) |
| Testing | Control partition for test isolation |
| Migration | Maintain partition assignment during migration |
| Special routing | Override partitioner for specific records |
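
For the ordered-replay case above, the explicit-partition constructor lets a replay tool carry over the partition from each consumed record, so replayed events preserve the original per-partition order. A minimal sketch, assuming the target topic has at least as many partitions as the source:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PartitionPreservingReplay {
    // Send the consumed record to the same partition number it came from,
    // bypassing the partitioner entirely
    public static void replay(KafkaProducer<String, String> producer,
                              ConsumerRecord<String, String> consumed,
                              String targetTopic) {
        producer.send(new ProducerRecord<>(
            targetTopic,
            consumed.partition(),
            consumed.key(),
            consumed.value()));
    }
}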

Partition Key Design

Choosing Keys

(Diagram: key selection flow. Choose a key that provides the required ordering and co-location while keeping enough cardinality to spread load across partitions.)

Key Design Patterns

| Pattern | Key Format | Use Case |
|---|---|---|
| Entity key | user-123 | Group by entity |
| Composite key | tenant-1:user-123 | Multi-tenant systems |
| Time-bucketed | 2024-01:user-123 | Time-series with ordering |
| Hash key | md5(payload) | Deduplication |
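
Key formats like these are easiest to keep consistent behind a small helper, since any variation in formatting changes the hash and therefore the partition. A sketch of the composite and time-bucketed patterns:

import java.time.YearMonth;

public class PartitionKeys {
    // Composite key for multi-tenant systems, e.g. "tenant-1:user-123"
    public static String compositeKey(String tenantId, String userId) {
        return tenantId + ":" + userId;
    }

    // Time-bucketed key, e.g. "2024-01:user-123": ordering holds within a
    // bucket, and different buckets usually hash to different partitions
    public static String timeBucketedKey(YearMonth bucket, String userId) {
        return bucket + ":" + userId;
    }
}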

Avoiding Hot Partitions

// WRONG: All orders from large customer go to one partition
String key = order.getCustomerId();  // Customer "BigCorp" = 80% of orders

// BETTER: Distribute within customer (note: ordering across a customer's orders is lost)
String key = order.getCustomerId() + ":" + order.getOrderId();

// OR: Use a composite key with a bounded salt (floorMod avoids negative salts)
String key = order.getCustomerId() + ":" + Math.floorMod(order.getOrderId().hashCode(), 10);

Partition Count Implications

Changing Partition Count

Partition Count Changes Affect Key Distribution

Adding partitions changes the key-to-partition mapping. Records with the same key may go to different partitions before and after the change.

(Diagram: the same key hashes to a different partition once numPartitions changes, so records written before and after the change end up on different partitions.)

Mitigation Strategies

| Strategy | Description |
|---|---|
| Over-provision partitions | Create more partitions than immediately needed |
| Use partition-aware keys | Include a partition hint in the key |
| Custom partitioner | Maintain backward compatibility with the old mapping (see the sketch below) |
| Reprocess data | Rebuild from source after the partition change |
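
One way to implement the custom-partitioner strategy is to keep hashing keys modulo the partition count the topic had before the expansion, so existing keys stay where they were; the new partitions then only receive keyless traffic until the data is reprocessed. A minimal sketch, using a hypothetical original.partition.count config entry:

import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

public class BackwardCompatiblePartitioner implements Partitioner {
    private int originalPartitionCount;

    @Override
    public void configure(Map<String, ?> configs) {
        // Hypothetical config entry carrying the pre-expansion partition count
        originalPartitionCount = Integer.parseInt(
            String.valueOf(configs.get("original.partition.count")));
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        if (keyBytes == null) {
            // Keyless records may use the full, expanded partition range
            int numPartitions = cluster.partitionCountForTopic(topic);
            return ThreadLocalRandom.current().nextInt(numPartitions);
        }
        // Keyed records keep the old mapping: same hash, same (original) modulus
        return (Utils.murmur2(keyBytes) & 0x7fffffff) % originalPartitionCount;
    }

    @Override
    public void close() {}
}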

Load Balancing

Measuring Distribution

// Monitor partition distribution; the send callback runs on the producer's
// I/O thread, so use a thread-safe map
Map<Integer, Long> partitionCounts = new ConcurrentHashMap<>();

producer.send(record, (metadata, exception) -> {
    if (metadata != null) {
        partitionCounts.merge(metadata.partition(), 1L, Long::sum);
    }
});

// Check for skew
long total = partitionCounts.values().stream().mapToLong(Long::longValue).sum();
double expected = total / (double) partitionCounts.size();

partitionCounts.forEach((partition, count) -> {
    double ratio = count / expected;
    if (ratio > 1.5 || ratio < 0.5) {
        log.warn("Partition {} is skewed: {} records (expected ~{})",
            partition, count, (long) expected);
    }
});

Addressing Skew

| Cause | Solution |
|---|---|
| Few high-volume keys | Add salt to the key or use a composite key |
| Poor hash distribution | Use a different key field |
| Null keys with round-robin | Upgrade to the sticky partitioner (Kafka 2.4+) |
| Custom partitioner bug | Review and fix the partitioner logic |

Partition Availability

Handling Unavailable Partitions

public class AvailabilityAwarePartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {}

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                        Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        List<PartitionInfo> available = cluster.availablePartitionsForTopic(topic);

        if (available.isEmpty()) {
            // All partitions unavailable; use any partition
            int hash = keyBytes != null ? Utils.murmur2(keyBytes) : 0;
            return (hash & 0x7fffffff) % partitions.size();
        }

        if (keyBytes == null) {
            // Sticky to available partition
            return available.get(ThreadLocalRandom.current().nextInt(available.size()))
                .partition();
        }

        // Hash to available partition
        int hash = Utils.murmur2(keyBytes);
        return available.get((hash & 0x7fffffff) % available.size()).partition();
    }

    @Override
    public void close() {}
}

Configuration

# Built-in partitioner (Kafka 3.3+, KIP-794): if a broker does not accept produce
# requests from a partition for this long, treat that partition as unavailable
partitioner.availability.timeout.ms=0  # 0 = don't wait (check disabled)

Monitoring Partitioning

Key Metrics

| Metric | Description |
|---|---|
| record-send-total per partition | Records sent to each partition |
| byte-total per partition | Bytes sent to each partition |
| batch-size-avg per partition | Batch sizes per partition |
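
These can be read over JMX or directly from the producer's metrics() map; a sketch that dumps every record-send-total metric together with its group and tags (the exact groups and tags depend on the client version):

import java.util.Map;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;

// Inspect producer metrics in-process; filter by name and print group and tags
Map<MetricName, ? extends Metric> metrics = producer.metrics();
metrics.forEach((name, metric) -> {
    if (name.name().equals("record-send-total")) {
        System.out.printf("%s %s = %s%n",
            name.group(), name.tags(), metric.metricValue());
    }
});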

Detecting Issues

| Issue | Symptom | Solution |
|---|---|---|
| Hot partition | One partition with much higher traffic | Improve key distribution |
| Empty partitions | Some partitions receive no traffic | Check key cardinality |
| Ordering violations | Out-of-order processing | Ensure consistent key usage |