diff --git a/grpc-ingest/src/prom.rs b/grpc-ingest/src/prom.rs index 44f9557d7..1bb3a125a 100644 --- a/grpc-ingest/src/prom.rs +++ b/grpc-ingest/src/prom.rs @@ -8,7 +8,10 @@ use { Body, Request, Response, Server, StatusCode, }, program_transformers::error::ProgramTransformerError, - prometheus::{IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry, TextEncoder}, + prometheus::{ + HistogramOpts, HistogramVec, IntCounter, IntCounterVec, IntGaugeVec, Opts, Registry, + TextEncoder, + }, std::{net::SocketAddr, sync::Once}, tracing::{error, info}, }; @@ -31,14 +34,14 @@ lazy_static::lazy_static! { &["stream", "label", "status"] ).unwrap(); - static ref REDIS_XREAD_COUNT: IntCounterVec = IntCounterVec::new( + static ref REDIS_XREAD_COUNT: IntCounterVec = IntCounterVec::new( Opts::new("redis_xread_count", "Count of messages seen"), - &["stream"] + &["stream", "consumer"] ).unwrap(); static ref REDIS_XACK_COUNT: IntCounterVec = IntCounterVec::new( Opts::new("redis_xack_count", "Total number of processed messages"), - &["stream"] + &["stream", "consumer"] ).unwrap(); static ref PGPOOL_CONNECTIONS: IntGaugeVec = IntGaugeVec::new( @@ -46,13 +49,14 @@ lazy_static::lazy_static! { &["kind"] ).unwrap(); - static ref PROGRAM_TRANSFORMER_TASKS: IntGauge = IntGauge::new( - "program_transformer_tasks", "Number of tasks spawned for program transform" - ).unwrap(); - static ref PROGRAM_TRANSFORMER_TASK_STATUS_COUNT: IntCounterVec = IntCounterVec::new( Opts::new("program_transformer_task_status_count", "Status of processed messages"), - &["status"], + &["stream", "consumer", "status"], + ).unwrap(); + + static ref INGEST_JOB_TIME: HistogramVec = HistogramVec::new( + HistogramOpts::new("ingest_job_time", "Time taken for ingest jobs"), + &["stream", "consumer"] ).unwrap(); static ref DOWNLOAD_METADATA_INSERTED_COUNT: IntCounter = IntCounter::new( @@ -61,12 +65,12 @@ lazy_static::lazy_static! { static ref INGEST_TASKS: IntGaugeVec = IntGaugeVec::new( Opts::new("ingest_tasks", "Number of tasks spawned for ingest"), - &["stream"] + &["stream", "consumer"] ).unwrap(); static ref ACK_TASKS: IntGaugeVec = IntGaugeVec::new( Opts::new("ack_tasks", "Number of tasks spawned for ack redis messages"), - &["stream"] + &["stream", "consumer"] ).unwrap(); static ref GRPC_TASKS: IntGaugeVec = IntGaugeVec::new( @@ -74,7 +78,6 @@ lazy_static::lazy_static! { &["label","stream"] ).unwrap(); - static ref BUBBLEGUM_TREE_TOTAL_LEAVES: IntGaugeVec = IntGaugeVec::new( Opts::new("bubblegum_tree_total_leaves", "Total number of leaves in the bubblegum tree"), &["tree"] @@ -118,8 +121,8 @@ pub fn run_server(address: SocketAddr) -> anyhow::Result<()> { register!(REDIS_XREAD_COUNT); register!(REDIS_XACK_COUNT); register!(PGPOOL_CONNECTIONS); - register!(PROGRAM_TRANSFORMER_TASKS); register!(PROGRAM_TRANSFORMER_TASK_STATUS_COUNT); + register!(INGEST_JOB_TIME); register!(DOWNLOAD_METADATA_INSERTED_COUNT); register!(INGEST_TASKS); register!(ACK_TASKS); @@ -188,6 +191,12 @@ pub fn redis_xlen_set(stream: &str, len: usize) { .set(len as i64); } +pub fn ingest_job_time_set(stream: &str, consumer: &str, value: f64) { + INGEST_JOB_TIME + .with_label_values(&[stream, consumer]) + .observe(value); +} + pub fn redis_xadd_status_inc(stream: &str, label: &str, status: Result<(), ()>, delta: usize) { REDIS_XADD_STATUS_COUNT .with_label_values(&[ @@ -198,15 +207,15 @@ pub fn redis_xadd_status_inc(stream: &str, label: &str, status: Result<(), ()>, .inc_by(delta as u64); } -pub fn redis_xread_inc(stream: &str, delta: usize) { +pub fn redis_xread_inc(stream: &str, consumer: &str, delta: usize) { REDIS_XREAD_COUNT - .with_label_values(&[stream]) + .with_label_values(&[stream, consumer]) .inc_by(delta as u64) } -pub fn redis_xack_inc(stream: &str, delta: usize) { +pub fn redis_xack_inc(stream: &str, consumer: &str, delta: usize) { REDIS_XACK_COUNT - .with_label_values(&[stream]) + .with_label_values(&[stream, consumer]) .inc_by(delta as u64) } @@ -225,20 +234,20 @@ pub fn pgpool_connections_set(kind: PgpoolConnectionsKind, size: usize) { .set(size as i64) } -pub fn ingest_tasks_total_inc(stream: &str) { - INGEST_TASKS.with_label_values(&[stream]).inc() +pub fn ingest_tasks_total_inc(stream: &str, consumer: &str) { + INGEST_TASKS.with_label_values(&[stream, consumer]).inc() } -pub fn ingest_tasks_total_dec(stream: &str) { - INGEST_TASKS.with_label_values(&[stream]).dec() +pub fn ingest_tasks_total_dec(stream: &str, consumer: &str) { + INGEST_TASKS.with_label_values(&[stream, consumer]).dec() } -pub fn ack_tasks_total_inc(stream: &str) { - ACK_TASKS.with_label_values(&[stream]).inc() +pub fn ack_tasks_total_inc(stream: &str, consumer: &str) { + ACK_TASKS.with_label_values(&[stream, consumer]).inc() } -pub fn ack_tasks_total_dec(stream: &str) { - ACK_TASKS.with_label_values(&[stream]).dec() +pub fn ack_tasks_total_dec(stream: &str, consumer: &str) { + ACK_TASKS.with_label_values(&[stream, consumer]).dec() } pub fn grpc_tasks_total_inc(label: &str, stream: &str) { @@ -353,9 +362,13 @@ impl ProgramTransformerTaskStatusKind { } } -pub fn program_transformer_task_status_inc(kind: ProgramTransformerTaskStatusKind) { +pub fn program_transformer_task_status_inc( + stream: &str, + consumer: &str, + kind: ProgramTransformerTaskStatusKind, +) { PROGRAM_TRANSFORMER_TASK_STATUS_COUNT - .with_label_values(&[kind.to_str()]) + .with_label_values(&[stream, consumer, kind.to_str()]) .inc() } diff --git a/grpc-ingest/src/redis.rs b/grpc-ingest/src/redis.rs index 9903bb5f4..9da08f0ab 100644 --- a/grpc-ingest/src/redis.rs +++ b/grpc-ingest/src/redis.rs @@ -2,7 +2,7 @@ use { crate::{ config::{ConfigIngestStream, REDIS_STREAM_DATA_KEY}, prom::{ - ack_tasks_total_dec, ack_tasks_total_inc, ingest_tasks_total_dec, + ack_tasks_total_dec, ack_tasks_total_inc, ingest_job_time_set, ingest_tasks_total_dec, ingest_tasks_total_inc, program_transformer_task_status_inc, redis_xack_inc, redis_xlen_set, redis_xread_inc, ProgramTransformerTaskStatusKind, }, @@ -313,7 +313,7 @@ impl Acknowledge { config.name, response, count ); - redis_xack_inc(&config.name, count); + redis_xack_inc(&config.name, &config.consumer, count); } Err(e) => { error!( @@ -324,7 +324,7 @@ impl Acknowledge { } } - ack_tasks_total_dec(&config.name); + ack_tasks_total_dec(&config.name, &config.consumer); } } @@ -424,25 +424,29 @@ impl IngestStream { let ack_tx = ack_tx.clone(); let config = Arc::clone(&config); - ingest_tasks_total_inc(&config.name); + ingest_tasks_total_inc(&config.name, &config.consumer); tasks.push(tokio::spawn(async move { + let start_time = tokio::time::Instant::now(); let result = handler.handle(map).await.map_err(IngestMessageError::into); + let elapsed_time = start_time.elapsed().as_secs_f64(); + + ingest_job_time_set(&config.name, &config.consumer, elapsed_time); match result { Ok(()) => { - program_transformer_task_status_inc(ProgramTransformerTaskStatusKind::Success); + program_transformer_task_status_inc(&config.name, &config.consumer, ProgramTransformerTaskStatusKind::Success); } Err(IngestMessageError::RedisStreamMessage(e)) => { error!("Failed to process message: {:?}", e); - program_transformer_task_status_inc(e.into()); + program_transformer_task_status_inc(&config.name, &config.consumer, e.into()); } Err(IngestMessageError::DownloadMetadataJson(e)) => { - program_transformer_task_status_inc(e.into()); + program_transformer_task_status_inc(&config.name, &config.consumer, e.into()); } Err(IngestMessageError::ProgramTransformer(e)) => { error!("Failed to process message: {:?}", e); - program_transformer_task_status_inc(e.into()); + program_transformer_task_status_inc(&config.name, &config.consumer, e.into()); } } @@ -450,7 +454,7 @@ impl IngestStream { error!(target: "ingest_stream", "action=send_ack stream={} error={:?}", &config.name, e); } - ingest_tasks_total_dec(&config.name); + ingest_tasks_total_dec(&config.name, &config.consumer); })); } } @@ -487,7 +491,7 @@ impl IngestStream { let handler = Arc::clone(&handler); - ack_tasks_total_inc(&config.name); + ack_tasks_total_inc(&config.name, &config.consumer); tasks.push(tokio::spawn(async move { handler.handle(ids).await; @@ -503,7 +507,7 @@ impl IngestStream { let ids = std::mem::take(&mut pending); let handler = Arc::clone(&handler); - ack_tasks_total_inc(&config.name); + ack_tasks_total_inc(&config.name, &config.consumer); tasks.push(tokio::spawn(async move { handler.handle(ids).await; @@ -586,7 +590,7 @@ impl IngestStream { let count = ids.len(); debug!(target: "ingest_stream", "action=xread stream={} count={:?}", &config.name, count); - redis_xread_inc(&config.name, count); + redis_xread_inc(&config.name, &config.consumer, count); if let Err(e) = msg_tx.send(ids).await { error!(target: "ingest_stream", "action=send_ids stream={} error={:?}", &config.name, e);