Skip to content

Commit

Permalink
Merge pull request #16689 from travisdowns/td-backport-16676-v23.2.x
Browse files Browse the repository at this point in the history
[v23.2.x] Add forward iterator to async_for_each
  • Loading branch information
travisdowns authored Feb 23, 2024
2 parents 97a38be + cc27740 commit 3c5e552
Showing 1 changed file with 53 additions and 16 deletions.
69 changes: 53 additions & 16 deletions src/v/ssx/async_algorithm.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,27 +82,62 @@ static ssize_t remaining(const C& c) {
*/
constexpr ssize_t FIXED_COST = 1;

template<typename I>
struct iter_size {
I iter;
ssize_t count;
};

/**
* A mix of for_each and for_each_n: iterates from begin to end, or until
* limit elements have been visited, whichever comes first, applying f to
* each element.
*
* Returns the number of elements visited as well as the iterator to the first
* unvisited element. This can be implemented more efficiently with random
* access iterators since we can calculate the exact end iterator up front and
* so do an efficient loop with a single sentinel. The forward iterator version
* must increment an count in the loop and check both end iterator and counter
* as the termination condition.
*/
template<std::random_access_iterator I, typename Fn>
iter_size<I> for_each_limit(const I begin, const I end, ssize_t limit, Fn f) {
auto chunk_size = std::min(limit, end - begin);
I chunk_end = begin + chunk_size;
std::for_each(begin, chunk_end, std::move(f));
return {chunk_end, chunk_size};
}

template<std::forward_iterator I, typename Fn>
iter_size<I> for_each_limit(const I begin, const I end, ssize_t limit, Fn f) {
ssize_t count = 0;
auto i = begin;
while (i != end && count < limit) {
f(*i);
++i;
++count;
}
return {i, count};
}

template<
typename Traits,
typename Counter,
typename Fn,
std::random_access_iterator Iterator>
std::forward_iterator Iterator>
ss::future<>
async_for_each_coro(Counter counter, Iterator begin, Iterator end, Fn f) {
do {
auto chunk_size = std::min(remaining<Traits>(counter), end - begin);
Iterator chunk_end = begin + chunk_size;
std::for_each(begin, chunk_end, f);
begin = chunk_end;
counter.count += chunk_size;
auto new_begin = for_each_limit(
begin, end, remaining<Traits>(counter), f);
begin = new_begin.iter;
counter.count += new_begin.count;
if (counter.count >= Traits::interval) {
co_await ss::coroutine::maybe_yield();
counter.count = 0;
Traits::yield_called();
}
} while (begin != end);

counter.count += FIXED_COST;
}

/**
Expand All @@ -112,21 +147,23 @@ template<
typename Traits = async_algo_traits,
typename Counter,
typename Fn,
std::random_access_iterator Iterator>
std::forward_iterator Iterator>
ss::future<>
async_for_each_fast(Counter counter, Iterator begin, Iterator end, Fn f) {
// This first part is an important optimization: if the input range is small
// enough, we don't want to create a coroutine frame as that's costly, so
// this function is not coroutine and we do the whole iteration here (as we
// won't yield), otherwise we defer to the coroutine-based helper.
if (auto total_size = (end - begin) + FIXED_COST;
total_size <= detail::remaining<Traits>(counter)) {
std::for_each(begin, end, std::move(f));
counter.count += total_size;

ssize_t limit = detail::remaining<Traits>(counter);
auto new_begin = for_each_limit(begin, end, limit, f);
counter.count += new_begin.count + FIXED_COST;
if (new_begin.iter == end && counter.count < Traits::interval) [[likely]] {
return ss::make_ready_future();
}

return async_for_each_coro<Traits>(counter, begin, end, std::move(f));
return async_for_each_coro<Traits>(
counter, new_begin.iter, end, std::move(f));
}

} // namespace detail
Expand All @@ -152,7 +189,7 @@ async_for_each_fast(Counter counter, Iterator begin, Iterator end, Fn f) {
template<
typename Traits = async_algo_traits,
typename Fn,
std::random_access_iterator Iterator>
std::forward_iterator Iterator>
ss::future<> async_for_each(Iterator begin, Iterator end, Fn f) {
return async_for_each_fast<Traits>(
detail::internal_counter{}, begin, end, std::move(f));
Expand Down Expand Up @@ -208,7 +245,7 @@ ss::future<> async_for_each(Iterator begin, Iterator end, Fn f) {
template<
typename Traits = async_algo_traits,
typename Fn,
std::random_access_iterator Iterator>
std::forward_iterator Iterator>
ss::future<> async_for_each_counter(
async_counter& counter, Iterator begin, Iterator end, Fn f) {
return detail::async_for_each_fast<Traits>(
Expand Down

0 comments on commit 3c5e552

Please sign in to comment.