Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Node::wait_for_connected_peers() as no longer necessary #1733

Merged
merged 1 commit into from
Aug 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ use subspace_farmer_components::plotting::{PieceGetter, PieceGetterRetryPolicy,
use subspace_networking::libp2p::identity::{ed25519, Keypair};
use subspace_networking::utils::multihash::ToMultihash;
use subspace_networking::utils::piece_provider::{PieceProvider, PieceValidator};
use subspace_networking::Node;
use subspace_proof_of_space::Table;
use tokio::time::sleep;
use tracing::{debug, error, info, info_span, trace, warn};
Expand All @@ -43,7 +42,6 @@ use zeroize::Zeroizing;
const RECORDS_ROOTS_CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(1_000_000).expect("Not zero; qed");
const GET_PIECE_MAX_RETRIES_COUNT: u16 = 3;
const GET_PIECE_DELAY_IN_SECS: u64 = 3;
const POPULATE_PIECE_DELAY: Duration = Duration::from_secs(10);

/// Start farming by using multiple replica plot in specified path and connecting to WebSocket
/// server at specified address.
Expand Down Expand Up @@ -154,9 +152,8 @@ where
Box::pin({
let piece_cache = piece_cache.clone();
let piece_getter = piece_getter.clone();
let node = node.clone();

populate_pieces_cache(node, last_segment_index, piece_getter, piece_cache)
populate_pieces_cache(last_segment_index, piece_getter, piece_cache)
}),
"pieces-cache-population".to_string(),
)?;
Expand Down Expand Up @@ -384,17 +381,13 @@ fn derive_libp2p_keypair(schnorrkel_sk: &schnorrkel::SecretKey) -> Keypair {
/// previous segments to see if they are already in the cache. If they are not, they are added
/// from DSN.
async fn populate_pieces_cache<PV, PC>(
node: Node,
segment_index: SegmentIndex,
piece_getter: Arc<FarmerPieceGetter<PV, PC>>,
piece_cache: Arc<tokio::sync::Mutex<FarmerPieceCache>>,
) where
PV: PieceValidator + Send + Sync + 'static,
PC: PieceCache + Send + 'static,
{
// Give some time to obtain DSN connection.
let _ = node.wait_for_connected_peers(POPULATE_PIECE_DELAY).await;

debug!(%segment_index, "Started syncing piece cache...");
let final_piece_index =
u64::from(segment_index.first_piece_index()) + ArchivedHistorySegment::NUM_PIECES as u64;
Expand Down
4 changes: 0 additions & 4 deletions crates/subspace-networking/examples/get-peers-complex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,6 @@ async fn main() {
node_runner.run().await;
});

node.wait_for_connected_peers(Duration::from_secs(5))
.await
.unwrap();

// Prepare multihash to look for in Kademlia
let key = Multihash::from(node.id());

Expand Down
5 changes: 0 additions & 5 deletions crates/subspace-networking/src/behavior/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,6 @@ async fn test_async_handler_works_with_pending_internal_future() {
node_runner_2.run().await;
});

node_2
.wait_for_connected_peers(Duration::from_secs(25))
.await
.unwrap();

let resp = node_2
.send_generic_request(node_1.id(), ExampleRequest)
.await
Expand Down
48 changes: 0 additions & 48 deletions crates/subspace-networking/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@ use parity_scale_codec::Decode;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use thiserror::Error;
use tokio::time::sleep;
use tracing::{debug, error, trace};

/// Topic subscription, will unsubscribe when last instance is dropped for a particular topic.
Expand Down Expand Up @@ -120,16 +118,6 @@ impl From<oneshot::Canceled> for GetClosestPeersError {
}
}

#[derive(Debug, Error)]
pub enum CheckConnectedPeersError {
/// Did not connect within provided timeout window.
#[error("Did not connect within provided timeout window")]
Timeout,
/// Node runner was dropped, impossible to check connected peers.
#[error("Node runner was dropped, impossible to check connected peers")]
NodeRunnerDropped,
}

/// Defines errors for `subscribe` operation.
#[derive(Debug, Error)]
pub enum SubscribeError {
Expand Down Expand Up @@ -453,42 +441,6 @@ impl Node {
Ok(result_receiver)
}

/// Waits for peers connection to the swarm and for Kademlia address registration.
pub async fn wait_for_connected_peers(
&self,
timeout: Duration,
) -> Result<(), CheckConnectedPeersError> {
let mut command_sender = self.shared.command_sender.clone();
let fut = async move {
loop {
trace!("Starting 'CheckConnectedPeers' request.");

let (result_sender, result_receiver) = oneshot::channel();

command_sender
.send(Command::CheckConnectedPeers { result_sender })
.await
.map_err(|_| CheckConnectedPeersError::NodeRunnerDropped)?;

let connected_peers_present = result_receiver
.await
.map_err(|_| CheckConnectedPeersError::NodeRunnerDropped)?;

trace!("'CheckConnectedPeers' request returned {connected_peers_present}");

if connected_peers_present {
return Ok(());
}

sleep(Duration::from_millis(50)).await;
}
};

tokio::time::timeout(timeout, fut)
.await
.map_err(|_timeout| CheckConnectedPeersError::Timeout)?
}

/// Start local announcing item by its key. Saves key to the local storage.
/// It doesn't affect Kademlia operations.
pub async fn start_local_announcing(
Expand Down
11 changes: 0 additions & 11 deletions crates/subspace-networking/src/node_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1188,17 +1188,6 @@ where
IfDisconnected::TryConnect,
);
}
Command::CheckConnectedPeers { result_sender } => {
let connected_peers_present = self.swarm.connected_peers().next().is_some();

let kademlia_connection_initiated = if connected_peers_present {
self.swarm.behaviour_mut().kademlia.bootstrap().is_ok()
} else {
false
};

let _ = result_sender.send(kademlia_connection_initiated);
}
Command::StartLocalAnnouncing { key, result_sender } => {
let local_peer_id = *self.swarm.local_peer_id();
let addresses = self.swarm.external_addresses().cloned().collect::<Vec<_>>();
Expand Down
3 changes: 0 additions & 3 deletions crates/subspace-networking/src/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,6 @@ pub(crate) enum Command {
request: Vec<u8>,
result_sender: oneshot::Sender<Result<Vec<u8>, RequestFailure>>,
},
CheckConnectedPeers {
result_sender: oneshot::Sender<bool>,
},
StartLocalAnnouncing {
key: Key,
result_sender: oneshot::Sender<bool>,
Expand Down
13 changes: 0 additions & 13 deletions crates/subspace-service/src/dsn/import_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ use subspace_networking::Node;
// Refuse to compile on non-64-bit platforms, otherwise segment indices will not fit in memory
const_assert!(std::mem::size_of::<usize>() >= std::mem::size_of::<u64>());

/// How long to wait for peers before giving up
const WAIT_FOR_PEERS_TIMEOUT: Duration = Duration::from_secs(10);
/// How many blocks to queue before pausing and waiting for blocks to be imported
const QUEUED_BLOCKS_LIMIT: BlockNumber = 2048;
/// Time to wait for blocks to import if import is too slow
Expand Down Expand Up @@ -190,17 +188,6 @@ where
Client: HeaderBackend<Block> + BlockBackend<Block> + Send + Sync + 'static,
IQS: ImportQueueService<Block> + ?Sized,
{
debug!("Waiting for connected peers...");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should fail if bootstrap works longer than the delay.

if node
.wait_for_connected_peers(WAIT_FOR_PEERS_TIMEOUT)
.await
.is_err()
{
info!("Was not able to find any DSN peers, cancelling sync from DSN");
return Ok(0);
}
debug!("Connected to peers.");

let segment_headers = SegmentHeaderHandler::new(node.clone())
.get_segment_headers()
.await
Expand Down