Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PoT gossip refactoring #1838

Merged
merged 3 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 152 additions & 37 deletions crates/sc-proof-of-time/src/gossip.rs
Original file line number Diff line number Diff line change
@@ -1,70 +1,104 @@
//! PoT gossip functionality.

use crate::state_manager::PotProtocolState;
use crate::PotComponents;
use futures::channel::mpsc;
use futures::{FutureExt, StreamExt};
use parity_scale_codec::Decode;
use parity_scale_codec::{Decode, Encode};
use parking_lot::{Mutex, RwLock};
use sc_client_api::BlockchainEvents;
use sc_network::config::NonDefaultSetConfig;
use sc_network::PeerId;
use sc_network_gossip::{
GossipEngine, MessageIntent, Syncing as GossipSyncing, ValidationResult, Validator,
ValidatorContext,
};
use sp_blockchain::HeaderBackend;
use sp_consensus_subspace::digests::extract_pre_digest;
use sp_runtime::traits::{Block as BlockT, Hash as HashT, Header as HeaderT};
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Instant;
use subspace_core_primitives::crypto::blake2b_256_hash;
use subspace_core_primitives::{Blake2b256Hash, PotProof};
use subspace_proof_of_time::ProofOfTime;
use tracing::{error, trace};
use tracing::{debug, error, trace, warn};

pub(crate) const GOSSIP_PROTOCOL: &str = "/subspace/subspace-proof-of-time";

/// PoT gossip components.
#[derive(Clone)]
pub(crate) struct PotGossip<Block: BlockT> {
/// PoT gossip worker
#[must_use = "Gossip worker doesn't do anything unless run() method is called"]
pub struct PotGossipWorker<Block, Client>
where
Block: BlockT,
{
engine: Arc<Mutex<GossipEngine<Block>>>,
validator: Arc<PotGossipValidator>,
validator: Arc<PotGossipValidator<Block>>,
pot_state: Arc<dyn PotProtocolState>,
client: Arc<Client>,
topic: Block::Hash,
outgoing_messages_sender: mpsc::Sender<PotProof>,
outgoing_messages_receiver: mpsc::Receiver<PotProof>,
}

impl<Block: BlockT> PotGossip<Block> {
/// Creates the gossip components.
pub(crate) fn new<Network, GossipSync>(
impl<Block, Client> PotGossipWorker<Block, Client>
where
Block: BlockT,
Client: HeaderBackend<Block> + BlockchainEvents<Block>,
{
/// Instantiate gossip worker
pub fn new<Network, GossipSync>(
components: &PotComponents,
client: Arc<Client>,
network: Network,
sync: Arc<GossipSync>,
pot_state: Arc<dyn PotProtocolState>,
proof_of_time: ProofOfTime,
) -> Self
where
Network: sc_network_gossip::Network<Block> + Send + Sync + Clone + 'static,
GossipSync: GossipSyncing<Block> + 'static,
{
let validator = Arc::new(PotGossipValidator::new(pot_state, proof_of_time));
let topic =
<<Block::Header as HeaderT>::Hashing as HashT>::hash(b"subspace-proof-of-time-gossip");

let validator = Arc::new(PotGossipValidator::new(
Arc::clone(&components.protocol_state),
components.proof_of_time,
topic,
));
let engine = Arc::new(Mutex::new(GossipEngine::new(
network,
sync,
GOSSIP_PROTOCOL,
validator.clone(),
None,
)));
Self { engine, validator }

let (outgoing_messages_sender, outgoing_messages_receiver) = mpsc::channel(0);

Self {
engine,
validator,
pot_state: Arc::clone(&components.protocol_state),
client,
topic,
outgoing_messages_sender,
outgoing_messages_receiver,
}
}

/// Gossips the message to the network.
pub(crate) fn gossip_message(&self, message: Vec<u8>) {
self.validator.on_broadcast(&message);
self.engine
.lock()
.gossip_message(topic::<Block>(), message, false);
/// Sender that can be used to gossip PoT messages to the network
pub fn gossip_sender(&self) -> mpsc::Sender<PotProof> {
self.outgoing_messages_sender.clone()
}

/// Runs the loop to process incoming messages.
/// Returns when the gossip engine terminates.
pub(crate) async fn process_incoming_messages<'a>(
&self,
process_fn: Arc<dyn Fn(PeerId, PotProof) + Send + Sync + 'a>,
) {
let message_receiver = self.engine.lock().messages_for(topic::<Block>());
/// Run gossip engine.
///
/// NOTE: Even though this function is async, it might do blocking operations internally and
/// should be running on a dedicated thread.
pub async fn run(mut self) {
self.initialize().await;

let message_receiver = self.engine.lock().messages_for(self.topic);
let mut incoming_messages = Box::pin(message_receiver.filter_map(
// Filter out messages without sender or fail to decode.
// TODO: penalize nodes that send garbled messages.
Expand All @@ -85,8 +119,11 @@ impl<Block: BlockT> PotGossip<Block> {
futures::select! {
gossiped = incoming_messages.next().fuse() => {
if let Some((sender, proof)) = gossiped {
(process_fn)(sender, proof);
self.handle_incoming_message(sender, proof);
}
},
outgoing_message = self.outgoing_messages_receiver.select_next_some() => {
self.handle_outgoing_message(outgoing_message)
},
_ = gossip_engine_poll.fuse() => {
error!("Gossip engine has terminated");
Expand All @@ -95,22 +132,102 @@ impl<Block: BlockT> PotGossip<Block> {
}
}
}

/// Initializes the chain state from the consensus tip info.
async fn initialize(&self) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure about initialize() happening both from the time keeper and the gossip components, particularly in the context of the node restart, etc that we talked about (this may be ok, but not clear right now). After 1860, 1861 the init path would be simplified, so it probably is fine then

Copy link
Member Author

@nazar-pc nazar-pc Aug 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Next PR actually removed all initialization of gossip, I think I'll have to change it since initialization changes there as well.

debug!("Waiting for initialization");

// Wait for a block with proofs.
let mut block_import = self.client.import_notification_stream();
while let Some(incoming_block) = block_import.next().await {
let pre_digest = match extract_pre_digest(&incoming_block.header) {
Ok(pre_digest) => pre_digest,
Err(error) => {
warn!(
%error,
block_hash = %incoming_block.hash,
origin = ?incoming_block.origin,
"Failed to get pre_digest",
);
continue;
}
};

let pot_pre_digest = match pre_digest.pot_pre_digest() {
Some(pot_pre_digest) => pot_pre_digest,
None => {
warn!(
block_hash = %incoming_block.hash,
origin = ?incoming_block.origin,
"Failed to get pot_pre_digest",
);
continue;
}
};

if pot_pre_digest.proofs().is_some() {
trace!(
block_hash = %incoming_block.hash,
origin = ?incoming_block.origin,
?pot_pre_digest,
"Initialization complete",
);
return;
}
}
}

/// Handles the incoming gossip message.
fn handle_incoming_message(&self, sender: PeerId, proof: PotProof) {
let start_ts = Instant::now();
let ret = self.pot_state.on_proof_from_peer(sender, &proof);
let elapsed = start_ts.elapsed();

if let Err(error) = ret {
trace!(%error, %sender, "On gossip");
} else {
trace!(%proof, ?elapsed, %sender, "On gossip");
self.engine
.lock()
.gossip_message(self.topic, proof.encode(), false);
}
}

fn handle_outgoing_message(&self, proof: PotProof) {
let message = proof.encode();
self.validator.on_broadcast(&message);
self.engine
.lock()
.gossip_message(self.topic, message, false);
}
}

/// Validator for gossiped messages
struct PotGossipValidator {
struct PotGossipValidator<Block>
where
Block: BlockT,
{
pot_state: Arc<dyn PotProtocolState>,
proof_of_time: ProofOfTime,
pending: RwLock<HashSet<Blake2b256Hash>>,
topic: Block::Hash,
}

impl PotGossipValidator {
impl<Block> PotGossipValidator<Block>
where
Block: BlockT,
{
/// Creates the validator.
fn new(pot_state: Arc<dyn PotProtocolState>, proof_of_time: ProofOfTime) -> Self {
fn new(
pot_state: Arc<dyn PotProtocolState>,
proof_of_time: ProofOfTime,
topic: Block::Hash,
) -> Self {
Self {
pot_state,
proof_of_time,
pending: RwLock::new(HashSet::new()),
topic,
}
}

Expand All @@ -121,7 +238,10 @@ impl PotGossipValidator {
}
}

impl<Block: BlockT> Validator<Block> for PotGossipValidator {
impl<Block> Validator<Block> for PotGossipValidator<Block>
where
Block: BlockT,
{
fn validate(
&self,
_context: &mut dyn ValidatorContext<Block>,
Expand All @@ -138,7 +258,7 @@ impl<Block: BlockT> Validator<Block> for PotGossipValidator {
trace!(%error, "Verification failed");
ValidationResult::Discard
} else {
ValidationResult::ProcessAndKeep(topic::<Block>())
ValidationResult::ProcessAndKeep(self.topic)
}
}
Err(_) => ValidationResult::Discard,
Expand All @@ -162,11 +282,6 @@ impl<Block: BlockT> Validator<Block> for PotGossipValidator {
}
}

/// PoT message topic.
fn topic<Block: BlockT>() -> Block::Hash {
<<Block::Header as HeaderT>::Hashing as HashT>::hash(b"subspace-proof-of-time-gossip")
}

/// Returns the network configuration for PoT gossip.
pub fn pot_gossip_peers_set_config() -> NonDefaultSetConfig {
let mut cfg = NonDefaultSetConfig::new(GOSSIP_PROTOCOL.into(), 5 * 1024 * 1024);
Expand Down
7 changes: 2 additions & 5 deletions crates/sc-proof-of-time/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@

#![feature(const_option)]

mod gossip;
mod node_client;
pub mod gossip;
mod state_manager;
mod time_keeper;

Expand All @@ -13,8 +12,6 @@ use std::sync::Arc;
use subspace_core_primitives::{BlockNumber, SlotNumber};
use subspace_proof_of_time::ProofOfTime;

pub use gossip::pot_gossip_peers_set_config;
pub use node_client::PotClient;
pub use state_manager::{
PotConsensusState, PotGetBlockProofsError, PotStateSummary, PotVerifyBlockProofsError,
};
Expand Down Expand Up @@ -87,7 +84,7 @@ impl PotComponents {
let proof_of_time = ProofOfTime::new(config.pot_iterations, config.num_checkpoints)
// TODO: Proper error handling or proof
.expect("Failed to initialize proof of time");
let (protocol_state, consensus_state) = init_pot_state(config, proof_of_time.clone());
let (protocol_state, consensus_state) = init_pot_state(config, proof_of_time);

Self {
is_time_keeper,
Expand Down
Loading