Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

feat: track queue size in metrics/graphql #454

Merged
merged 2 commits into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ strip = true
[workspace.dependencies]
topos-core = { path = "./crates/topos-core", default-features = false }
topos-crypto = { path = "./crates/topos-crypto", default-features = false }
topos-metrics = { path = "./crates/topos-metrics/", default-features = false }

# Various utility crates
clap = { version = "4.0", features = ["derive", "env", "string"] }
Expand Down
16 changes: 14 additions & 2 deletions crates/topos-metrics/src/storage.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use prometheus::{
self, register_histogram_with_registry, register_int_counter_with_registry, Histogram,
IntCounter,
self, register_histogram_with_registry, register_int_counter_with_registry,
register_int_gauge_with_registry, Histogram, IntCounter, IntGauge,
};

use lazy_static::lazy_static;
Expand Down Expand Up @@ -31,4 +31,16 @@ lazy_static! {
TOPOS_METRIC_REGISTRY
)
.unwrap();
pub static ref STORAGE_PENDING_POOL_COUNT: IntGauge = register_int_gauge_with_registry!(
"storage_pending_pool_count",
"Number of certificates in the pending pool.",
TOPOS_METRIC_REGISTRY
)
.unwrap();
pub static ref STORAGE_PRECEDENCE_POOL_COUNT: IntGauge = register_int_gauge_with_registry!(
"storage_precedence_pool_count",
"Number of certificates in the precedence pool.",
TOPOS_METRIC_REGISTRY
)
.unwrap();
}
16 changes: 16 additions & 0 deletions crates/topos-tce-api/src/graphql/query.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::sync::Arc;

use async_graphql::{Context, EmptyMutation, Object, Schema, Subscription};
Expand All @@ -13,6 +14,7 @@ use topos_core::api::graphql::{
query::CertificateQuery,
};
use topos_core::types::stream::CertificateSourceStreamPosition;
use topos_metrics::{STORAGE_PENDING_POOL_COUNT, STORAGE_PRECEDENCE_POOL_COUNT};
use topos_tce_storage::fullnode::FullNodeStore;
use topos_tce_storage::store::ReadStore;

Expand Down Expand Up @@ -114,6 +116,20 @@ impl QueryRoot {
Self::certificate_by_id(ctx, certificate_id).await
}

/// This endpoint is used to get the current storage pool stats.
/// It returns the number of certificates in the pending and precedence pools.
/// The values are estimated as having a precise count is costly.
async fn get_storage_pool_stats(
&self,
_ctx: &Context<'_>,
) -> Result<HashMap<&str, i64>, GraphQLServerError> {
let mut stats = HashMap::new();
stats.insert("pending_pool", STORAGE_PENDING_POOL_COUNT.get());
stats.insert("precedence_pool", STORAGE_PRECEDENCE_POOL_COUNT.get());

Ok(stats)
}

/// This endpoint is used to get the current checkpoint of the source streams.
/// The checkpoint is the position of the last certificate delivered for each source stream.
async fn get_checkpoint(
Expand Down
71 changes: 71 additions & 0 deletions crates/topos-tce-api/tests/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use topos_core::{
},
uci::Certificate,
};
use topos_metrics::{STORAGE_PENDING_POOL_COUNT, STORAGE_PRECEDENCE_POOL_COUNT};
use topos_tce_api::{Runtime, RuntimeEvent};
use topos_tce_storage::types::CertificateDeliveredWithPositions;
use topos_tce_storage::StorageClient;
Expand Down Expand Up @@ -621,3 +622,73 @@ async fn can_query_graphql_endpoint_for_certificates(
graphql_certificate.source_subnet_id
);
}

#[rstest]
#[timeout(Duration::from_secs(4))]
#[test(tokio::test)]
async fn check_storage_pool_stats(
broadcast_stream: broadcast::Receiver<CertificateDeliveredWithPositions>,
) {
let addr = get_available_addr();
let graphql_addr = get_available_addr();
let metrics_addr = get_available_addr();

let fullnode_store = create_fullnode_store::default().await;

let store =
create_validator_store(vec![], futures::future::ready(fullnode_store.clone())).await;
STORAGE_PENDING_POOL_COUNT.set(10);
STORAGE_PRECEDENCE_POOL_COUNT.set(200);

let storage_client = StorageClient::new(store.clone());

let (_runtime_client, _launcher, _ctx) = Runtime::builder()
.with_broadcast_stream(broadcast_stream)
.storage(storage_client)
.store(store)
.serve_grpc_addr(addr)
.serve_graphql_addr(graphql_addr)
.serve_metrics_addr(metrics_addr)
.build_and_launch()
.await;

// Wait for server to boot
tokio::time::sleep(Duration::from_millis(100)).await;

let query = "query {getStoragePoolStats}";

#[derive(Debug, Deserialize)]
struct Response {
// data: HashMap<String, serde_json::Value>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can remove I think.

data: Stats,
}

#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct Stats {
get_storage_pool_stats: PoolStats,
}

#[derive(Debug, Deserialize)]
struct PoolStats {
pending_pool: u64,
precedence_pool: u64,
}

let client = reqwest::Client::new();

let response = client
.post(format!("http://{}", graphql_addr))
.json(&serde_json::json!({
"query": query,
}))
.send()
.await
.unwrap()
.json::<Response>()
.await
.unwrap();

assert_eq!(response.data.get_storage_pool_stats.pending_pool, 10);
assert_eq!(response.data.get_storage_pool_stats.precedence_pool, 200);
}
1 change: 1 addition & 0 deletions crates/topos-tce-storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ workspace = true

[dependencies]
topos-core = { workspace = true, features = ["uci", "api"] }
topos-metrics = { workspace = true }

async-stream.workspace = true
async-trait.workspace = true
Expand Down
3 changes: 3 additions & 0 deletions crates/topos-tce-storage/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ pub enum InternalStorageError {
#[error("Invalid query argument: {0}")]
InvalidQueryArgument(&'static str),

#[error("Unexpected DB state: {0}")]
UnexpectedDBState(&'static str),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not really a problem with the DB state if a property is missing. Wouldn't this error be for a situation where there's a new/old/incompatible rocksdb version that does not support some property?


#[error(transparent)]
Bincode(#[from] Box<bincode::ErrorKind>),

Expand Down
8 changes: 6 additions & 2 deletions crates/topos-tce-storage/src/fullnode/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{collections::HashMap, sync::Arc};
use arc_swap::ArcSwap;
use async_trait::async_trait;

use rocksdb::properties::ESTIMATE_NUM_KEYS;
use topos_core::{
types::{
stream::{CertificateSourceStreamPosition, CertificateTargetStreamPosition, Position},
Expand Down Expand Up @@ -232,8 +233,11 @@ impl WriteStore for FullNodeStore {
}

impl ReadStore for FullNodeStore {
fn count_certificates_delivered(&self) -> Result<usize, StorageError> {
Ok(self.perpetual_tables.certificates.iter()?.count())
fn count_certificates_delivered(&self) -> Result<u64, StorageError> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should call this estimate_certs_delivered?

Ok(self
.perpetual_tables
.certificates
.property_int_value(ESTIMATE_NUM_KEYS)?)
}

fn get_source_head(&self, subnet_id: &SubnetId) -> Result<Option<SourceHead>, StorageError> {
Expand Down
17 changes: 14 additions & 3 deletions crates/topos-tce-storage/src/rocks/db_column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use std::path::Path;
#[cfg(test)]
use rocksdb::ColumnFamilyDescriptor;
use rocksdb::{
BoundColumnFamily, DBRawIteratorWithThreadMode, DBWithThreadMode, Direction, IteratorMode,
MultiThreaded, ReadOptions, WriteBatch,
BoundColumnFamily, CStrLike, DBRawIteratorWithThreadMode, DBWithThreadMode, Direction,
IteratorMode, MultiThreaded, ReadOptions, WriteBatch,
};

use bincode::Options;
Expand Down Expand Up @@ -66,7 +66,7 @@ impl<K, V> DBColumn<K, V> {
}

/// Returns the CF of the DBColumn, used to build queries.
fn cf(&self) -> Result<Arc<BoundColumnFamily<'_>>, InternalStorageError> {
pub(crate) fn cf(&self) -> Result<Arc<BoundColumnFamily<'_>>, InternalStorageError> {
self.rocksdb
.cf_handle(self.cf)
.ok_or(InternalStorageError::InvalidColumnFamily(self.cf))
Expand All @@ -78,6 +78,17 @@ where
K: DeserializeOwned + Serialize + std::fmt::Debug,
V: DeserializeOwned + Serialize + std::fmt::Debug,
{
pub(crate) fn property_int_value(
&self,
property: impl CStrLike,
) -> Result<u64, InternalStorageError> {
self.rocksdb
.property_int_value_cf(&self.cf()?, property)?
.ok_or(InternalStorageError::UnexpectedDBState(
"Property not found",
))
}

/// Insert a record into the storage by passing a Key and a Value.
///
/// Key are fixed length bincode serialized.
Expand Down
2 changes: 1 addition & 1 deletion crates/topos-tce-storage/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub trait WriteStore: Send {
/// [`FullNodeStore`](struct@super::fullnode::FullNodeStore) to read data.
pub trait ReadStore: Send {
/// Returns the number of certificates delivered
fn count_certificates_delivered(&self) -> Result<usize, StorageError>;
fn count_certificates_delivered(&self) -> Result<u64, StorageError>;

/// Try to get a SourceHead of a subnet
///
Expand Down
4 changes: 2 additions & 2 deletions crates/topos-tce-storage/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,15 +485,15 @@ async fn get_source_head_for_subnet(store: Arc<ValidatorStore>) {
create_certificate_chain(SOURCE_SUBNET_ID_1, &[TARGET_SUBNET_ID_2], 10);

store
.insert_certificates_delivered(&expected_certificates_for_source_subnet_1)
.insert_certificates_delivered(&expected_certificates_for_source_subnet_1[..])
.await
.unwrap();

let expected_certificates_for_source_subnet_2 =
create_certificate_chain(SOURCE_SUBNET_ID_2, &[TARGET_SUBNET_ID_2], 10);

store
.insert_certificates_delivered(&expected_certificates_for_source_subnet_2)
.insert_certificates_delivered(&expected_certificates_for_source_subnet_2[..])
.await
.unwrap();

Expand Down
Loading
Loading