From d1e8e3339ea91c1e48e8bc30d56fad3241444e9d Mon Sep 17 00:00:00 2001 From: Alin Dima Date: Wed, 10 Jan 2024 15:19:50 +0200 Subject: [PATCH] add fallback request for req-response protocols (#2771) Previously, it was only possible to retry the same request on a different protocol name that had the exact same binary payloads. Introduce a way of trying a different request on a different protocol if the first one fails with Unsupported protocol. This helps with adding new req-response versions in polkadot while preserving compatibility with unupgraded nodes. The way req-response protocols were bumped previously was that they were bundled with some other notifications protocol upgrade, like for async backing (but that is more complicated, especially if the feature does not require any changes to a notifications protocol). Will be needed for implementing https://github.com/polkadot-fellows/RFCs/pull/47 TODO: - [x] add tests - [x] add guidance docs in polkadot about req-response protocol versioning --- .../outgoing_requests_engine.rs | 5 +- substrate/client/network/src/behaviour.rs | 15 +- .../client/network/src/request_responses.rs | 414 ++++++++++++++++-- substrate/client/network/src/service.rs | 16 +- .../client/network/src/service/traits.rs | 22 +- .../network/sync/src/block_relay_protocol.rs | 7 +- .../network/sync/src/block_request_handler.rs | 2 +- substrate/client/network/sync/src/engine.rs | 2 +- substrate/client/network/sync/src/mock.rs | 4 +- .../network/sync/src/pending_responses.rs | 4 +- .../client/network/sync/src/service/mock.rs | 6 +- .../network/sync/src/service/network.rs | 6 +- 12 files changed, 428 insertions(+), 75 deletions(-) diff --git a/substrate/client/consensus/beefy/src/communication/request_response/outgoing_requests_engine.rs b/substrate/client/consensus/beefy/src/communication/request_response/outgoing_requests_engine.rs index ef462a54fca5b..7121410ea109b 100644 --- a/substrate/client/consensus/beefy/src/communication/request_response/outgoing_requests_engine.rs +++ b/substrate/client/consensus/beefy/src/communication/request_response/outgoing_requests_engine.rs @@ -43,7 +43,7 @@ use crate::{ }; /// Response type received from network. -type Response = Result, RequestFailure>; +type Response = Result<(Vec, ProtocolName), RequestFailure>; /// Used to receive a response from the network. type ResponseReceiver = oneshot::Receiver; @@ -125,6 +125,7 @@ impl OnDemandJustificationsEngine { peer, self.protocol_name.clone(), payload, + None, tx, IfDisconnected::ImmediateError, ); @@ -204,7 +205,7 @@ impl OnDemandJustificationsEngine { }, } }) - .and_then(|encoded| { + .and_then(|(encoded, _)| { decode_and_verify_finality_proof::( &encoded[..], req_info.block, diff --git a/substrate/client/network/src/behaviour.rs b/substrate/client/network/src/behaviour.rs index 745550412fc21..1f234683392f1 100644 --- a/substrate/client/network/src/behaviour.rs +++ b/substrate/client/network/src/behaviour.rs @@ -231,13 +231,20 @@ impl Behaviour { pub fn send_request( &mut self, target: &PeerId, - protocol: &str, + protocol: ProtocolName, request: Vec, - pending_response: oneshot::Sender, RequestFailure>>, + fallback_request: Option<(Vec, ProtocolName)>, + pending_response: oneshot::Sender, ProtocolName), RequestFailure>>, connect: IfDisconnected, ) { - self.request_responses - .send_request(target, protocol, request, pending_response, connect) + self.request_responses.send_request( + target, + protocol, + request, + fallback_request, + pending_response, + connect, + ) } /// Returns a shared reference to the user protocol. diff --git a/substrate/client/network/src/request_responses.rs b/substrate/client/network/src/request_responses.rs index 5af072aaddc62..0cd1cf06bb33e 100644 --- a/substrate/client/network/src/request_responses.rs +++ b/substrate/client/network/src/request_responses.rs @@ -56,6 +56,7 @@ use libp2p::{ use std::{ collections::{hash_map::Entry, HashMap}, io, iter, + ops::Deref, pin::Pin, task::{Context, Poll}, time::{Duration, Instant}, @@ -172,6 +173,13 @@ pub struct OutgoingResponse { pub sent_feedback: Option>, } +/// Information stored about a pending request. +struct PendingRequest { + started_at: Instant, + response_tx: oneshot::Sender, ProtocolName), RequestFailure>>, + fallback_request: Option<(Vec, ProtocolName)>, +} + /// When sending a request, what to do on a disconnected recipient. #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] pub enum IfDisconnected { @@ -264,8 +272,7 @@ pub struct RequestResponsesBehaviour { >, /// Pending requests, passed down to a request-response [`Behaviour`], awaiting a reply. - pending_requests: - HashMap, RequestFailure>>)>, + pending_requests: HashMap, /// Whenever an incoming request arrives, a `Future` is added to this list and will yield the /// start time and the response to send back to the remote. @@ -348,29 +355,25 @@ impl RequestResponsesBehaviour { pub fn send_request( &mut self, target: &PeerId, - protocol_name: &str, + protocol_name: ProtocolName, request: Vec, - pending_response: oneshot::Sender, RequestFailure>>, + fallback_request: Option<(Vec, ProtocolName)>, + pending_response: oneshot::Sender, ProtocolName), RequestFailure>>, connect: IfDisconnected, ) { log::trace!(target: "sub-libp2p", "send request to {target} ({protocol_name:?}), {} bytes", request.len()); - if let Some((protocol, _)) = self.protocols.get_mut(protocol_name) { - if protocol.is_connected(target) || connect.should_connect() { - let request_id = protocol.send_request(target, request); - let prev_req_id = self.pending_requests.insert( - (protocol_name.to_string().into(), request_id).into(), - (Instant::now(), pending_response), - ); - debug_assert!(prev_req_id.is_none(), "Expect request id to be unique."); - } else if pending_response.send(Err(RequestFailure::NotConnected)).is_err() { - log::debug!( - target: "sub-libp2p", - "Not connected to peer {:?}. At the same time local \ - node is no longer interested in the result.", - target, - ); - } + if let Some((protocol, _)) = self.protocols.get_mut(protocol_name.deref()) { + Self::send_request_inner( + protocol, + &mut self.pending_requests, + target, + protocol_name, + request, + fallback_request, + pending_response, + connect, + ) } else if pending_response.send(Err(RequestFailure::UnknownProtocol)).is_err() { log::debug!( target: "sub-libp2p", @@ -380,6 +383,37 @@ impl RequestResponsesBehaviour { ); } } + + fn send_request_inner( + behaviour: &mut Behaviour, + pending_requests: &mut HashMap, + target: &PeerId, + protocol_name: ProtocolName, + request: Vec, + fallback_request: Option<(Vec, ProtocolName)>, + pending_response: oneshot::Sender, ProtocolName), RequestFailure>>, + connect: IfDisconnected, + ) { + if behaviour.is_connected(target) || connect.should_connect() { + let request_id = behaviour.send_request(target, request); + let prev_req_id = pending_requests.insert( + (protocol_name.to_string().into(), request_id).into(), + PendingRequest { + started_at: Instant::now(), + response_tx: pending_response, + fallback_request, + }, + ); + debug_assert!(prev_req_id.is_none(), "Expect request id to be unique."); + } else if pending_response.send(Err(RequestFailure::NotConnected)).is_err() { + log::debug!( + target: "sub-libp2p", + "Not connected to peer {:?}. At the same time local \ + node is no longer interested in the result.", + target, + ); + } + } } impl NetworkBehaviour for RequestResponsesBehaviour { @@ -596,8 +630,10 @@ impl NetworkBehaviour for RequestResponsesBehaviour { } } + let mut fallback_requests = vec![]; + // Poll request-responses protocols. - for (protocol, (behaviour, resp_builder)) in &mut self.protocols { + for (protocol, (ref mut behaviour, ref mut resp_builder)) in &mut self.protocols { 'poll_protocol: while let Poll::Ready(ev) = behaviour.poll(cx, params) { let ev = match ev { // Main events we are interested in. @@ -698,17 +734,21 @@ impl NetworkBehaviour for RequestResponsesBehaviour { .pending_requests .remove(&(protocol.clone(), request_id).into()) { - Some((started, pending_response)) => { + Some(PendingRequest { started_at, response_tx, .. }) => { log::trace!( target: "sub-libp2p", "received response from {peer} ({protocol:?}), {} bytes", response.as_ref().map_or(0usize, |response| response.len()), ); - let delivered = pending_response - .send(response.map_err(|()| RequestFailure::Refused)) + let delivered = response_tx + .send( + response + .map_err(|()| RequestFailure::Refused) + .map(|resp| (resp, protocol.clone())), + ) .map_err(|_| RequestFailure::Obsolete); - (started, delivered) + (started_at, delivered) }, None => { log::warn!( @@ -742,8 +782,34 @@ impl NetworkBehaviour for RequestResponsesBehaviour { .pending_requests .remove(&(protocol.clone(), request_id).into()) { - Some((started, pending_response)) => { - if pending_response + Some(PendingRequest { + started_at, + response_tx, + fallback_request, + }) => { + // Try using the fallback request if the protocol was not + // supported. + if let OutboundFailure::UnsupportedProtocols = error { + if let Some((fallback_request, fallback_protocol)) = + fallback_request + { + log::trace!( + target: "sub-libp2p", + "Request with id {:?} failed. Trying the fallback protocol. {}", + request_id, + fallback_protocol.deref() + ); + fallback_requests.push(( + peer, + fallback_protocol, + fallback_request, + response_tx, + )); + continue + } + } + + if response_tx .send(Err(RequestFailure::Network(error.clone()))) .is_err() { @@ -754,7 +820,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { request_id, ); } - started + started_at }, None => { log::warn!( @@ -825,6 +891,25 @@ impl NetworkBehaviour for RequestResponsesBehaviour { } } + // Send out fallback requests. + for (peer, protocol, request, pending_response) in fallback_requests.drain(..) { + if let Some((behaviour, _)) = self.protocols.get_mut(&protocol) { + Self::send_request_inner( + behaviour, + &mut self.pending_requests, + &peer, + protocol, + request, + None, + pending_response, + // We can error if not connected because the + // previous attempt would have tried to establish a + // connection already or errored and we wouldn't have gotten here. + IfDisconnected::ImmediateError, + ); + } + } + break Poll::Pending } } @@ -976,6 +1061,7 @@ mod tests { use super::*; use crate::mock::MockPeerStore; + use assert_matches::assert_matches; use futures::{channel::oneshot, executor::LocalPool, task::Spawn}; use libp2p::{ core::{ @@ -1025,7 +1111,7 @@ mod tests { #[test] fn basic_request_response_works() { - let protocol_name = "/test/req-resp/1"; + let protocol_name = ProtocolName::from("/test/req-resp/1"); let mut pool = LocalPool::new(); // Build swarms whose behaviour is [`RequestResponsesBehaviour`]. @@ -1053,7 +1139,7 @@ mod tests { .unwrap(); let protocol_config = ProtocolConfig { - name: From::from(protocol_name), + name: protocol_name.clone(), fallback_names: Vec::new(), max_request_size: 1024, max_response_size: 1024 * 1024, @@ -1102,8 +1188,9 @@ mod tests { let (sender, receiver) = oneshot::channel(); swarm.behaviour_mut().send_request( &peer_id, - protocol_name, + protocol_name.clone(), b"this is a request".to_vec(), + None, sender, IfDisconnected::ImmediateError, ); @@ -1118,13 +1205,16 @@ mod tests { } } - assert_eq!(response_receiver.unwrap().await.unwrap().unwrap(), b"this is a response"); + assert_eq!( + response_receiver.unwrap().await.unwrap().unwrap(), + (b"this is a response".to_vec(), protocol_name) + ); }); } #[test] fn max_response_size_exceeded() { - let protocol_name = "/test/req-resp/1"; + let protocol_name = ProtocolName::from("/test/req-resp/1"); let mut pool = LocalPool::new(); // Build swarms whose behaviour is [`RequestResponsesBehaviour`]. @@ -1150,7 +1240,7 @@ mod tests { .unwrap(); let protocol_config = ProtocolConfig { - name: From::from(protocol_name), + name: protocol_name.clone(), fallback_names: Vec::new(), max_request_size: 1024, max_response_size: 8, // <-- important for the test @@ -1201,8 +1291,9 @@ mod tests { let (sender, receiver) = oneshot::channel(); swarm.behaviour_mut().send_request( &peer_id, - protocol_name, + protocol_name.clone(), b"this is a request".to_vec(), + None, sender, IfDisconnected::ImmediateError, ); @@ -1236,14 +1327,14 @@ mod tests { /// See [`ProtocolRequestId`] for additional information. #[test] fn request_id_collision() { - let protocol_name_1 = "/test/req-resp-1/1"; - let protocol_name_2 = "/test/req-resp-2/1"; + let protocol_name_1 = ProtocolName::from("/test/req-resp-1/1"); + let protocol_name_2 = ProtocolName::from("/test/req-resp-2/1"); let mut pool = LocalPool::new(); let mut swarm_1 = { let protocol_configs = vec![ ProtocolConfig { - name: From::from(protocol_name_1), + name: protocol_name_1.clone(), fallback_names: Vec::new(), max_request_size: 1024, max_response_size: 1024 * 1024, @@ -1251,7 +1342,7 @@ mod tests { inbound_queue: None, }, ProtocolConfig { - name: From::from(protocol_name_2), + name: protocol_name_2.clone(), fallback_names: Vec::new(), max_request_size: 1024, max_response_size: 1024 * 1024, @@ -1269,7 +1360,7 @@ mod tests { let protocol_configs = vec![ ProtocolConfig { - name: From::from(protocol_name_1), + name: protocol_name_1.clone(), fallback_names: Vec::new(), max_request_size: 1024, max_response_size: 1024 * 1024, @@ -1277,7 +1368,7 @@ mod tests { inbound_queue: Some(tx_1), }, ProtocolConfig { - name: From::from(protocol_name_2), + name: protocol_name_2.clone(), fallback_names: Vec::new(), max_request_size: 1024, max_response_size: 1024 * 1024, @@ -1359,15 +1450,17 @@ mod tests { let (sender_2, receiver_2) = oneshot::channel(); swarm_1.behaviour_mut().send_request( &peer_id, - protocol_name_1, + protocol_name_1.clone(), b"this is a request".to_vec(), + None, sender_1, IfDisconnected::ImmediateError, ); swarm_1.behaviour_mut().send_request( &peer_id, - protocol_name_2, + protocol_name_2.clone(), b"this is a request".to_vec(), + None, sender_2, IfDisconnected::ImmediateError, ); @@ -1385,8 +1478,239 @@ mod tests { } } let (response_receiver_1, response_receiver_2) = response_receivers.unwrap(); - assert_eq!(response_receiver_1.await.unwrap().unwrap(), b"this is a response"); - assert_eq!(response_receiver_2.await.unwrap().unwrap(), b"this is a response"); + assert_eq!( + response_receiver_1.await.unwrap().unwrap(), + (b"this is a response".to_vec(), protocol_name_1) + ); + assert_eq!( + response_receiver_2.await.unwrap().unwrap(), + (b"this is a response".to_vec(), protocol_name_2) + ); + }); + } + + #[test] + fn request_fallback() { + let protocol_name_1 = ProtocolName::from("/test/req-resp/2"); + let protocol_name_1_fallback = ProtocolName::from("/test/req-resp/1"); + let protocol_name_2 = ProtocolName::from("/test/another"); + let mut pool = LocalPool::new(); + + let protocol_config_1 = ProtocolConfig { + name: protocol_name_1.clone(), + fallback_names: Vec::new(), + max_request_size: 1024, + max_response_size: 1024 * 1024, + request_timeout: Duration::from_secs(30), + inbound_queue: None, + }; + let protocol_config_1_fallback = ProtocolConfig { + name: protocol_name_1_fallback.clone(), + fallback_names: Vec::new(), + max_request_size: 1024, + max_response_size: 1024 * 1024, + request_timeout: Duration::from_secs(30), + inbound_queue: None, + }; + let protocol_config_2 = ProtocolConfig { + name: protocol_name_2.clone(), + fallback_names: Vec::new(), + max_request_size: 1024, + max_response_size: 1024 * 1024, + request_timeout: Duration::from_secs(30), + inbound_queue: None, + }; + + // This swarm only speaks protocol_name_1_fallback and protocol_name_2. + // It only responds to requests. + let mut older_swarm = { + let (tx_1, mut rx_1) = async_channel::bounded::(64); + let (tx_2, mut rx_2) = async_channel::bounded::(64); + let mut protocol_config_1_fallback = protocol_config_1_fallback.clone(); + protocol_config_1_fallback.inbound_queue = Some(tx_1); + + let mut protocol_config_2 = protocol_config_2.clone(); + protocol_config_2.inbound_queue = Some(tx_2); + + pool.spawner() + .spawn_obj( + async move { + for _ in 0..2 { + if let Some(rq) = rx_1.next().await { + let (fb_tx, fb_rx) = oneshot::channel(); + assert_eq!(rq.payload, b"request on protocol /test/req-resp/1"); + let _ = rq.pending_response.send(super::OutgoingResponse { + result: Ok( + b"this is a response on protocol /test/req-resp/1".to_vec() + ), + reputation_changes: Vec::new(), + sent_feedback: Some(fb_tx), + }); + fb_rx.await.unwrap(); + } + } + + if let Some(rq) = rx_2.next().await { + let (fb_tx, fb_rx) = oneshot::channel(); + assert_eq!(rq.payload, b"request on protocol /test/other"); + let _ = rq.pending_response.send(super::OutgoingResponse { + result: Ok(b"this is a response on protocol /test/other".to_vec()), + reputation_changes: Vec::new(), + sent_feedback: Some(fb_tx), + }); + fb_rx.await.unwrap(); + } + } + .boxed() + .into(), + ) + .unwrap(); + + build_swarm(vec![protocol_config_1_fallback, protocol_config_2].into_iter()) + }; + + // This swarm speaks all protocols. + let mut new_swarm = build_swarm( + vec![ + protocol_config_1.clone(), + protocol_config_1_fallback.clone(), + protocol_config_2.clone(), + ] + .into_iter(), + ); + + { + let dial_addr = older_swarm.1.clone(); + Swarm::dial(&mut new_swarm.0, dial_addr).unwrap(); + } + + // Running `older_swarm`` in the background. + pool.spawner() + .spawn_obj({ + async move { + loop { + _ = older_swarm.0.select_next_some().await; + } + } + .boxed() + .into() + }) + .unwrap(); + + // Run the newer swarm. Attempt to make requests on all protocols. + let (mut swarm, _) = new_swarm; + let mut older_peer_id = None; + + pool.run_until(async move { + let mut response_receiver = None; + // Try the new protocol with a fallback. + loop { + match swarm.select_next_some().await { + SwarmEvent::ConnectionEstablished { peer_id, .. } => { + older_peer_id = Some(peer_id); + let (sender, receiver) = oneshot::channel(); + swarm.behaviour_mut().send_request( + &peer_id, + protocol_name_1.clone(), + b"request on protocol /test/req-resp/2".to_vec(), + Some(( + b"request on protocol /test/req-resp/1".to_vec(), + protocol_config_1_fallback.name.clone(), + )), + sender, + IfDisconnected::ImmediateError, + ); + response_receiver = Some(receiver); + }, + SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { + result.unwrap(); + break + }, + _ => {}, + } + } + assert_eq!( + response_receiver.unwrap().await.unwrap().unwrap(), + ( + b"this is a response on protocol /test/req-resp/1".to_vec(), + protocol_name_1_fallback.clone() + ) + ); + // Try the old protocol with a useless fallback. + let (sender, response_receiver) = oneshot::channel(); + swarm.behaviour_mut().send_request( + older_peer_id.as_ref().unwrap(), + protocol_name_1_fallback.clone(), + b"request on protocol /test/req-resp/1".to_vec(), + Some(( + b"dummy request, will fail if processed".to_vec(), + protocol_config_1_fallback.name.clone(), + )), + sender, + IfDisconnected::ImmediateError, + ); + loop { + match swarm.select_next_some().await { + SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { + result.unwrap(); + break + }, + _ => {}, + } + } + assert_eq!( + response_receiver.await.unwrap().unwrap(), + ( + b"this is a response on protocol /test/req-resp/1".to_vec(), + protocol_name_1_fallback.clone() + ) + ); + // Try the new protocol with no fallback. Should fail. + let (sender, response_receiver) = oneshot::channel(); + swarm.behaviour_mut().send_request( + older_peer_id.as_ref().unwrap(), + protocol_name_1.clone(), + b"request on protocol /test/req-resp-2".to_vec(), + None, + sender, + IfDisconnected::ImmediateError, + ); + loop { + match swarm.select_next_some().await { + SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { + assert_matches!( + result.unwrap_err(), + RequestFailure::Network(OutboundFailure::UnsupportedProtocols) + ); + break + }, + _ => {}, + } + } + assert!(response_receiver.await.unwrap().is_err()); + // Try the other protocol with no fallback. + let (sender, response_receiver) = oneshot::channel(); + swarm.behaviour_mut().send_request( + older_peer_id.as_ref().unwrap(), + protocol_name_2.clone(), + b"request on protocol /test/other".to_vec(), + None, + sender, + IfDisconnected::ImmediateError, + ); + loop { + match swarm.select_next_some().await { + SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => { + result.unwrap(); + break + }, + _ => {}, + } + } + assert_eq!( + response_receiver.await.unwrap().unwrap(), + (b"this is a response on protocol /test/other".to_vec(), protocol_name_2.clone()) + ); }); } } diff --git a/substrate/client/network/src/service.rs b/substrate/client/network/src/service.rs index 06db23844d0d9..47e23337633ba 100644 --- a/substrate/client/network/src/service.rs +++ b/substrate/client/network/src/service.rs @@ -1048,11 +1048,12 @@ where target: PeerId, protocol: ProtocolName, request: Vec, + fallback_request: Option<(Vec, ProtocolName)>, connect: IfDisconnected, - ) -> Result, RequestFailure> { + ) -> Result<(Vec, ProtocolName), RequestFailure> { let (tx, rx) = oneshot::channel(); - self.start_request(target, protocol, request, tx, connect); + self.start_request(target, protocol, request, fallback_request, tx, connect); match rx.await { Ok(v) => v, @@ -1068,13 +1069,15 @@ where target: PeerId, protocol: ProtocolName, request: Vec, - tx: oneshot::Sender, RequestFailure>>, + fallback_request: Option<(Vec, ProtocolName)>, + tx: oneshot::Sender, ProtocolName), RequestFailure>>, connect: IfDisconnected, ) { let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::Request { target, protocol: protocol.into(), request, + fallback_request, pending_response: tx, connect, }); @@ -1160,7 +1163,8 @@ enum ServiceToWorkerMsg { target: PeerId, protocol: ProtocolName, request: Vec, - pending_response: oneshot::Sender, RequestFailure>>, + fallback_request: Option<(Vec, ProtocolName)>, + pending_response: oneshot::Sender, ProtocolName), RequestFailure>>, connect: IfDisconnected, }, NetworkStatus { @@ -1287,13 +1291,15 @@ where target, protocol, request, + fallback_request, pending_response, connect, } => { self.network_service.behaviour_mut().send_request( &target, - &protocol, + protocol, request, + fallback_request, pending_response, connect, ); diff --git a/substrate/client/network/src/service/traits.rs b/substrate/client/network/src/service/traits.rs index d4d4a05a86f1d..74ddb986c247a 100644 --- a/substrate/client/network/src/service/traits.rs +++ b/substrate/client/network/src/service/traits.rs @@ -551,8 +551,9 @@ pub trait NetworkRequest { target: PeerId, protocol: ProtocolName, request: Vec, + fallback_request: Option<(Vec, ProtocolName)>, connect: IfDisconnected, - ) -> Result, RequestFailure>; + ) -> Result<(Vec, ProtocolName), RequestFailure>; /// Variation of `request` which starts a request whose response is delivered on a provided /// channel. @@ -569,7 +570,8 @@ pub trait NetworkRequest { target: PeerId, protocol: ProtocolName, request: Vec, - tx: oneshot::Sender, RequestFailure>>, + fallback_request: Option<(Vec, ProtocolName)>, + tx: oneshot::Sender, ProtocolName), RequestFailure>>, connect: IfDisconnected, ); } @@ -585,13 +587,20 @@ where target: PeerId, protocol: ProtocolName, request: Vec, + fallback_request: Option<(Vec, ProtocolName)>, connect: IfDisconnected, - ) -> Pin, RequestFailure>> + Send + 'async_trait>> + ) -> Pin< + Box< + dyn Future, ProtocolName), RequestFailure>> + + Send + + 'async_trait, + >, + > where 'life0: 'async_trait, Self: 'async_trait, { - T::request(self, target, protocol, request, connect) + T::request(self, target, protocol, request, fallback_request, connect) } fn start_request( @@ -599,10 +608,11 @@ where target: PeerId, protocol: ProtocolName, request: Vec, - tx: oneshot::Sender, RequestFailure>>, + fallback_request: Option<(Vec, ProtocolName)>, + tx: oneshot::Sender, ProtocolName), RequestFailure>>, connect: IfDisconnected, ) { - T::start_request(self, target, protocol, request, tx, connect) + T::start_request(self, target, protocol, request, fallback_request, tx, connect) } } diff --git a/substrate/client/network/sync/src/block_relay_protocol.rs b/substrate/client/network/sync/src/block_relay_protocol.rs index 7a313458bf034..b4ef72a10c6b8 100644 --- a/substrate/client/network/sync/src/block_relay_protocol.rs +++ b/substrate/client/network/sync/src/block_relay_protocol.rs @@ -18,7 +18,10 @@ use futures::channel::oneshot; use libp2p::PeerId; -use sc_network::request_responses::{ProtocolConfig, RequestFailure}; +use sc_network::{ + request_responses::{ProtocolConfig, RequestFailure}, + ProtocolName, +}; use sc_network_common::sync::message::{BlockData, BlockRequest}; use sp_runtime::traits::Block as BlockT; use std::sync::Arc; @@ -43,7 +46,7 @@ pub trait BlockDownloader: Send + Sync { &self, who: PeerId, request: BlockRequest, - ) -> Result, RequestFailure>, oneshot::Canceled>; + ) -> Result, ProtocolName), RequestFailure>, oneshot::Canceled>; /// Parses the protocol specific response to retrieve the block data. fn block_response_into_blocks( diff --git a/substrate/client/network/sync/src/block_request_handler.rs b/substrate/client/network/sync/src/block_request_handler.rs index f363dda3a2d18..f669a22cd2e94 100644 --- a/substrate/client/network/sync/src/block_request_handler.rs +++ b/substrate/client/network/sync/src/block_request_handler.rs @@ -570,7 +570,7 @@ impl BlockDownloader for FullBlockDownloader { &self, who: PeerId, request: BlockRequest, - ) -> Result, RequestFailure>, oneshot::Canceled> { + ) -> Result, ProtocolName), RequestFailure>, oneshot::Canceled> { // Build the request protobuf. let bytes = BlockRequestSchema { fields: request.fields.to_be_u32(), diff --git a/substrate/client/network/sync/src/engine.rs b/substrate/client/network/sync/src/engine.rs index d7b024cd801c7..952300a14d891 100644 --- a/substrate/client/network/sync/src/engine.rs +++ b/substrate/client/network/sync/src/engine.rs @@ -1263,7 +1263,7 @@ where let ResponseEvent { peer_id, request, response } = response_event; match response { - Ok(Ok(resp)) => match request { + Ok(Ok((resp, _))) => match request { PeerRequest::Block(req) => { match self.block_downloader.block_response_into_blocks(&req, resp) { Ok(blocks) => { diff --git a/substrate/client/network/sync/src/mock.rs b/substrate/client/network/sync/src/mock.rs index 42220096e0695..a4f5eb564c2cd 100644 --- a/substrate/client/network/sync/src/mock.rs +++ b/substrate/client/network/sync/src/mock.rs @@ -22,7 +22,7 @@ use crate::block_relay_protocol::{BlockDownloader as BlockDownloaderT, BlockResp use futures::channel::oneshot; use libp2p::PeerId; -use sc_network::RequestFailure; +use sc_network::{ProtocolName, RequestFailure}; use sc_network_common::sync::message::{BlockData, BlockRequest}; use sp_runtime::traits::Block as BlockT; @@ -35,7 +35,7 @@ mockall::mock! { &self, who: PeerId, request: BlockRequest, - ) -> Result, RequestFailure>, oneshot::Canceled>; + ) -> Result, ProtocolName), RequestFailure>, oneshot::Canceled>; fn block_response_into_blocks( &self, request: &BlockRequest, diff --git a/substrate/client/network/sync/src/pending_responses.rs b/substrate/client/network/sync/src/pending_responses.rs index 55308dfc1ea90..e21a576322508 100644 --- a/substrate/client/network/sync/src/pending_responses.rs +++ b/substrate/client/network/sync/src/pending_responses.rs @@ -28,7 +28,7 @@ use futures::{ }; use libp2p::PeerId; use log::error; -use sc_network::request_responses::RequestFailure; +use sc_network::{request_responses::RequestFailure, types::ProtocolName}; use sp_runtime::traits::Block as BlockT; use std::task::{Context, Poll, Waker}; use tokio_stream::StreamMap; @@ -37,7 +37,7 @@ use tokio_stream::StreamMap; const LOG_TARGET: &'static str = "sync"; /// Response result. -type ResponseResult = Result, RequestFailure>, oneshot::Canceled>; +type ResponseResult = Result, ProtocolName), RequestFailure>, oneshot::Canceled>; /// A future yielding [`ResponseResult`]. type ResponseFuture = BoxFuture<'static, ResponseResult>; diff --git a/substrate/client/network/sync/src/service/mock.rs b/substrate/client/network/sync/src/service/mock.rs index 6e307d8698444..420de8cd5fdcf 100644 --- a/substrate/client/network/sync/src/service/mock.rs +++ b/substrate/client/network/sync/src/service/mock.rs @@ -117,14 +117,16 @@ mockall::mock! { target: PeerId, protocol: ProtocolName, request: Vec, + fallback_request: Option<(Vec, ProtocolName)>, connect: IfDisconnected, - ) -> Result, RequestFailure>; + ) -> Result<(Vec, ProtocolName), RequestFailure>; fn start_request( &self, target: PeerId, protocol: ProtocolName, request: Vec, - tx: oneshot::Sender, RequestFailure>>, + fallback_request: Option<(Vec, ProtocolName)>, + tx: oneshot::Sender, ProtocolName), RequestFailure>>, connect: IfDisconnected, ); } diff --git a/substrate/client/network/sync/src/service/network.rs b/substrate/client/network/sync/src/service/network.rs index 12a47d6a9b544..07f28519afb2b 100644 --- a/substrate/client/network/sync/src/service/network.rs +++ b/substrate/client/network/sync/src/service/network.rs @@ -54,7 +54,7 @@ pub enum ToServiceCommand { PeerId, ProtocolName, Vec, - oneshot::Sender, RequestFailure>>, + oneshot::Sender, ProtocolName), RequestFailure>>, IfDisconnected, ), @@ -94,7 +94,7 @@ impl NetworkServiceHandle { who: PeerId, protocol: ProtocolName, request: Vec, - tx: oneshot::Sender, RequestFailure>>, + tx: oneshot::Sender, ProtocolName), RequestFailure>>, connect: IfDisconnected, ) { let _ = self @@ -134,7 +134,7 @@ impl NetworkServiceProvider { ToServiceCommand::ReportPeer(peer, reputation_change) => service.report_peer(peer, reputation_change), ToServiceCommand::StartRequest(peer, protocol, request, tx, connect) => - service.start_request(peer, protocol, request, tx, connect), + service.start_request(peer, protocol, request, None, tx, connect), ToServiceCommand::WriteNotification(peer, protocol, message) => service.write_notification(peer, protocol, message), ToServiceCommand::SetNotificationHandshake(protocol, handshake) =>