Skip to content

Commit

Permalink
kad: Add put records to peers command
Browse files Browse the repository at this point in the history
Signed-off-by: Alexandru Vasile <[email protected]>
  • Loading branch information
lexnv committed Apr 19, 2024
1 parent 99e88d8 commit ad3ec06
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 42 deletions.
44 changes: 24 additions & 20 deletions src/protocol/libp2p/kademlia/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,20 @@ pub(crate) enum KademliaCommand {

/// Query ID for the query.
query_id: QueryId,
},

/// Store record to DHT to the given peers.
///
/// Similar to [`KademliaCommand::PutRecord`] but allows user to specify the peers.
PutRecordToPeers {
/// Record.
record: Record,

/// Query ID for the query.
query_id: QueryId,

/// Use the following peers for the put request.
peers: Option<Vec<PeerId>>,
peers: Vec<PeerId>,
},

/// Get record from DHT.
Expand Down Expand Up @@ -205,27 +216,20 @@ impl KademliaHandle {
/// Store record to DHT.
pub async fn put_record(&mut self, record: Record) -> QueryId {
let query_id = self.next_query_id();
let _ = self
.cmd_tx
.send(KademliaCommand::PutRecord {
record,
query_id,
peers: None,
})
.await;
let _ = self.cmd_tx.send(KademliaCommand::PutRecord { record, query_id }).await;

query_id
}

/// Store record to DHT to the given peers.
pub async fn put_record_to(&mut self, record: Record, peers: Vec<PeerId>) -> QueryId {
pub async fn put_record_to_peers(&mut self, record: Record, peers: Vec<PeerId>) -> QueryId {
let query_id = self.next_query_id();
let _ = self
.cmd_tx
.send(KademliaCommand::PutRecord {
.send(KademliaCommand::PutRecordToPeers {
record,
query_id,
peers: Some(peers),
peers,
})
.await;

Expand Down Expand Up @@ -267,24 +271,24 @@ impl KademliaHandle {
pub fn try_put_record(&mut self, record: Record) -> Result<QueryId, ()> {
let query_id = self.next_query_id();
self.cmd_tx
.try_send(KademliaCommand::PutRecord {
record,
query_id,
peers: None,
})
.try_send(KademliaCommand::PutRecord { record, query_id })
.map(|_| query_id)
.map_err(|_| ())
}

/// Try to initiate `PUT_VALUE` query to the given peers and if the channel is clogged,
/// return an error.
pub fn try_put_record_to(&mut self, record: Record, peers: Vec<PeerId>) -> Result<QueryId, ()> {
pub fn try_put_record_to_peers(
&mut self,
record: Record,
peers: Vec<PeerId>,
) -> Result<QueryId, ()> {
let query_id = self.next_query_id();
self.cmd_tx
.try_send(KademliaCommand::PutRecord {
.try_send(KademliaCommand::PutRecordToPeers {
record,
query_id,
peers: Some(peers),
peers,
})
.map(|_| query_id)
.map_err(|_| ())
Expand Down
43 changes: 21 additions & 22 deletions src/protocol/libp2p/kademlia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use futures::StreamExt;
use multiaddr::Multiaddr;
use tokio::sync::mpsc::{Receiver, Sender};

use std::collections::{hash_map::Entry, HashMap, VecDeque};
use std::collections::{hash_map::Entry, HashMap};

pub use config::{Config, ConfigBuilder};
pub use handle::{KademliaEvent, KademliaHandle, Quorum, RoutingTableUpdateMode};
Expand Down Expand Up @@ -738,32 +738,31 @@ impl Kademlia {
self.routing_table.closest(Key::from(peer), self.replication_factor).into()
);
}
Some(KademliaCommand::PutRecord { record, query_id, peers }) => {
Some(KademliaCommand::PutRecord { record, query_id }) => {
tracing::debug!(target: LOG_TARGET, ?query_id, key = ?record.key, "store record to DHT");

let key = Key::new(record.key.clone());

if let Some(peers) = peers {
// Put the record to the specified peers.
let peers = peers.into_iter().filter_map(|peer| {
match self.routing_table.entry(Key::from(peer)) {
KBucketEntry::Occupied(entry) => Some(entry.clone()),
_ => None,
}
}).collect();

if let Err(error) = self.on_query_action(QueryAction::PutRecordToFoundNodes { record, peers }).await {
tracing::debug!(target: LOG_TARGET, ?error, "failed to put record to predefined peers");
}
} else {
self.store.put(record.clone());
self.store.put(record.clone());

self.engine.start_put_record(
query_id,
record,
self.routing_table.closest(key, self.replication_factor).into(),
);
}
self.engine.start_put_record(
query_id,
record,
self.routing_table.closest(key, self.replication_factor).into(),
);
}
Some(KademliaCommand::PutRecordToPeers { record, query_id, peers }) => {
tracing::debug!(target: LOG_TARGET, ?query_id, key = ?record.key, "store record to DHT to specified peers");

let key = Key::new(record.key.clone());

self.store.put(record.clone());

self.engine.start_put_record(
query_id,
record,
self.routing_table.closest(key, self.replication_factor).into(),
);
}
Some(KademliaCommand::GetRecord { key, quorum, query_id }) => {
tracing::debug!(target: LOG_TARGET, ?key, "get record from DHT");
Expand Down
70 changes: 70 additions & 0 deletions src/protocol/libp2p/kademlia/query/find_many_nodes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2023 litep2p developers
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::{
protocol::libp2p::kademlia::{
query::{QueryAction, QueryId},
types::KademliaPeer,
},
PeerId,
};

/// Logging target for the file.
const LOG_TARGET: &str = "litep2p::ipfs::kademlia::query::find_many_nodes";

/// Context for multiple `FIND_NODE` queries.
#[derive(Debug)]
pub struct FindManyNodesContext {
/// Local peer ID.
local_peer_id: PeerId,

/// Query ID.
pub query: QueryId,

/// The peers we are looking for.
pub peers_to_report: Vec<PeerId>,
}

impl FindManyNodesContext {
/// Creates a new [`FindManyNodesContext`].
pub fn new(local_peer_id: PeerId, query: QueryId, peers_to_report: Vec<PeerId>) -> Self {
Self {
local_peer_id,
query,
peers_to_report,
}
}

/// Register response failure for `peer`.
pub fn register_response_failure(&mut self, _peer: PeerId) {}

/// Register `FIND_NODE` response from `peer`.
pub fn register_response(&mut self, _peer: PeerId, _peers: Vec<KademliaPeer>) {}

/// Get next action for `peer`.
pub fn next_peer_action(&mut self, _peer: &PeerId) -> Option<QueryAction> {
None
}

/// Get next action for a `FIND_NODE` query.
pub fn next_action(&mut self) -> Option<QueryAction> {
return Some(QueryAction::QuerySucceeded { query: self.query });
}
}
10 changes: 10 additions & 0 deletions src/protocol/libp2p/kademlia/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use std::collections::{HashMap, VecDeque};

mod find_node;
mod get_record;
mod find_many_nodes;

/// Logging target for the file.
const LOG_TARGET: &str = "litep2p::ipfs::kademlia::query";
Expand Down Expand Up @@ -63,6 +64,15 @@ enum QueryType {
context: FindNodeContext<RecordKey>,
},

/// `PUT_VALUE` query to specified peers.
PutRecordToPeers {
/// Record that needs to be stored.
record: Record,

/// Context for the `FIND_NODE` query
context: FindNodeContext<RecordKey>,
},

/// `GET_VALUE` query.
GetRecord {
/// Context for the `GET_VALUE` query.
Expand Down

0 comments on commit ad3ec06

Please sign in to comment.