Kademlia: Refactor iterative queries. #1154

Merged (2 commits) on Jun 20, 2019
protocols/kad/src/behaviour.rs: 129 changes (51 additions, 78 deletions)
@@ -22,7 +22,7 @@ use crate::addresses::Addresses;
use crate::handler::{KademliaHandler, KademliaHandlerEvent, KademliaHandlerIn};
use crate::kbucket::{self, KBucketsTable, NodeStatus};
use crate::protocol::{KadConnectionType, KadPeer};
use crate::query::{QueryConfig, QueryState, QueryStatePollOut};
use crate::query::{QueryConfig, Query, QueryState};
use crate::write::WriteState;
use crate::record::{MemoryRecordStorage, RecordStore, Record, RecordStorageError};
use fnv::{FnvHashMap, FnvHashSet};
@@ -47,7 +47,7 @@ pub struct Kademlia<TSubstream, TRecordStorage: RecordStore = MemoryRecordStorag

/// All the iterative queries we are currently performing, with their ID. The last parameter
/// is the list of accumulated providers for `GET_PROVIDERS` queries.
active_queries: FnvHashMap<QueryId, QueryState<QueryInfo, PeerId>>,
active_queries: FnvHashMap<QueryId, Query<QueryInfo, PeerId>>,

/// All the `PUT_VALUE` actions we are currently performing
active_writes: FnvHashMap<QueryId, WriteState<PeerId, Multihash>>,
@@ -65,7 +65,6 @@ pub struct Kademlia<TSubstream, TRecordStorage: RecordStore = MemoryRecordStorag
/// List of values and peers that are providing them.
///
/// Our local peer ID can be in this container.
// TODO: Note that in reality the value is a SHA-256 of the actual value (https://github.com/libp2p/rust-libp2p/issues/694)
values_providers: FnvHashMap<Multihash, SmallVec<[PeerId; 20]>>,

/// List of values that we are providing ourselves. Must be kept in sync with
@@ -75,16 +74,8 @@ pub struct Kademlia<TSubstream, TRecordStorage: RecordStore = MemoryRecordStorag
/// Interval to send `ADD_PROVIDER` messages to everyone.
refresh_add_providers: stream::Fuse<Interval>,

/// `α` in the Kademlia reference papers. Designates the maximum number of queries that we
/// perform in parallel.
parallelism: usize,

/// The number of results to return from a query. Defaults to the maximum number
/// of entries in a single k-bucket, i.e. the `k` parameter.
num_results: usize,

/// Timeout for a single RPC.
rpc_timeout: Duration,
/// The configuration for iterative queries.
query_config: QueryConfig,

/// Queued events to return when the behaviour is being polled.
queued_events: SmallVec<[NetworkBehaviourAction<KademliaHandlerIn<QueryId>, KademliaOut>; 32]>,
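
Note (not part of the diff): the three removed fields are folded into a single `QueryConfig`. Its exact definition lives in protocols/kad/src/query.rs and is not shown here, but judging from the field accesses later in this file and the hard-coded values this change removes (parallelism 3, `kbucket::MAX_NODES_PER_BUCKET` results, an 8 second RPC timeout), it presumably looks roughly like the following sketch:

    // Illustrative sketch only; field names inferred from their uses in behaviour.rs.
    use std::time::Duration;

    #[derive(Debug, Clone)]
    pub struct QueryConfig {
        /// `α` from the Kademlia paper: maximum number of RPCs in flight per query.
        pub parallelism: usize,
        /// Number of results to collect; defaults to the size of a k-bucket (`k`).
        pub num_results: usize,
        /// Timeout applied to each individual RPC.
        pub rpc_timeout: Duration,
    }

    impl Default for QueryConfig {
        fn default() -> Self {
            QueryConfig {
                parallelism: 3,
                num_results: 20, // kbucket::MAX_NODES_PER_BUCKET
                rpc_timeout: Duration::from_secs(8),
            }
        }
    }
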
@@ -215,8 +206,7 @@ impl<TSubstream, TRecordStorage> Kademlia<TSubstream, TRecordStorage>
where
TRecordStorage: RecordStore
{
/// Creates a `Kademlia`.
#[inline]
/// Creates a new `Kademlia` network behaviour with the given local `PeerId`.
pub fn new(local_peer_id: PeerId) -> Self
where
TRecordStorage: Default
@@ -249,7 +239,6 @@ where
///
/// Contrary to `new`, doesn't perform the initialization queries that store our local ID into
/// the DHT and fill our buckets.
#[inline]
#[deprecated(note="this function is now equivalent to new() and will be removed in the future")]
pub fn without_init(local_peer_id: PeerId) -> Self
where TRecordStorage: Default
@@ -299,7 +288,7 @@ where

/// Inner implementation of the constructors.
fn new_inner(local_peer_id: PeerId, records: TRecordStorage) -> Self {
let parallelism = 3;
let query_config = QueryConfig::default();

Kademlia {
kbuckets: KBucketsTable::new(kbucket::Key::new(local_peer_id), Duration::from_secs(60)), // TODO: constant
@@ -308,14 +297,12 @@ where
active_queries: Default::default(),
active_writes: Default::default(),
connected_peers: Default::default(),
pending_rpcs: SmallVec::with_capacity(parallelism),
pending_rpcs: SmallVec::with_capacity(query_config.parallelism),
next_query_id: QueryId(0),
values_providers: FnvHashMap::default(),
providing_keys: FnvHashSet::default(),
refresh_add_providers: Interval::new_interval(Duration::from_secs(60)).fuse(), // TODO: constant
parallelism,
num_results: kbucket::MAX_NODES_PER_BUCKET,
rpc_timeout: Duration::from_secs(8),
query_config,
add_provider: SmallVec::new(),
marker: PhantomData,
records,
@@ -429,24 +416,13 @@ where
};

let target_key = kbucket::Key::new(target.clone());
let known_closest_peers = self.kbuckets.closest_keys(&target_key);
let query = Query::with_config(self.query_config.clone(), target, known_closest_peers);

let known_closest_peers = self.kbuckets
.closest_keys(&target_key)
.take(self.num_results);

self.active_queries.insert(
query_id,
QueryState::new(QueryConfig {
target,
parallelism: self.parallelism,
num_results: self.num_results,
rpc_timeout: self.rpc_timeout,
known_closest_peers,
})
);
self.active_queries.insert(query_id, query);
}

/// Processes discovered peers from a query.
/// Processes discovered peers from an iterative `Query`.
fn discovered<'a, I>(&'a mut self, query_id: &QueryId, source: &PeerId, peers: I)
where
I: Iterator<Item=&'a KadPeer> + Clone
@@ -469,7 +445,7 @@ where
query.target_mut().untrusted_addresses
.insert(peer.node_id.clone(), peer.multiaddrs.iter().cloned().collect());
}
query.inject_rpc_result(source, others_iter.cloned().map(|kp| kp.node_id))
query.on_success(source, others_iter.cloned().map(|kp| kp.node_id))
}
}
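
Note (not part of the diff): query construction and result feeding now go through a small `Query` API (`with_config`, `on_success`, `on_failure`, plus `next` and `into_result` used further down). The hypothetical trait below merely records that surface as inferred from the call sites in this file; the real type is an inherent impl in protocols/kad/src/query.rs, and the seed iterator it takes actually yields `kbucket::Key<PeerId>` values rather than plain peer IDs.

    // Hypothetical trait capturing the inferred `Query` surface; illustrative only.
    // `QueryConfig` refers to the sketch given in the note above.
    use std::time::Instant;

    pub trait IterativeQuery<TTarget, TPeerId>: Sized {
        /// What a single polling step reports (cf. the `QueryState` match below).
        type Step;
        /// The final outcome, exposing the target and the closest peers found.
        type Outcome;

        /// Starts a query towards `target`, seeded with the known closest peers.
        fn with_config<I>(config: QueryConfig, target: TTarget, known_closest_peers: I) -> Self
        where
            I: IntoIterator<Item = TPeerId>;

        /// Records a successful response from `peer`, merging the peers it reported.
        fn on_success<I>(&mut self, peer: &TPeerId, closer_peers: I)
        where
            I: IntoIterator<Item = TPeerId>;

        /// Records a failed or timed-out request to `peer`.
        fn on_failure(&mut self, peer: &TPeerId);

        /// Advances the query state machine; `now` is used for timeout bookkeeping.
        fn next(&mut self, now: Instant) -> Self::Step;

        /// Consumes the query and yields its result.
        fn into_result(self) -> Self::Outcome;
    }
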

@@ -480,7 +456,7 @@ where
self.kbuckets
.closest(target)
.filter(|e| e.node.key.preimage() != source)
.take(self.num_results)
.take(self.query_config.num_results)
.map(KadPeer::from)
.collect()
}
@@ -628,7 +604,7 @@ where

fn inject_dial_failure(&mut self, peer_id: &PeerId) {
for query in self.active_queries.values_mut() {
query.inject_rpc_error(peer_id);
query.on_failure(peer_id);
}
for write in self.active_writes.values_mut() {
write.inject_write_error(peer_id);
@@ -637,7 +613,7 @@ where

fn inject_disconnected(&mut self, id: &PeerId, _old_endpoint: ConnectedPoint) {
for query in self.active_queries.values_mut() {
query.inject_rpc_error(id);
query.on_failure(id);
}
for write in self.active_writes.values_mut() {
write.inject_write_error(id);
@@ -659,6 +635,7 @@ where

if let Some(addrs) = self.kbuckets.entry(&kbucket::Key::new(peer_id)).value() {
if let ConnectedPoint::Dialer { address } = new_endpoint {
// TODO: Remove the old address, i.e. from `_old`?
addrs.insert(address);
}
}
@@ -715,7 +692,7 @@ where
// It is possible that we obtain a response for a query that has finished, which is
// why we may not find an entry in `self.active_queries`.
if let Some(query) = self.active_queries.get_mut(&user_data) {
query.inject_rpc_error(&source)
query.on_failure(&source)
}

if let Some(write) = self.active_writes.get_mut(&user_data) {
@@ -774,13 +751,12 @@ where
}

if let Some(finished_query) = finished_query {
let (query_info, _) = self
.active_queries
let result = self.active_queries
.remove(&finished_query)
.expect("finished_query was gathered when peeking into active_queries; QED.")
.into_target_and_closest_peers();
.into_result();

match query_info.inner {
match result.target.inner {
QueryInfoInner::GetValue { key: _, results, .. } => {
let result = GetValueResult::Found { results };
let event = KademliaOut::GetValueResult(result);
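
Note (not part of the diff): `into_target_and_closest_peers()` returning a tuple is replaced by `into_result()`. From the `result.target.inner` and `result.closest_peers.collect()` uses in this file, the result type presumably has roughly this shape (illustrative sketch only):

    // Inferred shape; the real definition lives in protocols/kad/src/query.rs.
    pub struct QueryResult<TTarget, TClosest> {
        /// The `QueryInfo` the query was started with, including any data it
        /// accumulated along the way (e.g. GET_VALUE results or provider lists).
        pub target: TTarget,
        /// The closest peers discovered by the query, assumed to be yielded as an
        /// iterator in ascending distance from the target.
        pub closest_peers: TClosest,
    }
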
@@ -831,6 +807,8 @@ where
Self::OutEvent,
>,
> {
let now = Instant::now();

// Flush the changes to the topology that we want to make.
for (key, provider) in self.add_provider.drain() {
// Don't add ourselves to the providers.
@@ -877,33 +855,29 @@ where

'queries_iter: for (&query_id, query) in self.active_queries.iter_mut() {
loop {
match query.poll() {
Async::Ready(QueryStatePollOut::Finished) => {
match query.next(now) {
QueryState::Finished => {
finished_query = Some(query_id);
break 'queries_iter;
}
Async::Ready(QueryStatePollOut::SendRpc {
peer_id,
query_target,
}) => {
let rpc = query_target.to_rpc_request(query_id);
QueryState::Waiting(Some(peer_id)) => {
if self.connected_peers.contains(peer_id) {
let peer_id = peer_id.clone();
let event = query.target().to_rpc_request(query_id);
return Async::Ready(NetworkBehaviourAction::SendEvent {
peer_id: peer_id.clone(),
event: rpc,
peer_id,
event
});
} else if peer_id != self.kbuckets.local_key().preimage() {
self.pending_rpcs.push((peer_id.clone(), rpc));
let peer_id = peer_id.clone();
let event = query.target().to_rpc_request(query_id);
self.pending_rpcs.push((peer_id.clone(), event));
return Async::Ready(NetworkBehaviourAction::DialPeer {
peer_id: peer_id.clone(),
peer_id,
});
}
}
Async::Ready(QueryStatePollOut::CancelRpc { peer_id }) => {
// We don't cancel if the RPC has already been sent out.
self.pending_rpcs.retain(|(id, _)| id != peer_id);
}
Async::NotReady => break,
QueryState::Waiting(None) | QueryState::WaitingAtCapacity => break,
}
}
}
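
Note (not part of the diff): the futures-style `query.poll()` returning `Async<QueryStatePollOut>` is replaced by a plain `query.next(now)` call driven by the single `Instant` captured at the top of `poll`. The old `CancelRpc` output has no counterpart here, which is why the `pending_rpcs` pruning disappears from this loop. Inferred from the match arms above, the returned state presumably resembles:

    // Illustrative sketch; variant meanings inferred from how the loop reacts to them.
    pub enum QueryState<'a, TPeerId> {
        /// The query wants an RPC sent to this peer next (or has nothing to hand
        /// out right now if `None`); replaces the old `QueryStatePollOut::SendRpc`.
        Waiting(Option<&'a TPeerId>),
        /// `parallelism` requests are already in flight; poll again after a response.
        WaitingAtCapacity,
        /// The query has converged; `into_result()` can now be called on it.
        Finished,
    }
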
@@ -932,32 +906,31 @@ where
}

if let Some(finished_query) = finished_query {
let (query_info, closer_peers) = self
.active_queries
let result = self.active_queries
.remove(&finished_query)
.expect("finished_query was gathered when iterating active_queries; QED.")
.into_target_and_closest_peers();
.into_result();

match query_info.inner {
match result.target.inner {
QueryInfoInner::Initialization { .. } => {},
QueryInfoInner::FindPeer(target) => {
let event = KademliaOut::FindNodeResult {
key: target,
closer_peers: closer_peers.collect(),
closer_peers: result.closest_peers.collect(),
};
break Async::Ready(NetworkBehaviourAction::GenerateEvent(event));
},
QueryInfoInner::GetProviders { target, pending_results } => {
let event = KademliaOut::GetProvidersResult {
key: target,
closer_peers: closer_peers.collect(),
closer_peers: result.closest_peers.collect(),
provider_peers: pending_results,
};

break Async::Ready(NetworkBehaviourAction::GenerateEvent(event));
},
QueryInfoInner::AddProvider { target } => {
for closest in closer_peers {
for closest in result.closest_peers {
let event = NetworkBehaviourAction::SendEvent {
peer_id: closest,
event: KademliaHandlerIn::AddProvider {
Expand All @@ -974,25 +947,25 @@ where
},
QueryInfoInner::GetValue { key, results, .. } => {
let result = match results.len() {
0 => GetValueResult::NotFound{
0 => GetValueResult::NotFound {
key,
closest_peers: closer_peers.collect()
closest_peers: result.closest_peers.collect()
},
_ => GetValueResult::Found{ results },
_ => GetValueResult::Found { results },
};

let event = KademliaOut::GetValueResult(result);

break Async::Ready(NetworkBehaviourAction::GenerateEvent(event));
},
QueryInfoInner::PutValue { key, value } => {
let closer_peers = Vec::from_iter(closer_peers);
for peer in &closer_peers {
let closest_peers = Vec::from_iter(result.closest_peers);
for peer in &closest_peers {
let event = KademliaHandlerIn::PutValue {
key: key.clone(),
value: value.clone(),
user_data: finished_query,
};
key: key.clone(),
value: value.clone(),
user_data: finished_query,
};

if self.connected_peers.contains(peer) {
let event = NetworkBehaviourAction::SendEvent {
Expand All @@ -1008,7 +981,7 @@ where
}
}

self.active_writes.insert(finished_query, WriteState::new(key, closer_peers));
self.active_writes.insert(finished_query, WriteState::new(key, closest_peers));
},
}
} else {