Skip to content

Commit

Permalink
rpc/rpc_server: set promise on exceptions in dispatch_method_once
Browse files Browse the repository at this point in the history
Changes the logic of `dispatch_method_once` to avoid a broken promise
when the connection is closed before the input is skipped for unknown
methods and unknown versions. This ensures that we are always setting
the body_parse promise after reading in all the input and before we send
a reply.
  • Loading branch information
ballard26 committed Feb 1, 2023
1 parent 11ecec6 commit dae07f4
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 2 deletions.
20 changes: 18 additions & 2 deletions src/v/rpc/rpc_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,15 @@ ss::future<> rpc_server::dispatch_method_once(
*/
reply_buf.set_version(transport_version::max_supported);
return send_reply_skip_payload(ctx, std::move(reply_buf))
.then([ctx] { ctx->signal_body_parse(); });
.then_wrapped([ctx](ss::future<> f) {
// If the connection is closed then this can be an
// exceptional future.
if (f.failed()) {
ctx->body_parse_exception(f.get_exception());
} else {
ctx->signal_body_parse();
}
});
}
auto it = std::find_if(
_services.begin(),
Expand All @@ -162,7 +170,15 @@ ss::future<> rpc_server::dispatch_method_once(
reply_buf.set_version(ctx->get_header().version);
reply_buf.set_status(rpc::status::method_not_found);
return send_reply_skip_payload(ctx, std::move(reply_buf))
.then([ctx] { ctx->signal_body_parse(); });
.then_wrapped([ctx](ss::future<> f) {
// If the connection is closed then this can be an
// exceptional future.
if (f.failed()) {
ctx->body_parse_exception(f.get_exception());
} else {
ctx->signal_body_parse();
}
});
}

method* m = it->get()->method_from_id(method_id);
Expand Down
9 changes: 9 additions & 0 deletions src/v/rpc/service.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <seastar/core/scheduling.hh>

#include <cstdint>
#include <exception>

namespace rpc {

Expand Down Expand Up @@ -81,6 +82,14 @@ struct service::execution_helper {
method_info method,
Func&& f) {
return ctx.permanent_memory_reservation(ctx.get_header().payload_size)
.handle_exception([&ctx](const std::exception_ptr& e) {
// It's possible to stop all waiters on a semaphore externally
// with the semaphore's `broken` method. In which case
// `permanent_memory_reservation` will return an exception.
// We intercept it here to avoid a broken promise.
ctx.body_parse_exception(e);
std::rethrow_exception(e);
})
.then([f = std::forward<Func>(f), method, &in, &ctx]() mutable {
return parse_type<Input, Codec>(in, ctx.get_header())
.then_wrapped([f = std::forward<Func>(f),
Expand Down

0 comments on commit dae07f4

Please sign in to comment.