diff --git a/core/node/consensus/src/en.rs b/core/node/consensus/src/en.rs index 242efa5f0319..ad2ec959aaa1 100644 --- a/core/node/consensus/src/en.rs +++ b/core/node/consensus/src/en.rs @@ -1,5 +1,3 @@ -use std::sync::Arc; - use anyhow::Context as _; use async_trait::async_trait; use zksync_concurrency::{ctx, error::Wrap as _, scope, time}; @@ -7,7 +5,6 @@ use zksync_consensus_executor::{ self as executor, attestation::{AttestationStatusClient, AttestationStatusRunner}, }; -use zksync_consensus_network::gossip::AttestationStatusWatch; use zksync_consensus_roles::{attester, validator}; use zksync_consensus_storage::{BatchStore, BlockStore}; use zksync_node_sync::{ @@ -107,14 +104,14 @@ impl EN { s.spawn_bg(async { Ok(runner.run(ctx).await?) }); let (attestation_status, runner) = { - let status = Arc::new(AttestationStatusWatch::default()); - let client = MainNodeAttestationStatus(self.client.clone()); - let runner = AttestationStatusRunner::new( - status.clone(), - Box::new(client), + AttestationStatusRunner::init( + ctx, + Box::new(MainNodeAttestationStatus(self.client.clone())), time::Duration::seconds(5), - ); - (status, runner) + ) + .await + .map_err(ctx::Error::Canceled) + .wrap("AttestationStatusRunner::init()")? }; s.spawn_bg(async { Ok(runner.run(ctx).await?) }); @@ -269,10 +266,13 @@ impl AttestationStatusClient for MainNodeAttestationStatus { ctx: &ctx::Ctx, ) -> ctx::Result> { match ctx.wait(self.0.fetch_attestation_status()).await? { - Ok(bn) => { - let bn: u64 = bn.next_batch_to_attest.0.into(); - Ok(Some(attester::BatchNumber(bn))) + Ok(Some(status)) => { + let status: zksync_dal::consensus_dal::AttestationStatus = + zksync_protobuf::serde::deserialize(&status.0) + .context("deserialize(AttestationStatus)")?; + Ok(Some(status.next_batch_to_attest)) } + Ok(None) => Ok(None), Err(err) => { tracing::warn!("AttestationStatus call to main node HTTP RPC failed: {err}"); Ok(None) diff --git a/core/node/consensus/src/mn.rs b/core/node/consensus/src/mn.rs index 68f4ee4e3512..0f1bb8dcb33b 100644 --- a/core/node/consensus/src/mn.rs +++ b/core/node/consensus/src/mn.rs @@ -1,10 +1,7 @@ -use std::sync::Arc; - use anyhow::Context as _; use zksync_concurrency::{ctx, error::Wrap as _, scope, time}; use zksync_config::configs::consensus::{ConsensusConfig, ConsensusSecrets}; use zksync_consensus_executor::{self as executor, attestation::AttestationStatusRunner, Attester}; -use zksync_consensus_network::gossip::AttestationStatusWatch; use zksync_consensus_roles::validator; use zksync_consensus_storage::{BatchStore, BlockStore}; @@ -65,13 +62,14 @@ pub async fn run_main_node( s.spawn_bg(runner.run(ctx)); let (attestation_status, runner) = { - let status = Arc::new(AttestationStatusWatch::default()); - let runner = AttestationStatusRunner::new_from_store( - status.clone(), + AttestationStatusRunner::init_from_store( + ctx, batch_store.clone(), time::Duration::seconds(1), - ); - (status, runner) + ) + .await + .map_err(ctx::Error::Canceled) + .wrap("AttestationStatusRunner::init_from_store()")? }; s.spawn_bg(runner.run(ctx)); diff --git a/core/node/consensus/src/storage/connection.rs b/core/node/consensus/src/storage/connection.rs index 67897268434c..07f82ed4fd23 100644 --- a/core/node/consensus/src/storage/connection.rs +++ b/core/node/consensus/src/storage/connection.rs @@ -430,14 +430,14 @@ impl<'a> Connection<'a> { }) } - /// Wrapper for `consensus_dal().next_batch_to_attest()`. - pub async fn next_batch_to_attest( + /// Wrapper for `consensus_dal().attestation_status()`. + pub async fn attestation_status( &mut self, ctx: &ctx::Ctx, - ) -> ctx::Result { + ) -> ctx::Result> { Ok(ctx - .wait(self.0.consensus_dal().next_batch_to_attest()) + .wait(self.0.consensus_dal().attestation_status()) .await? - .context("next_batch_to_attest()")?) + .context("attestation_status()")?) } } diff --git a/core/node/consensus/src/storage/store.rs b/core/node/consensus/src/storage/store.rs index bd3329ad48ae..0e7d403e9c68 100644 --- a/core/node/consensus/src/storage/store.rs +++ b/core/node/consensus/src/storage/store.rs @@ -449,13 +449,13 @@ impl storage::PersistentBatchStore for Store { &self, ctx: &ctx::Ctx, ) -> ctx::Result> { - Ok(Some( - self.conn(ctx) - .await? - .next_batch_to_attest(ctx) - .await - .wrap("next_batch_to_attest")?, - )) + Ok(self + .conn(ctx) + .await? + .attestation_status(ctx) + .await + .wrap("next_batch_to_attest")? + .map(|s| s.next_batch_to_attest)) } /// Get the L1 batch QC from storage with the highest number.