From a6269d6fddd7848cadd96cd498c32d4a9fddc023 Mon Sep 17 00:00:00 2001 From: Roman Borschel Date: Tue, 9 Mar 2021 18:35:43 +0100 Subject: [PATCH] [kad] Provide a targeted store operation. (#1988) * Add `Kademlia::put_record_to` for storing a record at specific nodes, e.g. for write-back caching after a successful read. In that context, peers that were queried in a successful `Kademlia::get_record` operation but did not return a record are now returned in the `GetRecordOk::no_record` list of peer IDs. Closes https://github.com/libp2p/rust-libp2p/issues/1577. * Update protocols/kad/src/behaviour.rs Co-authored-by: Max Inden * Refine implementation. Rather than returning the peers that are cache candidates in a `Vec` in an arbitrary order, use a `BTreeMap`. Furthermore, make the caching configurable, being enabled by default with `max_peers` of 1. By configuring it with `max_peers` > 1 it is now also possible to control at how many nodes the record is cached automatically after a lookup with quorum 1. When enabled, successful lookups will always return `cache_candidates`, which can be used explicitly with `Kademlia::put_record_to` after lookups with a quorum > 1. * Re-export KademliaCaching * Update protocols/kad/CHANGELOG.md Co-authored-by: Max Inden * Clarify changelog. Co-authored-by: Max Inden --- protocols/kad/CHANGELOG.md | 13 +++ protocols/kad/src/behaviour.rs | 174 +++++++++++++++++++++++----- protocols/kad/src/behaviour/test.rs | 12 +- protocols/kad/src/lib.rs | 9 +- 4 files changed, 173 insertions(+), 35 deletions(-) diff --git a/protocols/kad/CHANGELOG.md b/protocols/kad/CHANGELOG.md index a4a0b05f..fd8ec635 100644 --- a/protocols/kad/CHANGELOG.md +++ b/protocols/kad/CHANGELOG.md @@ -1,5 +1,18 @@ # 0.29.0 [unreleased] +- Add `KademliaCaching` and `KademliaConfig::set_caching` to configure + whether Kademlia should track, in lookups, the closest nodes to a key + that did not return a record, via `GetRecordOk::cache_candidates`. + As before, if a lookup used a quorum of 1, these candidates will + automatically be sent the found record. Otherwise, with a lookup + quorum of > 1, the candidates can be used with `Kademlia::put_record_to` + after selecting one of the return records to cache. As is the current + behaviour, caching is enabled by default with a `max_peers` of 1, i.e. + it only tracks the closest node to the key that did not return a record. + +- Add `Kademlia::put_record_to` for storing a record at specific nodes, + e.g. for write-back caching after a successful read with quorum > 1. + - Update `libp2p-swarm`. # 0.28.1 [2021-02-15] diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index ad267bfa..26557ae8 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -47,8 +47,8 @@ use libp2p_swarm::{ }; use log::{info, debug, warn}; use smallvec::SmallVec; -use std::{borrow::Cow, error, iter, time::Duration}; -use std::collections::{HashSet, VecDeque}; +use std::{borrow::Cow, error, time::Duration}; +use std::collections::{HashSet, VecDeque, BTreeMap}; use std::fmt; use std::num::NonZeroUsize; use std::task::{Context, Poll}; @@ -100,6 +100,9 @@ pub struct Kademlia { /// The currently known addresses of the local node. local_addrs: HashSet, + /// See [`KademliaConfig::caching`]. + caching: KademliaCaching, + /// The record storage. store: TStore, } @@ -143,6 +146,24 @@ pub struct KademliaConfig { provider_publication_interval: Option, connection_idle_timeout: Duration, kbucket_inserts: KademliaBucketInserts, + caching: KademliaCaching, +} + +/// The configuration for Kademlia "write-back" caching after successful +/// lookups via [`Kademlia::get_record`]. +#[derive(Debug, Clone)] +pub enum KademliaCaching { + /// Caching is disabled and the peers closest to records being looked up + /// that do not return a record are not tracked, i.e. + /// [`GetRecordOk::cache_candidates`] is always empty. + Disabled, + /// Up to `max_peers` peers not returning a record that are closest to the key + /// being looked up are tracked and returned in [`GetRecordOk::cache_candidates`]. + /// Furthermore, if [`Kademlia::get_record`] is used with a quorum of 1, the + /// found record is automatically sent to (i.e. cached at) these peers. For lookups with a + /// quorum > 1, the write-back operation must be performed explicitly, if + /// desired and after choosing a record from the results, via [`Kademlia::put_record_to`]. + Enabled { max_peers: u16 }, } impl Default for KademliaConfig { @@ -158,6 +179,7 @@ impl Default for KademliaConfig { provider_record_ttl: Some(Duration::from_secs(24 * 60 * 60)), connection_idle_timeout: Duration::from_secs(10), kbucket_inserts: KademliaBucketInserts::OnConnected, + caching: KademliaCaching::Enabled { max_peers: 1 }, } } } @@ -319,6 +341,17 @@ impl KademliaConfig { self.kbucket_inserts = inserts; self } + + /// Sets the [`KademliaCaching`] strategy to use for successful lookups. + /// + /// The default is [`KademliaCaching::Enabled`] with a `max_peers` of 1. + /// Hence, with default settings and a lookup quorum of 1, a successful lookup + /// will result in the record being cached at the closest node to the key that + /// did not return the record, i.e. the standard Kademlia behaviour. + pub fn set_caching(&mut self, c: KademliaCaching) -> &mut Self { + self.caching = c; + self + } } impl Kademlia @@ -366,7 +399,8 @@ where record_ttl: config.record_ttl, provider_record_ttl: config.provider_record_ttl, connection_idle_timeout: config.connection_idle_timeout, - local_addrs: HashSet::new() + local_addrs: HashSet::new(), + caching: config.caching, } } @@ -589,7 +623,12 @@ where let done = records.len() >= quorum.get(); let target = kbucket::Key::new(key.clone()); - let info = QueryInfo::GetRecord { key: key.clone(), records, quorum, cache_at: None }; + let info = QueryInfo::GetRecord { + key: key.clone(), + records, + quorum, + cache_candidates: BTreeMap::new(), + }; let peers = self.kbuckets.closest_keys(&target); let inner = QueryInner::new(info); let id = self.queries.add_iter_closest(target.clone(), peers, inner); // (*) @@ -602,7 +641,8 @@ where id } - /// Stores a record in the DHT. + /// Stores a record in the DHT, locally as well as at the nodes + /// closest to the key as per the xor distance metric. /// /// Returns `Ok` if a record has been stored locally, providing the /// `QueryId` of the initial query that replicates the record in the DHT. @@ -638,6 +678,54 @@ where Ok(self.queries.add_iter_closest(target.clone(), peers, inner)) } + /// Stores a record at specific peers, without storing it locally. + /// + /// The given [`Quorum`] is understood in the context of the total + /// number of distinct peers given. + /// + /// If the record's expiration is `None`, the configured record TTL is used. + /// + /// > **Note**: This is not a regular Kademlia DHT operation. It may be + /// > used to selectively update or store a record to specific peers + /// > for the purpose of e.g. making sure these peers have the latest + /// > "version" of a record or to "cache" a record at further peers + /// > to increase the lookup success rate on the DHT for other peers. + /// > + /// > In particular, if lookups are performed with a quorum > 1 multiple + /// > possibly different records may be returned and the standard Kademlia + /// > procedure of "caching" (i.e. storing) a found record at the closest + /// > node to the key that _did not_ return it cannot be employed + /// > transparently. In that case, client code can explicitly choose + /// > which record to store at which peers for analogous write-back + /// > caching or for other reasons. + pub fn put_record_to(&mut self, mut record: Record, peers: I, quorum: Quorum) -> QueryId + where + I: ExactSizeIterator + { + let quorum = if peers.len() > 0 { + quorum.eval(NonZeroUsize::new(peers.len()).expect("> 0")) + } else { + // If no peers are given, we just let the query fail immediately + // due to the fact that the quorum must be at least one, instead of + // introducing a new kind of error. + NonZeroUsize::new(1).expect("1 > 0") + }; + record.expires = record.expires.or_else(|| + self.record_ttl.map(|ttl| Instant::now() + ttl)); + let context = PutRecordContext::Custom; + let info = QueryInfo::PutRecord { + context, + record, + quorum, + phase: PutRecordPhase::PutRecord { + success: Vec::new(), + get_closest_peers_stats: QueryStats::empty() + } + }; + let inner = QueryInner::new(info); + self.queries.add_fixed(peers, inner) + } + /// Removes the record with the given key from _local_ storage, /// if the local node is the publisher of the record. /// @@ -1083,10 +1171,10 @@ where } } - QueryInfo::GetRecord { key, records, quorum, cache_at } => { + QueryInfo::GetRecord { key, records, quorum, cache_candidates } => { let results = if records.len() >= quorum.get() { // [not empty] - if let Some(cache_key) = cache_at { - // Cache the record at the closest node to the key that + if quorum.get() == 1 && !cache_candidates.is_empty() { + // Cache the record at the closest node(s) to the key that // did not return the record. let record = records.first().expect("[not empty]").record.clone(); let quorum = NonZeroUsize::new(1).expect("1 > 0"); @@ -1101,9 +1189,9 @@ where } }; let inner = QueryInner::new(info); - self.queries.add_fixed(iter::once(cache_key.into_preimage()), inner); + self.queries.add_fixed(cache_candidates.values().copied(), inner); } - Ok(GetRecordOk { records }) + Ok(GetRecordOk { records, cache_candidates }) } else if records.is_empty() { Err(GetRecordError::NotFound { key, @@ -1153,7 +1241,7 @@ where } }; match context { - PutRecordContext::Publish => + PutRecordContext::Publish | PutRecordContext::Custom => Some(KademliaEvent::QueryResult { id: query_id, stats: get_closest_peers_stats.merge(result.stats), @@ -1252,7 +1340,7 @@ where } }); match context { - PutRecordContext::Publish => + PutRecordContext::Publish | PutRecordContext::Custom => Some(KademliaEvent::QueryResult { id: query_id, stats: result.stats, @@ -1722,7 +1810,7 @@ where } => { if let Some(query) = self.queries.get_mut(&user_data) { if let QueryInfo::GetRecord { - key, records, quorum, cache_at + key, records, quorum, cache_candidates } = &mut query.inner.info { if let Some(record) = record { records.push(PeerRecord{ peer: Some(source), record }); @@ -1744,19 +1832,19 @@ where ); } } - } else if quorum.get() == 1 { - // It is a "standard" Kademlia query, for which the - // closest node to the key that did *not* return the - // value is tracked in order to cache the record on - // that node if the query turns out to be successful. - let source_key = kbucket::Key::from(source); - if let Some(cache_key) = cache_at { - let key = kbucket::Key::new(key.clone()); - if source_key.distance(&key) < cache_key.distance(&key) { - *cache_at = Some(source_key) + } else { + log::trace!("Record with key {:?} not found at {}", key, source); + if let KademliaCaching::Enabled { max_peers } = self.caching { + let source_key = kbucket::Key::from(source); + let target_key = kbucket::Key::from(key.clone()); + let distance = source_key.distance(&target_key); + cache_candidates.insert(distance, source); + if cache_candidates.len() > max_peers as usize { + // TODO: `pop_last()` would be nice once stabilised. + // See https://github.com/rust-lang/rust/issues/62924. + let last = *cache_candidates.keys().next_back().expect("len > 0"); + cache_candidates.remove(&last); } - } else { - *cache_at = Some(source_key) } } } @@ -2063,7 +2151,18 @@ pub type GetRecordResult = Result; /// The successful result of [`Kademlia::get_record`]. #[derive(Debug, Clone)] pub struct GetRecordOk { - pub records: Vec + /// The records found, including the peer that returned them. + pub records: Vec, + /// If caching is enabled, these are the peers closest + /// _to the record key_ (not the local node) that were queried but + /// did not return the record, sorted by distance to the record key + /// from closest to farthest. How many of these are tracked is configured + /// by [`KademliaConfig::set_caching`]. If the lookup used a quorum of + /// 1, these peers will be sent the record as a means of caching. + /// If the lookup used a quorum > 1, you may wish to use these + /// candidates with [`Kademlia::put_record_to`] after selecting + /// one of the returned records. + pub cache_candidates: BTreeMap, } /// The error result of [`Kademlia::get_record`]. @@ -2319,17 +2418,32 @@ impl QueryInner { /// The context of a [`QueryInfo::AddProvider`] query. #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum AddProviderContext { + /// The context is a [`Kademlia::start_providing`] operation. Publish, + /// The context is periodic republishing of provider announcements + /// initiated earlier via [`Kademlia::start_providing`]. Republish, } /// The context of a [`QueryInfo::PutRecord`] query. #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum PutRecordContext { + /// The context is a [`Kademlia::put_record`] operation. Publish, + /// The context is periodic republishing of records stored + /// earlier via [`Kademlia::put_record`]. Republish, + /// The context is periodic replication (i.e. without extending + /// the record TTL) of stored records received earlier from another peer. Replicate, + /// The context is an automatic write-back caching operation of a + /// record found via [`Kademlia::get_record`] at the closest node + /// to the key queried that did not return a record. This only + /// occurs after a lookup quorum of 1 as per standard Kademlia. Cache, + /// The context is a custom store operation targeting specific + /// peers initiated by [`Kademlia::put_record_to`]. + Custom, } /// Information about a running query. @@ -2389,11 +2503,9 @@ pub enum QueryInfo { records: Vec, /// The number of records to look for. quorum: NonZeroUsize, - /// The closest peer to `key` that did not return a record. - /// - /// When a record is found in a standard Kademlia query (quorum == 1), - /// it is cached at this peer as soon as a record is found. - cache_at: Option>, + /// The peers closest to the `key` that were queried but did not return a record, + /// i.e. the peers that are candidates for caching the record. + cache_candidates: BTreeMap, }, } diff --git a/protocols/kad/src/behaviour/test.rs b/protocols/kad/src/behaviour/test.rs index 4c23d026..4275d55c 100644 --- a/protocols/kad/src/behaviour/test.rs +++ b/protocols/kad/src/behaviour/test.rs @@ -643,7 +643,9 @@ fn get_record() { let record = Record::new(random_multihash(), vec![4,5,6]); - swarms[1].store.put(record.clone()).unwrap(); + let expected_cache_candidate = *Swarm::local_peer_id(&swarms[1]); + + swarms[2].store.put(record.clone()).unwrap(); let qid = swarms[0].get_record(&record.key, Quorum::One); block_on( @@ -653,12 +655,16 @@ fn get_record() { match swarm.poll_next_unpin(ctx) { Poll::Ready(Some(KademliaEvent::QueryResult { id, - result: QueryResult::GetRecord(Ok(GetRecordOk { records })), + result: QueryResult::GetRecord(Ok(GetRecordOk { + records, cache_candidates + })), .. })) => { assert_eq!(id, qid); assert_eq!(records.len(), 1); assert_eq!(records.first().unwrap().record, record); + assert_eq!(cache_candidates.len(), 1); + assert_eq!(cache_candidates.values().next(), Some(&expected_cache_candidate)); return Poll::Ready(()); } // Ignore any other event. @@ -699,7 +705,7 @@ fn get_record_many() { match swarm.poll_next_unpin(ctx) { Poll::Ready(Some(KademliaEvent::QueryResult { id, - result: QueryResult::GetRecord(Ok(GetRecordOk { records })), + result: QueryResult::GetRecord(Ok(GetRecordOk { records, .. })), .. })) => { assert_eq!(id, qid); diff --git a/protocols/kad/src/lib.rs b/protocols/kad/src/lib.rs index 1510bc6e..5932dda3 100644 --- a/protocols/kad/src/lib.rs +++ b/protocols/kad/src/lib.rs @@ -39,7 +39,14 @@ mod dht_proto { } pub use addresses::Addresses; -pub use behaviour::{Kademlia, KademliaBucketInserts, KademliaConfig, KademliaEvent, Quorum}; +pub use behaviour::{ + Kademlia, + KademliaBucketInserts, + KademliaConfig, + KademliaCaching, + KademliaEvent, + Quorum +}; pub use behaviour::{ QueryRef, QueryMut,