Skip to content

Commit

Permalink
[Mempool] optimize fullnode broadcast hops for latency (#9309)
Browse files Browse the repository at this point in the history
### Description

* Increase single fullnode max throughput from 4K TPS to 6K TPS (max batch size 200 -> 300, scheduled every 50 ms)
* Increase throughput when broadcast RTT is large, by increasing the number of outstanding requests. E.g., previously an RTT of 500 ms with 2 outstanding requests, meant only 2 requests could be made every 500 ms.

### Test Plan

Run forge with PFNs and network emulation, observe that `Avg Insertion-to-Broadcast-Batched` on PFNs drops significantly, from 1-2 s for some PFNs to < 200 ms.
  • Loading branch information
bchocho authored Aug 2, 2023
1 parent 9ac90fb commit 429f4dd
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 30 deletions.
63 changes: 35 additions & 28 deletions config/src/config/mempool_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,11 @@ impl Default for MempoolConfig {
MempoolConfig {
shared_mempool_tick_interval_ms: 50,
shared_mempool_backoff_interval_ms: 30_000,
shared_mempool_batch_size: 200,
shared_mempool_batch_size: 300,
shared_mempool_max_batch_bytes: MAX_APPLICATION_MESSAGE_SIZE as u64,
shared_mempool_ack_timeout_ms: 2_000,
shared_mempool_max_concurrent_inbound_syncs: 4,
max_broadcasts_per_peer: 2,
max_broadcasts_per_peer: 20,
max_network_channel_size: 1024,
mempool_snapshot_interval_secs: 180,
capacity: 2_000_000,
Expand Down Expand Up @@ -103,19 +103,25 @@ impl ConfigOptimizer for MempoolConfig {

// Change the default configs for VFNs
let mut modified_config = false;
if node_type.is_validator() {
// Set the max_broadcasts_per_peer to 2 (default is 20)
if local_mempool_config_yaml["max_broadcasts_per_peer"].is_null() {
mempool_config.max_broadcasts_per_peer = 2;
modified_config = true;
}
// Set the batch size per broadcast to 200 (default is 300)
if local_mempool_config_yaml["shared_mempool_batch_size"].is_null() {
mempool_config.shared_mempool_batch_size = 200;
modified_config = true;
}
}
if node_type.is_validator_fullnode() {
// Set the shared_mempool_max_concurrent_inbound_syncs to 16 (default is 4)
if local_mempool_config_yaml["shared_mempool_max_concurrent_inbound_syncs"].is_null() {
mempool_config.shared_mempool_max_concurrent_inbound_syncs = 16;
modified_config = true;
}

// Set the max_broadcasts_per_peer to 4 (default is 2)
if local_mempool_config_yaml["max_broadcasts_per_peer"].is_null() {
mempool_config.max_broadcasts_per_peer = 4;
modified_config = true;
}

// Set the default_failovers to 0 (default is 1)
if local_mempool_config_yaml["default_failovers"].is_null() {
mempool_config.default_failovers = 0;
Expand Down Expand Up @@ -158,9 +164,9 @@ mod tests {
mempool_config.shared_mempool_max_concurrent_inbound_syncs,
16
);
assert_eq!(mempool_config.max_broadcasts_per_peer, 4);
assert_eq!(mempool_config.max_broadcasts_per_peer, 20);
assert_eq!(mempool_config.default_failovers, 0);
assert_eq!(mempool_config.shared_mempool_batch_size, 200);
assert_eq!(mempool_config.shared_mempool_batch_size, 300);
assert_eq!(mempool_config.shared_mempool_tick_interval_ms, 10);
}

Expand All @@ -177,7 +183,7 @@ mod tests {
ChainId::mainnet(),
)
.unwrap();
assert!(!modified_config);
assert!(modified_config);

// Verify that all relevant fields are not modified
let mempool_config = &node_config.mempool;
Expand All @@ -186,18 +192,12 @@ mod tests {
mempool_config.shared_mempool_max_concurrent_inbound_syncs,
default_mempool_config.shared_mempool_max_concurrent_inbound_syncs
);
assert_eq!(
mempool_config.max_broadcasts_per_peer,
default_mempool_config.max_broadcasts_per_peer
);
assert_eq!(mempool_config.max_broadcasts_per_peer, 2);
assert_eq!(
mempool_config.default_failovers,
default_mempool_config.default_failovers
);
assert_eq!(
mempool_config.shared_mempool_batch_size,
default_mempool_config.shared_mempool_batch_size
);
assert_eq!(mempool_config.shared_mempool_batch_size, 200);
assert_eq!(
mempool_config.shared_mempool_tick_interval_ms,
default_mempool_config.shared_mempool_tick_interval_ms
Expand All @@ -207,16 +207,24 @@ mod tests {
#[test]
fn test_optimize_vfn_config_no_overrides() {
// Create the default validator config
let local_shared_mempool_max_concurrent_inbound_syncs = 1;
let local_max_broadcasts_per_peer = 1;
let mut node_config = NodeConfig::get_default_vfn_config();
node_config
.mempool
.shared_mempool_max_concurrent_inbound_syncs =
local_shared_mempool_max_concurrent_inbound_syncs;
node_config.mempool.max_broadcasts_per_peer = local_max_broadcasts_per_peer;

// Create a local config YAML with some local overrides
let local_config_yaml = serde_yaml::from_str(
let local_config_yaml = serde_yaml::from_str(&format!(
r#"
mempool:
shared_mempool_max_concurrent_inbound_syncs: 4
max_broadcasts_per_peer: 1
shared_mempool_max_concurrent_inbound_syncs: {}
max_broadcasts_per_peer: {}
"#,
)
local_shared_mempool_max_concurrent_inbound_syncs, local_max_broadcasts_per_peer
))
.unwrap();

// Optimize the config and verify modifications are made
Expand All @@ -234,12 +242,11 @@ mod tests {
let default_mempool_config = MempoolConfig::default();
assert_eq!(
mempool_config.shared_mempool_max_concurrent_inbound_syncs,
4
local_shared_mempool_max_concurrent_inbound_syncs
);
assert_eq!(mempool_config.max_broadcasts_per_peer, 2);
assert_ne!(
mempool_config.default_failovers,
default_mempool_config.default_failovers
assert_eq!(
mempool_config.max_broadcasts_per_peer,
local_max_broadcasts_per_peer
);
assert_ne!(
mempool_config.shared_mempool_tick_interval_ms,
Expand Down
2 changes: 2 additions & 0 deletions mempool/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ pub struct LogSchema<'a> {
#[schema(debug)]
batch_id: Option<&'a MultiBatchId>,
backpressure: Option<bool>,
num_txns: Option<usize>,
}

impl<'a> LogSchema<'a> {
Expand Down Expand Up @@ -143,6 +144,7 @@ impl<'a> LogSchema<'a> {
upstream_network: None,
batch_id: None,
backpressure: None,
num_txns: None,
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion mempool/src/shared_mempool/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -481,11 +481,12 @@ impl<NetworkClient: NetworkClientInterface<MempoolSyncMsg>> MempoolNetworkInterf

// Log all the metrics
let latency = start_time.elapsed();
trace!(
debug!(
LogSchema::event_log(LogEntry::BroadcastTransaction, LogEvent::Success)
.peer(&peer)
.batch_id(&batch_id)
.backpressure(scheduled_backoff)
.num_txns(num_txns)
);
let network_id = peer.network_id();
counters::shared_mempool_broadcast_size(network_id, num_txns);
Expand Down
2 changes: 1 addition & 1 deletion mempool/src/shared_mempool/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ pub(crate) async fn execute_broadcast<NetworkClient, TransactionValidator>(
.peer(&peer)
.error(&error)),
_ => {
trace!("{:?}", err)
debug!("{:?}", err)
},
}
}
Expand Down

0 comments on commit 429f4dd

Please sign in to comment.