From c5238d6a8be390df95c3f90819b4680029f1ab50 Mon Sep 17 00:00:00 2001 From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> Date: Fri, 19 Apr 2024 16:58:20 +0300 Subject: [PATCH 1/2] kad: Implement put_record_to and try_put_record_to (#77) This PR implements the `put_record_to` and `try_put_record_to` to selectively pick peers to update their records. The main use-case from substrate would be the following: - A peer is discovered to have an outdated authority record (needs https://github.com/paritytech/litep2p/pull/76) - Update the record with the latest authority record available (part of this PR) This PR provided peers to the engine if the peers are part of the kBucket. The first step of the discovery in substrate motivates this assumption. We can probably do things a bit more optimally since we know the peers part of the kBucket were discovered previously (or currently connected): - The query starts with a [FindNodeContext](https://github.com/paritytech/litep2p/blob/96e827b54f9f937c6d0489bef6a438b48cf50e58/src/protocol/libp2p/kademlia/query/find_node.rs#L37), which in this case will do a peer discovery as well - We could implement a `PutNodeContext` which circumvents the need to discover the peers and just forwards a kad `PUT_VALUE` to those peers We'd have to double check that with libp2p as well (my brief looking over code points to this direction). To unblock https://github.com/paritytech/polkadot-sdk/pull/3786 we can merge this and then come back with a better / optimal solution for this Builds on top of: https://github.com/paritytech/litep2p/pull/76 cc @paritytech/networking --------- Signed-off-by: Alexandru Vasile --- src/protocol/libp2p/kademlia/handle.rs | 47 ++++++++++++++ src/protocol/libp2p/kademlia/mod.rs | 21 +++++- .../libp2p/kademlia/query/find_many_nodes.rs | 64 +++++++++++++++++++ src/protocol/libp2p/kademlia/query/mod.rs | 61 ++++++++++++++++-- src/protocol/libp2p/kademlia/routing_table.rs | 2 +- 5 files changed, 189 insertions(+), 6 deletions(-) create mode 100644 src/protocol/libp2p/kademlia/query/find_many_nodes.rs diff --git a/src/protocol/libp2p/kademlia/handle.rs b/src/protocol/libp2p/kademlia/handle.rs index d2e3988d..6d693cdb 100644 --- a/src/protocol/libp2p/kademlia/handle.rs +++ b/src/protocol/libp2p/kademlia/handle.rs @@ -90,6 +90,20 @@ pub(crate) enum KademliaCommand { query_id: QueryId, }, + /// Store record to DHT to the given peers. + /// + /// Similar to [`KademliaCommand::PutRecord`] but allows user to specify the peers. + PutRecordToPeers { + /// Record. + record: Record, + + /// Query ID for the query. + query_id: QueryId, + + /// Use the following peers for the put request. + peers: Vec, + }, + /// Get record from DHT. GetRecord { /// Record key. @@ -207,6 +221,21 @@ impl KademliaHandle { query_id } + /// Store record to DHT to the given peers. + pub async fn put_record_to_peers(&mut self, record: Record, peers: Vec) -> QueryId { + let query_id = self.next_query_id(); + let _ = self + .cmd_tx + .send(KademliaCommand::PutRecordToPeers { + record, + query_id, + peers, + }) + .await; + + query_id + } + /// Get record from DHT. pub async fn get_record(&mut self, key: RecordKey, quorum: Quorum) -> QueryId { let query_id = self.next_query_id(); @@ -247,6 +276,24 @@ impl KademliaHandle { .map_err(|_| ()) } + /// Try to initiate `PUT_VALUE` query to the given peers and if the channel is clogged, + /// return an error. + pub fn try_put_record_to_peers( + &mut self, + record: Record, + peers: Vec, + ) -> Result { + let query_id = self.next_query_id(); + self.cmd_tx + .try_send(KademliaCommand::PutRecordToPeers { + record, + query_id, + peers, + }) + .map(|_| query_id) + .map_err(|_| ()) + } + /// Try to initiate `GET_VALUE` query and if the channel is clogged, return an error. pub fn try_get_record(&mut self, key: RecordKey, quorum: Quorum) -> Result { let query_id = self.next_query_id(); diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index e3cee89c..ed21dfb4 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -741,15 +741,34 @@ impl Kademlia { Some(KademliaCommand::PutRecord { record, query_id }) => { tracing::debug!(target: LOG_TARGET, ?query_id, key = ?record.key, "store record to DHT"); - self.store.put(record.clone()); let key = Key::new(record.key.clone()); + self.store.put(record.clone()); + self.engine.start_put_record( query_id, record, self.routing_table.closest(key, self.replication_factor).into(), ); } + Some(KademliaCommand::PutRecordToPeers { record, query_id, peers }) => { + tracing::debug!(target: LOG_TARGET, ?query_id, key = ?record.key, "store record to DHT to specified peers"); + + // Put the record to the specified peers. + let peers = peers.into_iter().filter_map(|peer| { + match self.routing_table.entry(Key::from(peer)) { + KBucketEntry::Occupied(entry) => Some(entry.clone()), + KBucketEntry::Vacant(entry) if !entry.addresses.is_empty() => Some(entry.clone()), + _ => None, + } + }).collect(); + + self.engine.start_put_record_to_peers( + query_id, + record, + peers, + ); + } Some(KademliaCommand::GetRecord { key, quorum, query_id }) => { tracing::debug!(target: LOG_TARGET, ?key, "get record from DHT"); diff --git a/src/protocol/libp2p/kademlia/query/find_many_nodes.rs b/src/protocol/libp2p/kademlia/query/find_many_nodes.rs new file mode 100644 index 00000000..3bc04a55 --- /dev/null +++ b/src/protocol/libp2p/kademlia/query/find_many_nodes.rs @@ -0,0 +1,64 @@ +// Copyright 2023 litep2p developers +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the "Software"), +// to deal in the Software without restriction, including without limitation +// the rights to use, copy, modify, merge, publish, distribute, sublicense, +// and/or sell copies of the Software, and to permit persons to whom the +// Software is furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use crate::{ + protocol::libp2p::kademlia::{ + query::{QueryAction, QueryId}, + types::KademliaPeer, + }, + PeerId, +}; + +/// Context for multiple `FIND_NODE` queries. +// TODO: implement https://github.com/paritytech/litep2p/issues/80. +#[derive(Debug)] +pub struct FindManyNodesContext { + /// Query ID. + pub query: QueryId, + + /// The peers we are looking for. + pub peers_to_report: Vec, +} + +impl FindManyNodesContext { + /// Creates a new [`FindManyNodesContext`]. + pub fn new(query: QueryId, peers_to_report: Vec) -> Self { + Self { + query, + peers_to_report, + } + } + + /// Register response failure for `peer`. + pub fn register_response_failure(&mut self, _peer: PeerId) {} + + /// Register `FIND_NODE` response from `peer`. + pub fn register_response(&mut self, _peer: PeerId, _peers: Vec) {} + + /// Get next action for `peer`. + pub fn next_peer_action(&mut self, _peer: &PeerId) -> Option { + None + } + + /// Get next action for a `FIND_NODE` query. + pub fn next_action(&mut self) -> Option { + return Some(QueryAction::QuerySucceeded { query: self.query }); + } +} diff --git a/src/protocol/libp2p/kademlia/query/mod.rs b/src/protocol/libp2p/kademlia/query/mod.rs index 71f47036..4bba3e1b 100644 --- a/src/protocol/libp2p/kademlia/query/mod.rs +++ b/src/protocol/libp2p/kademlia/query/mod.rs @@ -33,6 +33,9 @@ use bytes::Bytes; use std::collections::{HashMap, VecDeque}; +use self::find_many_nodes::FindManyNodesContext; + +mod find_many_nodes; mod find_node; mod get_record; @@ -63,6 +66,15 @@ enum QueryType { context: FindNodeContext, }, + /// `PUT_VALUE` query to specified peers. + PutRecordToPeers { + /// Record that needs to be stored. + record: Record, + + /// Context for finding peers. + context: FindManyNodesContext, + }, + /// `GET_VALUE` query. GetRecord { /// Context for the `GET_VALUE` query. @@ -227,6 +239,32 @@ impl QueryEngine { query_id } + /// Start `PUT_VALUE` query to specified peers. + pub fn start_put_record_to_peers( + &mut self, + query_id: QueryId, + record: Record, + peers_to_report: Vec, + ) -> QueryId { + tracing::debug!( + target: LOG_TARGET, + ?query_id, + target = ?record.key, + num_peers = ?peers_to_report.len(), + "start `PUT_VALUE` query to peers" + ); + + self.queries.insert( + query_id, + QueryType::PutRecordToPeers { + record, + context: FindManyNodesContext::new(query_id, peers_to_report), + }, + ); + + query_id + } + /// Start `GET_VALUE` query. pub fn start_get_record( &mut self, @@ -280,6 +318,9 @@ impl QueryEngine { Some(QueryType::PutRecord { context, .. }) => { context.register_response_failure(peer); } + Some(QueryType::PutRecordToPeers { context, .. }) => { + context.register_response_failure(peer); + } Some(QueryType::GetRecord { context }) => { context.register_response_failure(peer); } @@ -307,6 +348,12 @@ impl QueryEngine { } _ => unreachable!(), }, + Some(QueryType::PutRecordToPeers { context, .. }) => match message { + KademliaMessage::FindNode { peers, .. } => { + context.register_response(peer, peers); + } + _ => unreachable!(), + }, Some(QueryType::GetRecord { context }) => match message { KademliaMessage::GetRecord { record, peers, .. } => { context.register_response(peer, record, peers); @@ -323,11 +370,12 @@ impl QueryEngine { match self.queries.get_mut(query) { None => { tracing::trace!(target: LOG_TARGET, ?query, ?peer, "response failure for a stale query"); - return None; + None } - Some(QueryType::FindNode { context }) => return context.next_peer_action(peer), - Some(QueryType::PutRecord { context, .. }) => return context.next_peer_action(peer), - Some(QueryType::GetRecord { context }) => return context.next_peer_action(peer), + Some(QueryType::FindNode { context }) => context.next_peer_action(peer), + Some(QueryType::PutRecord { context, .. }) => context.next_peer_action(peer), + Some(QueryType::PutRecordToPeers { context, .. }) => context.next_peer_action(peer), + Some(QueryType::GetRecord { context }) => context.next_peer_action(peer), } } @@ -344,6 +392,10 @@ impl QueryEngine { record, peers: context.responses.into_iter().map(|(_, peer)| peer).collect::>(), }, + QueryType::PutRecordToPeers { record, context } => QueryAction::PutRecordToFoundNodes { + record, + peers: context.peers_to_report, + }, QueryType::GetRecord { context } => QueryAction::GetRecordQueryDone { query_id: context.query, record: context.found_record(), @@ -365,6 +417,7 @@ impl QueryEngine { let action = match state { QueryType::FindNode { context } => context.next_action(), QueryType::PutRecord { context, .. } => context.next_action(), + QueryType::PutRecordToPeers { context, .. } => context.next_action(), QueryType::GetRecord { context } => context.next_action(), }; diff --git a/src/protocol/libp2p/kademlia/routing_table.rs b/src/protocol/libp2p/kademlia/routing_table.rs index 28dd251e..0077c861 100644 --- a/src/protocol/libp2p/kademlia/routing_table.rs +++ b/src/protocol/libp2p/kademlia/routing_table.rs @@ -176,7 +176,7 @@ impl RoutingTable { } } - /// Get `limit` closests peers to `target` from the k-buckets. + /// Get `limit` closest peers to `target` from the k-buckets. pub fn closest(&mut self, target: Key, limit: usize) -> Vec { ClosestBucketsIter::new(self.local_key.distance(&target)) .map(|index| self.buckets[index.get()].closest_iter(&target)) From 53174df032327d6a84de8c83e1881e072a9343bc Mon Sep 17 00:00:00 2001 From: Alexandru Vasile <60601340+lexnv@users.noreply.github.com> Date: Fri, 19 Apr 2024 17:10:08 +0300 Subject: [PATCH 2/2] Update src/protocol/libp2p/kademlia/query/find_many_nodes.rs Co-authored-by: Dmitry Markin --- src/protocol/libp2p/kademlia/query/find_many_nodes.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/protocol/libp2p/kademlia/query/find_many_nodes.rs b/src/protocol/libp2p/kademlia/query/find_many_nodes.rs index 3bc04a55..ac61e93c 100644 --- a/src/protocol/libp2p/kademlia/query/find_many_nodes.rs +++ b/src/protocol/libp2p/kademlia/query/find_many_nodes.rs @@ -27,7 +27,8 @@ use crate::{ }; /// Context for multiple `FIND_NODE` queries. -// TODO: implement https://github.com/paritytech/litep2p/issues/80. +// TODO: implement finding nodes not present in the routing table, +// see https://github.com/paritytech/litep2p/issues/80. #[derive(Debug)] pub struct FindManyNodesContext { /// Query ID.