Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change DSN bootstrapping. #1690

Merged
merged 10 commits into from
Jul 26, 2023
16 changes: 16 additions & 0 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::commands::shared::print_disk_farm_info;
use crate::utils::{get_required_plot_space_with_overhead, shutdown_signal};
use crate::{DiskFarm, FarmingArgs};
use anyhow::{anyhow, Context, Result};
use futures::future::pending;
use futures::stream::FuturesUnordered;
use futures::{FutureExt, StreamExt};
use lru::LruCache;
Expand Down Expand Up @@ -351,7 +352,22 @@ where
)?;
let mut networking_fut = Box::pin(networking_fut).fuse();

let bootstrap_fut = Box::pin({
let node = node.clone();

async move {
if let Err(err) = node.bootstrap().await {
warn!(?err, "DSN bootstrap failed.");
}
shamil-gadelshin marked this conversation as resolved.
Show resolved Hide resolved

pending::<()>().await;
}
});

futures::select!(
// Network bootstrapping future
_ = bootstrap_fut.fuse() => {},

// Signal future
_ = signal.fuse() => {},

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ pub(super) fn configure_dsn(
let networking_parameters_registry = {
let known_addresses_db_path = base_path.join("known_addresses_db");

NetworkingParametersManager::new(&known_addresses_db_path, bootstrap_nodes)
NetworkingParametersManager::new(&known_addresses_db_path, bootstrap_nodes.clone())
.map(|manager| manager.boxed())?
};

Expand Down Expand Up @@ -303,6 +303,7 @@ pub(super) fn configure_dsn(
special_connected_peers_handler: Arc::new(PeerInfo::is_farmer),
// other (non-farmer) connections
general_connected_peers_handler: Arc::new(|peer_info| !PeerInfo::is_farmer(peer_info)),
bootstrap_addresses: bootstrap_nodes,
..default_config
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use anyhow::anyhow;
use bytesize::ByteSize;
use clap::{Parser, ValueHint};
use either::Either;
use futures::future::pending;
use futures::FutureExt;
use libp2p::identity::ed25519::Keypair;
use libp2p::{identity, Multiaddr, PeerId};
use serde::{Deserialize, Serialize};
Expand All @@ -18,7 +20,7 @@ use subspace_networking::{
peer_id, BootstrappedNetworkingParameters, Config, NetworkingParametersManager,
ParityDbProviderStorage, PeerInfoProvider, VoidProviderStorage,
};
use tracing::{debug, info, Level};
use tracing::{debug, info, warn, Level};
use tracing_subscriber::fmt::Subscriber;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::EnvFilter;
Expand Down Expand Up @@ -206,7 +208,25 @@ async fn main() -> anyhow::Result<()> {
.detach();

info!("Subspace Bootstrap Node started");
node_runner.run().await;
let bootstrap_fut = Box::pin({
let node = node.clone();

async move {
if let Err(err) = node.bootstrap().await {
warn!(?err, "DSN bootstrap failed.");
}

pending::<()>().await;
}
});

futures::select!(
// Network bootstrapping future
_ = bootstrap_fut.fuse() => {},

// Networking runner
_ = node_runner.run().fuse() => {},
);
}
Command::GenerateKeypair { json } => {
let output = KeypairOutput::new(Keypair::generate());
Expand Down
5 changes: 5 additions & 0 deletions crates/subspace-networking/src/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ pub struct Config<ProviderStorage> {
pub general_target_connections: u32,
/// Defines target total (in and out) connection number that should be maintained for special peers.
pub special_target_connections: u32,
/// Addresses to bootstrap Kademlia network
pub bootstrap_addresses: Vec<Multiaddr>,
}

impl<ProviderStorage> fmt::Debug for Config<ProviderStorage> {
Expand Down Expand Up @@ -330,6 +332,7 @@ where
special_connected_peers_handler: Arc::new(|_| false),
general_target_connections: SWARM_TARGET_CONNECTION_NUMBER,
special_target_connections: SWARM_TARGET_CONNECTION_NUMBER,
bootstrap_addresses: Vec::new(),
}
}
}
Expand Down Expand Up @@ -392,6 +395,7 @@ where
special_connected_peers_handler: special_connection_decision_handler,
general_target_connections,
special_target_connections,
bootstrap_addresses,
} = config;
let local_peer_id = peer_id(&keypair);

Expand Down Expand Up @@ -498,6 +502,7 @@ where
protocol_version,
general_connection_decision_handler,
special_connection_decision_handler,
bootstrap_addresses,
});

Ok((node, node_runner))
Expand Down
72 changes: 70 additions & 2 deletions crates/subspace-networking/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use bytes::Bytes;
use event_listener_primitives::HandlerId;
use futures::channel::mpsc::SendError;
use futures::channel::{mpsc, oneshot};
use futures::{SinkExt, Stream};
use futures::{SinkExt, Stream, StreamExt};
use libp2p::core::multihash::Multihash;
use libp2p::gossipsub::{Sha256Topic, SubscriptionError};
use libp2p::kad::record::Key;
Expand All @@ -19,7 +19,9 @@ use std::task::{Context, Poll};
use std::time::Duration;
use thiserror::Error;
use tokio::time::sleep;
use tracing::{error, trace};
use tracing::{debug, error, trace};

const BOOTSTRAP_CHECK_DELAY: Duration = Duration::from_secs(1);

/// Topic subscription, will unsubscribe when last instance is dropped for a particular topic.
#[derive(Debug)]
Expand Down Expand Up @@ -275,6 +277,26 @@ impl From<oneshot::Canceled> for ConnectedPeersError {
}
}

#[derive(Debug, Error)]
pub enum BootstrapError {
/// Failed to send command to the node runner
#[error("Failed to send command to the node runner: {0}")]
SendCommand(#[from] SendError),
/// Node runner was dropped
#[error("Node runner was dropped")]
NodeRunnerDropped,
/// Failed to bootstrap a peer.
#[error("Failed to bootstrap a peer.")]
Bootstrap,
}

impl From<oneshot::Canceled> for BootstrapError {
#[inline]
fn from(oneshot::Canceled: oneshot::Canceled) -> Self {
Self::NodeRunnerDropped
}
}

/// Implementation of a network node on Subspace Network.
#[derive(Debug, Clone)]
#[must_use = "Node doesn't do anything if dropped"]
Expand All @@ -297,6 +319,7 @@ impl Node {
&self,
key: Multihash,
) -> Result<impl Stream<Item = PeerRecord>, GetValueError> {
self.wait_for_bootstrap().await;
let permit = self.shared.kademlia_tasks_semaphore.acquire().await;
let (result_sender, result_receiver) = mpsc::unbounded();

Expand All @@ -320,6 +343,7 @@ impl Node {
key: Multihash,
value: Vec<u8>,
) -> Result<impl Stream<Item = ()>, PutValueError> {
self.wait_for_bootstrap().await;
let permit = self.shared.kademlia_tasks_semaphore.acquire().await;
let (result_sender, result_receiver) = mpsc::unbounded();

Expand All @@ -340,6 +364,7 @@ impl Node {

/// Subcribe to some topic on the DSN.
pub async fn subscribe(&self, topic: Sha256Topic) -> Result<TopicSubscription, SubscribeError> {
self.wait_for_bootstrap().await;
let permit = self.shared.regular_tasks_semaphore.acquire().await;
let (result_sender, result_receiver) = oneshot::channel();

Expand Down Expand Up @@ -368,6 +393,7 @@ impl Node {

/// Subcribe a messgo to some topic on the DSN.
pub async fn publish(&self, topic: Sha256Topic, message: Vec<u8>) -> Result<(), PublishError> {
self.wait_for_bootstrap().await;
let _permit = self.shared.regular_tasks_semaphore.acquire().await;
let (result_sender, result_receiver) = oneshot::channel();

Expand All @@ -393,6 +419,7 @@ impl Node {
where
Request: GenericRequest,
{
self.wait_for_bootstrap().await;
let _permit = self.shared.regular_tasks_semaphore.acquire().await;
let (result_sender, result_receiver) = oneshot::channel();
let command = Command::GenericRequest {
Expand All @@ -414,6 +441,7 @@ impl Node {
&self,
key: Multihash,
) -> Result<impl Stream<Item = PeerId>, GetClosestPeersError> {
self.wait_for_bootstrap().await;
let permit = self.shared.kademlia_tasks_semaphore.acquire().await;
trace!(?key, "Starting 'GetClosestPeers' request.");

Expand Down Expand Up @@ -515,6 +543,7 @@ impl Node {
&self,
key: Multihash,
) -> Result<impl Stream<Item = PeerId>, GetProvidersError> {
self.wait_for_bootstrap().await;
let permit = self.shared.kademlia_tasks_semaphore.acquire().await;
let (result_sender, result_receiver) = mpsc::unbounded();

Expand Down Expand Up @@ -598,6 +627,31 @@ impl Node {
.map_err(|_| ConnectedPeersError::ConnectedPeers)
}

/// Bootstraps Kademlia network
pub async fn bootstrap(&self) -> Result<(), BootstrapError> {
let (result_sender, mut result_receiver) = mpsc::unbounded();

debug!("Starting 'bootstrap' request.");

self.shared
.command_sender
.clone()
.send(Command::Bootstrap { result_sender })
.await?;

for step in 0.. {
let result = result_receiver.next().await;

if result.is_some() {
debug!(%step, "Kademlia bootstrapping...");
} else {
break;
}
}

Ok(())
}

/// Callback is called when we receive new [`crate::peer_info::PeerInfo`]
pub fn on_peer_info(&self, callback: HandlerFn<NewPeerInfo>) -> HandlerId {
self.shared.handlers.new_peer_info.add(callback)
Expand All @@ -611,5 +665,19 @@ impl Node {
/// Callback is called when a peer is connected.
pub fn on_connected_peer(&self, callback: HandlerFn<PeerId>) -> HandlerId {
self.shared.handlers.connected_peer.add(callback)
}

pub(crate) async fn wait_for_bootstrap(&self) {
loop {
let was_bootstrapped = self.shared.bootstrap_finished.lock().to_owned();

if was_bootstrapped {
return;
} else {
trace!("Waiting for bootstrap...");

sleep(BOOTSTRAP_CHECK_DELAY).await;
}
}
}
}
Loading