Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Time Ingest jobs #182

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 40 additions & 27 deletions grpc-ingest/src/prom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ use {
Body, Request, Response, Server, StatusCode,
},
program_transformers::error::ProgramTransformerError,
prometheus::{IntCounter, IntCounterVec, IntGauge, IntGaugeVec, Opts, Registry, TextEncoder},
prometheus::{
HistogramOpts, HistogramVec, IntCounter, IntCounterVec, IntGaugeVec, Opts, Registry,
TextEncoder,
},
std::{net::SocketAddr, sync::Once},
tracing::{error, info},
};
Expand All @@ -31,28 +34,29 @@ lazy_static::lazy_static! {
&["stream", "label", "status"]
).unwrap();

static ref REDIS_XREAD_COUNT: IntCounterVec = IntCounterVec::new(
static ref REDIS_XREAD_COUNT: IntCounterVec = IntCounterVec::new(
Opts::new("redis_xread_count", "Count of messages seen"),
&["stream"]
&["stream", "consumer"]
).unwrap();

static ref REDIS_XACK_COUNT: IntCounterVec = IntCounterVec::new(
Opts::new("redis_xack_count", "Total number of processed messages"),
&["stream"]
&["stream", "consumer"]
).unwrap();

static ref PGPOOL_CONNECTIONS: IntGaugeVec = IntGaugeVec::new(
Opts::new("pgpool_connections", "Total number of connections in Postgres Pool"),
&["kind"]
).unwrap();

static ref PROGRAM_TRANSFORMER_TASKS: IntGauge = IntGauge::new(
"program_transformer_tasks", "Number of tasks spawned for program transform"
).unwrap();

static ref PROGRAM_TRANSFORMER_TASK_STATUS_COUNT: IntCounterVec = IntCounterVec::new(
Opts::new("program_transformer_task_status_count", "Status of processed messages"),
&["status"],
&["stream", "consumer", "status"],
).unwrap();

static ref INGEST_JOB_TIME: HistogramVec = HistogramVec::new(
HistogramOpts::new("ingest_job_time", "Time taken for ingest jobs"),
&["stream", "consumer"]
).unwrap();

static ref DOWNLOAD_METADATA_INSERTED_COUNT: IntCounter = IntCounter::new(
Expand All @@ -61,20 +65,19 @@ lazy_static::lazy_static! {

static ref INGEST_TASKS: IntGaugeVec = IntGaugeVec::new(
Opts::new("ingest_tasks", "Number of tasks spawned for ingest"),
&["stream"]
&["stream", "consumer"]
).unwrap();

static ref ACK_TASKS: IntGaugeVec = IntGaugeVec::new(
Opts::new("ack_tasks", "Number of tasks spawned for ack redis messages"),
&["stream"]
&["stream", "consumer"]
).unwrap();

static ref GRPC_TASKS: IntGaugeVec = IntGaugeVec::new(
Opts::new("grpc_tasks", "Number of tasks spawned for writing grpc messages to redis "),
&["label","stream"]
).unwrap();


static ref BUBBLEGUM_TREE_TOTAL_LEAVES: IntGaugeVec = IntGaugeVec::new(
Opts::new("bubblegum_tree_total_leaves", "Total number of leaves in the bubblegum tree"),
&["tree"]
Expand Down Expand Up @@ -118,8 +121,8 @@ pub fn run_server(address: SocketAddr) -> anyhow::Result<()> {
register!(REDIS_XREAD_COUNT);
register!(REDIS_XACK_COUNT);
register!(PGPOOL_CONNECTIONS);
register!(PROGRAM_TRANSFORMER_TASKS);
register!(PROGRAM_TRANSFORMER_TASK_STATUS_COUNT);
register!(INGEST_JOB_TIME);
register!(DOWNLOAD_METADATA_INSERTED_COUNT);
register!(INGEST_TASKS);
register!(ACK_TASKS);
Expand Down Expand Up @@ -188,6 +191,12 @@ pub fn redis_xlen_set(stream: &str, len: usize) {
.set(len as i64);
}

pub fn ingest_job_time_set(stream: &str, consumer: &str, value: f64) {
INGEST_JOB_TIME
.with_label_values(&[stream, consumer])
.observe(value);
}

pub fn redis_xadd_status_inc(stream: &str, label: &str, status: Result<(), ()>, delta: usize) {
REDIS_XADD_STATUS_COUNT
.with_label_values(&[
Expand All @@ -198,15 +207,15 @@ pub fn redis_xadd_status_inc(stream: &str, label: &str, status: Result<(), ()>,
.inc_by(delta as u64);
}

pub fn redis_xread_inc(stream: &str, delta: usize) {
pub fn redis_xread_inc(stream: &str, consumer: &str, delta: usize) {
REDIS_XREAD_COUNT
.with_label_values(&[stream])
.with_label_values(&[stream, consumer])
.inc_by(delta as u64)
}

pub fn redis_xack_inc(stream: &str, delta: usize) {
pub fn redis_xack_inc(stream: &str, consumer: &str, delta: usize) {
REDIS_XACK_COUNT
.with_label_values(&[stream])
.with_label_values(&[stream, consumer])
.inc_by(delta as u64)
}

Expand All @@ -225,20 +234,20 @@ pub fn pgpool_connections_set(kind: PgpoolConnectionsKind, size: usize) {
.set(size as i64)
}

pub fn ingest_tasks_total_inc(stream: &str) {
INGEST_TASKS.with_label_values(&[stream]).inc()
pub fn ingest_tasks_total_inc(stream: &str, consumer: &str) {
INGEST_TASKS.with_label_values(&[stream, consumer]).inc()
}

pub fn ingest_tasks_total_dec(stream: &str) {
INGEST_TASKS.with_label_values(&[stream]).dec()
pub fn ingest_tasks_total_dec(stream: &str, consumer: &str) {
INGEST_TASKS.with_label_values(&[stream, consumer]).dec()
}

pub fn ack_tasks_total_inc(stream: &str) {
ACK_TASKS.with_label_values(&[stream]).inc()
pub fn ack_tasks_total_inc(stream: &str, consumer: &str) {
ACK_TASKS.with_label_values(&[stream, consumer]).inc()
}

pub fn ack_tasks_total_dec(stream: &str) {
ACK_TASKS.with_label_values(&[stream]).dec()
pub fn ack_tasks_total_dec(stream: &str, consumer: &str) {
ACK_TASKS.with_label_values(&[stream, consumer]).dec()
}

pub fn grpc_tasks_total_inc(label: &str, stream: &str) {
Expand Down Expand Up @@ -353,9 +362,13 @@ impl ProgramTransformerTaskStatusKind {
}
}

pub fn program_transformer_task_status_inc(kind: ProgramTransformerTaskStatusKind) {
pub fn program_transformer_task_status_inc(
stream: &str,
consumer: &str,
kind: ProgramTransformerTaskStatusKind,
) {
PROGRAM_TRANSFORMER_TASK_STATUS_COUNT
.with_label_values(&[kind.to_str()])
.with_label_values(&[stream, consumer, kind.to_str()])
.inc()
}

Expand Down
28 changes: 16 additions & 12 deletions grpc-ingest/src/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use {
crate::{
config::{ConfigIngestStream, REDIS_STREAM_DATA_KEY},
prom::{
ack_tasks_total_dec, ack_tasks_total_inc, ingest_tasks_total_dec,
ack_tasks_total_dec, ack_tasks_total_inc, ingest_job_time_set, ingest_tasks_total_dec,
ingest_tasks_total_inc, program_transformer_task_status_inc, redis_xack_inc,
redis_xlen_set, redis_xread_inc, ProgramTransformerTaskStatusKind,
},
Expand Down Expand Up @@ -313,7 +313,7 @@ impl Acknowledge {
config.name, response, count
);

redis_xack_inc(&config.name, count);
redis_xack_inc(&config.name, &config.consumer, count);
}
Err(e) => {
error!(
Expand All @@ -324,7 +324,7 @@ impl Acknowledge {
}
}

ack_tasks_total_dec(&config.name);
ack_tasks_total_dec(&config.name, &config.consumer);
}
}

Expand Down Expand Up @@ -424,33 +424,37 @@ impl<H: MessageHandler> IngestStream<H> {
let ack_tx = ack_tx.clone();
let config = Arc::clone(&config);

ingest_tasks_total_inc(&config.name);
ingest_tasks_total_inc(&config.name, &config.consumer);

tasks.push(tokio::spawn(async move {
let start_time = tokio::time::Instant::now();
let result = handler.handle(map).await.map_err(IngestMessageError::into);
let elapsed_time = start_time.elapsed().as_secs_f64();

ingest_job_time_set(&config.name, &config.consumer, elapsed_time);

match result {
Ok(()) => {
program_transformer_task_status_inc(ProgramTransformerTaskStatusKind::Success);
program_transformer_task_status_inc(&config.name, &config.consumer, ProgramTransformerTaskStatusKind::Success);
}
Err(IngestMessageError::RedisStreamMessage(e)) => {
error!("Failed to process message: {:?}", e);
program_transformer_task_status_inc(e.into());
program_transformer_task_status_inc(&config.name, &config.consumer, e.into());
}
Err(IngestMessageError::DownloadMetadataJson(e)) => {
program_transformer_task_status_inc(e.into());
program_transformer_task_status_inc(&config.name, &config.consumer, e.into());
}
Err(IngestMessageError::ProgramTransformer(e)) => {
error!("Failed to process message: {:?}", e);
program_transformer_task_status_inc(e.into());
program_transformer_task_status_inc(&config.name, &config.consumer, e.into());
}
}

if let Err(e) = ack_tx.send(id).await {
error!(target: "ingest_stream", "action=send_ack stream={} error={:?}", &config.name, e);
}

ingest_tasks_total_dec(&config.name);
ingest_tasks_total_dec(&config.name, &config.consumer);
}));
}
}
Expand Down Expand Up @@ -487,7 +491,7 @@ impl<H: MessageHandler> IngestStream<H> {
let handler = Arc::clone(&handler);


ack_tasks_total_inc(&config.name);
ack_tasks_total_inc(&config.name, &config.consumer);

tasks.push(tokio::spawn(async move {
handler.handle(ids).await;
Expand All @@ -503,7 +507,7 @@ impl<H: MessageHandler> IngestStream<H> {
let ids = std::mem::take(&mut pending);
let handler = Arc::clone(&handler);

ack_tasks_total_inc(&config.name);
ack_tasks_total_inc(&config.name, &config.consumer);

tasks.push(tokio::spawn(async move {
handler.handle(ids).await;
Expand Down Expand Up @@ -586,7 +590,7 @@ impl<H: MessageHandler> IngestStream<H> {
let count = ids.len();
debug!(target: "ingest_stream", "action=xread stream={} count={:?}", &config.name, count);

redis_xread_inc(&config.name, count);
redis_xread_inc(&config.name, &config.consumer, count);

if let Err(e) = msg_tx.send(ids).await {
error!(target: "ingest_stream", "action=send_ids stream={} error={:?}", &config.name, e);
Expand Down
Loading