Skip to content

Commit

Permalink
feat: Publish UNPROCESSED_STREAM_MESSAGES for each stream
Browse files Browse the repository at this point in the history
  • Loading branch information
morgsmccauley committed Aug 8, 2023
1 parent fac01ae commit b2ac422
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 0 deletions.
30 changes: 30 additions & 0 deletions indexer/queryapi_coordinator/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use crate::metrics;

pub(crate) async fn stats(redis_connection_manager: storage::ConnectionManager) {
let interval_secs = 10;
let mut previous_processed_blocks: u64 =
Expand Down Expand Up @@ -46,6 +48,34 @@ pub(crate) async fn stats(redis_connection_manager: storage::ConnectionManager)
alert_rules_count,
);
previous_processed_blocks = processed_blocks;

let streams = storage::smembers(&redis_connection_manager, storage::INDEXER_SET_KEY)
.await
.unwrap_or(Vec::new());

for stream in streams {
let latest_id = storage::get::<String>(
&redis_connection_manager,
storage::generate_stream_last_id_key(&stream),
)
.await
.unwrap_or(storage::STREAM_SMALLEST_ID.to_string());

let unprocessed_message_count = storage::xrange(
&redis_connection_manager,
storage::generate_stream_key(&stream),
&latest_id,
storage::STREAM_LARGEST_ID,
)
.await
.unwrap_or(Vec::new())
.len() as i64;

metrics::UNPROCESSED_STREAM_MESSAGES
.with_label_values(&[&stream])
.set(unprocessed_message_count);
}

tokio::time::sleep(std::time::Duration::from_secs(interval_secs)).await;
}
}
44 changes: 44 additions & 0 deletions indexer/storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ pub use redis::{self, aio::ConnectionManager, FromRedisValue, ToRedisArgs};
const STORAGE: &str = "storage_alertexer";

pub const INDEXER_SET_KEY: &str = "indexers";
pub const STREAM_SMALLEST_ID: &str = "-";
pub const STREAM_LARGEST_ID: &str = "+";

pub async fn get_redis_client(redis_connection_str: &str) -> redis::Client {
redis::Client::open(redis_connection_str).expect("can create redis client")
Expand All @@ -16,6 +18,10 @@ pub fn generate_stream_key(name: &str) -> String {
format!("{}:stream", name)
}

pub fn generate_stream_last_id_key(name: &str) -> String {
format!("{}:stream:lastId", name)
}

pub async fn connect(redis_connection_str: &str) -> anyhow::Result<ConnectionManager> {
Ok(get_redis_client(redis_connection_str)
.await
Expand Down Expand Up @@ -105,6 +111,44 @@ pub async fn xadd(
Ok(())
}

pub async fn smembers(
redis_connection_manager: &ConnectionManager,
key: &str,
) -> anyhow::Result<Vec<String>> {
tracing::debug!(target: STORAGE, "SMEMBERS: {:?}", key);

let members: Vec<String> = redis::cmd("SMEMBERS")
.arg(key)
.query_async(&mut redis_connection_manager.clone())
.await?;

Ok(members)
}

pub async fn xrange(
redis_connection_manager: &ConnectionManager,
stream_key: impl ToRedisArgs + std::fmt::Debug,
start_id: &str,
end_id: &str,
) -> anyhow::Result<Vec<redis::Value>> {
tracing::debug!(
target: STORAGE,
"XRANGE: {:?}, {:?}, {:?}",
stream_key,
start_id,
end_id
);

let results = redis::cmd("XRANGE")
.arg(stream_key)
.arg(start_id)
.arg(end_id)
.query_async(&mut redis_connection_manager.clone())
.await?;

Ok(results)
}

/// Sets the key `receipt_id: &str` with value `transaction_hash: &str` to the Redis storage.
/// Increments the counter `receipts_{transaction_hash}` by one.
/// The counter holds how many Receipts related to the Transaction are in watching list
Expand Down

0 comments on commit b2ac422

Please sign in to comment.