From e82feb7501948a6f4fe67a283521a7e3bc5b46a5 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Thu, 25 May 2023 16:29:13 -0400 Subject: [PATCH] PVF: Refactor workers into separate crates, remove host dependency (#7253) * PVF: Refactor workers into separate crates, remove host dependency * Fix compile error * Remove some leftover code * Fix compile errors * Update Cargo.lock * Remove worker main.rs files I accidentally copied these from the other PR. This PR isn't intended to introduce standalone workers yet. * Address review comments * cargo fmt * Update a couple of comments * Update log targets --- Cargo.lock | 68 ++++++++-- Cargo.toml | 8 +- cli/Cargo.toml | 6 +- cli/src/command.rs | 4 +- node/core/pvf/Cargo.toml | 12 +- .../pvf/{worker => }/bin/puppet_worker.rs | 2 +- node/core/pvf/common/Cargo.toml | 26 ++++ node/core/pvf/{worker => common}/build.rs | 0 node/core/pvf/common/src/error.rs | 106 +++++++++++++++ node/core/pvf/common/src/execute.rs | 60 +++++++++ node/core/pvf/common/src/executor_intf.rs | 114 ++++++++++++++++ node/core/pvf/common/src/lib.rs | 57 ++++++++ node/core/pvf/common/src/prepare.rs | 48 +++++++ node/core/pvf/{ => common}/src/pvf.rs | 24 ++-- .../src/common.rs => common/src/worker.rs} | 38 ++++++ .../pvf/{worker => execute-worker}/Cargo.toml | 22 +--- .../src/executor_intf.rs | 122 +----------------- .../execute.rs => execute-worker/src/lib.rs} | 26 ++-- node/core/pvf/prepare-worker/Cargo.toml | 33 +++++ .../pvf/prepare-worker/src/executor_intf.rs | 42 ++++++ .../prepare.rs => prepare-worker/src/lib.rs} | 42 ++++-- .../src/memory_stats.rs | 8 +- node/core/pvf/src/artifacts.rs | 26 ++-- node/core/pvf/src/error.rs | 91 +------------ node/core/pvf/src/execute/mod.rs | 1 - node/core/pvf/src/execute/queue.rs | 2 +- node/core/pvf/src/execute/worker_intf.rs | 52 ++------ node/core/pvf/src/host.rs | 18 ++- node/core/pvf/src/lib.rs | 40 +++--- node/core/pvf/src/metrics.rs | 2 +- node/core/pvf/src/prepare/mod.rs | 33 ----- 
node/core/pvf/src/prepare/pool.rs | 8 +- node/core/pvf/src/prepare/queue.rs | 26 ++-- node/core/pvf/src/prepare/worker_intf.rs | 15 ++- node/core/pvf/{worker => }/src/testing.rs | 6 +- .../src/{worker_common.rs => worker_intf.rs} | 0 node/core/pvf/{worker => }/tests/it/adder.rs | 0 node/core/pvf/{worker => }/tests/it/main.rs | 0 .../{worker => }/tests/it/worker_common.rs | 0 node/core/pvf/worker/src/lib.rs | 73 ----------- node/malus/Cargo.toml | 3 +- node/malus/src/malus.rs | 4 +- node/test/performance-test/Cargo.toml | 2 +- node/test/performance-test/src/lib.rs | 6 +- .../test-parachains/adder/collator/Cargo.toml | 2 +- .../adder/collator/bin/puppet_worker.rs | 2 +- .../test-parachains/adder/collator/src/lib.rs | 2 +- .../undying/collator/Cargo.toml | 2 +- .../undying/collator/bin/puppet_worker.rs | 2 +- .../undying/collator/src/lib.rs | 2 +- 50 files changed, 773 insertions(+), 515 deletions(-) rename node/core/pvf/{worker => }/bin/puppet_worker.rs (92%) create mode 100644 node/core/pvf/common/Cargo.toml rename node/core/pvf/{worker => common}/build.rs (100%) create mode 100644 node/core/pvf/common/src/error.rs create mode 100644 node/core/pvf/common/src/execute.rs create mode 100644 node/core/pvf/common/src/executor_intf.rs create mode 100644 node/core/pvf/common/src/lib.rs create mode 100644 node/core/pvf/common/src/prepare.rs rename node/core/pvf/{ => common}/src/pvf.rs (81%) rename node/core/pvf/{worker/src/common.rs => common/src/worker.rs} (90%) rename node/core/pvf/{worker => execute-worker}/Cargo.toml (67%) rename node/core/pvf/{worker => execute-worker}/src/executor_intf.rs (65%) rename node/core/pvf/{worker/src/execute.rs => execute-worker/src/lib.rs} (93%) create mode 100644 node/core/pvf/prepare-worker/Cargo.toml create mode 100644 node/core/pvf/prepare-worker/src/executor_intf.rs rename node/core/pvf/{worker/src/prepare.rs => prepare-worker/src/lib.rs} (90%) rename node/core/pvf/{worker => prepare-worker}/src/memory_stats.rs (97%) rename 
node/core/pvf/{worker => }/src/testing.rs (93%) rename node/core/pvf/src/{worker_common.rs => worker_intf.rs} (100%) rename node/core/pvf/{worker => }/tests/it/adder.rs (100%) rename node/core/pvf/{worker => }/tests/it/main.rs (100%) rename node/core/pvf/{worker => }/tests/it/worker_common.rs (100%) delete mode 100644 node/core/pvf/worker/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 4e37115acc04..54f27915d8a7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6862,7 +6862,7 @@ dependencies = [ "nix 0.26.2", "polkadot-cli", "polkadot-core-primitives", - "polkadot-node-core-pvf-worker", + "polkadot-node-core-pvf-prepare-worker", "polkadot-overseer", "substrate-rpc-client", "tempfile", @@ -6988,7 +6988,8 @@ dependencies = [ "futures", "log", "polkadot-client", - "polkadot-node-core-pvf-worker", + "polkadot-node-core-pvf-execute-worker", + "polkadot-node-core-pvf-prepare-worker", "polkadot-node-metrics", "polkadot-performance-test", "polkadot-service", @@ -7467,6 +7468,9 @@ dependencies = [ "parity-scale-codec", "pin-project", "polkadot-core-primitives", + "polkadot-node-core-pvf-common", + "polkadot-node-core-pvf-execute-worker", + "polkadot-node-core-pvf-prepare-worker", "polkadot-node-metrics", "polkadot-node-primitives", "polkadot-parachain", @@ -7479,6 +7483,8 @@ dependencies = [ "sp-wasm-interface", "substrate-build-script-utils", "tempfile", + "test-parachain-adder", + "test-parachain-halt", "tokio", "tracing-gum", ] @@ -7507,15 +7513,32 @@ dependencies = [ ] [[package]] -name = "polkadot-node-core-pvf-worker" +name = "polkadot-node-core-pvf-common" version = "0.9.41" dependencies = [ - "assert_matches", "cpu-time", "futures", "libc", "parity-scale-codec", - "polkadot-node-core-pvf", + "polkadot-parachain", + "polkadot-primitives", + "sc-executor-common", + "sc-executor-wasmtime", + "sp-core", + "sp-tracing", + "substrate-build-script-utils", + "tokio", + "tracing-gum", +] + +[[package]] +name = "polkadot-node-core-pvf-execute-worker" +version = "0.9.41" 
+dependencies = [ + "cpu-time", + "futures", + "parity-scale-codec", + "polkadot-node-core-pvf-common", "polkadot-parachain", "polkadot-primitives", "rayon", @@ -7527,10 +7550,28 @@ dependencies = [ "sp-io", "sp-maybe-compressed-blob", "sp-tracing", - "substrate-build-script-utils", - "tempfile", - "test-parachain-adder", - "test-parachain-halt", + "tikv-jemalloc-ctl", + "tokio", + "tracing-gum", +] + +[[package]] +name = "polkadot-node-core-pvf-prepare-worker" +version = "0.9.41" +dependencies = [ + "futures", + "libc", + "parity-scale-codec", + "polkadot-node-core-pvf-common", + "polkadot-parachain", + "polkadot-primitives", + "rayon", + "sc-executor", + "sc-executor-common", + "sc-executor-wasmtime", + "sp-io", + "sp-maybe-compressed-blob", + "sp-tracing", "tikv-jemalloc-ctl", "tokio", "tracing-gum", @@ -7786,7 +7827,7 @@ dependencies = [ "kusama-runtime", "log", "polkadot-erasure-coding", - "polkadot-node-core-pvf-worker", + "polkadot-node-core-pvf-prepare-worker", "polkadot-node-primitives", "polkadot-primitives", "quote", @@ -8292,7 +8333,8 @@ dependencies = [ "polkadot-node-core-backing", "polkadot-node-core-candidate-validation", "polkadot-node-core-dispute-coordinator", - "polkadot-node-core-pvf-worker", + "polkadot-node-core-pvf-execute-worker", + "polkadot-node-core-pvf-prepare-worker", "polkadot-node-primitives", "polkadot-node-subsystem", "polkadot-node-subsystem-test-helpers", @@ -12464,7 +12506,7 @@ dependencies = [ "log", "parity-scale-codec", "polkadot-cli", - "polkadot-node-core-pvf-worker", + "polkadot-node-core-pvf", "polkadot-node-primitives", "polkadot-node-subsystem", "polkadot-parachain", @@ -12512,7 +12554,7 @@ dependencies = [ "log", "parity-scale-codec", "polkadot-cli", - "polkadot-node-core-pvf-worker", + "polkadot-node-core-pvf", "polkadot-node-primitives", "polkadot-node-subsystem", "polkadot-parachain", diff --git a/Cargo.toml b/Cargo.toml index 02722fd28653..cc3b4e4c1d35 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,7 +24,7 @@ 
tikv-jemallocator = "0.5.0" # Crates in our workspace, defined as dependencies so we can pass them feature flags. polkadot-cli = { path = "cli", features = [ "kusama-native", "westend-native", "rococo-native" ] } -polkadot-node-core-pvf-worker = { path = "node/core/pvf/worker" } +polkadot-node-core-pvf-prepare-worker = { path = "node/core/pvf/prepare-worker" } polkadot-overseer = { path = "node/overseer" } [dev-dependencies] @@ -81,7 +81,9 @@ members = [ "node/core/parachains-inherent", "node/core/provisioner", "node/core/pvf", - "node/core/pvf/worker", + "node/core/pvf/common", + "node/core/pvf/execute-worker", + "node/core/pvf/prepare-worker", "node/core/pvf-checker", "node/core/runtime-api", "node/network/approval-distribution", @@ -208,7 +210,7 @@ try-runtime = [ "polkadot-cli/try-runtime" ] fast-runtime = [ "polkadot-cli/fast-runtime" ] runtime-metrics = [ "polkadot-cli/runtime-metrics" ] pyroscope = ["polkadot-cli/pyroscope"] -jemalloc-allocator = ["polkadot-node-core-pvf-worker/jemalloc-allocator", "polkadot-overseer/jemalloc-allocator"] +jemalloc-allocator = ["polkadot-node-core-pvf-prepare-worker/jemalloc-allocator", "polkadot-overseer/jemalloc-allocator"] diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 04596d5a6d0b..1fe9fa696cfd 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -23,7 +23,8 @@ pyroscope_pprofrs = { version = "0.2", optional = true } service = { package = "polkadot-service", path = "../node/service", default-features = false, optional = true } polkadot-client = { path = "../node/client", optional = true } -polkadot-node-core-pvf-worker = { path = "../node/core/pvf/worker", optional = true } +polkadot-node-core-pvf-execute-worker = { path = "../node/core/pvf/execute-worker", optional = true } +polkadot-node-core-pvf-prepare-worker = { path = "../node/core/pvf/prepare-worker", optional = true } polkadot-performance-test = { path = "../node/test/performance-test", optional = true } sp-core = { git = 
"https://github.com/paritytech/substrate", branch = "master" } @@ -54,7 +55,8 @@ cli = [ "frame-benchmarking-cli", "try-runtime-cli", "polkadot-client", - "polkadot-node-core-pvf-worker", + "polkadot-node-core-pvf-execute-worker", + "polkadot-node-core-pvf-prepare-worker", ] runtime-benchmarks = [ "service/runtime-benchmarks", diff --git a/cli/src/command.rs b/cli/src/command.rs index 802ba93941c3..132d8279eb13 100644 --- a/cli/src/command.rs +++ b/cli/src/command.rs @@ -495,7 +495,7 @@ pub fn run() -> Result<()> { #[cfg(not(target_os = "android"))] { - polkadot_node_core_pvf_worker::prepare_worker_entrypoint( + polkadot_node_core_pvf_prepare_worker::worker_entrypoint( &cmd.socket_path, Some(&cmd.node_impl_version), ); @@ -517,7 +517,7 @@ pub fn run() -> Result<()> { #[cfg(not(target_os = "android"))] { - polkadot_node_core_pvf_worker::execute_worker_entrypoint( + polkadot_node_core_pvf_execute_worker::worker_entrypoint( &cmd.socket_path, Some(&cmd.node_impl_version), ); diff --git a/node/core/pvf/Cargo.toml b/node/core/pvf/Cargo.toml index 026930758b86..d00c13fda2b0 100644 --- a/node/core/pvf/Cargo.toml +++ b/node/core/pvf/Cargo.toml @@ -4,6 +4,10 @@ version.workspace = true authors.workspace = true edition.workspace = true +[[bin]] +name = "puppet_worker" +path = "bin/puppet_worker.rs" + [dependencies] always-assert = "0.1" futures = "0.3.21" @@ -13,12 +17,16 @@ libc = "0.2.139" pin-project = "1.0.9" rand = "0.8.5" slotmap = "1.0" +tempfile = "3.3.0" tokio = { version = "1.24.2", features = ["fs", "process"] } parity-scale-codec = { version = "3.4.0", default-features = false, features = ["derive"] } polkadot-parachain = { path = "../../../parachain" } polkadot-core-primitives = { path = "../../../core-primitives" } +polkadot-node-core-pvf-common = { path = "common" } +polkadot-node-core-pvf-execute-worker = { path = "execute-worker" } +polkadot-node-core-pvf-prepare-worker = { path = "prepare-worker" } polkadot-node-metrics = { path = "../../metrics" } 
polkadot-node-primitives = { path = "../../primitives" } polkadot-primitives = { path = "../../../primitives" } @@ -34,4 +42,6 @@ substrate-build-script-utils = { git = "https://github.com/paritytech/substrate" [dev-dependencies] assert_matches = "1.4.0" hex-literal = "0.3.4" -tempfile = "3.3.0" + +adder = { package = "test-parachain-adder", path = "../../../parachain/test-parachains/adder" } +halt = { package = "test-parachain-halt", path = "../../../parachain/test-parachains/halt" } diff --git a/node/core/pvf/worker/bin/puppet_worker.rs b/node/core/pvf/bin/puppet_worker.rs similarity index 92% rename from node/core/pvf/worker/bin/puppet_worker.rs rename to node/core/pvf/bin/puppet_worker.rs index ddd81971292b..7f93519d8454 100644 --- a/node/core/pvf/worker/bin/puppet_worker.rs +++ b/node/core/pvf/bin/puppet_worker.rs @@ -14,4 +14,4 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -polkadot_node_core_pvf_worker::decl_puppet_worker_main!(); +polkadot_node_core_pvf::decl_puppet_worker_main!(); diff --git a/node/core/pvf/common/Cargo.toml b/node/core/pvf/common/Cargo.toml new file mode 100644 index 000000000000..de9fa10804c7 --- /dev/null +++ b/node/core/pvf/common/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "polkadot-node-core-pvf-common" +version.workspace = true +authors.workspace = true +edition.workspace = true + +[dependencies] +cpu-time = "1.0.0" +futures = "0.3.21" +gum = { package = "tracing-gum", path = "../../../gum" } +libc = "0.2.139" +tokio = { version = "1.24.2", features = ["fs", "process", "io-util"] } + +parity-scale-codec = { version = "3.4.0", default-features = false, features = ["derive"] } + +polkadot-parachain = { path = "../../../../parachain" } +polkadot-primitives = { path = "../../../../primitives" } + +sc-executor-common = { git = "https://github.com/paritytech/substrate", branch = "master" } +sc-executor-wasmtime = { git = "https://github.com/paritytech/substrate", branch = 
"master" } + +sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master" } + +[build-dependencies] +substrate-build-script-utils = { git = "https://github.com/paritytech/substrate", branch = "master" } diff --git a/node/core/pvf/worker/build.rs b/node/core/pvf/common/build.rs similarity index 100% rename from node/core/pvf/worker/build.rs rename to node/core/pvf/common/build.rs diff --git a/node/core/pvf/common/src/error.rs b/node/core/pvf/common/src/error.rs new file mode 100644 index 000000000000..56353c53b4d2 --- /dev/null +++ b/node/core/pvf/common/src/error.rs @@ -0,0 +1,106 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +use crate::prepare::PrepareStats; +use parity_scale_codec::{Decode, Encode}; +use std::fmt; + +/// Result of PVF preparation performed by the validation host. Contains stats about the preparation if +/// successful +pub type PrepareResult = Result; + +/// An error that occurred during the prepare part of the PVF pipeline. +#[derive(Debug, Clone, Encode, Decode)] +pub enum PrepareError { + /// During the prevalidation stage of preparation an issue was found with the PVF. + Prevalidation(String), + /// Compilation failed for the given PVF. 
+ Preparation(String), + /// An unexpected panic has occurred in the preparation worker. + Panic(String), + /// Failed to prepare the PVF due to the time limit. + TimedOut, + /// An IO error occurred. This state is reported by either the validation host or by the worker. + IoErr(String), + /// The temporary file for the artifact could not be created at the given cache path. This state is reported by the + /// validation host (not by the worker). + CreateTmpFileErr(String), + /// The response from the worker is received, but the file cannot be renamed (moved) to the final destination + /// location. This state is reported by the validation host (not by the worker). + RenameTmpFileErr(String), +} + +impl PrepareError { + /// Returns whether this is a deterministic error, i.e. one that should trigger reliably. Those + /// errors depend on the PVF itself and the sc-executor/wasmtime logic. + /// + /// Non-deterministic errors can happen spuriously. Typically, they occur due to resource + /// starvation, e.g. under heavy load or memory pressure. Those errors are typically transient + /// but may persist e.g. if the node is run by an overwhelmingly underpowered machine. 
+ pub fn is_deterministic(&self) -> bool { + use PrepareError::*; + match self { + Prevalidation(_) | Preparation(_) | Panic(_) => true, + TimedOut | IoErr(_) | CreateTmpFileErr(_) | RenameTmpFileErr(_) => false, + } + } +} + +impl fmt::Display for PrepareError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + use PrepareError::*; + match self { + Prevalidation(err) => write!(f, "prevalidation: {}", err), + Preparation(err) => write!(f, "preparation: {}", err), + Panic(err) => write!(f, "panic: {}", err), + TimedOut => write!(f, "prepare: timeout"), + IoErr(err) => write!(f, "prepare: io error while receiving response: {}", err), + CreateTmpFileErr(err) => write!(f, "prepare: error creating tmp file: {}", err), + RenameTmpFileErr(err) => write!(f, "prepare: error renaming tmp file: {}", err), + } + } +} + +/// Some internal error occurred. +/// +/// Should only ever be used for validation errors independent of the candidate and PVF, or for errors we ruled out +/// during pre-checking (so preparation errors are fine). +#[derive(Debug, Clone, Encode, Decode)] +pub enum InternalValidationError { + /// Some communication error occurred with the host. + HostCommunication(String), + /// Could not find or open compiled artifact file. + CouldNotOpenFile(String), + /// An error occurred in the CPU time monitor thread. Should be totally unrelated to validation. + CpuTimeMonitorThread(String), + /// Some non-deterministic preparation error occurred. 
+ NonDeterministicPrepareError(PrepareError), +} + +impl fmt::Display for InternalValidationError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + use InternalValidationError::*; + match self { + HostCommunication(err) => + write!(f, "validation: some communication error occurred with the host: {}", err), + CouldNotOpenFile(err) => + write!(f, "validation: could not find or open compiled artifact file: {}", err), + CpuTimeMonitorThread(err) => + write!(f, "validation: an error occurred in the CPU time monitor thread: {}", err), + NonDeterministicPrepareError(err) => write!(f, "validation: prepare: {}", err), + } + } +} diff --git a/node/core/pvf/common/src/execute.rs b/node/core/pvf/common/src/execute.rs new file mode 100644 index 000000000000..de5ce39f7838 --- /dev/null +++ b/node/core/pvf/common/src/execute.rs @@ -0,0 +1,60 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +use crate::error::InternalValidationError; +use parity_scale_codec::{Decode, Encode}; +use polkadot_parachain::primitives::ValidationResult; +use polkadot_primitives::ExecutorParams; +use std::time::Duration; + +/// The payload of the one-time handshake that is done when a worker process is created. Carries +/// data from the host to the worker. +#[derive(Encode, Decode)] +pub struct Handshake { + /// The executor parameters. 
+ pub executor_params: ExecutorParams, +} + +/// The response from an execution job on the worker. +#[derive(Encode, Decode)] +pub enum Response { + /// The job completed successfully. + Ok { + /// The result of parachain validation. + result_descriptor: ValidationResult, + /// The amount of CPU time taken by the job. + duration: Duration, + }, + /// The candidate is invalid. + InvalidCandidate(String), + /// The job timed out. + TimedOut, + /// An unexpected panic has occurred in the execution worker. + Panic(String), + /// Some internal error occurred. + InternalError(InternalValidationError), +} + +impl Response { + /// Creates an invalid response from a context `ctx` and a message `msg` (which can be empty). + pub fn format_invalid(ctx: &'static str, msg: &str) -> Self { + if msg.is_empty() { + Self::InvalidCandidate(ctx.to_string()) + } else { + Self::InvalidCandidate(format!("{}: {}", ctx, msg)) + } + } +} diff --git a/node/core/pvf/common/src/executor_intf.rs b/node/core/pvf/common/src/executor_intf.rs new file mode 100644 index 000000000000..5926f3c5dbc7 --- /dev/null +++ b/node/core/pvf/common/src/executor_intf.rs @@ -0,0 +1,114 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! 
Interface to the Substrate Executor + +use polkadot_primitives::{ExecutorParam, ExecutorParams}; +use sc_executor_common::wasm_runtime::HeapAllocStrategy; +use sc_executor_wasmtime::{Config, DeterministicStackLimit, Semantics}; + +// Memory configuration +// +// When Substrate Runtime is instantiated, a number of WASM pages are allocated for the Substrate +// Runtime instance's linear memory. The exact number of pages is a sum of whatever the WASM blob +// itself requests (by default at least enough to hold the data section as well as have some space +// left for the stack; this is, of course, overridable at link time when compiling the runtime) +// plus the number of pages specified in the `extra_heap_pages` passed to the executor. +// +// By default, rustc (or `lld` specifically) should allocate 1 MiB for the shadow stack, or 16 pages. +// The data section for runtimes are typically rather small and can fit in a single digit number of +// WASM pages, so let's say an extra 16 pages. Thus let's assume that 32 pages or 2 MiB are used for +// these needs by default. +const DEFAULT_HEAP_PAGES_ESTIMATE: u32 = 32; +const EXTRA_HEAP_PAGES: u32 = 2048; + +/// The number of bytes devoted for the stack during wasm execution of a PVF. +pub const NATIVE_STACK_MAX: u32 = 256 * 1024 * 1024; + +// VALUES OF THE DEFAULT CONFIGURATION SHOULD NEVER BE CHANGED +// They are used as base values for the execution environment parametrization. +// To overwrite them, add new ones to `EXECUTOR_PARAMS` in the `session_info` pallet and perform +// a runtime upgrade to make them active. 
+pub const DEFAULT_CONFIG: Config = Config { + allow_missing_func_imports: true, + cache_path: None, + semantics: Semantics { + heap_alloc_strategy: sc_executor_common::wasm_runtime::HeapAllocStrategy::Dynamic { + maximum_pages: Some(DEFAULT_HEAP_PAGES_ESTIMATE + EXTRA_HEAP_PAGES), + }, + + instantiation_strategy: + sc_executor_wasmtime::InstantiationStrategy::RecreateInstanceCopyOnWrite, + + // Enable deterministic stack limit to pin down the exact number of items the wasmtime stack + // can contain before it traps with stack overflow. + // + // Here is how the values below were chosen. + // + // At the moment of writing, the default native stack size limit is 1 MiB. Assuming a logical item + // (see the docs about the field and the instrumentation algorithm) is 8 bytes, 1 MiB can + // fit 2x 65536 logical items. + // + // Since reaching the native stack limit is undesirable, we halve the logical item limit and + // also increase the native 256x. This hopefully should preclude wasm code from reaching + // the stack limit set by the wasmtime. + deterministic_stack_limit: Some(DeterministicStackLimit { + logical_max: 65536, + native_stack_max: NATIVE_STACK_MAX, + }), + canonicalize_nans: true, + // Rationale for turning the multi-threaded compilation off is to make the preparation time + // easily reproducible and as deterministic as possible. + // + // Currently the prepare queue doesn't distinguish between precheck and prepare requests. + // On the one hand, it simplifies the code, on the other, however, slows down compile times + // for execute requests. This behavior may change in future. + parallel_compilation: false, + + // WASM extensions. Only those that are meaningful to us may be controlled here. By default, + // we're using WASM MVP, which means all the extensions are disabled. Nevertheless, some + // extensions (e.g., sign extension ops) are enabled by Wasmtime and cannot be disabled. 
+ wasm_reference_types: false, + wasm_simd: false, + wasm_bulk_memory: false, + wasm_multi_value: false, + }, +}; + +pub fn params_to_wasmtime_semantics(par: &ExecutorParams) -> Result<Semantics, String> { + let mut sem = DEFAULT_CONFIG.semantics.clone(); + let mut stack_limit = if let Some(stack_limit) = sem.deterministic_stack_limit.clone() { + stack_limit + } else { + return Err("No default stack limit set".to_owned()) + }; + + for p in par.iter() { + match p { + ExecutorParam::MaxMemoryPages(max_pages) => + sem.heap_alloc_strategy = + HeapAllocStrategy::Dynamic { maximum_pages: Some(*max_pages) }, + ExecutorParam::StackLogicalMax(slm) => stack_limit.logical_max = *slm, + ExecutorParam::StackNativeMax(snm) => stack_limit.native_stack_max = *snm, + ExecutorParam::WasmExtBulkMemory => sem.wasm_bulk_memory = true, + // TODO: Not implemented yet; . + ExecutorParam::PrecheckingMaxMemory(_) => (), + ExecutorParam::PvfPrepTimeout(_, _) | ExecutorParam::PvfExecTimeout(_, _) => (), // Not used here + } + } + sem.deterministic_stack_limit = Some(stack_limit); + Ok(sem) +} diff --git a/node/core/pvf/common/src/lib.rs b/node/core/pvf/common/src/lib.rs new file mode 100644 index 000000000000..028fd9b17947 --- /dev/null +++ b/node/core/pvf/common/src/lib.rs @@ -0,0 +1,57 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see <http://www.gnu.org/licenses/>. + +//! 
Functionality that is shared by the host and the workers. + +pub mod error; +pub mod execute; +pub mod executor_intf; +pub mod prepare; +pub mod pvf; +pub mod worker; + +pub use cpu_time::ProcessTime; + +const LOG_TARGET: &str = "parachain::pvf-common"; + +use std::mem; +use tokio::io::{self, AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt as _}; + +#[doc(hidden)] +pub mod tests { + use std::time::Duration; + + pub const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(3); + pub const TEST_PREPARATION_TIMEOUT: Duration = Duration::from_secs(30); +} + +/// Write some data prefixed by its length into `w`. +pub async fn framed_send(w: &mut (impl AsyncWrite + Unpin), buf: &[u8]) -> io::Result<()> { + let len_buf = buf.len().to_le_bytes(); + w.write_all(&len_buf).await?; + w.write_all(buf).await?; + Ok(()) +} + +/// Read some data prefixed by its length from `r`. +pub async fn framed_recv(r: &mut (impl AsyncRead + Unpin)) -> io::Result<Vec<u8>> { + let mut len_buf = [0u8; mem::size_of::<usize>()]; + r.read_exact(&mut len_buf).await?; + let len = usize::from_le_bytes(len_buf); + let mut buf = vec![0; len]; + r.read_exact(&mut buf).await?; + Ok(buf) +} diff --git a/node/core/pvf/common/src/prepare.rs b/node/core/pvf/common/src/prepare.rs new file mode 100644 index 000000000000..ac64e2927a16 --- /dev/null +++ b/node/core/pvf/common/src/prepare.rs @@ -0,0 +1,48 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. 
+ +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +use parity_scale_codec::{Decode, Encode}; + +/// Preparation statistics, including the CPU time and memory taken. +#[derive(Debug, Clone, Default, Encode, Decode)] +pub struct PrepareStats { + /// The CPU time that elapsed for the preparation job. + pub cpu_time_elapsed: std::time::Duration, + /// The observed memory statistics for the preparation job. + pub memory_stats: MemoryStats, +} + +/// Helper struct to contain all the memory stats, including `MemoryAllocationStats` and, if +/// supported by the OS, `ru_maxrss`. +#[derive(Clone, Debug, Default, Encode, Decode)] +pub struct MemoryStats { + /// Memory stats from `tikv_jemalloc_ctl`. + #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] + pub memory_tracker_stats: Option, + /// `ru_maxrss` from `getrusage`. `None` if an error occurred. + #[cfg(target_os = "linux")] + pub max_rss: Option, +} + +/// Statistics of collected memory metrics. +#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] +#[derive(Clone, Debug, Default, Encode, Decode)] +pub struct MemoryAllocationStats { + /// Total resident memory, in bytes. + pub resident: u64, + /// Total allocated memory, in bytes. + pub allocated: u64, +} diff --git a/node/core/pvf/src/pvf.rs b/node/core/pvf/common/src/pvf.rs similarity index 81% rename from node/core/pvf/src/pvf.rs rename to node/core/pvf/common/src/pvf.rs index c134cacb4acf..1661f324083a 100644 --- a/node/core/pvf/src/pvf.rs +++ b/node/core/pvf/common/src/pvf.rs @@ -14,7 +14,6 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . 
-use crate::artifacts::ArtifactId; use parity_scale_codec::{Decode, Encode}; use polkadot_parachain::primitives::ValidationCodeHash; use polkadot_primitives::ExecutorParams; @@ -26,9 +25,6 @@ use std::{ time::Duration, }; -#[cfg(test)] -use crate::host::tests::TEST_PREPARATION_TIMEOUT; - /// A struct that carries the exhaustive set of data to prepare an artifact out of plain /// Wasm binary /// @@ -58,13 +54,8 @@ impl PvfPrepData { Self { code, code_hash, executor_params, prep_timeout } } - /// Returns artifact ID that corresponds to the PVF with given executor params - pub(crate) fn as_artifact_id(&self) -> ArtifactId { - ArtifactId::new(self.code_hash, self.executor_params.hash()) - } - /// Returns validation code hash for the PVF - pub(crate) fn code_hash(&self) -> ValidationCodeHash { + pub fn code_hash(&self) -> ValidationCodeHash { self.code_hash } @@ -83,16 +74,17 @@ impl PvfPrepData { self.prep_timeout } - /// Creates a structure for tests - #[cfg(test)] - pub(crate) fn from_discriminator_and_timeout(num: u32, timeout: Duration) -> Self { + /// Creates a structure for tests. + #[doc(hidden)] + pub fn from_discriminator_and_timeout(num: u32, timeout: Duration) -> Self { let descriminator_buf = num.to_le_bytes().to_vec(); Self::from_code(descriminator_buf, ExecutorParams::default(), timeout) } - #[cfg(test)] - pub(crate) fn from_discriminator(num: u32) -> Self { - Self::from_discriminator_and_timeout(num, TEST_PREPARATION_TIMEOUT) + /// Creates a structure for tests. 
+ #[doc(hidden)] + pub fn from_discriminator(num: u32) -> Self { + Self::from_discriminator_and_timeout(num, crate::tests::TEST_PREPARATION_TIMEOUT) } } diff --git a/node/core/pvf/worker/src/common.rs b/node/core/pvf/common/src/worker.rs similarity index 90% rename from node/core/pvf/worker/src/common.rs rename to node/core/pvf/common/src/worker.rs index 00289737a5c8..debe18985b37 100644 --- a/node/core/pvf/worker/src/common.rs +++ b/node/core/pvf/common/src/worker.rs @@ -14,6 +14,8 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . +//! Functionality common to both prepare and execute workers. + use crate::LOG_TARGET; use cpu_time::ProcessTime; use futures::never::Never; @@ -25,6 +27,42 @@ use std::{ }; use tokio::{io, net::UnixStream, runtime::Runtime}; +/// Use this macro to declare a `fn main() {}` that will create an executable that can be used for +/// spawning the desired worker. +#[macro_export] +macro_rules! decl_worker_main { + ($expected_command:expr, $entrypoint:expr) => { + fn main() { + ::sp_tracing::try_init_simple(); + + let args = std::env::args().collect::>(); + if args.len() < 3 { + panic!("wrong number of arguments"); + } + + let mut version = None; + let mut socket_path: &str = ""; + + for i in 2..args.len() { + match args[i].as_ref() { + "--socket-path" => socket_path = args[i + 1].as_str(), + "--node-version" => version = Some(args[i + 1].as_str()), + _ => (), + } + } + + let subcommand = &args[1]; + if subcommand != $expected_command { + panic!( + "trying to run {} binary with the {} subcommand", + $expected_command, subcommand + ) + } + $entrypoint(&socket_path, version); + } + }; +} + /// Some allowed overhead that we account for in the "CPU time monitor" thread's sleeps, on the /// child process. 
pub const JOB_TIMEOUT_OVERHEAD: Duration = Duration::from_millis(50); diff --git a/node/core/pvf/worker/Cargo.toml b/node/core/pvf/execute-worker/Cargo.toml similarity index 67% rename from node/core/pvf/worker/Cargo.toml rename to node/core/pvf/execute-worker/Cargo.toml index 53d548dbac6f..c360cee8bf5d 100644 --- a/node/core/pvf/worker/Cargo.toml +++ b/node/core/pvf/execute-worker/Cargo.toml @@ -1,27 +1,20 @@ [package] -name = "polkadot-node-core-pvf-worker" +name = "polkadot-node-core-pvf-execute-worker" version.workspace = true authors.workspace = true edition.workspace = true -[[bin]] -name = "puppet_worker" -path = "bin/puppet_worker.rs" - [dependencies] -assert_matches = "1.4.0" cpu-time = "1.0.0" futures = "0.3.21" gum = { package = "tracing-gum", path = "../../../gum" } -libc = "0.2.139" rayon = "1.5.1" -tempfile = "3.3.0" tikv-jemalloc-ctl = { version = "0.5.0", optional = true } -tokio = "1.24.2" +tokio = { version = "1.24.2", features = ["fs", "process"] } parity-scale-codec = { version = "3.4.0", default-features = false, features = ["derive"] } -polkadot-node-core-pvf = { path = ".." 
} +polkadot-node-core-pvf-common = { path = "../common" } polkadot-parachain = { path = "../../../../parachain" } polkadot-primitives = { path = "../../../../primitives" } @@ -37,12 +30,5 @@ sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master [target.'cfg(target_os = "linux")'.dependencies] tikv-jemalloc-ctl = "0.5.0" -[build-dependencies] -substrate-build-script-utils = { git = "https://github.com/paritytech/substrate", branch = "master" } - -[dev-dependencies] -adder = { package = "test-parachain-adder", path = "../../../../parachain/test-parachains/adder" } -halt = { package = "test-parachain-halt", path = "../../../../parachain/test-parachains/halt" } - [features] -jemalloc-allocator = ["dep:tikv-jemalloc-ctl"] +builder = [] diff --git a/node/core/pvf/worker/src/executor_intf.rs b/node/core/pvf/execute-worker/src/executor_intf.rs similarity index 65% rename from node/core/pvf/worker/src/executor_intf.rs rename to node/core/pvf/execute-worker/src/executor_intf.rs index ff286dd74d64..98424a3dcd1d 100644 --- a/node/core/pvf/worker/src/executor_intf.rs +++ b/node/core/pvf/execute-worker/src/executor_intf.rs @@ -16,13 +16,16 @@ //! Interface to the Substrate Executor -use polkadot_primitives::{ExecutorParam, ExecutorParams}; +use polkadot_node_core_pvf_common::executor_intf::{ + params_to_wasmtime_semantics, DEFAULT_CONFIG, NATIVE_STACK_MAX, +}; +use polkadot_primitives::ExecutorParams; use sc_executor_common::{ error::WasmError, runtime_blob::RuntimeBlob, - wasm_runtime::{HeapAllocStrategy, InvokeMethod, WasmModule as _}, + wasm_runtime::{InvokeMethod, WasmModule as _}, }; -use sc_executor_wasmtime::{Config, DeterministicStackLimit, Semantics, WasmtimeRuntime}; +use sc_executor_wasmtime::{Config, WasmtimeRuntime}; use sp_core::storage::{ChildInfo, TrackedStorageKey}; use sp_externalities::MultiRemovalResults; use std::any::{Any, TypeId}; @@ -63,119 +66,6 @@ use std::any::{Any, TypeId}; /// The stack size for the execute thread. 
pub const EXECUTE_THREAD_STACK_SIZE: usize = 2 * 1024 * 1024 + NATIVE_STACK_MAX as usize; -// Memory configuration -// -// When Substrate Runtime is instantiated, a number of WASM pages are allocated for the Substrate -// Runtime instance's linear memory. The exact number of pages is a sum of whatever the WASM blob -// itself requests (by default at least enough to hold the data section as well as have some space -// left for the stack; this is, of course, overridable at link time when compiling the runtime) -// plus the number of pages specified in the `extra_heap_pages` passed to the executor. -// -// By default, rustc (or `lld` specifically) should allocate 1 MiB for the shadow stack, or 16 pages. -// The data section for runtimes are typically rather small and can fit in a single digit number of -// WASM pages, so let's say an extra 16 pages. Thus let's assume that 32 pages or 2 MiB are used for -// these needs by default. -const DEFAULT_HEAP_PAGES_ESTIMATE: u32 = 32; -const EXTRA_HEAP_PAGES: u32 = 2048; - -/// The number of bytes devoted for the stack during wasm execution of a PVF. -const NATIVE_STACK_MAX: u32 = 256 * 1024 * 1024; - -// VALUES OF THE DEFAULT CONFIGURATION SHOULD NEVER BE CHANGED -// They are used as base values for the execution environment parametrization. -// To overwrite them, add new ones to `EXECUTOR_PARAMS` in the `session_info` pallet and perform -// a runtime upgrade to make them active. -const DEFAULT_CONFIG: Config = Config { - allow_missing_func_imports: true, - cache_path: None, - semantics: Semantics { - heap_alloc_strategy: sc_executor_common::wasm_runtime::HeapAllocStrategy::Dynamic { - maximum_pages: Some(DEFAULT_HEAP_PAGES_ESTIMATE + EXTRA_HEAP_PAGES), - }, - - instantiation_strategy: - sc_executor_wasmtime::InstantiationStrategy::RecreateInstanceCopyOnWrite, - - // Enable deterministic stack limit to pin down the exact number of items the wasmtime stack - // can contain before it traps with stack overflow. 
- // - // Here is how the values below were chosen. - // - // At the moment of writing, the default native stack size limit is 1 MiB. Assuming a logical item - // (see the docs about the field and the instrumentation algorithm) is 8 bytes, 1 MiB can - // fit 2x 65536 logical items. - // - // Since reaching the native stack limit is undesirable, we halve the logical item limit and - // also increase the native 256x. This hopefully should preclude wasm code from reaching - // the stack limit set by the wasmtime. - deterministic_stack_limit: Some(DeterministicStackLimit { - logical_max: 65536, - native_stack_max: NATIVE_STACK_MAX, - }), - canonicalize_nans: true, - // Rationale for turning the multi-threaded compilation off is to make the preparation time - // easily reproducible and as deterministic as possible. - // - // Currently the prepare queue doesn't distinguish between precheck and prepare requests. - // On the one hand, it simplifies the code, on the other, however, slows down compile times - // for execute requests. This behavior may change in future. - parallel_compilation: false, - - // WASM extensions. Only those that are meaningful to us may be controlled here. By default, - // we're using WASM MVP, which means all the extensions are disabled. Nevertheless, some - // extensions (e.g., sign extension ops) are enabled by Wasmtime and cannot be disabled. - wasm_reference_types: false, - wasm_simd: false, - wasm_bulk_memory: false, - wasm_multi_value: false, - }, -}; - -/// Runs the prevalidation on the given code. Returns a [`RuntimeBlob`] if it succeeds. -pub fn prevalidate(code: &[u8]) -> Result { - let blob = RuntimeBlob::new(code)?; - // It's assumed this function will take care of any prevalidation logic - // that needs to be done. - // - // Do nothing for now. - Ok(blob) -} - -/// Runs preparation on the given runtime blob. 
If successful, it returns a serialized compiled -/// artifact which can then be used to pass into `Executor::execute` after writing it to the disk. -pub fn prepare( - blob: RuntimeBlob, - executor_params: &ExecutorParams, -) -> Result, sc_executor_common::error::WasmError> { - let semantics = params_to_wasmtime_semantics(executor_params) - .map_err(|e| sc_executor_common::error::WasmError::Other(e))?; - sc_executor_wasmtime::prepare_runtime_artifact(blob, &semantics) -} - -fn params_to_wasmtime_semantics(par: &ExecutorParams) -> Result { - let mut sem = DEFAULT_CONFIG.semantics.clone(); - let mut stack_limit = if let Some(stack_limit) = sem.deterministic_stack_limit.clone() { - stack_limit - } else { - return Err("No default stack limit set".to_owned()) - }; - - for p in par.iter() { - match p { - ExecutorParam::MaxMemoryPages(max_pages) => - sem.heap_alloc_strategy = - HeapAllocStrategy::Dynamic { maximum_pages: Some(*max_pages) }, - ExecutorParam::StackLogicalMax(slm) => stack_limit.logical_max = *slm, - ExecutorParam::StackNativeMax(snm) => stack_limit.native_stack_max = *snm, - ExecutorParam::WasmExtBulkMemory => sem.wasm_bulk_memory = true, - ExecutorParam::PrecheckingMaxMemory(_) => (), // TODO: Not implemented yet - ExecutorParam::PvfPrepTimeout(_, _) | ExecutorParam::PvfExecTimeout(_, _) => (), // Not used here - } - } - sem.deterministic_stack_limit = Some(stack_limit); - Ok(sem) -} - #[derive(Clone)] pub struct Executor { config: Config, diff --git a/node/core/pvf/worker/src/execute.rs b/node/core/pvf/execute-worker/src/lib.rs similarity index 93% rename from node/core/pvf/worker/src/execute.rs rename to node/core/pvf/execute-worker/src/lib.rs index c5b8ddc9dd18..0ac39aafb0c9 100644 --- a/node/core/pvf/worker/src/execute.rs +++ b/node/core/pvf/execute-worker/src/lib.rs @@ -14,20 +14,26 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . 
-use crate::{ - common::{ +mod executor_intf; + +pub use executor_intf::Executor; + +// NOTE: Initializing logging in e.g. tests will not have an effect in the workers, as they are +// separate spawned processes. Run with e.g. `RUST_LOG=parachain::pvf-execute-worker=trace`. +const LOG_TARGET: &str = "parachain::pvf-execute-worker"; + +use crate::executor_intf::EXECUTE_THREAD_STACK_SIZE; +use cpu_time::ProcessTime; +use parity_scale_codec::{Decode, Encode}; +use polkadot_node_core_pvf_common::{ + error::InternalValidationError, + execute::{Handshake, Response}, + framed_recv, framed_send, + worker::{ bytes_to_path, cpu_time_monitor_loop, stringify_panic_payload, thread::{self, WaitOutcome}, worker_event_loop, }, - executor_intf::{Executor, EXECUTE_THREAD_STACK_SIZE}, - LOG_TARGET, -}; -use cpu_time::ProcessTime; -use parity_scale_codec::{Decode, Encode}; -use polkadot_node_core_pvf::{ - framed_recv, framed_send, ExecuteHandshake as Handshake, ExecuteResponse as Response, - InternalValidationError, }; use polkadot_parachain::primitives::ValidationResult; use std::{ diff --git a/node/core/pvf/prepare-worker/Cargo.toml b/node/core/pvf/prepare-worker/Cargo.toml new file mode 100644 index 000000000000..07386de35962 --- /dev/null +++ b/node/core/pvf/prepare-worker/Cargo.toml @@ -0,0 +1,33 @@ +[package] +name = "polkadot-node-core-pvf-prepare-worker" +version.workspace = true +authors.workspace = true +edition.workspace = true + +[dependencies] +futures = "0.3.21" +gum = { package = "tracing-gum", path = "../../../gum" } +libc = "0.2.139" +rayon = "1.5.1" +tikv-jemalloc-ctl = { version = "0.5.0", optional = true } +tokio = { version = "1.24.2", features = ["fs", "process"] } + +parity-scale-codec = { version = "3.4.0", default-features = false, features = ["derive"] } + +polkadot-node-core-pvf-common = { path = "../common" } +polkadot-parachain = { path = "../../../../parachain" } +polkadot-primitives = { path = "../../../../primitives" } + +sc-executor = { git = 
"https://github.com/paritytech/substrate", branch = "master" } +sc-executor-common = { git = "https://github.com/paritytech/substrate", branch = "master" } +sc-executor-wasmtime = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-io = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-maybe-compressed-blob = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-tracing = { git = "https://github.com/paritytech/substrate", branch = "master" } + +[target.'cfg(target_os = "linux")'.dependencies] +tikv-jemalloc-ctl = "0.5.0" + +[features] +builder = [] +jemalloc-allocator = ["dep:tikv-jemalloc-ctl"] diff --git a/node/core/pvf/prepare-worker/src/executor_intf.rs b/node/core/pvf/prepare-worker/src/executor_intf.rs new file mode 100644 index 000000000000..1f88f6a6dd6e --- /dev/null +++ b/node/core/pvf/prepare-worker/src/executor_intf.rs @@ -0,0 +1,42 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Interface to the Substrate Executor + +use polkadot_node_core_pvf_common::executor_intf::params_to_wasmtime_semantics; +use polkadot_primitives::ExecutorParams; +use sc_executor_common::runtime_blob::RuntimeBlob; + +/// Runs the prevalidation on the given code. Returns a [`RuntimeBlob`] if it succeeds. 
+pub fn prevalidate(code: &[u8]) -> Result { + let blob = RuntimeBlob::new(code)?; + // It's assumed this function will take care of any prevalidation logic + // that needs to be done. + // + // Do nothing for now. + Ok(blob) +} + +/// Runs preparation on the given runtime blob. If successful, it returns a serialized compiled +/// artifact which can then be used to pass into `Executor::execute` after writing it to the disk. +pub fn prepare( + blob: RuntimeBlob, + executor_params: &ExecutorParams, +) -> Result, sc_executor_common::error::WasmError> { + let semantics = params_to_wasmtime_semantics(executor_params) + .map_err(|e| sc_executor_common::error::WasmError::Other(e))?; + sc_executor_wasmtime::prepare_runtime_artifact(blob, &semantics) +} diff --git a/node/core/pvf/worker/src/prepare.rs b/node/core/pvf/prepare-worker/src/lib.rs similarity index 90% rename from node/core/pvf/worker/src/prepare.rs rename to node/core/pvf/prepare-worker/src/lib.rs index fe9c1a85545a..8f36ef397cfb 100644 --- a/node/core/pvf/worker/src/prepare.rs +++ b/node/core/pvf/prepare-worker/src/lib.rs @@ -14,23 +14,31 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . +mod executor_intf; +mod memory_stats; + +pub use executor_intf::{prepare, prevalidate}; + +// NOTE: Initializing logging in e.g. tests will not have an effect in the workers, as they are +// separate spawned processes. Run with e.g. `RUST_LOG=parachain::pvf-prepare-worker=trace`. 
+const LOG_TARGET: &str = "parachain::pvf-prepare-worker"; + #[cfg(target_os = "linux")] use crate::memory_stats::max_rss_stat::{extract_max_rss_stat, get_max_rss_thread}; #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] use crate::memory_stats::memory_tracker::{get_memory_tracker_loop_stats, memory_tracker_loop}; -use crate::{ - common::{ +use parity_scale_codec::{Decode, Encode}; +use polkadot_node_core_pvf_common::{ + error::{PrepareError, PrepareResult}, + framed_recv, framed_send, + prepare::{MemoryStats, PrepareStats}, + pvf::PvfPrepData, + worker::{ bytes_to_path, cpu_time_monitor_loop, stringify_panic_payload, thread::{self, WaitOutcome}, worker_event_loop, }, - prepare, prevalidate, LOG_TARGET, -}; -use cpu_time::ProcessTime; -use parity_scale_codec::{Decode, Encode}; -use polkadot_node_core_pvf::{ - framed_recv, framed_send, CompiledArtifact, MemoryStats, PrepareError, PrepareResult, - PrepareStats, PvfPrepData, + ProcessTime, }; use std::{ path::PathBuf, @@ -39,6 +47,22 @@ use std::{ }; use tokio::{io, net::UnixStream}; +/// Contains the bytes for a successfully compiled artifact. +pub struct CompiledArtifact(Vec); + +impl CompiledArtifact { + /// Creates a `CompiledArtifact`. + pub fn new(code: Vec) -> Self { + Self(code) + } +} + +impl AsRef<[u8]> for CompiledArtifact { + fn as_ref(&self) -> &[u8] { + self.0.as_slice() + } +} + async fn recv_request(stream: &mut UnixStream) -> io::Result<(PvfPrepData, PathBuf)> { let pvf = framed_recv(stream).await?; let pvf = PvfPrepData::decode(&mut &pvf[..]).map_err(|e| { diff --git a/node/core/pvf/worker/src/memory_stats.rs b/node/core/pvf/prepare-worker/src/memory_stats.rs similarity index 97% rename from node/core/pvf/worker/src/memory_stats.rs rename to node/core/pvf/prepare-worker/src/memory_stats.rs index 907f793d87af..e6dc8572c4a3 100644 --- a/node/core/pvf/worker/src/memory_stats.rs +++ b/node/core/pvf/prepare-worker/src/memory_stats.rs @@ -33,11 +33,11 @@ /// NOTE: Requires jemalloc enabled. 
#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] pub mod memory_tracker { - use crate::{ - common::{stringify_panic_payload, thread}, - LOG_TARGET, + use crate::LOG_TARGET; + use polkadot_node_core_pvf_common::{ + prepare::MemoryAllocationStats, + worker::{stringify_panic_payload, thread}, }; - use polkadot_node_core_pvf::MemoryAllocationStats; use std::{thread::JoinHandle, time::Duration}; use tikv_jemalloc_ctl::{epoch, stats, Error}; diff --git a/node/core/pvf/src/artifacts.rs b/node/core/pvf/src/artifacts.rs index d5a660cc3aa5..78d2f88941b8 100644 --- a/node/core/pvf/src/artifacts.rs +++ b/node/core/pvf/src/artifacts.rs @@ -55,8 +55,9 @@ //! older by a predefined parameter. This process is run very rarely (say, once a day). Once the //! artifact is expired it is removed from disk eagerly atomically. -use crate::{error::PrepareError, host::PrepareResultSender, prepare::PrepareStats}; +use crate::host::PrepareResultSender; use always_assert::always; +use polkadot_node_core_pvf_common::{error::PrepareError, prepare::PrepareStats, pvf::PvfPrepData}; use polkadot_parachain::primitives::ValidationCodeHash; use polkadot_primitives::ExecutorParamsHash; use std::{ @@ -65,22 +66,6 @@ use std::{ time::{Duration, SystemTime}, }; -/// Contains the bytes for a successfully compiled artifact. -pub struct CompiledArtifact(Vec); - -impl CompiledArtifact { - /// Creates a `CompiledArtifact`. - pub fn new(code: Vec) -> Self { - Self(code) - } -} - -impl AsRef<[u8]> for CompiledArtifact { - fn as_ref(&self) -> &[u8] { - self.0.as_slice() - } -} - /// Identifier of an artifact. Encodes a code hash of the PVF and a hash of executor parameter set. #[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct ArtifactId { @@ -96,6 +81,11 @@ impl ArtifactId { Self { code_hash, executor_params_hash } } + /// Returns an artifact ID that corresponds to the PVF with given executor params. 
+ pub fn from_pvf_prep_data(pvf: &PvfPrepData) -> Self { + Self::new(pvf.code_hash(), pvf.executor_params().hash()) + } + /// Tries to recover the artifact id from the given file name. #[cfg(test)] pub fn from_file_name(file_name: &str) -> Option { @@ -304,7 +294,7 @@ mod tests { #[tokio::test] async fn artifacts_removes_cache_on_startup() { - let fake_cache_path = crate::worker_common::tmpfile("test-cache").await.unwrap(); + let fake_cache_path = crate::worker_intf::tmpfile("test-cache").await.unwrap(); let fake_artifact_path = { let mut p = fake_cache_path.clone(); p.push("wasmtime_0x1234567890123456789012345678901234567890123456789012345678901234"); diff --git a/node/core/pvf/src/error.rs b/node/core/pvf/src/error.rs index 33f3f00810f2..7372cd233c49 100644 --- a/node/core/pvf/src/error.rs +++ b/node/core/pvf/src/error.rs @@ -14,65 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -use crate::prepare::PrepareStats; -use parity_scale_codec::{Decode, Encode}; -use std::fmt; - -/// Result of PVF preparation performed by the validation host. Contains stats about the preparation if -/// successful -pub type PrepareResult = Result; - -/// An error that occurred during the prepare part of the PVF pipeline. -#[derive(Debug, Clone, Encode, Decode)] -pub enum PrepareError { - /// During the prevalidation stage of preparation an issue was found with the PVF. - Prevalidation(String), - /// Compilation failed for the given PVF. - Preparation(String), - /// An unexpected panic has occurred in the preparation worker. - Panic(String), - /// Failed to prepare the PVF due to the time limit. - TimedOut, - /// An IO error occurred. This state is reported by either the validation host or by the worker. - IoErr(String), - /// The temporary file for the artifact could not be created at the given cache path. This state is reported by the - /// validation host (not by the worker). 
- CreateTmpFileErr(String), - /// The response from the worker is received, but the file cannot be renamed (moved) to the final destination - /// location. This state is reported by the validation host (not by the worker). - RenameTmpFileErr(String), -} - -impl PrepareError { - /// Returns whether this is a deterministic error, i.e. one that should trigger reliably. Those - /// errors depend on the PVF itself and the sc-executor/wasmtime logic. - /// - /// Non-deterministic errors can happen spuriously. Typically, they occur due to resource - /// starvation, e.g. under heavy load or memory pressure. Those errors are typically transient - /// but may persist e.g. if the node is run by overwhelmingly underpowered machine. - pub fn is_deterministic(&self) -> bool { - use PrepareError::*; - match self { - Prevalidation(_) | Preparation(_) | Panic(_) => true, - TimedOut | IoErr(_) | CreateTmpFileErr(_) | RenameTmpFileErr(_) => false, - } - } -} - -impl fmt::Display for PrepareError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - use PrepareError::*; - match self { - Prevalidation(err) => write!(f, "prevalidation: {}", err), - Preparation(err) => write!(f, "preparation: {}", err), - Panic(err) => write!(f, "panic: {}", err), - TimedOut => write!(f, "prepare: timeout"), - IoErr(err) => write!(f, "prepare: io error while receiving response: {}", err), - CreateTmpFileErr(err) => write!(f, "prepare: error creating tmp file: {}", err), - RenameTmpFileErr(err) => write!(f, "prepare: error renaming tmp file: {}", err), - } - } -} +use polkadot_node_core_pvf_common::error::{InternalValidationError, PrepareError}; /// A error raised during validation of the candidate. #[derive(Debug, Clone)] @@ -122,37 +64,6 @@ pub enum InvalidCandidate { Panic(String), } -/// Some internal error occurred. 
-/// -/// Should only ever be used for validation errors independent of the candidate and PVF, or for errors we ruled out -/// during pre-checking (so preparation errors are fine). -#[derive(Debug, Clone, Encode, Decode)] -pub enum InternalValidationError { - /// Some communication error occurred with the host. - HostCommunication(String), - /// Could not find or open compiled artifact file. - CouldNotOpenFile(String), - /// An error occurred in the CPU time monitor thread. Should be totally unrelated to validation. - CpuTimeMonitorThread(String), - /// Some non-deterministic preparation error occurred. - NonDeterministicPrepareError(PrepareError), -} - -impl fmt::Display for InternalValidationError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - use InternalValidationError::*; - match self { - HostCommunication(err) => - write!(f, "validation: some communication error occurred with the host: {}", err), - CouldNotOpenFile(err) => - write!(f, "validation: could not find or open compiled artifact file: {}", err), - CpuTimeMonitorThread(err) => - write!(f, "validation: an error occurred in the CPU time monitor thread: {}", err), - NonDeterministicPrepareError(err) => write!(f, "validation: prepare: {}", err), - } - } -} - impl From for ValidationError { fn from(error: InternalValidationError) -> Self { Self::InternalError(error) diff --git a/node/core/pvf/src/execute/mod.rs b/node/core/pvf/src/execute/mod.rs index 8e3b17d71569..669b9dc04d7c 100644 --- a/node/core/pvf/src/execute/mod.rs +++ b/node/core/pvf/src/execute/mod.rs @@ -24,4 +24,3 @@ mod queue; mod worker_intf; pub use queue::{start, PendingExecutionRequest, ToQueue}; -pub use worker_intf::{Handshake as ExecuteHandshake, Response as ExecuteResponse}; diff --git a/node/core/pvf/src/execute/queue.rs b/node/core/pvf/src/execute/queue.rs index 61cebc5e2c46..395697616b36 100644 --- a/node/core/pvf/src/execute/queue.rs +++ b/node/core/pvf/src/execute/queue.rs @@ -21,7 +21,7 @@ use crate::{ 
artifacts::{ArtifactId, ArtifactPathId}, host::ResultSender, metrics::Metrics, - worker_common::{IdleWorker, WorkerHandle}, + worker_intf::{IdleWorker, WorkerHandle}, InvalidCandidate, ValidationError, LOG_TARGET, }; use futures::{ diff --git a/node/core/pvf/src/execute/worker_intf.rs b/node/core/pvf/src/execute/worker_intf.rs index 4c26aeb0260a..6e54e17e515a 100644 --- a/node/core/pvf/src/execute/worker_intf.rs +++ b/node/core/pvf/src/execute/worker_intf.rs @@ -18,17 +18,20 @@ use crate::{ artifacts::ArtifactPathId, - error::InternalValidationError, - worker_common::{ - framed_recv, framed_send, path_to_bytes, spawn_with_program_path, IdleWorker, SpawnErr, - WorkerHandle, JOB_TIMEOUT_WALL_CLOCK_FACTOR, + worker_intf::{ + path_to_bytes, spawn_with_program_path, IdleWorker, SpawnErr, WorkerHandle, + JOB_TIMEOUT_WALL_CLOCK_FACTOR, }, LOG_TARGET, }; use futures::FutureExt; use futures_timer::Delay; use parity_scale_codec::{Decode, Encode}; - +use polkadot_node_core_pvf_common::{ + error::InternalValidationError, + execute::{Handshake, Response}, + framed_recv, framed_send, +}; use polkadot_parachain::primitives::ValidationResult; use polkadot_primitives::ExecutorParams; use std::{path::Path, time::Duration}; @@ -208,42 +211,3 @@ async fn recv_response(stream: &mut UnixStream) -> io::Result { ) }) } - -/// The payload of the one-time handshake that is done when a worker process is created. Carries -/// data from the host to the worker. -#[derive(Encode, Decode)] -pub struct Handshake { - /// The executor parameters. - pub executor_params: ExecutorParams, -} - -/// The response from an execution job on the worker. -#[derive(Encode, Decode)] -pub enum Response { - /// The job completed successfully. - Ok { - /// The result of parachain validation. - result_descriptor: ValidationResult, - /// The amount of CPU time taken by the job. - duration: Duration, - }, - /// The candidate is invalid. - InvalidCandidate(String), - /// The job timed out. 
- TimedOut, - /// An unexpected panic has occurred in the execution worker. - Panic(String), - /// Some internal error occurred. - InternalError(InternalValidationError), -} - -impl Response { - /// Creates an invalid response from a context `ctx` and a message `msg` (which can be empty). - pub fn format_invalid(ctx: &'static str, msg: &str) -> Self { - if msg.is_empty() { - Self::InvalidCandidate(ctx.to_string()) - } else { - Self::InvalidCandidate(format!("{}: {}", ctx, msg)) - } - } -} diff --git a/node/core/pvf/src/host.rs b/node/core/pvf/src/host.rs index bfc775a32dee..67f4a66e9748 100644 --- a/node/core/pvf/src/host.rs +++ b/node/core/pvf/src/host.rs @@ -22,16 +22,19 @@ use crate::{ artifacts::{ArtifactId, ArtifactPathId, ArtifactState, Artifacts}, - error::PrepareError, execute::{self, PendingExecutionRequest}, metrics::Metrics, - prepare, PrepareResult, Priority, PvfPrepData, ValidationError, LOG_TARGET, + prepare, Priority, ValidationError, LOG_TARGET, }; use always_assert::never; use futures::{ channel::{mpsc, oneshot}, Future, FutureExt, SinkExt, StreamExt, }; +use polkadot_node_core_pvf_common::{ + error::{PrepareError, PrepareResult}, + pvf::PvfPrepData, +}; use polkadot_parachain::primitives::ValidationResult; use std::{ collections::HashMap, @@ -423,7 +426,7 @@ async fn handle_precheck_pvf( pvf: PvfPrepData, result_sender: PrepareResultSender, ) -> Result<(), Fatal> { - let artifact_id = pvf.as_artifact_id(); + let artifact_id = ArtifactId::from_pvf_prep_data(&pvf); if let Some(state) = artifacts.artifact_state_mut(&artifact_id) { match state { @@ -467,7 +470,7 @@ async fn handle_execute_pvf( inputs: ExecutePvfInputs, ) -> Result<(), Fatal> { let ExecutePvfInputs { pvf, exec_timeout, params, priority, result_tx } = inputs; - let artifact_id = pvf.as_artifact_id(); + let artifact_id = ArtifactId::from_pvf_prep_data(&pvf); let executor_params = (*pvf.executor_params()).clone(); if let Some(state) = artifacts.artifact_state_mut(&artifact_id) { @@ -590,7 
+593,7 @@ async fn handle_heads_up( let now = SystemTime::now(); for active_pvf in active_pvfs { - let artifact_id = active_pvf.as_artifact_id(); + let artifact_id = ArtifactId::from_pvf_prep_data(&active_pvf); if let Some(state) = artifacts.artifact_state_mut(&artifact_id) { match state { ArtifactState::Prepared { last_time_needed, .. } => { @@ -854,9 +857,10 @@ fn pulse_every(interval: std::time::Duration) -> impl futures::Stream #[cfg(test)] pub(crate) mod tests { use super::*; - use crate::{prepare::PrepareStats, InvalidCandidate, PrepareError}; + use crate::InvalidCandidate; use assert_matches::assert_matches; use futures::future::BoxFuture; + use polkadot_node_core_pvf_common::{error::PrepareError, prepare::PrepareStats}; const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(3); pub(crate) const TEST_PREPARATION_TIMEOUT: Duration = Duration::from_secs(30); @@ -877,7 +881,7 @@ pub(crate) mod tests { /// Creates a new PVF which artifact id can be uniquely identified by the given number. 
fn artifact_id(descriminator: u32) -> ArtifactId { - PvfPrepData::from_discriminator(descriminator).as_artifact_id() + ArtifactId::from_pvf_prep_data(&PvfPrepData::from_discriminator(descriminator)) } fn artifact_path(descriminator: u32) -> PathBuf { diff --git a/node/core/pvf/src/lib.rs b/node/core/pvf/src/lib.rs index 9b302150fd36..d8b801292ca8 100644 --- a/node/core/pvf/src/lib.rs +++ b/node/core/pvf/src/lib.rs @@ -95,27 +95,31 @@ mod host; mod metrics; mod prepare; mod priority; -mod pvf; -mod worker_common; +mod worker_intf; -pub use artifacts::CompiledArtifact; -pub use error::{ - InternalValidationError, InvalidCandidate, PrepareError, PrepareResult, ValidationError, -}; -pub use execute::{ExecuteHandshake, ExecuteResponse}; -#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] -pub use prepare::MemoryAllocationStats; -pub use prepare::{MemoryStats, PrepareStats}; -pub use priority::Priority; -pub use pvf::PvfPrepData; +#[doc(hidden)] +pub mod testing; + +// Used by `decl_puppet_worker_main!`. +#[doc(hidden)] +pub use sp_tracing; +pub use error::{InvalidCandidate, ValidationError}; pub use host::{start, Config, ValidationHost}; pub use metrics::Metrics; -pub use worker_common::{framed_recv, framed_send, JOB_TIMEOUT_WALL_CLOCK_FACTOR}; +pub use priority::Priority; +pub use worker_intf::{framed_recv, framed_send, JOB_TIMEOUT_WALL_CLOCK_FACTOR}; + +// Re-export some common types. +pub use polkadot_node_core_pvf_common::{ + error::{InternalValidationError, PrepareError}, + prepare::PrepareStats, + pvf::PvfPrepData, +}; -const LOG_TARGET: &str = "parachain::pvf"; +// Re-export worker entrypoints. +pub use polkadot_node_core_pvf_execute_worker::worker_entrypoint as execute_worker_entrypoint; +pub use polkadot_node_core_pvf_prepare_worker::worker_entrypoint as prepare_worker_entrypoint; -#[doc(hidden)] -pub mod testing { - pub use crate::worker_common::{spawn_with_program_path, SpawnErr}; -} +/// The log target for this crate. 
+pub const LOG_TARGET: &str = "parachain::pvf"; diff --git a/node/core/pvf/src/metrics.rs b/node/core/pvf/src/metrics.rs index 12bcd9eadad3..62f8c6dc5157 100644 --- a/node/core/pvf/src/metrics.rs +++ b/node/core/pvf/src/metrics.rs @@ -16,7 +16,7 @@ //! Prometheus metrics related to the validation host. -use crate::prepare::MemoryStats; +use polkadot_node_core_pvf_common::prepare::MemoryStats; use polkadot_node_metrics::metrics::{self, prometheus}; /// Validation host metrics. diff --git a/node/core/pvf/src/prepare/mod.rs b/node/core/pvf/src/prepare/mod.rs index de40c48464c4..580f67f73fa0 100644 --- a/node/core/pvf/src/prepare/mod.rs +++ b/node/core/pvf/src/prepare/mod.rs @@ -28,36 +28,3 @@ mod worker_intf; pub use pool::start as start_pool; pub use queue::{start as start_queue, FromQueue, ToQueue}; - -use parity_scale_codec::{Decode, Encode}; - -/// Preparation statistics, including the CPU time and memory taken. -#[derive(Debug, Clone, Default, Encode, Decode)] -pub struct PrepareStats { - /// The CPU time that elapsed for the preparation job. - pub cpu_time_elapsed: std::time::Duration, - /// The observed memory statistics for the preparation job. - pub memory_stats: MemoryStats, -} - -/// Helper struct to contain all the memory stats, including `MemoryAllocationStats` and, if -/// supported by the OS, `ru_maxrss`. -#[derive(Clone, Debug, Default, Encode, Decode)] -pub struct MemoryStats { - /// Memory stats from `tikv_jemalloc_ctl`. - #[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] - pub memory_tracker_stats: Option, - /// `ru_maxrss` from `getrusage`. `None` if an error occurred. - #[cfg(target_os = "linux")] - pub max_rss: Option, -} - -/// Statistics of collected memory metrics. -#[cfg(any(target_os = "linux", feature = "jemalloc-allocator"))] -#[derive(Clone, Debug, Default, Encode, Decode)] -pub struct MemoryAllocationStats { - /// Total resident memory, in bytes. - pub resident: u64, - /// Total allocated memory, in bytes. 
- pub allocated: u64, -} diff --git a/node/core/pvf/src/prepare/pool.rs b/node/core/pvf/src/prepare/pool.rs index d151f097805e..ae8ecff5285c 100644 --- a/node/core/pvf/src/prepare/pool.rs +++ b/node/core/pvf/src/prepare/pool.rs @@ -16,16 +16,18 @@ use super::worker_intf::{self, Outcome}; use crate::{ - error::{PrepareError, PrepareResult}, metrics::Metrics, - pvf::PvfPrepData, - worker_common::{IdleWorker, WorkerHandle}, + worker_intf::{IdleWorker, WorkerHandle}, LOG_TARGET, }; use always_assert::never; use futures::{ channel::mpsc, future::BoxFuture, stream::FuturesUnordered, Future, FutureExt, StreamExt, }; +use polkadot_node_core_pvf_common::{ + error::{PrepareError, PrepareResult}, + pvf::PvfPrepData, +}; use slotmap::HopSlotMap; use std::{ fmt, diff --git a/node/core/pvf/src/prepare/queue.rs b/node/core/pvf/src/prepare/queue.rs index f84d5ab0e56e..5e19a4c7217a 100644 --- a/node/core/pvf/src/prepare/queue.rs +++ b/node/core/pvf/src/prepare/queue.rs @@ -17,11 +17,10 @@ //! A queue that handles requests for PVF preparation. use super::pool::{self, Worker}; -use crate::{ - artifacts::ArtifactId, metrics::Metrics, PrepareResult, Priority, PvfPrepData, LOG_TARGET, -}; +use crate::{artifacts::ArtifactId, metrics::Metrics, Priority, LOG_TARGET}; use always_assert::{always, never}; use futures::{channel::mpsc, stream::StreamExt as _, Future, SinkExt}; +use polkadot_node_core_pvf_common::{error::PrepareResult, pvf::PvfPrepData}; use std::{ collections::{HashMap, VecDeque}, path::PathBuf, @@ -231,7 +230,7 @@ async fn handle_enqueue( ); queue.metrics.prepare_enqueued(); - let artifact_id = pvf.as_artifact_id(); + let artifact_id = ArtifactId::from_pvf_prep_data(&pvf); if never!( queue.artifact_id_to_job.contains_key(&artifact_id), "second Enqueue sent for a known artifact" @@ -339,7 +338,7 @@ async fn handle_worker_concluded( // this can't be None; // qed. 
let job_data = never_none!(queue.jobs.remove(job)); - let artifact_id = job_data.pvf.as_artifact_id(); + let artifact_id = ArtifactId::from_pvf_prep_data(&job_data.pvf); queue.artifact_id_to_job.remove(&artifact_id); @@ -425,7 +424,7 @@ async fn spawn_extra_worker(queue: &mut Queue, critical: bool) -> Result<(), Fat async fn assign(queue: &mut Queue, worker: Worker, job: Job) -> Result<(), Fatal> { let job_data = &mut queue.jobs[job]; - let artifact_id = job_data.pvf.as_artifact_id(); + let artifact_id = ArtifactId::from_pvf_prep_data(&job_data.pvf); let artifact_path = artifact_id.path(&queue.cache_path); job_data.worker = Some(worker); @@ -488,11 +487,10 @@ pub fn start( #[cfg(test)] mod tests { use super::*; - use crate::{ - error::PrepareError, host::tests::TEST_PREPARATION_TIMEOUT, prepare::PrepareStats, - }; + use crate::host::tests::TEST_PREPARATION_TIMEOUT; use assert_matches::assert_matches; use futures::{future::BoxFuture, FutureExt}; + use polkadot_node_core_pvf_common::{error::PrepareError, prepare::PrepareStats}; use slotmap::SlotMap; use std::task::Poll; @@ -616,7 +614,10 @@ mod tests { result: Ok(PrepareStats::default()), }); - assert_eq!(test.poll_and_recv_from_queue().await.artifact_id, pvf(1).as_artifact_id()); + assert_eq!( + test.poll_and_recv_from_queue().await.artifact_id, + ArtifactId::from_pvf_prep_data(&pvf(1)) + ); } #[tokio::test] @@ -735,7 +736,10 @@ mod tests { // Since there is still work, the queue requested one extra worker to spawn to handle the // remaining enqueued work items. 
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); - assert_eq!(test.poll_and_recv_from_queue().await.artifact_id, pvf(1).as_artifact_id()); + assert_eq!( + test.poll_and_recv_from_queue().await.artifact_id, + ArtifactId::from_pvf_prep_data(&pvf(1)) + ); } #[tokio::test] diff --git a/node/core/pvf/src/prepare/worker_intf.rs b/node/core/pvf/src/prepare/worker_intf.rs index daf94aadc672..47522d3f0856 100644 --- a/node/core/pvf/src/prepare/worker_intf.rs +++ b/node/core/pvf/src/prepare/worker_intf.rs @@ -17,17 +17,20 @@ //! Host interface to the prepare worker. use crate::{ - error::{PrepareError, PrepareResult}, metrics::Metrics, - prepare::PrepareStats, - pvf::PvfPrepData, - worker_common::{ - framed_recv, framed_send, path_to_bytes, spawn_with_program_path, tmpfile_in, IdleWorker, - SpawnErr, WorkerHandle, JOB_TIMEOUT_WALL_CLOCK_FACTOR, + worker_intf::{ + path_to_bytes, spawn_with_program_path, tmpfile_in, IdleWorker, SpawnErr, WorkerHandle, + JOB_TIMEOUT_WALL_CLOCK_FACTOR, }, LOG_TARGET, }; use parity_scale_codec::{Decode, Encode}; +use polkadot_node_core_pvf_common::{ + error::{PrepareError, PrepareResult}, + framed_recv, framed_send, + prepare::PrepareStats, + pvf::PvfPrepData, +}; use sp_core::hexdisplay::HexDisplay; use std::{ diff --git a/node/core/pvf/worker/src/testing.rs b/node/core/pvf/src/testing.rs similarity index 93% rename from node/core/pvf/worker/src/testing.rs rename to node/core/pvf/src/testing.rs index 7497d4aed31c..cc07d7aeef02 100644 --- a/node/core/pvf/worker/src/testing.rs +++ b/node/core/pvf/src/testing.rs @@ -19,6 +19,9 @@ //! N.B. This is not guarded with some feature flag. Overexposing items here may affect the final //! artifact even for production builds. 
+#[doc(hidden)] +pub use crate::worker_intf::{spawn_with_program_path, SpawnErr}; + use polkadot_primitives::ExecutorParams; /// A function that emulates the stitches together behaviors of the preparation and the execution @@ -27,7 +30,8 @@ pub fn validate_candidate( code: &[u8], params: &[u8], ) -> Result, Box> { - use crate::executor_intf::{prepare, prevalidate, Executor}; + use polkadot_node_core_pvf_execute_worker::Executor; + use polkadot_node_core_pvf_prepare_worker::{prepare, prevalidate}; let code = sp_maybe_compressed_blob::decompress(code, 10 * 1024 * 1024) .expect("Decompressing code failed"); diff --git a/node/core/pvf/src/worker_common.rs b/node/core/pvf/src/worker_intf.rs similarity index 100% rename from node/core/pvf/src/worker_common.rs rename to node/core/pvf/src/worker_intf.rs diff --git a/node/core/pvf/worker/tests/it/adder.rs b/node/core/pvf/tests/it/adder.rs similarity index 100% rename from node/core/pvf/worker/tests/it/adder.rs rename to node/core/pvf/tests/it/adder.rs diff --git a/node/core/pvf/worker/tests/it/main.rs b/node/core/pvf/tests/it/main.rs similarity index 100% rename from node/core/pvf/worker/tests/it/main.rs rename to node/core/pvf/tests/it/main.rs diff --git a/node/core/pvf/worker/tests/it/worker_common.rs b/node/core/pvf/tests/it/worker_common.rs similarity index 100% rename from node/core/pvf/worker/tests/it/worker_common.rs rename to node/core/pvf/tests/it/worker_common.rs diff --git a/node/core/pvf/worker/src/lib.rs b/node/core/pvf/worker/src/lib.rs deleted file mode 100644 index 456362cf8f57..000000000000 --- a/node/core/pvf/worker/src/lib.rs +++ /dev/null @@ -1,73 +0,0 @@ -// Copyright (C) Parity Technologies (UK) Ltd. -// This file is part of Polkadot. - -// Polkadot is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. 
- -// Polkadot is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see . - -mod common; -mod execute; -mod executor_intf; -mod memory_stats; -mod prepare; - -#[doc(hidden)] -pub mod testing; - -#[doc(hidden)] -pub use sp_tracing; - -pub use execute::worker_entrypoint as execute_worker_entrypoint; -pub use prepare::worker_entrypoint as prepare_worker_entrypoint; - -pub use executor_intf::{prepare, prevalidate}; - -// NOTE: Initializing logging in e.g. tests will not have an effect in the workers, as they are -// separate spawned processes. Run with e.g. `RUST_LOG=parachain::pvf-worker=trace`. -const LOG_TARGET: &str = "parachain::pvf-worker"; - -/// Use this macro to declare a `fn main() {}` that will create an executable that can be used for -/// spawning the desired worker. -#[macro_export(local_inner_macros)] -macro_rules! decl_worker_main { - ($command:tt) => { - fn main() { - $crate::sp_tracing::try_init_simple(); - - let args = std::env::args().collect::>(); - - let mut version = None; - let mut socket_path: &str = ""; - - for i in 1..args.len() { - match args[i].as_ref() { - "--socket-path" => socket_path = args[i + 1].as_str(), - "--node-version" => version = Some(args[i + 1].as_str()), - _ => (), - } - } - - decl_worker_main_command!($command, socket_path, version) - } - }; -} - -#[macro_export] -#[doc(hidden)] -macro_rules! 
decl_worker_main_command { - (prepare, $socket_path:expr, $version: expr) => { - $crate::prepare_worker_entrypoint(&$socket_path, $version) - }; - (execute, $socket_path:expr, $version: expr) => { - $crate::execute_worker_entrypoint(&$socket_path, $version) - }; -} diff --git a/node/malus/Cargo.toml b/node/malus/Cargo.toml index a36822b041a3..8e23e623174f 100644 --- a/node/malus/Cargo.toml +++ b/node/malus/Cargo.toml @@ -20,7 +20,8 @@ polkadot-node-subsystem-types = { path = "../subsystem-types" } polkadot-node-core-dispute-coordinator = { path = "../core/dispute-coordinator" } polkadot-node-core-candidate-validation = { path = "../core/candidate-validation" } polkadot-node-core-backing = { path = "../core/backing" } -polkadot-node-core-pvf-worker = { path = "../core/pvf/worker" } +polkadot-node-core-pvf-execute-worker = { path = "../core/pvf/execute-worker" } +polkadot-node-core-pvf-prepare-worker = { path = "../core/pvf/prepare-worker" } polkadot-node-primitives = { path = "../primitives" } polkadot-primitives = { path = "../../primitives" } color-eyre = { version = "0.6.1", default-features = false } diff --git a/node/malus/src/malus.rs b/node/malus/src/malus.rs index 36cf0cca06bf..d09f8be990a4 100644 --- a/node/malus/src/malus.rs +++ b/node/malus/src/malus.rs @@ -97,7 +97,7 @@ impl MalusCli { #[cfg(not(target_os = "android"))] { - polkadot_node_core_pvf_worker::prepare_worker_entrypoint( + polkadot_node_core_pvf_prepare_worker::worker_entrypoint( &cmd.socket_path, None, ); @@ -111,7 +111,7 @@ impl MalusCli { #[cfg(not(target_os = "android"))] { - polkadot_node_core_pvf_worker::execute_worker_entrypoint( + polkadot_node_core_pvf_execute_worker::worker_entrypoint( &cmd.socket_path, None, ); diff --git a/node/test/performance-test/Cargo.toml b/node/test/performance-test/Cargo.toml index 70f072c03ae1..4e3001b3ee66 100644 --- a/node/test/performance-test/Cargo.toml +++ b/node/test/performance-test/Cargo.toml @@ -10,7 +10,7 @@ quote = "1.0.26" env_logger = "0.9" log 
= "0.4" -polkadot-node-core-pvf-worker = { path = "../../core/pvf/worker" } +polkadot-node-core-pvf-prepare-worker = { path = "../../core/pvf/prepare-worker" } polkadot-erasure-coding = { path = "../../../erasure-coding" } polkadot-node-primitives = { path = "../../primitives" } polkadot-primitives = { path = "../../../primitives" } diff --git a/node/test/performance-test/src/lib.rs b/node/test/performance-test/src/lib.rs index 1afa43cc62ba..15073912654a 100644 --- a/node/test/performance-test/src/lib.rs +++ b/node/test/performance-test/src/lib.rs @@ -65,9 +65,9 @@ pub fn measure_pvf_prepare(wasm_code: &[u8]) -> Result .or(Err(PerfCheckError::CodeDecompressionFailed))?; // Recreate the pipeline from the pvf prepare worker. - let blob = - polkadot_node_core_pvf_worker::prevalidate(code.as_ref()).map_err(PerfCheckError::from)?; - polkadot_node_core_pvf_worker::prepare(blob, &ExecutorParams::default()) + let blob = polkadot_node_core_pvf_prepare_worker::prevalidate(code.as_ref()) + .map_err(PerfCheckError::from)?; + polkadot_node_core_pvf_prepare_worker::prepare(blob, &ExecutorParams::default()) .map_err(PerfCheckError::from)?; Ok(start.elapsed()) diff --git a/parachain/test-parachains/adder/collator/Cargo.toml b/parachain/test-parachains/adder/collator/Cargo.toml index ee20cb0b0d17..7fe4aefc688d 100644 --- a/parachain/test-parachains/adder/collator/Cargo.toml +++ b/parachain/test-parachains/adder/collator/Cargo.toml @@ -34,7 +34,7 @@ sc-service = { git = "https://github.com/paritytech/substrate", branch = "master # This one is tricky. Even though it is not used directly by the collator, we still need it for the # `puppet_worker` binary, which is required for the integration test. However, this shouldn't be # a big problem since it is used transitively anyway. -polkadot-node-core-pvf-worker = { path = "../../../../node/core/pvf/worker" } +polkadot-node-core-pvf = { path = "../../../../node/core/pvf" } [dev-dependencies] polkadot-parachain = { path = "../../.." 
} diff --git a/parachain/test-parachains/adder/collator/bin/puppet_worker.rs b/parachain/test-parachains/adder/collator/bin/puppet_worker.rs index ddd81971292b..7f93519d8454 100644 --- a/parachain/test-parachains/adder/collator/bin/puppet_worker.rs +++ b/parachain/test-parachains/adder/collator/bin/puppet_worker.rs @@ -14,4 +14,4 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -polkadot_node_core_pvf_worker::decl_puppet_worker_main!(); +polkadot_node_core_pvf::decl_puppet_worker_main!(); diff --git a/parachain/test-parachains/adder/collator/src/lib.rs b/parachain/test-parachains/adder/collator/src/lib.rs index 4b2b9248de22..02a4598f9e47 100644 --- a/parachain/test-parachains/adder/collator/src/lib.rs +++ b/parachain/test-parachains/adder/collator/src/lib.rs @@ -272,7 +272,7 @@ mod tests { } fn validate_collation(collator: &Collator, parent_head: HeadData, collation: Collation) { - use polkadot_node_core_pvf_worker::testing::validate_candidate; + use polkadot_node_core_pvf::testing::validate_candidate; let block_data = match collation.proof_of_validity { MaybeCompressedPoV::Raw(pov) => pov.block_data, diff --git a/parachain/test-parachains/undying/collator/Cargo.toml b/parachain/test-parachains/undying/collator/Cargo.toml index 1b2ccf3be0ca..2b9d80401f5d 100644 --- a/parachain/test-parachains/undying/collator/Cargo.toml +++ b/parachain/test-parachains/undying/collator/Cargo.toml @@ -34,7 +34,7 @@ sc-service = { git = "https://github.com/paritytech/substrate", branch = "master # This one is tricky. Even though it is not used directly by the collator, we still need it for the # `puppet_worker` binary, which is required for the integration test. However, this shouldn't be # a big problem since it is used transitively anyway. 
-polkadot-node-core-pvf-worker = { path = "../../../../node/core/pvf/worker" } +polkadot-node-core-pvf = { path = "../../../../node/core/pvf" } [dev-dependencies] polkadot-parachain = { path = "../../.." } diff --git a/parachain/test-parachains/undying/collator/bin/puppet_worker.rs b/parachain/test-parachains/undying/collator/bin/puppet_worker.rs index ddd81971292b..7f93519d8454 100644 --- a/parachain/test-parachains/undying/collator/bin/puppet_worker.rs +++ b/parachain/test-parachains/undying/collator/bin/puppet_worker.rs @@ -14,4 +14,4 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -polkadot_node_core_pvf_worker::decl_puppet_worker_main!(); +polkadot_node_core_pvf::decl_puppet_worker_main!(); diff --git a/parachain/test-parachains/undying/collator/src/lib.rs b/parachain/test-parachains/undying/collator/src/lib.rs index dcaf9b63296d..838590fa16f5 100644 --- a/parachain/test-parachains/undying/collator/src/lib.rs +++ b/parachain/test-parachains/undying/collator/src/lib.rs @@ -354,7 +354,7 @@ mod tests { } fn validate_collation(collator: &Collator, parent_head: HeadData, collation: Collation) { - use polkadot_node_core_pvf_worker::testing::validate_candidate; + use polkadot_node_core_pvf::testing::validate_candidate; let block_data = match collation.proof_of_validity { MaybeCompressedPoV::Raw(pov) => pov.block_data,