diff --git a/ros2topic/ros2topic/verb/pub.py b/ros2topic/ros2topic/verb/pub.py index 346c9d0c9..7a1bc680f 100644 --- a/ros2topic/ros2topic/verb/pub.py +++ b/ros2topic/ros2topic/verb/pub.py @@ -83,6 +83,11 @@ def add_arguments(self, parser, cli_name): group.add_argument( '-t', '--times', type=nonnegative_int, default=0, help='Publish this number of times and then exit') + parser.add_argument( + '-w', '--wait-matching-subscriptions', type=nonnegative_int, default=None, + help=( + 'Wait until finding the specified number of matching subscriptions. ' + 'Defaults to 1 when using "-1"/"--once"/"--times", otherwise defaults to 0.')) parser.add_argument( '--keep-alive', metavar='N', type=positive_float, default=0.1, help='Keep publishing node alive for N seconds after the last msg ' @@ -146,6 +151,8 @@ def main(args): 1. / args.rate, args.print, times, + args.wait_matching_subscriptions + if args.wait_matching_subscriptions is not None else int(times != 0), qos_profile, args.keep_alive) @@ -158,6 +165,7 @@ def publisher( period: float, print_nth: int, times: int, + wait_matching_subscriptions: int, qos_profile: QoSProfile, keep_alive: float, ) -> Optional[str]: @@ -172,6 +180,15 @@ def publisher( pub = node.create_publisher(msg_module, topic_name, qos_profile) + times_since_last_log = 0 + while pub.get_subscription_count() < wait_matching_subscriptions: + # Print a message reporting we're waiting each 1s, check condition each 100ms. + if not times_since_last_log: + print( + f'Waiting for at least {wait_matching_subscriptions} matching subscription(s)...') + times_since_last_log = (times_since_last_log + 1) % 10 + time.sleep(0.1) + msg = msg_module() try: set_message_fields(msg, values_dictionary) @@ -188,8 +205,6 @@ def timer_callback(): print('publishing #%d: %r\n' % (count, msg)) pub.publish(msg) - # give some time for discovery process - time.sleep(0.1) timer_callback() if times != 1: timer = node.create_timer(period, timer_callback) diff --git a/ros2topic/test/test_cli.py b/ros2topic/test/test_cli.py index 53b9fa35b..8b3b5874d 100644 --- a/ros2topic/test/test_cli.py +++ b/ros2topic/test/test_cli.py @@ -660,7 +660,7 @@ def test_topic_pub(self): 'publisher: beginning loop', "publishing #1: std_msgs.msg.String(data='foo')", '' - ], strict=True + ] ), timeout=10) assert self.listener_node.wait_for_output(functools.partial( launch_testing.tools.expect_output, expected_lines=[ @@ -686,7 +686,7 @@ def test_topic_pub_once(self): 'publisher: beginning loop', "publishing #1: std_msgs.msg.String(data='bar')", '' - ], strict=True + ] ), timeout=10) assert topic_command.wait_for_shutdown(timeout=10) assert self.listener_node.wait_for_output(functools.partial( @@ -696,6 +696,51 @@ def test_topic_pub_once(self): ), timeout=10) assert topic_command.exit_code == launch_testing.asserts.EXIT_OK + def test_topic_pub_once_matching_two_listeners( + self, launch_service, proc_info, proc_output, path_to_listener_node_script, additional_env + ): + second_listener_node_action = Node( + executable=sys.executable, + arguments=[path_to_listener_node_script], + remappings=[('chatter', 'chit_chatter')], + additional_env=additional_env, + name='second_listener', + ) + with launch_testing.tools.launch_process( + launch_service, second_listener_node_action, proc_info, proc_output + ) as second_listener_node, \ + self.launch_topic_command( + arguments=[ + 'pub', '--once', + '--keep-alive', '3', # seconds + '-w', '2', + '--qos-durability', 'transient_local', + '--qos-reliability', 'reliable', + '/chit_chatter', + 'std_msgs/msg/String', + '{data: bar}' + ] + ) as topic_command: + assert topic_command.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + 'publisher: beginning loop', + "publishing #1: std_msgs.msg.String(data='bar')", + '' + ] + ), timeout=10) + assert topic_command.wait_for_shutdown(timeout=10) + assert self.listener_node.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + re.compile(r'\[INFO\] \[\d+\.\d*\] \[listener\]: I heard: \[bar\]') + ] + ), timeout=10) + assert second_listener_node.wait_for_output(functools.partial( + launch_testing.tools.expect_output, expected_lines=[ + re.compile(r'\[INFO\] \[\d+\.\d*\] \[second_listener\]: I heard: \[bar\]') + ] + ), timeout=10) + assert topic_command.exit_code == launch_testing.asserts.EXIT_OK + def test_topic_pub_print_every_two(self): with self.launch_topic_command( arguments=[ @@ -716,7 +761,7 @@ def test_topic_pub_print_every_two(self): '', "publishing #4: std_msgs.msg.String(data='fizz')", '' - ], strict=True + ] ), timeout=10), 'Output does not match: ' + topic_command.output assert self.listener_node.wait_for_output(functools.partial( launch_testing.tools.expect_output, expected_lines=[ diff --git a/ros2topic/test/test_echo_pub.py b/ros2topic/test/test_echo_pub.py index 7f41b6d87..6f5a95160 100644 --- a/ros2topic/test/test_echo_pub.py +++ b/ros2topic/test/test_echo_pub.py @@ -180,7 +180,7 @@ def message_callback(msg): @launch_testing.markers.retry_on_failure(times=5) def test_pub_times(self, launch_service, proc_info, proc_output): command_action = ExecuteProcess( - cmd=(['ros2', 'topic', 'pub', '-t', '5', '/clitest/topic/pub_times', + cmd=(['ros2', 'topic', 'pub', '-t', '5', '-w', '0', '/clitest/topic/pub_times', 'std_msgs/String', 'data: hello']), additional_env={ 'PYTHONUNBUFFERED': '1'