Skip to content

Commit

Permalink
Improve mempool dissemination
Browse files Browse the repository at this point in the history
  • Loading branch information
bchocho committed Jul 14, 2023
1 parent 027f78b commit e15b046
Show file tree
Hide file tree
Showing 27 changed files with 1,063 additions and 322 deletions.
141 changes: 138 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,7 @@ maplit = "1.0.2"
mime = "0.3.16"
mirai-annotations = "1.12.0"
mockall = "0.11.4"
moka = "0.11.0"
more-asserts = "0.3.0"
native-tls = "0.2.10"
ntest = "0.9.0"
Expand Down
1 change: 0 additions & 1 deletion aptos-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,6 @@ fn create_single_node_test_config(
node_config
.mempool
.shared_mempool_max_concurrent_inbound_syncs = 1;
node_config.mempool.default_failovers = 1;
node_config.mempool.max_broadcasts_per_peer = 1;

node_config
Expand Down
50 changes: 35 additions & 15 deletions config/src/config/mempool_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ pub struct MempoolConfig {
pub capacity_bytes: usize,
/// Maximum number of transactions allowed in the Mempool per user
pub capacity_per_user: usize,
/// Number of failover peers to broadcast to when the primary network is alive
pub default_failovers: usize,
/// The maximum number of broadcasts sent to a single peer that are pending a response ACK at any point.
pub max_broadcasts_per_peer: usize,
/// Maximum number of inbound network messages to the Mempool application
Expand Down Expand Up @@ -53,6 +51,8 @@ pub struct MempoolConfig {
pub broadcast_buckets: Vec<u64>,
pub eager_expire_threshold_ms: Option<u64>,
pub eager_expire_time_ms: u64,
pub peer_update_interval_ms: u64,
pub broadcast_peers_selector: BroadcastPeersSelectorConfig,
}

impl Default for MempoolConfig {
Expand All @@ -70,13 +70,14 @@ impl Default for MempoolConfig {
capacity: 2_000_000,
capacity_bytes: 2 * 1024 * 1024 * 1024,
capacity_per_user: 100,
default_failovers: 1,
shared_mempool_peer_update_interval_ms: 1_000,
system_transaction_timeout_secs: 600,
system_transaction_gc_interval_ms: 60_000,
broadcast_buckets: DEFAULT_BUCKETS.to_vec(),
eager_expire_threshold_ms: Some(10_000),
eager_expire_time_ms: 3_000,
peer_update_interval_ms: 1_000,
broadcast_peers_selector: BroadcastPeersSelectorConfig::AllPeers,
}
}
}
Expand Down Expand Up @@ -104,6 +105,12 @@ impl ConfigOptimizer for MempoolConfig {
// Change the default configs for VFNs
let mut modified_config = false;
if node_type.is_validator_fullnode() {
// Set broadcast peers to prioritized, with a max of 1
// TODO: as a workaround for smoke tests, always apply even if there is an override
mempool_config.broadcast_peers_selector =
BroadcastPeersSelectorConfig::PrioritizedPeers(1);
modified_config = true;

// Set the shared_mempool_max_concurrent_inbound_syncs to 16 (default is 4)
if local_mempool_config_yaml["shared_mempool_max_concurrent_inbound_syncs"].is_null() {
mempool_config.shared_mempool_max_concurrent_inbound_syncs = 16;
Expand All @@ -116,12 +123,6 @@ impl ConfigOptimizer for MempoolConfig {
modified_config = true;
}

// Set the default_failovers to 0 (default is 1)
if local_mempool_config_yaml["default_failovers"].is_null() {
mempool_config.default_failovers = 0;
modified_config = true;
}

// Set the shared_mempool_batch_size to 200 (default is 100)
if local_mempool_config_yaml["shared_mempool_batch_size"].is_null() {
mempool_config.shared_mempool_batch_size = 200;
Expand All @@ -133,12 +134,28 @@ impl ConfigOptimizer for MempoolConfig {
mempool_config.shared_mempool_tick_interval_ms = 10;
modified_config = true;
}
} else if node_type.is_validator() {
// None for now
} else {
// The node is a PFN
// Set broadcast peers to fresh, with a max of 2
// TODO: as a workaround for smoke tests, always apply even if there is an override
mempool_config.broadcast_peers_selector = BroadcastPeersSelectorConfig::FreshPeers(2);
modified_config = true;
}

Ok(modified_config)
}
}

#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum BroadcastPeersSelectorConfig {
AllPeers,
FreshPeers(usize),
PrioritizedPeers(usize),
}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -165,7 +182,10 @@ mod tests {
16
);
assert_eq!(mempool_config.max_broadcasts_per_peer, 4);
assert_eq!(mempool_config.default_failovers, 0);
assert_eq!(
mempool_config.broadcast_peers_selector,
BroadcastPeersSelectorConfig::PrioritizedPeers(1)
);
assert_eq!(mempool_config.shared_mempool_batch_size, 200);
assert_eq!(mempool_config.shared_mempool_tick_interval_ms, 10);
}
Expand Down Expand Up @@ -197,8 +217,8 @@ mod tests {
default_mempool_config.max_broadcasts_per_peer
);
assert_eq!(
mempool_config.default_failovers,
default_mempool_config.default_failovers
mempool_config.broadcast_peers_selector,
default_mempool_config.broadcast_peers_selector
);
assert_eq!(
mempool_config.shared_mempool_batch_size,
Expand All @@ -212,7 +232,7 @@ mod tests {

#[test]
fn test_optimize_vfn_config_no_overrides() {
// Create the default validator config
// Create the default vfn config
let mut node_config = NodeConfig::get_default_vfn_config();

// Create a local config YAML with some local overrides
Expand Down Expand Up @@ -244,8 +264,8 @@ mod tests {
);
assert_eq!(mempool_config.max_broadcasts_per_peer, 1);
assert_ne!(
mempool_config.default_failovers,
default_mempool_config.default_failovers
mempool_config.broadcast_peers_selector,
default_mempool_config.broadcast_peers_selector
);
assert_ne!(
mempool_config.shared_mempool_batch_size,
Expand Down
Loading

0 comments on commit e15b046

Please sign in to comment.