Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
eskimor committed Mar 10, 2021
1 parent bb34beb commit 50bcf94
Show file tree
Hide file tree
Showing 10 changed files with 95 additions and 80 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions node/network/bridge/src/multiplexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,11 @@ fn multiplex_single(
decode_with_peer::<v1::AvailabilityFetchingRequest>(peer, payload)?,
pending_response,
)),
Protocol::CollationFetching => From::from(IncomingRequest::new(
peer,
decode_with_peer::<v1::CollationFetchingRequest>(peer, payload)?,
pending_response,
)),
};
Ok(r)
}
Expand Down
2 changes: 0 additions & 2 deletions node/network/collator-protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ mod collator_side;
mod validator_side;

const LOG_TARGET: &'static str = "collator_protocol";
const REQUEST_TIMEOUT: Duration = Duration::from_secs(1);

#[derive(Debug, Error)]
enum Error {
Expand Down Expand Up @@ -94,7 +93,6 @@ impl CollatorProtocolSubsystem {
match self.protocol_side {
ProtocolSide::Validator(metrics) => validator_side::run(
ctx,
REQUEST_TIMEOUT,
metrics,
).await,
ProtocolSide::Collator(id, metrics) => collator_side::run(
Expand Down
58 changes: 19 additions & 39 deletions node/network/collator-protocol/src/validator_side.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,8 @@ use polkadot_subsystem::{
NetworkBridgeEvent,
},
};
use polkadot_node_network_protocol::{
v1 as protocol_v1, View, OurView, PeerId, RequestId, UnifiedReputationChange as Rep,
};
use polkadot_node_network_protocol::{OurView, PeerId, RequestId, UnifiedReputationChange as Rep, View, request_response::OutgoingRequest, v1 as protocol_v1};
use polkadot_node_network_protocol::request_response::v1 as req_res_v1;
use polkadot_node_subsystem_util::{TimeoutExt as _, metrics::{self, prometheus}};
use polkadot_node_primitives::{Statement, SignedFullStatement};

Expand Down Expand Up @@ -157,11 +156,8 @@ impl CollationRequest {
}

struct PerRequest {
// The sender side to signal the `CollationRequest` to resolve successfully.
received: oneshot::Sender<()>,

// Send result here.
result: oneshot::Sender<(CandidateReceipt, PoV)>,
// Results are received here.
result: oneshot::Receiver<req_resp_v1::OutgoingResult<req_res_v1::CollationFetchingResponse>>,
}

/// All state relevant for the validator side of the protocol lives here.
Expand All @@ -180,26 +176,12 @@ struct State {
/// per collator per source per relay-parent.
advertisements: HashMap<PeerId, HashSet<(ParaId, Hash)>>,

/// Derive RequestIds from this.
next_request_id: RequestId,

/// The collations we have requested by relay parent and para id.
///
/// For each relay parent and para id we may be connected to a number
/// of collators each of those may have advertised a different collation.
/// So we group such cases here.
requested_collations: HashMap<(Hash, ParaId, PeerId), RequestId>,

/// Housekeeping handles we need to have per request to:
/// - cancel ongoing requests
/// - reply with collations to other subsystems.
requests_info: HashMap<RequestId, PerRequest>,

/// Collation requests that are currently in progress.
requests_in_progress: FuturesUnordered<BoxFuture<'static, CollationRequestResult>>,

/// Delay after which a collation request would time out.
request_timeout: Duration,
requested_collations: HashMap<(Hash, ParaId, PeerId), PerRequest>,

/// Leaves have recently moved out of scope.
/// These are looked into when we receive previously requested collations that we
Expand Down Expand Up @@ -452,9 +434,6 @@ where
return;
}

let request_id = state.next_request_id;
state.next_request_id += 1;

let (tx, rx) = oneshot::channel();

let per_request = PerRequest {
Expand All @@ -464,7 +443,6 @@ where

let request = CollationRequest {
received: rx,
timeout: state.request_timeout,
request_id,
};

Expand All @@ -483,18 +461,16 @@ where
"Requesting collation",
);

let wire_message = protocol_v1::CollatorProtocolMessage::RequestCollation(
request_id,
relay_parent,
para_id,
);
let (full_request, response_recv) =
OutgoingRequest::new(Recipient::Peer(peer_id), req_res_v1::CollationFetchingRequest {
relay_parent,
para_id,
});
let requests = Requests::CollationFetchingRequest(full_request);

ctx.send_message(AllMessages::NetworkBridge(
NetworkBridgeMessage::SendCollationMessage(
vec![peer_id],
protocol_v1::CollationProtocol::CollatorProtocol(wire_message),
)
)).await;
NetworkBridgeMessage::SendRequests(requests))
).await;
}

/// Notify `CandidateSelectionSubsystem` that a collation has been advertised.
Expand Down Expand Up @@ -738,14 +714,19 @@ where
);
}
}
CollationFetchingRequest(_) => {
tracing::warn!(
target: LOG_TARGET,
"CollationFetchingRequest message is not expected on the validator side of the protocol",
);
}
}
}

/// The main run loop.
#[tracing::instrument(skip(ctx, metrics), fields(subsystem = LOG_TARGET))]
pub(crate) async fn run<Context>(
mut ctx: Context,
request_timeout: Duration,
metrics: Metrics,
) -> Result<()>
where
Expand All @@ -755,7 +736,6 @@ where
use OverseerSignal::*;

let mut state = State {
request_timeout,
metrics,
..Default::default()
};
Expand Down
4 changes: 0 additions & 4 deletions node/network/protocol/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,4 @@ polkadot-node-jaeger = { path = "../../jaeger" }
parity-scale-codec = { version = "2.0.0", default-features = false, features = ["derive"] }
sc-network = { git = "https://github.com/paritytech/substrate", branch = "master" }
strum = { version = "0.20", features = ["derive"] }
thiserror = "1.0.23"
futures = "0.3.12"

[target.'cfg(not(target_os = "unknown"))'.dependencies]
zstd = "0.5.0"
13 changes: 2 additions & 11 deletions node/network/protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
//! Network protocol types for parachains.
#![deny(unused_crate_dependencies)]
#![warn(missing_docs)]
#![warn(missing_docs, unused_imports)]

use polkadot_primitives::v1::{Hash, BlockNumber};
use parity_scale_codec::{Encode, Decode};
Expand Down Expand Up @@ -288,10 +288,7 @@ impl View {

/// v1 protocol types.
pub mod v1 {
use polkadot_primitives::v1::{
Hash, CollatorId, Id as ParaId, ErasureChunk, CandidateReceipt,
SignedAvailabilityBitfield, PoV, CandidateHash, ValidatorIndex, CandidateIndex, AvailableData,
};
use polkadot_primitives::v1::{AvailableData, CandidateHash, CandidateIndex, CollatorId, CompressedPoV, ErasureChunk, Hash, Id as ParaId, SignedAvailabilityBitfield, ValidatorIndex};
use polkadot_node_primitives::{
SignedFullStatement,
approval::{IndirectAssignmentCert, IndirectSignedApprovalVote},
Expand Down Expand Up @@ -367,12 +364,6 @@ pub mod v1 {
/// that they are a collator with given ID.
#[codec(index = 1)]
AdvertiseCollation(Hash, ParaId),
/// Request the advertised collation at that relay-parent.
#[codec(index = 2)]
RequestCollation(RequestId, Hash, ParaId),
/// A requested collation.
#[codec(index = 3)]
Collation(RequestId, CandidateReceipt, CompressedPoV),
/// A collation sent to a validator was seconded.
#[codec(index = 4)]
CollationSeconded(SignedFullStatement),
Expand Down
14 changes: 9 additions & 5 deletions node/network/protocol/src/request_response/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub use sc_network::config::RequestResponseConfig;

/// All requests that can be sent to the network bridge.
pub mod request;
pub use request::{IncomingRequest, OutgoingRequest, Requests};
pub use request::{IncomingRequest, OutgoingRequest, Requests, Recipient};

///// Multiplexer for incoming requests.
// pub mod multiplexer;
Expand All @@ -68,6 +68,10 @@ pub enum Protocol {
/// sets.
const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(3);

/// Request timeout where we can assume the connection is already open (e.g. we have peers in a
/// peer set as well.
const DEFAULT_REQUEST_TIMEOUT_CONNECTED: Duration = Duration::from_secs(1);

impl Protocol {
/// Get a configuration for a given Request response protocol.
///
Expand Down Expand Up @@ -99,11 +103,11 @@ impl Protocol {
name: p_name,
max_request_size: 1_000,
/// Collations are expected to be around 10Meg, probably much smaller with
/// compression. So 20Meg should be sufficient, we might be able to reduce this
/// compression. So 10Meg should be sufficient, we might be able to reduce this
/// further.
max_response_size: 20_000_000,
// Also just some relative conservative guess:
request_timeout: DEFAULT_REQUEST_TIMEOUT,
max_response_size: 10_000_000,
// Taken from initial implementation in collator protocol:
request_timeout: DEFAULT_REQUEST_TIMEOUT_CONNECTED,
inbound_queue: Some(tx),
},
};
Expand Down
13 changes: 10 additions & 3 deletions node/network/protocol/src/request_response/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,16 @@ pub trait IsRequest {
pub enum Requests {
/// Request an availability chunk from a node.
AvailabilityFetching(OutgoingRequest<v1::AvailabilityFetchingRequest>),
/// Fetch a collation from a collator which previously announced it.
CollationFetching(OutgoingRequest<v1::CollationFetchingRequest>),
}

impl Requests {
/// Get the protocol this request conforms to.
pub fn get_protocol(&self) -> Protocol {
match self {
Self::AvailabilityFetching(_) => Protocol::AvailabilityFetching,
Self::CollationFetching(_) => Protocol::CollationFetching,
}
}

Expand All @@ -60,12 +63,13 @@ impl Requests {
pub fn encode_request(self) -> (Protocol, OutgoingRequest<Vec<u8>>) {
match self {
Self::AvailabilityFetching(r) => r.encode_request(),
Self::CollationFetching(r) => r.encode_request(),
}
}
}

/// Potential recipients of an outgoing request.
#[derive(Debug)]
#[derive(Debug, Eq, Hash, PartialEq)]
pub enum Recipient {
/// Recipient is a regular peer and we know its peer id.
Peer(PeerId),
Expand Down Expand Up @@ -99,6 +103,9 @@ pub enum RequestError {
Canceled(oneshot::Canceled),
}

/// Responses received for an `OutgoingRequest`.
pub type OutgoingResult<Res> = Result<Res, RequestError>;

impl<Req> OutgoingRequest<Req>
where
Req: IsRequest + Encode,
Expand All @@ -113,7 +120,7 @@ where
payload: Req,
) -> (
Self,
impl Future<Output = Result<Req::Response, RequestError>>,
impl Future<Output = OutgoingResult<Req::Response>>,
) {
let (tx, rx) = oneshot::channel();
let r = Self {
Expand Down Expand Up @@ -210,7 +217,7 @@ where
/// Future for actually receiving a typed response for an OutgoingRequest.
async fn receive_response<Req>(
rec: oneshot::Receiver<Result<Vec<u8>, network::RequestFailure>>,
) -> Result<Req::Response, RequestError>
) -> OutgoingResult<Req::Response>
where
Req: IsRequest,
Req::Response: Decode,
Expand Down
23 changes: 22 additions & 1 deletion node/network/protocol/src/request_response/v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
use parity_scale_codec::{Decode, Encode};

use polkadot_primitives::v1::{CandidateHash, ErasureChunk, ValidatorIndex};
use polkadot_primitives::v1::{CandidateHash, CandidateReceipt, ErasureChunk, ValidatorIndex, CompressedPoV, Hash};
use polkadot_primitives::v1::Id as ParaId;

use super::request::IsRequest;
use super::Protocol;
Expand Down Expand Up @@ -78,3 +79,23 @@ impl IsRequest for AvailabilityFetchingRequest {
type Response = AvailabilityFetchingResponse;
const PROTOCOL: Protocol = Protocol::AvailabilityFetching;
}

/// Request the advertised collation at that relay-parent.
#[derive(Debug, Clone, Encode, Decode)]
pub struct CollationFetchingRequest {
relay_parent: Hash,
para_id: ParaId,
}

/// Responses as sent by collators.
#[derive(Debug, Clone, Encode, Decode)]
pub enum CollationFetchingResponse {
/// Deliver requested collation.
#[codec(index = 0)]
Collation(CandidateReceipt, CompressedPoV),
}

impl IsRequest for CollationFetchingRequest {
type Response = CollationFetchingResponse;
const PROTOCOL: Protocol = Protocol::CollationFetching;
}
39 changes: 26 additions & 13 deletions node/subsystem/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,22 +198,25 @@ pub enum CollatorProtocolMessage {
/// Get a network bridge update.
#[from]
NetworkBridgeUpdateV1(NetworkBridgeEvent<protocol_v1::CollatorProtocolMessage>),
/// Incoming network request for a collation.
CollationFetchingRequest(IncomingRequest<req_res_v1::CollationFetchingRequest>)
}

impl CollatorProtocolMessage {
// impl CollatorProtocolMessage {
/// If the current variant contains the relay parent hash, return it.
pub fn relay_parent(&self) -> Option<Hash> {
match self {
Self::CollateOn(_) => None,
Self::DistributeCollation(receipt, _, _) => Some(receipt.descriptor().relay_parent),
Self::FetchCollation(relay_parent, _, _, _) => Some(*relay_parent),
Self::ReportCollator(_) => None,
Self::NoteGoodCollation(_) => None,
Self::NetworkBridgeUpdateV1(_) => None,
Self::NotifyCollationSeconded(_, _) => None,
}
}
}
// pub fn relay_parent(&self) -> Option<Hash> {
// match self {
// Self::CollateOn(_) => None,
// Self::DistributeCollation(receipt, _, _) => Some(receipt.descriptor().relay_parent),
// Self::FetchCollation(relay_parent, _, _, _) => Some(*relay_parent),
// Self::ReportCollator(_) => None,
// Self::NoteGoodCollation(_) => None,
// Self::NetworkBridgeUpdateV1(_) => None,
// Self::CollationFetchingRequest(_) => None,
// Self::NotifyCollationSeconded(_, _) => None,
// }
// }
// }

/// Messages received by the network bridge subsystem.
#[derive(Debug)]
Expand Down Expand Up @@ -750,3 +753,13 @@ impl From<IncomingRequest<req_res_v1::AvailabilityFetchingRequest>> for AllMessa
From::<AvailabilityDistributionMessage>::from(From::from(req))
}
}
impl From<IncomingRequest<req_res_v1::CollationFetchingRequest>> for AllMessages {
fn from(req: IncomingRequest<req_res_v1::CollationFetchingRequest>) -> Self {
From::<CollatorProtocolMessage>::from(From::from(req))
}
}
impl From<IncomingRequest<req_res_v1::CollationFetchingRequest>> for CollatorProtocolMessage {
fn from(req: IncomingRequest<req_res_v1::CollationFetchingRequest>) -> Self {
Self::CollationFetchingRequest(req)
}
}

0 comments on commit 50bcf94

Please sign in to comment.