diff --git a/node/core/candidate-validation/src/lib.rs b/node/core/candidate-validation/src/lib.rs
index 74610bc113ec..70fc24eacade 100644
--- a/node/core/candidate-validation/src/lib.rs
+++ b/node/core/candidate-validation/src/lib.rs
@@ -320,12 +320,12 @@ where
 	match validation_backend.precheck_pvf(validation_code).await {
 		Ok(_) => PreCheckOutcome::Valid,
-		Err(prepare_err) => match prepare_err {
-			PrepareError::Prevalidation(_) |
-			PrepareError::Preparation(_) |
-			PrepareError::Panic(_) => PreCheckOutcome::Invalid,
-			PrepareError::TimedOut | PrepareError::DidNotMakeIt => PreCheckOutcome::Failed,
-		},
+		Err(prepare_err) =>
+			if prepare_err.is_deterministic() {
+				PreCheckOutcome::Invalid
+			} else {
+				PreCheckOutcome::Failed
+			},
 	}
 }
@@ -667,10 +667,11 @@ impl ValidationBackend for ValidationHost {
 	async fn precheck_pvf(&mut self, pvf: Pvf) -> Result {
 		let (tx, rx) = oneshot::channel();
 		if let Err(_) = self.precheck_pvf(pvf, tx).await {
-			return Err(PrepareError::DidNotMakeIt)
+			// Return an IO error if there was an error communicating with the host.
+			return Err(PrepareError::IoErr)
 		}
 
-		let precheck_result = rx.await.or(Err(PrepareError::DidNotMakeIt))?;
+		let precheck_result = rx.await.or(Err(PrepareError::IoErr))?;
 
 		precheck_result
 	}
diff --git a/node/core/candidate-validation/src/tests.rs b/node/core/candidate-validation/src/tests.rs
index 5ac93bc7d1f4..c6003c734973 100644
--- a/node/core/candidate-validation/src/tests.rs
+++ b/node/core/candidate-validation/src/tests.rs
@@ -1053,5 +1053,5 @@ fn precheck_properly_classifies_outcomes() {
 	inner(Err(PrepareError::Panic("baz".to_owned())), PreCheckOutcome::Invalid);
 
 	inner(Err(PrepareError::TimedOut), PreCheckOutcome::Failed);
-	inner(Err(PrepareError::DidNotMakeIt), PreCheckOutcome::Failed);
+	inner(Err(PrepareError::IoErr), PreCheckOutcome::Failed);
 }
diff --git a/node/core/pvf/src/error.rs b/node/core/pvf/src/error.rs
index ddcdb2561cfd..01d8c78d39ca 100644
--- a/node/core/pvf/src/error.rs
+++ b/node/core/pvf/src/error.rs
@@ -15,7 +15,7 @@
 // along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
 
 use parity_scale_codec::{Decode, Encode};
-use std::{any::Any, time::Duration};
+use std::{any::Any, fmt, time::Duration};
 
 /// Result of PVF preparation performed by the validation host. Contains the elapsed CPU time if
 /// successful
@@ -32,9 +32,46 @@ pub enum PrepareError {
 	Panic(String),
 	/// Failed to prepare the PVF due to the time limit.
 	TimedOut,
-	/// This state indicates that the process assigned to prepare the artifact wasn't responsible
-	/// or were killed. This state is reported by the validation host (not by the worker).
-	DidNotMakeIt,
+	/// An IO error occurred while receiving the result from the worker process. This state is reported by the
+	/// validation host (not by the worker).
+	IoErr,
+	/// The temporary file for the artifact could not be created at the given cache path. This state is reported by the
+	/// validation host (not by the worker).
+	CreateTmpFileErr(String),
+	/// The response from the worker is received, but the file cannot be renamed (moved) to the final destination
+	/// location. This state is reported by the validation host (not by the worker).
+	RenameTmpFileErr(String),
+}
+
+impl PrepareError {
+	/// Returns whether this is a deterministic error, i.e. one that should trigger reliably. Those
+	/// errors depend on the PVF itself and the sc-executor/wasmtime logic.
+	///
+	/// Non-deterministic errors can happen spuriously. Typically, they occur due to resource
+	/// starvation, e.g. under heavy load or memory pressure. Those errors are typically transient
+	/// but may persist e.g. if the node is run on an overwhelmingly underpowered machine.
+	pub fn is_deterministic(&self) -> bool {
+		use PrepareError::*;
+		match self {
+			Prevalidation(_) | Preparation(_) | Panic(_) => true,
+			TimedOut | IoErr | CreateTmpFileErr(_) | RenameTmpFileErr(_) => false,
+		}
+	}
+}
+
+impl fmt::Display for PrepareError {
+	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+		use PrepareError::*;
+		match self {
+			Prevalidation(err) => write!(f, "prevalidation: {}", err),
+			Preparation(err) => write!(f, "preparation: {}", err),
+			Panic(err) => write!(f, "panic: {}", err),
+			TimedOut => write!(f, "prepare: timeout"),
+			IoErr => write!(f, "prepare: io error while receiving response"),
+			CreateTmpFileErr(err) => write!(f, "prepare: error creating tmp file: {}", err),
+			RenameTmpFileErr(err) => write!(f, "prepare: error renaming tmp file: {}", err),
+		}
+	}
 }
 
 /// A error raised during validation of the candidate.
@@ -81,32 +118,17 @@ pub enum InvalidCandidate {
 impl From<PrepareError> for ValidationError {
 	fn from(error: PrepareError) -> Self {
 		// Here we need to classify the errors into two errors: deterministic and non-deterministic.
+		// See [`PrepareError::is_deterministic`].
 		//
-		// Non-deterministic errors can happen spuriously. Typically, they occur due to resource
-		// starvation, e.g. under heavy load or memory pressure. Those errors are typically transient
-		// but may persist e.g. if the node is run by overwhelmingly underpowered machine.
-		//
-		// Deterministic errors should trigger reliably. Those errors depend on the PVF itself and
-		// the sc-executor/wasmtime logic.
-		//
-		// For now, at least until the PVF pre-checking lands, the deterministic errors will be
-		// treated as `InvalidCandidate`. Should those occur they could potentially trigger disputes.
+		// We treat the deterministic errors as `InvalidCandidate`. Should those occur they could
+		// potentially trigger disputes.
 		//
 		// All non-deterministic errors are qualified as `InternalError`s and will not trigger
 		// disputes.
-		match error {
-			PrepareError::Prevalidation(err) => ValidationError::InvalidCandidate(
-				InvalidCandidate::PrepareError(format!("prevalidation: {}", err)),
-			),
-			PrepareError::Preparation(err) => ValidationError::InvalidCandidate(
-				InvalidCandidate::PrepareError(format!("preparation: {}", err)),
-			),
-			PrepareError::Panic(err) => ValidationError::InvalidCandidate(
-				InvalidCandidate::PrepareError(format!("panic: {}", err)),
-			),
-			PrepareError::TimedOut => ValidationError::InternalError("prepare: timeout".to_owned()),
-			PrepareError::DidNotMakeIt =>
-				ValidationError::InternalError("prepare: did not make it".to_owned()),
+		if error.is_deterministic() {
+			ValidationError::InvalidCandidate(InvalidCandidate::PrepareError(error.to_string()))
+		} else {
+			ValidationError::InternalError(error.to_string())
 		}
 	}
 }
diff --git a/node/core/pvf/src/host.rs b/node/core/pvf/src/host.rs
index 0f2e2b839a80..d7823ac44c77 100644
--- a/node/core/pvf/src/host.rs
+++ b/node/core/pvf/src/host.rs
@@ -776,16 +776,15 @@ fn can_retry_prepare_after_failure(
 	num_failures: u32,
 	error: &PrepareError,
 ) -> bool {
-	use PrepareError::*;
-	match error {
-		// Gracefully returned an error, so it will probably be reproducible. Don't retry.
-		Prevalidation(_) | Preparation(_) => false,
-		// Retry if the retry cooldown has elapsed and if we have already retried less than
-		// `NUM_PREPARE_RETRIES` times. IO errors may resolve themselves.
-		Panic(_) | TimedOut | DidNotMakeIt =>
-			SystemTime::now() >= last_time_failed + PREPARE_FAILURE_COOLDOWN &&
-				num_failures <= NUM_PREPARE_RETRIES,
+	if error.is_deterministic() {
+		// This error is considered deterministic, so it will probably be reproducible. Don't retry.
+		return false
 	}
+
+	// Retry if the retry cooldown has elapsed and if we have already retried less than `NUM_PREPARE_RETRIES` times. IO
+	// errors may resolve themselves.
+	SystemTime::now() >= last_time_failed + PREPARE_FAILURE_COOLDOWN &&
+		num_failures <= NUM_PREPARE_RETRIES
 }
 
 /// A stream that yields a pulse continuously at a given interval.
diff --git a/node/core/pvf/src/prepare/pool.rs b/node/core/pvf/src/prepare/pool.rs
index 306588eb429a..3319d44e7fb4 100644
--- a/node/core/pvf/src/prepare/pool.rs
+++ b/node/core/pvf/src/prepare/pool.rs
@@ -22,7 +22,6 @@ use crate::{
 	LOG_TARGET,
 };
 use always_assert::never;
-use assert_matches::assert_matches;
 use async_std::path::{Path, PathBuf};
 use futures::{
 	channel::mpsc, future::BoxFuture, stream::FuturesUnordered, Future, FutureExt, StreamExt,
@@ -232,7 +231,7 @@ fn handle_to_pool(
 					// items concluded;
 					// thus idle token is Some;
 					// qed.
-					never!("unexpected abscence of the idle token in prepare pool");
+					never!("unexpected absence of the idle token in prepare pool");
 				}
 			} else {
 				// That's a relatively normal situation since the queue may send `start_work` and
@@ -294,29 +293,28 @@ fn handle_mux(
 			Ok(())
 		},
 		PoolEvent::StartWork(worker, outcome) => {
-			// If we receive any outcome other than `Concluded`, we attempt to kill the worker
-			// process.
+			// If we receive an outcome that the worker is unreachable or that an error occurred on
+			// the worker, we attempt to kill the worker process.
 			match outcome {
-				Outcome::Concluded { worker: idle, result } => {
-					let data = match spawned.get_mut(worker) {
-						None => {
-							// Perhaps the worker was killed meanwhile and the result is no longer
-							// relevant. We already send `Rip` when purging if we detect that the
-							// worker is dead.
-							return Ok(())
-						},
-						Some(data) => data,
-					};
-
-					// We just replace the idle worker that was loaned from this option during
-					// the work starting.
-					let old = data.idle.replace(idle);
-					assert_matches!(old, None, "attempt to overwrite an idle worker");
-
-					reply(from_pool, FromPool::Concluded { worker, rip: false, result })?;
-
-					Ok(())
-				},
+				Outcome::Concluded { worker: idle, result } =>
+					handle_concluded_no_rip(from_pool, spawned, worker, idle, result),
+				// Return `Concluded`, but do not kill the worker since the error was on the host side.
+				Outcome::CreateTmpFileErr { worker: idle, err } => handle_concluded_no_rip(
+					from_pool,
+					spawned,
+					worker,
+					idle,
+					Err(PrepareError::CreateTmpFileErr(err)),
+				),
+				// Return `Concluded`, but do not kill the worker since the error was on the host side.
+				Outcome::RenameTmpFileErr { worker: idle, result: _, err } =>
+					handle_concluded_no_rip(
+						from_pool,
+						spawned,
+						worker,
+						idle,
+						Err(PrepareError::RenameTmpFileErr(err)),
+					),
 				Outcome::Unreachable => {
 					if attempt_retire(metrics, spawned, worker) {
 						reply(from_pool, FromPool::Rip(worker))?;
 					}
 
 					Ok(())
 				},
@@ -324,14 +322,14 @@
-				Outcome::DidNotMakeIt => {
+				Outcome::IoErr => {
 					if attempt_retire(metrics, spawned, worker) {
 						reply(
 							from_pool,
 							FromPool::Concluded {
 								worker,
 								rip: true,
-								result: Err(PrepareError::DidNotMakeIt),
+								result: Err(PrepareError::IoErr),
 							},
 						)?;
 					}
 
 					Ok(())
 				},
@@ -380,6 +378,40 @@ fn attempt_retire(
 	}
 }
 
+/// Handles the case where we received a response. There potentially was an error, but not the fault
+/// of the worker as far as we know, so the worker should not be killed.
+///
+/// This function tries to put the idle worker back into the pool and then replies with
+/// `FromPool::Concluded` with `rip: false`.
+fn handle_concluded_no_rip(
+	from_pool: &mut mpsc::UnboundedSender<FromPool>,
+	spawned: &mut HopSlotMap<Worker, WorkerData>,
+	worker: Worker,
+	idle: IdleWorker,
+	result: PrepareResult,
+) -> Result<(), Fatal> {
+	let data = match spawned.get_mut(worker) {
+		None => {
+			// Perhaps the worker was killed meanwhile and the result is no longer relevant. We
+			// already send `Rip` when purging if we detect that the worker is dead.
+			return Ok(())
+		},
+		Some(data) => data,
+	};
+
+	// We just replace the idle worker that was loaned from this option during
+	// the work starting.
+	let old = data.idle.replace(idle);
+	never!(
+		old.is_some(),
+		"old idle worker was taken out when starting work; we only replace it here; qed"
+	);
+
+	reply(from_pool, FromPool::Concluded { worker, rip: false, result })?;
+
+	Ok(())
+}
+
 /// Spins up the pool and returns the future that should be polled to make the pool functional.
 pub fn start(
 	metrics: Metrics,
diff --git a/node/core/pvf/src/prepare/queue.rs b/node/core/pvf/src/prepare/queue.rs
index df0a8ec41883..e78351af9839 100644
--- a/node/core/pvf/src/prepare/queue.rs
+++ b/node/core/pvf/src/prepare/queue.rs
@@ -761,7 +761,7 @@
 		test.send_from_pool(pool::FromPool::Concluded {
 			worker: w1,
 			rip: true,
-			result: Err(PrepareError::DidNotMakeIt),
+			result: Err(PrepareError::IoErr),
 		});
 		test.poll_ensure_to_pool_is_empty().await;
 	}
diff --git a/node/core/pvf/src/prepare/worker.rs b/node/core/pvf/src/prepare/worker.rs
index 91361eacaf26..5b4212e1e313 100644
--- a/node/core/pvf/src/prepare/worker.rs
+++ b/node/core/pvf/src/prepare/worker.rs
@@ -59,28 +59,26 @@ pub enum Outcome {
 	/// The host tried to reach the worker but failed. This is most likely because the worked was
 	/// killed by the system.
 	Unreachable,
+	/// The temporary file for the artifact could not be created at the given cache path.
+	CreateTmpFileErr { worker: IdleWorker, err: String },
+	/// The response from the worker is received, but the file cannot be renamed (moved) to the
+	/// final destination location.
+	RenameTmpFileErr { worker: IdleWorker, result: PrepareResult, err: String },
 	/// The worker failed to finish the job until the given deadline.
 	///
 	/// The worker is no longer usable and should be killed.
 	TimedOut,
-	/// The execution was interrupted abruptly and the worker is not available anymore.
+	/// An IO error occurred while receiving the result from the worker process.
 	///
 	/// This doesn't return an idle worker instance, thus this worker is no longer usable.
-	DidNotMakeIt,
-}
-
-#[derive(Debug)]
-enum Selected {
-	Done(PrepareResult),
 	IoErr,
-	Deadline,
 }
 
 /// Given the idle token of a worker and parameters of work, communicates with the worker and
 /// returns the outcome.
 ///
-/// NOTE: Returning the `TimedOut` or `DidNotMakeIt` errors will trigger the child process being
-/// killed.
+/// NOTE: Returning the `TimedOut`, `IoErr` or `Unreachable` outcomes will trigger the child process
+/// being killed.
 pub async fn start_work(
 	worker: IdleWorker,
 	code: Arc<Vec<u8>>,
@@ -97,7 +95,7 @@
 		artifact_path.display(),
 	);
 
-	with_tmp_file(pid, cache_path, |tmp_file| async move {
+	with_tmp_file(stream.clone(), pid, cache_path, |tmp_file| async move {
 		if let Err(err) = send_request(&mut stream, code, &tmp_file, preparation_timeout).await {
 			gum::warn!(
 				target: LOG_TARGET,
@@ -120,10 +118,11 @@
 		let timeout = preparation_timeout * JOB_TIMEOUT_WALL_CLOCK_FACTOR;
 		let result = async_std::future::timeout(timeout, framed_recv(&mut stream)).await;
 
-		let selected = match result {
+		match result {
 			// Received bytes from worker within the time limit.
 			Ok(Ok(response_bytes)) =>
 				handle_response_bytes(
+					IdleWorker { stream, pid },
 					response_bytes,
 					pid,
 					tmp_file,
@@ -139,7 +138,7 @@
 					"failed to recv a prepare response: {:?}",
 					err,
 				);
-				Selected::IoErr
+				Outcome::IoErr
 			},
 			Err(_) => {
 				// Timed out here on the host.
@@ -148,18 +147,8 @@
 					worker_pid = %pid,
 					"did not recv a prepare response within the time limit",
 				);
-				Selected::Deadline
+				Outcome::TimedOut
 			},
-		};
-
-		// NOTE: A `TimedOut` or `DidNotMakeIt` error triggers the child process being killed.
-		match selected {
-			// Timed out on the child. This should already be logged by the child.
-			Selected::Done(Err(PrepareError::TimedOut)) => Outcome::TimedOut,
-			Selected::Done(result) =>
-				Outcome::Concluded { worker: IdleWorker { stream, pid }, result },
-			Selected::Deadline => Outcome::TimedOut,
-			Selected::IoErr => Outcome::DidNotMakeIt,
 		}
 	})
 	.await
@@ -170,12 +159,13 @@
 /// NOTE: Here we know the artifact exists, but is still located in a temporary file which will be
 /// cleared by `with_tmp_file`.
 async fn handle_response_bytes(
+	worker: IdleWorker,
 	response_bytes: Vec<u8>,
 	pid: u32,
 	tmp_file: PathBuf,
 	artifact_path: PathBuf,
 	preparation_timeout: Duration,
-) -> Selected {
+) -> Outcome {
 	// By convention we expect encoded `PrepareResult`.
 	let result = match PrepareResult::decode(&mut response_bytes.as_slice()) {
 		Ok(result) => result,
@@ -188,12 +178,14 @@
 				"received unexpected response from the prepare worker: {}",
 				HexDisplay::from(&bound_bytes),
 			);
-			return Selected::IoErr
+			return Outcome::IoErr
 		},
 	};
 
 	let cpu_time_elapsed = match result {
 		Ok(result) => result,
-		Err(_) => return Selected::Done(result),
+		// Timed out on the child. This should already be logged by the child.
+		Err(PrepareError::TimedOut) => return Outcome::TimedOut,
+		Err(_) => return Outcome::Concluded { worker, result },
 	};
 
 	if cpu_time_elapsed > preparation_timeout {
@@ -208,7 +200,10 @@
 		);
 
 		// Return a timeout error.
-		return Selected::Deadline
+		//
+		// NOTE: The artifact exists, but is located in a temporary file which
+		// will be cleared by `with_tmp_file`.
+		return Outcome::TimedOut
 	}
 
 	gum::debug!(
@@ -219,10 +214,9 @@
 		artifact_path.display(),
 	);
 
-	async_std::fs::rename(&tmp_file, &artifact_path)
-		.await
-		.map(|_| Selected::Done(result))
-		.unwrap_or_else(|err| {
+	match async_std::fs::rename(&tmp_file, &artifact_path).await {
+		Ok(_) => Outcome::Concluded { worker, result },
+		Err(err) => {
 			gum::warn!(
 				target: LOG_TARGET,
 				worker_pid = %pid,
@@ -231,15 +225,16 @@
 				artifact_path.display(),
 				err,
 			);
-			Selected::IoErr
-		})
+			Outcome::RenameTmpFileErr { worker, result, err: format!("{:?}", err) }
+		},
+	}
 }
 
 /// Create a temporary file for an artifact at the given cache path and execute the given
 /// future/closure passing the file path in.
 ///
 /// The function will try best effort to not leave behind the temporary file.
-async fn with_tmp_file<F, Fut>(pid: u32, cache_path: &Path, f: F) -> Outcome
+async fn with_tmp_file<F, Fut>(stream: UnixStream, pid: u32, cache_path: &Path, f: F) -> Outcome
 where
 	Fut: futures::Future<Output = Outcome>,
 	F: FnOnce(PathBuf) -> Fut,
@@ -253,7 +248,10 @@
 				"failed to create a temp file for the artifact: {:?}",
 				err,
 			);
-			return Outcome::DidNotMakeIt
+			return Outcome::CreateTmpFileErr {
+				worker: IdleWorker { stream, pid },
+				err: format!("{:?}", err),
+			}
 		},
 	};
diff --git a/node/core/pvf/src/worker_common.rs b/node/core/pvf/src/worker_common.rs
index f9eaf42dcf67..e052bd77ed06 100644
--- a/node/core/pvf/src/worker_common.rs
+++ b/node/core/pvf/src/worker_common.rs
@@ -19,6 +19,7 @@
 use crate::{execute::ExecuteResponse, PrepareError, LOG_TARGET};
 use async_std::{
 	io,
+	net::Shutdown,
 	os::unix::net::{UnixListener, UnixStream},
 	path::{Path, PathBuf},
 };
@@ -185,7 +186,19 @@ where
 		let stream = UnixStream::connect(socket_path).await?;
 		let _ = async_std::fs::remove_file(socket_path).await;
 
-		event_loop(stream).await
+		let result = event_loop(stream.clone()).await;
+
+		if let Err(err) = stream.shutdown(Shutdown::Both) {
+			// Log, but don't return error here, as it may shadow any error from `event_loop`.
+			gum::debug!(
+				target: LOG_TARGET,
+				"error shutting down stream at path {}: {}",
+				socket_path,
+				err
+			);
+		}
+
+		result
 	})
 	.unwrap_err(); // it's never `Ok` because it's `Ok(Never)`