Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

Commit

Permalink
feat: add tce client send/receive test with tce network
Browse files Browse the repository at this point in the history
fix: test
  • Loading branch information
atanmarko committed Sep 28, 2023
1 parent d9bc06e commit 33d837c
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 6 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/topos-tce-proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ base64.workspace = true
[dev-dependencies]
topos-tce-transport = { path = "../topos-tce-transport" }
topos-tce = { path = "../topos-tce" }
topos-uci = { path = "../topos-uci"}
rstest = { workspace = true, features = ["async-timeout"] }
test-log.workspace = true
env_logger.workspace = true
Expand Down
89 changes: 83 additions & 6 deletions crates/topos-tce-proxy/tests/tce_tests.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use base64::Engine;
use futures::StreamExt;
use rstest::*;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use test_log::test;
use tokio::sync::{mpsc, oneshot};
use tokio::time::Duration;
Expand Down Expand Up @@ -610,7 +610,7 @@ async fn test_tce_proxy_submit_certificate(

// Get last pending certificate to check that all certificates are submitted
let (mut tce_client, _receiving_certificate_stream) =
create_tce_client(&context.api_entrypoint).await?;
create_tce_client(&context.api_entrypoint, SOURCE_SUBNET_ID_1).await?;
match tce_client
.get_last_pending_certificates(vec![tce_client.get_subnet_id()])
.await
Expand All @@ -634,6 +634,7 @@ async fn test_tce_proxy_submit_certificate(

async fn create_tce_client(
endpoint: &str,
source_subnet_id: topos_uci::SubnetId,
) -> Result<
(
TceClient,
Expand All @@ -646,7 +647,6 @@ async fn create_tce_client(
),
Box<dyn std::error::Error>,
> {
let source_subnet_id = SOURCE_SUBNET_ID_1;
let (evt_sender, _evt_rcv) = mpsc::channel::<TceProxyEvent>(128);
let (_tce_client_shutdown_channel, shutdown_receiver) = mpsc::channel::<oneshot::Sender<()>>(1);

Expand Down Expand Up @@ -678,7 +678,7 @@ async fn test_tce_client_submit_and_get_last_pending_certificate(
let last_sent_certificate = certificates.last().unwrap().clone().certificate;

let (mut tce_client, _receiving_certificate_stream) =
create_tce_client(&context.api_entrypoint).await?;
create_tce_client(&context.api_entrypoint, SOURCE_SUBNET_ID_1).await?;

// Create tce proxy client
for (index, cert) in certificates.into_iter().enumerate() {
Expand Down Expand Up @@ -728,7 +728,7 @@ async fn test_tce_client_get_empty_history_source_head(
let mut context = start_node.await;

let (mut tce_client, _receiving_certificate_stream) =
create_tce_client(&context.api_entrypoint).await?;
create_tce_client(&context.api_entrypoint, SOURCE_SUBNET_ID_1).await?;

// Get source head certificate, check if it is empty
match tce_client.get_source_head().await {
Expand Down Expand Up @@ -769,7 +769,7 @@ async fn test_tce_client_get_source_head(
.certificate;

let (mut tce_client, _receiving_certificate_stream) =
create_tce_client(&context.api_entrypoint).await?;
create_tce_client(&context.api_entrypoint, SOURCE_SUBNET_ID_1).await?;

// Get source head, check if it matches
match tce_client.get_source_head().await {
Expand Down Expand Up @@ -810,3 +810,80 @@ async fn test_tce_client_get_source_head(
context.shutdown().await?;
Ok(())
}

#[rstest]
#[test(tokio::test)]
#[timeout(Duration::from_secs(30))]
async fn test_tce_client_submit_and_get_certificate_delivered(
) -> Result<(), Box<dyn std::error::Error>> {
let peers_context = topos_test_sdk::tce::create_network(5, vec![]).await;
let mut peers = peers_context.into_iter();
let mut sending_tce: TceContext = peers.next().expect("valid peer 1").1;
let mut receiving_tce: TceContext = peers.next().expect("valid peer 2").1;

let mut certificates = Vec::new();
certificates.append(&mut create_certificate_chain(
SOURCE_SUBNET_ID_1,
&[TARGET_SUBNET_ID_1],
5,
));
let expected_certificates: HashSet<topos_uci::CertificateId> = certificates
.iter()
.map(|cert| cert.certificate.id)
.collect();

// Create tce proxy client for sending subnet
let (mut tce_client_source, _) =
create_tce_client(&sending_tce.api_entrypoint, SOURCE_SUBNET_ID_1).await?;

// Create tce proxy client for receiving subnet
let (_, mut target_receiving_certificate_stream) =
create_tce_client(&receiving_tce.api_entrypoint, TARGET_SUBNET_ID_1).await?;

// Send certificate from source subnet
for (index, cert) in certificates.into_iter().enumerate() {
match tce_client_source.send_certificate(cert.certificate).await {
Ok(_) => {
info!(
"Certificate {} successfully submitted by the tce client",
index
);
}
Err(e) => {
panic!("Error submitting certificate by the tce client: {e}");
}
}
}

// Wait for certificates to be submitted
tokio::time::sleep(Duration::from_secs(5)).await;

// Listen for certificates on target subnet
info!("Waiting for certificates to be received on the target subnet");
let mut received_certs = HashSet::new();
loop {
match target_receiving_certificate_stream.next().await {
Some((certificate, target_position)) => {
info!(
"Delivered certificate cert id {}, position {:?}",
&certificate.id, target_position
);
received_certs.insert(certificate.id);
if received_certs.len() == expected_certificates.len()
&& received_certs == expected_certificates
{
info!("All certificates successfully received");
break;
}
}
None => {
error!("Certificate not received!")
}
}
}

info!("Shutting down TCE node client");
sending_tce.shutdown().await?;
receiving_tce.shutdown().await?;
Ok(())
}

0 comments on commit 33d837c

Please sign in to comment.