Skip to content

Commit

Permalink
kafka/server: integrate metrics for bytes sent/recv into connection_c…
Browse files Browse the repository at this point in the history
…ontext
  • Loading branch information
ballard26 committed May 25, 2023
1 parent d78ed20 commit 9e42f80
Showing 1 changed file with 6 additions and 3 deletions.
9 changes: 6 additions & 3 deletions src/v/kafka/server/connection_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ ss::future<> connection_context::process_one_request() {
_server.probe().header_corrupted();
co_return;
}
_server.handler_probe(h->key).add_bytes_received(sz.value());

try {
co_return co_await dispatch_method_once(
Expand Down Expand Up @@ -523,10 +524,12 @@ ss::future<> connection_context::maybe_process_responses() {
// throttle_ms has been serialized long ago already. With the current
// approach, egress token bucket level will always be an extra burst
// into the negative while under pressure.
if (_kafka_throughput_controlled_api_keys().at(
resp_and_res.resources->request_data.request_key)) {
_server.snc_quota_mgr().record_response(msg.size());
auto response_size = msg.size();
auto request_key = resp_and_res.resources->request_data.request_key;
if (_kafka_throughput_controlled_api_keys().at(request_key)) {
_server.snc_quota_mgr().record_response(response_size);
}
_server.handler_probe(request_key).add_bytes_sent(response_size);
try {
return conn->write(std::move(msg))
.then([] {
Expand Down

0 comments on commit 9e42f80

Please sign in to comment.