From dd5b7e2104d1deaffad0ffe144237cd78d8c027a Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Thu, 30 May 2024 19:25:46 +0200 Subject: [PATCH] Allow subsystems using priority channels --- orchestra/proc-macro/src/impl_builder.rs | 3 +- orchestra/proc-macro/src/impl_channels_out.rs | 13 +- .../src/impl_subsystem_ctx_sender.rs | 14 +- orchestra/src/lib.rs | 3 + .../subsystems_priority_channels_test.rs | 161 ++++++++++++++++++ 5 files changed, 189 insertions(+), 5 deletions(-) create mode 100644 orchestra/tests/subsystems_priority_channels_test.rs diff --git a/orchestra/proc-macro/src/impl_builder.rs b/orchestra/proc-macro/src/impl_builder.rs index ee6e0dd..c6ce5b3 100644 --- a/orchestra/proc-macro/src/impl_builder.rs +++ b/orchestra/proc-macro/src/impl_builder.rs @@ -635,9 +635,10 @@ pub(crate) fn impl_feature_gated_items( #( let (#channel_name_tx, #channel_name_rx) = - #support_crate ::metered::channel::< + #support_crate ::metered::channel_with_priority::< MessagePacket< #maybe_boxed_consumes > >( + self.channel_capacity.unwrap_or(#message_channel_capacity), self.channel_capacity.unwrap_or(#message_channel_capacity) ); )* diff --git a/orchestra/proc-macro/src/impl_channels_out.rs b/orchestra/proc-macro/src/impl_channels_out.rs index 1d374a5..78c58ea 100644 --- a/orchestra/proc-macro/src/impl_channels_out.rs +++ b/orchestra/proc-macro/src/impl_channels_out.rs @@ -81,14 +81,21 @@ pub(crate) fn impl_channels_out_struct(info: &OrchestraInfo) -> Result = match message { #( #feature_gates #message_wrapper :: #consumes_variant ( inner ) => { - self. #channel_name .send( - #support_crate ::make_packet(signals_received, #maybe_boxed_send) - ).await.map_err(|_| stringify!( #channel_name )) + if use_priority_channel { + self. #channel_name .priority_send( + #support_crate ::make_packet(signals_received, #maybe_boxed_send) + ).await.map_err(|_| stringify!( #channel_name )) + } else { + self. #channel_name .send( + #support_crate ::make_packet(signals_received, #maybe_boxed_send) + ).await.map_err(|_| stringify!( #channel_name )) + } } )* // subsystems that are wip diff --git a/orchestra/proc-macro/src/impl_subsystem_ctx_sender.rs b/orchestra/proc-macro/src/impl_subsystem_ctx_sender.rs index 15de1b5..0881b4d 100644 --- a/orchestra/proc-macro/src/impl_subsystem_ctx_sender.rs +++ b/orchestra/proc-macro/src/impl_subsystem_ctx_sender.rs @@ -282,7 +282,19 @@ pub(crate) fn impl_subsystem_sender( self.signals_received.load(), <#all_messages_wrapper as ::std::convert::From<_>> ::from ( <#outgoing_wrapper as ::std::convert::From<_>> :: from ( msg ) - ) + ), + false, + ).await; + } + + async fn priority_send_message(&mut self, msg: OutgoingMessage) + { + self.channels.send_and_log_error( + self.signals_received.load(), + <#all_messages_wrapper as ::std::convert::From<_>> ::from ( + <#outgoing_wrapper as ::std::convert::From<_>> :: from ( msg ) + ), + true ).await; } diff --git a/orchestra/src/lib.rs b/orchestra/src/lib.rs index 4dd50c4..b18f376 100644 --- a/orchestra/src/lib.rs +++ b/orchestra/src/lib.rs @@ -503,6 +503,9 @@ where /// Send a direct message to some other `Subsystem`, routed based on message type. async fn send_message(&mut self, msg: OutgoingMessage); + /// Send a direct priority message to some other `Subsystem`, routed based on message type. + async fn priority_send_message(&mut self, msg: OutgoingMessage); + /// Tries to send a direct message to some other `Subsystem`, routed based on message type. /// This method is useful for cases where the message queue is bounded and the message is ok /// to be dropped if the queue is full. If the queue is full, this method will return an error. diff --git a/orchestra/tests/subsystems_priority_channels_test.rs b/orchestra/tests/subsystems_priority_channels_test.rs new file mode 100644 index 0000000..5c7b687 --- /dev/null +++ b/orchestra/tests/subsystems_priority_channels_test.rs @@ -0,0 +1,161 @@ +// Copyright (C) 2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: Apache-2.0 + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use futures::executor::ThreadPool; +use orchestra::*; +use std::sync::{ + atomic::{AtomicU8, Ordering}, + Arc, +}; + +struct SubA { + regular: Vec, + priority: Vec, +} + +pub struct SubB { + messages: Vec>, +} + +impl SubA { + fn new(regular: Vec, priority: Vec) -> Self { + Self { regular, priority } + } +} + +impl SubB { + fn new(messages: Vec>) -> Self { + Self { messages } + } +} + +impl crate::Subsystem, OrchestraError> for SubA { + fn start(self, mut ctx: OrchestraSubsystemContext) -> SpawnedSubsystem { + let mut sender = ctx.sender().clone(); + SpawnedSubsystem { + name: "sub A", + future: Box::pin(async move { + for i in self.regular { + sender.send_message(MsgB(i)).await; + } + for i in self.priority { + sender.priority_send_message(MsgB(i)).await; + } + + Ok(()) + }), + } + } +} + +impl crate::Subsystem, OrchestraError> for SubB { + fn start(self, mut ctx: OrchestraSubsystemContext) -> SpawnedSubsystem { + SpawnedSubsystem { + name: "sub B", + future: Box::pin(async move { + // Wait until sub_a sends all messages + futures_timer::Delay::new(Duration::from_millis(50)).await; + for i in self.messages { + match ctx.recv().await.unwrap() { + FromOrchestra::Communication { msg } => { + i.store(msg.0, Ordering::SeqCst); + }, + _ => panic!("unexpected message"), + } + } + + Ok(()) + }), + } + } +} + +#[derive(Clone, Debug)] +pub struct SigSigSig; + +#[derive(Clone, Debug)] +pub struct Event; + +#[derive(Clone, Debug)] +#[allow(dead_code)] +pub struct MsgA(u8); + +#[derive(Clone, Debug)] +#[allow(dead_code)] +pub struct MsgB(u8); + +#[derive(Debug, Clone)] +pub struct DummySpawner(pub ThreadPool); + +impl Spawner for DummySpawner { + fn spawn_blocking( + &self, + _task_name: &'static str, + _subsystem_name: Option<&'static str>, + future: futures::future::BoxFuture<'static, ()>, + ) { + self.0.spawn_ok(future); + } + + fn spawn( + &self, + _task_name: &'static str, + _subsystem_name: Option<&'static str>, + future: futures::future::BoxFuture<'static, ()>, + ) { + self.0.spawn_ok(future); + } +} + +#[orchestra(signal=SigSigSig, event=Event, gen=AllMessages, error=OrchestraError, boxed_messages=true)] +pub struct Orchestra { + #[subsystem(consumes: MsgA, sends: [MsgB])] + sub_a: SubA, + + #[subsystem(consumes: MsgB, sends: [MsgA])] + sub_b: SubB, +} + +#[test] +fn test_priority_send_message() { + let regular = vec![1, 2, 3]; + let priority = vec![42]; + let messages = vec![ + Arc::new(AtomicU8::new(0)), + Arc::new(AtomicU8::new(0)), + Arc::new(AtomicU8::new(0)), + Arc::new(AtomicU8::new(0)), + ]; + let sub_a = SubA::new(regular.clone(), priority.clone()); + let sub_b = SubB::new(messages.clone()); + let pool = ThreadPool::new().unwrap(); + let (orchestra, _handle) = Orchestra::builder() + .sub_a(sub_a) + .sub_b(sub_b) + .spawner(DummySpawner(pool)) + .build() + .unwrap(); + + futures::executor::block_on(async move { + for run_subsystem in orchestra.running_subsystems { + run_subsystem.await.unwrap(); + } + }); + + assert_eq!( + priority.into_iter().chain(regular.into_iter()).collect::>(), + messages.iter().map(|i| i.load(Ordering::SeqCst)).collect::>() + ); +}