diff --git a/client/network/src/behaviour.rs b/client/network/src/behaviour.rs index 2394d22319e25..dec8788f3f4dc 100644 --- a/client/network/src/behaviour.rs +++ b/client/network/src/behaviour.rs @@ -311,13 +311,13 @@ impl NetworkBehaviourEventProcess { + block_requests::Event::Response { peer, original_request: _, response, request_duration } => { self.events.push_back(BehaviourOut::RequestFinished { peer: peer.clone(), protocol: self.block_requests.protocol_name().to_vec(), request_duration, }); - let ev = self.substrate.on_block_response(peer, original_request, response); + let ev = self.substrate.on_block_response(peer, response); self.inject_event(ev); } block_requests::Event::RequestCancelled { peer, request_duration, .. } | diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index 421035e5705be..8a71494d8294d 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -548,30 +548,6 @@ impl Protocol { self.sync.update_chain_info(&info.best_hash, info.best_number); } - /// Accepts a response from the legacy substream and determines what the corresponding - /// request was. - fn handle_response( - &mut self, - who: PeerId, - response: &message::BlockResponse - ) -> Option> { - if let Some(ref mut peer) = self.context_data.peers.get_mut(&who) { - if peer.obsolete_requests.remove(&response.id).is_some() { - trace!(target: "sync", "Ignoring obsolete block response packet from {} ({})", who, response.id); - return None; - } - // Clear the request. If the response is invalid peer will be disconnected anyway. - let request = peer.block_request.take(); - if request.as_ref().map_or(false, |(_, r)| r.id == response.id) { - return request.map(|(_, r)| r) - } - trace!(target: "sync", "Unexpected response packet from {} ({})", who, response.id); - self.peerset_handle.report_peer(who.clone(), rep::UNEXPECTED_RESPONSE); - self.behaviour.disconnect_peer(&who); - } - None - } - fn update_peer_info(&mut self, who: &PeerId) { if let Some(info) = self.sync.peer_info(who) { if let Some(ref mut peer) = self.context_data.peers.get_mut(who) { @@ -609,11 +585,9 @@ impl Protocol { GenericMessage::Status(s) => return self.on_status_message(who, s), GenericMessage::BlockRequest(r) => self.on_block_request(who, r), GenericMessage::BlockResponse(r) => { - if let Some(request) = self.handle_response(who.clone(), &r) { - let outcome = self.on_block_response(who.clone(), request, r); - self.update_peer_info(&who); - return outcome - } + let outcome = self.on_block_response(who.clone(), r); + self.update_peer_info(&who); + return outcome }, GenericMessage::BlockAnnounce(announce) => { let outcome = self.on_block_announce(who.clone(), announce); @@ -705,6 +679,10 @@ impl Protocol { ); } + fn update_peer_request(&mut self, who: &PeerId, request: &mut message::BlockRequest) { + update_peer_request::(&mut self.context_data.peers, who, request) + } + /// Called when a new peer is connected pub fn on_peer_connected(&mut self, who: PeerId) { trace!(target: "sync", "Connecting {}", who); @@ -844,9 +822,34 @@ impl Protocol { pub fn on_block_response( &mut self, peer: PeerId, - request: message::BlockRequest, response: message::BlockResponse, ) -> CustomMessageOutcome { + let request = if let Some(ref mut p) = self.context_data.peers.get_mut(&peer) { + if p.obsolete_requests.remove(&response.id).is_some() { + trace!(target: "sync", "Ignoring obsolete block response packet from {} ({})", peer, response.id); + return CustomMessageOutcome::None; + } + // Clear the request. If the response is invalid peer will be disconnected anyway. + match p.block_request.take() { + Some((_, request)) if request.id == response.id => request, + Some(_) => { + trace!(target: "sync", "Ignoring obsolete block response packet from {} ({})", peer, response.id); + return CustomMessageOutcome::None; + } + None => { + trace!(target: "sync", "Unexpected response packet from unknown peer {}", peer); + self.behaviour.disconnect_peer(&peer); + self.peerset_handle.report_peer(peer, rep::UNEXPECTED_RESPONSE); + return CustomMessageOutcome::None; + } + } + } else { + trace!(target: "sync", "Unexpected response packet from unknown peer {}", peer); + self.behaviour.disconnect_peer(&peer); + self.peerset_handle.report_peer(peer, rep::UNEXPECTED_RESPONSE); + return CustomMessageOutcome::None; + }; + let blocks_range = || match ( response.blocks.first().and_then(|b| b.header.as_ref().map(|h| h.number())), response.blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())), @@ -891,8 +894,9 @@ impl Protocol { match self.sync.on_block_data(&peer, Some(request), response) { Ok(sync::OnBlockData::Import(origin, blocks)) => CustomMessageOutcome::BlockImport(origin, blocks), - Ok(sync::OnBlockData::Request(peer, req)) => { + Ok(sync::OnBlockData::Request(peer, mut req)) => { if self.use_new_block_requests_protocol { + self.update_peer_request(&peer, &mut req); CustomMessageOutcome::BlockRequest { target: peer, request: req, @@ -1076,8 +1080,9 @@ impl Protocol { if info.roles.is_full() { match self.sync.new_peer(who.clone(), info.best_hash, info.best_number) { Ok(None) => (), - Ok(Some(req)) => { + Ok(Some(mut req)) => { if self.use_new_block_requests_protocol { + self.update_peer_request(&who, &mut req); self.pending_messages.push_back(CustomMessageOutcome::BlockRequest { target: who.clone(), request: req, @@ -1413,8 +1418,9 @@ impl Protocol { Ok(sync::OnBlockData::Import(origin, blocks)) => { CustomMessageOutcome::BlockImport(origin, blocks) }, - Ok(sync::OnBlockData::Request(peer, req)) => { + Ok(sync::OnBlockData::Request(peer, mut req)) => { if self.use_new_block_requests_protocol { + self.update_peer_request(&peer, &mut req); CustomMessageOutcome::BlockRequest { target: peer, request: req, @@ -1520,8 +1526,9 @@ impl Protocol { ); for result in results { match result { - Ok((id, req)) => { + Ok((id, mut req)) => { if self.use_new_block_requests_protocol { + update_peer_request(&mut self.context_data.peers, &id, &mut req); self.pending_messages.push_back(CustomMessageOutcome::BlockRequest { target: id, request: req, @@ -1935,6 +1942,22 @@ fn send_request( send_message::(behaviour, stats, who, None, message) } +fn update_peer_request( + peers: &mut HashMap>, + who: &PeerId, + request: &mut message::BlockRequest, +) { + if let Some(ref mut peer) = peers.get_mut(who) { + request.id = peer.next_request_id; + peer.next_request_id += 1; + if let Some((timestamp, request)) = peer.block_request.take() { + trace!(target: "sync", "Request {} for {} is now obsolete.", request.id, who); + peer.obsolete_requests.insert(request.id, timestamp); + } + peer.block_request = Some((Instant::now(), request.clone())); + } +} + fn send_message( behaviour: &mut GenericProto, stats: &mut HashMap<&'static str, PacketStats>, @@ -2012,8 +2035,9 @@ impl NetworkBehaviour for Protocol { self.propagate_extrinsics(); } - for (id, r) in self.sync.block_requests() { + for (id, mut r) in self.sync.block_requests() { if self.use_new_block_requests_protocol { + update_peer_request(&mut self.context_data.peers, &id, &mut r); let event = CustomMessageOutcome::BlockRequest { target: id.clone(), request: r, @@ -2029,8 +2053,9 @@ impl NetworkBehaviour for Protocol { ) } } - for (id, r) in self.sync.justification_requests() { + for (id, mut r) in self.sync.justification_requests() { if self.use_new_block_requests_protocol { + update_peer_request(&mut self.context_data.peers, &id, &mut r); let event = CustomMessageOutcome::BlockRequest { target: id, request: r,