Skip to content

Commit

Permalink
Simplify bootstrapping process
Browse files Browse the repository at this point in the history
  • Loading branch information
nazar-pc committed Jul 26, 2023
1 parent b7de051 commit e8bf334
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 83 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/subspace-networking/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ include = [
]

[dependencies]
async-mutex = "1.4.0"
actix-web = "4.3.1"
anyhow = "1.0.71"
async-trait = "0.1.68"
Expand Down
13 changes: 11 additions & 2 deletions crates/subspace-networking/src/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::node_runner::{NodeRunner, NodeRunnerConfig};
use crate::peer_info::PeerInfoProvider;
use crate::request_responses::RequestHandler;
use crate::reserved_peers::Config as ReservedPeersConfig;
use crate::shared::Shared;
use crate::shared::{Command, Shared};
use crate::utils::{convert_multiaddresses, ResizableSemaphore};
use crate::{PeerInfo, PeerInfoConfig};
use backoff::{ExponentialBackoff, SystemClock};
Expand Down Expand Up @@ -475,7 +475,15 @@ where
}

// Create final structs
let (command_sender, command_receiver) = mpsc::channel(1);
let (mut command_sender, command_receiver) = mpsc::channel(1);

let (bootstrap_command_sender, bootstrap_command_receiver) = mpsc::unbounded();
// Bootstrapping is the first thing that happens, see `NodeRunner::bootstrap()`
command_sender
.try_send(Command::Bootstrap {
result_sender: bootstrap_command_sender,
})
.expect("Channel was just created, error is not possible; qed");

let kademlia_tasks_semaphore = ResizableSemaphore::new(KADEMLIA_BASE_CONCURRENT_TASKS);
let regular_tasks_semaphore = ResizableSemaphore::new(REGULAR_BASE_CONCURRENT_TASKS);
Expand Down Expand Up @@ -504,6 +512,7 @@ where
general_connection_decision_handler,
special_connection_decision_handler,
bootstrap_addresses,
bootstrap_command_receiver,
});

Ok((node, node_runner))
Expand Down
24 changes: 0 additions & 24 deletions crates/subspace-networking/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,13 @@ use libp2p::kad::PeerRecord;
use libp2p::{Multiaddr, PeerId};
use parity_scale_codec::Decode;
use std::pin::Pin;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use thiserror::Error;
use tokio::time::sleep;
use tracing::{debug, error, trace};

const BOOTSTRAP_CHECK_DELAY: Duration = Duration::from_secs(1);

/// Topic subscription, will unsubscribe when last instance is dropped for a particular topic.
#[derive(Debug)]
#[pin_project::pin_project(PinnedDrop)]
Expand Down Expand Up @@ -320,7 +317,6 @@ impl Node {
&self,
key: Multihash,
) -> Result<impl Stream<Item = PeerRecord>, GetValueError> {
self.wait_for_bootstrap().await;
let permit = self.shared.kademlia_tasks_semaphore.acquire().await;
let (result_sender, result_receiver) = mpsc::unbounded();

Expand All @@ -344,7 +340,6 @@ impl Node {
key: Multihash,
value: Vec<u8>,
) -> Result<impl Stream<Item = ()>, PutValueError> {
self.wait_for_bootstrap().await;
let permit = self.shared.kademlia_tasks_semaphore.acquire().await;
let (result_sender, result_receiver) = mpsc::unbounded();

Expand All @@ -365,7 +360,6 @@ impl Node {

/// Subcribe to some topic on the DSN.
pub async fn subscribe(&self, topic: Sha256Topic) -> Result<TopicSubscription, SubscribeError> {
self.wait_for_bootstrap().await;
let permit = self.shared.regular_tasks_semaphore.acquire().await;
let (result_sender, result_receiver) = oneshot::channel();

Expand Down Expand Up @@ -394,7 +388,6 @@ impl Node {

/// Subcribe a messgo to some topic on the DSN.
pub async fn publish(&self, topic: Sha256Topic, message: Vec<u8>) -> Result<(), PublishError> {
self.wait_for_bootstrap().await;
let _permit = self.shared.regular_tasks_semaphore.acquire().await;
let (result_sender, result_receiver) = oneshot::channel();

Expand All @@ -420,7 +413,6 @@ impl Node {
where
Request: GenericRequest,
{
self.wait_for_bootstrap().await;
let _permit = self.shared.regular_tasks_semaphore.acquire().await;
let (result_sender, result_receiver) = oneshot::channel();
let command = Command::GenericRequest {
Expand All @@ -442,7 +434,6 @@ impl Node {
&self,
key: Multihash,
) -> Result<impl Stream<Item = PeerId>, GetClosestPeersError> {
self.wait_for_bootstrap().await;
let permit = self.shared.kademlia_tasks_semaphore.acquire().await;
trace!(?key, "Starting 'GetClosestPeers' request.");

Expand Down Expand Up @@ -544,7 +535,6 @@ impl Node {
&self,
key: Multihash,
) -> Result<impl Stream<Item = PeerId>, GetProvidersError> {
self.wait_for_bootstrap().await;
let permit = self.shared.kademlia_tasks_semaphore.acquire().await;
let (result_sender, result_receiver) = mpsc::unbounded();

Expand Down Expand Up @@ -667,18 +657,4 @@ impl Node {
pub fn on_connected_peer(&self, callback: HandlerFn<PeerId>) -> HandlerId {
self.shared.handlers.connected_peer.add(callback)
}

pub(crate) async fn wait_for_bootstrap(&self) {
loop {
let was_bootstrapped = self.shared.bootstrap_finished.load(Ordering::SeqCst);

if was_bootstrapped {
return;
} else {
trace!("Waiting for bootstrap...");

sleep(BOOTSTRAP_CHECK_DELAY).await;
}
}
}
}
87 changes: 33 additions & 54 deletions crates/subspace-networking/src/node_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::shared::{Command, CreatedSubscription, NewPeerInfo, Shared};
use crate::utils::{
convert_multiaddresses, is_global_address_or_dns, PeerAddress, ResizableSemaphorePermit,
};
use async_mutex::Mutex as AsyncMutex;
use bytes::Bytes;
use futures::channel::mpsc;
use futures::future::Fuse;
Expand Down Expand Up @@ -129,7 +130,7 @@ where
/// Addresses to bootstrap Kademlia network
bootstrap_addresses: Vec<Multiaddr>,
/// Ensures a single bootstrap on run() invocation.
was_bootstrapped: bool,
bootstrap_command_receiver: Arc<AsyncMutex<Option<mpsc::UnboundedReceiver<()>>>>,
}

// Helper struct for NodeRunner configuration (clippy requirement).
Expand All @@ -150,6 +151,7 @@ where
pub(crate) general_connection_decision_handler: ConnectedPeersHandler,
pub(crate) special_connection_decision_handler: ConnectedPeersHandler,
pub(crate) bootstrap_addresses: Vec<Multiaddr>,
pub(crate) bootstrap_command_receiver: mpsc::UnboundedReceiver<()>,
}

impl<ProviderStorage> NodeRunner<ProviderStorage>
Expand All @@ -171,6 +173,7 @@ where
general_connection_decision_handler,
special_connection_decision_handler,
bootstrap_addresses,
bootstrap_command_receiver,
}: NodeRunnerConfig<ProviderStorage>,
) -> Self {
Self {
Expand All @@ -196,15 +199,13 @@ where
special_connection_decision_handler,
rng: StdRng::seed_from_u64(KADEMLIA_PEERS_ADDRESSES_BATCH_SIZE as u64), // any seed
bootstrap_addresses,
was_bootstrapped: false,
bootstrap_command_receiver: Arc::new(AsyncMutex::new(Some(bootstrap_command_receiver))),
}
}

/// Drives the main networking future forward.
pub async fn run(&mut self) {
if !self.was_bootstrapped {
self.bootstrap().await;
}
self.bootstrap().await;

loop {
futures::select! {
Expand Down Expand Up @@ -246,38 +247,36 @@ where

/// Bootstraps Kademlia network
async fn bootstrap(&mut self) {
self.was_bootstrapped = true;

let (result_sender, mut result_receiver) = mpsc::unbounded();

debug!("Bootstrap started.");

self.handle_command(Command::Bootstrap { result_sender })
.await;

let mut bootstrap_step = 0;
loop {
futures::select! {
swarm_event = self.swarm.next() => {
if let Some(swarm_event) = swarm_event {
self.register_event_metrics(&swarm_event);
self.handle_swarm_event(swarm_event).await;
} else {
break;
}
},
result = result_receiver.next() => {
if result.is_some() {
debug!(%bootstrap_step, "Kademlia bootstrapping...");
bootstrap_step += 1;
} else {
break;
let bootstrap_command_receiver = Arc::clone(&self.bootstrap_command_receiver);
let mut maybe_bootstrap_command_receiver = bootstrap_command_receiver.lock().await;
if let Some(bootstrap_command_receiver) = maybe_bootstrap_command_receiver.as_mut() {
debug!("Bootstrap started.");

let mut bootstrap_step = 0;
loop {
futures::select! {
swarm_event = self.swarm.next() => {
if let Some(swarm_event) = swarm_event {
self.register_event_metrics(&swarm_event);
self.handle_swarm_event(swarm_event).await;
} else {
break;
}
},
result = bootstrap_command_receiver.next() => {
if result.is_some() {
debug!(%bootstrap_step, "Kademlia bootstrapping...");
bootstrap_step += 1;
} else {
break;
}
}
}
}
}

debug!("Bootstrap finished.");
debug!("Bootstrap finished.");
maybe_bootstrap_command_receiver.take();
}
}

/// Handles periodical tasks.
Expand Down Expand Up @@ -853,38 +852,20 @@ where
) || cancelled;
}
Err(error) => {
debug!(?error, "Bootstrap query failed.",);

self.set_bootstrap_finished(false);
debug!(?error, "Bootstrap query failed.");
}
}
}

if last || cancelled {
// There will be no more progress
self.query_id_receivers.remove(&id);

if last {
self.set_bootstrap_finished(true);
}

if cancelled {
self.set_bootstrap_finished(false);
}
}
}
_ => {}
}
}

fn set_bootstrap_finished(&mut self, success: bool) {
if let Some(shared) = self.shared_weak.upgrade() {
shared.bootstrap_finished.store(true, Ordering::SeqCst);

debug!(%success, "Bootstrap finished.",);
}
}

// Returns `true` if query was cancelled
fn unbounded_send_and_cancel_on_error<T>(
kademlia: &mut Kademlia<ProviderOnlyRecordStore<ProviderStorage>>,
Expand Down Expand Up @@ -1264,8 +1245,6 @@ where
debug!(?err, "Bootstrap error.");

let _ = result_sender.close().await;

self.set_bootstrap_finished(false);
}
}
}
Expand Down
4 changes: 1 addition & 3 deletions crates/subspace-networking/src/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use libp2p::kad::record::Key;
use libp2p::kad::PeerRecord;
use libp2p::{Multiaddr, PeerId};
use parking_lot::Mutex;
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;

#[derive(Debug)]
Expand Down Expand Up @@ -126,7 +126,6 @@ pub(crate) struct Shared {
pub(crate) command_sender: mpsc::Sender<Command>,
pub(crate) kademlia_tasks_semaphore: ResizableSemaphore,
pub(crate) regular_tasks_semaphore: ResizableSemaphore,
pub(crate) bootstrap_finished: AtomicBool,
}

impl Shared {
Expand All @@ -145,7 +144,6 @@ impl Shared {
command_sender,
kademlia_tasks_semaphore,
regular_tasks_semaphore,
bootstrap_finished: AtomicBool::new(false),
}
}
}

0 comments on commit e8bf334

Please sign in to comment.