Skip to content

Commit

Permalink
Update libp2p to v0.3 (paritytech#1634)
Browse files Browse the repository at this point in the history
* Update libp2p

* Some more diagnostics

* 30 seconds back to 5 seconds

* Bump libp2p-core and improve test

* Fix runtime Cargo.lock

* More work

* Finish upgrade to libp2p 0.3

* Add a maximum of 60 seconds for the rounds

* Remove env_logger

* Update Cargo.lock

* Update Cargo.lock in test-runtime

* Fix test compilation

* Make the test pass

* Add identify addresses to Kademlia

* Don't connect to nodes we're already connected to

* Add warning for non-Substrate nodes

* Fix external address not added

* Start in Enabled mode
  • Loading branch information
tomaka authored and MTDK1 committed Apr 12, 2019
1 parent 48e5529 commit 823de48
Show file tree
Hide file tree
Showing 18 changed files with 599 additions and 411 deletions.
384 changes: 228 additions & 156 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion core/keystore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ edition = "2018"

[dependencies]
substrate-primitives = { path = "../primitives" }
crypto = { package = "parity-crypto", version = "0.2", default-features = false }
crypto = { package = "parity-crypto", version = "0.3", default-features = false }
error-chain = "0.12"
hex = "0.3"
rand = "0.6"
Expand Down
27 changes: 20 additions & 7 deletions core/keystore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::collections::HashMap;
use std::path::PathBuf;
use std::fs::{self, File};
use std::io::{self, Write};
use std::num::NonZeroU32;

use serde_derive::{Serialize, Deserialize};
use error_chain::{error_chain, error_chain_processing, impl_error_chain_processed,
Expand Down Expand Up @@ -60,11 +61,11 @@ struct EncryptedKey {
salt: [u8; 32],
ciphertext: Vec<u8>, // FIXME: switch to fixed-size when serde supports
iv: [u8; 16],
iterations: u32,
iterations: NonZeroU32,
}

impl EncryptedKey {
fn encrypt(plain: &[u8; PKCS_LEN], password: &str, iterations: u32) -> Self {
fn encrypt(plain: &[u8; PKCS_LEN], password: &str, iterations: NonZeroU32) -> Self {
use rand::{Rng, rngs::OsRng};

let mut rng = OsRng::new().expect("OS Randomness available on all supported platforms; qed");
Expand Down Expand Up @@ -149,7 +150,11 @@ impl Store {
/// Generate a new key, placing it into the store.
pub fn generate(&self, password: &str) -> Result<Pair> {
let (pair, pkcs_bytes) = Pair::generate_with_pkcs8();
let key_file = EncryptedKey::encrypt(&pkcs_bytes, password, KEY_ITERATIONS as u32);
let key_file = EncryptedKey::encrypt(
&pkcs_bytes,
password,
NonZeroU32::new(KEY_ITERATIONS as u32).expect("KEY_ITERATIONS is not zero; QED")
);

let mut file = File::create(self.key_file_path(&pair.public()))?;
::serde_json::to_writer(&file, &key_file)?;
Expand Down Expand Up @@ -225,7 +230,7 @@ mod tests {
#[test]
fn encrypt_and_decrypt() {
let plain = [1; PKCS_LEN];
let encrypted_key = EncryptedKey::encrypt(&plain, "thepassword", KEY_ITERATIONS as u32);
let encrypted_key = EncryptedKey::encrypt(&plain, "thepassword", NonZeroU32::new(KEY_ITERATIONS as u32).expect("KEY_ITERATIONS is not zero; QED"));

let decrypted_key = encrypted_key.decrypt("thepassword").unwrap();

Expand All @@ -235,17 +240,25 @@ mod tests {
#[test]
fn decrypt_wrong_password_fails() {
let plain = [1; PKCS_LEN];
let encrypted_key = EncryptedKey::encrypt(&plain, "thepassword", KEY_ITERATIONS as u32);
let encrypted_key = EncryptedKey::encrypt(
&plain,
"thepassword",
NonZeroU32::new(KEY_ITERATIONS as u32).expect("KEY_ITERATIONS is not zero; QED")
);

assert!(encrypted_key.decrypt("thepassword2").is_err());
}

#[test]
fn decrypt_wrong_iterations_fails() {
let plain = [1; PKCS_LEN];
let mut encrypted_key = EncryptedKey::encrypt(&plain, "thepassword", KEY_ITERATIONS as u32);
let mut encrypted_key = EncryptedKey::encrypt(
&plain,
"thepassword",
NonZeroU32::new(KEY_ITERATIONS as u32).expect("KEY_ITERATIONS is not zero; QED")
);

encrypted_key.iterations -= 64;
encrypted_key.iterations = NonZeroU32::new(encrypted_key.iterations.get() - 64).unwrap();

assert!(encrypted_key.decrypt("thepassword").is_err());
}
Expand Down
2 changes: 1 addition & 1 deletion core/network-libp2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ bytes = "0.4"
error-chain = { version = "0.12", default-features = false }
fnv = "1.0"
futures = "0.1"
libp2p = { version = "0.2", default-features = false, features = ["secio-rsa", "secio-secp256k1", "libp2p-websocket"] }
libp2p = { version = "0.3.1", default-features = false, features = ["secio-secp256k1", "libp2p-websocket"] }
parking_lot = "0.7.1"
lazy_static = "1.2"
log = "0.4"
Expand Down
84 changes: 64 additions & 20 deletions core/network-libp2p/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ use crate::{NetworkConfiguration, ProtocolId};
use bytes::Bytes;
use futures::prelude::*;
use libp2p::NetworkBehaviour;
use libp2p::core::{PeerId, ProtocolsHandler};
use libp2p::core::{Multiaddr, PeerId, ProtocolsHandler, PublicKey};
use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction};
use libp2p::core::swarm::{NetworkBehaviourEventProcess, PollParameters};
use libp2p::identify::{Identify, IdentifyEvent, protocol::IdentifyInfo};
use libp2p::kad::{Kademlia, KademliaOut, KademliaTopology};
use libp2p::kad::{Kademlia, KademliaOut, KadConnectionType};
use libp2p::ping::{Ping, PingEvent};
use log::{debug, trace, warn};
use std::{cmp, io, time::Duration, time::Instant};
Expand Down Expand Up @@ -51,17 +51,20 @@ pub struct Behaviour<TSubstream> {

impl<TSubstream> Behaviour<TSubstream> {
/// Builds a new `Behaviour`.
// TODO: redundancy between config and local_peer_id (https://github.com/libp2p/rust-libp2p/issues/745)
pub fn new(config: &NetworkConfiguration, local_peer_id: PeerId, protocols: RegisteredProtocols) -> Self {
// TODO: redundancy between config and local_public_key (https://github.com/libp2p/rust-libp2p/issues/745)
pub fn new(config: &NetworkConfiguration, local_public_key: PublicKey, protocols: RegisteredProtocols) -> Self {
let identify = {
let proto_version = "/substrate/1.0".to_string();
let user_agent = format!("{} ({})", config.client_version, config.node_name);
Identify::new(proto_version, user_agent)
Identify::new(proto_version, user_agent, local_public_key.clone())
};

let local_peer_id = local_public_key.into_peer_id();
let custom_protocols = CustomProtos::new(config, &local_peer_id, protocols);

Behaviour {
ping: Ping::new(),
custom_protocols: CustomProtos::new(config, protocols),
custom_protocols,
discovery: DiscoveryBehaviour::new(local_peer_id),
identify,
events: Vec::new(),
Expand All @@ -79,9 +82,26 @@ impl<TSubstream> Behaviour<TSubstream> {
self.custom_protocols.send_packet(target, protocol_id, data)
}

/// Returns the number of peers in the topology.
pub fn num_topology_peers(&self) -> usize {
self.custom_protocols.num_topology_peers()
}

/// Flushes the topology to the disk.
pub fn flush_topology(&mut self) -> Result<(), io::Error> {
self.custom_protocols.flush_topology()
}

/// Perform a cleanup pass, removing all obsolete addresses and peers.
///
/// This should be done from time to time.
pub fn cleanup(&mut self) {
self.custom_protocols.cleanup();
}

/// Try to add a reserved peer.
pub fn add_reserved_peer(&mut self, peer_id: PeerId) {
self.custom_protocols.add_reserved_peer(peer_id)
pub fn add_reserved_peer(&mut self, peer_id: PeerId, addr: Multiaddr) {
self.custom_protocols.add_reserved_peer(peer_id, addr)
}

/// Try to remove a reserved peer.
Expand Down Expand Up @@ -218,6 +238,20 @@ impl<TSubstream> NetworkBehaviourEventProcess<IdentifyEvent> for Behaviour<TSubs
// TODO: ideally we would delay the first identification to when we open the custom
// protocol, so that we only report id info to the service about the nodes we
// care about (https://github.com/libp2p/rust-libp2p/issues/876)
if !info.protocol_version.contains("substrate") {
warn!(target: "sub-libp2p", "Connected to a non-Substrate node: {:?}", info);
}
if info.listen_addrs.is_empty() {
warn!(target: "sub-libp2p", "Received identify response with empty list of \
addresses");
}
for addr in &info.listen_addrs {
self.discovery.kademlia.add_address(&peer_id, addr.clone());
}
self.custom_protocols.add_discovered_addrs(
&peer_id,
info.listen_addrs.iter().map(|addr| (addr.clone(), true))
);
self.events.push(BehaviourOut::Identified { peer_id, info });
}
IdentifyEvent::Error { .. } => {}
Expand All @@ -227,10 +261,13 @@ impl<TSubstream> NetworkBehaviourEventProcess<IdentifyEvent> for Behaviour<TSubs

impl<TSubstream> NetworkBehaviourEventProcess<KademliaOut> for Behaviour<TSubstream> {
fn inject_event(&mut self, out: KademliaOut) {
// We only ever use Kademlia for discovering nodes, and nodes discovered by Kademlia are
// automatically added to the topology. Therefore we don't need to perform any further
// action.
match out {
KademliaOut::Discovered { peer_id, addresses, ty } => {
self.custom_protocols.add_discovered_addrs(
&peer_id,
addresses.into_iter().map(|addr| (addr, ty == KadConnectionType::Connected))
);
}
KademliaOut::FindNodeResult { key, closer_peers } => {
trace!(target: "sub-libp2p", "Kademlia query for {:?} yielded {:?} results",
key, closer_peers.len());
Expand Down Expand Up @@ -284,37 +321,44 @@ impl<TSubstream> DiscoveryBehaviour<TSubstream> {
}
}

impl<TSubstream, TTopology> NetworkBehaviour<TTopology> for DiscoveryBehaviour<TSubstream>
impl<TSubstream> NetworkBehaviour for DiscoveryBehaviour<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
TTopology: KademliaTopology,
{
type ProtocolsHandler = <Kademlia<TSubstream> as NetworkBehaviour<TTopology>>::ProtocolsHandler;
type OutEvent = <Kademlia<TSubstream> as NetworkBehaviour<TTopology>>::OutEvent;
type ProtocolsHandler = <Kademlia<TSubstream> as NetworkBehaviour>::ProtocolsHandler;
type OutEvent = <Kademlia<TSubstream> as NetworkBehaviour>::OutEvent;

fn new_handler(&mut self) -> Self::ProtocolsHandler {
NetworkBehaviour::<TTopology>::new_handler(&mut self.kademlia)
NetworkBehaviour::new_handler(&mut self.kademlia)
}

fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
self.kademlia.addresses_of_peer(peer_id)
}

fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) {
NetworkBehaviour::<TTopology>::inject_connected(&mut self.kademlia, peer_id, endpoint)
NetworkBehaviour::inject_connected(&mut self.kademlia, peer_id, endpoint)
}

fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint) {
NetworkBehaviour::<TTopology>::inject_disconnected(&mut self.kademlia, peer_id, endpoint)
NetworkBehaviour::inject_disconnected(&mut self.kademlia, peer_id, endpoint)
}

fn inject_replaced(&mut self, peer_id: PeerId, closed: ConnectedPoint, opened: ConnectedPoint) {
NetworkBehaviour::inject_replaced(&mut self.kademlia, peer_id, closed, opened)
}

fn inject_node_event(
&mut self,
peer_id: PeerId,
event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
) {
NetworkBehaviour::<TTopology>::inject_node_event(&mut self.kademlia, peer_id, event)
NetworkBehaviour::inject_node_event(&mut self.kademlia, peer_id, event)
}

fn poll(
&mut self,
params: &mut PollParameters<TTopology>,
params: &mut PollParameters,
) -> Async<
NetworkBehaviourAction<
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
Expand Down
Loading

0 comments on commit 823de48

Please sign in to comment.