Skip to content

Commit

Permalink
Expose DHT content providers API from sc-network (#6711)
Browse files Browse the repository at this point in the history
Expose the Kademlia content providers API for the use by `sc-network`
client code:
1. Extend the `NetworkDHTProvider` trait with functions to start/stop
providing content and query the DHT for the list of content providers
for a given key.
2. Extend the `DhtEvent` enum with events reporting the found providers
or query failures.
3. Implement the above for libp2p & litep2p network backends.

---------

Co-authored-by: GitHub Action <[email protected]>
Co-authored-by: Alexandru Vasile <[email protected]>
  • Loading branch information
3 people authored Dec 13, 2024
1 parent e1add3e commit 4b054c6
Show file tree
Hide file tree
Showing 11 changed files with 371 additions and 50 deletions.
13 changes: 13 additions & 0 deletions prdoc/pr_6711.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
title: Expose DHT content providers API from `sc-network`
doc:
- audience: Node Dev
description: |-
Expose the Kademlia content providers API for the use by `sc-network` client code:
1. Extend the `NetworkDHTProvider` trait with functions to start/stop providing content and query the DHT for the list of content providers for a given key.
2. Extend the `DhtEvent` enum with events reporting the found providers or query failures.
3. Implement the above for libp2p & litep2p network backends.
crates:
- name: sc-network
bump: major
- name: sc-authority-discovery
bump: major
3 changes: 3 additions & 0 deletions substrate/client/authority-discovery/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,9 @@ where
metrics.dht_event_received.with_label_values(&["put_record_req"]).inc();
}
},
DhtEvent::StartProvidingFailed(..) => {},
DhtEvent::ProvidersFound(..) => {},
DhtEvent::ProvidersNotFound(..) => {},
}
}

Expand Down
12 changes: 12 additions & 0 deletions substrate/client/authority-discovery/src/worker/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,18 @@ impl NetworkDHTProvider for TestNetwork {
.unbounded_send(TestNetworkEvent::StoreRecordCalled)
.unwrap();
}

fn start_providing(&self, _: KademliaKey) {
unimplemented!()
}

fn stop_providing(&self, _: KademliaKey) {
unimplemented!()
}

fn get_providers(&self, _: KademliaKey) {
unimplemented!()
}
}

impl NetworkStateInfo for TestNetwork {
Expand Down
27 changes: 27 additions & 0 deletions substrate/client/network/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,22 @@ impl<B: BlockT> Behaviour<B> {
) {
self.discovery.store_record(record_key, record_value, publisher, expires);
}

/// Start providing `key` on the DHT.
pub fn start_providing(&mut self, key: RecordKey) {
self.discovery.start_providing(key)
}

/// Stop providing `key` on the DHT.
pub fn stop_providing(&mut self, key: &RecordKey) {
self.discovery.stop_providing(key)
}

/// Start searching for providers on the DHT. Will later produce either a `ProvidersFound`
/// or `ProvidersNotFound` event.
pub fn get_providers(&mut self, key: RecordKey) {
self.discovery.get_providers(key)
}
}

impl From<CustomMessageOutcome> for BehaviourOut {
Expand Down Expand Up @@ -387,6 +403,17 @@ impl From<DiscoveryOut> for BehaviourOut {
),
DiscoveryOut::ValuePutFailed(key, duration) =>
BehaviourOut::Dht(DhtEvent::ValuePutFailed(key.into()), Some(duration)),
DiscoveryOut::StartProvidingFailed(key) =>
BehaviourOut::Dht(DhtEvent::StartProvidingFailed(key.into()), None),
DiscoveryOut::ProvidersFound(key, providers, duration) => BehaviourOut::Dht(
DhtEvent::ProvidersFound(
key.into(),
providers.into_iter().map(Into::into).collect(),
),
Some(duration),
),
DiscoveryOut::ProvidersNotFound(key, duration) =>
BehaviourOut::Dht(DhtEvent::ProvidersNotFound(key.into()), Some(duration)),
DiscoveryOut::RandomKademliaStarted => BehaviourOut::RandomKademliaStarted,
}
}
Expand Down
88 changes: 86 additions & 2 deletions substrate/client/network/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ use libp2p::{
self,
record::store::{MemoryStore, RecordStore},
Behaviour as Kademlia, BucketInserts, Config as KademliaConfig, Event as KademliaEvent,
GetClosestPeersError, GetRecordOk, PeerRecord, QueryId, QueryResult, Quorum, Record,
RecordKey,
GetClosestPeersError, GetProvidersError, GetProvidersOk, GetRecordOk, PeerRecord, QueryId,
QueryResult, Quorum, Record, RecordKey,
},
mdns::{self, tokio::Behaviour as TokioMdns},
multiaddr::Protocol,
Expand Down Expand Up @@ -466,6 +466,31 @@ impl DiscoveryBehaviour {
}
}
}

/// Register as a content provider on the DHT for `key`.
pub fn start_providing(&mut self, key: RecordKey) {
if let Some(kad) = self.kademlia.as_mut() {
if let Err(e) = kad.start_providing(key.clone()) {
warn!(target: "sub-libp2p", "Libp2p => Failed to start providing {key:?}: {e}.");
self.pending_events.push_back(DiscoveryOut::StartProvidingFailed(key));
}
}
}

/// Deregister as a content provider on the DHT for `key`.
pub fn stop_providing(&mut self, key: &RecordKey) {
if let Some(kad) = self.kademlia.as_mut() {
kad.stop_providing(key);
}
}

/// Get content providers for `key` from the DHT.
pub fn get_providers(&mut self, key: RecordKey) {
if let Some(kad) = self.kademlia.as_mut() {
kad.get_providers(key);
}
}

/// Store a record in the Kademlia record store.
pub fn store_record(
&mut self,
Expand Down Expand Up @@ -581,6 +606,15 @@ pub enum DiscoveryOut {
/// Returning the corresponding key as well as the request duration.
ValuePutFailed(RecordKey, Duration),

/// Starting providing a key failed.
StartProvidingFailed(RecordKey),

/// The DHT yielded results for the providers request.
ProvidersFound(RecordKey, HashSet<PeerId>, Duration),

/// Providers for the requested key were not found in the DHT.
ProvidersNotFound(RecordKey, Duration),

/// Started a random Kademlia query.
///
/// Only happens if [`DiscoveryConfig::with_dht_random_walk`] has been configured to `true`.
Expand Down Expand Up @@ -982,6 +1016,56 @@ impl NetworkBehaviour for DiscoveryBehaviour {
};
return Poll::Ready(ToSwarm::GenerateEvent(ev))
},
KademliaEvent::OutboundQueryProgressed {
result: QueryResult::GetProviders(res),
stats,
id,
..
} => {
let ev = match res {
Ok(GetProvidersOk::FoundProviders { key, providers }) => {
debug!(
target: "sub-libp2p",
"Libp2p => Found providers {:?} for key {:?}, id {:?}, stats {:?}",
providers,
key,
id,
stats,
);

DiscoveryOut::ProvidersFound(
key,
providers,
stats.duration().unwrap_or_default(),
)
},
Ok(GetProvidersOk::FinishedWithNoAdditionalRecord {
closest_peers: _,
}) => {
debug!(
target: "sub-libp2p",
"Libp2p => Finished with no additional providers {:?}, stats {:?}, took {:?} ms",
id,
stats,
stats.duration().map(|val| val.as_millis())
);

continue
},
Err(GetProvidersError::Timeout { key, closest_peers: _ }) => {
debug!(
target: "sub-libp2p",
"Libp2p => Failed to get providers for {key:?} due to timeout.",
);

DiscoveryOut::ProvidersNotFound(
key,
stats.duration().unwrap_or_default(),
)
},
};
return Poll::Ready(ToSwarm::GenerateEvent(ev))
},
KademliaEvent::OutboundQueryProgressed {
result: QueryResult::PutRecord(res),
stats,
Expand Down
9 changes: 9 additions & 0 deletions substrate/client/network/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,17 @@ pub enum DhtEvent {
/// An error has occurred while putting a record into the DHT.
ValuePutFailed(Key),

/// An error occured while registering as a content provider on the DHT.
StartProvidingFailed(Key),

/// The DHT received a put record request.
PutRecordRequest(Key, Vec<u8>, Option<sc_network_types::PeerId>, Option<std::time::Instant>),

/// The providers for [`Key`] were found.
ProvidersFound(Key, Vec<PeerId>),

/// The providers for [`Key`] were not found.
ProvidersNotFound(Key),
}

/// Type for events generated by networking layer.
Expand Down
43 changes: 40 additions & 3 deletions substrate/client/network/src/litep2p/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use litep2p::{
libp2p::{
identify::{Config as IdentifyConfig, IdentifyEvent},
kademlia::{
Config as KademliaConfig, ConfigBuilder as KademliaConfigBuilder,
Config as KademliaConfig, ConfigBuilder as KademliaConfigBuilder, ContentProvider,
IncomingRecordValidationMode, KademliaEvent, KademliaHandle, QueryId, Quorum,
Record, RecordKey, RecordsType,
},
Expand Down Expand Up @@ -144,6 +144,14 @@ pub enum DiscoveryEvent {
query_id: QueryId,
},

/// Providers were successfully retrieved.
GetProvidersSuccess {
/// Query ID.
query_id: QueryId,
/// Found providers sorted by distance to provided key.
providers: Vec<ContentProvider>,
},

/// Query failed.
QueryFailed {
/// Query ID.
Expand Down Expand Up @@ -407,6 +415,21 @@ impl Discovery {
.await;
}

/// Start providing `key`.
pub async fn start_providing(&mut self, key: KademliaKey) {
self.kademlia_handle.start_providing(key.into()).await;
}

/// Stop providing `key`.
pub async fn stop_providing(&mut self, key: KademliaKey) {
self.kademlia_handle.stop_providing(key.into()).await;
}

/// Get providers for `key`.
pub async fn get_providers(&mut self, key: KademliaKey) -> QueryId {
self.kademlia_handle.get_providers(key.into()).await
}

/// Check if the observed address is a known address.
fn is_known_address(known: &Multiaddr, observed: &Multiaddr) -> bool {
let mut known = known.iter();
Expand Down Expand Up @@ -581,8 +604,22 @@ impl Stream for Discovery {

return Poll::Ready(Some(DiscoveryEvent::IncomingRecord { record }))
},
// Content provider events are ignored for now.
Poll::Ready(Some(KademliaEvent::GetProvidersSuccess { .. })) |
Poll::Ready(Some(KademliaEvent::GetProvidersSuccess {
provided_key,
providers,
query_id,
})) => {
log::trace!(
target: LOG_TARGET,
"`GET_PROVIDERS` for {query_id:?} with {provided_key:?} yielded {providers:?}",
);

return Poll::Ready(Some(DiscoveryEvent::GetProvidersSuccess {
query_id,
providers,
}))
},
// We do not validate incoming providers.
Poll::Ready(Some(KademliaEvent::IncomingProvider { .. })) => {},
}

Expand Down
Loading

0 comments on commit 4b054c6

Please sign in to comment.