Replace std::sync::mpsc with a much simpler queue #7845

Closed · wants to merge 1 commit
71 changes: 35 additions & 36 deletions src/cargo/core/compiler/job_queue.rs
@@ -54,7 +54,6 @@ use std::collections::{BTreeMap, HashMap, HashSet};
use std::io;
use std::marker;
use std::mem;
-use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;
use std::time::Duration;

@@ -74,6 +73,7 @@ use crate::core::{PackageId, TargetKind};
use crate::handle_error;
use crate::util;
use crate::util::diagnostic_server::{self, DiagnosticPrinter};
+use crate::util::Queue;
use crate::util::{internal, profile, CargoResult, CargoResultExt, ProcessBuilder};
use crate::util::{Config, DependencyQueue};
use crate::util::{Progress, ProgressStyle};
@@ -99,8 +99,7 @@ struct DrainState<'a, 'cfg> {
total_units: usize,

queue: DependencyQueue<Unit<'a>, Artifact, Job>,
-tx: Sender<Message>,
-rx: Receiver<Message>,
+messages: Arc<Queue<Message>>,
active: HashMap<JobId, Unit<'a>>,
compiled: HashSet<PackageId>,
documented: HashSet<PackageId>,
@@ -146,7 +145,7 @@ impl std::fmt::Display for JobId {

pub struct JobState<'a> {
/// Channel back to the main thread to coordinate messages and such.
-tx: Sender<Message>,
+messages: Arc<Queue<Message>>,

/// The job id that this state is associated with, used when sending
/// messages back to the main thread.
@@ -200,7 +199,7 @@ enum Message {

impl<'a> JobState<'a> {
pub fn running(&self, cmd: &ProcessBuilder) {
-let _ = self.tx.send(Message::Run(self.id, cmd.to_string()));
+self.messages.push(Message::Run(self.id, cmd.to_string()));
}

pub fn build_plan(
@@ -209,17 +208,16 @@ impl<'a> JobState<'a> {
cmd: ProcessBuilder,
filenames: Arc<Vec<OutputFile>>,
) {
-let _ = self
-.tx
-.send(Message::BuildPlanMsg(module_name, cmd, filenames));
+self.messages
+.push(Message::BuildPlanMsg(module_name, cmd, filenames));
}

pub fn stdout(&self, stdout: String) {
-drop(self.tx.send(Message::Stdout(stdout)));
+self.messages.push(Message::Stdout(stdout));
}

pub fn stderr(&self, stderr: String) {
-drop(self.tx.send(Message::Stderr(stderr)));
+self.messages.push(Message::Stderr(stderr));
}

/// A method used to signal to the coordinator thread that the rmeta file
@@ -229,9 +227,8 @@ impl<'a> JobState<'a> {
/// produced once!
pub fn rmeta_produced(&self) {
self.rmeta_required.set(false);
-let _ = self
-.tx
-.send(Message::Finish(self.id, Artifact::Metadata, Ok(())));
+self.messages
+.push(Message::Finish(self.id, Artifact::Metadata, Ok(())));
}

/// The rustc underlying this Job is about to acquire a jobserver token (i.e., block)
@@ -240,14 +237,14 @@ impl<'a> JobState<'a> {
/// This should arrange for the associated client to eventually get a token via
/// `client.release_raw()`.
pub fn will_acquire(&self) {
-let _ = self.tx.send(Message::NeedsToken(self.id));
+self.messages.push(Message::NeedsToken(self.id));
}

/// The rustc underlying this Job is informing us that it is done with a jobserver token.
///
/// Note that it does *not* write that token back anywhere.
pub fn release_token(&self) {
-let _ = self.tx.send(Message::ReleaseToken(self.id));
+self.messages.push(Message::ReleaseToken(self.id));
}
}

@@ -341,13 +338,11 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
let _p = profile::start("executing the job graph");
self.queue.queue_finished();

-let (tx, rx) = channel();
let progress = Progress::with_style("Building", ProgressStyle::Ratio, cx.bcx.config);
let state = DrainState {
total_units: self.queue.len(),
queue: self.queue,
-tx,
-rx,
+messages: Arc::new(Queue::new()),
active: HashMap::new(),
compiled: HashSet::new(),
documented: HashSet::new(),
@@ -365,25 +360,25 @@ impl<'a, 'cfg> JobQueue<'a, 'cfg> {
};

// Create a helper thread for acquiring jobserver tokens
-let tx = state.tx.clone();
+let messages = state.messages.clone();
let helper = cx
.jobserver
.clone()
.into_helper_thread(move |token| {
-drop(tx.send(Message::Token(token)));
+drop(messages.push(Message::Token(token)));
})
.chain_err(|| "failed to create helper thread for jobserver management")?;

// Create a helper thread to manage the diagnostics for rustfix if
// necessary.
-let tx = state.tx.clone();
+let messages = state.messages.clone();
let _diagnostic_server = cx
.bcx
.build_config
.rustfix_diagnostic_server
.borrow_mut()
.take()
-.map(move |srv| srv.start(move |msg| drop(tx.send(Message::FixDiagnostic(msg)))));
+.map(move |srv| srv.start(move |msg| drop(messages.push(Message::FixDiagnostic(msg)))));

crossbeam_utils::thread::scope(move |scope| state.drain_the_queue(cx, plan, scope, &helper))
.expect("child threads shouldn't panic")
@@ -585,7 +580,10 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
// to run above to calculate CPU usage over time. To do this we
// listen for a message with a timeout, and on timeout we run the
// previous parts of the loop again.
-let events: Vec<_> = self.rx.try_iter().collect();
+let mut events = Vec::new();
+while let Some(event) = self.messages.try_pop() {
+events.push(event);
+}
info!(
"tokens in use: {}, rustc_tokens: {:?}, waiting_rustcs: {:?} (events this tick: {})",
self.tokens.len(),
@@ -603,14 +601,16 @@
loop {
self.tick_progress();
self.tokens.truncate(self.active.len() - 1);
-match self.rx.recv_timeout(Duration::from_millis(500)) {
-Ok(message) => break vec![message],
-Err(_) => continue,
+match self.messages.pop(Duration::from_millis(500)) {
+Some(message) => {
+events.push(message);
+break;
+}
+None => continue,
}
}
-} else {
-events
}
+return events;
}

fn drain_the_queue(
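The two-phase receive above is the heart of the behavioral change: first drain everything already queued without blocking (replacing `rx.try_iter()`), and only if nothing was pending, block in 500ms slices so the progress bar keeps ticking (replacing `recv_timeout`). Below is a minimal sketch of that pattern against the `Queue` API added later in this PR (src/cargo/util/queue.rs); `Event` and `wait_for_events` are hypothetical stand-ins for cargo's `Message` type and coordinator loop, not code from the PR.

use std::sync::Arc;
use std::time::Duration;

// Hypothetical stand-in for cargo's `Message` enum.
enum Event {
    Tick,
}

fn wait_for_events(messages: &Arc<Queue<Event>>) -> Vec<Event> {
    // Phase 1: drain everything already queued, without blocking.
    let mut events = Vec::new();
    while let Some(event) = messages.try_pop() {
        events.push(event);
    }
    // Phase 2: nothing was pending, so block in 500ms slices; the real
    // loop updates the progress bar on every timeout before retrying.
    if events.is_empty() {
        loop {
            // cargo calls tick_progress() here on each iteration
            if let Some(event) = messages.pop(Duration::from_millis(500)) {
                events.push(event);
                break;
            }
        }
    }
    events
}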
@@ -757,7 +757,7 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
assert!(self.active.insert(id, *unit).is_none());
*self.counts.get_mut(&unit.pkg.package_id()).unwrap() -= 1;

-let my_tx = self.tx.clone();
+let messages = self.messages.clone();
let fresh = job.freshness();
let rmeta_required = cx.rmeta_required(unit);

@@ -769,13 +769,13 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
let doit = move || {
let state = JobState {
id,
-tx: my_tx.clone(),
+messages: messages.clone(),
rmeta_required: Cell::new(rmeta_required),
_marker: marker::PhantomData,
};

let mut sender = FinishOnDrop {
-tx: &my_tx,
+messages: &messages,
id,
result: Err(format_err!("worker panicked")),
};
@@ -794,25 +794,24 @@ impl<'a, 'cfg> DrainState<'a, 'cfg> {
// we need to make sure that the metadata is flagged as produced so
// send a synthetic message here.
if state.rmeta_required.get() && sender.result.is_ok() {
-my_tx
-.send(Message::Finish(id, Artifact::Metadata, Ok(())))
-.unwrap();
+messages.push(Message::Finish(id, Artifact::Metadata, Ok(())));
}

// Use a helper struct with a `Drop` implementation to guarantee
// that a `Finish` message is sent even if our job panics. We
// shouldn't panic unless there's a bug in Cargo, so we just need
// to make sure nothing hangs by accident.
struct FinishOnDrop<'a> {
-tx: &'a Sender<Message>,
+messages: &'a Queue<Message>,
id: JobId,
result: CargoResult<()>,
}

impl Drop for FinishOnDrop<'_> {
fn drop(&mut self) {
let msg = mem::replace(&mut self.result, Ok(()));
-drop(self.tx.send(Message::Finish(self.id, Artifact::All, msg)));
+self.messages
+.push(Message::Finish(self.id, Artifact::All, msg));
}
}
};
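The `FinishOnDrop` helper above is an RAII guard: because `Drop` runs on normal return and also while unwinding from a panic, the coordinator is guaranteed a `Finish` message for every job that started, and `sender.result` is only overwritten with the real outcome if the job ran to completion. Here is a self-contained sketch of the same pattern; `Done`, `FinishGuard`, and `run_job` are hypothetical names standing in for cargo's `Message::Finish(id, Artifact::All, result)` machinery.

// Hypothetical completion message; cargo sends
// `Message::Finish(id, Artifact::All, result)` instead.
struct Done(u32);

struct FinishGuard<'a> {
    messages: &'a Queue<Done>,
    id: u32,
}

impl Drop for FinishGuard<'_> {
    // Runs on normal exit *and* during panic unwinding, so the
    // coordinator can never be left waiting on a dead job.
    fn drop(&mut self) {
        self.messages.push(Done(self.id));
    }
}

fn run_job(messages: &Queue<Done>, id: u32) {
    let _guard = FinishGuard { messages, id };
    // ... do the actual work; a panic here still pushes `Done` ...
}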
2 changes: 2 additions & 0 deletions src/cargo/util/mod.rs
@@ -18,6 +18,7 @@ pub use self::paths::{bytes2path, dylib_path, join_paths, path2bytes};
pub use self::paths::{dylib_path_envvar, normalize_path};
pub use self::process_builder::{process, ProcessBuilder};
pub use self::progress::{Progress, ProgressStyle};
+pub use self::queue::Queue;
pub use self::read2::read2;
pub use self::rustc::Rustc;
pub use self::sha256::Sha256;
@@ -50,6 +51,7 @@ pub mod paths;
pub mod process_builder;
pub mod profile;
mod progress;
+mod queue;
mod read2;
pub mod rustc;
mod sha256;
54 changes: 54 additions & 0 deletions src/cargo/util/queue.rs
@@ -0,0 +1,54 @@
use std::collections::VecDeque;
use std::sync::{Condvar, Mutex};
use std::time::{Duration, Instant};

/// A simple, thread-safe queue of items of type `T`.
///
/// This is a sort of channel where any thread can push to the queue and any
/// thread can pop from it. Queues currently have infinite capacity, so `push`
/// never blocks, but `pop` blocks until an item arrives or a timeout elapses.
pub struct Queue<T> {
    state: Mutex<State<T>>,
    condvar: Condvar,
}

struct State<T> {
    items: VecDeque<T>,
}

impl<T> Queue<T> {
    pub fn new() -> Queue<T> {
        Queue {
            state: Mutex::new(State {
                items: VecDeque::new(),
            }),
            condvar: Condvar::new(),
        }
    }

    pub fn push(&self, item: T) {
        self.state.lock().unwrap().items.push_back(item);
        self.condvar.notify_one();
    }

    /// Pops an item, blocking until one is available or `timeout` has elapsed.
    pub fn pop(&self, timeout: Duration) -> Option<T> {
        let mut state = self.state.lock().unwrap();
        let now = Instant::now();
        while state.items.is_empty() {
            let elapsed = now.elapsed();
            if elapsed >= timeout {
                break;
            }
            let (lock, result) = self.condvar.wait_timeout(state, timeout - elapsed).unwrap();
            state = lock;
            if result.timed_out() {
                break;
            }
        }
        state.items.pop_front()
    }

    /// Pops an item if one is immediately available; never blocks.
    pub fn try_pop(&self) -> Option<T> {
        self.state.lock().unwrap().items.pop_front()
    }
}
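As an end-to-end illustration, here is a rough sketch of how this `Queue` behaves across threads using only the API above (`new`, `push`, `pop`, `try_pop`). This is illustrative code, not part of the PR, and it assumes the `Queue` type above is in scope.

use std::sync::Arc;
use std::thread;
use std::time::Duration;

fn main() {
    let queue = Arc::new(Queue::new());

    // Producer: any thread can push, and `push` never blocks.
    let producer = {
        let queue = Arc::clone(&queue);
        thread::spawn(move || {
            for i in 0..3 {
                queue.push(i);
            }
        })
    };

    // Consumer: `pop` blocks for up to the timeout, then returns `None`.
    let mut received = Vec::new();
    while received.len() < 3 {
        if let Some(item) = queue.pop(Duration::from_millis(500)) {
            received.push(item);
        }
    }
    // A single producer pushes in order, and the queue is FIFO.
    assert_eq!(received, vec![0, 1, 2]);

    // `try_pop` never blocks; the queue is empty at this point.
    assert!(queue.try_pop().is_none());

    producer.join().unwrap();
}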