From ab31f031dfcaa7ddf296786ddccb78e8edd2d3c5 Mon Sep 17 00:00:00 2001 From: perekopskiy <53865202+perekopskiy@users.noreply.github.com> Date: Fri, 10 Nov 2023 12:18:04 +0200 Subject: [PATCH] feat(job-processor): report attempts metrics (#448) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ❔ Every component implementing `JobProcessor` now exposes two metrics: `job_attempts` -- Grafana heatmaps displaying this metric will be added. `max_attempts_reached` -- an alert will fire whenever this metric increases (alert rules are not added yet). A usage sketch of the new metrics is appended after the diff. Previously, the `attempts` column in the DB was incremented both in the `get_next*` methods and in `requeue_stuck_jobs`, so a retry effectively performed `attempts += 2`. The `attempts` update has been removed from `requeue_stuck_jobs`. ## Why ❔ Better monitoring and alerting. ## Checklist - [x] PR title corresponds to the body of PR (we generate changelog entries from PRs). - [ ] Tests for the changes have been added / updated. - [x] Documentation comments have been added / updated. - [x] Code has been formatted via `zk fmt` and `zk lint`. --- Cargo.lock | 1 + core/bin/contract-verifier/src/verifier.rs | 8 + core/lib/dal/sqlx-data.json | 614 +++++++++++------- .../src/basic_witness_input_producer_dal.rs | 18 +- core/lib/dal/src/fri_proof_compressor_dal.rs | 23 +- core/lib/dal/src/fri_prover_dal.rs | 14 +- core/lib/dal/src/fri_witness_generator_dal.rs | 84 ++- core/lib/dal/src/prover_dal.rs | 2 +- core/lib/queued_job_processor/Cargo.toml | 1 + core/lib/queued_job_processor/src/lib.rs | 35 +- .../src/basic_witness_input_producer/mod.rs | 20 +- .../src/witness_generator/basic_circuits.rs | 9 + .../src/witness_generator/leaf_aggregation.rs | 9 + .../src/witness_generator/node_aggregation.rs | 9 + .../src/witness_generator/scheduler.rs | 9 + prover/Cargo.lock | 1 + .../src/circuit_synthesizer.rs | 9 + prover/proof_fri_compressor/src/compressor.rs | 21 + prover/proof_fri_compressor/src/main.rs | 1 + .../src/gpu_prover_job_processor.rs | 18 + prover/prover_fri/src/prover_job_processor.rs | 18 + .../witness_generator/src/basic_circuits.rs | 19 + .../witness_generator/src/leaf_aggregation.rs | 19 +- prover/witness_generator/src/main.rs | 2 + .../witness_generator/src/node_aggregation.rs | 22 + prover/witness_generator/src/scheduler.rs | 22 + .../witness_vector_generator/src/generator.rs | 21 + prover/witness_vector_generator/src/main.rs | 4 +- 28 files changed, 775 insertions(+), 258 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 953001c820a4..699972002e43 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8279,6 +8279,7 @@ dependencies = [ "async-trait", "tokio", "tracing", + "vise", "zksync_utils", ] diff --git a/core/bin/contract-verifier/src/verifier.rs b/core/bin/contract-verifier/src/verifier.rs index 03e94dde75ce..e34b4784c1cb 100644 --- a/core/bin/contract-verifier/src/verifier.rs +++ b/core/bin/contract-verifier/src/verifier.rs @@ -524,4 +524,12 @@ impl JobProcessor for ContractVerifier { // Do nothing Ok(()) } + + fn max_attempts(&self) -> u32 { + u32::MAX + } + + async fn get_job_attempts(&self, _job_id: &Self::JobId) -> anyhow::Result<u32> { + Ok(1) + } } diff --git a/core/lib/dal/sqlx-data.json b/core/lib/dal/sqlx-data.json index 1527f2e5a754..79758c4274fb 100644 --- a/core/lib/dal/sqlx-data.json +++ b/core/lib/dal/sqlx-data.json @@ -2196,6 +2196,26 @@ }, "query": "UPDATE contract_verification_requests SET status = 'successful', updated_at = now() WHERE id = $1" }, + "29f7f469cd58b256237536463f1e9d58438314fd1fe733a6bb53e6523f78bb49": {
"describe": { + "columns": [ + { + "name": "attempts", + "ordinal": 0, + "type_info": "Int2" + } + ], + "nullable": [ + false + ], + "parameters": { + "Left": [ + "Int8" + ] + } + }, + "query": "SELECT attempts FROM prover_jobs_fri WHERE id = $1" + }, "2a38561e789af470d6ef1a905143f2d8d102b4ff23cebe97586681da9e4084a9": { "describe": { "columns": [ @@ -2321,6 +2341,39 @@ }, "query": "SELECT number FROM l1_batches LEFT JOIN eth_txs_history as execute_tx ON (l1_batches.eth_execute_tx_id = execute_tx.eth_tx_id) WHERE execute_tx.confirmed_at IS NOT NULL ORDER BY number DESC LIMIT 1" }, + "2af0eddab563f0800a4762031e8703dbcac11450daacf3439289641b9b179b1c": { + "describe": { + "columns": [ + { + "name": "id", + "ordinal": 0, + "type_info": "Int8" + }, + { + "name": "status", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "attempts", + "ordinal": 2, + "type_info": "Int2" + } + ], + "nullable": [ + false, + false, + false + ], + "parameters": { + "Left": [ + "Interval", + "Int2" + ] + } + }, + "query": "\n UPDATE node_aggregation_witness_jobs_fri\n SET status = 'queued', updated_at = now(), processing_started_at = now()\n WHERE (status = 'in_progress' AND processing_started_at <= now() - $1::interval AND attempts < $2)\n OR (status = 'failed' AND attempts < $2)\n RETURNING id, status, attempts\n " + }, "2b22e7d15adf069c8e68954059b83f71a71350f3325b4280840c4be7e54a319f": { "describe": { "columns": [ @@ -2387,6 +2440,26 @@ }, "query": "\n UPDATE leaf_aggregation_witness_jobs_fri\n SET status='queued'\n WHERE (l1_batch_number, circuit_id) IN\n (SELECT prover_jobs_fri.l1_batch_number, prover_jobs_fri.circuit_id\n FROM prover_jobs_fri\n JOIN leaf_aggregation_witness_jobs_fri lawj ON\n prover_jobs_fri.l1_batch_number = lawj.l1_batch_number\n AND prover_jobs_fri.circuit_id = lawj.circuit_id\n WHERE lawj.status = 'waiting_for_proofs'\n AND prover_jobs_fri.status = 'successful'\n AND prover_jobs_fri.aggregation_round = 0\n GROUP BY prover_jobs_fri.l1_batch_number, prover_jobs_fri.circuit_id, lawj.number_of_basic_circuits\n HAVING COUNT(*) = lawj.number_of_basic_circuits)\n RETURNING l1_batch_number, circuit_id;\n " }, + "2bd9137542076526c245366057f0f3f57c08368f6e0dc86d49293a91875272b8": { + "describe": { + "columns": [ + { + "name": "attempts", + "ordinal": 0, + "type_info": "Int2" + } + ], + "nullable": [ + false + ], + "parameters": { + "Left": [ + "Int8" + ] + } + }, + "query": "SELECT attempts FROM witness_inputs_fri WHERE l1_batch_number = $1" + }, "2c136284610f728ddba3e255d7dc573b10e4baf9151de194b7d8e0dc40c40602": { "describe": { "columns": [], @@ -3364,39 +3437,6 @@ }, "query": "SELECT * FROM protocol_versions ORDER BY id DESC LIMIT 1" }, - "3665394a2f91f1a07c2c517ae9e75434f9ec12ed4debf370662b8104e015c75f": { - "describe": { - "columns": [ - { - "name": "id", - "ordinal": 0, - "type_info": "Int8" - }, - { - "name": "status", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "attempts", - "ordinal": 2, - "type_info": "Int2" - } - ], - "nullable": [ - false, - false, - false - ], - "parameters": { - "Left": [ - "Interval", - "Int2" - ] - } - }, - "query": "\n UPDATE prover_jobs_fri\n SET status = 'queued', attempts = attempts + 1, updated_at = now(), processing_started_at = now()\n WHERE id in (\n SELECT id\n FROM prover_jobs_fri\n WHERE (status = 'in_progress' AND processing_started_at <= now() - $1::interval AND attempts < $2)\n OR (status = 'in_gpu_proof' AND processing_started_at <= now() - $1::interval AND attempts < $2)\n OR (status = 'failed' AND attempts < $2)\n FOR UPDATE SKIP 
LOCKED\n )\n RETURNING id, status, attempts\n " - }, "37e4a0eea7b72bd3b75c26e003f3fa62039d9b614f0f2fa3d61e8c5e95f002fd": { "describe": { "columns": [ @@ -3672,6 +3712,26 @@ }, "query": "\n UPDATE scheduler_witness_jobs\n SET status = 'in_progress', attempts = attempts + 1,\n updated_at = now(), processing_started_at = now()\n WHERE l1_batch_number = (\n SELECT l1_batch_number\n FROM scheduler_witness_jobs\n WHERE l1_batch_number <= $3\n AND\n ( status = 'queued'\n OR (status = 'in_progress' AND processing_started_at < now() - $1::interval)\n OR (status = 'failed' AND attempts < $2)\n )\n AND protocol_version = ANY($4)\n ORDER BY l1_batch_number ASC\n LIMIT 1\n FOR UPDATE\n SKIP LOCKED\n )\n RETURNING scheduler_witness_jobs.*\n " }, + "3be0d3fd7a1ff997edb1eaff3fac59324a5b33663e7862cfddd4a5db8015f13c": { + "describe": { + "columns": [ + { + "name": "attempts", + "ordinal": 0, + "type_info": "Int2" + } + ], + "nullable": [ + false + ], + "parameters": { + "Left": [ + "Int8" + ] + } + }, + "query": "SELECT attempts FROM leaf_aggregation_witness_jobs_fri WHERE id = $1" + }, "3c582aeed32235ef175707de412a9f9129fad6ea5e87ebb85f68e20664b0da46": { "describe": { "columns": [], @@ -4265,39 +4325,6 @@ }, "query": "\n UPDATE gpu_prover_queue\n SET instance_status = 'reserved',\n updated_at = now(),\n processing_started_at = now()\n WHERE id in (\n SELECT id\n FROM gpu_prover_queue\n WHERE specialized_prover_group_id=$2\n AND region=$3\n AND zone=$4\n AND (\n instance_status = 'available'\n OR (instance_status = 'reserved' AND processing_started_at < now() - $1::interval)\n )\n ORDER BY updated_at ASC\n LIMIT 1\n FOR UPDATE\n SKIP LOCKED\n )\n RETURNING gpu_prover_queue.*\n " }, - "4fca2f4497b3b5040cb8ccefe44a29c2583578942fd7c58e71c0eaeb2d9bec9e": { - "describe": { - "columns": [ - { - "name": "l1_batch_number", - "ordinal": 0, - "type_info": "Int8" - }, - { - "name": "status", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "attempts", - "ordinal": 2, - "type_info": "Int2" - } - ], - "nullable": [ - false, - false, - false - ], - "parameters": { - "Left": [ - "Interval", - "Int2" - ] - } - }, - "query": "UPDATE proof_compression_jobs_fri SET status = 'queued', attempts = attempts + 1, updated_at = now(), processing_started_at = now() WHERE (status = 'in_progress' AND processing_started_at <= now() - $1::interval AND attempts < $2) OR (status = 'failed' AND attempts < $2) RETURNING l1_batch_number, status, attempts" - }, "5089dfb745ff04a9b071b5785e68194a6f6a7a72754d23a65adc7d6838f7f640": { "describe": { "columns": [], @@ -4953,6 +4980,26 @@ }, "query": "SELECT MAX(number) as \"number\" FROM l1_batches WHERE hash IS NOT NULL" }, + "5cc93efebc14dc0b78ed32bf7f167a44bd083f32ab308662c57ce1f726c0f1f9": { + "describe": { + "columns": [ + { + "name": "attempts", + "ordinal": 0, + "type_info": "Int2" + } + ], + "nullable": [ + false + ], + "parameters": { + "Left": [ + "Int8" + ] + } + }, + "query": "SELECT attempts FROM node_aggregation_witness_jobs_fri WHERE id = $1" + }, "5df806b33f84893d4ddfacf3b289b0e173e85ad9204cbb7ad314e68a94cdc41e": { "describe": { "columns": [], @@ -5120,6 +5167,39 @@ }, "query": "SELECT (SELECT l1_batch_number FROM miniblocks WHERE number = $1) as \"block_batch?\", (SELECT MAX(number) + 1 FROM l1_batches) as \"max_batch?\"" }, + "5f40849646bb7436e29cda8fb87fece2a4dcb580644f45ecb82388dece04f222": { + "describe": { + "columns": [ + { + "name": "id", + "ordinal": 0, + "type_info": "Int8" + }, + { + "name": "status", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": 
"attempts", + "ordinal": 2, + "type_info": "Int2" + } + ], + "nullable": [ + false, + false, + false + ], + "parameters": { + "Left": [ + "Interval", + "Int2" + ] + } + }, + "query": "\n UPDATE prover_jobs_fri\n SET status = 'queued', updated_at = now(), processing_started_at = now()\n WHERE id in (\n SELECT id\n FROM prover_jobs_fri\n WHERE (status = 'in_progress' AND processing_started_at <= now() - $1::interval AND attempts < $2)\n OR (status = 'in_gpu_proof' AND processing_started_at <= now() - $1::interval AND attempts < $2)\n OR (status = 'failed' AND attempts < $2)\n FOR UPDATE SKIP LOCKED\n )\n RETURNING id, status, attempts\n " + }, "5f4b1091b74424ffd20c0aede98287418afa2bb37dbc941200c1d6190c96bec5": { "describe": { "columns": [ @@ -5587,6 +5667,39 @@ }, "query": "INSERT INTO commitments (l1_batch_number, events_queue_commitment, bootloader_initial_content_commitment) VALUES ($1, $2, $3) ON CONFLICT (l1_batch_number) DO UPDATE SET events_queue_commitment = $2, bootloader_initial_content_commitment = $3" }, + "6939e766e122458b2ac618d19b2759c4a7298ef72b81e8c3957e0a5cf35c9552": { + "describe": { + "columns": [ + { + "name": "l1_batch_number", + "ordinal": 0, + "type_info": "Int8" + }, + { + "name": "status", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "attempts", + "ordinal": 2, + "type_info": "Int2" + } + ], + "nullable": [ + false, + false, + false + ], + "parameters": { + "Left": [ + "Interval", + "Int2" + ] + } + }, + "query": "\n UPDATE witness_inputs_fri\n SET status = 'queued', updated_at = now(), processing_started_at = now()\n WHERE (status = 'in_progress' AND processing_started_at <= now() - $1::interval AND attempts < $2)\n OR (status = 'in_gpu_proof' AND processing_started_at <= now() - $1::interval AND attempts < $2)\n OR (status = 'failed' AND attempts < $2)\n RETURNING l1_batch_number, status, attempts\n " + }, "694f1d154f3f38b123d8f845fef6e876d35dc3743f1c5b69dce6be694e5e726c": { "describe": { "columns": [], @@ -6021,6 +6134,26 @@ }, "query": "SELECT * FROM call_traces WHERE tx_hash IN (SELECT hash FROM transactions WHERE miniblock_number = $1)" }, + "79cdb4cdd3c47b3654e6240178985fb4b4420e0634f9482a6ef8169e90200b84": { + "describe": { + "columns": [ + { + "name": "attempts", + "ordinal": 0, + "type_info": "Int2" + } + ], + "nullable": [ + false + ], + "parameters": { + "Left": [ + "Int8" + ] + } + }, + "query": "SELECT attempts FROM scheduler_witness_jobs_fri WHERE l1_batch_number = $1" + }, "7a5aba2130fec60318266c8059d3757cd78eb6099d50486b4996fb4090c99622": { "describe": { "columns": [], @@ -7679,21 +7812,54 @@ }, "query": "\n SELECT value\n FROM storage_logs\n WHERE storage_logs.hashed_key = $1 AND storage_logs.miniblock_number <= $2\n ORDER BY storage_logs.miniblock_number DESC, storage_logs.operation_number DESC\n LIMIT 1\n " }, - "9554593134830bc197e95f3a7e69844839bfe31bf567934ddbab760017710e39": { + "944c38995043e7b11e6633beb68b5479059ff27b26fd2df171a3d9650f070547": { "describe": { "columns": [ { - "name": "bytecode", + "name": "id", "ordinal": 0, - "type_info": "Bytea" + "type_info": "Int8" }, { - "name": "data?", + "name": "status", "ordinal": 1, - "type_info": "Jsonb" + "type_info": "Text" }, { - "name": "contract_address?", + "name": "attempts", + "ordinal": 2, + "type_info": "Int2" + } + ], + "nullable": [ + false, + false, + false + ], + "parameters": { + "Left": [ + "Interval", + "Int2" + ] + } + }, + "query": "\n UPDATE leaf_aggregation_witness_jobs_fri\n SET status = 'queued', updated_at = now(), processing_started_at = now()\n WHERE (status 
= 'in_progress' AND processing_started_at <= now() - $1::interval AND attempts < $2)\n OR (status = 'failed' AND attempts < $2)\n RETURNING id, status, attempts\n " + }, + "9554593134830bc197e95f3a7e69844839bfe31bf567934ddbab760017710e39": { + "describe": { + "columns": [ + { + "name": "bytecode", + "ordinal": 0, + "type_info": "Bytea" + }, + { + "name": "data?", + "ordinal": 1, + "type_info": "Jsonb" + }, + { + "name": "contract_address?", "ordinal": 2, "type_info": "Bytea" } @@ -8230,6 +8396,26 @@ }, "query": "\n UPDATE transactions\n SET \n hash = data_table.hash,\n signature = data_table.signature,\n gas_limit = data_table.gas_limit,\n max_fee_per_gas = data_table.max_fee_per_gas,\n max_priority_fee_per_gas = data_table.max_priority_fee_per_gas,\n gas_per_pubdata_limit = data_table.gas_per_pubdata_limit,\n input = data_table.input,\n data = data_table.data,\n tx_format = data_table.tx_format,\n miniblock_number = $21,\n index_in_block = data_table.index_in_block,\n error = NULLIF(data_table.error, ''),\n effective_gas_price = data_table.effective_gas_price,\n execution_info = data_table.new_execution_info,\n refunded_gas = data_table.refunded_gas,\n value = data_table.value,\n contract_address = data_table.contract_address,\n paymaster = data_table.paymaster,\n paymaster_input = data_table.paymaster_input,\n in_mempool = FALSE,\n updated_at = now()\n FROM\n (\n SELECT data_table_temp.* FROM (\n SELECT\n UNNEST($1::bytea[]) AS initiator_address,\n UNNEST($2::int[]) AS nonce,\n UNNEST($3::bytea[]) AS hash,\n UNNEST($4::bytea[]) AS signature,\n UNNEST($5::numeric[]) AS gas_limit,\n UNNEST($6::numeric[]) AS max_fee_per_gas,\n UNNEST($7::numeric[]) AS max_priority_fee_per_gas,\n UNNEST($8::numeric[]) AS gas_per_pubdata_limit,\n UNNEST($9::int[]) AS tx_format,\n UNNEST($10::integer[]) AS index_in_block,\n UNNEST($11::varchar[]) AS error,\n UNNEST($12::numeric[]) AS effective_gas_price,\n UNNEST($13::jsonb[]) AS new_execution_info,\n UNNEST($14::bytea[]) AS input,\n UNNEST($15::jsonb[]) AS data,\n UNNEST($16::bigint[]) as refunded_gas,\n UNNEST($17::numeric[]) as value,\n UNNEST($18::bytea[]) as contract_address,\n UNNEST($19::bytea[]) as paymaster,\n UNNEST($20::bytea[]) as paymaster_input\n ) AS data_table_temp\n JOIN transactions ON transactions.initiator_address = data_table_temp.initiator_address\n AND transactions.nonce = data_table_temp.nonce\n ORDER BY transactions.hash\n ) AS data_table\n WHERE transactions.initiator_address=data_table.initiator_address\n AND transactions.nonce=data_table.nonce\n " }, + "9a326e8fb44f8ebfdd26d945b73a054fd6802551594b23687d057a3954e24f33": { + "describe": { + "columns": [ + { + "name": "attempts", + "ordinal": 0, + "type_info": "Int2" + } + ], + "nullable": [ + false + ], + "parameters": { + "Left": [ + "Int8" + ] + } + }, + "query": "SELECT attempts FROM basic_witness_input_producer_jobs WHERE l1_batch_number = $1" + }, "9aaf98668f384f634860c4acf793ff47be08975e5d09061cc26fd53dea249c55": { "describe": { "columns": [], @@ -8903,6 +9089,39 @@ }, "query": "SELECT * FROM eth_txs_history WHERE eth_tx_id = $1 ORDER BY created_at DESC LIMIT 1" }, + "ad495160a947cf1bd7343819e723d18c9332bc95cfc2014ed8d04907eff3896e": { + "describe": { + "columns": [ + { + "name": "l1_batch_number", + "ordinal": 0, + "type_info": "Int8" + }, + { + "name": "status", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "attempts", + "ordinal": 2, + "type_info": "Int2" + } + ], + "nullable": [ + false, + false, + false + ], + "parameters": { + "Left": [ + "Interval", + "Int2" + 
] + } + }, + "query": "\n UPDATE scheduler_witness_jobs_fri\n SET status = 'queued', updated_at = now(), processing_started_at = now()\n WHERE (status = 'in_progress' AND processing_started_at <= now() - $1::interval AND attempts < $2)\n OR (status = 'failed' AND attempts < $2)\n RETURNING l1_batch_number, status, attempts\n " + }, "ad4f74aa6f131df0243f4fa500ade1b98aa335bd71ed417b02361e2c697e60f8": { "describe": { "columns": [], @@ -9426,72 +9645,6 @@ }, "query": "INSERT INTO prover_protocol_versions\n (id, timestamp, recursion_scheduler_level_vk_hash, recursion_node_level_vk_hash,\n recursion_leaf_level_vk_hash, recursion_circuits_set_vks_hash, verifier_address, created_at)\n VALUES ($1, $2, $3, $4, $5, $6, $7, now())\n " }, - "b7ab3aeee71e87c7469428ec411b410d81282ff6fed63fe5cda0e81a330d2ac5": { - "describe": { - "columns": [ - { - "name": "id", - "ordinal": 0, - "type_info": "Int8" - }, - { - "name": "status", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "attempts", - "ordinal": 2, - "type_info": "Int2" - } - ], - "nullable": [ - false, - false, - false - ], - "parameters": { - "Left": [ - "Interval", - "Int2" - ] - } - }, - "query": "\n UPDATE leaf_aggregation_witness_jobs_fri\n SET status = 'queued', attempts = attempts + 1, updated_at = now(), processing_started_at = now()\n WHERE (status = 'in_progress' AND processing_started_at <= now() - $1::interval AND attempts < $2)\n OR (status = 'failed' AND attempts < $2)\n RETURNING id, status, attempts\n " - }, - "b7d3b30bff2ed9aabcdaed89ebfd1f0303b70c6d5483ff9183475bb232a04f21": { - "describe": { - "columns": [ - { - "name": "l1_batch_number", - "ordinal": 0, - "type_info": "Int8" - }, - { - "name": "status", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "attempts", - "ordinal": 2, - "type_info": "Int2" - } - ], - "nullable": [ - false, - false, - false - ], - "parameters": { - "Left": [ - "Interval", - "Int2" - ] - } - }, - "query": "\n UPDATE witness_inputs_fri\n SET status = 'queued', attempts = attempts + 1, updated_at = now(), processing_started_at = now()\n WHERE (status = 'in_progress' AND processing_started_at <= now() - $1::interval AND attempts < $2)\n OR (status = 'in_gpu_proof' AND processing_started_at <= now() - $1::interval AND attempts < $2)\n OR (status = 'failed' AND attempts < $2)\n RETURNING l1_batch_number, status, attempts\n " - }, "b944df7af612ec911170a43be846eb2f6e27163b0d3983672de2b8d5d60af640": { "describe": { "columns": [ @@ -9717,39 +9870,6 @@ }, "query": "SELECT storage_refunds FROM l1_batches WHERE number = $1" }, - "c49a6925e9462cc85a6e1cc850f2e147e0a5d990efed56f27792698e6cf9ff0c": { - "describe": { - "columns": [ - { - "name": "l1_batch_number", - "ordinal": 0, - "type_info": "Int8" - }, - { - "name": "status", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "attempts", - "ordinal": 2, - "type_info": "Int2" - } - ], - "nullable": [ - false, - false, - false - ], - "parameters": { - "Left": [ - "Interval", - "Int2" - ] - } - }, - "query": "\n UPDATE scheduler_witness_jobs_fri\n SET status = 'queued', attempts = attempts + 1, updated_at = now(), processing_started_at = now()\n WHERE (status = 'in_progress' AND processing_started_at <= now() - $1::interval AND attempts < $2)\n OR (status = 'failed' AND attempts < $2)\n RETURNING l1_batch_number, status, attempts\n " - }, "c59d052f89ddfc3d2c07be84d6d9837adfbe2cefb10d01e09d31aa5e3364e281": { "describe": { "columns": [ @@ -9896,39 +10016,6 @@ }, "query": "\n UPDATE transactions\n SET\n miniblock_number = $1,\n index_in_block = 
data_table.index_in_block,\n error = NULLIF(data_table.error, ''),\n in_mempool=FALSE,\n execution_info = execution_info || data_table.new_execution_info,\n refunded_gas = data_table.refunded_gas,\n effective_gas_price = data_table.effective_gas_price,\n updated_at = now()\n FROM\n (\n SELECT\n UNNEST($2::bytea[]) AS hash,\n UNNEST($3::integer[]) AS index_in_block,\n UNNEST($4::varchar[]) AS error,\n UNNEST($5::jsonb[]) AS new_execution_info,\n UNNEST($6::bigint[]) as refunded_gas,\n UNNEST($7::numeric[]) as effective_gas_price\n ) AS data_table\n WHERE transactions.hash = data_table.hash\n " }, - "c66b0e0867a1a634f984645ca576a6502b51b67aa0be2dae98e0e2adeb450963": { - "describe": { - "columns": [ - { - "name": "id", - "ordinal": 0, - "type_info": "Int8" - }, - { - "name": "status", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "attempts", - "ordinal": 2, - "type_info": "Int4" - } - ], - "nullable": [ - false, - false, - false - ], - "parameters": { - "Left": [ - "Interval", - "Int4" - ] - } - }, - "query": "\n UPDATE prover_jobs\n SET status = 'queued', attempts = attempts + 1, updated_at = now(), processing_started_at = now()\n WHERE (status = 'in_progress' AND processing_started_at <= now() - $1::interval AND attempts < $2)\n OR (status = 'in_gpu_proof' AND processing_started_at <= now() - $1::interval AND attempts < $2)\n OR (status = 'failed' AND attempts < $2)\n RETURNING id, status, attempts\n " - }, "c6aadc4ec78e30f5775f7a9f866ad02984b78de3e3d1f34c144a4057ff44ea6a": { "describe": { "columns": [ @@ -10086,6 +10173,26 @@ }, "query": "INSERT INTO protocol_versions (id, timestamp, recursion_scheduler_level_vk_hash, recursion_node_level_vk_hash, recursion_leaf_level_vk_hash, recursion_circuits_set_vks_hash, bootloader_code_hash, default_account_code_hash, verifier_address, upgrade_tx_hash, created_at) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, now())" }, + "cd2f668e3febead6b8c5c5dacaf95f0840b9c40f6c8585df93b0541f9b5b1548": { + "describe": { + "columns": [ + { + "name": "attempts", + "ordinal": 0, + "type_info": "Int2" + } + ], + "nullable": [ + false + ], + "parameters": { + "Left": [ + "Int8" + ] + } + }, + "query": "SELECT attempts FROM proof_compression_jobs_fri WHERE l1_batch_number = $1" + }, "ce3666b149f7fc62a68139a8efb83ed149c7deace17b8968817941763e45a147": { "describe": { "columns": [], @@ -10406,39 +10513,6 @@ }, "query": "SELECT COUNT(miniblocks.number) FROM miniblocks WHERE l1_batch_number IS NULL" }, - "dc751a25528a272bac17416f782fce3d0aee44b1ae25be0220718b356fda02e8": { - "describe": { - "columns": [ - { - "name": "id", - "ordinal": 0, - "type_info": "Int8" - }, - { - "name": "status", - "ordinal": 1, - "type_info": "Text" - }, - { - "name": "attempts", - "ordinal": 2, - "type_info": "Int2" - } - ], - "nullable": [ - false, - false, - false - ], - "parameters": { - "Left": [ - "Interval", - "Int2" - ] - } - }, - "query": "\n UPDATE node_aggregation_witness_jobs_fri\n SET status = 'queued', attempts = attempts + 1, updated_at = now(), processing_started_at = now()\n WHERE (status = 'in_progress' AND processing_started_at <= now() - $1::interval AND attempts < $2)\n OR (status = 'failed' AND attempts < $2)\n RETURNING id, status, attempts\n " - }, "dd330bc075a163974c59ec55ecfddd769d05801963b3e0e840e7f11e7bc6d3e9": { "describe": { "columns": [ @@ -10856,6 +10930,39 @@ }, "query": "UPDATE eth_txs SET gas_used = $1, confirmed_eth_tx_history_id = $2 WHERE id = $3" }, + "e8988deed66ad9d10be89e89966082aeb920c5dc91eb5fad16bd0d3118708c2e": { + "describe": { + 
"columns": [ + { + "name": "id", + "ordinal": 0, + "type_info": "Int8" + }, + { + "name": "status", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "attempts", + "ordinal": 2, + "type_info": "Int4" + } + ], + "nullable": [ + false, + false, + false + ], + "parameters": { + "Left": [ + "Interval", + "Int4" + ] + } + }, + "query": "\n UPDATE prover_jobs\n SET status = 'queued', updated_at = now(), processing_started_at = now()\n WHERE (status = 'in_progress' AND processing_started_at <= now() - $1::interval AND attempts < $2)\n OR (status = 'in_gpu_proof' AND processing_started_at <= now() - $1::interval AND attempts < $2)\n OR (status = 'failed' AND attempts < $2)\n RETURNING id, status, attempts\n " + }, "e900682a160af90d532da47a1222fc1d7c9962ee8996dbd9b9bb63f13820cf2b": { "describe": { "columns": [], @@ -11730,6 +11837,39 @@ }, "query": "SELECT * FROM transactions WHERE l1_batch_number = $1 ORDER BY miniblock_number, index_in_block" }, + "f39893caa0ad524eda13ab89539fd61804c9190b3d62f4416de83159c2c189e4": { + "describe": { + "columns": [ + { + "name": "l1_batch_number", + "ordinal": 0, + "type_info": "Int8" + }, + { + "name": "status", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "attempts", + "ordinal": 2, + "type_info": "Int2" + } + ], + "nullable": [ + false, + false, + false + ], + "parameters": { + "Left": [ + "Interval", + "Int2" + ] + } + }, + "query": "UPDATE proof_compression_jobs_fri SET status = 'queued', updated_at = now(), processing_started_at = now() WHERE (status = 'in_progress' AND processing_started_at <= now() - $1::interval AND attempts < $2) OR (status = 'failed' AND attempts < $2) RETURNING l1_batch_number, status, attempts" + }, "f5e3c4b23fa0d0686b400b64c42cf78b2219f0cbcf1c9240b77e4132513e36ef": { "describe": { "columns": [ diff --git a/core/lib/dal/src/basic_witness_input_producer_dal.rs b/core/lib/dal/src/basic_witness_input_producer_dal.rs index fc25cf65c602..ac0627a96a05 100644 --- a/core/lib/dal/src/basic_witness_input_producer_dal.rs +++ b/core/lib/dal/src/basic_witness_input_producer_dal.rs @@ -11,7 +11,7 @@ pub struct BasicWitnessInputProducerDal<'a, 'c> { } /// The amount of attempts to process a job before giving up. -const JOB_MAX_ATTEMPT: i16 = 10; +pub const JOB_MAX_ATTEMPT: i16 = 10; /// Time to wait for job to be processed const JOB_PROCESSING_TIMEOUT: PgInterval = pg_interval_from_duration(Duration::from_secs(10 * 60)); @@ -98,6 +98,22 @@ impl BasicWitnessInputProducerDal<'_, '_> { Ok(l1_batch_number) } + pub async fn get_basic_witness_input_producer_job_attempts( + &mut self, + l1_batch_number: L1BatchNumber, + ) -> sqlx::Result> { + let attempts = sqlx::query!( + "SELECT attempts FROM basic_witness_input_producer_jobs \ + WHERE l1_batch_number = $1", + l1_batch_number.0 as i64, + ) + .fetch_optional(self.storage.conn()) + .await? 
+ .map(|job| job.attempts as u32); + + Ok(attempts) + } + pub async fn mark_job_as_successful( &mut self, l1_batch_number: L1BatchNumber, diff --git a/core/lib/dal/src/fri_proof_compressor_dal.rs b/core/lib/dal/src/fri_proof_compressor_dal.rs index 0ca1f435aff2..97caf76ebce0 100644 --- a/core/lib/dal/src/fri_proof_compressor_dal.rs +++ b/core/lib/dal/src/fri_proof_compressor_dal.rs @@ -67,7 +67,7 @@ impl FriProofCompressorDal<'_, '_> { &mut self, picked_by: &str, ) -> Option<L1BatchNumber> { - let result: Option<L1BatchNumber> = sqlx::query!( + sqlx::query!( "UPDATE proof_compression_jobs_fri \ SET status = $1, attempts = attempts + 1, \ updated_at = now(), processing_started_at = now(), \ @@ -89,8 +89,23 @@ impl FriProofCompressorDal<'_, '_> { .fetch_optional(self.storage.conn()) .await .unwrap() - .map(|row| L1BatchNumber(row.l1_batch_number as u32)); - result + .map(|row| L1BatchNumber(row.l1_batch_number as u32)) + } + + pub async fn get_proof_compression_job_attempts( + &mut self, + l1_batch_number: L1BatchNumber, + ) -> sqlx::Result<Option<u32>> { + let attempts = sqlx::query!( + "SELECT attempts FROM proof_compression_jobs_fri \ + WHERE l1_batch_number = $1", + l1_batch_number.0 as i64, + ) + .fetch_optional(self.storage.conn()) + .await? + .map(|row| row.attempts as u32); + + Ok(attempts) } pub async fn mark_proof_compression_job_successful( @@ -200,7 +215,7 @@ { sqlx::query!( "UPDATE proof_compression_jobs_fri \ - SET status = 'queued', attempts = attempts + 1, updated_at = now(), processing_started_at = now() \ + SET status = 'queued', updated_at = now(), processing_started_at = now() \ WHERE (status = 'in_progress' AND processing_started_at <= now() - $1::interval AND attempts < $2) \ OR (status = 'failed' AND attempts < $2) \ RETURNING l1_batch_number, status, attempts", diff --git a/core/lib/dal/src/fri_prover_dal.rs b/core/lib/dal/src/fri_prover_dal.rs index 09fa5ca3b41e..d3cb13644556 100644 --- a/core/lib/dal/src/fri_prover_dal.rs +++ b/core/lib/dal/src/fri_prover_dal.rs @@ -166,6 +166,18 @@ impl FriProverDal<'_, '_> { } } + pub async fn get_prover_job_attempts(&mut self, id: u32) -> sqlx::Result<Option<u32>> { + let attempts = sqlx::query!( + "SELECT attempts FROM prover_jobs_fri WHERE id = $1", + id as i64, + ) + .fetch_optional(self.storage.conn()) + .await?
+ .map(|row| row.attempts as u32); + + Ok(attempts) + } + pub async fn save_proof( &mut self, id: u32, @@ -213,7 +225,7 @@ sqlx::query!( " UPDATE prover_jobs_fri - SET status = 'queued', attempts = attempts + 1, updated_at = now(), processing_started_at = now() + SET status = 'queued', updated_at = now(), processing_started_at = now() WHERE id in ( SELECT id FROM prover_jobs_fri diff --git a/core/lib/dal/src/fri_witness_generator_dal.rs b/core/lib/dal/src/fri_witness_generator_dal.rs index d96f8774f137..c05dd3b3d1a8 100644 --- a/core/lib/dal/src/fri_witness_generator_dal.rs +++ b/core/lib/dal/src/fri_witness_generator_dal.rs @@ -64,7 +64,7 @@ impl FriWitnessGeneratorDal<'_, '_> { picked_by: &str, ) -> Option<L1BatchNumber> { let protocol_versions: Vec<i32> = protocol_versions.iter().map(|&id| id as i32).collect(); - let result: Option<L1BatchNumber> = sqlx::query!( + sqlx::query!( " UPDATE witness_inputs_fri SET status = 'in_progress', attempts = attempts + 1, @@ -90,8 +90,23 @@ impl FriWitnessGeneratorDal<'_, '_> { .fetch_optional(self.storage.conn()) .await .unwrap() - .map(|row| L1BatchNumber(row.l1_batch_number as u32)); - result + .map(|row| L1BatchNumber(row.l1_batch_number as u32)) + } + + pub async fn get_basic_circuit_witness_job_attempts( + &mut self, + l1_batch_number: L1BatchNumber, + ) -> sqlx::Result<Option<u32>> { + let attempts = sqlx::query!( + "SELECT attempts FROM witness_inputs_fri \ + WHERE l1_batch_number = $1", + l1_batch_number.0 as i64, + ) + .fetch_optional(self.storage.conn()) + .await? + .map(|row| row.attempts as u32); + + Ok(attempts) } pub async fn mark_witness_job( @@ -184,7 +199,7 @@ impl FriWitnessGeneratorDal<'_, '_> { sqlx::query!( " UPDATE witness_inputs_fri - SET status = 'queued', attempts = attempts + 1, updated_at = now(), processing_started_at = now() + SET status = 'queued', updated_at = now(), processing_started_at = now() WHERE (status = 'in_progress' AND processing_started_at <= now() - $1::interval AND attempts < $2) OR (status = 'in_gpu_proof' AND processing_started_at <= now() - $1::interval AND attempts < $2) OR (status = 'failed' AND attempts < $2) @@ -325,6 +340,23 @@ }) } + pub async fn get_leaf_aggregation_job_attempts( + &mut self, + id: u32, + ) -> sqlx::Result<Option<u32>> { + let attempts = sqlx::query!( + "SELECT attempts FROM leaf_aggregation_witness_jobs_fri \ + WHERE id = $1", + id as i64, + ) + .fetch_optional(self.storage.conn()) + .await + .unwrap() + .map(|row| row.attempts as u32); + + Ok(attempts) + } + async fn prover_job_ids_for( + &mut self, + block_number: L1BatchNumber, @@ -461,6 +493,23 @@ }) } + pub async fn get_node_aggregation_job_attempts( + &mut self, + id: u32, + ) -> sqlx::Result<Option<u32>> { + let attempts = sqlx::query!( + "SELECT attempts FROM node_aggregation_witness_jobs_fri \ + WHERE id = $1", + id as i64, + ) + .fetch_optional(self.storage.conn()) + .await + .unwrap() + .map(|row| row.attempts as u32); + + Ok(attempts) + } + pub async fn mark_node_aggregation_job_failed(&mut self, error: &str, id: u32) { sqlx::query!( " @@ -583,7 +632,7 @@ sqlx::query!( " UPDATE leaf_aggregation_witness_jobs_fri - SET status = 'queued', attempts = attempts + 1, updated_at = now(), processing_started_at = now() + SET status = 'queued', updated_at = now(), processing_started_at = now() WHERE (status = 'in_progress' AND processing_started_at <= now() - $1::interval AND attempts < $2) OR (status = 'failed' AND attempts < $2) RETURNING id, status, attempts
@@ -608,7 +657,7 @@ impl FriWitnessGeneratorDal<'_, '_> { sqlx::query!( " UPDATE node_aggregation_witness_jobs_fri - SET status = 'queued', attempts = attempts + 1, updated_at = now(), processing_started_at = now() + SET status = 'queued', updated_at = now(), processing_started_at = now() WHERE (status = 'in_progress' AND processing_started_at <= now() - $1::interval AND attempts < $2) OR (status = 'failed' AND attempts < $2) RETURNING id, status, attempts @@ -649,7 +698,7 @@ sqlx::query!( " UPDATE scheduler_witness_jobs_fri - SET status = 'queued', attempts = attempts + 1, updated_at = now(), processing_started_at = now() + SET status = 'queued', updated_at = now(), processing_started_at = now() WHERE (status = 'in_progress' AND processing_started_at <= now() - $1::interval AND attempts < $2) OR (status = 'failed' AND attempts < $2) RETURNING l1_batch_number, status, attempts @@ -671,7 +720,7 @@ picked_by: &str, ) -> Option<L1BatchNumber> { let protocol_versions: Vec<i32> = protocol_versions.iter().map(|&id| id as i32).collect(); - let result: Option<L1BatchNumber> = sqlx::query!( + sqlx::query!( " UPDATE scheduler_witness_jobs_fri SET status = 'in_progress', attempts = attempts + 1, @@ -695,8 +744,23 @@ .fetch_optional(self.storage.conn()) .await .unwrap() - .map(|row| L1BatchNumber(row.l1_batch_number as u32)); - result + .map(|row| L1BatchNumber(row.l1_batch_number as u32)) + } + + pub async fn get_scheduler_witness_job_attempts( + &mut self, + l1_batch_number: L1BatchNumber, + ) -> sqlx::Result<Option<u32>> { + let attempts = sqlx::query!( + "SELECT attempts FROM scheduler_witness_jobs_fri \ + WHERE l1_batch_number = $1", + l1_batch_number.0 as i64, + ) + .fetch_optional(self.storage.conn()) + .await?
+ .map(|row| row.attempts as u32); + + Ok(attempts) } pub async fn mark_scheduler_job_as_successful( diff --git a/core/lib/dal/src/prover_dal.rs b/core/lib/dal/src/prover_dal.rs index f29ff97642f2..d84d0628372b 100644 --- a/core/lib/dal/src/prover_dal.rs +++ b/core/lib/dal/src/prover_dal.rs @@ -246,7 +246,7 @@ impl ProverDal<'_, '_> { sqlx::query!( " UPDATE prover_jobs - SET status = 'queued', attempts = attempts + 1, updated_at = now(), processing_started_at = now() + SET status = 'queued', updated_at = now(), processing_started_at = now() WHERE (status = 'in_progress' AND processing_started_at <= now() - $1::interval AND attempts < $2) OR (status = 'in_gpu_proof' AND processing_started_at <= now() - $1::interval AND attempts < $2) OR (status = 'failed' AND attempts < $2) diff --git a/core/lib/queued_job_processor/Cargo.toml b/core/lib/queued_job_processor/Cargo.toml index 913a6934c0c3..72ff3daa6295 100644 --- a/core/lib/queued_job_processor/Cargo.toml +++ b/core/lib/queued_job_processor/Cargo.toml @@ -17,3 +17,4 @@ tokio = { version = "1", features = ["time"] } tracing = "0.1" zksync_utils = { path = "../../lib/utils" } +vise = { git = "https://github.com/matter-labs/vise.git", version = "0.1.0", rev = "dd05139b76ab0843443ab3ff730174942c825dae" } diff --git a/core/lib/queued_job_processor/src/lib.rs b/core/lib/queued_job_processor/src/lib.rs index 9cff70b4fc0a..281e91927b96 100644 --- a/core/lib/queued_job_processor/src/lib.rs +++ b/core/lib/queued_job_processor/src/lib.rs @@ -9,10 +9,26 @@ use tokio::time::sleep; use zksync_utils::panic_extractor::try_extract_panic_message; +use vise::{Buckets, Counter, Histogram, LabeledFamily, Metrics}; + +const ATTEMPT_BUCKETS: Buckets = Buckets::exponential(1.0..=64.0, 2.0); + +#[derive(Debug, Metrics)] +#[metrics(prefix = "job_processor")] +struct JobProcessorMetrics { + #[metrics(labels = ["service", "job_id"])] + max_attempts_reached: LabeledFamily<(&'static str, String), Counter, 2>, + #[metrics(labels = ["service"], buckets = ATTEMPT_BUCKETS)] + attempts: LabeledFamily<&'static str, Histogram<usize>>, +} + +#[vise::register] +static METRICS: vise::Global<JobProcessorMetrics> = vise::Global::new(); + #[async_trait] pub trait JobProcessor: Sync + Send { type Job: Send + 'static; - type JobId: Send + Debug + 'static; + type JobId: Send + Sync + Debug + 'static; type JobArtifacts: Send + 'static; const POLLING_INTERVAL_MS: u64 = 1000; @@ -94,6 +110,7 @@ started_at: Instant, task: JoinHandle<anyhow::Result<Self::JobArtifacts>>, ) -> anyhow::Result<()> { + let attempts = self.get_job_attempts(&job_id).await?; let result = loop { tracing::trace!( "Polling {} task with id {:?}. Is finished: {}", @@ -113,6 +130,7 @@ Self::SERVICE_NAME, job_id ); + METRICS.attempts[&Self::SERVICE_NAME].observe(attempts as usize); return self .save_result(job_id, started_at, data) .await @@ -127,6 +145,16 @@ job_id, error_message ); + + let max_attempts = self.max_attempts(); + if attempts == max_attempts { + METRICS.max_attempts_reached[&(Self::SERVICE_NAME, format!("{job_id:?}"))].inc(); + tracing::error!( + "Max attempts ({max_attempts}) reached for {} job {:?}", + Self::SERVICE_NAME, + job_id, + ); + } self.save_failure(job_id, started_at, error_message).await; Ok(()) } @@ -138,4 +166,9 @@ started_at: Instant, artifacts: Self::JobArtifacts, ) -> anyhow::Result<()>; + + fn max_attempts(&self) -> u32; + + /// Invoked in `wait_for_task` for an in-progress job.
+ async fn get_job_attempts(&self, job_id: &Self::JobId) -> anyhow::Result<u32>; } diff --git a/core/lib/zksync_core/src/basic_witness_input_producer/mod.rs b/core/lib/zksync_core/src/basic_witness_input_producer/mod.rs index 32f58503d6d4..e4d605d2545d 100644 --- a/core/lib/zksync_core/src/basic_witness_input_producer/mod.rs +++ b/core/lib/zksync_core/src/basic_witness_input_producer/mod.rs @@ -2,7 +2,7 @@ use anyhow::Context; use std::sync::Arc; use std::time::Instant; -use zksync_dal::ConnectionPool; +use zksync_dal::{basic_witness_input_producer_dal::JOB_MAX_ATTEMPT, ConnectionPool}; use zksync_object_store::{ObjectStore, ObjectStoreFactory}; use zksync_queued_job_processor::JobProcessor; use zksync_types::witness_block_state::WitnessBlockState; @@ -203,4 +203,22 @@ METRICS.block_number_processed.set(job_id.0 as i64); Ok(()) } + + fn max_attempts(&self) -> u32 { + JOB_MAX_ATTEMPT as u32 + } + + async fn get_job_attempts(&self, job_id: &L1BatchNumber) -> anyhow::Result<u32> { + let mut connection = self + .connection_pool + .access_storage() + .await + .context("failed to acquire DB connection for BasicWitnessInputProducer")?; + connection + .basic_witness_input_producer_dal() + .get_basic_witness_input_producer_job_attempts(*job_id) + .await + .map(|attempts| attempts.unwrap_or(0)) + .context("failed to get job attempts for BasicWitnessInputProducer") + } } diff --git a/core/lib/zksync_core/src/witness_generator/basic_circuits.rs b/core/lib/zksync_core/src/witness_generator/basic_circuits.rs index de6ede7e14fd..c700d59120a1 100644 --- a/core/lib/zksync_core/src/witness_generator/basic_circuits.rs +++ b/core/lib/zksync_core/src/witness_generator/basic_circuits.rs @@ -238,6 +238,15 @@ impl JobProcessor for BasicWitnessGenerator { } Ok(()) } + + fn max_attempts(&self) -> u32 { + self.config.max_attempts + } + + async fn get_job_attempts(&self, _job_id: &Self::JobId) -> anyhow::Result<u32> { + // Witness generator will be removed soon in favor of FRI one, so returning blank value. + Ok(1) + } } pub async fn process_basic_circuits_job( diff --git a/core/lib/zksync_core/src/witness_generator/leaf_aggregation.rs b/core/lib/zksync_core/src/witness_generator/leaf_aggregation.rs index bd8c15acb18b..4c9201b65f68 100644 --- a/core/lib/zksync_core/src/witness_generator/leaf_aggregation.rs +++ b/core/lib/zksync_core/src/witness_generator/leaf_aggregation.rs @@ -169,6 +169,15 @@ impl JobProcessor for LeafAggregationWitnessGenerator { .await; Ok(()) } + + fn max_attempts(&self) -> u32 { + self.config.max_attempts + } + + async fn get_job_attempts(&self, _job_id: &Self::JobId) -> anyhow::Result<u32> { + // Witness generator will be removed soon in favor of FRI one, so returning blank value. + Ok(1) + } } pub fn process_leaf_aggregation_job( diff --git a/core/lib/zksync_core/src/witness_generator/node_aggregation.rs b/core/lib/zksync_core/src/witness_generator/node_aggregation.rs index 349ab56b1556..6d884563c9d4 100644 --- a/core/lib/zksync_core/src/witness_generator/node_aggregation.rs +++ b/core/lib/zksync_core/src/witness_generator/node_aggregation.rs @@ -174,6 +174,15 @@ impl JobProcessor for NodeAggregationWitnessGenerator { update_database(&self.prover_connection_pool, started_at, job_id, blob_urls).await; Ok(()) } + + fn max_attempts(&self) -> u32 { + self.config.max_attempts + } + + async fn get_job_attempts(&self, _job_id: &Self::JobId) -> anyhow::Result<u32> { + // Witness generator will be removed soon in favor of FRI one, so returning blank value.
+ Ok(1) + } } pub fn process_node_aggregation_job( diff --git a/core/lib/zksync_core/src/witness_generator/scheduler.rs b/core/lib/zksync_core/src/witness_generator/scheduler.rs index c38a70eff3e9..ae8c2daff732 100644 --- a/core/lib/zksync_core/src/witness_generator/scheduler.rs +++ b/core/lib/zksync_core/src/witness_generator/scheduler.rs @@ -179,6 +179,15 @@ impl JobProcessor for SchedulerWitnessGenerator { .await; Ok(()) } + + fn max_attempts(&self) -> u32 { + self.config.max_attempts + } + + async fn get_job_attempts(&self, _job_id: &Self::JobId) -> anyhow::Result<u32> { + // Witness generator will be removed soon in favor of FRI one, so returning blank value. + Ok(1) + } } pub fn process_scheduler_job( diff --git a/prover/Cargo.lock b/prover/Cargo.lock index 9652ee98a5c1..0af717daff5c 100644 --- a/prover/Cargo.lock +++ b/prover/Cargo.lock @@ -7037,6 +7037,7 @@ dependencies = [ "async-trait", "tokio", "tracing", + "vise", "zksync_utils", ] diff --git a/prover/circuit_synthesizer/src/circuit_synthesizer.rs b/prover/circuit_synthesizer/src/circuit_synthesizer.rs index ce1a5251d53c..1d68138cc608 100644 --- a/prover/circuit_synthesizer/src/circuit_synthesizer.rs +++ b/prover/circuit_synthesizer/src/circuit_synthesizer.rs @@ -272,6 +272,15 @@ impl JobProcessor for CircuitSynthesizer { ); Ok(()) } + + fn max_attempts(&self) -> u32 { + self.config.max_attempts + } + + async fn get_job_attempts(&self, _job_id: &u32) -> anyhow::Result<u32> { + // Circuit synthesizer will be removed soon in favor of FRI one, so returning blank value. + Ok(1) + } } async fn handle_send_result( diff --git a/prover/proof_fri_compressor/src/compressor.rs b/prover/proof_fri_compressor/src/compressor.rs index df9faaff257a..9da71c83eac4 100644 --- a/prover/proof_fri_compressor/src/compressor.rs +++ b/prover/proof_fri_compressor/src/compressor.rs @@ -29,6 +29,7 @@ pub struct ProofCompressor { pool: ConnectionPool, compression_mode: u8, verify_wrapper_proof: bool, + max_attempts: u32, } impl ProofCompressor { @@ -37,12 +38,14 @@ pool: ConnectionPool, compression_mode: u8, verify_wrapper_proof: bool, + max_attempts: u32, ) -> Self { Self { blob_store, pool, compression_mode, verify_wrapper_proof, + max_attempts, } } @@ -202,4 +205,22 @@ .await; Ok(()) } + + fn max_attempts(&self) -> u32 { + self.max_attempts + } + + async fn get_job_attempts(&self, job_id: &L1BatchNumber) -> anyhow::Result<u32> { + let mut prover_storage = self + .pool + .access_storage() + .await + .context("failed to acquire DB connection for ProofCompressor")?; + prover_storage + .fri_proof_compressor_dal() + .get_proof_compression_job_attempts(*job_id) + .await + .map(|attempts| attempts.unwrap_or(0)) + .context("failed to get job attempts for ProofCompressor") + } } diff --git a/prover/proof_fri_compressor/src/main.rs b/prover/proof_fri_compressor/src/main.rs index a805bb3e3216..f6a9c201f8f9 100644 --- a/prover/proof_fri_compressor/src/main.rs +++ b/prover/proof_fri_compressor/src/main.rs @@ -63,6 +63,7 @@ async fn main() -> anyhow::Result<()> { pool, config.compression_mode, config.verify_wrapper_proof, + config.max_attempts, ); let (stop_sender, stop_receiver) = watch::channel(false); diff --git a/prover/prover_fri/src/gpu_prover_job_processor.rs b/prover/prover_fri/src/gpu_prover_job_processor.rs index 1919d5888214..c624c040ce3f 100644 --- a/prover/prover_fri/src/gpu_prover_job_processor.rs +++ b/prover/prover_fri/src/gpu_prover_job_processor.rs @@ -274,6 +274,24 @@ pub mod gpu_prover { .await;
Ok(()) } + + fn max_attempts(&self) -> u32 { + self.config.max_attempts + } + + async fn get_job_attempts(&self, job_id: &u32) -> anyhow::Result<u32> { + let mut prover_storage = self + .prover_connection_pool + .access_storage() + .await + .context("failed to acquire DB connection for Prover")?; + prover_storage + .fri_prover_jobs_dal() + .get_prover_job_attempts(*job_id) + .await + .map(|attempts| attempts.unwrap_or(0)) + .context("failed to get job attempts for Prover") + } } pub fn load_setup_data_cache(config: &FriProverConfig) -> anyhow::Result<SetupLoadMode> { diff --git a/prover/prover_fri/src/prover_job_processor.rs b/prover/prover_fri/src/prover_job_processor.rs index bc53465af91f..d540771fd14f 100644 --- a/prover/prover_fri/src/prover_job_processor.rs +++ b/prover/prover_fri/src/prover_job_processor.rs @@ -259,6 +259,24 @@ impl JobProcessor for Prover { .await; Ok(()) } + + fn max_attempts(&self) -> u32 { + self.config.max_attempts + } + + async fn get_job_attempts(&self, job_id: &u32) -> anyhow::Result<u32> { + let mut prover_storage = self + .prover_connection_pool + .access_storage() + .await + .context("failed to acquire DB connection for Prover")?; + prover_storage + .fri_prover_jobs_dal() + .get_prover_job_attempts(*job_id) + .await + .map(|attempts| attempts.unwrap_or(0)) + .context("failed to get job attempts for Prover") + } } #[allow(dead_code)] diff --git a/prover/witness_generator/src/basic_circuits.rs b/prover/witness_generator/src/basic_circuits.rs index 04692dbced19..645e36722745 100644 --- a/prover/witness_generator/src/basic_circuits.rs +++ b/prover/witness_generator/src/basic_circuits.rs @@ -6,6 +6,7 @@ use std::{ time::Instant, }; +use anyhow::Context as _; use async_trait::async_trait; use zksync_prover_fri_types::circuit_definitions::ZkSyncDefaultRoundFunction; use rand::Rng; @@ -267,6 +268,24 @@ impl JobProcessor for BasicWitnessGenerator { } } } + + fn max_attempts(&self) -> u32 { + self.config.max_attempts + } + + async fn get_job_attempts(&self, job_id: &L1BatchNumber) -> anyhow::Result<u32> { + let mut prover_storage = self + .prover_connection_pool + .access_storage() + .await + .context("failed to acquire DB connection for BasicWitnessGenerator")?; + prover_storage + .fri_witness_generator_dal() + .get_basic_circuit_witness_job_attempts(*job_id) + .await + .map(|attempts| attempts.unwrap_or(0)) + .context("failed to get job attempts for BasicWitnessGenerator") + } } async fn process_basic_circuits_job( diff --git a/prover/witness_generator/src/leaf_aggregation.rs b/prover/witness_generator/src/leaf_aggregation.rs index a9a6741755ee..084a7bd2d4a7 100644 --- a/prover/witness_generator/src/leaf_aggregation.rs +++ b/prover/witness_generator/src/leaf_aggregation.rs @@ -60,7 +60,6 @@ pub struct LeafAggregationWitnessGeneratorJob { #[derive(Debug)] pub struct LeafAggregationWitnessGenerator { - #[allow(dead_code)] config: FriWitnessGeneratorConfig, object_store: Box<dyn ObjectStore>, prover_connection_pool: ConnectionPool, @@ -162,6 +161,24 @@ impl JobProcessor for LeafAggregationWitnessGenerator { .await; Ok(()) } + + fn max_attempts(&self) -> u32 { + self.config.max_attempts + } + + async fn get_job_attempts(&self, job_id: &u32) -> anyhow::Result<u32> { + let mut prover_storage = self + .prover_connection_pool + .access_storage() + .await + .context("failed to acquire DB connection for LeafAggregationWitnessGenerator")?; + prover_storage + .fri_witness_generator_dal() + .get_leaf_aggregation_job_attempts(*job_id) + .await + .map(|attempts| attempts.unwrap_or(0)) + .context("failed to get job attempts for
LeafAggregationWitnessGenerator") + } } pub async fn prepare_leaf_aggregation_job( diff --git a/prover/witness_generator/src/main.rs b/prover/witness_generator/src/main.rs index c662b14405ce..1e6d82d96182 100644 --- a/prover/witness_generator/src/main.rs +++ b/prover/witness_generator/src/main.rs @@ -162,6 +162,7 @@ async fn main() -> anyhow::Result<()> { } AggregationRound::NodeAggregation => { let generator = NodeAggregationWitnessGenerator::new( + config, &store_factory, prover_connection_pool, protocol_versions.clone(), @@ -171,6 +172,7 @@ async fn main() -> anyhow::Result<()> { } AggregationRound::Scheduler => { let generator = SchedulerWitnessGenerator::new( + config, &store_factory, prover_connection_pool, protocol_versions, diff --git a/prover/witness_generator/src/node_aggregation.rs b/prover/witness_generator/src/node_aggregation.rs index 7dd65a8a9c31..8349b1e18e9d 100644 --- a/prover/witness_generator/src/node_aggregation.rs +++ b/prover/witness_generator/src/node_aggregation.rs @@ -20,6 +20,7 @@ use crate::utils::{ load_proofs_for_job_ids, save_node_aggregations_artifacts, save_recursive_layer_prover_input_artifacts, AggregationWrapper, }; +use zksync_config::configs::FriWitnessGeneratorConfig; use zksync_dal::ConnectionPool; use zksync_object_store::{AggregationsKey, ObjectStore, ObjectStoreFactory}; use zksync_prover_fri_types::{get_current_pod_name, FriProofWrapper}; @@ -63,6 +64,7 @@ pub struct NodeAggregationWitnessGeneratorJob { #[derive(Debug)] pub struct NodeAggregationWitnessGenerator { + config: FriWitnessGeneratorConfig, object_store: Box<dyn ObjectStore>, prover_connection_pool: ConnectionPool, protocol_versions: Vec<FriProtocolVersionId>, } impl NodeAggregationWitnessGenerator { @@ -70,11 +72,13 @@ pub async fn new( + config: FriWitnessGeneratorConfig, store_factory: &ObjectStoreFactory, prover_connection_pool: ConnectionPool, protocol_versions: Vec<FriProtocolVersionId>, ) -> Self { Self { + config, object_store: store_factory.create_store().await, prover_connection_pool, protocol_versions, } } @@ -197,6 +201,24 @@ impl JobProcessor for NodeAggregationWitnessGenerator { .await; Ok(()) } + + fn max_attempts(&self) -> u32 { + self.config.max_attempts + } + + async fn get_job_attempts(&self, job_id: &u32) -> anyhow::Result<u32> { + let mut prover_storage = self + .prover_connection_pool + .access_storage() + .await + .context("failed to acquire DB connection for NodeAggregationWitnessGenerator")?; + prover_storage + .fri_witness_generator_dal() + .get_node_aggregation_job_attempts(*job_id) + .await + .map(|attempts| attempts.unwrap_or(0)) + .context("failed to get job attempts for NodeAggregationWitnessGenerator") + } } pub async fn prepare_job( diff --git a/prover/witness_generator/src/scheduler.rs b/prover/witness_generator/src/scheduler.rs index 50d579744f6f..50950c8e986e 100644 --- a/prover/witness_generator/src/scheduler.rs +++ b/prover/witness_generator/src/scheduler.rs @@ -17,6 +17,7 @@ use zksync_vk_setup_data_server_fri::get_recursive_layer_vk_for_circuit_type; use zksync_vk_setup_data_server_fri::utils::get_leaf_vk_params; use crate::utils::{load_proofs_for_job_ids, SchedulerPartialInputWrapper}; +use zksync_config::configs::FriWitnessGeneratorConfig; use zksync_dal::ConnectionPool; use zksync_object_store::{FriCircuitKey, ObjectStore, ObjectStoreFactory}; use zksync_prover_fri_types::{get_current_pod_name, CircuitWrapper, FriProofWrapper}; @@ -42,6 +43,7 @@ pub struct SchedulerWitnessGeneratorJob { #[derive(Debug)] pub struct SchedulerWitnessGenerator { + config:
FriWitnessGeneratorConfig, object_store: Box<dyn ObjectStore>, prover_connection_pool: ConnectionPool, protocol_versions: Vec<FriProtocolVersionId>, } impl SchedulerWitnessGenerator { @@ -49,11 +51,13 @@ pub async fn new( + config: FriWitnessGeneratorConfig, store_factory: &ObjectStoreFactory, prover_connection_pool: ConnectionPool, protocol_versions: Vec<FriProtocolVersionId>, ) -> Self { Self { + config, object_store: store_factory.create_store().await, prover_connection_pool, protocol_versions, } } @@ -203,6 +207,24 @@ impl JobProcessor for SchedulerWitnessGenerator { transaction.commit().await.unwrap(); Ok(()) } + + fn max_attempts(&self) -> u32 { + self.config.max_attempts + } + + async fn get_job_attempts(&self, job_id: &L1BatchNumber) -> anyhow::Result<u32> { + let mut prover_storage = self + .prover_connection_pool + .access_storage() + .await + .context("failed to acquire DB connection for SchedulerWitnessGenerator")?; + prover_storage + .fri_witness_generator_dal() + .get_scheduler_witness_job_attempts(*job_id) + .await + .map(|attempts| attempts.unwrap_or(0)) + .context("failed to get job attempts for SchedulerWitnessGenerator") + } } pub async fn prepare_job( diff --git a/prover/witness_vector_generator/src/generator.rs b/prover/witness_vector_generator/src/generator.rs index 9851802a54b2..27e71b6cdec4 100644 --- a/prover/witness_vector_generator/src/generator.rs +++ b/prover/witness_vector_generator/src/generator.rs @@ -26,6 +26,7 @@ pub struct WitnessVectorGenerator { zone: String, config: FriWitnessVectorGeneratorConfig, vk_commitments: L1VerifierConfig, + max_attempts: u32, } impl WitnessVectorGenerator { @@ -36,6 +37,7 @@ zone: String, config: FriWitnessVectorGeneratorConfig, vk_commitments: L1VerifierConfig, + max_attempts: u32, ) -> Self { Self { blob_store, @@ -44,6 +46,7 @@ zone, config, vk_commitments, + max_attempts, } } @@ -166,6 +169,24 @@ impl JobProcessor for WitnessVectorGenerator { ); Ok(()) } + + fn max_attempts(&self) -> u32 { + self.max_attempts + } + + async fn get_job_attempts(&self, job_id: &u32) -> anyhow::Result<u32> { + let mut prover_storage = self + .pool + .access_storage() + .await + .context("failed to acquire DB connection for WitnessVectorGenerator")?; + prover_storage + .fri_prover_jobs_dal() + .get_prover_job_attempts(*job_id) + .await + .map(|attempts| attempts.unwrap_or(0)) + .context("failed to get job attempts for WitnessVectorGenerator") + } } async fn handle_send_result( diff --git a/prover/witness_vector_generator/src/main.rs b/prover/witness_vector_generator/src/main.rs index 16eeb660da38..bc6601d36e22 100644 --- a/prover/witness_vector_generator/src/main.rs +++ b/prover/witness_vector_generator/src/main.rs @@ -7,7 +7,7 @@ use tokio::{sync::oneshot, sync::watch}; use crate::generator::WitnessVectorGenerator; use zksync_config::configs::fri_prover_group::FriProverGroupConfig; -use zksync_config::configs::{FriWitnessVectorGeneratorConfig, ProverGroupConfig}; +use zksync_config::configs::{FriProverConfig, FriWitnessVectorGeneratorConfig, ProverGroupConfig}; use zksync_dal::connection::DbVariant; use zksync_dal::ConnectionPool; use zksync_env_config::{object_store::ProverObjectStoreConfig, FromEnv}; @@ -74,6 +74,7 @@ ProverGroupConfig::from_env().context("ProverGroupConfig::from_env()")?; let zone = get_zone(&prover_group_config).await.context("get_zone()")?; let vk_commitments = get_cached_commitments(); + let fri_prover_config =
FriProverConfig::from_env().context("FriProverConfig::from_env()")?; let witness_vector_generator = WitnessVectorGenerator::new( blob_store, pool, @@ -81,6 +82,7 @@ async fn main() -> anyhow::Result<()> { zone.clone(), config, vk_commitments, + fri_prover_config.max_attempts, ); let (stop_sender, stop_receiver) = watch::channel(false);
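
---

Usage sketch (illustrative, not part of the patch): the snippet below reproduces the new `vise` metrics from `queued_job_processor/src/lib.rs` in isolation, to show what gets reported on the success and failure paths of `wait_for_task`. It assumes the same `vise` git revision added to `queued_job_processor/Cargo.toml`; the `SERVICE_NAME` string and the job numbers are hypothetical placeholders, and a real component reports DB-backed attempt counts via `get_job_attempts` as in the diff above.

```rust
use vise::{Buckets, Counter, Histogram, LabeledFamily, Metrics};

// Same bucket layout as the patch: powers of two from 1 to 64.
const ATTEMPT_BUCKETS: Buckets = Buckets::exponential(1.0..=64.0, 2.0);

#[derive(Debug, Metrics)]
#[metrics(prefix = "job_processor")]
struct JobProcessorMetrics {
    // Incremented when a job fails with its retry budget exhausted;
    // this is the metric the (future) alert rules key on.
    #[metrics(labels = ["service", "job_id"])]
    max_attempts_reached: LabeledFamily<(&'static str, String), Counter, 2>,
    // Observed once per finished job; feeds the Grafana heatmaps.
    #[metrics(labels = ["service"], buckets = ATTEMPT_BUCKETS)]
    attempts: LabeledFamily<&'static str, Histogram<usize>>,
}

#[vise::register]
static METRICS: vise::Global<JobProcessorMetrics> = vise::Global::new();

fn main() {
    const SERVICE_NAME: &str = "demo_processor"; // hypothetical component name
    let (job_id, attempts, max_attempts) = (42_u32, 3_u32, 10_u32); // hypothetical values

    // Success path: record how many attempts this job took.
    METRICS.attempts[&SERVICE_NAME].observe(attempts as usize);

    // Failure path: flag jobs that have used up their whole retry budget.
    if attempts == max_attempts {
        METRICS.max_attempts_reached[&(SERVICE_NAME, format!("{job_id}"))].inc();
    }
}
```

The exponential buckets resolve low retry counts finely while still bounding pathological retry loops, and `max_attempts_reached` carries the job id as a label so an alert can point directly at the stuck job.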