Skip to content

Commit

Permalink
store jobs priorities in the pending queue
Browse files Browse the repository at this point in the history
This cleans up the priority-sorted scheduling by removing the need
for a priority accessor that would hash the nodes, and allows inserting
in the queue at the correctly sorted position to remove the insert +
sort combination.
  • Loading branch information
lqd committed Sep 2, 2022
1 parent 9ef2958 commit 4ffe98a
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 33 deletions.
27 changes: 15 additions & 12 deletions src/cargo/core/compiler/job_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ struct DrainState<'cfg> {
/// The list of jobs that we have not yet started executing, but have
/// retrieved from the `queue`. We eagerly pull jobs off the main queue to
/// allow us to request jobserver tokens pretty early.
pending_queue: Vec<(Unit, Job)>,
pending_queue: Vec<(Unit, Job, usize)>,
print: DiagnosticPrinter<'cfg>,

/// How many jobs we've finished
Expand Down Expand Up @@ -576,26 +576,29 @@ impl<'cfg> DrainState<'cfg> {
// possible that can run. Note that this is also the point where we
// start requesting job tokens. Each job after the first needs to
// request a token.
while let Some((unit, job)) = self.queue.dequeue() {
self.pending_queue.push((unit, job));
while let Some((unit, job, priority)) = self.queue.dequeue() {
// We want to keep the pieces of work in the `pending_queue` sorted
// by their priorities, and insert the current job at its correctly
// sorted position: following the lower priority jobs, and the ones
// with the same priority (since they were dequeued before the
// current one, we also keep that relation).
let idx = self
.pending_queue
.partition_point(|&(_, _, p)| p <= priority);
self.pending_queue.insert(idx, (unit, job, priority));
if self.active.len() + self.pending_queue.len() > 1 {
jobserver_helper.request_token();
}
}

// If multiple pieces of work are waiting in the pending queue, we can
// sort it according to their priorities: higher priorities should be
// scheduled sooner.
self.pending_queue
.sort_by_cached_key(|(unit, _)| self.queue.priority(unit));

// Now that we've learned of all possible work that we can execute
// try to spawn it so long as we've got a jobserver token which says
// we're able to perform some parallel work.
// The `pending_queue` is sorted in ascending priority order, and we're
// removing the highest priority items from its end.
// The `pending_queue` is sorted in ascending priority order, and we
// remove items from its end to schedule the highest priority items
// sooner.
while self.has_extra_tokens() && !self.pending_queue.is_empty() {
let (unit, job) = self.pending_queue.pop().unwrap();
let (unit, job, _) = self.pending_queue.pop().unwrap();
*self.counts.get_mut(&unit.pkg.package_id()).unwrap() -= 1;
if !cx.bcx.build_config.build_plan {
// Print out some nice progress information.
Expand Down
35 changes: 14 additions & 21 deletions src/cargo/util/dependency_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,15 +149,15 @@ impl<N: Hash + Eq + Clone, E: Eq + Hash + Clone, V> DependencyQueue<N, E, V> {
///
/// A package is ready to be built when it has 0 un-built dependencies. If
/// `None` is returned then no packages are ready to be built.
pub fn dequeue(&mut self) -> Option<(N, V)> {
let key = self
pub fn dequeue(&mut self) -> Option<(N, V, usize)> {
let (key, priority) = self
.dep_map
.iter()
.filter(|(_, (deps, _))| deps.is_empty())
.map(|(key, _)| key.clone())
.max_by_key(|k| self.priority[k])?;
.map(|(key, _)| (key.clone(), self.priority[key]))
.max_by_key(|(_, priority)| *priority)?;
let (_, data) = self.dep_map.remove(&key).unwrap();
Some((key, data))
Some((key, data, priority))
}

/// Returns `true` if there are remaining packages to be built.
Expand All @@ -170,13 +170,6 @@ impl<N: Hash + Eq + Clone, E: Eq + Hash + Clone, V> DependencyQueue<N, E, V> {
self.dep_map.len()
}

/// Returns the relative priority of a node. Higher priorities should be scheduled sooner.
/// Currently computed as the transitive cost of the given node: its own, plus the cost of its
/// reverse dependencies.
pub(crate) fn priority(&self, node: &N) -> usize {
self.priority[node]
}

/// Indicate that something has finished.
///
/// Calling this function indicates that the `node` has produced `edge`. All
Expand Down Expand Up @@ -220,19 +213,19 @@ mod test {
q.queue(5, (), vec![(4, ()), (3, ())], 1);
q.queue_finished();

assert_eq!(q.dequeue(), Some((1, ())));
assert_eq!(q.dequeue(), Some((3, ())));
assert_eq!(q.dequeue(), Some((1, (), 5)));
assert_eq!(q.dequeue(), Some((3, (), 4)));
assert_eq!(q.dequeue(), None);
q.finish(&3, &());
assert_eq!(q.dequeue(), None);
q.finish(&1, &());
assert_eq!(q.dequeue(), Some((2, ())));
assert_eq!(q.dequeue(), Some((2, (), 4)));
assert_eq!(q.dequeue(), None);
q.finish(&2, &());
assert_eq!(q.dequeue(), Some((4, ())));
assert_eq!(q.dequeue(), Some((4, (), 3)));
assert_eq!(q.dequeue(), None);
q.finish(&4, &());
assert_eq!(q.dequeue(), Some((5, ())));
assert_eq!(q.dequeue(), Some((5, (), 2)));
}

#[test]
Expand All @@ -245,16 +238,16 @@ mod test {
q.queue(4, (), vec![(2, ()), (3, ())], 1);
q.queue_finished();

assert_eq!(q.dequeue(), Some((3, ())));
assert_eq!(q.dequeue(), Some((1, ())));
assert_eq!(q.dequeue(), Some((3, (), 9)));
assert_eq!(q.dequeue(), Some((1, (), 4)));
assert_eq!(q.dequeue(), None);
q.finish(&3, &());
assert_eq!(q.dequeue(), None);
q.finish(&1, &());
assert_eq!(q.dequeue(), Some((2, ())));
assert_eq!(q.dequeue(), Some((2, (), 3)));
assert_eq!(q.dequeue(), None);
q.finish(&2, &());
assert_eq!(q.dequeue(), Some((4, ())));
assert_eq!(q.dequeue(), Some((4, (), 2)));
assert_eq!(q.dequeue(), None);
q.finish(&4, &());
assert_eq!(q.dequeue(), None);
Expand Down

0 comments on commit 4ffe98a

Please sign in to comment.