From aa96766e81b5be6ec4b78a5adc705ba06dd8acdc Mon Sep 17 00:00:00 2001 From: Andreas Herrmann Date: Thu, 7 Oct 2021 16:35:15 +0200 Subject: [PATCH] Retry on HTTP remote cache fetch failure Bazel's previous behavior was to rebuild an artifact locally if fetching it from an HTTP remote cache failed. This behavior is different from GRPC remote cache case where Bazel will retry the fetch. The lack of retry is an issue for multiple reasons: On one hand rebuilding locally can be slower than fetching from the remote cache, on the other hand if a build action is not bit reproducible, as is the case with some compilers, then the local rebuild will trigger cache misses on further build actions that depend on the current artifact. This change aims to avoid theses issues by retrying the fetch in the HTTP cache case similarly to how the GRPC cache client does it. Some care needs to be taken due to the design of Bazel's internal remote cache client API. For a fetch the client is given an `OutputStream` object that it is expected to write the fetched data to. This may be a temporary file on disk that will be moved to the final location after the fetch completed. On retry, we need to be careful to not duplicate previously written data when writing into this `OutputStream`. Due to the generality of the `OutputStream` interface we cannot reset the file handle or write pointer to start fresh. Instead, this change follows the same pattern used in the GRPC cache client. Namely, keep track of the data previously written and continue from that offset on retry. With this change the HTTP cache client will attempt to fetch the data from the remote cache via an HTTP range request. So that the server only needs to send the data that is still missing. If the server replies with a 206 Partial Content response, then we write the received data directly into the output stream, if the server does not support range requests and instead replies with the full data, then we drop the duplicate prefix and only write into the output stream from the required offset. --- .../lib/remote/RemoteCacheClientFactory.java | 17 ++- .../build/lib/remote/RemoteModule.java | 24 +++- .../devtools/build/lib/remote/http/BUILD | 1 + .../lib/remote/http/DownloadCommand.java | 10 +- .../lib/remote/http/HttpCacheClient.java | 132 ++++++++++-------- .../lib/remote/http/HttpDownloadHandler.java | 77 ++++++++-- .../build/lib/remote/http/HttpException.java | 2 +- 7 files changed, 181 insertions(+), 82 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java index e5622a47672290..762fa9c70a41cb 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java @@ -58,15 +58,16 @@ public static RemoteCacheClient create( @Nullable Credentials creds, AuthAndTLSOptions authAndTlsOptions, Path workingDirectory, - DigestUtil digestUtil) + DigestUtil digestUtil, + RemoteRetrier retrier) throws IOException { Preconditions.checkNotNull(workingDirectory, "workingDirectory"); if (isHttpCache(options) && isDiskCache(options)) { return createDiskAndHttpCache( - workingDirectory, options.diskCache, options, creds, authAndTlsOptions, digestUtil); + workingDirectory, options.diskCache, options, creds, authAndTlsOptions, digestUtil, retrier); } if (isHttpCache(options)) { - return createHttp(options, creds, authAndTlsOptions, digestUtil); + return createHttp(options, creds, authAndTlsOptions, digestUtil, retrier); } if (isDiskCache(options)) { return createDiskCache( @@ -85,7 +86,8 @@ private static RemoteCacheClient createHttp( RemoteOptions options, Credentials creds, AuthAndTLSOptions authAndTlsOptions, - DigestUtil digestUtil) { + DigestUtil digestUtil, + RemoteRetrier retrier) { Preconditions.checkNotNull(options.remoteCache, "remoteCache"); try { @@ -104,6 +106,7 @@ private static RemoteCacheClient createHttp( options.remoteVerifyDownloads, ImmutableList.copyOf(options.remoteHeaders), digestUtil, + retrier, creds, authAndTlsOptions); } else { @@ -117,6 +120,7 @@ private static RemoteCacheClient createHttp( options.remoteVerifyDownloads, ImmutableList.copyOf(options.remoteHeaders), digestUtil, + retrier, creds, authAndTlsOptions); } @@ -145,7 +149,8 @@ private static RemoteCacheClient createDiskAndHttpCache( RemoteOptions options, Credentials cred, AuthAndTLSOptions authAndTlsOptions, - DigestUtil digestUtil) + DigestUtil digestUtil, + RemoteRetrier retrier) throws IOException { Path cacheDir = workingDirectory.getRelative(Preconditions.checkNotNull(diskCachePath, "diskCachePath")); @@ -153,7 +158,7 @@ private static RemoteCacheClient createDiskAndHttpCache( cacheDir.createDirectoryAndParents(); } - RemoteCacheClient httpCache = createHttp(options, cred, authAndTlsOptions, digestUtil); + RemoteCacheClient httpCache = createHttp(options, cred, authAndTlsOptions, digestUtil, retrier); return createDiskAndRemoteClient( workingDirectory, diskCachePath, diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java index 3bf61b3ef76a86..9ead7f86e02e73 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java @@ -64,9 +64,11 @@ import com.google.devtools.build.lib.exec.SpawnStrategyRegistry; import com.google.devtools.build.lib.packages.TargetUtils; import com.google.devtools.build.lib.remote.RemoteServerCapabilities.ServerCapabilitiesRequirement; +import com.google.devtools.build.lib.remote.common.CacheNotFoundException; import com.google.devtools.build.lib.remote.common.RemoteCacheClient; import com.google.devtools.build.lib.remote.common.RemoteExecutionClient; import com.google.devtools.build.lib.remote.downloader.GrpcRemoteDownloader; +import com.google.devtools.build.lib.remote.http.HttpException; import com.google.devtools.build.lib.remote.logging.LoggingInterceptor; import com.google.devtools.build.lib.remote.options.RemoteOptions; import com.google.devtools.build.lib.remote.options.RemoteOutputsMode; @@ -103,6 +105,7 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.nio.channels.ClosedChannelException; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -233,7 +236,26 @@ private void initHttpAndDiskCache( creds, authAndTlsOptions, Preconditions.checkNotNull(env.getWorkingDirectory(), "workingDirectory"), - digestUtil); + digestUtil, + new RemoteRetrier( + remoteOptions, + (e) -> { + boolean retry = false; + if (e instanceof ClosedChannelException) { + retry = true; + } else if (e instanceof HttpException) { + retry = true; + } else if (e instanceof IOException) { + String msg = e.getMessage().toLowerCase(); + if (msg.contains("connection reset by peer")) { + retry = true; + } + } + return retry; + }, + retryScheduler, + Retrier.ALLOW_ALL_CALLS) + ); } catch (IOException e) { handleInitFailure(env, e, Code.CACHE_INIT_FAILURE); return; diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/BUILD b/src/main/java/com/google/devtools/build/lib/remote/http/BUILD index 67e7bd0a4689f4..05eed8d8fbeb93 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/http/BUILD +++ b/src/main/java/com/google/devtools/build/lib/remote/http/BUILD @@ -21,6 +21,7 @@ java_library( "//src/main/java/com/google/devtools/build/lib/analysis:blaze_version_info", "//src/main/java/com/google/devtools/build/lib/authandtls", "//src/main/java/com/google/devtools/build/lib/remote/common", + "//src/main/java/com/google/devtools/build/lib/remote:Retrier", "//src/main/java/com/google/devtools/build/lib/remote/util", "//src/main/java/com/google/devtools/build/lib/vfs", "//third_party:auth", diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/DownloadCommand.java b/src/main/java/com/google/devtools/build/lib/remote/http/DownloadCommand.java index a2e4abf9d83eb1..93843a91dc7966 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/http/DownloadCommand.java +++ b/src/main/java/com/google/devtools/build/lib/remote/http/DownloadCommand.java @@ -25,12 +25,18 @@ final class DownloadCommand { private final boolean casDownload; private final Digest digest; private final OutputStream out; + private final long offset; - DownloadCommand(URI uri, boolean casDownload, Digest digest, OutputStream out) { + DownloadCommand(URI uri, boolean casDownload, Digest digest, OutputStream out, long offset) { this.uri = Preconditions.checkNotNull(uri); this.casDownload = casDownload; this.digest = Preconditions.checkNotNull(digest); this.out = Preconditions.checkNotNull(out); + this.offset = offset; + } + + DownloadCommand(URI uri, boolean casDownload, Digest digest, OutputStream out) { + this(uri, casDownload, digest, out, 0); } public URI uri() { @@ -48,4 +54,6 @@ public Digest digest() { public OutputStream out() { return out; } + + public long offset() { return offset; } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java index c5646258ce6f26..cf013b504b15ba 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java @@ -25,6 +25,7 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions; +import com.google.devtools.build.lib.remote.RemoteRetrier; import com.google.devtools.build.lib.remote.common.CacheNotFoundException; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; import com.google.devtools.build.lib.remote.common.RemoteCacheClient; @@ -83,6 +84,7 @@ import java.util.NoSuchElementException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.regex.Pattern; import javax.annotation.Nullable; @@ -129,6 +131,7 @@ public final class HttpCacheClient implements RemoteCacheClient { private final boolean useTls; private final boolean verifyDownloads; private final DigestUtil digestUtil; + private final RemoteRetrier retrier; private final Object closeLock = new Object(); @@ -150,6 +153,7 @@ public static HttpCacheClient create( boolean verifyDownloads, ImmutableList> extraHttpHeaders, DigestUtil digestUtil, + RemoteRetrier retrier, @Nullable final Credentials creds, AuthAndTLSOptions authAndTlsOptions) throws Exception { @@ -162,6 +166,7 @@ public static HttpCacheClient create( verifyDownloads, extraHttpHeaders, digestUtil, + retrier, creds, authAndTlsOptions, null); @@ -175,6 +180,7 @@ public static HttpCacheClient create( boolean verifyDownloads, ImmutableList> extraHttpHeaders, DigestUtil digestUtil, + RemoteRetrier retrier, @Nullable final Credentials creds, AuthAndTLSOptions authAndTlsOptions) throws Exception { @@ -189,6 +195,7 @@ public static HttpCacheClient create( verifyDownloads, extraHttpHeaders, digestUtil, + retrier, creds, authAndTlsOptions, domainSocketAddress); @@ -202,6 +209,7 @@ public static HttpCacheClient create( verifyDownloads, extraHttpHeaders, digestUtil, + retrier, creds, authAndTlsOptions, domainSocketAddress); @@ -219,6 +227,7 @@ private HttpCacheClient( boolean verifyDownloads, ImmutableList> extraHttpHeaders, DigestUtil digestUtil, + RemoteRetrier retrier, @Nullable final Credentials creds, AuthAndTLSOptions authAndTlsOptions, @Nullable SocketAddress socketAddress) @@ -284,6 +293,7 @@ public void channelCreated(Channel ch) { this.extraHttpHeaders = extraHttpHeaders; this.verifyDownloads = verifyDownloads; this.digestUtil = digestUtil; + this.retrier = retrier; } @SuppressWarnings("FutureReturnValueIgnored") @@ -460,6 +470,7 @@ public ListenableFuture downloadBlob( @SuppressWarnings("FutureReturnValueIgnored") private ListenableFuture get(Digest digest, final OutputStream out, boolean casDownload) { final AtomicBoolean dataWritten = new AtomicBoolean(); + AtomicLong bytesDownloaded = new AtomicLong(); OutputStream wrappedOut = new OutputStream() { // OutputStream.close() does nothing, which is what we want to ensure that the @@ -469,12 +480,14 @@ private ListenableFuture get(Digest digest, final OutputStream out, boolea @Override public void write(byte[] b, int offset, int length) throws IOException { dataWritten.set(true); + bytesDownloaded.addAndGet(length); out.write(b, offset, length); } @Override public void write(int b) throws IOException { dataWritten.set(true); + bytesDownloaded.incrementAndGet(); out.write(b); } @@ -483,57 +496,59 @@ public void flush() throws IOException { out.flush(); } }; - DownloadCommand downloadCmd = new DownloadCommand(uri, casDownload, digest, wrappedOut); - SettableFuture outerF = SettableFuture.create(); - acquireDownloadChannel() - .addListener( - (Future channelPromise) -> { - if (!channelPromise.isSuccess()) { - outerF.setException(channelPromise.cause()); - return; - } - - Channel ch = channelPromise.getNow(); - ch.writeAndFlush(downloadCmd) - .addListener( - (f) -> { - try { - if (f.isSuccess()) { - outerF.set(null); - } else { - Throwable cause = f.cause(); - // cause can be of type HttpException, because Netty uses - // Unsafe.throwException to - // re-throw a checked exception that hasn't been declared in the method - // signature. - if (cause instanceof HttpException) { - HttpResponse response = ((HttpException) cause).response(); - if (!dataWritten.get() && authTokenExpired(response)) { - // The error is due to an auth token having expired. Let's try - // again. - try { - refreshCredentials(); - getAfterCredentialRefresh(downloadCmd, outerF); - return; - } catch (IOException e) { - cause.addSuppressed(e); - } catch (RuntimeException e) { - logger.atWarning().withCause(e).log("Unexpected exception"); - cause.addSuppressed(e); - } - } else if (cacheMiss(response.status())) { - outerF.setException(new CacheNotFoundException(digest)); - return; - } - } - outerF.setException(cause); - } - } finally { - releaseDownloadChannel(ch); + return retrier.executeAsync(() -> { + DownloadCommand downloadCmd = new DownloadCommand(uri, casDownload, digest, wrappedOut, bytesDownloaded.get()); + SettableFuture outerF = SettableFuture.create(); + acquireDownloadChannel() + .addListener( + (Future channelPromise) -> { + if (!channelPromise.isSuccess()) { + outerF.setException(channelPromise.cause()); + return; } + + Channel ch = channelPromise.getNow(); + ch.writeAndFlush(downloadCmd) + .addListener( + (f) -> { + try { + if (f.isSuccess()) { + outerF.set(null); + } else { + Throwable cause = f.cause(); + // cause can be of type HttpException, because Netty uses + // Unsafe.throwException to + // re-throw a checked exception that hasn't been declared in the method + // signature. + if (cause instanceof HttpException) { + HttpResponse response = ((HttpException) cause).response(); + if (!dataWritten.get() && authTokenExpired(response)) { + // The error is due to an auth token having expired. Let's try + // again. + try { + refreshCredentials(); + getAfterCredentialRefresh(downloadCmd, outerF); + return; + } catch (IOException e) { + cause.addSuppressed(e); + } catch (RuntimeException e) { + logger.atWarning().withCause(e).log("Unexpected exception"); + cause.addSuppressed(e); + } + } else if (cacheMiss(response.status())) { + outerF.setException(new CacheNotFoundException(digest)); + return; + } + } + outerF.setException(cause); + } + } finally { + releaseDownloadChannel(ch); + } + }); }); - }); - return outerF; + return outerF; + }); } @SuppressWarnings("FutureReturnValueIgnored") @@ -670,20 +685,21 @@ private void uploadAfterCredentialRefresh(UploadCommand upload, SettableFuture uploadFile( RemoteActionExecutionContext context, Digest digest, Path file) { - try { - return uploadAsync( - digest.getHash(), digest.getSizeBytes(), file.getInputStream(), /* casUpload= */ true); - } catch (IOException e) { - // Can be thrown from file.getInputStream. - return Futures.immediateFailedFuture(e); - } + return retrier.executeAsync(() -> { + try { + return uploadAsync(digest.getHash(), digest.getSizeBytes(), file.getInputStream(), /* casUpload= */ true); + } catch (IOException e) { + // Can be thrown from file.getInputStream. + return Futures.immediateFailedFuture(e); + } + }); } @Override public ListenableFuture uploadBlob( RemoteActionExecutionContext context, Digest digest, ByteString data) { - return uploadAsync( - digest.getHash(), digest.getSizeBytes(), data.newInput(), /* casUpload= */ true); + return retrier.executeAsync(() -> uploadAsync( + digest.getHash(), digest.getSizeBytes(), data.newInput(), /* casUpload= */ true)); } @Override diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java b/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java index 50d83d138a1d66..f38dad965fb6e4 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java +++ b/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java @@ -20,24 +20,18 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; -import io.netty.handler.codec.http.DefaultFullHttpRequest; -import io.netty.handler.codec.http.HttpContent; -import io.netty.handler.codec.http.HttpHeaderNames; -import io.netty.handler.codec.http.HttpHeaderValues; -import io.netty.handler.codec.http.HttpMethod; -import io.netty.handler.codec.http.HttpObject; -import io.netty.handler.codec.http.HttpRequest; -import io.netty.handler.codec.http.HttpResponse; -import io.netty.handler.codec.http.HttpResponseStatus; -import io.netty.handler.codec.http.HttpUtil; -import io.netty.handler.codec.http.HttpVersion; -import io.netty.handler.codec.http.LastHttpContent; +import io.netty.handler.codec.http.*; import io.netty.handler.timeout.ReadTimeoutException; import io.netty.util.internal.StringUtil; + import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; import java.util.Map.Entry; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.regex.Matcher; +import java.util.regex.Pattern; /** ChannelHandler for downloads. */ final class HttpDownloadHandler extends AbstractHttpHandler { @@ -51,6 +45,10 @@ final class HttpDownloadHandler extends AbstractHttpHandler { private long contentLength = -1; /** the path header in the http request */ private String path; + /** the offset at which to download */ + private long offset; + /** the bytes to skip in a full or chunked response */ + private OptionalInt skipBytes; public HttpDownloadHandler( Credentials credentials, ImmutableList> extraHttpHeaders) { @@ -93,7 +91,25 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Ex if (contentLengthSet) { contentLength = HttpUtil.getContentLength(response); } - downloadSucceeded = response.status().equals(HttpResponseStatus.OK); + boolean full_content = response.status().equals(HttpResponseStatus.OK); + boolean partial_content = response.status().equals(HttpResponseStatus.PARTIAL_CONTENT); + if (full_content) { + if (offset != 0) { + // We requested a range but the server replied with a full response. + // We need to skip `offset` bytes of the response. + if (!skipBytes.isPresent()) { + // This is the first chunk, or the full response. + skipBytes = OptionalInt.of((int)offset); + } + } + } else if (partial_content) { + Optional error = validateContentRangeHeader(response.headers()); + if (error.isPresent()) { + failAndClose(error.get(), ctx); + return; + } + } + downloadSucceeded = full_content || partial_content; if (!downloadSucceeded) { out = new ByteArrayOutputStream(); } @@ -105,6 +121,15 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Ex ByteBuf content = ((HttpContent) msg).content(); int readableBytes = content.readableBytes(); + if (skipBytes.isPresent() && skipBytes.getAsInt() > 0) { + int skipNow = skipBytes.getAsInt(); + if (skipNow >= readableBytes) { + skipNow = readableBytes; + } + content.readerIndex(content.readerIndex() + skipNow); + skipBytes = OptionalInt.of(skipBytes.getAsInt() - skipNow); + readableBytes = readableBytes - skipNow; + } content.readBytes(out, readableBytes); bytesReceived += readableBytes; if (msg instanceof LastHttpContent) { @@ -137,7 +162,9 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) DownloadCommand cmd = (DownloadCommand) msg; out = cmd.out(); path = constructPath(cmd.uri(), cmd.digest().getHash(), cmd.casDownload()); - HttpRequest request = buildRequest(path, constructHost(cmd.uri())); + offset = cmd.offset(); + skipBytes = OptionalInt.empty(); + HttpRequest request = buildRequest(path, constructHost(cmd.uri()), cmd.offset()); addCredentialHeaders(request, cmd.uri()); addExtraRemoteHeaders(request); addUserAgentHeader(request); @@ -159,16 +186,36 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) { } } - private HttpRequest buildRequest(String path, String host) { + private HttpRequest buildRequest(String path, String host, long offset) { HttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, path); httpRequest.headers().set(HttpHeaderNames.HOST, host); httpRequest.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); httpRequest.headers().set(HttpHeaderNames.ACCEPT, "*/*"); httpRequest.headers().set(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.GZIP); + if (offset != 0) { + httpRequest.headers().set(HttpHeaderNames.RANGE, String.format("%s=%d-", HttpHeaderValues.BYTES, offset)); + } return httpRequest; } + private Optional validateContentRangeHeader(HttpHeaders headers) { + if (!headers.contains(HttpHeaderNames.CONTENT_RANGE)) { + return Optional.of(new HttpException(response, "Missing 'Content-Range' header", null)); + } + Pattern pattern = Pattern.compile("bytes\\s+(?[0-9]+)-(?[0-9]+)/(?[0-9]*|\\*)"); + Matcher matcher = pattern.matcher(response.headers().get(HttpHeaderNames.CONTENT_RANGE)); + if (!matcher.matches()) { + return Optional.of(new HttpException(response, "Unexpected 'Content-Range' header", null)); + } + long start = Long.valueOf(matcher.group("start")); + if (start != offset) { + return Optional.of(new HttpException( + response, String.format("Unexpected 'Content-Range' start: Expected %d but got %d", offset, start), null)); + } + return Optional.empty(); + } + private void succeedAndReset(ChannelHandlerContext ctx) { // All resets must happen *before* completing the user promise. Otherwise there is a race // condition, where this handler can be reused even though it is closed. In addition, if reset diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/HttpException.java b/src/main/java/com/google/devtools/build/lib/remote/http/HttpException.java index 89fde56046a8d6..6a2bfd5a50b246 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/http/HttpException.java +++ b/src/main/java/com/google/devtools/build/lib/remote/http/HttpException.java @@ -18,7 +18,7 @@ import java.io.IOException; /** An exception that propagates the http status. */ -final class HttpException extends IOException { +public final class HttpException extends IOException { private final HttpResponse response; HttpException(HttpResponse response, String message, Throwable cause) {