Skip to content

Commit

Permalink
Merge branch 'main' into rex/circuit_comments
Browse files Browse the repository at this point in the history
  • Loading branch information
rex1fernando authored Sep 18, 2024
2 parents 385d146 + 30f76bf commit f653f8d
Show file tree
Hide file tree
Showing 18 changed files with 94 additions and 466 deletions.
39 changes: 1 addition & 38 deletions config/src/config/consensus_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,48 +93,11 @@ pub struct ConsensusConfig {
pub max_pending_rounds_in_commit_vote_cache: u64,
}

/// Deprecated
#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
pub enum QcAggregatorType {
#[default]
NoDelay,
Delayed(DelayedQcAggregatorConfig),
}

impl QcAggregatorType {
pub fn default_delayed() -> Self {
// TODO: Enable the delayed aggregation by default once we have tested it more.
Self::Delayed(DelayedQcAggregatorConfig::default())
}
}

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
pub struct DelayedQcAggregatorConfig {
// Maximum Delay for a QC to be aggregated after round start (in milliseconds). This assumes that
// we have enough voting power to form a QC. If we don't have enough voting power, we will wait
// until we have enough voting power to form a QC.
pub max_delay_after_round_start_ms: u64,
// Percentage of aggregated voting power to wait for before aggregating a QC. For example, if this
// is set to 95% then, a QC is formed as soon as we have 95% of the voting power aggregated without
// any additional waiting.
pub aggregated_voting_power_pct_to_wait: usize,
// This knob control what is the % of the time (as compared to time between round start and time when we
// have enough voting power to form a QC) we wait after we have enough voting power to form a QC. In a sense,
// this knobs controls how much slower we are willing to make consensus to wait for more votes.
pub pct_delay_after_qc_aggregated: usize,
// In summary, let's denote the time we have enough voting power (2f + 1) to form a QC as T1 and
// the time we have aggregated `aggregated_voting_power_pct_to_wait` as T2. Then, we wait for
// min((T1 + `pct_delay_after_qc_aggregated` * T1 / 100), `max_delay_after_round_start_ms`, T2)
// before forming a QC.
}

impl Default for DelayedQcAggregatorConfig {
fn default() -> Self {
Self {
max_delay_after_round_start_ms: 700,
aggregated_voting_power_pct_to_wait: 90,
pct_delay_after_qc_aggregated: 30,
}
}
}

/// Execution backpressure which handles gas/s variance,
Expand Down
4 changes: 2 additions & 2 deletions config/src/config/consensus_observer_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use serde::{Deserialize, Serialize};
use serde_yaml::Value;

// Useful constants for enabling consensus observer on different node types
const ENABLE_ON_VALIDATORS: bool = false;
const ENABLE_ON_VALIDATOR_FULLNODES: bool = false;
const ENABLE_ON_VALIDATORS: bool = true;
const ENABLE_ON_VALIDATOR_FULLNODES: bool = true;
const ENABLE_ON_PUBLIC_FULLNODES: bool = false;

#[derive(Clone, Copy, Debug, Deserialize, PartialEq, Serialize)]
Expand Down
32 changes: 0 additions & 32 deletions consensus/consensus-types/src/delayed_qc_msg.rs

This file was deleted.

1 change: 0 additions & 1 deletion consensus/consensus-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ pub mod block;
pub mod block_data;
pub mod block_retrieval;
pub mod common;
pub mod delayed_qc_msg;
pub mod epoch_retrieval;
pub mod order_vote;
pub mod order_vote_msg;
Expand Down
10 changes: 2 additions & 8 deletions consensus/src/block_storage/block_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ use crate::{
test_utils::{
build_empty_tree, build_simple_tree, consensus_runtime, timed_block_on, TreeInserter,
},
util::mock_time_service::SimulatedTimeService,
};
use aptos_config::config::QcAggregatorType;
use aptos_consensus_types::{
block::{
block_test_utils::{
Expand All @@ -27,9 +25,8 @@ use aptos_crypto::{HashValue, PrivateKey};
use aptos_types::{
validator_signer::ValidatorSigner, validator_verifier::random_validator_verifier,
};
use futures_channel::mpsc::unbounded;
use proptest::prelude::*;
use std::{cmp::min, collections::HashSet, sync::Arc};
use std::{cmp::min, collections::HashSet};

#[tokio::test]
async fn test_highest_block_and_quorum_cert() {
Expand Down Expand Up @@ -284,11 +281,8 @@ async fn test_insert_vote() {
let block = inserter
.insert_block_with_qc(certificate_for_genesis(), &genesis, 1)
.await;
let time_service = Arc::new(SimulatedTimeService::new());
let (delayed_qc_tx, _) = unbounded();

let mut pending_votes =
PendingVotes::new(time_service, delayed_qc_tx, QcAggregatorType::NoDelay);
let mut pending_votes = PendingVotes::new();

assert!(block_store.get_quorum_cert_for_block(block.id()).is_none());
for (i, voter) in signers.iter().enumerate().take(10).skip(1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ impl ConsensusObserverNetworkHandler {
None => {
error!(
LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
"Missing response sender for RCP request: {:?}",
"Missing response sender for the RPC request: {:?}",
request
))
);
Expand Down
45 changes: 36 additions & 9 deletions consensus/src/consensus_observer/network/observer_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -663,23 +663,50 @@ impl BlockPayload {

/// Verifies the block payload digests and returns an error if the data is invalid
pub fn verify_payload_digests(&self) -> Result<(), Error> {
// Verify the proof of store digests against the transaction
// Get the transactions, payload proofs and inline batches
let transactions = self.transaction_payload.transactions();
let payload_proofs = self.transaction_payload.payload_proofs();
let inline_batches = self.transaction_payload.inline_batches();

// Get the number of transactions, payload proofs and inline batches
let num_transactions = transactions.len();
let num_payload_proofs = payload_proofs.len();
let num_inline_batches = inline_batches.len();

// Verify the payload proof digests using the transactions
let mut transactions_iter = transactions.iter();
for proof_of_store in &self.transaction_payload.payload_proofs() {
reconstruct_and_verify_batch(&mut transactions_iter, proof_of_store.info())?;
for proof_of_store in &payload_proofs {
reconstruct_and_verify_batch(&mut transactions_iter, proof_of_store.info()).map_err(
|error| {
Error::InvalidMessageError(format!(
"Failed to verify payload proof digests! Num transactions: {:?}, \
num batches: {:?}, num inline batches: {:?}, failed batch: {:?}, Error: {:?}",
num_transactions, num_payload_proofs, num_inline_batches, proof_of_store.info(), error
))
},
)?;
}

// Verify the inline batch digests against the inline batches
for batch_info in self.transaction_payload.inline_batches() {
reconstruct_and_verify_batch(&mut transactions_iter, batch_info)?;
// Verify the inline batch digests using the transactions
for batch_info in inline_batches.into_iter() {
reconstruct_and_verify_batch(&mut transactions_iter, batch_info).map_err(
|error| {
Error::InvalidMessageError(format!(
"Failed to verify inline batch digests! Num transactions: {:?}, \
num batches: {:?}, num inline batches: {:?}, failed batch: {:?}, Error: {:?}",
num_transactions, num_payload_proofs, num_inline_batches, batch_info, error
))
},
)?;
}

// Verify that there are no transactions remaining
// Verify that there are no transactions remaining (all transactions should be consumed)
let remaining_transactions = transactions_iter.as_slice();
if !remaining_transactions.is_empty() {
return Err(Error::InvalidMessageError(format!(
"Failed to verify payload transactions! Transactions remaining: {:?}. Expected: 0",
"Failed to verify payload transactions! Num transactions: {:?}, \
transactions remaining: {:?}. Expected: 0",
num_transactions,
remaining_transactions.len()
)));
}
Expand Down Expand Up @@ -740,7 +767,7 @@ fn reconstruct_and_verify_batch(
let expected_digest = expected_batch_info.digest();
if batch_digest != *expected_digest {
return Err(Error::InvalidMessageError(format!(
"The reconstructed batch digest does not match the expected digest!\
"The reconstructed batch digest does not match the expected digest! \
Batch: {:?}, Expected digest: {:?}, Reconstructed digest: {:?}",
expected_batch_info, expected_digest, batch_digest
)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::consensus_observer::{
};
use aptos_config::{config::ConsensusObserverConfig, network_id::PeerNetworkId};
use aptos_infallible::Mutex;
use aptos_logger::{error, info, warn};
use aptos_logger::{info, warn};
use aptos_network::application::{interface::NetworkClient, metadata::PeerMetadata};
use aptos_storage_interface::DbReader;
use aptos_time_service::TimeService;
Expand Down Expand Up @@ -157,7 +157,7 @@ impl SubscriptionManager {
.get_connected_peers_and_metadata()
.unwrap_or_else(|error| {
// Log the error
error!(
warn!(
LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
"Failed to get connected peers and metadata! Error: {:?}",
error
Expand Down Expand Up @@ -327,7 +327,7 @@ impl SubscriptionManager {
},
Err(error) => {
// We encountered an error while sending the request
error!(
warn!(
LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
"Failed to send unsubscribe request to peer: {}! Error: {:?}",
peer_network_id, error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ async fn create_single_subscription(
},
Err(error) => {
// We encountered an error while sending the request
error!(
warn!(
LogSchema::new(LogEntry::ConsensusObserver).message(&format!(
"Failed to send subscription request to peer: {}! Error: {:?}",
potential_peer, error
Expand Down
37 changes: 6 additions & 31 deletions consensus/src/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,9 @@ use crate::{
use anyhow::{anyhow, bail, ensure, Context};
use aptos_bounded_executor::BoundedExecutor;
use aptos_channels::{aptos_channel, message_queues::QueueStyle};
use aptos_config::config::{
ConsensusConfig, DagConsensusConfig, ExecutionConfig, NodeConfig, QcAggregatorType,
};
use aptos_config::config::{ConsensusConfig, DagConsensusConfig, ExecutionConfig, NodeConfig};
use aptos_consensus_types::{
common::{Author, Round},
delayed_qc_msg::DelayedQcMsg,
epoch_retrieval::EpochRetrievalRequest,
proof_of_store::ProofCache,
utils::PayloadTxnsSize,
Expand Down Expand Up @@ -96,11 +93,7 @@ use aptos_types::{
use aptos_validator_transaction_pool::VTxnPoolState;
use fail::fail_point;
use futures::{
channel::{
mpsc,
mpsc::{unbounded, Sender, UnboundedSender},
oneshot,
},
channel::{mpsc, mpsc::Sender, oneshot},
SinkExt, StreamExt,
};
use itertools::Itertools;
Expand Down Expand Up @@ -265,21 +258,13 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
&self,
time_service: Arc<dyn TimeService>,
timeout_sender: aptos_channels::Sender<Round>,
delayed_qc_tx: UnboundedSender<DelayedQcMsg>,
qc_aggregator_type: QcAggregatorType,
) -> RoundState {
let time_interval = Box::new(ExponentialTimeInterval::new(
Duration::from_millis(self.config.round_initial_timeout_ms),
self.config.round_timeout_backoff_exponent_base,
self.config.round_timeout_backoff_max_exponent,
));
RoundState::new(
time_interval,
time_service,
timeout_sender,
delayed_qc_tx,
qc_aggregator_type,
)
RoundState::new(time_interval, time_service, timeout_sender)
}

/// Create a proposer election handler based on proposers
Expand Down Expand Up @@ -793,15 +778,10 @@ impl<P: OnChainConfigProvider> EpochManager<P> {
"Unable to initialize safety rules.",
);
}
let (delayed_qc_tx, delayed_qc_rx) = unbounded();

info!(epoch = epoch, "Create RoundState");
let round_state = self.create_round_state(
self.time_service.clone(),
self.timeout_sender.clone(),
delayed_qc_tx,
self.config.qc_aggregator_type.clone(),
);
let round_state =
self.create_round_state(self.time_service.clone(), self.timeout_sender.clone());

info!(epoch = epoch, "Create ProposerElection");
let proposer_election =
Expand Down Expand Up @@ -913,12 +893,7 @@ impl<P: OnChainConfigProvider> EpochManager<P> {

let (close_tx, close_rx) = oneshot::channel();
self.round_manager_close_tx = Some(close_tx);
tokio::spawn(round_manager.start(
round_manager_rx,
buffered_proposal_rx,
delayed_qc_rx,
close_rx,
));
tokio::spawn(round_manager.start(round_manager_rx, buffered_proposal_rx, close_rx));

self.spawn_block_retrieval_task(epoch, block_store, max_blocks_allowed);
}
Expand Down
1 change: 0 additions & 1 deletion consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ mod execution_pipeline;
/// AptosNet interface.
pub mod network_interface;
mod payload_manager;
mod qc_aggregator;
mod transaction_deduper;
mod transaction_filter;
mod transaction_shuffler;
Expand Down
Loading

0 comments on commit f653f8d

Please sign in to comment.