Skip to content

Commit

Permalink
Improve mempool dissemination
Browse files Browse the repository at this point in the history
  • Loading branch information
bchocho committed Sep 20, 2023
1 parent 4825870 commit 857f070
Show file tree
Hide file tree
Showing 27 changed files with 1,070 additions and 319 deletions.
141 changes: 138 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,7 @@ merlin = "3"
mime = "0.3.16"
mirai-annotations = "1.12.0"
mockall = "0.11.4"
moka = "0.11.2"
more-asserts = "0.3.0"
native-tls = "0.2.10"
ntest = "0.9.0"
Expand Down
1 change: 0 additions & 1 deletion aptos-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,6 @@ fn create_single_node_test_config(
node_config
.mempool
.shared_mempool_max_concurrent_inbound_syncs = 1;
node_config.mempool.default_failovers = 1;
node_config.mempool.max_broadcasts_per_peer = 1;

node_config
Expand Down
48 changes: 36 additions & 12 deletions config/src/config/mempool_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ pub struct MempoolConfig {
pub capacity_bytes: usize,
/// Maximum number of transactions allowed in the Mempool per user
pub capacity_per_user: usize,
/// Number of failover peers to broadcast to when the primary network is alive
pub default_failovers: usize,
/// The maximum number of broadcasts sent to a single peer that are pending a response ACK at any point.
pub max_broadcasts_per_peer: usize,
/// Maximum number of inbound network messages to the Mempool application
Expand Down Expand Up @@ -53,6 +51,8 @@ pub struct MempoolConfig {
pub broadcast_buckets: Vec<u64>,
pub eager_expire_threshold_ms: Option<u64>,
pub eager_expire_time_ms: u64,
pub peer_update_interval_ms: u64,
pub broadcast_peers_selector: BroadcastPeersSelectorConfig,
}

impl Default for MempoolConfig {
Expand All @@ -70,13 +70,14 @@ impl Default for MempoolConfig {
capacity: 2_000_000,
capacity_bytes: 2 * 1024 * 1024 * 1024,
capacity_per_user: 100,
default_failovers: 1,
shared_mempool_peer_update_interval_ms: 1_000,
system_transaction_timeout_secs: 600,
system_transaction_gc_interval_ms: 60_000,
broadcast_buckets: DEFAULT_BUCKETS.to_vec(),
eager_expire_threshold_ms: Some(10_000),
eager_expire_time_ms: 3_000,
peer_update_interval_ms: 1_000,
broadcast_peers_selector: BroadcastPeersSelectorConfig::AllPeers,
}
}
}
Expand Down Expand Up @@ -116,29 +117,45 @@ impl ConfigOptimizer for MempoolConfig {
}
}
if node_type.is_validator_fullnode() {
// Set broadcast peers to prioritized, with a max of 1
// TODO: as a workaround for smoke tests, always apply even if there is an override
mempool_config.broadcast_peers_selector =
BroadcastPeersSelectorConfig::PrioritizedPeers(1);
modified_config = true;

// Set the shared_mempool_max_concurrent_inbound_syncs to 16 (default is 4)
if local_mempool_config_yaml["shared_mempool_max_concurrent_inbound_syncs"].is_null() {
mempool_config.shared_mempool_max_concurrent_inbound_syncs = 16;
modified_config = true;
}

// Set the default_failovers to 0 (default is 1)
if local_mempool_config_yaml["default_failovers"].is_null() {
mempool_config.default_failovers = 0;
modified_config = true;
}

// Set the shared_mempool_tick_interval_ms to 10 (default is 50)
if local_mempool_config_yaml["shared_mempool_tick_interval_ms"].is_null() {
mempool_config.shared_mempool_tick_interval_ms = 10;
modified_config = true;
}
} else if node_type.is_validator() {
// None for now
} else {
// The node is a PFN
// Set broadcast peers to fresh, with a max of 2
// TODO: as a workaround for smoke tests, always apply even if there is an override
mempool_config.broadcast_peers_selector = BroadcastPeersSelectorConfig::FreshPeers(2);
modified_config = true;
}

Ok(modified_config)
}
}

#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum BroadcastPeersSelectorConfig {
AllPeers,
FreshPeers(usize),
PrioritizedPeers(usize),
}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -165,8 +182,11 @@ mod tests {
16
);
assert_eq!(mempool_config.max_broadcasts_per_peer, 20);
assert_eq!(mempool_config.default_failovers, 0);
assert_eq!(mempool_config.shared_mempool_batch_size, 300);
assert_eq!(
mempool_config.broadcast_peers_selector,
BroadcastPeersSelectorConfig::PrioritizedPeers(1)
);
assert_eq!(mempool_config.shared_mempool_tick_interval_ms, 10);
}

Expand Down Expand Up @@ -194,8 +214,8 @@ mod tests {
);
assert_eq!(mempool_config.max_broadcasts_per_peer, 2);
assert_eq!(
mempool_config.default_failovers,
default_mempool_config.default_failovers
mempool_config.broadcast_peers_selector,
default_mempool_config.broadcast_peers_selector
);
assert_eq!(mempool_config.shared_mempool_batch_size, 200);
assert_eq!(
Expand Down Expand Up @@ -248,6 +268,10 @@ mod tests {
mempool_config.max_broadcasts_per_peer,
local_max_broadcasts_per_peer
);
assert_ne!(
mempool_config.broadcast_peers_selector,
default_mempool_config.broadcast_peers_selector
);
assert_ne!(
mempool_config.shared_mempool_tick_interval_ms,
default_mempool_config.shared_mempool_tick_interval_ms
Expand Down
1 change: 1 addition & 0 deletions mempool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ fail = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
maplit = { workspace = true }
moka = { workspace = true }
once_cell = { workspace = true }
proptest = { workspace = true, optional = true }
rand = { workspace = true }
Expand Down
Loading

0 comments on commit 857f070

Please sign in to comment.