Commit
error details, tuning of params, dep updates
Signed-off-by: Alexandru Vasile <[email protected]>
lexnv committed Dec 17, 2024
1 parent 93ba8df commit 7615cc5
Showing 12 changed files with 595 additions and 237 deletions.
597 changes: 437 additions & 160 deletions Cargo.lock

Large diffs are not rendered by default.

16 changes: 8 additions & 8 deletions Cargo.toml
@@ -27,26 +27,26 @@ parking_lot = "0.12.3"
pin-project = "1.1.0"
prost = "0.12.6"
quinn = { version = "0.9.3", default-features = false, features = ["tls-rustls", "runtime-tokio"], optional = true }
- rand = { version = "0.8.0", features = ["getrandom"] }
+ rand = { version = "0.8.5", features = ["getrandom"] }
rcgen = "0.10.0"
ring = "0.16.20"
serde = "1.0.158"
sha2 = "0.10.8"
- simple-dns = "0.7.0"
+ simple-dns = "0.9.1"
smallvec = "1.13.2"
snow = { version = "0.9.3", features = ["ring-resolver"], default-features = false }
socket2 = { version = "0.5.7", features = ["all"] }
str0m = { version = "0.6.2", optional = true }
- thiserror = "1.0.61"
- tokio-stream = "0.1.12"
- tokio-tungstenite = { version = "0.20.0", features = ["rustls-tls-native-roots"], optional = true }
- tokio-util = { version = "0.7.11", features = ["compat", "io", "codec"] }
- tokio = { version = "1.26.0", features = ["rt", "net", "io-util", "time", "macros", "sync", "parking_lot"] }
+ thiserror = "2.0.7"
+ tokio-stream = "0.1.17"
+ tokio-tungstenite = { version = "0.25.0", features = ["rustls-tls-native-roots", "url"], optional = true }
+ tokio-util = { version = "0.7.13", features = ["compat", "io", "codec"] }
+ tokio = { version = "1.42.0", features = ["rt", "net", "io-util", "time", "macros", "sync", "parking_lot"] }
tracing = { version = "0.1.40", features = ["log"] }
hickory-resolver = "0.24.2"
uint = "0.9.5"
unsigned-varint = { version = "0.8.0", features = ["codec"] }
- url = "2.4.0"
+ url = "2.5.4"
webpki = { version = "0.22.4", optional = true }
x25519-dalek = "2.0.0"
x509-parser = "0.16.0"
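Alongside the smaller bumps, thiserror moves across a major version (1.x to 2.x). For straightforward derive-based error enums the attribute syntax is unchanged across that bump, so a sketch like the following (illustrative only, not code from this repository) compiles the same against either major version:

```rust
use thiserror::Error;

// Hypothetical error type; the derive attributes shown here behave
// identically under thiserror 1.x and 2.x.
#[derive(Debug, Error)]
pub enum DialError {
    #[error("dial timed out after {0:?}")]
    Timeout(std::time::Duration),

    #[error("transport error: {0}")]
    Transport(#[from] std::io::Error),
}
```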
63 changes: 39 additions & 24 deletions src/protocol/libp2p/kademlia/mod.rs
@@ -21,6 +21,7 @@
//! [`/ipfs/kad/1.0.0`](https://github.com/libp2p/specs/blob/master/kad-dht/README.md) implementation.
use crate::{
addresses,
error::{Error, ImmediateDialError, SubstreamError},
protocol::{
libp2p::kademlia::{
@@ -41,8 +42,9 @@ use crate::{
};

use bytes::{Bytes, BytesMut};
- use futures::StreamExt;
+ use futures::{sink::Close, StreamExt};
use multiaddr::Multiaddr;
use rustls::client;
use tokio::sync::mpsc::{Receiver, Sender};

use std::{
@@ -245,7 +247,7 @@ impl Kademlia {
context.add_pending_action(substream_id, action);
}
Err(error) => {
- tracing::debug!(
+ tracing::error!(
target: LOG_TARGET,
?peer,
?action,
@@ -410,6 +412,25 @@ impl Kademlia {
}
}

fn closest_peers<K: Clone>(&mut self, target: &Key<K>) -> Vec<KademliaPeer> {
// Find closest peers from kademlia.
let mut closest_peers = self.routing_table.closest(target, self.replication_factor);

// Get the true addresses of the peers.
let mut peer_to_addresses =
self.service.peer_addresses(closest_peers.iter().map(|p| p.peer));

// Update the addresses of the peers.
for closest in closest_peers.iter_mut() {
if let Some(addresses) = peer_to_addresses.remove(&closest.peer) {
closest.addresses = addresses;
} else {
closest.addresses = Vec::new();
}
}
closest_peers
}

/// Handle received message.
async fn on_message_received(
&mut self,
@@ -448,11 +469,8 @@ impl Kademlia {
"handle `FIND_NODE` request",
);

- let message = KademliaMessage::find_node_response(
- &target,
- self.routing_table
- .closest(&Key::new(target.as_ref()), self.replication_factor),
- );
+ let peers = self.closest_peers(&Key::new(target.as_ref()));
+ let message = KademliaMessage::find_node_response(&target, peers);
self.executor.send_message(peer, message.into(), substream);
}
}
@@ -500,9 +518,7 @@ impl Kademlia {
);

let value = self.store.get(&key).cloned();
- let closest_peers = self
- .routing_table
- .closest(&Key::new(key.as_ref()), self.replication_factor);
+ let closest_peers = self.closest_peers(&Key::new(key.as_ref()));

let message =
KademliaMessage::get_value_response(key, closest_peers, value);
@@ -612,9 +628,7 @@ impl Kademlia {
p.addresses = self.service.public_addresses().get_addresses();
});

- let closer_peers = self
- .routing_table
- .closest(&Key::new(key.as_ref()), self.replication_factor);
+ let closer_peers = self.closest_peers(&Key::new(key.as_ref()));

let message =
KademliaMessage::get_providers_response(providers, &closer_peers);
@@ -667,7 +681,7 @@ impl Kademlia {
}

/// Handle dial failure.
fn on_dial_failure(&mut self, peer: PeerId, address: Multiaddr) {
fn on_dial_failure(&mut self, peer: PeerId, address: Multiaddr, reason: String) {
tracing::trace!(target: LOG_TARGET, ?peer, ?address, "failed to dial peer");

let Some(actions) = self.pending_dials.remove(&peer) else {
Expand All @@ -681,6 +695,7 @@ impl Kademlia {
?peer,
query = ?query_id,
?address,
?reason,
"report failure for pending query",
);

Expand Down Expand Up @@ -928,8 +943,8 @@ impl Kademlia {
Some(TransportEvent::SubstreamOpenFailure { substream, error }) => {
self.on_substream_open_failure(substream, error).await;
}
- Some(TransportEvent::DialFailure { peer, address, .. }) =>
- self.on_dial_failure(peer, address),
+ Some(TransportEvent::DialFailure { peer, address, reason }) =>
+ self.on_dial_failure(peer, address, reason),
None => return Err(Error::EssentialTaskClosed),
},
context = self.executor.next() => {
@@ -988,12 +1003,11 @@ impl Kademlia {
"starting `FIND_NODE` query",
);

+ let closest = self.closest_peers(&Key::from(peer));
self.engine.start_find_node(
query_id,
peer,
- self.routing_table
- .closest(&Key::from(peer), self.replication_factor)
- .into()
+ closest.into(),
);
}
Some(KademliaCommand::PutRecord { mut record, query_id }) => {
@@ -1017,10 +1031,11 @@

self.store.put(record.clone());

+ let closest = self.closest_peers(&key);
self.engine.start_put_record(
query_id,
record,
- self.routing_table.closest(&key, self.replication_factor).into(),
+ closest.into(),
);
}
Some(KademliaCommand::PutRecordToPeers {
@@ -1083,14 +1098,14 @@ impl Kademlia {
};

self.store.put_provider(key.clone(), provider.clone());
+ let key_saved = key.clone();
+ let closest = self.closest_peers(&Key::new(key));

self.engine.start_add_provider(
query_id,
- key.clone(),
+ key_saved,
provider,
- self.routing_table
- .closest(&Key::new(key), self.replication_factor)
- .into(),
+ closest.into(),
);
}
Some(KademliaCommand::StopProviding {
44 changes: 23 additions & 21 deletions src/protocol/libp2p/kademlia/query/find_node.rs
@@ -97,6 +97,8 @@ pub struct FindNodeContext<T: Clone + Into<Vec<u8>>> {
/// that have failed to respond within the `Self::peer_timeout`
pending_responses: usize,

start_time: std::time::Instant,

is_done: bool,
waker: Option<std::task::Waker>,
}
@@ -127,11 +129,15 @@ impl<T: Clone + Into<Vec<u8>>> FindNodeContext<T> {

is_done: false,
waker: None,

start_time: std::time::Instant::now(),
}
}

/// Register response failure for `peer`.
pub fn register_response_failure(&mut self, peer: PeerId) {
tracing::warn!(target: LOG_TARGET, query = ?self.config.query, ?peer, "peer failed to respond");

let Some((peer, instant)) = self.pending.remove(&peer) else {
tracing::debug!(target: LOG_TARGET, query = ?self.config.query, ?peer, "pending peer doesn't exist during response failure");
return;
@@ -140,7 +146,8 @@ impl<T: Clone + Into<Vec<u8>>> FindNodeContext<T> {

tracing::trace!(target: LOG_TARGET, query = ?self.config.query, ?peer, elapsed = ?instant.elapsed(), "peer failed to respond");

- self.queried.insert(peer.peer);
+ // Add a retry mechanism for failure responses.
+ // self.queried.insert(peer.peer);
}

/// Register `FIND_NODE` response from `peer`.
@@ -160,25 +167,7 @@ impl<T: Clone + Into<Vec<u8>>> FindNodeContext<T> {

// always mark the peer as queried to prevent it getting queried again
self.queried.insert(peer.peer);

- if self.responses.len() < self.config.replication_factor {
- self.responses.insert(distance, peer);
- } else {
- // Update the furthest peer if this response is closer.
- // Find the furthest distance.
- let furthest_distance =
- self.responses.last_entry().map(|entry| *entry.key()).unwrap_or(distance);
-
- // The response received from the peer is closer than the furthest response.
- if distance < furthest_distance {
- self.responses.insert(distance, peer);
-
- // Remove the furthest entry.
- if self.responses.len() > self.config.replication_factor {
- self.responses.pop_last();
- }
- }
- }
+ self.responses.insert(distance, peer);

let to_query_candidate = peers.into_iter().filter_map(|peer| {
// Peer already produced a response.
@@ -241,6 +230,18 @@ impl<T: Clone + Into<Vec<u8>>> FindNodeContext<T> {

/// Get next action for a `FIND_NODE` query.
pub fn next_action(&mut self) -> Option<QueryAction> {
// if self.start_time.elapsed() > std::time::Duration::from_secs(10) {
// return if self.responses.is_empty() {
// Some(QueryAction::QueryFailed {
// query: self.config.query,
// })
// } else {
// Some(QueryAction::QuerySucceeded {
// query: self.config.query,
// })
// };
// }

// If we cannot make progress, return the final result.
// A query failed when we are not able to identify one single peer.
if self.is_done() {
@@ -495,7 +496,8 @@ mod tests {
let in_peers_set: HashSet<_> = [peer_a, peer_b, peer_c].into_iter().collect();
assert_eq!(in_peers_set.len(), 3);

- let in_peers = [peer_a, peer_b, peer_c].iter().map(|peer| peer_to_kad(*peer)).collect();
+ let in_peers: VecDeque<KademliaPeer> =
+ [peer_a, peer_b, peer_c].iter().map(|peer| peer_to_kad(*peer)).collect();
let mut context = FindNodeContext::new(config, in_peers);

// Schedule peer queries.
3 changes: 3 additions & 0 deletions src/protocol/mod.rs
@@ -84,6 +84,9 @@ pub enum TransportEvent {

/// Dialed address.
address: Multiaddr,

/// Reason for the dial failure.
reason: String,
},

/// Substream opened for `peer`.
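A minimal sketch of a consumer of the extended `DialFailure` variant, assuming `TransportEvent` is imported from this module; the match arm mirrors how the Kademlia handler above forwards the new field:

```rust
// Sketch only: surface the new `reason` string alongside the peer and
// address when a dial attempt fails; other events are handled elsewhere.
fn handle_event(event: TransportEvent) {
    match event {
        TransportEvent::DialFailure { peer, address, reason } => {
            tracing::debug!(?peer, ?address, ?reason, "dial failed");
        }
        _ => {}
    }
}
```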
14 changes: 12 additions & 2 deletions src/protocol/protocol_set.rs
@@ -90,6 +90,9 @@ pub enum InnerTransportEvent {

/// Dialed address.
address: Multiaddr,

/// Reason for the failure.
reason: String,
},

/// Substream opened for `peer`.
@@ -144,8 +147,15 @@ pub enum InnerTransportEvent {
impl From<InnerTransportEvent> for TransportEvent {
fn from(event: InnerTransportEvent) -> Self {
match event {
- InnerTransportEvent::DialFailure { peer, address } =>
- TransportEvent::DialFailure { peer, address },
+ InnerTransportEvent::DialFailure {
+ peer,
+ address,
+ reason,
+ } => TransportEvent::DialFailure {
+ peer,
+ address,
+ reason,
+ },
InnerTransportEvent::SubstreamOpened {
peer,
protocol,
8 changes: 8 additions & 0 deletions src/protocol/transport_service.rs
@@ -480,6 +480,14 @@ impl TransportService {
self.transport_handle.add_known_address(peer, addresses.into_iter());
}

// Get peer addresses from the manager.
pub fn peer_addresses(
&self,
wanted_peers: impl IntoIterator<Item = PeerId>,
) -> HashMap<PeerId, Vec<Multiaddr>> {
self.transport_handle.peer_addresses(wanted_peers)
}

/// Open substream to `peer`.
///
/// Call fails if there is no connection open to `peer` or the channel towards
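A short usage sketch for the new accessor, assuming a `TransportService` handle and a list of peers of interest; peers the manager cannot resolve may simply be absent from the returned map, which is why the Kademlia helper above falls back to an empty address list:

```rust
use std::collections::HashMap;

use multiaddr::Multiaddr;

// Sketch only: ask the transport manager which addresses it knows for a
// set of peers and trace the result. `TransportService` and `PeerId` are
// the types from this crate.
fn log_known_addresses(service: &TransportService, peers: Vec<PeerId>) {
    let known: HashMap<PeerId, Vec<Multiaddr>> = service.peer_addresses(peers);
    for (peer, addresses) in &known {
        tracing::trace!(?peer, count = addresses.len(), "manager-known addresses");
    }
}
```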