From 31cdf4ff6465dcf4be743341d092616637b60f06 Mon Sep 17 00:00:00 2001 From: "Roman S. Borschel" Date: Fri, 3 Apr 2020 16:37:11 +0200 Subject: [PATCH] Finishing touches. --- client/network/src/debug_info.rs | 30 ++- client/network/src/lib.rs | 9 + .../src/protocol/generic_proto/behaviour.rs | 6 +- .../src/protocol/light_client_handler.rs | 213 +++++++++++------- client/network/src/service.rs | 3 +- 5 files changed, 160 insertions(+), 101 deletions(-) diff --git a/client/network/src/debug_info.rs b/client/network/src/debug_info.rs index ebd47df94049a..e2803cde35a77 100644 --- a/client/network/src/debug_info.rs +++ b/client/network/src/debug_info.rs @@ -57,14 +57,27 @@ struct NodeInfo { /// When we will remove the entry about this node from the list, or `None` if we're connected /// to the node. info_expire: Option, - /// How we're connected to the node. - endpoints: SmallVec<[ConnectedPoint; 2]>, + /// Non-empty list of connected endpoints, one per connection. + endpoints: SmallVec<[ConnectedPoint; crate::MAX_CONNECTIONS_PER_PEER]>, /// Version reported by the remote, or `None` if unknown. client_version: Option, /// Latest ping time with this node. latest_ping: Option, } +impl NodeInfo { + fn new(endpoint: ConnectedPoint) -> Self { + let mut endpoints = SmallVec::new(); + endpoints.push(endpoint); + NodeInfo { + info_expire: None, + endpoints, + client_version: None, + latest_ping: None, + } + } +} + impl DebugInfoBehaviour { /// Builds a new `DebugInfoBehaviour`. pub fn new( @@ -122,9 +135,9 @@ impl DebugInfoBehaviour { pub struct Node<'a>(&'a NodeInfo); impl<'a> Node<'a> { - /// Returns the endpoint we are connected to or were last connected to. + /// Returns the endpoint of an established connection to the peer. pub fn endpoint(&self) -> &'a ConnectedPoint { - &self.0.endpoints[0] // TODO: Multiple? + &self.0.endpoints[0] // `endpoints` are non-empty by definition } /// Returns the latest version information we know of. @@ -179,14 +192,7 @@ impl NetworkBehaviour for DebugInfoBehaviour { self.identify.inject_connection_established(peer_id, conn, endpoint); match self.nodes_info.entry(peer_id.clone()) { Entry::Vacant(e) => { - let mut endpoints = SmallVec::new(); - endpoints.push(endpoint.clone()); - e.insert(NodeInfo { - info_expire: None, - endpoints, - client_version: None, - latest_ping: None, - }); + e.insert(NodeInfo::new(endpoint.clone())); } Entry::Occupied(e) => { let e = e.into_mut(); diff --git a/client/network/src/lib.rs b/client/network/src/lib.rs index bb58f8c7bf0bc..447dbca40887b 100644 --- a/client/network/src/lib.rs +++ b/client/network/src/lib.rs @@ -255,3 +255,12 @@ pub use libp2p::{Multiaddr, PeerId}; pub use libp2p::multiaddr; pub use sc_peerset::ReputationChange; + +/// The maximum allowed number of established connections per peer. +/// +/// Typically, and by design of the network behaviours in this crate, +/// there is a single established connection per peer. However, to +/// avoid unnecessary and nondeterministic connection closure in +/// case of (possibly repeated) simultaneous dialing attempts between +/// two peers, the per-peer connection limit is not set to 1 but 2. +const MAX_CONNECTIONS_PER_PEER: usize = 2; diff --git a/client/network/src/protocol/generic_proto/behaviour.rs b/client/network/src/protocol/generic_proto/behaviour.rs index 4fc34ff9bfdab..cc61e65eeeb81 100644 --- a/client/network/src/protocol/generic_proto/behaviour.rs +++ b/client/network/src/protocol/generic_proto/behaviour.rs @@ -167,7 +167,7 @@ enum PeerState { /// We may still have ongoing traffic with that peer, but it should cease shortly. Disabled { /// The connections that are currently open for custom protocol traffic. - open: SmallVec<[ConnectionId; 2]>, + open: SmallVec<[ConnectionId; crate::MAX_CONNECTIONS_PER_PEER]>, /// If `Some`, any dial attempts to this peer are delayed until the given `Instant`. banned_until: Option, }, @@ -177,7 +177,7 @@ enum PeerState { /// but should get disconnected in a few seconds. DisabledPendingEnable { /// The connections that are currently open for custom protocol traffic. - open: SmallVec<[ConnectionId; 2]>, + open: SmallVec<[ConnectionId; crate::MAX_CONNECTIONS_PER_PEER]>, /// When to enable this remote. timer: futures_timer::Delay, /// When the `timer` will trigger. @@ -188,7 +188,7 @@ enum PeerState { /// enabled state. Enabled { /// The connections that are currently open for custom protocol traffic. - open: SmallVec<[ConnectionId; 2]>, + open: SmallVec<[ConnectionId; crate::MAX_CONNECTIONS_PER_PEER]>, }, /// We received an incoming connection from this peer and forwarded that diff --git a/client/network/src/protocol/light_client_handler.rs b/client/network/src/protocol/light_client_handler.rs index d32ee3b66d8a2..d80677e7714b5 100644 --- a/client/network/src/protocol/light_client_handler.rs +++ b/client/network/src/protocol/light_client_handler.rs @@ -241,14 +241,16 @@ struct RequestWrapper { retries: usize, /// The actual request. request: Request, - /// Peer information, e.g. `PeerId`. - peer: P + /// The peer to send the request to, e.g. `PeerId`. + peer: P, + /// The connection to use for sending the request. + connection: Option, } /// Information we have about some peer. #[derive(Debug)] struct PeerInfo { - addresses: SmallVec<[Multiaddr; 2]>, + connections: SmallVec<[(ConnectionId, Multiaddr); crate::MAX_CONNECTIONS_PER_PEER]>, best_block: Option>, status: PeerStatus, } @@ -256,20 +258,22 @@ struct PeerInfo { impl Default for PeerInfo { fn default() -> Self { PeerInfo { - addresses: SmallVec::new(), + connections: SmallVec::new(), best_block: None, status: PeerStatus::Idle, } } } +type RequestId = u64; + /// A peer is either idle or busy processing a request from us. #[derive(Debug, Clone, PartialEq, Eq)] enum PeerStatus { /// The peer is available. Idle, /// We wait for the peer to return us a response for the given request ID. - BusyWith(u64), + BusyWith(RequestId), } /// The light client handler behaviour. @@ -287,9 +291,9 @@ pub struct LightClientHandler { /// Pending (local) requests. pending_requests: VecDeque>, /// Requests on their way to remote peers. - outstanding: IntMap>, + outstanding: IntMap>, /// (Local) Request ID counter - next_request_id: u64, + next_request_id: RequestId, /// Handle to use for reporting misbehaviour of peers. peerset: sc_peerset::PeersetHandle, } @@ -337,35 +341,18 @@ where retries: retries(&req), request: req, peer: (), // we do not know the peer yet + connection: None, }; self.pending_requests.push_back(rw); Ok(()) } - fn next_request_id(&mut self) -> u64 { + fn next_request_id(&mut self) -> RequestId { let id = self.next_request_id; self.next_request_id += 1; id } - // Iterate over peers known to possess a certain block. - fn idle_peers_with_block(&mut self, num: NumberFor) -> impl Iterator + '_ { - self.peers.iter() - .filter(move |(_, info)| { - info.status == PeerStatus::Idle && info.best_block >= Some(num) - }) - .map(|(peer, _)| peer.clone()) - } - - // Iterate over peers without a known block. - fn idle_peers_with_unknown_block(&mut self) -> impl Iterator + '_ { - self.peers.iter() - .filter(|(_, info)| { - info.status == PeerStatus::Idle && info.best_block.is_none() - }) - .map(|(peer, _)| peer.clone()) - } - /// Remove the given peer. /// /// If we have a request to this peer in flight, we move it back to @@ -378,12 +365,50 @@ where retries: rw.retries, request: rw.request, peer: (), // need to find another peer + connection: None, }; self.pending_requests.push_back(rw); } self.peers.remove(peer); } + /// Prepares a request by selecting a suitable peer and connection to send it to. + /// + /// If there is currently no suitable peer for the request, the given request + /// is returned as `Err`. + fn prepare_request(&self, req: RequestWrapper) + -> Result<(PeerId, RequestWrapper), RequestWrapper> + { + let number = required_block(&req.request); + + let mut peer = None; + for (peer_id, peer_info) in self.peers.iter() { + if peer_info.status == PeerStatus::Idle { + match peer_info.best_block { + Some(n) => if n >= number { + peer = Some((peer_id, peer_info)); + break + }, + None => peer = Some((peer_id, peer_info)) + } + } + } + + if let Some((peer_id, peer_info)) = peer { + let connection = peer_info.connections.iter().next().map(|(id, _)| *id); + let rw = RequestWrapper { + timestamp: req.timestamp, + retries: req.retries, + request: req.request, + peer: peer_id.clone(), + connection, + }; + Ok((peer_id.clone(), rw)) + } else { + Err(req) + } + } + /// Process a local request's response from remote. /// /// If successful, this will give us the actual, checked data we should be @@ -744,14 +769,14 @@ where fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec { self.peers.get(peer) - .map(|info| info.addresses.to_vec()) + .map(|info| info.connections.iter().map(|(_, a)| a.clone()).collect()) .unwrap_or_default() } fn inject_connected(&mut self, peer: &PeerId) { } - fn inject_connection_established(&mut self, peer: &PeerId, _: &ConnectionId, info: &ConnectedPoint) { + fn inject_connection_established(&mut self, peer: &PeerId, conn: &ConnectionId, info: &ConnectedPoint) { let peer_address = match info { ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr.clone(), ConnectedPoint::Dialer { address } => address.clone() @@ -760,7 +785,7 @@ where log::trace!("peer {} connected with address {}", peer, peer_address); let entry = self.peers.entry(peer.clone()).or_default(); - entry.addresses.push(peer_address); + entry.connections.push((*conn, peer_address)); } fn inject_disconnected(&mut self, peer: &PeerId) { @@ -768,7 +793,7 @@ where self.remove_peer(peer) } - fn inject_connection_closed(&mut self, peer: &PeerId, _: &ConnectionId, info: &ConnectedPoint) { + fn inject_connection_closed(&mut self, peer: &PeerId, conn: &ConnectionId, info: &ConnectedPoint) { let peer_address = match info { ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr, ConnectedPoint::Dialer { address } => address @@ -777,11 +802,28 @@ where log::trace!("connection to peer {} closed: {}", peer, peer_address); if let Some(info) = self.peers.get_mut(peer) { - info.addresses.retain(|a| a != peer_address) + info.connections.retain(|(c, _)| c != conn) + } + + // Add any outstanding requests on the closed connection back to the + // pending requests. + if let Some(id) = self.outstanding.iter() + .find(|(_, rw)| &rw.peer == peer && rw.connection == Some(*conn)) // (*) + .map(|(id, _)| *id) + { + let rw = self.outstanding.remove(&id).expect("by (*)"); + let rw = RequestWrapper { + timestamp: rw.timestamp, + retries: rw.retries, + request: rw.request, + peer: (), // need to find another peer + connection: None, + }; + self.pending_requests.push_back(rw); } } - fn inject_event(&mut self, peer: PeerId, _: ConnectionId, event: Event) { + fn inject_event(&mut self, peer: PeerId, conn: ConnectionId, event: Event) { match event { // An incoming request from remote has been received. Event::Request(request, mut stream) => { @@ -827,9 +869,10 @@ where // A response to one of our own requests has been received. Event::Response(id, response) => { if let Some(request) = self.outstanding.remove(&id) { - // We first just check if the response originates from the expected peer. + // We first just check if the response originates from the expected peer + // and connection. if request.peer != peer { - log::debug!("was expecting response from {} instead of {}", request.peer, peer); + log::debug!("Expected response from {} instead of {}.", request.peer, peer); self.outstanding.insert(id, request); self.remove_peer(&peer); self.peerset.report_peer(peer, ReputationChange::new_fatal("response from unexpected peer")); @@ -861,6 +904,7 @@ where retries: request.retries, request: request.request, peer: (), + connection: None, }; self.pending_requests.push_back(rw); } @@ -874,6 +918,7 @@ where retries: request.retries - 1, request: request.request, peer: (), + connection: None, }; self.pending_requests.push_back(rw) } else { @@ -913,57 +958,54 @@ where request.timestamp = Instant::now(); request.retries -= 1 } - let number = required_block(&request.request); - let available_peer = { - let p = self.idle_peers_with_block(number).next(); - if p.is_none() { - self.idle_peers_with_unknown_block().next() - } else { - p + + + match self.prepare_request(request) { + Err(request) => { + self.pending_requests.push_front(request); + log::debug!("no peer available to send request to"); + break } - }; - if let Some(peer) = available_peer { - let buf = match serialize_request(&request.request) { - Ok(b) => b, - Err(e) => { - log::debug!("failed to serialize request: {}", e); - send_reply(Err(ClientError::RemoteFetchFailed), request.request); - continue; - } - }; + Ok((peer, request)) => { + let request_bytes = match serialize_request(&request.request) { + Ok(bytes) => bytes, + Err(error) => { + log::debug!("failed to serialize request: {}", error); + send_reply(Err(ClientError::RemoteFetchFailed), request.request); + continue + } + }; - let id = self.next_request_id(); - log::trace!("sending request {} to peer {}", id, peer); - let protocol = OutboundProtocol { - request: buf, - request_id: id, - expected: match request.request { - Request::Body { .. } => ExpectedResponseTy::Block, - _ => ExpectedResponseTy::Light, - }, - max_response_size: self.config.max_response_size, - protocol: match request.request { - Request::Body { .. } => self.config.block_protocol.clone(), - _ => self.config.light_protocol.clone(), - }, - }; - self.peers.get_mut(&peer).map(|info| info.status = PeerStatus::BusyWith(id)); - let rw = RequestWrapper { - timestamp: request.timestamp, - retries: request.retries, - request: request.request, - peer: peer.clone(), - }; - self.outstanding.insert(id, rw); - return Poll::Ready(NetworkBehaviourAction::NotifyHandler { - handler: NotifyHandler::Any, - peer_id: peer, - event: protocol - }) - } else { - self.pending_requests.push_front(request); - log::debug!("no peer available to send request to"); - break + let (expected, protocol) = match request.request { + Request::Body { .. } => + (ExpectedResponseTy::Block, self.config.block_protocol.clone()), + _ => + (ExpectedResponseTy::Light, self.config.light_protocol.clone()), + }; + + let peer_id = peer.clone(); + let handler = request.connection.map_or(NotifyHandler::Any, NotifyHandler::One); + + let request_id = self.next_request_id(); + self.peers.get_mut(&peer).map(|p| p.status = PeerStatus::BusyWith(request_id)); + self.outstanding.insert(request_id, request); + + let event = OutboundProtocol { + request_id, + request: request_bytes, + expected, + max_response_size: self.config.max_response_size, + protocol, + }; + + log::trace!("sending request {} to peer {}", request_id, peer_id); + + return Poll::Ready(NetworkBehaviourAction::NotifyHandler { + peer_id, + handler, + event, + }) + } } } @@ -989,6 +1031,7 @@ where retries: rw.retries - 1, request: rw.request, peer: (), + connection: None, }; self.pending_requests.push_back(rw) } @@ -1127,7 +1170,7 @@ pub enum Event { /// Incoming request from remote and substream to use for the response. Request(api::v1::light::Request, T), /// Incoming response from remote. - Response(u64, Response), + Response(RequestId, Response), } /// Incoming response from remote. @@ -1187,7 +1230,7 @@ pub struct OutboundProtocol { /// The serialized protobuf request. request: Vec, /// Local identifier for the request. Used to associate it with a response. - request_id: u64, + request_id: RequestId, /// Kind of response expected for this request. expected: ExpectedResponseTy, /// The max. response length in bytes. diff --git a/client/network/src/service.rs b/client/network/src/service.rs index 9acd6b4f7f000..e23655adc0f48 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -308,7 +308,8 @@ impl NetworkWorker { }; transport::build_transport(local_identity, config_mem, config_wasm, flowctrl) }; - let mut builder = SwarmBuilder::new(transport, behaviour, local_peer_id.clone()); + let mut builder = SwarmBuilder::new(transport, behaviour, local_peer_id.clone()) + .peer_connection_limit(crate::MAX_CONNECTIONS_PER_PEER); // TODO: Connection limits if let Some(spawner) = params.executor { struct SpawnImpl(F);