From b4db39243eac9ebc1caab0e652a7f86f9916523f Mon Sep 17 00:00:00 2001 From: Marcelo Fornet Date: Wed, 12 Aug 2020 13:50:33 -0400 Subject: [PATCH] fix(network): Verify edge on separate thread (#3145) * fix(network): Verify edge on separate thread * Address comments from bowen * Fix compile error --- chain/network/src/peer.rs | 2 + chain/network/src/peer_manager.rs | 201 ++++++++++-------- chain/network/src/routing.rs | 4 + chain/network/src/types.rs | 6 + pytest/lib/peer.py | 79 ++++++- pytest/tests/spec/network/handshake.py | 3 +- pytest/tests/stress/saturate_routing_table.py | 113 ++++++++++ 7 files changed, 310 insertions(+), 98 deletions(-) create mode 100644 pytest/tests/stress/saturate_routing_table.py diff --git a/chain/network/src/peer.rs b/chain/network/src/peer.rs index 8f7957ecb88..44a99e3b4c6 100644 --- a/chain/network/src/peer.rs +++ b/chain/network/src/peer.rs @@ -609,6 +609,8 @@ impl StreamHandler, ReasonForBan>> for Peer { } }; + trace!(target: "network", "Received message: {}", peer_msg); + self.on_receive_message(); #[cfg(feature = "metric_recorder")] diff --git a/chain/network/src/peer_manager.rs b/chain/network/src/peer_manager.rs index 44fcfd2edd0..728fbfb5af8 100644 --- a/chain/network/src/peer_manager.rs +++ b/chain/network/src/peer_manager.rs @@ -10,7 +10,7 @@ use actix::actors::resolver::{ConnectAddr, Resolver}; use actix::io::FramedWrite; use actix::{ Actor, ActorFuture, Addr, Arbiter, AsyncContext, Context, ContextFutureSpawner, Handler, - Recipient, Running, StreamHandler, SystemService, WrapFuture, + Recipient, Running, StreamHandler, SyncArbiter, SyncContext, SystemService, WrapFuture, }; use chrono::Utc; use futures::task::Poll; @@ -41,8 +41,8 @@ use crate::types::{ RoutedMessageFrom, SendMessage, SyncData, Unregister, }; use crate::types::{ - KnownPeerState, NetworkClientMessages, NetworkConfig, NetworkRequests, NetworkResponses, - PeerInfo, + EdgeList, KnownPeerState, NetworkClientMessages, NetworkConfig, NetworkRequests, + NetworkResponses, PeerInfo, }; use metrics::NetworkMetrics; @@ -84,6 +84,20 @@ struct ActivePeer { peer_type: PeerType, } +struct EdgeVerifier {} + +impl Actor for EdgeVerifier { + type Context = SyncContext; +} + +impl Handler for EdgeVerifier { + type Result = bool; + + fn handle(&mut self, msg: EdgeList, _ctx: &mut Self::Context) -> Self::Result { + msg.0.iter().all(|edge| edge.verify()) + } +} + /// Actor that manages peers connections. pub struct PeerManagerActor { /// Networking configuration. @@ -111,6 +125,7 @@ pub struct PeerManagerActor { /// Store all collected metrics from a node. #[cfg(feature = "metric_recorder")] metric_recorder: MetricRecorder, + edge_verifier_pool: Addr, } impl PeerManagerActor { @@ -124,6 +139,8 @@ impl PeerManagerActor { debug!(target: "network", "Found known peers: {} (boot nodes={})", peer_store.len(), config.boot_nodes.len()); debug!(target: "network", "Blacklist: {:?}", config.blacklist); + let edge_verifier_pool = SyncArbiter::start(4, || EdgeVerifier {}); + let me: PeerId = config.public_key.clone().into(); let routing_table = RoutingTable::new(me.clone(), store); @@ -142,6 +159,7 @@ impl PeerManagerActor { monitor_peers_attempts: 0, pending_update_nonce_request: HashMap::new(), network_metrics: NetworkMetrics::new(), + edge_verifier_pool, #[cfg(feature = "metric_recorder")] metric_recorder, }) @@ -1212,104 +1230,109 @@ impl Handler for PeerManagerActor { // Process edges and add new edges to the routing table. Also broadcast new edges. let SyncData { edges, accounts } = sync_data; - if !edges.iter().all(|edge| edge.verify()) { - return NetworkResponses::BanPeer(ReasonForBan::InvalidEdge); - } - - // Filter known accounts before validating them. - let new_accounts = accounts - .into_iter() - .filter_map(|announce_account| { - if let Some(current_announce_account) = - self.routing_table.get_announce(&announce_account.account_id) - { - if announce_account.epoch_id == current_announce_account.epoch_id { - None - } else { - Some((announce_account, Some(current_announce_account.epoch_id))) - } - } else { - Some((announce_account, None)) - } - }) - .collect(); - - // Ask client to validate accounts before accepting them. - self.view_client_addr - .send(NetworkViewClientMessages::AnnounceAccount(new_accounts)) + self.edge_verifier_pool.send(EdgeList(edges.clone())) .into_actor(self) .then(move |response, act, ctx| { match response { - Ok(NetworkViewClientResponses::Ban { ban_reason }) => { - if let Some(active_peer) = act.active_peers.get(&peer_id) { - active_peer.addr.do_send(PeerManagerRequest::BanPeer(ban_reason)); - } - } - Ok(NetworkViewClientResponses::AnnounceAccount(accounts)) => { - // Filter known edges. - let me = act.peer_id.clone(); - - let new_edges: Vec<_> = edges + Ok(false) => act.try_ban_peer(ctx, &peer_id, ReasonForBan::InvalidEdge), + Ok(true) => { + // Filter known accounts before validating them. + let new_accounts = accounts .into_iter() - .filter( |edge| { - if let Some(cur_edge) = act.routing_table.get_edge(edge.peer0.clone(), edge.peer1.clone()){ - if cur_edge.nonce >= edge.nonce { - // We have newer update. Drop this. - return false; - } - } - // Add new edge update to the routing table. - act.process_edge(ctx,edge.clone()); - if let Some(other) = edge.other(&me) { - // We belong to this edge. - return if act.active_peers.contains_key(&other) { - // This is an active connection. - match edge.edge_type() { - EdgeType::Added => true, - EdgeType::Removed => { - // Try to update the nonce, and in case it fails removes the peer. - act.try_update_nonce(ctx, edge.clone(), other); - false - } - } + .filter_map(|announce_account| { + if let Some(current_announce_account) = + act.routing_table.get_announce(&announce_account.account_id) + { + if announce_account.epoch_id == current_announce_account.epoch_id { + None } else { - match edge.edge_type() { - EdgeType::Added => { - act.wait_peer_or_remove(ctx, edge.clone()); - false - } - EdgeType::Removed => true - } - }; + Some((announce_account, Some(current_announce_account.epoch_id))) + } } else { - - true + Some((announce_account, None)) } - }) .collect(); - // Add accounts to the routing table. - if !accounts.is_empty() { - debug!(target: "network", "{:?} Received new accounts: {:?}", act.config.account_id, accounts); - } - for account in accounts.iter() { - act.routing_table.add_account(account.clone()); - } + // Ask client to validate accounts before accepting them. + act.view_client_addr + .send(NetworkViewClientMessages::AnnounceAccount(new_accounts)) + .into_actor(act) + .then(move |response, act, ctx| { + match response { + Ok(NetworkViewClientResponses::Ban { ban_reason }) => { + act.try_ban_peer(ctx, &peer_id, ban_reason); + } + Ok(NetworkViewClientResponses::AnnounceAccount(accounts)) => { + // Filter known edges. + let me = act.peer_id.clone(); + + let new_edges: Vec<_> = edges + .into_iter() + .filter( |edge| { + if let Some(cur_edge) = act.routing_table.get_edge(edge.peer0.clone(), edge.peer1.clone()){ + if cur_edge.nonce >= edge.nonce { + // We have newer update. Drop this. + return false; + } + } + // Add new edge update to the routing table. + act.process_edge(ctx,edge.clone()); + if let Some(other) = edge.other(&me) { + // We belong to this edge. + if act.active_peers.contains_key(&other) { + // This is an active connection. + match edge.edge_type() { + EdgeType::Added => true, + EdgeType::Removed => { + // Try to update the nonce, and in case it fails removes the peer. + act.try_update_nonce(ctx, edge.clone(), other); + false + } + } + } else { + match edge.edge_type() { + EdgeType::Added => { + act.wait_peer_or_remove(ctx, edge.clone()); + false + } + EdgeType::Removed => true + } + } + } else { + true + } - let new_data = SyncData { edges: new_edges, accounts }; + }) + .collect(); - if !new_data.is_empty() { - act.broadcast_message( - ctx, - SendMessage { message: PeerMessage::RoutingTableSync(new_data) }, - ) - }; - } - _ => { - debug!(target: "network", "Received invalid account confirmation from client."); + // Add accounts to the routing table. + if !accounts.is_empty() { + debug!(target: "network", "{:?} Received new accounts: {:?}", act.config.account_id, accounts); + } + for account in accounts.iter() { + act.routing_table.add_account(account.clone()); + } + + let new_data = SyncData { edges: new_edges, accounts }; + + if !new_data.is_empty() { + act.broadcast_message( + ctx, + SendMessage { message: PeerMessage::RoutingTableSync(new_data) }, + ) + }; + } + _ => { + debug!(target: "network", "Received invalid account confirmation from client."); + } + } + actix::fut::ready(()) + }) + .spawn(ctx); + } + Err(err) => warn!(target: "network", "error validating edges: {}", err), } - } actix::fut::ready(()) }) .spawn(ctx); @@ -1628,6 +1651,8 @@ impl Handler for PeerManagerActor { impl Handler for PeerManagerActor { type Result = (); fn handle(&mut self, msg: PeerMessageMetadata, _ctx: &mut Self::Context) -> Self::Result { + #[cfg(feature = "delay_detector")] + let _d = DelayDetector::new("peer message metadata".into()); self.metric_recorder.handle_peer_message(msg); } } diff --git a/chain/network/src/routing.rs b/chain/network/src/routing.rs index f855e411920..8c633318b4f 100644 --- a/chain/network/src/routing.rs +++ b/chain/network/src/routing.rs @@ -28,6 +28,8 @@ use crate::{ types::{PeerIdOrHash, Ping, Pong}, utils::cache_to_hashmap, }; +#[cfg(feature = "delay_detector")] +use delay_detector::DelayDetector; const ANNOUNCE_ACCOUNT_CACHE_SIZE: usize = 10_000; const ROUTE_BACK_CACHE_SIZE: u64 = 1_000_000; @@ -671,6 +673,8 @@ impl RoutingTable { /// Recalculate routing table. pub fn update(&mut self) { + #[cfg(feature = "delay_detector")] + let _d = DelayDetector::new("routing table update".into()); let _routing_table_recalculation = near_metrics::start_timer(&metrics::ROUTING_TABLE_RECALCULATION_HISTOGRAM); diff --git a/chain/network/src/types.rs b/chain/network/src/types.rs index dfcfa3563b9..6cff11b7a02 100644 --- a/chain/network/src/types.rs +++ b/chain/network/src/types.rs @@ -923,6 +923,12 @@ pub enum PeerManagerRequest { UnregisterPeer, } +pub struct EdgeList(pub Vec); + +impl Message for EdgeList { + type Result = bool; +} + /// Combines peer address info, chain and edge information. #[derive(Debug, Clone, Eq, PartialEq)] pub struct FullPeerInfo { diff --git a/pytest/lib/peer.py b/pytest/lib/peer.py index f8ae22f15a3..fbf880175c0 100644 --- a/pytest/lib/peer.py +++ b/pytest/lib/peer.py @@ -1,4 +1,5 @@ import asyncio +import concurrent import hashlib import struct @@ -6,7 +7,7 @@ from messages import schema from messages.crypto import PublicKey, Signature from messages.network import (EdgeInfo, GenesisId, Handshake, PeerChainInfo, - PeerMessage) + PeerMessage, RoutedMessage, PeerIdOrHash) from serializer import BinarySerializer from nacl.signing import SigningKey from typing import Optional @@ -15,9 +16,12 @@ class Connection: - def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter): + + def __init__(self, reader: asyncio.StreamReader, + writer: asyncio.StreamWriter): self.reader = reader self.writer = writer + self.is_closed = False async def send(self, message): raw_message = BinarySerializer(schema).serialize(message) @@ -29,20 +33,39 @@ async def send_raw(self, raw_message): self.writer.write(raw_message) await self.writer.drain() + # returns None on timeout async def recv(self, expected=None): while True: response_raw = await self.recv_raw() + + # Connection was closed on the other side + if response_raw is None: + return None + response = BinarySerializer(schema).deserialize( response_raw, PeerMessage) - if expected is None or response.enum == expected: + if expected is None or response.enum == expected or ( + callable(expected) and expected(response)): return response async def recv_raw(self): length = await self.reader.read(4) - length = struct.unpack('I', length)[0] - response = await self.reader.read(length) - return response + + if len(length) == 0: + self.is_closed = True + return None + else: + length = struct.unpack('I', length)[0] + response = b'' + + while len(response) < length: + response += await self.reader.read(length - len(response)) + if len(response) < length: + print(f"Downloading message {len(response)}/{length}") + + return response + async def close(self): self.writer.close() @@ -63,7 +86,10 @@ async def connect(addr) -> Connection: return conn -def create_handshake(my_key_pair_nacl, their_pk_serialized, listen_port, version=0): +def create_handshake(my_key_pair_nacl, + their_pk_serialized, + listen_port, + version=0): """ Create handshake message but with placeholders in: - version @@ -126,7 +152,10 @@ def sign_handshake(my_key_pair_nacl, handshake): hashlib.sha256(arr).digest()).signature -async def run_handshake(conn: Connection, target_public_key: PublicKey, key_pair: SigningKey, listen_port=12345): +async def run_handshake(conn: Connection, + target_public_key: PublicKey, + key_pair: SigningKey, + listen_port=12345): handshake = create_handshake(key_pair, target_public_key, listen_port) sign_handshake(key_pair, handshake.Handshake) @@ -143,9 +172,41 @@ async def run_handshake(conn: Connection, target_public_key: PublicKey, key_pair if response.enum == 'HandshakeFailure' and response.HandshakeFailure[1].enum == 'ProtocolVersionMismatch': pvm = response.HandshakeFailure[1].ProtocolVersionMismatch + print(pvm) handshake.Handshake.version = pvm sign_handshake(key_pair, handshake.Handshake) await conn.send(handshake) response = await conn.recv() - assert response.enum == 'Handshake', response.enum + assert response.enum == 'Handshake', response.enum if response.enum != 'HandshakeFailure' else response.HandshakeFailure[1].enum + + +def create_and_sign_routed_peer_message(routed_msg_body, target_node, + my_key_pair_nacl): + routed_msg = RoutedMessage() + routed_msg.target = PeerIdOrHash() + routed_msg.target.enum = 'PeerId' + routed_msg.target.PeerId = PublicKey() + routed_msg.target.PeerId.keyType = 0 + routed_msg.target.PeerId.data = base58.b58decode( + target_node.node_key.pk[len(ED_PREFIX):]) + routed_msg.author = PublicKey() + routed_msg.author.keyType = 0 + routed_msg.author.data = bytes(my_key_pair_nacl.verify_key) + routed_msg.ttl = 100 + routed_msg.body = routed_msg_body + routed_msg.signature = Signature() + routed_msg.signature.keyType = 0 + + routed_msg_arr = bytes( + bytearray([0, 0]) + routed_msg.target.PeerId.data + bytearray([0]) + + routed_msg.author.data + + BinarySerializer(schema).serialize(routed_msg.body)) + routed_msg_hash = hashlib.sha256(routed_msg_arr).digest() + routed_msg.signature.data = my_key_pair_nacl.sign(routed_msg_hash).signature + + peer_message = PeerMessage() + peer_message.enum = 'Routed' + peer_message.Routed = routed_msg + + return peer_message diff --git a/pytest/tests/spec/network/handshake.py b/pytest/tests/spec/network/handshake.py index 6076ca0eea5..78e4619d7a3 100644 --- a/pytest/tests/spec/network/handshake.py +++ b/pytest/tests/spec/network/handshake.py @@ -66,8 +66,9 @@ async def main(): assert response.Handshake.target_peer_id.keyType == 0 assert response.Handshake.target_peer_id.data == bytes( my_key_pair_nacl.verify_key) - assert response.Handshake.version == handshake.Handshake.version assert response.Handshake.listen_port == nodes[0].addr()[1] + # TODO(#3016): Bring this assert back + # assert response.Handshake.version == handshake.Handshake.version asyncio.run(main()) diff --git a/pytest/tests/stress/saturate_routing_table.py b/pytest/tests/stress/saturate_routing_table.py new file mode 100644 index 00000000000..b1da594b748 --- /dev/null +++ b/pytest/tests/stress/saturate_routing_table.py @@ -0,0 +1,113 @@ +""" +Saturate routing table with edges. + +Spin a node and connect to it with the python client. Fake several identities in the network and send +edges to the node. Measure the impact of these messages in the node with respect to the time the PeerManagerActor +is blocked. +""" +import asyncio +import socket +import sys +import time +import struct +import hashlib + +sys.path.append('lib') + +import nacl.signing +from cluster import start_cluster +from peer import ED_PREFIX, connect, run_handshake, create_peer_request, Connection +from utils import obj_to_string +from messages import schema +from messages.network import Edge, SyncData, PeerMessage +from messages.crypto import PublicKey, Signature +from random import randint, seed +import base58 + +seed(0) + +def key_seed(): + return bytes([randint(0, 255) for _ in range(32)]) + + +async def consume(conn: Connection): + while True: + message = await conn.recv() + + +def create_sync_data(accounts=[], edges=[]): + sync_data = SyncData() + sync_data.accounts = accounts + sync_data.edges = edges + + peer_message = PeerMessage() + peer_message.enum = 'Sync' + peer_message.Sync = sync_data + return peer_message + + +def create_edge(key0, key1, nonce): + # TODO: Create removal edge + if bytes(key1.verify_key) < bytes(key0.verify_key): + key0, key1 = key1, key0 + + edge = Edge() + edge.peer0 = PublicKey() + edge.peer0.keyType = 0 + edge.peer0.data = bytes(key0.verify_key) + + edge.peer1 = PublicKey() + edge.peer1.keyType = 0 + edge.peer1.data = bytes(key1.verify_key) + + edge.nonce = nonce + + val = bytes([0]) + bytes(edge.peer0.data) + bytes([0]) + bytes(edge.peer1.data) + struct.pack('Q', nonce) + hsh = hashlib.sha256(val).digest() + enc58 = base58.b58encode(hsh) + + edge.signature0 = Signature() + edge.signature0.keyType = 0 + edge.signature0.data = key0.sign(hashlib.sha256(val).digest()).signature + + edge.signature1 = Signature() + edge.signature1.keyType = 0 + edge.signature1.data = key1.sign(hashlib.sha256(val).digest()).signature + + edge.removal_info = None + + return edge + + +async def main(): + key_pair_0 = nacl.signing.SigningKey(key_seed()) + conn = await connect(nodes[0].addr()) + await run_handshake(conn, nodes[0].node_key.pk, key_pair_0, listen_port=12345) + + num_nodes = 100 + + def create_update(): + key_pairs = [key_pair_0] + [nacl.signing.SigningKey(key_seed()) for _ in range(num_nodes - 1)] + nonces = [[1] * num_nodes for _ in range(num_nodes)] + + edges = [] + for i in range(num_nodes): + for j in range(i): + edge = create_edge(key_pairs[i], key_pairs[j], nonces[i][j]) + edges.append(edge) + return create_sync_data(edges=edges) + + asyncio.get_event_loop().create_task(consume(conn)) + + while True: + num_nodes = randint(2, 100) + + update = create_update() + print("Sending update...") + await conn.send(update) + print("Sent...") + await asyncio.sleep(2) + + +nodes = start_cluster(1, 0, 4, None, [], {}) +asyncio.run(main())