From 54ee547a6e306cc207e32a627e5400996da89c00 Mon Sep 17 00:00:00 2001 From: Morgan McCauley Date: Tue, 8 Aug 2023 14:34:03 +1200 Subject: [PATCH] DPLT-1129 Publish unprocessed stream messages count to Grafana (#169) --- indexer/queryapi_coordinator/src/metrics.rs | 19 ++++++++- indexer/queryapi_coordinator/src/utils.rs | 32 ++++++++++++++- indexer/storage/src/lib.rs | 44 +++++++++++++++++++++ 3 files changed, 93 insertions(+), 2 deletions(-) diff --git a/indexer/queryapi_coordinator/src/metrics.rs b/indexer/queryapi_coordinator/src/metrics.rs index f106a0624..6ba2d0090 100644 --- a/indexer/queryapi_coordinator/src/metrics.rs +++ b/indexer/queryapi_coordinator/src/metrics.rs @@ -1,6 +1,6 @@ use actix_web::{get, App, HttpServer, Responder}; use lazy_static::lazy_static; -use prometheus::{Encoder, IntCounter, IntGauge, Opts}; +use prometheus::{Encoder, IntCounter, IntGauge, IntGaugeVec, Opts}; use tracing::info; lazy_static! { @@ -14,6 +14,23 @@ lazy_static! { "Number of indexed blocks" ) .unwrap(); + pub(crate) static ref UNPROCESSED_STREAM_MESSAGES: IntGaugeVec = try_create_int_gauge_vec( + "queryapi_coordinator_unprocessed_stream_messages", + "Number of Redis Stream messages not processed by Runner", + &["stream"] + ) + .unwrap(); +} + +fn try_create_int_gauge_vec( + name: &str, + help: &str, + labels: &[&str], +) -> prometheus::Result { + let opts = Opts::new(name, help); + let gauge = IntGaugeVec::new(opts, labels)?; + prometheus::register(Box::new(gauge.clone()))?; + Ok(gauge) } fn try_create_int_gauge(name: &str, help: &str) -> prometheus::Result { diff --git a/indexer/queryapi_coordinator/src/utils.rs b/indexer/queryapi_coordinator/src/utils.rs index 2cf5207ed..a2ec8ab1a 100644 --- a/indexer/queryapi_coordinator/src/utils.rs +++ b/indexer/queryapi_coordinator/src/utils.rs @@ -1,5 +1,7 @@ +use crate::metrics; + pub(crate) async fn stats(redis_connection_manager: storage::ConnectionManager) { - let interval_secs = 10; + let interval_secs = 5; let mut previous_processed_blocks: u64 = storage::get::(&redis_connection_manager, "blocks_processed") .await @@ -46,6 +48,34 @@ pub(crate) async fn stats(redis_connection_manager: storage::ConnectionManager) alert_rules_count, ); previous_processed_blocks = processed_blocks; + + let streams = storage::smembers(&redis_connection_manager, storage::INDEXER_SET_KEY) + .await + .unwrap_or(Vec::new()); + + for stream in streams { + let latest_id = storage::get::( + &redis_connection_manager, + storage::generate_stream_last_id_key(&stream), + ) + .await + .unwrap_or(storage::STREAM_SMALLEST_ID.to_string()); + + let unprocessed_message_count = storage::xrange( + &redis_connection_manager, + storage::generate_stream_key(&stream), + &latest_id, + storage::STREAM_LARGEST_ID, + ) + .await + .unwrap_or(Vec::new()) + .len() as i64; + + metrics::UNPROCESSED_STREAM_MESSAGES + .with_label_values(&[&stream]) + .set(unprocessed_message_count); + } + tokio::time::sleep(std::time::Duration::from_secs(interval_secs)).await; } } diff --git a/indexer/storage/src/lib.rs b/indexer/storage/src/lib.rs index 3753e186f..183ef0bbe 100644 --- a/indexer/storage/src/lib.rs +++ b/indexer/storage/src/lib.rs @@ -3,6 +3,8 @@ pub use redis::{self, aio::ConnectionManager, FromRedisValue, ToRedisArgs}; const STORAGE: &str = "storage_alertexer"; pub const INDEXER_SET_KEY: &str = "indexers"; +pub const STREAM_SMALLEST_ID: &str = "-"; +pub const STREAM_LARGEST_ID: &str = "+"; pub async fn get_redis_client(redis_connection_str: &str) -> redis::Client { redis::Client::open(redis_connection_str).expect("can create redis client") @@ -16,6 +18,10 @@ pub fn generate_stream_key(name: &str) -> String { format!("{}:stream", name) } +pub fn generate_stream_last_id_key(name: &str) -> String { + format!("{}:stream:lastId", name) +} + pub async fn connect(redis_connection_str: &str) -> anyhow::Result { Ok(get_redis_client(redis_connection_str) .await @@ -105,6 +111,44 @@ pub async fn xadd( Ok(()) } +pub async fn smembers( + redis_connection_manager: &ConnectionManager, + key: &str, +) -> anyhow::Result> { + tracing::debug!(target: STORAGE, "SMEMBERS: {:?}", key); + + let members: Vec = redis::cmd("SMEMBERS") + .arg(key) + .query_async(&mut redis_connection_manager.clone()) + .await?; + + Ok(members) +} + +pub async fn xrange( + redis_connection_manager: &ConnectionManager, + stream_key: impl ToRedisArgs + std::fmt::Debug, + start_id: &str, + end_id: &str, +) -> anyhow::Result> { + tracing::debug!( + target: STORAGE, + "XRANGE: {:?}, {:?}, {:?}", + stream_key, + start_id, + end_id + ); + + let results = redis::cmd("XRANGE") + .arg(stream_key) + .arg(start_id) + .arg(end_id) + .query_async(&mut redis_connection_manager.clone()) + .await?; + + Ok(results) +} + /// Sets the key `receipt_id: &str` with value `transaction_hash: &str` to the Redis storage. /// Increments the counter `receipts_{transaction_hash}` by one. /// The counter holds how many Receipts related to the Transaction are in watching list