Skip to content

Commit

Permalink
[quorum store] create onchain config for quorum store, connect it to …
Browse files Browse the repository at this point in the history
…mempool broadcasts
  • Loading branch information
bchocho committed Nov 17, 2022
1 parent 579848d commit 2130b23
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 21 deletions.
10 changes: 0 additions & 10 deletions aptos-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -837,16 +837,6 @@ pub fn setup_environment(
);
debug!("Mempool started in {} ms", instant.elapsed().as_millis());

assert!(
!node_config.consensus.use_quorum_store,
"QuorumStore is not yet implemented"
);
assert_ne!(
node_config.consensus.use_quorum_store,
node_config.mempool.shared_mempool_validator_broadcast,
"Shared mempool validator broadcast must be turned off when QuorumStore is on, and vice versa"
);

// StateSync should be instantiated and started before Consensus to avoid a cyclic dependency:
// network provider -> consensus -> state synchronizer -> network provider. This has resulted
// in a deadlock as observed in GitHub issue #749.
Expand Down
3 changes: 0 additions & 3 deletions config/src/config/consensus_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ pub struct ConsensusConfig {
// validators coordinate on the latest version to apply a manual transaction.
pub sync_only: bool,
pub channel_size: usize,
// When false, use the Direct Mempool Quorum Store
pub use_quorum_store: bool,
pub quorum_store_pull_timeout_ms: u64,
// Decides how long the leader waits before proposing empty block if there's no txns in mempool
// the period = (poll_count - 1) * 30ms
Expand Down Expand Up @@ -67,7 +65,6 @@ impl Default for ConsensusConfig {
safety_rules: SafetyRulesConfig::default(),
sync_only: false,
channel_size: 30, // hard-coded
use_quorum_store: false,

quorum_store_pull_timeout_ms: 1000,
quorum_store_poll_count: 10,
Expand Down
2 changes: 0 additions & 2 deletions config/src/config/mempool_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ pub struct MempoolConfig {
pub shared_mempool_tick_interval_ms: u64,
pub system_transaction_timeout_secs: u64,
pub system_transaction_gc_interval_ms: u64,
pub shared_mempool_validator_broadcast: bool,
pub broadcast_buckets: Vec<u64>,
}

Expand All @@ -46,7 +45,6 @@ impl Default for MempoolConfig {
default_failovers: 3,
system_transaction_timeout_secs: 600,
system_transaction_gc_interval_ms: 60_000,
shared_mempool_validator_broadcast: true,
broadcast_buckets: DEFAULT_BROADCAST_BUCKETS.to_vec(),
}
}
Expand Down
5 changes: 3 additions & 2 deletions mempool/src/shared_mempool/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ async fn handle_mempool_reconfig_event<V>(
.spawn(tasks::process_config_update(
config_update,
smp.validator.clone(),
smp.broadcast_within_validator_network.clone(),
))
.await;
}
Expand Down Expand Up @@ -284,8 +285,8 @@ async fn handle_network_event<V>(
} => {
let smp_clone = smp.clone();
let peer = PeerNetworkId::new(network_id, peer_id);
let ineligible_for_broadcast = (!smp.broadcast_within_validator_network()
&& smp.network_interface.is_validator())
let ineligible_for_broadcast = (smp.network_interface.is_validator()
&& !smp.broadcast_within_validator_network())
|| smp.network_interface.is_upstream_peer(&peer, None);
let timeline_state = if ineligible_for_broadcast {
TimelineState::NonQualified
Expand Down
16 changes: 14 additions & 2 deletions mempool/src/shared_mempool/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use aptos_crypto::HashValue;
use aptos_infallible::{Mutex, RwLock};
use aptos_logger::prelude::*;
use aptos_metrics_core::HistogramTimer;
use aptos_types::on_chain_config::OnChainConsensusConfig;
use aptos_types::{
mempool_status::{MempoolStatus, MempoolStatusCode},
on_chain_config::OnChainConfigPayload,
Expand Down Expand Up @@ -109,7 +110,7 @@ pub(crate) async fn process_client_transaction_submission<V>(
timer.stop_and_record();
let _timer = counters::process_txn_submit_latency_timer_client();
let ineligible_for_broadcast =
!smp.broadcast_within_validator_network() && smp.network_interface.is_validator();
smp.network_interface.is_validator() && !smp.broadcast_within_validator_network();
let timeline_state = if ineligible_for_broadcast {
TimelineState::NonQualified
} else {
Expand Down Expand Up @@ -501,6 +502,7 @@ pub(crate) fn process_rejected_transactions(
pub(crate) async fn process_config_update<V>(
config_update: OnChainConfigPayload,
validator: Arc<RwLock<V>>,
broadcast_within_validator_network: Arc<RwLock<bool>>,
) where
V: TransactionValidation,
{
Expand All @@ -509,8 +511,18 @@ pub(crate) async fn process_config_update<V>(
.reconfig_update(config_update.clone())
);

if let Err(e) = validator.write().restart(config_update) {
if let Err(e) = validator.write().restart(config_update.clone()) {
counters::VM_RECONFIG_UPDATE_FAIL_COUNT.inc();
error!(LogSchema::event_log(LogEntry::ReconfigUpdate, LogEvent::VMUpdateFail).error(&e));
}

let consensus_config: anyhow::Result<OnChainConsensusConfig> = config_update.get();
if let Err(error) = &consensus_config {
error!(
"Failed to read on-chain consensus config, using default: {}",
error
);
}
*broadcast_within_validator_network.write() =
!consensus_config.unwrap_or_default().enable_quorum_store();
}
4 changes: 3 additions & 1 deletion mempool/src/shared_mempool/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ where
pub db: Arc<dyn DbReader>,
pub validator: Arc<RwLock<V>>,
pub subscribers: Vec<UnboundedSender<SharedMempoolNotification>>,
pub broadcast_within_validator_network: Arc<RwLock<bool>>,
}

impl<V: TransactionValidation + 'static> SharedMempool<V> {
Expand All @@ -75,11 +76,12 @@ impl<V: TransactionValidation + 'static> SharedMempool<V> {
db,
validator,
subscribers,
broadcast_within_validator_network: Arc::new(RwLock::new(true)),
}
}

pub fn broadcast_within_validator_network(&self) -> bool {
self.config.shared_mempool_validator_broadcast
*self.broadcast_within_validator_network.read()
}
}

Expand Down
53 changes: 52 additions & 1 deletion types/src/on_chain_config/consensus_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
pub enum OnChainConsensusConfig {
V1(ConsensusConfigV1),
V2(ConsensusConfigV2),
}

/// The public interface that exposes all values with safe fallback.
Expand All @@ -20,13 +21,15 @@ impl OnChainConsensusConfig {
pub fn leader_reputation_exclude_round(&self) -> u64 {
match &self {
OnChainConsensusConfig::V1(config) => config.exclude_round,
OnChainConsensusConfig::V2(config) => config.exclude_round,
}
}

/// Decouple execution from consensus or not.
pub fn decoupled_execution(&self) -> bool {
match &self {
OnChainConsensusConfig::V1(config) => config.decoupled_execution,
OnChainConsensusConfig::V2(config) => config.decoupled_execution,
}
}

Expand All @@ -39,6 +42,7 @@ impl OnChainConsensusConfig {
}
match &self {
OnChainConsensusConfig::V1(config) => config.back_pressure_limit,
OnChainConsensusConfig::V2(config) => config.back_pressure_limit,
}
}

Expand All @@ -47,21 +51,30 @@ impl OnChainConsensusConfig {
pub fn max_failed_authors_to_store(&self) -> usize {
match &self {
OnChainConsensusConfig::V1(config) => config.max_failed_authors_to_store,
OnChainConsensusConfig::V2(config) => config.max_failed_authors_to_store,
}
}

// Type and configuration used for proposer election.
pub fn proposer_election_type(&self) -> &ProposerElectionType {
match &self {
OnChainConsensusConfig::V1(config) => &config.proposer_election_type,
OnChainConsensusConfig::V2(config) => &config.proposer_election_type,
}
}

pub fn enable_quorum_store(&self) -> bool {
match &self {
OnChainConsensusConfig::V1(_config) => false,
OnChainConsensusConfig::V2(config) => config.enable_quorum_store,
}
}
}

/// This is used when on-chain config is not initialized.
impl Default for OnChainConsensusConfig {
fn default() -> Self {
OnChainConsensusConfig::V1(ConsensusConfigV1::default())
OnChainConsensusConfig::V2(ConsensusConfigV2::default())
}
}

Expand Down Expand Up @@ -119,6 +132,44 @@ impl Default for ConsensusConfigV1 {
}
}

#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
pub struct ConsensusConfigV2 {
pub decoupled_execution: bool,
pub back_pressure_limit: u64,
pub exclude_round: u64,
pub proposer_election_type: ProposerElectionType,
pub max_failed_authors_to_store: usize,
pub enable_quorum_store: bool,
}

impl Default for ConsensusConfigV2 {
fn default() -> Self {
Self {
decoupled_execution: true,
back_pressure_limit: 10,
exclude_round: 20,
max_failed_authors_to_store: 10,
proposer_election_type: ProposerElectionType::LeaderReputation(
LeaderReputationType::ProposerAndVoter(ProposerAndVoterConfig {
active_weight: 1000,
inactive_weight: 10,
failed_weight: 1,
failure_threshold_percent: 10, // = 10%
// In each round we get stastics for the single proposer
// and large number of validators. So the window for
// the proposers needs to be significantly larger
// to have enough useful statistics.
proposer_window_num_validators_multiplier: 10,
voter_window_num_validators_multiplier: 1,
weight_by_voting_power: true,
use_history_from_previous_epoch_max_count: 5,
}),
),
enable_quorum_store: false,
}
}
}

#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
#[serde(rename_all = "snake_case")] // cannot use tag = "type" as nested enums cannot work, and bcs doesn't support it
pub enum ProposerElectionType {
Expand Down

0 comments on commit 2130b23

Please sign in to comment.