From 2cb55b7ff817765d89939d592b34b6fa3f281b18 Mon Sep 17 00:00:00 2001 From: NyaliaLui Date: Fri, 25 Aug 2023 12:13:38 -0400 Subject: [PATCH] k/server: log errored responses at DEBUG Fixes #12858 Signed-off-by: NyaliaLui --- src/v/kafka/server/request_context.h | 108 +++++++++++++++++---------- 1 file changed, 68 insertions(+), 40 deletions(-) diff --git a/src/v/kafka/server/request_context.h b/src/v/kafka/server/request_context.h index 46efbab7955ad..bd925a7d91564 100644 --- a/src/v/kafka/server/request_context.h +++ b/src/v/kafka/server/request_context.h @@ -37,6 +37,8 @@ #include #include +#include + #include #include @@ -172,46 +174,9 @@ class request_context { { r.encode(writer, version) } -> std::same_as; } ss::future respond(ResponseType r) { - /// Many responses contain a throttle_time_ms field, to prevent each - /// handler from manually having to set this value, it can be done in - /// one place here, with this concept check - if constexpr (has_throttle_time_ms) { - /// Allow request handlers to override the throttle response, if - /// multiple throttles detected, choose larger of the two - r.data.throttle_time_ms = std::max( - r.data.throttle_time_ms, throttle_delay_ms()); - } - - vlog( - klog.trace, - "[{}:{}] sending {}:{} for {}, response {}", - _conn->client_host(), - _conn->client_port(), - ResponseType::api_type::key, - ResponseType::api_type::name, - _header.client_id, - r); - /// KIP-511 bumps api_versions_request/response to 3, past the first - /// supported flex version for this API, and makes an exception - /// that there will be no tags in the response header. - auto is_flexible = flex_enabled(header().is_flexible()); - api_version version = header().version; - if constexpr (std::is_same_v) { - is_flexible = flex_enabled::no; - if (r.data.error_code == kafka::error_code::unsupported_version) { - /// Furthermore if the client has made an api_versions_request - /// outside of the max supported version, any assumptions about - /// its ability to understand a response at a given version - /// cannot be made. In this case return api_versions_response at - /// version 0. - version = api_version(0); - } - } - - auto resp = std::make_unique(is_flexible); - r.encode(resp->writer(), version); - update_usage_stats(r, resp->buf().size_bytes()); - return ss::make_ready_future(std::move(resp)); + return log_response(std::move(r)).then([this](ResponseType r) { + return process_response(std::move(r)); + }); } coordinator_ntp_mapper& coordinator_mapper() { @@ -272,6 +237,69 @@ class request_context { usage_mgr().add_bytes_sent(response_size - internal_bytes_sent); } + template + ss::future log_response(ResponseType r) { + auto msg = fmt::format( + "[{}:{}] sending {}:{} for {}, response {}", + _conn->client_host(), + _conn->client_port(), + ResponseType::api_type::key, + ResponseType::api_type::name, + _header.client_id, + r); + + return r.data.errored().then( + [msg, r = std::move(r)](auto errored) mutable { + if (errored) { + thread_local static ss::logger::rate_limit rate( + std::chrono::seconds(10)); + klog.log(ss::log_level::debug, rate, "{}", msg); + } else { + vlog(klog.trace, "{}", msg); + } + return ss::make_ready_future(std::move(r)); + }); + } + + template + requires requires( + ResponseType r, protocol::encoder& writer, api_version version) { + { r.encode(writer, version) } -> std::same_as; + } + ss::future process_response(ResponseType r) { + /// Many responses contain a throttle_time_ms field, to prevent each + /// handler from manually having to set this value, it can be done in + /// one place here, with this concept check + if constexpr (has_throttle_time_ms) { + /// Allow request handlers to override the throttle response, if + /// multiple throttles detected, choose larger of the two + r.data.throttle_time_ms = std::max( + r.data.throttle_time_ms, throttle_delay_ms()); + } + + /// KIP-511 bumps api_versions_request/response to 3, past the first + /// supported flex version for this API, and makes an exception + /// that there will be no tags in the response header. + auto is_flexible = flex_enabled(header().is_flexible()); + api_version version = header().version; + if constexpr (std::is_same_v) { + is_flexible = flex_enabled::no; + if (r.data.error_code == kafka::error_code::unsupported_version) { + /// Furthermore if the client has made an api_versions_request + /// outside of the max supported version, any assumptions about + /// its ability to understand a response at a given version + /// cannot be made. In this case return api_versions_response at + /// version 0. + version = api_version(0); + } + } + + auto resp = std::make_unique(is_flexible); + r.encode(resp->writer(), version); + update_usage_stats(r, resp->buf().size_bytes()); + return ss::make_ready_future(std::move(resp)); + } + private: ss::lw_shared_ptr _conn; size_t _request_size;