diff --git a/src/v/kafka/server/connection_context.cc b/src/v/kafka/server/connection_context.cc index a4025d3121cd..833307020961 100644 --- a/src/v/kafka/server/connection_context.cc +++ b/src/v/kafka/server/connection_context.cc @@ -40,6 +40,15 @@ using namespace std::chrono_literals; namespace kafka { +ss::future<> connection_context::process() { + while (true) { + if (is_finished_parsing()) { + break; + } + co_await process_one_request(); + } +} + ss::future<> connection_context::process_one_request() { auto sz = co_await parse_size(conn->input()); if (!sz.has_value()) { diff --git a/src/v/kafka/server/connection_context.h b/src/v/kafka/server/connection_context.h index 5b5ac945dd95..32f38f0df28a 100644 --- a/src/v/kafka/server/connection_context.h +++ b/src/v/kafka/server/connection_context.h @@ -187,12 +187,14 @@ class connection_context final return authorized; } + ss::future<> process(); ss::future<> process_one_request(); - bool is_finished_parsing() const; ss::net::inet_address client_host() const { return _client_addr; } uint16_t client_port() const { return conn ? conn->addr.port() : 0; } private: + bool is_finished_parsing() const; + // Reserve units from memory from the memory semaphore in proportion // to the number of bytes the request procesisng is expected to // take. diff --git a/src/v/kafka/server/server.cc b/src/v/kafka/server/server.cc index f74688491bed..813ab4b8a715 100644 --- a/src/v/kafka/server/server.cc +++ b/src/v/kafka/server/server.cc @@ -213,65 +213,64 @@ ss::future<> server::apply(ss::lw_shared_ptr conn) { mtls_state, config::shard_local_cfg().kafka_request_max_bytes.bind()); - co_return co_await ss::do_until( - [ctx] { return ctx->is_finished_parsing(); }, - [ctx] { return ctx->process_one_request(); }) - .handle_exception([ctx, authn_method](std::exception_ptr eptr) { - auto disconnected = net::is_disconnect_exception(eptr); - if (authn_method == config::broker_authn_method::sasl) { - /* - * This block is a 2x2 matrix of: - * - sasl enabled or disabled - * - message looks like a disconnect or internal error - * - * Disconnects are logged at DEBUG level, because they are - * already recorded at INFO level by the outer RPC layer, - * so we don't want to log two INFO logs for each client - * disconnect. - */ - if (disconnected) { - vlog( - klog.debug, - "Disconnected {} {}:{} ({}, sasl state: {})", - ctx->server().name(), - ctx->client_host(), - ctx->client_port(), - disconnected.value(), - security::sasl_state_to_str(ctx->sasl()->state())); - - } else { - vlog( - klog.warn, - "Error {} {}:{}: {} (sasl state: {})", - ctx->server().name(), - ctx->client_host(), - ctx->client_port(), - eptr, - security::sasl_state_to_str(ctx->sasl()->state())); - } - } else { - if (disconnected) { - vlog( - klog.debug, - "Disconnected {} {}:{} ({})", - ctx->server().name(), - ctx->client_host(), - ctx->client_port(), - disconnected.value()); - - } else { - vlog( - klog.warn, - "Error {} {}:{}: {}", - ctx->server().name(), - ctx->client_host(), - ctx->client_port(), - eptr); - } - } - return ss::make_exception_future(eptr); - }) - .finally([ctx] {}); + try { + co_await ctx->process(); + } catch (...) { + auto eptr = std::current_exception(); + auto disconnected = net::is_disconnect_exception(eptr); + if (authn_method == config::broker_authn_method::sasl) { + /* + * This block is a 2x2 matrix of: + * - sasl enabled or disabled + * - message looks like a disconnect or internal error + * + * Disconnects are logged at DEBUG level, because they are + * already recorded at INFO level by the outer RPC layer, + * so we don't want to log two INFO logs for each client + * disconnect. + */ + if (disconnected) { + vlog( + klog.debug, + "Disconnected {} {}:{} ({}, sasl state: {})", + ctx->server().name(), + ctx->client_host(), + ctx->client_port(), + disconnected.value(), + security::sasl_state_to_str(ctx->sasl()->state())); + + } else { + vlog( + klog.warn, + "Error {} {}:{}: {} (sasl state: {})", + ctx->server().name(), + ctx->client_host(), + ctx->client_port(), + eptr, + security::sasl_state_to_str(ctx->sasl()->state())); + } + } else { + if (disconnected) { + vlog( + klog.debug, + "Disconnected {} {}:{} ({})", + ctx->server().name(), + ctx->client_host(), + ctx->client_port(), + disconnected.value()); + + } else { + vlog( + klog.warn, + "Error {} {}:{}: {}", + ctx->server().name(), + ctx->client_host(), + ctx->client_port(), + eptr); + } + } + std::rethrow_exception(eptr); + } } template<>