From 21fb0e628b36814e1ec8f1a53c0f031c8f95a4be Mon Sep 17 00:00:00 2001 From: Marcin S Date: Fri, 2 Dec 2022 16:20:46 -0500 Subject: [PATCH 1/8] PVF preparation: do not conflate errors + Adds some more granularity to the prepare errors. + Better distinguish whether errors occur on the host side or the worker. + Do not kill the worker if the error happened on the host side. + Do not retry preparation if the error was `Panic`. + Removes unnecessary indirection with `Selected` type. --- node/core/candidate-validation/src/lib.rs | 19 +++--- node/core/candidate-validation/src/tests.rs | 2 +- node/core/pvf/src/error.rs | 63 ++++++++++++------ node/core/pvf/src/host.rs | 17 +++-- node/core/pvf/src/prepare/pool.rs | 74 +++++++++++++++------ node/core/pvf/src/prepare/worker.rs | 59 ++++++++-------- 6 files changed, 142 insertions(+), 92 deletions(-) diff --git a/node/core/candidate-validation/src/lib.rs b/node/core/candidate-validation/src/lib.rs index 74610bc113ec..38824eeda8ab 100644 --- a/node/core/candidate-validation/src/lib.rs +++ b/node/core/candidate-validation/src/lib.rs @@ -320,12 +320,12 @@ where match validation_backend.precheck_pvf(validation_code).await { Ok(_) => PreCheckOutcome::Valid, - Err(prepare_err) => match prepare_err { - PrepareError::Prevalidation(_) | - PrepareError::Preparation(_) | - PrepareError::Panic(_) => PreCheckOutcome::Invalid, - PrepareError::TimedOut | PrepareError::DidNotMakeIt => PreCheckOutcome::Failed, - }, + Err(prepare_err) => + if prepare_err.is_deterministic() { + PreCheckOutcome::Invalid + } else { + PreCheckOutcome::Failed + }, } } @@ -666,11 +666,14 @@ impl ValidationBackend for ValidationHost { async fn precheck_pvf(&mut self, pvf: Pvf) -> Result { let (tx, rx) = oneshot::channel(); + // TODO: Why are we always returning a non-deterministic error? This causes the pre-check + // outcome to always be `Failed`, never `Invalid`, thus never triggering disputes according + // to comments in pvf/src/error.rs. Can we change this to return the actual error? if let Err(_) = self.precheck_pvf(pvf, tx).await { - return Err(PrepareError::DidNotMakeIt) + return Err(PrepareError::IoErr) } - let precheck_result = rx.await.or(Err(PrepareError::DidNotMakeIt))?; + let precheck_result = rx.await.or(Err(PrepareError::IoErr))?; precheck_result } diff --git a/node/core/candidate-validation/src/tests.rs b/node/core/candidate-validation/src/tests.rs index 5ac93bc7d1f4..c6003c734973 100644 --- a/node/core/candidate-validation/src/tests.rs +++ b/node/core/candidate-validation/src/tests.rs @@ -1053,5 +1053,5 @@ fn precheck_properly_classifies_outcomes() { inner(Err(PrepareError::Panic("baz".to_owned())), PreCheckOutcome::Invalid); inner(Err(PrepareError::TimedOut), PreCheckOutcome::Failed); - inner(Err(PrepareError::DidNotMakeIt), PreCheckOutcome::Failed); + inner(Err(PrepareError::IoErr), PreCheckOutcome::Failed); } diff --git a/node/core/pvf/src/error.rs b/node/core/pvf/src/error.rs index ddcdb2561cfd..a560db8788c6 100644 --- a/node/core/pvf/src/error.rs +++ b/node/core/pvf/src/error.rs @@ -15,7 +15,7 @@ // along with Polkadot. If not, see . use parity_scale_codec::{Decode, Encode}; -use std::{any::Any, time::Duration}; +use std::{any::Any, fmt, time::Duration}; /// Result of PVF preparation performed by the validation host. Contains the elapsed CPU time if /// successful @@ -32,9 +32,42 @@ pub enum PrepareError { Panic(String), /// Failed to prepare the PVF due to the time limit. TimedOut, - /// This state indicates that the process assigned to prepare the artifact wasn't responsible - /// or were killed. This state is reported by the validation host (not by the worker). - DidNotMakeIt, + /// An IO error occurred while receiving the result from the worker process. This state is reported by the + /// validation host (not by the worker). + IoErr, + /// TODO: Keep handle to actual underlying error. + /// The temporary file for the artifact could not be created at the given cache path. This state is reported by the + /// validation host (not by the worker). + CreateTmpFileErr, + /// TODO: Keep handle to actual underlying error. + /// The response from the worker is received, but the file cannot be renamed (moved) to the final destination + /// location. This state is reported by the validation host (not by the worker). + RenameTmpFileErr, +} + +impl PrepareError { + pub fn is_deterministic(&self) -> bool { + use PrepareError::*; + match self { + Prevalidation(_) | Preparation(_) | Panic(_) => true, + TimedOut | IoErr | CreateTmpFileErr | RenameTmpFileErr => false, + } + } +} + +impl fmt::Display for PrepareError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + use PrepareError::*; + match self { + Prevalidation(err) => write!(f, "prevalidation: {}", err), + Preparation(err) => write!(f, "preparation: {}", err), + Panic(err) => write!(f, "panic: {}", err), + TimedOut => write!(f, "prepare: timeout"), + IoErr => write!(f, "prepare: io error while receiving response"), + CreateTmpFileErr => write!(f, "prepare: error creating tmp file"), + RenameTmpFileErr => write!(f, "prepare: error renaming tmp file"), + } + } } /// A error raised during validation of the candidate. @@ -89,24 +122,16 @@ impl From for ValidationError { // Deterministic errors should trigger reliably. Those errors depend on the PVF itself and // the sc-executor/wasmtime logic. // - // For now, at least until the PVF pre-checking lands, the deterministic errors will be - // treated as `InvalidCandidate`. Should those occur they could potentially trigger disputes. + // We treat the deterministic errors as `InvalidCandidate`. Should those occur they could + // potentially trigger disputes. // + // TODO: Is this up-to-date? // All non-deterministic errors are qualified as `InternalError`s and will not trigger // disputes. - match error { - PrepareError::Prevalidation(err) => ValidationError::InvalidCandidate( - InvalidCandidate::PrepareError(format!("prevalidation: {}", err)), - ), - PrepareError::Preparation(err) => ValidationError::InvalidCandidate( - InvalidCandidate::PrepareError(format!("preparation: {}", err)), - ), - PrepareError::Panic(err) => ValidationError::InvalidCandidate( - InvalidCandidate::PrepareError(format!("panic: {}", err)), - ), - PrepareError::TimedOut => ValidationError::InternalError("prepare: timeout".to_owned()), - PrepareError::DidNotMakeIt => - ValidationError::InternalError("prepare: did not make it".to_owned()), + if error.is_deterministic() { + ValidationError::InvalidCandidate(InvalidCandidate::PrepareError(error.to_string())) + } else { + ValidationError::InternalError(error.to_string()) } } } diff --git a/node/core/pvf/src/host.rs b/node/core/pvf/src/host.rs index 483419409448..12969ad72dcc 100644 --- a/node/core/pvf/src/host.rs +++ b/node/core/pvf/src/host.rs @@ -776,16 +776,15 @@ fn can_retry_prepare_after_failure( num_failures: u32, error: &PrepareError, ) -> bool { - use PrepareError::*; - match error { - // Gracefully returned an error, so it will probably be reproducible. Don't retry. - Prevalidation(_) | Preparation(_) => false, - // Retry if the retry cooldown has elapsed and if we have already retried less than - // `NUM_PREPARE_RETRIES` times. IO errors may resolve themselves. - Panic(_) | TimedOut | DidNotMakeIt => - SystemTime::now() >= last_time_failed + PREPARE_FAILURE_COOLDOWN && - num_failures <= NUM_PREPARE_RETRIES, + if error.is_deterministic() { + // This error is considered deterministic, so it will probably be reproducible. Don't retry. + return false } + + // Retry if the retry cooldown has elapsed and if we have already retried less than `NUM_PREPARE_RETRIES` times. IO + // errors may resolve themselves. + SystemTime::now() >= last_time_failed + PREPARE_FAILURE_COOLDOWN && + num_failures <= NUM_PREPARE_RETRIES } /// A stream that yields a pulse continuously at a given interval. diff --git a/node/core/pvf/src/prepare/pool.rs b/node/core/pvf/src/prepare/pool.rs index 9ba64be97555..6eb91c7a27b0 100644 --- a/node/core/pvf/src/prepare/pool.rs +++ b/node/core/pvf/src/prepare/pool.rs @@ -232,7 +232,7 @@ fn handle_to_pool( // items concluded; // thus idle token is Some; // qed. - never!("unexpected abscence of the idle token in prepare pool"); + never!("unexpected absence of the idle token in prepare pool"); } } else { // That's a relatively normal situation since the queue may send `start_work` and @@ -295,25 +295,24 @@ fn handle_mux( }, PoolEvent::StartWork(worker, outcome) => { match outcome { - Outcome::Concluded { worker: idle, result } => { - let data = match spawned.get_mut(worker) { - None => { - // Perhaps the worker was killed meanwhile and the result is no longer - // relevant. - return Ok(()) - }, - Some(data) => data, - }; - - // We just replace the idle worker that was loaned from this option during - // the work starting. - let old = data.idle.replace(idle); - assert_matches!(old, None, "attempt to overwrite an idle worker"); - - reply(from_pool, FromPool::Concluded { worker, rip: false, result })?; - - Ok(()) - }, + Outcome::Concluded { worker: idle, result } => + handle_concluded_no_rip(from_pool, spawned, worker, idle, result), + // Return `Concluded`, but do not kill the worker since the error was on the host side. + Outcome::CreateTmpFileErr { worker: idle } => handle_concluded_no_rip( + from_pool, + spawned, + worker, + idle, + Err(PrepareError::CreateTmpFileErr), + ), + // Return `Concluded`, but do not kill the worker since the error was on the host side. + Outcome::RenameTmpFileErr { worker: idle, result: _ } => handle_concluded_no_rip( + from_pool, + spawned, + worker, + idle, + Err(PrepareError::RenameTmpFileErr), + ), Outcome::Unreachable => { if attempt_retire(metrics, spawned, worker) { reply(from_pool, FromPool::Rip(worker))?; @@ -321,14 +320,14 @@ fn handle_mux( Ok(()) }, - Outcome::DidNotMakeIt => { + Outcome::IoErr => { if attempt_retire(metrics, spawned, worker) { reply( from_pool, FromPool::Concluded { worker, rip: true, - result: Err(PrepareError::DidNotMakeIt), + result: Err(PrepareError::IoErr), }, )?; } @@ -377,6 +376,37 @@ fn attempt_retire( } } +/// Handles the case where we received a response. There potentially was an error, but not the fault +/// of the worker as far as we know, so the worker should not be killed. +/// +/// This function tries to put the idle worker back into the pool and then replies with +/// `FromPool::Concluded` with `rip: false`. +fn handle_concluded_no_rip( + from_pool: &mut mpsc::UnboundedSender, + spawned: &mut HopSlotMap, + worker: Worker, + idle: IdleWorker, + result: PrepareResult, +) -> Result<(), Fatal> { + let data = match spawned.get_mut(worker) { + None => { + // Perhaps the worker was killed meanwhile and the result is no longer + // relevant. + return Ok(()) + }, + Some(data) => data, + }; + + // We just replace the idle worker that was loaned from this option during + // the work starting. + let old = data.idle.replace(idle); + assert_matches!(old, None, "attempt to overwrite an idle worker"); + + reply(from_pool, FromPool::Concluded { worker, rip: false, result })?; + + Ok(()) +} + /// Spins up the pool and returns the future that should be polled to make the pool functional. pub fn start( metrics: Metrics, diff --git a/node/core/pvf/src/prepare/worker.rs b/node/core/pvf/src/prepare/worker.rs index 4e0c411e45de..e3b030a40e7f 100644 --- a/node/core/pvf/src/prepare/worker.rs +++ b/node/core/pvf/src/prepare/worker.rs @@ -59,21 +59,19 @@ pub enum Outcome { /// The host tried to reach the worker but failed. This is most likely because the worked was /// killed by the system. Unreachable, + /// The temporary file for the artifact could not be created at the given cache path. + CreateTmpFileErr { worker: IdleWorker }, + /// The response from the worker is received, but the file cannot be renamed (moved) to the + /// final destination location. + RenameTmpFileErr { worker: IdleWorker, result: PrepareResult }, /// The worker failed to finish the job until the given deadline. /// /// The worker is no longer usable and should be killed. TimedOut, - /// The execution was interrupted abruptly and the worker is not available anymore. + /// An IO error occurred while receiving the result from the worker process. /// /// This doesn't return an idle worker instance, thus this worker is no longer usable. - DidNotMakeIt, -} - -#[derive(Debug)] -enum Selected { - Done(PrepareResult), IoErr, - Deadline, } /// Given the idle token of a worker and parameters of work, communicates with the worker and @@ -94,7 +92,7 @@ pub async fn start_work( artifact_path.display(), ); - with_tmp_file(pid, cache_path, |tmp_file| async move { + with_tmp_file(stream.clone(), pid, cache_path, |tmp_file| async move { if let Err(err) = send_request(&mut stream, code, &tmp_file, preparation_timeout).await { gum::warn!( target: LOG_TARGET, @@ -117,10 +115,11 @@ pub async fn start_work( let timeout = preparation_timeout * JOB_TIMEOUT_WALL_CLOCK_FACTOR; let result = async_std::future::timeout(timeout, framed_recv(&mut stream)).await; - let selected = match result { + match result { // Received bytes from worker within the time limit. Ok(Ok(response_bytes)) => handle_response_bytes( + IdleWorker { stream, pid }, response_bytes, pid, tmp_file, @@ -136,7 +135,7 @@ pub async fn start_work( "failed to recv a prepare response: {:?}", err, ); - Selected::IoErr + Outcome::IoErr }, Err(_) => { // Timed out here on the host. @@ -145,17 +144,8 @@ pub async fn start_work( worker_pid = %pid, "did not recv a prepare response within the time limit", ); - Selected::Deadline + Outcome::TimedOut }, - }; - - match selected { - // Timed out on the child. This should already be logged by the child. - Selected::Done(Err(PrepareError::TimedOut)) => Outcome::TimedOut, - Selected::Done(result) => - Outcome::Concluded { worker: IdleWorker { stream, pid }, result }, - Selected::Deadline => Outcome::TimedOut, - Selected::IoErr => Outcome::DidNotMakeIt, } }) .await @@ -163,12 +153,13 @@ pub async fn start_work( /// Handles the case where we successfully received response bytes on the host from the child. async fn handle_response_bytes( + worker: IdleWorker, response_bytes: Vec, pid: u32, tmp_file: PathBuf, artifact_path: PathBuf, preparation_timeout: Duration, -) -> Selected { +) -> Outcome { // By convention we expect encoded `PrepareResult`. let result = match PrepareResult::decode(&mut response_bytes.as_slice()) { Ok(result) => result, @@ -181,12 +172,14 @@ async fn handle_response_bytes( "received unexpected response from the prepare worker: {}", HexDisplay::from(&bound_bytes), ); - return Selected::IoErr + return Outcome::IoErr }, }; let cpu_time_elapsed = match result { Ok(result) => result, - Err(_) => return Selected::Done(result), + // Timed out on the child. This should already be logged by the child. + Err(PrepareError::TimedOut) => return Outcome::TimedOut, + Err(_) => return Outcome::Concluded { worker, result }, }; if cpu_time_elapsed > preparation_timeout { @@ -204,7 +197,7 @@ async fn handle_response_bytes( // // NOTE: The artifact exists, but is located in a temporary file which // will be cleared by `with_tmp_file`. - return Selected::Deadline + return Outcome::TimedOut } gum::debug!( @@ -215,10 +208,9 @@ async fn handle_response_bytes( artifact_path.display(), ); - async_std::fs::rename(&tmp_file, &artifact_path) - .await - .map(|_| Selected::Done(result)) - .unwrap_or_else(|err| { + match async_std::fs::rename(&tmp_file, &artifact_path).await { + Ok(_) => Outcome::Concluded { worker, result }, + Err(err) => { gum::warn!( target: LOG_TARGET, worker_pid = %pid, @@ -227,15 +219,16 @@ async fn handle_response_bytes( artifact_path.display(), err, ); - Selected::IoErr - }) + Outcome::RenameTmpFileErr { worker, result } + }, + } } /// Create a temporary file for an artifact at the given cache path and execute the given /// future/closure passing the file path in. /// /// The function will try best effort to not leave behind the temporary file. -async fn with_tmp_file(pid: u32, cache_path: &Path, f: F) -> Outcome +async fn with_tmp_file(stream: UnixStream, pid: u32, cache_path: &Path, f: F) -> Outcome where Fut: futures::Future, F: FnOnce(PathBuf) -> Fut, @@ -249,7 +242,7 @@ where "failed to create a temp file for the artifact: {:?}", err, ); - return Outcome::DidNotMakeIt + return Outcome::CreateTmpFileErr { worker: IdleWorker { stream, pid } } }, }; From 9d4681a640ba30d5e3f503cb40779df6d8c3fdea Mon Sep 17 00:00:00 2001 From: Marcin S Date: Sat, 3 Dec 2022 17:16:28 -0500 Subject: [PATCH 2/8] Add missing docs, resolve TODOs --- node/core/pvf/src/error.rs | 26 ++++++++++++-------------- node/core/pvf/src/prepare/pool.rs | 19 ++++++++++--------- node/core/pvf/src/prepare/worker.rs | 11 +++++++---- node/core/pvf/src/worker_common.rs | 2 ++ 4 files changed, 31 insertions(+), 27 deletions(-) diff --git a/node/core/pvf/src/error.rs b/node/core/pvf/src/error.rs index a560db8788c6..85a4d4f04c7e 100644 --- a/node/core/pvf/src/error.rs +++ b/node/core/pvf/src/error.rs @@ -35,22 +35,26 @@ pub enum PrepareError { /// An IO error occurred while receiving the result from the worker process. This state is reported by the /// validation host (not by the worker). IoErr, - /// TODO: Keep handle to actual underlying error. /// The temporary file for the artifact could not be created at the given cache path. This state is reported by the /// validation host (not by the worker). - CreateTmpFileErr, - /// TODO: Keep handle to actual underlying error. + CreateTmpFileErr(String), /// The response from the worker is received, but the file cannot be renamed (moved) to the final destination /// location. This state is reported by the validation host (not by the worker). - RenameTmpFileErr, + RenameTmpFileErr(String), } impl PrepareError { + /// Returns whether this is a deterministic error, i.e. one that should trigger reliably. Those + /// errors depend on the PVF itself and the sc-executor/wasmtime logic. + /// + /// Non-deterministic errors can happen spuriously. Typically, they occur due to resource + /// starvation, e.g. under heavy load or memory pressure. Those errors are typically transient + /// but may persist e.g. if the node is run by overwhelmingly underpowered machine. pub fn is_deterministic(&self) -> bool { use PrepareError::*; match self { Prevalidation(_) | Preparation(_) | Panic(_) => true, - TimedOut | IoErr | CreateTmpFileErr | RenameTmpFileErr => false, + TimedOut | IoErr | CreateTmpFileErr(_) | RenameTmpFileErr(_) => false, } } } @@ -64,8 +68,8 @@ impl fmt::Display for PrepareError { Panic(err) => write!(f, "panic: {}", err), TimedOut => write!(f, "prepare: timeout"), IoErr => write!(f, "prepare: io error while receiving response"), - CreateTmpFileErr => write!(f, "prepare: error creating tmp file"), - RenameTmpFileErr => write!(f, "prepare: error renaming tmp file"), + CreateTmpFileErr(err) => write!(f, "prepare: error creating tmp file: {}", err), + RenameTmpFileErr(err) => write!(f, "prepare: error renaming tmp file: {}", err), } } } @@ -114,13 +118,7 @@ pub enum InvalidCandidate { impl From for ValidationError { fn from(error: PrepareError) -> Self { // Here we need to classify the errors into two errors: deterministic and non-deterministic. - // - // Non-deterministic errors can happen spuriously. Typically, they occur due to resource - // starvation, e.g. under heavy load or memory pressure. Those errors are typically transient - // but may persist e.g. if the node is run by overwhelmingly underpowered machine. - // - // Deterministic errors should trigger reliably. Those errors depend on the PVF itself and - // the sc-executor/wasmtime logic. + // See [`PrepareError::is_deterministic`]. // // We treat the deterministic errors as `InvalidCandidate`. Should those occur they could // potentially trigger disputes. diff --git a/node/core/pvf/src/prepare/pool.rs b/node/core/pvf/src/prepare/pool.rs index 6eb91c7a27b0..6ff8f258ca42 100644 --- a/node/core/pvf/src/prepare/pool.rs +++ b/node/core/pvf/src/prepare/pool.rs @@ -298,21 +298,22 @@ fn handle_mux( Outcome::Concluded { worker: idle, result } => handle_concluded_no_rip(from_pool, spawned, worker, idle, result), // Return `Concluded`, but do not kill the worker since the error was on the host side. - Outcome::CreateTmpFileErr { worker: idle } => handle_concluded_no_rip( + Outcome::CreateTmpFileErr { worker: idle, err } => handle_concluded_no_rip( from_pool, spawned, worker, idle, - Err(PrepareError::CreateTmpFileErr), + Err(PrepareError::CreateTmpFileErr(err)), ), // Return `Concluded`, but do not kill the worker since the error was on the host side. - Outcome::RenameTmpFileErr { worker: idle, result: _ } => handle_concluded_no_rip( - from_pool, - spawned, - worker, - idle, - Err(PrepareError::RenameTmpFileErr), - ), + Outcome::RenameTmpFileErr { worker: idle, result: _, err } => + handle_concluded_no_rip( + from_pool, + spawned, + worker, + idle, + Err(PrepareError::RenameTmpFileErr(err)), + ), Outcome::Unreachable => { if attempt_retire(metrics, spawned, worker) { reply(from_pool, FromPool::Rip(worker))?; diff --git a/node/core/pvf/src/prepare/worker.rs b/node/core/pvf/src/prepare/worker.rs index e3b030a40e7f..945bcbbd6090 100644 --- a/node/core/pvf/src/prepare/worker.rs +++ b/node/core/pvf/src/prepare/worker.rs @@ -60,10 +60,10 @@ pub enum Outcome { /// killed by the system. Unreachable, /// The temporary file for the artifact could not be created at the given cache path. - CreateTmpFileErr { worker: IdleWorker }, + CreateTmpFileErr { worker: IdleWorker, err: String }, /// The response from the worker is received, but the file cannot be renamed (moved) to the /// final destination location. - RenameTmpFileErr { worker: IdleWorker, result: PrepareResult }, + RenameTmpFileErr { worker: IdleWorker, result: PrepareResult, err: String }, /// The worker failed to finish the job until the given deadline. /// /// The worker is no longer usable and should be killed. @@ -219,7 +219,7 @@ async fn handle_response_bytes( artifact_path.display(), err, ); - Outcome::RenameTmpFileErr { worker, result } + Outcome::RenameTmpFileErr { worker, result, err: format!("{:?}", err) } }, } } @@ -242,7 +242,10 @@ where "failed to create a temp file for the artifact: {:?}", err, ); - return Outcome::CreateTmpFileErr { worker: IdleWorker { stream, pid } } + return Outcome::CreateTmpFileErr { + worker: IdleWorker { stream, pid }, + err: format!("{:?}", err), + } }, }; diff --git a/node/core/pvf/src/worker_common.rs b/node/core/pvf/src/worker_common.rs index 55c91a64424d..fc5d1d197d82 100644 --- a/node/core/pvf/src/worker_common.rs +++ b/node/core/pvf/src/worker_common.rs @@ -189,6 +189,8 @@ where }) .unwrap_err(); // it's never `Ok` because it's `Ok(Never)` + // TODO: should we call `shutdown` on the stream so that the host is notified? + gum::debug!( target: LOG_TARGET, worker_pid = %std::process::id(), From 79a76d1b08dfadd68d37abe2d4ad5ed56b071d78 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Mon, 5 Dec 2022 08:06:32 -0500 Subject: [PATCH 3/8] Address review comments and remove TODOs --- node/core/candidate-validation/src/lib.rs | 7 +------ node/core/pvf/src/error.rs | 1 - node/core/pvf/src/prepare/pool.rs | 5 ++++- node/core/pvf/src/prepare/queue.rs | 2 +- node/core/pvf/src/worker_common.rs | 9 ++++++--- 5 files changed, 12 insertions(+), 12 deletions(-) diff --git a/node/core/candidate-validation/src/lib.rs b/node/core/candidate-validation/src/lib.rs index 38824eeda8ab..5419c3346ec7 100644 --- a/node/core/candidate-validation/src/lib.rs +++ b/node/core/candidate-validation/src/lib.rs @@ -666,12 +666,7 @@ impl ValidationBackend for ValidationHost { async fn precheck_pvf(&mut self, pvf: Pvf) -> Result { let (tx, rx) = oneshot::channel(); - // TODO: Why are we always returning a non-deterministic error? This causes the pre-check - // outcome to always be `Failed`, never `Invalid`, thus never triggering disputes according - // to comments in pvf/src/error.rs. Can we change this to return the actual error? - if let Err(_) = self.precheck_pvf(pvf, tx).await { - return Err(PrepareError::IoErr) - } + self.precheck_pvf(pvf, tx).await?; let precheck_result = rx.await.or(Err(PrepareError::IoErr))?; diff --git a/node/core/pvf/src/error.rs b/node/core/pvf/src/error.rs index 85a4d4f04c7e..01d8c78d39ca 100644 --- a/node/core/pvf/src/error.rs +++ b/node/core/pvf/src/error.rs @@ -123,7 +123,6 @@ impl From for ValidationError { // We treat the deterministic errors as `InvalidCandidate`. Should those occur they could // potentially trigger disputes. // - // TODO: Is this up-to-date? // All non-deterministic errors are qualified as `InternalError`s and will not trigger // disputes. if error.is_deterministic() { diff --git a/node/core/pvf/src/prepare/pool.rs b/node/core/pvf/src/prepare/pool.rs index 6ff8f258ca42..8af220898176 100644 --- a/node/core/pvf/src/prepare/pool.rs +++ b/node/core/pvf/src/prepare/pool.rs @@ -401,7 +401,10 @@ fn handle_concluded_no_rip( // We just replace the idle worker that was loaned from this option during // the work starting. let old = data.idle.replace(idle); - assert_matches!(old, None, "attempt to overwrite an idle worker"); + never!( + old.is_some(), + "old idle worker was taken out when starting work; we only replace it here; qed" + ); reply(from_pool, FromPool::Concluded { worker, rip: false, result })?; diff --git a/node/core/pvf/src/prepare/queue.rs b/node/core/pvf/src/prepare/queue.rs index df0a8ec41883..e78351af9839 100644 --- a/node/core/pvf/src/prepare/queue.rs +++ b/node/core/pvf/src/prepare/queue.rs @@ -761,7 +761,7 @@ mod tests { test.send_from_pool(pool::FromPool::Concluded { worker: w1, rip: true, - result: Err(PrepareError::DidNotMakeIt), + result: Err(PrepareError::IoErr), }); test.poll_ensure_to_pool_is_empty().await; } diff --git a/node/core/pvf/src/worker_common.rs b/node/core/pvf/src/worker_common.rs index fc5d1d197d82..23c885153f22 100644 --- a/node/core/pvf/src/worker_common.rs +++ b/node/core/pvf/src/worker_common.rs @@ -19,6 +19,7 @@ use crate::{execute::ExecuteResponse, PrepareError, LOG_TARGET}; use async_std::{ io, + net::Shutdown, os::unix::net::{UnixListener, UnixStream}, path::{Path, PathBuf}, }; @@ -185,12 +186,14 @@ where let stream = UnixStream::connect(socket_path).await?; let _ = async_std::fs::remove_file(socket_path).await; - event_loop(stream).await + let result = event_loop(stream.clone()).await; + + stream.shutdown(Shutdown::Both)?; + + result }) .unwrap_err(); // it's never `Ok` because it's `Ok(Never)` - // TODO: should we call `shutdown` on the stream so that the host is notified? - gum::debug!( target: LOG_TARGET, worker_pid = %std::process::id(), From c410d1c2591ee2030242e2a6280efb392fa18dbc Mon Sep 17 00:00:00 2001 From: Marcin S Date: Mon, 5 Dec 2022 09:40:07 -0500 Subject: [PATCH 4/8] Fix error in CI --- node/core/pvf/src/prepare/pool.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/node/core/pvf/src/prepare/pool.rs b/node/core/pvf/src/prepare/pool.rs index 8af220898176..b60da50bb8d9 100644 --- a/node/core/pvf/src/prepare/pool.rs +++ b/node/core/pvf/src/prepare/pool.rs @@ -22,7 +22,6 @@ use crate::{ LOG_TARGET, }; use always_assert::never; -use assert_matches::assert_matches; use async_std::path::{Path, PathBuf}; use futures::{ channel::mpsc, future::BoxFuture, stream::FuturesUnordered, Future, FutureExt, StreamExt, From 7ea847e28f193c8b2937da022f5187617942bbe0 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Mon, 5 Dec 2022 10:52:12 -0500 Subject: [PATCH 5/8] Undo unnecessary change --- node/core/candidate-validation/src/lib.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/node/core/candidate-validation/src/lib.rs b/node/core/candidate-validation/src/lib.rs index 5419c3346ec7..70fc24eacade 100644 --- a/node/core/candidate-validation/src/lib.rs +++ b/node/core/candidate-validation/src/lib.rs @@ -666,7 +666,10 @@ impl ValidationBackend for ValidationHost { async fn precheck_pvf(&mut self, pvf: Pvf) -> Result { let (tx, rx) = oneshot::channel(); - self.precheck_pvf(pvf, tx).await?; + if let Err(_) = self.precheck_pvf(pvf, tx).await { + // Return an IO error if there was an error communicating with the host. + return Err(PrepareError::IoErr) + } let precheck_result = rx.await.or(Err(PrepareError::IoErr))?; From 39628df70388411c7c0c695910d454a8ec43bd7c Mon Sep 17 00:00:00 2001 From: Marcin S Date: Wed, 7 Dec 2022 10:02:32 -0500 Subject: [PATCH 6/8] Update couple of comments --- node/core/pvf/src/prepare/pool.rs | 4 ++-- node/core/pvf/src/prepare/worker.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/node/core/pvf/src/prepare/pool.rs b/node/core/pvf/src/prepare/pool.rs index 94a1d5e2e8a7..3319d44e7fb4 100644 --- a/node/core/pvf/src/prepare/pool.rs +++ b/node/core/pvf/src/prepare/pool.rs @@ -293,8 +293,8 @@ fn handle_mux( Ok(()) }, PoolEvent::StartWork(worker, outcome) => { - // If we receive any outcome other than `Concluded`, we attempt to kill the worker - // process. + // If we receive an outcome that the worker is unreachable or that an error occurred on + // the worker, we attempt to kill the worker process. match outcome { Outcome::Concluded { worker: idle, result } => handle_concluded_no_rip(from_pool, spawned, worker, idle, result), diff --git a/node/core/pvf/src/prepare/worker.rs b/node/core/pvf/src/prepare/worker.rs index 85a138717c6d..5b4212e1e313 100644 --- a/node/core/pvf/src/prepare/worker.rs +++ b/node/core/pvf/src/prepare/worker.rs @@ -77,8 +77,8 @@ pub enum Outcome { /// Given the idle token of a worker and parameters of work, communicates with the worker and /// returns the outcome. /// -/// NOTE: Returning the `TimedOut` or `DidNotMakeIt` errors will trigger the child process being -/// killed. +/// NOTE: Returning the `TimedOut`, `IoErr` or `Unreachable` outcomes will trigger the child process +/// being killed. pub async fn start_work( worker: IdleWorker, code: Arc>, From 36a8a5fb2bc9019c58fcf221e3f8ca356788f052 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Mon, 19 Dec 2022 16:23:54 -0500 Subject: [PATCH 7/8] Don't return error for stream shutdown --- node/core/pvf/src/worker_common.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/node/core/pvf/src/worker_common.rs b/node/core/pvf/src/worker_common.rs index 524543eb54be..9c4da2659d27 100644 --- a/node/core/pvf/src/worker_common.rs +++ b/node/core/pvf/src/worker_common.rs @@ -188,7 +188,15 @@ where let result = event_loop(stream.clone()).await; - stream.shutdown(Shutdown::Both)?; + if let Err(err) = stream.shutdown(Shutdown::Both) { + // Log, but don't return error here, as it may shadow any error from `event_loop`. + gum::warn!( + target: LOG_TARGET, + "error shutting down stream at path {}: {}", + socket_path, + err + ); + } result }) From d88190d731e5bf432e97d914975ee8e2306fa7f4 Mon Sep 17 00:00:00 2001 From: Marcin S Date: Mon, 19 Dec 2022 16:40:28 -0500 Subject: [PATCH 8/8] Update node/core/pvf/src/worker_common.rs --- node/core/pvf/src/worker_common.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/core/pvf/src/worker_common.rs b/node/core/pvf/src/worker_common.rs index 9c4da2659d27..e052bd77ed06 100644 --- a/node/core/pvf/src/worker_common.rs +++ b/node/core/pvf/src/worker_common.rs @@ -190,7 +190,7 @@ where if let Err(err) = stream.shutdown(Shutdown::Both) { // Log, but don't return error here, as it may shadow any error from `event_loop`. - gum::warn!( + gum::debug!( target: LOG_TARGET, "error shutting down stream at path {}: {}", socket_path,