Skip to content

Commit

Permalink
Merge branch 'grpc-ingest' into triton-build
Browse files Browse the repository at this point in the history
  • Loading branch information
kespinola committed Nov 14, 2024
2 parents 0150f58 + 80d61dc commit ea897eb
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 39 deletions.
67 changes: 40 additions & 27 deletions grpc-ingest/src/prom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ use {
Body, Request, Response, Server, StatusCode,
},
program_transformers::error::ProgramTransformerError,
prometheus::{IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry, TextEncoder},
prometheus::{
HistogramOpts, HistogramVec, IntCounter, IntCounterVec, IntGaugeVec, Opts, Registry,
TextEncoder,
},
std::{net::SocketAddr, sync::Once},
tracing::{error, info},
};
Expand All @@ -31,28 +34,29 @@ lazy_static::lazy_static! {
&["stream", "label", "status"]
).unwrap();

static ref REDIS_XREAD_COUNT: IntCounterVec = IntCounterVec::new(
static ref REDIS_XREAD_COUNT: IntCounterVec = IntCounterVec::new(
Opts::new("redis_xread_count", "Count of messages seen"),
&["stream"]
&["stream", "consumer"]
).unwrap();

static ref REDIS_XACK_COUNT: IntCounterVec = IntCounterVec::new(
Opts::new("redis_xack_count", "Total number of processed messages"),
&["stream"]
&["stream", "consumer"]
).unwrap();

static ref PGPOOL_CONNECTIONS: IntGaugeVec = IntGaugeVec::new(
Opts::new("pgpool_connections", "Total number of connections in Postgres Pool"),
&["kind"]
).unwrap();

static ref PROGRAM_TRANSFORMER_TASKS: IntGauge = IntGauge::new(
"program_transformer_tasks", "Number of tasks spawned for program transform"
).unwrap();

static ref PROGRAM_TRANSFORMER_TASK_STATUS_COUNT: IntCounterVec = IntCounterVec::new(
Opts::new("program_transformer_task_status_count", "Status of processed messages"),
&["status"],
&["stream", "consumer", "status"],
).unwrap();

static ref INGEST_JOB_TIME: HistogramVec = HistogramVec::new(
HistogramOpts::new("ingest_job_time", "Time taken for ingest jobs"),
&["stream", "consumer"]
).unwrap();

static ref DOWNLOAD_METADATA_INSERTED_COUNT: IntCounter = IntCounter::new(
Expand All @@ -61,20 +65,19 @@ lazy_static::lazy_static! {

static ref INGEST_TASKS: IntGaugeVec = IntGaugeVec::new(
Opts::new("ingest_tasks", "Number of tasks spawned for ingest"),
&["stream"]
&["stream", "consumer"]
).unwrap();

static ref ACK_TASKS: IntGaugeVec = IntGaugeVec::new(
Opts::new("ack_tasks", "Number of tasks spawned for ack redis messages"),
&["stream"]
&["stream", "consumer"]
).unwrap();

static ref GRPC_TASKS: IntGaugeVec = IntGaugeVec::new(
Opts::new("grpc_tasks", "Number of tasks spawned for writing grpc messages to redis "),
&["label","stream"]
).unwrap();


static ref BUBBLEGUM_TREE_TOTAL_LEAVES: IntGaugeVec = IntGaugeVec::new(
Opts::new("bubblegum_tree_total_leaves", "Total number of leaves in the bubblegum tree"),
&["tree"]
Expand Down Expand Up @@ -118,8 +121,8 @@ pub fn run_server(address: SocketAddr) -> anyhow::Result<()> {
register!(REDIS_XREAD_COUNT);
register!(REDIS_XACK_COUNT);
register!(PGPOOL_CONNECTIONS);
register!(PROGRAM_TRANSFORMER_TASKS);
register!(PROGRAM_TRANSFORMER_TASK_STATUS_COUNT);
register!(INGEST_JOB_TIME);
register!(DOWNLOAD_METADATA_INSERTED_COUNT);
register!(INGEST_TASKS);
register!(ACK_TASKS);
Expand Down Expand Up @@ -188,6 +191,12 @@ pub fn redis_xlen_set(stream: &str, len: usize) {
.set(len as i64);
}

pub fn ingest_job_time_set(stream: &str, consumer: &str, value: f64) {
INGEST_JOB_TIME
.with_label_values(&[stream, consumer])
.observe(value);
}

pub fn redis_xadd_status_inc(stream: &str, label: &str, status: Result<(), ()>, delta: usize) {
REDIS_XADD_STATUS_COUNT
.with_label_values(&[
Expand All @@ -198,15 +207,15 @@ pub fn redis_xadd_status_inc(stream: &str, label: &str, status: Result<(), ()>,
.inc_by(delta as u64);
}

pub fn redis_xread_inc(stream: &str, delta: usize) {
pub fn redis_xread_inc(stream: &str, consumer: &str, delta: usize) {
REDIS_XREAD_COUNT
.with_label_values(&[stream])
.with_label_values(&[stream, consumer])
.inc_by(delta as u64)
}

pub fn redis_xack_inc(stream: &str, delta: usize) {
pub fn redis_xack_inc(stream: &str, consumer: &str, delta: usize) {
REDIS_XACK_COUNT
.with_label_values(&[stream])
.with_label_values(&[stream, consumer])
.inc_by(delta as u64)
}

Expand All @@ -225,20 +234,20 @@ pub fn pgpool_connections_set(kind: PgpoolConnectionsKind, size: usize) {
.set(size as i64)
}

pub fn ingest_tasks_total_inc(stream: &str) {
INGEST_TASKS.with_label_values(&[stream]).inc()
pub fn ingest_tasks_total_inc(stream: &str, consumer: &str) {
INGEST_TASKS.with_label_values(&[stream, consumer]).inc()
}

pub fn ingest_tasks_total_dec(stream: &str) {
INGEST_TASKS.with_label_values(&[stream]).dec()
pub fn ingest_tasks_total_dec(stream: &str, consumer: &str) {
INGEST_TASKS.with_label_values(&[stream, consumer]).dec()
}

pub fn ack_tasks_total_inc(stream: &str) {
ACK_TASKS.with_label_values(&[stream]).inc()
pub fn ack_tasks_total_inc(stream: &str, consumer: &str) {
ACK_TASKS.with_label_values(&[stream, consumer]).inc()
}

pub fn ack_tasks_total_dec(stream: &str) {
ACK_TASKS.with_label_values(&[stream]).dec()
pub fn ack_tasks_total_dec(stream: &str, consumer: &str) {
ACK_TASKS.with_label_values(&[stream, consumer]).dec()
}

pub fn grpc_tasks_total_inc(label: &str, stream: &str) {
Expand Down Expand Up @@ -353,9 +362,13 @@ impl ProgramTransformerTaskStatusKind {
}
}

pub fn program_transformer_task_status_inc(kind: ProgramTransformerTaskStatusKind) {
pub fn program_transformer_task_status_inc(
stream: &str,
consumer: &str,
kind: ProgramTransformerTaskStatusKind,
) {
PROGRAM_TRANSFORMER_TASK_STATUS_COUNT
.with_label_values(&[kind.to_str()])
.with_label_values(&[stream, consumer, kind.to_str()])
.inc()
}

Expand Down
28 changes: 16 additions & 12 deletions grpc-ingest/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use {
crate::{
config::{ConfigIngestStream, REDIS_STREAM_DATA_KEY},
prom::{
ack_tasks_total_dec, ack_tasks_total_inc, ingest_tasks_total_dec,
ack_tasks_total_dec, ack_tasks_total_inc, ingest_job_time_set, ingest_tasks_total_dec,
ingest_tasks_total_inc, program_transformer_task_status_inc, redis_xack_inc,
redis_xlen_set, redis_xread_inc, ProgramTransformerTaskStatusKind,
},
Expand Down Expand Up @@ -313,7 +313,7 @@ impl Acknowledge {
config.name, response, count
);

redis_xack_inc(&config.name, count);
redis_xack_inc(&config.name, &config.consumer, count);
}
Err(e) => {
error!(
Expand All @@ -324,7 +324,7 @@ impl Acknowledge {
}
}

ack_tasks_total_dec(&config.name);
ack_tasks_total_dec(&config.name, &config.consumer);
}
}

Expand Down Expand Up @@ -424,33 +424,37 @@ impl<H: MessageHandler> IngestStream<H> {
let ack_tx = ack_tx.clone();
let config = Arc::clone(&config);

ingest_tasks_total_inc(&config.name);
ingest_tasks_total_inc(&config.name, &config.consumer);

tasks.push(tokio::spawn(async move {
let start_time = tokio::time::Instant::now();
let result = handler.handle(map).await.map_err(IngestMessageError::into);
let elapsed_time = start_time.elapsed().as_secs_f64();

ingest_job_time_set(&config.name, &config.consumer, elapsed_time);

match result {
Ok(()) => {
program_transformer_task_status_inc(ProgramTransformerTaskStatusKind::Success);
program_transformer_task_status_inc(&config.name, &config.consumer, ProgramTransformerTaskStatusKind::Success);
}
Err(IngestMessageError::RedisStreamMessage(e)) => {
error!("Failed to process message: {:?}", e);
program_transformer_task_status_inc(e.into());
program_transformer_task_status_inc(&config.name, &config.consumer, e.into());
}
Err(IngestMessageError::DownloadMetadataJson(e)) => {
program_transformer_task_status_inc(e.into());
program_transformer_task_status_inc(&config.name, &config.consumer, e.into());
}
Err(IngestMessageError::ProgramTransformer(e)) => {
error!("Failed to process message: {:?}", e);
program_transformer_task_status_inc(e.into());
program_transformer_task_status_inc(&config.name, &config.consumer, e.into());
}
}

if let Err(e) = ack_tx.send(id).await {
error!(target: "ingest_stream", "action=send_ack stream={} error={:?}", &config.name, e);
}

ingest_tasks_total_dec(&config.name);
ingest_tasks_total_dec(&config.name, &config.consumer);
}));
}
}
Expand Down Expand Up @@ -487,7 +491,7 @@ impl<H: MessageHandler> IngestStream<H> {
let handler = Arc::clone(&handler);


ack_tasks_total_inc(&config.name);
ack_tasks_total_inc(&config.name, &config.consumer);

tasks.push(tokio::spawn(async move {
handler.handle(ids).await;
Expand All @@ -503,7 +507,7 @@ impl<H: MessageHandler> IngestStream<H> {
let ids = std::mem::take(&mut pending);
let handler = Arc::clone(&handler);

ack_tasks_total_inc(&config.name);
ack_tasks_total_inc(&config.name, &config.consumer);

tasks.push(tokio::spawn(async move {
handler.handle(ids).await;
Expand Down Expand Up @@ -586,7 +590,7 @@ impl<H: MessageHandler> IngestStream<H> {
let count = ids.len();
debug!(target: "ingest_stream", "action=xread stream={} count={:?}", &config.name, count);

redis_xread_inc(&config.name, count);
redis_xread_inc(&config.name, &config.consumer, count);

if let Err(e) = msg_tx.send(ids).await {
error!(target: "ingest_stream", "action=send_ids stream={} error={:?}", &config.name, e);
Expand Down

0 comments on commit ea897eb

Please sign in to comment.