Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

Commit

Permalink
refactor: improve delivery timing
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Paitrault <[email protected]>
  • Loading branch information
Freyskeyd committed Feb 29, 2024
1 parent c41a51a commit 4217661
Show file tree
Hide file tree
Showing 14 changed files with 203 additions and 23 deletions.
40 changes: 39 additions & 1 deletion crates/topos-core/src/api/graphql/certificate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ use super::{checkpoint::SourceStreamPosition, subnet::SubnetId};
#[derive(Serialize, Deserialize, Debug, NewType)]
pub struct CertificateId(String);

impl From<uci::CertificateId> for CertificateId {
fn from(value: uci::CertificateId) -> Self {
Self(value.to_string())
}
}

#[derive(Serialize, Deserialize, Debug, SimpleObject)]
#[serde(rename_all = "camelCase")]
pub struct CertificatePositions {
Expand All @@ -30,6 +36,38 @@ pub struct Certificate {
pub positions: CertificatePositions,
}

#[derive(Debug, Serialize, Deserialize, SimpleObject)]
#[serde(rename_all = "camelCase")]
pub struct UndeliveredCertificate {
pub id: CertificateId,
pub prev_id: CertificateId,
pub proof: String,
pub signature: String,
pub source_subnet_id: SubnetId,
pub state_root: String,
pub target_subnets: Vec<SubnetId>,
pub tx_root_hash: String,
pub receipts_root_hash: String,
pub verifier: u32,
}

impl From<&crate::uci::Certificate> for UndeliveredCertificate {
fn from(value: &crate::uci::Certificate) -> Self {
Self {
id: CertificateId(value.id.to_string()),
prev_id: CertificateId(value.prev_id.to_string()),
proof: hex::encode(&value.proof),
signature: hex::encode(&value.signature),
source_subnet_id: (&value.source_subnet_id).into(),
state_root: hex::encode(value.state_root),
target_subnets: value.target_subnets.iter().map(Into::into).collect(),
tx_root_hash: hex::encode(value.tx_root_hash),
receipts_root_hash: format!("0x{}", hex::encode(value.receipts_root_hash)),
verifier: value.verifier,
}
}
}

#[derive(Debug, Serialize, Deserialize, SimpleObject)]
pub struct Ready {
message: String,
Expand All @@ -52,7 +90,7 @@ impl From<&CertificateDelivered> for Certificate {
receipts_root_hash: format!("0x{}", hex::encode(uci_cert.receipts_root_hash)),
verifier: uci_cert.verifier,
positions: CertificatePositions {
source: (&value.proof_of_delivery.delivery_position).into(),
source: (&value.proof_of_delivery).into(),
},
}
}
Expand Down
12 changes: 7 additions & 5 deletions crates/topos-core/src/api/graphql/checkpoint.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use async_graphql::{InputObject, SimpleObject};
use serde::{Deserialize, Serialize};

use crate::types::stream::CertificateSourceStreamPosition;
use crate::types::ProofOfDelivery;

use super::{certificate::CertificateId, subnet::SubnetId};

Expand All @@ -17,13 +17,15 @@ pub struct SourceStreamPositionInput {
pub struct SourceStreamPosition {
pub source_subnet_id: SubnetId,
pub position: u64,
pub certificate_id: CertificateId,
}

impl From<&CertificateSourceStreamPosition> for SourceStreamPosition {
fn from(value: &CertificateSourceStreamPosition) -> Self {
impl From<&ProofOfDelivery> for SourceStreamPosition {
fn from(value: &ProofOfDelivery) -> Self {
Self {
source_subnet_id: (&value.subnet_id).into(),
position: *value.position,
certificate_id: value.certificate_id.into(),
source_subnet_id: (&value.delivery_position.subnet_id).into(),
position: *value.delivery_position.position,
}
}
}
Expand Down
8 changes: 5 additions & 3 deletions crates/topos-tce-api/src/graphql/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ use crate::{
},
runtime::InternalRuntimeCommand,
};
use topos_tce_storage::fullnode::FullNodeStore;
use topos_tce_storage::validator::ValidatorStore;

use super::query::SubscriptionRoot;

#[derive(Default)]
pub struct ServerBuilder {
store: Option<Arc<FullNodeStore>>,
store: Option<Arc<ValidatorStore>>,
serve_addr: Option<SocketAddr>,
runtime: Option<mpsc::Sender<InternalRuntimeCommand>>,
}
Expand All @@ -34,7 +34,7 @@ impl ServerBuilder {

self
}
pub(crate) fn store(mut self, store: Arc<FullNodeStore>) -> Self {
pub(crate) fn store(mut self, store: Arc<ValidatorStore>) -> Self {
self.store = Some(store);

self
Expand Down Expand Up @@ -62,13 +62,15 @@ impl ServerBuilder {
.take()
.expect("Cannot build GraphQL server without a FullNode store");

let fulltnode_store = store.get_fullnode_store();
let runtime = self
.runtime
.take()
.expect("Cannot build GraphQL server without the internal runtime channel");

let schema: ServiceSchema = Schema::build(QueryRoot, EmptyMutation, SubscriptionRoot)
.data(store)
.data(fulltnode_store)
.data(runtime)
.finish();

Expand Down
79 changes: 78 additions & 1 deletion crates/topos-tce-api/src/graphql/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use async_graphql::{Context, EmptyMutation, Object, Schema, Subscription};
use async_trait::async_trait;
use futures::{Stream, StreamExt};
use tokio::sync::{mpsc, oneshot};
use topos_core::api::graphql::certificate::UndeliveredCertificate;
use topos_core::api::graphql::checkpoint::SourceStreamPosition;
use topos_core::api::graphql::errors::GraphQLServerError;
use topos_core::api::graphql::filter::SubnetFilter;
Expand All @@ -18,6 +19,7 @@ use topos_metrics::{STORAGE_PENDING_POOL_COUNT, STORAGE_PRECEDENCE_POOL_COUNT};
use topos_tce_storage::fullnode::FullNodeStore;
use topos_tce_storage::store::ReadStore;

use topos_tce_storage::validator::ValidatorStore;
use tracing::debug;

use crate::runtime::InternalRuntimeCommand;
Expand Down Expand Up @@ -121,12 +123,44 @@ impl QueryRoot {
/// The values are estimated as having a precise count is costly.
async fn get_storage_pool_stats(
&self,
_ctx: &Context<'_>,
ctx: &Context<'_>,
) -> Result<HashMap<&str, i64>, GraphQLServerError> {
let mut stats = HashMap::new();
stats.insert("pending_pool", STORAGE_PENDING_POOL_COUNT.get());
stats.insert("precedence_pool", STORAGE_PRECEDENCE_POOL_COUNT.get());

let store = ctx.data::<Arc<ValidatorStore>>().map_err(|_| {
tracing::error!("Failed to get store from context");

GraphQLServerError::ParseDataConnector
})?;

stats.insert(
"pending_certificate_iter",
store
.get_pending_certificates()
.map_err(|_| GraphQLServerError::StorageError)?
.len() as i64,
);

stats.insert(
"iter_pending_certificate",
store
.iter_count_pending_certificates()
.map_err(|_| GraphQLServerError::StorageError)?
.try_into()
.unwrap_or(i64::MAX),
);

stats.insert(
"iter_precedence_certificate",
store
.iter_count_precedence_pool_certificates()
.map_err(|_| GraphQLServerError::StorageError)?
.try_into()
.unwrap_or(i64::MAX),
);

Ok(stats)
}

Expand All @@ -151,9 +185,52 @@ impl QueryRoot {
.map(|(subnet_id, head)| SourceStreamPosition {
source_subnet_id: subnet_id.into(),
position: *head.position,
certificate_id: head.certificate_id.into(),
})
.collect())
}

/// This endpoint is used to get the current pending pool.
/// It returns [`CertificateId`] and the [`PendingCertificateId`]
async fn get_pending_pool(
&self,
ctx: &Context<'_>,
) -> Result<HashMap<u64, CertificateId>, GraphQLServerError> {
let store = ctx.data::<Arc<ValidatorStore>>().map_err(|_| {
tracing::error!("Failed to get store from context");

GraphQLServerError::ParseDataConnector
})?;

Ok(store
.get_pending_certificates()
.map_err(|_| GraphQLServerError::StorageError)?
.iter()
.map(|(id, certificate)| (*id, certificate.id.into()))
.collect())
}

/// This endpoint is used to check if a certificate has any child certificate in the precedence pool.
async fn check_precedence(
&self,
ctx: &Context<'_>,
certificate_id: CertificateId,
) -> Result<Option<UndeliveredCertificate>, GraphQLServerError> {
let store = ctx.data::<Arc<ValidatorStore>>().map_err(|_| {
tracing::error!("Failed to get store from context");

GraphQLServerError::ParseDataConnector
})?;

store
.check_precedence(
&certificate_id
.try_into()
.map_err(|_| GraphQLServerError::ParseCertificateId)?,
)
.map_err(|_| GraphQLServerError::StorageError)
.map(|certificate| certificate.as_ref().map(Into::into))
}
}

pub struct SubscriptionRoot;
Expand Down
1 change: 1 addition & 0 deletions crates/topos-tce-api/src/graphql/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ async fn open_watch_certificate_delivered() {
source {
sourceSubnetId
position
certificateId
}
}
}
Expand Down
1 change: 0 additions & 1 deletion crates/topos-tce-api/src/runtime/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ impl RuntimeBuilder {
.store(
self.store
.take()
.map(|store| store.get_fullnode_store())
.expect("Unable to build GraphQL Server, Store is missing"),
)
.runtime(internal_runtime_command_sender.clone())
Expand Down
1 change: 1 addition & 0 deletions crates/topos-tce-api/tests/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,7 @@ async fn can_query_graphql_endpoint_for_certificates(
source {{
sourceSubnetId
position
certificateId
}}
}}
}}
Expand Down
7 changes: 6 additions & 1 deletion crates/topos-tce-broadcast/src/double_echo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,12 @@ impl DoubleEcho {

command if self.subscriptions.is_some() => {
match command {
DoubleEchoCommand::Broadcast { cert, need_gossip, pending_id } => {
_ = self
.task_manager_message_sender
.send(DoubleEchoCommand::Broadcast { need_gossip, cert, pending_id })
.await;
}
DoubleEchoCommand::Echo { certificate_id, validator_id, signature } => {
// Check if source is part of known_validators
if !self.validators.contains(&validator_id) {
Expand Down Expand Up @@ -173,7 +179,6 @@ impl DoubleEcho {

self.handle_ready(certificate_id, validator_id, signature).await
},
_ => {}
}

},
Expand Down
1 change: 1 addition & 0 deletions crates/topos-tce-broadcast/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ pub enum DoubleEchoCommand {
Broadcast {
need_gossip: bool,
cert: Certificate,
pending_id: u64,
},

/// When echo reply received
Expand Down
21 changes: 11 additions & 10 deletions crates/topos-tce-broadcast/src/task_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use topos_tce_storage::validator::ValidatorStore;
use topos_tce_storage::PendingCertificateId;
use tracing::debug;
use tracing::error;
use tracing::trace;
use tracing::warn;

pub mod task;
Expand Down Expand Up @@ -99,11 +100,7 @@ impl TaskManager {
Ok(pendings) => {
debug!("Received {} pending certificates", pendings.len());
for (pending_id, certificate) in pendings {
debug!(
"Creating task for pending certificate {} at position {} if needed",
certificate.id, pending_id
);
self.create_task(&certificate, true);
self.create_task(&certificate, true, pending_id);
self.latest_pending_id = pending_id;
}
}
Expand Down Expand Up @@ -135,10 +132,10 @@ impl TaskManager {
.push(msg);
};
}
DoubleEchoCommand::Broadcast { ref cert, need_gossip } => {
debug!("Received broadcast message for certificate {} ", cert.id);
DoubleEchoCommand::Broadcast { ref cert, need_gossip, pending_id } => {
trace!("Received broadcast message for certificate {} ", cert.id);

self.create_task(cert, need_gossip)
self.create_task(cert, need_gossip, pending_id)
}
}
}
Expand Down Expand Up @@ -205,7 +202,7 @@ impl TaskManager {
/// Create a new task for the given certificate and add it to the running tasks.
/// If the previous certificate is not available yet, the task will be created but not started.
/// This method is called when a pending certificate is fetched from the storage.
fn create_task(&mut self, cert: &Certificate, need_gossip: bool) {
fn create_task(&mut self, cert: &Certificate, need_gossip: bool, pending_id: u64) {
match self.tasks.entry(cert.id) {
std::collections::hash_map::Entry::Vacant(entry) => {
let broadcast_state = BroadcastState::new(
Expand Down Expand Up @@ -245,10 +242,14 @@ impl TaskManager {
cert.id, cert.prev_id
);
}
debug!(
"Creating task for pending certificate {} at position {} if needed",
cert.id, pending_id
);
entry.insert(task_context);
}
std::collections::hash_map::Entry::Occupied(_) => {
debug!(
trace!(
"Received broadcast message for certificate {} but it is already being \
processed",
cert.id
Expand Down
3 changes: 3 additions & 0 deletions crates/topos-tce-broadcast/src/tests/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,20 +65,23 @@ async fn can_start(#[future] create_validator_store: Arc<ValidatorStore>) {
.send(crate::DoubleEchoCommand::Broadcast {
need_gossip: false,
cert: child.certificate.clone(),
pending_id: 0,
})
.await;

let _ = message_sender
.send(crate::DoubleEchoCommand::Broadcast {
need_gossip: false,
cert: parent.certificate.clone(),
pending_id: 0,
})
.await;

let _ = message_sender
.send(crate::DoubleEchoCommand::Broadcast {
need_gossip: false,
cert: parent.certificate.clone(),
pending_id: 0,
})
.await;

Expand Down
Loading

0 comments on commit 4217661

Please sign in to comment.