fix(db): Fix insert_proof_generation_details() #2291

Merged (4 commits, Jun 21, 2024)
Changes from all commits
133 changes: 113 additions & 20 deletions core/lib/dal/src/proof_generation_dal.rs
@@ -3,7 +3,9 @@ use std::time::Duration;
 
 use strum::{Display, EnumString};
 use zksync_db_connection::{
-    connection::Connection, error::DalResult, instrument::Instrumented,
+    connection::Connection,
+    error::DalResult,
+    instrument::{InstrumentExt, Instrumented},
     utils::pg_interval_from_duration,
 };
 use zksync_types::L1BatchNumber;
@@ -110,39 +112,36 @@ impl ProofGenerationDal<'_, '_> {
         Ok(())
     }
 
+    /// The caller should ensure that `l1_batch_number` exists in the database.
     pub async fn insert_proof_generation_details(
         &mut self,
-        block_number: L1BatchNumber,
+        l1_batch_number: L1BatchNumber,
         proof_gen_data_blob_url: &str,
     ) -> DalResult<()> {
-        let l1_batch_number = i64::from(block_number.0);
-        let query = sqlx::query!(
+        let result = sqlx::query!(
             r#"
             INSERT INTO
                 proof_generation_details (l1_batch_number, status, proof_gen_data_blob_url, created_at, updated_at)
             VALUES
                 ($1, 'ready_to_be_proven', $2, NOW(), NOW())
             ON CONFLICT (l1_batch_number) DO NOTHING
             "#,
-            l1_batch_number,
+            i64::from(l1_batch_number.0),
             proof_gen_data_blob_url,
-        );
-        let instrumentation = Instrumented::new("insert_proof_generation_details")
-            .with_arg("l1_batch_number", &l1_batch_number)
-            .with_arg("proof_gen_data_blob_url", &proof_gen_data_blob_url);
-        let result = instrumentation
-            .clone()
-            .with(query)
-            .execute(self.storage)
-            .await?;
+        )
+        .instrument("insert_proof_generation_details")
+        .with_arg("l1_batch_number", &l1_batch_number)
+        .with_arg("proof_gen_data_blob_url", &proof_gen_data_blob_url)
+        .report_latency()
+        .execute(self.storage)
+        .await?;
 
         if result.rows_affected() == 0 {
-            let err = instrumentation.constraint_error(anyhow::anyhow!(
-                "Cannot save proof_blob_url for a batch number {} that does not exist",
-                l1_batch_number
-            ));
-            return Err(err);
+            // Not an error: we may call `insert_proof_generation_details()` from multiple full trees instantiated
+            // for the same node. Unlike tree data, we don't particularly care about correspondence of `proof_gen_data_blob_url` across calls,
+            // so just log this fact and carry on.
+            tracing::debug!("L1 batch #{l1_batch_number}: proof generation data wasn't updated as it's already present");
         }
 
         Ok(())
     }

@@ -229,3 +228,97 @@ impl ProofGenerationDal<'_, '_> {
         Ok(result)
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use zksync_types::ProtocolVersion;
+
+    use super::*;
+    use crate::{tests::create_l1_batch_header, ConnectionPool, CoreDal};
+
+    #[tokio::test]
+    async fn proof_generation_workflow() {
+        let pool = ConnectionPool::<Core>::test_pool().await;
+        let mut conn = pool.connection().await.unwrap();
+
+        conn.protocol_versions_dal()
+            .save_protocol_version_with_tx(&ProtocolVersion::default())
+            .await
+            .unwrap();
+        conn.blocks_dal()
+            .insert_mock_l1_batch(&create_l1_batch_header(1))
+            .await
+            .unwrap();
+
+        let unpicked_l1_batch = conn
+            .proof_generation_dal()
+            .get_oldest_unpicked_batch()
+            .await
+            .unwrap();
+        assert_eq!(unpicked_l1_batch, None);
+
+        conn.proof_generation_dal()
+            .insert_proof_generation_details(L1BatchNumber(1), "generation_data")
+            .await
+            .unwrap();
+
+        let unpicked_l1_batch = conn
+            .proof_generation_dal()
+            .get_oldest_unpicked_batch()
+            .await
+            .unwrap();
+        assert_eq!(unpicked_l1_batch, Some(L1BatchNumber(1)));
+
+        // Calling the method multiple times should work fine.
+        conn.proof_generation_dal()
+            .insert_proof_generation_details(L1BatchNumber(1), "generation_data")
+            .await
+            .unwrap();
+
+        let unpicked_l1_batch = conn
+            .proof_generation_dal()
+            .get_oldest_unpicked_batch()
+            .await
+            .unwrap();
+        assert_eq!(unpicked_l1_batch, Some(L1BatchNumber(1)));
+
+        let picked_l1_batch = conn
+            .proof_generation_dal()
+            .get_next_block_to_be_proven(Duration::MAX)
+            .await
+            .unwrap();
+        assert_eq!(picked_l1_batch, Some(L1BatchNumber(1)));
+        let unpicked_l1_batch = conn
+            .proof_generation_dal()
+            .get_oldest_unpicked_batch()
+            .await
+            .unwrap();
+        assert_eq!(unpicked_l1_batch, None);
+
+        // Check that with small enough processing timeout, the L1 batch can be picked again
+        let picked_l1_batch = conn
+            .proof_generation_dal()
+            .get_next_block_to_be_proven(Duration::ZERO)
+            .await
+            .unwrap();
+        assert_eq!(picked_l1_batch, Some(L1BatchNumber(1)));
+
+        conn.proof_generation_dal()
+            .save_proof_artifacts_metadata(L1BatchNumber(1), "proof")
+            .await
+            .unwrap();
+
+        let picked_l1_batch = conn
+            .proof_generation_dal()
+            .get_next_block_to_be_proven(Duration::MAX)
+            .await
+            .unwrap();
+        assert_eq!(picked_l1_batch, None);
+        let unpicked_l1_batch = conn
+            .proof_generation_dal()
+            .get_oldest_unpicked_batch()
+            .await
+            .unwrap();
+        assert_eq!(unpicked_l1_batch, None);
+    }
+}
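
Reviewer note: for reference, a minimal standalone sketch of the same idempotent-insert pattern, stripped of the DAL's instrumentation and macros. This is not part of the PR; it uses the unchecked `sqlx::query` function and a plain `PgPool`, and the function name and signature are illustrative only. It shows the core idea of the fix: `ON CONFLICT ... DO NOTHING` plus a `rows_affected()` check, with a repeated insert treated as a no-op rather than an error.

use sqlx::PgPool;

/// Returns `Ok(true)` if the row was inserted and `Ok(false)` if it already existed.
pub async fn insert_proof_gen_details_idempotent(
    pool: &PgPool,
    l1_batch_number: i64,
    proof_gen_data_blob_url: &str,
) -> sqlx::Result<bool> {
    let result = sqlx::query(
        "INSERT INTO proof_generation_details \
             (l1_batch_number, status, proof_gen_data_blob_url, created_at, updated_at) \
         VALUES ($1, 'ready_to_be_proven', $2, NOW(), NOW()) \
         ON CONFLICT (l1_batch_number) DO NOTHING",
    )
    .bind(l1_batch_number)
    .bind(proof_gen_data_blob_url)
    .execute(pool)
    .await?;

    // Zero affected rows means the batch was already inserted by another caller;
    // with this PR that case is logged and tolerated instead of being turned into an error.
    Ok(result.rows_affected() > 0)
}
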
39 changes: 31 additions & 8 deletions core/lib/db_connection/src/instrument.rs
@@ -200,6 +200,21 @@ impl<'a> InstrumentedData<'a> {
         }
     }
 
+    fn observe_error(&self, err: &dyn fmt::Display) {
+        let InstrumentedData {
+            name,
+            location,
+            args,
+            ..
+        } = self;
+        tracing::warn!(
+            "Query {name}{args} called at {file}:{line} has resulted in error: {err}",
+            file = location.file(),
+            line = location.line()
+        );
+        REQUEST_METRICS.request_error[name].inc();
+    }
+
     async fn fetch<R>(
         self,
         connection_tags: Option<&ConnectionTags>,
@@ -295,32 +310,40 @@ impl<'a> Instrumented<'a, ()> {
         }
     }
 
-    /// Wraps a provided argument validation error.
+    /// Wraps a provided argument validation error. It is assumed that the returned error
+    /// will be returned as an error cause (e.g., it is logged as an error and observed using metrics).
     #[must_use]
     pub fn arg_error<E>(&self, arg_name: &str, err: E) -> DalError
     where
         E: Into<anyhow::Error>,
     {
         let err: anyhow::Error = err.into();
         let err = err.context(format!("failed validating query argument `{arg_name}`"));
-        DalRequestError::new(
+        let err = DalRequestError::new(
             sqlx::Error::Decode(err.into()),
             self.data.name,
             self.data.location,
         )
-        .with_args(self.data.args.to_owned())
-        .into()
+        .with_args(self.data.args.to_owned());
+
+        self.data.observe_error(&err);
+        err.into()
     }
 
-    /// Wraps a provided application-level data constraint error.
+    /// Wraps a provided application-level data constraint error. It is assumed that the returned error
+    /// will be returned as an error cause (e.g., it is logged as an error and observed using metrics).
     #[must_use]
     pub fn constraint_error(&self, err: anyhow::Error) -> DalError {
         let err = err.context("application-level data constraint violation");
-        DalRequestError::new(
+        let err = DalRequestError::new(
             sqlx::Error::Decode(err.into()),
             self.data.name,
             self.data.location,
         )
-        .with_args(self.data.args.to_owned())
-        .into()
+        .with_args(self.data.args.to_owned());
+
+        self.data.observe_error(&err);
+        err.into()
     }
 
     pub fn with<Q>(self, query: Q) -> Instrumented<'a, Q> {
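
Reviewer note: the idea behind `observe_error()` is that an error is logged and counted in metrics at the point where it is wrapped, so callers of `arg_error()` / `constraint_error()` no longer have to do their own reporting. Below is a self-contained sketch of that pattern; `QueryInfo` and its methods are illustrative stand-ins, not the crate's actual types, and the real code uses `tracing::warn!` plus a Vise metric rather than stderr.

use std::fmt;

/// Simplified stand-in for `InstrumentedData`: just enough state to report where a query was declared.
struct QueryInfo {
    name: &'static str,
    file: &'static str,
    line: u32,
}

impl QueryInfo {
    /// Mirrors `observe_error()`: a single place that reports the error.
    fn observe_error(&self, err: &dyn fmt::Display) {
        eprintln!(
            "Query {} called at {}:{} has resulted in error: {err}",
            self.name, self.file, self.line
        );
    }

    /// Mirrors the reworked `constraint_error()`: the error is observed when it is wrapped.
    fn constraint_error(&self, err: String) -> String {
        let err = format!("application-level data constraint violation: {err}");
        self.observe_error(&err);
        err
    }
}

fn main() {
    let info = QueryInfo {
        name: "insert_proof_generation_details",
        file: "proof_generation_dal.rs",
        line: 115,
    };
    let err = info.constraint_error("L1 batch #1 does not exist".to_owned());
    assert!(err.starts_with("application-level data constraint violation"));
}
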
3 changes: 1 addition & 2 deletions core/node/metadata_calculator/src/updater.rs
@@ -145,8 +145,7 @@ impl TreeUpdater {
             storage
                 .tee_verifier_input_producer_dal()
                 .create_tee_verifier_input_producer_job(l1_batch_number)
-                .await
-                .expect("failed to create tee_verifier_input_producer job");
+                .await?;
             // Save the proof generation details to Postgres
             storage
                 .proof_generation_dal()
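
Reviewer note: the updater change just swaps a panicking `.expect()` for `?` so a DB failure is propagated to the caller (which can log or retry) instead of aborting the tree updater. A tiny illustrative sketch of that difference, with a hypothetical fallible call and a `String` error standing in for the real `DalResult`:

/// Hypothetical fallible call standing in for the DAL method.
fn do_insert() -> Result<(), String> {
    Err("connection lost".to_owned())
}

fn process_batch() -> Result<(), String> {
    // Before: do_insert().expect("failed ...") would abort the whole task on error.
    // After: `?` converts the failure into an error the caller decides how to handle.
    do_insert()?;
    Ok(())
}

fn main() {
    assert!(process_batch().is_err());
}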