Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kademlia Records #1144

Merged
merged 71 commits into from
Jun 4, 2019
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
71 commits
Select commit Hold shift + click to select a range
0875210
initial implementation of the records
montekki May 20, 2019
5db9049
move to multihash keys
montekki May 23, 2019
cb9ca1f
correctly process query results
montekki May 23, 2019
0119163
comments and formatting
montekki May 23, 2019
30ce681
correctly return closer_peers in query
montekki May 23, 2019
60942d5
checking wrong peer id in test
montekki May 23, 2019
40f4063
Apply suggestions from code review
montekki May 23, 2019
5e01cca
Fix changes from suggestions
montekki May 23, 2019
f284630
Send responses to PUT_VALUE requests
montekki May 23, 2019
9b7f2a0
Shortcut in get_value
montekki May 23, 2019
579ce74
Update protocols/kad/src/behaviour.rs
montekki May 23, 2019
3bf187c
Revert "Update protocols/kad/src/behaviour.rs"
montekki May 23, 2019
41bfd39
Remove duplicate insertion
montekki May 23, 2019
ceb608e
Adds a record to a PUT_VALUE response
montekki May 24, 2019
5068d31
Fix a racy put_value test
montekki May 24, 2019
ef75844
Store value ourselves only if we are in K closest
montekki May 24, 2019
eaebf5b
Abstract over storage
montekki May 24, 2019
2f373be
Revert "Abstract over storage": bad take
montekki May 24, 2019
7b0e030
Abstract over records storage using hashmap as default
montekki May 24, 2019
311b7f6
Constructor for custom records
montekki May 24, 2019
ed59d24
New Record type and its traits
montekki May 26, 2019
fc205fc
Fix outdated storage name
montekki May 26, 2019
428cc54
Fixes returning an event
montekki May 27, 2019
1e873e3
Change FindNodeReq key type to Multihash
montekki May 27, 2019
ad7ab91
WriteState for a second stage of a PUT_VALUE request
montekki May 28, 2019
a5efc86
GET_VALUE should not have a record
montekki May 28, 2019
8402b5f
Refactor a match arm
montekki May 28, 2019
a1e3a17
Add successes and failures counters to PutValueRes
montekki May 28, 2019
1d38285
If value is found no need to return closer peers
montekki May 28, 2019
d6c94d6
Remove a custo storage from tests
montekki May 28, 2019
8fa58b8
Rename a test to get_value_not_found
montekki May 28, 2019
5bd1153
Adds a TODO to change FindNode request key to Multihash
montekki May 29, 2019
6bbf93c
Move MemoryRecordStorage to record.rs
montekki May 29, 2019
ec37994
Return a Cow-ed Record from get
montekki May 29, 2019
b1aeb4c
Fix incorrect GET_VALUE parsing
montekki May 29, 2019
abaf76d
Various fixes with review
montekki May 29, 2019
7b3c0e7
Fixes get_value_not_found
montekki May 29, 2019
eadfa20
Fix peerids names in test
montekki May 29, 2019
861ab52
another fix
montekki May 29, 2019
f5ac594
PutValue correctly distributes values
montekki May 31, 2019
b183ccb
Merge branch 'master' into fs-kademlia-session-key-lookups
montekki May 31, 2019
e07d07c
Simplify the test
montekki May 31, 2019
cadc5fa
Check that results are actually the closest
montekki May 31, 2019
b6fbd86
Reverts changes to tests
montekki Jun 1, 2019
db03ec2
Fix the test topology and checking the results
montekki Jun 1, 2019
291d2cb
Run put_value test ten times
montekki Jun 1, 2019
56faceb
Adds a get_value test
montekki Jun 1, 2019
e09481b
Apply suggestions from code review
montekki Jun 1, 2019
ad87d86
Make Record fields public
montekki Jun 1, 2019
e654e11
Moves WriteState to write.rs
montekki Jun 1, 2019
702b047
A couple of minor fixes
montekki Jun 1, 2019
8e9c2d1
Another few fixes of review
montekki Jun 1, 2019
0fd517e
Simplify the put_value test
montekki Jun 1, 2019
dd698be
Dont synchronously return an error from put_value
montekki Jun 1, 2019
a98b526
Formatting fixes and comments
montekki Jun 2, 2019
2515eac
Collect a bunch of results
montekki Jun 2, 2019
cf598c9
Take exactly as much elements as neede
montekki Jun 2, 2019
99a9076
Check if the peer is still connected
montekki Jun 2, 2019
54eeb9e
Adds a multiple GetValueResults results number test
montekki Jun 2, 2019
2bdc078
Unnecessary mut iterators in put_value
montekki Jun 2, 2019
7d3b295
Ask for num_results in get_value
montekki Jun 3, 2019
0b40d40
Dont allocate twice in get_value
montekki Jun 3, 2019
4a3dfbc
Dont count same errored peer multiple times
montekki Jun 3, 2019
b2b6bee
Apply suggestions from code review
montekki Jun 4, 2019
9c2c233
Fix another review
montekki Jun 4, 2019
27084e2
Merge branch 'master' into fs-kademlia-session-key-lookups
tomaka Jun 4, 2019
2434668
Merge branch 'master' into fs-kademlia-session-key-lookups
tomaka Jun 4, 2019
9c56f5c
Apply suggestions from code review
montekki Jun 4, 2019
8a78ae6
Bring back FromIterator and improve a panic message
montekki Jun 4, 2019
58e4f37
Update protocols/kad/src/behaviour.rs
montekki Jun 4, 2019
c3c64ce
Merge branch 'master' into fs-kademlia-session-key-lookups
tomaka Jun 4, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 148 additions & 0 deletions protocols/kad/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ pub struct Kademlia<TSubstream> {

/// Marker to pin the generics.
marker: PhantomData<TSubstream>,

/// The records that we keep.
records: FnvHashMap<Multihash, Vec<u8>>
}

/// Opaque type. Each query that we start gets a unique number.
Expand Down Expand Up @@ -131,6 +134,22 @@ enum QueryInfoInner {
/// Which hash we're targetting.
target: Multihash,
},

/// Put the value to the dht records
PutValue {
/// The key of the record being inserted
key: Multihash,
/// The value of the record being inserted
value: Vec<u8>,
},

/// Get value from the dht record
GetValue {
/// The key we're looking for
key: Multihash,
/// The results from peers are stored here
results: Vec<(Multihash, Vec<u8>)>,
},
}

impl Into<kbucket::Key<QueryInfo>> for QueryInfo {
Expand All @@ -146,6 +165,8 @@ impl AsRef<[u8]> for QueryInfo {
QueryInfoInner::FindPeer(peer) => peer.as_ref(),
QueryInfoInner::GetProviders { target, .. } => target.as_bytes(),
QueryInfoInner::AddProvider { target } => target.as_bytes(),
QueryInfoInner::GetValue { key, .. } => key.as_bytes(),
QueryInfoInner::PutValue { key, .. } => key.as_bytes(),
}
}
}
Expand All @@ -170,6 +191,17 @@ impl QueryInfo {
key: unimplemented!(), // TODO: target.clone(),
user_data,
},
QueryInfoInner::GetValue { key, .. } => {
KademliaHandlerIn::GetValue {
key: key.clone(),
user_data,
}
},
QueryInfoInner::PutValue { key, value } => KademliaHandlerIn::PutValue {
montekki marked this conversation as resolved.
Show resolved Hide resolved
key: key.clone(),
value: value.clone(),
user_data,
}
}
}
}
Expand Down Expand Up @@ -261,6 +293,7 @@ impl<TSubstream> Kademlia<TSubstream> {
rpc_timeout: Duration::from_secs(8),
add_provider: SmallVec::new(),
marker: PhantomData,
records: FnvHashMap::default(),
}
}

Expand All @@ -283,6 +316,26 @@ impl<TSubstream> Kademlia<TSubstream> {
self.start_query(QueryInfoInner::GetProviders { target, pending_results: Vec::new() });
}

/// Starts an iterative `GET_VALUE` request.
pub fn get_value(&mut self, key: &Multihash) {
if let Some(value) = self.records.get(key) {
self.queued_events.push(NetworkBehaviourAction::GenerateEvent(
KademliaOut::GetValueResult {
result: Some((key.clone(), value.clone())),
closer_peers: None,
}
));
}
montekki marked this conversation as resolved.
Show resolved Hide resolved
self.start_query(QueryInfoInner::GetValue { key: key.clone(), results: vec![] });
}

/// Starts an iterative `PUT_VALUE` request
pub fn put_value(&mut self, key: Multihash, value: Vec<u8>) {
// TODO: Probably we shouldn't store the value ourselves
montekki marked this conversation as resolved.
Show resolved Hide resolved
self.records.insert(key.clone(), value.clone());
self.start_query(QueryInfoInner::PutValue{key, value});
}

/// Register the local node as the provider for the given key.
///
/// This will periodically send `ADD_PROVIDER` messages to the nodes closest to the key. When
Expand Down Expand Up @@ -623,6 +676,66 @@ where
self.add_provider.push((key, provider_peer.node_id));
return;
}
KademliaHandlerEvent::GetValue { key, request_id } => {
let result = match self.records.get(&key) {
Some(value) => Some((key.clone(), value.clone())),
None => None,
};
let closer_peers = self.find_closest(&kbucket::Key::from(key), &source);
montekki marked this conversation as resolved.
Show resolved Hide resolved

self.queued_events.push(NetworkBehaviourAction::SendEvent {
peer_id: source,
event: KademliaHandlerIn::GetValueRes {
result,
closer_peers,
request_id,
},
});
}
KademliaHandlerEvent::GetValueRes {
result,
closer_peers,
user_data,
} => {
if let Some(query) = self.active_queries.get_mut(&user_data) {
if let QueryInfoInner::GetValue {
key: _,
results
} = &mut query.target_mut().inner {
if let Some(result) = result {
results.push(result);
montekki marked this conversation as resolved.
Show resolved Hide resolved
}
}
// A hacky way to end a query
query.inject_rpc_result(&source, vec![source.clone()]);
montekki marked this conversation as resolved.
Show resolved Hide resolved
montekki marked this conversation as resolved.
Show resolved Hide resolved
}
self.discovered(&user_data, &source, closer_peers.iter());
}
KademliaHandlerEvent::PutValue {
key,
value,
request_id
} => {
self.records.insert(key.clone(), value.clone());

self.queued_events.push(NetworkBehaviourAction::SendEvent {
peer_id: source,
event: KademliaHandlerIn::PutValueRes {
key,
value,
request_id,
},
});
}
KademliaHandlerEvent::PutValueRes {
key: _,
user_data,
} => {
if let Some(query) = self.active_queries.get_mut(&user_data) {
// A hacky way to end a query
query.inject_rpc_result(&source, vec![source.clone()]);
montekki marked this conversation as resolved.
Show resolved Hide resolved
}
}
};
}

Expand Down Expand Up @@ -753,6 +866,27 @@ where
self.queued_events.push(event);
}
},
QueryInfoInner::GetValue { key: _, results } => {
let result = match results.first() {
montekki marked this conversation as resolved.
Show resolved Hide resolved
Some(a) => Some(a.clone()),
None => None,
};

let event = KademliaOut::GetValueResult {
result: result.clone(),
closer_peers: {
match result {
None => Some(closer_peers.collect()),
Some(_) => None,
}},
};

break Async::Ready(NetworkBehaviourAction::GenerateEvent(event));
},
QueryInfoInner::PutValue { key, .. } => {
let event = KademliaOut::PutValueResult { key };
break Async::Ready(NetworkBehaviourAction::GenerateEvent(event));
},
}
} else {
break Async::NotReady;
Expand Down Expand Up @@ -802,6 +936,20 @@ pub enum KademliaOut {
/// List of peers ordered from closest to furthest away.
closer_peers: Vec<PeerId>,
},

/// Result of a `GET_VALUE` query
GetValueResult {
/// The result that we have probably received
result: Option<(Multihash, Vec<u8>)>,
montekki marked this conversation as resolved.
Show resolved Hide resolved
/// List of peers ordered from closes to furthest from the key
closer_peers: Option<Vec<PeerId>>,
montekki marked this conversation as resolved.
Show resolved Hide resolved
},

/// Result of a `PUT_VALUE` query
PutValueResult {
/// The key that we were inserting
key: Multihash,
}
}

impl From<kbucket::EntryView<PeerId, Addresses>> for KadPeer {
Expand Down
81 changes: 81 additions & 0 deletions protocols/kad/src/behaviour/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use libp2p_yamux as yamux;
use rand::random;
use std::{io, u64};
use tokio::runtime::Runtime;
use multihash::Hash;

type TestSwarm = Swarm<
Boxed<(PeerId, StreamMuxerBox), io::Error>,
Expand Down Expand Up @@ -223,3 +224,83 @@ fn unresponsive_not_returned_indirect() {
}))
.unwrap();
}


#[test]
fn search_for_unknown_value() {
montekki marked this conversation as resolved.
Show resolved Hide resolved
let (port_base, mut swarms) = build_nodes(3);

let peer_ids: Vec<_> = swarms.iter()
.map(|swarm| Swarm::local_peer_id(&swarm).clone()).collect();

swarms[0].add_address(&peer_ids[1], Protocol::Memory(port_base + 1).into());
swarms[1].add_address(&peer_ids[2], Protocol::Memory(port_base + 2).into());

let target_key = multihash::encode(Hash::SHA2256, &vec![1,2,3]).unwrap();
swarms[0].get_value(&target_key);

Runtime::new().unwrap().block_on(
future::poll_fn(move || -> Result<_, io::Error> {
for swarm in &mut swarms {
loop {
match swarm.poll().unwrap() {
Async::Ready(Some(KademliaOut::GetValueResult { result, .. })) => {
montekki marked this conversation as resolved.
Show resolved Hide resolved
assert_eq!(result, None);
return Ok(Async::Ready(()));
}
Async::Ready(_) => (),
Async::NotReady => break,
}
}
}

Ok(Async::NotReady)
}))
.unwrap()
}

#[test]
fn put_value() {
montekki marked this conversation as resolved.
Show resolved Hide resolved
let (port_base, mut swarms) = build_nodes(3);

let peer_ids: Vec<_> = swarms.iter()
.map(|swarm| Swarm::local_peer_id(&swarm).clone()).collect();

swarms[0].add_address(&peer_ids[1], Protocol::Memory(port_base + 1).into());
swarms[1].add_address(&peer_ids[2], Protocol::Memory(port_base + 2).into());

let target_key = multihash::encode(Hash::SHA2256, &vec![1,2,3]).unwrap();

// TODO: This is racy, need to be sequenced
swarms[0].get_value(&target_key);
swarms[1].put_value(target_key.clone(), vec![4,5,6]);

Runtime::new().unwrap().block_on(
future::poll_fn(move || -> Result<_, io::Error> {
for swarm in &mut swarms {
loop {
match swarm.poll().unwrap() {
montekki marked this conversation as resolved.
Show resolved Hide resolved
Async::Ready(Some(KademliaOut::GetValueResult {
result,
closer_peers,
})) => {
assert_ne!(result, None);
assert_eq!(closer_peers, None);
let value = result.unwrap();
assert_eq!(value.0, target_key);
assert_eq!(value.1, vec![4,5,6]);
return Ok(Async::Ready(()));
}
Async::Ready(Some(KademliaOut::PutValueResult { key })) => {
assert_eq!(key, target_key);
},
Async::Ready(_) => (),
Async::NotReady => break,
}
}
}

Ok(Async::NotReady)
}))
.unwrap()
}
Loading