Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement services and rmw_wait #16

Merged
merged 37 commits into from
Sep 1, 2020
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
8ef7f4d
Add service typesupport class
methylDragon Aug 19, 2020
1605e0e
Reorder CMakeLists dependencies
methylDragon Aug 21, 2020
95a2f0e
Add service creation and destruction
methylDragon Aug 21, 2020
3b9c8ae
Separate server and client sources
methylDragon Aug 21, 2020
8bc43f0
Neaten service impl header
methylDragon Aug 21, 2020
6f800c0
Implement wait
methylDragon Aug 21, 2020
f85f1a2
Change printouts and some comments
methylDragon Aug 21, 2020
21770f1
Implement rmw_take_request
methylDragon Aug 21, 2020
8692cab
Change more printouts
methylDragon Aug 21, 2020
8aabb5a
Add service client creation and request-response handling
methylDragon Aug 24, 2020
3e02b8b
Implement service request-response handling
methylDragon Aug 24, 2020
e5c00cc
Refactor code and improve comments and printouts
methylDragon Aug 24, 2020
3f9677b
Handle clients in rmw_wait and improve comments
methylDragon Aug 24, 2020
b10c8b0
Cleanup wait printouts
methylDragon Aug 24, 2020
eeb838f
Undeclare subscribers and style fixes
methylDragon Aug 24, 2020
102ee2d
Remove extraneous const_cast and fix style
methylDragon Aug 25, 2020
3d33743
Fix CMakeLists
methylDragon Aug 26, 2020
f27d557
Add back missing event handle ignore
methylDragon Aug 28, 2020
64c646b
Add note for returning OK when take cannot find messages
methylDragon Aug 28, 2020
78b6a4c
Refactor how Zenoh request and response topic keys are assigned
methylDragon Aug 28, 2020
206f78f
Avoid 'using' directives in header
methylDragon Aug 28, 2020
3be2dcf
Cleanup comments
methylDragon Aug 28, 2020
23497d4
Massive memory management overhaul
methylDragon Aug 28, 2020
af43236
Clean up client and service assertions and error messages
methylDragon Aug 28, 2020
29f2dbb
Clean up subscription assertions and error messages
methylDragon Aug 28, 2020
05038c3
Cleanup error messages
methylDragon Aug 28, 2020
b89303d
Clean up code style
methylDragon Aug 28, 2020
5452948
Check if Zenoh managed to publish
methylDragon Aug 28, 2020
0c6c9e0
Set failed names in error message
methylDragon Aug 28, 2020
9303df3
Log if availability query already set
methylDragon Aug 28, 2020
ba06e8e
Print warning if messages are clobbered
methylDragon Aug 28, 2020
4d9342d
Fix style
methylDragon Aug 28, 2020
591d0e3
Clean up error messages
methylDragon Aug 28, 2020
f619cad
Update Zenoh resource ID clarification
methylDragon Sep 1, 2020
446ac96
Clean up topic string appends in services
methylDragon Sep 1, 2020
5b6237f
Use stack variable for topic validation
methylDragon Sep 1, 2020
83867da
Fix erroneous publisher line deletion
methylDragon Sep 1, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion rmw_zenoh_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,16 @@ add_library(rmw_zenoh_cpp
src/rmw_publish.cpp
src/rmw_publisher.cpp
src/rmw_serialization.cpp
src/rmw_services.cpp
src/rmw_service.cpp
src/rmw_client.cpp
src/rmw_subscriber.cpp
src/rmw_wait_sets.cpp

src/impl/identifier.cpp
src/impl/wait_impl.cpp
src/impl/pubsub_impl.cpp
src/impl/service_impl.cpp
src/impl/client_impl.cpp
src/impl/type_support_common.cpp
)
target_link_libraries(rmw_zenoh_cpp
Expand Down
31 changes: 31 additions & 0 deletions rmw_zenoh_cpp/include/rmw_zenoh_cpp/ServiceTypeSupport.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#ifndef RMW_ZENOH_CPP__SERVICETYPESUPPORT_HPP_
#define RMW_ZENOH_CPP__SERVICETYPESUPPORT_HPP_

#include "rosidl_typesupport_zenoh_cpp/message_type_support.h"
#include "rosidl_typesupport_zenoh_cpp/service_type_support.h"
#include "TypeSupport.hpp"

namespace rmw_zenoh_cpp
{

class ServiceTypeSupport : public TypeSupport
{
protected:
ServiceTypeSupport();
};

class RequestTypeSupport : public ServiceTypeSupport
{
public:
explicit RequestTypeSupport(const service_type_support_callbacks_t * members);
};

class ResponseTypeSupport : public ServiceTypeSupport
{
public:
explicit ResponseTypeSupport(const service_type_support_callbacks_t * members);
};

} // namespace rmw_zenoh_cpp

#endif // RMW_ZENOH_CPP__SERVICETYPESUPPORT_HPP_
1 change: 1 addition & 0 deletions rmw_zenoh_cpp/include/rmw_zenoh_cpp/TypeSupport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

namespace rmw_zenoh_cpp
{

class TypeSupport
{
public:
Expand Down
61 changes: 61 additions & 0 deletions rmw_zenoh_cpp/src/impl/client_impl.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
extern "C"
{
#include "zenoh/zenoh-ffi.h"
}

#include <iostream>
#include <mutex>

#include "rmw_zenoh_cpp/TypeSupport.hpp"
#include "client_impl.hpp"

std::mutex response_callback_mutex;
std::mutex query_callback_mutex;

// Static sequence ID
int64_t rmw_client_data_t::sequence_id = -1;

// Static response ROS message map
std::unordered_map<std::string, std::vector<unsigned char> >
rmw_client_data_t::zn_response_messages_;

// Static availability query Zenoh response set
std::unordered_set<std::string> rmw_client_data_t::zn_availability_query_responses_;

/// ZENOH RESPONSE SUBSCRIPTION CALLBACK =======================================
void rmw_client_data_t::zn_response_sub_callback(const zn_sample * sample) {
// Prevent race conditions...
std::lock_guard<std::mutex> guard(response_callback_mutex);

// NOTE(CH3): We unfortunately have to do this copy construction since we shouldn't be using
// char * as keys to the unordered_map
std::string key(sample->key.val, sample->key.len);

// Vector to store the byte array (so we have a copyable type instead of a pointer)
std::vector<unsigned char> byte_vec(sample->value.val, sample->value.val + sample->value.len);

// Fill the static response message map with the latest received message
//
// NOTE(CH3): This means that the queue size for each topic is ONE for now!!
gbiggs marked this conversation as resolved.
Show resolved Hide resolved
// So this might break if a service is being spammed.
// TODO(CH3): Implement queuing logic
rmw_client_data_t::zn_response_messages_[key] = std::vector<unsigned char>(byte_vec);
}

/// ZENOH SERVICE AVAILABILITY QUERY CALLBACK ==================================
void rmw_client_data_t::zn_service_availability_query_callback(const zn_source_info * info,
const zn_sample * sample) {
// Prevent race conditions...
std::lock_guard<std::mutex> guard(query_callback_mutex);

// NOTE(CH3): We unfortunately have to do this copy construction since we shouldn't be using
// char * as keys to the unordered_map
std::string key(sample->key.val, sample->key.len);

// Insert if key not found in query response set
if (rmw_client_data_t::zn_availability_query_responses_.find(key)
gbiggs marked this conversation as resolved.
Show resolved Hide resolved
== rmw_client_data_t::zn_availability_query_responses_.end())
{
rmw_client_data_t::zn_availability_query_responses_.insert(key);
}
}
58 changes: 58 additions & 0 deletions rmw_zenoh_cpp/src/impl/client_impl.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#ifndef RMW_ZENOH_CPP__ZENOH_CLIENT_HPP_
#define RMW_ZENOH_CPP__ZENOH_CLIENT_HPP_

#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <string>
#include <vector>

extern "C"
{
#include "zenoh/zenoh-ffi.h"
}

#include "rmw/rmw.h"
#include "rmw_zenoh_cpp/TypeSupport.hpp"

struct rmw_client_data_t
{
/// STATIC MEMBERS ===========================================================
static void zn_response_sub_callback(const zn_sample * sample);
static void zn_service_availability_query_callback(
const zn_source_info * info, const zn_sample * sample
);

// Sequence id
static int64_t sequence_id;

// Serialized ROS response messages
static std::unordered_map<std::string, std::vector<unsigned char> > zn_response_messages_;

// Availability query Zenoh responses
static std::unordered_set<std::string> zn_availability_query_responses_;

/// TYPE SUPPORT =============================================================
const void * request_type_support_impl_;
const void * response_type_support_impl_;
const char * typesupport_identifier_;

rmw_zenoh_cpp::TypeSupport * request_type_support_;
rmw_zenoh_cpp::TypeSupport * response_type_support_;

/// ZENOH ====================================================================
ZNSession * zn_session_;

// Response Sub
const char * zn_response_topic_key_;
ZNSubscriber * zn_response_subscriber_;

// Request Pub
const char * zn_request_topic_key_;
size_t zn_request_topic_id_;

/// ROS ======================================================================
const rmw_node_t * node_;
};

#endif // RMW_ZENOH_CPP__ZENOH_CLIENT_HPP_
18 changes: 12 additions & 6 deletions rmw_zenoh_cpp/src/impl/pubsub_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,18 @@ extern "C"
#include "zenoh/zenoh-ffi.h"
}

#include "rmw_zenoh_cpp/TypeSupport.hpp"
#include "pubsub_impl.hpp"
#include <iostream>
#include <mutex>

#include "rmw_zenoh_cpp/TypeSupport.hpp"
#include "pubsub_impl.hpp"

std::mutex sub_callback_mutex;

// Declaring static members
// Static message map
std::unordered_map<std::string, std::vector<unsigned char> >
rmw_subscription_data_t::zn_messages_;

void rmw_subscription_data_t::zn_sub_callback(const zn_sample * sample) {
// Prevent race conditions...
std::lock_guard<std::mutex> guard(sub_callback_mutex);
Expand All @@ -22,8 +26,10 @@ void rmw_subscription_data_t::zn_sub_callback(const zn_sample * sample) {
// Vector to store the byte array (so we have a copyable type instead of a pointer)
std::vector<unsigned char> byte_vec(sample->value.val, sample->value.val + sample->value.len);

// Fill the static message map with the latest received message
//
// NOTE(CH3): This means that the queue size for each topic is ONE for now!!
gbiggs marked this conversation as resolved.
Show resolved Hide resolved
// So this might break if a topic is being spammed.
// TODO(CH3): Implement queuing logic
rmw_subscription_data_t::zn_messages_[key] = std::vector<unsigned char>(byte_vec);
}

std::unordered_map<std::string, std::vector<unsigned char> >
rmw_subscription_data_t::zn_messages_;
1 change: 0 additions & 1 deletion rmw_zenoh_cpp/src/impl/pubsub_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ struct rmw_publisher_data_t
// Functionally a struct. But with a method for handling incoming Zenoh messages
struct rmw_subscription_data_t
{
// TODO(CH3): If needed, implement lock guards/mutexes to prevent race conditions
static void zn_sub_callback(const zn_sample * sample);

// Map of Zenoh topic key expression to latest serialized ROS messages
Expand Down
35 changes: 35 additions & 0 deletions rmw_zenoh_cpp/src/impl/service_impl.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
extern "C"
{
#include "zenoh/zenoh-ffi.h"
}

#include <iostream>
#include <mutex>

#include "rmw_zenoh_cpp/TypeSupport.hpp"
#include "service_impl.hpp"

std::mutex request_callback_mutex;

// Static request message map
std::unordered_map<std::string, std::vector<unsigned char> >
rmw_service_data_t::zn_request_messages_;

void rmw_service_data_t::zn_request_sub_callback(const zn_sample * sample) {
// Prevent race conditions...
std::lock_guard<std::mutex> guard(request_callback_mutex);

// NOTE(CH3): We unfortunately have to do this copy construction since we shouldn't be using
// char * as keys to the unordered_map
std::string key(sample->key.val, sample->key.len);

// Vector to store the byte array (so we have a copyable type instead of a pointer)
std::vector<unsigned char> byte_vec(sample->value.val, sample->value.val + sample->value.len);

// Fill the static request message map with the latest received message
//
// NOTE(CH3): This means that the queue size for each topic is ONE for now!!
gbiggs marked this conversation as resolved.
Show resolved Hide resolved
// So this might break if a service is being spammed.
// TODO(CH3): Implement queuing logic
rmw_service_data_t::zn_request_messages_[key] = std::vector<unsigned char>(byte_vec);
}
49 changes: 49 additions & 0 deletions rmw_zenoh_cpp/src/impl/service_impl.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#ifndef RMW_ZENOH_CPP__ZENOH_SERVICE_HPP_
#define RMW_ZENOH_CPP__ZENOH_SERVICE_HPP_

#include <unordered_map>
#include <utility>
#include <string>
#include <vector>

extern "C"
{
#include "zenoh/zenoh-ffi.h"
}

#include "rmw/rmw.h"
#include "rmw_zenoh_cpp/TypeSupport.hpp"

struct rmw_service_data_t
{
/// STATIC MEMBERS ===========================================================
static void zn_request_sub_callback(const zn_sample * sample);

// Serialized ROS request messages
static std::unordered_map<std::string, std::vector<unsigned char> > zn_request_messages_;

/// TYPE SUPPORT =============================================================
const void * request_type_support_impl_;
const void * response_type_support_impl_;
const char * typesupport_identifier_;

rmw_zenoh_cpp::TypeSupport * request_type_support_;
rmw_zenoh_cpp::TypeSupport * response_type_support_;

/// ZENOH ====================================================================
ZNSession * zn_session_;
ZNQueryable * zn_queryable_;

// Request Sub
const char * zn_request_topic_key_;
ZNSubscriber * zn_request_subscriber_;

// Response Pub
const char * zn_response_topic_key_;
size_t zn_response_topic_id_;

/// ROS ======================================================================
const rmw_node_t * node_;
};

#endif // RMW_ZENOH_CPP__ZENOH_SERVICE_HPP_
52 changes: 25 additions & 27 deletions rmw_zenoh_cpp/src/impl/type_support_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,34 +99,32 @@ MessageTypeSupport::MessageTypeSupport(const message_type_support_callbacks_t *
set_members(members);
}

ServiceTypeSupport::ServiceTypeSupport()
{
}

RequestTypeSupport::RequestTypeSupport(const service_type_support_callbacks_t * members)
{
assert(members);

auto msg = static_cast<const message_type_support_callbacks_t *>(
members->request_members_->data);
// std::string name = _create_type_name(msg); // + "Request_";
gbiggs marked this conversation as resolved.
Show resolved Hide resolved
// this->setName(name.c_str());

set_members(msg);
}

ResponseTypeSupport::ResponseTypeSupport(const service_type_support_callbacks_t * members)
{
assert(members);

// ServiceTypeSupport::ServiceTypeSupport()
// {
// }
//
// RequestTypeSupport::RequestTypeSupport(const service_type_support_callbacks_t * members)
// {
// assert(members);
//
// auto msg = static_cast<const message_type_support_callbacks_t *>(
// members->request_members_->data);
// std::string name = _create_type_name(msg); // + "Request_";
// this->setName(name.c_str());
//
// set_members(msg);
// }
//
// ResponseTypeSupport::ResponseTypeSupport(const service_type_support_callbacks_t * members)
// {
// assert(members);
//
// auto msg = static_cast<const message_type_support_callbacks_t *>(
// members->response_members_->data);
// std::string name = _create_type_name(msg); // + "Response_";
// this->setName(name.c_str());
//
// set_members(msg);
// }
auto msg = static_cast<const message_type_support_callbacks_t *>(
members->response_members_->data);
// std::string name = _create_type_name(msg); // + "Response_";
gbiggs marked this conversation as resolved.
Show resolved Hide resolved
// this->setName(name.c_str());

set_members(msg);
}

} // namespace rmw_zenoh_cpp
8 changes: 4 additions & 4 deletions rmw_zenoh_cpp/src/impl/type_support_common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,22 @@
#include "rmw_zenoh_cpp/TypeSupport.hpp"

#include "rmw_zenoh_cpp/MessageTypeSupport.hpp"
// #include "rmw_zenoh_cpp/ServiceTypeSupport.hpp"
#include "rmw_zenoh_cpp/ServiceTypeSupport.hpp"

#include "rmw_zenoh_cpp/identifier.hpp"

#include "rosidl_typesupport_zenoh_c/identifier.h"
#include "rosidl_typesupport_zenoh_cpp/identifier.hpp"
#include "rosidl_typesupport_zenoh_cpp/message_type_support.h"
// #include "rosidl_typesupport_zenoh_cpp/service_type_support.h"
#include "rosidl_typesupport_zenoh_cpp/service_type_support.h"

#define RMW_ZENOH_CPP_TYPESUPPORT_C rosidl_typesupport_zenoh_c__identifier
#define RMW_ZENOH_CPP_TYPESUPPORT_CPP rosidl_typesupport_zenoh_cpp::typesupport_identifier

using TypeSupport_cpp = rmw_zenoh_cpp::TypeSupport;
using MessageTypeSupport_cpp = rmw_zenoh_cpp::MessageTypeSupport;
// using RequestTypeSupport_cpp = rmw_zenoh_cpp::RequestTypeSupport;
// using ResponseTypeSupport_cpp = rmw_zenoh_cpp::ResponseTypeSupport;
using RequestTypeSupport_cpp = rmw_zenoh_cpp::RequestTypeSupport;
gbiggs marked this conversation as resolved.
Show resolved Hide resolved
using ResponseTypeSupport_cpp = rmw_zenoh_cpp::ResponseTypeSupport;

// inline std::string
// _create_type_name(
Expand Down
Loading