From d05357c45173b3dc825d8a9a1788fcdbca896799 Mon Sep 17 00:00:00 2001 From: Kirk Shoop Date: Mon, 6 Feb 2023 11:43:32 -0800 Subject: [PATCH] adding some tests --- include/exec/tail_sender.hpp | 435 +++++++++++++++++++++++++++++++++ test/exec/test_tail_sender.cpp | 74 +++++- 2 files changed, 507 insertions(+), 2 deletions(-) diff --git a/include/exec/tail_sender.hpp b/include/exec/tail_sender.hpp index 2a97413af..187d8213b 100644 --- a/include/exec/tail_sender.hpp +++ b/include/exec/tail_sender.hpp @@ -554,4 +554,439 @@ namespace exec { return _resume_tail_senders_until_one_remaining(__r, std::index_sequence_for{}, cs...); } + template + void resume_tail_senders(_TailReceiver&& __r, Cs... cs) noexcept { + auto __last_tail = _resume_tail_senders_until_one_remaining(__r, std::index_sequence_for{}, cs...); + for(;;) { + auto __op = connect(__last_tail, __r); + if (!__op) { + return; + } + if constexpr (__terminal_tail_sender_to) { + start(__last_tail); + return; + } else { + __last_tail = __start_sequential(start(__last_tail), __r); + } + } + } + + ///////////////////////////////////////////////////////////////////////////// + // run_loop + namespace __loop { + class run_loop; + + struct __task : __immovable { + __task* __next_ = this; + union { + void (*__execute_)(__task*) noexcept; + __task* __tail_; + }; + + void __execute() noexcept { (*__execute_)(this); } + }; + + template + struct __operation { + using _Receiver = stdexec::__t<_ReceiverId>; + + struct __t : __task { + using __id = __operation; + + run_loop* __loop_; + [[no_unique_address]] _Receiver __rcvr_; + + static void __execute_impl(__task* __p) noexcept { + auto& __rcvr = ((__t*) __p)->__rcvr_; + try { + if (get_stop_token(get_env(__rcvr)).stop_requested()) { + set_stopped((_Receiver&&) __rcvr); + } else { + set_value((_Receiver&&) __rcvr); + } + } catch(...) { + set_error((_Receiver&&) __rcvr, std::current_exception()); + } + } + + explicit __t(__task* __tail) noexcept + : __task{.__tail_ = __tail} {} + __t(__task* __next, run_loop* __loop, _Receiver __rcvr) + : __task{{}, __next, {&__execute_impl}} + , __loop_{__loop} + , __rcvr_{(_Receiver&&) __rcvr} {} + + friend void tag_invoke(start_t, __t& __self) noexcept { + __self.__start_(); + } + + void __start_() noexcept; + }; + }; + + template + struct __run_operation { + using _Receiver = stdexec::__t<_ReceiverId>; + + struct __t { + using __id = __run_operation; + + run_loop* __loop_; + [[no_unique_address]] _Receiver __rcvr_; + + __t(run_loop* __loop, _Receiver __rcvr) + : __loop_{__loop} + , __rcvr_{(_Receiver&&) __rcvr} {} + + friend void tag_invoke(start_t, __t& __self) noexcept { + __self.__start_(); + } + + void __start_() noexcept; + }; + }; + + class run_loop { + template + using __completion_signatures_ = completion_signatures; + + template + friend struct __operation; + public: + struct __scheduler { + using __t = __scheduler; + using __id = __scheduler; + bool operator==(const __scheduler&) const noexcept = default; + + private: + struct __schedule_task { + using __t = __schedule_task; + using __id = __schedule_task; + using completion_signatures = + __completion_signatures_< + set_value_t(), + set_error_t(std::exception_ptr), + set_stopped_t()>; + + private: + friend __scheduler; + + template + using __operation = stdexec::__t<__operation>>; + + template + friend __operation<_Receiver> + tag_invoke(connect_t, const __schedule_task& __self, _Receiver __rcvr) { + return __self.__connect_((_Receiver &&) __rcvr); + } + + template + __operation<_Receiver> __connect_(_Receiver&& __rcvr) const { + return {&__loop_->__head_, __loop_, (_Receiver &&) __rcvr}; + } + + template + friend __scheduler + tag_invoke(get_completion_scheduler_t<_CPO>, const __schedule_task& __self) noexcept { + return __scheduler{__self.__loop_}; + } + + explicit __schedule_task(run_loop* __loop) noexcept + : __loop_(__loop) + {} + + run_loop* const __loop_; + }; + + friend run_loop; + + explicit __scheduler(run_loop* __loop) noexcept + : __loop_(__loop) {} + + friend __schedule_task tag_invoke(schedule_t, const __scheduler& __self) noexcept { + return __self.__schedule(); + } + + friend stdexec::forward_progress_guarantee tag_invoke( + get_forward_progress_guarantee_t, const __scheduler&) noexcept { + return stdexec::forward_progress_guarantee::parallel; + } + + // BUGBUG NOT TO SPEC + friend bool tag_invoke( + this_thread::execute_may_block_caller_t, const __scheduler&) noexcept { + return false; + } + + __schedule_task __schedule() const noexcept { + return __schedule_task{__loop_}; + } + + run_loop* __loop_; + }; + __scheduler get_scheduler() noexcept { + return __scheduler{this}; + } + + struct __run_sender { + using __t = __run_sender; + using __id = __run_sender; + using completion_signatures = + __completion_signatures_< + set_value_t(), + set_stopped_t()>; + + private: + friend __scheduler; + + template + using __operation = stdexec::__t<__operation>>; + + template + friend __operation<_Receiver> + tag_invoke(connect_t, const __run_sender& __self, _Receiver __rcvr) { + return __self.__connect_((_Receiver &&) __rcvr); + } + + template + __operation<_Receiver> __connect_(_Receiver&& __rcvr) const { + return {&__loop_->__head_, __loop_, (_Receiver &&) __rcvr}; + } + + template + friend __scheduler + tag_invoke(get_completion_scheduler_t<_CPO>, const __run_sender& __self) noexcept { + return __scheduler{__self.__loop_}; + } + + explicit __run_sender(run_loop* __loop) noexcept + : __loop_(__loop) + {} + + run_loop* __loop_; + }; + template + __run_sender run(_Sender&& __s); + + void run(); + + void finish(); + + private: + void __push_back_(__task* __task); + __task* __pop_front_(); + + std::mutex __mutex_; + std::condition_variable __cv_; + __task __head_{.__tail_ = &__head_}; + bool __stop_ = false; + }; + + template + inline void __operation<_ReceiverId>::__t::__start_() noexcept try { + __loop_->__push_back_(this); + } catch(...) { + set_error((_Receiver&&) __rcvr_, std::current_exception()); + } + + template + run_loop::__run_sender run_loop::run(_Sender&& __s) { + return {this, (_Sender &&)__s}; + } + + inline void run_loop::run() { + for (__task* __task; (__task = __pop_front_()) != &__head_;) { + __task->__execute(); + } + } + + inline void run_loop::finish() { + std::unique_lock __lock{__mutex_}; + __stop_ = true; + __cv_.notify_all(); + } + + inline void run_loop::__push_back_(__task* __task) { + std::unique_lock __lock{__mutex_}; + __task->__next_ = &__head_; + __head_.__tail_ = __head_.__tail_->__next_ = __task; + __cv_.notify_one(); + } + + inline __task* run_loop::__pop_front_() { + std::unique_lock __lock{__mutex_}; + __cv_.wait(__lock, [this]{ return __head_.__next_ != &__head_ || __stop_; }); + if (__head_.__tail_ == __head_.__next_) + __head_.__tail_ = &__head_; + return std::exchange(__head_.__next_, __head_.__next_->__next_); + } + } // namespace __loop + + // NOT TO SPEC + using run_loop = __loop::run_loop; + + ///////////////////////////////////////////////////////////////////////////// + // [execution.senders.consumers.sync_wait] + // [execution.senders.consumers.sync_wait_with_variant] + namespace __sync_wait { + template + using __into_variant_result_t = + decltype(stdexec::into_variant(__declval<_Sender>())); + + struct __env { + using __t = __env; + using __id = __env; + stdexec::run_loop::__scheduler __sched_; + + friend auto tag_invoke(stdexec::get_scheduler_t, const __env& __self) noexcept + -> stdexec::run_loop::__scheduler { + return __self.__sched_; + } + + friend auto tag_invoke(stdexec::get_delegatee_scheduler_t, const __env& __self) noexcept + -> stdexec::run_loop::__scheduler { + return __self.__sched_; + } + }; + + // What should sync_wait(just_stopped()) return? + template + using __sync_wait_result_impl = + __value_types_of_t< + _Sender, + __env, + __transform<__q, _Continuation>, + __q<__msingle>>; + + template _Sender> + using __sync_wait_result_t = + __sync_wait_result_impl<_Sender, __q>; + + template + using __sync_wait_with_variant_result_t = + __sync_wait_result_t<__into_variant_result_t<_Sender>>; + + template + struct __state { + using _Tuple = std::tuple<_Values...>; + std::variant __data_{}; + }; + + template + struct __receiver { + struct __t { + using __id = __receiver; + __state<_Values...>* __state_; + stdexec::run_loop* __loop_; + template + void __set_error(_Error __err) noexcept { + if constexpr (__decays_to<_Error, std::exception_ptr>) + __state_->__data_.template emplace<2>((_Error&&) __err); + else if constexpr (__decays_to<_Error, std::error_code>) + __state_->__data_.template emplace<2>(std::make_exception_ptr(std::system_error(__err))); + else + __state_->__data_.template emplace<2>(std::make_exception_ptr((_Error&&) __err)); + __loop_->finish(); + } + template + requires constructible_from, _As...> + friend void tag_invoke(stdexec::set_value_t, __t&& __rcvr, _As&&... __as) noexcept try { + __rcvr.__state_->__data_.template emplace<1>((_As&&) __as...); + __rcvr.__loop_->finish(); + } catch(...) { + __rcvr.__set_error(std::current_exception()); + } + template + friend void tag_invoke(stdexec::set_error_t, __t&& __rcvr, _Error __err) noexcept { + __rcvr.__set_error((_Error &&) __err); + } + friend void tag_invoke(stdexec::set_stopped_t __d, __t&& __rcvr) noexcept { + __rcvr.__state_->__data_.template emplace<3>(__d); + __rcvr.__loop_->finish(); + } + friend __env + tag_invoke(stdexec::get_env_t, const __t& __rcvr) noexcept { + return {__rcvr.__loop_->get_scheduler()}; + } + }; + }; + + template + using __into_variant_result_t = + decltype(stdexec::into_variant(__declval<_Sender>())); + + //////////////////////////////////////////////////////////////////////////// + // [execution.senders.consumers.sync_wait] + struct sync_wait_t { + template + using __receiver_t = __t<__sync_wait_result_impl<_Sender, __q<__receiver>>>; + + // TODO: constrain on return type + template <__single_value_variant_sender<__env> _Sender> // NOT TO SPEC + requires + __tag_invocable_with_completion_scheduler< + sync_wait_t, set_value_t, _Sender> + tag_invoke_result_t< + sync_wait_t, + __completion_scheduler_for<_Sender, set_value_t>, + _Sender> + operator()(_Sender&& __sndr) const noexcept( + nothrow_tag_invocable< + sync_wait_t, + __completion_scheduler_for<_Sender, set_value_t>, + _Sender>) { + auto __sched = + get_completion_scheduler(__sndr); + return tag_invoke(sync_wait_t{}, std::move(__sched), (_Sender&&) __sndr); + } + + // TODO: constrain on return type + template <__single_value_variant_sender<__env> _Sender> // NOT TO SPEC + requires + (!__tag_invocable_with_completion_scheduler< + sync_wait_t, set_value_t, _Sender>) && + tag_invocable + tag_invoke_result_t + operator()(_Sender&& __sndr) const noexcept( + nothrow_tag_invocable) { + return tag_invoke(sync_wait_t{}, (_Sender&&) __sndr); + } + + template <__single_value_variant_sender<__env> _Sender> + requires + (!__tag_invocable_with_completion_scheduler< + sync_wait_t, set_value_t, _Sender>) && + (!tag_invocable) && + sender<_Sender, __env> && + sender_to<_Sender, __receiver_t<_Sender>> + auto operator()(_Sender&& __sndr) const + -> std::optional<__sync_wait_result_t<_Sender>> { + using state_t = __sync_wait_result_impl<_Sender, __q<__state>>; + using tail_t = next_tail_from_sender_to_t<_Sender, __receiver_t<_Sender>>; + state_t __state {}; + run_loop __loop; + + // Launch the sender with a continuation that will fill in a variant + // and notify a condition variable. + auto __op_state = + connect((_Sender&&) __sndr, __receiver_t<_Sender>{&__state, &__loop}); + tail_t __tail = start(__op_state); + + // Wait for the variant to be filled in. + auto __tail_run = __loop.run(just()); + + resume_tail_senders(__null_tail_sender{}, __tail, __tail_run); + + if (__state.__data_.index() == 2) + std::rethrow_exception(std::get<2>(__state.__data_)); + + if (__state.__data_.index() == 3) + return std::nullopt; + + return std::move(std::get<1>(__state.__data_)); + } + }; + } // namespace __sync_wait + using __sync_wait::sync_wait_t; + inline constexpr sync_wait_t sync_wait{}; } // namespace exec diff --git a/test/exec/test_tail_sender.cpp b/test/exec/test_tail_sender.cpp index 7b0838f88..52f668ca7 100644 --- a/test/exec/test_tail_sender.cpp +++ b/test/exec/test_tail_sender.cpp @@ -22,6 +22,8 @@ using namespace std; namespace ex = stdexec; +namespace { + //! Tail Sender struct ATailSender { using completion_signatures = ex::completion_signatures; @@ -63,12 +65,13 @@ struct ATailReceiver { int* called; friend void tag_invoke(ex::set_value_t, ATailReceiver&& __self, auto&&...) noexcept { ++*__self.called; } friend void tag_invoke(ex::set_stopped_t, ATailReceiver&& __self) noexcept { ++*__self.called; } - friend ex::__empty_env tag_invoke(ex::get_env_t, const ATailReceiver&) { + template + requires same_as<_Self, ATailReceiver> + friend ex::__empty_env tag_invoke(ex::get_env_t, const _Self&) { return {}; } }; - template struct ANestTailReceiver { std::decay_t nested_tail_sender; @@ -88,6 +91,70 @@ struct ANestTailReceiver { } }; +// struct ATailSender { +// using completion_signatures = ex::completion_signatures; + +// template +// struct operation { +// Receiver rcvr_; + +// operation(Receiver __r) : rcvr_(__r) {} + +// operation(const operation&) = delete; +// operation(operation&&) = delete; +// operation& operator=(const operation&) = delete; +// operation& operator=(operation&&) = delete; +// [[nodiscard]] +// friend auto tag_invoke(ex::start_t, operation& self) noexcept { +// return ex::set_value(std::move(self.rcvr_)); +// } + +// friend void tag_invoke(exec::unwind_t, operation& self) noexcept { +// ex::set_stopped(std::move(self.rcvr_)); +// } +// }; + +// template +// friend auto tag_invoke(ex::connect_t, ATailSender self, Receiver&& rcvr) noexcept +// -> operation> { +// return {std::forward(rcvr)}; +// } + +// template +// friend constexpr bool tag_invoke( +// exec::always_completes_inline_t, exec::c_t, exec::c_t<_Env>) noexcept { +// return true; +// } +// }; + +struct ASenderWithTail { + using completion_signatures = ex::completion_signatures; + + template + struct operation { + Receiver rcvr_; + + operation(Receiver __r) : rcvr_(__r) {} + + operation(const operation&) = delete; + operation(operation&&) = delete; + operation& operator=(const operation&) = delete; + operation& operator=(operation&&) = delete; + [[nodiscard]] + friend auto tag_invoke(ex::start_t, operation& self) noexcept { + return ex::set_value(std::move(self.rcvr_)); + } + }; + + template + friend auto tag_invoke(ex::connect_t, ATailSender self, Receiver&& rcvr) noexcept + -> operation> { + return {std::forward(rcvr)}; + } +}; + +} + TEST_CASE("Test ATailSender is a tail_sender", "[tail_sender]") { static_assert(exec::tail_sender); static_assert(exec::__terminal_tail_sender_to); @@ -283,3 +350,6 @@ TEST_CASE("Test resume_tail_senders_until_one_remaining()", "[tail_sender]") { CHECK(called == 1); CHECK(!op4); } + +TEST_CASE("Test sync_wait() with start() that returns a tail_sender", "[tail_sender]") { +}