Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kad: Implement put_record_to and try_put_record_to #77

Merged
merged 11 commits into from
Apr 19, 2024
47 changes: 45 additions & 2 deletions src/protocol/libp2p/kademlia/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ pub(crate) enum KademliaCommand {

/// Query ID for the query.
query_id: QueryId,

/// Use the following peers for the put request.
peers: Option<Vec<PeerId>>,
},

/// Get record from DHT.
Expand Down Expand Up @@ -202,7 +205,29 @@ impl KademliaHandle {
/// Store record to DHT.
pub async fn put_record(&mut self, record: Record) -> QueryId {
let query_id = self.next_query_id();
let _ = self.cmd_tx.send(KademliaCommand::PutRecord { record, query_id }).await;
let _ = self
.cmd_tx
.send(KademliaCommand::PutRecord {
record,
query_id,
peers: None,
})
.await;

query_id
}

/// Store record to DHT to the given peers.
pub async fn put_record_to(&mut self, record: Record, peers: Vec<PeerId>) -> QueryId {
let query_id = self.next_query_id();
let _ = self
.cmd_tx
.send(KademliaCommand::PutRecord {
record,
query_id,
peers: Some(peers),
})
.await;

query_id
}
Expand Down Expand Up @@ -242,7 +267,25 @@ impl KademliaHandle {
pub fn try_put_record(&mut self, record: Record) -> Result<QueryId, ()> {
let query_id = self.next_query_id();
self.cmd_tx
.try_send(KademliaCommand::PutRecord { record, query_id })
.try_send(KademliaCommand::PutRecord {
record,
query_id,
peers: None,
})
.map(|_| query_id)
.map_err(|_| ())
}

/// Try to initiate `PUT_VALUE` query to the given peers and if the channel is clogged,
/// return an error.
pub fn try_put_record_to(&mut self, record: Record, peers: Vec<PeerId>) -> Result<QueryId, ()> {
let query_id = self.next_query_id();
self.cmd_tx
.try_send(KademliaCommand::PutRecord {
record,
query_id,
peers: Some(peers),
})
.map(|_| query_id)
.map_err(|_| ())
}
Expand Down
34 changes: 26 additions & 8 deletions src/protocol/libp2p/kademlia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use futures::StreamExt;
use multiaddr::Multiaddr;
use tokio::sync::mpsc::{Receiver, Sender};

use std::collections::{hash_map::Entry, HashMap};
use std::collections::{hash_map::Entry, HashMap, VecDeque};

pub use config::{Config, ConfigBuilder};
pub use handle::{KademliaEvent, KademliaHandle, Quorum, RoutingTableUpdateMode};
Expand Down Expand Up @@ -738,17 +738,35 @@ impl Kademlia {
self.routing_table.closest(Key::from(peer), self.replication_factor).into()
);
}
Some(KademliaCommand::PutRecord { record, query_id }) => {
Some(KademliaCommand::PutRecord { record, query_id, peers }) => {
tracing::debug!(target: LOG_TARGET, ?query_id, key = ?record.key, "store record to DHT");

self.store.put(record.clone());
let key = Key::new(record.key.clone());

self.engine.start_put_record(
query_id,
record,
self.routing_table.closest(key, self.replication_factor).into(),
);
if let Some(peers) = peers {
lexnv marked this conversation as resolved.
Show resolved Hide resolved
// Put the record to the specified peers.
let peers = peers.into_iter().filter_map(|peer| {
match self.routing_table.entry(Key::from(peer)) {
// The routing table contains information about the peer address when:
// - Occupied: Established connection
// - Vacant: We'll try to establish the connection later, but the address is known.
lexnv marked this conversation as resolved.
Show resolved Hide resolved
KBucketEntry::Occupied(entry) | KBucketEntry::Vacant(entry) => Some(entry.clone()),
_ => None,
}
}).collect();

if let Err(error) = self.on_query_action(QueryAction::PutRecordToFoundNodes { record, peers }).await {
lexnv marked this conversation as resolved.
Show resolved Hide resolved
tracing::debug!(target: LOG_TARGET, ?error, "failed to put record to predefined peers");
}
} else {
self.store.put(record.clone());

self.engine.start_put_record(
query_id,
record,
self.routing_table.closest(key, self.replication_factor).into(),
);
}
}
Some(KademliaCommand::GetRecord { key, quorum, query_id }) => {
tracing::debug!(target: LOG_TARGET, ?key, "get record from DHT");
Expand Down
2 changes: 1 addition & 1 deletion src/protocol/libp2p/kademlia/routing_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ impl RoutingTable {
}
}

/// Get `limit` closests peers to `target` from the k-buckets.
/// Get `limit` closest peers to `target` from the k-buckets.
pub fn closest<K: Clone>(&mut self, target: Key<K>, limit: usize) -> Vec<KademliaPeer> {
ClosestBucketsIter::new(self.local_key.distance(&target))
.map(|index| self.buckets[index.get()].closest_iter(&target))
Expand Down
Loading