Skip to content

Kafka Streams DSL

Complete reference for Kafka Streams Domain Specific Language operations.


Stream Operations

Creating Streams

// Every topology starts from a StreamsBuilder.
StreamsBuilder builder = new StreamsBuilder();

// Subscribe to a single topic, using the default serdes from the Streams config.
KStream<String, String> stream = builder.stream("topic");

// Subscribe with explicit key/value serdes instead of the configured defaults.
Consumed<String, Event> eventConsumed = Consumed.with(Serdes.String(), eventSerde);
KStream<String, Event> events = builder.stream("events", eventConsumed);

// Subscribe to several topics at once; their records are read into one stream.
KStream<String, String> merged = builder.stream(Arrays.asList("topic1", "topic2", "topic3"));

// Subscribe to every topic whose name matches a regular expression.
Pattern eventTopics = Pattern.compile("events-.*");
KStream<String, String> pattern = builder.stream(eventTopics);

Transformations

| Operation | Input | Output | Description |
|---|---|---|---|
| map | (K, V) | (K', V') | Transform key and value |
| mapValues | V | V' | Transform value only (key unchanged) |
| flatMap | (K, V) | Iterable<(K', V')> | One-to-many records |
| flatMapValues | V | Iterable<V'> | One-to-many values |
| filter | (K, V) | boolean | Keep matching records |
| filterNot | (K, V) | boolean | Keep non-matching records |
| selectKey | (K, V) | K' | Change the key |

// map - transform key and value. Keys and values may be null (unkeyed records,
// tombstones), so guard before dereferencing; Locale.ROOT keeps upper-casing
// independent of the JVM's default locale.
KStream<String, Integer> mapped = stream.map(
    (key, value) -> KeyValue.pair(
        key == null ? null : key.toUpperCase(java.util.Locale.ROOT),
        value == null ? null : value.length())
);

// mapValues - transform value only (more efficient: the key is untouched,
// so no repartitioning is required downstream)
KStream<String, Integer> lengths = stream.mapValues(
    value -> value == null ? null : value.length()
);

// flatMap - one-to-many: emit one record per whitespace-separated word
KStream<String, String> words = stream.flatMap(
    (key, value) -> Arrays.stream(value == null ? new String[0] : value.split("\\s+"))
        .filter(word -> !word.isEmpty())  // split yields "" for leading whitespace
        .map(word -> KeyValue.pair(key, word))
        .collect(Collectors.toList())
);

// filter - keep only non-null, non-empty values
KStream<String, String> filtered = stream.filter(
    (key, value) -> value != null && !value.isEmpty()
);

// selectKey - change the key (marks the stream for repartitioning before any
// later key-based operation such as groupByKey or join)
KStream<String, Order> rekeyed = orders.selectKey(
    (key, order) -> order.getCustomerId()
);

Branching

Split a stream into multiple streams based on predicates.

// Each record goes to the FIRST branch whose predicate matches, otherwise to the default.
// Constant-first equals() avoids an NPE for events with a null type (they land in "other").
Map<String, KStream<String, Event>> branches = events.split(Named.as("branch-"))
    .branch((key, event) -> "click".equals(event.getType()),
        Branched.as("clicks"))
    .branch((key, event) -> "view".equals(event.getType()),
        Branched.as("views"))
    .defaultBranch(Branched.as("other"));

// Map keys are the split prefix ("branch-") followed by the branch name.
KStream<String, Event> clicks = branches.get("branch-clicks");
KStream<String, Event> views = branches.get("branch-views");
KStream<String, Event> other = branches.get("branch-other");

Merging

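Combine multiple streams with the same key and value types into one stream. Records from different input streams are interleaved with no ordering guarantee between them.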

// merge() interleaves the records of all inputs into a single stream.
KStream<String, String> merged = stream1.merge(stream2).merge(stream3);

Table Operations

Creating Tables

// A KTable reads the topic as a changelog: the latest value per key wins.
KTable<String, String> table = builder.table("topic");

// Back the table with a named key-value store and explicit serdes.
Materialized<String, User, KeyValueStore<Bytes, byte[]>> usersStore =
    Materialized.<String, User, KeyValueStore<Bytes, byte[]>>as("users-store")
        .withKeySerde(Serdes.String())
        .withValueSerde(userSerde);
KTable<String, User> users = builder.table("users", usersStore);

// A GlobalKTable is fully replicated to every application instance.
GlobalKTable<String, Config> config = builder.globalTable("config");

Table Transformations

// Derive a per-user table of ages; keys are preserved.
KTable<String, Integer> ages = users.mapValues(User::getAge);

// Keep only the rows whose user is active.
KTable<String, User> activeUsers = users.filter((key, user) -> user.isActive());

// Expose the table's updates as a record stream.
KStream<String, User> userStream = users.toStream();

Grouping

KStream Grouping

// Keep the current key (no repartition unless the key was changed upstream).
KGroupedStream<String, Event> grouped = events.groupByKey();

// Re-key by event type. This repartitions, so the serdes for the new
// key and value are supplied through Grouped.
Grouped<String, Event> byTypeSerdes = Grouped.with(Serdes.String(), eventSerde);
KGroupedStream<String, Event> groupedByType =
    events.groupBy((key, event) -> event.getType(), byTypeSerdes);

KTable Grouping

// KTable has no groupByKey(); to keep the existing key, map each row to itself.
// KTable.groupBy always repartitions, so serdes are supplied through Grouped.
KGroupedTable<String, User> grouped = users.groupBy(
    KeyValue::pair,
    Grouped.with(Serdes.String(), userSerde)
);

Aggregations

Count

// Count records per key, backed by an internal store with a generated name.
KTable<String, Long> counts = grouped.count();

// Same count, materialized into a named store ("counts-store").
// A distinct variable name lets both examples compile side by side.
KTable<String, Long> storedCounts = grouped.count(
    Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")
);

Reduce

// Keep the largest value seen per key.
// NOTE: assumes `grouped` carries Long values (KGroupedStream<String, Long>);
// adapt the reducer to your actual value type.
KTable<String, Long> maxValues = grouped.reduce(Long::max);

Aggregate

// Fold every record of a key into a running Aggregate.
// Assumes Aggregate.add(value) returns the updated Aggregate.
KTable<String, Aggregate> aggregated = grouped.aggregate(
    () -> new Aggregate(0, 0.0),                        // starting value for a new key
    (key, value, current) -> current.add(value),        // combine one record into the running result
    Materialized.with(Serdes.String(), aggregateSerde)  // serdes for the result store
);

Windowed Aggregations

Tumbling Windows

// Fixed, non-overlapping one-hour windows with no grace period for late records.
TimeWindows hourly = TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1));
KTable<Windowed<String>, Long> hourlyCount = events.groupByKey().windowedBy(hourly).count();

Hopping Windows

// Five-minute windows that advance every minute, so each record falls into five
// overlapping windows. Out-of-order records are still accepted for one minute after
// a window ends (grace period).
// Named "hopping": Kafka Streams' SlidingWindows are a different window type.
KTable<Windowed<String>, Long> hoppingCount = events
    .groupByKey()
    .windowedBy(
        TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1))
            .advanceBy(Duration.ofMinutes(1))
    )
    .count();

Session Windows

// A key's session stays open while its records arrive less than 30 minutes apart.
SessionWindows sessions = SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30));
KTable<Windowed<String>, Long> sessionCount = events.groupByKey().windowedBy(sessions).count();

Suppress

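Control when windowed results are emitted. With `untilWindowCloses`, each window emits a single final result after it closes, instead of an update for every incoming record.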

// Emit exactly one final count per one-minute window, once the window closes.
// BufferConfig is nested in Suppressed, hence the qualified name. An unbounded
// buffer keeps all pending results in memory, so make sure the volume is bounded.
KTable<Windowed<String>, Long> finalCounts = events
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
    .count()
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));

Joins

KStream-KStream Join

// Pair each click with same-key impressions whose timestamps are within five minutes of it.
JoinWindows withinFiveMinutes = JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5));
KStream<String, EnrichedClick> enriched = clicks.join(
    impressions,
    (clickEvent, impressionEvent) -> new EnrichedClick(clickEvent, impressionEvent),
    withinFiveMinutes,
    StreamJoined.with(Serdes.String(), clickSerde, impressionSerde)
);

KStream-KTable Join

// Inner join: an order is emitted only if the customers table has a row for its key.
KStream<String, EnrichedOrder> enriched = orders.join(
    customers,
    (order, customer) -> new EnrichedOrder(order, customer)
);

// Left join: every order is emitted; `customer` is null when no matching row
// exists, so EnrichedOrder must tolerate a null customer.
// (A distinct variable name lets both examples compile side by side.)
KStream<String, EnrichedOrder> leftEnriched = orders.leftJoin(
    customers,
    (order, customer) -> new EnrichedOrder(order, customer)
);

KStream-GlobalKTable Join

// The GlobalKTable is fully replicated, so the lookup key can be derived from
// each order's value (here, its product id) without co-partitioning.
KStream<String, EnrichedOrder> enriched = orders.join(
    products,
    (key, order) -> order.getProductId(),                  // key to look up in products
    (order, product) -> new EnrichedOrder(order, product)  // combine order with its product
);

KTable-KTable Join

// Inner join of two tables on their shared key.
KTable<String, UserProfile> profiles =
    users.join(preferences, (user, preference) -> new UserProfile(user, preference));

Output Operations

To Topic

// To single topic
stream.to("output-topic");

// With specific serdes
stream.to("output-topic", Produced.with(Serdes.String(), eventSerde));

// Dynamic topic selection
stream.to(
    (key, value, recordContext) -> "output-" + value.getType(),
    Produced.with(Serdes.String(), eventSerde)
);

Repartition (`through()` is deprecated; prefer `repartition()`)

// Repartition through an intermediate topic that Kafka Streams creates and manages.
// repartition() replaces KStream#through(String), which is deprecated since Kafka 2.6.
// Note: the internal topic is named "<application.id>-repartition-topic-repartition",
// not literally "repartition-topic" as with through().
KStream<String, Event> repartitioned = events.repartition(
    Repartitioned.as("repartition-topic")
);

Peek (Side Effects)

// peek: run a side effect (logging) per record; records pass through unchanged.
stream.peek((k, v) -> logger.info("Processing: {}", k));
// print: debugging aid that writes every record to stdout, optionally labelled.
stream.print(Printed.toSysOut());
stream.print(Printed.<String, Event>toSysOut().withLabel("events"));

Processor API

For advanced use cases that require full control over per-record processing, state stores, and forwarding.

// Attach a custom processor connected to the "my-store" state store.
// The store must already be registered with the topology (e.g. builder.addStateStore(...)).
stream.process(
    // The supplier must return a NEW Processor instance on every call; never share one.
    () -> new Processor<String, Event, String, ProcessedEvent>() {
        private ProcessorContext<String, ProcessedEvent> context;
        private KeyValueStore<String, Long> store;

        @Override
        public void init(ProcessorContext<String, ProcessedEvent> context) {
            this.context = context;
            this.store = context.getStateStore("my-store");
        }

        @Override
        public void process(Record<String, Event> record) {
            // Key-value stores reject null keys, so unkeyed records cannot be counted.
            if (record.key() == null) {
                return;
            }

            // Per-key running count kept in the state store.
            Long previous = store.get(record.key());
            long count = (previous == null) ? 1L : previous + 1;
            store.put(record.key(), count);

            // withValue keeps the input's key, timestamp AND headers; building a new
            // Record from key/value/timestamp alone would silently drop the headers.
            context.forward(record.withValue(new ProcessedEvent(record.value(), count)));
        }
    },
    Named.as("custom-processor"),
    "my-store"
);