Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow subsystems using priority channels #77

Merged
merged 16 commits into from
Jun 7, 2024
Merged
14 changes: 11 additions & 3 deletions orchestra/proc-macro/src/impl_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ pub(crate) fn impl_feature_gated_items(
let channel_name_rx = &cfg_set.channel_names_without_wip("_rx");
let channel_name_unbounded_rx = &info.channel_names_without_wip("_unbounded_rx");

let with_priority_channel = &cfg_set.with_priority_channel_without_wip();

let baggage_name = &info.baggage_names();
let baggage_generic_ty = &info.baggage_generic_types();

Expand Down Expand Up @@ -633,14 +635,20 @@ pub(crate) fn impl_feature_gated_items(
>();

#(
let (#channel_name_tx, #channel_name_rx)
=
let (#channel_name_tx, #channel_name_rx) = if #with_priority_channel {
#support_crate ::metered::channel_with_priority::<
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Should we make this configurable per-subsystem ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can, but I think we'll have little to gain considering the number of subsystems.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At least from Polkadot perspective, we only need the priority channels for the networking subsystems, so it makes sense to have it configurable per subsystem.

Copy link
Collaborator

@drahnr drahnr Jun 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd be in favor of having it per subsystem or for everything. We will poll twice as many channels, in the empty priority channel case, which will be the default for most I presume.

MessagePacket< #maybe_boxed_consumes >
>(
self.channel_capacity.unwrap_or(#message_channel_capacity),
self.channel_capacity.unwrap_or(#message_channel_capacity)
);
)
} else {
#support_crate ::metered::channel::<
MessagePacket< #maybe_boxed_consumes >
>(
self.channel_capacity.unwrap_or(#message_channel_capacity)
)
};
)*

#(
Expand Down
39 changes: 38 additions & 1 deletion orchestra/proc-macro/src/parse/parse_orchestra_struct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ mod kw {
syn::custom_keyword!(sends);
syn::custom_keyword!(message_capacity);
syn::custom_keyword!(signal_capacity);
syn::custom_keyword!(with_priority_channel);
}

#[derive(Clone, Debug)]
Expand All @@ -58,6 +59,8 @@ pub(crate) enum SubSysAttrItem {
MessageChannelCapacity(ChannelCapacity<kw::message_capacity>),
/// Custom signal channels capacity for this subsystem
SignalChannelCapacity(ChannelCapacity<kw::signal_capacity>),
/// The subsystem can send priority messages
WithPriorityChannel(kw::with_priority_channel),
}

impl Parse for SubSysAttrItem {
Expand All @@ -73,6 +76,8 @@ impl Parse for SubSysAttrItem {
Self::MessageChannelCapacity(input.parse::<ChannelCapacity<kw::message_capacity>>()?)
} else if lookahead.peek(kw::signal_capacity) {
Self::SignalChannelCapacity(input.parse::<ChannelCapacity<kw::signal_capacity>>()?)
} else if lookahead.peek(kw::with_priority_channel) {
Self::WithPriorityChannel(input.parse::<kw::with_priority_channel>()?)
} else {
Self::Consumes(input.parse::<Consumes>()?)
})
Expand Down Expand Up @@ -100,6 +105,9 @@ impl ToTokens for SubSysAttrItem {
Self::SignalChannelCapacity(_) => {
quote! {}
},
Self::WithPriorityChannel(with_priority_channel) => {
quote! { #with_priority_channel }
},
};
tokens.extend(ts.into_iter());
}
Expand Down Expand Up @@ -130,6 +138,8 @@ pub(crate) struct SubSysField {
pub(crate) message_capacity: Option<usize>,
/// Custom signal channel capacity
pub(crate) signal_capacity: Option<usize>,
/// The subsystem can send priority messages
pub(crate) with_priority_channel: bool,

pub(crate) feature_gates: Option<CfgPredicate>,
}
Expand Down Expand Up @@ -352,6 +362,8 @@ pub(crate) struct SubSystemAttrItems {
pub(crate) message_capacity: Option<ChannelCapacity<kw::message_capacity>>,
/// Custom signal channel capacity
pub(crate) signal_capacity: Option<ChannelCapacity<kw::signal_capacity>>,
/// The subsystem can send priority messages
pub(crate) with_priority_channel: bool,
drahnr marked this conversation as resolved.
Show resolved Hide resolved
}

impl Parse for SubSystemAttrItems {
Expand Down Expand Up @@ -393,8 +405,17 @@ impl Parse for SubSystemAttrItems {
let wip = extract_variant!(unique, Wip; default = false);
let message_capacity = extract_variant!(unique, MessageChannelCapacity take );
let signal_capacity = extract_variant!(unique, SignalChannelCapacity take );
let with_priority_channel = extract_variant!(unique, WithPriorityChannel; default = false);

Ok(Self { blocking, wip, sends, consumes, message_capacity, signal_capacity })
Ok(Self {
blocking,
wip,
sends,
consumes,
message_capacity,
signal_capacity,
with_priority_channel,
drahnr marked this conversation as resolved.
Show resolved Hide resolved
})
}
}

Expand Down Expand Up @@ -487,6 +508,10 @@ impl<'a> SubsystemConfigSet<'a> {
) -> Vec<LitInt> {
signal_channel_capacities_without_wip(&self.enabled_subsystems, default_capacity)
}

pub(crate) fn with_priority_channel_without_wip(&self) -> Vec<TokenStream> {
with_priority_channel_without_wip(&self.enabled_subsystems)
}
}

impl OrchestraInfo {
Expand Down Expand Up @@ -738,6 +763,7 @@ impl OrchestraGuts {
sends,
message_capacity,
signal_capacity,
with_priority_channel,
..
} = subsystem_attrs;

Expand All @@ -761,6 +787,7 @@ impl OrchestraGuts {
message_capacity,
signal_capacity,
feature_gates,
with_priority_channel,
});
} else {
// collect the "baggage"
Expand Down Expand Up @@ -893,3 +920,13 @@ pub(crate) fn consumes_without_wip<'a, T: Borrow<SubSysField>>(subsystems: &[T])
.map(|ssf| ssf.message_to_consume())
.collect::<Vec<_>>()
}

pub(crate) fn with_priority_channel_without_wip(
subsystems: &Vec<&SubSysField>,
) -> Vec<TokenStream> {
subsystems
.iter()
.filter(|ssf| !ssf.wip)
.map(|ssf| if ssf.with_priority_channel { quote!(true) } else { quote!(false) })
.collect::<Vec<_>>()
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,43 +16,28 @@
use futures::executor::ThreadPool;
use orchestra::*;

#[derive(Clone, Copy)]
enum SendingMethod {
Send,
TrySend,
}

struct SubA {
regular: Vec<u8>,
normal: Vec<u8>,
priority: Vec<u8>,
sending_method: SendingMethod,
ready_sender: oneshot::Sender<()>,
sub_b_sent_rx: oneshot::Receiver<()>,
sub_a_sent_tx: oneshot::Sender<()>,
sub_a_received_tx: oneshot::Sender<Vec<u8>>,
}

pub struct SubB {
message_limit: usize,
result_sender: oneshot::Sender<Vec<u8>>,
ready_receiver: oneshot::Receiver<()>,
}

impl SubA {
fn new(
regular: Vec<u8>,
priority: Vec<u8>,
sending_method: SendingMethod,
ready_sender: oneshot::Sender<()>,
) -> Self {
Self { regular, priority, sending_method, ready_sender }
}
}

impl SubB {
fn new(
message_limit: usize,
result_sender: oneshot::Sender<Vec<u8>>,
ready_receiver: oneshot::Receiver<()>,
) -> Self {
Self { message_limit, result_sender, ready_receiver }
}
normal: Vec<u8>,
priority: Vec<u8>,
sending_method: SendingMethod,
sub_a_sent_rx: oneshot::Receiver<()>,
sub_b_sent_tx: oneshot::Sender<()>,
sub_b_received_tx: oneshot::Sender<Vec<u8>>,
}

impl crate::Subsystem<OrchestraSubsystemContext<MsgA>, OrchestraError> for SubA {
Expand All @@ -61,7 +46,10 @@ impl crate::Subsystem<OrchestraSubsystemContext<MsgA>, OrchestraError> for SubA
SpawnedSubsystem {
name: "sub A",
future: Box::pin(async move {
for i in self.regular {
let mut messages = vec![];
let message_limit = self.normal.len() + self.priority.len();

for i in self.normal {
match self.sending_method {
SendingMethod::Send => sender.send_message(MsgB(i)).await,
SendingMethod::TrySend => sender.try_send_message(MsgB(i)).unwrap(),
Expand All @@ -75,7 +63,18 @@ impl crate::Subsystem<OrchestraSubsystemContext<MsgA>, OrchestraError> for SubA
}
}

self.ready_sender.send(()).unwrap();
self.sub_a_sent_tx.send(()).unwrap();
self.sub_b_sent_rx.await.unwrap();

for _ in 0..message_limit {
match ctx.recv().await.unwrap() {
FromOrchestra::Communication { msg } => {
messages.push(msg.0);
},
_ => panic!("unexpected message"),
}
}
self.sub_a_received_tx.send(messages).unwrap();

Ok(())
}),
Expand All @@ -85,21 +84,39 @@ impl crate::Subsystem<OrchestraSubsystemContext<MsgA>, OrchestraError> for SubA

impl crate::Subsystem<OrchestraSubsystemContext<MsgB>, OrchestraError> for SubB {
fn start(self, mut ctx: OrchestraSubsystemContext<MsgB>) -> SpawnedSubsystem<OrchestraError> {
let mut sender = ctx.sender().clone();
SpawnedSubsystem {
name: "sub B",
future: Box::pin(async move {
let mut messages = vec![];
// Wait until sub_a sends all messages
self.ready_receiver.await.unwrap();
for _ in 0..self.message_limit {
let message_limit = self.normal.len() + self.priority.len();

for i in self.normal {
match self.sending_method {
SendingMethod::Send => sender.send_message(MsgA(i)).await,
SendingMethod::TrySend => sender.try_send_message(MsgA(i)).unwrap(),
}
}
for i in self.priority {
match self.sending_method {
SendingMethod::Send => sender.priority_send_message(MsgA(i)).await,
SendingMethod::TrySend =>
sender.try_priority_send_message(MsgA(i)).unwrap(),
}
}

self.sub_b_sent_tx.send(()).unwrap();
self.sub_a_sent_rx.await.unwrap();

for _ in 0..message_limit {
match ctx.recv().await.unwrap() {
FromOrchestra::Communication { msg } => {
messages.push(msg.0);
},
_ => panic!("unexpected message"),
}
}
self.result_sender.send(messages).unwrap();
self.sub_b_received_tx.send(messages).unwrap();

Ok(())
}),
Expand Down Expand Up @@ -149,18 +166,36 @@ pub struct Orchestra {
#[subsystem(consumes: MsgA, sends: [MsgB])]
sub_a: SubA,

#[subsystem(consumes: MsgB, sends: [MsgA])]
#[subsystem(consumes: MsgB, sends: [MsgA], with_priority_channel)]
sub_b: SubB,
}

#[test]
fn test_priority_send_message() {
let (result_tx, mut result_rx) = oneshot::channel::<Vec<u8>>();
let (ready_tx, ready_rx) = oneshot::channel::<()>();
let regular = vec![1, 2, 3];
let priority = vec![42];
let sub_a = SubA::new(regular.clone(), priority.clone(), SendingMethod::Send, ready_tx);
let sub_b = SubB::new(regular.len() + priority.len(), result_tx, ready_rx);
async fn run_inner(
normal: Vec<u8>,
priority: Vec<u8>,
sending_method: SendingMethod,
) -> (Vec<u8>, Vec<u8>) {
let (sub_a_received_tx, mut sub_a_received_rx) = oneshot::channel::<Vec<u8>>();
let (sub_b_received_tx, mut sub_b_received_rx) = oneshot::channel::<Vec<u8>>();
let (sub_a_sent_tx, sub_a_sent_rx) = oneshot::channel::<()>();
let (sub_b_sent_tx, sub_b_sent_rx) = oneshot::channel::<()>();

let sub_a = SubA {
normal: normal.clone(),
priority: priority.clone(),
sending_method,
sub_b_sent_rx,
sub_a_sent_tx,
sub_a_received_tx,
};
let sub_b = SubB {
normal: normal.clone(),
priority: priority.clone(),
sending_method,
sub_a_sent_rx,
sub_b_sent_tx,
sub_b_received_tx,
};
let pool = ThreadPool::new().unwrap();
let (orchestra, _handle) = Orchestra::builder()
.sub_a(sub_a)
Expand All @@ -169,42 +204,27 @@ fn test_priority_send_message() {
.build()
.unwrap();

futures::executor::block_on(async move {
for run_subsystem in orchestra.running_subsystems {
run_subsystem.await.unwrap();
}
});
for run_subsystem in orchestra.running_subsystems {
run_subsystem.await.unwrap();
}

assert_eq!(
priority.into_iter().chain(regular.into_iter()).collect::<Vec<u8>>(),
result_rx.try_recv().unwrap().unwrap()
);
(sub_a_received_rx.try_recv().unwrap().unwrap(), sub_b_received_rx.try_recv().unwrap().unwrap())
}

#[test]
fn test_try_priority_send_message() {
let (result_tx, mut result_rx) = oneshot::channel::<Vec<u8>>();
let (ready_tx, ready_rx) = oneshot::channel::<()>();
let regular = vec![1, 2, 3];
let priority = vec![42];
let sub_a = SubA::new(regular.clone(), priority.clone(), SendingMethod::TrySend, ready_tx);
let sub_b = SubB::new(regular.len() + priority.len(), result_tx, ready_rx);
let pool = ThreadPool::new().unwrap();
let (orchestra, _handle) = Orchestra::builder()
.sub_a(sub_a)
.sub_b(sub_b)
.spawner(DummySpawner(pool))
.build()
.unwrap();
fn test_priority_send_message() {
let (sub_a_received, sub_b_received) =
futures::executor::block_on(run_inner(vec![1, 2, 3], vec![42], SendingMethod::Send));

futures::executor::block_on(async move {
for run_subsystem in orchestra.running_subsystems {
run_subsystem.await.unwrap();
}
});
assert_eq!(vec![1, 2, 3, 42], sub_a_received);
assert_eq!(vec![42, 1, 2, 3], sub_b_received);
}

#[test]
fn test_try_priority_send_message() {
let (sub_a_received, sub_b_received) =
futures::executor::block_on(run_inner(vec![1, 2, 3], vec![42], SendingMethod::TrySend));

assert_eq!(
priority.into_iter().chain(regular.into_iter()).collect::<Vec<u8>>(),
result_rx.try_recv().unwrap().unwrap()
);
assert_eq!(vec![1, 2, 3, 42], sub_a_received);
assert_eq!(vec![42, 1, 2, 3], sub_b_received);
}
Loading