Skip to content

Commit

Permalink
treewide: remove coroutines in validate_fetch_offset callees
Browse files Browse the repository at this point in the history
The most common path for validate_fetch_offset results in a number of
short lived coroutines. In these cases the allocation/deallocation for
the coroutine's frame ends up dominating the runtime for the function.

This commit removes the coroutines in favor for then chains which can
avoid the allocation if the task quota hasn't been met.
  • Loading branch information
ballard26 committed Jun 27, 2024
1 parent 25b8841 commit 59ee771
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 66 deletions.
19 changes: 11 additions & 8 deletions src/v/cluster/log_eviction_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -254,14 +254,17 @@ log_eviction_stm::sync_kafka_start_offset_override(
/// useful to know if the start offset is up to date in the case
/// leadership has recently changed for example.
auto term = _raft->term();
if (!co_await sync(timeout)) {
if (term != _raft->term()) {
co_return errc::not_leader;
} else {
co_return errc::timeout;
}
}
co_return kafka_start_offset_override();
return sync(timeout).then(
[this, term](bool success) -> result<kafka::offset, std::error_code> {
if (!success) {
if (term != _raft->term()) {
return errc::not_leader;
} else {
return errc::timeout;
}
}
return kafka_start_offset_override();
});
}

model::offset log_eviction_stm::effective_start_offset() const {
Expand Down
102 changes: 54 additions & 48 deletions src/v/kafka/server/replicated_partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,30 +45,32 @@ const model::ntp& replicated_partition::ntp() const {
ss::future<result<model::offset, error_code>>
replicated_partition::sync_effective_start(
model::timeout_clock::duration timeout) {
auto synced_start_offset_override
= co_await _partition->sync_kafka_start_offset_override(timeout);
if (synced_start_offset_override.has_failure()) {
auto err = synced_start_offset_override.error();
auto error_code = error_code::unknown_server_error;
if (err.category() == cluster::error_category()) {
switch (cluster::errc(err.value())) {
/**
* In the case of timeout and shutting down errors return
* not_leader_for_partition error to force clients retry.
*/
case cluster::errc::shutting_down:
case cluster::errc::not_leader:
case cluster::errc::timeout:
error_code = error_code::not_leader_for_partition;
break;
default:
error_code = error_code::unknown_server_error;
}
}
co_return error_code;
}
co_return kafka_start_offset_with_override(
synced_start_offset_override.value());
return _partition->sync_kafka_start_offset_override(timeout).then(
[this](auto synced_start_offset_override)
-> result<model::offset, error_code> {
if (synced_start_offset_override.has_failure()) {
auto err = synced_start_offset_override.error();
auto error_code = error_code::unknown_server_error;
if (err.category() == cluster::error_category()) {
switch (cluster::errc(err.value())) {
/**
* In the case of timeout and shutting down errors return
* not_leader_for_partition error to force clients retry.
*/
case cluster::errc::shutting_down:
case cluster::errc::not_leader:
case cluster::errc::timeout:
error_code = error_code::not_leader_for_partition;
break;
default:
error_code = error_code::unknown_server_error;
}
}
return error_code;
}
return kafka_start_offset_with_override(
synced_start_offset_override.value());
});
}

model::offset replicated_partition::start_offset() const {
Expand Down Expand Up @@ -575,36 +577,40 @@ ss::future<error_code> replicated_partition::validate_fetch_offset(
ec);
}

co_return ec;
return ss::make_ready_future<error_code>(ec);
}

// Grab the up to date start offset
auto timeout = deadline - model::timeout_clock::now();
auto start_offset = co_await sync_effective_start(timeout);
if (!start_offset) {
vlog(
klog.warn,
"ntp {}: error obtaining latest start offset - {}",
ntp(),
start_offset.error());
co_return start_offset.error();
}
return sync_effective_start(timeout).then(
[this, fetch_offset](auto start_offset) {
if (!start_offset) {
vlog(
klog.warn,
"ntp {}: error obtaining latest start offset - {}",
ntp(),
start_offset.error());
return start_offset.error();
}

if (
fetch_offset < start_offset.value() || fetch_offset > log_end_offset()) {
vlog(
klog.warn,
"ntp {}: fetch offset_out_of_range on leader, requested: {}, "
"partition start offset: {}, high watermark: {}, log end offset: {}",
ntp(),
fetch_offset,
start_offset.value(),
high_watermark(),
log_end_offset());
co_return error_code::offset_out_of_range;
}
if (
fetch_offset < start_offset.value()
|| fetch_offset > log_end_offset()) {
vlog(
klog.warn,
"ntp {}: fetch offset_out_of_range on leader, requested: {}, "
"partition start offset: {}, high watermark: {}, log end "
"offset: {}",
ntp(),
fetch_offset,
start_offset.value(),
high_watermark(),
log_end_offset());
return error_code::offset_out_of_range;
}

co_return error_code::none;
return error_code::none;
});
}

result<partition_info> replicated_partition::get_partition_info() const {
Expand Down
22 changes: 12 additions & 10 deletions src/v/raft/persisted_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -491,16 +491,16 @@ ss::future<bool>
persisted_stm<T>::sync(model::timeout_clock::duration timeout) {
auto term = _raft->term();
if (!_raft->is_leader()) {
co_return false;
return ss::make_ready_future<bool>(false);
}
if (_insync_term == term) {
co_return true;
return ss::make_ready_future<bool>(true);
}
if (_is_catching_up) {
auto deadline = model::timeout_clock::now() + timeout;
auto sync_waiter = ss::make_lw_shared<expiring_promise<bool>>();
_sync_waiters.push_back(sync_waiter);
co_return co_await sync_waiter->get_future_with_timeout(
return sync_waiter->get_future_with_timeout(
deadline, [] { return false; });
}
_is_catching_up = true;
Expand All @@ -526,14 +526,16 @@ persisted_stm<T>::sync(model::timeout_clock::duration timeout) {
// of the term yet.
sync_offset = log_offsets.dirty_offset;
}
auto is_synced = co_await do_sync(timeout, sync_offset, term);

_is_catching_up = false;
for (auto& sync_waiter : _sync_waiters) {
sync_waiter->set_value(is_synced);
}
_sync_waiters.clear();
co_return is_synced;
return do_sync(timeout, sync_offset, term).then([this](bool is_synced) {
_is_catching_up = false;
for (auto& sync_waiter : _sync_waiters) {
sync_waiter->set_value(is_synced);
}
_sync_waiters.clear();

return is_synced;
});
}

template<supported_stm_snapshot T>
Expand Down

0 comments on commit 59ee771

Please sign in to comment.