From 2aaa50071ca14415bb0284930c404659b6c463d8 Mon Sep 17 00:00:00 2001 From: Simon Paitrault Date: Fri, 16 Feb 2024 13:59:12 +0100 Subject: [PATCH] fix: add next_pending_certificate on end task (#455) Signed-off-by: Simon Paitrault --- .../benches/task_manager.rs | 6 +- .../src/double_echo/mod.rs | 93 +------------------ crates/topos-tce-broadcast/src/event.rs | 2 - crates/topos-tce-broadcast/src/lib.rs | 30 +----- .../src/task_manager/mod.rs | 47 +++++++--- crates/topos-tce-broadcast/src/tests/mod.rs | 16 +++- .../src/tests/task_manager.rs | 3 + crates/topos-tce/src/app_context/protocol.rs | 3 - 8 files changed, 53 insertions(+), 147 deletions(-) diff --git a/crates/topos-tce-broadcast/benches/task_manager.rs b/crates/topos-tce-broadcast/benches/task_manager.rs index 6a7b57561..7ad0d4858 100644 --- a/crates/topos-tce-broadcast/benches/task_manager.rs +++ b/crates/topos-tce-broadcast/benches/task_manager.rs @@ -56,7 +56,7 @@ pub async fn processing_double_echo(n: u64, validator_store: Arc cmd_receiver, event_sender, double_echo_shutdown_receiver, - validator_store, + validator_store.clone(), broadcast_sender, ); @@ -82,7 +82,9 @@ pub async fn processing_double_echo(n: u64, validator_store: Arc .collect::>(); for cert in &certificates { - double_echo.broadcast(cert.certificate.clone(), true).await; + _ = validator_store + .insert_pending_certificate(&cert.certificate) + .unwrap(); } for cert in &certificates { diff --git a/crates/topos-tce-broadcast/src/double_echo/mod.rs b/crates/topos-tce-broadcast/src/double_echo/mod.rs index 0c3930d6a..702e67596 100644 --- a/crates/topos-tce-broadcast/src/double_echo/mod.rs +++ b/crates/topos-tce-broadcast/src/double_echo/mod.rs @@ -20,10 +20,7 @@ use std::sync::Arc; use tokio::sync::{broadcast, mpsc, oneshot}; use tokio_util::sync::CancellationToken; use topos_config::tce::broadcast::ReliableBroadcastParams; -use topos_core::{ - types::ValidatorId, - uci::{Certificate, CertificateId}, -}; +use topos_core::{types::ValidatorId, uci::CertificateId}; use topos_crypto::messages::{MessageSigner, Signature}; use topos_tce_storage::store::ReadStore; use topos_tce_storage::types::CertificateDeliveredWithPositions; @@ -138,8 +135,6 @@ impl DoubleEcho { Some(command) = self.command_receiver.recv() => { match command { - DoubleEchoCommand::Broadcast { need_gossip, cert } => self.broadcast(cert, need_gossip).await, - command if self.subscriptions.is_some() => { match command { DoubleEchoCommand::Echo { certificate_id, validator_id, signature } => { @@ -204,92 +199,6 @@ impl DoubleEcho { } } -impl DoubleEcho { - /// Called to process new certificate submitted from the API or received on - /// the gossip p2p layer - pub async fn broadcast(&mut self, cert: Certificate, origin: bool) { - info!("🙌 Starting broadcasting the Certificate {}", &cert.id); - if self.cert_pre_broadcast_check(&cert).is_err() { - error!("Failure on the pre-check for the Certificate {}", &cert.id); - self.event_sender - .try_send(ProtocolEvents::BroadcastFailed { - certificate_id: cert.id, - }) - .unwrap(); - return; - } - - match self.validator_store.get_certificate(&cert.id) { - Ok(Some(_)) => { - _ = self - .event_sender - .send(ProtocolEvents::AlreadyDelivered { - certificate_id: cert.id, - }) - .await; - } - Ok(None) => { - if self - .delivery_state_for_new_cert(cert, origin) - .await - .is_none() - { - error!("Ill-formed samples"); - _ = self.event_sender.try_send(ProtocolEvents::Die); - } - } - Err(storage_error) => { - error!( - "Unable to broadcast the Certificate {} due to {:?}", - &cert.id, storage_error - ); - } - } - } - - /// Build initial delivery state - async fn delivery_state_for_new_cert( - &mut self, - certificate: Certificate, - origin: bool, - ) -> Option { - let subscriptions = self.subscriptions.clone(); - - // Check whether inbound sets are empty - if subscriptions.echo.is_empty() || subscriptions.ready.is_empty() { - error!( - "One Subscription sample is empty: Echo({}), Ready({})", - subscriptions.echo.is_empty(), - subscriptions.ready.is_empty(), - ); - None - } else { - _ = self - .task_manager_message_sender - .send(DoubleEchoCommand::Broadcast { - need_gossip: origin, - cert: certificate, - }) - .await; - - Some(true) - } - } - - /// Checks done before starting to broadcast - fn cert_pre_broadcast_check(&self, cert: &Certificate) -> Result<(), ()> { - if cert.check_signature().is_err() { - error!("Error on the signature"); - } - - if cert.check_proof().is_err() { - error!("Error on the proof"); - } - - Ok(()) - } -} - impl DoubleEcho { pub async fn handle_echo( &mut self, diff --git a/crates/topos-tce-broadcast/src/event.rs b/crates/topos-tce-broadcast/src/event.rs index b8e530800..78fcf4314 100644 --- a/crates/topos-tce-broadcast/src/event.rs +++ b/crates/topos-tce-broadcast/src/event.rs @@ -34,6 +34,4 @@ pub enum ProtocolEvents { signature: Signature, validator_id: ValidatorId, }, - /// For simulation purpose, for now only caused by ill-formed sampling - Die, } diff --git a/crates/topos-tce-broadcast/src/lib.rs b/crates/topos-tce-broadcast/src/lib.rs index b3dbc5d4d..0ae98ee3d 100644 --- a/crates/topos-tce-broadcast/src/lib.rs +++ b/crates/topos-tce-broadcast/src/lib.rs @@ -46,10 +46,9 @@ use topos_config::tce::broadcast::ReliableBroadcastParams; use topos_core::types::ValidatorId; use topos_core::uci::{Certificate, CertificateId}; use topos_crypto::messages::{MessageSigner, Signature}; -use topos_metrics::DOUBLE_ECHO_COMMAND_CHANNEL_CAPACITY_TOTAL; use topos_tce_storage::types::CertificateDeliveredWithPositions; use topos_tce_storage::validator::ValidatorStore; -use tracing::{debug, error, info}; +use tracing::{debug, error}; pub use topos_core::uci; @@ -159,33 +158,6 @@ impl ReliableBroadcastClient { self.command_sender.clone() } - /// Use to broadcast new certificate to the TCE network - pub async fn broadcast_new_certificate( - &self, - certificate: Certificate, - origin: bool, - ) -> Result<(), ()> { - let broadcast_commands = self.command_sender.clone(); - - if broadcast_commands.capacity() <= *constant::COMMAND_CHANNEL_CAPACITY { - DOUBLE_ECHO_COMMAND_CHANNEL_CAPACITY_TOTAL.inc(); - } - - info!("Send certificate to be broadcast"); - if broadcast_commands - .send(DoubleEchoCommand::Broadcast { - cert: certificate, - need_gossip: origin, - }) - .await - .is_err() - { - error!("Unable to send broadcast_new_certificate command, Receiver was dropped"); - } - - Ok(()) - } - pub async fn shutdown(&self) -> Result<(), Errors> { debug!("Shutting down reliable broadcast client"); let (double_echo_sender, double_echo_receiver) = oneshot::channel(); diff --git a/crates/topos-tce-broadcast/src/task_manager/mod.rs b/crates/topos-tce-broadcast/src/task_manager/mod.rs index 288c773aa..c5f4921c6 100644 --- a/crates/topos-tce-broadcast/src/task_manager/mod.rs +++ b/crates/topos-tce-broadcast/src/task_manager/mod.rs @@ -87,6 +87,32 @@ impl TaskManager { } } + /// Fetch the next pending certificates from the storage and create tasks for them. + /// This method is called periodically to check for new pending certificates and when + /// a task has finished. + fn next_pending_certificate(&mut self) { + debug!("Checking for next pending_certificates"); + match self.validator_store.get_next_pending_certificates( + &self.latest_pending_id, + *PENDING_LIMIT_PER_REQUEST_TO_STORAGE, + ) { + Ok(pendings) => { + debug!("Received {} pending certificates", pendings.len()); + for (pending_id, certificate) in pendings { + debug!( + "Creating task for pending certificate {} at position {} if needed", + certificate.id, pending_id + ); + self.create_task(&certificate, true); + self.latest_pending_id = pending_id; + } + } + Err(error) => { + error!("Error while fetching pending certificates: {:?}", error); + } + } + } + pub async fn run(mut self, shutdown_receiver: CancellationToken) { let mut interval = tokio::time::interval(Duration::from_secs(1)); @@ -95,20 +121,7 @@ impl TaskManager { biased; _ = interval.tick() => { - debug!("Checking for next pending_certificates"); - match self.validator_store.get_next_pending_certificates(&self.latest_pending_id, *PENDING_LIMIT_PER_REQUEST_TO_STORAGE) { - Ok(pendings) => { - debug!("Received {} pending certificates", pendings.len()); - for (pending_id, certificate) in pendings { - debug!("Creating task for pending certificate {} at position {} if needed", certificate.id, pending_id); - self.create_task(&certificate, true); - self.latest_pending_id = pending_id; - } - } - Err(error) => { - error!("Error while fetching pending certificates: {:?}", error); - } - } + self.next_pending_certificate(); } Some(msg) = self.message_receiver.recv() => { match msg { @@ -136,9 +149,12 @@ impl TaskManager { debug!("Task for certificate {} finished successfully", certificate_id); self.tasks.remove(&certificate_id); DOUBLE_ECHO_ACTIVE_TASKS_COUNT.dec(); + } else { debug!("Task for certificate {} finished unsuccessfully", certificate_id); } + + self.next_pending_certificate(); } _ = shutdown_receiver.cancelled() => { @@ -186,6 +202,9 @@ impl TaskManager { } } + /// Create a new task for the given certificate and add it to the running tasks. + /// If the previous certificate is not available yet, the task will be created but not started. + /// This method is called when a pending certificate is fetched from the storage. fn create_task(&mut self, cert: &Certificate, need_gossip: bool) { match self.tasks.entry(cert.id) { std::collections::hash_map::Entry::Vacant(entry) => { diff --git a/crates/topos-tce-broadcast/src/tests/mod.rs b/crates/topos-tce-broadcast/src/tests/mod.rs index 521b27382..e22d8b1c5 100644 --- a/crates/topos-tce-broadcast/src/tests/mod.rs +++ b/crates/topos-tce-broadcast/src/tests/mod.rs @@ -48,6 +48,7 @@ struct TceParams { struct Context { event_receiver: Receiver, broadcast_receiver: broadcast::Receiver, + validator_store: Arc, } async fn create_context(params: TceParams) -> (DoubleEcho, Context) { @@ -80,7 +81,7 @@ async fn create_context(params: TceParams) -> (DoubleEcho, Context) { cmd_receiver, event_sender, double_echo_shutdown_receiver, - validator_store, + validator_store.clone(), broadcast_sender, ); @@ -91,6 +92,7 @@ async fn create_context(params: TceParams) -> (DoubleEcho, Context) { Context { event_receiver, broadcast_receiver, + validator_store, }, ) } @@ -178,8 +180,10 @@ async fn trigger_success_path_upon_reaching_threshold(#[case] params: TceParams) Certificate::new_with_default_fields(PREV_CERTIFICATE_ID, SOURCE_SUBNET_ID_1, &[]) .expect("Dummy certificate"); - // Trigger Echo upon dispatching - double_echo.broadcast(dummy_cert.clone(), true).await; + _ = ctx + .validator_store + .insert_pending_certificate(&dummy_cert) + .unwrap(); assert!(matches!( ctx.event_receiver.recv().await, @@ -230,8 +234,10 @@ async fn trigger_ready_when_reached_enough_ready(#[case] params: TceParams) { Certificate::new_with_default_fields(PREV_CERTIFICATE_ID, SOURCE_SUBNET_ID_1, &[]) .expect("Dummy certificate"); - // Trigger Echo upon dispatching - double_echo.broadcast(dummy_cert.clone(), true).await; + _ = ctx + .validator_store + .insert_pending_certificate(&dummy_cert) + .unwrap(); assert!(matches!( ctx.event_receiver.recv().await, diff --git a/crates/topos-tce-broadcast/src/tests/task_manager.rs b/crates/topos-tce-broadcast/src/tests/task_manager.rs index 3a1963d8f..4fd9ac96a 100644 --- a/crates/topos-tce-broadcast/src/tests/task_manager.rs +++ b/crates/topos-tce-broadcast/src/tests/task_manager.rs @@ -7,6 +7,7 @@ use tokio::{ }; use tokio_util::sync::CancellationToken; use topos_crypto::{messages::MessageSigner, validator_id::ValidatorId}; +use topos_metrics::DOUBLE_ECHO_ACTIVE_TASKS_COUNT; use topos_tce_storage::validator::ValidatorStore; use topos_test_sdk::{ certificates::create_certificate_chain, @@ -80,4 +81,6 @@ async fn can_start(#[future] create_validator_store: Arc) { cert: parent.certificate.clone(), }) .await; + + assert_eq!(DOUBLE_ECHO_ACTIVE_TASKS_COUNT.get(), 1); } diff --git a/crates/topos-tce/src/app_context/protocol.rs b/crates/topos-tce/src/app_context/protocol.rs index a6987be31..57dd40068 100644 --- a/crates/topos-tce/src/app_context/protocol.rs +++ b/crates/topos-tce/src/app_context/protocol.rs @@ -80,9 +80,6 @@ impl AppContext { ProtocolEvents::AlreadyDelivered { certificate_id } => { info!("Certificate {certificate_id} already delivered") } - ProtocolEvents::Die => { - error!("The DoubleEcho unexpectedly died, this is unrecoverable") - } _ => {} } }