Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Squash network inbound queues #13956

Merged
merged 37 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
374aa04
squash inbound path inital work. local cluster works.
brianolson Jun 28, 2024
9865dd7
clippy passes
brianolson Jul 9, 2024
9573f16
network test fixes
brianolson Jul 9, 2024
5c1a6e3
Merge branch 'main' into squash-inbound
brianolson Jul 9, 2024
f5b929f
cleanup. fmt. metrics.
brianolson Jul 9, 2024
3af93ab
delete dead code
brianolson Jul 10, 2024
6f364dd
Merge remote-tracking branch 'origin/main' into squash-inbound
brianolson Jul 10, 2024
b645b2e
PeerNotification goes away
brianolson Jul 10, 2024
841d909
PeerNotification goes away
brianolson Jul 10, 2024
dac4777
comment deprecate PeerManagerNotification
brianolson Jul 10, 2024
1bc9f43
Merge branch 'squash-inbound' into squash-inbound-2
brianolson Jul 10, 2024
62b6dff
add inbound queue delay metric aptos_network_inbound_queue_time
brianolson Jul 10, 2024
64f3ea8
Merge branch 'squash-inbound' into squash-inbound-2
brianolson Jul 10, 2024
026c14a
fmt
brianolson Jul 10, 2024
34bac4e
Merge remote-tracking branch 'origin/main' into squash-inbound
brianolson Jul 10, 2024
085901a
fmt
brianolson Jul 11, 2024
f41fc0d
Merge remote-tracking branch 'origin/main' into squash-inbound
brianolson Jul 11, 2024
aec0b06
rx_at -> receive_timestamp_micros
brianolson Jul 11, 2024
28435c3
Merge remote-tracking branch 'origin/main' into squash-inbound
brianolson Jul 12, 2024
b3c2b64
Merge branch 'squash-inbound' into squash-inbound-2
brianolson Jul 12, 2024
adce732
less PeerManagerNotification
brianolson Jul 15, 2024
c8a822c
PR cleanup
brianolson Jul 15, 2024
b135f3b
Merge remote-tracking branch 'origin/main' into squash-inbound
brianolson Jul 15, 2024
f935228
cleanup and PR refactor
brianolson Jul 15, 2024
7e4d5bb
unix_micros() util
brianolson Jul 15, 2024
26ec289
cleanup
brianolson Jul 15, 2024
ddba8aa
Merge branch 'squash-inbound' into squash-inbound-2
brianolson Jul 15, 2024
834ab52
fix mempool tests
brianolson Jul 15, 2024
925ed26
Merge remote-tracking branch 'origin/main' into squash-inbound
brianolson Jul 16, 2024
09081fd
Merge remote-tracking branch 'origin/main' into squash-inbound
brianolson Jul 17, 2024
506982b
Merge remote-tracking branch 'origin/main' into squash-inbound
brianolson Jul 18, 2024
504162b
drop unused max_concurrent_network_reqs
brianolson Jul 19, 2024
f5dfe7d
drop unused PeerManagerNotification
brianolson Jul 19, 2024
9298a59
count UNKNOWN_LABEL inbound messages
brianolson Jul 19, 2024
c04a29c
Merge remote-tracking branch 'origin/main' into squash-inbound-2
brianolson Jul 19, 2024
a744ce8
Merge remote-tracking branch 'origin/main' into squash-inbound
brianolson Jul 19, 2024
5634764
PR cleanup
brianolson Jul 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 59 additions & 37 deletions consensus/src/network_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
test_utils::{self, consensus_runtime, placeholder_ledger_info, timed_block_on},
};
use aptos_channels::{self, aptos_channel, message_queues::QueueStyle};
use aptos_config::network_id::NetworkId;
use aptos_config::network_id::{NetworkId, PeerNetworkId};
use aptos_consensus_types::{
block::{block_test_utils::certificate_for_genesis, Block},
common::Author,
Expand All @@ -26,9 +26,11 @@ use aptos_network::{
PeerManagerRequestSender,
},
protocols::{
network::{NewNetworkEvents, RpcError, SerializedRequest},
rpc::InboundRpcRequest,
wire::handshake::v1::ProtocolIdSet,
network::{NewNetworkEvents, ReceivedMessage, RpcError, SerializedRequest},
wire::{
handshake::v1::ProtocolIdSet,
messaging::v1::{DirectSendMsg, NetworkMessage, RpcRequest},
},
},
ProtocolId,
};
Expand Down Expand Up @@ -65,11 +67,8 @@ pub struct NetworkPlayground {
/// These events will usually be handled by the event loop spawned in
/// `ConsensusNetworkImpl`.
///
node_consensus_txs: Arc<
Mutex<
HashMap<TwinId, aptos_channel::Sender<(PeerId, ProtocolId), PeerManagerNotification>>,
>,
>,
node_consensus_txs:
Arc<Mutex<HashMap<TwinId, aptos_channel::Sender<(PeerId, ProtocolId), ReceivedMessage>>>>,
/// Nodes' outbound handlers forward their outbound non-rpc messages to this
/// queue.
outbound_msgs_tx: mpsc::Sender<(TwinId, PeerManagerRequest)>,
Expand Down Expand Up @@ -131,12 +130,7 @@ impl NetworkPlayground {
mut network_reqs_rx: aptos_channel::Receiver<(PeerId, ProtocolId), PeerManagerRequest>,
mut outbound_msgs_tx: mpsc::Sender<(TwinId, PeerManagerRequest)>,
node_consensus_txs: Arc<
Mutex<
HashMap<
TwinId,
aptos_channel::Sender<(PeerId, ProtocolId), PeerManagerNotification>,
>,
>,
Mutex<HashMap<TwinId, aptos_channel::Sender<(PeerId, ProtocolId), ReceivedMessage>>>,
>,
author_to_twin_ids: Arc<RwLock<AuthorToTwinIds>>,
) {
Expand Down Expand Up @@ -175,16 +169,23 @@ impl NetworkPlayground {
let node_consensus_tx =
node_consensus_txs.lock().get(dst_twin_id).unwrap().clone();

let inbound_req = InboundRpcRequest {
protocol_id: outbound_req.protocol_id,
data: outbound_req.data,
res_tx: outbound_req.res_tx,
};

node_consensus_tx
.push(
(src_twin_id.author, ProtocolId::ConsensusRpcBcs),
PeerManagerNotification::RecvRpc(src_twin_id.author, inbound_req),
ReceivedMessage {
message: NetworkMessage::RpcRequest(RpcRequest {
protocol_id: outbound_req.protocol_id,
request_id: 123,
priority: 0,
raw_request: outbound_req.data.into(),
}),
sender: PeerNetworkId::new(
NetworkId::Validator,
src_twin_id.author,
),
receive_timestamp_micros: 0,
rpc_replier: Some(Arc::new(outbound_req.res_tx)),
},
)
.unwrap();
},
Expand All @@ -201,7 +202,7 @@ impl NetworkPlayground {
pub fn add_node(
&mut self,
twin_id: TwinId,
consensus_tx: aptos_channel::Sender<(PeerId, ProtocolId), PeerManagerNotification>,
consensus_tx: aptos_channel::Sender<(PeerId, ProtocolId), ReceivedMessage>,
network_reqs_rx: aptos_channel::Receiver<(PeerId, ProtocolId), PeerManagerRequest>,
conn_mgr_reqs_rx: aptos_channels::Receiver<aptos_network::ConnectivityRequest>,
) {
Expand Down Expand Up @@ -241,10 +242,20 @@ impl NetworkPlayground {
.clone();

// copy message data
let msg_copy = match &msg_notif {
let (source_address, msg, rmsg) = match &msg_notif {
PeerManagerNotification::RecvMessage(src, msg) => {
let rmsg = ReceivedMessage {
message: NetworkMessage::DirectSendMsg(DirectSendMsg {
protocol_id: msg.protocol_id,
priority: 0,
raw_msg: msg.mdata.clone().into(),
}),
sender: PeerNetworkId::new(NetworkId::Validator, *src),
receive_timestamp_micros: 0,
rpc_replier: None,
};
let msg: ConsensusMsg = msg.to_message().unwrap();
(*src, msg)
(*src, msg, rmsg)
},
msg_notif => panic!(
"[network playground] Unexpected PeerManagerNotification: {:?}",
Expand All @@ -253,9 +264,9 @@ impl NetworkPlayground {
};
let _ = node_consensus_tx.push(
(src_twin_id.author, ProtocolId::ConsensusDirectSendBcs),
msg_notif,
rmsg,
);
msg_copy
(source_address, msg)
}

/// Wait for exactly `num_messages` to be enqueued and delivered. Return a
Expand Down Expand Up @@ -531,7 +542,6 @@ mod tests {
storage::PeersAndMetadata,
},
protocols::{
direct_send::Message,
network,
network::{NetworkEvents, NewNetworkSender},
},
Expand Down Expand Up @@ -849,10 +859,16 @@ mod tests {

let peer_id = PeerId::random();
let protocol_id = ProtocolId::ConsensusDirectSendBcs;
let bad_msg = PeerManagerNotification::RecvMessage(peer_id, Message {
protocol_id,
mdata: Bytes::from_static(b"\xde\xad\xbe\xef"),
});
let bad_msg = ReceivedMessage {
message: NetworkMessage::DirectSendMsg(DirectSendMsg {
protocol_id,
priority: 0,
raw_msg: Bytes::from_static(b"\xde\xad\xbe\xef").into(),
}),
sender: PeerNetworkId::new(NetworkId::Validator, peer_id),
receive_timestamp_micros: 0,
rpc_replier: None,
};

peer_mgr_notifs_tx
.push((peer_id, protocol_id), bad_msg)
Expand All @@ -864,11 +880,17 @@ mod tests {

let protocol_id = ProtocolId::ConsensusRpcJson;
let (res_tx, _res_rx) = oneshot::channel();
let liveness_check_msg = PeerManagerNotification::RecvRpc(peer_id, InboundRpcRequest {
protocol_id,
data: Bytes::from(serde_json::to_vec(&liveness_check_msg).unwrap()),
res_tx,
});
let liveness_check_msg = ReceivedMessage {
message: NetworkMessage::RpcRequest(RpcRequest {
protocol_id,
request_id: 0, // TODO: seq?
priority: 0,
raw_request: Bytes::from(serde_json::to_vec(&liveness_check_msg).unwrap()).into(),
}),
sender: PeerNetworkId::new(NetworkId::Validator, peer_id),
receive_timestamp_micros: 0,
rpc_replier: Some(Arc::new(res_tx)),
};

peer_mgr_notifs_tx
.push((peer_id, protocol_id), liveness_check_msg)
Expand Down
1 change: 1 addition & 0 deletions mempool/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ mod core_mempool_test;
#[cfg(test)]
mod integration_tests;
#[cfg(test)]
#[cfg(wrong_network_abstraction)]
mod multi_node_test;
#[cfg(test)]
mod node;
Expand Down
72 changes: 23 additions & 49 deletions mempool/src/tests/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,55 +2,15 @@
// Parts of the project are originally copyright © Meta Platforms, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::{
core_mempool::{CoreMempool, TimelineState},
network::MempoolSyncMsg,
shared_mempool::{start_shared_mempool, types::SharedMempoolNotification},
tests::common::TestTransaction,
};
use aptos_channels::{aptos_channel, message_queues::QueueStyle};
use aptos_config::{
config::{Identity, NodeConfig, PeerRole, RoleType},
config::{PeerRole, RoleType},
network_id::{NetworkId, PeerNetworkId},
};
use aptos_crypto::{x25519::PrivateKey, Uniform};
use aptos_event_notifications::{ReconfigNotification, ReconfigNotificationListener};
use aptos_infallible::{Mutex, MutexGuard, RwLock};
use aptos_netcore::transport::ConnectionOrigin;
use aptos_network::{
application::{
interface::{NetworkClient, NetworkServiceEvents},
storage::PeersAndMetadata,
},
peer_manager::{
ConnectionRequestSender, PeerManagerNotification, PeerManagerRequest,
PeerManagerRequestSender,
},
protocols::{
network::{NetworkEvents, NetworkSender, NewNetworkEvents, NewNetworkSender},
wire::handshake::v1::ProtocolId::MempoolDirectSend,
},
transport::ConnectionMetadata,
ProtocolId,
};
use aptos_storage_interface::mock::MockDbReaderWriter;
use aptos_types::{
on_chain_config::{InMemoryOnChainConfig, OnChainConfigPayload},
PeerId,
};
use aptos_vm_validator::mocks::mock_vm_validator::MockVMValidator;
use aptos_types::PeerId;
use enum_dispatch::enum_dispatch;
use futures::{
channel::mpsc::{self, unbounded, UnboundedReceiver},
FutureExt, StreamExt,
};
use rand::rngs::StdRng;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
};
use tokio::runtime::Runtime;
use std::collections::HashSet;

#[cfg(wrong_network_abstraction)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bchocho, any thoughts on updating or removing all these tests? Should that be done before this PR?

(My feeling is that these should be fixed/removed first, e.g., to ensure we don't accidentally miss something. Feels odd just ignoring them all.)

type MempoolNetworkHandle = (
NetworkId,
NetworkSender<MempoolSyncMsg>,
Expand All @@ -65,6 +25,7 @@ pub struct NodeId {
num: u32,
}

#[cfg(wrong_network_abstraction)]
impl NodeId {
pub(crate) fn new(node_type: NodeType, num: u32) -> Self {
NodeId { node_type, num }
Expand All @@ -75,9 +36,9 @@ impl NodeId {
/// Validators, ValidatorFullNodes, and FullNodes
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialOrd, PartialEq)]
pub enum NodeType {
Validator,
ValidatorFullNode,
FullNode,
// Validator,
brianolson marked this conversation as resolved.
Show resolved Hide resolved
// ValidatorFullNode,
// FullNode,
}

/// A union type for all types of simulated nodes
Expand Down Expand Up @@ -138,6 +99,7 @@ pub struct ValidatorNodeInfo {
vfn_peer_id: PeerId,
}

#[cfg(wrong_network_abstraction)]
impl ValidatorNodeInfo {
fn new(peer_id: PeerId, vfn_peer_id: PeerId) -> Self {
ValidatorNodeInfo {
Expand Down Expand Up @@ -175,6 +137,7 @@ pub struct ValidatorFullNodeInfo {
vfn_peer_id: PeerId,
}

#[cfg(wrong_network_abstraction)]
impl ValidatorFullNodeInfo {
fn new(peer_id: PeerId, vfn_peer_id: PeerId) -> Self {
ValidatorFullNodeInfo {
Expand Down Expand Up @@ -212,6 +175,7 @@ pub struct FullNodeInfo {
peer_role: PeerRole,
}

#[cfg(wrong_network_abstraction)]
impl FullNodeInfo {
fn new(peer_id: PeerId, peer_role: PeerRole) -> Self {
FullNodeInfo { peer_id, peer_role }
Expand Down Expand Up @@ -241,6 +205,7 @@ impl NodeInfoTrait for FullNodeInfo {
}

/// Provides a `NodeInfo` and `NodeConfig` for a validator
#[cfg(wrong_network_abstraction)]
pub fn validator_config(rng: &mut StdRng) -> (ValidatorNodeInfo, NodeConfig) {
let config = NodeConfig::generate_random_config_with_template(
&NodeConfig::get_default_validator_config(),
Expand All @@ -259,6 +224,7 @@ pub fn validator_config(rng: &mut StdRng) -> (ValidatorNodeInfo, NodeConfig) {
}

/// Provides a `NodeInfo` and `NodeConfig` for a ValidatorFullNode
#[cfg(wrong_network_abstraction)]
pub fn vfn_config(rng: &mut StdRng, peer_id: PeerId) -> (ValidatorFullNodeInfo, NodeConfig) {
let mut vfn_config = NodeConfig::generate_random_config_with_template(
&NodeConfig::get_default_vfn_config(),
Expand Down Expand Up @@ -292,6 +258,7 @@ pub fn vfn_config(rng: &mut StdRng, peer_id: PeerId) -> (ValidatorFullNodeInfo,
}

/// Provides a `NodeInfo` and `NodeConfig` for a public full node
#[cfg(wrong_network_abstraction)]
pub fn public_full_node_config(
rng: &mut StdRng,
peer_role: PeerRole,
Expand All @@ -311,6 +278,7 @@ pub fn public_full_node_config(
}

/// A struct representing a node, it's network interfaces, mempool, and a mempool event subscriber
#[cfg(wrong_network_abstraction)]
pub struct Node {
/// The identifying Node
node_info: NodeInfo,
Expand All @@ -327,6 +295,7 @@ pub struct Node {
}

/// Reimplement `NodeInfoTrait` for simplicity
#[cfg(wrong_network_abstraction)]
impl NodeInfoTrait for Node {
fn supported_networks(&self) -> Vec<NetworkId> {
self.node_info.supported_networks()
Expand All @@ -345,6 +314,7 @@ impl NodeInfoTrait for Node {
}
}

#[cfg(wrong_network_abstraction)]
impl Node {
/// Sets up a single node by starting up mempool and any network handles
pub fn new(node: NodeInfo, config: NodeConfig) -> Node {
Expand Down Expand Up @@ -463,14 +433,15 @@ impl Node {

/// A simplistic view of the entire network stack for a given `NetworkId`
/// Allows us to mock out the network without dealing with the details
#[cfg(wrong_network_abstraction)]
pub struct NodeNetworkInterface {
/// Peer request receiver for messages
pub(crate) network_reqs_rx: aptos_channel::Receiver<(PeerId, ProtocolId), PeerManagerRequest>,
/// Peer notification sender for sending outgoing messages to other peers
pub(crate) network_notifs_tx:
aptos_channel::Sender<(PeerId, ProtocolId), PeerManagerNotification>,
pub(crate) network_notifs_tx: aptos_channel::Sender<(PeerId, ProtocolId), ReceivedMessage>,
}

#[cfg(wrong_network_abstraction)]
impl NodeNetworkInterface {
fn get_next_network_req(&mut self, runtime: Arc<Runtime>) -> PeerManagerRequest {
runtime.block_on(self.network_reqs_rx.next()).unwrap()
Expand All @@ -491,6 +462,7 @@ impl NodeNetworkInterface {
// Below here are static functions to help build a new `Node`

/// Sets up the network handles for a `Node`
#[cfg(wrong_network_abstraction)]
fn setup_node_network_interfaces(
node: &NodeInfo,
) -> (
Expand Down Expand Up @@ -534,6 +506,7 @@ fn setup_node_network_interfaces(
}

/// Builds a single network interface with associated queues, and attaches it to the top level network
#[cfg(wrong_network_abstraction)]
fn setup_node_network_interface(
peer_network_id: PeerNetworkId,
) -> (NodeNetworkInterface, MempoolNetworkHandle) {
Expand All @@ -560,6 +533,7 @@ fn setup_node_network_interface(
}

/// Starts up the mempool resources for a single node
#[cfg(wrong_network_abstraction)]
fn start_node_mempool(
config: NodeConfig,
network_client: NetworkClient<MempoolSyncMsg>,
Expand Down
Loading
Loading