Skip to content

Commit

Permalink
Solves most of the comments from https://github.com/ros2/rosbag2/pull…
Browse files Browse the repository at this point in the history
…/960/files (#14)

* Add play-for functionality

Signed-off-by: Geoffrey Biggs <[email protected]>

* Add play-for to the CLI

Signed-off-by: Geoffrey Biggs <[email protected]>

* Adds playback_duration to PlayOptions.

* Changes from PlayFor to Play srv message and changes start_offset and playback_duration.

* Restores play_for tests.

* Removes extra SubscriptionManager methods.

* Solves comment about extra sent message.

* Reorders code and comment.

* Removes the SKIP_TEST flag.

Co-authored-by: Geoffrey Biggs <[email protected]>
Signed-off-by: Geoffrey Biggs <[email protected]>
  • Loading branch information
agalbachicar and gbiggs committed May 11, 2022
1 parent 03b4aa9 commit a9b779f
Show file tree
Hide file tree
Showing 9 changed files with 99 additions and 69 deletions.
5 changes: 5 additions & 0 deletions ros2bag/ros2bag/verb/play.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ def add_arguments(self, parser, cli_name): # noqa: D102
parser.add_argument(
'-d', '--delay', type=positive_float, default=0.0,
help='Sleep duration before play (each loop), in seconds. Negative durations invalid.')
parser.add_argument(
'--playback_duration', type=float, default=-1.0,
help='Playback duration, in seconds. Negative durations mark an infinite playback. '
'Default is -1.0.')
parser.add_argument(
'--disable-keyboard-controls', action='store_true',
help='disables keyboard controls for playback')
Expand Down Expand Up @@ -150,6 +154,7 @@ def main(self, *, args): # noqa: D102
play_options.topic_remapping_options = topic_remapping
play_options.clock_publish_frequency = args.clock
play_options.delay = args.delay
play_options.playback_duration = args.playback_duration
play_options.disable_keyboard_controls = args.disable_keyboard_controls
play_options.start_paused = args.start_paused
play_options.start_offset = args.start_offset
Expand Down
2 changes: 1 addition & 1 deletion rosbag2_interfaces/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ rosidl_generate_interfaces(${PROJECT_NAME}
"srv/GetRate.srv"
"srv/IsPaused.srv"
"srv/Pause.srv"
"srv/PlayFor.srv"
"srv/Play.srv"
"srv/PlayNext.srv"
"srv/Resume.srv"
"srv/Seek.srv"
Expand Down
6 changes: 6 additions & 0 deletions rosbag2_interfaces/srv/Play.srv
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# See rosbag2_transport::PlayOptions::start_time
builtin_interfaces/Time start_offset
# See rosbag2_transport::PlayOptions::playback_duration
builtin_interfaces/Duration playback_duration
---
bool success # return true
18 changes: 16 additions & 2 deletions rosbag2_py/src/rosbag2_py/_transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,17 @@ struct OptionsWrapper : public T
static_cast<rcl_duration_value_t>(RCUTILS_S_TO_NS(delay)));
}

double getPlaybackDuration() const
{
return RCUTILS_NS_TO_S(static_cast<double>(this->playback_duration.nanoseconds()));
}

void setPlaybackDuration(double playback_duration)
{
this->playback_duration = rclcpp::Duration::from_nanoseconds(
static_cast<rcl_duration_value_t>(RCUTILS_S_TO_NS(playback_duration)));
}

double getDelay() const
{
return RCUTILS_NS_TO_S(static_cast<double>(this->delay.nanoseconds()));
Expand Down Expand Up @@ -242,6 +253,10 @@ PYBIND11_MODULE(_transport, m) {
"delay",
&PlayOptions::getDelay,
&PlayOptions::setDelay)
.def_property(
"playback_duration",
&PlayOptions::getPlaybackDuration,
&PlayOptions::setPlaybackDuration)
.def_readwrite("disable_keyboard_controls", &PlayOptions::disable_keyboard_controls)
.def_readwrite("start_paused", &PlayOptions::start_paused)
.def_property(
Expand Down Expand Up @@ -278,8 +293,7 @@ PYBIND11_MODULE(_transport, m) {
py::class_<rosbag2_py::Player>(m, "Player")
.def(py::init())
.def(
"play", &rosbag2_py::Player::play, py::arg("storage_options"), py::arg(
"play_options"), py::arg("duration") = std::nullopt)
"play", &rosbag2_py::Player::play, py::arg("storage_options"), py::arg("play_options"))
;

py::class_<rosbag2_py::Recorder>(m, "Recorder")
Expand Down
3 changes: 1 addition & 2 deletions rosbag2_transport/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,7 @@ function(create_tests_for_rmw_implementation)
test/rosbag2_transport/test_play_for.cpp
INCLUDE_DIRS $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/src/rosbag2_transport>
LINK_LIBS rosbag2_transport
AMENT_DEPS test_msgs rosbag2_test_common
${SKIP_TEST})
AMENT_DEPS test_msgs rosbag2_test_common)

rosbag2_transport_add_gmock(test_play_loop
test/rosbag2_transport/test_play_loop.cpp
Expand Down
4 changes: 4 additions & 0 deletions rosbag2_transport/include/rosbag2_transport/play_options.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ struct PlayOptions
// Sleep before play. Negative durations invalid. Will delay at the beginning of each loop.
rclcpp::Duration delay = rclcpp::Duration(0, 0);

// Determines the maximum duration of the playback. Negative durations will make the playback to
// not stop. Default configuration makes the player to not stop execution.
rclcpp::Duration playback_duration = rclcpp::Duration(-1, 0);

// Start paused.
bool start_paused = false;

Expand Down
5 changes: 3 additions & 2 deletions rosbag2_transport/include/rosbag2_transport/player.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
#include "rosbag2_interfaces/srv/get_rate.hpp"
#include "rosbag2_interfaces/srv/is_paused.hpp"
#include "rosbag2_interfaces/srv/pause.hpp"
#include "rosbag2_interfaces/srv/play_for.hpp"
#include "rosbag2_interfaces/srv/play.hpp"
#include "rosbag2_interfaces/srv/play_next.hpp"
#include "rosbag2_interfaces/srv/burst.hpp"
#include "rosbag2_interfaces/srv/resume.hpp"
Expand Down Expand Up @@ -200,7 +200,7 @@ class Player : public rclcpp::Node
bool is_storage_completely_loaded() const;
void enqueue_up_to_boundary(size_t boundary) RCPPUTILS_TSA_REQUIRES(reader_mutex_);
void wait_for_filled_queue() const;
void play_messages_from_queue(const std::optional<rcutils_duration_value_t> & play_until_time);
void play_messages_from_queue(const rcutils_duration_value_t & play_until_time);
void prepare_publishers();
bool publish_message(rosbag2_storage::SerializedBagMessageSharedPtr message);
static constexpr double read_ahead_lower_bound_percentage_ = 0.9;
Expand Down Expand Up @@ -238,6 +238,7 @@ class Player : public rclcpp::Node
rclcpp::Service<rosbag2_interfaces::srv::IsPaused>::SharedPtr srv_is_paused_;
rclcpp::Service<rosbag2_interfaces::srv::GetRate>::SharedPtr srv_get_rate_;
rclcpp::Service<rosbag2_interfaces::srv::SetRate>::SharedPtr srv_set_rate_;
rclcpp::Service<rosbag2_interfaces::srv::Play>::SharedPtr srv_play_;
rclcpp::Service<rosbag2_interfaces::srv::PlayNext>::SharedPtr srv_play_next_;
rclcpp::Service<rosbag2_interfaces::srv::PlayFor>::SharedPtr srv_play_for_;
rclcpp::Service<rosbag2_interfaces::srv::Burst>::SharedPtr srv_burst_;
Expand Down
42 changes: 32 additions & 10 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,25 +202,33 @@ void Player::play(const std::optional<rcutils_duration_value_t> & duration)
"Invalid delay value: " << play_options_.delay.nanoseconds() << ". Delay is disabled.");
}

rcutils_time_point_value_t play_until_time = starting_time_;
if (play_options_.playback_duration >= rclcpp::Duration(0, 0)) {
play_until_time += play_options_.playback_duration.nanoseconds();
} else {
play_until_time = -1;
RCLCPP_INFO_STREAM(
get_logger(),
"Invalid playback duration value: " << play_options_.playback_duration.nanoseconds() <<
". Playback duration is disabled.");
}
RCLCPP_INFO_STREAM(get_logger(), "Playback duration value: " << play_until_time);

try {
do {
if (delay > rclcpp::Duration(0, 0)) {
RCLCPP_INFO_STREAM(get_logger(), "Sleep " << delay.nanoseconds() << " ns");
std::chrono::nanoseconds delay_duration(delay.nanoseconds());
std::this_thread::sleep_for(delay_duration);
}
std::optional<rcutils_time_point_value_t> play_until_time{};
if (duration.has_value() && *duration > 0) {
play_until_time = {starting_time_ + *duration};
}
{
std::lock_guard<std::mutex> lk(reader_mutex_);
reader_->seek(starting_time_);
clock_->jump(starting_time_);
}
storage_loading_future_ = std::async(std::launch::async, [this]() {load_storage_content();});
wait_for_filled_queue();
play_messages_from_queue({play_until_time});
play_messages_from_queue(play_until_time);
{
std::lock_guard<std::mutex> lk(ready_to_play_from_queue_mutex_);
is_ready_to_play_from_queue_ = false;
Expand Down Expand Up @@ -446,8 +454,7 @@ void Player::enqueue_up_to_boundary(size_t boundary)
}
}

void Player::play_messages_from_queue(
const std::optional<rcutils_duration_value_t> & play_until_time)
void Player::play_messages_from_queue(const rcutils_duration_value_t & play_until_time)
{
// Note: We need to use message_queue_.peek() instead of message_queue_.try_dequeue(message)
// to support play_next() API logic.
Expand All @@ -461,11 +468,12 @@ void Player::play_messages_from_queue(
ready_to_play_from_queue_cv_.notify_all();
}
while (message_ptr != nullptr && rclcpp::ok()) {
// Do not move on until sleep_until returns true
// It will always sleep, so this is not a tight busy loop on pause
if (play_until_time.has_value() && message_ptr->time_stamp > *play_until_time) {
rosbag2_storage::SerializedBagMessageSharedPtr message = *message_ptr;
if (play_until_time >= starting_time_ && message->time_stamp > play_until_time) {
break;
}
// Do not move on until sleep_until returns true
// It will always sleep, so this is not a tight busy loop on pause
while (rclcpp::ok() && !clock_->sleep_until(message_ptr->time_stamp)) {
if (std::atomic_exchange(&cancel_wait_for_next_message_, false)) {
break;
Expand Down Expand Up @@ -686,6 +694,20 @@ void Player::create_control_services()
{
response->success = set_rate(request->rate);
});
srv_play_ = create_service<rosbag2_interfaces::srv::Play>(
"~/play",
[this](
rosbag2_interfaces::srv::Play::Request::ConstSharedPtr request,
rosbag2_interfaces::srv::Play::Response::SharedPtr response)
{
play_options_.start_offset =
static_cast<rcutils_time_point_value_t>(RCUTILS_S_TO_NS(request->start_offset.sec)) +
static_cast<rcutils_time_point_value_t>(request->start_offset.nanosec);
play_options_.playback_duration = rclcpp::Duration(
request->playback_duration.sec, request->playback_duration.nanosec);
play();
response->success = true;
});
srv_play_next_ = create_service<rosbag2_interfaces::srv::PlayNext>(
"~/play_next",
[this](
Expand Down
83 changes: 31 additions & 52 deletions rosbag2_transport/test/rosbag2_transport/test_play_for.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@
#include "rosbag2_play_test_fixture.hpp"
#include "rosbag2_transport_test_fixture.hpp"

#include "mock_player.hpp"

using namespace ::testing; // NOLINT
using namespace rosbag2_transport; // NOLINT
using namespace std::chrono_literals; // NOLINT
using namespace rosbag2_test_common; // NOLINT


constexpr int kIntValue{32};

constexpr float kFloat1Value{40.};
Expand Down Expand Up @@ -113,7 +114,7 @@ class RosBag2PlayForTestFixture : public RosBag2PlayTestFixture
return messages;
}

void SetUp() override
void InitPlayerWithPlaybackDurationAndPlay(int64_t milliseconds)
{
auto topic_types = get_topic_types();
auto messages = get_serialized_messages();
Expand All @@ -122,50 +123,48 @@ class RosBag2PlayForTestFixture : public RosBag2PlayTestFixture
prepared_mock_reader->prepare(messages, topic_types);
auto reader = std::make_unique<rosbag2_cpp::Reader>(std::move(prepared_mock_reader));

// Due to a problem related to the subscriber, we play many (3) messages but make the subscriber
// node spin only until 2 have arrived. Hence the 2 as `launch_subscriber()` argument.
sub_->add_subscription<test_msgs::msg::BasicTypes>(kTopic1, 2);
sub_->add_subscription<test_msgs::msg::Arrays>(kTopic2, 2);
sub_->add_subscription<test_msgs::msg::BasicTypes>(kTopic1, 3);
sub_->add_subscription<test_msgs::msg::Arrays>(kTopic2, 3);

player_ = std::make_shared<rosbag2_transport::Player>(
play_options_.playback_duration =
rclcpp::Duration(std::chrono::nanoseconds(std::chrono::milliseconds(milliseconds)));
player_ = std::make_shared<MockPlayer>(
std::move(
reader), storage_options_, play_options_);
// Wait for discovery to match publishers with subscribers
ASSERT_TRUE(
sub_->spin_and_wait_for_matched(player_->get_list_of_publishers(), std::chrono::seconds(5)));

auto await_received_messages = sub_->spin_subscriptions();

player_->play();

await_received_messages.get();
}

std::shared_ptr<rosbag2_transport::Player> player_;
std::future<void> await_received_messages_;
std::shared_ptr<MockPlayer> player_;
};


TEST_F(RosBag2PlayForTestFixture, play_for_all_are_played_due_to_duration)
{
auto await_received_messages = sub_->spin_subscriptions();

player_->play(std::chrono::nanoseconds(std::chrono::milliseconds(1000)).count());

await_received_messages.get();
InitPlayerWithPlaybackDurationAndPlay(1000);

auto replayed_test_primitives = sub_->get_received_messages<test_msgs::msg::BasicTypes>(
kTopic1);
EXPECT_THAT(replayed_test_primitives, SizeIs(Ge(2u)));
EXPECT_THAT(replayed_test_primitives, SizeIs(Eq(3u)));
EVAL_REPLAYED_PRIMITIVES(replayed_test_primitives);

auto replayed_test_arrays = sub_->get_received_messages<test_msgs::msg::Arrays>(
kTopic2);
EXPECT_THAT(replayed_test_arrays, SizeIs(Ge(2u)));
EXPECT_THAT(replayed_test_arrays, SizeIs(Eq(3u)));
EVAL_REPLAYED_BOOL_ARRAY_PRIMITIVES(replayed_test_arrays);
EVAL_REPLAYED_FLOAT_ARRAY_PRIMITIVES(replayed_test_arrays);
}

TEST_F(RosBag2PlayForTestFixture, play_for_none_are_played_due_to_duration)
{
const rcutils_duration_value_t duration =
std::chrono::nanoseconds(std::chrono::milliseconds(300)).count();

auto await_received_messages = sub_->spin_subscriptions_for(duration);

player_->play(duration);

await_received_messages.get();
InitPlayerWithPlaybackDurationAndPlay(300);

auto replayed_test_primitives = sub_->get_received_messages<test_msgs::msg::BasicTypes>(
kTopic1);
Expand All @@ -178,14 +177,7 @@ TEST_F(RosBag2PlayForTestFixture, play_for_none_are_played_due_to_duration)

TEST_F(RosBag2PlayForTestFixture, play_for_less_than_the_total_duration)
{
const rcutils_duration_value_t duration =
std::chrono::nanoseconds(std::chrono::milliseconds(800)).count();

auto await_received_messages = sub_->spin_subscriptions_for(duration);

player_->play(duration);

await_received_messages.get();
InitPlayerWithPlaybackDurationAndPlay(800);

auto replayed_test_primitives = sub_->get_received_messages<test_msgs::msg::BasicTypes>(
kTopic1);
Expand All @@ -206,24 +198,19 @@ class RosBag2PlayForFilteredTopicTestFixture : public RosBag2PlayForTestFixture
{
// Filter allows /topic2, blocks /topic1
play_options_.topics_to_filter = {"topic2"};
RosBag2PlayForTestFixture::SetUp();
}
};

TEST_F(
RosBag2PlayForFilteredTopicTestFixture,
play_for_full_duration_recorded_messages_with_filtered_topics)
{
auto await_received_messages = sub_->spin_subscriptions();

player_->play(std::chrono::nanoseconds(std::chrono::milliseconds(1000)).count());

await_received_messages.get();
InitPlayerWithPlaybackDurationAndPlay(1000);

auto replayed_test_primitives =
sub_->get_received_messages<test_msgs::msg::BasicTypes>("/topic1");
// No messages are allowed to have arrived
EXPECT_THAT(replayed_test_primitives, SizeIs(0u));
EXPECT_THAT(replayed_test_primitives, SizeIs(Eq(0u)));

auto replayed_test_arrays = sub_->get_received_messages<test_msgs::msg::Arrays>("/topic2");
// All messages should have arrived.
Expand All @@ -236,36 +223,28 @@ TEST_F(
RosBag2PlayForFilteredTopicTestFixture,
play_for_short_duration_recorded_messages_with_filtered_topics)
{
auto await_received_messages = sub_->spin_subscriptions();

player_->play(std::chrono::nanoseconds(std::chrono::milliseconds(300)).count());

await_received_messages.get();
InitPlayerWithPlaybackDurationAndPlay(300);

auto replayed_test_primitives =
sub_->get_received_messages<test_msgs::msg::BasicTypes>("/topic1");
// No messages are allowed to have arrived
EXPECT_THAT(replayed_test_primitives, SizeIs(0u));
EXPECT_THAT(replayed_test_primitives, SizeIs(Eq(0u)));

auto replayed_test_arrays = sub_->get_received_messages<test_msgs::msg::Arrays>("/topic2");
// All messages should have arrived.
EXPECT_THAT(replayed_test_arrays, SizeIs(0u));
EXPECT_THAT(replayed_test_arrays, SizeIs(Eq(0u)));
}

TEST_F(
RosBag2PlayForFilteredTopicTestFixture,
play_for_intermediate_duration_recorded_messages_with_filtered_topics)
{
auto await_received_messages = sub_->spin_subscriptions();

player_->play(std::chrono::nanoseconds(std::chrono::milliseconds(800)).count());

await_received_messages.get();
InitPlayerWithPlaybackDurationAndPlay(800);

auto replayed_test_primitives =
sub_->get_received_messages<test_msgs::msg::BasicTypes>("/topic1");
// No messages are allowed to have arrived
EXPECT_THAT(replayed_test_primitives, SizeIs(0u));
EXPECT_THAT(replayed_test_primitives, SizeIs(Eq(0u)));

auto replayed_test_arrays = sub_->get_received_messages<test_msgs::msg::Arrays>("/topic2");
// Some messages should have arrived.
Expand Down

0 comments on commit a9b779f

Please sign in to comment.