Skip to content

Commit

Permalink
Move handle_gossip_message into gossip engine
Browse files Browse the repository at this point in the history
  • Loading branch information
nazar-pc committed Aug 21, 2023
1 parent 644f4a2 commit 0bf106b
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 54 deletions.
33 changes: 26 additions & 7 deletions crates/sc-proof-of-time/src/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use sc_network_gossip::{
use sp_runtime::traits::{Block as BlockT, Hash as HashT, Header as HeaderT};
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Instant;
use subspace_core_primitives::crypto::blake2b_256_hash;
use subspace_core_primitives::{Blake2b256Hash, PotProof};
use subspace_proof_of_time::ProofOfTime;
Expand All @@ -25,6 +26,7 @@ pub(crate) const GOSSIP_PROTOCOL: &str = "/subspace/subspace-proof-of-time";
pub(crate) struct PotGossip<Block: BlockT> {
engine: Arc<Mutex<GossipEngine<Block>>>,
validator: Arc<PotGossipValidator>,
pot_state: Arc<dyn PotProtocolState>,
}

impl<Block: BlockT> PotGossip<Block> {
Expand All @@ -39,15 +41,22 @@ impl<Block: BlockT> PotGossip<Block> {
Network: sc_network_gossip::Network<Block> + Send + Sync + Clone + 'static,
GossipSync: GossipSyncing<Block> + 'static,
{
let validator = Arc::new(PotGossipValidator::new(pot_state, proof_of_time));
let validator = Arc::new(PotGossipValidator::new(
Arc::clone(&pot_state),
proof_of_time,
));
let engine = Arc::new(Mutex::new(GossipEngine::new(
network,
sync,
GOSSIP_PROTOCOL,
validator.clone(),
None,
)));
Self { engine, validator }
Self {
engine,
validator,
pot_state,
}
}

/// Gossips the message to the network.
Expand All @@ -60,10 +69,7 @@ impl<Block: BlockT> PotGossip<Block> {

/// Runs the loop to process incoming messages.
/// Returns when the gossip engine terminates.
pub(crate) async fn process_incoming_messages<'a>(
&self,
process_fn: Arc<dyn Fn(PeerId, PotProof) + Send + Sync + 'a>,
) {
pub(crate) async fn process_incoming_messages(&self) {
let message_receiver = self.engine.lock().messages_for(topic::<Block>());
let mut incoming_messages = Box::pin(message_receiver.filter_map(
// Filter out messages without sender or fail to decode.
Expand All @@ -85,7 +91,7 @@ impl<Block: BlockT> PotGossip<Block> {
futures::select! {
gossiped = incoming_messages.next().fuse() => {
if let Some((sender, proof)) = gossiped {
(process_fn)(sender, proof);
self.handle_gossip_message(sender, proof);
}
},
_ = gossip_engine_poll.fuse() => {
Expand All @@ -95,6 +101,19 @@ impl<Block: BlockT> PotGossip<Block> {
}
}
}

/// Handles the incoming gossip message.
fn handle_gossip_message(&self, sender: PeerId, proof: PotProof) {
let start_ts = Instant::now();
let ret = self.pot_state.on_proof_from_peer(sender, &proof);
let elapsed = start_ts.elapsed();

if let Err(error) = ret {
trace!(%error, %sender, "On gossip");
} else {
trace!(%proof, ?elapsed, %sender, "On gossip");
}
}
}

/// Validator for gossiped messages
Expand Down
27 changes: 1 addition & 26 deletions crates/sc-proof-of-time/src/node_client.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,19 @@
//! Consensus node interface to the time keeper network.
use crate::gossip::PotGossip;
use crate::state_manager::PotProtocolState;
use crate::PotComponents;
use futures::StreamExt;
use sc_client_api::BlockchainEvents;
use sc_network::PeerId;
use sc_network_gossip::{Network as GossipNetwork, Syncing as GossipSyncing};
use sp_blockchain::HeaderBackend;
use sp_consensus_subspace::digests::extract_pre_digest;
use sp_core::H256;
use sp_runtime::traits::Block as BlockT;
use std::sync::Arc;
use std::time::Instant;
use subspace_core_primitives::PotProof;
use tracing::{debug, error, trace, warn};

/// The PoT client implementation
pub struct PotClient<Block: BlockT<Hash = H256>, Client> {
pot_state: Arc<dyn PotProtocolState>,
gossip: PotGossip<Block>,
client: Arc<Client>,
}
Expand All @@ -40,7 +35,6 @@ where
GossipSync: GossipSyncing<Block> + 'static,
{
Self {
pot_state: components.protocol_state.clone(),
gossip: PotGossip::new(
network,
sync,
Expand All @@ -54,13 +48,7 @@ where
/// Runs the node client processing loop.
pub async fn run(self) {
self.initialize().await;
let handle_gossip_message: Arc<dyn Fn(PeerId, PotProof) + Send + Sync> =
Arc::new(|sender, proof| {
self.handle_gossip_message(sender, proof);
});
self.gossip
.process_incoming_messages(handle_gossip_message)
.await;
self.gossip.process_incoming_messages().await;
error!("Gossip engine has terminated");
}

Expand Down Expand Up @@ -107,17 +95,4 @@ where
}
}
}

/// Handles the incoming gossip message.
fn handle_gossip_message(&self, sender: PeerId, proof: PotProof) {
let start_ts = Instant::now();
let ret = self.pot_state.on_proof_from_peer(sender, &proof);
let elapsed = start_ts.elapsed();

if let Err(error) = ret {
trace!(%error, %sender, "On gossip");
} else {
trace!(%proof, ?elapsed, %sender, "On gossip");
}
}
}
22 changes: 1 addition & 21 deletions crates/sc-proof-of-time/src/time_keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use crate::PotComponents;
use futures::{FutureExt, StreamExt};
use parity_scale_codec::Encode;
use sc_client_api::BlockchainEvents;
use sc_network::PeerId;
use sc_network_gossip::{Network as GossipNetwork, Syncing as GossipSyncing};
use sp_blockchain::HeaderBackend;
use sp_consensus_subspace::digests::extract_pre_digest;
Expand Down Expand Up @@ -74,10 +73,6 @@ where
self.initialize().await;

let mut local_proof_receiver = self.spawn_producer_thread();
let handle_gossip_message: Arc<dyn Fn(PeerId, PotProof) + Send + Sync> =
Arc::new(|sender, proof| {
self.handle_gossip_message(sender, proof);
});
loop {
futures::select! {
local_proof = local_proof_receiver.recv().fuse() => {
Expand All @@ -86,9 +81,7 @@ where
self.handle_local_proof(proof);
}
},
_ = self.gossip.process_incoming_messages(
handle_gossip_message.clone()
).fuse() => {
_ = self.gossip.process_incoming_messages().fuse() => {
error!("Gossip engine has terminated");
return;
}
Expand Down Expand Up @@ -227,17 +220,4 @@ where
fn handle_local_proof(&self, proof: PotProof) {
self.gossip.gossip_message(proof.encode());
}

/// Handles the incoming gossip message.
fn handle_gossip_message(&self, sender: PeerId, proof: PotProof) {
let start_ts = Instant::now();
let ret = self.pot_state.on_proof_from_peer(sender, &proof);
let elapsed = start_ts.elapsed();

if let Err(error) = ret {
trace!(%error, %sender, "On gossip");
} else {
trace!(%proof, ?elapsed, %sender, "On gossip");
}
}
}

0 comments on commit 0bf106b

Please sign in to comment.