Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

Commit

Permalink
fix: add next_pending_certificate on end task (#455)
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Paitrault <[email protected]>
  • Loading branch information
Freyskeyd authored Feb 16, 2024
1 parent 3335846 commit 2aaa500
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 147 deletions.
6 changes: 4 additions & 2 deletions crates/topos-tce-broadcast/benches/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub async fn processing_double_echo(n: u64, validator_store: Arc<ValidatorStore>
cmd_receiver,
event_sender,
double_echo_shutdown_receiver,
validator_store,
validator_store.clone(),
broadcast_sender,
);

Expand All @@ -82,7 +82,9 @@ pub async fn processing_double_echo(n: u64, validator_store: Arc<ValidatorStore>
.collect::<Vec<_>>();

for cert in &certificates {
double_echo.broadcast(cert.certificate.clone(), true).await;
_ = validator_store
.insert_pending_certificate(&cert.certificate)
.unwrap();
}

for cert in &certificates {
Expand Down
93 changes: 1 addition & 92 deletions crates/topos-tce-broadcast/src/double_echo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@ use std::sync::Arc;
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio_util::sync::CancellationToken;
use topos_config::tce::broadcast::ReliableBroadcastParams;
use topos_core::{
types::ValidatorId,
uci::{Certificate, CertificateId},
};
use topos_core::{types::ValidatorId, uci::CertificateId};
use topos_crypto::messages::{MessageSigner, Signature};
use topos_tce_storage::store::ReadStore;
use topos_tce_storage::types::CertificateDeliveredWithPositions;
Expand Down Expand Up @@ -138,8 +135,6 @@ impl DoubleEcho {
Some(command) = self.command_receiver.recv() => {
match command {

DoubleEchoCommand::Broadcast { need_gossip, cert } => self.broadcast(cert, need_gossip).await,

command if self.subscriptions.is_some() => {
match command {
DoubleEchoCommand::Echo { certificate_id, validator_id, signature } => {
Expand Down Expand Up @@ -204,92 +199,6 @@ impl DoubleEcho {
}
}

impl DoubleEcho {
/// Called to process new certificate submitted from the API or received on
/// the gossip p2p layer
pub async fn broadcast(&mut self, cert: Certificate, origin: bool) {
info!("🙌 Starting broadcasting the Certificate {}", &cert.id);
if self.cert_pre_broadcast_check(&cert).is_err() {
error!("Failure on the pre-check for the Certificate {}", &cert.id);
self.event_sender
.try_send(ProtocolEvents::BroadcastFailed {
certificate_id: cert.id,
})
.unwrap();
return;
}

match self.validator_store.get_certificate(&cert.id) {
Ok(Some(_)) => {
_ = self
.event_sender
.send(ProtocolEvents::AlreadyDelivered {
certificate_id: cert.id,
})
.await;
}
Ok(None) => {
if self
.delivery_state_for_new_cert(cert, origin)
.await
.is_none()
{
error!("Ill-formed samples");
_ = self.event_sender.try_send(ProtocolEvents::Die);
}
}
Err(storage_error) => {
error!(
"Unable to broadcast the Certificate {} due to {:?}",
&cert.id, storage_error
);
}
}
}

/// Build initial delivery state
async fn delivery_state_for_new_cert(
&mut self,
certificate: Certificate,
origin: bool,
) -> Option<bool> {
let subscriptions = self.subscriptions.clone();

// Check whether inbound sets are empty
if subscriptions.echo.is_empty() || subscriptions.ready.is_empty() {
error!(
"One Subscription sample is empty: Echo({}), Ready({})",
subscriptions.echo.is_empty(),
subscriptions.ready.is_empty(),
);
None
} else {
_ = self
.task_manager_message_sender
.send(DoubleEchoCommand::Broadcast {
need_gossip: origin,
cert: certificate,
})
.await;

Some(true)
}
}

/// Checks done before starting to broadcast
fn cert_pre_broadcast_check(&self, cert: &Certificate) -> Result<(), ()> {
if cert.check_signature().is_err() {
error!("Error on the signature");
}

if cert.check_proof().is_err() {
error!("Error on the proof");
}

Ok(())
}
}

impl DoubleEcho {
pub async fn handle_echo(
&mut self,
Expand Down
2 changes: 0 additions & 2 deletions crates/topos-tce-broadcast/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,4 @@ pub enum ProtocolEvents {
signature: Signature,
validator_id: ValidatorId,
},
/// For simulation purpose, for now only caused by ill-formed sampling
Die,
}
30 changes: 1 addition & 29 deletions crates/topos-tce-broadcast/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,9 @@ use topos_config::tce::broadcast::ReliableBroadcastParams;
use topos_core::types::ValidatorId;
use topos_core::uci::{Certificate, CertificateId};
use topos_crypto::messages::{MessageSigner, Signature};
use topos_metrics::DOUBLE_ECHO_COMMAND_CHANNEL_CAPACITY_TOTAL;
use topos_tce_storage::types::CertificateDeliveredWithPositions;
use topos_tce_storage::validator::ValidatorStore;
use tracing::{debug, error, info};
use tracing::{debug, error};

pub use topos_core::uci;

Expand Down Expand Up @@ -159,33 +158,6 @@ impl ReliableBroadcastClient {
self.command_sender.clone()
}

/// Use to broadcast new certificate to the TCE network
pub async fn broadcast_new_certificate(
&self,
certificate: Certificate,
origin: bool,
) -> Result<(), ()> {
let broadcast_commands = self.command_sender.clone();

if broadcast_commands.capacity() <= *constant::COMMAND_CHANNEL_CAPACITY {
DOUBLE_ECHO_COMMAND_CHANNEL_CAPACITY_TOTAL.inc();
}

info!("Send certificate to be broadcast");
if broadcast_commands
.send(DoubleEchoCommand::Broadcast {
cert: certificate,
need_gossip: origin,
})
.await
.is_err()
{
error!("Unable to send broadcast_new_certificate command, Receiver was dropped");
}

Ok(())
}

pub async fn shutdown(&self) -> Result<(), Errors> {
debug!("Shutting down reliable broadcast client");
let (double_echo_sender, double_echo_receiver) = oneshot::channel();
Expand Down
47 changes: 33 additions & 14 deletions crates/topos-tce-broadcast/src/task_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,32 @@ impl TaskManager {
}
}

/// Fetch the next pending certificates from the storage and create tasks for them.
/// This method is called periodically to check for new pending certificates and when
/// a task has finished.
fn next_pending_certificate(&mut self) {
debug!("Checking for next pending_certificates");
match self.validator_store.get_next_pending_certificates(
&self.latest_pending_id,
*PENDING_LIMIT_PER_REQUEST_TO_STORAGE,
) {
Ok(pendings) => {
debug!("Received {} pending certificates", pendings.len());
for (pending_id, certificate) in pendings {
debug!(
"Creating task for pending certificate {} at position {} if needed",
certificate.id, pending_id
);
self.create_task(&certificate, true);
self.latest_pending_id = pending_id;
}
}
Err(error) => {
error!("Error while fetching pending certificates: {:?}", error);
}
}
}

pub async fn run(mut self, shutdown_receiver: CancellationToken) {
let mut interval = tokio::time::interval(Duration::from_secs(1));

Expand All @@ -95,20 +121,7 @@ impl TaskManager {
biased;

_ = interval.tick() => {
debug!("Checking for next pending_certificates");
match self.validator_store.get_next_pending_certificates(&self.latest_pending_id, *PENDING_LIMIT_PER_REQUEST_TO_STORAGE) {
Ok(pendings) => {
debug!("Received {} pending certificates", pendings.len());
for (pending_id, certificate) in pendings {
debug!("Creating task for pending certificate {} at position {} if needed", certificate.id, pending_id);
self.create_task(&certificate, true);
self.latest_pending_id = pending_id;
}
}
Err(error) => {
error!("Error while fetching pending certificates: {:?}", error);
}
}
self.next_pending_certificate();
}
Some(msg) = self.message_receiver.recv() => {
match msg {
Expand Down Expand Up @@ -136,9 +149,12 @@ impl TaskManager {
debug!("Task for certificate {} finished successfully", certificate_id);
self.tasks.remove(&certificate_id);
DOUBLE_ECHO_ACTIVE_TASKS_COUNT.dec();

} else {
debug!("Task for certificate {} finished unsuccessfully", certificate_id);
}

self.next_pending_certificate();
}

_ = shutdown_receiver.cancelled() => {
Expand Down Expand Up @@ -186,6 +202,9 @@ impl TaskManager {
}
}

/// Create a new task for the given certificate and add it to the running tasks.
/// If the previous certificate is not available yet, the task will be created but not started.
/// This method is called when a pending certificate is fetched from the storage.
fn create_task(&mut self, cert: &Certificate, need_gossip: bool) {
match self.tasks.entry(cert.id) {
std::collections::hash_map::Entry::Vacant(entry) => {
Expand Down
16 changes: 11 additions & 5 deletions crates/topos-tce-broadcast/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ struct TceParams {
struct Context {
event_receiver: Receiver<ProtocolEvents>,
broadcast_receiver: broadcast::Receiver<CertificateDeliveredWithPositions>,
validator_store: Arc<ValidatorStore>,
}

async fn create_context(params: TceParams) -> (DoubleEcho, Context) {
Expand Down Expand Up @@ -80,7 +81,7 @@ async fn create_context(params: TceParams) -> (DoubleEcho, Context) {
cmd_receiver,
event_sender,
double_echo_shutdown_receiver,
validator_store,
validator_store.clone(),
broadcast_sender,
);

Expand All @@ -91,6 +92,7 @@ async fn create_context(params: TceParams) -> (DoubleEcho, Context) {
Context {
event_receiver,
broadcast_receiver,
validator_store,
},
)
}
Expand Down Expand Up @@ -178,8 +180,10 @@ async fn trigger_success_path_upon_reaching_threshold(#[case] params: TceParams)
Certificate::new_with_default_fields(PREV_CERTIFICATE_ID, SOURCE_SUBNET_ID_1, &[])
.expect("Dummy certificate");

// Trigger Echo upon dispatching
double_echo.broadcast(dummy_cert.clone(), true).await;
_ = ctx
.validator_store
.insert_pending_certificate(&dummy_cert)
.unwrap();

assert!(matches!(
ctx.event_receiver.recv().await,
Expand Down Expand Up @@ -230,8 +234,10 @@ async fn trigger_ready_when_reached_enough_ready(#[case] params: TceParams) {
Certificate::new_with_default_fields(PREV_CERTIFICATE_ID, SOURCE_SUBNET_ID_1, &[])
.expect("Dummy certificate");

// Trigger Echo upon dispatching
double_echo.broadcast(dummy_cert.clone(), true).await;
_ = ctx
.validator_store
.insert_pending_certificate(&dummy_cert)
.unwrap();

assert!(matches!(
ctx.event_receiver.recv().await,
Expand Down
3 changes: 3 additions & 0 deletions crates/topos-tce-broadcast/src/tests/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use tokio::{
};
use tokio_util::sync::CancellationToken;
use topos_crypto::{messages::MessageSigner, validator_id::ValidatorId};
use topos_metrics::DOUBLE_ECHO_ACTIVE_TASKS_COUNT;
use topos_tce_storage::validator::ValidatorStore;
use topos_test_sdk::{
certificates::create_certificate_chain,
Expand Down Expand Up @@ -80,4 +81,6 @@ async fn can_start(#[future] create_validator_store: Arc<ValidatorStore>) {
cert: parent.certificate.clone(),
})
.await;

assert_eq!(DOUBLE_ECHO_ACTIVE_TASKS_COUNT.get(), 1);
}
3 changes: 0 additions & 3 deletions crates/topos-tce/src/app_context/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,6 @@ impl AppContext {
ProtocolEvents::AlreadyDelivered { certificate_id } => {
info!("Certificate {certificate_id} already delivered")
}
ProtocolEvents::Die => {
error!("The DoubleEcho unexpectedly died, this is unrecoverable")
}
_ => {}
}
}
Expand Down

0 comments on commit 2aaa500

Please sign in to comment.