From 5a40f2ce3000259c1c47c1c313ae39a8465c0fa9 Mon Sep 17 00:00:00 2001
From: Travis Downs
Date: Wed, 21 Feb 2024 15:17:46 -0300
Subject: [PATCH] Implement async_for_each

Same as std::for_each, but which yields periodically.

Written in a style which keeps the inner loop fast, equivalent to the
non-async version (optimizer willing, but seems to be true in the case
I checked).

For the simplest use cases we offer async_for_each, which is the same
as std::for_each except that it yields every Traits::interval
iterations: every call starts the "work counter" anew.

For more complicated cases, like nested loops, it may be convenient to
carry the counter from one iteration to the next, in order to yield at
the correct times. For example, in a doubly nested loop where the inner
loop always ran for less than interval iterations, we would not yield
with the simple functions.

Issue redpanda-data/core-internal#1061.

(cherry picked from commit a4c6b6415401094e08a4f9a7188ce8bd727a02c7)
---
 src/v/ssx/async_algorithm.h | 218 ++++++++++++++++++++++++++++++++++++
 1 file changed, 218 insertions(+)
 create mode 100644 src/v/ssx/async_algorithm.h

diff --git a/src/v/ssx/async_algorithm.h b/src/v/ssx/async_algorithm.h
new file mode 100644
index 000000000000..8b1d819e8ebe
--- /dev/null
+++ b/src/v/ssx/async_algorithm.h
@@ -0,0 +1,218 @@
+/*
+ * Copyright 2024 Redpanda Data, Inc.
+ *
+ * Use of this software is governed by the Business Source License
+ * included in the file licenses/BSL.md
+ *
+ * As of the Change Date specified in that file, in accordance with
+ * the Business Source License, use of this software will be governed
+ * by the Apache License, Version 2.0
+ */
+
+#pragma once
+
+#include "seastarx.h"
+
+#include <seastar/core/coroutine.hh>
+#include <seastar/coroutine/maybe_yield.hh>
+
+#include <algorithm>
+#include <iterator>
+
+//
+// async_algorithm.h
+//
+// These are implementations of a few algorithms, similar in nature to those in
+// <algorithm>, but which are async-friendly in the sense that they yield
+// periodically.
+//
+// For the simplest use cases we offer free functions like async_for_each, which
+// is the same as for_each except that it yields every Traits::interval
+// iterations: every call starts the counter anew.
+//
+// For more complicated cases, like nested loops, it may be convenient to carry
+// the counter from one iteration to the next, in order to yield at the correct
+// times. For example, in a doubly nested loop where the inner loop always ran
+// for less than interval iterations, we would not yield with the simple
+// functions.
+
+namespace ssx {
+
+struct async_counter {
+    // internal details, don't rely on the values
+    ssize_t count = 0;
+};
+
+struct async_algo_traits {
+    // The number of elements processed before trying to yield
+    constexpr static ssize_t interval = 100;
+    // Called every time after we call maybe_yield, useful to
+    // introspect the behavior in tests.
+    static void yield_called() {}
+};
+
+namespace detail {
+
+// value-semantic counter for when no external counter was passed
+struct internal_counter {
+    ssize_t count = 0; // starts at zero however the counter is created
+};
+
+// ref-semantic counter for when an external counter was passed
+// we need to jump through these hoops since using a ref-counter
+// for the internal case would lead to a lifetime issue as the counter
+// lives in the stack frame of a non-coroutine and is passed into a
+// coroutine, which will UAF on suspension
+struct ref_counter {
+    ssize_t& count;
+};
+
+template<typename Traits, typename C>
+static ssize_t remaining(const C& c) {
+    // amount of work we can do before yielding, clamped so it is never negative
+    return std::max(Traits::interval - c.count, ssize_t{0});
+}
+
+/**
+ * The fixed cost of just calling the async helper methods: it is
+ * mostly important that this is non-zero so if you call the counter
+ * version of the async methods on tons of empty containers, you do
+ * yield: otherwise we would never yield in this case.
+ *
+ */
+inline constexpr ssize_t FIXED_COST = 1;
+
+template<
+  typename Traits,  // supplies interval and yield_called()
+  typename Counter, // any type exposing a mutable ssize_t `count`
+  typename Fn,
+  std::random_access_iterator Iterator>
+ss::future<>
+async_for_each_coro(Counter counter, Iterator begin, Iterator end, Fn f) {
+    do { // the body runs at least once, even when the range is empty
+        auto chunk_size = std::min(remaining<Traits>(counter), end - begin);
+        Iterator chunk_end = begin + chunk_size;
+        std::for_each(begin, chunk_end, f); // f is copied for each chunk
+        begin = chunk_end;
+        counter.count += chunk_size;
+        if (counter.count >= Traits::interval) {
+            co_await ss::coroutine::maybe_yield();
+            counter.count = 0; // the budget restarts after every yield attempt
+            Traits::yield_called();
+        }
+    } while (begin != end);
+
+    counter.count += FIXED_COST; // charge the per-call cost exactly once
+}
+
+/**
+ * Helper to combine the internal and external counter implementations.
+ */
+template<
+  typename Traits = async_algo_traits,
+  typename Counter,
+  typename Fn,
+  std::random_access_iterator Iterator>
+ss::future<>
+async_for_each_fast(Counter counter, Iterator begin, Iterator end, Fn f) {
+    // This first part is an important optimization: if the input range is small
+    // enough, we don't want to create a coroutine frame as that's costly, so
+    // this function is not a coroutine and we do the whole iteration here (as
+    // we won't yield), otherwise we defer to the coroutine-based helper.
+    if (auto total_size = (end - begin) + FIXED_COST;
+        total_size <= detail::remaining<Traits>(counter)) {
+        std::for_each(begin, end, std::move(f));
+        counter.count += total_size;
+        return ss::make_ready_future<>();
+    }
+
+    return async_for_each_coro<Traits>(counter, begin, end, std::move(f));
+}
+
+} // namespace detail
+
+/**
+ * @brief Call f on every element, yielding occasionally.
+ *
+ * This is equivalent to std::for_each, except that the computational
+ * loop yields every Traits::interval (default 100) iterations in order
+ * to avoid reactor stalls. The returned future resolves when all elements
+ * have been processed.
+ *
+ * The function is taken by value.
+ *
+ * The iterators must remain valid until the returned future resolves.
+ *
+ * @param begin the beginning of the range to process
+ * @param end the end of the range to process
+ * @param f the function to call on each element
+ * @return ss::future<> a future which resolves when all elements have been
+ * processed
+ */
+template<
+  typename Traits = async_algo_traits,
+  typename Fn,
+  std::random_access_iterator Iterator>
+ss::future<> async_for_each(Iterator begin, Iterator end, Fn f) {
+    return detail::async_for_each_fast<Traits>(
+      detail::internal_counter{}, begin, end, std::move(f));
+}
+
+/**
+ * @brief Call f on every element, yielding occasionally and accepting
+ * an externally provided counter for yield control.
+ *
+ * This is equivalent to std::for_each, except that the computational
+ * loop yields every Traits::interval (default 100) iterations in order
+ * to avoid reactor stalls. The returned future resolves when all elements
+ * have been processed.
+ *
+ * This behaves similarly to async_for_each except that the
+ * counter used to track how much work has been done since the
+ * last attempted yield is passed in by the caller. This allows
+ * use in more complex cases such as nested loops where the inner
+ * loop may itself not do sufficient work to ever trigger the
+ * yield condition, even though the total amount of work done
+ * across all iterations is very high.
+ *
+ * Use case:
+ *
+ * Replace something like:
+ *
+ * for (... outer loop ...) {
+ *   for (... inner loop ...) {
+ *     f(elem);
+ *   }
+ * }
+ * with:
+ *
+ * async_counter counter;
+ * for (... outer loop ...) {
+ *   co_await async_for_each_counter(counter, begin, end, f);
+ * }
+ *
+ * The counter is taken by reference and must live at least until the
+ * returned future resolves: this is usually trivial when the caller is a
+ * coroutine but may require some care when continuation style is used.
+ *
+ * The function is taken by value.
+ *
+ * The iterators must remain valid until the returned future resolves.
+ *
+ * @param counter the caller-owned counter carrying work done across calls
+ * @param begin the beginning of the range to process
+ * @param end the end of the range to process
+ * @param f the function to call on each element
+ * @return a future which resolves when all elements have been processed
+ */
+template<
+  typename Traits = async_algo_traits,
+  typename Fn,
+  std::random_access_iterator Iterator>
+ss::future<> async_for_each_counter(
+  async_counter& counter, Iterator begin, Iterator end, Fn f) {
+    return detail::async_for_each_fast<Traits>(
+      detail::ref_counter{counter.count}, begin, end, std::move(f));
+}
+
+} // namespace ssx