Skip to content

Commit

Permalink
dag_walk: heads: use bfs and terminate if frontier has single item
Browse files Browse the repository at this point in the history
We use `heads_ok()` for finding the head operations when there are
multiple current op heads. The current DFS-based algortihm needs to
always walk all the way to the root. That can be expensive when the
operations are slow to retrieve. In the common case where there are
two operations close to each other in the graph, we should be able to
terminate the search once we've reached the common ancestor. This
patch replaces the DFS by a BFS and adds the early termination.
  • Loading branch information
martinvonz committed Jun 24, 2024
1 parent 2221742 commit dddba46
Showing 1 changed file with 27 additions and 9 deletions.
36 changes: 27 additions & 9 deletions lib/src/dag_walk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -488,14 +488,28 @@ where
{
let start: Vec<T> = start.into_iter().try_collect()?;
let mut heads: HashSet<T> = start.iter().cloned().collect();
dfs_ok(start.into_iter().map(Ok), id_fn, |node| {
let neighbors: Vec<Result<T, E>> = neighbors_fn(node).into_iter().collect();
for neighbor in neighbors.iter().filter_map(|x| x.as_ref().ok()) {
heads.remove(neighbor);
// Do a BFS until we have only one item left in the frontier. That frontier must
// have originated from one of the heads, and since there can't be cycles,
// it won't be able to eliminate any other heads.
let mut frontier: Vec<T> = heads.iter().cloned().collect();
let mut visited: HashSet<ID> = heads.iter().map(&id_fn).collect();
let mut root_reached = false;
while frontier.len() > 1 || (!frontier.is_empty() && root_reached) {
frontier = frontier
.iter()
.flat_map(|node| {
let neighbors = neighbors_fn(node).into_iter().collect_vec();
if neighbors.is_empty() {
root_reached = true;
}
neighbors
})
.try_collect()?;
for node in &frontier {
heads.remove(node);
}
neighbors
})
.try_for_each(|r| r.map(|_| ()))?;
frontier.retain(|node| visited.insert(id_fn(node)));
}
Ok(heads)
}

Expand Down Expand Up @@ -1211,10 +1225,14 @@ mod tests {
let neighbors_fn = |node: &char| neighbors[node].clone();

let result = heads_ok([Ok('C')], id_fn, neighbors_fn);
assert_eq!(result, Err('X'));
assert_eq!(result, Ok(hashset! {'C'}));
let result = heads_ok([Ok('B')], id_fn, neighbors_fn);
assert_eq!(result, Err('X'));
assert_eq!(result, Ok(hashset! {'B'}));
let result = heads_ok([Ok('A')], id_fn, neighbors_fn);
assert_eq!(result, Ok(hashset! {'A'}));
let result = heads_ok([Ok('C'), Ok('B')], id_fn, neighbors_fn);
assert_eq!(result, Err('X'));
let result = heads_ok([Ok('C'), Ok('A')], id_fn, neighbors_fn);
assert_eq!(result, Err('X'));
}
}

0 comments on commit dddba46

Please sign in to comment.