Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expose DHT content providers API from sc-network #6711

Merged
merged 9 commits into from
Dec 13, 2024
13 changes: 13 additions & 0 deletions prdoc/pr_6711.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
title: Expose DHT content providers API from `sc-network`
doc:
- audience: Node Dev
description: |-
Expose the Kademlia content providers API for the use by `sc-network` client code:
1. Extend the `NetworkDHTProvider` trait with functions to start/stop providing content and query the DHT for the list of content providers for a given key.
2. Extend the `DhtEvent` enum with events reporting the found providers or query failures.
3. Implement the above for libp2p & litep2p network backends.
crates:
- name: sc-network
bump: major
- name: sc-authority-discovery
bump: major
3 changes: 3 additions & 0 deletions substrate/client/authority-discovery/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,9 @@ where
metrics.dht_event_received.with_label_values(&["put_record_req"]).inc();
}
},
DhtEvent::StartProvidingFailed(..) => {},
DhtEvent::ProvidersFound(..) => {},
DhtEvent::ProvidersNotFound(..) => {},
}
}

Expand Down
12 changes: 12 additions & 0 deletions substrate/client/authority-discovery/src/worker/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,18 @@ impl NetworkDHTProvider for TestNetwork {
.unbounded_send(TestNetworkEvent::StoreRecordCalled)
.unwrap();
}

fn start_providing(&self, _: KademliaKey) {
unimplemented!()
}

fn stop_providing(&self, _: KademliaKey) {
unimplemented!()
}

fn get_providers(&self, _: KademliaKey) {
unimplemented!()
}
}

impl NetworkStateInfo for TestNetwork {
Expand Down
27 changes: 27 additions & 0 deletions substrate/client/network/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,22 @@ impl<B: BlockT> Behaviour<B> {
) {
self.discovery.store_record(record_key, record_value, publisher, expires);
}

/// Start providing `key` on the DHT.
pub fn start_providing(&mut self, key: RecordKey) {
self.discovery.start_providing(key)
}

/// Stop providing `key` on the DHT.
pub fn stop_providing(&mut self, key: &RecordKey) {
self.discovery.stop_providing(key)
}

/// Start searching for providers on the DHT. Will later produce either a `ProvidersFound`
/// or `ProvidersNotFound` event.
pub fn get_providers(&mut self, key: RecordKey) {
self.discovery.get_providers(key)
}
}

impl From<CustomMessageOutcome> for BehaviourOut {
Expand Down Expand Up @@ -387,6 +403,17 @@ impl From<DiscoveryOut> for BehaviourOut {
),
DiscoveryOut::ValuePutFailed(key, duration) =>
BehaviourOut::Dht(DhtEvent::ValuePutFailed(key.into()), Some(duration)),
DiscoveryOut::StartProvidingFailed(key) =>
BehaviourOut::Dht(DhtEvent::StartProvidingFailed(key.into()), None),
DiscoveryOut::ProvidersFound(key, providers, duration) => BehaviourOut::Dht(
DhtEvent::ProvidersFound(
key.into(),
providers.into_iter().map(Into::into).collect(),
),
Some(duration),
),
DiscoveryOut::ProvidersNotFound(key, duration) =>
BehaviourOut::Dht(DhtEvent::ProvidersNotFound(key.into()), Some(duration)),
DiscoveryOut::RandomKademliaStarted => BehaviourOut::RandomKademliaStarted,
}
}
Expand Down
88 changes: 86 additions & 2 deletions substrate/client/network/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ use libp2p::{
self,
record::store::{MemoryStore, RecordStore},
Behaviour as Kademlia, BucketInserts, Config as KademliaConfig, Event as KademliaEvent,
GetClosestPeersError, GetRecordOk, PeerRecord, QueryId, QueryResult, Quorum, Record,
RecordKey,
GetClosestPeersError, GetProvidersError, GetProvidersOk, GetRecordOk, PeerRecord, QueryId,
QueryResult, Quorum, Record, RecordKey,
},
mdns::{self, tokio::Behaviour as TokioMdns},
multiaddr::Protocol,
Expand Down Expand Up @@ -466,6 +466,31 @@ impl DiscoveryBehaviour {
}
}
}

/// Register as a content provider on the DHT for `key`.
pub fn start_providing(&mut self, key: RecordKey) {
if let Some(kad) = self.kademlia.as_mut() {
if let Err(e) = kad.start_providing(key.clone()) {
warn!(target: "sub-libp2p", "Libp2p => Failed to start providing {key:?}: {e}.");
self.pending_events.push_back(DiscoveryOut::StartProvidingFailed(key));
}
}
}

/// Deregister as a content provider on the DHT for `key`.
pub fn stop_providing(&mut self, key: &RecordKey) {
if let Some(kad) = self.kademlia.as_mut() {
kad.stop_providing(key);
}
}

/// Get content providers for `key` from the DHT.
pub fn get_providers(&mut self, key: RecordKey) {
if let Some(kad) = self.kademlia.as_mut() {
kad.get_providers(key);
}
}

/// Store a record in the Kademlia record store.
pub fn store_record(
&mut self,
Expand Down Expand Up @@ -581,6 +606,15 @@ pub enum DiscoveryOut {
/// Returning the corresponding key as well as the request duration.
ValuePutFailed(RecordKey, Duration),

/// Starting providing a key failed.
StartProvidingFailed(RecordKey),
lexnv marked this conversation as resolved.
Show resolved Hide resolved

/// The DHT yielded results for the providers request.
ProvidersFound(RecordKey, HashSet<PeerId>, Duration),

/// Providers for the requested key were not found in the DHT.
ProvidersNotFound(RecordKey, Duration),

/// Started a random Kademlia query.
///
/// Only happens if [`DiscoveryConfig::with_dht_random_walk`] has been configured to `true`.
Expand Down Expand Up @@ -982,6 +1016,56 @@ impl NetworkBehaviour for DiscoveryBehaviour {
};
return Poll::Ready(ToSwarm::GenerateEvent(ev))
},
KademliaEvent::OutboundQueryProgressed {
result: QueryResult::GetProviders(res),
stats,
id,
..
} => {
let ev = match res {
Ok(GetProvidersOk::FoundProviders { key, providers }) => {
debug!(
target: "sub-libp2p",
"Libp2p => Found providers {:?} for key {:?}, id {:?}, stats {:?}",
providers,
key,
id,
stats,
);

DiscoveryOut::ProvidersFound(
key,
providers,
stats.duration().unwrap_or_default(),
)
},
Ok(GetProvidersOk::FinishedWithNoAdditionalRecord {
closest_peers: _,
}) => {
debug!(
target: "sub-libp2p",
"Libp2p => Finished with no additional providers {:?}, stats {:?}, took {:?} ms",
id,
stats,
stats.duration().map(|val| val.as_millis())
);

continue
},
Err(GetProvidersError::Timeout { key, closest_peers: _ }) => {
debug!(
target: "sub-libp2p",
"Libp2p => Failed to get providers for {key:?} due to timeout.",
);

DiscoveryOut::ProvidersNotFound(
key,
stats.duration().unwrap_or_default(),
)
},
};
return Poll::Ready(ToSwarm::GenerateEvent(ev))
},
KademliaEvent::OutboundQueryProgressed {
result: QueryResult::PutRecord(res),
stats,
Expand Down
9 changes: 9 additions & 0 deletions substrate/client/network/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,17 @@ pub enum DhtEvent {
/// An error has occurred while putting a record into the DHT.
ValuePutFailed(Key),

/// An error occured while registering as a content provider on the DHT.
StartProvidingFailed(Key),

/// The DHT received a put record request.
PutRecordRequest(Key, Vec<u8>, Option<sc_network_types::PeerId>, Option<std::time::Instant>),

/// The providers for [`Key`] were found.
ProvidersFound(Key, Vec<PeerId>),

/// The providers for [`Key`] were not found.
ProvidersNotFound(Key),
}

/// Type for events generated by networking layer.
Expand Down
43 changes: 40 additions & 3 deletions substrate/client/network/src/litep2p/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use litep2p::{
libp2p::{
identify::{Config as IdentifyConfig, IdentifyEvent},
kademlia::{
Config as KademliaConfig, ConfigBuilder as KademliaConfigBuilder,
Config as KademliaConfig, ConfigBuilder as KademliaConfigBuilder, ContentProvider,
IncomingRecordValidationMode, KademliaEvent, KademliaHandle, QueryId, Quorum,
Record, RecordKey, RecordsType,
},
Expand Down Expand Up @@ -144,6 +144,14 @@ pub enum DiscoveryEvent {
query_id: QueryId,
},

/// Providers were successfully retrieved.
GetProvidersSuccess {
/// Query ID.
query_id: QueryId,
/// Found providers sorted by distance to provided key.
providers: Vec<ContentProvider>,
},

/// Query failed.
QueryFailed {
/// Query ID.
Expand Down Expand Up @@ -407,6 +415,21 @@ impl Discovery {
.await;
}

/// Start providing `key`.
pub async fn start_providing(&mut self, key: KademliaKey) {
self.kademlia_handle.start_providing(key.into()).await;
}

/// Stop providing `key`.
pub async fn stop_providing(&mut self, key: KademliaKey) {
self.kademlia_handle.stop_providing(key.into()).await;
}

/// Get providers for `key`.
pub async fn get_providers(&mut self, key: KademliaKey) -> QueryId {
self.kademlia_handle.get_providers(key.into()).await
}

/// Check if the observed address is a known address.
fn is_known_address(known: &Multiaddr, observed: &Multiaddr) -> bool {
let mut known = known.iter();
Expand Down Expand Up @@ -581,8 +604,22 @@ impl Stream for Discovery {

return Poll::Ready(Some(DiscoveryEvent::IncomingRecord { record }))
},
// Content provider events are ignored for now.
Poll::Ready(Some(KademliaEvent::GetProvidersSuccess { .. })) |
Poll::Ready(Some(KademliaEvent::GetProvidersSuccess {
provided_key,
providers,
query_id,
})) => {
log::trace!(
target: LOG_TARGET,
"`GET_PROVIDERS` for {query_id:?} with {provided_key:?} yielded {providers:?}",
);

return Poll::Ready(Some(DiscoveryEvent::GetProvidersSuccess {
query_id,
providers,
}))
},
// We do not validate incoming providers.
Poll::Ready(Some(KademliaEvent::IncomingProvider { .. })) => {},
}

Expand Down
Loading
Loading