diff --git a/aptos-node/src/lib.rs b/aptos-node/src/lib.rs index 91ded78672469..5d43ef85ba3cb 100644 --- a/aptos-node/src/lib.rs +++ b/aptos-node/src/lib.rs @@ -384,7 +384,6 @@ fn create_single_node_test_config( node_config .mempool .shared_mempool_max_concurrent_inbound_syncs = 1; - node_config.mempool.default_failovers = 1; node_config.mempool.max_broadcasts_per_peer = 1; node_config diff --git a/config/src/config/mempool_config.rs b/config/src/config/mempool_config.rs index 032691833e2fa..d48861e0c0d03 100644 --- a/config/src/config/mempool_config.rs +++ b/config/src/config/mempool_config.rs @@ -20,9 +20,6 @@ pub struct MempoolConfig { pub capacity_bytes: usize, /// Maximum number of transactions allowed in the Mempool per user pub capacity_per_user: usize, - // TODO: rename - /// Number of failover peers to broadcast to when the primary network is alive - pub default_failovers: usize, /// The maximum number of broadcasts sent to a single peer that are pending a response ACK at any point. pub max_broadcasts_per_peer: usize, /// Maximum number of inbound network messages to the Mempool application @@ -72,7 +69,6 @@ impl Default for MempoolConfig { capacity: 2_000_000, capacity_bytes: 2 * 1024 * 1024 * 1024, capacity_per_user: 100, - default_failovers: 1, system_transaction_timeout_secs: 600, system_transaction_gc_interval_ms: 60_000, broadcast_buckets: DEFAULT_BUCKETS.to_vec(), @@ -119,11 +115,11 @@ impl ConfigOptimizer for MempoolConfig { modified_config = true; } - // Set the default_failovers to 0 (default is 1) - if local_mempool_config_yaml["default_failovers"].is_null() { - mempool_config.default_failovers = 0; - modified_config = true; - } + // Set broadcast peers to prioritized, with a max of 1 + // TODO: as a workaround for smoke tests, always apply even if there is an override + mempool_config.broadcast_peers_selector = + BroadcastPeersSelectorConfig::PrioritizedPeers(1); + modified_config = true; // Set the shared_mempool_batch_size to 200 (default is 100) if local_mempool_config_yaml["shared_mempool_batch_size"].is_null() { @@ -136,6 +132,14 @@ impl ConfigOptimizer for MempoolConfig { mempool_config.shared_mempool_tick_interval_ms = 10; modified_config = true; } + } else if node_type.is_validator() { + // None for now + } else { + // The node is a PFN + // Set broadcast peers to fresh, with a max of 2 + // TODO: as a workaround for smoke tests, always apply even if there is an override + mempool_config.broadcast_peers_selector = BroadcastPeersSelectorConfig::FreshPeers(2); + modified_config = true; } Ok(modified_config) @@ -146,8 +150,8 @@ impl ConfigOptimizer for MempoolConfig { #[serde(rename_all = "snake_case")] pub enum BroadcastPeersSelectorConfig { AllPeers, - FreshPeers(u64), - PrioritizedPeers(u64), + FreshPeers(usize), + PrioritizedPeers(usize), } #[cfg(test)] @@ -176,7 +180,10 @@ mod tests { 16 ); assert_eq!(mempool_config.max_broadcasts_per_peer, 4); - assert_eq!(mempool_config.default_failovers, 0); + assert_eq!( + mempool_config.broadcast_peers_selector, + BroadcastPeersSelectorConfig::PrioritizedPeers(1) + ); assert_eq!(mempool_config.shared_mempool_batch_size, 200); assert_eq!(mempool_config.shared_mempool_tick_interval_ms, 10); } @@ -208,8 +215,8 @@ mod tests { default_mempool_config.max_broadcasts_per_peer ); assert_eq!( - mempool_config.default_failovers, - default_mempool_config.default_failovers + mempool_config.broadcast_peers_selector, + default_mempool_config.broadcast_peers_selector ); assert_eq!( mempool_config.shared_mempool_batch_size, @@ -223,7 +230,7 @@ mod tests { #[test] fn test_optimize_vfn_config_no_overrides() { - // Create the default validator config + // Create the default vfn config let mut node_config = NodeConfig::get_default_vfn_config(); // Create a local config YAML with some local overrides @@ -255,8 +262,8 @@ mod tests { ); assert_eq!(mempool_config.max_broadcasts_per_peer, 1); assert_ne!( - mempool_config.default_failovers, - default_mempool_config.default_failovers + mempool_config.broadcast_peers_selector, + default_mempool_config.broadcast_peers_selector ); assert_ne!( mempool_config.shared_mempool_batch_size, diff --git a/mempool/src/core_mempool/index.rs b/mempool/src/core_mempool/index.rs index e46357da28b9f..1ad58730a289b 100644 --- a/mempool/src/core_mempool/index.rs +++ b/mempool/src/core_mempool/index.rs @@ -236,7 +236,7 @@ impl TimelineIndex { batch.push((*address, *sequence_number)); }, _ => { - panic!("mismatch"); + panic!("mismatch: {:?}, {:?}", peer, timeline_peer); }, } if batch.len() == count { diff --git a/mempool/src/shared_mempool/broadcast_peers_selector.rs b/mempool/src/shared_mempool/broadcast_peers_selector.rs index 775af8b121452..b2c82359b0089 100644 --- a/mempool/src/shared_mempool/broadcast_peers_selector.rs +++ b/mempool/src/shared_mempool/broadcast_peers_selector.rs @@ -2,10 +2,7 @@ // Parts of the project are originally copyright © Meta Platforms, Inc. // SPDX-License-Identifier: Apache-2.0 -use aptos_config::{ - config::{MempoolConfig, PeerRole}, - network_id::PeerNetworkId, -}; +use aptos_config::{config::PeerRole, network_id::PeerNetworkId}; use aptos_logger::info; use aptos_network::application::metadata::PeerMetadata; use aptos_types::{account_address::AccountAddress, transaction::Version, PeerId}; @@ -114,15 +111,15 @@ impl BroadcastPeersSelector for AllPeersSelector { } pub struct PrioritizedPeersSelector { - mempool_config: MempoolConfig, + max_selected_peers: usize, prioritized_peers: Vec, prioritized_peers_comparator: PrioritizedPeersComparator, } impl PrioritizedPeersSelector { - pub fn new(mempool_config: MempoolConfig) -> Self { + pub fn new(max_selected_peers: usize) -> Self { Self { - mempool_config, + max_selected_peers, prioritized_peers: Vec::new(), prioritized_peers_comparator: PrioritizedPeersComparator::new(), } @@ -143,7 +140,7 @@ impl BroadcastPeersSelector for PrioritizedPeersSelector { let peers: Vec<_> = self .prioritized_peers .iter() - .take(self.mempool_config.default_failovers + 1) + .take(self.max_selected_peers) .cloned() .collect(); info!( @@ -156,16 +153,16 @@ impl BroadcastPeersSelector for PrioritizedPeersSelector { } pub struct FreshPeersSelector { - mempool_config: MempoolConfig, + max_selected_peers: usize, stickiness_cache: Arc>>, sorted_peers: Vec<(PeerNetworkId, Version)>, peers: HashSet, } impl FreshPeersSelector { - pub fn new(mempool_config: MempoolConfig) -> Self { + pub fn new(max_selected_peers: usize) -> Self { Self { - mempool_config, + max_selected_peers, stickiness_cache: Arc::new( Cache::builder() .max_capacity(100_000) @@ -183,7 +180,7 @@ impl FreshPeersSelector { .sorted_peers .iter() .rev() - .take(self.mempool_config.default_failovers + 1) + .take(self.max_selected_peers) .map(|(peer, _version)| *peer) .collect(); // TODO: random shuffle among similar versions to keep from biasing diff --git a/mempool/src/shared_mempool/network.rs b/mempool/src/shared_mempool/network.rs index e345482ac6139..c8ea4eb03b63e 100644 --- a/mempool/src/shared_mempool/network.rs +++ b/mempool/src/shared_mempool/network.rs @@ -16,7 +16,7 @@ use crate::{ }, }; use aptos_config::{ - config::{MempoolConfig, RoleType}, + config::{BroadcastPeersSelectorConfig, MempoolConfig, RoleType}, network_id::PeerNetworkId, }; use aptos_infallible::RwLock; @@ -336,10 +336,10 @@ impl> MempoolNetworkInterf } let retry_batch_id = state.broadcast_info.retry_batches.iter().rev().next(); - let timeline_peer = if self.is_validator() { - None - } else { - Some(peer) + let timeline_peer = match self.mempool_config.broadcast_peers_selector { + BroadcastPeersSelectorConfig::AllPeers => None, + BroadcastPeersSelectorConfig::FreshPeers(_) + | BroadcastPeersSelectorConfig::PrioritizedPeers(_) => Some(peer), }; let (batch_id, transactions, metric_label) = match std::cmp::max(expired_batch_id, retry_batch_id) { diff --git a/mempool/src/shared_mempool/runtime.rs b/mempool/src/shared_mempool/runtime.rs index c18516fe0ab6b..18d7787cbcb43 100644 --- a/mempool/src/shared_mempool/runtime.rs +++ b/mempool/src/shared_mempool/runtime.rs @@ -14,7 +14,7 @@ use crate::{ }, QuorumStoreRequest, }; -use aptos_config::{config::NodeConfig, network_id::NetworkId}; +use aptos_config::config::{BroadcastPeersSelectorConfig, NodeConfig}; use aptos_event_notifications::ReconfigNotificationListener; use aptos_infallible::{Mutex, RwLock}; use aptos_logger::Level; @@ -26,7 +26,6 @@ use aptos_network::application::{ use aptos_storage_interface::DbReader; use aptos_vm_validator::vm_validator::{TransactionValidation, VMValidator}; use futures::channel::mpsc::{Receiver, UnboundedSender}; -use itertools::Itertools; use std::sync::Arc; use tokio::runtime::{Handle, Runtime}; @@ -105,18 +104,16 @@ pub fn bootstrap( let runtime = aptos_runtimes::spawn_named_runtime("shared-mem".into(), None); let broadcast_peers_selector = { - let inner_selector: Box = if config.base.role.is_validator() { - Box::new(AllPeersSelector::new()) - } else if !config.base.role.is_validator() - && peers_and_metadata - .get_registered_networks() - .contains(&NetworkId::Vfn) - { - // is_vfn - Box::new(PrioritizedPeersSelector::new(config.mempool.clone())) - } else { - Box::new(FreshPeersSelector::new(config.mempool.clone())) - }; + let inner_selector: Box = + match config.mempool.broadcast_peers_selector { + BroadcastPeersSelectorConfig::AllPeers => Box::new(AllPeersSelector::new()), + BroadcastPeersSelectorConfig::FreshPeers(max_selected_peers) => { + Box::new(FreshPeersSelector::new(max_selected_peers)) + }, + BroadcastPeersSelectorConfig::PrioritizedPeers(max_selected_peers) => { + Box::new(PrioritizedPeersSelector::new(max_selected_peers)) + }, + }; Arc::new(RwLock::new(inner_selector)) }; diff --git a/testsuite/smoke-test/src/full_nodes.rs b/testsuite/smoke-test/src/full_nodes.rs index 5f39f77bd7022..a4a1996b63b93 100644 --- a/testsuite/smoke-test/src/full_nodes.rs +++ b/testsuite/smoke-test/src/full_nodes.rs @@ -10,7 +10,10 @@ use crate::{ }, }; use aptos_config::{ - config::{DiscoveryMethod, NodeConfig, Peer, PeerRole, HANDSHAKE_VERSION}, + config::{ + BroadcastPeersSelectorConfig, DiscoveryMethod, NodeConfig, Peer, PeerRole, + HANDSHAKE_VERSION, + }, network_id::NetworkId, }; use aptos_forge::{LocalSwarm, NodeExt, Swarm, SwarmExt}; @@ -118,7 +121,7 @@ async fn test_full_node_basic_flow() { async fn test_pfn_route_updates() { // VFN will not forward to other VFN let mut vfn_config = NodeConfig::get_default_vfn_config(); - vfn_config.mempool.default_failovers = 0; + vfn_config.mempool.broadcast_peers_selector = BroadcastPeersSelectorConfig::PrioritizedPeers(1); let mut swarm = SwarmBuilder::new_local(2) .with_num_fullnodes(2) .with_aptos() @@ -129,7 +132,7 @@ async fn test_pfn_route_updates() { let version = swarm.versions().max().unwrap(); // The PFN only forwards to a single VFN at a time. Route updates allow the txns to succeed. let mut pfn_config = NodeConfig::get_default_pfn_config(); - pfn_config.mempool.default_failovers = 0; + pfn_config.mempool.broadcast_peers_selector = BroadcastPeersSelectorConfig::FreshPeers(1); pfn_config .peer_monitoring_service .node_monitoring @@ -248,7 +251,7 @@ async fn send_and_receive_coin( async fn test_vfn_failover() { // VFN failover happens when validator is down even for default_failovers = 0 let mut vfn_config = NodeConfig::get_default_vfn_config(); - vfn_config.mempool.default_failovers = 0; + vfn_config.mempool.broadcast_peers_selector = BroadcastPeersSelectorConfig::PrioritizedPeers(1); let mut swarm = SwarmBuilder::new_local(4) .with_num_fullnodes(4) .with_aptos()