Single Message Transforms (SMTs) modify records as they flow through Connect without custom code.
Overview

Field Manipulation
| Transform |
Description |
InsertField |
Add field with static or metadata value |
ReplaceField |
Include, exclude, or rename fields |
MaskField |
Replace field value with null |
ValueToKey |
Copy fields from value to key |
ExtractField |
Extract single field from struct |
Flatten |
Flatten nested structures |
Cast |
Cast field to different type |
Routing
| Transform |
Description |
RegexRouter |
Route to topic based on regex |
TimestampRouter |
Route based on timestamp |
SetSchemaMetadata |
Set schema name and version |
| Transform |
Description |
HeaderFrom |
Copy field to header |
InsertHeader |
Add static header |
DropHeaders |
Remove headers |
Filtering
| Transform |
Description |
Filter |
Drop records matching predicate |
Configuration
Basic Structure
{
"name": "my-connector",
"config": {
"connector.class": "...",
"transforms": "transform1,transform2",
"transforms.transform1.type": "...",
"transforms.transform1.field": "...",
"transforms.transform2.type": "...",
"transforms.transform2.regex": "..."
}
}
Key vs Value
Apply to key with $Key suffix, value with $Value:
{
"transforms": "addTimestamp",
"transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addTimestamp.timestamp.field": "processed_at"
}
Common Examples
Add Timestamp
{
"transforms": "addTimestamp",
"transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addTimestamp.timestamp.field": "ingested_at"
}
Route by Pattern
{
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "(.*)_raw",
"transforms.route.replacement": "$1_processed"
}
Drop Fields
{
"transforms": "dropFields",
"transforms.dropFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.dropFields.exclude": "password,ssn,credit_card"
}
Flatten Nested JSON
{
"transforms": "flatten",
"transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter": "_"
}
Before: {"user": {"name": "Alice", "age": 30}}
After: {"user_name": "Alice", "user_age": 30}
Cast Types
{
"transforms": "cast",
"transforms.cast.type": "org.apache.kafka.connect.transforms.Cast$Value",
"transforms.cast.spec": "price:float64,quantity:int32"
}
{
"name": "events-sink",
"config": {
"connector.class": "...",
"transforms": "addTimestamp,dropSensitive,route",
"transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.addTimestamp.timestamp.field": "processed_at",
"transforms.dropSensitive.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.dropSensitive.exclude": "password,token",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "events_(.*)",
"transforms.route.replacement": "processed_$1"
}
}
Predicates (Kafka 2.6+)
Apply transforms conditionally:
{
"transforms": "insertSource",
"transforms.insertSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertSource.static.field": "source",
"transforms.insertSource.static.value": "kafka",
"transforms.insertSource.predicate": "isOrder",
"transforms.insertSource.negate": "false",
"predicates": "isOrder",
"predicates.isOrder.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
"predicates.isOrder.pattern": "orders-.*"
}
Limitations
| Limitation |
Alternative |
| Complex logic |
Custom SMT or Kafka Streams |
| Stateful transforms |
Kafka Streams |
| Joins |
Kafka Streams |
| Aggregations |
Kafka Streams |