Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

feat: tce client additional tests #314

Merged
merged 5 commits into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/topos-tce-proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ base64ct.workspace = true
[dev-dependencies]
topos-tce-transport = { path = "../topos-tce-transport" }
topos-tce = { path = "../topos-tce" }
topos-uci = { path = "../topos-uci"}
rstest = { workspace = true, features = ["async-timeout"] }
test-log.workspace = true
env_logger.workspace = true
Expand Down
282 changes: 279 additions & 3 deletions crates/topos-tce-proxy/tests/tce_tests.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use base64ct::{Base64, Encoding};
use futures::StreamExt;
use rstest::*;
use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use test_log::test;
use tokio::sync::{mpsc, oneshot};
use tokio::time::Duration;
use topos_core::api::grpc::shared::v1::positions::SourceStreamPosition;
use topos_core::api::grpc::shared::v1::{
Expand All @@ -19,8 +20,9 @@ use topos_core::api::grpc::tce::v1::{
use topos_core::api::grpc::uci::v1::Certificate;
use topos_core::types::CertificateDelivered;
use topos_core::uci::SUBNET_ID_LENGTH;
use topos_tce_proxy::client::{TceClient, TceClientBuilder};
use topos_tce_proxy::worker::TceProxyWorker;
use topos_tce_proxy::{TceProxyCommand, TceProxyConfig};
use topos_tce_proxy::{TceProxyCommand, TceProxyConfig, TceProxyEvent};
use tracing::{debug, error, info, warn};

use topos_test_sdk::{
Expand Down Expand Up @@ -551,6 +553,8 @@ async fn test_tce_proxy_submit_certificate(
&[TARGET_SUBNET_ID_1],
5,
));
let last_sent_certificate = certificates.last().unwrap().clone().certificate;

// Create tce proxy client
let (tce_proxy_worker, _source_head_certificate_id) =
match TceProxyWorker::new(TceProxyConfig {
Expand Down Expand Up @@ -604,9 +608,281 @@ async fn test_tce_proxy_submit_certificate(
// Wait for certificates to be submitted
tokio::time::sleep(Duration::from_secs(5)).await;

// TODO: get pending certificates from TCE to make sure they were actually submitted
// Get last pending certificate to check that all certificates are submitted
let (mut tce_client, _receiving_certificate_stream) =
create_tce_client(&context.api_entrypoint, SOURCE_SUBNET_ID_1).await?;
match tce_client
.get_last_pending_certificates(vec![tce_client.get_subnet_id()])
.await
{
Ok(mut pending_certificates) => {
let pending_certificate = pending_certificates
.remove(&tce_client.get_subnet_id())
.unwrap_or_default();
info!("Last pending certificate: {:?}", pending_certificate);
assert_eq!(pending_certificate.unwrap().0, last_sent_certificate);
}
Err(e) => {
panic!("Unable to retrieve latest pending certificate {e}");
}
};

info!("Shutting down TCE node client");
context.shutdown().await?;
Ok(())
}

async fn create_tce_client(
endpoint: &str,
source_subnet_id: topos_uci::SubnetId,
) -> Result<
(
TceClient,
impl futures::stream::Stream<
Item = (
topos_core::uci::Certificate,
topos_core::api::grpc::checkpoints::TargetStreamPosition,
),
>,
),
Box<dyn std::error::Error>,
> {
let (evt_sender, _evt_rcv) = mpsc::channel::<TceProxyEvent>(128);
let (_tce_client_shutdown_channel, shutdown_receiver) = mpsc::channel::<oneshot::Sender<()>>(1);

let (tce_client, receiving_certificate_stream) = TceClientBuilder::default()
.set_subnet_id(source_subnet_id)
.set_tce_endpoint(endpoint)
.set_proxy_event_sender(evt_sender.clone())
.build_and_launch(shutdown_receiver)
.await?;

tce_client.open_stream(Vec::new()).await?;

Ok((tce_client, receiving_certificate_stream))
}

#[rstest]
#[test(tokio::test)]
async fn test_tce_client_submit_and_get_last_pending_certificate(
#[future] start_node: TceContext,
) -> Result<(), Box<dyn std::error::Error>> {
let mut context = start_node.await;

let mut certificates = Vec::new();
certificates.append(&mut create_certificate_chain(
SOURCE_SUBNET_ID_1,
&[TARGET_SUBNET_ID_1],
5,
));
let last_sent_certificate = certificates.last().unwrap().clone().certificate;

let (mut tce_client, _receiving_certificate_stream) =
create_tce_client(&context.api_entrypoint, SOURCE_SUBNET_ID_1).await?;

// Create tce proxy client
for (index, cert) in certificates.into_iter().enumerate() {
match tce_client.send_certificate(cert.certificate).await {
Ok(_) => {
info!(
"Certificate {} successfully submitted by the tce client",
index
);
}
Err(e) => {
panic!("Error submitting certificate by the tce client: {e}");
}
}
}

// Wait for certificates to be submitted
tokio::time::sleep(Duration::from_secs(5)).await;

// Get last pending certificate to check that all certificates are submitted
match tce_client
.get_last_pending_certificates(vec![tce_client.get_subnet_id()])
.await
{
Ok(mut pending_certificates) => {
let pending_certificate = pending_certificates
.remove(&tce_client.get_subnet_id())
.unwrap_or_default();
info!("Last pending certificate: {:?}", pending_certificate);
assert_eq!(pending_certificate.unwrap().0, last_sent_certificate);
}
Err(e) => {
panic!("Unable to retrieve latest pending certificate {e}");
}
};

info!("Shutting down TCE node client");
context.shutdown().await?;
Ok(())
}

#[rstest]
#[test(tokio::test)]
async fn test_tce_client_get_empty_history_source_head(
#[future] start_node: TceContext,
) -> Result<(), Box<dyn std::error::Error>> {
let mut context = start_node.await;

let (mut tce_client, _receiving_certificate_stream) =
create_tce_client(&context.api_entrypoint, SOURCE_SUBNET_ID_1).await?;

// Get source head certificate, check if it is empty
match tce_client.get_source_head().await {
Ok((source_head_cert, position)) => {
info!(
"Source head certificate: {:?}, position {}",
source_head_cert, position
);

assert_eq!(source_head_cert.id, CertificateId::from([0u8; 32]));
assert_eq!(position, 0);
}
Err(e) => {
panic!("Unable to retrieve latest pending certificate {e}");
}
};
info!("Shutting down TCE node client");
context.shutdown().await?;
Ok(())
}

#[rstest]
#[test(tokio::test)]
async fn test_tce_client_get_source_head(
input_certificates: Vec<CertificateDelivered>,
#[with(input_certificates.clone())]
#[future]
start_node: TceContext,
) -> Result<(), Box<dyn std::error::Error>> {
let mut context = start_node.await;

// Tce is prefilled with delivered certificates
let source_subnet_id_1_prefilled_certificates =
&input_certificates[0..SOURCE_SUBNET_ID_1_NUMBER_OF_PREFILLED_CERTIFICATES];
let last_delivered_certificate = &source_subnet_id_1_prefilled_certificates
.last()
.unwrap()
.certificate;

let (mut tce_client, _receiving_certificate_stream) =
create_tce_client(&context.api_entrypoint, SOURCE_SUBNET_ID_1).await?;

// Get source head, check if it matches
match tce_client.get_source_head().await {
Ok((source_head_cert, position)) => {
info!(
"Source head certificate: {:?}, position {}",
source_head_cert, position
);
assert_eq!(source_head_cert, *last_delivered_certificate);
assert_eq!(
position,
SOURCE_SUBNET_ID_1_NUMBER_OF_PREFILLED_CERTIFICATES as u64 - 1
);
}
Err(e) => {
panic!("Unable to retrieve latest pending certificate {e}");
}
};

// Last pending certificate should be empty
match tce_client
.get_last_pending_certificates(vec![tce_client.get_subnet_id()])
.await
{
Ok(mut pending_certificates) => {
let pending_certificate = pending_certificates
.remove(&tce_client.get_subnet_id())
.unwrap_or_default();
info!("Last pending certificates: {:?}", pending_certificates);
assert_eq!(pending_certificate, None);
}
Err(e) => {
panic!("Unable to retrieve latest pending certificate {e}");
}
};

info!("Shutting down TCE node client");
context.shutdown().await?;
Ok(())
}

#[rstest]
#[test(tokio::test)]
#[timeout(Duration::from_secs(30))]
async fn test_tce_client_submit_and_get_certificate_delivered(
) -> Result<(), Box<dyn std::error::Error>> {
let peers_context = topos_test_sdk::tce::create_network(5, vec![]).await;
let mut peers = peers_context.into_iter();
let mut sending_tce: TceContext = peers.next().expect("valid peer 1").1;
let mut receiving_tce: TceContext = peers.next().expect("valid peer 2").1;

let mut certificates = Vec::new();
certificates.append(&mut create_certificate_chain(
SOURCE_SUBNET_ID_1,
&[TARGET_SUBNET_ID_1],
5,
));
let expected_certs: HashSet<topos_uci::CertificateId> = certificates
.iter()
.map(|cert| cert.certificate.id)
.collect();

// Create tce proxy client for sending subnet
let (mut tce_client_source, _) =
create_tce_client(&sending_tce.api_entrypoint, SOURCE_SUBNET_ID_1).await?;

// Create tce proxy client for receiving subnet
let (_, mut target_receiving_certificate_stream) =
create_tce_client(&receiving_tce.api_entrypoint, TARGET_SUBNET_ID_1).await?;

// Send certificate from source subnet
for (index, cert) in certificates.into_iter().enumerate() {
match tce_client_source.send_certificate(cert.certificate).await {
Ok(_) => {
info!(
"Certificate {} successfully submitted by the tce client",
index
);
}
Err(e) => {
panic!("Error submitting certificate by the tce client: {e}");
}
}
}

// Wait for certificates to be submitted
tokio::time::sleep(Duration::from_secs(5)).await;

// Listen for certificates on target subnet
info!("Waiting for certificates to be received on the target subnet");
let mut received_certs = HashSet::new();
loop {
match target_receiving_certificate_stream.next().await {
Some((certificate, target_position)) => {
info!(
"Delivered certificate cert id {}, position {:?}",
&certificate.id, target_position
);
received_certs.insert(certificate.id);
if received_certs.len() == expected_certs.len() && received_certs == expected_certs
{
info!("All certificates successfully received");
break;
}
}
None => {
error!("Certificate not received!")
}
}
}

info!("Shutting down TCE node client");
sending_tce.shutdown().await?;
receiving_tce.shutdown().await?;
Ok(())
}
4 changes: 2 additions & 2 deletions crates/topos-test-sdk/src/tce/synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ pub async fn create_synchronizer(
topos_tce_synchronizer::Synchronizer::builder()
.with_shutdown(shutdown)
.with_store(store)
.with_gatekeeper_client(gatekeeper_client.clone())
.with_network_client(network_client.clone())
.with_gatekeeper_client(gatekeeper_client)
.with_network_client(network_client)
.build()
.expect("Can't create the Synchronizer");

Expand Down
Loading