diff --git a/core/lib/dal/src/proof_generation_dal.rs b/core/lib/dal/src/proof_generation_dal.rs index 040b4246604f..88300cf08a18 100644 --- a/core/lib/dal/src/proof_generation_dal.rs +++ b/core/lib/dal/src/proof_generation_dal.rs @@ -3,7 +3,9 @@ use std::time::Duration; use strum::{Display, EnumString}; use zksync_db_connection::{ - connection::Connection, error::DalResult, instrument::Instrumented, + connection::Connection, + error::DalResult, + instrument::{InstrumentExt, Instrumented}, utils::pg_interval_from_duration, }; use zksync_types::L1BatchNumber; @@ -110,13 +112,13 @@ impl ProofGenerationDal<'_, '_> { Ok(()) } + /// The caller should ensure that `l1_batch_number` exists in the database. pub async fn insert_proof_generation_details( &mut self, - block_number: L1BatchNumber, + l1_batch_number: L1BatchNumber, proof_gen_data_blob_url: &str, ) -> DalResult<()> { - let l1_batch_number = i64::from(block_number.0); - let query = sqlx::query!( + let result = sqlx::query!( r#" INSERT INTO proof_generation_details (l1_batch_number, status, proof_gen_data_blob_url, created_at, updated_at) @@ -124,25 +126,22 @@ impl ProofGenerationDal<'_, '_> { ($1, 'ready_to_be_proven', $2, NOW(), NOW()) ON CONFLICT (l1_batch_number) DO NOTHING "#, - l1_batch_number, + i64::from(l1_batch_number.0), proof_gen_data_blob_url, - ); - let instrumentation = Instrumented::new("insert_proof_generation_details") - .with_arg("l1_batch_number", &l1_batch_number) - .with_arg("proof_gen_data_blob_url", &proof_gen_data_blob_url); - let result = instrumentation - .clone() - .with(query) - .execute(self.storage) - .await?; + ) + .instrument("insert_proof_generation_details") + .with_arg("l1_batch_number", &l1_batch_number) + .with_arg("proof_gen_data_blob_url", &proof_gen_data_blob_url) + .report_latency() + .execute(self.storage) + .await?; + if result.rows_affected() == 0 { - let err = instrumentation.constraint_error(anyhow::anyhow!( - "Cannot save proof_blob_url for a batch number {} that does not exist", - l1_batch_number - )); - return Err(err); + // Not an error: we may call `insert_proof_generation_details()` from multiple full trees instantiated + // for the same node. Unlike tree data, we don't particularly care about correspondence of `proof_gen_data_blob_url` across calls, + // so just log this fact and carry on. + tracing::debug!("L1 batch #{l1_batch_number}: proof generation data wasn't updated as it's already present"); } - Ok(()) } @@ -229,3 +228,97 @@ impl ProofGenerationDal<'_, '_> { Ok(result) } } + +#[cfg(test)] +mod tests { + use zksync_types::ProtocolVersion; + + use super::*; + use crate::{tests::create_l1_batch_header, ConnectionPool, CoreDal}; + + #[tokio::test] + async fn proof_generation_workflow() { + let pool = ConnectionPool::::test_pool().await; + let mut conn = pool.connection().await.unwrap(); + + conn.protocol_versions_dal() + .save_protocol_version_with_tx(&ProtocolVersion::default()) + .await + .unwrap(); + conn.blocks_dal() + .insert_mock_l1_batch(&create_l1_batch_header(1)) + .await + .unwrap(); + + let unpicked_l1_batch = conn + .proof_generation_dal() + .get_oldest_unpicked_batch() + .await + .unwrap(); + assert_eq!(unpicked_l1_batch, None); + + conn.proof_generation_dal() + .insert_proof_generation_details(L1BatchNumber(1), "generation_data") + .await + .unwrap(); + + let unpicked_l1_batch = conn + .proof_generation_dal() + .get_oldest_unpicked_batch() + .await + .unwrap(); + assert_eq!(unpicked_l1_batch, Some(L1BatchNumber(1))); + + // Calling the method multiple times should work fine. + conn.proof_generation_dal() + .insert_proof_generation_details(L1BatchNumber(1), "generation_data") + .await + .unwrap(); + + let unpicked_l1_batch = conn + .proof_generation_dal() + .get_oldest_unpicked_batch() + .await + .unwrap(); + assert_eq!(unpicked_l1_batch, Some(L1BatchNumber(1))); + + let picked_l1_batch = conn + .proof_generation_dal() + .get_next_block_to_be_proven(Duration::MAX) + .await + .unwrap(); + assert_eq!(picked_l1_batch, Some(L1BatchNumber(1))); + let unpicked_l1_batch = conn + .proof_generation_dal() + .get_oldest_unpicked_batch() + .await + .unwrap(); + assert_eq!(unpicked_l1_batch, None); + + // Check that with small enough processing timeout, the L1 batch can be picked again + let picked_l1_batch = conn + .proof_generation_dal() + .get_next_block_to_be_proven(Duration::ZERO) + .await + .unwrap(); + assert_eq!(picked_l1_batch, Some(L1BatchNumber(1))); + + conn.proof_generation_dal() + .save_proof_artifacts_metadata(L1BatchNumber(1), "proof") + .await + .unwrap(); + + let picked_l1_batch = conn + .proof_generation_dal() + .get_next_block_to_be_proven(Duration::MAX) + .await + .unwrap(); + assert_eq!(picked_l1_batch, None); + let unpicked_l1_batch = conn + .proof_generation_dal() + .get_oldest_unpicked_batch() + .await + .unwrap(); + assert_eq!(unpicked_l1_batch, None); + } +} diff --git a/core/lib/db_connection/src/instrument.rs b/core/lib/db_connection/src/instrument.rs index e0728ce22b85..91f207838c3b 100644 --- a/core/lib/db_connection/src/instrument.rs +++ b/core/lib/db_connection/src/instrument.rs @@ -200,6 +200,21 @@ impl<'a> InstrumentedData<'a> { } } + fn observe_error(&self, err: &dyn fmt::Display) { + let InstrumentedData { + name, + location, + args, + .. + } = self; + tracing::warn!( + "Query {name}{args} called at {file}:{line} has resulted in error: {err}", + file = location.file(), + line = location.line() + ); + REQUEST_METRICS.request_error[name].inc(); + } + async fn fetch( self, connection_tags: Option<&ConnectionTags>, @@ -295,32 +310,40 @@ impl<'a> Instrumented<'a, ()> { } } - /// Wraps a provided argument validation error. + /// Wraps a provided argument validation error. It is assumed that the returned error + /// will be returned as an error cause (e.g., it is logged as an error and observed using metrics). + #[must_use] pub fn arg_error(&self, arg_name: &str, err: E) -> DalError where E: Into, { let err: anyhow::Error = err.into(); let err = err.context(format!("failed validating query argument `{arg_name}`")); - DalRequestError::new( + let err = DalRequestError::new( sqlx::Error::Decode(err.into()), self.data.name, self.data.location, ) - .with_args(self.data.args.to_owned()) - .into() + .with_args(self.data.args.to_owned()); + + self.data.observe_error(&err); + err.into() } - /// Wraps a provided application-level data constraint error. + /// Wraps a provided application-level data constraint error. It is assumed that the returned error + /// will be returned as an error cause (e.g., it is logged as an error and observed using metrics). + #[must_use] pub fn constraint_error(&self, err: anyhow::Error) -> DalError { let err = err.context("application-level data constraint violation"); - DalRequestError::new( + let err = DalRequestError::new( sqlx::Error::Decode(err.into()), self.data.name, self.data.location, ) - .with_args(self.data.args.to_owned()) - .into() + .with_args(self.data.args.to_owned()); + + self.data.observe_error(&err); + err.into() } pub fn with(self, query: Q) -> Instrumented<'a, Q> { diff --git a/core/node/metadata_calculator/src/updater.rs b/core/node/metadata_calculator/src/updater.rs index 2056b8315664..bfb6ad1912a0 100644 --- a/core/node/metadata_calculator/src/updater.rs +++ b/core/node/metadata_calculator/src/updater.rs @@ -145,8 +145,7 @@ impl TreeUpdater { storage .tee_verifier_input_producer_dal() .create_tee_verifier_input_producer_job(l1_batch_number) - .await - .expect("failed to create tee_verifier_input_producer job"); + .await?; // Save the proof generation details to Postgres storage .proof_generation_dal()