Skip to content

Commit

Permalink
Auto merge of #9201 - ehuss:close-error-during-drain, r=Eh2406
Browse files Browse the repository at this point in the history
Fix hang on broken stderr.

If stderr is closed and cargo tries to print a status message such as "Compiling", cargo can get into a hung state. This is because the `DrainState::run` function would insert the job into the `active` queue, and then return an error without starting the job. Since the job isn't started, there is nothing to remove it from the `active` queue, and thus cargo hangs forever waiting for it to finish.

The solution is to move the call to `note_working_on` earlier before the job is placed into the active queue.

This addresses the issue noted in #8714 (comment).
  • Loading branch information
bors committed Feb 24, 2021
2 parents 9a7fe2c + bb7ec2f commit e370c83
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 20 deletions.
33 changes: 15 additions & 18 deletions src/cargo/core/compiler/job_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ struct DrainState<'cfg> {
pending_queue: Vec<(Unit, Job)>,
print: DiagnosticPrinter<'cfg>,

// How many jobs we've finished
/// How many jobs we've finished
finished: usize,
}

Expand Down Expand Up @@ -469,7 +469,15 @@ impl<'cfg> DrainState<'cfg> {
// we're able to perform some parallel work.
while self.has_extra_tokens() && !self.pending_queue.is_empty() {
let (unit, job) = self.pending_queue.remove(0);
self.run(&unit, job, cx, scope)?;
*self.counts.get_mut(&unit.pkg.package_id()).unwrap() -= 1;
if !cx.bcx.build_config.build_plan {
// Print out some nice progress information.
// NOTE: An error here will drop the job without starting it.
// That should be OK, since we want to exit as soon as
// possible during an error.
self.note_working_on(cx.bcx.config, &unit, job.freshness())?;
}
self.run(&unit, job, cx, scope);
}

Ok(())
Expand Down Expand Up @@ -835,31 +843,22 @@ impl<'cfg> DrainState<'cfg> {
}
}

/// Executes a job, pushing the spawned thread's handled onto `threads`.
fn run(
&mut self,
unit: &Unit,
job: Job,
cx: &Context<'_, '_>,
scope: &Scope<'_>,
) -> CargoResult<()> {
/// Executes a job.
///
/// Fresh jobs block until finished (which should be very fast!), Dirty
/// jobs will spawn a thread in the background and return immediately.
fn run(&mut self, unit: &Unit, job: Job, cx: &Context<'_, '_>, scope: &Scope<'_>) {
let id = JobId(self.next_id);
self.next_id = self.next_id.checked_add(1).unwrap();

info!("start {}: {:?}", id, unit);

assert!(self.active.insert(id, unit.clone()).is_none());
*self.counts.get_mut(&unit.pkg.package_id()).unwrap() -= 1;

let messages = self.messages.clone();
let fresh = job.freshness();
let rmeta_required = cx.rmeta_required(unit);

if !cx.bcx.build_config.build_plan {
// Print out some nice progress information.
self.note_working_on(cx.bcx.config, unit, fresh)?;
}

let doit = move |state: JobState<'_>| {
let mut sender = FinishOnDrop {
messages: &state.messages,
Expand Down Expand Up @@ -934,8 +933,6 @@ impl<'cfg> DrainState<'cfg> {
});
}
}

Ok(())
}

fn emit_warnings(
Expand Down
84 changes: 82 additions & 2 deletions tests/testsuite/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ use cargo::{
use cargo_test_support::paths::{root, CargoPathExt};
use cargo_test_support::registry::Package;
use cargo_test_support::{
basic_bin_manifest, basic_lib_manifest, basic_manifest, git, is_nightly, lines_match_unordered,
main_file, paths, project, rustc_host, sleep_ms, symlink_supported, t, Execs, ProjectBuilder,
basic_bin_manifest, basic_lib_manifest, basic_manifest, cargo_exe, git, is_nightly,
lines_match_unordered, main_file, paths, process, project, rustc_host, sleep_ms,
symlink_supported, t, Execs, ProjectBuilder,
};
use std::env;
use std::fs;
Expand Down Expand Up @@ -5256,6 +5257,85 @@ hello stderr!
lines_match_unordered("hello stdout!\n", &stdout).unwrap();
}

#[cargo_test]
fn close_output_during_drain() {
// Test to close the output during the build phase (drain_the_queue).
// There was a bug where it would hang.

// Server to know when rustc has spawned.
let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();

// Create a wrapper so the test can know when compiling has started.
let rustc_wrapper = {
let p = project()
.at("compiler")
.file("Cargo.toml", &basic_manifest("compiler", "1.0.0"))
.file(
"src/main.rs",
&r#"
use std::process::Command;
use std::env;
use std::io::Read;
fn main() {
// Only wait on the first dependency.
if matches!(env::var("CARGO_PKG_NAME").as_deref(), Ok("dep")) {
let mut socket = std::net::TcpStream::connect("__ADDR__").unwrap();
// Wait for the test to tell us to start printing.
let mut buf = [0];
drop(socket.read_exact(&mut buf));
}
let mut cmd = Command::new("rustc");
for arg in env::args_os().skip(1) {
cmd.arg(arg);
}
std::process::exit(cmd.status().unwrap().code().unwrap());
}
"#
.replace("__ADDR__", &addr.to_string()),
)
.build();
p.cargo("build").run();
p.bin("compiler")
};

Package::new("dep", "1.0.0").publish();
let p = project()
.file(
"Cargo.toml",
r#"
[package]
name = "foo"
version = "0.1.0"
[dependencies]
dep = "1.0"
"#,
)
.file("src/lib.rs", "")
.build();

// Spawn cargo, wait for the first rustc to start, and then close stderr.
let mut cmd = process(&cargo_exe())
.arg("check")
.cwd(p.root())
.env("RUSTC", rustc_wrapper)
.build_command();
cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
let mut child = cmd.spawn().expect("cargo should spawn");
// Wait for the rustc wrapper to start.
let rustc_conn = listener.accept().unwrap().0;
// Close stderr to force an error.
drop(child.stderr.take());
// Tell the wrapper to continue.
drop(rustc_conn);
match child.wait() {
Ok(status) => assert!(!status.success()),
Err(e) => panic!("child wait failed: {}", e),
}
}

use cargo_test_support::registry::Dependency;

#[cargo_test]
Expand Down

0 comments on commit e370c83

Please sign in to comment.