Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Backport networking changes 2018-09-07 edition
Browse files Browse the repository at this point in the history
  • Loading branch information
tomaka committed Sep 7, 2018
1 parent 8756e64 commit f8ac2bb
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 17 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

25 changes: 15 additions & 10 deletions substrate/network-libp2p/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use std::sync::Arc;
use std::sync::mpsc as sync_mpsc;
use std::thread;
use std::time::{Duration, Instant};
use futures::{future, Future, stream, Stream};
use futures::{future, Future, stream, Stream, select_all};
use futures::sync::{mpsc, oneshot};
use tokio::runtime::current_thread;
use tokio_io::{AsyncRead, AsyncWrite};
Expand Down Expand Up @@ -546,19 +546,24 @@ fn init_thread(
// Start the process of pinging the active nodes on the network.
let periodic = start_periodic_updates(shared.clone(), transport, swarm_controller);

// Merge all the futures into one!
Ok(swarm_events.for_each(|_| Ok(()))
.select(discovery).map_err(|(err, _)| err).and_then(|(_, rest)| rest)
.select(periodic).map_err(|(err, _)| err).and_then(|(_, rest)| rest)
.select(outgoing_connections).map_err(|(err, _)| err).and_then(|(_, rest)| rest)
.select(timeouts).map_err(|(err, _)| err).and_then(|(_, rest)| rest)
.select(close_rx.then(|_| Ok(()))).map(|_| ()).map_err(|(err, _)| err)

let futures: Vec<Box<Future<Item = (), Error = IoError>>> = vec![
Box::new(swarm_events.for_each(|_| Ok(()))),
Box::new(discovery),
Box::new(periodic),
Box::new(outgoing_connections),
Box::new(timeouts),
Box::new(close_rx.map_err(|err| IoError::new(IoErrorKind::Other, err))),
];

Ok(
select_all(futures)
.and_then(move |_| {
debug!(target: "sub-libp2p", "Networking ended ; disconnecting all peers");
shared.network_state.disconnect_all();
Ok(())
}))
})
.map_err(|(r, _, _)| r)
)
}

/// Output of the common transport layer.
Expand Down
8 changes: 4 additions & 4 deletions substrate/network-libp2p/src/topology.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use parking_lot::Mutex;
use libp2p::{Multiaddr, PeerId};
use serde_json;
use std::{cmp, fs};
use std::io::{Read, Cursor, Error as IoError, ErrorKind as IoErrorKind, Write};
use std::io::{Read, Cursor, Error as IoError, ErrorKind as IoErrorKind, Write, BufReader, BufWriter};
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant, SystemTime};

Expand All @@ -46,7 +46,7 @@ const FIRST_CONNECT_FAIL_BACKOFF: Duration = Duration::from_secs(2);
/// Every time we fail to connect to an address, multiply the backoff by this constant.
const FAIL_BACKOFF_MULTIPLIER: u32 = 2;
/// We need a maximum value for the backoff, overwise we risk an overflow.
const MAX_BACKOFF: Duration = Duration::from_secs(60);
const MAX_BACKOFF: Duration = Duration::from_secs(30 * 60);

// TODO: should be merged with the Kademlia k-buckets

Expand Down Expand Up @@ -101,7 +101,7 @@ impl NetTopology {
};

let file = fs::File::create(path)?;
serialize(file, &self.store)
serialize(BufWriter::with_capacity(1024 * 1024, file), &self.store)
}

/// Perform a cleanup pass, removing all obsolete addresses and peers.
Expand Down Expand Up @@ -513,7 +513,7 @@ fn try_load(path: impl AsRef<Path>) -> FnvHashMap<PeerId, PeerInfo> {

let mut first_byte = [0];
let num_read = match file.read(&mut first_byte) {
Ok(f) => f,
Ok(f) => BufReader::with_capacity(1024 * 1024, f),
Err(err) => {
// TODO: DRY
warn!(target: "sub-libp2p", "Failed to read peer storage file: {:?}", err);
Expand Down

0 comments on commit f8ac2bb

Please sign in to comment.