From 903a58982aac4a07d426e6db8912c593a0da252a Mon Sep 17 00:00:00 2001 From: Marko Atanasievski Date: Mon, 30 Oct 2023 14:56:12 +0100 Subject: [PATCH] fix: assert delivery test (#350) --- Cargo.lock | 1 + crates/topos-test-sdk/Cargo.toml | 1 + crates/topos-test-sdk/src/lib.rs | 5 +- crates/topos/Cargo.toml | 1 - crates/topos/src/components/tce/mod.rs | 8 +- .../tce/services/push_certificate.rs | 117 +++++++++++++++--- crates/topos/tests/push-certificate.rs | 58 --------- 7 files changed, 106 insertions(+), 85 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e223888ab..8a5dd1a9c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7850,6 +7850,7 @@ dependencies = [ name = "topos-test-sdk" version = "0.1.0" dependencies = [ + "async-stream", "async-trait", "ethers", "futures", diff --git a/crates/topos-test-sdk/Cargo.toml b/crates/topos-test-sdk/Cargo.toml index aa9612a3c..331ac4778 100644 --- a/crates/topos-test-sdk/Cargo.toml +++ b/crates/topos-test-sdk/Cargo.toml @@ -40,6 +40,7 @@ tower.workspace = true tokio-util.workspace = true tokio.workspace = true tracing.workspace = true +async-stream.workspace = true [features] default = [] diff --git a/crates/topos-test-sdk/src/lib.rs b/crates/topos-test-sdk/src/lib.rs index ca99b7a46..e219508ef 100644 --- a/crates/topos-test-sdk/src/lib.rs +++ b/crates/topos-test-sdk/src/lib.rs @@ -1,12 +1,11 @@ pub mod certificates; -#[cfg(feature = "tce")] -pub mod tce; - pub mod networking; pub mod p2p; pub mod sequencer; pub mod storage; +#[cfg(feature = "tce")] +pub mod tce; use std::{collections::HashSet, net::SocketAddr, sync::Mutex}; diff --git a/crates/topos/Cargo.toml b/crates/topos/Cargo.toml index 42138e847..71dbc080e 100644 --- a/crates/topos/Cargo.toml +++ b/crates/topos/Cargo.toml @@ -60,7 +60,6 @@ serde_json.workspace = true test-log.workspace = true env_logger.workspace = true rand.workspace = true -async-stream.workspace = true futures.workspace = true libp2p = { workspace = true, features = ["identify"] } assert_cmd = "2.0.6" diff --git a/crates/topos/src/components/tce/mod.rs b/crates/topos/src/components/tce/mod.rs index e1083bc2f..73cfb1bae 100644 --- a/crates/topos/src/components/tce/mod.rs +++ b/crates/topos/src/components/tce/mod.rs @@ -63,12 +63,8 @@ pub(crate) async fn handle_command( .await .map_err(Box::::from) { - Err(_) => { - error!("Check failed due to timeout"); - std::process::exit(1); - } - Ok(Err(errors)) => { - error!("Check failed due to errors: {:?}", errors); + Err(e) => { + error!("Check failed: {:?}", e); std::process::exit(1); } _ => { diff --git a/crates/topos/src/components/tce/services/push_certificate.rs b/crates/topos/src/components/tce/services/push_certificate.rs index 3ad65986f..6714e5c41 100644 --- a/crates/topos/src/components/tce/services/push_certificate.rs +++ b/crates/topos/src/components/tce/services/push_certificate.rs @@ -22,26 +22,21 @@ use topos_core::{ }, uci::{Certificate, CERTIFICATE_ID_LENGTH, SUBNET_ID_LENGTH}, }; -use tracing::{debug, info}; +use tracing::{debug, warn}; use crate::options::input_format::{InputFormat, Parser}; -pub(crate) async fn check_delivery( +/// Picks a random peer and sends it a certificate. All other peers listen for broadcast certs. +/// Three possible outcomes: +/// 1. No errors, returns Ok +/// 2. There were errors, returns a list of all errors encountered +/// 3. timeout +pub(crate) async fn check_certificate_delivery( timeout_broadcast: u64, - format: InputFormat, - peers: Option, + peers: Vec, timeout: u64, ) -> Result>, Elapsed> { tokio::time::timeout(Duration::from_secs(timeout), async move { - info!("peers: {:?}", peers); - let peers: Vec = format - .parse(NodeList(peers)) - .map_err(|_| vec![format!("Unable to parse node list")])? - .into_iter() - .map(TryInto::try_into) - .collect::, _>>() - .map_err(|_| vec![format!("Unable to parse node list")])?; - let random_peer: Uri = peers .choose(&mut rand::thread_rng()) .ok_or_else(|| { @@ -58,11 +53,11 @@ pub(crate) async fn check_delivery( &[[2u8; SUBNET_ID_LENGTH].into()], ) .map_err(|_| vec![format!("Unable to create the certificate")])?; - let certificate_id = pushed_certificate.id; + let mut join_handlers = Vec::new(); - // check that every nodes delivered the certificate + // check that all nodes delivered the certificate for peer in peers { join_handlers.push(tokio::spawn(async move { let peer_string = peer.clone(); @@ -79,7 +74,7 @@ pub(crate) async fn check_delivery( let status = result.into_inner(); if !status.has_active_sample { - return Err((peer_string, "didn't succeed in the sample phase")); + return Err((peer_string, "failed to find active sample")); } let mut client = ApiServiceClient::connect(peer_string.clone()) @@ -165,11 +160,37 @@ pub(crate) async fn check_delivery( }) .await .map_err(|error| { - info!("Timeout reached: {:?}", error); + warn!("Timeout reached: {:?}", error); error }) } +pub(crate) async fn check_delivery( + timeout_broadcast: u64, + format: InputFormat, + peers: Option, + timeout: u64, +) -> Result<(), Box> { + debug!("peers: {:?}", peers); + + let peers: Vec = format + .parse(NodeList(peers)) + .map_err(|e| format!("Unable to parse node list: {e}"))? + .into_iter() + .map(TryInto::try_into) + .collect::, _>>() + .map_err(|e| format!("Unable to parse node list: {e}"))?; + + match check_certificate_delivery(timeout_broadcast, peers, timeout).await { + Ok(Err(e)) => Err(format!("Error with certificate delivery: {e:?}").into()), + Err(e) => Err(Box::new(io::Error::new( + io::ErrorKind::TimedOut, + format!("Timeout elapsed: {e}"), + ))), + Ok(_) => Ok(()), + } +} + pub(crate) struct NodeList(pub(crate) Option); #[derive(Deserialize)] @@ -203,3 +224,65 @@ impl Parser for InputFormat { } } } + +#[cfg(test)] +mod tests { + use super::check_certificate_delivery; + use rstest::*; + use std::time::Duration; + use topos_core::api::grpc::tce::v1::StatusRequest; + use topos_test_sdk::tce::create_network; + use tracing::{debug, info}; + + #[rstest] + #[test_log::test(tokio::test)] + #[timeout(Duration::from_secs(30))] + async fn assert_push_certificate_delivery() -> Result<(), Box> { + let mut peers_context = create_network(5, vec![]).await; + + let mut status: Vec = Vec::new(); + + for (_peer_id, client) in peers_context.iter_mut() { + let response = client + .console_grpc_client + .status(StatusRequest {}) + .await + .expect("Can't get status"); + + status.push(response.into_inner().has_active_sample); + } + + assert!(status.iter().all(|s| *s)); + + let nodes = peers_context + .iter() + .map(|peer| peer.1.api_entrypoint.clone()) + .collect::>(); + + debug!("Nodes used in test: {:?}", nodes); + + let assertion = async move { + let peers: Vec = nodes + .into_iter() + .map(TryInto::try_into) + .collect::>() + .map_err(|e| format!("Unable to parse node list: {e}")) + .expect("Valid node list"); + + match check_certificate_delivery(5, peers, 20).await { + Ok(Err(e)) => { + panic!("Error with certificate delivery: {e:?}"); + } + Err(e) => { + panic!("Timeout elapsed: {e}"); + } + Ok(_) => { + info!("Check certificate delivery passed!"); + } + } + }; + + assertion.await; + Ok(()) + } +} diff --git a/crates/topos/tests/push-certificate.rs b/crates/topos/tests/push-certificate.rs index 12d41e0d0..c92d3011a 100644 --- a/crates/topos/tests/push-certificate.rs +++ b/crates/topos/tests/push-certificate.rs @@ -1,13 +1,7 @@ mod utils; -use std::{thread, time::Duration}; - use assert_cmd::Command; -use rstest::*; -use topos_core::api::grpc::tce::v1::StatusRequest; -use topos_test_sdk::tce::create_network; - #[test] fn help_display() -> Result<(), Box> { let mut cmd = Command::cargo_bin("topos")?; @@ -21,55 +15,3 @@ fn help_display() -> Result<(), Box> { Ok(()) } - -#[rstest] -#[test_log::test(tokio::test)] -#[timeout(Duration::from_secs(20))] -// FIXME: This test is flaky, it fails sometimes because of sample failure -async fn assert_delivery() -> Result<(), Box> { - let mut peers_context = create_network(5, vec![]).await; - - let mut status: Vec = Vec::new(); - - for (_peer_id, client) in peers_context.iter_mut() { - let response = client - .console_grpc_client - .status(StatusRequest {}) - .await - .expect("Can't get status"); - - status.push(response.into_inner().has_active_sample); - } - - assert!(status.iter().all(|s| *s)); - - let nodes: String = peers_context - .iter() - .map(|peer| peer.1.api_entrypoint.clone()) - .collect::>() - .join(","); - - let (tx, rx) = tokio::sync::oneshot::channel::<()>(); - - _ = thread::spawn(|| { - let mut cmd = Command::cargo_bin("topos").unwrap(); - cmd.env("TOPOS_LOG_FORMAT", "json"); - cmd.env("RUST_LOG", "topos=debug"); - - cmd.arg("tce") - .arg("push-certificates") - .args(["-f", "plain"]) - .arg("-n") - .arg(nodes); - - cmd.assert().success(); - - tx.send(()).unwrap(); - }); - - _ = tokio::time::timeout(Duration::from_secs(15), rx) - .await - .unwrap(); - - Ok(()) -}