After Installing Kafka

Download Kafka MQTT Connector

Make sure that Java and Maven are installed:

java -version
mvn -v

Download and build:

su -l kafka
cd /home/kafka
git clone https://github.com/johanvandevenne/kafka-connect-mqtt.git
cd kafka-connect-mqtt
mvn clean install
cp /home/kafka/kafka-connect-mqtt/target/kafka-connect-mqtt-1.1.0-package/kafka-connect-mqtt/* /home/kafka/kafka/libs
cd ../kafka

If the build does not work, download the connector from this source: https://drive.google.com/file/d/10Gw-NGxdz48eg7LpJ1klIpVjv2XxlolB/view

Note: Kafka Connect's REST API listens on TCP port 8083 by default. That conflicts with Mosquitto if one of its listeners (here, the WebSocket listener) uses the same port, so change the configuration of Mosquitto (or your broker). For example, I changed the WebSocket port from 8083 to 9093:

listener 1883 127.0.0.1
listener 8883
certfile /etc/mosquitto/ssl/cert.pem
cafile /etc/mosquitto/ssl/fullchain.pem
keyfile /etc/mosquitto/ssl/privkey.pem
listener 9093
protocol websockets
certfile /etc/mosquitto/ssl/cert.pem
cafile /etc/mosquitto/ssl/fullchain.pem
keyfile /etc/mosquitto/ssl/privkey.pem
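
Before starting Kafka Connect, it can help to confirm that no Mosquitto listener still claims port 8083. The following is a minimal sketch, not part of the original setup: the function name `listener_uses_port` is hypothetical, and the config path in the usage comment is an assumption (adjust it to wherever your Mosquitto configuration lives). It only looks at `listener` lines in a single file.

```shell
# Hypothetical helper: exits 0 if the given Mosquitto config file declares
# a "listener" on the given port, and exits 1 otherwise.
listener_uses_port() {
    conf_file=$1
    port=$2
    # Matches lines such as "listener 8083" or "listener 8083 127.0.0.1".
    # Commented-out lines do not match because their first field is not
    # exactly the word "listener".
    awk -v port="$port" '
        $1 == "listener" && $2 == port { found = 1 }
        END { exit (found ? 0 : 1) }
    ' "$conf_file"
}

# Usage (the path is an assumption; adjust for your system):
#   if listener_uses_port /etc/mosquitto/mosquitto.conf 8083; then
#       echo "Port 8083 is still used by a Mosquitto listener" >&2
#   fi
```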

Start the Kafka Connect service

bin/connect-distributed.sh config/connect-distributed.properties

Make sure it is working:

curl http://127.0.0.1:8083/connector-plugins

It should list at least the following two plugins:

[
  {
    "class": "be.jovacon.kafka.connect.MQTTSinkConnector",
    "type": "sink",
    "version": "1.1.0"
  },
  {
    "class": "be.jovacon.kafka.connect.MQTTSourceConnector",
    "type": "source",
    "version": "1.1.0"
  }
]
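
To check the plugin list from a script instead of by eye, one option is to search the response for the two class names shown above. This is a sketch under assumptions: the function name `has_mqtt_plugins` is hypothetical, and it does a plain text search rather than parsing the JSON.

```shell
# Hypothetical helper: reads the /connector-plugins response on stdin and
# exits 0 only if both MQTT connector classes appear in it.
has_mqtt_plugins() {
    plugins=$(cat)
    for class in be.jovacon.kafka.connect.MQTTSinkConnector \
                 be.jovacon.kafka.connect.MQTTSourceConnector; do
        # -F treats the class name as a fixed string, so the dots are literal.
        printf '%s' "$plugins" | grep -qF "\"$class\"" || {
            echo "missing plugin: $class" >&2
            return 1
        }
    done
}

# Usage:
#   curl -s http://127.0.0.1:8083/connector-plugins | has_mqtt_plugins \
#       && echo "MQTT connector plugins are loaded"
```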

Add a source connector (subscribes to an MQTT topic and forwards each message to a Kafka topic):

curl -X POST \
  http://<kafkaconnect>:8083/connectors \
  -H 'Content-Type: application/json' \
  -d '{ "name": "mqtt-source-connector",
    "config":
    {
      "connector.class":"be.jovacon.kafka.connect.MQTTSourceConnector",
      "mqtt.topic":"test1",
      "kafka.topic":"mqtt-test",
      "mqtt.clientID":"kafka-mqtt-source-connector",
      "mqtt.broker":"tcp://127.0.0.1:1883",
      "key.converter":"org.apache.kafka.connect.storage.StringConverter",
      "key.converter.schemas.enable":false,
      "value.converter":"org.apache.kafka.connect.storage.StringConverter",
      "value.converter.schemas.enable":false
    }
}'

Add a sink connector (reads from a Kafka topic and publishes each message to an MQTT topic):

curl -X POST \
  http://<kafkaconnect>:8083/connectors \
  -H 'Content-Type: application/json' \
  -d '{ "name": "mqtt-sink-connector",
    "config":
    {
      "connector.class":"be.jovacon.kafka.connect.MQTTSinkConnector",
      "mqtt.topic":"test2",
      "topics":"mqtt-test",
      "mqtt.clientID":"kafka-mqtt-sink-connector",
      "mqtt.broker":"tcp://127.0.0.1:1883",
      "key.converter":"org.apache.kafka.connect.storage.StringConverter",
      "key.converter.schemas.enable":false,
      "value.converter":"org.apache.kafka.connect.storage.StringConverter",
      "value.converter.schemas.enable":false
    }
}'
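
The two requests above differ only in the connector class, the names, the MQTT topic, and the key that holds the Kafka topic (`kafka.topic` for the source, `topics` for the sink). As a rough sketch, the request body could be generated by a small function so the shared settings live in one place. The function name `mqtt_connector_json` and the `MQTT_BROKER` environment variable are hypothetical, and the template assumes that none of the values contain double quotes or backslashes.

```shell
# Hypothetical helper: prints a connector definition for the given kind
# ("source" or "sink") using the same settings as the requests above.
mqtt_connector_json() {
    kind=$1
    name=$2
    mqtt_topic=$3
    kafka_topic=$4
    case $kind in
        source) class=be.jovacon.kafka.connect.MQTTSourceConnector
                kafka_key=kafka.topic ;;
        sink)   class=be.jovacon.kafka.connect.MQTTSinkConnector
                kafka_key=topics ;;
        *)      echo "kind must be 'source' or 'sink'" >&2
                return 1 ;;
    esac
    # MQTT_BROKER is an assumed override; the default matches the examples.
    cat <<EOF
{ "name": "$name",
  "config": {
    "connector.class": "$class",
    "mqtt.topic": "$mqtt_topic",
    "$kafka_key": "$kafka_topic",
    "mqtt.clientID": "kafka-mqtt-$kind-connector",
    "mqtt.broker": "${MQTT_BROKER:-tcp://127.0.0.1:1883}",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": false,
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter.schemas.enable": false
  }
}
EOF
}

# Usage ("-d @-" tells curl to read the request body from stdin):
#   mqtt_connector_json sink mqtt-sink-connector test2 mqtt-test |
#       curl -X POST http://<kafkaconnect>:8083/connectors \
#            -H 'Content-Type: application/json' -d @-
```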

For more connector configuration options, see: https://github.com/johanvandevenne/kafka-connect-mqtt.git

List all connectors:

curl http://IP:8083/connectors

Show details of one connector (example: “mqtt-source-connector”):

curl http://IP:8083/connectors/mqtt-source-connector

Check the status of a connector, and fix the problem if it reports an error:

curl http://IP:8083/connectors/mqtt-source-connector/status
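
The status response is JSON that reports a `state` for the connector and for each of its tasks. A minimal sketch for flagging a failure from a script follows; the function name `connector_has_failure` is hypothetical, and it uses a text match rather than a JSON parser.

```shell
# Hypothetical helper: reads a connector status response on stdin and
# exits 0 if the connector or any of its tasks reports the FAILED state.
connector_has_failure() {
    grep -Eq '"state"[[:space:]]*:[[:space:]]*"FAILED"'
}

# Usage:
#   if curl -s http://IP:8083/connectors/mqtt-source-connector/status |
#           connector_has_failure; then
#       echo "connector or task failed; check the trace in the status output" >&2
#   fi
#
# Once the cause is fixed, a failed task can be restarted through the
# Kafka Connect REST API (task id 0 is used here as an example):
#   curl -X POST http://IP:8083/connectors/mqtt-source-connector/tasks/0/restart
```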

Delete a connector:

curl -X DELETE http://localhost:8083/connectors/mqtt-source-connector

Send a message from an MQTT topic to a Kafka topic (source connector)

Subscribe to the Kafka topic “mqtt-test” in terminal window 1:

cd /home/kafka/kafka
bin/kafka-console-consumer.sh --topic mqtt-test --from-beginning --bootstrap-server localhost:9092

Publish a message on the MQTT topic “test1” in terminal window 2:

mosquitto_pub -h localhost -t test1 -m "hello worlD"

The message should appear in terminal window 1.

Send a message from a Kafka topic to an MQTT topic (sink connector)

Subscribe to the MQTT topic “test2” in terminal window 1:

mosquitto_sub -h localhost -t test2

Publish a message on the Kafka topic “mqtt-test” in terminal window 2 (type the message at the producer prompt and press Enter):

cd /home/kafka/kafka
bin/kafka-console-producer.sh --topic mqtt-test --bootstrap-server localhost:9092

The message should appear in terminal window 1.

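Note: Do not give the source and sink connectors the same MQTT topic on the same MQTT server when they also share a Kafka topic, as they do here. Every message the sink publishes would be read back by the source, which creates an infinite loop.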
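
A small guard can catch this mistake before the connectors are created. The sketch below is only an illustration: the function name `connectors_would_loop` is hypothetical, and it compares the values as exact strings, so it does not detect overlaps caused by MQTT wildcards such as `#` or `+`.

```shell
# Hypothetical helper: exits 0 if a source/sink pair would feed each other,
# i.e. same MQTT broker, same MQTT topic and same Kafka topic.
# Arguments: source broker, source MQTT topic, source Kafka topic,
#            sink broker,   sink MQTT topic,   sink Kafka topic.
connectors_would_loop() {
    src_broker=$1
    src_mqtt_topic=$2
    src_kafka_topic=$3
    sink_broker=$4
    sink_mqtt_topic=$5
    sink_kafka_topic=$6
    [ "$src_broker" = "$sink_broker" ] &&
        [ "$src_mqtt_topic" = "$sink_mqtt_topic" ] &&
        [ "$src_kafka_topic" = "$sink_kafka_topic" ]
}

# Usage with the values from this guide (no loop, because test1 != test2):
#   if connectors_would_loop tcp://127.0.0.1:1883 test1 mqtt-test \
#                            tcp://127.0.0.1:1883 test2 mqtt-test; then
#       echo "source and sink would form an infinite loop" >&2
#   fi
```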
