diff --git a/core/node/consensus/src/config.rs b/core/node/consensus/src/config.rs index a46b1ab5afa7..c2fa13472066 100644 --- a/core/node/consensus/src/config.rs +++ b/core/node/consensus/src/config.rs @@ -147,5 +147,6 @@ pub(super) fn executor( rpc, // TODO: Add to configuration debug_page: None, + batch_poll_interval: time::Duration::seconds(1), }) } diff --git a/core/node/consensus/src/en.rs b/core/node/consensus/src/en.rs index 66bdc822c058..242efa5f0319 100644 --- a/core/node/consensus/src/en.rs +++ b/core/node/consensus/src/en.rs @@ -1,7 +1,14 @@ +use std::sync::Arc; + use anyhow::Context as _; +use async_trait::async_trait; use zksync_concurrency::{ctx, error::Wrap as _, scope, time}; -use zksync_consensus_executor as executor; -use zksync_consensus_roles::validator; +use zksync_consensus_executor::{ + self as executor, + attestation::{AttestationStatusClient, AttestationStatusRunner}, +}; +use zksync_consensus_network::gossip::AttestationStatusWatch; +use zksync_consensus_roles::{attester, validator}; use zksync_consensus_storage::{BatchStore, BlockStore}; use zksync_node_sync::{ fetcher::FetchedBlock, sync_action::ActionQueueSender, MainNodeClient, SyncState, @@ -99,6 +106,18 @@ impl EN { .wrap("BatchStore::new()")?; s.spawn_bg(async { Ok(runner.run(ctx).await?) }); + let (attestation_status, runner) = { + let status = Arc::new(AttestationStatusWatch::default()); + let client = MainNodeAttestationStatus(self.client.clone()); + let runner = AttestationStatusRunner::new( + status.clone(), + Box::new(client), + time::Duration::seconds(5), + ); + (status, runner) + }; + s.spawn_bg(async { Ok(runner.run(ctx).await?) }); + let executor = executor::Executor { config: config::executor(&cfg, &secrets)?, block_store, @@ -111,6 +130,7 @@ impl EN { payload_manager: Box::new(store.clone()), }), attester, + attestation_status, }; executor.run(ctx).await?; @@ -238,3 +258,25 @@ impl EN { Ok(()) } } + +/// Wrapper to call [MainNodeClient::fetch_attestation_status] and adapt the return value to [AttestationStatusClient::next_batch_to_attest]. +pub struct MainNodeAttestationStatus(Box>); + +#[async_trait] +impl AttestationStatusClient for MainNodeAttestationStatus { + async fn next_batch_to_attest( + &self, + ctx: &ctx::Ctx, + ) -> ctx::Result> { + match ctx.wait(self.0.fetch_attestation_status()).await? { + Ok(bn) => { + let bn: u64 = bn.next_batch_to_attest.0.into(); + Ok(Some(attester::BatchNumber(bn))) + } + Err(err) => { + tracing::warn!("AttestationStatus call to main node HTTP RPC failed: {err}"); + Ok(None) + } + } + } +} diff --git a/core/node/consensus/src/mn.rs b/core/node/consensus/src/mn.rs index 29cacf7a548f..68f4ee4e3512 100644 --- a/core/node/consensus/src/mn.rs +++ b/core/node/consensus/src/mn.rs @@ -1,7 +1,10 @@ +use std::sync::Arc; + use anyhow::Context as _; -use zksync_concurrency::{ctx, error::Wrap as _, scope}; +use zksync_concurrency::{ctx, error::Wrap as _, scope, time}; use zksync_config::configs::consensus::{ConsensusConfig, ConsensusSecrets}; -use zksync_consensus_executor::{self as executor, Attester}; +use zksync_consensus_executor::{self as executor, attestation::AttestationStatusRunner, Attester}; +use zksync_consensus_network::gossip::AttestationStatusWatch; use zksync_consensus_roles::validator; use zksync_consensus_storage::{BatchStore, BlockStore}; @@ -61,6 +64,17 @@ pub async fn run_main_node( .wrap("BatchStore::new()")?; s.spawn_bg(runner.run(ctx)); + let (attestation_status, runner) = { + let status = Arc::new(AttestationStatusWatch::default()); + let runner = AttestationStatusRunner::new_from_store( + status.clone(), + batch_store.clone(), + time::Duration::seconds(1), + ); + (status, runner) + }; + s.spawn_bg(runner.run(ctx)); + let executor = executor::Executor { config: config::executor(&cfg, &secrets)?, block_store, @@ -71,6 +85,7 @@ pub async fn run_main_node( payload_manager: Box::new(store.clone()), }), attester, + attestation_status, }; executor.run(ctx).await }) diff --git a/core/node/consensus/src/storage/connection.rs b/core/node/consensus/src/storage/connection.rs index 6bae0a490508..67897268434c 100644 --- a/core/node/consensus/src/storage/connection.rs +++ b/core/node/consensus/src/storage/connection.rs @@ -429,4 +429,15 @@ impl<'a> Connection<'a> { last, }) } + + /// Wrapper for `consensus_dal().next_batch_to_attest()`. + pub async fn next_batch_to_attest( + &mut self, + ctx: &ctx::Ctx, + ) -> ctx::Result { + Ok(ctx + .wait(self.0.consensus_dal().next_batch_to_attest()) + .await? + .context("next_batch_to_attest()")?) + } } diff --git a/core/node/consensus/src/storage/store.rs b/core/node/consensus/src/storage/store.rs index ce73c946a029..bd3329ad48ae 100644 --- a/core/node/consensus/src/storage/store.rs +++ b/core/node/consensus/src/storage/store.rs @@ -444,44 +444,18 @@ impl storage::PersistentBatchStore for Store { self.batches_persisted.clone() } - /// Get the earliest L1 batch number which has to be signed by attesters. - async fn earliest_batch_number_to_sign( + /// Get the next L1 batch number which has to be signed by attesters. + async fn next_batch_to_attest( &self, ctx: &ctx::Ctx, ) -> ctx::Result> { - // This is the rough roadmap of how this logic will evolve: - // 1. Make best effort at gossiping and collecting votes; the `BatchVotes` in consensus only considers the last vote per attesters. - // Still, we can re-sign more than the last batch, anticipating step 2. - // 2. Ask the Main Node what is the earliest batch number that it still expects votes for (ie. what is the last submission + 1). - // 3. Change `BatchVotes` to handle multiple pending batch numbers, anticipating that batch intervals might decrease dramatically. - // 4. Once QC is required to submit to L1, Look at L1 to figure out what is the last submission, and sign after that. - - // Originally this method returned all unsigned batch numbers by doing a DAL query, but we decided it should be okay and cheap - // to resend signatures for already signed batches, and we don't have to worry about skipping them. Because of that, we also - // didn't think it makes sense to query the database for the earliest unsigned batch *after* the submission, because we might - // as well just re-sign everything. Until we have a way to argue about the "last submission" we just re-sign the last 10 to - // try to produce as many QCs as the voting register allows, within reason. - - // The latest decision is not to store batches with gaps between in the database *of the main node*. - // Once we have an API to serve to external nodes the earliest number the main node wants them to sign, - // we can get rid of this method: on the main node we can sign from what `last_batch_qc` returns, and - // while external nodes we can go from whatever the API returned. - - const NUM_BATCHES_TO_SIGN: u64 = 10; - - let Some(last_batch_number) = self - .conn(ctx) - .await? - .get_last_batch_number(ctx) - .await - .wrap("get_last_batch_number")? - else { - return Ok(None); - }; - - Ok(Some(attester::BatchNumber( - last_batch_number.0.saturating_sub(NUM_BATCHES_TO_SIGN), - ))) + Ok(Some( + self.conn(ctx) + .await? + .next_batch_to_attest(ctx) + .await + .wrap("next_batch_to_attest")?, + )) } /// Get the L1 batch QC from storage with the highest number. @@ -524,16 +498,21 @@ impl storage::PersistentBatchStore for Store { ctx: &ctx::Ctx, number: attester::BatchNumber, ) -> ctx::Result> { - let Some(hash) = self - .conn(ctx) - .await? - .batch_hash(ctx, number) - .await - .wrap("batch_hash()")? - else { + let mut conn = self.conn(ctx).await?; + + let Some(hash) = conn.batch_hash(ctx, number).await.wrap("batch_hash()")? else { return Ok(None); }; - Ok(Some(attester::Batch { number, hash })) + + let Some(genesis) = conn.genesis(ctx).await.wrap("genesis()")? else { + return Ok(None); + }; + + Ok(Some(attester::Batch { + number, + hash, + genesis: genesis.hash(), + })) } /// Returns the QC of the batch with the given number. diff --git a/core/node/consensus/src/tests.rs b/core/node/consensus/src/tests.rs index 9890165ad81f..2a0d4e0b3c1b 100644 --- a/core/node/consensus/src/tests.rs +++ b/core/node/consensus/src/tests.rs @@ -701,9 +701,14 @@ async fn test_attestation_status_api(version: ProtocolVersionId) { let mut conn = validator_pool.connection(ctx).await?; let number = attester::BatchNumber(status.next_batch_to_attest.0.into()); let hash = conn.batch_hash(ctx, number).await?.unwrap(); + let genesis = conn.genesis(ctx).await?.unwrap().hash(); let cert = attester::BatchQC { signatures: attester::MultiSig::default(), - message: attester::Batch { number, hash }, + message: attester::Batch { + number, + hash, + genesis, + }, }; conn.insert_batch_certificate(ctx, &cert) .await