Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[cherry-pick] PR 7194: [Quorum Store] block timestamp-based expiration of batches #7175

Merged
merged 3 commits into from
Mar 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 3 additions & 14 deletions config/src/config/quorum_store_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
// SPDX-License-Identifier: Apache-2.0

use crate::config::MAX_SENDING_BLOCK_TXNS_QUORUM_STORE_OVERRIDE;
use aptos_types::block_info::Round;
use serde::{Deserialize, Serialize};
use std::time::Duration;

#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[serde(default, deny_unknown_fields)]
Expand Down Expand Up @@ -45,15 +45,7 @@ pub struct QuorumStoreConfig {
pub max_batch_bytes: usize,
pub batch_request_timeout_ms: usize,
/// Used when setting up the expiration time for the batch initiation.
pub batch_expiry_round_gap_when_init: Round,
/// Batches may have expiry set for batch_expiry_rounds_gap rounds after the
/// latest committed round, and they will not be cleared from storage for another
/// batch_expiry_grace_rounds rounds, so peers on the network that fall behind
/// can still fetch the data (later, they would have to state-sync).
/// Used when checking the expiration time of the received batch against current logical time to prevent DDoS.
pub batch_expiry_round_gap_behind_latest_certified: Round,
pub batch_expiry_round_gap_beyond_latest_certified: Round,
pub batch_expiry_grace_rounds: Round,
pub batch_expiry_gap_when_init_usecs: u64,
pub memory_quota: usize,
pub db_quota: usize,
pub batch_quota: usize,
Expand All @@ -73,10 +65,7 @@ impl Default for QuorumStoreConfig {
batch_generation_max_interval_ms: 250,
max_batch_bytes: 4 * 1024 * 1024,
batch_request_timeout_ms: 10000,
batch_expiry_round_gap_when_init: 100,
batch_expiry_round_gap_behind_latest_certified: 500,
batch_expiry_round_gap_beyond_latest_certified: 500,
batch_expiry_grace_rounds: 5,
batch_expiry_gap_when_init_usecs: Duration::from_secs(60).as_micros() as u64,
memory_quota: 120_000_000,
db_quota: 300_000_000,
batch_quota: 300_000,
Expand Down
32 changes: 7 additions & 25 deletions consensus/consensus-types/src/proof_of_store.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::common::Round;
use anyhow::{bail, Context};
use aptos_crypto::{bls12381, CryptoMaterialError, HashValue};
use aptos_crypto_derive::{BCSCryptoHash, CryptoHasher};
Expand All @@ -18,26 +17,6 @@ use std::{
ops::Deref,
};

// Epoch/round pair used as a logical clock for batch expiration.
// NOTE(review): this type is the one being removed by this PR, which replaces
// round-based batch expiration with block-timestamp-based expiration (u64 usecs).
#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord, Deserialize, Serialize, Hash)]
pub struct LogicalTime {
// Consensus epoch the round belongs to.
epoch: u64,
// Round within the epoch; Ord derives compare (epoch, round) lexicographically.
round: Round,
}

impl LogicalTime {
/// Creates a LogicalTime from an epoch and a round within that epoch.
pub fn new(epoch: u64, round: Round) -> Self {
Self { epoch, round }
}

/// Returns the epoch component.
pub fn epoch(&self) -> u64 {
self.epoch
}

/// Returns the round component.
pub fn round(&self) -> Round {
self.round
}
}

#[derive(
Copy, Clone, Debug, Deserialize, Serialize, PartialEq, Eq, Hash, CryptoHasher, BCSCryptoHash,
)]
Expand Down Expand Up @@ -91,7 +70,8 @@ impl Display for BatchId {
pub struct BatchInfo {
author: PeerId,
batch_id: BatchId,
expiration: LogicalTime,
epoch: u64,
expiration: u64,
digest: HashValue,
num_txns: u64,
num_bytes: u64,
Expand All @@ -101,14 +81,16 @@ impl BatchInfo {
pub fn new(
author: PeerId,
batch_id: BatchId,
expiration: LogicalTime,
epoch: u64,
expiration: u64,
digest: HashValue,
num_txns: u64,
num_bytes: u64,
) -> Self {
Self {
author,
batch_id,
epoch,
expiration,
digest,
num_txns,
Expand All @@ -117,7 +99,7 @@ impl BatchInfo {
}

pub fn epoch(&self) -> u64 {
self.expiration.epoch
self.epoch
}

pub fn author(&self) -> PeerId {
Expand All @@ -128,7 +110,7 @@ impl BatchInfo {
self.batch_id
}

pub fn expiration(&self) -> LogicalTime {
pub fn expiration(&self) -> u64 {
self.expiration
}

Expand Down
32 changes: 3 additions & 29 deletions consensus/consensus-types/src/request_response.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,14 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::{
common::{Payload, PayloadFilter, Round},
proof_of_store::LogicalTime,
};
use crate::common::{Payload, PayloadFilter};
use anyhow::Result;
use aptos_crypto::HashValue;
use futures::channel::oneshot;
use std::{fmt, fmt::Formatter};

pub enum GetPayloadCommand {
/// Request to pull block to submit to consensus.
GetPayloadRequest(
Round,
// max block size
u64,
// max byte size
Expand All @@ -31,7 +26,6 @@ impl fmt::Display for GetPayloadCommand {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
GetPayloadCommand::GetPayloadRequest(
round,
max_txns,
max_bytes,
return_non_full,
Expand All @@ -40,28 +34,8 @@ impl fmt::Display for GetPayloadCommand {
) => {
write!(
f,
"GetPayloadRequest [round: {}, max_txns: {}, max_bytes: {}, return_non_full: {}, excluded: {}]",
round, max_txns, max_bytes, return_non_full, excluded
)
},
}
}
}

pub enum CleanCommand {
CleanRequest(LogicalTime, Vec<HashValue>),
}

impl fmt::Display for CleanCommand {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
CleanCommand::CleanRequest(logical_time, digests) => {
write!(
f,
"CleanRequest [epoch: {}, round: {}, digests: {:?}]",
logical_time.epoch(),
logical_time.round(),
digests
"GetPayloadRequest [max_txns: {}, max_bytes: {}, return_non_full: {}, excluded: {}]",
max_txns, max_bytes, return_non_full, excluded
)
},
}
Expand Down
1 change: 0 additions & 1 deletion consensus/src/liveness/proposal_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,6 @@ impl ProposalGenerator {
let payload = self
.payload_client
.pull_payload(
round,
max_block_txns,
max_block_bytes,
payload_filter,
Expand Down
6 changes: 1 addition & 5 deletions consensus/src/payload_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
};
use anyhow::Result;
use aptos_consensus_types::{
common::{Payload, PayloadFilter, Round},
common::{Payload, PayloadFilter},
request_response::{GetPayloadCommand, GetPayloadResponse},
};
use aptos_logger::prelude::*;
Expand Down Expand Up @@ -55,15 +55,13 @@ impl QuorumStoreClient {

async fn pull_internal(
&self,
round: Round,
max_items: u64,
max_bytes: u64,
return_non_full: bool,
exclude_payloads: PayloadFilter,
) -> Result<Payload, QuorumStoreError> {
let (callback, callback_rcv) = oneshot::channel();
let req = GetPayloadCommand::GetPayloadRequest(
round,
max_items,
max_bytes,
return_non_full,
Expand Down Expand Up @@ -94,7 +92,6 @@ impl QuorumStoreClient {
impl PayloadClient for QuorumStoreClient {
async fn pull_payload(
&self,
round: Round,
max_items: u64,
max_bytes: u64,
exclude_payloads: PayloadFilter,
Expand Down Expand Up @@ -124,7 +121,6 @@ impl PayloadClient for QuorumStoreClient {
let done = count == 0 || start_time.elapsed().as_millis() >= max_duration;
let payload = self
.pull_internal(
round,
max_items,
max_bytes,
return_non_full || return_empty || done || self.poll_count == u64::MAX,
Expand Down
24 changes: 13 additions & 11 deletions consensus/src/payload_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
use aptos_consensus_types::{
block::Block,
common::{DataStatus, Payload},
proof_of_store::{LogicalTime, ProofOfStore},
proof_of_store::ProofOfStore,
};
use aptos_crypto::HashValue;
use aptos_executor_types::{Error::DataNotFound, *};
Expand All @@ -31,7 +31,7 @@ pub enum PayloadManager {
impl PayloadManager {
async fn request_transactions(
proofs: Vec<ProofOfStore>,
logical_time: LogicalTime,
block_timestamp: u64,
batch_store: &BatchStore<NetworkSender>,
) -> Vec<(
HashValue,
Expand All @@ -40,12 +40,12 @@ impl PayloadManager {
let mut receivers = Vec::new();
for pos in proofs {
trace!(
"QSE: requesting pos {:?}, digest {}, time = {:?}",
"QSE: requesting pos {:?}, digest {}, time = {}",
pos,
pos.digest(),
logical_time
block_timestamp
);
if logical_time <= pos.expiration() {
if block_timestamp <= pos.expiration() {
receivers.push((*pos.digest(), batch_store.get_batch(pos)));
} else {
debug!("QSE: skipped expired pos {}", pos.digest());
Expand All @@ -55,12 +55,14 @@ impl PayloadManager {
}

///Pass commit information to BatchReader and QuorumStore wrapper for their internal cleanups.
pub async fn notify_commit(&self, logical_time: LogicalTime, payloads: Vec<Payload>) {
pub async fn notify_commit(&self, block_timestamp: u64, payloads: Vec<Payload>) {
match self {
PayloadManager::DirectMempool => {},
PayloadManager::InQuorumStore(batch_store, coordinator_tx) => {
// TODO: move this to somewhere in quorum store, so this can be a batch reader
batch_store.update_certified_round(logical_time).await;
batch_store
.update_certified_timestamp(block_timestamp)
.await;

let digests: Vec<HashValue> = payloads
.into_iter()
Expand All @@ -77,7 +79,7 @@ impl PayloadManager {

if let Err(e) = tx
.send(CoordinatorCommand::CommitNotification(
logical_time,
block_timestamp,
digests,
))
.await
Expand All @@ -104,7 +106,7 @@ impl PayloadManager {
if proof_with_status.status.lock().is_none() {
let receivers = PayloadManager::request_transactions(
proof_with_status.proofs.clone(),
LogicalTime::new(block.epoch(), block.round()),
block.timestamp_usecs(),
batch_store,
)
.await;
Expand Down Expand Up @@ -167,7 +169,7 @@ impl PayloadManager {
warn!("Oneshot channel to get a batch was dropped with error {:?}", e);
let new_receivers = PayloadManager::request_transactions(
proof_with_data.proofs.clone(),
LogicalTime::new(block.epoch(), block.round()),
block.timestamp_usecs(),
batch_store,
)
.await;
Expand All @@ -184,7 +186,7 @@ impl PayloadManager {
Ok(Err(e)) => {
let new_receivers = PayloadManager::request_transactions(
proof_with_data.proofs.clone(),
LogicalTime::new(block.epoch(), block.round()),
block.timestamp_usecs(),
batch_store,
)
.await;
Expand Down
33 changes: 9 additions & 24 deletions consensus/src/quorum_store/batch_coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ pub enum BatchCoordinatorCommand {
}

pub struct BatchCoordinator {
epoch: u64,
my_peer_id: PeerId,
network_sender: NetworkSender,
batch_store: Arc<BatchStore<NetworkSender>>,
Expand All @@ -30,14 +29,12 @@ pub struct BatchCoordinator {

impl BatchCoordinator {
pub(crate) fn new(
epoch: u64, //TODO: pass the epoch config
my_peer_id: PeerId,
network_sender: NetworkSender,
batch_store: Arc<BatchStore<NetworkSender>>,
max_batch_bytes: u64,
) -> Self {
Self {
epoch,
my_peer_id,
network_sender,
batch_store,
Expand All @@ -47,35 +44,23 @@ impl BatchCoordinator {

async fn handle_batch(&mut self, batch: Batch) -> Option<PersistRequest> {
let source = batch.author();
let expiration = batch.expiration();
let batch_id = batch.batch_id();
trace!(
"QS: got batch message from {} batch_id {}",
source,
batch_id,
);
if expiration.epoch() == self.epoch {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is unnecessary, EpochManager checks the epoch of BatchMsg

counters::RECEIVED_BATCH_COUNT.inc();
let num_bytes = batch.num_bytes();
if num_bytes > self.max_batch_bytes {
error!(
"Batch from {} exceeds size limit {}, actual size: {}",
source, self.max_batch_bytes, num_bytes
);
return None;
}
let persist_request = batch.into();
return Some(persist_request);
}
// Malformed request with an inconsistent expiry epoch.
else {
trace!(
"QS: got end batch message from different epoch {} != {}",
expiration.epoch(),
self.epoch
counters::RECEIVED_BATCH_COUNT.inc();
let num_bytes = batch.num_bytes();
if num_bytes > self.max_batch_bytes {
error!(
"Batch from {} exceeds size limit {}, actual size: {}",
source, self.max_batch_bytes, num_bytes
);
return None;
}
None
let persist_request = batch.into();
Some(persist_request)
}

fn persist_and_send_digest(&self, persist_request: PersistRequest) {
Expand Down
Loading