Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(basic_witness_input_producer): Witness inputs queued after BWIP run #345

Merged
merged 2 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 27 additions & 15 deletions core/lib/dal/sqlx-data.json
Original file line number Diff line number Diff line change
Expand Up @@ -244,21 +244,6 @@
},
"query": "\n WITH events_select AS (\n SELECT\n address, topic1, topic2, topic3, topic4, value,\n miniblock_number, tx_hash, tx_index_in_block,\n event_index_in_block, event_index_in_tx\n FROM events\n WHERE miniblock_number > $1\n ORDER BY miniblock_number ASC, event_index_in_block ASC\n )\n SELECT miniblocks.hash as \"block_hash?\",\n address as \"address!\", topic1 as \"topic1!\", topic2 as \"topic2!\", topic3 as \"topic3!\", topic4 as \"topic4!\", value as \"value!\",\n miniblock_number as \"miniblock_number!\", miniblocks.l1_batch_number as \"l1_batch_number?\", tx_hash as \"tx_hash!\",\n tx_index_in_block as \"tx_index_in_block!\", event_index_in_block as \"event_index_in_block!\", event_index_in_tx as \"event_index_in_tx!\"\n FROM events_select\n INNER JOIN miniblocks ON events_select.miniblock_number = miniblocks.number\n ORDER BY miniblock_number ASC, event_index_in_block ASC\n "
},
"073582051133075adfc51a18d15639129dd00628aa4994b602843ac979ad4419": {
"describe": {
"columns": [],
"nullable": [],
"parameters": {
"Left": [
"Int8",
"Bytea",
"Text",
"Int4"
]
}
},
"query": "INSERT INTO witness_inputs(l1_batch_number, merkle_tree_paths, merkel_tree_paths_blob_url, status, protocol_version, created_at, updated_at) VALUES ($1, $2, $3, 'queued', $4, now(), now())\n ON CONFLICT (l1_batch_number) DO NOTHING"
},
"073d304fe756940303f00b514ef1e24036a1d3d3c3c7fb204b484f681a3520d7": {
"describe": {
"columns": [],
Expand Down Expand Up @@ -5437,6 +5422,18 @@
},
"query": "INSERT INTO commitments (l1_batch_number, events_queue_commitment, bootloader_initial_content_commitment) VALUES ($1, $2, $3) ON CONFLICT (l1_batch_number) DO UPDATE SET events_queue_commitment = $2, bootloader_initial_content_commitment = $3"
},
"694f1d154f3f38b123d8f845fef6e876d35dc3743f1c5b69dce6be694e5e726c": {
"describe": {
"columns": [],
"nullable": [],
"parameters": {
"Left": [
"Int8"
]
}
},
"query": "UPDATE witness_inputs SET status='queued' WHERE l1_batch_number=$1 AND status='waiting_for_artifacts'"
},
"697835cdd5be1b99a0f332c4c8f3245e317b0282b46e55f15e728a7642382b25": {
"describe": {
"columns": [
Expand Down Expand Up @@ -7272,6 +7269,21 @@
},
"query": "SELECT number, timestamp, is_finished, l1_tx_count, l2_tx_count, fee_account_address, bloom, priority_ops_onchain_data, hash, parent_hash, commitment, compressed_write_logs, compressed_contracts, eth_prove_tx_id, eth_commit_tx_id, eth_execute_tx_id, merkle_root_hash, l2_to_l1_logs, l2_to_l1_messages, used_contract_hashes, compressed_initial_writes, compressed_repeated_writes, l2_l1_compressed_messages, l2_l1_merkle_root, l1_gas_price, l2_fair_gas_price, rollup_last_leaf_index, zkporter_is_available, bootloader_code_hash, default_aa_code_hash, base_fee_per_gas, aux_data_hash, pass_through_data_hash, meta_parameters_hash, protocol_version, compressed_state_diffs, system_logs, events_queue_commitment, bootloader_initial_content_commitment FROM l1_batches LEFT JOIN commitments ON commitments.l1_batch_number = l1_batches.number WHERE eth_prove_tx_id IS NOT NULL AND eth_execute_tx_id IS NULL ORDER BY number LIMIT $1"
},
"8ff9d76b4791af1177231661847b6c8879ad625fd11c15de51a16c81d8712129": {
"describe": {
"columns": [],
"nullable": [],
"parameters": {
"Left": [
"Int8",
"Bytea",
"Text",
"Int4"
]
}
},
"query": "INSERT INTO witness_inputs(l1_batch_number, merkle_tree_paths, merkel_tree_paths_blob_url, status, protocol_version, created_at, updated_at) VALUES ($1, $2, $3, 'waiting_for_artifacts', $4, now(), now()) ON CONFLICT (l1_batch_number) DO NOTHING"
},
"9008367aad7877f269b765c4d0772d0f60689fcde6987c620fe5749a259a8db7": {
"describe": {
"columns": [
Expand Down
15 changes: 14 additions & 1 deletion core/lib/dal/src/witness_generator_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,7 @@ impl WitnessGeneratorDal<'_, '_> {
{
sqlx::query!(
"INSERT INTO witness_inputs(l1_batch_number, merkle_tree_paths, merkel_tree_paths_blob_url, status, protocol_version, created_at, updated_at) \
VALUES ($1, $2, $3, 'queued', $4, now(), now())
VALUES ($1, $2, $3, 'waiting_for_artifacts', $4, now(), now()) \
EmilLuta marked this conversation as resolved.
Show resolved Hide resolved
ON CONFLICT (l1_batch_number) DO NOTHING",
block_number.0 as i64,
// TODO(SMA-1476): remove the below column once blob is migrated to GCS.
Expand All @@ -742,6 +742,19 @@ impl WitnessGeneratorDal<'_, '_> {
}
}

pub async fn mark_witness_inputs_job_as_queued(&mut self, block_number: L1BatchNumber) {
sqlx::query!(
"UPDATE witness_inputs \
SET status='queued' \
WHERE l1_batch_number=$1 \
AND status='waiting_for_artifacts'",
block_number.0 as i64,
)
.execute(self.storage.conn())
.await
.unwrap();
}

pub async fn get_basic_circuit_and_circuit_inputs_blob_urls_to_be_cleaned(
&mut self,
limit: u8,
Expand Down
14 changes: 13 additions & 1 deletion core/lib/zksync_core/src/basic_witness_input_producer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,23 @@ impl JobProcessor for BasicWitnessInputProducer {
.access_storage()
.await
.context("failed to acquire DB connection for BasicWitnessInputProducer")?;
connection
let mut transaction = connection
.start_transaction()
.await
.context("failed to acquire DB transaction for BasicWitnessInputProducer")?;
transaction
.basic_witness_input_producer_dal()
.mark_job_as_successful(job_id, started_at, &object_path)
.await
.context("failed to mark job as successful for BasicWitnessInputProducer")?;
transaction
.witness_generator_dal()
.mark_witness_inputs_job_as_queued(job_id)
.await;
transaction
.commit()
.await
.context("failed to commit DB transaction for BasicWitnessInputProducer")?;
METRICS.block_number_processed.set(job_id.0 as i64);
Ok(())
}
Expand Down