Skip to content

Commit

Permalink
Remove ChannelsOutPriority from runtime
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreiEres committed Jun 5, 2024
1 parent c172ccc commit 51134cc
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 39 deletions.
83 changes: 52 additions & 31 deletions orchestra/proc-macro/src/impl_channels_out.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,35 +75,57 @@ pub(crate) fn impl_channels_out_struct(info: &OrchestraInfo) -> Result<proc_macr

/// Priority of messages sending to the individual subsystems.
/// Only for the bounded channel sender.
#[derive(Debug)]
pub enum ChannelsOutPriority {
pub enum PriorityLevel {
Normal,
High,
}

trait Priority {
fn priority() -> PriorityLevel {
PriorityLevel::Normal
}
}

struct NormalPriority;
struct HighPriority;

impl Priority for NormalPriority {
fn priority() -> PriorityLevel {
PriorityLevel::Normal
}
}

impl Priority for HighPriority {
fn priority() -> PriorityLevel {
PriorityLevel::High
}
}

#[allow(unreachable_code)]
// when no defined messages in enum
impl ChannelsOut {
/// Send a message via a bounded channel.
pub async fn send_and_log_error(
pub async fn send_and_log_error<P: Priority>(
&mut self,
signals_received: usize,
message: #message_wrapper,
priority: ChannelsOutPriority,
message: #message_wrapper
) {
let res: ::std::result::Result<_, _> = match message {
#(
#feature_gates
#message_wrapper :: #consumes_variant ( inner ) => {
if matches!(priority, ChannelsOutPriority::High) {
self. #channel_name .priority_send(
#support_crate ::make_packet(signals_received, #maybe_boxed_send)
).await.map_err(|_| stringify!( #channel_name ))
} else {
self. #channel_name .send(
#support_crate ::make_packet(signals_received, #maybe_boxed_send)
).await.map_err(|_| stringify!( #channel_name ))
}
match P::priority() {
PriorityLevel::Normal => {
self. #channel_name .send(
#support_crate ::make_packet(signals_received, #maybe_boxed_send)
).await
},
PriorityLevel::High => {
self. #channel_name .priority_send(
#support_crate ::make_packet(signals_received, #maybe_boxed_send)
).await
},
}.map_err(|_| stringify!( #channel_name ))
}
)*
// subsystems that are wip
Expand Down Expand Up @@ -131,31 +153,30 @@ pub(crate) fn impl_channels_out_struct(info: &OrchestraInfo) -> Result<proc_macr
}

/// Try to send a message via a bounded channel.
pub fn try_send(
pub fn try_send<P: Priority>(
&mut self,
signals_received: usize,
message: #message_wrapper,
priority: ChannelsOutPriority,
) -> ::std::result::Result<(), #support_crate ::metered::TrySendError<#message_wrapper>> {
let res: ::std::result::Result<_, _> = match message {
#(
#feature_gates
#message_wrapper :: #consumes_variant ( inner ) => {
if matches!(priority, ChannelsOutPriority::High) {
self. #channel_name .try_priority_send(
#support_crate ::make_packet(signals_received, #maybe_boxed_send)
).map_err(|err| match err {
#support_crate ::metered::TrySendError::Full(err_inner) => #support_crate ::metered::TrySendError::Full(#message_wrapper:: #consumes_variant ( #maybe_unbox_error )),
#support_crate ::metered::TrySendError::Closed(err_inner) => #support_crate ::metered::TrySendError::Closed(#message_wrapper:: #consumes_variant ( #maybe_unbox_error )),
})
} else {
self. #channel_name .try_send(
#support_crate ::make_packet(signals_received, #maybe_boxed_send)
).map_err(|err| match err {
#support_crate ::metered::TrySendError::Full(err_inner) => #support_crate ::metered::TrySendError::Full(#message_wrapper:: #consumes_variant ( #maybe_unbox_error )),
#support_crate ::metered::TrySendError::Closed(err_inner) => #support_crate ::metered::TrySendError::Closed(#message_wrapper:: #consumes_variant ( #maybe_unbox_error )),
})
}
match P::priority() {
PriorityLevel::Normal => {
self. #channel_name .try_send(
#support_crate ::make_packet(signals_received, #maybe_boxed_send)
)
},
PriorityLevel::High => {
self. #channel_name .try_priority_send(
#support_crate ::make_packet(signals_received, #maybe_boxed_send)
)
},
}.map_err(|err| match err {
#support_crate ::metered::TrySendError::Full(err_inner) => #support_crate ::metered::TrySendError::Full(#message_wrapper:: #consumes_variant ( #maybe_unbox_error )),
#support_crate ::metered::TrySendError::Closed(err_inner) => #support_crate ::metered::TrySendError::Closed(#message_wrapper:: #consumes_variant ( #maybe_unbox_error )),
})
}
)*
// subsystems that are wip
Expand Down
12 changes: 4 additions & 8 deletions orchestra/proc-macro/src/impl_subsystem_ctx_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,34 +278,31 @@ pub(crate) fn impl_subsystem_sender(
{
async fn send_message(&mut self, msg: OutgoingMessage)
{
self.channels.send_and_log_error(
self.channels.send_and_log_error::<NormalPriority>(
self.signals_received.load(),
<#all_messages_wrapper as ::std::convert::From<_>> ::from (
<#outgoing_wrapper as ::std::convert::From<_>> :: from ( msg )
),
ChannelsOutPriority::Normal,
).await;
}

async fn priority_send_message(&mut self, msg: OutgoingMessage)
{
self.channels.send_and_log_error(
self.channels.send_and_log_error::<HighPriority>(
self.signals_received.load(),
<#all_messages_wrapper as ::std::convert::From<_>> ::from (
<#outgoing_wrapper as ::std::convert::From<_>> :: from ( msg )
),
ChannelsOutPriority::High,
).await;
}

fn try_send_message(&mut self, msg: OutgoingMessage) -> ::std::result::Result<(), #support_crate ::metered::TrySendError<OutgoingMessage>>
{
self.channels.try_send(
self.channels.try_send::<NormalPriority>(
self.signals_received.load(),
<#all_messages_wrapper as ::std::convert::From<_>> ::from (
<#outgoing_wrapper as ::std::convert::From<_>> :: from ( msg )
),
ChannelsOutPriority::Normal,
).map_err(|err| match err {
#support_crate ::metered::TrySendError::Full(inner) => #support_crate ::metered::TrySendError::Full(inner.try_into().expect("we should be able to unwrap what we wrap, qed")),
#support_crate ::metered::TrySendError::Closed(inner) => #support_crate ::metered::TrySendError::Closed(inner.try_into().expect("we should be able to unwrap what we wrap, qed")),
Expand All @@ -314,12 +311,11 @@ pub(crate) fn impl_subsystem_sender(

fn try_priority_send_message(&mut self, msg: OutgoingMessage) -> ::std::result::Result<(), #support_crate ::metered::TrySendError<OutgoingMessage>>
{
self.channels.try_send(
self.channels.try_send::<HighPriority>(
self.signals_received.load(),
<#all_messages_wrapper as ::std::convert::From<_>> ::from (
<#outgoing_wrapper as ::std::convert::From<_>> :: from ( msg )
),
ChannelsOutPriority::High,
).map_err(|err| match err {
#support_crate ::metered::TrySendError::Full(inner) => #support_crate ::metered::TrySendError::Full(inner.try_into().expect("we should be able to unwrap what we wrap, qed")),
#support_crate ::metered::TrySendError::Closed(inner) => #support_crate ::metered::TrySendError::Closed(inner.try_into().expect("we should be able to unwrap what we wrap, qed")),
Expand Down

0 comments on commit 51134cc

Please sign in to comment.