From fb8250da78eb3cfb2fffbdcddce4367509fae16f Mon Sep 17 00:00:00 2001 From: Dmitry Sinyavin Date: Fri, 17 Nov 2023 23:11:25 +0100 Subject: [PATCH 1/6] Implement separate signal and message flows --- .../src/impl_subsystem_ctx_sender.rs | 23 +++++++++++ orchestra/src/lib.rs | 40 ++++++++++++++++--- 2 files changed, 58 insertions(+), 5 deletions(-) diff --git a/orchestra/proc-macro/src/impl_subsystem_ctx_sender.rs b/orchestra/proc-macro/src/impl_subsystem_ctx_sender.rs index b96c87b..571d7ed 100644 --- a/orchestra/proc-macro/src/impl_subsystem_ctx_sender.rs +++ b/orchestra/proc-macro/src/impl_subsystem_ctx_sender.rs @@ -494,6 +494,29 @@ pub(crate) fn impl_subsystem_context_trait_for( } } + async fn recv_signal(&mut self) -> ::std::result::Result<#signal, #error_ty> { + self.signals.next().await.ok_or(#support_crate ::OrchestraError::Context( + "Signal channel is terminated and empty.".to_owned(), + ).into()) + } + + async fn recv_msg(&mut self) -> ::std::result::Result { + loop { + if let Some((needs_signals_received, msg)) = self.pending_incoming.take() { + while self.signals_received.load() < needs_signals_received { + self.signals_received.clone().await; + } + return Ok(msg); + } + let msg = self.messages.next().await.ok_or( + #support_crate ::OrchestraError::Context( + "Message channel is terminated and empty.".to_owned() + ) + )?; + self.pending_incoming = Some((msg.signals_received, msg.message)); + } + } + fn sender(&mut self) -> &mut Self::Sender { &mut self.to_subsystems } diff --git a/orchestra/src/lib.rs b/orchestra/src/lib.rs index bf58225..b0b6d76 100644 --- a/orchestra/src/lib.rs +++ b/orchestra/src/lib.rs @@ -75,7 +75,7 @@ pub use futures::{ future::{BoxFuture, Fuse, Future}, poll, select, stream::{self, select, select_with_strategy, FuturesUnordered, PollNext}, - task::{Context, Poll}, + task::{AtomicWaker, Context, Poll}, FutureExt, StreamExt, }; #[doc(hidden)] @@ -217,9 +217,15 @@ pub type SubsystemIncomingMessages = self::stream::SelectWithStrategy< (), >; +#[derive(Debug, Default)] +struct SignalsReceivedInner { + waker: AtomicWaker, + value: AtomicUsize, +} + /// Watermark to track the received signals. #[derive(Debug, Default, Clone)] -pub struct SignalsReceived(Arc); +pub struct SignalsReceived(Arc); impl SignalsReceived { /// Load the current value of received signals. @@ -227,12 +233,30 @@ impl SignalsReceived { // It's imperative that we prevent reading a stale value from memory because of reordering. // Memory barrier to ensure that no reads or writes in the current thread before this load are reordered. // All writes in other threads using release semantics become visible to the current thread. - self.0.load(atomic::Ordering::Acquire) + self.0.value.load(atomic::Ordering::Acquire) } /// Increase the number of signals by one. pub fn inc(&self) { - let _previous = self.0.fetch_add(1, atomic::Ordering::AcqRel); + let _previous = self.0.value.fetch_add(1, atomic::Ordering::AcqRel); + self.0.waker.wake(); + } +} + +impl Future for SignalsReceived { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + if self.0.waker.take().is_some() { + // The waker has already been registered, so we're polled due to `wake()`, that is, the value + // has changed. + Poll::Ready(()) + } else { + // No waker, so it's the initial poll on `await`. Register the waker and wait for the value to + // change. + self.0.waker.register(cx.waker()); + Poll::Pending + } } } @@ -416,9 +440,15 @@ pub trait SubsystemContext: Send + 'static { /// using `pending!()` macro you will end up with a busy loop! async fn try_recv(&mut self) -> Result>, ()>; - /// Receive a message. + /// Receive a signal or a message. async fn recv(&mut self) -> Result, Self::Error>; + /// Receive a signal. + async fn recv_signal(&mut self) -> Result; + + /// Receive a message. + async fn recv_msg(&mut self) -> Result; + /// Spawn a child task on the executor. fn spawn( &mut self, From 7bf13f54308a0ee542126a91dbb29f8816115be7 Mon Sep 17 00:00:00 2001 From: Dmitry Sinyavin Date: Mon, 20 Nov 2023 00:34:47 +0100 Subject: [PATCH 2/6] Fight data races --- .../src/impl_subsystem_ctx_sender.rs | 4 +- orchestra/src/lib.rs | 38 +++++++++++-------- 2 files changed, 24 insertions(+), 18 deletions(-) diff --git a/orchestra/proc-macro/src/impl_subsystem_ctx_sender.rs b/orchestra/proc-macro/src/impl_subsystem_ctx_sender.rs index 571d7ed..19c3425 100644 --- a/orchestra/proc-macro/src/impl_subsystem_ctx_sender.rs +++ b/orchestra/proc-macro/src/impl_subsystem_ctx_sender.rs @@ -503,9 +503,7 @@ pub(crate) fn impl_subsystem_context_trait_for( async fn recv_msg(&mut self) -> ::std::result::Result { loop { if let Some((needs_signals_received, msg)) = self.pending_incoming.take() { - while self.signals_received.load() < needs_signals_received { - self.signals_received.clone().await; - } + self.signals_received.wait_until(|v| v >= needs_signals_received).await; return Ok(msg); } let msg = self.messages.next().await.ok_or( diff --git a/orchestra/src/lib.rs b/orchestra/src/lib.rs index b0b6d76..fadee40 100644 --- a/orchestra/src/lib.rs +++ b/orchestra/src/lib.rs @@ -223,6 +223,26 @@ struct SignalsReceivedInner { value: AtomicUsize, } +/// Future to wait on for the watermark predicate +pub struct SignalsReceivedWaiter<'a, F: Fn(usize) -> bool> { + owner: &'a SignalsReceivedInner, + predicate: F, +} + +impl bool> Future for SignalsReceivedWaiter<'_, F> { + type Output = usize; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.owner.waker.register(cx.waker()); + let value = self.owner.value.load(atomic::Ordering::Acquire); + if (self.predicate)(value) { + Poll::Ready(value) + } else { + Poll::Pending + } + } +} + /// Watermark to track the received signals. #[derive(Debug, Default, Clone)] pub struct SignalsReceived(Arc); @@ -241,22 +261,10 @@ impl SignalsReceived { let _previous = self.0.value.fetch_add(1, atomic::Ordering::AcqRel); self.0.waker.wake(); } -} - -impl Future for SignalsReceived { - type Output = (); - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { - if self.0.waker.take().is_some() { - // The waker has already been registered, so we're polled due to `wake()`, that is, the value - // has changed. - Poll::Ready(()) - } else { - // No waker, so it's the initial poll on `await`. Register the waker and wait for the value to - // change. - self.0.waker.register(cx.waker()); - Poll::Pending - } + /// Wait until a predicate for the watermark is true. + pub fn wait_until bool>(&self, predicate: F) -> SignalsReceivedWaiter { + SignalsReceivedWaiter { owner: &self.0, predicate } } } From 7171cf94c40af391d2ad08fc600522e075516b67 Mon Sep 17 00:00:00 2001 From: Dmitry Sinyavin Date: Mon, 20 Nov 2023 00:36:43 +0100 Subject: [PATCH 3/6] `cargo fmt` --- orchestra/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/orchestra/src/lib.rs b/orchestra/src/lib.rs index fadee40..2084020 100644 --- a/orchestra/src/lib.rs +++ b/orchestra/src/lib.rs @@ -231,7 +231,7 @@ pub struct SignalsReceivedWaiter<'a, F: Fn(usize) -> bool> { impl bool> Future for SignalsReceivedWaiter<'_, F> { type Output = usize; - + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { self.owner.waker.register(cx.waker()); let value = self.owner.value.load(atomic::Ordering::Acquire); From 315cd35bfb43ad38dc517287547b57d78d77eac3 Mon Sep 17 00:00:00 2001 From: Dmitry Sinyavin Date: Mon, 27 Nov 2023 18:56:42 +0100 Subject: [PATCH 4/6] Leave only signal receiving separate endpoint --- .../src/impl_subsystem_ctx_sender.rs | 15 ------- orchestra/src/lib.rs | 43 ++----------------- 2 files changed, 4 insertions(+), 54 deletions(-) diff --git a/orchestra/proc-macro/src/impl_subsystem_ctx_sender.rs b/orchestra/proc-macro/src/impl_subsystem_ctx_sender.rs index 19c3425..708a008 100644 --- a/orchestra/proc-macro/src/impl_subsystem_ctx_sender.rs +++ b/orchestra/proc-macro/src/impl_subsystem_ctx_sender.rs @@ -500,21 +500,6 @@ pub(crate) fn impl_subsystem_context_trait_for( ).into()) } - async fn recv_msg(&mut self) -> ::std::result::Result { - loop { - if let Some((needs_signals_received, msg)) = self.pending_incoming.take() { - self.signals_received.wait_until(|v| v >= needs_signals_received).await; - return Ok(msg); - } - let msg = self.messages.next().await.ok_or( - #support_crate ::OrchestraError::Context( - "Message channel is terminated and empty.".to_owned() - ) - )?; - self.pending_incoming = Some((msg.signals_received, msg.message)); - } - } - fn sender(&mut self) -> &mut Self::Sender { &mut self.to_subsystems } diff --git a/orchestra/src/lib.rs b/orchestra/src/lib.rs index 2084020..050cea8 100644 --- a/orchestra/src/lib.rs +++ b/orchestra/src/lib.rs @@ -75,7 +75,7 @@ pub use futures::{ future::{BoxFuture, Fuse, Future}, poll, select, stream::{self, select, select_with_strategy, FuturesUnordered, PollNext}, - task::{AtomicWaker, Context, Poll}, + task::{Context, Poll}, FutureExt, StreamExt, }; #[doc(hidden)] @@ -217,35 +217,9 @@ pub type SubsystemIncomingMessages = self::stream::SelectWithStrategy< (), >; -#[derive(Debug, Default)] -struct SignalsReceivedInner { - waker: AtomicWaker, - value: AtomicUsize, -} - -/// Future to wait on for the watermark predicate -pub struct SignalsReceivedWaiter<'a, F: Fn(usize) -> bool> { - owner: &'a SignalsReceivedInner, - predicate: F, -} - -impl bool> Future for SignalsReceivedWaiter<'_, F> { - type Output = usize; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.owner.waker.register(cx.waker()); - let value = self.owner.value.load(atomic::Ordering::Acquire); - if (self.predicate)(value) { - Poll::Ready(value) - } else { - Poll::Pending - } - } -} - /// Watermark to track the received signals. #[derive(Debug, Default, Clone)] -pub struct SignalsReceived(Arc); +pub struct SignalsReceived(Arc); impl SignalsReceived { /// Load the current value of received signals. @@ -253,18 +227,12 @@ impl SignalsReceived { // It's imperative that we prevent reading a stale value from memory because of reordering. // Memory barrier to ensure that no reads or writes in the current thread before this load are reordered. // All writes in other threads using release semantics become visible to the current thread. - self.0.value.load(atomic::Ordering::Acquire) + self.0.load(atomic::Ordering::Acquire) } /// Increase the number of signals by one. pub fn inc(&self) { - let _previous = self.0.value.fetch_add(1, atomic::Ordering::AcqRel); - self.0.waker.wake(); - } - - /// Wait until a predicate for the watermark is true. - pub fn wait_until bool>(&self, predicate: F) -> SignalsReceivedWaiter { - SignalsReceivedWaiter { owner: &self.0, predicate } + let _previous = self.0.fetch_add(1, atomic::Ordering::AcqRel); } } @@ -454,9 +422,6 @@ pub trait SubsystemContext: Send + 'static { /// Receive a signal. async fn recv_signal(&mut self) -> Result; - /// Receive a message. - async fn recv_msg(&mut self) -> Result; - /// Spawn a child task on the executor. fn spawn( &mut self, From a560af35257eec73533d1920f38ab977c131e64a Mon Sep 17 00:00:00 2001 From: Dmitry Sinyavin Date: Mon, 27 Nov 2023 23:12:54 +0100 Subject: [PATCH 5/6] Add an example --- Cargo.lock | 229 +++++++++++++++++++++++++++- orchestra/Cargo.toml | 8 + orchestra/examples/backpressure.rs | 231 +++++++++++++++++++++++++++++ orchestra/src/lib.rs | 3 + 4 files changed, 463 insertions(+), 8 deletions(-) create mode 100644 orchestra/examples/backpressure.rs diff --git a/Cargo.lock b/Cargo.lock index bb650ab..ab1a62d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -46,6 +46,35 @@ dependencies = [ "futures-core", ] +[[package]] +name = "async-io" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af" +dependencies = [ + "async-lock", + "autocfg", + "cfg-if", + "concurrent-queue", + "futures-lite", + "log", + "parking", + "polling", + "rustix 0.37.27", + "slab", + "socket2", + "waker-fn", +] + +[[package]] +name = "async-lock" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "287272293e9d8c41773cec55e365490fe034813a2f172f502d6ddcf75b2f582b" +dependencies = [ + "event-listener", +] + [[package]] name = "async-trait" version = "0.1.71" @@ -83,6 +112,12 @@ dependencies = [ "serde", ] +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + [[package]] name = "bitflags" version = "2.4.0" @@ -382,6 +417,19 @@ dependencies = [ "termcolor", ] +[[package]] +name = "env_logger" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95b3f3e67048839cb0d0781f445682a35113da7121f7c949db0e2be96a4fbece" +dependencies = [ + "humantime", + "is-terminal", + "log", + "regex", + "termcolor", +] + [[package]] name = "equivalent" version = "1.0.1" @@ -427,6 +475,15 @@ dependencies = [ "quote", ] +[[package]] +name = "fastrand" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e51093e27b0797c359783294ca4f0a911c270184cb10f85783b118614a1501be" +dependencies = [ + "instant", +] + [[package]] name = "fixedbitset" version = "0.4.2" @@ -488,6 +545,21 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" +[[package]] +name = "futures-lite" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" +dependencies = [ + "fastrand", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + [[package]] name = "futures-macro" version = "0.3.28" @@ -511,6 +583,18 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" +[[package]] +name = "futures-time" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6404853a6824881fe5f7d662d147dc4e84ecd2259ba0378f272a71dab600758a" +dependencies = [ + "async-channel", + "async-io", + "futures-core", + "pin-project-lite", +] + [[package]] name = "futures-timer" version = "3.0.2" @@ -545,6 +629,17 @@ dependencies = [ "version_check", ] +[[package]] +name = "getrandom" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe9006bed769170c11f845cf00c7c1e9092aeb3f268e007c3e760ac68008070f" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + [[package]] name = "glob" version = "0.3.1" @@ -610,6 +705,26 @@ dependencies = [ "hashbrown 0.14.0", ] +[[package]] +name = "instant" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" +dependencies = [ + "cfg-if", +] + +[[package]] +name = "io-lifetimes" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eae7b9aee968036d54dce06cebaefd919e4472e753296daccd6d344e3e2df0c2" +dependencies = [ + "hermit-abi 0.3.2", + "libc", + "windows-sys", +] + [[package]] name = "is-terminal" version = "0.4.9" @@ -617,7 +732,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b" dependencies = [ "hermit-abi 0.3.2", - "rustix", + "rustix 0.38.8", "windows-sys", ] @@ -665,9 +780,15 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.147" +version = "0.2.150" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89d92a4743f9a61002fae18374ed11e7973f530cb3a3255fb354818118b2203c" + +[[package]] +name = "linux-raw-sys" +version = "0.3.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4668fb0ea861c1df094127ac5f1da3409a82116a4ba74fca2e58ef927159bb3" +checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" [[package]] name = "linux-raw-sys" @@ -677,9 +798,9 @@ checksum = "57bcfdad1b858c2db7c38303a6d2ad4dfaf5eb53dfeb0910128b2c26d6158503" [[package]] name = "log" -version = "0.4.19" +version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b06a4cde4c0f271a446782e3eff8de789548ce57dbc8eca9292c27f4a42004b4" +checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" [[package]] name = "memchr" @@ -740,11 +861,15 @@ dependencies = [ "async-trait", "criterion", "dyn-clonable", + "env_logger 0.10.1", "futures", + "futures-time", "futures-timer", + "log", "orchestra-proc-macro", "pin-project", "prioritized-metered-channel", + "rand", "rustversion", "thiserror", "tracing", @@ -769,6 +894,12 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "parking" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" + [[package]] name = "petgraph" version = "0.6.3" @@ -839,6 +970,28 @@ dependencies = [ "plotters-backend", ] +[[package]] +name = "polling" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce" +dependencies = [ + "autocfg", + "bitflags 1.3.2", + "cfg-if", + "concurrent-queue", + "libc", + "log", + "pin-project-lite", + "windows-sys", +] + +[[package]] +name = "ppv-lite86" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" + [[package]] name = "prioritized-metered-channel" version = "0.6.0" @@ -848,7 +1001,7 @@ dependencies = [ "coarsetime", "crossbeam-queue", "derive_more", - "env_logger", + "env_logger 0.9.3", "futures", "futures-timer", "log", @@ -885,6 +1038,36 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom", +] + [[package]] name = "rayon" version = "1.7.0" @@ -945,16 +1128,30 @@ dependencies = [ "semver", ] +[[package]] +name = "rustix" +version = "0.37.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fea8ca367a3a01fe35e6943c400addf443c0f57670e6ec51196f71a4b8762dd2" +dependencies = [ + "bitflags 1.3.2", + "errno", + "io-lifetimes", + "libc", + "linux-raw-sys 0.3.8", + "windows-sys", +] + [[package]] name = "rustix" version = "0.38.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19ed4fa021d81c8392ce04db050a3da9a60299050b7ae1cf482d862b54a7218f" dependencies = [ - "bitflags", + "bitflags 2.4.0", "errno", "libc", - "linux-raw-sys", + "linux-raw-sys 0.4.5", "windows-sys", ] @@ -1031,6 +1228,16 @@ dependencies = [ "autocfg", ] +[[package]] +name = "socket2" +version = "0.4.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f7916fc008ca5542385b89a3d3ce689953c143e9304a9bf8beec1de48994c0d" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "subtle" version = "2.5.0" @@ -1181,6 +1388,12 @@ version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +[[package]] +name = "waker-fn" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3c4517f54858c779bbcbf228f4fca63d121bf85fbecb2dc578cdf4a39395690" + [[package]] name = "walkdir" version = "2.3.3" diff --git a/orchestra/Cargo.toml b/orchestra/Cargo.toml index 9628f94..8e866f4 100644 --- a/orchestra/Cargo.toml +++ b/orchestra/Cargo.toml @@ -25,6 +25,10 @@ trybuild = "1.0.61" rustversion = "1.0.6" criterion = { version = "0.5" } futures = { version = "0.3", features = ["thread-pool"] } +env_logger = "0.10.1" +futures-time = "3.0.0" +log = "0.4.20" +rand = "0.8.5" [[example]] name = "duo" @@ -34,6 +38,10 @@ crate-type = ["bin"] name = "dig" crate-type = ["bin"] +[[example]] +name = "backpressure" +crate-type = ["bin"] + [[bench]] name = "bench_main" harness = false diff --git a/orchestra/examples/backpressure.rs b/orchestra/examples/backpressure.rs new file mode 100644 index 0000000..df4f205 --- /dev/null +++ b/orchestra/examples/backpressure.rs @@ -0,0 +1,231 @@ +// Copyright (C) 2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: Apache-2.0 + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! This is an example of how backpressure between subsystems may be created using the +//! `recv_signal()` method. +//! +//! Subsystems A and B receive a heartbeat signal every second. Once a signal is received, subsystem +//! B sends a message to subsystem A asking the latter to do some job. The job takes 5 to 30 +//! seconds, so subsystem A cannot keep up as it can only process five tasks concurrently. +//! +//! When subsystem A is saturated with tasks, it stops receiving new messages but keeps receiving +//! orchestra signals. At that moment, subsystem B is blocked in `send_message()` and cannot +//! process either signals or messages as it doesn't implement separate `recv_signal()` logic. +//! +//! Once a task is complete, subsystem A is ready to accept a new message again. Subsystem B +//! unblocks and continues its loop, processing new signals and saturating subsystem A with new +//! tasks again. +//! +//! Please note that the orchestra message capacity has been chosen to be equal to subsystem A task +//! capacity to demonstrate the backpressure logic better. The message capacity may (and probably +//! should) be larger than the task capacity to create additional buffering, which allows not +//! blocking too early. +//! +//! At the same time, signal capacity has been chosen to be superfluous. That's because subsystem B, +//! being intentionally implemented suboptimally in this example, fails to consume signals when it +//! is stuck because subsystem A backpressures on it. If the signal capacity is too low, it may +//! lead to signal channel capacity overflow, and subsystem A would stop receiving signals as well. +//! That demonstrates that in a well-designed backpressure system, all of its parts +//! (subsystems) must be able to handle backpressure gracefully; otherwise, all the actors may +//! experience signal starvation. + +use std::sync::atomic::{AtomicUsize, Ordering}; + +use futures::{executor::ThreadPool, pin_mut}; +use futures_time::{task::sleep, time}; +use orchestra::*; +use rand::prelude::*; + +static COUNTER: AtomicUsize = AtomicUsize::new(0); + +#[derive(Debug, Clone)] +pub enum Signal { + Heartbeat, +} + +#[derive(thiserror::Error, Debug)] +enum MyError { + #[error(transparent)] + Generated(#[from] OrchestraError), +} + +#[derive(Debug)] +pub struct Message1(usize); +#[derive(Debug)] +pub struct Message2; +struct DummyEvent; + +#[derive(Debug, Clone)] +pub struct MySpawner(pub ThreadPool); + +impl Spawner for MySpawner { + fn spawn_blocking( + &self, + _task_name: &'static str, + _subsystem_name: Option<&'static str>, + future: futures::future::BoxFuture<'static, ()>, + ) { + self.0.spawn_ok(future); + } + + fn spawn( + &self, + _task_name: &'static str, + _subsystem_name: Option<&'static str>, + future: futures::future::BoxFuture<'static, ()>, + ) { + self.0.spawn_ok(future); + } +} + +struct SubsystemA; +struct SubsystemB; + +#[orchestra(signal=Signal, event=DummyEvent, gen=AllMessages, error=OrchestraError, message_capacity=5, signal_capacity=600)] +struct MyOrchestra { + #[subsystem(Message1, sends: [Message2])] + sub_a: SubsystemA, + #[subsystem(Message2, sends: [Message1])] + sub_b: SubsystemB, +} + +#[orchestra::subsystem(SubsystemA, error=OrchestraError)] +impl SubsystemA { + fn start(self, mut ctx: Context) -> SpawnedSubsystem { + SpawnedSubsystem { + name: "SubsystemA", + future: Box::pin(async move { + const TASK_LIMIT: usize = 5; + let mut tasks = FuturesUnordered::new(); + 'outer: loop { + loop { + select! { + from_orchestra = ctx.recv().fuse() => { + match from_orchestra { + Ok(FromOrchestra::Signal(sig)) => { + log::info!(target: "subsystem::A", "received SIGNAL {sig:?} in message-processing loop"); + }, + Ok(FromOrchestra::Communication { msg }) => { + log::info!(target: "subsystem::A", "received MESSAGE {msg:?}"); + let Message1(id) = msg; + let dur = time::Duration::from_secs((5..30).choose(&mut rand::thread_rng()).unwrap()); + tasks.push(async move { + log::info!(target: "task", "[{id}]: sleeping for {dur:?}"); + sleep(dur).await; + log::info!(target: "task", "[{id}]: woke up"); + id + }); + if tasks.len() >= TASK_LIMIT { + break; + } + }, + Err(_) => { + break 'outer; + } + } + }, + id = tasks.select_next_some() => { + log::info!(target: "subsystem::A", "task {id} finished in message-processing loop"); + } + } + } + + log::warn!(target: "subsystem::A", "↓↓↓ saturated, only processing signals ↓↓↓"); + + loop { + select! { + sig = ctx.recv_signal().fuse() => { + log::info!(target: "subsystem::A", "received SIGNAL {sig:?} in signal-processing loop"); + }, + id = tasks.select_next_some() => { + log::info!(target: "subsystem::A", "task {id} finished in signal-processing loop"); + if tasks.len() < TASK_LIMIT { + break; + } + } + } + } + + log::warn!(target: "subsystem::A", "↑↑↑ desaturated, processing everything ↑↑↑"); + } + Ok(()) + }), + } + } +} + +#[orchestra::subsystem(SubsystemB, error=OrchestraError)] +impl SubsystemB { + fn start(self, mut ctx: Context) -> SpawnedSubsystem { + let mut sender = ctx.sender().clone(); + SpawnedSubsystem { + name: "SubsystemB", + future: Box::pin(async move { + loop { + select! { + from_orchestra = ctx.recv().fuse() => { + match from_orchestra { + Ok(FromOrchestra::Signal(sig)) => { + let id = COUNTER.fetch_add(1, Ordering::AcqRel); + log::info!(target: "subsystem::B", "received SIGNAL {sig:?}, sending task [{id}]"); + sender.send_message(Message1(id)).await; + log::info!(target: "subsystem::B", "successfully sent task [{id}]"); + }, + Ok(FromOrchestra::Communication { msg }) => { + log::info!(target: "subsystem::B", "received MESSAGE {msg:?}"); + }, + Err(_) => break + } + }, + } + } + Ok(()) + }), + } + } +} + +fn main() { + env_logger::builder().filter_level(log::LevelFilter::Info).init(); + + let (mut orchestra, _handle) = MyOrchestra::builder() + .sub_a(SubsystemA) + .sub_b(SubsystemB) + .spawner(MySpawner(ThreadPool::new().unwrap())) + .build() + .unwrap(); + + let fut = orchestra.running_subsystems.into_future().fuse(); + pin_mut!(fut); + + let signal_spammer = async { + loop { + sleep(time::Duration::from_secs(1)).await; + let _ = orchestra.sub_a.send_signal(Signal::Heartbeat).await; + let _ = orchestra.sub_b.send_signal(Signal::Heartbeat).await; + } + }; + + pin_mut!(signal_spammer); + + futures::executor::block_on(async { + loop { + select! { + _ = signal_spammer.as_mut().fuse() => (), + _ = fut => (), + } + } + }) +} diff --git a/orchestra/src/lib.rs b/orchestra/src/lib.rs index 050cea8..4dd50c4 100644 --- a/orchestra/src/lib.rs +++ b/orchestra/src/lib.rs @@ -420,6 +420,9 @@ pub trait SubsystemContext: Send + 'static { async fn recv(&mut self) -> Result, Self::Error>; /// Receive a signal. + /// + /// This method allows the subsystem to process signals while being blocked on processing messages. + /// See `examples/backpressure.rs` for an example. async fn recv_signal(&mut self) -> Result; /// Spawn a child task on the executor. From 54f8803a4cf038a86becd4d8de40af925ab469e4 Mon Sep 17 00:00:00 2001 From: Dmitry Sinyavin Date: Tue, 28 Nov 2023 15:40:24 +0100 Subject: [PATCH 6/6] Add error handling --- orchestra/examples/backpressure.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/orchestra/examples/backpressure.rs b/orchestra/examples/backpressure.rs index df4f205..2176f76 100644 --- a/orchestra/examples/backpressure.rs +++ b/orchestra/examples/backpressure.rs @@ -146,8 +146,13 @@ impl SubsystemA { loop { select! { - sig = ctx.recv_signal().fuse() => { - log::info!(target: "subsystem::A", "received SIGNAL {sig:?} in signal-processing loop"); + signal = ctx.recv_signal().fuse() => { + match signal { + Ok(sig) => log::info!(target: "subsystem::A", "received SIGNAL {sig:?} in signal-processing loop"), + Err(_) => { + break 'outer; + } + } }, id = tasks.select_next_some() => { log::info!(target: "subsystem::A", "task {id} finished in signal-processing loop");