diff --git a/src/protocol/libp2p/kademlia/handle.rs b/src/protocol/libp2p/kademlia/handle.rs index 5d3b4630..f1b4c218 100644 --- a/src/protocol/libp2p/kademlia/handle.rs +++ b/src/protocol/libp2p/kademlia/handle.rs @@ -130,6 +130,18 @@ pub(crate) enum KademliaCommand { query_id: QueryId, }, + /// Register as a content provider for `key`. + StartProviding { + /// Provided key. + key: RecordKey, + + /// Our external addresses to publish. + public_addresses: Vec, + + /// Query ID for the query. + query_id: QueryId, + }, + /// Store record locally. StoreRecord { // Record. @@ -175,7 +187,8 @@ pub enum KademliaEvent { }, /// `PUT_VALUE` query succeeded. - PutRecordSucess { + // TODO: this is never emitted. Implement + add `AddProviderSuccess`. + PutRecordSuccess { /// Query ID. query_id: QueryId, @@ -299,6 +312,27 @@ impl KademliaHandle { query_id } + /// Register as a content provider on the DHT. + /// + /// Register the local peer ID & its `public_addresses` as a provider for a given `key`. + pub async fn start_providing( + &mut self, + key: RecordKey, + public_addresses: Vec, + ) -> QueryId { + let query_id = self.next_query_id(); + let _ = self + .cmd_tx + .send(KademliaCommand::StartProviding { + key, + public_addresses, + query_id, + }) + .await; + + query_id + } + /// Store the record in the local store. Used in combination with /// [`IncomingRecordValidationMode::Manual`]. pub async fn store_record(&mut self, record: Record) { diff --git a/src/protocol/libp2p/kademlia/message.rs b/src/protocol/libp2p/kademlia/message.rs index 4f53fbc1..bba2b285 100644 --- a/src/protocol/libp2p/kademlia/message.rs +++ b/src/protocol/libp2p/kademlia/message.rs @@ -172,8 +172,7 @@ impl KademliaMessage { } /// Create `ADD_PROVIDER` message with `provider`. - #[allow(unused)] - pub fn add_provider(provider: ProviderRecord) -> Vec { + pub fn add_provider(provider: ProviderRecord) -> Bytes { let peer = KademliaPeer::new( provider.provider, provider.addresses, @@ -187,10 +186,10 @@ impl KademliaMessage { ..Default::default() }; - let mut buf = Vec::with_capacity(message.encoded_len()); + let mut buf = BytesMut::with_capacity(message.encoded_len()); message.encode(&mut buf).expect("Vec to provide needed capacity"); - buf + buf.freeze() } /// Create `GET_PROVIDERS` request for `key`. diff --git a/src/protocol/libp2p/kademlia/mod.rs b/src/protocol/libp2p/kademlia/mod.rs index 82e9da24..0b3da797 100644 --- a/src/protocol/libp2p/kademlia/mod.rs +++ b/src/protocol/libp2p/kademlia/mod.rs @@ -83,13 +83,16 @@ mod schema { } /// Peer action. -#[derive(Debug)] +#[derive(Debug, Clone)] enum PeerAction { /// Send `FIND_NODE` message to peer. SendFindNode(QueryId), /// Send `PUT_VALUE` message to peer. SendPutValue(Bytes), + + /// Send `ADD_PROVIDER` message to peer. + SendAddProvider(Bytes), } /// Peer context. @@ -335,7 +338,12 @@ impl Kademlia { } } Some(PeerAction::SendPutValue(message)) => { - tracing::trace!(target: LOG_TARGET, ?peer, "send `PUT_VALUE` response"); + tracing::trace!(target: LOG_TARGET, ?peer, "send `PUT_VALUE` message"); + + self.executor.send_message(peer, message, substream); + } + Some(PeerAction::SendAddProvider(message)) => { + tracing::trace!(target: LOG_TARGET, ?peer, "send `ADD_PROVIDER` message"); self.executor.send_message(peer, message, substream); } @@ -758,6 +766,35 @@ impl Kademlia { Ok(()) } + QueryAction::AddProviderToFoundNodes { provider, peers } => { + tracing::trace!( + target: LOG_TARGET, + provided_key = ?provider.key, + num_peers = ?peers.len(), + "add provider record to found peers", + ); + + let provided_key = provider.key.clone(); + let message = KademliaMessage::add_provider(provider); + + for peer in peers { + if let Err(error) = self.open_substream_or_dial( + peer.peer, + PeerAction::SendAddProvider(message.clone()), + None, + ) { + tracing::debug!( + target: LOG_TARGET, + ?peer, + ?provided_key, + ?error, + "failed to add provider record to peer", + ) + } + } + + Ok(()) + } QueryAction::GetRecordQueryDone { query_id, records } => { let _ = self .event_tx @@ -794,7 +831,11 @@ impl Kademlia { event = self.service.next() => match event { Some(TransportEvent::ConnectionEstablished { peer, .. }) => { if let Err(error) = self.on_connection_established(peer) { - tracing::debug!(target: LOG_TARGET, ?error, "failed to handle established connection"); + tracing::debug!( + target: LOG_TARGET, + ?error, + "failed to handle established connection", + ); } } Some(TransportEvent::ConnectionClosed { peer }) => { @@ -804,7 +845,10 @@ impl Kademlia { match direction { Direction::Inbound => self.on_inbound_substream(peer, substream).await, Direction::Outbound(substream_id) => { - if let Err(error) = self.on_outbound_substream(peer, substream_id, substream).await { + if let Err(error) = self + .on_outbound_substream(peer, substream_id, substream) + .await + { tracing::debug!( target: LOG_TARGET, ?peer, @@ -819,7 +863,8 @@ impl Kademlia { Some(TransportEvent::SubstreamOpenFailure { substream, error }) => { self.on_substream_open_failure(substream, error).await; } - Some(TransportEvent::DialFailure { peer, address, .. }) => self.on_dial_failure(peer, address), + Some(TransportEvent::DialFailure { peer, address, .. }) => + self.on_dial_failure(peer, address), None => return Err(Error::EssentialTaskClosed), }, context = self.executor.next() => { @@ -827,14 +872,32 @@ impl Kademlia { match result { QueryResult::SendSuccess { substream } => { - tracing::trace!(target: LOG_TARGET, ?peer, query = ?query_id, "message sent to peer"); + tracing::trace!( + target: LOG_TARGET, + ?peer, + query = ?query_id, + "message sent to peer", + ); let _ = substream.close().await; } QueryResult::ReadSuccess { substream, message } => { - tracing::trace!(target: LOG_TARGET, ?peer, query = ?query_id, "message read from peer"); + tracing::trace!(target: LOG_TARGET, + ?peer, + query = ?query_id, + "message read from peer", + ); - if let Err(error) = self.on_message_received(peer, query_id, message, substream).await { - tracing::debug!(target: LOG_TARGET, ?peer, ?error, "failed to process message"); + if let Err(error) = self.on_message_received( + peer, + query_id, + message, + substream + ).await { + tracing::debug!(target: LOG_TARGET, + ?peer, + ?error, + "failed to process message", + ); } } QueryResult::SubstreamClosed | QueryResult::Timeout => { @@ -853,22 +916,36 @@ impl Kademlia { command = self.cmd_rx.recv() => { match command { Some(KademliaCommand::FindNode { peer, query_id }) => { - tracing::debug!(target: LOG_TARGET, ?peer, query = ?query_id, "starting `FIND_NODE` query"); + tracing::debug!( + target: LOG_TARGET, + ?peer, + query = ?query_id, + "starting `FIND_NODE` query", + ); self.engine.start_find_node( query_id, peer, - self.routing_table.closest(Key::from(peer), self.replication_factor).into() + self.routing_table + .closest(Key::from(peer), self.replication_factor) + .into() ); } Some(KademliaCommand::PutRecord { mut record, query_id }) => { - tracing::debug!(target: LOG_TARGET, query = ?query_id, key = ?record.key, "store record to DHT"); + tracing::debug!( + target: LOG_TARGET, + query = ?query_id, + key = ?record.key, + "store record to DHT", + ); // For `PUT_VALUE` requests originating locally we are always the publisher. record.publisher = Some(self.local_key.clone().into_preimage()); // Make sure TTL is set. - record.expires = record.expires.or_else(|| Some(Instant::now() + self.record_ttl)); + record.expires = record + .expires + .or_else(|| Some(Instant::now() + self.record_ttl)); let key = Key::new(record.key.clone()); @@ -880,11 +957,23 @@ impl Kademlia { self.routing_table.closest(key, self.replication_factor).into(), ); } - Some(KademliaCommand::PutRecordToPeers { mut record, query_id, peers, update_local_store }) => { - tracing::debug!(target: LOG_TARGET, query = ?query_id, key = ?record.key, "store record to DHT to specified peers"); + Some(KademliaCommand::PutRecordToPeers { + mut record, + query_id, + peers, + update_local_store, + }) => { + tracing::debug!( + target: LOG_TARGET, + query = ?query_id, + key = ?record.key, + "store record to DHT to specified peers", + ); // Make sure TTL is set. - record.expires = record.expires.or_else(|| Some(Instant::now() + self.record_ttl)); + record.expires = record + .expires + .or_else(|| Some(Instant::now() + self.record_ttl)); if update_local_store { self.store.put(record.clone()); @@ -898,7 +987,8 @@ impl Kademlia { match self.routing_table.entry(Key::from(peer)) { KBucketEntry::Occupied(entry) => Some(entry.clone()), - KBucketEntry::Vacant(entry) if !entry.addresses.is_empty() => Some(entry.clone()), + KBucketEntry::Vacant(entry) if !entry.addresses.is_empty() => + Some(entry.clone()), _ => None, } }).collect(); @@ -909,6 +999,36 @@ impl Kademlia { peers, ); } + Some(KademliaCommand::StartProviding { + key, + public_addresses, + query_id + }) => { + tracing::debug!( + target: LOG_TARGET, + query = ?query_id, + ?key, + ?public_addresses, + "register as content provider" + ); + + let provider = ProviderRecord { + key: key.clone(), + provider: self.service.local_peer_id(), + addresses: public_addresses, + expires: Instant::now() + self.provider_ttl, + }; + + self.store.put_provider(provider.clone()); + + self.engine.start_add_provider( + query_id, + provider, + self.routing_table + .closest(Key::new(key), self.replication_factor) + .into(), + ); + } Some(KademliaCommand::GetRecord { key, quorum, query_id }) => { tracing::debug!(target: LOG_TARGET, ?key, "get record from DHT"); diff --git a/src/protocol/libp2p/kademlia/query/mod.rs b/src/protocol/libp2p/kademlia/query/mod.rs index f29af805..34f6e84e 100644 --- a/src/protocol/libp2p/kademlia/query/mod.rs +++ b/src/protocol/libp2p/kademlia/query/mod.rs @@ -25,7 +25,7 @@ use crate::{ find_node::{FindNodeConfig, FindNodeContext}, get_record::{GetRecordConfig, GetRecordContext}, }, - record::{Key as RecordKey, Record}, + record::{Key as RecordKey, ProviderRecord, Record}, types::{KademliaPeer, Key}, PeerRecord, Quorum, }, @@ -45,8 +45,6 @@ mod get_record; /// Logging target for the file. const LOG_TARGET: &str = "litep2p::ipfs::kademlia::query"; -// TODO: store record key instead of the actual record - /// Type representing a query ID. #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq)] pub struct QueryId(pub usize); @@ -56,7 +54,7 @@ pub struct QueryId(pub usize); enum QueryType { /// `FIND_NODE` query. FindNode { - /// Context for the `FIND_NODE` query + /// Context for the `FIND_NODE` query. context: FindNodeContext, }, @@ -65,7 +63,7 @@ enum QueryType { /// Record that needs to be stored. record: Record, - /// Context for the `FIND_NODE` query + /// Context for the `FIND_NODE` query. context: FindNodeContext, }, @@ -83,6 +81,15 @@ enum QueryType { /// Context for the `GET_VALUE` query. context: GetRecordContext, }, + + /// `ADD_PROVIDER` query. + AddProvider { + /// Provider record that need to be stored. + provider: ProviderRecord, + + /// Context for the `FIND_NODE` query. + context: FindNodeContext, + }, } /// Query action. @@ -122,6 +129,15 @@ pub enum QueryAction { peers: Vec, }, + /// Add the provider record to nodes closest to the target key. + AddProviderToFoundNodes { + /// Provider record. + provider: ProviderRecord, + + /// Peers for whom the `ADD_PROVIDER` must be sent to. + peers: Vec, + }, + /// `GET_VALUE` query succeeded. GetRecordQueryDone { /// Query ID. @@ -131,7 +147,6 @@ pub enum QueryAction { records: Vec, }, - // TODO: remove /// Query succeeded. QuerySucceeded { /// ID of the query that succeeded. @@ -308,6 +323,41 @@ impl QueryEngine { query_id } + /// Start `ADD_PROVIDER` query. + pub fn start_add_provider( + &mut self, + query_id: QueryId, + provider: ProviderRecord, + candidates: VecDeque, + ) -> QueryId { + tracing::debug!( + target: LOG_TARGET, + ?query_id, + ?provider, + num_peers = ?candidates.len(), + "start `ADD_PROVIDER` query", + ); + + let target = Key::new(provider.key.clone()); + let config = FindNodeConfig { + local_peer_id: self.local_peer_id, + replication_factor: self.replication_factor, + parallelism_factor: self.parallelism_factor, + query: query_id, + target, + }; + + self.queries.insert( + query_id, + QueryType::AddProvider { + provider, + context: FindNodeContext::new(config, candidates), + }, + ); + + query_id + } + /// Register response failure from a queried peer. pub fn register_response_failure(&mut self, query: QueryId, peer: PeerId) { tracing::trace!(target: LOG_TARGET, ?query, ?peer, "register response failure"); @@ -328,6 +378,9 @@ impl QueryEngine { Some(QueryType::GetRecord { context }) => { context.register_response_failure(peer); } + Some(QueryType::AddProvider { context, .. }) => { + context.register_response_failure(peer); + } } } @@ -363,6 +416,12 @@ impl QueryEngine { } _ => unreachable!(), }, + Some(QueryType::AddProvider { context, .. }) => match message { + KademliaMessage::FindNode { peers, .. } => { + context.register_response(peer, peers); + } + _ => unreachable!(), + }, } } @@ -379,6 +438,7 @@ impl QueryEngine { Some(QueryType::PutRecord { context, .. }) => context.next_peer_action(peer), Some(QueryType::PutRecordToPeers { context, .. }) => context.next_peer_action(peer), Some(QueryType::GetRecord { context }) => context.next_peer_action(peer), + Some(QueryType::AddProvider { context, .. }) => context.next_peer_action(peer), } } @@ -403,6 +463,10 @@ impl QueryEngine { query_id: context.config.query, records: context.found_records(), }, + QueryType::AddProvider { provider, context } => QueryAction::AddProviderToFoundNodes { + provider, + peers: context.responses.into_values().collect::>(), + }, } } @@ -422,6 +486,7 @@ impl QueryEngine { QueryType::PutRecord { context, .. } => context.next_action(), QueryType::PutRecordToPeers { context, .. } => context.next_action(), QueryType::GetRecord { context } => context.next_action(), + QueryType::AddProvider { context, .. } => context.next_action(), }; match action {