After installing Kafka…
Download the Kafka MQTT connector
Make sure that Java and Maven are installed:
java -version
mvn -v
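The check above can also be scripted. This is a minimal sketch; the helper name require_cmds is made up for illustration, and git is added to the list only because the clone step below needs it.

```shell
# Return 0 if every named command is on PATH; report the missing ones on stderr.
# require_cmds is a hypothetical helper name, not part of Kafka or Maven.
require_cmds() {
  local missing=0 cmd
  for cmd in "$@"; do
    if ! command -v "$cmd" >/dev/null 2>&1; then
      echo "missing: $cmd" >&2
      missing=1
    fi
  done
  return "$missing"
}

# The build needs Java and Maven; git is needed for the clone step.
if require_cmds java mvn git; then
  echo "all build tools found"
else
  echo "install the missing tools before continuing"
fi
```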
Download and build the connector:
su -l kafka
cd /home/kafka
git clone https://github.com/johanvandevenne/kafka-connect-mqtt.git
cd kafka-connect-mqtt
mvn clean install
cp /home/kafka/kafka-connect-mqtt/target/kafka-connect-mqtt-1.1.0-package/kafka-connect-mqtt/* /home/kafka/kafka/libs
cd ../kafka
If the build does not work, download the package from this source: https://drive.google.com/file/d/10Gw-NGxdz48eg7LpJ1klIpVjv2XxlolB/view
Note: The Kafka Connect REST API listens on TCP port 8083 by default. This conflicts with Mosquitto when one of its listeners (here the WebSocket listener) uses the same port, so the configuration of Mosquitto (or your broker) has to change. For example, I changed the WebSocket port from 8083 to 9093:
listener 1883 127.0.0.1
listener 8883
certfile /etc/mosquitto/ssl/cert.pem
cafile /etc/mosquitto/ssl/fullchain.pem
keyfile /etc/mosquitto/ssl/privkey.pem
listener 9093
protocol websockets
certfile /etc/mosquitto/ssl/cert.pem
cafile /etc/mosquitto/ssl/fullchain.pem
keyfile /etc/mosquitto/ssl/privkey.pem
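To see whether port 8083 is still taken after restarting Mosquitto, the listening sockets can be inspected. The sketch below assumes the iproute2 tool ss is available and that its output has the local address in the fourth column; the function name port_is_listening is hypothetical.

```shell
# Read `ss -ltn` style output on stdin; succeed if a socket listens on port $1.
# Assumption: column 4 is "address:port", as printed by iproute2 `ss -ltn`.
port_is_listening() {
  awk -v port="$1" '
    $1 == "LISTEN" {
      n = split($4, parts, ":")        # the port is the text after the last colon
      if (parts[n] == port) found = 1
    }
    END { exit(found ? 0 : 1) }
  '
}

# Usage on a live host:
#   ss -ltn | port_is_listening 8083 && echo "port 8083 is already in use"
```

If the port is reported as in use before Kafka Connect is started, some other service (for example the old WebSocket listener) is still bound to it.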
Start the Kafka Connect service
bin/connect-distributed.sh config/connect-distributed.properties
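The command above runs in the foreground, and the REST API may need some seconds before it answers. From a second terminal, a small retry loop can wait for it. This is only a sketch: retry is a hypothetical helper, and the attempt count and delay are arbitrary values.

```shell
# Run a command up to $1 times, sleeping $2 seconds after each failed attempt.
# retry is a hypothetical helper name; it returns 0 as soon as the command succeeds.
retry() {
  local attempts="$1" delay="$2" i=1
  shift 2
  while [ "$i" -le "$attempts" ]; do
    "$@" && return 0
    i=$((i + 1))
    sleep "$delay"
  done
  return 1
}

# Usage (30 attempts, 2 seconds apart, are assumed values):
#   retry 30 2 curl -sf -o /dev/null http://127.0.0.1:8083/ && echo "Kafka Connect is up"
```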
Make sure it is working:
curl http://127.0.0.1:8083/connector-plugins
It will show at least these two plugins (possibly more):
[
{
"class": "be.jovacon.kafka.connect.MQTTSinkConnector",
"type": "sink",
"version": "1.1.0"
},
{
"class": "be.jovacon.kafka.connect.MQTTSourceConnector",
"type": "source",
"version": "1.1.0"
}
]
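Instead of reading the list by eye, the response can be checked for the two class names. A minimal sketch, assuming the class names shown in the output above; has_mqtt_plugins is a made-up function name.

```shell
# Succeed only if the JSON on stdin mentions both MQTT connector classes.
# Plain substring search, so it does not validate the JSON itself.
has_mqtt_plugins() {
  local body
  body=$(cat)
  printf '%s' "$body" | grep -qF 'be.jovacon.kafka.connect.MQTTSourceConnector' &&
    printf '%s' "$body" | grep -qF 'be.jovacon.kafka.connect.MQTTSinkConnector'
}

# Usage:
#   curl -s http://127.0.0.1:8083/connector-plugins | has_mqtt_plugins && echo "MQTT plugins loaded"
```

If the check fails, the jar files were probably not copied into the Kafka libs directory before Kafka Connect was started.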
Add a source connector (listens on an MQTT topic, then sends to a Kafka topic):
curl -X POST \
http://<kafkaconnect>:8083/connectors \
-H 'Content-Type: application/json' \
-d '{ "name": "mqtt-source-connector",
"config":
{
"connector.class":"be.jovacon.kafka.connect.MQTTSourceConnector",
"mqtt.topic":"test1",
"kafka.topic":"mqtt-test",
"mqtt.clientID":"kafka-mqtt-source-connector",
"mqtt.broker":"tcp://127.0.0.1:1883",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable":false,
"value.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter.schemas.enable":false
}
}'
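When several source connectors are needed, the request body above can be generated from a few parameters. The sketch below reuses the same keys as the request above; mqtt_source_json is a hypothetical helper, and it does no JSON escaping, so the arguments must not contain quotes or backslashes.

```shell
# Print the JSON body for an MQTT source connector.
# Arguments: connector name, MQTT topic, Kafka topic, MQTT broker URL.
# Assumption: the arguments contain no characters that need JSON escaping.
mqtt_source_json() {
  printf '{"name":"%s","config":{' "$1"
  printf '"connector.class":"be.jovacon.kafka.connect.MQTTSourceConnector",'
  printf '"mqtt.topic":"%s","kafka.topic":"%s",' "$2" "$3"
  printf '"mqtt.clientID":"kafka-%s","mqtt.broker":"%s",' "$1" "$4"
  printf '"key.converter":"org.apache.kafka.connect.storage.StringConverter",'
  printf '"key.converter.schemas.enable":false,'
  printf '"value.converter":"org.apache.kafka.connect.storage.StringConverter",'
  printf '"value.converter.schemas.enable":false'
  printf '}}\n'
}

# Usage (curl reads the body from stdin with --data @-):
#   mqtt_source_json mqtt-source-connector test1 mqtt-test tcp://127.0.0.1:1883 |
#     curl -X POST -H 'Content-Type: application/json' --data @- http://127.0.0.1:8083/connectors
```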
Add a sink connector (listens on a Kafka topic, then sends to an MQTT topic):
curl -X POST \
http://<kafkaconnect>:8083/connectors \
-H 'Content-Type: application/json' \
-d '{ "name": "mqtt-sink-connector",
"config":
{
"connector.class":"be.jovacon.kafka.connect.MQTTSinkConnector",
"mqtt.topic":"test2",
"topics":"mqtt-test",
"mqtt.clientID":"kafka-mqtt-sink-connector",
"mqtt.broker":"tcp://127.0.0.1:1883",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable":false,
"value.converter":"org.apache.kafka.connect.storage.StringConverter",
"value.converter.schemas.enable":false
}
}'
Read more about the connector configuration: https://github.com/johanvandevenne/kafka-connect-mqtt.git
Show all connectors:
curl http://IP:8083/connectors
Show details of one connector (example: “mqtt-source-connector”):
curl http://IP:8083/connectors/mqtt-source-connector
Check the status of a connector, then fix any error it reports:
curl http://IP:8083/connectors/mqtt-source-connector/status
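The status response contains a "state" field for the connector and for each task. A quick way to spot a failure is to search for the FAILED state, as sketched below; status_has_failure is a hypothetical name, and whether a restart is the right fix depends on the error in the "trace" field.

```shell
# Succeed if the status JSON on stdin reports a FAILED connector or task.
# Plain pattern match on the "state" field; tolerant of spaces around the colon.
status_has_failure() {
  grep -Eq '"state"[[:space:]]*:[[:space:]]*"FAILED"'
}

# Usage (IP is a placeholder, as in the commands above):
#   curl -s http://IP:8083/connectors/mqtt-source-connector/status | status_has_failure \
#     && curl -X POST http://IP:8083/connectors/mqtt-source-connector/restart
# A single task can be restarted with:
#   curl -X POST http://IP:8083/connectors/mqtt-source-connector/tasks/0/restart
```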
Delete a connector:
curl -X DELETE http://localhost:8083/connectors/mqtt-source-connector
Send a message from an MQTT topic to a Kafka topic (source connector)
Subscribe to the Kafka topic “mqtt-test” in terminal window 1:
cd /home/kafka/kafka
bin/kafka-console-consumer.sh --topic mqtt-test --from-beginning --bootstrap-server localhost:9092
Publish a message on the MQTT topic “test1” in terminal window 2:
mosquitto_pub -h localhost -t test1 -m "hello worlD"
See the result in terminal window 1.
Send a message from a Kafka topic to an MQTT topic (sink connector)
Subscribe to the MQTT topic “test2” in terminal window 1:
mosquitto_sub -h localhost -t test2
Publish a message on the Kafka topic “mqtt-test” in terminal window 2:
cd /home/kafka/kafka
bin/kafka-console-producer.sh --topic mqtt-test --bootstrap-server localhost:9092
Type a message and press Enter, then see the result in terminal window 1.
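Note: Do not configure the sink and source connectors with the same MQTT topic on the same MQTT server when they also share a Kafka topic, as in the example above. Each message would be forwarded back and forth, which creates an infinite loop.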
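The loop condition in the note can be checked before creating the connectors. This is a minimal sketch under the assumption that both connectors use the same MQTT server; mqtt_loop_risk is a hypothetical helper, and it compares topic names literally, so MQTT wildcards such as # or + are not covered.

```shell
# Succeed (report a risk) when a source/sink pair would feed each other.
# Arguments: source MQTT topic, source Kafka topic, sink Kafka topic, sink MQTT topic.
# Assumption: both connectors talk to the same MQTT server; wildcards are not handled.
mqtt_loop_risk() {
  [ "$1" = "$4" ] && [ "$2" = "$3" ]
}

# The pair from this guide: test1 -> mqtt-test -> test2, so no loop.
if mqtt_loop_risk test1 mqtt-test mqtt-test test2; then
  echo "loop risk: use different MQTT topics"
else
  echo "no loop with these topics"
fi
```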