diff --git a/Cargo.lock b/Cargo.lock
index c7dfa1390c..5d32e35b5b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -612,6 +612,15 @@ dependencies = [
  "event-listener",
 ]
 
+[[package]]
+name = "async-mutex"
+version = "1.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "479db852db25d9dbf6204e6cb6253698f175c15726470f78af0d918e99d6156e"
+dependencies = [
+ "event-listener",
+]
+
 [[package]]
 name = "async-oneshot"
 version = "0.5.0"
@@ -10669,6 +10678,7 @@ version = "0.1.0"
 dependencies = [
  "actix-web",
  "anyhow",
+ "async-mutex",
  "async-trait",
  "backoff",
  "bytes",
diff --git a/crates/subspace-networking/Cargo.toml b/crates/subspace-networking/Cargo.toml
index db9bf3d802..a9af82c7d0 100644
--- a/crates/subspace-networking/Cargo.toml
+++ b/crates/subspace-networking/Cargo.toml
@@ -16,6 +16,7 @@ include = [
 ]
 
 [dependencies]
 actix-web = "4.3.1"
 anyhow = "1.0.71"
+async-mutex = "1.4.0"
 async-trait = "0.1.68"
diff --git a/crates/subspace-networking/src/behavior/persistent_parameters.rs b/crates/subspace-networking/src/behavior/persistent_parameters.rs
index 0c7f9b3904..635d39c843 100644
--- a/crates/subspace-networking/src/behavior/persistent_parameters.rs
+++ b/crates/subspace-networking/src/behavior/persistent_parameters.rs
@@ -47,7 +47,7 @@ pub trait NetworkingParametersRegistry: Send + Sync {
     async fn remove_known_peer_addresses(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>);
 
     /// Unregisters associated addresses for peer ID.
-    async fn remove_all_known_peer_addresses(&mut self, peer_id: PeerId);
+    fn remove_all_known_peer_addresses(&mut self, peer_id: PeerId);
 
     /// Returns a batch of the combined collection of known addresses from networking parameters DB
     /// and boostrap addresses from networking parameters initialization.
@@ -87,7 +87,7 @@ impl NetworkingParametersRegistry for StubNetworkingParametersManager {
     async fn remove_known_peer_addresses(&mut self, _peer_id: PeerId, _addresses: Vec<Multiaddr>) {}
 
-    async fn remove_all_known_peer_addresses(&mut self, _peer_id: PeerId) {}
+    fn remove_all_known_peer_addresses(&mut self, _peer_id: PeerId) {}
 
     async fn next_known_addresses_batch(&mut self) -> Vec<PeerAddress> {
         Vec::new()
     }
@@ -269,7 +269,7 @@ impl NetworkingParametersRegistry for NetworkingParametersManager {
         self.cache_need_saving = true;
     }
 
-    async fn remove_all_known_peer_addresses(&mut self, peer_id: PeerId) {
+    fn remove_all_known_peer_addresses(&mut self, peer_id: PeerId) {
         trace!(%peer_id, "Remove all peer addresses from the networking parameters registry");
 
         self.known_peers.pop(&peer_id);
diff --git a/crates/subspace-networking/src/node.rs b/crates/subspace-networking/src/node.rs
index 436be5e2a7..c52e3d2b15 100644
--- a/crates/subspace-networking/src/node.rs
+++ b/crates/subspace-networking/src/node.rs
@@ -14,7 +14,6 @@ use libp2p::kad::PeerRecord;
 use libp2p::{Multiaddr, PeerId};
 use parity_scale_codec::Decode;
 use std::pin::Pin;
-use std::sync::atomic::Ordering;
 use std::sync::Arc;
 use std::task::{Context, Poll};
 use std::time::Duration;
@@ -22,8 +21,6 @@ use thiserror::Error;
 use tokio::time::sleep;
 use tracing::{debug, error, trace};
 
-const BOOTSTRAP_CHECK_DELAY: Duration = Duration::from_secs(1);
-
 /// Topic subscription, will unsubscribe when last instance is dropped for a particular topic.
 #[derive(Debug)]
 #[pin_project::pin_project(PinnedDrop)]
@@ -320,7 +317,6 @@ impl Node {
         &self,
         key: Multihash,
     ) -> Result<impl Stream<Item = PeerRecord>, GetValueError> {
-        self.wait_for_bootstrap().await;
         let permit = self.shared.kademlia_tasks_semaphore.acquire().await;
         let (result_sender, result_receiver) = mpsc::unbounded();
 
@@ -344,7 +340,6 @@ impl Node {
         key: Multihash,
         value: Vec<u8>,
     ) -> Result<impl Stream<Item = ()>, PutValueError> {
-        self.wait_for_bootstrap().await;
         let permit = self.shared.kademlia_tasks_semaphore.acquire().await;
         let (result_sender, result_receiver) = mpsc::unbounded();
 
@@ -365,7 +360,6 @@ impl Node {
 
     /// Subcribe to some topic on the DSN.
     pub async fn subscribe(&self, topic: Sha256Topic) -> Result<TopicSubscription, SubscribeError> {
-        self.wait_for_bootstrap().await;
         let permit = self.shared.regular_tasks_semaphore.acquire().await;
         let (result_sender, result_receiver) = oneshot::channel();
 
@@ -394,7 +388,6 @@ impl Node {
 
     /// Subcribe a messgo to some topic on the DSN.
     pub async fn publish(&self, topic: Sha256Topic, message: Vec<u8>) -> Result<(), PublishError> {
-        self.wait_for_bootstrap().await;
         let _permit = self.shared.regular_tasks_semaphore.acquire().await;
         let (result_sender, result_receiver) = oneshot::channel();
 
@@ -420,7 +413,6 @@ impl Node {
     where
         Request: GenericRequest,
     {
-        self.wait_for_bootstrap().await;
         let _permit = self.shared.regular_tasks_semaphore.acquire().await;
         let (result_sender, result_receiver) = oneshot::channel();
         let command = Command::GenericRequest {
@@ -442,7 +434,6 @@ impl Node {
         &self,
         key: Multihash,
     ) -> Result<impl Stream<Item = PeerId>, GetClosestPeersError> {
-        self.wait_for_bootstrap().await;
         let permit = self.shared.kademlia_tasks_semaphore.acquire().await;
 
         trace!(?key, "Starting 'GetClosestPeers' request.");
@@ -544,7 +535,6 @@ impl Node {
         &self,
         key: Multihash,
     ) -> Result<impl Stream<Item = PeerId>, GetProvidersError> {
-        self.wait_for_bootstrap().await;
         let permit = self.shared.kademlia_tasks_semaphore.acquire().await;
         let (result_sender, result_receiver) = mpsc::unbounded();
 
@@ -667,18 +657,4 @@ impl Node {
     pub fn on_connected_peer(&self, callback: HandlerFn<PeerId>) -> HandlerId {
         self.shared.handlers.connected_peer.add(callback)
     }
-
-    pub(crate) async fn wait_for_bootstrap(&self) {
-        loop {
-            let was_bootstrapped = self.shared.bootstrap_finished.load(Ordering::SeqCst);
-
-            if was_bootstrapped {
-                return;
-            } else {
-                trace!("Waiting for bootstrap...");
-
-                sleep(BOOTSTRAP_CHECK_DELAY).await;
-            }
-        }
-    }
 }
diff --git a/crates/subspace-networking/src/node_runner.rs b/crates/subspace-networking/src/node_runner.rs
index 7a14f75718..490466cc80 100644
--- a/crates/subspace-networking/src/node_runner.rs
+++ b/crates/subspace-networking/src/node_runner.rs
@@ -16,10 +16,11 @@ use crate::shared::{Command, CreatedSubscription, NewPeerInfo, Shared};
 use crate::utils::{
     convert_multiaddresses, is_global_address_or_dns, PeerAddress, ResizableSemaphorePermit,
 };
+use async_mutex::Mutex as AsyncMutex;
 use bytes::Bytes;
 use futures::channel::mpsc;
 use futures::future::Fuse;
-use futures::{FutureExt, SinkExt, StreamExt};
+use futures::{FutureExt, StreamExt};
 use libp2p::core::ConnectedPoint;
 use libp2p::gossipsub::{Event as GossipsubEvent, TopicHash};
 use libp2p::identify::Event as IdentifyEvent;
@@ -85,6 +86,14 @@ enum QueryResultSender {
     },
 }
 
+#[derive(Debug, Default)]
+enum BootstrapCommandState {
+    #[default]
+    NotStarted,
+    InProgress(mpsc::UnboundedReceiver<()>),
+    Finished,
+}
+
 /// Runner for the Node.
 #[must_use = "Node does not function properly unless its runner is driven forward"]
 pub struct NodeRunner
 where
@@ -129,7 +138,7 @@
     /// Addresses to bootstrap Kademlia network
     bootstrap_addresses: Vec<Multiaddr>,
     /// Ensures a single bootstrap on run() invocation.
-    was_bootstrapped: bool,
+    bootstrap_command_state: Arc<AsyncMutex<BootstrapCommandState>>,
 }
 
 // Helper struct for NodeRunner configuration (clippy requirement).
@@ -196,15 +205,13 @@
             special_connection_decision_handler,
             rng: StdRng::seed_from_u64(KADEMLIA_PEERS_ADDRESSES_BATCH_SIZE as u64), // any seed
             bootstrap_addresses,
-            was_bootstrapped: false,
+            bootstrap_command_state: Arc::new(AsyncMutex::new(BootstrapCommandState::default())),
         }
     }
 
     /// Drives the main networking future forward.
     pub async fn run(&mut self) {
-        if !self.was_bootstrapped {
-            self.bootstrap().await;
-        }
+        self.bootstrap().await;
 
         loop {
             futures::select! {
@@ -226,7 +233,7 @@
                 },
                 command = self.command_receiver.next() => {
                     if let Some(command) = command {
-                        self.handle_command(command).await;
+                        self.handle_command(command);
                     } else {
                         break;
                     }
@@ -246,14 +253,38 @@
 
     /// Bootstraps Kademlia network
     async fn bootstrap(&mut self) {
-        self.was_bootstrapped = true;
-
-        let (result_sender, mut result_receiver) = mpsc::unbounded();
-
-        debug!("Bootstrap started.");
-
-        self.handle_command(Command::Bootstrap { result_sender })
-            .await;
+        let bootstrap_command_state = Arc::clone(&self.bootstrap_command_state);
+        let mut bootstrap_command_state = bootstrap_command_state.lock().await;
+        let bootstrap_command_receiver = match &mut *bootstrap_command_state {
+            BootstrapCommandState::NotStarted => {
+                debug!("Bootstrap started.");
+
+                let (bootstrap_command_sender, bootstrap_command_receiver) = mpsc::unbounded();
+
+                self.handle_command(Command::Bootstrap {
+                    result_sender: bootstrap_command_sender,
+                });
+
+                *bootstrap_command_state =
+                    BootstrapCommandState::InProgress(bootstrap_command_receiver);
+                match &mut *bootstrap_command_state {
+                    BootstrapCommandState::InProgress(bootstrap_command_receiver) => {
+                        bootstrap_command_receiver
+                    }
+                    _ => {
+                        unreachable!("Was just set to that exact value");
+                    }
+                }
+            }
+            BootstrapCommandState::InProgress(bootstrap_command_receiver) => {
+                bootstrap_command_receiver
+            }
+            BootstrapCommandState::Finished => {
+                return;
+            }
+        };
 
         let mut bootstrap_step = 0;
         loop {
@@ -266,7 +297,7 @@
                         break;
                     }
                 },
-                result = result_receiver.next() => {
+                result = bootstrap_command_receiver.next() => {
                     if result.is_some() {
                         debug!(%bootstrap_step, "Kademlia bootstrapping...");
                         bootstrap_step += 1;
@@ -278,6 +309,7 @@
         }
 
         debug!("Bootstrap finished.");
+        *bootstrap_command_state = BootstrapCommandState::Finished;
     }
 
     /// Handles periodical tasks.
@@ -559,7 +591,7 @@
                     "Peer has different protocol version. Peer was banned.",
                 );
 
-                self.ban_peer(peer_id).await;
+                self.ban_peer(peer_id);
             }
 
             if info.listen_addrs.len() > 30 {
@@ -853,9 +885,7 @@
                         ) || cancelled;
                     }
                     Err(error) => {
-                        debug!(?error, "Bootstrap query failed.",);
-
-                        self.set_bootstrap_finished(false);
+                        debug!(?error, "Bootstrap query failed.");
                     }
                 }
             }
@@ -863,28 +893,12 @@
                 if last || cancelled {
                     // There will be no more progress
                     self.query_id_receivers.remove(&id);
-
-                    if last {
-                        self.set_bootstrap_finished(true);
-                    }
-
-                    if cancelled {
-                        self.set_bootstrap_finished(false);
-                    }
                 }
             }
             _ => {}
         }
     }
 
-    fn set_bootstrap_finished(&mut self, success: bool) {
-        if let Some(shared) = self.shared_weak.upgrade() {
-            shared.bootstrap_finished.store(true, Ordering::SeqCst);
-
-            debug!(%success, "Bootstrap finished.",);
-        }
-    }
-
     // Returns `true` if query was cancelled
     fn unbounded_send_and_cancel_on_error(
         kademlia: &mut Kademlia>,
@@ -982,7 +996,7 @@
             .add_peers_to_dial(&peers);
     }
 
-    async fn handle_command(&mut self, command: Command) {
+    fn handle_command(&mut self, command: Command) {
         match command {
             Command::GetValue {
                 key,
@@ -1234,7 +1248,7 @@
                 );
             }
             Command::BanPeer { peer_id } => {
-                self.ban_peer(peer_id).await;
+                self.ban_peer(peer_id);
             }
             Command::Dial { address } => {
                 let _ = self.swarm.dial(address);
@@ -1244,7 +1258,7 @@
 
                 let _ = result_sender.send(connected_peers);
             }
-            Command::Bootstrap { mut result_sender } => {
+            Command::Bootstrap { result_sender } => {
                 let kademlia = &mut self.swarm.behaviour_mut().kademlia;
 
                 for (peer_id, address) in convert_multiaddresses(self.bootstrap_addresses.clone()) {
@@ -1262,17 +1276,13 @@
                     }
                     Err(err) => {
                         debug!(?err, "Bootstrap error.");
-
-                        let _ = result_sender.close().await;
-
-                        self.set_bootstrap_finished(false);
                     }
                 }
             }
         }
     }
 
-    async fn ban_peer(&mut self, peer_id: PeerId) {
+    fn ban_peer(&mut self, peer_id: PeerId) {
         // Remove temporary ban if there is any before creating a permanent one
        self.temporary_bans.lock().remove(&peer_id);
 
@@ -1281,8 +1291,7 @@
         self.swarm.behaviour_mut().block_list.block_peer(peer_id);
         self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id);
         self.networking_parameters_registry
-            .remove_all_known_peer_addresses(peer_id)
-            .await;
+            .remove_all_known_peer_addresses(peer_id);
     }
 
     fn register_event_metrics(&mut self, swarm_event: &SwarmEvent) {
diff --git a/crates/subspace-networking/src/shared.rs b/crates/subspace-networking/src/shared.rs
index 9e0cf53c17..5cf6e10bad 100644
--- a/crates/subspace-networking/src/shared.rs
+++ b/crates/subspace-networking/src/shared.rs
@@ -13,7 +13,7 @@ use libp2p::kad::record::Key;
 use libp2p::kad::PeerRecord;
 use libp2p::{Multiaddr, PeerId};
 use parking_lot::Mutex;
-use std::sync::atomic::{AtomicBool, AtomicUsize};
+use std::sync::atomic::AtomicUsize;
 use std::sync::Arc;
 
 #[derive(Debug)]
@@ -126,7 +126,6 @@ pub(crate) struct Shared {
     pub(crate) command_sender: mpsc::Sender<Command>,
     pub(crate) kademlia_tasks_semaphore: ResizableSemaphore,
     pub(crate) regular_tasks_semaphore: ResizableSemaphore,
-    pub(crate) bootstrap_finished: AtomicBool,
 }
 
 impl Shared {
@@ -145,7 +144,6 @@
             command_sender,
             kademlia_tasks_semaphore,
             regular_tasks_semaphore,
-            bootstrap_finished: AtomicBool::new(false),
         }
     }
 }
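
For reference, a minimal, self-contained sketch of the pattern this diff introduces: a `BootstrapCommandState`-style enum guarded by an `async_mutex::Mutex`, so that bootstrap runs exactly once and concurrent callers either wait on the in-progress run or return immediately. The names `BootstrapState`, `bootstrap`, and `start_bootstrap` are illustrative only; `start_bootstrap` is a hypothetical stand-in for the real `handle_command(Command::Bootstrap { .. })` call, and the sketch assumes `async-mutex = "1.4"` and `futures = "0.3"`.

```rust
use async_mutex::Mutex as AsyncMutex;
use futures::channel::mpsc;
use futures::StreamExt;
use std::sync::Arc;

#[derive(Debug, Default)]
enum BootstrapState {
    #[default]
    NotStarted,
    InProgress(mpsc::UnboundedReceiver<()>),
    Finished,
}

/// Runs the bootstrap work exactly once, no matter how many callers race into it.
async fn bootstrap(state: &Arc<AsyncMutex<BootstrapState>>) {
    // The async mutex serializes concurrent callers without blocking the executor thread.
    let mut guard = state.lock().await;

    let receiver = match &mut *guard {
        BootstrapState::NotStarted => {
            // First caller: kick off the work and keep the progress channel in the state.
            let (sender, receiver) = mpsc::unbounded();
            start_bootstrap(sender); // stand-in for `handle_command(Command::Bootstrap { .. })`
            *guard = BootstrapState::InProgress(receiver);
            match &mut *guard {
                BootstrapState::InProgress(receiver) => receiver,
                _ => unreachable!("Was just set to that exact value"),
            }
        }
        BootstrapState::InProgress(receiver) => receiver,
        // Already done: nothing to wait for.
        BootstrapState::Finished => return,
    };

    // Drain progress notifications until the sender side is dropped.
    while receiver.next().await.is_some() {}

    *guard = BootstrapState::Finished;
}

// Hypothetical stand-in for the real Kademlia bootstrap: reports one step, then
// completes by dropping the sender, which closes the channel.
fn start_bootstrap(sender: mpsc::UnboundedSender<()>) {
    let _ = sender.unbounded_send(());
}

fn main() {
    let state = Arc::new(AsyncMutex::new(BootstrapState::default()));
    futures::executor::block_on(async {
        bootstrap(&state).await; // performs the bootstrap
        bootstrap(&state).await; // returns immediately: the state is `Finished`
    });
}
```

The inner `match` right after the assignment re-borrows the receiver that was just moved into the state, which is why the `unreachable!("Was just set to that exact value")` arm appears both here and in the diff.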