Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

Commit

Permalink
fix: assert delivery test (#350)
Browse files Browse the repository at this point in the history
  • Loading branch information
atanmarko authored Oct 30, 2023
1 parent b8a2a3e commit 903a589
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 85 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/topos-test-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ tower.workspace = true
tokio-util.workspace = true
tokio.workspace = true
tracing.workspace = true
async-stream.workspace = true

[features]
default = []
Expand Down
5 changes: 2 additions & 3 deletions crates/topos-test-sdk/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
pub mod certificates;

#[cfg(feature = "tce")]
pub mod tce;

pub mod networking;
pub mod p2p;
pub mod sequencer;
pub mod storage;
#[cfg(feature = "tce")]
pub mod tce;

use std::{collections::HashSet, net::SocketAddr, sync::Mutex};

Expand Down
1 change: 0 additions & 1 deletion crates/topos/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ serde_json.workspace = true
test-log.workspace = true
env_logger.workspace = true
rand.workspace = true
async-stream.workspace = true
futures.workspace = true
libp2p = { workspace = true, features = ["identify"] }
assert_cmd = "2.0.6"
Expand Down
8 changes: 2 additions & 6 deletions crates/topos/src/components/tce/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,8 @@ pub(crate) async fn handle_command(
.await
.map_err(Box::<dyn std::error::Error>::from)
{
Err(_) => {
error!("Check failed due to timeout");
std::process::exit(1);
}
Ok(Err(errors)) => {
error!("Check failed due to errors: {:?}", errors);
Err(e) => {
error!("Check failed: {:?}", e);
std::process::exit(1);
}
_ => {
Expand Down
117 changes: 100 additions & 17 deletions crates/topos/src/components/tce/services/push_certificate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,26 +22,21 @@ use topos_core::{
},
uci::{Certificate, CERTIFICATE_ID_LENGTH, SUBNET_ID_LENGTH},
};
use tracing::{debug, info};
use tracing::{debug, warn};

use crate::options::input_format::{InputFormat, Parser};

pub(crate) async fn check_delivery(
/// Picks a random peer and sends it a certificate. All other peers listen for broadcast certs.
/// Three possible outcomes:
/// 1. No errors, returns Ok
/// 2. There were errors, returns a list of all errors encountered
/// 3. timeout
pub(crate) async fn check_certificate_delivery(
timeout_broadcast: u64,
format: InputFormat,
peers: Option<String>,
peers: Vec<Uri>,
timeout: u64,
) -> Result<Result<(), Vec<String>>, Elapsed> {
tokio::time::timeout(Duration::from_secs(timeout), async move {
info!("peers: {:?}", peers);
let peers: Vec<Uri> = format
.parse(NodeList(peers))
.map_err(|_| vec![format!("Unable to parse node list")])?
.into_iter()
.map(TryInto::try_into)
.collect::<Result<Vec<_>, _>>()
.map_err(|_| vec![format!("Unable to parse node list")])?;

let random_peer: Uri = peers
.choose(&mut rand::thread_rng())
.ok_or_else(|| {
Expand All @@ -58,11 +53,11 @@ pub(crate) async fn check_delivery(
&[[2u8; SUBNET_ID_LENGTH].into()],
)
.map_err(|_| vec![format!("Unable to create the certificate")])?;

let certificate_id = pushed_certificate.id;

let mut join_handlers = Vec::new();

// check that every nodes delivered the certificate
// check that all nodes delivered the certificate
for peer in peers {
join_handlers.push(tokio::spawn(async move {
let peer_string = peer.clone();
Expand All @@ -79,7 +74,7 @@ pub(crate) async fn check_delivery(

let status = result.into_inner();
if !status.has_active_sample {
return Err((peer_string, "didn't succeed in the sample phase"));
return Err((peer_string, "failed to find active sample"));
}

let mut client = ApiServiceClient::connect(peer_string.clone())
Expand Down Expand Up @@ -165,11 +160,37 @@ pub(crate) async fn check_delivery(
})
.await
.map_err(|error| {
info!("Timeout reached: {:?}", error);
warn!("Timeout reached: {:?}", error);
error
})
}

pub(crate) async fn check_delivery(
timeout_broadcast: u64,
format: InputFormat,
peers: Option<String>,
timeout: u64,
) -> Result<(), Box<dyn std::error::Error>> {
debug!("peers: {:?}", peers);

let peers: Vec<Uri> = format
.parse(NodeList(peers))
.map_err(|e| format!("Unable to parse node list: {e}"))?
.into_iter()
.map(TryInto::try_into)
.collect::<Result<Vec<_>, _>>()
.map_err(|e| format!("Unable to parse node list: {e}"))?;

match check_certificate_delivery(timeout_broadcast, peers, timeout).await {
Ok(Err(e)) => Err(format!("Error with certificate delivery: {e:?}").into()),
Err(e) => Err(Box::new(io::Error::new(
io::ErrorKind::TimedOut,
format!("Timeout elapsed: {e}"),
))),
Ok(_) => Ok(()),
}
}

pub(crate) struct NodeList(pub(crate) Option<String>);

#[derive(Deserialize)]
Expand Down Expand Up @@ -203,3 +224,65 @@ impl Parser<NodeList> for InputFormat {
}
}
}

#[cfg(test)]
mod tests {
use super::check_certificate_delivery;
use rstest::*;
use std::time::Duration;
use topos_core::api::grpc::tce::v1::StatusRequest;
use topos_test_sdk::tce::create_network;
use tracing::{debug, info};

#[rstest]
#[test_log::test(tokio::test)]
#[timeout(Duration::from_secs(30))]
async fn assert_push_certificate_delivery() -> Result<(), Box<dyn std::error::Error>> {
let mut peers_context = create_network(5, vec![]).await;

let mut status: Vec<bool> = Vec::new();

for (_peer_id, client) in peers_context.iter_mut() {
let response = client
.console_grpc_client
.status(StatusRequest {})
.await
.expect("Can't get status");

status.push(response.into_inner().has_active_sample);
}

assert!(status.iter().all(|s| *s));

let nodes = peers_context
.iter()
.map(|peer| peer.1.api_entrypoint.clone())
.collect::<Vec<_>>();

debug!("Nodes used in test: {:?}", nodes);

let assertion = async move {
let peers: Vec<tonic::transport::Uri> = nodes
.into_iter()
.map(TryInto::try_into)
.collect::<Result<_, _>>()
.map_err(|e| format!("Unable to parse node list: {e}"))
.expect("Valid node list");

match check_certificate_delivery(5, peers, 20).await {
Ok(Err(e)) => {
panic!("Error with certificate delivery: {e:?}");
}
Err(e) => {
panic!("Timeout elapsed: {e}");
}
Ok(_) => {
info!("Check certificate delivery passed!");
}
}
};

assertion.await;
Ok(())
}
}
58 changes: 0 additions & 58 deletions crates/topos/tests/push-certificate.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,7 @@
mod utils;

use std::{thread, time::Duration};

use assert_cmd::Command;

use rstest::*;
use topos_core::api::grpc::tce::v1::StatusRequest;
use topos_test_sdk::tce::create_network;

#[test]
fn help_display() -> Result<(), Box<dyn std::error::Error>> {
let mut cmd = Command::cargo_bin("topos")?;
Expand All @@ -21,55 +15,3 @@ fn help_display() -> Result<(), Box<dyn std::error::Error>> {

Ok(())
}

#[rstest]
#[test_log::test(tokio::test)]
#[timeout(Duration::from_secs(20))]
// FIXME: This test is flaky, it fails sometimes because of sample failure
async fn assert_delivery() -> Result<(), Box<dyn std::error::Error>> {
let mut peers_context = create_network(5, vec![]).await;

let mut status: Vec<bool> = Vec::new();

for (_peer_id, client) in peers_context.iter_mut() {
let response = client
.console_grpc_client
.status(StatusRequest {})
.await
.expect("Can't get status");

status.push(response.into_inner().has_active_sample);
}

assert!(status.iter().all(|s| *s));

let nodes: String = peers_context
.iter()
.map(|peer| peer.1.api_entrypoint.clone())
.collect::<Vec<_>>()
.join(",");

let (tx, rx) = tokio::sync::oneshot::channel::<()>();

_ = thread::spawn(|| {
let mut cmd = Command::cargo_bin("topos").unwrap();
cmd.env("TOPOS_LOG_FORMAT", "json");
cmd.env("RUST_LOG", "topos=debug");

cmd.arg("tce")
.arg("push-certificates")
.args(["-f", "plain"])
.arg("-n")
.arg(nodes);

cmd.assert().success();

tx.send(()).unwrap();
});

_ = tokio::time::timeout(Duration::from_secs(15), rx)
.await
.unwrap();

Ok(())
}

0 comments on commit 903a589

Please sign in to comment.