diff --git a/include/seqan3/range/view/async_buffer.hpp b/include/seqan3/range/view/async_buffer.hpp new file mode 100644 index 00000000000..b269ad586f6 --- /dev/null +++ b/include/seqan3/range/view/async_buffer.hpp @@ -0,0 +1,513 @@ +// ----------------------------------------------------------------------------------------------------- +// Copyright (c) 2006-2019, Knut Reinert & Freie Universität Berlin +// Copyright (c) 2016-2019, Knut Reinert & MPI für molekulare Genetik +// This file may be used, modified and/or redistributed under the terms of the 3-clause BSD-License +// shipped with this file and also available at: https://github.com/seqan/seqan3/blob/master/LICENSE.md +// ----------------------------------------------------------------------------------------------------- + +/*!\file + * \author Hannes Hauswedell + * \brief Provides seqan3::view::async_buffer. + */ + +#pragma once + +#include + +#include +#include +#include +#include +#include +#include +#include + +//----------------------------------------------------------------------------- +// This is the path a value takes when using this view: +// urange +// → async_buffer_view.buffer +// → async_buffer_iterator.cached_value +// → user +//----------------------------------------------------------------------------- + +namespace seqan3::detail +{ + +/*!\brief Pre-fetches the elements of the underlying range into a buffer queue from a background thread. + * \tparam urng_t The underlying range type. + * \implements std::ranges::InputRange + * \ingroup view + */ +template +class async_buffer_view : public std::ranges::view_interface> +{ +private: + + //!\brief The iterator type for the underlying range. + using urng_iterator_type = std::ranges::iterator_t; + + //!\brief The underlying range. + urng_t urange; + + //!\brief The buffer queue; #produce pushes into it and the iterators of this view pop from it. + contrib::fixed_buffer_queue> buffer; + + //!\brief Iterates over the underlying range and pushes into the buffer until at end or stopped.
+ void produce() + { + auto it = std::ranges::begin(urange); + auto e = std::ranges::end(urange); + + detail::spin_delay delay{}; + + while ((it != e) && (!stop_producer.load()))// TODO likely + { + auto tmp = std::move(*it); + + while (!stop_producer.load()) + { + auto status = buffer.try_push(std::move(tmp)); + if (status == contrib::queue_op_status::full) // queue was full + delay.wait(); + else // push successful + break; + } + + ++it; // move to next item in source + } + + buffer.close(); + } + + //!\brief Abort thread before it reaches the end; declared before #producer so that it is initialised before the thread starts. + std::atomic stop_producer{false}; + + //!\brief Thread that runs #produce in the background; keep it after all members that #produce uses (members are initialised in declaration order). + std::thread producer; + + /*!\brief An input_iterator over the associated range. + * \implements std::InputIterator + * \ingroup view + * + * This iterator reduces every iterator type of the associated view to a single pass input iterator. + * It obtains its values by popping them from the buffer queue of the associated view. + */ + class async_buffer_iterator + { + //!\brief The sentinel type to compare to. + using sentinel_type = std::ranges::default_sentinel_t; + + //!\brief The pointer to the buffer queue of the associated view. + contrib::fixed_buffer_queue> * buffer_ptr = nullptr; + + //!\brief The cached value this iterator holds. + value_type_t cached_value; + + //!\brief Whether this iterator is at end (the buffer is empty and closed). + bool at_end = false; + + #ifndef NDEBUG + //!\brief Whether the iterator currently holds a valid value in #cached_value. + bool has_value = false; + #endif + + public: + + /*!\name Associated types + * \{ + */ + //!\brief Difference type. + using difference_type = difference_type_t; + //!\brief Value type. + using value_type = value_type_t; + //!\brief Pointer type. + using pointer = value_type*; + //!\brief Reference type. + using reference = value_type; + //!\brief Iterator category.
+ using iterator_category = std::input_iterator_tag; + //!\} + + /*!\name Construction, destruction and assignment + * \{ + */ + //!\brief Default construction; the iterator is not associated with a buffer and caches no value. + async_buffer_iterator() noexcept = default; + //!\brief Copy construction. + async_buffer_iterator(async_buffer_iterator const & rhs) noexcept = default; //TODO delete + //!\brief Move construction. + async_buffer_iterator(async_buffer_iterator && rhs) noexcept = default; + //!\brief Copy assignment. + async_buffer_iterator & operator=(async_buffer_iterator const & rhs) noexcept = default; //TODO delete + //!\brief Move assignment. + async_buffer_iterator & operator=(async_buffer_iterator && rhs) noexcept = default; + //!\brief Destruction. + ~async_buffer_iterator() noexcept = default; + + //!\brief Construction from the buffer queue of the associated view; caches the first value. + async_buffer_iterator(contrib::fixed_buffer_queue> & buffer) noexcept : + buffer_ptr{&buffer} + { + ++(*this); // cache first value + } + //!\} + + /*!\name Access operations + * \{ + */ + //!\brief Returns the cached value by moving it out of the iterator (dereference at most once per increment). + reference operator*() noexcept + { + #ifndef NDEBUG + assert (has_value); + has_value = false; + #endif + return std::move(cached_value); + } + + //!\brief Returns a pointer to the cached value; not const-qualified because #pointer points to a mutable value. + pointer operator->() noexcept + { + return std::addressof(cached_value); + } + //!\} + + /*!\name Iterator operations + * \{ + */ + //!\brief Pre-increment.
+ async_buffer_iterator & operator++() noexcept + { + if (at_end) // TODO unlikely + return *this; + + detail::spin_delay delay{}; + + assert (buffer_ptr != nullptr); + + while (true) + { + switch (contrib::queue_op_status s = buffer_ptr->try_pop(cached_value); s) + { + case contrib::queue_op_status::closed: + at_end = true; + return *this; + case contrib::queue_op_status::success: + /* cached_value was updated */ + #ifndef NDEBUG + has_value = true; + #endif + return *this; + default: + assert(s == contrib::queue_op_status::empty); + break; // loop again + } + + delay.wait(); // pause and then try again. + } + + // TODO [[unreachable]] + return *this; + } + + //!\brief Post-increment (returns nothing). + void operator++(int) noexcept + { + ++(*this); + } + //!\} + + /*!\name Comparison operators + * \{ + */ + //!\brief Compares for equality with sentinel; true once an increment found the buffer closed. + friend constexpr bool operator==(async_buffer_iterator const & lhs, + std::ranges::default_sentinel_t const &) noexcept + { + return lhs.at_end; + } + + //!\copydoc operator== + friend constexpr bool operator==(std::ranges::default_sentinel_t const &, + async_buffer_iterator const & rhs) noexcept + { + return rhs == std::ranges::default_sentinel_t{}; + } + + //!\brief Compares for inequality with sentinel. + friend constexpr bool operator!=(async_buffer_iterator const & lhs, + std::ranges::default_sentinel_t const &) noexcept + { + return !(lhs == std::ranges::default_sentinel_t{}); + } + + //!\copydoc operator!= + friend constexpr bool operator!=(std::ranges::default_sentinel_t const &, + async_buffer_iterator const & rhs) noexcept + { + return rhs != std::ranges::default_sentinel_t{}; + } + //!\} + }; + +public: + /*!\name Constructor, destructor, and assignment. + * \{ + * \brief All standard functions except the destructor are explicitly defaulted. + */ + //!\brief Default default-constructor. + async_buffer_view() = default; + //!\brief Default copy-constructor.
+ async_buffer_view(async_buffer_view const &) = default; //TODO delete + //!\brief Default move-constructor. + async_buffer_view(async_buffer_view &&) = default; + //!\brief Default copy-assignment. + async_buffer_view & operator=(async_buffer_view const &) = default; //TODO delete + //!\brief Default move-assignment. + async_buffer_view & operator=(async_buffer_view &&) = default; + //!\brief Tell thread to stop and wait for it (if one was started). + ~async_buffer_view() + { + stop_producer.store(true); + if (producer.joinable()) // a default-constructed view never started a thread; join() would throw + producer.join(); + } + + //!\brief Construction from the underlying view. + async_buffer_view(urng_t _urng, size_t const buffer_size) : + urange{std::move(_urng)}, buffer{buffer_size}, producer{&async_buffer_view::produce, this} + {} + + //!\brief Construction from std::ranges::ViewableRange. + template + //!\cond + requires !std::Same, async_buffer_view> && + std::ranges::ViewableRange && // Must come after self type check to avoid conflicts with the move constructor. + std::Constructible>> + //!\endcond + async_buffer_view(other_urng_t && _urng, size_t const buffer_size) : + async_buffer_view{std::view::all(_urng), buffer_size} + {} + //!\} + + /*!\name Iterators + * \{ + */ + /*!\brief Returns an iterator to the current begin of the underlying range. + * + * \details + * + * ### Thread-Safety + * + * It is thread-safe to call this function. Subsequent calls to begin will result in different + * iterators that are each valid individually. It is thread-safe to operate on different iterators + * from different threads (however it is not thread-safe to operate on a single iterator from different + * threads). + */ + async_buffer_iterator begin() + { + return {buffer}; + } + + //!\brief Const version of begin is deleted, since the underlying view_state must be mutable. + async_buffer_iterator begin() const = delete; + + //!\copydoc async_buffer_view::begin() const + async_buffer_iterator cbegin() const = delete; + + //!\brief Returns a sentinel.
+ std::ranges::default_sentinel_t end() + { + return std::ranges::default_sentinel; + } + + //!\brief Const version of end is deleted, since the underlying view_state must be mutable. + std::ranges::default_sentinel_t end() const = delete; + + //!\copydoc async_buffer_view::end() const + std::ranges::default_sentinel_t cend() const = delete; + //!\} +}; + +/*!\name Deduction guide. + * \relates seqan3::detail::async_buffer_view + * \{ + */ + +//!\brief Deduces the async_buffer_view from the underlying range if it is a std::ranges::ViewableRange. +template +async_buffer_view(urng_t &&, size_t const buffer_size) -> async_buffer_view>; +//!\} + +} // seqan3::detail + +namespace seqan3::detail +{ + +// ============================================================================ +// async_buffer_fn (adaptor definition) +// ============================================================================ + +//!\brief Definition of the range adaptor object type for seqan3::view::async_buffer. +struct async_buffer_fn +{ + //!\brief Store the argument and return a range adaptor closure object. + constexpr auto operator()(size_t const buffer_size) const + { + return detail::adaptor_from_functor{*this, buffer_size}; + } + + /*!\brief Directly return an instance of the view, initialised with the given parameters. + * \param[in] urange The underlying range. + * \param[in] buffer_size The size of the buffer; must be > 0, otherwise std::invalid_argument is thrown. + * \returns An instance of seqan3::detail::async_buffer_view over the underlying range.
+ */ + template + constexpr auto operator()(urng_t && urange, size_t const buffer_size) const + { + static_assert(std::ranges::ViewableRange, + "The range parameter to view::async_buffer cannot be a temporary of a non-view range."); + static_assert(std::Movable>, + "The range parameter to view::async_buffer must have a value_type that is std::Movable."); + + if (buffer_size == 0) + throw std::invalid_argument{"The buffer_size parameter to view::async_buffer must be > 0."}; + + return detail::async_buffer_view{std::forward(urange), buffer_size}; + } +}; + +} // seqan3::detail + +//----------------------------------------------------------------------------- +// View shortcut for functor. +//----------------------------------------------------------------------------- + +namespace seqan3::view +{ +/*!\name General purpose views + * \{ + */ + +/*!\brief A view adaptor that returns a concurrent-queue-like view over the underlying range. + * \tparam urng_t The type of the range being processed. See below for requirements. + * \param[in,out] urange The range being processed. + * \param[in] buffer_size Size of the buffer (must be > 0, otherwise std::invalid_argument is thrown). Choose the size depending on the expected work per element. + * \returns A view that pre-fetches elements from the underlying range and provides a thread-safe interface. + * See below for the properties of the returned range. + * \ingroup view + * + * \details + * + * **Header** + * ```cpp + * #include + * ``` + * + * This view spawns a background thread that pre-fetches elements from the underlying range and stores them in a + * concurrent queue. Iterating over this view then pops elements out of the queue and returns them by value. + * This is primarily useful if dereferencing/incrementing the iterator of the underlying range + * is expensive, e.g. with SeqAn files that lazily perform I/O. + * + * Another advantage of this view is that multiple iterators can be created that are safe to iterate individually, + * even from different threads, i.e.
you can use multiple threads to iterate safely over a single-pass input view + * with the added benefit of background pre-fetching. + * + * In technical terms: this view facilitates a single-producer, multi-consumer design; it's a range interface over + * a concurrent queue. + * + * The `buffer_size` parameter should be chosen depending on the expected work per element, e.g. if the underlying + * range is an input file over short reads, a buffer size of 100 or 1000 could be beneficial; if on the other hand + * the file contains genome-sized sequences, it would be better to buffer only a single sequence (buffering 100 + * sequences would result in the entire file being preloaded and likely consuming significant memory). + * + * \note This view always moves elements from the underlying range into its buffer which means that the elements in + * the underlying range will be invalidated! For underlying ranges that are single-pass this makes no difference, but + * it might be unexpected for multi-pass ranges.
+ * + * ### View properties + * + * | concepts and reference type | `urng_t` (underlying range type) | `rrng_t` (returned range type) | + * |---------------------------------|:-------------------------------------:|:--------------------------------------------------:| + * | std::Semiregular | | *lost* | + * | | | | + * | std::ranges::InputRange | *required* | *preserved* | + * | std::ranges::ForwardRange | | *lost* | + * | std::ranges::BidirectionalRange | | *lost* | + * | std::ranges::RandomAccessRange | | *lost* | + * | std::ranges::ContiguousRange | | *lost* | + * | | | | + * | std::ranges::ViewableRange | *required* | *guaranteed* | + * | std::ranges::View | | *not modelled yet* (pending relaxation of std::ranges::View) | + * | std::ranges::SizedRange | | *lost* | + * | std::ranges::CommonRange | | *lost* | + * | std::ranges::OutputRange | | *lost* | + * | seqan3::ConstIterableRange | | *lost* | + * | | | | + * | seqan3::reference_t | | seqan3::value_type_t | + * + * See the \link view view submodule documentation \endlink for detailed descriptions of the view properties. + * + * ### Thread safety + * + * The following operations are **thread-safe**: + * + * * calling `.begin()` and `.end()` on the view returned by this adaptor; + * * calling operators on the different iterators. + * + * Calling operators on the same iterator from different threads is not safe, i.e. you can pass the view + * to different threads by reference, and have each of those threads call `begin()` on the view and then + * perform operations (dereference, increment...) on that iterator from the respective thread; but you + * cannot call `begin()` in a parent thread, pass the iterator to different threads and operate on that + * concurrently.
+ * + * ### Example + * + * \include test/snippet/range/view/async_buffer.cpp + * + * Running the snippet could yield the following output: + * + * ``` + * Thread: 0x80116bf00 Seq: seq2 + * Thread: 0x80116bf00 Seq: seq3 + * Thread: 0x80116ba00 Seq: seq1 + * Thread: 0x80116bf00 Seq: seq4 + * Thread: 0x80116bf00 Seq: seq6 + * Thread: 0x80116ba00 Seq: seq5 + * Thread: 0x80116bf00 Seq: seq7 + * Thread: 0x80116ba00 Seq: seq8 + * Thread: 0x80116bf00 Seq: seq9 + * Thread: 0x80116bf00 Seq: seq11 + * Thread: 0x80116bf00 Seq: seq12 + * Thread: 0x80116ba00 Seq: seq10 + * ``` + * This shows that indeed elements from the underlying range are processed non-sequentially, that there are two threads + * and that work is "balanced" between them (one thread processed more elements than the other, because its "work" + * per item happened to be smaller). + * + * Note that you might encounter jumbled output if by chance two threads write to the stream at the exact same time. + * + * If you remove the line with `auto f1 = ...` you will get sequential processing: + * ``` + * Thread: 0x80116aa00 Seq: seq1 + * Thread: 0x80116aa00 Seq: seq2 + * Thread: 0x80116aa00 Seq: seq3 + * Thread: 0x80116aa00 Seq: seq4 + * Thread: 0x80116aa00 Seq: seq5 + * Thread: 0x80116aa00 Seq: seq6 + * Thread: 0x80116aa00 Seq: seq7 + * Thread: 0x80116aa00 Seq: seq8 + * Thread: 0x80116aa00 Seq: seq9 + * Thread: 0x80116aa00 Seq: seq10 + * Thread: 0x80116aa00 Seq: seq11 + * Thread: 0x80116aa00 Seq: seq12 + * ``` + * + * Note that even if you have a single processing thread, using this view might still improve performance measurably, + * because loading of the elements into the buffer (which reads input from disk) happens in a background thread.
+ * + * \hideinitializer + */ +inline constexpr auto async_buffer = detail::async_buffer_fn{}; + +//!\} +} // namespace seqan3::view diff --git a/test/snippet/range/view/async_buffer.cpp b/test/snippet/range/view/async_buffer.cpp new file mode 100644 index 00000000000..7760c982db9 --- /dev/null +++ b/test/snippet/range/view/async_buffer.cpp @@ -0,0 +1,67 @@ +#include // std::rand +#include // std::async +#include // std::string +#include // seqan3::debug_stream +#include // seqan3::sequence_file_input +#include // seqan3::view::async_buffer + +using namespace seqan3; + +std::string fasta_file = +R"(> seq1 +ACGACTACGACGATCATCGATCGATCGATCGATCGATCGATCGATCGTACTACGATCGATCG +> seq2 +ACGACTACGACGATCATCGATCGATCGATCGATCGATCGATCGATCGTACTACGATCGATCG +> seq3 +ACGACTACGACGATCATCGATCGATCGATCGATCGATCGATCGATCGTACTACGATCGATCG +> seq4 +ACGACTACGACGATCATCGATCGATCGATCGATCGATCGATCGATCGTACTACGATCGATCG +> seq5 +ACGACTACGACGATCATCGATCGATCGATCGATCGATCGATCGATCGTACTACGATCGATCG +> seq6 +ACGACTACGACGATCATCGATCGATCGATCGATCGATCGATCGATCGTACTACGATCGATCG +> seq7 +ACGACTACGACGATCATCGATCGATCGATCGATCGATCGATCGATCGTACTACGATCGATCG +> seq8 +ACGACTACGACGATCATCGATCGATCGATCGATCGATCGATCGATCGTACTACGATCGATCG +> seq9 +ACGACTACGACGATCATCGATCGATCGATCGATCGATCGATCGATCGTACTACGATCGATCG +> seq10 +ACGACTACGACGATCATCGATCGATCGATCGATCGATCGATCGATCGTACTACGATCGATCG +> seq11 +ACGACTACGACGATCATCGATCGATCGATCGATCGATCGATCGATCGTACTACGATCGATCG +> seq12 +ACGACTACGACGATCATCGATCGATCGATCGATCGATCGATCGATCGTACTACGATCGATCG +)"; + +int main() +{ + // initialise random number generator, only needed for demonstration purposes + std::srand(std::time(nullptr)); + + // create an input file from the string above + seqan3::sequence_file_input fin{std::istringstream{fasta_file}, seqan3::format_fasta{}}; + + // create the async buffer around the input file + // spawns a background thread that tries to keep four records in the buffer + auto v = fin | seqan3::view::async_buffer(4); + + // create a lambda function that iterates over the async buffer
when called + // (the buffer gets refilled in the background whenever it has free space) + auto l = [&v] () + { + for (auto r : v) + { + // pretend we are doing some work + std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 1000)); + // print current thread and sequence ID + seqan3::debug_stream << "Thread: " << std::this_thread::get_id() << '\t' + << "Seq: " << seqan3::get(r) << '\n'; + + } + }; + + // launch two threads and pass the lambda function to both + auto f0 = std::async(std::launch::async, l); + auto f1 = std::async(std::launch::async, l); +} diff --git a/test/unit/range/view/CMakeLists.txt b/test/unit/range/view/CMakeLists.txt index 693984768cf..813273889e4 100644 --- a/test/unit/range/view/CMakeLists.txt +++ b/test/unit/range/view/CMakeLists.txt @@ -2,6 +2,7 @@ add_subdirectories() seqan3_test(adaptor_base_test.cpp) seqan3_test(view_all_test.cpp) +seqan3_test(view_async_buffer_test.cpp) seqan3_test(view_char_to_test.cpp) seqan3_test(view_complement_test.cpp) seqan3_test(view_convert_test.cpp) diff --git a/test/unit/range/view/view_async_buffer_test.cpp b/test/unit/range/view/view_async_buffer_test.cpp new file mode 100644 index 00000000000..973d8520f3f --- /dev/null +++ b/test/unit/range/view/view_async_buffer_test.cpp @@ -0,0 +1,105 @@ +// ----------------------------------------------------------------------------------------------------- +// Copyright (c) 2006-2019, Knut Reinert & Freie Universität Berlin +// Copyright (c) 2016-2019, Knut Reinert & MPI für molekulare Genetik +// This file may be used, modified and/or redistributed under the terms of the 3-clause BSD-License +// shipped with this file and also available at: https://github.com/seqan/seqan3/blob/master/LICENSE.md +// ----------------------------------------------------------------------------------------------------- + +#include + +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "../iterator_test_template.hpp" + +using namespace seqan3; +
+using iterator_type = std::ranges::iterator_t&>() | view::async_buffer(3))>; + +template <> +struct iterator_fixture : public ::testing::Test +{ + using iterator_tag = std::input_iterator_tag; + static constexpr bool const_iterable = false; + + std::vector vec{"ACGTACGTACGTATCGAGAGCTTTAGC"_dna4}; + std::vector expected_range{"ACGTACGTACGTATCGAGAGCTTTAGC"_dna4}; + decltype(view::async_buffer(vec, 3)) test_range = view::async_buffer(vec, 3); +}; + +using test_type = ::testing::Types; +INSTANTIATE_TYPED_TEST_CASE_P(iterator_fixture, iterator_fixture, test_type); + + +TEST(async_buffer, in_out) +{ + std::vector vec{"ACGTACGTACGTATCGAGAGCTTTAGC"_dna4}; + + auto v = vec | view::async_buffer(3); + + EXPECT_TRUE(std::ranges::equal(vec, v)); +} + +TEST(async_buffer, buffer_size_zero) +{ + std::vector vec{"ACGTACGTACGTATCGAGAGCTTTAGC"_dna4}; + + EXPECT_THROW(vec | view::async_buffer(0), std::invalid_argument); +} + +TEST(async_buffer, buffer_size_huge) +{ + std::vector vec{"ACGTACGTACGTATCGAGAGCTTTAGC"_dna4}; + + auto v = vec | view::async_buffer(100000); + + EXPECT_TRUE(std::ranges::equal(vec, v)); +} + +TEST(async_buffer, destruct_with_full_buffer) +{ + std::vector vec{"ACGTACGTACGTATCGAGAGCTTTAGC"_dna4}; + + auto v0 = vec | view::single_pass_input; + + { + auto v1 = v0 | view::async_buffer(5); + + // consume five elements (construction already consumes one) + auto b = std::ranges::begin(v1); + ++b; ++b; ++b; ++b; + + // give time to rebuffer next five elements + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + } // thread sync at destruction of v1; tests working destruction with full buffer + + // check that there is no over-consumption: + EXPECT_TRUE(std::ranges::equal(v0, "TATCGAGAGCTTTAGC"_dna4)); // total of 10 chars consumed + //TODO: actually 11 because queue buffers one more element than requested :o +} + +//TODO: add combinability tests once the class models View.
+ +TEST(async_buffer, concepts) +{ + std::vector vec; + + auto v1 = vec | view::async_buffer(1); + + EXPECT_TRUE(std::ranges::InputRange); + EXPECT_FALSE(std::ranges::ForwardRange); + EXPECT_FALSE(std::ranges::RandomAccessRange); + EXPECT_FALSE(std::ranges::SizedRange); + EXPECT_FALSE(ConstIterableRange); + + EXPECT_FALSE(std::Semiregular); + EXPECT_FALSE(std::ranges::View); // ← should become true once View is relaxed (presumably once it no longer requires Semiregular, TODO confirm) +}