Skip to content

Commit

Permalink
Properly preserve state when interrupted during a deletion. Also use …
Browse files Browse the repository at this point in the history
…best Bazel practices for reinitializing maps by reassigning, since just clearing can retain memory. When enqueuing deleted nodes, don't use more threads than there are nodes. Finally, fix a test accidentally changed in 3ea7559.

PiperOrigin-RevId: 392680387
  • Loading branch information
janakdr authored and copybara-github committed Aug 24, 2021
1 parent 11a985f commit 18bfed2
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.
package com.google.devtools.build.skyframe;

import static java.lang.Math.min;
import static java.util.concurrent.TimeUnit.MINUTES;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -215,24 +216,36 @@ public DirtyingInvalidationState() {
}

static final class DeletingInvalidationState extends InvalidationState {
private final ConcurrentHashMap<SkyKey, Boolean> doneKeysWithRdepsToRemove =
new ConcurrentHashMap<>(EXPECTED_PENDING_SET_SIZE, .75f, DEFAULT_THREAD_COUNT);
private ConcurrentHashMap<SkyKey, Boolean> doneKeysWithRdepsToRemove;
private ConcurrentHashMap<SkyKey, Boolean> visitedKeysAcrossInterruptions;

DeletingInvalidationState() {
super(InvalidationType.DELETED);
initializeFields();
}

private void initializeFields() {
doneKeysWithRdepsToRemove =
new ConcurrentHashMap<>(EXPECTED_PENDING_SET_SIZE, .75f, DEFAULT_THREAD_COUNT);
visitedKeysAcrossInterruptions =
new ConcurrentHashMap<>(EXPECTED_PENDING_SET_SIZE, .75f, DEFAULT_THREAD_COUNT);
}

@Override
boolean isEmpty() {
return super.isEmpty() && doneKeysWithRdepsToRemove.isEmpty();
}

void clear() {
initializeFields();
}
}

/** A node-deleting implementation. */
static final class DeletingNodeVisitor extends InvalidatingNodeVisitor<InMemoryGraph> {
private final Set<SkyKey> visited = Sets.newConcurrentHashSet();
private final boolean traverseGraph;
private final ConcurrentHashMap<SkyKey, Boolean> doneKeysWithRdepsToRemove;
private final DeletingInvalidationState state;

DeletingNodeVisitor(
InMemoryGraph graph,
Expand All @@ -245,7 +258,7 @@ static final class DeletingNodeVisitor extends InvalidatingNodeVisitor<InMemoryG
state,
NamedForkJoinPool.newNamedPool("deleting node visitor", DEFAULT_THREAD_COUNT));
this.traverseGraph = traverseGraph;
this.doneKeysWithRdepsToRemove = state.doneKeysWithRdepsToRemove;
this.state = state;
}

@Override
Expand All @@ -256,15 +269,16 @@ protected void runInternal(ImmutableList<Pair<SkyKey, InvalidationType>> pending
// To avoid contention and scheduling too many jobs for our #cpus, we start
// DEFAULT_THREAD_COUNT jobs, each processing a chunk of the pending visitations.
int listSize = pendingList.size();
for (int i = 0; i < DEFAULT_THREAD_COUNT; i++) {
int numThreads = min(DEFAULT_THREAD_COUNT, listSize);
for (int i = 0; i < numThreads; i++) {
int index = i;
executor.execute(
() ->
visit(
Iterables.transform(
pendingList.subList(
(index * listSize) / DEFAULT_THREAD_COUNT,
((index + 1) * listSize) / DEFAULT_THREAD_COUNT),
(index * listSize) / numThreads,
((index + 1) * listSize) / numThreads),
Pair::getFirst),
InvalidationType.DELETED));
}
Expand All @@ -275,16 +289,17 @@ protected void runInternal(ImmutableList<Pair<SkyKey, InvalidationType>> pending
}
try (AutoProfiler ignored =
GoogleAutoProfilerUtils.logged("reverse dep removal", MIN_TIME_FOR_LOGGING)) {
doneKeysWithRdepsToRemove.forEachEntry(
state.doneKeysWithRdepsToRemove.forEachEntry(
/*parallelismThreshold=*/ 1024,
e -> {
NodeEntry entry = graph.get(null, Reason.RDEP_REMOVAL, e.getKey());
if (entry == null) {
return;
}
entry.removeReverseDepsFromDoneEntryDueToDeletion(visited);
entry.removeReverseDepsFromDoneEntryDueToDeletion(
state.visitedKeysAcrossInterruptions.keySet());
});
doneKeysWithRdepsToRemove.clear();
state.clear();
}
}

Expand Down Expand Up @@ -350,7 +365,7 @@ public void visit(Iterable<SkyKey> keys, InvalidationType invalidationType) {
Iterables.filter(
directDeps,
k ->
!visited.contains(k)
!state.visitedKeysAcrossInterruptions.containsKey(k)
&& !pendingVisitations.contains(
Pair.of(k, InvalidationType.DELETED))));
if (!depMap.isEmpty()) {
Expand All @@ -363,7 +378,8 @@ public void visit(Iterable<SkyKey> keys, InvalidationType invalidationType) {
continue;
}
if (dep.isDone()) {
doneKeysWithRdepsToRemove.putIfAbsent(directDepEntry.getKey(), Boolean.TRUE);
state.doneKeysWithRdepsToRemove.putIfAbsent(
directDepEntry.getKey(), Boolean.TRUE);
continue;
}
if (!signalingDeps.contains(directDepEntry.getKey())) {
Expand Down Expand Up @@ -395,7 +411,8 @@ public void visit(Iterable<SkyKey> keys, InvalidationType invalidationType) {
// Actually remove the node.
graph.remove(key);

// Remove the node from the set as the last operation.
// Remove the node from the set and add it to global visited as the last operation.
state.visitedKeysAcrossInterruptions.put(key, Boolean.TRUE);
pendingVisitations.remove(invalidationPair);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,39 @@ public void deletingInsideForkJoinPoolWorks() throws Exception {
.get();
}

@Test
public void interruptRecoversNextTime() throws InterruptedException {
graph = new InMemoryGraphImpl();
SkyKey dep = GraphTester.nonHermeticKey("dep");
SkyKey toDelete = GraphTester.nonHermeticKey("top");
tester.getOrCreate(toDelete).addDependency(dep).setConstantValue(new StringValue("top"));
tester.set(dep, new StringValue("dep"));
eval(/*keepGoing=*/ false, toDelete);
Thread mainThread = Thread.currentThread();
assertThrows(
InterruptedException.class,
() ->
invalidateWithoutError(
new DirtyTrackingProgressReceiver(null) {
@Override
public void invalidated(SkyKey skyKey, InvalidationState state) {
mainThread.interrupt();
// Wait for the main thread to be interrupted uninterruptibly, because the
// main thread is going to interrupt us, and we don't want to get into an
// interrupt fight. Only if we get interrupted without the main thread also
// being interrupted will this throw an InterruptedException.
TrackingAwaiter.INSTANCE.awaitLatchAndTrackExceptions(
visitor.get().getInterruptionLatchForTestingOnly(),
"Main thread was not interrupted");
}
},
toDelete));
invalidateWithoutError(new DirtyTrackingProgressReceiver(null));
eval(/*keepGoing=*/ false, toDelete);
invalidateWithoutError(new DirtyTrackingProgressReceiver(null), toDelete);
eval(/*keepGoing=*/ false, toDelete);
}

@Test
public void interruptThreadInReceiver() throws Exception {
Random random = new Random(TestUtils.getRandomSeed());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class ReverseDepsUtilityTest {
@Parameters(name = "numElements-{0}")
public static List<Object[]> parameters() {
List<Object[]> params = new ArrayList<>();
for (int i = 1; i < 2; i++) {
for (int i = 0; i < 20; i++) {
params.add(new Object[] {i});
}
return params;
Expand Down Expand Up @@ -92,7 +92,7 @@ public void testDuplicateCheckOnGetReverseDeps() {
ReverseDepsUtility.addReverseDep(example, Key.create(0));
if (numElements == 0) {
// Will not throw.
assertThat(ReverseDepsUtility.getReverseDeps(example, /*checkConsistency=*/ true)).isEmpty();
assertThat(ReverseDepsUtility.getReverseDeps(example, /*checkConsistency=*/ true)).hasSize(1);
} else {
assertThrows(
RuntimeException.class,
Expand Down

0 comments on commit 18bfed2

Please sign in to comment.