Skip to content

Commit

Permalink
[kad] Provide a targeted store operation. (#1988)
Browse files Browse the repository at this point in the history
* Add `Kademlia::put_record_to` for storing a record at specific nodes,
e.g. for write-back caching after a successful read. In that context,
peers that were queried in a successful `Kademlia::get_record` operation but
did not return a record are now returned in the `GetRecordOk::no_record`
list of peer IDs.

Closes libp2p/rust-libp2p#1577.

* Update protocols/kad/src/behaviour.rs

Co-authored-by: Max Inden <[email protected]>

* Refine implementation.

Rather than returning the peers that are cache candidates
in a `Vec` in an arbitrary order, use a `BTreeMap`. Furthermore,
make the caching configurable, being enabled by default with
`max_peers` of 1. By configuring it with `max_peers` > 1 it is
now also possible to control at how many nodes the record is cached
automatically after a lookup with quorum 1. When enabled,
successful lookups will always return `cache_candidates`, which
can be used explicitly with `Kademlia::put_record_to` after
lookups with a quorum > 1.

* Re-export KademliaCaching

* Update protocols/kad/CHANGELOG.md

Co-authored-by: Max Inden <[email protected]>

* Clarify changelog.

Co-authored-by: Max Inden <[email protected]>
  • Loading branch information
romanb and mxinden authored Mar 9, 2021
1 parent b54106d commit a6269d6
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 35 deletions.
13 changes: 13 additions & 0 deletions protocols/kad/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
# 0.29.0 [unreleased]

- Add `KademliaCaching` and `KademliaConfig::set_caching` to configure
whether Kademlia should track, in lookups, the closest nodes to a key
that did not return a record, via `GetRecordOk::cache_candidates`.
As before, if a lookup used a quorum of 1, these candidates will
automatically be sent the found record. Otherwise, with a lookup
quorum of > 1, the candidates can be used with `Kademlia::put_record_to`
after selecting one of the return records to cache. As is the current
behaviour, caching is enabled by default with a `max_peers` of 1, i.e.
it only tracks the closest node to the key that did not return a record.

- Add `Kademlia::put_record_to` for storing a record at specific nodes,
e.g. for write-back caching after a successful read with quorum > 1.

- Update `libp2p-swarm`.

# 0.28.1 [2021-02-15]
Expand Down
174 changes: 143 additions & 31 deletions protocols/kad/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ use libp2p_swarm::{
};
use log::{info, debug, warn};
use smallvec::SmallVec;
use std::{borrow::Cow, error, iter, time::Duration};
use std::collections::{HashSet, VecDeque};
use std::{borrow::Cow, error, time::Duration};
use std::collections::{HashSet, VecDeque, BTreeMap};
use std::fmt;
use std::num::NonZeroUsize;
use std::task::{Context, Poll};
Expand Down Expand Up @@ -100,6 +100,9 @@ pub struct Kademlia<TStore> {
/// The currently known addresses of the local node.
local_addrs: HashSet<Multiaddr>,

/// See [`KademliaConfig::caching`].
caching: KademliaCaching,

/// The record storage.
store: TStore,
}
Expand Down Expand Up @@ -143,6 +146,24 @@ pub struct KademliaConfig {
provider_publication_interval: Option<Duration>,
connection_idle_timeout: Duration,
kbucket_inserts: KademliaBucketInserts,
caching: KademliaCaching,
}

/// The configuration for Kademlia "write-back" caching after successful
/// lookups via [`Kademlia::get_record`].
#[derive(Debug, Clone)]
pub enum KademliaCaching {
/// Caching is disabled and the peers closest to records being looked up
/// that do not return a record are not tracked, i.e.
/// [`GetRecordOk::cache_candidates`] is always empty.
Disabled,
/// Up to `max_peers` peers not returning a record that are closest to the key
/// being looked up are tracked and returned in [`GetRecordOk::cache_candidates`].
/// Furthermore, if [`Kademlia::get_record`] is used with a quorum of 1, the
/// found record is automatically sent to (i.e. cached at) these peers. For lookups with a
/// quorum > 1, the write-back operation must be performed explicitly, if
/// desired and after choosing a record from the results, via [`Kademlia::put_record_to`].
Enabled { max_peers: u16 },
}

impl Default for KademliaConfig {
Expand All @@ -158,6 +179,7 @@ impl Default for KademliaConfig {
provider_record_ttl: Some(Duration::from_secs(24 * 60 * 60)),
connection_idle_timeout: Duration::from_secs(10),
kbucket_inserts: KademliaBucketInserts::OnConnected,
caching: KademliaCaching::Enabled { max_peers: 1 },
}
}
}
Expand Down Expand Up @@ -319,6 +341,17 @@ impl KademliaConfig {
self.kbucket_inserts = inserts;
self
}

/// Sets the [`KademliaCaching`] strategy to use for successful lookups.
///
/// The default is [`KademliaCaching::Enabled`] with a `max_peers` of 1.
/// Hence, with default settings and a lookup quorum of 1, a successful lookup
/// will result in the record being cached at the closest node to the key that
/// did not return the record, i.e. the standard Kademlia behaviour.
pub fn set_caching(&mut self, c: KademliaCaching) -> &mut Self {
self.caching = c;
self
}
}

impl<TStore> Kademlia<TStore>
Expand Down Expand Up @@ -366,7 +399,8 @@ where
record_ttl: config.record_ttl,
provider_record_ttl: config.provider_record_ttl,
connection_idle_timeout: config.connection_idle_timeout,
local_addrs: HashSet::new()
local_addrs: HashSet::new(),
caching: config.caching,
}
}

Expand Down Expand Up @@ -589,7 +623,12 @@ where

let done = records.len() >= quorum.get();
let target = kbucket::Key::new(key.clone());
let info = QueryInfo::GetRecord { key: key.clone(), records, quorum, cache_at: None };
let info = QueryInfo::GetRecord {
key: key.clone(),
records,
quorum,
cache_candidates: BTreeMap::new(),
};
let peers = self.kbuckets.closest_keys(&target);
let inner = QueryInner::new(info);
let id = self.queries.add_iter_closest(target.clone(), peers, inner); // (*)
Expand All @@ -602,7 +641,8 @@ where
id
}

/// Stores a record in the DHT.
/// Stores a record in the DHT, locally as well as at the nodes
/// closest to the key as per the xor distance metric.
///
/// Returns `Ok` if a record has been stored locally, providing the
/// `QueryId` of the initial query that replicates the record in the DHT.
Expand Down Expand Up @@ -638,6 +678,54 @@ where
Ok(self.queries.add_iter_closest(target.clone(), peers, inner))
}

/// Stores a record at specific peers, without storing it locally.
///
/// The given [`Quorum`] is understood in the context of the total
/// number of distinct peers given.
///
/// If the record's expiration is `None`, the configured record TTL is used.
///
/// > **Note**: This is not a regular Kademlia DHT operation. It may be
/// > used to selectively update or store a record to specific peers
/// > for the purpose of e.g. making sure these peers have the latest
/// > "version" of a record or to "cache" a record at further peers
/// > to increase the lookup success rate on the DHT for other peers.
/// >
/// > In particular, if lookups are performed with a quorum > 1 multiple
/// > possibly different records may be returned and the standard Kademlia
/// > procedure of "caching" (i.e. storing) a found record at the closest
/// > node to the key that _did not_ return it cannot be employed
/// > transparently. In that case, client code can explicitly choose
/// > which record to store at which peers for analogous write-back
/// > caching or for other reasons.
pub fn put_record_to<I>(&mut self, mut record: Record, peers: I, quorum: Quorum) -> QueryId
where
I: ExactSizeIterator<Item = PeerId>
{
let quorum = if peers.len() > 0 {
quorum.eval(NonZeroUsize::new(peers.len()).expect("> 0"))
} else {
// If no peers are given, we just let the query fail immediately
// due to the fact that the quorum must be at least one, instead of
// introducing a new kind of error.
NonZeroUsize::new(1).expect("1 > 0")
};
record.expires = record.expires.or_else(||
self.record_ttl.map(|ttl| Instant::now() + ttl));
let context = PutRecordContext::Custom;
let info = QueryInfo::PutRecord {
context,
record,
quorum,
phase: PutRecordPhase::PutRecord {
success: Vec::new(),
get_closest_peers_stats: QueryStats::empty()
}
};
let inner = QueryInner::new(info);
self.queries.add_fixed(peers, inner)
}

/// Removes the record with the given key from _local_ storage,
/// if the local node is the publisher of the record.
///
Expand Down Expand Up @@ -1083,10 +1171,10 @@ where
}
}

QueryInfo::GetRecord { key, records, quorum, cache_at } => {
QueryInfo::GetRecord { key, records, quorum, cache_candidates } => {
let results = if records.len() >= quorum.get() { // [not empty]
if let Some(cache_key) = cache_at {
// Cache the record at the closest node to the key that
if quorum.get() == 1 && !cache_candidates.is_empty() {
// Cache the record at the closest node(s) to the key that
// did not return the record.
let record = records.first().expect("[not empty]").record.clone();
let quorum = NonZeroUsize::new(1).expect("1 > 0");
Expand All @@ -1101,9 +1189,9 @@ where
}
};
let inner = QueryInner::new(info);
self.queries.add_fixed(iter::once(cache_key.into_preimage()), inner);
self.queries.add_fixed(cache_candidates.values().copied(), inner);
}
Ok(GetRecordOk { records })
Ok(GetRecordOk { records, cache_candidates })
} else if records.is_empty() {
Err(GetRecordError::NotFound {
key,
Expand Down Expand Up @@ -1153,7 +1241,7 @@ where
}
};
match context {
PutRecordContext::Publish =>
PutRecordContext::Publish | PutRecordContext::Custom =>
Some(KademliaEvent::QueryResult {
id: query_id,
stats: get_closest_peers_stats.merge(result.stats),
Expand Down Expand Up @@ -1252,7 +1340,7 @@ where
}
});
match context {
PutRecordContext::Publish =>
PutRecordContext::Publish | PutRecordContext::Custom =>
Some(KademliaEvent::QueryResult {
id: query_id,
stats: result.stats,
Expand Down Expand Up @@ -1722,7 +1810,7 @@ where
} => {
if let Some(query) = self.queries.get_mut(&user_data) {
if let QueryInfo::GetRecord {
key, records, quorum, cache_at
key, records, quorum, cache_candidates
} = &mut query.inner.info {
if let Some(record) = record {
records.push(PeerRecord{ peer: Some(source), record });
Expand All @@ -1744,19 +1832,19 @@ where
);
}
}
} else if quorum.get() == 1 {
// It is a "standard" Kademlia query, for which the
// closest node to the key that did *not* return the
// value is tracked in order to cache the record on
// that node if the query turns out to be successful.
let source_key = kbucket::Key::from(source);
if let Some(cache_key) = cache_at {
let key = kbucket::Key::new(key.clone());
if source_key.distance(&key) < cache_key.distance(&key) {
*cache_at = Some(source_key)
} else {
log::trace!("Record with key {:?} not found at {}", key, source);
if let KademliaCaching::Enabled { max_peers } = self.caching {
let source_key = kbucket::Key::from(source);
let target_key = kbucket::Key::from(key.clone());
let distance = source_key.distance(&target_key);
cache_candidates.insert(distance, source);
if cache_candidates.len() > max_peers as usize {
// TODO: `pop_last()` would be nice once stabilised.
// See https://github.com/rust-lang/rust/issues/62924.
let last = *cache_candidates.keys().next_back().expect("len > 0");
cache_candidates.remove(&last);
}
} else {
*cache_at = Some(source_key)
}
}
}
Expand Down Expand Up @@ -2063,7 +2151,18 @@ pub type GetRecordResult = Result<GetRecordOk, GetRecordError>;
/// The successful result of [`Kademlia::get_record`].
#[derive(Debug, Clone)]
pub struct GetRecordOk {
pub records: Vec<PeerRecord>
/// The records found, including the peer that returned them.
pub records: Vec<PeerRecord>,
/// If caching is enabled, these are the peers closest
/// _to the record key_ (not the local node) that were queried but
/// did not return the record, sorted by distance to the record key
/// from closest to farthest. How many of these are tracked is configured
/// by [`KademliaConfig::set_caching`]. If the lookup used a quorum of
/// 1, these peers will be sent the record as a means of caching.
/// If the lookup used a quorum > 1, you may wish to use these
/// candidates with [`Kademlia::put_record_to`] after selecting
/// one of the returned records.
pub cache_candidates: BTreeMap<kbucket::Distance, PeerId>,
}

/// The error result of [`Kademlia::get_record`].
Expand Down Expand Up @@ -2319,17 +2418,32 @@ impl QueryInner {
/// The context of a [`QueryInfo::AddProvider`] query.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum AddProviderContext {
/// The context is a [`Kademlia::start_providing`] operation.
Publish,
/// The context is periodic republishing of provider announcements
/// initiated earlier via [`Kademlia::start_providing`].
Republish,
}

/// The context of a [`QueryInfo::PutRecord`] query.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum PutRecordContext {
/// The context is a [`Kademlia::put_record`] operation.
Publish,
/// The context is periodic republishing of records stored
/// earlier via [`Kademlia::put_record`].
Republish,
/// The context is periodic replication (i.e. without extending
/// the record TTL) of stored records received earlier from another peer.
Replicate,
/// The context is an automatic write-back caching operation of a
/// record found via [`Kademlia::get_record`] at the closest node
/// to the key queried that did not return a record. This only
/// occurs after a lookup quorum of 1 as per standard Kademlia.
Cache,
/// The context is a custom store operation targeting specific
/// peers initiated by [`Kademlia::put_record_to`].
Custom,
}

/// Information about a running query.
Expand Down Expand Up @@ -2389,11 +2503,9 @@ pub enum QueryInfo {
records: Vec<PeerRecord>,
/// The number of records to look for.
quorum: NonZeroUsize,
/// The closest peer to `key` that did not return a record.
///
/// When a record is found in a standard Kademlia query (quorum == 1),
/// it is cached at this peer as soon as a record is found.
cache_at: Option<kbucket::Key<PeerId>>,
/// The peers closest to the `key` that were queried but did not return a record,
/// i.e. the peers that are candidates for caching the record.
cache_candidates: BTreeMap<kbucket::Distance, PeerId>,
},
}

Expand Down
12 changes: 9 additions & 3 deletions protocols/kad/src/behaviour/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,9 @@ fn get_record() {

let record = Record::new(random_multihash(), vec![4,5,6]);

swarms[1].store.put(record.clone()).unwrap();
let expected_cache_candidate = *Swarm::local_peer_id(&swarms[1]);

swarms[2].store.put(record.clone()).unwrap();
let qid = swarms[0].get_record(&record.key, Quorum::One);

block_on(
Expand All @@ -653,12 +655,16 @@ fn get_record() {
match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(KademliaEvent::QueryResult {
id,
result: QueryResult::GetRecord(Ok(GetRecordOk { records })),
result: QueryResult::GetRecord(Ok(GetRecordOk {
records, cache_candidates
})),
..
})) => {
assert_eq!(id, qid);
assert_eq!(records.len(), 1);
assert_eq!(records.first().unwrap().record, record);
assert_eq!(cache_candidates.len(), 1);
assert_eq!(cache_candidates.values().next(), Some(&expected_cache_candidate));
return Poll::Ready(());
}
// Ignore any other event.
Expand Down Expand Up @@ -699,7 +705,7 @@ fn get_record_many() {
match swarm.poll_next_unpin(ctx) {
Poll::Ready(Some(KademliaEvent::QueryResult {
id,
result: QueryResult::GetRecord(Ok(GetRecordOk { records })),
result: QueryResult::GetRecord(Ok(GetRecordOk { records, .. })),
..
})) => {
assert_eq!(id, qid);
Expand Down
9 changes: 8 additions & 1 deletion protocols/kad/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,14 @@ mod dht_proto {
}

pub use addresses::Addresses;
pub use behaviour::{Kademlia, KademliaBucketInserts, KademliaConfig, KademliaEvent, Quorum};
pub use behaviour::{
Kademlia,
KademliaBucketInserts,
KademliaConfig,
KademliaCaching,
KademliaEvent,
Quorum
};
pub use behaviour::{
QueryRef,
QueryMut,
Expand Down

0 comments on commit a6269d6

Please sign in to comment.