diff --git a/.clang-format b/.clang-format
index c01db5e..b9f94b8 100644
--- a/.clang-format
+++ b/.clang-format
@@ -6,3 +6,4 @@ AllowShortIfStatementsOnASingleLine: false
 AllowShortLoopsOnASingleLine: false
 SortIncludes: false
 IndentPPDirectives: BeforeHash
+RemoveBracesLLVM: true
diff --git a/include/kelcoro/executor_interface.hpp b/include/kelcoro/executor_interface.hpp
index ce5b95b..afae838 100644
--- a/include/kelcoro/executor_interface.hpp
+++ b/include/kelcoro/executor_interface.hpp
@@ -27,6 +27,14 @@ struct task_node {
   task_node* next = nullptr;
   std::coroutine_handle<> task = nullptr;
   schedule_errc status = schedule_errc::ok;
+
+  static task_node deadpill() {
+    return task_node{.next = nullptr, .task = nullptr};
+  }
+
+  bool is_deadpill() const noexcept {
+    return task == nullptr;
+  }
 };
 
 template <typename T>
diff --git a/include/kelcoro/noexport/fixed_array.hpp b/include/kelcoro/noexport/fixed_array.hpp
new file mode 100644
index 0000000..5641054
--- /dev/null
+++ b/include/kelcoro/noexport/fixed_array.hpp
@@ -0,0 +1,91 @@
+#pragma once
+
+#include <cassert>
+#include <cstddef>
+#include <memory>
+#include <memory_resource>
+#include <type_traits>
+
+namespace dd::noexport {
+
+template <typename T>
+struct fixed_array {
+ private:
+  T* arr = nullptr;
+  size_t n = 0;
+  std::pmr::memory_resource* resource = std::pmr::new_delete_resource();
+
+ public:
+  static_assert(std::is_same_v<T, std::decay_t<T>>);
+  fixed_array() = default;
+  fixed_array(fixed_array&& rhs) noexcept {
+    *this = std::move(rhs);
+  }
+  fixed_array& operator=(fixed_array&& rhs) noexcept {
+    std::swap(arr, rhs.arr);
+    std::swap(n, rhs.n);
+    std::swap(resource, rhs.resource);
+    return *this;
+  }
+  fixed_array(size_t n, std::pmr::memory_resource& resource = *std::pmr::new_delete_resource())
+      : n(n), resource(&resource) {
+    if (n == 0)
+      return;
+    arr = (T*)resource.allocate(sizeof(T) * n, alignof(T));
+    std::uninitialized_default_construct_n(arr, n);
+  }
+
+  std::pmr::memory_resource* get_resource() const {
+    assert(resource);
+    return resource;
+  }
+
+  T* data() {
+    return arr;
+  }
+
+  const T* data() const {
+    return arr;
+  }
+
+  size_t size() const noexcept {
+    return n;
+  }
+
+  T* begin() noexcept {
+    return arr;
+  }
+
+  T* end() noexcept {
+    return arr + n;
+  }
+
+  const T* begin() const noexcept {
+    return arr;
+  }
+
+  const T* end() const noexcept {
+    return arr + n;
+  }
+
+  T& operator[](size_t i) noexcept {
+    assert(arr);
+    assert(i < n);
+    return arr[i];
+  }
+
+  const T& operator[](size_t i) const noexcept {
+    assert(arr);
+    assert(i < n);
+    return arr[i];
+  }
+
+  ~fixed_array() {
+    if (arr) {
+      std::destroy_n(arr, n);
+      // assume deallocate is noexcept
+      resource->deallocate(arr, sizeof(T) * n, alignof(T));
+    }
+  }
+};
+
+}  // namespace dd::noexport
diff --git a/include/kelcoro/operation_hash.hpp b/include/kelcoro/operation_hash.hpp
index bb7afe5..e71bc98 100644
--- a/include/kelcoro/operation_hash.hpp
+++ b/include/kelcoro/operation_hash.hpp
@@ -1,6 +1,7 @@
 #pragma once
 
 #include <coroutine>
+#include <cstddef>
 
 #include "noexport/macro.hpp"
diff --git a/include/kelcoro/thread_pool.hpp b/include/kelcoro/thread_pool.hpp
index e09615a..7c39302 100644
--- a/include/kelcoro/thread_pool.hpp
+++ b/include/kelcoro/thread_pool.hpp
@@ -1,29 +1,32 @@
 #pragma once
 
+#include <functional>
 #include <thread>
 #include <memory_resource>
 #include <condition_variable>
 
 #include "job.hpp"
 #include "executor_interface.hpp"
+#include "kelcoro/common.hpp"
+#include "memory_support.hpp"
+#include "noexport/fixed_array.hpp"
+#include "noexport/macro.hpp"
 #include "noexport/thread_pool_monitoring.hpp"
-#include "common.hpp"
 
 namespace dd::noexport {
 
-static auto cancel_tasks(task_node* top) noexcept {
+static void cancel_tasks(task_node* top) noexcept {
   // set status and invoke tasks once,
-  // assume after getting 'cancelled' status task do not produce new tasks
-  KELCORO_MONITORING(size_t count = 0;)
+  // assume after getting 'cancelled' status a task does not produce new tasks
   while (top) {
+    // if this task is the final one, its work must be completed here:
+    // after resume, 'top' is deleted
     std::coroutine_handle<> task = top->task;
-    assert(task && "dead pill must be consumed by worker");
    top->status = schedule_errc::cancelled;
    top = top->next;
-    task.resume();
-    KELCORO_MONITORING(++count);
+    if (task)
+      task.resume();
  }
-  KELCORO_MONITORING(return count);
 }
 
 }  // namespace dd::noexport
@@ -46,12 +49,19 @@ struct task_queue {
   // precondition: node != nullptr && node is not contained in queue
   void push(task_node* node) {
     node->next = nullptr;
+    push_list(node, node);
+  }
+
+  // attaches a whole linked list
+  void push_list(task_node* first_, task_node* last_) {
+    assert(first_ && last_);
     std::lock_guard l{mtx};
     if (first) {
-      last->next = node;
-      last = node;
+      last->next = first_;
+      last = last_;
     } else {
-      first = last = node;
+      first = first_;
+      last = last_;
       // notify under lock, because of loop in 'pop_all_not_empty'
       // while (!first) |..missed notify..| wait
       // this may block worker thread forever if no one notify after that
@@ -59,6 +69,15 @@ struct task_queue {
     }
   }
 
+  void push_list(task_node* first_) {
+    if (!first_)
+      return;
+    auto last_ = first_;
+    while (last_->next)
+      last_ = last_->next;
+    push_list(first_, last_);
+  }
+
   [[nodiscard]] task_node* pop_all() {
     task_node* tasks;
     {
@@ -81,8 +100,48 @@ struct task_queue {
     assert(!!nodes);
     return nodes;
   }
+
+  // precondition: node && node->task
+  // executor interface
+  void attach(task_node* node) noexcept {
+    assert(node && node->task);
+    push(node);
+  }
 };
 
+static_assert(executor<task_queue>);
+
+namespace noexport {
+
+// we push the deadpill and, right behind it, ourselves as one chain,
+// so that the deadpill is guaranteed to be alive when the worker sees it
+struct deadpill_pusher {
+  task_queue& queue;
+
+  // put the deadpill on this awaiter and push them together
+  task_node pill = task_node::deadpill();
+  task_node this_node;
+
+  static bool await_ready() noexcept {
+    return false;
+  }
+  void await_suspend(std::coroutine_handle<> handle) {
+    pill.next = &this_node;
+    this_node.task = handle;
+    // avoids a race between tasks
+    queue.push_list(&pill, &this_node);
+  }
+  void await_resume() noexcept {
+    assert(this_node.status == schedule_errc::cancelled);
+  }
+};
+
+inline dd::job push_deadpill(task_queue& queue) {
+  co_await deadpill_pusher{queue};
+}
+
+}  // namespace noexport
+
 // schedules execution of 'foo' to executor 'e'
 [[maybe_unused]] job schedule_to(auto& e KELCORO_LIFETIMEBOUND, auto foo) {
   if (!co_await jump_on(e)) [[unlikely]]
@@ -96,43 +155,55 @@ template
     co_return;
   foo();
 }
+
+void default_worker_job(task_queue& queue) noexcept;
+
 // executes tasks on one thread, not movable
 // expensive to create
 struct worker {
  private:
   task_queue queue;
-  KELCORO_MONITORING(monitoring_t mon);
   std::thread thread;
 
   friend struct strand;
   friend struct thread_pool;
-  static void worker_job(worker* w) noexcept;
 
  public:
-  worker() : queue(), thread(&worker_job, this) {
+  /*
+    The function must execute tasks from 'queue' until it gets a `task_node::deadpill`
+
+    and must cancel all pending tasks after that.
+
+    Cancelling a task == resuming it with status `schedule_errc::cancelled`.
+
+    Example impl: `default_worker_job`.
+  */
+  using job_t = void (*)(task_queue&);
+
+  worker(job_t job = default_worker_job) : queue(), thread(job, std::ref(queue)) {
   }
   worker(worker&&) = delete;
   void operator=(worker&&) = delete;
 
   ~worker() {
-    if (!thread.joinable())
+    if (!thread.joinable()) {
+      noexport::cancel_tasks(queue.pop_all());
       return;
-    task_node pill{.next = nullptr, .task = nullptr};
+    }
+    auto pill = task_node::deadpill();
     queue.push(&pill);
     thread.join();
     noexport::cancel_tasks(queue.pop_all());
   }
 
   // use co_await jump_on(worker) to schedule coroutine
-  // precondition: node && node->task && node.status == ok
+  // precondition: node && node->task
+  // executor interface
   void attach(task_node* node) noexcept {
-    assert(node && node->task && node->status == schedule_errc::ok);
+    assert(node && node->task);
     queue.push(node);
-    KELCORO_MONITORING_INC(mon.pushed);
   }
 
-  KELCORO_MONITORING(const monitoring_t& get_moniroting() const noexcept { return mon; })
-
   std::thread::id get_id() const noexcept {
     return thread.get_id();
   }
@@ -152,19 +223,20 @@ struct worker {
 struct strand {
  private:
   // invariant: != nullptr, ptr for trivial copy/move
-  worker* w = nullptr;
+  task_queue* q = nullptr;
 
  public:
-  explicit strand(worker& wo KELCORO_LIFETIMEBOUND) : w(&wo) {
-    KELCORO_MONITORING_INC(w->mon.strands_count);
+  explicit strand(worker& wo KELCORO_LIFETIMEBOUND) : q(&wo.queue) {
+  }
+  explicit strand(task_queue& qu KELCORO_LIFETIMEBOUND) : q(&qu) {
   }
   // use co_await jump_on(strand) to schedule coroutine
   void attach(task_node* node) const noexcept {
-    w->attach(node);
+    q->attach(node);
   }
-  worker& get_worker() const noexcept {
-    return *w;
+  task_queue& get_queue() const noexcept {
+    return *q;
   }
 };
@@ -173,71 +245,109 @@ struct strand {
 // note: when thread pool dies, all pending tasks invoked with schedule_errc::cancelled
 struct thread_pool {
  private:
-  worker* workers;                      // invariant: != 0
-  size_t workers_size;                  // invariant: > 0
-  std::pmr::memory_resource* resource;  // invariant: != 0
-
+  noexport::fixed_array<task_queue> queues;    // invariant: queues.size() == threads.size()
+  noexport::fixed_array<std::thread> threads;  // invariant: size > 0
 public:
   static size_t default_thread_count() {
     unsigned c = std::thread::hardware_concurrency();
     return c < 2 ? 1 : c - 1;
   }
 
-  explicit thread_pool(size_t thread_count = default_thread_count(),
-                       std::pmr::memory_resource* r = std::pmr::get_default_resource());
+  explicit thread_pool(size_t thread_count = default_thread_count(), worker::job_t job = default_worker_job,
+                       std::pmr::memory_resource& r = *std::pmr::get_default_resource())
+      : queues(std::max<size_t>(1, thread_count), r), threads(std::max<size_t>(1, thread_count), r) {
+    bool failed = true;
+
+    scope_exit _{[&]() {
+      if (!failed)
+        return;
+      task_node pill = task_node::deadpill();
+      for (size_t i = 0; i < threads.size(); i++) {
+        if (!threads[i].joinable())
+          break;
+        queues[i].push(&pill);
+        threads[i].join();
+      }
+    }};
+    for (size_t i = 0; i < threads.size(); i++)
+      threads[i] = std::thread(job, std::ref(queues[i]));
+    failed = false;
+  }
 
   ~thread_pool() {
-    // if destructor started, then it is undefined behavior to push tasks
-    // because its data race (between destruction and accessing to 'this' for scheduling new task)
-    //
-    // But there is special case - workers, which may invoke task, which schedules next task
-    // in this case, .stop waits for workers consume dead pill, grab all operations from all queues
-    // and cancel them (resume with special errc)
-    // So, no one pushes and all what was pushed by tasks executed on workers is cancelled,
-    // no memory leak, profit!
-    stop(workers, workers_size);
+    // the pool can't just hold 'worker's and reset them here: the queue lives inside
+    // the worker, so while the stop was in progress someone could push into a dead queue
+    task_node pill = task_node::deadpill();
+    for (size_t i = 0; i < threads.size(); i++) {
+      if (threads[i].joinable()) {
+        queues[i].push(&pill);
+        threads[i].join();
+      }
+    }
+    for (auto& q : queues)
+      noexport::cancel_tasks(q.pop_all());
   }
 
   thread_pool(thread_pool&&) = delete;
   void operator=(thread_pool&&) = delete;
 
   // use co_await jump_on(pool) to schedule coroutine
-
   void attach(task_node* node) noexcept {
-    worker& w = select_worker(calculate_operation_hash(node->task));
-    w.attach(node);
+    task_queue& q = select_queue(calculate_operation_hash(node->task));
+    q.attach(node);
+  }
+
+  [[nodiscard]] bool is_worker(std::thread::id id = std::this_thread::get_id()) {
+    for (auto& t : threads)
+      if (t.get_id() == id)
+        return true;
+    return false;
   }
 
   // same as schedule_to(pool), but uses pool memory resource to allocate tasks
   void schedule(std::invocable auto&& foo, operation_hash_t hash) {
-    worker& w = select_worker(hash);
-    schedule_to(w, std::forward<decltype(foo)>(foo), with_resource{*resource});
+    task_queue& w = select_queue(hash);
+    schedule_to(w, std::forward<decltype(foo)>(foo), with_resource{*queues.get_resource()});
   }
   void schedule(std::invocable auto&& foo) {
     schedule(std::forward<decltype(foo)>(foo), calculate_operation_hash(foo));
   }
 
-  KELCORO_PURE std::span<worker> workers_range() noexcept KELCORO_LIFETIMEBOUND {
-    return std::span(workers, workers_size);
+  KELCORO_PURE std::span<std::thread> workers_range() noexcept KELCORO_LIFETIMEBOUND {
+    return std::span(threads.data(), threads.size());
   }
-  KELCORO_PURE std::span<const worker> workers_range() const noexcept KELCORO_LIFETIMEBOUND {
-    return std::span(workers, workers_size);
+  KELCORO_PURE std::span<const std::thread> workers_range() const noexcept KELCORO_LIFETIMEBOUND {
+    return std::span(threads.data(), threads.size());
   }
 
   strand get_strand(operation_hash_t op_hash) KELCORO_LIFETIMEBOUND {
-    return strand(select_worker(op_hash));
+    return strand(select_queue(op_hash));
   }
 
-  worker& select_worker(operation_hash_t op_hash) noexcept KELCORO_LIFETIMEBOUND {
-    return workers[op_hash % workers_size];
+  task_queue& select_queue(operation_hash_t op_hash) noexcept KELCORO_LIFETIMEBOUND {
+    return queues[op_hash % queues.size()];
   }
 
   std::pmr::memory_resource& get_resource() const noexcept {
-    assert(resource);
-    return *resource;
+    return *queues.get_resource();
   }
 
- private:
-  // should be called exactly once
-  void stop(worker* w, size_t count) noexcept;
+  // can be called as many times as you want and from any thread
+  void request_stop() {
+    for (auto& q : queues)
+      noexport::push_deadpill(q);
+  }
+
+  // NOTE: can't be called from workers
+  // NOTE: can't be called more than once
+  // waits for the work to complete (call after `request_stop`)
+  void wait_stop() && {
+    assert(!is_worker());
+    for (auto& t : threads)
+      if (t.joinable())
+        t.join();
+    for (auto& q : queues)
+      noexport::cancel_tasks(q.pop_all());
+  }
 };
 
 // specialization for thread pool uses hash to maximize parallel execution
@@ -245,7 +355,7 @@ inline void attach_list(thread_pool& e, task_node* top) {
   operation_hash_t hash = noexport::do_hash(top);
   while (top) {
     task_node* next = top->next;
-    e.select_worker(hash).attach(top);
+    e.select_queue(hash).attach(top);
     ++hash;
     top = next;
   }
@@ -264,23 +374,20 @@ struct jump_on_thread_pool : private create_node_and_attach {
   void await_suspend(std::coroutine_handle<> handle) noexcept {
     // set task before it is attached
     task = handle;
-    worker& w = e.select_worker(calculate_operation_hash(task));
-    w.attach(this);
+    task_queue& q = e.select_queue(calculate_operation_hash(task));
+    q.attach(this);
   }
 };
 
-KELCORO_CO_AWAIT_REQUIRED inline co_awaiter auto jump_on(thread_pool& tp KELCORO_LIFETIMEBOUND) noexcept {
+KELCORO_CO_AWAIT_REQUIRED inline jump_on_thread_pool jump_on(thread_pool& tp KELCORO_LIFETIMEBOUND) noexcept {
   return jump_on_thread_pool(tp);
 }
 
-inline void worker::worker_job(worker* w) noexcept {
-  assert(w);
+inline void default_worker_job(task_queue& queue) noexcept {
   task_node* top;
   std::coroutine_handle<> task;
-  task_queue* queue = &w->queue;
   for (;;) {
-    top = queue->pop_all_not_empty();
-    KELCORO_MONITORING_INC(w->mon.pop_count);
+    top = queue.pop_all_not_empty();
     assert(top);
     do {
       // grab task from memory which will be invalidated after task.resume()
@@ -291,52 +398,11 @@ inline void worker::worker_job(worker* w) noexcept {
       task = top->task;
       top = top->next;
       if (!task)
         goto work_end;  // dead pill
       // if exception thrown, std::terminate called
       task.resume();
-      KELCORO_MONITORING_INC(w->mon.finished);
     } while (top);
   }
 work_end:
   // after this point .stop in thread pool cancels all pending tasks in queues for all workers
-  KELCORO_MONITORING(w->mon.cancelled +=) noexport::cancel_tasks(top);
-}
-
-inline thread_pool::thread_pool(size_t thread_count, std::pmr::memory_resource* r) {
-  resource = r ? r : std::pmr::new_delete_resource();
-  workers_size = std::max<size_t>(1, thread_count);
-  workers = (worker*)r->allocate(sizeof(worker) * workers_size, alignof(worker));
-
-  bool exception = true;
-  size_t i = 0;
-  scope_exit _([&] {
-    if (exception)
-      stop(workers, i);
-  });
-  for (; i < workers_size; ++i) {
-    worker* ptr = &workers[i];
-    new (ptr) worker;
-  }
-  exception = false;
-}
-
-inline void thread_pool::stop(worker* w, size_t count) noexcept {
-  task_node pill{.next = nullptr, .task = nullptr};
-  std::span workers(w, count);
-  for (auto& w : workers)
-    w.queue.push(&pill);
-  for (auto& w : workers) {
-    assert(w.thread.joinable());
-    w.thread.join();
-  }
-  // here all workers stopped, cancel tasks
-  for (auto& w : workers) {
-    KELCORO_MONITORING(w.mon.cancelled +=) noexport::cancel_tasks(w.queue.pop_all());
-  }
-#ifdef KELCORO_ENABLE_THREADPOOL_MONITORING
-  monitorings.clear();
-  for (auto& w : workers)
-    monitorings.emplace_back(w.get_moniroting());
-#endif
-  std::destroy(begin(workers), end(workers));
-  resource->deallocate(this->workers, sizeof(worker) * workers_size, alignof(worker));
+  noexport::cancel_tasks(top);
 }
 
 }  // namespace dd
diff --git a/tests/test_thread_pool.cpp b/tests/test_thread_pool.cpp
index 319edaf..0161974 100644
--- a/tests/test_thread_pool.cpp
+++ b/tests/test_thread_pool.cpp
@@ -1,10 +1,13 @@
 #include "kelcoro/async_task.hpp"
+#include "kelcoro/common.hpp"
 #include "kelcoro/task.hpp"
 #include "kelcoro/thread_pool.hpp"
 #include "kelcoro/latch.hpp"
 
+#include <atomic>
+#include <cstdlib>
 #include <vector>
-#include <thread>
+#include <random>
 #include <span>
 
 static_assert(dd::executor<dd::thread_pool> && dd::executor<dd::worker> &&
@@ -37,9 +40,8 @@ TEST(latch) {
     --counter;
   };
   std::vector<dd::async_task<void>> test_tasks;
-  for (int i = 0; i < task_count; ++i) {
+  for (int i = 0; i < task_count; ++i)
     test_tasks.emplace_back(create_test());
-  }
   for (auto& t : test_tasks)
     t.wait();
   error_if(counter != 0);
@@ -69,9 +71,9 @@ TEST(thread_pool) {
   dd::latch start(COUNT, p);
   std::span workers = p.workers_range();
   for (int i = 0; i < workers.size(); ++i) {
-    (void)pin_thread_to_cpu_core(workers[i].get_thread(), i);
+    (void)pin_thread_to_cpu_core(workers[i], i);
     std::string name = "number " + std::to_string(i);
"number " + std::to_string(i); - (void)set_thread_name(workers[i].get_thread(), name.c_str()); + (void)set_thread_name(workers[i], name.c_str()); } for (int ind = 0; ind < COUNT; ++ind) { foo(p, start, i, l); @@ -123,17 +125,50 @@ TEST(latch_waiters) { return error_count; } +TEST(request_stop) { + auto tsize = dd::thread_pool::default_thread_count(); + thread_local std::minstd_rand rand(42); + for (size_t i = 0; i <= 1000; i++) { + dd::thread_pool pool; + std::vector> tasks; + std::atomic_int counter{0}; + std::vector cancelled(tsize); + std::vector request_stop_on(tsize); + + for (auto& i : request_stop_on) + i.store(RAND_MAX); + auto maker_task = [&](size_t idx) -> dd::async_task { + auto distr = std::uniform_int_distribution(0, i); + request_stop_on[idx] = distr(rand); + while (co_await dd::jump_on(pool)) + if (counter.fetch_add(1) >= request_stop_on[idx]) + pool.request_stop(); + cancelled[idx] = true; + }; + for (size_t i = 0; i < tsize; i++) + tasks.push_back(maker_task(i)); + std::move(pool).wait_stop(); + for (size_t i = 0; i < tsize; i++) { + error_if(!tasks[i].ready()); + error_if(!cancelled[i]); + } + } + + return error_count; +} + int main() { size_t ec = 0; ec += test_latch(); ec += test_thread_pool(); ec += test_latch_waiters(); + ec += test_request_stop(); return ec; } #ifdef _WIN32 -#include + #include #elif defined(__unix__) -#include + #include #else #endif