[WIP] mempool dissemination #8444

Closed · wants to merge 3 commits
141 changes: 138 additions & 3 deletions Cargo.lock


1 change: 1 addition & 0 deletions Cargo.toml
@@ -535,6 +535,7 @@ merlin = "3"
 mime = "0.3.16"
 mirai-annotations = "1.12.0"
 mockall = "0.11.4"
+moka = "0.11.2"
 more-asserts = "0.3.0"
 native-tls = "0.2.10"
 ntest = "0.9.0"
1 change: 0 additions & 1 deletion aptos-node/src/lib.rs
@@ -428,7 +428,6 @@ where
     node_config
         .mempool
         .shared_mempool_max_concurrent_inbound_syncs = 1;
-    node_config.mempool.default_failovers = 1;
    node_config.mempool.max_broadcasts_per_peer = 1;

    node_config
52 changes: 40 additions & 12 deletions config/src/config/mempool_config.rs
@@ -20,8 +20,6 @@ pub struct MempoolConfig {
     pub capacity_bytes: usize,
     /// Maximum number of transactions allowed in the Mempool per user
     pub capacity_per_user: usize,
-    /// Number of failover peers to broadcast to when the primary network is alive
-    pub default_failovers: usize,
     /// The maximum number of broadcasts sent to a single peer that are pending a response ACK at any point.
     pub max_broadcasts_per_peer: usize,
     /// Maximum number of inbound network messages to the Mempool application
@@ -53,6 +51,8 @@ pub struct MempoolConfig {
     pub broadcast_buckets: Vec<u64>,
     pub eager_expire_threshold_ms: Option<u64>,
     pub eager_expire_time_ms: u64,
+    pub peer_update_interval_ms: u64,
+    pub broadcast_peers_selector: BroadcastPeersSelectorConfig,
 }

 impl Default for MempoolConfig {
@@ -70,13 +70,14 @@ impl Default for MempoolConfig {
             capacity: 2_000_000,
             capacity_bytes: 2 * 1024 * 1024 * 1024,
             capacity_per_user: 100,
-            default_failovers: 1,
             shared_mempool_peer_update_interval_ms: 1_000,
             system_transaction_timeout_secs: 600,
             system_transaction_gc_interval_ms: 60_000,
             broadcast_buckets: DEFAULT_BUCKETS.to_vec(),
             eager_expire_threshold_ms: Some(10_000),
             eager_expire_time_ms: 3_000,
+            peer_update_interval_ms: 1_000,
+            broadcast_peers_selector: BroadcastPeersSelectorConfig::PrioritizedPeers(1),
         }
     }
 }
@@ -116,15 +117,16 @@ impl ConfigOptimizer for MempoolConfig {
             }
         }
         if node_type.is_validator_fullnode() {
-            // Set the shared_mempool_max_concurrent_inbound_syncs to 16 (default is 4)
-            if local_mempool_config_yaml["shared_mempool_max_concurrent_inbound_syncs"].is_null() {
-                mempool_config.shared_mempool_max_concurrent_inbound_syncs = 16;
+            // Set broadcast peers to prioritized, with a max of 1
+            if local_mempool_config_yaml["broadcast_peers_selector"].is_null() {
+                mempool_config.broadcast_peers_selector =
+                    BroadcastPeersSelectorConfig::PrioritizedPeers(1);
                 modified_config = true;
             }

-            // Set the default_failovers to 0 (default is 1)
-            if local_mempool_config_yaml["default_failovers"].is_null() {
-                mempool_config.default_failovers = 0;
+            // Set the shared_mempool_max_concurrent_inbound_syncs to 16 (default is 4)
+            if local_mempool_config_yaml["shared_mempool_max_concurrent_inbound_syncs"].is_null() {
+                mempool_config.shared_mempool_max_concurrent_inbound_syncs = 16;
                 modified_config = true;
             }

@@ -133,12 +135,35 @@
                 mempool_config.shared_mempool_tick_interval_ms = 10;
                 modified_config = true;
             }
+        } else if node_type.is_validator() {
+            // TODO: With quorum store, this isn't used. Used for testing, but should be removed.
+            if local_mempool_config_yaml["broadcast_peers_selector"].is_null() {
+                mempool_config.broadcast_peers_selector =
+                    BroadcastPeersSelectorConfig::PrioritizedPeers(1);
+                modified_config = true;
+            }
+        } else {
+            // The node is a PFN
+            // Set broadcast peers to fresh, with a max of 2
+            if local_mempool_config_yaml["broadcast_peers_selector"].is_null() {
+                mempool_config.broadcast_peers_selector =
+                    BroadcastPeersSelectorConfig::FreshPeers(2, 1000);
+                modified_config = true;
+            }
         }

         Ok(modified_config)
     }
 }

+#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)]
+#[serde(rename_all = "snake_case")]
+pub enum BroadcastPeersSelectorConfig {
+    /// num_peers_to_select, version_threshold
+    FreshPeers(usize, u64),
+    PrioritizedPeers(usize),
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -165,8 +190,11 @@
             16
         );
         assert_eq!(mempool_config.max_broadcasts_per_peer, 20);
-        assert_eq!(mempool_config.default_failovers, 0);
         assert_eq!(mempool_config.shared_mempool_batch_size, 300);
+        assert_eq!(
+            mempool_config.broadcast_peers_selector,
+            BroadcastPeersSelectorConfig::PrioritizedPeers(1)
+        );
         assert_eq!(mempool_config.shared_mempool_tick_interval_ms, 10);
     }

@@ -194,8 +222,8 @@
         );
         assert_eq!(mempool_config.max_broadcasts_per_peer, 2);
         assert_eq!(
-            mempool_config.default_failovers,
-            default_mempool_config.default_failovers
+            mempool_config.broadcast_peers_selector,
+            default_mempool_config.broadcast_peers_selector
         );
         assert_eq!(mempool_config.shared_mempool_batch_size, 200);
         assert_eq!(
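The code that consumes the new `BroadcastPeersSelectorConfig` is not part of the hunks shown above, so the following is only a hedged sketch of how the two variants read: `FreshPeers(num_peers, version_threshold)` limits broadcasts to peers whose reported ledger version is close to ours, while `PrioritizedPeers(num_peers)` takes the first N peers in the existing priority order. The `choose_broadcast_peers` helper, the placeholder `PeerId` type, and the re-declared enum are illustrative assumptions, not code from this PR.

```rust
// Hypothetical illustration only; none of this is taken from the PR.
type PeerId = u64; // placeholder for the real network peer identifier

// Mirrors the enum added in config/src/config/mempool_config.rs above.
pub enum BroadcastPeersSelectorConfig {
    /// num_peers_to_select, version_threshold
    FreshPeers(usize, u64),
    PrioritizedPeers(usize),
}

/// Pick peers to broadcast a mempool batch to, given peers already sorted by
/// priority and the ledger version each one last reported.
fn choose_broadcast_peers(
    selector: &BroadcastPeersSelectorConfig,
    peers_by_priority: &[(PeerId, u64)],
    local_version: u64,
) -> Vec<PeerId> {
    match selector {
        // Keep only peers lagging our version by at most `version_threshold`,
        // then take up to `num_peers` of them.
        BroadcastPeersSelectorConfig::FreshPeers(num_peers, version_threshold) => {
            peers_by_priority
                .iter()
                .filter(|(_, version)| local_version.saturating_sub(*version) <= *version_threshold)
                .take(*num_peers)
                .map(|(peer, _)| *peer)
                .collect()
        },
        // Roughly the previous behaviour: broadcast to the first `num_peers`
        // peers in priority order, regardless of how far behind they are.
        BroadcastPeersSelectorConfig::PrioritizedPeers(num_peers) => peers_by_priority
            .iter()
            .take(*num_peers)
            .map(|(peer, _)| *peer)
            .collect(),
    }
}

fn main() {
    let peers = vec![(1, 990), (2, 400), (3, 995)];
    // PFN default from the optimizer above: up to 2 fresh peers, threshold 1000.
    let fresh = choose_broadcast_peers(
        &BroadcastPeersSelectorConfig::FreshPeers(2, 1000),
        &peers,
        1_500,
    );
    // Peer 2 is more than 1000 versions behind and is skipped.
    assert_eq!(fresh, vec![1, 3]);
}
```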
1 change: 1 addition & 0 deletions mempool/Cargo.toml
@@ -39,6 +39,7 @@ fail = { workspace = true }
 futures = { workspace = true }
 itertools = { workspace = true }
 maplit = { workspace = true }
+moka = { workspace = true }
 once_cell = { workspace = true }
 proptest = { workspace = true, optional = true }
 rand = { workspace = true }
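The hunks in this section only add `moka` as a dependency; the call sites are not shown. As a reference point, here is a minimal, self-contained sketch of the moka 0.11 synchronous cache API. The use case (tracking the last ledger version reported per peer, in line with the `peer_update_interval_ms` setting added above) is an assumption, not code from this PR.

```rust
use std::time::Duration;

use moka::sync::Cache;

fn main() {
    // Bounded, TTL-based cache: keys are placeholder peer ids, values are the
    // last ledger version each peer reported. Entries expire on their own, so
    // peers that stop reporting simply fall out of the cache.
    let peer_versions: Cache<u64, u64> = Cache::builder()
        .max_capacity(10_000)
        .time_to_live(Duration::from_secs(60))
        .build();

    peer_versions.insert(7, 42_000);
    assert_eq!(peer_versions.get(&7), Some(42_000));
}
```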