Skip to content

Kafka Node.js Driver

KafkaJS is a modern Apache Kafka client for Node.js, written entirely in JavaScript with no native dependencies.


Installation

npm install kafkajs

Or with Yarn:

yarn add kafkajs

Client Setup

const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092']
});

With Authentication

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092'],
  ssl: true,
  sasl: {
    mechanism: 'plain',
    username: 'user',
    password: 'password'
  }
});

Producer

Basic Producer

const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092']
});

const producer = kafka.producer();

async function produce() {
  await producer.connect();

  await producer.send({
    topic: 'events',
    messages: [
      { key: 'key1', value: 'value1' },
      { key: 'key2', value: 'value2' }
    ]
  });

  await producer.disconnect();
}

produce().catch(console.error);

Reliable Producer

// Idempotent producer: retried writes are de-duplicated by the broker.
// No transactionalId is set here: a producer configured with one is expected
// to send through producer.transaction() (see "Transactional Producer"),
// not through a plain producer.send().
const producer = kafka.producer({
  idempotent: true,
  // KafkaJS documents maxInFlightRequests: 1 together with idempotent: true
  // for its exactly-once guarantees.
  maxInFlightRequests: 1
});

await producer.connect();

await producer.send({
  topic: 'events',
  acks: -1,  // all in-sync replicas; required when idempotent is true
  messages: [{ value: 'message' }]
});

Batch Producer

async function produceBatch(messages) {
  await producer.sendBatch({
    topicMessages: [
      {
        topic: 'events',
        messages: messages.map(m => ({ value: JSON.stringify(m) }))
      }
    ]
  });
}

Transactional Producer

// Transactional producer: messages sent through `transaction` become visible
// to read-committed consumers only after commit(); abort() discards them.
const producer = kafka.producer({
  transactionalId: 'my-transactional-id',
  idempotent: true,
  // KafkaJS documents maxInFlightRequests: 1 together with idempotent: true
  // for its exactly-once guarantees.
  maxInFlightRequests: 1
});

await producer.connect();

const transaction = await producer.transaction();

try {
  await transaction.send({
    topic: 'topic1',
    messages: [{ value: 'message1' }]
  });

  await transaction.send({
    topic: 'topic2',
    messages: [{ value: 'message2' }]
  });

  await transaction.commit();
} catch (e) {
  // Roll back both sends, then surface the original failure to the caller.
  await transaction.abort();
  throw e;
}

Consumer

Basic Consumer

const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092']
});

const consumer = kafka.consumer({ groupId: 'my-group' });

async function consume() {
  await consumer.connect();
  await consumer.subscribe({ topic: 'events', fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        partition,
        offset: message.offset,
        key: message.key?.toString(),
        value: message.value.toString()
      });
    }
  });
}

consume().catch(console.error);

Manual Commit

const consumer = kafka.consumer({ groupId: 'my-group' });

await consumer.connect();
await consumer.subscribe({ topic: 'events' });

await consumer.run({
  // autoCommit is an option of consumer.run(), not of kafka.consumer();
  // it must be disabled here for the manual commits below to be the only ones.
  autoCommit: false,
  eachMessage: async ({ topic, partition, message }) => {
    await processMessage(message);

    // Commit the offset of the NEXT message to read. Offsets are 64-bit
    // values carried as strings, so use BigInt to avoid precision loss.
    await consumer.commitOffsets([
      {
        topic,
        partition,
        offset: (BigInt(message.offset) + 1n).toString()
      }
    ]);
  }
});

Batch Processing

await consumer.run({
  // Processes a fetched batch message by message, marking each one resolved
  // and sending a heartbeat so the session stays alive during long batches.
  eachBatch: async ({ batch, resolveOffset, heartbeat, commitOffsetsIfNecessary, isRunning, isStale }) => {
    for (const message of batch.messages) {
      // Stop early if the consumer is shutting down or this batch is stale.
      if (!isRunning() || isStale()) break;

      await processMessage(message);
      resolveOffset(message.offset);
      await heartbeat();
    }

    await commitOffsetsIfNecessary();
  }
});

Pause and Resume

// Pause consumption
consumer.pause([{ topic: 'events' }]);

// Resume consumption
consumer.resume([{ topic: 'events' }]);

Admin Client

Create Topic

const admin = kafka.admin();

await admin.connect();

try {
  // Creates 'my-topic' with 3 partitions and a replication factor of 3.
  await admin.createTopics({
    topics: [
      {
        topic: 'my-topic',
        numPartitions: 3,
        replicationFactor: 3
      }
    ]
  });
} finally {
  // Release the admin connection even when createTopics() rejects.
  await admin.disconnect();
}

List Topics

const admin = kafka.admin();
await admin.connect();

try {
  // Log the topic list, then the metadata fetched for 'my-topic'.
  const topics = await admin.listTopics();
  console.log('Topics:', topics);

  const metadata = await admin.fetchTopicMetadata({ topics: ['my-topic'] });
  console.log('Metadata:', metadata);
} finally {
  // Release the admin connection even when one of the lookups rejects.
  await admin.disconnect();
}

Delete Topic

await admin.deleteTopics({
  topics: ['my-topic']
});

Configuration Reference

Producer Options

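| Option | Description | Default |
| --- | --- | --- |
| acks | Required acknowledgments (-1 = all in-sync replicas, 0 = none, 1 = leader only); passed to producer.send() | -1 |
| timeout | Request timeout in ms; passed to producer.send() | 30000 |
| idempotent | Enable the idempotent producer (requires acks: -1) | false |
| maxInFlightRequests | Maximum number of in-flight requests | null (no limit) |
| transactionalId | Transactional ID; required to use producer.transaction() | undefined |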

Consumer Options

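| Option | Description | Default |
| --- | --- | --- |
| groupId | Consumer group ID | Required |
| sessionTimeout | Session timeout in ms | 30000 |
| heartbeatInterval | Heartbeat interval in ms | 3000 |
| maxBytesPerPartition | Maximum bytes fetched per partition | 1048576 |
| autoCommit | Commit offsets automatically; passed to consumer.run() | true |
| autoCommitInterval | Auto-commit interval in ms; passed to consumer.run() | null |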

Error Handling

const { Kafka, logLevel } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092'],
  logLevel: logLevel.ERROR,
  retry: {
    initialRetryTime: 100,
    retries: 8
  }
});

const producer = kafka.producer();

producer.on('producer.disconnect', () => {
  console.log('Producer disconnected');
});

// The producer must be connected before send() is called.
await producer.connect();

try {
  await producer.send({
    topic: 'events',
    messages: [{ value: 'message' }]
  });
} catch (error) {
  // Branch on the error's `type` field to tell the two handled failure
  // kinds apart; anything else is logged as-is.
  if (error.type === 'UNKNOWN_TOPIC_OR_PARTITION') {
    console.error('Topic does not exist');
  } else if (error.type === 'REQUEST_TIMED_OUT') {
    console.error('Request timed out');
  } else {
    console.error('Error:', error);
  }
}

Graceful Shutdown

const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092']
});

const consumer = kafka.consumer({ groupId: 'my-group' });

const shutdown = async () => {
  console.log('Shutting down...');
  await consumer.disconnect();
  process.exit(0);
};

process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);

async function run() {
  await consumer.connect();
  await consumer.subscribe({ topic: 'events' });

  await consumer.run({
    eachMessage: async ({ message }) => {
      console.log(message.value.toString());
    }
  });
}

run().catch(console.error);

TypeScript Support

import { Kafka, Producer, Consumer, Message } from 'kafkajs';

interface Event {
  id: string;
  timestamp: number;
  data: string;
}

const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['localhost:9092']
});

const producer: Producer = kafka.producer();

/**
 * Publishes a single event to the 'events' topic.
 *
 * @param event - The event to send; it is JSON-serialized into the message value.
 * @returns A promise that settles once `producer.send` has settled.
 */
async function produceEvent(event: Event): Promise<void> {
  const payload = JSON.stringify(event);
  const record = { topic: 'events', messages: [{ value: payload }] };
  await producer.send(record);
}

Compression

const { CompressionTypes } = require('kafkajs');

await producer.send({
  topic: 'events',
  compression: CompressionTypes.GZIP,
  messages: [{ value: 'message' }]
});

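Available compression types:

- CompressionTypes.None
- CompressionTypes.GZIP
- CompressionTypes.Snappy
- CompressionTypes.LZ4
- CompressionTypes.ZSTD

Only GZIP is built into KafkaJS; Snappy, LZ4, and ZSTD each require registering an external codec package before use.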