From 5886b8df304ded15104ec228e0477bc5f44b7fbe Mon Sep 17 00:00:00 2001
From: Akosh Farkash
Date: Tue, 9 Jul 2024 16:57:26 +0100
Subject: [PATCH] feat: L1 batch QC database (BFT-476) (#2340)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## What ❔

- [x] Add an `l1_batches_consensus` table to hold [L1 batch Quorum Certificates](https://github.com/matter-labs/era-consensus/blob/177881457f392fca990dbb3df1695737d90fd0c7/node/libs/roles/src/attester/messages/batch.rs#L67) from Attesters
- [x] Add attesters to the config
- [x] Implement methods in `PersistentBatchStore`
  - [x] `persisted`
  - [x] `last_batch`
  - [x] `last_batch_qc`
  - [x] `get_batch`
  - [x] `get_batch_qc`
  - [x] `store_qc`
  - [ ] `queue_next_batch` - _not going to implement for now_
  - [ ] assign `SyncBatch::proof` - _not going to implement for now_
- [x] Add tests for all new methods in `ConsensusDal` and the `PersistentBatchStore`

### Caveat

I implemented the updating of `persisted` with a loop that polls the database for newly available `SyncBatch` records, even if they have no proof. This inevitably triggers the gossiping of batch statuses and the pulling of `SyncBatch` between peers. For this reason `queue_next_batch` just drops the data, since we can't do anything with it without the proof yet. Returning an error or panicking would stop the consensus tasks.

I ended up disabling `persisted` by leaving its dummy implementation in place, because when it is enabled the full node tests keep running forever, printing the following logs in a loop:

```console
❯ RUST_LOG=info zk test rust test_full_nodes --no-capture
...
2024-07-03T14:22:57.882784Z INFO in{addr=[::1]:53082}: zksync_consensus_network: 191: new connection
2024-07-03T14:22:57.883457Z INFO in{addr=[::1]:53082}:gossip: zksync_consensus_network::gossip::runner: 383: peer = node:public:ed25519:068ffa0b3fedbbe5c2a6da3defd26e0d084248f12bfe98db85f7785b0b08b63e
2024-07-03T14:22:57.883764Z INFO out{addr="[::1]:52998"}:gossip: zksync_consensus_network::gossip::runner: 416: peer = node:public:ed25519:7710ed90aad9f5859dfba06e13fb4e6fb0fe4d686f81f9d819464ad1fdc371bd
2024-07-03T14:22:57.886204Z INFO in{addr=[::1]:53082}:gossip: zksync_consensus_network::rpc: 222: message too large: max = 10240B, got 13773B
2024-07-03T14:22:57.886280Z INFO out{addr="[::1]:52998"}:gossip: zksync_consensus_network::rpc: 222: message too large: max = 10240B, got 13773B
2024-07-03T14:22:57.886633Z INFO in{addr=[::1]:53082}:gossip: zksync_consensus_network::rpc: 222: canceled
...
2024-07-03T14:22:57.888143Z INFO out{addr="[::1]:52998"}:gossip: zksync_consensus_network::rpc: 222: disconnected
...
2024-07-03T14:22:57.888390Z INFO zksync_consensus_network: 216: [::1]:53082: gossip.run_inbound_stream(): push_batch_store_state.: end of stream
2024-07-03T14:22:57.888446Z INFO zksync_consensus_network: 158: gossip.run_outbound_stream("[::1]:52998"): push_batch_store_state.: end of stream
```

So in the tests the message size exceeds the maximum. I think the limit is [hardcoded here](https://github.com/matter-labs/era-consensus/blob/decb988eb9e1a45fd5171d2cc540a360d9ca5f1f/node/actors/network/src/gossip/runner.rs#L109). Since this functionality isn't expected to work, I think we can disable it for now.

## Why ❔

The workflow of signing and submitting L1 batch certificates will be like this:

1. Data is inserted into the `l1_batches` table.
2. If the node is one of the Attesters, it picks up the batch, signs it, and sends it to the gossip layer via https://github.com/matter-labs/era-consensus/pull/137
3. The consensus collects votes about the L1 batch, and when the threshold is reached it saves the quorum certificate into Postgres.
4. The node monitors the Main Node (later L1) for new batch QCs and upserts them into the database (the QC can differ from what a particular node inserted based on gossip). This way a node that has been down for a period of time can backfill any QCs it missed. It is assumed that the Main Node API only serves QCs that have no gaps following them, i.e. they are final: if it were L1, it wouldn't allow submissions with gaps, and this simulates those semantics.
5. The last height that doesn't have any gaps following it is used as a floor for what needs to be (re)signed and gossiped.

This PR supports the above workflow up to step 3.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.

---------

Co-authored-by: Bruno França
---
 Cargo.toml | 2 +-
 core/lib/config/src/configs/consensus.rs | 28 +-
 core/lib/config/src/testonly.rs | 16 +-
 ...37978579ba22eec525912c4aeeb235c3b984c.json | 20 ++
 ...e733f635b183960226848b280383042ea3637.json | 22 ++
 ...26306b02e328d7b1b69c495443bd2ca7f7510.json | 15 +
 ...240627142548_l1_batches_consensus.down.sql | 1 +
 ...20240627142548_l1_batches_consensus.up.sql | 9 +
 core/lib/dal/src/consensus_dal.rs | 214 +++++++++++-
 core/lib/dal/src/models/storage_eth_tx.rs | 2 +-
 .../lib/dal/src/models/storage_transaction.rs | 2 +-
 core/lib/protobuf_config/src/consensus.rs | 28 +-
 .../src/proto/config/secrets.proto | 1 +
 .../src/proto/core/consensus.proto | 7 +
 core/lib/protobuf_config/src/secrets.rs | 10 +-
 core/node/consensus/src/config.rs | 30 +-
 core/node/consensus/src/en.rs | 16 +-
 core/node/consensus/src/mn.rs | 10 +-
 core/node/consensus/src/storage/connection.rs | 183 +++++++++-
 core/node/consensus/src/storage/store.rs | 315 +++++++++++++-----
 core/node/consensus/src/storage/testonly.rs | 16 +-
 core/node/consensus/src/testonly.rs | 10 +
 core/node/consensus/src/tests.rs | 112 +++++--
 23 files changed, 916 insertions(+), 153 deletions(-)
 create mode 100644 core/lib/dal/.sqlx/query-849d54b4cf9212010fb4e41ce8137978579ba22eec525912c4aeeb235c3b984c.json
 create mode 100644 core/lib/dal/.sqlx/query-8c763c05187a409a54806b0eb88e733f635b183960226848b280383042ea3637.json
 create mode 100644 core/lib/dal/.sqlx/query-d9d71913a116abf390c71f5229426306b02e328d7b1b69c495443bd2ca7f7510.json
 create mode 100644 core/lib/dal/migrations/20240627142548_l1_batches_consensus.down.sql
 create mode 100644 core/lib/dal/migrations/20240627142548_l1_batches_consensus.up.sql

diff --git a/Cargo.toml b/Cargo.toml
index 34e5cb6141c7..443f85493865 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -283,4 +283,4 @@ zksync_node_consensus = { path = "core/node/consensus" }
 zksync_contract_verification_server = { path = "core/node/contract_verification_server" }
 zksync_node_api_server = { path = "core/node/api_server" }
 zksync_tee_verifier_input_producer = { path = "core/node/tee_verifier_input_producer" }
-zksync_base_token_adjuster = {path = "core/node/base_token_adjuster"}
+zksync_base_token_adjuster = { path = "core/node/base_token_adjuster" }
diff --git a/core/lib/config/src/configs/consensus.rs
b/core/lib/config/src/configs/consensus.rs index 433b05c954cf..ec4edd486ac0 100644 --- a/core/lib/config/src/configs/consensus.rs +++ b/core/lib/config/src/configs/consensus.rs @@ -12,6 +12,14 @@ pub struct ValidatorPublicKey(pub String); #[derive(Debug, Clone)] pub struct ValidatorSecretKey(pub Secret); +/// `zksync_consensus_crypto::TextFmt` representation of `zksync_consensus_roles::attester::PublicKey`. +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct AttesterPublicKey(pub String); + +/// `zksync_consensus_crypto::TextFmt` representation of `zksync_consensus_roles::attester::SecretKey`. +#[derive(Debug, Clone)] +pub struct AttesterSecretKey(pub Secret); + /// `zksync_consensus_crypto::TextFmt` representation of `zksync_consensus_roles::node::PublicKey`. #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct NodePublicKey(pub String); @@ -26,6 +34,12 @@ impl PartialEq for ValidatorSecretKey { } } +impl PartialEq for AttesterSecretKey { + fn eq(&self, other: &Self) -> bool { + self.0.expose_secret().eq(other.0.expose_secret()) + } +} + impl PartialEq for NodeSecretKey { fn eq(&self, other: &Self) -> bool { self.0.expose_secret().eq(other.0.expose_secret()) @@ -41,6 +55,15 @@ pub struct WeightedValidator { pub weight: u64, } +/// Copy-paste of `zksync_consensus_roles::attester::WeightedAttester`. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct WeightedAttester { + /// Attester key + pub key: AttesterPublicKey, + /// Attester weight inside the Committee. + pub weight: u64, +} + /// Copy-paste of `zksync_concurrency::net::Host`. #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct Host(pub String); @@ -61,6 +84,8 @@ pub struct GenesisSpec { pub protocol_version: ProtocolVersion, /// The validator committee. Represents `zksync_consensus_roles::validator::Committee`. pub validators: Vec, + /// The attester committee. Represents `zksync_consensus_roles::attester::Committee`. + pub attesters: Vec, /// Leader of the committee. Represents /// `zksync_consensus_roles::validator::LeaderSelectionMode::Sticky`. pub leader: ValidatorPublicKey, @@ -119,9 +144,10 @@ impl ConsensusConfig { } } -/// Secrets need for consensus. +/// Secrets needed for consensus. 
#[derive(Debug, Clone, PartialEq)] pub struct ConsensusSecrets { pub validator_key: Option, + pub attester_key: Option, pub node_key: Option, } diff --git a/core/lib/config/src/testonly.rs b/core/lib/config/src/testonly.rs index 939b24ea8c76..c41180fe42b3 100644 --- a/core/lib/config/src/testonly.rs +++ b/core/lib/config/src/testonly.rs @@ -725,6 +725,16 @@ impl Distribution for EncodeDist { } } +impl Distribution for EncodeDist { + fn sample(&self, rng: &mut R) -> configs::consensus::WeightedAttester { + use configs::consensus::{AttesterPublicKey, WeightedAttester}; + WeightedAttester { + key: AttesterPublicKey(self.sample(rng)), + weight: self.sample(rng), + } + } +} + impl Distribution for EncodeDist { fn sample(&self, rng: &mut R) -> configs::consensus::GenesisSpec { use configs::consensus::{GenesisSpec, ProtocolVersion, ValidatorPublicKey}; @@ -732,6 +742,7 @@ impl Distribution for EncodeDist { chain_id: L2ChainId::default(), protocol_version: ProtocolVersion(self.sample(rng)), validators: self.sample_collect(rng), + attesters: self.sample_collect(rng), leader: ValidatorPublicKey(self.sample(rng)), } } @@ -769,9 +780,12 @@ impl Distribution for EncodeDist { impl Distribution for EncodeDist { fn sample(&self, rng: &mut R) -> configs::consensus::ConsensusSecrets { - use configs::consensus::{ConsensusSecrets, NodeSecretKey, ValidatorSecretKey}; + use configs::consensus::{ + AttesterSecretKey, ConsensusSecrets, NodeSecretKey, ValidatorSecretKey, + }; ConsensusSecrets { validator_key: self.sample_opt(|| ValidatorSecretKey(String::into(self.sample(rng)))), + attester_key: self.sample_opt(|| AttesterSecretKey(String::into(self.sample(rng)))), node_key: self.sample_opt(|| NodeSecretKey(String::into(self.sample(rng)))), } } diff --git a/core/lib/dal/.sqlx/query-849d54b4cf9212010fb4e41ce8137978579ba22eec525912c4aeeb235c3b984c.json b/core/lib/dal/.sqlx/query-849d54b4cf9212010fb4e41ce8137978579ba22eec525912c4aeeb235c3b984c.json new file mode 100644 index 000000000000..5130763af73c --- /dev/null +++ b/core/lib/dal/.sqlx/query-849d54b4cf9212010fb4e41ce8137978579ba22eec525912c4aeeb235c3b984c.json @@ -0,0 +1,20 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n MAX(l1_batch_number) AS \"number\"\n FROM\n l1_batches_consensus\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "number", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + null + ] + }, + "hash": "849d54b4cf9212010fb4e41ce8137978579ba22eec525912c4aeeb235c3b984c" +} diff --git a/core/lib/dal/.sqlx/query-8c763c05187a409a54806b0eb88e733f635b183960226848b280383042ea3637.json b/core/lib/dal/.sqlx/query-8c763c05187a409a54806b0eb88e733f635b183960226848b280383042ea3637.json new file mode 100644 index 000000000000..930c1c1a9fed --- /dev/null +++ b/core/lib/dal/.sqlx/query-8c763c05187a409a54806b0eb88e733f635b183960226848b280383042ea3637.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT\n certificate\n FROM\n l1_batches_consensus\n WHERE\n l1_batch_number = $1\n ", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "certificate", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Int8" + ] + }, + "nullable": [ + false + ] + }, + "hash": "8c763c05187a409a54806b0eb88e733f635b183960226848b280383042ea3637" +} diff --git a/core/lib/dal/.sqlx/query-d9d71913a116abf390c71f5229426306b02e328d7b1b69c495443bd2ca7f7510.json b/core/lib/dal/.sqlx/query-d9d71913a116abf390c71f5229426306b02e328d7b1b69c495443bd2ca7f7510.json new file mode 100644 index 
000000000000..a42fbe98ff2f --- /dev/null +++ b/core/lib/dal/.sqlx/query-d9d71913a116abf390c71f5229426306b02e328d7b1b69c495443bd2ca7f7510.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "\n INSERT INTO\n l1_batches_consensus (l1_batch_number, certificate, created_at, updated_at)\n VALUES\n ($1, $2, NOW(), NOW())\n ON CONFLICT (l1_batch_number) DO NOTHING\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8", + "Jsonb" + ] + }, + "nullable": [] + }, + "hash": "d9d71913a116abf390c71f5229426306b02e328d7b1b69c495443bd2ca7f7510" +} diff --git a/core/lib/dal/migrations/20240627142548_l1_batches_consensus.down.sql b/core/lib/dal/migrations/20240627142548_l1_batches_consensus.down.sql new file mode 100644 index 000000000000..45114088eaa8 --- /dev/null +++ b/core/lib/dal/migrations/20240627142548_l1_batches_consensus.down.sql @@ -0,0 +1 @@ +DROP TABLE l1_batches_consensus; diff --git a/core/lib/dal/migrations/20240627142548_l1_batches_consensus.up.sql b/core/lib/dal/migrations/20240627142548_l1_batches_consensus.up.sql new file mode 100644 index 000000000000..71c3854d640e --- /dev/null +++ b/core/lib/dal/migrations/20240627142548_l1_batches_consensus.up.sql @@ -0,0 +1,9 @@ +CREATE TABLE l1_batches_consensus ( + l1_batch_number BIGINT PRIMARY KEY REFERENCES l1_batches (number) ON DELETE CASCADE, + certificate JSONB NOT NULL, + + created_at TIMESTAMP NOT NULL, + updated_at TIMESTAMP NOT NULL, + + CHECK((certificate->'message'->'number')::jsonb::numeric = l1_batch_number) +); diff --git a/core/lib/dal/src/consensus_dal.rs b/core/lib/dal/src/consensus_dal.rs index d4178fa32e00..3efdf5ee577b 100644 --- a/core/lib/dal/src/consensus_dal.rs +++ b/core/lib/dal/src/consensus_dal.rs @@ -1,12 +1,13 @@ use anyhow::Context as _; -use zksync_consensus_roles::validator; +use bigdecimal::Zero; +use zksync_consensus_roles::{attester, validator}; use zksync_consensus_storage::{BlockStoreState, ReplicaState}; use zksync_db_connection::{ connection::Connection, error::{DalError, DalResult, SqlxContext}, instrument::{InstrumentExt, Instrumented}, }; -use zksync_types::L2BlockNumber; +use zksync_types::{L1BatchNumber, L2BlockNumber}; pub use crate::consensus::Payload; use crate::{Core, CoreDal}; @@ -20,7 +21,7 @@ pub struct ConsensusDal<'a, 'c> { /// Error returned by `ConsensusDal::insert_certificate()`. #[derive(thiserror::Error, Debug)] pub enum InsertCertificateError { - #[error("corresponding L2 block is missing")] + #[error("corresponding payload is missing")] MissingPayload, #[error("certificate doesn't match the payload")] PayloadMismatch, @@ -236,7 +237,7 @@ impl ConsensusDal<'_, '_> { /// Fetches the last consensus certificate. /// Currently, certificates are NOT generated synchronously with L2 blocks, /// so it might NOT be the certificate for the last L2 block. - pub async fn certificates_range(&mut self) -> anyhow::Result { + pub async fn block_certificates_range(&mut self) -> anyhow::Result { // It cannot be older than genesis first block. let mut start = self.genesis().await?.context("genesis()")?.first_block; start = start.max(self.first_block().await.context("first_block()")?); @@ -255,7 +256,7 @@ impl ConsensusDal<'_, '_> { "#, i64::try_from(start.0)?, ) - .instrument("last_certificate") + .instrument("block_certificate_range") .report_latency() .fetch_optional(self.storage) .await?; @@ -268,7 +269,7 @@ impl ConsensusDal<'_, '_> { } /// Fetches the consensus certificate for the L2 block with the given `block_number`. 
- pub async fn certificate( + pub async fn block_certificate( &mut self, block_number: validator::BlockNumber, ) -> anyhow::Result> { @@ -283,7 +284,33 @@ impl ConsensusDal<'_, '_> { "#, i64::try_from(block_number.0)? ) - .instrument("certificate") + .instrument("block_certificate") + .report_latency() + .fetch_optional(self.storage) + .await? + else { + return Ok(None); + }; + Ok(Some(zksync_protobuf::serde::deserialize(row.certificate)?)) + } + + /// Fetches the attester certificate for the L1 batch with the given `batch_number`. + pub async fn batch_certificate( + &mut self, + batch_number: attester::BatchNumber, + ) -> anyhow::Result> { + let Some(row) = sqlx::query!( + r#" + SELECT + certificate + FROM + l1_batches_consensus + WHERE + l1_batch_number = $1 + "#, + i64::try_from(batch_number.0)? + ) + .instrument("batch_certificate") .report_latency() .fetch_optional(self.storage) .await? @@ -345,7 +372,7 @@ impl ConsensusDal<'_, '_> { /// Inserts a certificate for the L2 block `cert.header().number`. /// Fails if certificate doesn't match the stored block. - pub async fn insert_certificate( + pub async fn insert_block_certificate( &mut self, cert: &validator::CommitQC, ) -> Result<(), InsertCertificateError> { @@ -370,22 +397,102 @@ impl ConsensusDal<'_, '_> { header.number.0 as i64, zksync_protobuf::serde::serialize(cert, serde_json::value::Serializer).unwrap(), ) - .instrument("insert_certificate") + .instrument("insert_block_certificate") .report_latency() .execute(&mut txn) .await?; txn.commit().await.context("commit")?; Ok(()) } + + /// Inserts a certificate for the L1 batch. + /// + /// Insertion is allowed even if it creates gaps in the L1 batch history. + /// + /// It fails if the batch payload is missing or it's not consistent with the QC. + pub async fn insert_batch_certificate( + &mut self, + cert: &attester::BatchQC, + ) -> Result<(), InsertCertificateError> { + use InsertCertificateError as E; + let mut txn = self.storage.start_transaction().await?; + + let l1_batch_number = L1BatchNumber(cert.message.number.0 as u32); + let _l1_batch_header = txn + .blocks_dal() + .get_l1_batch_header(l1_batch_number) + .await? + .ok_or(E::MissingPayload)?; + + // TODO: Verify that the certificate matches the stored batch: + // * add the hash of the batch to the `BatchQC` + // * find out which field in the `l1_batches` table contains the hash we need to match + // * ideally move the responsibility of validation outside this method + + // if header.payload != want_payload.encode().hash() { + // return Err(E::PayloadMismatch); + // } + + let res = sqlx::query!( + r#" + INSERT INTO + l1_batches_consensus (l1_batch_number, certificate, created_at, updated_at) + VALUES + ($1, $2, NOW(), NOW()) + ON CONFLICT (l1_batch_number) DO NOTHING + "#, + i64::from(l1_batch_number.0), + zksync_protobuf::serde::serialize(cert, serde_json::value::Serializer).unwrap(), + ) + .instrument("insert_batch_certificate") + .report_latency() + .execute(&mut txn) + .await?; + + if res.rows_affected().is_zero() { + tracing::debug!(%l1_batch_number, "duplicate batch certificate"); + } + + txn.commit().await.context("commit")?; + + Ok(()) + } + + /// Gets a number of the last L1 batch that was inserted. It might have gaps before it, + /// depending on the order in which votes have been collected over gossip by consensus. 
+ pub async fn get_last_batch_certificate_number( + &mut self, + ) -> DalResult> { + let row = sqlx::query!( + r#" + SELECT + MAX(l1_batch_number) AS "number" + FROM + l1_batches_consensus + "# + ) + .instrument("get_last_batch_certificate_number") + .report_latency() + .fetch_one(self.storage) + .await?; + + Ok(row + .number + .map(|number| attester::BatchNumber(number as u64))) + } } #[cfg(test)] mod tests { use rand::Rng as _; - use zksync_consensus_roles::validator; + use zksync_consensus_roles::{attester, validator}; use zksync_consensus_storage::ReplicaState; + use zksync_types::{L1BatchNumber, ProtocolVersion}; - use crate::{ConnectionPool, Core, CoreDal}; + use crate::{ + tests::{create_l1_batch_header, create_l2_block_header}, + ConnectionPool, Core, CoreDal, + }; #[tokio::test] async fn replica_state_read_write() { @@ -421,4 +528,89 @@ mod tests { } } } + + #[tokio::test] + async fn test_batch_certificate() { + let rng = &mut rand::thread_rng(); + let pool = ConnectionPool::::test_pool().await; + let mut conn = pool.connection().await.unwrap(); + + let mut mock_batch_qc = |number: L1BatchNumber| { + let mut cert: attester::BatchQC = rng.gen(); + cert.message.number.0 = u64::from(number.0); + cert.signatures.add(rng.gen(), rng.gen()); + cert + }; + + // Required for inserting l2 blocks + conn.protocol_versions_dal() + .save_protocol_version_with_tx(&ProtocolVersion::default()) + .await + .unwrap(); + + // Insert some mock L2 blocks and L1 batches + let mut block_number = 0; + let mut batch_number = 0; + for _ in 0..3 { + for _ in 0..3 { + block_number += 1; + let l2_block = create_l2_block_header(block_number); + conn.blocks_dal().insert_l2_block(&l2_block).await.unwrap(); + } + batch_number += 1; + let l1_batch = create_l1_batch_header(batch_number); + + conn.blocks_dal() + .insert_mock_l1_batch(&l1_batch) + .await + .unwrap(); + + conn.blocks_dal() + .mark_l2_blocks_as_executed_in_l1_batch(l1_batch.number) + .await + .unwrap(); + } + + let l1_batch_number = L1BatchNumber(batch_number); + + // Insert a batch certificate for the last L1 batch. + let cert1 = mock_batch_qc(l1_batch_number); + + conn.consensus_dal() + .insert_batch_certificate(&cert1) + .await + .unwrap(); + + // Try insert duplicate batch certificate for the same batch. + let cert2 = mock_batch_qc(l1_batch_number); + + conn.consensus_dal() + .insert_batch_certificate(&cert2) + .await + .unwrap(); + + // Retrieve the latest certificate. 
+ let number = conn + .consensus_dal() + .get_last_batch_certificate_number() + .await + .unwrap() + .unwrap(); + + let cert = conn + .consensus_dal() + .batch_certificate(number) + .await + .unwrap() + .unwrap(); + + assert_eq!(cert, cert1, "duplicates are ignored"); + + // Try insert batch certificate for non-existing batch + let cert3 = mock_batch_qc(l1_batch_number.next()); + conn.consensus_dal() + .insert_batch_certificate(&cert3) + .await + .expect_err("missing payload"); + } } diff --git a/core/lib/dal/src/models/storage_eth_tx.rs b/core/lib/dal/src/models/storage_eth_tx.rs index 615b365d8533..2654ffe0e0a7 100644 --- a/core/lib/dal/src/models/storage_eth_tx.rs +++ b/core/lib/dal/src/models/storage_eth_tx.rs @@ -77,7 +77,7 @@ impl From for EthTx { .expect("Incorrect address in db"), raw_tx: tx.raw_tx.clone(), tx_type: AggregatedActionType::from_str(&tx.tx_type).expect("Wrong agg type"), - created_at_timestamp: tx.created_at.timestamp() as u64, + created_at_timestamp: tx.created_at.and_utc().timestamp() as u64, predicted_gas_cost: tx.predicted_gas_cost as u64, from_addr: tx.from_addr.map(|f| Address::from_slice(&f)), blob_sidecar: tx.blob_sidecar.map(|b| { diff --git a/core/lib/dal/src/models/storage_transaction.rs b/core/lib/dal/src/models/storage_transaction.rs index bce5e554f383..31a182a7eca0 100644 --- a/core/lib/dal/src/models/storage_transaction.rs +++ b/core/lib/dal/src/models/storage_transaction.rs @@ -296,7 +296,7 @@ impl From for Transaction { let hash = H256::from_slice(&tx.hash); let execute = serde_json::from_value::(tx.data.clone()) .unwrap_or_else(|_| panic!("invalid json in database for tx {:?}", hash)); - let received_timestamp_ms = tx.received_at.timestamp_millis() as u64; + let received_timestamp_ms = tx.received_at.and_utc().timestamp_millis() as u64; match tx.tx_format { Some(t) if t == i32::from(PRIORITY_OPERATION_L2_TX_TYPE) => Transaction { common_data: ExecuteTransactionCommon::L1(tx.into()), diff --git a/core/lib/protobuf_config/src/consensus.rs b/core/lib/protobuf_config/src/consensus.rs index 3d2c862d7639..c04120edcc54 100644 --- a/core/lib/protobuf_config/src/consensus.rs +++ b/core/lib/protobuf_config/src/consensus.rs @@ -1,8 +1,8 @@ use anyhow::Context as _; use zksync_basic_types::L2ChainId; use zksync_config::configs::consensus::{ - ConsensusConfig, GenesisSpec, Host, NodePublicKey, ProtocolVersion, RpcConfig, - ValidatorPublicKey, WeightedValidator, + AttesterPublicKey, ConsensusConfig, GenesisSpec, Host, NodePublicKey, ProtocolVersion, + RpcConfig, ValidatorPublicKey, WeightedAttester, WeightedValidator, }; use zksync_protobuf::{read_optional, repr::ProtoRepr, required, ProtoFmt}; @@ -24,6 +24,22 @@ impl ProtoRepr for proto::WeightedValidator { } } +impl ProtoRepr for proto::WeightedAttester { + type Type = WeightedAttester; + fn read(&self) -> anyhow::Result { + Ok(Self::Type { + key: AttesterPublicKey(required(&self.key).context("key")?.clone()), + weight: *required(&self.weight).context("weight")?, + }) + } + fn build(this: &Self::Type) -> Self { + Self { + key: Some(this.key.0.clone()), + weight: Some(this.weight), + } + } +} + impl ProtoRepr for proto::GenesisSpec { type Type = GenesisSpec; fn read(&self) -> anyhow::Result { @@ -41,6 +57,13 @@ impl ProtoRepr for proto::GenesisSpec { .map(|(i, x)| x.read().context(i)) .collect::>() .context("validators")?, + attesters: self + .attesters + .iter() + .enumerate() + .map(|(i, x)| x.read().context(i)) + .collect::>() + .context("attesters")?, leader: 
ValidatorPublicKey(required(&self.leader).context("leader")?.clone()), }) } @@ -49,6 +72,7 @@ impl ProtoRepr for proto::GenesisSpec { chain_id: Some(this.chain_id.as_u64()), protocol_version: Some(this.protocol_version.0), validators: this.validators.iter().map(ProtoRepr::build).collect(), + attesters: this.attesters.iter().map(ProtoRepr::build).collect(), leader: Some(this.leader.0.clone()), } } diff --git a/core/lib/protobuf_config/src/proto/config/secrets.proto b/core/lib/protobuf_config/src/proto/config/secrets.proto index fb328883f99d..b711d81d5754 100644 --- a/core/lib/protobuf_config/src/proto/config/secrets.proto +++ b/core/lib/protobuf_config/src/proto/config/secrets.proto @@ -16,6 +16,7 @@ message L1Secrets { message ConsensusSecrets { optional string validator_key = 1; // required for validator nodes; ValidatorSecretKey optional string node_key = 2; // required for any node; NodeSecretKey + optional string attester_key = 3; // required for attester nodes; AttesterSecretKey } message Secrets { diff --git a/core/lib/protobuf_config/src/proto/core/consensus.proto b/core/lib/protobuf_config/src/proto/core/consensus.proto index 5b59e5151cf7..2adc70886e9e 100644 --- a/core/lib/protobuf_config/src/proto/core/consensus.proto +++ b/core/lib/protobuf_config/src/proto/core/consensus.proto @@ -43,12 +43,19 @@ message WeightedValidator { optional uint64 weight = 2; // required } +// Weighted member of an attester committee. +message WeightedAttester { + optional string key = 1; // required; AttesterPublic + optional uint64 weight = 2; // required +} + // Consensus genesis specification. message GenesisSpec { optional uint64 chain_id = 1; // required; L2ChainId, should be the same as `l2_chain_id` in the `zksync.config.genesis.Genesis`. optional uint32 protocol_version = 2; // required; validator::ProtocolVersion repeated WeightedValidator validators = 3; // must be non-empty; validator committee. optional string leader = 4; // required; ValidatorPublicKey + repeated WeightedAttester attesters = 5; // can be empty; attester committee. } // Per peer connection RPC rate limits. 
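To illustrate how the new protobuf fields line up with the Rust config types added in `core/lib/config/src/configs/consensus.rs`, here is a minimal sketch (not part of the patch; the `"..."` key strings are placeholders rather than real `TextFmt`-encoded keys) that builds a `GenesisSpec` with a one-member attester committee and a `ConsensusSecrets` with an `attester_key`:

```rust
use zksync_basic_types::L2ChainId;
use zksync_config::configs::consensus::{
    AttesterPublicKey, AttesterSecretKey, ConsensusSecrets, GenesisSpec, NodeSecretKey,
    ProtocolVersion, ValidatorPublicKey, ValidatorSecretKey, WeightedAttester, WeightedValidator,
};

// Hypothetical example; key strings are placeholders for TextFmt-encoded keys.
fn example_genesis_spec_and_secrets() -> (GenesisSpec, ConsensusSecrets) {
    let spec = GenesisSpec {
        chain_id: L2ChainId::default(),
        protocol_version: ProtocolVersion(1),
        validators: vec![WeightedValidator {
            key: ValidatorPublicKey("validator:public:...".into()),
            weight: 1,
        }],
        // New in this PR: the attester committee.
        attesters: vec![WeightedAttester {
            key: AttesterPublicKey("attester:public:...".into()),
            weight: 1,
        }],
        leader: ValidatorPublicKey("validator:public:...".into()),
    };
    let secrets = ConsensusSecrets {
        validator_key: Some(ValidatorSecretKey("validator:secret:...".to_string().into())),
        // New in this PR: only required for attester nodes.
        attester_key: Some(AttesterSecretKey("attester:secret:...".to_string().into())),
        node_key: Some(NodeSecretKey("node:secret:...".to_string().into())),
    };
    (spec, secrets)
}
```

An empty `attesters` list is allowed: `GenesisSpec::parse` in `core/node/consensus/src/config.rs` (below) maps it to `None`, i.e. no attester committee is formed.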
diff --git a/core/lib/protobuf_config/src/secrets.rs b/core/lib/protobuf_config/src/secrets.rs index 91a05b31f196..43f537a5fbfa 100644 --- a/core/lib/protobuf_config/src/secrets.rs +++ b/core/lib/protobuf_config/src/secrets.rs @@ -4,7 +4,7 @@ use anyhow::Context; use secrecy::ExposeSecret; use zksync_basic_types::url::SensitiveUrl; use zksync_config::configs::{ - consensus::{ConsensusSecrets, NodeSecretKey, ValidatorSecretKey}, + consensus::{AttesterSecretKey, ConsensusSecrets, NodeSecretKey, ValidatorSecretKey}, secrets::Secrets, DatabaseSecrets, L1Secrets, }; @@ -98,6 +98,10 @@ impl ProtoRepr for proto::ConsensusSecrets { .validator_key .as_ref() .map(|x| ValidatorSecretKey(x.clone().into())), + attester_key: self + .attester_key + .as_ref() + .map(|x| AttesterSecretKey(x.clone().into())), node_key: self .node_key .as_ref() @@ -111,6 +115,10 @@ impl ProtoRepr for proto::ConsensusSecrets { .validator_key .as_ref() .map(|x| x.0.expose_secret().clone()), + attester_key: this + .attester_key + .as_ref() + .map(|x| x.0.expose_secret().clone()), node_key: this.node_key.as_ref().map(|x| x.0.expose_secret().clone()), } } diff --git a/core/node/consensus/src/config.rs b/core/node/consensus/src/config.rs index cac9e9296227..75e329d6c347 100644 --- a/core/node/consensus/src/config.rs +++ b/core/node/consensus/src/config.rs @@ -10,7 +10,7 @@ use zksync_config::{ }; use zksync_consensus_crypto::{Text, TextFmt}; use zksync_consensus_executor as executor; -use zksync_consensus_roles::{node, validator}; +use zksync_consensus_roles::{attester, node, validator}; fn read_secret_text(text: Option<&Secret>) -> anyhow::Result> { text.map(|text| Text::new(text.expose_secret()).decode()) @@ -24,6 +24,12 @@ pub(super) fn validator_key( read_secret_text(secrets.validator_key.as_ref().map(|x| &x.0)) } +pub(super) fn attester_key( + secrets: &ConsensusSecrets, +) -> anyhow::Result> { + read_secret_text(secrets.attester_key.as_ref().map(|x| &x.0)) +} + /// Consensus genesis specification. /// It is a digest of the `validator::Genesis`, /// which allows to initialize genesis (if not present) @@ -33,6 +39,7 @@ pub(super) struct GenesisSpec { pub(super) chain_id: validator::ChainId, pub(super) protocol_version: validator::ProtocolVersion, pub(super) validators: validator::Committee, + pub(super) attesters: Option, pub(super) leader_selection: validator::LeaderSelectionMode, } @@ -42,6 +49,7 @@ impl GenesisSpec { chain_id: g.chain_id, protocol_version: g.protocol_version, validators: g.validators.clone(), + attesters: g.attesters.clone(), leader_selection: g.leader_selection.clone(), } } @@ -59,6 +67,20 @@ impl GenesisSpec { }) .collect::>() .context("validators")?; + + let attesters: Vec<_> = x + .attesters + .iter() + .enumerate() + .map(|(i, v)| { + Ok(attester::WeightedAttester { + key: Text::new(&v.key.0).decode().context("key").context(i)?, + weight: v.weight, + }) + }) + .collect::>() + .context("attesters")?; + Ok(Self { chain_id: validator::ChainId(x.chain_id.as_u64()), protocol_version: validator::ProtocolVersion(x.protocol_version.0), @@ -66,6 +88,11 @@ impl GenesisSpec { Text::new(&x.leader.0).decode().context("leader")?, ), validators: validator::Committee::new(validators).context("validators")?, + attesters: if attesters.is_empty() { + None + } else { + Some(attester::Committee::new(attesters).context("attesters")?) 
+ }, }) } } @@ -112,6 +139,7 @@ pub(super) fn executor( .context("gossip_static_inbound")?, gossip_static_outbound, rpc, + // TODO: Add to configuration debug_page: None, }) } diff --git a/core/node/consensus/src/en.rs b/core/node/consensus/src/en.rs index 66326756fb77..077b4d64c524 100644 --- a/core/node/consensus/src/en.rs +++ b/core/node/consensus/src/en.rs @@ -39,18 +39,23 @@ impl EN { // Initialize genesis. let genesis = self.fetch_genesis(ctx).await.wrap("fetch_genesis()")?; let mut conn = self.pool.connection(ctx).await.wrap("connection()")?; + conn.try_update_genesis(ctx, &genesis) .await .wrap("set_genesis()")?; + let mut payload_queue = conn .new_payload_queue(ctx, actions, self.sync_state.clone()) .await .wrap("new_payload_queue()")?; + drop(conn); // Fetch blocks before the genesis. self.fetch_blocks(ctx, &mut payload_queue, Some(genesis.first_block)) - .await?; + .await + .wrap("fetch_blocks()")?; + // Monitor the genesis of the main node. // If it changes, it means that a hard fork occurred and we need to reset the consensus state. s.spawn_bg::<()>(async { @@ -69,15 +74,17 @@ impl EN { }); // Run consensus component. + // External nodes have a payload queue which they use to fetch data from the main node. let (store, runner) = Store::new(ctx, self.pool.clone(), Some(payload_queue)) .await .wrap("Store::new()")?; s.spawn_bg(async { Ok(runner.run(ctx).await?) }); + let (block_store, runner) = BlockStore::new(ctx, Box::new(store.clone())) .await .wrap("BlockStore::new()")?; s.spawn_bg(async { Ok(runner.run(ctx).await?) }); - // Dummy batch store - we don't gossip batches yet, but we need one anyway. + let (batch_store, runner) = BatchStore::new(ctx, Box::new(store.clone())) .await .wrap("BatchStore::new()")?; @@ -87,7 +94,6 @@ impl EN { config: config::executor(&cfg, &secrets)?, block_store, batch_store, - attester: None, validator: config::validator_key(&secrets) .context("validator_key")? .map(|key| executor::Validator { @@ -95,8 +101,12 @@ impl EN { replica_store: Box::new(store.clone()), payload_manager: Box::new(store.clone()), }), + attester: config::attester_key(&secrets) + .context("attester_key")? + .map(|key| executor::Attester { key }), }; executor.run(ctx).await?; + Ok(()) }) .await; diff --git a/core/node/consensus/src/mn.rs b/core/node/consensus/src/mn.rs index 0aac43b8ef87..3e8f0f4778bb 100644 --- a/core/node/consensus/src/mn.rs +++ b/core/node/consensus/src/mn.rs @@ -1,7 +1,7 @@ use anyhow::Context as _; use zksync_concurrency::{ctx, error::Wrap as _, scope}; use zksync_config::configs::consensus::{ConsensusConfig, ConsensusSecrets}; -use zksync_consensus_executor::{self as executor}; +use zksync_consensus_executor::{self as executor, Attester}; use zksync_consensus_roles::validator; use zksync_consensus_storage::{BatchStore, BlockStore}; @@ -23,6 +23,8 @@ pub async fn run_main_node( .context("validator_key")? .context("missing validator_key")?; + let attester_key_opt = config::attester_key(&secrets).context("attester_key")?; + scope::run!(&ctx, |ctx, s| async { if let Some(spec) = &cfg.genesis_spec { let spec = config::GenesisSpec::parse(spec).context("GenesisSpec::parse()")?; @@ -35,6 +37,7 @@ pub async fn run_main_node( .wrap("adjust_genesis()")?; } + // The main node doesn't have a payload queue as it produces all the L2 blocks itself. 
let (store, runner) = Store::new(ctx, pool, None).await.wrap("Store::new()")?; s.spawn_bg(runner.run(ctx)); @@ -49,22 +52,21 @@ pub async fn run_main_node( "unsupported leader selection mode - main node has to be the leader" ); - // Dummy batch store - we don't gossip batches yet, but we need one anyway. let (batch_store, runner) = BatchStore::new(ctx, Box::new(store.clone())) .await .wrap("BatchStore::new()")?; - s.spawn_bg(async { runner.run(ctx).await.context("BatchStore::runner()") }); + s.spawn_bg(runner.run(ctx)); let executor = executor::Executor { config: config::executor(&cfg, &secrets)?, block_store, batch_store, - attester: None, validator: Some(executor::Validator { key: validator_key, replica_store: Box::new(store.clone()), payload_manager: Box::new(store.clone()), }), + attester: attester_key_opt.map(|key| Attester { key }), }; executor.run(ctx).await }) diff --git a/core/node/consensus/src/storage/connection.rs b/core/node/consensus/src/storage/connection.rs index 673cb87d2f4e..1d8dfc3aed57 100644 --- a/core/node/consensus/src/storage/connection.rs +++ b/core/node/consensus/src/storage/connection.rs @@ -1,7 +1,7 @@ use anyhow::Context as _; use zksync_concurrency::{ctx, error::Wrap as _, time}; -use zksync_consensus_roles::validator; -use zksync_consensus_storage as storage; +use zksync_consensus_roles::{attester, validator}; +use zksync_consensus_storage::{self as storage, BatchStoreState}; use zksync_dal::{consensus_dal::Payload, Core, CoreDal, DalError}; use zksync_node_sync::{fetcher::IoCursorExt as _, ActionQueueSender, SyncState}; use zksync_state_keeper::io::common::IoCursor; @@ -92,25 +92,36 @@ impl<'a> Connection<'a> { .map_err(DalError::generalize)?) } - /// Wrapper for `consensus_dal().certificate()`. - pub async fn certificate( + /// Wrapper for `consensus_dal().block_certificate()`. + pub async fn block_certificate( &mut self, ctx: &ctx::Ctx, number: validator::BlockNumber, ) -> ctx::Result> { Ok(ctx - .wait(self.0.consensus_dal().certificate(number)) + .wait(self.0.consensus_dal().block_certificate(number)) .await??) } - /// Wrapper for `consensus_dal().insert_certificate()`. - pub async fn insert_certificate( + /// Wrapper for `consensus_dal().insert_block_certificate()`. + pub async fn insert_block_certificate( &mut self, ctx: &ctx::Ctx, cert: &validator::CommitQC, ) -> Result<(), InsertCertificateError> { Ok(ctx - .wait(self.0.consensus_dal().insert_certificate(cert)) + .wait(self.0.consensus_dal().insert_block_certificate(cert)) + .await??) + } + + /// Wrapper for `consensus_dal().insert_batch_certificate()`. + pub async fn insert_batch_certificate( + &mut self, + ctx: &ctx::Ctx, + cert: &attester::BatchQC, + ) -> Result<(), InsertCertificateError> { + Ok(ctx + .wait(self.0.consensus_dal().insert_batch_certificate(cert)) .await??) } @@ -134,7 +145,7 @@ impl<'a> Connection<'a> { .context("sqlx")?) } - /// Wrapper for `consensus_dal().get_l1_batch_metadata()`. + /// Wrapper for `blocks_dal().get_l1_batch_metadata()`. pub async fn batch( &mut self, ctx: &ctx::Ctx, @@ -184,13 +195,13 @@ impl<'a> Connection<'a> { Ok(ctx.wait(self.0.consensus_dal().next_block()).await??) } - /// Wrapper for `consensus_dal().certificates_range()`. - pub(crate) async fn certificates_range( + /// Wrapper for `consensus_dal().block_certificates_range()`. + pub(crate) async fn block_certificates_range( &mut self, ctx: &ctx::Ctx, ) -> ctx::Result { Ok(ctx - .wait(self.0.consensus_dal().certificates_range()) + .wait(self.0.consensus_dal().block_certificates_range()) .await??) 
} @@ -239,17 +250,163 @@ impl<'a> Connection<'a> { ctx: &ctx::Ctx, number: validator::BlockNumber, ) -> ctx::Result> { - let Some(justification) = self.certificate(ctx, number).await.wrap("certificate()")? else { + let Some(justification) = self + .block_certificate(ctx, number) + .await + .wrap("block_certificate()")? + else { return Ok(None); }; + let payload = self .payload(ctx, number) .await .wrap("payload()")? .context("L2 block disappeared from storage")?; + Ok(Some(validator::FinalBlock { payload: payload.encode(), justification, })) } + + /// Wrapper for `blocks_dal().get_sealed_l1_batch_number()`. + pub async fn get_last_batch_number( + &mut self, + ctx: &ctx::Ctx, + ) -> ctx::Result> { + Ok(ctx + .wait(self.0.blocks_dal().get_sealed_l1_batch_number()) + .await? + .context("get_sealed_l1_batch_number()")? + .map(|nr| attester::BatchNumber(nr.0 as u64))) + } + + /// Wrapper for `consensus_dal().get_last_batch_certificate_number()`. + pub async fn get_last_batch_certificate_number( + &mut self, + ctx: &ctx::Ctx, + ) -> ctx::Result> { + Ok(ctx + .wait(self.0.consensus_dal().get_last_batch_certificate_number()) + .await? + .context("get_last_batch_certificate_number()")?) + } + + /// Wrapper for `consensus_dal().batch_certificate()`. + pub async fn batch_certificate( + &mut self, + ctx: &ctx::Ctx, + number: attester::BatchNumber, + ) -> ctx::Result> { + Ok(ctx + .wait(self.0.consensus_dal().batch_certificate(number)) + .await? + .context("batch_certificate()")?) + } + + /// Wrapper for `blocks_dal().get_l2_block_range_of_l1_batch()`. + pub async fn get_l2_block_range_of_l1_batch( + &mut self, + ctx: &ctx::Ctx, + number: attester::BatchNumber, + ) -> ctx::Result> { + let number = L1BatchNumber(number.0.try_into().context("number")?); + + let range = ctx + .wait(self.0.blocks_dal().get_l2_block_range_of_l1_batch(number)) + .await? + .context("get_l2_block_range_of_l1_batch()")?; + + Ok(range.map(|(min, max)| { + let min = validator::BlockNumber(min.0 as u64); + let max = validator::BlockNumber(max.0 as u64); + (min, max) + })) + } + + /// Construct the [attester::SyncBatch] for a given batch number. + pub async fn get_batch( + &mut self, + ctx: &ctx::Ctx, + number: attester::BatchNumber, + ) -> ctx::Result> { + let Some((min, max)) = self + .get_l2_block_range_of_l1_batch(ctx, number) + .await + .context("get_l2_block_range_of_l1_batch()")? + else { + return Ok(None); + }; + + let payloads = self.payloads(ctx, min..max).await.wrap("payloads()")?; + let payloads = payloads.into_iter().map(|p| p.encode()).collect(); + + // TODO: Fill out the proof when we have the stateless L1 batch validation story finished. + // It is supposed to be a Merkle proof that the rolling hash of the batch has been included + // in the L1 state tree. The state root hash of L1 won't be available in the DB, it requires + // an API client. + let batch = attester::SyncBatch { + number, + payloads, + proof: Vec::new(), + }; + + Ok(Some(batch)) + } + + /// Construct the [storage::BatchStoreState] which contains the earliest batch and the last available [attester::SyncBatch]. + pub async fn batches_range(&mut self, ctx: &ctx::Ctx) -> ctx::Result { + let first = self + .0 + .blocks_dal() + .get_earliest_l1_batch_number() + .await + .context("get_earliest_l1_batch_number()")?; + + let first = if first.is_some() { + first + } else { + self.0 + .snapshot_recovery_dal() + .get_applied_snapshot_status() + .await + .context("get_earliest_l1_batch_number()")? 
+ .map(|s| s.l1_batch_number) + }; + + // TODO: In the future when we start filling in the `SyncBatch::proof` field, + // we can only run `get_batch` expecting `Some` result on numbers where the + // L1 state root hash is already available, so that we can produce some + // Merkle proof that the rolling hash of the L2 blocks in the batch has + // been included in the L1 state tree. At that point we probably can't + // call `get_last_batch_number` here, but something that indicates that + // the hashes/commitments on the L1 batch are ready and the thing has + // been included in L1; that potentially requires an API client as well. + let last = self + .get_last_batch_number(ctx) + .await + .context("get_last_batch_number()")?; + + let last = if let Some(last) = last { + // For now it would be unexpected if we couldn't retrieve the payloads + // for the `last` batch number, as an L1 batch is only created if we + // have all the L2 miniblocks for it. + Some( + self.get_batch(ctx, last) + .await + .context("get_batch()")? + .context("last batch not available")?, + ) + } else { + None + }; + + Ok(BatchStoreState { + first: first + .map(|n| attester::BatchNumber(n.0 as u64)) + .unwrap_or(attester::BatchNumber(0)), + last, + }) + } } diff --git a/core/node/consensus/src/storage/store.rs b/core/node/consensus/src/storage/store.rs index 745ccce4bef3..c196989c300b 100644 --- a/core/node/consensus/src/storage/store.rs +++ b/core/node/consensus/src/storage/store.rs @@ -4,12 +4,12 @@ use anyhow::Context as _; use zksync_concurrency::{ctx, error::Wrap as _, scope, sync, time}; use zksync_consensus_bft::PayloadManager; use zksync_consensus_roles::{attester, validator}; -use zksync_consensus_storage as storage; +use zksync_consensus_storage::{self as storage, BatchStoreState}; use zksync_dal::consensus_dal::{self, Payload}; use zksync_node_sync::fetcher::{FetchedBlock, FetchedTransaction}; use zksync_types::L2BlockNumber; -use super::PayloadQueue; +use super::{Connection, PayloadQueue}; use crate::storage::{ConnectionPool, InsertCertificateError}; fn to_fetched_block( @@ -51,20 +51,27 @@ fn to_fetched_block( #[derive(Clone, Debug)] pub(crate) struct Store { pub(super) pool: ConnectionPool, - payloads: Arc>>, - /// L2 block QCs received over gossip - certificates: ctx::channel::UnboundedSender, + /// Action queue to fetch/store L2 block payloads + block_payloads: Arc>>, + /// L2 block QCs received from consensus + block_certificates: ctx::channel::UnboundedSender, + /// L1 batch QCs received from consensus + batch_certificates: ctx::channel::UnboundedSender, /// Range of L2 blocks for which we have a QC persisted. - persisted: sync::watch::Receiver, + blocks_persisted: sync::watch::Receiver, + /// Range of L1 batches we have persisted. + batches_persisted: sync::watch::Receiver, } -struct PersistedState(sync::watch::Sender); +struct PersistedBlockState(sync::watch::Sender); /// Background task of the `Store`. pub struct StoreRunner { pool: ConnectionPool, - persisted: PersistedState, - certificates: ctx::channel::UnboundedReceiver, + blocks_persisted: PersistedBlockState, + batches_persisted: sync::watch::Sender, + block_certificates: ctx::channel::UnboundedReceiver, + batch_certificates: ctx::channel::UnboundedReceiver, } impl Store { @@ -73,32 +80,50 @@ impl Store { pool: ConnectionPool, payload_queue: Option, ) -> ctx::Result<(Store, StoreRunner)> { - let persisted = pool - .connection(ctx) - .await - .wrap("connection()")? 
- .certificates_range(ctx) + let mut conn = pool.connection(ctx).await.wrap("connection()")?; + + // Initial state of persisted blocks + let blocks_persisted = conn + .block_certificates_range(ctx) .await - .wrap("certificates_range()")?; - let persisted = sync::watch::channel(persisted).0; - let (certs_send, certs_recv) = ctx::channel::unbounded(); + .wrap("block_certificates_range()")?; + + // Initial state of persisted batches + let batches_persisted = conn.batches_range(ctx).await.wrap("batches_range()")?; + + drop(conn); + + let blocks_persisted = sync::watch::channel(blocks_persisted).0; + let batches_persisted = sync::watch::channel(batches_persisted).0; + let (block_certs_send, block_certs_recv) = ctx::channel::unbounded(); + let (batch_certs_send, batch_certs_recv) = ctx::channel::unbounded(); + Ok(( Store { pool: pool.clone(), - certificates: certs_send, - payloads: Arc::new(sync::Mutex::new(payload_queue)), - persisted: persisted.subscribe(), + block_certificates: block_certs_send, + batch_certificates: batch_certs_send, + block_payloads: Arc::new(sync::Mutex::new(payload_queue)), + blocks_persisted: blocks_persisted.subscribe(), + batches_persisted: batches_persisted.subscribe(), }, StoreRunner { pool, - persisted: PersistedState(persisted), - certificates: certs_recv, + blocks_persisted: PersistedBlockState(blocks_persisted), + batches_persisted, + block_certificates: block_certs_recv, + batch_certificates: batch_certs_recv, }, )) } + + /// Get a fresh connection from the pool. + async fn conn(&self, ctx: &ctx::Ctx) -> ctx::Result { + self.pool.connection(ctx).await.wrap("connection") + } } -impl PersistedState { +impl PersistedBlockState { /// Updates `persisted` to new. /// Ends of the range can only be moved forward. /// If `persisted.first` is moved forward, it means that blocks have been pruned. @@ -136,47 +161,120 @@ impl PersistedState { } impl StoreRunner { - pub async fn run(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> { + pub async fn run(self, ctx: &ctx::Ctx) -> anyhow::Result<()> { + let StoreRunner { + pool, + blocks_persisted, + batches_persisted, + mut block_certificates, + mut batch_certificates, + } = self; + let res = scope::run!(ctx, |ctx, s| async { s.spawn::<()>(async { - // Loop updating `persisted` whenever blocks get pruned. + // Loop updating `blocks_persisted` whenever blocks get pruned. const POLL_INTERVAL: time::Duration = time::Duration::seconds(1); loop { - let range = self - .pool + let range = pool .connection(ctx) + .await? + .block_certificates_range(ctx) .await - .wrap("connection")? - .certificates_range(ctx) + .wrap("block_certificates_range()")?; + blocks_persisted.update(range); + ctx.sleep(POLL_INTERVAL).await?; + } + }); + + // NOTE: Running this update loop will trigger the gossip of `SyncBatches` which is currently + // pointless as there is no proof and we have to ignore them. We can disable it, but bear in + // mind that any node which gossips the availability will cause pushes and pulls in the consensus. + s.spawn::<()>(async { + // Loop updating `batches_persisted` whenever a new L1 batch is available in the database. + // We have to do this because the L1 batch is produced as L2 blocks are executed, + // which can happen on a different machine or in a different process, so we can't rely on some + // DAL method updating this memory construct. However I'm not sure that `BatchStoreState` + // really has to contain the full blown last batch, or whether it could have for example + // just the number of it. 
We can't just use the `attester::BatchQC`, which would make it + // analogous to the `BlockStoreState`, because the `SyncBatch` mechanism is for catching + // up with L1 batches from peers _without_ the QC, based on L1 inclusion proofs instead. + // Nevertheless since the `SyncBatch` contains all transactions for all L2 blocks, + // we can try to make it less frequent by querying just the last batch number first. + const POLL_INTERVAL: time::Duration = time::Duration::seconds(1); + let mut next_batch_number = { batches_persisted.borrow().next() }; + loop { + let mut conn = pool.connection(ctx).await?; + if let Some(last_batch_number) = conn + .get_last_batch_number(ctx) .await - .wrap("certificates_range()")?; - self.persisted.update(range); + .wrap("last_batch_number()")? + { + if last_batch_number >= next_batch_number { + let range = conn.batches_range(ctx).await.wrap("batches_range()")?; + next_batch_number = last_batch_number.next(); + batches_persisted.send_replace(range); + } + } ctx.sleep(POLL_INTERVAL).await?; } }); - // Loop inserting certs to storage. + s.spawn::<()>(async { + // Loop inserting batch certificates into storage + const POLL_INTERVAL: time::Duration = time::Duration::milliseconds(50); + loop { + let cert = batch_certificates.recv(ctx).await?; + + loop { + use consensus_dal::InsertCertificateError as E; + // Try to insert the cert. + let res = pool + .connection(ctx) + .await? + .insert_batch_certificate(ctx, &cert) + .await; + + match res { + Ok(()) => { + break; + } + Err(InsertCertificateError::Inner(E::MissingPayload)) => { + // The L1 batch isn't available yet. + // We can wait until it's produced/received, or we could modify gossip + // so that we don't even accept votes until we have the corresponding batch. + ctx.sleep(POLL_INTERVAL).await?; + } + Err(InsertCertificateError::Inner(err)) => { + return Err(ctx::Error::Internal(anyhow::Error::from(err))) + } + Err(InsertCertificateError::Canceled(err)) => { + return Err(ctx::Error::Canceled(err)) + } + } + } + } + }); + + // Loop inserting block certs to storage. const POLL_INTERVAL: time::Duration = time::Duration::milliseconds(50); loop { - let cert = self.certificates.recv(ctx).await?; + let cert = block_certificates.recv(ctx).await?; // Wait for the block to be persisted, so that we can attach a cert to it. // We may exit this loop without persisting the certificate in case the // corresponding block has been pruned in the meantime. - while self.persisted.should_be_persisted(&cert) { + while blocks_persisted.should_be_persisted(&cert) { use consensus_dal::InsertCertificateError as E; // Try to insert the cert. - let res = self - .pool + let res = pool .connection(ctx) - .await - .wrap("connection")? - .insert_certificate(ctx, &cert) + .await? + .insert_block_certificate(ctx, &cert) .await; match res { Ok(()) => { // Insertion succeeded: update persisted state // and wait for the next cert. - self.persisted.advance(cert); + blocks_persisted.advance(cert); break; } Err(InsertCertificateError::Inner(E::MissingPayload)) => { @@ -195,6 +293,7 @@ impl StoreRunner { } }) .await; + match res { Err(ctx::Error::Canceled(_)) | Ok(()) => Ok(()), Err(ctx::Error::Internal(err)) => Err(err), @@ -206,17 +305,15 @@ impl StoreRunner { impl storage::PersistentBlockStore for Store { async fn genesis(&self, ctx: &ctx::Ctx) -> ctx::Result { Ok(self - .pool - .connection(ctx) - .await - .wrap("connection")? + .conn(ctx) + .await? .genesis(ctx) .await? .context("not found")?) 
} fn persisted(&self) -> sync::watch::Receiver { - self.persisted.clone() + self.blocks_persisted.clone() } async fn block( @@ -225,10 +322,8 @@ impl storage::PersistentBlockStore for Store { number: validator::BlockNumber, ) -> ctx::Result { Ok(self - .pool - .connection(ctx) - .await - .wrap("connection")? + .conn(ctx) + .await? .block(ctx, number) .await? .context("not found")?) @@ -247,14 +342,14 @@ impl storage::PersistentBlockStore for Store { ctx: &ctx::Ctx, block: validator::FinalBlock, ) -> ctx::Result<()> { - let mut payloads = sync::lock(ctx, &self.payloads).await?.into_async(); + let mut payloads = sync::lock(ctx, &self.block_payloads).await?.into_async(); if let Some(payloads) = &mut *payloads { payloads .send(to_fetched_block(block.number(), &block.payload).context("to_fetched_block")?) .await .context("payload_queue.send()")?; } - self.certificates.send(block.justification); + self.block_certificates.send(block.justification); Ok(()) } } @@ -262,20 +357,16 @@ impl storage::PersistentBlockStore for Store { #[async_trait::async_trait] impl storage::ReplicaStore for Store { async fn state(&self, ctx: &ctx::Ctx) -> ctx::Result { - self.pool - .connection(ctx) - .await - .wrap("connection()")? + self.conn(ctx) + .await? .replica_state(ctx) .await .wrap("replica_state()") } async fn set_state(&self, ctx: &ctx::Ctx, state: &storage::ReplicaState) -> ctx::Result<()> { - self.pool - .connection(ctx) - .await - .wrap("connection()")? + self.conn(ctx) + .await? .set_replica_state(ctx, state) .await .wrap("set_replica_state()") @@ -321,7 +412,7 @@ impl PayloadManager for Store { block_number: validator::BlockNumber, payload: &validator::Payload, ) -> ctx::Result<()> { - let mut payloads = sync::lock(ctx, &self.payloads).await?.into_async(); + let mut payloads = sync::lock(ctx, &self.block_payloads).await?.into_async(); if let Some(payloads) = &mut *payloads { let block = to_fetched_block(block_number, payload).context("to_fetched_block")?; let n = block.number; @@ -346,44 +437,106 @@ impl PayloadManager for Store { } } -// Dummy implementation #[async_trait::async_trait] impl storage::PersistentBatchStore for Store { - async fn last_batch(&self, _ctx: &ctx::Ctx) -> ctx::Result> { - unimplemented!() + /// Range of batches persisted in storage. + fn persisted(&self) -> sync::watch::Receiver { + // Normally we'd return this, but it causes the following test to run forever: + // RUST_LOG=info zk test rust test_full_nodes --no-capture + // + // The error seems to be related to the size of messages, although I'm not sure + // why it retries it forever. Since the gossip of SyncBatch is not fully functional + // yet, for now let's just return a fake response that never changes, which should + // disable gossiping on honest nodes. + let _ = self.batches_persisted.clone(); + + sync::watch::channel(storage::BatchStoreState { + first: attester::BatchNumber(0), + last: None, + }) + .1 + } + + /// Get the highest L1 batch number from storage. + async fn last_batch(&self, ctx: &ctx::Ctx) -> ctx::Result> { + self.conn(ctx) + .await? + .get_last_batch_number(ctx) + .await + .wrap("get_last_batch_number") } - async fn last_batch_qc(&self, _ctx: &ctx::Ctx) -> ctx::Result> { - unimplemented!() + + /// Get the L1 batch QC from storage with the highest number. + /// + /// This might have gaps before it. Until there is a way to catch up with missing + /// certificates by fetching from the main node, returning the last inserted one + /// is the best we can do. 
+ async fn last_batch_qc(&self, ctx: &ctx::Ctx) -> ctx::Result> { + let Some(number) = self + .conn(ctx) + .await? + .get_last_batch_certificate_number(ctx) + .await + .wrap("get_last_batch_certificate_number")? + else { + return Ok(None); + }; + + self.get_batch_qc(ctx, number).await } + + /// Returns the batch with the given number. async fn get_batch( &self, - _ctx: &ctx::Ctx, - _number: attester::BatchNumber, + ctx: &ctx::Ctx, + number: attester::BatchNumber, ) -> ctx::Result> { - Ok(None) + self.conn(ctx) + .await? + .get_batch(ctx, number) + .await + .wrap("get_batch") } + + /// Returns the QC of the batch with the given number. async fn get_batch_qc( &self, - _ctx: &ctx::Ctx, - _number: attester::BatchNumber, + ctx: &ctx::Ctx, + number: attester::BatchNumber, ) -> ctx::Result> { - Ok(None) - } - async fn store_qc(&self, _ctx: &ctx::Ctx, _qc: attester::BatchQC) -> ctx::Result<()> { - unimplemented!() + self.conn(ctx) + .await? + .batch_certificate(ctx, number) + .await + .wrap("batch_certificate") } - fn persisted(&self) -> sync::watch::Receiver { - sync::watch::channel(storage::BatchStoreState { - first: attester::BatchNumber(0), - last: None, - }) - .1 + + /// Store the given QC in the storage. + /// + /// Storing a QC is allowed even if it creates a gap in the L1 batch history. + /// If we need the last batch QC that still needs to be signed then the queries need to look for gaps. + async fn store_qc(&self, _ctx: &ctx::Ctx, qc: attester::BatchQC) -> ctx::Result<()> { + // Storing asynchronously because we might get the QC before the L1 batch itself. + self.batch_certificates.send(qc); + Ok(()) } + + /// Queue the batch to be persisted in storage. + /// + /// The caller [BatchStore] ensures that this is only called when the batch is the next expected one. async fn queue_next_batch( &self, _ctx: &ctx::Ctx, _batch: attester::SyncBatch, ) -> ctx::Result<()> { - Err(anyhow::format_err!("unimplemented").into()) + // Currently the gossiping of `SyncBatch` and the `BatchStoreState` is unconditionally started by the `Network::run_stream` in consensus, + // and as long as any node reports new batches available by updating the `PersistentBatchStore::persisted` here, the other nodes + // will start pulling the corresponding batches, which will end up being passed to this method. + // If we return an error here or panic, it will stop the whole consensus task tree due to the way scopes work, so instead just return immediately. + // In the future we have to validate the proof agains the L1 state root hash, which IIUC we can't do just yet. + + // Err(anyhow::format_err!("unimplemented: queue_next_batch should not be called until we have the stateless L1 batch story completed.").into()) + + Ok(()) } } diff --git a/core/node/consensus/src/storage/testonly.rs b/core/node/consensus/src/storage/testonly.rs index 2f632b84a4d5..c73d20982c16 100644 --- a/core/node/consensus/src/storage/testonly.rs +++ b/core/node/consensus/src/storage/testonly.rs @@ -48,7 +48,7 @@ impl ConnectionPool { } /// Waits for the `number` L2 block to have a certificate. - pub async fn wait_for_certificate( + pub async fn wait_for_block_certificate( &self, ctx: &ctx::Ctx, number: validator::BlockNumber, @@ -58,9 +58,9 @@ impl ConnectionPool { .connection(ctx) .await .wrap("connection()")? - .certificate(ctx, number) + .block_certificate(ctx, number) .await - .wrap("certificate()")? + .wrap("block_certificate()")? 
             .is_none()
         {
             ctx.sleep(POLL_INTERVAL).await?;
@@ -119,15 +119,15 @@ impl ConnectionPool {
     }
 
     /// Waits for `want_last` block to have certificate then fetches all L2 blocks with certificates.
-    pub async fn wait_for_certificates(
+    pub async fn wait_for_block_certificates(
         &self,
         ctx: &ctx::Ctx,
         want_last: validator::BlockNumber,
     ) -> ctx::Result<Vec<validator::FinalBlock>> {
-        self.wait_for_certificate(ctx, want_last).await?;
+        self.wait_for_block_certificate(ctx, want_last).await?;
         let mut conn = self.connection(ctx).await.wrap("connection()")?;
         let range = conn
-            .certificates_range(ctx)
+            .block_certificates_range(ctx)
             .await
             .wrap("certificates_range()")?;
         assert_eq!(want_last.next(), range.next());
@@ -141,12 +141,12 @@ impl ConnectionPool {
     }
 
     /// Same as `wait_for_certificates`, but additionally verifies all the blocks against genesis.
-    pub async fn wait_for_certificates_and_verify(
+    pub async fn wait_for_block_certificates_and_verify(
         &self,
         ctx: &ctx::Ctx,
         want_last: validator::BlockNumber,
     ) -> ctx::Result<Vec<validator::FinalBlock>> {
-        let blocks = self.wait_for_certificates(ctx, want_last).await?;
+        let blocks = self.wait_for_block_certificates(ctx, want_last).await?;
         let genesis = self
             .connection(ctx)
             .await
diff --git a/core/node/consensus/src/testonly.rs b/core/node/consensus/src/testonly.rs
index 81084b8f599a..7ca518a183a7 100644
--- a/core/node/consensus/src/testonly.rs
+++ b/core/node/consensus/src/testonly.rs
@@ -99,6 +99,12 @@ pub(super) fn config(cfg: &network::Config) -> (config::ConsensusConfig, config:
                 key: config::ValidatorPublicKey(key.public().encode()),
                 weight: 1,
             }],
+            // We only have access to the main node attester key in the `cfg`, which is fine
+            // for validators because at the moment there is only one leader. It doesn't
+            // allow us to form a full attester committee. However, in the current tests
+            // the `new_configs` used to produce the array of `network::Config` doesn't
+            // assign an attester key, so it doesn't matter.
+            attesters: Vec::new(),
             leader: config::ValidatorPublicKey(key.public().encode()),
         }),
         rpc: None,
@@ -109,6 +115,10 @@ pub(super) fn config(cfg: &network::Config) -> (config::ConsensusConfig, config:
                 .validator_key
                 .as_ref()
                 .map(|k| config::ValidatorSecretKey(k.encode().into())),
+            attester_key: cfg
+                .attester_key
+                .as_ref()
+                .map(|k| config::AttesterSecretKey(k.encode().into())),
         },
     )
 }
diff --git a/core/node/consensus/src/tests.rs b/core/node/consensus/src/tests.rs
index 3f57e4beeade..5506ec6ee8f4 100644
--- a/core/node/consensus/src/tests.rs
+++ b/core/node/consensus/src/tests.rs
@@ -1,5 +1,4 @@
 use anyhow::Context as _;
-use storage::Store;
 use test_casing::{test_casing, Product};
 use tracing::Instrument as _;
 use zksync_concurrency::{ctx, error::Wrap, scope};
@@ -13,8 +12,11 @@ use zksync_consensus_roles::{
 use zksync_consensus_storage::BlockStore;
 use zksync_types::{L1BatchNumber, ProtocolVersionId};
 
-use super::*;
-use crate::{mn::run_main_node, storage::ConnectionPool};
+use crate::{
+    mn::run_main_node,
+    storage::{ConnectionPool, Store},
+    testonly,
+};
 
 const VERSIONS: [ProtocolVersionId; 2] = [ProtocolVersionId::latest(), ProtocolVersionId::next()];
 const FROM_SNAPSHOT: [bool; 2] = [true, false];
@@ -71,7 +73,7 @@ async fn test_validator_block_store(version: ProtocolVersionId) {
             .await
             .unwrap();
         let got = pool
-            .wait_for_certificates(ctx, block.number())
+            .wait_for_block_certificates(ctx, block.number())
             .await
             .unwrap();
         assert_eq!(want[..=i], got);
@@ -82,6 +84,68 @@ async fn test_validator_block_store(version: ProtocolVersionId) {
     }
 }
 
+#[test_casing(4, Product((FROM_SNAPSHOT,VERSIONS)))]
+#[tokio::test]
+async fn test_connection_get_batch(from_snapshot: bool, version: ProtocolVersionId) {
+    zksync_concurrency::testonly::abort_on_panic();
+    let ctx = &ctx::test_root(&ctx::RealClock);
+    let rng = &mut ctx.rng();
+    let pool = ConnectionPool::test(from_snapshot, version).await;
+
+    // Fill storage with unsigned L2 blocks and L1 batches in a way that the
+    // last L1 batch is guaranteed to have some L2 blocks executed in it.
+    scope::run!(ctx, |ctx, s| async {
+        // Start state keeper.
+        let (mut sk, runner) = testonly::StateKeeper::new(ctx, pool.clone()).await?;
+        s.spawn_bg(runner.run(ctx));
+
+        for _ in 0..3 {
+            for _ in 0..2 {
+                sk.push_random_block(rng).await;
+            }
+            sk.seal_batch().await;
+        }
+        sk.push_random_block(rng).await;
+
+        pool.wait_for_payload(ctx, sk.last_block()).await?;
+
+        Ok(())
+    })
+    .await
+    .unwrap();
+
+    // Now we can try to retrieve the batch.
+    scope::run!(ctx, |ctx, _s| async {
+        let mut conn = pool.connection(ctx).await?;
+        let batches = conn.batches_range(ctx).await?;
+        let last = batches.last.expect("last is set");
+        let (min, max) = conn
+            .get_l2_block_range_of_l1_batch(ctx, last.number)
+            .await?
+            .unwrap();
+
+        assert_eq!(
+            last.payloads.len(),
+            (max.0 - min.0) as usize,
+            "all block payloads present"
+        );
+
+        let first_payload = last.payloads.first().expect("last batch has payloads");
+
+        let want_payload = conn.payload(ctx, min).await?.expect("payload is in the DB");
+        let want_payload = want_payload.encode();
+
+        assert_eq!(
+            first_payload, &want_payload,
+            "first payload is the right number"
+        );
+
+        anyhow::Ok(())
+    })
+    .await
+    .unwrap();
+}
+
 // In the current implementation, consensus certificates are created asynchronously
 // for the L2 blocks constructed by the StateKeeper. This means that consensus actor
 // is effectively just back filling the consensus certificates for the L2 blocks in storage.
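Note (illustrative only, not part of this patch): the test helpers above only poll for *block* certificates. Once attester signing is wired up, an analogous helper for L1 batch QCs could reuse the same polling pattern as `wait_for_block_certificate`, together with the `batch_certificate` accessor used by `PersistentBatchStore::get_batch_qc` above. A minimal sketch, assuming it would live next to the other helpers in `core/node/consensus/src/storage/testonly.rs` and that `attester` is imported from `zksync_consensus_roles`:

```rust
// Hypothetical helper, mirroring `wait_for_block_certificate` above.
// Assumes the surrounding file's imports (`ctx`, `attester`, `Wrap`) and the
// `POLL_INTERVAL` constant, plus the `Connection::batch_certificate` accessor
// introduced by this PR.
impl ConnectionPool {
    /// Waits for the L1 batch QC of batch `number` to appear in Postgres.
    pub async fn wait_for_batch_certificate(
        &self,
        ctx: &ctx::Ctx,
        number: attester::BatchNumber,
    ) -> ctx::Result<()> {
        while self
            .connection(ctx)
            .await
            .wrap("connection()")?
            .batch_certificate(ctx, number)
            .await
            .wrap("batch_certificate()")?
            .is_none()
        {
            // Poll until the QC shows up; the consensus task stores it asynchronously.
            ctx.sleep(POLL_INTERVAL).await?;
        }
        Ok(())
    }
}
```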
@@ -119,24 +183,24 @@ async fn test_validator(from_snapshot: bool, version: ProtocolVersionId) {
         tracing::info!("Generate couple more blocks and wait for consensus to catch up.");
         sk.push_random_blocks(rng, 3).await;
         pool
-            .wait_for_certificate(ctx, sk.last_block())
+            .wait_for_block_certificate(ctx, sk.last_block())
             .await
-            .context("wait_for_certificate(<2nd phase>)")?;
+            .context("wait_for_block_certificate(<2nd phase>)")?;
 
         tracing::info!("Synchronously produce blocks one by one, and wait for consensus.");
         for _ in 0..2 {
             sk.push_random_blocks(rng, 1).await;
             pool
-                .wait_for_certificate(ctx, sk.last_block())
+                .wait_for_block_certificate(ctx, sk.last_block())
                 .await
-                .context("wait_for_certificate(<3rd phase>)")?;
+                .context("wait_for_block_certificate(<3rd phase>)")?;
         }
 
         tracing::info!("Verify all certificates");
         pool
-            .wait_for_certificates_and_verify(ctx, sk.last_block())
+            .wait_for_block_certificates_and_verify(ctx, sk.last_block())
             .await
-            .context("wait_for_certificates_and_verify()")?;
+            .context("wait_for_block_certificates_and_verify()")?;
         Ok(())
     })
     .await
@@ -171,7 +235,7 @@ async fn test_nodes_from_various_snapshots(version: ProtocolVersionId) {
         validator.push_random_blocks(rng, 5).await;
         validator.seal_batch().await;
         validator_pool
-            .wait_for_certificate(ctx, validator.last_block())
+            .wait_for_block_certificate(ctx, validator.last_block())
             .await?;
 
         tracing::info!("take snapshot and start a node from it");
@@ -189,7 +253,7 @@ async fn test_nodes_from_various_snapshots(version: ProtocolVersionId) {
         validator.push_random_blocks(rng, 5).await;
         validator.seal_batch().await;
         node_pool
-            .wait_for_certificate(ctx, validator.last_block())
+            .wait_for_block_certificate(ctx, validator.last_block())
             .await?;
 
         tracing::info!("take another snapshot and start a node from it");
@@ -206,15 +270,15 @@ async fn test_nodes_from_various_snapshots(version: ProtocolVersionId) {
         tracing::info!("produce more blocks and compare storages");
         validator.push_random_blocks(rng, 5).await;
         let want = validator_pool
-            .wait_for_certificates_and_verify(ctx, validator.last_block())
+            .wait_for_block_certificates_and_verify(ctx, validator.last_block())
             .await?;
         // node stores should be suffixes for validator store.
         for got in [
             node_pool
-                .wait_for_certificates_and_verify(ctx, validator.last_block())
+                .wait_for_block_certificates_and_verify(ctx, validator.last_block())
                 .await?,
             node_pool2
-                .wait_for_certificates_and_verify(ctx, validator.last_block())
+                .wait_for_block_certificates_and_verify(ctx, validator.last_block())
                 .await?,
         ] {
             assert_eq!(want[want.len() - got.len()..], got[..]);
@@ -296,12 +360,12 @@ async fn test_full_nodes(from_snapshot: bool, version: ProtocolVersionId) {
         validator.push_random_blocks(rng, 5).await;
         let want_last = validator.last_block();
         let want = validator_pool
-            .wait_for_certificates_and_verify(ctx, want_last)
+            .wait_for_block_certificates_and_verify(ctx, want_last)
            .await?;
         for pool in &node_pools {
             assert_eq!(
                 want,
-                pool.wait_for_certificates_and_verify(ctx, want_last)
+                pool.wait_for_block_certificates_and_verify(ctx, want_last)
                     .await?
             );
         }
@@ -382,12 +446,12 @@ async fn test_en_validators(from_snapshot: bool, version: ProtocolVersionId) {
         main_node.push_random_blocks(rng, 5).await;
         let want_last = main_node.last_block();
         let want = main_node_pool
-            .wait_for_certificates_and_verify(ctx, want_last)
+            .wait_for_block_certificates_and_verify(ctx, want_last)
             .await?;
         for pool in &ext_node_pools {
             assert_eq!(
                 want,
-                pool.wait_for_certificates_and_verify(ctx, want_last)
+                pool.wait_for_block_certificates_and_verify(ctx, want_last)
                     .await?
             );
         }
@@ -429,7 +493,7 @@ async fn test_p2p_fetcher_backfill_certs(from_snapshot: bool, version: ProtocolV
             s.spawn_bg(node.run_consensus(ctx, client.clone(), &node_cfg));
             validator.push_random_blocks(rng, 3).await;
             node_pool
-                .wait_for_certificate(ctx, validator.last_block())
+                .wait_for_block_certificate(ctx, validator.last_block())
                 .await?;
             Ok(())
         })
@@ -457,10 +521,10 @@ async fn test_p2p_fetcher_backfill_certs(from_snapshot: bool, version: ProtocolV
             s.spawn_bg(node.run_consensus(ctx, client.clone(), &node_cfg));
             validator.push_random_blocks(rng, 3).await;
             let want = validator_pool
-                .wait_for_certificates_and_verify(ctx, validator.last_block())
+                .wait_for_block_certificates_and_verify(ctx, validator.last_block())
                 .await?;
             let got = node_pool
-                .wait_for_certificates_and_verify(ctx, validator.last_block())
+                .wait_for_block_certificates_and_verify(ctx, validator.last_block())
                 .await?;
             assert_eq!(want, got);
             Ok(())
@@ -549,9 +613,9 @@ async fn test_with_pruning(version: ProtocolVersionId) {
             .context("prune_batches")?;
         validator.push_random_blocks(rng, 5).await;
         node_pool
-            .wait_for_certificates(ctx, validator.last_block())
+            .wait_for_block_certificates(ctx, validator.last_block())
             .await
-            .context("wait_for_certificates()")?;
+            .context("wait_for_block_certificates()")?;
         Ok(())
     })
     .await
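For context on how the new `PersistentBatchStore` methods fit together, here is an illustrative sketch (hypothetical, not part of this patch) of a caller checking whether the latest persisted L1 batch still lacks a QC. It only uses `last_batch` and `get_batch_qc` as implemented in `storage/store.rs` above; the helper name `latest_batch_needs_qc` is made up for this example.

```rust
use zksync_concurrency::{ctx, error::Wrap as _};
use zksync_consensus_storage::PersistentBatchStore as _;

use crate::storage::Store;

/// Sketch only: returns true if the most recent L1 batch in Postgres has no QC yet.
async fn latest_batch_needs_qc(ctx: &ctx::Ctx, store: &Store) -> ctx::Result<bool> {
    // Highest L1 batch number known to Postgres, if any.
    let Some(last) = store.last_batch(ctx).await.wrap("last_batch()")? else {
        // No batches yet, so there is nothing to certify.
        return Ok(false);
    };
    // The QC may be missing either because votes are still being collected,
    // or because `store_qc` left a gap that will only be backfilled later.
    let qc = store.get_batch_qc(ctx, last).await.wrap("get_batch_qc()")?;
    Ok(qc.is_none())
}
```

A check like this is the kind of query step 5 of the workflow will need, except that the real implementation has to look for the last batch with no gaps after it rather than simply the highest certificate number.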