Skip to content

Commit

Permalink
Merge pull request #13436 from vbotbuildovich/backport-pr-12860-v23.1…
Browse files Browse the repository at this point in the history
….x-114

[v23.1.x] kafka: log errored responses at DEBUG instead of TRACE
  • Loading branch information
NyaliaLui authored Sep 14, 2023
2 parents 6f0199f + 24e1208 commit 4101198
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 9 deletions.
57 changes: 57 additions & 0 deletions src/v/kafka/protocol/schemata/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1035,6 +1035,11 @@ def is_default_comparable(self):
type_name, _ = self._redpanda_type()
return type_name not in WITHOUT_DEFAULT_EQUALITY_OPERATOR

@property
def is_error_code(self):
type_name, _ = self._redpanda_type()
return type_name == "kafka::error_code"


HEADER_TEMPLATE = """
#pragma once
Expand Down Expand Up @@ -1087,6 +1092,9 @@ def is_default_comparable(self):
{%- if struct.is_default_comparable %}
friend bool operator==(const {{ struct.name }}&, const {{ struct.name }}&) = default;
{%- endif %}
{%- if op_type == "response" %}
bool errored() const;
{%- endif %}
{% endmacro %}
namespace kafka {
Expand Down Expand Up @@ -1150,6 +1158,8 @@ class response;
#include "kafka/protocol/response_writer.h"
#include "kafka/protocol/request_reader.h"
#include <algorithm>
#include <fmt/core.h>
#include <fmt/format.h>
#include <fmt/ostream.h>
Expand Down Expand Up @@ -1415,6 +1425,52 @@ class response;
{%- endif %}
{%- endmacro %}
{% macro render_errored_source(struct) %}
{%- if op_type == "response" %}
{%- if not struct.fields %}
bool {{ struct.name }}::errored() const {
return false;
}
{%- else %}
bool {{ struct.name }}::errored() const {
bool ret = false;
{%- for field in struct.fields %}
{%- if field.is_error_code %}
if ({{ field.name }} != kafka::error_code::none) {
return true;
}
{%- elif field.is_array and field.type().value_type().is_struct %}
{%- if field.nullable() %}
if ({{ field.name }}) {
ret = std::any_of(
{{ field.name }}->begin(),
{{ field.name }}->end(),
[](auto& item) { return item.errored(); });
if (ret) {
return ret;
}
}
{%- else %}
ret = std::any_of(
{{ field.name }}.begin(),
{{ field.name }}.end(),
[](auto& item) { return item.errored(); });
if (ret) {
return ret;
}
{%- endif %}
{%- endif %}
{%- endfor %}
return ret;
}
{%- endif %}
{%- endif %}
{% endmacro %}
namespace kafka {
{%- if struct.fields %}
Expand Down Expand Up @@ -1563,6 +1619,7 @@ class response;
return o << "{}";
}
{%- endif %}
{{ render_errored_source(struct) }}
{% endfor %}
}
"""
Expand Down
31 changes: 22 additions & 9 deletions src/v/kafka/server/request_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,15 +175,28 @@ class request_context {
r.data.throttle_time_ms, throttle_delay_ms());
}

vlog(
klog.trace,
"[{}:{}] sending {}:{} for {}, response {}",
_conn->client_host(),
_conn->client_port(),
ResponseType::api_type::key,
ResponseType::api_type::name,
_header.client_id,
r);
if (r.data.errored()) {
vlog(
klog.debug,
"[{}:{}] sending {}:{} for {}, response {}",
_conn->client_host(),
_conn->client_port(),
ResponseType::api_type::key,
ResponseType::api_type::name,
_header.client_id,
r);
} else {
vlog(
klog.trace,
"[{}:{}] sending {}:{} for {}, response {}",
_conn->client_host(),
_conn->client_port(),
ResponseType::api_type::key,
ResponseType::api_type::name,
_header.client_id,
r);
}

/// KIP-511 bumps api_versions_request/response to 3, past the first
/// supported flex version for this API, and makes an exception
/// that there will be no tags in the response header.
Expand Down

0 comments on commit 4101198

Please sign in to comment.