Skip to content

Commit

Permalink
fix(network): Verify edge on separate thread (#3145)
Browse files Browse the repository at this point in the history
* fix(network): Verify edge on separate thread

* Address comments from bowen

* Fix compile error
  • Loading branch information
mfornet authored and bowenwang1996 committed Aug 13, 2020
1 parent c1a88ae commit b4db392
Show file tree
Hide file tree
Showing 7 changed files with 310 additions and 98 deletions.
2 changes: 2 additions & 0 deletions chain/network/src/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,8 @@ impl StreamHandler<Result<Vec<u8>, ReasonForBan>> for Peer {
}
};

trace!(target: "network", "Received message: {}", peer_msg);

self.on_receive_message();

#[cfg(feature = "metric_recorder")]
Expand Down
201 changes: 113 additions & 88 deletions chain/network/src/peer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use actix::actors::resolver::{ConnectAddr, Resolver};
use actix::io::FramedWrite;
use actix::{
Actor, ActorFuture, Addr, Arbiter, AsyncContext, Context, ContextFutureSpawner, Handler,
Recipient, Running, StreamHandler, SystemService, WrapFuture,
Recipient, Running, StreamHandler, SyncArbiter, SyncContext, SystemService, WrapFuture,
};
use chrono::Utc;
use futures::task::Poll;
Expand Down Expand Up @@ -41,8 +41,8 @@ use crate::types::{
RoutedMessageFrom, SendMessage, SyncData, Unregister,
};
use crate::types::{
KnownPeerState, NetworkClientMessages, NetworkConfig, NetworkRequests, NetworkResponses,
PeerInfo,
EdgeList, KnownPeerState, NetworkClientMessages, NetworkConfig, NetworkRequests,
NetworkResponses, PeerInfo,
};
use metrics::NetworkMetrics;

Expand Down Expand Up @@ -84,6 +84,20 @@ struct ActivePeer {
peer_type: PeerType,
}

struct EdgeVerifier {}

impl Actor for EdgeVerifier {
type Context = SyncContext<Self>;
}

impl Handler<EdgeList> for EdgeVerifier {
type Result = bool;

fn handle(&mut self, msg: EdgeList, _ctx: &mut Self::Context) -> Self::Result {
msg.0.iter().all(|edge| edge.verify())
}
}

/// Actor that manages peers connections.
pub struct PeerManagerActor {
/// Networking configuration.
Expand Down Expand Up @@ -111,6 +125,7 @@ pub struct PeerManagerActor {
/// Store all collected metrics from a node.
#[cfg(feature = "metric_recorder")]
metric_recorder: MetricRecorder,
edge_verifier_pool: Addr<EdgeVerifier>,
}

impl PeerManagerActor {
Expand All @@ -124,6 +139,8 @@ impl PeerManagerActor {
debug!(target: "network", "Found known peers: {} (boot nodes={})", peer_store.len(), config.boot_nodes.len());
debug!(target: "network", "Blacklist: {:?}", config.blacklist);

let edge_verifier_pool = SyncArbiter::start(4, || EdgeVerifier {});

let me: PeerId = config.public_key.clone().into();
let routing_table = RoutingTable::new(me.clone(), store);

Expand All @@ -142,6 +159,7 @@ impl PeerManagerActor {
monitor_peers_attempts: 0,
pending_update_nonce_request: HashMap::new(),
network_metrics: NetworkMetrics::new(),
edge_verifier_pool,
#[cfg(feature = "metric_recorder")]
metric_recorder,
})
Expand Down Expand Up @@ -1212,104 +1230,109 @@ impl Handler<NetworkRequests> for PeerManagerActor {
// Process edges and add new edges to the routing table. Also broadcast new edges.
let SyncData { edges, accounts } = sync_data;

if !edges.iter().all(|edge| edge.verify()) {
return NetworkResponses::BanPeer(ReasonForBan::InvalidEdge);
}

// Filter known accounts before validating them.
let new_accounts = accounts
.into_iter()
.filter_map(|announce_account| {
if let Some(current_announce_account) =
self.routing_table.get_announce(&announce_account.account_id)
{
if announce_account.epoch_id == current_announce_account.epoch_id {
None
} else {
Some((announce_account, Some(current_announce_account.epoch_id)))
}
} else {
Some((announce_account, None))
}
})
.collect();

// Ask client to validate accounts before accepting them.
self.view_client_addr
.send(NetworkViewClientMessages::AnnounceAccount(new_accounts))
self.edge_verifier_pool.send(EdgeList(edges.clone()))
.into_actor(self)
.then(move |response, act, ctx| {
match response {
Ok(NetworkViewClientResponses::Ban { ban_reason }) => {
if let Some(active_peer) = act.active_peers.get(&peer_id) {
active_peer.addr.do_send(PeerManagerRequest::BanPeer(ban_reason));
}
}
Ok(NetworkViewClientResponses::AnnounceAccount(accounts)) => {
// Filter known edges.
let me = act.peer_id.clone();

let new_edges: Vec<_> = edges
Ok(false) => act.try_ban_peer(ctx, &peer_id, ReasonForBan::InvalidEdge),
Ok(true) => {
// Filter known accounts before validating them.
let new_accounts = accounts
.into_iter()
.filter( |edge| {
if let Some(cur_edge) = act.routing_table.get_edge(edge.peer0.clone(), edge.peer1.clone()){
if cur_edge.nonce >= edge.nonce {
// We have newer update. Drop this.
return false;
}
}
// Add new edge update to the routing table.
act.process_edge(ctx,edge.clone());
if let Some(other) = edge.other(&me) {
// We belong to this edge.
return if act.active_peers.contains_key(&other) {
// This is an active connection.
match edge.edge_type() {
EdgeType::Added => true,
EdgeType::Removed => {
// Try to update the nonce, and in case it fails removes the peer.
act.try_update_nonce(ctx, edge.clone(), other);
false
}
}
.filter_map(|announce_account| {
if let Some(current_announce_account) =
act.routing_table.get_announce(&announce_account.account_id)
{
if announce_account.epoch_id == current_announce_account.epoch_id {
None
} else {
match edge.edge_type() {
EdgeType::Added => {
act.wait_peer_or_remove(ctx, edge.clone());
false
}
EdgeType::Removed => true
}
};
Some((announce_account, Some(current_announce_account.epoch_id)))
}
} else {

true
Some((announce_account, None))
}

})
.collect();

// Add accounts to the routing table.
if !accounts.is_empty() {
debug!(target: "network", "{:?} Received new accounts: {:?}", act.config.account_id, accounts);
}
for account in accounts.iter() {
act.routing_table.add_account(account.clone());
}
// Ask client to validate accounts before accepting them.
act.view_client_addr
.send(NetworkViewClientMessages::AnnounceAccount(new_accounts))
.into_actor(act)
.then(move |response, act, ctx| {
match response {
Ok(NetworkViewClientResponses::Ban { ban_reason }) => {
act.try_ban_peer(ctx, &peer_id, ban_reason);
}
Ok(NetworkViewClientResponses::AnnounceAccount(accounts)) => {
// Filter known edges.
let me = act.peer_id.clone();

let new_edges: Vec<_> = edges
.into_iter()
.filter( |edge| {
if let Some(cur_edge) = act.routing_table.get_edge(edge.peer0.clone(), edge.peer1.clone()){
if cur_edge.nonce >= edge.nonce {
// We have newer update. Drop this.
return false;
}
}
// Add new edge update to the routing table.
act.process_edge(ctx,edge.clone());
if let Some(other) = edge.other(&me) {
// We belong to this edge.
if act.active_peers.contains_key(&other) {
// This is an active connection.
match edge.edge_type() {
EdgeType::Added => true,
EdgeType::Removed => {
// Try to update the nonce, and in case it fails removes the peer.
act.try_update_nonce(ctx, edge.clone(), other);
false
}
}
} else {
match edge.edge_type() {
EdgeType::Added => {
act.wait_peer_or_remove(ctx, edge.clone());
false
}
EdgeType::Removed => true
}
}
} else {
true
}

let new_data = SyncData { edges: new_edges, accounts };
})
.collect();

if !new_data.is_empty() {
act.broadcast_message(
ctx,
SendMessage { message: PeerMessage::RoutingTableSync(new_data) },
)
};
}
_ => {
debug!(target: "network", "Received invalid account confirmation from client.");
// Add accounts to the routing table.
if !accounts.is_empty() {
debug!(target: "network", "{:?} Received new accounts: {:?}", act.config.account_id, accounts);
}
for account in accounts.iter() {
act.routing_table.add_account(account.clone());
}

let new_data = SyncData { edges: new_edges, accounts };

if !new_data.is_empty() {
act.broadcast_message(
ctx,
SendMessage { message: PeerMessage::RoutingTableSync(new_data) },
)
};
}
_ => {
debug!(target: "network", "Received invalid account confirmation from client.");
}
}
actix::fut::ready(())
})
.spawn(ctx);
}
Err(err) => warn!(target: "network", "error validating edges: {}", err),
}
}
actix::fut::ready(())
})
.spawn(ctx);
Expand Down Expand Up @@ -1628,6 +1651,8 @@ impl Handler<PeerRequest> for PeerManagerActor {
impl Handler<PeerMessageMetadata> for PeerManagerActor {
type Result = ();
fn handle(&mut self, msg: PeerMessageMetadata, _ctx: &mut Self::Context) -> Self::Result {
#[cfg(feature = "delay_detector")]
let _d = DelayDetector::new("peer message metadata".into());
self.metric_recorder.handle_peer_message(msg);
}
}
4 changes: 4 additions & 0 deletions chain/network/src/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ use crate::{
types::{PeerIdOrHash, Ping, Pong},
utils::cache_to_hashmap,
};
#[cfg(feature = "delay_detector")]
use delay_detector::DelayDetector;

const ANNOUNCE_ACCOUNT_CACHE_SIZE: usize = 10_000;
const ROUTE_BACK_CACHE_SIZE: u64 = 1_000_000;
Expand Down Expand Up @@ -671,6 +673,8 @@ impl RoutingTable {

/// Recalculate routing table.
pub fn update(&mut self) {
#[cfg(feature = "delay_detector")]
let _d = DelayDetector::new("routing table update".into());
let _routing_table_recalculation =
near_metrics::start_timer(&metrics::ROUTING_TABLE_RECALCULATION_HISTOGRAM);

Expand Down
6 changes: 6 additions & 0 deletions chain/network/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -923,6 +923,12 @@ pub enum PeerManagerRequest {
UnregisterPeer,
}

pub struct EdgeList(pub Vec<Edge>);

impl Message for EdgeList {
type Result = bool;
}

/// Combines peer address info, chain and edge information.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct FullPeerInfo {
Expand Down
Loading

0 comments on commit b4db392

Please sign in to comment.