NestJS (NodeJS) send message to Kafka topic easily in this quick trick:

Create a new NestJS test project:

nest new kafka-sender

Paste this code in main.ts:

import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { AppModule } from './app.module';

async function bootstrap() {
  const { Kafka } = require('kafkajs')
  // Create the client with the broker list
  const kafka = new Kafka({
    clientId: 'my-app',
    brokers: ['10.5.11.16:9092']
  })
  
  const producer = kafka.producer()

  await producer.connect()
  await producer.send({
      topic: 'test-server', // Your Kafka topic name
      messages: [
          { key: 'key1', value: 'Hello World!' },
          { key: 'key2', value: '{"request":"alert","time":1624870785}' }
      ], // value = your message
  })
}
bootstrap();

Install package:

npm i --save @nestjs/microservices
npm i --save kafkajs
npm i

Make sure topic: 'test-server' was created on target Kafka server, and you are subscribing on that topic to watch the result.

Run script:

npm run start

Note:

  • Your script only runs if you put your project on a server that already has Kafka installed and running.
  • In your production projects this script should be included in a service

Other method: Write code in service

Define this to start when server start. Dont call kafka connect everytime. just call “send()”.

In other class:

    constructor(private kafkaClient: KafkaClientService) {
        this.kafkaClient = kafkaClient
    }
    publisher(deviceId:string, message:any){
        let messageString = JSON.stringify(message, null, '\t')
    }

Here is your Kafka service:

import { Injectable } from '@nestjs/common';

@Injectable()
export class KafkaClientService {
    clientId:string
    brokers:[string]
    topic:string
    kafka:any
    producer:any
    constructor(){
        this.topic = 'test-server'
        this.clientId= 'ims-tcp-gateway01'
        this.brokers= ['10.5.11.16:9092'] // multi brokers by array
        const { Kafka } = require('kafkajs')
        // Create the client with the broker list
        this.kafka = new Kafka({
            clientId: this.clientId,
            brokers: this.brokers
        })
        this.producer = this.kafka.producer()
        this.producer.connect()
    }
    async sendMessage(message:string|Object){      
        await this.producer.send({
            topic: this.topic,
            messages: [
                { key: 'key1', value: message }
            ],
        })
    }
}
, , ,

DMCA.com Protection Status


Leave a Reply

Your email address will not be published. Required fields are marked *