Skip to content

Commit

Permalink
Merge pull request #53 from beman-project/when_all-problem
Browse files Browse the repository at this point in the history
When all problem
  • Loading branch information
dietmarkuehl authored Oct 26, 2024
2 parents 22f7627 + 147ba4a commit 8e01c7e
Show file tree
Hide file tree
Showing 9 changed files with 228 additions and 24 deletions.
1 change: 1 addition & 0 deletions examples/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
set(BEMAN_EXECUTION26_LIBRARY beman_${TARGET_NAME})

set(EXAMPLES
when_all-cancel
stop_token
stopping
allocator
Expand Down
167 changes: 167 additions & 0 deletions examples/when_all-cancel.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
// examples/when_all-cancel.cpp -*-C++-*-
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception

#include <beman/execution26/execution.hpp>
#include <beman/execution26/stop_token.hpp>
#include <iostream>
#include <optional>
#include <type_traits>
#include <utility>

namespace ex = beman::execution26;

// ----------------------------------------------------------------------------

namespace
{
struct env
{
ex::inplace_stop_source* source;
auto query(ex::get_stop_token_t const&) const noexcept
-> ex::inplace_stop_token
{
std::cout << "query\n";
return this->source->get_token();
}
};

struct receiver
{
using receiver_concept = ex::receiver_t;
ex::inplace_stop_source* source;

auto set_value() && noexcept -> void { std::cout << "set_value\n"; }
auto set_error(auto&&) && noexcept -> void { std::cout << "set_error\n"; }
auto set_stopped() && noexcept -> void { std::cout << "set_stopped\n"; }

auto get_env() const noexcept { return env{this->source}; }
};
static_assert(ex::receiver<receiver>);

struct await_stop
{
using sender_concept = ex::sender_t;
using completion_signatures = ex::completion_signatures<
ex::set_value_t(),
ex::set_stopped_t()
>;

template <ex::receiver Receiver>
struct state
{
struct stop_t
{
state* st;
auto operator()() const noexcept -> void
{
auto local_st{this->st};
local_st->callback.reset();
std::cout << "await_stop stopping\n";
ex::set_stopped(std::move(local_st->receiver));
std::cout << "await_stop stopping done\n";
}
};
using operation_state_concept = ex::operation_state_t;
using token_t = decltype(ex::get_stop_token(ex::get_env(std::declval<Receiver>())));
using callback_t = ex::stop_callback_for_t<token_t, stop_t>;

Receiver receiver;
std::optional<callback_t> callback{};

auto start() & noexcept -> void
{
callback.emplace(ex::get_stop_token(ex::get_env(this->receiver)), stop_t{this});
}
};

template <ex::receiver Receiver>
auto connect(Receiver&& receiver) -> state<std::remove_cvref_t<Receiver>>
{
return { ::std::forward<Receiver>(receiver) };
}
};
static_assert(ex::sender_in<await_stop>);
static_assert(ex::operation_state<await_stop::state<receiver>>);

template <ex::sender Sender>
struct eager
{
using sender_concept = ex::sender_t;
using completion_signatures = ex::completion_signatures<
ex::set_value_t(),
ex::set_stopped_t()
>;

Sender sender;

template <ex::receiver Receiver>
struct state
{
using operation_state_concept = ex::operation_state_t;
struct receiver
{
using receiver_concept = ex::receiver_t;
state* st;
auto set_value() && noexcept -> void { ex::set_value(std::move(st->outer_receiver)); }
template <typename E>
auto set_error(E&& e) && noexcept -> void { ex::set_error(std::move(st->outer_receiver), std::forward<E>(e)); }
auto set_stopped() && noexcept -> void
{
st->inner_state.reset();
ex::set_stopped(std::move(st->outer_receiver));
}

auto get_env() const noexcept -> env
{
return ex::get_env(st->outer_receiver);
}
};
using inner_state_t = decltype(ex::connect(std::declval<Sender>(), std::declval<receiver>()));

struct helper
{
inner_state_t st;
template <typename S, typename R>
helper(S&& s, R&& r): st(ex::connect(std::forward<S>(s), std::forward<R>(r))) {}
};

Receiver outer_receiver;
std::optional<helper> inner_state;

template <typename R, typename S>
state(R&& r, S&& s)
: outer_receiver(std::forward<R>(r))
, inner_state()
{
inner_state.emplace(std::forward<S>(s), receiver{this});
}
auto start() & noexcept -> void
{
ex::start((*this->inner_state).st);
}
};
template <ex::receiver Receiver>
auto connect(Receiver&& receiver)
{
return state<std::remove_cvref_t<Receiver>>(std::forward<Receiver>(receiver), std::move(this->sender));
}
};
template <ex::sender Sender>
eager(Sender&&) -> eager<std::remove_cvref_t<Sender>>;
//static_assert(ex::sender_in<eager>);
//static_assert(ex::operation_state<eager::state<receiver>>);
}

auto main() -> int
{
auto s{eager{ex::when_all(await_stop{})}};

ex::inplace_stop_source source{};
auto op{ex::connect(s, receiver{&source})};
(void)op;
std::cout << "start\n";
ex::start(op);
std::cout << "started\n";
source.request_stop();
std::cout << "done\n";
}
8 changes: 4 additions & 4 deletions include/beman/execution26/detail/basic_operation.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@ namespace beman::execution26::detail
using inner_ops_t = ::beman::execution26::detail::connect_all_result<Sender, Receiver>;
inner_ops_t inner_ops;

template <typename S> //-dk:TODO is that deviating from the spec?
basic_operation(S&& sender, Receiver&& receiver) noexcept(true/*-dk:TODO*/)
basic_operation(Sender&& sender, Receiver&& receiver) noexcept(true/*-dk:TODO*/)
: ::beman::execution26::detail::basic_state<Sender, Receiver>(
::std::forward<S>(sender), ::std::move(receiver)
::std::forward<Sender>(sender),
::std::move(receiver)
)
, inner_ops(::beman::execution26::detail::connect_all(
this,
::std::forward<S>(sender),
::std::forward<Sender>(sender),
::beman::execution26::detail::indices_for<Sender>()
))
{
Expand Down
2 changes: 1 addition & 1 deletion include/beman/execution26/detail/basic_sender.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ namespace beman::execution26::detail
::beman::execution26::receiver Receiver>
auto connect(this Self&& self, Receiver receiver)
noexcept(true/*-dk:TODO*/)
-> ::beman::execution26::detail::basic_operation<basic_sender, Receiver>
-> ::beman::execution26::detail::basic_operation<Self, Receiver>
{
return { ::std::forward<Self>(self), ::std::move(receiver) };
}
Expand Down
5 changes: 2 additions & 3 deletions include/beman/execution26/detail/basic_state.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@ namespace beman::execution26::detail
template <typename Sender, typename Receiver>
struct basic_state
{
template <typename S> //-dk:TODO is that deviating from the spec?
basic_state(S&& sender, Receiver&& receiver) noexcept(true)
basic_state(Sender&& sender, Receiver&& receiver) noexcept(true)
: receiver(::std::move(receiver))
, state(::beman::execution26::detail::impls_for<
::beman::execution26::tag_of_t<Sender>
>::get_state(::std::forward<S>(sender), receiver))
>::get_state(::std::forward<Sender>(sender), receiver))
{
}

Expand Down
13 changes: 7 additions & 6 deletions include/beman/execution26/detail/get_env.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,19 @@ namespace beman::execution26
{
template <typename Object>
requires(
not requires(Object&& object) { ::std::as_const(object).get_env(); }
not requires(::std::add_const_t<::std::remove_cvref_t<Object>>& object) { object.get_env(); }
|| ::beman::execution26::detail::queryable<std::remove_cvref_t<decltype(::std::declval<::std::remove_cvref_t<Object> const&>().get_env())>>
)
auto operator()(Object&& object) const noexcept -> decltype(auto)
{
if constexpr (requires{ ::std::as_const(object).get_env(); })
::std::add_const_t<::std::remove_cvref_t<Object>>& obj{object};
if constexpr (requires{ obj.get_env(); })
{
static_assert(noexcept(::std::as_const(object).get_env()),
"get_env requires the xpression to be noexcept");
static_assert(::beman::execution26::detail::queryable<std::remove_cvref_t<decltype(::std::as_const(object).get_env())>>,
static_assert(noexcept(obj.get_env()),
"get_env requires the expression to be noexcept");
static_assert(::beman::execution26::detail::queryable<std::remove_cvref_t<decltype(obj.get_env())>>,
"get_env requires the result type to be destructible");
return ::std::as_const(object).get_env();
return obj.get_env();
}
else
{
Expand Down
23 changes: 23 additions & 0 deletions include/beman/execution26/detail/on_stop_request.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// include/beman/execution26/detail/on_stop_request.hpp -*-C++-*-
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception

#ifndef INCLUDED_BEMAN_EXECUTION26_DETAIL_ON_STOP_REQUEST
#define INCLUDED_BEMAN_EXECUTION26_DETAIL_ON_STOP_REQUEST

// ----------------------------------------------------------------------------

namespace beman::execution26::detail
{
template <typename St>
struct on_stop_request
{
St& st;
auto operator()() const -> void { this->st.request_stop(); }
};
template <typename T>
on_stop_request(T&) -> on_stop_request<T>;
}

// ----------------------------------------------------------------------------

#endif
32 changes: 22 additions & 10 deletions include/beman/execution26/detail/when_all.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <beman/execution26/detail/meta_transform.hpp>
#include <beman/execution26/detail/meta_prepend.hpp>
#include <beman/execution26/detail/meta_unique.hpp>
#include <beman/execution26/detail/on_stop_request.hpp>
#include <beman/execution26/detail/sender.hpp>
#include <beman/execution26/detail/sender_in.hpp>
#include <beman/execution26/detail/set_value.hpp>
Expand Down Expand Up @@ -111,11 +112,6 @@ namespace beman::execution26::detail

enum class disposition { started, error, stopped };

struct on_stop_request
{
::beman::execution26::inplace_stop_source& stop_src;
auto operator()() { this->stop_src.request_stop(); }
};
template <typename Receiver, typename... Sender>
struct state_type
{
Expand Down Expand Up @@ -148,7 +144,9 @@ namespace beman::execution26::detail
>;
using stop_callback = ::beman::execution26::stop_callback_for_t<
::beman::execution26::stop_token_of_t<
::beman::execution26::env_of_t<Receiver>>, on_stop_request
::beman::execution26::env_of_t<Receiver>
>,
::beman::execution26::detail::on_stop_request<state_type>
>;

void arrive(Receiver& receiver) noexcept {
Expand Down Expand Up @@ -187,6 +185,18 @@ namespace beman::execution26::detail
}
}

auto request_stop() -> void
{
if (1u == ++this->count)
--this->count;
else
{
this->stop_src.request_stop();
this->arrive(*this->receiver);
}
}

Receiver* receiver{};
::std::atomic<size_t> count{sizeof...(Sender)};
::beman::execution26::inplace_stop_source stop_src{};
::std::atomic<disposition> disp{disposition::started};
Expand All @@ -207,17 +217,18 @@ namespace beman::execution26::detail
};
static constexpr auto get_state{
[]<typename Sender, typename Receiver>(Sender&& sender, Receiver&)
noexcept(noexcept(std::forward<Sender>(sender).apply(make_state<Receiver>())))
noexcept(noexcept(std::forward<Sender>(sender).apply(make_state<Receiver>{})))
{
return std::forward<Sender>(sender).apply(make_state<Receiver>());
return std::forward<Sender>(sender).apply(make_state<Receiver>{});
}
};
static constexpr auto start{
[]<typename State, typename Receiver, typename... Ops>(
State& state, Receiver& receiver, Ops&... ops) noexcept -> void {
state.receiver = &receiver;
state.on_stop.emplace(
::beman::execution26::get_stop_token(::beman::execution26::get_env(receiver)),
on_stop_request{state.stop_src}
::beman::execution26::detail::on_stop_request{state}
);
if (state.stop_src.stop_requested()) {
state.on_stop.reset();
Expand All @@ -229,7 +240,8 @@ namespace beman::execution26::detail
};
static constexpr auto complete{
[]<typename Index, typename State, typename Receiver, typename Set, typename... Args>(
Index, State& state, Receiver& receiver, Set, Args&&... args) noexcept -> void {
Index, State& state, Receiver& receiver, Set, Args&&... args) noexcept -> void
{
if constexpr (::std::same_as<Set, ::beman::execution26::set_error_t>) {
if (disposition::error != state.disp.exchange(disposition::error)) {
state.stop_src.request_stop();
Expand Down
1 change: 1 addition & 0 deletions src/beman/execution26/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ target_sources(${TARGET_LIBRARY}
${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/never_stop_token.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/nostopstate.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/nothrow_callable.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/on_stop_request.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/operation_state.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/operation_state_task.hpp
${PROJECT_SOURCE_DIR}/include/beman/execution26/detail/product_type.hpp
Expand Down

0 comments on commit 8e01c7e

Please sign in to comment.