Skip to content

Commit

Permalink
k/server: log errored responses at DEBUG
Browse files Browse the repository at this point in the history
Fixes redpanda-data#12858

Signed-off-by: NyaliaLui <[email protected]>
  • Loading branch information
NyaliaLui committed Aug 25, 2023
1 parent d898c5e commit 2cb55b7
Showing 1 changed file with 68 additions and 40 deletions.
108 changes: 68 additions & 40 deletions src/v/kafka/server/request_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
#include <seastar/core/unaligned.hh>
#include <seastar/util/log.hh>

#include <fmt/format.h>

#include <memory>
#include <type_traits>

Expand Down Expand Up @@ -172,46 +174,9 @@ class request_context {
{ r.encode(writer, version) } -> std::same_as<void>;
}
ss::future<response_ptr> respond(ResponseType r) {
/// Many responses contain a throttle_time_ms field, to prevent each
/// handler from manually having to set this value, it can be done in
/// one place here, with this concept check
if constexpr (has_throttle_time_ms<ResponseType>) {
/// Allow request handlers to override the throttle response, if
/// multiple throttles detected, choose larger of the two
r.data.throttle_time_ms = std::max(
r.data.throttle_time_ms, throttle_delay_ms());
}

vlog(
klog.trace,
"[{}:{}] sending {}:{} for {}, response {}",
_conn->client_host(),
_conn->client_port(),
ResponseType::api_type::key,
ResponseType::api_type::name,
_header.client_id,
r);
/// KIP-511 bumps api_versions_request/response to 3, past the first
/// supported flex version for this API, and makes an exception
/// that there will be no tags in the response header.
auto is_flexible = flex_enabled(header().is_flexible());
api_version version = header().version;
if constexpr (std::is_same_v<ResponseType, api_versions_response>) {
is_flexible = flex_enabled::no;
if (r.data.error_code == kafka::error_code::unsupported_version) {
/// Furthermore if the client has made an api_versions_request
/// outside of the max supported version, any assumptions about
/// its ability to understand a response at a given version
/// cannot be made. In this case return api_versions_response at
/// version 0.
version = api_version(0);
}
}

auto resp = std::make_unique<response>(is_flexible);
r.encode(resp->writer(), version);
update_usage_stats(r, resp->buf().size_bytes());
return ss::make_ready_future<response_ptr>(std::move(resp));
return log_response(std::move(r)).then([this](ResponseType r) {
return process_response(std::move(r));
});
}

coordinator_ntp_mapper& coordinator_mapper() {
Expand Down Expand Up @@ -272,6 +237,69 @@ class request_context {
usage_mgr().add_bytes_sent(response_size - internal_bytes_sent);
}

template<typename ResponseType>
ss::future<ResponseType> log_response(ResponseType r) {
auto msg = fmt::format(
"[{}:{}] sending {}:{} for {}, response {}",
_conn->client_host(),
_conn->client_port(),
ResponseType::api_type::key,
ResponseType::api_type::name,
_header.client_id,
r);

return r.data.errored().then(
[msg, r = std::move(r)](auto errored) mutable {
if (errored) {
thread_local static ss::logger::rate_limit rate(
std::chrono::seconds(10));
klog.log(ss::log_level::debug, rate, "{}", msg);
} else {
vlog(klog.trace, "{}", msg);
}
return ss::make_ready_future<ResponseType>(std::move(r));
});
}

template<typename ResponseType>
requires requires(
ResponseType r, protocol::encoder& writer, api_version version) {
{ r.encode(writer, version) } -> std::same_as<void>;
}
ss::future<response_ptr> process_response(ResponseType r) {
/// Many responses contain a throttle_time_ms field, to prevent each
/// handler from manually having to set this value, it can be done in
/// one place here, with this concept check
if constexpr (has_throttle_time_ms<ResponseType>) {
/// Allow request handlers to override the throttle response, if
/// multiple throttles detected, choose larger of the two
r.data.throttle_time_ms = std::max(
r.data.throttle_time_ms, throttle_delay_ms());
}

/// KIP-511 bumps api_versions_request/response to 3, past the first
/// supported flex version for this API, and makes an exception
/// that there will be no tags in the response header.
auto is_flexible = flex_enabled(header().is_flexible());
api_version version = header().version;
if constexpr (std::is_same_v<ResponseType, api_versions_response>) {
is_flexible = flex_enabled::no;
if (r.data.error_code == kafka::error_code::unsupported_version) {
/// Furthermore if the client has made an api_versions_request
/// outside of the max supported version, any assumptions about
/// its ability to understand a response at a given version
/// cannot be made. In this case return api_versions_response at
/// version 0.
version = api_version(0);
}
}

auto resp = std::make_unique<response>(is_flexible);
r.encode(resp->writer(), version);
update_usage_stats(r, resp->buf().size_bytes());
return ss::make_ready_future<response_ptr>(std::move(resp));
}

private:
ss::lw_shared_ptr<connection_context> _conn;
size_t _request_size;
Expand Down

0 comments on commit 2cb55b7

Please sign in to comment.