diff --git a/agent/mqtt.py b/agent/mqtt.py index 8ebb11e..59f824f 100755 --- a/agent/mqtt.py +++ b/agent/mqtt.py @@ -19,6 +19,9 @@ # import rclpy +import re +import json + from rclpy.node import Node from muto_msgs.msg import Gateway, MutoActionMeta @@ -31,7 +34,6 @@ class MQTT(Node): def __init__(self): super().__init__("mqtt_gateway") - # Declare Parameters self.declare_parameter("host", "sandbox.composiv.ai") self.declare_parameter("port", 1883) @@ -39,6 +41,7 @@ def __init__(self): self.declare_parameter("user", "") self.declare_parameter("password", "") self.declare_parameter("namespace", "") + self.declare_parameter("prefix", "muto") self.declare_parameter("name", "") self.declare_parameter("agent_to_gateway_topic", "msg1") @@ -50,6 +53,7 @@ def __init__(self): self.keep_alive = self.get_parameter("keep_alive").value self.user = self.get_parameter("user").value self.password = self.get_parameter("password").value + self.prefix = self.get_parameter("prefix").value self.namespace = self.get_parameter("namespace").value self.name = self.get_parameter("name").value @@ -106,7 +110,9 @@ def on_connect(self, client, userdata, flags, reasonCode, properties): of the Properties class. """ topic = f"{self.namespace}:{self.name}/#" + twintopics = f"{self.prefix}/{self.namespace}:{self.name}/#" self.mqtt.subscribe(topic) + self.mqtt.subscribe(twintopics) self.get_logger().info(f"Subscribed to {topic}") def on_message(self, client, userdata, message): @@ -149,7 +155,16 @@ def on_message(self, client, userdata, message): meta.correlation_data = CorrelationData # Send message to agent - self.send_to_agent(topic, payload, meta) + thingmsg = json.loads(payload) + twinmsg = [] + if bool(thingmsg.get('topic', None)): + twinmsg = re.findall('.*/things/twin/events/(.*)', thingmsg['topic']) + if len(twinmsg) > 0 and bool(twinmsg[0]): + change = twinmsg[0] + logmsg = f"{self.name} received event {topic} {thingmsg['topic']} {change} {thingmsg.get('path', None)} {thingmsg.get('value',None)}" + self.get_logger().info(logmsg) + else: + self.send_to_agent(topic, payload, meta) def send_to_agent(self, topic, payload, meta): """