diff --git a/node/overseer/examples/minimal-example.rs b/node/overseer/examples/minimal-example.rs index cdef0340d04f..9e904c93dcfd 100644 --- a/node/overseer/examples/minimal-example.rs +++ b/node/overseer/examples/minimal-example.rs @@ -28,11 +28,14 @@ use futures_timer::Delay; use kv_log_macro as log; use polkadot_primitives::parachain::{BlockData, PoVBlock}; -use polkadot_overseer::Overseer; +use polkadot_overseer::{Overseer, AllSubsystems}; -use polkadot_subsystem::{Subsystem, SubsystemContext, SpawnedSubsystem, FromOverseer}; +use polkadot_subsystem::{ + Subsystem, SubsystemContext, DummySubsystem, + SpawnedSubsystem, FromOverseer, +}; use polkadot_subsystem::messages::{ - AllMessages, CandidateBackingMessage, CandidateValidationMessage + CandidateValidationMessage, CandidateBackingMessage, AllMessages, }; struct Subsystem1; @@ -127,10 +130,22 @@ fn main() { Delay::new(Duration::from_secs(1)).await; }); + let all_subsystems = AllSubsystems { + candidate_validation: Subsystem2, + candidate_backing: Subsystem1, + candidate_selection: DummySubsystem, + statement_distribution: DummySubsystem, + availability_distribution: DummySubsystem, + bitfield_distribution: DummySubsystem, + provisioner: DummySubsystem, + pov_distribution: DummySubsystem, + runtime_api: DummySubsystem, + availability_store: DummySubsystem, + network_bridge: DummySubsystem, + }; let (overseer, _handler) = Overseer::new( vec![], - Subsystem2, - Subsystem1, + all_subsystems, spawner, ).unwrap(); let overseer_fut = overseer.run().fuse(); diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 70de3c1d86f8..14d860d26784 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -76,7 +76,11 @@ use polkadot_primitives::{Block, BlockNumber, Hash}; use client::{BlockImportNotification, BlockchainEvents, FinalityNotification}; use polkadot_subsystem::messages::{ - CandidateValidationMessage, CandidateBackingMessage, AllMessages + CandidateValidationMessage, CandidateBackingMessage, + CandidateSelectionMessage, StatementDistributionMessage, + AvailabilityDistributionMessage, BitfieldDistributionMessage, + ProvisionerMessage, PoVDistributionMessage, RuntimeApiMessage, + AvailabilityStoreMessage, NetworkBridgeMessage, AllMessages, }; pub use polkadot_subsystem::{ Subsystem, SubsystemContext, OverseerSignal, FromOverseer, SubsystemError, SubsystemResult, @@ -319,12 +323,40 @@ struct OverseenSubsystem { /// The `Overseer` itself. pub struct Overseer { - /// A validation subsystem - validation_subsystem: OverseenSubsystem, + /// A candidate validation subsystem. + candidate_validation_subsystem: OverseenSubsystem, - /// A candidate backing subsystem + /// A candidate backing subsystem. candidate_backing_subsystem: OverseenSubsystem, + /// A candidate selection subsystem. + candidate_selection_subsystem: OverseenSubsystem, + + /// A statement distribution subsystem. + statement_distribution_subsystem: OverseenSubsystem, + + /// An availability distribution subsystem. + availability_distribution_subsystem: OverseenSubsystem, + + /// A bitfield distribution subsystem. + bitfield_distribution_subsystem: OverseenSubsystem, + + /// A provisioner subsystem. + provisioner_subsystem: OverseenSubsystem, + + /// A PoV distribution subsystem. + pov_distribution_subsystem: OverseenSubsystem, + + /// A runtime API subsystem. + runtime_api_subsystem: OverseenSubsystem, + + /// An availability store subsystem. + availability_store_subsystem: OverseenSubsystem, + + /// A network bridge subsystem. + network_bridge_subsystem: OverseenSubsystem, + + /// Spawner to spawn tasks to. s: S, @@ -346,22 +378,48 @@ pub struct Overseer { active_leaves: HashSet<(Hash, BlockNumber)>, } +/// This struct is passed as an argument to create a new instance of an [`Overseer`]. +/// +/// As any entity that satisfies the interface may act as a [`Subsystem`] this allows +/// mocking in the test code: +/// +/// Each [`Subsystem`] is supposed to implement some interface that is generic over +/// message type that is specific to this [`Subsystem`]. At the moment not all +/// subsystems are implemented and the rest can be mocked with the [`DummySubsystem`]. +/// +/// [`Subsystem`]: trait.Subsystem.html +/// [`DummySubsystem`]: struct.DummySubsystem.html +pub struct AllSubsystems { + /// A candidate validation subsystem. + pub candidate_validation: CV, + /// A candidate backing subsystem. + pub candidate_backing: CB, + /// A candidate selection subsystem. + pub candidate_selection: CS, + /// A statement distribution subsystem. + pub statement_distribution: SD, + /// An availability distribution subsystem. + pub availability_distribution: AD, + /// A bitfield distribution subsystem. + pub bitfield_distribution: BD, + /// A provisioner subsystem. + pub provisioner: P, + /// A PoV distribution subsystem. + pub pov_distribution: PoVD, + /// A runtime API subsystem. + pub runtime_api: RA, + /// An availability store subsystem. + pub availability_store: AS, + /// A network bridge subsystem. + pub network_bridge: NB, +} + impl Overseer where S: Spawn, { /// Create a new intance of the `Overseer` with a fixed set of [`Subsystem`]s. /// - /// Each [`Subsystem`] is passed to this function as an explicit parameter - /// and is supposed to implement some interface that is generic over message type - /// that is specific to this [`Subsystem`]. At the moment there are only two - /// subsystems: - /// * Validation - /// * CandidateBacking - /// - /// As any entity that satisfies the interface may act as a [`Subsystem`] this allows - /// mocking in the test code: - /// /// ```text /// +------------------------------------+ /// | Overseer | @@ -388,16 +446,16 @@ where /// # Example /// /// The [`Subsystems`] may be any type as long as they implement an expected interface. - /// Here, we create two mock subsystems and start the `Overseer` with them. For the sake - /// of simplicity the termination of the example is done with a timeout. + /// Here, we create a mock validation subsystem and a few dummy ones and start the `Overseer` with them. + /// For the sake of simplicity the termination of the example is done with a timeout. /// ``` /// # use std::time::Duration; /// # use futures::{executor, pin_mut, select, FutureExt}; /// # use futures_timer::Delay; - /// # use polkadot_overseer::Overseer; + /// # use polkadot_overseer::{Overseer, AllSubsystems}; /// # use polkadot_subsystem::{ - /// # Subsystem, SpawnedSubsystem, SubsystemContext, - /// # messages::{CandidateValidationMessage, CandidateBackingMessage}, + /// # Subsystem, DummySubsystem, SpawnedSubsystem, SubsystemContext, + /// # messages::CandidateValidationMessage, /// # }; /// /// struct ValidationSubsystem; @@ -417,28 +475,24 @@ where /// } /// } /// - /// struct CandidateBackingSubsystem; - /// impl Subsystem for CandidateBackingSubsystem - /// where C: SubsystemContext - /// { - /// fn start( - /// self, - /// mut ctx: C, - /// ) -> SpawnedSubsystem { - /// SpawnedSubsystem(Box::pin(async move { - /// loop { - /// Delay::new(Duration::from_secs(1)).await; - /// } - /// })) - /// } - /// } - /// /// # fn main() { executor::block_on(async move { /// let spawner = executor::ThreadPool::new().unwrap(); + /// let all_subsystems = AllSubsystems { + /// candidate_validation: ValidationSubsystem, + /// candidate_backing: DummySubsystem, + /// candidate_selection: DummySubsystem, + /// statement_distribution: DummySubsystem, + /// availability_distribution: DummySubsystem, + /// bitfield_distribution: DummySubsystem, + /// provisioner: DummySubsystem, + /// pov_distribution: DummySubsystem, + /// runtime_api: DummySubsystem, + /// availability_store: DummySubsystem, + /// network_bridge: DummySubsystem, + /// }; /// let (overseer, _handler) = Overseer::new( /// vec![], - /// ValidationSubsystem, - /// CandidateBackingSubsystem, + /// all_subsystems, /// spawner, /// ).unwrap(); /// @@ -455,12 +509,24 @@ where /// # /// # }); } /// ``` - pub fn new( + pub fn new( leaves: impl IntoIterator, - validation: impl Subsystem> + Send, - candidate_backing: impl Subsystem> + Send, + all_subsystems: AllSubsystems, mut s: S, - ) -> SubsystemResult<(Self, OverseerHandler)> { + ) -> SubsystemResult<(Self, OverseerHandler)> + where + CV: Subsystem> + Send, + CB: Subsystem> + Send, + CS: Subsystem> + Send, + SD: Subsystem> + Send, + AD: Subsystem> + Send, + BD: Subsystem> + Send, + P: Subsystem> + Send, + PoVD: Subsystem> + Send, + RA: Subsystem> + Send, + AS: Subsystem> + Send, + NB: Subsystem> + Send, + { let (events_tx, events_rx) = mpsc::channel(CHANNEL_CAPACITY); let handler = OverseerHandler { @@ -470,18 +536,81 @@ where let mut running_subsystems_rx = StreamUnordered::new(); let mut running_subsystems = FuturesUnordered::new(); - let validation_subsystem = spawn( + let candidate_validation_subsystem = spawn( &mut s, &mut running_subsystems, &mut running_subsystems_rx, - validation, + all_subsystems.candidate_validation, )?; let candidate_backing_subsystem = spawn( &mut s, &mut running_subsystems, &mut running_subsystems_rx, - candidate_backing, + all_subsystems.candidate_backing, + )?; + + let candidate_selection_subsystem = spawn( + &mut s, + &mut running_subsystems, + &mut running_subsystems_rx, + all_subsystems.candidate_selection, + )?; + + let statement_distribution_subsystem = spawn( + &mut s, + &mut running_subsystems, + &mut running_subsystems_rx, + all_subsystems.statement_distribution, + )?; + + let availability_distribution_subsystem = spawn( + &mut s, + &mut running_subsystems, + &mut running_subsystems_rx, + all_subsystems.availability_distribution, + )?; + + let bitfield_distribution_subsystem = spawn( + &mut s, + &mut running_subsystems, + &mut running_subsystems_rx, + all_subsystems.bitfield_distribution, + )?; + + let provisioner_subsystem = spawn( + &mut s, + &mut running_subsystems, + &mut running_subsystems_rx, + all_subsystems.provisioner, + )?; + + let pov_distribution_subsystem = spawn( + &mut s, + &mut running_subsystems, + &mut running_subsystems_rx, + all_subsystems.pov_distribution, + )?; + + let runtime_api_subsystem = spawn( + &mut s, + &mut running_subsystems, + &mut running_subsystems_rx, + all_subsystems.runtime_api, + )?; + + let availability_store_subsystem = spawn( + &mut s, + &mut running_subsystems, + &mut running_subsystems_rx, + all_subsystems.availability_store, + )?; + + let network_bridge_subsystem = spawn( + &mut s, + &mut running_subsystems, + &mut running_subsystems_rx, + all_subsystems.network_bridge, )?; let active_leaves = HashSet::new(); @@ -492,8 +621,17 @@ where .collect(); let this = Self { - validation_subsystem, + candidate_validation_subsystem, candidate_backing_subsystem, + candidate_selection_subsystem, + statement_distribution_subsystem, + availability_distribution_subsystem, + bitfield_distribution_subsystem, + provisioner_subsystem, + pov_distribution_subsystem, + runtime_api_subsystem, + availability_store_subsystem, + network_bridge_subsystem, s, running_subsystems, running_subsystems_rx, @@ -507,7 +645,7 @@ where // Stop the overseer. async fn stop(mut self) { - if let Some(ref mut s) = self.validation_subsystem.instance { + if let Some(ref mut s) = self.candidate_validation_subsystem.instance { let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; } @@ -515,6 +653,42 @@ where let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; } + if let Some(ref mut s) = self.candidate_selection_subsystem.instance { + let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + } + + if let Some(ref mut s) = self.statement_distribution_subsystem.instance { + let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + } + + if let Some(ref mut s) = self.availability_distribution_subsystem.instance { + let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + } + + if let Some(ref mut s) = self.bitfield_distribution_subsystem.instance { + let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + } + + if let Some(ref mut s) = self.provisioner_subsystem.instance { + let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + } + + if let Some(ref mut s) = self.pov_distribution_subsystem.instance { + let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + } + + if let Some(ref mut s) = self.runtime_api_subsystem.instance { + let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + } + + if let Some(ref mut s) = self.availability_distribution_subsystem.instance { + let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + } + + if let Some(ref mut s) = self.network_bridge_subsystem.instance { + let _ = s.tx.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + } + let mut stop_delay = Delay::new(Duration::from_secs(STOP_DELAY)).fuse(); loop { @@ -616,11 +790,47 @@ where } async fn broadcast_signal(&mut self, signal: OverseerSignal) -> SubsystemResult<()> { - if let Some(ref mut s) = self.validation_subsystem.instance { + if let Some(ref mut s) = self.candidate_validation_subsystem.instance { s.tx.send(FromOverseer::Signal(signal.clone())).await?; } if let Some(ref mut s) = self.candidate_backing_subsystem.instance { + s.tx.send(FromOverseer::Signal(signal.clone())).await?; + } + + if let Some(ref mut s) = self.candidate_selection_subsystem.instance { + s.tx.send(FromOverseer::Signal(signal.clone())).await?; + } + + if let Some(ref mut s) = self.statement_distribution_subsystem.instance { + s.tx.send(FromOverseer::Signal(signal.clone())).await?; + } + + if let Some(ref mut s) = self.availability_distribution_subsystem.instance { + s.tx.send(FromOverseer::Signal(signal.clone())).await?; + } + + if let Some(ref mut s) = self.bitfield_distribution_subsystem.instance { + s.tx.send(FromOverseer::Signal(signal.clone())).await?; + } + + if let Some(ref mut s) = self.provisioner_subsystem.instance { + s.tx.send(FromOverseer::Signal(signal.clone())).await?; + } + + if let Some(ref mut s) = self.pov_distribution_subsystem.instance { + s.tx.send(FromOverseer::Signal(signal.clone())).await?; + } + + if let Some(ref mut s) = self.runtime_api_subsystem.instance { + s.tx.send(FromOverseer::Signal(signal.clone())).await?; + } + + if let Some(ref mut s) = self.availability_store_subsystem.instance { + s.tx.send(FromOverseer::Signal(signal.clone())).await?; + } + + if let Some(ref mut s) = self.network_bridge_subsystem.instance { s.tx.send(FromOverseer::Signal(signal)).await?; } @@ -630,7 +840,7 @@ where async fn route_message(&mut self, msg: AllMessages) { match msg { AllMessages::CandidateValidation(msg) => { - if let Some(ref mut s) = self.validation_subsystem.instance { + if let Some(ref mut s) = self.candidate_validation_subsystem.instance { let _= s.tx.send(FromOverseer::Communication { msg }).await; } } @@ -639,11 +849,50 @@ where let _ = s.tx.send(FromOverseer::Communication { msg }).await; } } - _ => { - // TODO: temporary catch-all until all subsystems are integrated with overseer. - // The overseer is not complete until this is an exhaustive match with all - // messages targeting an included subsystem. - // https://github.com/paritytech/polkadot/issues/1317 + AllMessages::CandidateSelection(msg) => { + if let Some(ref mut s) = self.candidate_selection_subsystem.instance { + let _ = s.tx.send(FromOverseer::Communication { msg }).await; + } + } + AllMessages::StatementDistribution(msg) => { + if let Some(ref mut s) = self.statement_distribution_subsystem.instance { + let _ = s.tx.send(FromOverseer::Communication { msg }).await; + } + } + AllMessages::AvailabilityDistribution(msg) => { + if let Some(ref mut s) = self.availability_distribution_subsystem.instance { + let _ = s.tx.send(FromOverseer::Communication { msg }).await; + } + } + AllMessages::BitfieldDistribution(msg) => { + if let Some(ref mut s) = self.bitfield_distribution_subsystem.instance { + let _ = s.tx.send(FromOverseer::Communication { msg }).await; + } + } + AllMessages::Provisioner(msg) => { + if let Some(ref mut s) = self.provisioner_subsystem.instance { + let _ = s.tx.send(FromOverseer::Communication { msg }).await; + } + } + AllMessages::PoVDistribution(msg) => { + if let Some(ref mut s) = self.pov_distribution_subsystem.instance { + let _ = s.tx.send(FromOverseer::Communication { msg }).await; + } + } + AllMessages::RuntimeApi(msg) => { + if let Some(ref mut s) = self.runtime_api_subsystem.instance { + let _ = s.tx.send(FromOverseer::Communication { msg }).await; + } + } + AllMessages::AvailabilityStore(msg) => { + if let Some(ref mut s) = self.availability_store_subsystem.instance { + let _ = s.tx.send(FromOverseer::Communication { msg }).await; + } + } + AllMessages::NetworkBridge(msg) => { + if let Some(ref mut s) = self.network_bridge_subsystem.instance { + let _ = s.tx.send(FromOverseer::Communication { msg }).await; + } } } } @@ -678,13 +927,16 @@ fn spawn( }) } + #[cfg(test)] mod tests { use futures::{executor, pin_mut, select, channel::mpsc, FutureExt}; use polkadot_primitives::parachain::{BlockData, PoVBlock}; + use polkadot_subsystem::DummySubsystem; use super::*; + struct TestSubsystem1(mpsc::Sender); impl Subsystem for TestSubsystem1 @@ -775,10 +1027,22 @@ mod tests { let (s1_tx, mut s1_rx) = mpsc::channel(64); let (s2_tx, mut s2_rx) = mpsc::channel(64); + let all_subsystems = AllSubsystems { + candidate_validation: TestSubsystem1(s1_tx), + candidate_backing: TestSubsystem2(s2_tx), + candidate_selection: DummySubsystem, + statement_distribution: DummySubsystem, + availability_distribution: DummySubsystem, + bitfield_distribution: DummySubsystem, + provisioner: DummySubsystem, + pov_distribution: DummySubsystem, + runtime_api: DummySubsystem, + availability_store: DummySubsystem, + network_bridge: DummySubsystem, + }; let (overseer, mut handler) = Overseer::new( vec![], - TestSubsystem1(s1_tx), - TestSubsystem2(s2_tx), + all_subsystems, spawner, ).unwrap(); let overseer_fut = overseer.run().fuse(); @@ -825,10 +1089,22 @@ mod tests { executor::block_on(async move { let (s1_tx, _) = mpsc::channel(64); + let all_subsystems = AllSubsystems { + candidate_validation: TestSubsystem1(s1_tx), + candidate_backing: TestSubsystem4, + candidate_selection: DummySubsystem, + statement_distribution: DummySubsystem, + availability_distribution: DummySubsystem, + bitfield_distribution: DummySubsystem, + provisioner: DummySubsystem, + pov_distribution: DummySubsystem, + runtime_api: DummySubsystem, + availability_store: DummySubsystem, + network_bridge: DummySubsystem, + }; let (overseer, _handle) = Overseer::new( vec![], - TestSubsystem1(s1_tx), - TestSubsystem4, + all_subsystems, spawner, ).unwrap(); let overseer_fut = overseer.run().fuse(); @@ -922,11 +1198,22 @@ mod tests { let (tx_5, mut rx_5) = mpsc::channel(64); let (tx_6, mut rx_6) = mpsc::channel(64); - + let all_subsystems = AllSubsystems { + candidate_validation: TestSubsystem5(tx_5), + candidate_backing: TestSubsystem6(tx_6), + candidate_selection: DummySubsystem, + statement_distribution: DummySubsystem, + availability_distribution: DummySubsystem, + bitfield_distribution: DummySubsystem, + provisioner: DummySubsystem, + pov_distribution: DummySubsystem, + runtime_api: DummySubsystem, + availability_store: DummySubsystem, + network_bridge: DummySubsystem, + }; let (overseer, mut handler) = Overseer::new( vec![first_block], - TestSubsystem5(tx_5), - TestSubsystem6(tx_6), + all_subsystems, spawner, ).unwrap(); @@ -1007,11 +1294,23 @@ mod tests { let (tx_5, mut rx_5) = mpsc::channel(64); let (tx_6, mut rx_6) = mpsc::channel(64); + let all_subsystems = AllSubsystems { + candidate_validation: TestSubsystem5(tx_5), + candidate_backing: TestSubsystem6(tx_6), + candidate_selection: DummySubsystem, + statement_distribution: DummySubsystem, + availability_distribution: DummySubsystem, + bitfield_distribution: DummySubsystem, + provisioner: DummySubsystem, + pov_distribution: DummySubsystem, + runtime_api: DummySubsystem, + availability_store: DummySubsystem, + network_bridge: DummySubsystem, + }; // start with two forks of different height. let (overseer, mut handler) = Overseer::new( vec![first_block, second_block], - TestSubsystem5(tx_5), - TestSubsystem6(tx_6), + all_subsystems, spawner, ).unwrap(); diff --git a/node/service/src/lib.rs b/node/service/src/lib.rs index 23a6dbfd4192..0f4c2a408008 100644 --- a/node/service/src/lib.rs +++ b/node/service/src/lib.rs @@ -29,11 +29,8 @@ use grandpa::{self, FinalityProofProvider as GrandpaFinalityProofProvider}; use sc_executor::native_executor_instance; use log::info; use sp_blockchain::HeaderBackend; -use polkadot_overseer::{self as overseer, BlockInfo, Overseer, OverseerHandler}; -use polkadot_subsystem::{ - Subsystem, SubsystemContext, SpawnedSubsystem, - messages::{CandidateValidationMessage, CandidateBackingMessage}, -}; +use polkadot_overseer::{self as overseer, AllSubsystems, BlockInfo, Overseer, OverseerHandler}; +use polkadot_subsystem::DummySubsystem; use polkadot_node_core_proposer::ProposerFactory; use sp_trie::PrefixedMemoryDB; pub use service::{ @@ -273,38 +270,28 @@ macro_rules! new_full_start { }} } -struct CandidateValidationSubsystem; - -impl Subsystem for CandidateValidationSubsystem - where C: SubsystemContext -{ - fn start(self, mut ctx: C) -> SpawnedSubsystem { - SpawnedSubsystem(Box::pin(async move { - while let Ok(_) = ctx.recv().await {} - })) - } -} - -struct CandidateBackingSubsystem; - -impl Subsystem for CandidateBackingSubsystem - where C: SubsystemContext -{ - fn start(self, mut ctx: C) -> SpawnedSubsystem { - SpawnedSubsystem(Box::pin(async move { - while let Ok(_) = ctx.recv().await {} - })) - } -} - fn real_overseer( leaves: impl IntoIterator, s: S, ) -> Result<(Overseer, OverseerHandler), ServiceError> { - let validation = CandidateValidationSubsystem; - let candidate_backing = CandidateBackingSubsystem; - Overseer::new(leaves, validation, candidate_backing, s) - .map_err(|e| ServiceError::Other(format!("Failed to create an Overseer: {:?}", e))) + let all_subsystems = AllSubsystems { + candidate_validation: DummySubsystem, + candidate_backing: DummySubsystem, + candidate_selection: DummySubsystem, + statement_distribution: DummySubsystem, + availability_distribution: DummySubsystem, + bitfield_distribution: DummySubsystem, + provisioner: DummySubsystem, + pov_distribution: DummySubsystem, + runtime_api: DummySubsystem, + availability_store: DummySubsystem, + network_bridge: DummySubsystem, + }; + Overseer::new( + leaves, + all_subsystems, + s, + ).map_err(|e| ServiceError::Other(format!("Failed to create an Overseer: {:?}", e))) } /// Builds a new service for a full client. diff --git a/node/subsystem/src/lib.rs b/node/subsystem/src/lib.rs index fd32d7cfdbc9..db9a0629cfd4 100644 --- a/node/subsystem/src/lib.rs +++ b/node/subsystem/src/lib.rs @@ -148,3 +148,21 @@ pub trait Subsystem { /// Start this `Subsystem` and return `SpawnedSubsystem`. fn start(self, ctx: C) -> SpawnedSubsystem; } + +/// A dummy subsystem that implements [`Subsystem`] for all +/// types of messages. Used for tests or as a placeholder. +pub struct DummySubsystem; + +impl Subsystem for DummySubsystem { + fn start(self, mut ctx: C) -> SpawnedSubsystem { + SpawnedSubsystem(Box::pin(async move { + loop { + match ctx.recv().await { + Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return, + Err(_) => return, + _ => continue, + } + } + })) + } +} diff --git a/node/subsystem/src/messages.rs b/node/subsystem/src/messages.rs index 73371e28fd9b..3a7f248069be 100644 --- a/node/subsystem/src/messages.rs +++ b/node/subsystem/src/messages.rs @@ -224,12 +224,12 @@ pub enum PoVDistributionMessage { /// /// This `CandidateDescriptor` should correspond to a candidate seconded under the provided /// relay-parent hash. - FetchPoV(Hash, CandidateDescriptor, oneshot::Sender>), - /// Distribute a PoV for the given relay-parent and CandidateDescriptor. - /// The PoV should correctly hash to the PoV hash mentioned in the CandidateDescriptor - DistributePoV(Hash, CandidateDescriptor, Arc), - /// An update from the network bridge. - NetworkBridgeUpdate(NetworkBridgeEvent), + FetchPoV(Hash, CandidateDescriptor, oneshot::Sender>), + /// Distribute a PoV for the given relay-parent and CandidateDescriptor. + /// The PoV should correctly hash to the PoV hash mentioned in the CandidateDescriptor + DistributePoV(Hash, CandidateDescriptor, Arc), + /// An update from the network bridge. + NetworkBridgeUpdate(NetworkBridgeEvent), } /// A message type tying together all message types that are used across Subsystems.