[Mempool] optimize fullnode broadcast hops for latency #9309

Merged 4 commits on Aug 2, 2023
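For intuition on the latency goal in the title: `max_broadcasts_per_peer` bounds how many un-acked broadcast batches a node may have in flight to a single upstream peer, so together with `shared_mempool_batch_size` it caps the burst of transactions a fullnode can forward before waiting on a round trip. A rough back-of-the-envelope sketch using the old and new defaults from this diff (assuming each broadcast slot carries one full batch):

```rust
fn main() {
    // Old defaults: at most 2 un-acked broadcasts x 200 txns per batch.
    let old_cap = 2 * 200;
    // New defaults: at most 20 un-acked broadcasts x 300 txns per batch.
    let new_cap = 20 * 300;
    println!("old in-flight cap: {old_cap} txns per peer"); // 400
    println!("new in-flight cap: {new_cap} txns per peer"); // 6000
}
```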
63 changes: 35 additions & 28 deletions config/src/config/mempool_config.rs
@@ -60,11 +60,11 @@ impl Default for MempoolConfig {
         MempoolConfig {
             shared_mempool_tick_interval_ms: 50,
             shared_mempool_backoff_interval_ms: 30_000,
-            shared_mempool_batch_size: 200,
+            shared_mempool_batch_size: 300,
             shared_mempool_max_batch_bytes: MAX_APPLICATION_MESSAGE_SIZE as u64,
             shared_mempool_ack_timeout_ms: 2_000,
             shared_mempool_max_concurrent_inbound_syncs: 4,
-            max_broadcasts_per_peer: 2,
+            max_broadcasts_per_peer: 20,
             max_network_channel_size: 1024,
             mempool_snapshot_interval_secs: 180,
             capacity: 2_000_000,
@@ -103,19 +103,25 @@ impl ConfigOptimizer for MempoolConfig {

         // Change the default configs for VFNs
         let mut modified_config = false;
+        if node_type.is_validator() {
+            // Set the max_broadcasts_per_peer to 2 (default is 20)
+            if local_mempool_config_yaml["max_broadcasts_per_peer"].is_null() {

Contributor: Can you explain why we need to override this for the validator?

Author: This isn't used anymore because there is no mempool broadcast between validators with Quorum Store. For the time being, I want to preserve behavior as much as possible just in case we roll back Quorum Store.

Contributor: Can you add a comment to clarify this?

+                mempool_config.max_broadcasts_per_peer = 2;
+                modified_config = true;
+            }
+            // Set the batch size per broadcast to 200 (default is 300)
+            if local_mempool_config_yaml["shared_mempool_batch_size"].is_null() {
+                mempool_config.shared_mempool_batch_size = 200;
+                modified_config = true;
+            }
+        }
         if node_type.is_validator_fullnode() {
             // Set the shared_mempool_max_concurrent_inbound_syncs to 16 (default is 4)
             if local_mempool_config_yaml["shared_mempool_max_concurrent_inbound_syncs"].is_null() {
                 mempool_config.shared_mempool_max_concurrent_inbound_syncs = 16;
                 modified_config = true;
             }

-            // Set the max_broadcasts_per_peer to 4 (default is 2)
-            if local_mempool_config_yaml["max_broadcasts_per_peer"].is_null() {
-                mempool_config.max_broadcasts_per_peer = 4;
-                modified_config = true;
-            }
-
             // Set the default_failovers to 0 (default is 1)
             if local_mempool_config_yaml["default_failovers"].is_null() {
                 mempool_config.default_failovers = 0;
@@ -158,9 +164,9 @@ mod tests {
             mempool_config.shared_mempool_max_concurrent_inbound_syncs,
             16
         );
-        assert_eq!(mempool_config.max_broadcasts_per_peer, 4);
+        assert_eq!(mempool_config.max_broadcasts_per_peer, 20);
         assert_eq!(mempool_config.default_failovers, 0);
-        assert_eq!(mempool_config.shared_mempool_batch_size, 200);
+        assert_eq!(mempool_config.shared_mempool_batch_size, 300);
         assert_eq!(mempool_config.shared_mempool_tick_interval_ms, 10);
     }

@@ -177,7 +183,7 @@
             ChainId::mainnet(),
         )
         .unwrap();
-        assert!(!modified_config);
+        assert!(modified_config);

         // Verify that all relevant fields are not modified
         let mempool_config = &node_config.mempool;
@@ -186,18 +192,12 @@
             mempool_config.shared_mempool_max_concurrent_inbound_syncs,
             default_mempool_config.shared_mempool_max_concurrent_inbound_syncs
         );
-        assert_eq!(
-            mempool_config.max_broadcasts_per_peer,
-            default_mempool_config.max_broadcasts_per_peer
-        );
+        assert_eq!(mempool_config.max_broadcasts_per_peer, 2);
         assert_eq!(
             mempool_config.default_failovers,
             default_mempool_config.default_failovers
         );
-        assert_eq!(
-            mempool_config.shared_mempool_batch_size,
-            default_mempool_config.shared_mempool_batch_size
-        );
+        assert_eq!(mempool_config.shared_mempool_batch_size, 200);
         assert_eq!(
             mempool_config.shared_mempool_tick_interval_ms,
             default_mempool_config.shared_mempool_tick_interval_ms
@@ -207,16 +207,24 @@
     #[test]
     fn test_optimize_vfn_config_no_overrides() {
         // Create the default validator config
+        let local_shared_mempool_max_concurrent_inbound_syncs = 1;
+        let local_max_broadcasts_per_peer = 1;
         let mut node_config = NodeConfig::get_default_vfn_config();
+        node_config
+            .mempool
+            .shared_mempool_max_concurrent_inbound_syncs =
+            local_shared_mempool_max_concurrent_inbound_syncs;
+        node_config.mempool.max_broadcasts_per_peer = local_max_broadcasts_per_peer;

         // Create a local config YAML with some local overrides
-        let local_config_yaml = serde_yaml::from_str(
+        let local_config_yaml = serde_yaml::from_str(&format!(
             r#"
             mempool:
-                shared_mempool_max_concurrent_inbound_syncs: 4
-                max_broadcasts_per_peer: 1
+                shared_mempool_max_concurrent_inbound_syncs: {}
+                max_broadcasts_per_peer: {}
             "#,
-        )
+            local_shared_mempool_max_concurrent_inbound_syncs, local_max_broadcasts_per_peer
+        ))
         .unwrap();

         // Optimize the config and verify modifications are made
@@ -234,12 +242,11 @@
         let default_mempool_config = MempoolConfig::default();
         assert_eq!(
             mempool_config.shared_mempool_max_concurrent_inbound_syncs,
-            4
+            local_shared_mempool_max_concurrent_inbound_syncs
         );
-        assert_eq!(mempool_config.max_broadcasts_per_peer, 2);
-        assert_ne!(
-            mempool_config.default_failovers,
-            default_mempool_config.default_failovers
+        assert_eq!(
+            mempool_config.max_broadcasts_per_peer,
+            local_max_broadcasts_per_peer
         );
         assert_ne!(
             mempool_config.shared_mempool_tick_interval_ms,
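The tests above exercise the optimizer's null-check pattern: an optimized default is applied only when the operator has not pinned the field in their local YAML. A minimal standalone sketch of that pattern using serde_yaml (the struct and function here are simplified stand-ins for the real aptos-config types, not the actual implementation):

```rust
use serde_yaml::Value;

// Illustrative stand-in for the real MempoolConfig.
#[derive(Debug)]
struct MempoolConfig {
    max_broadcasts_per_peer: usize,
}

// Apply an optimized default only if the local YAML leaves the field unset.
fn optimize(config: &mut MempoolConfig, local_yaml: &Value) -> bool {
    let mut modified = false;
    if local_yaml["mempool"]["max_broadcasts_per_peer"].is_null() {
        config.max_broadcasts_per_peer = 20; // optimized default
        modified = true;
    }
    modified
}

fn main() {
    // Operator pinned nothing: the optimizer is free to override.
    let mut config = MempoolConfig { max_broadcasts_per_peer: 2 };
    let empty: Value = serde_yaml::from_str("mempool:\n  other_field: 1").unwrap();
    assert!(optimize(&mut config, &empty));
    assert_eq!(config.max_broadcasts_per_peer, 20);

    // Operator pinned the field: the optimizer must leave it alone.
    let mut config = MempoolConfig { max_broadcasts_per_peer: 2 };
    let pinned: Value = serde_yaml::from_str("mempool:\n  max_broadcasts_per_peer: 1").unwrap();
    assert!(!optimize(&mut config, &pinned));
    assert_eq!(config.max_broadcasts_per_peer, 2);
}
```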
2 changes: 2 additions & 0 deletions mempool/src/logging.rs
@@ -116,6 +116,7 @@ pub struct LogSchema<'a> {
     #[schema(debug)]
     batch_id: Option<&'a MultiBatchId>,
     backpressure: Option<bool>,
+    num_txns: Option<usize>,
 }

 impl<'a> LogSchema<'a> {
@@ -143,6 +144,7 @@ impl<'a> LogSchema<'a> {
             upstream_network: None,
             batch_id: None,
             backpressure: None,
+            num_txns: None,
         }
     }
 }
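The Schema derive on LogSchema generates a builder-style setter per field, which is how the network change below can chain `.num_txns(num_txns)` onto the log entry. A self-contained sketch of that pattern (the struct here is a simplified stand-in, not the real aptos-logger type, and the derive's actual output may differ):

```rust
// Illustrative stand-in for the macro-generated builder setter; the real
// LogSchema and its setters come from the aptos-logger Schema derive.
#[derive(Debug, Default)]
struct LogSchema {
    num_txns: Option<usize>,
}

impl LogSchema {
    // What the derive is assumed to generate for the new field: a
    // consuming setter that enables call chaining like `.num_txns(n)`.
    fn num_txns(mut self, num_txns: usize) -> Self {
        self.num_txns = Some(num_txns);
        self
    }
}

fn main() {
    let schema = LogSchema::default().num_txns(300);
    println!("{schema:?}");
}
```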
3 changes: 2 additions & 1 deletion mempool/src/shared_mempool/network.rs
@@ -481,11 +481,12 @@ impl<NetworkClient: NetworkClientInterface<MempoolSyncMsg>> MempoolNetworkInterf

         // Log all the metrics
         let latency = start_time.elapsed();
-        trace!(
+        debug!(
             LogSchema::event_log(LogEntry::BroadcastTransaction, LogEvent::Success)
                 .peer(&peer)
                 .batch_id(&batch_id)
                 .backpressure(scheduled_backoff)
+                .num_txns(num_txns)
         );
         let network_id = peer.network_id();
         counters::shared_mempool_broadcast_size(network_id, num_txns);
2 changes: 1 addition & 1 deletion mempool/src/shared_mempool/tasks.rs
@@ -70,7 +70,7 @@ pub(crate) async fn execute_broadcast<NetworkClient, TransactionValidator>(
                 .peer(&peer)
                 .error(&error)),
             _ => {
-                trace!("{:?}", err)
+                debug!("{:?}", err)
             },
         }
     }