Skip to content

Commit

Permalink
Added async_send_request_return_request to return the originating req…
Browse files Browse the repository at this point in the history
…uest
  • Loading branch information
Esteve Fernandez committed Nov 18, 2015
1 parent 24f9df6 commit a5d2747
Showing 1 changed file with 64 additions and 14 deletions.
78 changes: 64 additions & 14 deletions rclcpp/include/rclcpp/client.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,20 @@ template<typename ServiceT>
class Client : public ClientBase
{
public:
using Promise = std::promise<typename ServiceT::Response::SharedPtr>;
using SharedRequest = typename ServiceT::Request::SharedPtr;
using SharedResponse = typename ServiceT::Response::SharedPtr;

using Promise = std::promise<SharedResponse>;
using PromiseWithRequest = std::promise<std::pair<SharedRequest, SharedResponse>>;

using SharedPromise = std::shared_ptr<Promise>;
using SharedFuture = std::shared_future<typename ServiceT::Response::SharedPtr>;
using SharedPromiseWithRequest = std::shared_ptr<PromiseWithRequest>;

using SharedFuture = std::shared_future<SharedResponse>;
using SharedFutureWithRequest = std::shared_future<std::pair<SharedRequest, SharedResponse>>;

using CallbackType = std::function<void(SharedFuture)>;
using CallbackWithRequestType = std::function<void(SharedFutureWithRequest)>;

RCLCPP_SMART_PTR_DEFINITIONS(Client);

Expand Down Expand Up @@ -108,24 +117,36 @@ class Client : public ClientBase
int64_t sequence_number = typed_request_header->sequence_number;
// TODO(esteve) this must check if the sequence_number is valid otherwise the
// call_promise will be null
auto tuple = this->pending_requests_[sequence_number];
auto call_promise = std::get<0>(tuple);
auto callback = std::get<1>(tuple);
auto future = std::get<2>(tuple);
this->pending_requests_.erase(sequence_number);
call_promise->set_value(typed_response);
callback(future);
{

This comment has been minimized.

Copy link
@esteve

esteve Nov 18, 2015

Member

Not needed, but I like wrapping scoped locks this way.

std::lock_guard<std::mutex> lock(requests_mutex_);
if (this->pending_requests_.find(sequence_number) != this->pending_requests_.end()) {
auto tuple = this->pending_requests_[sequence_number];
auto call_promise = std::get<0>(tuple);
auto callback = std::get<1>(tuple);
auto future = std::get<2>(tuple);
this->pending_requests_.erase(sequence_number);
call_promise->set_value(typed_response);
callback(future);
} else {
auto tuple = this->pending_requests_with_requests_[sequence_number];
auto request = std::get<0>(tuple);
auto call_promise = std::get<1>(tuple);
auto callback = std::get<2>(tuple);
auto future = std::get<3>(tuple);
this->pending_requests_with_requests_.erase(sequence_number);
auto pair = std::make_pair(request, typed_response);
call_promise->set_value(pair);
callback(future);
}
}
}

SharedFuture async_send_request(
typename ServiceT::Request::SharedPtr request)
SharedFuture async_send_request(SharedRequest request)
{
return async_send_request(request, [](SharedFuture) {});
}

SharedFuture async_send_request(
typename ServiceT::Request::SharedPtr request,
CallbackType && cb)
SharedFuture async_send_request(SharedRequest request, CallbackType && cb)
{
int64_t sequence_number;
if (RMW_RET_OK != rmw_send_request(get_client_handle(), request.get(), &sequence_number)) {
Expand All @@ -142,10 +163,39 @@ class Client : public ClientBase
return f;
}

SharedFutureWithRequest async_send_request_return_request(SharedRequest request)
{
return async_send_request_return_request(request, [](SharedFutureWithRequest) {});
}

SharedFutureWithRequest async_send_request_return_request(
SharedRequest request, CallbackWithRequestType && cb)
{
int64_t sequence_number;
if (RMW_RET_OK != rmw_send_request(get_client_handle(), request.get(), &sequence_number)) {
// *INDENT-OFF* (prevent uncrustify from making unecessary indents here)
throw std::runtime_error(
std::string("failed to send request: ") + rmw_get_error_string_safe());
// *INDENT-ON*
}

SharedPromiseWithRequest call_promise = std::make_shared<PromiseWithRequest>();
SharedFutureWithRequest f(call_promise->get_future());
pending_requests_with_requests_[sequence_number] =
std::make_tuple(request, call_promise, std::forward<CallbackWithRequestType>(cb), f);
return f;
}

private:
RCLCPP_DISABLE_COPY(Client);

std::mutex requests_mutex_;
std::map<int64_t, std::tuple<SharedPromise, CallbackType, SharedFuture>> pending_requests_;
std::map<
int64_t,
std::tuple<
SharedRequest, SharedPromiseWithRequest, CallbackWithRequestType,
SharedFutureWithRequest>> pending_requests_with_requests_;
};

} // namespace client
Expand Down

0 comments on commit a5d2747

Please sign in to comment.