
Merge pull request #1699 from subspace/change_bootstrapping-simplification

Simplify bootstrapping process
shamil-gadelshin authored Jul 26, 2023
2 parents b7de051 + 76e464a commit e52d6fd
Showing 6 changed files with 67 additions and 73 deletions.
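For context on the change set below: the PR replaces the per-request `wait_for_bootstrap()` polling in `Node` with a single bootstrap pass driven from `NodeRunner::run()`, guarded by a small `NotStarted` / `InProgress` / `Finished` state machine. The following is a minimal, self-contained sketch of that pattern, not the subspace-networking API: standard-library channels stand in for `futures::channel::mpsc`, and the `Runner` struct stands in for `NodeRunner`.

```rust
// Minimal sketch of the bootstrap state machine this PR introduces.
// Not the real API: std channels replace futures::channel::mpsc,
// and `Runner` replaces `NodeRunner`.
use std::mem;
use std::sync::mpsc;

#[derive(Debug, Default)]
enum BootstrapState {
    #[default]
    NotStarted,
    InProgress(mpsc::Receiver<()>),
    Finished,
}

struct Runner {
    state: BootstrapState,
}

impl Runner {
    /// Runs bootstrap at most once; later calls are no-ops.
    fn bootstrap(&mut self) {
        let receiver = match mem::take(&mut self.state) {
            BootstrapState::NotStarted => {
                // In the real code this sends `Command::Bootstrap { result_sender }`
                // to the swarm loop; here two fake bootstrap steps are simulated.
                let (sender, receiver) = mpsc::channel();
                sender.send(()).unwrap();
                sender.send(()).unwrap();
                drop(sender); // closing the channel ends the loop below
                receiver
            }
            BootstrapState::InProgress(receiver) => receiver,
            BootstrapState::Finished => return,
        };

        // Drain progress notifications until the sender side is dropped.
        let mut step = 0;
        while receiver.recv().is_ok() {
            step += 1;
            println!("bootstrap step {step}");
        }

        self.state = BootstrapState::Finished;
    }
}

fn main() {
    let mut runner = Runner { state: BootstrapState::default() };
    runner.bootstrap(); // performs the steps
    runner.bootstrap(); // already finished: returns immediately
}
```

Compare this with the removed code in node.rs below, where every public call first polled the shared `bootstrap_finished` atomic once per second via `wait_for_bootstrap()`.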
10 changes: 10 additions & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions crates/subspace-networking/Cargo.toml
@@ -16,6 +16,7 @@ include = [
]

[dependencies]
async-mutex = "1.4.0"
actix-web = "4.3.1"
anyhow = "1.0.71"
async-trait = "0.1.68"
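The new `async-mutex` dependency provides a mutex whose guard can be held across `.await` points; node_runner.rs uses it further down to serialize access to the bootstrap state. Below is a stand-alone usage sketch, not code from this crate; it additionally assumes the `futures` crate for a block-on executor, and the `State` enum is illustrative only.

```rust
// Stand-alone sketch of async_mutex::Mutex usage. Assumes `async-mutex` and
// `futures` as dependencies; `State` is illustrative, not subspace-networking's.
use async_mutex::Mutex;

#[derive(Debug, Default, PartialEq)]
enum State {
    #[default]
    NotStarted,
    Finished,
}

fn main() {
    futures::executor::block_on(async {
        let state = Mutex::new(State::default());

        {
            // Unlike std::sync::MutexGuard, this guard may live across awaits.
            let mut guard = state.lock().await;
            futures::future::ready(()).await; // some async work while holding the lock
            *guard = State::Finished;
        } // guard dropped here, unlocking the mutex

        assert_eq!(*state.lock().await, State::Finished);
        println!("done");
    });
}
```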
@@ -47,7 +47,7 @@ pub trait NetworkingParametersRegistry: Send + Sync {
async fn remove_known_peer_addresses(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>);

/// Unregisters associated addresses for peer ID.
async fn remove_all_known_peer_addresses(&mut self, peer_id: PeerId);
fn remove_all_known_peer_addresses(&mut self, peer_id: PeerId);

/// Returns a batch of the combined collection of known addresses from networking parameters DB
/// and bootstrap addresses from networking parameters initialization.
@@ -87,7 +87,7 @@ impl NetworkingParametersRegistry for StubNetworkingParametersManager {

async fn remove_known_peer_addresses(&mut self, _peer_id: PeerId, _addresses: Vec<Multiaddr>) {}

async fn remove_all_known_peer_addresses(&mut self, _peer_id: PeerId) {}
fn remove_all_known_peer_addresses(&mut self, _peer_id: PeerId) {}

async fn next_known_addresses_batch(&mut self) -> Vec<PeerAddress> {
Vec::new()
@@ -269,7 +269,7 @@ impl NetworkingParametersRegistry for NetworkingParametersManager {
self.cache_need_saving = true;
}

async fn remove_all_known_peer_addresses(&mut self, peer_id: PeerId) {
fn remove_all_known_peer_addresses(&mut self, peer_id: PeerId) {
trace!(%peer_id, "Remove all peer addresses from the networking parameters registry");

self.known_peers.pop(&peer_id);
24 changes: 0 additions & 24 deletions crates/subspace-networking/src/node.rs
@@ -14,16 +14,13 @@ use libp2p::kad::PeerRecord;
use libp2p::{Multiaddr, PeerId};
use parity_scale_codec::Decode;
use std::pin::Pin;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use thiserror::Error;
use tokio::time::sleep;
use tracing::{debug, error, trace};

const BOOTSTRAP_CHECK_DELAY: Duration = Duration::from_secs(1);

/// Topic subscription, will unsubscribe when last instance is dropped for a particular topic.
#[derive(Debug)]
#[pin_project::pin_project(PinnedDrop)]
@@ -320,7 +317,6 @@ impl Node {
&self,
key: Multihash,
) -> Result<impl Stream<Item = PeerRecord>, GetValueError> {
self.wait_for_bootstrap().await;
let permit = self.shared.kademlia_tasks_semaphore.acquire().await;
let (result_sender, result_receiver) = mpsc::unbounded();

@@ -344,7 +340,6 @@
key: Multihash,
value: Vec<u8>,
) -> Result<impl Stream<Item = ()>, PutValueError> {
self.wait_for_bootstrap().await;
let permit = self.shared.kademlia_tasks_semaphore.acquire().await;
let (result_sender, result_receiver) = mpsc::unbounded();

@@ -365,7 +360,6 @@

/// Subscribe to some topic on the DSN.
pub async fn subscribe(&self, topic: Sha256Topic) -> Result<TopicSubscription, SubscribeError> {
self.wait_for_bootstrap().await;
let permit = self.shared.regular_tasks_semaphore.acquire().await;
let (result_sender, result_receiver) = oneshot::channel();

@@ -394,7 +388,6 @@

/// Publish a message to some topic on the DSN.
pub async fn publish(&self, topic: Sha256Topic, message: Vec<u8>) -> Result<(), PublishError> {
self.wait_for_bootstrap().await;
let _permit = self.shared.regular_tasks_semaphore.acquire().await;
let (result_sender, result_receiver) = oneshot::channel();

@@ -420,7 +413,6 @@
where
Request: GenericRequest,
{
self.wait_for_bootstrap().await;
let _permit = self.shared.regular_tasks_semaphore.acquire().await;
let (result_sender, result_receiver) = oneshot::channel();
let command = Command::GenericRequest {
@@ -442,7 +434,6 @@
&self,
key: Multihash,
) -> Result<impl Stream<Item = PeerId>, GetClosestPeersError> {
self.wait_for_bootstrap().await;
let permit = self.shared.kademlia_tasks_semaphore.acquire().await;
trace!(?key, "Starting 'GetClosestPeers' request.");

@@ -544,7 +535,6 @@ impl Node {
&self,
key: Multihash,
) -> Result<impl Stream<Item = PeerId>, GetProvidersError> {
self.wait_for_bootstrap().await;
let permit = self.shared.kademlia_tasks_semaphore.acquire().await;
let (result_sender, result_receiver) = mpsc::unbounded();

@@ -667,18 +657,4 @@
pub fn on_connected_peer(&self, callback: HandlerFn<PeerId>) -> HandlerId {
self.shared.handlers.connected_peer.add(callback)
}

pub(crate) async fn wait_for_bootstrap(&self) {
loop {
let was_bootstrapped = self.shared.bootstrap_finished.load(Ordering::SeqCst);

if was_bootstrapped {
return;
} else {
trace!("Waiting for bootstrap...");

sleep(BOOTSTRAP_CHECK_DELAY).await;
}
}
}
}
95 changes: 52 additions & 43 deletions crates/subspace-networking/src/node_runner.rs
@@ -16,10 +16,11 @@ use crate::shared::{Command, CreatedSubscription, NewPeerInfo, Shared};
use crate::utils::{
convert_multiaddresses, is_global_address_or_dns, PeerAddress, ResizableSemaphorePermit,
};
use async_mutex::Mutex as AsyncMutex;
use bytes::Bytes;
use futures::channel::mpsc;
use futures::future::Fuse;
use futures::{FutureExt, SinkExt, StreamExt};
use futures::{FutureExt, StreamExt};
use libp2p::core::ConnectedPoint;
use libp2p::gossipsub::{Event as GossipsubEvent, TopicHash};
use libp2p::identify::Event as IdentifyEvent;
@@ -85,6 +86,14 @@ enum QueryResultSender {
},
}

#[derive(Debug, Default)]
enum BootstrapCommandState {
#[default]
NotStarted,
InProgress(mpsc::UnboundedReceiver<()>),
Finished,
}

/// Runner for the Node.
#[must_use = "Node does not function properly unless its runner is driven forward"]
pub struct NodeRunner<ProviderStorage>
@@ -129,7 +138,7 @@
/// Addresses to bootstrap Kademlia network
bootstrap_addresses: Vec<Multiaddr>,
/// Ensures a single bootstrap on run() invocation.
was_bootstrapped: bool,
bootstrap_command_state: Arc<AsyncMutex<BootstrapCommandState>>,
}

// Helper struct for NodeRunner configuration (clippy requirement).
@@ -196,15 +205,13 @@
special_connection_decision_handler,
rng: StdRng::seed_from_u64(KADEMLIA_PEERS_ADDRESSES_BATCH_SIZE as u64), // any seed
bootstrap_addresses,
was_bootstrapped: false,
bootstrap_command_state: Arc::new(AsyncMutex::new(BootstrapCommandState::default())),
}
}

/// Drives the main networking future forward.
pub async fn run(&mut self) {
if !self.was_bootstrapped {
self.bootstrap().await;
}
self.bootstrap().await;

loop {
futures::select! {
@@ -226,7 +233,7 @@
},
command = self.command_receiver.next() => {
if let Some(command) = command {
self.handle_command(command).await;
self.handle_command(command);
} else {
break;
}
@@ -246,14 +253,38 @@

/// Bootstraps Kademlia network
async fn bootstrap(&mut self) {
self.was_bootstrapped = true;
let bootstrap_command_state = Arc::clone(&self.bootstrap_command_state);
let mut bootstrap_command_state = bootstrap_command_state.lock().await;
let bootstrap_command_receiver = match &mut *bootstrap_command_state {
BootstrapCommandState::NotStarted => {
error!("Bootstrap started.");

let (result_sender, mut result_receiver) = mpsc::unbounded();
let (bootstrap_command_sender, bootstrap_command_receiver) = mpsc::unbounded();

debug!("Bootstrap started.");
self.handle_command(Command::Bootstrap {
result_sender: bootstrap_command_sender,
});

*bootstrap_command_state =
BootstrapCommandState::InProgress(bootstrap_command_receiver);
match &mut *bootstrap_command_state {
BootstrapCommandState::InProgress(bootstrap_command_receiver) => {
bootstrap_command_receiver
}
_ => {
unreachable!("Was just set to that exact value");
}
}
}
BootstrapCommandState::InProgress(bootstrap_command_receiver) => {
bootstrap_command_receiver
}
BootstrapCommandState::Finished => {
return;
}
};

self.handle_command(Command::Bootstrap { result_sender })
.await;
debug!("Bootstrap started.");

let mut bootstrap_step = 0;
loop {
@@ -266,7 +297,7 @@
break;
}
},
result = result_receiver.next() => {
result = bootstrap_command_receiver.next() => {
if result.is_some() {
debug!(%bootstrap_step, "Kademlia bootstrapping...");
bootstrap_step += 1;
@@ -278,6 +309,7 @@
}

debug!("Bootstrap finished.");
*bootstrap_command_state = BootstrapCommandState::Finished;
}

/// Handles periodical tasks.
@@ -559,7 +591,7 @@
"Peer has different protocol version. Peer was banned.",
);

self.ban_peer(peer_id).await;
self.ban_peer(peer_id);
}

if info.listen_addrs.len() > 30 {
@@ -853,38 +885,20 @@
) || cancelled;
}
Err(error) => {
debug!(?error, "Bootstrap query failed.",);

self.set_bootstrap_finished(false);
debug!(?error, "Bootstrap query failed.");
}
}
}

if last || cancelled {
// There will be no more progress
self.query_id_receivers.remove(&id);

if last {
self.set_bootstrap_finished(true);
}

if cancelled {
self.set_bootstrap_finished(false);
}
}
}
_ => {}
}
}

fn set_bootstrap_finished(&mut self, success: bool) {
if let Some(shared) = self.shared_weak.upgrade() {
shared.bootstrap_finished.store(true, Ordering::SeqCst);

debug!(%success, "Bootstrap finished.",);
}
}

// Returns `true` if query was cancelled
fn unbounded_send_and_cancel_on_error<T>(
kademlia: &mut Kademlia<ProviderOnlyRecordStore<ProviderStorage>>,
@@ -982,7 +996,7 @@
.add_peers_to_dial(&peers);
}

async fn handle_command(&mut self, command: Command) {
fn handle_command(&mut self, command: Command) {
match command {
Command::GetValue {
key,
Expand Down Expand Up @@ -1234,7 +1248,7 @@ where
);
}
Command::BanPeer { peer_id } => {
self.ban_peer(peer_id).await;
self.ban_peer(peer_id);
}
Command::Dial { address } => {
let _ = self.swarm.dial(address);
@@ -1244,7 +1258,7 @@

let _ = result_sender.send(connected_peers);
}
Command::Bootstrap { mut result_sender } => {
Command::Bootstrap { result_sender } => {
let kademlia = &mut self.swarm.behaviour_mut().kademlia;

for (peer_id, address) in convert_multiaddresses(self.bootstrap_addresses.clone()) {
@@ -1262,17 +1276,13 @@
}
Err(err) => {
debug!(?err, "Bootstrap error.");

let _ = result_sender.close().await;

self.set_bootstrap_finished(false);
}
}
}
}
}

async fn ban_peer(&mut self, peer_id: PeerId) {
fn ban_peer(&mut self, peer_id: PeerId) {
// Remove temporary ban if there is any before creating a permanent one
self.temporary_bans.lock().remove(&peer_id);

@@ -1281,8 +1291,7 @@
self.swarm.behaviour_mut().block_list.block_peer(peer_id);
self.swarm.behaviour_mut().kademlia.remove_peer(&peer_id);
self.networking_parameters_registry
.remove_all_known_peer_addresses(peer_id)
.await;
.remove_all_known_peer_addresses(peer_id);
}

fn register_event_metrics<E: Debug>(&mut self, swarm_event: &SwarmEvent<Event, E>) {