feat(Prover CLI): status command lwg #1830

Merged · 12 commits · Apr 30, 2024
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions core/lib/basic_types/Cargo.toml
@@ -20,3 +20,4 @@ serde_json.workspace = true
chrono.workspace = true
strum = { workspace = true, features = ["derive"] }
num_enum.workspace = true
sqlx = { workspace = true, features = ["derive"] }
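
The new sqlx dependency enables only its `derive` feature, presumably so the prover-DAL row types in this crate can derive sqlx traits. A minimal sketch of what that feature provides (the struct below is illustrative, not part of this diff):

```rust
// Minimal sketch, assuming sqlx with the `derive` feature enabled:
// a plain struct can derive `FromRow` so query results map onto it by column name.
use sqlx::FromRow;

#[derive(Debug, FromRow)]
struct JobRow {
    id: i64,        // illustrative columns, not the real schema
    status: String,
    attempts: i32,
}
```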
56 changes: 53 additions & 3 deletions core/lib/basic_types/src/prover_dal.rs
@@ -124,7 +124,7 @@ pub struct ProverJobStatusInProgress {
pub started_at: DateTime<Utc>,
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct WitnessJobStatusSuccessful {
pub started_at: DateTime<Utc>,
pub time_taken: Duration,
@@ -139,7 +139,7 @@ impl Default for WitnessJobStatusSuccessful {
}
}

#[derive(Debug, Default)]
#[derive(Debug, Default, Clone)]
pub struct WitnessJobStatusFailed {
pub started_at: DateTime<Utc>,
pub error: String,
@@ -161,7 +161,7 @@ pub enum ProverJobStatus {
Ignored,
}

#[derive(Debug, strum::Display, strum::EnumString, strum::AsRefStr)]
#[derive(Debug, Clone, strum::Display, strum::EnumString, strum::AsRefStr)]
pub enum WitnessJobStatus {
#[strum(serialize = "failed")]
Failed(WitnessJobStatusFailed),
@@ -273,6 +273,56 @@ pub struct BasicWitnessGeneratorJobInfo {
pub eip_4844_blobs: Option<Eip4844Blobs>,
}

pub struct LeafWitnessGeneratorJobInfo {
pub id: u32,
pub l1_batch_number: L1BatchNumber,
pub circuit_id: u32,
pub closed_form_inputs_blob_url: Option<String>,
pub attempts: u32,
pub status: WitnessJobStatus,
pub error: Option<String>,
pub created_at: NaiveDateTime,
pub updated_at: NaiveDateTime,
pub processing_started_at: Option<NaiveDateTime>,
pub time_taken: Option<NaiveTime>,
pub is_blob_cleaned: Option<bool>,
pub number_of_basic_circuits: Option<i32>,
pub protocol_version: Option<i32>,
pub picked_by: Option<String>,
}

pub struct NodeWitnessGeneratorJobInfo {
pub id: u32,
pub l1_batch_number: L1BatchNumber,
pub circuit_id: u32,
pub depth: u32,
pub status: WitnessJobStatus,
pub attempts: u32,
pub aggregations_url: Option<String>,
pub processing_started_at: Option<NaiveDateTime>,
pub time_taken: Option<NaiveTime>,
pub error: Option<String>,
pub created_at: NaiveDateTime,
pub updated_at: NaiveDateTime,
pub number_of_dependent_jobs: Option<i32>,
pub protocol_version: Option<i32>,
pub picked_by: Option<String>,
}

pub struct SchedulerWitnessGeneratorJobInfo {
pub l1_batch_number: L1BatchNumber,
pub scheduler_partial_input_blob_url: String,
pub status: WitnessJobStatus,
pub processing_started_at: Option<NaiveDateTime>,
pub time_taken: Option<NaiveTime>,
pub error: Option<String>,
pub created_at: NaiveDateTime,
pub updated_at: NaiveDateTime,
pub attempts: u32,
pub protocol_version: Option<i32>,
pub picked_by: Option<String>,
}

#[derive(Debug, EnumString, Display)]
pub enum ProofCompressionJobStatus {
#[strum(serialize = "queued")]
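
The three `*WitnessGeneratorJobInfo` structs added above each mirror one row per witness-generator job, with the shared `WitnessJobStatus` enum as the status field. A hedged sketch of how such a row might be rendered by the status CLI, assuming only the fields shown in this diff and that these types are reachable through `zksync_types::prover_dal` the way `WitnessJobStatus` is in the CLI code below (the helper itself is illustrative, not code from this PR):

```rust
// Illustrative only: pretty-print one leaf witness-generator job row.
// Relies on `WitnessJobStatus`'s strum-derived `Display` and on the `error`
// field of `WitnessJobStatusFailed` shown earlier in this file.
use zksync_types::prover_dal::{LeafWitnessGeneratorJobInfo, WitnessJobStatus};

fn describe_leaf_job(job: &LeafWitnessGeneratorJobInfo) -> String {
    match &job.status {
        WitnessJobStatus::Failed(failed) => format!(
            "leaf job {} (circuit {}): failed after {} attempt(s): {}",
            job.id, job.circuit_id, job.attempts, failed.error
        ),
        status => format!("leaf job {} (circuit {}): {}", job.id, job.circuit_id, status),
    }
}
```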
1 change: 1 addition & 0 deletions prover/Cargo.lock

Some generated files are not rendered by default.

74 changes: 73 additions & 1 deletion prover/prover_cli/src/commands/status/batch.rs
@@ -1,7 +1,9 @@
use anyhow::{ensure, Context as _};
use clap::Args as ClapArgs;
use prover_dal::{Connection, ConnectionPool, Prover, ProverDal};
use zksync_types::{basic_fri_types::AggregationRound, L1BatchNumber};
use zksync_types::{
basic_fri_types::AggregationRound, prover_dal::WitnessJobStatus, L1BatchNumber,
};

use super::utils::{AggregationRoundInfo, BatchData, Task, TaskStatus};
use crate::commands::status::utils::postgres_config;
@@ -53,6 +55,34 @@ async fn get_batches_data(batches: Vec<L1BatchNumber>) -> anyhow::Result<Vec<Bat
)
.await,
},
leaf_witness_generator: Task::LeafWitnessGenerator {
status: get_proof_leaf_witness_generator_status_for_batch(batch, &mut conn).await,
aggregation_round_info: get_prover_jobs_data_for_batch(
batch,
AggregationRound::LeafAggregation,
&mut conn,
)
.await,
},
node_witness_generator: Task::NodeWitnessGenerator {
status: get_proof_node_witness_generator_status_for_batch(batch, &mut conn).await,
aggregation_round_info: get_prover_jobs_data_for_batch(
batch,
AggregationRound::NodeAggregation,
&mut conn,
)
.await,
},
scheduler_witness_generator: Task::SchedulerWitnessGenerator {
status: get_proof_scheduler_witness_generator_status_for_batch(batch, &mut conn)
.await,
aggregation_round_info: get_prover_jobs_data_for_batch(
batch,
AggregationRound::Scheduler,
&mut conn,
)
.await,
},
compressor: Task::Compressor(
get_proof_compression_job_status_for_batch(batch, &mut conn).await,
),
@@ -92,6 +122,48 @@ async fn get_proof_basic_witness_generator_status_for_batch<'a>(
.unwrap_or_default()
}

async fn get_proof_leaf_witness_generator_status_for_batch<'a>(
batch_number: L1BatchNumber,
conn: &mut Connection<'a, Prover>,
) -> TaskStatus {
let status_vec: Vec<WitnessJobStatus> = conn
.fri_witness_generator_dal()
.get_leaf_witness_generator_jobs_for_batch(batch_number)
.await
.iter()
.map(|s| s.status.clone())
.collect();
TaskStatus::from(status_vec)
}

async fn get_proof_node_witness_generator_status_for_batch<'a>(
batch_number: L1BatchNumber,
conn: &mut Connection<'a, Prover>,
) -> TaskStatus {
let status_vec: Vec<WitnessJobStatus> = conn
.fri_witness_generator_dal()
.get_node_witness_generator_jobs_for_batch(batch_number)
.await
.iter()
.map(|s| s.status.clone())
.collect();
TaskStatus::from(status_vec)
}

async fn get_proof_scheduler_witness_generator_status_for_batch<'a>(
batch_number: L1BatchNumber,
conn: &mut Connection<'a, Prover>,
) -> TaskStatus {
let status_vec: Vec<WitnessJobStatus> = conn
.fri_witness_generator_dal()
.get_scheduler_witness_generator_jobs_for_batch(batch_number)
.await
.iter()
.map(|s| s.status.clone())
.collect();
TaskStatus::from(status_vec)
}

async fn get_proof_compression_job_status_for_batch<'a>(
batch_number: L1BatchNumber,
conn: &mut Connection<'a, Prover>,
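
The three new helpers are deliberately identical in shape: fetch every witness-generator job for the batch in one aggregation round, clone the statuses out of the borrowed rows (which is why `Clone` was added to `WitnessJobStatus` and its payloads), and fold them through the `From<Vec<WitnessJobStatus>>` conversion added in `utils.rs`. A hypothetical generic reduction, not part of this PR, that makes the shared shape explicit:

```rust
// Hypothetical helper (not in this PR): fold any borrowed witness job statuses
// into a single TaskStatus via the From<Vec<WitnessJobStatus>> impl in utils.rs.
use zksync_types::prover_dal::WitnessJobStatus;

use crate::commands::status::utils::TaskStatus;

fn fold_statuses<'a, I>(statuses: I) -> TaskStatus
where
    I: IntoIterator<Item = &'a WitnessJobStatus>,
{
    TaskStatus::from(statuses.into_iter().cloned().collect::<Vec<_>>())
}
```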
39 changes: 32 additions & 7 deletions prover/prover_cli/src/commands/status/utils.rs
Original file line number Diff line number Diff line change
@@ -27,7 +27,7 @@ pub struct BatchData {
/// The recursion tip data.
pub recursion_tip: Task,
/// The scheduler data.
pub scheduler: Task,
pub scheduler_witness_generator: Task,
/// The compressor data.
pub compressor: Task,
}
@@ -45,7 +45,7 @@ impl Debug for BatchData {
writeln!(f, "{:?}", self.leaf_witness_generator)?;
writeln!(f, "{:?}", self.node_witness_generator)?;
writeln!(f, "{:?}", self.recursion_tip)?;
writeln!(f, "{:?}", self.scheduler)?;
writeln!(f, "{:?}", self.scheduler_witness_generator)?;
writeln!(f, "{:?}", self.compressor)
}
}
@@ -82,7 +82,7 @@ impl Default for BatchData {
prover_jobs_status: TaskStatus::default(),
},
},
scheduler: Task::Scheduler {
scheduler_witness_generator: Task::SchedulerWitnessGenerator {
status: TaskStatus::default(),
aggregation_round_info: AggregationRoundInfo {
round: AggregationRound::Scheduler,
@@ -157,6 +157,31 @@ impl From<ProofCompressionJobStatus> for TaskStatus {
}
}

impl From<Vec<WitnessJobStatus>> for TaskStatus {
fn from(status_vector: Vec<WitnessJobStatus>) -> Self {
if status_vector.is_empty() {
TaskStatus::JobsNotFound
} else if status_vector
.iter()
.all(|job| matches!(job, WitnessJobStatus::Queued))
{
TaskStatus::Queued
} else if status_vector
.iter()
.all(|job| matches!(job, WitnessJobStatus::WaitingForProofs))
{
TaskStatus::WaitingForProofs
} else if status_vector
.iter()
.all(|job| matches!(job, WitnessJobStatus::Successful(_)))
{
TaskStatus::Successful
} else {
TaskStatus::InProgress
}
}
}

#[derive(EnumString, Clone, Display)]
pub enum Task {
/// Represents the basic witness generator task and its status.
@@ -185,7 +210,7 @@ pub enum Task {
},
/// Represents the scheduler task and its status.
#[strum(to_string = "Scheduler")]
Scheduler {
SchedulerWitnessGenerator {
status: TaskStatus,
aggregation_round_info: AggregationRoundInfo,
},
@@ -201,7 +226,7 @@ impl Task {
| Task::LeafWitnessGenerator { status, .. }
| Task::NodeWitnessGenerator { status, .. }
| Task::RecursionTip { status, .. }
| Task::Scheduler { status, .. }
| Task::SchedulerWitnessGenerator { status, .. }
| Task::Compressor(status) => status.clone(),
}
}
@@ -224,7 +249,7 @@ impl Task {
aggregation_round_info,
..
}
| Task::Scheduler {
| Task::SchedulerWitnessGenerator {
aggregation_round_info,
..
} => Some(aggregation_round_info.round),
@@ -254,7 +279,7 @@ impl Task {
status,
aggregation_round_info,
}
| Task::Scheduler {
| Task::SchedulerWitnessGenerator {
status,
aggregation_round_info,
} => match status {
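
A compact check of the folding rules introduced in `utils.rs`, written as an illustrative test rather than one shipped with this PR; it assumes it is appended to `prover/prover_cli/src/commands/status/utils.rs`, where `TaskStatus` lives:

```rust
// Illustrative only: exercises the From<Vec<WitnessJobStatus>> rules above.
#[cfg(test)]
mod witness_status_folding {
    use zksync_types::prover_dal::{WitnessJobStatus, WitnessJobStatusSuccessful};

    use super::TaskStatus;

    #[test]
    fn folds_witness_job_statuses() {
        // No rows for the round at all.
        assert!(matches!(
            TaskStatus::from(Vec::<WitnessJobStatus>::new()),
            TaskStatus::JobsNotFound
        ));
        // Every job queued -> the whole round reports as queued.
        assert!(matches!(
            TaskStatus::from(vec![WitnessJobStatus::Queued, WitnessJobStatus::Queued]),
            TaskStatus::Queued
        ));
        // Mixed states -> the round is still in progress.
        assert!(matches!(
            TaskStatus::from(vec![
                WitnessJobStatus::Queued,
                WitnessJobStatus::Successful(WitnessJobStatusSuccessful::default()),
            ]),
            TaskStatus::InProgress
        ));
    }
}
```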