Skip to content

Commit

Permalink
Add Window Stream begin and end callbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
lczech committed Jun 3, 2024
1 parent dbc1fa0 commit 80f96fb
Show file tree
Hide file tree
Showing 5 changed files with 258 additions and 11 deletions.
136 changes: 131 additions & 5 deletions lib/genesis/population/window/base_window_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,12 +208,31 @@ class BaseWindowStream

// Iterator() = delete;

Iterator( std::unique_ptr<BaseIterator> base_iterator )
: pimpl_( std::move( base_iterator ))
Iterator(
BaseWindowStream const* parent,
std::unique_ptr<BaseIterator> base_iterator
)
: base_parent_( parent )
, pimpl_( std::move( base_iterator ))
{
// Observe the first element, if this is an active iterator.
assert( pimpl_ );

// If we are here from an end iterator, we do nothing.
if( !base_parent_ ) {
return;
}
assert( base_parent_ );

// Before starting to init anything, call the callbacks.
execute_begin_callbacks_();
execute_on_enter_observers_();

// Special case: no data. Need to execute the end callback as well.
assert( pimpl_ );
if( !pimpl_->get_parent_() ) {
execute_end_callbacks_();
}
}

public:
Expand All @@ -224,14 +243,26 @@ class BaseWindowStream
Iterator( Iterator const& ) = delete;
Iterator( Iterator&& other )
{
// Copy the base parent pointer, as that instance stays the same.
// We however need to move the pimpl, as we own it.
base_parent_ = other.base_parent_;
pimpl_ = std::move( other.pimpl_ );

// Now reset the other, so that in case of a bug here, we induce a segfault
// instead of accidental undefined behaviour when accessing the old data.
other.base_parent_ = nullptr;
other.pimpl_ = nullptr;
}

Iterator& operator= ( Iterator const& ) = delete;
Iterator& operator= ( Iterator&& other )
{
// Same as above.
// Probably we should follow the rule of five more closely here though,
// and implement one in terms of the other...
base_parent_ = other.base_parent_;
pimpl_ = std::move( other.pimpl_ );
other.base_parent_ = nullptr;
other.pimpl_ = nullptr;
return *this;
}
Expand Down Expand Up @@ -308,16 +339,29 @@ class BaseWindowStream

self_type& operator ++()
{
assert( pimpl_ );
assert( base_parent_ );

// Advance to the next element, and observe it.
// This is the only place that we need to call the leaving observers:
// During normal iteration, this works fine anyway. Then, the last iteration
// will first call the leaving oberservers, then try to move to the next window,
// but find that there is none, and finish the iteration. By that time, the
// leaving oberserver has been called correclty already, so nothing else to do.
assert( pimpl_ );

execute_on_leave_observers_();
pimpl_->increment_();

// Now that we are at the new element, we execute the enter observers.
// If we instead reached the end of the input though, that one will do nothing.
// In that case however, we want to execute the end callbacks instead.
execute_on_enter_observers_();

assert( pimpl_ );
assert( base_parent_ );
if( !pimpl_->get_parent_() ) {
execute_end_callbacks_();
}
return *this;
}

Expand Down Expand Up @@ -384,12 +428,36 @@ class BaseWindowStream
}
}

void execute_begin_callbacks_() const
{
assert( base_parent_ );
for( auto const& cb : base_parent_->begin_callbacks_ ) {
cb();
}
}

void execute_end_callbacks_() const
{
assert( base_parent_ );
for( auto const& cb : base_parent_->end_callbacks_ ) {
cb();
}
}

// -------------------------------------------------------------------------
// PIMPL-like Implementation Abstraction
// -------------------------------------------------------------------------

private:

// Parent. This is only the base class, and hence cannot be used by the derived classes.
// We need it here for the begin and end callbacks only, as those need to be run
// indepentently of the derived classes.
BaseWindowStream const* base_parent_ = nullptr;

// Implementation of the derived iterator. This is where all the logic for the actual
// window iteration lives. We use the pimpl idiom here so that this class here does
// not need to expose derived interfaces.
std::unique_ptr<BaseIterator> pimpl_;

};
Expand Down Expand Up @@ -618,6 +686,60 @@ class BaseWindowStream
return *this;
}

/**
* @brief Add a callback function that is executed when beginning the iteration.
*
* Similar to the functionality offered by the observers, this could also be achieved by
* executing these functions direclty where needed, but having it as a callback here helps
* to reduce code duplication.
*
* See also add_end_callback().
*/
self_type& add_begin_callback( std::function<void()> const& callback )
{
if( started_ ) {
throw std::runtime_error(
"Window Stream: Cannot change callbacks after iteration has started."
);
}
begin_callbacks_.push_back( callback );
return *this;
}

/**
* @brief Add a callback function that is executed when the end of the iteration is reached.
*
* This is similar to the add_begin_callback() functionality, but instead of executing the
* callback when starting the iteration, it is called when ending it. Again, this is meant
* as a means to reduce user code duplication, for example for logging needs.
*/
self_type& add_end_callback( std::function<void()> const& callback )
{
if( started_ ) {
throw std::runtime_error(
"Window Stream: Cannot change callbacks after iteration has started."
);
}
end_callbacks_.push_back( callback );
return *this;
}

/**
* @brief Clear all functions that have been added via add_begin_callback() and
* add_end_callback().
*/
self_type& clear_callbacks()
{
if( started_ ) {
throw std::runtime_error(
"Window Stream: Cannot change callbacks after iteration has started."
);
}
begin_callbacks_.clear();
end_callbacks_.clear();
return *this;
}

// -------------------------------------------------------------------------
// Iteration
// -------------------------------------------------------------------------
Expand All @@ -631,12 +753,12 @@ class BaseWindowStream
);
}
started_ = true;
return Iterator( get_begin_iterator_() );
return Iterator( this, get_begin_iterator_() );
}

Iterator end()
{
return Iterator( get_end_iterator_() );
return Iterator( nullptr, get_end_iterator_() );
}

// -------------------------------------------------------------------------
Expand Down Expand Up @@ -679,6 +801,10 @@ class BaseWindowStream
std::vector<std::function<void(WindowType const&)>> on_enter_observers_;
std::vector<std::function<void(WindowType const&)>> on_leave_observers_;

// We furthermore allow callbacks for the beginning and and of the iteration.
std::vector<std::function<void()>> begin_callbacks_;
std::vector<std::function<void()>> end_callbacks_;

};

} // namespace population
Expand Down
12 changes: 9 additions & 3 deletions lib/genesis/population/window/window_view_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
Contact:
Lucas Czech <[email protected]>
Department of Plant Biology, Carnegie Institution For Science
260 Panama Street, Stanford, CA 94305, USA
Lucas Czech <[email protected]>
University of Copenhagen, Globe Institute, Section for GeoGenetics
Oster Voldgade 5-7, 1350 Copenhagen K, Denmark
*/

/**
Expand Down Expand Up @@ -171,6 +171,12 @@ class WindowViewStream final : public BaseWindowStream<
current_ = parent_->window_stream_->begin();
end_ = parent_->window_stream_->end();

// Edge case for empty data.
if( current_ == end_ ) {
parent_ = nullptr;
return;
}

// Start a view into the first window. This creates a view that mirrors the underlying
// window, and iteratres through it, using the WindowView constructor that takes a Window.
window_view_ = WindowViewType{ *current_ };
Expand Down
6 changes: 3 additions & 3 deletions lib/genesis/utils/containers/generic_input_stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1031,14 +1031,14 @@ class GenericInputStream
}

/**
* @brief Add a callback function that is executed when the begin() function is called.
* @brief Add a callback function that is executed when beginning the iteration.
*
* The callback needs to accept the GenericInputStream object itself, as a means to, for example,
* access its data(), and is meant as a reporting mechanism. For example, callbacks can be added
* that write properties of the underlying data sources to log. They are executed in the order
* added.
*
* Similar to the functionality offered by add_observer(), this could also be achieved by
* Similar to the functionality offered by the observers, this could also be achieved by
* executing these functions direclty where needed, but having it as a callback here helps
* to reduce code duplication.
*
Expand All @@ -1056,7 +1056,7 @@ class GenericInputStream
}

/**
* @brief Add a callback function that is executed when the end() of the iteration is reached.
* @brief Add a callback function that is executed when the end of the iteration is reached.
*
* This is similar to the add_begin_callback() functionality, but instead of executing the
* callback when starting the iteration, it is called when ending it. Again, this is meant
Expand Down
44 changes: 44 additions & 0 deletions test/src/population/interval_window_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,18 @@ void test_sliding_interval_stream_( WindowStream& win_it )
++leave_observe_cnt;
});

// Same for the callbacks
size_t begin_cnt = 0;
size_t end_cnt = 0;
win_it.add_begin_callback( [&begin_cnt](){
// LOG_DBG << "begin";
++begin_cnt;
});
win_it.add_end_callback( [&end_cnt](){
// LOG_DBG << "end";
++end_cnt;
});

size_t window_cnt = 0;
for( auto it = win_it.begin(); it != win_it.end(); ++it ) {
auto const& window = *it;
Expand Down Expand Up @@ -117,6 +129,8 @@ void test_sliding_interval_stream_( WindowStream& win_it )
EXPECT_EQ( 7, window_cnt );
EXPECT_EQ( 7, enter_observe_cnt );
EXPECT_EQ( 7, leave_observe_cnt );
EXPECT_EQ( 1, begin_cnt );
EXPECT_EQ( 1, end_cnt );

EXPECT_TRUE( found_first_win );
EXPECT_TRUE( found_last_win );
Expand Down Expand Up @@ -190,7 +204,9 @@ TEST( WindowStream, SlidingIntervalLambda )
void run_sliding_interval_window_view_variant_test_( VariantWindowViewStream& win_it )
{
size_t window_cnt = 0;
// LOG_DBG << "go go go";
for( auto it = win_it.begin(); it != win_it.end(); ++it ) {
// LOG_DBG << "==========";
++window_cnt;
}
EXPECT_EQ( 7, window_cnt );
Expand Down Expand Up @@ -228,10 +244,24 @@ TEST( WindowStream, SlidingIntervalWindowView )
++leave_observe_cnt;
});

// Same for the callbacks
size_t begin_cnt = 0;
size_t end_cnt = 0;
win_it.add_begin_callback( [&begin_cnt](){
// LOG_DBG << "begin";
++begin_cnt;
});
win_it.add_end_callback( [&end_cnt](){
// LOG_DBG << "end";
++end_cnt;
});

// We use a test function that takes our abstract type, to see if we set this up correctly.
run_sliding_interval_window_view_variant_test_( win_it );
EXPECT_EQ( 7, enter_observe_cnt );
EXPECT_EQ( 7, leave_observe_cnt );
EXPECT_EQ( 1, begin_cnt );
EXPECT_EQ( 1, end_cnt );

// test_sliding_interval_stream_( win_it );
// size_t window_cnt = 0;
Expand Down Expand Up @@ -269,6 +299,18 @@ TEST( WindowStream, SlidingIntervalEmpty )
++leave_observe_cnt;
});

// Same for the callbacks
size_t begin_cnt = 0;
size_t end_cnt = 0;
win_it.add_begin_callback( [&begin_cnt](){
// LOG_DBG << "begin";
++begin_cnt;
});
win_it.add_end_callback( [&end_cnt](){
// LOG_DBG << "end";
++end_cnt;
});

size_t window_cnt = 0;
for( auto it = win_it.begin(); it != win_it.end(); ++it ) {

Expand All @@ -285,4 +327,6 @@ TEST( WindowStream, SlidingIntervalEmpty )
EXPECT_EQ( 0, window_cnt );
EXPECT_EQ( 0, enter_observe_cnt );
EXPECT_EQ( 0, leave_observe_cnt );
EXPECT_EQ( 1, begin_cnt );
EXPECT_EQ( 1, end_cnt );
}
Loading

0 comments on commit 80f96fb

Please sign in to comment.