Skip to content

Commit

Permalink
Let the user inject callables after starting subscribers
Browse files Browse the repository at this point in the history
Signed-off-by: Pintaudi Giorgio <[email protected]>
  • Loading branch information
LastStarDust committed Mar 1, 2023
1 parent 954a211 commit e9a28bc
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 1 deletion.
15 changes: 14 additions & 1 deletion launch_testing_ros/launch_testing_ros/wait_for_topics.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,15 @@ def method_2():
wait_for_topics.shutdown()
"""

def __init__(self, topic_tuples, timeout=5.0):
def __init__(self, topic_tuples, timeout=5.0, callback=None, callback_arguments=None):
self.topic_tuples = topic_tuples
self.timeout = timeout
self.callback = callback
if self.callback is not None and not callable(self.callback):
raise TypeError("The passed callback is not callable")
self.callback_arguments = (
callback_arguments if callback_arguments is not None else []
)
self.__ros_context = rclpy.Context()
rclpy.init(context=self.__ros_context)
self.__ros_executor = SingleThreadedExecutor(context=self.__ros_context)
Expand All @@ -70,6 +76,13 @@ def _prepare_ros_node(self):

def wait(self):
self.__ros_node.start_subscribers(self.topic_tuples)
if self.callback:
if isinstance(self.callback_arguments, dict):
self.callback(**self.callback_arguments)
elif isinstance(self.callback_arguments, (list, set, tuple)):
self.callback(*self.callback_arguments)
else:
self.callback(self.callback_arguments)
return self.__ros_node.msg_event_object.wait(self.timeout)

def shutdown(self):
Expand Down
17 changes: 17 additions & 0 deletions launch_testing_ros/test/examples/wait_for_topic_launch_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,20 @@ def test_topics_unsuccessful(self, count):
assert wait_for_node_object.topics_received() == expected_topics
assert wait_for_node_object.topics_not_received() == {'invalid_topic'}
wait_for_node_object.shutdown()

def test_callback(self, count):
topic_list = [('chatter_' + str(i), String) for i in range(count)]
expected_topics = {'chatter_' + str(i) for i in range(count)}

# Method 1 : Using the magic methods and 'with' keyword

is_callback_called = [[False]]

def callback(arg):
arg[0] = True

with WaitForTopics(topic_list, timeout=2.0, callback=callback,
callback_arguments=is_callback_called) as wait_for_node_object_1:
assert wait_for_node_object_1.topics_received() == expected_topics
assert wait_for_node_object_1.topics_not_received() == set()
assert is_callback_called[0]

0 comments on commit e9a28bc

Please sign in to comment.