HTTP Source Connector
Poll REST APIs and stream responses to Kafka topics for integration with external services.
Overview
The HTTP Source Connector polls HTTP endpoints and writes responses to Kafka, supporting:
- Scheduled polling at configurable intervals
- Multiple authentication methods
- Pagination handling
- Response parsing and transformation
Installation

```bash
# Confluent Hub
confluent-hub install confluentinc/kafka-connect-http:latest

# Manual installation
curl -O https://packages.confluent.io/archive/7.5/kafka-connect-http-7.5.0.zip
unzip kafka-connect-http-7.5.0.zip -d /usr/share/kafka/plugins/
# The plugin directory must be listed in the worker's plugin.path; restart the worker afterwards.
```
Basic Configuration

```json
{
  "name": "http-source",
  "config": {
    "connector.class": "io.confluent.connect.http.HttpSourceConnector",
    "tasks.max": "1",
    "http.url": "https://api.example.com/events",
    "http.method": "GET",
    "kafka.topic": "api-events",
    "http.timer.interval.ms": "60000"
  }
}
```
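The configuration above is registered with a Kafka Connect worker through its REST API (`POST /connectors`). A minimal standard-library sketch, assuming the worker listens at `http://connect:8083` (the address used under Monitoring); `build_create_request` and `create_connector` are illustrative names:

```python
import json
import urllib.request

# Assumption: the Connect worker's REST API is reachable at this address.
CONNECT_URL = "http://connect:8083"

CONNECTOR = {
    "name": "http-source",
    "config": {
        "connector.class": "io.confluent.connect.http.HttpSourceConnector",
        "tasks.max": "1",
        "http.url": "https://api.example.com/events",
        "http.method": "GET",
        "kafka.topic": "api-events",
        "http.timer.interval.ms": "60000",
    },
}


def build_create_request(base_url: str, payload: dict) -> urllib.request.Request:
    """Build a POST /connectors request that registers a new connector."""
    return urllib.request.Request(
        url=f"{base_url}/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def create_connector(base_url: str = CONNECT_URL, payload: dict = CONNECTOR) -> dict:
    """Send the request and return the worker's JSON response."""
    with urllib.request.urlopen(build_create_request(base_url, payload)) as resp:
        return json.load(resp)
```

To change an existing connector, the REST API's `PUT /connectors/{name}/config` endpoint takes the inner `config` object on its own.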
HTTP Settings
Request Configuration

```json
{
  "http.url": "https://api.example.com/data",
  "http.method": "GET",
  "http.headers": "Accept: application/json, X-API-Version: v2"
}
```
| Property | Description | Default |
|---|---|---|
| `http.url` | Endpoint URL | Required |
| `http.method` | HTTP method | `GET` |
| `http.headers` | Request headers | None |
| `http.request.body` | Request body (POST/PUT) | None |
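`http.headers` packs several headers into one string. A sketch of how such a value can be split into individual headers, assuming entries are comma-separated and each one splits at its first colon (the connector's actual parsing rules are not stated here); `parse_headers` is a hypothetical helper:

```python
from typing import Dict


def parse_headers(spec: str) -> Dict[str, str]:
    """Split an http.headers-style string into a header dict.

    Assumption: entries are separated by commas and each entry splits at its
    first colon, so a header value that itself contains a comma would be cut.
    """
    headers: Dict[str, str] = {}
    for entry in spec.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate trailing commas
        name, sep, value = entry.partition(":")
        if not sep:
            raise ValueError(f"malformed header entry: {entry!r}")
        headers[name.strip()] = value.strip()
    return headers
```

Under this assumption, a value such as an `Accept` list with several comma-separated media types would be split incorrectly, so such values deserve a check against the connector's behavior.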
POST Request

```json
{
  "http.url": "https://api.example.com/query",
  "http.method": "POST",
  "http.headers": "Content-Type: application/json",
  "http.request.body": "{\"query\": \"SELECT * FROM events\"}"
}
```
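Because `http.request.body` is a JSON document embedded as a string inside the JSON connector config, its inner quotes must be escaped. Generating the config programmatically avoids hand-escaping; a short sketch:

```python
import json

# The request body as the API expects to receive it.
BODY = {"query": "SELECT * FROM events"}

CONFIG = {
    "http.url": "https://api.example.com/query",
    "http.method": "POST",
    "http.headers": "Content-Type: application/json",
    # Serialize the body to a string first; the outer json.dumps below then
    # escapes its quotes as \" automatically.
    "http.request.body": json.dumps(BODY),
}

print(json.dumps(CONFIG, indent=2))
```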
Authentication
API Key (Bearer Token)

```json
{
  "http.headers": "Authorization: Bearer ${secrets:api/token}"
}
```
Basic Authentication

```json
{
  "http.auth.type": "BASIC",
  "http.auth.user": "${secrets:api/username}",
  "http.auth.password": "${secrets:api/password}"
}
```
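HTTP Basic authentication sends the standard `Authorization` header defined in RFC 7617: `Basic` followed by the base64 encoding of `user:password`. Computing it by hand is handy for testing credentials against the API with `curl` before deploying the connector. A minimal sketch (`basic_auth_header` is an illustrative helper):

```python
import base64


def basic_auth_header(user: str, password: str) -> str:
    """Return the Authorization header value for HTTP Basic auth (RFC 7617)."""
    token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
    return f"Basic {token}"
```

For example, `basic_auth_header("alice", "secret")` returns `"Basic YWxpY2U6c2VjcmV0"`. Base64 is an encoding, not encryption, so Basic authentication belongs on HTTPS endpoints only.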
OAuth 2.0

```json
{
  "http.auth.type": "OAUTH2",
  "http.oauth2.token.url": "https://auth.example.com/oauth/token",
  "http.oauth2.client.id": "${secrets:oauth/client-id}",
  "http.oauth2.client.secret": "${secrets:oauth/client-secret}",
  "http.oauth2.scope": "read:events"
}
```
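A client ID, secret, token URL and scope usually indicate the OAuth 2.0 client credentials grant (RFC 6749, section 4.4): the client posts its credentials to the token URL and then sends the returned `access_token` as a `Bearer` header. Which grant and client-authentication method the connector actually uses is an assumption here. A sketch of the token request, with `build_token_request` as a hypothetical helper:

```python
import base64
import urllib.parse
import urllib.request


def build_token_request(token_url: str, client_id: str, client_secret: str,
                        scope: str) -> urllib.request.Request:
    """Build a client credentials token request (RFC 6749, section 4.4).

    Assumption: the token endpoint accepts HTTP Basic client authentication
    (RFC 6749, section 2.3.1); some providers expect client_id and
    client_secret as form fields instead.
    """
    form = urllib.parse.urlencode({"grant_type": "client_credentials", "scope": scope})
    # Section 2.3.1: form-encode the ID and secret before base64-encoding them.
    pair = f"{urllib.parse.quote_plus(client_id)}:{urllib.parse.quote_plus(client_secret)}"
    creds = base64.b64encode(pair.encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        token_url,
        data=form.encode("ascii"),
        headers={
            "Content-Type": "application/x-www-form-urlencoded",
            "Authorization": f"Basic {creds}",
        },
        method="POST",
    )
```

The JSON response's `access_token` then goes into an `Authorization: Bearer <token>` header until it expires.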
Polling Configuration
Fixed Interval

```json
{
  "http.timer.interval.ms": "60000"
}
```

Polls every 60 seconds.
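Cron Schedule

```json
{
  "http.timer.type": "cron",
  "http.timer.cron.expression": "0 */5 * * * ?"
}
```

Polls every 5 minutes, at second 0. The expression uses the Quartz six-field format (seconds, minutes, hours, day of month, month, day of week), in which `?` means "no specific value".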
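To see which instants `0 */5 * * * ?` matches, here is a small sketch that computes the next fire time for this one expression without a cron library. It is not a general cron parser; `next_five_minute_tick` is an illustrative name:

```python
from datetime import datetime, timedelta


def next_five_minute_tick(now: datetime) -> datetime:
    """Next instant strictly after `now` matching "0 */5 * * * ?".

    Handles only this expression (second 0 of minutes 0, 5, 10, ...);
    it is not a general cron parser.
    """
    base = now.replace(second=0, microsecond=0)     # start of the current minute
    minutes_ahead = 5 - base.minute % 5             # 1..5 minutes to the next multiple of 5
    return base + timedelta(minutes=minutes_ahead)
```

Because the schedule is tied to the clock rather than to when the connector started, polls land on boundaries such as 12:00, 12:05 and 12:10.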
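Offset Tracking
Track the last processed item so that each poll fetches only new or updated records:

```json
{
  "http.offset.mode": "TIMESTAMP",
  "http.offset.field": "updated_at",
  "http.initial.offset": "2024-01-01T00:00:00Z"
}
```

`http.initial.offset` sets the starting point for the first poll.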
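A sketch of the idea behind `TIMESTAMP` offset mode: keep only records whose `updated_at` is later than the stored offset, then advance the offset to the newest value seen. The strict `>` comparison, the ISO 8601 parsing and the helper name `poll_incremental` are assumptions, not the connector's documented behavior:

```python
from datetime import datetime


def _parse_ts(value: str) -> datetime:
    # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11.
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def poll_incremental(records: list, field: str, last_offset: str):
    """Return (records newer than last_offset, advanced offset)."""
    cutoff = _parse_ts(last_offset)
    fresh = [r for r in records if _parse_ts(r[field]) > cutoff]
    new_offset = max((r[field] for r in fresh), key=_parse_ts, default=last_offset)
    return fresh, new_offset


# Sample page: only ids 2 and 3 are newer than the initial offset.
RECORDS = [
    {"id": 1, "updated_at": "2023-12-31T23:59:59Z"},
    {"id": 2, "updated_at": "2024-01-02T08:00:00Z"},
    {"id": 3, "updated_at": "2024-01-01T12:00:00Z"},
]
```

A strict `>` can miss a record that shares the exact timestamp of the last offset but only appears in a later poll; `>=` avoids that at the cost of re-reading the boundary record.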
Response Handling
JSON Response

```json
{
  "http.response.format": "JSON",
  "http.response.records.path": "$.data[*]"
}
```

Extracts each element of the `data` array as a separate record.
Array Response

```json
{
  "http.response.format": "JSON",
  "http.response.records.path": "$[*]"
}
```

Treats each element of a top-level JSON array as a separate record.
Single Record

```json
{
  "http.response.format": "JSON",
  "http.response.records.path": "$"
}
```

Treats the entire response body as a single record.
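The three `http.response.records.path` values above can be illustrated with a tiny evaluator that supports only `$`, `$[*]` and `$.field[*]`. It is a teaching sketch, not the JSONPath engine the connector uses; `extract_records` is a hypothetical name:

```python
import json


def extract_records(body: str, path: str) -> list:
    """Apply a small JSONPath subset: "$", "$[*]" and "$.a.b[*]"."""
    doc = json.loads(body)
    if path == "$":
        return [doc]                      # whole response is one record
    if path == "$[*]":
        return list(doc)                  # each top-level array element
    if path.startswith("$.") and path.endswith("[*]"):
        node = doc
        for key in path[2:-3].split("."):
            node = node[key]              # walk nested object keys
        return list(node)                 # each element of the target array
    raise ValueError(f"unsupported path: {path}")
```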
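Pagination
Offset-Based

```json
{
  "http.pagination.type": "OFFSET",
  "http.pagination.offset.param": "offset",
  "http.pagination.limit.param": "limit",
  "http.pagination.limit.value": "100"
}
```

The first request is `https://api.example.com/events?offset=0&limit=100`; each subsequent request advances `offset` by the limit (`offset=100`, `offset=200`, and so on).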
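A sketch of the offset/limit loop, assuming the API signals the last page by returning fewer than `limit` records (APIs differ: some return a total count or an empty page instead). `paginate_offset` and the in-memory `fake_fetch` are illustrative:

```python
from urllib.parse import parse_qs, urlencode, urlsplit


def paginate_offset(fetch, base_url: str, limit: int = 100):
    """Yield records page by page, advancing offset by limit each request."""
    offset = 0
    while True:
        page = fetch(f"{base_url}?{urlencode({'offset': offset, 'limit': limit})}")
        yield from page
        if len(page) < limit:  # short page: assume nothing is left
            return
        offset += limit


def fake_fetch(url: str) -> list:
    """In-memory stand-in for the API: 250 events addressed by offset/limit."""
    query = parse_qs(urlsplit(url).query)
    offset, limit = int(query["offset"][0]), int(query["limit"][0])
    return list(range(250))[offset:offset + limit]
```

With 250 events and `limit=100`, this issues three requests (`offset=0`, `100` and `200`).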
Cursor-Based

```json
{
  "http.pagination.type": "CURSOR",
  "http.pagination.cursor.param": "cursor",
  "http.pagination.cursor.path": "$.next_cursor"
}
```

Reads the cursor from `$.next_cursor` in each response and sends it as the `cursor` parameter of the next request.
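A sketch of cursor pagination, assuming records sit under `data` (as in the `$.data[*]` example) and a null or missing `next_cursor` marks the last page. `paginate_cursor` and `PAGES` are illustrative:

```python
def paginate_cursor(fetch):
    """Yield records, following next_cursor until the API stops returning one.

    fetch(cursor) returns one decoded JSON page; a real client would send the
    value as the `cursor` parameter (omitted on the first request).
    """
    cursor = None
    while True:
        page = fetch(cursor)
        yield from page["data"]
        cursor = page.get("next_cursor")
        if not cursor:  # null or missing: last page
            return


# In-memory stand-in for three pages of an API response.
PAGES = {
    None: {"data": [1, 2], "next_cursor": "c1"},
    "c1": {"data": [3, 4], "next_cursor": "c2"},
    "c2": {"data": [5], "next_cursor": None},
}
```

Unlike offset pagination, the client never computes the next position itself; it only echoes back what the server returned, which stays consistent even when records are inserted between requests.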
Link Header

```json
{
  "http.pagination.type": "LINK_HEADER"
}
```

Follows the `rel="next"` URL in the `Link` response header (RFC 8288), as used by the GitHub API.
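A `Link` header carries one or more `<url>; rel="..."` entries, and pagination follows the `next` relation until it disappears on the last page. A simplified parser sketch, assuming `rel` is the first parameter of each entry (true of GitHub's headers, but not required by RFC 8288); `next_link` and the `SAMPLE` header are illustrative:

```python
import re
from typing import Optional

# One `<url>; rel="name"` entry; assumes rel directly follows the URL.
LINK_ENTRY = re.compile(r'<([^>]*)>\s*;\s*rel="([^"]*)"')


def next_link(header: Optional[str]) -> Optional[str]:
    """Return the rel="next" URL from a Link header, or None on the last page."""
    if not header:
        return None
    for url, rels in LINK_ENTRY.findall(header):
        if "next" in rels.split():  # rel may list several space-separated values
            return url
    return None


# Illustrative header in GitHub's style.
SAMPLE = ('<https://api.github.com/repos/apache/kafka/events?page=2>; rel="next", '
          '<https://api.github.com/repos/apache/kafka/events?page=3>; rel="last"')
```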
Error Handling
Retry Configuration

```json
{
  "http.retry.max.attempts": "5",
  "http.retry.backoff.ms": "1000",
  "http.retry.backoff.max.ms": "60000"
}
```
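One common reading of these three settings is exponential backoff: the delay starts at `http.retry.backoff.ms`, doubles on each retry, and is capped at `http.retry.backoff.max.ms`. The doubling factor and the absence of jitter are assumptions; `backoff_schedule` is an illustrative helper:

```python
def backoff_schedule(max_attempts: int, base_ms: int, max_ms: int) -> list:
    """Delay in ms before each retry: base_ms doubled per attempt, capped at max_ms.

    Assumption: plain doubling without jitter; the connector's formula may differ.
    """
    return [min(base_ms * 2 ** attempt, max_ms) for attempt in range(max_attempts)]
```

Under this model the configured values give delays of 1, 2, 4, 8 and 16 seconds, so the 60-second cap is never reached within five attempts.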
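Error Responses

```json
{
  "http.response.error.codes": "400,401,403,404,500,502,503",
  "behavior.on.error": "fail"
}
```

Responses with one of the listed status codes are treated as errors and handled according to `behavior.on.error`: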
| Behavior | Description |
|---|---|
| `fail` | Stop the connector |
| `log` | Log the error and continue |
| `ignore` | Skip the response silently |
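A sketch of how the three behaviors differ for a single response, using the status codes configured above. `handle_response` is a hypothetical model, not the connector's code:

```python
import logging

# Status codes from http.response.error.codes above.
ERROR_CODES = {400, 401, 403, 404, 500, 502, 503}


def handle_response(status: int, body: str, behavior: str = "fail"):
    """Return the body to process, or None if the response is skipped."""
    if status not in ERROR_CODES:
        return body
    if behavior == "fail":
        raise RuntimeError(f"HTTP {status}: stopping the connector")
    if behavior == "log":
        logging.warning("HTTP %s, skipping response: %.200s", status, body)
    elif behavior != "ignore":
        raise ValueError(f"unknown behavior.on.error value: {behavior!r}")
    return None  # both "log" and "ignore" skip the response
```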
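SSL/TLS

```json
{
  "http.ssl.enabled": "true",
  "http.ssl.truststore.location": "/path/to/truststore.jks",
  "http.ssl.truststore.password": "${secrets:ssl/truststore-password}",
  "http.ssl.keystore.location": "/path/to/keystore.jks",
  "http.ssl.keystore.password": "${secrets:ssl/keystore-password}"
}
```

The truststore verifies the API server's certificate; the keystore is needed only when the API requires client certificate (mutual TLS) authentication.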
Complete Example

```json
{
  "name": "github-events-source",
  "config": {
    "connector.class": "io.confluent.connect.http.HttpSourceConnector",
    "tasks.max": "1",
    "http.url": "https://api.github.com/repos/apache/kafka/events",
    "http.method": "GET",
    "http.headers": "Accept: application/vnd.github+json, Authorization: Bearer ${secrets:github/token}",
    "kafka.topic": "github-events",
    "http.timer.interval.ms": "300000",
    "http.response.format": "JSON",
    "http.response.records.path": "$[*]",
    "http.pagination.type": "LINK_HEADER",
    "http.retry.max.attempts": "3",
    "http.retry.backoff.ms": "5000",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "errors.tolerance": "all",
    "errors.log.enable": "true"
  }
}
```
Transforms
Add Metadata

```json
{
  "transforms": "addSource,addTimestamp",
  "transforms.addSource.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.addSource.static.field": "source",
  "transforms.addSource.static.value": "github-api",
  "transforms.addTimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.addTimestamp.timestamp.field": "ingested_at"
}
```
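For schemaless JSON values (as in the complete example, with `value.converter.schemas.enable` set to `false`), these two transforms add a constant `source` field and copy the Kafka record timestamp into `ingested_at`. A Python model of that effect, with `add_metadata` as an illustrative name. Note that `timestamp.field` holds the record's timestamp (epoch milliseconds here), which reflects ingestion time only if the connector stamps records when it polls, and the field is omitted when the record has no timestamp:

```python
from typing import Optional


def add_metadata(value: dict, record_timestamp_ms: Optional[int]) -> dict:
    """Model of addSource + addTimestamp (InsertField$Value) on a schemaless value."""
    updated = dict(value)  # copy; the original value is left unchanged
    if record_timestamp_ms is not None:
        # timestamp.field: the Kafka record timestamp, skipped when it is null
        updated["ingested_at"] = record_timestamp_ms
    # static.field / static.value
    updated["source"] = "github-api"
    return updated
```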
Extract Key

```json
{
  "transforms": "extractKey",
  "transforms.extractKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
  "transforms.extractKey.fields": "id"
}
```
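`ValueToKey` builds the key as a map (or struct) of the listed fields, so the key is `{"id": ...}` rather than the bare ID. When a plain key is needed, it is commonly chained with `org.apache.kafka.connect.transforms.ExtractField$Key` with `field` set to `id`. A Python model of both steps on a schemaless value; the helper names are illustrative:

```python
def value_to_key(value: dict, fields: list) -> dict:
    """Model of ValueToKey: the new key maps each listed field to its value."""
    return {name: value.get(name) for name in fields}  # missing fields become None


def extract_field(key: dict, field: str):
    """Model of ExtractField$Key: replace the map key with one field's value."""
    return key.get(field)
```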
Rate Limiting
Throttling

```json
{
  "http.timer.interval.ms": "60000",
  "http.throttle.max.requests": "100",
  "http.throttle.window.ms": "60000"
}
```

Limits the connector to 100 requests per 60-second window.
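A sketch of one way to enforce 100 requests per 60-second window: a sliding-window limiter that remembers recent request times. The connector may use a fixed window or a token bucket instead; `SlidingWindowThrottle` is illustrative, and times are passed in explicitly so the logic is deterministic:

```python
from collections import deque


class SlidingWindowThrottle:
    """Allow at most max_requests within any window_ms interval."""

    def __init__(self, max_requests: int, window_ms: int):
        self.max_requests = max_requests
        self.window_ms = window_ms
        self.sent = deque()  # timestamps (ms) of requests still inside the window

    def try_acquire(self, now_ms: int) -> bool:
        # Forget requests that have slid out of the window.
        while self.sent and now_ms - self.sent[0] >= self.window_ms:
            self.sent.popleft()
        if len(self.sent) < self.max_requests:
            self.sent.append(now_ms)
            return True
        return False


THROTTLE = SlidingWindowThrottle(max_requests=100, window_ms=60_000)
```

With a 60-second poll interval, a limit like this mainly comes into play when pagination turns one poll into many requests.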
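Respect Rate Limit Headers

```json
{
  "http.rate.limit.header": "X-RateLimit-Remaining",
  "http.rate.limit.reset.header": "X-RateLimit-Reset"
}
```

Names the response headers that report the remaining request quota and its reset time, so polling can pause until the quota resets.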
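A sketch of the pause calculation these headers allow, assuming `X-RateLimit-Reset` is a Unix timestamp in seconds (GitHub's convention; some APIs send seconds-until-reset instead). `seconds_until_allowed` is an illustrative helper:

```python
import time
from typing import Mapping, Optional


def seconds_until_allowed(headers: Mapping[str, str], now: Optional[float] = None) -> float:
    """How long to pause before the next request, from rate-limit headers.

    Pass a case-insensitive mapping (e.g. http.client.HTTPMessage) for real
    responses; a plain dict, as in the tests, is case-sensitive.
    """
    now = time.time() if now is None else now
    remaining = headers.get("X-RateLimit-Remaining")
    reset = headers.get("X-RateLimit-Reset")
    if remaining is None or reset is None or int(remaining) > 0:
        return 0.0  # quota left, or the API does not report one
    return max(0.0, float(reset) - now)
```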
Monitoring
Connector Status

```bash
curl http://connect:8083/connectors/http-source/status
```
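The status endpoint returns JSON with a `connector` object and a `tasks` list, each carrying a `state` such as `RUNNING`, `PAUSED` or `FAILED`. A sketch that fetches it and reports anything not running; the helper names are illustrative and the sample response is abbreviated:

```python
import json
import urllib.request


def fetch_status(base_url: str, name: str) -> dict:
    """GET /connectors/{name}/status from the Kafka Connect REST API."""
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status") as resp:
        return json.load(resp)


def not_running(status: dict) -> list:
    """Return (component, state) pairs for the connector or tasks not RUNNING."""
    problems = []
    if status["connector"]["state"] != "RUNNING":
        problems.append(("connector", status["connector"]["state"]))
    for task in status.get("tasks", []):
        if task["state"] != "RUNNING":
            problems.append((f"task-{task['id']}", task["state"]))
    return problems


# Abbreviated example of a response with one failed task.
SAMPLE_STATUS = {
    "name": "http-source",
    "connector": {"state": "RUNNING", "worker_id": "connect:8083"},
    "tasks": [{"id": 0, "state": "FAILED", "worker_id": "connect:8083"}],
    "type": "source",
}
```

A failed task's entry also carries a `trace` field with the stack trace, and `POST /connectors/http-source/tasks/0/restart` restarts that task.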
Metrics

| Metric | Description |
|---|---|
| `http-source-requests-total` | Total HTTP requests |
| `http-source-errors-total` | Failed requests |
| `http-source-records-polled` | Records retrieved |
Troubleshooting

| Issue | Cause | Solution |
|---|---|---|
| Connection timeout | Network or firewall blocking the endpoint | Verify the endpoint is reachable from the Connect workers |
| 401 Unauthorized | Invalid or expired credentials | Check the authentication configuration |
| 429 Too Many Requests | Rate limited by the API | Increase the polling interval or configure throttling |
| Empty response | Wrong JSON path | Verify `http.response.records.path` |
| Duplicate records | No offset tracking | Configure `http.offset.mode` |
Related Documentation
- Connectors Overview - All connectors
- Kafka Connect - Connect framework
- Transforms - Single Message Transforms