From ccafe855f35f3c2f9df0a12ed75bf3bf3bc852c9 Mon Sep 17 00:00:00 2001 From: Geoffrey Biggs Date: Thu, 2 Jun 2022 14:13:27 +0900 Subject: [PATCH] Add play-for functionality (#960) * Add play-for functionality Signed-off-by: Geoffrey Biggs * Add play-for to the CLI Signed-off-by: Geoffrey Biggs * Solves most of the comments from https://github.com/ros2/rosbag2/pull/960/files (#14) * Add play-for functionality Signed-off-by: Geoffrey Biggs * Add play-for to the CLI Signed-off-by: Geoffrey Biggs * Adds playback_duration to PlayOptions. * Changes from PlayFor to Play srv message and changes start_offset and playback_duration. * Restores play_for tests. * Removes extra SubscriptionManager methods. * Solves comment about extra sent message. * Reorders code and comment. * Removes the SKIP_TEST flag. Co-authored-by: Geoffrey Biggs Signed-off-by: Geoffrey Biggs * Second round of comments from #960 upstream (#15) * Removes duration parameter. A leftover after switching to playback_duration. * Fixes comment. * Solves format in rosbag2_py -> _transport.cpp * Applies style suggestions. * Changes play() to return a boolean indicating whether the request could be fulfilled. * Removes extra unnecessary code. Signed-off-by: Geoffrey Biggs * Updates test execution time for https://github.com/ros2/rosbag2/pull/960 (#16) * Adresses reviewer's comments. * Improve test time by adding an optional argument to SubscriptionManager::spin_subscriptions() - Reduces test_play_for execution time from 50s to 6s approximately. Signed-off-by: Geoffrey Biggs * Redesign tests in test_play_for.cpp (#17) * Redesigned tests to be more deterministic and running faster * Fixed bug in `play_for()` flow when replaying in loop or multiple times from the same player instance. Signed-off-by: Michael Orlov Signed-off-by: Geoffrey Biggs * Remove unnecessary source file from test binary Signed-off-by: Geoffrey Biggs * Correct errors introduced by rebase Signed-off-by: Geoffrey Biggs * Correct play_next behaviour Signed-off-by: Geoffrey Biggs Co-authored-by: Agustin Alba Chicar Co-authored-by: Michael Orlov --- ros2bag/ros2bag/verb/play.py | 9 + rosbag2_interfaces/CMakeLists.txt | 1 + rosbag2_interfaces/srv/Play.srv | 7 + rosbag2_py/src/rosbag2_py/_transport.cpp | 17 +- rosbag2_transport/CMakeLists.txt | 6 + .../rosbag2_transport/play_options.hpp | 4 + .../include/rosbag2_transport/player.hpp | 8 +- .../src/rosbag2_transport/player.cpp | 47 ++- .../test/rosbag2_transport/test_play_for.cpp | 316 ++++++++++++++++++ 9 files changed, 409 insertions(+), 6 deletions(-) create mode 100644 rosbag2_interfaces/srv/Play.srv create mode 100644 rosbag2_transport/test/rosbag2_transport/test_play_for.cpp diff --git a/ros2bag/ros2bag/verb/play.py b/ros2bag/ros2bag/verb/play.py index 92e01bcbb1..4932522a44 100644 --- a/ros2bag/ros2bag/verb/play.py +++ b/ros2bag/ros2bag/verb/play.py @@ -85,6 +85,10 @@ def add_arguments(self, parser, cli_name): # noqa: D102 parser.add_argument( '-d', '--delay', type=positive_float, default=0.0, help='Sleep duration before play (each loop), in seconds. Negative durations invalid.') + parser.add_argument( + '--playback_duration', type=float, default=-1.0, + help='Playback duration, in seconds. Negative durations mark an infinite playback. ' + 'Default is -1.0.') parser.add_argument( '--disable-keyboard-controls', action='store_true', help='disables keyboard controls for playback') @@ -111,6 +115,10 @@ def add_arguments(self, parser, cli_name): # noqa: D102 'By default, if loaned message can be used, messages are published as loaned ' 'message. It can help to reduce the number of data copies, so there is a greater ' 'benefit for sending big data.') + parser.add_argument( + '-f', '--duration', type=float, default=None, + help='Play for SEC seconds. Default is None, meaning that playback will continue ' + 'until the end of the bag. Valid range > 0.0') def main(self, *, args): # noqa: D102 qos_profile_overrides = {} # Specify a valid default @@ -146,6 +154,7 @@ def main(self, *, args): # noqa: D102 play_options.topic_remapping_options = topic_remapping play_options.clock_publish_frequency = args.clock play_options.delay = args.delay + play_options.playback_duration = args.playback_duration play_options.disable_keyboard_controls = args.disable_keyboard_controls play_options.start_paused = args.start_paused play_options.start_offset = args.start_offset diff --git a/rosbag2_interfaces/CMakeLists.txt b/rosbag2_interfaces/CMakeLists.txt index 9b82745b2d..cb0393bccd 100644 --- a/rosbag2_interfaces/CMakeLists.txt +++ b/rosbag2_interfaces/CMakeLists.txt @@ -17,6 +17,7 @@ rosidl_generate_interfaces(${PROJECT_NAME} "srv/GetRate.srv" "srv/IsPaused.srv" "srv/Pause.srv" + "srv/Play.srv" "srv/PlayNext.srv" "srv/Resume.srv" "srv/Seek.srv" diff --git a/rosbag2_interfaces/srv/Play.srv b/rosbag2_interfaces/srv/Play.srv new file mode 100644 index 0000000000..54edf7dd45 --- /dev/null +++ b/rosbag2_interfaces/srv/Play.srv @@ -0,0 +1,7 @@ +# See rosbag2_transport::PlayOptions::start_offset +builtin_interfaces/Time start_offset +# See rosbag2_transport::PlayOptions::playback_duration +builtin_interfaces/Duration playback_duration +--- +# returns false when another playback execution is running, otherwise true +bool success diff --git a/rosbag2_py/src/rosbag2_py/_transport.cpp b/rosbag2_py/src/rosbag2_py/_transport.cpp index bbfa21b73a..5de5419bab 100644 --- a/rosbag2_py/src/rosbag2_py/_transport.cpp +++ b/rosbag2_py/src/rosbag2_py/_transport.cpp @@ -69,6 +69,17 @@ struct OptionsWrapper : public T static_cast(RCUTILS_S_TO_NS(delay))); } + double getPlaybackDuration() const + { + return RCUTILS_NS_TO_S(static_cast(this->playback_duration.nanoseconds())); + } + + void setPlaybackDuration(double playback_duration) + { + this->playback_duration = rclcpp::Duration::from_nanoseconds( + static_cast(RCUTILS_S_TO_NS(playback_duration))); + } + double getDelay() const { return RCUTILS_NS_TO_S(static_cast(this->delay.nanoseconds())); @@ -241,6 +252,10 @@ PYBIND11_MODULE(_transport, m) { "delay", &PlayOptions::getDelay, &PlayOptions::setDelay) + .def_property( + "playback_duration", + &PlayOptions::getPlaybackDuration, + &PlayOptions::setPlaybackDuration) .def_readwrite("disable_keyboard_controls", &PlayOptions::disable_keyboard_controls) .def_readwrite("start_paused", &PlayOptions::start_paused) .def_property( @@ -278,7 +293,7 @@ PYBIND11_MODULE(_transport, m) { py::class_(m, "Player") .def(py::init()) - .def("play", &rosbag2_py::Player::play) + .def("play", &rosbag2_py::Player::play, py::arg("storage_options"), py::arg("play_options")) ; py::class_(m, "Recorder") diff --git a/rosbag2_transport/CMakeLists.txt b/rosbag2_transport/CMakeLists.txt index e7d341fb66..20020c84a2 100644 --- a/rosbag2_transport/CMakeLists.txt +++ b/rosbag2_transport/CMakeLists.txt @@ -110,6 +110,12 @@ function(create_tests_for_rmw_implementation) AMENT_DEPS test_msgs rosbag2_test_common ${SKIP_TEST}) + rosbag2_transport_add_gmock(test_play_for + test/rosbag2_transport/test_play_for.cpp + INCLUDE_DIRS $ + LINK_LIBS rosbag2_transport + AMENT_DEPS test_msgs rosbag2_test_common) + rosbag2_transport_add_gmock(test_play_loop test/rosbag2_transport/test_play_loop.cpp LINK_LIBS rosbag2_transport diff --git a/rosbag2_transport/include/rosbag2_transport/play_options.hpp b/rosbag2_transport/include/rosbag2_transport/play_options.hpp index 2e8c012c5b..92225373cc 100644 --- a/rosbag2_transport/include/rosbag2_transport/play_options.hpp +++ b/rosbag2_transport/include/rosbag2_transport/play_options.hpp @@ -50,6 +50,10 @@ struct PlayOptions // Sleep before play. Negative durations invalid. Will delay at the beginning of each loop. rclcpp::Duration delay = rclcpp::Duration(0, 0); + // Determines the maximum duration of the playback. Negative durations will make the playback to + // not stop. Default configuration makes the player to not stop execution. + rclcpp::Duration playback_duration = rclcpp::Duration(-1, 0); + // Start paused. bool start_paused = false; diff --git a/rosbag2_transport/include/rosbag2_transport/player.hpp b/rosbag2_transport/include/rosbag2_transport/player.hpp index 02b0741f8d..9088f14fba 100644 --- a/rosbag2_transport/include/rosbag2_transport/player.hpp +++ b/rosbag2_transport/include/rosbag2_transport/player.hpp @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -37,6 +38,7 @@ #include "rosbag2_interfaces/srv/get_rate.hpp" #include "rosbag2_interfaces/srv/is_paused.hpp" #include "rosbag2_interfaces/srv/pause.hpp" +#include "rosbag2_interfaces/srv/play.hpp" #include "rosbag2_interfaces/srv/play_next.hpp" #include "rosbag2_interfaces/srv/burst.hpp" #include "rosbag2_interfaces/srv/resume.hpp" @@ -94,7 +96,7 @@ class Player : public rclcpp::Node virtual ~Player(); ROSBAG2_TRANSPORT_PUBLIC - void play(); + bool play(); // Playback control interface /// Pause the flow of time for playback. @@ -218,14 +220,17 @@ class Player : public rclcpp::Node rosbag2_storage::StorageOptions storage_options_; rosbag2_transport::PlayOptions play_options_; + rcutils_time_point_value_t play_until_time_ = -1; moodycamel::ReaderWriterQueue message_queue_; mutable std::future storage_loading_future_; + std::atomic_bool load_storage_content_{true}; std::unordered_map topic_qos_profile_overrides_; std::unique_ptr clock_; std::shared_ptr clock_publish_timer_; std::mutex skip_message_in_main_play_loop_mutex_; bool skip_message_in_main_play_loop_ RCPPUTILS_TSA_GUARDED_BY( skip_message_in_main_play_loop_mutex_) = false; + std::atomic_bool is_in_playback_{false}; rcutils_time_point_value_t starting_time_; @@ -236,6 +241,7 @@ class Player : public rclcpp::Node rclcpp::Service::SharedPtr srv_is_paused_; rclcpp::Service::SharedPtr srv_get_rate_; rclcpp::Service::SharedPtr srv_set_rate_; + rclcpp::Service::SharedPtr srv_play_; rclcpp::Service::SharedPtr srv_play_next_; rclcpp::Service::SharedPtr srv_burst_; rclcpp::Service::SharedPtr srv_seek_; diff --git a/rosbag2_transport/src/rosbag2_transport/player.cpp b/rosbag2_transport/src/rosbag2_transport/player.cpp index dfae9cae6f..e54b951e6a 100644 --- a/rosbag2_transport/src/rosbag2_transport/player.cpp +++ b/rosbag2_transport/src/rosbag2_transport/player.cpp @@ -159,6 +159,10 @@ Player::Player( set_rate(play_options_.rate); topic_qos_profile_overrides_ = play_options_.topic_qos_profile_overrides; prepare_publishers(); + + if (play_options_.playback_duration >= rclcpp::Duration(0, 0)) { + play_until_time_ = starting_time_ + play_options_.playback_duration.nanoseconds(); + } } create_control_services(); add_keyboard_callbacks(); @@ -191,8 +195,13 @@ bool Player::is_storage_completely_loaded() const return !storage_loading_future_.valid(); } -void Player::play() +bool Player::play() { + if (is_in_playback_.exchange(true)) { + RCLCPP_WARN_STREAM(get_logger(), "Trying to play() while in playback, dismissing request."); + return false; + } + rclcpp::Duration delay(0, 0); if (play_options_.delay >= rclcpp::Duration(0, 0)) { delay = play_options_.delay; @@ -202,21 +211,28 @@ void Player::play() "Invalid delay value: " << play_options_.delay.nanoseconds() << ". Delay is disabled."); } + RCLCPP_INFO_STREAM(get_logger(), "Playback duration value: " << play_until_time_); + try { do { if (delay > rclcpp::Duration(0, 0)) { RCLCPP_INFO_STREAM(get_logger(), "Sleep " << delay.nanoseconds() << " ns"); - std::chrono::nanoseconds duration(delay.nanoseconds()); - std::this_thread::sleep_for(duration); + std::chrono::nanoseconds delay_duration(delay.nanoseconds()); + std::this_thread::sleep_for(delay_duration); } { std::lock_guard lk(reader_mutex_); reader_->seek(starting_time_); clock_->jump(starting_time_); } + load_storage_content_ = true; storage_loading_future_ = std::async(std::launch::async, [this]() {load_storage_content();}); wait_for_filled_queue(); play_messages_from_queue(); + + load_storage_content_ = false; + if (storage_loading_future_.valid()) {storage_loading_future_.get();} + while (message_queue_.pop()) {} // cleanup queue { std::lock_guard lk(ready_to_play_from_queue_mutex_); is_ready_to_play_from_queue_ = false; @@ -225,6 +241,9 @@ void Player::play() } while (rclcpp::ok() && play_options_.loop); } catch (std::runtime_error & e) { RCLCPP_ERROR(get_logger(), "Failed to play: %s", e.what()); + load_storage_content_ = false; + if (storage_loading_future_.valid()) {storage_loading_future_.get();} + while (message_queue_.pop()) {} // cleanup queue } std::lock_guard lk(ready_to_play_from_queue_mutex_); is_ready_to_play_from_queue_ = false; @@ -254,6 +273,9 @@ void Player::play() } } } + + is_in_playback_ = false; + return true; } void Player::pause() @@ -344,6 +366,9 @@ bool Player::play_next() bool next_message_published = false; while (message_ptr != nullptr && !next_message_published) { + if (play_until_time_ >= starting_time_ && message_ptr->time_stamp > play_until_time_) { + break; + } { next_message_published = publish_message(message_ptr); clock_->jump(message_ptr->time_stamp); @@ -395,6 +420,7 @@ void Player::seek(rcutils_time_point_value_t time_point) // Restart queuing thread if it has finished running (previously reached end of bag), // otherwise, queueing should continue automatically after releasing mutex if (is_storage_completely_loaded() && rclcpp::ok()) { + load_storage_content_ = true; storage_loading_future_ = std::async(std::launch::async, [this]() {load_storage_content();}); } @@ -417,7 +443,7 @@ void Player::load_storage_content() static_cast(play_options_.read_ahead_queue_size * read_ahead_lower_bound_percentage_); auto queue_upper_boundary = play_options_.read_ahead_queue_size; - while (rclcpp::ok()) { + while (rclcpp::ok() && load_storage_content_) { TSAUniqueLock lk(reader_mutex_); if (!reader_->has_next()) {break;} @@ -456,6 +482,9 @@ void Player::play_messages_from_queue() ready_to_play_from_queue_cv_.notify_all(); } while (message_ptr != nullptr && rclcpp::ok()) { + if (play_until_time_ >= starting_time_ && message_ptr->time_stamp > play_until_time_) { + break; + } // Do not move on until sleep_until returns true // It will always sleep, so this is not a tight busy loop on pause while (rclcpp::ok() && !clock_->sleep_until(message_ptr->time_stamp)) { @@ -678,6 +707,16 @@ void Player::create_control_services() { response->success = set_rate(request->rate); }); + srv_play_ = create_service( + "~/play", + [this]( + rosbag2_interfaces::srv::Play::Request::ConstSharedPtr request, + rosbag2_interfaces::srv::Play::Response::SharedPtr response) + { + play_options_.start_offset = rclcpp::Time(request->start_offset).nanoseconds(); + play_options_.playback_duration = rclcpp::Duration(request->playback_duration); + response->success = play(); + }); srv_play_next_ = create_service( "~/play_next", [this]( diff --git a/rosbag2_transport/test/rosbag2_transport/test_play_for.cpp b/rosbag2_transport/test/rosbag2_transport/test_play_for.cpp new file mode 100644 index 0000000000..0ec975369f --- /dev/null +++ b/rosbag2_transport/test/rosbag2_transport/test_play_for.cpp @@ -0,0 +1,316 @@ +// Copyright 2022, Open Source Robotics Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.Bosch Software Innovations GmbH. + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include "rclcpp/rclcpp.hpp" + +#include "rosbag2_test_common/subscription_manager.hpp" + +#include "rosbag2_transport/player.hpp" +#include "rosbag2_transport/qos.hpp" + +#include "test_msgs/msg/arrays.hpp" +#include "test_msgs/msg/basic_types.hpp" +#include "test_msgs/message_fixtures.hpp" + +#include "rosbag2_play_test_fixture.hpp" +#include "rosbag2_transport_test_fixture.hpp" + +#include "mock_player.hpp" + +using namespace ::testing; // NOLINT +using namespace rosbag2_transport; // NOLINT +using namespace std::chrono_literals; // NOLINT +using namespace rosbag2_test_common; // NOLINT + +constexpr int kIntValue{32}; + +constexpr float kFloat1Value{40.}; +constexpr float kFloat2Value{2.}; +constexpr float kFloat3Value{0.}; + +constexpr bool kBool1Value{false}; +constexpr bool kBool2Value{true}; +constexpr bool kBool3Value{false}; + +#define EVAL_REPLAYED_PRIMITIVES(replayed_primitives) \ + EXPECT_THAT( \ + replayed_primitives, \ + Each(Pointee(Field(&test_msgs::msg::BasicTypes::int32_value, kIntValue)))) + +#define EVAL_REPLAYED_FLOAT_ARRAY_PRIMITIVES(replayed_float_array_primitive) \ + EXPECT_THAT( \ + replayed_float_array_primitive, \ + Each( \ + Pointee( \ + Field( \ + &test_msgs::msg::Arrays::float32_values, \ + ElementsAre(kFloat1Value, kFloat2Value, kFloat3Value))))) + +#define EVAL_REPLAYED_BOOL_ARRAY_PRIMITIVES(replayed_bool_array_primitive) \ + EXPECT_THAT( \ + replayed_bool_array_primitive, \ + Each( \ + Pointee( \ + Field( \ + &test_msgs::msg::Arrays::bool_values, \ + ElementsAre(kBool1Value, kBool2Value, kBool3Value))))) + +class RosBag2PlayForTestFixture : public RosBag2PlayTestFixture +{ +public: + static constexpr const char * kTopic1Name_{"topic1"}; + static constexpr const char * kTopic2Name_{"topic2"}; + static constexpr const char * kTopic1_{"/topic1"}; + static constexpr const char * kTopic2_{"/topic2"}; + + std::vector get_topic_types() + { + return {{kTopic1Name_, "test_msgs/BasicTypes", "", ""}, + {kTopic2Name_, "test_msgs/Arrays", "", ""}}; + } + + std::vector> + get_serialized_messages() + { + auto primitive_message1 = get_messages_basic_types()[0]; + primitive_message1->int32_value = kIntValue; + + auto complex_message1 = get_messages_arrays()[0]; + complex_message1->float32_values = {{kFloat1Value, kFloat2Value, kFloat3Value}}; + complex_message1->bool_values = {{kBool1Value, kBool2Value, kBool3Value}}; + + // @{ Ordering matters. The mock reader implementation moves messages + // around without any knowledge about message chronology. It just picks + // the next one Make sure to keep the list in order or sort it! + std::vector> messages = + {serialize_test_message(kTopic1Name_, 100, primitive_message1), + serialize_test_message(kTopic2Name_, 120, complex_message1), + serialize_test_message(kTopic1Name_, 200, primitive_message1), + serialize_test_message(kTopic2Name_, 220, complex_message1), + serialize_test_message(kTopic1Name_, 300, primitive_message1), + serialize_test_message(kTopic2Name_, 320, complex_message1)}; + // @} + return messages; + } + + void InitPlayerWithPlaybackDurationAndPlay( + int64_t milliseconds, + size_t expected_number_of_messages_on_topic1 = 3, + size_t expected_number_of_messages_on_topic2 = 3) + { + auto topic_types = get_topic_types(); + auto messages = get_serialized_messages(); + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topic_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + + sub_->add_subscription( + kTopic1_, expected_number_of_messages_on_topic1); + sub_->add_subscription(kTopic2_, expected_number_of_messages_on_topic2); + + play_options_.playback_duration = rclcpp::Duration(std::chrono::milliseconds(milliseconds)); + player_ = std::make_shared(std::move(reader), storage_options_, play_options_); + + // Wait for discovery to match publishers with subscribers + ASSERT_TRUE(sub_->spin_and_wait_for_matched(player_->get_list_of_publishers(), 5s)); + + auto await_received_messages = sub_->spin_subscriptions(); + ASSERT_TRUE(player_->play()); + await_received_messages.get(); + } + + std::shared_ptr player_; +}; + + +TEST_F(RosBag2PlayForTestFixture, play_for_all_are_played_due_to_duration) +{ + InitPlayerWithPlaybackDurationAndPlay(350); + + auto replayed_test_primitives = sub_->get_received_messages( + kTopic1_); + EXPECT_THAT(replayed_test_primitives, SizeIs(Eq(3u))); + EVAL_REPLAYED_PRIMITIVES(replayed_test_primitives); + + auto replayed_test_arrays = sub_->get_received_messages( + kTopic2_); + EXPECT_THAT(replayed_test_arrays, SizeIs(Eq(3u))); + EVAL_REPLAYED_BOOL_ARRAY_PRIMITIVES(replayed_test_arrays); + EVAL_REPLAYED_FLOAT_ARRAY_PRIMITIVES(replayed_test_arrays); +} + +TEST_F(RosBag2PlayForTestFixture, play_for_none_are_played_due_to_duration) +{ + auto primitive_message1 = get_messages_basic_types()[0]; + auto primitive_message2 = get_messages_basic_types()[0]; + primitive_message1->int32_value = 1; + primitive_message2->int32_value = 2; + + auto topic_types = std::vector{ + {kTopic1Name_, "test_msgs/BasicTypes", "", ""}}; + + std::vector> messages = + { + serialize_test_message(kTopic1Name_, 50, primitive_message1), + serialize_test_message(kTopic1Name_, 100, primitive_message2), + }; + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topic_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + + // Expect to receive messages only from play_next in second round + sub_->add_subscription(kTopic1_, messages.size()); + play_options_.playback_duration = rclcpp::Duration(std::chrono::milliseconds(49)); + + std::shared_ptr player_ = std::make_shared( + std::move(reader), storage_options_, play_options_); + + // Wait for discovery to match publishers with subscribers + ASSERT_TRUE(sub_->spin_and_wait_for_matched(player_->get_list_of_publishers(), 5s)); + + auto await_received_messages = sub_->spin_subscriptions(); + ASSERT_TRUE(player_->play()); + + // Playing one more time with play_next to save time and count messages + player_->pause(); + auto player_future = std::async(std::launch::async, [&player_]() -> void {player_->play();}); + + EXPECT_FALSE(player_->play_next()); + player_->resume(); + player_future.get(); + await_received_messages.get(); + auto replayed_topic1 = sub_->get_received_messages(kTopic1_); + EXPECT_THAT(replayed_topic1, SizeIs(0)); +} + +TEST_F(RosBag2PlayForTestFixture, play_for_less_than_the_total_duration) +{ + auto primitive_message1 = get_messages_basic_types()[0]; + auto primitive_message2 = get_messages_basic_types()[0]; + primitive_message1->int32_value = 1; + primitive_message2->int32_value = 2; + + auto topic_types = std::vector{ + {kTopic1Name_, "test_msgs/BasicTypes", "", ""}}; + + std::vector> messages = + { + serialize_test_message(kTopic1Name_, 10, primitive_message1), + serialize_test_message(kTopic1Name_, 50, primitive_message2), + }; + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topic_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + + // Expect to receive 1 message from play() and 2 messages from play_next in second round + sub_->add_subscription(kTopic1_, messages.size() + 1); + play_options_.playback_duration = rclcpp::Duration(std::chrono::milliseconds(49)); + + std::shared_ptr player_ = std::make_shared( + std::move(reader), storage_options_, play_options_); + + // Wait for discovery to match publishers with subscribers + ASSERT_TRUE(sub_->spin_and_wait_for_matched(player_->get_list_of_publishers(), 5s)); + + auto await_received_messages = sub_->spin_subscriptions(); + ASSERT_TRUE(player_->play()); + + // Playing one more time with play_next to save time and count messages + player_->pause(); + auto player_future = std::async(std::launch::async, [&player_]() -> void {player_->play();}); + + ASSERT_TRUE(player_->play_next()); + ASSERT_FALSE(player_->play_next()); + player_->resume(); + player_future.get(); + await_received_messages.get(); + auto replayed_topic1 = sub_->get_received_messages(kTopic1_); + EXPECT_THAT(replayed_topic1, SizeIs(2)); + EXPECT_EQ(replayed_topic1[0]->int32_value, 1); + EXPECT_EQ(replayed_topic1[1]->int32_value, 1); +} + +TEST_F( + RosBag2PlayForTestFixture, + play_for_intermediate_duration_recorded_messages_with_filtered_topics) +{ + // Filter allows /topic2, blocks /topic1 + play_options_.topics_to_filter = {"topic2"}; + InitPlayerWithPlaybackDurationAndPlay(270, 0, 2); + + auto replayed_test_primitives = + sub_->get_received_messages("/topic1"); + // No messages are allowed to have arrived + EXPECT_THAT(replayed_test_primitives, SizeIs(Eq(0u))); + + auto replayed_test_arrays = sub_->get_received_messages("/topic2"); + // Some messages should have arrived. + EXPECT_THAT(replayed_test_arrays, SizeIs(Eq(2u))); + EVAL_REPLAYED_BOOL_ARRAY_PRIMITIVES(replayed_test_arrays); + EVAL_REPLAYED_FLOAT_ARRAY_PRIMITIVES(replayed_test_arrays); +} + +TEST_F(RosBag2PlayForTestFixture, play_should_return_false_when_interrupted) +{ + auto topic_types = std::vector{ + {kTopic1Name_, "test_msgs/BasicTypes", "", ""}}; + + auto primitive_message = get_messages_basic_types()[0]; + primitive_message->int32_value = kIntValue; + std::vector> messages = + { + serialize_test_message(kTopic1Name_, 50, primitive_message), + serialize_test_message(kTopic1Name_, 100, primitive_message), + }; + + auto prepared_mock_reader = std::make_unique(); + prepared_mock_reader->prepare(messages, topic_types); + auto reader = std::make_unique(std::move(prepared_mock_reader)); + + // Let the player only reproduce one message. + sub_->add_subscription(kTopic1_, 1); + play_options_.playback_duration = rclcpp::Duration(std::chrono::milliseconds(75)); + + std::shared_ptr player_ = std::make_shared( + std::move(reader), storage_options_, play_options_); + + // Wait for discovery to match publishers with subscribers + ASSERT_TRUE(sub_->spin_and_wait_for_matched(player_->get_list_of_publishers(), 5s)); + + auto await_received_messages = sub_->spin_subscriptions(); + player_->pause(); + auto player_future = std::async(std::launch::async, [player_]() {return player_->play();}); + player_->wait_for_playback_to_start(); + ASSERT_TRUE(player_->is_paused()); + ASSERT_FALSE(player_->play()); + + player_->resume(); + player_future.get(); + await_received_messages.get(); + auto replayed_topic1 = sub_->get_received_messages(kTopic1_); + EXPECT_THAT(replayed_topic1, SizeIs(1)); +}