From f271c091c31ad702bcc479580da69437fbe25187 Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Wed, 22 Sep 2021 21:11:22 +0300 Subject: [PATCH 01/10] pvf host: store only compiled artifacts on disk --- node/core/pvf/src/artifacts.rs | 46 +++++++++----- node/core/pvf/src/error.rs | 13 ++++ node/core/pvf/src/execute/worker.rs | 12 +--- node/core/pvf/src/host.rs | 31 +++++++--- node/core/pvf/src/prepare/pool.rs | 24 ++++++-- node/core/pvf/src/prepare/queue.rs | 42 +++++++------ node/core/pvf/src/prepare/worker.rs | 95 +++++++++++++++-------------- 7 files changed, 162 insertions(+), 101 deletions(-) diff --git a/node/core/pvf/src/artifacts.rs b/node/core/pvf/src/artifacts.rs index 1a3429f7ab11..f158dbfa6c8b 100644 --- a/node/core/pvf/src/artifacts.rs +++ b/node/core/pvf/src/artifacts.rs @@ -23,33 +23,44 @@ use std::{ time::{Duration, SystemTime}, }; -/// A final product of preparation process. Contains either a ready to run compiled artifact or -/// a description what went wrong. -#[derive(Encode, Decode)] -pub enum Artifact { +/// An error that occurred during the prepare part of the PVF pipeline. +#[derive(Debug, Clone, Encode, Decode)] +pub enum PrepareError { /// During the prevalidation stage of preparation an issue was found with the PVF. - PrevalidationErr(String), + Prevalidation(String), /// Compilation failed for the given PVF. - PreparationErr(String), + Preparation(String), /// This state indicates that the process assigned to prepare the artifact wasn't responsible /// or were killed. This state is reported by the validation host (not by the worker). - DidntMakeIt, - /// The PVF passed all the checks and is ready for execution. - Compiled { compiled_artifact: Vec }, + DidNotMakeIt, } -impl Artifact { - /// Serializes this struct into a byte buffer. - pub fn serialize(&self) -> Vec { - self.encode() +/// A wrapper for the compiled PVF code. +#[derive(Encode, Decode)] +pub struct CompiledArtifact(Vec); + +impl CompiledArtifact { + pub fn new(code: Vec) -> Self { + Self(code) } +} - /// Deserialize the given byte buffer to an artifact. - pub fn deserialize(mut bytes: &[u8]) -> Result { - Artifact::decode(&mut bytes).map_err(|e| format!("{:?}", e)) +impl AsRef<[u8]> for CompiledArtifact { + fn as_ref(&self) -> &[u8] { + self.0.as_slice() } } +/// A final product of preparation process. Contains either a ready to run compiled artifact or +/// a description what went wrong. +#[derive(Encode, Decode)] +pub enum Artifact { + /// An error occurred during the prepare part of the PVF pipeline. + Error(PrepareError), + /// The PVF passed all the checks and is ready for execution. + Compiled(CompiledArtifact), +} + /// Identifier of an artifact. Right now it only encodes a code hash of the PVF. But if we get to /// multiple engine implementations the artifact ID should include the engine type as well. #[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] @@ -117,6 +128,9 @@ pub enum ArtifactState { }, /// A task to prepare this artifact is scheduled. Preparing, + /// The code couldn't be compiled due to an error. Such artifacts + /// never reach the executor and stay in the host's memory. + FailedToProcess(PrepareError), } /// A container of all known artifact ids and their states. diff --git a/node/core/pvf/src/error.rs b/node/core/pvf/src/error.rs index f0ba95515054..d82564ca681d 100644 --- a/node/core/pvf/src/error.rs +++ b/node/core/pvf/src/error.rs @@ -14,6 +14,8 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . 
+use crate::artifacts::PrepareError; + /// A error raised during validation of the candidate. #[derive(Debug, Clone)] pub enum ValidationError { @@ -54,3 +56,14 @@ pub enum InvalidCandidate { /// PVF execution (compilation is not included) took more time than was allotted. HardTimeout, } + +impl From for ValidationError { + fn from(error: PrepareError) -> Self { + let error_str = match error { + PrepareError::Prevalidation(err) => err, + PrepareError::Preparation(err) => err, + PrepareError::DidNotMakeIt => "preparation timeout".to_owned(), + }; + ValidationError::InvalidCandidate(InvalidCandidate::WorkerReportedError(error_str)) + } +} diff --git a/node/core/pvf/src/execute/worker.rs b/node/core/pvf/src/execute/worker.rs index 97fe5aec3dbf..1dd61442b931 100644 --- a/node/core/pvf/src/execute/worker.rs +++ b/node/core/pvf/src/execute/worker.rs @@ -15,7 +15,7 @@ // along with Polkadot. If not, see . use crate::{ - artifacts::{Artifact, ArtifactPathId}, + artifacts::{ArtifactPathId, CompiledArtifact}, executor_intf::TaskExecutor, worker_common::{ bytes_to_path, framed_recv, framed_send, path_to_bytes, spawn_with_program_path, @@ -217,18 +217,12 @@ async fn validate_using_artifact( Ok(b) => b, }; - let artifact = match Artifact::deserialize(&artifact_bytes) { + let artifact = match CompiledArtifact::decode(&mut artifact_bytes.as_slice()) { Err(e) => return Response::InternalError(format!("artifact deserialization: {:?}", e)), Ok(a) => a, }; - let compiled_artifact = match &artifact { - Artifact::PrevalidationErr(msg) => return Response::format_invalid("prevalidation", msg), - Artifact::PreparationErr(msg) => return Response::format_invalid("preparation", msg), - Artifact::DidntMakeIt => return Response::format_invalid("preparation timeout", ""), - - Artifact::Compiled { compiled_artifact } => compiled_artifact, - }; + let compiled_artifact = artifact.as_ref(); let validation_started_at = Instant::now(); let descriptor_bytes = match unsafe { diff --git a/node/core/pvf/src/host.rs b/node/core/pvf/src/host.rs index 89b230bc90d7..69b8286c62d3 100644 --- a/node/core/pvf/src/host.rs +++ b/node/core/pvf/src/host.rs @@ -327,8 +327,7 @@ async fn run( .await); }, from_prepare_queue = from_prepare_queue_rx.next() => { - let prepare::FromQueue::Prepared(artifact_id) - = break_if_fatal!(from_prepare_queue.ok_or(Fatal)); + let from_queue = break_if_fatal!(from_prepare_queue.ok_or(Fatal)); // Note that preparation always succeeds. // @@ -344,7 +343,7 @@ async fn run( &mut artifacts, &mut to_execute_queue_tx, &mut awaiting_prepare, - artifact_id, + from_queue, ).await); }, } @@ -419,6 +418,9 @@ async fn handle_execute_pvf( awaiting_prepare.add(artifact_id, params, result_tx); }, + ArtifactState::FailedToProcess(error) => { + let _ = result_tx.send(Err(ValidationError::from(error.clone()))); + }, } } else { // Artifact is unknown: register it and enqueue a job with the corresponding priority and @@ -450,6 +452,7 @@ async fn handle_heads_up( // Already preparing. We don't need to send a priority amend either because // it can't get any lower than the background. }, + ArtifactState::FailedToProcess(_) => {}, } } else { // The artifact is unknown: register it and put a background job into the prepare queue. 
@@ -471,8 +474,10 @@ async fn handle_prepare_done( artifacts: &mut Artifacts, execute_queue: &mut mpsc::Sender, awaiting_prepare: &mut AwaitingPrepare, - artifact_id: ArtifactId, + from_queue: prepare::FromQueue, ) -> Result<(), Fatal> { + let prepare::FromQueue { artifact_id, result } = from_queue; + // Make some sanity checks and extract the current state. let state = match artifacts.artifact_state_mut(&artifact_id) { None => { @@ -493,9 +498,21 @@ async fn handle_prepare_done( never!("the artifact is already prepared: {:?}", artifact_id); return Ok(()) }, + Some(ArtifactState::FailedToProcess(_)) => { + // The reasoning is similar to the above, the artifact cannot be + // processed at this point. + never!("the artifact is already processed unsuccessfully: {:?}", artifact_id); + return Ok(()) + }, Some(state @ ArtifactState::Preparing) => state, }; + // Don't send failed artifacts to the execution's queue. + if let Err(error) = result { + *state = ArtifactState::FailedToProcess(error); + return Ok(()) + } + // It's finally time to dispatch all the execution requests that were waiting for this artifact // to be prepared. let pending_requests = awaiting_prepare.take(&artifact_id); @@ -895,7 +912,7 @@ mod tests { ); test.from_prepare_queue_tx - .send(prepare::FromQueue::Prepared(artifact_id(1))) + .send(prepare::FromQueue { artifact_id: artifact_id(1), result: Ok(()) }) .await .unwrap(); let result_tx_pvf_1_1 = assert_matches!( @@ -908,7 +925,7 @@ mod tests { ); test.from_prepare_queue_tx - .send(prepare::FromQueue::Prepared(artifact_id(2))) + .send(prepare::FromQueue { artifact_id: artifact_id(2), result: Ok(()) }) .await .unwrap(); let result_tx_pvf_2 = assert_matches!( @@ -957,7 +974,7 @@ mod tests { ); test.from_prepare_queue_tx - .send(prepare::FromQueue::Prepared(artifact_id(1))) + .send(prepare::FromQueue { artifact_id: artifact_id(1), result: Ok(()) }) .await .unwrap(); diff --git a/node/core/pvf/src/prepare/pool.rs b/node/core/pvf/src/prepare/pool.rs index 035d799ac594..e0dae299d318 100644 --- a/node/core/pvf/src/prepare/pool.rs +++ b/node/core/pvf/src/prepare/pool.rs @@ -16,6 +16,7 @@ use super::worker::{self, Outcome}; use crate::{ + artifacts::PrepareError, metrics::Metrics, worker_common::{IdleWorker, WorkerHandle}, LOG_TARGET, @@ -80,7 +81,13 @@ pub enum FromPool { /// The given worker either succeeded or failed the given job. Under any circumstances the /// artifact file has been written. The `bool` says whether the worker ripped. - Concluded(Worker, bool), + Concluded { + worker: Worker, + rip: bool, + /// [`Ok`] indicates that compiled artifact is successfully stored on disk. + /// Otherwise, an [error](PrepareError) is supplied. + result: Result<(), PrepareError>, + }, /// The given worker ceased to exist. 
Rip(Worker), @@ -295,7 +302,7 @@ fn handle_mux( }, PoolEvent::StartWork(worker, outcome) => { match outcome { - Outcome::Concluded(idle) => { + Outcome::Concluded { worker: idle, result } => { let data = match spawned.get_mut(worker) { None => { // Perhaps the worker was killed meanwhile and the result is no longer @@ -310,7 +317,7 @@ fn handle_mux( let old = data.idle.replace(idle); assert_matches!(old, None, "attempt to overwrite an idle worker"); - reply(from_pool, FromPool::Concluded(worker, false))?; + reply(from_pool, FromPool::Concluded { worker, rip: false, result })?; Ok(()) }, @@ -321,9 +328,16 @@ fn handle_mux( Ok(()) }, - Outcome::DidntMakeIt => { + Outcome::DidNotMakeIt => { if attempt_retire(metrics, spawned, worker) { - reply(from_pool, FromPool::Concluded(worker, true))?; + reply( + from_pool, + FromPool::Concluded { + worker, + rip: true, + result: Err(PrepareError::DidNotMakeIt), + }, + )?; } Ok(()) diff --git a/node/core/pvf/src/prepare/queue.rs b/node/core/pvf/src/prepare/queue.rs index 4ffa21de435b..076a2b351a1e 100644 --- a/node/core/pvf/src/prepare/queue.rs +++ b/node/core/pvf/src/prepare/queue.rs @@ -17,7 +17,11 @@ //! A queue that handles requests for PVF preparation. use super::pool::{self, Worker}; -use crate::{artifacts::ArtifactId, metrics::Metrics, Priority, Pvf, LOG_TARGET}; +use crate::{ + artifacts::{ArtifactId, PrepareError}, + metrics::Metrics, + Priority, Pvf, LOG_TARGET, +}; use always_assert::{always, never}; use async_std::path::PathBuf; use futures::{channel::mpsc, stream::StreamExt as _, Future, SinkExt}; @@ -37,9 +41,13 @@ pub enum ToQueue { } /// A response from queue. -#[derive(Debug, PartialEq, Eq)] -pub enum FromQueue { - Prepared(ArtifactId), +#[derive(Debug)] +pub struct FromQueue { + /// Identifier of an artifact. + pub(crate) artifact_id: ArtifactId, + /// Outcome of the PVF processing. [`Ok`] indicates that compiled artifact + /// is successfully stored on disk. Otherwise, an [error](PrepareError) is supplied. + pub(crate) result: Result<(), PrepareError>, } #[derive(Default)] @@ -299,7 +307,8 @@ async fn handle_from_pool(queue: &mut Queue, from_pool: pool::FromPool) -> Resul use pool::FromPool::*; match from_pool { Spawned(worker) => handle_worker_spawned(queue, worker).await?, - Concluded(worker, rip) => handle_worker_concluded(queue, worker, rip).await?, + Concluded { worker, rip, result } => + handle_worker_concluded(queue, worker, rip, result).await?, Rip(worker) => handle_worker_rip(queue, worker).await?, } Ok(()) @@ -320,6 +329,7 @@ async fn handle_worker_concluded( queue: &mut Queue, worker: Worker, rip: bool, + result: Result<(), PrepareError>, ) -> Result<(), Fatal> { queue.metrics.prepare_concluded(); @@ -377,7 +387,7 @@ async fn handle_worker_concluded( "prepare worker concluded", ); - reply(&mut queue.from_queue_tx, FromQueue::Prepared(artifact_id))?; + reply(&mut queue.from_queue_tx, FromQueue { artifact_id, result })?; // Figure out what to do with the worker. 
if rip { @@ -641,12 +651,9 @@ mod tests { let w = test.workers.insert(()); test.send_from_pool(pool::FromPool::Spawned(w)); - test.send_from_pool(pool::FromPool::Concluded(w, false)); + test.send_from_pool(pool::FromPool::Concluded { worker: w, rip: false, result: Ok(()) }); - assert_eq!( - test.poll_and_recv_from_queue().await, - FromQueue::Prepared(pvf(1).as_artifact_id()) - ); + assert_eq!(test.poll_and_recv_from_queue().await.artifact_id, pvf(1).as_artifact_id()); } #[async_std::test] @@ -671,7 +678,7 @@ mod tests { assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. }); assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. }); - test.send_from_pool(pool::FromPool::Concluded(w1, false)); + test.send_from_pool(pool::FromPool::Concluded { worker: w1, rip: false, result: Ok(()) }); assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. }); @@ -704,7 +711,7 @@ mod tests { // That's a bit silly in this context, but in production there will be an entire pool up // to the `soft_capacity` of workers and it doesn't matter which one to cull. Either way, // we just check that edge case of an edge case works. - test.send_from_pool(pool::FromPool::Concluded(w1, false)); + test.send_from_pool(pool::FromPool::Concluded { worker: w1, rip: false, result: Ok(()) }); assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Kill(w1)); } @@ -749,15 +756,12 @@ mod tests { assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. }); // Conclude worker 1 and rip it. - test.send_from_pool(pool::FromPool::Concluded(w1, true)); + test.send_from_pool(pool::FromPool::Concluded { worker: w1, rip: true, result: Ok(()) }); // Since there is still work, the queue requested one extra worker to spawn to handle the // remaining enqueued work items. assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn); - assert_eq!( - test.poll_and_recv_from_queue().await, - FromQueue::Prepared(pvf(1).as_artifact_id()) - ); + assert_eq!(test.poll_and_recv_from_queue().await.artifact_id, pvf(1).as_artifact_id()); } #[async_std::test] @@ -773,7 +777,7 @@ mod tests { assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. }); - test.send_from_pool(pool::FromPool::Concluded(w1, true)); + test.send_from_pool(pool::FromPool::Concluded { worker: w1, rip: true, result: Ok(()) }); test.poll_ensure_to_pool_is_empty().await; } diff --git a/node/core/pvf/src/prepare/worker.rs b/node/core/pvf/src/prepare/worker.rs index 510d582f7e03..bac48142931d 100644 --- a/node/core/pvf/src/prepare/worker.rs +++ b/node/core/pvf/src/prepare/worker.rs @@ -15,7 +15,7 @@ // along with Polkadot. If not, see . use crate::{ - artifacts::Artifact, + artifacts::{Artifact, CompiledArtifact, PrepareError}, worker_common::{ bytes_to_path, framed_recv, framed_send, path_to_bytes, spawn_with_program_path, tmpfile_in, worker_event_loop, IdleWorker, SpawnErr, WorkerHandle, @@ -29,6 +29,7 @@ use async_std::{ }; use futures::FutureExt as _; use futures_timer::Delay; +use parity_scale_codec::{Decode, Encode}; use std::{sync::Arc, time::Duration}; const NICENESS_BACKGROUND: i32 = 10; @@ -48,7 +49,7 @@ pub async fn spawn( pub enum Outcome { /// The worker has finished the work assigned to it. - Concluded(IdleWorker), + Concluded { worker: IdleWorker, result: Result<(), PrepareError> }, /// The host tried to reach the worker but failed. This is most likely because the worked was /// killed by the system. 
Unreachable, @@ -59,7 +60,7 @@ pub enum Outcome { /// the artifact). /// /// This doesn't return an idle worker instance, thus this worker is no longer usable. - DidntMakeIt, + DidNotMakeIt, } /// Given the idle token of a worker and parameters of work, communicates with the worker and @@ -105,7 +106,7 @@ pub async fn start_work( #[derive(Debug)] enum Selected { - Done, + Done(Result<(), PrepareError>), IoErr, Deadline, } @@ -124,7 +125,7 @@ pub async fn start_work( async_std::fs::rename(&tmp_file, &artifact_path) .await - .map(|_| Selected::Done) + .map(|_| Selected::Done(Ok(()))) .unwrap_or_else(|err| { tracing::warn!( target: LOG_TARGET, @@ -139,15 +140,20 @@ pub async fn start_work( } Ok(response_bytes) => { use sp_core::hexdisplay::HexDisplay; - let bound_bytes = - &response_bytes[..response_bytes.len().min(4)]; - tracing::warn!( - target: LOG_TARGET, - worker_pid = %pid, - "received unexpected response from the prepare worker: {}", - HexDisplay::from(&bound_bytes), - ); - Selected::IoErr + // By convention we expect encoded prepare error. + // If bytes cannot be deserialized, return io error. + if let Ok(error) = PrepareError::decode(&mut response_bytes.clone().as_slice()) { + Selected::Done(Err(error)) + } else { + let bound_bytes = &response_bytes[..response_bytes.len().min(4)]; + tracing::warn!( + target: LOG_TARGET, + worker_pid = %pid, + "received unexpected response from the prepare worker: {}", + HexDisplay::from(&bound_bytes), + ); + Selected::IoErr + } }, Err(err) => { tracing::warn!( @@ -164,24 +170,11 @@ pub async fn start_work( }; match selected { - Selected::Done => { + Selected::Done(result) => { renice(pid, NICENESS_FOREGROUND); - Outcome::Concluded(IdleWorker { stream, pid }) - }, - Selected::IoErr | Selected::Deadline => { - let bytes = Artifact::DidntMakeIt.serialize(); - // best effort: there is nothing we can do here if the write fails. - if let Err(err) = async_std::fs::write(&artifact_path, &bytes).await { - tracing::warn!( - target: LOG_TARGET, - worker_pid = %pid, - "preparation didn't make it, because of `{:?}`: {:?}", - selected, - err, - ); - } - Outcome::DidntMakeIt + Outcome::Concluded { worker: IdleWorker { stream, pid }, result } }, + Selected::IoErr | Selected::Deadline => Outcome::DidNotMakeIt, } }) .await @@ -205,7 +198,7 @@ where "failed to create a temp file for the artifact: {:?}", err, ); - return Outcome::DidntMakeIt + return Outcome::DidNotMakeIt }, }; @@ -288,31 +281,43 @@ pub fn worker_entrypoint(socket_path: &str) { worker_pid = %std::process::id(), "worker: preparing artifact", ); - let artifact_bytes = prepare_artifact(&code).serialize(); - // Write the serialized artifact into into a temp file. - tracing::debug!( - target: LOG_TARGET, - worker_pid = %std::process::id(), - "worker: writing artifact to {}", - dest.display(), - ); - async_std::fs::write(&dest, &artifact_bytes).await?; + let bytes = match prepare_artifact(&code) { + Artifact::Error(err) => { + // Serialized error will be written into the socket. + err.encode() + }, + Artifact::Compiled(compiled_artifact) => { + // Write the serialized artifact into a temp file. + // Since a compiled artifact can be heavy, we only send a single + // byte to indicate the success. 
+ let artifact_bytes = compiled_artifact.encode(); + + tracing::debug!( + target: LOG_TARGET, + worker_pid = %std::process::id(), + "worker: writing artifact to {}", + dest.display(), + ); + async_std::fs::write(&dest, &artifact_bytes).await?; + + vec![1u8] + }, + }; - // Return back a byte that signals finishing the work. - framed_send(&mut stream, &[1u8]).await?; + framed_send(&mut stream, &bytes).await?; } }); } fn prepare_artifact(code: &[u8]) -> Artifact { let blob = match crate::executor_intf::prevalidate(code) { - Err(err) => return Artifact::PrevalidationErr(format!("{:?}", err)), + Err(err) => return Artifact::Error(PrepareError::Prevalidation(format!("{:?}", err))), Ok(b) => b, }; match crate::executor_intf::prepare(blob) { - Ok(compiled_artifact) => Artifact::Compiled { compiled_artifact }, - Err(err) => Artifact::PreparationErr(format!("{:?}", err)), + Ok(compiled_artifact) => Artifact::Compiled(CompiledArtifact::new(compiled_artifact)), + Err(err) => Artifact::Error(PrepareError::Preparation(format!("{:?}", err))), } } From 554b8a03699e1c18fe679a8e70b995840fb4a995 Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Thu, 23 Sep 2021 16:15:16 +0300 Subject: [PATCH 02/10] Correctly handle failed artifacts --- node/core/pvf/src/host.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/node/core/pvf/src/host.rs b/node/core/pvf/src/host.rs index 69b8286c62d3..b7de6f5997c2 100644 --- a/node/core/pvf/src/host.rs +++ b/node/core/pvf/src/host.rs @@ -507,12 +507,6 @@ async fn handle_prepare_done( Some(state @ ArtifactState::Preparing) => state, }; - // Don't send failed artifacts to the execution's queue. - if let Err(error) = result { - *state = ArtifactState::FailedToProcess(error); - return Ok(()) - } - // It's finally time to dispatch all the execution requests that were waiting for this artifact // to be prepared. let pending_requests = awaiting_prepare.take(&artifact_id); @@ -523,6 +517,13 @@ async fn handle_prepare_done( continue } + // Don't send failed artifacts to the execution's queue. + if let Err(ref error) = result { + *state = ArtifactState::FailedToProcess(error.clone()); + let _ = result_tx.send(Err(ValidationError::from(error.clone()))); + continue + } + send_execute( execute_queue, execute::ToQueue::Enqueue { From 8bdc7d3791aec014a3f1dd26b2291add569bc07a Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Thu, 23 Sep 2021 16:52:02 +0300 Subject: [PATCH 03/10] Serialize result of PVF preparation uniquely --- node/core/pvf/src/prepare/worker.rs | 65 +++++++++++++++-------------- 1 file changed, 34 insertions(+), 31 deletions(-) diff --git a/node/core/pvf/src/prepare/worker.rs b/node/core/pvf/src/prepare/worker.rs index bac48142931d..b49f6435c2cd 100644 --- a/node/core/pvf/src/prepare/worker.rs +++ b/node/core/pvf/src/prepare/worker.rs @@ -30,6 +30,7 @@ use async_std::{ use futures::FutureExt as _; use futures_timer::Delay; use parity_scale_codec::{Decode, Encode}; +use sp_core::hexdisplay::HexDisplay; use std::{sync::Arc, time::Duration}; const NICENESS_BACKGROUND: i32 = 10; @@ -114,37 +115,39 @@ pub async fn start_work( let selected = futures::select! 
{ res = framed_recv(&mut stream).fuse() => { match res { - Ok(x) if x == &[1u8] => { - tracing::debug!( - target: LOG_TARGET, - worker_pid = %pid, - "promoting WIP artifact {} to {}", - tmp_file.display(), - artifact_path.display(), - ); - - async_std::fs::rename(&tmp_file, &artifact_path) - .await - .map(|_| Selected::Done(Ok(()))) - .unwrap_or_else(|err| { - tracing::warn!( + Ok(response_bytes) => { + // By convention we expect encoded `Result<(), PrepareError>`. + if let Ok(result) = + >::decode(&mut response_bytes.clone().as_slice()) + { + if result.is_ok() { + tracing::debug!( target: LOG_TARGET, worker_pid = %pid, - "failed to rename the artifact from {} to {}: {:?}", + "promoting WIP artifact {} to {}", tmp_file.display(), artifact_path.display(), - err, ); - Selected::IoErr - }) - } - Ok(response_bytes) => { - use sp_core::hexdisplay::HexDisplay; - // By convention we expect encoded prepare error. - // If bytes cannot be deserialized, return io error. - if let Ok(error) = PrepareError::decode(&mut response_bytes.clone().as_slice()) { - Selected::Done(Err(error)) + + async_std::fs::rename(&tmp_file, &artifact_path) + .await + .map(|_| Selected::Done(result)) + .unwrap_or_else(|err| { + tracing::warn!( + target: LOG_TARGET, + worker_pid = %pid, + "failed to rename the artifact from {} to {}: {:?}", + tmp_file.display(), + artifact_path.display(), + err, + ); + Selected::IoErr + }) + } else { + Selected::Done(result) + } } else { + // We received invalid bytes from the worker. let bound_bytes = &response_bytes[..response_bytes.len().min(4)]; tracing::warn!( target: LOG_TARGET, @@ -282,15 +285,15 @@ pub fn worker_entrypoint(socket_path: &str) { "worker: preparing artifact", ); - let bytes = match prepare_artifact(&code) { + let result = match prepare_artifact(&code) { Artifact::Error(err) => { // Serialized error will be written into the socket. - err.encode() + Err(err) }, Artifact::Compiled(compiled_artifact) => { // Write the serialized artifact into a temp file. - // Since a compiled artifact can be heavy, we only send a single - // byte to indicate the success. + // Since a compiled artifact can be heavy, we send an empty + // `Ok` to indicate the success. let artifact_bytes = compiled_artifact.encode(); tracing::debug!( @@ -301,11 +304,11 @@ pub fn worker_entrypoint(socket_path: &str) { ); async_std::fs::write(&dest, &artifact_bytes).await?; - vec![1u8] + Ok(()) }, }; - framed_send(&mut stream, &bytes).await?; + framed_send(&mut stream, result.encode().as_slice()).await?; } }); } From 7c0b8ed76b00c4412ab3ced6cf4fd9b878c505e5 Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Mon, 27 Sep 2021 13:37:57 +0300 Subject: [PATCH 04/10] Set the artifact state depending on the result --- node/core/pvf/src/host.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/node/core/pvf/src/host.rs b/node/core/pvf/src/host.rs index b7de6f5997c2..3a41dc83876a 100644 --- a/node/core/pvf/src/host.rs +++ b/node/core/pvf/src/host.rs @@ -519,7 +519,6 @@ async fn handle_prepare_done( // Don't send failed artifacts to the execution's queue. if let Err(ref error) = result { - *state = ArtifactState::FailedToProcess(error.clone()); let _ = result_tx.send(Err(ValidationError::from(error.clone()))); continue } @@ -535,8 +534,10 @@ async fn handle_prepare_done( .await?; } - // Now consider the artifact prepared. 
- *state = ArtifactState::Prepared { last_time_needed: SystemTime::now() }; + *state = match result { + Ok(()) => ArtifactState::Prepared { last_time_needed: SystemTime::now() }, + Err(error) => ArtifactState::FailedToProcess(error.clone()), + }; Ok(()) } From 11de2079dd6e6fd68f9edfd9fdabf392f45f9ea6 Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Mon, 27 Sep 2021 13:46:34 +0300 Subject: [PATCH 05/10] Return the result of PVF preparation directly --- node/core/pvf/src/artifacts.rs | 10 ---------- node/core/pvf/src/prepare/worker.rs | 14 +++++++------- 2 files changed, 7 insertions(+), 17 deletions(-) diff --git a/node/core/pvf/src/artifacts.rs b/node/core/pvf/src/artifacts.rs index f158dbfa6c8b..45a6a7b822d5 100644 --- a/node/core/pvf/src/artifacts.rs +++ b/node/core/pvf/src/artifacts.rs @@ -51,16 +51,6 @@ impl AsRef<[u8]> for CompiledArtifact { } } -/// A final product of preparation process. Contains either a ready to run compiled artifact or -/// a description what went wrong. -#[derive(Encode, Decode)] -pub enum Artifact { - /// An error occurred during the prepare part of the PVF pipeline. - Error(PrepareError), - /// The PVF passed all the checks and is ready for execution. - Compiled(CompiledArtifact), -} - /// Identifier of an artifact. Right now it only encodes a code hash of the PVF. But if we get to /// multiple engine implementations the artifact ID should include the engine type as well. #[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] diff --git a/node/core/pvf/src/prepare/worker.rs b/node/core/pvf/src/prepare/worker.rs index b49f6435c2cd..8f7c51e2bac8 100644 --- a/node/core/pvf/src/prepare/worker.rs +++ b/node/core/pvf/src/prepare/worker.rs @@ -15,7 +15,7 @@ // along with Polkadot. If not, see . use crate::{ - artifacts::{Artifact, CompiledArtifact, PrepareError}, + artifacts::{CompiledArtifact, PrepareError}, worker_common::{ bytes_to_path, framed_recv, framed_send, path_to_bytes, spawn_with_program_path, tmpfile_in, worker_event_loop, IdleWorker, SpawnErr, WorkerHandle, @@ -286,11 +286,11 @@ pub fn worker_entrypoint(socket_path: &str) { ); let result = match prepare_artifact(&code) { - Artifact::Error(err) => { + Err(err) => { // Serialized error will be written into the socket. Err(err) }, - Artifact::Compiled(compiled_artifact) => { + Ok(compiled_artifact) => { // Write the serialized artifact into a temp file. // Since a compiled artifact can be heavy, we send an empty // `Ok` to indicate the success. 
@@ -313,14 +313,14 @@ pub fn worker_entrypoint(socket_path: &str) { }); } -fn prepare_artifact(code: &[u8]) -> Artifact { +fn prepare_artifact(code: &[u8]) -> Result { let blob = match crate::executor_intf::prevalidate(code) { - Err(err) => return Artifact::Error(PrepareError::Prevalidation(format!("{:?}", err))), + Err(err) => return Err(PrepareError::Prevalidation(format!("{:?}", err))), Ok(b) => b, }; match crate::executor_intf::prepare(blob) { - Ok(compiled_artifact) => Artifact::Compiled(CompiledArtifact::new(compiled_artifact)), - Err(err) => Artifact::Error(PrepareError::Preparation(format!("{:?}", err))), + Ok(compiled_artifact) => Ok(CompiledArtifact::new(compiled_artifact)), + Err(err) => Err(PrepareError::Preparation(format!("{:?}", err))), } } From d323ac6dc64dc8b80f7b940c3ac45bdef3c6f6b1 Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Mon, 27 Sep 2021 13:51:17 +0300 Subject: [PATCH 06/10] Move PrepareError to the error module --- node/core/pvf/src/artifacts.rs | 13 +------------ node/core/pvf/src/error.rs | 14 +++++++++++++- node/core/pvf/src/prepare/pool.rs | 2 +- node/core/pvf/src/prepare/queue.rs | 4 +--- node/core/pvf/src/prepare/worker.rs | 3 ++- 5 files changed, 18 insertions(+), 18 deletions(-) diff --git a/node/core/pvf/src/artifacts.rs b/node/core/pvf/src/artifacts.rs index 45a6a7b822d5..e34ced5969bf 100644 --- a/node/core/pvf/src/artifacts.rs +++ b/node/core/pvf/src/artifacts.rs @@ -14,6 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . +use crate::error::PrepareError; use always_assert::always; use async_std::path::{Path, PathBuf}; use parity_scale_codec::{Decode, Encode}; @@ -23,18 +24,6 @@ use std::{ time::{Duration, SystemTime}, }; -/// An error that occurred during the prepare part of the PVF pipeline. -#[derive(Debug, Clone, Encode, Decode)] -pub enum PrepareError { - /// During the prevalidation stage of preparation an issue was found with the PVF. - Prevalidation(String), - /// Compilation failed for the given PVF. - Preparation(String), - /// This state indicates that the process assigned to prepare the artifact wasn't responsible - /// or were killed. This state is reported by the validation host (not by the worker). - DidNotMakeIt, -} - /// A wrapper for the compiled PVF code. #[derive(Encode, Decode)] pub struct CompiledArtifact(Vec); diff --git a/node/core/pvf/src/error.rs b/node/core/pvf/src/error.rs index d82564ca681d..8afd0ddddb4b 100644 --- a/node/core/pvf/src/error.rs +++ b/node/core/pvf/src/error.rs @@ -14,7 +14,19 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -use crate::artifacts::PrepareError; +use parity_scale_codec::{Decode, Encode}; + +/// An error that occurred during the prepare part of the PVF pipeline. +#[derive(Debug, Clone, Encode, Decode)] +pub enum PrepareError { + /// During the prevalidation stage of preparation an issue was found with the PVF. + Prevalidation(String), + /// Compilation failed for the given PVF. + Preparation(String), + /// This state indicates that the process assigned to prepare the artifact wasn't responsible + /// or were killed. This state is reported by the validation host (not by the worker). + DidNotMakeIt, +} /// A error raised during validation of the candidate. 
#[derive(Debug, Clone)] diff --git a/node/core/pvf/src/prepare/pool.rs b/node/core/pvf/src/prepare/pool.rs index e0dae299d318..f4e13523fa19 100644 --- a/node/core/pvf/src/prepare/pool.rs +++ b/node/core/pvf/src/prepare/pool.rs @@ -16,7 +16,7 @@ use super::worker::{self, Outcome}; use crate::{ - artifacts::PrepareError, + error::PrepareError, metrics::Metrics, worker_common::{IdleWorker, WorkerHandle}, LOG_TARGET, diff --git a/node/core/pvf/src/prepare/queue.rs b/node/core/pvf/src/prepare/queue.rs index 076a2b351a1e..5a0de73cf595 100644 --- a/node/core/pvf/src/prepare/queue.rs +++ b/node/core/pvf/src/prepare/queue.rs @@ -18,9 +18,7 @@ use super::pool::{self, Worker}; use crate::{ - artifacts::{ArtifactId, PrepareError}, - metrics::Metrics, - Priority, Pvf, LOG_TARGET, + artifacts::ArtifactId, error::PrepareError, metrics::Metrics, Priority, Pvf, LOG_TARGET, }; use always_assert::{always, never}; use async_std::path::PathBuf; diff --git a/node/core/pvf/src/prepare/worker.rs b/node/core/pvf/src/prepare/worker.rs index 8f7c51e2bac8..89e2f5d092f2 100644 --- a/node/core/pvf/src/prepare/worker.rs +++ b/node/core/pvf/src/prepare/worker.rs @@ -15,7 +15,8 @@ // along with Polkadot. If not, see . use crate::{ - artifacts::{CompiledArtifact, PrepareError}, + artifacts::CompiledArtifact, + error::PrepareError, worker_common::{ bytes_to_path, framed_recv, framed_send, path_to_bytes, spawn_with_program_path, tmpfile_in, worker_event_loop, IdleWorker, SpawnErr, WorkerHandle, From f129cd640cc8595a97e0d7c855a58bc3a4573b1e Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Mon, 27 Sep 2021 14:08:44 +0300 Subject: [PATCH 07/10] Update doc comments --- node/core/pvf/src/execute/worker.rs | 4 ++-- node/core/pvf/src/prepare/pool.rs | 5 +++-- node/core/pvf/src/prepare/queue.rs | 2 +- node/core/pvf/src/prepare/worker.rs | 4 +--- 4 files changed, 7 insertions(+), 8 deletions(-) diff --git a/node/core/pvf/src/execute/worker.rs b/node/core/pvf/src/execute/worker.rs index 1dd61442b931..d3c36ed924a6 100644 --- a/node/core/pvf/src/execute/worker.rs +++ b/node/core/pvf/src/execute/worker.rs @@ -51,8 +51,8 @@ pub enum Outcome { /// PVF execution completed successfully and the result is returned. The worker is ready for /// another job. Ok { result_descriptor: ValidationResult, duration_ms: u64, idle_worker: IdleWorker }, - /// The candidate validation failed. It may be for example because the preparation process - /// produced an error or the wasm execution triggered a trap. + /// The candidate validation failed. It may be for example because the wasm execution triggered a trap. + /// Errors related to the preparation process are not expected to be encountered by the execution workers. InvalidCandidate { err: String, idle_worker: IdleWorker }, /// An internal error happened during the validation. Such an error is most likely related to /// some transient glitch. diff --git a/node/core/pvf/src/prepare/pool.rs b/node/core/pvf/src/prepare/pool.rs index f4e13523fa19..f03d18932d22 100644 --- a/node/core/pvf/src/prepare/pool.rs +++ b/node/core/pvf/src/prepare/pool.rs @@ -79,8 +79,9 @@ pub enum FromPool { /// The given worker was just spawned and is ready to be used. Spawned(Worker), - /// The given worker either succeeded or failed the given job. Under any circumstances the - /// artifact file has been written. The `bool` says whether the worker ripped. + /// The given worker either succeeded or failed the given job. + /// The artifact file has only been written if the preparation was successful. 
+ /// The `bool` says whether the worker ripped. Concluded { worker: Worker, rip: bool, diff --git a/node/core/pvf/src/prepare/queue.rs b/node/core/pvf/src/prepare/queue.rs index 5a0de73cf595..cfa360cc9528 100644 --- a/node/core/pvf/src/prepare/queue.rs +++ b/node/core/pvf/src/prepare/queue.rs @@ -31,7 +31,7 @@ pub enum ToQueue { /// This schedules preparation of the given PVF. /// /// Note that it is incorrect to enqueue the same PVF again without first receiving the - /// [`FromQueue::Prepared`] response. In case there is a need to bump the priority, use + /// [`FromQueue`] response. In case there is a need to bump the priority, use /// [`ToQueue::Amend`]. Enqueue { priority: Priority, pvf: Pvf }, /// Amends the priority for the given [`ArtifactId`] if it is running. If it's not, then it's noop. diff --git a/node/core/pvf/src/prepare/worker.rs b/node/core/pvf/src/prepare/worker.rs index 89e2f5d092f2..7e9f65d617fe 100644 --- a/node/core/pvf/src/prepare/worker.rs +++ b/node/core/pvf/src/prepare/worker.rs @@ -102,9 +102,7 @@ pub async fn start_work( // Wait for the result from the worker, keeping in mind that there may be a timeout, the // worker may get killed, or something along these lines. // - // In that case we should handle these gracefully by writing the artifact file by ourselves. - // We may potentially overwrite the artifact in rare cases where the worker didn't make - // it to report back the result. + // In that case we should propagate the error to the pool. #[derive(Debug)] enum Selected { From cae0540742619781fb7d79e7e767e93eb2dcfc3e Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Mon, 27 Sep 2021 14:17:32 +0300 Subject: [PATCH 08/10] Update misleading comment --- node/core/pvf/src/prepare/worker.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/node/core/pvf/src/prepare/worker.rs b/node/core/pvf/src/prepare/worker.rs index 7e9f65d617fe..a8bb3516e296 100644 --- a/node/core/pvf/src/prepare/worker.rs +++ b/node/core/pvf/src/prepare/worker.rs @@ -291,8 +291,12 @@ pub fn worker_entrypoint(socket_path: &str) { }, Ok(compiled_artifact) => { // Write the serialized artifact into a temp file. - // Since a compiled artifact can be heavy, we send an empty - // `Ok` to indicate the success. + // PVF host only keeps artifacts statuses in its memory, + // successfully compiled code gets stored on the disk (and + // consequently deserialized by execute-workers). The prepare + // worker is only required to send an empty `Ok` to the pool + // to indicate the success. + let artifact_bytes = compiled_artifact.encode(); tracing::debug!( From 72918da3b2789079a6af178d825b71b1b6cda6d5 Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Fri, 22 Oct 2021 16:02:38 +0300 Subject: [PATCH 09/10] Cleanup docs --- node/core/pvf/src/prepare/pool.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node/core/pvf/src/prepare/pool.rs b/node/core/pvf/src/prepare/pool.rs index f03d18932d22..729f813432f9 100644 --- a/node/core/pvf/src/prepare/pool.rs +++ b/node/core/pvf/src/prepare/pool.rs @@ -80,10 +80,10 @@ pub enum FromPool { Spawned(Worker), /// The given worker either succeeded or failed the given job. - /// The artifact file has only been written if the preparation was successful. - /// The `bool` says whether the worker ripped. Concluded { + /// A key for retrieving the worker data from the pool. worker: Worker, + /// Indicates whether the worker process was killed. rip: bool, /// [`Ok`] indicates that compiled artifact is successfully stored on disk. 
/// Otherwise, an [error](PrepareError) is supplied. From f75af1d2e60b4a88fcb4395b05518b2d6110d2dc Mon Sep 17 00:00:00 2001 From: Chris Sosnin Date: Fri, 22 Oct 2021 16:05:32 +0300 Subject: [PATCH 10/10] Conclude a test job with an error --- node/core/pvf/src/prepare/queue.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/node/core/pvf/src/prepare/queue.rs b/node/core/pvf/src/prepare/queue.rs index cfa360cc9528..d85e6b8a1422 100644 --- a/node/core/pvf/src/prepare/queue.rs +++ b/node/core/pvf/src/prepare/queue.rs @@ -775,7 +775,11 @@ mod tests { assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. }); - test.send_from_pool(pool::FromPool::Concluded { worker: w1, rip: true, result: Ok(()) }); + test.send_from_pool(pool::FromPool::Concluded { + worker: w1, + rip: true, + result: Err(PrepareError::DidNotMakeIt), + }); test.poll_ensure_to_pool_is_empty().await; }
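
A hedged, self-contained sketch of the mechanism this series introduces: the prepare worker now writes only successfully compiled artifacts to disk and reports the outcome to the host as a SCALE-encoded `Result<(), PrepareError>` over the worker socket. The code below is not part of the patch series; the local `PrepareError` mirror, the example error string, and the `parity-scale-codec = { version = "2", features = ["derive"] }` dependency line are assumptions made only to keep the sketch runnable on its own.

use parity_scale_codec::{Decode, Encode};

// Local mirror of the `PrepareError` introduced in error.rs above; in the real
// crate the type lives in `node/core/pvf/src/error.rs`.
#[derive(Debug, Clone, PartialEq, Encode, Decode)]
enum PrepareError {
    Prevalidation(String),
    Preparation(String),
    DidNotMakeIt,
}

fn main() {
    // On failure the prepare worker no longer writes an error artifact to disk;
    // it sends back the encoded `Result<(), PrepareError>` over the socket.
    let reply: Result<(), PrepareError> =
        Err(PrepareError::Preparation("example compiler error".to_owned()));
    let wire_bytes = reply.encode();

    // The receiving side decodes the framed bytes, as in patch 03's
    // `start_work` handling of `response_bytes`.
    let decoded = <Result<(), PrepareError>>::decode(&mut wire_bytes.as_slice())
        .expect("bytes were produced by `encode` above");
    assert_eq!(decoded, reply);

    // On `Err` the host records `ArtifactState::FailedToProcess` and, as in the
    // `From<PrepareError> for ValidationError` impl above, maps the error to a
    // string for `InvalidCandidate::WorkerReportedError`.
    let error_str = match decoded {
        Ok(()) => unreachable!("this sketch encodes an error on purpose"),
        Err(PrepareError::Prevalidation(e)) | Err(PrepareError::Preparation(e)) => e,
        Err(PrepareError::DidNotMakeIt) => "preparation timeout".to_owned(),
    };
    println!("would report: {}", error_str);
}

Keeping only compiled code on disk, with failure statuses held in the host's memory as `ArtifactState::FailedToProcess`, means execute workers never need to distinguish error artifacts from compiled ones; this is why patch 01 reduces `validate_using_artifact` to a plain `CompiledArtifact` decode.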