From 2130b239bb748e6de1d5d5fd91133de61bc6c721 Mon Sep 17 00:00:00 2001
From: Brian Cho
Date: Wed, 16 Nov 2022 16:28:52 -0800
Subject: [PATCH] [quorum store] create onchain config for quorum store,
 connect it to mempool broadcasts

---
 aptos-node/src/lib.rs                         | 10 ----
 config/src/config/consensus_config.rs         |  3 --
 config/src/config/mempool_config.rs           |  2 -
 mempool/src/shared_mempool/coordinator.rs     |  5 +-
 mempool/src/shared_mempool/tasks.rs           | 16 +++++-
 mempool/src/shared_mempool/types.rs           |  4 +-
 types/src/on_chain_config/consensus_config.rs | 53 ++++++++++++++++++-
 7 files changed, 72 insertions(+), 21 deletions(-)

diff --git a/aptos-node/src/lib.rs b/aptos-node/src/lib.rs
index 2f7acacc8c2f1..e6df4e200274d 100644
--- a/aptos-node/src/lib.rs
+++ b/aptos-node/src/lib.rs
@@ -837,16 +837,6 @@ pub fn setup_environment(
     );
     debug!("Mempool started in {} ms", instant.elapsed().as_millis());

-    assert!(
-        !node_config.consensus.use_quorum_store,
-        "QuorumStore is not yet implemented"
-    );
-    assert_ne!(
-        node_config.consensus.use_quorum_store,
-        node_config.mempool.shared_mempool_validator_broadcast,
-        "Shared mempool validator broadcast must be turned off when QuorumStore is on, and vice versa"
-    );
-
     // StateSync should be instantiated and started before Consensus to avoid a cyclic dependency:
     // network provider -> consensus -> state synchronizer -> network provider. This has resulted
     // in a deadlock as observed in GitHub issue #749.
diff --git a/config/src/config/consensus_config.rs b/config/src/config/consensus_config.rs
index 893ea996a5637..adcbdf471d876 100644
--- a/config/src/config/consensus_config.rs
+++ b/config/src/config/consensus_config.rs
@@ -25,8 +25,6 @@ pub struct ConsensusConfig {
     // validators coordinate on the latest version to apply a manual transaction.
     pub sync_only: bool,
     pub channel_size: usize,
-    // When false, use the Direct Mempool Quorum Store
-    pub use_quorum_store: bool,
     pub quorum_store_pull_timeout_ms: u64,
     // Decides how long the leader waits before proposing empty block if there's no txns in mempool
     // the period = (poll_count - 1) * 30ms
@@ -67,7 +65,6 @@ impl Default for ConsensusConfig {
             safety_rules: SafetyRulesConfig::default(),
             sync_only: false,
             channel_size: 30, // hard-coded
-            use_quorum_store: false,
             quorum_store_pull_timeout_ms: 1000,
             quorum_store_poll_count: 10,
diff --git a/config/src/config/mempool_config.rs b/config/src/config/mempool_config.rs
index 928681361c10b..a0a0e64d8c08b 100644
--- a/config/src/config/mempool_config.rs
+++ b/config/src/config/mempool_config.rs
@@ -25,7 +25,6 @@ pub struct MempoolConfig {
     pub shared_mempool_tick_interval_ms: u64,
     pub system_transaction_timeout_secs: u64,
     pub system_transaction_gc_interval_ms: u64,
-    pub shared_mempool_validator_broadcast: bool,
     pub broadcast_buckets: Vec<u64>,
 }

@@ -46,7 +45,6 @@ impl Default for MempoolConfig {
             default_failovers: 3,
             system_transaction_timeout_secs: 600,
             system_transaction_gc_interval_ms: 60_000,
-            shared_mempool_validator_broadcast: true,
             broadcast_buckets: DEFAULT_BROADCAST_BUCKETS.to_vec(),
         }
     }
diff --git a/mempool/src/shared_mempool/coordinator.rs b/mempool/src/shared_mempool/coordinator.rs
index dda771f16dfc7..665ea2dda0088 100644
--- a/mempool/src/shared_mempool/coordinator.rs
+++ b/mempool/src/shared_mempool/coordinator.rs
@@ -227,6 +227,7 @@ async fn handle_mempool_reconfig_event(
         .spawn(tasks::process_config_update(
             config_update,
             smp.validator.clone(),
+            smp.broadcast_within_validator_network.clone(),
         ))
         .await;
 }
@@ -284,8 +285,8 @@ async fn handle_network_event(
         } => {
             let smp_clone = smp.clone();
             let peer = PeerNetworkId::new(network_id, peer_id);
-            let ineligible_for_broadcast = (!smp.broadcast_within_validator_network()
-                && smp.network_interface.is_validator())
+            let ineligible_for_broadcast = (smp.network_interface.is_validator()
+                && !smp.broadcast_within_validator_network())
                 || smp.network_interface.is_upstream_peer(&peer, None);
             let timeline_state = if ineligible_for_broadcast {
                 TimelineState::NonQualified
diff --git a/mempool/src/shared_mempool/tasks.rs b/mempool/src/shared_mempool/tasks.rs
index c8a393e0d71f8..de25b765726a7 100644
--- a/mempool/src/shared_mempool/tasks.rs
+++ b/mempool/src/shared_mempool/tasks.rs
@@ -20,6 +20,7 @@ use aptos_crypto::HashValue;
 use aptos_infallible::{Mutex, RwLock};
 use aptos_logger::prelude::*;
 use aptos_metrics_core::HistogramTimer;
+use aptos_types::on_chain_config::OnChainConsensusConfig;
 use aptos_types::{
     mempool_status::{MempoolStatus, MempoolStatusCode},
     on_chain_config::OnChainConfigPayload,
@@ -109,7 +110,7 @@
     timer.stop_and_record();
     let _timer = counters::process_txn_submit_latency_timer_client();
     let ineligible_for_broadcast =
-        !smp.broadcast_within_validator_network() && smp.network_interface.is_validator();
+        smp.network_interface.is_validator() && !smp.broadcast_within_validator_network();
     let timeline_state = if ineligible_for_broadcast {
         TimelineState::NonQualified
     } else {
@@ -501,6 +502,7 @@ pub(crate) fn process_rejected_transactions(
 pub(crate) async fn process_config_update<V>(
     config_update: OnChainConfigPayload,
     validator: Arc<RwLock<V>>,
+    broadcast_within_validator_network: Arc<RwLock<bool>>,
 ) where
     V: TransactionValidation,
 {
@@ -509,8 +511,18 @@
             .reconfig_update(config_update.clone())
     );

-    if let Err(e) = validator.write().restart(config_update) {
+    if let Err(e) = validator.write().restart(config_update.clone()) {
         counters::VM_RECONFIG_UPDATE_FAIL_COUNT.inc();
         error!(LogSchema::event_log(LogEntry::ReconfigUpdate, LogEvent::VMUpdateFail).error(&e));
     }
+
+    let consensus_config: anyhow::Result<OnChainConsensusConfig> = config_update.get();
+    if let Err(error) = &consensus_config {
+        error!(
+            "Failed to read on-chain consensus config, using default: {}",
+            error
+        );
+    }
+    *broadcast_within_validator_network.write() =
+        !consensus_config.unwrap_or_default().enable_quorum_store();
 }
diff --git a/mempool/src/shared_mempool/types.rs b/mempool/src/shared_mempool/types.rs
index 82bb88e9ac004..ae76c29cbf312 100644
--- a/mempool/src/shared_mempool/types.rs
+++ b/mempool/src/shared_mempool/types.rs
@@ -49,6 +49,7 @@ where
     pub db: Arc<dyn DbReader>,
     pub validator: Arc<RwLock<V>>,
     pub subscribers: Vec<UnboundedSender<SharedMempoolNotification>>,
+    pub broadcast_within_validator_network: Arc<RwLock<bool>>,
 }

 impl<V: TransactionValidation + 'static> SharedMempool<V> {
@@ -75,11 +76,12 @@ impl<V: TransactionValidation + 'static> SharedMempool<V> {
             db,
             validator,
             subscribers,
+            broadcast_within_validator_network: Arc::new(RwLock::new(true)),
         }
     }

     pub fn broadcast_within_validator_network(&self) -> bool {
-        self.config.shared_mempool_validator_broadcast
+        *self.broadcast_within_validator_network.read()
     }
 }

diff --git a/types/src/on_chain_config/consensus_config.rs b/types/src/on_chain_config/consensus_config.rs
index 594b6de8cedf6..865ec782bf071 100644
--- a/types/src/on_chain_config/consensus_config.rs
+++ b/types/src/on_chain_config/consensus_config.rs
@@ -12,6 +12,7 @@ use serde::{Deserialize, Serialize};
 #[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
 pub enum OnChainConsensusConfig {
     V1(ConsensusConfigV1),
+    V2(ConsensusConfigV2),
 }

 /// The public interface that exposes all values with safe fallback.
@@ -20,6 +21,7 @@ impl OnChainConsensusConfig {
     pub fn leader_reputation_exclude_round(&self) -> u64 {
         match &self {
             OnChainConsensusConfig::V1(config) => config.exclude_round,
+            OnChainConsensusConfig::V2(config) => config.exclude_round,
         }
     }

@@ -27,6 +29,7 @@
     pub fn decoupled_execution(&self) -> bool {
         match &self {
             OnChainConsensusConfig::V1(config) => config.decoupled_execution,
+            OnChainConsensusConfig::V2(config) => config.decoupled_execution,
         }
     }

@@ -39,6 +42,7 @@
         }
         match &self {
             OnChainConsensusConfig::V1(config) => config.back_pressure_limit,
+            OnChainConsensusConfig::V2(config) => config.back_pressure_limit,
         }
     }

@@ -47,6 +51,7 @@
     pub fn max_failed_authors_to_store(&self) -> usize {
         match &self {
             OnChainConsensusConfig::V1(config) => config.max_failed_authors_to_store,
+            OnChainConsensusConfig::V2(config) => config.max_failed_authors_to_store,
         }
     }

@@ -54,6 +59,14 @@
     pub fn proposer_election_type(&self) -> &ProposerElectionType {
         match &self {
             OnChainConsensusConfig::V1(config) => &config.proposer_election_type,
+            OnChainConsensusConfig::V2(config) => &config.proposer_election_type,
+        }
+    }
+
+    pub fn enable_quorum_store(&self) -> bool {
+        match &self {
+            OnChainConsensusConfig::V1(_config) => false,
+            OnChainConsensusConfig::V2(config) => config.enable_quorum_store,
         }
     }
 }
@@ -61,7 +74,7 @@
 /// This is used when on-chain config is not initialized.
 impl Default for OnChainConsensusConfig {
     fn default() -> Self {
-        OnChainConsensusConfig::V1(ConsensusConfigV1::default())
+        OnChainConsensusConfig::V2(ConsensusConfigV2::default())
     }
 }

@@ -119,6 +132,44 @@ impl Default for ConsensusConfigV1 {
     }
 }

+#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
+pub struct ConsensusConfigV2 {
+    pub decoupled_execution: bool,
+    pub back_pressure_limit: u64,
+    pub exclude_round: u64,
+    pub proposer_election_type: ProposerElectionType,
+    pub max_failed_authors_to_store: usize,
+    pub enable_quorum_store: bool,
+}
+
+impl Default for ConsensusConfigV2 {
+    fn default() -> Self {
+        Self {
+            decoupled_execution: true,
+            back_pressure_limit: 10,
+            exclude_round: 20,
+            max_failed_authors_to_store: 10,
+            proposer_election_type: ProposerElectionType::LeaderReputation(
+                LeaderReputationType::ProposerAndVoter(ProposerAndVoterConfig {
+                    active_weight: 1000,
+                    inactive_weight: 10,
+                    failed_weight: 1,
+                    failure_threshold_percent: 10, // = 10%
+                    // In each round we get statistics for the single proposer
+                    // and a large number of validators. So the window for
+                    // the proposers needs to be significantly larger
+                    // to have enough useful statistics.
+                    proposer_window_num_validators_multiplier: 10,
+                    voter_window_num_validators_multiplier: 1,
+                    weight_by_voting_power: true,
+                    use_history_from_previous_epoch_max_count: 5,
+                }),
+            ),
+            enable_quorum_store: false,
+        }
+    }
+}
+
 #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
 #[serde(rename_all = "snake_case")]
 // cannot use tag = "type" as nested enums cannot work, and bcs doesn't support it
 pub enum ProposerElectionType {
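Note (illustrative sketch, not part of the diff above): the fallback behavior wired into process_config_update can be summarized as follows. The standalone helper and its name are hypothetical; OnChainConfigPayload, OnChainConsensusConfig, and enable_quorum_store() are the items used or introduced by this patch.

use aptos_types::on_chain_config::{OnChainConfigPayload, OnChainConsensusConfig};

// Hypothetical helper mirroring the patch's logic: read the on-chain consensus
// config from a reconfig payload, fall back to the default (now V2 with
// enable_quorum_store = false) if it is missing or undecodable, and keep
// validator-to-validator mempool broadcasts enabled exactly when quorum store
// is disabled.
fn should_broadcast_within_validator_network(config_update: OnChainConfigPayload) -> bool {
    let consensus_config: anyhow::Result<OnChainConsensusConfig> = config_update.get();
    !consensus_config.unwrap_or_default().enable_quorum_store()
}

Because ConsensusConfigV2::default() keeps enable_quorum_store set to false, switching the default OnChainConsensusConfig to V2 leaves direct mempool broadcasts on until quorum store is explicitly enabled on-chain.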