From 7c336b1e180b9d5ba1ba74169c61ce27a251e2fc Mon Sep 17 00:00:00 2001 From: Igor Aleksanov Date: Fri, 26 Jul 2024 08:15:43 +0400 Subject: [PATCH] fix(proof_data_handler): Unlock jobs on transient errors (#2486) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ❔ Currently, proof data handler locks the job for proof generation, and starts fetching required data after that. If any error happens during fetching, the method will err, and the job will remain locked. This PR changes it, so that if any error occurs, we unlock the job before we return an error. Additionally, it reduces the amount of non-necessary panics in the touched code, and adds some docs. ## Why ❔ Correctness & efficiency. ## Checklist - [ ] PR title corresponds to the body of PR (we generate changelog entries from PRs). - [ ] Tests for the changes have been added / updated. - [ ] Documentation comments have been added / updated. - [ ] Code has been formatted via `zk fmt` and `zk lint`. --- ...f8fbd8fdaf23266412e2faffb7e3813213b98.json | 14 +++ core/lib/dal/src/proof_generation_dal.rs | 57 ++++++++-- .../src/request_processor.rs | 104 +++++++++++------- 3 files changed, 131 insertions(+), 44 deletions(-) create mode 100644 core/lib/dal/.sqlx/query-a23d63b7c4264ee0f5b60c09f09f8fbd8fdaf23266412e2faffb7e3813213b98.json diff --git a/core/lib/dal/.sqlx/query-a23d63b7c4264ee0f5b60c09f09f8fbd8fdaf23266412e2faffb7e3813213b98.json b/core/lib/dal/.sqlx/query-a23d63b7c4264ee0f5b60c09f09f8fbd8fdaf23266412e2faffb7e3813213b98.json new file mode 100644 index 000000000000..257ce7050619 --- /dev/null +++ b/core/lib/dal/.sqlx/query-a23d63b7c4264ee0f5b60c09f09f8fbd8fdaf23266412e2faffb7e3813213b98.json @@ -0,0 +1,14 @@ +{ + "db_name": "PostgreSQL", + "query": "\n UPDATE proof_generation_details\n SET\n status = 'unpicked',\n updated_at = NOW()\n WHERE\n l1_batch_number = $1\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [] + }, + "hash": "a23d63b7c4264ee0f5b60c09f09f8fbd8fdaf23266412e2faffb7e3813213b98" +} diff --git a/core/lib/dal/src/proof_generation_dal.rs b/core/lib/dal/src/proof_generation_dal.rs index cf1437ff411c..4e37cc644f8e 100644 --- a/core/lib/dal/src/proof_generation_dal.rs +++ b/core/lib/dal/src/proof_generation_dal.rs @@ -30,7 +30,14 @@ enum ProofGenerationJobStatus { } impl ProofGenerationDal<'_, '_> { - pub async fn get_next_block_to_be_proven( + /// Chooses the batch number so that it has all the necessary data to generate the proof + /// and is not already picked. + /// + /// Marks the batch as picked by the prover, preventing it from being picked twice. + /// + /// The batch can be unpicked either via a corresponding DAL method, or it is considered + /// not picked after `processing_timeout` passes. + pub async fn lock_batch_for_proving( &mut self, processing_timeout: Duration, ) -> DalResult> { @@ -72,14 +79,38 @@ impl ProofGenerationDal<'_, '_> { "#, &processing_timeout, ) - .fetch_optional(self.storage.conn()) - .await - .unwrap() + .instrument("lock_batch_for_proving") + .with_arg("processing_timeout", &processing_timeout) + .fetch_optional(self.storage) + .await? .map(|row| L1BatchNumber(row.l1_batch_number as u32)); Ok(result) } + /// Marks a previously locked batch as 'unpicked', allowing it to be picked without having + /// to wait for the processing timeout. + pub async fn unlock_batch(&mut self, l1_batch_number: L1BatchNumber) -> DalResult<()> { + let batch_number = i64::from(l1_batch_number.0); + sqlx::query!( + r#" + UPDATE proof_generation_details + SET + status = 'unpicked', + updated_at = NOW() + WHERE + l1_batch_number = $1 + "#, + batch_number, + ) + .instrument("unlock_batch") + .with_arg("l1_batch_number", &l1_batch_number) + .execute(self.storage) + .await?; + + Ok(()) + } + pub async fn save_proof_artifacts_metadata( &mut self, batch_number: L1BatchNumber, @@ -388,7 +419,7 @@ mod tests { let picked_l1_batch = conn .proof_generation_dal() - .get_next_block_to_be_proven(Duration::MAX) + .lock_batch_for_proving(Duration::MAX) .await .unwrap(); assert_eq!(picked_l1_batch, Some(L1BatchNumber(1))); @@ -399,10 +430,22 @@ mod tests { .unwrap(); assert_eq!(unpicked_l1_batch, None); + // Check that we can unlock the batch and then pick it again. + conn.proof_generation_dal() + .unlock_batch(L1BatchNumber(1)) + .await + .unwrap(); + let picked_l1_batch = conn + .proof_generation_dal() + .lock_batch_for_proving(Duration::MAX) + .await + .unwrap(); + assert_eq!(picked_l1_batch, Some(L1BatchNumber(1))); + // Check that with small enough processing timeout, the L1 batch can be picked again let picked_l1_batch = conn .proof_generation_dal() - .get_next_block_to_be_proven(Duration::ZERO) + .lock_batch_for_proving(Duration::ZERO) .await .unwrap(); assert_eq!(picked_l1_batch, Some(L1BatchNumber(1))); @@ -414,7 +457,7 @@ mod tests { let picked_l1_batch = conn .proof_generation_dal() - .get_next_block_to_be_proven(Duration::MAX) + .lock_batch_for_proving(Duration::MAX) .await .unwrap(); assert_eq!(picked_l1_batch, None); diff --git a/core/node/proof_data_handler/src/request_processor.rs b/core/node/proof_data_handler/src/request_processor.rs index a89f9b63a848..11d0aebfa806 100644 --- a/core/node/proof_data_handler/src/request_processor.rs +++ b/core/node/proof_data_handler/src/request_processor.rs @@ -51,21 +51,64 @@ impl RequestProcessor { ) -> Result, RequestProcessorError> { tracing::info!("Received request for proof generation data: {:?}", request); - let l1_batch_number_result = self - .pool + let l1_batch_number = match self.lock_batch_for_proving().await? { + Some(number) => number, + None => return Ok(Json(ProofGenerationDataResponse::Success(None))), // no batches pending to be proven + }; + + let proof_generation_data = self + .proof_generation_data_for_existing_batch(l1_batch_number) + .await; + + // If we weren't able to fetch all the data, we should unlock the batch before returning. + match proof_generation_data { + Ok(data) => Ok(Json(ProofGenerationDataResponse::Success(Some(Box::new( + data, + ))))), + Err(err) => { + self.unlock_batch(l1_batch_number).await?; + Err(err) + } + } + } + + /// Will choose a batch that has all the required data and isn't picked up by any prover yet. + async fn lock_batch_for_proving(&self) -> Result, RequestProcessorError> { + self.pool .connection() .await - .unwrap() + .map_err(RequestProcessorError::Dal)? .proof_generation_dal() - .get_next_block_to_be_proven(self.config.proof_generation_timeout()) + .lock_batch_for_proving(self.config.proof_generation_timeout()) .await - .map_err(RequestProcessorError::Dal)?; + .map_err(RequestProcessorError::Dal) + } - let l1_batch_number = match l1_batch_number_result { - Some(number) => number, - None => return Ok(Json(ProofGenerationDataResponse::Success(None))), // no batches pending to be proven - }; + /// Marks the batch as 'unpicked', allowing it to be picked up by another prover. + async fn unlock_batch( + &self, + l1_batch_number: L1BatchNumber, + ) -> Result<(), RequestProcessorError> { + self.pool + .connection() + .await + .map_err(RequestProcessorError::Dal)? + .proof_generation_dal() + .unlock_batch(l1_batch_number) + .await + .map_err(RequestProcessorError::Dal) + } + /// Will fetch all the required data for the batch and return it. + /// + /// ## Panics + /// + /// Expects all the data to be present in the database. + /// Will panic if any of the required data is missing. + async fn proof_generation_data_for_existing_batch( + &self, + l1_batch_number: L1BatchNumber, + ) -> Result { let vm_run_data: VMRunWitnessInputData = self .blob_store .get(l1_batch_number) @@ -77,52 +120,43 @@ impl RequestProcessor { .await .map_err(RequestProcessorError::ObjectStore)?; - let previous_batch_metadata = self + // Acquire connection after interacting with GCP, to avoid holding the connection for too long. + let mut conn = self .pool .connection() .await - .unwrap() + .map_err(RequestProcessorError::Dal)?; + + let previous_batch_metadata = conn .blocks_dal() .get_l1_batch_metadata(L1BatchNumber(l1_batch_number.checked_sub(1).unwrap())) .await - .unwrap() + .map_err(RequestProcessorError::Dal)? .expect("No metadata for previous batch"); - let header = self - .pool - .connection() - .await - .unwrap() + let header = conn .blocks_dal() .get_l1_batch_header(l1_batch_number) .await - .unwrap() + .map_err(RequestProcessorError::Dal)? .unwrap_or_else(|| panic!("Missing header for {}", l1_batch_number)); let minor_version = header.protocol_version.unwrap(); - let protocol_version = self - .pool - .connection() - .await - .unwrap() + let protocol_version = conn .protocol_versions_dal() .get_protocol_version_with_latest_patch(minor_version) .await - .unwrap() + .map_err(RequestProcessorError::Dal)? .unwrap_or_else(|| { panic!("Missing l1 verifier info for protocol version {minor_version}") }); - let batch_header = self - .pool - .connection() - .await - .unwrap() + let batch_header = conn .blocks_dal() .get_l1_batch_header(l1_batch_number) .await - .unwrap() - .unwrap(); + .map_err(RequestProcessorError::Dal)? + .unwrap_or_else(|| panic!("Missing header for {}", l1_batch_number)); let eip_4844_blobs = match self.commitment_mode { L1BatchCommitmentMode::Validium => Eip4844Blobs::empty(), @@ -149,16 +183,12 @@ impl RequestProcessor { METRICS.observe_blob_sizes(&blob); - let proof_gen_data = ProofGenerationData { + Ok(ProofGenerationData { l1_batch_number, witness_input_data: blob, protocol_version: protocol_version.version, l1_verifier_config: protocol_version.l1_verifier_config, - }; - - Ok(Json(ProofGenerationDataResponse::Success(Some(Box::new( - proof_gen_data, - ))))) + }) } pub(crate) async fn submit_proof(