From dd2fb03cc3165c6c919cc2bcaa258e566b06854e Mon Sep 17 00:00:00 2001 From: Emil Date: Sat, 28 Oct 2023 17:56:58 +0200 Subject: [PATCH] feat(basic_witness_input_producer): Witness inputs queued after BWIP run --- core/lib/dal/sqlx-data.json | 42 ++++++++++++------- core/lib/dal/src/witness_generator_dal.rs | 15 ++++++- .../src/basic_witness_input_producer/mod.rs | 14 ++++++- 3 files changed, 54 insertions(+), 17 deletions(-) diff --git a/core/lib/dal/sqlx-data.json b/core/lib/dal/sqlx-data.json index 7fc1026a3d5a..d1a6b77a9142 100644 --- a/core/lib/dal/sqlx-data.json +++ b/core/lib/dal/sqlx-data.json @@ -244,21 +244,6 @@ }, "query": "\n WITH events_select AS (\n SELECT\n address, topic1, topic2, topic3, topic4, value,\n miniblock_number, tx_hash, tx_index_in_block,\n event_index_in_block, event_index_in_tx\n FROM events\n WHERE miniblock_number > $1\n ORDER BY miniblock_number ASC, event_index_in_block ASC\n )\n SELECT miniblocks.hash as \"block_hash?\",\n address as \"address!\", topic1 as \"topic1!\", topic2 as \"topic2!\", topic3 as \"topic3!\", topic4 as \"topic4!\", value as \"value!\",\n miniblock_number as \"miniblock_number!\", miniblocks.l1_batch_number as \"l1_batch_number?\", tx_hash as \"tx_hash!\",\n tx_index_in_block as \"tx_index_in_block!\", event_index_in_block as \"event_index_in_block!\", event_index_in_tx as \"event_index_in_tx!\"\n FROM events_select\n INNER JOIN miniblocks ON events_select.miniblock_number = miniblocks.number\n ORDER BY miniblock_number ASC, event_index_in_block ASC\n " }, - "073582051133075adfc51a18d15639129dd00628aa4994b602843ac979ad4419": { - "describe": { - "columns": [], - "nullable": [], - "parameters": { - "Left": [ - "Int8", - "Bytea", - "Text", - "Int4" - ] - } - }, - "query": "INSERT INTO witness_inputs(l1_batch_number, merkle_tree_paths, merkel_tree_paths_blob_url, status, protocol_version, created_at, updated_at) VALUES ($1, $2, $3, 'queued', $4, now(), now())\n ON CONFLICT (l1_batch_number) DO NOTHING" - }, "073d304fe756940303f00b514ef1e24036a1d3d3c3c7fb204b484f681a3520d7": { "describe": { "columns": [], @@ -5153,6 +5138,18 @@ }, "query": "INSERT INTO commitments (l1_batch_number, events_queue_commitment, bootloader_initial_content_commitment) VALUES ($1, $2, $3) ON CONFLICT (l1_batch_number) DO UPDATE SET events_queue_commitment = $2, bootloader_initial_content_commitment = $3" }, + "694f1d154f3f38b123d8f845fef6e876d35dc3743f1c5b69dce6be694e5e726c": { + "describe": { + "columns": [], + "nullable": [], + "parameters": { + "Left": [ + "Int8" + ] + } + }, + "query": "UPDATE witness_inputs SET status='queued' WHERE l1_batch_number=$1 AND status='waiting_for_artifacts'" + }, "697835cdd5be1b99a0f332c4c8f3245e317b0282b46e55f15e728a7642382b25": { "describe": { "columns": [ @@ -7021,6 +7018,21 @@ }, "query": "SELECT number, timestamp, is_finished, l1_tx_count, l2_tx_count, fee_account_address, bloom, priority_ops_onchain_data, hash, parent_hash, commitment, compressed_write_logs, compressed_contracts, eth_prove_tx_id, eth_commit_tx_id, eth_execute_tx_id, merkle_root_hash, l2_to_l1_logs, l2_to_l1_messages, used_contract_hashes, compressed_initial_writes, compressed_repeated_writes, l2_l1_compressed_messages, l2_l1_merkle_root, l1_gas_price, l2_fair_gas_price, rollup_last_leaf_index, zkporter_is_available, bootloader_code_hash, default_aa_code_hash, base_fee_per_gas, aux_data_hash, pass_through_data_hash, meta_parameters_hash, protocol_version, compressed_state_diffs, system_logs, events_queue_commitment, bootloader_initial_content_commitment FROM l1_batches LEFT JOIN commitments ON commitments.l1_batch_number = l1_batches.number WHERE eth_prove_tx_id IS NOT NULL AND eth_execute_tx_id IS NULL ORDER BY number LIMIT $1" }, + "8ff9d76b4791af1177231661847b6c8879ad625fd11c15de51a16c81d8712129": { + "describe": { + "columns": [], + "nullable": [], + "parameters": { + "Left": [ + "Int8", + "Bytea", + "Text", + "Int4" + ] + } + }, + "query": "INSERT INTO witness_inputs(l1_batch_number, merkle_tree_paths, merkel_tree_paths_blob_url, status, protocol_version, created_at, updated_at) VALUES ($1, $2, $3, 'waiting_for_artifacts', $4, now(), now()) ON CONFLICT (l1_batch_number) DO NOTHING" + }, "9008367aad7877f269b765c4d0772d0f60689fcde6987c620fe5749a259a8db7": { "describe": { "columns": [ diff --git a/core/lib/dal/src/witness_generator_dal.rs b/core/lib/dal/src/witness_generator_dal.rs index 071412eb9812..6f926e94f952 100644 --- a/core/lib/dal/src/witness_generator_dal.rs +++ b/core/lib/dal/src/witness_generator_dal.rs @@ -728,7 +728,7 @@ impl WitnessGeneratorDal<'_, '_> { { sqlx::query!( "INSERT INTO witness_inputs(l1_batch_number, merkle_tree_paths, merkel_tree_paths_blob_url, status, protocol_version, created_at, updated_at) \ - VALUES ($1, $2, $3, 'queued', $4, now(), now()) + VALUES ($1, $2, $3, 'waiting_for_artifacts', $4, now(), now()) \ ON CONFLICT (l1_batch_number) DO NOTHING", block_number.0 as i64, // TODO(SMA-1476): remove the below column once blob is migrated to GCS. @@ -742,6 +742,19 @@ impl WitnessGeneratorDal<'_, '_> { } } + pub async fn mark_witness_inputs_job_as_queued(&mut self, block_number: L1BatchNumber) { + sqlx::query!( + "UPDATE witness_inputs \ + SET status='queued' \ + WHERE l1_batch_number=$1 \ + AND status='waiting_for_artifacts'", + block_number.0 as i64, + ) + .execute(self.storage.conn()) + .await + .unwrap(); + } + pub async fn get_basic_circuit_and_circuit_inputs_blob_urls_to_be_cleaned( &mut self, limit: u8, diff --git a/core/lib/zksync_core/src/basic_witness_input_producer/mod.rs b/core/lib/zksync_core/src/basic_witness_input_producer/mod.rs index 233c5e93ba31..7f243fd6654a 100644 --- a/core/lib/zksync_core/src/basic_witness_input_producer/mod.rs +++ b/core/lib/zksync_core/src/basic_witness_input_producer/mod.rs @@ -183,11 +183,23 @@ impl JobProcessor for BasicWitnessInputProducer { .access_storage() .await .context("failed to acquire DB connection for BasicWitnessInputProducer")?; - connection + let mut transaction = connection + .start_transaction() + .await + .context("failed to acquire DB transaction for BasicWitnessInputProducer")?; + transaction .basic_witness_input_producer_dal() .mark_job_as_successful(job_id, started_at, &object_path) .await .context("failed to mark job as successful for BasicWitnessInputProducer")?; + transaction + .witness_generator_dal() + .mark_witness_inputs_job_as_queued(job_id) + .await; + transaction + .commit() + .await + .context("failed to commit DB transaction for BasicWitnessInputProducer")?; METRICS.block_number_processed.set(job_id.0 as i64); Ok(()) }