diff --git a/Cargo.lock b/Cargo.lock index e60196f995394..2e5a50ef2e1d1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5376,12 +5376,14 @@ dependencies = [ [[package]] name = "expander" -version = "2.0.0" +version = "2.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f86a749cf851891866c10515ef6c299b5c69661465e9c3bbe7e07a2b77fb0f7" +checksum = "e2c470c71d91ecbd179935b24170459e926382eaaa86b590b78814e180d8a8e2" dependencies = [ "blake2 0.10.6", + "file-guard", "fs-err", + "prettyplease 0.2.12", "proc-macro2 1.0.82", "quote 1.0.35", "syn 2.0.61", @@ -5514,6 +5516,16 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "27573eac26f4dd11e2b1916c3fe1baa56407c83c71a773a8ba17ec0bca03b6b7" +[[package]] +name = "file-guard" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21ef72acf95ec3d7dbf61275be556299490a245f017cf084bd23b4f68cf9407c" +dependencies = [ + "libc", + "winapi", +] + [[package]] name = "file-per-thread-logger" version = "0.1.6" @@ -9554,9 +9566,9 @@ checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" [[package]] name = "orchestra" -version = "0.3.6" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92829eef0328a3d1cd22a02c0e51deb92a5362df3e7d21a4e9bdc38934694e66" +checksum = "41f6bbacc8c189a3f2e45e0fd0436e5d97f194db888e721bdbc3973e7dbed4c2" dependencies = [ "async-trait", "dyn-clonable", @@ -9571,9 +9583,9 @@ dependencies = [ [[package]] name = "orchestra-proc-macro" -version = "0.3.6" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1344346d5af32c95bbddea91b18a88cc83eac394192d20ef2fc4c40a74332355" +checksum = "f7b1d40dd8f367db3c65bec8d3dd47d4a604ee8874480738f93191bddab4e0e0" dependencies = [ "expander", "indexmap 2.2.3", diff --git a/Cargo.toml b/Cargo.toml index 5c2677fffeb22..0999d63040130 100644 --- a/Cargo.toml +++ 
b/Cargo.toml @@ -855,7 +855,7 @@ num-rational = { version = "0.4.1" } num-traits = { version = "0.2.17", default-features = false } num_cpus = { version = "1.13.1" } once_cell = { version = "1.19.0" } -orchestra = { version = "0.3.5", default-features = false } +orchestra = { version = "0.4.0", default-features = false } pallet-alliance = { path = "substrate/frame/alliance", default-features = false } pallet-asset-conversion = { path = "substrate/frame/asset-conversion", default-features = false } pallet-asset-conversion-ops = { path = "substrate/frame/asset-conversion/ops", default-features = false } diff --git a/polkadot/node/malus/src/interceptor.rs b/polkadot/node/malus/src/interceptor.rs index b44ffc8956b52..2181118646d56 100644 --- a/polkadot/node/malus/src/interceptor.rs +++ b/polkadot/node/malus/src/interceptor.rs @@ -90,6 +90,10 @@ where >::Error: std::fmt::Debug, { async fn send_message(&mut self, msg: OutgoingMessage) { + self.send_message_with_priority::(msg).await; + } + + async fn send_message_with_priority(&mut self, msg: OutgoingMessage) { let msg = < <>::Message as overseer::AssociateOutgoing >::OutgoingMessages as From>::from(msg); @@ -103,7 +107,14 @@ where } } - fn try_send_message(&mut self, msg: OutgoingMessage) -> Result<(), TrySendError> { + fn try_send_message( + &mut self, + msg: OutgoingMessage, + ) -> Result<(), polkadot_node_subsystem_util::metered::TrySendError> { + self.try_send_message_with_priority::(msg) + } + + fn try_send_message_with_priority(&mut self, msg: OutgoingMessage) -> Result<(), TrySendError> { let msg = < <>::Message as overseer::AssociateOutgoing >::OutgoingMessages as From>::from(msg); diff --git a/polkadot/node/network/availability-distribution/src/tests/state.rs b/polkadot/node/network/availability-distribution/src/tests/state.rs index befbff0a2f27e..97e616f79fb75 100644 --- a/polkadot/node/network/availability-distribution/src/tests/state.rs +++ b/polkadot/node/network/availability-distribution/src/tests/state.rs 
@@ -216,7 +216,7 @@ impl TestState { // Test will fail if this does not happen until timeout. let mut remaining_stores = self.valid_chunks.len(); - let TestSubsystemContextHandle { tx, mut rx } = harness.virtual_overseer; + let TestSubsystemContextHandle { tx, mut rx, .. } = harness.virtual_overseer; // Spawning necessary as incoming queue can only hold a single item, we don't want to dead // lock ;-) diff --git a/polkadot/node/network/bridge/src/rx/mod.rs b/polkadot/node/network/bridge/src/rx/mod.rs index 84e935366d0cb..56965ce6ba404 100644 --- a/polkadot/node/network/bridge/src/rx/mod.rs +++ b/polkadot/node/network/bridge/src/rx/mod.rs @@ -1135,13 +1135,33 @@ async fn dispatch_validation_events_to_all( I: IntoIterator>, I::IntoIter: Send, { + macro_rules! send_message { + ($event:expr, $message:ident) => { + if let Ok(event) = $event.focus() { + let has_high_priority = matches!( + event, + // NetworkBridgeEvent::OurViewChange(..) must also be here, + // but it is sent via an unbounded channel. + // See https://github.com/paritytech/polkadot-sdk/issues/824 + NetworkBridgeEvent::PeerConnected(..) | + NetworkBridgeEvent::PeerDisconnected(..) | + NetworkBridgeEvent::PeerViewChange(..) 
+ ); + let message = $message::from(event); + if has_high_priority { + sender.send_message_with_priority::(message).await; + } else { + sender.send_message(message).await; + } + } + }; + } + for event in events { - sender - .send_messages(event.focus().map(StatementDistributionMessage::from)) - .await; - sender.send_messages(event.focus().map(BitfieldDistributionMessage::from)).await; - sender.send_messages(event.focus().map(ApprovalDistributionMessage::from)).await; - sender.send_messages(event.focus().map(GossipSupportMessage::from)).await; + send_message!(event, StatementDistributionMessage); + send_message!(event, BitfieldDistributionMessage); + send_message!(event, ApprovalDistributionMessage); + send_message!(event, GossipSupportMessage); } } diff --git a/polkadot/node/network/bridge/src/rx/tests.rs b/polkadot/node/network/bridge/src/rx/tests.rs index 6182bf3d883b5..392ff7391a1c1 100644 --- a/polkadot/node/network/bridge/src/rx/tests.rs +++ b/polkadot/node/network/bridge/src/rx/tests.rs @@ -880,6 +880,8 @@ fn peer_view_updates_sent_via_overseer() { &mut virtual_overseer, ) .await; + + assert_eq!(virtual_overseer.message_counter.with_high_priority(), 8); } network_handle @@ -895,6 +897,7 @@ fn peer_view_updates_sent_via_overseer() { &mut virtual_overseer, ) .await; + assert_eq!(virtual_overseer.message_counter.with_high_priority(), 12); virtual_overseer }); } @@ -930,6 +933,8 @@ fn peer_messages_sent_via_overseer() { &mut virtual_overseer, ) .await; + + assert_eq!(virtual_overseer.message_counter.with_high_priority(), 8); } let approval_distribution_message = @@ -970,6 +975,7 @@ fn peer_messages_sent_via_overseer() { &mut virtual_overseer, ) .await; + assert_eq!(virtual_overseer.message_counter.with_high_priority(), 12); virtual_overseer }); } @@ -1008,6 +1014,8 @@ fn peer_disconnect_from_just_one_peerset() { &mut virtual_overseer, ) .await; + + assert_eq!(virtual_overseer.message_counter.with_high_priority(), 8); } { @@ -1036,6 +1044,7 @@ fn 
peer_disconnect_from_just_one_peerset() { &mut virtual_overseer, ) .await; + assert_eq!(virtual_overseer.message_counter.with_high_priority(), 12); // to show that we're still connected on the collation protocol, send a view update. @@ -1094,6 +1103,8 @@ fn relays_collation_protocol_messages() { &mut virtual_overseer, ) .await; + + assert_eq!(virtual_overseer.message_counter.with_high_priority(), 8); } { @@ -1201,6 +1212,8 @@ fn different_views_on_different_peer_sets() { &mut virtual_overseer, ) .await; + + assert_eq!(virtual_overseer.message_counter.with_high_priority(), 8); } { @@ -1247,6 +1260,8 @@ fn different_views_on_different_peer_sets() { ) .await; + assert_eq!(virtual_overseer.message_counter.with_high_priority(), 12); + assert_sends_collation_event_to_all( NetworkBridgeEvent::PeerViewChange(peer, view_b.clone()), &mut virtual_overseer, @@ -1481,6 +1496,8 @@ fn network_protocol_versioning_subsystem_msg() { &mut virtual_overseer, ) .await; + + assert_eq!(virtual_overseer.message_counter.with_high_priority(), 8); } let approval_distribution_message = diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs index 24985a99913d8..4e13d5eda76f6 100644 --- a/polkadot/node/overseer/src/lib.rs +++ b/polkadot/node/overseer/src/lib.rs @@ -105,10 +105,11 @@ pub use polkadot_node_metrics::{ pub use orchestra as gen; pub use orchestra::{ - contextbounds, orchestra, subsystem, FromOrchestra, MapSubsystem, MessagePacket, - OrchestraError as OverseerError, SignalsReceived, Spawner, Subsystem, SubsystemContext, - SubsystemIncomingMessages, SubsystemInstance, SubsystemMeterReadouts, SubsystemMeters, - SubsystemSender, TimeoutExt, ToOrchestra, TrySendError, + contextbounds, orchestra, subsystem, FromOrchestra, HighPriority, MapSubsystem, MessagePacket, + NormalPriority, OrchestraError as OverseerError, Priority, PriorityLevel, SignalsReceived, + Spawner, Subsystem, SubsystemContext, SubsystemIncomingMessages, SubsystemInstance, + 
SubsystemMeterReadouts, SubsystemMeters, SubsystemSender, TimeoutExt, ToOrchestra, + TrySendError, }; #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] @@ -495,7 +496,7 @@ pub struct Overseer { RuntimeApiMessage, ProspectiveParachainsMessage, ChainApiMessage, - ])] + ], can_receive_priority_messages)] statement_distribution: StatementDistribution, #[subsystem(AvailabilityDistributionMessage, sends: [ @@ -524,7 +525,7 @@ pub struct Overseer { RuntimeApiMessage, NetworkBridgeTxMessage, ProvisionerMessage, - ])] + ], can_receive_priority_messages)] bitfield_distribution: BitfieldDistribution, #[subsystem(ProvisionerMessage, sends: [ @@ -580,7 +581,7 @@ pub struct Overseer { #[subsystem(blocking, message_capacity: 64000, ApprovalDistributionMessage, sends: [ NetworkBridgeTxMessage, ApprovalVotingMessage, - ])] + ], can_receive_priority_messages)] approval_distribution: ApprovalDistribution, #[subsystem(blocking, ApprovalVotingMessage, sends: [ @@ -599,7 +600,7 @@ pub struct Overseer { NetworkBridgeRxMessage, // TODO RuntimeApiMessage, ChainSelectionMessage, - ])] + ], can_receive_priority_messages)] gossip_support: GossipSupport, #[subsystem(blocking, message_capacity: 32000, DisputeCoordinatorMessage, sends: [ diff --git a/polkadot/node/subsystem-test-helpers/src/lib.rs b/polkadot/node/subsystem-test-helpers/src/lib.rs index 375121c374637..bdb0647fee6f5 100644 --- a/polkadot/node/subsystem-test-helpers/src/lib.rs +++ b/polkadot/node/subsystem-test-helpers/src/lib.rs @@ -36,7 +36,7 @@ use std::{ convert::Infallible, future::Future, pin::Pin, - sync::Arc, + sync::{atomic::AtomicUsize, Arc}, task::{Context, Poll, Waker}, time::Duration, }; @@ -146,12 +146,13 @@ pub fn single_item_sink() -> (SingleItemSink, SingleItemStream) { #[derive(Clone)] pub struct TestSubsystemSender { tx: mpsc::UnboundedSender, + message_counter: MessageCounter, } /// Construct a sender/receiver pair. 
pub fn sender_receiver() -> (TestSubsystemSender, mpsc::UnboundedReceiver) { let (tx, rx) = mpsc::unbounded(); - (TestSubsystemSender { tx }, rx) + (TestSubsystemSender { tx, message_counter: MessageCounter::default() }, rx) } #[async_trait::async_trait] @@ -161,6 +162,11 @@ where OutgoingMessage: Send + 'static, { async fn send_message(&mut self, msg: OutgoingMessage) { + self.send_message_with_priority::(msg).await; + } + + async fn send_message_with_priority(&mut self, msg: OutgoingMessage) { + self.message_counter.increment(P::priority()); self.tx.send(msg.into()).await.expect("test overseer no longer live"); } @@ -168,6 +174,14 @@ where &mut self, msg: OutgoingMessage, ) -> Result<(), TrySendError> { + self.try_send_message_with_priority::(msg) + } + + fn try_send_message_with_priority( + &mut self, + msg: OutgoingMessage, + ) -> Result<(), TrySendError> { + self.message_counter.increment(P::priority()); self.tx.unbounded_send(msg.into()).expect("test overseer no longer live"); Ok(()) } @@ -277,6 +291,9 @@ pub struct TestSubsystemContextHandle { /// Direct access to the receiver. pub rx: mpsc::UnboundedReceiver, + + /// Message counter over subsystems. + pub message_counter: MessageCounter, } impl TestSubsystemContextHandle { @@ -322,6 +339,34 @@ pub fn make_subsystem_context( make_buffered_subsystem_context(spawner, 0) } +/// Message counter over subsystems. +#[derive(Default, Clone)] +pub struct MessageCounter { + total: Arc, + with_high_priority: Arc, +} + +impl MessageCounter { + /// Increment the message counter. + pub fn increment(&mut self, priority_level: overseer::PriorityLevel) { + self.total.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + if matches!(priority_level, overseer::PriorityLevel::High) { + self.with_high_priority.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + } + } + + /// Reset the message counter. 
+ pub fn reset(&mut self) { + self.total.store(0, std::sync::atomic::Ordering::SeqCst); + self.with_high_priority.store(0, std::sync::atomic::Ordering::SeqCst); + } + + /// Get the messages with high priority count. + pub fn with_high_priority(&self) -> usize { + self.with_high_priority.load(std::sync::atomic::Ordering::SeqCst) + } +} + /// Make a test subsystem context with buffered overseer channel. Some tests (e.g. /// `dispute-coordinator`) create too many parallel operations and deadlock unless /// the channel is buffered. Usually `buffer_size=1` is enough. @@ -331,15 +376,23 @@ pub fn make_buffered_subsystem_context( ) -> (TestSubsystemContext>, TestSubsystemContextHandle) { let (overseer_tx, overseer_rx) = mpsc::channel(buffer_size); let (all_messages_tx, all_messages_rx) = mpsc::unbounded(); + let message_counter = MessageCounter::default(); ( TestSubsystemContext { - tx: TestSubsystemSender { tx: all_messages_tx }, + tx: TestSubsystemSender { + tx: all_messages_tx, + message_counter: message_counter.clone(), + }, rx: overseer_rx, spawn: SpawnGlue(spawner), message_buffer: VecDeque::new(), }, - TestSubsystemContextHandle { tx: overseer_tx, rx: all_messages_rx }, + TestSubsystemContextHandle { + tx: overseer_tx, + rx: all_messages_rx, + message_counter: message_counter.clone(), + }, ) } diff --git a/prdoc/pr_4755.prdoc b/prdoc/pr_4755.prdoc new file mode 100644 index 0000000000000..1018446cb67e7 --- /dev/null +++ b/prdoc/pr_4755.prdoc @@ -0,0 +1,24 @@ +# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0 +# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json + +title: Send PeerViewChange with high priority + +doc: + - audience: Node Dev + description: | + - Updated orchestra to 0.4.0, which introduces support for prioritizing system messages. + - PeerViewChange (along with PeerConnected and PeerDisconnected) is sent with high priority and should be processed first in the queue. + - To count such messages in tests, a message counter was added to TestSubsystemSender and TestSubsystemContextHandle.
It acts more like a smoke test, though. + + +crates: + - name: polkadot-overseer + bump: minor + - name: polkadot-network-bridge + bump: patch + - name: polkadot-availability-distribution + bump: patch + - name: polkadot-test-malus + bump: patch + - name: polkadot-node-subsystem-test-helpers + bump: patch