From 96e5e692c064ad254dc5f7ec7b6ecdefdeb30f58 Mon Sep 17 00:00:00 2001 From: Akosh Farkash Date: Mon, 8 Jul 2024 17:46:23 +0100 Subject: [PATCH] feat: Collect votes for batch QC and save into store (BFT-477) (#145) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ❔ - [x] Added `BatchHash` and `Batch::hash` to give meaning to the attestation - [x] Refactored `BatchVotes` to look for a quorum over a height+hash combo - [x] Added a background task to listen to changes in the votes, look for a quorum and save it to the DB - [x] Test the quorum logic - [x] Test the propagation ### Caveat `BatchVotes` still only allows 1 vote per attester and thus doesn't fully handle multiple outstanding L1 batches. That will come later when we'll have subscribed to notifications about the highest finalised batch number, coming from the main node API which doesn't exist yet. That will serve as our beacon for what kind of ranges of votes to accept. Currently for defensive reasons we only accept one vote per attester, which means if attesters keep voting on different heights, always out of sync with each other, they might never form a quorum. That is considered okay at this point; we save what we can and see how it works. 1 minute batches should be long enough not to cause trouble for gossip. ## Why ❔ With these changes the node will react to votes coming in over gossip and forming a quorum over the latest L1 batch. 
--- node/actors/network/src/gossip/batch_votes.rs | 151 +++++++++++++- node/actors/network/src/gossip/mod.rs | 86 +++----- node/actors/network/src/gossip/runner.rs | 12 ++ node/actors/network/src/gossip/tests/mod.rs | 188 ++++++++++++++++-- node/actors/network/src/lib.rs | 7 +- node/actors/network/src/tests.rs | 1 + node/libs/roles/src/attester/conv.rs | 20 +- .../roles/src/attester/keys/secret_key.rs | 4 +- .../libs/roles/src/attester/messages/batch.rs | 30 ++- node/libs/roles/src/attester/messages/msg.rs | 18 +- node/libs/roles/src/attester/testonly.rs | 38 +++- node/libs/roles/src/attester/tests.rs | 6 +- node/libs/roles/src/proto/attester.proto | 6 + .../roles/src/validator/messages/consensus.rs | 5 +- node/libs/storage/src/batch_store.rs | 13 ++ node/libs/storage/src/testonly/mod.rs | 70 +++---- 16 files changed, 491 insertions(+), 164 deletions(-) diff --git a/node/actors/network/src/gossip/batch_votes.rs b/node/actors/network/src/gossip/batch_votes.rs index 4bf07656..cc934f67 100644 --- a/node/actors/network/src/gossip/batch_votes.rs +++ b/node/actors/network/src/gossip/batch_votes.rs @@ -4,19 +4,46 @@ use std::{collections::HashSet, sync::Arc}; use zksync_concurrency::sync; use zksync_consensus_roles::attester::{self, Batch}; -/// Mapping from attester::PublicKey to a signed attester::Batch message. /// Represents the currents state of node's knowledge about the attester votes. +/// +/// Eventually this data structure will have to track voting potentially happening +/// simultaneously on multiple heights, if we decrease the batch interval to be +/// several seconds, instead of a minute. By that point, the replicas should be +/// following the main node (or L1) to know what is the highest finalized batch, +/// which will act as a floor to the batch number we have to track here. It will +/// also help to protect ourselves from DoS attacks by malicious attesters casting +/// votes far into the future. 
+/// +/// For now, however, we just want a best effort where if we find a quorum, we +/// save it to the database, if not, we move on. For that, a simple protection +/// mechanism is to only allow one active vote per attester, which means any +/// previous vote can be removed when a new one is added. #[derive(Clone, Default, PartialEq, Eq)] -pub(crate) struct BatchVotes( - pub(super) im::HashMap>>, -); +pub(crate) struct BatchVotes { + /// The latest vote received from each attester. We only keep the last one + /// for now, hoping that with 1 minute batches there's plenty of time for + /// the quorum to be reached, but eventually we'll have to allow multiple + /// votes across different heights. + pub(crate) votes: im::HashMap>>, + + /// Total weight of votes at different heights and hashes. + /// + /// We will be looking for any hash that reaches a quorum threshold at any of the heights. + /// At that point we can remove all earlier heights, considering it final. In the future + /// we can instead keep heights until they are observed on the main node (or L1). + pub(crate) support: + im::OrdMap>, + + /// The minimum batch number for which we are still interested in votes. + pub(crate) min_batch_number: attester::BatchNumber, +} impl BatchVotes { - /// Returns a set of entries of `self` which are newer than the entries in `b`. + /// Returns a set of votes of `self` which are newer than the entries in `b`. pub(super) fn get_newer(&self, b: &Self) -> Vec>> { let mut newer = vec![]; - for (k, v) in &self.0 { - if let Some(bv) = b.0.get(k) { + for (k, v) in &self.votes { + if let Some(bv) = b.votes.get(k) { if v.msg <= bv.msg { continue; } @@ -47,23 +74,119 @@ impl BatchVotes { anyhow::bail!("duplicate entry for {:?}", d.key); } done.insert(d.key.clone()); - if !attesters.contains(&d.key) { + + if d.msg.number < self.min_batch_number { + continue; + } + + let Some(weight) = attesters.weight(&d.key) else { // We just skip the entries we are not interested in. 
// For now the set of attesters is static, so we could treat this as an error, // however we eventually want the attester set to be dynamic. continue; - } - if let Some(x) = self.0.get(&d.key) { + }; + + // If we already have a newer vote for this key, we can ignore this one. + if let Some(x) = self.votes.get(&d.key) { if d.msg <= x.msg { continue; } } + + // Check the signature before insertion. d.verify()?; - self.0.insert(d.key.clone(), d.clone()); + + self.add(d.clone(), weight); + changed = true; } Ok(changed) } + + /// Check if we have achieved quorum for any of the batch hashes. + /// + /// The return value is a vector because eventually we will be potentially waiting for + /// quorums on multiple heights simultaneously. + /// + /// For repeated queries we can supply a skip list of heights for which we already saved the QC. + pub(super) fn find_quorums( + &self, + attesters: &attester::Committee, + skip: impl Fn(attester::BatchNumber) -> bool, + ) -> Vec { + let threshold = attesters.threshold(); + self.support + .iter() + .filter(|(number, _)| !skip(**number)) + .flat_map(|(number, candidates)| { + candidates + .iter() + .filter(|(_, weight)| **weight >= threshold) + .map(|(hash, _)| { + let sigs = self + .votes + .values() + .filter(|vote| vote.msg.hash == *hash) + .map(|vote| (vote.key.clone(), vote.sig.clone())) + .fold(attester::MultiSig::default(), |mut sigs, (key, sig)| { + sigs.add(key, sig); + sigs + }); + attester::BatchQC { + message: Batch { + number: *number, + hash: *hash, + }, + signatures: sigs, + } + }) + }) + .collect() + } + + /// Set the minimum batch number for which we admit votes. + /// + /// Discards data about earlier heights. 
+ pub(super) fn set_min_batch_number(&mut self, min_batch_number: attester::BatchNumber) { + self.min_batch_number = min_batch_number; + self.votes.retain(|_, v| v.msg.number >= min_batch_number); + if let Some(prev) = min_batch_number.prev() { + let (_, support) = self.support.split(&prev); + self.support = support; + } + } + + /// Add an already validated vote from an attester into the register. + fn add(&mut self, vote: Arc>, weight: attester::Weight) { + self.remove(&vote.key, weight); + + let batch = self.support.entry(vote.msg.number).or_default(); + let support = batch.entry(vote.msg.hash).or_default(); + *support = support.saturating_add(weight); + + self.votes.insert(vote.key.clone(), vote); + } + + /// Remove any existing vote. + /// + /// This is for DoS protection, until we have better control over the acceptable vote range. + fn remove(&mut self, key: &attester::PublicKey, weight: attester::Weight) { + let Some(vote) = self.votes.remove(key) else { + return; + }; + + let batch = self.support.entry(vote.msg.number).or_default(); + let support = batch.entry(vote.msg.hash).or_default(); + *support = support.saturating_sub(weight); + + if *support == 0u64 { + batch.remove(&vote.msg.hash); + } + + if batch.is_empty() { + self.support.remove(&vote.msg.number); + } + } } /// Watch wrapper of BatchVotes, @@ -99,6 +222,12 @@ impl BatchVotesWatch { } Ok(()) } + + /// Set the minimum batch number on the votes and discard old data. + pub(crate) async fn set_min_batch_number(&self, min_batch_number: attester::BatchNumber) { + let this = self.0.lock().await; + this.send_modify(|votes| votes.set_min_batch_number(min_batch_number)); + } } /// Wrapper around [BatchVotesWatch] to publish votes over batches signed by an attester key. 
diff --git a/node/actors/network/src/gossip/mod.rs b/node/actors/network/src/gossip/mod.rs index 905c7806..b0d74a4e 100644 --- a/node/actors/network/src/gossip/mod.rs +++ b/node/actors/network/src/gossip/mod.rs @@ -17,11 +17,10 @@ use self::batch_votes::BatchVotesWatch; use crate::{gossip::ValidatorAddrsWatch, io, pool::PoolWatch, Config, MeteredStreamStats}; use anyhow::Context as _; use fetch::RequestItem; -use im::HashMap; use std::sync::{atomic::AtomicUsize, Arc}; pub(crate) use validator_addrs::*; use zksync_concurrency::{ctx, ctx::channel, scope, sync}; -use zksync_consensus_roles::{attester, node, validator}; +use zksync_consensus_roles::{node, validator}; use zksync_consensus_storage::{BatchStore, BlockStore}; mod batch_votes; @@ -57,10 +56,6 @@ pub(crate) struct Network { /// /// These are blocks that this node wants to request from remote peers via RPC. pub(crate) fetch_queue: fetch::Queue, - /// Last viewed QC. - pub(crate) last_viewed_qc: Option, - /// L1 batch qc. - pub(crate) batch_qc: HashMap, /// TESTONLY: how many time push_validator_addrs rpc was called by the peers. pub(crate) push_validator_addrs_calls: AtomicUsize, } @@ -82,8 +77,6 @@ impl Network { outbound: PoolWatch::new(cfg.gossip.static_outbound.keys().cloned().collect(), 0), validator_addrs: ValidatorAddrsWatch::default(), batch_votes: Arc::new(BatchVotesWatch::default()), - batch_qc: HashMap::new(), - last_viewed_qc: None, cfg, fetch_queue: fetch::Queue::default(), block_store, @@ -155,63 +148,36 @@ impl Network { .await; } - /// Task that keeps hearing about new votes and updates the L1 batch qc. + /// Task that keeps hearing about new votes and looks for an L1 batch qc. /// It will propagate the QC if there's enough votes. - pub(crate) async fn update_batch_qc(&self, ctx: &ctx::Ctx) -> anyhow::Result<()> { - // TODO This is not a good way to do this, we shouldn't be verifying the QC every time - // Can we get only the latest votes? 
- let attesters = self.genesis().attesters.as_ref().context("attesters")?; + pub(crate) async fn run_batch_qc_finder(&self, ctx: &ctx::Ctx) -> ctx::Result<()> { + let Some(attesters) = self.genesis().attesters.as_ref() else { + return Ok(()); + }; + let mut sub = self.batch_votes.subscribe(); loop { - let mut sub = self.batch_votes.subscribe(); - let votes = sync::changed(ctx, &mut sub) - .await - .context("batch votes")? - .clone(); - - // Check next QC to collect votes for. - let new_qc = self - .last_viewed_qc - .clone() - .map(|qc| { - attester::BatchQC::new(attester::Batch { - number: qc.message.number.next(), - }) - }) - .unwrap_or_else(|| { - attester::BatchQC::new(attester::Batch { - number: attester::BatchNumber(0), - }) - }) - .context("new qc")?; - - // Check votes for the correct QC. - for (_, sig) in votes.0 { - if self - .batch_qc - .clone() - .entry(new_qc.message.number) - .or_insert_with(|| attester::BatchQC::new(new_qc.message.clone()).expect("qc")) - .add(&sig, self.genesis()) - .is_err() - { - // TODO: Should we ban the peer somehow? - continue; - } - } + // In the future when we might be gossiping about multiple batches at the same time, + // we can collect the ones we submitted into a skip list until we see them confirmed + // on L1 and we can finally increase the minimum as well. + let quorums = { + let votes = sync::changed(ctx, &mut sub).await?; + votes.find_quorums(attesters, |_| false) + }; - let weight = attesters.weight_of_keys( - self.batch_qc - .get(&new_qc.message.number) - .context("last qc")? - .signatures - .keys(), - ); + for qc in quorums { + // In the future this should come from confirmations, but for now it's best effort, so we can forget ASAP. + // TODO: An initial value could be looked up in the database even now. 
+ let next_batch_number = qc.message.number.next(); - if weight < attesters.threshold() { - return Ok(()); - }; + self.batch_store + .queue_batch_qc(ctx, qc) + .await + .context("queue_batch_qc")?; - // If we have enough weight, we can update the last viewed QC and propagate it. + self.batch_votes + .set_min_batch_number(next_batch_number) + .await; + } } } } diff --git a/node/actors/network/src/gossip/runner.rs b/node/actors/network/src/gossip/runner.rs index 6156a257..60765c46 100644 --- a/node/actors/network/src/gossip/runner.rs +++ b/node/actors/network/src/gossip/runner.rs @@ -32,6 +32,9 @@ impl rpc::Handler for PushValidatorAddrsServer<' } } +/// Receive the snapshot of known batch votes from a remote peer. +/// +/// The server receives the *diff* from remote peers, which it merges into the common register. struct PushBatchVotesServer<'a>(&'a Network); #[async_trait::async_trait] @@ -53,8 +56,10 @@ impl rpc::Handler for PushBatchVotesServer<'_> { } } +/// Represents what we know about the state of available blocks on the remote peer. struct PushBlockStoreStateServer<'a> { state: sync::watch::Sender, + /// The network is required for the verification of messages. net: &'a Network, } @@ -71,11 +76,13 @@ impl<'a> PushBlockStoreStateServer<'a> { } } +/// Represents what we know about the state of available batches on the remote peer. struct PushBatchStoreStateServer { state: sync::watch::Sender, } impl PushBatchStoreStateServer { + /// Start out not knowing anything about the remote peer. fn new() -> Self { Self { state: sync::watch::channel(BatchStoreState { @@ -194,6 +201,8 @@ impl Network { self.cfg.rpc.get_batch_rate, ) .add_server(ctx, rpc::ping::Server, rpc::ping::RATE); + + // If there is an attester committee then if self.genesis().attesters.as_ref().is_some() { let push_signature_client = rpc::Client::::new( ctx, @@ -208,11 +217,14 @@ impl Network { // Push L1 batch votes updates to peer. 
s.spawn::<()>(async { let push_signature_client = push_signature_client; + // Snapshot of the batches when we last pushed to the peer. let mut old = BatchVotes::default(); + // Subscribe to what we know about the state of the whole network. let mut sub = self.batch_votes.subscribe(); sub.mark_changed(); loop { let new = sync::changed(ctx, &mut sub).await?.clone(); + // Get the *new* votes, which haven't been pushed before. let diff = new.get_newer(&old); if diff.is_empty() { continue; diff --git a/node/actors/network/src/gossip/tests/mod.rs b/node/actors/network/src/gossip/tests/mod.rs index 8e3e1526..837d7fc2 100644 --- a/node/actors/network/src/gossip/tests/mod.rs +++ b/node/actors/network/src/gossip/tests/mod.rs @@ -1,6 +1,10 @@ use super::ValidatorAddrs; use crate::{ - gossip::{batch_votes::BatchVotesWatch, handshake, validator_addrs::ValidatorAddrsWatch}, + gossip::{ + batch_votes::{BatchVotes, BatchVotesWatch}, + handshake, + validator_addrs::ValidatorAddrsWatch, + }, metrics, preface, rpc, testonly, }; use anyhow::Context as _; @@ -18,7 +22,7 @@ use zksync_concurrency::{ time, }; use zksync_consensus_roles::{attester, validator}; -use zksync_consensus_storage::testonly::TestMemoryStorage; +use zksync_consensus_storage::{testonly::TestMemoryStorage, PersistentBatchStore}; mod fetch_batches; mod fetch_blocks; @@ -107,11 +111,14 @@ fn mk_netaddr( } fn mk_batch( - _rng: &mut R, + rng: &mut R, key: &attester::SecretKey, number: attester::BatchNumber, ) -> attester::Signed { - key.sign_msg(attester::Batch { number }) + key.sign_msg(attester::Batch { + number, + hash: rng.gen(), + }) } fn random_netaddr( @@ -132,6 +139,7 @@ fn random_batch_vote( ) -> Arc> { let batch = attester::Batch { number: attester::BatchNumber(rng.gen_range(0..1000)), + hash: rng.gen(), }; Arc::new(key.sign_msg(batch.to_owned())) } @@ -152,13 +160,14 @@ fn update_netaddr( } fn update_signature( - _rng: &mut R, + rng: &mut R, batch: &attester::Batch, key: &attester::SecretKey, 
batch_number_diff: i64, ) -> Arc> { let batch = attester::Batch { number: attester::BatchNumber((batch.number.0 as i64 + batch_number_diff) as u64), + hash: rng.gen(), }; Arc::new(key.sign_msg(batch.to_owned())) } @@ -549,23 +558,25 @@ async fn test_batch_votes() { want.insert(random_batch_vote(rng, k)); } votes.update(&attesters, &want.as_vec()).await.unwrap(); - assert_eq!(want.0, sub.borrow_and_update().0); + assert_eq!(want.0, sub.borrow_and_update().votes); - // newer batch number + // newer batch number, should be updated let k0v2 = update_signature(rng, &want.get(&keys[0]).msg, &keys[0], 1); - // same batch number + // same batch number, should be ignored let k1v2 = update_signature(rng, &want.get(&keys[1]).msg, &keys[1], 0); - // older batch number + // older batch number, should be ignored let k4v2 = update_signature(rng, &want.get(&keys[4]).msg, &keys[4], -1); - // first entry for a key in the config + // first entry for a key in the config, should be inserted let k6v1 = random_batch_vote(rng, &keys[6]); - // entry for a key outside of the config + // entry for a key outside of the config, should be ignored let k8 = rng.gen(); let k8v1 = random_batch_vote(rng, &k8); + // Update the ones we expect to succeed want.insert(k0v2.clone()); - want.insert(k1v2.clone()); want.insert(k6v1.clone()); + + // Send all of them to the votes let update = [ k0v2, k1v2, @@ -576,9 +587,9 @@ async fn test_batch_votes() { k8v1.clone(), ]; votes.update(&attesters, &update).await.unwrap(); - assert_eq!(want.0, sub.borrow_and_update().0); + assert_eq!(want.0, sub.borrow_and_update().votes); - // Invalid signature. + // Invalid signature, should be rejected. 
let mut k0v3 = mk_batch( rng, &keys[1], @@ -586,18 +597,151 @@ async fn test_batch_votes() { ); k0v3.key = keys[0].public(); assert!(votes.update(&attesters, &[Arc::new(k0v3)]).await.is_err()); - assert_eq!(want.0, sub.borrow_and_update().0); + assert_eq!(want.0, sub.borrow_and_update().votes); - // Duplicate entry in the update. + // Duplicate entry in the update, should be rejected. assert!(votes .update(&attesters, &[k8v1.clone(), k8v1]) .await .is_err()); - assert_eq!(want.0, sub.borrow_and_update().0); + assert_eq!(want.0, sub.borrow_and_update().votes); +} + +#[test] +fn test_batch_votes_quorum() { + abort_on_panic(); + let rng = &mut ctx::test_root(&ctx::RealClock).rng(); + + for _ in 0..10 { + let size = rng.gen_range(1..20); + let keys: Vec = (0..size).map(|_| rng.gen()).collect(); + let attesters = attester::Committee::new(keys.iter().map(|k| attester::WeightedAttester { + key: k.public(), + weight: rng.gen_range(1..=100), + })) + .unwrap(); + + let batch0 = rng.gen::(); + let batch1 = attester::Batch { + number: batch0.number.next(), + hash: rng.gen(), + }; + let mut batches = [(batch0, 0u64), (batch1, 0u64)]; + + let mut votes = BatchVotes::default(); + for sk in &keys { + // We need 4/5+1 for quorum, so let's say ~80% vote on the second batch. + let b = usize::from(rng.gen_range(0..100) < 80); + let batch = &batches[b].0; + let vote = sk.sign_msg(batch.clone()); + votes.update(&attesters, &[Arc::new(vote)]).unwrap(); + batches[b].1 += attesters.weight(&sk.public()).unwrap(); + + // Check that as soon as we have quorum it's found. 
+ if batches[b].1 >= attesters.threshold() { + let qs = votes.find_quorums(&attesters, |_| false); + assert!(!qs.is_empty(), "should find quorum"); + assert!(qs[0].message == *batch); + assert!(qs[0].signatures.keys().count() > 0); + } + } + + if let Some(quorum) = batches + .iter() + .find(|b| b.1 >= attesters.threshold()) + .map(|(b, _)| b) + { + // Check that a quorum can be skipped + assert!(votes + .find_quorums(&attesters, |b| b == quorum.number) + .is_empty()); + } else { + // Check that if there was no quorum then we don't find any. + assert!(votes.find_quorums(&attesters, |_| false).is_empty()); + } + + // Check that the minimum batch number prunes data. + let last_batch = batches[1].0.number; + + votes.set_min_batch_number(last_batch); + assert!(votes.votes.values().all(|v| v.msg.number >= last_batch)); + assert!(votes.support.keys().all(|n| *n >= last_batch)); + + votes.set_min_batch_number(last_batch.next()); + assert!(votes.votes.is_empty()); + assert!(votes.support.is_empty()); + } } -// TODO: This test is disabled because the logic for attesters to receive and sign batches is not implemented yet. -// It should be re-enabled once the logic is implemented. -// #[tokio::test(flavor = "multi_thread")] -// async fn test_batch_votes_propagation() { -// } +// +#[tokio::test(flavor = "multi_thread")] +async fn test_batch_votes_propagation() { + let _guard = set_timeout(time::Duration::seconds(10)); + abort_on_panic(); + let ctx = &ctx::test_root(&ctx::AffineClock::new(10.)); + let rng = &mut ctx.rng(); + + let mut setup = validator::testonly::Setup::new(rng, 10); + + let cfgs = testonly::new_configs(rng, &setup, 1); + + scope::run!(ctx, |ctx, s| async { + // All nodes share the same in-memory store + let store = TestMemoryStorage::new(ctx, &setup.genesis).await; + s.spawn_bg(store.runner.run(ctx)); + + // Push the first batch into the store so we have something to vote on. 
+ setup.push_blocks(rng, 2); + setup.push_batch(rng); + + let batch = setup.batches[0].clone(); + + store + .batches + .queue_batch(ctx, batch.clone(), setup.genesis.clone()) + .await + .unwrap(); + + let batch = attester::Batch { + number: batch.number, + hash: rng.gen(), + }; + + // Start all nodes. + let nodes: Vec = cfgs + .iter() + .enumerate() + .map(|(i, cfg)| { + let (node, runner) = testonly::Instance::new( + cfg.clone(), + store.blocks.clone(), + store.batches.clone(), + ); + s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); + node + }) + .collect(); + + // Cast votes on each node. It doesn't matter who signs where in this test, + // we just happen to know that we'll have as many nodes as attesters. + let attesters = setup.genesis.attesters.as_ref().unwrap(); + for (node, key) in nodes.iter().zip(setup.attester_keys.iter()) { + node.net + .batch_vote_publisher() + .publish(attesters, key, batch.clone()) + .await + .unwrap(); + } + + // Wait until one of the nodes collects a quorum over gossip and stores. + loop { + if let Some(qc) = store.im_batches.get_batch_qc(ctx, batch.number).await? { + assert_eq!(qc.message, batch); + return Ok(()); + } + ctx.sleep(time::Duration::milliseconds(100)).await?; + } + }) + .await + .unwrap(); +} diff --git a/node/actors/network/src/lib.rs b/node/actors/network/src/lib.rs index 7373b484..349ee5e2 100644 --- a/node/actors/network/src/lib.rs +++ b/node/actors/network/src/lib.rs @@ -132,9 +132,10 @@ impl Runner { // Update QC batches in the background. s.spawn(async { - // TODO: Handle this correctly. - let _ = self.net.gossip.update_batch_qc(ctx).await; - Ok(()) + match self.net.gossip.run_batch_qc_finder(ctx).await { + Err(ctx::Error::Canceled(_)) => Ok(()), + other => other, + } }); // Fetch missing batches in the background. 
diff --git a/node/actors/network/src/tests.rs b/node/actors/network/src/tests.rs index 239074c7..22d325e3 100644 --- a/node/actors/network/src/tests.rs +++ b/node/actors/network/src/tests.rs @@ -33,6 +33,7 @@ async fn test_metrics() { let mut encoded_metrics = String::new(); registry.encode(&mut encoded_metrics, vise::Format::OpenMetricsForPrometheus)?; tracing::info!("stats =\n{encoded_metrics}"); + Ok(()) }) .await diff --git a/node/libs/roles/src/attester/conv.rs b/node/libs/roles/src/attester/conv.rs index b101ddab..e3beec41 100644 --- a/node/libs/roles/src/attester/conv.rs +++ b/node/libs/roles/src/attester/conv.rs @@ -1,6 +1,6 @@ use super::{ - AggregateSignature, Batch, BatchNumber, BatchQC, Msg, MsgHash, MultiSig, PublicKey, Signature, - Signed, Signers, SyncBatch, WeightedAttester, + AggregateSignature, Batch, BatchHash, BatchNumber, BatchQC, Msg, MsgHash, MultiSig, PublicKey, + Signature, Signed, Signers, SyncBatch, WeightedAttester, }; use crate::{ proto::attester::{self as proto, Attestation}, @@ -11,16 +11,30 @@ use zksync_consensus_crypto::ByteFmt; use zksync_consensus_utils::enum_util::Variant; use zksync_protobuf::{read_map, read_required, required, ProtoFmt}; +impl ProtoFmt for BatchHash { + type Proto = proto::BatchHash; + fn read(r: &Self::Proto) -> anyhow::Result { + Ok(Self(ByteFmt::decode(required(&r.keccak256)?)?)) + } + fn build(&self) -> Self::Proto { + Self::Proto { + keccak256: Some(self.0.encode()), + } + } +} + impl ProtoFmt for Batch { type Proto = proto::Batch; fn read(r: &Self::Proto) -> anyhow::Result { Ok(Self { - number: BatchNumber(*required(&r.number).context("proposal")?), + hash: read_required(&r.hash).context("hash")?, + number: BatchNumber(*required(&r.number).context("number")?), }) } fn build(&self) -> Self::Proto { Self::Proto { number: Some(self.number.0), + hash: Some(self.hash.build()), } } } diff --git a/node/libs/roles/src/attester/keys/secret_key.rs b/node/libs/roles/src/attester/keys/secret_key.rs index 
e6c1c278..2e70302e 100644 --- a/node/libs/roles/src/attester/keys/secret_key.rs +++ b/node/libs/roles/src/attester/keys/secret_key.rs @@ -1,5 +1,5 @@ use super::{PublicKey, Signature}; -use crate::attester::{Batch, Msg, MsgHash, Signed}; +use crate::attester::{Msg, MsgHash, Signed}; use std::{fmt, sync::Arc}; use zksync_consensus_crypto::{secp256k1, ByteFmt, Text, TextFmt}; use zksync_consensus_utils::enum_util::Variant; @@ -22,7 +22,7 @@ impl SecretKey { } /// Signs a batch message. - pub fn sign_msg(&self, msg: Batch) -> Signed + pub fn sign_msg(&self, msg: V) -> Signed where V: Variant, { diff --git a/node/libs/roles/src/attester/messages/batch.rs b/node/libs/roles/src/attester/messages/batch.rs index 4e38ea62..67d7e56c 100644 --- a/node/libs/roles/src/attester/messages/batch.rs +++ b/node/libs/roles/src/attester/messages/batch.rs @@ -4,6 +4,7 @@ use crate::{ validator::{Genesis, Payload}, }; use anyhow::{ensure, Context as _}; +use zksync_consensus_crypto::{keccak256::Keccak256, ByteFmt, Text, TextFmt}; use zksync_consensus_utils::enum_util::Variant; /// A batch of L2 blocks used for the peers to fetch and keep in sync. @@ -52,13 +53,34 @@ impl std::ops::Add for BatchNumber { } } +/// Hash of the L1 batch. +#[derive(Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct BatchHash(pub Keccak256); + +impl TextFmt for BatchHash { + fn decode(text: Text) -> anyhow::Result { + text.strip("batch:keccak256:")?.decode_hex().map(Self) + } + + fn encode(&self) -> String { + format!("batch:keccak256:{}", hex::encode(ByteFmt::encode(&self.0))) + } +} + +impl std::fmt::Debug for BatchHash { + fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + fmt.write_str(&TextFmt::encode(self)) + } +} + /// A message containing information about a batch of blocks. /// It is signed by the attesters and then propagated through the gossip network. #[derive(Clone, Debug, PartialEq, Eq, Hash, PartialOrd)] pub struct Batch { /// Header of the batch. 
pub number: BatchNumber, - // TODO: Hash of the batch. + /// Hash of the batch. + pub hash: BatchHash, } /// A certificate for a batch of L2 blocks to be sent to L1. @@ -112,11 +134,11 @@ pub enum BatchQCAddError { impl BatchQC { /// Create a new empty instance for a given `Batch` message. - pub fn new(message: Batch) -> anyhow::Result { - Ok(Self { + pub fn new(message: Batch) -> Self { + Self { message, signatures: attester::MultiSig::default(), - }) + } } /// Add a attester's signature. diff --git a/node/libs/roles/src/attester/messages/msg.rs b/node/libs/roles/src/attester/messages/msg.rs index 2cc118ca..cdf8b1e6 100644 --- a/node/libs/roles/src/attester/messages/msg.rs +++ b/node/libs/roles/src/attester/messages/msg.rs @@ -14,6 +14,9 @@ pub enum Msg { impl Msg { /// Returns the hash of the message. + /// + /// TODO: Eventually this should be a hash over an ABI encoded payload that + /// can be verified in Solidity. pub fn hash(&self) -> MsgHash { MsgHash(keccak256::Keccak256::new(&zksync_protobuf::canonical(self))) } @@ -151,12 +154,18 @@ impl Committee { threshold(self.total_weight()) } + /// Look up the voting weight of a single attester by its public key. + /// + /// Returns `None` if the key is not part of the committee. + pub fn weight(&self, key: &attester::PublicKey) -> Option { + self.index(key).map(|i| self.vec[i].weight) + } + /// Compute the sum of weights of as a list of public keys. /// /// The method assumes that the keys are unique and does not de-duplicate. pub fn weight_of_keys<'a>(&self, keys: impl Iterator) -> u64 { - keys.filter_map(|pk| self.index(pk).map(|i| self.vec[i].weight)) - .sum() + keys.filter_map(|key| self.weight(key)).sum() } /// Compute the sum of weights of signers given as a bit vector. @@ -262,5 +271,8 @@ pub struct WeightedAttester { /// Attester key pub key: attester::PublicKey, /// Attester weight inside the Committee. - pub weight: u64, + pub weight: Weight, } + +/// Voting weight. 
+pub type Weight = u64; diff --git a/node/libs/roles/src/attester/testonly.rs b/node/libs/roles/src/attester/testonly.rs index 940f34f1..b4e53f79 100644 --- a/node/libs/roles/src/attester/testonly.rs +++ b/node/libs/roles/src/attester/testonly.rs @@ -1,6 +1,7 @@ use super::{ - AggregateMultiSig, AggregateSignature, Batch, BatchNumber, BatchQC, Committee, Msg, MsgHash, - MultiSig, PublicKey, SecretKey, Signature, Signed, Signers, SyncBatch, WeightedAttester, + AggregateMultiSig, AggregateSignature, Batch, BatchHash, BatchNumber, BatchQC, Committee, Msg, + MsgHash, MultiSig, PublicKey, SecretKey, Signature, Signed, Signers, SyncBatch, + WeightedAttester, }; use crate::validator::Payload; use bit_vec::BitVec; @@ -48,7 +49,10 @@ impl Distribution for Standard { impl Distribution for Standard { fn sample(&self, rng: &mut R) -> Batch { - Batch { number: rng.gen() } + Batch { + number: rng.gen(), + hash: rng.gen(), + } } } @@ -69,15 +73,17 @@ impl Distribution for Standard { } } +impl Distribution for Standard { + fn sample(&self, rng: &mut R) -> BatchHash { + BatchHash(rng.gen()) + } +} + impl Distribution for Standard { fn sample(&self, rng: &mut R) -> BatchQC { - let mut signatures = MultiSig::default(); - for _ in 0..rng.gen_range(0..5) { - signatures.add(rng.gen(), rng.gen()); - } BatchQC { message: rng.gen(), - signatures, + signatures: rng.gen(), } } } @@ -100,6 +106,16 @@ impl Distribution for Standard { } } +impl Distribution for Standard { + fn sample(&self, rng: &mut R) -> MultiSig { + let mut sig = MultiSig::default(); + for _ in 0..rng.gen_range(0..5) { + sig.add(rng.gen(), rng.gen()); + } + sig + } +} + impl Distribution for Standard { fn sample(&self, rng: &mut R) -> AggregateSignature { AggregateSignature(rng.gen()) @@ -115,7 +131,11 @@ impl Distribution for Standard { } } -impl> Distribution> for Standard { +impl Distribution> for Standard +where + V: Variant, + Standard: Distribution, +{ fn sample(&self, rng: &mut R) -> Signed { 
rng.gen::().sign_msg(rng.gen()) } diff --git a/node/libs/roles/src/attester/tests.rs b/node/libs/roles/src/attester/tests.rs index c167b962..b7d2f4c0 100644 --- a/node/libs/roles/src/attester/tests.rs +++ b/node/libs/roles/src/attester/tests.rs @@ -135,7 +135,7 @@ fn test_agg_signature_verify() { } fn make_batch_msg(rng: &mut impl Rng) -> Batch { - Batch { number: rng.gen() } + rng.gen() } #[test] @@ -169,7 +169,7 @@ fn test_batch_qc() { // Create QCs with increasing number of attesters. for i in 0..setup1.attester_keys.len() + 1 { - let mut qc = BatchQC::new(make_batch_msg(rng)).unwrap(); + let mut qc = BatchQC::new(make_batch_msg(rng)); for key in &setup1.attester_keys[0..i] { qc.add(&key.sign_msg(qc.message.clone()), &setup1.genesis) .unwrap(); @@ -202,7 +202,7 @@ fn test_attester_committee_weights() { let sums = [1000, 1600, 2400, 8400, 9300, 10000]; let msg = make_batch_msg(rng); - let mut qc = BatchQC::new(msg.clone()).unwrap(); + let mut qc = BatchQC::new(msg.clone()); for (n, weight) in sums.iter().enumerate() { let key = &setup.attester_keys[n]; qc.add(&key.sign_msg(msg.clone()), &setup.genesis).unwrap(); diff --git a/node/libs/roles/src/proto/attester.proto b/node/libs/roles/src/proto/attester.proto index efdea6ff..38743353 100644 --- a/node/libs/roles/src/proto/attester.proto +++ b/node/libs/roles/src/proto/attester.proto @@ -10,8 +10,14 @@ message SyncBatch { optional bytes proof = 3; // required } +message BatchHash { + optional bytes keccak256 = 1; // required +} + + message Batch { optional uint64 number = 1; // required + optional BatchHash hash = 2; // required } message BatchQC { diff --git a/node/libs/roles/src/validator/messages/consensus.rs b/node/libs/roles/src/validator/messages/consensus.rs index 2056a380..71738790 100644 --- a/node/libs/roles/src/validator/messages/consensus.rs +++ b/node/libs/roles/src/validator/messages/consensus.rs @@ -527,5 +527,8 @@ pub struct WeightedValidator { /// Validator key pub key: validator::PublicKey, /// 
Validator weight inside the Committee. - pub weight: u64, + pub weight: Weight, } + +/// Voting weight; +pub type Weight = u64; diff --git a/node/libs/storage/src/batch_store.rs b/node/libs/storage/src/batch_store.rs index 898a94ab..beda81d6 100644 --- a/node/libs/storage/src/batch_store.rs +++ b/node/libs/storage/src/batch_store.rs @@ -257,6 +257,19 @@ impl BatchStore { Ok(()) } + /// Wait until the database has a batch, then attach the corresponding QC. + pub async fn queue_batch_qc(&self, ctx: &ctx::Ctx, qc: attester::BatchQC) -> ctx::Result<()> { + // The `store_qc` implementation in `zksync-era` retries the insertion of the QC if the payload + // isn't yet available, but to be safe we can wait for the availability signal here as well. + sync::wait_for(ctx, &mut self.persistent.persisted(), |persisted| { + qc.message.number < persisted.next() + }) + .await?; + // Now it's definitely safe to store it. + self.persistent.store_qc(ctx, qc).await?; + Ok(()) + } + /// Waits until the given batch is queued (in memory, or persisted). /// Note that it doesn't mean that the batch is actually available, as old batches might get pruned. pub async fn wait_until_queued( diff --git a/node/libs/storage/src/testonly/mod.rs b/node/libs/storage/src/testonly/mod.rs index b3002180..1c11e024 100644 --- a/node/libs/storage/src/testonly/mod.rs +++ b/node/libs/storage/src/testonly/mod.rs @@ -40,6 +40,10 @@ pub struct TestMemoryStorage { pub batches: Arc, /// In-memory storage runner. pub runner: TestMemoryStorageRunner, + /// The in-memory block store representing the persistent store. + pub im_blocks: in_memory::BlockStore, + /// The in-memory batch store representing the persistent store. + pub im_batches: in_memory::BatchStore, } /// Test-only memory storage runner wrapping both block and batch store runners. @@ -74,14 +78,7 @@ impl TestMemoryStorageRunner { impl TestMemoryStorage { /// Constructs a new in-memory store for both blocks and batches with their respective runners. 
pub async fn new(ctx: &ctx::Ctx, genesis: &validator::Genesis) -> Self { - let (blocks, blocks_runner) = new_store(ctx, genesis).await; - let (batches, batches_runner) = new_batch_store(ctx).await; - let runner = TestMemoryStorageRunner::new(blocks_runner, batches_runner).await; - Self { - blocks, - batches, - runner, - } + Self::new_store_with_first_block(ctx, genesis, genesis.first_block).await } /// Constructs a new in-memory store with a custom expected first block @@ -91,50 +88,37 @@ impl TestMemoryStorage { genesis: &validator::Genesis, first: validator::BlockNumber, ) -> Self { - let (blocks, blocks_runner) = new_store_with_first(ctx, genesis, first).await; - let (batches, batches_runner) = new_batch_store(ctx).await; + let im_blocks = in_memory::BlockStore::new(genesis.clone(), first); + let im_batches = in_memory::BatchStore::new(attester::BatchNumber(0)); + Self::new_with_im(ctx, im_blocks, im_batches).await + } + + /// Constructs a new in-memory store for both blocks and batches with their respective runners. + async fn new_with_im( + ctx: &ctx::Ctx, + im_blocks: in_memory::BlockStore, + im_batches: in_memory::BatchStore, + ) -> Self { + let (blocks, blocks_runner) = BlockStore::new(ctx, Box::new(im_blocks.clone())) + .await + .unwrap(); + + let (batches, batches_runner) = BatchStore::new(ctx, Box::new(im_batches.clone())) + .await + .unwrap(); + let runner = TestMemoryStorageRunner::new(blocks_runner, batches_runner).await; + Self { blocks, batches, runner, + im_blocks, + im_batches, } } } -/// Constructs a new in-memory store. -async fn new_store( - ctx: &ctx::Ctx, - genesis: &validator::Genesis, -) -> (Arc, BlockStoreRunner) { - new_store_with_first(ctx, genesis, genesis.first_block).await -} - -/// Constructs a new in-memory batch store. 
-async fn new_batch_store(ctx: &ctx::Ctx) -> (Arc, BatchStoreRunner) { - BatchStore::new( - ctx, - Box::new(in_memory::BatchStore::new(attester::BatchNumber(0))), - ) - .await - .unwrap() -} - -/// Constructs a new in-memory store with a custom expected first block -/// (i.e. possibly different than `genesis.fork.first_block`). -async fn new_store_with_first( - ctx: &ctx::Ctx, - genesis: &validator::Genesis, - first: validator::BlockNumber, -) -> (Arc, BlockStoreRunner) { - BlockStore::new( - ctx, - Box::new(in_memory::BlockStore::new(genesis.clone(), first)), - ) - .await - .unwrap() -} - /// Dumps all the blocks stored in `store`. pub async fn dump(ctx: &ctx::Ctx, store: &dyn PersistentBlockStore) -> Vec { let genesis = store.genesis(ctx).await.unwrap();