From 429f4ddf516b4c0b435d288adbc8d8129bf5e787 Mon Sep 17 00:00:00 2001 From: "Brian (Sunghoon) Cho" Date: Wed, 2 Aug 2023 14:50:59 -0700 Subject: [PATCH] [Mempool] optimize fullnode broadcast hops for latency (#9309) ### Description * Increase single fullnode max throughput from 4K TPS to 6K TPS (max batch size 200 -> 300, scheduled every 50 ms) * Increase throughput when broadcast RTT is large, by increasing the number of outstanding requests. E.g., previously an RTT of 500 ms with 2 outstanding requests meant only 2 requests could be made every 500 ms. ### Test Plan Run forge with PFNs and network emulation; observe that `Avg Insertion-to-Broadcast-Batched` on PFNs drops significantly, from 1-2 s for some PFNs to < 200 ms. --- config/src/config/mempool_config.rs | 63 +++++++++++++++------------ mempool/src/logging.rs | 2 + mempool/src/shared_mempool/network.rs | 3 +- mempool/src/shared_mempool/tasks.rs | 2 +- 4 files changed, 40 insertions(+), 30 deletions(-) diff --git a/config/src/config/mempool_config.rs b/config/src/config/mempool_config.rs index 0eacbd0bd23f8..0093dbbe58f8b 100644 --- a/config/src/config/mempool_config.rs +++ b/config/src/config/mempool_config.rs @@ -60,11 +60,11 @@ impl Default for MempoolConfig { MempoolConfig { shared_mempool_tick_interval_ms: 50, shared_mempool_backoff_interval_ms: 30_000, - shared_mempool_batch_size: 200, + shared_mempool_batch_size: 300, shared_mempool_max_batch_bytes: MAX_APPLICATION_MESSAGE_SIZE as u64, shared_mempool_ack_timeout_ms: 2_000, shared_mempool_max_concurrent_inbound_syncs: 4, - max_broadcasts_per_peer: 2, + max_broadcasts_per_peer: 20, max_network_channel_size: 1024, mempool_snapshot_interval_secs: 180, capacity: 2_000_000, @@ -103,6 +103,18 @@ impl ConfigOptimizer for MempoolConfig { // Change the default configs for VFNs let mut modified_config = false; + if node_type.is_validator() { + // Set the max_broadcasts_per_peer to 2 (default is 20) + if
local_mempool_config_yaml["max_broadcasts_per_peer"].is_null() { + mempool_config.max_broadcasts_per_peer = 2; + modified_config = true; + } + // Set the batch size per broadcast to 200 (default is 300) + if local_mempool_config_yaml["shared_mempool_batch_size"].is_null() { + mempool_config.shared_mempool_batch_size = 200; + modified_config = true; + } + } if node_type.is_validator_fullnode() { // Set the shared_mempool_max_concurrent_inbound_syncs to 16 (default is 4) if local_mempool_config_yaml["shared_mempool_max_concurrent_inbound_syncs"].is_null() { @@ -110,12 +122,6 @@ impl ConfigOptimizer for MempoolConfig { modified_config = true; } - // Set the max_broadcasts_per_peer to 4 (default is 2) - if local_mempool_config_yaml["max_broadcasts_per_peer"].is_null() { - mempool_config.max_broadcasts_per_peer = 4; - modified_config = true; - } - // Set the default_failovers to 0 (default is 1) if local_mempool_config_yaml["default_failovers"].is_null() { mempool_config.default_failovers = 0; @@ -158,9 +164,9 @@ mod tests { mempool_config.shared_mempool_max_concurrent_inbound_syncs, 16 ); - assert_eq!(mempool_config.max_broadcasts_per_peer, 4); + assert_eq!(mempool_config.max_broadcasts_per_peer, 20); assert_eq!(mempool_config.default_failovers, 0); - assert_eq!(mempool_config.shared_mempool_batch_size, 200); + assert_eq!(mempool_config.shared_mempool_batch_size, 300); assert_eq!(mempool_config.shared_mempool_tick_interval_ms, 10); } @@ -177,7 +183,7 @@ mod tests { ChainId::mainnet(), ) .unwrap(); - assert!(!modified_config); + assert!(modified_config); // Verify that all relevant fields are not modified let mempool_config = &node_config.mempool; @@ -186,18 +192,12 @@ mod tests { mempool_config.shared_mempool_max_concurrent_inbound_syncs, default_mempool_config.shared_mempool_max_concurrent_inbound_syncs ); - assert_eq!( - mempool_config.max_broadcasts_per_peer, - default_mempool_config.max_broadcasts_per_peer - ); + assert_eq!(mempool_config.max_broadcasts_per_peer, 
2); assert_eq!( mempool_config.default_failovers, default_mempool_config.default_failovers ); - assert_eq!( - mempool_config.shared_mempool_batch_size, - default_mempool_config.shared_mempool_batch_size - ); + assert_eq!(mempool_config.shared_mempool_batch_size, 200); assert_eq!( mempool_config.shared_mempool_tick_interval_ms, default_mempool_config.shared_mempool_tick_interval_ms @@ -207,16 +207,24 @@ mod tests { #[test] fn test_optimize_vfn_config_no_overrides() { // Create the default validator config + let local_shared_mempool_max_concurrent_inbound_syncs = 1; + let local_max_broadcasts_per_peer = 1; let mut node_config = NodeConfig::get_default_vfn_config(); + node_config + .mempool + .shared_mempool_max_concurrent_inbound_syncs = + local_shared_mempool_max_concurrent_inbound_syncs; + node_config.mempool.max_broadcasts_per_peer = local_max_broadcasts_per_peer; // Create a local config YAML with some local overrides - let local_config_yaml = serde_yaml::from_str( + let local_config_yaml = serde_yaml::from_str(&format!( r#" mempool: - shared_mempool_max_concurrent_inbound_syncs: 4 - max_broadcasts_per_peer: 1 + shared_mempool_max_concurrent_inbound_syncs: {} + max_broadcasts_per_peer: {} "#, - ) + local_shared_mempool_max_concurrent_inbound_syncs, local_max_broadcasts_per_peer + )) .unwrap(); // Optimize the config and verify modifications are made @@ -234,12 +242,11 @@ mod tests { let default_mempool_config = MempoolConfig::default(); assert_eq!( mempool_config.shared_mempool_max_concurrent_inbound_syncs, - 4 + local_shared_mempool_max_concurrent_inbound_syncs ); - assert_eq!(mempool_config.max_broadcasts_per_peer, 2); - assert_ne!( - mempool_config.default_failovers, - default_mempool_config.default_failovers + assert_eq!( + mempool_config.max_broadcasts_per_peer, + local_max_broadcasts_per_peer ); assert_ne!( mempool_config.shared_mempool_tick_interval_ms, diff --git a/mempool/src/logging.rs b/mempool/src/logging.rs index 219210e25340a..e3ac583798841 100644 
--- a/mempool/src/logging.rs +++ b/mempool/src/logging.rs @@ -116,6 +116,7 @@ pub struct LogSchema<'a> { #[schema(debug)] batch_id: Option<&'a MultiBatchId>, backpressure: Option<bool>, + num_txns: Option<usize>, } impl<'a> LogSchema<'a> { @@ -143,6 +144,7 @@ impl<'a> LogSchema<'a> { upstream_network: None, batch_id: None, backpressure: None, + num_txns: None, } } } diff --git a/mempool/src/shared_mempool/network.rs b/mempool/src/shared_mempool/network.rs index 5f39a5c199b2e..5869bcff63bfe 100644 --- a/mempool/src/shared_mempool/network.rs +++ b/mempool/src/shared_mempool/network.rs @@ -481,11 +481,12 @@ impl<NetworkClient: NetworkClientInterface<MempoolSyncMsg>> MempoolNetworkInterf // Log all the metrics let latency = start_time.elapsed(); - trace!( + debug!( LogSchema::event_log(LogEntry::BroadcastTransaction, LogEvent::Success) .peer(&peer) .batch_id(&batch_id) .backpressure(scheduled_backoff) + .num_txns(num_txns) ); let network_id = peer.network_id(); counters::shared_mempool_broadcast_size(network_id, num_txns); diff --git a/mempool/src/shared_mempool/tasks.rs b/mempool/src/shared_mempool/tasks.rs index ff8f9dfcb2589..5f856ffdf5bd2 100644 --- a/mempool/src/shared_mempool/tasks.rs +++ b/mempool/src/shared_mempool/tasks.rs @@ -70,7 +70,7 @@ pub(crate) async fn execute_broadcast( .peer(&peer) .error(&error)), _ => { - trace!("{:?}", err) + debug!("{:?}", err) }, } }