Kafka Node.js Driver¶
KafkaJS is a modern Apache Kafka client for Node.js, written entirely in JavaScript with no native dependencies.
Installation¶
npm install kafkajs
Or with Yarn:
yarn add kafkajs
Client Setup¶
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092']
});
With Authentication¶
const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092'],
  ssl: true,
  sasl: {
    mechanism: 'plain',
    username: 'user',
    password: 'password'
  }
});
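KafkaJS also supports the SCRAM SASL mechanisms ('scram-sha-256' and 'scram-sha-512'); the configuration shape is the same, only the mechanism changes:

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092'],
  ssl: true,
  sasl: {
    mechanism: 'scram-sha-256', // or 'scram-sha-512'
    username: 'user',
    password: 'password'
  }
});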
Producer¶
Basic Producer¶
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092']
});

const producer = kafka.producer();

async function produce() {
  await producer.connect();

  await producer.send({
    topic: 'events',
    messages: [
      { key: 'key1', value: 'value1' },
      { key: 'key2', value: 'value2' }
    ]
  });

  await producer.disconnect();
}

produce().catch(console.error);
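Each message can also carry headers and target an explicit partition; a minimal sketch (the key, header, and partition values are illustrative):

await producer.send({
  topic: 'events',
  messages: [
    {
      key: 'user-123',
      value: JSON.stringify({ action: 'login' }),
      partition: 0,                           // optional: pin to a specific partition
      headers: { 'correlation-id': 'abc-1' }  // optional: values may be strings or Buffers
    }
  ]
});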
Reliable Producer¶
const producer = kafka.producer({
  idempotent: true,       // exactly-once writes per partition; requires acks: -1
  maxInFlightRequests: 1  // KafkaJS allows at most one in-flight request when idempotent
});

await producer.connect();

await producer.send({
  topic: 'events',
  acks: -1, // wait for all in-sync replicas to acknowledge
  messages: [{ value: 'message' }]
});

To use transactions, set a transactionalId as well; see the Transactional Producer section below.
Batch Producer¶
async function produceBatch(messages) {
  await producer.sendBatch({
    topicMessages: [
      {
        topic: 'events',
        messages: messages.map(m => ({ value: JSON.stringify(m) }))
      }
    ]
  });
}
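A hypothetical call site for the helper above, serializing two plain objects into one batch:

await producer.connect();

await produceBatch([
  { id: 1, action: 'created' },
  { id: 2, action: 'updated' }
]);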
Transactional Producer¶
const producer = kafka.producer({
  transactionalId: 'my-transactional-id',
  maxInFlightRequests: 1, // required alongside idempotence for transactional guarantees
  idempotent: true
});

await producer.connect();

const transaction = await producer.transaction();

try {
  await transaction.send({
    topic: 'topic1',
    messages: [{ value: 'message1' }]
  });
  await transaction.send({
    topic: 'topic2',
    messages: [{ value: 'message2' }]
  });

  await transaction.commit();
} catch (e) {
  await transaction.abort();
  throw e;
}
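In a consume-transform-produce pipeline, consumer offsets can be committed atomically with the produced messages via transaction.sendOffsets(). A sketch, where the group ID, topic names, and offset are placeholders:

const transaction = await producer.transaction();

try {
  await transaction.send({
    topic: 'output-topic',
    messages: [{ value: 'transformed' }]
  });

  // Commit the consumer's position as part of the same transaction
  await transaction.sendOffsets({
    consumerGroupId: 'my-group',
    topics: [{ topic: 'input-topic', partitions: [{ partition: 0, offset: '101' }] }]
  });

  await transaction.commit();
} catch (e) {
  await transaction.abort();
  throw e;
}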
Consumer¶
Basic Consumer¶
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092']
});

const consumer = kafka.consumer({ groupId: 'my-group' });

async function consume() {
  await consumer.connect();
  await consumer.subscribe({ topic: 'events', fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        partition,
        offset: message.offset,
        key: message.key?.toString(),
        value: message.value?.toString() // value can be null (tombstone messages)
      });
    }
  });
}

consume().catch(console.error);
Manual Commit¶
const consumer = kafka.consumer({ groupId: 'my-group' });

await consumer.connect();
await consumer.subscribe({ topic: 'events' });

await consumer.run({
  autoCommit: false, // autoCommit is an option of run(), not of kafka.consumer()
  eachMessage: async ({ topic, partition, message }) => {
    await processMessage(message);

    // Commit the offset of the *next* message to consume
    await consumer.commitOffsets([
      {
        topic,
        partition,
        offset: (parseInt(message.offset, 10) + 1).toString()
      }
    ]);
  }
});
Batch Processing¶
await consumer.run({
  eachBatch: async ({ batch, resolveOffset, heartbeat, commitOffsetsIfNecessary }) => {
    for (const message of batch.messages) {
      await processMessage(message);
      resolveOffset(message.offset); // mark this offset as processed
      await heartbeat();             // keep the group session alive during long batches
    }
    await commitOffsetsIfNecessary();
  }
});
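For full control over offset resolution, auto-resolution can be disabled and the isRunning/isStale helpers used to bail out cleanly on shutdown or rebalance; a sketch (processMessage is a placeholder):

await consumer.run({
  eachBatchAutoResolve: false, // offsets advance only via explicit resolveOffset() calls
  eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => {
    for (const message of batch.messages) {
      // Stop if the consumer is shutting down or the batch has been invalidated
      if (!isRunning() || isStale()) break;

      await processMessage(message);
      resolveOffset(message.offset);
      await heartbeat();
    }
  }
});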
Pause and Resume¶
// Pause consumption
consumer.pause([{ topic: 'events' }]);
// Resume consumption
consumer.resume([{ topic: 'events' }]);
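A common use of pause/resume is backpressure: pause the topic when a downstream dependency is overloaded and resume it later. A sketch, where TooManyRequestsError and its retryAfter field are hypothetical:

await consumer.run({
  eachMessage: async ({ topic, message }) => {
    try {
      await processMessage(message);
    } catch (e) {
      if (e instanceof TooManyRequestsError) {
        consumer.pause([{ topic }]);
        // Resume once the downstream service has had time to recover
        setTimeout(() => consumer.resume([{ topic }]), e.retryAfter * 1000);
      }
      throw e; // rethrow so the message is not treated as processed
    }
  }
});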
Admin Client¶
Create Topic¶
const admin = kafka.admin();

await admin.connect();

await admin.createTopics({
  topics: [
    {
      topic: 'my-topic',
      numPartitions: 3,
      replicationFactor: 3 // requires at least 3 brokers in the cluster
    }
  ]
});

await admin.disconnect();
List Topics¶
const admin = kafka.admin();

await admin.connect();

const topics = await admin.listTopics();
console.log('Topics:', topics);

const metadata = await admin.fetchTopicMetadata({ topics: ['my-topic'] });
console.log('Metadata:', metadata);

await admin.disconnect();
Delete Topic¶
await admin.deleteTopics({
  topics: ['my-topic']
});
Configuration Reference¶
Producer Options¶
| Option | Description | Default |
|---|---|---|
| `acks` | Acknowledgments: `-1` (all), `0` (none), `1` (leader only) | `-1` |
| `timeout` | Request timeout (ms) | `30000` |
| `idempotent` | Idempotent producer | `false` |
| `maxInFlightRequests` | Max in-flight requests (falsy means no limit) | `null` |
| `transactionalId` | Transactional ID | `null` |

Note that `acks` and `timeout` are passed to `send()`, while the remaining options are passed to `kafka.producer()`.
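Putting the producer options together, a sketch with illustrative values:

const producer = kafka.producer({
  idempotent: true,
  maxInFlightRequests: 1
});

await producer.send({
  topic: 'events',
  acks: -1,
  timeout: 30000,
  messages: [{ value: 'message' }]
});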
Consumer Options¶
| Option | Description | Default |
|---|---|---|
| `groupId` | Consumer group ID | required |
| `sessionTimeout` | Session timeout (ms) | `30000` |
| `heartbeatInterval` | Heartbeat interval (ms) | `3000` |
| `maxBytesPerPartition` | Max bytes fetched per partition | `1048576` |
| `autoCommit` | Commit offsets automatically | `true` |
| `autoCommitInterval` | Auto-commit interval (ms) | `null` |

`autoCommit` and `autoCommitInterval` are options of `consumer.run()`, not of `kafka.consumer()`; the rest are constructor options.
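Putting the consumer options together, a sketch with illustrative values:

const consumer = kafka.consumer({
  groupId: 'my-group',
  sessionTimeout: 30000,
  heartbeatInterval: 3000,
  maxBytesPerPartition: 1048576
});

await consumer.run({
  autoCommit: true,
  autoCommitInterval: 5000, // commit at most every 5 seconds
  eachMessage: async ({ message }) => {
    /* process message */
  }
});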
Error Handling¶
const { Kafka, logLevel } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092'],
  logLevel: logLevel.ERROR,
  retry: {
    initialRetryTime: 100, // ms before the first retry; grows with each attempt
    retries: 8
  }
});

const producer = kafka.producer();

producer.on(producer.events.DISCONNECT, () => {
  console.log('Producer disconnected');
});

try {
  await producer.send({
    topic: 'events',
    messages: [{ value: 'message' }]
  });
} catch (error) {
  if (error.type === 'UNKNOWN_TOPIC_OR_PARTITION') {
    console.error('Topic does not exist');
  } else if (error.type === 'REQUEST_TIMED_OUT') {
    console.error('Request timed out');
  } else {
    console.error('Error:', error);
  }
}
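KafkaJS also emits instrumentation events that help surface failures; a sketch that listens for consumer crashes (the exit policy is illustrative):

const consumer = kafka.consumer({ groupId: 'my-group' });

consumer.on(consumer.events.CRASH, ({ payload }) => {
  console.error('Consumer crashed:', payload.error.message);
  if (!payload.restart) {
    // KafkaJS only restarts the consumer for retriable errors
    process.exit(1);
  }
});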
Graceful Shutdown¶
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092']
});

const consumer = kafka.consumer({ groupId: 'my-group' });

const shutdown = async () => {
  console.log('Shutting down...');
  await consumer.disconnect();
  process.exit(0);
};

process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);

async function run() {
  await consumer.connect();
  await consumer.subscribe({ topic: 'events' });

  await consumer.run({
    eachMessage: async ({ message }) => {
      console.log(message.value.toString());
    }
  });
}

run().catch(console.error);
TypeScript Support¶
import { Kafka, Producer } from 'kafkajs';

interface Event {
  id: string;
  timestamp: number;
  data: string;
}

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092']
});

const producer: Producer = kafka.producer();

async function produceEvent(event: Event): Promise<void> {
  await producer.send({
    topic: 'events',
    messages: [{ value: JSON.stringify(event) }]
  });
}
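The consumer side can be typed the same way using the exported EachMessagePayload type; a sketch that reuses the Event interface above (the empty-object fallback is illustrative):

import { Consumer, EachMessagePayload } from 'kafkajs';

const consumer: Consumer = kafka.consumer({ groupId: 'my-group' });

async function consumeEvents(): Promise<void> {
  await consumer.connect();
  await consumer.subscribe({ topic: 'events', fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ message }: EachMessagePayload) => {
      const event: Event = JSON.parse(message.value?.toString() ?? '{}');
      console.log(event.id, event.timestamp);
    }
  });
}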
Compression¶
const { CompressionTypes } = require('kafkajs');

await producer.send({
  topic: 'events',
  compression: CompressionTypes.GZIP,
  messages: [{ value: 'message' }]
});
Available compression types (only GZIP ships with KafkaJS; the others require a third-party codec, as shown below):

- CompressionTypes.None
- CompressionTypes.GZIP
- CompressionTypes.Snappy
- CompressionTypes.LZ4
- CompressionTypes.ZSTD
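Third-party codecs are registered through the CompressionCodecs map; a sketch using the kafkajs-snappy package (installed separately):

const { CompressionTypes, CompressionCodecs } = require('kafkajs');
const SnappyCodec = require('kafkajs-snappy');

// Register the codec so CompressionTypes.Snappy can be used in send()
CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec;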
Related Documentation¶
- Drivers Overview - All client drivers
- Producer Guide - Producer patterns
- Consumer Guide - Consumer patterns
- Schema Registry - Schema management