Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

Commit

Permalink
chore: adding republish_count
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Paitrault <[email protected]>
  • Loading branch information
Freyskeyd committed Mar 12, 2024
1 parent 314e8c2 commit 6c1d916
Show file tree
Hide file tree
Showing 10 changed files with 43 additions and 1 deletion.
1 change: 1 addition & 0 deletions crates/topos-core/proto/topos/tce/v1/double_echo.proto
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import "topos/uci/v1/certification.proto";

message Gossip {
topos.uci.v1.Certificate certificate = 1;
uint64 republish_count = 2;
}

message Echo {
Expand Down
Binary file modified crates/topos-core/src/api/grpc/generated/topos.bin
Binary file not shown.
2 changes: 2 additions & 0 deletions crates/topos-core/src/api/grpc/generated/topos.tce.v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1453,6 +1453,8 @@ pub mod console_service_server {
pub struct Gossip {
#[prost(message, optional, tag = "1")]
pub certificate: ::core::option::Option<super::super::uci::v1::Certificate>,
#[prost(uint64, tag = "2")]
pub republish_count: u64,
}
#[derive(serde::Deserialize, serde::Serialize)]
#[allow(clippy::derive_partial_eq_without_eq)]
Expand Down
24 changes: 24 additions & 0 deletions crates/topos-tce-broadcast/src/double_echo/broadcast_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub struct BroadcastState {
delivery_time: time::Instant,
readies: HashSet<Ready>,
pub(crate) expected_position: Option<Position>,
republish_count: u64,
}

impl BroadcastState {
Expand Down Expand Up @@ -59,6 +60,7 @@ impl BroadcastState {
delivery_time: time::Instant::now(),
readies: HashSet::new(),
expected_position: None,
republish_count: 0,
};

_ = state.event_sender.try_send(ProtocolEvents::Broadcast {
Expand All @@ -70,8 +72,10 @@ impl BroadcastState {
"📣 Gossiping the Certificate {} from the source subnet {}",
&state.certificate.id, &state.certificate.source_subnet_id
);

let _ = state.event_sender.try_send(ProtocolEvents::Gossip {
cert: state.certificate.clone(),
republish_count: state.republish_count,
});
}

Expand All @@ -80,6 +84,21 @@ impl BroadcastState {
state
}

pub(crate) fn republish_certificate(&mut self) {
if let Some(count) = self.republish_count.checked_add(1) {
self.republish_count = count;

// TODO: catch error
let _ = self.event_sender.try_send(ProtocolEvents::Gossip {
cert: self.certificate.clone(),
republish_count: self.republish_count,
});

self.status = Status::Pending;

self.update_status();
}
}
pub fn into_delivered(&self) -> CertificateDelivered {
CertificateDelivered {
certificate: self.certificate.clone(),
Expand Down Expand Up @@ -129,6 +148,7 @@ impl BroadcastState {
payload.extend_from_slice(self.certificate.id.as_array());
payload.extend_from_slice(self.validator_id.as_bytes());

// TODO: catch error
let _ = self.event_sender.try_send(ProtocolEvents::Echo {
certificate_id: self.certificate.id,
signature: self.message_signer.sign_message(&payload).ok()?,
Expand Down Expand Up @@ -250,4 +270,8 @@ impl BroadcastState {

delivery_threshold
}

pub(crate) fn received_echo(&self) -> bool {
self.subscriptions_view.echo.len() < self.subscriptions_view.network_size
}
}
1 change: 1 addition & 0 deletions crates/topos-tce-broadcast/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pub enum ProtocolEvents {
/// Indicates that 'gossip' message broadcasting is required
Gossip {
cert: Certificate,
republish_count: u64,
},
/// Indicates that 'echo' message broadcasting is required
Echo {
Expand Down
6 changes: 6 additions & 0 deletions crates/topos-tce-broadcast/src/task_manager/task.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::future::{Future, IntoFuture};
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{broadcast, mpsc};

use topos_core::types::stream::Position;
Expand Down Expand Up @@ -96,8 +97,13 @@ impl IntoFuture for Task {
);
self.broadcast_state.expected_position = Some(expected_position);

let mut gossip_timer = tokio::time::interval(Duration::from_secs(5));
gossip_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
loop {
tokio::select! {
_tick = gossip_timer.tick(), if !self.broadcast_state.received_echo() => {
self.broadcast_state.republish_certificate();
}
Some(msg) = self.message_receiver.recv() => {
match msg {
DoubleEchoCommand::Echo { validator_id, .. } => {
Expand Down
1 change: 1 addition & 0 deletions crates/topos-tce/src/app_context/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ impl AppContext {
match double_echo_request {
double_echo_request::Request::Gossip(Gossip {
certificate: Some(certificate),
..
}) => match uci::Certificate::try_from(certificate) {
Ok(cert) => {
if let hash_map::Entry::Vacant(entry) =
Expand Down
6 changes: 5 additions & 1 deletion crates/topos-tce/src/app_context/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,16 @@ impl AppContext {
info!("Broadcasting certificate {}", certificate_id);
}

ProtocolEvents::Gossip { cert } => {
ProtocolEvents::Gossip {
cert,
republish_count,
} => {
let cert_id = cert.id;

let request = DoubleEchoRequest {
request: Some(double_echo_request::Request::Gossip(Gossip {
certificate: Some(cert.into()),
republish_count,
})),
};

Expand Down
1 change: 1 addition & 0 deletions crates/topos-tce/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ async fn non_validator_publish_gossip(
context
.on_protocol_event(ProtocolEvents::Gossip {
cert: certificates[0].certificate.clone(),
republish_count: 0,
})
.await;

Expand Down
2 changes: 2 additions & 0 deletions crates/topos-tce/src/tests/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ async fn handle_gossip(
let msg = DoubleEchoRequest {
request: Some(double_echo_request::Request::Gossip(Gossip {
certificate: Some(certificate.into()),
republish_count: 0,
})),
};
context
Expand Down Expand Up @@ -118,6 +119,7 @@ async fn handle_already_delivered(
let msg = DoubleEchoRequest {
request: Some(double_echo_request::Request::Gossip(Gossip {
certificate: Some(certificate.into()),
republish_count: 0,
})),
};
_ = context
Expand Down

0 comments on commit 6c1d916

Please sign in to comment.