Skip to content

Commit

Permalink
feat(prover): Add endpoint to PJM to get queue reports (#2918)
Browse files Browse the repository at this point in the history
## What ❔

Add `/queue_report` endpoint, which will get the data about queue and
send it.

## Why ❔

To work with new autoscaler

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [ ] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [ ] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [ ] Code has been formatted via `zk fmt` and `zk lint`.

---------

Co-authored-by: EmilLuta <[email protected]>
  • Loading branch information
Artemka374 and EmilLuta authored Sep 19, 2024
1 parent 633bca4 commit 2cec83f
Show file tree
Hide file tree
Showing 18 changed files with 334 additions and 13 deletions.
3 changes: 2 additions & 1 deletion core/lib/basic_types/src/prover_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
use std::{net::IpAddr, ops::Add, str::FromStr};

use chrono::{DateTime, Duration, NaiveDateTime, NaiveTime, Utc};
use serde::{Deserialize, Serialize};
use strum::{Display, EnumString};

use crate::{
Expand All @@ -27,7 +28,7 @@ pub struct ExtendedJobCountStatistics {
pub successful: usize,
}

#[derive(Debug, Clone, Copy, Default)]
#[derive(Debug, Clone, Copy, Default, Serialize, Deserialize)]
pub struct JobCountStatistics {
pub queued: usize,
pub in_progress: usize,
Expand Down
2 changes: 2 additions & 0 deletions core/lib/config/src/configs/prover_job_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ pub struct ProverJobMonitorConfig {
/// The interval between runs for Witness Job Queuer.
#[serde(default = "ProverJobMonitorConfig::default_witness_job_queuer_run_interval_ms")]
pub witness_job_queuer_run_interval_ms: u64,
/// HTTP port of the ProverJobMonitor to send requests to.
pub http_port: u16,
}

impl ProverJobMonitorConfig {
Expand Down
1 change: 1 addition & 0 deletions core/lib/config/src/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1113,6 +1113,7 @@ impl Distribution<configs::prover_job_monitor::ProverJobMonitorConfig> for Encod
prover_queue_reporter_run_interval_ms: self.sample(rng),
witness_generator_queue_reporter_run_interval_ms: self.sample(rng),
witness_job_queuer_run_interval_ms: self.sample(rng),
http_port: self.sample(rng),
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions core/lib/env_config/src/prover_job_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ mod tests {
prover_queue_reporter_run_interval_ms: 10000,
witness_generator_queue_reporter_run_interval_ms: 10000,
witness_job_queuer_run_interval_ms: 10000,
http_port: 3074,
}
}

Expand All @@ -55,6 +56,7 @@ mod tests {
fn from_env_with_default() {
let config = r#"
PROVER_JOB_MONITOR_PROMETHEUS_PORT=3317
PROVER_JOB_MONITOR_HTTP_PORT=3074
PROVER_JOB_MONITOR_MAX_DB_CONNECTIONS=9
"#;
let mut lock = MUTEX.lock();
Expand All @@ -80,6 +82,7 @@ mod tests {
PROVER_JOB_MONITOR_PROVER_QUEUE_REPORTER_RUN_INTERVAL_MS=10001
PROVER_JOB_MONITOR_WITNESS_GENERATOR_QUEUE_REPORTER_RUN_INTERVAL_MS=10001
PROVER_JOB_MONITOR_WITNESS_JOB_QUEUER_RUN_INTERVAL_MS=10001
PROVER_JOB_MONITOR_HTTP_PORT=3074
"#;
let mut lock = MUTEX.lock();
lock.set_env(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ message ProverJobMonitor {
optional uint64 prover_queue_reporter_run_interval_ms = 12; // optional; ms
optional uint64 witness_generator_queue_reporter_run_interval_ms = 13; // optional; ms
optional uint64 witness_job_queuer_run_interval_ms = 14; // optional; ms
optional uint32 http_port = 15; // required; u32
}
4 changes: 4 additions & 0 deletions core/lib/protobuf_config/src/prover_job_monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ impl ProtoRepr for proto::ProverJobMonitor {
.or_else(|| Some(Self::Type::default_witness_job_queuer_run_interval_ms())),
)
.context("witness_job_queuer_run_interval_ms")?,
http_port: required(&self.http_port)
.and_then(|x| Ok((*x).try_into()?))
.context("http_port")?,
})
}

Expand Down Expand Up @@ -126,6 +129,7 @@ impl ProtoRepr for proto::ProverJobMonitor {
this.witness_generator_queue_reporter_run_interval_ms,
),
witness_job_queuer_run_interval_ms: Some(this.witness_job_queuer_run_interval_ms),
http_port: Some(this.http_port.into()),
}
}
}
1 change: 1 addition & 0 deletions etc/env/base/prover_job_monitor.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ proof_compressor_queue_reporter_run_interval_ms = 10000
prover_queue_reporter_run_interval_ms = 10000
witness_generator_queue_reporter_run_interval_ms = 10000
witness_job_queuer_run_interval_ms = 10000
http_port = 3074
1 change: 1 addition & 0 deletions etc/env/file_based/general.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ prover_job_monitor:
prover_queue_reporter_run_interval_ms: 10000
witness_generator_queue_reporter_run_interval_ms: 10000
witness_job_queuer_run_interval_ms: 10000
http_port: 3074


base_token_adjuster:
Expand Down
29 changes: 25 additions & 4 deletions prover/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions prover/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ categories = ["cryptography"]
[workspace.dependencies]
# Common dependencies
anyhow = "1.0"
axum = "0.7.5"
async-trait = "0.1"
bincode = "1"
chrono = "0.4.38"
Expand Down
3 changes: 3 additions & 0 deletions prover/crates/bin/prover_job_monitor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ zksync_prover_dal.workspace = true
zksync_utils.workspace = true
zksync_types.workspace = true
zksync_config = { workspace = true, features = ["observability_ext"] }
zksync_db_connection.workspace = true

vise.workspace = true

Expand All @@ -25,3 +26,5 @@ clap = { workspace = true, features = ["derive"] }
ctrlc = { workspace = true, features = ["termination"] }
tracing.workspace = true
async-trait.workspace = true
serde.workspace = true
axum.workspace = true
176 changes: 176 additions & 0 deletions prover/crates/bin/prover_job_monitor/src/autoscaler_queue_reporter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
use std::collections::HashMap;

use axum::{
http::StatusCode,
response::{IntoResponse, Response},
routing::get,
Json, Router,
};
use serde::{Deserialize, Serialize};
use zksync_db_connection::error::DalError;
use zksync_prover_dal::{ConnectionPool, Prover, ProverDal};
use zksync_types::{
basic_fri_types::AggregationRound, protocol_version::ProtocolSemanticVersion,
prover_dal::JobCountStatistics,
};

#[derive(Debug, Clone)]
pub struct AutoscalerQueueReporter {
connection_pool: ConnectionPool<Prover>,
}

#[derive(Default, Debug, Serialize, Deserialize)]
pub struct QueueReport {
pub basic_witness_jobs: JobCountStatistics,
pub leaf_witness_jobs: JobCountStatistics,
pub node_witness_jobs: JobCountStatistics,
pub recursion_tip_witness_jobs: JobCountStatistics,
pub scheduler_witness_jobs: JobCountStatistics,
pub prover_jobs: JobCountStatistics,
pub proof_compressor_jobs: JobCountStatistics,
}

#[derive(Default, Debug, Serialize, Deserialize)]
pub struct VersionedQueueReport {
pub version: ProtocolSemanticVersion,
pub report: QueueReport,
}

impl AutoscalerQueueReporter {
pub fn new(connection_pool: ConnectionPool<Prover>) -> Self {
Self { connection_pool }
}

pub async fn get_report(&self) -> Result<Json<Vec<VersionedQueueReport>>, ProcessorError> {
tracing::debug!("Received request to get queue report");

let mut result = HashMap::<ProtocolSemanticVersion, QueueReport>::new();

for round in AggregationRound::ALL_ROUNDS {
self.get_witness_jobs_report(round, &mut result).await?;
}

self.get_prover_jobs_report(&mut result).await?;
self.get_proof_compressor_jobs_report(&mut result).await?;

Ok(Json(
result
.into_iter()
.map(|(version, report)| VersionedQueueReport { version, report })
.collect(),
))
}

async fn get_witness_jobs_report(
&self,
aggregation_round: AggregationRound,
state: &mut HashMap<ProtocolSemanticVersion, QueueReport>,
) -> anyhow::Result<()> {
let stats = self
.connection_pool
.connection()
.await?
.fri_witness_generator_dal()
.get_witness_jobs_stats(aggregation_round)
.await;

for (protocol_version, job_stats) in stats {
let report = state.entry(protocol_version).or_default();

match aggregation_round {
AggregationRound::BasicCircuits => report.basic_witness_jobs = job_stats,
AggregationRound::LeafAggregation => report.leaf_witness_jobs = job_stats,
AggregationRound::NodeAggregation => report.node_witness_jobs = job_stats,
AggregationRound::RecursionTip => report.recursion_tip_witness_jobs = job_stats,
AggregationRound::Scheduler => report.scheduler_witness_jobs = job_stats,
}
}
Ok(())
}

async fn get_prover_jobs_report(
&self,
state: &mut HashMap<ProtocolSemanticVersion, QueueReport>,
) -> anyhow::Result<()> {
let stats = self
.connection_pool
.connection()
.await?
.fri_prover_jobs_dal()
.get_generic_prover_jobs_stats()
.await;

for (protocol_version, stats) in stats {
let report = state.entry(protocol_version).or_default();

report.prover_jobs = stats;
}
Ok(())
}

async fn get_proof_compressor_jobs_report(
&self,
state: &mut HashMap<ProtocolSemanticVersion, QueueReport>,
) -> anyhow::Result<()> {
let stats = self
.connection_pool
.connection()
.await?
.fri_proof_compressor_dal()
.get_jobs_stats()
.await;

for (protocol_version, stats) in stats {
let report = state.entry(protocol_version).or_default();

report.proof_compressor_jobs = stats;
}

Ok(())
}
}

pub fn get_queue_reporter_router(connection_pool: ConnectionPool<Prover>) -> Router {
let autoscaler_queue_reporter = AutoscalerQueueReporter::new(connection_pool);

Router::new().route(
"/queue_report",
get(move || async move { autoscaler_queue_reporter.get_report().await }),
)
}

pub enum ProcessorError {
Dal(DalError),
Custom(String),
}

impl From<DalError> for ProcessorError {
fn from(err: DalError) -> Self {
ProcessorError::Dal(err)
}
}

impl From<anyhow::Error> for ProcessorError {
fn from(err: anyhow::Error) -> Self {
ProcessorError::Custom(err.to_string())
}
}

impl IntoResponse for ProcessorError {
fn into_response(self) -> Response {
let (status_code, message) = match self {
ProcessorError::Dal(err) => {
tracing::error!("Sqlx error: {:?}", err);
(
StatusCode::INTERNAL_SERVER_ERROR,
"Failed getting data from database",
)
}
ProcessorError::Custom(err) => {
tracing::error!("Custom error invoked: {:?}", &err);
(StatusCode::INTERNAL_SERVER_ERROR, "Internal error")
}
};
(status_code, message).into_response()
}
}
1 change: 1 addition & 0 deletions prover/crates/bin/prover_job_monitor/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod archiver;
pub mod autoscaler_queue_reporter;
pub mod job_requeuer;
pub(crate) mod metrics;
pub mod queue_reporter;
Expand Down
Loading

0 comments on commit 2cec83f

Please sign in to comment.