From 8c8ae8c36b2c206c3b7e41e7b0daf6c121f8ecfe Mon Sep 17 00:00:00 2001 From: Brian Cho Date: Tue, 14 Mar 2023 09:42:13 -0700 Subject: [PATCH] squash of #7175 --- config/src/config/quorum_store_config.rs | 17 +-- .../consensus-types/src/proof_of_store.rs | 32 +--- .../consensus-types/src/request_response.rs | 32 +--- consensus/src/liveness/proposal_generator.rs | 1 - consensus/src/payload_client.rs | 6 +- consensus/src/payload_manager.rs | 24 +-- .../src/quorum_store/batch_coordinator.rs | 33 ++--- consensus/src/quorum_store/batch_generator.rs | 62 ++++---- consensus/src/quorum_store/batch_store.rs | 139 +++++------------- consensus/src/quorum_store/counters.rs | 25 +--- .../direct_mempool_quorum_store.rs | 1 - consensus/src/quorum_store/proof_manager.rs | 43 ++---- .../src/quorum_store/quorum_store_builder.rs | 22 +-- .../quorum_store/quorum_store_coordinator.rs | 9 +- .../quorum_store/tests/batch_store_test.rs | 39 +++-- .../tests/direct_mempool_quorum_store_test.rs | 1 - .../tests/proof_coordinator_test.rs | 4 +- .../quorum_store/tests/proof_manager_test.rs | 14 +- .../tests/quorum_store_db_test.rs | 29 +--- .../src/quorum_store/tests/types_test.rs | 5 +- consensus/src/quorum_store/types.rs | 6 +- consensus/src/quorum_store/utils.rs | 55 ++++--- consensus/src/state_computer.rs | 28 +++- consensus/src/state_replication.rs | 3 +- .../src/test_utils/mock_payload_manager.rs | 3 +- 25 files changed, 212 insertions(+), 421 deletions(-) diff --git a/config/src/config/quorum_store_config.rs b/config/src/config/quorum_store_config.rs index d6dc72178b7be..28868a72aaf2e 100644 --- a/config/src/config/quorum_store_config.rs +++ b/config/src/config/quorum_store_config.rs @@ -2,8 +2,8 @@ // SPDX-License-Identifier: Apache-2.0 use crate::config::MAX_SENDING_BLOCK_TXNS_QUORUM_STORE_OVERRIDE; -use aptos_types::block_info::Round; use serde::{Deserialize, Serialize}; +use std::time::Duration; #[derive(Clone, Debug, Deserialize, PartialEq, Serialize)] #[serde(default, deny_unknown_fields)] @@ -45,15 +45,7 @@ pub struct QuorumStoreConfig { pub max_batch_bytes: usize, pub batch_request_timeout_ms: usize, /// Used when setting up the expiration time for the batch initation. - pub batch_expiry_round_gap_when_init: Round, - /// Batches may have expiry set for batch_expiry_rounds_gap rounds after the - /// latest committed round, and it will not be cleared from storage for another - /// so other batch_expiry_grace_rounds rounds, so the peers on the network - /// can still fetch the data they fall behind (later, they would have to state-sync). - /// Used when checking the expiration time of the received batch against current logical time to prevent DDoS. 
- pub batch_expiry_round_gap_behind_latest_certified: Round, - pub batch_expiry_round_gap_beyond_latest_certified: Round, - pub batch_expiry_grace_rounds: Round, + pub batch_expiry_gap_when_init_usecs: u64, pub memory_quota: usize, pub db_quota: usize, pub batch_quota: usize, @@ -73,10 +65,7 @@ impl Default for QuorumStoreConfig { batch_generation_max_interval_ms: 250, max_batch_bytes: 4 * 1024 * 1024, batch_request_timeout_ms: 10000, - batch_expiry_round_gap_when_init: 100, - batch_expiry_round_gap_behind_latest_certified: 500, - batch_expiry_round_gap_beyond_latest_certified: 500, - batch_expiry_grace_rounds: 5, + batch_expiry_gap_when_init_usecs: Duration::from_secs(60).as_micros() as u64, memory_quota: 120_000_000, db_quota: 300_000_000, batch_quota: 300_000, diff --git a/consensus/consensus-types/src/proof_of_store.rs b/consensus/consensus-types/src/proof_of_store.rs index 0dd4427b46f08..e707687ec4dc3 100644 --- a/consensus/consensus-types/src/proof_of_store.rs +++ b/consensus/consensus-types/src/proof_of_store.rs @@ -1,7 +1,6 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use crate::common::Round; use anyhow::{bail, Context}; use aptos_crypto::{bls12381, CryptoMaterialError, HashValue}; use aptos_crypto_derive::{BCSCryptoHash, CryptoHasher}; @@ -18,26 +17,6 @@ use std::{ ops::Deref, }; -#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord, Deserialize, Serialize, Hash)] -pub struct LogicalTime { - epoch: u64, - round: Round, -} - -impl LogicalTime { - pub fn new(epoch: u64, round: Round) -> Self { - Self { epoch, round } - } - - pub fn epoch(&self) -> u64 { - self.epoch - } - - pub fn round(&self) -> Round { - self.round - } -} - #[derive( Copy, Clone, Debug, Deserialize, Serialize, PartialEq, Eq, Hash, CryptoHasher, BCSCryptoHash, )] @@ -91,7 +70,8 @@ impl Display for BatchId { pub struct BatchInfo { author: PeerId, batch_id: BatchId, - expiration: LogicalTime, + epoch: u64, + expiration: u64, digest: HashValue, num_txns: u64, num_bytes: u64, @@ -101,7 +81,8 @@ impl BatchInfo { pub fn new( author: PeerId, batch_id: BatchId, - expiration: LogicalTime, + epoch: u64, + expiration: u64, digest: HashValue, num_txns: u64, num_bytes: u64, @@ -109,6 +90,7 @@ impl BatchInfo { Self { author, batch_id, + epoch, expiration, digest, num_txns, @@ -117,7 +99,7 @@ impl BatchInfo { } pub fn epoch(&self) -> u64 { - self.expiration.epoch + self.epoch } pub fn author(&self) -> PeerId { @@ -128,7 +110,7 @@ impl BatchInfo { self.batch_id } - pub fn expiration(&self) -> LogicalTime { + pub fn expiration(&self) -> u64 { self.expiration } diff --git a/consensus/consensus-types/src/request_response.rs b/consensus/consensus-types/src/request_response.rs index fa7f6f09c72e4..31f9599f7ca3f 100644 --- a/consensus/consensus-types/src/request_response.rs +++ b/consensus/consensus-types/src/request_response.rs @@ -1,19 +1,14 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use crate::{ - common::{Payload, PayloadFilter, Round}, - proof_of_store::LogicalTime, -}; +use crate::common::{Payload, PayloadFilter}; use anyhow::Result; -use aptos_crypto::HashValue; use futures::channel::oneshot; use std::{fmt, fmt::Formatter}; pub enum GetPayloadCommand { /// Request to pull block to submit to consensus. 
GetPayloadRequest( - Round, // max block size u64, // max byte size @@ -31,7 +26,6 @@ impl fmt::Display for GetPayloadCommand { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { match self { GetPayloadCommand::GetPayloadRequest( - round, max_txns, max_bytes, return_non_full, @@ -40,28 +34,8 @@ impl fmt::Display for GetPayloadCommand { ) => { write!( f, - "GetPayloadRequest [round: {}, max_txns: {}, max_bytes: {}, return_non_full: {}, excluded: {}]", - round, max_txns, max_bytes, return_non_full, excluded - ) - }, - } - } -} - -pub enum CleanCommand { - CleanRequest(LogicalTime, Vec), -} - -impl fmt::Display for CleanCommand { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - match self { - CleanCommand::CleanRequest(logical_time, digests) => { - write!( - f, - "CleanRequest [epoch: {}, round: {}, digests: {:?}]", - logical_time.epoch(), - logical_time.round(), - digests + "GetPayloadRequest [max_txns: {}, max_bytes: {}, return_non_full: {}, excluded: {}]", + max_txns, max_bytes, return_non_full, excluded ) }, } diff --git a/consensus/src/liveness/proposal_generator.rs b/consensus/src/liveness/proposal_generator.rs index c483db3603bcc..1a06d3d9c8b96 100644 --- a/consensus/src/liveness/proposal_generator.rs +++ b/consensus/src/liveness/proposal_generator.rs @@ -273,7 +273,6 @@ impl ProposalGenerator { let payload = self .payload_client .pull_payload( - round, max_block_txns, max_block_bytes, payload_filter, diff --git a/consensus/src/payload_client.rs b/consensus/src/payload_client.rs index d5099a9aa1497..5a53843d11160 100644 --- a/consensus/src/payload_client.rs +++ b/consensus/src/payload_client.rs @@ -7,7 +7,7 @@ use crate::{ }; use anyhow::Result; use aptos_consensus_types::{ - common::{Payload, PayloadFilter, Round}, + common::{Payload, PayloadFilter}, request_response::{GetPayloadCommand, GetPayloadResponse}, }; use aptos_logger::prelude::*; @@ -55,7 +55,6 @@ impl QuorumStoreClient { async fn pull_internal( &self, - round: Round, max_items: u64, max_bytes: u64, return_non_full: bool, @@ -63,7 +62,6 @@ impl QuorumStoreClient { ) -> Result { let (callback, callback_rcv) = oneshot::channel(); let req = GetPayloadCommand::GetPayloadRequest( - round, max_items, max_bytes, return_non_full, @@ -94,7 +92,6 @@ impl QuorumStoreClient { impl PayloadClient for QuorumStoreClient { async fn pull_payload( &self, - round: Round, max_items: u64, max_bytes: u64, exclude_payloads: PayloadFilter, @@ -124,7 +121,6 @@ impl PayloadClient for QuorumStoreClient { let done = count == 0 || start_time.elapsed().as_millis() >= max_duration; let payload = self .pull_internal( - round, max_items, max_bytes, return_non_full || return_empty || done || self.poll_count == u64::MAX, diff --git a/consensus/src/payload_manager.rs b/consensus/src/payload_manager.rs index 7f825d3db1cd6..8d894e1dd0ef5 100644 --- a/consensus/src/payload_manager.rs +++ b/consensus/src/payload_manager.rs @@ -11,7 +11,7 @@ use crate::{ use aptos_consensus_types::{ block::Block, common::{DataStatus, Payload}, - proof_of_store::{LogicalTime, ProofOfStore}, + proof_of_store::ProofOfStore, }; use aptos_crypto::HashValue; use aptos_executor_types::{Error::DataNotFound, *}; @@ -31,7 +31,7 @@ pub enum PayloadManager { impl PayloadManager { async fn request_transactions( proofs: Vec, - logical_time: LogicalTime, + block_timestamp: u64, batch_store: &BatchStore, ) -> Vec<( HashValue, @@ -40,12 +40,12 @@ impl PayloadManager { let mut receivers = Vec::new(); for pos in proofs { trace!( - "QSE: requesting pos {:?}, digest {}, time = {:?}", + 
"QSE: requesting pos {:?}, digest {}, time = {}", pos, pos.digest(), - logical_time + block_timestamp ); - if logical_time <= pos.expiration() { + if block_timestamp <= pos.expiration() { receivers.push((*pos.digest(), batch_store.get_batch(pos))); } else { debug!("QSE: skipped expired pos {}", pos.digest()); @@ -55,12 +55,14 @@ impl PayloadManager { } ///Pass commit information to BatchReader and QuorumStore wrapper for their internal cleanups. - pub async fn notify_commit(&self, logical_time: LogicalTime, payloads: Vec) { + pub async fn notify_commit(&self, block_timestamp: u64, payloads: Vec) { match self { PayloadManager::DirectMempool => {}, PayloadManager::InQuorumStore(batch_store, coordinator_tx) => { // TODO: move this to somewhere in quorum store, so this can be a batch reader - batch_store.update_certified_round(logical_time).await; + batch_store + .update_certified_timestamp(block_timestamp) + .await; let digests: Vec = payloads .into_iter() @@ -77,7 +79,7 @@ impl PayloadManager { if let Err(e) = tx .send(CoordinatorCommand::CommitNotification( - logical_time, + block_timestamp, digests, )) .await @@ -104,7 +106,7 @@ impl PayloadManager { if proof_with_status.status.lock().is_none() { let receivers = PayloadManager::request_transactions( proof_with_status.proofs.clone(), - LogicalTime::new(block.epoch(), block.round()), + block.timestamp_usecs(), batch_store, ) .await; @@ -160,7 +162,7 @@ impl PayloadManager { warn!("Oneshot channel to get a batch was dropped with error {:?}", e); let new_receivers = PayloadManager::request_transactions( proof_with_data.proofs.clone(), - LogicalTime::new(block.epoch(), block.round()), + block.timestamp_usecs(), batch_store, ) .await; @@ -177,7 +179,7 @@ impl PayloadManager { Ok(Err(e)) => { let new_receivers = PayloadManager::request_transactions( proof_with_data.proofs.clone(), - LogicalTime::new(block.epoch(), block.round()), + block.timestamp_usecs(), batch_store, ) .await; diff --git a/consensus/src/quorum_store/batch_coordinator.rs b/consensus/src/quorum_store/batch_coordinator.rs index f8dd9bbb3b38e..c94559b70ed76 100644 --- a/consensus/src/quorum_store/batch_coordinator.rs +++ b/consensus/src/quorum_store/batch_coordinator.rs @@ -21,7 +21,6 @@ pub enum BatchCoordinatorCommand { } pub struct BatchCoordinator { - epoch: u64, my_peer_id: PeerId, network_sender: NetworkSender, batch_store: Arc>, @@ -30,14 +29,12 @@ pub struct BatchCoordinator { impl BatchCoordinator { pub(crate) fn new( - epoch: u64, //TODO: pass the epoch config my_peer_id: PeerId, network_sender: NetworkSender, batch_store: Arc>, max_batch_bytes: u64, ) -> Self { Self { - epoch, my_peer_id, network_sender, batch_store, @@ -47,35 +44,23 @@ impl BatchCoordinator { async fn handle_batch(&mut self, batch: Batch) -> Option { let source = batch.author(); - let expiration = batch.expiration(); let batch_id = batch.batch_id(); trace!( "QS: got batch message from {} batch_id {}", source, batch_id, ); - if expiration.epoch() == self.epoch { - counters::RECEIVED_BATCH_COUNT.inc(); - let num_bytes = batch.num_bytes(); - if num_bytes > self.max_batch_bytes { - error!( - "Batch from {} exceeds size limit {}, actual size: {}", - source, self.max_batch_bytes, num_bytes - ); - return None; - } - let persist_request = batch.into(); - return Some(persist_request); - } - // Malformed request with an inconsistent expiry epoch. 
- else { - trace!( - "QS: got end batch message from different epoch {} != {}", - expiration.epoch(), - self.epoch + counters::RECEIVED_BATCH_COUNT.inc(); + let num_bytes = batch.num_bytes(); + if num_bytes > self.max_batch_bytes { + error!( + "Batch from {} exceeds size limit {}, actual size: {}", + source, self.max_batch_bytes, num_bytes ); + return None; } - None + let persist_request = batch.into(); + Some(persist_request) } fn persist_and_send_digest(&self, persist_request: PersistRequest) { diff --git a/consensus/src/quorum_store/batch_generator.rs b/consensus/src/quorum_store/batch_generator.rs index f800b008e30f5..3cb58c6c553ad 100644 --- a/consensus/src/quorum_store/batch_generator.rs +++ b/consensus/src/quorum_store/batch_generator.rs @@ -7,14 +7,11 @@ use crate::{ counters, quorum_store_db::QuorumStoreStorage, types::Batch, - utils::{MempoolProxy, RoundExpirations}, + utils::{MempoolProxy, TimeExpirations}, }, }; use aptos_config::config::QuorumStoreConfig; -use aptos_consensus_types::{ - common::TransactionSummary, - proof_of_store::{BatchId, LogicalTime}, -}; +use aptos_consensus_types::{common::TransactionSummary, proof_of_store::BatchId}; use aptos_logger::prelude::*; use aptos_mempool::QuorumStoreRequest; use aptos_types::PeerId; @@ -29,7 +26,7 @@ use tokio::time::Interval; #[derive(Debug)] pub enum BatchGeneratorCommand { - CommitNotification(LogicalTime), + CommitNotification(u64), ProofExpiration(Vec), Shutdown(tokio::sync::oneshot::Sender<()>), } @@ -41,14 +38,15 @@ pub struct BackPressure { } pub struct BatchGenerator { + epoch: u64, my_peer_id: PeerId, batch_id: BatchId, db: Arc, config: QuorumStoreConfig, mempool_proxy: MempoolProxy, batches_in_progress: HashMap>, - batch_round_expirations: RoundExpirations, - latest_logical_time: LogicalTime, + batch_expirations: TimeExpirations, + latest_block_timestamp: u64, last_end_batch_time: Instant, // quorum store back pressure, get updated from proof manager back_pressure: BackPressure, @@ -80,14 +78,15 @@ impl BatchGenerator { .expect("Could not save to db"); Self { + epoch, my_peer_id, batch_id, db, config, mempool_proxy: MempoolProxy::new(mempool_tx, mempool_txn_pull_timeout_ms), batches_in_progress: HashMap::new(), - batch_round_expirations: RoundExpirations::new(), - latest_logical_time: LogicalTime::new(epoch, 0), + batch_expirations: TimeExpirations::new(), + latest_block_timestamp: 0, last_end_batch_time: Instant::now(), back_pressure: BackPressure { txn_count: false, @@ -147,12 +146,11 @@ impl BatchGenerator { let batch_id = self.batch_id; self.batch_id.increment(); self.db - .save_batch_id(self.latest_logical_time.epoch(), self.batch_id) + .save_batch_id(self.epoch, self.batch_id) .expect("Could not save to db"); - let expiry_round = - self.latest_logical_time.round() + self.config.batch_expiry_round_gap_when_init; - let logical_time = LogicalTime::new(self.latest_logical_time.epoch(), expiry_round); + let expiry_time = aptos_infallible::duration_since_epoch().as_micros() as u64 + + self.config.batch_expiry_gap_when_init_usecs; let txn_summaries: Vec<_> = pulled_txns .iter() .map(|txn| TransactionSummary { @@ -161,11 +159,16 @@ impl BatchGenerator { }) .collect(); - let batch = Batch::new(batch_id, pulled_txns, logical_time, self.my_peer_id); + let batch = Batch::new( + batch_id, + pulled_txns, + self.epoch, + expiry_time, + self.my_peer_id, + ); self.batches_in_progress.insert(batch_id, txn_summaries); - self.batch_round_expirations - .add_item(batch_id, expiry_round); + self.batch_expirations.add_item(batch_id, 
expiry_time); self.last_end_batch_time = Instant::now(); Some(batch) @@ -251,25 +254,19 @@ impl BatchGenerator { }), Some(cmd) = cmd_rx.recv() => monitor!("batch_generator_handle_command", { match cmd { - BatchGeneratorCommand::CommitNotification(logical_time) => { + BatchGeneratorCommand::CommitNotification(block_timestamp) => { trace!( - "QS: got clean request from execution, epoch {}, round {}", - logical_time.epoch(), - logical_time.round() - ); - assert_eq!( - self.latest_logical_time.epoch(), - logical_time.epoch(), - "Wrong epoch" + "QS: got clean request from execution, block timestamp {}", + block_timestamp ); assert!( - self.latest_logical_time <= logical_time, - "Decreasing logical time" + self.latest_block_timestamp <= block_timestamp, + "Decreasing block timestamp" ); - self.latest_logical_time = logical_time; - // Cleans up all batches that expire in rounds <= logical_time.round(). This is + self.latest_block_timestamp = block_timestamp; + // Cleans up all batches that expire in timestamp <= block_timestamp. This is // safe since clean request must occur only after execution result is certified. - for batch_id in self.batch_round_expirations.expire(logical_time.round()) { + for batch_id in self.batch_expirations.expire(block_timestamp) { if self.batches_in_progress.remove(&batch_id).is_some() { debug!( "QS: logical time based expiration batch w. id {} from batches_in_progress, new size {}", @@ -280,9 +277,6 @@ impl BatchGenerator { } }, BatchGeneratorCommand::ProofExpiration(batch_ids) => { - // Quorum store measurements - counters::TIMEOUT_BATCHES_COUNT.inc(); - for batch_id in batch_ids { debug!( "QS: received timeout for proof of store, batch id = {}", diff --git a/consensus/src/quorum_store/batch_store.rs b/consensus/src/quorum_store/batch_store.rs index 73431530e3205..3cd381fcf8fc9 100644 --- a/consensus/src/quorum_store/batch_store.rs +++ b/consensus/src/quorum_store/batch_store.rs @@ -8,13 +8,13 @@ use crate::{ counters, quorum_store_db::QuorumStoreStorage, types::{PersistedValue, StorageMode}, - utils::RoundExpirations, + utils::TimeExpirations, }, }; use anyhow::bail; use aptos_consensus_types::{ common::Round, - proof_of_store::{LogicalTime, ProofOfStore, SignedBatchInfo}, + proof_of_store::{ProofOfStore, SignedBatchInfo}, }; use aptos_crypto::HashValue; use aptos_executor_types::Error; @@ -100,15 +100,11 @@ impl QuotaManager { /// efficient concurrent readers. 
pub struct BatchStore { epoch: OnceCell, - last_certified_round: AtomicU64, + last_certified_time: AtomicU64, db_cache: DashMap, peer_quota: DashMap, - expirations: Mutex>, + expirations: Mutex>, db: Arc, - batch_expiry_round_gap_when_init: Round, - batch_expiry_round_gap_behind_latest_certified: Round, - batch_expiry_round_gap_beyond_latest_certified: Round, - expiry_grace_rounds: Round, memory_quota: usize, db_quota: usize, batch_quota: usize, @@ -120,12 +116,8 @@ pub struct BatchStore { impl BatchStore { pub(crate) fn new( epoch: u64, - last_certified_round: Round, + last_certified_time: u64, db: Arc, - batch_expiry_round_gap_when_init: Round, - batch_expiry_round_gap_behind_latest_certified: Round, - batch_expiry_round_gap_beyond_latest_certified: Round, - expiry_grace_rounds: Round, memory_quota: usize, db_quota: usize, batch_quota: usize, @@ -136,15 +128,11 @@ impl BatchStore { let db_clone = db.clone(); let batch_store = Self { epoch: OnceCell::with_value(epoch), - last_certified_round: AtomicU64::new(last_certified_round), + last_certified_time: AtomicU64::new(last_certified_time), db_cache: DashMap::new(), peer_quota: DashMap::new(), - expirations: Mutex::new(RoundExpirations::new()), + expirations: Mutex::new(TimeExpirations::new()), db, - batch_expiry_round_gap_when_init, - batch_expiry_round_gap_behind_latest_certified, - batch_expiry_round_gap_beyond_latest_certified, - expiry_grace_rounds, memory_quota, db_quota, batch_quota, @@ -160,7 +148,7 @@ impl BatchStore { "QS: Batchreader {} {} {}", db_content.len(), epoch, - last_certified_round + last_certified_time ); for (digest, value) in db_content { let expiration = value.expiration(); @@ -170,11 +158,8 @@ impl BatchStore { expiration, digest ); - assert!(epoch >= expiration.epoch()); - if epoch > expiration.epoch() - || last_certified_round >= expiration.round() + expiry_grace_rounds - { + if last_certified_time >= expiration { expired_keys.push(digest); } else { batch_store @@ -220,14 +205,14 @@ impl BatchStore { mut value: PersistedValue, ) -> anyhow::Result { let author = value.author(); - let expiration_round = value.expiration().round(); + let expiration_time = value.expiration(); { // Acquire dashmap internal lock on the entry corresponding to the digest. 
let cache_entry = self.db_cache.entry(digest); if let Occupied(entry) = &cache_entry { - if entry.get().expiration().round() >= value.expiration().round() { + if entry.get().expiration() >= expiration_time { debug!( "QS: already have the digest with higher expiration {}", digest @@ -266,61 +251,31 @@ impl BatchStore { self.expirations .lock() .unwrap() - .add_item(digest, expiration_round); + .add_item(digest, expiration_time); Ok(true) } pub(crate) fn save(&self, digest: HashValue, value: PersistedValue) -> anyhow::Result { - let expiration = value.expiration(); - if expiration.epoch() == self.epoch() { - // record the round gaps - if expiration.round() > self.last_certified_round() { - counters::GAP_BETWEEN_BATCH_EXPIRATION_AND_LAST_CERTIFIED_ROUND_HIGHER - .observe((expiration.round() - self.last_certified_round()) as f64); - } - if expiration.round() < self.last_certified_round() { - counters::GAP_BETWEEN_BATCH_EXPIRATION_AND_LAST_CERTIFIED_ROUND_LOWER - .observe((self.last_certified_round() - expiration.round()) as f64); - } - - if expiration.round() + self.batch_expiry_round_gap_behind_latest_certified - >= self.last_certified_round() - && expiration.round() - <= self.last_certified_round() - + self.batch_expiry_round_gap_beyond_latest_certified - { - fail_point!("quorum_store::save", |_| { - // Skip caching and storing value to the db - Ok(false) - }); - - return self.insert_to_cache(digest, value); - } + let last_certified_time = self.last_certified_time(); + if value.expiration() > last_certified_time { + fail_point!("quorum_store::save", |_| { + // Skip caching and storing value to the db + Ok(false) + }); + + return self.insert_to_cache(digest, value); } - bail!("Incorrect expiration {:?} with init gap {} in epoch {}, last committed round {} and max behind gap {} max beyond gap {}", - expiration, - self.batch_expiry_round_gap_when_init, + bail!( + "Incorrect expiration {} in epoch {}, last committed timestamp {}", + value.expiration(), self.epoch(), - self.last_certified_round(), - self.batch_expiry_round_gap_behind_latest_certified, - self.batch_expiry_round_gap_beyond_latest_certified); + last_certified_time, + ); } // pub(crate) for testing - pub(crate) fn clear_expired_payload(&self, certified_time: LogicalTime) -> Vec { - assert_eq!( - certified_time.epoch(), - self.epoch(), - "Execution epoch inconsistent with BatchReader" - ); - - let expired_round = if certified_time.round() >= self.expiry_grace_rounds { - certified_time.round() - self.expiry_grace_rounds - } else { - 0 - }; - - let expired_digests = self.expirations.lock().unwrap().expire(expired_round); + pub(crate) fn clear_expired_payload(&self, certified_time: u64) -> Vec { + let expired_digests = self.expirations.lock().unwrap().expire(certified_time); let mut ret = Vec::new(); for h in expired_digests { let removed_value = match self.db_cache.entry(h) { @@ -328,7 +283,7 @@ impl BatchStore { // We need to check up-to-date expiration again because receiving the same // digest with a higher expiration would update the persisted value and // effectively extend the expiration. - if entry.get().expiration().round() <= expired_round { + if entry.get().expiration() <= certified_time { Some(entry.remove()) } else { None @@ -346,14 +301,6 @@ impl BatchStore { } pub fn persist(&self, persist_request: PersistRequest) -> Option { - let expiration = persist_request.value.expiration(); - // Network listener should filter messages with wrong expiration epoch. 
- assert_eq!( - expiration.epoch(), - self.epoch(), - "Persist Request for a batch with an incorrect epoch" - ); - match self.save(persist_request.digest, persist_request.value.clone()) { Ok(needs_db) => { let batch_info = persist_request.value.batch_info().clone(); @@ -379,26 +326,18 @@ impl BatchStore { // batches with expiration in this round can be cleaned up. The parameter // expiry grace rounds just keeps the batches around for a little longer // for lagging nodes to be able to catch up (without state-sync). - pub async fn update_certified_round(&self, certified_time: LogicalTime) { + pub async fn update_certified_timestamp(&self, certified_time: u64) { trace!("QS: batch reader updating time {:?}", certified_time); - assert_eq!( - self.epoch(), - certified_time.epoch(), - "QS: wrong epoch {} != {}", - self.epoch(), - certified_time.epoch() - ); - - let prev_round = self - .last_certified_round - .fetch_max(certified_time.round(), Ordering::SeqCst); - // Note: prev_round may be equal to certified_time round due to state-sync + let prev_time = self + .last_certified_time + .fetch_max(certified_time, Ordering::SeqCst); + // Note: prev_time may be equal to certified_time due to state-sync // at the epoch boundary. assert!( - prev_round <= certified_time.round(), - "Decreasing executed rounds reported to BatchReader {} {}", - prev_round, - certified_time.round(), + prev_time <= certified_time, + "Decreasing executed block timestamp reported to BatchReader {} {}", + prev_time, + certified_time, ); let expired_keys = self.clear_expired_payload(certified_time); @@ -407,8 +346,8 @@ impl BatchStore { } } - fn last_certified_round(&self) -> Round { - self.last_certified_round.load(Ordering::Relaxed) + fn last_certified_time(&self) -> Round { + self.last_certified_time.load(Ordering::Relaxed) } fn get_batch_from_db(&self, digest: &HashValue) -> Result { diff --git a/consensus/src/quorum_store/counters.rs b/consensus/src/quorum_store/counters.rs index 0c356c2c0e717..82cb2ac845751 100644 --- a/consensus/src/quorum_store/counters.rs +++ b/consensus/src/quorum_store/counters.rs @@ -130,31 +130,8 @@ pub static EXPIRED_PROOFS_WHEN_PULL: Lazy = Lazy::new(|| { .unwrap() }); -/// Histogram for the gaps between expiration round of the batch and the last certified round, and expiration round is higher. -pub static GAP_BETWEEN_BATCH_EXPIRATION_AND_LAST_CERTIFIED_ROUND_HIGHER: Lazy = - Lazy::new(|| { - register_histogram!( - "quorum_store_gap_batch_expiration_and_last_certified_round_higher", - "Histogram for the gaps between expiration round of the batch and the last certified round, and expiration round is higher.", - // exponential_buckets(/*start=*/ 100.0, /*factor=*/ 1.1, /*count=*/ 100).unwrap(), - ) - .unwrap() - }); - -/// Histogram for the gaps between expiration round of the batch and the last certified round, and expiration round is lower. -pub static GAP_BETWEEN_BATCH_EXPIRATION_AND_LAST_CERTIFIED_ROUND_LOWER: Lazy = Lazy::new( - || { - register_histogram!( - "quorum_store_gap_batch_expiration_and_last_certified_round_lower", - "Histogram for the gaps between expiration round of the batch and the last certified round, and expiration round is lower.", - // exponential_buckets(/*start=*/ 100.0, /*factor=*/ 1.1, /*count=*/ 100).unwrap(), - ) - .unwrap() - }, -); - /// Histogram for the gaps between expiration round and the current round when pulling the proofs, and expiration round is lower. 
-pub static GAP_BETWEEN_BATCH_EXPIRATION_AND_CURRENT_ROUND_WHEN_PULL_PROOFS: Lazy = +pub static GAP_BETWEEN_BATCH_EXPIRATION_AND_CURRENT_TIME_WHEN_PULL_PROOFS: Lazy = Lazy::new(|| { register_histogram!( "quorum_store_gap_batch_expiration_and_current_round_when_pull", diff --git a/consensus/src/quorum_store/direct_mempool_quorum_store.rs b/consensus/src/quorum_store/direct_mempool_quorum_store.rs index aceec4fb6c9fd..cb714e03ef596 100644 --- a/consensus/src/quorum_store/direct_mempool_quorum_store.rs +++ b/consensus/src/quorum_store/direct_mempool_quorum_store.rs @@ -131,7 +131,6 @@ impl DirectMempoolQuorumStore { async fn handle_consensus_request(&self, req: GetPayloadCommand) { match req { GetPayloadCommand::GetPayloadRequest( - _round, max_txns, max_bytes, return_non_full, diff --git a/consensus/src/quorum_store/proof_manager.rs b/consensus/src/quorum_store/proof_manager.rs index ad158553a0aa0..c96d3145e2204 100644 --- a/consensus/src/quorum_store/proof_manager.rs +++ b/consensus/src/quorum_store/proof_manager.rs @@ -7,7 +7,7 @@ use crate::{ }; use aptos_consensus_types::{ common::{Payload, PayloadFilter, ProofWithData}, - proof_of_store::{LogicalTime, ProofOfStore}, + proof_of_store::ProofOfStore, request_response::{GetPayloadCommand, GetPayloadResponse}, }; use aptos_crypto::HashValue; @@ -20,14 +20,14 @@ use std::collections::HashSet; #[derive(Debug)] pub enum ProofManagerCommand { ReceiveProof(ProofOfStore), - CommitNotification(LogicalTime, Vec), + CommitNotification(u64, Vec), Shutdown(tokio::sync::oneshot::Sender<()>), } pub struct ProofManager { my_peer_id: PeerId, proofs_for_consensus: ProofQueue, - latest_logical_time: LogicalTime, + latest_block_timestamp: u64, back_pressure_total_txn_limit: u64, remaining_total_txn_num: u64, back_pressure_total_proof_limit: u64, @@ -36,7 +36,6 @@ pub struct ProofManager { impl ProofManager { pub fn new( - epoch: u64, my_peer_id: PeerId, back_pressure_total_txn_limit: u64, back_pressure_total_proof_limit: u64, @@ -44,7 +43,7 @@ impl ProofManager { Self { my_peer_id, proofs_for_consensus: ProofQueue::new(), - latest_logical_time: LogicalTime::new(epoch, 0), + latest_block_timestamp: 0, back_pressure_total_txn_limit, remaining_total_txn_num: 0, back_pressure_total_proof_limit, @@ -67,24 +66,18 @@ impl ProofManager { pub(crate) fn handle_commit_notification( &mut self, - logical_time: LogicalTime, + block_timestamp: u64, digests: Vec, ) { trace!( - "QS: got clean request from execution at epoch {}, round {}", - logical_time.epoch(), - logical_time.round() - ); - assert_eq!( - self.latest_logical_time.epoch(), - logical_time.epoch(), - "Wrong epoch" + "QS: got clean request from execution at block timestamp {}", + block_timestamp ); assert!( - self.latest_logical_time <= logical_time, - "Decreasing logical time" + self.latest_block_timestamp <= block_timestamp, + "Decreasing block timestamp" ); - self.latest_logical_time = logical_time; + self.latest_block_timestamp = block_timestamp; self.proofs_for_consensus.mark_committed(digests); } @@ -92,7 +85,6 @@ impl ProofManager { match msg { // TODO: check what max_txns consensus is using GetPayloadCommand::GetPayloadRequest( - round, max_txns, max_bytes, return_non_full, @@ -110,17 +102,14 @@ impl ProofManager { let proof_block = self.proofs_for_consensus.pull_proofs( &excluded_proofs, - LogicalTime::new(self.latest_logical_time.epoch(), round), + self.latest_block_timestamp, max_txns, max_bytes, return_non_full, ); (self.remaining_total_txn_num, self.remaining_total_proof_num) = self 
.proofs_for_consensus - .num_total_txns_and_proofs(LogicalTime::new( - self.latest_logical_time.epoch(), - round, - )); + .num_total_txns_and_proofs(self.latest_block_timestamp); let res = GetPayloadResponse::GetPayloadResponse( if proof_block.is_empty() { @@ -189,14 +178,14 @@ impl ProofManager { ProofManagerCommand::ReceiveProof(proof) => { self.receive_proof(proof); }, - ProofManagerCommand::CommitNotification(logical_time, digests) => { - self.handle_commit_notification(logical_time, digests); + ProofManagerCommand::CommitNotification(block_timestamp, digests) => { + self.handle_commit_notification(block_timestamp, digests); // update the backpressure upon new commit round (self.remaining_total_txn_num, self.remaining_total_proof_num) = - self.proofs_for_consensus.num_total_txns_and_proofs(logical_time); + self.proofs_for_consensus.num_total_txns_and_proofs(block_timestamp); // TODO: keeping here for metrics, might be part of the backpressure in the future? - self.proofs_for_consensus.clean_local_proofs(logical_time); + self.proofs_for_consensus.clean_local_proofs(block_timestamp); }, } let updated_back_pressure = self.qs_back_pressure(); diff --git a/consensus/src/quorum_store/quorum_store_builder.rs b/consensus/src/quorum_store/quorum_store_builder.rs index b1fd3dedad82f..21a7e6da3efa6 100644 --- a/consensus/src/quorum_store/quorum_store_builder.rs +++ b/consensus/src/quorum_store/quorum_store_builder.rs @@ -225,19 +225,7 @@ impl InnerBuilder { .aptos_db .get_latest_ledger_info() .expect("could not get latest ledger info"); - let last_committed_round = if latest_ledger_info_with_sigs - .ledger_info() - .commit_info() - .epoch() - == self.epoch - { - latest_ledger_info_with_sigs - .ledger_info() - .commit_info() - .round() - } else { - 0 - }; + let last_committed_timestamp = latest_ledger_info_with_sigs.commit_info().timestamp_usecs(); let batch_requester = BatchRequester::new( self.epoch, @@ -248,12 +236,8 @@ impl InnerBuilder { ); let batch_store = Arc::new(BatchStore::new( self.epoch, - last_committed_round, + last_committed_timestamp, self.quorum_store_storage.clone(), - self.config.batch_expiry_round_gap_when_init, - self.config.batch_expiry_round_gap_behind_latest_certified, - self.config.batch_expiry_round_gap_beyond_latest_certified, - self.config.batch_expiry_grace_rounds, self.config.memory_quota, self.config.db_quota, self.config.batch_quota, @@ -315,7 +299,6 @@ impl InnerBuilder { self.remote_batch_coordinator_cmd_rx.into_iter().enumerate() { let batch_coordinator = BatchCoordinator::new( - self.epoch, self.author, self.network_sender.clone(), self.batch_store.clone().unwrap(), @@ -347,7 +330,6 @@ impl InnerBuilder { let proof_manager_cmd_rx = self.proof_manager_cmd_rx.take().unwrap(); let proof_manager = ProofManager::new( - self.epoch, self.author, self.config.back_pressure.backlog_txn_limit_count, self.config diff --git a/consensus/src/quorum_store/quorum_store_coordinator.rs b/consensus/src/quorum_store/quorum_store_coordinator.rs index 09571409d16c5..e4f9b302bd2bc 100644 --- a/consensus/src/quorum_store/quorum_store_coordinator.rs +++ b/consensus/src/quorum_store/quorum_store_coordinator.rs @@ -10,7 +10,6 @@ use crate::{ round_manager::VerifiedEvent, }; use aptos_channels::aptos_channel; -use aptos_consensus_types::proof_of_store::LogicalTime; use aptos_crypto::HashValue; use aptos_logger::prelude::*; use aptos_types::{account_address::AccountAddress, PeerId}; @@ -18,7 +17,7 @@ use futures::StreamExt; use tokio::sync::{mpsc, oneshot}; pub enum CoordinatorCommand { - 
CommitNotification(LogicalTime, Vec), + CommitNotification(u64, Vec), Shutdown(futures_channel::oneshot::Sender<()>), } @@ -54,10 +53,10 @@ impl QuorumStoreCoordinator { while let Some(cmd) = rx.next().await { monitor!("quorum_store_coordinator_loop", { match cmd { - CoordinatorCommand::CommitNotification(logical_time, digests) => { + CoordinatorCommand::CommitNotification(block_timestamp, digests) => { self.proof_manager_cmd_tx .send(ProofManagerCommand::CommitNotification( - logical_time, + block_timestamp, digests, )) .await @@ -65,7 +64,7 @@ impl QuorumStoreCoordinator { // TODO: need a callback or not? self.batch_generator_cmd_tx - .send(BatchGeneratorCommand::CommitNotification(logical_time)) + .send(BatchGeneratorCommand::CommitNotification(block_timestamp)) .await .expect("Failed to send to BatchGenerator"); }, diff --git a/consensus/src/quorum_store/tests/batch_store_test.rs b/consensus/src/quorum_store/tests/batch_store_test.rs index 533bda04af644..2d8fb1b2f24df 100644 --- a/consensus/src/quorum_store/tests/batch_store_test.rs +++ b/consensus/src/quorum_store/tests/batch_store_test.rs @@ -10,7 +10,7 @@ use crate::{ }, test_utils::mock_quorum_store_sender::MockQuorumStoreSender, }; -use aptos_consensus_types::proof_of_store::{BatchId, LogicalTime}; +use aptos_consensus_types::proof_of_store::BatchId; use aptos_crypto::HashValue; use aptos_temppath::TempPath; use aptos_types::{account_address::AccountAddress, validator_verifier::random_validator_verifier}; @@ -39,10 +39,6 @@ fn batch_store_for_test_no_db(memory_quota: usize) -> Arc, - expiration: LogicalTime, + epoch: u64, + expiration: u64, batch_author: PeerId, ) -> Self { let payload = BatchPayload::new(payload); let batch_info = BatchInfo::new( batch_author, batch_id, + epoch, expiration, payload.hash(), payload.num_txns() as u64, diff --git a/consensus/src/quorum_store/utils.rs b/consensus/src/quorum_store/utils.rs index c25714c903ed1..4d6b9fdd10356 100644 --- a/consensus/src/quorum_store/utils.rs +++ b/consensus/src/quorum_store/utils.rs @@ -2,10 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{monitor, quorum_store::counters}; -use aptos_consensus_types::{ - common::{Round, TransactionSummary}, - proof_of_store::{LogicalTime, ProofOfStore}, -}; +use aptos_consensus_types::{common::TransactionSummary, proof_of_store::ProofOfStore}; use aptos_crypto::HashValue; use aptos_logger::prelude::*; use aptos_mempool::{QuorumStoreRequest, QuorumStoreResponse}; @@ -59,26 +56,26 @@ impl Timeouts { } } -pub(crate) struct RoundExpirations { - expiries: BinaryHeap<(Reverse, I)>, +pub(crate) struct TimeExpirations { + expiries: BinaryHeap<(Reverse, I)>, } -impl RoundExpirations { +impl TimeExpirations { pub(crate) fn new() -> Self { Self { expiries: BinaryHeap::new(), } } - pub(crate) fn add_item(&mut self, item: I, expiry_round: Round) { - self.expiries.push((Reverse(expiry_round), item)); + pub(crate) fn add_item(&mut self, item: I, expiry_time: u64) { + self.expiries.push((Reverse(expiry_time), item)); } /// Expire and return items corresponding to round <= given (expired) round. 
- pub(crate) fn expire(&mut self, round: Round) -> HashSet { + pub(crate) fn expire(&mut self, expiry_time: u64) -> HashSet { let mut ret = HashSet::new(); - while let Some((Reverse(r), _)) = self.expiries.peek() { - if *r <= round { + while let Some((Reverse(t), _)) = self.expiries.peek() { + if *t <= expiry_time { let (_, item) = self.expiries.pop().unwrap(); ret.insert(item); } else { @@ -145,8 +142,8 @@ impl MempoolProxy { // TODO: unitest pub struct ProofQueue { - digest_queue: VecDeque<(HashValue, LogicalTime)>, // queue of all proofs - local_digest_queue: VecDeque<(HashValue, LogicalTime)>, // queue of local proofs, to make back pressure update more efficient + digest_queue: VecDeque<(HashValue, u64)>, // queue of all proofs + local_digest_queue: VecDeque<(HashValue, u64)>, // queue of local proofs, to make back pressure update more efficient digest_proof: HashMap>, // None means committed digest_insertion_time: HashMap, } @@ -192,7 +189,7 @@ impl ProofQueue { pub(crate) fn pull_proofs( &mut self, excluded_proofs: &HashSet, - current_time: LogicalTime, + current_block_timestamp: u64, max_txns: u64, max_bytes: u64, return_non_full: bool, @@ -200,7 +197,7 @@ impl ProofQueue { let num_expired = self .digest_queue .iter() - .take_while(|(_, expiration_time)| *expiration_time < current_time) + .take_while(|(_, expiration_time)| *expiration_time < current_block_timestamp) .count(); let mut num_expired_but_not_committed = 0; for (digest, expiration_time) in self.digest_queue.drain(0..num_expired) { @@ -212,9 +209,9 @@ impl ProofQueue { { // non-committed proof that is expired num_expired_but_not_committed += 1; - if expiration_time.round() < current_time.round() { - counters::GAP_BETWEEN_BATCH_EXPIRATION_AND_CURRENT_ROUND_WHEN_PULL_PROOFS - .observe((current_time.round() - expiration_time.round()) as f64); + if expiration_time < current_block_timestamp { + counters::GAP_BETWEEN_BATCH_EXPIRATION_AND_CURRENT_TIME_WHEN_PULL_PROOFS + .observe((current_block_timestamp - expiration_time) as f64); } } claims::assert_some!(self.digest_proof.remove(&digest)); @@ -238,7 +235,7 @@ impl ProofQueue { .get(digest) .expect("Entry for unexpired digest must exist") { - if *expiration >= current_time { + if *expiration >= current_block_timestamp { // non-committed proof that has not expired cur_bytes += proof.num_bytes(); cur_txns += proof.num_txns(); @@ -254,9 +251,9 @@ impl ProofQueue { } else { // non-committed proof that is expired num_expired_but_not_committed += 1; - if expiration.round() < current_time.round() { - counters::GAP_BETWEEN_BATCH_EXPIRATION_AND_CURRENT_ROUND_WHEN_PULL_PROOFS - .observe((current_time.round() - expiration.round()) as f64); + if *expiration < current_block_timestamp { + counters::GAP_BETWEEN_BATCH_EXPIRATION_AND_CURRENT_TIME_WHEN_PULL_PROOFS + .observe((current_block_timestamp - expiration) as f64); } } } @@ -285,13 +282,13 @@ impl ProofQueue { } } - pub(crate) fn num_total_txns_and_proofs(&mut self, current_time: LogicalTime) -> (u64, u64) { + pub(crate) fn num_total_txns_and_proofs(&mut self, current_block_timestamp: u64) -> (u64, u64) { let mut remaining_txns = 0; let mut remaining_proofs = 0; // TODO: if the digest_queue is large, this may be too inefficient for (digest, expiration) in self.digest_queue.iter() { // Not expired - if *expiration >= current_time { + if *expiration >= current_block_timestamp { // Not committed if let Some(Some(proof)) = self.digest_proof.get(digest) { remaining_txns += proof.num_txns(); @@ -306,11 +303,11 @@ impl ProofQueue { } // returns 
the number of unexpired local proofs - pub(crate) fn clean_local_proofs(&mut self, current_time: LogicalTime) -> Option { + pub(crate) fn clean_local_proofs(&mut self, current_block_timestamp: u64) -> Option { let num_expired = self .local_digest_queue .iter() - .take_while(|(_, expiration_time)| *expiration_time < current_time) + .take_while(|(_, expiration_time)| *expiration_time < current_block_timestamp) .count(); self.local_digest_queue.drain(0..num_expired); @@ -319,7 +316,7 @@ impl ProofQueue { for (digest, expiration) in self.local_digest_queue.iter() { // Not expired. It is possible that the proof entry in digest_proof was already removed // when draining the digest_queue but local_digest_queue is not drained yet. - if *expiration >= current_time { + if *expiration >= current_block_timestamp { if let Some(entry) = self.digest_proof.get(digest) { // Not committed if entry.is_some() { @@ -331,7 +328,7 @@ impl ProofQueue { counters::NUM_LOCAL_PROOFS_LEFT_ON_COMMIT.observe(remaining_local_proof_size as f64); if let Some(&(_, time)) = self.local_digest_queue.iter().next() { - Some(time.round()) + Some(time) } else { None } diff --git a/consensus/src/state_computer.rs b/consensus/src/state_computer.rs index dcef8b9dd87f2..ea77bcb813afb 100644 --- a/consensus/src/state_computer.rs +++ b/consensus/src/state_computer.rs @@ -14,9 +14,7 @@ use crate::{ }; use anyhow::Result; use aptos_consensus_notifications::ConsensusNotificationSender; -use aptos_consensus_types::{ - block::Block, executed_block::ExecutedBlock, proof_of_store::LogicalTime, -}; +use aptos_consensus_types::{block::Block, common::Round, executed_block::ExecutedBlock}; use aptos_crypto::HashValue; use aptos_executor_types::{BlockExecutorTrait, Error as ExecutionError, StateComputeResult}; use aptos_infallible::Mutex; @@ -36,6 +34,18 @@ type NotificationType = ( Vec, ); +#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord, Hash)] +struct LogicalTime { + epoch: u64, + round: Round, +} + +impl LogicalTime { + pub fn new(epoch: u64, round: Round) -> Self { + Self { epoch, round } + } +} + /// Basic communication with the Execution module; /// implements StateComputer traits. pub struct ExecutionProxy { @@ -44,7 +54,7 @@ pub struct ExecutionProxy { state_sync_notifier: Arc, async_state_sync_notifier: aptos_channels::Sender, validators: Mutex>, - write_mutex: AsyncMutex, + write_mutex: AsyncMutex, // TODO: mutex needed? 
payload_manager: Mutex>>, transaction_shuffler: Mutex>>, } @@ -159,6 +169,7 @@ impl StateComputer for ExecutionProxy { finality_proof.ledger_info().epoch(), finality_proof.ledger_info().round(), ); + let block_timestamp = finality_proof.commit_info().timestamp_usecs(); let payload_manager = self.payload_manager.lock().as_ref().unwrap().clone(); let txn_shuffler = self.transaction_shuffler.lock().as_ref().unwrap().clone(); @@ -201,7 +212,9 @@ impl StateComputer for ExecutionProxy { .expect("Failed to send async state sync notification"); *latest_logical_time = logical_time; - payload_manager.notify_commit(logical_time, payloads).await; + payload_manager + .notify_commit(block_timestamp, payloads) + .await; Ok(()) } @@ -210,12 +223,13 @@ impl StateComputer for ExecutionProxy { let mut latest_logical_time = self.write_mutex.lock().await; let logical_time = LogicalTime::new(target.ledger_info().epoch(), target.ledger_info().round()); + let block_timestamp = target.commit_info().timestamp_usecs(); // Before the state synchronization, we have to call finish() to free the in-memory SMT // held by BlockExecutor to prevent memory leak. self.executor.finish(); - // The pipeline phase already committed beyond the target synced round, just return. + // The pipeline phase already committed beyond the target block timestamp, just return. if *latest_logical_time >= logical_time { warn!( "State sync target {:?} is lower than already committed logical time {:?}", @@ -230,7 +244,7 @@ impl StateComputer for ExecutionProxy { let maybe_payload_manager = self.payload_manager.lock().as_ref().cloned(); if let Some(payload_manager) = maybe_payload_manager { payload_manager - .notify_commit(logical_time, Vec::new()) + .notify_commit(block_timestamp, Vec::new()) .await; } diff --git a/consensus/src/state_replication.rs b/consensus/src/state_replication.rs index bf242696fb0d2..f1c1ef56b063c 100644 --- a/consensus/src/state_replication.rs +++ b/consensus/src/state_replication.rs @@ -10,7 +10,7 @@ use crate::{ use anyhow::Result; use aptos_consensus_types::{ block::Block, - common::{Payload, PayloadFilter, Round}, + common::{Payload, PayloadFilter}, executed_block::ExecutedBlock, }; use aptos_crypto::HashValue; @@ -26,7 +26,6 @@ pub type StateComputerCommitCallBackType = pub trait PayloadClient: Send + Sync { async fn pull_payload( &self, - round: Round, max_items: u64, max_bytes: u64, exclude: PayloadFilter, diff --git a/consensus/src/test_utils/mock_payload_manager.rs b/consensus/src/test_utils/mock_payload_manager.rs index c823347416dd8..5e5ab2d0dc807 100644 --- a/consensus/src/test_utils/mock_payload_manager.rs +++ b/consensus/src/test_utils/mock_payload_manager.rs @@ -7,7 +7,7 @@ use crate::{ use anyhow::Result; use aptos_consensus_types::{ block::block_test_utils::random_payload, - common::{Payload, PayloadFilter, Round}, + common::{Payload, PayloadFilter}, request_response::GetPayloadCommand, }; use aptos_types::{ @@ -52,7 +52,6 @@ impl PayloadClient for MockPayloadManager { /// The returned future is fulfilled with the vector of SignedTransactions async fn pull_payload( &self, - _round: Round, _max_size: u64, _max_bytes: u64, _exclude: PayloadFilter,
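
Illustrative appendix (not part of the patch): a minimal, self-contained sketch of how the
timestamp-based expiration introduced above fits together. TimeExpirations,
batch_expiry_gap_when_init_usecs, add_item and expire mirror names from the patch; the
wall-clock helper now_usecs, the main driver, and the "batch_1" item are hypothetical
stand-ins (the real code uses aptos_infallible::duration_since_epoch and batch digests).

use std::cmp::Reverse;
use std::collections::{BinaryHeap, HashSet};
use std::hash::Hash;
use std::time::{SystemTime, UNIX_EPOCH};

// Min-heap keyed by expiry time in microseconds; expire() drains everything at or
// below the given certified block timestamp, as in quorum_store::utils above.
struct TimeExpirations<I: Ord> {
    expiries: BinaryHeap<(Reverse<u64>, I)>,
}

impl<I: Ord + Hash> TimeExpirations<I> {
    fn new() -> Self {
        Self { expiries: BinaryHeap::new() }
    }

    fn add_item(&mut self, item: I, expiry_time: u64) {
        self.expiries.push((Reverse(expiry_time), item));
    }

    // Return all items whose expiry time is <= the certified block timestamp.
    fn expire(&mut self, certified_time: u64) -> HashSet<I> {
        let mut expired = HashSet::new();
        while let Some((Reverse(t), _)) = self.expiries.peek() {
            if *t <= certified_time {
                let (_, item) = self.expiries.pop().unwrap();
                expired.insert(item);
            } else {
                break;
            }
        }
        expired
    }
}

// Stand-in for aptos_infallible::duration_since_epoch().
fn now_usecs() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock before unix epoch")
        .as_micros() as u64
}

fn main() {
    // Batch creation: expiry = wall clock + configured gap (60s by default).
    let batch_expiry_gap_when_init_usecs: u64 = 60_000_000;
    let expiry_time = now_usecs() + batch_expiry_gap_when_init_usecs;

    let mut expirations = TimeExpirations::new();
    expirations.add_item("batch_1", expiry_time);

    // Commit notification: cleanup is driven by the committed block's
    // timestamp_usecs, not by rounds.
    let certified_block_timestamp = now_usecs(); // still before the batch expiry
    assert!(expirations.expire(certified_block_timestamp).is_empty());

    // Once a certified timestamp passes the expiry, the item is returned and can
    // be dropped from batches_in_progress / db_cache.
    assert!(expirations.expire(expiry_time + 1).contains("batch_1"));
}

The same comparison drives proof and payload handling in the patch: a proof is usable while
the proposed block's timestamp_usecs is at or below its expiration, and notify_commit /
update_certified_timestamp garbage-collect everything expiring at or below the certified
timestamp, replacing the old LogicalTime round gaps and grace rounds.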