diff --git a/src/v/kafka/protocol/schemata/generator.py b/src/v/kafka/protocol/schemata/generator.py index 3df7453f2b4c..497710c9bc3c 100755 --- a/src/v/kafka/protocol/schemata/generator.py +++ b/src/v/kafka/protocol/schemata/generator.py @@ -1035,6 +1035,11 @@ def is_default_comparable(self): type_name, _ = self._redpanda_type() return type_name not in WITHOUT_DEFAULT_EQUALITY_OPERATOR + @property + def is_error_code(self): + type_name, _ = self._redpanda_type() + return type_name == "kafka::error_code" + HEADER_TEMPLATE = """ #pragma once @@ -1087,6 +1092,9 @@ def is_default_comparable(self): {%- if struct.is_default_comparable %} friend bool operator==(const {{ struct.name }}&, const {{ struct.name }}&) = default; {%- endif %} +{%- if op_type == "response" %} + bool errored() const; +{%- endif %} {% endmacro %} namespace kafka { @@ -1150,6 +1158,8 @@ class response; #include "kafka/protocol/response_writer.h" #include "kafka/protocol/request_reader.h" +#include + #include #include #include @@ -1415,6 +1425,52 @@ class response; {%- endif %} {%- endmacro %} +{% macro render_errored_source(struct) %} +{%- if op_type == "response" %} +{%- if not struct.fields %} +bool {{ struct.name }}::errored() const { + return false; +} +{%- else %} +bool {{ struct.name }}::errored() const { + bool ret = false; + {%- for field in struct.fields %} + {%- if field.is_error_code %} + if ({{ field.name }} != kafka::error_code::none) { + return true; + } + {%- elif field.is_array and field.type().value_type().is_struct %} + {%- if field.nullable() %} + if ({{ field.name }}) { + ret = std::any_of( + {{ field.name }}->begin(), + {{ field.name }}->end(), + [](auto& item) { return item.errored(); }); + + if (ret) { + return ret; + } + } + + {%- else %} + ret = std::any_of( + {{ field.name }}.begin(), + {{ field.name }}.end(), + [](auto& item) { return item.errored(); }); + + if (ret) { + return ret; + } + {%- endif %} + {%- endif %} + {%- endfor %} + + return ret; +} +{%- endif %} +{%- endif %} +{% endmacro %} + namespace kafka { {%- if struct.fields %} @@ -1563,6 +1619,7 @@ class response; return o << "{}"; } {%- endif %} +{{ render_errored_source(struct) }} {% endfor %} } """ diff --git a/src/v/kafka/server/request_context.h b/src/v/kafka/server/request_context.h index a972b7bf160d..ec93352e27f7 100644 --- a/src/v/kafka/server/request_context.h +++ b/src/v/kafka/server/request_context.h @@ -175,15 +175,28 @@ class request_context { r.data.throttle_time_ms, throttle_delay_ms()); } - vlog( - klog.trace, - "[{}:{}] sending {}:{} for {}, response {}", - _conn->client_host(), - _conn->client_port(), - ResponseType::api_type::key, - ResponseType::api_type::name, - _header.client_id, - r); + if (r.data.errored()) { + vlog( + klog.debug, + "[{}:{}] sending {}:{} for {}, response {}", + _conn->client_host(), + _conn->client_port(), + ResponseType::api_type::key, + ResponseType::api_type::name, + _header.client_id, + r); + } else { + vlog( + klog.trace, + "[{}:{}] sending {}:{} for {}, response {}", + _conn->client_host(), + _conn->client_port(), + ResponseType::api_type::key, + ResponseType::api_type::name, + _header.client_id, + r); + } + /// KIP-511 bumps api_versions_request/response to 3, past the first /// supported flex version for this API, and makes an exception /// that there will be no tags in the response header.