Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

Commit

Permalink
chore: adding republish_count
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Paitrault <[email protected]>
  • Loading branch information
Freyskeyd committed Mar 12, 2024
1 parent ebe121a commit b9d7dd6
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 14 deletions.
8 changes: 6 additions & 2 deletions crates/topos-tce-api/tests/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,9 @@ async fn get_pending_pool(
create_validator_store(vec![], futures::future::ready(fullnode_store.clone())).await;

for certificate in &certificates {
_ = store.insert_pending_certificate(&certificate.certificate);
_ = store
.insert_pending_certificate(&certificate.certificate)
.await;
}

let storage_client = StorageClient::new(store.clone());
Expand Down Expand Up @@ -795,7 +797,9 @@ async fn check_precedence(
create_validator_store(vec![], futures::future::ready(fullnode_store.clone())).await;

for certificate in &certificates {
_ = store.insert_pending_certificate(&certificate.certificate);
_ = store
.insert_pending_certificate(&certificate.certificate)
.await;
}

let storage_client = StorageClient::new(store.clone());
Expand Down
27 changes: 16 additions & 11 deletions crates/topos-tce-broadcast/src/double_echo/broadcast_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub struct BroadcastState {
readies: HashSet<Ready>,
pub(crate) expected_position: Option<Position>,
republish_count: u64,
need_gossip: bool,
}

impl BroadcastState {
Expand Down Expand Up @@ -61,6 +62,7 @@ impl BroadcastState {
readies: HashSet::new(),
expected_position: None,
republish_count: 0,
need_gossip,
};

_ = state.event_sender.try_send(ProtocolEvents::Broadcast {
Expand All @@ -85,20 +87,23 @@ impl BroadcastState {
}

pub(crate) fn republish_certificate(&mut self) {
if let Some(count) = self.republish_count.checked_add(1) {
self.republish_count = count;

// TODO: catch error
let _ = self.event_sender.try_send(ProtocolEvents::Gossip {
cert: self.certificate.clone(),
republish_count: self.republish_count,
});
if self.need_gossip {
if let Some(count) = self.republish_count.checked_add(1) {
self.republish_count = count;

// TODO: catch error
let _ = self.event_sender.try_send(ProtocolEvents::Gossip {
cert: self.certificate.clone(),
republish_count: self.republish_count,
});
}
}

self.status = Status::Pending;
self.status = Status::Pending;

self.update_status();
}
self.update_status();
}

pub fn into_delivered(&self) -> CertificateDelivered {
CertificateDelivered {
certificate: self.certificate.clone(),
Expand Down
5 changes: 4 additions & 1 deletion crates/topos-tce-broadcast/src/task_manager/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use topos_tce_storage::errors::StorageError;
use topos_tce_storage::store::{ReadStore, WriteStore};
use topos_tce_storage::types::CertificateDeliveredWithPositions;
use topos_tce_storage::validator::ValidatorStore;
use tracing::{error, info, warn};
use tracing::{debug, error};

use crate::double_echo::broadcast_state::{BroadcastState, Status};
use crate::{DoubleEchoCommand, TaskStatus};
Expand Down Expand Up @@ -99,6 +99,9 @@ impl IntoFuture for Task {

let mut gossip_timer = tokio::time::interval(Duration::from_secs(5));
gossip_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
// First tick is instant
gossip_timer.tick().await;

loop {
tokio::select! {
_tick = gossip_timer.tick(), if !self.broadcast_state.received_echo() => {
Expand Down

0 comments on commit b9d7dd6

Please sign in to comment.