Skip to content

Commit

Permalink
Merge pull request #1479 from carver/trace-failed-content
Browse files Browse the repository at this point in the history
Add peer failures to query traces
  • Loading branch information
carver authored Sep 26, 2024
2 parents 548cc6d + ca72d09 commit ec8ee7f
Show file tree
Hide file tree
Showing 3 changed files with 223 additions and 32 deletions.
168 changes: 143 additions & 25 deletions ethportal-api/src/types/query_trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ pub struct QueryTrace {
pub origin: NodeId,
/// Map of a node's ID to its corresponding `QueryResponse`
pub responses: HashMap<NodeId, QueryResponse>,
/// Track if and how each node fails during a query
pub failures: HashMap<NodeId, QueryFailure>,
/// Contains a map from node ID to the metadata object for that node.
pub metadata: HashMap<NodeId, NodeInfo>,
/// Timestamp when the query was started.
Expand All @@ -37,6 +39,7 @@ impl QueryTrace {
received_from: None,
origin: local_enr.into(),
responses: HashMap::new(),
failures: HashMap::new(),
metadata: HashMap::new(),
started_at_ms: SystemTime::now(),
cancelled: Vec::new(),
Expand Down Expand Up @@ -89,6 +92,18 @@ impl QueryTrace {
self.add_metadata(enr, true);
}

/// Mark that we have removed a node from the query, for invalid behavior.
pub fn node_failed(&mut self, node_id: NodeId, failure: QueryFailureKind) {
let timestamp_u64 = QueryTrace::timestamp_millis_u64(self.started_at_ms);
self.failures.insert(
node_id,
QueryFailure {
duration_ms: timestamp_u64,
failure,
},
);
}

/// Mark the node that sent the content that was finally verified.
pub fn content_validated(&mut self, node_id: NodeId) {
if self.received_from.is_none() {
Expand Down Expand Up @@ -150,6 +165,32 @@ pub struct QueryResponse {
pub responded_with: Vec<NodeId>,
}

/// Represents a fatal failure of a single node during a query.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct QueryFailure {
/// Milliseconds since query started.
pub duration_ms: u64,
/// Way in which the node failed.
pub failure: QueryFailureKind,
}

/// Represents the kind of failure a node experienced during a query.
///
/// These do not include the case where a node does not respond at all, or behaves in an
/// undesirable but recoverable way. This only includes fatal failures that cause us to immediately
/// stop querying the node.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum QueryFailureKind {
/// The uTP connection to the node failed.
UtpConnectionFailed,
/// The uTP transfer from the node failed.
UtpTransferFailed,
/// The node fully sent content, but the content was invalid.
InvalidContent,
}

/// Represents additional info for a given node.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
Expand All @@ -165,58 +206,135 @@ mod tests {

use crate::types::enr::generate_random_remote_enr;

fn new_node() -> (NodeId, Enr) {
let (_, enr) = generate_random_remote_enr();
let node_id = enr.node_id();
(node_id, enr)
}

#[test]
fn test_query_trace() {
let (_, local_enr) = generate_random_remote_enr();
let local_node_id = &local_enr.node_id();
let (local_node_id, local_enr) = new_node();
let mut tracer = QueryTrace::new(&local_enr, local_enr.node_id().raw());
let (_, enr_a) = generate_random_remote_enr();
let node_id_a: &NodeId = &enr_a.clone().into();
let (_, enr_b) = generate_random_remote_enr();
let node_id_b = &enr_b.clone().into();
let (_, enr_c) = generate_random_remote_enr();
let node_id_c = &enr_c.clone().into();
let (node_id_a, enr_a) = new_node();
let (node_id_b, enr_b) = new_node();
let (node_id_c, enr_c) = new_node();

tracer.node_responded_with(&local_enr, vec![&enr_a.clone()]);
tracer.node_responded_with(&local_enr, vec![&enr_b.clone()]);
tracer.node_responded_with(&local_enr, vec![&enr_c]);

let (_, enr_d) = generate_random_remote_enr();
let node_id_d = &enr_d.node_id();
let (node_id_d, enr_d) = new_node();

tracer.node_responded_with(&enr_a, vec![]);
tracer.node_responded_with(&enr_b, vec![&enr_d]);
tracer.node_responded_with_content(&enr_c);
tracer.content_validated(*node_id_c);
tracer.content_validated(node_id_c);

let origin_entry = tracer.responses.get(local_node_id).unwrap();
let origin_entry = tracer.responses.get(&local_node_id).unwrap();

// check that entry for origin contains a, b, c
assert!(origin_entry.responded_with.contains(node_id_a));
assert!(origin_entry.responded_with.contains(node_id_b));
assert!(origin_entry.responded_with.contains(node_id_c));
assert!(origin_entry.responded_with.contains(&node_id_a));
assert!(origin_entry.responded_with.contains(&node_id_b));
assert!(origin_entry.responded_with.contains(&node_id_c));

// check that entry for a contains empty list
let a_entry = tracer.responses.get(node_id_a).unwrap();
let a_entry = tracer.responses.get(&node_id_a).unwrap();
assert!(a_entry.responded_with.is_empty());
// check that entry for b contains d
let b_entry = tracer.responses.get(node_id_b).unwrap();
assert!(b_entry.responded_with.contains(node_id_d));
let b_entry = tracer.responses.get(&node_id_b).unwrap();
assert!(b_entry.responded_with.contains(&node_id_d));
// check that content_received_from_node is c
let c_entry = tracer.responses.get(node_id_c).unwrap();
let c_entry = tracer.responses.get(&node_id_c).unwrap();
assert!(c_entry.responded_with.is_empty());
assert_eq!(tracer.received_from, Some(*node_id_c));
assert_eq!(tracer.received_from, Some(node_id_c));

// check node metadata
let a_data = tracer.metadata.get(node_id_a).unwrap();
let a_data = tracer.metadata.get(&node_id_a).unwrap();
assert_eq!(a_data.enr, enr_a);
let b_data = tracer.metadata.get(node_id_b).unwrap();
let b_data = tracer.metadata.get(&node_id_b).unwrap();
assert_eq!(b_data.enr, enr_b);
let c_data = tracer.metadata.get(node_id_c).unwrap();
let c_data = tracer.metadata.get(&node_id_c).unwrap();
assert_eq!(c_data.enr, enr_c);
let d_data = tracer.metadata.get(node_id_d).unwrap();
let d_data = tracer.metadata.get(&node_id_d).unwrap();
assert_eq!(d_data.enr, enr_d);
let local_data = tracer.metadata.get(local_node_id).unwrap();
let local_data = tracer.metadata.get(&local_node_id).unwrap();
assert_eq!(local_data.enr, local_enr);
}

#[test]
fn test_query_trace_multiple_peers() {
let (local_node_id, local_enr) = new_node();
let mut tracer = QueryTrace::new(&local_enr, local_enr.node_id().raw());
let (node_id_a, enr_a) = new_node();
let (node_id_b, enr_b) = new_node();
let (node_id_c, enr_c) = new_node();

tracer.node_responded_with(&local_enr, vec![&enr_a.clone(), &enr_b.clone(), &enr_c]);

let origin_entry = tracer.responses.get(&local_node_id).unwrap();

// check that entry for origin contains a, b, c
assert!(origin_entry.responded_with.contains(&node_id_a));
assert!(origin_entry.responded_with.contains(&node_id_b));
assert!(origin_entry.responded_with.contains(&node_id_c));
}

#[test]
fn test_query_trace_failures() {
let (_, local_enr) = generate_random_remote_enr();
let mut tracer = QueryTrace::new(&local_enr, local_enr.node_id().raw());
let (node_id_a, enr_a) = new_node();
let (node_id_b, enr_b) = new_node();
let (node_id_c, enr_c) = new_node();

tracer.node_responded_with(&local_enr, vec![&enr_a.clone(), &enr_b.clone(), &enr_c]);

let (node_id_d, enr_d) = new_node();
let (node_id_e, enr_e) = new_node();

tracer.node_responded_with_content(&enr_a);
tracer.node_failed(node_id_a, QueryFailureKind::UtpConnectionFailed);
tracer.node_responded_with(&enr_b, vec![&enr_d, &enr_e]);
tracer.node_responded_with_content(&enr_c);
tracer.node_failed(node_id_c, QueryFailureKind::UtpTransferFailed);

tracer.node_responded_with_content(&enr_d);
tracer.node_failed(node_id_d, QueryFailureKind::InvalidContent);
tracer.node_responded_with_content(&enr_e);
tracer.content_validated(node_id_e);

// check that entry for a contains empty list
let a_entry = tracer.responses.get(&node_id_a).unwrap();
assert!(a_entry.responded_with.is_empty());
// check that entry for b contains d and e
let b_entry = tracer.responses.get(&node_id_b).unwrap();
assert!(b_entry.responded_with.contains(&node_id_d));
assert!(b_entry.responded_with.contains(&node_id_e));
// check that entry for c contains empty list
let c_entry = tracer.responses.get(&node_id_c).unwrap();
assert!(c_entry.responded_with.is_empty());
// check that entry for d contains empty list
let d_entry = tracer.responses.get(&node_id_d).unwrap();
assert!(d_entry.responded_with.is_empty());
// check that content_received_from_node is e
let e_entry = tracer.responses.get(&node_id_e).unwrap();
assert!(e_entry.responded_with.is_empty());
assert_eq!(tracer.received_from, Some(node_id_e));

// check for failures
assert_eq!(tracer.failures.len(), 3);
assert_eq!(
tracer.failures.get(&node_id_a).unwrap().failure,
QueryFailureKind::UtpConnectionFailed
);
assert_eq!(
tracer.failures.get(&node_id_c).unwrap().failure,
QueryFailureKind::UtpTransferFailed
);
assert_eq!(
tracer.failures.get(&node_id_d).unwrap().failure,
QueryFailureKind::InvalidContent
);
}
}
9 changes: 7 additions & 2 deletions portalnet/src/find/query_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,12 @@ pub enum QueryPoolState<'a, TNodeId, TQuery, TContentKey> {
)>,
),
/// A query has received a result from the given peer. It may require validation
Validating(&'a mut QueryInfo<TContentKey>, &'a mut TQuery, TNodeId),
Validating(
QueryId,
&'a mut QueryInfo<TContentKey>,
&'a mut TQuery,
TNodeId,
),
/// A query has finished.
Finished(QueryId, QueryInfo<TContentKey>, TQuery),
/// A query has timed out.
Expand Down Expand Up @@ -146,7 +151,7 @@ where

if let Some((query_id, sending_peer)) = validating {
let (query_info, query) = self.queries.get_mut(&query_id).expect("s.a.");
return QueryPoolState::Validating(query_info, query, sending_peer);
return QueryPoolState::Validating(query_id, query_info, query, sending_peer);
}

if let Some(query_id) = finished {
Expand Down
Loading

0 comments on commit ec8ee7f

Please sign in to comment.