diff --git a/src/v/rpc/rpc_server.cc b/src/v/rpc/rpc_server.cc index 2889e36ecb22..19e5cbb99414 100644 --- a/src/v/rpc/rpc_server.cc +++ b/src/v/rpc/rpc_server.cc @@ -143,7 +143,15 @@ ss::future<> rpc_server::dispatch_method_once( */ reply_buf.set_version(transport_version::max_supported); return send_reply_skip_payload(ctx, std::move(reply_buf)) - .then([ctx] { ctx->signal_body_parse(); }); + .then_wrapped([ctx](ss::future<> f) { + // If the connection is closed then this can be an + // exceptional future. + if (f.failed()) { + ctx->body_parse_exception(f.get_exception()); + } else { + ctx->signal_body_parse(); + } + }); } auto it = std::find_if( _services.begin(), @@ -169,7 +177,15 @@ ss::future<> rpc_server::dispatch_method_once( reply_buf.set_version(ctx->get_header().version); reply_buf.set_status(s); return send_reply_skip_payload(ctx, std::move(reply_buf)) - .then([ctx] { ctx->signal_body_parse(); }); + .then_wrapped([ctx](ss::future<> f) { + // If the connection is closed then this can be an + // exceptional future. + if (f.failed()) { + ctx->body_parse_exception(f.get_exception()); + } else { + ctx->signal_body_parse(); + } + }); } method* m = it->get()->method_from_id(method_id); diff --git a/src/v/rpc/service.h b/src/v/rpc/service.h index 018a2dbf40c4..6d4360b794c7 100644 --- a/src/v/rpc/service.h +++ b/src/v/rpc/service.h @@ -17,6 +17,7 @@ #include "seastarx.h" #include "ssx/sformat.h" +#include #include #include @@ -81,6 +82,14 @@ struct service::execution_helper { method_info method, Func&& f) { return ctx.permanent_memory_reservation(ctx.get_header().payload_size) + .handle_exception([&ctx](const std::exception_ptr& e) { + // It's possible to stop all waiters on a semaphore externally + // with the semaphore's `broken` method. In which case + // `permanent_memory_reservation` will return an exception. + // We intercept it here to avoid a broken promise. + ctx.body_parse_exception(e); + return ss::make_exception_future(e); + }) .then([f = std::forward(f), method, &in, &ctx]() mutable { return parse_type(in, ctx.get_header()) .then_wrapped([f = std::forward(f),