diff --git a/node/core/pvf/src/artifacts.rs b/node/core/pvf/src/artifacts.rs index 1a3429f7ab11..f158dbfa6c8b 100644 --- a/node/core/pvf/src/artifacts.rs +++ b/node/core/pvf/src/artifacts.rs @@ -23,33 +23,44 @@ use std::{ time::{Duration, SystemTime}, }; -/// A final product of preparation process. Contains either a ready to run compiled artifact or -/// a description what went wrong. -#[derive(Encode, Decode)] -pub enum Artifact { +/// An error that occurred during the prepare part of the PVF pipeline. +#[derive(Debug, Clone, Encode, Decode)] +pub enum PrepareError { /// During the prevalidation stage of preparation an issue was found with the PVF. - PrevalidationErr(String), + Prevalidation(String), /// Compilation failed for the given PVF. - PreparationErr(String), + Preparation(String), /// This state indicates that the process assigned to prepare the artifact wasn't responsible /// or were killed. This state is reported by the validation host (not by the worker). - DidntMakeIt, - /// The PVF passed all the checks and is ready for execution. - Compiled { compiled_artifact: Vec<u8> }, + DidNotMakeIt, } -impl Artifact { - /// Serializes this struct into a byte buffer. - pub fn serialize(&self) -> Vec<u8> { - self.encode() +/// A wrapper for the compiled PVF code. +#[derive(Encode, Decode)] +pub struct CompiledArtifact(Vec<u8>); + +impl CompiledArtifact { + pub fn new(code: Vec<u8>) -> Self { + Self(code) } +} - /// Deserialize the given byte buffer to an artifact. - pub fn deserialize(mut bytes: &[u8]) -> Result<Self, String> { - Artifact::decode(&mut bytes).map_err(|e| format!("{:?}", e)) +impl AsRef<[u8]> for CompiledArtifact { + fn as_ref(&self) -> &[u8] { + self.0.as_slice() } } +/// A final product of preparation process. Contains either a ready to run compiled artifact or +/// a description of what went wrong. +#[derive(Encode, Decode)] +pub enum Artifact { + /// An error occurred during the prepare part of the PVF pipeline. 
+ Error(PrepareError), + /// The PVF passed all the checks and is ready for execution. + Compiled(CompiledArtifact), +} + /// Identifier of an artifact. Right now it only encodes a code hash of the PVF. But if we get to /// multiple engine implementations the artifact ID should include the engine type as well. #[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] @@ -117,6 +128,9 @@ pub enum ArtifactState { }, /// A task to prepare this artifact is scheduled. Preparing, + /// The code couldn't be compiled due to an error. Such artifacts + /// never reach the executor and stay in the host's memory. + FailedToProcess(PrepareError), } /// A container of all known artifact ids and their states. diff --git a/node/core/pvf/src/error.rs b/node/core/pvf/src/error.rs index f0ba95515054..d82564ca681d 100644 --- a/node/core/pvf/src/error.rs +++ b/node/core/pvf/src/error.rs @@ -14,6 +14,8 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see <http://www.gnu.org/licenses/>. +use crate::artifacts::PrepareError; + /// A error raised during validation of the candidate. #[derive(Debug, Clone)] pub enum ValidationError { @@ -54,3 +56,14 @@ pub enum InvalidCandidate { /// PVF execution (compilation is not included) took more time than was allotted. HardTimeout, } + +impl From<PrepareError> for ValidationError { + fn from(error: PrepareError) -> Self { + let error_str = match error { + PrepareError::Prevalidation(err) => err, + PrepareError::Preparation(err) => err, + PrepareError::DidNotMakeIt => "preparation timeout".to_owned(), + }; + ValidationError::InvalidCandidate(InvalidCandidate::WorkerReportedError(error_str)) + } +} diff --git a/node/core/pvf/src/execute/worker.rs b/node/core/pvf/src/execute/worker.rs index 97fe5aec3dbf..1dd61442b931 100644 --- a/node/core/pvf/src/execute/worker.rs +++ b/node/core/pvf/src/execute/worker.rs @@ -15,7 +15,7 @@ // along with Polkadot. If not, see <http://www.gnu.org/licenses/>. 
use crate::{ - artifacts::{Artifact, ArtifactPathId}, + artifacts::{ArtifactPathId, CompiledArtifact}, executor_intf::TaskExecutor, worker_common::{ bytes_to_path, framed_recv, framed_send, path_to_bytes, spawn_with_program_path, @@ -217,18 +217,12 @@ async fn validate_using_artifact( Ok(b) => b, }; - let artifact = match Artifact::deserialize(&artifact_bytes) { + let artifact = match CompiledArtifact::decode(&mut artifact_bytes.as_slice()) { Err(e) => return Response::InternalError(format!("artifact deserialization: {:?}", e)), Ok(a) => a, }; - let compiled_artifact = match &artifact { - Artifact::PrevalidationErr(msg) => return Response::format_invalid("prevalidation", msg), - Artifact::PreparationErr(msg) => return Response::format_invalid("preparation", msg), - Artifact::DidntMakeIt => return Response::format_invalid("preparation timeout", ""), - - Artifact::Compiled { compiled_artifact } => compiled_artifact, - }; + let compiled_artifact = artifact.as_ref(); let validation_started_at = Instant::now(); let descriptor_bytes = match unsafe { diff --git a/node/core/pvf/src/host.rs b/node/core/pvf/src/host.rs index 89b230bc90d7..69b8286c62d3 100644 --- a/node/core/pvf/src/host.rs +++ b/node/core/pvf/src/host.rs @@ -327,8 +327,7 @@ async fn run( .await); }, from_prepare_queue = from_prepare_queue_rx.next() => { - let prepare::FromQueue::Prepared(artifact_id) - = break_if_fatal!(from_prepare_queue.ok_or(Fatal)); + let from_queue = break_if_fatal!(from_prepare_queue.ok_or(Fatal)); // Note that preparation always succeeds. 
// @@ -344,7 +343,7 @@ async fn run( &mut artifacts, &mut to_execute_queue_tx, &mut awaiting_prepare, - artifact_id, + from_queue, ).await); }, } @@ -419,6 +418,9 @@ async fn handle_execute_pvf( awaiting_prepare.add(artifact_id, params, result_tx); }, + ArtifactState::FailedToProcess(error) => { + let _ = result_tx.send(Err(ValidationError::from(error.clone()))); + }, } } else { // Artifact is unknown: register it and enqueue a job with the corresponding priority and @@ -450,6 +452,7 @@ async fn handle_heads_up( // Already preparing. We don't need to send a priority amend either because // it can't get any lower than the background. }, + ArtifactState::FailedToProcess(_) => {}, } } else { // The artifact is unknown: register it and put a background job into the prepare queue. @@ -471,8 +474,10 @@ async fn handle_prepare_done( artifacts: &mut Artifacts, execute_queue: &mut mpsc::Sender, awaiting_prepare: &mut AwaitingPrepare, - artifact_id: ArtifactId, + from_queue: prepare::FromQueue, ) -> Result<(), Fatal> { + let prepare::FromQueue { artifact_id, result } = from_queue; + // Make some sanity checks and extract the current state. let state = match artifacts.artifact_state_mut(&artifact_id) { None => { @@ -493,9 +498,21 @@ async fn handle_prepare_done( never!("the artifact is already prepared: {:?}", artifact_id); return Ok(()) }, + Some(ArtifactState::FailedToProcess(_)) => { + // The reasoning is similar to the above, the artifact cannot be + // processed at this point. + never!("the artifact is already processed unsuccessfully: {:?}", artifact_id); + return Ok(()) + }, Some(state @ ArtifactState::Preparing) => state, }; + // Don't send failed artifacts to the execution's queue. + if let Err(error) = result { + *state = ArtifactState::FailedToProcess(error); + return Ok(()) + } + // It's finally time to dispatch all the execution requests that were waiting for this artifact // to be prepared. 
let pending_requests = awaiting_prepare.take(&artifact_id); @@ -895,7 +912,7 @@ mod tests { ); test.from_prepare_queue_tx - .send(prepare::FromQueue::Prepared(artifact_id(1))) + .send(prepare::FromQueue { artifact_id: artifact_id(1), result: Ok(()) }) .await .unwrap(); let result_tx_pvf_1_1 = assert_matches!( @@ -908,7 +925,7 @@ mod tests { ); test.from_prepare_queue_tx - .send(prepare::FromQueue::Prepared(artifact_id(2))) + .send(prepare::FromQueue { artifact_id: artifact_id(2), result: Ok(()) }) .await .unwrap(); let result_tx_pvf_2 = assert_matches!( @@ -957,7 +974,7 @@ mod tests { ); test.from_prepare_queue_tx - .send(prepare::FromQueue::Prepared(artifact_id(1))) + .send(prepare::FromQueue { artifact_id: artifact_id(1), result: Ok(()) }) .await .unwrap(); diff --git a/node/core/pvf/src/prepare/pool.rs b/node/core/pvf/src/prepare/pool.rs index 035d799ac594..e0dae299d318 100644 --- a/node/core/pvf/src/prepare/pool.rs +++ b/node/core/pvf/src/prepare/pool.rs @@ -16,6 +16,7 @@ use super::worker::{self, Outcome}; use crate::{ + artifacts::PrepareError, metrics::Metrics, worker_common::{IdleWorker, WorkerHandle}, LOG_TARGET, @@ -80,7 +81,13 @@ pub enum FromPool { /// The given worker either succeeded or failed the given job. Under any circumstances the /// artifact file has been written. The `bool` says whether the worker ripped. - Concluded(Worker, bool), + Concluded { + worker: Worker, + rip: bool, + /// [`Ok`] indicates that compiled artifact is successfully stored on disk. + /// Otherwise, an [error](PrepareError) is supplied. + result: Result<(), PrepareError>, + }, /// The given worker ceased to exist. 
Rip(Worker), @@ -295,7 +302,7 @@ fn handle_mux( }, PoolEvent::StartWork(worker, outcome) => { match outcome { - Outcome::Concluded(idle) => { + Outcome::Concluded { worker: idle, result } => { let data = match spawned.get_mut(worker) { None => { // Perhaps the worker was killed meanwhile and the result is no longer @@ -310,7 +317,7 @@ fn handle_mux( let old = data.idle.replace(idle); assert_matches!(old, None, "attempt to overwrite an idle worker"); - reply(from_pool, FromPool::Concluded(worker, false))?; + reply(from_pool, FromPool::Concluded { worker, rip: false, result })?; Ok(()) }, @@ -321,9 +328,16 @@ fn handle_mux( Ok(()) }, - Outcome::DidntMakeIt => { + Outcome::DidNotMakeIt => { if attempt_retire(metrics, spawned, worker) { - reply(from_pool, FromPool::Concluded(worker, true))?; + reply( + from_pool, + FromPool::Concluded { + worker, + rip: true, + result: Err(PrepareError::DidNotMakeIt), + }, + )?; } Ok(()) diff --git a/node/core/pvf/src/prepare/queue.rs b/node/core/pvf/src/prepare/queue.rs index 4ffa21de435b..076a2b351a1e 100644 --- a/node/core/pvf/src/prepare/queue.rs +++ b/node/core/pvf/src/prepare/queue.rs @@ -17,7 +17,11 @@ //! A queue that handles requests for PVF preparation. use super::pool::{self, Worker}; -use crate::{artifacts::ArtifactId, metrics::Metrics, Priority, Pvf, LOG_TARGET}; +use crate::{ + artifacts::{ArtifactId, PrepareError}, + metrics::Metrics, + Priority, Pvf, LOG_TARGET, +}; use always_assert::{always, never}; use async_std::path::PathBuf; use futures::{channel::mpsc, stream::StreamExt as _, Future, SinkExt}; @@ -37,9 +41,13 @@ pub enum ToQueue { } /// A response from queue. -#[derive(Debug, PartialEq, Eq)] -pub enum FromQueue { - Prepared(ArtifactId), +#[derive(Debug)] +pub struct FromQueue { + /// Identifier of an artifact. + pub(crate) artifact_id: ArtifactId, + /// Outcome of the PVF processing. [`Ok`] indicates that compiled artifact + /// is successfully stored on disk. Otherwise, an [error](PrepareError) is supplied. 
+ pub(crate) result: Result<(), PrepareError>, } #[derive(Default)] @@ -299,7 +307,8 @@ async fn handle_from_pool(queue: &mut Queue, from_pool: pool::FromPool) -> Resul use pool::FromPool::*; match from_pool { Spawned(worker) => handle_worker_spawned(queue, worker).await?, - Concluded(worker, rip) => handle_worker_concluded(queue, worker, rip).await?, + Concluded { worker, rip, result } => + handle_worker_concluded(queue, worker, rip, result).await?, Rip(worker) => handle_worker_rip(queue, worker).await?, } Ok(()) @@ -320,6 +329,7 @@ async fn handle_worker_concluded( queue: &mut Queue, worker: Worker, rip: bool, + result: Result<(), PrepareError>, ) -> Result<(), Fatal> { queue.metrics.prepare_concluded(); @@ -377,7 +387,7 @@ async fn handle_worker_concluded( "prepare worker concluded", ); - reply(&mut queue.from_queue_tx, FromQueue::Prepared(artifact_id))?; + reply(&mut queue.from_queue_tx, FromQueue { artifact_id, result })?; // Figure out what to do with the worker. if rip { @@ -641,12 +651,9 @@ mod tests { let w = test.workers.insert(()); test.send_from_pool(pool::FromPool::Spawned(w)); - test.send_from_pool(pool::FromPool::Concluded(w, false)); + test.send_from_pool(pool::FromPool::Concluded { worker: w, rip: false, result: Ok(()) }); - assert_eq!( - test.poll_and_recv_from_queue().await, - FromQueue::Prepared(pvf(1).as_artifact_id()) - ); + assert_eq!(test.poll_and_recv_from_queue().await.artifact_id, pvf(1).as_artifact_id()); } #[async_std::test] @@ -671,7 +678,7 @@ mod tests { assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. }); assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. }); - test.send_from_pool(pool::FromPool::Concluded(w1, false)); + test.send_from_pool(pool::FromPool::Concluded { worker: w1, rip: false, result: Ok(()) }); assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. 
}); @@ -704,7 +711,7 @@ mod tests { // That's a bit silly in this context, but in production there will be an entire pool up // to the `soft_capacity` of workers and it doesn't matter which one to cull. Either way, // we just check that edge case of an edge case works. - test.send_from_pool(pool::FromPool::Concluded(w1, false)); + test.send_from_pool(pool::FromPool::Concluded { worker: w1, rip: false, result: Ok(()) }); assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Kill(w1)); } @@ -749,15 +756,12 @@ mod tests { assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. }); // Conclude worker 1 and rip it. - test.send_from_pool(pool::FromPool::Concluded(w1, true)); + test.send_from_pool(pool::FromPool::Concluded { worker: w1, rip: true, result: Ok(()) }); // Since there is still work, the queue requested one extra worker to spawn to handle the // remaining enqueued work items. assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); - assert_eq!( - test.poll_and_recv_from_queue().await, - FromQueue::Prepared(pvf(1).as_artifact_id()) - ); + assert_eq!(test.poll_and_recv_from_queue().await.artifact_id, pvf(1).as_artifact_id()); } #[async_std::test] @@ -773,7 +777,7 @@ mod tests { assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. }); - test.send_from_pool(pool::FromPool::Concluded(w1, true)); + test.send_from_pool(pool::FromPool::Concluded { worker: w1, rip: true, result: Ok(()) }); test.poll_ensure_to_pool_is_empty().await; } diff --git a/node/core/pvf/src/prepare/worker.rs b/node/core/pvf/src/prepare/worker.rs index 510d582f7e03..108d6fa892dd 100644 --- a/node/core/pvf/src/prepare/worker.rs +++ b/node/core/pvf/src/prepare/worker.rs @@ -15,7 +15,7 @@ // along with Polkadot. If not, see . 
use crate::{ - artifacts::Artifact, + artifacts::{Artifact, CompiledArtifact, PrepareError}, worker_common::{ bytes_to_path, framed_recv, framed_send, path_to_bytes, spawn_with_program_path, tmpfile_in, worker_event_loop, IdleWorker, SpawnErr, WorkerHandle, @@ -29,6 +29,7 @@ use async_std::{ }; use futures::FutureExt as _; use futures_timer::Delay; +use parity_scale_codec::{Decode, Encode}; use std::{sync::Arc, time::Duration}; const NICENESS_BACKGROUND: i32 = 10; @@ -48,7 +49,7 @@ pub async fn spawn( pub enum Outcome { /// The worker has finished the work assigned to it. - Concluded(IdleWorker), + Concluded { worker: IdleWorker, result: Result<(), PrepareError> }, /// The host tried to reach the worker but failed. This is most likely because the worked was /// killed by the system. Unreachable, @@ -59,7 +60,7 @@ pub enum Outcome { /// the artifact). /// /// This doesn't return an idle worker instance, thus this worker is no longer usable. - DidntMakeIt, + DidNotMakeIt, } /// Given the idle token of a worker and parameters of work, communicates with the worker and @@ -105,7 +106,7 @@ pub async fn start_work( #[derive(Debug)] enum Selected { - Done, + Done(Result<(), PrepareError>), IoErr, Deadline, } @@ -124,7 +125,7 @@ pub async fn start_work( async_std::fs::rename(&tmp_file, &artifact_path) .await - .map(|_| Selected::Done) + .map(|_| Selected::Done(Ok(()))) .unwrap_or_else(|err| { tracing::warn!( target: LOG_TARGET, @@ -139,15 +140,20 @@ pub async fn start_work( } Ok(response_bytes) => { use sp_core::hexdisplay::HexDisplay; - let bound_bytes = - &response_bytes[..response_bytes.len().min(4)]; - tracing::warn!( - target: LOG_TARGET, - worker_pid = %pid, - "received unexpected response from the prepare worker: {}", - HexDisplay::from(&bound_bytes), - ); - Selected::IoErr + // By convention we expect encoded prepare error. + // If bytes cannot be deserialized, return io error. 
+ if let Ok(error) = PrepareError::decode(&mut response_bytes.clone().as_slice()) { + Selected::Done(Err(error)) + } else { + let bound_bytes = &response_bytes[..response_bytes.len().min(4)]; + tracing::warn!( + target: LOG_TARGET, + worker_pid = %pid, + "received unexpected response from the prepare worker: {}", + HexDisplay::from(&bound_bytes), + ); + Selected::IoErr + } }, Err(err) => { tracing::warn!( @@ -164,24 +170,11 @@ pub async fn start_work( }; match selected { - Selected::Done => { + Selected::Done(result) => { renice(pid, NICENESS_FOREGROUND); - Outcome::Concluded(IdleWorker { stream, pid }) - }, - Selected::IoErr | Selected::Deadline => { - let bytes = Artifact::DidntMakeIt.serialize(); - // best effort: there is nothing we can do here if the write fails. - if let Err(err) = async_std::fs::write(&artifact_path, &bytes).await { - tracing::warn!( - target: LOG_TARGET, - worker_pid = %pid, - "preparation didn't make it, because of `{:?}`: {:?}", - selected, - err, - ); - } - Outcome::DidntMakeIt + Outcome::Concluded { worker: IdleWorker { stream, pid }, result } }, + Selected::IoErr | Selected::Deadline => Outcome::DidNotMakeIt, } }) .await @@ -205,7 +198,7 @@ where "failed to create a temp file for the artifact: {:?}", err, ); - return Outcome::DidntMakeIt + return Outcome::DidNotMakeIt }, }; @@ -288,31 +281,43 @@ pub fn worker_entrypoint(socket_path: &str) { worker_pid = %std::process::id(), "worker: preparing artifact", ); - let artifact_bytes = prepare_artifact(&code).serialize(); - // Write the serialized artifact into into a temp file. - tracing::debug!( - target: LOG_TARGET, - worker_pid = %std::process::id(), - "worker: writing artifact to {}", - dest.display(), - ); - async_std::fs::write(&dest, &artifact_bytes).await?; + let bytes = match prepare_artifact(&code) { + Artifact::Error(err) => { + // Serialized error will be written into the socket. 
+ err.encode() + }, + Artifact::Compiled(compiled_artifact) => { + // Write the serialized artifact into a temp file. + // Since a compiled artifact can be heavy, we only send a single + // byte to indicate the success. + let artifact_bytes = compiled_artifact.encode(); + + tracing::debug!( + target: LOG_TARGET, + worker_pid = %std::process::id(), + "worker: writing artifact to {}", + dest.display(), + ); + async_std::fs::write(&dest, &artifact_bytes).await?; + + vec![1u8] + }, + }; - // Return back a byte that signals finishing the work. - framed_send(&mut stream, &[1u8]).await?; + framed_send(&mut stream, &bytes).await?; } }); } fn prepare_artifact(code: &[u8]) -> Artifact { let blob = match crate::executor_intf::prevalidate(code) { - Err(err) => return Artifact::PrevalidationErr(format!("{:?}", err)), + Err(err) => return Artifact::Error(PrepareError::Prevalidation(format!("{:?}", err))), Ok(b) => b, }; match crate::executor_intf::prepare(blob) { - Ok(compiled_artifact) => Artifact::Compiled { compiled_artifact }, - Err(err) => Artifact::PreparationErr(format!("{:?}", err)), + Ok(compiled_artifact) => Artifact::Compiled(CompiledArtifact::new(compiled_artifact)), + Err(err) => Artifact::Error(PrepareError::Preparation(format!("{:?}", err))), } }