diff --git a/polkadot/node/core/pvf/execute-worker/src/lib.rs b/polkadot/node/core/pvf/execute-worker/src/lib.rs index b9c66e860ab8..1b084c99f87b 100644 --- a/polkadot/node/core/pvf/execute-worker/src/lib.rs +++ b/polkadot/node/core/pvf/execute-worker/src/lib.rs @@ -195,6 +195,8 @@ pub fn worker_entrypoint( // outside world. The only IPC it should be able to do is sending its // response over the pipe. drop(stream); + // Drop the read end so we don't have too many FDs open. + drop(pipe_reader); handle_child_process( pipe_writer, @@ -367,6 +369,12 @@ fn handle_parent_process( pipe_read.read_to_end(&mut received_data)?; let status = nix::sys::wait::waitpid(child, None); + gum::trace!( + target: LOG_TARGET, + %worker_pid, + "execute worker received wait status from job: {:?}", + status, + ); let usage_after = match nix::sys::resource::getrusage(UsageWho::RUSAGE_CHILDREN) { Ok(usage) => usage, diff --git a/polkadot/node/core/pvf/prepare-worker/src/lib.rs b/polkadot/node/core/pvf/prepare-worker/src/lib.rs index 5acdc94a1e68..6b0091f5d9e5 100644 --- a/polkadot/node/core/pvf/prepare-worker/src/lib.rs +++ b/polkadot/node/core/pvf/prepare-worker/src/lib.rs @@ -232,6 +232,8 @@ pub fn worker_entrypoint( // outside world. The only IPC it should be able to do is sending its // response over the pipe. drop(stream); + // Drop the read end so we don't have too many FDs open. + drop(pipe_reader); handle_child_process( pvf, @@ -331,6 +333,8 @@ fn handle_child_process( gum::debug!( target: LOG_TARGET, %worker_job_pid, + ?prepare_job_kind, + ?preparation_timeout, "worker job: preparing artifact", ); @@ -508,6 +512,13 @@ fn handle_parent_process( .map_err(|err| PrepareError::IoErr(err.to_string()))?; let status = nix::sys::wait::waitpid(child, None); + gum::trace!( + target: LOG_TARGET, + %worker_pid, + "prepare worker received wait status from job: {:?}", + status, + ); + let usage_after = nix::sys::resource::getrusage(UsageWho::RUSAGE_CHILDREN) .map_err(|errno| error_from_errno("getrusage after", errno))?; @@ -517,38 +528,26 @@ fn handle_parent_process( // it is necessary to subtract the usage before the current child process to isolate its cpu // time let cpu_tv = get_total_cpu_usage(usage_after) - get_total_cpu_usage(usage_before); + if cpu_tv >= timeout { + gum::warn!( + target: LOG_TARGET, + %worker_pid, + "prepare job took {}ms cpu time, exceeded prepare timeout {}ms", + cpu_tv.as_millis(), + timeout.as_millis(), + ); + return Err(PrepareError::TimedOut) + } - return match status { + match status { Ok(WaitStatus::Exited(_pid, libc::EXIT_SUCCESS)) => { let result: Result = Result::decode(&mut received_data.as_slice()) // There is either a bug or the job was hijacked. .map_err(|err| PrepareError::IoErr(err.to_string()))?; match result { - Err(PrepareError::TimedOut) => { - // Log if we exceed the timeout and the other thread hasn't - // finished. - gum::warn!( - target: LOG_TARGET, - %worker_pid, - "prepare job took {}ms cpu time, exceeded prepare timeout {}ms", - cpu_tv.as_millis(), - timeout.as_millis(), - ); - Err(PrepareError::TimedOut) - }, Err(err) => Err(err), Ok(response) => { - if cpu_tv >= timeout { - gum::warn!( - target: LOG_TARGET, - %worker_pid, - "prepare job took {}ms cpu time, exceeded prepare timeout {}ms", - cpu_tv.as_millis(), - timeout.as_millis(), - ); - return Err(PrepareError::TimedOut); - } // Write the serialized artifact into a temp file. // // PVF host only keeps artifacts statuses in its memory, diff --git a/polkadot/node/core/pvf/tests/it/main.rs b/polkadot/node/core/pvf/tests/it/main.rs index 58b47dffe5a9..54a686a3e798 100644 --- a/polkadot/node/core/pvf/tests/it/main.rs +++ b/polkadot/node/core/pvf/tests/it/main.rs @@ -34,8 +34,8 @@ use tokio::sync::Mutex; mod adder; mod worker_common; -const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(3); -const TEST_PREPARATION_TIMEOUT: Duration = Duration::from_secs(3); +const TEST_EXECUTION_TIMEOUT: Duration = Duration::from_secs(6); +const TEST_PREPARATION_TIMEOUT: Duration = Duration::from_secs(6); struct TestHost { cache_dir: tempfile::TempDir, @@ -282,9 +282,9 @@ rusty_fork_test! { let (result, _) = futures::join!( // Choose a job that would normally take the entire timeout. host.precheck_pvf(rococo_runtime::WASM_BINARY.unwrap(), Default::default()), - // Run a future that kills the job in the middle of the timeout. + // Run a future that kills the job while it's running. async { - tokio::time::sleep(TEST_PREPARATION_TIMEOUT / 2).await; + tokio::time::sleep(Duration::from_secs(1)).await; kill_by_sid_and_name(sid, PREPARE_PROCESS_NAME, false); } ); @@ -326,9 +326,9 @@ rusty_fork_test! { }, Default::default(), ), - // Run a future that kills the job in the middle of the timeout. + // Run a future that kills the job while it's running. async { - tokio::time::sleep(TEST_EXECUTION_TIMEOUT / 2).await; + tokio::time::sleep(Duration::from_secs(1)).await; kill_by_sid_and_name(sid, EXECUTE_PROCESS_NAME, false); } );