Skip to content

Commit

Permalink
Refactor iterative queries.
Browse files Browse the repository at this point in the history
Refactoring of iterative queries (`query.rs`) to improve both
correctness and performance (for larger DHTs):

Correctness:

  1. Queries no longer terminate prematurely due to counting results
     from peers farther from the target while results from closer
     peers are still pending. (#1105).

  2. Queries no longer ignore reported closer peers that are not duplicates
     just because they are currently not among the `num_results` closest.
     The currently `max_results` closest may contain peers marked as failed
     or pending / waiting. Hence all reported closer peers that are not
     duplicates must be considered candidates that may still end up
     among the `num_results` closest that successfully responded.

  3. Bounded parallelism based on the `active_counter` was not working
     correctly, as new (not yet contacted) peers closer to the target
     may be discovered at any time and thus appear in `closer_peers`
     before the already active / pending peers.

  4. The `Frozen` query mechanism allowed all remaining not-yet contacted
     peers to be contacted, but their results were discarded, because
     `inject_rpc_result` would only incorporate results while the
     query is `Iterating`. The `Frozen` state has been reworked into
     a `Stalled` state that implements a slightly more permissive
     variant of the following from the paper / specs: "If a round of
     FIND_NODEs fails to return a node any closer than the closest
     already seen, the initiator resends the FIND_NODE to all of the
     k closest nodes it has not already queried.". Importantly, though
     not explicitly mentioned, the query can move back to `Iterating`
     if it makes further progress again as a result of these requests.
     The `Stalled` state thus allows (temporarily) higher parallelism
     in an effort to make progress and bring the query to an end.

Performance:

  1. Repeated distance calculations between the same peers and the
     target is avoided.

  2. Enabled by #1108, use of a more appropriate data structure (`BTreeMap`) for
     the incrementally updated list of closer peers. The data structure needs
     efficient lookups (to avoid duplicates) and insertions at any position,
     both of which large(r) vectors are not that good at. Unscientific benchmarks
     showed a ~40-60% improvement in somewhat pathological scenarios with at least
     20 healthy nodes, each possibly returning a distinct list of closer 20 peers
     to the requestor. A previous assumption may have been that the vector always
     stays very small, but that is not the case in larger clusters: Even if the
     lists of closer peers reported by the 20 contacted peers are heavily overlapping,
     typically a lot more than 20 peers have to be (at least temporarily) considered
     as closest peers until the query completes. See also issue (2) above.

New tests are added for:

  * Query termination conditions.
  * Bounded parallelism.
  * Absence of duplicates.
  • Loading branch information
Roman S. Borschel committed Jun 13, 2019
1 parent 78d6f44 commit 271b496
Show file tree
Hide file tree
Showing 2 changed files with 679 additions and 507 deletions.
129 changes: 51 additions & 78 deletions protocols/kad/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::addresses::Addresses;
use crate::handler::{KademliaHandler, KademliaHandlerEvent, KademliaHandlerIn};
use crate::kbucket::{self, KBucketsTable, NodeStatus};
use crate::protocol::{KadConnectionType, KadPeer};
use crate::query::{QueryConfig, QueryState, QueryStatePollOut};
use crate::query::{QueryConfig, Query, QueryState};
use crate::write::WriteState;
use crate::record::{MemoryRecordStorage, RecordStore, Record, RecordStorageError};
use fnv::{FnvHashMap, FnvHashSet};
Expand All @@ -47,7 +47,7 @@ pub struct Kademlia<TSubstream, TRecordStorage: RecordStore = MemoryRecordStorag

/// All the iterative queries we are currently performing, with their ID. The last parameter
/// is the list of accumulated providers for `GET_PROVIDERS` queries.
active_queries: FnvHashMap<QueryId, QueryState<QueryInfo, PeerId>>,
active_queries: FnvHashMap<QueryId, Query<QueryInfo, PeerId>>,

/// All the `PUT_VALUE` actions we are currently performing
active_writes: FnvHashMap<QueryId, WriteState<PeerId, Multihash>>,
Expand All @@ -65,7 +65,6 @@ pub struct Kademlia<TSubstream, TRecordStorage: RecordStore = MemoryRecordStorag
/// List of values and peers that are providing them.
///
/// Our local peer ID can be in this container.
// TODO: Note that in reality the value is a SHA-256 of the actual value (https://github.com/libp2p/rust-libp2p/issues/694)
values_providers: FnvHashMap<Multihash, SmallVec<[PeerId; 20]>>,

/// List of values that we are providing ourselves. Must be kept in sync with
Expand All @@ -75,16 +74,8 @@ pub struct Kademlia<TSubstream, TRecordStorage: RecordStore = MemoryRecordStorag
/// Interval to send `ADD_PROVIDER` messages to everyone.
refresh_add_providers: stream::Fuse<Interval>,

/// `α` in the Kademlia reference papers. Designates the maximum number of queries that we
/// perform in parallel.
parallelism: usize,

/// The number of results to return from a query. Defaults to the maximum number
/// of entries in a single k-bucket, i.e. the `k` parameter.
num_results: usize,

/// Timeout for a single RPC.
rpc_timeout: Duration,
/// The configuration for iterative queries.
query_config: QueryConfig,

/// Queued events to return when the behaviour is being polled.
queued_events: SmallVec<[NetworkBehaviourAction<KademliaHandlerIn<QueryId>, KademliaOut>; 32]>,
Expand Down Expand Up @@ -215,8 +206,7 @@ impl<TSubstream, TRecordStorage> Kademlia<TSubstream, TRecordStorage>
where
TRecordStorage: RecordStore
{
/// Creates a `Kademlia`.
#[inline]
/// Creates a new `Kademlia` network behaviour with the given local `PeerId`.
pub fn new(local_peer_id: PeerId) -> Self
where
TRecordStorage: Default
Expand Down Expand Up @@ -249,7 +239,6 @@ where
///
/// Contrary to `new`, doesn't perform the initialization queries that store our local ID into
/// the DHT and fill our buckets.
#[inline]
#[deprecated(note="this function is now equivalent to new() and will be removed in the future")]
pub fn without_init(local_peer_id: PeerId) -> Self
where TRecordStorage: Default
Expand Down Expand Up @@ -299,7 +288,7 @@ where

/// Inner implementation of the constructors.
fn new_inner(local_peer_id: PeerId, records: TRecordStorage) -> Self {
let parallelism = 3;
let query_config = QueryConfig::default();

Kademlia {
kbuckets: KBucketsTable::new(kbucket::Key::new(local_peer_id), Duration::from_secs(60)), // TODO: constant
Expand All @@ -308,14 +297,12 @@ where
active_queries: Default::default(),
active_writes: Default::default(),
connected_peers: Default::default(),
pending_rpcs: SmallVec::with_capacity(parallelism),
pending_rpcs: SmallVec::with_capacity(query_config.parallelism),
next_query_id: QueryId(0),
values_providers: FnvHashMap::default(),
providing_keys: FnvHashSet::default(),
refresh_add_providers: Interval::new_interval(Duration::from_secs(60)).fuse(), // TODO: constant
parallelism,
num_results: kbucket::MAX_NODES_PER_BUCKET,
rpc_timeout: Duration::from_secs(8),
query_config,
add_provider: SmallVec::new(),
marker: PhantomData,
records,
Expand Down Expand Up @@ -429,24 +416,13 @@ where
};

let target_key = kbucket::Key::new(target.clone());
let known_closest_peers = self.kbuckets.closest_keys(&target_key);
let query = Query::with_config(self.query_config.clone(), target, known_closest_peers);

let known_closest_peers = self.kbuckets
.closest_keys(&target_key)
.take(self.num_results);

self.active_queries.insert(
query_id,
QueryState::new(QueryConfig {
target,
parallelism: self.parallelism,
num_results: self.num_results,
rpc_timeout: self.rpc_timeout,
known_closest_peers,
})
);
self.active_queries.insert(query_id, query);
}

/// Processes discovered peers from a query.
/// Processes discovered peers from an iterative `Query`.
fn discovered<'a, I>(&'a mut self, query_id: &QueryId, source: &PeerId, peers: I)
where
I: Iterator<Item=&'a KadPeer> + Clone
Expand All @@ -469,7 +445,7 @@ where
query.target_mut().untrusted_addresses
.insert(peer.node_id.clone(), peer.multiaddrs.iter().cloned().collect());
}
query.inject_rpc_result(source, others_iter.cloned().map(|kp| kp.node_id))
query.on_success(source, others_iter.cloned().map(|kp| kp.node_id))
}
}

Expand All @@ -480,7 +456,7 @@ where
self.kbuckets
.closest(target)
.filter(|e| e.node.key.preimage() != source)
.take(self.num_results)
.take(self.query_config.num_results)
.map(KadPeer::from)
.collect()
}
Expand Down Expand Up @@ -628,7 +604,7 @@ where

fn inject_dial_failure(&mut self, peer_id: &PeerId) {
for query in self.active_queries.values_mut() {
query.inject_rpc_error(peer_id);
query.on_failure(peer_id);
}
for write in self.active_writes.values_mut() {
write.inject_write_error(peer_id);
Expand All @@ -637,7 +613,7 @@ where

fn inject_disconnected(&mut self, id: &PeerId, _old_endpoint: ConnectedPoint) {
for query in self.active_queries.values_mut() {
query.inject_rpc_error(id);
query.on_failure(id);
}
for write in self.active_writes.values_mut() {
write.inject_write_error(id);
Expand All @@ -659,6 +635,7 @@ where

if let Some(addrs) = self.kbuckets.entry(&kbucket::Key::new(peer_id)).value() {
if let ConnectedPoint::Dialer { address } = new_endpoint {
// TODO: Remove the old address, i.e. from `_old`?
addrs.insert(address);
}
}
Expand Down Expand Up @@ -715,7 +692,7 @@ where
// It is possible that we obtain a response for a query that has finished, which is
// why we may not find an entry in `self.active_queries`.
if let Some(query) = self.active_queries.get_mut(&user_data) {
query.inject_rpc_error(&source)
query.on_failure(&source)
}

if let Some(write) = self.active_writes.get_mut(&user_data) {
Expand Down Expand Up @@ -774,13 +751,12 @@ where
}

if let Some(finished_query) = finished_query {
let (query_info, _) = self
.active_queries
let result = self.active_queries
.remove(&finished_query)
.expect("finished_query was gathered when peeking into active_queries; QED.")
.into_target_and_closest_peers();
.into_result();

match query_info.inner {
match result.target.inner {
QueryInfoInner::GetValue { key: _, results, .. } => {
let result = GetValueResult::Found { results };
let event = KademliaOut::GetValueResult(result);
Expand Down Expand Up @@ -831,6 +807,8 @@ where
Self::OutEvent,
>,
> {
let now = Instant::now();

// Flush the changes to the topology that we want to make.
for (key, provider) in self.add_provider.drain() {
// Don't add ourselves to the providers.
Expand Down Expand Up @@ -877,33 +855,29 @@ where

'queries_iter: for (&query_id, query) in self.active_queries.iter_mut() {
loop {
match query.poll() {
Async::Ready(QueryStatePollOut::Finished) => {
match query.next(now) {
QueryState::Finished => {
finished_query = Some(query_id);
break 'queries_iter;
}
Async::Ready(QueryStatePollOut::SendRpc {
peer_id,
query_target,
}) => {
let rpc = query_target.to_rpc_request(query_id);
QueryState::Waiting(Some(peer_id)) => {
if self.connected_peers.contains(peer_id) {
let peer_id = peer_id.clone();
let event = query.target().to_rpc_request(query_id);
return Async::Ready(NetworkBehaviourAction::SendEvent {
peer_id: peer_id.clone(),
event: rpc,
peer_id,
event
});
} else if peer_id != self.kbuckets.local_key().preimage() {
self.pending_rpcs.push((peer_id.clone(), rpc));
let peer_id = peer_id.clone();
let event = query.target().to_rpc_request(query_id);
self.pending_rpcs.push((peer_id.clone(), event));
return Async::Ready(NetworkBehaviourAction::DialPeer {
peer_id: peer_id.clone(),
peer_id,
});
}
}
Async::Ready(QueryStatePollOut::CancelRpc { peer_id }) => {
// We don't cancel if the RPC has already been sent out.
self.pending_rpcs.retain(|(id, _)| id != peer_id);
}
Async::NotReady => break,
QueryState::Waiting(None) | QueryState::WaitingAtCapacity => break,
}
}
}
Expand Down Expand Up @@ -932,32 +906,31 @@ where
}

if let Some(finished_query) = finished_query {
let (query_info, closer_peers) = self
.active_queries
let result = self.active_queries
.remove(&finished_query)
.expect("finished_query was gathered when iterating active_queries; QED.")
.into_target_and_closest_peers();
.into_result();

match query_info.inner {
match result.target.inner {
QueryInfoInner::Initialization { .. } => {},
QueryInfoInner::FindPeer(target) => {
let event = KademliaOut::FindNodeResult {
key: target,
closer_peers: closer_peers.collect(),
closer_peers: result.closest_peers.collect(),
};
break Async::Ready(NetworkBehaviourAction::GenerateEvent(event));
},
QueryInfoInner::GetProviders { target, pending_results } => {
let event = KademliaOut::GetProvidersResult {
key: target,
closer_peers: closer_peers.collect(),
closer_peers: result.closest_peers.collect(),
provider_peers: pending_results,
};

break Async::Ready(NetworkBehaviourAction::GenerateEvent(event));
},
QueryInfoInner::AddProvider { target } => {
for closest in closer_peers {
for closest in result.closest_peers {
let event = NetworkBehaviourAction::SendEvent {
peer_id: closest,
event: KademliaHandlerIn::AddProvider {
Expand All @@ -974,25 +947,25 @@ where
},
QueryInfoInner::GetValue { key, results, .. } => {
let result = match results.len() {
0 => GetValueResult::NotFound{
0 => GetValueResult::NotFound {
key,
closest_peers: closer_peers.collect()
closest_peers: result.closest_peers.collect()
},
_ => GetValueResult::Found{ results },
_ => GetValueResult::Found { results },
};

let event = KademliaOut::GetValueResult(result);

break Async::Ready(NetworkBehaviourAction::GenerateEvent(event));
},
QueryInfoInner::PutValue { key, value } => {
let closer_peers = Vec::from_iter(closer_peers);
for peer in &closer_peers {
let closest_peers = Vec::from_iter(result.closest_peers);
for peer in &closest_peers {
let event = KademliaHandlerIn::PutValue {
key: key.clone(),
value: value.clone(),
user_data: finished_query,
};
key: key.clone(),
value: value.clone(),
user_data: finished_query,
};

if self.connected_peers.contains(peer) {
let event = NetworkBehaviourAction::SendEvent {
Expand All @@ -1008,7 +981,7 @@ where
}
}

self.active_writes.insert(finished_query, WriteState::new(key, closer_peers));
self.active_writes.insert(finished_query, WriteState::new(key, closest_peers));
},
}
} else {
Expand Down
Loading

0 comments on commit 271b496

Please sign in to comment.