diff --git a/rosbag2/src/rosbag2/rosbag2.cpp b/rosbag2/src/rosbag2/rosbag2.cpp index 2abc6c2e48..fc93e3cc16 100644 --- a/rosbag2/src/rosbag2/rosbag2.cpp +++ b/rosbag2/src/rosbag2/rosbag2.cpp @@ -39,8 +39,7 @@ void Rosbag2::record( std::function after_write_action) { rosbag2_storage::StorageFactory factory; - auto storage = - factory.open_read_write(file_name, "sqlite3"); + auto storage = factory.open_read_write(file_name, "sqlite3"); storage->create_topic(); if (storage) { @@ -50,6 +49,13 @@ void Rosbag2::record( [&storage, after_write_action](std::shared_ptr msg) { auto bag_msg = std::make_shared(); bag_msg->serialized_data = msg; + rcutils_time_point_value_t time_stamp; + int error = rcutils_system_time_now(&time_stamp); + if (error != RCUTILS_RET_OK) { + RCUTILS_LOG_ERROR_NAMED( + "rosbag2", "Error getting current time. Error: %s", rcutils_get_error_string_safe()); + } + bag_msg->time_stamp = time_stamp; storage->write(bag_msg); if (after_write_action) { after_write_action(); @@ -64,8 +70,7 @@ void Rosbag2::record( void Rosbag2::play(const std::string & file_name, const std::string & topic_name) { rosbag2_storage::StorageFactory factory; - auto storage = - factory.open_read_only(file_name, "sqlite3"); + auto storage = factory.open_read_only(file_name, "sqlite3"); if (storage) { auto node = std::make_shared("rosbag_publisher_node"); @@ -73,13 +78,10 @@ void Rosbag2::play(const std::string & file_name, const std::string & topic_name while (storage->has_next()) { auto message = storage->read_next(); - std_msgs::msg::String string_message; - // TODO(Martin-Idel-SI): We don't write a correct serialized_data in sqlite3_storage for now - // Change once available - string_message.data = std::string(message->serialized_data->buffer); // without the sleep_for() many messages are lost. std::this_thread::sleep_for(std::chrono::milliseconds(500)); - publisher->publish(string_message); + publisher->publish(message->serialized_data.get()); + RCUTILS_LOG_INFO_NAMED(ROS_PACKAGE_NAME, "published message"); } } } diff --git a/rosbag2/test/rosbag2/rosbag2_read_integration_test.cpp b/rosbag2/test/rosbag2/rosbag2_read_integration_test.cpp index ede16b325c..dda2c5b679 100644 --- a/rosbag2/test/rosbag2/rosbag2_read_integration_test.cpp +++ b/rosbag2/test/rosbag2/rosbag2_read_integration_test.cpp @@ -76,34 +76,12 @@ TEST_F(RosBag2IntegrationTestFixture, recorded_messages_are_played) { rclcpp::init(0, nullptr); - std::vector> messages; - for (int i : {1, 2, 3}) { - (void) i; - auto msg = std::make_shared(); - auto payload = new rcutils_char_array_t; - *payload = rcutils_get_zero_initialized_char_array(); - payload->allocator = rcutils_get_default_allocator(); - // TODO(Martin-Idel-SI) The real serialized string message has 8 leading chars in CDR - std::string string_message_to_publish = "bbbbbbbbHello World"; - auto ret = rcutils_char_array_resize(payload, strlen(string_message_to_publish.c_str()) + 1); - if (ret != RCUTILS_RET_OK) { - FAIL() << " Failed to resize serialized bag message"; - } - memcpy(payload->buffer, - string_message_to_publish.c_str(), - strlen(string_message_to_publish.c_str()) + 1); - - msg->serialized_data = std::shared_ptr(payload, - [](rcutils_char_array_t * msg) { - auto error = rcutils_char_array_fini(msg); - delete msg; - if (error != RCUTILS_RET_OK) { - FAIL() << " Failed to destroy serialized bag message"; - } - }); - messages.push_back(msg); - } - ASSERT_NO_THROW(write_messages(database_name_, messages)); + std::vector> messages = + {serialize_message("Hello World 1"), + serialize_message("Hello World 2"), + serialize_message("Hello World 2")}; + + write_messages(database_name_, messages); // Due to a problem related to the subscriber, we play many (3) messages but make the subscriber // node spin only until 2 have arrived. Hence the 2 as `launch_subscriber()` argument. @@ -112,8 +90,8 @@ TEST_F(RosBag2IntegrationTestFixture, recorded_messages_are_played) auto replayed_messages = subscriber_future_.get(); ASSERT_THAT(replayed_messages, SizeIs(2)); - ASSERT_THAT(replayed_messages[0], Eq("Hello World")); - ASSERT_THAT(replayed_messages[1], Eq("Hello World")); + ASSERT_THAT(replayed_messages[0], Eq("Hello World 1")); + ASSERT_THAT(replayed_messages[1], Eq("Hello World 2")); rclcpp::shutdown(); } diff --git a/rosbag2/test/rosbag2/rosbag2_test_fixture.hpp b/rosbag2/test/rosbag2/rosbag2_test_fixture.hpp index c2f1f82644..7905160589 100644 --- a/rosbag2/test/rosbag2/rosbag2_test_fixture.hpp +++ b/rosbag2/test/rosbag2/rosbag2_test_fixture.hpp @@ -22,6 +22,9 @@ #include #include +#include "rclcpp/rclcpp.hpp" +#include "std_msgs/msg/string.hpp" + #ifdef _WIN32 # include # include @@ -107,22 +110,70 @@ class Rosbag2TestFixture : public Test return table_msgs; } - void - write_messages( + void write_messages( const std::string & db_name, std::vector> messages) { rosbag2_storage::StorageFactory factory; - auto storage = - factory.open_read_write(db_name, "sqlite3"); - if (storage == nullptr) { - throw std::runtime_error("failed to open sqlite3 storage"); + auto storage = factory.open_read_write(db_name, "sqlite3"); + + if (storage) { + storage->create_topic(); + for (auto msg : messages) { + storage->write(msg); + } + } + } + + std::shared_ptr serialize_message(std::string message) + { + auto bag_msg = std::make_shared(); + + auto test_message = std::make_shared(); + test_message->data = message; + + auto rcutils_allocator = rcutils_get_default_allocator(); + auto initial_capacity = 8u + static_cast(test_message->data.size()); + auto msg = new rcutils_char_array_t; + *msg = rcutils_get_zero_initialized_char_array(); + auto ret = rcutils_char_array_init(msg, initial_capacity, &rcutils_allocator); + if (ret != RCUTILS_RET_OK) { + throw std::runtime_error("Error allocating resources for serialized message" + + std::to_string(ret)); } - storage->create_topic(); - for (auto msg : messages) { - storage->write(msg); + bag_msg->serialized_data = std::shared_ptr(msg, + [](rcutils_char_array_t * msg) { + int error = rcutils_char_array_fini(msg); + delete msg; + if (error != RCUTILS_RET_OK) { + RCUTILS_LOG_ERROR_NAMED( + "rosbag2", "Leaking memory. Error: %s", rcutils_get_error_string_safe()); + } + }); + + bag_msg->serialized_data->buffer_length = initial_capacity; + + auto string_ts = + rosidl_typesupport_cpp::get_message_type_support_handle(); + + auto error = rmw_serialize(test_message.get(), string_ts, bag_msg->serialized_data.get()); + if (error != RMW_RET_OK) { + throw std::runtime_error("Something went wrong preparing the serialized message"); } + + return bag_msg; + } + + std::string deserialize_message( + std::shared_ptr serialized_message) + { + char * copied = new char[serialized_message->serialized_data->buffer_length]; + auto string_length = serialized_message->serialized_data->buffer_length - 8; + memcpy(copied, &serialized_message->serialized_data->buffer[8], string_length); + std::string message_content(copied); + delete[] copied; + return message_content; } std::string database_name_; diff --git a/rosbag2/test/rosbag2/rosbag2_write_integration_test.cpp b/rosbag2/test/rosbag2/rosbag2_write_integration_test.cpp index 1990391100..1207bd7a3c 100644 --- a/rosbag2/test/rosbag2/rosbag2_write_integration_test.cpp +++ b/rosbag2/test/rosbag2/rosbag2_write_integration_test.cpp @@ -62,15 +62,13 @@ class RosBag2IntegrationTestFixture : public Rosbag2TestFixture rclcpp::shutdown(); } - void publish_messages(std::vector messages) + void publish_messages(std::vector> msgs) { - size_t expected_counter_value = messages.size(); + size_t expected_counter_value = msgs.size(); auto node = std::make_shared("publisher_node"); auto publisher = node->create_publisher("string_topic"); - auto timer = node->create_wall_timer(50ms, [this, publisher, messages]() { - auto msg = std_msgs::msg::String(); - msg.data = messages[counter_]; - publisher->publish(msg); + auto timer = node->create_wall_timer(50ms, [this, publisher, msgs]() { + publisher->publish(msgs[counter_]->serialized_data.get()); }); while (counter_ < expected_counter_value) { @@ -85,12 +83,14 @@ class RosBag2IntegrationTestFixture : public Rosbag2TestFixture TEST_F(RosBag2IntegrationTestFixture, published_messages_are_recorded) { std::string message_to_publish = "test_message"; + auto serialized_message = serialize_message(message_to_publish); start_recording(); - publish_messages({message_to_publish}); + publish_messages({serialized_message}); stop_recording(); auto recorded_messages = get_messages(database_name_); ASSERT_THAT(recorded_messages, Not(IsEmpty())); + EXPECT_THAT(deserialize_message(recorded_messages[0]), Eq(message_to_publish)); } diff --git a/rosbag2_storage/CMakeLists.txt b/rosbag2_storage/CMakeLists.txt index 0d83df02ed..9c9dd7a7e4 100644 --- a/rosbag2_storage/CMakeLists.txt +++ b/rosbag2_storage/CMakeLists.txt @@ -22,6 +22,7 @@ find_package(rcutils REQUIRED) add_library( rosbag2_storage SHARED + src/rosbag2_storage/ros_helper.cpp src/storage_factory.cpp) target_include_directories(rosbag2_storage PUBLIC include) ament_target_dependencies( @@ -75,6 +76,13 @@ if(BUILD_TESTING) target_include_directories(test_storage_factory PRIVATE include) target_link_libraries(test_storage_factory rosbag2_storage) endif() + + ament_add_gmock(test_ros_helper + test/test_ros_helper.cpp) + if(TARGET test_ros_helper) + target_include_directories(test_ros_helper PRIVATE include) + target_link_libraries(test_ros_helper rosbag2_storage) + endif() endif() ament_export_include_directories(include) diff --git a/rosbag2_storage/include/rosbag2_storage/ros_helper.hpp b/rosbag2_storage/include/rosbag2_storage/ros_helper.hpp new file mode 100644 index 0000000000..f97e20fb22 --- /dev/null +++ b/rosbag2_storage/include/rosbag2_storage/ros_helper.hpp @@ -0,0 +1,33 @@ +// Copyright 2018 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef ROSBAG2_STORAGE__ROS_HELPER_HPP_ +#define ROSBAG2_STORAGE__ROS_HELPER_HPP_ + +#include + +#include "rcutils/types.h" + +#include "rosbag2_storage/visibility_control.hpp" + +namespace rosbag2_storage +{ + +ROSBAG2_STORAGE_PUBLIC +std::shared_ptr +make_serialized_message(const void * data, size_t size); + +} // namespace rosbag2_storage + +#endif // ROSBAG2_STORAGE__ROS_HELPER_HPP_ diff --git a/rosbag2_storage/include/rosbag2_storage/storage_interfaces/base_read_interface.hpp b/rosbag2_storage/include/rosbag2_storage/storage_interfaces/base_read_interface.hpp index 8947b6c845..87242a6413 100644 --- a/rosbag2_storage/include/rosbag2_storage/storage_interfaces/base_read_interface.hpp +++ b/rosbag2_storage/include/rosbag2_storage/storage_interfaces/base_read_interface.hpp @@ -30,7 +30,7 @@ class ROSBAG2_STORAGE_PUBLIC BaseReadInterface public: virtual ~BaseReadInterface() = default; - virtual bool has_next() const = 0; + virtual bool has_next() = 0; virtual std::shared_ptr read_next() = 0; }; diff --git a/rosbag2_storage/src/rosbag2_storage/ros_helper.cpp b/rosbag2_storage/src/rosbag2_storage/ros_helper.cpp new file mode 100644 index 0000000000..90927f2dd8 --- /dev/null +++ b/rosbag2_storage/src/rosbag2_storage/ros_helper.cpp @@ -0,0 +1,56 @@ +// Copyright 2018 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rosbag2_storage/ros_helper.hpp" + +#include +#include + +#include "rcutils/logging_macros.h" +#include "rcutils/types.h" + +namespace rosbag2_storage +{ + +std::shared_ptr +make_serialized_message(const void * data, size_t size) +{ + auto rcutils_allocator = new rcutils_allocator_t; + *rcutils_allocator = rcutils_get_default_allocator(); + auto msg = new rcutils_char_array_t; + *msg = rcutils_get_zero_initialized_char_array(); + auto ret = rcutils_char_array_init(msg, size, rcutils_allocator); + if (ret != RCUTILS_RET_OK) { + throw std::runtime_error("Error allocating resources for serialized message: " + + std::string(rcutils_get_error_string_safe())); + } + + auto serialized_message = std::shared_ptr(msg, + [](rcutils_char_array_t * msg) { + int error = rcutils_char_array_fini(msg); + delete msg; + if (error != RCUTILS_RET_OK) { + RCUTILS_LOG_ERROR_NAMED( + "rosbag2_storage", + "Leaking memory. Error: %s", rcutils_get_error_string_safe()); + } + }); + + memcpy(serialized_message->buffer, data, size); + serialized_message->buffer_length = size; + + return serialized_message; +} + +} // namespace rosbag2_storage diff --git a/rosbag2_storage/test/test_plugin.cpp b/rosbag2_storage/test/test_plugin.cpp index 7c9fa1a2f6..6745cfdd64 100644 --- a/rosbag2_storage/test/test_plugin.cpp +++ b/rosbag2_storage/test/test_plugin.cpp @@ -45,7 +45,7 @@ rosbag2_storage::BagInfo TestPlugin::info() return rosbag2_storage::BagInfo(); } -bool TestPlugin::has_next() const +bool TestPlugin::has_next() { return true; } diff --git a/rosbag2_storage/test/test_plugin.hpp b/rosbag2_storage/test/test_plugin.hpp index 5df537eff0..12e2b4e7b5 100644 --- a/rosbag2_storage/test/test_plugin.hpp +++ b/rosbag2_storage/test/test_plugin.hpp @@ -32,7 +32,7 @@ class TestPlugin : public rosbag2_storage::storage_interfaces::ReadWriteInterfac rosbag2_storage::BagInfo info() override; - bool has_next() const override; + bool has_next() override; std::shared_ptr read_next() override; diff --git a/rosbag2_storage/test/test_read_only_plugin.cpp b/rosbag2_storage/test/test_read_only_plugin.cpp index d3360d1c64..ddcdf99f78 100644 --- a/rosbag2_storage/test/test_read_only_plugin.cpp +++ b/rosbag2_storage/test/test_read_only_plugin.cpp @@ -41,7 +41,7 @@ rosbag2_storage::BagInfo TestReadOnlyPlugin::info() return rosbag2_storage::BagInfo(); } -bool TestReadOnlyPlugin::has_next() const +bool TestReadOnlyPlugin::has_next() { return true; } diff --git a/rosbag2_storage/test/test_read_only_plugin.hpp b/rosbag2_storage/test/test_read_only_plugin.hpp index c1963f0d75..cc8ee5b273 100644 --- a/rosbag2_storage/test/test_read_only_plugin.hpp +++ b/rosbag2_storage/test/test_read_only_plugin.hpp @@ -29,7 +29,7 @@ class TestReadOnlyPlugin : public rosbag2_storage::storage_interfaces::ReadOnlyI rosbag2_storage::BagInfo info() override; - bool has_next() const override; + bool has_next() override; std::shared_ptr read_next() override; }; diff --git a/rosbag2_storage/test/test_ros_helper.cpp b/rosbag2_storage/test/test_ros_helper.cpp new file mode 100644 index 0000000000..503549e1c1 --- /dev/null +++ b/rosbag2_storage/test/test_ros_helper.cpp @@ -0,0 +1,34 @@ +// Copyright 2018 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include "rosbag2_storage/ros_helper.hpp" + +using namespace ::testing; // NOLINT + +TEST(ros_helper, make_serialized_message_contains_correct_data) { + double data_value = 3.14; + auto data = new double; + *data = data_value; + auto size = sizeof(double); + + auto serialized_message = rosbag2_storage::make_serialized_message(data, size); + delete data; + + ASSERT_THAT(serialized_message->buffer_length, Eq(size)); + ASSERT_THAT(serialized_message->buffer_capacity, Eq(size)); + ASSERT_THAT(reinterpret_cast(serialized_message->buffer), Pointee(data_value)); +} diff --git a/rosbag2_storage_default_plugins/CMakeLists.txt b/rosbag2_storage_default_plugins/CMakeLists.txt index 722f4d8fb7..aa15715bcc 100644 --- a/rosbag2_storage_default_plugins/CMakeLists.txt +++ b/rosbag2_storage_default_plugins/CMakeLists.txt @@ -23,16 +23,13 @@ find_package(sqlite3_vendor REQUIRED) find_package(SQLite3 REQUIRED) # provided by sqlite3_vendor add_library(${PROJECT_NAME} SHARED - src/rosbag2_storage_default_plugins/sqlite/sqlite_wrapper.hpp src/rosbag2_storage_default_plugins/sqlite/sqlite_wrapper.cpp - src/rosbag2_storage_default_plugins/sqlite/sqlite_storage.hpp - src/rosbag2_storage_default_plugins/sqlite/sqlite_storage.cpp) + src/rosbag2_storage_default_plugins/sqlite/sqlite_storage.cpp + src/rosbag2_storage_default_plugins/sqlite/sqlite_statement_wrapper.cpp) ament_target_dependencies(${PROJECT_NAME} rosbag2_storage - rclcpp rcutils - std_msgs SQLite3 pluginlib) @@ -56,17 +53,29 @@ if(BUILD_TESTING) find_package(ament_lint_auto REQUIRED) ament_lint_auto_find_test_dependencies() + ament_add_gmock(sqlite_wrapper_integration_test + test/rosbag2_storage_default_plugins/sqlite/sqlite_wrapper_integration_test.cpp + src/rosbag2_storage_default_plugins/sqlite/sqlite_wrapper.cpp + src/rosbag2_storage_default_plugins/sqlite/sqlite_statement_wrapper.cpp + WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}) + if(TARGET sqlite_wrapper_integration_test) + ament_target_dependencies(sqlite_wrapper_integration_test + rosbag2_storage + rcutils + SQLite3 + pluginlib) + endif() + ament_add_gmock(sqlite_storage_integration_test test/rosbag2_storage_default_plugins/sqlite/sqlite_storage_integration_test.cpp src/rosbag2_storage_default_plugins/sqlite/sqlite_storage.cpp src/rosbag2_storage_default_plugins/sqlite/sqlite_wrapper.cpp + src/rosbag2_storage_default_plugins/sqlite/sqlite_statement_wrapper.cpp WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}) if(TARGET sqlite_storage_integration_test) ament_target_dependencies(sqlite_storage_integration_test rosbag2_storage - rclcpp rcutils - std_msgs SQLite3 pluginlib) endif() diff --git a/rosbag2_storage_default_plugins/package.xml b/rosbag2_storage_default_plugins/package.xml index 92630f0123..bd9d5e6e95 100644 --- a/rosbag2_storage_default_plugins/package.xml +++ b/rosbag2_storage_default_plugins/package.xml @@ -9,23 +9,17 @@ ament_cmake sqlite3_vendor - rclcpp - std_msgs rcutils rosbag2_storage sqlite3_vendor rosbag2_storage - rclcpp rcutils - std_msgs ament_lint_auto ament_lint_common ament_cmake_gmock ament_cmake_gtest - rclcpp - std_msgs ament_cmake diff --git a/rosbag2_storage_default_plugins/test/rosbag2_storage_default_plugins/sqlite/mock_sqlite_wrapper.hpp b/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_exception.hpp similarity index 54% rename from rosbag2_storage_default_plugins/test/rosbag2_storage_default_plugins/sqlite/mock_sqlite_wrapper.hpp rename to rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_exception.hpp index e5d14c5941..1755ed6f72 100644 --- a/rosbag2_storage_default_plugins/test/rosbag2_storage_default_plugins/sqlite/mock_sqlite_wrapper.hpp +++ b/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_exception.hpp @@ -12,21 +12,22 @@ // See the License for the specific language governing permissions and // limitations under the License. -#ifndef ROSBAG2_STORAGE_DEFAULT_PLUGINS__SQLITE__MOCK_SQLITE_WRAPPER_HPP_ -#define ROSBAG2_STORAGE_DEFAULT_PLUGINS__SQLITE__MOCK_SQLITE_WRAPPER_HPP_ - -#include +#ifndef ROSBAG2_STORAGE_DEFAULT_PLUGINS__SQLITE__SQLITE_EXCEPTION_HPP_ +#define ROSBAG2_STORAGE_DEFAULT_PLUGINS__SQLITE__SQLITE_EXCEPTION_HPP_ +#include #include -#include -#include "../../../src/rosbag2_storage_default_plugins/sqlite/sqlite_wrapper.hpp" +namespace rosbag2_storage_plugins +{ -class MockSqliteWrapper : public rosbag2_storage_plugins::SqliteWrapper +class SqliteException : public std::runtime_error { public: - MOCK_METHOD1(execute_query, void(const std::string &)); - MOCK_METHOD2(get_message, bool(std::string &, size_t)); + explicit SqliteException(const std::string & message) + : runtime_error(message) {} }; -#endif // ROSBAG2_STORAGE_DEFAULT_PLUGINS__SQLITE__MOCK_SQLITE_WRAPPER_HPP_ +} // namespace rosbag2_storage_plugins + +#endif // ROSBAG2_STORAGE_DEFAULT_PLUGINS__SQLITE__SQLITE_EXCEPTION_HPP_ diff --git a/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_statement_wrapper.cpp b/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_statement_wrapper.cpp new file mode 100644 index 0000000000..87ade3e26e --- /dev/null +++ b/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_statement_wrapper.cpp @@ -0,0 +1,160 @@ +// Copyright 2018, Bosch Software Innovations GmbH. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "sqlite_statement_wrapper.hpp" + +#include +#include +#include +#include +#include + +#include "rcutils/logging_macros.h" +#include "rosbag2_storage/ros_helper.hpp" + +#include "sqlite_exception.hpp" + +namespace rosbag2_storage_plugins +{ + +SqliteStatementWrapper::SqliteStatementWrapper(sqlite3 * database, std::string query) +{ + sqlite3_stmt * statement; + int return_code = sqlite3_prepare_v2(database, query.c_str(), -1, &statement, nullptr); + if (return_code != SQLITE_OK) { + throw SqliteException("SQL error when preparing statement '" + query + "' with return code: " + + std::to_string(return_code)); + } + + statement_ = statement; + last_bound_parameter_index_ = 0; +} + +SqliteStatementWrapper::~SqliteStatementWrapper() +{ + if (statement_) { + sqlite3_finalize(statement_); + } +} + +std::shared_ptr SqliteStatementWrapper::execute_and_reset() +{ + int return_code = sqlite3_step(statement_); + if (return_code != SQLITE_OK && return_code != SQLITE_DONE) { + throw SqliteException("Error processing SQLite statement."); + } + return reset(); +} + +std::shared_ptr SqliteStatementWrapper::bind(int value) +{ + auto return_code = sqlite3_bind_int(statement_, ++last_bound_parameter_index_, value); + check_and_report_bind_error(return_code, value); + return shared_from_this(); +} + +std::shared_ptr +SqliteStatementWrapper::bind(rcutils_time_point_value_t value) +{ + auto return_code = sqlite3_bind_int64(statement_, ++last_bound_parameter_index_, value); + check_and_report_bind_error(return_code, value); + return shared_from_this(); +} + +std::shared_ptr SqliteStatementWrapper::bind(double value) +{ + auto return_code = sqlite3_bind_double(statement_, ++last_bound_parameter_index_, value); + check_and_report_bind_error(return_code, value); + return shared_from_this(); +} + +std::shared_ptr SqliteStatementWrapper::bind(std::string value) +{ + auto return_code = sqlite3_bind_text( + statement_, ++last_bound_parameter_index_, value.c_str(), -1, SQLITE_TRANSIENT); + check_and_report_bind_error(return_code, value); + return shared_from_this(); +} + +std::shared_ptr +SqliteStatementWrapper::bind(std::shared_ptr value) +{ + written_blobs_cache_.push_back(value); + auto return_code = sqlite3_bind_blob( + statement_, ++last_bound_parameter_index_, + value->buffer, static_cast(value->buffer_length), SQLITE_STATIC); + check_and_report_bind_error(return_code); + return shared_from_this(); +} + +std::shared_ptr SqliteStatementWrapper::reset() +{ + sqlite3_reset(statement_); + sqlite3_clear_bindings(statement_); + last_bound_parameter_index_ = 0; + written_blobs_cache_.clear(); + return shared_from_this(); +} + +bool SqliteStatementWrapper::step() +{ + int return_code = sqlite3_step(statement_); + if (return_code == SQLITE_ROW) { + return true; + } else if (return_code == SQLITE_DONE) { + return false; + } else { + throw SqliteException("Error reading query result: " + std::to_string(return_code)); + } +} + +void SqliteStatementWrapper::obtain_column_value(size_t index, int & value) const +{ + value = sqlite3_column_int(statement_, static_cast(index)); +} + +void +SqliteStatementWrapper::obtain_column_value(size_t index, rcutils_time_point_value_t & value) const +{ + value = sqlite3_column_int64(statement_, static_cast(index)); +} + +void SqliteStatementWrapper::obtain_column_value(size_t index, double & value) const +{ + value = sqlite3_column_double(statement_, static_cast(index)); +} + +void SqliteStatementWrapper::obtain_column_value(size_t index, std::string & value) const +{ + value = reinterpret_cast(sqlite3_column_text(statement_, static_cast(index))); +} + +void SqliteStatementWrapper::obtain_column_value( + size_t index, std::shared_ptr & value) const +{ + auto data = sqlite3_column_blob(statement_, static_cast(index)); + auto size = static_cast(sqlite3_column_bytes(statement_, static_cast(index))); + value = rosbag2_storage::make_serialized_message(data, size); +} + +void SqliteStatementWrapper::check_and_report_bind_error(int return_code) +{ + if (return_code != SQLITE_OK) { + throw SqliteException("SQLite error when binding parameter " + + std::to_string(last_bound_parameter_index_) + ". Return code: " + + std::to_string(return_code)); + } +} + +} // namespace rosbag2_storage_plugins diff --git a/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_statement_wrapper.hpp b/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_statement_wrapper.hpp new file mode 100644 index 0000000000..8ac0b92fa5 --- /dev/null +++ b/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_statement_wrapper.hpp @@ -0,0 +1,204 @@ +// Copyright 2018, Bosch Software Innovations GmbH. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef ROSBAG2_STORAGE_DEFAULT_PLUGINS__SQLITE__SQLITE_STATEMENT_WRAPPER_HPP_ +#define ROSBAG2_STORAGE_DEFAULT_PLUGINS__SQLITE__SQLITE_STATEMENT_WRAPPER_HPP_ + +#include + +#include +#include +#include +#include +#include +#include + +#include "rosbag2_storage/serialized_bag_message.hpp" +#include "sqlite_exception.hpp" + +namespace rosbag2_storage_plugins +{ + +class SqliteStatementWrapper : public std::enable_shared_from_this +{ +public: + SqliteStatementWrapper(sqlite3 * database, std::string query); + SqliteStatementWrapper(const SqliteStatementWrapper &) = delete; + SqliteStatementWrapper & operator=(const SqliteStatementWrapper &) = delete; + ~SqliteStatementWrapper(); + + template + class QueryResult + { +public: + using RowType = std::tuple; + class Iterator : public std::iterator + { +public: + static const int POSITION_END = -1; + Iterator(std::shared_ptr statement, int position) + : statement_(statement), next_row_idx_(position) + { + if (next_row_idx_ != POSITION_END) { + if (statement_->step()) { + ++next_row_idx_; + } else { + next_row_idx_ = POSITION_END; + } + } + } + + Iterator & operator++() + { + if (next_row_idx_ != POSITION_END) { + if (statement_->step()) { + ++next_row_idx_; + } else { + next_row_idx_ = POSITION_END; + } + return *this; + } else { + throw SqliteException("Cannot increment result iterator beyond result set!"); + } + } + Iterator operator++(int) + { + auto old_value = *this; + ++(*this); + return old_value; + } + + RowType operator*() const + { + RowType row{}; + obtain_row_values(row); + return row; + } + + bool operator==(Iterator other) const + { + return statement_ == other.statement_ && next_row_idx_ == other.next_row_idx_; + } + bool operator!=(Iterator other) const + { + return !(*this == other); + } + +private: + template> + void obtain_row_values(RowType & row) const + { + obtain_row_values_impl(row, Indices{}); + } + + template> + void obtain_row_values_impl(RowType & row, std::index_sequence) const + { + statement_->obtain_column_value(I, std::get(row)); + obtain_row_values_impl(row, RemainingIndices{}); + } + void obtain_row_values_impl(RowType &, std::index_sequence<>) const {} // end of recursion + + std::shared_ptr statement_; + int next_row_idx_; + }; + + explicit QueryResult(std::shared_ptr statement) + : statement_(statement) + {} + + Iterator begin() + { + return Iterator(statement_, 0); + } + Iterator end() + { + return Iterator(statement_, Iterator::POSITION_END); + } + +private: + std::shared_ptr statement_; + }; + + std::shared_ptr execute_and_reset(); + template + QueryResult execute_query(); + + template + std::shared_ptr bind(T1 value1, T2 value2, Params ... values); + std::shared_ptr bind(int value); + std::shared_ptr bind(rcutils_time_point_value_t value); + std::shared_ptr bind(double value); + std::shared_ptr bind(std::string value); + std::shared_ptr bind(std::shared_ptr value); + + std::shared_ptr reset(); + +private: + bool step(); + + void obtain_column_value(size_t index, int & value) const; + void obtain_column_value(size_t index, rcutils_time_point_value_t & value) const; + void obtain_column_value(size_t index, double & value) const; + void obtain_column_value(size_t index, std::string & value) const; + void obtain_column_value(size_t index, std::shared_ptr & value) const; + + template + void check_and_report_bind_error(int return_code, T value); + void check_and_report_bind_error(int return_code); + + sqlite3_stmt * statement_; + int last_bound_parameter_index_; + std::vector> written_blobs_cache_; +}; + +template +inline +std::shared_ptr +SqliteStatementWrapper::bind(T1 value1, T2 value2, Params ... values) +{ + bind(value1); + return bind(value2, values ...); +} + +template<> +inline +void SqliteStatementWrapper::check_and_report_bind_error(int return_code, std::string value) +{ + if (return_code != SQLITE_OK) { + throw SqliteException("SQLite error when binding parameter " + + std::to_string(last_bound_parameter_index_) + " to value '" + value + + "'. Return code: " + std::to_string(return_code)); + } +} + +template +inline +void SqliteStatementWrapper::check_and_report_bind_error(int return_code, T value) +{ + check_and_report_bind_error(return_code, std::to_string(value)); +} + +template +inline +SqliteStatementWrapper::QueryResult SqliteStatementWrapper::execute_query() +{ + return QueryResult(shared_from_this()); +} + +using SqliteStatement = std::shared_ptr; + +} // namespace rosbag2_storage_plugins + +#endif // ROSBAG2_STORAGE_DEFAULT_PLUGINS__SQLITE__SQLITE_STATEMENT_WRAPPER_HPP_ diff --git a/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_storage.cpp b/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_storage.cpp index c2f96088a2..a33fc812f3 100644 --- a/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_storage.cpp +++ b/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_storage.cpp @@ -15,6 +15,7 @@ #include "sqlite_storage.hpp" #include +#include #include #include #include @@ -28,16 +29,15 @@ namespace rosbag2_storage_plugins const char * ROS_PACKAGE_NAME = "rosbag2_storage_default_plugins"; SqliteStorage::SqliteStorage() -: database_(), counter_(0), bag_info_() -{} - -SqliteStorage::SqliteStorage(std::shared_ptr database) -: database_(std::move(database)), counter_(0) +: database_(), bag_info_(), write_statement_(nullptr), read_statement_(nullptr), + message_result_(nullptr), + current_message_row_(nullptr, SqliteStatementWrapper::QueryResult<>::Iterator::POSITION_END) {} void SqliteStorage::open( const std::string & uri, rosbag2_storage::storage_interfaces::IOFlag io_flag) { + // TODO(MARTIN-IDEL-SI): use flags to open database for read-only (void) io_flag; try { database_ = std::make_unique(uri); @@ -51,50 +51,35 @@ void SqliteStorage::open( void SqliteStorage::write(std::shared_ptr message) { - // TODO(Martin-Idel-SI) The real serialized string message has 8 leading chars in CDR - std::string msg(&message->serialized_data->buffer[8]); - std::string insert_message = - "INSERT INTO messages (data, timestamp) VALUES ('" + msg + "', strftime('%s%f','now'))"; - database_->execute_query(insert_message); + if (!write_statement_) { + prepare_for_writing(); + } + + write_statement_->bind(message->serialized_data, message->time_stamp); + write_statement_->execute_and_reset(); - RCUTILS_LOG_INFO_NAMED(ROS_PACKAGE_NAME, "Stored message"); + RCUTILS_LOG_INFO_NAMED(ROS_PACKAGE_NAME, "Stored message."); } -bool SqliteStorage::has_next() const +bool SqliteStorage::has_next() { - // TODO(Martin-Idel-SI): improve sqlite_wrapper interface - std::string message; - return database_->get_message(message, counter_); + if (!read_statement_) { + prepare_for_reading(); + } + + return current_message_row_ != message_result_.end(); } std::shared_ptr SqliteStorage::read_next() { - // TODO(Martin-Idel-SI): improve sqlite_wrapper interface - std::string message; - database_->get_message(message, counter_++); - auto payload = new rcutils_char_array_t; - *payload = rcutils_get_zero_initialized_char_array(); - auto allocator = new rcutils_allocator_t; - *allocator = rcutils_get_default_allocator(); - auto ret = rcutils_char_array_init(payload, strlen(message.c_str()) + 1, allocator); - if (ret != RCUTILS_RET_OK) { - throw std::runtime_error(std::string(" Failed to allocate serialized bag message ") + - rcutils_get_error_string_safe()); + if (!read_statement_) { + prepare_for_reading(); } - memcpy(payload->buffer, message.c_str(), strlen(message.c_str()) + 1); - - auto msg = std::make_shared(); - msg->time_stamp = 0; - msg->serialized_data = std::shared_ptr(payload, - [](rcutils_char_array_t * msg) { - auto error = rcutils_char_array_fini(msg); - delete msg; - if (error != RCUTILS_RET_OK) { - RCUTILS_LOG_ERROR_NAMED("rosbag2_storage_default_plugins", - " Failed to destroy serialized bag message %s", rcutils_get_error_string_safe()); - } - }); - return msg; + + auto bag_message = std::make_shared(); + std::tie(bag_message->serialized_data, bag_message->time_stamp) = *current_message_row_; + ++current_message_row_; + return bag_message; } void SqliteStorage::initialize() @@ -102,9 +87,9 @@ void SqliteStorage::initialize() std::string create_table = "CREATE TABLE messages(" \ "id INTEGER PRIMARY KEY AUTOINCREMENT," \ "data BLOB NOT NULL," \ - "timestamp INT NOT NULL);"; + "timestamp INTEGER NOT NULL);"; - database_->execute_query(create_table); + database_->prepare_statement(create_table)->execute_and_reset(); } rosbag2_storage::BagInfo SqliteStorage::info() @@ -117,6 +102,21 @@ void SqliteStorage::create_topic() initialize(); } +void SqliteStorage::prepare_for_writing() +{ + write_statement_ = database_->prepare_statement( + "INSERT INTO messages (data, timestamp) VALUES (?, ?);"); +} + +void SqliteStorage::prepare_for_reading() +{ + read_statement_ = + database_->prepare_statement("SELECT data, timestamp FROM messages ORDER BY id;"); + message_result_ = read_statement_->execute_query< + std::shared_ptr, rcutils_time_point_value_t>(); + current_message_row_ = message_result_.begin(); +} + } // namespace rosbag2_storage_plugins diff --git a/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_storage.hpp b/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_storage.hpp index 6a5375ca83..6f9d7733ee 100644 --- a/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_storage.hpp +++ b/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_storage.hpp @@ -15,13 +15,14 @@ #ifndef ROSBAG2_STORAGE_DEFAULT_PLUGINS__SQLITE__SQLITE_STORAGE_HPP_ #define ROSBAG2_STORAGE_DEFAULT_PLUGINS__SQLITE__SQLITE_STORAGE_HPP_ -#include #include +#include #include -#include "sqlite_wrapper.hpp" - +#include "rcutils/types.h" #include "rosbag2_storage/storage_interfaces/read_write_interface.hpp" +#include "rosbag2_storage/serialized_bag_message.hpp" +#include "sqlite_wrapper.hpp" namespace rosbag2_storage_plugins { @@ -30,7 +31,6 @@ class SqliteStorage : public rosbag2_storage::storage_interfaces::ReadWriteInter { public: SqliteStorage(); - explicit SqliteStorage(std::shared_ptr database); ~SqliteStorage() override = default; void open( @@ -42,7 +42,7 @@ class SqliteStorage : public rosbag2_storage::storage_interfaces::ReadWriteInter void write(std::shared_ptr message) override; - bool has_next() const override; + bool has_next() override; std::shared_ptr read_next() override; @@ -50,10 +50,17 @@ class SqliteStorage : public rosbag2_storage::storage_interfaces::ReadWriteInter private: void initialize(); + void prepare_for_writing(); + void prepare_for_reading(); std::shared_ptr database_; - size_t counter_; rosbag2_storage::BagInfo bag_info_; + SqliteStatement write_statement_; + SqliteStatement read_statement_; + SqliteStatementWrapper::QueryResult, + rcutils_time_point_value_t> message_result_; + SqliteStatementWrapper::QueryResult, + rcutils_time_point_value_t>::Iterator current_message_row_; }; } // namespace rosbag2_storage_plugins diff --git a/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_wrapper.cpp b/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_wrapper.cpp index b92b002f0d..7adfbb26d7 100644 --- a/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_wrapper.cpp +++ b/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_wrapper.cpp @@ -14,12 +14,16 @@ #include "sqlite_wrapper.hpp" -#include #include -#include #include +#include #include +#include "rcutils/types.h" +#include "rosbag2_storage/serialized_bag_message.hpp" + +#include "sqlite_exception.hpp" + namespace rosbag2_storage_plugins { @@ -40,39 +44,14 @@ SqliteWrapper::~SqliteWrapper() sqlite3_close(db_ptr); } -void SqliteWrapper::execute_query(const std::string & query) +SqliteStatement SqliteWrapper::prepare_statement(std::string query) { - char * error_msg = nullptr; - int return_code = sqlite3_exec(db_ptr, query.c_str(), nullptr, nullptr, &error_msg); - - if (return_code != SQLITE_OK) { - auto error = "SQL error: " + std::string(error_msg); - sqlite3_free(error_msg); - throw SqliteException(error); - } + return std::make_shared(db_ptr, query); } -bool SqliteWrapper::get_message(std::string & message, size_t index) +size_t SqliteWrapper::get_last_insert_id() { - std::string offset = std::to_string(index); - sqlite3_stmt * statement; - std::string query = "SELECT * FROM messages LIMIT 1 OFFSET " + offset; - - int return_code = sqlite3_prepare_v2(db_ptr, query.c_str(), -1, &statement, nullptr); - if (return_code != SQLITE_OK) { - throw SqliteException("SQL error when preparing statement '" + query + "'with return code: " + - std::to_string(return_code)); - } - - int result = sqlite3_step(statement); - if (result == SQLITE_ROW) { - message = std::string(reinterpret_cast(sqlite3_column_text(statement, 1))); - sqlite3_finalize(statement); - return true; - } else { - sqlite3_finalize(statement); - return false; - } + return sqlite3_last_insert_rowid(db_ptr); } SqliteWrapper::operator bool() diff --git a/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_wrapper.hpp b/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_wrapper.hpp index 8cbdead29b..c14e6ec385 100644 --- a/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_wrapper.hpp +++ b/rosbag2_storage_default_plugins/src/rosbag2_storage_default_plugins/sqlite/sqlite_wrapper.hpp @@ -16,19 +16,17 @@ #define ROSBAG2_STORAGE_DEFAULT_PLUGINS__SQLITE__SQLITE_WRAPPER_HPP_ #include -#include + +#include #include #include -namespace rosbag2_storage_plugins -{ +#include "rcutils/types.h" +#include "rosbag2_storage/serialized_bag_message.hpp" +#include "sqlite_statement_wrapper.hpp" -class SqliteException : public std::runtime_error +namespace rosbag2_storage_plugins { -public: - explicit SqliteException(const std::string & message) - : runtime_error(message) {} -}; using DBPtr = sqlite3 *; @@ -37,13 +35,13 @@ class SqliteWrapper public: explicit SqliteWrapper(const std::string & uri); SqliteWrapper(); - virtual ~SqliteWrapper(); + ~SqliteWrapper(); - virtual void execute_query(const std::string & query); + SqliteStatement prepare_statement(std::string query); - virtual bool get_message(std::string & message, size_t index); + size_t get_last_insert_id(); - virtual operator bool(); + operator bool(); private: DBPtr db_ptr; diff --git a/rosbag2_storage_default_plugins/test/rosbag2_storage_default_plugins/sqlite/sqlite_storage_integration_test.cpp b/rosbag2_storage_default_plugins/test/rosbag2_storage_default_plugins/sqlite/sqlite_storage_integration_test.cpp index d989383584..ea9f074770 100644 --- a/rosbag2_storage_default_plugins/test/rosbag2_storage_default_plugins/sqlite/sqlite_storage_integration_test.cpp +++ b/rosbag2_storage_default_plugins/test/rosbag2_storage_default_plugins/sqlite/sqlite_storage_integration_test.cpp @@ -16,18 +16,82 @@ #include #include +#include #include +#include "rcutils/snprintf.h" + #include "storage_test_fixture.hpp" using namespace ::testing; // NOLINT TEST_F(StorageTestFixture, string_messages_are_written_and_read_to_and_from_sqlite3_storage) { - std::vector string_messages = {"test_message 1", "test_message 2", "test_message 3"}; + std::vector> string_messages = + {std::make_pair("first message", 1), + std::make_pair("second message", 2), + std::make_pair("third message", 3)}; write_messages_to_sqlite(string_messages); auto read_messages = read_all_messages_from_sqlite(); ASSERT_THAT(read_messages, SizeIs(3)); - EXPECT_THAT(std::string(read_messages[0]->serialized_data->buffer), Eq(string_messages[0])); + EXPECT_THAT(deserialize_message(read_messages[0]->serialized_data), Eq(string_messages[0].first)); + EXPECT_THAT(deserialize_message(read_messages[1]->serialized_data), Eq(string_messages[1].first)); + EXPECT_THAT(deserialize_message(read_messages[2]->serialized_data), Eq(string_messages[2].first)); +} + +TEST_F(StorageTestFixture, message_roundtrip_with_arbitrary_char_array_works_correctly) { + std::string message = "test_message"; + size_t message_size = strlen(message.c_str()) + 1; + char * test_message = new char[message_size]; + memcpy(test_message, message.c_str(), message_size); + + std::unique_ptr read_write_storage = + std::make_unique(); + read_write_storage->open(database_name_); + read_write_storage->create_topic(); + + auto test = std::make_shared(); + test->serialized_data = std::make_shared(); + test->serialized_data->buffer = test_message; + test->serialized_data->buffer_length = message_size; + test->serialized_data->buffer_capacity = message_size; + + read_write_storage->write(test); + + auto read_message = read_write_storage->read_next(); + EXPECT_THAT(strcmp(read_message->serialized_data->buffer, test_message), Eq(0)); + + delete[] test_message; +} + +TEST_F(StorageTestFixture, has_next_return_false_if_there_are_no_more_messages) { + std::vector> string_messages = + {std::make_pair("first message", 1), std::make_pair("second message", 2)}; + + write_messages_to_sqlite(string_messages); + std::unique_ptr readable_storage = + std::make_unique(); + readable_storage->open(database_name_); + + EXPECT_TRUE(readable_storage->has_next()); + readable_storage->read_next(); + EXPECT_TRUE(readable_storage->has_next()); + readable_storage->read_next(); + EXPECT_FALSE(readable_storage->has_next()); +} + +TEST_F(StorageTestFixture, write_stamped_char_array_writes_correct_time_stamp) { + std::vector> string_messages = + {std::make_pair("first message", 1), std::make_pair("second message", 2)}; + + write_messages_to_sqlite(string_messages); + std::unique_ptr readable_storage = + std::make_unique(); + readable_storage->open(database_name_); + + auto read_message = readable_storage->read_next(); + EXPECT_THAT(read_message->time_stamp, Eq(1)); + read_message = readable_storage->read_next(); + EXPECT_THAT(read_message->time_stamp, Eq(2)); } diff --git a/rosbag2_storage_default_plugins/test/rosbag2_storage_default_plugins/sqlite/sqlite_wrapper_integration_test.cpp b/rosbag2_storage_default_plugins/test/rosbag2_storage_default_plugins/sqlite/sqlite_wrapper_integration_test.cpp new file mode 100644 index 0000000000..3e531c6405 --- /dev/null +++ b/rosbag2_storage_default_plugins/test/rosbag2_storage_default_plugins/sqlite/sqlite_wrapper_integration_test.cpp @@ -0,0 +1,115 @@ +// Copyright 2018, Bosch Software Innovations GmbH. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include + +#include "rcutils/types.h" + +#include "../../../src/rosbag2_storage_default_plugins/sqlite/sqlite_wrapper.hpp" + +#include "storage_test_fixture.hpp" + +using namespace ::testing; // NOLINT + +TEST_F(StorageTestFixture, querying_empty_adhoc_results_gives_empty_query_result) { + auto db = rosbag2_storage_plugins::SqliteWrapper{database_name_}; + + auto result = db.prepare_statement("SELECT 1 WHERE 1=2;")->execute_query(); + + ASSERT_THAT(result.begin(), Eq(result.end())); +} + +TEST_F(StorageTestFixture, querying_adhoc_results_with_normal_data_gives_content) { + auto db = rosbag2_storage_plugins::SqliteWrapper{database_name_}; + + auto result = + db.prepare_statement("SELECT 1, 1.465, 'abc';")->execute_query(); + auto row = *(result.begin()); + + ASSERT_THAT(std::get<0>(row), Eq(1)); + ASSERT_THAT(std::get<1>(row), Eq(1.465)); + ASSERT_THAT(std::get<2>(row), StrEq("abc")); +} + +TEST_F(StorageTestFixture, bind_values_are_inserted) { + auto db = rosbag2_storage_plugins::SqliteWrapper{database_name_}; + db.prepare_statement("CREATE TABLE test (int_col INTEGER, double_col FLOAT, string_col TEXT);") + ->execute_and_reset(); + + db.prepare_statement("INSERT INTO test (int_col, double_col, string_col) VALUES (?, ?, ?);") + ->bind(1, 3.14, "abc")->execute_and_reset(); + + auto row_iter = db.prepare_statement( + "SELECT COUNT(*) FROM test WHERE int_col = 1 AND double_col = 3.14 AND string_col = 'abc';") + ->execute_query().begin(); + ASSERT_THAT(std::get<0>(*row_iter), Eq(1)); +} + +TEST_F(StorageTestFixture, reuse_prepared_statement) { + auto db = rosbag2_storage_plugins::SqliteWrapper{database_name_}; + db.prepare_statement("CREATE TABLE test (col INTEGER);")->execute_and_reset(); + + db.prepare_statement("INSERT INTO test (col) VALUES (?);") + ->bind(1)->execute_and_reset()->bind(2)->execute_and_reset(); + + auto row_iter = db.prepare_statement("SELECT COUNT(*) FROM test;")->execute_query().begin(); + ASSERT_THAT(std::get<0>(*row_iter), Eq(2)); +} + +TEST_F(StorageTestFixture, all_result_rows_are_available) { + auto db = rosbag2_storage_plugins::SqliteWrapper{database_name_}; + db.prepare_statement("CREATE TABLE test (col INTEGER);")->execute_and_reset(); + db.prepare_statement("INSERT INTO test (col) VALUES (1);")->execute_and_reset(); + db.prepare_statement("INSERT INTO test (col) VALUES (2);")->execute_and_reset(); + db.prepare_statement("INSERT INTO test (col) VALUES (3);")->execute_and_reset(); + + auto result = + db.prepare_statement("SELECT col FROM test ORDER BY col ASC;")->execute_query(); + + // range-for access + int row_value = 1; + for (auto row : result) { + ASSERT_THAT(std::get<0>(row), Eq(row_value++)); + } + ASSERT_THAT(row_value, Eq(4)); + + // iterator access + row_value = 1; + auto row_iterator = result.begin(); + while (row_iterator != result.end()) { + ASSERT_THAT(std::get<0>(*row_iterator), Eq(row_value++)); + ++row_iterator; + } + ASSERT_THAT(row_value, Eq(4)); +} + +TEST_F(StorageTestFixture, ros_specific_types_are_supported_for_reading_and_writing) { + auto db = rosbag2_storage_plugins::SqliteWrapper{database_name_}; + db.prepare_statement("CREATE TABLE test (timestamp INTEGER, data BLOB);")->execute_and_reset(); + rcutils_time_point_value_t time = 1099511627783; + auto msg_content = "message"; + std::shared_ptr message = make_serialized_message(msg_content); + + db.prepare_statement("INSERT INTO test (timestamp, data) VALUES (?, ?);") + ->bind(time, message)->execute_and_reset(); + auto row_iter = db.prepare_statement("SELECT timestamp, data FROM test") + ->execute_query>().begin(); + + ASSERT_THAT(std::get<0>(*row_iter), Eq(time)); + ASSERT_THAT(deserialize_message(std::get<1>(*row_iter)), StrEq(msg_content)); +} diff --git a/rosbag2_storage_default_plugins/test/rosbag2_storage_default_plugins/sqlite/storage_test_fixture.hpp b/rosbag2_storage_default_plugins/test/rosbag2_storage_default_plugins/sqlite/storage_test_fixture.hpp index 7df2db41d0..911ec0fd5d 100644 --- a/rosbag2_storage_default_plugins/test/rosbag2_storage_default_plugins/sqlite/storage_test_fixture.hpp +++ b/rosbag2_storage_default_plugins/test/rosbag2_storage_default_plugins/sqlite/storage_test_fixture.hpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #ifdef _WIN32 @@ -28,6 +29,8 @@ # include #endif +#include "rcutils/logging_macros.h" +#include "rcutils/snprintf.h" #include "../../../src/rosbag2_storage_default_plugins/sqlite/sqlite_storage.hpp" using namespace ::testing; // NOLINT @@ -88,35 +91,60 @@ class StorageTestFixture : public Test #endif } - void write_messages_to_sqlite(std::vector messages) + std::shared_ptr make_serialized_message(std::string message) + { + int message_size = get_buffer_capacity(message); + message_size++; // need to account for terminating null character + assert(message_size > 0); + + auto rcutils_allocator = rcutils_get_default_allocator(); + auto msg = new rcutils_char_array_t; + *msg = rcutils_get_zero_initialized_char_array(); + auto ret = rcutils_char_array_init(msg, message_size, &rcutils_allocator); + if (ret != RCUTILS_RET_OK) { + throw std::runtime_error("Error allocating resources " + std::to_string(ret)); + } + + auto serialized_data = std::shared_ptr(msg, + [](rcutils_char_array_t * msg) { + int error = rcutils_char_array_fini(msg); + delete msg; + if (error != RCUTILS_RET_OK) { + RCUTILS_LOG_ERROR_NAMED( + "rosbag2_storage_default_plugins", "Leaking memory %i", error); + } + }); + + serialized_data->buffer_length = message_size; + int written_size = write_data_to_serialized_string_message( + serialized_data->buffer, serialized_data->buffer_capacity, message); + + assert(written_size == message_size - 1); // terminated null character not counted + return serialized_data; + } + + std::string deserialize_message(std::shared_ptr serialized_message) + { + char * copied = new char[serialized_message->buffer_length]; + auto string_length = serialized_message->buffer_length - 8; + memcpy(copied, &serialized_message->buffer[8], string_length); + std::string message_content(copied); + delete[] copied; + return message_content; + } + + void write_messages_to_sqlite(std::vector> messages) { std::unique_ptr writable_storage = std::make_unique(); writable_storage->open(database_name_); writable_storage->create_topic(); - for (auto message : messages) { - auto msg = std::make_shared(); - auto payload = new rcutils_char_array_t; - *payload = rcutils_get_zero_initialized_char_array(); - payload->allocator = rcutils_get_default_allocator(); - auto ret = rcutils_char_array_resize(payload, 8 + strlen(message.c_str()) + 1); - if (ret != RCUTILS_RET_OK) { - FAIL() << " Failed to resize serialized bag message"; - } - // TODO(Martin-Idel-SI) The real serialized string message has 8 leading chars in CDR - std::string full_message = "bbbbbbbb" + message; - memcpy(payload->buffer, full_message.c_str(), strlen(full_message.c_str()) + 1); - - msg->serialized_data = std::shared_ptr(payload, - [](rcutils_char_array_t * msg) { - auto error = rcutils_char_array_fini(msg); - delete msg; - if (error != RCUTILS_RET_OK) { - FAIL() << " Failed to destroy serialized bag message"; - } - }); - writable_storage->write(msg); + for (auto msg : messages) { + auto bag_message = std::make_shared(); + bag_message->serialized_data = make_serialized_message(msg.first); + bag_message->time_stamp = msg.second; + writable_storage->write(bag_message); } } @@ -128,14 +156,32 @@ class StorageTestFixture : public Test readable_storage->open(database_name_); std::vector> read_messages; - std::string message; while (readable_storage->has_next()) { - read_messages.emplace_back(readable_storage->read_next()); + read_messages.push_back(readable_storage->read_next()); } return read_messages; } +protected: + int get_buffer_capacity(std::string message) + { + return write_data_to_serialized_string_message(nullptr, 0, message); + } + + int write_data_to_serialized_string_message( + char * buffer, size_t buffer_capacity, std::string message) + { + // This function also writes the final null charachter, which is absent in the CDR format. + // Here this behaviour is ok, because we only test test writing and reading from/to sqlite. + return rcutils_snprintf(buffer, + buffer_capacity, + "%c%c%c%c%c%c%c%c%s", + 0x00, 0x01, 0x00, 0x00, 0x0f, 0x00, 0x00, 0x00, + message.c_str()); + } + +public: std::string database_name_; static std::string temporary_dir_path_; };