Skip to content

Commit

Permalink
Add configs for the broadcast peers selector. Currently smoke tests require an override always, because of limitations in how the configs are built
Browse files Browse the repository at this point in the history
  • Loading branch information
bchocho committed Jun 20, 2023
1 parent 525eaeb commit ee4b400
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 54 deletions.
1 change: 0 additions & 1 deletion aptos-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,6 @@ fn create_single_node_test_config(
node_config
.mempool
.shared_mempool_max_concurrent_inbound_syncs = 1;
node_config.mempool.default_failovers = 1;
node_config.mempool.max_broadcasts_per_peer = 1;

node_config
Expand Down
41 changes: 24 additions & 17 deletions config/src/config/mempool_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@ pub struct MempoolConfig {
pub capacity_bytes: usize,
/// Maximum number of transactions allowed in the Mempool per user
pub capacity_per_user: usize,
// TODO: rename
/// Number of failover peers to broadcast to when the primary network is alive
pub default_failovers: usize,
/// The maximum number of broadcasts sent to a single peer that are pending a response ACK at any point.
pub max_broadcasts_per_peer: usize,
/// Maximum number of inbound network messages to the Mempool application
Expand Down Expand Up @@ -72,7 +69,6 @@ impl Default for MempoolConfig {
capacity: 2_000_000,
capacity_bytes: 2 * 1024 * 1024 * 1024,
capacity_per_user: 100,
default_failovers: 1,
system_transaction_timeout_secs: 600,
system_transaction_gc_interval_ms: 60_000,
broadcast_buckets: DEFAULT_BUCKETS.to_vec(),
Expand Down Expand Up @@ -119,11 +115,11 @@ impl ConfigOptimizer for MempoolConfig {
modified_config = true;
}

// Set the default_failovers to 0 (default is 1)
if local_mempool_config_yaml["default_failovers"].is_null() {
mempool_config.default_failovers = 0;
modified_config = true;
}
// Set broadcast peers to prioritized, with a max of 1
// TODO: as a workaround for smoke tests, always apply even if there is an override
mempool_config.broadcast_peers_selector =
BroadcastPeersSelectorConfig::PrioritizedPeers(1);
modified_config = true;

// Set the shared_mempool_batch_size to 200 (default is 100)
if local_mempool_config_yaml["shared_mempool_batch_size"].is_null() {
Expand All @@ -136,6 +132,14 @@ impl ConfigOptimizer for MempoolConfig {
mempool_config.shared_mempool_tick_interval_ms = 10;
modified_config = true;
}
} else if node_type.is_validator() {
// None for now
} else {
// The node is a PFN
// Set broadcast peers to fresh, with a max of 2
// TODO: as a workaround for smoke tests, always apply even if there is an override
mempool_config.broadcast_peers_selector = BroadcastPeersSelectorConfig::FreshPeers(2);
modified_config = true;
}

Ok(modified_config)
Expand All @@ -146,8 +150,8 @@ impl ConfigOptimizer for MempoolConfig {
#[serde(rename_all = "snake_case")]
pub enum BroadcastPeersSelectorConfig {
AllPeers,
FreshPeers(u64),
PrioritizedPeers(u64),
FreshPeers(usize),
PrioritizedPeers(usize),
}

#[cfg(test)]
Expand Down Expand Up @@ -176,7 +180,10 @@ mod tests {
16
);
assert_eq!(mempool_config.max_broadcasts_per_peer, 4);
assert_eq!(mempool_config.default_failovers, 0);
assert_eq!(
mempool_config.broadcast_peers_selector,
BroadcastPeersSelectorConfig::PrioritizedPeers(1)
);
assert_eq!(mempool_config.shared_mempool_batch_size, 200);
assert_eq!(mempool_config.shared_mempool_tick_interval_ms, 10);
}
Expand Down Expand Up @@ -208,8 +215,8 @@ mod tests {
default_mempool_config.max_broadcasts_per_peer
);
assert_eq!(
mempool_config.default_failovers,
default_mempool_config.default_failovers
mempool_config.broadcast_peers_selector,
default_mempool_config.broadcast_peers_selector
);
assert_eq!(
mempool_config.shared_mempool_batch_size,
Expand All @@ -223,7 +230,7 @@ mod tests {

#[test]
fn test_optimize_vfn_config_no_overrides() {
// Create the default validator config
// Create the default vfn config
let mut node_config = NodeConfig::get_default_vfn_config();

// Create a local config YAML with some local overrides
Expand Down Expand Up @@ -255,8 +262,8 @@ mod tests {
);
assert_eq!(mempool_config.max_broadcasts_per_peer, 1);
assert_ne!(
mempool_config.default_failovers,
default_mempool_config.default_failovers
mempool_config.broadcast_peers_selector,
default_mempool_config.broadcast_peers_selector
);
assert_ne!(
mempool_config.shared_mempool_batch_size,
Expand Down
2 changes: 1 addition & 1 deletion mempool/src/core_mempool/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ impl TimelineIndex {
batch.push((*address, *sequence_number));
},
_ => {
panic!("mismatch");
panic!("mismatch: {:?}, {:?}", peer, timeline_peer);
},
}
if batch.len() == count {
Expand Down
21 changes: 9 additions & 12 deletions mempool/src/shared_mempool/broadcast_peers_selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@
// Parts of the project are originally copyright © Meta Platforms, Inc.
// SPDX-License-Identifier: Apache-2.0

use aptos_config::{
config::{MempoolConfig, PeerRole},
network_id::PeerNetworkId,
};
use aptos_config::{config::PeerRole, network_id::PeerNetworkId};
use aptos_logger::info;
use aptos_network::application::metadata::PeerMetadata;
use aptos_types::{account_address::AccountAddress, transaction::Version, PeerId};
Expand Down Expand Up @@ -114,15 +111,15 @@ impl BroadcastPeersSelector for AllPeersSelector {
}

pub struct PrioritizedPeersSelector {
mempool_config: MempoolConfig,
max_selected_peers: usize,
prioritized_peers: Vec<PeerNetworkId>,
prioritized_peers_comparator: PrioritizedPeersComparator,
}

impl PrioritizedPeersSelector {
pub fn new(mempool_config: MempoolConfig) -> Self {
pub fn new(max_selected_peers: usize) -> Self {
Self {
mempool_config,
max_selected_peers,
prioritized_peers: Vec::new(),
prioritized_peers_comparator: PrioritizedPeersComparator::new(),
}
Expand All @@ -143,7 +140,7 @@ impl BroadcastPeersSelector for PrioritizedPeersSelector {
let peers: Vec<_> = self
.prioritized_peers
.iter()
.take(self.mempool_config.default_failovers + 1)
.take(self.max_selected_peers)
.cloned()
.collect();
info!(
Expand All @@ -156,16 +153,16 @@ impl BroadcastPeersSelector for PrioritizedPeersSelector {
}

pub struct FreshPeersSelector {
mempool_config: MempoolConfig,
max_selected_peers: usize,
stickiness_cache: Arc<Cache<AccountAddress, Vec<PeerNetworkId>>>,
sorted_peers: Vec<(PeerNetworkId, Version)>,
peers: HashSet<PeerNetworkId>,
}

impl FreshPeersSelector {
pub fn new(mempool_config: MempoolConfig) -> Self {
pub fn new(max_selected_peers: usize) -> Self {
Self {
mempool_config,
max_selected_peers,
stickiness_cache: Arc::new(
Cache::builder()
.max_capacity(100_000)
Expand All @@ -183,7 +180,7 @@ impl FreshPeersSelector {
.sorted_peers
.iter()
.rev()
.take(self.mempool_config.default_failovers + 1)
.take(self.max_selected_peers)
.map(|(peer, _version)| *peer)
.collect();
// TODO: random shuffle among similar versions to keep from biasing
Expand Down
10 changes: 5 additions & 5 deletions mempool/src/shared_mempool/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::{
},
};
use aptos_config::{
config::{MempoolConfig, RoleType},
config::{BroadcastPeersSelectorConfig, MempoolConfig, RoleType},
network_id::PeerNetworkId,
};
use aptos_infallible::RwLock;
Expand Down Expand Up @@ -336,10 +336,10 @@ impl<NetworkClient: NetworkClientInterface<MempoolSyncMsg>> MempoolNetworkInterf
}
let retry_batch_id = state.broadcast_info.retry_batches.iter().rev().next();

let timeline_peer = if self.is_validator() {
None
} else {
Some(peer)
let timeline_peer = match self.mempool_config.broadcast_peers_selector {
BroadcastPeersSelectorConfig::AllPeers => None,
BroadcastPeersSelectorConfig::FreshPeers(_)
| BroadcastPeersSelectorConfig::PrioritizedPeers(_) => Some(peer),
};
let (batch_id, transactions, metric_label) =
match std::cmp::max(expired_batch_id, retry_batch_id) {
Expand Down
25 changes: 11 additions & 14 deletions mempool/src/shared_mempool/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{
},
QuorumStoreRequest,
};
use aptos_config::{config::NodeConfig, network_id::NetworkId};
use aptos_config::config::{BroadcastPeersSelectorConfig, NodeConfig};
use aptos_event_notifications::ReconfigNotificationListener;
use aptos_infallible::{Mutex, RwLock};
use aptos_logger::Level;
Expand All @@ -26,7 +26,6 @@ use aptos_network::application::{
use aptos_storage_interface::DbReader;
use aptos_vm_validator::vm_validator::{TransactionValidation, VMValidator};
use futures::channel::mpsc::{Receiver, UnboundedSender};
use itertools::Itertools;
use std::sync::Arc;
use tokio::runtime::{Handle, Runtime};

Expand Down Expand Up @@ -105,18 +104,16 @@ pub fn bootstrap(
let runtime = aptos_runtimes::spawn_named_runtime("shared-mem".into(), None);

let broadcast_peers_selector = {
let inner_selector: Box<dyn BroadcastPeersSelector> = if config.base.role.is_validator() {
Box::new(AllPeersSelector::new())
} else if !config.base.role.is_validator()
&& peers_and_metadata
.get_registered_networks()
.contains(&NetworkId::Vfn)
{
// is_vfn
Box::new(PrioritizedPeersSelector::new(config.mempool.clone()))
} else {
Box::new(FreshPeersSelector::new(config.mempool.clone()))
};
let inner_selector: Box<dyn BroadcastPeersSelector> =
match config.mempool.broadcast_peers_selector {
BroadcastPeersSelectorConfig::AllPeers => Box::new(AllPeersSelector::new()),
BroadcastPeersSelectorConfig::FreshPeers(max_selected_peers) => {
Box::new(FreshPeersSelector::new(max_selected_peers))
},
BroadcastPeersSelectorConfig::PrioritizedPeers(max_selected_peers) => {
Box::new(PrioritizedPeersSelector::new(max_selected_peers))
},
};
Arc::new(RwLock::new(inner_selector))
};

Expand Down
11 changes: 7 additions & 4 deletions testsuite/smoke-test/src/full_nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ use crate::{
},
};
use aptos_config::{
config::{DiscoveryMethod, NodeConfig, Peer, PeerRole, HANDSHAKE_VERSION},
config::{
BroadcastPeersSelectorConfig, DiscoveryMethod, NodeConfig, Peer, PeerRole,
HANDSHAKE_VERSION,
},
network_id::NetworkId,
};
use aptos_forge::{LocalSwarm, NodeExt, Swarm, SwarmExt};
Expand Down Expand Up @@ -118,7 +121,7 @@ async fn test_full_node_basic_flow() {
async fn test_pfn_route_updates() {
// VFN will not forward to other VFN
let mut vfn_config = NodeConfig::get_default_vfn_config();
vfn_config.mempool.default_failovers = 0;
vfn_config.mempool.broadcast_peers_selector = BroadcastPeersSelectorConfig::PrioritizedPeers(1);
let mut swarm = SwarmBuilder::new_local(2)
.with_num_fullnodes(2)
.with_aptos()
Expand All @@ -129,7 +132,7 @@ async fn test_pfn_route_updates() {
let version = swarm.versions().max().unwrap();
// The PFN only forwards to a single VFN at a time. Route updates allow the txns to succeed.
let mut pfn_config = NodeConfig::get_default_pfn_config();
pfn_config.mempool.default_failovers = 0;
pfn_config.mempool.broadcast_peers_selector = BroadcastPeersSelectorConfig::FreshPeers(1);
pfn_config
.peer_monitoring_service
.node_monitoring
Expand Down Expand Up @@ -248,7 +251,7 @@ async fn send_and_receive_coin(
async fn test_vfn_failover() {
// VFN failover happens when validator is down even for default_failovers = 0
let mut vfn_config = NodeConfig::get_default_vfn_config();
vfn_config.mempool.default_failovers = 0;
vfn_config.mempool.broadcast_peers_selector = BroadcastPeersSelectorConfig::PrioritizedPeers(1);
let mut swarm = SwarmBuilder::new_local(4)
.with_num_fullnodes(4)
.with_aptos()
Expand Down

0 comments on commit ee4b400

Please sign in to comment.