Skip to content

Commit

Permalink
task: resumable tasks
Browse files Browse the repository at this point in the history
Currently, a task object is single use. After calling task::run(), the
reactor deletes the task, so it cannot be re-entered.

This patch lays the groundwork to re-entering tasks that were already
run once by moving the deletion of the task from the reactor to task::run()
itself.  Normally, a task will delete itself after running, but it may
also choose not to do so, and it can then schedule itself again.

To avoid confusion, task::run() is renamed to task::run_and_dispose().

Good candidates for resumable tasks are looping primitives like repeat()
and C++ coroutines.
  • Loading branch information
avikivity committed Dec 7, 2017
1 parent 125831e commit 4d811de
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 7 deletions.
3 changes: 2 additions & 1 deletion core/future.hh
Original file line number Diff line number Diff line change
Expand Up @@ -392,8 +392,9 @@ template <typename Func, typename... T>
struct continuation final : task {
continuation(Func&& func, future_state<T...>&& state) : _state(std::move(state)), _func(std::move(func)) {}
continuation(Func&& func) : _func(std::move(func)) {}
virtual void run() noexcept override {
virtual void run_and_dispose() noexcept override {
_func(std::move(_state));
delete this;
}
future_state<T...> _state;
Func _func;
Expand Down
10 changes: 6 additions & 4 deletions core/reactor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2428,8 +2428,8 @@ void reactor::run_tasks(task_queue& tq) {
auto tsk = std::move(tasks.front());
tasks.pop_front();
STAP_PROBE(seastar, reactor_run_tasks_single_start);
tsk->run();
tsk.reset();
tsk->run_and_dispose();
tsk.release();
STAP_PROBE(seastar, reactor_run_tasks_single_end);
++tq._tasks_processed;
// check at end of loop, to allow at least one task to run
Expand Down Expand Up @@ -3102,11 +3102,12 @@ class reactor::poller::registration_task : public task {
poller* _p;
public:
explicit registration_task(poller* p) : _p(p) {}
virtual void run() noexcept override {
virtual void run_and_dispose() noexcept override {
if (_p) {
engine().register_poller(_p->_pollfn.get());
_p->_registration_task = nullptr;
}
delete this;
}
void cancel() {
_p = nullptr;
Expand All @@ -3121,8 +3122,9 @@ class reactor::poller::deregistration_task : public task {
std::unique_ptr<pollfn> _p;
public:
explicit deregistration_task(std::unique_ptr<pollfn>&& p) : _p(std::move(p)) {}
virtual void run() noexcept override {
virtual void run_and_dispose() noexcept override {
engine().unregister_poller(_p.get());
delete this;
}
};

Expand Down
7 changes: 5 additions & 2 deletions core/task.hh
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class task {
public:
explicit task(scheduling_group sg = current_scheduling_group()) : _sg(sg) {}
virtual ~task() noexcept {}
virtual void run() noexcept = 0;
virtual void run_and_dispose() noexcept = 0;
scheduling_group group() const { return _sg; }
};

Expand All @@ -44,7 +44,10 @@ class lambda_task final : public task {
public:
lambda_task(scheduling_group sg, const Func& func) : task(sg), _func(func) {}
lambda_task(scheduling_group sg, Func&& func) : task(sg), _func(std::move(func)) {}
virtual void run() noexcept override { _func(); }
virtual void run_and_dispose() noexcept override {
_func();
delete this;
}
};

template <typename Func>
Expand Down

0 comments on commit 4d811de

Please sign in to comment.