Skip to content

Commit

Permalink
DPLT-1129 Publish unprocessed stream messages count to Grafana (#169)
Browse files Browse the repository at this point in the history
  • Loading branch information
morgsmccauley authored Aug 8, 2023
1 parent 6218297 commit 54ee547
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 2 deletions.
19 changes: 18 additions & 1 deletion indexer/queryapi_coordinator/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use actix_web::{get, App, HttpServer, Responder};
use lazy_static::lazy_static;
use prometheus::{Encoder, IntCounter, IntGauge, Opts};
use prometheus::{Encoder, IntCounter, IntGauge, IntGaugeVec, Opts};
use tracing::info;

lazy_static! {
Expand All @@ -14,6 +14,23 @@ lazy_static! {
"Number of indexed blocks"
)
.unwrap();
pub(crate) static ref UNPROCESSED_STREAM_MESSAGES: IntGaugeVec = try_create_int_gauge_vec(
"queryapi_coordinator_unprocessed_stream_messages",
"Number of Redis Stream messages not processed by Runner",
&["stream"]
)
.unwrap();
}

fn try_create_int_gauge_vec(
name: &str,
help: &str,
labels: &[&str],
) -> prometheus::Result<IntGaugeVec> {
let opts = Opts::new(name, help);
let gauge = IntGaugeVec::new(opts, labels)?;
prometheus::register(Box::new(gauge.clone()))?;
Ok(gauge)
}

fn try_create_int_gauge(name: &str, help: &str) -> prometheus::Result<IntGauge> {
Expand Down
32 changes: 31 additions & 1 deletion indexer/queryapi_coordinator/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::metrics;

pub(crate) async fn stats(redis_connection_manager: storage::ConnectionManager) {
let interval_secs = 10;
let interval_secs = 5;
let mut previous_processed_blocks: u64 =
storage::get::<u64>(&redis_connection_manager, "blocks_processed")
.await
Expand Down Expand Up @@ -46,6 +48,34 @@ pub(crate) async fn stats(redis_connection_manager: storage::ConnectionManager)
alert_rules_count,
);
previous_processed_blocks = processed_blocks;

let streams = storage::smembers(&redis_connection_manager, storage::INDEXER_SET_KEY)
.await
.unwrap_or(Vec::new());

for stream in streams {
let latest_id = storage::get::<String>(
&redis_connection_manager,
storage::generate_stream_last_id_key(&stream),
)
.await
.unwrap_or(storage::STREAM_SMALLEST_ID.to_string());

let unprocessed_message_count = storage::xrange(
&redis_connection_manager,
storage::generate_stream_key(&stream),
&latest_id,
storage::STREAM_LARGEST_ID,
)
.await
.unwrap_or(Vec::new())
.len() as i64;

metrics::UNPROCESSED_STREAM_MESSAGES
.with_label_values(&[&stream])
.set(unprocessed_message_count);
}

tokio::time::sleep(std::time::Duration::from_secs(interval_secs)).await;
}
}
44 changes: 44 additions & 0 deletions indexer/storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ pub use redis::{self, aio::ConnectionManager, FromRedisValue, ToRedisArgs};
const STORAGE: &str = "storage_alertexer";

pub const INDEXER_SET_KEY: &str = "indexers";
pub const STREAM_SMALLEST_ID: &str = "-";
pub const STREAM_LARGEST_ID: &str = "+";

pub async fn get_redis_client(redis_connection_str: &str) -> redis::Client {
redis::Client::open(redis_connection_str).expect("can create redis client")
Expand All @@ -16,6 +18,10 @@ pub fn generate_stream_key(name: &str) -> String {
format!("{}:stream", name)
}

pub fn generate_stream_last_id_key(name: &str) -> String {
format!("{}:stream:lastId", name)
}

pub async fn connect(redis_connection_str: &str) -> anyhow::Result<ConnectionManager> {
Ok(get_redis_client(redis_connection_str)
.await
Expand Down Expand Up @@ -105,6 +111,44 @@ pub async fn xadd(
Ok(())
}

pub async fn smembers(
redis_connection_manager: &ConnectionManager,
key: &str,
) -> anyhow::Result<Vec<String>> {
tracing::debug!(target: STORAGE, "SMEMBERS: {:?}", key);

let members: Vec<String> = redis::cmd("SMEMBERS")
.arg(key)
.query_async(&mut redis_connection_manager.clone())
.await?;

Ok(members)
}

pub async fn xrange(
redis_connection_manager: &ConnectionManager,
stream_key: impl ToRedisArgs + std::fmt::Debug,
start_id: &str,
end_id: &str,
) -> anyhow::Result<Vec<redis::Value>> {
tracing::debug!(
target: STORAGE,
"XRANGE: {:?}, {:?}, {:?}",
stream_key,
start_id,
end_id
);

let results = redis::cmd("XRANGE")
.arg(stream_key)
.arg(start_id)
.arg(end_id)
.query_async(&mut redis_connection_manager.clone())
.await?;

Ok(results)
}

/// Sets the key `receipt_id: &str` with value `transaction_hash: &str` to the Redis storage.
/// Increments the counter `receipts_{transaction_hash}` by one.
/// The counter holds how many Receipts related to the Transaction are in watching list
Expand Down

0 comments on commit 54ee547

Please sign in to comment.