From 1912785d36f36ee2894082331122bef2a2360314 Mon Sep 17 00:00:00 2001 From: s0me0ne-unkn0wn <48632512+s0me0ne-unkn0wn@users.noreply.github.com> Date: Wed, 29 Nov 2023 16:52:27 +0100 Subject: [PATCH] Implement separate signal and message flows (#66) * Implement separate signal and message flows * Fight data races * `cargo fmt` * Leave only signal receiving separate endpoint * Add an example * Add error handling --- Cargo.lock | 229 ++++++++++++++++- orchestra/Cargo.toml | 8 + orchestra/examples/backpressure.rs | 236 ++++++++++++++++++ .../src/impl_subsystem_ctx_sender.rs | 6 + orchestra/src/lib.rs | 8 +- 5 files changed, 478 insertions(+), 9 deletions(-) create mode 100644 orchestra/examples/backpressure.rs diff --git a/Cargo.lock b/Cargo.lock index bb650ab..ab1a62d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -46,6 +46,35 @@ dependencies = [ "futures-core", ] +[[package]] +name = "async-io" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af" +dependencies = [ + "async-lock", + "autocfg", + "cfg-if", + "concurrent-queue", + "futures-lite", + "log", + "parking", + "polling", + "rustix 0.37.27", + "slab", + "socket2", + "waker-fn", +] + +[[package]] +name = "async-lock" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "287272293e9d8c41773cec55e365490fe034813a2f172f502d6ddcf75b2f582b" +dependencies = [ + "event-listener", +] + [[package]] name = "async-trait" version = "0.1.71" @@ -83,6 +112,12 @@ dependencies = [ "serde", ] +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + [[package]] name = "bitflags" version = "2.4.0" @@ -382,6 +417,19 @@ dependencies = [ "termcolor", ] +[[package]] +name = "env_logger" +version = "0.10.1" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "95b3f3e67048839cb0d0781f445682a35113da7121f7c949db0e2be96a4fbece" +dependencies = [ + "humantime", + "is-terminal", + "log", + "regex", + "termcolor", +] + [[package]] name = "equivalent" version = "1.0.1" @@ -427,6 +475,15 @@ dependencies = [ "quote", ] +[[package]] +name = "fastrand" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be" +dependencies = [ + "instant", +] + [[package]] name = "fixedbitset" version = "0.4.2" @@ -488,6 +545,21 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" +[[package]] +name = "futures-lite" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" +dependencies = [ + "fastrand", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + [[package]] name = "futures-macro" version = "0.3.28" @@ -511,6 +583,18 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" +[[package]] +name = "futures-time" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6404853a6824881fe5f7d662d147dc4e84ecd2259ba0378f272a71dab600758a" +dependencies = [ + "async-channel", + "async-io", + "futures-core", + "pin-project-lite", +] + [[package]] name = "futures-timer" version = "3.0.2" @@ -545,6 +629,17 @@ dependencies = [ "version_check", ] +[[package]] +name = "getrandom" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe9006bed769170c11f845cf00c7c1e9092aeb3f268e007c3e760ac68008070f" +dependencies = [ + "cfg-if", + 
"libc", + "wasi", +] + [[package]] name = "glob" version = "0.3.1" @@ -610,6 +705,26 @@ dependencies = [ "hashbrown 0.14.0", ] +[[package]] +name = "instant" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "io-lifetimes" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2" +dependencies = [ + "hermit-abi 0.3.2", + "libc", + "windows-sys", +] + [[package]] name = "is-terminal" version = "0.4.9" @@ -617,7 +732,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b" dependencies = [ "hermit-abi 0.3.2", - "rustix", + "rustix 0.38.8", "windows-sys", ] @@ -665,9 +780,15 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.147" +version = "0.2.150" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89d92a4743f9a61002fae18374ed11e7973f530cb3a3255fb354818118b2203c" + +[[package]] +name = "linux-raw-sys" +version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3" +checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" [[package]] name = "linux-raw-sys" @@ -677,9 +798,9 @@ checksum = "57bcfdad1b858c2db7c38303a6d2ad4dfaf5eb53dfeb0910128b2c26d6158503" [[package]] name = "log" -version = "0.4.19" +version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4" +checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" [[package]] name = "memchr" @@ -740,11 +861,15 @@ dependencies = [ "async-trait", "criterion", "dyn-clonable", + 
"env_logger 0.10.1", "futures", + "futures-time", "futures-timer", + "log", "orchestra-proc-macro", "pin-project", "prioritized-metered-channel", + "rand", "rustversion", "thiserror", "tracing", @@ -769,6 +894,12 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "parking" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" + [[package]] name = "petgraph" version = "0.6.3" @@ -839,6 +970,28 @@ dependencies = [ "plotters-backend", ] +[[package]] +name = "polling" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce" +dependencies = [ + "autocfg", + "bitflags 1.3.2", + "cfg-if", + "concurrent-queue", + "libc", + "log", + "pin-project-lite", + "windows-sys", +] + +[[package]] +name = "ppv-lite86" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" + [[package]] name = "prioritized-metered-channel" version = "0.6.0" @@ -848,7 +1001,7 @@ dependencies = [ "coarsetime", "crossbeam-queue", "derive_more", - "env_logger", + "env_logger 0.9.3", "futures", "futures-timer", "log", @@ -885,6 +1038,36 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom", +] + [[package]] name = "rayon" version = "1.7.0" @@ -945,16 +1128,30 @@ dependencies = [ "semver", ] +[[package]] +name = "rustix" +version = "0.37.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fea8ca367a3a01fe35e6943c400addf443c0f57670e6ec51196f71a4b8762dd2" +dependencies = [ + "bitflags 1.3.2", + "errno", + "io-lifetimes", + "libc", + "linux-raw-sys 0.3.8", + "windows-sys", +] + [[package]] name = "rustix" version = "0.38.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19ed4fa021d81c8392ce04db050a3da9a60299050b7ae1cf482d862b54a7218f" dependencies = [ - "bitflags", + "bitflags 2.4.0", "errno", "libc", - "linux-raw-sys", + "linux-raw-sys 0.4.5", "windows-sys", ] @@ -1031,6 +1228,16 @@ dependencies = [ "autocfg", ] +[[package]] +name = "socket2" +version = "0.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "subtle" version = "2.5.0" @@ -1181,6 +1388,12 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "waker-fn" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3c4517f54858c779bbcbf228f4fca63d121bf85fbecb2dc578cdf4a39395690" + [[package]] name = "walkdir" version = "2.3.3" diff --git a/orchestra/Cargo.toml b/orchestra/Cargo.toml index 9628f94..8e866f4 100644 --- a/orchestra/Cargo.toml +++ b/orchestra/Cargo.toml @@ -25,6 +25,10 @@ trybuild = "1.0.61" rustversion = "1.0.6" criterion = { version = "0.5" } futures = { version = "0.3", features = ["thread-pool"] } +env_logger = "0.10.1" 
+futures-time = "3.0.0" +log = "0.4.20" +rand = "0.8.5" [[example]] name = "duo" @@ -34,6 +38,10 @@ crate-type = ["bin"] name = "dig" crate-type = ["bin"] +[[example]] +name = "backpressure" +crate-type = ["bin"] + [[bench]] name = "bench_main" harness = false diff --git a/orchestra/examples/backpressure.rs b/orchestra/examples/backpressure.rs new file mode 100644 index 0000000..2176f76 --- /dev/null +++ b/orchestra/examples/backpressure.rs @@ -0,0 +1,236 @@ +// Copyright (C) 2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: Apache-2.0 + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! This is an example of how backpressure between subsystems may be created using the +//! `recv_signal()` method. +//! +//! Subsystems A and B receive a heartbeat signal every second. Once a signal is received, subsystem +//! B sends a message to subsystem A asking the latter to do some job. The job takes 5 to 30 +//! seconds, so subsystem A cannot keep up as it can only process five tasks concurrently. +//! +//! When subsystem A is saturated with tasks, it stops receiving new messages but keeps receiving +//! orchestra signals. At that moment, subsystem B is blocked in `send_message()` and cannot +//! process either signals or messages as it doesn't implement separate `recv_signal()` logic. +//! +//! Once a task is complete, subsystem A is ready to accept a new message again. Subsystem B +//! 
unblocks and continues its loop, processing new signals and saturating subsystem A with new +//! tasks again. +//! +//! Please note that the orchestra message capacity has been chosen to be equal to subsystem A's task +//! capacity to demonstrate the backpressure logic better. The message capacity may (and probably +//! should) be larger than the task capacity to create additional buffering, so that senders are not +//! blocked too early. +//! +//! At the same time, signal capacity has been chosen to be deliberately oversized. That's because subsystem B, +//! being intentionally implemented suboptimally in this example, fails to consume signals when it +//! is stuck because subsystem A backpressures on it. If the signal capacity is too low, it may +//! lead to signal channel capacity overflow, and subsystem A would stop receiving signals as well. +//! That demonstrates that in a well-designed backpressure system, all of its parts +//! (subsystems) must be able to handle backpressure gracefully; otherwise, all the actors may +//! experience signal starvation. 
+ +use std::sync::atomic::{AtomicUsize, Ordering}; + +use futures::{executor::ThreadPool, pin_mut}; +use futures_time::{task::sleep, time}; +use orchestra::*; +use rand::prelude::*; + +static COUNTER: AtomicUsize = AtomicUsize::new(0); + +#[derive(Debug, Clone)] +pub enum Signal { + Heartbeat, +} + +#[derive(thiserror::Error, Debug)] +enum MyError { + #[error(transparent)] + Generated(#[from] OrchestraError), +} + +#[derive(Debug)] +pub struct Message1(usize); +#[derive(Debug)] +pub struct Message2; +struct DummyEvent; + +#[derive(Debug, Clone)] +pub struct MySpawner(pub ThreadPool); + +impl Spawner for MySpawner { + fn spawn_blocking( + &self, + _task_name: &'static str, + _subsystem_name: Option<&'static str>, + future: futures::future::BoxFuture<'static, ()>, + ) { + self.0.spawn_ok(future); + } + + fn spawn( + &self, + _task_name: &'static str, + _subsystem_name: Option<&'static str>, + future: futures::future::BoxFuture<'static, ()>, + ) { + self.0.spawn_ok(future); + } +} + +struct SubsystemA; +struct SubsystemB; + +#[orchestra(signal=Signal, event=DummyEvent, gen=AllMessages, error=OrchestraError, message_capacity=5, signal_capacity=600)] +struct MyOrchestra { + #[subsystem(Message1, sends: [Message2])] + sub_a: SubsystemA, + #[subsystem(Message2, sends: [Message1])] + sub_b: SubsystemB, +} + +#[orchestra::subsystem(SubsystemA, error=OrchestraError)] +impl SubsystemA { + fn start(self, mut ctx: Context) -> SpawnedSubsystem { + SpawnedSubsystem { + name: "SubsystemA", + future: Box::pin(async move { + const TASK_LIMIT: usize = 5; + let mut tasks = FuturesUnordered::new(); + 'outer: loop { + loop { + select! 
{ + from_orchestra = ctx.recv().fuse() => { + match from_orchestra { + Ok(FromOrchestra::Signal(sig)) => { + log::info!(target: "subsystem::A", "received SIGNAL {sig:?} in message-processing loop"); + }, + Ok(FromOrchestra::Communication { msg }) => { + log::info!(target: "subsystem::A", "received MESSAGE {msg:?}"); + let Message1(id) = msg; + let dur = time::Duration::from_secs((5..30).choose(&mut rand::thread_rng()).unwrap()); + tasks.push(async move { + log::info!(target: "task", "[{id}]: sleeping for {dur:?}"); + sleep(dur).await; + log::info!(target: "task", "[{id}]: woke up"); + id + }); + if tasks.len() >= TASK_LIMIT { + break; + } + }, + Err(_) => { + break 'outer; + } + } + }, + id = tasks.select_next_some() => { + log::info!(target: "subsystem::A", "task {id} finished in message-processing loop"); + } + } + } + + log::warn!(target: "subsystem::A", "↓↓↓ saturated, only processing signals ↓↓↓"); + + loop { + select! { + signal = ctx.recv_signal().fuse() => { + match signal { + Ok(sig) => log::info!(target: "subsystem::A", "received SIGNAL {sig:?} in signal-processing loop"), + Err(_) => { + break 'outer; + } + } + }, + id = tasks.select_next_some() => { + log::info!(target: "subsystem::A", "task {id} finished in signal-processing loop"); + if tasks.len() < TASK_LIMIT { + break; + } + } + } + } + + log::warn!(target: "subsystem::A", "↑↑↑ desaturated, processing everything ↑↑↑"); + } + Ok(()) + }), + } + } +} + +#[orchestra::subsystem(SubsystemB, error=OrchestraError)] +impl SubsystemB { + fn start(self, mut ctx: Context) -> SpawnedSubsystem { + let mut sender = ctx.sender().clone(); + SpawnedSubsystem { + name: "SubsystemB", + future: Box::pin(async move { + loop { + select! 
{ + from_orchestra = ctx.recv().fuse() => { + match from_orchestra { + Ok(FromOrchestra::Signal(sig)) => { + let id = COUNTER.fetch_add(1, Ordering::AcqRel); + log::info!(target: "subsystem::B", "received SIGNAL {sig:?}, sending task [{id}]"); + sender.send_message(Message1(id)).await; + log::info!(target: "subsystem::B", "successfully sent task [{id}]"); + }, + Ok(FromOrchestra::Communication { msg }) => { + log::info!(target: "subsystem::B", "received MESSAGE {msg:?}"); + }, + Err(_) => break + } + }, + } + } + Ok(()) + }), + } + } +} + +fn main() { + env_logger::builder().filter_level(log::LevelFilter::Info).init(); + + let (mut orchestra, _handle) = MyOrchestra::builder() + .sub_a(SubsystemA) + .sub_b(SubsystemB) + .spawner(MySpawner(ThreadPool::new().unwrap())) + .build() + .unwrap(); + + let fut = orchestra.running_subsystems.into_future().fuse(); + pin_mut!(fut); + + let signal_spammer = async { + loop { + sleep(time::Duration::from_secs(1)).await; + let _ = orchestra.sub_a.send_signal(Signal::Heartbeat).await; + let _ = orchestra.sub_b.send_signal(Signal::Heartbeat).await; + } + }; + + pin_mut!(signal_spammer); + + futures::executor::block_on(async { + loop { + select! 
{ + _ = signal_spammer.as_mut().fuse() => (), + _ = fut => (), + } + } + }) +} diff --git a/orchestra/proc-macro/src/impl_subsystem_ctx_sender.rs b/orchestra/proc-macro/src/impl_subsystem_ctx_sender.rs index b96c87b..708a008 100644 --- a/orchestra/proc-macro/src/impl_subsystem_ctx_sender.rs +++ b/orchestra/proc-macro/src/impl_subsystem_ctx_sender.rs @@ -494,6 +494,12 @@ pub(crate) fn impl_subsystem_context_trait_for( } } + async fn recv_signal(&mut self) -> ::std::result::Result<#signal, #error_ty> { + self.signals.next().await.ok_or(#support_crate ::OrchestraError::Context( + "Signal channel is terminated and empty.".to_owned(), + ).into()) + } + fn sender(&mut self) -> &mut Self::Sender { &mut self.to_subsystems } diff --git a/orchestra/src/lib.rs b/orchestra/src/lib.rs index bf58225..4dd50c4 100644 --- a/orchestra/src/lib.rs +++ b/orchestra/src/lib.rs @@ -416,9 +416,15 @@ pub trait SubsystemContext: Send + 'static { /// using `pending!()` macro you will end up with a busy loop! async fn try_recv(&mut self) -> Result<Option<FromOrchestra<Self::Message, Self::Signal>>, ()>; - /// Receive a message. + /// Receive a signal or a message. async fn recv(&mut self) -> Result<FromOrchestra<Self::Message, Self::Signal>, Self::Error>; + /// Receive a signal. + /// + /// This method allows the subsystem to process signals while being blocked on processing messages. + /// See `examples/backpressure.rs` for an example. + async fn recv_signal(&mut self) -> Result<Self::Signal, Self::Error>; + /// Spawn a child task on the executor. fn spawn( &mut self,