Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change DSN bootstrapping. #1690

Merged
merged 10 commits into from
Jul 26, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ pub(super) fn configure_dsn(
let networking_parameters_registry = {
let known_addresses_db_path = base_path.join("known_addresses_db");

NetworkingParametersManager::new(&known_addresses_db_path, bootstrap_nodes.clone())
.map(|manager| manager.boxed())?
NetworkingParametersManager::new(&known_addresses_db_path).map(|manager| manager.boxed())?
};

let weak_readers_and_pieces = Arc::downgrade(readers_and_pieces);
Expand Down
18 changes: 7 additions & 11 deletions crates/subspace-networking/examples/get-peers-complex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use libp2p::PeerId;
use parking_lot::Mutex;
use std::sync::Arc;
use std::time::Duration;
use subspace_networking::{BootstrappedNetworkingParameters, Config, NetworkingParametersManager};
use subspace_networking::{Config, NetworkingParametersManager, StubNetworkingParametersManager};

#[tokio::main]
async fn main() {
Expand All @@ -23,12 +23,10 @@ async fn main() {
let mut nodes = Vec::with_capacity(TOTAL_NODE_COUNT);
for i in 0..TOTAL_NODE_COUNT {
let config = Config {
networking_parameters_registry: BootstrappedNetworkingParameters::new(
bootstrap_nodes.clone(),
)
.boxed(),
networking_parameters_registry: StubNetworkingParametersManager.boxed(),
shamil-gadelshin marked this conversation as resolved.
Show resolved Hide resolved
listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()],
allow_non_global_addresses_in_dht: true,
bootstrap_addresses: bootstrap_nodes.clone(),
..Config::default()
};
let keypair = config.keypair.clone().try_into_ed25519().unwrap();
Expand Down Expand Up @@ -83,12 +81,10 @@ async fn main() {
let config = Config {
listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()],
allow_non_global_addresses_in_dht: true,
networking_parameters_registry: NetworkingParametersManager::new(
db_path.as_ref(),
bootstrap_nodes,
)
.unwrap()
.boxed(),
networking_parameters_registry: NetworkingParametersManager::new(db_path.as_ref())
.unwrap()
.boxed(),
bootstrap_addresses: bootstrap_nodes,
..Config::default()
};

Expand Down
9 changes: 4 additions & 5 deletions crates/subspace-networking/examples/get-peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use parking_lot::Mutex;
use std::sync::Arc;
use std::time::Duration;
use subspace_core_primitives::{crypto, PieceIndexHash, U256};
use subspace_networking::{BootstrappedNetworkingParameters, Config};
use subspace_networking::{Config, StubNetworkingParametersManager};

#[tokio::main]
async fn main() {
Expand Down Expand Up @@ -42,13 +42,12 @@ async fn main() {
let node_1_addr = node_1_address_receiver.await.unwrap();
drop(on_new_listener_handler);

let bootstrap_addresses = vec![node_1_addr.with(Protocol::P2p(node_1.id().into()))];
let config_2 = Config {
networking_parameters_registry: BootstrappedNetworkingParameters::new(vec![
node_1_addr.with(Protocol::P2p(node_1.id().into()))
])
.boxed(),
networking_parameters_registry: StubNetworkingParametersManager.boxed(),
listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()],
allow_non_global_addresses_in_dht: true,
bootstrap_addresses,
..Config::default()
};

Expand Down
9 changes: 4 additions & 5 deletions crates/subspace-networking/examples/networking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use libp2p::multiaddr::Protocol;
use parking_lot::Mutex;
use std::sync::Arc;
use std::time::Duration;
use subspace_networking::{BootstrappedNetworkingParameters, Config};
use subspace_networking::{Config, StubNetworkingParametersManager};

const TOPIC: &str = "Foo";

Expand Down Expand Up @@ -47,13 +47,12 @@ async fn main() {

let mut subscription = node_1.subscribe(Sha256Topic::new(TOPIC)).await.unwrap();

let bootstrap_addresses = vec![node_1_addr.with(Protocol::P2p(node_1.id().into()))];
let config_2 = Config {
networking_parameters_registry: BootstrappedNetworkingParameters::new(vec![
node_1_addr.with(Protocol::P2p(node_1.id().into()))
])
.boxed(),
networking_parameters_registry: StubNetworkingParametersManager.boxed(),
listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()],
allow_non_global_addresses_in_dht: true,
bootstrap_addresses,
..Config::default()
};

Expand Down
11 changes: 5 additions & 6 deletions crates/subspace-networking/examples/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use prometheus_client::registry::Registry;
use std::sync::Arc;
use std::time::Duration;
use subspace_networking::{
start_prometheus_metrics_server, BootstrappedNetworkingParameters, Config, GenericRequest,
GenericRequestHandler,
start_prometheus_metrics_server, Config, GenericRequest, GenericRequestHandler,
StubNetworkingParametersManager,
};
use tokio::time::sleep;
use tracing::error;
Expand Down Expand Up @@ -86,16 +86,15 @@ async fn main() {
let node_1_addr = node_1_address_receiver.await.unwrap();
drop(on_new_listener_handler);

let bootstrap_addresses = vec![node_1_addr.with(Protocol::P2p(node_1.id().into()))];
let config_2 = Config {
networking_parameters_registry: BootstrappedNetworkingParameters::new(vec![
node_1_addr.with(Protocol::P2p(node_1.id().into()))
])
.boxed(),
networking_parameters_registry: StubNetworkingParametersManager.boxed(),
listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()],
allow_non_global_addresses_in_dht: true,
request_response_protocols: vec![GenericRequestHandler::<ExampleRequest>::create(
|_, _| async { None },
)],
bootstrap_addresses,
..Config::default()
};

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::utils::{convert_multiaddresses, CollectionBatcher, PeerAddress};
use crate::utils::{CollectionBatcher, PeerAddress};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use futures::future::Fuse;
Expand Down Expand Up @@ -70,40 +70,27 @@ impl Clone for Box<dyn NetworkingParametersRegistry> {
}
}

/// Networking manager implementation with bootstrapped addresses. All other operations muted.
/// Networking manager implementation with NOOP implementation.
shamil-gadelshin marked this conversation as resolved.
Show resolved Hide resolved
#[derive(Clone, Default)]
pub struct BootstrappedNetworkingParameters {
bootstrap_addresses: Vec<Multiaddr>,
}

impl BootstrappedNetworkingParameters {
/// Creates a new instance of `BootstrappedNetworkingParameters`.
pub fn new(bootstrap_addresses: Vec<Multiaddr>) -> Self {
Self {
bootstrap_addresses,
}
}
pub struct StubNetworkingParametersManager;

fn bootstrap_addresses(&self) -> Vec<PeerAddress> {
convert_multiaddresses(self.bootstrap_addresses.clone())
}

/// Returns an instance of `BootstrappedNetworkingParameters` as the `Box` reference.
impl StubNetworkingParametersManager {
/// Returns an instance of `StubNetworkingParametersManager` as the `Box` reference.
pub fn boxed(self) -> Box<dyn NetworkingParametersRegistry> {
Box::new(self)
}
}

#[async_trait]
impl NetworkingParametersRegistry for BootstrappedNetworkingParameters {
impl NetworkingParametersRegistry for StubNetworkingParametersManager {
async fn add_known_peer(&mut self, _: PeerId, _: Vec<Multiaddr>) {}

async fn remove_known_peer_addresses(&mut self, _peer_id: PeerId, _addresses: Vec<Multiaddr>) {}

async fn remove_all_known_peer_addresses(&mut self, _peer_id: PeerId) {}

async fn next_known_addresses_batch(&mut self) -> Vec<PeerAddress> {
self.bootstrap_addresses()
Vec::new()
}

async fn run(&mut self) {
Expand Down Expand Up @@ -141,19 +128,14 @@ pub struct NetworkingParametersManager {
column_id: u8,
// Key to persistent parameters
object_id: &'static [u8],
// Bootstrap addresses provided on creation
bootstrap_addresses: Vec<Multiaddr>,
// Provides batching capabilities for the address collection (it stores the last batch index)
collection_batcher: CollectionBatcher<PeerAddress>,
}

impl NetworkingParametersManager {
/// Object constructor. It accepts `NetworkingParametersProvider` implementation as a parameter.
/// On object creation it starts a job for networking parameters cache handling.
pub fn new(
path: &Path,
bootstrap_addresses: Vec<Multiaddr>,
) -> Result<Self, NetworkParametersPersistenceError> {
pub fn new(path: &Path) -> Result<Self, NetworkParametersPersistenceError> {
let mut options = Options::with_columns(path, 1);
// We don't use stats
options.stats = false;
Expand Down Expand Up @@ -184,7 +166,6 @@ impl NetworkingParametersManager {
object_id,
known_peers: cache,
networking_parameters_save_delay: Self::default_delay(),
bootstrap_addresses,
collection_batcher: CollectionBatcher::new(
NonZeroUsize::new(PEERS_ADDRESSES_BATCH_SIZE)
.expect("Manual non-zero initialization failed."),
Expand All @@ -202,12 +183,6 @@ impl NetworkingParametersManager {
.collect()
}

// Returns boostrap addresses from networking parameters initialization.
// It removes p2p-protocol suffix.
fn bootstrap_addresses(&self) -> Vec<PeerAddress> {
convert_multiaddresses(self.bootstrap_addresses.clone())
}

// Helps create a copy of the internal LruCache
fn clone_known_peers(&self) -> LruCache<PeerId, LruCache<Multiaddr, FailureTime>> {
let mut known_peers = LruCache::new(self.known_peers.cap());
Expand Down Expand Up @@ -304,12 +279,7 @@ impl NetworkingParametersRegistry for NetworkingParametersManager {

async fn next_known_addresses_batch(&mut self) -> Vec<PeerAddress> {
// We take cached known addresses and combine them with manually provided bootstrap addresses.
let combined_addresses = self
.known_addresses()
.await
.into_iter()
.chain(self.bootstrap_addresses())
.collect::<Vec<_>>();
let combined_addresses = self.known_addresses().await.into_iter().collect::<Vec<_>>();

trace!(
"Peer addresses batch requested. Total list size: {}",
Expand Down Expand Up @@ -359,7 +329,6 @@ impl NetworkingParametersRegistry for NetworkingParametersManager {
db: self.db.clone(),
column_id: self.column_id,
object_id: self.object_id,
bootstrap_addresses: self.bootstrap_addresses.clone(),
collection_batcher: self.collection_batcher.clone(),
}
.boxed()
Expand Down
9 changes: 4 additions & 5 deletions crates/subspace-networking/src/behavior/tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::persistent_parameters::remove_known_peer_addresses_internal;
use crate::behavior::provider_storage::{instant_to_micros, micros_to_instant};
use crate::{BootstrappedNetworkingParameters, Config, GenericRequest, GenericRequestHandler};
use crate::{Config, GenericRequest, GenericRequestHandler, StubNetworkingParametersManager};
use futures::channel::oneshot;
use libp2p::multiaddr::Protocol;
use libp2p::{Multiaddr, PeerId};
Expand Down Expand Up @@ -159,16 +159,15 @@ async fn test_async_handler_works_with_pending_internal_future() {
let node_1_addr = node_1_address_receiver.await.unwrap();
drop(on_new_listener_handler);

let bootstrap_addresses = vec![node_1_addr.with(Protocol::P2p(node_1.id().into()))];
let config_2 = Config {
networking_parameters_registry: BootstrappedNetworkingParameters::new(vec![
node_1_addr.with(Protocol::P2p(node_1.id().into()))
])
.boxed(),
networking_parameters_registry: StubNetworkingParametersManager.boxed(),
listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()],
allow_non_global_addresses_in_dht: true,
request_response_protocols: vec![GenericRequestHandler::<ExampleRequest>::create(
|_, _| async { None },
)],
bootstrap_addresses,
..Config::default()
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use std::path::PathBuf;
use std::sync::Arc;
use subspace_networking::libp2p::multiaddr::Protocol;
use subspace_networking::{
peer_id, BootstrappedNetworkingParameters, Config, NetworkingParametersManager,
ParityDbProviderStorage, PeerInfoProvider, VoidProviderStorage,
peer_id, Config, NetworkingParametersManager, ParityDbProviderStorage, PeerInfoProvider,
StubNetworkingParametersManager, VoidProviderStorage,
};
use tracing::{debug, info, warn, Level};
use tracing_subscriber::fmt::Subscriber;
Expand Down Expand Up @@ -161,15 +161,10 @@ async fn main() -> anyhow::Result<()> {
.map(|path| {
let known_addresses_db = path.join("known_addresses_db");

NetworkingParametersManager::new(
&known_addresses_db,
bootstrap_nodes.clone(),
)
.map(|manager| manager.boxed())
NetworkingParametersManager::new(&known_addresses_db)
.map(|manager| manager.boxed())
})
.unwrap_or(Ok(
BootstrappedNetworkingParameters::new(bootstrap_nodes).boxed()
))
.unwrap_or(Ok(StubNetworkingParametersManager.boxed()))
.map_err(|err| anyhow!(err))?
};

Expand All @@ -185,6 +180,8 @@ async fn main() -> anyhow::Result<()> {
// we don't maintain permanent connections with any peer
general_connected_peers_handler: Arc::new(|_| false),
special_connected_peers_handler: Arc::new(|_| false),
bootstrap_addresses: bootstrap_nodes,

..Config::new(
protocol_version.to_string(),
keypair,
Expand Down
4 changes: 2 additions & 2 deletions crates/subspace-networking/src/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ pub(crate) mod temporary_bans;
mod transport;

use crate::behavior::persistent_parameters::{
BootstrappedNetworkingParameters, NetworkingParametersRegistry,
NetworkingParametersRegistry, StubNetworkingParametersManager,
};
use crate::behavior::provider_storage::MemoryProviderStorage;
use crate::behavior::{provider_storage, Behavior, BehaviorConfig};
Expand Down Expand Up @@ -313,7 +313,7 @@ where
provider_storage,
allow_non_global_addresses_in_dht: false,
initial_random_query_interval: Duration::from_secs(1),
networking_parameters_registry: BootstrappedNetworkingParameters::default().boxed(),
networking_parameters_registry: StubNetworkingParametersManager.boxed(),
request_response_protocols: Vec::new(),
yamux_config,
reserved_peers: Vec::new(),
Expand Down
4 changes: 2 additions & 2 deletions crates/subspace-networking/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ mod shared;
pub mod utils;

pub use crate::behavior::persistent_parameters::{
BootstrappedNetworkingParameters, NetworkParametersPersistenceError,
NetworkingParametersManager, ParityDbError,
NetworkParametersPersistenceError, NetworkingParametersManager, ParityDbError,
StubNetworkingParametersManager,
};
pub use crate::node::{
GetClosestPeersError, Node, SendRequestError, SubscribeError, TopicSubscription,
Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-networking/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -665,7 +665,7 @@ impl Node {
/// Callback is called when a peer is connected.
pub fn on_connected_peer(&self, callback: HandlerFn<PeerId>) -> HandlerId {
self.shared.handlers.connected_peer.add(callback)
}
}

pub(crate) async fn wait_for_bootstrap(&self) {
loop {
Expand Down
4 changes: 2 additions & 2 deletions crates/subspace-networking/src/node_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use std::sync::atomic::Ordering;
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};
use tokio::time::Sleep;
use tracing::{debug, error, info, trace, warn};
use tracing::{debug, error, trace, warn};

// Defines a batch size for peer addresses from Kademlia buckets.
const KADEMLIA_PEERS_ADDRESSES_BATCH_SIZE: usize = 20;
Expand Down Expand Up @@ -839,7 +839,7 @@ where
let mut bootstrap_finished = shared.bootstrap_finished.lock();
*bootstrap_finished = true;

info!(%success, "Bootstrap finished.",);
debug!(%success, "Bootstrap finished.",);
}
}

Expand Down
8 changes: 3 additions & 5 deletions crates/subspace-node/src/import_blocks_from_dsn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use sp_core::traits::SpawnEssentialNamed;
use sp_runtime::traits::Block as BlockT;
use std::sync::Arc;
use subspace_networking::libp2p::Multiaddr;
use subspace_networking::{BootstrappedNetworkingParameters, Config, PieceByHashRequestHandler};
use subspace_networking::{Config, PieceByHashRequestHandler, StubNetworkingParametersManager};
use subspace_service::dsn::import_blocks::initial_block_import_from_dsn;

/// The `import-blocks-from-network` command used to import blocks from Subspace Network DSN.
Expand Down Expand Up @@ -61,14 +61,12 @@ impl ImportBlocksFromDsnCmd {
IQ: sc_service::ImportQueue<B> + 'static,
{
let (node, mut node_runner) = subspace_networking::create(Config {
networking_parameters_registry: BootstrappedNetworkingParameters::new(
self.bootstrap_node.clone(),
)
.boxed(),
networking_parameters_registry: StubNetworkingParametersManager.boxed(),
allow_non_global_addresses_in_dht: true,
request_response_protocols: vec![PieceByHashRequestHandler::create(
move |_, _| async { None },
)],
bootstrap_addresses: self.bootstrap_node.clone(),
..Config::default()
})
.map_err(|error| sc_service::Error::Other(error.to_string()))?;
Expand Down
Loading