diff --git a/Cargo.lock b/Cargo.lock
index 6c109b49896e..7d8c37e9728c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -7019,7 +7019,9 @@ dependencies = [
"pin-project",
"polkadot-core-primitives",
"polkadot-node-metrics",
+ "polkadot-node-primitives",
"polkadot-parachain",
+ "polkadot-primitives",
"rand 0.8.5",
"rayon",
"sc-executor",
@@ -7314,6 +7316,7 @@ dependencies = [
"polkadot-erasure-coding",
"polkadot-node-core-pvf",
"polkadot-node-primitives",
+ "polkadot-primitives",
"quote",
"thiserror",
]
diff --git a/erasure-coding/benches/scaling_with_validators.rs b/erasure-coding/benches/scaling_with_validators.rs
index c4834ecbab0f..35eff095d2f6 100644
--- a/erasure-coding/benches/scaling_with_validators.rs
+++ b/erasure-coding/benches/scaling_with_validators.rs
@@ -15,7 +15,7 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
-use polkadot_primitives::v2::Hash;
+use polkadot_primitives::Hash;
use std::time::Duration;
fn chunks(n_validators: usize, pov: &Vec<u8>) -> Vec<Vec<u8>> {
diff --git a/node/core/candidate-validation/Cargo.toml b/node/core/candidate-validation/Cargo.toml
index 267dced02b26..e46d44033bad 100644
--- a/node/core/candidate-validation/Cargo.toml
+++ b/node/core/candidate-validation/Cargo.toml
@@ -17,6 +17,7 @@ polkadot-primitives = { path = "../../../primitives" }
polkadot-parachain = { path = "../../../parachain" }
polkadot-node-primitives = { path = "../../primitives" }
polkadot-node-subsystem = { path = "../../subsystem" }
+polkadot-node-subsystem-util = { path = "../../subsystem-util" }
polkadot-node-metrics = { path = "../../metrics" }
[target.'cfg(not(any(target_os = "android", target_os = "unknown")))'.dependencies]
@@ -27,6 +28,5 @@ sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" }
futures = { version = "0.3.21", features = ["thread-pool"] }
assert_matches = "1.4.0"
polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" }
-polkadot-node-subsystem-util = { path = "../../subsystem-util" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
test-helpers = { package = "polkadot-primitives-test-helpers", path = "../../../primitives/test-helpers" }
diff --git a/node/core/candidate-validation/src/lib.rs b/node/core/candidate-validation/src/lib.rs
index b3c44064a922..efdbeefe13f9 100644
--- a/node/core/candidate-validation/src/lib.rs
+++ b/node/core/candidate-validation/src/lib.rs
@@ -24,8 +24,8 @@
#![warn(missing_docs)]
use polkadot_node_core_pvf::{
- InvalidCandidate as WasmInvalidCandidate, PrepareError, PrepareStats, Pvf, ValidationError,
- ValidationHost,
+ InvalidCandidate as WasmInvalidCandidate, PrepareError, PrepareStats, Pvf,
+ PvfWithExecutorParams, ValidationError, ValidationHost,
};
use polkadot_node_primitives::{
BlockData, InvalidCandidate, PoV, ValidationResult, POV_BOMB_LIMIT, VALIDATION_CODE_BOMB_LIMIT,
@@ -39,10 +39,11 @@ use polkadot_node_subsystem::{
overseer, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError, SubsystemResult,
SubsystemSender,
};
+use polkadot_node_subsystem_util::executor_params_at_relay_parent;
use polkadot_parachain::primitives::{ValidationParams, ValidationResult as WasmValidationResult};
use polkadot_primitives::{
- CandidateCommitments, CandidateDescriptor, CandidateReceipt, Hash, OccupiedCoreAssumption,
- PersistedValidationData, ValidationCode, ValidationCodeHash,
+ vstaging::ExecutorParams, CandidateCommitments, CandidateDescriptor, CandidateReceipt, Hash,
+ OccupiedCoreAssumption, PersistedValidationData, ValidationCode, ValidationCodeHash,
};
use parity_scale_codec::Encode;
@@ -175,12 +176,14 @@ async fn run(
response_sender,
) => {
let bg = {
+ let mut sender = ctx.sender().clone();
let metrics = metrics.clone();
let validation_host = validation_host.clone();
async move {
let _timer = metrics.time_validate_from_exhaustive();
let res = validate_candidate_exhaustive(
+ &mut sender,
validation_host,
persisted_validation_data,
validation_code,
@@ -307,18 +310,38 @@ where
},
};
- let validation_code = match sp_maybe_compressed_blob::decompress(
+ let executor_params =
+ if let Ok(executor_params) = executor_params_at_relay_parent(relay_parent, sender).await {
+ gum::debug!(
+ target: LOG_TARGET,
+ ?relay_parent,
+ ?validation_code_hash,
+ "precheck: acquired executor params for the session: {:?}",
+ executor_params,
+ );
+ executor_params
+ } else {
+ gum::warn!(
+ target: LOG_TARGET,
+ ?relay_parent,
+ ?validation_code_hash,
+ "precheck: failed to acquire executor params for the session, thus voting against.",
+ );
+ return PreCheckOutcome::Invalid
+ };
+
+ let pvf_with_params = match sp_maybe_compressed_blob::decompress(
&validation_code.0,
VALIDATION_CODE_BOMB_LIMIT,
) {
- Ok(code) => Pvf::from_code(code.into_owned()),
+ Ok(code) => PvfWithExecutorParams::new(Pvf::from_code(code.into_owned()), executor_params),
Err(e) => {
gum::debug!(target: LOG_TARGET, err=?e, "precheck: cannot decompress validation code");
return PreCheckOutcome::Invalid
},
};
- match validation_backend.precheck_pvf(validation_code).await {
+ match validation_backend.precheck_pvf(pvf_with_params).await {
Ok(_) => PreCheckOutcome::Valid,
Err(prepare_err) =>
if prepare_err.is_deterministic() {
@@ -456,6 +479,7 @@ where
};
let validation_result = validate_candidate_exhaustive(
+ sender,
validation_host,
validation_data,
validation_code,
@@ -490,7 +514,8 @@ where
validation_result
}
-async fn validate_candidate_exhaustive(
+async fn validate_candidate_exhaustive<Sender>(
+ sender: &mut Sender,
mut validation_backend: impl ValidationBackend + Send,
persisted_validation_data: PersistedValidationData,
validation_code: ValidationCode,
@@ -498,7 +523,10 @@ async fn validate_candidate_exhaustive(
pov: Arc<PoV>,
timeout: Duration,
metrics: &Metrics,
-) -> Result<ValidationResult, ValidationFailed> {
+) -> Result<ValidationResult, ValidationFailed>
+where
+ Sender: SubsystemSender<RuntimeApiMessage>,
+{
let _timer = metrics.time_validate_candidate_exhaustive();
let validation_code_hash = validation_code.hash();
@@ -554,8 +582,34 @@ async fn validate_candidate_exhaustive(
relay_parent_storage_root: persisted_validation_data.relay_parent_storage_root,
};
+ let executor_params = if let Ok(executor_params) =
+ executor_params_at_relay_parent(candidate_receipt.descriptor.relay_parent, sender).await
+ {
+ gum::debug!(
+ target: LOG_TARGET,
+ ?validation_code_hash,
+ ?para_id,
+ "Acquired executor params for the session: {:?}",
+ executor_params,
+ );
+ executor_params
+ } else {
+ gum::warn!(
+ target: LOG_TARGET,
+ ?validation_code_hash,
+ ?para_id,
+ "Failed to acquire executor params for the session",
+ );
+ return Ok(ValidationResult::Invalid(InvalidCandidate::BadParent))
+ };
+
let result = validation_backend
- .validate_candidate_with_retry(raw_validation_code.to_vec(), timeout, params)
+ .validate_candidate_with_retry(
+ raw_validation_code.to_vec(),
+ timeout,
+ params,
+ executor_params,
+ )
.await;
if let Err(ref error) = result {
@@ -613,7 +667,7 @@ trait ValidationBackend {
/// Tries executing a PVF a single time (no retries).
async fn validate_candidate(
&mut self,
- pvf: Pvf,
+ pvf_with_params: PvfWithExecutorParams,
timeout: Duration,
encoded_params: Vec<u8>,
) -> Result<WasmValidationResult, ValidationError>;
@@ -625,12 +679,14 @@ trait ValidationBackend {
raw_validation_code: Vec<u8>,
timeout: Duration,
params: ValidationParams,
+ executor_params: ExecutorParams,
) -> Result<WasmValidationResult, ValidationError> {
// Construct the PVF a single time, since it is an expensive operation. Cloning it is cheap.
- let pvf = Pvf::from_code(raw_validation_code);
+ let pvf_with_params =
+ PvfWithExecutorParams::new(Pvf::from_code(raw_validation_code), executor_params);
let mut validation_result =
- self.validate_candidate(pvf.clone(), timeout, params.encode()).await;
+ self.validate_candidate(pvf_with_params.clone(), timeout, params.encode()).await;
// If we get an AmbiguousWorkerDeath error, retry once after a brief delay, on the
// assumption that the conditions that caused this error may have been transient. Note that
@@ -643,19 +699,23 @@ trait ValidationBackend {
gum::warn!(
target: LOG_TARGET,
- ?pvf,
+ ?pvf_with_params,
"Re-trying failed candidate validation due to AmbiguousWorkerDeath."
);
// Encode the params again when re-trying. We expect the retry case to be relatively
// rare, and we want to avoid unconditionally cloning data.
- validation_result = self.validate_candidate(pvf, timeout, params.encode()).await;
+ validation_result =
+ self.validate_candidate(pvf_with_params, timeout, params.encode()).await;
}
validation_result
}
- async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<PrepareStats, PrepareError>;
+ async fn precheck_pvf(
+ &mut self,
+ pvf_with_params: PvfWithExecutorParams,
+ ) -> Result<PrepareStats, PrepareError>;
}
#[async_trait]
@@ -663,14 +723,16 @@ impl ValidationBackend for ValidationHost {
/// Tries executing a PVF a single time (no retries).
async fn validate_candidate(
&mut self,
- pvf: Pvf,
+ pvf_with_params: PvfWithExecutorParams,
timeout: Duration,
encoded_params: Vec<u8>,
) -> Result<WasmValidationResult, ValidationError> {
let priority = polkadot_node_core_pvf::Priority::Normal;
let (tx, rx) = oneshot::channel();
- if let Err(err) = self.execute_pvf(pvf, timeout, encoded_params, priority, tx).await {
+ if let Err(err) =
+ self.execute_pvf(pvf_with_params, timeout, encoded_params, priority, tx).await
+ {
return Err(ValidationError::InternalError(format!(
"cannot send pvf to the validation host: {:?}",
err
@@ -681,9 +743,12 @@ impl ValidationBackend for ValidationHost {
.map_err(|_| ValidationError::InternalError("validation was cancelled".into()))?
}
- async fn precheck_pvf(&mut self, pvf: Pvf) -> Result {
+ async fn precheck_pvf(
+ &mut self,
+ pvf_with_params: PvfWithExecutorParams,
+ ) -> Result<PrepareStats, PrepareError> {
let (tx, rx) = oneshot::channel();
- if let Err(err) = self.precheck_pvf(pvf, tx).await {
+ if let Err(err) = self.precheck_pvf(pvf_with_params, tx).await {
// Return an IO error if there was an error communicating with the host.
return Err(PrepareError::IoErr(err))
}
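
The session executor params lookup added above (used both for prechecking and for exhaustive validation) boils down to two runtime API calls, which is also how the tests below mock it. A minimal self-contained sketch, with stub types standing in for the real subsystem API (illustrative only, not the patched code):

```rust
// Sketch of the flow behind `executor_params_at_relay_parent`: resolve the
// session index for the relay parent, then fetch that session's executor
// params. All types here are illustrative stand-ins.
type SessionIndex = u32;

#[derive(Clone, Debug, Default)]
struct ExecutorParams; // stand-in for `polkadot_primitives::vstaging::ExecutorParams`

trait RuntimeApi {
    fn session_index_for_child(&self) -> Result<SessionIndex, String>;
    fn session_executor_params(
        &self,
        session: SessionIndex,
    ) -> Result<Option<ExecutorParams>, String>;
}

fn executor_params_at_relay_parent(api: &impl RuntimeApi) -> Result<ExecutorParams, String> {
    let session = api.session_index_for_child()?;
    // `None` means no params are stored for the session; the callers above
    // treat this as a failure and vote against / report the candidate invalid.
    api.session_executor_params(session)?
        .ok_or_else(|| "no executor params for the session".to_owned())
}
```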
diff --git a/node/core/candidate-validation/src/tests.rs b/node/core/candidate-validation/src/tests.rs
index 779bf0fcca33..6bab57097dae 100644
--- a/node/core/candidate-validation/src/tests.rs
+++ b/node/core/candidate-validation/src/tests.rs
@@ -26,6 +26,37 @@ use polkadot_primitives::{HeadData, Id as ParaId, UpwardMessage};
use sp_core::testing::TaskExecutor;
use sp_keyring::Sr25519Keyring;
+fn test_with_executor_params<T: Future<Output = R>, R, M>(
+ mut ctx_handle: test_helpers::TestSubsystemContextHandle<M>,
+ test: impl FnOnce() -> T,
+) -> R {
+ let test_fut = test();
+
+ let overseer = async move {
+ assert_matches!(
+ ctx_handle.recv().await,
+ AllMessages::RuntimeApi(
+ RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionIndexForChild(tx))
+ ) => {
+ tx.send(Ok(1u32.into())).unwrap();
+ }
+ );
+ assert_matches!(
+ ctx_handle.recv().await,
+ AllMessages::RuntimeApi(
+ RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionExecutorParams(_, tx))
+ ) => {
+ tx.send(Ok(Some(ExecutorParams::default()))).unwrap();
+ }
+ );
+ };
+
+ futures::pin_mut!(test_fut);
+ futures::pin_mut!(overseer);
+ let v = executor::block_on(future::join(test_fut, overseer));
+ v.0
+}
+
#[test]
fn correctly_checks_included_assumption() {
let validation_data: PersistedValidationData = Default::default();
@@ -365,7 +396,7 @@ impl MockValidateCandidateBackend {
impl ValidationBackend for MockValidateCandidateBackend {
async fn validate_candidate(
&mut self,
- _pvf: Pvf,
+ _pvf_with_params: PvfWithExecutorParams,
_timeout: Duration,
_encoded_params: Vec<u8>,
) -> Result<WasmValidationResult, ValidationError> {
@@ -377,7 +408,10 @@ impl ValidationBackend for MockValidateCandidateBackend {
result
}
- async fn precheck_pvf(&mut self, _pvf: Pvf) -> Result<PrepareStats, PrepareError> {
+ async fn precheck_pvf(
+ &mut self,
+ _pvf_with_params: PvfWithExecutorParams,
+ ) -> Result<PrepareStats, PrepareError> {
unreachable!()
}
}
@@ -429,15 +463,23 @@ fn candidate_validation_ok_is_ok() {
let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: commitments.hash() };
- let v = executor::block_on(validate_candidate_exhaustive(
- MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)),
- validation_data.clone(),
- validation_code,
- candidate_receipt,
- Arc::new(pov),
- Duration::from_secs(0),
- &Default::default(),
- ))
+ let pool = TaskExecutor::new();
+ let (mut ctx, ctx_handle) =
+ test_helpers::make_subsystem_context::<AllMessages, _>(pool.clone());
+ let metrics = Metrics::default();
+
+ let v = test_with_executor_params(ctx_handle, || {
+ validate_candidate_exhaustive(
+ ctx.sender(),
+ MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)),
+ validation_data.clone(),
+ validation_code,
+ candidate_receipt,
+ Arc::new(pov),
+ Duration::from_secs(0),
+ &metrics,
+ )
+ })
.unwrap();
assert_matches!(v, ValidationResult::Valid(outputs, used_validation_data) => {
@@ -478,20 +520,27 @@ fn candidate_validation_bad_return_is_invalid() {
let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: Hash::zero() };
- let v = executor::block_on(validate_candidate_exhaustive(
- MockValidateCandidateBackend::with_hardcoded_result(Err(
- ValidationError::InvalidCandidate(WasmInvalidCandidate::HardTimeout),
- )),
- validation_data,
- validation_code,
- candidate_receipt,
- Arc::new(pov),
- Duration::from_secs(0),
- &Default::default(),
- ))
- .unwrap();
+ let pool = TaskExecutor::new();
+ let (mut ctx, ctx_handle) =
+ test_helpers::make_subsystem_context::<AllMessages, _>(pool.clone());
+ let metrics = Metrics::default();
+
+ let v = test_with_executor_params(ctx_handle, || {
+ validate_candidate_exhaustive(
+ ctx.sender(),
+ MockValidateCandidateBackend::with_hardcoded_result(Err(
+ ValidationError::InvalidCandidate(WasmInvalidCandidate::HardTimeout),
+ )),
+ validation_data,
+ validation_code,
+ candidate_receipt,
+ Arc::new(pov),
+ Duration::from_secs(0),
+ &metrics,
+ )
+ });
- assert_matches!(v, ValidationResult::Invalid(InvalidCandidate::Timeout));
+ assert_matches!(v, Ok(ValidationResult::Invalid(InvalidCandidate::Timeout)));
}
#[test]
@@ -541,18 +590,26 @@ fn candidate_validation_one_ambiguous_error_is_valid() {
let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: commitments.hash() };
- let v = executor::block_on(validate_candidate_exhaustive(
- MockValidateCandidateBackend::with_hardcoded_result_list(vec![
- Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)),
- Ok(validation_result),
- ]),
- validation_data.clone(),
- validation_code,
- candidate_receipt,
- Arc::new(pov),
- Duration::from_secs(0),
- &Default::default(),
- ))
+ let pool = TaskExecutor::new();
+ let (mut ctx, ctx_handle) =
+ test_helpers::make_subsystem_context::<AllMessages, _>(pool.clone());
+ let metrics = Metrics::default();
+
+ let v = test_with_executor_params(ctx_handle, || {
+ validate_candidate_exhaustive(
+ ctx.sender(),
+ MockValidateCandidateBackend::with_hardcoded_result_list(vec![
+ Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)),
+ Ok(validation_result),
+ ]),
+ validation_data.clone(),
+ validation_code,
+ candidate_receipt,
+ Arc::new(pov),
+ Duration::from_secs(0),
+ &metrics,
+ )
+ })
.unwrap();
assert_matches!(v, ValidationResult::Valid(outputs, used_validation_data) => {
@@ -593,18 +650,26 @@ fn candidate_validation_multiple_ambiguous_errors_is_invalid() {
let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: Hash::zero() };
- let v = executor::block_on(validate_candidate_exhaustive(
- MockValidateCandidateBackend::with_hardcoded_result_list(vec![
- Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)),
- Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)),
- ]),
- validation_data,
- validation_code,
- candidate_receipt,
- Arc::new(pov),
- Duration::from_secs(0),
- &Default::default(),
- ))
+ let pool = TaskExecutor::new();
+ let (mut ctx, ctx_handle) =
+ test_helpers::make_subsystem_context::<AllMessages, _>(pool.clone());
+ let metrics = Metrics::default();
+
+ let v = test_with_executor_params(ctx_handle, || {
+ validate_candidate_exhaustive(
+ ctx.sender(),
+ MockValidateCandidateBackend::with_hardcoded_result_list(vec![
+ Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)),
+ Err(ValidationError::InvalidCandidate(WasmInvalidCandidate::AmbiguousWorkerDeath)),
+ ]),
+ validation_data,
+ validation_code,
+ candidate_receipt,
+ Arc::new(pov),
+ Duration::from_secs(0),
+ &metrics,
+ )
+ })
.unwrap();
assert_matches!(v, ValidationResult::Invalid(InvalidCandidate::ExecutionError(_)));
@@ -638,17 +703,25 @@ fn candidate_validation_timeout_is_internal_error() {
let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: Hash::zero() };
- let v = executor::block_on(validate_candidate_exhaustive(
- MockValidateCandidateBackend::with_hardcoded_result(Err(
- ValidationError::InvalidCandidate(WasmInvalidCandidate::HardTimeout),
- )),
- validation_data,
- validation_code,
- candidate_receipt,
- Arc::new(pov),
- Duration::from_secs(0),
- &Default::default(),
- ));
+ let pool = TaskExecutor::new();
+ let (mut ctx, ctx_handle) =
+ test_helpers::make_subsystem_context::<AllMessages, _>(pool.clone());
+ let metrics = Metrics::default();
+
+ let v = test_with_executor_params(ctx_handle, || {
+ validate_candidate_exhaustive(
+ ctx.sender(),
+ MockValidateCandidateBackend::with_hardcoded_result(Err(
+ ValidationError::InvalidCandidate(WasmInvalidCandidate::HardTimeout),
+ )),
+ validation_data,
+ validation_code,
+ candidate_receipt,
+ Arc::new(pov),
+ Duration::from_secs(0),
+ &metrics,
+ )
+ });
assert_matches!(v, Ok(ValidationResult::Invalid(InvalidCandidate::Timeout)));
}
@@ -684,15 +757,23 @@ fn candidate_validation_commitment_hash_mismatch_is_invalid() {
hrmp_watermark: 12345,
};
- let result = executor::block_on(validate_candidate_exhaustive(
- MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)),
- validation_data,
- validation_code,
- candidate_receipt,
- Arc::new(pov),
- Duration::from_secs(0),
- &Default::default(),
- ))
+ let pool = TaskExecutor::new();
+ let (mut ctx, ctx_handle) =
+ test_helpers::make_subsystem_context::<AllMessages, _>(pool.clone());
+ let metrics = Metrics::default();
+
+ let result = test_with_executor_params(ctx_handle, || {
+ validate_candidate_exhaustive(
+ ctx.sender(),
+ MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)),
+ validation_data,
+ validation_code,
+ candidate_receipt,
+ Arc::new(pov),
+ Duration::from_secs(0),
+ &metrics,
+ )
+ })
.unwrap();
// Ensure `post validation` check on the commitments hash works as expected.
@@ -727,7 +808,12 @@ fn candidate_validation_code_mismatch_is_invalid() {
let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: Hash::zero() };
+ let pool = TaskExecutor::new();
+ let (mut ctx, _ctx_handle) =
+ test_helpers::make_subsystem_context::<AllMessages, _>(pool.clone());
+
let v = executor::block_on(validate_candidate_exhaustive(
+ ctx.sender(),
MockValidateCandidateBackend::with_hardcoded_result(Err(
ValidationError::InvalidCandidate(WasmInvalidCandidate::HardTimeout),
)),
@@ -785,15 +871,23 @@ fn compressed_code_works() {
let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: commitments.hash() };
- let v = executor::block_on(validate_candidate_exhaustive(
- MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)),
- validation_data,
- validation_code,
- candidate_receipt,
- Arc::new(pov),
- Duration::from_secs(0),
- &Default::default(),
- ));
+ let pool = TaskExecutor::new();
+ let (mut ctx, ctx_handle) =
+ test_helpers::make_subsystem_context::<AllMessages, _>(pool.clone());
+ let metrics = Metrics::default();
+
+ let v = test_with_executor_params(ctx_handle, || {
+ validate_candidate_exhaustive(
+ ctx.sender(),
+ MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)),
+ validation_data,
+ validation_code,
+ candidate_receipt,
+ Arc::new(pov),
+ Duration::from_secs(0),
+ &metrics,
+ )
+ });
assert_matches!(v, Ok(ValidationResult::Valid(_, _)));
}
@@ -832,7 +926,12 @@ fn code_decompression_failure_is_error() {
let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: Hash::zero() };
+ let pool = TaskExecutor::new();
+ let (mut ctx, _ctx_handle) =
+ test_helpers::make_subsystem_context::<AllMessages, _>(pool.clone());
+
let v = executor::block_on(validate_candidate_exhaustive(
+ ctx.sender(),
MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)),
validation_data,
validation_code,
@@ -880,7 +979,12 @@ fn pov_decompression_failure_is_invalid() {
let candidate_receipt = CandidateReceipt { descriptor, commitments_hash: Hash::zero() };
+ let pool = TaskExecutor::new();
+ let (mut ctx, _ctx_handle) =
+ test_helpers::make_subsystem_context::<AllMessages, _>(pool.clone());
+
let v = executor::block_on(validate_candidate_exhaustive(
+ ctx.sender(),
MockValidateCandidateBackend::with_hardcoded_result(Ok(validation_result)),
validation_data,
validation_code,
@@ -907,14 +1011,17 @@ impl MockPreCheckBackend {
impl ValidationBackend for MockPreCheckBackend {
async fn validate_candidate(
&mut self,
- _pvf: Pvf,
+ _pvf_with_params: PvfWithExecutorParams,
_timeout: Duration,
_encoded_params: Vec<u8>,
) -> Result<WasmValidationResult, ValidationError> {
unreachable!()
}
- async fn precheck_pvf(&mut self, _pvf: Pvf) -> Result<PrepareStats, PrepareError> {
+ async fn precheck_pvf(
+ &mut self,
+ _pvf_with_params: PvfWithExecutorParams,
+ ) -> Result<PrepareStats, PrepareError> {
self.result.clone()
}
}
@@ -953,6 +1060,22 @@ fn precheck_works() {
let _ = tx.send(Ok(Some(validation_code.clone())));
}
);
+ assert_matches!(
+ ctx_handle.recv().await,
+ AllMessages::RuntimeApi(
+ RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionIndexForChild(tx))
+ ) => {
+ tx.send(Ok(1u32.into())).unwrap();
+ }
+ );
+ assert_matches!(
+ ctx_handle.recv().await,
+ AllMessages::RuntimeApi(
+ RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionExecutorParams(_, tx))
+ ) => {
+ tx.send(Ok(Some(ExecutorParams::default()))).unwrap();
+ }
+ );
assert_matches!(check_result.await, PreCheckOutcome::Valid);
};
@@ -999,6 +1122,22 @@ fn precheck_invalid_pvf_blob_compression() {
let _ = tx.send(Ok(Some(validation_code.clone())));
}
);
+ assert_matches!(
+ ctx_handle.recv().await,
+ AllMessages::RuntimeApi(
+ RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionIndexForChild(tx))
+ ) => {
+ tx.send(Ok(1u32.into())).unwrap();
+ }
+ );
+ assert_matches!(
+ ctx_handle.recv().await,
+ AllMessages::RuntimeApi(
+ RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionExecutorParams(_, tx))
+ ) => {
+ tx.send(Ok(Some(ExecutorParams::default()))).unwrap();
+ }
+ );
assert_matches!(check_result.await, PreCheckOutcome::Invalid);
};
@@ -1041,6 +1180,22 @@ fn precheck_properly_classifies_outcomes() {
let _ = tx.send(Ok(Some(validation_code.clone())));
}
);
+ assert_matches!(
+ ctx_handle.recv().await,
+ AllMessages::RuntimeApi(
+ RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionIndexForChild(tx))
+ ) => {
+ tx.send(Ok(1u32.into())).unwrap();
+ }
+ );
+ assert_matches!(
+ ctx_handle.recv().await,
+ AllMessages::RuntimeApi(
+ RuntimeApiMessage::Request(_, RuntimeApiRequest::SessionExecutorParams(_, tx))
+ ) => {
+ tx.send(Ok(Some(ExecutorParams::default()))).unwrap();
+ }
+ );
assert_eq!(check_result.await, precheck_outcome);
};
diff --git a/node/core/dispute-coordinator/src/tests.rs b/node/core/dispute-coordinator/src/tests.rs
index 6725d0061b23..7d7243243ddf 100644
--- a/node/core/dispute-coordinator/src/tests.rs
+++ b/node/core/dispute-coordinator/src/tests.rs
@@ -3339,7 +3339,7 @@ fn informs_chain_selection_when_dispute_concluded_against() {
.await;
let supermajority_threshold =
- polkadot_primitives::v2::supermajority_threshold(test_state.validators.len());
+ polkadot_primitives::supermajority_threshold(test_state.validators.len());
let (valid_vote, invalid_vote) = generate_opposing_votes_pair(
&test_state,
diff --git a/node/core/pvf-checker/src/lib.rs b/node/core/pvf-checker/src/lib.rs
index 7e961dcf4b88..4278b74e0b15 100644
--- a/node/core/pvf-checker/src/lib.rs
+++ b/node/core/pvf-checker/src/lib.rs
@@ -297,7 +297,7 @@ async fn handle_leaves_update(
metrics.on_pvf_observed(outcome.newcomers.len());
metrics.on_pvf_left(outcome.left_num);
for newcomer in outcome.newcomers {
- initiate_precheck(state, sender, recent_block_hash, newcomer, metrics).await;
+ initiate_precheck(state, sender, activated.hash, newcomer, metrics).await;
}
if let Some((new_session_index, credentials)) = new_session_index {
diff --git a/node/core/pvf/Cargo.toml b/node/core/pvf/Cargo.toml
index f19663bab98e..a810c3887c96 100644
--- a/node/core/pvf/Cargo.toml
+++ b/node/core/pvf/Cargo.toml
@@ -27,8 +27,10 @@ parity-scale-codec = { version = "3.3.0", default-features = false, features = [
polkadot-parachain = { path = "../../../parachain" }
polkadot-core-primitives = { path = "../../../core-primitives" }
-polkadot-node-metrics = { path = "../../metrics"}
+polkadot-node-metrics = { path = "../../metrics" }
+polkadot-node-primitives = { path = "../../primitives" }
+polkadot-primitives = { path = "../../../primitives" }
sc-executor = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-executor-wasmtime = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-executor-common = { git = "https://github.com/paritytech/substrate", branch = "master" }
diff --git a/node/core/pvf/src/artifacts.rs b/node/core/pvf/src/artifacts.rs
index d2e1e1e90878..7ddbaef38c5a 100644
--- a/node/core/pvf/src/artifacts.rs
+++ b/node/core/pvf/src/artifacts.rs
@@ -17,6 +17,7 @@
use crate::{error::PrepareError, host::PrepareResultSender, prepare::PrepareStats};
use always_assert::always;
use polkadot_parachain::primitives::ValidationCodeHash;
+use polkadot_primitives::vstaging::ExecutorParamsHash;
use std::{
collections::HashMap,
path::{Path, PathBuf},
@@ -37,19 +38,19 @@ impl AsRef<[u8]> for CompiledArtifact {
}
}
-/// Identifier of an artifact. Right now it only encodes a code hash of the PVF. But if we get to
-/// multiple engine implementations the artifact ID should include the engine type as well.
+/// Identifier of an artifact. Encodes a code hash of the PVF and a hash of the executor parameter set.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ArtifactId {
pub(crate) code_hash: ValidationCodeHash,
+ pub(crate) executor_params_hash: ExecutorParamsHash,
}
impl ArtifactId {
const PREFIX: &'static str = "wasmtime_";
/// Creates a new artifact ID with the given hash.
- pub fn new(code_hash: ValidationCodeHash) -> Self {
- Self { code_hash }
+ pub fn new(code_hash: ValidationCodeHash, executor_params_hash: ExecutorParamsHash) -> Self {
+ Self { code_hash, executor_params_hash }
}
/// Tries to recover the artifact id from the given file name.
@@ -59,14 +60,18 @@ impl ArtifactId {
use std::str::FromStr as _;
let file_name = file_name.strip_prefix(Self::PREFIX)?;
- let code_hash = Hash::from_str(file_name).ok()?.into();
+ let (code_hash_str, executor_params_hash_str) = file_name.split_once('_')?;
+ let code_hash = Hash::from_str(code_hash_str).ok()?.into();
+ let executor_params_hash =
+ ExecutorParamsHash::from_hash(Hash::from_str(executor_params_hash_str).ok()?);
- Some(Self { code_hash })
+ Some(Self { code_hash, executor_params_hash })
}
/// Returns the expected path to this artifact given the root of the cache.
pub fn path(&self, cache_path: &Path) -> PathBuf {
- let file_name = format!("{}{:#x}", Self::PREFIX, self.code_hash);
+ let file_name =
+ format!("{}{:#x}_{:#x}", Self::PREFIX, self.code_hash, self.executor_params_hash);
cache_path.join(file_name)
}
}
@@ -214,6 +219,7 @@ impl Artifacts {
#[cfg(test)]
mod tests {
use super::{ArtifactId, Artifacts};
+ use polkadot_primitives::vstaging::ExecutorParamsHash;
use sp_core::H256;
use std::{path::Path, str::FromStr};
@@ -224,13 +230,16 @@ mod tests {
assert_eq!(
ArtifactId::from_file_name(
- "wasmtime_0x0022800000000000000000000000000000000000000000000000000000000000"
+ "wasmtime_0x0022800000000000000000000000000000000000000000000000000000000000_0x0033900000000000000000000000000000000000000000000000000000000000"
),
Some(ArtifactId::new(
hex_literal::hex![
"0022800000000000000000000000000000000000000000000000000000000000"
]
- .into()
+ .into(),
+ ExecutorParamsHash::from_hash(sp_core::H256(hex_literal::hex![
+ "0033900000000000000000000000000000000000000000000000000000000000"
+ ])),
)),
);
}
@@ -240,13 +249,12 @@ mod tests {
let path = Path::new("/test");
let hash =
H256::from_str("1234567890123456789012345678901234567890123456789012345678901234")
- .unwrap()
- .into();
+ .unwrap();
assert_eq!(
- ArtifactId::new(hash).path(path).to_str(),
+ ArtifactId::new(hash.into(), ExecutorParamsHash::from_hash(hash)).path(path).to_str(),
Some(
- "/test/wasmtime_0x1234567890123456789012345678901234567890123456789012345678901234"
+ "/test/wasmtime_0x1234567890123456789012345678901234567890123456789012345678901234_0x1234567890123456789012345678901234567890123456789012345678901234"
),
);
}
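
Since artifact identity now depends on both hashes, the cache file name round-trip above can be illustrated with a self-contained sketch (plain strings in place of the real hash types; illustrative, not the patched code):

```rust
// Sketch of `ArtifactId::path` / `ArtifactId::from_file_name`, with strings
// standing in for `ValidationCodeHash` and `ExecutorParamsHash`.
fn to_file_name(code_hash: &str, executor_params_hash: &str) -> String {
    format!("wasmtime_{}_{}", code_hash, executor_params_hash)
}

fn from_file_name(file_name: &str) -> Option<(&str, &str)> {
    file_name.strip_prefix("wasmtime_")?.split_once('_')
}

fn main() {
    // The same PVF prepared under different executor params now maps to a
    // different file name, hence to a distinct cached artifact.
    let name = to_file_name("0xaa", "0xbb");
    assert_eq!(from_file_name(&name), Some(("0xaa", "0xbb")));
}
```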
diff --git a/node/core/pvf/src/execute/queue.rs b/node/core/pvf/src/execute/queue.rs
index c20099b0e798..9f5b0451dc6c 100644
--- a/node/core/pvf/src/execute/queue.rs
+++ b/node/core/pvf/src/execute/queue.rs
@@ -30,8 +30,23 @@ use futures::{
stream::{FuturesUnordered, StreamExt as _},
Future, FutureExt,
};
+use polkadot_node_primitives::BACKING_EXECUTION_TIMEOUT;
+use polkadot_primitives::vstaging::{ExecutorParams, ExecutorParamsHash};
use slotmap::HopSlotMap;
-use std::{collections::VecDeque, fmt, path::PathBuf, time::Duration};
+use std::{
+ collections::VecDeque,
+ fmt,
+ path::PathBuf,
+ time::{Duration, Instant},
+};
+
+/// The amount of time a job may wait in the queue while the queue has no worker compatible with
+/// it. After that time passes, the queue will kill the first worker that becomes idle and
+/// re-spawn a new one to execute the job immediately.
+/// To be of any use, the value should be greater than the minimal execution timeout in use and
+/// less than the block time.
+const MAX_KEEP_WAITING: Duration =
+ Duration::from_millis(BACKING_EXECUTION_TIMEOUT.as_millis() as u64 * 2);
slotmap::new_key_type! { struct Worker; }
@@ -41,6 +56,7 @@ pub enum ToQueue {
artifact: ArtifactPathId,
execution_timeout: Duration,
params: Vec<u8>,
+ executor_params: ExecutorParams,
result_tx: ResultSender,
},
}
@@ -49,12 +65,15 @@ struct ExecuteJob {
artifact: ArtifactPathId,
execution_timeout: Duration,
params: Vec<u8>,
+ executor_params: ExecutorParams,
result_tx: ResultSender,
+ waiting_since: Instant,
}
struct WorkerData {
idle: Option<IdleWorker>,
handle: WorkerHandle,
+ executor_params_hash: ExecutorParamsHash,
}
impl fmt::Debug for WorkerData {
@@ -79,7 +98,17 @@ impl Workers {
self.spawn_inflight + self.running.len() < self.capacity
}
- fn find_available(&self) -> Option<Worker> {
+ fn find_available(&self, executor_params_hash: ExecutorParamsHash) -> Option<Worker> {
+ self.running.iter().find_map(|d| {
+ if d.1.idle.is_some() && d.1.executor_params_hash == executor_params_hash {
+ Some(d.0)
+ } else {
+ None
+ }
+ })
+ }
+
+ fn find_idle(&self) -> Option<Worker> {
self.running
.iter()
.find_map(|d| if d.1.idle.is_some() { Some(d.0) } else { None })
@@ -94,7 +123,7 @@ impl Workers {
}
enum QueueEvent {
- Spawn(IdleWorker, WorkerHandle),
+ Spawn(IdleWorker, WorkerHandle, ExecuteJob),
StartWork(Worker, Outcome, ArtifactId, ResultSender),
}
@@ -154,6 +183,66 @@ impl Queue {
purge_dead(&self.metrics, &mut self.workers).await;
}
}
+
+ /// Tries to assign a job in the queue to a worker. If an idle worker is provided, it does its
+ /// best to find a job with a compatible execution environment unless there are jobs in the
+ /// queue waiting too long. In that case, it kills an existing idle worker and spawns a new
+ /// one. It may spawn an additional worker if that is affordable.
+ /// If all the workers are busy or the queue is empty, it does nothing.
+ /// Should be called every time a new job arrives in the queue or a job finishes.
+ fn try_assign_next_job(&mut self, finished_worker: Option) {
+ // New jobs are always pushed to the tail of the queue; the one at its head is always
+ // the eldest one.
+ let eldest = if let Some(eldest) = self.queue.get(0) { eldest } else { return };
+
+ // By default, we're going to execute the eldest job on any worker slot available, even if
+ // we have to kill and re-spawn a worker
+ let mut worker = None;
+ let mut job_index = 0;
+
+ // But if we're not pressed for time, we can try to find a better job-worker pair not
+ // requiring the expensive kill-spawn operation
+ if eldest.waiting_since.elapsed() < MAX_KEEP_WAITING {
+ if let Some(finished_worker) = finished_worker {
+ if let Some(worker_data) = self.workers.running.get(finished_worker) {
+ for (i, job) in self.queue.iter().enumerate() {
+ if worker_data.executor_params_hash == job.executor_params.hash() {
+ (worker, job_index) = (Some(finished_worker), i);
+ break
+ }
+ }
+ }
+ }
+ }
+
+ if worker.is_none() {
+ // Try to obtain a worker for the job
+ worker = self.workers.find_available(self.queue[job_index].executor_params.hash());
+ }
+
+ if worker.is_none() {
+ if let Some(idle) = self.workers.find_idle() {
+ // No available workers of required type but there are some idle ones of other
+ // types, have to kill one and re-spawn with the correct type
+ if self.workers.running.remove(idle).is_some() {
+ self.metrics.execute_worker().on_retired();
+ }
+ }
+ }
+
+ if worker.is_none() && !self.workers.can_afford_one_more() {
+ // Bad luck, no worker slot can be used to execute the job
+ return
+ }
+
+ let job = self.queue.remove(job_index).expect("Job is just checked to be in queue; qed");
+
+ if let Some(worker) = worker {
+ assign(self, worker, job);
+ } else {
+ spawn_extra_worker(self, job);
+ }
+ }
}
async fn purge_dead(metrics: &Metrics, workers: &mut Workers) {
@@ -172,29 +261,30 @@ async fn purge_dead(metrics: &Metrics, workers: &mut Workers) {
}
fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) {
- let ToQueue::Enqueue { artifact, execution_timeout, params, result_tx } = to_queue;
+ let ToQueue::Enqueue { artifact, execution_timeout, params, executor_params, result_tx } =
+ to_queue;
gum::debug!(
target: LOG_TARGET,
validation_code_hash = ?artifact.id.code_hash,
"enqueueing an artifact for execution",
);
queue.metrics.execute_enqueued();
- let job = ExecuteJob { artifact, execution_timeout, params, result_tx };
-
- if let Some(available) = queue.workers.find_available() {
- assign(queue, available, job);
- } else {
- if queue.workers.can_afford_one_more() {
- spawn_extra_worker(queue);
- }
- queue.queue.push_back(job);
- }
+ let job = ExecuteJob {
+ artifact,
+ execution_timeout,
+ params,
+ executor_params,
+ result_tx,
+ waiting_since: Instant::now(),
+ };
+ queue.queue.push_back(job);
+ queue.try_assign_next_job(None);
}
async fn handle_mux(queue: &mut Queue, event: QueueEvent) {
match event {
- QueueEvent::Spawn(idle, handle) => {
- handle_worker_spawned(queue, idle, handle);
+ QueueEvent::Spawn(idle, handle, job) => {
+ handle_worker_spawned(queue, idle, handle, job);
},
QueueEvent::StartWork(worker, outcome, artifact_id, result_tx) => {
handle_job_finish(queue, worker, outcome, artifact_id, result_tx);
@@ -202,16 +292,23 @@ async fn handle_mux(queue: &mut Queue, event: QueueEvent) {
}
}
-fn handle_worker_spawned(queue: &mut Queue, idle: IdleWorker, handle: WorkerHandle) {
+fn handle_worker_spawned(
+ queue: &mut Queue,
+ idle: IdleWorker,
+ handle: WorkerHandle,
+ job: ExecuteJob,
+) {
queue.metrics.execute_worker().on_spawned();
queue.workers.spawn_inflight -= 1;
- let worker = queue.workers.running.insert(WorkerData { idle: Some(idle), handle });
+ let worker = queue.workers.running.insert(WorkerData {
+ idle: Some(idle),
+ handle,
+ executor_params_hash: job.executor_params.hash(),
+ });
gum::debug!(target: LOG_TARGET, ?worker, "execute worker spawned");
- if let Some(job) = queue.queue.pop_front() {
- assign(queue, worker, job);
- }
+ assign(queue, worker, job);
}
/// If there are pending jobs in the queue, schedules the next of them onto the just freed up
@@ -280,42 +377,45 @@ fn handle_job_finish(
if let Some(idle_worker) = idle_worker {
if let Some(data) = queue.workers.running.get_mut(worker) {
data.idle = Some(idle_worker);
-
- if let Some(job) = queue.queue.pop_front() {
- assign(queue, worker, job);
- }
+ return queue.try_assign_next_job(Some(worker))
}
} else {
// Note it's possible that the worker was purged already by `purge_dead`
if queue.workers.running.remove(worker).is_some() {
queue.metrics.execute_worker().on_retired();
}
-
- if !queue.queue.is_empty() {
- // The worker has died and we still have work we have to do. Request an extra worker.
- //
- // That can potentially overshoot, but that should be OK.
- spawn_extra_worker(queue);
- }
}
+
+ queue.try_assign_next_job(None);
}
-fn spawn_extra_worker(queue: &mut Queue) {
+fn spawn_extra_worker(queue: &mut Queue, job: ExecuteJob) {
queue.metrics.execute_worker().on_begin_spawn();
gum::debug!(target: LOG_TARGET, "spawning an extra worker");
queue
.mux
- .push(spawn_worker_task(queue.program_path.clone(), queue.spawn_timeout).boxed());
+ .push(spawn_worker_task(queue.program_path.clone(), job, queue.spawn_timeout).boxed());
queue.workers.spawn_inflight += 1;
}
-async fn spawn_worker_task(program_path: PathBuf, spawn_timeout: Duration) -> QueueEvent {
+/// Spawns a new worker to execute a pre-assigned job.
+/// A worker is never spawned as idle; a job to be executed by the worker has to be determined
+/// beforehand. That avoids a race condition: while the worker is being spawned, another job in
+/// the queue, with an incompatible execution environment, may become stale, and the queue would
+/// have to kill a newly started worker and spawn another one.
+/// Nevertheless, if the worker finishes executing the job, it becomes idle and may be used to
+/// execute other jobs with a compatible execution environment.
+async fn spawn_worker_task(
+ program_path: PathBuf,
+ job: ExecuteJob,
+ spawn_timeout: Duration,
+) -> QueueEvent {
use futures_timer::Delay;
loop {
- match super::worker::spawn(&program_path, spawn_timeout).await {
- Ok((idle, handle)) => break QueueEvent::Spawn(idle, handle),
+ match super::worker::spawn(&program_path, job.executor_params.clone(), spawn_timeout).await
+ {
+ Ok((idle, handle)) => break QueueEvent::Spawn(idle, handle, job),
Err(err) => {
gum::warn!(target: LOG_TARGET, "failed to spawn an execute worker: {:?}", err);
@@ -328,7 +428,8 @@ async fn spawn_worker_task(program_path: PathBuf, spawn_timeout: Duration) -> QueueEvent {
/// Ask the given worker to perform the given job.
///
-/// The worker must be running and idle.
+/// The worker must be running and idle. The job and the worker must share the same execution
+/// environment parameter set.
fn assign(queue: &mut Queue, worker: Worker, job: ExecuteJob) {
gum::debug!(
target: LOG_TARGET,
@@ -337,6 +438,16 @@ fn assign(queue: &mut Queue, worker: Worker, job: ExecuteJob) {
"assigning the execute worker",
);
+ debug_assert_eq!(
+ queue
+ .workers
+ .running
+ .get(worker)
+ .expect("caller must provide existing worker; qed")
+ .executor_params_hash,
+ job.executor_params.hash()
+ );
+
let idle = queue.workers.claim_idle(worker).expect(
"this caller must supply a worker which is idle and running;
thus claim_idle cannot return None;
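
To summarize the scheduling change in this file, here is a compact model of the decision order in `try_assign_next_job` (hand-rolled types, not the real queue; `u64` stands in for `ExecutorParamsHash`):

```rust
// Illustrative model only; mirrors the decision order of the patched
// `try_assign_next_job` under these simplifications.
enum Decision {
    Assign { worker: usize, job: usize },
    Spawn { job: usize, kill_idle: Option<usize> },
    Wait,
}

fn decide(
    finished_worker: Option<(usize, u64)>, // just-freed worker: (index, params hash)
    idle: &[(usize, u64)],                 // idle workers: (index, params hash)
    jobs: &[(u64, bool)],                  // queued jobs: (params hash, waited too long)
    can_afford_one_more: bool,
) -> Decision {
    let eldest = match jobs.first() {
        Some(j) => j,
        None => return Decision::Wait,
    };
    // By default, pick the eldest job, even at the cost of a kill-and-respawn.
    let (mut worker, mut job) = (None, 0usize);
    // If not pressed for time, keep the just-freed worker busy with any
    // compatible job further down the queue instead.
    if !eldest.1 {
        if let Some((w, h)) = finished_worker {
            if let Some(i) = jobs.iter().position(|(jh, _)| *jh == h) {
                (worker, job) = (Some(w), i);
            }
        }
    }
    // Otherwise look for an idle worker with a matching environment.
    if worker.is_none() {
        worker = idle.iter().find(|(_, h)| *h == jobs[job].0).map(|(w, _)| *w);
    }
    match worker {
        Some(w) => Decision::Assign { worker: w, job },
        None => {
            // No compatible worker: retire an incompatible idle one (if any)
            // to free a slot, then spawn a worker pre-assigned to the job.
            let kill_idle = idle.first().map(|(w, _)| *w);
            if kill_idle.is_none() && !can_afford_one_more {
                return Decision::Wait
            }
            Decision::Spawn { job, kill_idle }
        },
    }
}
```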
diff --git a/node/core/pvf/src/execute/worker.rs b/node/core/pvf/src/execute/worker.rs
index 4b19d4029be5..5db6a6261cc9 100644
--- a/node/core/pvf/src/execute/worker.rs
+++ b/node/core/pvf/src/execute/worker.rs
@@ -28,7 +28,9 @@ use cpu_time::ProcessTime;
use futures::{pin_mut, select_biased, FutureExt};
use futures_timer::Delay;
use parity_scale_codec::{Decode, Encode};
+
use polkadot_parachain::primitives::ValidationResult;
+use polkadot_primitives::vstaging::ExecutorParams;
use std::{
path::{Path, PathBuf},
sync::{mpsc::channel, Arc},
@@ -37,13 +39,29 @@ use std::{
use tokio::{io, net::UnixStream};
/// Spawns a new worker with the given program path that acts as the worker and the spawn timeout.
+/// Sends a handshake message to the worker as soon as it is spawned.
///
/// The program should be able to handle `<program-path> execute-worker <socket-path>` invocation.
pub async fn spawn(
program_path: &Path,
+ executor_params: ExecutorParams,
spawn_timeout: Duration,
) -> Result<(IdleWorker, WorkerHandle), SpawnErr> {
- spawn_with_program_path("execute", program_path, &["execute-worker"], spawn_timeout).await
+ let (mut idle_worker, worker_handle) =
+ spawn_with_program_path("execute", program_path, &["execute-worker"], spawn_timeout)
+ .await?;
+ send_handshake(&mut idle_worker.stream, Handshake { executor_params })
+ .await
+ .map_err(|error| {
+ gum::warn!(
+ target: LOG_TARGET,
+ worker_pid = %idle_worker.pid,
+ ?error,
+ "failed to send a handshake to the spawned worker",
+ );
+ SpawnErr::Handshake
+ })?;
+ Ok((idle_worker, worker_handle))
}
/// Outcome of PVF execution.
@@ -159,6 +177,21 @@ pub async fn start_work(
}
}
+async fn send_handshake(stream: &mut UnixStream, handshake: Handshake) -> io::Result<()> {
+ framed_send(stream, &handshake.encode()).await
+}
+
+async fn recv_handshake(stream: &mut UnixStream) -> io::Result<Handshake> {
+ let handshake_enc = framed_recv(stream).await?;
+ let handshake = Handshake::decode(&mut &handshake_enc[..]).map_err(|_| {
+ io::Error::new(
+ io::ErrorKind::Other,
+ "execute pvf recv_handshake: failed to decode Handshake".to_owned(),
+ )
+ })?;
+ Ok(handshake)
+}
+
async fn send_request(
stream: &mut UnixStream,
artifact_path: &Path,
async fn recv_response(stream: &mut UnixStream) -> io::Result<Response> {
})
}
+#[derive(Encode, Decode)]
+struct Handshake {
+ executor_params: ExecutorParams,
+}
+
#[derive(Encode, Decode)]
pub enum Response {
Ok { result_descriptor: ValidationResult, duration: Duration },
@@ -225,7 +263,9 @@ impl Response {
/// the path to the socket used to communicate with the host.
pub fn worker_entrypoint(socket_path: &str) {
worker_event_loop("execute", socket_path, |rt_handle, mut stream| async move {
- let executor = Arc::new(Executor::new().map_err(|e| {
+ let handshake = recv_handshake(&mut stream).await?;
+
+ let executor = Arc::new(Executor::new(handshake.executor_params).map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("cannot create executor: {}", e))
})?);
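
The handshake itself is just a SCALE-encoded struct written to the socket right after spawning and read back by the worker. A self-contained round-trip sketch (a `Vec<u8>` payload stands in for `ExecutorParams`, and the `framed_send`/`framed_recv` layer is elided):

```rust
// Illustrative encode/decode round-trip of the execute-worker handshake.
use parity_scale_codec::{Decode, Encode};

#[derive(Encode, Decode, Debug, PartialEq)]
struct Handshake {
    executor_params: Vec<u8>, // stand-in for the real `ExecutorParams`
}

fn main() {
    let sent = Handshake { executor_params: vec![1, 2, 3] };
    let bytes = sent.encode(); // what `send_handshake` frames and writes
    let received = Handshake::decode(&mut &bytes[..]).unwrap(); // `recv_handshake` side
    assert_eq!(sent, received);
}
```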
diff --git a/node/core/pvf/src/executor_intf.rs b/node/core/pvf/src/executor_intf.rs
index c5578f5f81ad..0cc533f4a85e 100644
--- a/node/core/pvf/src/executor_intf.rs
+++ b/node/core/pvf/src/executor_intf.rs
@@ -16,6 +16,7 @@
//! Interface to the Substrate Executor
+use polkadot_primitives::vstaging::executor_params::{ExecutorParam, ExecutorParams};
use sc_executor_common::{
runtime_blob::RuntimeBlob,
wasm_runtime::{InvokeMethod, WasmModule as _},
@@ -46,7 +47,11 @@ const EXTRA_HEAP_PAGES: u64 = 2048;
/// The number of bytes devoted for the stack during wasm execution of a PVF.
const NATIVE_STACK_MAX: u32 = 256 * 1024 * 1024;
-const CONFIG: Config = Config {
+// VALUES OF THE DEFAULT CONFIGURATION SHOULD NEVER BE CHANGED
+// They are used as base values for the execution environment parametrization.
+// To overwrite them, add new ones to `EXECUTOR_PARAMS` in the `session_info` pallet and perform
+// a runtime upgrade to make them active.
+const DEFAULT_CONFIG: Config = Config {
allow_missing_func_imports: true,
cache_path: None,
semantics: Semantics {
@@ -97,17 +102,42 @@ pub fn prevalidate(code: &[u8]) -> Result<RuntimeBlob, sc_executor_common::error::WasmError> {
-pub fn prepare(blob: RuntimeBlob) -> Result<Vec<u8>, sc_executor_common::error::WasmError> {
- sc_executor_wasmtime::prepare_runtime_artifact(blob, &CONFIG.semantics)
+pub fn prepare(
+ blob: RuntimeBlob,
+ executor_params: ExecutorParams,
+) -> Result<Vec<u8>, sc_executor_common::error::WasmError> {
+ let semantics = params_to_wasmtime_semantics(executor_params)
+ .map_err(|e| sc_executor_common::error::WasmError::Other(e))?;
+ sc_executor_wasmtime::prepare_runtime_artifact(blob, &semantics)
+}
+
+fn params_to_wasmtime_semantics(par: ExecutorParams) -> Result<Semantics, String> {
+ let mut sem = DEFAULT_CONFIG.semantics.clone();
+ let mut stack_limit = if let Some(stack_limit) = sem.deterministic_stack_limit.clone() {
+ stack_limit
+ } else {
+ return Err("No default stack limit set".to_owned())
+ };
+ for p in par.iter() {
+ match p {
+ ExecutorParam::MaxMemorySize(mms) => sem.max_memory_size = Some(*mms as usize),
+ ExecutorParam::StackLogicalMax(slm) => stack_limit.logical_max = *slm,
+ ExecutorParam::StackNativeMax(snm) => stack_limit.native_stack_max = *snm,
+ ExecutorParam::PrecheckingMaxMemory(_) => (), // TODO: Not implemented yet
+ }
+ }
+ sem.deterministic_stack_limit = Some(stack_limit);
+ Ok(sem)
}
pub struct Executor {
thread_pool: rayon::ThreadPool,
spawner: TaskSpawner,
+ config: Config,
}
impl Executor {
- pub fn new() -> Result<Self, String> {
+ pub fn new(params: ExecutorParams) -> Result<Self, String> {
// Wasmtime powers the Substrate Executor. It compiles the wasm bytecode into native code.
// That native code does not create any stacks and just reuses the stack of the thread that
// wasmtime was invoked from.
@@ -154,7 +184,10 @@ impl Executor {
let spawner =
TaskSpawner::new().map_err(|e| format!("cannot create task spawner: {}", e))?;
- Ok(Self { thread_pool, spawner })
+ let mut config = DEFAULT_CONFIG.clone();
+ config.semantics = params_to_wasmtime_semantics(params)?;
+
+ Ok(Self { thread_pool, spawner, config })
}
/// Executes the given PVF in the form of a compiled artifact and returns the result of execution
@@ -183,7 +216,7 @@ impl Executor {
s.spawn(move |_| {
// spawn does not return a value, so we need to use a variable to pass the result.
*result = Some(
- do_execute(compiled_artifact_path, params, spawner)
+ do_execute(compiled_artifact_path, self.config.clone(), params, spawner)
.map_err(|err| format!("execute error: {:?}", err)),
);
});
@@ -195,6 +228,7 @@ impl Executor {
unsafe fn do_execute(
compiled_artifact_path: &Path,
+ config: Config,
params: &[u8],
spawner: impl sp_core::traits::SpawnNamed + 'static,
) -> Result<Vec<u8>, sc_executor_common::error::Error> {
@@ -208,7 +242,7 @@ unsafe fn do_execute(
sc_executor::with_externalities_safe(&mut ext, || {
let runtime = sc_executor_wasmtime::create_runtime_from_artifact::<HostFunctions>(
compiled_artifact_path,
- CONFIG,
+ config,
)?;
runtime.new_instance()?.call(InvokeMethod::Export("validate_block"), params)
})?
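
The parametrization scheme here is defaults-plus-overrides: the frozen `DEFAULT_CONFIG` semantics are cloned and each executor param is applied on top. A self-contained sketch with simplified stand-ins for `Semantics` and `ExecutorParam` (the real types live in `sc-executor-wasmtime` and `polkadot-primitives`):

```rust
// Simplified model of `params_to_wasmtime_semantics` above. Types are
// illustrative stand-ins, not the real ones.
#[derive(Clone, Debug, PartialEq)]
struct StackLimit {
    logical_max: u32,
    native_stack_max: u32,
}

#[derive(Clone, Debug, PartialEq)]
struct Semantics {
    max_memory_size: Option<usize>,
    deterministic_stack_limit: Option<StackLimit>,
}

enum ExecutorParam {
    MaxMemorySize(u32),
    StackLogicalMax(u32),
    StackNativeMax(u32),
}

fn apply(defaults: &Semantics, params: &[ExecutorParam]) -> Result<Semantics, String> {
    let mut sem = defaults.clone();
    let mut stack = sem
        .deterministic_stack_limit
        .clone()
        .ok_or_else(|| "No default stack limit set".to_owned())?;
    for p in params {
        match p {
            ExecutorParam::MaxMemorySize(m) => sem.max_memory_size = Some(*m as usize),
            ExecutorParam::StackLogicalMax(l) => stack.logical_max = *l,
            ExecutorParam::StackNativeMax(n) => stack.native_stack_max = *n,
        }
    }
    sem.deterministic_stack_limit = Some(stack);
    Ok(sem)
}
```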
diff --git a/node/core/pvf/src/host.rs b/node/core/pvf/src/host.rs
index 0ee0b1442fda..b6f515b09d8d 100644
--- a/node/core/pvf/src/host.rs
+++ b/node/core/pvf/src/host.rs
@@ -25,7 +25,7 @@ use crate::{
error::PrepareError,
execute,
metrics::Metrics,
- prepare, PrepareResult, Priority, Pvf, ValidationError, LOG_TARGET,
+ prepare, PrepareResult, Priority, PvfWithExecutorParams, ValidationError, LOG_TARGET,
};
use always_assert::never;
use futures::{
@@ -33,6 +33,7 @@ use futures::{
Future, FutureExt, SinkExt, StreamExt,
};
use polkadot_parachain::primitives::ValidationResult;
+use polkadot_primitives::vstaging::ExecutorParams;
use std::{
collections::HashMap,
path::{Path, PathBuf},
@@ -83,11 +84,11 @@ impl ValidationHost {
/// Returns an error if the request cannot be sent to the validation host, i.e. if it shut down.
pub async fn precheck_pvf(
&mut self,
- pvf: Pvf,
+ pvf_with_params: PvfWithExecutorParams,
result_tx: PrepareResultSender,
) -> Result<(), String> {
self.to_host_tx
- .send(ToHost::PrecheckPvf { pvf, result_tx })
+ .send(ToHost::PrecheckPvf { pvf_with_params, result_tx })
.await
.map_err(|_| "the inner loop hung up".to_string())
}
@@ -101,7 +102,7 @@ impl ValidationHost {
/// Returns an error if the request cannot be sent to the validation host, i.e. if it shut down.
pub async fn execute_pvf(
&mut self,
- pvf: Pvf,
+ pvf_with_params: PvfWithExecutorParams,
execution_timeout: Duration,
params: Vec<u8>,
priority: Priority,
@@ -109,7 +110,7 @@ impl ValidationHost {
) -> Result<(), String> {
self.to_host_tx
.send(ToHost::ExecutePvf(ExecutePvfInputs {
- pvf,
+ pvf_with_params,
execution_timeout,
params,
priority,
@@ -125,7 +126,10 @@ impl ValidationHost {
/// situations this function should return immediately.
///
/// Returns an error if the request cannot be sent to the validation host, i.e. if it shut down.
- pub async fn heads_up(&mut self, active_pvfs: Vec<Pvf>) -> Result<(), String> {
+ pub async fn heads_up(
+ &mut self,
+ active_pvfs: Vec<PvfWithExecutorParams>,
+ ) -> Result<(), String> {
self.to_host_tx
.send(ToHost::HeadsUp { active_pvfs })
.await
@@ -134,13 +138,13 @@ impl ValidationHost {
}
enum ToHost {
- PrecheckPvf { pvf: Pvf, result_tx: PrepareResultSender },
+ PrecheckPvf { pvf_with_params: PvfWithExecutorParams, result_tx: PrepareResultSender },
ExecutePvf(ExecutePvfInputs),
- HeadsUp { active_pvfs: Vec<Pvf> },
+ HeadsUp { active_pvfs: Vec<PvfWithExecutorParams> },
}
struct ExecutePvfInputs {
- pvf: Pvf,
+ pvf_with_params: PvfWithExecutorParams,
execution_timeout: Duration,
params: Vec<u8>,
priority: Priority,
@@ -265,6 +269,7 @@ pub fn start(config: Config, metrics: Metrics) -> (ValidationHost, impl Future<Output = ()>)
struct PendingExecutionRequest {
execution_timeout: Duration,
params: Vec<u8>,
+ executor_params: ExecutorParams,
result_tx: ResultSender,
}
@@ -279,11 +284,13 @@ impl AwaitingPrepare {
artifact_id: ArtifactId,
execution_timeout: Duration,
params: Vec<u8>,
+ executor_params: ExecutorParams,
result_tx: ResultSender,
) {
self.0.entry(artifact_id).or_default().push(PendingExecutionRequest {
execution_timeout,
params,
+ executor_params,
result_tx,
});
}
@@ -420,8 +427,8 @@ async fn handle_to_host(
to_host: ToHost,
) -> Result<(), Fatal> {
match to_host {
- ToHost::PrecheckPvf { pvf, result_tx } => {
- handle_precheck_pvf(artifacts, prepare_queue, pvf, result_tx).await?;
+ ToHost::PrecheckPvf { pvf_with_params, result_tx } => {
+ handle_precheck_pvf(artifacts, prepare_queue, pvf_with_params, result_tx).await?;
},
ToHost::ExecutePvf(inputs) => {
handle_execute_pvf(
@@ -449,10 +456,10 @@ async fn handle_to_host(
async fn handle_precheck_pvf(
artifacts: &mut Artifacts,
prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
- pvf: Pvf,
+ pvf_with_params: PvfWithExecutorParams,
result_sender: PrepareResultSender,
) -> Result<(), Fatal> {
- let artifact_id = pvf.as_artifact_id();
+ let artifact_id = pvf_with_params.as_artifact_id();
if let Some(state) = artifacts.artifact_state_mut(&artifact_id) {
match state {
@@ -474,7 +481,7 @@ async fn handle_precheck_pvf(
prepare_queue,
prepare::ToQueue::Enqueue {
priority: Priority::Normal,
- pvf,
+ pvf_with_params,
preparation_timeout: PRECHECK_PREPARATION_TIMEOUT,
},
)
@@ -500,8 +507,9 @@ async fn handle_execute_pvf(
awaiting_prepare: &mut AwaitingPrepare,
inputs: ExecutePvfInputs,
) -> Result<(), Fatal> {
- let ExecutePvfInputs { pvf, execution_timeout, params, priority, result_tx } = inputs;
- let artifact_id = pvf.as_artifact_id();
+ let ExecutePvfInputs { pvf_with_params, execution_timeout, params, priority, result_tx } =
+ inputs;
+ let artifact_id = pvf_with_params.as_artifact_id();
if let Some(state) = artifacts.artifact_state_mut(&artifact_id) {
match state {
@@ -515,19 +523,26 @@ async fn handle_execute_pvf(
artifact: ArtifactPathId::new(artifact_id, cache_path),
execution_timeout,
params,
+ executor_params: pvf_with_params.executor_params(),
result_tx,
},
)
.await?;
},
ArtifactState::Preparing { .. } => {
- awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx);
+ awaiting_prepare.add(
+ artifact_id,
+ execution_timeout,
+ params,
+ pvf_with_params.executor_params(),
+ result_tx,
+ );
},
ArtifactState::FailedToProcess { last_time_failed, num_failures, error } => {
if can_retry_prepare_after_failure(*last_time_failed, *num_failures, error) {
gum::warn!(
target: LOG_TARGET,
- ?pvf,
+ ?pvf_with_params,
?artifact_id,
?last_time_failed,
%num_failures,
@@ -541,11 +556,12 @@ async fn handle_execute_pvf(
waiting_for_response: Vec::new(),
num_failures: *num_failures,
};
+ let executor_params = pvf_with_params.executor_params().clone();
send_prepare(
prepare_queue,
prepare::ToQueue::Enqueue {
priority,
- pvf,
+ pvf_with_params,
preparation_timeout: LENIENT_PREPARATION_TIMEOUT,
},
)
@@ -553,7 +569,13 @@ async fn handle_execute_pvf(
// Add an execution request that will wait to run after this prepare job has
// finished.
- awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx);
+ awaiting_prepare.add(
+ artifact_id,
+ execution_timeout,
+ params,
+ executor_params,
+ result_tx,
+ );
} else {
let _ = result_tx.send(Err(ValidationError::from(error.clone())));
}
@@ -562,19 +584,20 @@ async fn handle_execute_pvf(
} else {
// Artifact is unknown: register it and enqueue a job with the corresponding priority and
// PVF.
+ let executor_params = pvf_with_params.executor_params();
artifacts.insert_preparing(artifact_id.clone(), Vec::new());
send_prepare(
prepare_queue,
prepare::ToQueue::Enqueue {
priority,
- pvf,
+ pvf_with_params,
preparation_timeout: LENIENT_PREPARATION_TIMEOUT,
},
)
.await?;
// Add an execution request that will wait to run after this prepare job has finished.
- awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx);
+ awaiting_prepare.add(artifact_id, execution_timeout, params, executor_params, result_tx);
}
Ok(())
@@ -583,7 +606,7 @@ async fn handle_execute_pvf(
async fn handle_heads_up(
artifacts: &mut Artifacts,
prepare_queue: &mut mpsc::Sender<prepare::ToQueue>,
- active_pvfs: Vec<Pvf>,
+ active_pvfs: Vec<PvfWithExecutorParams>,
) -> Result<(), Fatal> {
let now = SystemTime::now();
@@ -619,7 +642,7 @@ async fn handle_heads_up(
prepare_queue,
prepare::ToQueue::Enqueue {
priority: Priority::Normal,
- pvf: active_pvf,
+ pvf_with_params: active_pvf,
preparation_timeout: LENIENT_PREPARATION_TIMEOUT,
},
)
@@ -635,7 +658,7 @@ async fn handle_heads_up(
prepare_queue,
prepare::ToQueue::Enqueue {
priority: Priority::Normal,
- pvf: active_pvf,
+ pvf_with_params: active_pvf,
preparation_timeout: LENIENT_PREPARATION_TIMEOUT,
},
)
@@ -699,7 +722,9 @@ async fn handle_prepare_done(
// It's finally time to dispatch all the execution requests that were waiting for this artifact
// to be prepared.
let pending_requests = awaiting_prepare.take(&artifact_id);
- for PendingExecutionRequest { execution_timeout, params, result_tx } in pending_requests {
+ for PendingExecutionRequest { execution_timeout, params, executor_params, result_tx } in
+ pending_requests
+ {
if result_tx.is_canceled() {
// Preparation could've taken quite a bit of time and the requester may be not interested
// in execution anymore, in which case we just skip the request.
@@ -718,6 +743,7 @@ async fn handle_prepare_done(
artifact: ArtifactPathId::new(artifact_id.clone(), cache_path),
execution_timeout,
params,
+ executor_params,
result_tx,
},
)
@@ -856,7 +882,7 @@ mod tests {
/// Creates a new PVF whose artifact id can be uniquely identified by the given number.
fn artifact_id(descriminator: u32) -> ArtifactId {
- Pvf::from_discriminator(descriminator).as_artifact_id()
+ PvfWithExecutorParams::from_discriminator(descriminator).as_artifact_id()
}
fn artifact_path(descriminator: u32) -> PathBuf {
@@ -1065,7 +1091,7 @@ mod tests {
let mut test = builder.build();
let mut host = test.host_handle();
- host.heads_up(vec![Pvf::from_discriminator(1)]).await.unwrap();
+ host.heads_up(vec![PvfWithExecutorParams::from_discriminator(1)]).await.unwrap();
let to_sweeper_rx = &mut test.to_sweeper_rx;
run_until(
@@ -1079,7 +1105,7 @@ mod tests {
// Extend TTL for the first artifact and make sure we don't receive another file removal
// request.
- host.heads_up(vec![Pvf::from_discriminator(1)]).await.unwrap();
+ host.heads_up(vec![PvfWithExecutorParams::from_discriminator(1)]).await.unwrap();
test.poll_ensure_to_sweeper_is_empty().await;
}
@@ -1090,7 +1116,7 @@ mod tests {
let (result_tx, result_rx_pvf_1_1) = oneshot::channel();
host.execute_pvf(
- Pvf::from_discriminator(1),
+ PvfWithExecutorParams::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
b"pvf1".to_vec(),
Priority::Normal,
@@ -1101,7 +1127,7 @@ mod tests {
let (result_tx, result_rx_pvf_1_2) = oneshot::channel();
host.execute_pvf(
- Pvf::from_discriminator(1),
+ PvfWithExecutorParams::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
b"pvf1".to_vec(),
Priority::Critical,
@@ -1112,7 +1138,7 @@ mod tests {
let (result_tx, result_rx_pvf_2) = oneshot::channel();
host.execute_pvf(
- Pvf::from_discriminator(2),
+ PvfWithExecutorParams::from_discriminator(2),
TEST_EXECUTION_TIMEOUT,
b"pvf2".to_vec(),
Priority::Normal,
@@ -1190,7 +1216,9 @@ mod tests {
// First, test a simple precheck request.
let (result_tx, result_rx) = oneshot::channel();
- host.precheck_pvf(Pvf::from_discriminator(1), result_tx).await.unwrap();
+ host.precheck_pvf(PvfWithExecutorParams::from_discriminator(1), result_tx)
+ .await
+ .unwrap();
// The queue received the prepare request.
assert_matches!(
@@ -1214,7 +1242,9 @@ mod tests {
let mut precheck_receivers = Vec::new();
for _ in 0..3 {
let (result_tx, result_rx) = oneshot::channel();
- host.precheck_pvf(Pvf::from_discriminator(2), result_tx).await.unwrap();
+ host.precheck_pvf(PvfWithExecutorParams::from_discriminator(2), result_tx)
+ .await
+ .unwrap();
precheck_receivers.push(result_rx);
}
// Received prepare request.
@@ -1249,7 +1279,7 @@ mod tests {
// Send PVF for the execution and request the prechecking for it.
let (result_tx, result_rx_execute) = oneshot::channel();
host.execute_pvf(
- Pvf::from_discriminator(1),
+ PvfWithExecutorParams::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
b"pvf2".to_vec(),
Priority::Critical,
@@ -1264,7 +1294,9 @@ mod tests {
);
let (result_tx, result_rx) = oneshot::channel();
- host.precheck_pvf(Pvf::from_discriminator(1), result_tx).await.unwrap();
+ host.precheck_pvf(PvfWithExecutorParams::from_discriminator(1), result_tx)
+ .await
+ .unwrap();
// Suppose the preparation failed, the execution queue is empty and both
// "clients" receive their results.
@@ -1286,13 +1318,15 @@ mod tests {
let mut precheck_receivers = Vec::new();
for _ in 0..3 {
let (result_tx, result_rx) = oneshot::channel();
- host.precheck_pvf(Pvf::from_discriminator(2), result_tx).await.unwrap();
+ host.precheck_pvf(PvfWithExecutorParams::from_discriminator(2), result_tx)
+ .await
+ .unwrap();
precheck_receivers.push(result_rx);
}
let (result_tx, _result_rx_execute) = oneshot::channel();
host.execute_pvf(
- Pvf::from_discriminator(2),
+ PvfWithExecutorParams::from_discriminator(2),
TEST_EXECUTION_TIMEOUT,
b"pvf2".to_vec(),
Priority::Critical,
@@ -1332,7 +1366,9 @@ mod tests {
// Submit a precheck request that fails.
let (result_tx, result_rx) = oneshot::channel();
- host.precheck_pvf(Pvf::from_discriminator(1), result_tx).await.unwrap();
+ host.precheck_pvf(PvfWithExecutorParams::from_discriminator(1), result_tx)
+ .await
+ .unwrap();
// The queue received the prepare request.
assert_matches!(
@@ -1354,7 +1390,9 @@ mod tests {
// Submit another precheck request.
let (result_tx_2, result_rx_2) = oneshot::channel();
- host.precheck_pvf(Pvf::from_discriminator(1), result_tx_2).await.unwrap();
+ host.precheck_pvf(PvfWithExecutorParams::from_discriminator(1), result_tx_2)
+ .await
+ .unwrap();
// Assert the prepare queue is empty.
test.poll_ensure_to_prepare_queue_is_empty().await;
@@ -1368,7 +1406,9 @@ mod tests {
// Submit another precheck request.
let (result_tx_3, result_rx_3) = oneshot::channel();
- host.precheck_pvf(Pvf::from_discriminator(1), result_tx_3).await.unwrap();
+ host.precheck_pvf(PvfWithExecutorParams::from_discriminator(1), result_tx_3)
+ .await
+ .unwrap();
// Assert the prepare queue is empty - we do not retry for precheck requests.
test.poll_ensure_to_prepare_queue_is_empty().await;
@@ -1388,7 +1428,7 @@ mod tests {
// Submit an execute request that fails.
let (result_tx, result_rx) = oneshot::channel();
host.execute_pvf(
- Pvf::from_discriminator(1),
+ PvfWithExecutorParams::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
b"pvf".to_vec(),
Priority::Critical,
@@ -1418,7 +1458,7 @@ mod tests {
// Submit another execute request. We shouldn't try to prepare again, yet.
let (result_tx_2, result_rx_2) = oneshot::channel();
host.execute_pvf(
- Pvf::from_discriminator(1),
+ PvfWithExecutorParams::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
b"pvf".to_vec(),
Priority::Critical,
@@ -1440,7 +1480,7 @@ mod tests {
// Submit another execute request.
let (result_tx_3, result_rx_3) = oneshot::channel();
host.execute_pvf(
- Pvf::from_discriminator(1),
+ PvfWithExecutorParams::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
b"pvf".to_vec(),
Priority::Critical,
@@ -1490,7 +1530,7 @@ mod tests {
// Submit an execute request that fails.
let (result_tx, result_rx) = oneshot::channel();
host.execute_pvf(
- Pvf::from_discriminator(1),
+ PvfWithExecutorParams::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
b"pvf".to_vec(),
Priority::Critical,
@@ -1523,7 +1563,7 @@ mod tests {
// Submit another execute request.
let (result_tx_2, result_rx_2) = oneshot::channel();
host.execute_pvf(
- Pvf::from_discriminator(1),
+ PvfWithExecutorParams::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
b"pvf".to_vec(),
Priority::Critical,
@@ -1548,7 +1588,7 @@ mod tests {
// Submit another execute request.
let (result_tx_3, result_rx_3) = oneshot::channel();
host.execute_pvf(
- Pvf::from_discriminator(1),
+ PvfWithExecutorParams::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
b"pvf".to_vec(),
Priority::Critical,
@@ -1575,7 +1615,7 @@ mod tests {
let mut host = test.host_handle();
// Submit a heads-up request that fails.
- host.heads_up(vec![Pvf::from_discriminator(1)]).await.unwrap();
+ host.heads_up(vec![PvfWithExecutorParams::from_discriminator(1)]).await.unwrap();
// The queue received the prepare request.
assert_matches!(
@@ -1592,7 +1632,7 @@ mod tests {
.unwrap();
// Submit another heads-up request.
- host.heads_up(vec![Pvf::from_discriminator(1)]).await.unwrap();
+ host.heads_up(vec![PvfWithExecutorParams::from_discriminator(1)]).await.unwrap();
// Assert the prepare queue is empty.
test.poll_ensure_to_prepare_queue_is_empty().await;
@@ -1601,7 +1641,7 @@ mod tests {
futures_timer::Delay::new(PREPARE_FAILURE_COOLDOWN).await;
// Submit another heads-up request.
- host.heads_up(vec![Pvf::from_discriminator(1)]).await.unwrap();
+ host.heads_up(vec![PvfWithExecutorParams::from_discriminator(1)]).await.unwrap();
// Assert the prepare queue contains the request.
assert_matches!(
@@ -1617,7 +1657,7 @@ mod tests {
let (result_tx, result_rx) = oneshot::channel();
host.execute_pvf(
- Pvf::from_discriminator(1),
+ PvfWithExecutorParams::from_discriminator(1),
TEST_EXECUTION_TIMEOUT,
b"pvf1".to_vec(),
Priority::Normal,
diff --git a/node/core/pvf/src/lib.rs b/node/core/pvf/src/lib.rs
index 04c7d5323b30..3de5495a2eec 100644
--- a/node/core/pvf/src/lib.rs
+++ b/node/core/pvf/src/lib.rs
@@ -110,7 +110,7 @@ pub use sp_tracing;
pub use error::{InvalidCandidate, PrepareError, PrepareResult, ValidationError};
pub use prepare::PrepareStats;
pub use priority::Priority;
-pub use pvf::Pvf;
+pub use pvf::{Pvf, PvfWithExecutorParams};
pub use host::{start, Config, ValidationHost};
pub use metrics::Metrics;
diff --git a/node/core/pvf/src/prepare/pool.rs b/node/core/pvf/src/prepare/pool.rs
index 49670e4c1ac2..1c4f399f6ebf 100644
--- a/node/core/pvf/src/prepare/pool.rs
+++ b/node/core/pvf/src/prepare/pool.rs
@@ -25,6 +25,7 @@ use always_assert::never;
use futures::{
channel::mpsc, future::BoxFuture, stream::FuturesUnordered, Future, FutureExt, StreamExt,
};
+use polkadot_primitives::vstaging::ExecutorParams;
use slotmap::HopSlotMap;
use std::{
fmt,
@@ -69,6 +70,7 @@ pub enum ToPool {
worker: Worker,
code: Arc<Vec<u8>>,
artifact_path: PathBuf,
+ executor_params: ExecutorParams,
preparation_timeout: Duration,
},
}
@@ -214,7 +216,7 @@ fn handle_to_pool(
metrics.prepare_worker().on_begin_spawn();
mux.push(spawn_worker_task(program_path.to_owned(), spawn_timeout).boxed());
},
- ToPool::StartWork { worker, code, artifact_path, preparation_timeout } => {
+ ToPool::StartWork { worker, code, artifact_path, executor_params, preparation_timeout } => {
if let Some(data) = spawned.get_mut(worker) {
if let Some(idle) = data.idle.take() {
let preparation_timer = metrics.time_preparation();
@@ -226,6 +228,7 @@ fn handle_to_pool(
code,
cache_path.to_owned(),
artifact_path,
+ executor_params,
preparation_timeout,
preparation_timer,
)
@@ -275,12 +278,20 @@ async fn start_work_task(
code: Arc<Vec<u8>>,
cache_path: PathBuf,
artifact_path: PathBuf,
+ executor_params: ExecutorParams,
preparation_timeout: Duration,
_preparation_timer: Option,
) -> PoolEvent {
- let outcome =
- worker::start_work(&metrics, idle, code, &cache_path, artifact_path, preparation_timeout)
- .await;
+ let outcome = worker::start_work(
+ &metrics,
+ idle,
+ code,
+ &cache_path,
+ artifact_path,
+ executor_params,
+ preparation_timeout,
+ )
+ .await;
PoolEvent::StartWork(worker, outcome)
}
diff --git a/node/core/pvf/src/prepare/queue.rs b/node/core/pvf/src/prepare/queue.rs
index d8dd90688c4f..939f42ea62bf 100644
--- a/node/core/pvf/src/prepare/queue.rs
+++ b/node/core/pvf/src/prepare/queue.rs
@@ -17,7 +17,10 @@
//! A queue that handles requests for PVF preparation.
use super::pool::{self, Worker};
-use crate::{artifacts::ArtifactId, metrics::Metrics, PrepareResult, Priority, Pvf, LOG_TARGET};
+use crate::{
+ artifacts::ArtifactId, metrics::Metrics, PrepareResult, Priority, PvfWithExecutorParams,
+ LOG_TARGET,
+};
use always_assert::{always, never};
use futures::{channel::mpsc, stream::StreamExt as _, Future, SinkExt};
use std::{
@@ -33,7 +36,11 @@ pub enum ToQueue {
///
/// Note that it is incorrect to enqueue the same PVF again without first receiving the
/// [`FromQueue`] response.
- Enqueue { priority: Priority, pvf: Pvf, preparation_timeout: Duration },
+ Enqueue {
+ priority: Priority,
+ pvf_with_params: PvfWithExecutorParams,
+ preparation_timeout: Duration,
+ },
}
/// A response from queue.
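
As a usage illustration, enqueueing preparation with the new message shape looks roughly like the following sketch; the sender handle and the concrete timeout value are assumptions, not part of this change:

```rust
use futures::{channel::mpsc, SinkExt};
use std::time::Duration;

// Hypothetical caller-side helper: submit one PVF (with its executor params)
// to the prepare queue. Per the doc comment above, the same PVF must not be
// enqueued again before the corresponding `FromQueue` response arrives.
async fn enqueue_for_preparation(
    to_queue_tx: &mut mpsc::Sender<ToQueue>,
    pvf_with_params: PvfWithExecutorParams,
) {
    let _ = to_queue_tx
        .send(ToQueue::Enqueue {
            priority: Priority::Normal,
            pvf_with_params,
            // A lenient budget, as used for execution-bound preparations.
            preparation_timeout: Duration::from_secs(360),
        })
        .await;
}
```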
@@ -78,7 +85,7 @@ slotmap::new_key_type! { pub struct Job; }
struct JobData {
/// The priority of this job. Can be bumped.
priority: Priority,
- pvf: Pvf,
+ pvf_with_params: PvfWithExecutorParams,
/// The timeout for the preparation job.
preparation_timeout: Duration,
worker: Option,
@@ -208,8 +215,8 @@ impl Queue {
async fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) -> Result<(), Fatal> {
match to_queue {
- ToQueue::Enqueue { priority, pvf, preparation_timeout } => {
- handle_enqueue(queue, priority, pvf, preparation_timeout).await?;
+ ToQueue::Enqueue { priority, pvf_with_params, preparation_timeout } => {
+ handle_enqueue(queue, priority, pvf_with_params, preparation_timeout).await?;
},
}
Ok(())
@@ -218,19 +225,19 @@ async fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) -> Result<(), Fat
async fn handle_enqueue(
queue: &mut Queue,
priority: Priority,
- pvf: Pvf,
+ pvf_with_params: PvfWithExecutorParams,
preparation_timeout: Duration,
) -> Result<(), Fatal> {
gum::debug!(
target: LOG_TARGET,
- validation_code_hash = ?pvf.code_hash,
+ validation_code_hash = ?pvf_with_params.code_hash(),
?priority,
?preparation_timeout,
"PVF is enqueued for preparation.",
);
queue.metrics.prepare_enqueued();
- let artifact_id = pvf.as_artifact_id();
+ let artifact_id = pvf_with_params.as_artifact_id();
if never!(
queue.artifact_id_to_job.contains_key(&artifact_id),
"second Enqueue sent for a known artifact"
@@ -247,7 +254,10 @@ async fn handle_enqueue(
return Ok(())
}
- let job = queue.jobs.insert(JobData { priority, pvf, preparation_timeout, worker: None });
+ let job =
+ queue
+ .jobs
+ .insert(JobData { priority, pvf_with_params, preparation_timeout, worker: None });
queue.artifact_id_to_job.insert(artifact_id, job);
if let Some(available) = find_idle_worker(queue) {
@@ -338,7 +348,7 @@ async fn handle_worker_concluded(
// this can't be None;
// qed.
let job_data = never_none!(queue.jobs.remove(job));
- let artifact_id = job_data.pvf.as_artifact_id();
+ let artifact_id = job_data.pvf_with_params.as_artifact_id();
queue.artifact_id_to_job.remove(&artifact_id);
@@ -424,7 +434,7 @@ async fn spawn_extra_worker(queue: &mut Queue, critical: bool) -> Result<(), Fat
async fn assign(queue: &mut Queue, worker: Worker, job: Job) -> Result<(), Fatal> {
let job_data = &mut queue.jobs[job];
- let artifact_id = job_data.pvf.as_artifact_id();
+ let artifact_id = job_data.pvf_with_params.as_artifact_id();
let artifact_path = artifact_id.path(&queue.cache_path);
job_data.worker = Some(worker);
@@ -435,8 +445,9 @@ async fn assign(queue: &mut Queue, worker: Worker, job: Job) -> Result<(), Fatal
&mut queue.to_pool_tx,
pool::ToPool::StartWork {
worker,
- code: job_data.pvf.code.clone(),
+ code: job_data.pvf_with_params.code(),
artifact_path,
+ executor_params: job_data.pvf_with_params.executor_params(),
preparation_timeout: job_data.preparation_timeout,
},
)
@@ -503,8 +514,8 @@ mod tests {
use std::task::Poll;
/// Creates a new PVF whose artifact ID can be uniquely identified by the given number.
- fn pvf(descriminator: u32) -> Pvf {
- Pvf::from_discriminator(descriminator)
+ fn pvf_with_params(descriminator: u32) -> PvfWithExecutorParams {
+ PvfWithExecutorParams::from_discriminator(descriminator)
}
async fn run_until(
@@ -613,7 +624,7 @@ mod tests {
test.send_queue(ToQueue::Enqueue {
priority: Priority::Normal,
- pvf: pvf(1),
+ pvf_with_params: pvf_with_params(1),
preparation_timeout: PRECHECK_PREPARATION_TIMEOUT,
});
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
@@ -626,7 +637,10 @@ mod tests {
result: Ok(PrepareStats::default()),
});
- assert_eq!(test.poll_and_recv_from_queue().await.artifact_id, pvf(1).as_artifact_id());
+ assert_eq!(
+ test.poll_and_recv_from_queue().await.artifact_id,
+ pvf_with_params(1).as_artifact_id()
+ );
}
#[tokio::test]
@@ -635,12 +649,20 @@ mod tests {
let priority = Priority::Normal;
let preparation_timeout = PRECHECK_PREPARATION_TIMEOUT;
- test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(1), preparation_timeout });
- test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(2), preparation_timeout });
+ test.send_queue(ToQueue::Enqueue {
+ priority,
+ pvf_with_params: PvfWithExecutorParams::from_discriminator(1),
+ preparation_timeout,
+ });
+ test.send_queue(ToQueue::Enqueue {
+ priority,
+ pvf_with_params: PvfWithExecutorParams::from_discriminator(2),
+ preparation_timeout,
+ });
// Start a non-precheck preparation for this one.
test.send_queue(ToQueue::Enqueue {
priority,
- pvf: pvf(3),
+ pvf_with_params: PvfWithExecutorParams::from_discriminator(3),
preparation_timeout: LENIENT_PREPARATION_TIMEOUT,
});
@@ -669,7 +691,7 @@ mod tests {
// Enqueue a critical job.
test.send_queue(ToQueue::Enqueue {
priority: Priority::Critical,
- pvf: pvf(4),
+ pvf_with_params: PvfWithExecutorParams::from_discriminator(4),
preparation_timeout,
});
@@ -685,7 +707,7 @@ mod tests {
test.send_queue(ToQueue::Enqueue {
priority: Priority::Normal,
- pvf: pvf(1),
+ pvf_with_params: PvfWithExecutorParams::from_discriminator(1),
preparation_timeout,
});
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
@@ -696,7 +718,7 @@ mod tests {
// Enqueue a critical job, which warrants spawning over the soft limit.
test.send_queue(ToQueue::Enqueue {
priority: Priority::Critical,
- pvf: pvf(2),
+ pvf_with_params: PvfWithExecutorParams::from_discriminator(2),
preparation_timeout,
});
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
@@ -722,12 +744,20 @@ mod tests {
let priority = Priority::Normal;
let preparation_timeout = PRECHECK_PREPARATION_TIMEOUT;
- test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(1), preparation_timeout });
- test.send_queue(ToQueue::Enqueue { priority, pvf: pvf(2), preparation_timeout });
+ test.send_queue(ToQueue::Enqueue {
+ priority,
+ pvf_with_params: PvfWithExecutorParams::from_discriminator(1),
+ preparation_timeout,
+ });
+ test.send_queue(ToQueue::Enqueue {
+ priority,
+ pvf_with_params: PvfWithExecutorParams::from_discriminator(2),
+ preparation_timeout,
+ });
// Start a non-precheck preparation for this one.
test.send_queue(ToQueue::Enqueue {
priority,
- pvf: pvf(3),
+ pvf_with_params: PvfWithExecutorParams::from_discriminator(3),
preparation_timeout: LENIENT_PREPARATION_TIMEOUT,
});
@@ -753,7 +783,10 @@ mod tests {
// Since there is still work, the queue requested one extra worker to spawn to handle the
// remaining enqueued work items.
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
- assert_eq!(test.poll_and_recv_from_queue().await.artifact_id, pvf(1).as_artifact_id());
+ assert_eq!(
+ test.poll_and_recv_from_queue().await.artifact_id,
+ pvf_with_params(1).as_artifact_id()
+ );
}
#[tokio::test]
@@ -762,7 +795,7 @@ mod tests {
test.send_queue(ToQueue::Enqueue {
priority: Priority::Normal,
- pvf: pvf(1),
+ pvf_with_params: PvfWithExecutorParams::from_discriminator(1),
preparation_timeout: PRECHECK_PREPARATION_TIMEOUT,
});
@@ -787,7 +820,7 @@ mod tests {
test.send_queue(ToQueue::Enqueue {
priority: Priority::Normal,
- pvf: pvf(1),
+ pvf_with_params: PvfWithExecutorParams::from_discriminator(1),
preparation_timeout: PRECHECK_PREPARATION_TIMEOUT,
});
diff --git a/node/core/pvf/src/prepare/worker.rs b/node/core/pvf/src/prepare/worker.rs
index 3e64777a9c17..8fba877a9377 100644
--- a/node/core/pvf/src/prepare/worker.rs
+++ b/node/core/pvf/src/prepare/worker.rs
@@ -34,6 +34,7 @@ use crate::{
use cpu_time::ProcessTime;
use futures::{pin_mut, select_biased, FutureExt};
use parity_scale_codec::{Decode, Encode};
+use polkadot_primitives::vstaging::ExecutorParams;
use sp_core::hexdisplay::HexDisplay;
use std::{
panic,
@@ -85,6 +86,7 @@ pub async fn start_work(
code: Arc<Vec<u8>>,
cache_path: &Path,
artifact_path: PathBuf,
+ executor_params: ExecutorParams,
preparation_timeout: Duration,
) -> Outcome {
let IdleWorker { stream, pid } = worker;
@@ -97,7 +99,9 @@ pub async fn start_work(
);
with_tmp_file(stream, pid, cache_path, |tmp_file, mut stream| async move {
- if let Err(err) = send_request(&mut stream, code, &tmp_file, preparation_timeout).await {
+ if let Err(err) =
+ send_request(&mut stream, code, &tmp_file, &executor_params, preparation_timeout).await
+ {
gum::warn!(
target: LOG_TARGET,
worker_pid = %pid,
@@ -271,15 +275,19 @@ async fn send_request(
stream: &mut UnixStream,
code: Arc<Vec<u8>>,
tmp_file: &Path,
+ executor_params: &ExecutorParams,
preparation_timeout: Duration,
) -> io::Result<()> {
framed_send(stream, &code).await?;
framed_send(stream, path_to_bytes(tmp_file)).await?;
+ framed_send(stream, &executor_params.encode()).await?;
framed_send(stream, &preparation_timeout.encode()).await?;
Ok(())
}
-async fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec<u8>, PathBuf, Duration)> {
+async fn recv_request(
+ stream: &mut UnixStream,
+) -> io::Result<(Vec<u8>, PathBuf, ExecutorParams, Duration)> {
let code = framed_recv(stream).await?;
let tmp_file = framed_recv(stream).await?;
let tmp_file = bytes_to_path(&tmp_file).ok_or_else(|| {
@@ -288,6 +296,13 @@ async fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec<u8>, PathBuf,
"prepare pvf recv_request: non utf-8 artifact path".to_string(),
)
})?;
+ let executor_params_enc = framed_recv(stream).await?;
+ let executor_params = ExecutorParams::decode(&mut &executor_params_enc[..]).map_err(|_| {
+ io::Error::new(
+ io::ErrorKind::Other,
+ "prepare pvf recv_request: failed to decode ExecutorParams".to_string(),
+ )
+ })?;
let preparation_timeout = framed_recv(stream).await?;
let preparation_timeout = Duration::decode(&mut &preparation_timeout[..]).map_err(|e| {
io::Error::new(
@@ -295,7 +310,7 @@ async fn recv_request(stream: &mut UnixStream) -> io::Result<(Vec, PathBuf,
format!("prepare pvf recv_request: failed to decode duration: {:?}", e),
)
})?;
- Ok((code, tmp_file, preparation_timeout))
+ Ok((code, tmp_file, executor_params, preparation_timeout))
}
async fn send_response(stream: &mut UnixStream, result: PrepareResult) -> io::Result<()> {
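
The change extends the host-to-worker request by one frame: the SCALE-encoded `ExecutorParams` now travels between the artifact path and the preparation timeout, so `send_request` and `recv_request` must stay in lockstep on field order. A small round-trip sketch of the encoding step, using `parity-scale-codec` as the surrounding code does:

```rust
use parity_scale_codec::{Decode, Encode};

// Any SCALE-encodable value sent as a frame must decode back identically on
// the worker side; a field-order mismatch desynchronizes the whole stream.
fn scale_roundtrip<T: Encode + Decode + PartialEq + std::fmt::Debug>(value: T) {
    let bytes = value.encode();
    let decoded = T::decode(&mut &bytes[..]).expect("frame produced by encode() must decode");
    assert_eq!(decoded, value);
}
```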
@@ -347,7 +362,8 @@ pub fn worker_entrypoint(socket_path: &str) {
worker_event_loop("prepare", socket_path, |rt_handle, mut stream| async move {
loop {
let worker_pid = std::process::id();
- let (code, dest, preparation_timeout) = recv_request(&mut stream).await?;
+ let (code, dest, executor_params, preparation_timeout) =
+ recv_request(&mut stream).await?;
gum::debug!(
target: LOG_TARGET,
%worker_pid,
@@ -372,7 +388,7 @@ pub fn worker_entrypoint(socket_path: &str) {
// Spawn another thread for preparation.
let prepare_fut = rt_handle
.spawn_blocking(move || {
- let result = prepare_artifact(&code);
+ let result = prepare_artifact(&code, executor_params);
// Get the `ru_maxrss` stat. If supported, call getrusage for the thread.
#[cfg(target_os = "linux")]
@@ -454,14 +470,17 @@ pub fn worker_entrypoint(socket_path: &str) {
});
}
-fn prepare_artifact(code: &[u8]) -> Result<CompiledArtifact, PrepareError> {
+fn prepare_artifact(
+ code: &[u8],
+ executor_params: ExecutorParams,
+) -> Result<CompiledArtifact, PrepareError> {
panic::catch_unwind(|| {
let blob = match crate::executor_intf::prevalidate(code) {
Err(err) => return Err(PrepareError::Prevalidation(format!("{:?}", err))),
Ok(b) => b,
};
- match crate::executor_intf::prepare(blob) {
+ match crate::executor_intf::prepare(blob, executor_params) {
Ok(compiled_artifact) => Ok(CompiledArtifact::new(compiled_artifact)),
Err(err) => Err(PrepareError::Preparation(format!("{:?}", err))),
}
diff --git a/node/core/pvf/src/pvf.rs b/node/core/pvf/src/pvf.rs
index d06968a13d43..e0284a26085f 100644
--- a/node/core/pvf/src/pvf.rs
+++ b/node/core/pvf/src/pvf.rs
@@ -16,6 +16,7 @@
use crate::artifacts::ArtifactId;
use polkadot_parachain::primitives::ValidationCodeHash;
+use polkadot_primitives::vstaging::ExecutorParams;
use sp_core::blake2_256;
use std::{fmt, sync::Arc};
@@ -48,9 +49,47 @@ impl Pvf {
let descriminator_buf = num.to_le_bytes().to_vec();
Pvf::from_code(descriminator_buf)
}
+}
+
+/// Couples PVF code with the executor params required for its preparation and execution
+#[derive(Debug, Clone)]
+pub struct PvfWithExecutorParams {
+ pvf: Pvf,
+ executor_params: Arc<ExecutorParams>,
+}
+
+impl PvfWithExecutorParams {
+ /// Creates a new structure pairing a PVF with its executor params
+ pub fn new(pvf: Pvf, executor_params: ExecutorParams) -> Self {
+ Self { pvf, executor_params: Arc::new(executor_params) }
+ }
- /// Returns the artifact ID that corresponds to this PVF.
+ /// Returns the artifact ID that corresponds to this PVF with the given executor params
pub(crate) fn as_artifact_id(&self) -> ArtifactId {
- ArtifactId::new(self.code_hash)
+ ArtifactId::new(self.pvf.code_hash, self.executor_params.hash())
+ }
+
+ /// Returns the validation code hash of the PVF
+ pub(crate) fn code_hash(&self) -> ValidationCodeHash {
+ self.pvf.code_hash
+ }
+
+ /// Returns the PVF code
+ pub(crate) fn code(&self) -> Arc<Vec<u8>> {
+ self.pvf.code.clone()
+ }
+
+ /// Returns the executor params
+ pub(crate) fn executor_params(&self) -> ExecutorParams {
+ (*self.executor_params).clone()
+ }
+
+ /// Creates a structure for tests, uniquely identifiable by the given discriminator
+ #[cfg(test)]
+ pub(crate) fn from_discriminator(num: u32) -> Self {
+ Self {
+ pvf: Pvf::from_discriminator(num),
+ executor_params: Arc::new(ExecutorParams::default()),
+ }
}
}
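
To see why the artifact ID now hashes both components: the same validation code prepared under two different executor-parameter sets must yield two distinct artifacts. A crate-internal sketch (`as_artifact_id` is `pub(crate)`, so this only works inside the crate); the byte blob is a stand-in for real wasm:

```rust
#[cfg(test)]
fn distinct_artifacts_for_distinct_params() {
    use polkadot_primitives::vstaging::{ExecutorParam, ExecutorParams};

    let code = vec![0xde, 0xad, 0xbe, 0xef]; // stand-in for a real PVF blob
    let default_env =
        PvfWithExecutorParams::new(Pvf::from_code(code.clone()), ExecutorParams::default());
    let custom_env = PvfWithExecutorParams::new(
        Pvf::from_code(code),
        ExecutorParams::from(&[ExecutorParam::StackLogicalMax(1024)][..]),
    );
    // Same code, different params: the cache must treat these as different artifacts.
    assert_ne!(default_env.as_artifact_id(), custom_env.as_artifact_id());
}
```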
diff --git a/node/core/pvf/src/testing.rs b/node/core/pvf/src/testing.rs
index cbd37b06d403..2abc1d07a836 100644
--- a/node/core/pvf/src/testing.rs
+++ b/node/core/pvf/src/testing.rs
@@ -19,6 +19,8 @@
//! N.B. This is not guarded by a feature flag. Overexposing items here may affect the final
//! artifact even for production builds.
+use polkadot_primitives::vstaging::ExecutorParams;
+
pub mod worker_common {
pub use crate::worker_common::{spawn_with_program_path, SpawnErr};
}
@@ -35,12 +37,12 @@ pub fn validate_candidate(
.expect("Decompressing code failed");
let blob = prevalidate(&code)?;
- let artifact = prepare(blob)?;
+ let artifact = prepare(blob, ExecutorParams::default())?;
let tmpdir = tempfile::tempdir()?;
let artifact_path = tmpdir.path().join("blob");
std::fs::write(&artifact_path, &artifact)?;
- let executor = Executor::new()?;
+ let executor = Executor::new(ExecutorParams::default())?;
let result = unsafe {
// SAFETY: This is trivially safe since the artifact is obtained by calling `prepare`
// and is written into a temporary directory in an unmodified state.
diff --git a/node/core/pvf/src/worker_common.rs b/node/core/pvf/src/worker_common.rs
index 9cda5f8cd0b7..430a6950fb4f 100644
--- a/node/core/pvf/src/worker_common.rs
+++ b/node/core/pvf/src/worker_common.rs
@@ -251,6 +251,8 @@ pub enum SpawnErr {
ProcessSpawn,
/// The deadline allotted for the worker spawning and connecting to the socket has elapsed.
AcceptTimeout,
+ /// Failed to send a handshake to the worker after successful spawning was signaled
+ Handshake,
}
/// This is a representation of a potentially running worker. Drop it and the process will be killed.
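
The new `Handshake` variant covers the window between a successful spawn signal and the first message exchange. A brief sketch of how a caller might branch on it; the recovery actions shown are illustrative assumptions, not part of this change:

```rust
// Hypothetical caller-side handling of the extended error enum.
fn on_spawn_err(err: SpawnErr) {
    match err {
        SpawnErr::AcceptTimeout => { /* worker never connected: consider a larger spawn timeout */ },
        SpawnErr::Handshake => { /* connected but handshake send failed: retry with a fresh worker */ },
        _ => { /* other spawn failures: surface to the pool for accounting */ },
    }
}
```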
diff --git a/node/core/pvf/tests/it/adder.rs b/node/core/pvf/tests/it/adder.rs
index 8eb57e4d9026..3c373f7ea517 100644
--- a/node/core/pvf/tests/it/adder.rs
+++ b/node/core/pvf/tests/it/adder.rs
@@ -39,6 +39,7 @@ async fn execute_good_block_on_parent() {
relay_parent_number: 1,
relay_parent_storage_root: Default::default(),
},
+ Default::default(),
)
.await
.unwrap();
@@ -72,6 +73,7 @@ async fn execute_good_chain_on_parent() {
relay_parent_number: number as RelayChainBlockNumber + 1,
relay_parent_storage_root: Default::default(),
},
+ Default::default(),
)
.await
.unwrap();
@@ -108,6 +110,7 @@ async fn execute_bad_block_on_parent() {
relay_parent_number: 1,
relay_parent_storage_root: Default::default(),
},
+ Default::default(),
)
.await
.unwrap_err();
@@ -129,6 +132,7 @@ async fn stress_spawn() {
relay_parent_number: 1,
relay_parent_storage_root: Default::default(),
},
+ Default::default(),
)
.await
.unwrap();
diff --git a/node/core/pvf/tests/it/main.rs b/node/core/pvf/tests/it/main.rs
index 07754ef8693d..b540230c4746 100644
--- a/node/core/pvf/tests/it/main.rs
+++ b/node/core/pvf/tests/it/main.rs
@@ -17,10 +17,11 @@
use assert_matches::assert_matches;
use parity_scale_codec::Encode as _;
use polkadot_node_core_pvf::{
- start, Config, InvalidCandidate, Metrics, Pvf, ValidationError, ValidationHost,
- JOB_TIMEOUT_WALL_CLOCK_FACTOR,
+ start, Config, InvalidCandidate, Metrics, Pvf, PvfWithExecutorParams, ValidationError,
+ ValidationHost, JOB_TIMEOUT_WALL_CLOCK_FACTOR,
};
use polkadot_parachain::primitives::{BlockData, ValidationParams, ValidationResult};
+use polkadot_primitives::vstaging::{ExecutorParam, ExecutorParams};
use std::time::Duration;
use tokio::sync::Mutex;
@@ -57,6 +58,7 @@ impl TestHost {
&self,
code: &[u8],
params: ValidationParams,
+ executor_params: ExecutorParams,
) -> Result<ValidationResult, ValidationError> {
let (result_tx, result_rx) = futures::channel::oneshot::channel();
@@ -67,7 +69,7 @@ impl TestHost {
.lock()
.await
.execute_pvf(
- Pvf::from_code(code.into()),
+ PvfWithExecutorParams::new(Pvf::from_code(code.into()), executor_params),
TEST_EXECUTION_TIMEOUT,
params.encode(),
polkadot_node_core_pvf::Priority::Normal,
@@ -93,6 +95,7 @@ async fn terminates_on_timeout() {
relay_parent_number: 1,
relay_parent_storage_root: Default::default(),
},
+ Default::default(),
)
.await;
@@ -118,6 +121,7 @@ async fn ensure_parallel_execution() {
relay_parent_number: 1,
relay_parent_storage_root: Default::default(),
},
+ Default::default(),
);
let execute_pvf_future_2 = host.validate_candidate(
halt::wasm_binary_unwrap(),
@@ -127,6 +131,7 @@ async fn ensure_parallel_execution() {
relay_parent_number: 1,
relay_parent_storage_root: Default::default(),
},
+ Default::default(),
);
let start = std::time::Instant::now();
@@ -169,6 +174,7 @@ async fn execute_queue_doesnt_stall_if_workers_died() {
relay_parent_number: 1,
relay_parent_storage_root: Default::default(),
},
+ Default::default(),
)
}))
.await;
@@ -184,3 +190,52 @@ async fn execute_queue_doesnt_stall_if_workers_died() {
max_duration.as_millis()
);
}
+
+#[tokio::test]
+async fn execute_queue_doesnt_stall_with_varying_executor_params() {
+ let host = TestHost::new_with_config(|cfg| {
+ cfg.execute_workers_max_num = 2;
+ });
+
+ let executor_params_1 = ExecutorParams::default();
+ let executor_params_2 = ExecutorParams::from(&[ExecutorParam::StackLogicalMax(1024)][..]);
+
+ // Here we spawn 6 validation jobs for the `halt` PVF and share them between 2 workers. Every
+ // 3rd job has a different set of executor parameters. All the workers should be killed, and
+ // in that case the queue should respawn new workers with the needed executor environment
+ // without waiting. The jobs are executed in 3 batches, each running two jobs in parallel,
+ // so the total execution time should be roughly 3 * TEST_EXECUTION_TIMEOUT.
+ let start = std::time::Instant::now();
+ futures::future::join_all((0u8..6).map(|i| {
+ host.validate_candidate(
+ halt::wasm_binary_unwrap(),
+ ValidationParams {
+ block_data: BlockData(Vec::new()),
+ parent_head: Default::default(),
+ relay_parent_number: 1,
+ relay_parent_storage_root: Default::default(),
+ },
+ match i % 3 {
+ 0 => executor_params_1.clone(),
+ _ => executor_params_2.clone(),
+ },
+ )
+ }))
+ .await;
+
+ let duration = std::time::Instant::now().duration_since(start);
+ let min_duration = 3 * TEST_EXECUTION_TIMEOUT;
+ let max_duration = 4 * TEST_EXECUTION_TIMEOUT;
+ assert!(
+ duration >= min_duration,
+ "Expected duration {}ms to be greater than or equal to {}ms",
+ duration.as_millis(),
+ min_duration.as_millis()
+ );
+ assert!(
+ duration <= max_duration,
+ "Expected duration {}ms to be less than or equal to {}ms",
+ duration.as_millis(),
+ max_duration.as_millis()
+ );
+}
diff --git a/node/core/runtime-api/src/cache.rs b/node/core/runtime-api/src/cache.rs
index 9efc31328692..82d2e0dbc8b3 100644
--- a/node/core/runtime-api/src/cache.rs
+++ b/node/core/runtime-api/src/cache.rs
@@ -20,11 +20,12 @@ use lru::LruCache;
use sp_consensus_babe::Epoch;
use polkadot_primitives::{
- AuthorityDiscoveryId, BlockNumber, CandidateCommitments, CandidateEvent, CandidateHash,
- CommittedCandidateReceipt, CoreState, DisputeState, GroupRotationInfo, Hash, Id as ParaId,
- InboundDownwardMessage, InboundHrmpMessage, OccupiedCoreAssumption, PersistedValidationData,
- PvfCheckStatement, ScrapedOnChainVotes, SessionIndex, SessionInfo, ValidationCode,
- ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature,
+ vstaging::ExecutorParams, AuthorityDiscoveryId, BlockNumber, CandidateCommitments,
+ CandidateEvent, CandidateHash, CommittedCandidateReceipt, CoreState, DisputeState,
+ GroupRotationInfo, Hash, Id as ParaId, InboundDownwardMessage, InboundHrmpMessage,
+ OccupiedCoreAssumption, PersistedValidationData, PvfCheckStatement, ScrapedOnChainVotes,
+ SessionIndex, SessionInfo, ValidationCode, ValidationCodeHash, ValidatorId, ValidatorIndex,
+ ValidatorSignature,
};
/// For consistency we have the same capacity for all caches. We use 128 as we'll only need that
@@ -51,6 +52,7 @@ pub(crate) struct RequestResultCache {
validation_code_by_hash: LruCache<ValidationCodeHash, Option<ValidationCode>>,
candidate_pending_availability: LruCache<(Hash, ParaId), Option<CommittedCandidateReceipt>>,
candidate_events: LruCache<Hash, Vec<CandidateEvent>>,
+ session_executor_params: LruCache<SessionIndex, Option<ExecutorParams>>,
session_info: LruCache<SessionIndex, SessionInfo>,
dmq_contents: LruCache<(Hash, ParaId), Vec<InboundDownwardMessage<BlockNumber>>>,
inbound_hrmp_channels_contents:
@@ -79,6 +81,7 @@ impl Default for RequestResultCache {
validation_code_by_hash: LruCache::new(DEFAULT_CACHE_CAP),
candidate_pending_availability: LruCache::new(DEFAULT_CACHE_CAP),
candidate_events: LruCache::new(DEFAULT_CACHE_CAP),
+ session_executor_params: LruCache::new(DEFAULT_CACHE_CAP),
session_info: LruCache::new(DEFAULT_CACHE_CAP),
dmq_contents: LruCache::new(DEFAULT_CACHE_CAP),
inbound_hrmp_channels_contents: LruCache::new(DEFAULT_CACHE_CAP),
@@ -263,6 +266,21 @@ impl RequestResultCache {
self.session_info.put(key, value);
}
+ pub(crate) fn session_executor_params(
+ &mut self,
+ session_index: SessionIndex,
+ ) -> Option<&Option<ExecutorParams>> {
+ self.session_executor_params.get(&session_index)
+ }
+
+ pub(crate) fn cache_session_executor_params(
+ &mut self,
+ session_index: SessionIndex,
+ value: Option<ExecutorParams>,
+ ) {
+ self.session_executor_params.put(session_index, value);
+ }
+
pub(crate) fn dmq_contents(
&mut self,
key: (Hash, ParaId),
@@ -389,6 +407,7 @@ pub(crate) enum RequestResult {
ValidationCodeByHash(Hash, ValidationCodeHash, Option<ValidationCode>),
CandidatePendingAvailability(Hash, ParaId, Option<CommittedCandidateReceipt>),
CandidateEvents(Hash, Vec<CandidateEvent>),
+ SessionExecutorParams(Hash, SessionIndex, Option<ExecutorParams>),
SessionInfo(Hash, SessionIndex, Option<SessionInfo>),
DmqContents(Hash, ParaId, Vec<InboundDownwardMessage<BlockNumber>>),
InboundHrmpChannelsContents(
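
The new cache entries mirror the read-through pattern used by the other per-session caches: look up first, and on a miss store whatever the runtime answered, including `None`, so stale sessions don't trigger repeated runtime calls. A rough sketch of that flow; `fetch_from_runtime` is a hypothetical stand-in for the actual runtime API call:

```rust
// Sketch: read-through lookup over the session executor-params cache.
fn executor_params_read_through(
    cache: &mut RequestResultCache,
    session_index: SessionIndex,
) -> Option<ExecutorParams> {
    if let Some(cached) = cache.session_executor_params(session_index) {
        return cached.clone() // note: negative (None) answers are cached too
    }
    let fresh = fetch_from_runtime(session_index); // hypothetical helper
    cache.cache_session_executor_params(session_index, fresh.clone());
    fresh
}
```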
diff --git a/node/core/runtime-api/src/lib.rs b/node/core/runtime-api/src/lib.rs
index 3d016305bc64..0c5641d1201e 100644
--- a/node/core/runtime-api/src/lib.rs
+++ b/node/core/runtime-api/src/lib.rs
@@ -132,6 +132,8 @@ where
.cache_candidate_pending_availability((relay_parent, para_id), candidate),
CandidateEvents(relay_parent, events) =>
self.requests_cache.cache_candidate_events(relay_parent, events),
+ SessionExecutorParams(_relay_parent, session_index, index) =>
+ self.requests_cache.cache_session_executor_params(session_index, index),
SessionInfo(_relay_parent, session_index, info) =>
if let Some(info) = info {
self.requests_cache.cache_session_info(session_index, info);
@@ -229,6 +231,17 @@ where
.map(|sender| Request::CandidatePendingAvailability(para, sender)),
Request::CandidateEvents(sender) =>
query!(candidate_events(), sender).map(|sender| Request::CandidateEvents(sender)),
+ Request::SessionExecutorParams(session_index, sender) => {
+ if let Some(executor_params) =
+ self.requests_cache.session_executor_params(session_index)
+ {
+ self.metrics.on_cached_request();
+ let _ = sender.send(Ok(executor_params.clone()));
+ None
+ } else {
+ Some(Request::SessionExecutorParams(session_index, sender))
+ }
+ },
Request::SessionInfo(index, sender) => {
if let Some(info) = self.requests_cache.session_info(index) {
self.metrics.on_cached_request();
@@ -480,6 +493,12 @@ where
res.ok().map(|res| RequestResult::SessionInfo(relay_parent, index, res))
},
+ Request::SessionExecutorParams(session_index, sender) => query!(
+ SessionExecutorParams,
+ session_executor_params(session_index),
+ ver = Request::EXECUTOR_PARAMS_RUNTIME_REQUIREMENT,
+ sender
+ ),
Request::DmqContents(id, sender) => query!(DmqContents, dmq_contents(id), ver = 1, sender),
Request::InboundHrmpChannelsContents(id, sender) =>
query!(InboundHrmpChannelsContents, inbound_hrmp_channels_contents(id), ver = 1, sender),
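
The `ver = Request::EXECUTOR_PARAMS_RUNTIME_REQUIREMENT` guard makes this a versioned request: the subsystem checks the runtime's advertised API version and only calls `session_executor_params` when the runtime is new enough, answering with a "not supported" error otherwise. A hedged sketch of that gate; the constant's value and the error shape are assumptions standing in for the real ones inside the `query!` macro:

```rust
// Illustration of version-gated dispatch (the real logic lives inside the
// `query!` macro; names and values below are assumptions for the sketch).
const EXECUTOR_PARAMS_RUNTIME_REQUIREMENT: u32 = 4; // hypothetical required version

fn is_supported(api_version: Option<u32>) -> Result<(), String> {
    match api_version {
        Some(v) if v >= EXECUTOR_PARAMS_RUNTIME_REQUIREMENT => Ok(()),
        v => Err(format!(
            "SessionExecutorParams not supported by the runtime: api_version {:?} < {}",
            v, EXECUTOR_PARAMS_RUNTIME_REQUIREMENT
        )),
    }
}
```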
diff --git a/node/malus/src/variants/suggest_garbage_candidate.rs b/node/malus/src/variants/suggest_garbage_candidate.rs
index 146348f00123..7e1a9246bc4f 100644
--- a/node/malus/src/variants/suggest_garbage_candidate.rs
+++ b/node/malus/src/variants/suggest_garbage_candidate.rs
@@ -33,7 +33,7 @@ use polkadot_cli::{
};
use polkadot_node_core_candidate_validation::find_validation_data;
use polkadot_node_primitives::{AvailableData, BlockData, PoV};
-use polkadot_primitives::CandidateDescriptor;
+use polkadot_primitives::{CandidateDescriptor, CandidateReceipt};
use polkadot_node_subsystem_util::request_validators;
use sp_core::traits::SpawnNamed;
@@ -53,7 +53,6 @@ use crate::{
// Import extra types relevant to the particular
// subsystem.
use polkadot_node_subsystem::{messages::CandidateBackingMessage, SpawnGlue};
-use polkadot_primitives::CandidateReceipt;
use std::sync::Arc;
diff --git a/node/subsystem-types/src/messages.rs b/node/subsystem-types/src/messages.rs
index 506e37d2cc92..1acafbd1cfaa 100644
--- a/node/subsystem-types/src/messages.rs
+++ b/node/subsystem-types/src/messages.rs
@@ -39,11 +39,11 @@ use polkadot_node_primitives::{
SignedDisputeStatement, SignedFullStatement, ValidationResult,
};
use polkadot_primitives::{
- AuthorityDiscoveryId, BackedCandidate, BlockNumber, CandidateEvent, CandidateHash,
- CandidateIndex, CandidateReceipt, CollatorId, CommittedCandidateReceipt, CoreState,
- DisputeState, GroupIndex, GroupRotationInfo, Hash, Header as BlockHeader, Id as ParaId,
- InboundDownwardMessage, InboundHrmpMessage, MultiDisputeStatementSet, OccupiedCoreAssumption,
- PersistedValidationData, PvfCheckStatement, SessionIndex, SessionInfo,
+ vstaging::ExecutorParams, AuthorityDiscoveryId, BackedCandidate, BlockNumber, CandidateEvent,
+ CandidateHash, CandidateIndex, CandidateReceipt, CollatorId, CommittedCandidateReceipt,
+ CoreState, DisputeState, GroupIndex, GroupRotationInfo, Hash, Header as BlockHeader,
+ Id as ParaId, InboundDownwardMessage, InboundHrmpMessage, MultiDisputeStatementSet,
+ OccupiedCoreAssumption, PersistedValidationData, PvfCheckStatement, SessionIndex, SessionInfo,
SignedAvailabilityBitfield, SignedAvailabilityBitfields, ValidationCode, ValidationCodeHash,
ValidatorId, ValidatorIndex, ValidatorSignature,
};
@@ -574,6 +574,8 @@ pub enum RuntimeApiRequest {
/// Get all events concerning candidates (backing, inclusion, time-out) in the parent of
/// the block in whose state this request is executed.
CandidateEvents(RuntimeApiSender<Vec<CandidateEvent>>),
+ /// Get the execution environment parameter set by session index
+ SessionExecutorParams(SessionIndex, RuntimeApiSender