diff --git a/core/lib/db_connection/src/connection_pool.rs b/core/lib/db_connection/src/connection_pool.rs index 7cf29632b7df..d262e374aef3 100644 --- a/core/lib/db_connection/src/connection_pool.rs +++ b/core/lib/db_connection/src/connection_pool.rs @@ -158,6 +158,14 @@ impl TestTemplate { Ok(Self(db_url.parse()?)) } + pub fn prover_empty() -> anyhow::Result<Self> { + let db_url = env::var("TEST_DATABASE_PROVER_URL").context( + "TEST_DATABASE_PROVER_URL must be set. Normally, this is done by the 'zk' tool. \ + Make sure that you are running the tests with 'zk test rust' command or equivalent.", + )?; + Ok(Self(db_url.parse()?)) + } + /// Closes the connection pool, disallows connecting to the underlying db, /// so that the db can be used as a template. pub async fn freeze(pool: ConnectionPool) -> anyhow::Result { @@ -291,6 +299,11 @@ impl ConnectionPool { Self::constrained_test_pool(DEFAULT_CONNECTIONS).await } + pub async fn prover_test_pool() -> ConnectionPool<DB> { + const DEFAULT_CONNECTIONS: u32 = 100; // Expected to be enough for any unit test. + Self::constrained_prover_test_pool(DEFAULT_CONNECTIONS).await + } + /// Same as [`Self::test_pool()`], but with a configurable number of connections. This is useful to test /// behavior of components that rely on singleton / constrained pools in production. 
pub async fn constrained_test_pool(connections: u32) -> ConnectionPool { @@ -309,6 +322,22 @@ impl ConnectionPool { pool } + pub async fn constrained_prover_test_pool(connections: u32) -> ConnectionPool<DB> { + assert!(connections > 0, "Number of connections must be positive"); + let mut builder = TestTemplate::prover_empty() + .expect("failed creating test template") + .create_db(connections) + .await + .expect("failed creating database for tests"); + let mut pool = builder + .set_acquire_timeout(Some(Self::TEST_ACQUIRE_TIMEOUT)) + .build() + .await + .expect("cannot build connection pool"); + pool.traced_connections = Some(Arc::default()); + pool + } + /// Initializes a builder for connection pools. pub fn builder(database_url: SensitiveUrl, max_pool_size: u32) -> ConnectionPoolBuilder { ConnectionPoolBuilder { diff --git a/infrastructure/zk/src/test/test.ts b/infrastructure/zk/src/test/test.ts index 2e3202051917..9059283af447 100644 --- a/infrastructure/zk/src/test/test.ts +++ b/infrastructure/zk/src/test/test.ts @@ -7,9 +7,25 @@ import * as db from '../database'; export { integration }; -export async function prover() { +export async function prover(options: string[]) { + await db.resetTest({ core: false, prover: true }); process.chdir(process.env.ZKSYNC_HOME! + '/prover'); - await utils.spawn('cargo test --release --workspace --locked'); + + let result = await utils.exec('cargo install --list'); + let test_runner = 'cargo nextest run'; + + if (!result.stdout.includes('cargo-nextest')) { + console.warn( + chalk.bold.red( + `cargo-nextest is missing, please run "cargo install cargo-nextest". 
Falling back to "cargo test".` + ) + ); + test_runner = 'cargo test'; + } + + let cmd = `${test_runner} --release --locked --${options.join(' ')}`; + console.log(`running prover unit tests with '${cmd}'`); + await utils.spawn(cmd); } export async function rust(options: string[]) { @@ -38,7 +54,13 @@ export async function l1Contracts() { export const command = new Command('test').description('run test suites').addCommand(integration.command); -command.command('prover').description('run unit-tests for the prover').action(prover); +command + .command('prover [command...]') + .allowUnknownOption() + .description('run unit-tests for the prover') + .action(async (args: string[]) => { + await prover(args); + }); command.command('l1-contracts').description('run unit-tests for the layer 1 smart contracts').action(l1Contracts); command .command('rust [command...]') diff --git a/prover/Cargo.lock b/prover/Cargo.lock index 5ac79d1dd0f9..e48dc075b2f5 100644 --- a/prover/Cargo.lock +++ b/prover/Cargo.lock @@ -193,6 +193,21 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" +[[package]] +name = "assert_cmd" +version = "2.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed72493ac66d5804837f480ab3766c72bdfab91a65e565fc54fa9e42db0073a8" +dependencies = [ + "anstyle", + "bstr", + "doc-comment", + "predicates", + "predicates-core", + "predicates-tree", + "wait-timeout", +] + [[package]] name = "async-stream" version = "0.3.5" @@ -754,6 +769,17 @@ dependencies = [ "syn_derive", ] +[[package]] +name = "bstr" +version = "1.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05efc5cfd9110c8416e471df0e96702d58690178e206e61b7173706673c93706" +dependencies = [ + "memchr", + "regex-automata 0.4.6", + "serde", +] + [[package]] name = "bumpalo" version = "3.16.0" @@ -1598,6 +1624,12 @@ version = "0.1.13" source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "56254986775e3233ffa9c4d7d3faaf6d36a2c09d30b20687e9f88bc8bafc16c8" +[[package]] +name = "difflib" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6184e33543162437515c2e2b48714794e37845ec9851711914eec9d308f6ebe8" + [[package]] name = "digest" version = "0.9.0" @@ -1619,6 +1651,12 @@ dependencies = [ "subtle", ] +[[package]] +name = "doc-comment" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" + [[package]] name = "dotenvy" version = "0.15.7" @@ -4136,6 +4174,33 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +[[package]] +name = "predicates" +version = "3.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e9086cc7640c29a356d1a29fd134380bee9d8f79a17410aa76e7ad295f42c97" +dependencies = [ + "anstyle", + "difflib", + "predicates-core", +] + +[[package]] +name = "predicates-core" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae8177bee8e75d6846599c6b9ff679ed51e882816914eec639944d7c9aa11931" + +[[package]] +name = "predicates-tree" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41b740d195ed3166cd147c8047ec98db0e22ec019eb8eeb76d343b795304fb13" +dependencies = [ + "predicates-core", + "termtree", +] + [[package]] name = "pretty_assertions" version = "1.4.0" @@ -4421,6 +4486,7 @@ name = "prover_cli" version = "0.1.0" dependencies = [ "anyhow", + "assert_cmd", "bincode", "chrono", "circuit_definitions", @@ -4429,7 +4495,6 @@ dependencies = [ "dialoguer", "hex", "serde_json", - "sqlx", "strum", "tokio", "tracing", @@ -6085,6 +6150,12 @@ dependencies = [ "winapi-util", ] +[[package]] +name = 
"termtree" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3369f5ac52d5eb6ab48c6b4ffdc8efbcad6b89c765749064ba298f2c68a16a76" + [[package]] name = "test-log" version = "0.2.16" diff --git a/prover/crates/bin/prover_cli/Cargo.toml b/prover/crates/bin/prover_cli/Cargo.toml index f91cd47e0945..e4ccb280574d 100644 --- a/prover/crates/bin/prover_cli/Cargo.toml +++ b/prover/crates/bin/prover_cli/Cargo.toml @@ -32,12 +32,14 @@ zksync_dal.workspace = true zksync_utils.workspace = true strum.workspace = true colored.workspace = true -sqlx.workspace = true circuit_definitions.workspace = true serde_json.workspace = true zkevm_test_harness = { workspace = true, optional = true, features = ["verbose_circuits"] } chrono.workspace = true +[dev-dependencies] +assert_cmd = "2" + [features] # enable verbose circuits, if you want to use debug_circuit command (as it is quite heavy dependency). verbose_circuits = ["zkevm_test_harness"] diff --git a/prover/crates/bin/prover_cli/src/cli.rs b/prover/crates/bin/prover_cli/src/cli.rs index 7174830f44d1..0c7022cae297 100644 --- a/prover/crates/bin/prover_cli/src/cli.rs +++ b/prover/crates/bin/prover_cli/src/cli.rs @@ -1,19 +1,37 @@ use clap::{command, Args, Parser, Subcommand}; use zksync_types::url::SensitiveUrl; -use crate::commands::{self, config, debug_proof, delete, get_file_info, requeue, restart, stats}; +use crate::commands::{ + config, debug_proof, delete, get_file_info, requeue, restart, stats, status::StatusCommand, +}; pub const VERSION_STRING: &str = env!("CARGO_PKG_VERSION"); #[derive(Parser)] #[command(name = "prover-cli", version = VERSION_STRING, about, long_about = None)] -struct ProverCLI { +pub struct ProverCLI { #[command(subcommand)] command: ProverCommand, #[clap(flatten)] config: ProverCLIConfig, } +impl ProverCLI { + pub async fn start(self) -> anyhow::Result<()> { + match self.command { + ProverCommand::FileInfo(args) => get_file_info::run(args).await?, + 
ProverCommand::Config(cfg) => config::run(cfg).await?, + ProverCommand::Delete(args) => delete::run(args, self.config).await?, + ProverCommand::Status(cmd) => cmd.run(self.config).await?, + ProverCommand::Requeue(args) => requeue::run(args, self.config).await?, + ProverCommand::Restart(args) => restart::run(args).await?, + ProverCommand::DebugProof(args) => debug_proof::run(args).await?, + ProverCommand::Stats(args) => stats::run(args, self.config).await?, + }; + Ok(()) + } +} + // Note: this is set via the `config` command. Values are taken from the file pointed // by the env var `PLI__CONFIG` or from `$ZKSYNC_HOME/etc/pliconfig` if unset. #[derive(Args)] @@ -26,31 +44,15 @@ pub struct ProverCLIConfig { } #[derive(Subcommand)] -enum ProverCommand { +pub enum ProverCommand { DebugProof(debug_proof::Args), FileInfo(get_file_info::Args), Config(ProverCLIConfig), Delete(delete::Args), #[command(subcommand)] - Status(commands::StatusCommand), + Status(StatusCommand), Requeue(requeue::Args), Restart(restart::Args), #[command(about = "Displays L1 Batch proving stats for a given period")] Stats(stats::Options), } - -pub async fn start() -> anyhow::Result<()> { - let ProverCLI { command, config } = ProverCLI::parse(); - match command { - ProverCommand::FileInfo(args) => get_file_info::run(args).await?, - ProverCommand::Config(cfg) => config::run(cfg).await?, - ProverCommand::Delete(args) => delete::run(args, config).await?, - ProverCommand::Status(cmd) => cmd.run(config).await?, - ProverCommand::Requeue(args) => requeue::run(args, config).await?, - ProverCommand::Restart(args) => restart::run(args).await?, - ProverCommand::DebugProof(args) => debug_proof::run(args).await?, - ProverCommand::Stats(args) => stats::run(args, config).await?, - }; - - Ok(()) -} diff --git a/prover/crates/bin/prover_cli/src/commands/debug_proof.rs b/prover/crates/bin/prover_cli/src/commands/debug_proof.rs index 7875554ae920..26856ed6ca8d 100644 --- 
a/prover/crates/bin/prover_cli/src/commands/debug_proof.rs +++ b/prover/crates/bin/prover_cli/src/commands/debug_proof.rs @@ -1,13 +1,13 @@ use clap::Args as ClapArgs; #[derive(ClapArgs)] -pub(crate) struct Args { +pub struct Args { /// File with the basic proof. #[clap(short, long)] file: String, } -pub(crate) async fn run(_args: Args) -> anyhow::Result<()> { +pub async fn run(_args: Args) -> anyhow::Result<()> { #[cfg(not(feature = "verbose_circuits"))] anyhow::bail!("Please compile with verbose_circuits feature"); #[cfg(feature = "verbose_circuits")] diff --git a/prover/crates/bin/prover_cli/src/commands/delete.rs b/prover/crates/bin/prover_cli/src/commands/delete.rs index 436bb10e10cb..da45a909af3b 100644 --- a/prover/crates/bin/prover_cli/src/commands/delete.rs +++ b/prover/crates/bin/prover_cli/src/commands/delete.rs @@ -7,7 +7,7 @@ use zksync_types::L1BatchNumber; use crate::cli::ProverCLIConfig; #[derive(ClapArgs)] -pub(crate) struct Args { +pub struct Args { /// Delete data from all batches #[clap( short, @@ -22,7 +22,7 @@ batch: L1BatchNumber, } -pub(crate) async fn run(args: Args, config: ProverCLIConfig) -> anyhow::Result<()> { +pub async fn run(args: Args, config: ProverCLIConfig) -> anyhow::Result<()> { let confirmation = Input::<String>::with_theme(&ColorfulTheme::default()) .with_prompt("Are you sure you want to delete the data?") .default("no".to_owned()) diff --git a/prover/crates/bin/prover_cli/src/commands/get_file_info.rs b/prover/crates/bin/prover_cli/src/commands/get_file_info.rs index cb4a45ca3908..271cf38c37a8 100644 --- a/prover/crates/bin/prover_cli/src/commands/get_file_info.rs +++ b/prover/crates/bin/prover_cli/src/commands/get_file_info.rs @@ -18,7 +18,7 @@ use zksync_prover_fri_types::{ use zksync_prover_interface::outputs::L1BatchProofForL1; #[derive(ClapArgs)] -pub(crate) struct Args { +pub struct Args { #[clap(short, long)] file_path: String, } diff --git a/prover/crates/bin/prover_cli/src/commands/mod.rs 
b/prover/crates/bin/prover_cli/src/commands/mod.rs index 4bc8b2eb392a..d9dde52284b4 100644 --- a/prover/crates/bin/prover_cli/src/commands/mod.rs +++ b/prover/crates/bin/prover_cli/src/commands/mod.rs @@ -1,4 +1,3 @@ -pub(crate) use status::StatusCommand; pub(crate) mod config; pub(crate) mod debug_proof; pub(crate) mod delete; @@ -6,4 +5,4 @@ pub(crate) mod get_file_info; pub(crate) mod requeue; pub(crate) mod restart; pub(crate) mod stats; -pub(crate) mod status; +pub mod status; diff --git a/prover/crates/bin/prover_cli/src/commands/restart.rs b/prover/crates/bin/prover_cli/src/commands/restart.rs index 75beafd7100c..24bd76e63357 100644 --- a/prover/crates/bin/prover_cli/src/commands/restart.rs +++ b/prover/crates/bin/prover_cli/src/commands/restart.rs @@ -8,7 +8,7 @@ use zksync_prover_dal::{ use zksync_types::{basic_fri_types::AggregationRound, L1BatchNumber}; #[derive(ClapArgs)] -pub(crate) struct Args { +pub struct Args { /// Batch number to restart #[clap( short, @@ -22,7 +22,7 @@ prover_job: Option, } -pub(crate) async fn run(args: Args) -> anyhow::Result<()> { +pub async fn run(args: Args) -> anyhow::Result<()> { let config = DatabaseSecrets::from_env()?; let prover_connection_pool = ConnectionPool::<Prover>::singleton(config.prover_url()?) 
.build() diff --git a/prover/crates/bin/prover_cli/src/commands/stats.rs b/prover/crates/bin/prover_cli/src/commands/stats.rs index 307775fa27d3..538238f22110 100644 --- a/prover/crates/bin/prover_cli/src/commands/stats.rs +++ b/prover/crates/bin/prover_cli/src/commands/stats.rs @@ -14,7 +14,7 @@ enum StatsPeriod { } #[derive(Args)] -pub(crate) struct Options { +pub struct Options { #[clap( short = 'p', long = "period", @@ -24,7 +24,7 @@ period: StatsPeriod, } -pub(crate) async fn run(opts: Options, config: ProverCLIConfig) -> anyhow::Result<()> { +pub async fn run(opts: Options, config: ProverCLIConfig) -> anyhow::Result<()> { let prover_connection_pool = ConnectionPool::<Prover>::singleton(config.db_url) .build() .await diff --git a/prover/crates/bin/prover_cli/src/commands/status/mod.rs b/prover/crates/bin/prover_cli/src/commands/status/mod.rs index b6df8680151b..574d7f7be23c 100644 --- a/prover/crates/bin/prover_cli/src/commands/status/mod.rs +++ b/prover/crates/bin/prover_cli/src/commands/status/mod.rs @@ -4,7 +4,7 @@ use crate::cli::ProverCLIConfig; pub(crate) mod batch; pub(crate) mod l1; -mod utils; +pub mod utils; #[derive(Subcommand)] pub enum StatusCommand { diff --git a/prover/crates/bin/prover_cli/src/main.rs b/prover/crates/bin/prover_cli/src/main.rs index b393fad6a31b..c334b2b2e1fb 100644 --- a/prover/crates/bin/prover_cli/src/main.rs +++ b/prover/crates/bin/prover_cli/src/main.rs @@ -1,4 +1,5 @@ -use prover_cli::{cli, config}; +use clap::Parser; +use prover_cli::{cli::ProverCLI, config}; #[tokio::main] async fn main() { @@ -14,7 +15,9 @@ async fn main() { }) .unwrap(); - match cli::start().await { + let prover = ProverCLI::parse(); + + match prover.start().await { Ok(_) => {} Err(err) => { tracing::error!("{err:?}"); diff --git a/prover/crates/bin/prover_cli/tests/batch.rs b/prover/crates/bin/prover_cli/tests/batch.rs new file mode 100644 index 000000000000..9e9060fe8837 --- /dev/null +++ 
b/prover/crates/bin/prover_cli/tests/batch.rs @@ -0,0 +1,1340 @@ +use assert_cmd::Command; +use circuit_definitions::zkevm_circuits::scheduler::aux::BaseLayerCircuitType; +use prover_cli::commands::status::utils::Status; +use zksync_prover_dal::{ + fri_witness_generator_dal::FriWitnessJobStatus, Connection, ConnectionPool, Prover, ProverDal, +}; +use zksync_types::{ + basic_fri_types::AggregationRound, + protocol_version::{L1VerifierConfig, ProtocolSemanticVersion}, + prover_dal::{ + ProofCompressionJobStatus, ProverJobStatus, ProverJobStatusInProgress, + ProverJobStatusSuccessful, WitnessJobStatus, WitnessJobStatusSuccessful, + }, + L1BatchNumber, +}; + +const NON_EXISTING_BATCH_STATUS_STDOUT: &str = "== Batch 10000 Status == +> No batch found. 🚫 +"; + +const MULTIPLE_NON_EXISTING_BATCHES_STATUS_STDOUT: &str = "== Batch 10000 Status == +> No batch found. 🚫 +== Batch 10001 Status == +> No batch found. 🚫 +"; + +const COMPLETE_BATCH_STATUS_STDOUT: &str = "== Batch 0 Status == +> Proof sent to server ✅ +"; + +#[test] +#[doc = "prover_cli status"] +fn pli_status_empty_fails() { + Command::cargo_bin("prover_cli") + .unwrap() + .arg("status") + .assert() + .failure(); +} + +#[test] +#[doc = "prover_cli status --help"] +fn pli_status_help_succeeds() { + Command::cargo_bin("prover_cli") + .unwrap() + .arg("status") + .arg("help") + .assert() + .success(); +} + +#[test] +#[doc = "prover_cli status batch"] +fn pli_status_batch_empty_fails() { + Command::cargo_bin("prover_cli") + .unwrap() + .arg("status") + .arg("batch") + .assert() + .failure(); +} + +#[test] +#[doc = "prover_cli status batch --help"] +fn pli_status_batch_help_succeeds() { + Command::cargo_bin("prover_cli") + .unwrap() + .arg("status") + .arg("batch") + .arg("--help") + .assert() + .success(); +} + +#[tokio::test] +#[doc = "prover_cli status batch -n 10000"] +async fn pli_status_of_non_existing_batch_succeeds() { + let connection_pool = ConnectionPool::<Prover>::prover_test_pool().await; + let mut connection = 
connection_pool.connection().await.unwrap(); + + connection + .fri_protocol_versions_dal() + .save_prover_protocol_version( + ProtocolSemanticVersion::default(), + L1VerifierConfig::default(), + ) + .await; + + Command::cargo_bin("prover_cli") + .unwrap() + .arg(connection_pool.database_url().expose_str()) + .arg("status") + .arg("batch") + .args(["-n", "10000"]) + .assert() + .success() + .stdout(NON_EXISTING_BATCH_STATUS_STDOUT); +} + +#[tokio::test] +#[doc = "prover_cli status batch -n 10000 10001"] +async fn pli_status_of_multiple_non_existing_batch_succeeds() { + let connection_pool = ConnectionPool::<Prover>::prover_test_pool().await; + let mut connection = connection_pool.connection().await.unwrap(); + + connection + .fri_protocol_versions_dal() + .save_prover_protocol_version( + ProtocolSemanticVersion::default(), + L1VerifierConfig::default(), + ) + .await; + + Command::cargo_bin("prover_cli") + .unwrap() + .arg(connection_pool.database_url().expose_str()) + .arg("status") + .arg("batch") + .args(["-n", "10000", "10001"]) + .assert() + .success() + .stdout(MULTIPLE_NON_EXISTING_BATCHES_STATUS_STDOUT); +} + +fn status_batch_0_expects(db_url: &str, expected_output: String) { + Command::cargo_bin("prover_cli") + .unwrap() + .arg(db_url) + .arg("status") + .arg("batch") + .args(["-n", "0"]) + .assert() + .success() + .stdout(expected_output); +} + +fn status_verbose_batch_0_expects(db_url: &str, expected_output: String) { + Command::cargo_bin("prover_cli") + .unwrap() + .arg(db_url) + .arg("status") + .arg("batch") + .args(["-n", "0", "--verbose"]) + .assert() + .success() + .stdout(expected_output); +} + +async fn insert_prover_job( + status: ProverJobStatus, + circuit_id: BaseLayerCircuitType, + aggregation_round: AggregationRound, + batch_number: L1BatchNumber, + sequence_number: usize, + connection: &mut Connection<'_, Prover>, +) { + connection + .fri_prover_jobs_dal() + .insert_prover_job( + batch_number, + circuit_id as u8, + 0, + sequence_number, + 
aggregation_round, + "", + false, + ProtocolSemanticVersion::default(), + ) + .await; + connection + .cli_test_dal() + .update_prover_job( + status, + circuit_id as u8, + aggregation_round as i64, + batch_number, + sequence_number, + ) + .await; +} + +async fn insert_bwg_job( + status: FriWitnessJobStatus, + batch_number: L1BatchNumber, + connection: &mut Connection<'_, Prover>, +) { + connection + .fri_witness_generator_dal() + .save_witness_inputs(batch_number, "", ProtocolSemanticVersion::default()) + .await; + connection + .fri_witness_generator_dal() + .mark_witness_job(status, batch_number) + .await; +} + +async fn insert_lwg_job( + status: WitnessJobStatus, + batch_number: L1BatchNumber, + circuit_id: BaseLayerCircuitType, + connection: &mut Connection<'_, Prover>, +) { + connection + .cli_test_dal() + .insert_lwg_job(status, batch_number, circuit_id as u8) + .await; +} + +async fn insert_nwg_job( + status: WitnessJobStatus, + batch_number: L1BatchNumber, + circuit_id: BaseLayerCircuitType, + connection: &mut Connection<'_, Prover>, +) { + connection + .cli_test_dal() + .insert_nwg_job(status, batch_number, circuit_id as u8) + .await; +} + +async fn insert_rt_job( + status: WitnessJobStatus, + batch_number: L1BatchNumber, + connection: &mut Connection<'_, Prover>, +) { + connection + .cli_test_dal() + .insert_rt_job(status, batch_number) + .await; +} + +async fn insert_scheduler_job( + status: WitnessJobStatus, + batch_number: L1BatchNumber, + connection: &mut Connection<'_, Prover>, +) { + connection + .cli_test_dal() + .insert_scheduler_job(status, batch_number) + .await; +} + +async fn insert_compressor_job( + status: ProofCompressionJobStatus, + batch_number: L1BatchNumber, + connection: &mut Connection<'_, Prover>, +) { + connection + .cli_test_dal() + .insert_compressor_job(status, batch_number) + .await; +} + +#[derive(Default)] +struct Scenario { + bwg_status: Option, + agg_0_prover_jobs_status: Option>, + lwg_status: Option>, + 
agg_1_prover_jobs_status: Option>, + nwg_status: Option>, + agg_2_prover_jobs_status: Option>, + rt_status: Option, + scheduler_status: Option, + compressor_status: Option, + batch_number: L1BatchNumber, +} + +impl Scenario { + fn new(batch_number: L1BatchNumber) -> Scenario { + Scenario { + batch_number, + ..Default::default() + } + } + fn add_bwg(mut self, status: FriWitnessJobStatus) -> Self { + self.bwg_status = Some(status); + self + } + + fn add_agg_0_prover_job( + mut self, + job_status: ProverJobStatus, + circuit_type: BaseLayerCircuitType, + sequence_number: usize, + ) -> Self { + if let Some(ref mut vec) = self.agg_0_prover_jobs_status { + vec.push((job_status, circuit_type, sequence_number)); + } else { + self.agg_0_prover_jobs_status = Some(vec![(job_status, circuit_type, sequence_number)]); + } + self + } + + fn add_lwg(mut self, job_status: WitnessJobStatus, circuit_type: BaseLayerCircuitType) -> Self { + if let Some(ref mut vec) = self.lwg_status { + vec.push((job_status, circuit_type)); + } else { + self.lwg_status = Some(vec![(job_status, circuit_type)]); + } + self + } + + fn add_agg_1_prover_job( + mut self, + job_status: ProverJobStatus, + circuit_type: BaseLayerCircuitType, + sequence_number: usize, + ) -> Self { + if let Some(ref mut vec) = self.agg_1_prover_jobs_status { + vec.push((job_status, circuit_type, sequence_number)); + } else { + self.agg_1_prover_jobs_status = Some(vec![(job_status, circuit_type, sequence_number)]); + } + self + } + + fn add_nwg(mut self, job_status: WitnessJobStatus, circuit_type: BaseLayerCircuitType) -> Self { + if let Some(ref mut vec) = self.nwg_status { + vec.push((job_status, circuit_type)); + } else { + self.nwg_status = Some(vec![(job_status, circuit_type)]); + } + self + } + + fn add_agg_2_prover_job( + mut self, + job_status: ProverJobStatus, + circuit_type: BaseLayerCircuitType, + sequence_number: usize, + ) -> Self { + if let Some(ref mut vec) = self.agg_2_prover_jobs_status { + vec.push((job_status, 
circuit_type, sequence_number)); + } else { + self.agg_2_prover_jobs_status = Some(vec![(job_status, circuit_type, sequence_number)]); + } + self + } + + fn add_rt(mut self, status: WitnessJobStatus) -> Self { + self.rt_status = Some(status); + self + } + + fn add_scheduler(mut self, status: WitnessJobStatus) -> Self { + self.scheduler_status = Some(status); + self + } + + fn add_compressor(mut self, status: ProofCompressionJobStatus) -> Self { + self.compressor_status = Some(status); + self + } +} + +#[allow(clippy::too_many_arguments)] +async fn load_scenario(scenario: Scenario, connection: &mut Connection<'_, Prover>) { + if let Some(status) = scenario.bwg_status { + insert_bwg_job(status, scenario.batch_number, connection).await; + } + if let Some(jobs) = scenario.agg_0_prover_jobs_status { + for (status, circuit_id, sequence_number) in jobs.into_iter() { + insert_prover_job( + status, + circuit_id, + AggregationRound::BasicCircuits, + scenario.batch_number, + sequence_number, + connection, + ) + .await; + } + } + if let Some(jobs) = scenario.lwg_status { + for (status, circuit_id) in jobs.into_iter() { + insert_lwg_job(status, scenario.batch_number, circuit_id, connection).await; + } + } + if let Some(jobs) = scenario.agg_1_prover_jobs_status { + for (status, circuit_id, sequence_number) in jobs.into_iter() { + insert_prover_job( + status, + circuit_id, + AggregationRound::LeafAggregation, + scenario.batch_number, + sequence_number, + connection, + ) + .await; + } + } + if let Some(jobs) = scenario.nwg_status { + for (status, circuit_id) in jobs.into_iter() { + insert_nwg_job(status, scenario.batch_number, circuit_id, connection).await; + } + } + if let Some(jobs) = scenario.agg_2_prover_jobs_status { + for (status, circuit_id, sequence_number) in jobs.into_iter() { + insert_prover_job( + status, + circuit_id, + AggregationRound::NodeAggregation, + scenario.batch_number, + sequence_number, + connection, + ) + .await; + } + } + if let Some(status) = 
scenario.rt_status { + insert_rt_job(status, scenario.batch_number, connection).await; + } + if let Some(status) = scenario.scheduler_status { + insert_scheduler_job(status, scenario.batch_number, connection).await; + } + if let Some(status) = scenario.compressor_status { + insert_compressor_job(status, scenario.batch_number, connection).await; + } +} + +#[allow(clippy::too_many_arguments)] +fn scenario_expected_stdout( + bwg_status: Status, + agg_0_prover_jobs_status: Option, + lwg_status: Status, + agg_1_prover_jobs_status: Option, + nwg_status: Status, + agg_2_prover_jobs_status: Option, + rt_status: Status, + scheduler_status: Status, + compressor_status: Status, + batch_number: L1BatchNumber, +) -> String { + let agg_0_prover_jobs_status = match agg_0_prover_jobs_status { + Some(status) => format!("\n> Prover Jobs: {}", status), + None => String::new(), + }; + let agg_1_prover_jobs_status = match agg_1_prover_jobs_status { + Some(status) => format!("\n> Prover Jobs: {}", status), + None => String::new(), + }; + let agg_2_prover_jobs_status = match agg_2_prover_jobs_status { + Some(status) => format!("\n> Prover Jobs: {}", status), + None => String::new(), + }; + + format!( + "== Batch {} Status == + +-- Aggregation Round 0 -- +Basic Witness Generator: {}{} + +-- Aggregation Round 1 -- +Leaf Witness Generator: {}{} + +-- Aggregation Round 2 -- +Node Witness Generator: {}{} + +-- Aggregation Round 3 -- +Recursion Tip: {} + +-- Aggregation Round 4 -- +Scheduler: {} + +-- Proof Compression -- +Compressor: {} +", + batch_number.0, + bwg_status, + agg_0_prover_jobs_status, + lwg_status, + agg_1_prover_jobs_status, + nwg_status, + agg_2_prover_jobs_status, + rt_status, + scheduler_status, + compressor_status + ) +} + +#[tokio::test] +async fn pli_status_complete() { + let connection_pool = ConnectionPool::::prover_test_pool().await; + let mut connection = connection_pool.connection().await.unwrap(); + + connection + .fri_protocol_versions_dal() + 
.save_prover_protocol_version( + ProtocolSemanticVersion::default(), + L1VerifierConfig::default(), + ) + .await; + + let batch_0 = L1BatchNumber(0); + + // A BWG is created for batch 0. + let scenario = Scenario::new(batch_0).add_bwg(FriWitnessJobStatus::Queued); + + load_scenario(scenario, &mut connection).await; + + status_batch_0_expects( + connection_pool.database_url().expose_str(), + scenario_expected_stdout( + Status::Queued, + None, + Status::JobsNotFound, + None, + Status::JobsNotFound, + None, + Status::JobsNotFound, + Status::JobsNotFound, + Status::JobsNotFound, + batch_0, + ), + ); + + // The BWS start, agg_round 0 prover jobs created. All WG set in wating for proofs. + let scenario = Scenario::new(batch_0) + .add_bwg(FriWitnessJobStatus::InProgress) + .add_agg_0_prover_job(ProverJobStatus::Queued, BaseLayerCircuitType::VM, 1) + .add_agg_0_prover_job(ProverJobStatus::Queued, BaseLayerCircuitType::VM, 2) + .add_agg_0_prover_job( + ProverJobStatus::Queued, + BaseLayerCircuitType::DecommitmentsFilter, + 1, + ) + .add_lwg(WitnessJobStatus::WaitingForProofs, BaseLayerCircuitType::VM) + .add_lwg( + WitnessJobStatus::WaitingForProofs, + BaseLayerCircuitType::DecommitmentsFilter, + ) + .add_nwg(WitnessJobStatus::WaitingForProofs, BaseLayerCircuitType::VM) + .add_nwg( + WitnessJobStatus::WaitingForProofs, + BaseLayerCircuitType::DecommitmentsFilter, + ) + .add_rt(WitnessJobStatus::WaitingForProofs) + .add_scheduler(WitnessJobStatus::WaitingForProofs); + load_scenario(scenario, &mut connection).await; + + status_batch_0_expects( + connection_pool.database_url().expose_str(), + scenario_expected_stdout( + Status::InProgress, + Some(Status::Queued), + Status::WaitingForProofs, + None, + Status::WaitingForProofs, + None, + Status::WaitingForProofs, + Status::WaitingForProofs, + Status::JobsNotFound, + batch_0, + ), + ); + + // The BWS done, agg_round 0 prover jobs in progress. 
+ let scenario = Scenario::new(batch_0) + .add_bwg(FriWitnessJobStatus::Successful) + .add_agg_0_prover_job( + ProverJobStatus::Successful(ProverJobStatusSuccessful::default()), + BaseLayerCircuitType::VM, + 1, + ) + .add_agg_0_prover_job( + ProverJobStatus::InProgress(ProverJobStatusInProgress::default()), + BaseLayerCircuitType::VM, + 2, + ) + .add_agg_0_prover_job( + ProverJobStatus::Queued, + BaseLayerCircuitType::DecommitmentsFilter, + 1, + ); + load_scenario(scenario, &mut connection).await; + + status_batch_0_expects( + connection_pool.database_url().expose_str(), + scenario_expected_stdout( + Status::Successful, + Some(Status::InProgress), + Status::WaitingForProofs, + None, + Status::WaitingForProofs, + None, + Status::WaitingForProofs, + Status::WaitingForProofs, + Status::JobsNotFound, + batch_0, + ), + ); + + // Agg_round 0, prover jobs done for VM circuit, LWG set in queue. + let scenario = Scenario::new(batch_0) + .add_agg_0_prover_job( + ProverJobStatus::Successful(ProverJobStatusSuccessful::default()), + BaseLayerCircuitType::VM, + 2, + ) + .add_agg_0_prover_job( + ProverJobStatus::InProgress(ProverJobStatusInProgress::default()), + BaseLayerCircuitType::DecommitmentsFilter, + 1, + ) + .add_lwg(WitnessJobStatus::Queued, BaseLayerCircuitType::VM); + load_scenario(scenario, &mut connection).await; + + status_batch_0_expects( + connection_pool.database_url().expose_str(), + scenario_expected_stdout( + Status::Successful, + Some(Status::InProgress), + Status::Queued, + None, + Status::WaitingForProofs, + None, + Status::WaitingForProofs, + Status::WaitingForProofs, + Status::JobsNotFound, + batch_0, + ), + ); + + // Agg_round 0: all prover jobs successful, LWG in progress. Agg_round 1: prover jobs in queue. 
+ let scenario = Scenario::new(batch_0) + .add_agg_0_prover_job( + ProverJobStatus::Successful(ProverJobStatusSuccessful::default()), + BaseLayerCircuitType::DecommitmentsFilter, + 1, + ) + .add_lwg( + WitnessJobStatus::Successful(WitnessJobStatusSuccessful::default()), + BaseLayerCircuitType::VM, + ) + .add_lwg( + WitnessJobStatus::InProgress, + BaseLayerCircuitType::DecommitmentsFilter, + ) + .add_agg_1_prover_job(ProverJobStatus::Queued, BaseLayerCircuitType::VM, 1) + .add_agg_1_prover_job(ProverJobStatus::Queued, BaseLayerCircuitType::VM, 2); + load_scenario(scenario, &mut connection).await; + + status_batch_0_expects( + connection_pool.database_url().expose_str(), + scenario_expected_stdout( + Status::Successful, + Some(Status::Successful), + Status::InProgress, + Some(Status::Queued), + Status::WaitingForProofs, + None, + Status::WaitingForProofs, + Status::WaitingForProofs, + Status::JobsNotFound, + batch_0, + ), + ); + + // LWG succees. Agg_round 1: Done for VM circuit. + let scenario = Scenario::new(batch_0) + .add_lwg( + WitnessJobStatus::Successful(WitnessJobStatusSuccessful::default()), + BaseLayerCircuitType::DecommitmentsFilter, + ) + .add_agg_1_prover_job( + ProverJobStatus::Successful(ProverJobStatusSuccessful::default()), + BaseLayerCircuitType::VM, + 1, + ) + .add_agg_1_prover_job( + ProverJobStatus::Successful(ProverJobStatusSuccessful::default()), + BaseLayerCircuitType::VM, + 2, + ) + .add_agg_1_prover_job( + ProverJobStatus::InProgress(ProverJobStatusInProgress::default()), + BaseLayerCircuitType::DecommitmentsFilter, + 1, + ); + load_scenario(scenario, &mut connection).await; + + status_batch_0_expects( + connection_pool.database_url().expose_str(), + scenario_expected_stdout( + Status::Successful, + Some(Status::Successful), + Status::Successful, + Some(Status::InProgress), + Status::WaitingForProofs, + None, + Status::WaitingForProofs, + Status::WaitingForProofs, + Status::JobsNotFound, + batch_0, + ), + ); + + // Agg_round 1: all prover 
jobs successful. NWG queued.
+ let scenario = Scenario::new(batch_0) + .add_nwg( + WitnessJobStatus::Successful(WitnessJobStatusSuccessful::default()), + BaseLayerCircuitType::DecommitmentsFilter, + ) + .add_agg_2_prover_job( + ProverJobStatus::InProgress(ProverJobStatusInProgress::default()), + BaseLayerCircuitType::VM, + 1, + ) + .add_agg_2_prover_job( + ProverJobStatus::Queued, + BaseLayerCircuitType::DecommitmentsFilter, + 1, + ); + load_scenario(scenario, &mut connection).await; + + status_batch_0_expects( + connection_pool.database_url().expose_str(), + scenario_expected_stdout( + Status::Successful, + Some(Status::Successful), + Status::Successful, + Some(Status::Successful), + Status::Successful, + Some(Status::InProgress), + Status::WaitingForProofs, + Status::WaitingForProofs, + Status::JobsNotFound, + batch_0, + ), + ); + + // Agg_round 2 prover jobs successful. RT in progress. + let scenario = Scenario::new(batch_0) + .add_agg_2_prover_job( + ProverJobStatus::Successful(ProverJobStatusSuccessful::default()), + BaseLayerCircuitType::VM, + 1, + ) + .add_agg_2_prover_job( + ProverJobStatus::Successful(ProverJobStatusSuccessful::default()), + BaseLayerCircuitType::DecommitmentsFilter, + 1, + ) + .add_rt(WitnessJobStatus::InProgress); + load_scenario(scenario, &mut connection).await; + + status_batch_0_expects( + connection_pool.database_url().expose_str(), + scenario_expected_stdout( + Status::Successful, + Some(Status::Successful), + Status::Successful, + Some(Status::Successful), + Status::Successful, + Some(Status::Successful), + Status::InProgress, + Status::WaitingForProofs, + Status::JobsNotFound, + batch_0, + ), + ); + + // RT in successful, Scheduler in progress. 
+ let scenario = Scenario::new(batch_0) + .add_rt(WitnessJobStatus::Successful( + WitnessJobStatusSuccessful::default(), + )) + .add_scheduler(WitnessJobStatus::InProgress); + load_scenario(scenario, &mut connection).await; + + status_batch_0_expects( + connection_pool.database_url().expose_str(), + scenario_expected_stdout( + Status::Successful, + Some(Status::Successful), + Status::Successful, + Some(Status::Successful), + Status::Successful, + Some(Status::Successful), + Status::Successful, + Status::InProgress, + Status::JobsNotFound, + batch_0, + ), + ); + + // Scheduler in successful, Compressor in progress. + let scenario = Scenario::new(batch_0) + .add_scheduler(WitnessJobStatus::Successful( + WitnessJobStatusSuccessful::default(), + )) + .add_compressor(ProofCompressionJobStatus::InProgress); + load_scenario(scenario, &mut connection).await; + + status_batch_0_expects( + connection_pool.database_url().expose_str(), + scenario_expected_stdout( + Status::Successful, + Some(Status::Successful), + Status::Successful, + Some(Status::Successful), + Status::Successful, + Some(Status::Successful), + Status::Successful, + Status::Successful, + Status::InProgress, + batch_0, + ), + ); + + // Compressor Done. 
+ let scenario = Scenario::new(batch_0).add_compressor(ProofCompressionJobStatus::SentToServer); + load_scenario(scenario, &mut connection).await; + + status_batch_0_expects( + connection_pool.database_url().expose_str(), + COMPLETE_BATCH_STATUS_STDOUT.into(), + ); +} + +#[tokio::test] +async fn pli_status_complete_verbose() { + let connection_pool = ConnectionPool::::prover_test_pool().await; + let mut connection = connection_pool.connection().await.unwrap(); + + connection + .fri_protocol_versions_dal() + .save_prover_protocol_version( + ProtocolSemanticVersion::default(), + L1VerifierConfig::default(), + ) + .await; + + let batch_0 = L1BatchNumber(0); + + let scenario = Scenario::new(batch_0) + .add_bwg(FriWitnessJobStatus::Successful) + .add_agg_0_prover_job( + ProverJobStatus::Successful(ProverJobStatusSuccessful::default()), + BaseLayerCircuitType::VM, + 1, + ) + .add_agg_0_prover_job( + ProverJobStatus::Successful(ProverJobStatusSuccessful::default()), + BaseLayerCircuitType::VM, + 2, + ) + .add_agg_0_prover_job( + ProverJobStatus::Successful(ProverJobStatusSuccessful::default()), + BaseLayerCircuitType::VM, + 3, + ) + .add_agg_0_prover_job( + ProverJobStatus::Successful(ProverJobStatusSuccessful::default()), + BaseLayerCircuitType::DecommitmentsFilter, + 1, + ) + .add_agg_0_prover_job( + ProverJobStatus::Successful(ProverJobStatusSuccessful::default()), + BaseLayerCircuitType::DecommitmentsFilter, + 2, + ) + .add_agg_0_prover_job( + ProverJobStatus::Successful(ProverJobStatusSuccessful::default()), + BaseLayerCircuitType::DecommitmentsFilter, + 3, + ) + .add_agg_0_prover_job( + ProverJobStatus::Successful(ProverJobStatusSuccessful::default()), + BaseLayerCircuitType::Decommiter, + 1, + ) + .add_agg_0_prover_job( + ProverJobStatus::InProgress(ProverJobStatusInProgress::default()), + BaseLayerCircuitType::Decommiter, + 2, + ) + .add_agg_0_prover_job(ProverJobStatus::Queued, BaseLayerCircuitType::Decommiter, 3) + .add_agg_0_prover_job( + 
ProverJobStatus::Queued, + BaseLayerCircuitType::LogDemultiplexer, + 1, + ) + .add_agg_0_prover_job( + ProverJobStatus::Queued, + BaseLayerCircuitType::LogDemultiplexer, + 2, + ) + .add_agg_0_prover_job( + ProverJobStatus::Queued, + BaseLayerCircuitType::LogDemultiplexer, + 3, + ) + .add_lwg(WitnessJobStatus::WaitingForProofs, BaseLayerCircuitType::VM) + .add_lwg( + WitnessJobStatus::WaitingForProofs, + BaseLayerCircuitType::DecommitmentsFilter, + ) + .add_lwg( + WitnessJobStatus::WaitingForProofs, + BaseLayerCircuitType::Decommiter, + ) + .add_lwg( + WitnessJobStatus::WaitingForProofs, + BaseLayerCircuitType::LogDemultiplexer, + ) + .add_nwg(WitnessJobStatus::WaitingForProofs, BaseLayerCircuitType::VM) + .add_nwg( + WitnessJobStatus::WaitingForProofs, + BaseLayerCircuitType::DecommitmentsFilter, + ) + .add_nwg( + WitnessJobStatus::WaitingForProofs, + BaseLayerCircuitType::Decommiter, + ) + .add_nwg( + WitnessJobStatus::WaitingForProofs, + BaseLayerCircuitType::LogDemultiplexer, + ) + .add_rt(WitnessJobStatus::WaitingForProofs) + .add_scheduler(WitnessJobStatus::WaitingForProofs); + load_scenario(scenario, &mut connection).await; + + status_verbose_batch_0_expects( + connection_pool.database_url().expose_str(), + "== Batch 0 Status == + +-- Aggregation Round 0 -- +> Basic Witness Generator: Successful ✅ +v Prover Jobs: In Progress ⌛️ + > VM: Successful ✅ + > DecommitmentsFilter: Successful ✅ + > Decommiter: In Progress ⌛️ + - Total jobs: 3 + - Successful: 1 + - In Progress: 1 + - Queued: 1 + - Failed: 0 + > LogDemultiplexer: Queued 📥 + +-- Aggregation Round 1 -- + > Leaf Witness Generator: Waiting for Proof ⏱️ + +-- Aggregation Round 2 -- + > Node Witness Generator: Waiting for Proof ⏱️ + +-- Aggregation Round 3 -- + > Recursion Tip: Waiting for Proof ⏱️ + +-- Aggregation Round 4 -- + > Scheduler: Waiting for Proof ⏱️ + +-- Proof Compression -- + > Compressor: Jobs not found 🚫 +" + .into(), + ); + + let scenario = Scenario::new(batch_0) + .add_agg_0_prover_job( + 
ProverJobStatus::Successful(ProverJobStatusSuccessful::default()), + BaseLayerCircuitType::Decommiter, + 2, + ) + .add_agg_0_prover_job( + ProverJobStatus::Successful(ProverJobStatusSuccessful::default()), + BaseLayerCircuitType::Decommiter, + 3, + ) + .add_agg_0_prover_job( + ProverJobStatus::Successful(ProverJobStatusSuccessful::default()), + BaseLayerCircuitType::LogDemultiplexer, + 1, + ) + .add_agg_0_prover_job( + ProverJobStatus::Successful(ProverJobStatusSuccessful::default()), + BaseLayerCircuitType::LogDemultiplexer, + 2, + ) + .add_agg_0_prover_job( + ProverJobStatus::Successful(ProverJobStatusSuccessful::default()), + BaseLayerCircuitType::LogDemultiplexer, + 3, + ) + .add_lwg( + WitnessJobStatus::Successful(WitnessJobStatusSuccessful::default()), + BaseLayerCircuitType::VM, + ) + .add_lwg( + WitnessJobStatus::Successful(WitnessJobStatusSuccessful::default()), + BaseLayerCircuitType::DecommitmentsFilter, + ) + .add_lwg( + WitnessJobStatus::InProgress, + BaseLayerCircuitType::Decommiter, + ) + .add_lwg( + WitnessJobStatus::Queued, + BaseLayerCircuitType::LogDemultiplexer, + ) + .add_agg_1_prover_job( + ProverJobStatus::Successful(ProverJobStatusSuccessful::default()), + BaseLayerCircuitType::VM, + 1, + ) + .add_agg_1_prover_job( + ProverJobStatus::Successful(ProverJobStatusSuccessful::default()), + BaseLayerCircuitType::VM, + 2, + ) + .add_agg_1_prover_job( + ProverJobStatus::Successful(ProverJobStatusSuccessful::default()), + BaseLayerCircuitType::VM, + 3, + ) + .add_agg_1_prover_job( + ProverJobStatus::Successful(ProverJobStatusSuccessful::default()), + BaseLayerCircuitType::VM, + 4, + ) + .add_agg_1_prover_job( + ProverJobStatus::Successful(ProverJobStatusSuccessful::default()), + BaseLayerCircuitType::DecommitmentsFilter, + 1, + ) + .add_agg_1_prover_job( + ProverJobStatus::Successful(ProverJobStatusSuccessful::default()), + BaseLayerCircuitType::DecommitmentsFilter, + 2, + ) + .add_agg_1_prover_job( + 
ProverJobStatus::InProgress(ProverJobStatusInProgress::default()), + BaseLayerCircuitType::DecommitmentsFilter, + 3, + ) + .add_agg_1_prover_job( + ProverJobStatus::InProgress(ProverJobStatusInProgress::default()), + BaseLayerCircuitType::Decommiter, + 1, + ) + .add_agg_1_prover_job(ProverJobStatus::Queued, BaseLayerCircuitType::Decommiter, 2) + .add_agg_1_prover_job( + ProverJobStatus::InProgress(ProverJobStatusInProgress::default()), + BaseLayerCircuitType::Decommiter, + 3, + ) + .add_nwg(WitnessJobStatus::Queued, BaseLayerCircuitType::VM); + load_scenario(scenario, &mut connection).await; + + status_verbose_batch_0_expects( + connection_pool.database_url().expose_str(), + "== Batch 0 Status == + +-- Aggregation Round 0 -- +> Basic Witness Generator: Successful ✅ +> Prover Jobs: Successful ✅ + +-- Aggregation Round 1 -- +v Leaf Witness Generator: In Progress ⌛️ + > VM: Successful ✅ + > DecommitmentsFilter: Successful ✅ + > Decommiter: In Progress ⌛️ + > LogDemultiplexer: Queued 📥 +v Prover Jobs: In Progress ⌛️ + > VM: Successful ✅ + > DecommitmentsFilter: In Progress ⌛️ + - Total jobs: 3 + - Successful: 2 + - In Progress: 1 + - Queued: 0 + - Failed: 0 + > Decommiter: In Progress ⌛️ + - Total jobs: 3 + - Successful: 0 + - In Progress: 2 + - Queued: 1 + - Failed: 0 + +-- Aggregation Round 2 -- + > Node Witness Generator: Queued 📥 + +-- Aggregation Round 3 -- + > Recursion Tip: Waiting for Proof ⏱️ + +-- Aggregation Round 4 -- + > Scheduler: Waiting for Proof ⏱️ + +-- Proof Compression -- + > Compressor: Jobs not found 🚫 +" + .into(), + ); + + let scenario = Scenario::new(batch_0) + .add_lwg( + WitnessJobStatus::Successful(WitnessJobStatusSuccessful::default()), + BaseLayerCircuitType::Decommiter, + ) + .add_lwg( + WitnessJobStatus::Successful(WitnessJobStatusSuccessful::default()), + BaseLayerCircuitType::LogDemultiplexer, + ) + .add_agg_1_prover_job( + ProverJobStatus::Successful(ProverJobStatusSuccessful::default()), + BaseLayerCircuitType::DecommitmentsFilter, + 
3, + ) + .add_agg_1_prover_job( + ProverJobStatus::Successful(ProverJobStatusSuccessful::default()), + BaseLayerCircuitType::Decommiter, + 1, + ) + .add_agg_1_prover_job( + ProverJobStatus::Successful(ProverJobStatusSuccessful::default()), + BaseLayerCircuitType::Decommiter, + 2, + ) + .add_agg_1_prover_job( + ProverJobStatus::Successful(ProverJobStatusSuccessful::default()), + BaseLayerCircuitType::Decommiter, + 3, + ) + .add_nwg( + WitnessJobStatus::Successful(WitnessJobStatusSuccessful::default()), + BaseLayerCircuitType::VM, + ) + .add_nwg( + WitnessJobStatus::Successful(WitnessJobStatusSuccessful::default()), + BaseLayerCircuitType::DecommitmentsFilter, + ) + .add_nwg( + WitnessJobStatus::InProgress, + BaseLayerCircuitType::Decommiter, + ) + .add_nwg( + WitnessJobStatus::Queued, + BaseLayerCircuitType::LogDemultiplexer, + ) + .add_agg_2_prover_job( + ProverJobStatus::Successful(ProverJobStatusSuccessful::default()), + BaseLayerCircuitType::VM, + 1, + ) + .add_agg_2_prover_job( + ProverJobStatus::InProgress(ProverJobStatusInProgress::default()), + BaseLayerCircuitType::DecommitmentsFilter, + 1, + ); + load_scenario(scenario, &mut connection).await; + + status_verbose_batch_0_expects( + connection_pool.database_url().expose_str(), + "== Batch 0 Status == + +-- Aggregation Round 0 -- +> Basic Witness Generator: Successful ✅ +> Prover Jobs: Successful ✅ + +-- Aggregation Round 1 -- +> Leaf Witness Generator: Successful ✅ +> Prover Jobs: Successful ✅ + +-- Aggregation Round 2 -- +v Node Witness Generator: In Progress ⌛️ + > VM: Successful ✅ + > DecommitmentsFilter: Successful ✅ + > Decommiter: In Progress ⌛️ + > LogDemultiplexer: Queued 📥 +v Prover Jobs: In Progress ⌛️ + > VM: Successful ✅ + > DecommitmentsFilter: In Progress ⌛️ + - Total jobs: 1 + - Successful: 0 + - In Progress: 1 + - Queued: 0 + - Failed: 0 + +-- Aggregation Round 3 -- + > Recursion Tip: Waiting for Proof ⏱️ + +-- Aggregation Round 4 -- + > Scheduler: Waiting for Proof ⏱️ + +-- Proof Compression 
-- + > Compressor: Jobs not found 🚫 +" + .into(), + ); + + let scenario = Scenario::new(batch_0) + .add_nwg( + WitnessJobStatus::Successful(WitnessJobStatusSuccessful::default()), + BaseLayerCircuitType::Decommiter, + ) + .add_nwg( + WitnessJobStatus::Successful(WitnessJobStatusSuccessful::default()), + BaseLayerCircuitType::LogDemultiplexer, + ) + .add_agg_2_prover_job( + ProverJobStatus::Successful(ProverJobStatusSuccessful::default()), + BaseLayerCircuitType::DecommitmentsFilter, + 1, + ) + .add_rt(WitnessJobStatus::InProgress); + load_scenario(scenario, &mut connection).await; + + status_verbose_batch_0_expects( + connection_pool.database_url().expose_str(), + "== Batch 0 Status == + +-- Aggregation Round 0 -- +> Basic Witness Generator: Successful ✅ +> Prover Jobs: Successful ✅ + +-- Aggregation Round 1 -- +> Leaf Witness Generator: Successful ✅ +> Prover Jobs: Successful ✅ + +-- Aggregation Round 2 -- +> Node Witness Generator: Successful ✅ +> Prover Jobs: Successful ✅ + +-- Aggregation Round 3 -- +v Recursion Tip: In Progress ⌛️ + +-- Aggregation Round 4 -- + > Scheduler: Waiting for Proof ⏱️ + +-- Proof Compression -- + > Compressor: Jobs not found 🚫 +" + .into(), + ); + + let scenario = Scenario::new(batch_0) + .add_rt(WitnessJobStatus::Successful( + WitnessJobStatusSuccessful::default(), + )) + .add_scheduler(WitnessJobStatus::InProgress); + load_scenario(scenario, &mut connection).await; + + status_verbose_batch_0_expects( + connection_pool.database_url().expose_str(), + "== Batch 0 Status == + +-- Aggregation Round 0 -- +> Basic Witness Generator: Successful ✅ +> Prover Jobs: Successful ✅ + +-- Aggregation Round 1 -- +> Leaf Witness Generator: Successful ✅ +> Prover Jobs: Successful ✅ + +-- Aggregation Round 2 -- +> Node Witness Generator: Successful ✅ +> Prover Jobs: Successful ✅ + +-- Aggregation Round 3 -- +> Recursion Tip: Successful ✅ + +-- Aggregation Round 4 -- +v Scheduler: In Progress ⌛️ + +-- Proof Compression -- + > Compressor: Jobs not found 
🚫 +" + .into(), + ); + + let scenario = Scenario::new(batch_0) + .add_scheduler(WitnessJobStatus::Successful( + WitnessJobStatusSuccessful::default(), + )) + .add_compressor(ProofCompressionJobStatus::SentToServer); + load_scenario(scenario, &mut connection).await; + + status_batch_0_expects( + connection_pool.database_url().expose_str(), + COMPLETE_BATCH_STATUS_STDOUT.into(), + ); +} diff --git a/prover/crates/bin/prover_cli/tests/cli.rs b/prover/crates/bin/prover_cli/tests/cli.rs new file mode 100644 index 000000000000..4a68491f09be --- /dev/null +++ b/prover/crates/bin/prover_cli/tests/cli.rs @@ -0,0 +1,42 @@ +use assert_cmd::Command; +use zksync_dal::ConnectionPool; +use zksync_prover_dal::{Prover, ProverDal}; +use zksync_types::protocol_version::{L1VerifierConfig, ProtocolSemanticVersion}; + +#[test] +#[doc = "prover_cli"] +fn pli_empty_fails() { + Command::cargo_bin("prover_cli").unwrap().assert().failure(); +} + +#[test] +#[doc = "prover_cli"] +fn pli_help_succeeds() { + Command::cargo_bin("prover_cli") + .unwrap() + .arg("help") + .assert() + .success(); +} + +#[tokio::test] +#[doc = "prover_cli config"] +async fn pli_config_succeeds() { + let connection_pool = ConnectionPool::::prover_test_pool().await; + let mut connection = connection_pool.connection().await.unwrap(); + + connection + .fri_protocol_versions_dal() + .save_prover_protocol_version( + ProtocolSemanticVersion::default(), + L1VerifierConfig::default(), + ) + .await; + + Command::cargo_bin("prover_cli") + .unwrap() + .arg("config") + .arg(connection_pool.database_url().expose_str()) + .assert() + .success(); +} diff --git a/prover/crates/lib/prover_dal/src/cli_test_dal.rs b/prover/crates/lib/prover_dal/src/cli_test_dal.rs new file mode 100644 index 000000000000..474c84c53fd5 --- /dev/null +++ b/prover/crates/lib/prover_dal/src/cli_test_dal.rs @@ -0,0 +1,173 @@ +use zksync_basic_types::{ + prover_dal::{ProofCompressionJobStatus, ProverJobStatus, WitnessJobStatus}, + L1BatchNumber, +}; +use 
zksync_db_connection::connection::Connection; + +use crate::Prover; + +#[derive(Debug)] +pub struct CliTestDal<'a, 'c> { + pub storage: &'a mut Connection<'c, Prover>, +} + +impl CliTestDal<'_, '_> { + pub async fn update_prover_job( + &mut self, + status: ProverJobStatus, + circuit_id: u8, + aggregation_round: i64, + batch_number: L1BatchNumber, + sequence_number: usize, + ) { + sqlx::query(&format!( + "UPDATE prover_jobs_fri SET status = '{}' + WHERE l1_batch_number = {} + AND sequence_number = {} + AND aggregation_round = {} + AND circuit_id = {}", + status, batch_number.0, sequence_number, aggregation_round, circuit_id, + )) + .execute(self.storage.conn()) + .await + .unwrap(); + } + + pub async fn insert_lwg_job( + &mut self, + status: WitnessJobStatus, + batch_number: L1BatchNumber, + circuit_id: u8, + ) { + sqlx::query(&format!( + " + INSERT INTO + leaf_aggregation_witness_jobs_fri ( + l1_batch_number, + circuit_id, + status, + number_of_basic_circuits, + created_at, + updated_at + ) + VALUES + ({}, {}, 'waiting_for_proofs', 2, NOW(), NOW()) + ON CONFLICT (l1_batch_number, circuit_id) DO + UPDATE + SET status = '{}' + ", + batch_number.0, circuit_id, status + )) + .execute(self.storage.conn()) + .await + .unwrap(); + } + + pub async fn insert_nwg_job( + &mut self, + status: WitnessJobStatus, + batch_number: L1BatchNumber, + circuit_id: u8, + ) { + sqlx::query(&format!( + " + INSERT INTO + node_aggregation_witness_jobs_fri ( + l1_batch_number, + circuit_id, + status, + created_at, + updated_at + ) + VALUES + ({}, {}, 'waiting_for_proofs', NOW(), NOW()) + ON CONFLICT (l1_batch_number, circuit_id, depth) DO + UPDATE + SET status = '{}' + ", + batch_number.0, circuit_id, status, + )) + .execute(self.storage.conn()) + .await + .unwrap(); + } + + pub async fn insert_rt_job(&mut self, status: WitnessJobStatus, batch_number: L1BatchNumber) { + sqlx::query(&format!( + " + INSERT INTO + recursion_tip_witness_jobs_fri ( + l1_batch_number, + status, + 
number_of_final_node_jobs, + created_at, + updated_at + ) + VALUES + ({}, 'waiting_for_proofs',1, NOW(), NOW()) + ON CONFLICT (l1_batch_number) DO + UPDATE + SET status = '{}' + ", + batch_number.0, status, + )) + .execute(self.storage.conn()) + .await + .unwrap(); + } + + pub async fn insert_scheduler_job( + &mut self, + status: WitnessJobStatus, + batch_number: L1BatchNumber, + ) { + sqlx::query(&format!( + " + INSERT INTO + scheduler_witness_jobs_fri ( + l1_batch_number, + scheduler_partial_input_blob_url, + status, + created_at, + updated_at + ) + VALUES + ({}, '', 'waiting_for_proofs', NOW(), NOW()) + ON CONFLICT (l1_batch_number) DO + UPDATE + SET status = '{}' + ", + batch_number.0, status, + )) + .execute(self.storage.conn()) + .await + .unwrap(); + } + + pub async fn insert_compressor_job( + &mut self, + status: ProofCompressionJobStatus, + batch_number: L1BatchNumber, + ) { + sqlx::query(&format!( + " + INSERT INTO + proof_compression_jobs_fri ( + l1_batch_number, + status, + created_at, + updated_at + ) + VALUES + ({}, '{}', NOW(), NOW()) + ON CONFLICT (l1_batch_number) DO + UPDATE + SET status = '{}' + ", + batch_number.0, status, status, + )) + .execute(self.storage.conn()) + .await + .unwrap(); + } +} diff --git a/prover/crates/lib/prover_dal/src/lib.rs b/prover/crates/lib/prover_dal/src/lib.rs index bb552b899e90..85fcc260aa8d 100644 --- a/prover/crates/lib/prover_dal/src/lib.rs +++ b/prover/crates/lib/prover_dal/src/lib.rs @@ -6,12 +6,13 @@ pub use zksync_db_connection::{ }; use crate::{ - fri_gpu_prover_queue_dal::FriGpuProverQueueDal, + cli_test_dal::CliTestDal, fri_gpu_prover_queue_dal::FriGpuProverQueueDal, fri_proof_compressor_dal::FriProofCompressorDal, fri_protocol_versions_dal::FriProtocolVersionsDal, fri_prover_dal::FriProverDal, fri_witness_generator_dal::FriWitnessGeneratorDal, }; +pub mod cli_test_dal; pub mod fri_gpu_prover_queue_dal; pub mod fri_proof_compressor_dal; pub mod fri_protocol_versions_dal; @@ -29,6 +30,8 @@ pub trait 
ProverDal<'a>: private::Sealed where Self: 'a, { + fn cli_test_dal(&mut self) -> CliTestDal<'_, 'a>; + fn fri_witness_generator_dal(&mut self) -> FriWitnessGeneratorDal<'_, 'a>; fn fri_prover_jobs_dal(&mut self) -> FriProverDal<'_, 'a>; @@ -68,4 +71,7 @@ impl<'a> ProverDal<'a> for Connection<'a, Prover> { fn fri_proof_compressor_dal(&mut self) -> FriProofCompressorDal<'_, 'a> { FriProofCompressorDal { storage: self } } + fn cli_test_dal(&mut self) -> CliTestDal<'_, 'a> { + CliTestDal { storage: self } + } }