diff --git a/agent/mqtt.py b/agent/mqtt.py index 59f824f..8118237 100755 --- a/agent/mqtt.py +++ b/agent/mqtt.py @@ -24,7 +24,7 @@ from rclpy.node import Node -from muto_msgs.msg import Gateway, MutoActionMeta +from muto_msgs.msg import Gateway, MutoActionMeta, Thing, ThingHeaders from paho.mqtt.client import Client, MQTTv5 from paho.mqtt.properties import Properties @@ -47,6 +47,8 @@ def __init__(self): self.declare_parameter("agent_to_gateway_topic", "msg1") self.declare_parameter("gateway_to_agent_topic", "msg2") + self.declare_parameter("thing_messages_topic", "thing_messages") + # Initialize Parameters self.host = self.get_parameter("host").value self.port = self.get_parameter("port").value @@ -60,6 +62,8 @@ def __init__(self): self.agent_to_gateway_topic = self.get_parameter("agent_to_gateway_topic").value self.gateway_to_agent_topic = self.get_parameter("gateway_to_agent_topic").value + self.thing_messages_topic = self.get_parameter("thing_messages_topic").value + # MQTT Client self.mqtt = Client( client_id=f"{self.name}_{self.get_clock().now()}", @@ -82,6 +86,7 @@ def __init__(self): self.pub_agent = self.create_publisher(Gateway, self.gateway_to_agent_topic, 10) self.sub_agent = self.create_subscription(Gateway, self.agent_to_gateway_topic, self.agent_msg_callback, 10) + self.pub_thing = self.create_publisher(Thing, self.thing_messages_topic, 10) def __del__(self): self.mqtt.loop_stop() @@ -143,27 +148,32 @@ def on_message(self, client, userdata, message): properties = message.properties # Create "meta" if ResponseTopic and CorrelationData exists in message properties - ResponseTopic = None - CorrelationData = None meta = MutoActionMeta() - if hasattr(properties, "ResponseTopic") and hasattr(properties, "CorrelationData"): - ResponseTopic = properties.ResponseTopic - CorrelationData = properties.CorrelationData.decode("utf-8") - - meta.response_topic = ResponseTopic - meta.correlation_data = CorrelationData + if hasattr(properties, "ResponseTopic"): + meta.response_topic = properties.ResponseTopic + if hasattr(properties, "CorrelationData"): + meta.correlation_data = properties.CorrelationData.decode("utf-8") - # Send message to agent thingmsg = json.loads(payload) - twinmsg = [] - if bool(thingmsg.get('topic', None)): - twinmsg = re.findall('.*/things/twin/events/(.*)', thingmsg['topic']) - if len(twinmsg) > 0 and bool(twinmsg[0]): - change = twinmsg[0] - logmsg = f"{self.name} received event {topic} {thingmsg['topic']} {change} {thingmsg.get('path', None)} {thingmsg.get('value',None)}" - self.get_logger().info(logmsg) - else: + + try: + parsed = re.findall(".*/things/([^/]*)/([^/]*)/(.*)", thingmsg["topic"])[0] + + if len(parsed) > 2 and bool(parsed[0]): + channel = parsed[0] + criterion = parsed[1] + action = parsed[2].split('/') + if (channel == "live") and (criterion == "messages") and (action[0] == "agent"): + self.send_to_agent(topic, payload, meta) + else: + self.publish_thing_message(thingmsg, channel, action[0], meta) + else: + self.get_logger().info(f"ERROR: incompatible type") + # TODO: implement + + except: + # TODO: remove this after making changes to the dashboard self.send_to_agent(topic, payload, meta) def send_to_agent(self, topic, payload, meta): @@ -200,6 +210,30 @@ def agent_msg_callback(self, data): self.mqtt.publish(response_topic, payload, properties=properties) + def publish_thing_message(self, payload, channel, action, meta): + """ + TODO: add docs. + """ + thing_headers = ThingHeaders() + + headers = payload.get("headers", None) + if headers: + thing_headers.reply_to = headers.get("reply-to", "") + thing_headers.correlation_id = headers.get("correlation-id", "") + thing_headers.ditto_originator = headers.get("ditto-originator", "") + thing_headers.response_required = headers.get("response-required", "") + thing_headers.content_type = headers.get("content-type", "") + + msg_thing = Thing() + msg_thing.topic = payload.get("topic", "") + msg_thing.headers = thing_headers + msg_thing.path = payload.get("path", "") + msg_thing.value = json.dumps(payload.get("value", "")) + msg_thing.channel = channel + msg_thing.action = action + msg_thing.meta = meta + + self.pub_thing.publish(msg_thing) def main(): rclpy.init() diff --git a/config/agent.yaml b/config/agent.yaml index 76919eb..32eebe3 100755 --- a/config/agent.yaml +++ b/config/agent.yaml @@ -29,6 +29,8 @@ agent_to_commands_topic: "agent_to_command" commands_to_agent_topic: "command_to_agent" + thing_messages_topic: "thing_messages" + muto_agent: ros__parameters: type: simulator @@ -41,6 +43,7 @@ mqtt_gateway: keep_alive: 60 user: null password: null + prefix: muto namespace: org.eclipse.muto.sandbox name: f1tenth-01 diff --git a/package.xml b/package.xml index fffb41d..490ea78 100755 --- a/package.xml +++ b/package.xml @@ -11,6 +11,7 @@ rclpy ros2launch + muto_msgs python-paho-mqtt-pip