Skip to content

Commit

Permalink
[cherry-pick] PR 7194: [Quorum Store] block timestamp-based expiratio…
Browse files Browse the repository at this point in the history
…n of batches (#7175)

### Description

Previously, batches were expired using logical time, i.e., (epoch, round). Validators set batch expiration based on their local logical time. For validators that fell behind in consensus, this causes an issue where batches they create are more likely to not reach quorum or expire after quorum, as other validators have a more advanced round. There were several gap buffers introduced to try to mitigate this, which added complexity.

In this PR, logical time is replaced with system clock times (for creation) and block timestamp times (for expiration). Validators set expiration when creating a batch using their local clock time. The system clock is less likely to fall behind than round times (it doesn't depend on consensus progress). Expiration is based on the block timestamp, which is monotonically increasing on committed blocks, so all validators will still see the same expiration at the same round.

This simplifies things, now there is a single expiration limit (default to 60 seconds) at batch creation time.

### Test Plan

Ran land-blocking and three regions tests and did not see any regression.
  • Loading branch information
bchocho authored Mar 17, 2023
1 parent 92016df commit 5a96cde
Show file tree
Hide file tree
Showing 26 changed files with 233 additions and 429 deletions.
17 changes: 3 additions & 14 deletions config/src/config/quorum_store_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
// SPDX-License-Identifier: Apache-2.0

use crate::config::MAX_SENDING_BLOCK_TXNS_QUORUM_STORE_OVERRIDE;
use aptos_types::block_info::Round;
use serde::{Deserialize, Serialize};
use std::time::Duration;

#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[serde(default, deny_unknown_fields)]
Expand Down Expand Up @@ -45,15 +45,7 @@ pub struct QuorumStoreConfig {
pub max_batch_bytes: usize,
pub batch_request_timeout_ms: usize,
/// Used when setting up the expiration time for the batch initation.
pub batch_expiry_round_gap_when_init: Round,
/// Batches may have expiry set for batch_expiry_rounds_gap rounds after the
/// latest committed round, and it will not be cleared from storage for another
/// so other batch_expiry_grace_rounds rounds, so the peers on the network
/// can still fetch the data they fall behind (later, they would have to state-sync).
/// Used when checking the expiration time of the received batch against current logical time to prevent DDoS.
pub batch_expiry_round_gap_behind_latest_certified: Round,
pub batch_expiry_round_gap_beyond_latest_certified: Round,
pub batch_expiry_grace_rounds: Round,
pub batch_expiry_gap_when_init_usecs: u64,
pub memory_quota: usize,
pub db_quota: usize,
pub batch_quota: usize,
Expand All @@ -73,10 +65,7 @@ impl Default for QuorumStoreConfig {
batch_generation_max_interval_ms: 250,
max_batch_bytes: 4 * 1024 * 1024,
batch_request_timeout_ms: 10000,
batch_expiry_round_gap_when_init: 100,
batch_expiry_round_gap_behind_latest_certified: 500,
batch_expiry_round_gap_beyond_latest_certified: 500,
batch_expiry_grace_rounds: 5,
batch_expiry_gap_when_init_usecs: Duration::from_secs(60).as_micros() as u64,
memory_quota: 120_000_000,
db_quota: 300_000_000,
batch_quota: 300_000,
Expand Down
32 changes: 7 additions & 25 deletions consensus/consensus-types/src/proof_of_store.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::common::Round;
use anyhow::{bail, Context};
use aptos_crypto::{bls12381, CryptoMaterialError, HashValue};
use aptos_crypto_derive::{BCSCryptoHash, CryptoHasher};
Expand All @@ -18,26 +17,6 @@ use std::{
ops::Deref,
};

#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord, Deserialize, Serialize, Hash)]
pub struct LogicalTime {
epoch: u64,
round: Round,
}

impl LogicalTime {
pub fn new(epoch: u64, round: Round) -> Self {
Self { epoch, round }
}

pub fn epoch(&self) -> u64 {
self.epoch
}

pub fn round(&self) -> Round {
self.round
}
}

#[derive(
Copy, Clone, Debug, Deserialize, Serialize, PartialEq, Eq, Hash, CryptoHasher, BCSCryptoHash,
)]
Expand Down Expand Up @@ -91,7 +70,8 @@ impl Display for BatchId {
pub struct BatchInfo {
author: PeerId,
batch_id: BatchId,
expiration: LogicalTime,
epoch: u64,
expiration: u64,
digest: HashValue,
num_txns: u64,
num_bytes: u64,
Expand All @@ -101,14 +81,16 @@ impl BatchInfo {
pub fn new(
author: PeerId,
batch_id: BatchId,
expiration: LogicalTime,
epoch: u64,
expiration: u64,
digest: HashValue,
num_txns: u64,
num_bytes: u64,
) -> Self {
Self {
author,
batch_id,
epoch,
expiration,
digest,
num_txns,
Expand All @@ -117,7 +99,7 @@ impl BatchInfo {
}

pub fn epoch(&self) -> u64 {
self.expiration.epoch
self.epoch
}

pub fn author(&self) -> PeerId {
Expand All @@ -128,7 +110,7 @@ impl BatchInfo {
self.batch_id
}

pub fn expiration(&self) -> LogicalTime {
pub fn expiration(&self) -> u64 {
self.expiration
}

Expand Down
32 changes: 3 additions & 29 deletions consensus/consensus-types/src/request_response.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,14 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::{
common::{Payload, PayloadFilter, Round},
proof_of_store::LogicalTime,
};
use crate::common::{Payload, PayloadFilter};
use anyhow::Result;
use aptos_crypto::HashValue;
use futures::channel::oneshot;
use std::{fmt, fmt::Formatter};

pub enum GetPayloadCommand {
/// Request to pull block to submit to consensus.
GetPayloadRequest(
Round,
// max block size
u64,
// max byte size
Expand All @@ -31,7 +26,6 @@ impl fmt::Display for GetPayloadCommand {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
GetPayloadCommand::GetPayloadRequest(
round,
max_txns,
max_bytes,
return_non_full,
Expand All @@ -40,28 +34,8 @@ impl fmt::Display for GetPayloadCommand {
) => {
write!(
f,
"GetPayloadRequest [round: {}, max_txns: {}, max_bytes: {}, return_non_full: {}, excluded: {}]",
round, max_txns, max_bytes, return_non_full, excluded
)
},
}
}
}

pub enum CleanCommand {
CleanRequest(LogicalTime, Vec<HashValue>),
}

impl fmt::Display for CleanCommand {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
CleanCommand::CleanRequest(logical_time, digests) => {
write!(
f,
"CleanRequest [epoch: {}, round: {}, digests: {:?}]",
logical_time.epoch(),
logical_time.round(),
digests
"GetPayloadRequest [max_txns: {}, max_bytes: {}, return_non_full: {}, excluded: {}]",
max_txns, max_bytes, return_non_full, excluded
)
},
}
Expand Down
1 change: 0 additions & 1 deletion consensus/src/liveness/proposal_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,6 @@ impl ProposalGenerator {
let payload = self
.payload_client
.pull_payload(
round,
max_block_txns,
max_block_bytes,
payload_filter,
Expand Down
6 changes: 1 addition & 5 deletions consensus/src/payload_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
};
use anyhow::Result;
use aptos_consensus_types::{
common::{Payload, PayloadFilter, Round},
common::{Payload, PayloadFilter},
request_response::{GetPayloadCommand, GetPayloadResponse},
};
use aptos_logger::prelude::*;
Expand Down Expand Up @@ -55,15 +55,13 @@ impl QuorumStoreClient {

async fn pull_internal(
&self,
round: Round,
max_items: u64,
max_bytes: u64,
return_non_full: bool,
exclude_payloads: PayloadFilter,
) -> Result<Payload, QuorumStoreError> {
let (callback, callback_rcv) = oneshot::channel();
let req = GetPayloadCommand::GetPayloadRequest(
round,
max_items,
max_bytes,
return_non_full,
Expand Down Expand Up @@ -94,7 +92,6 @@ impl QuorumStoreClient {
impl PayloadClient for QuorumStoreClient {
async fn pull_payload(
&self,
round: Round,
max_items: u64,
max_bytes: u64,
exclude_payloads: PayloadFilter,
Expand Down Expand Up @@ -124,7 +121,6 @@ impl PayloadClient for QuorumStoreClient {
let done = count == 0 || start_time.elapsed().as_millis() >= max_duration;
let payload = self
.pull_internal(
round,
max_items,
max_bytes,
return_non_full || return_empty || done || self.poll_count == u64::MAX,
Expand Down
24 changes: 13 additions & 11 deletions consensus/src/payload_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
use aptos_consensus_types::{
block::Block,
common::{DataStatus, Payload},
proof_of_store::{LogicalTime, ProofOfStore},
proof_of_store::ProofOfStore,
};
use aptos_crypto::HashValue;
use aptos_executor_types::{Error::DataNotFound, *};
Expand All @@ -31,7 +31,7 @@ pub enum PayloadManager {
impl PayloadManager {
async fn request_transactions(
proofs: Vec<ProofOfStore>,
logical_time: LogicalTime,
block_timestamp: u64,
batch_store: &BatchStore<NetworkSender>,
) -> Vec<(
HashValue,
Expand All @@ -40,12 +40,12 @@ impl PayloadManager {
let mut receivers = Vec::new();
for pos in proofs {
trace!(
"QSE: requesting pos {:?}, digest {}, time = {:?}",
"QSE: requesting pos {:?}, digest {}, time = {}",
pos,
pos.digest(),
logical_time
block_timestamp
);
if logical_time <= pos.expiration() {
if block_timestamp <= pos.expiration() {
receivers.push((*pos.digest(), batch_store.get_batch(pos)));
} else {
debug!("QSE: skipped expired pos {}", pos.digest());
Expand All @@ -55,12 +55,14 @@ impl PayloadManager {
}

///Pass commit information to BatchReader and QuorumStore wrapper for their internal cleanups.
pub async fn notify_commit(&self, logical_time: LogicalTime, payloads: Vec<Payload>) {
pub async fn notify_commit(&self, block_timestamp: u64, payloads: Vec<Payload>) {
match self {
PayloadManager::DirectMempool => {},
PayloadManager::InQuorumStore(batch_store, coordinator_tx) => {
// TODO: move this to somewhere in quorum store, so this can be a batch reader
batch_store.update_certified_round(logical_time).await;
batch_store
.update_certified_timestamp(block_timestamp)
.await;

let digests: Vec<HashValue> = payloads
.into_iter()
Expand All @@ -77,7 +79,7 @@ impl PayloadManager {

if let Err(e) = tx
.send(CoordinatorCommand::CommitNotification(
logical_time,
block_timestamp,
digests,
))
.await
Expand All @@ -104,7 +106,7 @@ impl PayloadManager {
if proof_with_status.status.lock().is_none() {
let receivers = PayloadManager::request_transactions(
proof_with_status.proofs.clone(),
LogicalTime::new(block.epoch(), block.round()),
block.timestamp_usecs(),
batch_store,
)
.await;
Expand Down Expand Up @@ -167,7 +169,7 @@ impl PayloadManager {
warn!("Oneshot channel to get a batch was dropped with error {:?}", e);
let new_receivers = PayloadManager::request_transactions(
proof_with_data.proofs.clone(),
LogicalTime::new(block.epoch(), block.round()),
block.timestamp_usecs(),
batch_store,
)
.await;
Expand All @@ -184,7 +186,7 @@ impl PayloadManager {
Ok(Err(e)) => {
let new_receivers = PayloadManager::request_transactions(
proof_with_data.proofs.clone(),
LogicalTime::new(block.epoch(), block.round()),
block.timestamp_usecs(),
batch_store,
)
.await;
Expand Down
33 changes: 9 additions & 24 deletions consensus/src/quorum_store/batch_coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ pub enum BatchCoordinatorCommand {
}

pub struct BatchCoordinator {
epoch: u64,
my_peer_id: PeerId,
network_sender: NetworkSender,
batch_store: Arc<BatchStore<NetworkSender>>,
Expand All @@ -30,14 +29,12 @@ pub struct BatchCoordinator {

impl BatchCoordinator {
pub(crate) fn new(
epoch: u64, //TODO: pass the epoch config
my_peer_id: PeerId,
network_sender: NetworkSender,
batch_store: Arc<BatchStore<NetworkSender>>,
max_batch_bytes: u64,
) -> Self {
Self {
epoch,
my_peer_id,
network_sender,
batch_store,
Expand All @@ -47,35 +44,23 @@ impl BatchCoordinator {

async fn handle_batch(&mut self, batch: Batch) -> Option<PersistRequest> {
let source = batch.author();
let expiration = batch.expiration();
let batch_id = batch.batch_id();
trace!(
"QS: got batch message from {} batch_id {}",
source,
batch_id,
);
if expiration.epoch() == self.epoch {
counters::RECEIVED_BATCH_COUNT.inc();
let num_bytes = batch.num_bytes();
if num_bytes > self.max_batch_bytes {
error!(
"Batch from {} exceeds size limit {}, actual size: {}",
source, self.max_batch_bytes, num_bytes
);
return None;
}
let persist_request = batch.into();
return Some(persist_request);
}
// Malformed request with an inconsistent expiry epoch.
else {
trace!(
"QS: got end batch message from different epoch {} != {}",
expiration.epoch(),
self.epoch
counters::RECEIVED_BATCH_COUNT.inc();
let num_bytes = batch.num_bytes();
if num_bytes > self.max_batch_bytes {
error!(
"Batch from {} exceeds size limit {}, actual size: {}",
source, self.max_batch_bytes, num_bytes
);
return None;
}
None
let persist_request = batch.into();
Some(persist_request)
}

fn persist_and_send_digest(&self, persist_request: PersistRequest) {
Expand Down
Loading

0 comments on commit 5a96cde

Please sign in to comment.