Skip to content

Commit

Permalink
protocols/kad: Implement S-Kademlia's lookup over disjoint paths v2 (#…
Browse files Browse the repository at this point in the history
…1473)

The extension paper S-Kademlia includes a proposal for lookups over
disjoint paths. Within vanilla Kademlia, queries keep track of the
closest nodes in a single bucket. Any adversary along the path can thus
influence all future paths, in case they can come up with the
next-closest (not overall closest) hops. S-Kademlia tries to solve the
attack above by querying over disjoint paths using multiple buckets.

To adjust the libp2p Kademlia implementation accordingly this change-set
introduces an additional peers iterator: `ClosestDisjointPeersIter`.
This new iterator wraps around a set of `ClosestPeersIter`
`ClosestDisjointPeersIter` enforces that each of the `ClosestPeersIter`
explore disjoint paths by having each peer instantly return that was
queried by a different iterator before.
  • Loading branch information
mxinden authored Jun 19, 2020
1 parent 00fc223 commit 9dd2d66
Show file tree
Hide file tree
Showing 10 changed files with 1,432 additions and 120 deletions.
7 changes: 4 additions & 3 deletions examples/distributed-key-value-store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,14 @@ use async_std::{io, task};
use futures::prelude::*;
use libp2p::kad::record::store::MemoryStore;
use libp2p::kad::{
record::Key,
Kademlia,
KademliaEvent,
PeerRecord,
PutRecordOk,
QueryResult,
Quorum,
Record
Record,
record::Key,
};
use libp2p::{
NetworkBehaviour,
Expand Down Expand Up @@ -86,7 +87,7 @@ fn main() -> Result<(), Box<dyn Error>> {
match message {
KademliaEvent::QueryResult { result, .. } => match result {
QueryResult::GetRecord(Ok(ok)) => {
for Record { key, value, .. } in ok.records {
for PeerRecord { record: Record { key, value, .. }, ..} in ok.records {
println!(
"Got record {:?} {:?}",
std::str::from_utf8(key.as_ref()).unwrap(),
Expand Down
1 change: 1 addition & 0 deletions protocols/kad/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ unsigned-varint = { version = "0.3", features = ["futures-codec"] }
void = "1.0"

[dev-dependencies]
futures-timer = "3.0"
libp2p-secio = { path = "../secio" }
libp2p-yamux = { path = "../../muxers/yamux" }
quickcheck = "0.9.0"
Expand Down
146 changes: 109 additions & 37 deletions protocols/kad/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,9 @@ impl Default for KademliaConfig {
impl KademliaConfig {
/// Sets a custom protocol name.
///
/// Kademlia nodes only communicate with other nodes using the same protocol name. Using a
/// custom name therefore allows to segregate the DHT from others, if that is desired.
/// Kademlia nodes only communicate with other nodes using the same protocol
/// name. Using a custom name therefore allows to segregate the DHT from
/// others, if that is desired.
pub fn set_protocol_name(&mut self, name: impl Into<Cow<'static, [u8]>>) -> &mut Self {
self.protocol_config.set_protocol_name(name);
self
Expand All @@ -154,10 +155,41 @@ impl KademliaConfig {
self
}

/// Sets the allowed level of parallelism for iterative queries.
///
/// The `α` parameter in the Kademlia paper. The maximum number of peers
/// that an iterative query is allowed to wait for in parallel while
/// iterating towards the closest nodes to a target. Defaults to
/// `ALPHA_VALUE`.
///
/// This only controls the level of parallelism of an iterative query, not
/// the level of parallelism of a query to a fixed set of peers.
///
/// When used with [`KademliaConfig::disjoint_query_paths`] it equals
/// the amount of disjoint paths used.
pub fn set_parallelism(&mut self, parallelism: NonZeroUsize) -> &mut Self {
self.query_config.parallelism = parallelism;
self
}

/// Require iterative queries to use disjoint paths for increased resiliency
/// in the presence of potentially adversarial nodes.
///
/// When enabled the number of disjoint paths used equals the configured
/// parallelism.
///
/// See the S/Kademlia paper for more information on the high level design
/// as well as its security improvements.
pub fn disjoint_query_paths(&mut self, enabled: bool) -> &mut Self {
self.query_config.disjoint_query_paths = enabled;
self
}

/// Sets the TTL for stored records.
///
/// The TTL should be significantly longer than the (re-)publication
/// interval, to avoid premature expiration of records. The default is 36 hours.
/// interval, to avoid premature expiration of records. The default is 36
/// hours.
///
/// `None` means records never expire.
///
Expand Down Expand Up @@ -191,10 +223,10 @@ impl KademliaConfig {

/// Sets the (re-)publication interval of stored records.
///
/// Records persist in the DHT until they expire. By default, published records
/// are re-published in regular intervals for as long as the record exists
/// in the local storage of the original publisher, thereby extending the
/// records lifetime.
/// Records persist in the DHT until they expire. By default, published
/// records are re-published in regular intervals for as long as the record
/// exists in the local storage of the original publisher, thereby extending
/// the records lifetime.
///
/// This interval should be significantly shorter than the record TTL, to
/// ensure records do not expire prematurely. The default is 24 hours.
Expand All @@ -220,7 +252,8 @@ impl KademliaConfig {
/// Sets the interval at which provider records for keys provided
/// by the local node are re-published.
///
/// `None` means that stored provider records are never automatically re-published.
/// `None` means that stored provider records are never automatically
/// re-published.
///
/// Must be significantly less than the provider record TTL.
pub fn set_provider_publication_interval(&mut self, interval: Option<Duration>) -> &mut Self {
Expand All @@ -236,7 +269,8 @@ impl KademliaConfig {

/// Modifies the maximum allowed size of individual Kademlia packets.
///
/// It might be necessary to increase this value if trying to put large records.
/// It might be necessary to increase this value if trying to put large
/// records.
pub fn set_max_packet_size(&mut self, size: usize) -> &mut Self {
self.protocol_config.set_max_packet_size(size);
self
Expand All @@ -247,7 +281,7 @@ impl<TStore> Kademlia<TStore>
where
for<'a> TStore: RecordStore<'a>
{
/// Creates a new `Kademlia` network behaviour with the given configuration.
/// Creates a new `Kademlia` network behaviour with a default configuration.
pub fn new(id: PeerId, store: TStore) -> Self {
Self::with_config(id, store, Default::default())
}
Expand Down Expand Up @@ -430,7 +464,7 @@ where
if record.is_expired(Instant::now()) {
self.store.remove(key)
} else {
records.push(record.into_owned());
records.push(PeerRecord{ peer: None, record: record.into_owned()});
}
}

Expand Down Expand Up @@ -892,15 +926,15 @@ where
if let Some(cache_key) = cache_at {
// Cache the record at the closest node to the key that
// did not return the record.
let record = records.first().expect("[not empty]").clone();
let record = records.first().expect("[not empty]").record.clone();
let quorum = NonZeroUsize::new(1).expect("1 > 0");
let context = PutRecordContext::Cache;
let info = QueryInfo::PutRecord {
context,
record,
quorum,
phase: PutRecordPhase::PutRecord {
num_results: 0,
success: vec![],
get_closest_peers_stats: QueryStats::empty()
}
};
Expand Down Expand Up @@ -934,7 +968,7 @@ where
record,
quorum,
phase: PutRecordPhase::PutRecord {
num_results: 0,
success: vec![],
get_closest_peers_stats: result.stats
}
};
Expand All @@ -947,13 +981,13 @@ where
context,
record,
quorum,
phase: PutRecordPhase::PutRecord { num_results, get_closest_peers_stats }
phase: PutRecordPhase::PutRecord { success, get_closest_peers_stats }
} => {
let mk_result = |key: record::Key| {
if num_results >= quorum.get() {
if success.len() >= quorum.get() {
Ok(PutRecordOk { key })
} else {
Err(PutRecordError::QuorumFailed { key, quorum, num_results })
Err(PutRecordError::QuorumFailed { key, quorum, success })
}
};
match context {
Expand Down Expand Up @@ -1050,9 +1084,9 @@ where
let err = Err(PutRecordError::Timeout {
key: record.key,
quorum,
num_results: match phase {
PutRecordPhase::GetClosestPeers => 0,
PutRecordPhase::PutRecord { num_results, .. } => num_results
success: match phase {
PutRecordPhase::GetClosestPeers => vec![],
PutRecordPhase::PutRecord { ref success, .. } => success.clone(),
}
});
match context {
Expand Down Expand Up @@ -1098,7 +1132,7 @@ where
id: query_id,
stats: result.stats,
result: QueryResult::GetRecord(Err(
GetRecordError::Timeout { key, records, quorum }
GetRecordError::Timeout { key, records, quorum },
))
}),

Expand Down Expand Up @@ -1475,9 +1509,24 @@ where
key, records, quorum, cache_at
} = &mut query.inner.info {
if let Some(record) = record {
records.push(record);
if records.len() >= quorum.get() {
query.finish()
records.push(PeerRecord{ peer: Some(source.clone()), record });

let quorum = quorum.get();
if records.len() >= quorum {
// Desired quorum reached. The query may finish. See
// [`Query::try_finish`] for details.
let peers = records.iter()
.filter_map(|PeerRecord{ peer, .. }| peer.as_ref())
.cloned()
.collect::<Vec<_>>();
let finished = query.try_finish(peers.iter());
if !finished {
debug!(
"GetRecord query ({:?}) reached quorum ({}/{}) with \
response from peer {} but could not yet finish.",
user_data, peers.len(), quorum, source,
);
}
}
} else if quorum.get() == 1 {
// It is a "standard" Kademlia query, for which the
Expand Down Expand Up @@ -1513,11 +1562,21 @@ where
if let Some(query) = self.queries.get_mut(&user_data) {
query.on_success(&source, vec![]);
if let QueryInfo::PutRecord {
phase: PutRecordPhase::PutRecord { num_results, .. }, quorum, ..
phase: PutRecordPhase::PutRecord { success, .. }, quorum, ..
} = &mut query.inner.info {
*num_results += 1;
if *num_results >= quorum.get() {
query.finish()
success.push(source.clone());

let quorum = quorum.get();
if success.len() >= quorum {
let peers = success.clone();
let finished = query.try_finish(peers.iter());
if !finished {
debug!(
"PutRecord query ({:?}) reached quorum ({}/{}) with response \
from peer {} but could not yet finish.",
user_data, peers.len(), quorum, source,
);
}
}
}
}
Expand Down Expand Up @@ -1659,6 +1718,16 @@ impl Quorum {
}
}

/// A record either received by the given peer or retrieved from the local
/// record store.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PeerRecord {
/// The peer from whom the record was received. `None` if the record was
/// retrieved from local storage.
pub peer: Option<PeerId>,
pub record: Record,
}

//////////////////////////////////////////////////////////////////////////////
// Events

Expand Down Expand Up @@ -1742,7 +1811,7 @@ pub type GetRecordResult = Result<GetRecordOk, GetRecordError>;
/// The successful result of [`Kademlia::get_record`].
#[derive(Debug, Clone)]
pub struct GetRecordOk {
pub records: Vec<Record>
pub records: Vec<PeerRecord>
}

/// The error result of [`Kademlia::get_record`].
Expand All @@ -1754,12 +1823,12 @@ pub enum GetRecordError {
},
QuorumFailed {
key: record::Key,
records: Vec<Record>,
records: Vec<PeerRecord>,
quorum: NonZeroUsize
},
Timeout {
key: record::Key,
records: Vec<Record>,
records: Vec<PeerRecord>,
quorum: NonZeroUsize
}
}
Expand Down Expand Up @@ -1799,12 +1868,14 @@ pub struct PutRecordOk {
pub enum PutRecordError {
QuorumFailed {
key: record::Key,
num_results: usize,
/// [`PeerId`]s of the peers the record was successfully stored on.
success: Vec<PeerId>,
quorum: NonZeroUsize
},
Timeout {
key: record::Key,
num_results: usize,
/// [`PeerId`]s of the peers the record was successfully stored on.
success: Vec<PeerId>,
quorum: NonZeroUsize
},
}
Expand Down Expand Up @@ -2061,8 +2132,9 @@ pub enum QueryInfo {
GetRecord {
/// The key to look for.
key: record::Key,
/// The records found so far.
records: Vec<Record>,
/// The records with the id of the peer that returned them. `None` when
/// the record was found in the local store.
records: Vec<PeerRecord>,
/// The number of records to look for.
quorum: NonZeroUsize,
/// The closest peer to `key` that did not return a record.
Expand Down Expand Up @@ -2150,8 +2222,8 @@ pub enum PutRecordPhase {

/// The query is replicating the record to the closest nodes to the key.
PutRecord {
/// The number of successful replication requests so far.
num_results: usize,
/// A list of peers the given record has been successfully replicated to.
success: Vec<PeerId>,
/// Query statistics from the finished `GetClosestPeers` phase.
get_closest_peers_stats: QueryStats,
},
Expand Down
Loading

0 comments on commit 9dd2d66

Please sign in to comment.