Event Collaboration Pattern

Event collaboration (also called choreography) lets services coordinate by publishing and reacting to domain events instead of calling each other directly. Each service decides independently how to react, so publishers and subscribers stay loosely coupled and can evolve on their own schedules.


Core Concepts

Request/Response vs Event Collaboration

[UML diagram: request/response vs. event collaboration]

Key Characteristics

| Aspect | Request/Response | Event Collaboration |
|---|---|---|
| Coupling | Tight - caller knows callees | Loose - publisher doesn't know subscribers |
| Availability | Dependent on all services | Services operate independently |
| Scalability | Limited by synchronous calls | Independent scaling |
| Evolution | Changes affect callers | Add subscribers without changes |
| Debugging | Direct call stack | Distributed tracing required |
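
The "Coupling" and "Evolution" rows can be made concrete with a small in-memory sketch. `InMemoryTopic`, `OrderPlaced`, and `OrderPublisher` are hypothetical names used only for illustration; a real system would have a broker such as Kafka where the topic class sits.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a broker topic; not a real messaging API.
final class InMemoryTopic<E> {
    private final List<Consumer<E>> subscribers = new ArrayList<>();

    // Subscribers register themselves; the publisher never sees this list.
    void subscribe(Consumer<E> subscriber) {
        subscribers.add(subscriber);
    }

    // Delivers the event to every subscriber registered so far.
    void publish(E event) {
        for (Consumer<E> subscriber : subscribers) {
            subscriber.accept(event);
        }
    }
}

record OrderPlaced(String orderId) {}

// Publisher: depends on the topic only, not on inventory, email, or any other consumer.
final class OrderPublisher {
    private final InMemoryTopic<OrderPlaced> topic;

    OrderPublisher(InMemoryTopic<OrderPlaced> topic) {
        this.topic = topic;
    }

    void placeOrder(String orderId) {
        topic.publish(new OrderPlaced(orderId));
    }
}
```

Adding a second subscriber later requires no change to `OrderPublisher`; with request/response, the caller would need a new outbound call for each new callee.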

Architecture

Event-Driven Communication

[UML diagram: event-driven communication]

Event Types

| Type | Description | Example |
|---|---|---|
| Domain events | Business-meaningful occurrences | OrderPlaced, PaymentReceived |
| Integration events | Cross-service communication | CustomerCreated, PriceChanged |
| Notification events | Trigger notifications | OrderShipped, RefundProcessed |

Implementation

Domain Event Definition

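public interface DomainEvent {
    // Named after the record components, so the records below implement these automatically
    String eventId();
    Instant occurredAt();
    String getAggregateId();

    // Bean-style aliases for callers that use getEventId() / getOccurredAt()
    default String getEventId() { return eventId(); }
    default Instant getOccurredAt() { return occurredAt(); }
}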

public record OrderCreated(
    String eventId,
    String orderId,
    String customerId,
    List<OrderItem> items,
    BigDecimal total,
    Instant occurredAt
) implements DomainEvent {

    @Override
    public String getAggregateId() {
        return orderId;
    }
}

public record OrderCancelled(
    String eventId,
    String orderId,
    String reason,
    String cancelledBy,
    Instant occurredAt
) implements DomainEvent {

    @Override
    public String getAggregateId() {
        return orderId;
    }
}

Event Publisher

@Service
public class OrderService {
    private final OrderRepository repository;
    private final DomainEventPublisher eventPublisher;

    @Transactional
    public Order createOrder(CreateOrderRequest request) {
        Order order = new Order(request);
        order = repository.save(order);

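        // Publish domain event. Caution: the broker send is not covered by the
        // database transaction, so a rollback after this point leaves a
        // published event without a saved order (a transactional outbox avoids this).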
        eventPublisher.publish(new OrderCreated(
            UUID.randomUUID().toString(),
            order.getId(),
            order.getCustomerId(),
            order.getItems(),
            order.getTotal(),
            Instant.now()
        ));

        return order;
    }
}

@Component
public class KafkaDomainEventPublisher implements DomainEventPublisher {
    private final KafkaTemplate<String, DomainEvent> kafka;

    @Override
    public void publish(DomainEvent event) {
        String topic = topicFor(event);
        String key = event.getAggregateId();

        kafka.send(topic, key, event);
    }

    private String topicFor(DomainEvent event) {
        // Route by event type
        return switch (event) {
            case OrderCreated e -> "order-events";
            case OrderCancelled e -> "order-events";
            case PaymentCompleted e -> "payment-events";
            default -> "domain-events";
        };
    }
}
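
The publisher above uses the aggregate ID as the record key. In Kafka, records with the same key go to the same partition (while the partition count stays fixed), and ordering is preserved within a partition, so all events for one order are consumed in the order they were published. The sketch below illustrates the idea only: `KeyPartitioner` is a hypothetical class, and it uses `Arrays.hashCode` where Kafka's default partitioner uses a murmur2 hash of the serialized key.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Simplified, hypothetical stand-in for a keyed partitioner.
// Assumption: any stable hash works for illustration; Kafka itself uses murmur2.
final class KeyPartitioner {
    private final int partitionCount;

    KeyPartitioner(int partitionCount) {
        if (partitionCount <= 0) {
            throw new IllegalArgumentException("partitionCount must be positive");
        }
        this.partitionCount = partitionCount;
    }

    // The same key always maps to the same partition for a fixed partition count.
    int partitionFor(String key) {
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        // floorMod keeps the result non-negative even when the hash is negative.
        return Math.floorMod(hash, partitionCount);
    }
}
```

Events for different orders may land on different partitions and be processed in parallel; only the per-key order is guaranteed.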

Event Consumers

// Inventory Service - reacts to orders
@Component
public class OrderEventHandler {
    private final InventoryService inventoryService;

    @KafkaListener(topics = "order-events", groupId = "inventory-service")
    public void onOrderEvent(DomainEvent event) {
        switch (event) {
            case OrderCreated created ->
                inventoryService.reserveItems(created.orderId(), created.items());

            case OrderCancelled cancelled ->
                inventoryService.releaseReservation(cancelled.orderId());

            default ->
                log.debug("Ignoring event: {}", event.getClass().getSimpleName());
        }
    }
}

// Payment Service - reacts to orders
@Component
public class PaymentOrderHandler {
    private final PaymentService paymentService;

    @KafkaListener(topics = "order-events", groupId = "payment-service")
    public void onOrderEvent(DomainEvent event) {
        if (event instanceof OrderCreated created) {
            paymentService.processPayment(
                created.orderId(),
                created.customerId(),
                created.total()
            );
        }
    }
}

// Analytics Service - observes all events
@Component
public class AnalyticsEventCollector {
    private final AnalyticsRepository repository;

    @KafkaListener(
        topicPattern = ".*-events",
        groupId = "analytics-service"
    )
    public void onAnyEvent(ConsumerRecord<String, DomainEvent> record) {
        repository.store(new AnalyticsEvent(
            record.topic(),
            record.value().getClass().getSimpleName(),
            record.value(),
            Instant.now()
        ));
    }
}

Event Design

Event Granularity

// Fine-grained events - more flexibility, more events
OrderLineItemAdded
OrderLineItemRemoved
OrderLineItemQuantityChanged
OrderShippingAddressSet
OrderBillingAddressSet
OrderPaymentMethodSet

// Coarse-grained events - simpler, less flexibility
OrderUpdated  // Contains all changes

| Granularity | Pros | Cons |
|---|---|---|
| Fine-grained | Precise reactions, better auditing | More events to handle, ordering complexity |
| Coarse-grained | Simpler handling, fewer events | Less precise reactions, may miss changes |

Event Content

// Notification style - minimal data, requires lookups
public record OrderCreated(
    String orderId,
    Instant occurredAt
) {}

// Event-carried state transfer - self-contained
public record OrderCreated(
    String orderId,
    String customerId,
    String customerEmail,      // Denormalized
    String customerName,       // Denormalized
    List<OrderItem> items,
    BigDecimal subtotal,
    BigDecimal tax,
    BigDecimal total,
    Address shippingAddress,
    Address billingAddress,
    Instant occurredAt
) {}

| Style | Pros | Cons |
|---|---|---|
| Notification | Small events, always current data | Requires service calls, coupling |
| State transfer | Self-contained, no lookups | Larger events, potentially stale data |

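Recommendation: Prefer event-carried state transfer when loose coupling matters most. Consumers then need no call back to the publisher, at the cost of larger events and data that may be stale by the time it is read.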
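
One way a consumer can benefit from event-carried state transfer is to keep a local copy of the fields it needs, so that it never calls the publishing service at read time. The following is a minimal sketch under that assumption; `CustomerAddressChanged` and `CustomerReadModel` are hypothetical names, and a real service would persist the copy instead of holding it in memory.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical self-contained event: carries the data a shipping service needs.
record CustomerAddressChanged(String customerId, String name, String shippingAddress) {}

// The consuming service's own copy of customer data, updated only from events.
final class CustomerReadModel {
    private final Map<String, CustomerAddressChanged> latestByCustomer = new HashMap<>();

    // Later events for the same customer overwrite earlier ones.
    void on(CustomerAddressChanged event) {
        latestByCustomer.put(event.customerId(), event);
    }

    // Answered locally; empty if no event for this customer has been seen yet.
    Optional<String> shippingAddressOf(String customerId) {
        return Optional.ofNullable(latestByCustomer.get(customerId))
                .map(CustomerAddressChanged::shippingAddress);
    }
}
```

The trade-off from the table still applies: the local copy is only as fresh as the last event the consumer has processed.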


Handling Reactions

Idempotent Handlers

@Component
public class IdempotentInventoryHandler {
    private final InventoryRepository inventoryRepo;
    private final ProcessedEventRepository processedRepo;

    @KafkaListener(topics = "order-events")
    @Transactional
    public void onOrderCreated(OrderCreated event) {
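        // Check idempotency. Concurrent duplicates can both pass this check,
        // so the processed-event table also needs a unique constraint on eventId.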
        if (processedRepo.existsByEventId(event.eventId())) {
            return;
        }

        // Process event
        for (OrderItem item : event.items()) {
            inventoryRepo.reserve(item.productId(), item.quantity(), event.orderId());
        }

        // Mark as processed
        processedRepo.save(new ProcessedEvent(event.eventId(), Instant.now()));
    }
}
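
The same check-then-record idea can be sketched without Spring or a database, which makes the effect of a duplicate delivery easy to see. `ReserveStock` and `IdempotentReservations` are hypothetical names; the in-memory set stands in for the processed-event table, which in a real service would be updated in the same transaction as the reservation.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical event for the sketch.
record ReserveStock(String eventId, String productId, int quantity) {}

final class IdempotentReservations {
    private final Set<String> processedEventIds = new HashSet<>();
    private final Map<String, Integer> reservedByProduct = new HashMap<>();

    // Returns true if the event was applied, false if it was a duplicate.
    boolean handle(ReserveStock event) {
        // Set.add returns false when the ID is already present, so the check
        // and the "mark as processed" step happen in a single call.
        if (!processedEventIds.add(event.eventId())) {
            return false;
        }
        reservedByProduct.merge(event.productId(), event.quantity(), Integer::sum);
        return true;
    }

    int reserved(String productId) {
        return reservedByProduct.getOrDefault(productId, 0);
    }
}
```

Delivering the same event twice leaves the reserved quantity unchanged after the first delivery.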

Reaction Events

Services publish their own events in response:

@Component
public class InventoryOrderHandler {
    private final InventoryService inventoryService;
    private final DomainEventPublisher eventPublisher;

    @KafkaListener(topics = "order-events")
    public void onOrderCreated(OrderCreated event) {
        try {
            inventoryService.reserveItems(event.orderId(), event.items());

            // Publish success event
            eventPublisher.publish(new InventoryReserved(
                UUID.randomUUID().toString(),
                event.orderId(),
                event.items(),
                Instant.now()
            ));

        } catch (InsufficientInventoryException e) {
            // Publish failure event
            eventPublisher.publish(new InventoryReservationFailed(
                UUID.randomUUID().toString(),
                event.orderId(),
                e.getProductId(),
                e.getMessage(),
                Instant.now()
            ));
        }
    }
}
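
Stripped of Kafka, the reaction pattern is a function from an incoming event to an outcome event, plus another service that reacts to that outcome. The sketch below assumes simplified, hypothetical types (`InventoryOutcome`, `StockReserved`, `StockReservationFailed`, `InventoryReactor`, `OrderStatusTracker`) and hypothetical status strings; it is an illustration, not the handler above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified outcome events; names echo the handler above.
interface InventoryOutcome {
    String orderId();
}

record StockReserved(String orderId) implements InventoryOutcome {}

record StockReservationFailed(String orderId, String reason) implements InventoryOutcome {}

// Inventory side: reacts to an order and returns the event it would publish.
final class InventoryReactor {
    private final Map<String, Integer> stockByProduct = new HashMap<>();

    InventoryReactor(Map<String, Integer> initialStock) {
        stockByProduct.putAll(initialStock);
    }

    InventoryOutcome onOrderCreated(String orderId, String productId, int quantity) {
        int available = stockByProduct.getOrDefault(productId, 0);
        if (available < quantity) {
            return new StockReservationFailed(orderId, "insufficient stock for " + productId);
        }
        stockByProduct.put(productId, available - quantity);
        return new StockReserved(orderId);
    }
}

// Order side: reacts to the inventory outcome; inventory never calls it directly.
final class OrderStatusTracker {
    private final Map<String, String> statusByOrder = new HashMap<>();

    void on(InventoryOutcome outcome) {
        // Assumption: "CONFIRMED" and "CANCELLED" are illustrative status values.
        String status = (outcome instanceof StockReserved) ? "CONFIRMED" : "CANCELLED";
        statusByOrder.put(outcome.orderId(), status);
    }

    String statusOf(String orderId) {
        return statusByOrder.get(orderId);
    }
}
```

Publishing a failure event, as opposed to only logging the exception, is what lets the order service compensate without the inventory service knowing it exists.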

Consumer Groups

Independent Processing

Each service has its own consumer group:

order-events topic:
├── inventory-service group   → reserves inventory
├── payment-service group     → processes payment
├── notification-service group → sends confirmation
└── analytics-service group   → tracks metrics
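
What makes the groups independent is that each one tracks its own position in the shared log. A minimal model of that, assuming a single partition and hypothetical names (`SharedLog`, `poll`, `lag`), might look like this; real brokers track committed offsets per group and per partition.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-partition log with one committed offset per consumer group.
final class SharedLog {
    private final List<String> records = new ArrayList<>();
    private final Map<String, Integer> committedOffsetByGroup = new HashMap<>();

    void append(String record) {
        records.add(record);
    }

    // Returns the records this group has not consumed yet and advances
    // only this group's offset. A group seen for the first time starts
    // at offset 0, similar to auto.offset.reset=earliest.
    List<String> poll(String groupId) {
        int from = committedOffsetByGroup.getOrDefault(groupId, 0);
        List<String> batch = List.copyOf(records.subList(from, records.size()));
        committedOffsetByGroup.put(groupId, records.size());
        return batch;
    }

    // Consumer lag: records appended but not yet consumed by the group.
    int lag(String groupId) {
        return records.size() - committedOffsetByGroup.getOrDefault(groupId, 0);
    }
}
```

A slow or stopped group accumulates lag without affecting how far any other group has read.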

Consumer Group Configuration

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, DomainEvent> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "inventory-service");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
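        // Deserializers are required; without them the consumer fails at startup.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        // Assumption: event classes live in com.example.events; adjust to your package.
        props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example.events");
        return new DefaultKafkaConsumerFactory<>(props);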
    }
}

Error Handling

Dead Letter Topics

@Component
public class OrderEventHandler {
    private final KafkaTemplate<String, DomainEvent> kafka;

    @KafkaListener(topics = "order-events")
    public void onOrderEvent(ConsumerRecord<String, DomainEvent> record) {
        try {
            processEvent(record.value());
        } catch (RetryableException e) {
            throw e;  // Let retry mechanism handle
        } catch (Exception e) {
            // Send to dead letter topic
            kafka.send(
                record.topic() + ".dlq",
                record.key(),
                record.value()
            );
            log.error("Sent to DLQ: {}", record.value(), e);
        }
    }
}

Retry Configuration

@Bean
public ConcurrentKafkaListenerContainerFactory<String, DomainEvent>
        kafkaListenerContainerFactory(KafkaTemplate<String, DomainEvent> kafkaTemplate) {

    ConcurrentKafkaListenerContainerFactory<String, DomainEvent> factory =
        new ConcurrentKafkaListenerContainerFactory<>();

    factory.setConsumerFactory(consumerFactory());

    // Retry configuration
    factory.setCommonErrorHandler(new DefaultErrorHandler(
        new DeadLetterPublishingRecoverer(kafkaTemplate),
        new FixedBackOff(1000L, 3)  // 3 retries, 1 second apart (4 delivery attempts in total)
    ));

    return factory;
}
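
The control flow that this configuration sets up (one initial attempt, a bounded number of retries, then dead-lettering) can be sketched in plain Java. `RetryingDispatcher` is a hypothetical class for illustration; it omits the back-off delay and treats every `RuntimeException` as retryable, which a real error handler would not necessarily do.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of "retry, then dead-letter"; not the Spring Kafka implementation.
final class RetryingDispatcher<E> {
    private final Consumer<E> handler;
    private final int maxRetries;
    private final List<E> deadLetters = new ArrayList<>();

    RetryingDispatcher(Consumer<E> handler, int maxRetries) {
        this.handler = handler;
        this.maxRetries = maxRetries;
    }

    void dispatch(E event) {
        // One initial attempt plus up to maxRetries retries.
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                handler.accept(event);
                return; // handled successfully, nothing more to do
            } catch (RuntimeException e) {
                // A real implementation would wait here (back-off) before retrying.
            }
        }
        // Retries exhausted: park the event so the rest of the stream keeps flowing.
        deadLetters.add(event);
    }

    List<E> deadLetters() {
        return List.copyOf(deadLetters);
    }
}
```

Parking a poison event after a bounded number of attempts keeps one bad record from blocking every record behind it on the partition.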

Event Versioning

Schema Evolution

// Version 1
public record OrderCreatedV1(
    String orderId,
    String customerId,
    BigDecimal total
) {}

// Version 2 - added fields
public record OrderCreatedV2(
    String orderId,
    String customerId,
    BigDecimal total,
    String currency,           // New field
    List<OrderItem> items      // New field
) {}

Handling Multiple Versions

@Component
public class VersionAwareOrderHandler {

    @KafkaListener(topics = "order-events")
    public void onOrderEvent(ConsumerRecord<String, JsonNode> record) {
        JsonNode event = record.value();
        int version = event.path("version").asInt(1);

        switch (version) {
            case 1 -> handleV1(parseV1(event));
            case 2 -> handleV2(parseV2(event));
            default -> log.warn("Unknown version: {}", version);
        }
    }

    private void handleV1(OrderCreatedV1 event) {
        // Handle with defaults for missing fields
        handleOrder(event.orderId(), event.customerId(),
            event.total(), "USD", List.of());
    }

    private void handleV2(OrderCreatedV2 event) {
        handleOrder(event.orderId(), event.customerId(),
            event.total(), event.currency(), event.items());
    }
}
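
An alternative to one handler per version is to "upcast" old payloads to the current shape before any business logic runs, so that handlers only ever see the latest version. The sketch below assumes the payload has already been parsed into a `Map` and reuses the defaults from `handleV1` above (`"USD"`, empty item list); `OrderCreatedUpcaster` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical upcaster: converts older OrderCreated payloads to the current shape.
final class OrderCreatedUpcaster {
    static final int CURRENT_VERSION = 2;

    // Returns a copy of the payload in the current version; the input is not modified.
    static Map<String, Object> upcast(Map<String, Object> payload) {
        Map<String, Object> result = new HashMap<>(payload);
        // Payloads without a version field are treated as version 1.
        int version = ((Number) result.getOrDefault("version", 1)).intValue();

        if (version == 1) {
            // Fill the fields that version 2 added with explicit defaults.
            result.putIfAbsent("currency", "USD");
            result.putIfAbsent("items", List.of());
            version = 2;
            result.put("version", version);
        }
        if (version != CURRENT_VERSION) {
            throw new IllegalArgumentException("Unsupported version: " + version);
        }
        return result;
    }
}
```

Each new version then adds one more upcasting step, and handler code does not grow with the number of historical versions.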

Observability

Correlation IDs

public record DomainEventHeaders(
    String eventId,
    String correlationId,
    String causationId,
    String source,
    Instant timestamp
) {}

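@Component
public class CorrelatedEventPublisher {
    private final KafkaTemplate<String, DomainEvent> kafka;

    public void publish(DomainEvent event, String correlationId, String causationId) {
        ProducerRecord<String, DomainEvent> record = new ProducerRecord<>(
            topicFor(event),  // same routing as KafkaDomainEventPublisher.topicFor
            event.getAggregateId(),
            event
        );

        // Encode header values explicitly; getBytes() without a charset uses the platform default
        record.headers()
            .add("correlation-id", correlationId.getBytes(StandardCharsets.UTF_8))
            .add("causation-id", causationId.getBytes(StandardCharsets.UTF_8))
            .add("source", "order-service".getBytes(StandardCharsets.UTF_8));

        kafka.send(record);
    }
}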

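@Component
public class CorrelatedEventHandler {
    private final CorrelatedEventPublisher publisher;

    @KafkaListener(topics = "order-events")
    public void onOrderCreated(ConsumerRecord<String, OrderCreated> record) {
        String causationId = record.value().eventId();  // This event caused the next one
        String correlationId = header(record, "correlation-id");
        if (correlationId == null) {
            correlationId = causationId;  // No upstream chain: start one at this event
        }

        // Process and publish reaction with correlation
        InventoryReserved reaction = processAndReact(record.value());
        publisher.publish(reaction, correlationId, causationId);
    }

    // Reads the last header with the given name, or null if it is absent
    private static String header(ConsumerRecord<?, ?> record, String name) {
        Header header = record.headers().lastHeader(name);
        return header == null ? null : new String(header.value(), StandardCharsets.UTF_8);
    }
}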
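
The propagation rule itself is small: a reaction keeps the correlation ID of the event that triggered it and records that event's ID as its causation ID. A sketch of just that rule follows, with a hypothetical `EventMetadata` record and the assumed convention that the first event in a chain uses its own ID as the correlation ID.

```java
import java.util.UUID;

// Hypothetical metadata record; field names follow the headers used above.
record EventMetadata(String eventId, String correlationId, String causationId) {

    // First event in a chain: it starts a new correlation and has no cause.
    // Assumption: the correlation ID is the first event's own ID.
    static EventMetadata startOfChain() {
        String id = UUID.randomUUID().toString();
        return new EventMetadata(id, id, null);
    }

    // Metadata for an event published in reaction to this one:
    // new event ID, same correlation ID, this event as the cause.
    EventMetadata causedNext() {
        return new EventMetadata(UUID.randomUUID().toString(), correlationId, eventId);
    }
}
```

Filtering by correlation ID then yields every event in one business flow, and following causation IDs reconstructs the order in which they triggered each other.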

Distributed Tracing

@Component
public class TracedEventHandler {
    private final Tracer tracer;

    @KafkaListener(topics = "order-events")
    public void onOrderEvent(ConsumerRecord<String, DomainEvent> record) {
        // Extract trace context from headers
        SpanContext parentContext = tracer.extract(
            Format.Builtin.TEXT_MAP,
            new KafkaHeadersExtractor(record.headers())
        );

        // Create child span
        Span span = tracer.buildSpan("process-order-event")
            .asChildOf(parentContext)
            .withTag("event.type", record.value().getClass().getSimpleName())
            .withTag("event.id", record.value().getEventId())
            .start();

        try (Scope scope = tracer.scopeManager().activate(span)) {
            processEvent(record.value());
        } finally {
            span.finish();
        }
    }
}

Anti-Patterns

Avoid These Mistakes

Event chains as workflow
Long event chains are hard to understand and debug. Use saga orchestration for complex workflows.
Request/response over events
Don't use events for synchronous request/response. Use direct calls or dedicated request/reply topics.
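Assuming every event arrives
Consumers shouldn't assume that every event arrives, or that events arrive in order. Design handlers to tolerate missing and out-of-order events.
Tight event coupling
Don't shape an event around what one particular consumer needs. Events should represent domain facts that any consumer can interpret.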
Ignoring idempotency
Events may be delivered multiple times. All handlers must be idempotent.
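
One common way to tolerate out-of-order and duplicate delivery is to attach a per-aggregate sequence number to each event and ignore anything that is not newer than what the consumer has already applied. The sketch below assumes such a `sequence` field exists (the events defined earlier do not carry one); `OrderStatusChanged` and `OrderStatusProjection` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical event with a per-order sequence number assigned by the publisher.
record OrderStatusChanged(String orderId, long sequence, String status) {}

final class OrderStatusProjection {
    private final Map<String, OrderStatusChanged> latestByOrder = new HashMap<>();

    // Applies the event only if it is newer than the last one applied for
    // the same order; returns whether it was applied.
    boolean apply(OrderStatusChanged event) {
        OrderStatusChanged current = latestByOrder.get(event.orderId());
        if (current != null && current.sequence() >= event.sequence()) {
            return false; // stale or duplicate: ignore
        }
        latestByOrder.put(event.orderId(), event);
        return true;
    }

    // Returns the latest known status, or null if no event has been applied.
    String statusOf(String orderId) {
        OrderStatusChanged current = latestByOrder.get(orderId);
        return current == null ? null : current.status();
    }
}
```

This works for state that is fully replaced by each event; handlers that accumulate deltas need a different strategy, such as buffering until the gap is filled.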

Best Practices

Event Design

| Practice | Description |
|---|---|
| Past tense naming | Events describe what happened: OrderCreated, not CreateOrder |
| Self-contained | Include all data consumers need |
| Versioned | Include version for schema evolution |
| Immutable | Events represent facts, never change them |

Consumer Design

| Practice | Description |
|---|---|
| Independent groups | Each service has its own consumer group |
| Idempotent handling | Handle duplicates gracefully |
| Selective consumption | Only process relevant event types |
| Error isolation | One consumer failure shouldn't affect others |

Operations

| Practice | Description |
|---|---|
| Correlation IDs | Track event chains across services |
| Dead letter topics | Capture failed events for analysis |
| Monitoring | Track consumer lag and processing times |
| Replay capability | Design for event replay scenarios |