Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

Commit

Permalink
fix: submit certificate to tce using backoff algorithm in case of err…
Browse files Browse the repository at this point in the history
…or (#313)
  • Loading branch information
atanmarko authored and hadjiszs committed Sep 28, 2023
1 parent e041392 commit 7638b5c
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 23 deletions.
2 changes: 1 addition & 1 deletion crates/topos-tce-broadcast/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ impl ReliableBroadcastClient {
event_sender,
double_echo_shutdown_receiver,
pending_certificate_count,
validator_store.clone(),
validator_store,
broadcast_sender,
);

Expand Down
1 change: 1 addition & 0 deletions crates/topos-tce-broadcast/src/task_manager_futures/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ pub struct TaskManager {
}

impl TaskManager {
#[allow(clippy::too_many_arguments)]
pub fn new(
message_receiver: mpsc::Receiver<DoubleEchoCommand>,
task_completion_sender: mpsc::Sender<(CertificateId, TaskStatus)>,
Expand Down
61 changes: 41 additions & 20 deletions crates/topos-tce-proxy/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use topos_core::{
},
uci::{Certificate, SubnetId},
};
use tracing::{error, info, info_span, warn, Instrument, Span};
use tracing::{debug, error, info, info_span, warn, Instrument, Span};
use tracing_opentelemetry::OpenTelemetrySpanExt;

const CERTIFICATE_OUTBOUND_CHANNEL_SIZE: usize = 100;
Expand Down Expand Up @@ -50,6 +50,15 @@ pub(crate) enum TceClientCommand {
Shutdown,
}

/// Create new backoff library error based on error that happened
pub(crate) fn new_tce_proxy_backoff_err<E: std::fmt::Display>(err: E) -> backoff::Error<E> {
// Retry according to backoff policy
backoff::Error::Transient {
err,
retry_after: None,
}
}

pub struct TceClient {
subnet_id: topos_core::uci::SubnetId,
tce_endpoint: String,
Expand Down Expand Up @@ -349,30 +358,42 @@ impl TceClientBuilder {
let span = info_span!(parent: &span, "SendCertificate", %cert_id, %previous_cert_id, %tce_endpoint);
let context = span.context();

let mut request = SubmitCertificateRequest {
certificate: Some(topos_core::api::grpc::uci::v1::Certificate::from(*cert)),
}.into_request();

let mut span_context = topos_telemetry::TonicMetaInjector(request.metadata_mut());

span_context.inject(&context);

let tce_endpoint = tce_endpoint.clone();
let mut tce_grpc_client = tce_grpc_client.clone();
let tce_grpc_client = tce_grpc_client.clone();
let context_backoff = context.clone();
certificate_to_send.push(async move {
match tce_grpc_client
debug!("Submitting certificate {} to the TCE using backoff strategy...", &tce_endpoint);
let cert = cert.clone();
let op = || async {
let mut tce_grpc_client = tce_grpc_client.clone();
let mut request = SubmitCertificateRequest {
certificate: Some(topos_core::api::grpc::uci::v1::Certificate::from(*(cert.clone()))),
}.into_request();

let mut span_context = topos_telemetry::TonicMetaInjector(request.metadata_mut());
span_context.inject(&context_backoff);

tce_grpc_client
.submit_certificate(request)
.with_current_context()
.with_context(context_backoff.clone())
.instrument(Span::current())
.await
.map(|r| r.into_inner()) {
Ok(_response)=> {
info!("Successfully sent the Certificate {} (previous: {}) to the TCE at {}", &cert_id, &previous_cert_id, &tce_endpoint);
}
Err(e) => {
error!("Failed to submit the Certificate to the TCE at {}: {e}", &tce_endpoint);
}
}
.map(|_response| {
info!("Successfully submitted the Certificate {} (previous: {}) to the TCE at {}",
&cert_id, &previous_cert_id, &tce_endpoint);
})
.map_err(|e| {
error!("Failed to submit the Certificate to the TCE at {}, will retry: {e}", &tce_endpoint);
new_tce_proxy_backoff_err(e)
})
};

backoff::future::retry(backoff::ExponentialBackoff::default(), op)
.await
.map_err(|e| {
error!("Failed to submit certificate to the TCE: {e}");
e
})
}
.with_context(context)
.instrument(span));
Expand Down
81 changes: 80 additions & 1 deletion crates/topos-tce-proxy/tests/tce_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ use topos_core::api::grpc::tce::v1::{
use topos_core::api::grpc::uci::v1::Certificate;
use topos_core::types::CertificateDelivered;
use topos_core::uci::SUBNET_ID_LENGTH;
use tracing::{debug, error, info};
use topos_tce_proxy::worker::TceProxyWorker;
use topos_tce_proxy::{TceProxyCommand, TceProxyConfig};
use tracing::{debug, error, info, warn};

use topos_test_sdk::{
certificates::create_certificate_chain,
Expand Down Expand Up @@ -532,3 +534,80 @@ fn input_certificates() -> Vec<CertificateDelivered> {

certificates
}

#[rstest]
#[test(tokio::test)]
async fn test_tce_proxy_submit_certificate(
#[future] start_node: TceContext,
) -> Result<(), Box<dyn std::error::Error>> {
let mut context = start_node.await;

let source_subnet_id = SOURCE_SUBNET_ID_1;
let target_subnet_stream_positions = Vec::new();

let mut certificates = Vec::new();
certificates.append(&mut create_certificate_chain(
SOURCE_SUBNET_ID_1,
&[TARGET_SUBNET_ID_1],
5,
));

// Create tce proxy client
let (tce_proxy_worker, _source_head_certificate_id) =
match TceProxyWorker::new(TceProxyConfig {
subnet_id: source_subnet_id,
base_tce_api_url: context.api_entrypoint.clone(),
positions: target_subnet_stream_positions,
})
.await
{
Ok((tce_proxy_worker, mut source_head_certificate)) => {
if let Some((cert, _position)) = &mut source_head_certificate {
if cert.id == CertificateId::default() {
warn!(
"Tce has not provided source head certificate, starting from subnet \
genesis block..."
);
source_head_certificate = None;
}
}

info!(
"TCE proxy client is starting for the source subnet {:?} from the head {:?}",
source_subnet_id, source_head_certificate
);
let source_head_certificate_id =
source_head_certificate.map(|(cert, position)| (cert.id, position));
(tce_proxy_worker, source_head_certificate_id)
}
Err(e) => {
panic!("Unable to create TCE Proxy: {e}");
}
};

for (index, cert) in certificates.into_iter().enumerate() {
match tce_proxy_worker
.send_command(TceProxyCommand::SubmitCertificate {
cert: Box::new(cert.certificate),
ctx: Default::default(),
})
.await
{
Ok(_) => {
info!("Certificate {} successfully submitted", index);
}
Err(e) => {
panic!("Error submitting certificate: {e}");
}
}
}

// Wait for certificates to be submitted
tokio::time::sleep(Duration::from_secs(5)).await;

// TODO: get pending certificates from TCE to make sure they were actually submitted

info!("Shutting down TCE node client");
context.shutdown().await?;
Ok(())
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ fn encode() {
request_id: Some(request_id),
checkpoint_diff: vec![CheckpointMapFieldEntry {
key: subnet.to_string(),
value: vec![cert.proof_of_delivery.clone().into()],
value: vec![cert.proof_of_delivery.into()],
}],
};

Expand Down

0 comments on commit 7638b5c

Please sign in to comment.