diff --git a/Cargo.lock b/Cargo.lock index c62e454cb98e..76837b157bdd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5570,8 +5570,6 @@ dependencies = [ "polkadot-primitives", "sc-network", "strum", - "thiserror", - "zstd", ] [[package]] @@ -5771,6 +5769,8 @@ dependencies = [ "sp-std", "sp-trie", "sp-version", + "thiserror", + "zstd", ] [[package]] diff --git a/node/network/bridge/src/multiplexer.rs b/node/network/bridge/src/multiplexer.rs index d9475d0ea4a9..3901a91078ea 100644 --- a/node/network/bridge/src/multiplexer.rs +++ b/node/network/bridge/src/multiplexer.rs @@ -136,6 +136,11 @@ fn multiplex_single( decode_with_peer::<v1::AvailabilityFetchingRequest>(peer, payload)?, pending_response, )), + Protocol::CollationFetching => From::from(IncomingRequest::new( + peer, + decode_with_peer::<v1::CollationFetchingRequest>(peer, payload)?, + pending_response, + )), }; Ok(r) } diff --git a/node/network/collator-protocol/src/lib.rs b/node/network/collator-protocol/src/lib.rs index a1201c597e9c..e1f240d3a9c9 100644 --- a/node/network/collator-protocol/src/lib.rs +++ b/node/network/collator-protocol/src/lib.rs @@ -44,7 +44,6 @@ mod collator_side; mod validator_side; const LOG_TARGET: &'static str = "collator_protocol"; -const REQUEST_TIMEOUT: Duration = Duration::from_secs(1); #[derive(Debug, Error)] enum Error { @@ -94,7 +93,6 @@ impl CollatorProtocolSubsystem { match self.protocol_side { ProtocolSide::Validator(metrics) => validator_side::run( ctx, - REQUEST_TIMEOUT, metrics, ).await, ProtocolSide::Collator(id, metrics) => collator_side::run( diff --git a/node/network/collator-protocol/src/validator_side.rs b/node/network/collator-protocol/src/validator_side.rs index a3a8216e5fc7..56458b2d75c6 100644 --- a/node/network/collator-protocol/src/validator_side.rs +++ b/node/network/collator-protocol/src/validator_side.rs @@ -35,9 +35,8 @@ use polkadot_subsystem::{ NetworkBridgeEvent, }, }; -use polkadot_node_network_protocol::{ - v1 as protocol_v1, View, OurView, PeerId, RequestId, UnifiedReputationChange as Rep, -}; +use polkadot_node_network_protocol::{OurView, PeerId, RequestId, UnifiedReputationChange as Rep, View, request_response::OutgoingRequest, v1 as protocol_v1}; +use polkadot_node_network_protocol::request_response::v1 as req_res_v1; use polkadot_node_subsystem_util::{TimeoutExt as _, metrics::{self, prometheus}}; use polkadot_node_primitives::{Statement, SignedFullStatement}; @@ -157,11 +156,8 @@ impl CollationRequest { } struct PerRequest { - // The sender side to signal the `CollationRequest` to resolve successfully. - received: oneshot::Sender<()>, - - // Send result here. - result: oneshot::Sender<(CandidateReceipt, PoV)>, + // Results are received here. + result: oneshot::Receiver<req_resp_v1::OutgoingResult<req_res_v1::CollationFetchingResponse>>, } /// All state relevant for the validator side of the protocol lives here. @@ -180,26 +176,12 @@ struct State { /// per collator per source per relay-parent. advertisements: HashMap<PeerId, HashSet<(ParaId, Hash)>>, - /// Derive RequestIds from this. - next_request_id: RequestId, - /// The collations we have requested by relay parent and para id. /// /// For each relay parent and para id we may be connected to a number /// of collators each of those may have advertised a different collation. /// So we group such cases here. - requested_collations: HashMap<(Hash, ParaId, PeerId), RequestId>, - - /// Housekeeping handles we need to have per request to: - /// - cancel ongoing requests - /// - reply with collations to other subsystems. - requests_info: HashMap<RequestId, PerRequest>, - - /// Collation requests that are currently in progress. - requests_in_progress: FuturesUnordered<BoxFuture<'static, CollationRequestResult>>, - - /// Delay after which a collation request would time out. - request_timeout: Duration, + requested_collations: HashMap<(Hash, ParaId, PeerId), PerRequest>, /// Leaves have recently moved out of scope. /// These are looked into when we receive previously requested collations that we @@ -452,9 +434,6 @@ where return; } - let request_id = state.next_request_id; - state.next_request_id += 1; - let (tx, rx) = oneshot::channel(); let per_request = PerRequest { @@ -464,7 +443,6 @@ where let request = CollationRequest { received: rx, - timeout: state.request_timeout, request_id, }; @@ -483,18 +461,16 @@ where "Requesting collation", ); - let wire_message = protocol_v1::CollatorProtocolMessage::RequestCollation( - request_id, - relay_parent, - para_id, - ); + let (full_request, response_recv) = + OutgoingRequest::new(Recipient::Peer(peer_id), req_res_v1::CollationFetchingRequest { + relay_parent, + para_id, + }); + let requests = Requests::CollationFetchingRequest(full_request); ctx.send_message(AllMessages::NetworkBridge( - NetworkBridgeMessage::SendCollationMessage( - vec![peer_id], - protocol_v1::CollationProtocol::CollatorProtocol(wire_message), - ) - )).await; + NetworkBridgeMessage::SendRequests(requests)) + ).await; } /// Notify `CandidateSelectionSubsystem` that a collation has been advertised. @@ -738,6 +714,12 @@ where ); } } + CollationFetchingRequest(_) => { + tracing::warn!( + target: LOG_TARGET, + "CollationFetchingRequest message is not expected on the validator side of the protocol", + ); + } } } @@ -745,7 +727,6 @@ where #[tracing::instrument(skip(ctx, metrics), fields(subsystem = LOG_TARGET))] pub(crate) async fn run<Context>( mut ctx: Context, - request_timeout: Duration, metrics: Metrics, ) -> Result<()> where @@ -755,7 +736,6 @@ where use OverseerSignal::*; let mut state = State { - request_timeout, metrics, ..Default::default() }; diff --git a/node/network/protocol/Cargo.toml b/node/network/protocol/Cargo.toml index 6c56cbc29858..f7a0c72b2a08 100644 --- a/node/network/protocol/Cargo.toml +++ b/node/network/protocol/Cargo.toml @@ -12,8 +12,4 @@ polkadot-node-jaeger = { path = "../../jaeger" } parity-scale-codec = { version = "2.0.0", default-features = false, features = ["derive"] } sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" } strum = { version = "0.20", features = ["derive"] } -thiserror = "1.0.23" futures = "0.3.12" - -[target.'cfg(not(target_os = "unknown"))'.dependencies] -zstd = "0.5.0" diff --git a/node/network/protocol/src/lib.rs b/node/network/protocol/src/lib.rs index 075a0d9decfe..6d3396f1e626 100644 --- a/node/network/protocol/src/lib.rs +++ b/node/network/protocol/src/lib.rs @@ -17,7 +17,7 @@ //! Network protocol types for parachains. #![deny(unused_crate_dependencies)] -#![warn(missing_docs)] +#![warn(missing_docs, unused_imports)] use polkadot_primitives::v1::{Hash, BlockNumber}; use parity_scale_codec::{Encode, Decode}; @@ -288,10 +288,7 @@ impl View { /// v1 protocol types. pub mod v1 { - use polkadot_primitives::v1::{ - Hash, CollatorId, Id as ParaId, ErasureChunk, CandidateReceipt, - SignedAvailabilityBitfield, PoV, CandidateHash, ValidatorIndex, CandidateIndex, AvailableData, - }; + use polkadot_primitives::v1::{AvailableData, CandidateHash, CandidateIndex, CollatorId, CompressedPoV, ErasureChunk, Hash, Id as ParaId, SignedAvailabilityBitfield, ValidatorIndex}; use polkadot_node_primitives::{ SignedFullStatement, approval::{IndirectAssignmentCert, IndirectSignedApprovalVote}, @@ -367,12 +364,6 @@ pub mod v1 { /// that they are a collator with given ID. #[codec(index = 1)] AdvertiseCollation(Hash, ParaId), - /// Request the advertised collation at that relay-parent. - #[codec(index = 2)] - RequestCollation(RequestId, Hash, ParaId), - /// A requested collation. - #[codec(index = 3)] - Collation(RequestId, CandidateReceipt, CompressedPoV), /// A collation sent to a validator was seconded. #[codec(index = 4)] CollationSeconded(SignedFullStatement), diff --git a/node/network/protocol/src/request_response/mod.rs b/node/network/protocol/src/request_response/mod.rs index b727d26fc3da..ab547af37b08 100644 --- a/node/network/protocol/src/request_response/mod.rs +++ b/node/network/protocol/src/request_response/mod.rs @@ -43,7 +43,7 @@ pub use sc_network::config::RequestResponseConfig; /// All requests that can be sent to the network bridge. pub mod request; -pub use request::{IncomingRequest, OutgoingRequest, Requests}; +pub use request::{IncomingRequest, OutgoingRequest, Requests, Recipient}; ///// Multiplexer for incoming requests. // pub mod multiplexer; @@ -68,6 +68,10 @@ pub enum Protocol { /// sets. const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(3); +/// Request timeout where we can assume the connection is already open (e.g. we have peers in a +/// peer set as well. +const DEFAULT_REQUEST_TIMEOUT_CONNECTED: Duration = Duration::from_secs(1); + impl Protocol { /// Get a configuration for a given Request response protocol. /// @@ -99,11 +103,11 @@ impl Protocol { name: p_name, max_request_size: 1_000, /// Collations are expected to be around 10Meg, probably much smaller with - /// compression. So 20Meg should be sufficient, we might be able to reduce this + /// compression. So 10Meg should be sufficient, we might be able to reduce this /// further. - max_response_size: 20_000_000, - // Also just some relative conservative guess: - request_timeout: DEFAULT_REQUEST_TIMEOUT, + max_response_size: 10_000_000, + // Taken from initial implementation in collator protocol: + request_timeout: DEFAULT_REQUEST_TIMEOUT_CONNECTED, inbound_queue: Some(tx), }, }; diff --git a/node/network/protocol/src/request_response/request.rs b/node/network/protocol/src/request_response/request.rs index de5470c116bb..da0d4cd53b3f 100644 --- a/node/network/protocol/src/request_response/request.rs +++ b/node/network/protocol/src/request_response/request.rs @@ -40,6 +40,8 @@ pub trait IsRequest { pub enum Requests { /// Request an availability chunk from a node. AvailabilityFetching(OutgoingRequest<v1::AvailabilityFetchingRequest>), + /// Fetch a collation from a collator which previously announced it. + CollationFetching(OutgoingRequest<v1::CollationFetchingRequest>), } impl Requests { @@ -47,6 +49,7 @@ impl Requests { pub fn get_protocol(&self) -> Protocol { match self { Self::AvailabilityFetching(_) => Protocol::AvailabilityFetching, + Self::CollationFetching(_) => Protocol::CollationFetching, } } @@ -60,12 +63,13 @@ impl Requests { pub fn encode_request(self) -> (Protocol, OutgoingRequest<Vec<u8>>) { match self { Self::AvailabilityFetching(r) => r.encode_request(), + Self::CollationFetching(r) => r.encode_request(), } } } /// Potential recipients of an outgoing request. -#[derive(Debug)] +#[derive(Debug, Eq, Hash, PartialEq)] pub enum Recipient { /// Recipient is a regular peer and we know its peer id. Peer(PeerId), @@ -99,6 +103,9 @@ pub enum RequestError { Canceled(oneshot::Canceled), } +/// Responses received for an `OutgoingRequest`. +pub type OutgoingResult<Res> = Result<Res, RequestError>; + impl<Req> OutgoingRequest<Req> where Req: IsRequest + Encode, @@ -113,7 +120,7 @@ where payload: Req, ) -> ( Self, - impl Future<Output = Result<Req::Response, RequestError>>, + impl Future<Output = OutgoingResult<Req::Response>>, ) { let (tx, rx) = oneshot::channel(); let r = Self { @@ -210,7 +217,7 @@ where /// Future for actually receiving a typed response for an OutgoingRequest. async fn receive_response<Req>( rec: oneshot::Receiver<Result<Vec<u8>, network::RequestFailure>>, -) -> Result<Req::Response, RequestError> +) -> OutgoingResult<Req::Response> where Req: IsRequest, Req::Response: Decode, diff --git a/node/network/protocol/src/request_response/v1.rs b/node/network/protocol/src/request_response/v1.rs index 4f8c968b8fd5..14aeb43fe594 100644 --- a/node/network/protocol/src/request_response/v1.rs +++ b/node/network/protocol/src/request_response/v1.rs @@ -18,7 +18,8 @@ use parity_scale_codec::{Decode, Encode}; -use polkadot_primitives::v1::{CandidateHash, ErasureChunk, ValidatorIndex}; +use polkadot_primitives::v1::{CandidateHash, CandidateReceipt, ErasureChunk, ValidatorIndex, CompressedPoV, Hash}; +use polkadot_primitives::v1::Id as ParaId; use super::request::IsRequest; use super::Protocol; @@ -78,3 +79,23 @@ impl IsRequest for AvailabilityFetchingRequest { type Response = AvailabilityFetchingResponse; const PROTOCOL: Protocol = Protocol::AvailabilityFetching; } + +/// Request the advertised collation at that relay-parent. +#[derive(Debug, Clone, Encode, Decode)] +pub struct CollationFetchingRequest { + relay_parent: Hash, + para_id: ParaId, +} + +/// Responses as sent by collators. +#[derive(Debug, Clone, Encode, Decode)] +pub enum CollationFetchingResponse { + /// Deliver requested collation. + #[codec(index = 0)] + Collation(CandidateReceipt, CompressedPoV), +} + +impl IsRequest for CollationFetchingRequest { + type Response = CollationFetchingResponse; + const PROTOCOL: Protocol = Protocol::CollationFetching; +} diff --git a/node/subsystem/src/messages.rs b/node/subsystem/src/messages.rs index 7800b4631700..153831ea1ec6 100644 --- a/node/subsystem/src/messages.rs +++ b/node/subsystem/src/messages.rs @@ -198,22 +198,25 @@ pub enum CollatorProtocolMessage { /// Get a network bridge update. #[from] NetworkBridgeUpdateV1(NetworkBridgeEvent<protocol_v1::CollatorProtocolMessage>), + /// Incoming network request for a collation. + CollationFetchingRequest(IncomingRequest<req_res_v1::CollationFetchingRequest>) } -impl CollatorProtocolMessage { +// impl CollatorProtocolMessage { /// If the current variant contains the relay parent hash, return it. - pub fn relay_parent(&self) -> Option<Hash> { - match self { - Self::CollateOn(_) => None, - Self::DistributeCollation(receipt, _, _) => Some(receipt.descriptor().relay_parent), - Self::FetchCollation(relay_parent, _, _, _) => Some(*relay_parent), - Self::ReportCollator(_) => None, - Self::NoteGoodCollation(_) => None, - Self::NetworkBridgeUpdateV1(_) => None, - Self::NotifyCollationSeconded(_, _) => None, - } - } -} +// pub fn relay_parent(&self) -> Option<Hash> { +// match self { +// Self::CollateOn(_) => None, +// Self::DistributeCollation(receipt, _, _) => Some(receipt.descriptor().relay_parent), +// Self::FetchCollation(relay_parent, _, _, _) => Some(*relay_parent), +// Self::ReportCollator(_) => None, +// Self::NoteGoodCollation(_) => None, +// Self::NetworkBridgeUpdateV1(_) => None, +// Self::CollationFetchingRequest(_) => None, +// Self::NotifyCollationSeconded(_, _) => None, +// } +// } +// } /// Messages received by the network bridge subsystem. #[derive(Debug)] @@ -750,3 +753,13 @@ impl From<IncomingRequest<req_res_v1::AvailabilityFetchingRequest>> for AllMessa From::<AvailabilityDistributionMessage>::from(From::from(req)) } } +impl From<IncomingRequest<req_res_v1::CollationFetchingRequest>> for AllMessages { + fn from(req: IncomingRequest<req_res_v1::CollationFetchingRequest>) -> Self { + From::<CollatorProtocolMessage>::from(From::from(req)) + } +} +impl From<IncomingRequest<req_res_v1::CollationFetchingRequest>> for CollatorProtocolMessage { + fn from(req: IncomingRequest<req_res_v1::CollationFetchingRequest>) -> Self { + Self::CollationFetchingRequest(req) + } +}