From c0b4c8dccdf5b35c366ee92236481d89a41e3400 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile Date: Fri, 19 Apr 2024 15:24:47 +0300 Subject: [PATCH] kad: FindManyNodes stub Signed-off-by: Alexandru Vasile --- src/protocol/libp2p/kademlia/mod.rs | 15 +++-- .../libp2p/kademlia/query/find_many_nodes.rs | 11 +--- src/protocol/libp2p/kademlia/query/mod.rs | 57 ++++++++++++++++--- 3 files changed, 62 insertions(+), 21 deletions(-) diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index eb96e159..f6f34815 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -754,14 +754,19 @@ impl Kademlia { Some(KademliaCommand::PutRecordToPeers { record, query_id, peers }) => { tracing::debug!(target: LOG_TARGET, ?query_id, key = ?record.key, "store record to DHT to specified peers"); - let key = Key::new(record.key.clone()); - - self.store.put(record.clone()); + // Put the record to the specified peers. + let peers = peers.into_iter().filter_map(|peer| { + if let KBucketEntry::Occupied(entry) = self.routing_table.entry(Key::from(peer)) { + Some(entry.clone()) + } else { + None + } + }).collect(); - self.engine.start_put_record( + self.engine.start_put_record_to_peers( query_id, record, - self.routing_table.closest(key, self.replication_factor).into(), + peers, ); } Some(KademliaCommand::GetRecord { key, quorum, query_id }) => { diff --git a/src/protocol/libp2p/kademlia/query/find_many_nodes.rs b/src/protocol/libp2p/kademlia/query/find_many_nodes.rs index cc0f3de8..5ac43baf 100644 --- a/src/protocol/libp2p/kademlia/query/find_many_nodes.rs +++ b/src/protocol/libp2p/kademlia/query/find_many_nodes.rs @@ -26,27 +26,20 @@ use crate::{ PeerId, }; -/// Logging target for the file. -const LOG_TARGET: &str = "litep2p::ipfs::kademlia::query::find_many_nodes"; - /// Context for multiple `FIND_NODE` queries. #[derive(Debug)] pub struct FindManyNodesContext { - /// Local peer ID. - local_peer_id: PeerId, - /// Query ID. pub query: QueryId, /// The peers we are looking for. - pub peers_to_report: Vec, + pub peers_to_report: Vec, } impl FindManyNodesContext { /// Creates a new [`FindManyNodesContext`]. - pub fn new(local_peer_id: PeerId, query: QueryId, peers_to_report: Vec) -> Self { + pub fn new(query: QueryId, peers_to_report: Vec) -> Self { Self { - local_peer_id, query, peers_to_report, } diff --git a/src/protocol/libp2p/kademlia/query/mod.rs b/src/protocol/libp2p/kademlia/query/mod.rs index d3fbc052..4bba3e1b 100644 --- a/src/protocol/libp2p/kademlia/query/mod.rs +++ b/src/protocol/libp2p/kademlia/query/mod.rs @@ -33,9 +33,11 @@ use bytes::Bytes; use std::collections::{HashMap, VecDeque}; +use self::find_many_nodes::FindManyNodesContext; + +mod find_many_nodes; mod find_node; mod get_record; -mod find_many_nodes; /// Logging target for the file. const LOG_TARGET: &str = "litep2p::ipfs::kademlia::query"; @@ -69,8 +71,8 @@ enum QueryType { /// Record that needs to be stored. record: Record, - /// Context for the `FIND_NODE` query - context: FindNodeContext, + /// Context for finding peers. + context: FindManyNodesContext, }, /// `GET_VALUE` query. @@ -237,6 +239,32 @@ impl QueryEngine { query_id } + /// Start `PUT_VALUE` query to specified peers. + pub fn start_put_record_to_peers( + &mut self, + query_id: QueryId, + record: Record, + peers_to_report: Vec, + ) -> QueryId { + tracing::debug!( + target: LOG_TARGET, + ?query_id, + target = ?record.key, + num_peers = ?peers_to_report.len(), + "start `PUT_VALUE` query to peers" + ); + + self.queries.insert( + query_id, + QueryType::PutRecordToPeers { + record, + context: FindManyNodesContext::new(query_id, peers_to_report), + }, + ); + + query_id + } + /// Start `GET_VALUE` query. pub fn start_get_record( &mut self, @@ -290,6 +318,9 @@ impl QueryEngine { Some(QueryType::PutRecord { context, .. }) => { context.register_response_failure(peer); } + Some(QueryType::PutRecordToPeers { context, .. }) => { + context.register_response_failure(peer); + } Some(QueryType::GetRecord { context }) => { context.register_response_failure(peer); } @@ -317,6 +348,12 @@ impl QueryEngine { } _ => unreachable!(), }, + Some(QueryType::PutRecordToPeers { context, .. }) => match message { + KademliaMessage::FindNode { peers, .. } => { + context.register_response(peer, peers); + } + _ => unreachable!(), + }, Some(QueryType::GetRecord { context }) => match message { KademliaMessage::GetRecord { record, peers, .. } => { context.register_response(peer, record, peers); @@ -333,11 +370,12 @@ impl QueryEngine { match self.queries.get_mut(query) { None => { tracing::trace!(target: LOG_TARGET, ?query, ?peer, "response failure for a stale query"); - return None; + None } - Some(QueryType::FindNode { context }) => return context.next_peer_action(peer), - Some(QueryType::PutRecord { context, .. }) => return context.next_peer_action(peer), - Some(QueryType::GetRecord { context }) => return context.next_peer_action(peer), + Some(QueryType::FindNode { context }) => context.next_peer_action(peer), + Some(QueryType::PutRecord { context, .. }) => context.next_peer_action(peer), + Some(QueryType::PutRecordToPeers { context, .. }) => context.next_peer_action(peer), + Some(QueryType::GetRecord { context }) => context.next_peer_action(peer), } } @@ -354,6 +392,10 @@ impl QueryEngine { record, peers: context.responses.into_iter().map(|(_, peer)| peer).collect::>(), }, + QueryType::PutRecordToPeers { record, context } => QueryAction::PutRecordToFoundNodes { + record, + peers: context.peers_to_report, + }, QueryType::GetRecord { context } => QueryAction::GetRecordQueryDone { query_id: context.query, record: context.found_record(), @@ -375,6 +417,7 @@ impl QueryEngine { let action = match state { QueryType::FindNode { context } => context.next_action(), QueryType::PutRecord { context, .. } => context.next_action(), + QueryType::PutRecordToPeers { context, .. } => context.next_action(), QueryType::GetRecord { context } => context.next_action(), };