Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
maciektr committed Mar 15, 2023
1 parent 6efd255 commit 6783c9d
Show file tree
Hide file tree
Showing 12 changed files with 185 additions and 320 deletions.
286 changes: 48 additions & 238 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ snapbox = { version = "0.4.8", features = ["cmd", "path"] }
tempfile = "3.4.0"
test-case = "3.0.0"
thiserror = "1.0.39"
tokio = { version = "1.26.0", features = ["macros", "process", "rt-multi-thread", "sync"] }
tokio = { version = "1.26.0", features = ["macros", "process", "io-util", "rt", "rt-multi-thread", "sync"] }
toml = "0.7.1"
toml_edit = { version = "0.19.6", features = ["serde"] }
tracing = "0.1.37"
Expand Down
2 changes: 2 additions & 0 deletions scarb/src/bin/scarb/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,5 +68,7 @@ fn cli_main(args: ScarbArgs) -> Result<()> {
.log_filter_directive(env::var_os("SCARB_LOG"))
.build()?;

let _ = config.tokio_runtime();

commands::run(args.command, &mut config)
}
8 changes: 8 additions & 0 deletions scarb/src/core/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{env, mem};
use anyhow::{anyhow, Context, Result};
use camino::{Utf8Path, Utf8PathBuf};
use once_cell::sync::OnceCell;
use tokio::runtime::{Builder, Runtime};
use tracing::trace;
use which::which_in;

Expand All @@ -32,6 +33,7 @@ pub struct Config {
log_filter_directive: OsString,
offline: bool,
compilers: CompilerRepository,
tokio_runtime: OnceCell<Runtime>,
}

impl Config {
Expand Down Expand Up @@ -77,6 +79,7 @@ impl Config {
log_filter_directive: b.log_filter_directive.unwrap_or_default(),
offline: b.offline,
compilers,
tokio_runtime: OnceCell::new(),
})
}

Expand Down Expand Up @@ -171,6 +174,11 @@ impl Config {
not_static_al
}

pub fn tokio_runtime(&self) -> &Runtime {
self.tokio_runtime
.get_or_init(|| Builder::new_multi_thread().build().unwrap())
}

/// States whether the _Offline Mode_ is turned on.
///
/// For checking whether Scarb can communicate with the network, prefer to use
Expand Down
4 changes: 2 additions & 2 deletions scarb/src/flock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ impl<'f> AdvisoryLock<'f> {
/// This lock is global per-process and can be acquired recursively.
/// An RAII structure is returned to release the lock, and if this process abnormally
/// terminates the lock is also released.
pub fn acquire(&self) -> Result<AdvisoryLockGuard> {
self.acquire_async().await_sync()
pub fn acquire(&self, config: &Config) -> Result<AdvisoryLockGuard> {
self.acquire_async().await_sync(config.tokio_runtime())
}

/// Async version of [`Self::acquire`].
Expand Down
7 changes: 4 additions & 3 deletions scarb/src/internal/asyncx.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
use std::future::IntoFuture;
use tokio::runtime::Runtime;

pub trait AwaitSync {
/// The output that the future will produce on completion.
type Output;

/// Synchronously await a future by starting a small one-off runtime internally.
fn await_sync(self) -> Self::Output;
fn await_sync(self, runtime: &Runtime) -> Self::Output;
}

impl<F: IntoFuture> AwaitSync for F {
type Output = F::Output;

fn await_sync(self) -> Self::Output {
smol::block_on(self.into_future())
fn await_sync(self, runtime: &Runtime) -> Self::Output {
runtime.block_on(self.into_future())
}
}
2 changes: 1 addition & 1 deletion scarb/src/ops/resolve.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub fn resolve_workspace(ws: &Workspace<'_>) -> Result<WorkspaceResolve> {

Ok(WorkspaceResolve { resolve, packages })
}
.await_sync()
.await_sync(ws.config().tokio_runtime())
}

#[tracing::instrument(skip_all, level = "debug")]
Expand Down
107 changes: 59 additions & 48 deletions scarb/src/process.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::ffi::OsStr;
use std::io::{BufRead, BufReader, Read};
use std::fmt;
use std::path::Path;
use std::process::{Command, Stdio};
use std::sync::Arc;
use std::{fmt, thread};

use anyhow::{anyhow, bail, Context, Result};
use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};
use tracing::{debug, debug_span, warn, Span};

use crate::core::Config;
Expand Down Expand Up @@ -49,62 +49,73 @@ pub fn exec_replace(cmd: &mut Command) -> Result<()> {

/// Runs the process, waiting for completion, and mapping non-success exit codes to an error.
#[tracing::instrument(level = "trace", skip_all)]
pub fn exec(cmd: &mut Command, config: &Config) -> Result<()> {
let cmd_str = shlex_join(cmd);
pub async fn async_exec(cmd: &mut tokio::process::Command, config: &Config) -> Result<()> {
// let cmd_str = ShlexJoin::from(&*cmd).to_string();
let cmd_str = shlex_join(cmd.as_std());

config.ui().verbose(Status::new("Running", &cmd_str));
let _spinner = config.ui().widget(Spinner::new(cmd_str.clone()));

return thread::scope(move |s| {
let mut proc = cmd
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.with_context(|| anyhow!("could not execute process: {cmd_str}"))?;

let span = Arc::new(debug_span!("exec", pid = proc.id()));
async fn pipe_to_logs<T: AsyncRead + Unpin>(span: &Span, stream: T) {
let _enter = span.enter();
debug!("{cmd_str}");

let stdout = proc.stdout.take().expect("we asked Rust to pipe stdout");
s.spawn({
let span = debug_span!("out");
move || {
let mut stdout = stdout;
pipe_to_logs(&span, &mut stdout);
}
});

let stderr = proc.stderr.take().expect("we asked Rust to pipe stderr");
s.spawn({
let span = debug_span!("err");
move || {
let mut stderr = stderr;
pipe_to_logs(&span, &mut stderr);
}
});

let exit_status = proc
.wait()
.with_context(|| anyhow!("could not wait for proces termination: {cmd_str}"))?;
if exit_status.success() {
Ok(())
} else {
bail!("process did not exit successfully: {exit_status}");
}
});

fn pipe_to_logs(span: &Span, stream: &mut dyn Read) {
let _enter = span.enter();
let stream = BufReader::with_capacity(128, stream);
for line in stream.lines() {
let mut reader = BufReader::new(stream).lines();
loop {
let line = reader.next_line().await;
match line {
Ok(line) => debug!("{line}"),
Ok(Some(line)) => debug!("{line}"),
Ok(None) => break,
Err(err) => warn!("{err:?}"),
}
}
}
let runtime = config.tokio_runtime();
let mut proc = cmd
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.with_context(|| anyhow!("could not execute process: {cmd_str}"))?;

let span = Arc::new(debug_span!("exec", pid = proc.id()));
let _enter = span.enter();
debug!("{cmd_str}");

let stdout = proc.stdout.take().expect("we asked Rust to pipe stdout");
runtime.spawn(async move {
let span = debug_span!("out");
// move || {
let mut stdout = stdout;
pipe_to_logs(&span, &mut stdout).await;
// }
});

let stderr = proc.stderr.take().expect("we asked Rust to pipe stderr");
runtime.spawn(async move {
let span = debug_span!("err");
// move || {
let mut stderr = stderr;
pipe_to_logs(&span, &mut stderr).await;
// }
});

// tokio::spawn(async move {
// let status = child.wait().await
// .expect("child process encountered an error");
//
// println!("child status was: {}", status);
// });
// runtime.spawn(async move {
let exit_status = proc
.wait()
.await
.with_context(|| anyhow!("could not wait for proces termination: {cmd_str}"))?;
if exit_status.success() {
Ok(())
} else {
bail!("process did not exit successfully: {exit_status}");
}
// });
// Ok(())
}

#[cfg(unix)]
Expand Down
4 changes: 3 additions & 1 deletion scarb/src/resolver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ mod tests {
use itertools::Itertools;
use semver::Version;
use similar_asserts::assert_serde_eq;
use tokio::runtime::Builder;

use crate::core::package::PackageName;
use crate::core::registry::mock::{deps, pkgs, registry, MockRegistry};
Expand All @@ -133,6 +134,7 @@ mod tests {
roots: &[&[ManifestDependency]],
expected: Result<&[PackageId], &str>,
) {
let runtime = Builder::new_multi_thread().build().unwrap();
let root_names = (1..).map(|n| PackageName::new(format!("ROOT_{n}")));

let summaries = roots
Expand All @@ -150,7 +152,7 @@ mod tests {
})
.collect_vec();

let resolve = super::resolve(&summaries, &mut registry).await_sync();
let resolve = super::resolve(&summaries, &mut registry).await_sync(&runtime);

let resolve = resolve
.map(|r| {
Expand Down
55 changes: 36 additions & 19 deletions scarb/src/sources/git/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@
//! repositories as source of super important information.

use std::fmt;
use std::process::Command;
use std::path::Path;

use anyhow::{anyhow, bail, Context, Result};
use camino::Utf8PathBuf;
use tokio::process::Command;

use crate::core::{Config, GitReference};
use crate::flock::Filesystem;
use crate::process::exec;
use crate::process::async_exec;
use crate::ui::Verbosity;

use super::canonical_url::CanonicalUrl;
Expand Down Expand Up @@ -58,12 +59,14 @@ impl fmt::Debug for GitDatabase {

/// A local checkout of a particular Git commit.
#[derive(Debug)]
pub struct GitCheckout<'d> {
pub db: &'d GitDatabase,
pub struct GitCheckout {
// pub db: &'d GitDatabase,
pub location: Utf8PathBuf,
pub rev: Rev,
}

// unsafe impl Send for GitCheckout<'_> {}

#[derive(Copy, Clone, Eq, PartialEq)]
pub struct Rev {
oid: gix::ObjectId,
Expand Down Expand Up @@ -96,8 +99,8 @@ impl GitRemote {
self.url.ident()
}

#[tracing::instrument(level = "trace", skip(config))]
pub fn checkout(
#[tracing::instrument(le vel = "trace", skip(config))]
pub async fn checkout(
&self,
fs: &Filesystem<'_>,
db: Option<GitDatabase>,
Expand All @@ -110,6 +113,7 @@ impl GitRemote {
// version of `reference`, so return that database and the rev we resolve to.
if let Some(db) = db {
db.fetch(self.url.as_str(), reference, config)
.await
.with_context(|| format!("failed to fetch into: {fs}"))?;
match locked_rev {
Some(rev) => {
Expand All @@ -133,6 +137,7 @@ impl GitRemote {
}
let db = GitDatabase::init_bare(self, fs)?;
db.fetch(self.url.as_str(), reference, config)
.await
.with_context(|| format!("failed to clone into: {fs}"))?;
let rev = match locked_rev {
Some(rev) => rev,
Expand Down Expand Up @@ -167,7 +172,7 @@ impl GitDatabase {
}

#[tracing::instrument(level = "trace", skip(config))]
fn fetch(&self, url: &str, reference: &GitReference, config: &Config) -> Result<()> {
async fn fetch(&self, url: &str, reference: &GitReference, config: &Config) -> Result<()> {
if !config.network_allowed() {
bail!("cannot fetch from `{}` in offline mode", self.remote);
}
Expand All @@ -187,17 +192,18 @@ impl GitDatabase {
cmd.arg(url);
cmd.args(refspecs);
cmd.current_dir(self.repo.path());
exec(&mut cmd, config)
async_exec(&mut cmd, config).await
}

pub fn copy_to(
pub async fn copy_to(
&self,
fs: &Filesystem<'_>,
rev: Rev,
config: &Config,
) -> Result<GitCheckout<'_>> {
let checkout = GitCheckout::clone(self, fs, rev, config)?;
checkout.reset(config)?;
) -> Result<GitCheckout> {
let path = self.repo.path().clone();
let checkout = GitCheckout::clone(path, fs, rev, config).await?;
checkout.reset(config).await?;
Ok(checkout)
}

Expand Down Expand Up @@ -249,9 +255,15 @@ impl GitDatabase {
}
}

impl<'d> GitCheckout<'d> {
impl GitCheckout {
#[tracing::instrument(level = "trace", skip(config))]
fn clone(db: &'d GitDatabase, fs: &Filesystem<'_>, rev: Rev, config: &Config) -> Result<Self> {
async fn clone(
// db: &'d GitDatabase,
repo_path: &Path,
fs: &Filesystem<'_>,
rev: Rev,
config: &Config,
) -> Result<GitCheckout> {
unsafe {
fs.recreate()?;
}
Expand All @@ -263,20 +275,25 @@ impl<'d> GitCheckout<'d> {
with_verbosity_flags(&mut cmd, config);
cmd.args(["--config", "core.autocrlf=false"]);
cmd.arg("--recurse-submodules");
cmd.arg(db.repo.path());
// cmd.arg(db.repo.path());
cmd.arg(repo_path);
cmd.arg(&location);
exec(&mut cmd, config)?;
async_exec(&mut cmd, config).await?;

Ok(Self { db, location, rev })
Ok(Self {
// db,
location,
rev,
})
}

#[tracing::instrument(level = "trace", skip(config))]
fn reset(&self, config: &Config) -> Result<()> {
async fn reset(&self, config: &Config) -> Result<()> {
let mut cmd = git_command();
cmd.args(["reset", "--hard"]);
cmd.arg(self.rev.to_string());
cmd.current_dir(&self.location);
exec(&mut cmd, config)
async_exec(&mut cmd, config).await
}
}

Expand Down
Loading

0 comments on commit 6783c9d

Please sign in to comment.