Skip to content

Commit

Permalink
squash of #7175
Browse files Browse the repository at this point in the history
  • Loading branch information
bchocho committed Mar 16, 2023
1 parent ecb55e6 commit c3d3b8a
Show file tree
Hide file tree
Showing 25 changed files with 212 additions and 421 deletions.
17 changes: 3 additions & 14 deletions config/src/config/quorum_store_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
// SPDX-License-Identifier: Apache-2.0

use crate::config::MAX_SENDING_BLOCK_TXNS_QUORUM_STORE_OVERRIDE;
use aptos_types::block_info::Round;
use serde::{Deserialize, Serialize};
use std::time::Duration;

#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
#[serde(default, deny_unknown_fields)]
Expand Down Expand Up @@ -45,15 +45,7 @@ pub struct QuorumStoreConfig {
pub max_batch_bytes: usize,
pub batch_request_timeout_ms: usize,
/// Used when setting up the expiration time for the batch initation.
pub batch_expiry_round_gap_when_init: Round,
/// Batches may have expiry set for batch_expiry_rounds_gap rounds after the
/// latest committed round, and it will not be cleared from storage for another
/// so other batch_expiry_grace_rounds rounds, so the peers on the network
/// can still fetch the data they fall behind (later, they would have to state-sync).
/// Used when checking the expiration time of the received batch against current logical time to prevent DDoS.
pub batch_expiry_round_gap_behind_latest_certified: Round,
pub batch_expiry_round_gap_beyond_latest_certified: Round,
pub batch_expiry_grace_rounds: Round,
pub batch_expiry_gap_when_init_usecs: u64,
pub memory_quota: usize,
pub db_quota: usize,
pub batch_quota: usize,
Expand All @@ -73,10 +65,7 @@ impl Default for QuorumStoreConfig {
batch_generation_max_interval_ms: 250,
max_batch_bytes: 4 * 1024 * 1024,
batch_request_timeout_ms: 10000,
batch_expiry_round_gap_when_init: 100,
batch_expiry_round_gap_behind_latest_certified: 500,
batch_expiry_round_gap_beyond_latest_certified: 500,
batch_expiry_grace_rounds: 5,
batch_expiry_gap_when_init_usecs: Duration::from_secs(60).as_micros() as u64,
memory_quota: 120_000_000,
db_quota: 300_000_000,
batch_quota: 300_000,
Expand Down
32 changes: 7 additions & 25 deletions consensus/consensus-types/src/proof_of_store.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::common::Round;
use anyhow::{bail, Context};
use aptos_crypto::{bls12381, CryptoMaterialError, HashValue};
use aptos_crypto_derive::{BCSCryptoHash, CryptoHasher};
Expand All @@ -18,26 +17,6 @@ use std::{
ops::Deref,
};

#[derive(Clone, Copy, Debug, Eq, PartialEq, PartialOrd, Ord, Deserialize, Serialize, Hash)]
pub struct LogicalTime {
epoch: u64,
round: Round,
}

impl LogicalTime {
pub fn new(epoch: u64, round: Round) -> Self {
Self { epoch, round }
}

pub fn epoch(&self) -> u64 {
self.epoch
}

pub fn round(&self) -> Round {
self.round
}
}

#[derive(
Copy, Clone, Debug, Deserialize, Serialize, PartialEq, Eq, Hash, CryptoHasher, BCSCryptoHash,
)]
Expand Down Expand Up @@ -91,7 +70,8 @@ impl Display for BatchId {
pub struct BatchInfo {
author: PeerId,
batch_id: BatchId,
expiration: LogicalTime,
epoch: u64,
expiration: u64,
digest: HashValue,
num_txns: u64,
num_bytes: u64,
Expand All @@ -101,14 +81,16 @@ impl BatchInfo {
pub fn new(
author: PeerId,
batch_id: BatchId,
expiration: LogicalTime,
epoch: u64,
expiration: u64,
digest: HashValue,
num_txns: u64,
num_bytes: u64,
) -> Self {
Self {
author,
batch_id,
epoch,
expiration,
digest,
num_txns,
Expand All @@ -117,7 +99,7 @@ impl BatchInfo {
}

pub fn epoch(&self) -> u64 {
self.expiration.epoch
self.epoch
}

pub fn author(&self) -> PeerId {
Expand All @@ -128,7 +110,7 @@ impl BatchInfo {
self.batch_id
}

pub fn expiration(&self) -> LogicalTime {
pub fn expiration(&self) -> u64 {
self.expiration
}

Expand Down
32 changes: 3 additions & 29 deletions consensus/consensus-types/src/request_response.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,14 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::{
common::{Payload, PayloadFilter, Round},
proof_of_store::LogicalTime,
};
use crate::common::{Payload, PayloadFilter};
use anyhow::Result;
use aptos_crypto::HashValue;
use futures::channel::oneshot;
use std::{fmt, fmt::Formatter};

pub enum GetPayloadCommand {
/// Request to pull block to submit to consensus.
GetPayloadRequest(
Round,
// max block size
u64,
// max byte size
Expand All @@ -31,7 +26,6 @@ impl fmt::Display for GetPayloadCommand {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
GetPayloadCommand::GetPayloadRequest(
round,
max_txns,
max_bytes,
return_non_full,
Expand All @@ -40,28 +34,8 @@ impl fmt::Display for GetPayloadCommand {
) => {
write!(
f,
"GetPayloadRequest [round: {}, max_txns: {}, max_bytes: {}, return_non_full: {}, excluded: {}]",
round, max_txns, max_bytes, return_non_full, excluded
)
},
}
}
}

pub enum CleanCommand {
CleanRequest(LogicalTime, Vec<HashValue>),
}

impl fmt::Display for CleanCommand {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
match self {
CleanCommand::CleanRequest(logical_time, digests) => {
write!(
f,
"CleanRequest [epoch: {}, round: {}, digests: {:?}]",
logical_time.epoch(),
logical_time.round(),
digests
"GetPayloadRequest [max_txns: {}, max_bytes: {}, return_non_full: {}, excluded: {}]",
max_txns, max_bytes, return_non_full, excluded
)
},
}
Expand Down
1 change: 0 additions & 1 deletion consensus/src/liveness/proposal_generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,6 @@ impl ProposalGenerator {
let payload = self
.payload_client
.pull_payload(
round,
max_block_txns,
max_block_bytes,
payload_filter,
Expand Down
6 changes: 1 addition & 5 deletions consensus/src/payload_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
};
use anyhow::Result;
use aptos_consensus_types::{
common::{Payload, PayloadFilter, Round},
common::{Payload, PayloadFilter},
request_response::{GetPayloadCommand, GetPayloadResponse},
};
use aptos_logger::prelude::*;
Expand Down Expand Up @@ -55,15 +55,13 @@ impl QuorumStoreClient {

async fn pull_internal(
&self,
round: Round,
max_items: u64,
max_bytes: u64,
return_non_full: bool,
exclude_payloads: PayloadFilter,
) -> Result<Payload, QuorumStoreError> {
let (callback, callback_rcv) = oneshot::channel();
let req = GetPayloadCommand::GetPayloadRequest(
round,
max_items,
max_bytes,
return_non_full,
Expand Down Expand Up @@ -94,7 +92,6 @@ impl QuorumStoreClient {
impl PayloadClient for QuorumStoreClient {
async fn pull_payload(
&self,
round: Round,
max_items: u64,
max_bytes: u64,
exclude_payloads: PayloadFilter,
Expand Down Expand Up @@ -124,7 +121,6 @@ impl PayloadClient for QuorumStoreClient {
let done = count == 0 || start_time.elapsed().as_millis() >= max_duration;
let payload = self
.pull_internal(
round,
max_items,
max_bytes,
return_non_full || return_empty || done || self.poll_count == u64::MAX,
Expand Down
24 changes: 13 additions & 11 deletions consensus/src/payload_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
use aptos_consensus_types::{
block::Block,
common::{DataStatus, Payload},
proof_of_store::{LogicalTime, ProofOfStore},
proof_of_store::ProofOfStore,
};
use aptos_crypto::HashValue;
use aptos_executor_types::{Error::DataNotFound, *};
Expand All @@ -31,7 +31,7 @@ pub enum PayloadManager {
impl PayloadManager {
async fn request_transactions(
proofs: Vec<ProofOfStore>,
logical_time: LogicalTime,
block_timestamp: u64,
batch_store: &BatchStore<NetworkSender>,
) -> Vec<(
HashValue,
Expand All @@ -40,12 +40,12 @@ impl PayloadManager {
let mut receivers = Vec::new();
for pos in proofs {
trace!(
"QSE: requesting pos {:?}, digest {}, time = {:?}",
"QSE: requesting pos {:?}, digest {}, time = {}",
pos,
pos.digest(),
logical_time
block_timestamp
);
if logical_time <= pos.expiration() {
if block_timestamp <= pos.expiration() {
receivers.push((*pos.digest(), batch_store.get_batch(pos)));
} else {
debug!("QSE: skipped expired pos {}", pos.digest());
Expand All @@ -55,12 +55,14 @@ impl PayloadManager {
}

///Pass commit information to BatchReader and QuorumStore wrapper for their internal cleanups.
pub async fn notify_commit(&self, logical_time: LogicalTime, payloads: Vec<Payload>) {
pub async fn notify_commit(&self, block_timestamp: u64, payloads: Vec<Payload>) {
match self {
PayloadManager::DirectMempool => {},
PayloadManager::InQuorumStore(batch_store, coordinator_tx) => {
// TODO: move this to somewhere in quorum store, so this can be a batch reader
batch_store.update_certified_round(logical_time).await;
batch_store
.update_certified_timestamp(block_timestamp)
.await;

let digests: Vec<HashValue> = payloads
.into_iter()
Expand All @@ -77,7 +79,7 @@ impl PayloadManager {

if let Err(e) = tx
.send(CoordinatorCommand::CommitNotification(
logical_time,
block_timestamp,
digests,
))
.await
Expand All @@ -104,7 +106,7 @@ impl PayloadManager {
if proof_with_status.status.lock().is_none() {
let receivers = PayloadManager::request_transactions(
proof_with_status.proofs.clone(),
LogicalTime::new(block.epoch(), block.round()),
block.timestamp_usecs(),
batch_store,
)
.await;
Expand Down Expand Up @@ -160,7 +162,7 @@ impl PayloadManager {
warn!("Oneshot channel to get a batch was dropped with error {:?}", e);
let new_receivers = PayloadManager::request_transactions(
proof_with_data.proofs.clone(),
LogicalTime::new(block.epoch(), block.round()),
block.timestamp_usecs(),
batch_store,
)
.await;
Expand All @@ -177,7 +179,7 @@ impl PayloadManager {
Ok(Err(e)) => {
let new_receivers = PayloadManager::request_transactions(
proof_with_data.proofs.clone(),
LogicalTime::new(block.epoch(), block.round()),
block.timestamp_usecs(),
batch_store,
)
.await;
Expand Down
33 changes: 9 additions & 24 deletions consensus/src/quorum_store/batch_coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ pub enum BatchCoordinatorCommand {
}

pub struct BatchCoordinator {
epoch: u64,
my_peer_id: PeerId,
network_sender: NetworkSender,
batch_store: Arc<BatchStore<NetworkSender>>,
Expand All @@ -30,14 +29,12 @@ pub struct BatchCoordinator {

impl BatchCoordinator {
pub(crate) fn new(
epoch: u64, //TODO: pass the epoch config
my_peer_id: PeerId,
network_sender: NetworkSender,
batch_store: Arc<BatchStore<NetworkSender>>,
max_batch_bytes: u64,
) -> Self {
Self {
epoch,
my_peer_id,
network_sender,
batch_store,
Expand All @@ -47,35 +44,23 @@ impl BatchCoordinator {

async fn handle_batch(&mut self, batch: Batch) -> Option<PersistRequest> {
let source = batch.author();
let expiration = batch.expiration();
let batch_id = batch.batch_id();
trace!(
"QS: got batch message from {} batch_id {}",
source,
batch_id,
);
if expiration.epoch() == self.epoch {
counters::RECEIVED_BATCH_COUNT.inc();
let num_bytes = batch.num_bytes();
if num_bytes > self.max_batch_bytes {
error!(
"Batch from {} exceeds size limit {}, actual size: {}",
source, self.max_batch_bytes, num_bytes
);
return None;
}
let persist_request = batch.into();
return Some(persist_request);
}
// Malformed request with an inconsistent expiry epoch.
else {
trace!(
"QS: got end batch message from different epoch {} != {}",
expiration.epoch(),
self.epoch
counters::RECEIVED_BATCH_COUNT.inc();
let num_bytes = batch.num_bytes();
if num_bytes > self.max_batch_bytes {
error!(
"Batch from {} exceeds size limit {}, actual size: {}",
source, self.max_batch_bytes, num_bytes
);
return None;
}
None
let persist_request = batch.into();
Some(persist_request)
}

fn persist_and_send_digest(&self, persist_request: PersistRequest) {
Expand Down
Loading

0 comments on commit c3d3b8a

Please sign in to comment.