From cd077557f1a08d545649210bb5758f6ed59d8653 Mon Sep 17 00:00:00 2001 From: Igor Aleksanov Date: Mon, 8 Jul 2024 16:09:54 +0400 Subject: [PATCH 1/3] chore: Misc preparations for publishing (#149) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ❔ - Changes version to `0.1.0-rc.1` since the codebase is under heavy changes and is not in production yet; will prevent us from premature version bumps until consensus is on mainnet. - Sets other required fields for publishing. - Marks tests as `publish=false`. ## Why ❔ Preparations for publishing on crates.io. --- node/Cargo.lock | 24 ++++++++++++------------ node/Cargo.toml | 28 +++++++++++++++------------- node/actors/bft/Cargo.toml | 2 ++ node/actors/executor/Cargo.toml | 2 ++ node/actors/network/Cargo.toml | 2 ++ node/libs/concurrency/Cargo.toml | 2 ++ node/libs/crypto/Cargo.toml | 2 ++ node/libs/protobuf/Cargo.toml | 2 ++ node/libs/protobuf_build/Cargo.toml | 2 ++ node/libs/roles/Cargo.toml | 2 ++ node/libs/storage/Cargo.toml | 2 ++ node/libs/utils/Cargo.toml | 2 ++ node/tests/Cargo.toml | 3 +++ node/tools/Cargo.toml | 2 ++ 14 files changed, 52 insertions(+), 25 deletions(-) diff --git a/node/Cargo.lock b/node/Cargo.lock index 3a6cfade2..98cf249b0 100644 --- a/node/Cargo.lock +++ b/node/Cargo.lock @@ -3234,7 +3234,7 @@ dependencies = [ [[package]] name = "tester" -version = "0.1.0" +version = "0.1.0-rc.1" dependencies = [ "anyhow", "clap", @@ -3987,7 +3987,7 @@ dependencies = [ [[package]] name = "zksync_concurrency" -version = "0.1.0" +version = "0.1.0-rc.1" dependencies = [ "anyhow", "assert_matches", @@ -4005,7 +4005,7 @@ dependencies = [ [[package]] name = "zksync_consensus_bft" -version = "0.1.0" +version = "0.1.0-rc.1" dependencies = [ "anyhow", "assert_matches", @@ -4029,7 +4029,7 @@ dependencies = [ [[package]] name = "zksync_consensus_crypto" -version = "0.1.0" +version = "0.1.0-rc.1" dependencies = [ "anyhow", "blst", @@ -4052,7 +4052,7 @@ dependencies = [ 
[[package]] name = "zksync_consensus_executor" -version = "0.1.0" +version = "0.1.0-rc.1" dependencies = [ "anyhow", "rand 0.8.5", @@ -4072,7 +4072,7 @@ dependencies = [ [[package]] name = "zksync_consensus_network" -version = "0.1.0" +version = "0.1.0-rc.1" dependencies = [ "anyhow", "assert_matches", @@ -4108,7 +4108,7 @@ dependencies = [ [[package]] name = "zksync_consensus_roles" -version = "0.1.0" +version = "0.1.0-rc.1" dependencies = [ "anyhow", "assert_matches", @@ -4129,7 +4129,7 @@ dependencies = [ [[package]] name = "zksync_consensus_storage" -version = "0.1.0" +version = "0.1.0-rc.1" dependencies = [ "anyhow", "assert_matches", @@ -4150,7 +4150,7 @@ dependencies = [ [[package]] name = "zksync_consensus_tools" -version = "0.1.0" +version = "0.1.0-rc.1" dependencies = [ "anyhow", "async-trait", @@ -4185,7 +4185,7 @@ dependencies = [ [[package]] name = "zksync_consensus_utils" -version = "0.1.0" +version = "0.1.0-rc.1" dependencies = [ "anyhow", "rand 0.8.5", @@ -4195,7 +4195,7 @@ dependencies = [ [[package]] name = "zksync_protobuf" -version = "0.1.0" +version = "0.1.0-rc.1" dependencies = [ "anyhow", "bit-vec", @@ -4217,7 +4217,7 @@ dependencies = [ [[package]] name = "zksync_protobuf_build" -version = "0.1.0" +version = "0.1.0-rc.1" dependencies = [ "anyhow", "heck", diff --git a/node/Cargo.toml b/node/Cargo.toml index 85e26017d..30f332af6 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -19,24 +19,26 @@ resolver = "2" edition = "2021" authors = ["The Matter Labs Team "] homepage = "https://matter-labs.io/" -license = "MIT" -version = "0.1.0" +repository = "https://github.com/matter-labs/era-consensus" +license = "MIT OR Apache-2.0" +keywords = ["blockchain", "zksync"] +version = "0.1.0-rc.1" [workspace.dependencies] # Crates from this repo. 
-zksync_consensus_bft = { path = "actors/bft" } -zksync_consensus_crypto = { path = "libs/crypto" } -zksync_consensus_executor = { path = "actors/executor" } -zksync_consensus_network = { path = "actors/network" } -zksync_consensus_roles = { path = "libs/roles" } -zksync_consensus_storage = { path = "libs/storage" } -zksync_consensus_tools = { path = "tools" } -zksync_consensus_utils = { path = "libs/utils" } +zksync_consensus_bft = { version = "=0.1.0-rc.1", path = "actors/bft" } +zksync_consensus_crypto = { version = "=0.1.0-rc.1", path = "libs/crypto" } +zksync_consensus_executor = { version = "=0.1.0-rc.1", path = "actors/executor" } +zksync_consensus_network = { version = "=0.1.0-rc.1", path = "actors/network" } +zksync_consensus_roles = { version = "=0.1.0-rc.1", path = "libs/roles" } +zksync_consensus_storage = { version = "=0.1.0-rc.1", path = "libs/storage" } +zksync_consensus_tools = { version = "=0.1.0-rc.1", path = "tools" } +zksync_consensus_utils = { version = "=0.1.0-rc.1", path = "libs/utils" } # Crates from this repo that might become independent in the future. -zksync_concurrency = { path = "libs/concurrency" } -zksync_protobuf = { path = "libs/protobuf" } -zksync_protobuf_build = { path = "libs/protobuf_build" } +zksync_concurrency = { version = "=0.1.0-rc.1", path = "libs/concurrency" } +zksync_protobuf = { version = "=0.1.0-rc.1", path = "libs/protobuf" } +zksync_protobuf_build = { version = "=0.1.0-rc.1", path = "libs/protobuf_build" } # Crates from Matter Labs. 
pairing = { package = "pairing_ce", version = "=0.28.6" } diff --git a/node/actors/bft/Cargo.toml b/node/actors/bft/Cargo.toml index 8cbe0a657..863c2a305 100644 --- a/node/actors/bft/Cargo.toml +++ b/node/actors/bft/Cargo.toml @@ -5,6 +5,8 @@ edition.workspace = true authors.workspace = true homepage.workspace = true license.workspace = true +repository.workspace = true +keywords.workspace = true [dependencies] zksync_concurrency.workspace = true diff --git a/node/actors/executor/Cargo.toml b/node/actors/executor/Cargo.toml index 03e0c9f2b..eee74d397 100644 --- a/node/actors/executor/Cargo.toml +++ b/node/actors/executor/Cargo.toml @@ -5,6 +5,8 @@ edition.workspace = true authors.workspace = true homepage.workspace = true license.workspace = true +repository.workspace = true +keywords.workspace = true [dependencies] zksync_concurrency.workspace = true diff --git a/node/actors/network/Cargo.toml b/node/actors/network/Cargo.toml index ab7f7dbb7..8d7fc4dad 100644 --- a/node/actors/network/Cargo.toml +++ b/node/actors/network/Cargo.toml @@ -5,6 +5,8 @@ edition.workspace = true authors.workspace = true homepage.workspace = true license.workspace = true +repository.workspace = true +keywords.workspace = true [dependencies] zksync_concurrency.workspace = true diff --git a/node/libs/concurrency/Cargo.toml b/node/libs/concurrency/Cargo.toml index 96a72f3dd..f5611cd94 100644 --- a/node/libs/concurrency/Cargo.toml +++ b/node/libs/concurrency/Cargo.toml @@ -5,6 +5,8 @@ edition.workspace = true authors.workspace = true homepage.workspace = true license.workspace = true +repository.workspace = true +keywords.workspace = true [dependencies] anyhow.workspace = true diff --git a/node/libs/crypto/Cargo.toml b/node/libs/crypto/Cargo.toml index 44f7fd838..d28374cda 100644 --- a/node/libs/crypto/Cargo.toml +++ b/node/libs/crypto/Cargo.toml @@ -5,6 +5,8 @@ edition.workspace = true authors.workspace = true homepage.workspace = true license.workspace = true +repository.workspace = true 
+keywords.workspace = true [dependencies] anyhow.workspace = true diff --git a/node/libs/protobuf/Cargo.toml b/node/libs/protobuf/Cargo.toml index 1be22904c..118418fde 100644 --- a/node/libs/protobuf/Cargo.toml +++ b/node/libs/protobuf/Cargo.toml @@ -5,6 +5,8 @@ edition.workspace = true authors.workspace = true homepage.workspace = true license.workspace = true +repository.workspace = true +keywords.workspace = true links = "zksync_protobuf_proto" [dependencies] diff --git a/node/libs/protobuf_build/Cargo.toml b/node/libs/protobuf_build/Cargo.toml index 58055e7ab..1d9be8fc2 100644 --- a/node/libs/protobuf_build/Cargo.toml +++ b/node/libs/protobuf_build/Cargo.toml @@ -5,6 +5,8 @@ edition.workspace = true authors.workspace = true homepage.workspace = true license.workspace = true +repository.workspace = true +keywords.workspace = true [dependencies] anyhow.workspace = true diff --git a/node/libs/roles/Cargo.toml b/node/libs/roles/Cargo.toml index d46666893..b4278ed15 100644 --- a/node/libs/roles/Cargo.toml +++ b/node/libs/roles/Cargo.toml @@ -5,6 +5,8 @@ edition.workspace = true authors.workspace = true homepage.workspace = true license.workspace = true +repository.workspace = true +keywords.workspace = true links = "zksync_consensus_roles_proto" [dependencies] diff --git a/node/libs/storage/Cargo.toml b/node/libs/storage/Cargo.toml index e44a5cd48..a85a42d7c 100644 --- a/node/libs/storage/Cargo.toml +++ b/node/libs/storage/Cargo.toml @@ -5,6 +5,8 @@ edition.workspace = true authors.workspace = true homepage.workspace = true license.workspace = true +repository.workspace = true +keywords.workspace = true [dependencies] zksync_concurrency.workspace = true diff --git a/node/libs/utils/Cargo.toml b/node/libs/utils/Cargo.toml index ffb3f5b1c..5f3f7dfcb 100644 --- a/node/libs/utils/Cargo.toml +++ b/node/libs/utils/Cargo.toml @@ -5,6 +5,8 @@ edition.workspace = true authors.workspace = true homepage.workspace = true license.workspace = true +repository.workspace = true 
+keywords.workspace = true [dependencies] zksync_concurrency.workspace = true diff --git a/node/tests/Cargo.toml b/node/tests/Cargo.toml index d46a09254..7f1e33cef 100644 --- a/node/tests/Cargo.toml +++ b/node/tests/Cargo.toml @@ -5,6 +5,9 @@ edition.workspace = true authors.workspace = true homepage.workspace = true license.workspace = true +repository.workspace = true +keywords.workspace = true +publish = false [dependencies] zksync_consensus_tools.workspace = true diff --git a/node/tools/Cargo.toml b/node/tools/Cargo.toml index 70ba698f7..f16968534 100644 --- a/node/tools/Cargo.toml +++ b/node/tools/Cargo.toml @@ -5,6 +5,8 @@ edition.workspace = true authors.workspace = true homepage.workspace = true license.workspace = true +repository.workspace = true +keywords.workspace = true default-run = "executor" [dependencies] From 92fe49cec6f982ed3f1d92cb17912a8e6b4fc629 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bruno=20Fran=C3=A7a?= Date: Mon, 8 Jul 2024 16:29:15 +0100 Subject: [PATCH 2/3] Refactored TimeoutQC to contain just one CommitQC (#144) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ❔ Refactored (in the informal spec) `TimeoutQC`, `SignedTimeoutVote` and `TimeoutVote` so that `TimeoutQC` contains only one `CommitQC` (the high QC). ## Why ❔ It reduces the state space which should help with model checking. 
--------- Co-authored-by: Igor Konnov --- spec/informal-spec/replica.rs | 8 ++--- spec/informal-spec/types.rs | 59 +++++++++++++++++------------------ 2 files changed, 32 insertions(+), 35 deletions(-) diff --git a/spec/informal-spec/replica.rs b/spec/informal-spec/replica.rs index 02d6ec146..901c5dc23 100644 --- a/spec/informal-spec/replica.rs +++ b/spec/informal-spec/replica.rs @@ -165,7 +165,7 @@ fn on_proposal(self, proposal: Proposal) { match proposal.justification { Commit(qc) => self.process_commit_qc(Some(qc)), Timeout(qc) => { - self.process_commit_qc(qc.high_qc()); + self.process_commit_qc(qc.high_commit_qc); self.high_timeout_qc = max(Some(qc), self.high_timeout_qc); } }; @@ -177,7 +177,7 @@ fn on_proposal(self, proposal: Proposal) { // Processed an (already verified) commit_qc received from the network // as part of some message. It bumps the local high_commit_qc and if // we have the proposal corresponding to this qc, we append it to the committed_blocks. -fn process_commit_qc(self, qc_opt: Option[CommitQC]) { +fn process_commit_qc(self, qc_opt: Option) { if let Some(qc) = qc_opt { self.high_commit_qc = max(Some(qc), self.high_commit_qc); let Some(block) = self.cached_proposals.get((qc.vote.block_number,qc.vote.block_hash)) else { return }; @@ -218,7 +218,7 @@ fn on_timeout(self, sig_vote: SignedTimeoutVote) { // Check if we now have a timeout QC for this view. 
if let Some(qc) = self.get_timeout_qc(sig_vote.view()) { - self.process_commit_qc(qc.high_qc()); + self.process_commit_qc(qc.high_commit_qc); self.high_timeout_qc = max(Some(qc), self.high_timeout_qc); self.start_new_view(sig_vote.view() + 1); } @@ -235,7 +235,7 @@ fn on_new_view(self, new_view: NewView) { match new_view.justification { Commit(qc) => self.process_commit_qc(Some(qc)), Timeout(qc) => { - self.process_commit_qc(qc.high_qc()); + self.process_commit_qc(qc.high_commit_qc); self.high_timeout_qc = max(Some(qc), self.high_timeout_qc); } }; diff --git a/spec/informal-spec/types.rs b/spec/informal-spec/types.rs index 16d955ea0..7dd9885ed 100644 --- a/spec/informal-spec/types.rs +++ b/spec/informal-spec/types.rs @@ -77,7 +77,7 @@ impl Justification { // Get the high commit QC of the timeout QC. We compare the high QC field of // all timeout votes in the QC, and get the highest one, if it exists. - let high_qc: Option = qc.high_qc(); + let high_qc: Option = qc.high_commit_qc; if high_vote.is_some() && (high_qc.is_none() || high_vote.block_number > high_qc.block_number) @@ -154,16 +154,19 @@ struct TimeoutVote { view: ViewNumber, // The commit vote with the highest view that this replica has signed, if any. high_vote: Option, - // The commit quorum certificate with the highest view that this replica + // The view number of the highest commit quorum certificate that this replica // has observed, if any. - high_commit_qc: Option, + high_commit_qc_view: Option, } struct SignedTimeoutVote { // The timeout. vote: TimeoutVote, - // Signature of the sender. + // Signature of the sender. This signature is ONLY over the vote field. sig: Signature, + // The commit quorum certificate with the highest view that this replica + // has observed, if any. It MUST match `high_commit_qc_view` in `vote`. 
+ high_commit_qc: Option, } impl SignedTimeoutVote { @@ -172,9 +175,10 @@ impl SignedTimeoutVote { } fn verify(self) -> bool { - // Technically, only the signature needs to be verified. But if we wish, there are two invariants that are easy to check: - // self.view() >= self.high_vote.view() && self.high_vote.view() >= self.high_commit_qc.view() - self.verify_sig() + // If we wish, there are two invariants that are easy to check but aren't required for correctness: + // self.view() >= self.high_vote.view() && self.high_vote.view() >= self.high_commit_qc_view + self.vote.high_commit_qc_view == self.high_commit_qc.map(|x| x.view()) && self.verify_sig() + && self.high_commit_qc.map(|qc| qc.verify()) } } @@ -189,6 +193,9 @@ struct TimeoutQC { // The aggregate signature of the replicas. We ignore the details here. // Can be something as simple as a vector of signatures. agg_sig: AggregateSignature, + // The commit quorum certificate with the highest view that all replicas in this + // QC have observed, if any. It MUST match the highest `high_commit_qc_view` in `votes`. + high_commit_qc: Option, } impl TimeoutQC { @@ -197,27 +204,27 @@ impl TimeoutQC { } fn verify(self) -> bool { - // Check that all votes have the same view. - for (_, vote) in votes { - if vote.view != self.view() { - return false; - } - } + // Check that all votes have the same view and get the highest commit QC view of the timeout QC. + let high_qc_view = None; - // Get the high commit QC of the timeout QC. We compare the high QC field of all - // timeout votes in the QC, and get the highest one, if it exists. - // We then need to verify that it is valid. We don't need to verify the commit QCs - // of the other timeout votes, since they don't have any influence in the result. 
- if let Some(high_qc) = self.high_qc() { - if !high_qc.verify() { + for (_, vote) in self.votes { + if vote.view != self.view() { return false; } + high_qc_view = max(high_qc_view, vote.high_commit_qc_view); } - // In here we need to not only check the signature, but also that // it has enough weight beyond it. - self.verify_agg_sig(QUORUM_WEIGHT) + if !self.verify_agg_sig(QUORUM_WEIGHT) { + return false; + } + + // We check that the high commit QC view matches the high commit QC that we have, and we verify the QC. + match self.high_commit_qc { + Some(high_qc) => high_qc_view == Some(high_qc.view()) && high_qc.verify(); + None => high_qc_view.is_none(); + } } fn high_vote(self) -> Option { @@ -245,16 +252,6 @@ impl TimeoutQC { None } } - - fn high_qc(self) -> Option { - let high_qc = None; - - for (_, vote) in votes { - high_qc = max(high_qc, vote.high_commit_qc); - } - - high_qc - } } struct NewView { From 96e5e692c064ad254dc5f7ec7b6ecdefdeb30f58 Mon Sep 17 00:00:00 2001 From: Akosh Farkash Date: Mon, 8 Jul 2024 17:46:23 +0100 Subject: [PATCH 3/3] feat: Collect votes for batch QC and save into store (BFT-477) (#145) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ❔ - [x] Added `BatchHash` and `Batch::hash` to give meaning to the attestation - [x] Refactored `BatchVotes` to look for a quorum over a height+hash combo - [x] Added a background task to listen to changes in the votes, look for a quorum and save it to the DB - [x] Test the quorum logic - [x] Test the propagation ### Caveat `BatchVotes` still only allows 1 vote per attester and thus doesn't fully handle multiple outstanding L1 batches. That will come later when we'll have subscribed to notifications about the highest finalised batch number, coming from the main node API which doesn't exist yet. That will serve as our beacon for what kind of ranges of votes to accept. 
Currently for defensive reasons we only accept one vote per attester, which means if attesters keep voting on different heights, always out of sync with each other, they might never form a quorum. That is considered okay at this point; we save what we can and see how it works. 1 minute batches should be long enough not to cause trouble for gossip. ## Why ❔ With these changes the node will react to votes coming in over gossip and forming a quorum over the latest L1 batch. --- node/actors/network/src/gossip/batch_votes.rs | 151 +++++++++++++- node/actors/network/src/gossip/mod.rs | 86 +++----- node/actors/network/src/gossip/runner.rs | 12 ++ node/actors/network/src/gossip/tests/mod.rs | 188 ++++++++++++++++-- node/actors/network/src/lib.rs | 7 +- node/actors/network/src/tests.rs | 1 + node/libs/roles/src/attester/conv.rs | 20 +- .../roles/src/attester/keys/secret_key.rs | 4 +- .../libs/roles/src/attester/messages/batch.rs | 30 ++- node/libs/roles/src/attester/messages/msg.rs | 18 +- node/libs/roles/src/attester/testonly.rs | 38 +++- node/libs/roles/src/attester/tests.rs | 6 +- node/libs/roles/src/proto/attester.proto | 6 + .../roles/src/validator/messages/consensus.rs | 5 +- node/libs/storage/src/batch_store.rs | 13 ++ node/libs/storage/src/testonly/mod.rs | 70 +++---- 16 files changed, 491 insertions(+), 164 deletions(-) diff --git a/node/actors/network/src/gossip/batch_votes.rs b/node/actors/network/src/gossip/batch_votes.rs index 4bf076567..cc934f673 100644 --- a/node/actors/network/src/gossip/batch_votes.rs +++ b/node/actors/network/src/gossip/batch_votes.rs @@ -4,19 +4,46 @@ use std::{collections::HashSet, sync::Arc}; use zksync_concurrency::sync; use zksync_consensus_roles::attester::{self, Batch}; -/// Mapping from attester::PublicKey to a signed attester::Batch message. /// Represents the currents state of node's knowledge about the attester votes. 
+/// +/// Eventually this data structure will have to track voting potentially happening +/// simultaneously on multiple heights, if we decrease the batch interval to be +/// several seconds, instead of a minute. By that point, the replicas should be +/// following the main node (or L1) to know what is the highest finalized batch, +/// which will act as a floor to the batch number we have to track here. It will +/// also help to protect ourselves from DoS attacks by malicious attesters casting +/// votes far into the future. +/// +/// For now, however, we just want a best effort where if we find a quorum, we +/// save it to the database, if not, we move on. For that, a simple protection +/// mechanism is to only allow one active vote per attester, which means any +/// previous vote can be removed when a new one is added. #[derive(Clone, Default, PartialEq, Eq)] -pub(crate) struct BatchVotes( - pub(super) im::HashMap>>, -); +pub(crate) struct BatchVotes { + /// The latest vote received from each attester. We only keep the last one + /// for now, hoping that with 1 minute batches there's plenty of time for + /// the quorum to be reached, but eventually we'll have to allow multiple + /// votes across different heights. + pub(crate) votes: im::HashMap>>, + + /// Total weight of votes at different heights and hashes. + /// + /// We will be looking for any hash that reaches a quorum threshold at any of the heights. + /// At that point we can remove all earlier heights, considering it final. In the future + /// we can instead keep heights until they are observed on the main node (or L1). + pub(crate) support: + im::OrdMap>, + + /// The minimum batch number for which we are still interested in votes. + pub(crate) min_batch_number: attester::BatchNumber, +} impl BatchVotes { - /// Returns a set of entries of `self` which are newer than the entries in `b`. + /// Returns a set of votes of `self` which are newer than the entries in `b`. 
pub(super) fn get_newer(&self, b: &Self) -> Vec>> { let mut newer = vec![]; - for (k, v) in &self.0 { - if let Some(bv) = b.0.get(k) { + for (k, v) in &self.votes { + if let Some(bv) = b.votes.get(k) { if v.msg <= bv.msg { continue; } @@ -47,23 +74,119 @@ impl BatchVotes { anyhow::bail!("duplicate entry for {:?}", d.key); } done.insert(d.key.clone()); - if !attesters.contains(&d.key) { + + if d.msg.number < self.min_batch_number { + continue; + } + + let Some(weight) = attesters.weight(&d.key) else { // We just skip the entries we are not interested in. // For now the set of attesters is static, so we could treat this as an error, // however we eventually want the attester set to be dynamic. continue; - } - if let Some(x) = self.0.get(&d.key) { + }; + + // If we already have a newer vote for this key, we can ignore this one. + if let Some(x) = self.votes.get(&d.key) { if d.msg <= x.msg { continue; } } + + // Check the signature before insertion. d.verify()?; - self.0.insert(d.key.clone(), d.clone()); + + self.add(d.clone(), weight); + changed = true; } Ok(changed) } + + /// Check if we have achieved quorum for any of the batch hashes. + /// + /// The return value is a vector because eventually we will be potentially waiting for + /// quorums on multiple heights simultaneously. + /// + /// For repeated queries we can supply a skip list of heights for which we already saved the QC. 
+ pub(super) fn find_quorums( + &self, + attesters: &attester::Committee, + skip: impl Fn(attester::BatchNumber) -> bool, + ) -> Vec { + let threshold = attesters.threshold(); + self.support + .iter() + .filter(|(number, _)| !skip(**number)) + .flat_map(|(number, candidates)| { + candidates + .iter() + .filter(|(_, weight)| **weight >= threshold) + .map(|(hash, _)| { + let sigs = self + .votes + .values() + .filter(|vote| vote.msg.hash == *hash) + .map(|vote| (vote.key.clone(), vote.sig.clone())) + .fold(attester::MultiSig::default(), |mut sigs, (key, sig)| { + sigs.add(key, sig); + sigs + }); + attester::BatchQC { + message: Batch { + number: *number, + hash: *hash, + }, + signatures: sigs, + } + }) + }) + .collect() + } + + /// Set the minimum batch number for which we admit votes. + /// + /// Discards data about earlier heights. + pub(super) fn set_min_batch_number(&mut self, min_batch_number: attester::BatchNumber) { + self.min_batch_number = min_batch_number; + self.votes.retain(|_, v| v.msg.number >= min_batch_number); + if let Some(prev) = min_batch_number.prev() { + let (_, support) = self.support.split(&prev); + self.support = support; + } + } + + /// Add an already validated vote from an attester into the register. + fn add(&mut self, vote: Arc>, weight: attester::Weight) { + self.remove(&vote.key, weight); + + let batch = self.support.entry(vote.msg.number).or_default(); + let support = batch.entry(vote.msg.hash).or_default(); + *support = support.saturating_add(weight); + + self.votes.insert(vote.key.clone(), vote); + } + + /// Remove any existing vote. + /// + /// This is for DoS protection, until we have better control over the acceptable vote range. 
+ fn remove(&mut self, key: &attester::PublicKey, weight: attester::Weight) { + let Some(vote) = self.votes.remove(key) else { + return; + }; + + let batch = self.support.entry(vote.msg.number).or_default(); + let support = batch.entry(vote.msg.hash).or_default(); + *support = support.saturating_sub(weight); + + if *support == 0u64 { + batch.remove(&vote.msg.hash); + } + + if batch.is_empty() { + self.support.remove(&vote.msg.number); + } + } } /// Watch wrapper of BatchVotes, @@ -99,6 +222,12 @@ impl BatchVotesWatch { } Ok(()) } + + /// Set the minimum batch number on the votes and discard old data. + pub(crate) async fn set_min_batch_number(&self, min_batch_number: attester::BatchNumber) { + let this = self.0.lock().await; + this.send_modify(|votes| votes.set_min_batch_number(min_batch_number)); + } } /// Wrapper around [BatchVotesWatch] to publish votes over batches signed by an attester key. diff --git a/node/actors/network/src/gossip/mod.rs b/node/actors/network/src/gossip/mod.rs index 905c78061..b0d74a4ef 100644 --- a/node/actors/network/src/gossip/mod.rs +++ b/node/actors/network/src/gossip/mod.rs @@ -17,11 +17,10 @@ use self::batch_votes::BatchVotesWatch; use crate::{gossip::ValidatorAddrsWatch, io, pool::PoolWatch, Config, MeteredStreamStats}; use anyhow::Context as _; use fetch::RequestItem; -use im::HashMap; use std::sync::{atomic::AtomicUsize, Arc}; pub(crate) use validator_addrs::*; use zksync_concurrency::{ctx, ctx::channel, scope, sync}; -use zksync_consensus_roles::{attester, node, validator}; +use zksync_consensus_roles::{node, validator}; use zksync_consensus_storage::{BatchStore, BlockStore}; mod batch_votes; @@ -57,10 +56,6 @@ pub(crate) struct Network { /// /// These are blocks that this node wants to request from remote peers via RPC. pub(crate) fetch_queue: fetch::Queue, - /// Last viewed QC. - pub(crate) last_viewed_qc: Option, - /// L1 batch qc. 
- pub(crate) batch_qc: HashMap, /// TESTONLY: how many time push_validator_addrs rpc was called by the peers. pub(crate) push_validator_addrs_calls: AtomicUsize, } @@ -82,8 +77,6 @@ impl Network { outbound: PoolWatch::new(cfg.gossip.static_outbound.keys().cloned().collect(), 0), validator_addrs: ValidatorAddrsWatch::default(), batch_votes: Arc::new(BatchVotesWatch::default()), - batch_qc: HashMap::new(), - last_viewed_qc: None, cfg, fetch_queue: fetch::Queue::default(), block_store, @@ -155,63 +148,36 @@ impl Network { .await; } - /// Task that keeps hearing about new votes and updates the L1 batch qc. + /// Task that keeps hearing about new votes and looks for an L1 batch qc. /// It will propagate the QC if there's enough votes. - pub(crate) async fn update_batch_qc(&self, ctx: &ctx::Ctx) -> anyhow::Result<()> { - // TODO This is not a good way to do this, we shouldn't be verifying the QC every time - // Can we get only the latest votes? - let attesters = self.genesis().attesters.as_ref().context("attesters")?; + pub(crate) async fn run_batch_qc_finder(&self, ctx: &ctx::Ctx) -> ctx::Result<()> { + let Some(attesters) = self.genesis().attesters.as_ref() else { + return Ok(()); + }; + let mut sub = self.batch_votes.subscribe(); loop { - let mut sub = self.batch_votes.subscribe(); - let votes = sync::changed(ctx, &mut sub) - .await - .context("batch votes")? - .clone(); - - // Check next QC to collect votes for. - let new_qc = self - .last_viewed_qc - .clone() - .map(|qc| { - attester::BatchQC::new(attester::Batch { - number: qc.message.number.next(), - }) - }) - .unwrap_or_else(|| { - attester::BatchQC::new(attester::Batch { - number: attester::BatchNumber(0), - }) - }) - .context("new qc")?; - - // Check votes for the correct QC. 
- for (_, sig) in votes.0 { - if self - .batch_qc - .clone() - .entry(new_qc.message.number) - .or_insert_with(|| attester::BatchQC::new(new_qc.message.clone()).expect("qc")) - .add(&sig, self.genesis()) - .is_err() - { - // TODO: Should we ban the peer somehow? - continue; - } - } + // In the future when we might be gossiping about multiple batches at the same time, + // we can collect the ones we submitted into a skip list until we see them confirmed + // on L1 and we can finally increase the minimum as well. + let quorums = { + let votes = sync::changed(ctx, &mut sub).await?; + votes.find_quorums(attesters, |_| false) + }; - let weight = attesters.weight_of_keys( - self.batch_qc - .get(&new_qc.message.number) - .context("last qc")? - .signatures - .keys(), - ); + for qc in quorums { + // In the future this should come from confirmations, but for now it's best effort, so we can forget ASAP. + // TODO: An initial value could be looked up in the database even now. + let next_batch_number = qc.message.number.next(); - if weight < attesters.threshold() { - return Ok(()); - }; + self.batch_store + .queue_batch_qc(ctx, qc) + .await + .context("queue_batch_qc")?; - // If we have enough weight, we can update the last viewed QC and propagate it. + self.batch_votes + .set_min_batch_number(next_batch_number) + .await; + } } } } diff --git a/node/actors/network/src/gossip/runner.rs b/node/actors/network/src/gossip/runner.rs index 6156a2579..60765c46f 100644 --- a/node/actors/network/src/gossip/runner.rs +++ b/node/actors/network/src/gossip/runner.rs @@ -32,6 +32,9 @@ impl rpc::Handler for PushValidatorAddrsServer<' } } +/// Receive the snapshot of known batch votes from a remote peer. +/// +/// The server receives the *diff* from remote peers, which it merges into the common register. 
struct PushBatchVotesServer<'a>(&'a Network); #[async_trait::async_trait] @@ -53,8 +56,10 @@ impl rpc::Handler for PushBatchVotesServer<'_> { } } +/// Represents what we know about the state of available blocks on the remote peer. struct PushBlockStoreStateServer<'a> { state: sync::watch::Sender, + /// The network is required for the verification of messages. net: &'a Network, } @@ -71,11 +76,13 @@ impl<'a> PushBlockStoreStateServer<'a> { } } +/// Represents what we know about the state of available batches on the remote peer. struct PushBatchStoreStateServer { state: sync::watch::Sender, } impl PushBatchStoreStateServer { + /// Start out not knowing anything about the remote peer. fn new() -> Self { Self { state: sync::watch::channel(BatchStoreState { @@ -194,6 +201,8 @@ impl Network { self.cfg.rpc.get_batch_rate, ) .add_server(ctx, rpc::ping::Server, rpc::ping::RATE); + + // If there is an attester committee then if self.genesis().attesters.as_ref().is_some() { let push_signature_client = rpc::Client::::new( ctx, @@ -208,11 +217,14 @@ impl Network { // Push L1 batch votes updates to peer. s.spawn::<()>(async { let push_signature_client = push_signature_client; + // Snapshot of the batches when we last pushed to the peer. let mut old = BatchVotes::default(); + // Subscribe to what we know about the state of the whole network. let mut sub = self.batch_votes.subscribe(); sub.mark_changed(); loop { let new = sync::changed(ctx, &mut sub).await?.clone(); + // Get the *new* votes, which haven't been pushed before. 
let diff = new.get_newer(&old); if diff.is_empty() { continue; diff --git a/node/actors/network/src/gossip/tests/mod.rs b/node/actors/network/src/gossip/tests/mod.rs index 8e3e15264..837d7fc27 100644 --- a/node/actors/network/src/gossip/tests/mod.rs +++ b/node/actors/network/src/gossip/tests/mod.rs @@ -1,6 +1,10 @@ use super::ValidatorAddrs; use crate::{ - gossip::{batch_votes::BatchVotesWatch, handshake, validator_addrs::ValidatorAddrsWatch}, + gossip::{ + batch_votes::{BatchVotes, BatchVotesWatch}, + handshake, + validator_addrs::ValidatorAddrsWatch, + }, metrics, preface, rpc, testonly, }; use anyhow::Context as _; @@ -18,7 +22,7 @@ use zksync_concurrency::{ time, }; use zksync_consensus_roles::{attester, validator}; -use zksync_consensus_storage::testonly::TestMemoryStorage; +use zksync_consensus_storage::{testonly::TestMemoryStorage, PersistentBatchStore}; mod fetch_batches; mod fetch_blocks; @@ -107,11 +111,14 @@ fn mk_netaddr( } fn mk_batch( - _rng: &mut R, + rng: &mut R, key: &attester::SecretKey, number: attester::BatchNumber, ) -> attester::Signed { - key.sign_msg(attester::Batch { number }) + key.sign_msg(attester::Batch { + number, + hash: rng.gen(), + }) } fn random_netaddr( @@ -132,6 +139,7 @@ fn random_batch_vote( ) -> Arc> { let batch = attester::Batch { number: attester::BatchNumber(rng.gen_range(0..1000)), + hash: rng.gen(), }; Arc::new(key.sign_msg(batch.to_owned())) } @@ -152,13 +160,14 @@ fn update_netaddr( } fn update_signature( - _rng: &mut R, + rng: &mut R, batch: &attester::Batch, key: &attester::SecretKey, batch_number_diff: i64, ) -> Arc> { let batch = attester::Batch { number: attester::BatchNumber((batch.number.0 as i64 + batch_number_diff) as u64), + hash: rng.gen(), }; Arc::new(key.sign_msg(batch.to_owned())) } @@ -549,23 +558,25 @@ async fn test_batch_votes() { want.insert(random_batch_vote(rng, k)); } votes.update(&attesters, &want.as_vec()).await.unwrap(); - assert_eq!(want.0, sub.borrow_and_update().0); + assert_eq!(want.0, 
sub.borrow_and_update().votes); - // newer batch number + // newer batch number, should be updated let k0v2 = update_signature(rng, &want.get(&keys[0]).msg, &keys[0], 1); - // same batch number + // same batch number, should be ignored let k1v2 = update_signature(rng, &want.get(&keys[1]).msg, &keys[1], 0); - // older batch number + // older batch number, should be ignored let k4v2 = update_signature(rng, &want.get(&keys[4]).msg, &keys[4], -1); - // first entry for a key in the config + // first entry for a key in the config, should be inserted let k6v1 = random_batch_vote(rng, &keys[6]); - // entry for a key outside of the config + // entry for a key outside of the config, should be ignored let k8 = rng.gen(); let k8v1 = random_batch_vote(rng, &k8); + // Update the ones we expect to succeed want.insert(k0v2.clone()); - want.insert(k1v2.clone()); want.insert(k6v1.clone()); + + // Send all of them to the votes let update = [ k0v2, k1v2, @@ -576,9 +587,9 @@ async fn test_batch_votes() { k8v1.clone(), ]; votes.update(&attesters, &update).await.unwrap(); - assert_eq!(want.0, sub.borrow_and_update().0); + assert_eq!(want.0, sub.borrow_and_update().votes); - // Invalid signature. + // Invalid signature, should be rejected. let mut k0v3 = mk_batch( rng, &keys[1], @@ -586,18 +597,151 @@ async fn test_batch_votes() { ); k0v3.key = keys[0].public(); assert!(votes.update(&attesters, &[Arc::new(k0v3)]).await.is_err()); - assert_eq!(want.0, sub.borrow_and_update().0); + assert_eq!(want.0, sub.borrow_and_update().votes); - // Duplicate entry in the update. + // Duplicate entry in the update, should be rejected. 
assert!(votes .update(&attesters, &[k8v1.clone(), k8v1]) .await .is_err()); - assert_eq!(want.0, sub.borrow_and_update().0); + assert_eq!(want.0, sub.borrow_and_update().votes); +} + +#[test] +fn test_batch_votes_quorum() { + abort_on_panic(); + let rng = &mut ctx::test_root(&ctx::RealClock).rng(); + + for _ in 0..10 { + let size = rng.gen_range(1..20); + let keys: Vec = (0..size).map(|_| rng.gen()).collect(); + let attesters = attester::Committee::new(keys.iter().map(|k| attester::WeightedAttester { + key: k.public(), + weight: rng.gen_range(1..=100), + })) + .unwrap(); + + let batch0 = rng.gen::(); + let batch1 = attester::Batch { + number: batch0.number.next(), + hash: rng.gen(), + }; + let mut batches = [(batch0, 0u64), (batch1, 0u64)]; + + let mut votes = BatchVotes::default(); + for sk in &keys { + // We need 4/5+1 for quorum, so let's say ~80% vote on the second batch. + let b = usize::from(rng.gen_range(0..100) < 80); + let batch = &batches[b].0; + let vote = sk.sign_msg(batch.clone()); + votes.update(&attesters, &[Arc::new(vote)]).unwrap(); + batches[b].1 += attesters.weight(&sk.public()).unwrap(); + + // Check that as soon as we have quorum it's found. + if batches[b].1 >= attesters.threshold() { + let qs = votes.find_quorums(&attesters, |_| false); + assert!(!qs.is_empty(), "should find quorum"); + assert!(qs[0].message == *batch); + assert!(qs[0].signatures.keys().count() > 0); + } + } + + if let Some(quorum) = batches + .iter() + .find(|b| b.1 >= attesters.threshold()) + .map(|(b, _)| b) + { + // Check that a quorum can be skipped + assert!(votes + .find_quorums(&attesters, |b| b == quorum.number) + .is_empty()); + } else { + // Check that if there was no quorum then we don't find any. + assert!(votes.find_quorums(&attesters, |_| false).is_empty()); + } + + // Check that the minimum batch number prunes data.
+ let last_batch = batches[1].0.number; + + votes.set_min_batch_number(last_batch); + assert!(votes.votes.values().all(|v| v.msg.number >= last_batch)); + assert!(votes.support.keys().all(|n| *n >= last_batch)); + + votes.set_min_batch_number(last_batch.next()); + assert!(votes.votes.is_empty()); + assert!(votes.support.is_empty()); + } } -// TODO: This test is disabled because the logic for attesters to receive and sign batches is not implemented yet. -// It should be re-enabled once the logic is implemented. -// #[tokio::test(flavor = "multi_thread")] -// async fn test_batch_votes_propagation() { -// } +// +#[tokio::test(flavor = "multi_thread")] +async fn test_batch_votes_propagation() { + let _guard = set_timeout(time::Duration::seconds(10)); + abort_on_panic(); + let ctx = &ctx::test_root(&ctx::AffineClock::new(10.)); + let rng = &mut ctx.rng(); + + let mut setup = validator::testonly::Setup::new(rng, 10); + + let cfgs = testonly::new_configs(rng, &setup, 1); + + scope::run!(ctx, |ctx, s| async { + // All nodes share the same in-memory store + let store = TestMemoryStorage::new(ctx, &setup.genesis).await; + s.spawn_bg(store.runner.run(ctx)); + + // Push the first batch into the store so we have something to vote on. + setup.push_blocks(rng, 2); + setup.push_batch(rng); + + let batch = setup.batches[0].clone(); + + store + .batches + .queue_batch(ctx, batch.clone(), setup.genesis.clone()) + .await + .unwrap(); + + let batch = attester::Batch { + number: batch.number, + hash: rng.gen(), + }; + + // Start all nodes. + let nodes: Vec = cfgs + .iter() + .enumerate() + .map(|(i, cfg)| { + let (node, runner) = testonly::Instance::new( + cfg.clone(), + store.blocks.clone(), + store.batches.clone(), + ); + s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); + node + }) + .collect(); + + // Cast votes on each node. It doesn't matter who signs where in this test, + // we just happen to know that we'll have as many nodes as attesters. 
+ let attesters = setup.genesis.attesters.as_ref().unwrap(); + for (node, key) in nodes.iter().zip(setup.attester_keys.iter()) { + node.net + .batch_vote_publisher() + .publish(attesters, key, batch.clone()) + .await + .unwrap(); + } + + // Wait until one of the nodes collects a quorum over gossip and stores. + loop { + if let Some(qc) = store.im_batches.get_batch_qc(ctx, batch.number).await? { + assert_eq!(qc.message, batch); + return Ok(()); + } + ctx.sleep(time::Duration::milliseconds(100)).await?; + } + }) + .await + .unwrap(); +} diff --git a/node/actors/network/src/lib.rs b/node/actors/network/src/lib.rs index 7373b484f..349ee5e28 100644 --- a/node/actors/network/src/lib.rs +++ b/node/actors/network/src/lib.rs @@ -132,9 +132,10 @@ impl Runner { // Update QC batches in the background. s.spawn(async { - // TODO: Handle this correctly. - let _ = self.net.gossip.update_batch_qc(ctx).await; - Ok(()) + match self.net.gossip.run_batch_qc_finder(ctx).await { + Err(ctx::Error::Canceled(_)) => Ok(()), + other => other, + } }); // Fetch missing batches in the background. 
diff --git a/node/actors/network/src/tests.rs b/node/actors/network/src/tests.rs index 239074c7f..22d325e35 100644 --- a/node/actors/network/src/tests.rs +++ b/node/actors/network/src/tests.rs @@ -33,6 +33,7 @@ async fn test_metrics() { let mut encoded_metrics = String::new(); registry.encode(&mut encoded_metrics, vise::Format::OpenMetricsForPrometheus)?; tracing::info!("stats =\n{encoded_metrics}"); + Ok(()) }) .await diff --git a/node/libs/roles/src/attester/conv.rs b/node/libs/roles/src/attester/conv.rs index b101ddab8..e3beec41b 100644 --- a/node/libs/roles/src/attester/conv.rs +++ b/node/libs/roles/src/attester/conv.rs @@ -1,6 +1,6 @@ use super::{ - AggregateSignature, Batch, BatchNumber, BatchQC, Msg, MsgHash, MultiSig, PublicKey, Signature, - Signed, Signers, SyncBatch, WeightedAttester, + AggregateSignature, Batch, BatchHash, BatchNumber, BatchQC, Msg, MsgHash, MultiSig, PublicKey, + Signature, Signed, Signers, SyncBatch, WeightedAttester, }; use crate::{ proto::attester::{self as proto, Attestation}, @@ -11,16 +11,30 @@ use zksync_consensus_crypto::ByteFmt; use zksync_consensus_utils::enum_util::Variant; use zksync_protobuf::{read_map, read_required, required, ProtoFmt}; +impl ProtoFmt for BatchHash { + type Proto = proto::BatchHash; + fn read(r: &Self::Proto) -> anyhow::Result { + Ok(Self(ByteFmt::decode(required(&r.keccak256)?)?)) + } + fn build(&self) -> Self::Proto { + Self::Proto { + keccak256: Some(self.0.encode()), + } + } +} + impl ProtoFmt for Batch { type Proto = proto::Batch; fn read(r: &Self::Proto) -> anyhow::Result { Ok(Self { - number: BatchNumber(*required(&r.number).context("proposal")?), + hash: read_required(&r.hash).context("hash")?, + number: BatchNumber(*required(&r.number).context("number")?), }) } fn build(&self) -> Self::Proto { Self::Proto { number: Some(self.number.0), + hash: Some(self.hash.build()), } } } diff --git a/node/libs/roles/src/attester/keys/secret_key.rs b/node/libs/roles/src/attester/keys/secret_key.rs index 
e6c1c2785..2e70302e2 100644 --- a/node/libs/roles/src/attester/keys/secret_key.rs +++ b/node/libs/roles/src/attester/keys/secret_key.rs @@ -1,5 +1,5 @@ use super::{PublicKey, Signature}; -use crate::attester::{Batch, Msg, MsgHash, Signed}; +use crate::attester::{Msg, MsgHash, Signed}; use std::{fmt, sync::Arc}; use zksync_consensus_crypto::{secp256k1, ByteFmt, Text, TextFmt}; use zksync_consensus_utils::enum_util::Variant; @@ -22,7 +22,7 @@ impl SecretKey { } /// Signs a batch message. - pub fn sign_msg(&self, msg: Batch) -> Signed + pub fn sign_msg(&self, msg: V) -> Signed where V: Variant, { diff --git a/node/libs/roles/src/attester/messages/batch.rs b/node/libs/roles/src/attester/messages/batch.rs index 4e38ea624..67d7e56cc 100644 --- a/node/libs/roles/src/attester/messages/batch.rs +++ b/node/libs/roles/src/attester/messages/batch.rs @@ -4,6 +4,7 @@ use crate::{ validator::{Genesis, Payload}, }; use anyhow::{ensure, Context as _}; +use zksync_consensus_crypto::{keccak256::Keccak256, ByteFmt, Text, TextFmt}; use zksync_consensus_utils::enum_util::Variant; /// A batch of L2 blocks used for the peers to fetch and keep in sync. @@ -52,13 +53,34 @@ impl std::ops::Add for BatchNumber { } } +/// Hash of the L1 batch. +#[derive(Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct BatchHash(pub Keccak256); + +impl TextFmt for BatchHash { + fn decode(text: Text) -> anyhow::Result { + text.strip("batch:keccak256:")?.decode_hex().map(Self) + } + + fn encode(&self) -> String { + format!("batch:keccak256:{}", hex::encode(ByteFmt::encode(&self.0))) + } +} + +impl std::fmt::Debug for BatchHash { + fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + fmt.write_str(&TextFmt::encode(self)) + } +} + /// A message containing information about a batch of blocks. /// It is signed by the attesters and then propagated through the gossip network. #[derive(Clone, Debug, PartialEq, Eq, Hash, PartialOrd)] pub struct Batch { /// Header of the batch. 
pub number: BatchNumber, - // TODO: Hash of the batch. + /// Hash of the batch. + pub hash: BatchHash, } /// A certificate for a batch of L2 blocks to be sent to L1. @@ -112,11 +134,11 @@ pub enum BatchQCAddError { impl BatchQC { /// Create a new empty instance for a given `Batch` message. - pub fn new(message: Batch) -> anyhow::Result { - Ok(Self { + pub fn new(message: Batch) -> Self { + Self { message, signatures: attester::MultiSig::default(), - }) + } } /// Add a attester's signature. diff --git a/node/libs/roles/src/attester/messages/msg.rs b/node/libs/roles/src/attester/messages/msg.rs index 2cc118ca7..cdf8b1e66 100644 --- a/node/libs/roles/src/attester/messages/msg.rs +++ b/node/libs/roles/src/attester/messages/msg.rs @@ -14,6 +14,9 @@ pub enum Msg { impl Msg { /// Returns the hash of the message. + /// + /// TODO: Eventually this should be a hash over an ABI encoded payload that + /// can be verified in Solidity. pub fn hash(&self) -> MsgHash { MsgHash(keccak256::Keccak256::new(&zksync_protobuf::canonical(self))) } @@ -151,12 +154,18 @@ impl Committee { threshold(self.total_weight()) } + /// Get the voting weight of a single public key. + /// + /// Returns `None` if the key is not a member of the committee. + pub fn weight(&self, key: &attester::PublicKey) -> Option { + self.index(key).map(|i| self.vec[i].weight) + } + /// Compute the sum of weights of as a list of public keys. /// /// The method assumes that the keys are unique and does not de-duplicate. pub fn weight_of_keys<'a>(&self, keys: impl Iterator) -> u64 { - keys.filter_map(|pk| self.index(pk).map(|i| self.vec[i].weight)) - .sum() + keys.filter_map(|key| self.weight(key)).sum() } /// Compute the sum of weights of signers given as a bit vector. @@ -262,5 +271,8 @@ pub struct WeightedAttester { /// Attester key pub key: attester::PublicKey, /// Attester weight inside the Committee. - pub weight: u64, + pub weight: Weight, } + +/// Voting weight. 
+pub type Weight = u64; diff --git a/node/libs/roles/src/attester/testonly.rs b/node/libs/roles/src/attester/testonly.rs index 940f34f1a..b4e53f798 100644 --- a/node/libs/roles/src/attester/testonly.rs +++ b/node/libs/roles/src/attester/testonly.rs @@ -1,6 +1,7 @@ use super::{ - AggregateMultiSig, AggregateSignature, Batch, BatchNumber, BatchQC, Committee, Msg, MsgHash, - MultiSig, PublicKey, SecretKey, Signature, Signed, Signers, SyncBatch, WeightedAttester, + AggregateMultiSig, AggregateSignature, Batch, BatchHash, BatchNumber, BatchQC, Committee, Msg, + MsgHash, MultiSig, PublicKey, SecretKey, Signature, Signed, Signers, SyncBatch, + WeightedAttester, }; use crate::validator::Payload; use bit_vec::BitVec; @@ -48,7 +49,10 @@ impl Distribution for Standard { impl Distribution for Standard { fn sample(&self, rng: &mut R) -> Batch { - Batch { number: rng.gen() } + Batch { + number: rng.gen(), + hash: rng.gen(), + } } } @@ -69,15 +73,17 @@ impl Distribution for Standard { } } +impl Distribution for Standard { + fn sample(&self, rng: &mut R) -> BatchHash { + BatchHash(rng.gen()) + } +} + impl Distribution for Standard { fn sample(&self, rng: &mut R) -> BatchQC { - let mut signatures = MultiSig::default(); - for _ in 0..rng.gen_range(0..5) { - signatures.add(rng.gen(), rng.gen()); - } BatchQC { message: rng.gen(), - signatures, + signatures: rng.gen(), } } } @@ -100,6 +106,16 @@ impl Distribution for Standard { } } +impl Distribution for Standard { + fn sample(&self, rng: &mut R) -> MultiSig { + let mut sig = MultiSig::default(); + for _ in 0..rng.gen_range(0..5) { + sig.add(rng.gen(), rng.gen()); + } + sig + } +} + impl Distribution for Standard { fn sample(&self, rng: &mut R) -> AggregateSignature { AggregateSignature(rng.gen()) @@ -115,7 +131,11 @@ impl Distribution for Standard { } } -impl> Distribution> for Standard { +impl Distribution> for Standard +where + V: Variant, + Standard: Distribution, +{ fn sample(&self, rng: &mut R) -> Signed { 
rng.gen::().sign_msg(rng.gen()) } diff --git a/node/libs/roles/src/attester/tests.rs b/node/libs/roles/src/attester/tests.rs index c167b9626..b7d2f4c04 100644 --- a/node/libs/roles/src/attester/tests.rs +++ b/node/libs/roles/src/attester/tests.rs @@ -135,7 +135,7 @@ fn test_agg_signature_verify() { } fn make_batch_msg(rng: &mut impl Rng) -> Batch { - Batch { number: rng.gen() } + rng.gen() } #[test] @@ -169,7 +169,7 @@ fn test_batch_qc() { // Create QCs with increasing number of attesters. for i in 0..setup1.attester_keys.len() + 1 { - let mut qc = BatchQC::new(make_batch_msg(rng)).unwrap(); + let mut qc = BatchQC::new(make_batch_msg(rng)); for key in &setup1.attester_keys[0..i] { qc.add(&key.sign_msg(qc.message.clone()), &setup1.genesis) .unwrap(); @@ -202,7 +202,7 @@ fn test_attester_committee_weights() { let sums = [1000, 1600, 2400, 8400, 9300, 10000]; let msg = make_batch_msg(rng); - let mut qc = BatchQC::new(msg.clone()).unwrap(); + let mut qc = BatchQC::new(msg.clone()); for (n, weight) in sums.iter().enumerate() { let key = &setup.attester_keys[n]; qc.add(&key.sign_msg(msg.clone()), &setup.genesis).unwrap(); diff --git a/node/libs/roles/src/proto/attester.proto b/node/libs/roles/src/proto/attester.proto index efdea6ffb..387433531 100644 --- a/node/libs/roles/src/proto/attester.proto +++ b/node/libs/roles/src/proto/attester.proto @@ -10,8 +10,14 @@ message SyncBatch { optional bytes proof = 3; // required } +message BatchHash { + optional bytes keccak256 = 1; // required +} + + message Batch { optional uint64 number = 1; // required + optional BatchHash hash = 2; // required } message BatchQC { diff --git a/node/libs/roles/src/validator/messages/consensus.rs b/node/libs/roles/src/validator/messages/consensus.rs index 2056a380a..717387900 100644 --- a/node/libs/roles/src/validator/messages/consensus.rs +++ b/node/libs/roles/src/validator/messages/consensus.rs @@ -527,5 +527,8 @@ pub struct WeightedValidator { /// Validator key pub key: validator::PublicKey, 
/// Validator weight inside the Committee. - pub weight: u64, + pub weight: Weight, } + +/// Voting weight. +pub type Weight = u64; diff --git a/node/libs/storage/src/batch_store.rs b/node/libs/storage/src/batch_store.rs index 898a94abf..beda81d6d 100644 --- a/node/libs/storage/src/batch_store.rs +++ b/node/libs/storage/src/batch_store.rs @@ -257,6 +257,19 @@ impl BatchStore { Ok(()) } + /// Wait until the database has a batch, then attach the corresponding QC. + pub async fn queue_batch_qc(&self, ctx: &ctx::Ctx, qc: attester::BatchQC) -> ctx::Result<()> { + // The `store_qc` implementation in `zksync-era` retries the insertion of the QC if the payload + // isn't yet available, but to be safe we can wait for the availability signal here as well. + sync::wait_for(ctx, &mut self.persistent.persisted(), |persisted| { + qc.message.number < persisted.next() + }) + .await?; + // Now it's definitely safe to store it. + self.persistent.store_qc(ctx, qc).await?; + Ok(()) + } + /// Waits until the given batch is queued (in memory, or persisted). /// Note that it doesn't mean that the batch is actually available, as old batches might get pruned. pub async fn wait_until_queued( diff --git a/node/libs/storage/src/testonly/mod.rs b/node/libs/storage/src/testonly/mod.rs index b30021808..1c11e024d 100644 --- a/node/libs/storage/src/testonly/mod.rs +++ b/node/libs/storage/src/testonly/mod.rs @@ -40,6 +40,10 @@ pub struct TestMemoryStorage { pub batches: Arc, /// In-memory storage runner. pub runner: TestMemoryStorageRunner, + /// The in-memory block store representing the persistent store. + pub im_blocks: in_memory::BlockStore, + /// The in-memory batch store representing the persistent store. + pub im_batches: in_memory::BatchStore, } /// Test-only memory storage runner wrapping both block and batch store runners.
@@ -74,14 +78,7 @@ impl TestMemoryStorageRunner { impl TestMemoryStorage { /// Constructs a new in-memory store for both blocks and batches with their respective runners. pub async fn new(ctx: &ctx::Ctx, genesis: &validator::Genesis) -> Self { - let (blocks, blocks_runner) = new_store(ctx, genesis).await; - let (batches, batches_runner) = new_batch_store(ctx).await; - let runner = TestMemoryStorageRunner::new(blocks_runner, batches_runner).await; - Self { - blocks, - batches, - runner, - } + Self::new_store_with_first_block(ctx, genesis, genesis.first_block).await } /// Constructs a new in-memory store with a custom expected first block @@ -91,50 +88,37 @@ impl TestMemoryStorage { genesis: &validator::Genesis, first: validator::BlockNumber, ) -> Self { - let (blocks, blocks_runner) = new_store_with_first(ctx, genesis, first).await; - let (batches, batches_runner) = new_batch_store(ctx).await; + let im_blocks = in_memory::BlockStore::new(genesis.clone(), first); + let im_batches = in_memory::BatchStore::new(attester::BatchNumber(0)); + Self::new_with_im(ctx, im_blocks, im_batches).await + } + + /// Constructs a new in-memory store for both blocks and batches with their respective runners. + async fn new_with_im( + ctx: &ctx::Ctx, + im_blocks: in_memory::BlockStore, + im_batches: in_memory::BatchStore, + ) -> Self { + let (blocks, blocks_runner) = BlockStore::new(ctx, Box::new(im_blocks.clone())) + .await + .unwrap(); + + let (batches, batches_runner) = BatchStore::new(ctx, Box::new(im_batches.clone())) + .await + .unwrap(); + let runner = TestMemoryStorageRunner::new(blocks_runner, batches_runner).await; + Self { blocks, batches, runner, + im_blocks, + im_batches, } } } -/// Constructs a new in-memory store. -async fn new_store( - ctx: &ctx::Ctx, - genesis: &validator::Genesis, -) -> (Arc, BlockStoreRunner) { - new_store_with_first(ctx, genesis, genesis.first_block).await -} - -/// Constructs a new in-memory batch store. 
-async fn new_batch_store(ctx: &ctx::Ctx) -> (Arc, BatchStoreRunner) { - BatchStore::new( - ctx, - Box::new(in_memory::BatchStore::new(attester::BatchNumber(0))), - ) - .await - .unwrap() -} - -/// Constructs a new in-memory store with a custom expected first block -/// (i.e. possibly different than `genesis.fork.first_block`). -async fn new_store_with_first( - ctx: &ctx::Ctx, - genesis: &validator::Genesis, - first: validator::BlockNumber, -) -> (Arc, BlockStoreRunner) { - BlockStore::new( - ctx, - Box::new(in_memory::BlockStore::new(genesis.clone(), first)), - ) - .await - .unwrap() -} - /// Dumps all the blocks stored in `store`. pub async fn dump(ctx: &ctx::Ctx, store: &dyn PersistentBlockStore) -> Vec { let genesis = store.genesis(ctx).await.unwrap();