Kafka Streams DSL¶
Complete reference for the Kafka Streams Domain-Specific Language (DSL) operations.
Stream Operations¶
Creating Streams¶
StreamsBuilder builder = new StreamsBuilder();
// From topic
KStream<String, String> stream = builder.stream("topic");
// With specific serdes
KStream<String, Event> events = builder.stream(
"events",
Consumed.with(Serdes.String(), eventSerde)
);
// From multiple topics
KStream<String, String> merged = builder.stream(
Arrays.asList("topic1", "topic2", "topic3")
);
// With pattern
KStream<String, String> pattern = builder.stream(
Pattern.compile("events-.*")
);
Transformations¶
| Operation | Input | Output | Description |
|---|---|---|---|
| map | (K, V) | (K', V') | Transform key and value |
| mapValues | V | V' | Transform value only |
| flatMap | (K, V) | Iterable<(K', V')> | One-to-many |
| flatMapValues | V | Iterable<V'> | One-to-many values |
| filter | (K, V) | boolean | Keep matching |
| filterNot | (K, V) | boolean | Keep non-matching |
| selectKey | (K, V) | K' | Change key |
// map - transform key and value
KStream<String, Integer> mapped = stream.map(
(key, value) -> KeyValue.pair(key.toUpperCase(), value.length())
);
// mapValues - transform value only (more efficient)
KStream<String, Integer> lengths = stream.mapValues(String::length);
// flatMap - one-to-many
KStream<String, String> words = stream.flatMap(
(key, value) -> Arrays.stream(value.split(" "))
.map(word -> KeyValue.pair(key, word))
.collect(Collectors.toList())
);
// filter
KStream<String, String> filtered = stream.filter(
(key, value) -> value != null && value.length() > 0
);
// selectKey - change the key
KStream<String, Order> rekeyed = orders.selectKey(
(key, order) -> order.getCustomerId()
);
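The one-to-many shape of flatMap is easy to see without a broker. A minimal pure-Java sketch of the word-splitting lambda above (no Kafka dependency; the class name is illustrative):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class FlatMapSketch {
    // Stand-in for the flatMap lambda above: one input record
    // (key, sentence) expands into one (key, word) pair per word.
    static List<Map.Entry<String, String>> splitWords(String key, String value) {
        return Arrays.stream(value.split(" "))
                .map(word -> Map.entry(key, word))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        splitWords("k1", "hello kafka streams")
                .forEach(e -> System.out.println(e.getKey() + " -> " + e.getValue()));
    }
}
```

In the real topology the returned KeyValue pairs become independent records downstream, which is why flatMap (unlike mapValues) can trigger repartitioning when keys change.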
Branching¶
Split a stream into multiple streams based on predicates.
Map<String, KStream<String, Event>> branches = events.split(Named.as("branch-"))
.branch((key, event) -> event.getType().equals("click"),
Branched.as("clicks"))
.branch((key, event) -> event.getType().equals("view"),
Branched.as("views"))
.defaultBranch(Branched.as("other"));
KStream<String, Event> clicks = branches.get("branch-clicks");
KStream<String, Event> views = branches.get("branch-views");
KStream<String, Event> other = branches.get("branch-other");
Merging¶
Combine multiple streams.
KStream<String, String> merged = stream1
.merge(stream2)
.merge(stream3);
Table Operations¶
Creating Tables¶
// From topic (changelog semantics)
KTable<String, String> table = builder.table("topic");
// With materialized state store
KTable<String, User> users = builder.table(
"users",
Materialized.<String, User, KeyValueStore<Bytes, byte[]>>as("users-store")
.withKeySerde(Serdes.String())
.withValueSerde(userSerde)
);
// GlobalKTable (fully replicated)
GlobalKTable<String, Config> config = builder.globalTable("config");
Table Transformations¶
// mapValues
KTable<String, Integer> ages = users.mapValues(User::getAge);
// filter
KTable<String, User> activeUsers = users.filter(
(key, user) -> user.isActive()
);
// Convert to stream
KStream<String, User> userStream = users.toStream();
Grouping¶
KStream Grouping¶
// Group by existing key
KGroupedStream<String, Event> grouped = events.groupByKey();
// Group by new key
KGroupedStream<String, Event> groupedByType = events.groupBy(
(key, event) -> event.getType(),
Grouped.with(Serdes.String(), eventSerde)
);
KTable Grouping¶
// Group by existing key
KGroupedTable<String, User> grouped = users.groupBy(
(key, user) -> KeyValue.pair(key, user),
Grouped.with(Serdes.String(), userSerde)
);
Aggregations¶
Count¶
KTable<String, Long> counts = grouped.count();
// With materialized store
KTable<String, Long> counts = grouped.count(
Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")
);
Reduce¶
KTable<String, Long> maxValues = grouped.reduce(
(value1, value2) -> Math.max(value1, value2)
);
Aggregate¶
KTable<String, Aggregate> aggregated = grouped.aggregate(
// Initializer
() -> new Aggregate(0, 0.0),
// Aggregator
(key, value, aggregate) -> aggregate.add(value),
// Materialized
Materialized.with(Serdes.String(), aggregateSerde)
);
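The snippet above assumes an Aggregate class with an add method. One plausible shape for it (a hypothetical POJO, not part of Kafka Streams) keeps a running count and sum and returns a new instance per update, which keeps the aggregator free of side effects:

```java
// Hypothetical Aggregate class matching new Aggregate(0, 0.0) and
// aggregate.add(value) in the example above.
public class Aggregate {
    private final int count;
    private final double sum;

    public Aggregate(int count, double sum) {
        this.count = count;
        this.sum = sum;
    }

    // Returns a new instance rather than mutating, so the same
    // aggregator can safely be retried or reprocessed.
    public Aggregate add(double value) {
        return new Aggregate(count + 1, sum + value);
    }

    public int getCount() { return count; }
    public double getSum() { return sum; }
    public double getAverage() { return count == 0 ? 0.0 : sum / count; }
}
```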
Windowed Aggregations¶
Tumbling Windows¶
KTable<Windowed<String>, Long> hourlyCount = events
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofHours(1)))
.count();
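The assignment rule behind tumbling windows can be checked in plain Java: each record timestamp lands in exactly one window, whose start is the timestamp rounded down to a multiple of the window size (a sketch of the windowing math, independent of the Streams API):

```java
public class TumblingWindowMath {
    // A tumbling window of size sizeMs assigns each record timestamp to
    // exactly one window [start, start + size), where start is the
    // timestamp rounded down to a multiple of the window size.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        // 01:30:00 falls into the [01:00, 02:00) window
        System.out.println(windowStart(5_400_000L, hour)); // 3600000
    }
}
```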
Hopping Windows¶
KTable<Windowed<String>, Long> hoppingCount = events
.groupByKey()
.windowedBy(
TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofMinutes(1))
.advanceBy(Duration.ofMinutes(1))
)
.count();
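Because hopping windows overlap, one record lands in size / advance windows (five, in the configuration above). A pure-Java sketch of the membership rule (illustrative, not the Streams implementation):

```java
import java.util.ArrayList;
import java.util.List;

public class HoppingWindowMath {
    // With hopping windows of size sizeMs advancing by advanceMs, a record
    // timestamp belongs to every window [start, start + size) whose start
    // is a multiple of the advance and covers the timestamp.
    static List<Long> windowStartsFor(long ts, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        long firstStart = Math.max(0L, ts - sizeMs + 1);
        // round up to the next multiple of advanceMs
        long start = ((firstStart + advanceMs - 1) / advanceMs) * advanceMs;
        for (; start <= ts; start += advanceMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // 5-minute windows advancing by 1 minute: a record at minute 7
        // lands in the five windows starting at minutes 3 through 7
        long min = 60_000L;
        System.out.println(windowStartsFor(7 * min, 5 * min, min));
    }
}
```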
Session Windows¶
KTable<Windowed<String>, Long> sessionCount = events
.groupByKey()
.windowedBy(SessionWindows.ofInactivityGapWithNoGrace(Duration.ofMinutes(30)))
.count();
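Unlike time windows, sessions have no fixed size: they grow and merge with activity. A pure-Java sketch of the inactivity-gap rule (illustrative only): consecutive events for a key merge into one session as long as the gap between them stays within the configured inactivity gap.

```java
import java.util.List;

public class SessionWindowMath {
    // Consecutive events merge into one session while the gap between
    // them is within gapMs; a larger gap starts a new session.
    static int countSessions(List<Long> sortedTimestamps, long gapMs) {
        if (sortedTimestamps.isEmpty()) return 0;
        int sessions = 1;
        for (int i = 1; i < sortedTimestamps.size(); i++) {
            if (sortedTimestamps.get(i) - sortedTimestamps.get(i - 1) > gapMs) {
                sessions++;
            }
        }
        return sessions;
    }

    public static void main(String[] args) {
        // With a 30-minute gap, a 40-minute silence splits the events in two
        long min = 60_000L;
        System.out.println(countSessions(
                List.of(0L, 10 * min, 20 * min, 60 * min), 30 * min));
    }
}
```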
Suppress¶
Control when windowed results are emitted.
KTable<Windowed<String>, Long> finalCounts = events
.groupByKey()
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
.count()
.suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()));
Joins¶
KStream-KStream Join¶
KStream<String, EnrichedClick> enriched = clicks.join(
impressions,
(click, impression) -> new EnrichedClick(click, impression),
JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(5)),
StreamJoined.with(Serdes.String(), clickSerde, impressionSerde)
);
KStream-KTable Join¶
KStream<String, EnrichedOrder> enriched = orders.join(
customers,
(order, customer) -> new EnrichedOrder(order, customer)
);
// Left join (order always emitted)
KStream<String, EnrichedOrder> enriched = orders.leftJoin(
customers,
(order, customer) -> new EnrichedOrder(order, customer)
);
KStream-GlobalKTable Join¶
KStream<String, EnrichedOrder> enriched = orders.join(
products,
(orderId, order) -> order.getProductId(), // Key mapper
(order, product) -> new EnrichedOrder(order, product)
);
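The key mapper above turns this join into a per-record lookup against the fully replicated table. A pure-Java sketch of the inner-join semantics, with a HashMap standing in for the GlobalKTable (names are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class GlobalTableJoinSketch {
    // The key mapper extracts a lookup key from the stream record; the
    // inner join emits an enriched record only when the lookup succeeds,
    // and silently drops records with no match.
    static String enrich(String productId, Map<String, String> products) {
        String product = products.get(productId);
        return product == null ? null : "order-for-" + product;
    }

    public static void main(String[] args) {
        Map<String, String> products = new HashMap<>();
        products.put("p1", "widget");
        System.out.println(enrich("p1", products)); // order-for-widget
    }
}
```

A leftJoin, by contrast, would emit the record with a null table side instead of dropping it.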
KTable-KTable Join¶
KTable<String, UserProfile> profiles = users.join(
preferences,
(user, pref) -> new UserProfile(user, pref)
);
Output Operations¶
To Topic¶
// To single topic
stream.to("output-topic");
// With specific serdes
stream.to("output-topic", Produced.with(Serdes.String(), eventSerde));
// Dynamic topic selection
stream.to(
(key, value, recordContext) -> "output-" + value.getType(),
Produced.with(Serdes.String(), eventSerde)
);
Repartition (formerly Through)¶
// Repartition through an internal topic; through() is deprecated
// in favor of repartition()
KStream<String, Event> repartitioned = events.repartition(
Repartitioned.with(Serdes.String(), eventSerde).withName("repartition-topic")
);
Peek (Side Effects)¶
stream.peek((key, value) -> logger.info("Processing: {}", key));
Print (Debug)¶
stream.print(Printed.toSysOut());
stream.print(Printed.<String, Event>toSysOut().withLabel("events"));
Processor API¶
For advanced use cases requiring full control.
stream.process(
() -> new Processor<String, Event, String, ProcessedEvent>() {
private ProcessorContext<String, ProcessedEvent> context;
private KeyValueStore<String, Long> store;
@Override
public void init(ProcessorContext<String, ProcessedEvent> context) {
this.context = context;
this.store = context.getStateStore("my-store");
}
@Override
public void process(Record<String, Event> record) {
// Custom processing logic
Long count = store.get(record.key());
count = (count == null) ? 1L : count + 1;
store.put(record.key(), count);
context.forward(new Record<>(
record.key(),
new ProcessedEvent(record.value(), count),
record.timestamp()
));
}
},
Named.as("custom-processor"),
"my-store"
);
Related Documentation¶
- Kafka Streams Overview - Concepts and architecture
- Delivery Semantics - Processing guarantees
- Architecture Patterns - Design patterns