Skip to content

Commit

Permalink
added support for twin messages and events (#4)
Browse files Browse the repository at this point in the history
* added support for twin messages and events

* improve twin and live messages and features

* added correlation-id to ThingHeader.msg
  • Loading branch information
alpsarica authored Jan 26, 2024
1 parent 4604239 commit c0cc080
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 18 deletions.
70 changes: 52 additions & 18 deletions agent/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

from rclpy.node import Node

from muto_msgs.msg import Gateway, MutoActionMeta
from muto_msgs.msg import Gateway, MutoActionMeta, Thing, ThingHeaders

from paho.mqtt.client import Client, MQTTv5
from paho.mqtt.properties import Properties
Expand All @@ -47,6 +47,8 @@ def __init__(self):
self.declare_parameter("agent_to_gateway_topic", "msg1")
self.declare_parameter("gateway_to_agent_topic", "msg2")

self.declare_parameter("thing_messages_topic", "thing_messages")

# Initialize Parameters
self.host = self.get_parameter("host").value
self.port = self.get_parameter("port").value
Expand All @@ -60,6 +62,8 @@ def __init__(self):
self.agent_to_gateway_topic = self.get_parameter("agent_to_gateway_topic").value
self.gateway_to_agent_topic = self.get_parameter("gateway_to_agent_topic").value

self.thing_messages_topic = self.get_parameter("thing_messages_topic").value

# MQTT Client
self.mqtt = Client(
client_id=f"{self.name}_{self.get_clock().now()}",
Expand All @@ -82,6 +86,7 @@ def __init__(self):
self.pub_agent = self.create_publisher(Gateway, self.gateway_to_agent_topic, 10)
self.sub_agent = self.create_subscription(Gateway, self.agent_to_gateway_topic, self.agent_msg_callback, 10)

self.pub_thing = self.create_publisher(Thing, self.thing_messages_topic, 10)

def __del__(self):
self.mqtt.loop_stop()
Expand Down Expand Up @@ -143,27 +148,32 @@ def on_message(self, client, userdata, message):
properties = message.properties

# Create "meta" if ResponseTopic and CorrelationData exists in message properties
ResponseTopic = None
CorrelationData = None
meta = MutoActionMeta()

if hasattr(properties, "ResponseTopic") and hasattr(properties, "CorrelationData"):
ResponseTopic = properties.ResponseTopic
CorrelationData = properties.CorrelationData.decode("utf-8")

meta.response_topic = ResponseTopic
meta.correlation_data = CorrelationData
if hasattr(properties, "ResponseTopic"):
meta.response_topic = properties.ResponseTopic
if hasattr(properties, "CorrelationData"):
meta.correlation_data = properties.CorrelationData.decode("utf-8")

# Send message to agent
thingmsg = json.loads(payload)
twinmsg = []
if bool(thingmsg.get('topic', None)):
twinmsg = re.findall('.*/things/twin/events/(.*)', thingmsg['topic'])
if len(twinmsg) > 0 and bool(twinmsg[0]):
change = twinmsg[0]
logmsg = f"{self.name} received event {topic} {thingmsg['topic']} {change} {thingmsg.get('path', None)} {thingmsg.get('value',None)}"
self.get_logger().info(logmsg)
else:

try:
parsed = re.findall(".*/things/([^/]*)/([^/]*)/(.*)", thingmsg["topic"])[0]

if len(parsed) > 2 and bool(parsed[0]):
channel = parsed[0]
criterion = parsed[1]
action = parsed[2].split('/')
if (channel == "live") and (criterion == "messages") and (action[0] == "agent"):
self.send_to_agent(topic, payload, meta)
else:
self.publish_thing_message(thingmsg, channel, action[0], meta)
else:
self.get_logger().info(f"ERROR: incompatible type")
# TODO: implement

except:
# TODO: remove this after making changes to the dashboard
self.send_to_agent(topic, payload, meta)

def send_to_agent(self, topic, payload, meta):
Expand Down Expand Up @@ -200,6 +210,30 @@ def agent_msg_callback(self, data):

self.mqtt.publish(response_topic, payload, properties=properties)

def publish_thing_message(self, payload, channel, action, meta):
"""
TODO: add docs.
"""
thing_headers = ThingHeaders()

headers = payload.get("headers", None)
if headers:
thing_headers.reply_to = headers.get("reply-to", "")
thing_headers.correlation_id = headers.get("correlation-id", "")
thing_headers.ditto_originator = headers.get("ditto-originator", "")
thing_headers.response_required = headers.get("response-required", "")
thing_headers.content_type = headers.get("content-type", "")

msg_thing = Thing()
msg_thing.topic = payload.get("topic", "")
msg_thing.headers = thing_headers
msg_thing.path = payload.get("path", "")
msg_thing.value = json.dumps(payload.get("value", ""))
msg_thing.channel = channel
msg_thing.action = action
msg_thing.meta = meta

self.pub_thing.publish(msg_thing)

def main():
rclpy.init()
Expand Down
3 changes: 3 additions & 0 deletions config/agent.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
agent_to_commands_topic: "agent_to_command"
commands_to_agent_topic: "command_to_agent"

thing_messages_topic: "thing_messages"

muto_agent:
ros__parameters:
type: simulator
Expand All @@ -41,6 +43,7 @@ mqtt_gateway:
keep_alive: 60
user: null
password: null
prefix: muto
namespace: org.eclipse.muto.sandbox
name: f1tenth-01

Expand Down
1 change: 1 addition & 0 deletions package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

<depend>rclpy</depend>
<depend>ros2launch</depend>
<depend>muto_msgs</depend>
<depend>python-paho-mqtt-pip</depend>

<export>
Expand Down

0 comments on commit c0cc080

Please sign in to comment.