From 8e26ae7a53d3958dc17523988534a492717fa9b4 Mon Sep 17 00:00:00 2001 From: Zekun Li Date: Mon, 17 Jun 2024 16:48:52 -0700 Subject: [PATCH] [network] make parallel deserialization in-order --- aptos-node/src/network.rs | 32 ++++---- consensus/src/dag/tests/integration_tests.rs | 2 +- consensus/src/network_tests.rs | 7 +- consensus/src/round_manager_test.rs | 2 +- consensus/src/twins/twins_node.rs | 2 +- mempool/src/tests/mocks.rs | 2 +- mempool/src/tests/node.rs | 2 +- mempool/src/tests/test_framework.rs | 8 +- network/builder/src/builder.rs | 5 ++ network/builder/src/dummy.rs | 4 +- network/framework/src/application/tests.rs | 8 +- .../framework/src/protocols/network/mod.rs | 79 +++++++------------ peer-monitoring-service/server/src/tests.rs | 1 + .../storage-service/server/src/tests/mock.rs | 1 + 14 files changed, 73 insertions(+), 82 deletions(-) diff --git a/aptos-node/src/network.rs b/aptos-node/src/network.rs index b3040a19a48ec4..6c2289e225ad87 100644 --- a/aptos-node/src/network.rs +++ b/aptos-node/src/network.rs @@ -302,7 +302,7 @@ pub fn setup_networks_and_get_interfaces( network_id, &network_config, consensus_network_configuration(node_config), - false, + true, ); consensus_network_handle = Some(network_handle); } @@ -315,7 +315,7 @@ pub fn setup_networks_and_get_interfaces( network_id, &network_config, dkg_network_configuration(node_config), - false, + true, ); dkg_network_handle = Some(network_handle); } @@ -328,7 +328,7 @@ pub fn setup_networks_and_get_interfaces( network_id, &network_config, jwk_consensus_network_configuration(node_config), - false, + true, ); jwk_consensus_network_handle = Some(network_handle); } @@ -345,7 +345,7 @@ pub fn setup_networks_and_get_interfaces( network_id, &network_config, consensus_observer_network_configuration(node_config), - true, + false, ); // Add the network handle to the set of handles @@ -364,7 +364,7 @@ pub fn setup_networks_and_get_interfaces( network_id, &network_config, mempool_network_configuration(node_config), - false, + true, ); mempool_network_handles.push(mempool_network_handle); @@ -374,7 +374,7 @@ pub fn setup_networks_and_get_interfaces( network_id, &network_config, peer_monitoring_network_configuration(node_config), - false, + true, ); peer_monitoring_service_network_handles.push(peer_monitoring_service_network_handle); @@ -384,7 +384,7 @@ pub fn setup_networks_and_get_interfaces( network_id, &network_config, storage_service_network_configuration(node_config), - false, + true, ); storage_service_network_handles.push(storage_service_network_handle); @@ -395,7 +395,7 @@ pub fn setup_networks_and_get_interfaces( network_id, &network_config, app_config, - false, + true, ); netbench_handles.push(netbench_handle); } @@ -471,21 +471,19 @@ fn create_network_runtime(network_config: &NetworkConfig) -> Runtime { /// Registers a new application client and service with the network fn register_client_and_service_with_network< - T: Serialize + for<'de> Deserialize<'de> + Send + 'static, + T: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static, >( network_builder: &mut NetworkBuilder, network_id: NetworkId, network_config: &NetworkConfig, application_config: NetworkApplicationConfig, - no_parallel: bool, + allow_out_of_order_delivery: bool, ) -> ApplicationNetworkHandle { - let max_parallel_deserialization_tasks = if no_parallel { - None - } else { - network_config.max_parallel_deserialization_tasks - }; - let (network_sender, network_events) = network_builder - .add_client_and_service(&application_config, max_parallel_deserialization_tasks); + let (network_sender, network_events) = network_builder.add_client_and_service( + &application_config, + network_config.max_parallel_deserialization_tasks, + allow_out_of_order_delivery, + ); ApplicationNetworkHandle { network_id, network_sender, diff --git a/consensus/src/dag/tests/integration_tests.rs b/consensus/src/dag/tests/integration_tests.rs index 8b1fae9117b0a0..08d680d6016a42 100644 --- a/consensus/src/dag/tests/integration_tests.rs +++ b/consensus/src/dag/tests/integration_tests.rs @@ -159,7 +159,7 @@ fn create_network( playground.peer_protocols(), ); let consensus_network_client = ConsensusNetworkClient::new(network_client); - let network_events = NetworkEvents::new(consensus_rx, conn_status_rx, None); + let network_events = NetworkEvents::new(consensus_rx, conn_status_rx, None, true); let (self_sender, self_receiver) = aptos_channels::new_unbounded_test(); let network = NetworkSender::new(author, consensus_network_client, self_sender, validators); diff --git a/consensus/src/network_tests.rs b/consensus/src/network_tests.rs index a709940d8f4a9a..6f7400bd3c4d92 100644 --- a/consensus/src/network_tests.rs +++ b/consensus/src/network_tests.rs @@ -619,7 +619,7 @@ mod tests { validator_verifier.clone(), ); - let network_events = NetworkEvents::new(consensus_rx, conn_status_rx, None); + let network_events = NetworkEvents::new(consensus_rx, conn_status_rx, None, true); let network_service_events = NetworkServiceEvents::new(hashmap! {NetworkId::Validator => network_events}); let (task, receiver) = NetworkTask::new(network_service_events, self_receiver); @@ -736,7 +736,7 @@ mod tests { validator_verifier.clone(), ); - let network_events = NetworkEvents::new(consensus_rx, conn_status_rx, None); + let network_events = NetworkEvents::new(consensus_rx, conn_status_rx, None, true); let network_service_events = NetworkServiceEvents::new(hashmap! {NetworkId::Validator => network_events}); let (task, receiver) = NetworkTask::new(network_service_events, self_receiver); @@ -808,7 +808,8 @@ mod tests { aptos_channel::new(QueueStyle::FIFO, 8, None); let (connection_notifs_tx, connection_notifs_rx) = aptos_channel::new(QueueStyle::FIFO, 8, None); - let network_events = NetworkEvents::new(peer_mgr_notifs_rx, connection_notifs_rx, None); + let network_events = + NetworkEvents::new(peer_mgr_notifs_rx, connection_notifs_rx, None, true); let network_service_events = NetworkServiceEvents::new(hashmap! {NetworkId::Validator => network_events}); let (self_sender, self_receiver) = aptos_channels::new_unbounded_test(); diff --git a/consensus/src/round_manager_test.rs b/consensus/src/round_manager_test.rs index 0d571c195efc7b..2c4272dfb9f117 100644 --- a/consensus/src/round_manager_test.rs +++ b/consensus/src/round_manager_test.rs @@ -258,7 +258,7 @@ impl NodeSetup { playground.peer_protocols(), ); let consensus_network_client = ConsensusNetworkClient::new(network_client); - let network_events = NetworkEvents::new(consensus_rx, conn_status_rx, None); + let network_events = NetworkEvents::new(consensus_rx, conn_status_rx, None, true); let author = signer.author(); let twin_id = TwinId { id, author }; diff --git a/consensus/src/twins/twins_node.rs b/consensus/src/twins/twins_node.rs index de6ccff30acdda..51f22d7a66d139 100644 --- a/consensus/src/twins/twins_node.rs +++ b/consensus/src/twins/twins_node.rs @@ -99,7 +99,7 @@ impl SMRNode { playground.peer_protocols(), ); let consensus_network_client = ConsensusNetworkClient::new(network_client); - let network_events = NetworkEvents::new(consensus_rx, conn_notifs_channel, None); + let network_events = NetworkEvents::new(consensus_rx, conn_notifs_channel, None, true); let network_service_events = NetworkServiceEvents::new(hashmap! {NetworkId::Validator => network_events}); diff --git a/mempool/src/tests/mocks.rs b/mempool/src/tests/mocks.rs index b8d6fb315dc68b..d9ecc65be22ba8 100644 --- a/mempool/src/tests/mocks.rs +++ b/mempool/src/tests/mocks.rs @@ -126,7 +126,7 @@ impl MockSharedMempool { PeerManagerRequestSender::new(network_reqs_tx), ConnectionRequestSender::new(connection_reqs_tx), ); - let network_events = NetworkEvents::new(network_notifs_rx, conn_notifs_rx, None); + let network_events = NetworkEvents::new(network_notifs_rx, conn_notifs_rx, None, true); let (ac_client, client_events) = mpsc::channel(1_024); let (quorum_store_sender, quorum_store_receiver) = mpsc::channel(1_024); let (mempool_notifier, mempool_listener) = diff --git a/mempool/src/tests/node.rs b/mempool/src/tests/node.rs index c5bb9e5e44bcf1..eb18b372aaa91b 100644 --- a/mempool/src/tests/node.rs +++ b/mempool/src/tests/node.rs @@ -576,7 +576,7 @@ fn setup_node_network_interface( PeerManagerRequestSender::new(network_reqs_tx), ConnectionRequestSender::new(connection_reqs_tx), ); - let network_events = NetworkEvents::new(network_notifs_rx, conn_status_rx, None); + let network_events = NetworkEvents::new(network_notifs_rx, conn_status_rx, None, true); ( NodeNetworkInterface { diff --git a/mempool/src/tests/test_framework.rs b/mempool/src/tests/test_framework.rs index 5e33be87edef20..456b7681846ad7 100644 --- a/mempool/src/tests/test_framework.rs +++ b/mempool/src/tests/test_framework.rs @@ -547,8 +547,12 @@ fn setup_network( PeerManagerRequestSender::new(reqs_outbound_sender), ConnectionRequestSender::new(connection_outbound_sender), ); - let network_events = - NetworkEvents::new(reqs_inbound_receiver, connection_inbound_receiver, None); + let network_events = NetworkEvents::new( + reqs_inbound_receiver, + connection_inbound_receiver, + None, + true, + ); ( network_sender, diff --git a/network/builder/src/builder.rs b/network/builder/src/builder.rs index ecdd8a9ab83967..4a9ebafe50d08e 100644 --- a/network/builder/src/builder.rs +++ b/network/builder/src/builder.rs @@ -412,6 +412,7 @@ impl NetworkBuilder { let (hc_network_tx, hc_network_rx) = self.add_client_and_service( &health_checker::health_checker_network_config(), max_parallel_deserialization_tasks, + true, ); self.health_checker_builder = Some(HealthCheckerBuilder::new( self.network_context(), @@ -437,12 +438,14 @@ impl NetworkBuilder { &mut self, config: &NetworkApplicationConfig, max_parallel_deserialization_tasks: Option, + allow_out_of_order_delivery: bool, ) -> (SenderT, EventsT) { ( self.add_client(&config.network_client_config), self.add_service( &config.network_service_config, max_parallel_deserialization_tasks, + allow_out_of_order_delivery, ), ) } @@ -461,6 +464,7 @@ impl NetworkBuilder { &mut self, config: &NetworkServiceConfig, max_parallel_deserialization_tasks: Option, + allow_out_of_order_delivery: bool, ) -> EventsT { let (peer_mgr_reqs_rx, connection_notifs_rx) = self.peer_manager_builder.add_service(config); @@ -468,6 +472,7 @@ impl NetworkBuilder { peer_mgr_reqs_rx, connection_notifs_rx, max_parallel_deserialization_tasks, + allow_out_of_order_delivery, ) } } diff --git a/network/builder/src/dummy.rs b/network/builder/src/dummy.rs index bf6e5bd66c0fad..a1d94f944c3758 100644 --- a/network/builder/src/dummy.rs +++ b/network/builder/src/dummy.rs @@ -111,7 +111,7 @@ pub fn setup_network() -> DummyNetwork { ); let (listener_sender, mut listener_events) = network_builder - .add_client_and_service::<_, DummyNetworkEvents>(&dummy_network_config(), None); + .add_client_and_service::<_, DummyNetworkEvents>(&dummy_network_config(), None, true); network_builder.build(runtime.handle().clone()).start(); let listener_network_client = NetworkClient::new( vec![TEST_DIRECT_SEND_PROTOCOL], @@ -144,7 +144,7 @@ pub fn setup_network() -> DummyNetwork { ); let (dialer_sender, mut dialer_events) = network_builder - .add_client_and_service::<_, DummyNetworkEvents>(&dummy_network_config(), None); + .add_client_and_service::<_, DummyNetworkEvents>(&dummy_network_config(), None, true); network_builder.build(runtime.handle().clone()).start(); let dialer_network_client = NetworkClient::new( vec![TEST_DIRECT_SEND_PROTOCOL], diff --git a/network/framework/src/application/tests.rs b/network/framework/src/application/tests.rs index 3355a737f732e9..b59b80e0c7ed29 100644 --- a/network/framework/src/application/tests.rs +++ b/network/framework/src/application/tests.rs @@ -926,8 +926,12 @@ fn create_network_sender_and_events( PeerManagerRequestSender::new(outbound_request_sender), ConnectionRequestSender::new(connection_outbound_sender), ); - let network_events = - NetworkEvents::new(inbound_request_receiver, connection_inbound_receiver, None); + let network_events = NetworkEvents::new( + inbound_request_receiver, + connection_inbound_receiver, + None, + true, + ); // Save the sender, events and receivers network_senders.insert(*network_id, network_sender); diff --git a/network/framework/src/protocols/network/mod.rs b/network/framework/src/protocols/network/mod.rs index bb707ec19e2302..dcc7cf5b158477 100644 --- a/network/framework/src/protocols/network/mod.rs +++ b/network/framework/src/protocols/network/mod.rs @@ -14,7 +14,7 @@ use crate::{ transport::ConnectionMetadata, ProtocolId, }; -use aptos_channels::{aptos_channel, message_queues::QueueStyle}; +use aptos_channels::aptos_channel; use aptos_logger::prelude::*; use aptos_short_hex_str::AsShortHexStr; use aptos_types::{network_address::NetworkAddress, PeerId}; @@ -24,17 +24,13 @@ use futures::{ stream::{FusedStream, Map, Select, Stream, StreamExt}, task::{Context, Poll}, }; -use futures_util::FutureExt; use pin_project::pin_project; use serde::{de::DeserializeOwned, Serialize}; -use std::{cmp::min, fmt::Debug, marker::PhantomData, pin::Pin, time::Duration}; +use std::{cmp::min, fmt::Debug, future, marker::PhantomData, pin::Pin, time::Duration}; pub trait Message: DeserializeOwned + Serialize {} impl Message for T {} -// TODO: do we want to make this configurable? -const MAX_DESERIALIZATION_QUEUE_SIZE_PER_PEER: usize = 50; - /// Events received by network clients in a validator /// /// An enumeration of the various types of messages that the network will be sending @@ -157,7 +153,7 @@ impl NetworkApplicationConfig { pub struct NetworkEvents { #[pin] event_stream: Select< - aptos_channel::Receiver>, + Pin> + Send + Sync + 'static>>, Map< aptos_channel::Receiver, fn(ConnectionNotification) -> Event, @@ -172,65 +168,46 @@ pub trait NewNetworkEvents { peer_mgr_notifs_rx: aptos_channel::Receiver<(PeerId, ProtocolId), PeerManagerNotification>, connection_notifs_rx: aptos_channel::Receiver, max_parallel_deserialization_tasks: Option, + allow_out_of_order_delivery: bool, ) -> Self; } -impl NewNetworkEvents for NetworkEvents { +impl NewNetworkEvents for NetworkEvents { fn new( peer_mgr_notifs_rx: aptos_channel::Receiver<(PeerId, ProtocolId), PeerManagerNotification>, connection_notifs_rx: aptos_channel::Receiver, max_parallel_deserialization_tasks: Option, + allow_out_of_order_delivery: bool, ) -> Self { - // Create a channel for deserialized messages - let (deserialized_message_sender, deserialized_message_receiver) = aptos_channel::new( - QueueStyle::FIFO, - MAX_DESERIALIZATION_QUEUE_SIZE_PER_PEER, - None, - ); - - // Deserialize the peer manager notifications in parallel (for each - // network application) and send them to the receiver. Note: this - // may cause out of order message delivery, but applications - // should already be handling this. - tokio::spawn(async move { - peer_mgr_notifs_rx - .for_each_concurrent( - max_parallel_deserialization_tasks, - move |peer_manager_notification| { - // Get the peer ID for the notification - let deserialized_message_sender = deserialized_message_sender.clone(); - let peer_id_for_notification = peer_manager_notification.get_peer_id(); - - // Spawn a new blocking task to deserialize the message - tokio::task::spawn_blocking(move || { - if let Some(deserialized_message) = - peer_mgr_notif_to_event(peer_manager_notification) - { - if let Err(error) = deserialized_message_sender - .push(peer_id_for_notification, deserialized_message) - { - warn!( - "Failed to send deserialized message to receiver: {:?}", - error - ); - } - } - }) - .map(|_| ()) - }, - ) - .await + // Determine the number of parallel deserialization tasks to use + let max_parallel_deserialization_tasks = max_parallel_deserialization_tasks.unwrap_or(1); + + let data_event_stream = peer_mgr_notifs_rx.map(|notification| { + tokio::task::spawn_blocking(move || peer_mgr_notif_to_event(notification)) }); + let data_event_stream: Pin< + Box> + Send + Sync + 'static>, + > = if allow_out_of_order_delivery { + Box::pin( + data_event_stream + .buffer_unordered(max_parallel_deserialization_tasks) + .filter_map(|res| future::ready(res.expect("JoinError from spawn blocking"))), + ) + } else { + Box::pin( + data_event_stream + .buffered(max_parallel_deserialization_tasks) + .filter_map(|res| future::ready(res.expect("JoinError from spawn blocking"))), + ) + }; + // Process the control messages let control_event_stream = connection_notifs_rx .map(control_msg_to_event as fn(ConnectionNotification) -> Event); Self { - event_stream: ::futures::stream::select( - deserialized_message_receiver, - control_event_stream, - ), + event_stream: ::futures::stream::select(data_event_stream, control_event_stream), _marker: PhantomData, } } diff --git a/peer-monitoring-service/server/src/tests.rs b/peer-monitoring-service/server/src/tests.rs index 544f5958a95351..a43967c7333218 100644 --- a/peer-monitoring-service/server/src/tests.rs +++ b/peer-monitoring-service/server/src/tests.rs @@ -511,6 +511,7 @@ impl MockClient { peer_manager_notification_receiver, connection_notification_receiver, None, + true, ); network_and_events.insert(network_id, network_events); peer_manager_notifiers.insert(network_id, peer_manager_notifier); diff --git a/state-sync/storage-service/server/src/tests/mock.rs b/state-sync/storage-service/server/src/tests/mock.rs index 8799dbfac27efe..ec3a4b73c800d1 100644 --- a/state-sync/storage-service/server/src/tests/mock.rs +++ b/state-sync/storage-service/server/src/tests/mock.rs @@ -103,6 +103,7 @@ impl MockClient { peer_manager_notification_receiver, connection_notification_receiver, None, + true, ); network_and_events.insert(network_id, network_events); peer_manager_notifiers.insert(network_id, peer_manager_notifier);