Skip to content

Commit

Permalink
remote: made CombinedCache a composition of Disk and Http Cache
Browse files Browse the repository at this point in the history
Fixes #8052

The problem: CombinedCache is not working.
The cause: both `OnDiskBlobStore` and `HttpBlobStore` store action results differently from other actions, but not in the same way. The `OnDiskBlobStore` appends "ac_" prefix to the key, while the `HttpBlobStore` stores it in a different directory. Current implementation added "ac_" prefix for both blob stores, which is not correct for the `HttpBlobStore`.
The solution: treat actionResults' `gets` and `puts` in accordance with a particular Blob Store used.

The problem is described in #8052 and the root cause is described in [this](#8052 (comment)) and [this](#8052 (comment)) comments.

Closes #8141.

PiperOrigin-RevId: 247387377
  • Loading branch information
ishikhman authored and copybara-github committed May 9, 2019
1 parent 0320c5f commit 597e289
Show file tree
Hide file tree
Showing 12 changed files with 289 additions and 141 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ public Digest uploadStream(Digest digest, InputStream in)
}

public boolean containsKey(Digest digest) throws IOException, InterruptedException {
return blobStore.containsKey(digest.getHash());
return blobStore.contains(digest.getHash());
}

@Override
Expand All @@ -206,7 +206,7 @@ private byte[] downloadActionResult(Digest digest) throws IOException, Interrupt
}
// This unconditionally downloads the whole blob into memory!
ByteArrayOutputStream out = new ByteArrayOutputStream();
boolean success = blobStore.getActionResult(digest.getHash(), out);
boolean success = getFromFuture(blobStore.getActionResult(digest.getHash(), out));
if (!success) {
throw new CacheNotFoundException(digest, digestUtil);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,16 @@ private static SimpleBlobStore createDiskCache(Path workingDirectory, PathFragme
private static SimpleBlobStore createCombinedCache(
Path workingDirectory, PathFragment diskCachePath, RemoteOptions options, Credentials cred)
throws IOException {

Path cacheDir =
workingDirectory.getRelative(Preconditions.checkNotNull(diskCachePath, "diskCachePath"));
if (!cacheDir.exists()) {
cacheDir.createDirectoryAndParents();
}
return new CombinedDiskHttpBlobStore(cacheDir, createHttp(options, cred));

OnDiskBlobStore diskCache = new OnDiskBlobStore(cacheDir);
SimpleBlobStore httpCache = createHttp(options, cred);
return new CombinedDiskHttpBlobStore(diskCache, httpCache);
}

private static boolean isDiskCache(RemoteOptions options) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,10 @@
package com.google.devtools.build.lib.remote.blobstore;

import com.google.common.base.Preconditions;
import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.vfs.Path;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
Expand All @@ -31,119 +27,122 @@

/**
* A {@link SimpleBlobStore} implementation combining two blob stores. A local disk blob store and a
* remote http blob store. If a blob isn't found in the first store, the second store is used, and
* the blob added to the first. Put puts the blob on both stores.
* remote blob store. If a blob isn't found in the first store, the second store is used, and the
* blob added to the first. Put puts the blob on both stores.
*/
public final class CombinedDiskHttpBlobStore extends OnDiskBlobStore {
public final class CombinedDiskHttpBlobStore implements SimpleBlobStore {
private static final Logger logger = Logger.getLogger(CombinedDiskHttpBlobStore.class.getName());
private final SimpleBlobStore bsHttp;

public CombinedDiskHttpBlobStore(Path root, SimpleBlobStore bsHttp) {
super(root);
this.bsHttp = Preconditions.checkNotNull(bsHttp);
private final SimpleBlobStore remoteCache;
private final OnDiskBlobStore diskCache;

public CombinedDiskHttpBlobStore(OnDiskBlobStore diskCache, SimpleBlobStore remoteCache) {
this.diskCache = Preconditions.checkNotNull(diskCache);
this.remoteCache = Preconditions.checkNotNull(remoteCache);
}

@Override
public boolean containsKey(String key) {
// HTTP cache does not support containsKey.
// Don't support it here either for predictable semantics.
throw new UnsupportedOperationException("HTTP Caching does not use this method.");
public boolean contains(String key) {
return diskCache.contains(key);
}

@Override
public ListenableFuture<Boolean> get(String key, OutputStream out) {
boolean foundOnDisk = super.containsKey(key);

if (foundOnDisk) {
return super.get(key, out);
} else {
// Write a temporary file first, and then rename, to avoid data corruption in case of a crash.
Path temp = toPath(UUID.randomUUID().toString());

OutputStream tempOut;
try {
tempOut = temp.getOutputStream();
} catch (IOException e) {
return Futures.immediateFailedFuture(e);
}
ListenableFuture<Boolean> chained =
Futures.transformAsync(
bsHttp.get(key, tempOut),
(found) -> {
if (!found) {
return Futures.immediateFuture(false);
} else {
Path target = toPath(key);
// The following note and line is taken from OnDiskBlobStore.java
// TODO(ulfjack): Fsync temp here before we rename it to avoid data loss in the
// case of machine
// crashes (the OS may reorder the writes and the rename).
temp.renameTo(target);

SettableFuture<Boolean> f = SettableFuture.create();
try (InputStream in = target.getInputStream()) {
ByteStreams.copy(in, out);
f.set(true);
} catch (IOException e) {
f.setException(e);
}
return f;
}
},
MoreExecutors.directExecutor());
chained.addListener(
() -> {
try {
tempOut.close();
} catch (IOException e) {
// not sure what to do here, we either are here because of another exception being
// thrown,
// or we have successfully used the file we are trying (and failing) to close
logger.log(Level.WARNING, "Failed to close temporary file on get", e);
}
},
MoreExecutors.directExecutor());
return chained;
}
public boolean containsActionResult(String key) {
return diskCache.containsActionResult(key);
}

@Override
public boolean getActionResult(String key, OutputStream out)
public void put(String key, long length, InputStream in)
throws IOException, InterruptedException {
if (super.getActionResult(key, out)) {
return true;
diskCache.put(key, length, in);
try (InputStream inFile = diskCache.toPath(key, /* actionResult= */ false).getInputStream()) {
remoteCache.put(key, length, inFile);
}
}

try (ByteArrayOutputStream tmpOut = new ByteArrayOutputStream()) {
if (bsHttp.getActionResult(key, tmpOut)) {
byte[] tmp = tmpOut.toByteArray();
super.putActionResult(key, tmp);
ByteStreams.copy(new ByteArrayInputStream(tmp), out);
return true;
}
}
@Override
public void putActionResult(String key, byte[] in) throws IOException, InterruptedException {
diskCache.putActionResult(key, in);
remoteCache.putActionResult(key, in);
}

return false;
@Override
public void close() {
diskCache.close();
remoteCache.close();
}

@Override
public void put(String key, long length, InputStream in)
throws IOException, InterruptedException {
super.put(key, length, in);
try (InputStream inFile = toPath(key).getInputStream()) {
bsHttp.put(key, length, inFile);
public ListenableFuture<Boolean> get(String key, OutputStream out) {
return get(key, out, /* actionResult= */ false);
}

private ListenableFuture<Boolean> get(String key, OutputStream out, boolean actionResult) {
boolean foundOnDisk =
actionResult ? diskCache.containsActionResult(key) : diskCache.contains(key);

if (foundOnDisk) {
return getFromCache(diskCache, key, out, actionResult);
} else {
return getFromRemoteAndSaveToDisk(key, out, actionResult);
}
}

@Override
public void putActionResult(String key, byte[] in) throws IOException, InterruptedException {
super.putActionResult(key, in);
bsHttp.putActionResult(key, in);
public ListenableFuture<Boolean> getActionResult(String key, OutputStream out) {
return get(key, out, /* actionResult= */ true);
}

@Override
public void close() {
super.close();
bsHttp.close();
private ListenableFuture<Boolean> getFromRemoteAndSaveToDisk(
String key, OutputStream out, boolean actionResult) {
// Write a temporary file first, and then rename, to avoid data corruption in case of a crash.
Path temp = diskCache.toPath(UUID.randomUUID().toString(), /* actionResult= */ false);

OutputStream tempOut;
try {
tempOut = temp.getOutputStream();
} catch (IOException e) {
return Futures.immediateFailedFuture(e);
}
ListenableFuture<Boolean> chained =
Futures.transformAsync(
getFromCache(remoteCache, key, tempOut, actionResult),
(found) -> {
if (!found) {
return Futures.immediateFuture(false);
} else {
saveToDiskCache(key, temp, actionResult);
return getFromCache(diskCache, key, out, actionResult);
}
},
MoreExecutors.directExecutor());
chained.addListener(
() -> {
try {
tempOut.close();
} catch (IOException e) {
// not sure what to do here, we either are here because of another exception being
// thrown, or we have successfully used the file we are trying (and failing) to close
logger.log(Level.WARNING, "Failed to close temporary file on get", e);
}
},
MoreExecutors.directExecutor());
return chained;
}

private void saveToDiskCache(String key, Path temp, boolean actionResult) throws IOException {
Path target = diskCache.toPath(key, actionResult);
// TODO(ulfjack): Fsync temp here before we rename it to avoid data loss in the
// case of machine crashes (the OS may reorder the writes and the rename).
temp.renameTo(target);
}

private ListenableFuture<Boolean> getFromCache(
SimpleBlobStore blobStore, String key, OutputStream tempOut, boolean actionResult) {
if (!actionResult) {
return blobStore.get(key, tempOut);
} else {
return blobStore.getActionResult(key, tempOut);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;

/** A {@link SimpleBlobStore} implementation using a {@link ConcurrentMap}. */
public final class ConcurrentMapBlobStore implements SimpleBlobStore {
Expand All @@ -33,10 +32,15 @@ public ConcurrentMapBlobStore(ConcurrentMap<String, byte[]> map) {
}

@Override
public boolean containsKey(String key) {
public boolean contains(String key) {
return map.containsKey(key);
}

@Override
public boolean containsActionResult(String key) {
return map.containsKey(ACTION_KEY_PREFIX + key);
}

@Override
public ListenableFuture<Boolean> get(String key, OutputStream out) {
byte[] data = map.get(key);
Expand All @@ -55,9 +59,8 @@ public ListenableFuture<Boolean> get(String key, OutputStream out) {
}

@Override
public boolean getActionResult(String key, OutputStream out)
throws IOException, InterruptedException {
return getFromFuture(get(ACTION_KEY_PREFIX + key, out));
public ListenableFuture<Boolean> getActionResult(String key, OutputStream out) {
return get(ACTION_KEY_PREFIX + key, out);
}

@Override
Expand All @@ -75,15 +78,4 @@ public void putActionResult(String key, byte[] in) {
@Override
public void close() {}

private static <T> T getFromFuture(ListenableFuture<T> f)
throws IOException, InterruptedException {
try {
return f.get();
} catch (ExecutionException e) {
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
}
throw new IOException(e.getCause());
}
}
}
Loading

0 comments on commit 597e289

Please sign in to comment.