feat: Poll the main node API for attestation status - relaxed (BFT-496) (#2583)

## What ❔

Reverts #2574 to re-establish the attestation status API integration, and
bumps the era-consensus version to
[0.1.0-rc.6](matter-labs/era-consensus#174),
which includes changes that make it backwards compatible:
* the `Executor` is started without waiting for the initial attestation
batch number to become available, so that a node which cannot reach the
main node API is not blocked from participating in gossip (this
behaviour is internal to the `AttestationStatusRunner`; see the sketch
after the TODO list below)
* invalid batch vote signatures are ignored rather than rejected, so the
node doesn't drop the connection to a peer that signed a different
payload while the feature is still in flux

TODO: 
- [x] Update once `0.1.0-rc.6` is published to crates.io
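
As a rough illustration of the first bullet above, this is what the polling behaviour amounts to. A minimal sketch, assuming the `AttestationStatusClient` trait from era-consensus `0.1.0-rc.6`; `poll_until_available` is a hypothetical helper name, not the actual implementation:

```rust
use zksync_concurrency::{ctx, time};
use zksync_consensus_executor::attestation::AttestationStatusClient;
use zksync_consensus_roles::attester;

// Hypothetical helper, for illustration only: keep polling the client until it
// reports a status. `Ok(None)` (endpoint not served, transient RPC error, ...)
// never aborts the node; it just means "try again on the next tick".
async fn poll_until_available(
    ctx: &ctx::Ctx,
    client: &dyn AttestationStatusClient,
    poll_interval: time::Duration,
) -> ctx::Result<(attester::GenesisHash, attester::BatchNumber)> {
    loop {
        if let Some(status) = client.attestation_status(ctx).await? {
            return Ok(status);
        }
        ctx.sleep(poll_interval).await?;
    }
}
```

Meanwhile the `Executor` itself is started right away, so the node keeps participating in gossip even before the first status arrives.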

## Why ❔

The first reason is to avoid being so eager to drop a gossip peer over
incompatibilities between newer and existing versions of the software
while we're actively working on the features involved.

The second is that the rollout strategy to mainnet seems to be to roll
out external nodes first and main nodes second. The external node is
expected to work with an older version of the main node, or at least not
fail to start; it is okay if a new feature doesn't work yet.
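
To make that expectation concrete, here is a minimal sketch (hypothetical, not part of this change) of an `AttestationStatusClient` that models a main node which doesn't serve the attestation status yet: it always answers `Ok(None)`, and under the relaxed behaviour the runner simply keeps polling instead of failing the node.

```rust
use async_trait::async_trait;
use zksync_concurrency::ctx;
use zksync_consensus_executor::attestation::AttestationStatusClient;
use zksync_consensus_roles::attester;

// Hypothetical stub: a "main node" that never serves the attestation status.
// Returning `Ok(None)` means "not available, try again later", so an external
// node using it still starts and keeps participating in gossip.
struct UnavailableAttestationStatus;

#[async_trait]
impl AttestationStatusClient for UnavailableAttestationStatus {
    async fn attestation_status(
        &self,
        _ctx: &ctx::Ctx,
    ) -> ctx::Result<Option<(attester::GenesisHash, attester::BatchNumber)>> {
        Ok(None)
    }
}
```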

## Checklist

<!-- Check your PR fulfills the following items. -->
<!-- For draft PRs check the boxes as you complete them. -->

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.

---------

Co-authored-by: Bruno França <[email protected]>
aakoshh and brunoffranca authored Aug 6, 2024
1 parent 0891244 commit b45aa91
Showing 12 changed files with 173 additions and 100 deletions.
41 changes: 21 additions & 20 deletions Cargo.lock


20 changes: 10 additions & 10 deletions Cargo.toml
@@ -216,16 +216,16 @@ zk_evm_1_4_1 = { package = "zk_evm", version = "0.141.0" }
zk_evm_1_5_0 = { package = "zk_evm", version = "=0.150.0" }

# Consensus dependencies.
zksync_concurrency = "=0.1.0-rc.4"
zksync_consensus_bft = "=0.1.0-rc.4"
zksync_consensus_crypto = "=0.1.0-rc.4"
zksync_consensus_executor = "=0.1.0-rc.4"
zksync_consensus_network = "=0.1.0-rc.4"
zksync_consensus_roles = "=0.1.0-rc.4"
zksync_consensus_storage = "=0.1.0-rc.4"
zksync_consensus_utils = "=0.1.0-rc.4"
zksync_protobuf = "=0.1.0-rc.4"
zksync_protobuf_build = "=0.1.0-rc.4"
zksync_concurrency = "=0.1.0-rc.8"
zksync_consensus_bft = "=0.1.0-rc.8"
zksync_consensus_crypto = "=0.1.0-rc.8"
zksync_consensus_executor = "=0.1.0-rc.8"
zksync_consensus_network = "=0.1.0-rc.8"
zksync_consensus_roles = "=0.1.0-rc.8"
zksync_consensus_storage = "=0.1.0-rc.8"
zksync_consensus_utils = "=0.1.0-rc.8"
zksync_protobuf = "=0.1.0-rc.8"
zksync_protobuf_build = "=0.1.0-rc.8"

# "Local" dependencies
zksync_multivm = { version = "0.1.0", path = "core/lib/multivm" }
1 change: 1 addition & 0 deletions core/lib/dal/src/consensus_dal.rs
@@ -536,6 +536,7 @@ impl ConsensusDal<'_, '_> {
}
.await?
else {
tracing::info!(%genesis.first_block, "genesis block not found");
return Ok(None);
};
Ok(Some(AttestationStatus {
1 change: 1 addition & 0 deletions core/node/consensus/src/config.rs
@@ -147,5 +147,6 @@ pub(super) fn executor(
rpc,
// TODO: Add to configuration
debug_page: None,
batch_poll_interval: time::Duration::seconds(1),
})
}
52 changes: 50 additions & 2 deletions core/node/consensus/src/en.rs
@@ -1,8 +1,13 @@
use anyhow::Context as _;
use async_trait::async_trait;
use zksync_concurrency::{ctx, error::Wrap as _, scope, time};
use zksync_consensus_executor as executor;
use zksync_consensus_roles::validator;
use zksync_consensus_executor::{
self as executor,
attestation::{AttestationStatusClient, AttestationStatusRunner},
};
use zksync_consensus_roles::{attester, validator};
use zksync_consensus_storage::{BatchStore, BlockStore};
use zksync_dal::consensus_dal;
use zksync_node_sync::{
fetcher::FetchedBlock, sync_action::ActionQueueSender, MainNodeClient, SyncState,
};
@@ -47,6 +52,7 @@ impl EN {

// Initialize genesis.
let genesis = self.fetch_genesis(ctx).await.wrap("fetch_genesis()")?;
let genesis_hash = genesis.hash();
let mut conn = self.pool.connection(ctx).await.wrap("connection()")?;

conn.try_update_genesis(ctx, &genesis)
@@ -99,6 +105,18 @@ impl EN {
.wrap("BatchStore::new()")?;
s.spawn_bg(async { Ok(runner.run(ctx).await?) });

let (attestation_status, runner) = {
AttestationStatusRunner::init(
ctx,
Box::new(MainNodeAttestationStatus(self.client.clone())),
time::Duration::seconds(5),
genesis_hash,
)
.await
.wrap("AttestationStatusRunner::init()")?
};
s.spawn_bg(async { Ok(runner.run(ctx).await?) });

let executor = executor::Executor {
config: config::executor(&cfg, &secrets)?,
block_store,
@@ -111,7 +129,9 @@ impl EN {
payload_manager: Box::new(store.clone()),
}),
attester,
attestation_status,
};
tracing::info!("running the external node executor");
executor.run(ctx).await?;

Ok(())
Expand Down Expand Up @@ -239,3 +259,31 @@ impl EN {
Ok(())
}
}

/// Wrapper to call [MainNodeClient::fetch_attestation_status] and adapt the return value to [AttestationStatusClient].
struct MainNodeAttestationStatus(Box<DynClient<L2>>);

#[async_trait]
impl AttestationStatusClient for MainNodeAttestationStatus {
async fn attestation_status(
&self,
ctx: &ctx::Ctx,
) -> ctx::Result<Option<(attester::GenesisHash, attester::BatchNumber)>> {
match ctx.wait(self.0.fetch_attestation_status()).await? {
Ok(Some(status)) => {
// If this fails the AttestationStatusRunner will log it and retry it later,
// but it won't stop the whole node.
let status: consensus_dal::AttestationStatus =
zksync_protobuf::serde::deserialize(&status.0)
.context("deserialize(AttestationStatus)")?;

Ok(Some((status.genesis, status.next_batch_to_attest)))
}
Ok(None) => Ok(None),
Err(err) => {
tracing::warn!("AttestationStatus call to main node HTTP RPC failed: {err}");
Ok(None)
}
}
}
}
19 changes: 17 additions & 2 deletions core/node/consensus/src/mn.rs
@@ -1,7 +1,7 @@
use anyhow::Context as _;
use zksync_concurrency::{ctx, error::Wrap as _, scope};
use zksync_concurrency::{ctx, error::Wrap as _, scope, time};
use zksync_config::configs::consensus::{ConsensusConfig, ConsensusSecrets};
use zksync_consensus_executor::{self as executor, Attester};
use zksync_consensus_executor::{self as executor, attestation::AttestationStatusRunner, Attester};
use zksync_consensus_roles::validator;
use zksync_consensus_storage::{BatchStore, BlockStore};

@@ -61,6 +61,18 @@ pub async fn run_main_node(
.wrap("BatchStore::new()")?;
s.spawn_bg(runner.run(ctx));

let (attestation_status, runner) = {
AttestationStatusRunner::init_from_store(
ctx,
batch_store.clone(),
time::Duration::seconds(1),
block_store.genesis().hash(),
)
.await
.wrap("AttestationStatusRunner::init_from_store()")?
};
s.spawn_bg(runner.run(ctx));

let executor = executor::Executor {
config: config::executor(&cfg, &secrets)?,
block_store,
@@ -71,7 +83,10 @@
payload_manager: Box::new(store.clone()),
}),
attester,
attestation_status,
};

tracing::info!("running the main node executor");
executor.run(ctx).await
})
.await
11 changes: 11 additions & 0 deletions core/node/consensus/src/storage/connection.rs
@@ -435,4 +435,15 @@ impl<'a> Connection<'a> {
last,
})
}

/// Wrapper for `consensus_dal().attestation_status()`.
pub async fn attestation_status(
&mut self,
ctx: &ctx::Ctx,
) -> ctx::Result<Option<consensus_dal::AttestationStatus>> {
Ok(ctx
.wait(self.0.consensus_dal().attestation_status())
.await?
.context("attestation_status()")?)
}
}
59 changes: 19 additions & 40 deletions core/node/consensus/src/storage/store.rs
@@ -523,44 +523,18 @@ impl storage::PersistentBatchStore for Store {
self.batches_persisted.clone()
}

/// Get the earliest L1 batch number which has to be signed by attesters.
async fn earliest_batch_number_to_sign(
/// Get the next L1 batch number which has to be signed by attesters.
async fn next_batch_to_attest(
&self,
ctx: &ctx::Ctx,
) -> ctx::Result<Option<attester::BatchNumber>> {
// This is the rough roadmap of how this logic will evolve:
// 1. Make best effort at gossiping and collecting votes; the `BatchVotes` in consensus only considers the last vote per attesters.
// Still, we can re-sign more than the last batch, anticipating step 2.
// 2. Ask the Main Node what is the earliest batch number that it still expects votes for (ie. what is the last submission + 1).
// 3. Change `BatchVotes` to handle multiple pending batch numbers, anticipating that batch intervals might decrease dramatically.
// 4. Once QC is required to submit to L1, Look at L1 to figure out what is the last submission, and sign after that.

// Originally this method returned all unsigned batch numbers by doing a DAL query, but we decided it should be okay and cheap
// to resend signatures for already signed batches, and we don't have to worry about skipping them. Because of that, we also
// didn't think it makes sense to query the database for the earliest unsigned batch *after* the submission, because we might
// as well just re-sign everything. Until we have a way to argue about the "last submission" we just re-sign the last 10 to
// try to produce as many QCs as the voting register allows, within reason.

// The latest decision is not to store batches with gaps between in the database *of the main node*.
// Once we have an API to serve to external nodes the earliest number the main node wants them to sign,
// we can get rid of this method: on the main node we can sign from what `last_batch_qc` returns, and
// while external nodes we can go from whatever the API returned.

const NUM_BATCHES_TO_SIGN: u64 = 10;

let Some(last_batch_number) = self
Ok(self
.conn(ctx)
.await?
.get_last_batch_number(ctx)
.attestation_status(ctx)
.await
.wrap("get_last_batch_number")?
else {
return Ok(None);
};

Ok(Some(attester::BatchNumber(
last_batch_number.0.saturating_sub(NUM_BATCHES_TO_SIGN),
)))
.wrap("next_batch_to_attest")?
.map(|s| s.next_batch_to_attest))
}

/// Get the L1 batch QC from storage with the highest number.
@@ -603,16 +577,21 @@ impl storage::PersistentBatchStore for Store {
ctx: &ctx::Ctx,
number: attester::BatchNumber,
) -> ctx::Result<Option<attester::Batch>> {
let Some(hash) = self
.conn(ctx)
.await?
.batch_hash(ctx, number)
.await
.wrap("batch_hash()")?
else {
let mut conn = self.conn(ctx).await?;

let Some(hash) = conn.batch_hash(ctx, number).await.wrap("batch_hash()")? else {
return Ok(None);
};
Ok(Some(attester::Batch { number, hash }))

let Some(genesis) = conn.genesis(ctx).await.wrap("genesis()")? else {
return Ok(None);
};

Ok(Some(attester::Batch {
number,
hash,
genesis: genesis.hash(),
}))
}

/// Returns the QC of the batch with the given number.
