Skip to content

Commit

Permalink
Fix style in paritytech#364
Browse files Browse the repository at this point in the history
  • Loading branch information
tomaka committed Jul 18, 2018
1 parent 93a39a9 commit a058a5f
Show file tree
Hide file tree
Showing 3 changed files with 262 additions and 8 deletions.
246 changes: 246 additions & 0 deletions substrate/network-libp2p/src/peer_store.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

//! Implementation of the `Peerstore` trait that uses a single JSON file as backend.
use super::TTL;
use bs58;
use datastore::{Datastore, JsonFileDatastore, JsonFileDatastoreEntry, Query};
use futures::{Future, Stream};
use multiaddr::Multiaddr;
use peer_info::{AddAddrBehaviour, PeerInfo};
use peerstore::{PeerAccess, Peerstore};
use std::io::Error as IoError;
use std::iter;
use std::path::PathBuf;
use std::vec::IntoIter as VecIntoIter;
use PeerId;

/// Peerstore backend that uses a Json file.
pub struct JsonPeerstore {
store: JsonFileDatastore<PeerInfo>,
}

impl JsonPeerstore {
/// Opens a new peerstore tied to a JSON file at the given path.
///
/// If the file exists, this function will open it. In any case, flushing the peerstore or
/// destroying it will write to the file.
#[inline]
pub fn new<P>(path: P) -> Result<JsonPeerstore, IoError>
where
P: Into<PathBuf>,
{
Ok(JsonPeerstore {
store: JsonFileDatastore::new(path)?,
})
}

/// Flushes the content of the peer store to the disk.
///
/// This function can only fail in case of a disk access error. If an error occurs, any change
/// to the peerstore that was performed since the last successful flush will be lost. No data
/// will be corrupted.
#[inline]
pub fn flush(&self) -> Result<(), IoError> {
self.store.flush()
}
}

impl<'a> Peerstore for &'a JsonPeerstore {
type PeerAccess = JsonPeerstoreAccess<'a>;
type PeersIter = Box<Iterator<Item = PeerId>>;

#[inline]
fn peer(self, peer_id: &PeerId) -> Option<Self::PeerAccess> {
let hash = bs58::encode(peer_id.as_bytes()).into_string();
self.store.lock(hash.into()).map(JsonPeerstoreAccess)
}

#[inline]
fn peer_or_create(self, peer_id: &PeerId) -> Self::PeerAccess {
let hash = bs58::encode(peer_id.as_bytes()).into_string();
JsonPeerstoreAccess(self.store.lock_or_create(hash.into()))
}

fn peers(self) -> Self::PeersIter {
let query = self.store.query(Query {
prefix: "".into(),
filters: vec![],
orders: vec![],
skip: 0,
limit: u64::max_value(),
keys_only: true,
});

let list = query
.filter_map(|(key, _)| {
// We filter out invalid elements. This can happen if the JSON storage file was
// corrupted or manually modified by the user.
PeerId::from_bytes(bs58::decode(key).into_vec().ok()?).ok()
})
.collect()
.wait(); // Wait can never block for the JSON datastore.

// Need to handle I/O errors. Again we just ignore.
if let Ok(list) = list {
Box::new(list.into_iter()) as Box<_>
} else {
Box::new(iter::empty()) as Box<_>
}
}
}

pub struct JsonPeerstoreAccess<'a>(JsonFileDatastoreEntry<'a, PeerInfo>);

impl<'a> PeerAccess for JsonPeerstoreAccess<'a> {
type AddrsIter = VecIntoIter<Multiaddr>;

#[inline]
fn addrs(&self) -> Self::AddrsIter {
self.0.addrs().cloned().collect::<Vec<_>>().into_iter()
}

#[inline]
fn add_addr(&mut self, addr: Multiaddr, ttl: TTL) {
self.0
.add_addr(addr, ttl, AddAddrBehaviour::IgnoreTtlIfInferior);
}

#[inline]
fn set_addr_ttl(&mut self, addr: Multiaddr, ttl: TTL) {
self.0.add_addr(addr, ttl, AddAddrBehaviour::OverwriteTtl);
}

#[inline]
fn clear_addrs(&mut self) {
self.0.set_addrs(iter::empty());
}
}

/// Information about a peer.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
struct PeerInfo {
/// Adresses, and the time at which they will be considered expired.
addrs: Vec<(SerdeMultiaddr, SystemTime)>,
}

impl PeerInfo {
/// Builds a new empty `PeerInfo`.
#[inline]
pub fn new() -> PeerInfo {
PeerInfo { addrs: vec![] }
}

/// Returns the list of the non-expired addresses stored in this `PeerInfo`.
///
/// > **Note**: Keep in mind that this function is racy because addresses can expire between
/// > the moment when you get them and the moment when you process them.
// TODO: use -> impl Iterator eventually
#[inline]
pub fn addrs<'a>(&'a self) -> Box<Iterator<Item = &'a Multiaddr> + 'a> {
let now = SystemTime::now();
Box::new(self.addrs.iter().filter_map(
move |&(ref addr, ref expires)| if *expires >= now { Some(&addr.0) } else { None },
))
}

/// Sets the list of addresses and their time-to-live.
///
/// This removes all previously-stored addresses and replaces them with new ones.
#[inline]
pub fn set_addrs<I>(&mut self, addrs: I)
where
I: IntoIterator<Item = (Multiaddr, TTL)>,
{
let now = SystemTime::now();
self.addrs = addrs
.into_iter()
.map(move |(addr, ttl)| (SerdeMultiaddr(addr), now + ttl))
.collect();
}

/// Adds a single address and its time-to-live.
///
/// If the peer info already knows about that address, then what happens depends on the
/// `behaviour` parameter.
pub fn add_addr(&mut self, addr: Multiaddr, ttl: TTL, behaviour: AddAddrBehaviour) {
let expires = SystemTime::now() + ttl;

if let Some(&mut (_, ref mut existing_expires)) =
self.addrs.iter_mut().find(|&&mut (ref a, _)| a.0 == addr)
{
if behaviour == AddAddrBehaviour::OverwriteTtl || *existing_expires < expires {
*existing_expires = expires;
}
return;
}

self.addrs.push((SerdeMultiaddr(addr), expires));
}
}

/// Behaviour of the `add_addr` function.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum AddAddrBehaviour {
/// Always overwrite the existing TTL.
OverwriteTtl,
/// Don't overwrite if the TTL is larger.
IgnoreTtlIfInferior,
}

/// Same as `Multiaddr`, but serializable.
#[derive(Debug, Clone, PartialEq, Eq)]
struct SerdeMultiaddr(Multiaddr);

impl Serialize for SerdeMultiaddr {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
self.0.to_string().serialize(serializer)
}
}

impl<'de> Deserialize<'de> for SerdeMultiaddr {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let addr: String = Deserialize::deserialize(deserializer)?;
let addr = match addr.parse::<Multiaddr>() {
Ok(a) => a,
Err(err) => return Err(DeserializerError::custom(err)),
};
Ok(SerdeMultiaddr(addr))
}
}

// The reason why we need to implement the PartialOrd trait is that the datastore library (a
// key-value storage) which we use allows performing queries where the results can be ordered.
//
// Since the struct that implements PartialOrd is internal and since we never use this ordering
// feature, I think it's ok to have this code.
impl PartialOrd for PeerInfo {
#[inline]
fn partial_cmp(&self, _other: &Self) -> Option<Ordering> {
None
}
}
21 changes: 15 additions & 6 deletions substrate/network-libp2p/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -495,8 +495,13 @@ fn init_thread(

trace!(target: "sub-libp2p", "Dialing bootnode {:?}", peer_id);
for proto in shared.protocols.read().0.clone().into_iter() {
open_peer_custom_proto(shared.clone(), transport.clone(),
proto, peer_id.clone(), &swarm_controller)
open_peer_custom_proto(
shared.clone(),
transport.clone(),
proto,
peer_id.clone(),
&swarm_controller
)
}
}

Expand Down Expand Up @@ -843,8 +848,7 @@ fn start_kademlia_discovery<T, To, St, C>(shared: Arc<Shared>, transport: T,
.for_each({
let shared = shared.clone();
move |_| {
connect_to_nodes(shared.clone(), transport.clone(),
&swarm_controller);
connect_to_nodes(shared.clone(), transport.clone(), &swarm_controller);
Ok(())
}
});
Expand Down Expand Up @@ -940,8 +944,13 @@ fn connect_to_nodes<T, To, St, C>(
// should automatically open multiple substreams.
trace!(target: "sub-libp2p", "Ensuring connection to {:?}", peer);
for proto in shared.protocols.read().0.clone().into_iter() {
open_peer_custom_proto(shared.clone(), base_transport.clone(),
proto, peer.clone(), swarm_controller)
open_peer_custom_proto(
shared.clone(),
base_transport.clone(),
proto,
peer.clone(),
swarm_controller
)
}
}
}
Expand Down
3 changes: 1 addition & 2 deletions substrate/network-libp2p/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,7 @@ pub fn build_transport(

TransportTimeout::new(base, Duration::from_secs(20))
.map_err(|err| {
debug!(target: "sub-libp2p", "Error in base transport \
layer: {:?}", err);
debug!(target: "sub-libp2p", "Error in base transport layer: {:?}", err);
err
})
}
Expand Down

0 comments on commit a058a5f

Please sign in to comment.