feat: Poll the main node for the next batch to sign (BFT-496) (#2544)
## What ❔

Injects an `AttestationStatusWatch` into the `Executor`. On the main node the watch is backed by an `AttestationStatusRunner` that polls `BatchStore::next_batch_to_attest`; on external nodes it is backed by a runner that calls `MainNodeApi::fetch_attestation_status`.
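
For illustration, here is a minimal sketch of the poll-into-watch pattern this introduces (the names `StatusClient`, `AttestationStatus`, and `run_poller` are hypothetical, not the actual `era-consensus` API): a background task periodically asks a status source for the next batch to attest and publishes changes through a watch channel that the executor can observe.

```rust
use std::time::Duration;

use async_trait::async_trait;
use tokio::sync::watch;

/// Hypothetical status value published through the watch.
#[derive(Clone, Debug, PartialEq)]
struct AttestationStatus {
    next_batch_to_attest: u64,
}

/// Hypothetical source of the status: backed by the batch store on the main
/// node, or by the main node's HTTP RPC on external nodes.
#[async_trait]
trait StatusClient: Send + Sync {
    async fn next_batch_to_attest(&self) -> anyhow::Result<Option<u64>>;
}

/// Poll the client and publish updates; subscribers only wake up on changes.
async fn run_poller(
    client: Box<dyn StatusClient>,
    tx: watch::Sender<Option<AttestationStatus>>,
    poll_interval: Duration,
) {
    loop {
        match client.next_batch_to_attest().await {
            Ok(Some(n)) => {
                tx.send_if_modified(|current| {
                    let next = Some(AttestationStatus { next_batch_to_attest: n });
                    if *current != next {
                        *current = next;
                        true
                    } else {
                        false
                    }
                });
            }
            // Status not available yet (e.g. genesis batch not known): keep polling.
            Ok(None) => {}
            // Transient RPC failures are logged and retried, not fatal.
            Err(err) => tracing::warn!("failed to poll attestation status: {err}"),
        }
        tokio::time::sleep(poll_interval).await;
    }
}
```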

TODO: 
- [x] Rebase after #2539 is merged
- [x] Update the `era-consensus` dependency once matter-labs/era-consensus#161 is merged and `0.1.0-rc.5` is published

### Test failure - pruning the main node

The following test never finished:
```shell
zk test rust -p zksync_node_consensus tests::test_with_pruning::case_0 --no-capture
```
A little extra logging revealed that the `AttestationStatusRunner` never gets initialised, because the blocks are pruned before it can read the result, i.e. `ConsensusDal::batch_of_block(genesis.first_block)` returns `None` because the block is not found, and never will be because it has been pruned.

We discussed in #2480 that the main node is not supposed to be pruned, which is why the SQL that looks for the next batch to attest no longer checks pruning records. Yet this test prunes the main node, and if I prune the external node instead, the test panics because that node doesn't have the block yet.

Either the SQL should take pruning into account after all, or we have to figure out a way for the test to wait until the main node executor is running, or we change the test to prune the external node; I did the latter.
 
## Why ❔

This coordinates the attesters to cast their votes on the batch the main node tells them to sign, producing a quorum certificate for every L1 batch without gaps.

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [x] PR title corresponds to the body of PR (we generate changelog entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
aakoshh authored Aug 1, 2024
1 parent 4d38356 commit 22cf820
Showing 12 changed files with 176 additions and 99 deletions.
41 changes: 21 additions & 20 deletions Cargo.lock


20 changes: 10 additions & 10 deletions Cargo.toml
@@ -212,16 +212,16 @@ zk_evm_1_4_1 = { package = "zk_evm", version = "0.141.0" }
zk_evm_1_5_0 = { package = "zk_evm", version = "0.150.0" }

# Consensus dependencies.
zksync_concurrency = "=0.1.0-rc.4"
zksync_consensus_bft = "=0.1.0-rc.4"
zksync_consensus_crypto = "=0.1.0-rc.4"
zksync_consensus_executor = "=0.1.0-rc.4"
zksync_consensus_network = "=0.1.0-rc.4"
zksync_consensus_roles = "=0.1.0-rc.4"
zksync_consensus_storage = "=0.1.0-rc.4"
zksync_consensus_utils = "=0.1.0-rc.4"
zksync_protobuf = "=0.1.0-rc.4"
zksync_protobuf_build = "=0.1.0-rc.4"
zksync_concurrency = "=0.1.0-rc.5"
zksync_consensus_bft = "=0.1.0-rc.5"
zksync_consensus_crypto = "=0.1.0-rc.5"
zksync_consensus_executor = "=0.1.0-rc.5"
zksync_consensus_network = "=0.1.0-rc.5"
zksync_consensus_roles = "=0.1.0-rc.5"
zksync_consensus_storage = "=0.1.0-rc.5"
zksync_consensus_utils = "=0.1.0-rc.5"
zksync_protobuf = "=0.1.0-rc.5"
zksync_protobuf_build = "=0.1.0-rc.5"

# "Local" dependencies
zksync_multivm = { version = "0.1.0", path = "core/lib/multivm" }
1 change: 1 addition & 0 deletions core/lib/dal/src/consensus_dal.rs
@@ -536,6 +536,7 @@ impl ConsensusDal<'_, '_> {
}
.await?
else {
tracing::info!(%genesis.first_block, "genesis block not found");
return Ok(None);
};
Ok(Some(AttestationStatus {
1 change: 1 addition & 0 deletions core/node/consensus/src/config.rs
@@ -147,5 +147,6 @@ pub(super) fn executor(
rpc,
// TODO: Add to configuration
debug_page: None,
batch_poll_interval: time::Duration::seconds(1),
})
}
54 changes: 53 additions & 1 deletion core/node/consensus/src/en.rs
@@ -1,8 +1,14 @@
use anyhow::Context as _;
use async_trait::async_trait;
use zksync_concurrency::{ctx, error::Wrap as _, scope, time};
use zksync_consensus_executor as executor;
use zksync_consensus_executor::{
self as executor,
attestation::{AttestationStatusClient, AttestationStatusRunner},
};
use zksync_consensus_network::gossip;
use zksync_consensus_roles::validator;
use zksync_consensus_storage::{BatchStore, BlockStore};
use zksync_dal::consensus_dal;
use zksync_node_sync::{
fetcher::FetchedBlock, sync_action::ActionQueueSender, MainNodeClient, SyncState,
};
@@ -47,6 +53,7 @@ impl EN {

// Initialize genesis.
let genesis = self.fetch_genesis(ctx).await.wrap("fetch_genesis()")?;
let genesis_hash = genesis.hash();
let mut conn = self.pool.connection(ctx).await.wrap("connection()")?;

conn.try_update_genesis(ctx, &genesis)
@@ -99,6 +106,18 @@ impl EN {
.wrap("BatchStore::new()")?;
s.spawn_bg(async { Ok(runner.run(ctx).await?) });

let (attestation_status, runner) = {
AttestationStatusRunner::init(
ctx,
Box::new(MainNodeAttestationStatus(self.client.clone())),
time::Duration::seconds(5),
genesis_hash,
)
.await
.wrap("AttestationStatusRunner::init()")?
};
s.spawn_bg(async { Ok(runner.run(ctx).await?) });

let executor = executor::Executor {
config: config::executor(&cfg, &secrets)?,
block_store,
@@ -111,7 +130,9 @@
payload_manager: Box::new(store.clone()),
}),
attester,
attestation_status,
};
tracing::info!("running the external node executor");
executor.run(ctx).await?;

Ok(())
@@ -239,3 +260,34 @@ impl EN {
Ok(())
}
}

/// Wrapper to call [MainNodeClient::fetch_attestation_status] and adapt the return value to [AttestationStatusClient].
struct MainNodeAttestationStatus(Box<DynClient<L2>>);

#[async_trait]
impl AttestationStatusClient for MainNodeAttestationStatus {
async fn attestation_status(
&self,
ctx: &ctx::Ctx,
) -> ctx::Result<Option<gossip::AttestationStatus>> {
match ctx.wait(self.0.fetch_attestation_status()).await? {
Ok(Some(status)) => {
// If this fails, the AttestationStatusRunner will log it and retry later,
// but it won't stop the whole node.
let status: consensus_dal::AttestationStatus =
zksync_protobuf::serde::deserialize(&status.0)
.context("deserialize(AttestationStatus")?;
let status = gossip::AttestationStatus {
genesis: status.genesis,
next_batch_to_attest: status.next_batch_to_attest,
};
Ok(Some(status))
}
Ok(None) => Ok(None),
Err(err) => {
tracing::warn!("AttestationStatus call to main node HTTP RPC failed: {err}");
Ok(None)
}
}
}
}
19 changes: 17 additions & 2 deletions core/node/consensus/src/mn.rs
@@ -1,7 +1,7 @@
use anyhow::Context as _;
use zksync_concurrency::{ctx, error::Wrap as _, scope};
use zksync_concurrency::{ctx, error::Wrap as _, scope, time};
use zksync_config::configs::consensus::{ConsensusConfig, ConsensusSecrets};
use zksync_consensus_executor::{self as executor, Attester};
use zksync_consensus_executor::{self as executor, attestation::AttestationStatusRunner, Attester};
use zksync_consensus_roles::validator;
use zksync_consensus_storage::{BatchStore, BlockStore};

@@ -61,6 +61,18 @@ pub async fn run_main_node(
.wrap("BatchStore::new()")?;
s.spawn_bg(runner.run(ctx));

let (attestation_status, runner) = {
AttestationStatusRunner::init_from_store(
ctx,
batch_store.clone(),
time::Duration::seconds(1),
block_store.genesis().hash(),
)
.await
.wrap("AttestationStatusRunner::init_from_store()")?
};
s.spawn_bg(runner.run(ctx));

let executor = executor::Executor {
config: config::executor(&cfg, &secrets)?,
block_store,
@@ -71,7 +83,10 @@
payload_manager: Box::new(store.clone()),
}),
attester,
attestation_status,
};

tracing::info!("running the main node executor");
executor.run(ctx).await
})
.await
11 changes: 11 additions & 0 deletions core/node/consensus/src/storage/connection.rs
@@ -435,4 +435,15 @@ impl<'a> Connection<'a> {
last,
})
}

/// Wrapper for `consensus_dal().attestation_status()`.
pub async fn attestation_status(
&mut self,
ctx: &ctx::Ctx,
) -> ctx::Result<Option<consensus_dal::AttestationStatus>> {
Ok(ctx
.wait(self.0.consensus_dal().attestation_status())
.await?
.context("attestation_status()")?)
}
}
59 changes: 19 additions & 40 deletions core/node/consensus/src/storage/store.rs
@@ -523,44 +523,18 @@ impl storage::PersistentBatchStore for Store {
self.batches_persisted.clone()
}

/// Get the earliest L1 batch number which has to be signed by attesters.
async fn earliest_batch_number_to_sign(
/// Get the next L1 batch number which has to be signed by attesters.
async fn next_batch_to_attest(
&self,
ctx: &ctx::Ctx,
) -> ctx::Result<Option<attester::BatchNumber>> {
// This is the rough roadmap of how this logic will evolve:
// 1. Make best effort at gossiping and collecting votes; the `BatchVotes` in consensus only considers the last vote per attesters.
// Still, we can re-sign more than the last batch, anticipating step 2.
// 2. Ask the Main Node what is the earliest batch number that it still expects votes for (ie. what is the last submission + 1).
// 3. Change `BatchVotes` to handle multiple pending batch numbers, anticipating that batch intervals might decrease dramatically.
// 4. Once QC is required to submit to L1, Look at L1 to figure out what is the last submission, and sign after that.

// Originally this method returned all unsigned batch numbers by doing a DAL query, but we decided it should be okay and cheap
// to resend signatures for already signed batches, and we don't have to worry about skipping them. Because of that, we also
// didn't think it makes sense to query the database for the earliest unsigned batch *after* the submission, because we might
// as well just re-sign everything. Until we have a way to argue about the "last submission" we just re-sign the last 10 to
// try to produce as many QCs as the voting register allows, within reason.

// The latest decision is not to store batches with gaps between in the database *of the main node*.
// Once we have an API to serve to external nodes the earliest number the main node wants them to sign,
// we can get rid of this method: on the main node we can sign from what `last_batch_qc` returns, and
// while external nodes we can go from whatever the API returned.

const NUM_BATCHES_TO_SIGN: u64 = 10;

let Some(last_batch_number) = self
Ok(self
.conn(ctx)
.await?
.get_last_batch_number(ctx)
.attestation_status(ctx)
.await
.wrap("get_last_batch_number")?
else {
return Ok(None);
};

Ok(Some(attester::BatchNumber(
last_batch_number.0.saturating_sub(NUM_BATCHES_TO_SIGN),
)))
.wrap("next_batch_to_attest")?
.map(|s| s.next_batch_to_attest))
}

/// Get the L1 batch QC from storage with the highest number.
Expand Down Expand Up @@ -603,16 +577,21 @@ impl storage::PersistentBatchStore for Store {
ctx: &ctx::Ctx,
number: attester::BatchNumber,
) -> ctx::Result<Option<attester::Batch>> {
let Some(hash) = self
.conn(ctx)
.await?
.batch_hash(ctx, number)
.await
.wrap("batch_hash()")?
else {
let mut conn = self.conn(ctx).await?;

let Some(hash) = conn.batch_hash(ctx, number).await.wrap("batch_hash()")? else {
return Ok(None);
};
Ok(Some(attester::Batch { number, hash }))

let Some(genesis) = conn.genesis(ctx).await.wrap("genesis()")? else {
return Ok(None);
};

Ok(Some(attester::Batch {
number,
hash,
genesis: genesis.hash(),
}))
}

/// Returns the QC of the batch with the given number.