
Kafka Node.js Client

The KafkaJS library is a pure JavaScript Apache Kafka client for Node.js that implements the Kafka wire protocol directly, including producers, consumers, transactions, and an admin API. This guide covers installation, configuration, and usage patterns for Node.js applications.


Client Information

Library: kafkajs
Repository: github.com/tulios/kafkajs
Documentation: kafka.js.org
Package: npm
Current Version: 2.2.x (as of 2025)
Maintainer: Tulio Ornelas and community
License: MIT
Base: Pure JavaScript (no native dependencies)

History

KafkaJS was created by Tulio Ornelas in 2018 as a pure JavaScript implementation of the Kafka protocol. Unlike librdkafka-based clients, KafkaJS requires no native compilation, making it ideal for serverless environments (AWS Lambda, Google Cloud Functions), containerized deployments, and situations where native compilation is problematic. It gained rapid adoption due to its modern async/await API and zero-dependency installation. KafkaJS implements the Kafka wire protocol directly in JavaScript, supporting transactions, idempotent producers, and most Kafka features.

Alternative: node-rdkafka

For maximum performance at the cost of native dependencies:

Library: node-rdkafka
Repository: github.com/Blizzard/node-rdkafka
Documentation: blizzard.github.io/node-rdkafka
Package: npm
Current Version: 3.6.x (as of 2025)
Maintainer: Blizzard Entertainment
License: MIT
Base: librdkafka (C library)

node-rdkafka was developed by Blizzard Entertainment and first released in 2016. It provides native Node.js bindings to librdkafka, offering higher throughput than KafkaJS at the cost of requiring native compilation. Best suited for high-throughput applications running in environments where native builds are manageable.

Client Comparison

Feature                   KafkaJS        node-rdkafka
Installation              npm install    Requires build tools
Performance               Good           Excellent
Serverless                Excellent      Difficult
Memory footprint          Moderate       Lower
Docker builds             Simple         Complex
Cooperative rebalancing   No             Yes

Version Compatibility

KafkaJS Version   Minimum Kafka Broker   Node.js
2.2.x             0.10.0+                14+
2.1.x             0.10.0+                14+
2.0.x             0.10.0+                14+


Installation

npm

npm install kafkajs

yarn

yarn add kafkajs

pnpm

pnpm add kafkajs

Alternative: node-rdkafka

For applications requiring maximum performance, node-rdkafka provides a librdkafka-based client:

npm install node-rdkafka

Note: node-rdkafka depends on native bindings to librdkafka, so installation requires a C/C++ build toolchain and can complicate Docker and serverless builds. KafkaJS is recommended for most use cases.
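
For comparison, a minimal node-rdkafka producer is sketched below. It uses node-rdkafka's event/callback API; the broker address and topic are placeholders, and librdkafka options use dotted names rather than camelCase.

const Kafka = require('node-rdkafka');

const producer = new Kafka.Producer({
  'metadata.broker.list': 'kafka:9092',
  'dr_cb': true  // request delivery reports
});

producer.connect();

producer.on('ready', () => {
  // Poll periodically so delivery reports and errors are emitted
  producer.setPollInterval(100);

  // produce(topic, partition, message, key, timestamp)
  producer.produce(
    'orders',
    null,  // let librdkafka choose the partition
    Buffer.from(JSON.stringify({ id: 123, amount: 99.99 })),
    'order-123',
    Date.now()
  );
});

producer.on('delivery-report', (err, report) => {
  if (err) console.error('Delivery failed:', err);
});

producer.on('event.error', (err) => {
  console.error('Producer error:', err);
});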


Producer

Basic Producer

const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'order-service',
  brokers: ['kafka:9092']
});

const producer = kafka.producer();

async function run() {
  await producer.connect();

  await producer.send({
    topic: 'orders',
    messages: [
      {
        key: 'order-123',
        value: JSON.stringify({ id: 123, amount: 99.99 })
      }
    ]
  });

  await producer.disconnect();
}

run().catch(console.error);

Production Producer Configuration

const kafka = new Kafka({
  clientId: 'order-service-producer',
  brokers: ['kafka-1:9092', 'kafka-2:9092', 'kafka-3:9092'],

  // Connection and request timeouts (ms)
  connectionTimeout: 30000,
  requestTimeout: 30000,

  // Retry configuration
  retry: {
    initialRetryTime: 100,
    retries: 8,
    maxRetryTime: 30000,
    multiplier: 2,
    factor: 0.2
  }
});

const producer = kafka.producer({
  // Durability
  allowAutoTopicCreation: false,
  transactionTimeout: 30000,

  // Idempotence (KafkaJS allows at most 1 in-flight request when idempotence is enabled)
  idempotent: true,
  maxInFlightRequests: 1,

  // Note: compression is not a constructor option in KafkaJS; it is passed per send() call (see below)

  // Producer-level retries (an idempotent producer should effectively never give up)
  retry: {
    maxRetryTime: 30000,
    initialRetryTime: 300,
    retries: 2147483647
  }
});

await producer.connect();
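
Send with Compression

In KafkaJS, compression is specified per send() call rather than in the producer constructor. GZIP support is built in; other codecs (LZ4, Snappy, ZSTD) must be registered through a community codec package before use. A minimal sketch:

const { CompressionTypes } = require('kafkajs');

await producer.send({
  topic: 'orders',
  compression: CompressionTypes.GZIP,
  messages: [
    { key: 'order-123', value: JSON.stringify({ id: 123, amount: 99.99 }) }
  ]
});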

Send with Headers

await producer.send({
  topic: 'orders',
  messages: [
    {
      key: 'order-123',
      value: JSON.stringify({ id: 123 }),
      headers: {
        'correlation-id': 'abc-123',
        'source': 'order-service'
      }
    }
  ]
});

Batch Send

await producer.sendBatch({
  topicMessages: [
    {
      topic: 'orders',
      messages: [
        { key: '1', value: JSON.stringify({ id: 1 }) },
        { key: '2', value: JSON.stringify({ id: 2 }) }
      ]
    },
    {
      topic: 'audit',
      messages: [
        { value: JSON.stringify({ event: 'order_created' }) }
      ]
    }
  ]
});

Custom Partitioner

const { Partitioners } = require('kafkajs');

const producer = kafka.producer({
  createPartitioner: () => {
    // Reuse KafkaJS's built-in (murmur2-based) partitioner as the fallback
    const defaultPartition = Partitioners.DefaultPartitioner();

    return ({ topic, partitionMetadata, message }) => {
      // Custom logic: route by region prefix
      if (message.key) {
        const key = message.key.toString();
        if (key.startsWith('US-')) return 0;
        if (key.startsWith('EU-')) return 1;
      }

      // Default: hash-based partitioning over the available partitions
      return defaultPartition({ topic, partitionMetadata, message });
    };
  }
});
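
Note that KafkaJS v2.0 changed its default partitioner, so the same key may map to a different partition than under v1.x (KafkaJS logs a partitioner warning about this). If partition stability for existing keys matters, the pre-2.0 behavior can be kept explicitly:

const { Partitioners } = require('kafkajs');

// Preserve the v1.x partitioning so existing keys keep landing on the same partitions
const producer = kafka.producer({
  createPartitioner: Partitioners.LegacyPartitioner
});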

Producer with Error Handling

class ProducerService {
  constructor(kafka) {
    this.producer = kafka.producer();
    this.connected = false;
  }

  async connect() {
    if (!this.connected) {
      await this.producer.connect();
      this.connected = true;
    }
  }

  async send(topic, messages) {
    try {
      await this.connect();

      const result = await this.producer.send({
        topic,
        messages,
        acks: -1,
        timeout: 30000
      });

      return result;
    } catch (error) {
      console.error('Send failed:', error);

      if (error.name === 'KafkaJSNumberOfRetriesExceeded') {
        // All retries exhausted
        throw new Error('Failed to send after retries');
      }

      if (error.type === 'TOPIC_AUTHORIZATION_FAILED') {
        throw new Error('Not authorized to write to topic');
      }

      throw error;
    }
  }

  async disconnect() {
    if (this.connected) {
      await this.producer.disconnect();
      this.connected = false;
    }
  }
}

Consumer

Basic Consumer

const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'order-service',
  brokers: ['kafka:9092']
});

const consumer = kafka.consumer({ groupId: 'order-processors' });

async function run() {
  await consumer.connect();
  await consumer.subscribe({ topic: 'orders', fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log({
        topic,
        partition,
        offset: message.offset,
        key: message.key?.toString(),
        value: message.value?.toString()
      });
    }
  });
}

run().catch(console.error);

Production Consumer Configuration

const consumer = kafka.consumer({
  groupId: 'order-processors',

  // Session management
  sessionTimeout: 45000,
  heartbeatInterval: 15000,

  // Rebalancing (KafkaJS ships only the round-robin assigner; cooperative
  // rebalancing is not supported). PartitionAssigners is exported by kafkajs.
  rebalanceTimeout: 60000,
  partitionAssigners: [PartitionAssigners.roundRobin],

  // Topic auto-creation
  allowAutoTopicCreation: false,

  // Fetch configuration
  minBytes: 1,
  maxBytes: 10485760,
  maxWaitTimeInMs: 5000,

  // Retry
  retry: {
    retries: 10,
    initialRetryTime: 300,
    maxRetryTime: 30000
  }
});

await consumer.connect();

Manual Offset Commit

await consumer.run({
  autoCommit: false,
  eachMessage: async ({ topic, partition, message }) => {
    await processMessage(message);

    // Commit after processing
    await consumer.commitOffsets([
      {
        topic,
        partition,
        offset: (parseInt(message.offset) + 1).toString()
      }
    ]);
  }
});

Batch Processing

await consumer.run({
  eachBatchAutoResolve: false,
  eachBatch: async ({
    batch,
    resolveOffset,
    heartbeat,
    isRunning,
    isStale
  }) => {
    const { topic, partition } = batch;

    for (const message of batch.messages) {
      if (!isRunning() || isStale()) break;

      await processMessage(message);

      // Resolve offset to commit
      resolveOffset(message.offset);

      // Send heartbeat to avoid rebalance
      await heartbeat();
    }
  }
});

Rebalance Listener

const consumer = kafka.consumer({
  groupId: 'order-processors'
});

consumer.on(consumer.events.GROUP_JOIN, ({ payload }) => {
  console.log('Joined group:', payload);
});

consumer.on(consumer.events.REBALANCING, ({ payload }) => {
  console.log('Rebalancing:', payload);
  // Prepare for partition reassignment
});

consumer.on(consumer.events.CONNECT, ({ payload }) => {
  console.log('Connected:', payload);
});

consumer.on(consumer.events.DISCONNECT, ({ payload }) => {
  console.log('Disconnected:', payload);
});

consumer.on(consumer.events.CRASH, ({ payload }) => {
  console.error('Consumer crashed:', payload.error);
  // Handle crash - may need to restart
});

Graceful Shutdown

const consumer = kafka.consumer({ groupId: 'order-processors' });

const signals = ['SIGTERM', 'SIGINT'];
const shutdown = async (exitCode = 0) => {
  console.log('Shutting down...');
  await consumer.disconnect();
  process.exit(exitCode);
};

signals.forEach(signal => {
  process.on(signal, () => shutdown(0));
});

async function run() {
  await consumer.connect();
  await consumer.subscribe({ topic: 'orders' });

  await consumer.run({
    eachMessage: async ({ message }) => {
      await processMessage(message);
    }
  });
}

run().catch(async (error) => {
  console.error('Error:', error);
  await shutdown(1);  // exit non-zero so the failure is visible to the orchestrator
});

Seek and Pause/Resume

// Seek to a specific offset (seek can only be called after consumer.run())
consumer.seek({
  topic: 'orders',
  partition: 0,
  offset: '1000'
});

// Seek to the beginning of a partition. KafkaJS requires an absolute,
// non-negative offset, so look up the partition's low watermark first.
const admin = kafka.admin();
await admin.connect();
const offsets = await admin.fetchTopicOffsets('orders');
const { low } = offsets.find(({ partition }) => partition === 0);
await admin.disconnect();

consumer.seek({
  topic: 'orders',
  partition: 0,
  offset: low
});

// Pause consumption
consumer.pause([
  { topic: 'orders', partitions: [0, 1] }
]);

// Resume consumption
consumer.resume([
  { topic: 'orders', partitions: [0, 1] }
]);
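
A common use of pause/resume is backpressure: when a downstream dependency is throttling or unavailable, pause the partition inside the handler and resume it after a delay. A sketch of the pattern (the throttling error name and retryAfter field are hypothetical placeholders for an application-specific check):

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    try {
      await processMessage(message);
    } catch (error) {
      if (error.name === 'TooManyRequestsError') {  // hypothetical downstream throttling error
        // Stop fetching from this partition while the downstream service recovers
        consumer.pause([{ topic, partitions: [partition] }]);

        // Resume later; the failed message is redelivered because we rethrow below
        setTimeout(() => {
          consumer.resume([{ topic, partitions: [partition] }]);
        }, error.retryAfter || 5000);
      }

      throw error;  // rethrow so the offset is not resolved/committed
    }
  }
});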

Admin Client

Create Topics

const kafka = new Kafka({
  clientId: 'admin',
  brokers: ['kafka:9092']
});

const admin = kafka.admin();

await admin.connect();

await admin.createTopics({
  topics: [
    {
      topic: 'orders',
      numPartitions: 6,
      replicationFactor: 3,
      configEntries: [
        { name: 'retention.ms', value: '604800000' },
        { name: 'cleanup.policy', value: 'delete' }
      ]
    }
  ],
  waitForLeaders: true,
  timeout: 30000
});

await admin.disconnect();

Describe Topics

const topics = await admin.fetchTopicMetadata({ topics: ['orders'] });

for (const topic of topics.topics) {
  console.log(`Topic: ${topic.name}`);
  for (const partition of topic.partitions) {
    console.log(`  Partition ${partition.partitionId}:`);
    console.log(`    Leader: ${partition.leader}`);
    console.log(`    Replicas: ${partition.replicas}`);
    console.log(`    ISR: ${partition.isr}`);
  }
}

List Topics

const topics = await admin.listTopics();
console.log('Topics:', topics);

Delete Topics

await admin.deleteTopics({
  topics: ['old-topic'],
  timeout: 30000
});

List Consumer Groups

const groups = await admin.listGroups();

for (const group of groups.groups) {
  console.log(`Group: ${group.groupId} (protocol: ${group.protocolType})`);
}

Describe Consumer Group

const descriptions = await admin.describeGroups(['order-processors']);

for (const group of descriptions.groups) {
  console.log(`Group: ${group.groupId}`);
  console.log(`State: ${group.state}`);
  console.log(`Protocol: ${group.protocol}`);

  for (const member of group.members) {
    console.log(`  Member: ${member.memberId}`);
    console.log(`    Client: ${member.clientId}`);
  }
}

Reset Consumer Group Offsets

await admin.resetOffsets({
  groupId: 'order-processors',
  topic: 'orders',
  earliest: true  // or specify offset: '1000'
});
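
Fetch Consumer Group Offsets

Consumer lag can be estimated by comparing the group's committed offsets with the topic's latest offsets. A sketch using fetchOffsets and fetchTopicOffsets (this assumes the KafkaJS 2.x fetchOffsets signature, which takes a topics array):

// Committed offsets for the group, grouped by topic
const [orders] = await admin.fetchOffsets({
  groupId: 'order-processors',
  topics: ['orders']
});

// Latest (high watermark) offsets per partition
const latest = await admin.fetchTopicOffsets('orders');

for (const { partition, offset } of orders.partitions) {
  const end = latest.find(p => p.partition === partition);
  const committed = offset === '-1' ? 0 : Number(offset);  // '-1' means nothing committed yet
  const lag = Number(end.high) - committed;
  console.log(`Partition ${partition}: committed=${offset}, end=${end.high}, lag=${lag}`);
}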

Transactions

Transactional Producer

const producer = kafka.producer({
  transactionalId: 'order-processor-txn',
  maxInFlightRequests: 1,
  idempotent: true
});

await producer.connect();

const transaction = await producer.transaction();

try {
  await transaction.send({
    topic: 'orders',
    messages: [{ key: '1', value: 'order data' }]
  });

  await transaction.send({
    topic: 'audit',
    messages: [{ value: 'order created' }]
  });

  await transaction.commit();
} catch (error) {
  await transaction.abort();
  throw error;
}

Consume-Transform-Produce

const consumer = kafka.consumer({
  groupId: 'processor',
  readUncommitted: false  // read committed (the KafkaJS default)
});

const producer = kafka.producer({
  transactionalId: 'processor-txn',
  maxInFlightRequests: 1,
  idempotent: true
});

await consumer.connect();
await producer.connect();
await consumer.subscribe({ topic: 'input' });

await consumer.run({
  // Disable auto-commit so offsets are only committed through the transaction
  autoCommit: false,
  eachBatch: async ({
    batch,
    resolveOffset,
    heartbeat
  }) => {
    const transaction = await producer.transaction();

    try {
      for (const message of batch.messages) {
        const transformed = transform(message.value.toString());

        await transaction.send({
          topic: 'output',
          messages: [{ key: message.key, value: transformed }]
        });

        resolveOffset(message.offset);
        await heartbeat();
      }

      // Commit consumer offsets in transaction
      await transaction.sendOffsets({
        consumerGroupId: 'processor',
        topics: [
          {
            topic: batch.topic,
            partitions: [
              {
                partition: batch.partition,
                offset: (parseInt(batch.lastOffset()) + 1).toString()
              }
            ]
          }
        ]
      });

      await transaction.commit();
    } catch (error) {
      await transaction.abort();
      throw error;
    }
  }
});

Error Handling

Producer Error Handling

const messages = [{ key: 'key', value: 'value' }];

try {
  await producer.send({
    topic: 'orders',
    messages
  });
} catch (error) {
  if (error.name === 'KafkaJSProtocolError') {
    // Protocol-level error
    if (error.type === 'TOPIC_AUTHORIZATION_FAILED') {
      console.error('Not authorized to write to topic');
    } else if (error.type === 'MESSAGE_TOO_LARGE') {
      console.error('Message exceeds max size');
      sendToDeadLetterQueue(messages);  // application-defined DLQ handler
    }
  } else if (error.name === 'KafkaJSNumberOfRetriesExceeded') {
    console.error('All retries exhausted');
  } else if (error.name === 'KafkaJSConnectionError') {
    console.error('Connection error:', error.message);
  } else {
    console.error('Unknown error:', error);
  }
}

Consumer Error Handling

await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    try {
      await processMessage(message);
    } catch (error) {
      console.error('Processing error:', error);

      // Send to dead letter queue
      await deadLetterProducer.send({
        topic: 'orders-dlq',
        messages: [
          {
            key: message.key,
            value: message.value,
            headers: {
              ...message.headers,
              'original-topic': topic,
              'original-partition': partition.toString(),
              'original-offset': message.offset,
              'error': error.message
            }
          }
        ]
      });
    }
  }
});

// Consumer-level error handling. KafkaJS restarts itself after retriable errors;
// payload.restart indicates whether an automatic restart will happen.
consumer.on(consumer.events.CRASH, async ({ payload: { error, restart } }) => {
  console.error('Consumer crashed:', error);

  if (!restart) {
    // Non-retriable crash: recreate the subscription and run loop manually
    await consumer.disconnect();
    await consumer.connect();
    await consumer.subscribe({ topic: 'orders' });
    await consumer.run({
      eachMessage: async ({ message }) => processMessage(message)
    });
  }
});

Retry with Exponential Backoff

async function sendWithRetry(producer, message, maxRetries = 3) {
  let lastError;

  for (let attempt = 0; attempt < maxRetries; attempt++) {
    try {
      return await producer.send(message);
    } catch (error) {
      lastError = error;

      if (error.type === 'TOPIC_AUTHORIZATION_FAILED') {
        // Don't retry authorization errors
        throw error;
      }

      const backoff = Math.min(1000 * Math.pow(2, attempt), 30000);
      await new Promise(resolve => setTimeout(resolve, backoff));
    }
  }

  throw lastError;
}

Testing

Mock Producer

class MockProducer {
  constructor() {
    this.messages = [];
    this.connected = false;
  }

  async connect() {
    this.connected = true;
  }

  async send({ topic, messages }) {
    if (!this.connected) {
      throw new Error('Not connected');
    }

    for (const message of messages) {
      this.messages.push({ topic, ...message });
    }

    return messages.map((_, index) => ({
      topicName: topic,
      partition: 0,
      offset: String(index)
    }));
  }

  async disconnect() {
    this.connected = false;
  }

  getMessages() {
    return this.messages;
  }

  clear() {
    this.messages = [];
  }
}

// Usage in tests
describe('OrderService', () => {
  it('should send order to Kafka', async () => {
    const mockProducer = new MockProducer();
    const service = new OrderService(mockProducer);

    await mockProducer.connect();
    await service.createOrder({ id: 123, amount: 99.99 });

    const messages = mockProducer.getMessages();
    expect(messages).toHaveLength(1);
    expect(messages[0].topic).toBe('orders');
    expect(JSON.parse(messages[0].value)).toEqual({ id: 123, amount: 99.99 });
  });
});

Integration Testing with Testcontainers

const { Kafka } = require('kafkajs');
// @testcontainers/kafka provides a preconfigured KafkaContainer that wires the
// advertised listeners to the dynamically mapped host port.
const { KafkaContainer } = require('@testcontainers/kafka');

describe('Kafka Integration', () => {
  let container;
  let kafka;

  beforeAll(async () => {
    container = await new KafkaContainer('confluentinc/cp-kafka:7.5.0').start();

    // 9093 is the container's external listener port; getMappedPort returns
    // the random host port it was published on
    const host = container.getHost();
    const port = container.getMappedPort(9093);

    kafka = new Kafka({
      clientId: 'test',
      brokers: [`${host}:${port}`]
    });
  }, 60000);

  afterAll(async () => {
    await container.stop();
  });

  it('should produce and consume messages', async () => {
    const producer = kafka.producer();
    const consumer = kafka.consumer({ groupId: 'test' });

    await producer.connect();
    await consumer.connect();

    await consumer.subscribe({ topic: 'test', fromBeginning: true });

    const messages = [];
    await consumer.run({
      eachMessage: async ({ message }) => {
        messages.push(message.value.toString());
      }
    });

    await producer.send({
      topic: 'test',
      messages: [{ value: 'test message' }]
    });

    // Wait for message
    await new Promise(resolve => setTimeout(resolve, 1000));

    expect(messages).toContain('test message');

    await producer.disconnect();
    await consumer.disconnect();
  });
});

Unit Testing with Jest

jest.mock('kafkajs');

const { Kafka } = require('kafkajs');
const OrderService = require('./order-service');

describe('OrderService', () => {
  let mockProducer;
  let mockSend;

  beforeEach(() => {
    mockSend = jest.fn().mockResolvedValue([
      { topicName: 'orders', partition: 0, offset: '0' }
    ]);

    mockProducer = {
      connect: jest.fn().mockResolvedValue(undefined),
      disconnect: jest.fn().mockResolvedValue(undefined),
      send: mockSend
    };

    Kafka.mockImplementation(() => ({
      producer: () => mockProducer
    }));
  });

  it('should send order message', async () => {
    const service = new OrderService();
    await service.connect();

    await service.createOrder({ id: 123, amount: 99.99 });

    expect(mockSend).toHaveBeenCalledWith({
      topic: 'orders',
      messages: [
        expect.objectContaining({
          key: '123',
          value: expect.any(String)
        })
      ]
    });
  });
});