From 80f96fbdae1dd5e9c199963e227a3d54ea067e24 Mon Sep 17 00:00:00 2001 From: Lucas Czech Date: Mon, 3 Jun 2024 17:30:49 +0200 Subject: [PATCH] Add Window Stream begin and end callbacks --- .../population/window/base_window_stream.hpp | 136 +++++++++++++++++- .../population/window/window_view_stream.hpp | 12 +- .../utils/containers/generic_input_stream.hpp | 6 +- .../src/population/interval_window_stream.cpp | 44 ++++++ test/src/population/window_view_stream.cpp | 71 +++++++++ 5 files changed, 258 insertions(+), 11 deletions(-) diff --git a/lib/genesis/population/window/base_window_stream.hpp b/lib/genesis/population/window/base_window_stream.hpp index d96d44bd..c689cfbf 100644 --- a/lib/genesis/population/window/base_window_stream.hpp +++ b/lib/genesis/population/window/base_window_stream.hpp @@ -208,12 +208,31 @@ class BaseWindowStream // Iterator() = delete; - Iterator( std::unique_ptr base_iterator ) - : pimpl_( std::move( base_iterator )) + Iterator( + BaseWindowStream const* parent, + std::unique_ptr base_iterator + ) + : base_parent_( parent ) + , pimpl_( std::move( base_iterator )) { // Observe the first element, if this is an active iterator. assert( pimpl_ ); + + // If we are here from an end iterator, we do nothing. + if( !base_parent_ ) { + return; + } + assert( base_parent_ ); + + // Before starting to init anything, call the callbacks. + execute_begin_callbacks_(); execute_on_enter_observers_(); + + // Special case: no data. Need to execute the end callback as well. + assert( pimpl_ ); + if( !pimpl_->get_parent_() ) { + execute_end_callbacks_(); + } } public: @@ -224,14 +243,26 @@ class BaseWindowStream Iterator( Iterator const& ) = delete; Iterator( Iterator&& other ) { + // Copy the base parent pointer, as that instance stays the same. + // We however need to move the pimpl, as we own it. + base_parent_ = other.base_parent_; pimpl_ = std::move( other.pimpl_ ); + + // Now reset the other, so that in case of a bug here, we induce a segfault + // instead of accidental undefined behaviour when accessing the old data. + other.base_parent_ = nullptr; other.pimpl_ = nullptr; } Iterator& operator= ( Iterator const& ) = delete; Iterator& operator= ( Iterator&& other ) { + // Same as above. + // Probably we should follow the rule of five more closely here though, + // and implement one in terms of the other... + base_parent_ = other.base_parent_; pimpl_ = std::move( other.pimpl_ ); + other.base_parent_ = nullptr; other.pimpl_ = nullptr; return *this; } @@ -308,16 +339,29 @@ class BaseWindowStream self_type& operator ++() { + assert( pimpl_ ); + assert( base_parent_ ); + // Advance to the next element, and observe it. // This is the only place that we need to call the leaving observers: // During normal iteration, this works fine anyway. Then, the last iteration // will first call the leaving oberservers, then try to move to the next window, // but find that there is none, and finish the iteration. By that time, the // leaving oberserver has been called correclty already, so nothing else to do. - assert( pimpl_ ); + execute_on_leave_observers_(); pimpl_->increment_(); + + // Now that we are at the new element, we execute the enter observers. + // If we instead reached the end of the input though, that one will do nothing. + // In that case however, we want to execute the end callbacks instead. execute_on_enter_observers_(); + + assert( pimpl_ ); + assert( base_parent_ ); + if( !pimpl_->get_parent_() ) { + execute_end_callbacks_(); + } return *this; } @@ -384,12 +428,36 @@ class BaseWindowStream } } + void execute_begin_callbacks_() const + { + assert( base_parent_ ); + for( auto const& cb : base_parent_->begin_callbacks_ ) { + cb(); + } + } + + void execute_end_callbacks_() const + { + assert( base_parent_ ); + for( auto const& cb : base_parent_->end_callbacks_ ) { + cb(); + } + } + // ------------------------------------------------------------------------- // PIMPL-like Implementation Abstraction // ------------------------------------------------------------------------- private: + // Parent. This is only the base class, and hence cannot be used by the derived classes. + // We need it here for the begin and end callbacks only, as those need to be run + // indepentently of the derived classes. + BaseWindowStream const* base_parent_ = nullptr; + + // Implementation of the derived iterator. This is where all the logic for the actual + // window iteration lives. We use the pimpl idiom here so that this class here does + // not need to expose derived interfaces. std::unique_ptr pimpl_; }; @@ -618,6 +686,60 @@ class BaseWindowStream return *this; } + /** + * @brief Add a callback function that is executed when beginning the iteration. + * + * Similar to the functionality offered by the observers, this could also be achieved by + * executing these functions direclty where needed, but having it as a callback here helps + * to reduce code duplication. + * + * See also add_end_callback(). + */ + self_type& add_begin_callback( std::function const& callback ) + { + if( started_ ) { + throw std::runtime_error( + "Window Stream: Cannot change callbacks after iteration has started." + ); + } + begin_callbacks_.push_back( callback ); + return *this; + } + + /** + * @brief Add a callback function that is executed when the end of the iteration is reached. + * + * This is similar to the add_begin_callback() functionality, but instead of executing the + * callback when starting the iteration, it is called when ending it. Again, this is meant + * as a means to reduce user code duplication, for example for logging needs. + */ + self_type& add_end_callback( std::function const& callback ) + { + if( started_ ) { + throw std::runtime_error( + "Window Stream: Cannot change callbacks after iteration has started." + ); + } + end_callbacks_.push_back( callback ); + return *this; + } + + /** + * @brief Clear all functions that have been added via add_begin_callback() and + * add_end_callback(). + */ + self_type& clear_callbacks() + { + if( started_ ) { + throw std::runtime_error( + "Window Stream: Cannot change callbacks after iteration has started." + ); + } + begin_callbacks_.clear(); + end_callbacks_.clear(); + return *this; + } + // ------------------------------------------------------------------------- // Iteration // ------------------------------------------------------------------------- @@ -631,12 +753,12 @@ class BaseWindowStream ); } started_ = true; - return Iterator( get_begin_iterator_() ); + return Iterator( this, get_begin_iterator_() ); } Iterator end() { - return Iterator( get_end_iterator_() ); + return Iterator( nullptr, get_end_iterator_() ); } // ------------------------------------------------------------------------- @@ -679,6 +801,10 @@ class BaseWindowStream std::vector> on_enter_observers_; std::vector> on_leave_observers_; + // We furthermore allow callbacks for the beginning and and of the iteration. + std::vector> begin_callbacks_; + std::vector> end_callbacks_; + }; } // namespace population diff --git a/lib/genesis/population/window/window_view_stream.hpp b/lib/genesis/population/window/window_view_stream.hpp index e7cab155..af642dda 100644 --- a/lib/genesis/population/window/window_view_stream.hpp +++ b/lib/genesis/population/window/window_view_stream.hpp @@ -19,9 +19,9 @@ along with this program. If not, see . Contact: - Lucas Czech - Department of Plant Biology, Carnegie Institution For Science - 260 Panama Street, Stanford, CA 94305, USA + Lucas Czech + University of Copenhagen, Globe Institute, Section for GeoGenetics + Oster Voldgade 5-7, 1350 Copenhagen K, Denmark */ /** @@ -171,6 +171,12 @@ class WindowViewStream final : public BaseWindowStream< current_ = parent_->window_stream_->begin(); end_ = parent_->window_stream_->end(); + // Edge case for empty data. + if( current_ == end_ ) { + parent_ = nullptr; + return; + } + // Start a view into the first window. This creates a view that mirrors the underlying // window, and iteratres through it, using the WindowView constructor that takes a Window. window_view_ = WindowViewType{ *current_ }; diff --git a/lib/genesis/utils/containers/generic_input_stream.hpp b/lib/genesis/utils/containers/generic_input_stream.hpp index f66d218c..2d6a0aea 100644 --- a/lib/genesis/utils/containers/generic_input_stream.hpp +++ b/lib/genesis/utils/containers/generic_input_stream.hpp @@ -1031,14 +1031,14 @@ class GenericInputStream } /** - * @brief Add a callback function that is executed when the begin() function is called. + * @brief Add a callback function that is executed when beginning the iteration. * * The callback needs to accept the GenericInputStream object itself, as a means to, for example, * access its data(), and is meant as a reporting mechanism. For example, callbacks can be added * that write properties of the underlying data sources to log. They are executed in the order * added. * - * Similar to the functionality offered by add_observer(), this could also be achieved by + * Similar to the functionality offered by the observers, this could also be achieved by * executing these functions direclty where needed, but having it as a callback here helps * to reduce code duplication. * @@ -1056,7 +1056,7 @@ class GenericInputStream } /** - * @brief Add a callback function that is executed when the end() of the iteration is reached. + * @brief Add a callback function that is executed when the end of the iteration is reached. * * This is similar to the add_begin_callback() functionality, but instead of executing the * callback when starting the iteration, it is called when ending it. Again, this is meant diff --git a/test/src/population/interval_window_stream.cpp b/test/src/population/interval_window_stream.cpp index fc87bc17..ca1ee5bf 100644 --- a/test/src/population/interval_window_stream.cpp +++ b/test/src/population/interval_window_stream.cpp @@ -84,6 +84,18 @@ void test_sliding_interval_stream_( WindowStream& win_it ) ++leave_observe_cnt; }); + // Same for the callbacks + size_t begin_cnt = 0; + size_t end_cnt = 0; + win_it.add_begin_callback( [&begin_cnt](){ + // LOG_DBG << "begin"; + ++begin_cnt; + }); + win_it.add_end_callback( [&end_cnt](){ + // LOG_DBG << "end"; + ++end_cnt; + }); + size_t window_cnt = 0; for( auto it = win_it.begin(); it != win_it.end(); ++it ) { auto const& window = *it; @@ -117,6 +129,8 @@ void test_sliding_interval_stream_( WindowStream& win_it ) EXPECT_EQ( 7, window_cnt ); EXPECT_EQ( 7, enter_observe_cnt ); EXPECT_EQ( 7, leave_observe_cnt ); + EXPECT_EQ( 1, begin_cnt ); + EXPECT_EQ( 1, end_cnt ); EXPECT_TRUE( found_first_win ); EXPECT_TRUE( found_last_win ); @@ -190,7 +204,9 @@ TEST( WindowStream, SlidingIntervalLambda ) void run_sliding_interval_window_view_variant_test_( VariantWindowViewStream& win_it ) { size_t window_cnt = 0; + // LOG_DBG << "go go go"; for( auto it = win_it.begin(); it != win_it.end(); ++it ) { + // LOG_DBG << "=========="; ++window_cnt; } EXPECT_EQ( 7, window_cnt ); @@ -228,10 +244,24 @@ TEST( WindowStream, SlidingIntervalWindowView ) ++leave_observe_cnt; }); + // Same for the callbacks + size_t begin_cnt = 0; + size_t end_cnt = 0; + win_it.add_begin_callback( [&begin_cnt](){ + // LOG_DBG << "begin"; + ++begin_cnt; + }); + win_it.add_end_callback( [&end_cnt](){ + // LOG_DBG << "end"; + ++end_cnt; + }); + // We use a test function that takes our abstract type, to see if we set this up correctly. run_sliding_interval_window_view_variant_test_( win_it ); EXPECT_EQ( 7, enter_observe_cnt ); EXPECT_EQ( 7, leave_observe_cnt ); + EXPECT_EQ( 1, begin_cnt ); + EXPECT_EQ( 1, end_cnt ); // test_sliding_interval_stream_( win_it ); // size_t window_cnt = 0; @@ -269,6 +299,18 @@ TEST( WindowStream, SlidingIntervalEmpty ) ++leave_observe_cnt; }); + // Same for the callbacks + size_t begin_cnt = 0; + size_t end_cnt = 0; + win_it.add_begin_callback( [&begin_cnt](){ + // LOG_DBG << "begin"; + ++begin_cnt; + }); + win_it.add_end_callback( [&end_cnt](){ + // LOG_DBG << "end"; + ++end_cnt; + }); + size_t window_cnt = 0; for( auto it = win_it.begin(); it != win_it.end(); ++it ) { @@ -285,4 +327,6 @@ TEST( WindowStream, SlidingIntervalEmpty ) EXPECT_EQ( 0, window_cnt ); EXPECT_EQ( 0, enter_observe_cnt ); EXPECT_EQ( 0, leave_observe_cnt ); + EXPECT_EQ( 1, begin_cnt ); + EXPECT_EQ( 1, end_cnt ); } diff --git a/test/src/population/window_view_stream.cpp b/test/src/population/window_view_stream.cpp index 8f007d0a..90de3811 100644 --- a/test/src/population/window_view_stream.cpp +++ b/test/src/population/window_view_stream.cpp @@ -47,6 +47,63 @@ using namespace genesis::population; using namespace genesis::utils; +TEST( WindowStream, EmptyWindowViewStream ) +{ + // Make a Generic Input Stream over the data stream. + std::vector data; + auto data_gen = make_variant_input_stream_from_vector( data ); + + // Create a window iterator based on the Generic Input Stream. + auto win_it = make_window_view_stream( + make_default_interval_window_stream( + data_gen.begin(), data_gen.end(), 10000 + ) + ); + + // Also test that the observer functions get executed once per window. + size_t enter_observe_cnt = 0; + size_t leave_observe_cnt = 0; + win_it.add_on_enter_observer( [&enter_observe_cnt]( WindowView const& ){ + // LOG_DBG << "enter at " << enter_observe_cnt; + ++enter_observe_cnt; + }); + win_it.add_on_leave_observer( [&leave_observe_cnt]( WindowView const& ){ + // LOG_DBG << "leave at " << leave_observe_cnt; + ++leave_observe_cnt; + }); + + // Same for the callbacks + size_t begin_cnt = 0; + size_t end_cnt = 0; + win_it.add_begin_callback( [&begin_cnt](){ + // LOG_DBG << "begin"; + ++begin_cnt; + }); + win_it.add_end_callback( [&end_cnt](){ + // LOG_DBG << "end"; + ++end_cnt; + }); + + size_t window_cnt = 0; + size_t total_cnt = 0; + for( auto it = win_it.begin(); it != win_it.end(); ++it ) { + auto const& window = *it; + LOG_DBG << "loop"; + + for( auto const& elem : window ) { + (void) elem; + ++total_cnt; + } + ++window_cnt; + } + EXPECT_EQ( 0, window_cnt ); + EXPECT_EQ( 0, enter_observe_cnt ); + EXPECT_EQ( 0, leave_observe_cnt ); + EXPECT_EQ( 1, begin_cnt ); + EXPECT_EQ( 1, end_cnt ); + EXPECT_EQ( 0, total_cnt ); +} + TEST( WindowStream, WindowViewStream ) { // Skip test if no data availabe. @@ -80,6 +137,18 @@ TEST( WindowStream, WindowViewStream ) ++leave_observe_cnt; }); + // Same for the callbacks + size_t begin_cnt = 0; + size_t end_cnt = 0; + win_it.add_begin_callback( [&begin_cnt](){ + // LOG_DBG << "begin"; + ++begin_cnt; + }); + win_it.add_end_callback( [&end_cnt](){ + // LOG_DBG << "end"; + ++end_cnt; + }); + size_t window_cnt = 0; size_t total_cnt = 0; for( auto it = win_it.begin(); it != win_it.end(); ++it ) { @@ -94,6 +163,8 @@ TEST( WindowStream, WindowViewStream ) EXPECT_EQ( 7, window_cnt ); EXPECT_EQ( 7, enter_observe_cnt ); EXPECT_EQ( 7, leave_observe_cnt ); + EXPECT_EQ( 1, begin_cnt ); + EXPECT_EQ( 1, end_cnt ); EXPECT_EQ( 50000, total_cnt ); }