diff --git a/orchestra/proc-macro/src/impl_builder.rs b/orchestra/proc-macro/src/impl_builder.rs index ee6e0dd..be4bb7b 100644 --- a/orchestra/proc-macro/src/impl_builder.rs +++ b/orchestra/proc-macro/src/impl_builder.rs @@ -175,6 +175,8 @@ pub(crate) fn impl_feature_gated_items( let channel_name_rx = &cfg_set.channel_names_without_wip("_rx"); let channel_name_unbounded_rx = &info.channel_names_without_wip("_unbounded_rx"); + let can_receive_priority_messages = &cfg_set.can_receive_priority_messages_without_wip(); + let baggage_name = &info.baggage_names(); let baggage_generic_ty = &info.baggage_generic_types(); @@ -633,13 +635,20 @@ pub(crate) fn impl_feature_gated_items( >(); #( - let (#channel_name_tx, #channel_name_rx) - = + let (#channel_name_tx, #channel_name_rx) = if #can_receive_priority_messages { + #support_crate ::metered::channel_with_priority::< + MessagePacket< #maybe_boxed_consumes > + >( + self.channel_capacity.unwrap_or(#message_channel_capacity), + self.channel_capacity.unwrap_or(#message_channel_capacity) + ) + } else { #support_crate ::metered::channel::< MessagePacket< #maybe_boxed_consumes > >( self.channel_capacity.unwrap_or(#message_channel_capacity) - ); + ) + }; )* #( diff --git a/orchestra/proc-macro/src/impl_channels_out.rs b/orchestra/proc-macro/src/impl_channels_out.rs index 1d374a5..25883af 100644 --- a/orchestra/proc-macro/src/impl_channels_out.rs +++ b/orchestra/proc-macro/src/impl_channels_out.rs @@ -77,18 +77,27 @@ pub(crate) fn impl_channels_out_struct(info: &OrchestraInfo) -> Result( &mut self, signals_received: usize, - message: #message_wrapper, + message: #message_wrapper ) { let res: ::std::result::Result<_, _> = match message { #( #feature_gates #message_wrapper :: #consumes_variant ( inner ) => { - self. #channel_name .send( - #support_crate ::make_packet(signals_received, #maybe_boxed_send) - ).await.map_err(|_| stringify!( #channel_name )) + match P::priority() { + PriorityLevel::Normal => { + self. #channel_name .send( + #support_crate ::make_packet(signals_received, #maybe_boxed_send) + ).await + }, + PriorityLevel::High => { + self. #channel_name .priority_send( + #support_crate ::make_packet(signals_received, #maybe_boxed_send) + ).await + }, + }.map_err(|_| stringify!( #channel_name )) } )* // subsystems that are wip @@ -116,7 +125,7 @@ pub(crate) fn impl_channels_out_struct(info: &OrchestraInfo) -> Result( &mut self, signals_received: usize, message: #message_wrapper, @@ -125,9 +134,18 @@ pub(crate) fn impl_channels_out_struct(info: &OrchestraInfo) -> Result { - self. #channel_name .try_send( - #support_crate ::make_packet(signals_received, #maybe_boxed_send) - ).map_err(|err| match err { + match P::priority() { + PriorityLevel::Normal => { + self. #channel_name .try_send( + #support_crate ::make_packet(signals_received, #maybe_boxed_send) + ) + }, + PriorityLevel::High => { + self. #channel_name .try_priority_send( + #support_crate ::make_packet(signals_received, #maybe_boxed_send) + ) + }, + }.map_err(|err| match err { #support_crate ::metered::TrySendError::Full(err_inner) => #support_crate ::metered::TrySendError::Full(#message_wrapper:: #consumes_variant ( #maybe_unbox_error )), #support_crate ::metered::TrySendError::Closed(err_inner) => #support_crate ::metered::TrySendError::Closed(#message_wrapper:: #consumes_variant ( #maybe_unbox_error )), }) diff --git a/orchestra/proc-macro/src/impl_subsystem_ctx_sender.rs b/orchestra/proc-macro/src/impl_subsystem_ctx_sender.rs index 15de1b5..1d93591 100644 --- a/orchestra/proc-macro/src/impl_subsystem_ctx_sender.rs +++ b/orchestra/proc-macro/src/impl_subsystem_ctx_sender.rs @@ -278,21 +278,31 @@ pub(crate) fn impl_subsystem_sender( { async fn send_message(&mut self, msg: OutgoingMessage) { - self.channels.send_and_log_error( + self.send_message_with_priority::(msg).await; + } + + async fn send_message_with_priority(&mut self, msg: OutgoingMessage) + { + self.channels.send_and_log_error::

( self.signals_received.load(), <#all_messages_wrapper as ::std::convert::From<_>> ::from ( <#outgoing_wrapper as ::std::convert::From<_>> :: from ( msg ) - ) + ), ).await; } fn try_send_message(&mut self, msg: OutgoingMessage) -> ::std::result::Result<(), #support_crate ::metered::TrySendError> { - self.channels.try_send( + self.try_send_message_with_priority::(msg) + } + + fn try_send_message_with_priority(&mut self, msg: OutgoingMessage) -> ::std::result::Result<(), #support_crate ::metered::TrySendError> + { + self.channels.try_send::

( self.signals_received.load(), <#all_messages_wrapper as ::std::convert::From<_>> ::from ( <#outgoing_wrapper as ::std::convert::From<_>> :: from ( msg ) - ) + ), ).map_err(|err| match err { #support_crate ::metered::TrySendError::Full(inner) => #support_crate ::metered::TrySendError::Full(inner.try_into().expect("we should be able to unwrap what we wrap, qed")), #support_crate ::metered::TrySendError::Closed(inner) => #support_crate ::metered::TrySendError::Closed(inner.try_into().expect("we should be able to unwrap what we wrap, qed")), diff --git a/orchestra/proc-macro/src/parse/parse_orchestra_struct.rs b/orchestra/proc-macro/src/parse/parse_orchestra_struct.rs index 0a99eaa..1728c45 100644 --- a/orchestra/proc-macro/src/parse/parse_orchestra_struct.rs +++ b/orchestra/proc-macro/src/parse/parse_orchestra_struct.rs @@ -40,6 +40,7 @@ mod kw { syn::custom_keyword!(sends); syn::custom_keyword!(message_capacity); syn::custom_keyword!(signal_capacity); + syn::custom_keyword!(can_receive_priority_messages); } #[derive(Clone, Debug)] @@ -58,6 +59,8 @@ pub(crate) enum SubSysAttrItem { MessageChannelCapacity(ChannelCapacity), /// Custom signal channels capacity for this subsystem SignalChannelCapacity(ChannelCapacity), + /// The subsystem can receive priority messages + CanReceivePriorityMessages(kw::can_receive_priority_messages), } impl Parse for SubSysAttrItem { @@ -73,6 +76,8 @@ impl Parse for SubSysAttrItem { Self::MessageChannelCapacity(input.parse::>()?) } else if lookahead.peek(kw::signal_capacity) { Self::SignalChannelCapacity(input.parse::>()?) + } else if lookahead.peek(kw::can_receive_priority_messages) { + Self::CanReceivePriorityMessages(input.parse::()?) } else { Self::Consumes(input.parse::()?) }) @@ -100,6 +105,9 @@ impl ToTokens for SubSysAttrItem { Self::SignalChannelCapacity(_) => { quote! {} }, + Self::CanReceivePriorityMessages(can_receive_priority_messages) => { + quote! { #can_receive_priority_messages } + }, }; tokens.extend(ts.into_iter()); } @@ -130,6 +138,8 @@ pub(crate) struct SubSysField { pub(crate) message_capacity: Option, /// Custom signal channel capacity pub(crate) signal_capacity: Option, + /// The subsystem can receive priority messages + pub(crate) can_receive_priority_messages: bool, pub(crate) feature_gates: Option, } @@ -352,6 +362,8 @@ pub(crate) struct SubSystemAttrItems { pub(crate) message_capacity: Option>, /// Custom signal channel capacity pub(crate) signal_capacity: Option>, + /// The subsystem can receive priority messages + pub(crate) can_receive_priority_messages: bool, } impl Parse for SubSystemAttrItems { @@ -393,8 +405,18 @@ impl Parse for SubSystemAttrItems { let wip = extract_variant!(unique, Wip; default = false); let message_capacity = extract_variant!(unique, MessageChannelCapacity take ); let signal_capacity = extract_variant!(unique, SignalChannelCapacity take ); + let can_receive_priority_messages = + extract_variant!(unique, CanReceivePriorityMessages; default = false); - Ok(Self { blocking, wip, sends, consumes, message_capacity, signal_capacity }) + Ok(Self { + blocking, + wip, + sends, + consumes, + message_capacity, + signal_capacity, + can_receive_priority_messages, + }) } } @@ -487,6 +509,10 @@ impl<'a> SubsystemConfigSet<'a> { ) -> Vec { signal_channel_capacities_without_wip(&self.enabled_subsystems, default_capacity) } + + pub(crate) fn can_receive_priority_messages_without_wip(&self) -> Vec { + can_receive_priority_messages_without_wip(&self.enabled_subsystems) + } } impl OrchestraInfo { @@ -738,6 +764,7 @@ impl OrchestraGuts { sends, message_capacity, signal_capacity, + can_receive_priority_messages, .. } = subsystem_attrs; @@ -761,6 +788,7 @@ impl OrchestraGuts { message_capacity, signal_capacity, feature_gates, + can_receive_priority_messages, }); } else { // collect the "baggage" @@ -893,3 +921,13 @@ pub(crate) fn consumes_without_wip<'a, T: Borrow>(subsystems: &[T]) .map(|ssf| ssf.message_to_consume()) .collect::>() } + +pub(crate) fn can_receive_priority_messages_without_wip( + subsystems: &Vec<&SubSysField>, +) -> Vec { + subsystems + .iter() + .filter(|ssf| !ssf.wip) + .map(|ssf| syn::LitBool::new(ssf.can_receive_priority_messages, ssf.name.span())) + .collect::>() +} diff --git a/orchestra/src/lib.rs b/orchestra/src/lib.rs index 4dd50c4..2bf270f 100644 --- a/orchestra/src/lib.rs +++ b/orchestra/src/lib.rs @@ -494,6 +494,37 @@ where fn start(self, ctx: Ctx) -> SpawnedSubsystem; } +/// Priority of messages sending to the individual subsystems. +/// Only for the bounded channel sender. +pub enum PriorityLevel { + /// Normal priority. + Normal, + /// High priority. + High, +} +/// Normal priority. +pub struct NormalPriority; +/// High priority. +pub struct HighPriority; + +/// Describes the priority of the message. +pub trait Priority { + /// The priority level. + fn priority() -> PriorityLevel { + PriorityLevel::Normal + } +} +impl Priority for NormalPriority { + fn priority() -> PriorityLevel { + PriorityLevel::Normal + } +} +impl Priority for HighPriority { + fn priority() -> PriorityLevel { + PriorityLevel::High + } +} + /// Sender end of a channel to interface with a subsystem. #[async_trait::async_trait] pub trait SubsystemSender: Clone + Send + 'static @@ -503,6 +534,9 @@ where /// Send a direct message to some other `Subsystem`, routed based on message type. async fn send_message(&mut self, msg: OutgoingMessage); + /// Send a direct message with defined priority to some other `Subsystem`, routed based on message type. + async fn send_message_with_priority(&mut self, msg: OutgoingMessage); + /// Tries to send a direct message to some other `Subsystem`, routed based on message type. /// This method is useful for cases where the message queue is bounded and the message is ok /// to be dropped if the queue is full. If the queue is full, this method will return an error. @@ -512,6 +546,14 @@ where msg: OutgoingMessage, ) -> Result<(), metered::TrySendError>; + /// Tries to send a direct message with defined priority to some other `Subsystem`, routed based on message type. + /// If the queue is full, this method will return an error. + /// This method is not async and will not block the current task. + fn try_send_message_with_priority( + &mut self, + msg: OutgoingMessage, + ) -> Result<(), metered::TrySendError>; + /// Send multiple direct messages to other `Subsystem`s, routed based on message type. async fn send_messages(&mut self, msgs: I) where diff --git a/orchestra/tests/subsystem_with_priority_channel_test.rs b/orchestra/tests/subsystem_with_priority_channel_test.rs new file mode 100644 index 0000000..1f8f97c --- /dev/null +++ b/orchestra/tests/subsystem_with_priority_channel_test.rs @@ -0,0 +1,294 @@ +// Copyright (C) 2022 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: Apache-2.0 + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use futures::executor::ThreadPool; +use orchestra::*; + +#[derive(Clone, Copy)] +enum SendingMethod { + Send, + TrySend, +} + +struct SubA { + normal: Vec, + priority: Vec, + sending_method: SendingMethod, + sub_a_received_tx: oneshot::Sender>, + sub_a_sent_tx: oneshot::Sender<()>, + sub_a_ready_for_signal_tx: oneshot::Sender<()>, + sub_b_sent_rx: oneshot::Receiver<()>, + orchestra_sent_to_sub_a_rx: oneshot::Receiver<()>, +} + +pub struct SubB { + normal: Vec, + priority: Vec, + sending_method: SendingMethod, + sub_b_received_tx: oneshot::Sender>, + sub_b_sent_tx: oneshot::Sender<()>, + sub_b_ready_for_signal_tx: oneshot::Sender<()>, + sub_a_sent_rx: oneshot::Receiver<()>, + orchestra_sent_to_sub_b_rx: oneshot::Receiver<()>, +} + +impl crate::Subsystem, OrchestraError> for SubA { + fn start(self, mut ctx: OrchestraSubsystemContext) -> SpawnedSubsystem { + let mut sender = ctx.sender().clone(); + SpawnedSubsystem { + name: "sub A", + future: Box::pin(async move { + let mut messages = vec![]; + let message_limit = self.normal.len() + self.priority.len(); + + for i in self.normal { + match self.sending_method { + SendingMethod::Send => sender.send_message(MsgB(i)).await, + SendingMethod::TrySend => sender.try_send_message(MsgB(i)).unwrap(), + } + } + for i in self.priority { + match self.sending_method { + SendingMethod::Send => + sender.send_message_with_priority::(MsgB(i)).await, + SendingMethod::TrySend => + sender.try_send_message_with_priority::(MsgB(i)).unwrap(), + } + } + + // Inform that the messages have been sent. + self.sub_a_sent_tx.send(()).unwrap(); + self.sub_a_ready_for_signal_tx.send(()).unwrap(); + + // Wait for others + self.orchestra_sent_to_sub_a_rx.await.unwrap(); + self.sub_b_sent_rx.await.unwrap(); + + while let Ok(received) = ctx.recv().await { + match received { + FromOrchestra::Communication { msg } => { + messages.push(msg.0); + }, + FromOrchestra::Signal(SigSigSig) => { + messages.push(u8::MAX); + }, + } + if messages.len() > message_limit { + break; + } + } + self.sub_a_received_tx.send(messages).unwrap(); + + Ok(()) + }), + } + } +} + +impl crate::Subsystem, OrchestraError> for SubB { + fn start(self, mut ctx: OrchestraSubsystemContext) -> SpawnedSubsystem { + let mut sender = ctx.sender().clone(); + SpawnedSubsystem { + name: "sub B", + future: Box::pin(async move { + let mut messages = vec![]; + let message_limit = self.normal.len() + self.priority.len(); + + for i in self.normal { + match self.sending_method { + SendingMethod::Send => sender.send_message(MsgA(i)).await, + SendingMethod::TrySend => sender.try_send_message(MsgA(i)).unwrap(), + } + } + for i in self.priority { + match self.sending_method { + SendingMethod::Send => + sender.send_message_with_priority::(MsgA(i)).await, + SendingMethod::TrySend => + sender.try_send_message_with_priority::(MsgA(i)).unwrap(), + } + } + + // Inform that the messages have been sent. + self.sub_b_sent_tx.send(()).unwrap(); + self.sub_b_ready_for_signal_tx.send(()).unwrap(); + + // Wait for others + self.orchestra_sent_to_sub_b_rx.await.unwrap(); + self.sub_a_sent_rx.await.unwrap(); + + while let Ok(received) = ctx.recv().await { + match received { + FromOrchestra::Communication { msg } => { + messages.push(msg.0); + }, + FromOrchestra::Signal(SigSigSig) => { + messages.push(u8::MAX); + }, + } + if messages.len() > message_limit { + break; + } + } + self.sub_b_received_tx.send(messages).unwrap(); + + Ok(()) + }), + } + } +} + +#[derive(Clone, Debug)] +pub struct SigSigSig; + +#[derive(Clone, Debug)] +pub struct Event; + +#[derive(Clone, Debug)] +#[allow(dead_code)] +pub struct MsgA(u8); + +#[derive(Clone, Debug)] +#[allow(dead_code)] +pub struct MsgB(u8); + +#[derive(Debug, Clone)] +pub struct DummySpawner(pub ThreadPool); + +impl Spawner for DummySpawner { + fn spawn_blocking( + &self, + _task_name: &'static str, + _subsystem_name: Option<&'static str>, + future: futures::future::BoxFuture<'static, ()>, + ) { + self.0.spawn_ok(future); + } + + fn spawn( + &self, + _task_name: &'static str, + _subsystem_name: Option<&'static str>, + future: futures::future::BoxFuture<'static, ()>, + ) { + self.0.spawn_ok(future); + } +} + +#[orchestra(signal=SigSigSig, event=Event, gen=AllMessages, error=OrchestraError, boxed_messages=true)] +pub struct Orchestra { + #[subsystem(consumes: MsgA, sends: [MsgB])] + sub_a: SubA, + + #[subsystem(consumes: MsgB, sends: [MsgA], can_receive_priority_messages)] + sub_b: SubB, +} + +async fn run_inner( + normal: Vec, + priority: Vec, + sending_method: SendingMethod, +) -> (Vec, Vec) { + let (sub_a_received_tx, mut sub_a_received_rx) = oneshot::channel::>(); + let (sub_b_received_tx, mut sub_b_received_rx) = oneshot::channel::>(); + + let (sub_a_sent_tx, sub_a_sent_rx) = oneshot::channel::<()>(); + let (sub_a_ready_for_signal_tx, sub_a_ready_for_signal_rx) = oneshot::channel::<()>(); + + let (sub_b_sent_tx, sub_b_sent_rx) = oneshot::channel::<()>(); + let (sub_b_ready_for_signal_tx, sub_b_ready_for_signal_rx) = oneshot::channel::<()>(); + + let (orchestra_sent_to_sub_a_tx, orchestra_sent_to_sub_a_rx) = oneshot::channel::<()>(); + let (orchestra_sent_to_sub_b_tx, orchestra_sent_to_sub_b_rx) = oneshot::channel::<()>(); + + let sub_a = SubA { + normal: normal.clone(), + priority: priority.clone(), + sending_method, + sub_a_sent_tx, + sub_a_received_tx, + sub_a_ready_for_signal_tx, + orchestra_sent_to_sub_a_rx, + sub_b_sent_rx, + }; + let sub_b = SubB { + normal: normal.clone(), + priority: priority.clone(), + sending_method, + sub_b_sent_tx, + sub_b_received_tx, + sub_b_ready_for_signal_tx, + orchestra_sent_to_sub_b_rx, + sub_a_sent_rx, + }; + let pool = ThreadPool::new().unwrap(); + let (mut orchestra, _handle) = Orchestra::builder() + .sub_a(sub_a) + .sub_b(sub_b) + .spawner(DummySpawner(pool)) + .build() + .unwrap(); + + // Wait until both subsystems sent their messages + sub_a_ready_for_signal_rx.await.unwrap(); + sub_b_ready_for_signal_rx.await.unwrap(); + + // Subsystems are waiting for a signal from the orchestra + orchestra.broadcast_signal(SigSigSig).await.unwrap(); + + // Allow both subsystems to receive messages + orchestra_sent_to_sub_a_tx.send(()).unwrap(); + orchestra_sent_to_sub_b_tx.send(()).unwrap(); + + for run_subsystem in orchestra.running_subsystems { + run_subsystem.await.unwrap(); + } + + (sub_a_received_rx.try_recv().unwrap().unwrap(), sub_b_received_rx.try_recv().unwrap().unwrap()) +} + +#[test] +fn test_priority_send_message() { + let (sub_a_received, sub_b_received) = + futures::executor::block_on(run_inner(vec![1, 2, 3], vec![42], SendingMethod::Send)); + + // SubA can't receive priority messages, so it receives messages in the order they were sent + // u8::MAX - signal is first + // 1, 2, 3 - normal messages + // 42 - priority message + assert_eq!(vec![u8::MAX, 1, 2, 3, 42], sub_a_received); + // SubB receive priority messages first + // u8::MAX - signal is first + // 42 - priority message + // 1, 2, 3 - normal messages + assert_eq!(vec![u8::MAX, 42, 1, 2, 3], sub_b_received); +} + +#[test] +fn test_try_priority_send_message() { + let (sub_a_received, sub_b_received) = + futures::executor::block_on(run_inner(vec![1, 2, 3], vec![42], SendingMethod::TrySend)); + + // SubA can't receive priority messages, so it receives messages in the order they were sent + // u8::MAX - signal is first + // 1, 2, 3 - normal messages + // 42 - priority message + assert_eq!(vec![u8::MAX, 1, 2, 3, 42], sub_a_received); + // SubB receive priority messages first + // u8::MAX - signal is first + // 42 - priority message + // 1, 2, 3 - normal messages + assert_eq!(vec![u8::MAX, 42, 1, 2, 3], sub_b_received); +}