Skip to content

Commit

Permalink
fix(db): Fix insert_proof_generation_details() (#2291)
Browse files Browse the repository at this point in the history
## What ❔

Removes a double-insertion check from
`insert_proof_generation_details()` in `ProofGenerationDal`, so that inserting details for an already-present batch is treated as a no-op rather than an error.

## Why ❔

It is not an error, and can and will happen if multiple full trees are
run for the same node.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
  • Loading branch information
slowli authored Jun 21, 2024
1 parent 06c287b commit c2412cf
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 30 deletions.
133 changes: 113 additions & 20 deletions core/lib/dal/src/proof_generation_dal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use std::time::Duration;

use strum::{Display, EnumString};
use zksync_db_connection::{
connection::Connection, error::DalResult, instrument::Instrumented,
connection::Connection,
error::DalResult,
instrument::{InstrumentExt, Instrumented},
utils::pg_interval_from_duration,
};
use zksync_types::L1BatchNumber;
Expand Down Expand Up @@ -110,39 +112,36 @@ impl ProofGenerationDal<'_, '_> {
Ok(())
}

/// The caller should ensure that `l1_batch_number` exists in the database.
pub async fn insert_proof_generation_details(
&mut self,
block_number: L1BatchNumber,
l1_batch_number: L1BatchNumber,
proof_gen_data_blob_url: &str,
) -> DalResult<()> {
let l1_batch_number = i64::from(block_number.0);
let query = sqlx::query!(
let result = sqlx::query!(
r#"
INSERT INTO
proof_generation_details (l1_batch_number, status, proof_gen_data_blob_url, created_at, updated_at)
VALUES
($1, 'ready_to_be_proven', $2, NOW(), NOW())
ON CONFLICT (l1_batch_number) DO NOTHING
"#,
l1_batch_number,
i64::from(l1_batch_number.0),
proof_gen_data_blob_url,
);
let instrumentation = Instrumented::new("insert_proof_generation_details")
.with_arg("l1_batch_number", &l1_batch_number)
.with_arg("proof_gen_data_blob_url", &proof_gen_data_blob_url);
let result = instrumentation
.clone()
.with(query)
.execute(self.storage)
.await?;
)
.instrument("insert_proof_generation_details")
.with_arg("l1_batch_number", &l1_batch_number)
.with_arg("proof_gen_data_blob_url", &proof_gen_data_blob_url)
.report_latency()
.execute(self.storage)
.await?;

if result.rows_affected() == 0 {
let err = instrumentation.constraint_error(anyhow::anyhow!(
"Cannot save proof_blob_url for a batch number {} that does not exist",
l1_batch_number
));
return Err(err);
// Not an error: we may call `insert_proof_generation_details()` from multiple full trees instantiated
// for the same node. Unlike tree data, we don't particularly care about correspondence of `proof_gen_data_blob_url` across calls,
// so just log this fact and carry on.
tracing::debug!("L1 batch #{l1_batch_number}: proof generation data wasn't updated as it's already present");
}

Ok(())
}

Expand Down Expand Up @@ -229,3 +228,97 @@ impl ProofGenerationDal<'_, '_> {
Ok(result)
}
}

#[cfg(test)]
mod tests {
    use zksync_types::ProtocolVersion;

    use super::*;
    use crate::{tests::create_l1_batch_header, ConnectionPool, CoreDal};

    /// Walks the proof-generation lifecycle end to end: inserting generation
    /// details (including an idempotent re-insert), picking a batch to prove,
    /// timeout-based re-picking, and saving the final proof artifacts.
    #[tokio::test]
    async fn proof_generation_workflow() {
        let pool = ConnectionPool::<Core>::test_pool().await;
        let mut conn = pool.connection().await.unwrap();

        // Seed the prerequisites: a protocol version and a mock L1 batch #1.
        conn.protocol_versions_dal()
            .save_protocol_version_with_tx(&ProtocolVersion::default())
            .await
            .unwrap();
        conn.blocks_dal()
            .insert_mock_l1_batch(&create_l1_batch_header(1))
            .await
            .unwrap();

        // Before any proof generation data is inserted, there is nothing to pick.
        assert_eq!(
            conn.proof_generation_dal()
                .get_oldest_unpicked_batch()
                .await
                .unwrap(),
            None
        );

        conn.proof_generation_dal()
            .insert_proof_generation_details(L1BatchNumber(1), "generation_data")
            .await
            .unwrap();
        assert_eq!(
            conn.proof_generation_dal()
                .get_oldest_unpicked_batch()
                .await
                .unwrap(),
            Some(L1BatchNumber(1))
        );

        // Re-inserting details for the same batch must be a no-op, not an error.
        conn.proof_generation_dal()
            .insert_proof_generation_details(L1BatchNumber(1), "generation_data")
            .await
            .unwrap();
        assert_eq!(
            conn.proof_generation_dal()
                .get_oldest_unpicked_batch()
                .await
                .unwrap(),
            Some(L1BatchNumber(1))
        );

        // Picking the batch removes it from the "unpicked" set.
        assert_eq!(
            conn.proof_generation_dal()
                .get_next_block_to_be_proven(Duration::MAX)
                .await
                .unwrap(),
            Some(L1BatchNumber(1))
        );
        assert_eq!(
            conn.proof_generation_dal()
                .get_oldest_unpicked_batch()
                .await
                .unwrap(),
            None
        );

        // With a zero processing timeout, the already-picked batch becomes
        // eligible to be picked again.
        assert_eq!(
            conn.proof_generation_dal()
                .get_next_block_to_be_proven(Duration::ZERO)
                .await
                .unwrap(),
            Some(L1BatchNumber(1))
        );

        conn.proof_generation_dal()
            .save_proof_artifacts_metadata(L1BatchNumber(1), "proof")
            .await
            .unwrap();

        // Once the proof is saved, the batch is neither pickable nor unpicked.
        assert_eq!(
            conn.proof_generation_dal()
                .get_next_block_to_be_proven(Duration::MAX)
                .await
                .unwrap(),
            None
        );
        assert_eq!(
            conn.proof_generation_dal()
                .get_oldest_unpicked_batch()
                .await
                .unwrap(),
            None
        );
    }
}
39 changes: 31 additions & 8 deletions core/lib/db_connection/src/instrument.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,21 @@ impl<'a> InstrumentedData<'a> {
}
}

/// Records a query failure: emits a WARN-level log line containing the query's
/// name, formatted arguments, and callsite, and bumps the per-query error metric.
///
/// Takes `&dyn fmt::Display` so both `DalError`/`DalRequestError` values and
/// ad-hoc errors can be reported through the same path.
fn observe_error(&self, err: &dyn fmt::Display) {
// Destructure to borrow only the fields needed for the log line.
let InstrumentedData {
name,
location,
args,
..
} = self;
tracing::warn!(
"Query {name}{args} called at {file}:{line} has resulted in error: {err}",
file = location.file(),
line = location.line()
);
// Error counter is labeled by query name, mirroring the latency metrics.
REQUEST_METRICS.request_error[name].inc();
}

async fn fetch<R>(
self,
connection_tags: Option<&ConnectionTags>,
Expand Down Expand Up @@ -295,32 +310,40 @@ impl<'a> Instrumented<'a, ()> {
}
}

/// Wraps a provided argument validation error.
/// Wraps a provided argument validation error. It is assumed that the returned error
/// will be returned as an error cause (e.g., it is logged as an error and observed using metrics).
#[must_use]
pub fn arg_error<E>(&self, arg_name: &str, err: E) -> DalError
where
E: Into<anyhow::Error>,
{
let err: anyhow::Error = err.into();
let err = err.context(format!("failed validating query argument `{arg_name}`"));
DalRequestError::new(
let err = DalRequestError::new(
sqlx::Error::Decode(err.into()),
self.data.name,
self.data.location,
)
.with_args(self.data.args.to_owned())
.into()
.with_args(self.data.args.to_owned());

self.data.observe_error(&err);
err.into()
}

/// Wraps a provided application-level data constraint error.
/// Wraps a provided application-level data constraint error. It is assumed that the returned error
/// will be returned as an error cause (e.g., it is logged as an error and observed using metrics).
#[must_use]
pub fn constraint_error(&self, err: anyhow::Error) -> DalError {
let err = err.context("application-level data constraint violation");
DalRequestError::new(
let err = DalRequestError::new(
sqlx::Error::Decode(err.into()),
self.data.name,
self.data.location,
)
.with_args(self.data.args.to_owned())
.into()
.with_args(self.data.args.to_owned());

self.data.observe_error(&err);
err.into()
}

pub fn with<Q>(self, query: Q) -> Instrumented<'a, Q> {
Expand Down
3 changes: 1 addition & 2 deletions core/node/metadata_calculator/src/updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,7 @@ impl TreeUpdater {
storage
.tee_verifier_input_producer_dal()
.create_tee_verifier_input_producer_job(l1_batch_number)
.await
.expect("failed to create tee_verifier_input_producer job");
.await?;
// Save the proof generation details to Postgres
storage
.proof_generation_dal()
Expand Down

0 comments on commit c2412cf

Please sign in to comment.