NestJS (Node.js): send a message to a Kafka topic with this quick trick.
Create a new NestJS test project:
nest new kafka-sender
Paste this code into src/main.ts:
import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { AppModule } from './app.module';
import { Kafka } from 'kafkajs';

// Note: the Nest imports above are not used by this quick test; only kafkajs is.
async function bootstrap() {
  // Create the client with the broker list
  const kafka = new Kafka({
    clientId: 'my-app',
    brokers: ['10.5.11.16:9092'],
  });

  const producer = kafka.producer();
  await producer.connect();
  try {
    await producer.send({
      topic: 'test-server', // your Kafka topic name
      messages: [
        // value = your message
        { key: 'key1', value: 'Hello World!' },
        { key: 'key2', value: '{"request":"alert","time":1624870785}' },
      ],
    });
  } finally {
    // Close the connection so the process can exit cleanly
    await producer.disconnect();
  }
}
bootstrap().catch(console.error);
Install the packages:
npm i --save @nestjs/microservices
npm i --save kafkajs
npm i
Make sure the topic 'test-server' has been created on the target Kafka server, and subscribe to that topic so you can watch the result.
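To watch the result from Node instead of a separate Kafka tool, a small consumer can print whatever arrives on the topic. The sketch below is one way to do that, under some assumptions: `ConsumerLike` is a hand-written interface meant to mirror the `connect`, `subscribe`, and `run` methods of a kafkajs 2.x consumer, and `watchTopic` and `formatRecord` are hypothetical helper names, not part of kafkajs.

```typescript
// Anything with toString(), e.g. the Buffer kafkajs delivers for keys and values.
type Bytes = { toString(): string };

// Assumed subset of the kafkajs consumer API used by this sketch.
interface ConsumerLike {
  connect(): Promise<void>;
  subscribe(opts: { topics: string[]; fromBeginning?: boolean }): Promise<void>;
  run(opts: {
    eachMessage: (payload: {
      topic: string;
      partition: number;
      message: { key: Bytes | null; value: Bytes | null };
    }) => Promise<void>;
  }): Promise<void>;
}

// Turn one record into a single readable line.
function formatRecord(
  topic: string,
  partition: number,
  key: Bytes | null,
  value: Bytes | null,
): string {
  const keyText = key ? key.toString() : '(no key)';
  const valueText = value ? value.toString() : '(null)';
  return `${topic}[${partition}] ${keyText} = ${valueText}`;
}

// Connect, subscribe from the beginning of the topic, and log every record.
async function watchTopic(
  consumer: ConsumerLike,
  topicName: string,
  log: (line: string) => void = console.log,
): Promise<void> {
  await consumer.connect();
  await consumer.subscribe({ topics: [topicName], fromBeginning: true });
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      log(formatRecord(topic, partition, message.key, message.value));
    },
  });
}
```

With kafkajs installed, something like `watchTopic(kafka.consumer({ groupId: 'test-watcher' }), 'test-server')` should fit this shape; the group id `test-watcher` is only a placeholder.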
Run script:
npm run start
Note:
- The script only works when it can reach a running Kafka broker at the address listed in brokers, for example when the project sits on a server that already has Kafka installed and running.
- In production projects, put this code in a service instead of main.ts.
Other method: write the code in a service
Set the service up so the producer connects once, when the server starts. Don't call the Kafka connect() every time you send a message; just call send().
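The connect-once idea can also be sketched without any Nest lifecycle hooks by remembering the connection promise. This is only an illustration under assumptions: `ProducerLike` is a hand-written interface meant to mirror the `connect` and `send` methods of a kafkajs producer, and `ConnectOnceSender` is a hypothetical class name.

```typescript
// Assumed subset of the kafkajs producer API used by this sketch.
interface ProducerLike {
  connect(): Promise<void>;
  send(record: {
    topic: string;
    messages: { key?: string; value: string }[];
  }): Promise<unknown>;
}

class ConnectOnceSender {
  // Holds the in-flight or finished connection attempt, if any.
  private ready: Promise<void> | null = null;

  constructor(
    private readonly producer: ProducerLike,
    private readonly topic: string,
  ) {}

  // Start connecting on first use and reuse the same promise afterwards,
  // so concurrent sends do not trigger several connect() calls.
  private ensureConnected(): Promise<void> {
    if (this.ready === null) {
      this.ready = this.producer.connect().catch((err) => {
        this.ready = null; // allow a retry on the next send
        throw err;
      });
    }
    return this.ready;
  }

  async send(key: string, value: string): Promise<void> {
    await this.ensureConnected();
    await this.producer.send({ topic: this.topic, messages: [{ key, value }] });
  }
}
```

Every caller only uses `send()`, and the first call pays the connection cost. If the connection attempt fails, the stored promise is cleared so a later send can try again.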
In another class:
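// "private readonly" already stores the injected service on this.kafkaClient
constructor(private readonly kafkaClient: KafkaClientService) {}

async publisher(deviceId: string, message: any) {
  const messageString = JSON.stringify(message, null, '\t');
  // Send through the shared service, which reuses its producer connection
  await this.kafkaClient.sendMessage(messageString);
}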
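The publisher receives a deviceId but does not use it. One possible use, and this is an assumption about the intent rather than something the original states, is to send it as the message key: Kafka's default partitioner sends messages with the same key to the same partition, which keeps the messages of one device in order. `DeviceMessage` and `buildDeviceMessage` below are hypothetical names for a minimal sketch.

```typescript
// Key/value pair in the string form that a Kafka producer can send.
interface DeviceMessage {
  key: string;
  value: string;
}

// Use the device id as the key and serialize non-string payloads as JSON.
function buildDeviceMessage(deviceId: string, payload: unknown): DeviceMessage {
  if (payload === undefined) {
    throw new Error('payload must not be undefined');
  }
  const value = typeof payload === 'string' ? payload : JSON.stringify(payload);
  return { key: deviceId, value };
}
```

The returned object could then be placed in the `messages` array of a producer `send()` call in place of the fixed `'key1'` key.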
Here is your Kafka service:
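import { Injectable, OnModuleDestroy, OnModuleInit } from '@nestjs/common';
import { Kafka, Producer } from 'kafkajs';

@Injectable()
export class KafkaClientService implements OnModuleInit, OnModuleDestroy {
  private readonly topic = 'test-server';
  private readonly clientId = 'ims-tcp-gateway01';
  private readonly brokers: string[] = ['10.5.11.16:9092']; // list several brokers in this array
  private readonly kafka: Kafka;
  private readonly producer: Producer;

  constructor() {
    // Create the client with the broker list
    this.kafka = new Kafka({
      clientId: this.clientId,
      brokers: this.brokers,
    });
    this.producer = this.kafka.producer();
  }

  // Connect once, when Nest initializes the module
  async onModuleInit() {
    await this.producer.connect();
  }

  // Disconnect when the module is destroyed (for example on app.close())
  async onModuleDestroy() {
    await this.producer.disconnect();
  }

  async sendMessage(message: string | object) {
    // kafkajs expects a string or Buffer value, so serialize objects first
    const value = typeof message === 'string' ? message : JSON.stringify(message);
    await this.producer.send({
      topic: this.topic,
      messages: [{ key: 'key1', value }],
    });
  }
}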
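The broker list is hard-coded in the service. If it needs to differ between environments, one option is to read a comma-separated list from an environment variable. The sketch below assumes a variable named `KAFKA_BROKERS`, which is a made-up name, and `parseBrokers` is a hypothetical helper.

```typescript
// Split a comma-separated broker list, ignoring blanks and extra whitespace.
// Falls back to the given default when nothing usable is configured.
function parseBrokers(raw: string | undefined, fallback: string[]): string[] {
  const brokers = (raw ?? '')
    .split(',')
    .map((entry) => entry.trim())
    .filter((entry) => entry.length > 0);
  return brokers.length > 0 ? brokers : fallback;
}

// Possible use inside the service (KAFKA_BROKERS is an assumed variable name):
//   brokers = parseBrokers(process.env.KAFKA_BROKERS, ['10.5.11.16:9092']);
```

The fallback keeps the current address working when the variable is not set.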