From cc277400226b8e321f0f786f882b5ac25375f921 Mon Sep 17 00:00:00 2001 From: Travis Downs Date: Thu, 22 Feb 2024 12:42:31 -0300 Subject: [PATCH] Add forward iterator to async_for_each async_for_each originally assumed random access iterators, but we can also support forward iterators at the cost of some complexity. The main limitation with forward iterators is that we cannot do arithmetic on the iterators to determine the size of the range and to pre-calculate end iterators. So we use counted loops instead when those iterators are used (we dispatch to a separate path for random access iterators). (cherry picked from commit c667749ed2c1ae3f042d20e0c0218217b1096b3c) --- src/v/ssx/async_algorithm.h | 69 ++++++++++++++++++++++++++++--------- 1 file changed, 53 insertions(+), 16 deletions(-) diff --git a/src/v/ssx/async_algorithm.h b/src/v/ssx/async_algorithm.h index 8b1d819e8ebe..5de8c137fe70 100644 --- a/src/v/ssx/async_algorithm.h +++ b/src/v/ssx/async_algorithm.h @@ -82,27 +82,62 @@ static ssize_t remaining(const C& c) { */ constexpr ssize_t FIXED_COST = 1; +template +struct iter_size { + I iter; + ssize_t count; +}; + +/** + * A mix of for_each and for_each_n: iterates from begin to end, or until + * limit elements have been visited, whichever comes first, applying f to + * each element. + * + * Returns the number of elements visited as well as the iterator to the first + * unvisited element. This can be implemented more efficiently with random + * access iterators since we can calculate the exact end iterator up front and + * so do an efficient loop with a single sentinel. The forward iterator version + * must increment a count in the loop and check both end iterator and counter + * as the termination condition. 
+ */ +template +iter_size for_each_limit(const I begin, const I end, ssize_t limit, Fn f) { + auto chunk_size = std::min(limit, end - begin); + I chunk_end = begin + chunk_size; + std::for_each(begin, chunk_end, std::move(f)); + return {chunk_end, chunk_size}; +} + +template +iter_size for_each_limit(const I begin, const I end, ssize_t limit, Fn f) { + ssize_t count = 0; + auto i = begin; + while (i != end && count < limit) { + f(*i); + ++i; + ++count; + } + return {i, count}; +} + template< typename Traits, typename Counter, typename Fn, - std::random_access_iterator Iterator> + std::forward_iterator Iterator> ss::future<> async_for_each_coro(Counter counter, Iterator begin, Iterator end, Fn f) { do { - auto chunk_size = std::min(remaining(counter), end - begin); - Iterator chunk_end = begin + chunk_size; - std::for_each(begin, chunk_end, f); - begin = chunk_end; - counter.count += chunk_size; + auto new_begin = for_each_limit( + begin, end, remaining(counter), f); + begin = new_begin.iter; + counter.count += new_begin.count; if (counter.count >= Traits::interval) { co_await ss::coroutine::maybe_yield(); counter.count = 0; Traits::yield_called(); } } while (begin != end); - - counter.count += FIXED_COST; } /** @@ -112,21 +147,23 @@ template< typename Traits = async_algo_traits, typename Counter, typename Fn, - std::random_access_iterator Iterator> + std::forward_iterator Iterator> ss::future<> async_for_each_fast(Counter counter, Iterator begin, Iterator end, Fn f) { // This first part is an important optimization: if the input range is small // enough, we don't want to create a coroutine frame as that's costly, so // this function is not coroutine and we do the whole iteration here (as we // won't yield), otherwise we defer to the coroutine-based helper. 
- if (auto total_size = (end - begin) + FIXED_COST; - total_size <= detail::remaining(counter)) { - std::for_each(begin, end, std::move(f)); - counter.count += total_size; + + ssize_t limit = detail::remaining(counter); + auto new_begin = for_each_limit(begin, end, limit, f); + counter.count += new_begin.count + FIXED_COST; + if (new_begin.iter == end && counter.count < Traits::interval) [[likely]] { return ss::make_ready_future(); } - return async_for_each_coro(counter, begin, end, std::move(f)); + return async_for_each_coro( + counter, new_begin.iter, end, std::move(f)); } } // namespace detail @@ -152,7 +189,7 @@ async_for_each_fast(Counter counter, Iterator begin, Iterator end, Fn f) { template< typename Traits = async_algo_traits, typename Fn, - std::random_access_iterator Iterator> + std::forward_iterator Iterator> ss::future<> async_for_each(Iterator begin, Iterator end, Fn f) { return async_for_each_fast( detail::internal_counter{}, begin, end, std::move(f)); @@ -208,7 +245,7 @@ ss::future<> async_for_each(Iterator begin, Iterator end, Fn f) { template< typename Traits = async_algo_traits, typename Fn, - std::random_access_iterator Iterator> + std::forward_iterator Iterator> ss::future<> async_for_each_counter( async_counter& counter, Iterator begin, Iterator end, Fn f) { return detail::async_for_each_fast(