Skip to content

Commit

Permalink
kad: FindManyNodes stub
Browse files Browse the repository at this point in the history
Signed-off-by: Alexandru Vasile <[email protected]>
  • Loading branch information
lexnv committed Apr 19, 2024
1 parent ad3ec06 commit c0b4c8d
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 21 deletions.
15 changes: 10 additions & 5 deletions src/protocol/libp2p/kademlia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -754,14 +754,19 @@ impl Kademlia {
Some(KademliaCommand::PutRecordToPeers { record, query_id, peers }) => {
tracing::debug!(target: LOG_TARGET, ?query_id, key = ?record.key, "store record to DHT to specified peers");

let key = Key::new(record.key.clone());

self.store.put(record.clone());
// Put the record to the specified peers.
let peers = peers.into_iter().filter_map(|peer| {
if let KBucketEntry::Occupied(entry) = self.routing_table.entry(Key::from(peer)) {
Some(entry.clone())
} else {
None
}
}).collect();

self.engine.start_put_record(
self.engine.start_put_record_to_peers(
query_id,
record,
self.routing_table.closest(key, self.replication_factor).into(),
peers,
);
}
Some(KademliaCommand::GetRecord { key, quorum, query_id }) => {
Expand Down
11 changes: 2 additions & 9 deletions src/protocol/libp2p/kademlia/query/find_many_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,27 +26,20 @@ use crate::{
PeerId,
};

/// Logging target for the file.
const LOG_TARGET: &str = "litep2p::ipfs::kademlia::query::find_many_nodes";

/// Context for multiple `FIND_NODE` queries.
#[derive(Debug)]
pub struct FindManyNodesContext {
/// Local peer ID.
local_peer_id: PeerId,

/// Query ID.
pub query: QueryId,

/// The peers we are looking for.
pub peers_to_report: Vec<PeerId>,
pub peers_to_report: Vec<KademliaPeer>,
}

impl FindManyNodesContext {
/// Creates a new [`FindManyNodesContext`].
pub fn new(local_peer_id: PeerId, query: QueryId, peers_to_report: Vec<PeerId>) -> Self {
pub fn new(query: QueryId, peers_to_report: Vec<KademliaPeer>) -> Self {
Self {
local_peer_id,
query,
peers_to_report,
}
Expand Down
57 changes: 50 additions & 7 deletions src/protocol/libp2p/kademlia/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@ use bytes::Bytes;

use std::collections::{HashMap, VecDeque};

use self::find_many_nodes::FindManyNodesContext;

mod find_many_nodes;
mod find_node;
mod get_record;
mod find_many_nodes;

/// Logging target for the file.
const LOG_TARGET: &str = "litep2p::ipfs::kademlia::query";
Expand Down Expand Up @@ -69,8 +71,8 @@ enum QueryType {
/// Record that needs to be stored.
record: Record,

/// Context for the `FIND_NODE` query
context: FindNodeContext<RecordKey>,
/// Context for finding peers.
context: FindManyNodesContext,
},

/// `GET_VALUE` query.
Expand Down Expand Up @@ -237,6 +239,32 @@ impl QueryEngine {
query_id
}

/// Start `PUT_VALUE` query to specified peers.
pub fn start_put_record_to_peers(
&mut self,
query_id: QueryId,
record: Record,
peers_to_report: Vec<KademliaPeer>,
) -> QueryId {
tracing::debug!(
target: LOG_TARGET,
?query_id,
target = ?record.key,
num_peers = ?peers_to_report.len(),
"start `PUT_VALUE` query to peers"
);

self.queries.insert(
query_id,
QueryType::PutRecordToPeers {
record,
context: FindManyNodesContext::new(query_id, peers_to_report),
},
);

query_id
}

/// Start `GET_VALUE` query.
pub fn start_get_record(
&mut self,
Expand Down Expand Up @@ -290,6 +318,9 @@ impl QueryEngine {
Some(QueryType::PutRecord { context, .. }) => {
context.register_response_failure(peer);
}
Some(QueryType::PutRecordToPeers { context, .. }) => {
context.register_response_failure(peer);
}
Some(QueryType::GetRecord { context }) => {
context.register_response_failure(peer);
}
Expand Down Expand Up @@ -317,6 +348,12 @@ impl QueryEngine {
}
_ => unreachable!(),
},
Some(QueryType::PutRecordToPeers { context, .. }) => match message {
KademliaMessage::FindNode { peers, .. } => {
context.register_response(peer, peers);
}
_ => unreachable!(),
},
Some(QueryType::GetRecord { context }) => match message {
KademliaMessage::GetRecord { record, peers, .. } => {
context.register_response(peer, record, peers);
Expand All @@ -333,11 +370,12 @@ impl QueryEngine {
match self.queries.get_mut(query) {
None => {
tracing::trace!(target: LOG_TARGET, ?query, ?peer, "response failure for a stale query");
return None;
None
}
Some(QueryType::FindNode { context }) => return context.next_peer_action(peer),
Some(QueryType::PutRecord { context, .. }) => return context.next_peer_action(peer),
Some(QueryType::GetRecord { context }) => return context.next_peer_action(peer),
Some(QueryType::FindNode { context }) => context.next_peer_action(peer),
Some(QueryType::PutRecord { context, .. }) => context.next_peer_action(peer),
Some(QueryType::PutRecordToPeers { context, .. }) => context.next_peer_action(peer),
Some(QueryType::GetRecord { context }) => context.next_peer_action(peer),
}
}

Expand All @@ -354,6 +392,10 @@ impl QueryEngine {
record,
peers: context.responses.into_iter().map(|(_, peer)| peer).collect::<Vec<_>>(),
},
QueryType::PutRecordToPeers { record, context } => QueryAction::PutRecordToFoundNodes {
record,
peers: context.peers_to_report,
},
QueryType::GetRecord { context } => QueryAction::GetRecordQueryDone {
query_id: context.query,
record: context.found_record(),
Expand All @@ -375,6 +417,7 @@ impl QueryEngine {
let action = match state {
QueryType::FindNode { context } => context.next_action(),
QueryType::PutRecord { context, .. } => context.next_action(),
QueryType::PutRecordToPeers { context, .. } => context.next_action(),
QueryType::GetRecord { context } => context.next_action(),
};

Expand Down

0 comments on commit c0b4c8d

Please sign in to comment.