Skip to main content

Command Palette

Search for a command to run...

nestjs with kafkajs Producer and Consumer

Updated
4 min read
nestjs with kafkajs Producer and Consumer
T

I'm a full-stack software developer creating open-source projects and writing about modern JavaScript client-side and server-side. Working remotely from India.

NestJS with Kafkajs: A Powerful Combination for Building Scalable Applications

https://github.com/tkssharma/nestjs-kafka-monorepo

Introduction

Kafka is a distributed streaming platform that is widely used for real-time data processing and messaging. NestJS, on the other hand, is a progressive Node.js framework for building scalable and efficient applications. Combining Kafka with NestJS can create a powerful and scalable solution for various use cases.

Integrating Kafka with NestJS

To integrate Kafka with NestJS, we can use the @nestjs/microservices package. This package provides a convenient way to create microservices that communicate using different protocols, including Kafka.

Creating a Kafka Producer

To create a Kafka producer in NestJS, we can use the @nestjs/microservices module and the KafkaOptions interface. Here's an example:

import { KafkaOptions, Transport } from '@nestjs/microservices';

@Injectable()
export class KafkaProducerService {
  private readonly kafkaClient: ClientKafka;

  constructor() {
    this.kafkaClient = new ClientKafka({
      transport: Transport.KAFKA,
      options: {
        client: {
          brokers: ['localhost:9092'],
        },
      },
    });
  }

  async send(message: any) {
    await this.kafkaClient.emit('topic-name', message);
  }
}

Creating a Kafka Consumer

To create a Kafka consumer in NestJS, we can use the @nestjs/microservices module and the KafkaOptions interface. Here's an example:

import { KafkaOptions, Transport } from '@nestjs/microservices';

@Injectable()
export class KafkaConsumerService {
  private readonly kafkaClient: ClientKafka;

  constructor() {
    this.kafkaClient = new ClientKafka({
      transport: Transport.KAFKA,
      options: {
        client: {
          brokers: ['localhost:9092'],
        },
      },
    });
  }

  @EventPattern('topic-name')
  async handleEvent(message: any) {
    // Process the message
  }
}

Using Kafka Producer and Consumer

Once you have created the producer and consumer, you can use them to send and receive messages. Here's an example:

@Controller('kafka')
export class KafkaController {
  constructor(private readonly kafkaProducerService: KafkaProducerService) {}

  @Post()
  async sendMessage(@Body() message: any) {
    await this.kafkaProducerService.send(message);
    return { message: 'Message sent successfully' };
  }
}

Integrate nestjs with simple kafkajs library

import { Module } from '@nestjs/common';
import { KafkaService } from './kafka.service';
import { AppConfigModule } from '@app/config';
import { KafkaConsumerService } from './kafka.consumer.service';
import { KafkaProducerService } from './kafka.producer.service';

@Module({
  imports: [AppConfigModule],
  providers: [KafkaConsumerService, KafkaProducerService],
  exports: [KafkaConsumerService, KafkaProducerService],
})
export class KafkaModule { }
import { AppConfigService } from '@app/config';
import { Injectable, OnApplicationShutdown } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { ConsumerConfig, ConsumerSubscribeTopic, KafkaMessage } from 'kafkajs';
import { KafkaConsumer } from './consumer.service';

interface KafkajsConsumerOptions {
  topic: ConsumerSubscribeTopic;
  config: ConsumerConfig;
  onMessage: (message: KafkaMessage) => Promise<void>;
}

export interface IConsumer {
  connect: () => Promise<void>;
  disconnect: () => Promise<void>;
  consume: (message: any) => Promise<void>;
}


@Injectable()
export class KafkaConsumerService implements OnApplicationShutdown {
  private readonly consumers: IConsumer[] = [];

  constructor(
    private readonly configService: AppConfigService,
  ) { }

  async consume({ topic, config, onMessage }: KafkajsConsumerOptions) {
    const consumer = new KafkaConsumer(
      topic,
      config,
      this.configService.kafka.broker
    );
    await consumer.connect();
    await consumer.consume(onMessage);
    this.consumers.push(consumer);
  }

  async onApplicationShutdown() {
    for (const consumer of this.consumers) {
      await consumer.disconnect();
    }
  }
}

Consumer service

import { Consumer, ConsumerConfig, ConsumerSubscribeTopic, Kafka, KafkaMessage, Producer } from "kafkajs";
import { IProducer } from "./kafka.producer.service";
import { Logger } from "@nestjs/common";
import { IConsumer } from "./kafka.consumer.service";
import * as retry from 'async-retry';

export const sleep = (timeout: number) => {
  return new Promise<void>((resolve) => setTimeout(resolve, timeout));
};

export class KafkaConsumer implements IConsumer {
  private readonly kafka: Kafka;
  private readonly consumer: Consumer;
  private readonly logger: Logger;

  constructor(
    private readonly topic: ConsumerSubscribeTopic,
    config: ConsumerConfig,
    broker: string) {
    this.kafka = new Kafka({
      brokers: [broker]
    })
    this.consumer = this.kafka.consumer(config);
    this.logger = new Logger(`${topic}-${config.groupId}`)
  }
  async consume(onMessage: (message: KafkaMessage) => Promise<void>) {
    await this.consumer.subscribe(this.topic)
    await this.consumer.run({
      eachMessage: async ({ message, partition }) => {
        this.logger.debug(`Processing message partition: ${partition}`);
        try {
          await retry(async () => onMessage(message), {
            retries: 3,
            onRetry: (error, attempt) =>
              this.logger.error(
                `Error consuming message, executing retry ${attempt}/3...`,
                error,
              ),
          });
        } catch (err) {
          // handle failure of message 
          // write then to DB table or log them 
          // better write to DATABASE 
        }
      },
    })
  }

  async connect() {
    try {
      await this.consumer.connect();
    } catch (err) {
      this.logger.error('Failed to connect to Kafka. trying again ...', err);
      await sleep(5000);
      await this.connect();
    }

  }
  async disconnect() {
    this.consumer.disconnect()
  }
}

Conclusion

By integrating Kafka with NestJS, you can build scalable and distributed applications that can handle high volumes of data and real-time processing. The @nestjs/microservices module provides a convenient way to interact with Kafka, making it easy to incorporate into your NestJS projects.

More from this blog

C

Code with tkssharma || blogs for developers

349 posts

I’m Tarun, I am Publisher, Trainer Developer, working on Enterprise and open source Technologies JavaScript frameworks