Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

Commit

Permalink
chore: refactor after review
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Paitrault <[email protected]>
  • Loading branch information
Freyskeyd committed Mar 12, 2024
1 parent 0d255d7 commit faf394d
Show file tree
Hide file tree
Showing 12 changed files with 126 additions and 83 deletions.
2 changes: 1 addition & 1 deletion crates/topos-core/src/api/graphql/certificate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub struct UndeliveredCertificate {
pub verifier: u32,
}

impl From<&crate::uci::Certificate> for UndeliveredCertificate {
impl From<&uci::Certificate> for UndeliveredCertificate {
fn from(value: &crate::uci::Certificate) -> Self {
Self {
id: CertificateId(value.id.to_string()),
Expand Down
4 changes: 2 additions & 2 deletions crates/topos-tce-api/src/graphql/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,15 @@ impl ServerBuilder {
.take()
.expect("Cannot build GraphQL server without a FullNode store");

let fulltnode_store = store.get_fullnode_store();
let fullnode_store = store.get_fullnode_store();
let runtime = self
.runtime
.take()
.expect("Cannot build GraphQL server without the internal runtime channel");

let schema: ServiceSchema = Schema::build(QueryRoot, EmptyMutation, SubscriptionRoot)
.data(store)
.data(fulltnode_store)
.data(fullnode_store)
.data(runtime)
.finish();

Expand Down
38 changes: 26 additions & 12 deletions crates/topos-tce-api/src/graphql/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,11 @@ impl QueryRoot {
ctx: &Context<'_>,
) -> Result<HashMap<&str, i64>, GraphQLServerError> {
let mut stats = HashMap::new();
stats.insert("pending_pool", STORAGE_PENDING_POOL_COUNT.get());
stats.insert("precedence_pool", STORAGE_PRECEDENCE_POOL_COUNT.get());
stats.insert("metrics_pending_pool", STORAGE_PENDING_POOL_COUNT.get());
stats.insert(
"metrics_precedence_pool",
STORAGE_PRECEDENCE_POOL_COUNT.get(),
);

let store = ctx.data::<Arc<ValidatorStore>>().map_err(|_| {
tracing::error!("Failed to get store from context");
Expand All @@ -136,26 +139,38 @@ impl QueryRoot {
})?;

stats.insert(
"pending_certificate_iter",
"count_pending_certificates",
store
.iter_pending_pool()
.map_err(|_| GraphQLServerError::StorageError)?
.count()
.try_into()
.unwrap_or(i64::MAX),
);

stats.insert(
"count_precedence_certificates",
store
.get_pending_certificates()
.iter_precedence_pool()
.map_err(|_| GraphQLServerError::StorageError)?
.len() as i64,
.count()
.try_into()
.unwrap_or(i64::MAX),
);

stats.insert(
"iter_pending_certificate",
"pending_pool_size",
store
.iter_count_pending_certificates()
.pending_pool_size()
.map_err(|_| GraphQLServerError::StorageError)?
.try_into()
.unwrap_or(i64::MAX),
);

stats.insert(
"iter_precedence_certificate",
"precedence_pool_size",
store
.iter_count_precedence_pool_certificates()
.precedence_pool_size()
.map_err(|_| GraphQLServerError::StorageError)?
.try_into()
.unwrap_or(i64::MAX),
Expand Down Expand Up @@ -203,10 +218,9 @@ impl QueryRoot {
})?;

Ok(store
.get_pending_certificates()
.iter_pending_pool()
.map_err(|_| GraphQLServerError::StorageError)?
.iter()
.map(|(id, certificate)| (*id, certificate.id.into()))
.map(|(id, certificate)| (id, certificate.id.into()))
.collect())
}

Expand Down
3 changes: 3 additions & 0 deletions crates/topos-tce-api/src/runtime/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,7 @@ pub enum RuntimeError {

#[error("Unexpected store error: {0}")]
Store(#[from] StorageError),

#[error("Communication error: {0}")]
CommunicationError(String),
}
18 changes: 12 additions & 6 deletions crates/topos-tce-api/tests/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -675,8 +675,8 @@ async fn check_storage_pool_stats(

#[derive(Debug, Deserialize)]
struct PoolStats {
pending_pool: u64,
precedence_pool: u64,
metrics_pending_pool: u64,
metrics_precedence_pool: u64,
}

let client = reqwest::Client::new();
Expand All @@ -693,8 +693,14 @@ async fn check_storage_pool_stats(
.await
.unwrap();

assert_eq!(response.data.get_storage_pool_stats.pending_pool, 10);
assert_eq!(response.data.get_storage_pool_stats.precedence_pool, 200);
assert_eq!(
response.data.get_storage_pool_stats.metrics_pending_pool,
10
);
assert_eq!(
response.data.get_storage_pool_stats.metrics_precedence_pool,
200
);
}

#[rstest]
Expand All @@ -713,7 +719,7 @@ async fn get_pending_pool(
let fullnode_store = create_fullnode_store::default().await;

let store: Arc<ValidatorStore> =
create_validator_store(vec![], futures::future::ready(fullnode_store.clone())).await;
create_validator_store(&[], futures::future::ready(fullnode_store.clone())).await;

for certificate in &certificates {
_ = store.insert_pending_certificate(&certificate.certificate);
Expand Down Expand Up @@ -791,7 +797,7 @@ async fn check_precedence(
let fullnode_store = create_fullnode_store::default().await;

let store: Arc<ValidatorStore> =
create_validator_store(vec![], futures::future::ready(fullnode_store.clone())).await;
create_validator_store(&[], futures::future::ready(fullnode_store.clone())).await;

for certificate in &certificates {
_ = store.insert_pending_certificate(&certificate.certificate);
Expand Down
2 changes: 1 addition & 1 deletion crates/topos-tce-storage/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl StorageClient {
pub async fn get_pending_certificates(
&self,
) -> Result<Vec<(PendingCertificateId, Certificate)>, StorageError> {
self.store.get_pending_certificates()
Ok(self.store.iter_pending_pool()?.collect())
}

pub async fn fetch_certificates(
Expand Down
4 changes: 2 additions & 2 deletions crates/topos-tce-storage/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,7 @@ async fn get_pending_certificates(store: Arc<ValidatorStore>) {
)
.unwrap();

let pending_certificates = store.get_pending_certificates().unwrap();
let pending_certificates = store.iter_pending_pool().unwrap().collect::<Vec<_>>();
assert_eq!(
expected_pending_certificates.len(),
pending_certificates.len()
Expand All @@ -654,7 +654,7 @@ async fn get_pending_certificates(store: Arc<ValidatorStore>) {
let cert_to_remove = expected_pending_certificates.remove(8);
store.delete_pending_certificate(&cert_to_remove.0).unwrap();

let pending_certificates = store.get_pending_certificates().unwrap();
let pending_certificates = store.iter_pending_pool().unwrap().collect::<Vec<_>>();
assert_eq!(
expected_pending_certificates.len(),
pending_certificates.len()
Expand Down
91 changes: 43 additions & 48 deletions crates/topos-tce-storage/src/validator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,26 +98,19 @@ impl ValidatorStore {
None::<&[u8]>,
);

let pending_count: i64 =
store
.count_pending_certificates()?
.try_into()
.map_err(|error| {
error!("Failed to convert estimate-num-keys to i64: {}", error);
StorageError::InternalStorage(InternalStorageError::UnexpectedDBState(
"Failed to convert estimate-num-keys to i64",
))
})?;

let precedence_count: i64 = store
.count_precedence_pool_certificates()?
.try_into()
.map_err(|error| {
error!("Failed to convert estimate-num-keys to i64: {}", error);
StorageError::InternalStorage(InternalStorageError::UnexpectedDBState(
"Failed to convert estimate-num-keys to i64",
))
})?;
let pending_count: i64 = store.pending_pool_size()?.try_into().map_err(|error| {
error!("Failed to convert estimate-num-keys to i64: {}", error);
StorageError::InternalStorage(InternalStorageError::UnexpectedDBState(
"Failed to convert estimate-num-keys to i64",
))
})?;

let precedence_count: i64 = store.precedence_pool_size()?.try_into().map_err(|error| {
error!("Failed to convert estimate-num-keys to i64: {}", error);
StorageError::InternalStorage(InternalStorageError::UnexpectedDBState(
"Failed to convert estimate-num-keys to i64",
))
})?;

STORAGE_PENDING_POOL_COUNT.set(pending_count);
STORAGE_PRECEDENCE_POOL_COUNT.set(precedence_count);
Expand All @@ -131,43 +124,21 @@ impl ValidatorStore {
}

/// Returns the number of certificates in the pending pool
pub fn count_pending_certificates(&self) -> Result<u64, StorageError> {
pub fn pending_pool_size(&self) -> Result<u64, StorageError> {
Ok(self
.pending_tables
.pending_pool
.property_int_value(ESTIMATE_NUM_KEYS)?)
}

/// Returns the number of certificates in the pending pool (by iterating)
pub fn iter_count_pending_certificates(&self) -> Result<u64, StorageError> {
Ok(self
.pending_tables
.pending_pool
.iter()?
.count()
.try_into()
.unwrap_or(u64::MAX))
}

/// Returns the number of certificates in the precedence pool
pub fn count_precedence_pool_certificates(&self) -> Result<u64, StorageError> {
pub fn precedence_pool_size(&self) -> Result<u64, StorageError> {
Ok(self
.pending_tables
.precedence_pool
.property_int_value(ESTIMATE_NUM_KEYS)?)
}

/// Returns the number of certificates in the precedence pool (by iterating)
pub fn iter_count_precedence_pool_certificates(&self) -> Result<u64, StorageError> {
Ok(self
.pending_tables
.precedence_pool
.iter()?
.count()
.try_into()
.unwrap_or(u64::MAX))
}

/// Try to return the [`PendingCertificateId`] for a [`CertificateId`]
///
/// Return `Ok(None)` if the `certificate_id` is not found.
Expand All @@ -188,11 +159,35 @@ impl ValidatorStore {
Ok(self.pending_tables.pending_pool.get(pending_id)?)
}

/// Returns the entire pending_pool
pub fn get_pending_certificates(
/// Returns an iterator over the pending pool
///
/// Note: this can be slow on large datasets.
#[doc(hidden)]
pub fn iter_pending_pool(
&self,
) -> Result<Vec<(PendingCertificateId, Certificate)>, StorageError> {
Ok(self.pending_tables.pending_pool.iter()?.collect())
) -> Result<impl Iterator<Item = (PendingCertificateId, Certificate)> + '_, StorageError> {
Ok(self.pending_tables.pending_pool.iter()?)
}

/// Returns an iterator over the pending pool starting at a given `PendingCertificateId`
///
/// Note: this can be slow on large datasets.
#[doc(hidden)]
pub fn iter_pending_pool_at(
&self,
pending_id: &PendingCertificateId,
) -> Result<impl Iterator<Item = (PendingCertificateId, Certificate)> + '_, StorageError> {
Ok(self.pending_tables.pending_pool.iter_at(pending_id)?)
}

/// Returns an iterator over the precedence pool
///
/// Note: this can be slow on large datasets.
#[doc(hidden)]
pub fn iter_precedence_pool(
&self,
) -> Result<impl Iterator<Item = (CertificateId, Certificate)> + '_, StorageError> {
Ok(self.pending_tables.precedence_pool.iter()?)
}

pub fn get_next_pending_certificates(
Expand Down
4 changes: 2 additions & 2 deletions crates/topos-tce/src/app_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,13 +162,13 @@ impl AppContext {

let pending_certificates = self
.validator_store
.count_pending_certificates()
.pending_pool_size()
.map_err(|error| format!("Unable to count pending certificates: {error}"))
.unwrap();

let precedence_pool_certificates = self
.validator_store
.count_precedence_pool_certificates()
.precedence_pool_size()
.map_err(|error| format!("Unable to count precedence pool certificates: {error}"))
.unwrap();

Expand Down
23 changes: 19 additions & 4 deletions crates/topos-tce/src/app_context/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,37 @@ impl AppContext {
.insert_pending_certificate(&certificate)
{
Ok(Some(pending_id)) => {
let certificate_id = certificate.id;
debug!(
"Certificate {} from subnet {} has been inserted into pending pool",
certificate.id, certificate.source_subnet_id
certificate_id, certificate.source_subnet_id
);

_ = self
if self
.tce_cli
.get_double_echo_channel()
.send(DoubleEchoCommand::Broadcast {
need_gossip: true,
cert: *certificate,
pending_id,
})
.await;
.await
.is_err()
{
error!(
"Unable to send DoubleEchoCommand::Broadcast command to double \
echo for {}",
certificate_id
);

sender.send(Ok(PendingResult::InPending(pending_id)))
sender.send(Err(RuntimeError::CommunicationError(
"Unable to send DoubleEchoCommand::Broadcast command to double \
echo"
.to_string(),
)))
} else {
sender.send(Ok(PendingResult::InPending(pending_id)))
}
}
Ok(None) => {
debug!(
Expand Down
16 changes: 13 additions & 3 deletions crates/topos-tce/src/app_context/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,21 +44,31 @@ impl AppContext {

match self.validator_store.insert_pending_certificate(&cert) {
Ok(Some(pending_id)) => {
let certificate_id = cert.id;
debug!(
"Certificate {} has been inserted into pending pool",
cert.id
certificate_id
);

_ = self
if self
.tce_cli
.get_double_echo_channel()
.send(DoubleEchoCommand::Broadcast {
need_gossip: false,
cert,
pending_id,
})
.await;
.await
.is_err()
{
error!(
"Unable to send DoubleEchoCommand::Broadcast command \
to double echo for {}",
certificate_id
);
}
}

Ok(None) => {
debug!(
"Certificate {} from subnet {} has been inserted into \
Expand Down
Loading

0 comments on commit faf394d

Please sign in to comment.