Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

Commit

Permalink
docs: adding some docs to the tce-proxy (#316)
Browse files Browse the repository at this point in the history
  • Loading branch information
dvdplm authored Sep 29, 2023
1 parent 8898199 commit 745a57d
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 13 deletions.
2 changes: 1 addition & 1 deletion crates/topos-sequencer/src/app_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ impl AppContext {
let (tce_proxy_worker, _source_head_certificate_id) =
match TceProxyWorker::new(topos_tce_proxy::TceProxyConfig {
subnet_id: config.subnet_id,
base_tce_api_url: config.base_tce_api_url.clone(),
tce_endpoint: config.tce_endpoint.clone(),
positions: Vec::new(), // TODO: acquire from subnet
})
.await
Expand Down
2 changes: 1 addition & 1 deletion crates/topos-sequencer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ pub async fn launch(
// TODO: Revise this approach?
let (tce_proxy_worker, source_head_certificate_id) = match TceProxyWorker::new(TceProxyConfig {
subnet_id,
base_tce_api_url: config.tce_grpc_endpoint.clone(),
tce_endpoint: config.tce_grpc_endpoint.clone(),
positions: target_subnet_stream_positions,
})
.await
Expand Down
17 changes: 11 additions & 6 deletions crates/topos-tce-proxy/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//!
//! The module to handle incoming events from the friendly TCE node
//! Handles incoming events from the friendly TCE node
//!
pub mod client;
pub mod worker;
Expand Down Expand Up @@ -52,6 +52,7 @@ pub enum Error {
},
}

/// Control the TceProxy
#[derive(Debug)]
pub enum TceProxyCommand {
/// Submit a newly created certificate to the TCE
Expand All @@ -64,29 +65,33 @@ pub enum TceProxyCommand {
Shutdown(tokio::sync::oneshot::Sender<()>),
}

/// Events related to synchronizing certificates with the TCE network.
#[derive(Debug, Clone)]
pub enum TceProxyEvent {
/// New delivered certificate (and its position) fetched from the TCE network
NewDeliveredCerts {
certificates: Vec<(Certificate, u64)>,
ctx: Context,
},
/// Failed watching certificates channel
/// Requires restart of sequencer tce proxy
/// Failed watching certificates channel. Requires a restart of the sequencer tce proxy to recover.
WatchCertificatesChannelFailed,
}

/// Configuration data for the TCE proxy, used to configure the [`TceProxyWorker`].
pub struct TceProxyConfig {
/// The [`SubnetId`] this config handles certificate proxying for.
pub subnet_id: SubnetId,
pub base_tce_api_url: String,
/// The GRPC endpoint where the Sequencer is expecting to find a TCE node.
pub tce_endpoint: String,
/// The positions in the index of the known Certificates.
pub positions: Vec<TargetStreamPosition>,
}

async fn connect_to_tce_service_with_retry(
endpoint: String,
) -> Result<ApiServiceClient<tonic::transport::channel::Channel>, Error> {
info!(
"Connecting to the TCE at {} using backoff strategy...",
"Connecting to the TCE at {} using the exponential backoff strategy...",
endpoint
);
let op = || async {
Expand All @@ -102,7 +107,7 @@ async fn connect_to_tce_service_with_retry(
backoff::future::retry(backoff::ExponentialBackoff::default(), op)
.await
.map_err(|e| {
error!("Failed to connect to the TCE: {e}");
error!("Failed to connect to the TCE at {}: {e}", &endpoint);
Error::TonicTransportError { source: e }
})
}
10 changes: 7 additions & 3 deletions crates/topos-tce-proxy/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,19 @@ use tracing_opentelemetry::OpenTelemetrySpanExt;

/// Proxy with the TCE
///
/// 1) Fetch the delivered certificates from the TCE
/// 2) Submit the new certificate to the TCE
/// Performs two tasks:
/// 1) Fetch the certificates that were delivered from the TCE
/// 2) Submit the new certificates to the TCE
pub struct TceProxyWorker {
/// The [`TceProxyConfig`] used to setup this worker.
pub config: TceProxyConfig,
commands: mpsc::Sender<TceProxyCommand>,
events: mpsc::Receiver<TceProxyEvent>,
}

impl TceProxyWorker {
/// Construct a new [`TceProxyWorker`] with a 128 items deep channel to send commands to and receive events from a TCE node on the given subnet.
/// The worker holds a [`crate::client::TceClient`]
pub async fn new(config: TceProxyConfig) -> Result<(Self, Option<(Certificate, u64)>), Error> {
let (command_sender, mut command_rcv) = mpsc::channel::<TceProxyCommand>(128);
let (evt_sender, evt_rcv) = mpsc::channel::<TceProxyEvent>(128);
Expand All @@ -25,7 +29,7 @@ impl TceProxyWorker {

let (mut tce_client, mut receiving_certificate_stream) = TceClientBuilder::default()
.set_subnet_id(config.subnet_id)
.set_tce_endpoint(&config.base_tce_api_url)
.set_tce_endpoint(&config.tce_endpoint)
.set_proxy_event_sender(evt_sender.clone())
.build_and_launch(shutdown_receiver)
.await?;
Expand Down
3 changes: 1 addition & 2 deletions crates/topos-tce-proxy/tests/tce_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -551,12 +551,11 @@ async fn test_tce_proxy_submit_certificate(
&[TARGET_SUBNET_ID_1],
5,
));

// Create tce proxy client
let (tce_proxy_worker, _source_head_certificate_id) =
match TceProxyWorker::new(TceProxyConfig {
subnet_id: source_subnet_id,
base_tce_api_url: context.api_entrypoint.clone(),
tce_endpoint: context.api_entrypoint.clone(),
positions: target_subnet_stream_positions,
})
.await
Expand Down

0 comments on commit 745a57d

Please sign in to comment.