Skip to content

Commit

Permalink
cleanup. fmt. metrics.
Browse files Browse the repository at this point in the history
  • Loading branch information
brianolson committed Jul 9, 2024
1 parent 5c1a6e3 commit f5b929f
Show file tree
Hide file tree
Showing 16 changed files with 327 additions and 379 deletions.
70 changes: 20 additions & 50 deletions consensus/src/network_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ use aptos_network::{
PeerManagerRequestSender,
},
protocols::{
network::{NewNetworkEvents, SerializedRequest},
// rpc::InboundRpcRequest,
wire::handshake::v1::ProtocolIdSet,
network::{NewNetworkEvents, ReceivedMessage, SerializedRequest},
wire::{
handshake::v1::ProtocolIdSet,
messaging::v1::{DirectSendMsg, NetworkMessage, RpcRequest},
},
},
ProtocolId,
};
Expand All @@ -41,8 +43,6 @@ use std::{
time::Duration,
};
use tokio::runtime::Handle;
use aptos_network::protocols::network::ReceivedMessage;
use aptos_network::protocols::wire::messaging::v1::{DirectSendMsg, NetworkMessage, RpcRequest};

/// `TwinId` is used by the NetworkPlayground to uniquely identify
/// nodes, even if they have the same `AccountAddress` (e.g. for Twins)
Expand All @@ -67,11 +67,8 @@ pub struct NetworkPlayground {
/// These events will usually be handled by the event loop spawned in
/// `ConsensusNetworkImpl`.
///
node_consensus_txs: Arc<
Mutex<
HashMap<TwinId, aptos_channel::Sender<(PeerId, ProtocolId), ReceivedMessage>>,
>,
>,
node_consensus_txs:
Arc<Mutex<HashMap<TwinId, aptos_channel::Sender<(PeerId, ProtocolId), ReceivedMessage>>>>,
/// Nodes' outbound handlers forward their outbound non-rpc messages to this
/// queue.
outbound_msgs_tx: mpsc::Sender<(TwinId, PeerManagerRequest)>,
Expand Down Expand Up @@ -129,12 +126,7 @@ impl NetworkPlayground {
mut network_reqs_rx: aptos_channel::Receiver<(PeerId, ProtocolId), PeerManagerRequest>,
mut outbound_msgs_tx: mpsc::Sender<(TwinId, PeerManagerRequest)>,
node_consensus_txs: Arc<
Mutex<
HashMap<
TwinId,
aptos_channel::Sender<(PeerId, ProtocolId), ReceivedMessage>,
>,
>,
Mutex<HashMap<TwinId, aptos_channel::Sender<(PeerId, ProtocolId), ReceivedMessage>>>,
>,
author_to_twin_ids: Arc<RwLock<AuthorToTwinIds>>,
) {
Expand Down Expand Up @@ -165,24 +157,20 @@ impl NetworkPlayground {
let node_consensus_tx =
node_consensus_txs.lock().get(dst_twin_id).unwrap().clone();

// let inbound_req = InboundRpcRequest {
// protocol_id: outbound_req.protocol_id,
// data: outbound_req.data,
// res_tx: outbound_req.res_tx,
// };

node_consensus_tx
.push(
(src_twin_id.author, ProtocolId::ConsensusRpcBcs),
// PeerManagerNotification::RecvRpc(src_twin_id.author, inbound_req),
ReceivedMessage{
message: NetworkMessage::RpcRequest(RpcRequest{
ReceivedMessage {
message: NetworkMessage::RpcRequest(RpcRequest {
protocol_id: outbound_req.protocol_id,
request_id: 123, // TODO: seq?
request_id: 123,
priority: 0,
raw_request: outbound_req.data.into(),
}),
sender: PeerNetworkId::new(NetworkId::Validator, src_twin_id.author),
sender: PeerNetworkId::new(
NetworkId::Validator,
src_twin_id.author,
),
rx_at: 0,
rpc_replier: Some(Arc::new(outbound_req.res_tx)),
},
Expand Down Expand Up @@ -242,7 +230,7 @@ impl NetworkPlayground {
// copy message data
let (source_address, msg, rmsg) = match &msg_notif {
PeerManagerNotification::RecvMessage(src, msg) => {
let rmsg = ReceivedMessage{
let rmsg = ReceivedMessage {
message: NetworkMessage::DirectSendMsg(DirectSendMsg {
protocol_id: msg.protocol_id,
priority: 0,
Expand Down Expand Up @@ -291,9 +279,6 @@ impl NetworkPlayground {
PeerManagerRequest::SendDirectSend(dst_inner, msg_inner) => {
(*dst_inner, msg_inner.clone())
},
// NetworkMessage::DirectSendMsg(dmsg) => {
//
// },
msg_inner => panic!(
"[network playground] Unexpected PeerManagerRequest: {:?}",
msg_inner
Expand All @@ -308,12 +293,6 @@ impl NetworkPlayground {
if !self.is_message_dropped(&src_twin_id, dst_twin_id, consensus_msg) {
let msg_notif =
PeerManagerNotification::RecvMessage(src_twin_id.author, msg.clone());
// let msg_notif = ReceivedMessage{
// message: NetworkMessage::,
// sender: (),
// rx_at: 0,
// rpc_replier: None,
// };
let msg_copy = self
.deliver_message(src_twin_id, *dst_twin_id, msg_notif)
.await;
Expand Down Expand Up @@ -846,12 +825,8 @@ mod tests {

let peer_id = PeerId::random();
let protocol_id = ProtocolId::ConsensusDirectSendBcs;
// let bad_msg = PeerManagerNotification::RecvMessage(peer_id, Message {
// protocol_id,
// mdata: Bytes::from_static(b"\xde\xad\xbe\xef"),
// });
let bad_msg = ReceivedMessage {
message: NetworkMessage::DirectSendMsg(DirectSendMsg{
message: NetworkMessage::DirectSendMsg(DirectSendMsg {
protocol_id,
priority: 0,
raw_msg: Bytes::from_static(b"\xde\xad\xbe\xef").into(),
Expand All @@ -871,13 +846,8 @@ mod tests {

let protocol_id = ProtocolId::ConsensusRpcJson;
let (res_tx, _res_rx) = oneshot::channel();
// let liveness_check_msg = PeerManagerNotification::RecvRpc(peer_id, InboundRpcRequest {
// protocol_id,
// data: Bytes::from(serde_json::to_vec(&liveness_check_msg).unwrap()),
// res_tx,
// });
let liveness_check_msg = ReceivedMessage{
message: NetworkMessage::RpcRequest(RpcRequest{
let liveness_check_msg = ReceivedMessage {
message: NetworkMessage::RpcRequest(RpcRequest {
protocol_id,
request_id: 0, // TODO: seq?
priority: 0,
Expand All @@ -888,7 +858,7 @@ mod tests {
rpc_replier: Some(Arc::new(res_tx)),
};

peer_mgr_notifs_tx
peer_mgr_notifs_tx
.push((peer_id, protocol_id), liveness_check_msg)
.unwrap();

Expand Down
52 changes: 12 additions & 40 deletions mempool/src/tests/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,47 +2,20 @@
// Parts of the project are originally copyright © Meta Platforms, Inc.
// SPDX-License-Identifier: Apache-2.0

// use crate::{
// core_mempool::CoreMempool,
// network::MempoolSyncMsg,
// shared_mempool::{start_shared_mempool, types::SharedMempoolNotification},
// };
// use aptos_channels::{aptos_channel, message_queues::QueueStyle};
use aptos_config::{
config::{PeerRole, RoleType},
network_id::{NetworkId, PeerNetworkId},
};
// use aptos_crypto::{x25519::PrivateKey, Uniform};
// use aptos_event_notifications::{ReconfigNotification, ReconfigNotificationListener};
// use aptos_infallible::{Mutex, RwLock};
// use aptos_network::{
// application::{
// interface::{NetworkClient, NetworkServiceEvents},
// storage::PeersAndMetadata,
// },
// };
// use aptos_storage_interface::mock::MockDbReaderWriter;
use aptos_types::{
// on_chain_config::{InMemoryOnChainConfig, OnChainConfigPayload},
PeerId,
};
// use aptos_vm_validator::mocks::mock_vm_validator::MockVMValidator;
use aptos_types::PeerId;
use enum_dispatch::enum_dispatch;
// use futures::{
// channel::mpsc::{self, unbounded, UnboundedReceiver},
// };
// use rand::rngs::StdRng;
use std::{
collections::HashSet,
// sync::Arc,
};
// use tokio::runtime::Runtime;
use std::collections::HashSet;

// type MempoolNetworkHandle = (
// NetworkId,
// NetworkSender<MempoolSyncMsg>,
// NetworkEvents<MempoolSyncMsg>,
// );
#[cfg(wrong_network_abstraction)]
type MempoolNetworkHandle = (
NetworkId,
NetworkSender<MempoolSyncMsg>,
NetworkEvents<MempoolSyncMsg>,
);

/// This is a simple node identifier for testing
/// This keeps track of the `NodeType` and a simple index
Expand All @@ -52,7 +25,7 @@ pub struct NodeId {
num: u32,
}

#[cfg(unused)]
#[cfg(wrong_network_abstraction)]
impl NodeId {
pub(crate) fn new(node_type: NodeType, num: u32) -> Self {
NodeId { node_type, num }
Expand Down Expand Up @@ -126,7 +99,7 @@ pub struct ValidatorNodeInfo {
vfn_peer_id: PeerId,
}

#[cfg(unused)]
#[cfg(wrong_network_abstraction)]
impl ValidatorNodeInfo {
fn new(peer_id: PeerId, vfn_peer_id: PeerId) -> Self {
ValidatorNodeInfo {
Expand Down Expand Up @@ -232,7 +205,7 @@ impl NodeInfoTrait for FullNodeInfo {
}

/// Provides a `NodeInfo` and `NodeConfig` for a validator
#[cfg(unused)]
#[cfg(wrong_network_abstraction)]
pub fn validator_config(rng: &mut StdRng) -> (ValidatorNodeInfo, NodeConfig) {
let config = NodeConfig::generate_random_config_with_template(
&NodeConfig::get_default_validator_config(),
Expand Down Expand Up @@ -465,8 +438,7 @@ pub struct NodeNetworkInterface {
/// Peer request receiver for messages
pub(crate) network_reqs_rx: aptos_channel::Receiver<(PeerId, ProtocolId), PeerManagerRequest>,
/// Peer notification sender for sending outgoing messages to other peers
pub(crate) network_notifs_tx:
aptos_channel::Sender<(PeerId, ProtocolId), ReceivedMessage>,
pub(crate) network_notifs_tx: aptos_channel::Sender<(PeerId, ProtocolId), ReceivedMessage>,
}

#[cfg(wrong_network_abstraction)]
Expand Down
35 changes: 13 additions & 22 deletions mempool/src/tests/test_framework.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ use aptos_network::{
interface::{NetworkClient, NetworkServiceEvents},
storage::PeersAndMetadata,
},
peer_manager::{
ConnectionRequestSender, PeerManagerRequest,
PeerManagerRequestSender,
},
peer_manager::{ConnectionRequestSender, PeerManagerRequest, PeerManagerRequestSender},
protocols::{
network::{NetworkEvents, NetworkSender, NewNetworkEvents, NewNetworkSender},
wire::handshake::v1::ProtocolId::MempoolDirectSend,
network::{
NetworkEvents, NetworkSender, NewNetworkEvents, NewNetworkSender, ReceivedMessage,
},
wire::{
handshake::v1::ProtocolId::MempoolDirectSend,
messaging::v1::{DirectSendMsg, NetworkMessage, RpcRequest},
},
},
testutils::{
builder::TestFrameworkBuilder,
Expand All @@ -52,8 +54,6 @@ use maplit::btreemap;
use std::{collections::HashMap, hash::Hash, sync::Arc};
use tokio::{runtime::Handle, time::Duration};
use tokio_stream::StreamExt;
use aptos_network::protocols::network::ReceivedMessage;
use aptos_network::protocols::wire::messaging::v1::{DirectSendMsg, NetworkMessage, RpcRequest};

/// An individual mempool node that runs in it's own runtime.
///
Expand Down Expand Up @@ -262,12 +262,8 @@ impl MempoolNode {
let data = protocol_id.to_bytes(&msg).unwrap();
let (notif, maybe_receiver) = match protocol_id {
ProtocolId::MempoolDirectSend => (
// PeerManagerNotification::RecvMessage(remote_peer_id, Message {
// protocol_id,
// mdata: data,
// }),
ReceivedMessage{
message: NetworkMessage::DirectSendMsg(DirectSendMsg{
ReceivedMessage {
message: NetworkMessage::DirectSendMsg(DirectSendMsg {
protocol_id,
priority: 0,
raw_msg: data,
Expand All @@ -280,13 +276,8 @@ impl MempoolNode {
),
ProtocolId::MempoolRpc => {
let (res_tx, res_rx) = oneshot::channel();
// let notif = PeerManagerNotification::RecvRpc(remote_peer_id, InboundRpcRequest {
// protocol_id,
// data,
// res_tx,
// });
let rmsg = ReceivedMessage {
message: NetworkMessage::RpcRequest(RpcRequest{
message: NetworkMessage::RpcRequest(RpcRequest {
protocol_id,
request_id: 0, // TODO: a number?
priority: 0,
Expand Down Expand Up @@ -428,8 +419,8 @@ impl MempoolNode {
// protocol_id,
// mdata: bytes.into(),
// });
let notif = ReceivedMessage{
message: NetworkMessage::DirectSendMsg(DirectSendMsg{
let notif = ReceivedMessage {
message: NetworkMessage::DirectSendMsg(DirectSendMsg {
protocol_id,
priority: 0,
raw_msg: bytes,
Expand Down
20 changes: 10 additions & 10 deletions network/framework/src/application/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,18 @@ use crate::{
storage::PeersAndMetadata,
},
peer_manager::{
ConnectionNotification, ConnectionRequestSender,
PeerManagerRequest, PeerManagerRequestSender,
ConnectionNotification, ConnectionRequestSender, PeerManagerRequest,
PeerManagerRequestSender,
},
protocols::{
network::{Event, NetworkEvents, NetworkSender, NewNetworkEvents, NewNetworkSender, ReceivedMessage},
wire::handshake::v1::{ProtocolId, ProtocolIdSet},
network::{
Event, NetworkEvents, NetworkSender, NewNetworkEvents, NewNetworkSender,
ReceivedMessage,
},
wire::{
handshake::v1::{ProtocolId, ProtocolIdSet},
messaging::v1::{DirectSendMsg, NetworkMessage, RpcRequest},
},
},
transport::ConnectionMetadata,
};
Expand All @@ -38,7 +44,6 @@ use std::{
time::Duration,
};
use tokio::{sync::mpsc::error::TryRecvError, time::timeout};
use crate::protocols::wire::messaging::v1::{DirectSendMsg, NetworkMessage, RpcRequest};

// Useful test constants for timeouts
const MAX_CHANNEL_TIMEOUT_SECS: u64 = 1;
Expand Down Expand Up @@ -1205,11 +1210,6 @@ async fn wait_for_network_event(
assert_eq!(outbound_rpc_request.timeout, message_wait_time);

// Create and return the peer manager notification
// let inbound_rpc_request = InboundRpcRequest {
// protocol_id: outbound_rpc_request.protocol_id,
// data: outbound_rpc_request.data,
// res_tx: oneshot::channel().0,
// };
let rmsg = ReceivedMessage {
message: NetworkMessage::RpcRequest(RpcRequest{
protocol_id: outbound_rpc_request.protocol_id,
Expand Down
18 changes: 11 additions & 7 deletions network/framework/src/peer/fuzzing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@
// Parts of the project are originally copyright © Meta Platforms, Inc.
// SPDX-License-Identifier: Apache-2.0

use std::collections::HashMap;
use std::sync::Arc;
use crate::{constants, peer::Peer, protocols::wire::{
handshake::v1::{MessagingProtocolVersion, ProtocolIdSet},
messaging::v1::{MultiplexMessage, MultiplexMessageSink},
}, testutils::fake_socket::ReadOnlyTestSocketVec, transport::{Connection, ConnectionId, ConnectionMetadata}};
use crate::{
constants,
peer::Peer,
protocols::wire::{
handshake::v1::{MessagingProtocolVersion, ProtocolIdSet},
messaging::v1::{MultiplexMessage, MultiplexMessageSink},
},
testutils::fake_socket::ReadOnlyTestSocketVec,
transport::{Connection, ConnectionId, ConnectionMetadata},
};
use aptos_channels::{aptos_channel, message_queues::QueueStyle};
use aptos_config::{config::PeerRole, network_id::NetworkContext};
use aptos_memsocket::MemorySocket;
Expand All @@ -17,7 +21,7 @@ use aptos_time_service::TimeService;
use aptos_types::{network_address::NetworkAddress, PeerId};
use futures::{executor::block_on, future, io::AsyncReadExt, sink::SinkExt, stream::StreamExt};
use proptest::{arbitrary::any, collection::vec};
use std::time::Duration;
use std::{collections::HashMap, sync::Arc, time::Duration};

/// Generate a sequence of `MultiplexMessage`, bcs serialize them, and write them
/// out to a buffer using our length-prefixed message codec.
Expand Down
Loading

0 comments on commit f5b929f

Please sign in to comment.