Remove request multiplexer (#3624)
* WIP: Get rid of request multiplexer.

* WIP

* Receiver for handling of incoming requests.

* Get rid of useless `Fault` abstraction.

The things the type system lets us do are not worth abstracting into their own
type. Instead, error handling is going to be merely a pattern (see the sketch
after the changed-files summary below).

* Make most things compile again.

* Port availability distribution away from request multiplexer.

* Formatting.

* Port dispute distribution over.

* Fixup statement distribution.

* Handle request directly in collator protocol.

+ Only allow fatal errors at top level.

* Use direct request channel for availability recovery.

* Finally get rid of request multiplexer

Fixes #2842 and paves the way for more back pressure possibilities.

* Fix overseer and statement distribution tests.

* Fix collator protocol and network bridge tests.

* Fix tests in availability recovery.

* Fix availability distribution tests.

* Fix dispute distribution tests.

* Add missing dependency

* Typos.

* Review remarks.

* More remarks.
eskimor authored Aug 12, 2021
1 parent 686e318 commit 117466a
Showing 51 changed files with 1,493 additions and 1,730 deletions.
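
The error-handling pattern that replaces `Fault` is spread across the `error.rs` hunks further down. Assembled into a free-standing sketch it looks roughly like the following; `Fatal`, `NonFatal` and the `eprintln!` logging are simplified stand-ins for the subsystem's real error types and its `tracing` output, not the actual code. The sketch needs the `thiserror` and `derive_more` crates (the latter is exactly the dependency added in the Cargo.toml hunk below).

```rust
use thiserror::Error;

/// Errors that must bring the subsystem down (toy stand-in).
#[derive(Debug, Error)]
#[error("spawning a background task failed")]
struct Fatal;

/// Errors that are merely logged and otherwise ignored (toy stand-in).
#[derive(Debug, Error)]
#[error("peer sent an invalid request")]
struct NonFatal;

/// Per-subsystem error type: a plain enum instead of a shared `Fault` wrapper.
/// `derive_more::From` supplies the `From` impls the hand-written ones used to provide.
#[derive(Debug, Error, derive_more::From)]
enum Error {
    #[error(transparent)]
    Fatal(Fatal),
    #[error(transparent)]
    NonFatal(NonFatal),
}

/// Top level of the pattern: only fatal errors escape, non-fatal ones are logged.
fn log_error(result: Result<(), Error>, ctx: &'static str) -> Result<(), Fatal> {
    match result {
        Err(Error::Fatal(fatal)) => Err(fatal),
        Err(Error::NonFatal(error)) => {
            eprintln!("{}: {}", ctx, error);
            Ok(())
        },
        Ok(()) => Ok(()),
    }
}

/// Inner code returns the combined `Error` type.
fn do_work(fail_softly: bool) -> Result<(), Error> {
    if fail_softly {
        // `.into()` works thanks to the `From` impls generated by `derive_more::From`.
        return Err(NonFatal.into())
    }
    Ok(())
}

fn main() -> Result<(), Fatal> {
    // The non-fatal failure is swallowed after logging ...
    log_error(do_work(true), "do_work")?;
    // ... while a fatal one propagates out of `main`.
    log_error(Err(Fatal.into()), "fatal example")
}
```

The real `error.rs` below additionally maps `runtime::Error` into the two halves and exposes the `Result`/`NonFatalResult` aliases.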
33 changes: 25 additions & 8 deletions Cargo.lock

Generated file; diff not rendered by default.

1 change: 1 addition & 0 deletions node/network/availability-distribution/Cargo.toml
@@ -20,6 +20,7 @@ sp-core = { git = "https://github.com/paritytech/substrate", branch = "master",
 sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" }
 thiserror = "1.0.26"
 rand = "0.8.3"
+derive_more = "0.99.11"
 lru = "0.6.6"
 
 [dev-dependencies]
42 changes: 23 additions & 19 deletions node/network/availability-distribution/src/error.rs
@@ -17,35 +17,31 @@
 
 //! Error handling related code and Error/Result definitions.
 
-use polkadot_node_network_protocol::request_response::request::RequestError;
+use polkadot_node_network_protocol::request_response::outgoing::RequestError;
 use thiserror::Error;
 
 use futures::channel::oneshot;
 
-use polkadot_node_subsystem_util::{runtime, unwrap_non_fatal, Fault};
+use polkadot_node_subsystem_util::runtime;
 use polkadot_subsystem::SubsystemError;
 
 use crate::LOG_TARGET;
 
-#[derive(Debug, Error)]
+#[derive(Debug, Error, derive_more::From)]
 #[error(transparent)]
-pub struct Error(pub Fault<NonFatal, Fatal>);
-
-impl From<NonFatal> for Error {
-	fn from(e: NonFatal) -> Self {
-		Self(Fault::from_non_fatal(e))
-	}
-}
-
-impl From<Fatal> for Error {
-	fn from(f: Fatal) -> Self {
-		Self(Fault::from_fatal(f))
-	}
+pub enum Error {
+	/// All fatal errors.
+	Fatal(Fatal),
+	/// All nonfatal/potentially recoverable errors.
+	NonFatal(NonFatal),
 }
 
 impl From<runtime::Error> for Error {
 	fn from(o: runtime::Error) -> Self {
-		Self(Fault::from_other(o))
+		match o {
+			runtime::Error::Fatal(f) => Self::Fatal(Fatal::Runtime(f)),
+			runtime::Error::NonFatal(f) => Self::NonFatal(NonFatal::Runtime(f)),
+		}
 	}
 }
 
@@ -107,15 +103,23 @@ pub enum NonFatal {
 	Runtime(#[from] runtime::NonFatal),
 }
 
+/// General result type for fatal/nonfatal errors.
 pub type Result<T> = std::result::Result<T, Error>;
 
+/// Results which are never fatal.
+pub type NonFatalResult<T> = std::result::Result<T, NonFatal>;
+
 /// Utility for eating top level errors and log them.
 ///
 /// We basically always want to try and continue on error. This utility function is meant to
 /// consume top-level errors by simply logging them
 pub fn log_error(result: Result<()>, ctx: &'static str) -> std::result::Result<(), Fatal> {
-	if let Some(error) = unwrap_non_fatal(result.map_err(|e| e.0))? {
-		tracing::warn!(target: LOG_TARGET, error = ?error, ctx);
+	match result {
+		Err(Error::Fatal(f)) => Err(f),
+		Err(Error::NonFatal(error)) => {
+			tracing::warn!(target: LOG_TARGET, error = ?error, ctx);
+			Ok(())
+		},
+		Ok(()) => Ok(()),
 	}
-	Ok(())
 }
54 changes: 41 additions & 13 deletions node/network/availability-distribution/src/lib.rs
@@ -18,6 +18,7 @@ use futures::{future::Either, FutureExt, StreamExt, TryFutureExt};
 
 use sp_keystore::SyncCryptoStorePtr;
 
+use polkadot_node_network_protocol::request_response::{v1, IncomingRequestReceiver};
 use polkadot_subsystem::{
 	messages::AvailabilityDistributionMessage, overseer, FromOverseer, OverseerSignal,
 	SpawnedSubsystem, SubsystemContext, SubsystemError,
@@ -38,7 +39,7 @@ mod pov_requester;
 
 /// Responding to erasure chunk requests:
 mod responder;
-use responder::{answer_chunk_request_log, answer_pov_request_log};
+use responder::{run_chunk_receiver, run_pov_receiver};
 
 mod metrics;
 /// Prometheus `Metrics` for availability distribution.
@@ -53,10 +54,20 @@ const LOG_TARGET: &'static str = "parachain::availability-distribution";
 pub struct AvailabilityDistributionSubsystem {
 	/// Easy and efficient runtime access for this subsystem.
 	runtime: RuntimeInfo,
+	/// Receivers to receive messages from.
+	recvs: IncomingRequestReceivers,
 	/// Prometheus metrics.
 	metrics: Metrics,
 }
 
+/// Receivers to be passed into availability distribution.
+pub struct IncomingRequestReceivers {
+	/// Receiver for incoming PoV requests.
+	pub pov_req_receiver: IncomingRequestReceiver<v1::PoVFetchingRequest>,
+	/// Receiver for incoming availability chunk requests.
+	pub chunk_req_receiver: IncomingRequestReceiver<v1::ChunkFetchingRequest>,
+}
+
 impl<Context> overseer::Subsystem<Context, SubsystemError> for AvailabilityDistributionSubsystem
 where
 	Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
@@ -74,18 +85,41 @@
 
 impl AvailabilityDistributionSubsystem {
 	/// Create a new instance of the availability distribution.
-	pub fn new(keystore: SyncCryptoStorePtr, metrics: Metrics) -> Self {
+	pub fn new(
+		keystore: SyncCryptoStorePtr,
+		recvs: IncomingRequestReceivers,
+		metrics: Metrics,
+	) -> Self {
 		let runtime = RuntimeInfo::new(Some(keystore));
-		Self { runtime, metrics }
+		Self { runtime, recvs, metrics }
	}
 
 	/// Start processing work as passed on from the Overseer.
-	async fn run<Context>(mut self, mut ctx: Context) -> std::result::Result<(), Fatal>
+	async fn run<Context>(self, mut ctx: Context) -> std::result::Result<(), Fatal>
 	where
 		Context: SubsystemContext<Message = AvailabilityDistributionMessage>,
 		Context: overseer::SubsystemContext<Message = AvailabilityDistributionMessage>,
 	{
-		let mut requester = Requester::new(self.metrics.clone()).fuse();
+		let Self { mut runtime, recvs, metrics } = self;
+
+		let IncomingRequestReceivers { pov_req_receiver, chunk_req_receiver } = recvs;
+		let mut requester = Requester::new(metrics.clone()).fuse();
+
+		{
+			let sender = ctx.sender().clone();
+			ctx.spawn(
+				"pov-receiver",
+				run_pov_receiver(sender.clone(), pov_req_receiver, metrics.clone()).boxed(),
+			)
+			.map_err(Fatal::SpawnTask)?;
+
+			ctx.spawn(
+				"chunk-receiver",
+				run_chunk_receiver(sender, chunk_req_receiver, metrics.clone()).boxed(),
+			)
+			.map_err(Fatal::SpawnTask)?;
+		}
+
 		loop {
 			let action = {
 				let mut subsystem_next = ctx.recv().fuse();
@@ -110,19 +144,13 @@
					log_error(
						requester
							.get_mut()
-							.update_fetching_heads(&mut ctx, &mut self.runtime, update)
+							.update_fetching_heads(&mut ctx, &mut runtime, update)
							.await,
						"Error in Requester::update_fetching_heads",
					)?;
				},
				FromOverseer::Signal(OverseerSignal::BlockFinalized(..)) => {},
				FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(()),
-				FromOverseer::Communication {
-					msg: AvailabilityDistributionMessage::ChunkFetchingRequest(req),
-				} => answer_chunk_request_log(&mut ctx, req, &self.metrics).await,
-				FromOverseer::Communication {
-					msg: AvailabilityDistributionMessage::PoVFetchingRequest(req),
-				} => answer_pov_request_log(&mut ctx, req, &self.metrics).await,
				FromOverseer::Communication {
					msg:
						AvailabilityDistributionMessage::FetchPoV {
@@ -136,7 +164,7 @@
					log_error(
						pov_requester::fetch_pov(
							&mut ctx,
-							&mut self.runtime,
+							&mut runtime,
							relay_parent,
							from_validator,
							candidate_hash,
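
With the multiplexer gone, incoming network requests no longer arrive as overseer messages: each subsystem is handed dedicated `IncomingRequestReceiver`s and spawns small tasks (`run_pov_receiver` / `run_chunk_receiver`, which live in `responder.rs` and are not part of this excerpt) that answer requests directly. A rough, self-contained sketch of that shape, using a plain `futures` mpsc/oneshot pair as a stand-in for the real receiver and request types:

```rust
use futures::{channel::{mpsc, oneshot}, executor::block_on, StreamExt};

/// Toy stand-in for an incoming network request: a payload plus the channel
/// the response must be sent back on. Field names are illustrative only.
struct IncomingRequest {
    payload: Vec<u8>,
    pending_response: oneshot::Sender<Vec<u8>>,
}

/// Shape of a receiver task: pull requests off its dedicated channel and answer
/// them one by one, instead of routing everything through a central multiplexer.
async fn run_receiver(mut receiver: mpsc::Receiver<IncomingRequest>) {
    while let Some(request) = receiver.next().await {
        // Look up the requested data here (omitted); echo the payload as a placeholder.
        let _ = request.pending_response.send(request.payload);
    }
}

fn main() {
    block_on(async {
        let (mut tx, rx) = mpsc::channel(8);
        let (response_tx, response_rx) = oneshot::channel();
        tx.try_send(IncomingRequest { payload: b"chunk 3".to_vec(), pending_response: response_tx })
            .expect("channel has capacity");
        drop(tx); // closing the sender lets the receiver loop terminate
        run_receiver(rx).await;
        println!("response: {:?}", response_rx.await);
    });
}
```

On the wiring side, callers now construct the subsystem as `AvailabilityDistributionSubsystem::new(keystore, IncomingRequestReceivers { pov_req_receiver, chunk_req_receiver }, metrics)`, and because each protocol gets its own channel, flow control can be applied per request type rather than through one shared multiplexer queue — presumably the "more back pressure possibilities" the commit message refers to.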
@@ -19,7 +19,7 @@
 use futures::{channel::oneshot, future::BoxFuture, FutureExt};
 
 use polkadot_node_network_protocol::request_response::{
-	request::{RequestError, Requests},
+	outgoing::{RequestError, Requests},
 	v1::{PoVFetchingRequest, PoVFetchingResponse},
 	OutgoingRequest, Recipient,
 };
@@ -24,7 +24,7 @@ use futures::{
 
 use polkadot_erasure_coding::branch_hash;
 use polkadot_node_network_protocol::request_response::{
-	request::{OutgoingRequest, Recipient, RequestError, Requests},
+	outgoing::{OutgoingRequest, Recipient, RequestError, Requests},
 	v1::{ChunkFetchingRequest, ChunkFetchingResponse},
 };
 use polkadot_node_primitives::ErasureChunk;
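
The final two hunks shown (from the PoV requester and the chunk-fetching code) only touch import paths: the outgoing-request types that used to sit under `request_response::request` are now imported from `request_response::outgoing`, mirroring the new incoming side. For orientation, the outgoing flow those modules implement pairs every request with a oneshot channel on which the response (or a `RequestError`) comes back. A stand-in sketch of that idea — the types below are illustrative, not the real `OutgoingRequest`/`Recipient` API:

```rust
use futures::{channel::oneshot, executor::block_on};

/// Toy stand-ins; the real types are `OutgoingRequest`, `Recipient` and
/// `Requests` from `request_response::outgoing`.
#[derive(Debug)]
struct ChunkRequest { candidate: u64, index: u32 }
#[derive(Debug)]
struct ChunkResponse(Vec<u8>);

struct Outgoing {
    payload: ChunkRequest,
    pending_response: oneshot::Sender<ChunkResponse>,
}

/// Build a request together with the receiver its response will arrive on.
fn new_outgoing(payload: ChunkRequest) -> (Outgoing, oneshot::Receiver<ChunkResponse>) {
    let (tx, rx) = oneshot::channel();
    (Outgoing { payload, pending_response: tx }, rx)
}

fn main() {
    let (request, response_rx) = new_outgoing(ChunkRequest { candidate: 7, index: 2 });
    println!("sending {:?}", request.payload);
    // In the real code the request is handed to the network bridge; here the
    // "network" answers immediately.
    let _ = request.pending_response.send(ChunkResponse(vec![1, 2, 3]));
    println!("got {:?}", block_on(response_rx));
}
```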
(Remaining changed files not rendered in this excerpt.)
