Skip to content

Commit

Permalink
Move query ID generation from KademliaHandle to Kademlia
Browse files Browse the repository at this point in the history
  • Loading branch information
dmitry-markin committed Aug 29, 2024
1 parent feb493d commit 982c73d
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 61 deletions.
113 changes: 57 additions & 56 deletions src/protocol/libp2p/kademlia/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ use crate::{

use futures::Stream;
use multiaddr::Multiaddr;
use tokio::sync::mpsc::{Receiver, Sender};
use tokio::sync::{
mpsc::{Receiver, Sender},
oneshot,
};

use std::{
num::NonZeroUsize,
Expand Down Expand Up @@ -88,17 +91,17 @@ pub(crate) enum KademliaCommand {
/// Peer ID.
peer: PeerId,

/// Query ID for the query.
query_id: QueryId,
/// Query ID callback.
query_id_tx: oneshot::Sender<QueryId>,
},

/// Store record to DHT.
PutRecord {
/// Record.
record: Record,

/// Query ID for the query.
query_id: QueryId,
/// Query ID callback.
query_id_tx: oneshot::Sender<QueryId>,
},

/// Store record to DHT to the given peers.
Expand All @@ -108,8 +111,8 @@ pub(crate) enum KademliaCommand {
/// Record.
record: Record,

/// Query ID for the query.
query_id: QueryId,
/// Query ID callback.
query_id_tx: oneshot::Sender<QueryId>,

/// Use the following peers for the put request.
peers: Vec<PeerId>,
Expand All @@ -126,8 +129,8 @@ pub(crate) enum KademliaCommand {
/// [`Quorum`] for the query.
quorum: Quorum,

/// Query ID for the query.
query_id: QueryId,
/// Query ID callback.
query_id_tx: oneshot::Sender<QueryId>,
},

/// Register as a content provider for `key`.
Expand All @@ -139,7 +142,7 @@ pub(crate) enum KademliaCommand {
public_addresses: Vec<Multiaddr>,

/// Query ID for the query.
query_id: QueryId,
query_id_tx: oneshot::Sender<QueryId>,
},

/// Store record locally.
Expand Down Expand Up @@ -232,27 +235,12 @@ pub struct KademliaHandle {

/// RX channel for receiving events from `Kademlia`.
event_rx: Receiver<KademliaEvent>,

/// Next query ID.
next_query_id: usize,
}

impl KademliaHandle {
/// Create new [`KademliaHandle`].
pub(super) fn new(cmd_tx: Sender<KademliaCommand>, event_rx: Receiver<KademliaEvent>) -> Self {
Self {
cmd_tx,
event_rx,
next_query_id: 0usize,
}
}

/// Allocate next query ID.
fn next_query_id(&mut self) -> QueryId {
let query_id = self.next_query_id;
self.next_query_id += 1;

QueryId(query_id)
Self { cmd_tx, event_rx }
}

/// Add known peer.
Expand All @@ -261,19 +249,32 @@ impl KademliaHandle {
}

/// Send `FIND_NODE` query to known peers.
pub async fn find_node(&mut self, peer: PeerId) -> QueryId {
let query_id = self.next_query_id();
let _ = self.cmd_tx.send(KademliaCommand::FindNode { peer, query_id }).await;

query_id
///
/// Returns [`Err`] only if [`super::Kademlia`] is terminating.
pub async fn find_node(&mut self, peer: PeerId) -> Result<QueryId, ()> {
let (query_id_tx, query_id_rx) = oneshot::channel();
self.cmd_tx
.send(KademliaCommand::FindNode { peer, query_id_tx })
.await
.map_err(|_| ())?;

query_id_rx.await.map_err(|_| ())
}

/// Store record to DHT.
pub async fn put_record(&mut self, record: Record) -> QueryId {
let query_id = self.next_query_id();
let _ = self.cmd_tx.send(KademliaCommand::PutRecord { record, query_id }).await;
///
/// Returns [`Err`] only if [`super::Kademlia`] is terminating.
pub async fn put_record(&mut self, record: Record) -> Result<QueryId, ()> {
let (query_id_tx, query_id_rx) = oneshot::channel();
self.cmd_tx
.send(KademliaCommand::PutRecord {
record,
query_id_tx,
})
.await
.map_err(|_| ())?;

query_id
query_id_rx.await.map_err(|_| ())
}

/// Store record to DHT to the given peers.
Expand All @@ -282,34 +283,34 @@ impl KademliaHandle {
record: Record,
peers: Vec<PeerId>,
update_local_store: bool,
) -> QueryId {
let query_id = self.next_query_id();
let _ = self
.cmd_tx
) -> Result<QueryId, ()> {
let (query_id_tx, query_id_rx) = oneshot::channel();
self.cmd_tx
.send(KademliaCommand::PutRecordToPeers {
record,
query_id,
query_id_tx,
peers,
update_local_store,
})
.await;
.await
.map_err(|_| ())?;

query_id
query_id_rx.await.map_err(|_| ())
}

/// Get record from DHT.
pub async fn get_record(&mut self, key: RecordKey, quorum: Quorum) -> QueryId {
let query_id = self.next_query_id();
let _ = self
.cmd_tx
pub async fn get_record(&mut self, key: RecordKey, quorum: Quorum) -> Result<QueryId, ()> {
let (query_id_tx, query_id_rx) = oneshot::channel();
self.cmd_tx
.send(KademliaCommand::GetRecord {
key,
quorum,
query_id,
query_id_tx,
})
.await;
.await
.map_err(|_| ())?;

query_id
query_id_rx.await.map_err(|_| ())
}

/// Register as a content provider on the DHT.
Expand All @@ -319,18 +320,18 @@ impl KademliaHandle {
&mut self,
key: RecordKey,
public_addresses: Vec<Multiaddr>,
) -> QueryId {
let query_id = self.next_query_id();
let _ = self
.cmd_tx
) -> Result<QueryId, ()> {
let (query_id_tx, query_id_rx) = oneshot::channel();
self.cmd_tx
.send(KademliaCommand::StartProviding {
key,
public_addresses,
query_id,
query_id_tx,
})
.await;
.await
.map_err(|_| ())?;

query_id
query_id_rx.await.map_err(|_| ())
}

/// Store the record in the local store. Used in combination with
Expand Down
37 changes: 32 additions & 5 deletions src/protocol/libp2p/kademlia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ pub(crate) struct Kademlia {

/// Query executor.
executor: QueryExecutor,

/// Next query ID.
next_query_id: usize,
}

impl Kademlia {
Expand Down Expand Up @@ -207,6 +210,7 @@ impl Kademlia {
provider_ttl: config.provider_ttl,
replication_factor: config.replication_factor,
engine: QueryEngine::new(local_peer_id, config.replication_factor, PARALLELISM_FACTOR),
next_query_id: 0usize,
}
}

Expand Down Expand Up @@ -938,7 +942,10 @@ impl Kademlia {
},
command = self.cmd_rx.recv() => {
match command {
Some(KademliaCommand::FindNode { peer, query_id }) => {
Some(KademliaCommand::FindNode { peer, query_id_tx }) => {
let query_id = self.next_query_id();
let _ = query_id_tx.send(query_id);

tracing::debug!(
target: LOG_TARGET,
?peer,
Expand All @@ -954,7 +961,10 @@ impl Kademlia {
.into()
);
}
Some(KademliaCommand::PutRecord { mut record, query_id }) => {
Some(KademliaCommand::PutRecord { mut record, query_id_tx }) => {
let query_id = self.next_query_id();
let _ = query_id_tx.send(query_id);

tracing::debug!(
target: LOG_TARGET,
query = ?query_id,
Expand Down Expand Up @@ -982,10 +992,13 @@ impl Kademlia {
}
Some(KademliaCommand::PutRecordToPeers {
mut record,
query_id,
query_id_tx,
peers,
update_local_store,
}) => {
let query_id = self.next_query_id();
let _ = query_id_tx.send(query_id);

tracing::debug!(
target: LOG_TARGET,
query = ?query_id,
Expand Down Expand Up @@ -1025,8 +1038,11 @@ impl Kademlia {
Some(KademliaCommand::StartProviding {
key,
public_addresses,
query_id
query_id_tx
}) => {
let query_id = self.next_query_id();
let _ = query_id_tx.send(query_id);

tracing::debug!(
target: LOG_TARGET,
query = ?query_id,
Expand All @@ -1052,7 +1068,10 @@ impl Kademlia {
.into(),
);
}
Some(KademliaCommand::GetRecord { key, quorum, query_id }) => {
Some(KademliaCommand::GetRecord { key, quorum, query_id_tx }) => {
let query_id = self.next_query_id();
let _ = query_id_tx.send(query_id);

tracing::debug!(target: LOG_TARGET, ?key, "get record from DHT");

match (self.store.get(&key), quorum) {
Expand Down Expand Up @@ -1135,6 +1154,14 @@ impl Kademlia {
}
}
}

/// Allocate next query ID.
fn next_query_id(&mut self) -> QueryId {
let query_id = self.next_query_id;
self.next_query_id += 1;

QueryId(query_id)
}
}

#[cfg(test)]
Expand Down

0 comments on commit 982c73d

Please sign in to comment.