Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change DSN bootstrapping. #1690

Merged
merged 10 commits into from
Jul 26, 2023
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,7 @@ pub(super) fn configure_dsn(
let networking_parameters_registry = {
let known_addresses_db_path = base_path.join("known_addresses_db");

NetworkingParametersManager::new(&known_addresses_db_path, bootstrap_nodes)
.map(|manager| manager.boxed())?
NetworkingParametersManager::new(&known_addresses_db_path).map(|manager| manager.boxed())?
};

let weak_readers_and_pieces = Arc::downgrade(readers_and_pieces);
Expand Down Expand Up @@ -155,7 +154,7 @@ pub(super) fn configure_dsn(
reserved_peers,
listen_on,
allow_non_global_addresses_in_dht: !disable_private_ips,
networking_parameters_registry,
networking_parameters_registry: Some(networking_parameters_registry),
request_response_protocols: vec![
PieceAnnouncementRequestHandler::create({
move |peer_id, req| {
Expand Down Expand Up @@ -305,6 +304,7 @@ pub(super) fn configure_dsn(
special_connected_peers_handler: Arc::new(PeerInfo::is_farmer),
// other (non-farmer) connections
general_connected_peers_handler: Arc::new(|peer_info| !PeerInfo::is_farmer(peer_info)),
bootstrap_addresses: bootstrap_nodes,
..default_config
};

Expand Down
1 change: 1 addition & 0 deletions crates/subspace-networking/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ include = [
]

[dependencies]
async-mutex = "1.4.0"
actix-web = "4.3.1"
anyhow = "1.0.71"
async-trait = "0.1.68"
Expand Down
19 changes: 8 additions & 11 deletions crates/subspace-networking/examples/get-peers-complex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use libp2p::PeerId;
use parking_lot::Mutex;
use std::sync::Arc;
use std::time::Duration;
use subspace_networking::{BootstrappedNetworkingParameters, Config, NetworkingParametersManager};
use subspace_networking::{Config, NetworkingParametersManager};

#[tokio::main]
async fn main() {
Expand All @@ -23,12 +23,9 @@ async fn main() {
let mut nodes = Vec::with_capacity(TOTAL_NODE_COUNT);
for i in 0..TOTAL_NODE_COUNT {
let config = Config {
networking_parameters_registry: BootstrappedNetworkingParameters::new(
bootstrap_nodes.clone(),
)
.boxed(),
listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()],
allow_non_global_addresses_in_dht: true,
bootstrap_addresses: bootstrap_nodes.clone(),
..Config::default()
};
let keypair = config.keypair.clone().try_into_ed25519().unwrap();
Expand Down Expand Up @@ -83,12 +80,12 @@ async fn main() {
let config = Config {
listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()],
allow_non_global_addresses_in_dht: true,
networking_parameters_registry: NetworkingParametersManager::new(
db_path.as_ref(),
bootstrap_nodes,
)
.unwrap()
.boxed(),
networking_parameters_registry: Some(
NetworkingParametersManager::new(db_path.as_ref())
.unwrap()
.boxed(),
),
bootstrap_addresses: bootstrap_nodes,
..Config::default()
};

Expand Down
8 changes: 3 additions & 5 deletions crates/subspace-networking/examples/get-peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use parking_lot::Mutex;
use std::sync::Arc;
use std::time::Duration;
use subspace_core_primitives::{crypto, PieceIndexHash, U256};
use subspace_networking::{BootstrappedNetworkingParameters, Config};
use subspace_networking::Config;

#[tokio::main]
async fn main() {
Expand Down Expand Up @@ -42,13 +42,11 @@ async fn main() {
let node_1_addr = node_1_address_receiver.await.unwrap();
drop(on_new_listener_handler);

let bootstrap_addresses = vec![node_1_addr.with(Protocol::P2p(node_1.id().into()))];
let config_2 = Config {
networking_parameters_registry: BootstrappedNetworkingParameters::new(vec![
node_1_addr.with(Protocol::P2p(node_1.id().into()))
])
.boxed(),
listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()],
allow_non_global_addresses_in_dht: true,
bootstrap_addresses,
..Config::default()
};

Expand Down
8 changes: 3 additions & 5 deletions crates/subspace-networking/examples/networking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use libp2p::multiaddr::Protocol;
use parking_lot::Mutex;
use std::sync::Arc;
use std::time::Duration;
use subspace_networking::{BootstrappedNetworkingParameters, Config};
use subspace_networking::Config;

const TOPIC: &str = "Foo";

Expand Down Expand Up @@ -47,13 +47,11 @@ async fn main() {

let mut subscription = node_1.subscribe(Sha256Topic::new(TOPIC)).await.unwrap();

let bootstrap_addresses = vec![node_1_addr.with(Protocol::P2p(node_1.id().into()))];
let config_2 = Config {
networking_parameters_registry: BootstrappedNetworkingParameters::new(vec![
node_1_addr.with(Protocol::P2p(node_1.id().into()))
])
.boxed(),
listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()],
allow_non_global_addresses_in_dht: true,
bootstrap_addresses,
..Config::default()
};

Expand Down
9 changes: 3 additions & 6 deletions crates/subspace-networking/examples/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ use prometheus_client::registry::Registry;
use std::sync::Arc;
use std::time::Duration;
use subspace_networking::{
start_prometheus_metrics_server, BootstrappedNetworkingParameters, Config, GenericRequest,
GenericRequestHandler,
start_prometheus_metrics_server, Config, GenericRequest, GenericRequestHandler,
};
use tokio::time::sleep;
use tracing::error;
Expand Down Expand Up @@ -86,16 +85,14 @@ async fn main() {
let node_1_addr = node_1_address_receiver.await.unwrap();
drop(on_new_listener_handler);

let bootstrap_addresses = vec![node_1_addr.with(Protocol::P2p(node_1.id().into()))];
let config_2 = Config {
networking_parameters_registry: BootstrappedNetworkingParameters::new(vec![
node_1_addr.with(Protocol::P2p(node_1.id().into()))
])
.boxed(),
listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()],
allow_non_global_addresses_in_dht: true,
request_response_protocols: vec![GenericRequestHandler::<ExampleRequest>::create(
|_, _| async { None },
)],
bootstrap_addresses,
..Config::default()
};

Expand Down
55 changes: 12 additions & 43 deletions crates/subspace-networking/src/behavior/persistent_parameters.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::utils::{convert_multiaddresses, CollectionBatcher, PeerAddress};
use crate::utils::{CollectionBatcher, PeerAddress};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use futures::future::Fuse;
Expand Down Expand Up @@ -47,7 +47,7 @@ pub trait NetworkingParametersRegistry: Send + Sync {
async fn remove_known_peer_addresses(&mut self, peer_id: PeerId, addresses: Vec<Multiaddr>);

/// Unregisters associated addresses for peer ID.
async fn remove_all_known_peer_addresses(&mut self, peer_id: PeerId);
fn remove_all_known_peer_addresses(&mut self, peer_id: PeerId);

/// Returns a batch of the combined collection of known addresses from networking parameters DB
/// and boostrap addresses from networking parameters initialization.
Expand All @@ -70,40 +70,27 @@ impl Clone for Box<dyn NetworkingParametersRegistry> {
}
}

/// Networking manager implementation with bootstrapped addresses. All other operations muted.
/// Networking manager implementation with NOOP implementation.
shamil-gadelshin marked this conversation as resolved.
Show resolved Hide resolved
#[derive(Clone, Default)]
pub struct BootstrappedNetworkingParameters {
bootstrap_addresses: Vec<Multiaddr>,
}

impl BootstrappedNetworkingParameters {
/// Creates a new instance of `BootstrappedNetworkingParameters`.
pub fn new(bootstrap_addresses: Vec<Multiaddr>) -> Self {
Self {
bootstrap_addresses,
}
}
pub(crate) struct StubNetworkingParametersManager;

fn bootstrap_addresses(&self) -> Vec<PeerAddress> {
convert_multiaddresses(self.bootstrap_addresses.clone())
}

/// Returns an instance of `BootstrappedNetworkingParameters` as the `Box` reference.
impl StubNetworkingParametersManager {
/// Returns an instance of `StubNetworkingParametersManager` as the `Box` reference.
pub fn boxed(self) -> Box<dyn NetworkingParametersRegistry> {
Box::new(self)
}
}

#[async_trait]
impl NetworkingParametersRegistry for BootstrappedNetworkingParameters {
impl NetworkingParametersRegistry for StubNetworkingParametersManager {
async fn add_known_peer(&mut self, _: PeerId, _: Vec<Multiaddr>) {}

async fn remove_known_peer_addresses(&mut self, _peer_id: PeerId, _addresses: Vec<Multiaddr>) {}

async fn remove_all_known_peer_addresses(&mut self, _peer_id: PeerId) {}
fn remove_all_known_peer_addresses(&mut self, _peer_id: PeerId) {}

async fn next_known_addresses_batch(&mut self) -> Vec<PeerAddress> {
self.bootstrap_addresses()
Vec::new()
}

async fn run(&mut self) {
Expand Down Expand Up @@ -141,19 +128,14 @@ pub struct NetworkingParametersManager {
column_id: u8,
// Key to persistent parameters
object_id: &'static [u8],
// Bootstrap addresses provided on creation
bootstrap_addresses: Vec<Multiaddr>,
// Provides batching capabilities for the address collection (it stores the last batch index)
collection_batcher: CollectionBatcher<PeerAddress>,
}

impl NetworkingParametersManager {
/// Object constructor. It accepts `NetworkingParametersProvider` implementation as a parameter.
/// On object creation it starts a job for networking parameters cache handling.
pub fn new(
path: &Path,
bootstrap_addresses: Vec<Multiaddr>,
) -> Result<Self, NetworkParametersPersistenceError> {
pub fn new(path: &Path) -> Result<Self, NetworkParametersPersistenceError> {
let mut options = Options::with_columns(path, 1);
// We don't use stats
options.stats = false;
Expand Down Expand Up @@ -184,7 +166,6 @@ impl NetworkingParametersManager {
object_id,
known_peers: cache,
networking_parameters_save_delay: Self::default_delay(),
bootstrap_addresses,
collection_batcher: CollectionBatcher::new(
NonZeroUsize::new(PEERS_ADDRESSES_BATCH_SIZE)
.expect("Manual non-zero initialization failed."),
Expand All @@ -202,12 +183,6 @@ impl NetworkingParametersManager {
.collect()
}

// Returns boostrap addresses from networking parameters initialization.
// It removes p2p-protocol suffix.
fn bootstrap_addresses(&self) -> Vec<PeerAddress> {
convert_multiaddresses(self.bootstrap_addresses.clone())
}

// Helps create a copy of the internal LruCache
fn clone_known_peers(&self) -> LruCache<PeerId, LruCache<Multiaddr, FailureTime>> {
let mut known_peers = LruCache::new(self.known_peers.cap());
Expand Down Expand Up @@ -294,7 +269,7 @@ impl NetworkingParametersRegistry for NetworkingParametersManager {
self.cache_need_saving = true;
}

async fn remove_all_known_peer_addresses(&mut self, peer_id: PeerId) {
fn remove_all_known_peer_addresses(&mut self, peer_id: PeerId) {
trace!(%peer_id, "Remove all peer addresses from the networking parameters registry");

self.known_peers.pop(&peer_id);
Expand All @@ -304,12 +279,7 @@ impl NetworkingParametersRegistry for NetworkingParametersManager {

async fn next_known_addresses_batch(&mut self) -> Vec<PeerAddress> {
// We take cached known addresses and combine them with manually provided bootstrap addresses.
let combined_addresses = self
.known_addresses()
.await
.into_iter()
.chain(self.bootstrap_addresses())
.collect::<Vec<_>>();
let combined_addresses = self.known_addresses().await.into_iter().collect::<Vec<_>>();

trace!(
"Peer addresses batch requested. Total list size: {}",
Expand Down Expand Up @@ -359,7 +329,6 @@ impl NetworkingParametersRegistry for NetworkingParametersManager {
db: self.db.clone(),
column_id: self.column_id,
object_id: self.object_id,
bootstrap_addresses: self.bootstrap_addresses.clone(),
collection_batcher: self.collection_batcher.clone(),
}
.boxed()
Expand Down
23 changes: 18 additions & 5 deletions crates/subspace-networking/src/behavior/tests.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use super::persistent_parameters::remove_known_peer_addresses_internal;
use crate::behavior::provider_storage::{instant_to_micros, micros_to_instant};
use crate::{BootstrappedNetworkingParameters, Config, GenericRequest, GenericRequestHandler};
use crate::{Config, GenericRequest, GenericRequestHandler};
use futures::channel::oneshot;
use futures::future::pending;
use libp2p::multiaddr::Protocol;
use libp2p::{Multiaddr, PeerId};
use lru::LruCache;
Expand Down Expand Up @@ -159,21 +160,33 @@ async fn test_async_handler_works_with_pending_internal_future() {
let node_1_addr = node_1_address_receiver.await.unwrap();
drop(on_new_listener_handler);

let bootstrap_addresses = vec![node_1_addr.with(Protocol::P2p(node_1.id().into()))];
let config_2 = Config {
networking_parameters_registry: BootstrappedNetworkingParameters::new(vec![
node_1_addr.with(Protocol::P2p(node_1.id().into()))
])
.boxed(),
listen_on: vec!["/ip4/0.0.0.0/tcp/0".parse().unwrap()],
allow_non_global_addresses_in_dht: true,
request_response_protocols: vec![GenericRequestHandler::<ExampleRequest>::create(
|_, _| async { None },
)],
bootstrap_addresses,
..Config::default()
};

let (node_2, mut node_runner_2) = crate::create(config_2).unwrap();

let bootstrap_fut = Box::pin({
let node = node_2.clone();

async move {
let _ = node.bootstrap().await;

pending::<()>().await;
}
});

tokio::spawn(async move {
bootstrap_fut.await;
});

tokio::spawn(async move {
node_runner_2.run().await;
});
Expand Down
Loading