Connecting MQTT services with the Messaging System. This component subscribes to topics on the Internet of Things messaging system MQTT and forwards the received messages to the messaging system, which is based on Apache Kafka. Therefore, both services must be running:
- MQTT-broker in the same repository.
- panta-rhei stack
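The subscribe-and-forward idea can be sketched without a running broker. This is a minimal illustration, not the adapter's actual code: `make_on_message` and the `produce` callable are hypothetical names, the record layout is an assumption, and the MQTT topic `sensorpi/temperature` is made up. Only the callback signature `(client, userdata, msg)` follows the paho-mqtt 1.x convention, and the Kafka topic name is taken from the topic setup later in this README.

```python
import json
from types import SimpleNamespace

# Kafka topic name as created in the Kafka setup of this README.
KAFKA_TOPIC = "eu.srfg.iot.dtz.data"


def make_on_message(produce):
    """Return a paho-style on_message callback that forwards MQTT messages.

    `produce` is a hypothetical stand-in for a Kafka producer call and is
    invoked as produce(kafka_topic, key, value).
    """
    def on_message(client, userdata, msg):
        # Assumed record layout: keep the MQTT topic next to the decoded payload.
        record = {
            "mqtt_topic": msg.topic,
            "payload": msg.payload.decode("utf-8", errors="replace"),
        }
        produce(KAFKA_TOPIC, msg.topic, json.dumps(record))

    return on_message


if __name__ == "__main__":
    sent = []  # collects what would be sent to Kafka
    on_message = make_on_message(lambda topic, key, value: sent.append((topic, key, value)))
    # Simulate one incoming MQTT message (topic and payload are made up).
    on_message(None, None, SimpleNamespace(topic="sensorpi/temperature", payload=b"21.5"))
    print(sent)
```

In a real deployment the callback would be attached to a paho client and `produce` would wrap the Panta Rhei client; both are left out here so the sketch runs on its own.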
The MQTT Adapter is based on the components:
- Paho-MQTT Messaging Client, paho.mqtt version 1.4.0
- Kafka Client librdkafka version 2.1
- Python Kafka module confluent-kafka-python version 0.11.6
- Install Docker version 1.10.0+
- Install Docker Compose version 1.6.0+
- Make sure the Panta Rhei stack is running. This MQTT-Adapter requires Apache Kafka, as well as the SensorThings server GOST.
- Make sure the MQTT-broker runs. The dockerfile is in the repository under setup-broker and can also be deployed in a Docker Swarm. Instruction is here
- Clone the Panta Rhei client into the src-directory:
git clone https://github.com/iot-salzburg/panta_rhei src/panta_rhei > /dev/null 2>&1 || echo "Repo already exists"
git -C src/panta_rhei/ checkout client_0v3
git -C src/panta_rhei/ pull
Now, the client can be imported and used in mqtt-adapter.py with:
```python
import os, sys
sys.path.append(os.getcwd())
from client.digital_twin_client import DigitalTwinClient
# Set the configs, create a new Digital Twin Instance and register file structure
config = {"client_name": "mqtt-adapter",
"system": "at.srfg.iot.dtz",
"gost_servers": "192.168.48.71:8082",
"kafka_bootstrap_servers": "192.168.48.71:9092,192.168.48.72:9092,192.168.48.73:9092,192.168.48.74:9092,192.168.48.75:9092"}
client = DigitalTwinClient(**config)
client.register_existing(mappings_file=MAPPINGS)
# client.register_new(instance_file=INSTANCES) # Registering of new instances should be outsourced to the platform
```
Note that the paths might not resolve when the script is executed locally; however, it will work when using the dockerfile.yml.
The MQTT-Adapter uses SensorThings to semantically augment the forwarded data. The suggested DB-Adapter, which consumes this data later on, decodes the generic data format using the same SensorThings server.
cd dtz_mqtt-adapter/setup-broker
sudo docker-compose up --build -d
If zookeeper is specified by :2181, the local zookeeper service will be used. It may take some seconds until the new topics are distributed on each zookeeper instance in a cluster setup.
/kafka/bin/kafka-topics.sh --zookeeper :2181 --list
/kafka/bin/kafka-topics.sh --zookeeper :2181 --create --partitions 3 --replication-factor 3 --config min.insync.replicas=2 --config cleanup.policy=compact --config retention.ms=241920000 --topic eu.srfg.iot.dtz.data
/kafka/bin/kafka-topics.sh --zookeeper :2181 --create --partitions 3 --replication-factor 3 --config min.insync.replicas=2 --config cleanup.policy=compact --config retention.ms=241920000 --topic eu.srfg.iot.dtz.external
/kafka/bin/kafka-topics.sh --zookeeper :2181 --create --partitions 3 --replication-factor 1 --config min.insync.replicas=1 --config cleanup.policy=compact --config retention.ms=241920000 --topic eu.srfg.iot.dtz.logging
/kafka/bin/kafka-topics.sh --zookeeper :2181 --list
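The three create commands above differ only in topic name, replication factor and `min.insync.replicas`. As a rough sketch, the argument lists can be generated from a small table; the helper names (`create_topic_args`, `retention_days`) and the `TOPICS` table are illustrative assumptions, and nothing is executed here. The sketch also converts the configured `retention.ms` of 241920000 into days (2.8).

```python
KAFKA_TOPICS_SH = "/kafka/bin/kafka-topics.sh"
RETENTION_MS = 241920000  # value used in the commands above

# topic name -> (replication factor, min.insync.replicas), as in the commands above
TOPICS = {
    "eu.srfg.iot.dtz.data": (3, 2),
    "eu.srfg.iot.dtz.external": (3, 2),
    "eu.srfg.iot.dtz.logging": (1, 1),
}


def create_topic_args(topic, replication_factor, min_insync_replicas,
                      zookeeper=":2181", partitions=3, retention_ms=RETENTION_MS):
    """Build the argument list for one kafka-topics.sh --create call."""
    return [
        KAFKA_TOPICS_SH, "--zookeeper", zookeeper, "--create",
        "--partitions", str(partitions),
        "--replication-factor", str(replication_factor),
        "--config", f"min.insync.replicas={min_insync_replicas}",
        "--config", "cleanup.policy=compact",
        "--config", f"retention.ms={retention_ms}",
        "--topic", topic,
    ]


def retention_days(retention_ms):
    """Convert a retention time in milliseconds to days."""
    return retention_ms / (24 * 60 * 60 * 1000)


if __name__ == "__main__":
    for name, (replication, min_isr) in TOPICS.items():
        # Only print the command lines; running them is left to the operator.
        print(" ".join(create_topic_args(name, replication, min_isr)))
    print(f"retention: {retention_days(RETENTION_MS):.1f} days")
```

Each list could be passed to `subprocess.run` inside the Kafka container; printing first makes it easy to compare the result with the commands above.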
Configure the connection in the docker-compose.yml:
services:
  adapter:
    ...
    environment:
      # MQTT config
      MQTT_BROKER: "192.168.48.71"
      MQTT_SUBSCRIBED_TOPICS: "prusa3d/#,sensorpi/#,octoprint/#"
      # Panta Rhei configuration
      CLIENT_NAME: "mqtt-adapter"
      SYSTEM_NAME: "at.srfg.iot.dtz"
      SENSORTHINGS_HOST: "192.168.48.71:8082"
      BOOTSTRAP_SERVERS: "192.168.48.71:9092,192.168.48.72:9092,192.168.48.73:9092,192.168.48.74:9092,192.168.48.75:9092"
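How mqtt-adapter.py reads these variables is not shown in this README, so the following is only a sketch under assumptions: the function names and the layout of the returned dictionary are hypothetical, while the environment variable names and the `DigitalTwinClient` config keys are the ones used in this README. The `topic_matches` helper illustrates what a filter such as `prusa3d/#` covers (`#` matches any number of remaining levels, `+` exactly one); it ignores the special handling of topics starting with `$`.

```python
import os


def parse_topic_filters(raw):
    """Split the comma-separated MQTT_SUBSCRIBED_TOPICS value into a list."""
    return [part.strip() for part in raw.split(",") if part.strip()]


def topic_matches(topic_filter, topic):
    """Check whether an MQTT topic filter (with + and # wildcards) matches a topic."""
    filter_parts = topic_filter.split("/")
    topic_parts = topic.split("/")
    for i, part in enumerate(filter_parts):
        if part == "#":
            return True  # multi-level wildcard: matches the rest, including nothing
        if i >= len(topic_parts):
            return False  # filter is longer than the topic
        if part != "+" and part != topic_parts[i]:
            return False  # literal level differs
    return len(filter_parts) == len(topic_parts)


def load_settings(env=None):
    """Collect the adapter settings from environment variables (hypothetical helper)."""
    env = os.environ if env is None else env
    return {
        "mqtt_broker": env["MQTT_BROKER"],
        "mqtt_topics": parse_topic_filters(env["MQTT_SUBSCRIBED_TOPICS"]),
        # Keys mirror the DigitalTwinClient config shown earlier in this README.
        "client_config": {
            "client_name": env["CLIENT_NAME"],
            "system": env["SYSTEM_NAME"],
            "gost_servers": env["SENSORTHINGS_HOST"],
            "kafka_bootstrap_servers": env["BOOTSTRAP_SERVERS"],
        },
    }


if __name__ == "__main__":
    example_env = {
        "MQTT_BROKER": "192.168.48.71",
        "MQTT_SUBSCRIBED_TOPICS": "prusa3d/#,sensorpi/#,octoprint/#",
        "CLIENT_NAME": "mqtt-adapter",
        "SYSTEM_NAME": "at.srfg.iot.dtz",
        "SENSORTHINGS_HOST": "192.168.48.71:8082",
        "BOOTSTRAP_SERVERS": "192.168.48.71:9092,192.168.48.72:9092",
    }
    settings = load_settings(example_env)
    print(settings["mqtt_topics"])
    # "prusa3d/temperature/bed" is a made-up topic used only for illustration.
    print(topic_matches("prusa3d/#", "prusa3d/temperature/bed"))
```

Passing the environment as a plain dictionary keeps the helper easy to try outside of Docker; inside the container, calling `load_settings()` without arguments would read the values set in docker-compose.yml.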
Using docker-compose: This depends on the Panta Rhei Stack and a configured instance_file.
cd dtz_mqtt-adapter
sudo docker-compose up --build -d
The flag -d runs the containers in the background (detached mode).
Watch the logs with:
sudo docker-compose logs -f
Using docker stack:
If not already done, add a registry instance to register the image:
sudo docker service create --name registry --publish published=5001,target=5000 registry:2
curl 127.0.0.1:5001/v2/
This should output {}.
If running with docker-compose works, the stack will start by running:
cd dtz_mqtt-adapter
sh setup-broker/start_mqtt-broker.sh
cd ../dtz_mqtt-adapter
sh start_mqtt-adapter.sh
Check whether everything started correctly with:
./show-adapter-stats.sh
docker service logs -f add-mqtt_adapter
The asset structure is configured in the instance.json file to augment the incoming MQTT messages with metadata stored on the SensorThings server. If the structure is changed, the mqtt-adapter has to be restarted in order to update the structure in the SensorThings server.
Restart the service:
sudo service docker restart
or add the file /etc/docker/daemon.json with the content:
{
  "dns": ["your_dns", "8.8.8.8", "8.8.4.4"]
}
where your_dns can be found with the command:
nmcli device show [interfacename] | grep IP4.DNS
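To avoid syntax errors when editing the file by hand, the content of daemon.json can also be rendered as valid JSON. This is a small sketch under assumptions: the helper names are hypothetical, `192.0.2.53` is a placeholder for the DNS address reported by nmcli, and the fallbacks are Google's public resolvers 8.8.8.8 and 8.8.4.4. The sketch only prints the JSON text and does not write to /etc/docker/daemon.json.

```python
import json


def build_daemon_config(local_dns, fallbacks=("8.8.8.8", "8.8.4.4")):
    """Return the daemon.json content as a dict, local DNS server first."""
    return {"dns": [local_dns, *fallbacks]}


def render_daemon_config(local_dns):
    """Render the configuration as JSON text, ready to be saved as daemon.json."""
    return json.dumps(build_daemon_config(local_dns), indent=2)


if __name__ == "__main__":
    # Placeholder address; replace it with the value found via nmcli.
    print(render_daemon_config("192.0.2.53"))
```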
Restart the service with
sudo service docker restart
or add your DNS address as described above.