feat: Collect votes for batch QC and save into store (BFT-477) (#145)
## What ❔

- [x] Added `BatchHash` and `Batch::hash` to give meaning to the
attestation
- [x] Refactored `BatchVotes` to look for a quorum over a height+hash combo (see the sketch after this list)
- [x] Added a background task to listen to changes in the votes, look
for a quorum and save it to the DB
- [x] Test the quorum logic
- [x] Test the propagation
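
To make the quorum item above concrete, here is a minimal, self-contained sketch of the bookkeeping the refactored `BatchVotes` performs: it keeps at most one (the latest) vote per attester and accumulates weight per `(batch number, batch hash)` pair, declaring a quorum once any pair reaches the threshold. The types below (`AttesterKey`, `Weight`, plain `std` maps) are simplified stand-ins, not the actual `attester`/`im` types used in the diff further down.

```rust
use std::collections::{BTreeMap, HashMap};

type AttesterKey = &'static str;
type BatchNumber = u64;
type BatchHash = [u8; 32];
type Weight = u64;

/// Simplified vote register: one (latest) vote per attester, plus the
/// accumulated weight behind every (batch number, batch hash) pair.
#[derive(Default)]
struct Votes {
    votes: HashMap<AttesterKey, (BatchNumber, BatchHash)>,
    support: BTreeMap<BatchNumber, HashMap<BatchHash, Weight>>,
}

impl Votes {
    /// Discard any previous vote by `key` (assuming a constant weight per
    /// attester), then credit `weight` to the new (number, hash) pair.
    fn add(&mut self, key: AttesterKey, number: BatchNumber, hash: BatchHash, weight: Weight) {
        if let Some((old_number, old_hash)) = self.votes.remove(key) {
            if let Some(batch) = self.support.get_mut(&old_number) {
                if let Some(w) = batch.get_mut(&old_hash) {
                    *w = w.saturating_sub(weight);
                    if *w == 0 {
                        batch.remove(&old_hash);
                    }
                }
                if batch.is_empty() {
                    self.support.remove(&old_number);
                }
            }
        }
        *self.support.entry(number).or_default().entry(hash).or_default() += weight;
        self.votes.insert(key, (number, hash));
    }

    /// Every (number, hash) pair whose accumulated weight reaches the threshold.
    fn quorums(&self, threshold: Weight) -> Vec<(BatchNumber, BatchHash)> {
        let mut found = vec![];
        for (number, hashes) in &self.support {
            for (hash, weight) in hashes {
                if *weight >= threshold {
                    found.push((*number, *hash));
                }
            }
        }
        found
    }
}

fn main() {
    let mut votes = Votes::default();
    let hash = [1u8; 32];
    votes.add("a1", 10, hash, 1);
    votes.add("a2", 10, hash, 1);
    // With unit weights and a threshold of 2, batch 10 now has a quorum on `hash`.
    assert_eq!(votes.quorums(2), vec![(10u64, hash)]);
}
```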

### Caveat

`BatchVotes` still only allows one vote per attester and thus doesn't
fully handle multiple outstanding L1 batches. That will come later, once
we subscribe to notifications about the highest finalised batch number
from the main node API (which doesn't exist yet); that number will serve
as our beacon for which range of votes to accept. For now, as a defensive
measure, we accept only one vote per attester, which means that if
attesters keep voting on different heights, always out of sync with each
other, they might never form a quorum; for example, with four
equal-weight attesters and a threshold of three, votes spread across
batches 10 to 13 leave every height with a weight of one. That is
considered okay at this point: we save what we can and see how it works.
One-minute batches should be long enough not to cause trouble for gossip.

## Why ❔

With these changes the node reacts to votes coming in over gossip, looks
for a quorum over the latest L1 batch, and saves the resulting QC into the store.
aakoshh authored Jul 8, 2024
1 parent 92fe49c commit 96e5e69
Showing 16 changed files with 491 additions and 164 deletions.
151 changes: 140 additions & 11 deletions node/actors/network/src/gossip/batch_votes.rs
@@ -4,19 +4,46 @@ use std::{collections::HashSet, sync::Arc};
use zksync_concurrency::sync;
use zksync_consensus_roles::attester::{self, Batch};

/// Mapping from attester::PublicKey to a signed attester::Batch message.
/// Represents the current state of the node's knowledge about the attester votes.
///
/// Eventually this data structure will have to track voting potentially happening
/// simultaneously on multiple heights, if we decrease the batch interval to be
/// several seconds, instead of a minute. By that point, the replicas should be
/// following the main node (or L1) to know what is the highest finalized batch,
/// which will act as a floor to the batch number we have to track here. It will
/// also help to protect ourselves from DoS attacks by malicious attesters casting
/// votes far into the future.
///
/// For now, however, we just want a best effort where if we find a quorum, we
/// save it to the database, if not, we move on. For that, a simple protection
/// mechanism is to only allow one active vote per attester, which means any
/// previous vote can be removed when a new one is added.
#[derive(Clone, Default, PartialEq, Eq)]
pub(crate) struct BatchVotes(
pub(super) im::HashMap<attester::PublicKey, Arc<attester::Signed<Batch>>>,
);
pub(crate) struct BatchVotes {
/// The latest vote received from each attester. We only keep the last one
/// for now, hoping that with 1 minute batches there's plenty of time for
/// the quorum to be reached, but eventually we'll have to allow multiple
/// votes across different heights.
pub(crate) votes: im::HashMap<attester::PublicKey, Arc<attester::Signed<Batch>>>,

/// Total weight of votes at different heights and hashes.
///
/// We will be looking for any hash that reaches a quorum threshold at any of the heights.
/// At that point we can remove all earlier heights, considering it final. In the future
/// we can instead keep heights until they are observed on the main node (or L1).
pub(crate) support:
im::OrdMap<attester::BatchNumber, im::HashMap<attester::BatchHash, attester::Weight>>,

/// The minimum batch number for which we are still interested in votes.
pub(crate) min_batch_number: attester::BatchNumber,
}

impl BatchVotes {
/// Returns a set of entries of `self` which are newer than the entries in `b`.
/// Returns a set of votes of `self` which are newer than the entries in `b`.
pub(super) fn get_newer(&self, b: &Self) -> Vec<Arc<attester::Signed<Batch>>> {
let mut newer = vec![];
for (k, v) in &self.0 {
if let Some(bv) = b.0.get(k) {
for (k, v) in &self.votes {
if let Some(bv) = b.votes.get(k) {
if v.msg <= bv.msg {
continue;
}
@@ -47,23 +74,119 @@ impl BatchVotes {
anyhow::bail!("duplicate entry for {:?}", d.key);
}
done.insert(d.key.clone());
if !attesters.contains(&d.key) {

if d.msg.number < self.min_batch_number {
continue;
}

let Some(weight) = attesters.weight(&d.key) else {
// We just skip the entries we are not interested in.
// For now the set of attesters is static, so we could treat this as an error,
// however we eventually want the attester set to be dynamic.
continue;
}
if let Some(x) = self.0.get(&d.key) {
};

// If we already have a newer vote for this key, we can ignore this one.
if let Some(x) = self.votes.get(&d.key) {
if d.msg <= x.msg {
continue;
}
}

// Check the signature before insertion.
d.verify()?;
self.0.insert(d.key.clone(), d.clone());

self.add(d.clone(), weight);

changed = true;
}
Ok(changed)
}

/// Check if we have achieved quorum for any of the batch hashes.
///
/// The return value is a vector because eventually we will be potentially waiting for
/// quorums on multiple heights simultaneously.
///
/// For repeated queries we can supply a skip list of heights for which we already saved the QC.
pub(super) fn find_quorums(
&self,
attesters: &attester::Committee,
skip: impl Fn(attester::BatchNumber) -> bool,
) -> Vec<attester::BatchQC> {
let threshold = attesters.threshold();
self.support
.iter()
.filter(|(number, _)| !skip(**number))
.flat_map(|(number, candidates)| {
candidates
.iter()
.filter(|(_, weight)| **weight >= threshold)
.map(|(hash, _)| {
let sigs = self
.votes
.values()
.filter(|vote| vote.msg.hash == *hash)
.map(|vote| (vote.key.clone(), vote.sig.clone()))
.fold(attester::MultiSig::default(), |mut sigs, (key, sig)| {
sigs.add(key, sig);
sigs
});
attester::BatchQC {
message: Batch {
number: *number,
hash: *hash,
},
signatures: sigs,
}
})
})
.collect()
}

/// Set the minimum batch number for which we admit votes.
///
/// Discards data about earlier heights.
pub(super) fn set_min_batch_number(&mut self, min_batch_number: attester::BatchNumber) {
self.min_batch_number = min_batch_number;
self.votes.retain(|_, v| v.msg.number >= min_batch_number);
if let Some(prev) = min_batch_number.prev() {
let (_, support) = self.support.split(&prev);
self.support = support;
}
}

/// Add an already validated vote from an attester into the register.
fn add(&mut self, vote: Arc<attester::Signed<Batch>>, weight: attester::Weight) {
self.remove(&vote.key, weight);

let batch = self.support.entry(vote.msg.number).or_default();
let support = batch.entry(vote.msg.hash).or_default();
*support = support.saturating_add(weight);

self.votes.insert(vote.key.clone(), vote);
}

/// Remove any existing vote.
///
/// This is for DoS protection, until we have better control over the acceptable vote range.
fn remove(&mut self, key: &attester::PublicKey, weight: attester::Weight) {
let Some(vote) = self.votes.remove(key) else {
return;
};

let batch = self.support.entry(vote.msg.number).or_default();
let support = batch.entry(vote.msg.hash).or_default();
*support = support.saturating_sub(weight);

if *support == 0u64 {
batch.remove(&vote.msg.hash);
}

if batch.is_empty() {
self.support.remove(&vote.msg.number);
}
}
}

/// Watch wrapper of BatchVotes,
@@ -99,6 +222,12 @@ impl BatchVotesWatch {
}
Ok(())
}

/// Set the minimum batch number on the votes and discard old data.
pub(crate) async fn set_min_batch_number(&self, min_batch_number: attester::BatchNumber) {
let this = self.0.lock().await;
this.send_modify(|votes| votes.set_min_batch_number(min_batch_number));
}
}

/// Wrapper around [BatchVotesWatch] to publish votes over batches signed by an attester key.
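
`BatchVotesWatch` follows the usual watch-channel pattern: writers mutate the shared `BatchVotes` in place via `send_modify`, and subscribers (such as the QC finder task added in `gossip/mod.rs` below) wake up on every change. The crate uses the `zksync_concurrency::sync` wrappers rather than tokio directly; the sketch below uses plain `tokio::sync::watch` only to show the shape of that notify-and-react loop, and the `Register` type is a made-up stand-in.

```rust
use tokio::sync::watch;

/// Stand-in for the shared vote register.
#[derive(Clone, Default)]
struct Register {
    highest_quorum: Option<u64>,
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel(Register::default());

    // Background task: wait for one change, then act on the new state,
    // roughly like run_batch_qc_finder waits on sync::changed in a loop.
    let finder = tokio::spawn(async move {
        rx.changed().await.expect("sender alive");
        let reg = rx.borrow_and_update().clone();
        if let Some(n) = reg.highest_quorum {
            println!("would queue the QC for batch {n} into the store");
        }
    });

    // Writer side: mutate the shared state in place and notify subscribers,
    // the way BatchVotesWatch::update / set_min_batch_number use send_modify.
    tx.send_modify(|reg| reg.highest_quorum = Some(1));

    finder.await.unwrap();
}
```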
86 changes: 26 additions & 60 deletions node/actors/network/src/gossip/mod.rs
@@ -17,11 +17,10 @@ use self::batch_votes::BatchVotesWatch;
use crate::{gossip::ValidatorAddrsWatch, io, pool::PoolWatch, Config, MeteredStreamStats};
use anyhow::Context as _;
use fetch::RequestItem;
use im::HashMap;
use std::sync::{atomic::AtomicUsize, Arc};
pub(crate) use validator_addrs::*;
use zksync_concurrency::{ctx, ctx::channel, scope, sync};
use zksync_consensus_roles::{attester, node, validator};
use zksync_consensus_roles::{node, validator};
use zksync_consensus_storage::{BatchStore, BlockStore};

mod batch_votes;
@@ -57,10 +56,6 @@ pub(crate) struct Network {
///
/// These are blocks that this node wants to request from remote peers via RPC.
pub(crate) fetch_queue: fetch::Queue,
/// Last viewed QC.
pub(crate) last_viewed_qc: Option<attester::BatchQC>,
/// L1 batch qc.
pub(crate) batch_qc: HashMap<attester::BatchNumber, attester::BatchQC>,
/// TESTONLY: how many time push_validator_addrs rpc was called by the peers.
pub(crate) push_validator_addrs_calls: AtomicUsize,
}
@@ -82,8 +77,6 @@ impl Network {
outbound: PoolWatch::new(cfg.gossip.static_outbound.keys().cloned().collect(), 0),
validator_addrs: ValidatorAddrsWatch::default(),
batch_votes: Arc::new(BatchVotesWatch::default()),
batch_qc: HashMap::new(),
last_viewed_qc: None,
cfg,
fetch_queue: fetch::Queue::default(),
block_store,
@@ -155,63 +148,36 @@
.await;
}

/// Task that keeps hearing about new votes and updates the L1 batch qc.
/// Task that keeps hearing about new votes and looks for an L1 batch qc.
/// It will propagate the QC if there are enough votes.
pub(crate) async fn update_batch_qc(&self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
// TODO This is not a good way to do this, we shouldn't be verifying the QC every time
// Can we get only the latest votes?
let attesters = self.genesis().attesters.as_ref().context("attesters")?;
pub(crate) async fn run_batch_qc_finder(&self, ctx: &ctx::Ctx) -> ctx::Result<()> {
let Some(attesters) = self.genesis().attesters.as_ref() else {
return Ok(());
};
let mut sub = self.batch_votes.subscribe();
loop {
let mut sub = self.batch_votes.subscribe();
let votes = sync::changed(ctx, &mut sub)
.await
.context("batch votes")?
.clone();

// Check next QC to collect votes for.
let new_qc = self
.last_viewed_qc
.clone()
.map(|qc| {
attester::BatchQC::new(attester::Batch {
number: qc.message.number.next(),
})
})
.unwrap_or_else(|| {
attester::BatchQC::new(attester::Batch {
number: attester::BatchNumber(0),
})
})
.context("new qc")?;

// Check votes for the correct QC.
for (_, sig) in votes.0 {
if self
.batch_qc
.clone()
.entry(new_qc.message.number)
.or_insert_with(|| attester::BatchQC::new(new_qc.message.clone()).expect("qc"))
.add(&sig, self.genesis())
.is_err()
{
// TODO: Should we ban the peer somehow?
continue;
}
}
// In the future when we might be gossiping about multiple batches at the same time,
// we can collect the ones we submitted into a skip list until we see them confirmed
// on L1 and we can finally increase the minimum as well.
let quorums = {
let votes = sync::changed(ctx, &mut sub).await?;
votes.find_quorums(attesters, |_| false)
};

let weight = attesters.weight_of_keys(
self.batch_qc
.get(&new_qc.message.number)
.context("last qc")?
.signatures
.keys(),
);
for qc in quorums {
// In the future this should come from confirmations, but for now it's best effort, so we can forget ASAP.
// TODO: An initial value could be looked up in the database even now.
let next_batch_number = qc.message.number.next();

if weight < attesters.threshold() {
return Ok(());
};
self.batch_store
.queue_batch_qc(ctx, qc)
.await
.context("queue_batch_qc")?;

// If we have enough weight, we can update the last viewed QC and propagate it.
self.batch_votes
.set_min_batch_number(next_batch_number)
.await;
}
}
}
}
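
`find_quorums` takes a `skip` closure which is always `|_| false` for now; the comments above sketch its intended use, namely filtering out heights whose QC has already been submitted when querying repeatedly. A minimal illustration of that shape, with plain `u64` batch numbers and a dummy `find_quorums` standing in for the real method:

```rust
use std::collections::BTreeSet;

// Dummy stand-in: yields candidate quorum heights, minus the skipped ones.
fn find_quorums(skip: impl Fn(u64) -> bool) -> Vec<u64> {
    (0..3u64).filter(|n| !skip(*n)).collect()
}

fn main() {
    let mut saved: BTreeSet<u64> = BTreeSet::new();

    // First pass: nothing saved yet, so every quorum found is new.
    let quorums = find_quorums(|n| saved.contains(&n));
    for n in quorums {
        // ... queue the QC for batch `n` into the store ...
        saved.insert(n);
    }

    // Later passes: already-saved heights are filtered out by the skip closure.
    assert!(find_quorums(|n| saved.contains(&n)).is_empty());
}
```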
12 changes: 12 additions & 0 deletions node/actors/network/src/gossip/runner.rs
@@ -32,6 +32,9 @@ impl rpc::Handler<rpc::push_validator_addrs::Rpc> for PushValidatorAddrsServer<'
}
}

/// Receive the snapshot of known batch votes from a remote peer.
///
/// The server receives the *diff* from remote peers, which it merges into the common register.
struct PushBatchVotesServer<'a>(&'a Network);

#[async_trait::async_trait]
@@ -53,8 +56,10 @@ impl rpc::Handler<rpc::push_batch_votes::Rpc> for PushBatchVotesServer<'_> {
}
}

/// Represents what we know about the state of available blocks on the remote peer.
struct PushBlockStoreStateServer<'a> {
state: sync::watch::Sender<BlockStoreState>,
/// The network is required for the verification of messages.
net: &'a Network,
}

@@ -71,11 +76,13 @@ impl<'a> PushBlockStoreStateServer<'a> {
}
}

/// Represents what we know about the state of available batches on the remote peer.
struct PushBatchStoreStateServer {
state: sync::watch::Sender<BatchStoreState>,
}

impl PushBatchStoreStateServer {
/// Start out not knowing anything about the remote peer.
fn new() -> Self {
Self {
state: sync::watch::channel(BatchStoreState {
Expand Down Expand Up @@ -194,6 +201,8 @@ impl Network {
self.cfg.rpc.get_batch_rate,
)
.add_server(ctx, rpc::ping::Server, rpc::ping::RATE);

// If there is an attester committee, then set up pushing batch votes to the peer.
if self.genesis().attesters.as_ref().is_some() {
let push_signature_client = rpc::Client::<rpc::push_batch_votes::Rpc>::new(
ctx,
Expand All @@ -208,11 +217,14 @@ impl Network {
// Push L1 batch votes updates to peer.
s.spawn::<()>(async {
let push_signature_client = push_signature_client;
// Snapshot of the batches when we last pushed to the peer.
let mut old = BatchVotes::default();
// Subscribe to what we know about the state of the whole network.
let mut sub = self.batch_votes.subscribe();
sub.mark_changed();
loop {
let new = sync::changed(ctx, &mut sub).await?.clone();
// Get the *new* votes, which haven't been pushed before.
let diff = new.get_newer(&old);
if diff.is_empty() {
continue;
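
The push loop above sends only what changed since the last push: it remembers the snapshot it last sent (`old`) and forwards `get_newer(&old)` on every wake-up. Below is a simplified stand-in for that diffing step, with plain `std` maps and batch numbers instead of the crate's `im` maps and signed `Batch` messages ("newer" here just means a higher batch number per attester key).

```rust
use std::collections::HashMap;

/// Keep only the (key, number) pairs in `new` that are missing from `old`
/// or carry a higher batch number than the one in `old`.
fn get_newer(new: &HashMap<String, u64>, old: &HashMap<String, u64>) -> Vec<(String, u64)> {
    new.iter()
        .filter(|(k, n)| old.get(*k).map_or(true, |o| *n > o))
        .map(|(k, n)| (k.clone(), *n))
        .collect()
}

fn main() {
    let old = HashMap::from([("a1".to_string(), 10), ("a2".to_string(), 10)]);
    let new = HashMap::from([("a1".to_string(), 11), ("a2".to_string(), 10)]);
    // Only a1's vote advanced, so only that entry would be pushed to the peer.
    assert_eq!(get_newer(&new, &old), vec![("a1".to_string(), 11u64)]);
}
```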