Skip to content

Commit

Permalink
Merge pull request #8519 from ballard26/broken-promise-rpc
Browse files Browse the repository at this point in the history
Set promise on exceptions in `dispatch_method_once`
  • Loading branch information
dotnwat authored Feb 7, 2023
2 parents 4589ab5 + e47ab0d commit db61371
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 2 deletions.
20 changes: 18 additions & 2 deletions src/v/rpc/rpc_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,15 @@ ss::future<> rpc_server::dispatch_method_once(
*/
reply_buf.set_version(transport_version::max_supported);
return send_reply_skip_payload(ctx, std::move(reply_buf))
.then([ctx] { ctx->signal_body_parse(); });
.then_wrapped([ctx](ss::future<> f) {
// If the connection is closed then this can be an
// exceptional future.
if (f.failed()) {
ctx->body_parse_exception(f.get_exception());
} else {
ctx->signal_body_parse();
}
});
}
auto it = std::find_if(
_services.begin(),
Expand All @@ -169,7 +177,15 @@ ss::future<> rpc_server::dispatch_method_once(
reply_buf.set_version(ctx->get_header().version);
reply_buf.set_status(s);
return send_reply_skip_payload(ctx, std::move(reply_buf))
.then([ctx] { ctx->signal_body_parse(); });
.then_wrapped([ctx](ss::future<> f) {
// If the connection is closed then this can be an
// exceptional future.
if (f.failed()) {
ctx->body_parse_exception(f.get_exception());
} else {
ctx->signal_body_parse();
}
});
}

method* m = it->get()->method_from_id(method_id);
Expand Down
9 changes: 9 additions & 0 deletions src/v/rpc/service.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "seastarx.h"
#include "ssx/sformat.h"

#include <seastar/core/future.hh>
#include <seastar/core/reactor.hh>
#include <seastar/core/scheduling.hh>

Expand Down Expand Up @@ -81,6 +82,14 @@ struct service::execution_helper {
method_info method,
Func&& f) {
return ctx.permanent_memory_reservation(ctx.get_header().payload_size)
.handle_exception([&ctx](const std::exception_ptr& e) {
// It's possible to stop all waiters on a semaphore externally
// with the semaphore's `broken` method. In which case
// `permanent_memory_reservation` will return an exception.
// We intercept it here to avoid a broken promise.
ctx.body_parse_exception(e);
return ss::make_exception_future(e);
})
.then([f = std::forward<Func>(f), method, &in, &ctx]() mutable {
return parse_type<Input, Codec>(in, ctx.get_header())
.then_wrapped([f = std::forward<Func>(f),
Expand Down

0 comments on commit db61371

Please sign in to comment.