Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Replace async-std with tokio in PVF subsystem #6419

Merged
merged 23 commits into from
Jan 10, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
ec204fa
Replace async-std with tokio in PVF subsystem
mrcnski Dec 11, 2022
3abc804
Rework workers to use `select!` instead of a mutex
mrcnski Dec 13, 2022
5fcc67d
Remove unnecessary `fuse`
mrcnski Dec 13, 2022
1574561
Merge branch 'master' into m-cat/replace-async-std-pvf
mrcnski Dec 13, 2022
451fae0
Add explanation for `expect()`
mrcnski Dec 13, 2022
fc4c28b
Update node/core/pvf/src/worker_common.rs
mrcnski Dec 18, 2022
1dde78b
Update node/core/pvf/src/worker_common.rs
mrcnski Dec 18, 2022
da31a48
Address some review comments
mrcnski Dec 18, 2022
35a0c79
Merge remote-tracking branch 'origin/m-cat/replace-async-std-pvf' int…
mrcnski Dec 18, 2022
e1c2cf3
Shutdown tokio runtime
mrcnski Dec 18, 2022
077a123
Run cargo fmt
mrcnski Dec 19, 2022
e0d4b9e
Add a small note about retries
mrcnski Dec 19, 2022
2353747
Merge branch 'master' into m-cat/replace-async-std-pvf
mrcnski Dec 20, 2022
28d4062
Fix up merge
mrcnski Dec 20, 2022
3964aca
Rework `cpu_time_monitor_loop` to return when other thread finishes
mrcnski Dec 20, 2022
7057518
Add error string to PrepareError::IoErr variant
mrcnski Dec 20, 2022
e6ba098
Log when artifacts fail to prepare
mrcnski Dec 20, 2022
e094f80
Fix `cpu_time_monitor_loop`; fix test
mrcnski Dec 20, 2022
c09377a
Fix text
mrcnski Dec 20, 2022
05d1865
Fix a couple of potential minor data races.
mrcnski Dec 22, 2022
b0c2434
Merge branch 'master' into m-cat/replace-async-std-pvf
mrcnski Jan 5, 2023
5cc477c
Merge branch 'master' into m-cat/replace-async-std-pvf
mrcnski Jan 9, 2023
0f4ac06
Update Cargo.lock
mrcnski Jan 9, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion node/core/pvf/src/prepare/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use crate::{
LOG_TARGET,
};
use always_assert::never;
use async_std::path::{Path, PathBuf};
use futures::{
channel::mpsc, future::BoxFuture, stream::FuturesUnordered, Future, FutureExt, StreamExt,
};
Expand Down
27 changes: 11 additions & 16 deletions node/core/pvf/src/prepare/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ pub async fn start_work(
artifact_path: PathBuf,
preparation_timeout: Duration,
) -> Outcome {
let IdleWorker { mut stream, pid } = worker;
let IdleWorker { stream, pid } = worker;

gum::debug!(
target: LOG_TARGET,
Expand All @@ -91,7 +91,7 @@ pub async fn start_work(
artifact_path.display(),
);

with_tmp_file(stream.clone(), pid, cache_path, |tmp_file| async move {
with_tmp_file(stream, pid, cache_path, |tmp_file, mut stream| async move {
if let Err(err) = send_request(&mut stream, code, &tmp_file, preparation_timeout).await {
gum::warn!(
target: LOG_TARGET,
Expand Down Expand Up @@ -194,11 +194,6 @@ async fn handle_response_bytes(
preparation_timeout.as_millis(),
tmp_file.display(),
);

// Return a timeout error.
//
// NOTE: The artifact exists, but is located in a temporary file which
// will be cleared by `with_tmp_file`.
return Outcome::TimedOut
}

Expand All @@ -210,10 +205,9 @@ async fn handle_response_bytes(
artifact_path.display(),
);

tokio::fs::rename(&tmp_file, &artifact_path)
.await
.map(|_| Outcome::Concluded { worker, result })
.unwrap_or_else(|err| {
match tokio::fs::rename(&tmp_file, &artifact_path).await {
Ok(()) => Outcome::Concluded { worker, result },
Err(err) => {
gum::warn!(
target: LOG_TARGET,
worker_pid = %pid,
Expand All @@ -223,7 +217,8 @@ async fn handle_response_bytes(
err,
);
Outcome::RenameTmpFileErr { worker, result, err: format!("{:?}", err) }
})
},
}
}

/// Create a temporary file for an artifact at the given cache path and execute the given
Expand All @@ -233,7 +228,7 @@ async fn handle_response_bytes(
async fn with_tmp_file<F, Fut>(stream: UnixStream, pid: u32, cache_path: &Path, f: F) -> Outcome
where
Fut: futures::Future<Output = Outcome>,
F: FnOnce(PathBuf) -> Fut,
F: FnOnce(PathBuf, UnixStream) -> Fut,
{
let tmp_file = match tmpfile_in("prepare-artifact-", cache_path).await {
Ok(f) => f,
Expand All @@ -251,7 +246,7 @@ where
},
};

let outcome = f(tmp_file.clone()).await;
let outcome = f(tmp_file.clone(), stream).await;

// The function called above is expected to move `tmp_file` to a new location upon success. However,
// the function may also fail, and in that case we should remove the tmp file here.
Expand Down Expand Up @@ -344,14 +339,14 @@ pub fn worker_entrypoint(socket_path: &str) {
join_res = thread_fut => {
match join_res {
Ok(()) => Err(PrepareError::TimedOut),
Err(_) => Err(PrepareError::DidNotMakeIt),
Err(_) => Err(PrepareError::IoErr),
}
},
compilation_res = prepare_fut => {
let cpu_time_elapsed = cpu_time_start.elapsed();
finished_flag.store(true, Ordering::Relaxed);

match compilation_res.unwrap_or_else(|_| Err(PrepareError::DidNotMakeIt)) {
match compilation_res.unwrap_or_else(|_| Err(PrepareError::IoErr)) {
Err(err) => {
// Serialized error will be written into the socket.
Err(err)
Expand Down
17 changes: 4 additions & 13 deletions node/core/pvf/src/worker_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

//! Common logic for implementation of worker processes.

use crate::{execute::ExecuteResponse, PrepareError, LOG_TARGET};
use crate::LOG_TARGET;
use cpu_time::ProcessTime;
use futures::{never::Never, FutureExt as _};
use futures_timer::Delay;
Expand Down Expand Up @@ -187,21 +187,12 @@ where
let stream = UnixStream::connect(socket_path).await?;
let _ = tokio::fs::remove_file(socket_path).await;

let result = event_loop(handle.clone(), stream.clone()).await;

if let Err(err) = stream.shutdown(Shutdown::Both) {
// Log, but don't return error here, as it may shadow any error from `event_loop`.
gum::debug!(
target: LOG_TARGET,
"error shutting down stream at path {}: {}",
socket_path,
err
);
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed this because tokio's `UnixStream` doesn't implement `Clone`, and I couldn't figure out how to get the stream to stick around here without a really big refactor (or a mutex). After spending an hour on this I just gave up, but it's fine because the shutdown here was added merely as a courtesy call and shouldn't be necessary. In the worst-case scenario, the host doesn't get the signal and simply waits until it times out.

let result = event_loop(handle.clone(), stream).await;

result
})
.unwrap_err(); // it's never `Ok` because it's `Ok(Never)`
// It's never `Ok` because it's `Ok(Never)`.
.unwrap_err();

gum::debug!(
target: LOG_TARGET,
Expand Down