Skip to content

Commit

Permalink
kad: Providers part 3: publish provider records (start providing) (#234)
Browse files Browse the repository at this point in the history
Implement `ADD_PROVIDER` network request and execute `FIND_NODE` query
to publish local provider to the target peers.
  • Loading branch information
dmitry-markin authored Sep 30, 2024
1 parent d50ec10 commit 41fa8e2
Show file tree
Hide file tree
Showing 4 changed files with 246 additions and 28 deletions.
36 changes: 35 additions & 1 deletion src/protocol/libp2p/kademlia/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,18 @@ pub(crate) enum KademliaCommand {
query_id: QueryId,
},

/// Register as a content provider for `key`.
StartProviding {
/// Provided key.
key: RecordKey,

/// Our external addresses to publish.
public_addresses: Vec<Multiaddr>,

/// Query ID for the query.
query_id: QueryId,
},

/// Store record locally.
StoreRecord {
// Record.
Expand Down Expand Up @@ -175,7 +187,8 @@ pub enum KademliaEvent {
},

/// `PUT_VALUE` query succeeded.
PutRecordSucess {
// TODO: this is never emitted. Implement + add `AddProviderSuccess`.
PutRecordSuccess {
/// Query ID.
query_id: QueryId,

Expand Down Expand Up @@ -299,6 +312,27 @@ impl KademliaHandle {
query_id
}

/// Register as a content provider on the DHT.
///
/// Register the local peer ID & its `public_addresses` as a provider for a given `key`.
pub async fn start_providing(
&mut self,
key: RecordKey,
public_addresses: Vec<Multiaddr>,
) -> QueryId {
let query_id = self.next_query_id();
let _ = self
.cmd_tx
.send(KademliaCommand::StartProviding {
key,
public_addresses,
query_id,
})
.await;

query_id
}

/// Store the record in the local store. Used in combination with
/// [`IncomingRecordValidationMode::Manual`].
pub async fn store_record(&mut self, record: Record) {
Expand Down
7 changes: 3 additions & 4 deletions src/protocol/libp2p/kademlia/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,7 @@ impl KademliaMessage {
}

/// Create `ADD_PROVIDER` message with `provider`.
#[allow(unused)]
pub fn add_provider(provider: ProviderRecord) -> Vec<u8> {
pub fn add_provider(provider: ProviderRecord) -> Bytes {
let peer = KademliaPeer::new(
provider.provider,
provider.addresses,
Expand All @@ -187,10 +186,10 @@ impl KademliaMessage {
..Default::default()
};

let mut buf = Vec::with_capacity(message.encoded_len());
let mut buf = BytesMut::with_capacity(message.encoded_len());
message.encode(&mut buf).expect("Vec<u8> to provide needed capacity");

buf
buf.freeze()
}

/// Create `GET_PROVIDERS` request for `key`.
Expand Down
154 changes: 137 additions & 17 deletions src/protocol/libp2p/kademlia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,16 @@ mod schema {
}

/// Peer action.
#[derive(Debug)]
#[derive(Debug, Clone)]
enum PeerAction {
/// Send `FIND_NODE` message to peer.
SendFindNode(QueryId),

/// Send `PUT_VALUE` message to peer.
SendPutValue(Bytes),

/// Send `ADD_PROVIDER` message to peer.
SendAddProvider(Bytes),
}

/// Peer context.
Expand Down Expand Up @@ -335,7 +338,12 @@ impl Kademlia {
}
}
Some(PeerAction::SendPutValue(message)) => {
tracing::trace!(target: LOG_TARGET, ?peer, "send `PUT_VALUE` response");
tracing::trace!(target: LOG_TARGET, ?peer, "send `PUT_VALUE` message");

self.executor.send_message(peer, message, substream);
}
Some(PeerAction::SendAddProvider(message)) => {
tracing::trace!(target: LOG_TARGET, ?peer, "send `ADD_PROVIDER` message");

self.executor.send_message(peer, message, substream);
}
Expand Down Expand Up @@ -758,6 +766,35 @@ impl Kademlia {

Ok(())
}
QueryAction::AddProviderToFoundNodes { provider, peers } => {
tracing::trace!(
target: LOG_TARGET,
provided_key = ?provider.key,
num_peers = ?peers.len(),
"add provider record to found peers",
);

let provided_key = provider.key.clone();
let message = KademliaMessage::add_provider(provider);

for peer in peers {
if let Err(error) = self.open_substream_or_dial(
peer.peer,
PeerAction::SendAddProvider(message.clone()),
None,
) {
tracing::debug!(
target: LOG_TARGET,
?peer,
?provided_key,
?error,
"failed to add provider record to peer",
)
}
}

Ok(())
}
QueryAction::GetRecordQueryDone { query_id, records } => {
let _ = self
.event_tx
Expand Down Expand Up @@ -794,7 +831,11 @@ impl Kademlia {
event = self.service.next() => match event {
Some(TransportEvent::ConnectionEstablished { peer, .. }) => {
if let Err(error) = self.on_connection_established(peer) {
tracing::debug!(target: LOG_TARGET, ?error, "failed to handle established connection");
tracing::debug!(
target: LOG_TARGET,
?error,
"failed to handle established connection",
);
}
}
Some(TransportEvent::ConnectionClosed { peer }) => {
Expand All @@ -804,7 +845,10 @@ impl Kademlia {
match direction {
Direction::Inbound => self.on_inbound_substream(peer, substream).await,
Direction::Outbound(substream_id) => {
if let Err(error) = self.on_outbound_substream(peer, substream_id, substream).await {
if let Err(error) = self
.on_outbound_substream(peer, substream_id, substream)
.await
{
tracing::debug!(
target: LOG_TARGET,
?peer,
Expand All @@ -819,22 +863,41 @@ impl Kademlia {
Some(TransportEvent::SubstreamOpenFailure { substream, error }) => {
self.on_substream_open_failure(substream, error).await;
}
Some(TransportEvent::DialFailure { peer, address, .. }) => self.on_dial_failure(peer, address),
Some(TransportEvent::DialFailure { peer, address, .. }) =>
self.on_dial_failure(peer, address),
None => return Err(Error::EssentialTaskClosed),
},
context = self.executor.next() => {
let QueryContext { peer, query_id, result } = context.unwrap();

match result {
QueryResult::SendSuccess { substream } => {
tracing::trace!(target: LOG_TARGET, ?peer, query = ?query_id, "message sent to peer");
tracing::trace!(
target: LOG_TARGET,
?peer,
query = ?query_id,
"message sent to peer",
);
let _ = substream.close().await;
}
QueryResult::ReadSuccess { substream, message } => {
tracing::trace!(target: LOG_TARGET, ?peer, query = ?query_id, "message read from peer");
tracing::trace!(target: LOG_TARGET,
?peer,
query = ?query_id,
"message read from peer",
);

if let Err(error) = self.on_message_received(peer, query_id, message, substream).await {
tracing::debug!(target: LOG_TARGET, ?peer, ?error, "failed to process message");
if let Err(error) = self.on_message_received(
peer,
query_id,
message,
substream
).await {
tracing::debug!(target: LOG_TARGET,
?peer,
?error,
"failed to process message",
);
}
}
QueryResult::SubstreamClosed | QueryResult::Timeout => {
Expand All @@ -853,22 +916,36 @@ impl Kademlia {
command = self.cmd_rx.recv() => {
match command {
Some(KademliaCommand::FindNode { peer, query_id }) => {
tracing::debug!(target: LOG_TARGET, ?peer, query = ?query_id, "starting `FIND_NODE` query");
tracing::debug!(
target: LOG_TARGET,
?peer,
query = ?query_id,
"starting `FIND_NODE` query",
);

self.engine.start_find_node(
query_id,
peer,
self.routing_table.closest(Key::from(peer), self.replication_factor).into()
self.routing_table
.closest(Key::from(peer), self.replication_factor)
.into()
);
}
Some(KademliaCommand::PutRecord { mut record, query_id }) => {
tracing::debug!(target: LOG_TARGET, query = ?query_id, key = ?record.key, "store record to DHT");
tracing::debug!(
target: LOG_TARGET,
query = ?query_id,
key = ?record.key,
"store record to DHT",
);

// For `PUT_VALUE` requests originating locally we are always the publisher.
record.publisher = Some(self.local_key.clone().into_preimage());

// Make sure TTL is set.
record.expires = record.expires.or_else(|| Some(Instant::now() + self.record_ttl));
record.expires = record
.expires
.or_else(|| Some(Instant::now() + self.record_ttl));

let key = Key::new(record.key.clone());

Expand All @@ -880,11 +957,23 @@ impl Kademlia {
self.routing_table.closest(key, self.replication_factor).into(),
);
}
Some(KademliaCommand::PutRecordToPeers { mut record, query_id, peers, update_local_store }) => {
tracing::debug!(target: LOG_TARGET, query = ?query_id, key = ?record.key, "store record to DHT to specified peers");
Some(KademliaCommand::PutRecordToPeers {
mut record,
query_id,
peers,
update_local_store,
}) => {
tracing::debug!(
target: LOG_TARGET,
query = ?query_id,
key = ?record.key,
"store record to DHT to specified peers",
);

// Make sure TTL is set.
record.expires = record.expires.or_else(|| Some(Instant::now() + self.record_ttl));
record.expires = record
.expires
.or_else(|| Some(Instant::now() + self.record_ttl));

if update_local_store {
self.store.put(record.clone());
Expand All @@ -898,7 +987,8 @@ impl Kademlia {

match self.routing_table.entry(Key::from(peer)) {
KBucketEntry::Occupied(entry) => Some(entry.clone()),
KBucketEntry::Vacant(entry) if !entry.addresses.is_empty() => Some(entry.clone()),
KBucketEntry::Vacant(entry) if !entry.addresses.is_empty() =>
Some(entry.clone()),
_ => None,
}
}).collect();
Expand All @@ -909,6 +999,36 @@ impl Kademlia {
peers,
);
}
Some(KademliaCommand::StartProviding {
key,
public_addresses,
query_id
}) => {
tracing::debug!(
target: LOG_TARGET,
query = ?query_id,
?key,
?public_addresses,
"register as content provider"
);

let provider = ProviderRecord {
key: key.clone(),
provider: self.service.local_peer_id(),
addresses: public_addresses,
expires: Instant::now() + self.provider_ttl,
};

self.store.put_provider(provider.clone());

self.engine.start_add_provider(
query_id,
provider,
self.routing_table
.closest(Key::new(key), self.replication_factor)
.into(),
);
}
Some(KademliaCommand::GetRecord { key, quorum, query_id }) => {
tracing::debug!(target: LOG_TARGET, ?key, "get record from DHT");

Expand Down
Loading

0 comments on commit 41fa8e2

Please sign in to comment.