Skip to content

Commit

Permalink
Some fixes
Browse files: browse the repository at this point in the history
  • Loading branch information
mrcnski committed Nov 6, 2023
1 parent 8c7c519 commit af96dfa
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 29 deletions.
8 changes: 8 additions & 0 deletions polkadot/node/core/pvf/execute-worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ pub fn worker_entrypoint(
// outside world. The only IPC it should be able to do is sending its
// response over the pipe.
drop(stream);
// Drop the read end so we don't have too many FDs open.
drop(pipe_reader);

handle_child_process(
pipe_writer,
Expand Down Expand Up @@ -367,6 +369,12 @@ fn handle_parent_process(
pipe_read.read_to_end(&mut received_data)?;

let status = nix::sys::wait::waitpid(child, None);
gum::trace!(
target: LOG_TARGET,
%worker_pid,
"execute worker received wait status from job: {:?}",
status,
);

let usage_after = match nix::sys::resource::getrusage(UsageWho::RUSAGE_CHILDREN) {
Ok(usage) => usage,
Expand Down
45 changes: 22 additions & 23 deletions polkadot/node/core/pvf/prepare-worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,8 @@ pub fn worker_entrypoint(
// outside world. The only IPC it should be able to do is sending its
// response over the pipe.
drop(stream);
// Drop the read end so we don't have too many FDs open.
drop(pipe_reader);

handle_child_process(
pvf,
Expand Down Expand Up @@ -331,6 +333,8 @@ fn handle_child_process(
gum::debug!(
target: LOG_TARGET,
%worker_job_pid,
?prepare_job_kind,
?preparation_timeout,
"worker job: preparing artifact",
);

Expand Down Expand Up @@ -508,6 +512,13 @@ fn handle_parent_process(
.map_err(|err| PrepareError::IoErr(err.to_string()))?;

let status = nix::sys::wait::waitpid(child, None);
gum::trace!(
target: LOG_TARGET,
%worker_pid,
"prepare worker received wait status from job: {:?}",
status,
);

let usage_after = nix::sys::resource::getrusage(UsageWho::RUSAGE_CHILDREN)
.map_err(|errno| error_from_errno("getrusage after", errno))?;

Expand All @@ -517,38 +528,26 @@ fn handle_parent_process(
// it is necessary to subtract the usage before the current child process to isolate its cpu
// time
let cpu_tv = get_total_cpu_usage(usage_after) - get_total_cpu_usage(usage_before);
if cpu_tv >= timeout {
gum::warn!(
target: LOG_TARGET,
%worker_pid,
"prepare job took {}ms cpu time, exceeded prepare timeout {}ms",
cpu_tv.as_millis(),
timeout.as_millis(),
);
return Err(PrepareError::TimedOut)
}

return match status {
match status {
Ok(WaitStatus::Exited(_pid, libc::EXIT_SUCCESS)) => {
let result: Result<Response, PrepareError> =
Result::decode(&mut received_data.as_slice())
// There is either a bug or the job was hijacked.
.map_err(|err| PrepareError::IoErr(err.to_string()))?;
match result {
Err(PrepareError::TimedOut) => {
// Log if we exceed the timeout and the other thread hasn't
// finished.
gum::warn!(
target: LOG_TARGET,
%worker_pid,
"prepare job took {}ms cpu time, exceeded prepare timeout {}ms",
cpu_tv.as_millis(),
timeout.as_millis(),
);
Err(PrepareError::TimedOut)
},
Err(err) => Err(err),
Ok(response) => {
if cpu_tv >= timeout {
gum::warn!(
target: LOG_TARGET,
%worker_pid,
"prepare job took {}ms cpu time, exceeded prepare timeout {}ms",
cpu_tv.as_millis(),
timeout.as_millis(),
);
return Err(PrepareError::TimedOut);
}
// Write the serialized artifact into a temp file.
//
// PVF host only keeps artifacts statuses in its memory,
Expand Down
12 changes: 6 additions & 6 deletions polkadot/node/core/pvf/tests/it/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ use tokio::sync::Mutex;
mod adder;
mod worker_common;

const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(3);
const TEST_PREPARATION_TIMEOUT: Duration = Duration::from_secs(3);
const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(6);
const TEST_PREPARATION_TIMEOUT: Duration = Duration::from_secs(6);

struct TestHost {
cache_dir: tempfile::TempDir,
Expand Down Expand Up @@ -282,9 +282,9 @@ rusty_fork_test! {
let (result, _) = futures::join!(
// Choose a job that would normally take the entire timeout.
host.precheck_pvf(rococo_runtime::WASM_BINARY.unwrap(), Default::default()),
// Run a future that kills the job in the middle of the timeout.
// Run a future that kills the job while it's running.
async {
tokio::time::sleep(TEST_PREPARATION_TIMEOUT / 2).await;
tokio::time::sleep(Duration::from_secs(1)).await;
kill_by_sid_and_name(sid, PREPARE_PROCESS_NAME, false);
}
);
Expand Down Expand Up @@ -326,9 +326,9 @@ rusty_fork_test! {
},
Default::default(),
),
// Run a future that kills the job in the middle of the timeout.
// Run a future that kills the job while it's running.
async {
tokio::time::sleep(TEST_EXECUTION_TIMEOUT / 2).await;
tokio::time::sleep(Duration::from_secs(1)).await;
kill_by_sid_and_name(sid, EXECUTE_PROCESS_NAME, false);
}
);
Expand Down

0 comments on commit af96dfa

Please sign in to comment.