Skip to content

Commit

Permalink
BFT-496: Start polling the DB/API for attestation status
Browse files Browse the repository at this point in the history
  • Loading branch information
aakoshh committed Jul 31, 2024
1 parent eb9049f commit f4a8003
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 48 deletions.
1 change: 1 addition & 0 deletions core/node/consensus/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,5 +147,6 @@ pub(super) fn executor(
rpc,
// TODO: Add to configuration
debug_page: None,
batch_poll_interval: time::Duration::seconds(1),
})
}
46 changes: 44 additions & 2 deletions core/node/consensus/src/en.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
use std::sync::Arc;

use anyhow::Context as _;
use async_trait::async_trait;
use zksync_concurrency::{ctx, error::Wrap as _, scope, time};
use zksync_consensus_executor as executor;
use zksync_consensus_roles::validator;
use zksync_consensus_executor::{
self as executor,
attestation::{AttestationStatusClient, AttestationStatusRunner},
};
use zksync_consensus_network::gossip::AttestationStatusWatch;
use zksync_consensus_roles::{attester, validator};
use zksync_consensus_storage::{BatchStore, BlockStore};
use zksync_node_sync::{
fetcher::FetchedBlock, sync_action::ActionQueueSender, MainNodeClient, SyncState,
Expand Down Expand Up @@ -99,6 +106,18 @@ impl EN {
.wrap("BatchStore::new()")?;
s.spawn_bg(async { Ok(runner.run(ctx).await?) });

let (attestation_status, runner) = {
let status = Arc::new(AttestationStatusWatch::default());
let client = MainNodeAttestationStatus(self.client.clone());
let runner = AttestationStatusRunner::new(
status.clone(),
Box::new(client),
time::Duration::seconds(5),
);
(status, runner)
};
s.spawn_bg(async { Ok(runner.run(ctx).await?) });

let executor = executor::Executor {
config: config::executor(&cfg, &secrets)?,
block_store,
Expand All @@ -111,6 +130,7 @@ impl EN {
payload_manager: Box::new(store.clone()),
}),
attester,
attestation_status,
};
executor.run(ctx).await?;

Expand Down Expand Up @@ -238,3 +258,25 @@ impl EN {
Ok(())
}
}

/// Wrapper to call [MainNodeClient::fetch_attestation_status] and adapt the return value to [AttestationStatusClient::next_batch_to_attest].
pub struct MainNodeAttestationStatus(Box<DynClient<L2>>);

#[async_trait]
impl AttestationStatusClient for MainNodeAttestationStatus {
async fn next_batch_to_attest(
&self,
ctx: &ctx::Ctx,
) -> ctx::Result<Option<attester::BatchNumber>> {
match ctx.wait(self.0.fetch_attestation_status()).await? {
Ok(bn) => {
let bn: u64 = bn.next_batch_to_attest.0.into();
Ok(Some(attester::BatchNumber(bn)))
}
Err(err) => {
tracing::warn!("AttestationStatus call to main node HTTP RPC failed: {err}");
Ok(None)
}
}
}
}
19 changes: 17 additions & 2 deletions core/node/consensus/src/mn.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::sync::Arc;

use anyhow::Context as _;
use zksync_concurrency::{ctx, error::Wrap as _, scope};
use zksync_concurrency::{ctx, error::Wrap as _, scope, time};
use zksync_config::configs::consensus::{ConsensusConfig, ConsensusSecrets};
use zksync_consensus_executor::{self as executor, Attester};
use zksync_consensus_executor::{self as executor, attestation::AttestationStatusRunner, Attester};
use zksync_consensus_network::gossip::AttestationStatusWatch;
use zksync_consensus_roles::validator;
use zksync_consensus_storage::{BatchStore, BlockStore};

Expand Down Expand Up @@ -61,6 +64,17 @@ pub async fn run_main_node(
.wrap("BatchStore::new()")?;
s.spawn_bg(runner.run(ctx));

let (attestation_status, runner) = {
let status = Arc::new(AttestationStatusWatch::default());
let runner = AttestationStatusRunner::new_from_store(
status.clone(),
batch_store.clone(),
time::Duration::seconds(1),
);
(status, runner)
};
s.spawn_bg(runner.run(ctx));

let executor = executor::Executor {
config: config::executor(&cfg, &secrets)?,
block_store,
Expand All @@ -71,6 +85,7 @@ pub async fn run_main_node(
payload_manager: Box::new(store.clone()),
}),
attester,
attestation_status,
};
executor.run(ctx).await
})
Expand Down
11 changes: 11 additions & 0 deletions core/node/consensus/src/storage/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,4 +429,15 @@ impl<'a> Connection<'a> {
last,
})
}

/// Wrapper for `consensus_dal().next_batch_to_attest()`.
pub async fn next_batch_to_attest(
&mut self,
ctx: &ctx::Ctx,
) -> ctx::Result<attester::BatchNumber> {
Ok(ctx
.wait(self.0.consensus_dal().next_batch_to_attest())
.await?
.context("next_batch_to_attest()")?)
}
}
65 changes: 22 additions & 43 deletions core/node/consensus/src/storage/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,44 +444,18 @@ impl storage::PersistentBatchStore for Store {
self.batches_persisted.clone()
}

/// Get the earliest L1 batch number which has to be signed by attesters.
async fn earliest_batch_number_to_sign(
/// Get the next L1 batch number which has to be signed by attesters.
async fn next_batch_to_attest(
&self,
ctx: &ctx::Ctx,
) -> ctx::Result<Option<attester::BatchNumber>> {
// This is the rough roadmap of how this logic will evolve:
// 1. Make best effort at gossiping and collecting votes; the `BatchVotes` in consensus only considers the last vote per attesters.
// Still, we can re-sign more than the last batch, anticipating step 2.
// 2. Ask the Main Node what is the earliest batch number that it still expects votes for (ie. what is the last submission + 1).
// 3. Change `BatchVotes` to handle multiple pending batch numbers, anticipating that batch intervals might decrease dramatically.
// 4. Once QC is required to submit to L1, Look at L1 to figure out what is the last submission, and sign after that.

// Originally this method returned all unsigned batch numbers by doing a DAL query, but we decided it should be okay and cheap
// to resend signatures for already signed batches, and we don't have to worry about skipping them. Because of that, we also
// didn't think it makes sense to query the database for the earliest unsigned batch *after* the submission, because we might
// as well just re-sign everything. Until we have a way to argue about the "last submission" we just re-sign the last 10 to
// try to produce as many QCs as the voting register allows, within reason.

// The latest decision is not to store batches with gaps between in the database *of the main node*.
// Once we have an API to serve to external nodes the earliest number the main node wants them to sign,
// we can get rid of this method: on the main node we can sign from what `last_batch_qc` returns, and
// while external nodes we can go from whatever the API returned.

const NUM_BATCHES_TO_SIGN: u64 = 10;

let Some(last_batch_number) = self
.conn(ctx)
.await?
.get_last_batch_number(ctx)
.await
.wrap("get_last_batch_number")?
else {
return Ok(None);
};

Ok(Some(attester::BatchNumber(
last_batch_number.0.saturating_sub(NUM_BATCHES_TO_SIGN),
)))
Ok(Some(
self.conn(ctx)
.await?
.next_batch_to_attest(ctx)
.await
.wrap("next_batch_to_attest")?,
))
}

/// Get the L1 batch QC from storage with the highest number.
Expand Down Expand Up @@ -524,16 +498,21 @@ impl storage::PersistentBatchStore for Store {
ctx: &ctx::Ctx,
number: attester::BatchNumber,
) -> ctx::Result<Option<attester::Batch>> {
let Some(hash) = self
.conn(ctx)
.await?
.batch_hash(ctx, number)
.await
.wrap("batch_hash()")?
else {
let mut conn = self.conn(ctx).await?;

let Some(hash) = conn.batch_hash(ctx, number).await.wrap("batch_hash()")? else {
return Ok(None);
};
Ok(Some(attester::Batch { number, hash }))

let Some(genesis) = conn.genesis(ctx).await.wrap("genesis()")? else {
return Ok(None);
};

Ok(Some(attester::Batch {
number,
hash,
genesis: genesis.hash(),
}))
}

/// Returns the QC of the batch with the given number.
Expand Down
7 changes: 6 additions & 1 deletion core/node/consensus/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -725,9 +725,14 @@ async fn test_attestation_status_api(version: ProtocolVersionId) {
let mut conn = pool.connection(ctx).await?;
let number = status.next_batch_to_attest;
let hash = conn.batch_hash(ctx, number).await?.unwrap();
let genesis = conn.genesis(ctx).await?.unwrap().hash();
let cert = attester::BatchQC {
signatures: attester::MultiSig::default(),
message: attester::Batch { number, hash },
message: attester::Batch {
number,
hash,
genesis,
},
};
conn.insert_batch_certificate(ctx, &cert)
.await
Expand Down

0 comments on commit f4a8003

Please sign in to comment.