Skip to content

Commit

Permalink
[network] make parallel deserialization in-order
Browse files Browse the repository at this point in the history
  • Loading branch information
Zekun Li authored and Zekun Li committed Jun 18, 2024
1 parent 2823f2e commit 8e26ae7
Show file tree
Hide file tree
Showing 14 changed files with 73 additions and 82 deletions.
32 changes: 15 additions & 17 deletions aptos-node/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ pub fn setup_networks_and_get_interfaces(
network_id,
&network_config,
consensus_network_configuration(node_config),
false,
true,
);
consensus_network_handle = Some(network_handle);
}
Expand All @@ -315,7 +315,7 @@ pub fn setup_networks_and_get_interfaces(
network_id,
&network_config,
dkg_network_configuration(node_config),
false,
true,
);
dkg_network_handle = Some(network_handle);
}
Expand All @@ -328,7 +328,7 @@ pub fn setup_networks_and_get_interfaces(
network_id,
&network_config,
jwk_consensus_network_configuration(node_config),
false,
true,
);
jwk_consensus_network_handle = Some(network_handle);
}
Expand All @@ -345,7 +345,7 @@ pub fn setup_networks_and_get_interfaces(
network_id,
&network_config,
consensus_observer_network_configuration(node_config),
true,
false,
);

// Add the network handle to the set of handles
Expand All @@ -364,7 +364,7 @@ pub fn setup_networks_and_get_interfaces(
network_id,
&network_config,
mempool_network_configuration(node_config),
false,
true,
);
mempool_network_handles.push(mempool_network_handle);

Expand All @@ -374,7 +374,7 @@ pub fn setup_networks_and_get_interfaces(
network_id,
&network_config,
peer_monitoring_network_configuration(node_config),
false,
true,
);
peer_monitoring_service_network_handles.push(peer_monitoring_service_network_handle);

Expand All @@ -384,7 +384,7 @@ pub fn setup_networks_and_get_interfaces(
network_id,
&network_config,
storage_service_network_configuration(node_config),
false,
true,
);
storage_service_network_handles.push(storage_service_network_handle);

Expand All @@ -395,7 +395,7 @@ pub fn setup_networks_and_get_interfaces(
network_id,
&network_config,
app_config,
false,
true,
);
netbench_handles.push(netbench_handle);
}
Expand Down Expand Up @@ -471,21 +471,19 @@ fn create_network_runtime(network_config: &NetworkConfig) -> Runtime {

/// Registers a new application client and service with the network
fn register_client_and_service_with_network<
T: Serialize + for<'de> Deserialize<'de> + Send + 'static,
T: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static,
>(
network_builder: &mut NetworkBuilder,
network_id: NetworkId,
network_config: &NetworkConfig,
application_config: NetworkApplicationConfig,
no_parallel: bool,
allow_out_of_order_delivery: bool,
) -> ApplicationNetworkHandle<T> {
let max_parallel_deserialization_tasks = if no_parallel {
None
} else {
network_config.max_parallel_deserialization_tasks
};
let (network_sender, network_events) = network_builder
.add_client_and_service(&application_config, max_parallel_deserialization_tasks);
let (network_sender, network_events) = network_builder.add_client_and_service(
&application_config,
network_config.max_parallel_deserialization_tasks,
allow_out_of_order_delivery,
);
ApplicationNetworkHandle {
network_id,
network_sender,
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/dag/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ fn create_network(
playground.peer_protocols(),
);
let consensus_network_client = ConsensusNetworkClient::new(network_client);
let network_events = NetworkEvents::new(consensus_rx, conn_status_rx, None);
let network_events = NetworkEvents::new(consensus_rx, conn_status_rx, None, true);

let (self_sender, self_receiver) = aptos_channels::new_unbounded_test();
let network = NetworkSender::new(author, consensus_network_client, self_sender, validators);
Expand Down
7 changes: 4 additions & 3 deletions consensus/src/network_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ mod tests {
validator_verifier.clone(),
);

let network_events = NetworkEvents::new(consensus_rx, conn_status_rx, None);
let network_events = NetworkEvents::new(consensus_rx, conn_status_rx, None, true);
let network_service_events =
NetworkServiceEvents::new(hashmap! {NetworkId::Validator => network_events});
let (task, receiver) = NetworkTask::new(network_service_events, self_receiver);
Expand Down Expand Up @@ -736,7 +736,7 @@ mod tests {
validator_verifier.clone(),
);

let network_events = NetworkEvents::new(consensus_rx, conn_status_rx, None);
let network_events = NetworkEvents::new(consensus_rx, conn_status_rx, None, true);
let network_service_events =
NetworkServiceEvents::new(hashmap! {NetworkId::Validator => network_events});
let (task, receiver) = NetworkTask::new(network_service_events, self_receiver);
Expand Down Expand Up @@ -808,7 +808,8 @@ mod tests {
aptos_channel::new(QueueStyle::FIFO, 8, None);
let (connection_notifs_tx, connection_notifs_rx) =
aptos_channel::new(QueueStyle::FIFO, 8, None);
let network_events = NetworkEvents::new(peer_mgr_notifs_rx, connection_notifs_rx, None);
let network_events =
NetworkEvents::new(peer_mgr_notifs_rx, connection_notifs_rx, None, true);
let network_service_events =
NetworkServiceEvents::new(hashmap! {NetworkId::Validator => network_events});
let (self_sender, self_receiver) = aptos_channels::new_unbounded_test();
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/round_manager_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ impl NodeSetup {
playground.peer_protocols(),
);
let consensus_network_client = ConsensusNetworkClient::new(network_client);
let network_events = NetworkEvents::new(consensus_rx, conn_status_rx, None);
let network_events = NetworkEvents::new(consensus_rx, conn_status_rx, None, true);
let author = signer.author();

let twin_id = TwinId { id, author };
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/twins/twins_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ impl SMRNode {
playground.peer_protocols(),
);
let consensus_network_client = ConsensusNetworkClient::new(network_client);
let network_events = NetworkEvents::new(consensus_rx, conn_notifs_channel, None);
let network_events = NetworkEvents::new(consensus_rx, conn_notifs_channel, None, true);
let network_service_events =
NetworkServiceEvents::new(hashmap! {NetworkId::Validator => network_events});

Expand Down
2 changes: 1 addition & 1 deletion mempool/src/tests/mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ impl MockSharedMempool {
PeerManagerRequestSender::new(network_reqs_tx),
ConnectionRequestSender::new(connection_reqs_tx),
);
let network_events = NetworkEvents::new(network_notifs_rx, conn_notifs_rx, None);
let network_events = NetworkEvents::new(network_notifs_rx, conn_notifs_rx, None, true);
let (ac_client, client_events) = mpsc::channel(1_024);
let (quorum_store_sender, quorum_store_receiver) = mpsc::channel(1_024);
let (mempool_notifier, mempool_listener) =
Expand Down
2 changes: 1 addition & 1 deletion mempool/src/tests/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ fn setup_node_network_interface(
PeerManagerRequestSender::new(network_reqs_tx),
ConnectionRequestSender::new(connection_reqs_tx),
);
let network_events = NetworkEvents::new(network_notifs_rx, conn_status_rx, None);
let network_events = NetworkEvents::new(network_notifs_rx, conn_status_rx, None, true);

(
NodeNetworkInterface {
Expand Down
8 changes: 6 additions & 2 deletions mempool/src/tests/test_framework.rs
Original file line number Diff line number Diff line change
Expand Up @@ -547,8 +547,12 @@ fn setup_network(
PeerManagerRequestSender::new(reqs_outbound_sender),
ConnectionRequestSender::new(connection_outbound_sender),
);
let network_events =
NetworkEvents::new(reqs_inbound_receiver, connection_inbound_receiver, None);
let network_events = NetworkEvents::new(
reqs_inbound_receiver,
connection_inbound_receiver,
None,
true,
);

(
network_sender,
Expand Down
5 changes: 5 additions & 0 deletions network/builder/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ impl NetworkBuilder {
let (hc_network_tx, hc_network_rx) = self.add_client_and_service(
&health_checker::health_checker_network_config(),
max_parallel_deserialization_tasks,
true,
);
self.health_checker_builder = Some(HealthCheckerBuilder::new(
self.network_context(),
Expand All @@ -437,12 +438,14 @@ impl NetworkBuilder {
&mut self,
config: &NetworkApplicationConfig,
max_parallel_deserialization_tasks: Option<usize>,
allow_out_of_order_delivery: bool,
) -> (SenderT, EventsT) {
(
self.add_client(&config.network_client_config),
self.add_service(
&config.network_service_config,
max_parallel_deserialization_tasks,
allow_out_of_order_delivery,
),
)
}
Expand All @@ -461,13 +464,15 @@ impl NetworkBuilder {
&mut self,
config: &NetworkServiceConfig,
max_parallel_deserialization_tasks: Option<usize>,
allow_out_of_order_delivery: bool,
) -> EventsT {
let (peer_mgr_reqs_rx, connection_notifs_rx) =
self.peer_manager_builder.add_service(config);
EventsT::new(
peer_mgr_reqs_rx,
connection_notifs_rx,
max_parallel_deserialization_tasks,
allow_out_of_order_delivery,
)
}
}
Expand Down
4 changes: 2 additions & 2 deletions network/builder/src/dummy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ pub fn setup_network() -> DummyNetwork {
);

let (listener_sender, mut listener_events) = network_builder
.add_client_and_service::<_, DummyNetworkEvents>(&dummy_network_config(), None);
.add_client_and_service::<_, DummyNetworkEvents>(&dummy_network_config(), None, true);
network_builder.build(runtime.handle().clone()).start();
let listener_network_client = NetworkClient::new(
vec![TEST_DIRECT_SEND_PROTOCOL],
Expand Down Expand Up @@ -144,7 +144,7 @@ pub fn setup_network() -> DummyNetwork {
);

let (dialer_sender, mut dialer_events) = network_builder
.add_client_and_service::<_, DummyNetworkEvents>(&dummy_network_config(), None);
.add_client_and_service::<_, DummyNetworkEvents>(&dummy_network_config(), None, true);
network_builder.build(runtime.handle().clone()).start();
let dialer_network_client = NetworkClient::new(
vec![TEST_DIRECT_SEND_PROTOCOL],
Expand Down
8 changes: 6 additions & 2 deletions network/framework/src/application/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -926,8 +926,12 @@ fn create_network_sender_and_events(
PeerManagerRequestSender::new(outbound_request_sender),
ConnectionRequestSender::new(connection_outbound_sender),
);
let network_events =
NetworkEvents::new(inbound_request_receiver, connection_inbound_receiver, None);
let network_events = NetworkEvents::new(
inbound_request_receiver,
connection_inbound_receiver,
None,
true,
);

// Save the sender, events and receivers
network_senders.insert(*network_id, network_sender);
Expand Down
79 changes: 28 additions & 51 deletions network/framework/src/protocols/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{
transport::ConnectionMetadata,
ProtocolId,
};
use aptos_channels::{aptos_channel, message_queues::QueueStyle};
use aptos_channels::aptos_channel;
use aptos_logger::prelude::*;
use aptos_short_hex_str::AsShortHexStr;
use aptos_types::{network_address::NetworkAddress, PeerId};
Expand All @@ -24,17 +24,13 @@ use futures::{
stream::{FusedStream, Map, Select, Stream, StreamExt},
task::{Context, Poll},
};
use futures_util::FutureExt;
use pin_project::pin_project;
use serde::{de::DeserializeOwned, Serialize};
use std::{cmp::min, fmt::Debug, marker::PhantomData, pin::Pin, time::Duration};
use std::{cmp::min, fmt::Debug, future, marker::PhantomData, pin::Pin, time::Duration};

pub trait Message: DeserializeOwned + Serialize {}
impl<T: DeserializeOwned + Serialize> Message for T {}

// TODO: do we want to make this configurable?
const MAX_DESERIALIZATION_QUEUE_SIZE_PER_PEER: usize = 50;

/// Events received by network clients in a validator
///
/// An enumeration of the various types of messages that the network will be sending
Expand Down Expand Up @@ -157,7 +153,7 @@ impl NetworkApplicationConfig {
pub struct NetworkEvents<TMessage> {
#[pin]
event_stream: Select<
aptos_channel::Receiver<PeerId, Event<TMessage>>,
Pin<Box<dyn Stream<Item = Event<TMessage>> + Send + Sync + 'static>>,
Map<
aptos_channel::Receiver<PeerId, ConnectionNotification>,
fn(ConnectionNotification) -> Event<TMessage>,
Expand All @@ -172,65 +168,46 @@ pub trait NewNetworkEvents {
peer_mgr_notifs_rx: aptos_channel::Receiver<(PeerId, ProtocolId), PeerManagerNotification>,
connection_notifs_rx: aptos_channel::Receiver<PeerId, ConnectionNotification>,
max_parallel_deserialization_tasks: Option<usize>,
allow_out_of_order_delivery: bool,
) -> Self;
}

impl<TMessage: Message + Send + 'static> NewNetworkEvents for NetworkEvents<TMessage> {
impl<TMessage: Message + Send + Sync + 'static> NewNetworkEvents for NetworkEvents<TMessage> {
fn new(
peer_mgr_notifs_rx: aptos_channel::Receiver<(PeerId, ProtocolId), PeerManagerNotification>,
connection_notifs_rx: aptos_channel::Receiver<PeerId, ConnectionNotification>,
max_parallel_deserialization_tasks: Option<usize>,
allow_out_of_order_delivery: bool,
) -> Self {
// Create a channel for deserialized messages
let (deserialized_message_sender, deserialized_message_receiver) = aptos_channel::new(
QueueStyle::FIFO,
MAX_DESERIALIZATION_QUEUE_SIZE_PER_PEER,
None,
);

// Deserialize the peer manager notifications in parallel (for each
// network application) and send them to the receiver. Note: this
// may cause out of order message delivery, but applications
// should already be handling this.
tokio::spawn(async move {
peer_mgr_notifs_rx
.for_each_concurrent(
max_parallel_deserialization_tasks,
move |peer_manager_notification| {
// Get the peer ID for the notification
let deserialized_message_sender = deserialized_message_sender.clone();
let peer_id_for_notification = peer_manager_notification.get_peer_id();

// Spawn a new blocking task to deserialize the message
tokio::task::spawn_blocking(move || {
if let Some(deserialized_message) =
peer_mgr_notif_to_event(peer_manager_notification)
{
if let Err(error) = deserialized_message_sender
.push(peer_id_for_notification, deserialized_message)
{
warn!(
"Failed to send deserialized message to receiver: {:?}",
error
);
}
}
})
.map(|_| ())
},
)
.await
// Determine the number of parallel deserialization tasks to use
let max_parallel_deserialization_tasks = max_parallel_deserialization_tasks.unwrap_or(1);

let data_event_stream = peer_mgr_notifs_rx.map(|notification| {
tokio::task::spawn_blocking(move || peer_mgr_notif_to_event(notification))
});

let data_event_stream: Pin<
Box<dyn Stream<Item = Event<TMessage>> + Send + Sync + 'static>,
> = if allow_out_of_order_delivery {
Box::pin(
data_event_stream
.buffer_unordered(max_parallel_deserialization_tasks)
.filter_map(|res| future::ready(res.expect("JoinError from spawn blocking"))),
)
} else {
Box::pin(
data_event_stream
.buffered(max_parallel_deserialization_tasks)
.filter_map(|res| future::ready(res.expect("JoinError from spawn blocking"))),
)
};

// Process the control messages
let control_event_stream = connection_notifs_rx
.map(control_msg_to_event as fn(ConnectionNotification) -> Event<TMessage>);

Self {
event_stream: ::futures::stream::select(
deserialized_message_receiver,
control_event_stream,
),
event_stream: ::futures::stream::select(data_event_stream, control_event_stream),
_marker: PhantomData,
}
}
Expand Down
1 change: 1 addition & 0 deletions peer-monitoring-service/server/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,7 @@ impl MockClient {
peer_manager_notification_receiver,
connection_notification_receiver,
None,
true,
);
network_and_events.insert(network_id, network_events);
peer_manager_notifiers.insert(network_id, peer_manager_notifier);
Expand Down
Loading

0 comments on commit 8e26ae7

Please sign in to comment.