Skip to content

Commit

Permalink
Deflake RemoteCacheTest
Browse files Browse the repository at this point in the history
  • Loading branch information
coeuvre committed Aug 4, 2022
1 parent 11368be commit d6e266e
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,21 @@ public void ensureInputsPresent(

Flowable<TransferResult> uploads =
createUploadTasks(context, merkleTree, additionalInputs, allDigests, force)
.flatMap(uploadTasks -> findMissingBlobs(context, uploadTasks))
.flatMapPublisher(this::waitForUploadTasks);
.flatMapPublisher(
result ->
Flowable.using(
() -> result,
uploadTasks ->
findMissingBlobs(context, uploadTasks)
.flatMapPublisher(this::waitForUploadTasks),
uploadTasks -> {
for (UploadTask uploadTask : uploadTasks) {
Disposable d = uploadTask.disposable.getAndSet(null);
if (d != null) {
d.dispose();
}
}
}));

try {
mergeBulkTransfer(uploads).blockingAwait();
Expand Down Expand Up @@ -175,15 +188,7 @@ private Maybe<UploadTask> maybeCreateUploadTask(
UploadTask uploadTask = new UploadTask();
uploadTask.digest = digest;
uploadTask.disposable = new AtomicReference<>();
uploadTask.completion =
Completable.fromObservable(
completion.doOnDispose(
() -> {
Disposable d = uploadTask.disposable.getAndSet(null);
if (d != null) {
d.dispose();
}
}));
uploadTask.completion = Completable.fromObservable(completion);
Completable upload =
casUploadCache.execute(
digest,
Expand Down Expand Up @@ -238,44 +243,34 @@ private Single<List<UploadTask>> findMissingBlobs(
() -> Profiler.instance().profile("findMissingDigests"),
ignored ->
Single.fromObservable(
Observable.fromSingle(
toSingle(
() -> {
ImmutableList<Digest> digestsToQuery =
uploadTasks.stream()
.filter(uploadTask -> uploadTask.continuation != null)
.map(uploadTask -> uploadTask.digest)
.collect(toImmutableList());
if (digestsToQuery.isEmpty()) {
return immediateFuture(ImmutableSet.of());
}
return findMissingDigests(context, digestsToQuery);
},
directExecutor())
.map(
missingDigests -> {
for (UploadTask uploadTask : uploadTasks) {
if (uploadTask.continuation != null) {
uploadTask.continuation.onSuccess(
missingDigests.contains(uploadTask.digest));
}
}
return uploadTasks;
}))
// Use AsyncSubject so that if downstream is disposed, the
// findMissingDigests call is not cancelled (because it may be needed by
// other
// threads).
.subscribeWith(AsyncSubject.create()))
.doOnDispose(
() -> {
for (UploadTask uploadTask : uploadTasks) {
Disposable d = uploadTask.disposable.getAndSet(null);
if (d != null) {
d.dispose();
}
}
}),
Observable.fromSingle(
toSingle(
() -> {
ImmutableList<Digest> digestsToQuery =
uploadTasks.stream()
.filter(uploadTask -> uploadTask.continuation != null)
.map(uploadTask -> uploadTask.digest)
.collect(toImmutableList());
if (digestsToQuery.isEmpty()) {
return immediateFuture(ImmutableSet.of());
}
return findMissingDigests(context, digestsToQuery);
},
directExecutor())
.map(
missingDigests -> {
for (UploadTask uploadTask : uploadTasks) {
if (uploadTask.continuation != null) {
uploadTask.continuation.onSuccess(
missingDigests.contains(uploadTask.digest));
}
}
return uploadTasks;
}))
// Use AsyncSubject so that if downstream is disposed, the
// findMissingDigests call is not cancelled (because it may be needed by
// other threads).
.subscribeWith(AsyncSubject.create())),
SilentCloseable::close);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
Expand Down Expand Up @@ -64,9 +65,8 @@
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -308,7 +308,7 @@ public void ensureInputsPresent_interruptedDuringUploadBlobs_cancelInProgressUpl
RemoteCacheClient cacheProtocol = spy(new InMemoryCacheClient());
RemoteExecutionCache remoteCache = spy(newRemoteExecutionCache(cacheProtocol));

List<SettableFuture<Void>> futures = new ArrayList<>();
Deque<SettableFuture<Void>> futures = new ConcurrentLinkedDeque<>();
CountDownLatch uploadBlobCalls = new CountDownLatch(2);
doAnswer(
invocationOnMock -> {
Expand Down Expand Up @@ -460,73 +460,108 @@ public void ensureInputsPresent_interruptedDuringUploadBlobs_cancelInProgressUpl
RemoteCacheClient cacheProtocol = spy(new InMemoryCacheClient());
RemoteExecutionCache remoteCache = spy(newRemoteExecutionCache(cacheProtocol));

List<SettableFuture<Void>> futures = new ArrayList<>();
ConcurrentLinkedDeque<SettableFuture<Void>> uploadBlobFutures = new ConcurrentLinkedDeque<>();
Map<Path, SettableFuture<Void>> uploadFileFutures = Maps.newConcurrentMap();
CountDownLatch uploadBlobCalls = new CountDownLatch(2);
CountDownLatch uploadFileCalls = new CountDownLatch(3);
doAnswer(
invocationOnMock -> {
SettableFuture<Void> future = SettableFuture.create();
futures.add(future);
uploadBlobFutures.add(future);
uploadBlobCalls.countDown();
return future;
})
.when(cacheProtocol)
.uploadBlob(any(), any(), any());
doAnswer(
invocationOnMock -> {
Path file = invocationOnMock.getArgument(2, Path.class);
SettableFuture<Void> future = SettableFuture.create();
futures.add(future);
uploadBlobCalls.countDown();
uploadFileFutures.put(file, future);
uploadFileCalls.countDown();
return future;
})
.when(cacheProtocol)
.uploadFile(any(), any(), any());

Path path = fs.getPath("/execroot/foo");
FileSystemUtils.writeContentAsLatin1(path, "bar");
SortedMap<PathFragment, Path> inputs = new TreeMap<>();
inputs.put(PathFragment.create("foo"), path);
MerkleTree merkleTree = MerkleTree.build(inputs, digestUtil);
Path foo = fs.getPath("/execroot/foo");
FileSystemUtils.writeContentAsLatin1(foo, "foo");
Path bar = fs.getPath("/execroot/bar");
FileSystemUtils.writeContentAsLatin1(bar, "bar");
Path qux = fs.getPath("/execroot/qux");
FileSystemUtils.writeContentAsLatin1(qux, "qux");

SortedMap<PathFragment, Path> input1 = new TreeMap<>();
input1.put(PathFragment.create("foo"), foo);
input1.put(PathFragment.create("bar"), bar);
MerkleTree merkleTree1 = MerkleTree.build(input1, digestUtil);

SortedMap<PathFragment, Path> input2 = new TreeMap<>();
input2.put(PathFragment.create("bar"), bar);
input2.put(PathFragment.create("qux"), qux);
MerkleTree merkleTree2 = MerkleTree.build(input2, digestUtil);

CountDownLatch ensureInputsPresentReturned = new CountDownLatch(2);
CountDownLatch ensureInterrupted = new CountDownLatch(1);
Runnable work =
() -> {
try {
remoteCache.ensureInputsPresent(context, merkleTree, ImmutableMap.of(), false);
} catch (IOException ignored) {
// ignored
} catch (InterruptedException e) {
ensureInterrupted.countDown();
} finally {
ensureInputsPresentReturned.countDown();
}
};
Thread thread1 = new Thread(work);
Thread thread2 = new Thread(work);
Thread thread1 =
new Thread(
() -> {
try {
remoteCache.ensureInputsPresent(context, merkleTree1, ImmutableMap.of(), false);
} catch (IOException ignored) {
// ignored
} catch (InterruptedException e) {
ensureInterrupted.countDown();
} finally {
ensureInputsPresentReturned.countDown();
}
});
Thread thread2 =
new Thread(
() -> {
try {
remoteCache.ensureInputsPresent(context, merkleTree2, ImmutableMap.of(), false);
} catch (InterruptedException | IOException ignored) {
// ignored
} finally {
ensureInputsPresentReturned.countDown();
}
});

// act
thread1.start();
thread2.start();
uploadBlobCalls.await();
assertThat(futures).hasSize(2);
assertThat(remoteCache.casUploadCache.getInProgressTasks()).hasSize(2);
uploadFileCalls.await();
assertThat(uploadBlobFutures).hasSize(2);
assertThat(uploadFileFutures).hasSize(3);
assertThat(remoteCache.casUploadCache.getInProgressTasks()).hasSize(5);

thread1.interrupt();
ensureInterrupted.await();

// assert
assertThat(remoteCache.casUploadCache.getInProgressTasks()).hasSize(2);
assertThat(remoteCache.casUploadCache.getInProgressTasks()).hasSize(3);
assertThat(remoteCache.casUploadCache.getFinishedTasks()).isEmpty();
for (SettableFuture<Void> future : futures) {
assertThat(future.isCancelled()).isFalse();
for (Map.Entry<Path, SettableFuture<Void>> entry : uploadFileFutures.entrySet()) {
Path file = entry.getKey();
SettableFuture<Void> future = entry.getValue();
if (file.equals(foo)) {
assertThat(future.isCancelled()).isTrue();
} else {
assertThat(future.isCancelled()).isFalse();
}
}

for (SettableFuture<Void> future : futures) {
for (SettableFuture<Void> future : uploadBlobFutures) {
future.set(null);
}
for (SettableFuture<Void> future : uploadFileFutures.values()) {
future.set(null);
}
ensureInputsPresentReturned.await();
assertThat(remoteCache.casUploadCache.getInProgressTasks()).isEmpty();
assertThat(remoteCache.casUploadCache.getFinishedTasks()).hasSize(2);
assertThat(remoteCache.casUploadCache.getFinishedTasks()).hasSize(3);
}

@Test
Expand Down

0 comments on commit d6e266e

Please sign in to comment.