Kafka Client Throttling

Kafka implements throttling mechanisms to protect brokers from overload and ensure fair resource sharing among clients. This guide covers quota types, configuration, client-side backpressure, and quota monitoring.

Throttling Overview



Broker-Side Quotas

Quota Types

| Quota Type | Description | Unit |
|---|---|---|
| producer_byte_rate | Max produce throughput | bytes/sec |
| consumer_byte_rate | Max fetch throughput | bytes/sec |
| request_percentage | Max request handler time | percentage |
| controller_mutation_rate | Max admin operations | mutations/sec |

Quota Hierarchy

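When several quota entries could apply to a connection, Kafka applies the most specific match. The sketch below models the precedence order given in the Kafka documentation, from an exact (user, client-id) pair down to the client-id default. The `QuotaResolver` class and its path-style keys are hypothetical labels chosen for illustration; this is not Kafka's internal implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical in-memory model of quota overrides; not Kafka's internal code. */
final class QuotaResolver {
    // Keys are path-style labels used only to name quota entities in this sketch.
    private final Map<String, Long> overrides = new LinkedHashMap<>();

    void set(String entityPath, long bytesPerSec) {
        overrides.put(entityPath, bytesPerSec);
    }

    /** Returns the first matching quota, checking the most specific entity first. */
    Optional<Long> resolve(String user, String clientId) {
        String[] candidates = {
            "/users/" + user + "/clients/" + clientId,
            "/users/" + user + "/clients/<default>",
            "/users/" + user,
            "/users/<default>/clients/" + clientId,
            "/users/<default>/clients/<default>",
            "/users/<default>",
            "/clients/" + clientId,
            "/clients/<default>"
        };
        for (String path : candidates) {
            Long quota = overrides.get(path);
            if (quota != null) {
                return Optional.of(quota);
            }
        }
        return Optional.empty();  // no matching entry: no quota is applied
    }
}
```

Note that any user-level entry, including the user default, takes precedence over a client-id entry. In this model a client-id quota applies only when no user-level entry matches.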

Configuration Examples

Per-User Quota:

# Set quota for specific user
kafka-configs.sh --bootstrap-server localhost:9092 \
    --alter --add-config 'producer_byte_rate=10485760,consumer_byte_rate=20971520' \
    --entity-type users --entity-name alice

Per-Client-ID Quota:

# Set quota for specific client ID
kafka-configs.sh --bootstrap-server localhost:9092 \
    --alter --add-config 'producer_byte_rate=5242880' \
    --entity-type clients --entity-name batch-producer

User + Client-ID Quota:

# Set quota for specific user and client combination
kafka-configs.sh --bootstrap-server localhost:9092 \
    --alter --add-config 'producer_byte_rate=52428800' \
    --entity-type users --entity-name alice \
    --entity-type clients --entity-name high-priority-producer

Default Quota:

# Set default quota for all users
kafka-configs.sh --bootstrap-server localhost:9092 \
    --alter --add-config 'producer_byte_rate=1048576,consumer_byte_rate=2097152' \
    --entity-type users --entity-default

# Set default quota for all clients
kafka-configs.sh --bootstrap-server localhost:9092 \
    --alter --add-config 'request_percentage=25' \
    --entity-type clients --entity-default
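The byte-rate values in these commands are plain byte counts: 10485760 is 10 MiB/s and 20971520 is 20 MiB/s. As a small illustration, the helper below builds the `--add-config` value from MiB/s figures. The `QuotaConfig` class and its method name are hypothetical and exist only for this sketch.

```java
/** Hypothetical helper that formats byte-rate quota settings for kafka-configs.sh. */
final class QuotaConfig {
    private static final long MIB = 1024L * 1024L;

    /** Builds the value passed to --add-config from rates given in MiB per second. */
    static String byteRateConfig(long producerMiBPerSec, long consumerMiBPerSec) {
        return "producer_byte_rate=" + (producerMiBPerSec * MIB)
             + ",consumer_byte_rate=" + (consumerMiBPerSec * MIB);
    }
}
```

Calling `QuotaConfig.byteRateConfig(10, 20)` reproduces the string used in the per-user example for alice.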

Quota Enforcement

How Throttling Works


Throttle Time Calculation

throttle_time = ((observed_rate - quota_limit) / quota_limit) × window_size

Example, assuming a 1-second measurement window:

- Quota: 10 MB/s
- Observed rate: 15 MB/s
- Excess: 5 MB/s (50% over quota)
- Throttle time: 0.5 × 1000 ms = 500 ms to bring the rate back to the limit
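The throttle-time formula can be written as a small function. This is a minimal sketch of the arithmetic only, not the broker's code; the `ThrottleTime` class, its parameter names, and the choice to return 0 when the client is within quota are assumptions made for illustration.

```java
/** Hypothetical helper mirroring the throttle-time formula in this section. */
final class ThrottleTime {
    /**
     * Returns the delay in milliseconds for a client whose observed rate
     * exceeds its quota, or 0 when the client is within quota.
     * Rates may use any unit as long as both use the same one.
     */
    static long throttleTimeMs(double observedRate, double quotaLimit, long windowMs) {
        if (quotaLimit <= 0) {
            throw new IllegalArgumentException("quotaLimit must be positive");
        }
        if (observedRate <= quotaLimit) {
            return 0L;
        }
        double overage = (observedRate - quotaLimit) / quotaLimit;
        return Math.round(overage * windowMs);
    }
}
```

With a quota of 10 MB/s, an observed rate of 15 MB/s, and a 1000 ms window, `ThrottleTime.throttleTimeMs(15e6, 10e6, 1000)` evaluates to 500.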

Client Response to Throttling



Request Rate Quotas

Request Handler Time

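# Percentage of request handler (I/O) and network thread time
request_percentage=25  # Max 25% of one thread's time

Calculation:

usage_percentage = (thread_time_used / window_time) × 100

A quota of n represents n% of a single thread, so the total capacity of a broker is (num.io.threads + num.network.threads) × 100.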

Mutation Rate Quota (Kafka 2.7+)

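Limits the rate of topic and partition mutations (CreateTopics, CreatePartitions, and DeleteTopics requests) to protect the controller:

# Set controller mutation rate quota
kafka-configs.sh --bootstrap-server localhost:9092 \
    --alter --add-config 'controller_mutation_rate=10' \
    --entity-type users --entity-name admin-user

The rate is counted in partitions created or deleted, not in requests:

| Operation | Mutations |
|---|---|
| CreateTopics (1 topic, 10 partitions) | 10 |
| DeleteTopics (1 topic, 10 partitions) | 10 |
| CreatePartitions (+5 partitions) | 5 |
| AlterConfigs | Not counted |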

Client-Side Backpressure

Producer Backpressure


Configuration:

# Total buffer memory
buffer.memory=33554432  # 32 MB

# Max concurrent requests per connection
max.in.flight.requests.per.connection=5

# Max time to block on send()
max.block.ms=60000  # 60 seconds
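These settings interact as follows: when the record buffer is full, `send()` blocks the calling thread for up to `max.block.ms` and then fails. The sketch below imitates that behavior with a semaphore so the backpressure is easy to see. `BoundedBuffer` is a hypothetical stand-in, not Kafka code, and it signals the timeout with `IllegalStateException` where the real producer throws its own timeout exception.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the producer's record buffer; not Kafka code. */
final class BoundedBuffer {
    private final Semaphore freeBytes;  // plays the role of buffer.memory
    private final long maxBlockMs;      // plays the role of max.block.ms

    BoundedBuffer(int bufferMemory, long maxBlockMs) {
        this.freeBytes = new Semaphore(bufferMemory);
        this.maxBlockMs = maxBlockMs;
    }

    /** Reserves space for a record, blocking the caller while the buffer is full. */
    void allocate(int recordBytes) {
        try {
            if (!freeBytes.tryAcquire(recordBytes, maxBlockMs, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("buffer full for " + maxBlockMs + " ms");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // preserve the interrupt status
            throw new IllegalStateException("interrupted while waiting for buffer space", e);
        }
    }

    /** Frees space, as happens once a batch has been sent and acknowledged. */
    void release(int recordBytes) {
        freeBytes.release(recordBytes);
    }

    int availableBytes() {
        return freeBytes.availablePermits();
    }
}
```

A slow or throttled broker delays the `release` calls, so the buffer fills and callers of `allocate` slow down with it. That is the backpressure path from broker to application.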

Consumer Backpressure


Configuration:

# Max bytes per fetch response
fetch.max.bytes=52428800  # 50 MB

# Max bytes per partition
max.partition.fetch.bytes=1048576  # 1 MB

# Max records per poll
max.poll.records=500

Consumer Pause/Resume

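// Implement backpressure with pause/resume.
// Assumes consumer is a subscribed KafkaConsumer<String, String>, and that
// shouldPause, canResume, and processRecords are application-defined.
Set<TopicPartition> pausedPartitions = new HashSet<>();

while (running) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

    for (TopicPartition partition : records.partitions()) {
        if (shouldPause(partition)) {
            // Stop fetching from this partition and rewind to the first
            // unprocessed record so this batch is fetched again after resume
            consumer.pause(Collections.singleton(partition));
            consumer.seek(partition, records.records(partition).get(0).offset());
            pausedPartitions.add(partition);
        } else {
            processRecords(records.records(partition));
        }
    }

    // Resume partitions that can accept more. Removing through the iterator
    // avoids a ConcurrentModificationException during iteration.
    Iterator<TopicPartition> it = pausedPartitions.iterator();
    while (it.hasNext()) {
        TopicPartition partition = it.next();
        if (canResume(partition)) {
            consumer.resume(Collections.singleton(partition));
            it.remove();
        }
    }
}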

Quota Monitoring

Broker Metrics

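| Metric (JMX MBean) | Description |
|---|---|
| kafka.server:type=Produce,user=([-.\w]+),client-id=([-.\w]+) | Attributes throttle-time (ms) and byte-rate for produce quotas |
| kafka.server:type=Fetch,user=([-.\w]+),client-id=([-.\w]+) | Attributes throttle-time (ms) and byte-rate for fetch quotas |
| kafka.server:type=Request,user=([-.\w]+),client-id=([-.\w]+) | Attributes throttle-time (ms) and request-time for request quotas |
| kafka.server:type=Produce and kafka.server:type=Fetch, attribute queue-size | Size of the delay queue of throttled produce or fetch requests |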

Client Metrics

| Metric | Description |
|---|---|
| produce-throttle-time-avg | Average produce throttle time |
| produce-throttle-time-max | Max produce throttle time |
| fetch-throttle-time-avg | Average fetch throttle time |
| fetch-throttle-time-max | Max fetch throttle time |

Viewing Quotas

# List all quotas
kafka-configs.sh --bootstrap-server localhost:9092 \
    --describe --entity-type users

# Describe specific user quota
kafka-configs.sh --bootstrap-server localhost:9092 \
    --describe --entity-type users --entity-name alice

# Describe specific client quota
kafka-configs.sh --bootstrap-server localhost:9092 \
    --describe --entity-type clients --entity-name my-producer

Quota Planning

Capacity Estimation


Quota Guidelines

| Client Type | Producer Quota | Consumer Quota | Request % |
|---|---|---|---|
| Critical real-time | 50-100 MB/s | 100-200 MB/s | 50% |
| Standard production | 10-50 MB/s | 50-100 MB/s | 25% |
| Batch processing | 5-20 MB/s | 20-50 MB/s | 10% |
| Development | 1-5 MB/s | 5-10 MB/s | 5% |
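One way to arrive at a starting value is to divide a broker's usable throughput among its clients after reserving headroom for bursts. The sketch below shows that arithmetic. The even split, the `QuotaPlanner` class, and its parameter names are all assumptions for illustration, and real workloads usually need per-tier values such as those in the table above. Kafka enforces quotas per broker, so the input is a single broker's capacity rather than the cluster total.

```java
/** Hypothetical planning helper; the even split across clients is an assumption. */
final class QuotaPlanner {
    /**
     * Splits one broker's throughput evenly across clients after reserving
     * headroomPercent of it for bursts. Integer arithmetic keeps the result exact.
     */
    static long perClientByteRate(long brokerBytesPerSec, int headroomPercent, int clientCount) {
        if (clientCount <= 0) {
            throw new IllegalArgumentException("clientCount must be positive");
        }
        if (headroomPercent < 0 || headroomPercent >= 100) {
            throw new IllegalArgumentException("headroomPercent must be in [0, 100)");
        }
        long usable = brokerBytesPerSec * (100 - headroomPercent) / 100;
        return usable / clientCount;
    }
}
```

For a broker that sustains 100 MiB/s (104857600 bytes/sec) with 20% headroom and 8 clients, `QuotaPlanner.perClientByteRate(104857600L, 20, 8)` gives 10485760 bytes/sec, or 10 MiB/s per client.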

Dynamic Quota Updates

Online Quota Changes

# Increase quota without restart
kafka-configs.sh --bootstrap-server localhost:9092 \
    --alter --add-config 'producer_byte_rate=52428800' \
    --entity-type users --entity-name alice

# Remove quota (fall back to default)
kafka-configs.sh --bootstrap-server localhost:9092 \
    --alter --delete-config 'producer_byte_rate' \
    --entity-type users --entity-name alice

Quota Change Propagation



Handling Throttling

Producer Handling

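// Monitor throttle metrics (assumes an existing KafkaProducer and an SLF4J logger)
producer.metrics().forEach((name, metric) -> {
    if (name.name().contains("throttle")) {
        Object value = metric.metricValue();
        // Throttle metrics are doubles; NaN (no samples yet) fails the > 0 check
        if (value instanceof Double && (Double) value > 0) {
            log.warn("Producer throttled: {} = {}", name.name(), value);
        }
    }
});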

Consumer Handling

// Check for throttling in consumer
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

// Match on name and group. The registered MetricName carries a client-id tag,
// so a lookup key built with an empty tag map would never match.
consumer.metrics().forEach((name, metric) -> {
    if (name.name().equals("fetch-throttle-time-avg")
            && name.group().equals("consumer-fetch-manager-metrics")) {
        Object value = metric.metricValue();
        if (value instanceof Double && (Double) value > 100) {
            log.warn("Consumer being throttled: {}ms average", value);
        }
    }
});

Adaptive Rate Limiting

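// Assumes Guava's RateLimiter (com.google.common.util.concurrent.RateLimiter)
public class AdaptiveProducer {
    private final KafkaProducer<String, String> producer;
    private final RateLimiter rateLimiter;
    private double currentRate;  // permits (records) per second

    public AdaptiveProducer(KafkaProducer<String, String> producer, double initialRate) {
        this.producer = producer;
        this.currentRate = initialRate;
        this.rateLimiter = RateLimiter.create(initialRate);
    }

    public void send(ProducerRecord<String, String> record) {
        rateLimiter.acquire();  // Client-side rate limiting
        producer.send(record);
    }

    // Call periodically, for example from a scheduled task. RecordMetadata does
    // not expose throttle time, so read the producer's throttle metric instead.
    public void adjustRate() {
        producer.metrics().forEach((name, metric) -> {
            Object value = metric.metricValue();
            if (name.name().equals("produce-throttle-time-avg")
                    && value instanceof Double && (Double) value > 100) {
                currentRate *= 0.8;  // Reduce by 20%
                rateLimiter.setRate(currentRate);
            }
        });
    }
}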
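The class above only lowers the rate, so a client that was throttled once stays slow. A common complement is additive increase, multiplicative decrease (AIMD): back off sharply when throttled and recover gradually when not. The sketch below is one possible controller with no Kafka dependency. The `AimdRate` class, its parameters, and the policy of reacting to any non-zero throttle sample are assumptions, not part of the Kafka client.

```java
/** Hypothetical AIMD rate controller; feed it periodic throttle-time samples. */
final class AimdRate {
    private final double minRate;
    private final double maxRate;
    private final double increaseStep;    // added per unthrottled sample
    private final double decreaseFactor;  // multiplied in per throttled sample
    private double rate;

    AimdRate(double initialRate, double minRate, double maxRate,
             double increaseStep, double decreaseFactor) {
        this.rate = initialRate;
        this.minRate = minRate;
        this.maxRate = maxRate;
        this.increaseStep = increaseStep;
        this.decreaseFactor = decreaseFactor;
    }

    /** Updates and returns the target rate for one throttle-time sample in ms. */
    double onSample(double throttleTimeMs) {
        if (throttleTimeMs > 0) {
            rate = Math.max(minRate, rate * decreaseFactor);  // back off quickly
        } else {
            rate = Math.min(maxRate, rate + increaseStep);    // recover slowly
        }
        return rate;
    }
}
```

The returned value could be passed to a rate limiter's `setRate` after each metrics check, which keeps the client near its quota without staying permanently below it.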

Network Bandwidth Throttling

Inter-Broker Throttling

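Throttle replication traffic during reassignment. The rates take effect only for replicas listed in the topic-level leader.replication.throttled.replicas and follower.replication.throttled.replicas configs: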

# Set replication throttle
kafka-configs.sh --bootstrap-server localhost:9092 \
    --alter --add-config 'leader.replication.throttled.rate=50000000,follower.replication.throttled.rate=50000000' \
    --entity-type brokers --entity-name 1

Throttle During Reassignment

# Reassignment with throttle
kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
    --reassignment-json-file reassignment.json \
    --throttle 50000000 \
    --execute

# Verify and remove throttle after completion
kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
    --reassignment-json-file reassignment.json \
    --verify

Troubleshooting Throttling

Diagnosing Throttling Issues

| Symptom | Possible Cause | Solution |
|---|---|---|
| High throttle time | Quota too low | Increase quota |
| Inconsistent throttling | Multiple client IDs | Consolidate or set per-client quotas |
| Sudden throttling | Burst traffic | Implement client-side rate limiting |
| All clients throttled | Default quota too low | Increase default or set specific quotas |

Debug Commands

# Check current quotas
kafka-configs.sh --bootstrap-server localhost:9092 \
    --describe --entity-type users --entity-type clients

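# Check broker quota metrics (throttle-time and byte-rate per user/client-id);
# use type=Fetch or type=Request for fetch and request quotas
kafka-run-class.sh kafka.tools.JmxTool \
    --object-name 'kafka.server:type=Produce,user=*,client-id=*' \
    --jmx-url service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi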

Version Compatibility

| Feature | Minimum Version |
|---|---|
| Byte rate quotas | 0.9.0 |
| Request percentage quota | 0.11.0 |
| User quotas | 0.10.1 |
| User + Client-ID quotas | 0.10.1 |
| Controller mutation quota | 2.7.0 |

Best Practices

Quota Configuration

| Practice | Rationale |
|---|---|
| Set default quotas | Prevent unknown clients from overloading |
| Use user quotas with authentication | Accurate tracking per user |
| Leave headroom | Allow for traffic bursts |
| Monitor throttle metrics | Detect quota issues early |

Client Design

| Practice | Rationale |
|---|---|
| Implement client-side rate limiting | Smooth traffic patterns |
| Handle throttle response | Avoid wasted requests |
| Use pause/resume for consumers | Control processing rate |
| Monitor throttle metrics | Adapt to quota changes |