diff --git a/include/seastar/core/do_with.hh b/include/seastar/core/do_with.hh index c967659f8f8..35016d1632f 100644 --- a/include/seastar/core/do_with.hh +++ b/include/seastar/core/do_with.hh @@ -90,13 +90,13 @@ public: template inline auto do_with(T&& rvalue, F&& f) { - auto task = std::make_unique>>(std::forward(rvalue)); + auto task = new internal::do_with_state>(std::forward(rvalue)); auto fut = f(task->data()); if (fut.available()) { return fut; } auto ret = task->get_future(); - internal::set_callback(fut, std::move(task)); + internal::set_callback(fut, task); return ret; } @@ -148,13 +148,13 @@ do_with(T1&& rv1, T2&& rv2, T3_or_F&& rv3, More&&... more) { auto&& just_func = std::move(std::get(std::move(all))); using value_tuple = std::remove_reference_t; using ret_type = decltype(apply(just_func, just_values)); - auto task = std::make_unique>(std::move(just_values)); + auto task = new internal::do_with_state(std::move(just_values)); auto fut = apply(just_func, task->data()); if (fut.available()) { return fut; } auto ret = task->get_future(); - internal::set_callback(fut, std::move(task)); + internal::set_callback(fut, task); return ret; } diff --git a/include/seastar/core/future-util.hh b/include/seastar/core/future-util.hh index 021ed49a2d3..b10c1ea34f2 100644 --- a/include/seastar/core/future-util.hh +++ b/include/seastar/core/future-util.hh @@ -119,7 +119,7 @@ private: // Wait for one of the futures in _incomplete to complete, and then // decide what to do: wait for another one, or deliver _result if all // are complete. - void wait_for_one() { + void wait_for_one() noexcept { // Process from back to front, on the assumption that the front // futures are likely to complete earlier than the back futures. // If that's indeed the case, then the front futures will be @@ -135,7 +135,7 @@ private: // If there's an incompelete future, wait for it. if (!_incomplete.empty()) { - internal::set_callback(_incomplete.back(), std::unique_ptr>(this)); + internal::set_callback(_incomplete.back(), static_cast*>(this)); // This future's state will be collected in run_and_dispose(), so we can drop it. _incomplete.pop_back(); return; @@ -280,13 +280,14 @@ public: } future<> get_future() { return _promise.get_future(); } virtual void run_and_dispose() noexcept override { - std::unique_ptr zis{this}; if (_state.failed()) { _promise.set_exception(std::move(_state).get_exception()); + delete this; return; } else { if (std::get<0>(_state.get()) == stop_iteration::yes) { _promise.set_value(); + delete this; return; } _state = {}; @@ -295,20 +296,22 @@ public: do { auto f = futurator::apply(_action); if (!f.available()) { - internal::set_callback(f, std::move(zis)); + internal::set_callback(f, this); return; } if (f.get0() == stop_iteration::yes) { _promise.set_value(); + delete this; return; } } while (!need_preempt()); } catch (...) { _promise.set_exception(std::current_exception()); + delete this; return; } _state.set(stop_iteration::no); - schedule(std::move(zis)); + schedule(this); } }; @@ -328,7 +331,7 @@ public: template GCC6_CONCEPT( requires seastar::ApplyReturns || seastar::ApplyReturns> ) inline -future<> repeat(AsyncAction action) { +future<> repeat(AsyncAction action) noexcept { using futurator = futurize>; static_assert(std::is_same, typename futurator::type>::value, "bad AsyncAction signature"); try { @@ -339,9 +342,9 @@ future<> repeat(AsyncAction action) { if (!f.available()) { return [&] () noexcept { memory::disable_failure_guard dfg; - auto repeater = std::make_unique>(std::move(action)); + auto repeater = new internal::repeater(std::move(action)); auto ret = repeater->get_future(); - internal::set_callback(f, std::move(repeater)); + internal::set_callback(f, repeater); return ret; }(); } @@ -351,9 +354,9 @@ future<> repeat(AsyncAction action) { } } while (!need_preempt()); - auto repeater = std::make_unique>(stop_iteration::no, std::move(action)); + auto repeater = new internal::repeater(stop_iteration::no, std::move(action)); auto ret = repeater->get_future(); - schedule(std::move(repeater)); + schedule(repeater); return ret; } catch (...) { return make_exception_future(std::current_exception()); @@ -397,14 +400,15 @@ public: } future get_future() { return _promise.get_future(); } virtual void run_and_dispose() noexcept override { - std::unique_ptr zis{this}; if (this->_state.failed()) { _promise.set_exception(std::move(this->_state).get_exception()); + delete this; return; } else { auto v = std::get<0>(std::move(this->_state).get()); if (v) { _promise.set_value(std::move(*v)); + delete this; return; } this->_state = {}; @@ -413,21 +417,23 @@ public: do { auto f = futurator::apply(_action); if (!f.available()) { - internal::set_callback(f, std::move(zis)); + internal::set_callback(f, this); return; } auto ret = f.get0(); if (ret) { _promise.set_value(std::make_tuple(std::move(*ret))); + delete this; return; } } while (!need_preempt()); } catch (...) { _promise.set_exception(std::current_exception()); + delete this; return; } this->_state.set(compat::nullopt); - schedule(std::move(zis)); + schedule(this); } }; @@ -450,7 +456,7 @@ GCC6_CONCEPT( requires requires (AsyncAction aa) { futurize>::apply(aa).get0().value(); } ) repeat_until_value_return_type -repeat_until_value(AsyncAction action) { +repeat_until_value(AsyncAction action) noexcept { using futurator = futurize>; using type_helper = repeat_until_value_type_helper; // the "T" in the documentation @@ -462,9 +468,9 @@ repeat_until_value(AsyncAction action) { if (!f.available()) { return [&] () noexcept { memory::disable_failure_guard dfg; - auto state = std::make_unique>(std::move(action)); + auto state = new internal::repeat_until_value_state(std::move(action)); auto ret = state->get_future(); - internal::set_callback(f, std::move(state)); + internal::set_callback(f, state); return ret; }(); } @@ -480,9 +486,9 @@ repeat_until_value(AsyncAction action) { } while (!need_preempt()); try { - auto state = std::make_unique>(compat::nullopt, std::move(action)); + auto state = new internal::repeat_until_value_state(compat::nullopt, std::move(action)); auto f = state->get_future(); - schedule(std::move(state)); + schedule(state); return f; } catch (...) { return make_exception_future(std::current_exception()); @@ -500,10 +506,10 @@ public: explicit do_until_state(StopCondition stop, AsyncAction action) : _stop(std::move(stop)), _action(std::move(action)) {} future<> get_future() { return _promise.get_future(); } virtual void run_and_dispose() noexcept override { - std::unique_ptr zis{this}; if (_state.available()) { if (_state.failed()) { _promise.set_urgent_state(std::move(_state)); + delete this; return; } _state = {}; // allow next cycle to overrun state @@ -512,23 +518,26 @@ public: do { if (_stop()) { _promise.set_value(); + delete this; return; } auto f = _action(); if (!f.available()) { - internal::set_callback(f, std::move(zis)); + internal::set_callback(f, this); return; } if (f.failed()) { f.forward_to(std::move(_promise)); + delete this; return; } } while (!need_preempt()); } catch (...) { _promise.set_exception(std::current_exception()); + delete this; return; } - schedule(std::move(zis)); + schedule(this); } }; @@ -547,7 +556,7 @@ public: template GCC6_CONCEPT( requires seastar::ApplyReturns && seastar::ApplyReturns> ) inline -future<> do_until(StopCondition stop_cond, AsyncAction action) { +future<> do_until(StopCondition stop_cond, AsyncAction action) noexcept { using namespace internal; using futurator = futurize; do { @@ -558,9 +567,9 @@ future<> do_until(StopCondition stop_cond, AsyncAction action) { if (!f.available()) { return [&] () noexcept { memory::disable_failure_guard dfg; - auto task = std::make_unique>(std::move(stop_cond), std::move(action)); + auto task = new do_until_state(std::move(stop_cond), std::move(action)); auto ret = task->get_future(); - internal::set_callback(f, std::move(task)); + internal::set_callback(f, task); return ret; }(); } @@ -569,9 +578,9 @@ future<> do_until(StopCondition stop_cond, AsyncAction action) { } } while (!need_preempt()); - auto task = std::make_unique>(std::move(stop_cond), std::move(action)); + auto task = new do_until_state(std::move(stop_cond), std::move(action)); auto f = task->get_future(); - schedule(std::move(task)); + schedule(task); return f; } @@ -737,7 +746,7 @@ public: return true; } else { auto c = new (continuation) when_all_state_component(wasb, f); - set_callback(*f, std::unique_ptr(c)); + set_callback(*f, c); return false; } } diff --git a/include/seastar/core/future.hh b/include/seastar/core/future.hh index d5f0d753f26..76eeded31fc 100644 --- a/include/seastar/core/future.hh +++ b/include/seastar/core/future.hh @@ -495,7 +495,7 @@ template future make_exception_future(future_state_base&& state) noexcept; template -void set_callback(future& fut, std::unique_ptr callback); +void set_callback(future& fut, U* callback) noexcept; class future_base; @@ -509,7 +509,7 @@ protected: // details. future_state_base* _state; - std::unique_ptr _task; + task* _task = nullptr; promise_base(const promise_base&) = delete; promise_base(future_state_base* state) noexcept : _state(state) {} @@ -607,19 +607,19 @@ public: #if SEASTAR_COROUTINES_TS void set_coroutine(future_state& state, task& coroutine) noexcept { _state = &state; - _task = std::unique_ptr(&coroutine); + _task = &coroutine; } #endif private: template - void schedule(Func&& func) { - auto tws = std::make_unique>(std::move(func)); + void schedule(Func&& func) noexcept { + auto tws = new continuation(std::move(func)); _state = &tws->_state; - _task = std::move(tws); + _task = tws; } - void schedule(std::unique_ptr> callback) { + void schedule(continuation_base* callback) noexcept { _state = &callback->_state; - _task = std::move(callback); + _task = callback; } template @@ -968,12 +968,12 @@ private: return static_cast*>(future_base::detach_promise()); } template - void schedule(Func&& func) { + void schedule(Func&& func) noexcept { if (_state.available() || !_promise) { if (__builtin_expect(!_state.available() && !_promise, false)) { _state.set_to_broken_promise(); } - ::seastar::schedule(std::make_unique>(std::move(func), std::move(_state))); + ::seastar::schedule(new continuation(std::move(func), std::move(_state))); } else { assert(_promise); detach_promise()->schedule(std::move(func)); @@ -1095,7 +1095,7 @@ private: auto thread = thread_impl::get(); assert(thread); thread_wake_task wake_task{thread, this}; - detach_promise()->schedule(std::unique_ptr>(&wake_task)); + detach_promise()->schedule(static_cast*>(&wake_task)); thread_impl::switch_out(thread); } @@ -1443,13 +1443,13 @@ public: } #endif private: - void set_callback(std::unique_ptr> callback) { + void set_callback(continuation_base* callback) noexcept { if (_state.available()) { callback->set_state(get_available_state_ref()); - ::seastar::schedule(std::move(callback)); + ::seastar::schedule(callback); } else { assert(_promise); - detach_promise()->schedule(std::move(callback)); + detach_promise()->schedule(callback); } } @@ -1470,7 +1470,7 @@ private: template friend future internal::make_exception_future(future_state_base&& state) noexcept; template - friend void internal::set_callback(future&, std::unique_ptr); + friend void internal::set_callback(future&, V*) noexcept; /// \endcond }; @@ -1717,10 +1717,10 @@ namespace internal { template inline -void set_callback(future& fut, std::unique_ptr callback) { +void set_callback(future& fut, U* callback) noexcept { // It would be better to use continuation_base for U, but // then a derived class of continuation_base won't be matched - return fut.set_callback(std::move(callback)); + return fut.set_callback(callback); } } diff --git a/include/seastar/core/reactor.hh b/include/seastar/core/reactor.hh index 70688700cd4..1c975cfcea0 100644 --- a/include/seastar/core/reactor.hh +++ b/include/seastar/core/reactor.hh @@ -188,7 +188,7 @@ public: std::unique_ptr _pollfn; class registration_task; class deregistration_task; - registration_task* _registration_task; + registration_task* _registration_task = nullptr; public: template // signature: bool () static poller simple(Func&& poll) { @@ -201,7 +201,7 @@ public: ~poller(); poller(poller&& x); poller& operator=(poller&& x); - void do_register(); + void do_register() noexcept; friend class reactor; }; enum class idle_cpu_handler_result { @@ -305,7 +305,7 @@ private: uint8_t _id; sched_clock::duration _runtime = {}; uint64_t _tasks_processed = 0; - circular_buffer> _q; + circular_buffer _q; sstring _name; /** * This array holds pointers to the scheduling group specific @@ -564,10 +564,10 @@ public: } #ifdef SEASTAR_SHUFFLE_TASK_QUEUE - void shuffle(std::unique_ptr&, task_queue&); + void shuffle(task*&, task_queue&); #endif - void add_task(std::unique_ptr&& t) { + void add_task(task* t) noexcept { auto sg = t->group(); auto* q = _task_queues[sg._id].get(); bool was_empty = q->_q.empty(); @@ -579,7 +579,7 @@ public: activate(*q); } } - void add_urgent_task(std::unique_ptr&& t) { + void add_urgent_task(task* t) noexcept { auto sg = t->group(); auto* q = _task_queues[sg._id].get(); bool was_empty = q->_q.empty(); @@ -605,7 +605,7 @@ public: } void force_poll(); - void add_high_priority_task(std::unique_ptr&&); + void add_high_priority_task(task*) noexcept; network_stack& net() { return *_network_stack; } shard_id cpu_id() const { return _id; } diff --git a/include/seastar/core/task.hh b/include/seastar/core/task.hh index 379562cce55..efb85698262 100644 --- a/include/seastar/core/task.hh +++ b/include/seastar/core/task.hh @@ -35,8 +35,8 @@ public: scheduling_group group() const { return _sg; } }; -void schedule(std::unique_ptr&& t) noexcept; -void schedule_urgent(std::unique_ptr&& t) noexcept; +void schedule(task* t) noexcept; +void schedule_urgent(task* t) noexcept; template class lambda_task final : public task { @@ -52,16 +52,16 @@ public: template inline -std::unique_ptr -make_task(Func&& func) { - return std::make_unique>(current_scheduling_group(), std::forward(func)); +task* +make_task(Func&& func) noexcept { + return new lambda_task(current_scheduling_group(), std::forward(func)); } template inline -std::unique_ptr -make_task(scheduling_group sg, Func&& func) { - return std::make_unique>(sg, std::forward(func)); +task* +make_task(scheduling_group sg, Func&& func) noexcept { + return new lambda_task(sg, std::forward(func)); } } diff --git a/src/core/reactor.cc b/src/core/reactor.cc index a28bb5a0692..9ee998e4fb1 100644 --- a/src/core/reactor.cc +++ b/src/core/reactor.cc @@ -2099,12 +2099,11 @@ void reactor::run_tasks(task_queue& tq) { *internal::current_scheduling_group_ptr() = scheduling_group(tq._id); auto& tasks = tq._q; while (!tasks.empty()) { - auto tsk = std::move(tasks.front()); + auto tsk = tasks.front(); tasks.pop_front(); STAP_PROBE(seastar, reactor_run_tasks_single_start); task_histogram_add_task(*tsk); tsk->run_and_dispose(); - tsk.release(); STAP_PROBE(seastar, reactor_run_tasks_single_end); ++tq._tasks_processed; ++_global_tasks_processed; @@ -2123,7 +2122,7 @@ void reactor::run_tasks(task_queue& tq) { } #ifdef SEASTAR_SHUFFLE_TASK_QUEUE -void reactor::shuffle(std::unique_ptr& t, task_queue& q) { +void reactor::shuffle(task*& t, task_queue& q) { static thread_local std::mt19937 gen = std::mt19937(std::default_random_engine()()); std::uniform_int_distribution tasks_dist{0, q._q.size() - 1}; auto& to_swap = q._q[tasks_dist(gen)]; @@ -2835,7 +2834,7 @@ void reactor::replace_poller(pollfn* old, pollfn* neww) { } reactor::poller::poller(poller&& x) - : _pollfn(std::move(x._pollfn)), _registration_task(x._registration_task) { + : _pollfn(std::move(x._pollfn)), _registration_task(std::exchange(x._registration_task, nullptr)) { if (_pollfn && _registration_task) { _registration_task->moved(this); } @@ -2851,15 +2850,14 @@ reactor::poller::operator=(poller&& x) { } void -reactor::poller::do_register() { +reactor::poller::do_register() noexcept { // We can't just insert a poller into reactor::_pollers, because we // may be running inside a poller ourselves, and so in the middle of // iterating reactor::_pollers itself. So we schedule a task to add // the poller instead. - auto task = std::make_unique(this); - auto tmp = task.get(); - engine().add_task(std::move(task)); - _registration_task = tmp; + auto task = new registration_task(this); + engine().add_task(task); + _registration_task = task; } reactor::poller::~poller() { @@ -2878,8 +2876,8 @@ reactor::poller::~poller() { } else { auto dummy = make_pollfn([] { return false; }); auto dummy_p = dummy.get(); - auto task = std::make_unique(std::move(dummy)); - engine().add_task(std::move(task)); + auto task = new deregistration_task(std::move(dummy)); + engine().add_task(task); engine().replace_poller(_pollfn.get(), dummy_p); } } @@ -3129,12 +3127,12 @@ future readable_eventfd::wait() { }); } -void schedule(std::unique_ptr&& t) noexcept { - engine().add_task(std::move(t)); +void schedule(task* t) noexcept { + engine().add_task(t); } -void schedule_urgent(std::unique_ptr&& t) noexcept { - engine().add_urgent_task(std::move(t)); +void schedule_urgent(task* t) noexcept { + engine().add_urgent_task(t); } } @@ -3928,7 +3926,7 @@ void report_failed_future(const future_state_base& state) noexcept { broken_promise::broken_promise() : logic_error("broken promise") { } promise_base::promise_base(promise_base&& x) noexcept - : _future(x._future), _state(x._state), _task(std::move(x._task)) { + : _future(x._future), _state(x._state), _task(std::exchange(x._task, nullptr)) { x._state = nullptr; if (auto* fut = _future) { fut->detach_promise(); @@ -3944,7 +3942,7 @@ promise_base::~promise_base() noexcept { } else if (__builtin_expect(bool(_task), false)) { assert(_state && !_state->available()); _state->set_to_broken_promise(); - ::seastar::schedule(std::move(_task)); + ::seastar::schedule(std::exchange(_task, nullptr)); } } @@ -3953,9 +3951,9 @@ void promise_base::make_ready() noexcept { if (_task) { _state = nullptr; if (Urgent == urgent::yes && !need_preempt()) { - ::seastar::schedule_urgent(std::move(_task)); + ::seastar::schedule_urgent(std::exchange(_task, nullptr)); } else { - ::seastar::schedule(std::move(_task)); + ::seastar::schedule(std::exchange(_task, nullptr)); } } } @@ -4156,8 +4154,8 @@ future connect(socket_address sa, socket_address local, transp return engine().connect(sa, local, proto); } -void reactor::add_high_priority_task(std::unique_ptr&& t) { - add_urgent_task(std::move(t)); +void reactor::add_high_priority_task(task* t) noexcept { + add_urgent_task(t); // break .then() chains request_preemption(); } diff --git a/src/core/thread.cc b/src/core/thread.cc index da961404fa6..63761f1858f 100644 --- a/src/core/thread.cc +++ b/src/core/thread.cc @@ -240,13 +240,13 @@ thread_context::run_and_dispose() noexcept { void thread_context::yield() { - schedule(std::unique_ptr(this)); + schedule(this); switch_out(); } void thread_context::reschedule() { - schedule(std::unique_ptr(this)); + schedule(this); } void