diff --git a/prdoc/pr_6711.prdoc b/prdoc/pr_6711.prdoc new file mode 100644 index 000000000000..ec09035e1356 --- /dev/null +++ b/prdoc/pr_6711.prdoc @@ -0,0 +1,13 @@ +title: Expose DHT content providers API from `sc-network` +doc: +- audience: Node Dev + description: |- + Expose the Kademlia content providers API for the use by `sc-network` client code: + 1. Extend the `NetworkDHTProvider` trait with functions to start/stop providing content and query the DHT for the list of content providers for a given key. + 2. Extend the `DhtEvent` enum with events reporting the found providers or query failures. + 3. Implement the above for libp2p & litep2p network backends. +crates: +- name: sc-network + bump: major +- name: sc-authority-discovery + bump: major diff --git a/substrate/client/authority-discovery/src/worker.rs b/substrate/client/authority-discovery/src/worker.rs index ba82910efcdf..6630b7157d96 100644 --- a/substrate/client/authority-discovery/src/worker.rs +++ b/substrate/client/authority-discovery/src/worker.rs @@ -677,6 +677,9 @@ where metrics.dht_event_received.with_label_values(&["put_record_req"]).inc(); } }, + DhtEvent::StartProvidingFailed(..) => {}, + DhtEvent::ProvidersFound(..) => {}, + DhtEvent::ProvidersNotFound(..) => {}, } } diff --git a/substrate/client/authority-discovery/src/worker/tests.rs b/substrate/client/authority-discovery/src/worker/tests.rs index 6c3a3b56b1cb..c14771585655 100644 --- a/substrate/client/authority-discovery/src/worker/tests.rs +++ b/substrate/client/authority-discovery/src/worker/tests.rs @@ -231,6 +231,18 @@ impl NetworkDHTProvider for TestNetwork { .unbounded_send(TestNetworkEvent::StoreRecordCalled) .unwrap(); } + + fn start_providing(&self, _: KademliaKey) { + unimplemented!() + } + + fn stop_providing(&self, _: KademliaKey) { + unimplemented!() + } + + fn get_providers(&self, _: KademliaKey) { + unimplemented!() + } } impl NetworkStateInfo for TestNetwork { diff --git a/substrate/client/network/src/behaviour.rs b/substrate/client/network/src/behaviour.rs index dbb72381b660..cee80b6c1e86 100644 --- a/substrate/client/network/src/behaviour.rs +++ b/substrate/client/network/src/behaviour.rs @@ -310,6 +310,22 @@ impl Behaviour { ) { self.discovery.store_record(record_key, record_value, publisher, expires); } + + /// Start providing `key` on the DHT. + pub fn start_providing(&mut self, key: RecordKey) { + self.discovery.start_providing(key) + } + + /// Stop providing `key` on the DHT. + pub fn stop_providing(&mut self, key: &RecordKey) { + self.discovery.stop_providing(key) + } + + /// Start searching for providers on the DHT. Will later produce either a `ProvidersFound` + /// or `ProvidersNotFound` event. + pub fn get_providers(&mut self, key: RecordKey) { + self.discovery.get_providers(key) + } } impl From for BehaviourOut { @@ -387,6 +403,17 @@ impl From for BehaviourOut { ), DiscoveryOut::ValuePutFailed(key, duration) => BehaviourOut::Dht(DhtEvent::ValuePutFailed(key.into()), Some(duration)), + DiscoveryOut::StartProvidingFailed(key) => + BehaviourOut::Dht(DhtEvent::StartProvidingFailed(key.into()), None), + DiscoveryOut::ProvidersFound(key, providers, duration) => BehaviourOut::Dht( + DhtEvent::ProvidersFound( + key.into(), + providers.into_iter().map(Into::into).collect(), + ), + Some(duration), + ), + DiscoveryOut::ProvidersNotFound(key, duration) => + BehaviourOut::Dht(DhtEvent::ProvidersNotFound(key.into()), Some(duration)), DiscoveryOut::RandomKademliaStarted => BehaviourOut::RandomKademliaStarted, } } diff --git a/substrate/client/network/src/discovery.rs b/substrate/client/network/src/discovery.rs index 8080bda9a574..81baa00e201e 100644 --- a/substrate/client/network/src/discovery.rs +++ b/substrate/client/network/src/discovery.rs @@ -58,8 +58,8 @@ use libp2p::{ self, record::store::{MemoryStore, RecordStore}, Behaviour as Kademlia, BucketInserts, Config as KademliaConfig, Event as KademliaEvent, - GetClosestPeersError, GetRecordOk, PeerRecord, QueryId, QueryResult, Quorum, Record, - RecordKey, + GetClosestPeersError, GetProvidersError, GetProvidersOk, GetRecordOk, PeerRecord, QueryId, + QueryResult, Quorum, Record, RecordKey, }, mdns::{self, tokio::Behaviour as TokioMdns}, multiaddr::Protocol, @@ -466,6 +466,31 @@ impl DiscoveryBehaviour { } } } + + /// Register as a content provider on the DHT for `key`. + pub fn start_providing(&mut self, key: RecordKey) { + if let Some(kad) = self.kademlia.as_mut() { + if let Err(e) = kad.start_providing(key.clone()) { + warn!(target: "sub-libp2p", "Libp2p => Failed to start providing {key:?}: {e}."); + self.pending_events.push_back(DiscoveryOut::StartProvidingFailed(key)); + } + } + } + + /// Deregister as a content provider on the DHT for `key`. + pub fn stop_providing(&mut self, key: &RecordKey) { + if let Some(kad) = self.kademlia.as_mut() { + kad.stop_providing(key); + } + } + + /// Get content providers for `key` from the DHT. + pub fn get_providers(&mut self, key: RecordKey) { + if let Some(kad) = self.kademlia.as_mut() { + kad.get_providers(key); + } + } + /// Store a record in the Kademlia record store. pub fn store_record( &mut self, @@ -581,6 +606,15 @@ pub enum DiscoveryOut { /// Returning the corresponding key as well as the request duration. ValuePutFailed(RecordKey, Duration), + /// Starting providing a key failed. + StartProvidingFailed(RecordKey), + + /// The DHT yielded results for the providers request. + ProvidersFound(RecordKey, HashSet, Duration), + + /// Providers for the requested key were not found in the DHT. + ProvidersNotFound(RecordKey, Duration), + /// Started a random Kademlia query. /// /// Only happens if [`DiscoveryConfig::with_dht_random_walk`] has been configured to `true`. @@ -982,6 +1016,56 @@ impl NetworkBehaviour for DiscoveryBehaviour { }; return Poll::Ready(ToSwarm::GenerateEvent(ev)) }, + KademliaEvent::OutboundQueryProgressed { + result: QueryResult::GetProviders(res), + stats, + id, + .. + } => { + let ev = match res { + Ok(GetProvidersOk::FoundProviders { key, providers }) => { + debug!( + target: "sub-libp2p", + "Libp2p => Found providers {:?} for key {:?}, id {:?}, stats {:?}", + providers, + key, + id, + stats, + ); + + DiscoveryOut::ProvidersFound( + key, + providers, + stats.duration().unwrap_or_default(), + ) + }, + Ok(GetProvidersOk::FinishedWithNoAdditionalRecord { + closest_peers: _, + }) => { + debug!( + target: "sub-libp2p", + "Libp2p => Finished with no additional providers {:?}, stats {:?}, took {:?} ms", + id, + stats, + stats.duration().map(|val| val.as_millis()) + ); + + continue + }, + Err(GetProvidersError::Timeout { key, closest_peers: _ }) => { + debug!( + target: "sub-libp2p", + "Libp2p => Failed to get providers for {key:?} due to timeout.", + ); + + DiscoveryOut::ProvidersNotFound( + key, + stats.duration().unwrap_or_default(), + ) + }, + }; + return Poll::Ready(ToSwarm::GenerateEvent(ev)) + }, KademliaEvent::OutboundQueryProgressed { result: QueryResult::PutRecord(res), stats, diff --git a/substrate/client/network/src/event.rs b/substrate/client/network/src/event.rs index 626cf516a7ec..e8ec1eee2545 100644 --- a/substrate/client/network/src/event.rs +++ b/substrate/client/network/src/event.rs @@ -45,8 +45,17 @@ pub enum DhtEvent { /// An error has occurred while putting a record into the DHT. ValuePutFailed(Key), + /// An error occured while registering as a content provider on the DHT. + StartProvidingFailed(Key), + /// The DHT received a put record request. PutRecordRequest(Key, Vec, Option, Option), + + /// The providers for [`Key`] were found. + ProvidersFound(Key, Vec), + + /// The providers for [`Key`] were not found. + ProvidersNotFound(Key), } /// Type for events generated by networking layer. diff --git a/substrate/client/network/src/litep2p/discovery.rs b/substrate/client/network/src/litep2p/discovery.rs index 3a9454e317cc..2bea2e5a80dc 100644 --- a/substrate/client/network/src/litep2p/discovery.rs +++ b/substrate/client/network/src/litep2p/discovery.rs @@ -32,7 +32,7 @@ use litep2p::{ libp2p::{ identify::{Config as IdentifyConfig, IdentifyEvent}, kademlia::{ - Config as KademliaConfig, ConfigBuilder as KademliaConfigBuilder, + Config as KademliaConfig, ConfigBuilder as KademliaConfigBuilder, ContentProvider, IncomingRecordValidationMode, KademliaEvent, KademliaHandle, QueryId, Quorum, Record, RecordKey, RecordsType, }, @@ -144,6 +144,14 @@ pub enum DiscoveryEvent { query_id: QueryId, }, + /// Providers were successfully retrieved. + GetProvidersSuccess { + /// Query ID. + query_id: QueryId, + /// Found providers sorted by distance to provided key. + providers: Vec, + }, + /// Query failed. QueryFailed { /// Query ID. @@ -407,6 +415,21 @@ impl Discovery { .await; } + /// Start providing `key`. + pub async fn start_providing(&mut self, key: KademliaKey) { + self.kademlia_handle.start_providing(key.into()).await; + } + + /// Stop providing `key`. + pub async fn stop_providing(&mut self, key: KademliaKey) { + self.kademlia_handle.stop_providing(key.into()).await; + } + + /// Get providers for `key`. + pub async fn get_providers(&mut self, key: KademliaKey) -> QueryId { + self.kademlia_handle.get_providers(key.into()).await + } + /// Check if the observed address is a known address. fn is_known_address(known: &Multiaddr, observed: &Multiaddr) -> bool { let mut known = known.iter(); @@ -581,8 +604,22 @@ impl Stream for Discovery { return Poll::Ready(Some(DiscoveryEvent::IncomingRecord { record })) }, - // Content provider events are ignored for now. - Poll::Ready(Some(KademliaEvent::GetProvidersSuccess { .. })) | + Poll::Ready(Some(KademliaEvent::GetProvidersSuccess { + provided_key, + providers, + query_id, + })) => { + log::trace!( + target: LOG_TARGET, + "`GET_PROVIDERS` for {query_id:?} with {provided_key:?} yielded {providers:?}", + ); + + return Poll::Ready(Some(DiscoveryEvent::GetProvidersSuccess { + query_id, + providers, + })) + }, + // We do not validate incoming providers. Poll::Ready(Some(KademliaEvent::IncomingProvider { .. })) => {}, } diff --git a/substrate/client/network/src/litep2p/mod.rs b/substrate/client/network/src/litep2p/mod.rs index b6d64b34d64a..52b2970525df 100644 --- a/substrate/client/network/src/litep2p/mod.rs +++ b/substrate/client/network/src/litep2p/mod.rs @@ -143,6 +143,17 @@ struct ConnectionContext { num_connections: usize, } +/// Kademlia query we are tracking. +#[derive(Debug)] +enum KadQuery { + /// `GET_VALUE` query for key and when it was initiated. + GetValue(RecordKey, Instant), + /// `PUT_VALUE` query for key and when it was initiated. + PutValue(RecordKey, Instant), + /// `GET_PROVIDERS` query for key and when it was initiated. + GetProviders(RecordKey, Instant), +} + /// Networking backend for `litep2p`. pub struct Litep2pNetworkBackend { /// Main `litep2p` object. @@ -157,11 +168,8 @@ pub struct Litep2pNetworkBackend { /// `Peerset` handles to notification protocols. peerset_handles: HashMap, - /// Pending `GET_VALUE` queries. - pending_get_values: HashMap, - - /// Pending `PUT_VALUE` queries. - pending_put_values: HashMap, + /// Pending Kademlia queries. + pending_queries: HashMap, /// Discovery. discovery: Discovery, @@ -615,8 +623,7 @@ impl NetworkBackend for Litep2pNetworkBac peerset_handles: notif_protocols, num_connected, discovery, - pending_put_values: HashMap::new(), - pending_get_values: HashMap::new(), + pending_queries: HashMap::new(), peerstore_handle: peer_store_handle, block_announce_protocol, event_streams: out_events::OutChannels::new(None)?, @@ -704,21 +711,30 @@ impl NetworkBackend for Litep2pNetworkBac Some(command) => match command { NetworkServiceCommand::GetValue{ key } => { let query_id = self.discovery.get_value(key.clone()).await; - self.pending_get_values.insert(query_id, (key, Instant::now())); + self.pending_queries.insert(query_id, KadQuery::GetValue(key, Instant::now())); } NetworkServiceCommand::PutValue { key, value } => { let query_id = self.discovery.put_value(key.clone(), value).await; - self.pending_put_values.insert(query_id, (key, Instant::now())); + self.pending_queries.insert(query_id, KadQuery::PutValue(key, Instant::now())); } NetworkServiceCommand::PutValueTo { record, peers, update_local_storage} => { let kademlia_key = record.key.clone(); let query_id = self.discovery.put_value_to_peers(record.into(), peers, update_local_storage).await; - self.pending_put_values.insert(query_id, (kademlia_key, Instant::now())); + self.pending_queries.insert(query_id, KadQuery::PutValue(kademlia_key, Instant::now())); } - NetworkServiceCommand::StoreRecord { key, value, publisher, expires } => { self.discovery.store_record(key, value, publisher.map(Into::into), expires).await; } + NetworkServiceCommand::StartProviding { key } => { + self.discovery.start_providing(key).await; + } + NetworkServiceCommand::StopProviding { key } => { + self.discovery.stop_providing(key).await; + } + NetworkServiceCommand::GetProviders { key } => { + let query_id = self.discovery.get_providers(key.clone()).await; + self.pending_queries.insert(query_id, KadQuery::GetProviders(key, Instant::now())); + } NetworkServiceCommand::EventStream { tx } => { self.event_streams.push(tx); } @@ -821,12 +837,8 @@ impl NetworkBackend for Litep2pNetworkBac } } Some(DiscoveryEvent::GetRecordSuccess { query_id, records }) => { - match self.pending_get_values.remove(&query_id) { - None => log::warn!( - target: LOG_TARGET, - "`GET_VALUE` succeeded for a non-existent query", - ), - Some((key, started)) => { + match self.pending_queries.remove(&query_id) { + Some(KadQuery::GetValue(key, started)) => { log::trace!( target: LOG_TARGET, "`GET_VALUE` for {:?} ({query_id:?}) succeeded", @@ -848,16 +860,19 @@ impl NetworkBackend for Litep2pNetworkBac .with_label_values(&["value-get"]) .observe(started.elapsed().as_secs_f64()); } - } + }, + query => { + log::error!( + target: LOG_TARGET, + "Missing/invalid pending query for `GET_VALUE`: {query:?}" + ); + debug_assert!(false); + }, } } Some(DiscoveryEvent::PutRecordSuccess { query_id }) => { - match self.pending_put_values.remove(&query_id) { - None => log::warn!( - target: LOG_TARGET, - "`PUT_VALUE` succeeded for a non-existent query", - ), - Some((key, started)) => { + match self.pending_queries.remove(&query_id) { + Some(KadQuery::PutValue(key, started)) => { log::trace!( target: LOG_TARGET, "`PUT_VALUE` for {key:?} ({query_id:?}) succeeded", @@ -873,35 +888,50 @@ impl NetworkBackend for Litep2pNetworkBac .with_label_values(&["value-put"]) .observe(started.elapsed().as_secs_f64()); } + }, + query => { + log::error!( + target: LOG_TARGET, + "Missing/invalid pending query for `PUT_VALUE`: {query:?}" + ); + debug_assert!(false); } } } - Some(DiscoveryEvent::QueryFailed { query_id }) => { - match self.pending_get_values.remove(&query_id) { - None => match self.pending_put_values.remove(&query_id) { - None => log::warn!( + Some(DiscoveryEvent::GetProvidersSuccess { query_id, providers }) => { + match self.pending_queries.remove(&query_id) { + Some(KadQuery::GetProviders(key, started)) => { + log::trace!( target: LOG_TARGET, - "non-existent query failed ({query_id:?})", - ), - Some((key, started)) => { - log::debug!( - target: LOG_TARGET, - "`PUT_VALUE` ({query_id:?}) failed for key {key:?}", - ); + "`GET_PROVIDERS` for {key:?} ({query_id:?}) succeeded", + ); - self.event_streams.send(Event::Dht( - DhtEvent::ValuePutFailed(key) - )); + self.event_streams.send(Event::Dht( + DhtEvent::ProvidersFound( + key.into(), + providers.into_iter().map(|p| p.peer.into()).collect() + ) + )); - if let Some(ref metrics) = self.metrics { - metrics - .kademlia_query_duration - .with_label_values(&["value-put-failed"]) - .observe(started.elapsed().as_secs_f64()); - } + if let Some(ref metrics) = self.metrics { + metrics + .kademlia_query_duration + .with_label_values(&["providers-get"]) + .observe(started.elapsed().as_secs_f64()); } + }, + query => { + log::error!( + target: LOG_TARGET, + "Missing/invalid pending query for `GET_PROVIDERS`: {query:?}" + ); + debug_assert!(false); } - Some((key, started)) => { + } + } + Some(DiscoveryEvent::QueryFailed { query_id }) => { + match self.pending_queries.remove(&query_id) { + Some(KadQuery::GetValue(key, started)) => { log::debug!( target: LOG_TARGET, "`GET_VALUE` ({query_id:?}) failed for key {key:?}", @@ -917,6 +947,46 @@ impl NetworkBackend for Litep2pNetworkBac .with_label_values(&["value-get-failed"]) .observe(started.elapsed().as_secs_f64()); } + }, + Some(KadQuery::PutValue(key, started)) => { + log::debug!( + target: LOG_TARGET, + "`PUT_VALUE` ({query_id:?}) failed for key {key:?}", + ); + + self.event_streams.send(Event::Dht( + DhtEvent::ValuePutFailed(key) + )); + + if let Some(ref metrics) = self.metrics { + metrics + .kademlia_query_duration + .with_label_values(&["value-put-failed"]) + .observe(started.elapsed().as_secs_f64()); + } + }, + Some(KadQuery::GetProviders(key, started)) => { + log::debug!( + target: LOG_TARGET, + "`GET_PROVIDERS` ({query_id:?}) failed for key {key:?}" + ); + + self.event_streams.send(Event::Dht( + DhtEvent::ProvidersNotFound(key) + )); + + if let Some(ref metrics) = self.metrics { + metrics + .kademlia_query_duration + .with_label_values(&["providers-get-failed"]) + .observe(started.elapsed().as_secs_f64()); + } + }, + None => { + log::warn!( + target: LOG_TARGET, + "non-existent query failed ({query_id:?})", + ); } } } diff --git a/substrate/client/network/src/litep2p/service.rs b/substrate/client/network/src/litep2p/service.rs index fa1d47e5a1b7..d270e90efdf5 100644 --- a/substrate/client/network/src/litep2p/service.rs +++ b/substrate/client/network/src/litep2p/service.rs @@ -104,6 +104,15 @@ pub enum NetworkServiceCommand { expires: Option, }, + /// Start providing `key`. + StartProviding { key: KademliaKey }, + + /// Stop providing `key`. + StopProviding { key: KademliaKey }, + + /// Get providers for `key`. + GetProviders { key: KademliaKey }, + /// Query network status. Status { /// `oneshot::Sender` for sending the status. @@ -296,6 +305,18 @@ impl NetworkDHTProvider for Litep2pNetworkService { expires, }); } + + fn start_providing(&self, key: KademliaKey) { + let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::StartProviding { key }); + } + + fn stop_providing(&self, key: KademliaKey) { + let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::StopProviding { key }); + } + + fn get_providers(&self, key: KademliaKey) { + let _ = self.cmd_tx.unbounded_send(NetworkServiceCommand::GetProviders { key }); + } } #[async_trait::async_trait] diff --git a/substrate/client/network/src/service.rs b/substrate/client/network/src/service.rs index 5e5e4ee28589..803b81129139 100644 --- a/substrate/client/network/src/service.rs +++ b/substrate/client/network/src/service.rs @@ -973,6 +973,18 @@ where expires, )); } + + fn start_providing(&self, key: KademliaKey) { + let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::StartProviding(key)); + } + + fn stop_providing(&self, key: KademliaKey) { + let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::StopProviding(key)); + } + + fn get_providers(&self, key: KademliaKey) { + let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::GetProviders(key)); + } } #[async_trait::async_trait] @@ -1333,6 +1345,9 @@ enum ServiceToWorkerMsg { update_local_storage: bool, }, StoreRecord(KademliaKey, Vec, Option, Option), + StartProviding(KademliaKey), + StopProviding(KademliaKey), + GetProviders(KademliaKey), AddKnownAddress(PeerId, Multiaddr), EventStream(out_events::Sender), Request { @@ -1466,6 +1481,12 @@ where .network_service .behaviour_mut() .store_record(key.into(), value, publisher, expires), + ServiceToWorkerMsg::StartProviding(key) => + self.network_service.behaviour_mut().start_providing(key.into()), + ServiceToWorkerMsg::StopProviding(key) => + self.network_service.behaviour_mut().stop_providing(&key.into()), + ServiceToWorkerMsg::GetProviders(key) => + self.network_service.behaviour_mut().get_providers(key.into()), ServiceToWorkerMsg::AddKnownAddress(peer_id, addr) => self.network_service.behaviour_mut().add_known_address(peer_id, addr), ServiceToWorkerMsg::EventStream(sender) => self.event_streams.push(sender), @@ -1678,6 +1699,9 @@ where DhtEvent::ValuePut(_) => "value-put", DhtEvent::ValuePutFailed(_) => "value-put-failed", DhtEvent::PutRecordRequest(_, _, _, _) => "put-record-request", + DhtEvent::StartProvidingFailed(_) => "start-providing-failed", + DhtEvent::ProvidersFound(_, _) => "providers-found", + DhtEvent::ProvidersNotFound(_) => "providers-not-found", }; metrics .kademlia_query_duration diff --git a/substrate/client/network/src/service/traits.rs b/substrate/client/network/src/service/traits.rs index f5dd2995acb1..acfed9ea894c 100644 --- a/substrate/client/network/src/service/traits.rs +++ b/substrate/client/network/src/service/traits.rs @@ -234,6 +234,15 @@ pub trait NetworkDHTProvider { publisher: Option, expires: Option, ); + + /// Register this node as a provider for `key` on the DHT. + fn start_providing(&self, key: KademliaKey); + + /// Deregister this node as a provider for `key` on the DHT. + fn stop_providing(&self, key: KademliaKey); + + /// Start getting the list of providers for `key` on the DHT. + fn get_providers(&self, key: KademliaKey); } impl NetworkDHTProvider for Arc @@ -262,6 +271,18 @@ where ) { T::store_record(self, key, value, publisher, expires) } + + fn start_providing(&self, key: KademliaKey) { + T::start_providing(self, key) + } + + fn stop_providing(&self, key: KademliaKey) { + T::stop_providing(self, key) + } + + fn get_providers(&self, key: KademliaKey) { + T::get_providers(self, key) + } } /// Provides an ability to set a fork sync request for a particular block.