Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow subsystems using priority channels #77

Merged
merged 16 commits into from
Jun 7, 2024
Merged
15 changes: 12 additions & 3 deletions orchestra/proc-macro/src/impl_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ pub(crate) fn impl_feature_gated_items(
let channel_name_rx = &cfg_set.channel_names_without_wip("_rx");
let channel_name_unbounded_rx = &info.channel_names_without_wip("_unbounded_rx");

let can_receive_priority_messages = &cfg_set.can_receive_priority_messages_without_wip();

let baggage_name = &info.baggage_names();
let baggage_generic_ty = &info.baggage_generic_types();

Expand Down Expand Up @@ -633,13 +635,20 @@ pub(crate) fn impl_feature_gated_items(
>();

#(
let (#channel_name_tx, #channel_name_rx)
=
let (#channel_name_tx, #channel_name_rx) = if #can_receive_priority_messages {
#support_crate ::metered::channel_with_priority::<
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Should we make this configurable per-subsystem ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can, but I think we'll have little to gain considering the number of subsystems.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At least from Polkadot perspective, we only need the priority channels for the networking subsystems, so it makes sense to have it configurable per subsystem.

Copy link
Collaborator

@drahnr drahnr Jun 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd be in favor of having it per subsystem or for everything. We will poll twice as many channels, in the empty priority channel case, which will be the default for most I presume.

MessagePacket< #maybe_boxed_consumes >
>(
self.channel_capacity.unwrap_or(#message_channel_capacity),
self.channel_capacity.unwrap_or(#message_channel_capacity)
)
} else {
#support_crate ::metered::channel::<
MessagePacket< #maybe_boxed_consumes >
>(
self.channel_capacity.unwrap_or(#message_channel_capacity)
);
)
};
)*

#(
Expand Down
36 changes: 27 additions & 9 deletions orchestra/proc-macro/src/impl_channels_out.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,18 +77,27 @@ pub(crate) fn impl_channels_out_struct(info: &OrchestraInfo) -> Result<proc_macr
// when no defined messages in enum
impl ChannelsOut {
/// Send a message via a bounded channel.
pub async fn send_and_log_error(
pub async fn send_and_log_error<P: Priority>(
&mut self,
signals_received: usize,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to see a unit test regarding how signals_received is handled and how that effects the priority vs non-priority messages.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added signals to tests

message: #message_wrapper,
message: #message_wrapper
) {
let res: ::std::result::Result<_, _> = match message {
#(
#feature_gates
#message_wrapper :: #consumes_variant ( inner ) => {
self. #channel_name .send(
#support_crate ::make_packet(signals_received, #maybe_boxed_send)
).await.map_err(|_| stringify!( #channel_name ))
match P::priority() {
PriorityLevel::Normal => {
self. #channel_name .send(
#support_crate ::make_packet(signals_received, #maybe_boxed_send)
).await
},
PriorityLevel::High => {
self. #channel_name .priority_send(
#support_crate ::make_packet(signals_received, #maybe_boxed_send)
).await
},
}.map_err(|_| stringify!( #channel_name ))
}
)*
// subsystems that are wip
Expand Down Expand Up @@ -116,7 +125,7 @@ pub(crate) fn impl_channels_out_struct(info: &OrchestraInfo) -> Result<proc_macr
}

/// Try to send a message via a bounded channel.
pub fn try_send(
pub fn try_send<P: Priority>(
&mut self,
signals_received: usize,
message: #message_wrapper,
Expand All @@ -125,9 +134,18 @@ pub(crate) fn impl_channels_out_struct(info: &OrchestraInfo) -> Result<proc_macr
#(
#feature_gates
#message_wrapper :: #consumes_variant ( inner ) => {
self. #channel_name .try_send(
#support_crate ::make_packet(signals_received, #maybe_boxed_send)
).map_err(|err| match err {
match P::priority() {
PriorityLevel::Normal => {
self. #channel_name .try_send(
#support_crate ::make_packet(signals_received, #maybe_boxed_send)
)
},
PriorityLevel::High => {
self. #channel_name .try_priority_send(
#support_crate ::make_packet(signals_received, #maybe_boxed_send)
)
},
}.map_err(|err| match err {
#support_crate ::metered::TrySendError::Full(err_inner) => #support_crate ::metered::TrySendError::Full(#message_wrapper:: #consumes_variant ( #maybe_unbox_error )),
#support_crate ::metered::TrySendError::Closed(err_inner) => #support_crate ::metered::TrySendError::Closed(#message_wrapper:: #consumes_variant ( #maybe_unbox_error )),
})
Expand Down
18 changes: 14 additions & 4 deletions orchestra/proc-macro/src/impl_subsystem_ctx_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,21 +278,31 @@ pub(crate) fn impl_subsystem_sender(
{
async fn send_message(&mut self, msg: OutgoingMessage)
{
self.channels.send_and_log_error(
self.send_message_with_priority::<NormalPriority>(msg).await;
}

async fn send_message_with_priority<P: Priority>(&mut self, msg: OutgoingMessage)
{
self.channels.send_and_log_error::<P>(
self.signals_received.load(),
<#all_messages_wrapper as ::std::convert::From<_>> ::from (
<#outgoing_wrapper as ::std::convert::From<_>> :: from ( msg )
)
),
).await;
}

fn try_send_message(&mut self, msg: OutgoingMessage) -> ::std::result::Result<(), #support_crate ::metered::TrySendError<OutgoingMessage>>
{
self.channels.try_send(
self.try_send_message_with_priority::<NormalPriority>(msg)
}

fn try_send_message_with_priority<P: Priority>(&mut self, msg: OutgoingMessage) -> ::std::result::Result<(), #support_crate ::metered::TrySendError<OutgoingMessage>>
{
self.channels.try_send::<P>(
self.signals_received.load(),
<#all_messages_wrapper as ::std::convert::From<_>> ::from (
<#outgoing_wrapper as ::std::convert::From<_>> :: from ( msg )
)
),
).map_err(|err| match err {
#support_crate ::metered::TrySendError::Full(inner) => #support_crate ::metered::TrySendError::Full(inner.try_into().expect("we should be able to unwrap what we wrap, qed")),
#support_crate ::metered::TrySendError::Closed(inner) => #support_crate ::metered::TrySendError::Closed(inner.try_into().expect("we should be able to unwrap what we wrap, qed")),
Expand Down
40 changes: 39 additions & 1 deletion orchestra/proc-macro/src/parse/parse_orchestra_struct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ mod kw {
syn::custom_keyword!(sends);
syn::custom_keyword!(message_capacity);
syn::custom_keyword!(signal_capacity);
syn::custom_keyword!(can_receive_priority_messages);
drahnr marked this conversation as resolved.
Show resolved Hide resolved
}

#[derive(Clone, Debug)]
Expand All @@ -58,6 +59,8 @@ pub(crate) enum SubSysAttrItem {
MessageChannelCapacity(ChannelCapacity<kw::message_capacity>),
/// Custom signal channels capacity for this subsystem
SignalChannelCapacity(ChannelCapacity<kw::signal_capacity>),
/// The subsystem can receive priority messages
CanReceivePriorityMessages(kw::can_receive_priority_messages),
}

impl Parse for SubSysAttrItem {
Expand All @@ -73,6 +76,8 @@ impl Parse for SubSysAttrItem {
Self::MessageChannelCapacity(input.parse::<ChannelCapacity<kw::message_capacity>>()?)
} else if lookahead.peek(kw::signal_capacity) {
Self::SignalChannelCapacity(input.parse::<ChannelCapacity<kw::signal_capacity>>()?)
} else if lookahead.peek(kw::can_receive_priority_messages) {
Self::CanReceivePriorityMessages(input.parse::<kw::can_receive_priority_messages>()?)
} else {
Self::Consumes(input.parse::<Consumes>()?)
})
Expand Down Expand Up @@ -100,6 +105,9 @@ impl ToTokens for SubSysAttrItem {
Self::SignalChannelCapacity(_) => {
quote! {}
},
Self::CanReceivePriorityMessages(can_receive_priority_messages) => {
quote! { #can_receive_priority_messages }
},
};
tokens.extend(ts.into_iter());
}
Expand Down Expand Up @@ -130,6 +138,8 @@ pub(crate) struct SubSysField {
pub(crate) message_capacity: Option<usize>,
/// Custom signal channel capacity
pub(crate) signal_capacity: Option<usize>,
/// The subsystem can receive priority messages
pub(crate) can_receive_priority_messages: bool,

pub(crate) feature_gates: Option<CfgPredicate>,
}
Expand Down Expand Up @@ -352,6 +362,8 @@ pub(crate) struct SubSystemAttrItems {
pub(crate) message_capacity: Option<ChannelCapacity<kw::message_capacity>>,
/// Custom signal channel capacity
pub(crate) signal_capacity: Option<ChannelCapacity<kw::signal_capacity>>,
/// The subsystem can receive priority messages
pub(crate) can_receive_priority_messages: bool,
}

impl Parse for SubSystemAttrItems {
Expand Down Expand Up @@ -393,8 +405,18 @@ impl Parse for SubSystemAttrItems {
let wip = extract_variant!(unique, Wip; default = false);
let message_capacity = extract_variant!(unique, MessageChannelCapacity take );
let signal_capacity = extract_variant!(unique, SignalChannelCapacity take );
let can_receive_priority_messages =
extract_variant!(unique, CanReceivePriorityMessages; default = false);

Ok(Self { blocking, wip, sends, consumes, message_capacity, signal_capacity })
Ok(Self {
blocking,
wip,
sends,
consumes,
message_capacity,
signal_capacity,
can_receive_priority_messages,
})
}
}

Expand Down Expand Up @@ -487,6 +509,10 @@ impl<'a> SubsystemConfigSet<'a> {
) -> Vec<LitInt> {
signal_channel_capacities_without_wip(&self.enabled_subsystems, default_capacity)
}

pub(crate) fn can_receive_priority_messages_without_wip(&self) -> Vec<syn::LitBool> {
can_receive_priority_messages_without_wip(&self.enabled_subsystems)
}
}

impl OrchestraInfo {
Expand Down Expand Up @@ -738,6 +764,7 @@ impl OrchestraGuts {
sends,
message_capacity,
signal_capacity,
can_receive_priority_messages,
..
} = subsystem_attrs;

Expand All @@ -761,6 +788,7 @@ impl OrchestraGuts {
message_capacity,
signal_capacity,
feature_gates,
can_receive_priority_messages,
});
} else {
// collect the "baggage"
Expand Down Expand Up @@ -893,3 +921,13 @@ pub(crate) fn consumes_without_wip<'a, T: Borrow<SubSysField>>(subsystems: &[T])
.map(|ssf| ssf.message_to_consume())
.collect::<Vec<_>>()
}

pub(crate) fn can_receive_priority_messages_without_wip(
subsystems: &Vec<&SubSysField>,
) -> Vec<syn::LitBool> {
subsystems
.iter()
.filter(|ssf| !ssf.wip)
.map(|ssf| syn::LitBool::new(ssf.can_receive_priority_messages, ssf.name.span()))
.collect::<Vec<_>>()
}
42 changes: 42 additions & 0 deletions orchestra/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,37 @@ where
fn start(self, ctx: Ctx) -> SpawnedSubsystem<E>;
}

/// Priority of messages sending to the individual subsystems.
/// Only for the bounded channel sender.
pub enum PriorityLevel {
/// Normal priority.
Normal,
/// High priority.
High,
}
/// Normal priority.
pub struct NormalPriority;
/// High priority.
pub struct HighPriority;

/// Describes the priority of the message.
pub trait Priority {
/// The priority level.
fn priority() -> PriorityLevel {
PriorityLevel::Normal
}
}
impl Priority for NormalPriority {
fn priority() -> PriorityLevel {
PriorityLevel::Normal
}
}
impl Priority for HighPriority {
fn priority() -> PriorityLevel {
PriorityLevel::High
}
}

/// Sender end of a channel to interface with a subsystem.
#[async_trait::async_trait]
pub trait SubsystemSender<OutgoingMessage>: Clone + Send + 'static
Expand All @@ -503,6 +534,9 @@ where
/// Send a direct message to some other `Subsystem`, routed based on message type.
async fn send_message(&mut self, msg: OutgoingMessage);

/// Send a direct message with defined priority to some other `Subsystem`, routed based on message type.
async fn send_message_with_priority<P: Priority>(&mut self, msg: OutgoingMessage);

/// Tries to send a direct message to some other `Subsystem`, routed based on message type.
/// This method is useful for cases where the message queue is bounded and the message is ok
/// to be dropped if the queue is full. If the queue is full, this method will return an error.
Expand All @@ -512,6 +546,14 @@ where
msg: OutgoingMessage,
) -> Result<(), metered::TrySendError<OutgoingMessage>>;

/// Tries to send a direct message with defined priority to some other `Subsystem`, routed based on message type.
/// If the queue is full, this method will return an error.
/// This method is not async and will not block the current task.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't make much sense to drop priority messages.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make sense. Does it mean that we should not have the try_priority_send at all?

fn try_send_message_with_priority<P: Priority>(
&mut self,
msg: OutgoingMessage,
) -> Result<(), metered::TrySendError<OutgoingMessage>>;

/// Send multiple direct messages to other `Subsystem`s, routed based on message type.
async fn send_messages<I>(&mut self, msgs: I)
where
Expand Down
Loading
Loading