From 7767406bf5a550a115d810232f7bf4344daa147a Mon Sep 17 00:00:00 2001 From: Ali Saeed Date: Fri, 13 Jan 2023 22:01:30 +0000 Subject: [PATCH] pw_async: Add module with Task and basic & test Dispatchers Land module with experimental flag limiting visibility. Change-Id: I5652fcb5dbebfc03dbc0bcafb20b2462e3dfea5b Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/110454 Reviewed-by: Erik Gilling Commit-Queue: Ali Saeed --- PIGWEED_MODULES | 1 + pw_async/BUILD.bazel | 27 +++ pw_async/BUILD.gn | 99 +++++++++++ pw_async/README.md | 0 pw_async/async.gni | 20 +++ pw_async/dispatcher_basic.cc | 140 ++++++++++++++++ pw_async/dispatcher_basic_test.cc | 126 ++++++++++++++ pw_async/dispatcher_test.cc | 157 ++++++++++++++++++ pw_async/docs.rst | 5 + pw_async/public/pw_async/dispatcher.h | 70 ++++++++ pw_async/public/pw_async/dispatcher_basic.h | 151 +++++++++++++++++ pw_async/public/pw_async/task.h | 60 +++++++ .../public_test/pw_async/test_dispatcher.h | 85 ++++++++++ pw_async/test_dispatcher.cc | 111 +++++++++++++ pw_build/generated_pigweed_modules_lists.gni | 4 + 15 files changed, 1056 insertions(+) create mode 100644 pw_async/BUILD.bazel create mode 100644 pw_async/BUILD.gn create mode 100644 pw_async/README.md create mode 100644 pw_async/async.gni create mode 100644 pw_async/dispatcher_basic.cc create mode 100644 pw_async/dispatcher_basic_test.cc create mode 100644 pw_async/dispatcher_test.cc create mode 100644 pw_async/docs.rst create mode 100644 pw_async/public/pw_async/dispatcher.h create mode 100644 pw_async/public/pw_async/dispatcher_basic.h create mode 100644 pw_async/public/pw_async/task.h create mode 100644 pw_async/public_test/pw_async/test_dispatcher.h create mode 100644 pw_async/test_dispatcher.cc diff --git a/PIGWEED_MODULES b/PIGWEED_MODULES index f856b01a2a..34de8ac7c5 100644 --- a/PIGWEED_MODULES +++ b/PIGWEED_MODULES @@ -8,6 +8,7 @@ pw_assert_basic pw_assert_log pw_assert_tokenized pw_assert_zephyr +pw_async pw_base64 pw_bloat pw_blob_store diff --git a/pw_async/BUILD.bazel b/pw_async/BUILD.bazel new file mode 100644 index 0000000000..99cf0ba7b9 --- /dev/null +++ b/pw_async/BUILD.bazel @@ -0,0 +1,27 @@ +# Copyright 2023 The Pigweed Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +filegroup( + name = "pw_async_files", + srcs = [ + "dispatcher_basic.cc", + "dispatcher_basic_test.cc", + "dispatcher_test.cc", + "public/pw_async/dispatcher.h", + "public/pw_async/dispatcher_basic.h", + "public/pw_async/task.h", + "public_test/pw_async/test_dispatcher.h", + "test_dispatcher.cc", + ], +) diff --git a/pw_async/BUILD.gn b/pw_async/BUILD.gn new file mode 100644 index 0000000000..42f1dc3673 --- /dev/null +++ b/pw_async/BUILD.gn @@ -0,0 +1,99 @@ +# Copyright 2022 The Pigweed Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +import("//build_overrides/pigweed.gni") + +import("$dir_pw_async/async.gni") +import("$dir_pw_chrono/backend.gni") +import("$dir_pw_docgen/docs.gni") +import("$dir_pw_thread/backend.gni") +import("$dir_pw_unit_test/test.gni") + +config("public_include_path") { + include_dirs = [ "public" ] +} + +config("public_test_include_path") { + include_dirs = [ "public_test" ] +} + +pw_source_set("task") { + public_configs = [ ":public_include_path" ] + public_deps = [ + "$dir_pw_chrono:system_clock", + "$dir_pw_containers:intrusive_list", + dir_pw_function, + ] + public = [ "public/pw_async/task.h" ] + visibility = [ ":*" ] + pw_async_EXPERIMENTAL_MODULE_VISIBILITY +} + +pw_source_set("dispatcher") { + public_configs = [ ":public_include_path" ] + public_deps = [ ":task" ] + public = [ "public/pw_async/dispatcher.h" ] + visibility = [ ":*" ] + pw_async_EXPERIMENTAL_MODULE_VISIBILITY +} + +pw_source_set("test_dispatcher") { + public_configs = [ ":public_test_include_path" ] + public_deps = [ + ":pw_dispatcher_basic", + dir_pw_log, + ] + public = [ "public_test/pw_async/test_dispatcher.h" ] + sources = [ "test_dispatcher.cc" ] + visibility = [ ":*" ] + pw_async_EXPERIMENTAL_MODULE_VISIBILITY +} + +pw_source_set("pw_dispatcher_basic") { + public_configs = [ ":public_include_path" ] + public = [ "public/pw_async/dispatcher_basic.h" ] + public_deps = [ + ":dispatcher", + "$dir_pw_sync:interrupt_spin_lock", + "$dir_pw_sync:timed_thread_notification", + "$dir_pw_thread:thread", + dir_pw_log, + ] + sources = [ "dispatcher_basic.cc" ] + visibility = [ ":*" ] + pw_async_EXPERIMENTAL_MODULE_VISIBILITY +} + +pw_test("dispatcher_test") { + enable_if = pw_chrono_SYSTEM_CLOCK_BACKEND != "" + deps = [ ":test_dispatcher" ] + sources = [ "dispatcher_test.cc" ] +} + +pw_test("dispatcher_basic_test") { + enable_if = pw_chrono_SYSTEM_CLOCK_BACKEND != "" && + pw_thread_THREAD_BACKEND == "$dir_pw_thread_stl:thread" + public_deps = [ + ":pw_dispatcher_basic", + dir_pw_log, + ] + sources = [ "dispatcher_basic_test.cc" ] +} + +pw_test_group("tests") { + tests = [ + ":dispatcher_test", + ":dispatcher_basic_test", + ] +} + +pw_doc_group("docs") { + sources = [ "docs.rst" ] +} diff --git a/pw_async/README.md b/pw_async/README.md new file mode 100644 index 0000000000..e69de29bb2 diff --git a/pw_async/async.gni b/pw_async/async.gni new file mode 100644 index 0000000000..1420c96c70 --- /dev/null +++ b/pw_async/async.gni @@ -0,0 +1,20 @@ +# Copyright 2022 The Pigweed Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +declare_args() { + # To depend on pw_async, add targets to this list. + # + # WARNING: This is experimental and *not* guaranteed to work. + pw_async_EXPERIMENTAL_MODULE_VISIBILITY = [] +} diff --git a/pw_async/dispatcher_basic.cc b/pw_async/dispatcher_basic.cc new file mode 100644 index 0000000000..03394fec4d --- /dev/null +++ b/pw_async/dispatcher_basic.cc @@ -0,0 +1,140 @@ +// Copyright 2022 The Pigweed Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. +#include "pw_async/dispatcher_basic.h" + +#include "pw_log/log.h" + +using namespace std::chrono_literals; + +namespace pw::async { + +const chrono::SystemClock::duration SLEEP_DURATION = 5s; + +void BasicDispatcher::Run() { + lock_.lock(); + while (!stop_requested_) { + RunLoopOnce(); + } + lock_.unlock(); +} + +void BasicDispatcher::RunUntilIdle() { + lock_.lock(); + while (!task_queue_.empty()) { + RunLoopOnce(); + } + lock_.unlock(); +} + +void BasicDispatcher::RunUntil(chrono::SystemClock::time_point end_time) { + lock_.lock(); + while (end_time < Now()) { + RunLoopOnce(); + } + lock_.unlock(); +} + +void BasicDispatcher::RunFor(chrono::SystemClock::duration duration) { + RunUntil(Now() + duration); +} + +void BasicDispatcher::RunLoopOnce() { + if (task_queue_.empty() || DueTime(task_queue_.front()) > Now()) { + // Sleep until a notification is received or until the due time of the + // next task. Notifications are sent when tasks are posted or 'stop' is + // requested. + chrono::SystemClock::time_point wake_time = + task_queue_.empty() ? Now() + SLEEP_DURATION + : DueTime(task_queue_.front()); + + lock_.unlock(); + PW_LOG_DEBUG("no task due; waiting for signal"); + timed_notification_.try_acquire_until(wake_time); + lock_.lock(); + + return; + } + + while (!task_queue_.empty() && DueTime(task_queue_.front()) <= Now()) { + Task& task = task_queue_.front(); + task_queue_.pop_front(); + + if (IsPeriodic(task)) { + PostTaskInternal(task, DueTime(task) + SetInterval(task)); + } + + lock_.unlock(); + PW_LOG_DEBUG("running task"); + Context ctx{this, &task}; + task(ctx); + lock_.lock(); + } +} + +void BasicDispatcher::RequestStop() { + std::lock_guard lock(lock_); + PW_LOG_DEBUG("stop requested"); + stop_requested_ = true; + task_queue_.clear(); + timed_notification_.release(); +} + +void BasicDispatcher::PostTask(Task& task) { PostTaskForTime(task, Now()); } + +void BasicDispatcher::PostDelayedTask(Task& task, + chrono::SystemClock::duration delay) { + PostTaskForTime(task, Now() + delay); +} + +void BasicDispatcher::PostTaskForTime(Task& task, + chrono::SystemClock::time_point time) { + lock_.lock(); + PW_LOG_DEBUG("posting task"); + PostTaskInternal(task, time); + lock_.unlock(); +} + +void BasicDispatcher::SchedulePeriodicTask( + Task& task, chrono::SystemClock::duration interval) { + SchedulePeriodicTask(task, interval, Now()); +} + +void BasicDispatcher::SchedulePeriodicTask( + Task& task, + chrono::SystemClock::duration interval, + chrono::SystemClock::time_point start_time) { + SetInterval(task, interval); + PostTaskForTime(task, start_time); +} + +bool BasicDispatcher::Cancel(Task& task) { + std::lock_guard lock(lock_); + return task_queue_.remove(task); +} + +// Ensure lock_ is held when invoking this function. +void BasicDispatcher::PostTaskInternal( + Task& task, chrono::SystemClock::time_point time_due) { + SetDueTime(task, time_due); + auto it_front = task_queue_.begin(); + auto it_behind = task_queue_.before_begin(); + while (it_front != task_queue_.end() && time_due > DueTime(*it_front)) { + ++it_front; + ++it_behind; + } + task_queue_.insert_after(it_behind, task); + timed_notification_.release(); +} + +} // namespace pw::async diff --git a/pw_async/dispatcher_basic_test.cc b/pw_async/dispatcher_basic_test.cc new file mode 100644 index 0000000000..b3cff0f3fe --- /dev/null +++ b/pw_async/dispatcher_basic_test.cc @@ -0,0 +1,126 @@ +// Copyright 2022 The Pigweed Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. +#include "pw_async/dispatcher_basic.h" + +#include "gtest/gtest.h" +#include "public/pw_async/dispatcher_basic.h" +#include "pw_log/log.h" +#include "pw_sync/thread_notification.h" +#include "pw_thread/thread.h" +#include "pw_thread_stl/options.h" + +using namespace std::chrono_literals; + +namespace pw::async { + +// Lambdas can only capture one ptr worth of memory without allocating, so we +// group the data we want to share between tasks and their containing tests +// inside one struct. +struct TestPrimitives { + int count = 0; + sync::ThreadNotification notification; +}; + +TEST(DispatcherBasic, PostTasks) { + BasicDispatcher dispatcher; + thread::Thread work_thread(thread::stl::Options(), dispatcher); + + TestPrimitives tp; + auto inc_count = [&tp]([[maybe_unused]] Context& c) { ++tp.count; }; + + Task task(inc_count); + dispatcher.PostTask(task); + + Task task2(inc_count); + dispatcher.PostTask(task2); + + Task task3([&tp]([[maybe_unused]] Context& c) { + ++tp.count; + tp.notification.release(); + }); + dispatcher.PostTask(task3); + + tp.notification.acquire(); + dispatcher.RequestStop(); + work_thread.join(); + + ASSERT_TRUE(tp.count == 3); +} + +struct TaskPair { + Task task_a; + Task task_b; + int count = 0; + sync::ThreadNotification notification; +}; + +TEST(DispatcherBasic, ChainedTasks) { + BasicDispatcher dispatcher; + thread::Thread work_thread(thread::stl::Options(), dispatcher); + + TaskPair tp; + + Task task0([&tp](Context& c) { + ++tp.count; + + c.dispatcher->PostTask(tp.task_a); + }); + + tp.task_a.SetFunction([&tp](Context& c) { + ++tp.count; + + c.dispatcher->PostTask(tp.task_b); + }); + + tp.task_b.SetFunction([&tp]([[maybe_unused]] Context& c) { + ++tp.count; + tp.notification.release(); + }); + + dispatcher.PostTask(task0); + + tp.notification.acquire(); + dispatcher.RequestStop(); + work_thread.join(); + + ASSERT_TRUE(tp.count == 3); +} + +// Test RequestStop() from inside task. +TEST(DispatcherBasic, RequestStopInsideTask) { + BasicDispatcher dispatcher; + thread::Thread work_thread(thread::stl::Options(), dispatcher); + + TestPrimitives tp; + auto inc_count = [&tp]([[maybe_unused]] Context& c) { ++tp.count; }; + + // These tasks are never executed and cleaned up in RequestStop(). + Task task0(inc_count), task1(inc_count); + dispatcher.PostDelayedTask(task0, 20ms); + dispatcher.PostDelayedTask(task1, 21ms); + + Task stop_task([&tp]([[maybe_unused]] Context& c) { + ++tp.count; + c.dispatcher->RequestStop(); + tp.notification.release(); + }); + dispatcher.PostTask(stop_task); + + tp.notification.acquire(); + work_thread.join(); + + ASSERT_TRUE(tp.count == 1); +} + +} // namespace pw::async diff --git a/pw_async/dispatcher_test.cc b/pw_async/dispatcher_test.cc new file mode 100644 index 0000000000..b37a3aef60 --- /dev/null +++ b/pw_async/dispatcher_test.cc @@ -0,0 +1,157 @@ +// Copyright 2022 The Pigweed Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. +#include "gtest/gtest.h" +#include "pw_async/test_dispatcher.h" +#include "pw_sync/thread_notification.h" +#include "pw_thread/thread.h" +#include "pw_thread_stl/options.h" + +using namespace std::chrono_literals; + +namespace pw::async { + +// Lambdas can only capture one ptr worth of memory without allocating, so we +// group the data we want to share between tasks and their containing tests +// inside one struct. +struct TestPrimitives { + int count = 0; + sync::ThreadNotification notification; +}; + +TEST(TestDispatcher, PostTasks) { + TestDispatcher dispatcher; + + TestPrimitives tp; + auto inc_count = [&tp]([[maybe_unused]] Context& c) { ++tp.count; }; + + Task task(inc_count); + dispatcher.PostTask(task); + + Task task2(inc_count); + dispatcher.PostTask(task2); + + Task task3([&tp]([[maybe_unused]] Context& c) { ++tp.count; }); + dispatcher.PostTask(task3); + + dispatcher.RunUntilIdle(); + dispatcher.RequestStop(); + + ASSERT_TRUE(tp.count == 3); +} + +struct TaskPair { + Task task_a; + Task task_b; + int count = 0; + sync::ThreadNotification notification; +}; + +TEST(TestDispatcher, DelayedTasks) { + TestDispatcher dispatcher; + TaskPair tp; + + Task task0( + [&tp]([[maybe_unused]] Context& c) { tp.count = tp.count * 10 + 4; }); + + dispatcher.PostDelayedTask(task0, 200ms); + + Task task1([&tp]([[maybe_unused]] Context& c) { + tp.count = tp.count * 10 + 1; + c.dispatcher->PostDelayedTask(tp.task_a, 50ms); + c.dispatcher->PostDelayedTask(tp.task_b, 25ms); + }); + + dispatcher.PostDelayedTask(task1, 100ms); + + tp.task_a.SetFunction( + [&tp]([[maybe_unused]] Context& c) { tp.count = tp.count * 10 + 3; }); + + tp.task_b.SetFunction( + [&tp]([[maybe_unused]] Context& c) { tp.count = tp.count * 10 + 2; }); + + dispatcher.RunUntilIdle(); + dispatcher.RequestStop(); + + ASSERT_TRUE(tp.count == 1234); +} + +TEST(TestDispatcher, CancelTasks) { + TestDispatcher dispatcher; + + TestPrimitives tp; + auto inc_count = [&tp]([[maybe_unused]] Context& c) { ++tp.count; }; + + // This task gets canceled in the last task. + Task task0(inc_count); + dispatcher.PostDelayedTask(task0, 40ms); + + // This task gets canceled immediately. + Task task1(inc_count); + dispatcher.PostDelayedTask(task1, 10ms); + ASSERT_TRUE(dispatcher.Cancel(task1)); + + // This task cancels the first task. + Task cancel_task( + [&task0](Context& c) { ASSERT_TRUE(c.dispatcher->Cancel(task0)); }); + dispatcher.PostDelayedTask(cancel_task, 20ms); + + dispatcher.RunUntilIdle(); + dispatcher.RequestStop(); + + ASSERT_TRUE(tp.count == 0); +} + +// Test RequestStop() from inside task. +TEST(TestDispatcher, RequestStopInsideTask) { + TestDispatcher dispatcher; + + TestPrimitives tp; + auto inc_count = [&tp]([[maybe_unused]] Context& c) { ++tp.count; }; + + // These tasks are never executed and cleaned up in RequestStop(). + Task task0(inc_count), task1(inc_count); + dispatcher.PostDelayedTask(task0, 20ms); + dispatcher.PostDelayedTask(task1, 21ms); + + Task stop_task([&tp]([[maybe_unused]] Context& c) { + ++tp.count; + c.dispatcher->RequestStop(); + }); + dispatcher.PostTask(stop_task); + + dispatcher.RunUntilIdle(); + + ASSERT_TRUE(tp.count == 1); +} + +TEST(TestDispatcher, PeriodicTasks) { + TestDispatcher dispatcher; + + TestPrimitives tp; + + Task periodic_task([&tp]([[maybe_unused]] Context& c) { ++tp.count; }); + dispatcher.SchedulePeriodicTask(periodic_task, 20ms, dispatcher.Now() + 50ms); + + // Cancel periodic task after it has run thrice, at +50ms, +70ms, and +90ms. + Task cancel_task( + [&periodic_task](Context& c) { c.dispatcher->Cancel(periodic_task); }); + dispatcher.PostDelayedTask(cancel_task, 100ms); + + dispatcher.RunUntilIdle(); + dispatcher.RequestStop(); + + ASSERT_TRUE(tp.count == 3); +} + +} // namespace pw::async diff --git a/pw_async/docs.rst b/pw_async/docs.rst new file mode 100644 index 0000000000..0949c23a0c --- /dev/null +++ b/pw_async/docs.rst @@ -0,0 +1,5 @@ +.. _module-pw_async: + +================ +pw_async +================ diff --git a/pw_async/public/pw_async/dispatcher.h b/pw_async/public/pw_async/dispatcher.h new file mode 100644 index 0000000000..df449200fb --- /dev/null +++ b/pw_async/public/pw_async/dispatcher.h @@ -0,0 +1,70 @@ +// Copyright 2022 The Pigweed Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. +#pragma once + +#include "pw_async/task.h" + +namespace pw::async { + +// Asynchronous Dispatcher abstract class. A default implementation is provided +// in dispatcher_basic.h. +class Dispatcher { + public: + virtual ~Dispatcher() = default; + + virtual void RequestStop() = 0; + + // Returns the current time as viewed by the Dispatcher. + virtual chrono::SystemClock::time_point Now() = 0; + + // Post caller owned |task|. + virtual void PostTask(Task& task) = 0; + + // Post caller owned |task| to be run after |delay|. + virtual void PostDelayedTask(Task& task, + chrono::SystemClock::duration delay) = 0; + + // Post caller owned |task| to be run at |time|. + virtual void PostTaskForTime(Task& task, + chrono::SystemClock::time_point time) = 0; + + // Post caller owned |task| to be run immediately then rerun at a regular + // |interval|. + virtual void SchedulePeriodicTask(Task& task, + chrono::SystemClock::duration interval) = 0; + // Post caller owned |task| to be run at |start_time| then rerun at a regular + // |interval|. + virtual void SchedulePeriodicTask( + Task& task, + chrono::SystemClock::duration interval, + chrono::SystemClock::time_point start_time) = 0; + + // Returns true if |task| is succesfully canceled. + // If cancelation fails, the task may be running or completed. + // Periodic tasks may be posted once more after they are canceled. + virtual bool Cancel(Task& task) = 0; + + // Execute tasks until the Dispatcher enters a state where none are queued. + virtual void RunUntilIdle() = 0; + + // Run the Dispatcher until Now() has reached `end_time`, executing all tasks + // that come due before then. + virtual void RunUntil(chrono::SystemClock::time_point end_time) = 0; + + // Run the Dispatcher until `duration` has elapsed, executing all tasks that + // come due in that period. + virtual void RunFor(chrono::SystemClock::duration duration) = 0; +}; + +} // namespace pw::async diff --git a/pw_async/public/pw_async/dispatcher_basic.h b/pw_async/public/pw_async/dispatcher_basic.h new file mode 100644 index 0000000000..482990c20b --- /dev/null +++ b/pw_async/public/pw_async/dispatcher_basic.h @@ -0,0 +1,151 @@ +// Copyright 2022 The Pigweed Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. +#pragma once + +#include "pw_async/dispatcher.h" +#include "pw_sync/interrupt_spin_lock.h" +#include "pw_sync/lock_annotations.h" +#include "pw_sync/timed_thread_notification.h" +#include "pw_thread/thread_core.h" + +namespace pw::async { + +class BasicDispatcher final : public Dispatcher, public thread::ThreadCore { + public: + explicit BasicDispatcher() : stop_requested_(false) {} + ~BasicDispatcher() override { RequestStop(); } + + void RequestStop() override PW_LOCKS_EXCLUDED(lock_); + + // Returns the current time as viewed by the BasicDispatcher. + chrono::SystemClock::time_point Now() override { + return chrono::SystemClock::now(); + } + + // Post caller owned |task|. + void PostTask(Task& task) override; + + // Post caller owned |task| to be run after |delay|. + void PostDelayedTask(Task& task, + chrono::SystemClock::duration delay) override; + + // Post caller owned |task| to be run at |time|. + void PostTaskForTime(Task& task, + chrono::SystemClock::time_point time) override; + + // Post caller owned |task| to be run immediately then rerun at a regular + // |interval|. + void SchedulePeriodicTask(Task& task, + chrono::SystemClock::duration interval) override; + // Post caller owned |task| to be run at |start_time| then rerun at a regular + // |interval|. + void SchedulePeriodicTask( + Task& task, + chrono::SystemClock::duration interval, + chrono::SystemClock::time_point start_time) override; + + // Returns true if |task| is succesfully canceled. + // If cancelation fails, the task may be running or completed. + // Periodic tasks may run once more after they are canceled. + bool Cancel(Task& task) override PW_LOCKS_EXCLUDED(lock_); + + // Execute tasks until the Dispatcher enters a state where none are queued. + void RunUntilIdle() override; + + // Run the Dispatcher until Now() has reached `end_time`, executing all tasks + // that come due before then. + void RunUntil(chrono::SystemClock::time_point end_time) override; + void RunFor(chrono::SystemClock::duration duration) override; + + private: + // TestDispatcher uses BasicDispatcher methods operating on Task state. + friend class TestDispatcher; + + void Run() override PW_LOCKS_EXCLUDED(lock_); + + // Insert |task| into task_queue_ maintaining its min-heap property, keyed by + // |time_due|. Must be holding lock_ when calling this function. + void PostTaskInternal(Task& task, chrono::SystemClock::time_point time_due) + PW_EXCLUSIVE_LOCKS_REQUIRED(lock_); + + // If no tasks are due, sleeps until a notification is received or until the + // due time of the next task. + // + // If at least one task is due, dequeues and runs each task that is due. + // + // Must be holding lock_ when calling this function. + void RunLoopOnce() PW_EXCLUSIVE_LOCKS_REQUIRED(lock_); + + // Below are several static methods that operate on Tasks to maintain per Task + // state as necessary for the operation of BasicDispatcher. + // + // BasicDispatcher reserves 3 contiguous fields at the beginning of the Task + // state memory block for these three data: + // + // DUE TIME | RECURRANCE INTERVAL FOR PERIODIC TASKS | PERIODICITY FLAG + // + // with types: + // + // chrono::SystemClock::time_point | chrono::SystemClock::duration | bool + // + // Thus, the offsets at which these fields are located in Task state memory: + static constexpr uint8_t kDueTimeOffset = 0; + static constexpr uint8_t kIntervalOffset = + sizeof(chrono::SystemClock::time_point); + static constexpr uint8_t kPeriodicityOffset = + kIntervalOffset + sizeof(chrono::SystemClock::duration); + + static chrono::SystemClock::time_point DueTime(Task& task) { + chrono::SystemClock::time_point time; + memcpy(static_cast(&time), + task.State() + kDueTimeOffset, + sizeof(chrono::SystemClock::time_point)); + return time; + } + static void SetDueTime(Task& task, chrono::SystemClock::time_point due_time) { + memcpy(task.State() + kDueTimeOffset, + &due_time, + sizeof(chrono::SystemClock::time_point)); + } + + static chrono::SystemClock::duration SetInterval(Task& task) { + chrono::SystemClock::duration duration; + memcpy(static_cast(&duration), + task.State() + kIntervalOffset, + sizeof(chrono::SystemClock::duration)); + return duration; + } + static void SetInterval(Task& task, chrono::SystemClock::duration interval) { + memcpy(task.State() + kIntervalOffset, + &interval, + sizeof(chrono::SystemClock::duration)); + // Set periodicity flag as well. + bool true_value = true; + memcpy(task.State() + kPeriodicityOffset, &true_value, sizeof(bool)); + } + + static bool IsPeriodic(Task& task) { + bool periodic; + memcpy(&periodic, task.State() + kPeriodicityOffset, sizeof(bool)); + return periodic; + } + + sync::InterruptSpinLock lock_; + sync::TimedThreadNotification timed_notification_; + bool stop_requested_ PW_GUARDED_BY(lock_); + // A priority queue of scheduled Tasks sorted by earliest due times first. + IntrusiveList task_queue_ PW_GUARDED_BY(lock_); +}; + +} // namespace pw::async diff --git a/pw_async/public/pw_async/task.h b/pw_async/public/pw_async/task.h new file mode 100644 index 0000000000..e88858743d --- /dev/null +++ b/pw_async/public/pw_async/task.h @@ -0,0 +1,60 @@ +// Copyright 2022 The Pigweed Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. +#pragma once + +#include + +#include "pw_chrono/system_clock.h" +#include "pw_containers/intrusive_list.h" +#include "pw_function/function.h" + +namespace pw::async { + +constexpr size_t kTaskStateBytes = sizeof(void*) * 4; + +class Dispatcher; +class Task; + +// Task functions take a `Context` as their argument. Before executing a Task, +// the Dispatcher sets the pointer to itself and to the Task in `Context`. +struct Context { + Dispatcher* dispatcher; + Task* task; +}; + +// TODO(saeedali): Remove IntrusiveList from here. +class Task : public IntrusiveList::Item { + public: + using TaskFunction = Function; + + Task() { state_.fill(static_cast(0)); } + explicit Task(TaskFunction&& f) { + state_.fill(static_cast(0)); + SetFunction(std::move(f)); + } + + void SetFunction(TaskFunction&& f) { f_ = std::move(f); } + + void operator()(Context& ctx) { f_(ctx); } + + std::byte* State() { return state_.data(); } + + private: + // Dispatchers use `state_` to store per Task information. Unused space can be + // used by clients to store custom information in their Tasks. + std::array state_; + TaskFunction f_; +}; + +} // namespace pw::async diff --git a/pw_async/public_test/pw_async/test_dispatcher.h b/pw_async/public_test/pw_async/test_dispatcher.h new file mode 100644 index 0000000000..0c86026f10 --- /dev/null +++ b/pw_async/public_test/pw_async/test_dispatcher.h @@ -0,0 +1,85 @@ +// Copyright 2022 The Pigweed Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. +#pragma once + +#include "pw_async/dispatcher.h" + +namespace pw::async { + +class TestDispatcher final : public Dispatcher { + public: + explicit TestDispatcher() {} + ~TestDispatcher() override { RequestStop(); } + + chrono::SystemClock::time_point Now() override { return now_; } + + void RequestStop() override; + + // Post caller owned |task|. + void PostTask(Task& task) override; + + // Post caller owned |task| to be run after |delay|. + void PostDelayedTask(Task& task, + chrono::SystemClock::duration delay) override; + + // Post caller owned |task| to be run at |time|. + void PostTaskForTime(Task& task, + chrono::SystemClock::time_point time) override; + + // Post caller owned |task| to be run immediately then rerun at a regular + // |interval|. + void SchedulePeriodicTask(Task& task, + chrono::SystemClock::duration interval) override; + // Post caller owned |task| to be run at |start_time| then rerun at a regular + // |interval|. + void SchedulePeriodicTask( + Task& task, + chrono::SystemClock::duration interval, + chrono::SystemClock::time_point start_time) override; + + // Returns true if |task| is succesfully canceled. + // If cancelation fails, the task may be running or completed. + // Periodic tasks may run once more after they are canceled. + bool Cancel(Task& task) override; + + // Execute tasks until the Dispatcher enters a state where none are queued. + void RunUntilIdle() override; + + // Run the Dispatcher until Now() has reached `end_time`, executing all tasks + // that come due before then. + void RunUntil(chrono::SystemClock::time_point end_time) override; + + // Run the Dispatcher until `duration` has elapsed, executing all tasks that + // come due in that period. + void RunFor(chrono::SystemClock::duration duration) override; + + private: + // Insert |task| into task_queue_ maintaining its min-heap property, keyed by + // |time_due|. + void PostTaskInternal(Task& task, chrono::SystemClock::time_point time_due); + + // If no tasks are due, sleeps until a notification is received or until the + // due time of the next task. + // + // If at least one task is due, dequeues and runs each task that is due. + void RunLoopOnce(); + + // A priority queue of scheduled Tasks sorted by earliest due times first. + IntrusiveList task_queue_; + + // Tracks the current time as viewed by the test dispatcher. + chrono::SystemClock::time_point now_; +}; + +} // namespace pw::async diff --git a/pw_async/test_dispatcher.cc b/pw_async/test_dispatcher.cc new file mode 100644 index 0000000000..66da26a69c --- /dev/null +++ b/pw_async/test_dispatcher.cc @@ -0,0 +1,111 @@ +// Copyright 2022 The Pigweed Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); you may not +// use this file except in compliance with the License. You may obtain a copy of +// the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations under +// the License. +#include "pw_async/test_dispatcher.h" + +#include "pw_async/dispatcher_basic.h" +#include "pw_log/log.h" + +using namespace std::chrono_literals; + +namespace pw::async { + +void TestDispatcher::RunUntilIdle() { + while (!task_queue_.empty()) { + // Only advance to the due time of the next task because new tasks can be + // scheduled in the next task. + now_ = BasicDispatcher::DueTime(task_queue_.front()); + RunLoopOnce(); + } +} + +void TestDispatcher::RunUntil(chrono::SystemClock::time_point end_time) { + while (!task_queue_.empty() && + BasicDispatcher::DueTime(task_queue_.front()) <= end_time) { + now_ = BasicDispatcher::DueTime(task_queue_.front()); + RunLoopOnce(); + } + + if (now_ < end_time) { + now_ = end_time; + } +} + +void TestDispatcher::RunFor(chrono::SystemClock::duration duration) { + RunUntil(Now() + duration); +} + +void TestDispatcher::RunLoopOnce() { + while (!task_queue_.empty() && + BasicDispatcher::DueTime(task_queue_.front()) <= Now()) { + Task& task = task_queue_.front(); + task_queue_.pop_front(); + + if (BasicDispatcher::IsPeriodic(task)) { + PostTaskInternal( + task, + BasicDispatcher::DueTime(task) + BasicDispatcher::SetInterval(task)); + } + + Context ctx{this, &task}; + task(ctx); + } +} + +void TestDispatcher::RequestStop() { + PW_LOG_DEBUG("stop requested"); + task_queue_.clear(); +} + +void TestDispatcher::PostTask(Task& task) { PostTaskForTime(task, Now()); } + +void TestDispatcher::PostDelayedTask(Task& task, + chrono::SystemClock::duration delay) { + PostTaskForTime(task, Now() + delay); +} + +void TestDispatcher::PostTaskForTime(Task& task, + chrono::SystemClock::time_point time) { + PW_LOG_DEBUG("posting task"); + PostTaskInternal(task, time); +} + +void TestDispatcher::SchedulePeriodicTask( + Task& task, chrono::SystemClock::duration interval) { + SchedulePeriodicTask(task, interval, Now()); +} + +void TestDispatcher::SchedulePeriodicTask( + Task& task, + chrono::SystemClock::duration interval, + chrono::SystemClock::time_point start_time) { + BasicDispatcher::SetInterval(task, interval); + PostTaskForTime(task, start_time); +} + +bool TestDispatcher::Cancel(Task& task) { return task_queue_.remove(task); } + +void TestDispatcher::PostTaskInternal( + Task& task, chrono::SystemClock::time_point time_due) { + BasicDispatcher::SetDueTime(task, time_due); + auto it_front = task_queue_.begin(); + auto it_behind = task_queue_.before_begin(); + while (it_front != task_queue_.end() && + time_due > BasicDispatcher::DueTime(*it_front)) { + ++it_front; + ++it_behind; + } + task_queue_.insert_after(it_behind, task); +} + +} // namespace pw::async diff --git a/pw_build/generated_pigweed_modules_lists.gni b/pw_build/generated_pigweed_modules_lists.gni index 64fd6e3635..2550de0c90 100644 --- a/pw_build/generated_pigweed_modules_lists.gni +++ b/pw_build/generated_pigweed_modules_lists.gni @@ -37,6 +37,7 @@ declare_args() { dir_pw_assert_log = get_path_info("../pw_assert_log", "abspath") dir_pw_assert_tokenized = get_path_info("../pw_assert_tokenized", "abspath") dir_pw_assert_zephyr = get_path_info("../pw_assert_zephyr", "abspath") + dir_pw_async = get_path_info("../pw_async", "abspath") dir_pw_base64 = get_path_info("../pw_base64", "abspath") dir_pw_bloat = get_path_info("../pw_bloat", "abspath") dir_pw_blob_store = get_path_info("../pw_blob_store", "abspath") @@ -181,6 +182,7 @@ declare_args() { dir_pw_assert_log, dir_pw_assert_tokenized, dir_pw_assert_zephyr, + dir_pw_async, dir_pw_base64, dir_pw_bloat, dir_pw_blob_store, @@ -313,6 +315,7 @@ declare_args() { "$dir_pw_assert_log:tests", "$dir_pw_assert_tokenized:tests", "$dir_pw_assert_zephyr:tests", + "$dir_pw_async:tests", "$dir_pw_base64:tests", "$dir_pw_bloat:tests", "$dir_pw_blob_store:tests", @@ -445,6 +448,7 @@ declare_args() { "$dir_pw_assert_log:docs", "$dir_pw_assert_tokenized:docs", "$dir_pw_assert_zephyr:docs", + "$dir_pw_async:docs", "$dir_pw_base64:docs", "$dir_pw_bloat:docs", "$dir_pw_blob_store:docs",