From 6ac01ab2ca91d8be207a49c63ca74742c586b043 Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Wed, 26 Jun 2024 17:40:34 +0200 Subject: [PATCH 1/8] compiles --- Cargo.lock | 1 + core/lib/dal/src/consensus_dal.rs | 68 ++++++++++++------ core/node/consensus/Cargo.toml | 1 + core/node/consensus/src/storage/mod.rs | 96 +++++++++++++++++++++----- 4 files changed, 129 insertions(+), 37 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a537ea6c4f8f..c47ee84af18c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8505,6 +8505,7 @@ dependencies = [ "secrecy", "tempfile", "test-casing", + "thiserror", "tokio", "tracing", "zksync_concurrency", diff --git a/core/lib/dal/src/consensus_dal.rs b/core/lib/dal/src/consensus_dal.rs index f2742cbedd8c..41efad20da51 100644 --- a/core/lib/dal/src/consensus_dal.rs +++ b/core/lib/dal/src/consensus_dal.rs @@ -5,7 +5,7 @@ use zksync_consensus_roles::validator; use zksync_consensus_storage::ReplicaState; use zksync_db_connection::{ connection::Connection, - error::{DalResult, SqlxContext}, + error::{DalError, DalResult, SqlxContext}, instrument::{InstrumentExt, Instrumented}, }; use zksync_types::L2BlockNumber; @@ -19,6 +19,17 @@ pub struct ConsensusDal<'a, 'c> { pub storage: &'a mut Connection<'c, Core>, } +/// Error returned by `ConsensusDal::insert_certificate()`. +#[derive(thiserror::Error, Debug)] +pub enum InsertCertificateError { + #[error("corresponding L2 block is missing")] + MissingPayload, + #[error(transparent)] + Dal(#[from] DalError), + #[error(transparent)] + Other(#[from] anyhow::Error), +} + impl ConsensusDal<'_, '_> { /// Fetches genesis. pub async fn genesis(&mut self) -> DalResult> { @@ -113,16 +124,25 @@ impl ConsensusDal<'_, '_> { /// Fetches the range of L2 blocks present in storage. /// If storage was recovered from snapshot, the range doesn't need to start at 0. pub async fn block_range(&mut self) -> DalResult> { - let mut txn = self.storage.start_transaction().await?; - let snapshot = txn + let mut start = L2BlockNumber(0); + if let Some(snapshot) = self + .storage .snapshot_recovery_dal() .get_applied_snapshot_status() - .await?; - // `snapshot.l2_block_number` indicates the last block processed. - // This block is NOT present in storage. Therefore, the first block - // that will appear in storage is `snapshot.l2_block_number + 1`. - let start = validator::BlockNumber(snapshot.map_or(0, |s| s.l2_block_number.0 + 1).into()); - let end = txn + .await? + { + // `snapshot.l2_block_number` indicates the last block processed. + // This block is NOT present in storage. Therefore, the first block + // that will appear in storage is `snapshot.l2_block_number + 1`. + start = start.max(snapshot.l2_block_number + 1); + } + let pruning_info = self.storage.pruning_dal().get_pruning_info().await?; + if let Some(last_pruned) = pruning_info.last_hard_pruned_l2_block { + start = start.max(last_pruned + 1); + } + let start = validator::BlockNumber(start.0.into()); + let end = self + .storage .blocks_dal() .get_sealed_l2_block_number() .await? @@ -339,24 +359,31 @@ impl ConsensusDal<'_, '_> { /// which will help us to detect bugs in the consensus implementation /// while it is "fresh". If it turns out to take too long, /// we can remove the verification checks later. - pub async fn insert_certificate(&mut self, cert: &validator::CommitQC) -> anyhow::Result<()> { + pub async fn insert_certificate( + &mut self, + cert: &validator::CommitQC, + ) -> Result<(), InsertCertificateError> { + use InsertCertificateError as Err; let header = &cert.message.proposal; let mut txn = self.storage.start_transaction().await?; if let Some(last) = txn.consensus_dal().last_certificate().await? { - anyhow::ensure!( - last.header().number.next() == header.number, - "expected certificate for a block after the current head block" - ); + if last.header().number.next() != header.number { + return Err(anyhow::format_err!( + "expected certificate for a block after the current head block" + ) + .into()); + } } let want_payload = txn .consensus_dal() .block_payload(cert.message.proposal.number) .await? - .context("corresponding L2 block is missing")?; - anyhow::ensure!( - header.payload == want_payload.encode().hash(), - "consensus block payload doesn't match the L2 block" - ); + .ok_or(Err::MissingPayload)?; + if header.payload != want_payload.encode().hash() { + return Err( + anyhow::format_err!("consensus block payload doesn't match the L2 block").into(), + ); + } sqlx::query!( r#" INSERT INTO @@ -368,7 +395,8 @@ impl ConsensusDal<'_, '_> { zksync_protobuf::serde::serialize(cert, serde_json::value::Serializer).unwrap(), ) .execute(txn.conn()) - .await?; + .await + .context("sqlx::query::execute()")?; txn.commit().await?; Ok(()) } diff --git a/core/node/consensus/Cargo.toml b/core/node/consensus/Cargo.toml index 5fc95b6c91f3..6332ac8c1a9d 100644 --- a/core/node/consensus/Cargo.toml +++ b/core/node/consensus/Cargo.toml @@ -36,6 +36,7 @@ anyhow.workspace = true async-trait.workspace = true secrecy.workspace = true tempfile.workspace = true +thiserror.workspace = true tracing.workspace = true [dev-dependencies] diff --git a/core/node/consensus/src/storage/mod.rs b/core/node/consensus/src/storage/mod.rs index bc8a0b8b8409..0afab4346fbb 100644 --- a/core/node/consensus/src/storage/mod.rs +++ b/core/node/consensus/src/storage/mod.rs @@ -2,11 +2,14 @@ use std::sync::Arc; use anyhow::Context as _; -use zksync_concurrency::{ctx, error::Wrap as _, sync, time}; +use zksync_concurrency::{ctx, error::Wrap as _, scope, sync, time}; use zksync_consensus_bft::PayloadManager; use zksync_consensus_roles::validator; use zksync_consensus_storage as storage; -use zksync_dal::{consensus_dal::Payload, Core, CoreDal, DalError}; +use zksync_dal::{ + consensus_dal::{self, Payload}, + Core, CoreDal, DalError, +}; use zksync_node_sync::{ fetcher::{FetchedBlock, FetchedTransaction, IoCursorExt as _}, sync_action::ActionQueueSender, @@ -24,6 +27,14 @@ pub(crate) mod testonly; #[derive(Debug, Clone)] pub(super) struct ConnectionPool(pub(super) zksync_dal::ConnectionPool); +#[derive(thiserror::Error, Debug)] +pub enum InsertCertificateError { + #[error(transparent)] + Canceled(#[from] ctx::Canceled), + #[error(transparent)] + Inner(#[from] consensus_dal::InsertCertificateError), +} + impl ConnectionPool { /// Wrapper for `connection_tagged()`. pub(super) async fn connection<'a>(&'a self, ctx: &ctx::Ctx) -> ctx::Result> { @@ -152,7 +163,7 @@ impl<'a> Connection<'a> { &mut self, ctx: &ctx::Ctx, cert: &validator::CommitQC, - ) -> ctx::Result<()> { + ) -> Result<(), InsertCertificateError> { Ok(ctx .wait(self.0.consensus_dal().insert_certificate(cert)) .await??) @@ -440,23 +451,74 @@ impl Store { impl StoreRunner { pub async fn run(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> { - let res = async { + let res = scope::run!(ctx, |ctx, s| async { + s.spawn::<()>(async { + const POLL_INTERVAL: time::Duration = time::Duration::seconds(1); + loop { + let range = self + .pool + .connection(ctx) + .await + .wrap("connection")? + .block_range(ctx) + .await + .wrap("block_range()")?; + self.persisted.send_if_modified(|p| { + if range.start < p.first { + return false; + } + p.first = range.start; + if range.start >= p.next() { + p.last = None; + } + true + }); + ctx.sleep(POLL_INTERVAL).await?; + } + }); + + // Loop inserting certs to storage. loop { + const POLL_INTERVAL: time::Duration = time::Duration::milliseconds(50); let cert = self.certificates.recv(ctx).await?; - self.pool - .wait_for_payload(ctx, cert.header().number) - .await - .wrap("wait_for_payload()")?; - self.pool - .connection(ctx) - .await - .wrap("connection()")? - .insert_certificate(ctx, &cert) - .await - .wrap("insert_certificate()")?; - self.persisted.send_modify(|p| p.last = Some(cert)); + // Wait for the block to be persisted, so that we can attach a cert to it. + // It may happen that persisted blocks get pruned and this certificate is not needed any + // more. + while self.persisted.borrow().next() == cert.header().number { + use consensus_dal::InsertCertificateError as E; + // Try to insert the cert. + match self + .pool + .connection(ctx) + .await + .wrap("connection")? + .insert_certificate(ctx, &cert) + .await + { + Ok(()) => { + self.persisted.send_if_modified(|p| { + if p.next() != cert.header().number { + return false; + } + p.last = Some(cert); + true + }); + break; + } + // In case of missing payload we just need to wait longer. + Err(InsertCertificateError::Inner(E::MissingPayload)) => { + ctx.sleep(POLL_INTERVAL).await?; + } + Err(InsertCertificateError::Canceled(err)) => { + return Err(ctx::Error::Canceled(err)) + } + Err(InsertCertificateError::Inner(err)) => { + return Err(ctx::Error::Internal(anyhow::Error::from(err))) + } + } + } } - } + }) .await; match res { Err(ctx::Error::Canceled(_)) | Ok(()) => Ok(()), From fbfc26ed7045d73be965c5f03e2cfe99eb2602c9 Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Thu, 27 Jun 2024 12:34:00 +0200 Subject: [PATCH 2/8] support for pruning --- ...731b755cf2e09d877dd4eb70d58a1d11a977.json} | 8 +- ...5223f4599d4128db588d8645f3d106de5f50b.json | 20 -- core/lib/dal/src/consensus_dal.rs | 212 +++++++++--------- core/node/consensus/src/storage/mod.rs | 107 ++------- core/node/consensus/src/storage/testonly.rs | 16 +- 5 files changed, 139 insertions(+), 224 deletions(-) rename core/lib/dal/.sqlx/{query-3b013b93ea4a6766162c9f0c60517a7ffc993cf436ad3aeeae82ed3e330b07bd.json => query-d3d472436f1f3a6cc61bc9d47de5731b755cf2e09d877dd4eb70d58a1d11a977.json} (58%) delete mode 100644 core/lib/dal/.sqlx/query-fec7b791e371a4c58350b6537065223f4599d4128db588d8645f3d106de5f50b.json diff --git a/core/lib/dal/.sqlx/query-3b013b93ea4a6766162c9f0c60517a7ffc993cf436ad3aeeae82ed3e330b07bd.json b/core/lib/dal/.sqlx/query-d3d472436f1f3a6cc61bc9d47de5731b755cf2e09d877dd4eb70d58a1d11a977.json similarity index 58% rename from core/lib/dal/.sqlx/query-3b013b93ea4a6766162c9f0c60517a7ffc993cf436ad3aeeae82ed3e330b07bd.json rename to core/lib/dal/.sqlx/query-d3d472436f1f3a6cc61bc9d47de5731b755cf2e09d877dd4eb70d58a1d11a977.json index 6e7bffec4854..61497cdb1694 100644 --- a/core/lib/dal/.sqlx/query-3b013b93ea4a6766162c9f0c60517a7ffc993cf436ad3aeeae82ed3e330b07bd.json +++ b/core/lib/dal/.sqlx/query-d3d472436f1f3a6cc61bc9d47de5731b755cf2e09d877dd4eb70d58a1d11a977.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT\n certificate\n FROM\n miniblocks_consensus\n ORDER BY\n number ASC\n LIMIT\n 1\n ", + "query": "\n SELECT\n certificate\n FROM\n miniblocks_consensus\n WHERE\n number >= $1\n ORDER BY\n number DESC\n LIMIT\n 1\n ", "describe": { "columns": [ { @@ -10,11 +10,13 @@ } ], "parameters": { - "Left": [] + "Left": [ + "Int8" + ] }, "nullable": [ false ] }, - "hash": "3b013b93ea4a6766162c9f0c60517a7ffc993cf436ad3aeeae82ed3e330b07bd" + "hash": "d3d472436f1f3a6cc61bc9d47de5731b755cf2e09d877dd4eb70d58a1d11a977" } diff --git a/core/lib/dal/.sqlx/query-fec7b791e371a4c58350b6537065223f4599d4128db588d8645f3d106de5f50b.json b/core/lib/dal/.sqlx/query-fec7b791e371a4c58350b6537065223f4599d4128db588d8645f3d106de5f50b.json deleted file mode 100644 index c34d38ac2d03..000000000000 --- a/core/lib/dal/.sqlx/query-fec7b791e371a4c58350b6537065223f4599d4128db588d8645f3d106de5f50b.json +++ /dev/null @@ -1,20 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n SELECT\n certificate\n FROM\n miniblocks_consensus\n ORDER BY\n number DESC\n LIMIT\n 1\n ", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "certificate", - "type_info": "Jsonb" - } - ], - "parameters": { - "Left": [] - }, - "nullable": [ - false - ] - }, - "hash": "fec7b791e371a4c58350b6537065223f4599d4128db588d8645f3d106de5f50b" -} diff --git a/core/lib/dal/src/consensus_dal.rs b/core/lib/dal/src/consensus_dal.rs index 41efad20da51..ab9834cb8641 100644 --- a/core/lib/dal/src/consensus_dal.rs +++ b/core/lib/dal/src/consensus_dal.rs @@ -1,8 +1,6 @@ -use std::ops; - use anyhow::Context as _; use zksync_consensus_roles::validator; -use zksync_consensus_storage::ReplicaState; +use zksync_consensus_storage::{BlockStoreState, ReplicaState}; use zksync_db_connection::{ connection::Connection, error::{DalError, DalResult, SqlxContext}, @@ -22,8 +20,12 @@ pub struct ConsensusDal<'a, 'c> { /// Error returned by `ConsensusDal::insert_certificate()`. #[derive(thiserror::Error, Debug)] pub enum InsertCertificateError { + #[error("unexpected certificate, want certificate for block {want}")] + UnexpectedCert { want: validator::BlockNumber }, #[error("corresponding L2 block is missing")] MissingPayload, + #[error("certificate doesn't match the payload")] + PayloadMismatch, #[error(transparent)] Dal(#[from] DalError), #[error(transparent)] @@ -96,14 +98,16 @@ impl ConsensusDal<'_, '_> { DELETE FROM miniblocks_consensus "# ) - .execute(txn.conn()) + .instrument("try_update_genesis#DELETE FROM miniblock_consensus") + .execute(&mut txn) .await?; sqlx::query!( r#" DELETE FROM consensus_replica_state "# ) - .execute(txn.conn()) + .instrument("try_update_genesis#DELETE FROM consensus_replica_state") + .execute(&mut txn) .await?; sqlx::query!( r#" @@ -115,41 +119,13 @@ impl ConsensusDal<'_, '_> { genesis, state, ) - .execute(txn.conn()) + .instrument("try_update_genesis#INSERT INTO consenuss_replica_state") + .execute(&mut txn) .await?; txn.commit().await?; Ok(()) } - /// Fetches the range of L2 blocks present in storage. - /// If storage was recovered from snapshot, the range doesn't need to start at 0. - pub async fn block_range(&mut self) -> DalResult> { - let mut start = L2BlockNumber(0); - if let Some(snapshot) = self - .storage - .snapshot_recovery_dal() - .get_applied_snapshot_status() - .await? - { - // `snapshot.l2_block_number` indicates the last block processed. - // This block is NOT present in storage. Therefore, the first block - // that will appear in storage is `snapshot.l2_block_number + 1`. - start = start.max(snapshot.l2_block_number + 1); - } - let pruning_info = self.storage.pruning_dal().get_pruning_info().await?; - if let Some(last_pruned) = pruning_info.last_hard_pruned_l2_block { - start = start.max(last_pruned + 1); - } - let start = validator::BlockNumber(start.0.into()); - let end = self - .storage - .blocks_dal() - .get_sealed_l2_block_number() - .await? - .map_or(start, |last| validator::BlockNumber(last.0.into()).next()); - Ok(start..end) - } - /// [Main node only] creates a new consensus fork starting at /// the last sealed L2 block. Resets the state of the consensus /// by calling `try_update_genesis()`. @@ -162,16 +138,14 @@ impl ConsensusDal<'_, '_> { let Some(old) = txn.consensus_dal().genesis().await.context("genesis()")? else { return Ok(()); }; - let first_block = txn - .consensus_dal() - .block_range() - .await - .context("get_block_range()")? - .end; let new = validator::GenesisRaw { chain_id: old.chain_id, fork_number: old.fork_number.next(), - first_block, + first_block: txn + .consensus_dal() + .next_block() + .await + .context("next_block()")?, protocol_version: old.protocol_version, committee: old.committee.clone(), @@ -222,62 +196,99 @@ impl ConsensusDal<'_, '_> { Ok(()) } - /// Fetches the first consensus certificate. - /// It might NOT be the certificate for the first L2 block: - /// see `validator::Genesis.first_block`. - pub async fn first_certificate(&mut self) -> DalResult> { - sqlx::query!( - r#" - SELECT - certificate - FROM - miniblocks_consensus - ORDER BY - number ASC - LIMIT - 1 - "# - ) - .try_map(|row| { - zksync_protobuf::serde::deserialize(row.certificate).decode_column("certificate") - }) - .instrument("first_certificate") - .fetch_optional(self.storage) - .await + /// First block that should be in storage. + async fn first_block(&mut self) -> anyhow::Result { + let mut start = validator::BlockNumber(0); + // If we recovered from a snapshot then it cannot be older than the first block after + // snapshot. + if let Some(snapshot) = self + .storage + .snapshot_recovery_dal() + .get_applied_snapshot_status() + .await? + { + start = start.max(validator::BlockNumber(snapshot.l2_block_number.0.into()) + 1); + } + // If the node was pruned, it cannot be older than the first block after soft pruned block. + let pruning_info = self.storage.pruning_dal().get_pruning_info().await?; + if let Some(last_pruned) = pruning_info.last_soft_pruned_l2_block { + start = start.max(validator::BlockNumber(last_pruned.0.into()) + 1); + } + Ok(start) + } + + /// Next block that should be inserted to storage. + pub async fn next_block(&mut self) -> anyhow::Result { + let mut txn = self.storage.start_transaction().await?; + if let Some(last) = txn + .blocks_dal() + .get_sealed_l2_block_number() + .await + .context("get_sealed_l2_block_number()")? + { + return Ok(validator::BlockNumber(last.0.into()) + 1); + } + let next = txn + .consensus_dal() + .first_block() + .await + .context("first_block()")?; + txn.commit().await.context("commit()")?; + Ok(next) } /// Fetches the last consensus certificate. /// Currently, certificates are NOT generated synchronously with L2 blocks, /// so it might NOT be the certificate for the last L2 block. - pub async fn last_certificate(&mut self) -> DalResult> { - sqlx::query!( + pub async fn certificates_range(&mut self) -> anyhow::Result { + let mut txn = self.storage.start_transaction().await?; + // It cannot be older than genesis first block. + let mut start = txn + .consensus_dal() + .genesis() + .await? + .context("genesis()")? + .first_block; + start = start.max( + txn.consensus_dal() + .first_block() + .await + .context("first_block()")?, + ); + let row = sqlx::query!( r#" SELECT certificate FROM miniblocks_consensus + WHERE + number >= $1 ORDER BY number DESC LIMIT 1 - "# + "#, + i64::try_from(start.0)?, ) - .try_map(|row| { - zksync_protobuf::serde::deserialize(row.certificate).decode_column("certificate") - }) .instrument("last_certificate") - .fetch_optional(self.storage) - .await + .fetch_optional(&mut txn) + .await?; + txn.commit().await.context("commit()")?; + Ok(BlockStoreState { + first: start, + last: match row { + None => None, + Some(row) => Some(zksync_protobuf::serde::deserialize(row.certificate)?), + }, + }) } /// Fetches the consensus certificate for the L2 block with the given `block_number`. pub async fn certificate( &mut self, block_number: validator::BlockNumber, - ) -> DalResult> { - let instrumentation = - Instrumented::new("certificate").with_arg("block_number", &block_number); - let query = sqlx::query!( + ) -> anyhow::Result> { + let Some(row) = sqlx::query!( r#" SELECT certificate @@ -286,17 +297,15 @@ impl ConsensusDal<'_, '_> { WHERE number = $1 "#, - i64::try_from(block_number.0) - .map_err(|err| { instrumentation.arg_error("block_number", err) })? + i64::try_from(block_number.0)? ) - .try_map(|row| { - zksync_protobuf::serde::deserialize(row.certificate).decode_column("certificate") - }); - - instrumentation - .with(query) - .fetch_optional(self.storage) - .await + .instrument("certificate") + .fetch_optional(self.storage) + .await? + else { + return Ok(None); + }; + Ok(Some(zksync_protobuf::serde::deserialize(row.certificate)?)) } /// Fetches a range of L2 blocks from storage and converts them to `Payload`s. @@ -352,7 +361,6 @@ impl ConsensusDal<'_, '_> { /// Inserts a certificate for the L2 block `cert.header().number`. It verifies that /// /// - the certified payload matches the L2 block in storage - /// - the `cert.header().parent` matches the parent L2 block. /// - the parent block already has a certificate. /// /// NOTE: This is an extra secure way of storing a certificate, @@ -363,26 +371,24 @@ impl ConsensusDal<'_, '_> { &mut self, cert: &validator::CommitQC, ) -> Result<(), InsertCertificateError> { - use InsertCertificateError as Err; + use InsertCertificateError as E; let header = &cert.message.proposal; let mut txn = self.storage.start_transaction().await?; - if let Some(last) = txn.consensus_dal().last_certificate().await? { - if last.header().number.next() != header.number { - return Err(anyhow::format_err!( - "expected certificate for a block after the current head block" - ) - .into()); - } + let range = txn.consensus_dal().certificates_range().await?; + // If we are not interested in this cert anymore, just return OK. + if range.next() < header.number { + return Ok(()); + } + if range.next() != header.number { + return Err(E::UnexpectedCert { want: range.next() }); } let want_payload = txn .consensus_dal() .block_payload(cert.message.proposal.number) .await? - .ok_or(Err::MissingPayload)?; + .ok_or(E::MissingPayload)?; if header.payload != want_payload.encode().hash() { - return Err( - anyhow::format_err!("consensus block payload doesn't match the L2 block").into(), - ); + return Err(E::PayloadMismatch); } sqlx::query!( r#" @@ -394,10 +400,10 @@ impl ConsensusDal<'_, '_> { header.number.0 as i64, zksync_protobuf::serde::serialize(cert, serde_json::value::Serializer).unwrap(), ) - .execute(txn.conn()) - .await - .context("sqlx::query::execute()")?; - txn.commit().await?; + .instrument("insert_certificate") + .execute(&mut txn) + .await?; + txn.commit().await.context("commit")?; Ok(()) } } diff --git a/core/node/consensus/src/storage/mod.rs b/core/node/consensus/src/storage/mod.rs index 0afab4346fbb..3c9e808e527f 100644 --- a/core/node/consensus/src/storage/mod.rs +++ b/core/node/consensus/src/storage/mod.rs @@ -89,17 +89,6 @@ impl<'a> Connection<'a> { Ok(ctx.wait(self.0.commit()).await?.context("sqlx")?) } - /// Wrapper for `consensus_dal().block_range()`. - pub async fn block_range( - &mut self, - ctx: &ctx::Ctx, - ) -> ctx::Result> { - Ok(ctx - .wait(self.0.consensus_dal().block_range()) - .await? - .context("sqlx")?) - } - /// Wrapper for `consensus_dal().block_payload()`. pub async fn payload( &mut self, @@ -124,28 +113,6 @@ impl<'a> Connection<'a> { .map_err(DalError::generalize)?) } - /// Wrapper for `consensus_dal().first_certificate()`. - pub async fn first_certificate( - &mut self, - ctx: &ctx::Ctx, - ) -> ctx::Result> { - Ok(ctx - .wait(self.0.consensus_dal().first_certificate()) - .await? - .map_err(DalError::generalize)?) - } - - /// Wrapper for `consensus_dal().last_certificate()`. - pub async fn last_certificate( - &mut self, - ctx: &ctx::Ctx, - ) -> ctx::Result> { - Ok(ctx - .wait(self.0.consensus_dal().last_certificate()) - .await? - .map_err(DalError::generalize)?) - } - /// Wrapper for `consensus_dal().certificate()`. pub async fn certificate( &mut self, @@ -154,8 +121,7 @@ impl<'a> Connection<'a> { ) -> ctx::Result> { Ok(ctx .wait(self.0.consensus_dal().certificate(number)) - .await? - .map_err(DalError::generalize)?) + .await??) } /// Wrapper for `consensus_dal().insert_certificate()`. @@ -215,6 +181,7 @@ impl<'a> Connection<'a> { }) } + /// Wrapper for `consensus_dal().genesis()`. pub async fn genesis(&mut self, ctx: &ctx::Ctx) -> ctx::Result> { Ok(ctx .wait(self.0.consensus_dal().genesis()) @@ -222,6 +189,7 @@ impl<'a> Connection<'a> { .map_err(DalError::generalize)?) } + /// Wrapper for `consensus_dal().try_update_genesis()`. pub async fn try_update_genesis( &mut self, ctx: &ctx::Ctx, @@ -232,52 +200,19 @@ impl<'a> Connection<'a> { .await??) } - /// Fetches and verifies consistency of certificates in storage. + /// Wrapper for `consensus_dal().next_block()`. + async fn next_block(&mut self, ctx: &ctx::Ctx) -> ctx::Result { + Ok(ctx.wait(self.0.consensus_dal().next_block()).await??) + } + + /// Wrapper for `consensus_dal().certificates_range()`. async fn certificates_range( &mut self, ctx: &ctx::Ctx, ) -> ctx::Result { - // Fetch the range of L2 blocks in storage. - let block_range = self.block_range(ctx).await.context("block_range")?; - - // Fetch the range of certificates in storage. - let genesis = self - .genesis(ctx) - .await - .wrap("genesis()")? - .context("genesis missing")?; - let first_expected_cert = genesis.first_block.max(block_range.start); - let last_cert = self - .last_certificate(ctx) - .await - .wrap("last_certificate()")?; - let next_expected_cert = last_cert - .as_ref() - .map_or(first_expected_cert, |cert| cert.header().number.next()); - - // Check that the first certificate in storage has the expected L2 block number. - if let Some(got) = self - .first_certificate(ctx) - .await - .wrap("first_certificate()")? - { - if got.header().number != first_expected_cert { - return Err(anyhow::format_err!( - "inconsistent storage: certificates should start at {first_expected_cert}, while they start at {}", - got.header().number, - ).into()); - } - } - - // Check that the node has all the blocks before the next expected certificate, because - // the node needs to know the state of the chain up to block `X` to process block `X+1`. - if block_range.end < next_expected_cert { - return Err(anyhow::format_err!("inconsistent storage: cannot start consensus for L2 block {next_expected_cert}, because earlier blocks are missing").into()); - } - Ok(storage::BlockStoreState { - first: first_expected_cert, - last: last_cert, - }) + Ok(ctx + .wait(self.0.consensus_dal().certificates_range()) + .await??) } /// (Re)initializes consensus genesis to start at the last L2 block in storage. @@ -287,7 +222,6 @@ impl<'a> Connection<'a> { ctx: &ctx::Ctx, spec: &config::GenesisSpec, ) -> ctx::Result<()> { - let block_range = self.block_range(ctx).await.wrap("block_range()")?; let mut txn = self .start_transaction(ctx) .await @@ -305,7 +239,7 @@ impl<'a> Connection<'a> { fork_number: old .as_ref() .map_or(validator::ForkNumber(0), |old| old.fork_number.next()), - first_block: block_range.end, + first_block: txn.next_block(ctx).await.context("next_block()")?, protocol_version: spec.protocol_version, committee: spec.validators.clone(), @@ -319,6 +253,7 @@ impl<'a> Connection<'a> { Ok(()) } + /// Fetches a block from storage. pub(super) async fn block( &mut self, ctx: &ctx::Ctx, @@ -453,6 +388,7 @@ impl StoreRunner { pub async fn run(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> { let res = scope::run!(ctx, |ctx, s| async { s.spawn::<()>(async { + // Loop observing the oldest block in storage. const POLL_INTERVAL: time::Duration = time::Duration::seconds(1); loop { let range = self @@ -460,17 +396,14 @@ impl StoreRunner { .connection(ctx) .await .wrap("connection")? - .block_range(ctx) + .certificates_range(ctx) .await - .wrap("block_range()")?; + .wrap("certificates_range()")?; self.persisted.send_if_modified(|p| { - if range.start < p.first { + if p.first == range.first { return false; } - p.first = range.start; - if range.start >= p.next() { - p.last = None; - } + p.first = range.first; true }); ctx.sleep(POLL_INTERVAL).await?; @@ -478,8 +411,8 @@ impl StoreRunner { }); // Loop inserting certs to storage. + const POLL_INTERVAL: time::Duration = time::Duration::milliseconds(50); loop { - const POLL_INTERVAL: time::Duration = time::Duration::milliseconds(50); let cert = self.certificates.recv(ctx).await?; // Wait for the block to be persisted, so that we can attach a cert to it. // It may happen that persisted blocks get pruned and this certificate is not needed any diff --git a/core/node/consensus/src/storage/testonly.rs b/core/node/consensus/src/storage/testonly.rs index f5f30021b7c4..667bf36800d2 100644 --- a/core/node/consensus/src/storage/testonly.rs +++ b/core/node/consensus/src/storage/testonly.rs @@ -125,19 +125,13 @@ impl ConnectionPool { ) -> ctx::Result> { self.wait_for_certificate(ctx, want_last).await?; let mut conn = self.connection(ctx).await.wrap("connection()")?; - let last_cert = conn - .last_certificate(ctx) + let range = conn + .certificates_range(ctx) .await - .wrap("last_certificate()")? - .unwrap(); - let first_cert = conn - .first_certificate(ctx) - .await - .wrap("first_certificate()")? - .unwrap(); - assert_eq!(want_last, last_cert.header().number); + .wrap("certificates_range()")?; + assert_eq!(want_last.next(), range.next()); let mut blocks: Vec = vec![]; - for i in first_cert.header().number.0..=last_cert.header().number.0 { + for i in range.first.0..range.next().0 { let i = validator::BlockNumber(i); let block = conn.block(ctx, i).await.context("block()")?.unwrap(); blocks.push(block); From a31e4341632ad0dcb3d56fddb19b3d9be269e2c8 Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Thu, 27 Jun 2024 16:43:41 +0200 Subject: [PATCH 3/8] test added, bumped era-consensus --- Cargo.lock | 239 ++++++++++++++++---- Cargo.toml | 20 +- core/lib/dal/src/consensus_dal.rs | 56 ++--- core/node/consensus/src/config.rs | 3 +- core/node/consensus/src/en.rs | 10 +- core/node/consensus/src/lib.rs | 10 +- core/node/consensus/src/storage/mod.rs | 46 +++- core/node/consensus/src/storage/testonly.rs | 29 +++ core/node/consensus/src/testonly.rs | 4 +- core/node/consensus/src/tests.rs | 73 +++++- prover/Cargo.lock | 17 +- 11 files changed, 391 insertions(+), 116 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c47ee84af18c..4634cd13cf05 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -293,6 +293,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "atomic-write-file" version = "0.1.2" @@ -331,9 +337,9 @@ dependencies = [ "bitflags 1.3.2", "bytes", "futures-util", - "http", - "http-body", - "hyper", + "http 0.2.9", + "http-body 0.4.6", + "hyper 0.14.29", "itoa", "matchit", "memchr", @@ -361,8 +367,8 @@ dependencies = [ "async-trait", "bytes", "futures-util", - "http", - "http-body", + "http 0.2.9", + "http-body 0.4.6", "mime", "rustversion", "tower-layer", @@ -420,6 +426,12 @@ version = "0.21.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "35636a1494ede3b646cc98f74f8e62c773a38a659ebc777a2cf26b9b74171df9" +[[package]] +name = "base64" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" + [[package]] name = "base64ct" version = "1.6.0" @@ -712,6 +724,12 @@ dependencies = [ "syn_derive", ] +[[package]] +name = "build_html" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3108fe6fe7ac796fb7625bdde8fa2b67b5a7731496251ca57c7b8cadd78a16a1" + [[package]] name = "bumpalo" version = "3.14.0" @@ -764,6 +782,12 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223" +[[package]] +name = "bytesize" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a3e368af43e418a04d52505cf3dbc23dda4e3407ae2fa99fd0e4f308ce546acc" + [[package]] name = "bzip2-sys" version = "0.1.11+1.0.8" @@ -2343,7 +2367,7 @@ dependencies = [ "futures-core", "futures-sink", "gloo-utils", - "http", + "http 0.2.9", "js-sys", "pin-project", "serde", @@ -2502,7 +2526,26 @@ dependencies = [ "futures-core", "futures-sink", "futures-util", - "http", + "http 0.2.9", + "indexmap 2.1.0", + "slab", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "h2" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa82e28a107a8cc405f0839610bdc9b15f1e25ec7d696aa5cf173edbcb1486ab" +dependencies = [ + "atomic-waker", + "bytes", + "fnv", + "futures-core", + "futures-sink", + "http 1.1.0", "indexmap 2.1.0", "slab", "tokio", @@ -2652,6 +2695,17 @@ dependencies = [ "itoa", ] +[[package]] +name = "http" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" +dependencies = [ + "bytes", + "fnv", + "itoa", +] + [[package]] name = "http-body" version = "0.4.6" @@ -2659,7 +2713,30 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" dependencies = [ "bytes", - "http", + "http 0.2.9", + "pin-project-lite", +] + +[[package]] +name = "http-body" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cac85db508abc24a2e48553ba12a996e87244a0395ce011e62b37158745d643" +dependencies = [ + "bytes", + "http 1.1.0", +] + +[[package]] +name = "http-body-util" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f" +dependencies = [ + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", "pin-project-lite", ] @@ -2691,9 +2768,9 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", - "http", - "http-body", + "h2 0.3.26", + "http 0.2.9", + "http-body 0.4.6", "httparse", "httpdate", "itoa", @@ -2705,6 +2782,27 @@ dependencies = [ "want", ] +[[package]] +name = "hyper" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe575dd17d0862a9a33781c8c4696a55c320909004a67a00fb286ba8b1bc496d" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "h2 0.4.5", + "http 1.1.0", + "http-body 1.0.0", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "smallvec", + "tokio", + "want", +] + [[package]] name = "hyper-rustls" version = "0.24.1" @@ -2712,8 +2810,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8d78e1e73ec14cf7375674f74d7dde185c8206fd9dea6fb6295e8a98098aaa97" dependencies = [ "futures-util", - "http", - "hyper", + "http 0.2.9", + "hyper 0.14.29", "log", "rustls 0.21.11", "rustls-native-certs 0.6.3", @@ -2727,7 +2825,7 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" dependencies = [ - "hyper", + "hyper 0.14.29", "pin-project-lite", "tokio", "tokio-io-timeout", @@ -2740,12 +2838,32 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" dependencies = [ "bytes", - "hyper", + "hyper 0.14.29", "native-tls", "tokio", "tokio-native-tls", ] +[[package]] +name = "hyper-util" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b875924a60b96e5d7b9ae7b066540b1dd1cbd90d1828f54c92e02a283351c56" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "hyper 1.3.1", + "pin-project-lite", + "socket2", + "tokio", + "tower", + "tower-service", + "tracing", +] + [[package]] name = "iai" version = "0.1.1" @@ -2970,7 +3088,7 @@ dependencies = [ "futures-channel", "futures-util", "gloo-net", - "http", + "http 0.2.9", "jsonrpsee-core", "pin-project", "rustls-native-certs 0.7.0", @@ -2997,7 +3115,7 @@ dependencies = [ "beef", "futures-timer", "futures-util", - "hyper", + "hyper 0.14.29", "jsonrpsee-types", "parking_lot", "pin-project", @@ -3019,7 +3137,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78b7de9f3219d95985eb77fd03194d7c1b56c19bce1abfcc9d07462574b15572" dependencies = [ "async-trait", - "hyper", + "hyper 0.14.29", "hyper-rustls", "jsonrpsee-core", "jsonrpsee-types", @@ -3052,8 +3170,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5cc7c6d1a2c58f6135810284a390d9f823d0f508db74cd914d8237802de80f98" dependencies = [ "futures-util", - "http", - "hyper", + "http 0.2.9", + "hyper 0.14.29", "jsonrpsee-core", "jsonrpsee-types", "pin-project", @@ -3099,7 +3217,7 @@ version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "073c077471e89c4b511fa88b3df9a0f0abdf4a0a2e6683dd2ab36893af87bb2d" dependencies = [ - "http", + "http 0.2.9", "jsonrpsee-client-transport", "jsonrpsee-core", "jsonrpsee-types", @@ -3888,7 +4006,7 @@ checksum = "c7594ec0e11d8e33faf03530a4c49af7064ebba81c1480e01be67d90b356508b" dependencies = [ "async-trait", "bytes", - "http", + "http 0.2.9", "opentelemetry_api", "reqwest", ] @@ -3901,7 +4019,7 @@ checksum = "7e5e5a5c4135864099f3faafbe939eb4d7f9b80ebf68a8448da961b32a7c1275" dependencies = [ "async-trait", "futures-core", - "http", + "http 0.2.9", "opentelemetry-http", "opentelemetry-proto", "opentelemetry-semantic-conventions", @@ -4862,10 +4980,10 @@ dependencies = [ "encoding_rs", "futures-core", "futures-util", - "h2", - "http", - "http-body", - "hyper", + "h2 0.3.26", + "http 0.2.9", + "http-body 0.4.6", + "hyper 0.14.29", "hyper-tls", "ipnet", "js-sys", @@ -5802,7 +5920,7 @@ dependencies = [ "base64 0.13.1", "bytes", "futures 0.3.28", - "http", + "http 0.2.9", "httparse", "log", "rand 0.8.5", @@ -6458,6 +6576,19 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" +[[package]] +name = "tls-listener" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce110c38c3c9b6e5cc4fe72e60feb5b327750388a10a276e3d5d7d431e3dc76c" +dependencies = [ + "futures-util", + "pin-project-lite", + "thiserror", + "tokio", + "tokio-rustls 0.25.0", +] + [[package]] name = "tokio" version = "1.34.0" @@ -6595,10 +6726,10 @@ dependencies = [ "bytes", "futures-core", "futures-util", - "h2", - "http", - "http-body", - "hyper", + "h2 0.3.26", + "http 0.2.9", + "http-body 0.4.6", + "hyper 0.14.29", "hyper-timeout", "percent-encoding", "pin-project", @@ -6641,8 +6772,8 @@ dependencies = [ "bytes", "futures-core", "futures-util", - "http", - "http-body", + "http 0.2.9", + "http-body 0.4.6", "http-range-header", "pin-project-lite", "tokio", @@ -6998,7 +7129,7 @@ name = "vise-exporter" version = "0.1.0" source = "git+https://github.com/matter-labs/vise.git?rev=a5bb80c9ce7168663114ee30e794d6dc32159ee4#a5bb80c9ce7168663114ee30e794d6dc32159ee4" dependencies = [ - "hyper", + "hyper 0.14.29", "once_cell", "tokio", "tracing", @@ -7775,7 +7906,7 @@ dependencies = [ [[package]] name = "zksync_concurrency" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=3e6f101ee4124308c4c974caaa259d524549b0c6#3e6f101ee4124308c4c974caaa259d524549b0c6" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=177881457f392fca990dbb3df1695737d90fd0c7#177881457f392fca990dbb3df1695737d90fd0c7" dependencies = [ "anyhow", "once_cell", @@ -7806,7 +7937,7 @@ dependencies = [ [[package]] name = "zksync_consensus_bft" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=3e6f101ee4124308c4c974caaa259d524549b0c6#3e6f101ee4124308c4c974caaa259d524549b0c6" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=177881457f392fca990dbb3df1695737d90fd0c7#177881457f392fca990dbb3df1695737d90fd0c7" dependencies = [ "anyhow", "async-trait", @@ -7827,13 +7958,15 @@ dependencies = [ [[package]] name = "zksync_consensus_crypto" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=3e6f101ee4124308c4c974caaa259d524549b0c6#3e6f101ee4124308c4c974caaa259d524549b0c6" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=177881457f392fca990dbb3df1695737d90fd0c7#177881457f392fca990dbb3df1695737d90fd0c7" dependencies = [ "anyhow", "blst", "ed25519-dalek", + "elliptic-curve 0.13.7", "ff_ce", "hex", + "k256 0.13.2", "num-bigint 0.4.4", "num-traits", "pairing_ce 0.28.5 (git+https://github.com/matter-labs/pairing.git?rev=d24f2c5871089c4cd4f54c0ca266bb9fef6115eb)", @@ -7848,7 +7981,7 @@ dependencies = [ [[package]] name = "zksync_consensus_executor" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=3e6f101ee4124308c4c974caaa259d524549b0c6#3e6f101ee4124308c4c974caaa259d524549b0c6" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=177881457f392fca990dbb3df1695737d90fd0c7#177881457f392fca990dbb3df1695737d90fd0c7" dependencies = [ "anyhow", "rand 0.8.5", @@ -7867,10 +8000,16 @@ dependencies = [ [[package]] name = "zksync_consensus_network" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=3e6f101ee4124308c4c974caaa259d524549b0c6#3e6f101ee4124308c4c974caaa259d524549b0c6" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=177881457f392fca990dbb3df1695737d90fd0c7#177881457f392fca990dbb3df1695737d90fd0c7" dependencies = [ "anyhow", "async-trait", + "base64 0.22.1", + "build_html", + "bytesize", + "http-body-util", + "hyper 1.3.1", + "hyper-util", "im", "once_cell", "pin-project", @@ -7878,6 +8017,9 @@ dependencies = [ "rand 0.8.5", "snow", "thiserror", + "tls-listener", + "tokio", + "tokio-rustls 0.25.0", "tracing", "vise", "zksync_concurrency", @@ -7892,7 +8034,7 @@ dependencies = [ [[package]] name = "zksync_consensus_roles" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=3e6f101ee4124308c4c974caaa259d524549b0c6#3e6f101ee4124308c4c974caaa259d524549b0c6" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=177881457f392fca990dbb3df1695737d90fd0c7#177881457f392fca990dbb3df1695737d90fd0c7" dependencies = [ "anyhow", "bit-vec", @@ -7913,7 +8055,7 @@ dependencies = [ [[package]] name = "zksync_consensus_storage" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=3e6f101ee4124308c4c974caaa259d524549b0c6#3e6f101ee4124308c4c974caaa259d524549b0c6" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=177881457f392fca990dbb3df1695737d90fd0c7#177881457f392fca990dbb3df1695737d90fd0c7" dependencies = [ "anyhow", "async-trait", @@ -7931,8 +8073,9 @@ dependencies = [ [[package]] name = "zksync_consensus_utils" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=3e6f101ee4124308c4c974caaa259d524549b0c6#3e6f101ee4124308c4c974caaa259d524549b0c6" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=177881457f392fca990dbb3df1695737d90fd0c7#177881457f392fca990dbb3df1695737d90fd0c7" dependencies = [ + "anyhow", "rand 0.8.5", "thiserror", "zksync_concurrency", @@ -8457,7 +8600,7 @@ dependencies = [ "futures 0.3.28", "governor", "hex", - "http", + "http 0.2.9", "itertools 0.10.5", "lru", "once_cell", @@ -8710,7 +8853,7 @@ dependencies = [ "flate2", "google-cloud-auth", "google-cloud-storage", - "http", + "http 0.2.9", "prost 0.12.1", "rand 0.8.5", "reqwest", @@ -8731,7 +8874,7 @@ dependencies = [ "anyhow", "axum", "chrono", - "hyper", + "hyper 0.14.29", "serde_json", "tokio", "tower", @@ -8750,7 +8893,7 @@ dependencies = [ [[package]] name = "zksync_protobuf" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=3e6f101ee4124308c4c974caaa259d524549b0c6#3e6f101ee4124308c4c974caaa259d524549b0c6" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=177881457f392fca990dbb3df1695737d90fd0c7#177881457f392fca990dbb3df1695737d90fd0c7" dependencies = [ "anyhow", "bit-vec", @@ -8770,7 +8913,7 @@ dependencies = [ [[package]] name = "zksync_protobuf_build" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=3e6f101ee4124308c4c974caaa259d524549b0c6#3e6f101ee4124308c4c974caaa259d524549b0c6" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=177881457f392fca990dbb3df1695737d90fd0c7#177881457f392fca990dbb3df1695737d90fd0c7" dependencies = [ "anyhow", "heck 0.5.0", diff --git a/Cargo.toml b/Cargo.toml index 2f39c48cacbf..b8879d616d73 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -188,16 +188,16 @@ zk_evm_1_3_3 = { package = "zk_evm", git = "https://github.com/matter-labs/era-z zk_evm_1_4_0 = { package = "zk_evm", git = "https://github.com/matter-labs/era-zk_evm.git", branch = "v1.4.0" } zk_evm_1_4_1 = { package = "zk_evm", git = "https://github.com/matter-labs/era-zk_evm.git", branch = "v1.4.1" } zk_evm_1_5_0 = { package = "zk_evm", git = "https://github.com/matter-labs/era-zk_evm.git", branch = "v1.5.0" } -zksync_concurrency = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "3e6f101ee4124308c4c974caaa259d524549b0c6" } -zksync_consensus_bft = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "3e6f101ee4124308c4c974caaa259d524549b0c6" } -zksync_consensus_crypto = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "3e6f101ee4124308c4c974caaa259d524549b0c6" } -zksync_consensus_executor = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "3e6f101ee4124308c4c974caaa259d524549b0c6" } -zksync_consensus_network = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "3e6f101ee4124308c4c974caaa259d524549b0c6" } -zksync_consensus_roles = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "3e6f101ee4124308c4c974caaa259d524549b0c6" } -zksync_consensus_storage = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "3e6f101ee4124308c4c974caaa259d524549b0c6" } -zksync_consensus_utils = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "3e6f101ee4124308c4c974caaa259d524549b0c6" } -zksync_protobuf = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "3e6f101ee4124308c4c974caaa259d524549b0c6" } -zksync_protobuf_build = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "3e6f101ee4124308c4c974caaa259d524549b0c6" } +zksync_concurrency = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "177881457f392fca990dbb3df1695737d90fd0c7" } +zksync_consensus_bft = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "177881457f392fca990dbb3df1695737d90fd0c7" } +zksync_consensus_crypto = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "177881457f392fca990dbb3df1695737d90fd0c7" } +zksync_consensus_executor = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "177881457f392fca990dbb3df1695737d90fd0c7" } +zksync_consensus_network = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "177881457f392fca990dbb3df1695737d90fd0c7" } +zksync_consensus_roles = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "177881457f392fca990dbb3df1695737d90fd0c7" } +zksync_consensus_storage = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "177881457f392fca990dbb3df1695737d90fd0c7" } +zksync_consensus_utils = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "177881457f392fca990dbb3df1695737d90fd0c7" } +zksync_protobuf = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "177881457f392fca990dbb3df1695737d90fd0c7" } +zksync_protobuf_build = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "177881457f392fca990dbb3df1695737d90fd0c7" } # "Local" dependencies zksync_multivm = { path = "core/lib/multivm" } diff --git a/core/lib/dal/src/consensus_dal.rs b/core/lib/dal/src/consensus_dal.rs index ab9834cb8641..3656b7862d01 100644 --- a/core/lib/dal/src/consensus_dal.rs +++ b/core/lib/dal/src/consensus_dal.rs @@ -20,8 +20,6 @@ pub struct ConsensusDal<'a, 'c> { /// Error returned by `ConsensusDal::insert_certificate()`. #[derive(thiserror::Error, Debug)] pub enum InsertCertificateError { - #[error("unexpected certificate, want certificate for block {want}")] - UnexpectedCert { want: validator::BlockNumber }, #[error("corresponding L2 block is missing")] MissingPayload, #[error("certificate doesn't match the payload")] @@ -148,7 +146,8 @@ impl ConsensusDal<'_, '_> { .context("next_block()")?, protocol_version: old.protocol_version, - committee: old.committee.clone(), + validators: old.validators.clone(), + attesters: old.attesters.clone(), leader_selection: old.leader_selection.clone(), } .with_hash(); @@ -190,6 +189,7 @@ impl ConsensusDal<'_, '_> { state_json ) .instrument("set_replica_state") + .report_latency() .with_arg("state.view", &state.view) .execute(self.storage) .await?; @@ -198,23 +198,19 @@ impl ConsensusDal<'_, '_> { /// First block that should be in storage. async fn first_block(&mut self) -> anyhow::Result { - let mut start = validator::BlockNumber(0); - // If we recovered from a snapshot then it cannot be older than the first block after - // snapshot. - if let Some(snapshot) = self + let info = self .storage - .snapshot_recovery_dal() - .get_applied_snapshot_status() - .await? - { - start = start.max(validator::BlockNumber(snapshot.l2_block_number.0.into()) + 1); - } - // If the node was pruned, it cannot be older than the first block after soft pruned block. - let pruning_info = self.storage.pruning_dal().get_pruning_info().await?; - if let Some(last_pruned) = pruning_info.last_soft_pruned_l2_block { - start = start.max(validator::BlockNumber(last_pruned.0.into()) + 1); - } - Ok(start) + .pruning_dal() + .get_pruning_info() + .await + .context("get_pruning_info()")?; + Ok(match info.last_soft_pruned_l2_block { + // It is guaranteed that pruning info values are set for storage recovered from + // snapshot, even if pruning was not enabled. + Some(last_pruned) => validator::BlockNumber(last_pruned.0.into()) + 1, + // No snapshot and no pruning: + None => validator::BlockNumber(0), + }) } /// Next block that should be inserted to storage. @@ -271,6 +267,7 @@ impl ConsensusDal<'_, '_> { i64::try_from(start.0)?, ) .instrument("last_certificate") + .report_latency() .fetch_optional(&mut txn) .await?; txn.commit().await.context("commit()")?; @@ -300,6 +297,7 @@ impl ConsensusDal<'_, '_> { i64::try_from(block_number.0)? ) .instrument("certificate") + .report_latency() .fetch_optional(self.storage) .await? else { @@ -358,15 +356,8 @@ impl ConsensusDal<'_, '_> { .next()) } - /// Inserts a certificate for the L2 block `cert.header().number`. It verifies that - /// - /// - the certified payload matches the L2 block in storage - /// - the parent block already has a certificate. - /// - /// NOTE: This is an extra secure way of storing a certificate, - /// which will help us to detect bugs in the consensus implementation - /// while it is "fresh". If it turns out to take too long, - /// we can remove the verification checks later. + /// Inserts a certificate for the L2 block `cert.header().number`. + /// Fails if certificate doesn't match the stored block. pub async fn insert_certificate( &mut self, cert: &validator::CommitQC, @@ -374,14 +365,6 @@ impl ConsensusDal<'_, '_> { use InsertCertificateError as E; let header = &cert.message.proposal; let mut txn = self.storage.start_transaction().await?; - let range = txn.consensus_dal().certificates_range().await?; - // If we are not interested in this cert anymore, just return OK. - if range.next() < header.number { - return Ok(()); - } - if range.next() != header.number { - return Err(E::UnexpectedCert { want: range.next() }); - } let want_payload = txn .consensus_dal() .block_payload(cert.message.proposal.number) @@ -401,6 +384,7 @@ impl ConsensusDal<'_, '_> { zksync_protobuf::serde::serialize(cert, serde_json::value::Serializer).unwrap(), ) .instrument("insert_certificate") + .report_latency() .execute(&mut txn) .await?; txn.commit().await.context("commit")?; diff --git a/core/node/consensus/src/config.rs b/core/node/consensus/src/config.rs index b0dfd3fbfef6..af64026c6e28 100644 --- a/core/node/consensus/src/config.rs +++ b/core/node/consensus/src/config.rs @@ -41,7 +41,7 @@ impl GenesisSpec { Self { chain_id: g.chain_id, protocol_version: g.protocol_version, - validators: g.committee.clone(), + validators: g.validators.clone(), leader_selection: g.leader_selection.clone(), } } @@ -107,5 +107,6 @@ pub(super) fn executor( .collect::>() .context("gossip_static_inbound")?, gossip_static_outbound, + debug_page: None, }) } diff --git a/core/node/consensus/src/en.rs b/core/node/consensus/src/en.rs index 685bc982bd07..3a3263d41b70 100644 --- a/core/node/consensus/src/en.rs +++ b/core/node/consensus/src/en.rs @@ -2,7 +2,7 @@ use anyhow::Context as _; use zksync_concurrency::{ctx, error::Wrap as _, scope, time}; use zksync_consensus_executor as executor; use zksync_consensus_roles::validator; -use zksync_consensus_storage::BlockStore; +use zksync_consensus_storage::{BatchStore, BlockStore}; use zksync_node_sync::{ fetcher::FetchedBlock, sync_action::ActionQueueSender, MainNodeClient, SyncState, }; @@ -77,9 +77,17 @@ impl EN { .await .wrap("BlockStore::new()")?; s.spawn_bg(async { Ok(runner.run(ctx).await?) }); + // Dummy batch store - we don't gossip batches yet, but we need one anyway. + let (batch_store, runner) = BatchStore::new(ctx, Box::new(store.clone())) + .await + .wrap("BatchStore::new()")?; + s.spawn_bg(async { Ok(runner.run(ctx).await?) }); + let executor = executor::Executor { config: config::executor(&cfg, &secrets)?, block_store, + batch_store, + attester: None, validator: config::validator_key(&secrets) .context("validator_key")? .map(|key| executor::Validator { diff --git a/core/node/consensus/src/lib.rs b/core/node/consensus/src/lib.rs index bc9776c42df5..a2c6847bd97e 100644 --- a/core/node/consensus/src/lib.rs +++ b/core/node/consensus/src/lib.rs @@ -7,7 +7,7 @@ use zksync_concurrency::{ctx, error::Wrap as _, scope}; use zksync_config::configs::consensus::{ConsensusConfig, ConsensusSecrets}; use zksync_consensus_executor as executor; use zksync_consensus_roles::validator; -use zksync_consensus_storage::BlockStore; +use zksync_consensus_storage::{BatchStore, BlockStore}; use crate::storage::{ConnectionPool, Store}; @@ -58,9 +58,17 @@ async fn run_main_node( "unsupported leader selection mode - main node has to be the leader" ); + // Dummy batch store - we don't gossip batches yet, but we need one anyway. + let (batch_store, runner) = BatchStore::new(ctx, Box::new(store.clone())) + .await + .wrap("BatchStore::new()")?; + s.spawn_bg(runner.run(ctx)); + let executor = executor::Executor { config: config::executor(&cfg, &secrets)?, block_store, + batch_store, + attester: None, validator: Some(executor::Validator { key: validator_key, replica_store: Box::new(store.clone()), diff --git a/core/node/consensus/src/storage/mod.rs b/core/node/consensus/src/storage/mod.rs index 3c9e808e527f..0f67ecb60bb6 100644 --- a/core/node/consensus/src/storage/mod.rs +++ b/core/node/consensus/src/storage/mod.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use anyhow::Context as _; use zksync_concurrency::{ctx, error::Wrap as _, scope, sync, time}; use zksync_consensus_bft::PayloadManager; -use zksync_consensus_roles::validator; +use zksync_consensus_roles::{attester, validator}; use zksync_consensus_storage as storage; use zksync_dal::{ consensus_dal::{self, Payload}, @@ -242,7 +242,8 @@ impl<'a> Connection<'a> { first_block: txn.next_block(ctx).await.context("next_block()")?, protocol_version: spec.protocol_version, - committee: spec.validators.clone(), + validators: spec.validators.clone(), + attesters: None, leader_selection: spec.leader_selection.clone(), } .with_hash(); @@ -400,10 +401,13 @@ impl StoreRunner { .await .wrap("certificates_range()")?; self.persisted.send_if_modified(|p| { - if p.first == range.first { + if &range == p { return false; } - p.first = range.first; + p.first = p.first.max(range.first); + if p.next() < range.next() { + p.last = range.last; + } true }); ctx.sleep(POLL_INTERVAL).await?; @@ -599,3 +603,37 @@ impl PayloadManager for Store { Ok(()) } } + +// Dummy implementation +#[async_trait::async_trait] +impl storage::PersistentBatchStore for Store { + fn last_batch(&self) -> attester::BatchNumber { + unimplemented!() + } + fn last_batch_qc(&self) -> attester::BatchQC { + unimplemented!() + } + fn get_batch(&self, _number: attester::BatchNumber) -> Option { + None + } + fn get_batch_qc(&self, _number: attester::BatchNumber) -> Option { + None + } + fn store_qc(&self, _qc: attester::BatchQC) { + unimplemented!() + } + fn persisted(&self) -> sync::watch::Receiver { + sync::watch::channel(storage::BatchStoreState { + first: attester::BatchNumber(0), + last: None, + }) + .1 + } + async fn queue_next_batch( + &self, + _ctx: &ctx::Ctx, + _batch: attester::SyncBatch, + ) -> ctx::Result<()> { + Err(anyhow::format_err!("unimplemented").into()) + } +} diff --git a/core/node/consensus/src/storage/testonly.rs b/core/node/consensus/src/storage/testonly.rs index 667bf36800d2..ee32a097cfc4 100644 --- a/core/node/consensus/src/storage/testonly.rs +++ b/core/node/consensus/src/storage/testonly.rs @@ -4,6 +4,7 @@ use anyhow::Context as _; use zksync_concurrency::{ctx, error::Wrap as _, time}; use zksync_consensus_roles::validator; use zksync_contracts::BaseSystemContracts; +use zksync_dal::CoreDal as _; use zksync_node_genesis::{insert_genesis_batch, mock_genesis_config, GenesisParams}; use zksync_node_test_utils::{recover, snapshot, Snapshot}; use zksync_types::{ @@ -159,4 +160,32 @@ impl ConnectionPool { } Ok(blocks) } + + pub async fn prune_batches( + &self, + ctx: &ctx::Ctx, + last_batch: L1BatchNumber, + ) -> ctx::Result<()> { + let mut conn = self.connection(ctx).await.context("connection()")?; + let (_, last_block) = ctx + .wait( + conn.0 + .blocks_dal() + .get_l2_block_range_of_l1_batch(last_batch), + ) + .await? + .context("get_l2_block_range_of_l1_batch()")? + .context("batch not found")?; + conn.0 + .pruning_dal() + .soft_prune_batches_range(last_batch, last_block) + .await + .context("soft_prune_batches_range()")?; + conn.0 + .pruning_dal() + .hard_prune_batches_range(last_batch, last_block) + .await + .context("hard_prune_batches_range()")?; + Ok(()) + } } diff --git a/core/node/consensus/src/testonly.rs b/core/node/consensus/src/testonly.rs index d20c379a5d66..148d079ec3c6 100644 --- a/core/node/consensus/src/testonly.rs +++ b/core/node/consensus/src/testonly.rs @@ -262,12 +262,14 @@ impl StateKeeper { } /// Pushes `SealBatch` command to the `StateKeeper`. - pub async fn seal_batch(&mut self) { + /// Returns the number of the sealed batch. + pub async fn seal_batch(&mut self) -> L1BatchNumber { // Each batch ends with an empty block (aka fictive block). let mut actions = vec![self.open_block()]; actions.push(SyncAction::SealBatch); self.actions_sender.push_actions(actions).await; self.batch_sealed = true; + self.last_batch } /// Pushes `count` random L2 blocks to the StateKeeper. diff --git a/core/node/consensus/src/tests.rs b/core/node/consensus/src/tests.rs index b16c66e478bb..c62a6a2a0e62 100644 --- a/core/node/consensus/src/tests.rs +++ b/core/node/consensus/src/tests.rs @@ -20,7 +20,7 @@ const VERSIONS: [ProtocolVersionId; 2] = [ProtocolVersionId::latest(), ProtocolV const FROM_SNAPSHOT: [bool; 2] = [true, false]; #[test_casing(2, VERSIONS)] -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn test_validator_block_store(version: ProtocolVersionId) { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); @@ -86,7 +86,7 @@ async fn test_validator_block_store(version: ProtocolVersionId) { // for the L2 blocks constructed by the StateKeeper. This means that consensus actor // is effectively just back filling the consensus certificates for the L2 blocks in storage. #[test_casing(4, Product((FROM_SNAPSHOT,VERSIONS)))] -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn test_validator(from_snapshot: bool, version: ProtocolVersionId) { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::AffineClock::new(10.)); @@ -150,7 +150,7 @@ async fn test_validator(from_snapshot: bool, version: ProtocolVersionId) { // Test running a validator node and 2 full nodes recovered from different snapshots. #[test_casing(2, VERSIONS)] -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn test_nodes_from_various_snapshots(version: ProtocolVersionId) { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::AffineClock::new(10.)); @@ -229,7 +229,7 @@ async fn test_nodes_from_various_snapshots(version: ProtocolVersionId) { // Validator is producing signed blocks and fetchers are expected to fetch // them directly or indirectly. #[test_casing(4, Product((FROM_SNAPSHOT,VERSIONS)))] -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn test_full_nodes(from_snapshot: bool, version: ProtocolVersionId) { const NODES: usize = 2; @@ -313,7 +313,7 @@ async fn test_full_nodes(from_snapshot: bool, version: ProtocolVersionId) { // Test running external node (non-leader) validators. #[test_casing(4, Product((FROM_SNAPSHOT,VERSIONS)))] -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn test_en_validators(from_snapshot: bool, version: ProtocolVersionId) { const NODES: usize = 3; @@ -351,7 +351,7 @@ async fn test_en_validators(from_snapshot: bool, version: ProtocolVersionId) { tracing::info!("Run main node with all nodes being validators."); let (mut cfg, secrets) = testonly::config(&cfgs[0]); cfg.genesis_spec.as_mut().unwrap().validators = setup - .keys + .validator_keys .iter() .map(|k| WeightedValidator { key: ValidatorPublicKey(k.public().encode()), @@ -399,7 +399,7 @@ async fn test_en_validators(from_snapshot: bool, version: ProtocolVersionId) { // Test fetcher back filling missing certs. #[test_casing(4, Product((FROM_SNAPSHOT,VERSIONS)))] -#[tokio::test(flavor = "multi_thread")] +#[tokio::test] async fn test_p2p_fetcher_backfill_certs(from_snapshot: bool, version: ProtocolVersionId) { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::AffineClock::new(10.)); @@ -473,6 +473,65 @@ async fn test_p2p_fetcher_backfill_certs(from_snapshot: bool, version: ProtocolV .unwrap(); } +#[test_casing(2, VERSIONS)] +#[tokio::test] +async fn test_with_pruning(version: ProtocolVersionId) { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + let rng = &mut ctx.rng(); + let mut setup = Setup::new(rng, 1); + let validator_cfg = new_configs(rng, &setup, 0)[0].clone(); + let node_cfg = new_fullnode(rng, &validator_cfg); + + scope::run!(ctx, |ctx, s| async { + let validator_pool = ConnectionPool::test(false, version).await; + let (mut validator, runner) = + testonly::StateKeeper::new(ctx, validator_pool.clone()).await?; + s.spawn_bg(async { + runner + .run(ctx) + .instrument(tracing::info_span!("validator")) + .await + .context("validator") + }); + tracing::info!("Run validator."); + let (cfg, secrets) = testonly::config(&validator_cfg); + s.spawn_bg(run_main_node(ctx, cfg, secrets, validator_pool.clone())); + // TODO: ensure at least L1 batch in `testonly::StateKeeper::new()` to make it fool proof. + validator.seal_batch().await; + + tracing::info!("Run node."); + let mut node_pool = ConnectionPool::test(false, version).await; + let (node, runner) = testonly::StateKeeper::new(ctx, node_pool.clone()).await?; + s.spawn_bg(async { + runner + .run(ctx) + .instrument(tracing::info_span!("node")) + .await + .with_context(|| format!("node")) + }); + s.spawn_bg(node.run_consensus(ctx, validator.connect(ctx).await?, &node_cfg)); + + tracing::info!("Sync some blocks"); + validator.push_random_blocks(rng, 5).await; + let to_prune = validator.seal_batch().await; + validator.push_random_blocks(rng, 5).await; + node_pool + .wait_for_certificates(ctx, validator.last_block()) + .await?; + + tracing::info!("Prune some blocks and sync more"); + validator_pool.prune_batches(ctx, to_prune).await?; + validator.push_random_blocks(rng, 5); + node_pool + .wait_for_certificates(ctx, validator.last_block()) + .await?; + Ok(()) + }) + .await + .unwrap(); +} + #[test_casing(4, Product((FROM_SNAPSHOT,VERSIONS)))] #[tokio::test] async fn test_centralized_fetcher(from_snapshot: bool, version: ProtocolVersionId) { diff --git a/prover/Cargo.lock b/prover/Cargo.lock index 7de9254ed2ee..875f75fb5499 100644 --- a/prover/Cargo.lock +++ b/prover/Cargo.lock @@ -7712,7 +7712,7 @@ dependencies = [ [[package]] name = "zksync_concurrency" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=3e6f101ee4124308c4c974caaa259d524549b0c6#3e6f101ee4124308c4c974caaa259d524549b0c6" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=177881457f392fca990dbb3df1695737d90fd0c7#177881457f392fca990dbb3df1695737d90fd0c7" dependencies = [ "anyhow", "once_cell", @@ -7743,13 +7743,15 @@ dependencies = [ [[package]] name = "zksync_consensus_crypto" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=3e6f101ee4124308c4c974caaa259d524549b0c6#3e6f101ee4124308c4c974caaa259d524549b0c6" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=177881457f392fca990dbb3df1695737d90fd0c7#177881457f392fca990dbb3df1695737d90fd0c7" dependencies = [ "anyhow", "blst", "ed25519-dalek", + "elliptic-curve 0.13.8", "ff_ce", "hex", + "k256 0.13.3", "num-bigint 0.4.5", "num-traits", "pairing_ce 0.28.5 (git+https://github.com/matter-labs/pairing.git?rev=d24f2c5871089c4cd4f54c0ca266bb9fef6115eb)", @@ -7764,7 +7766,7 @@ dependencies = [ [[package]] name = "zksync_consensus_roles" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=3e6f101ee4124308c4c974caaa259d524549b0c6#3e6f101ee4124308c4c974caaa259d524549b0c6" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=177881457f392fca990dbb3df1695737d90fd0c7#177881457f392fca990dbb3df1695737d90fd0c7" dependencies = [ "anyhow", "bit-vec", @@ -7785,7 +7787,7 @@ dependencies = [ [[package]] name = "zksync_consensus_storage" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=3e6f101ee4124308c4c974caaa259d524549b0c6#3e6f101ee4124308c4c974caaa259d524549b0c6" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=177881457f392fca990dbb3df1695737d90fd0c7#177881457f392fca990dbb3df1695737d90fd0c7" dependencies = [ "anyhow", "async-trait", @@ -7803,8 +7805,9 @@ dependencies = [ [[package]] name = "zksync_consensus_utils" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=3e6f101ee4124308c4c974caaa259d524549b0c6#3e6f101ee4124308c4c974caaa259d524549b0c6" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=177881457f392fca990dbb3df1695737d90fd0c7#177881457f392fca990dbb3df1695737d90fd0c7" dependencies = [ + "anyhow", "rand 0.8.5", "thiserror", "zksync_concurrency", @@ -8102,7 +8105,7 @@ dependencies = [ [[package]] name = "zksync_protobuf" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=3e6f101ee4124308c4c974caaa259d524549b0c6#3e6f101ee4124308c4c974caaa259d524549b0c6" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=177881457f392fca990dbb3df1695737d90fd0c7#177881457f392fca990dbb3df1695737d90fd0c7" dependencies = [ "anyhow", "bit-vec", @@ -8122,7 +8125,7 @@ dependencies = [ [[package]] name = "zksync_protobuf_build" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=3e6f101ee4124308c4c974caaa259d524549b0c6#3e6f101ee4124308c4c974caaa259d524549b0c6" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=177881457f392fca990dbb3df1695737d90fd0c7#177881457f392fca990dbb3df1695737d90fd0c7" dependencies = [ "anyhow", "heck 0.5.0", From d7fe1897c4891b32060203eacfe4e0e8b92035eb Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Thu, 27 Jun 2024 17:04:21 +0200 Subject: [PATCH 4/8] lint --- core/node/consensus/src/tests.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/node/consensus/src/tests.rs b/core/node/consensus/src/tests.rs index b00d9ad5c6c2..085f42ee1afa 100644 --- a/core/node/consensus/src/tests.rs +++ b/core/node/consensus/src/tests.rs @@ -476,7 +476,7 @@ async fn test_with_pruning(version: ProtocolVersionId) { zksync_concurrency::testonly::abort_on_panic(); let ctx = &ctx::test_root(&ctx::RealClock); let rng = &mut ctx.rng(); - let mut setup = Setup::new(rng, 1); + let setup = Setup::new(rng, 1); let validator_cfg = new_configs(rng, &setup, 0)[0].clone(); let node_cfg = new_fullnode(rng, &validator_cfg); @@ -498,7 +498,7 @@ async fn test_with_pruning(version: ProtocolVersionId) { validator.seal_batch().await; tracing::info!("Run node."); - let mut node_pool = ConnectionPool::test(false, version).await; + let node_pool = ConnectionPool::test(false, version).await; let (node, runner) = testonly::StateKeeper::new(ctx, node_pool.clone()).await?; s.spawn_bg(async { runner @@ -519,7 +519,7 @@ async fn test_with_pruning(version: ProtocolVersionId) { tracing::info!("Prune some blocks and sync more"); validator_pool.prune_batches(ctx, to_prune).await?; - validator.push_random_blocks(rng, 5); + validator.push_random_blocks(rng, 5).await; node_pool .wait_for_certificates(ctx, validator.last_block()) .await?; From 1dd658b72ed3dd2f08bb0409ae68e95a5a05c701 Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Thu, 27 Jun 2024 17:18:31 +0200 Subject: [PATCH 5/8] lint --- core/node/consensus/src/tests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/node/consensus/src/tests.rs b/core/node/consensus/src/tests.rs index 085f42ee1afa..269d00d6c044 100644 --- a/core/node/consensus/src/tests.rs +++ b/core/node/consensus/src/tests.rs @@ -505,7 +505,7 @@ async fn test_with_pruning(version: ProtocolVersionId) { .run(ctx) .instrument(tracing::info_span!("node")) .await - .with_context(|| format!("node")) + .context("node") }); s.spawn_bg(node.run_consensus(ctx, validator.connect(ctx).await?, &node_cfg)); From 87147f1e2aed320a03c4da547810a148c08e6403 Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Thu, 27 Jun 2024 22:32:47 +0200 Subject: [PATCH 6/8] test passes --- core/node/consensus/src/lib.rs | 8 +++--- core/node/consensus/src/storage/mod.rs | 8 ++++-- core/node/consensus/src/testonly.rs | 4 +-- core/node/consensus/src/tests.rs | 40 +++++++++++++++++++++----- 4 files changed, 44 insertions(+), 16 deletions(-) diff --git a/core/node/consensus/src/lib.rs b/core/node/consensus/src/lib.rs index a2c6847bd97e..82604d6f817c 100644 --- a/core/node/consensus/src/lib.rs +++ b/core/node/consensus/src/lib.rs @@ -47,11 +47,11 @@ async fn run_main_node( .wrap("adjust_genesis()")?; } let (store, runner) = Store::new(ctx, pool, None).await.wrap("Store::new()")?; - s.spawn_bg(runner.run(ctx)); + s.spawn_bg(async { runner.run(ctx).await.context("Store::runner()") }); let (block_store, runner) = BlockStore::new(ctx, Box::new(store.clone())) .await .wrap("BlockStore::new()")?; - s.spawn_bg(runner.run(ctx)); + s.spawn_bg(async { runner.run(ctx).await.context("BlockStore::runner()") }); anyhow::ensure!( block_store.genesis().leader_selection == validator::LeaderSelectionMode::Sticky(validator_key.public()), @@ -62,7 +62,7 @@ async fn run_main_node( let (batch_store, runner) = BatchStore::new(ctx, Box::new(store.clone())) .await .wrap("BatchStore::new()")?; - s.spawn_bg(runner.run(ctx)); + s.spawn_bg(async { runner.run(ctx).await.context("BatchStore::runner()") }); let executor = executor::Executor { config: config::executor(&cfg, &secrets)?, @@ -75,7 +75,7 @@ async fn run_main_node( payload_manager: Box::new(store.clone()), }), }; - executor.run(ctx).await + executor.run(ctx).await.context("executor.run()") }) .await } diff --git a/core/node/consensus/src/storage/mod.rs b/core/node/consensus/src/storage/mod.rs index 0f67ecb60bb6..e0652e336917 100644 --- a/core/node/consensus/src/storage/mod.rs +++ b/core/node/consensus/src/storage/mod.rs @@ -59,7 +59,7 @@ impl ConnectionPool { .wrap("connection()")? .payload(ctx, number) .await - .wrap("payload()")? + .with_wrap(|| format!("payload({number})"))? { return Ok(payload); } @@ -553,7 +553,11 @@ impl PayloadManager for Store { block_number: validator::BlockNumber, ) -> ctx::Result { const LARGE_PAYLOAD_SIZE: usize = 1 << 20; - let payload = self.pool.wait_for_payload(ctx, block_number).await?; + let payload = self + .pool + .wait_for_payload(ctx, block_number) + .await + .wrap("wait_for_payload")?; let encoded_payload = payload.encode(); if encoded_payload.0.len() > LARGE_PAYLOAD_SIZE { tracing::warn!( diff --git a/core/node/consensus/src/testonly.rs b/core/node/consensus/src/testonly.rs index 148d079ec3c6..d20c379a5d66 100644 --- a/core/node/consensus/src/testonly.rs +++ b/core/node/consensus/src/testonly.rs @@ -262,14 +262,12 @@ impl StateKeeper { } /// Pushes `SealBatch` command to the `StateKeeper`. - /// Returns the number of the sealed batch. - pub async fn seal_batch(&mut self) -> L1BatchNumber { + pub async fn seal_batch(&mut self) { // Each batch ends with an empty block (aka fictive block). let mut actions = vec![self.open_block()]; actions.push(SyncAction::SealBatch); self.actions_sender.push_actions(actions).await; self.batch_sealed = true; - self.last_batch } /// Pushes `count` random L2 blocks to the StateKeeper. diff --git a/core/node/consensus/src/tests.rs b/core/node/consensus/src/tests.rs index 269d00d6c044..acff2365585f 100644 --- a/core/node/consensus/src/tests.rs +++ b/core/node/consensus/src/tests.rs @@ -493,7 +493,14 @@ async fn test_with_pruning(version: ProtocolVersionId) { }); tracing::info!("Run validator."); let (cfg, secrets) = testonly::config(&validator_cfg); - s.spawn_bg(run_main_node(ctx, cfg, secrets, validator_pool.clone())); + s.spawn_bg({ + let validator_pool = validator_pool.clone(); + async { + run_main_node(ctx, cfg, secrets, validator_pool) + .await + .context("run_main_node()") + } + }); // TODO: ensure at least L1 batch in `testonly::StateKeeper::new()` to make it fool proof. validator.seal_batch().await; @@ -507,22 +514,41 @@ async fn test_with_pruning(version: ProtocolVersionId) { .await .context("node") }); - s.spawn_bg(node.run_consensus(ctx, validator.connect(ctx).await?, &node_cfg)); + let conn = validator.connect(ctx).await?; + s.spawn_bg(async { + node.run_consensus(ctx, conn, &node_cfg) + .await + .context("run_consensus()") + }); tracing::info!("Sync some blocks"); validator.push_random_blocks(rng, 5).await; - let to_prune = validator.seal_batch().await; + validator.seal_batch().await; + let to_prune = validator.last_sealed_batch(); + tracing::info!( + "to_prune = batch {}; block {}", + to_prune, + validator.last_block() + ); + tracing::info!( + "Seal another batch to make sure that there is at least 1 sealed batch after pruning." + ); validator.push_random_blocks(rng, 5).await; - node_pool - .wait_for_certificates(ctx, validator.last_block()) + validator.seal_batch().await; + validator_pool + .wait_for_batch(ctx, validator.last_sealed_batch()) .await?; tracing::info!("Prune some blocks and sync more"); - validator_pool.prune_batches(ctx, to_prune).await?; + validator_pool + .prune_batches(ctx, to_prune) + .await + .context("prune_batches")?; validator.push_random_blocks(rng, 5).await; node_pool .wait_for_certificates(ctx, validator.last_block()) - .await?; + .await + .context("wait_for_certificates()")?; Ok(()) }) .await From 2bf2f3dfa11169c5020642234d2db2b4fb6d66c1 Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Fri, 28 Jun 2024 12:45:24 +0200 Subject: [PATCH 7/8] made rate limit for get_block settable in the config --- Cargo.lock | 21 +++-- Cargo.toml | 20 ++-- core/lib/config/Cargo.toml | 1 + core/lib/config/src/configs/consensus.rs | 26 ++++++ core/lib/config/src/testonly.rs | 9 ++ core/lib/dal/src/consensus_dal.rs | 23 +++-- core/lib/protobuf_config/build.rs | 2 +- core/lib/protobuf_config/src/consensus.rs | 22 ++++- .../src/proto/core/consensus.proto | 11 +++ core/lib/protobuf_config/src/tests.rs | 1 + core/node/consensus/src/config.rs | 5 + core/node/consensus/src/storage/mod.rs | 91 ++++++++++++------- core/node/consensus/src/testonly.rs | 1 + prover/Cargo.lock | 15 +-- 14 files changed, 177 insertions(+), 71 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 15b221b0742d..84a71a5bf764 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7908,7 +7908,7 @@ dependencies = [ [[package]] name = "zksync_concurrency" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=177881457f392fca990dbb3df1695737d90fd0c7#177881457f392fca990dbb3df1695737d90fd0c7" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=ba7b171456e7362eada685234a91c20907b6a097#ba7b171456e7362eada685234a91c20907b6a097" dependencies = [ "anyhow", "once_cell", @@ -7932,6 +7932,7 @@ dependencies = [ "secrecy", "serde", "zksync_basic_types", + "zksync_concurrency", "zksync_consensus_utils", "zksync_crypto_primitives", ] @@ -7939,7 +7940,7 @@ dependencies = [ [[package]] name = "zksync_consensus_bft" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=177881457f392fca990dbb3df1695737d90fd0c7#177881457f392fca990dbb3df1695737d90fd0c7" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=ba7b171456e7362eada685234a91c20907b6a097#ba7b171456e7362eada685234a91c20907b6a097" dependencies = [ "anyhow", "async-trait", @@ -7960,7 +7961,7 @@ dependencies = [ [[package]] name = "zksync_consensus_crypto" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=177881457f392fca990dbb3df1695737d90fd0c7#177881457f392fca990dbb3df1695737d90fd0c7" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=ba7b171456e7362eada685234a91c20907b6a097#ba7b171456e7362eada685234a91c20907b6a097" dependencies = [ "anyhow", "blst", @@ -7983,7 +7984,7 @@ dependencies = [ [[package]] name = "zksync_consensus_executor" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=177881457f392fca990dbb3df1695737d90fd0c7#177881457f392fca990dbb3df1695737d90fd0c7" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=ba7b171456e7362eada685234a91c20907b6a097#ba7b171456e7362eada685234a91c20907b6a097" dependencies = [ "anyhow", "rand 0.8.5", @@ -8002,7 +8003,7 @@ dependencies = [ [[package]] name = "zksync_consensus_network" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=177881457f392fca990dbb3df1695737d90fd0c7#177881457f392fca990dbb3df1695737d90fd0c7" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=ba7b171456e7362eada685234a91c20907b6a097#ba7b171456e7362eada685234a91c20907b6a097" dependencies = [ "anyhow", "async-trait", @@ -8036,7 +8037,7 @@ dependencies = [ [[package]] name = "zksync_consensus_roles" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=177881457f392fca990dbb3df1695737d90fd0c7#177881457f392fca990dbb3df1695737d90fd0c7" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=ba7b171456e7362eada685234a91c20907b6a097#ba7b171456e7362eada685234a91c20907b6a097" dependencies = [ "anyhow", "bit-vec", @@ -8057,7 +8058,7 @@ dependencies = [ [[package]] name = "zksync_consensus_storage" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=177881457f392fca990dbb3df1695737d90fd0c7#177881457f392fca990dbb3df1695737d90fd0c7" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=ba7b171456e7362eada685234a91c20907b6a097#ba7b171456e7362eada685234a91c20907b6a097" dependencies = [ "anyhow", "async-trait", @@ -8075,7 +8076,7 @@ dependencies = [ [[package]] name = "zksync_consensus_utils" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=177881457f392fca990dbb3df1695737d90fd0c7#177881457f392fca990dbb3df1695737d90fd0c7" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=ba7b171456e7362eada685234a91c20907b6a097#ba7b171456e7362eada685234a91c20907b6a097" dependencies = [ "anyhow", "rand 0.8.5", @@ -8895,7 +8896,7 @@ dependencies = [ [[package]] name = "zksync_protobuf" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=177881457f392fca990dbb3df1695737d90fd0c7#177881457f392fca990dbb3df1695737d90fd0c7" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=ba7b171456e7362eada685234a91c20907b6a097#ba7b171456e7362eada685234a91c20907b6a097" dependencies = [ "anyhow", "bit-vec", @@ -8915,7 +8916,7 @@ dependencies = [ [[package]] name = "zksync_protobuf_build" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=177881457f392fca990dbb3df1695737d90fd0c7#177881457f392fca990dbb3df1695737d90fd0c7" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=ba7b171456e7362eada685234a91c20907b6a097#ba7b171456e7362eada685234a91c20907b6a097" dependencies = [ "anyhow", "heck 0.5.0", diff --git a/Cargo.toml b/Cargo.toml index b8879d616d73..e49cbcbc8822 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -188,16 +188,16 @@ zk_evm_1_3_3 = { package = "zk_evm", git = "https://github.com/matter-labs/era-z zk_evm_1_4_0 = { package = "zk_evm", git = "https://github.com/matter-labs/era-zk_evm.git", branch = "v1.4.0" } zk_evm_1_4_1 = { package = "zk_evm", git = "https://github.com/matter-labs/era-zk_evm.git", branch = "v1.4.1" } zk_evm_1_5_0 = { package = "zk_evm", git = "https://github.com/matter-labs/era-zk_evm.git", branch = "v1.5.0" } -zksync_concurrency = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "177881457f392fca990dbb3df1695737d90fd0c7" } -zksync_consensus_bft = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "177881457f392fca990dbb3df1695737d90fd0c7" } -zksync_consensus_crypto = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "177881457f392fca990dbb3df1695737d90fd0c7" } -zksync_consensus_executor = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "177881457f392fca990dbb3df1695737d90fd0c7" } -zksync_consensus_network = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "177881457f392fca990dbb3df1695737d90fd0c7" } -zksync_consensus_roles = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "177881457f392fca990dbb3df1695737d90fd0c7" } -zksync_consensus_storage = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "177881457f392fca990dbb3df1695737d90fd0c7" } -zksync_consensus_utils = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "177881457f392fca990dbb3df1695737d90fd0c7" } -zksync_protobuf = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "177881457f392fca990dbb3df1695737d90fd0c7" } -zksync_protobuf_build = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "177881457f392fca990dbb3df1695737d90fd0c7" } +zksync_concurrency = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "ba7b171456e7362eada685234a91c20907b6a097" } +zksync_consensus_bft = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "ba7b171456e7362eada685234a91c20907b6a097" } +zksync_consensus_crypto = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "ba7b171456e7362eada685234a91c20907b6a097" } +zksync_consensus_executor = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "ba7b171456e7362eada685234a91c20907b6a097" } +zksync_consensus_network = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "ba7b171456e7362eada685234a91c20907b6a097" } +zksync_consensus_roles = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "ba7b171456e7362eada685234a91c20907b6a097" } +zksync_consensus_storage = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "ba7b171456e7362eada685234a91c20907b6a097" } +zksync_consensus_utils = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "ba7b171456e7362eada685234a91c20907b6a097" } +zksync_protobuf = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "ba7b171456e7362eada685234a91c20907b6a097" } +zksync_protobuf_build = { version = "0.1.0", git = "https://github.com/matter-labs/era-consensus.git", rev = "ba7b171456e7362eada685234a91c20907b6a097" } # "Local" dependencies zksync_multivm = { path = "core/lib/multivm" } diff --git a/core/lib/config/Cargo.toml b/core/lib/config/Cargo.toml index 144843c2bab2..2e1da7d0f3a2 100644 --- a/core/lib/config/Cargo.toml +++ b/core/lib/config/Cargo.toml @@ -13,6 +13,7 @@ categories.workspace = true zksync_basic_types.workspace = true zksync_crypto_primitives.workspace = true zksync_consensus_utils.workspace = true +zksync_concurrency.workspace = true anyhow.workspace = true rand.workspace = true diff --git a/core/lib/config/src/configs/consensus.rs b/core/lib/config/src/configs/consensus.rs index c31d34941d2b..433b05c954cf 100644 --- a/core/lib/config/src/configs/consensus.rs +++ b/core/lib/config/src/configs/consensus.rs @@ -2,6 +2,7 @@ use std::collections::{BTreeMap, BTreeSet}; use secrecy::{ExposeSecret as _, Secret}; use zksync_basic_types::L2ChainId; +use zksync_concurrency::{limiter, time}; /// `zksync_consensus_crypto::TextFmt` representation of `zksync_consensus_roles::validator::PublicKey`. #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] @@ -65,6 +66,22 @@ pub struct GenesisSpec { pub leader: ValidatorPublicKey, } +#[derive(Clone, Debug, PartialEq, Default)] +pub struct RpcConfig { + /// Max number of blocks that can be send from/to each peer. + /// Defaults to 10 blocks/s/connection. + pub get_block_rate: Option, +} + +impl RpcConfig { + pub fn get_block_rate(&self) -> limiter::Rate { + self.get_block_rate.unwrap_or(limiter::Rate { + burst: 10, + refresh: time::Duration::milliseconds(100), + }) + } +} + /// Config (shared between main node and external node). #[derive(Clone, Debug, PartialEq)] pub struct ConsensusConfig { @@ -91,6 +108,15 @@ pub struct ConsensusConfig { /// Used to (re)initialize genesis if needed. /// External nodes fetch the genesis from the main node. pub genesis_spec: Option, + + /// Rate limiting configuration for the p2p RPCs. + pub rpc: Option, +} + +impl ConsensusConfig { + pub fn rpc(&self) -> RpcConfig { + self.rpc.clone().unwrap_or_default() + } } /// Secrets need for consensus. diff --git a/core/lib/config/src/testonly.rs b/core/lib/config/src/testonly.rs index 2c8034dfe9d6..a05b3d096253 100644 --- a/core/lib/config/src/testonly.rs +++ b/core/lib/config/src/testonly.rs @@ -751,6 +751,15 @@ impl Distribution for EncodeDist { .map(|_| (NodePublicKey(self.sample(rng)), Host(self.sample(rng)))) .collect(), genesis_spec: self.sample(rng), + rpc: self.sample(rng), + } + } +} + +impl Distribution for EncodeDist { + fn sample(&self, rng: &mut R) -> configs::consensus::RpcConfig { + configs::consensus::RpcConfig { + get_block_rate: self.sample(rng), } } } diff --git a/core/lib/dal/src/consensus_dal.rs b/core/lib/dal/src/consensus_dal.rs index 3656b7862d01..15f76c7f55eb 100644 --- a/core/lib/dal/src/consensus_dal.rs +++ b/core/lib/dal/src/consensus_dal.rs @@ -215,7 +215,12 @@ impl ConsensusDal<'_, '_> { /// Next block that should be inserted to storage. pub async fn next_block(&mut self) -> anyhow::Result { - let mut txn = self.storage.start_transaction().await?; + let mut txn = self + .storage + .transaction_builder()? + .set_readonly() + .build() + .await?; if let Some(last) = txn .blocks_dal() .get_sealed_l2_block_number() @@ -229,7 +234,6 @@ impl ConsensusDal<'_, '_> { .first_block() .await .context("first_block()")?; - txn.commit().await.context("commit()")?; Ok(next) } @@ -237,7 +241,12 @@ impl ConsensusDal<'_, '_> { /// Currently, certificates are NOT generated synchronously with L2 blocks, /// so it might NOT be the certificate for the last L2 block. pub async fn certificates_range(&mut self) -> anyhow::Result { - let mut txn = self.storage.start_transaction().await?; + let mut txn = self + .storage + .transaction_builder()? + .set_readonly() + .build() + .await?; // It cannot be older than genesis first block. let mut start = txn .consensus_dal() @@ -270,13 +279,11 @@ impl ConsensusDal<'_, '_> { .report_latency() .fetch_optional(&mut txn) .await?; - txn.commit().await.context("commit()")?; Ok(BlockStoreState { first: start, - last: match row { - None => None, - Some(row) => Some(zksync_protobuf::serde::deserialize(row.certificate)?), - }, + last: row + .map(|row| zksync_protobuf::serde::deserialize(row.certificate)) + .transpose()?, }) } diff --git a/core/lib/protobuf_config/build.rs b/core/lib/protobuf_config/build.rs index 9a23d015239f..5705ed44c1d5 100644 --- a/core/lib/protobuf_config/build.rs +++ b/core/lib/protobuf_config/build.rs @@ -3,7 +3,7 @@ fn main() { zksync_protobuf_build::Config { input_root: "src/proto".into(), proto_root: "zksync".into(), - dependencies: vec![], + dependencies: vec!["::zksync_protobuf::proto".parse().unwrap()], protobuf_crate: "::zksync_protobuf".parse().unwrap(), is_public: true, } diff --git a/core/lib/protobuf_config/src/consensus.rs b/core/lib/protobuf_config/src/consensus.rs index 428333f450c6..3d2c862d7639 100644 --- a/core/lib/protobuf_config/src/consensus.rs +++ b/core/lib/protobuf_config/src/consensus.rs @@ -1,10 +1,10 @@ use anyhow::Context as _; use zksync_basic_types::L2ChainId; use zksync_config::configs::consensus::{ - ConsensusConfig, GenesisSpec, Host, NodePublicKey, ProtocolVersion, ValidatorPublicKey, - WeightedValidator, + ConsensusConfig, GenesisSpec, Host, NodePublicKey, ProtocolVersion, RpcConfig, + ValidatorPublicKey, WeightedValidator, }; -use zksync_protobuf::{repr::ProtoRepr, required}; +use zksync_protobuf::{read_optional, repr::ProtoRepr, required, ProtoFmt}; use crate::{proto::consensus as proto, read_optional_repr}; @@ -54,6 +54,20 @@ impl ProtoRepr for proto::GenesisSpec { } } +impl ProtoRepr for proto::RpcConfig { + type Type = RpcConfig; + fn read(&self) -> anyhow::Result { + Ok(Self::Type { + get_block_rate: read_optional(&self.get_block_rate).context("get_block_rate")?, + }) + } + fn build(this: &Self::Type) -> Self { + Self { + get_block_rate: this.get_block_rate.as_ref().map(ProtoFmt::build), + } + } +} + impl ProtoRepr for proto::Config { type Type = ConsensusConfig; fn read(&self) -> anyhow::Result { @@ -85,6 +99,7 @@ impl ProtoRepr for proto::Config { .map(|(i, e)| read_addr(e).context(i)) .collect::>()?, genesis_spec: read_optional_repr(&self.genesis_spec).context("genesis_spec")?, + rpc: read_optional_repr(&self.rpc_config).context("rpc_config")?, }) } @@ -110,6 +125,7 @@ impl ProtoRepr for proto::Config { }) .collect(), genesis_spec: this.genesis_spec.as_ref().map(ProtoRepr::build), + rpc_config: this.rpc.as_ref().map(ProtoRepr::build), } } } diff --git a/core/lib/protobuf_config/src/proto/core/consensus.proto b/core/lib/protobuf_config/src/proto/core/consensus.proto index aa23ad9192f9..5b59e5151cf7 100644 --- a/core/lib/protobuf_config/src/proto/core/consensus.proto +++ b/core/lib/protobuf_config/src/proto/core/consensus.proto @@ -29,6 +29,8 @@ syntax = "proto3"; package zksync.core.consensus; +import "zksync/std.proto"; + // (public key, ip address) of a gossip network node. message NodeAddr { optional string key = 1; // required; NodePublicKey @@ -49,6 +51,11 @@ message GenesisSpec { optional string leader = 4; // required; ValidatorPublicKey } +// Per peer connection RPC rate limits. +message RpcConfig { + optional std.RateLimit get_block_rate = 1; // optional; defaults to 10 blocks/s. +} + message Config { reserved 3; reserved "validators"; @@ -79,5 +86,9 @@ message Config { // Used to (re)initialize genesis if needed. // External nodes fetch the genesis from the main node. optional GenesisSpec genesis_spec = 8; + + // RPC rate limits configuration. + // If missing, defaults are used. + optional RpcConfig rpc_config = 9; // optional } diff --git a/core/lib/protobuf_config/src/tests.rs b/core/lib/protobuf_config/src/tests.rs index d9693aaffcbe..9ea69c17236d 100644 --- a/core/lib/protobuf_config/src/tests.rs +++ b/core/lib/protobuf_config/src/tests.rs @@ -20,6 +20,7 @@ fn test_encoding() { test_encode_all_formats::>(rng); test_encode_all_formats::>(rng); test_encode_all_formats::>(rng); + test_encode_all_formats::>(rng); test_encode_all_formats::>(rng); test_encode_all_formats::>(rng); test_encode_all_formats::>(rng); diff --git a/core/node/consensus/src/config.rs b/core/node/consensus/src/config.rs index af64026c6e28..cac9e9296227 100644 --- a/core/node/consensus/src/config.rs +++ b/core/node/consensus/src/config.rs @@ -91,6 +91,10 @@ pub(super) fn executor( append(k, v).with_context(|| format!("gossip_static_outbound[{i}]"))?; } } + + let mut rpc = executor::RpcConfig::default(); + rpc.get_block_rate = cfg.rpc().get_block_rate(); + Ok(executor::Config { server_addr: cfg.server_addr, public_addr: net::Host(cfg.public_addr.0.clone()), @@ -107,6 +111,7 @@ pub(super) fn executor( .collect::>() .context("gossip_static_inbound")?, gossip_static_outbound, + rpc, debug_page: None, }) } diff --git a/core/node/consensus/src/storage/mod.rs b/core/node/consensus/src/storage/mod.rs index e0652e336917..894c0c1c05e7 100644 --- a/core/node/consensus/src/storage/mod.rs +++ b/core/node/consensus/src/storage/mod.rs @@ -347,10 +347,12 @@ pub(super) struct Store { persisted: sync::watch::Receiver, } +struct PersistedState(sync::watch::Sender); + /// Background task of the `Store`. pub struct StoreRunner { pool: ConnectionPool, - persisted: sync::watch::Sender, + persisted: PersistedState, certificates: ctx::channel::UnboundedReceiver, } @@ -378,18 +380,55 @@ impl Store { }, StoreRunner { pool, - persisted, + persisted: PersistedState(persisted), certificates: certs_recv, }, )) } } +impl PersistedState { + /// Updates `persisted` to new. + /// Ends of the range can only be moved forward. + /// If `persisted.first` is moved forward, it means that blocks have been pruned. + /// If `persisted.last` is moved forward, it means that new blocks with certificates have been + /// persisted. + fn update(&self, new: storage::BlockStoreState) { + self.0.send_if_modified(|p| { + if &new == p { + return false; + } + p.first = p.first.max(new.first); + if p.next() < new.next() { + p.last = new.last; + } + true + }); + } + + /// Checks if the given certificate is exactly the next one that should + /// be persisted. + fn should_be_persisted(&self, cert: &validator::CommitQC) -> bool { + self.0.borrow().next() == cert.header().number + } + + /// Appends the `cert` to `persisted` range. + fn advance(&self, cert: validator::CommitQC) { + self.0.send_if_modified(|p| { + if p.next() != cert.header().number { + return false; + } + p.last = Some(cert); + true + }); + } +} + impl StoreRunner { pub async fn run(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> { let res = scope::run!(ctx, |ctx, s| async { s.spawn::<()>(async { - // Loop observing the oldest block in storage. + // Loop updating `persisted` whenever blocks get pruned. const POLL_INTERVAL: time::Duration = time::Duration::seconds(1); loop { let range = self @@ -400,16 +439,7 @@ impl StoreRunner { .certificates_range(ctx) .await .wrap("certificates_range()")?; - self.persisted.send_if_modified(|p| { - if &range == p { - return false; - } - p.first = p.first.max(range.first); - if p.next() < range.next() { - p.last = range.last; - } - true - }); + self.persisted.update(range); ctx.sleep(POLL_INTERVAL).await?; } }); @@ -419,31 +449,28 @@ impl StoreRunner { loop { let cert = self.certificates.recv(ctx).await?; // Wait for the block to be persisted, so that we can attach a cert to it. - // It may happen that persisted blocks get pruned and this certificate is not needed any - // more. - while self.persisted.borrow().next() == cert.header().number { + // We may exit this loop without persisting the certificate in case the + // corresponding block has been pruned in the meantime. + while self.persisted.should_be_persisted(&cert) { use consensus_dal::InsertCertificateError as E; // Try to insert the cert. - match self + let res = self .pool .connection(ctx) .await .wrap("connection")? .insert_certificate(ctx, &cert) - .await - { + .await; + match res { Ok(()) => { - self.persisted.send_if_modified(|p| { - if p.next() != cert.header().number { - return false; - } - p.last = Some(cert); - true - }); + // Insertion succeeded: update persisted state + // and wait for the next cert. + self.persisted.advance(cert); break; } - // In case of missing payload we just need to wait longer. Err(InsertCertificateError::Inner(E::MissingPayload)) => { + // the payload is not in storage, it's either not yet persisted + // or already pruned. We will retry after a delay. ctx.sleep(POLL_INTERVAL).await?; } Err(InsertCertificateError::Canceled(err)) => { @@ -611,19 +638,19 @@ impl PayloadManager for Store { // Dummy implementation #[async_trait::async_trait] impl storage::PersistentBatchStore for Store { - fn last_batch(&self) -> attester::BatchNumber { + async fn last_batch(&self) -> attester::BatchNumber { unimplemented!() } - fn last_batch_qc(&self) -> attester::BatchQC { + async fn last_batch_qc(&self) -> attester::BatchQC { unimplemented!() } - fn get_batch(&self, _number: attester::BatchNumber) -> Option { + async fn get_batch(&self, _number: attester::BatchNumber) -> Option { None } - fn get_batch_qc(&self, _number: attester::BatchNumber) -> Option { + async fn get_batch_qc(&self, _number: attester::BatchNumber) -> Option { None } - fn store_qc(&self, _qc: attester::BatchQC) { + async fn store_qc(&self, _qc: attester::BatchQC) { unimplemented!() } fn persisted(&self) -> sync::watch::Receiver { diff --git a/core/node/consensus/src/testonly.rs b/core/node/consensus/src/testonly.rs index d20c379a5d66..514e66c81fea 100644 --- a/core/node/consensus/src/testonly.rs +++ b/core/node/consensus/src/testonly.rs @@ -100,6 +100,7 @@ pub(super) fn config(cfg: &network::Config) -> (config::ConsensusConfig, config: }], leader: config::ValidatorPublicKey(key.public().encode()), }), + rpc: None, }, config::ConsensusSecrets { node_key: Some(config::NodeSecretKey(cfg.gossip.key.encode().into())), diff --git a/prover/Cargo.lock b/prover/Cargo.lock index 61c772727a49..9c3ecb04a85a 100644 --- a/prover/Cargo.lock +++ b/prover/Cargo.lock @@ -7712,7 +7712,7 @@ dependencies = [ [[package]] name = "zksync_concurrency" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=177881457f392fca990dbb3df1695737d90fd0c7#177881457f392fca990dbb3df1695737d90fd0c7" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=ba7b171456e7362eada685234a91c20907b6a097#ba7b171456e7362eada685234a91c20907b6a097" dependencies = [ "anyhow", "once_cell", @@ -7736,6 +7736,7 @@ dependencies = [ "secrecy", "serde", "zksync_basic_types", + "zksync_concurrency", "zksync_consensus_utils", "zksync_crypto_primitives", ] @@ -7743,7 +7744,7 @@ dependencies = [ [[package]] name = "zksync_consensus_crypto" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=177881457f392fca990dbb3df1695737d90fd0c7#177881457f392fca990dbb3df1695737d90fd0c7" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=ba7b171456e7362eada685234a91c20907b6a097#ba7b171456e7362eada685234a91c20907b6a097" dependencies = [ "anyhow", "blst", @@ -7766,7 +7767,7 @@ dependencies = [ [[package]] name = "zksync_consensus_roles" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=177881457f392fca990dbb3df1695737d90fd0c7#177881457f392fca990dbb3df1695737d90fd0c7" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=ba7b171456e7362eada685234a91c20907b6a097#ba7b171456e7362eada685234a91c20907b6a097" dependencies = [ "anyhow", "bit-vec", @@ -7787,7 +7788,7 @@ dependencies = [ [[package]] name = "zksync_consensus_storage" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=177881457f392fca990dbb3df1695737d90fd0c7#177881457f392fca990dbb3df1695737d90fd0c7" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=ba7b171456e7362eada685234a91c20907b6a097#ba7b171456e7362eada685234a91c20907b6a097" dependencies = [ "anyhow", "async-trait", @@ -7805,7 +7806,7 @@ dependencies = [ [[package]] name = "zksync_consensus_utils" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=177881457f392fca990dbb3df1695737d90fd0c7#177881457f392fca990dbb3df1695737d90fd0c7" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=ba7b171456e7362eada685234a91c20907b6a097#ba7b171456e7362eada685234a91c20907b6a097" dependencies = [ "anyhow", "rand 0.8.5", @@ -8105,7 +8106,7 @@ dependencies = [ [[package]] name = "zksync_protobuf" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=177881457f392fca990dbb3df1695737d90fd0c7#177881457f392fca990dbb3df1695737d90fd0c7" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=ba7b171456e7362eada685234a91c20907b6a097#ba7b171456e7362eada685234a91c20907b6a097" dependencies = [ "anyhow", "bit-vec", @@ -8125,7 +8126,7 @@ dependencies = [ [[package]] name = "zksync_protobuf_build" version = "0.1.0" -source = "git+https://github.com/matter-labs/era-consensus.git?rev=177881457f392fca990dbb3df1695737d90fd0c7#177881457f392fca990dbb3df1695737d90fd0c7" +source = "git+https://github.com/matter-labs/era-consensus.git?rev=ba7b171456e7362eada685234a91c20907b6a097#ba7b171456e7362eada685234a91c20907b6a097" dependencies = [ "anyhow", "heck 0.5.0", From 4303d724e43173a67cabc5726ca61d2a4719865a Mon Sep 17 00:00:00 2001 From: Grzegorz Prusak Date: Fri, 28 Jun 2024 13:31:24 +0200 Subject: [PATCH 8/8] removed useless txns --- core/lib/dal/src/consensus_dal.rs | 32 ++++++------------------------- 1 file changed, 6 insertions(+), 26 deletions(-) diff --git a/core/lib/dal/src/consensus_dal.rs b/core/lib/dal/src/consensus_dal.rs index 15f76c7f55eb..d4178fa32e00 100644 --- a/core/lib/dal/src/consensus_dal.rs +++ b/core/lib/dal/src/consensus_dal.rs @@ -215,13 +215,8 @@ impl ConsensusDal<'_, '_> { /// Next block that should be inserted to storage. pub async fn next_block(&mut self) -> anyhow::Result { - let mut txn = self + if let Some(last) = self .storage - .transaction_builder()? - .set_readonly() - .build() - .await?; - if let Some(last) = txn .blocks_dal() .get_sealed_l2_block_number() .await @@ -229,7 +224,8 @@ impl ConsensusDal<'_, '_> { { return Ok(validator::BlockNumber(last.0.into()) + 1); } - let next = txn + let next = self + .storage .consensus_dal() .first_block() .await @@ -241,25 +237,9 @@ impl ConsensusDal<'_, '_> { /// Currently, certificates are NOT generated synchronously with L2 blocks, /// so it might NOT be the certificate for the last L2 block. pub async fn certificates_range(&mut self) -> anyhow::Result { - let mut txn = self - .storage - .transaction_builder()? - .set_readonly() - .build() - .await?; // It cannot be older than genesis first block. - let mut start = txn - .consensus_dal() - .genesis() - .await? - .context("genesis()")? - .first_block; - start = start.max( - txn.consensus_dal() - .first_block() - .await - .context("first_block()")?, - ); + let mut start = self.genesis().await?.context("genesis()")?.first_block; + start = start.max(self.first_block().await.context("first_block()")?); let row = sqlx::query!( r#" SELECT @@ -277,7 +257,7 @@ impl ConsensusDal<'_, '_> { ) .instrument("last_certificate") .report_latency() - .fetch_optional(&mut txn) + .fetch_optional(self.storage) .await?; Ok(BlockStoreState { first: start,