# MirrorMaker 2
MirrorMaker 2 (MM2) is Apache Kafka's tool for replicating data between Kafka clusters. Built on Kafka Connect, it provides topic replication, consumer offset synchronization, and automatic topic configuration mirroring.
## What is MirrorMaker 2?
MirrorMaker 2 replaces the legacy MirrorMaker (MM1) with a redesigned architecture based on Kafka Connect. It addresses fundamental limitations of MM1:
| Limitation (MM1) | Solution (MM2) |
|---|---|
| No offset translation | Checkpoint-based offset synchronization |
| Topic configuration not synced | Automatic config mirroring |
| No ACL replication | ACL synchronization support |
| Manual topic creation | Automatic topic creation |
| Single cluster pair | Multiple cluster topologies |
| Difficult to monitor | Kafka Connect metrics and status |
## MirrorMaker 2 Components
MM2 consists of three Kafka Connect connectors:
| Connector | Purpose |
|---|---|
| MirrorSourceConnector | Replicates topic data from source to target cluster |
| MirrorCheckpointConnector | Synchronizes consumer group offsets between clusters |
| MirrorHeartbeatConnector | Emits heartbeats for replication health monitoring |
## Deployment Models

### Dedicated MM2 Cluster

Run MM2 as a standalone Connect cluster dedicated to replication.
Advantages:
- Isolated from other Connect workloads
- Independent scaling
- Clear resource allocation
### Embedded in Existing Connect Cluster
Deploy MM2 connectors alongside other connectors.
Advantages:
- Simpler infrastructure
- Shared monitoring
- Lower operational overhead
Disadvantages:
- Resource contention with other connectors
- Replication affected by other connector issues
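When embedded, the MM2 connectors are created through the Connect REST API rather than `connect-mirror-maker.sh`. A minimal sketch of a `MirrorSourceConnector` submission (the connector name, hosts, and cluster aliases are illustrative):

```json
{
  "name": "mm2-source-to-target",
  "config": {
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "source.cluster.alias": "source",
    "target.cluster.alias": "target",
    "source.cluster.bootstrap.servers": "source:9092",
    "target.cluster.bootstrap.servers": "target:9092",
    "topics": ".*",
    "replication.factor": "3"
  }
}
```

The checkpoint and heartbeat connectors are submitted the same way with their own `connector.class` values.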
### Co-located with Target Cluster
Run MM2 workers on target cluster broker nodes.
Advantages:
- Reduced network hops for writes
- Simpler networking
Disadvantages:
- Competes for broker resources
- Not recommended for high-volume replication
## Configuration Reference

### Cluster Configuration

```properties
# Define clusters
clusters = source, target

# Source cluster connection
source.bootstrap.servers = source-broker-1:9092,source-broker-2:9092,source-broker-3:9092

# Target cluster connection
target.bootstrap.servers = target-broker-1:9092,target-broker-2:9092,target-broker-3:9092

# Security for source cluster (if required)
source.security.protocol = SASL_SSL
source.sasl.mechanism = SCRAM-SHA-512
source.sasl.jaas.config = org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="mm2-user" \
    password="secret";

# Security for target cluster (if required)
target.security.protocol = SASL_SSL
target.sasl.mechanism = SCRAM-SHA-512
target.sasl.jaas.config = org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="mm2-user" \
    password="secret";
```
### Replication Flow Configuration

```properties
# Enable replication from source to target
source->target.enabled = true

# Topics to replicate (regex patterns)
source->target.topics = .*

# Topics to exclude from replication
source->target.topics.exclude = .*[\-\.]internal, .*\.replica, __.*

# Consumer groups to replicate offsets for
source->target.groups = .*

# Groups to exclude
source->target.groups.exclude = console-consumer-.*, connect-.*
```
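The topic and group filters are comma-separated Java regular expressions; a topic is excluded when it fully matches any pattern in the list. A quick illustration of how the exclude patterns above behave (plain Java, outside MM2, to show the matching semantics only):

```java
import java.util.List;
import java.util.regex.Pattern;

public class TopicFilterDemo {
    // A topic is excluded if it fully matches any of the exclude patterns.
    static boolean excluded(String topic, List<Pattern> excludes) {
        return excludes.stream().anyMatch(p -> p.matcher(topic).matches());
    }

    public static void main(String[] args) {
        List<Pattern> excludes = List.of(
            Pattern.compile(".*[\\-\\.]internal"),
            Pattern.compile(".*\\.replica"),
            Pattern.compile("__.*"));

        System.out.println(excluded("orders", excludes));             // false -> replicated
        System.out.println(excluded("orders.internal", excludes));    // true  -> skipped
        System.out.println(excluded("__consumer_offsets", excludes)); // true  -> skipped
    }
}
```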
### Topic Configuration

```properties
# Replication factor for replicated topics
replication.factor = 3

# Replication factor for MM2 internal topics
checkpoints.topic.replication.factor = 3
heartbeats.topic.replication.factor = 3
offset-syncs.topic.replication.factor = 3

# Topic creation settings
refresh.topics.enabled = true
refresh.topics.interval.seconds = 60

# Sync topic configurations
sync.topic.configs.enabled = true
sync.topic.configs.interval.seconds = 60

# Sync topic ACLs (requires ACL access)
sync.topic.acls.enabled = false
```
### Offset Synchronization

```properties
# Enable consumer group offset sync
sync.group.offsets.enabled = true
sync.group.offsets.interval.seconds = 60

# Emit checkpoints for offset translation
emit.checkpoints.enabled = true
emit.checkpoints.interval.seconds = 60

# Emit heartbeats
emit.heartbeats.enabled = true
emit.heartbeats.interval.seconds = 1
```
### Performance Tuning

```properties
# Number of tasks (parallelism)
tasks.max = 10

# Producer settings for replication
producer.buffer.memory = 67108864
producer.batch.size = 524288
producer.linger.ms = 100
producer.compression.type = lz4

# Consumer settings
consumer.fetch.min.bytes = 1048576
consumer.fetch.max.wait.ms = 500
consumer.max.poll.records = 1000

# Offset sync frequency
offset.lag.max = 100
```
### Naming Configuration

```properties
# Replication policy (controls topic naming)
replication.policy.class = org.apache.kafka.connect.mirror.DefaultReplicationPolicy

# Topic separator (default is ".")
replication.policy.separator = .

# Custom replication policy example:
# replication.policy.class = com.example.CustomReplicationPolicy
```
## Topic Naming

### Default Naming (DefaultReplicationPolicy)

Replicated topics are prefixed with the source cluster alias:

| Source Cluster | Source Topic | Target Topic |
|---|---|---|
| east | orders | east.orders |
| west | events | west.events |
| prod | user.activity | prod.user.activity |
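The renaming rule is simple enough to sketch. The helper below imitates (but is not) DefaultReplicationPolicy, assuming the default `.` separator:

```java
public class PrefixNaming {
    // Remote topic name: "<sourceClusterAlias>.<topic>" with the default separator.
    static String formatRemoteTopic(String sourceAlias, String topic) {
        return sourceAlias + "." + topic;
    }

    // Recover the source cluster alias from a replicated topic name, or null
    // if the topic looks local. Note: a local topic like "user.activity" would
    // be misread as coming from a cluster named "user" -- one reason to choose
    // cluster aliases that cannot collide with topic name segments.
    static String topicSource(String topic) {
        int dot = topic.indexOf('.');
        return dot > 0 ? topic.substring(0, dot) : null;
    }

    public static void main(String[] args) {
        System.out.println(formatRemoteTopic("east", "orders")); // east.orders
        System.out.println(topicSource("east.orders"));          // east
        System.out.println(topicSource("orders"));               // null (local)
    }
}
```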
### Identity Naming (IdentityReplicationPolicy)

Preserves original topic names (use with caution; requires careful loop prevention):

```properties
replication.policy.class = org.apache.kafka.connect.mirror.IdentityReplicationPolicy
```

| Source Topic | Target Topic |
|---|---|
| orders | orders |

**Loop Prevention Required:** IdentityReplicationPolicy requires explicit topic filtering to prevent replication loops in bidirectional setups.
### Custom Replication Policy

Implement custom naming logic:

```java
import org.apache.kafka.connect.mirror.ReplicationPolicy;

public class CustomReplicationPolicy implements ReplicationPolicy {

    // Custom naming: replicated-<source>-<topic>
    @Override
    public String formatRemoteTopic(String sourceClusterAlias, String topic) {
        return "replicated-" + sourceClusterAlias + "-" + topic;
    }

    // Note: this parsing assumes cluster aliases contain no "-".
    @Override
    public String topicSource(String topic) {
        if (topic.startsWith("replicated-")) {
            String[] parts = topic.split("-", 3);
            return parts.length >= 2 ? parts[1] : null;
        }
        return null;
    }

    @Override
    public String upstreamTopic(String topic) {
        if (topic.startsWith("replicated-")) {
            String[] parts = topic.split("-", 3);
            return parts.length >= 3 ? parts[2] : null;
        }
        return null;
    }

    @Override
    public boolean isInternalTopic(String topic) {
        return topic.endsWith(".internal") || topic.startsWith("__");
    }
}
```
## Offset Translation

### How Offset Translation Works

Source and target clusters have different offsets for the same logical data. MM2 maintains mappings between upstream and downstream offsets via checkpoints.

### Checkpoint Format

```json
{
  "consumer_group": "my-consumer-group",
  "topic": "orders",
  "partition": 0,
  "upstream_offset": 1000,
  "downstream_offset": 1000,
  "metadata": ""
}
```
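Conceptually, translation finds the latest checkpoint at or before the consumer's committed upstream offset and returns the corresponding downstream offset. A self-contained sketch of that lookup (illustrative only, not the actual RemoteClusterUtils implementation):

```java
import java.util.TreeMap;

public class OffsetTranslation {
    // Checkpoints for one consumer group / topic-partition:
    // upstream offset -> downstream offset.
    static Long translate(TreeMap<Long, Long> checkpoints, long committedUpstream) {
        // Latest checkpoint whose upstream offset is <= the committed offset.
        var entry = checkpoints.floorEntry(committedUpstream);
        return entry == null ? null : entry.getValue();
    }

    public static void main(String[] args) {
        TreeMap<Long, Long> checkpoints = new TreeMap<>();
        checkpoints.put(1000L, 980L);  // offsets diverge when e.g. the target
        checkpoints.put(2000L, 1975L); // topic started empty mid-retention

        System.out.println(translate(checkpoints, 2100L)); // 1975 (may rewind slightly)
        System.out.println(translate(checkpoints, 500L));  // null (no checkpoint yet)
    }
}
```

Because the translated offset comes from the most recent checkpoint rather than the exact position, a failed-over consumer may reprocess a small window of records; design consumers to be idempotent.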
### Translating Offsets During Failover

```shell
# View checkpoints
kafka-console-consumer.sh \
  --bootstrap-server target:9092 \
  --topic source.checkpoints.internal \
  --from-beginning \
  --property print.key=true \
  --property key.separator=": "
```

Use the `RemoteClusterUtils` API for programmatic translation:

```java
// Programmatic offset translation
Map<TopicPartition, OffsetAndMetadata> translatedOffsets =
    RemoteClusterUtils.translateOffsets(
        targetProperties,   // connection properties for the target cluster
        "source",           // alias of the source cluster
        consumerGroupId,
        Duration.ofSeconds(30)
    );

// Commit the translated offsets so the consumer resumes from the right place
consumer.commitSync(translatedOffsets);
```
## Deployment Procedures

### Initial Deployment

1. Create the MM2 configuration file:

   ```properties
   # mm2.properties
   clusters = source, target
   source.bootstrap.servers = source:9092
   target.bootstrap.servers = target:9092
   source->target.enabled = true
   source->target.topics = .*
   replication.factor = 3
   ```

2. Start MirrorMaker 2:

   ```shell
   # Dedicated mode
   connect-mirror-maker.sh mm2.properties

   # Or as Connect connectors
   curl -X POST -H "Content-Type: application/json" \
     --data @mirror-source-connector.json \
     http://connect:8083/connectors
   ```

3. Verify replication:

   ```shell
   # Check connector status
   curl http://connect:8083/connectors/MirrorSourceConnector/status

   # Verify topics created on target
   kafka-topics.sh --bootstrap-server target:9092 --list | grep "source\."

   # Check replication lag
   kafka-consumer-groups.sh --bootstrap-server target:9092 \
     --group connect-MirrorSourceConnector \
     --describe
   ```
### Adding Topics to Replication

Topics matching the configured patterns are automatically replicated when created. To replicate additional existing topics:

1. Update the topic pattern:

   ```properties
   source->target.topics = orders.*,events.*,new-topic-pattern.*
   ```

2. Restart or reconfigure MM2.

3. Verify that the new topics appear on the target.
### Removing Topics from Replication

1. Update the exclusion pattern:

   ```properties
   source->target.topics.exclude = .*internal,deprecated-topic
   ```

2. Restart or reconfigure MM2.

3. Optionally delete the replicated topic from the target:

   ```shell
   kafka-topics.sh --bootstrap-server target:9092 \
     --delete --topic source.deprecated-topic
   ```
## Monitoring

### Key Metrics

| Metric | Description | Alert Threshold |
|---|---|---|
| `kafka.connect.mirror.source.connector.record-count` | Records replicated | Sudden drops |
| `kafka.connect.mirror.source.connector.record-age-ms` | Age of last replicated record | > 60000 ms |
| `kafka.connect.mirror.source.connector.replication-latency-ms` | End-to-end replication latency | > 30000 ms |
| `kafka.connect.mirror.source.connector.byte-count` | Bytes replicated | Baseline deviation |
| `kafka.connect.mirror.checkpoint.connector.checkpoint-latency-ms` | Checkpoint sync delay | > 120000 ms |
### Health Checks

```shell
# Connector status
curl -s http://connect:8083/connectors/MirrorSourceConnector/status | jq .

# Task status
curl -s http://connect:8083/connectors/MirrorSourceConnector/tasks/0/status | jq .

# Replication lag
kafka-consumer-groups.sh --bootstrap-server target:9092 \
  --group connect-MirrorSourceConnector \
  --describe

# Heartbeat verification
kafka-console-consumer.sh --bootstrap-server target:9092 \
  --topic heartbeats \
  --from-beginning \
  --max-messages 5
```
### Lag Monitoring Script

```bash
#!/bin/bash
# monitor-mm2-lag.sh

CONNECT_HOST="localhost:8083"
TARGET_BOOTSTRAP="target:9092"

# Check connector status
STATUS=$(curl -s "http://${CONNECT_HOST}/connectors/MirrorSourceConnector/status" | jq -r '.connector.state')
echo "Connector status: ${STATUS}"

# Check consumer group lag (column 6 of the describe output is LAG)
kafka-consumer-groups.sh --bootstrap-server "${TARGET_BOOTSTRAP}" \
  --group connect-MirrorSourceConnector \
  --describe 2>/dev/null | \
  awk 'NR>1 {sum += $6} END {print "Total lag: " sum}'
```
## Failover Procedures

### Planned Failover

1. Stop producers to the source cluster.

2. Wait for replication to complete:

   ```shell
   # Check that lag is zero
   kafka-consumer-groups.sh --bootstrap-server target:9092 \
     --group connect-MirrorSourceConnector \
     --describe
   ```

3. Stop MirrorMaker 2.

4. Translate consumer offsets:

   ```shell
   # For each consumer group
   kafka-consumer-groups.sh --bootstrap-server target:9092 \
     --group my-app \
     --reset-offsets \
     --to-offset <translated-offset> \
     --topic source.my-topic \
     --execute
   ```

5. Redirect applications to the target cluster.

6. Start consumers on the target cluster.
### Unplanned Failover

1. Detect the source cluster failure.

2. Stop MirrorMaker 2 (prevents split-brain when the source recovers).

3. Assess data loss:

   ```shell
   # Compare the last checkpoint with the consumer position
   kafka-console-consumer.sh --bootstrap-server target:9092 \
     --topic source.checkpoints.internal \
     --from-beginning | tail -n 100
   ```

4. Translate offsets using the last known checkpoint.

5. Accept potential data loss (records between the last checkpoint and the failure).

6. Redirect applications.

7. Start consumers.
### Failback Procedure

1. Restore the source cluster.

2. Configure reverse replication (target -> source).

3. Replicate changes made during the outage.

4. Stop reverse replication.

5. Redirect applications back to the source.

6. Resume normal replication (source -> target).
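Step 2 amounts to enabling the opposite flow in the MM2 configuration. A minimal sketch, assuming the same `source`/`target` cluster aliases as the earlier examples (with DefaultReplicationPolicy, already-prefixed topics are not mirrored back):

```properties
# Temporary reverse flow for failback
target->source.enabled = true
target->source.topics = .*
sync.group.offsets.enabled = true
emit.checkpoints.enabled = true
```

Disable this flow again (step 4) once the source has caught up, before redirecting applications back.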
## Troubleshooting

### Common Issues
| Issue | Symptoms | Resolution |
|---|---|---|
| Connector not starting | FAILED state | Check logs, verify connectivity, check credentials |
| High replication lag | Increasing lag metric | Increase tasks.max, tune producer/consumer settings |
| Topics not replicating | Missing topics on target | Verify topic pattern matches, check ACLs |
| Offset sync failing | Checkpoint connector errors | Verify consumer groups exist, check permissions |
| Authentication failures | SASL errors in logs | Verify credentials, check JAAS config |
### Diagnostic Commands

```shell
# View connector logs
docker logs kafka-connect 2>&1 | grep -i mirror

# Check source cluster connectivity
kafka-broker-api-versions.sh --bootstrap-server source:9092

# Verify topic exists on source
kafka-topics.sh --bootstrap-server source:9092 --describe --topic my-topic

# Check target cluster ACLs
kafka-acls.sh --bootstrap-server target:9092 --list

# Test producer to target
echo "test" | kafka-console-producer.sh --bootstrap-server target:9092 --topic test-topic
```
### Performance Issues

```shell
# Check task distribution
curl -s http://connect:8083/connectors/MirrorSourceConnector/tasks | jq .

# Monitor network throughput
iftop -i eth0 -f "port 9092"

# Inspect a task's error trace (populated only when the task has failed)
curl -s http://connect:8083/connectors/MirrorSourceConnector/tasks/0/status | jq '.trace'
```
## Configuration Examples

### Active-Passive DR

```properties
clusters = primary, dr
primary.bootstrap.servers = primary-1:9092,primary-2:9092,primary-3:9092
dr.bootstrap.servers = dr-1:9092,dr-2:9092,dr-3:9092

primary->dr.enabled = true
primary->dr.topics = .*
primary->dr.topics.exclude = .*\.internal, __.*
primary->dr.groups = .*

sync.group.offsets.enabled = true
emit.checkpoints.enabled = true
replication.factor = 3
tasks.max = 10
```
### Active-Active Bidirectional

Active-active replication requires careful provenance configuration to prevent infinite replication loops and to let consumers distinguish data origins.

#### How Provenance Works

The DefaultReplicationPolicy handles provenance automatically by prefixing replicated topics with the source cluster alias.

**Loop Prevention:** MM2 never replicates prefixed topics. `east.orders` in the West cluster is not replicated back to East because it already has the `east.` prefix, indicating it originated from East.
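A sketch of the idea (illustrative, not MM2's actual code): before replicating a topic out of a cluster, check whether its name already carries a remote prefix for any known cluster alias:

```java
import java.util.Set;

public class LoopPrevention {
    // A topic is treated as remote (already replicated) if its name starts
    // with a known cluster alias followed by the "." separator.
    static boolean isRemote(String topic, Set<String> clusterAliases) {
        return clusterAliases.stream().anyMatch(alias -> topic.startsWith(alias + "."));
    }

    public static void main(String[] args) {
        Set<String> aliases = Set.of("east", "west");
        System.out.println(isRemote("orders", aliases));      // false -> eligible for replication
        System.out.println(isRemote("east.orders", aliases)); // true  -> never re-replicated
    }
}
```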
#### Configuration

```properties
clusters = east, west
east.bootstrap.servers = east-1:9092,east-2:9092,east-3:9092
west.bootstrap.servers = west-1:9092,west-2:9092,west-3:9092

# East to West
east->west.enabled = true
east->west.topics = orders, events, users

# West to East
west->east.enabled = true
west->east.topics = orders, events, users

# Use DefaultReplicationPolicy (default) - adds cluster prefix
replication.policy.class = org.apache.kafka.connect.mirror.DefaultReplicationPolicy

# Both directions
sync.group.offsets.enabled = true
emit.checkpoints.enabled = true
emit.heartbeats.enabled = true
replication.factor = 3
tasks.max = 6
```
#### Topic Naming Results

| Cluster | Local Topic | Replicated From | Resulting Topic |
|---|---|---|---|
| East | orders | - | orders |
| East | - | West | west.orders |
| West | orders | - | orders |
| West | - | East | east.orders |
#### Consumer Configuration for Global View

Consumers that need to see all data from both clusters must subscribe to both local and replicated topics:

```java
// Consumer in the East cluster wanting all orders globally
consumer.subscribe(Arrays.asList(
    "orders",      // Local East orders
    "west.orders"  // Replicated West orders
));

// Process with origin awareness
for (ConsumerRecord<String, Order> record : records) {
    String origin = record.topic().startsWith("west.") ? "west" : "east";
    processOrder(record.value(), origin);
}
```
#### Conflict Avoidance Strategies

| Strategy | Description | Configuration |
|---|---|---|
| Topic prefixing | Each DC writes to local topic, reads both | Default behavior |
| Key partitioning | Route specific keys to owning DC | Application-level routing |
| Last-write-wins | Accept all writes, latest timestamp wins | Application-level merge |

**Recommended Pattern:** Use DefaultReplicationPolicy (the default) with topic prefixing, and have consumers subscribe to both local and prefixed topics. This provides clear provenance with no additional configuration.
### Fan-Out (One to Many)

```properties
clusters = central, region-us, region-eu, region-apac
central.bootstrap.servers = central:9092
region-us.bootstrap.servers = us:9092
region-eu.bootstrap.servers = eu:9092
region-apac.bootstrap.servers = apac:9092

central->region-us.enabled = true
central->region-us.topics = global-.*

central->region-eu.enabled = true
central->region-eu.topics = global-.*

central->region-apac.enabled = true
central->region-apac.topics = global-.*

replication.factor = 3
```
### Aggregation (Many to One)

```properties
clusters = region-us, region-eu, region-apac, central
region-us.bootstrap.servers = us:9092
region-eu.bootstrap.servers = eu:9092
region-apac.bootstrap.servers = apac:9092
central.bootstrap.servers = central:9092

region-us->central.enabled = true
region-us->central.topics = events

region-eu->central.enabled = true
region-eu->central.topics = events

region-apac->central.enabled = true
region-apac->central.topics = events

# Central sees: region-us.events, region-eu.events, region-apac.events
replication.factor = 3
```
## Related Documentation
- Multi-Datacenter Concepts - Architecture patterns
- Kafka Connect - Connect framework
- Cluster Management - Operational procedures
- Monitoring - Metrics and alerting