Skip to content

Commit

Permalink
Rewrite pending node handling and add tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
Roman S. Borschel committed May 20, 2019
1 parent 77377c6 commit 765033a
Show file tree
Hide file tree
Showing 6 changed files with 1,144 additions and 947 deletions.
234 changes: 113 additions & 121 deletions protocols/kad/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

use crate::addresses::Addresses;
use crate::handler::{KademliaHandler, KademliaHandlerEvent, KademliaHandlerIn};
use crate::kbucket::{self, KBucketsTable};
use crate::kbucket::{self, KBucketsTable, NodeStatus};
use crate::protocol::{KadConnectionType, KadPeer};
use crate::query::{QueryConfig, QueryState, QueryStatePollOut};
use fnv::{FnvHashMap, FnvHashSet};
Expand Down Expand Up @@ -204,37 +204,47 @@ impl<TSubstream> Kademlia<TSubstream> {
/// Adds a known address for the given `PeerId`. We are connected to this address.
// TODO: report if the address was inserted? also, semantics unclear
pub fn add_connected_address(&mut self, peer_id: &PeerId, address: Multiaddr) {
self.add_address(peer_id, address, true)
self.add_address(peer_id, address, NodeStatus::Connected)
}

/// Adds a known address for the given `PeerId`. We are not connected or don't know whether we
/// are connected to this address.
// TODO: report if the address was inserted? also, semantics unclear
pub fn add_not_connected_address(&mut self, peer_id: &PeerId, address: Multiaddr) {
self.add_address(peer_id, address, false)
self.add_address(peer_id, address, NodeStatus::Disconnected)
}

/// Underlying implementation for `add_connected_address` and `add_not_connected_address`.
fn add_address(&mut self, peer_id: &PeerId, address: Multiaddr, _connected: bool) {
fn add_address(&mut self, peer_id: &PeerId, address: Multiaddr, status: NodeStatus) {
let key = kbucket::Key::new(peer_id.clone());
match self.kbuckets.entry(&key) {
kbucket::Entry::InKbucketConnected(mut entry) => entry.value().insert(address),
kbucket::Entry::InKbucketConnectedPending(mut entry) => entry.value().insert(address),
kbucket::Entry::InKbucketDisconnected(mut entry) => entry.value().insert(address),
kbucket::Entry::InKbucketDisconnectedPending(mut entry) => entry.value().insert(address),
kbucket::Entry::NotInKbucket(entry) => {
kbucket::Entry::Present(mut entry, NodeStatus::Connected) => {
entry.value().insert(address);
}
kbucket::Entry::Present(mut entry, NodeStatus::Disconnected) => {
entry.value().insert(address);
entry.update(status);
},
kbucket::Entry::Pending(mut entry, NodeStatus::Connected) => {
entry.value().insert(address);
}
kbucket::Entry::Pending(mut entry, NodeStatus::Disconnected) => {
entry.value().insert(address);
entry.update(status);
},
kbucket::Entry::Absent(entry) => {
let mut addresses = Addresses::new();
addresses.insert(address);
match entry.insert_disconnected(addresses) {
kbucket::InsertOutcome::Inserted => {
match entry.insert(addresses, status) {
kbucket::InsertResult::Inserted => {
let event = KademliaOut::KBucketAdded {
peer_id: peer_id.clone(),
replaced: None,
};
self.queued_events.push(NetworkBehaviourAction::GenerateEvent(event));
},
kbucket::InsertOutcome::Full => (),
kbucket::InsertOutcome::Pending { to_ping } => {
kbucket::InsertResult::Full => (),
kbucket::InsertResult::Pending { to_ping } => {
self.queued_events.push(NetworkBehaviourAction::DialPeer {
peer_id: to_ping.into_preimage(),
})
Expand Down Expand Up @@ -269,9 +279,10 @@ impl<TSubstream> Kademlia<TSubstream> {
}
}

/// Returns an iterator to all the peer IDs in the bucket, without the pending nodes.
pub fn kbuckets_entries(&self) -> impl Iterator<Item = &PeerId> {
self.kbuckets.entries_not_pending().map(|(key, _)| key.preimage())
/// Returns an iterator over all peer IDs of nodes currently contained in a bucket
/// of the Kademlia routing table.
pub fn kbuckets_entries(&mut self) -> impl Iterator<Item = &PeerId> {
self.kbuckets.iter().map(|entry| entry.node.key.preimage())
}

/// Starts an iterative `FIND_NODE` request.
Expand Down Expand Up @@ -380,17 +391,19 @@ impl<TSubstream> Kademlia<TSubstream> {
}
}

/// TODO
fn find_closest<T: Clone>(&mut self, key: &kbucket::Key<T>, source: &PeerId) -> Vec<KadPeer> {
/// Finds the closest peers to a `target` in the context of a request by
/// the `source` peer, such that the `source` peer is never included in the
/// result.
fn find_closest<T: Clone>(&mut self, target: &kbucket::Key<T>, source: &PeerId) -> Vec<KadPeer> {
self.kbuckets
.closest(key)
.filter(|e| e.key.preimage() != source)
.closest(target)
.filter(|e| e.node.key.preimage() != source)
.take(self.num_results)
.map(KadPeer::from)
.collect()
}

/// TODO
/// Collects all peers who are known to be providers of the value for a given `Multihash`.
fn provider_peers(&mut self, key: &Multihash, source: &PeerId) -> Vec<KadPeer> {
let kbuckets = &mut self.kbuckets;
self.values_providers
Expand All @@ -399,12 +412,61 @@ impl<TSubstream> Kademlia<TSubstream> {
.flat_map(|peers| peers)
.filter_map(move |p|
if p != source {
kbuckets.entry(&kbucket::Key::new(p.clone())).view().map(KadPeer::from)
let key = kbucket::Key::new(p.clone());
kbuckets.entry(&key).view().map(|e| KadPeer::from(e.to_owned()))
} else {
None
})
.collect()
}

/// Update the connection status of a peer in the Kademlia routing table.
fn connection_updated(&mut self, peer: PeerId, address: Option<Multiaddr>, new_status: NodeStatus) {
let key = kbucket::Key::new(peer.clone());
match self.kbuckets.entry(&key) {
kbucket::Entry::Present(mut entry, old_status) => {
if let Some(address) = address {
entry.value().insert(address);
}
if old_status != new_status {
entry.update(new_status);
}
},

kbucket::Entry::Pending(mut entry, old_status) => {
if let Some(address) = address {
entry.value().insert(address);
}
if old_status != new_status {
entry.update(new_status);
}
},

kbucket::Entry::Absent(entry) => if new_status == NodeStatus::Connected {
let mut addresses = Addresses::new();
if let Some(address) = address {
addresses.insert(address);
}
match entry.insert(addresses, new_status) {
kbucket::InsertResult::Inserted => {
let event = KademliaOut::KBucketAdded {
peer_id: peer.clone(),
replaced: None,
};
self.queued_events.push(NetworkBehaviourAction::GenerateEvent(event));
},
kbucket::InsertResult::Full => (),
kbucket::InsertResult::Pending { to_ping } => {
debug_assert!(!self.connected_peers.contains(to_ping.preimage()));
self.queued_events.push(NetworkBehaviourAction::DialPeer {
peer_id: to_ping.into_preimage(),
})
},
}
},
_ => {}
}
}
}

impl<TSubstream> NetworkBehaviour for Kademlia<TSubstream>
Expand All @@ -425,11 +487,13 @@ where
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
// We should order addresses from decreasing likelyhood of connectivity, so start with
// the addresses of that peer in the k-buckets.
let mut out_list = self.kbuckets
.entry(&kbucket::Key::new(peer_id.clone()))
.value_not_pending()
.map(|l| l.iter().cloned().collect::<Vec<_>>())
.unwrap_or_else(Vec::new);
let key = kbucket::Key::new(peer_id.clone());
let mut out_list =
if let kbucket::Entry::Present(mut entry, _) = self.kbuckets.entry(&key) {
entry.value().iter().cloned().collect::<Vec<_>>()
} else {
Vec::new()
};

// We add to that a temporary list of addresses from the ongoing queries.
for query in self.active_queries.values() {
Expand Down Expand Up @@ -457,69 +521,18 @@ where
ConnectedPoint::Listener { .. } => None,
};

let key = kbucket::Key::new(id.clone());

match self.kbuckets.entry(&key) {
kbucket::Entry::InKbucketConnected(_) => {
unreachable!("Kbuckets are always kept in sync with the connection state; QED")
},
kbucket::Entry::InKbucketConnectedPending(_) => {
unreachable!("Kbuckets are always kept in sync with the connection state; QED")
},

kbucket::Entry::InKbucketDisconnected(mut entry) => {
if let Some(address) = address {
entry.value().insert(address);
}
entry.set_connected();
},

kbucket::Entry::InKbucketDisconnectedPending(mut entry) => {
if let Some(address) = address {
entry.value().insert(address);
}
entry.set_connected();
},

kbucket::Entry::NotInKbucket(entry) => {
let mut addresses = Addresses::new();
if let Some(address) = address {
addresses.insert(address);
}
match entry.insert_connected(addresses) {
kbucket::InsertOutcome::Inserted => {
let event = KademliaOut::KBucketAdded {
peer_id: id.clone(),
replaced: None,
};
self.queued_events.push(NetworkBehaviourAction::GenerateEvent(event));
},
kbucket::InsertOutcome::Full => (),
kbucket::InsertOutcome::Pending { to_ping } => {
debug_assert!(!self.connected_peers.contains(to_ping.preimage()));
self.queued_events.push(NetworkBehaviourAction::DialPeer {
peer_id: to_ping.into_preimage(),
})
},
}
},

kbucket::Entry::SelfEntry => {
unreachable!("Guaranteed to never receive disconnected even for self; QED")
},
}

self.connection_updated(id.clone(), address, NodeStatus::Connected);
self.connected_peers.insert(id);
}

fn inject_addr_reach_failure(&mut self, peer_id: Option<&PeerId>, addr: &Multiaddr, _: &dyn error::Error) {
if let Some(peer_id) = peer_id {
let key = kbucket::Key::new(peer_id.clone());

if let Some(list) = self.kbuckets.entry(&key).value() {
if let Some(addrs) = self.kbuckets.entry(&key).value() {
// TODO: don't remove the address if the error is that we are already connected
// to this peer
list.remove(addr);
addrs.remove(addr);
}

for query in self.active_queries.values_mut() {
Expand All @@ -537,40 +550,11 @@ where
}

fn inject_disconnected(&mut self, id: &PeerId, _old_endpoint: ConnectedPoint) {
let was_in = self.connected_peers.remove(id);
debug_assert!(was_in);

for query in self.active_queries.values_mut() {
query.inject_rpc_error(id);
}

match self.kbuckets.entry(&kbucket::Key::new(id.clone())) {
kbucket::Entry::InKbucketConnected(entry) => {
match entry.set_disconnected() {
kbucket::SetDisconnectedOutcome::Kept(_) => {},
kbucket::SetDisconnectedOutcome::Replaced { replacement, .. } => {
let event = KademliaOut::KBucketAdded {
peer_id: replacement.into_preimage(),
replaced: Some(id.clone()),
};
self.queued_events.push(NetworkBehaviourAction::GenerateEvent(event));
},
}
},
kbucket::Entry::InKbucketConnectedPending(entry) => {
entry.set_disconnected();
},
kbucket::Entry::InKbucketDisconnected(_) => {
unreachable!("Kbuckets are always kept in sync with the connection state; QED")
},
kbucket::Entry::InKbucketDisconnectedPending(_) => {
unreachable!("Kbuckets are always kept in sync with the connection state; QED")
},
kbucket::Entry::NotInKbucket(_) => {},
kbucket::Entry::SelfEntry => {
unreachable!("Guaranteed to never receive disconnected even for self; QED")
},
}
self.connection_updated(id.clone(), None, NodeStatus::Disconnected);
self.connected_peers.remove(id);
}

fn inject_replaced(&mut self, peer_id: PeerId, _old: ConnectedPoint, new_endpoint: ConnectedPoint) {
Expand All @@ -584,9 +568,9 @@ where
}
}

if let Some(list) = self.kbuckets.entry(&kbucket::Key::new(peer_id)).value() {
if let Some(addrs) = self.kbuckets.entry(&kbucket::Key::new(peer_id)).value() {
if let ConnectedPoint::Dialer { address } = new_endpoint {
list.insert(address);
addrs.insert(address);
}
}
}
Expand Down Expand Up @@ -692,12 +676,21 @@ where
}

loop {
// Handle events queued by other parts of this struct
// Drain queued events first.
if !self.queued_events.is_empty() {
return Async::Ready(self.queued_events.remove(0));
}
self.queued_events.shrink_to_fit();

// Drain applied pending entries from the DHT.
if let Some(entry) = self.kbuckets.take_applied_pending() {
let event = KademliaOut::KBucketAdded {
peer_id: entry.inserted.into_preimage(),
replaced: entry.evicted.map(|n| n.key.into_preimage())
};
return Async::Ready(NetworkBehaviourAction::GenerateEvent(event))
}

// If iterating finds a query that is finished, stores it here and stops looping.
let mut finished_query = None;

Expand Down Expand Up @@ -831,12 +824,11 @@ pub enum KademliaOut {
impl From<kbucket::EntryView<PeerId, Addresses>> for KadPeer {
fn from(e: kbucket::EntryView<PeerId, Addresses>) -> KadPeer {
KadPeer {
node_id: e.key.into_preimage(),
multiaddrs: e.value.into_vec(),
connection_ty: if e.connected {
KadConnectionType::Connected
} else {
KadConnectionType::NotConnected
node_id: e.node.key.into_preimage(),
multiaddrs: e.node.value.into_vec(),
connection_ty: match e.status {
NodeStatus::Connected => KadConnectionType::Connected,
NodeStatus::Disconnected => KadConnectionType::NotConnected
}
}
}
Expand Down
Loading

0 comments on commit 765033a

Please sign in to comment.