From 092d6a3efeac9a4d9d92b9aeddb661c95dd3d781 Mon Sep 17 00:00:00 2001 From: Dmitry Sinyavin Date: Wed, 1 Nov 2023 21:20:31 +0100 Subject: [PATCH 01/13] Reorganize candidate processing --- .../node/core/candidate-validation/src/lib.rs | 185 +++++++++--------- 1 file changed, 90 insertions(+), 95 deletions(-) diff --git a/polkadot/node/core/candidate-validation/src/lib.rs b/polkadot/node/core/candidate-validation/src/lib.rs index 93db7d11cee8..70dd9382f10a 100644 --- a/polkadot/node/core/candidate-validation/src/lib.rs +++ b/polkadot/node/core/candidate-validation/src/lib.rs @@ -55,7 +55,7 @@ use polkadot_primitives::{ use parity_scale_codec::Encode; -use futures::{channel::oneshot, prelude::*}; +use futures::{channel::oneshot, prelude::*, stream::FuturesUnordered}; use std::{ path::PathBuf, @@ -153,103 +153,98 @@ async fn run( .await; ctx.spawn_blocking("pvf-validation-host", task.boxed())?; + let mut tasks = FuturesUnordered::new(); loop { - match ctx.recv().await? { - FromOrchestra::Signal(OverseerSignal::ActiveLeaves(_)) => {}, - FromOrchestra::Signal(OverseerSignal::BlockFinalized(..)) => {}, - FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()), - FromOrchestra::Communication { msg } => match msg { - CandidateValidationMessage::ValidateFromChainState( - candidate_receipt, - pov, - executor_params, - timeout, - response_sender, - ) => { - let bg = { - let mut sender = ctx.sender().clone(); - let metrics = metrics.clone(); - let validation_host = validation_host.clone(); - - async move { - let _timer = metrics.time_validate_from_chain_state(); - let res = validate_from_chain_state( - &mut sender, - validation_host, - candidate_receipt, - pov, - executor_params, - timeout, - &metrics, - ) - .await; - - metrics.on_validation_event(&res); - let _ = response_sender.send(res); + futures::select! { + comm = ctx.recv().fuse() => { + match comm { + Ok(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(_))) => {}, + Ok(FromOrchestra::Signal(OverseerSignal::BlockFinalized(..))) => {}, + Ok(FromOrchestra::Signal(OverseerSignal::Conclude)) => return Ok(()), + Ok(FromOrchestra::Communication { msg }) => match msg { + CandidateValidationMessage::ValidateFromChainState( + candidate_receipt, + pov, + executor_params, + timeout, + response_sender, + ) => { + let mut sender = ctx.sender().clone(); + let metrics = metrics.clone(); + let validation_host = validation_host.clone(); + + tasks.push(async move { + let _timer = metrics.time_validate_from_chain_state(); + let res = validate_from_chain_state( + &mut sender, + validation_host, + candidate_receipt, + pov, + executor_params, + timeout, + &metrics, + ) + .await; + + metrics.on_validation_event(&res); + let _ = response_sender.send(res); + }.boxed()); + }, + CandidateValidationMessage::ValidateFromExhaustive( + persisted_validation_data, + validation_code, + candidate_receipt, + pov, + executor_params, + timeout, + response_sender, + ) => { + let metrics = metrics.clone(); + let validation_host = validation_host.clone(); + + tasks.push(async move { + let _timer = metrics.time_validate_from_exhaustive(); + let res = validate_candidate_exhaustive( + validation_host, + persisted_validation_data, + validation_code, + candidate_receipt, + pov, + executor_params, + timeout, + &metrics, + ) + .await; + + metrics.on_validation_event(&res); + let _ = response_sender.send(res); + }.boxed()); + }, + CandidateValidationMessage::PreCheck( + relay_parent, + validation_code_hash, + response_sender, + ) => { + let mut sender = ctx.sender().clone(); + let validation_host = validation_host.clone(); + + tasks.push(async move { + let precheck_result = precheck_pvf( + &mut sender, + validation_host, + relay_parent, + validation_code_hash, + ) + .await; + + let _ = response_sender.send(precheck_result); + }.boxed()); } - }; - - ctx.spawn("validate-from-chain-state", bg.boxed())?; - }, - CandidateValidationMessage::ValidateFromExhaustive( - persisted_validation_data, - validation_code, - candidate_receipt, - pov, - executor_params, - timeout, - response_sender, - ) => { - let bg = { - let metrics = metrics.clone(); - let validation_host = validation_host.clone(); - - async move { - let _timer = metrics.time_validate_from_exhaustive(); - let res = validate_candidate_exhaustive( - validation_host, - persisted_validation_data, - validation_code, - candidate_receipt, - pov, - executor_params, - timeout, - &metrics, - ) - .await; - - metrics.on_validation_event(&res); - let _ = response_sender.send(res); - } - }; - - ctx.spawn("validate-from-exhaustive", bg.boxed())?; - }, - CandidateValidationMessage::PreCheck( - relay_parent, - validation_code_hash, - response_sender, - ) => { - let bg = { - let mut sender = ctx.sender().clone(); - let validation_host = validation_host.clone(); - - async move { - let precheck_result = precheck_pvf( - &mut sender, - validation_host, - relay_parent, - validation_code_hash, - ) - .await; - - let _ = response_sender.send(precheck_result); - } - }; - - ctx.spawn("candidate-validation-pre-check", bg.boxed())?; - }, + }, + Err(e) => return Err(SubsystemError::from(e)) + } }, + _ = tasks.select_next_some() => () } } } From 7f1698c4cd57546d74e3694ecf0e00e3ead3d25d Mon Sep 17 00:00:00 2001 From: Dmitry Sinyavin Date: Sat, 4 Nov 2023 12:24:43 +0100 Subject: [PATCH 02/13] Re-implement using streams --- .../node/core/candidate-validation/src/lib.rs | 196 +++++++++--------- 1 file changed, 103 insertions(+), 93 deletions(-) diff --git a/polkadot/node/core/candidate-validation/src/lib.rs b/polkadot/node/core/candidate-validation/src/lib.rs index 70dd9382f10a..658b3ee41f61 100644 --- a/polkadot/node/core/candidate-validation/src/lib.rs +++ b/polkadot/node/core/candidate-validation/src/lib.rs @@ -55,11 +55,12 @@ use polkadot_primitives::{ use parity_scale_codec::Encode; -use futures::{channel::oneshot, prelude::*, stream::FuturesUnordered}; +use futures::{channel::oneshot, prelude::*, StreamExt}; use std::{ path::PathBuf, sync::Arc, + task::Poll, time::{Duration, Instant}, }; @@ -152,100 +153,109 @@ async fn run( ) .await; ctx.spawn_blocking("pvf-validation-host", task.boxed())?; - - let mut tasks = FuturesUnordered::new(); - loop { - futures::select! { - comm = ctx.recv().fuse() => { - match comm { - Ok(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(_))) => {}, - Ok(FromOrchestra::Signal(OverseerSignal::BlockFinalized(..))) => {}, - Ok(FromOrchestra::Signal(OverseerSignal::Conclude)) => return Ok(()), - Ok(FromOrchestra::Communication { msg }) => match msg { - CandidateValidationMessage::ValidateFromChainState( - candidate_receipt, - pov, - executor_params, - timeout, - response_sender, - ) => { - let mut sender = ctx.sender().clone(); - let metrics = metrics.clone(); - let validation_host = validation_host.clone(); - - tasks.push(async move { - let _timer = metrics.time_validate_from_chain_state(); - let res = validate_from_chain_state( - &mut sender, - validation_host, - candidate_receipt, - pov, - executor_params, - timeout, - &metrics, - ) - .await; - - metrics.on_validation_event(&res); - let _ = response_sender.send(res); - }.boxed()); - }, - CandidateValidationMessage::ValidateFromExhaustive( - persisted_validation_data, - validation_code, - candidate_receipt, - pov, - executor_params, - timeout, - response_sender, - ) => { - let metrics = metrics.clone(); - let validation_host = validation_host.clone(); - - tasks.push(async move { - let _timer = metrics.time_validate_from_exhaustive(); - let res = validate_candidate_exhaustive( - validation_host, - persisted_validation_data, - validation_code, - candidate_receipt, - pov, - executor_params, - timeout, - &metrics, - ) - .await; - - metrics.on_validation_event(&res); - let _ = response_sender.send(res); - }.boxed()); - }, - CandidateValidationMessage::PreCheck( - relay_parent, - validation_code_hash, - response_sender, - ) => { - let mut sender = ctx.sender().clone(); - let validation_host = validation_host.clone(); - - tasks.push(async move { - let precheck_result = precheck_pvf( - &mut sender, - validation_host, - relay_parent, - validation_code_hash, - ) - .await; - - let _ = response_sender.send(precheck_result); - }.boxed()); - } - }, - Err(e) => return Err(SubsystemError::from(e)) - } + let mut res = Ok(()); + let sender = ctx.sender().to_owned(); + + let read_stream = stream::poll_fn(|c| loop { + match ctx.recv().poll_unpin(c) { + Poll::Ready(Ok(FromOrchestra::Signal(OverseerSignal::Conclude))) => + return Poll::Ready(None), + Poll::Pending => return Poll::Pending, + Poll::Ready(Ok(FromOrchestra::Signal(_))) => continue, + Poll::Ready(Ok(FromOrchestra::Communication { msg })) => return Poll::Ready(Some(msg)), + Poll::Ready(Err(e)) => { + res = Err(e); + return Poll::Ready(None) }, - _ = tasks.select_next_some() => () } + }); + + read_stream + // NB: Cloning `sender` inside `async` block of `for_each_concurrent` renders the whole + // thing not `Send`, so we `zip` the message stream with the stream of `sender` clones here. + .zip(stream::repeat(sender)) + // FIXME: The backlog size here is the same as in PVF host queues. It should either use a + // common constant, or another appropriate value should be chosen + .for_each_concurrent(30, |message_and_sender| { + handle_candidate_validation_message( + message_and_sender.1, + validation_host.clone(), + metrics.clone(), + message_and_sender.0, + ) + }) + .await; + + res +} + +async fn handle_candidate_validation_message( + mut sender: Sender, + validation_host: ValidationHost, + metrics: Metrics, + msg: CandidateValidationMessage, +) where + Sender: SubsystemSender, +{ + match msg { + CandidateValidationMessage::ValidateFromChainState( + candidate_receipt, + pov, + executor_params, + timeout, + response_sender, + ) => { + let _timer = metrics.time_validate_from_chain_state(); + let res = validate_from_chain_state( + &mut sender, + validation_host, + candidate_receipt, + pov, + executor_params, + timeout, + &metrics, + ) + .await; + + metrics.on_validation_event(&res); + let _ = response_sender.send(res); + }, + CandidateValidationMessage::ValidateFromExhaustive( + persisted_validation_data, + validation_code, + candidate_receipt, + pov, + executor_params, + timeout, + response_sender, + ) => { + let _timer = metrics.time_validate_from_exhaustive(); + let res = validate_candidate_exhaustive( + validation_host, + persisted_validation_data, + validation_code, + candidate_receipt, + pov, + executor_params, + timeout, + &metrics, + ) + .await; + + metrics.on_validation_event(&res); + let _ = response_sender.send(res); + }, + CandidateValidationMessage::PreCheck( + relay_parent, + validation_code_hash, + response_sender, + ) => { + let precheck_result = + precheck_pvf(&mut sender, validation_host, relay_parent, validation_code_hash) + .await; + + let _ = response_sender.send(precheck_result); + }, } } From 6e0f46102cccaa86fe562252bdfee4ee6cb4cc22 Mon Sep 17 00:00:00 2001 From: Dmitry Sinyavin Date: Tue, 7 Nov 2023 13:09:59 +0100 Subject: [PATCH 03/13] Revert "Re-implement using streams" This reverts commit 7f1698c4cd57546d74e3694ecf0e00e3ead3d25d. --- .../node/core/candidate-validation/src/lib.rs | 196 +++++++++--------- 1 file changed, 93 insertions(+), 103 deletions(-) diff --git a/polkadot/node/core/candidate-validation/src/lib.rs b/polkadot/node/core/candidate-validation/src/lib.rs index 658b3ee41f61..70dd9382f10a 100644 --- a/polkadot/node/core/candidate-validation/src/lib.rs +++ b/polkadot/node/core/candidate-validation/src/lib.rs @@ -55,12 +55,11 @@ use polkadot_primitives::{ use parity_scale_codec::Encode; -use futures::{channel::oneshot, prelude::*, StreamExt}; +use futures::{channel::oneshot, prelude::*, stream::FuturesUnordered}; use std::{ path::PathBuf, sync::Arc, - task::Poll, time::{Duration, Instant}, }; @@ -153,109 +152,100 @@ async fn run( ) .await; ctx.spawn_blocking("pvf-validation-host", task.boxed())?; - let mut res = Ok(()); - let sender = ctx.sender().to_owned(); - - let read_stream = stream::poll_fn(|c| loop { - match ctx.recv().poll_unpin(c) { - Poll::Ready(Ok(FromOrchestra::Signal(OverseerSignal::Conclude))) => - return Poll::Ready(None), - Poll::Pending => return Poll::Pending, - Poll::Ready(Ok(FromOrchestra::Signal(_))) => continue, - Poll::Ready(Ok(FromOrchestra::Communication { msg })) => return Poll::Ready(Some(msg)), - Poll::Ready(Err(e)) => { - res = Err(e); - return Poll::Ready(None) + + let mut tasks = FuturesUnordered::new(); + loop { + futures::select! { + comm = ctx.recv().fuse() => { + match comm { + Ok(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(_))) => {}, + Ok(FromOrchestra::Signal(OverseerSignal::BlockFinalized(..))) => {}, + Ok(FromOrchestra::Signal(OverseerSignal::Conclude)) => return Ok(()), + Ok(FromOrchestra::Communication { msg }) => match msg { + CandidateValidationMessage::ValidateFromChainState( + candidate_receipt, + pov, + executor_params, + timeout, + response_sender, + ) => { + let mut sender = ctx.sender().clone(); + let metrics = metrics.clone(); + let validation_host = validation_host.clone(); + + tasks.push(async move { + let _timer = metrics.time_validate_from_chain_state(); + let res = validate_from_chain_state( + &mut sender, + validation_host, + candidate_receipt, + pov, + executor_params, + timeout, + &metrics, + ) + .await; + + metrics.on_validation_event(&res); + let _ = response_sender.send(res); + }.boxed()); + }, + CandidateValidationMessage::ValidateFromExhaustive( + persisted_validation_data, + validation_code, + candidate_receipt, + pov, + executor_params, + timeout, + response_sender, + ) => { + let metrics = metrics.clone(); + let validation_host = validation_host.clone(); + + tasks.push(async move { + let _timer = metrics.time_validate_from_exhaustive(); + let res = validate_candidate_exhaustive( + validation_host, + persisted_validation_data, + validation_code, + candidate_receipt, + pov, + executor_params, + timeout, + &metrics, + ) + .await; + + metrics.on_validation_event(&res); + let _ = response_sender.send(res); + }.boxed()); + }, + CandidateValidationMessage::PreCheck( + relay_parent, + validation_code_hash, + response_sender, + ) => { + let mut sender = ctx.sender().clone(); + let validation_host = validation_host.clone(); + + tasks.push(async move { + let precheck_result = precheck_pvf( + &mut sender, + validation_host, + relay_parent, + validation_code_hash, + ) + .await; + + let _ = response_sender.send(precheck_result); + }.boxed()); + } + }, + Err(e) => return Err(SubsystemError::from(e)) + } }, + _ = tasks.select_next_some() => () } - }); - - read_stream - // NB: Cloning `sender` inside `async` block of `for_each_concurrent` renders the whole - // thing not `Send`, so we `zip` the message stream with the stream of `sender` clones here. - .zip(stream::repeat(sender)) - // FIXME: The backlog size here is the same as in PVF host queues. It should either use a - // common constant, or another appropriate value should be chosen - .for_each_concurrent(30, |message_and_sender| { - handle_candidate_validation_message( - message_and_sender.1, - validation_host.clone(), - metrics.clone(), - message_and_sender.0, - ) - }) - .await; - - res -} - -async fn handle_candidate_validation_message( - mut sender: Sender, - validation_host: ValidationHost, - metrics: Metrics, - msg: CandidateValidationMessage, -) where - Sender: SubsystemSender, -{ - match msg { - CandidateValidationMessage::ValidateFromChainState( - candidate_receipt, - pov, - executor_params, - timeout, - response_sender, - ) => { - let _timer = metrics.time_validate_from_chain_state(); - let res = validate_from_chain_state( - &mut sender, - validation_host, - candidate_receipt, - pov, - executor_params, - timeout, - &metrics, - ) - .await; - - metrics.on_validation_event(&res); - let _ = response_sender.send(res); - }, - CandidateValidationMessage::ValidateFromExhaustive( - persisted_validation_data, - validation_code, - candidate_receipt, - pov, - executor_params, - timeout, - response_sender, - ) => { - let _timer = metrics.time_validate_from_exhaustive(); - let res = validate_candidate_exhaustive( - validation_host, - persisted_validation_data, - validation_code, - candidate_receipt, - pov, - executor_params, - timeout, - &metrics, - ) - .await; - - metrics.on_validation_event(&res); - let _ = response_sender.send(res); - }, - CandidateValidationMessage::PreCheck( - relay_parent, - validation_code_hash, - response_sender, - ) => { - let precheck_result = - precheck_pvf(&mut sender, validation_host, relay_parent, validation_code_hash) - .await; - - let _ = response_sender.send(precheck_result); - }, } } From 83a0fe43030da31530e6425aa16819c301816547 Mon Sep 17 00:00:00 2001 From: Dmitry Sinyavin Date: Tue, 7 Nov 2023 14:22:31 +0100 Subject: [PATCH 04/13] Add a semaphore lock --- Cargo.lock | 14 ++++++++++++-- polkadot/node/core/candidate-validation/Cargo.toml | 1 + polkadot/node/core/candidate-validation/src/lib.rs | 12 ++++++++++++ polkadot/node/core/pvf/src/host.rs | 5 ++++- polkadot/node/core/pvf/src/lib.rs | 2 +- 5 files changed, 30 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b4501b5d7512..ea5c61582800 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1247,6 +1247,15 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "async-semaphore" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "538c756e85eb6ffdefaec153804afb6da84b033e2e5ec3e9d459c34b4bf4d3f6" +dependencies = [ + "event-listener", +] + [[package]] name = "async-stream" version = "0.3.5" @@ -1469,8 +1478,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "93f2635620bf0b9d4576eb7bb9a38a55df78bd1205d26fa994b25911a69f212f" dependencies = [ "bitcoin_hashes", - "rand 0.7.3", - "rand_core 0.5.1", + "rand 0.8.5", + "rand_core 0.6.4", "serde", "unicode-normalization", ] @@ -12167,6 +12176,7 @@ name = "polkadot-node-core-candidate-validation" version = "1.0.0" dependencies = [ "assert_matches", + "async-semaphore", "async-trait", "futures", "futures-timer", diff --git a/polkadot/node/core/candidate-validation/Cargo.toml b/polkadot/node/core/candidate-validation/Cargo.toml index a2e88778532f..dd746471f5a3 100644 --- a/polkadot/node/core/candidate-validation/Cargo.toml +++ b/polkadot/node/core/candidate-validation/Cargo.toml @@ -8,6 +8,7 @@ license.workspace = true [dependencies] async-trait = "0.1.57" +async-semaphore = "1.2.0" futures = "0.3.21" futures-timer = "3.0.2" gum = { package = "tracing-gum", path = "../../gum" } diff --git a/polkadot/node/core/candidate-validation/src/lib.rs b/polkadot/node/core/candidate-validation/src/lib.rs index 70dd9382f10a..0cc3c935437c 100644 --- a/polkadot/node/core/candidate-validation/src/lib.rs +++ b/polkadot/node/core/candidate-validation/src/lib.rs @@ -23,6 +23,7 @@ #![deny(unused_crate_dependencies, unused_results)] #![warn(missing_docs)] +use async_semaphore::Semaphore; use polkadot_node_core_pvf::{ InternalValidationError, InvalidCandidate as WasmInvalidCandidate, PrepareError, PrepareJobKind, PrepareStats, PvfPrepData, ValidationError, ValidationHost, @@ -154,6 +155,11 @@ async fn run( ctx.spawn_blocking("pvf-validation-host", task.boxed())?; let mut tasks = FuturesUnordered::new(); + // The task queue size is chosen to be somewhat bigger than the PVF host incoming queue size + // to allow exhaustive validation messages to fall through in case the tasks are clogged with + // `ValidateFromChainState` messages awaiting data from the runtime + let semaphore = Arc::new(Semaphore::new(polkadot_node_core_pvf::HOST_MESSAGE_QUEUE_SIZE * 2)); + loop { futures::select! { comm = ctx.recv().fuse() => { @@ -173,7 +179,9 @@ async fn run( let metrics = metrics.clone(); let validation_host = validation_host.clone(); + let guard = semaphore.acquire_arc().await; tasks.push(async move { + let _guard = guard; let _timer = metrics.time_validate_from_chain_state(); let res = validate_from_chain_state( &mut sender, @@ -202,7 +210,9 @@ async fn run( let metrics = metrics.clone(); let validation_host = validation_host.clone(); + let guard = semaphore.acquire_arc().await; tasks.push(async move { + let _guard = guard; let _timer = metrics.time_validate_from_exhaustive(); let res = validate_candidate_exhaustive( validation_host, @@ -228,7 +238,9 @@ async fn run( let mut sender = ctx.sender().clone(); let validation_host = validation_host.clone(); + let guard = semaphore.acquire_arc().await; tasks.push(async move { + let _guard = guard; let precheck_result = precheck_pvf( &mut sender, validation_host, diff --git a/polkadot/node/core/pvf/src/host.rs b/polkadot/node/core/pvf/src/host.rs index dd0bd8581985..fb1323164d2f 100644 --- a/polkadot/node/core/pvf/src/host.rs +++ b/polkadot/node/core/pvf/src/host.rs @@ -59,6 +59,9 @@ pub const PREPARE_BINARY_NAME: &str = "polkadot-prepare-worker"; /// The name of binary spawned to execute a PVF pub const EXECUTE_BINARY_NAME: &str = "polkadot-execute-worker"; +/// The size of incoming message queue +pub const HOST_MESSAGE_QUEUE_SIZE: usize = 10; + /// An alias to not spell the type for the oneshot sender for the PVF execution result. pub(crate) type ResultSender = oneshot::Sender>; @@ -224,7 +227,7 @@ pub async fn start(config: Config, metrics: Metrics) -> (ValidationHost, impl Fu } }; - let (to_host_tx, to_host_rx) = mpsc::channel(10); + let (to_host_tx, to_host_rx) = mpsc::channel(HOST_MESSAGE_QUEUE_SIZE); let validation_host = ValidationHost { to_host_tx }; diff --git a/polkadot/node/core/pvf/src/lib.rs b/polkadot/node/core/pvf/src/lib.rs index 102a91dbdad7..b73628a789a9 100644 --- a/polkadot/node/core/pvf/src/lib.rs +++ b/polkadot/node/core/pvf/src/lib.rs @@ -104,7 +104,7 @@ mod worker_intf; pub mod testing; pub use error::{InvalidCandidate, ValidationError}; -pub use host::{start, Config, ValidationHost, EXECUTE_BINARY_NAME, PREPARE_BINARY_NAME}; +pub use host::{start, Config, ValidationHost, EXECUTE_BINARY_NAME, PREPARE_BINARY_NAME, HOST_MESSAGE_QUEUE_SIZE}; pub use metrics::Metrics; pub use priority::Priority; pub use worker_intf::{framed_recv, framed_send, JOB_TIMEOUT_WALL_CLOCK_FACTOR}; From 0663ba01ee4045db5ba01eeba11ac3f44e048574 Mon Sep 17 00:00:00 2001 From: Dmitry Sinyavin Date: Thu, 9 Nov 2023 11:01:35 +0100 Subject: [PATCH 05/13] `cargo fmt` --- polkadot/node/core/pvf/src/lib.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/polkadot/node/core/pvf/src/lib.rs b/polkadot/node/core/pvf/src/lib.rs index b73628a789a9..80a432b04dc8 100644 --- a/polkadot/node/core/pvf/src/lib.rs +++ b/polkadot/node/core/pvf/src/lib.rs @@ -104,7 +104,10 @@ mod worker_intf; pub mod testing; pub use error::{InvalidCandidate, ValidationError}; -pub use host::{start, Config, ValidationHost, EXECUTE_BINARY_NAME, PREPARE_BINARY_NAME, HOST_MESSAGE_QUEUE_SIZE}; +pub use host::{ + start, Config, ValidationHost, EXECUTE_BINARY_NAME, HOST_MESSAGE_QUEUE_SIZE, + PREPARE_BINARY_NAME, +}; pub use metrics::Metrics; pub use priority::Priority; pub use worker_intf::{framed_recv, framed_send, JOB_TIMEOUT_WALL_CLOCK_FACTOR}; From 52b5c5e920fa54c08c469c8caabb18d506cf4392 Mon Sep 17 00:00:00 2001 From: Dmitry Sinyavin Date: Sat, 11 Nov 2023 12:47:15 +0100 Subject: [PATCH 06/13] Handle messages in a separate fn --- .../node/core/candidate-validation/src/lib.rs | 189 +++++++++--------- 1 file changed, 99 insertions(+), 90 deletions(-) diff --git a/polkadot/node/core/candidate-validation/src/lib.rs b/polkadot/node/core/candidate-validation/src/lib.rs index ff323322bbe3..54b9a4eee913 100644 --- a/polkadot/node/core/candidate-validation/src/lib.rs +++ b/polkadot/node/core/candidate-validation/src/lib.rs @@ -61,7 +61,7 @@ use futures::{channel::oneshot, prelude::*, stream::FuturesUnordered}; use std::{ path::PathBuf, sync::Arc, - time::{Duration, Instant}, + time::{Duration, Instant}, pin::Pin, }; use async_trait::async_trait; @@ -135,6 +135,100 @@ impl CandidateValidationSubsystem { } } +fn handle_validation_message(mut sender: S, validation_host: ValidationHost, metrics: Metrics, msg: CandidateValidationMessage) -> Pin + Send>> +where S: SubsystemSender +{ + match msg { + CandidateValidationMessage::ValidateFromChainState { + candidate_receipt, + pov, + executor_params, + exec_timeout_kind, + response_sender, + .. + } => { + // let mut sender = ctx.sender().clone(); + // let metrics = metrics.clone(); + // let validation_host = validation_host.clone(); + + // let guard = semaphore.acquire_arc().await; + async move { + // let _guard = guard; + let _timer = metrics.time_validate_from_chain_state(); + let res = validate_from_chain_state( + &mut sender, + validation_host, + candidate_receipt, + pov, + executor_params, + exec_timeout_kind, + &metrics, + ) + .await; + + metrics.on_validation_event(&res); + let _ = response_sender.send(res); + }.boxed() + }, + CandidateValidationMessage::ValidateFromExhaustive { + validation_data, + validation_code, + candidate_receipt, + pov, + executor_params, + exec_timeout_kind, + response_sender, + .. + } => { + // let metrics = metrics.clone(); + // let validation_host = validation_host.clone(); + + // let guard = semaphore.acquire_arc().await; + async move { + // let _guard = guard; + let _timer = metrics.time_validate_from_exhaustive(); + let res = validate_candidate_exhaustive( + validation_host, + validation_data, + validation_code, + candidate_receipt, + pov, + executor_params, + exec_timeout_kind, + &metrics, + ) + .await; + + metrics.on_validation_event(&res); + let _ = response_sender.send(res); + }.boxed() + }, + CandidateValidationMessage::PreCheck { + relay_parent, + validation_code_hash, + response_sender, + .. + } => { + // let mut sender = ctx.sender().clone(); + // let validation_host = validation_host.clone(); + + // let guard = semaphore.acquire_arc().await; + async move { + // let _guard = guard; + let precheck_result = precheck_pvf( + &mut sender, + validation_host, + relay_parent, + validation_code_hash, + ) + .await; + + let _ = response_sender.send(precheck_result); + }.boxed() + } + } +} + #[overseer::contextbounds(CandidateValidation, prefix = self::overseer)] async fn run( mut ctx: Context, @@ -167,95 +261,10 @@ async fn run( Ok(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(_))) => {}, Ok(FromOrchestra::Signal(OverseerSignal::BlockFinalized(..))) => {}, Ok(FromOrchestra::Signal(OverseerSignal::Conclude)) => return Ok(()), - Ok(FromOrchestra::Communication { msg }) => match msg { - CandidateValidationMessage::ValidateFromChainState { - candidate_receipt, - pov, - executor_params, - exec_timeout_kind, - response_sender, - .. - } => { - let mut sender = ctx.sender().clone(); - let metrics = metrics.clone(); - let validation_host = validation_host.clone(); - - let guard = semaphore.acquire_arc().await; - tasks.push(async move { - let _guard = guard; - let _timer = metrics.time_validate_from_chain_state(); - let res = validate_from_chain_state( - &mut sender, - validation_host, - candidate_receipt, - pov, - executor_params, - exec_timeout_kind, - &metrics, - ) - .await; - - metrics.on_validation_event(&res); - let _ = response_sender.send(res); - }.boxed()); - }, - CandidateValidationMessage::ValidateFromExhaustive { - validation_data, - validation_code, - candidate_receipt, - pov, - executor_params, - exec_timeout_kind, - response_sender, - .. - } => { - let metrics = metrics.clone(); - let validation_host = validation_host.clone(); - - let guard = semaphore.acquire_arc().await; - tasks.push(async move { - let _guard = guard; - let _timer = metrics.time_validate_from_exhaustive(); - let res = validate_candidate_exhaustive( - validation_host, - validation_data, - validation_code, - candidate_receipt, - pov, - executor_params, - exec_timeout_kind, - &metrics, - ) - .await; - - metrics.on_validation_event(&res); - let _ = response_sender.send(res); - }.boxed()); - }, - CandidateValidationMessage::PreCheck { - relay_parent, - validation_code_hash, - response_sender, - .. - } => { - let mut sender = ctx.sender().clone(); - let validation_host = validation_host.clone(); - - let guard = semaphore.acquire_arc().await; - tasks.push(async move { - let _guard = guard; - let precheck_result = precheck_pvf( - &mut sender, - validation_host, - relay_parent, - validation_code_hash, - ) - .await; - - let _ = response_sender.send(precheck_result); - }.boxed()); - } - }, + Ok(FromOrchestra::Communication { msg }) => { + let task = handle_validation_message(ctx.sender().clone(), validation_host.clone(), metrics.clone(), msg); + tasks.push(task); + } Err(e) => return Err(SubsystemError::from(e)) } }, From 77aec82b9505907075c1689a7e51600d2f7ca4b2 Mon Sep 17 00:00:00 2001 From: Dmitry Sinyavin Date: Mon, 1 Jan 2024 19:39:24 +0100 Subject: [PATCH 07/13] Use overseer's new `recv_signal()` --- Cargo.lock | 31 +++- .../node/core/candidate-validation/src/lib.rs | 159 +++++++++--------- polkadot/node/malus/src/interceptor.rs | 4 + polkadot/node/overseer/Cargo.toml | 2 +- polkadot/node/subsystem-bench/Cargo.toml | 2 +- .../node/subsystem-test-helpers/src/lib.rs | 5 + polkadot/node/subsystem-types/Cargo.toml | 2 +- 7 files changed, 114 insertions(+), 91 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 76c71f081793..25570b74994b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4114,7 +4114,7 @@ dependencies = [ "polkadot-primitives", "polkadot-service", "polkadot-test-client", - "prioritized-metered-channel", + "prioritized-metered-channel 0.5.1", "sc-cli", "sc-client-api", "sc-sysinfo", @@ -8972,8 +8972,7 @@ checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" [[package]] name = "orchestra" version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46d78e1deb2a8d54fc1f063a544130db4da31dfe4d5d3b493186424910222a76" +source = "git+https://github.com/paritytech/orchestra?branch=master#1912785d36f36ee2894082331122bef2a2360314" dependencies = [ "async-trait", "dyn-clonable", @@ -8981,7 +8980,7 @@ dependencies = [ "futures-timer", "orchestra-proc-macro", "pin-project", - "prioritized-metered-channel", + "prioritized-metered-channel 0.6.0", "thiserror", "tracing", ] @@ -8989,8 +8988,7 @@ dependencies = [ [[package]] name = "orchestra-proc-macro" version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d035b1f968d91a826f2e34a9d6d02cb2af5aa7ca39ebd27922d850ab4b2dd2c6" +source = "git+https://github.com/paritytech/orchestra?branch=master#1912785d36f36ee2894082331122bef2a2360314" dependencies = [ "anyhow", "expander 2.0.0", @@ -12839,7 +12837,7 @@ dependencies = [ "parity-scale-codec", "polkadot-primitives", "polkadot-test-service", - "prioritized-metered-channel", + "prioritized-metered-channel 0.5.1", "prometheus-parse", "sc-cli", "sc-service", @@ -12987,7 +12985,7 @@ dependencies = [ "polkadot-overseer", "polkadot-primitives", "polkadot-primitives-test-helpers", - "prioritized-metered-channel", + "prioritized-metered-channel 0.5.1", "rand 0.8.5", "sc-client-api", "schnellru", @@ -13017,7 +13015,7 @@ dependencies = [ "polkadot-node-subsystem-types", "polkadot-primitives", "polkadot-primitives-test-helpers", - "prioritized-metered-channel", + "prioritized-metered-channel 0.5.1", "sc-client-api", "sp-api", "sp-core", @@ -13981,6 +13979,21 @@ dependencies = [ "tracing", ] +[[package]] +name = "prioritized-metered-channel" +version = "0.6.0" +source = "git+https://github.com/paritytech/orchestra?branch=master#1912785d36f36ee2894082331122bef2a2360314" +dependencies = [ + "coarsetime", + "crossbeam-queue", + "derive_more", + "futures", + "futures-timer", + "nanorand", + "thiserror", + "tracing", +] + [[package]] name = "proc-macro-crate" version = "1.3.1" diff --git a/polkadot/node/core/candidate-validation/src/lib.rs b/polkadot/node/core/candidate-validation/src/lib.rs index 7a15f8e5a795..ae0c21354709 100644 --- a/polkadot/node/core/candidate-validation/src/lib.rs +++ b/polkadot/node/core/candidate-validation/src/lib.rs @@ -83,6 +83,8 @@ const PVF_APPROVAL_EXECUTION_RETRY_DELAY: Duration = Duration::from_secs(3); #[cfg(test)] const PVF_APPROVAL_EXECUTION_RETRY_DELAY: Duration = Duration::from_millis(200); +const TASK_LIMIT: usize = 30; + /// Configuration for the candidate validation subsystem #[derive(Clone)] pub struct Config { @@ -149,31 +151,23 @@ where exec_kind, response_sender, .. - } => { - // let mut sender = ctx.sender().clone(); - // let metrics = metrics.clone(); - // let validation_host = validation_host.clone(); - - // let guard = semaphore.acquire_arc().await; - async move { - // let _guard = guard; - let _timer = metrics.time_validate_from_chain_state(); - let res = validate_from_chain_state( - &mut sender, - validation_host, - candidate_receipt, - pov, - executor_params, - exec_kind, - &metrics, - ) - .await; + } => async move { + let _timer = metrics.time_validate_from_chain_state(); + let res = validate_from_chain_state( + &mut sender, + validation_host, + candidate_receipt, + pov, + executor_params, + exec_kind, + &metrics, + ) + .await; - metrics.on_validation_event(&res); - let _ = response_sender.send(res); - } - .boxed() - }, + metrics.on_validation_event(&res); + let _ = response_sender.send(res); + } + .boxed(), CandidateValidationMessage::ValidateFromExhaustive { validation_data, validation_code, @@ -183,51 +177,37 @@ where exec_kind, response_sender, .. - } => { - // let metrics = metrics.clone(); - // let validation_host = validation_host.clone(); - - // let guard = semaphore.acquire_arc().await; - async move { - // let _guard = guard; - let _timer = metrics.time_validate_from_exhaustive(); - let res = validate_candidate_exhaustive( - validation_host, - validation_data, - validation_code, - candidate_receipt, - pov, - executor_params, - exec_kind, - &metrics, - ) - .await; + } => async move { + let _timer = metrics.time_validate_from_exhaustive(); + let res = validate_candidate_exhaustive( + validation_host, + validation_data, + validation_code, + candidate_receipt, + pov, + executor_params, + exec_kind, + &metrics, + ) + .await; - metrics.on_validation_event(&res); - let _ = response_sender.send(res); - } - .boxed() - }, + metrics.on_validation_event(&res); + let _ = response_sender.send(res); + } + .boxed(), CandidateValidationMessage::PreCheck { relay_parent, validation_code_hash, response_sender, .. - } => { - // let mut sender = ctx.sender().clone(); - // let validation_host = validation_host.clone(); - - // let guard = semaphore.acquire_arc().await; - async move { - // let _guard = guard; - let precheck_result = - precheck_pvf(&mut sender, validation_host, relay_parent, validation_code_hash) - .await; - - let _ = response_sender.send(precheck_result); - } - .boxed() - }, + } => async move { + let precheck_result = + precheck_pvf(&mut sender, validation_host, relay_parent, validation_code_hash) + .await; + + let _ = response_sender.send(precheck_result); + } + .boxed(), } } @@ -258,26 +238,47 @@ async fn run( ctx.spawn_blocking("pvf-validation-host", task.boxed())?; let mut tasks = FuturesUnordered::new(); - // The task queue size is chosen to be somewhat bigger than the PVF host incoming queue size - // to allow exhaustive validation messages to fall through in case the tasks are clogged with - // `ValidateFromChainState` messages awaiting data from the runtime - let semaphore = Arc::new(Semaphore::new(polkadot_node_core_pvf::HOST_MESSAGE_QUEUE_SIZE * 2)); loop { - futures::select! { - comm = ctx.recv().fuse() => { - match comm { - Ok(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(_))) => {}, - Ok(FromOrchestra::Signal(OverseerSignal::BlockFinalized(..))) => {}, - Ok(FromOrchestra::Signal(OverseerSignal::Conclude)) => return Ok(()), - Ok(FromOrchestra::Communication { msg }) => { - let task = handle_validation_message(ctx.sender().clone(), validation_host.clone(), metrics.clone(), msg); - tasks.push(task); + loop { + futures::select! { + comm = ctx.recv().fuse() => { + match comm { + Ok(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(_))) => {}, + Ok(FromOrchestra::Signal(OverseerSignal::BlockFinalized(..))) => {}, + Ok(FromOrchestra::Signal(OverseerSignal::Conclude)) => return Ok(()), + Ok(FromOrchestra::Communication { msg }) => { + let task = handle_validation_message(ctx.sender().clone(), validation_host.clone(), metrics.clone(), msg); + tasks.push(task); + if tasks.len() >= TASK_LIMIT { + break + } + }, + Err(e) => return Err(SubsystemError::from(e)), + } + }, + _ = tasks.select_next_some() => () + } + } + + gum::debug!(target: LOG_TARGET, "Validation task limit hit"); + + loop { + futures::select! { + signal = ctx.recv_signal().fuse() => { + match signal { + Ok(OverseerSignal::ActiveLeaves(_)) => {}, + Ok(OverseerSignal::BlockFinalized(..)) => {}, + Ok(OverseerSignal::Conclude) => return Ok(()), + Err(e) => return Err(SubsystemError::from(e)), + } + }, + _ = tasks.select_next_some() => { + if tasks.len() < TASK_LIMIT { + break } - Err(e) => return Err(SubsystemError::from(e)) } - }, - _ = tasks.select_next_some() => () + } } } } diff --git a/polkadot/node/malus/src/interceptor.rs b/polkadot/node/malus/src/interceptor.rs index e994319beb96..8f03df573bd1 100644 --- a/polkadot/node/malus/src/interceptor.rs +++ b/polkadot/node/malus/src/interceptor.rs @@ -241,6 +241,10 @@ where } } + async fn recv_signal(&mut self) -> SubsystemResult { + Err(SubsystemError::Context("Not implemented".to_owned())) + } + fn spawn( &mut self, name: &'static str, diff --git a/polkadot/node/overseer/Cargo.toml b/polkadot/node/overseer/Cargo.toml index 40df8d3514a4..0fd4b75dbbc0 100644 --- a/polkadot/node/overseer/Cargo.toml +++ b/polkadot/node/overseer/Cargo.toml @@ -20,7 +20,7 @@ polkadot-node-primitives = { path = "../primitives" } polkadot-node-subsystem-types = { path = "../subsystem-types" } polkadot-node-metrics = { path = "../metrics" } polkadot-primitives = { path = "../../primitives" } -orchestra = { version = "0.3.3", default-features = false, features = ["futures_channel"] } +orchestra = { git = "https://github.com/paritytech/orchestra", branch = "master", default-features = false, features = ["futures_channel"] } gum = { package = "tracing-gum", path = "../gum" } sp-core = { path = "../../../substrate/primitives/core" } async-trait = "0.1.74" diff --git a/polkadot/node/subsystem-bench/Cargo.toml b/polkadot/node/subsystem-bench/Cargo.toml index 6504c8f714de..384e7656ca1d 100644 --- a/polkadot/node/subsystem-bench/Cargo.toml +++ b/polkadot/node/subsystem-bench/Cargo.toml @@ -55,7 +55,7 @@ prometheus = { version = "0.13.0", default-features = false } serde = "1.0.192" serde_yaml = "0.9" paste = "1.0.14" -orchestra = { version = "0.3.3", default-features = false, features = ["futures_channel"] } +orchestra = { git = "https://github.com/paritytech/orchestra", branch = "master", default-features = false, features = ["futures_channel"] } pyroscope = "0.5.7" pyroscope_pprofrs = "0.2.7" diff --git a/polkadot/node/subsystem-test-helpers/src/lib.rs b/polkadot/node/subsystem-test-helpers/src/lib.rs index dfa78e04b8c9..54444e1608f8 100644 --- a/polkadot/node/subsystem-test-helpers/src/lib.rs +++ b/polkadot/node/subsystem-test-helpers/src/lib.rs @@ -221,6 +221,11 @@ where .ok_or_else(|| SubsystemError::Context("Receiving end closed".to_owned())) } + // FIXME: Should be properly implemented and tests should be added as well + async fn recv_signal(&mut self) -> SubsystemResult { + Err(SubsystemError::Context("Not implemented".to_owned())) + } + fn spawn( &mut self, name: &'static str, diff --git a/polkadot/node/subsystem-types/Cargo.toml b/polkadot/node/subsystem-types/Cargo.toml index 6713e9031234..0094740437dd 100644 --- a/polkadot/node/subsystem-types/Cargo.toml +++ b/polkadot/node/subsystem-types/Cargo.toml @@ -17,7 +17,7 @@ polkadot-node-primitives = { path = "../primitives" } polkadot-node-network-protocol = { path = "../network/protocol" } polkadot-statement-table = { path = "../../statement-table" } polkadot-node-jaeger = { path = "../jaeger" } -orchestra = { version = "0.3.3", default-features = false, features = ["futures_channel"] } +orchestra = { git = "https://github.com/paritytech/orchestra", branch = "master", default-features = false, features = ["futures_channel"] } sc-network = { path = "../../../substrate/client/network" } sp-api = { path = "../../../substrate/primitives/api" } sp-blockchain = { path = "../../../substrate/primitives/blockchain" } From 6bcc212c93653c8861d67bca5fc8207cebe5dfe7 Mon Sep 17 00:00:00 2001 From: Dmitry Sinyavin Date: Tue, 2 Jan 2024 14:39:52 +0100 Subject: [PATCH 08/13] Fix Malus --- Cargo.lock | 10 ---------- .../node/core/candidate-validation/Cargo.toml | 1 - .../node/core/candidate-validation/src/lib.rs | 1 - polkadot/node/malus/src/interceptor.rs | 19 ++++++++++++++++--- 4 files changed, 16 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 25570b74994b..659774285bae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1207,15 +1207,6 @@ dependencies = [ "windows-sys 0.48.0", ] -[[package]] -name = "async-semaphore" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "538c756e85eb6ffdefaec153804afb6da84b033e2e5ec3e9d459c34b4bf4d3f6" -dependencies = [ - "event-listener", -] - [[package]] name = "async-stream" version = "0.3.5" @@ -12503,7 +12494,6 @@ name = "polkadot-node-core-candidate-validation" version = "1.0.0" dependencies = [ "assert_matches", - "async-semaphore", "async-trait", "futures", "futures-timer", diff --git a/polkadot/node/core/candidate-validation/Cargo.toml b/polkadot/node/core/candidate-validation/Cargo.toml index 9e34e44af054..4f0ad67dbf1c 100644 --- a/polkadot/node/core/candidate-validation/Cargo.toml +++ b/polkadot/node/core/candidate-validation/Cargo.toml @@ -10,7 +10,6 @@ license.workspace = true workspace = true [dependencies] -async-semaphore = "1.2.0" async-trait = "0.1.74" futures = "0.3.21" futures-timer = "3.0.2" diff --git a/polkadot/node/core/candidate-validation/src/lib.rs b/polkadot/node/core/candidate-validation/src/lib.rs index ae0c21354709..dee37c934407 100644 --- a/polkadot/node/core/candidate-validation/src/lib.rs +++ b/polkadot/node/core/candidate-validation/src/lib.rs @@ -23,7 +23,6 @@ #![deny(unused_crate_dependencies, unused_results)] #![warn(missing_docs)] -use async_semaphore::Semaphore; use polkadot_node_core_pvf::{ InternalValidationError, InvalidCandidate as WasmInvalidCandidate, PossiblyInvalidError, PrepareError, PrepareJobKind, PvfPrepData, ValidationError, ValidationHost, diff --git a/polkadot/node/malus/src/interceptor.rs b/polkadot/node/malus/src/interceptor.rs index 8f03df573bd1..b44ffc8956b5 100644 --- a/polkadot/node/malus/src/interceptor.rs +++ b/polkadot/node/malus/src/interceptor.rs @@ -22,7 +22,7 @@ use polkadot_node_subsystem::*; pub use polkadot_node_subsystem::{messages::*, overseer, FromOrchestra}; -use std::{future::Future, pin::Pin}; +use std::{collections::VecDeque, future::Future, pin::Pin}; /// Filter incoming and outgoing messages. pub trait MessageInterceptor: Send + Sync + Clone + 'static @@ -170,6 +170,7 @@ where inner: Context, message_filter: Fil, sender: InterceptedSender<::Sender, Fil>, + message_buffer: VecDeque::Message>>, } impl InterceptedContext @@ -189,7 +190,7 @@ where inner: inner.sender().clone(), message_filter: message_filter.clone(), }; - Self { inner, message_filter, sender } + Self { inner, message_filter, sender, message_buffer: VecDeque::new() } } } @@ -233,6 +234,9 @@ where } async fn recv(&mut self) -> SubsystemResult> { + if let Some(msg) = self.message_buffer.pop_front() { + return Ok(msg) + } loop { let msg = self.inner.recv().await?; if let Some(msg) = self.message_filter.intercept_incoming(self.inner.sender(), msg) { @@ -242,7 +246,16 @@ where } async fn recv_signal(&mut self) -> SubsystemResult { - Err(SubsystemError::Context("Not implemented".to_owned())) + loop { + let msg = self.inner.recv().await?; + if let Some(msg) = self.message_filter.intercept_incoming(self.inner.sender(), msg) { + if let FromOrchestra::Signal(sig) = msg { + return Ok(sig) + } else { + self.message_buffer.push_back(msg) + } + } + } } fn spawn( From fb204328e7008530675adb49372d5495bedb3647 Mon Sep 17 00:00:00 2001 From: Dmitry Sinyavin Date: Wed, 3 Jan 2024 18:32:32 +0100 Subject: [PATCH 09/13] Fix tests --- Cargo.lock | 2 +- polkadot/node/overseer/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 659774285bae..1891f2b84ebb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13005,7 +13005,7 @@ dependencies = [ "polkadot-node-subsystem-types", "polkadot-primitives", "polkadot-primitives-test-helpers", - "prioritized-metered-channel 0.5.1", + "prioritized-metered-channel 0.6.0", "sc-client-api", "sp-api", "sp-core", diff --git a/polkadot/node/overseer/Cargo.toml b/polkadot/node/overseer/Cargo.toml index 0fd4b75dbbc0..c704f4641374 100644 --- a/polkadot/node/overseer/Cargo.toml +++ b/polkadot/node/overseer/Cargo.toml @@ -27,7 +27,7 @@ async-trait = "0.1.74" tikv-jemalloc-ctl = { version = "0.5.0", optional = true } [dev-dependencies] -metered = { package = "prioritized-metered-channel", version = "0.5.1", default-features = false, features = ["futures_channel"] } +metered = { package = "prioritized-metered-channel", git = "https://github.com/paritytech/orchestra", branch = "master", default-features = false, features = ["futures_channel"] } sp-core = { path = "../../../substrate/primitives/core" } futures = { version = "0.3.21", features = ["thread-pool"] } femme = "2.2.1" From a0e8065b6a900522a11701759d81c2064851246f Mon Sep 17 00:00:00 2001 From: Dmitry Sinyavin Date: Fri, 19 Jan 2024 14:50:57 +0100 Subject: [PATCH 10/13] Bump `orchestra` and dependencies --- Cargo.lock | 39 +++++++------------ .../Cargo.toml | 2 +- polkadot/node/metrics/Cargo.toml | 2 +- polkadot/node/overseer/Cargo.toml | 4 +- polkadot/node/subsystem-bench/Cargo.toml | 2 +- polkadot/node/subsystem-types/Cargo.toml | 2 +- polkadot/node/subsystem-util/Cargo.toml | 2 +- 7 files changed, 20 insertions(+), 33 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index facafc3471da..6dc3bed95ae5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4125,7 +4125,7 @@ dependencies = [ "polkadot-primitives", "polkadot-service", "polkadot-test-client", - "prioritized-metered-channel 0.5.1", + "prioritized-metered-channel", "sc-cli", "sc-client-api", "sc-sysinfo", @@ -8988,8 +8988,9 @@ checksum = "04744f49eae99ab78e0d5c0b603ab218f515ea8cfe5a456d7629ad883a3b6e7d" [[package]] name = "orchestra" -version = "0.3.3" -source = "git+https://github.com/paritytech/orchestra?branch=master#1912785d36f36ee2894082331122bef2a2360314" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5edee0c1917703f8a28cd229cf6a5c91a7ee34be139ced16509ac5b53b9d0c51" dependencies = [ "async-trait", "dyn-clonable", @@ -8997,15 +8998,16 @@ dependencies = [ "futures-timer", "orchestra-proc-macro", "pin-project", - "prioritized-metered-channel 0.6.0", + "prioritized-metered-channel", "thiserror", "tracing", ] [[package]] name = "orchestra-proc-macro" -version = "0.3.3" -source = "git+https://github.com/paritytech/orchestra?branch=master#1912785d36f36ee2894082331122bef2a2360314" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f60e64a3808b5bb2786b9da09fc70714952aabcdd0eeba6f1718e3dbc34ad5b" dependencies = [ "expander 2.0.0", "indexmap 2.0.0", @@ -12848,7 +12850,7 @@ dependencies = [ "parity-scale-codec", "polkadot-primitives", "polkadot-test-service", - "prioritized-metered-channel 0.5.1", + "prioritized-metered-channel", "prometheus-parse", "sc-cli", "sc-service", @@ -12996,7 +12998,7 @@ dependencies = [ "polkadot-overseer", "polkadot-primitives", "polkadot-primitives-test-helpers", - "prioritized-metered-channel 0.5.1", + "prioritized-metered-channel", "rand 0.8.5", "sc-client-api", "schnellru", @@ -13026,7 +13028,7 @@ dependencies = [ "polkadot-node-subsystem-types", "polkadot-primitives", "polkadot-primitives-test-helpers", - "prioritized-metered-channel 0.6.0", + "prioritized-metered-channel", "sc-client-api", "sp-api", "sp-core", @@ -14026,24 +14028,9 @@ dependencies = [ [[package]] name = "prioritized-metered-channel" -version = "0.5.1" +version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e99f0c89bd88f393aab44a4ab949351f7bc7e7e1179d11ecbfe50cbe4c47e342" -dependencies = [ - "coarsetime", - "crossbeam-queue", - "derive_more", - "futures", - "futures-timer", - "nanorand", - "thiserror", - "tracing", -] - -[[package]] -name = "prioritized-metered-channel" -version = "0.6.0" -source = "git+https://github.com/paritytech/orchestra?branch=master#1912785d36f36ee2894082331122bef2a2360314" +checksum = "a172e6cc603231f2cf004232eabcecccc0da53ba576ab286ef7baa0cfc7927ad" dependencies = [ "coarsetime", "crossbeam-queue", diff --git a/cumulus/client/relay-chain-inprocess-interface/Cargo.toml b/cumulus/client/relay-chain-inprocess-interface/Cargo.toml index 63f4c9154743..7c7edc502d4c 100644 --- a/cumulus/client/relay-chain-inprocess-interface/Cargo.toml +++ b/cumulus/client/relay-chain-inprocess-interface/Cargo.toml @@ -42,7 +42,7 @@ sp-keyring = { path = "../../../substrate/primitives/keyring" } # Polkadot polkadot-primitives = { path = "../../../polkadot/primitives" } polkadot-test-client = { path = "../../../polkadot/node/test/client" } -metered = { package = "prioritized-metered-channel", version = "0.5.1", default-features = false, features = ["futures_channel"] } +metered = { package = "prioritized-metered-channel", version = "0.6.1", default-features = false, features = ["futures_channel"] } # Cumulus cumulus-test-service = { path = "../../test/service" } diff --git a/polkadot/node/metrics/Cargo.toml b/polkadot/node/metrics/Cargo.toml index e9a4d463f4d9..90d95a6e50a2 100644 --- a/polkadot/node/metrics/Cargo.toml +++ b/polkadot/node/metrics/Cargo.toml @@ -14,7 +14,7 @@ futures = "0.3.21" futures-timer = "3.0.2" gum = { package = "tracing-gum", path = "../gum" } -metered = { package = "prioritized-metered-channel", version = "0.5.1", default-features = false, features = ["futures_channel"] } +metered = { package = "prioritized-metered-channel", version = "0.6.1", default-features = false, features = ["futures_channel"] } # Both `sc-service` and `sc-cli` are required by runtime metrics `logger_hook()`. sc-service = { path = "../../../substrate/client/service" } sc-cli = { path = "../../../substrate/client/cli" } diff --git a/polkadot/node/overseer/Cargo.toml b/polkadot/node/overseer/Cargo.toml index 69fa2a1d919d..f39f0661aa17 100644 --- a/polkadot/node/overseer/Cargo.toml +++ b/polkadot/node/overseer/Cargo.toml @@ -20,14 +20,14 @@ polkadot-node-primitives = { path = "../primitives" } polkadot-node-subsystem-types = { path = "../subsystem-types" } polkadot-node-metrics = { path = "../metrics" } polkadot-primitives = { path = "../../primitives" } -orchestra = { git = "https://github.com/paritytech/orchestra", branch = "master", default-features = false, features = ["futures_channel"] } +orchestra = { version = "0.3.4", default-features = false, features = ["futures_channel"] } gum = { package = "tracing-gum", path = "../gum" } sp-core = { path = "../../../substrate/primitives/core" } async-trait = "0.1.74" tikv-jemalloc-ctl = { version = "0.5.0", optional = true } [dev-dependencies] -metered = { package = "prioritized-metered-channel", git = "https://github.com/paritytech/orchestra", branch = "master", default-features = false, features = ["futures_channel"] } +metered = { package = "prioritized-metered-channel", version = "0.6.1", default-features = false, features = ["futures_channel"] } sp-core = { path = "../../../substrate/primitives/core" } futures = { version = "0.3.21", features = ["thread-pool"] } femme = "2.2.1" diff --git a/polkadot/node/subsystem-bench/Cargo.toml b/polkadot/node/subsystem-bench/Cargo.toml index 1145f4ed7a14..750f7a7e2f83 100644 --- a/polkadot/node/subsystem-bench/Cargo.toml +++ b/polkadot/node/subsystem-bench/Cargo.toml @@ -55,7 +55,7 @@ prometheus = { version = "0.13.0", default-features = false } serde = "1.0.195" serde_yaml = "0.9" paste = "1.0.14" -orchestra = { git = "https://github.com/paritytech/orchestra", branch = "master", default-features = false, features = ["futures_channel"] } +orchestra = { version = "0.3.4", default-features = false, features = ["futures_channel"] } pyroscope = "0.5.7" pyroscope_pprofrs = "0.2.7" diff --git a/polkadot/node/subsystem-types/Cargo.toml b/polkadot/node/subsystem-types/Cargo.toml index 0094740437dd..181ef54b4c6c 100644 --- a/polkadot/node/subsystem-types/Cargo.toml +++ b/polkadot/node/subsystem-types/Cargo.toml @@ -17,7 +17,7 @@ polkadot-node-primitives = { path = "../primitives" } polkadot-node-network-protocol = { path = "../network/protocol" } polkadot-statement-table = { path = "../../statement-table" } polkadot-node-jaeger = { path = "../jaeger" } -orchestra = { git = "https://github.com/paritytech/orchestra", branch = "master", default-features = false, features = ["futures_channel"] } +orchestra = { version = "0.3.4", default-features = false, features = ["futures_channel"] } sc-network = { path = "../../../substrate/client/network" } sp-api = { path = "../../../substrate/primitives/api" } sp-blockchain = { path = "../../../substrate/primitives/blockchain" } diff --git a/polkadot/node/subsystem-util/Cargo.toml b/polkadot/node/subsystem-util/Cargo.toml index 3147a4f64f46..68a834d46e3e 100644 --- a/polkadot/node/subsystem-util/Cargo.toml +++ b/polkadot/node/subsystem-util/Cargo.toml @@ -32,7 +32,7 @@ polkadot-node-network-protocol = { path = "../network/protocol" } polkadot-primitives = { path = "../../primitives" } polkadot-node-primitives = { path = "../primitives" } polkadot-overseer = { path = "../overseer" } -metered = { package = "prioritized-metered-channel", version = "0.5.1", default-features = false, features = ["futures_channel"] } +metered = { package = "prioritized-metered-channel", version = "0.6.1", default-features = false, features = ["futures_channel"] } sp-core = { path = "../../../substrate/primitives/core" } sp-application-crypto = { path = "../../../substrate/primitives/application-crypto" } From 4c6e73de433a9becf98a864dae7956945058fc9e Mon Sep 17 00:00:00 2001 From: Dmitry Sinyavin Date: Fri, 19 Jan 2024 15:26:35 +0100 Subject: [PATCH 11/13] Document the constant value --- polkadot/node/core/candidate-validation/src/lib.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/polkadot/node/core/candidate-validation/src/lib.rs b/polkadot/node/core/candidate-validation/src/lib.rs index b1a50c89b43f..bf6e09fd1b69 100644 --- a/polkadot/node/core/candidate-validation/src/lib.rs +++ b/polkadot/node/core/candidate-validation/src/lib.rs @@ -82,6 +82,9 @@ const PVF_APPROVAL_EXECUTION_RETRY_DELAY: Duration = Duration::from_secs(3); #[cfg(test)] const PVF_APPROVAL_EXECUTION_RETRY_DELAY: Duration = Duration::from_millis(200); +// The task queue size is chosen to be somewhat bigger than the PVF host incoming queue size +// to allow exhaustive validation messages to fall through in case the tasks are clogged with +// `ValidateFromChainState` messages awaiting data from the runtime const TASK_LIMIT: usize = 30; /// Configuration for the candidate validation subsystem From 05e3b834fe7c49f526aadf296247cb90a94902c5 Mon Sep 17 00:00:00 2001 From: Dmitry Sinyavin Date: Fri, 19 Jan 2024 15:51:04 +0100 Subject: [PATCH 12/13] Implement `recv_signal` for test helper --- .../node/subsystem-test-helpers/src/lib.rs | 23 +++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/polkadot/node/subsystem-test-helpers/src/lib.rs b/polkadot/node/subsystem-test-helpers/src/lib.rs index 54444e1608f8..6c1ac86c4507 100644 --- a/polkadot/node/subsystem-test-helpers/src/lib.rs +++ b/polkadot/node/subsystem-test-helpers/src/lib.rs @@ -32,6 +32,7 @@ use parking_lot::Mutex; use sp_core::testing::TaskExecutor; use std::{ + collections::VecDeque, convert::Infallible, future::Future, pin::Pin, @@ -190,6 +191,7 @@ pub struct TestSubsystemContext { tx: TestSubsystemSender, rx: mpsc::Receiver>, spawn: S, + message_buffer: VecDeque>, } #[async_trait::async_trait] @@ -207,6 +209,9 @@ where type Error = SubsystemError; async fn try_recv(&mut self) -> Result>, ()> { + if let Some(msg) = self.message_buffer.pop_front() { + return Ok(Some(msg)) + } match poll!(self.rx.next()) { Poll::Ready(Some(msg)) => Ok(Some(msg)), Poll::Ready(None) => Err(()), @@ -215,15 +220,28 @@ where } async fn recv(&mut self) -> SubsystemResult> { + if let Some(msg) = self.message_buffer.pop_front() { + return Ok(msg) + } self.rx .next() .await .ok_or_else(|| SubsystemError::Context("Receiving end closed".to_owned())) } - // FIXME: Should be properly implemented and tests should be added as well async fn recv_signal(&mut self) -> SubsystemResult { - Err(SubsystemError::Context("Not implemented".to_owned())) + loop { + let msg = self + .rx + .next() + .await + .ok_or_else(|| SubsystemError::Context("Receiving end closed".to_owned()))?; + if let FromOrchestra::Signal(sig) = msg { + return Ok(sig) + } else { + self.message_buffer.push_back(msg) + } + } } fn spawn( @@ -319,6 +337,7 @@ pub fn make_buffered_subsystem_context( tx: TestSubsystemSender { tx: all_messages_tx }, rx: overseer_rx, spawn: SpawnGlue(spawner), + message_buffer: VecDeque::new(), }, TestSubsystemContextHandle { tx: overseer_tx, rx: all_messages_rx }, ) From 92bacd34cf26252a232a0ab0ea04b30dd4d9bed1 Mon Sep 17 00:00:00 2001 From: Dmitry Sinyavin Date: Fri, 19 Jan 2024 16:06:17 +0100 Subject: [PATCH 13/13] Add `.prdoc` --- prdoc/pr_2125.prdoc | 14 ++++++++++++++ 1 file changed, 14 insertions(+) create mode 100644 prdoc/pr_2125.prdoc diff --git a/prdoc/pr_2125.prdoc b/prdoc/pr_2125.prdoc new file mode 100644 index 000000000000..ee81975d2d07 --- /dev/null +++ b/prdoc/pr_2125.prdoc @@ -0,0 +1,14 @@ +# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0 +# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json + +title: Introduce bounds for the number of candidate validation subsystem simultaneously processed tasks + +doc: + - audience: Node Dev + description: | + Makes it possible for the candidate validation subsystem to create backpressure on subsystems + requesting to validate a candidate through limiting the number of simultaneously processed + validation tasks. + +crates: + - name: polkadot-node-core-candidate-validation