
Add compilation_timeout parameter for PVF preparation job
mrcnski committed Oct 11, 2022
1 parent 10d1460 commit ee0abe2
Showing 4 changed files with 71 additions and 21 deletions.
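In short, the commit threads a per-job `compilation_timeout` from the validation host through the preparation queue and worker pool down to the preparation worker, replacing the single hard-coded `COMPILATION_TIMEOUT` that previously lived in `prepare/worker.rs`. The sketch below only illustrates the resulting message shapes; `Pvf`, `Worker`, and `Priority` here are simplified stand-ins for the crate's own types, not the actual definitions.

```rust
use std::{path::PathBuf, sync::Arc, time::Duration};

// Simplified stand-ins for the crate's own `Pvf`, `Worker` and `Priority`
// types (illustration only, not the actual definitions).
struct Pvf { code: Arc<Vec<u8>> }
struct Worker(usize);
enum Priority { Normal }

// Host -> prepare queue: the timeout now travels with every enqueue request.
enum ToQueue {
    Enqueue { priority: Priority, pvf: Pvf, compilation_timeout: Duration },
}

// Prepare queue -> worker pool: the same value is forwarded when the job is
// assigned to an idle worker.
enum ToPool {
    StartWork {
        worker: Worker,
        code: Arc<Vec<u8>>,
        artifact_path: PathBuf,
        compilation_timeout: Duration,
    },
}

fn main() {
    // A pre-check request carries the shorter 60-second budget from host.rs ...
    let _enqueue = ToQueue::Enqueue {
        priority: Priority::Normal,
        pvf: Pvf { code: Arc::new(Vec::new()) },
        compilation_timeout: Duration::from_secs(60),
    };
    // ... and the queue later hands the same value on to the pool.
    let _start_work = ToPool::StartWork {
        worker: Worker(0),
        code: Arc::new(Vec::new()),
        artifact_path: PathBuf::from("/tmp/artifact"),
        compilation_timeout: Duration::from_secs(60),
    };
}
```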
41 changes: 35 additions & 6 deletions node/core/pvf/src/host.rs
@@ -38,6 +38,16 @@ use std::{
time::{Duration, SystemTime},
};

/// The time period after which the precheck preparation worker is considered unresponsive and will
/// be killed.
// NOTE: If you change this make sure to fix the buckets of `pvf_preparation_time` metric.
const PRECHECK_COMPILATION_TIMEOUT: Duration = Duration::from_secs(60);

/// The time period after which the execute preparation worker is considered unresponsive and will
/// be killed.
// NOTE: If you change this make sure to fix the buckets of `pvf_preparation_time` metric.
const EXECUTE_COMPILATION_TIMEOUT: Duration = Duration::from_secs(180);

/// An alias to not spell the type for the oneshot sender for the PVF execution result.
pub(crate) type ResultSender = oneshot::Sender<Result<ValidationResult, ValidationError>>;

@@ -92,7 +102,7 @@ impl ValidationHost {

/// Sends a signal to the validation host requesting to prepare a list of the given PVFs.
///
/// This is async to accommodate the fact a possibility of back-pressure. In the vast majority of
/// This is async to accommodate the possibility of back-pressure. In the vast majority of
/// situations this function should return immediately.
///
/// Returns an error if the request cannot be sent to the validation host, i.e. if it shut down.
@@ -443,8 +453,15 @@ async fn handle_precheck_pvf(
}
} else {
artifacts.insert_preparing(artifact_id, vec![result_sender]);
send_prepare(prepare_queue, prepare::ToQueue::Enqueue { priority: Priority::Normal, pvf })
.await?;
send_prepare(
prepare_queue,
prepare::ToQueue::Enqueue {
priority: Priority::Normal,
pvf,
compilation_timeout: PRECHECK_COMPILATION_TIMEOUT,
},
)
.await?;
}
Ok(())
}
@@ -470,7 +487,7 @@ async fn handle_execute_pvf(

if let Some(state) = artifacts.artifact_state_mut(&artifact_id) {
match state {
ArtifactState::Prepared { ref mut last_time_needed } => {
ArtifactState::Prepared { last_time_needed } => {
*last_time_needed = SystemTime::now();

send_execute(
@@ -495,7 +512,15 @@ async fn handle_execute_pvf(
// Artifact is unknown: register it and enqueue a job with the corresponding priority and
// PVF.
artifacts.insert_preparing(artifact_id.clone(), Vec::new());
send_prepare(prepare_queue, prepare::ToQueue::Enqueue { priority, pvf }).await?;
send_prepare(
prepare_queue,
prepare::ToQueue::Enqueue {
priority,
pvf,
compilation_timeout: EXECUTE_COMPILATION_TIMEOUT,
},
)
.await?;

awaiting_prepare.add(artifact_id, execution_timeout, params, result_tx);
}
@@ -528,7 +553,11 @@ async fn handle_heads_up(

send_prepare(
prepare_queue,
prepare::ToQueue::Enqueue { priority: Priority::Normal, pvf: active_pvf },
prepare::ToQueue::Enqueue {
priority: Priority::Normal,
pvf: active_pvf,
compilation_timeout: EXECUTE_COMPILATION_TIMEOUT,
},
)
.await?;
}
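The host now picks the budget per call path: `handle_precheck_pvf` enqueues with the 60-second `PRECHECK_COMPILATION_TIMEOUT`, while `handle_execute_pvf` and `handle_heads_up` use the 180-second `EXECUTE_COMPILATION_TIMEOUT`. A minimal sketch of that selection follows; the `PrepareKind` enum is a hypothetical stand-in for the three call sites above, not a type from this crate.

```rust
use std::time::Duration;

// Same values as the constants added to host.rs above.
const PRECHECK_COMPILATION_TIMEOUT: Duration = Duration::from_secs(60);
const EXECUTE_COMPILATION_TIMEOUT: Duration = Duration::from_secs(180);

// Hypothetical enum standing in for the three call sites
// (handle_precheck_pvf, handle_execute_pvf, handle_heads_up).
enum PrepareKind {
    Precheck,
    Execute,
    HeadsUp,
}

fn compilation_timeout(kind: PrepareKind) -> Duration {
    match kind {
        // Pre-check preparations get the shorter 60-second budget.
        PrepareKind::Precheck => PRECHECK_COMPILATION_TIMEOUT,
        // Preparations driven by execution requests or heads-up notifications
        // get the longer 180-second budget.
        PrepareKind::Execute | PrepareKind::HeadsUp => EXECUTE_COMPILATION_TIMEOUT,
    }
}

fn main() {
    assert_eq!(compilation_timeout(PrepareKind::Precheck), Duration::from_secs(60));
    assert_eq!(compilation_timeout(PrepareKind::Execute), Duration::from_secs(180));
    assert_eq!(compilation_timeout(PrepareKind::HeadsUp), Duration::from_secs(180));
}
```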
14 changes: 11 additions & 3 deletions node/core/pvf/src/prepare/pool.rs
@@ -61,7 +61,12 @@ pub enum ToPool {
///
/// In either case, the worker is considered busy and no further `StartWork` messages should be
/// sent until either `Concluded` or `Rip` message is received.
StartWork { worker: Worker, code: Arc<Vec<u8>>, artifact_path: PathBuf },
StartWork {
worker: Worker,
code: Arc<Vec<u8>>,
artifact_path: PathBuf,
compilation_timeout: Duration,
},
}

/// A message sent from pool to its client.
@@ -205,7 +210,7 @@ fn handle_to_pool(
metrics.prepare_worker().on_begin_spawn();
mux.push(spawn_worker_task(program_path.to_owned(), spawn_timeout).boxed());
},
ToPool::StartWork { worker, code, artifact_path } => {
ToPool::StartWork { worker, code, artifact_path, compilation_timeout } => {
if let Some(data) = spawned.get_mut(worker) {
if let Some(idle) = data.idle.take() {
let preparation_timer = metrics.time_preparation();
@@ -216,6 +221,7 @@
code,
cache_path.to_owned(),
artifact_path,
compilation_timeout,
preparation_timer,
)
.boxed(),
@@ -263,9 +269,11 @@ async fn start_work_task<Timer>(
code: Arc<Vec<u8>>,
cache_path: PathBuf,
artifact_path: PathBuf,
compilation_timeout: Duration,
_preparation_timer: Option<Timer>,
) -> PoolEvent {
let outcome = worker::start_work(idle, code, &cache_path, artifact_path).await;
let outcome =
worker::start_work(idle, code, &cache_path, artifact_path, compilation_timeout).await;
PoolEvent::StartWork(worker, outcome)
}

30 changes: 23 additions & 7 deletions node/core/pvf/src/prepare/queue.rs
@@ -21,7 +21,10 @@ use crate::{artifacts::ArtifactId, metrics::Metrics, PrepareResult, Priority, Pv
use always_assert::{always, never};
use async_std::path::PathBuf;
use futures::{channel::mpsc, stream::StreamExt as _, Future, SinkExt};
use std::collections::{HashMap, VecDeque};
use std::{
collections::{HashMap, VecDeque},
time::Duration,
};

/// A request to pool.
#[derive(Debug)]
@@ -30,7 +33,7 @@ pub enum ToQueue {
///
/// Note that it is incorrect to enqueue the same PVF again without first receiving the
/// [`FromQueue`] response.
Enqueue { priority: Priority, pvf: Pvf },
Enqueue { priority: Priority, pvf: Pvf, compilation_timeout: Duration },
}

/// A response from queue.
Expand Down Expand Up @@ -76,6 +79,8 @@ struct JobData {
/// The priority of this job. Can be bumped.
priority: Priority,
pvf: Pvf,
/// The timeout for the preparation job.
compilation_timeout: Duration,
worker: Option<Worker>,
}

@@ -203,18 +208,24 @@ impl Queue {

async fn handle_to_queue(queue: &mut Queue, to_queue: ToQueue) -> Result<(), Fatal> {
match to_queue {
ToQueue::Enqueue { priority, pvf } => {
handle_enqueue(queue, priority, pvf).await?;
ToQueue::Enqueue { priority, pvf, compilation_timeout } => {
handle_enqueue(queue, priority, pvf, compilation_timeout).await?;
},
}
Ok(())
}

async fn handle_enqueue(queue: &mut Queue, priority: Priority, pvf: Pvf) -> Result<(), Fatal> {
async fn handle_enqueue(
queue: &mut Queue,
priority: Priority,
pvf: Pvf,
compilation_timeout: Duration,
) -> Result<(), Fatal> {
gum::debug!(
target: LOG_TARGET,
validation_code_hash = ?pvf.code_hash,
?priority,
?compilation_timeout,
"PVF is enqueued for preparation.",
);
queue.metrics.prepare_enqueued();
@@ -236,7 +247,7 @@ async fn handle_enqueue(queue: &mut Queue, priority: Priority, pvf: Pvf) -> Resu
return Ok(())
}

let job = queue.jobs.insert(JobData { priority, pvf, worker: None });
let job = queue.jobs.insert(JobData { priority, pvf, compilation_timeout, worker: None });
queue.artifact_id_to_job.insert(artifact_id, job);

if let Some(available) = find_idle_worker(queue) {
@@ -424,7 +435,12 @@ async fn assign(queue: &mut Queue, worker: Worker, job: Job) -> Result<(), Fatal

send_pool(
&mut queue.to_pool_tx,
pool::ToPool::StartWork { worker, code: job_data.pvf.code.clone(), artifact_path },
pool::ToPool::StartWork {
worker,
code: job_data.pvf.code.clone(),
artifact_path,
compilation_timeout: job_data.compilation_timeout,
},
)
.await?;

7 changes: 2 additions & 5 deletions node/core/pvf/src/prepare/worker.rs
@@ -32,10 +32,6 @@ use parity_scale_codec::{Decode, Encode};
use sp_core::hexdisplay::HexDisplay;
use std::{panic, sync::Arc, time::Duration};

/// The time period after which the preparation worker is considered unresponsive and will be killed.
// NOTE: If you change this make sure to fix the buckets of `pvf_preparation_time` metric.
const COMPILATION_TIMEOUT: Duration = Duration::from_secs(60);

/// Spawns a new worker with the given program path that acts as the worker and the spawn timeout.
///
/// The program should be able to handle `<program-path> prepare-worker <socket-path>` invocation.
@@ -69,6 +65,7 @@ pub async fn start_work(
code: Arc<Vec<u8>>,
cache_path: &Path,
artifact_path: PathBuf,
compilation_timeout: Duration,
) -> Outcome {
let IdleWorker { mut stream, pid } = worker;

@@ -103,7 +100,7 @@
}

let selected =
match async_std::future::timeout(COMPILATION_TIMEOUT, framed_recv(&mut stream)).await {
match async_std::future::timeout(compilation_timeout, framed_recv(&mut stream)).await {
Ok(Ok(response_bytes)) => {
// Received bytes from worker within the time limit.
// By convention we expect encoded `PrepareResult`.
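The worker side enforces whatever budget it was handed by wrapping the response read in `async_std::future::timeout`, as in the hunk above. The sketch below shows only that pattern; `recv_response` is a hypothetical stand-in for the crate's `framed_recv`, and in the real code the received bytes are decoded as a `PrepareResult` while a timeout means the worker is treated as unresponsive and killed.

```rust
use async_std::{future::timeout, task};
use std::time::Duration;

// Hypothetical stand-in for the crate's `framed_recv`: in the real worker the
// bytes come from the socket and encode a `PrepareResult`.
async fn recv_response() -> Result<Vec<u8>, std::io::Error> {
    task::sleep(Duration::from_millis(50)).await;
    Ok(vec![0u8])
}

async fn wait_for_worker(compilation_timeout: Duration) {
    match timeout(compilation_timeout, recv_response()).await {
        Ok(Ok(response_bytes)) => {
            // Received bytes from the worker within the time limit.
            println!("got {} byte(s)", response_bytes.len());
        },
        Ok(Err(err)) => {
            // The connection failed before the deadline.
            eprintln!("i/o error: {}", err);
        },
        Err(_) => {
            // Deadline elapsed: the worker is considered unresponsive and,
            // in the real code, gets killed.
            eprintln!("preparation timed out after {:?}", compilation_timeout);
        },
    }
}

fn main() {
    task::block_on(wait_for_worker(Duration::from_secs(60)));
}
```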
