-
Notifications
You must be signed in to change notification settings - Fork 2.7k
Switch from devp2p to libp2p #268
Changes from 17 commits
a94e7e7
a514ed3
1cfc18e
0e30b5d
536edd6
21ab784
fb17f77
329b7dc
abe756c
1131498
d1d38d7
2277a5f
224f6b1
77092a3
18ee06f
4ad5da7
fdc5fbf
9e06c31
5e74817
293cbdf
95c79fc
e8f743c
61e3e57
533f53d
fc5a342
e5390ab
2316ebb
ce3b819
b9b54b3
dfa92bc
e9757a4
6731f05
06b5498
ab89ef9
1a942a2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
[package] | ||
description = "libp2p implementation of the ethcore network library" | ||
homepage = "http://parity.io" | ||
license = "GPL-3.0" | ||
name = "substrate-network-libp2p" | ||
version = "0.1.0" | ||
authors = ["Parity Technologies <[email protected]>"] | ||
|
||
[dependencies] | ||
bytes = "0.4" | ||
fnv = "1.0" | ||
futures = "0.1" | ||
libp2p = { git = "https://github.com/tomaka/libp2p-rs", rev = "77b1c445807e53b8c5e4e5e2da751222da15b8cc", default-features = false, features = ["libp2p-secio", "libp2p-secio-secp256k1"] } | ||
ethcore-network = { git = "https://github.com/paritytech/parity.git" } | ||
ethkey = { git = "https://github.com/paritytech/parity.git" } | ||
parking_lot = "0.5" | ||
log = "0.3" | ||
rand = "0.5.0" | ||
tokio-core = "0.1" | ||
tokio-io = "0.1" | ||
tokio-timer = "0.2" | ||
varint = { git = "https://github.com/libp2p/rust-libp2p" } | ||
|
||
[dev-dependencies] | ||
ethcore-bytes = { git = "https://github.com/paritytech/parity.git" } | ||
ethcore-io = { git = "https://github.com/paritytech/parity.git" } | ||
ethcore-logger = { git = "https://github.com/paritytech/parity.git" } |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,270 @@ | ||
// Copyright 2018 Parity Technologies (UK) Ltd. | ||
// This file is part of Polkadot. | ||
|
||
// Polkadot is free software: you can redistribute it and/or modify | ||
// it under the terms of the GNU General Public License as published by | ||
// the Free Software Foundation, either version 3 of the License, or | ||
// (at your option) any later version. | ||
|
||
// Polkadot is distributed in the hope that it will be useful, | ||
// but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
// GNU General Public License for more details. | ||
|
||
// You should have received a copy of the GNU General Public License | ||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.? | ||
|
||
use bytes::{Bytes, BytesMut}; | ||
use network::ProtocolId; | ||
use libp2p::core::{Multiaddr, ConnectionUpgrade, Endpoint}; | ||
use network::PacketId; | ||
use std::io::Error as IoError; | ||
use std::vec::IntoIter as VecIntoIter; | ||
use futures::{future, Future, stream, Stream, Sink}; | ||
use futures::sync::mpsc; | ||
use tokio_io::{AsyncRead, AsyncWrite}; | ||
use varint::VarintCodec; | ||
|
||
/// Connection upgrade for a single protocol. | ||
/// | ||
/// Note that "a single protocol" here refers to `par` for example. However each protocol can | ||
/// have multiple different versions for networking purposes. | ||
#[derive(Clone)] | ||
pub struct RegisteredProtocol<T> { | ||
// Id of the protocol for API purposes. | ||
id: ProtocolId, | ||
// Base name of the protocol as advertised on the network. | ||
// Ends with `/` so that we can append a version number behind. | ||
base_name: Bytes, | ||
// List of protocol versions that we support, plus their packet count. Ordered in descending | ||
// order so that the best comes first. | ||
// The packet count is used to filter out invalid messages. | ||
supported_versions: Vec<(u8, u8)>, | ||
// Custom data. | ||
custom_data: T, | ||
} | ||
|
||
/// Output of a `RegisteredProtocol` upgrade. | ||
pub struct RegisteredProtocolOutput<T> { | ||
/// Data passed to `RegisteredProtocol::new`. | ||
pub custom_data: T, | ||
|
||
/// Id of the protocol. | ||
pub protocol_id: ProtocolId, | ||
|
||
/// Version of the protocol that was negotiated. | ||
pub protocol_version: u8, | ||
|
||
/// Channel to sender outgoing messages to. Closing this channel closes the connection. | ||
// TODO: consider assembling packet_id here | ||
pub outgoing: mpsc::UnboundedSender<Bytes>, | ||
|
||
/// Stream where incoming messages are received. The stream ends whenever either side is | ||
/// closed. | ||
pub incoming: Box<Stream<Item = (PacketId, Bytes), Error = IoError>>, | ||
} | ||
|
||
impl<T> RegisteredProtocol<T> { | ||
/// Creates a new `RegisteredProtocol`. The `custom_data` parameter will be passed inside the | ||
/// `RegisteredProtocolOutput`. | ||
pub fn new(custom_data: T, protocol: ProtocolId, versions: &[(u8, u8)]) -> Self { | ||
let mut proto_name = Bytes::from_static(b"/substrate/"); | ||
proto_name.extend_from_slice(&protocol); | ||
proto_name.extend_from_slice(b"/"); | ||
|
||
RegisteredProtocol { | ||
base_name: proto_name, | ||
id: protocol, | ||
supported_versions: { | ||
let mut tmp: Vec<_> = versions.iter().rev().cloned().collect(); | ||
tmp.sort_unstable_by(|a, b| b.1.cmp(&a.1)); | ||
tmp | ||
}, | ||
custom_data: custom_data, | ||
} | ||
} | ||
|
||
/// Returns the ID of the protocol. | ||
pub fn id(&self) -> ProtocolId { | ||
self.id | ||
} | ||
|
||
/// Returns the custom data that was passed to `new`. | ||
pub fn custom_data(&self) -> &T { | ||
&self.custom_data | ||
} | ||
} | ||
|
||
impl<T, C, Maf> ConnectionUpgrade<C, Maf> for RegisteredProtocol<T> | ||
where C: AsyncRead + AsyncWrite + 'static, // TODO: 'static :-/ | ||
Maf: Future<Item = Multiaddr, Error = IoError> + 'static, // TODO: 'static :( | ||
{ | ||
type NamesIter = VecIntoIter<(Bytes, Self::UpgradeIdentifier)>; | ||
type UpgradeIdentifier = u8; // Protocol version | ||
|
||
#[inline] | ||
fn protocol_names(&self) -> Self::NamesIter { | ||
// Report each version as an individual protocol. | ||
self.supported_versions.iter().map(|&(ver, _)| { | ||
let num = ver.to_string(); | ||
let mut name = self.base_name.clone(); | ||
name.extend_from_slice(num.as_bytes()); | ||
(name, ver) | ||
}).collect::<Vec<_>>().into_iter() | ||
} | ||
|
||
type Output = RegisteredProtocolOutput<T>; | ||
type MultiaddrFuture = Maf; | ||
type Future = future::FutureResult<(Self::Output, Self::MultiaddrFuture), IoError>; | ||
|
||
fn upgrade(self, socket: C, protocol_version: Self::UpgradeIdentifier, _: Endpoint, | ||
remote_addr: Maf) -> Self::Future | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. only single indent for run-on lines. |
||
{ | ||
let packet_count = self.supported_versions | ||
.iter() | ||
.find(|&(v, _)| *v == protocol_version) | ||
.expect("negotiated protocol version that wasn't advertised ; programmer error") | ||
.1; | ||
|
||
// This function is called whenever we successfully negotiated a protocol with a | ||
// remote (both if initiated by us or by the remote) | ||
|
||
// This channel is used to send outgoing packets to the custom_data for this open | ||
// substream. | ||
let (msg_tx, msg_rx) = mpsc::unbounded(); | ||
|
||
// Build the sink for outgoing network bytes, and the stream for incoming instructions. | ||
// `stream` implements `Stream<Item = Message>`. | ||
enum Message { | ||
// Received data from the network. | ||
RecvSocket(BytesMut), | ||
// Data to send to the network. | ||
// The packet_id must already be inside the `Bytes`. | ||
SendReq(Bytes), | ||
// The socket has been closed. | ||
Finished, | ||
} | ||
|
||
let (sink, stream) = { | ||
let framed = AsyncRead::framed(socket, VarintCodec::default()); | ||
let msg_rx = msg_rx.map(Message::SendReq) | ||
.chain(stream::once(Ok(Message::Finished))) | ||
.map_err(|()| unreachable!()); | ||
let (sink, stream) = framed.split(); | ||
let stream = stream.map(Message::RecvSocket) | ||
.chain(stream::once(Ok(Message::Finished))); | ||
(sink, msg_rx.select(stream)) | ||
}; | ||
|
||
let incoming = stream::unfold((sink, stream, false), move |(sink, stream, finished)| { | ||
if finished { | ||
return None; | ||
} | ||
|
||
Some(stream | ||
.into_future() | ||
.map_err(|(err, _)| err) | ||
.and_then(move |(message, stream)| { | ||
match message { | ||
Some(Message::RecvSocket(mut data)) => { | ||
// The `data` should be prefixed by the packet ID, therefore an | ||
// empty packet is invalid. | ||
if data.is_empty() { | ||
debug!(target: "sub-libp2p", "ignoring incoming packet \ | ||
because it was empty"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. only single indent for run-on lines. |
||
let f = future::ok((None, (sink, stream, false))); | ||
return future::Either::A(f); | ||
} | ||
|
||
let packet_id = data[0]; | ||
let data = data.split_off(1); | ||
|
||
if packet_id >= packet_count { | ||
debug!(target: "sub-libp2p", "ignoring incoming packet \ | ||
because packet_id {} is too large", packet_id); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. only single indent for run-on lines. |
||
let f = future::ok((None, (sink, stream, false))); | ||
future::Either::A(f) | ||
} else { | ||
let out = Some((packet_id, data.freeze())); | ||
let f = future::ok((out, (sink, stream, false))); | ||
future::Either::A(f) | ||
} | ||
}, | ||
|
||
Some(Message::SendReq(data)) => { | ||
let fut = sink.send(data) | ||
.map(move |sink| (None, (sink, stream, false))); | ||
future::Either::B(fut) | ||
}, | ||
|
||
Some(Message::Finished) | None => { | ||
let f = future::ok((None, (sink, stream, true))); | ||
future::Either::A(f) | ||
}, | ||
} | ||
})) | ||
}).filter_map(|v| v); | ||
|
||
let out = RegisteredProtocolOutput { | ||
custom_data: self.custom_data, | ||
protocol_id: self.id, | ||
protocol_version: protocol_version, | ||
outgoing: msg_tx, | ||
incoming: Box::new(incoming), | ||
}; | ||
|
||
future::ok((out, remote_addr)) | ||
} | ||
} | ||
|
||
// Connection upgrade for all the protocols contained in it. | ||
#[derive(Clone)] | ||
pub struct RegisteredProtocols<T>(pub Vec<RegisteredProtocol<T>>); | ||
|
||
impl<T> RegisteredProtocols<T> { | ||
/// Finds a protocol in the list by its id. | ||
pub fn find_protocol(&self, protocol: ProtocolId) -> Option<&RegisteredProtocol<T>> { | ||
self.0.iter().find(|p| p.id == protocol) | ||
} | ||
|
||
/// Returns true if the given protocol is in the list. | ||
pub fn has_protocol(&self, protocol: ProtocolId) -> bool { | ||
self.0.iter().any(|p| p.id == protocol) | ||
} | ||
} | ||
|
||
impl<T> Default for RegisteredProtocols<T> { | ||
fn default() -> Self { | ||
RegisteredProtocols(Vec::new()) | ||
} | ||
} | ||
|
||
impl<T, C, Maf> ConnectionUpgrade<C, Maf> for RegisteredProtocols<T> | ||
where C: AsyncRead + AsyncWrite + 'static, // TODO: 'static :-/ | ||
Maf: Future<Item = Multiaddr, Error = IoError> + 'static, // TODO: 'static :( | ||
{ | ||
type NamesIter = VecIntoIter<(Bytes, Self::UpgradeIdentifier)>; | ||
type UpgradeIdentifier = (usize, <RegisteredProtocol<T> as ConnectionUpgrade<C, Maf>>::UpgradeIdentifier); | ||
|
||
fn protocol_names(&self) -> Self::NamesIter { | ||
// We concat the lists of `RegisteredProtocol::protocol_names` for each protocol. | ||
self.0.iter().enumerate().flat_map(|(n, proto)| { | ||
ConnectionUpgrade::<C, Maf>::protocol_names(proto) | ||
.map(move |(name, id)| (name, (n, id))) | ||
}).collect::<Vec<_>>().into_iter() | ||
} | ||
|
||
type Output = <RegisteredProtocol<T> as ConnectionUpgrade<C, Maf>>::Output; | ||
type MultiaddrFuture = <RegisteredProtocol<T> as ConnectionUpgrade<C, Maf>>::MultiaddrFuture; | ||
type Future = <RegisteredProtocol<T> as ConnectionUpgrade<C, Maf>>::Future; | ||
|
||
#[inline] | ||
fn upgrade(self, socket: C, (protocol_index, inner_proto_id): Self::UpgradeIdentifier, | ||
endpoint: Endpoint, remote_addr: Maf) -> Self::Future | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. only single indent for run-on lines. |
||
{ | ||
self.0.into_iter() | ||
.nth(protocol_index) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. spaces! |
||
.expect("invalid protocol index ; programmer logic error") | ||
.upgrade(socket, inner_proto_id, endpoint, remote_addr) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
// Copyright 2018 Parity Technologies (UK) Ltd. | ||
// This file is part of Polkadot. | ||
|
||
// Polkadot is free software: you can redistribute it and/or modify | ||
// it under the terms of the GNU General Public License as published by | ||
// the Free Software Foundation, either version 3 of the License, or | ||
// (at your option) any later version. | ||
|
||
// Polkadot is distributed in the hope that it will be useful, | ||
// but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
// GNU General Public License for more details. | ||
|
||
// You should have received a copy of the GNU General Public License | ||
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.? | ||
|
||
#![type_length_limit = "268435456"] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. long type! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why doesn't There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've added some |
||
|
||
extern crate parking_lot; | ||
extern crate fnv; | ||
extern crate futures; | ||
extern crate tokio_core; | ||
extern crate tokio_io; | ||
extern crate tokio_timer; | ||
extern crate ethkey; | ||
extern crate ethcore_network as network; | ||
extern crate libp2p; | ||
extern crate rand; | ||
extern crate bytes; | ||
extern crate varint; | ||
|
||
#[macro_use] | ||
extern crate log; | ||
|
||
mod custom_proto; | ||
mod network_state; | ||
mod service; | ||
mod timeouts; | ||
mod transport; | ||
|
||
pub use service::NetworkService; | ||
|
||
/// Check if node url is valid | ||
pub fn validate_node_url(url: &str) -> Result<(), network::Error> { | ||
match url.parse::<libp2p::multiaddr::Multiaddr>() { | ||
Ok(_) => Ok(()), | ||
Err(_) => Err(network::ErrorKind::InvalidNodeId.into()), | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
might be nicer is these were doc comments
///
... then editors would format nicely.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The fields are private, so I'd say no.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any reason you don't think private APIs are deserving of an extra
/
, and therefore better editor support?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@tomaka ^^^
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I generally see
//
as "internal stuff" and///
as "public stuff", but I made the change.