Skip to content

Commit

Permalink
Use serialized message directly (ros2#24)
Browse files Browse the repository at this point in the history
* Adapt new interface

* Try to write and read rcutils_char_array_t BLOBs in sqlite

* Add simple test for arbitrary char ptr

* Refactor SqliteWrapper and add tests

* Write and read actual timestamp from serialized message and add relative tests

* Add SqliteStatementWrapper class and refactor SqliteStorage and SqliteWrapper

* Refactor test fixture

* ros2GH-50 Assert message content in write_integration_test, and remove TODOs

* ros2GH-50 Remove sqlite_storage_plugin unit tests

* ros2GH-50 Refactor SqliteStatements and SqliteStorage

* ros2GH-50 Fix build after rebase

* ros2GH-50 Make has_next() method no more const

* ros2GH-52 Extend statement wrapper with a generic bind

* ros2GH-50 Refactor after rebase

* ros2GH-59 cleanup db interface

- Remove virtual on methods as this was added only for unit tests. We
  decided to use only integration tests for the sqlite plugins.
- Changes semantics of SqliteStatement: represents always a prepared
  statement if not null.
- Ensures that a SqliteStatementWrapper cannot be copied and does not
  publicly expose its sqlite_stmt as this would cause memory corruption.

* ros2GH-59 Introduce general read interface for sqlite statements

- Uses a std::tuple for row data
- Exposes an iterator interface for the query result

* ros2GH-59 Cleanup: remove unused files

* ros2GH-59 make sqlite interface fluent

* ros2GH-59 move creation of serialized message to rosbag2_storage

This is not storage plugin specific but will be needed by most (if
not all) plugins.

* Change rcutil_char_array_t to rmw_serialized_message_t in subscriber

* Remove debugging output in test
  • Loading branch information
Martin-Idel-SI authored and Karsten1987 committed Sep 3, 2018
1 parent fb3c664 commit 3f77b33
Show file tree
Hide file tree
Showing 25 changed files with 940 additions and 201 deletions.
20 changes: 11 additions & 9 deletions rosbag2/src/rosbag2/rosbag2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ void Rosbag2::record(
std::function<void(void)> after_write_action)
{
rosbag2_storage::StorageFactory factory;
auto storage =
factory.open_read_write(file_name, "sqlite3");
auto storage = factory.open_read_write(file_name, "sqlite3");
storage->create_topic();

if (storage) {
Expand All @@ -50,6 +49,13 @@ void Rosbag2::record(
[&storage, after_write_action](std::shared_ptr<rmw_serialized_message_t> msg) {
auto bag_msg = std::make_shared<rosbag2_storage::SerializedBagMessage>();
bag_msg->serialized_data = msg;
rcutils_time_point_value_t time_stamp;
int error = rcutils_system_time_now(&time_stamp);
if (error != RCUTILS_RET_OK) {
RCUTILS_LOG_ERROR_NAMED(
"rosbag2", "Error getting current time. Error: %s", rcutils_get_error_string_safe());
}
bag_msg->time_stamp = time_stamp;
storage->write(bag_msg);
if (after_write_action) {
after_write_action();
Expand All @@ -64,22 +70,18 @@ void Rosbag2::record(
void Rosbag2::play(const std::string & file_name, const std::string & topic_name)
{
rosbag2_storage::StorageFactory factory;
auto storage =
factory.open_read_only(file_name, "sqlite3");
auto storage = factory.open_read_only(file_name, "sqlite3");

if (storage) {
auto node = std::make_shared<rclcpp::Node>("rosbag_publisher_node");
auto publisher = node->create_publisher<std_msgs::msg::String>(topic_name);
while (storage->has_next()) {
auto message = storage->read_next();

std_msgs::msg::String string_message;
// TODO(Martin-Idel-SI): We don't write a correct serialized_data in sqlite3_storage for now
// Change once available
string_message.data = std::string(message->serialized_data->buffer);
// without the sleep_for() many messages are lost.
std::this_thread::sleep_for(std::chrono::milliseconds(500));
publisher->publish(string_message);
publisher->publish(message->serialized_data.get());
RCUTILS_LOG_INFO_NAMED(ROS_PACKAGE_NAME, "published message");
}
}
}
Expand Down
38 changes: 8 additions & 30 deletions rosbag2/test/rosbag2/rosbag2_read_integration_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,34 +76,12 @@ TEST_F(RosBag2IntegrationTestFixture, recorded_messages_are_played)
{
rclcpp::init(0, nullptr);

std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>> messages;
for (int i : {1, 2, 3}) {
(void) i;
auto msg = std::make_shared<rosbag2_storage::SerializedBagMessage>();
auto payload = new rcutils_char_array_t;
*payload = rcutils_get_zero_initialized_char_array();
payload->allocator = rcutils_get_default_allocator();
// TODO(Martin-Idel-SI) The real serialized string message has 8 leading chars in CDR
std::string string_message_to_publish = "bbbbbbbbHello World";
auto ret = rcutils_char_array_resize(payload, strlen(string_message_to_publish.c_str()) + 1);
if (ret != RCUTILS_RET_OK) {
FAIL() << " Failed to resize serialized bag message";
}
memcpy(payload->buffer,
string_message_to_publish.c_str(),
strlen(string_message_to_publish.c_str()) + 1);

msg->serialized_data = std::shared_ptr<rcutils_char_array_t>(payload,
[](rcutils_char_array_t * msg) {
auto error = rcutils_char_array_fini(msg);
delete msg;
if (error != RCUTILS_RET_OK) {
FAIL() << " Failed to destroy serialized bag message";
}
});
messages.push_back(msg);
}
ASSERT_NO_THROW(write_messages(database_name_, messages));
std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>> messages =
{serialize_message("Hello World 1"),
serialize_message("Hello World 2"),
serialize_message("Hello World 2")};

write_messages(database_name_, messages);

// Due to a problem related to the subscriber, we play many (3) messages but make the subscriber
// node spin only until 2 have arrived. Hence the 2 as `launch_subscriber()` argument.
Expand All @@ -112,8 +90,8 @@ TEST_F(RosBag2IntegrationTestFixture, recorded_messages_are_played)

auto replayed_messages = subscriber_future_.get();
ASSERT_THAT(replayed_messages, SizeIs(2));
ASSERT_THAT(replayed_messages[0], Eq("Hello World"));
ASSERT_THAT(replayed_messages[1], Eq("Hello World"));
ASSERT_THAT(replayed_messages[0], Eq("Hello World 1"));
ASSERT_THAT(replayed_messages[1], Eq("Hello World 2"));

rclcpp::shutdown();
}
69 changes: 60 additions & 9 deletions rosbag2/test/rosbag2/rosbag2_test_fixture.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
#include <string>
#include <vector>

#include "rclcpp/rclcpp.hpp"
#include "std_msgs/msg/string.hpp"

#ifdef _WIN32
# include <direct.h>
# include <Windows.h>
Expand Down Expand Up @@ -107,22 +110,70 @@ class Rosbag2TestFixture : public Test
return table_msgs;
}

void
write_messages(
void write_messages(
const std::string & db_name,
std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>> messages)
{
rosbag2_storage::StorageFactory factory;
auto storage =
factory.open_read_write(db_name, "sqlite3");
if (storage == nullptr) {
throw std::runtime_error("failed to open sqlite3 storage");
auto storage = factory.open_read_write(db_name, "sqlite3");

if (storage) {
storage->create_topic();
for (auto msg : messages) {
storage->write(msg);
}
}
}

std::shared_ptr<rosbag2_storage::SerializedBagMessage> serialize_message(std::string message)
{
auto bag_msg = std::make_shared<rosbag2_storage::SerializedBagMessage>();

auto test_message = std::make_shared<std_msgs::msg::String>();
test_message->data = message;

auto rcutils_allocator = rcutils_get_default_allocator();
auto initial_capacity = 8u + static_cast<size_t>(test_message->data.size());
auto msg = new rcutils_char_array_t;
*msg = rcutils_get_zero_initialized_char_array();
auto ret = rcutils_char_array_init(msg, initial_capacity, &rcutils_allocator);
if (ret != RCUTILS_RET_OK) {
throw std::runtime_error("Error allocating resources for serialized message" +
std::to_string(ret));
}

storage->create_topic();
for (auto msg : messages) {
storage->write(msg);
bag_msg->serialized_data = std::shared_ptr<rcutils_char_array_t>(msg,
[](rcutils_char_array_t * msg) {
int error = rcutils_char_array_fini(msg);
delete msg;
if (error != RCUTILS_RET_OK) {
RCUTILS_LOG_ERROR_NAMED(
"rosbag2", "Leaking memory. Error: %s", rcutils_get_error_string_safe());
}
});

bag_msg->serialized_data->buffer_length = initial_capacity;

auto string_ts =
rosidl_typesupport_cpp::get_message_type_support_handle<std_msgs::msg::String>();

auto error = rmw_serialize(test_message.get(), string_ts, bag_msg->serialized_data.get());
if (error != RMW_RET_OK) {
throw std::runtime_error("Something went wrong preparing the serialized message");
}

return bag_msg;
}

std::string deserialize_message(
std::shared_ptr<rosbag2_storage::SerializedBagMessage> serialized_message)
{
char * copied = new char[serialized_message->serialized_data->buffer_length];
auto string_length = serialized_message->serialized_data->buffer_length - 8;
memcpy(copied, &serialized_message->serialized_data->buffer[8], string_length);
std::string message_content(copied);
delete[] copied;
return message_content;
}

std::string database_name_;
Expand Down
14 changes: 7 additions & 7 deletions rosbag2/test/rosbag2/rosbag2_write_integration_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,13 @@ class RosBag2IntegrationTestFixture : public Rosbag2TestFixture
rclcpp::shutdown();
}

void publish_messages(std::vector<std::string> messages)
void publish_messages(std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>> msgs)
{
size_t expected_counter_value = messages.size();
size_t expected_counter_value = msgs.size();
auto node = std::make_shared<rclcpp::Node>("publisher_node");
auto publisher = node->create_publisher<std_msgs::msg::String>("string_topic");
auto timer = node->create_wall_timer(50ms, [this, publisher, messages]() {
auto msg = std_msgs::msg::String();
msg.data = messages[counter_];
publisher->publish(msg);
auto timer = node->create_wall_timer(50ms, [this, publisher, msgs]() {
publisher->publish(msgs[counter_]->serialized_data.get());
});

while (counter_ < expected_counter_value) {
Expand All @@ -85,12 +83,14 @@ class RosBag2IntegrationTestFixture : public Rosbag2TestFixture
TEST_F(RosBag2IntegrationTestFixture, published_messages_are_recorded)
{
std::string message_to_publish = "test_message";
auto serialized_message = serialize_message(message_to_publish);

start_recording();
publish_messages({message_to_publish});
publish_messages({serialized_message});
stop_recording();

auto recorded_messages = get_messages(database_name_);

ASSERT_THAT(recorded_messages, Not(IsEmpty()));
EXPECT_THAT(deserialize_message(recorded_messages[0]), Eq(message_to_publish));
}
8 changes: 8 additions & 0 deletions rosbag2_storage/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ find_package(rcutils REQUIRED)
add_library(
rosbag2_storage
SHARED
src/rosbag2_storage/ros_helper.cpp
src/storage_factory.cpp)
target_include_directories(rosbag2_storage PUBLIC include)
ament_target_dependencies(
Expand Down Expand Up @@ -75,6 +76,13 @@ if(BUILD_TESTING)
target_include_directories(test_storage_factory PRIVATE include)
target_link_libraries(test_storage_factory rosbag2_storage)
endif()

ament_add_gmock(test_ros_helper
test/test_ros_helper.cpp)
if(TARGET test_ros_helper)
target_include_directories(test_ros_helper PRIVATE include)
target_link_libraries(test_ros_helper rosbag2_storage)
endif()
endif()

ament_export_include_directories(include)
Expand Down
33 changes: 33 additions & 0 deletions rosbag2_storage/include/rosbag2_storage/ros_helper.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2018 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef ROSBAG2_STORAGE__ROS_HELPER_HPP_
#define ROSBAG2_STORAGE__ROS_HELPER_HPP_

#include <memory>

#include "rcutils/types.h"

#include "rosbag2_storage/visibility_control.hpp"

namespace rosbag2_storage
{

ROSBAG2_STORAGE_PUBLIC
std::shared_ptr<rcutils_char_array_t>
make_serialized_message(const void * data, size_t size);

} // namespace rosbag2_storage

#endif // ROSBAG2_STORAGE__ROS_HELPER_HPP_
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class ROSBAG2_STORAGE_PUBLIC BaseReadInterface
public:
virtual ~BaseReadInterface() = default;

virtual bool has_next() const = 0;
virtual bool has_next() = 0;

virtual std::shared_ptr<SerializedBagMessage> read_next() = 0;
};
Expand Down
56 changes: 56 additions & 0 deletions rosbag2_storage/src/rosbag2_storage/ros_helper.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2018 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "rosbag2_storage/ros_helper.hpp"

#include <memory>
#include <string>

#include "rcutils/logging_macros.h"
#include "rcutils/types.h"

namespace rosbag2_storage
{

std::shared_ptr<rcutils_char_array_t>
make_serialized_message(const void * data, size_t size)
{
auto rcutils_allocator = new rcutils_allocator_t;
*rcutils_allocator = rcutils_get_default_allocator();
auto msg = new rcutils_char_array_t;
*msg = rcutils_get_zero_initialized_char_array();
auto ret = rcutils_char_array_init(msg, size, rcutils_allocator);
if (ret != RCUTILS_RET_OK) {
throw std::runtime_error("Error allocating resources for serialized message: " +
std::string(rcutils_get_error_string_safe()));
}

auto serialized_message = std::shared_ptr<rcutils_char_array_t>(msg,
[](rcutils_char_array_t * msg) {
int error = rcutils_char_array_fini(msg);
delete msg;
if (error != RCUTILS_RET_OK) {
RCUTILS_LOG_ERROR_NAMED(
"rosbag2_storage",
"Leaking memory. Error: %s", rcutils_get_error_string_safe());
}
});

memcpy(serialized_message->buffer, data, size);
serialized_message->buffer_length = size;

return serialized_message;
}

} // namespace rosbag2_storage
2 changes: 1 addition & 1 deletion rosbag2_storage/test/test_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ rosbag2_storage::BagInfo TestPlugin::info()
return rosbag2_storage::BagInfo();
}

bool TestPlugin::has_next() const
bool TestPlugin::has_next()
{
return true;
}
Expand Down
2 changes: 1 addition & 1 deletion rosbag2_storage/test/test_plugin.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class TestPlugin : public rosbag2_storage::storage_interfaces::ReadWriteInterfac

rosbag2_storage::BagInfo info() override;

bool has_next() const override;
bool has_next() override;

std::shared_ptr<rosbag2_storage::SerializedBagMessage> read_next() override;

Expand Down
2 changes: 1 addition & 1 deletion rosbag2_storage/test/test_read_only_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ rosbag2_storage::BagInfo TestReadOnlyPlugin::info()
return rosbag2_storage::BagInfo();
}

bool TestReadOnlyPlugin::has_next() const
bool TestReadOnlyPlugin::has_next()
{
return true;
}
Expand Down
2 changes: 1 addition & 1 deletion rosbag2_storage/test/test_read_only_plugin.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class TestReadOnlyPlugin : public rosbag2_storage::storage_interfaces::ReadOnlyI

rosbag2_storage::BagInfo info() override;

bool has_next() const override;
bool has_next() override;

std::shared_ptr<rosbag2_storage::SerializedBagMessage> read_next() override;
};
Expand Down
Loading

0 comments on commit 3f77b33

Please sign in to comment.