diff --git a/crates/sc-proof-of-time/src/gossip.rs b/crates/sc-proof-of-time/src/gossip.rs index 92e22d95a6d..a8e8da40822 100644 --- a/crates/sc-proof-of-time/src/gossip.rs +++ b/crates/sc-proof-of-time/src/gossip.rs @@ -1,15 +1,20 @@ //! PoT gossip functionality. use crate::state_manager::PotProtocolState; +use crate::PotComponents; +use futures::channel::mpsc; use futures::{FutureExt, StreamExt}; -use parity_scale_codec::Decode; +use parity_scale_codec::{Decode, Encode}; use parking_lot::{Mutex, RwLock}; +use sc_client_api::BlockchainEvents; use sc_network::config::NonDefaultSetConfig; use sc_network::PeerId; use sc_network_gossip::{ GossipEngine, MessageIntent, Syncing as GossipSyncing, ValidationResult, Validator, ValidatorContext, }; +use sp_blockchain::HeaderBackend; +use sp_consensus_subspace::digests::extract_pre_digest; use sp_runtime::traits::{Block as BlockT, Hash as HashT, Header as HeaderT}; use std::collections::HashSet; use std::sync::Arc; @@ -17,33 +22,48 @@ use std::time::Instant; use subspace_core_primitives::crypto::blake2b_256_hash; use subspace_core_primitives::{Blake2b256Hash, PotProof}; use subspace_proof_of_time::ProofOfTime; -use tracing::{error, trace}; +use tracing::{debug, error, trace, warn}; pub(crate) const GOSSIP_PROTOCOL: &str = "/subspace/subspace-proof-of-time"; -/// PoT gossip components. -#[derive(Clone)] -pub(crate) struct PotGossip { +/// PoT gossip worker +#[must_use = "Gossip worker doesn't do anything unless run() method is called"] +pub struct PotGossipWorker +where + Block: BlockT, +{ engine: Arc>>, - validator: Arc, + validator: Arc>, pot_state: Arc, + client: Arc, + topic: Block::Hash, + outgoing_messages_sender: mpsc::Sender, + outgoing_messages_receiver: mpsc::Receiver, } -impl PotGossip { - /// Creates the gossip components. - pub(crate) fn new( +impl PotGossipWorker +where + Block: BlockT, + Client: HeaderBackend + BlockchainEvents, +{ + /// Instantiate gossip worker + pub fn new( + components: &PotComponents, + client: Arc, network: Network, sync: Arc, - pot_state: Arc, - proof_of_time: ProofOfTime, ) -> Self where Network: sc_network_gossip::Network + Send + Sync + Clone + 'static, GossipSync: GossipSyncing + 'static, { + let topic = + <::Hashing as HashT>::hash(b"subspace-proof-of-time-gossip"); + let validator = Arc::new(PotGossipValidator::new( - Arc::clone(&pot_state), - proof_of_time, + Arc::clone(&components.protocol_state), + components.proof_of_time, + topic, )); let engine = Arc::new(Mutex::new(GossipEngine::new( network, @@ -52,25 +72,33 @@ impl PotGossip { validator.clone(), None, ))); + + let (outgoing_messages_sender, outgoing_messages_receiver) = mpsc::channel(0); + Self { engine, validator, - pot_state, + pot_state: Arc::clone(&components.protocol_state), + client, + topic, + outgoing_messages_sender, + outgoing_messages_receiver, } } - /// Gossips the message to the network. - pub(crate) fn gossip_message(&self, message: Vec) { - self.validator.on_broadcast(&message); - self.engine - .lock() - .gossip_message(topic::(), message, false); + /// Sender that can be used to gossip PoT messages to the network + pub fn gossip_sender(&self) -> mpsc::Sender { + self.outgoing_messages_sender.clone() } - /// Runs the loop to process incoming messages. - /// Returns when the gossip engine terminates. - pub(crate) async fn process_incoming_messages(&self) { - let message_receiver = self.engine.lock().messages_for(topic::()); + /// Run gossip engine. + /// + /// NOTE: Even though this function is async, it might do blocking operations internally and + /// should be running on a dedicated thread. + pub async fn run(mut self) { + self.initialize().await; + + let message_receiver = self.engine.lock().messages_for(self.topic); let mut incoming_messages = Box::pin(message_receiver.filter_map( // Filter out messages without sender or fail to decode. // TODO: penalize nodes that send garbled messages. @@ -91,8 +119,11 @@ impl PotGossip { futures::select! { gossiped = incoming_messages.next().fuse() => { if let Some((sender, proof)) = gossiped { - self.handle_gossip_message(sender, proof); + self.handle_incoming_message(sender, proof); } + }, + outgoing_message = self.outgoing_messages_receiver.select_next_some() => { + self.handle_outgoing_message(outgoing_message) }, _ = gossip_engine_poll.fuse() => { error!("Gossip engine has terminated"); @@ -102,8 +133,52 @@ impl PotGossip { } } + /// Initializes the chain state from the consensus tip info. + async fn initialize(&self) { + debug!("Waiting for initialization"); + + // Wait for a block with proofs. + let mut block_import = self.client.import_notification_stream(); + while let Some(incoming_block) = block_import.next().await { + let pre_digest = match extract_pre_digest(&incoming_block.header) { + Ok(pre_digest) => pre_digest, + Err(error) => { + warn!( + %error, + block_hash = %incoming_block.hash, + origin = ?incoming_block.origin, + "Failed to get pre_digest", + ); + continue; + } + }; + + let pot_pre_digest = match pre_digest.pot_pre_digest() { + Some(pot_pre_digest) => pot_pre_digest, + None => { + warn!( + block_hash = %incoming_block.hash, + origin = ?incoming_block.origin, + "Failed to get pot_pre_digest", + ); + continue; + } + }; + + if pot_pre_digest.proofs().is_some() { + trace!( + block_hash = %incoming_block.hash, + origin = ?incoming_block.origin, + ?pot_pre_digest, + "Initialization complete", + ); + return; + } + } + } + /// Handles the incoming gossip message. - fn handle_gossip_message(&self, sender: PeerId, proof: PotProof) { + fn handle_incoming_message(&self, sender: PeerId, proof: PotProof) { let start_ts = Instant::now(); let ret = self.pot_state.on_proof_from_peer(sender, &proof); let elapsed = start_ts.elapsed(); @@ -114,22 +189,42 @@ impl PotGossip { trace!(%proof, ?elapsed, %sender, "On gossip"); } } + + fn handle_outgoing_message(&self, proof: PotProof) { + let message = proof.encode(); + self.validator.on_broadcast(&message); + self.engine + .lock() + .gossip_message(self.topic, message, false); + } } /// Validator for gossiped messages -struct PotGossipValidator { +struct PotGossipValidator +where + Block: BlockT, +{ pot_state: Arc, proof_of_time: ProofOfTime, pending: RwLock>, + topic: Block::Hash, } -impl PotGossipValidator { +impl PotGossipValidator +where + Block: BlockT, +{ /// Creates the validator. - fn new(pot_state: Arc, proof_of_time: ProofOfTime) -> Self { + fn new( + pot_state: Arc, + proof_of_time: ProofOfTime, + topic: Block::Hash, + ) -> Self { Self { pot_state, proof_of_time, pending: RwLock::new(HashSet::new()), + topic, } } @@ -140,7 +235,10 @@ impl PotGossipValidator { } } -impl Validator for PotGossipValidator { +impl Validator for PotGossipValidator +where + Block: BlockT, +{ fn validate( &self, _context: &mut dyn ValidatorContext, @@ -157,7 +255,7 @@ impl Validator for PotGossipValidator { trace!(%error, "Verification failed"); ValidationResult::Discard } else { - ValidationResult::ProcessAndKeep(topic::()) + ValidationResult::ProcessAndKeep(self.topic) } } Err(_) => ValidationResult::Discard, @@ -181,11 +279,6 @@ impl Validator for PotGossipValidator { } } -/// PoT message topic. -fn topic() -> Block::Hash { - <::Hashing as HashT>::hash(b"subspace-proof-of-time-gossip") -} - /// Returns the network configuration for PoT gossip. pub fn pot_gossip_peers_set_config() -> NonDefaultSetConfig { let mut cfg = NonDefaultSetConfig::new(GOSSIP_PROTOCOL.into(), 5 * 1024 * 1024); diff --git a/crates/sc-proof-of-time/src/lib.rs b/crates/sc-proof-of-time/src/lib.rs index 22305cc9dcb..4f17dbcb2a0 100644 --- a/crates/sc-proof-of-time/src/lib.rs +++ b/crates/sc-proof-of-time/src/lib.rs @@ -2,8 +2,7 @@ #![feature(const_option)] -mod gossip; -mod node_client; +pub mod gossip; mod state_manager; mod time_keeper; @@ -13,8 +12,6 @@ use std::sync::Arc; use subspace_core_primitives::{BlockNumber, SlotNumber}; use subspace_proof_of_time::ProofOfTime; -pub use gossip::pot_gossip_peers_set_config; -pub use node_client::PotClient; pub use state_manager::{ PotConsensusState, PotGetBlockProofsError, PotStateSummary, PotVerifyBlockProofsError, }; @@ -87,7 +84,7 @@ impl PotComponents { let proof_of_time = ProofOfTime::new(config.pot_iterations, config.num_checkpoints) // TODO: Proper error handling or proof .expect("Failed to initialize proof of time"); - let (protocol_state, consensus_state) = init_pot_state(config, proof_of_time.clone()); + let (protocol_state, consensus_state) = init_pot_state(config, proof_of_time); Self { is_time_keeper, diff --git a/crates/sc-proof-of-time/src/node_client.rs b/crates/sc-proof-of-time/src/node_client.rs deleted file mode 100644 index 9683e09aaef..00000000000 --- a/crates/sc-proof-of-time/src/node_client.rs +++ /dev/null @@ -1,98 +0,0 @@ -//! Consensus node interface to the time keeper network. - -use crate::gossip::PotGossip; -use crate::PotComponents; -use futures::StreamExt; -use sc_client_api::BlockchainEvents; -use sc_network_gossip::{Network as GossipNetwork, Syncing as GossipSyncing}; -use sp_blockchain::HeaderBackend; -use sp_consensus_subspace::digests::extract_pre_digest; -use sp_core::H256; -use sp_runtime::traits::Block as BlockT; -use std::sync::Arc; -use tracing::{debug, error, trace, warn}; - -/// The PoT client implementation -pub struct PotClient, Client> { - gossip: PotGossip, - client: Arc, -} - -impl PotClient -where - Block: BlockT, - Client: HeaderBackend + BlockchainEvents, -{ - /// Creates the PoT client instance. - pub fn new( - components: PotComponents, - client: Arc, - network: Network, - sync: Arc, - ) -> Self - where - Network: GossipNetwork + Send + Sync + Clone + 'static, - GossipSync: GossipSyncing + 'static, - { - Self { - gossip: PotGossip::new( - network, - sync, - components.protocol_state, - components.proof_of_time, - ), - client, - } - } - - /// Runs the node client processing loop. - pub async fn run(self) { - self.initialize().await; - self.gossip.process_incoming_messages().await; - error!("Gossip engine has terminated"); - } - - /// Initializes the chain state from the consensus tip info. - async fn initialize(&self) { - debug!("Waiting for initialization"); - - // Wait for a block with proofs. - let mut block_import = self.client.import_notification_stream(); - while let Some(incoming_block) = block_import.next().await { - let pre_digest = match extract_pre_digest(&incoming_block.header) { - Ok(pre_digest) => pre_digest, - Err(error) => { - warn!( - %error, - block_hash = %incoming_block.hash, - origin = ?incoming_block.origin, - "Failed to get pre_digest", - ); - continue; - } - }; - - let pot_pre_digest = match pre_digest.pot_pre_digest() { - Some(pot_pre_digest) => pot_pre_digest, - None => { - warn!( - block_hash = %incoming_block.hash, - origin = ?incoming_block.origin, - "Failed to get pot_pre_digest", - ); - continue; - } - }; - - if pot_pre_digest.proofs().is_some() { - trace!( - block_hash = %incoming_block.hash, - origin = ?incoming_block.origin, - ?pot_pre_digest, - "Initialization complete", - ); - return; - } - } - } -} diff --git a/crates/sc-proof-of-time/src/time_keeper.rs b/crates/sc-proof-of-time/src/time_keeper.rs index 32c96d05074..5c2cea8c5a0 100644 --- a/crates/sc-proof-of-time/src/time_keeper.rs +++ b/crates/sc-proof-of-time/src/time_keeper.rs @@ -1,16 +1,15 @@ //! Time keeper implementation. -use crate::gossip::PotGossip; use crate::state_manager::PotProtocolState; use crate::PotComponents; -use futures::{FutureExt, StreamExt}; -use parity_scale_codec::Encode; +use futures::channel::mpsc; +use futures::{SinkExt, StreamExt}; use sc_client_api::BlockchainEvents; -use sc_network_gossip::{Network as GossipNetwork, Syncing as GossipSyncing}; use sp_blockchain::HeaderBackend; use sp_consensus_subspace::digests::extract_pre_digest; use sp_core::H256; use sp_runtime::traits::Block as BlockT; +use std::marker::PhantomData; use std::sync::Arc; use std::thread; use std::time::{Duration, Instant}; @@ -25,15 +24,16 @@ use tracing::{debug, error, trace, warn}; const PROOFS_CHANNEL_SIZE: usize = 12; // 2 * reveal lag. /// The time keeper manages the protocol: periodic proof generation/verification, gossip. -pub struct TimeKeeper, Client> { +pub struct TimeKeeper { proof_of_time: ProofOfTime, pot_state: Arc, - gossip: PotGossip, client: Arc, // Expected time to produce a proof. // TODO: this will be removed after the pot_iterations is set // to produce a proof/sec. target_proof_time: Duration, + gossip_sender: mpsc::Sender, + _block: PhantomData, } impl TimeKeeper @@ -42,49 +42,32 @@ where Client: HeaderBackend + BlockchainEvents, { /// Creates the time keeper instance. - pub fn new( - components: PotComponents, + pub fn new( + components: &PotComponents, client: Arc, - network: Network, - sync: Arc, target_proof_time: Duration, - ) -> Self - where - Network: GossipNetwork + Send + Sync + Clone + 'static, - GossipSync: GossipSyncing + 'static, - { - let PotComponents { - proof_of_time, - protocol_state: pot_state, - .. - } = components; - + gossip_sender: mpsc::Sender, + ) -> Self { Self { - proof_of_time: proof_of_time.clone(), - pot_state: pot_state.clone(), - gossip: PotGossip::new(network, sync, pot_state, proof_of_time), + proof_of_time: components.proof_of_time, + pot_state: Arc::clone(&components.protocol_state), client, target_proof_time, + gossip_sender, + _block: PhantomData, } } /// Runs the time keeper processing loop. - pub async fn run(self) { + pub async fn run(mut self) { self.initialize().await; let mut local_proof_receiver = self.spawn_producer_thread(); - loop { - futures::select! { - local_proof = local_proof_receiver.recv().fuse() => { - if let Some(proof) = local_proof { - trace!(%proof, "Got local proof"); - self.handle_local_proof(proof); - } - }, - _ = self.gossip.process_incoming_messages().fuse() => { - error!("Gossip engine has terminated"); - return; - } + while let Some(proof) = local_proof_receiver.recv().await { + trace!(%proof, "Got local proof"); + if let Err(error) = self.gossip_sender.send(proof).await { + error!(%error, "Failed to send proof to gossip"); + return; } } } @@ -151,7 +134,7 @@ where /// Starts the thread to produce the proofs. fn spawn_producer_thread(&self) -> Receiver { let (sender, receiver) = channel(PROOFS_CHANNEL_SIZE); - let proof_of_time = self.proof_of_time.clone(); + let proof_of_time = self.proof_of_time; let pot_state = self.pot_state.clone(); let target_proof_time = self.target_proof_time; thread::Builder::new() @@ -215,9 +198,4 @@ where } } } - - /// Gossips the locally generated proof. - fn handle_local_proof(&self, proof: PotProof) { - self.gossip.gossip_message(proof.encode()); - } } diff --git a/crates/subspace-proof-of-time/src/lib.rs b/crates/subspace-proof-of-time/src/lib.rs index e07d277d491..9825b0e76b2 100644 --- a/crates/subspace-proof-of-time/src/lib.rs +++ b/crates/subspace-proof-of-time/src/lib.rs @@ -35,7 +35,7 @@ pub enum PotVerificationError { } /// Wrapper for the low level AES primitives -#[derive(Clone)] +#[derive(Debug, Clone, Copy)] pub struct ProofOfTime { /// Number of checkpoints per PoT. num_checkpoints: u8, diff --git a/crates/subspace-service/src/lib.rs b/crates/subspace-service/src/lib.rs index 50379ec9e0b..0260517cea4 100644 --- a/crates/subspace-service/src/lib.rs +++ b/crates/subspace-service/src/lib.rs @@ -55,7 +55,8 @@ use sc_consensus_subspace::{ }; use sc_executor::{NativeElseWasmExecutor, NativeExecutionDispatch}; use sc_network::NetworkService; -use sc_proof_of_time::{pot_gossip_peers_set_config, PotClient, PotComponents, TimeKeeper}; +use sc_proof_of_time::gossip::{pot_gossip_peers_set_config, PotGossipWorker}; +use sc_proof_of_time::{PotComponents, TimeKeeper}; use sc_service::error::Error as ServiceError; use sc_service::{Configuration, NetworkStarter, PartialComponents, SpawnTasksParams, TaskManager}; use sc_subspace_block_relay::{build_consensus_relay, NetworkWrapper}; @@ -772,13 +773,27 @@ where .as_ref() .map(|component| component.consensus_state()); if let Some(components) = pot_components { + let pot_gossip_worker = PotGossipWorker::new( + &components, + client.clone(), + network_service.clone(), + sync_service.clone(), + ); + let gossip_sender = pot_gossip_worker.gossip_sender(); + task_manager.spawn_essential_handle().spawn_blocking( + "pot-gossip-worker", + Some("pot"), + async move { + pot_gossip_worker.run().await; + }, + ); + if components.is_time_keeper() { - let time_keeper = TimeKeeper::::new( - components, + let time_keeper = TimeKeeper::new( + &components, client.clone(), - network_service.clone(), - sync_service.clone(), subspace_link.slot_duration().as_duration(), + gossip_sender, ); task_manager.spawn_essential_handle().spawn_blocking( @@ -788,20 +803,6 @@ where time_keeper.run().await; }, ); - } else { - let pot_client = PotClient::::new( - components, - client.clone(), - network_service.clone(), - sync_service.clone(), - ); - task_manager.spawn_essential_handle().spawn_blocking( - "subspace-proof-of-time-client", - Some("pot"), - async move { - pot_client.run().await; - }, - ); } }