diff --git a/indexer/queryapi_coordinator/src/utils.rs b/indexer/queryapi_coordinator/src/utils.rs index 2cf5207ed..601978966 100644 --- a/indexer/queryapi_coordinator/src/utils.rs +++ b/indexer/queryapi_coordinator/src/utils.rs @@ -1,3 +1,5 @@ +use crate::metrics; + pub(crate) async fn stats(redis_connection_manager: storage::ConnectionManager) { let interval_secs = 10; let mut previous_processed_blocks: u64 = @@ -46,6 +48,34 @@ pub(crate) async fn stats(redis_connection_manager: storage::ConnectionManager) alert_rules_count, ); previous_processed_blocks = processed_blocks; + + let streams = storage::smembers(&redis_connection_manager, storage::INDEXER_SET_KEY) + .await + .unwrap_or(Vec::new()); + + for stream in streams { + let latest_id = storage::get::( + &redis_connection_manager, + storage::generate_stream_last_id_key(&stream), + ) + .await + .unwrap_or(storage::STREAM_SMALLEST_ID.to_string()); + + let unprocessed_message_count = storage::xrange( + &redis_connection_manager, + storage::generate_stream_key(&stream), + &latest_id, + storage::STREAM_LARGEST_ID, + ) + .await + .unwrap_or(Vec::new()) + .len() as i64; + + metrics::UNPROCESSED_STREAM_MESSAGES + .with_label_values(&[&stream]) + .set(unprocessed_message_count); + } + tokio::time::sleep(std::time::Duration::from_secs(interval_secs)).await; } } diff --git a/indexer/storage/src/lib.rs b/indexer/storage/src/lib.rs index 3753e186f..183ef0bbe 100644 --- a/indexer/storage/src/lib.rs +++ b/indexer/storage/src/lib.rs @@ -3,6 +3,8 @@ pub use redis::{self, aio::ConnectionManager, FromRedisValue, ToRedisArgs}; const STORAGE: &str = "storage_alertexer"; pub const INDEXER_SET_KEY: &str = "indexers"; +pub const STREAM_SMALLEST_ID: &str = "-"; +pub const STREAM_LARGEST_ID: &str = "+"; pub async fn get_redis_client(redis_connection_str: &str) -> redis::Client { redis::Client::open(redis_connection_str).expect("can create redis client") @@ -16,6 +18,10 @@ pub fn generate_stream_key(name: &str) -> String { format!("{}:stream", name) } +pub fn generate_stream_last_id_key(name: &str) -> String { + format!("{}:stream:lastId", name) +} + pub async fn connect(redis_connection_str: &str) -> anyhow::Result { Ok(get_redis_client(redis_connection_str) .await @@ -105,6 +111,44 @@ pub async fn xadd( Ok(()) } +pub async fn smembers( + redis_connection_manager: &ConnectionManager, + key: &str, +) -> anyhow::Result> { + tracing::debug!(target: STORAGE, "SMEMBERS: {:?}", key); + + let members: Vec = redis::cmd("SMEMBERS") + .arg(key) + .query_async(&mut redis_connection_manager.clone()) + .await?; + + Ok(members) +} + +pub async fn xrange( + redis_connection_manager: &ConnectionManager, + stream_key: impl ToRedisArgs + std::fmt::Debug, + start_id: &str, + end_id: &str, +) -> anyhow::Result> { + tracing::debug!( + target: STORAGE, + "XRANGE: {:?}, {:?}, {:?}", + stream_key, + start_id, + end_id + ); + + let results = redis::cmd("XRANGE") + .arg(stream_key) + .arg(start_id) + .arg(end_id) + .query_async(&mut redis_connection_manager.clone()) + .await?; + + Ok(results) +} + /// Sets the key `receipt_id: &str` with value `transaction_hash: &str` to the Redis storage. /// Increments the counter `receipts_{transaction_hash}` by one. /// The counter holds how many Receipts related to the Transaction are in watching list