Skip to content

Commit

Permalink
cleanup and PR refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
brianolson committed Jul 15, 2024
1 parent b135f3b commit f935228
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 42 deletions.
10 changes: 7 additions & 3 deletions mempool/src/tests/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,15 @@ impl NodeId {

/// Yet another type, to determine the differences between
/// Validators, ValidatorFullNodes, and FullNodes
/// TODO: much code is currently disabled under [cfg(wrong_network_abstraction)], NodeType may or may not come back when that changes?
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialOrd, PartialEq)]
pub enum NodeType {
// Validator,
// ValidatorFullNode,
// FullNode,
#[allow(unused)]
Validator,
#[allow(unused)]
ValidatorFullNode,
#[allow(unused)]
FullNode,
}

/// A union type for all types of simulated nodes
Expand Down
2 changes: 1 addition & 1 deletion mempool/src/tests/test_framework.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ impl MempoolNode {
let rmsg = ReceivedMessage {
message: NetworkMessage::RpcRequest(RpcRequest {
protocol_id,
request_id: 0, // TODO: a number?
request_id: 0,
priority: 0,
raw_request: data,
}),
Expand Down
2 changes: 1 addition & 1 deletion network/framework/src/application/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1213,7 +1213,7 @@ async fn wait_for_network_event(
let rmsg = ReceivedMessage {
message: NetworkMessage::RpcRequest(RpcRequest{
protocol_id: outbound_rpc_request.protocol_id,
request_id: 0, // TODO: rand? seq?
request_id: 0,
priority: 0,
raw_request: outbound_rpc_request.data.into(),
}),
Expand Down
3 changes: 2 additions & 1 deletion network/framework/src/peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -522,8 +522,9 @@ where
}
},
NetworkMessage::RpcResponse(_) => {
// non-reference cast identical to this match case
let NetworkMessage::RpcResponse(response) = message else {
panic!()
unreachable!("NetworkMessage type changed between match and let")
};
self.outbound_rpcs.handle_inbound_response(response)
},
Expand Down
56 changes: 20 additions & 36 deletions network/framework/src/peer/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,16 +243,24 @@ fn peer_send_message() {
rt.block_on(future::join3(peer.start(), server, client));
}

fn test_upstream_handlers() -> (
Arc<HashMap<ProtocolId, aptos_channel::Sender<(PeerId, ProtocolId), ReceivedMessage>>>,
aptos_channel::Receiver<(PeerId, ProtocolId), ReceivedMessage>,
) {
let mut upstream_handlers = HashMap::new();
let (sender, receiver) = aptos_channel::new(QueueStyle::FIFO, 100, None);
upstream_handlers.insert(PROTOCOL, sender);
let upstream_handlers = Arc::new(upstream_handlers);
(upstream_handlers, receiver)
}

// Reading an inbound DirectSendMsg off the wire should notify the PeerManager of
// an inbound DirectSend.
#[test]
fn peer_recv_message() {
::aptos_logger::Logger::init_for_testing();
let rt = Runtime::new().unwrap();
let mut upstream_handlers = HashMap::new();
let (sender, mut receiver) = aptos_channel::new(QueueStyle::FIFO, 50, None);
upstream_handlers.insert(PROTOCOL, sender);
let upstream_handlers = Arc::new(upstream_handlers);
let (upstream_handlers, mut receiver) = test_upstream_handlers();
let (peer, _peer_handle, connection, _connection_notifs_rx) = build_test_peer(
rt.handle().clone(),
TimeService::mock(),
Expand Down Expand Up @@ -304,14 +312,8 @@ fn peer_recv_message() {
fn peers_send_message_concurrent() {
::aptos_logger::Logger::init_for_testing();
let rt = Runtime::new().unwrap();
let mut upstream_handlers_a = HashMap::new();
let (prot_a_tx, mut prot_a_rx) = aptos_channel::new(QueueStyle::FIFO, 100, None);
upstream_handlers_a.insert(PROTOCOL, prot_a_tx);
let upstream_handlers_a = Arc::new(upstream_handlers_a);
let mut upstream_handlers_b = HashMap::new();
let (prot_b_tx, mut prot_b_rx) = aptos_channel::new(QueueStyle::FIFO, 100, None);
upstream_handlers_b.insert(PROTOCOL, prot_b_tx);
let upstream_handlers_b = Arc::new(upstream_handlers_b);
let (upstream_handlers_a, mut prot_a_rx) = test_upstream_handlers();
let (upstream_handlers_b, mut prot_b_rx) = test_upstream_handlers();
let (
(peer_a, mut peer_handle_a, mut connection_notifs_rx_a),
(peer_b, mut peer_handle_b, mut connection_notifs_rx_b),
Expand Down Expand Up @@ -385,10 +387,7 @@ fn peers_send_message_concurrent() {
fn peer_recv_rpc() {
::aptos_logger::Logger::init_for_testing();
let rt = Runtime::new().unwrap();
let mut upstream_handlers = HashMap::new();
let (prot_tx, mut prot_rx) = aptos_channel::new(QueueStyle::FIFO, 100, None);
upstream_handlers.insert(PROTOCOL, prot_tx);
let upstream_handlers = Arc::new(upstream_handlers);
let (upstream_handlers, mut prot_rx) = test_upstream_handlers();
let (peer, _peer_handle, mut connection, _connection_notifs_rx) = build_test_peer(
rt.handle().clone(),
TimeService::mock(),
Expand Down Expand Up @@ -459,10 +458,7 @@ fn peer_recv_rpc() {
fn peer_recv_rpc_concurrent() {
::aptos_logger::Logger::init_for_testing();
let rt = Runtime::new().unwrap();
let mut upstream_handlers = HashMap::new();
let (prot_tx, mut prot_rx) = aptos_channel::new(QueueStyle::FIFO, 100, None);
upstream_handlers.insert(PROTOCOL, prot_tx);
let upstream_handlers = Arc::new(upstream_handlers);
let (upstream_handlers, mut prot_rx) = test_upstream_handlers();
let (peer, _peer_handle, mut connection, _connection_notifs_rx) = build_test_peer(
rt.handle().clone(),
TimeService::mock(),
Expand Down Expand Up @@ -529,10 +525,7 @@ fn peer_recv_rpc_timeout() {
::aptos_logger::Logger::init_for_testing();
let rt = Runtime::new().unwrap();
let mock_time = MockTimeService::new();
let mut upstream_handlers = HashMap::new();
let (prot_tx, mut prot_rx) = aptos_channel::new(QueueStyle::FIFO, 100, None);
upstream_handlers.insert(PROTOCOL, prot_tx);
let upstream_handlers = Arc::new(upstream_handlers);
let (upstream_handlers, mut prot_rx) = test_upstream_handlers();
let (peer, _peer_handle, mut connection, _connection_notifs_rx) = build_test_peer(
rt.handle().clone(),
mock_time.clone().into(),
Expand Down Expand Up @@ -589,10 +582,7 @@ fn peer_recv_rpc_timeout() {
fn peer_recv_rpc_cancel() {
::aptos_logger::Logger::init_for_testing();
let rt = Runtime::new().unwrap();
let mut upstream_handlers = HashMap::new();
let (prot_tx, mut prot_rx) = aptos_channel::new(QueueStyle::FIFO, 100, None);
upstream_handlers.insert(PROTOCOL, prot_tx);
let upstream_handlers = Arc::new(upstream_handlers);
let (upstream_handlers, mut prot_rx) = test_upstream_handlers();
let (peer, _peer_handle, mut connection, _connection_notifs_rx) = build_test_peer(
rt.handle().clone(),
TimeService::mock(),
Expand Down Expand Up @@ -973,14 +963,8 @@ fn peer_terminates_when_request_tx_has_dropped() {
fn peers_send_multiplex() {
::aptos_logger::Logger::init_for_testing();
let rt = Runtime::new().unwrap();
let mut upstream_handlers_a = HashMap::new();
let (prot_a_tx, mut prot_a_rx) = aptos_channel::new(QueueStyle::FIFO, 100, None);
upstream_handlers_a.insert(PROTOCOL, prot_a_tx);
let upstream_handlers_a = Arc::new(upstream_handlers_a);
let mut upstream_handlers_b = HashMap::new();
let (prot_b_tx, mut prot_b_rx) = aptos_channel::new(QueueStyle::FIFO, 100, None);
upstream_handlers_b.insert(PROTOCOL, prot_b_tx);
let upstream_handlers_b = Arc::new(upstream_handlers_b);
let (upstream_handlers_a, mut prot_a_rx) = test_upstream_handlers();
let (upstream_handlers_b, mut prot_b_rx) = test_upstream_handlers();
let (
(peer_a, mut peer_handle_a, mut connection_notifs_rx_a),
(peer_b, mut peer_handle_b, mut connection_notifs_rx_b),
Expand Down

0 comments on commit f935228

Please sign in to comment.