
Telemetry at Scale

IoT telemetry presents extreme data management challenges: millions of devices reporting metrics every second, multi-tenant isolation requirements, and queries spanning both real-time dashboards and historical analysis. This document addresses patterns for handling telemetry at the scale where naive approaches fail.


The Scale Challenge

Consider a fleet telemetry scenario:

  • 1 million devices reporting every 10 seconds
  • 100,000 writes per second sustained
  • 10 metrics per report → 1 million metrics per second
  • 30-day retention at full resolution
  • Multi-tenant with isolation requirements

At this scale, problems that don't exist at smaller deployments become critical:

Hot partitions: A device reporting every second for a year creates over 31 million rows in one partition, far beyond the commonly cited ~100 MB partition size guideline.

Write amplification: With RF=3, 100K writes/second becomes 300K writes/second across the cluster.

Compaction pressure: Without the right compaction strategy, continuous writes generate SSTables faster than compaction can absorb them.

Query fan-out: A query spanning many devices touches many partitions, potentially overwhelming coordinators.

Tenant isolation: One large tenant's workload cannot be allowed to impact others.


Schema Design for Scale

Multi-Level Bucketing

Partition by device, tenant, and time bucket to bound partition size:

CREATE TABLE telemetry (
    tenant_id UUID,
    device_id UUID,
    bucket TIMESTAMP,           -- Hourly bucket
    event_time TIMESTAMP,
    metric_name TEXT,
    metric_value DOUBLE,
    tags MAP<TEXT, TEXT>,
    PRIMARY KEY ((tenant_id, device_id, bucket), event_time, metric_name)
) WITH CLUSTERING ORDER BY (event_time DESC, metric_name ASC)
  AND compaction = {
    'class': 'TimeWindowCompactionStrategy',
    'compaction_window_unit': 'HOURS',
    'compaction_window_size': 1
  }
  AND default_time_to_live = 2592000;  -- 30 days

Sizing analysis:

  • Device reporting every 10 seconds: 6 reports/minute × 60 minutes × 10 metrics = 3,600 rows/hour
  • 3,600 rows × ~100 bytes ≈ 360 KB per partition
  • Well within Cassandra's partition size recommendations
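The same arithmetic generalizes to other bucket choices. A minimal sketch for validating a bucketing scheme (hypothetical helper; the 100-byte row estimate is an assumption to replace with measurements from your actual schema):

public final class PartitionSizing {

    private static final long ESTIMATED_ROW_BYTES = 100;  // assumption: measure real rows

    // Rows one device writes into a single partition over one bucket
    public static long rowsPerBucket(Duration reportInterval, int metricsPerReport,
                                     Duration bucketSize) {
        return (bucketSize.toSeconds() / reportInterval.toSeconds()) * metricsPerReport;
    }

    // Estimated partition size in bytes for one bucket
    public static long bytesPerBucket(Duration reportInterval, int metricsPerReport,
                                      Duration bucketSize) {
        return rowsPerBucket(reportInterval, metricsPerReport, bucketSize) * ESTIMATED_ROW_BYTES;
    }
}

// 10-second reports, 10 metrics, hourly buckets -> 3,600 rows, ~360 KB
long estimate = PartitionSizing.bytesPerBucket(
    Duration.ofSeconds(10), 10, Duration.ofHours(1));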

Adaptive Bucket Sizing

Different devices have different reporting frequencies. Use adaptive bucketing:

public class AdaptiveBucketing {

    public Instant calculateBucket(UUID deviceId, Instant eventTime) {
        DeviceProfile profile = getDeviceProfile(deviceId);

        Duration bucketSize = switch (profile.getReportingFrequency()) {
            case HIGH_FREQUENCY -> Duration.ofMinutes(15);   // > 1/sec
            case MEDIUM_FREQUENCY -> Duration.ofHours(1);    // 1/min - 1/sec
            case LOW_FREQUENCY -> Duration.ofHours(6);       // < 1/min
        };

        long bucketMillis = bucketSize.toMillis();
        long eventMillis = eventTime.toEpochMilli();
        long bucketStart = (eventMillis / bucketMillis) * bucketMillis;

        return Instant.ofEpochMilli(bucketStart);
    }
}
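Because event times are floored to fixed boundaries, bucket computation is deterministic and readers can reconstruct partition keys without extra lookups. One caveat: if a device's frequency class changes, older data sits in buckets of the previous size, so the device profile (or its history) must be consulted when computing buckets for past time ranges.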

Ingestion Architecture

Write Path Optimization

Telemetry flows from devices through a message queue (Kafka, in the examples below) into consumers that group and batch writes before they reach Cassandra, smoothing spikes and decoupling ingestion rate from write capacity.

Batched Writes

Group writes by partition for efficient batching:

@KafkaListener(topics = "telemetry", containerFactory = "batchListenerFactory")
public void consumeBatch(List<TelemetryEvent> events) {
    // Group by partition key
    Map<PartitionKey, List<TelemetryEvent>> grouped = events.stream()
        .collect(Collectors.groupingBy(e ->
            new PartitionKey(e.getTenantId(), e.getDeviceId(),
                            calculateBucket(e.getDeviceId(), e.getEventTime()))));

    // Write each partition's events as unlogged batch
    grouped.forEach((key, partitionEvents) -> {
        BatchStatement batch = BatchStatement.newInstance(BatchType.UNLOGGED);

        for (TelemetryEvent event : partitionEvents) {
            batch = batch.add(insertTelemetry.bind(
                key.getTenantId(), key.getDeviceId(), key.getBucket(),
                event.getEventTime(), event.getMetricName(),
                event.getMetricValue(), event.getTags()
            ));
        }

        session.executeAsync(batch);
    });
}
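Note that unlogged batches targeting a single partition are applied as one mutation per replica, so they raise throughput without the cross-partition coordination overhead that makes multi-partition batches an anti-pattern.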

Backpressure Handling

When Cassandra cannot keep up, the ingestion pipeline must apply backpressure:

public class BackpressureController {

    private final Semaphore inFlightPermits;
    private final AtomicLong pendingWrites = new AtomicLong(0);
    private final long healthThreshold;

    public BackpressureController(int maxInFlight, long healthThreshold) {
        this.inFlightPermits = new Semaphore(maxInFlight);
        this.healthThreshold = healthThreshold;
    }

    public CompletableFuture<Void> writeWithBackpressure(
            BatchStatement batch) {

        try {
            // Block if too many in-flight writes
            inFlightPermits.acquire();
            pendingWrites.incrementAndGet();

            return session.executeAsync(batch)
                .toCompletableFuture()
                .whenComplete((result, error) -> {
                    inFlightPermits.release();
                    pendingWrites.decrementAndGet();

                    if (error != null) {
                        metrics.recordWriteError();
                    }
                })
                .thenApply(rs -> null);

        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IngestionException("Interrupted during backpressure", e);
        }
    }

    public boolean isHealthy() {
        return pendingWrites.get() < healthThreshold;
    }
}
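Wiring this into the consumer above is a one-line change; a sketch (backpressure and buildBatch are assumed helpers, not part of the listing above):

// In consumeBatch: block the listener thread when Cassandra falls behind, which
// slows Kafka polling and pushes the backlog onto the broker
grouped.forEach((key, partitionEvents) ->
    backpressure.writeWithBackpressure(buildBatch(key, partitionEvents)));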

Hot Partition Mitigation

Some devices report far more frequently than others. A single malfunctioning sensor sending data every millisecond can create a hot partition.

Mitigation follows a detect-then-act loop: monitor per-partition write rates, then apply sub-bucketing, rate limiting, or sampling to the offending devices.

Detection

Monitor partition-level write rates:

@Scheduled(fixedDelay = 60000)
public void detectHotPartitions() {
    Map<PartitionKey, Long> writeCounts = writeCounter.getAndReset();

    writeCounts.entrySet().stream()
        .filter(e -> e.getValue() > HOT_PARTITION_THRESHOLD)
        .forEach(e -> {
            log.warn("Hot partition detected: {} with {} writes/min",
                e.getKey(), e.getValue());

            // Take action
            handleHotPartition(e.getKey());
        });
}
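The writeCounter above is assumed; a minimal sketch using one LongAdder per partition key, with the whole map swapped out atomically each sampling interval:

public class PartitionWriteCounter {

    private final AtomicReference<ConcurrentHashMap<PartitionKey, LongAdder>> counts =
        new AtomicReference<>(new ConcurrentHashMap<>());

    // Called on every write from the ingestion path
    public void record(PartitionKey key) {
        counts.get().computeIfAbsent(key, k -> new LongAdder()).increment();
    }

    // Swap in a fresh map and return the previous interval's counts
    public Map<PartitionKey, Long> getAndReset() {
        ConcurrentHashMap<PartitionKey, LongAdder> old =
            counts.getAndSet(new ConcurrentHashMap<>());
        Map<PartitionKey, Long> snapshot = new HashMap<>();
        old.forEach((k, v) -> snapshot.put(k, v.sum()));
        return snapshot;
    }
}

Races at the swap boundary can drop a handful of increments, which is acceptable for threshold detection.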

Mitigation Strategies

Sub-bucketing for hot devices:

public PartitionKey getPartitionKey(TelemetryEvent event) {
    UUID deviceId = event.getDeviceId();
    Instant bucket = calculateBucket(deviceId, event.getEventTime());

    if (isKnownHotDevice(deviceId)) {
        // Add sub-bucket to spread writes
        int subBucket = event.getEventTime().getNano() % SUB_BUCKET_COUNT;
        return new PartitionKey(event.getTenantId(), deviceId, bucket, subBucket);
    }

    return new PartitionKey(event.getTenantId(), deviceId, bucket, 0);
}
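The trade-off: reads for a hot device must now fan out across all SUB_BUCKET_COUNT sub-partitions and merge the results, so sub-bucketing should be applied only where write pressure demands it.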

Rate limiting per device:

public class DeviceRateLimiter {

    private final LoadingCache<UUID, RateLimiter> limiters = CacheBuilder.newBuilder()
        .expireAfterAccess(Duration.ofMinutes(10))
        .build(new CacheLoader<>() {
            @Override
            public RateLimiter load(UUID deviceId) {
                DeviceProfile profile = getDeviceProfile(deviceId);
                double maxRate = profile.getMaxReportsPerSecond();
                return RateLimiter.create(maxRate);
            }
        });

    public boolean tryAcquire(UUID deviceId) {
        // getUnchecked avoids the checked ExecutionException thrown by get()
        return limiters.getUnchecked(deviceId).tryAcquire();
    }
}

Sampling for excessive data:

public boolean shouldSample(TelemetryEvent event) {
    if (!isOverThreshold(event.getDeviceId())) {
        return true;  // Accept all
    }

    // Sample at reduced rate
    double samplingRate = calculateSamplingRate(event.getDeviceId());
    return ThreadLocalRandom.current().nextDouble() < samplingRate;
}
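calculateSamplingRate is assumed above; one simple choice keeps the accepted rate near the device's allowed maximum (observedReportsPerSecond is a hypothetical metrics lookup):

private double calculateSamplingRate(UUID deviceId) {
    DeviceProfile profile = getDeviceProfile(deviceId);
    double observed = observedReportsPerSecond(deviceId);  // assumed metrics lookup
    // Accept just enough events to stay at the allowed rate; cap at 1.0
    return Math.min(1.0, profile.getMaxReportsPerSecond() / observed);
}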

Multi-Tenant Isolation

Tenant-Aware Partitioning

Including tenant ID in partition keys provides natural isolation:

-- Telemetry is partitioned by tenant
PRIMARY KEY ((tenant_id, device_id, bucket), ...)

-- Per-tenant activity rollup (a counter table, not a true index)
CREATE TABLE telemetry_by_tenant (
    tenant_id UUID,
    bucket TIMESTAMP,
    device_id UUID,
    event_count COUNTER,
    PRIMARY KEY ((tenant_id, bucket), device_id)
);
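Counter columns are updated rather than inserted; a sketch of maintaining this rollup from the ingestion path (prepared statement and variable names are assumptions):

// Counters only support UPDATE with increments; prepare once at startup
PreparedStatement bumpEventCount = session.prepare(
    "UPDATE telemetry_by_tenant SET event_count = event_count + ? " +
    "WHERE tenant_id = ? AND bucket = ? AND device_id = ?");

session.executeAsync(bumpEventCount.bind(
    (long) partitionEvents.size(), tenantId, bucket, deviceId));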

Resource Quotas

Enforce per-tenant limits:

public class TenantQuotaEnforcer {

    public void enforceQuota(UUID tenantId, TelemetryBatch batch)
            throws QuotaExceededException {

        TenantQuota quota = quotaService.getQuota(tenantId);
        TenantUsage usage = usageService.getCurrentUsage(tenantId);

        // Check write rate (approximation: treats this batch as arriving within one second)
        if (usage.getWritesPerSecond() + batch.size() > quota.getMaxWritesPerSecond()) {
            metrics.recordQuotaExceeded(tenantId, "writes_per_second");
            throw new QuotaExceededException(tenantId, "Write rate limit exceeded");
        }

        // Check storage
        if (usage.getStorageBytes() > quota.getMaxStorageBytes()) {
            metrics.recordQuotaExceeded(tenantId, "storage");
            throw new QuotaExceededException(tenantId, "Storage limit exceeded");
        }

        // Check device count
        long newDevices = countNewDevices(tenantId, batch);
        if (usage.getDeviceCount() + newDevices > quota.getMaxDevices()) {
            metrics.recordQuotaExceeded(tenantId, "devices");
            throw new QuotaExceededException(tenantId, "Device limit exceeded");
        }
    }
}

Tenant-Specific TTL

Different tenants may have different retention requirements:

public int getTTL(UUID tenantId) {
    TenantConfig config = configService.getConfig(tenantId);
    return config.getRetentionDays() * 86400;  // Convert to seconds
}

// Apply per-write: assumes insertTelemetry was prepared with "... USING TTL ?",
// whose bind marker Cassandra names "[ttl]"
session.execute(insertTelemetry.bind(...)
    .setInt("[ttl]", getTTL(tenantId)));

Downsampling and Aggregation

Raw telemetry at full resolution is valuable for recent data but impractical for long-term storage and queries.

Aggregation Pipeline

Raw events flow through a windowed stream processor that writes hourly rollups to a downsampled table; dashboards query raw data for recent windows and the aggregates for longer ranges.

Aggregate Schema

CREATE TABLE telemetry_hourly (
    tenant_id UUID,
    device_id UUID,
    metric_name TEXT,
    hour_bucket TIMESTAMP,
    min_value DOUBLE,
    max_value DOUBLE,
    sum_value DOUBLE,
    count BIGINT,
    stddev DOUBLE,
    percentile_50 DOUBLE,
    percentile_95 DOUBLE,
    percentile_99 DOUBLE,
    PRIMARY KEY ((tenant_id, device_id, metric_name), hour_bucket)
) WITH CLUSTERING ORDER BY (hour_bucket DESC)
  AND default_time_to_live = 31536000;  -- 1 year

Stream Aggregation

Use windowed stream processing for real-time aggregation:

DataStream<TelemetryEvent> raw = /* source */;

raw.keyBy(e -> new AggregationKey(e.getTenantId(), e.getDeviceId(),
                                  e.getMetricName()))
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    .aggregate(new TelemetryAggregator())
    // Schematic sink: the actual Flink connector is built via
    // CassandraSink.addSink(stream).setQuery(...).build()
    .addSink(new CassandraSink<>("telemetry_hourly"));

public class TelemetryAggregator implements
        AggregateFunction<TelemetryEvent, TelemetryAccumulator, TelemetryAggregate> {

    @Override
    public TelemetryAccumulator createAccumulator() {
        return new TelemetryAccumulator();
    }

    @Override
    public TelemetryAccumulator add(TelemetryEvent event,
                                    TelemetryAccumulator acc) {
        acc.addValue(event.getMetricValue());
        return acc;
    }

    @Override
    public TelemetryAggregate getResult(TelemetryAccumulator acc) {
        return new TelemetryAggregate(
            acc.getMin(), acc.getMax(), acc.getSum(), acc.getCount(),
            acc.getStdDev(), acc.getPercentile(50),
            acc.getPercentile(95), acc.getPercentile(99)
        );
    }

    @Override
    public TelemetryAccumulator merge(TelemetryAccumulator a,
                                      TelemetryAccumulator b) {
        return a.merge(b);
    }
}
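TelemetryAccumulator is referenced but not defined above; a minimal sketch that retains raw values for percentiles (workable at per-device hourly volumes of a few thousand points; at higher volumes a mergeable quantile sketch such as t-digest would replace the list):

public class TelemetryAccumulator {

    private final List<Double> values = new ArrayList<>();
    private double sum = 0;
    private double sumOfSquares = 0;

    public void addValue(double v) {
        values.add(v);
        sum += v;
        sumOfSquares += v * v;
    }

    public TelemetryAccumulator merge(TelemetryAccumulator other) {
        other.values.forEach(this::addValue);  // keeps sums consistent
        return this;
    }

    public long getCount() { return values.size(); }
    public double getSum() { return sum; }
    public double getMin() {
        return values.stream().mapToDouble(Double::doubleValue).min().orElse(Double.NaN);
    }
    public double getMax() {
        return values.stream().mapToDouble(Double::doubleValue).max().orElse(Double.NaN);
    }

    // Population standard deviation from the running sums
    public double getStdDev() {
        long n = getCount();
        if (n == 0) return Double.NaN;
        double mean = sum / n;
        return Math.sqrt(Math.max(0, sumOfSquares / n - mean * mean));
    }

    // Nearest-rank percentile over the retained values
    public double getPercentile(int p) {
        if (values.isEmpty()) return Double.NaN;
        List<Double> sorted = new ArrayList<>(values);
        Collections.sort(sorted);
        int rank = (int) Math.ceil(p / 100.0 * sorted.size()) - 1;
        return sorted.get(Math.max(0, rank));
    }
}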

Query Optimization

Time-Range Queries

Querying recent data for a single device is efficient:

public List<TelemetryPoint> getRecentTelemetry(UUID tenantId, UUID deviceId,
                                               String metricName,
                                               Duration lookback) {
    Instant start = Instant.now().minus(lookback);
    List<Instant> buckets = calculateBuckets(deviceId, start, Instant.now());

    // Query each bucket (typically 1-2 for recent data)
    return buckets.stream()
        .flatMap(bucket -> queryBucket(tenantId, deviceId, bucket,
                                       metricName, start).stream())
        .sorted(Comparator.comparing(TelemetryPoint::getEventTime))
        .collect(Collectors.toList());
}
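calculateBuckets is assumed above; a minimal sketch for fixed hourly buckets (the adaptive version would use the device's bucket size from AdaptiveBucketing instead of the hard-coded hour):

private List<Instant> calculateBuckets(UUID deviceId, Instant start, Instant end) {
    // deviceId would select the bucket size in the adaptive version
    long bucketMillis = Duration.ofHours(1).toMillis();  // assumption: hourly buckets
    long first = (start.toEpochMilli() / bucketMillis) * bucketMillis;

    List<Instant> buckets = new ArrayList<>();
    for (long t = first; t <= end.toEpochMilli(); t += bucketMillis) {
        buckets.add(Instant.ofEpochMilli(t));
    }
    return buckets;
}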

Fleet-Wide Queries

Queries spanning many devices require careful handling:

public FleetMetrics getFleetMetrics(UUID tenantId, String metricName,
                                    Instant start, Instant end) {
    // Get device list (from device registry, not a full scan)
    List<UUID> deviceIds = deviceRegistry.getDevicesForTenant(tenantId);

    // A bare parallel stream runs on the shared ForkJoinPool at its default size,
    // so submit to a dedicated pool to actually bound query concurrency
    int concurrency = Math.max(1, Math.min(deviceIds.size(), MAX_CONCURRENT_QUERIES));
    ForkJoinPool pool = new ForkJoinPool(concurrency);
    try {
        return pool.submit(() -> deviceIds.parallelStream()
                .map(deviceId -> queryDeviceMetrics(tenantId, deviceId,
                                                    metricName, start, end))
                .reduce(FleetMetrics::merge)
                .orElse(FleetMetrics.empty()))
            .join();
    } finally {
        pool.shutdown();
    }
}

Pre-Computed Fleet Aggregates

For frequently queried fleet metrics, pre-compute aggregates:

CREATE TABLE fleet_telemetry_hourly (
    tenant_id UUID,
    metric_name TEXT,
    hour_bucket TIMESTAMP,
    device_count BIGINT,
    total_readings BIGINT,
    fleet_min DOUBLE,
    fleet_max DOUBLE,
    fleet_avg DOUBLE,
    fleet_stddev DOUBLE,
    PRIMARY KEY ((tenant_id, metric_name), hour_bucket)
) WITH CLUSTERING ORDER BY (hour_bucket DESC);
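Reading these rollups is then a single-partition range scan per metric; a hypothetical sketch (statement and method names are assumptions):

PreparedStatement selectFleetHourly = session.prepare(
    "SELECT hour_bucket, device_count, fleet_min, fleet_max, fleet_avg " +
    "FROM fleet_telemetry_hourly " +
    "WHERE tenant_id = ? AND metric_name = ? AND hour_bucket >= ? AND hour_bucket < ?");

// One row per hour for the whole fleet; no per-device fan-out required
ResultSet rows = session.execute(
    selectFleetHourly.bind(tenantId, metricName, start, end));
for (Row row : rows) {
    render(row.getInstant("hour_bucket"), row.getDouble("fleet_avg"));  // render() is a placeholder
}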

Monitoring at Scale

Key Metrics

Metric                            Description             Alert Threshold
telemetry.ingestion_rate          Events per second       Below baseline
telemetry.ingestion_latency_p99   Write latency           > 50 ms
telemetry.batch_size_avg          Events per batch        Below minimum efficiency
telemetry.hot_partitions          Hot partition count     > 0
telemetry.quota_rejections        Throttled events        > 1%
telemetry.backpressure_events     Backpressure triggers   Increasing
cassandra.pending_compactions     Compaction backlog      Growing unbounded

Capacity Planning

Raw storage per device per day (one report every 10 seconds = 8,640 reports/day):
  8,640 reports × 10 metrics × 100 bytes/row = 8.64 MB

Fleet storage per day (1M devices):
  8.64 MB × 1,000,000 = 8.64 TB

30-day retention:
  8.64 TB × 30 = 259 TB raw
  + aggregates (~10% of raw) = 285 TB total

With RF=3:
  285 TB × 3 = 855 TB cluster capacity

When to Use These Patterns

Appropriate Use Cases

  • Industrial IoT: Factory sensors, equipment monitoring
  • Fleet telemetry: Vehicles, drones, mobile devices
  • Smart infrastructure: Building management, smart cities
  • Consumer IoT: Large-scale smart home deployments
  • Environmental monitoring: Weather stations, pollution sensors

Consider Alternatives When

  • Low device count: Fleets under 10,000 devices may not need this complexity
  • Infrequent reporting: Daily reports don't require these optimizations
  • Single-tenant: Without multi-tenant requirements, simpler designs work
  • Real-time only: If historical data isn't needed, in-memory solutions may suffice

Summary

Telemetry at scale requires deliberate architecture:

  1. Multi-level bucketing bounds partition sizes regardless of reporting frequency
  2. Batched ingestion through message queues handles write spikes
  3. Hot partition mitigation through detection, sub-bucketing, and rate limiting
  4. Tenant isolation via partitioning, quotas, and per-tenant configuration
  5. Downsampling pipelines maintain query performance across time ranges
  6. Fleet query optimization through pre-computation and controlled parallelism

Cassandra's linear scalability enables handling millions of devices, but only with schema and application design that respects its constraints. The patterns here transform Cassandra from a database that could handle the load into one that handles it efficiently.