Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Switch from devp2p to libp2p #268

Merged
merged 35 commits into from
Jul 15, 2018
Merged
Show file tree
Hide file tree
Changes from 29 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
a94e7e7
Switch from devp2p to libp2p
tomaka May 22, 2018
a514ed3
Move the keys in the network state
tomaka Jun 30, 2018
1cfc18e
Properly load, store or generate private key
tomaka Jun 30, 2018
0e30b5d
Some robustness
tomaka Jun 30, 2018
536edd6
Merge remote-tracking branch 'upstream/master' into tka-libp2p
tomaka Jul 2, 2018
21ab784
Update for latest libp2p
tomaka Jul 2, 2018
fb17f77
Allow secio
tomaka Jul 2, 2018
329b7dc
Don't open a new Kademlia connec all the time
tomaka Jul 2, 2018
abe756c
Handle Kademlia disconnection
tomaka Jul 2, 2018
1131498
Set correct permissions on key file
tomaka Jul 2, 2018
d1d38d7
Improvements to secret key storage
tomaka Jul 2, 2018
2277a5f
Flush the peer store at Kademlia requests
tomaka Jul 2, 2018
224f6b1
Use RAII guards for disconnection
tomaka Jul 2, 2018
77092a3
Some misc work
tomaka Jul 3, 2018
18ee06f
Set informations about peers
tomaka Jul 3, 2018
4ad5da7
Fix tests and external URL
tomaka Jul 3, 2018
fdc5fbf
Merge remote-tracking branch 'upstream/master' into tka-libp2p
tomaka Jul 3, 2018
9e06c31
Fix some style
tomaka Jul 3, 2018
5e74817
Split obtain_private_key into multiple function
tomaka Jul 4, 2018
293cbdf
Split start_kademlia_discovery in multiple functions
tomaka Jul 4, 2018
95c79fc
More style fixes
tomaka Jul 4, 2018
e8f743c
More style fixes
tomaka Jul 6, 2018
61e3e57
Merge remote-tracking branch 'upstream/master' into tka-libp2p
tomaka Jul 6, 2018
533f53d
Fix some concerns
tomaka Jul 6, 2018
fc5a342
Turn // into ///
tomaka Jul 7, 2018
e5390ab
More style fixes
tomaka Jul 8, 2018
2316ebb
Merge remote-tracking branch 'upstream/master' into tka-libp2p
tomaka Jul 10, 2018
ce3b819
More style fixes
tomaka Jul 10, 2018
b9b54b3
Merge remote-tracking branch 'upstream/master' into tka-libp2p
tomaka Jul 11, 2018
dfa92bc
Merge remote-tracking branch 'origin/master' into tka-libp2p
gavofyork Jul 12, 2018
e9757a4
Add annotations to unreachable!
tomaka Jul 13, 2018
6731f05
Fix style again
tomaka Jul 13, 2018
06b5498
Remove commented out code
tomaka Jul 13, 2018
ab89ef9
Fix test year
tomaka Jul 13, 2018
1a942a2
More concerns
tomaka Jul 13, 2018
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
895 changes: 752 additions & 143 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ polkadot --chain=local --validator --key Alice -d /tmp/alice
and in the other, run:

```
polkadot --chain=local --validator --key Bob -d /tmp/bob --port 30334 --bootnodes 'enode://ALICE_BOOTNODE_ID_HERE@127.0.0.1:30333'
polkadot --chain=local --validator --key Bob -d /tmp/bob --port 30334 --bootnodes '/ip4/127.0.0.1/tcp/30333/p2p/ALICE_BOOTNODE_ID_HERE'
```

Ensure you replace `ALICE_BOOTNODE_ID_HERE` with the node ID from the output of
Expand Down
27 changes: 27 additions & 0 deletions substrate/network-libp2p/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
[package]
description = "libp2p implementation of the ethcore network library"
homepage = "http://parity.io"
license = "GPL-3.0"
name = "substrate-network-libp2p"
version = "0.1.0"
authors = ["Parity Technologies <[email protected]>"]

[dependencies]
bytes = "0.4"
fnv = "1.0"
futures = "0.1"
libp2p = { git = "https://github.com/tomaka/libp2p-rs", rev = "77b1c445807e53b8c5e4e5e2da751222da15b8cc", default-features = false, features = ["libp2p-secio", "libp2p-secio-secp256k1"] }
ethcore-network = { git = "https://github.com/paritytech/parity.git" }
ethkey = { git = "https://github.com/paritytech/parity.git" }
parking_lot = "0.5"
log = "0.3"
rand = "0.5.0"
tokio-core = "0.1"
tokio-io = "0.1"
tokio-timer = "0.2"
varint = { git = "https://github.com/libp2p/rust-libp2p" }

[dev-dependencies]
ethcore-bytes = { git = "https://github.com/paritytech/parity.git" }
ethcore-io = { git = "https://github.com/paritytech/parity.git" }
ethcore-logger = { git = "https://github.com/paritytech/parity.git" }
276 changes: 276 additions & 0 deletions substrate/network-libp2p/src/custom_proto.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,276 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.?

use bytes::{Bytes, BytesMut};
use network::ProtocolId;
use libp2p::core::{Multiaddr, ConnectionUpgrade, Endpoint};
use network::PacketId;
use std::io::Error as IoError;
use std::vec::IntoIter as VecIntoIter;
use futures::{future, Future, stream, Stream, Sink};
use futures::sync::mpsc;
use tokio_io::{AsyncRead, AsyncWrite};
use varint::VarintCodec;

/// Connection upgrade for a single protocol.
///
/// Note that "a single protocol" here refers to `par` for example. However
/// each protocol can have multiple different versions for networking purposes.
#[derive(Clone)]
pub struct RegisteredProtocol<T> {
/// Id of the protocol for API purposes.
id: ProtocolId,
/// Base name of the protocol as advertised on the network.
/// Ends with `/` so that we can append a version number behind.
base_name: Bytes,
/// List of protocol versions that we support, plus their packet count.
/// Ordered in descending order so that the best comes first.
/// The packet count is used to filter out invalid messages.
supported_versions: Vec<(u8, u8)>,
/// Custom data.
custom_data: T,
}

/// Output of a `RegisteredProtocol` upgrade.
pub struct RegisteredProtocolOutput<T> {
/// Data passed to `RegisteredProtocol::new`.
pub custom_data: T,

/// Id of the protocol.
pub protocol_id: ProtocolId,

/// Version of the protocol that was negotiated.
pub protocol_version: u8,

/// Channel to sender outgoing messages to. Closing this channel closes the
/// connection.
// TODO: consider assembling packet_id here
pub outgoing: mpsc::UnboundedSender<Bytes>,

/// Stream where incoming messages are received. The stream ends whenever
/// either side is closed.
pub incoming: Box<Stream<Item = (PacketId, Bytes), Error = IoError>>,
}

impl<T> RegisteredProtocol<T> {
/// Creates a new `RegisteredProtocol`. The `custom_data` parameter will be
/// passed inside the `RegisteredProtocolOutput`.
pub fn new(custom_data: T, protocol: ProtocolId, versions: &[(u8, u8)])
-> Self {
let mut proto_name = Bytes::from_static(b"/substrate/");
proto_name.extend_from_slice(&protocol);
proto_name.extend_from_slice(b"/");

RegisteredProtocol {
base_name: proto_name,
id: protocol,
supported_versions: {
let mut tmp: Vec<_> = versions.iter().rev().cloned().collect();
tmp.sort_unstable_by(|a, b| b.1.cmp(&a.1));
tmp
},
custom_data: custom_data,
}
}

/// Returns the ID of the protocol.
pub fn id(&self) -> ProtocolId {
self.id
}

/// Returns the custom data that was passed to `new`.
pub fn custom_data(&self) -> &T {
&self.custom_data
}
}

impl<T, C, Maf> ConnectionUpgrade<C, Maf> for RegisteredProtocol<T>
where C: AsyncRead + AsyncWrite + 'static, // TODO: 'static :-/
Maf: Future<Item = Multiaddr, Error = IoError> + 'static, // TODO: 'static :(
{
type NamesIter = VecIntoIter<(Bytes, Self::UpgradeIdentifier)>;
type UpgradeIdentifier = u8; // Protocol version

#[inline]
fn protocol_names(&self) -> Self::NamesIter {
// Report each version as an individual protocol.
self.supported_versions.iter().map(|&(ver, _)| {
let num = ver.to_string();
let mut name = self.base_name.clone();
name.extend_from_slice(num.as_bytes());
(name, ver)
}).collect::<Vec<_>>().into_iter()
}

type Output = RegisteredProtocolOutput<T>;
type MultiaddrFuture = Maf;
type Future = future::FutureResult<(Self::Output, Self::MultiaddrFuture), IoError>;

fn upgrade(self, socket: C, protocol_version: Self::UpgradeIdentifier,
_: Endpoint, remote_addr: Maf) -> Self::Future {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: all-or-nothing

let packet_count = self.supported_versions
.iter()
.find(|&(v, _)| *v == protocol_version)
.expect("negotiated protocol version that wasn't advertised ; \
programmer error")
.1;

// This function is called whenever we successfully negotiated a
// protocol with a remote (both if initiated by us or by the remote)

// This channel is used to send outgoing packets to the custom_data
// for this open substream.
let (msg_tx, msg_rx) = mpsc::unbounded();

// Build the sink for outgoing network bytes, and the stream for
// incoming instructions. `stream` implements `Stream<Item = Message>`.
enum Message {
/// Received data from the network.
RecvSocket(BytesMut),
/// Data to send to the network.
/// The packet_id must already be inside the `Bytes`.
SendReq(Bytes),
/// The socket has been closed.
Finished,
}

let (sink, stream) = {
let framed = AsyncRead::framed(socket, VarintCodec::default());
let msg_rx = msg_rx.map(Message::SendReq)
.chain(stream::once(Ok(Message::Finished)))
.map_err(|()| unreachable!());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assumptions around this kind of thing can change quickly. Usually I would use an explicit panic or unreachable with message to indicate why it's impossible. then the reader can check it while reading.

let (sink, stream) = framed.split();
let stream = stream.map(Message::RecvSocket)
.chain(stream::once(Ok(Message::Finished)));
(sink, msg_rx.select(stream))
};

let incoming = stream::unfold((sink, stream, false), move |(sink, stream, finished)| {
if finished {
return None
}

Some(stream
.into_future()
.map_err(|(err, _)| err)
.and_then(move |(message, stream)|
match message {
Some(Message::RecvSocket(mut data)) => {
// The `data` should be prefixed by the packet ID,
// therefore an empty packet is invalid.
if data.is_empty() {
debug!(target: "sub-libp2p", "ignoring incoming \
packet because it was empty");
let f = future::ok((None, (sink, stream, false)));
return future::Either::A(f)
}

let packet_id = data[0];
let data = data.split_off(1);

if packet_id >= packet_count {
debug!(target: "sub-libp2p", "ignoring incoming packet \
because packet_id {} is too large", packet_id);
let f = future::ok((None, (sink, stream, false)));
future::Either::A(f)
} else {
let out = Some((packet_id, data.freeze()));
let f = future::ok((out, (sink, stream, false)));
future::Either::A(f)
}
},

Some(Message::SendReq(data)) => {
let fut = sink.send(data)
.map(move |sink| (None, (sink, stream, false)));
future::Either::B(fut)
},

Some(Message::Finished) | None => {
let f = future::ok((None, (sink, stream, true)));
future::Either::A(f)
},
}
))
}).filter_map(|v| v);

let out = RegisteredProtocolOutput {
custom_data: self.custom_data,
protocol_id: self.id,
protocol_version: protocol_version,
outgoing: msg_tx,
incoming: Box::new(incoming),
};

future::ok((out, remote_addr))
}
}

// Connection upgrade for all the protocols contained in it.
#[derive(Clone)]
pub struct RegisteredProtocols<T>(pub Vec<RegisteredProtocol<T>>);

impl<T> RegisteredProtocols<T> {
/// Finds a protocol in the list by its id.
pub fn find_protocol(&self, protocol: ProtocolId)
-> Option<&RegisteredProtocol<T>> {
self.0.iter().find(|p| p.id == protocol)
}

/// Returns true if the given protocol is in the list.
pub fn has_protocol(&self, protocol: ProtocolId) -> bool {
self.0.iter().any(|p| p.id == protocol)
}
}

impl<T> Default for RegisteredProtocols<T> {
fn default() -> Self {
RegisteredProtocols(Vec::new())
}
}

impl<T, C, Maf> ConnectionUpgrade<C, Maf> for RegisteredProtocols<T>
where C: AsyncRead + AsyncWrite + 'static, // TODO: 'static :-/
Maf: Future<Item = Multiaddr, Error = IoError> + 'static, // TODO: 'static :(
{
type NamesIter = VecIntoIter<(Bytes, Self::UpgradeIdentifier)>;
type UpgradeIdentifier = (usize,
<RegisteredProtocol<T> as ConnectionUpgrade<C, Maf>>::UpgradeIdentifier);

fn protocol_names(&self) -> Self::NamesIter {
// We concat the lists of `RegisteredProtocol::protocol_names` for
// each protocol.
self.0.iter().enumerate().flat_map(|(n, proto)|
ConnectionUpgrade::<C, Maf>::protocol_names(proto)
.map(move |(name, id)| (name, (n, id)))
).collect::<Vec<_>>().into_iter()
}

type Output = <RegisteredProtocol<T> as ConnectionUpgrade<C, Maf>>::Output;
type MultiaddrFuture = <RegisteredProtocol<T> as
ConnectionUpgrade<C, Maf>>::MultiaddrFuture;
type Future = <RegisteredProtocol<T> as ConnectionUpgrade<C, Maf>>::Future;

#[inline]
fn upgrade(self, socket: C, upgrade_identifier: Self::UpgradeIdentifier,
endpoint: Endpoint, remote_addr: Maf) -> Self::Future {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: all-or-nothing

let (protocol_index, inner_proto_id) = upgrade_identifier;
self.0.into_iter()
.nth(protocol_index)
.expect("invalid protocol index ; programmer logic error")
.upgrade(socket, inner_proto_id, endpoint, remote_addr)
}
}
49 changes: 49 additions & 0 deletions substrate/network-libp2p/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
// This file is part of Polkadot.

// Polkadot is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Polkadot is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.?

#![type_length_limit = "268435456"]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

long type!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why doesn't impl Trait just assign unique numbers or something so we don't have to deal with this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added some Boxes after I realized that the compile time was starting to be really long, so the limit might not need to be that high. However I don't see a reason to lower it either.


extern crate parking_lot;
extern crate fnv;
extern crate futures;
extern crate tokio_core;
extern crate tokio_io;
extern crate tokio_timer;
extern crate ethkey;
extern crate ethcore_network as network;
extern crate libp2p;
extern crate rand;
extern crate bytes;
extern crate varint;

#[macro_use]
extern crate log;

mod custom_proto;
mod network_state;
mod service;
mod timeouts;
mod transport;

pub use service::NetworkService;

/// Check if node url is valid
pub fn validate_node_url(url: &str) -> Result<(), network::Error> {
match url.parse::<libp2p::multiaddr::Multiaddr>() {
Ok(_) => Ok(()),
Err(_) => Err(network::ErrorKind::InvalidNodeId.into()),
}
}
Loading