Skip to content

Commit

Permalink
Merge pull request #8135 from dotnwat/connection-context-loop
Browse files Browse the repository at this point in the history
kafka: move processing loop into connect context
  • Loading branch information
dotnwat authored Jan 10, 2023
2 parents 40ab203 + ba91761 commit 5ffa022
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 60 deletions.
9 changes: 9 additions & 0 deletions src/v/kafka/server/connection_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ using namespace std::chrono_literals;

namespace kafka {

ss::future<> connection_context::process() {
while (true) {
if (is_finished_parsing()) {
break;
}
co_await process_one_request();
}
}

ss::future<> connection_context::process_one_request() {
auto sz = co_await parse_size(conn->input());
if (!sz.has_value()) {
Expand Down
4 changes: 3 additions & 1 deletion src/v/kafka/server/connection_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,12 +187,14 @@ class connection_context final
return authorized;
}

ss::future<> process();
ss::future<> process_one_request();
bool is_finished_parsing() const;
ss::net::inet_address client_host() const { return _client_addr; }
uint16_t client_port() const { return conn ? conn->addr.port() : 0; }

private:
bool is_finished_parsing() const;

// Reserve units from memory from the memory semaphore in proportion
// to the number of bytes the request procesisng is expected to
// take.
Expand Down
117 changes: 58 additions & 59 deletions src/v/kafka/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -213,65 +213,64 @@ ss::future<> server::apply(ss::lw_shared_ptr<net::connection> conn) {
mtls_state,
config::shard_local_cfg().kafka_request_max_bytes.bind());

co_return co_await ss::do_until(
[ctx] { return ctx->is_finished_parsing(); },
[ctx] { return ctx->process_one_request(); })
.handle_exception([ctx, authn_method](std::exception_ptr eptr) {
auto disconnected = net::is_disconnect_exception(eptr);
if (authn_method == config::broker_authn_method::sasl) {
/*
* This block is a 2x2 matrix of:
* - sasl enabled or disabled
* - message looks like a disconnect or internal error
*
* Disconnects are logged at DEBUG level, because they are
* already recorded at INFO level by the outer RPC layer,
* so we don't want to log two INFO logs for each client
* disconnect.
*/
if (disconnected) {
vlog(
klog.debug,
"Disconnected {} {}:{} ({}, sasl state: {})",
ctx->server().name(),
ctx->client_host(),
ctx->client_port(),
disconnected.value(),
security::sasl_state_to_str(ctx->sasl()->state()));

} else {
vlog(
klog.warn,
"Error {} {}:{}: {} (sasl state: {})",
ctx->server().name(),
ctx->client_host(),
ctx->client_port(),
eptr,
security::sasl_state_to_str(ctx->sasl()->state()));
}
} else {
if (disconnected) {
vlog(
klog.debug,
"Disconnected {} {}:{} ({})",
ctx->server().name(),
ctx->client_host(),
ctx->client_port(),
disconnected.value());

} else {
vlog(
klog.warn,
"Error {} {}:{}: {}",
ctx->server().name(),
ctx->client_host(),
ctx->client_port(),
eptr);
}
}
return ss::make_exception_future(eptr);
})
.finally([ctx] {});
try {
co_await ctx->process();
} catch (...) {
auto eptr = std::current_exception();
auto disconnected = net::is_disconnect_exception(eptr);
if (authn_method == config::broker_authn_method::sasl) {
/*
* This block is a 2x2 matrix of:
* - sasl enabled or disabled
* - message looks like a disconnect or internal error
*
* Disconnects are logged at DEBUG level, because they are
* already recorded at INFO level by the outer RPC layer,
* so we don't want to log two INFO logs for each client
* disconnect.
*/
if (disconnected) {
vlog(
klog.debug,
"Disconnected {} {}:{} ({}, sasl state: {})",
ctx->server().name(),
ctx->client_host(),
ctx->client_port(),
disconnected.value(),
security::sasl_state_to_str(ctx->sasl()->state()));

} else {
vlog(
klog.warn,
"Error {} {}:{}: {} (sasl state: {})",
ctx->server().name(),
ctx->client_host(),
ctx->client_port(),
eptr,
security::sasl_state_to_str(ctx->sasl()->state()));
}
} else {
if (disconnected) {
vlog(
klog.debug,
"Disconnected {} {}:{} ({})",
ctx->server().name(),
ctx->client_host(),
ctx->client_port(),
disconnected.value());

} else {
vlog(
klog.warn,
"Error {} {}:{}: {}",
ctx->server().name(),
ctx->client_host(),
ctx->client_port(),
eptr);
}
}
std::rethrow_exception(eptr);
}
}

template<>
Expand Down

0 comments on commit 5ffa022

Please sign in to comment.