Small cleanups #1836

Merged
merged 3 commits into from
Aug 17, 2023
4 changes: 2 additions & 2 deletions crates/pallet-domains/src/runtime_registry.rs
@@ -333,8 +333,8 @@ mod tests {
fn go_to_block(block: u64) {
for i in System::block_number() + 1..=block {
let parent_hash = if System::block_number() > 1 {
let hdr = System::finalize();
hdr.hash()
let header = System::finalize();
header.hash()
} else {
System::parent_hash()
};
4 changes: 2 additions & 2 deletions crates/pallet-subspace/src/mock.rs
@@ -206,8 +206,8 @@ pub fn go_to_block(
Subspace::on_finalize(System::block_number());

let parent_hash = if System::block_number() > 1 {
let hdr = System::finalize();
hdr.hash()
let header = System::finalize();
header.hash()
} else {
System::parent_hash()
};
30 changes: 10 additions & 20 deletions crates/sc-proof-of-time/src/gossip.rs
@@ -14,14 +14,12 @@ use sp_runtime::traits::{Block as BlockT, Hash as HashT, Header as HeaderT};
use std::collections::HashSet;
use std::sync::Arc;
use subspace_core_primitives::crypto::blake2b_256_hash;
use subspace_core_primitives::PotProof;
use subspace_core_primitives::{Blake2b256Hash, PotProof};
use subspace_proof_of_time::ProofOfTime;
use tracing::{error, trace};

pub(crate) const GOSSIP_PROTOCOL: &str = "/subspace/subspace-proof-of-time";

type MessageHash = [u8; 32];

/// PoT gossip components.
#[derive(Clone)]
pub(crate) struct PotGossip<Block: BlockT> {
@@ -91,7 +89,7 @@ impl<Block: BlockT> PotGossip<Block> {
}
},
_ = gossip_engine_poll.fuse() => {
error!("Gossip engine has terminated.");
error!("Gossip engine has terminated");
return;
}
}
@@ -103,7 +101,7 @@ impl<Block: BlockT> PotGossip<Block> {
struct PotGossipValidator {
pot_state: Arc<dyn PotProtocolState>,
proof_of_time: ProofOfTime,
pending: RwLock<HashSet<MessageHash>>,
pending: RwLock<HashSet<Blake2b256Hash>>,
}

impl PotGossipValidator {
@@ -119,8 +117,7 @@ impl PotGossipValidator {
/// Called when the message is broadcast.
fn on_broadcast(&self, msg: &[u8]) {
let hash = blake2b_256_hash(msg);
let mut pending = self.pending.write();
pending.insert(hash);
self.pending.write().insert(hash);
}
}

@@ -134,11 +131,11 @@ impl<Block: BlockT> Validator<Block> for PotGossipValidator {
match PotProof::decode(&mut data) {
Ok(proof) => {
// Perform AES verification only if the proof is a candidate.
if let Err(err) = self.pot_state.is_candidate(*sender, &proof) {
trace!("gossip::validate: not a candidate: {err:?}");
if let Err(error) = self.pot_state.is_candidate(*sender, &proof) {
Member

I like `err` as an instance of `Error` rather than `error`, though this is a non-blocker.

trace!(%error, "Not a candidate");
ValidationResult::Discard
} else if let Err(err) = self.proof_of_time.verify(&proof) {
trace!("gossip::validate: verification failed: {err:?}");
} else if let Err(error) = self.proof_of_time.verify(&proof) {
trace!(%error, "Verification failed");
ValidationResult::Discard
} else {
ValidationResult::ProcessAndKeep(topic::<Block>())
@@ -151,8 +148,7 @@ impl<Block: BlockT> Validator<Block> for PotGossipValidator {
fn message_expired<'a>(&'a self) -> Box<dyn FnMut(Block::Hash, &[u8]) -> bool + 'a> {
Box::new(move |_topic, data| {
let hash = blake2b_256_hash(data);
let pending = self.pending.read();
!pending.contains(&hash)
!self.pending.read().contains(&hash)
})
}

@@ -161,13 +157,7 @@ impl<Block: BlockT> Validator<Block> for PotGossipValidator {
) -> Box<dyn FnMut(&PeerId, MessageIntent, &Block::Hash, &[u8]) -> bool + 'a> {
Box::new(move |_who, _intent, _topic, data| {
let hash = blake2b_256_hash(data);
let mut pending = self.pending.write();
if pending.contains(&hash) {
pending.remove(&hash);
true
} else {
false
}
self.pending.write().remove(&hash)
})
}
}
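
The last hunk above collapses a `contains` check followed by `remove` into a single call. That works because `HashSet::remove` returns `true` only when the value was present. Below is a minimal sketch of the equivalence. It assumes std's `RwLock`, whose `write()` returns a `LockResult` (the lock in the diff is used without unwrapping), and the `Pending` wrapper is hypothetical and not part of the PR:

```rust
use std::collections::HashSet;
use std::sync::RwLock;

/// Hypothetical stand-in for the validator's set of pending message hashes.
struct Pending {
    hashes: RwLock<HashSet<[u8; 32]>>,
}

impl Pending {
    fn new() -> Self {
        Self {
            hashes: RwLock::new(HashSet::new()),
        }
    }

    /// Records a hash, as `on_broadcast` does in the diff.
    fn insert(&self, hash: [u8; 32]) {
        self.hashes.write().expect("lock poisoned").insert(hash);
    }

    /// Old shape: check for membership, then remove.
    fn take_verbose(&self, hash: &[u8; 32]) -> bool {
        let mut pending = self.hashes.write().expect("lock poisoned");
        if pending.contains(hash) {
            pending.remove(hash);
            true
        } else {
            false
        }
    }

    /// New shape: `HashSet::remove` already reports whether the value was present.
    fn take(&self, hash: &[u8; 32]) -> bool {
        self.hashes.write().expect("lock poisoned").remove(hash)
    }
}
```

Both methods return `true` the first time a hash is taken and `false` afterwards; the shorter form also holds the write lock for a single expression.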
33 changes: 19 additions & 14 deletions crates/sc-proof-of-time/src/node_client.rs
@@ -14,7 +14,7 @@ use sp_runtime::traits::Block as BlockT;
use std::sync::Arc;
use std::time::Instant;
use subspace_core_primitives::PotProof;
use tracing::{error, info, trace, warn};
use tracing::{debug, error, trace, warn};

/// The PoT client implementation
pub struct PotClient<Block: BlockT<Hash = H256>, Client> {
@@ -61,22 +61,24 @@ where
self.gossip
.process_incoming_messages(handle_gossip_message)
.await;
error!("pot_client: gossip engine has terminated.");
error!("Gossip engine has terminated");
}

/// Initializes the chain state from the consensus tip info.
async fn initialize(&self) {
info!("pot_client::initialize: waiting for initialization ...");
debug!("Waiting for initialization");

// Wait for a block with proofs.
let mut block_import = self.client.import_notification_stream();
while let Some(incoming_block) = block_import.next().await {
let pre_digest = match extract_pre_digest(&incoming_block.header) {
Ok(pre_digest) => pre_digest,
Err(err) => {
Err(error) => {
warn!(
"pot_client::initialize: failed to get pre_digest: {}/{:?}/{err:?}",
incoming_block.hash, incoming_block.origin
%error,
block_hash = %incoming_block.hash,
origin = ?incoming_block.origin,
"Failed to get pre_digest",
);
continue;
}
@@ -86,17 +88,20 @@ where
Some(pot_pre_digest) => pot_pre_digest,
None => {
warn!(
"pot_client::initialize: failed to get pot_pre_digest: {}/{:?}",
incoming_block.hash, incoming_block.origin
block_hash = %incoming_block.hash,
origin = ?incoming_block.origin,
"Failed to get pot_pre_digest",
);
continue;
}
};

if pot_pre_digest.proofs().is_some() {
info!(
"pot_client::initialize: initialization complete: {}/{:?}, pot_pre_digest = {:?}",
incoming_block.hash, incoming_block.origin, pot_pre_digest
trace!(
block_hash = %incoming_block.hash,
origin = ?incoming_block.origin,
?pot_pre_digest,
"Initialization complete",
);
return;
}
@@ -109,10 +114,10 @@ where
let ret = self.pot_state.on_proof_from_peer(sender, &proof);
let elapsed = start_ts.elapsed();

if let Err(err) = ret {
trace!("pot_client::on gossip: {err:?}, {sender}");
if let Err(error) = ret {
trace!(%error, %sender, "On gossip");
} else {
trace!("pot_client::on gossip: {proof}, time=[{elapsed:?}], {sender}");
trace!(%proof, ?elapsed, %sender, "On gossip");
}
}
}
18 changes: 11 additions & 7 deletions crates/sc-proof-of-time/src/state_manager.rs
@@ -794,26 +794,30 @@ impl PotConsensusState for StateManager {
.as_ref()
.map_or(false, |wait_time| start_ts.elapsed() < *wait_time)
};
let retry_delay = tokio::time::Duration::from_millis(200);
let retry_delay = Duration::from_millis(200);
let mut retries = 0;
loop {
let ret =
let result =
self.state
.lock()
.get_block_proofs(block_number, current_slot, parent_pre_digest);
match ret {
Ok(_) => return ret,
match result {
Ok(_) => return result,
Err(PotGetBlockProofsError::ProofUnavailable { .. }) => {
if (should_wait)() {
// TODO: notification instead of sleep/retry.
retries += 1;
trace!("get_block_proofs: {ret:?}, retry {retries}...",);
trace!(
?result,
%retries,
"Proof unavailable, retrying...",
);
tokio::time::sleep(retry_delay).await;
} else {
return ret;
return result;
}
}
_ => return ret,
_ => return result,
}
}
}
52 changes: 30 additions & 22 deletions crates/sc-proof-of-time/src/time_keeper.rs
@@ -18,7 +18,7 @@ use std::time::{Duration, Instant};
use subspace_core_primitives::{NonEmptyVec, PotProof, PotSeed};
use subspace_proof_of_time::ProofOfTime;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tracing::{error, info, trace, warn};
use tracing::{debug, error, trace, warn};

/// Channel size to send the produced proofs.
/// The proof producer thread will block if the receiver is behind and
@@ -82,14 +82,14 @@ where
futures::select! {
local_proof = local_proof_receiver.recv().fuse() => {
if let Some(proof) = local_proof {
trace!("time_keeper: got local proof: {proof}");
trace!(%proof, "Got local proof");
self.handle_local_proof(proof);
}
},
_ = self.gossip.process_incoming_messages(
handle_gossip_message.clone()
).fuse() => {
error!("time_keeper: gossip engine has terminated.");
error!("Gossip engine has terminated");
return;
}
}
@@ -98,17 +98,19 @@ where

/// Initializes the chain state from the consensus tip info.
async fn initialize(&self) {
info!("time_keeper::initialize: waiting for initialization ...");
debug!("Waiting for initialization");
Contributor

Why was this changed? Without a target or a prefix, the logs don't have much context.

Member Author

They actually do. As mentioned in previous PR reviews, these prefixes are excessive and hard to read. The logger already prints the module when you log something, so you don't need to do it manually. Here is an example:

2023-08-17T18:22:46.374851Z DEBUG single_disk_farm{disk_farm_index=0}: subspace_farmer::single_disk_farm::farming: Solution found slot=846148283 sector_index=9
2023-08-17T18:22:46.379039Z  INFO single_disk_farm{disk_farm_index=0}: subspace_farmer::reward_signing: Successfully signed reward hash 0x1f4767f4785b35cf0766f783e076356a3f0794aed2cfc10ad8647396aad173d0

And while I understand you're still developing it, the info level should be used sparingly, for things users would actually care about and understand; hence the downgrade to debug.
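
The point about the module being printed automatically can be illustrated with a minimal std-only sketch. The `debug_line!` macro and the `time_keeper` module below are hypothetical stand-ins, not the logger used in the PR; the sketch only shows that `module_path!()` resolves at the call site, so the message itself needs no manual prefix:

```rust
/// Hypothetical mini-logger: builds a line the way a module-aware logger might,
/// taking the module path from the call site instead of from the message text.
macro_rules! debug_line {
    ($($arg:tt)*) => {
        format!("DEBUG {}: {}", module_path!(), format_args!($($arg)*))
    };
}

mod time_keeper {
    /// The message carries no `time_keeper::initialize:` prefix of its own.
    pub fn initialize_line() -> String {
        debug_line!("Waiting for initialization")
    }
}
```

Calling `time_keeper::initialize_line()` yields a line whose module part ends in `time_keeper`, followed by the bare message. The leading crate name depends on how the code is compiled, so it is not shown here.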


// Wait for the first block import.
let mut block_import = self.client.import_notification_stream();
while let Some(incoming_block) = block_import.next().await {
let pre_digest = match extract_pre_digest(&incoming_block.header) {
Ok(pre_digest) => pre_digest,
Err(err) => {
Err(error) => {
warn!(
"time_keeper::initialize: failed to get pre_digest: {}/{:?}/{err:?}",
incoming_block.hash, incoming_block.origin
%error,
block_hash = %incoming_block.hash,
origin = ?incoming_block.origin,
"Failed to get pre_digest",
);
continue;
}
@@ -118,16 +120,19 @@ where
Some(pot_pre_digest) => pot_pre_digest,
None => {
warn!(
"time_keeper::initialize: failed to get pot_pre_digest: {}/{:?}",
incoming_block.hash, incoming_block.origin
block_hash = %incoming_block.hash,
origin = ?incoming_block.origin,
"Failed to get pot_pre_digest",
);
continue;
}
};

info!(
"time_keeper::initialize: initialization complete: {}/{:?}, pot_pre_digest = {:?}",
incoming_block.hash, incoming_block.origin, pot_pre_digest
debug!(
block_hash = %incoming_block.hash,
origin = ?incoming_block.origin,
?pot_pre_digest,
"Initialization complete",
);
let proofs = pot_pre_digest.proofs().cloned().unwrap_or_else(|| {
// Producing proofs starting from (genesis_slot + 1).
@@ -141,7 +146,7 @@ where
.expect("Initial slot number should be available for block_number >= 1"),
block_hash,
);
info!("time_keeper::initialize: creating first proof: {proof}");
debug!(%proof, "Creating first proof");
NonEmptyVec::new_with_entry(proof)
});

@@ -191,14 +196,18 @@ where
last_proof.injected_block_hash,
);
let elapsed = start_ts.elapsed();
trace!("time_keeper::produce proofs: {next_proof}, time=[{elapsed:?}]");
trace!(
%next_proof,
?elapsed,
"Produce proofs",
);

// Store the new proof back into the chain and gossip to other time keepers.
if let Err(e) = state.on_proof(&next_proof) {
info!("time_keeper::produce proofs: failed to extend chain: {e:?}");
if let Err(error) = state.on_proof(&next_proof) {
error!(%error, "Produce proofs: failed to extend chain");
continue;
} else if let Err(e) = proof_sender.blocking_send(next_proof.clone()) {
warn!("time_keeper::produce proofs: send failed: {e:?}");
} else if let Err(error) = proof_sender.blocking_send(next_proof.clone()) {
warn!(%error, "Produce proofs: send failed");
return;
}

@@ -225,11 +234,10 @@ where
let ret = self.pot_state.on_proof_from_peer(sender, &proof);
let elapsed = start_ts.elapsed();

if let Err(err) = ret {
trace!("time_keeper::on gossip: {err:?}, {sender}");
if let Err(error) = ret {
trace!(%error, %sender, "On gossip");
} else {
trace!("time_keeper::on gossip: {proof}, time=[{elapsed:?}], {sender}");
self.gossip.gossip_message(proof.encode());
Contributor

This is needed; it is a difference between the time keeper and the node client.

Member Author

Hm... I'm sorry, I think I missed it when copy-pasting from one file to another 😕

Why though? The whole point of the gossip engine is to handle gossip; I don't think you need to manually re-gossip what you have received.

Contributor

Substrate gossip requires you to manually resend against the topic if you want to propagate to the peers (which we do in the case of the time keeper). From when I looked at their code recently, `ValidationResult::ProcessAndKeep` and `ValidationResult::Discard` only indicate whether messages should be kept in the per-peer queue or not (to validate future messages for duplicates, etc.). Please correct me if that is not so.

Member Author

Yeah, I think you're right, this probably needs to be restored. At the same time, why should this only be done by the time keeper? The client should also re-gossip received messages, right?

Contributor

Consensus nodes already receive O(number of time keepers) messages per second now, which cannot be avoided. It would be good to avoid adding more volume to that. And if they gossip, the messages will go back to the time keepers as well. We could possibly consider two different topics in the future.

In the case of time keepers, the gossip was needed for node sync, IIRC. This part is still changing, though.

Member Author

I do not think that is correct. You assume that every consensus node is connected to at least one time keeper. That is not guaranteed to be the case; very far from it, actually.

Contributor

I see, I was assuming a close to full-mesh topology. In that case we would indeed need to gossip from consensus nodes as well.
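
The topology argument in this thread can be sketched with a toy reachability model. This is not Substrate's gossip engine: the `Role` enum, the `reached` function, and the three-node line topology are all hypothetical, and the model only assumes that a node forwards a proof to its peers when it explicitly re-gossips:

```rust
use std::collections::{HashSet, VecDeque};

/// Hypothetical node roles for the toy model.
#[derive(Clone, Copy, PartialEq, Eq)]
enum Role {
    TimeKeeper,
    Consensus,
}

/// Returns the nodes that end up receiving a proof produced by `origin`.
/// Time keepers always forward to their peers; consensus nodes forward
/// only when `consensus_relays` is set.
fn reached(
    peers: &[Vec<usize>],
    roles: &[Role],
    origin: usize,
    consensus_relays: bool,
) -> HashSet<usize> {
    let mut seen = HashSet::from([origin]);
    let mut queue = VecDeque::from([origin]);
    while let Some(node) = queue.pop_front() {
        let relays = roles[node] == Role::TimeKeeper || consensus_relays;
        if !relays {
            continue;
        }
        for &peer in &peers[node] {
            // `insert` returns true only for nodes not seen before.
            if seen.insert(peer) {
                queue.push_back(peer);
            }
        }
    }
    seen
}

/// Line topology: time keeper 0, consensus node 1, consensus node 2,
/// where node 2 has no direct connection to the time keeper.
fn line_topology() -> (Vec<Vec<usize>>, Vec<Role>) {
    (
        vec![vec![1], vec![0, 2], vec![1]],
        vec![Role::TimeKeeper, Role::Consensus, Role::Consensus],
    )
}
```

With `consensus_relays` set to `false`, node 2 never receives the proof because its only peer does not forward it; with `true`, all three nodes are reached. In a full mesh the flag would make no difference, which matches the assumption discussed above.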

trace!(%proof, ?elapsed, %sender, "On gossip");
}
}
}