Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow subsystems using priority channels #77

Merged
merged 16 commits into from
Jun 7, 2024
Merged
3 changes: 2 additions & 1 deletion orchestra/proc-macro/src/impl_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -635,9 +635,10 @@ pub(crate) fn impl_feature_gated_items(
#(
let (#channel_name_tx, #channel_name_rx)
=
#support_crate ::metered::channel::<
#support_crate ::metered::channel_with_priority::<
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Should we make this configurable per-subsystem ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can, but I think we'll have little to gain considering the number of subsystems.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At least from Polkadot perspective, we only need the priority channels for the networking subsystems, so it makes sense to have it configurable per subsystem.

Copy link
Collaborator

@drahnr drahnr Jun 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd be in favor of having it per subsystem or for everything. We will poll twice as many channels, in the empty priority channel case, which will be the default for most I presume.

MessagePacket< #maybe_boxed_consumes >
>(
self.channel_capacity.unwrap_or(#message_channel_capacity),
self.channel_capacity.unwrap_or(#message_channel_capacity)
);
)*
Expand Down
21 changes: 18 additions & 3 deletions orchestra/proc-macro/src/impl_channels_out.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@ pub(crate) fn impl_channels_out_struct(info: &OrchestraInfo) -> Result<proc_macr
)*
}

/// Priority of messages sending to the individual subsystems.
/// Only for the bounded channel sender.
#[derive(Debug)]
pub enum ChannelsOutPriority {
Normal,
High,
}

#[allow(unreachable_code)]
// when no defined messages in enum
impl ChannelsOut {
Expand All @@ -81,14 +89,21 @@ pub(crate) fn impl_channels_out_struct(info: &OrchestraInfo) -> Result<proc_macr
&mut self,
signals_received: usize,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to see a unit test regarding how signals_received is handled and how that effects the priority vs non-priority messages.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added signals to tests

message: #message_wrapper,
priority: ChannelsOutPriority,
drahnr marked this conversation as resolved.
Show resolved Hide resolved
) {
let res: ::std::result::Result<_, _> = match message {
#(
#feature_gates
#message_wrapper :: #consumes_variant ( inner ) => {
self. #channel_name .send(
#support_crate ::make_packet(signals_received, #maybe_boxed_send)
).await.map_err(|_| stringify!( #channel_name ))
if matches!(priority, ChannelsOutPriority::High) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should you add the same logic for try_send for completeness ?

self. #channel_name .priority_send(
#support_crate ::make_packet(signals_received, #maybe_boxed_send)
).await.map_err(|_| stringify!( #channel_name ))
} else {
self. #channel_name .send(
#support_crate ::make_packet(signals_received, #maybe_boxed_send)
).await.map_err(|_| stringify!( #channel_name ))
}
}
)*
// subsystems that are wip
Expand Down
14 changes: 13 additions & 1 deletion orchestra/proc-macro/src/impl_subsystem_ctx_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,19 @@ pub(crate) fn impl_subsystem_sender(
self.signals_received.load(),
<#all_messages_wrapper as ::std::convert::From<_>> ::from (
<#outgoing_wrapper as ::std::convert::From<_>> :: from ( msg )
)
),
ChannelsOutPriority::Normal,
).await;
}

async fn priority_send_message(&mut self, msg: OutgoingMessage)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we use the generic with default value, we could avoid this additional public API

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean we should make priority markers public? Considering, that I couldn't set defaults it's a bit inconvenient to write them every time.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, in that case I propose a slighly different API send_message and send_message_with_priority<Priority> which is called by send_message.

{
self.channels.send_and_log_error(
self.signals_received.load(),
<#all_messages_wrapper as ::std::convert::From<_>> ::from (
<#outgoing_wrapper as ::std::convert::From<_>> :: from ( msg )
),
ChannelsOutPriority::High,
).await;
}

Expand Down
3 changes: 3 additions & 0 deletions orchestra/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,9 @@ where
/// Send a direct message to some other `Subsystem`, routed based on message type.
async fn send_message(&mut self, msg: OutgoingMessage);

/// Send a direct priority message to some other `Subsystem`, routed based on message type.
async fn priority_send_message(&mut self, msg: OutgoingMessage);

/// Tries to send a direct message to some other `Subsystem`, routed based on message type.
/// This method is useful for cases where the message queue is bounded and the message is ok
/// to be dropped if the queue is full. If the queue is full, this method will return an error.
Expand Down
155 changes: 155 additions & 0 deletions orchestra/tests/subsystems_priority_channels_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// Copyright (C) 2022 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: Apache-2.0

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use futures::executor::ThreadPool;
use orchestra::*;

struct SubA {
regular: Vec<u8>,
priority: Vec<u8>,
}

pub struct SubB {
message_limit: usize,
result_sender: oneshot::Sender<Vec<u8>>,
}

impl SubA {
fn new(regular: Vec<u8>, priority: Vec<u8>) -> Self {
Self { regular, priority }
}
}

impl SubB {
fn new(message_limit: usize, result_sender: oneshot::Sender<Vec<u8>>) -> Self {
Self { message_limit, result_sender }
}
}

impl crate::Subsystem<OrchestraSubsystemContext<MsgA>, OrchestraError> for SubA {
fn start(self, mut ctx: OrchestraSubsystemContext<MsgA>) -> SpawnedSubsystem<OrchestraError> {
let mut sender = ctx.sender().clone();
SpawnedSubsystem {
name: "sub A",
future: Box::pin(async move {
for i in self.regular {
sender.send_message(MsgB(i)).await;
}
for i in self.priority {
sender.priority_send_message(MsgB(i)).await;
}

Ok(())
}),
}
}
}

impl crate::Subsystem<OrchestraSubsystemContext<MsgB>, OrchestraError> for SubB {
fn start(self, mut ctx: OrchestraSubsystemContext<MsgB>) -> SpawnedSubsystem<OrchestraError> {
SpawnedSubsystem {
name: "sub B",
future: Box::pin(async move {
let mut messages = vec![];
// Wait until sub_a sends all messages
futures_timer::Delay::new(Duration::from_millis(50)).await;
drahnr marked this conversation as resolved.
Show resolved Hide resolved
for _ in 0..self.message_limit {
match ctx.recv().await.unwrap() {
FromOrchestra::Communication { msg } => {
messages.push(msg.0);
},
_ => panic!("unexpected message"),
}
}
self.result_sender.send(messages).unwrap();

Ok(())
}),
}
}
}

#[derive(Clone, Debug)]
pub struct SigSigSig;

#[derive(Clone, Debug)]
pub struct Event;

#[derive(Clone, Debug)]
#[allow(dead_code)]
pub struct MsgA(u8);

#[derive(Clone, Debug)]
#[allow(dead_code)]
pub struct MsgB(u8);

#[derive(Debug, Clone)]
pub struct DummySpawner(pub ThreadPool);

impl Spawner for DummySpawner {
fn spawn_blocking(
&self,
_task_name: &'static str,
_subsystem_name: Option<&'static str>,
future: futures::future::BoxFuture<'static, ()>,
) {
self.0.spawn_ok(future);
}

fn spawn(
&self,
_task_name: &'static str,
_subsystem_name: Option<&'static str>,
future: futures::future::BoxFuture<'static, ()>,
) {
self.0.spawn_ok(future);
}
}

#[orchestra(signal=SigSigSig, event=Event, gen=AllMessages, error=OrchestraError, boxed_messages=true)]
pub struct Orchestra {
#[subsystem(consumes: MsgA, sends: [MsgB])]
sub_a: SubA,

#[subsystem(consumes: MsgB, sends: [MsgA])]
sub_b: SubB,
}

#[test]
fn test_priority_send_message() {
let (tx, mut rx) = oneshot::channel::<Vec<u8>>();
let regular = vec![1, 2, 3];
let priority = vec![42];
let sub_a = SubA::new(regular.clone(), priority.clone());
let sub_b = SubB::new(regular.len() + priority.len(), tx);
let pool = ThreadPool::new().unwrap();
let (orchestra, _handle) = Orchestra::builder()
.sub_a(sub_a)
.sub_b(sub_b)
.spawner(DummySpawner(pool))
.build()
.unwrap();

futures::executor::block_on(async move {
for run_subsystem in orchestra.running_subsystems {
run_subsystem.await.unwrap();
}
});

assert_eq!(
drahnr marked this conversation as resolved.
Show resolved Hide resolved
priority.into_iter().chain(regular.into_iter()).collect::<Vec<u8>>(),
rx.try_recv().unwrap().unwrap()
);
}
Loading