diff --git a/rosbag2_test_common/include/rosbag2_test_common/subscription_manager.hpp b/rosbag2_test_common/include/rosbag2_test_common/subscription_manager.hpp index f37fa75477..9fe9bc3a88 100644 --- a/rosbag2_test_common/include/rosbag2_test_common/subscription_manager.hpp +++ b/rosbag2_test_common/include/rosbag2_test_common/subscription_manager.hpp @@ -117,13 +117,15 @@ class SubscriptionManager return matched; } - std::future spin_subscriptions() + std::future spin_subscriptions(int maximum_time_spinning_sec = 10) { return async( - std::launch::async, [this]() { + std::launch::async, [this, maximum_time_spinning_sec]() { rclcpp::executors::SingleThreadedExecutor exec; auto start = std::chrono::high_resolution_clock::now(); - while (rclcpp::ok() && continue_spinning(expected_topics_with_size_, start)) { + while (rclcpp::ok() && + continue_spinning(expected_topics_with_size_, start, maximum_time_spinning_sec)) + { exec.spin_node_some(subscriber_node_); } }); @@ -132,10 +134,11 @@ class SubscriptionManager private: bool continue_spinning( const std::unordered_map & expected_topics_with_sizes, - std::chrono::time_point start_time) + std::chrono::time_point start_time, + int maximum_time_spinning_sec = 10) { auto current = std::chrono::high_resolution_clock::now(); - if (current - start_time > std::chrono::seconds(10)) { + if (current - start_time > std::chrono::seconds(maximum_time_spinning_sec)) { return false; } diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index 6e21907fe4..c11ec61604 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -193,11 +193,10 @@ bool Player::is_storage_completely_loaded() const bool Player::play() { - if (is_in_playback_) { + if (is_in_playback_.exchange(true)) { RCLCPP_WARN_STREAM(get_logger(), "Trying to play() while in playback, dismissing request."); return false; } - is_in_playback_ = true; rclcpp::Duration delay(0, 0); if (play_options_.delay >= rclcpp::Duration(0, 0)) { diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_for.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_for.cpp index cd6d49480e..75c151754d 100644 --- a/rosbag2_transport/test/rosbag2_transport/test_play_for.cpp +++ b/rosbag2_transport/test/rosbag2_transport/test_play_for.cpp @@ -53,6 +53,8 @@ constexpr bool kBool1Value{false}; constexpr bool kBool2Value{true}; constexpr bool kBool3Value{false}; +constexpr int kMaximumTimeSpinningSec{1}; + #define EVAL_REPLAYED_PRIMITIVES(replayed_primitives) \ EXPECT_THAT( \ replayed_primitives, \ @@ -104,12 +106,12 @@ class RosBag2PlayForTestFixture : public RosBag2PlayTestFixture // around without any knowledge about message chronology. It just picks // the next one Make sure to keep the list in order or sort it! std::vector> messages = - {serialize_test_message(kTopic1Name, 500, primitive_message1), - serialize_test_message(kTopic2Name, 550, complex_message1), - serialize_test_message(kTopic1Name, 700, primitive_message1), - serialize_test_message(kTopic2Name, 750, complex_message1), - serialize_test_message(kTopic1Name, 900, primitive_message1), - serialize_test_message(kTopic2Name, 950, complex_message1)}; + {serialize_test_message(kTopic1Name, 100, primitive_message1), + serialize_test_message(kTopic2Name, 120, complex_message1), + serialize_test_message(kTopic1Name, 200, primitive_message1), + serialize_test_message(kTopic2Name, 220, complex_message1), + serialize_test_message(kTopic1Name, 300, primitive_message1), + serialize_test_message(kTopic2Name, 320, complex_message1)}; // @} return messages; } @@ -128,11 +130,12 @@ class RosBag2PlayForTestFixture : public RosBag2PlayTestFixture play_options_.playback_duration = rclcpp::Duration(std::chrono::milliseconds(milliseconds)); player_ = std::make_shared(std::move(reader), storage_options_, play_options_); + // Wait for discovery to match publishers with subscribers ASSERT_TRUE( sub_->spin_and_wait_for_matched(player_->get_list_of_publishers(), std::chrono::seconds(5))); - auto await_received_messages = sub_->spin_subscriptions(); + auto await_received_messages = sub_->spin_subscriptions(kMaximumTimeSpinningSec); ASSERT_TRUE(player_->play()); @@ -145,7 +148,7 @@ class RosBag2PlayForTestFixture : public RosBag2PlayTestFixture TEST_F(RosBag2PlayForTestFixture, play_for_all_are_played_due_to_duration) { - InitPlayerWithPlaybackDurationAndPlay(1000); + InitPlayerWithPlaybackDurationAndPlay(350); auto replayed_test_primitives = sub_->get_received_messages( kTopic1); @@ -161,7 +164,7 @@ TEST_F(RosBag2PlayForTestFixture, play_for_all_are_played_due_to_duration) TEST_F(RosBag2PlayForTestFixture, play_for_none_are_played_due_to_duration) { - InitPlayerWithPlaybackDurationAndPlay(300); + InitPlayerWithPlaybackDurationAndPlay(50); auto replayed_test_primitives = sub_->get_received_messages( kTopic1); @@ -174,7 +177,7 @@ TEST_F(RosBag2PlayForTestFixture, play_for_none_are_played_due_to_duration) TEST_F(RosBag2PlayForTestFixture, play_for_less_than_the_total_duration) { - InitPlayerWithPlaybackDurationAndPlay(800); + InitPlayerWithPlaybackDurationAndPlay(270); auto replayed_test_primitives = sub_->get_received_messages( kTopic1); @@ -202,7 +205,7 @@ TEST_F( RosBag2PlayForFilteredTopicTestFixture, play_for_full_duration_recorded_messages_with_filtered_topics) { - InitPlayerWithPlaybackDurationAndPlay(1000); + InitPlayerWithPlaybackDurationAndPlay(400); auto replayed_test_primitives = sub_->get_received_messages("/topic1"); @@ -220,7 +223,7 @@ TEST_F( RosBag2PlayForFilteredTopicTestFixture, play_for_short_duration_recorded_messages_with_filtered_topics) { - InitPlayerWithPlaybackDurationAndPlay(300); + InitPlayerWithPlaybackDurationAndPlay(50); auto replayed_test_primitives = sub_->get_received_messages("/topic1"); @@ -236,7 +239,7 @@ TEST_F( RosBag2PlayForFilteredTopicTestFixture, play_for_intermediate_duration_recorded_messages_with_filtered_topics) { - InitPlayerWithPlaybackDurationAndPlay(800); + InitPlayerWithPlaybackDurationAndPlay(270); auto replayed_test_primitives = sub_->get_received_messages("/topic1"); @@ -271,9 +274,9 @@ class RosBag2PlayForInterruptedTestFixture : public RosBag2PlayTestFixture // around without any knowledge about message chronology. It just picks // the next one Make sure to keep the list in order or sort it! std::vector> messages = - {serialize_test_message(kTopic1Name, 500, primitive_message), - serialize_test_message(kTopic1Name, 700, primitive_message), - serialize_test_message(kTopic1Name, 900, primitive_message)}; + {serialize_test_message(kTopic1Name, 50, primitive_message), + serialize_test_message(kTopic1Name, 100, primitive_message), + serialize_test_message(kTopic1Name, 150, primitive_message)}; // @} return messages; } @@ -283,7 +286,6 @@ TEST_F( RosBag2PlayForInterruptedTestFixture, play_should_return_false_when_interrupted) { - auto topic_types = get_topic_types(); auto messages = get_serialized_messages(); @@ -291,34 +293,27 @@ TEST_F( prepared_mock_reader->prepare(messages, topic_types); auto reader = std::make_unique(std::move(prepared_mock_reader)); - sub_->add_subscription(kTopic1, 3); + // Let the player only reproduce one message. + sub_->add_subscription(kTopic1, 1); + play_options_.playback_duration = rclcpp::Duration(std::chrono::milliseconds(75)); - play_options_.playback_duration = rclcpp::Duration(std::chrono::milliseconds(1000)); std::shared_ptr player_ = std::make_shared( - std::move( - reader), storage_options_, play_options_); + std::move(reader), storage_options_, play_options_); + // Wait for discovery to match publishers with subscribers ASSERT_TRUE( sub_->spin_and_wait_for_matched(player_->get_list_of_publishers(), std::chrono::seconds(5))); - auto await_received_messages = sub_->spin_subscriptions(); - - auto play_execution_1 = std::async(std::launch::async, [player_]() {return player_->play();}); - auto play_execution_2 = std::async(std::launch::async, [player_]() {return player_->play();}); + auto await_received_messages = sub_->spin_subscriptions(kMaximumTimeSpinningSec); + player_->pause(); + auto player_future = std::async(std::launch::async, [player_]() {return player_->play();}); + player_->wait_for_playback_to_start(); + ASSERT_TRUE(player_->is_paused()); + ASSERT_FALSE(player_->play()); + player_->resume(); + player_future.get(); await_received_messages.get(); - - const auto play_execution_1_wait_result = play_execution_1.wait_for( - std::chrono::milliseconds( - 1500)); - const auto play_execution_2_wait_result = play_execution_2.wait_for( - std::chrono::milliseconds( - 1500)); - - ASSERT_EQ(std::future_status::ready, play_execution_1_wait_result); - ASSERT_EQ(std::future_status::ready, play_execution_2_wait_result); - - ASSERT_TRUE( - (play_execution_1.get() && !play_execution_2.get()) || - (!play_execution_1.get() && play_execution_2.get())); + auto replayed_topic1 = sub_->get_received_messages(kTopic1); + EXPECT_THAT(replayed_topic1, SizeIs(1)); }