From 57dbe14805cba9ee3cae9ba34625c115465947e2 Mon Sep 17 00:00:00 2001 From: Brandon Allard Date: Sun, 23 Jun 2024 16:22:27 -0400 Subject: [PATCH] treewide: remove coroutines in validate_fetch_offset callees The most common path for validate_fetch_offset results in a number of short lived coroutines. In these cases the allocation/deallocation for the coroutine's frame ends up dominating the runtime for the function. This commit removes the coroutines in favor for then chains which can avoid the allocation if the task quota hasn't been met. --- src/v/cluster/log_eviction_stm.cc | 19 ++-- src/v/kafka/server/replicated_partition.cc | 102 +++++++++++---------- src/v/raft/persisted_stm.cc | 22 +++-- 3 files changed, 77 insertions(+), 66 deletions(-) diff --git a/src/v/cluster/log_eviction_stm.cc b/src/v/cluster/log_eviction_stm.cc index 862f3470ab954..3af07b8dbcbcc 100644 --- a/src/v/cluster/log_eviction_stm.cc +++ b/src/v/cluster/log_eviction_stm.cc @@ -225,14 +225,17 @@ log_eviction_stm::sync_start_offset_override( /// useful to know if the start offset is up to date in the case /// leadership has recently changed for example. auto term = _raft->term(); - if (!co_await sync(timeout)) { - if (term != _raft->term()) { - co_return errc::not_leader; - } else { - co_return errc::timeout; - } - } - co_return start_offset_override(); + return sync(timeout).then( + [this, term](bool success) -> result { + if (!success) { + if (term != _raft->term()) { + return errc::not_leader; + } else { + return errc::timeout; + } + } + return start_offset_override(); + }); } model::offset log_eviction_stm::effective_start_offset() const { diff --git a/src/v/kafka/server/replicated_partition.cc b/src/v/kafka/server/replicated_partition.cc index 77536cba6d308..0840eeb6cf6bc 100644 --- a/src/v/kafka/server/replicated_partition.cc +++ b/src/v/kafka/server/replicated_partition.cc @@ -45,30 +45,32 @@ const model::ntp& replicated_partition::ntp() const { ss::future> replicated_partition::sync_effective_start( model::timeout_clock::duration timeout) { - auto synced_start_offset_override - = co_await _partition->sync_kafka_start_offset_override(timeout); - if (synced_start_offset_override.has_failure()) { - auto err = synced_start_offset_override.error(); - auto error_code = error_code::unknown_server_error; - if (err.category() == cluster::error_category()) { - switch (cluster::errc(err.value())) { - /** - * In the case of timeout and shutting down errors return - * not_leader_for_partition error to force clients retry. - */ - case cluster::errc::shutting_down: - case cluster::errc::not_leader: - case cluster::errc::timeout: - error_code = error_code::not_leader_for_partition; - break; - default: - error_code = error_code::unknown_server_error; - } - } - co_return error_code; - } - co_return kafka_start_offset_with_override( - synced_start_offset_override.value()); + return _partition->sync_kafka_start_offset_override(timeout).then( + [this](auto synced_start_offset_override) + -> result { + if (synced_start_offset_override.has_failure()) { + auto err = synced_start_offset_override.error(); + auto error_code = error_code::unknown_server_error; + if (err.category() == cluster::error_category()) { + switch (cluster::errc(err.value())) { + /** + * In the case of timeout and shutting down errors return + * not_leader_for_partition error to force clients retry. + */ + case cluster::errc::shutting_down: + case cluster::errc::not_leader: + case cluster::errc::timeout: + error_code = error_code::not_leader_for_partition; + break; + default: + error_code = error_code::unknown_server_error; + } + } + return error_code; + } + return kafka_start_offset_with_override( + synced_start_offset_override.value()); + }); } model::offset replicated_partition::start_offset() const { @@ -575,36 +577,40 @@ ss::future replicated_partition::validate_fetch_offset( ec); } - co_return ec; + return ss::make_ready_future(ec); } // Grab the up to date start offset auto timeout = deadline - model::timeout_clock::now(); - auto start_offset = co_await sync_effective_start(timeout); - if (!start_offset) { - vlog( - klog.warn, - "ntp {}: error obtaining latest start offset - {}", - ntp(), - start_offset.error()); - co_return start_offset.error(); - } + return sync_effective_start(timeout).then( + [this, fetch_offset](auto start_offset) { + if (!start_offset) { + vlog( + klog.warn, + "ntp {}: error obtaining latest start offset - {}", + ntp(), + start_offset.error()); + return start_offset.error(); + } - if ( - fetch_offset < start_offset.value() || fetch_offset > log_end_offset()) { - vlog( - klog.warn, - "ntp {}: fetch offset_out_of_range on leader, requested: {}, " - "partition start offset: {}, high watermark: {}, log end offset: {}", - ntp(), - fetch_offset, - start_offset.value(), - high_watermark(), - log_end_offset()); - co_return error_code::offset_out_of_range; - } + if ( + fetch_offset < start_offset.value() + || fetch_offset > log_end_offset()) { + vlog( + klog.warn, + "ntp {}: fetch offset_out_of_range on leader, requested: {}, " + "partition start offset: {}, high watermark: {}, log end " + "offset: {}", + ntp(), + fetch_offset, + start_offset.value(), + high_watermark(), + log_end_offset()); + return error_code::offset_out_of_range; + } - co_return error_code::none; + return error_code::none; + }); } result replicated_partition::get_partition_info() const { diff --git a/src/v/raft/persisted_stm.cc b/src/v/raft/persisted_stm.cc index 1a42dbb53f2ed..3ae7802a79ace 100644 --- a/src/v/raft/persisted_stm.cc +++ b/src/v/raft/persisted_stm.cc @@ -491,16 +491,16 @@ ss::future persisted_stm::sync(model::timeout_clock::duration timeout) { auto term = _raft->term(); if (!_raft->is_leader()) { - co_return false; + return ss::make_ready_future(false); } if (_insync_term == term) { - co_return true; + return ss::make_ready_future(true); } if (_is_catching_up) { auto deadline = model::timeout_clock::now() + timeout; auto sync_waiter = ss::make_lw_shared>(); _sync_waiters.push_back(sync_waiter); - co_return co_await sync_waiter->get_future_with_timeout( + return sync_waiter->get_future_with_timeout( deadline, [] { return false; }); } _is_catching_up = true; @@ -526,14 +526,16 @@ persisted_stm::sync(model::timeout_clock::duration timeout) { // of the term yet. sync_offset = log_offsets.dirty_offset; } - auto is_synced = co_await do_sync(timeout, sync_offset, term); - _is_catching_up = false; - for (auto& sync_waiter : _sync_waiters) { - sync_waiter->set_value(is_synced); - } - _sync_waiters.clear(); - co_return is_synced; + return do_sync(timeout, sync_offset, term).then([this](bool is_synced) { + _is_catching_up = false; + for (auto& sync_waiter : _sync_waiters) { + sync_waiter->set_value(is_synced); + } + _sync_waiters.clear(); + + return is_synced; + }); } template