Skip to content

Commit

Permalink
Replace Smol dependency with Tokio
Browse files Browse the repository at this point in the history
  • Loading branch information
maciektr committed Mar 16, 2023
1 parent ccf26d0 commit b3949ff
Show file tree
Hide file tree
Showing 16 changed files with 211 additions and 393 deletions.
286 changes: 48 additions & 238 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,12 @@ serde = { version = "1.0.156", features = ["serde_derive"] }
serde_json = "1.0.94"
serde_test = "1.0.147"
similar-asserts = { version = "1.4.2", features = ["serde"] }
smol = "1.3.0"
smol-potat = "1.1.2"
smol_str = { version = "0.1.23", features = ["serde"] }
snapbox = { version = "0.4.8", features = ["cmd", "path"] }
tempfile = "3.4.0"
test-case = "3.0.0"
thiserror = "1.0.39"
tokio = { version = "1.26.0", features = ["macros", "process", "io-util", "rt", "rt-multi-thread", "sync"] }
toml = "0.7.1"
toml_edit = { version = "0.19.6", features = ["serde"] }
tracing = "0.1.37"
Expand Down
3 changes: 1 addition & 2 deletions scarb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ petgraph.workspace = true
semver.workspace = true
serde.workspace = true
serde_json.workspace = true
smol.workspace = true
tokio.workspace = true
smol_str.workspace = true
thiserror.workspace = true
toml.workspace = true
Expand All @@ -69,7 +69,6 @@ ntest.workspace = true
predicates.workspace = true
serde_test.workspace = true
similar-asserts.workspace = true
smol-potat.workspace = true
snapbox.workspace = true
test-case.workspace = true

Expand Down
8 changes: 8 additions & 0 deletions scarb/src/core/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{env, mem};
use anyhow::{anyhow, Context, Result};
use camino::{Utf8Path, Utf8PathBuf};
use once_cell::sync::OnceCell;
use tokio::runtime::{Builder, Runtime};
use tracing::trace;
use which::which_in;

Expand All @@ -32,6 +33,7 @@ pub struct Config {
log_filter_directive: OsString,
offline: bool,
compilers: CompilerRepository,
tokio_runtime: OnceCell<Runtime>,
}

impl Config {
Expand Down Expand Up @@ -77,6 +79,7 @@ impl Config {
log_filter_directive: b.log_filter_directive.unwrap_or_default(),
offline: b.offline,
compilers,
tokio_runtime: OnceCell::new(),
})
}

Expand Down Expand Up @@ -171,6 +174,11 @@ impl Config {
not_static_al
}

pub fn tokio_runtime(&self) -> &Runtime {
self.tokio_runtime
.get_or_init(|| Builder::new_multi_thread().enable_all().build().unwrap())
}

/// States whether the _Offline Mode_ is turned on.
///
/// For checking whether Scarb can communicate with the network, prefer to use
Expand Down
2 changes: 1 addition & 1 deletion scarb/src/core/registry/source_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::Arc;
use anyhow::Result;
use async_trait::async_trait;
use itertools::Itertools;
use smol::lock::RwLock;
use tokio::sync::RwLock;
use tracing::trace;

use crate::core::registry::Registry;
Expand Down
6 changes: 3 additions & 3 deletions scarb/src/flock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{fmt, io};
use anyhow::{Context, Result};
use camino::{Utf8Path, Utf8PathBuf};
use fs4::{lock_contended_error, FileExt};
use smol::lock::Mutex;
use tokio::sync::Mutex;

use crate::core::Config;
use crate::internal::asyncx::AwaitSync;
Expand Down Expand Up @@ -84,8 +84,8 @@ impl<'f> AdvisoryLock<'f> {
/// This lock is global per-process and can be acquired recursively.
/// An RAII structure is returned to release the lock, and if this process abnormally
/// terminates the lock is also released.
pub fn acquire(&self) -> Result<AdvisoryLockGuard> {
self.acquire_async().await_sync()
pub fn acquire(&self, config: &Config) -> Result<AdvisoryLockGuard> {
self.acquire_async().await_sync(config.tokio_runtime())
}

/// Async version of [`Self::acquire`].
Expand Down
6 changes: 3 additions & 3 deletions scarb/src/internal/async_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::hash::Hash;
use anyhow::Result;
use futures::future::{LocalBoxFuture, Shared};
use futures::prelude::*;
use smol::lock::RwLock;
use tokio::sync::RwLock;

use crate::internal::cloneable_error::CloneableResult;

Expand Down Expand Up @@ -58,7 +58,7 @@ mod tests {

use super::AsyncCache;

#[smol_potat::test]
#[tokio::test]
async fn load() {
let cache = AsyncCache::new((), |key: usize, _ctx: ()| {
static COUNTER: AtomicU8 = AtomicU8::new(0);
Expand All @@ -71,7 +71,7 @@ mod tests {
assert_eq!(cache.load(2).await.unwrap(), (2, 1));
}

#[smol_potat::test]
#[tokio::test]
async fn load_err() {
let cache = AsyncCache::new((), |key: usize, _ctx: ()| {
static COUNTER: AtomicU8 = AtomicU8::new(0);
Expand Down
7 changes: 4 additions & 3 deletions scarb/src/internal/asyncx.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
use std::future::IntoFuture;
use tokio::runtime::Runtime;

pub trait AwaitSync {
/// The output that the future will produce on completion.
type Output;

/// Synchronously await a future by starting a small one-off runtime internally.
fn await_sync(self) -> Self::Output;
fn await_sync(self, runtime: &Runtime) -> Self::Output;
}

impl<F: IntoFuture> AwaitSync for F {
type Output = F::Output;

fn await_sync(self) -> Self::Output {
smol::block_on(self.into_future())
fn await_sync(self, runtime: &Runtime) -> Self::Output {
runtime.block_on(self.into_future())
}
}
2 changes: 1 addition & 1 deletion scarb/src/ops/resolve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub fn resolve_workspace(ws: &Workspace<'_>) -> Result<WorkspaceResolve> {

Ok(WorkspaceResolve { resolve, packages })
}
.await_sync()
.await_sync(ws.config().tokio_runtime())
}

#[tracing::instrument(skip_all, level = "debug")]
Expand Down
95 changes: 46 additions & 49 deletions scarb/src/process.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use std::ffi::OsStr;
use std::io::{BufRead, BufReader, Read};
use std::fmt;
use std::path::Path;
use std::process::{Command, Stdio};
use std::sync::Arc;
use std::{fmt, thread};

use anyhow::{anyhow, bail, Context, Result};
use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};
use tracing::{debug, debug_span, warn, Span};
use tracing_futures::Instrument;

use crate::core::Config;
use crate::ui::{Spinner, Status};
Expand Down Expand Up @@ -49,62 +50,58 @@ pub fn exec_replace(cmd: &mut Command) -> Result<()> {

/// Runs the process, waiting for completion, and mapping non-success exit codes to an error.
#[tracing::instrument(level = "trace", skip_all)]
pub fn exec(cmd: &mut Command, config: &Config) -> Result<()> {
let cmd_str = shlex_join(cmd);
pub async fn async_exec(cmd: &mut tokio::process::Command, config: &Config) -> Result<()> {
let cmd_str = shlex_join(cmd.as_std());

config.ui().verbose(Status::new("Running", &cmd_str));
let _spinner = config.ui().widget(Spinner::new(cmd_str.clone()));

return thread::scope(move |s| {
let mut proc = cmd
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.with_context(|| anyhow!("could not execute process: {cmd_str}"))?;

let span = Arc::new(debug_span!("exec", pid = proc.id()));
let _enter = span.enter();
debug!("{cmd_str}");

let stdout = proc.stdout.take().expect("we asked Rust to pipe stdout");
s.spawn({
let span = debug_span!("out");
move || {
let mut stdout = stdout;
pipe_to_logs(&span, &mut stdout);
}
});

let stderr = proc.stderr.take().expect("we asked Rust to pipe stderr");
s.spawn({
let span = debug_span!("err");
move || {
let mut stderr = stderr;
pipe_to_logs(&span, &mut stderr);
}
});

let exit_status = proc
.wait()
.with_context(|| anyhow!("could not wait for proces termination: {cmd_str}"))?;
if exit_status.success() {
Ok(())
} else {
bail!("process did not exit successfully: {exit_status}");
}
});

fn pipe_to_logs(span: &Span, stream: &mut dyn Read) {
let _enter = span.enter();
let stream = BufReader::with_capacity(128, stream);
for line in stream.lines() {
async fn pipe_to_logs<T: AsyncRead + Unpin>(stream: T) {
let mut reader = BufReader::new(stream).lines();
loop {
let line = reader.next_line().await;
match line {
Ok(line) => debug!("{line}"),
Ok(Some(line)) => debug!("{line}"),
Ok(None) => break,
Err(err) => warn!("{err:?}"),
}
}
}
let runtime = config.tokio_runtime();
let mut proc = cmd
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.with_context(|| anyhow!("could not execute process: {cmd_str}"))?;

let span = Arc::new(debug_span!("exec", pid = proc.id()));
let _enter = span.enter();
debug!("{cmd_str}");

let stdout = proc.stdout.take().expect("we asked Rust to pipe stdout");
let span = debug_span!("out");
runtime.spawn(async move {
let mut stdout = stdout;
pipe_to_logs(&mut stdout).instrument(span).await;
});

let stderr = proc.stderr.take().expect("we asked Rust to pipe stderr");
runtime.spawn(async move {
let span = debug_span!("err");
let mut stderr = stderr;
pipe_to_logs(&mut stderr).instrument(span).await
});

let exit_status = proc
.wait()
.await
.with_context(|| anyhow!("could not wait for proces termination: {cmd_str}"))?;
if exit_status.success() {
Ok(())
} else {
bail!("process did not exit successfully: {exit_status}");
}
}

#[cfg(unix)]
Expand Down
4 changes: 3 additions & 1 deletion scarb/src/resolver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ mod tests {
use itertools::Itertools;
use semver::Version;
use similar_asserts::assert_serde_eq;
use tokio::runtime::Builder;

use crate::core::package::PackageName;
use crate::core::registry::mock::{deps, pkgs, registry, MockRegistry};
Expand All @@ -133,6 +134,7 @@ mod tests {
roots: &[&[ManifestDependency]],
expected: Result<&[PackageId], &str>,
) {
let runtime = Builder::new_multi_thread().build().unwrap();
let root_names = (1..).map(|n| PackageName::new(format!("ROOT_{n}")));

let summaries = roots
Expand All @@ -150,7 +152,7 @@ mod tests {
})
.collect_vec();

let resolve = super::resolve(&summaries, &mut registry).await_sync();
let resolve = super::resolve(&summaries, &mut registry).await_sync(&runtime);

let resolve = resolve
.map(|r| {
Expand Down
Loading

0 comments on commit b3949ff

Please sign in to comment.