
Commit

pvf host: store only compiled artifacts on disk
slumber committed Sep 22, 2021
1 parent c406f7b commit f0943e6
Showing 7 changed files with 162 additions and 101 deletions.
46 changes: 30 additions & 16 deletions node/core/pvf/src/artifacts.rs
@@ -23,33 +23,44 @@ use std::{
time::{Duration, SystemTime},
};

/// A final product of the preparation process. Contains either a ready-to-run compiled artifact or
/// a description of what went wrong.
#[derive(Encode, Decode)]
pub enum Artifact {
/// An error that occurred during the prepare part of the PVF pipeline.
#[derive(Debug, Clone, Encode, Decode)]
pub enum PrepareError {
/// During the prevalidation stage of preparation an issue was found with the PVF.
PrevalidationErr(String),
Prevalidation(String),
/// Compilation failed for the given PVF.
PreparationErr(String),
Preparation(String),
/// This state indicates that the process assigned to prepare the artifact wasn't responsive
/// or was killed. This state is reported by the validation host (not by the worker).
DidntMakeIt,
/// The PVF passed all the checks and is ready for execution.
Compiled { compiled_artifact: Vec<u8> },
DidNotMakeIt,
}

impl Artifact {
/// Serializes this struct into a byte buffer.
pub fn serialize(&self) -> Vec<u8> {
self.encode()
/// A wrapper for the compiled PVF code.
#[derive(Encode, Decode)]
pub struct CompiledArtifact(Vec<u8>);

impl CompiledArtifact {
pub fn new(code: Vec<u8>) -> Self {
Self(code)
}
}

/// Deserialize the given byte buffer to an artifact.
pub fn deserialize(mut bytes: &[u8]) -> Result<Self, String> {
Artifact::decode(&mut bytes).map_err(|e| format!("{:?}", e))
impl AsRef<[u8]> for CompiledArtifact {
fn as_ref(&self) -> &[u8] {
self.0.as_slice()
}
}

/// A final product of the preparation process. Contains either a ready-to-run compiled artifact or
/// a description of what went wrong.
#[derive(Encode, Decode)]
pub enum Artifact {
/// An error occurred during the prepare part of the PVF pipeline.
Error(PrepareError),
/// The PVF passed all the checks and is ready for execution.
Compiled(CompiledArtifact),
}

/// Identifier of an artifact. Right now it only encodes a code hash of the PVF. But if we get to
/// multiple engine implementations the artifact ID should include the engine type as well.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)]
@@ -117,6 +128,9 @@ pub enum ArtifactState {
},
/// A task to prepare this artifact is scheduled.
Preparing,
/// The code couldn't be compiled due to an error. Such artifacts
/// never reach the executor and stay in the host's memory.
FailedToProcess(PrepareError),
}

/// A container of all known artifact ids and their states.
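For reference, here is a minimal sketch (not part of the diff) of the round-trip these new `artifacts.rs` types are meant to support. It assumes the parity-scale-codec crate; the types are stand-in copies and the code blob is made up.

```rust
// Sketch only: stand-in copies of the new artifacts.rs types, to show the
// intended round-trip. Assumes the parity-scale-codec crate with the derive
// feature; the sample code blob is made up.
use parity_scale_codec::{Decode, Encode};

/// Stand-in for the new PrepareError; never written to disk.
#[derive(Debug, Clone, Encode, Decode)]
enum PrepareError {
    Prevalidation(String),
    Preparation(String),
    DidNotMakeIt,
}

/// Stand-in for the new CompiledArtifact wrapper.
#[derive(Encode, Decode)]
struct CompiledArtifact(Vec<u8>);

fn main() {
    // What gets persisted is just the SCALE encoding of the code wrapper.
    let artifact = CompiledArtifact(vec![0xde, 0xad, 0xbe, 0xef]);
    let on_disk: Vec<u8> = artifact.encode();

    // Reading it back is a plain decode of the same wrapper.
    let restored = CompiledArtifact::decode(&mut on_disk.as_slice()).unwrap();
    assert_eq!(restored.0, vec![0xde, 0xad, 0xbe, 0xef]);

    // Preparation failures are kept in the host's memory as PrepareError values
    // (see ArtifactState::FailedToProcess) rather than being persisted.
    let _failed = PrepareError::DidNotMakeIt;
}
```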
13 changes: 13 additions & 0 deletions node/core/pvf/src/error.rs
@@ -14,6 +14,8 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use crate::artifacts::PrepareError;

/// An error raised during validation of the candidate.
#[derive(Debug, Clone)]
pub enum ValidationError {
@@ -54,3 +56,14 @@ pub enum InvalidCandidate {
/// PVF execution (compilation is not included) took more time than was allotted.
HardTimeout,
}

impl From<PrepareError> for ValidationError {
fn from(error: PrepareError) -> Self {
let error_str = match error {
PrepareError::Prevalidation(err) => err,
PrepareError::Preparation(err) => err,
PrepareError::DidNotMakeIt => "preparation timeout".to_owned(),
};
ValidationError::InvalidCandidate(InvalidCandidate::WorkerReportedError(error_str))
}
}
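A usage sketch (not part of the diff) of the new conversion, with simplified stand-ins for `ValidationError` and `InvalidCandidate`; the real enums carry more variants.

```rust
// Sketch only: simplified stand-ins showing how a cached PrepareError is
// surfaced to a caller as a ValidationError.
#[derive(Debug, Clone)]
enum PrepareError {
    Prevalidation(String),
    Preparation(String),
    DidNotMakeIt,
}

#[derive(Debug)]
enum InvalidCandidate {
    WorkerReportedError(String),
}

#[derive(Debug)]
enum ValidationError {
    InvalidCandidate(InvalidCandidate),
}

impl From<PrepareError> for ValidationError {
    fn from(error: PrepareError) -> Self {
        let error_str = match error {
            PrepareError::Prevalidation(err) | PrepareError::Preparation(err) => err,
            PrepareError::DidNotMakeIt => "preparation timeout".to_owned(),
        };
        ValidationError::InvalidCandidate(InvalidCandidate::WorkerReportedError(error_str))
    }
}

fn main() {
    // A failed preparation is cached in memory; a later execution request for
    // the same PVF is answered with the converted error instead of re-running.
    let cached = PrepareError::Preparation("unsupported wasm feature".to_owned());
    let reply: ValidationError = cached.into();
    println!("{:?}", reply);
}
```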
12 changes: 3 additions & 9 deletions node/core/pvf/src/execute/worker.rs
@@ -15,7 +15,7 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use crate::{
artifacts::{Artifact, ArtifactPathId},
artifacts::{ArtifactPathId, CompiledArtifact},
executor_intf::TaskExecutor,
worker_common::{
bytes_to_path, framed_recv, framed_send, path_to_bytes, spawn_with_program_path,
@@ -217,18 +217,12 @@ async fn validate_using_artifact(
Ok(b) => b,
};

let artifact = match Artifact::deserialize(&artifact_bytes) {
let artifact = match CompiledArtifact::decode(&mut artifact_bytes.as_slice()) {
Err(e) => return Response::InternalError(format!("artifact deserialization: {:?}", e)),
Ok(a) => a,
};

let compiled_artifact = match &artifact {
Artifact::PrevalidationErr(msg) => return Response::format_invalid("prevalidation", msg),
Artifact::PreparationErr(msg) => return Response::format_invalid("preparation", msg),
Artifact::DidntMakeIt => return Response::format_invalid("preparation timeout", ""),

Artifact::Compiled { compiled_artifact } => compiled_artifact,
};
let compiled_artifact = artifact.as_ref();

let validation_started_at = Instant::now();
let descriptor_bytes = match unsafe {
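On the execute side the worker now decodes the bytes it reads from disk straight into `CompiledArtifact`. A sketch (not part of the diff) of that path, assuming parity-scale-codec and a hypothetical artifact path:

```rust
// Sketch only: the worker-side read path after the change. The artifact path
// is hypothetical and error handling is collapsed into Strings for brevity.
use parity_scale_codec::{Decode, Encode};

#[derive(Encode, Decode)]
struct CompiledArtifact(Vec<u8>);

impl AsRef<[u8]> for CompiledArtifact {
    fn as_ref(&self) -> &[u8] {
        self.0.as_slice()
    }
}

fn read_compiled(path: &std::path::Path) -> Result<CompiledArtifact, String> {
    // The file now contains only the compiled-code wrapper, so there are no
    // error variants left to branch on after decoding.
    let bytes = std::fs::read(path).map_err(|e| format!("artifact read: {:?}", e))?;
    CompiledArtifact::decode(&mut bytes.as_slice())
        .map_err(|e| format!("artifact deserialization: {:?}", e))
}

fn main() {
    match read_compiled(std::path::Path::new("/tmp/pvf-artifact.bin")) {
        // On success the raw code goes straight to the executor.
        Ok(artifact) => println!("compiled code is {} bytes", artifact.as_ref().len()),
        Err(e) => eprintln!("{}", e),
    }
}
```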
31 changes: 24 additions & 7 deletions node/core/pvf/src/host.rs
@@ -327,8 +327,7 @@ async fn run(
.await);
},
from_prepare_queue = from_prepare_queue_rx.next() => {
let prepare::FromQueue::Prepared(artifact_id)
= break_if_fatal!(from_prepare_queue.ok_or(Fatal));
let from_queue = break_if_fatal!(from_prepare_queue.ok_or(Fatal));

// Note that preparation always succeeds.
//
@@ -344,7 +343,7 @@
&mut artifacts,
&mut to_execute_queue_tx,
&mut awaiting_prepare,
artifact_id,
from_queue,
).await);
},
}
@@ -419,6 +418,9 @@ async fn handle_execute_pvf(

awaiting_prepare.add(artifact_id, params, result_tx);
},
ArtifactState::FailedToProcess(error) => {
let _ = result_tx.send(Err(ValidationError::from(error.clone())));
},
}
} else {
// Artifact is unknown: register it and enqueue a job with the corresponding priority and
@@ -450,6 +452,7 @@ async fn handle_heads_up(
// Already preparing. We don't need to send a priority amend either because
// it can't get any lower than the background.
},
ArtifactState::FailedToProcess(_) => {},
}
} else {
// The artifact is unknown: register it and put a background job into the prepare queue.
@@ -471,8 +474,10 @@ async fn handle_prepare_done(
artifacts: &mut Artifacts,
execute_queue: &mut mpsc::Sender<execute::ToQueue>,
awaiting_prepare: &mut AwaitingPrepare,
artifact_id: ArtifactId,
from_queue: prepare::FromQueue,
) -> Result<(), Fatal> {
let prepare::FromQueue { artifact_id, result } = from_queue;

// Make some sanity checks and extract the current state.
let state = match artifacts.artifact_state_mut(&artifact_id) {
None => {
@@ -493,9 +498,21 @@
never!("the artifact is already prepared: {:?}", artifact_id);
return Ok(())
},
Some(ArtifactState::FailedToProcess(_)) => {
// The reasoning is similar to the above: the artifact cannot be
// processed at this point.
never!("the artifact is already processed unsuccessfully: {:?}", artifact_id);
return Ok(())
},
Some(state @ ArtifactState::Preparing) => state,
};

// Don't send failed artifacts to the execution queue.
if let Err(error) = result {
*state = ArtifactState::FailedToProcess(error);
return Ok(())
}

// It's finally time to dispatch all the execution requests that were waiting for this artifact
// to be prepared.
let pending_requests = awaiting_prepare.take(&artifact_id);
@@ -895,7 +912,7 @@ mod tests {
);

test.from_prepare_queue_tx
.send(prepare::FromQueue::Prepared(artifact_id(1)))
.send(prepare::FromQueue { artifact_id: artifact_id(1), result: Ok(()) })
.await
.unwrap();
let result_tx_pvf_1_1 = assert_matches!(
@@ -908,7 +925,7 @@
);

test.from_prepare_queue_tx
.send(prepare::FromQueue::Prepared(artifact_id(2)))
.send(prepare::FromQueue { artifact_id: artifact_id(2), result: Ok(()) })
.await
.unwrap();
let result_tx_pvf_2 = assert_matches!(
@@ -957,7 +974,7 @@
);

test.from_prepare_queue_tx
.send(prepare::FromQueue::Prepared(artifact_id(1)))
.send(prepare::FromQueue { artifact_id: artifact_id(1), result: Ok(()) })
.await
.unwrap();

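Taken together, the host.rs changes mean the prepare queue now reports a `Result`, and a failure parks the artifact in `FailedToProcess` instead of reaching the execute queue. A compressed sketch (not part of the diff) of that branch, with stand-in types:

```rust
// Sketch only: stand-in types that compress the new handle_prepare_done branch.
// ArtifactId is reduced to a u64 and the execute-queue plumbing is omitted.
#[derive(Debug, Clone)]
enum PrepareError {
    DidNotMakeIt,
}

#[derive(Debug)]
enum ArtifactState {
    Preparing,
    Prepared,
    FailedToProcess(PrepareError),
}

// Mirrors the shape of the new prepare::FromQueue message.
struct FromQueue {
    artifact_id: u64,
    result: Result<(), PrepareError>,
}

fn handle_prepare_done(state: &mut ArtifactState, from_queue: FromQueue) {
    let FromQueue { artifact_id, result } = from_queue;

    // Failed artifacts never reach the execute queue: remember the error so
    // pending and future execution requests can be answered with it.
    if let Err(error) = result {
        *state = ArtifactState::FailedToProcess(error);
        return;
    }

    // Otherwise proceed as before: mark the artifact prepared and dispatch the
    // execution requests that were waiting for it.
    *state = ArtifactState::Prepared;
    println!("artifact {} prepared", artifact_id);
}

fn main() {
    let mut state = ArtifactState::Preparing;
    let msg = FromQueue { artifact_id: 1, result: Err(PrepareError::DidNotMakeIt) };
    handle_prepare_done(&mut state, msg);
    println!("{:?}", state); // FailedToProcess(DidNotMakeIt)
}
```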
24 changes: 19 additions & 5 deletions node/core/pvf/src/prepare/pool.rs
@@ -16,6 +16,7 @@

use super::worker::{self, Outcome};
use crate::{
artifacts::PrepareError,
metrics::Metrics,
worker_common::{IdleWorker, WorkerHandle},
LOG_TARGET,
@@ -80,7 +81,13 @@ pub enum FromPool {

/// The given worker either succeeded or failed the given job. The compiled artifact is
/// written to disk only on success; the `rip` flag says whether the worker ripped.
Concluded(Worker, bool),
Concluded {
worker: Worker,
rip: bool,
/// [`Ok`] indicates that the compiled artifact is successfully stored on disk.
/// Otherwise, an [error](PrepareError) is supplied.
result: Result<(), PrepareError>,
},

/// The given worker ceased to exist.
Rip(Worker),
@@ -295,7 +302,7 @@ fn handle_mux(
},
PoolEvent::StartWork(worker, outcome) => {
match outcome {
Outcome::Concluded(idle) => {
Outcome::Concluded { worker: idle, result } => {
let data = match spawned.get_mut(worker) {
None => {
// Perhaps the worker was killed meanwhile and the result is no longer
@@ -310,7 +317,7 @@
let old = data.idle.replace(idle);
assert_matches!(old, None, "attempt to overwrite an idle worker");

reply(from_pool, FromPool::Concluded(worker, false))?;
reply(from_pool, FromPool::Concluded { worker, rip: false, result })?;

Ok(())
},
@@ -321,9 +328,16 @@

Ok(())
},
Outcome::DidntMakeIt => {
Outcome::DidNotMakeIt => {
if attempt_retire(metrics, spawned, worker) {
reply(from_pool, FromPool::Concluded(worker, true))?;
reply(
from_pool,
FromPool::Concluded {
worker,
rip: true,
result: Err(PrepareError::DidNotMakeIt),
},
)?;
}

Ok(())
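The pool now reports the preparation outcome alongside the `rip` flag. A small sketch (not part of the diff) of a consumer destructuring the new struct variant, with stand-in types:

```rust
// Sketch only: a consumer of the new FromPool::Concluded struct variant.
// Worker is reduced to a usize and the queue bookkeeping is omitted.
#[derive(Debug, Clone)]
enum PrepareError {
    DidNotMakeIt,
}

type Worker = usize;

enum FromPool {
    Spawned(Worker),
    Concluded { worker: Worker, rip: bool, result: Result<(), PrepareError> },
    Rip(Worker),
}

fn handle_from_pool(from_pool: FromPool) {
    match from_pool {
        FromPool::Spawned(w) => println!("worker {} spawned", w),
        // The preparation outcome now travels with the conclusion event, so the
        // queue can forward it to the host without touching the disk.
        FromPool::Concluded { worker, rip, result } => {
            println!("worker {} concluded (rip: {}): {:?}", worker, rip, result)
        }
        FromPool::Rip(w) => println!("worker {} ripped", w),
    }
}

fn main() {
    handle_from_pool(FromPool::Concluded {
        worker: 0,
        rip: true,
        result: Err(PrepareError::DidNotMakeIt),
    });
}
```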
42 changes: 23 additions & 19 deletions node/core/pvf/src/prepare/queue.rs
@@ -17,7 +17,11 @@
//! A queue that handles requests for PVF preparation.

use super::pool::{self, Worker};
use crate::{artifacts::ArtifactId, metrics::Metrics, Priority, Pvf, LOG_TARGET};
use crate::{
artifacts::{ArtifactId, PrepareError},
metrics::Metrics,
Priority, Pvf, LOG_TARGET,
};
use always_assert::{always, never};
use async_std::path::PathBuf;
use futures::{channel::mpsc, stream::StreamExt as _, Future, SinkExt};
@@ -37,9 +41,13 @@ pub enum ToQueue {
}

/// A response from queue.
#[derive(Debug, PartialEq, Eq)]
pub enum FromQueue {
Prepared(ArtifactId),
#[derive(Debug)]
pub struct FromQueue {
/// Identifier of an artifact.
pub(crate) artifact_id: ArtifactId,
/// Outcome of the PVF processing. [`Ok`] indicates that the compiled artifact
/// is successfully stored on disk. Otherwise, an [error](PrepareError) is supplied.
pub(crate) result: Result<(), PrepareError>,
}

#[derive(Default)]
@@ -299,7 +307,8 @@ async fn handle_from_pool(queue: &mut Queue, from_pool: pool::FromPool) -> Resul
use pool::FromPool::*;
match from_pool {
Spawned(worker) => handle_worker_spawned(queue, worker).await?,
Concluded(worker, rip) => handle_worker_concluded(queue, worker, rip).await?,
Concluded { worker, rip, result } =>
handle_worker_concluded(queue, worker, rip, result).await?,
Rip(worker) => handle_worker_rip(queue, worker).await?,
}
Ok(())
@@ -320,6 +329,7 @@ async fn handle_worker_concluded(
queue: &mut Queue,
worker: Worker,
rip: bool,
result: Result<(), PrepareError>,
) -> Result<(), Fatal> {
queue.metrics.prepare_concluded();

@@ -377,7 +387,7 @@
"prepare worker concluded",
);

reply(&mut queue.from_queue_tx, FromQueue::Prepared(artifact_id))?;
reply(&mut queue.from_queue_tx, FromQueue { artifact_id, result })?;

// Figure out what to do with the worker.
if rip {
@@ -641,12 +651,9 @@

let w = test.workers.insert(());
test.send_from_pool(pool::FromPool::Spawned(w));
test.send_from_pool(pool::FromPool::Concluded(w, false));
test.send_from_pool(pool::FromPool::Concluded { worker: w, rip: false, result: Ok(()) });

assert_eq!(
test.poll_and_recv_from_queue().await,
FromQueue::Prepared(pvf(1).as_artifact_id())
);
assert_eq!(test.poll_and_recv_from_queue().await.artifact_id, pvf(1).as_artifact_id());
}

#[async_std::test]
@@ -671,7 +678,7 @@
assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });
assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });

test.send_from_pool(pool::FromPool::Concluded(w1, false));
test.send_from_pool(pool::FromPool::Concluded { worker: w1, rip: false, result: Ok(()) });

assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });

Expand Down Expand Up @@ -704,7 +711,7 @@ mod tests {
// That's a bit silly in this context, but in production there will be an entire pool up
// to the `soft_capacity` of workers and it doesn't matter which one to cull. Either way,
// we just check that edge case of an edge case works.
test.send_from_pool(pool::FromPool::Concluded(w1, false));
test.send_from_pool(pool::FromPool::Concluded { worker: w1, rip: false, result: Ok(()) });
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Kill(w1));
}

@@ -749,15 +756,12 @@
assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });

// Conclude worker 1 and rip it.
test.send_from_pool(pool::FromPool::Concluded(w1, true));
test.send_from_pool(pool::FromPool::Concluded { worker: w1, rip: true, result: Ok(()) });

// Since there is still work, the queue requested one extra worker to spawn to handle the
// remaining enqueued work items.
assert_eq!(test.poll_and_recv_to_pool().await, pool::ToPool::Spawn);
assert_eq!(
test.poll_and_recv_from_queue().await,
FromQueue::Prepared(pvf(1).as_artifact_id())
);
assert_eq!(test.poll_and_recv_from_queue().await.artifact_id, pvf(1).as_artifact_id());
}

#[async_std::test]
Expand All @@ -773,7 +777,7 @@ mod tests {

assert_matches!(test.poll_and_recv_to_pool().await, pool::ToPool::StartWork { .. });

test.send_from_pool(pool::FromPool::Concluded(w1, true));
test.send_from_pool(pool::FromPool::Concluded { worker: w1, rip: true, result: Ok(()) });
test.poll_ensure_to_pool_is_empty().await;
}
