Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(proof_data_handler): TEE blob fetching error handling #2674

Merged
merged 11 commits into from
Aug 26, 2024
2 changes: 1 addition & 1 deletion core/bin/zksync_tee_prover/src/tee_prover.rs
Original file line number Diff line number Diff line change
@@ -201,8 +201,8 @@ impl Task for TeeProver {
if !err.is_retriable() || retries > self.config.max_retries {
return Err(err.into());
}
retries += 1;
tracing::warn!(%err, "Failed TEE prover step function {retries}/{}, retrying in {} milliseconds.", self.config.max_retries, backoff.as_millis());
retries += 1;
backoff = std::cmp::min(
backoff.mul_f32(self.config.retry_backoff_multiplier),
self.config.max_backoff,

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions core/lib/dal/doc/TeeProofGenerationDal.md
Original file line number Diff line number Diff line change
@@ -12,8 +12,7 @@ title: Status Diagram
---
stateDiagram-v2
[*] --> ready_to_be_proven : insert_tee_proof_generation_job
ready_to_be_proven --> picked_by_prover : get_next_batch_to_be_proven
ready_to_be_proven --> picked_by_prover : lock_batch_for_proving
picked_by_prover --> generated : save_proof_artifacts_metadata
generated --> [*]

```
70 changes: 56 additions & 14 deletions core/lib/dal/src/tee_proof_generation_dal.rs
Original file line number Diff line number Diff line change
@@ -2,7 +2,9 @@
use std::time::Duration;

use zksync_db_connection::{
connection::Connection, error::DalResult, instrument::Instrumented,
connection::Connection,
error::DalResult,
instrument::{InstrumentExt, Instrumented},
utils::pg_interval_from_duration,
};
use zksync_types::{tee_types::TeeType, L1BatchNumber};
@@ -17,14 +19,23 @@ pub struct TeeProofGenerationDal<'a, 'c> {
pub(crate) storage: &'a mut Connection<'c, Core>,
}

#[derive(Debug, Clone, sqlx::FromRow)]
pub struct L1BatchNumberModel {
pub l1_batch_number: i64,
}

impl TeeProofGenerationDal<'_, '_> {
pub async fn get_next_batch_to_be_proven(
pub async fn lock_batch_for_proving(
&mut self,
tee_type: TeeType,
processing_timeout: Duration,
min_batch_number: Option<L1BatchNumber>,
) -> DalResult<Option<L1BatchNumber>> {
let processing_timeout = pg_interval_from_duration(processing_timeout);
let query = sqlx::query!(
let min_batch_condition = min_batch_number.map_or(String::new(), |min_batch| {
pbeza marked this conversation as resolved.
Show resolved Hide resolved
format!("AND proofs.l1_batch_number >= {}", min_batch.0)
});
let query_str = format!(
r#"
UPDATE tee_proof_generation_details
SET
@@ -48,6 +59,7 @@ impl TeeProofGenerationDal<'_, '_> {
AND proofs.prover_taken_at < NOW() - $3::INTERVAL
)
)
{min_batch_condition}
ORDER BY
l1_batch_number ASC
LIMIT
@@ -58,19 +70,49 @@ impl TeeProofGenerationDal<'_, '_> {
RETURNING
tee_proof_generation_details.l1_batch_number
"#,
&tee_type.to_string(),
TeeVerifierInputProducerJobStatus::Successful as TeeVerifierInputProducerJobStatus,
&processing_timeout,
min_batch_condition = min_batch_condition
);
let batch_number = Instrumented::new("get_next_batch_to_be_proven")
.with_arg("tee_type", &tee_type)
.with_arg("processing_timeout", &processing_timeout)
.with(query)
.fetch_optional(self.storage)
.await?
.map(|row| L1BatchNumber(row.l1_batch_number as u32));

Ok(batch_number)
let tee_type_str = tee_type.to_string();
let query = sqlx::query_as(&query_str)
.bind(&tee_type_str)
.bind(
TeeVerifierInputProducerJobStatus::Successful as TeeVerifierInputProducerJobStatus,
)
.bind(&processing_timeout);

let batch_number: Option<L1BatchNumberModel> =
query.fetch_optional(self.storage.conn()).await.unwrap();

Ok(batch_number.map(|row| L1BatchNumber(row.l1_batch_number as u32)))
}

pub async fn unlock_batch(
&mut self,
l1_batch_number: L1BatchNumber,
tee_type: TeeType,
) -> DalResult<()> {
let batch_number = i64::from(l1_batch_number.0);
sqlx::query!(
r#"
UPDATE tee_proof_generation_details
SET
status = 'unpicked',
updated_at = NOW()
WHERE
l1_batch_number = $1
AND tee_type = $2
"#,
batch_number,
tee_type.to_string()
)
.instrument("unlock_batch")
.with_arg("l1_batch_number", &batch_number)
.with_arg("tee_type", &tee_type)
.execute(self.storage)
.await?;

Ok(())
}

pub async fn save_proof_artifacts_metadata(
92 changes: 64 additions & 28 deletions core/node/proof_data_handler/src/tee_request_processor.rs
Original file line number Diff line number Diff line change
@@ -3,15 +3,12 @@ use std::sync::Arc;
use axum::{extract::Path, Json};
use zksync_config::configs::ProofDataHandlerConfig;
use zksync_dal::{ConnectionPool, Core, CoreDal};
use zksync_object_store::ObjectStore;
use zksync_prover_interface::{
api::{
RegisterTeeAttestationRequest, RegisterTeeAttestationResponse, SubmitProofResponse,
SubmitTeeProofRequest, TeeProofGenerationDataRequest, TeeProofGenerationDataResponse,
},
inputs::TeeVerifierInput,
use zksync_object_store::{ObjectStore, ObjectStoreError};
use zksync_prover_interface::api::{
RegisterTeeAttestationRequest, RegisterTeeAttestationResponse, SubmitProofResponse,
SubmitTeeProofRequest, TeeProofGenerationDataRequest, TeeProofGenerationDataResponse,
};
use zksync_types::L1BatchNumber;
use zksync_types::{tee_types::TeeType, L1BatchNumber};

use crate::errors::RequestProcessorError;

@@ -41,32 +38,71 @@ impl TeeRequestProcessor {
) -> Result<Json<TeeProofGenerationDataResponse>, RequestProcessorError> {
tracing::info!("Received request for proof generation data: {:?}", request);

let mut connection = self
.pool
let mut min_batch_number: Option<L1BatchNumber> = None;

loop {
let l1_batch_number = match self
.lock_batch_for_proving(request.tee_type, min_batch_number)
.await?
{
Some(number) => number,
None => return Ok(Json(TeeProofGenerationDataResponse(None))),
};

match self.blob_store.get(l1_batch_number).await {
Ok(input) => {
return Ok(Json(TeeProofGenerationDataResponse(Some(Box::new(input)))));
}
Err(ObjectStoreError::KeyNotFound(_)) => {
tracing::warn!(
"Blob for batch number {} has not been found in the object store.",
l1_batch_number
);
self.unlock_batch(l1_batch_number, request.tee_type).await?;
min_batch_number =
min_batch_number.map_or(Some(l1_batch_number + 1), |num| Some(num + 1));
pbeza marked this conversation as resolved.
Show resolved Hide resolved
continue;
}
Err(err) => {
self.unlock_batch(l1_batch_number, request.tee_type).await?;
return Err(RequestProcessorError::ObjectStore(err));
}
}
}
}

async fn lock_batch_for_proving(
&self,
tee_type: TeeType,
min_batch_number: Option<L1BatchNumber>,
) -> Result<Option<L1BatchNumber>, RequestProcessorError> {
self.pool
.connection()
.await
.map_err(RequestProcessorError::Dal)?;

let l1_batch_number_result = connection
.map_err(RequestProcessorError::Dal)?
.tee_proof_generation_dal()
.get_next_batch_to_be_proven(request.tee_type, self.config.proof_generation_timeout())
.lock_batch_for_proving(
tee_type,
self.config.proof_generation_timeout(),
min_batch_number,
)
.await
.map_err(RequestProcessorError::Dal)?;

let l1_batch_number = match l1_batch_number_result {
Some(number) => number,
None => return Ok(Json(TeeProofGenerationDataResponse(None))),
};
.map_err(RequestProcessorError::Dal)
pbeza marked this conversation as resolved.
Show resolved Hide resolved
}

let tee_verifier_input: TeeVerifierInput = self
.blob_store
.get(l1_batch_number)
async fn unlock_batch(
&self,
l1_batch_number: L1BatchNumber,
tee_type: TeeType,
) -> Result<(), RequestProcessorError> {
self.pool
.connection()
.await
.map_err(RequestProcessorError::ObjectStore)?;

let response = TeeProofGenerationDataResponse(Some(Box::new(tee_verifier_input)));

Ok(Json(response))
.map_err(RequestProcessorError::Dal)?
.tee_proof_generation_dal()
.unlock_batch(l1_batch_number, tee_type)
.await
.map_err(RequestProcessorError::Dal)
}

pub(crate) async fn submit_proof(