From f59c516cb1a2bf29a44ed257461a4f0933de47ac Mon Sep 17 00:00:00 2001 From: Andreas Herrmann Date: Thu, 7 Oct 2021 16:35:15 +0200 Subject: [PATCH 01/16] Retry on HTTP remote cache fetch failure Bazel's previous behavior was to rebuild an artifact locally if fetching it from an HTTP remote cache failed. This behavior is different from GRPC remote cache case where Bazel will retry the fetch. The lack of retry is an issue for multiple reasons: On one hand rebuilding locally can be slower than fetching from the remote cache, on the other hand if a build action is not bit reproducible, as is the case with some compilers, then the local rebuild will trigger cache misses on further build actions that depend on the current artifact. This change aims to avoid theses issues by retrying the fetch in the HTTP cache case similarly to how the GRPC cache client does it. Some care needs to be taken due to the design of Bazel's internal remote cache client API. For a fetch the client is given an `OutputStream` object that it is expected to write the fetched data to. This may be a temporary file on disk that will be moved to the final location after the fetch completed. On retry, we need to be careful to not duplicate previously written data when writing into this `OutputStream`. Due to the generality of the `OutputStream` interface we cannot reset the file handle or write pointer to start fresh. Instead, this change follows the same pattern used in the GRPC cache client. Namely, keep track of the data previously written and continue from that offset on retry. With this change the HTTP cache client will attempt to fetch the data from the remote cache via an HTTP range request. So that the server only needs to send the data that is still missing. If the server replies with a 206 Partial Content response, then we write the received data directly into the output stream, if the server does not support range requests and instead replies with the full data, then we drop the duplicate prefix and only write into the output stream from the required offset. --- .../lib/remote/RemoteCacheClientFactory.java | 17 ++- .../build/lib/remote/RemoteModule.java | 24 +++- .../devtools/build/lib/remote/http/BUILD | 1 + .../lib/remote/http/DownloadCommand.java | 10 +- .../lib/remote/http/HttpCacheClient.java | 132 ++++++++++-------- .../lib/remote/http/HttpDownloadHandler.java | 77 ++++++++-- .../build/lib/remote/http/HttpException.java | 2 +- 7 files changed, 181 insertions(+), 82 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java index 8f68923f9fffcd..2fdc420841b39d 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java @@ -59,15 +59,16 @@ public static RemoteCacheClient create( @Nullable Credentials creds, AuthAndTLSOptions authAndTlsOptions, Path workingDirectory, - DigestUtil digestUtil) + DigestUtil digestUtil, + RemoteRetrier retrier) throws IOException { Preconditions.checkNotNull(workingDirectory, "workingDirectory"); if (isHttpCache(options) && isDiskCache(options)) { return createDiskAndHttpCache( - workingDirectory, options.diskCache, options, creds, authAndTlsOptions, digestUtil); + workingDirectory, options.diskCache, options, creds, authAndTlsOptions, digestUtil, retrier); } if (isHttpCache(options)) { - return createHttp(options, creds, authAndTlsOptions, digestUtil); + return createHttp(options, creds, authAndTlsOptions, digestUtil, retrier); } if (isDiskCache(options)) { return createDiskCache( @@ -90,7 +91,8 @@ private static RemoteCacheClient createHttp( RemoteOptions options, Credentials creds, AuthAndTLSOptions authAndTlsOptions, - DigestUtil digestUtil) { + DigestUtil digestUtil, + RemoteRetrier retrier) { Preconditions.checkNotNull(options.remoteCache, "remoteCache"); try { @@ -109,6 +111,7 @@ private static RemoteCacheClient createHttp( options.remoteVerifyDownloads, ImmutableList.copyOf(options.remoteHeaders), digestUtil, + retrier, creds, authAndTlsOptions); } else { @@ -122,6 +125,7 @@ private static RemoteCacheClient createHttp( options.remoteVerifyDownloads, ImmutableList.copyOf(options.remoteHeaders), digestUtil, + retrier, creds, authAndTlsOptions); } @@ -151,7 +155,8 @@ private static RemoteCacheClient createDiskAndHttpCache( RemoteOptions options, Credentials cred, AuthAndTLSOptions authAndTlsOptions, - DigestUtil digestUtil) + DigestUtil digestUtil, + RemoteRetrier retrier) throws IOException { Path cacheDir = workingDirectory.getRelative(Preconditions.checkNotNull(diskCachePath, "diskCachePath")); @@ -159,7 +164,7 @@ private static RemoteCacheClient createDiskAndHttpCache( cacheDir.createDirectoryAndParents(); } - RemoteCacheClient httpCache = createHttp(options, cred, authAndTlsOptions, digestUtil); + RemoteCacheClient httpCache = createHttp(options, cred, authAndTlsOptions, digestUtil, retrier); return createDiskAndRemoteClient( workingDirectory, diskCachePath, diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java index 8026972836899f..65a216a96ce91c 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java @@ -59,9 +59,11 @@ import com.google.devtools.build.lib.exec.ModuleActionContextRegistry; import com.google.devtools.build.lib.exec.SpawnStrategyRegistry; import com.google.devtools.build.lib.remote.RemoteServerCapabilities.ServerCapabilitiesRequirement; +import com.google.devtools.build.lib.remote.common.CacheNotFoundException; import com.google.devtools.build.lib.remote.common.RemoteCacheClient; import com.google.devtools.build.lib.remote.common.RemoteExecutionClient; import com.google.devtools.build.lib.remote.downloader.GrpcRemoteDownloader; +import com.google.devtools.build.lib.remote.http.HttpException; import com.google.devtools.build.lib.remote.logging.LoggingInterceptor; import com.google.devtools.build.lib.remote.options.RemoteBuildEventUploadMode; import com.google.devtools.build.lib.remote.options.RemoteOptions; @@ -105,6 +107,7 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.nio.channels.ClosedChannelException; import java.util.List; import java.util.Optional; import java.util.concurrent.ExecutorService; @@ -230,7 +233,26 @@ private void initHttpAndDiskCache( credentials, authAndTlsOptions, Preconditions.checkNotNull(env.getWorkingDirectory(), "workingDirectory"), - digestUtil); + digestUtil, + new RemoteRetrier( + remoteOptions, + (e) -> { + boolean retry = false; + if (e instanceof ClosedChannelException) { + retry = true; + } else if (e instanceof HttpException) { + retry = true; + } else if (e instanceof IOException) { + String msg = e.getMessage().toLowerCase(); + if (msg.contains("connection reset by peer")) { + retry = true; + } + } + return retry; + }, + retryScheduler, + Retrier.ALLOW_ALL_CALLS) + ); } catch (IOException e) { handleInitFailure(env, e, Code.CACHE_INIT_FAILURE); return; diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/BUILD b/src/main/java/com/google/devtools/build/lib/remote/http/BUILD index 67e7bd0a4689f4..05eed8d8fbeb93 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/http/BUILD +++ b/src/main/java/com/google/devtools/build/lib/remote/http/BUILD @@ -21,6 +21,7 @@ java_library( "//src/main/java/com/google/devtools/build/lib/analysis:blaze_version_info", "//src/main/java/com/google/devtools/build/lib/authandtls", "//src/main/java/com/google/devtools/build/lib/remote/common", + "//src/main/java/com/google/devtools/build/lib/remote:Retrier", "//src/main/java/com/google/devtools/build/lib/remote/util", "//src/main/java/com/google/devtools/build/lib/vfs", "//third_party:auth", diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/DownloadCommand.java b/src/main/java/com/google/devtools/build/lib/remote/http/DownloadCommand.java index a2e4abf9d83eb1..93843a91dc7966 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/http/DownloadCommand.java +++ b/src/main/java/com/google/devtools/build/lib/remote/http/DownloadCommand.java @@ -25,12 +25,18 @@ final class DownloadCommand { private final boolean casDownload; private final Digest digest; private final OutputStream out; + private final long offset; - DownloadCommand(URI uri, boolean casDownload, Digest digest, OutputStream out) { + DownloadCommand(URI uri, boolean casDownload, Digest digest, OutputStream out, long offset) { this.uri = Preconditions.checkNotNull(uri); this.casDownload = casDownload; this.digest = Preconditions.checkNotNull(digest); this.out = Preconditions.checkNotNull(out); + this.offset = offset; + } + + DownloadCommand(URI uri, boolean casDownload, Digest digest, OutputStream out) { + this(uri, casDownload, digest, out, 0); } public URI uri() { @@ -48,4 +54,6 @@ public Digest digest() { public OutputStream out() { return out; } + + public long offset() { return offset; } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java index 6c6d4cc6f897fc..047739ddfe888d 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java @@ -25,6 +25,7 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions; +import com.google.devtools.build.lib.remote.RemoteRetrier; import com.google.devtools.build.lib.remote.common.CacheNotFoundException; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; import com.google.devtools.build.lib.remote.common.RemoteCacheClient; @@ -83,6 +84,7 @@ import java.util.NoSuchElementException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.regex.Pattern; import javax.annotation.Nullable; @@ -129,6 +131,7 @@ public final class HttpCacheClient implements RemoteCacheClient { private final boolean useTls; private final boolean verifyDownloads; private final DigestUtil digestUtil; + private final RemoteRetrier retrier; private final Object closeLock = new Object(); @@ -150,6 +153,7 @@ public static HttpCacheClient create( boolean verifyDownloads, ImmutableList> extraHttpHeaders, DigestUtil digestUtil, + RemoteRetrier retrier, @Nullable final Credentials creds, AuthAndTLSOptions authAndTlsOptions) throws Exception { @@ -162,6 +166,7 @@ public static HttpCacheClient create( verifyDownloads, extraHttpHeaders, digestUtil, + retrier, creds, authAndTlsOptions, null); @@ -175,6 +180,7 @@ public static HttpCacheClient create( boolean verifyDownloads, ImmutableList> extraHttpHeaders, DigestUtil digestUtil, + RemoteRetrier retrier, @Nullable final Credentials creds, AuthAndTLSOptions authAndTlsOptions) throws Exception { @@ -189,6 +195,7 @@ public static HttpCacheClient create( verifyDownloads, extraHttpHeaders, digestUtil, + retrier, creds, authAndTlsOptions, domainSocketAddress); @@ -202,6 +209,7 @@ public static HttpCacheClient create( verifyDownloads, extraHttpHeaders, digestUtil, + retrier, creds, authAndTlsOptions, domainSocketAddress); @@ -219,6 +227,7 @@ private HttpCacheClient( boolean verifyDownloads, ImmutableList> extraHttpHeaders, DigestUtil digestUtil, + RemoteRetrier retrier, @Nullable final Credentials creds, AuthAndTLSOptions authAndTlsOptions, @Nullable SocketAddress socketAddress) @@ -284,6 +293,7 @@ public void channelCreated(Channel ch) { this.extraHttpHeaders = extraHttpHeaders; this.verifyDownloads = verifyDownloads; this.digestUtil = digestUtil; + this.retrier = retrier; } @SuppressWarnings("FutureReturnValueIgnored") @@ -460,6 +470,7 @@ public ListenableFuture downloadBlob( @SuppressWarnings("FutureReturnValueIgnored") private ListenableFuture get(Digest digest, final OutputStream out, boolean casDownload) { final AtomicBoolean dataWritten = new AtomicBoolean(); + AtomicLong bytesDownloaded = new AtomicLong(); OutputStream wrappedOut = new OutputStream() { // OutputStream.close() does nothing, which is what we want to ensure that the @@ -469,12 +480,14 @@ private ListenableFuture get(Digest digest, final OutputStream out, boolea @Override public void write(byte[] b, int offset, int length) throws IOException { dataWritten.set(true); + bytesDownloaded.addAndGet(length); out.write(b, offset, length); } @Override public void write(int b) throws IOException { dataWritten.set(true); + bytesDownloaded.incrementAndGet(); out.write(b); } @@ -483,57 +496,59 @@ public void flush() throws IOException { out.flush(); } }; - DownloadCommand downloadCmd = new DownloadCommand(uri, casDownload, digest, wrappedOut); - SettableFuture outerF = SettableFuture.create(); - acquireDownloadChannel() - .addListener( - (Future channelPromise) -> { - if (!channelPromise.isSuccess()) { - outerF.setException(channelPromise.cause()); - return; - } - - Channel ch = channelPromise.getNow(); - ch.writeAndFlush(downloadCmd) - .addListener( - (f) -> { - try { - if (f.isSuccess()) { - outerF.set(null); - } else { - Throwable cause = f.cause(); - // cause can be of type HttpException, because Netty uses - // Unsafe.throwException to - // re-throw a checked exception that hasn't been declared in the method - // signature. - if (cause instanceof HttpException) { - HttpResponse response = ((HttpException) cause).response(); - if (!dataWritten.get() && authTokenExpired(response)) { - // The error is due to an auth token having expired. Let's try - // again. - try { - refreshCredentials(); - getAfterCredentialRefresh(downloadCmd, outerF); - return; - } catch (IOException e) { - cause.addSuppressed(e); - } catch (RuntimeException e) { - logger.atWarning().withCause(e).log("Unexpected exception"); - cause.addSuppressed(e); - } - } else if (cacheMiss(response.status())) { - outerF.setException(new CacheNotFoundException(digest)); - return; - } - } - outerF.setException(cause); - } - } finally { - releaseDownloadChannel(ch); + return retrier.executeAsync(() -> { + DownloadCommand downloadCmd = new DownloadCommand(uri, casDownload, digest, wrappedOut, bytesDownloaded.get()); + SettableFuture outerF = SettableFuture.create(); + acquireDownloadChannel() + .addListener( + (Future channelPromise) -> { + if (!channelPromise.isSuccess()) { + outerF.setException(channelPromise.cause()); + return; } + + Channel ch = channelPromise.getNow(); + ch.writeAndFlush(downloadCmd) + .addListener( + (f) -> { + try { + if (f.isSuccess()) { + outerF.set(null); + } else { + Throwable cause = f.cause(); + // cause can be of type HttpException, because Netty uses + // Unsafe.throwException to + // re-throw a checked exception that hasn't been declared in the method + // signature. + if (cause instanceof HttpException) { + HttpResponse response = ((HttpException) cause).response(); + if (!dataWritten.get() && authTokenExpired(response)) { + // The error is due to an auth token having expired. Let's try + // again. + try { + refreshCredentials(); + getAfterCredentialRefresh(downloadCmd, outerF); + return; + } catch (IOException e) { + cause.addSuppressed(e); + } catch (RuntimeException e) { + logger.atWarning().withCause(e).log("Unexpected exception"); + cause.addSuppressed(e); + } + } else if (cacheMiss(response.status())) { + outerF.setException(new CacheNotFoundException(digest)); + return; + } + } + outerF.setException(cause); + } + } finally { + releaseDownloadChannel(ch); + } + }); }); - }); - return outerF; + return outerF; + }); } @SuppressWarnings("FutureReturnValueIgnored") @@ -670,20 +685,21 @@ private void uploadAfterCredentialRefresh(UploadCommand upload, SettableFuture uploadFile( RemoteActionExecutionContext context, Digest digest, Path file) { - try { - return uploadAsync( - digest.getHash(), digest.getSizeBytes(), file.getInputStream(), /* casUpload= */ true); - } catch (IOException e) { - // Can be thrown from file.getInputStream. - return Futures.immediateFailedFuture(e); - } + return retrier.executeAsync(() -> { + try { + return uploadAsync(digest.getHash(), digest.getSizeBytes(), file.getInputStream(), /* casUpload= */ true); + } catch (IOException e) { + // Can be thrown from file.getInputStream. + return Futures.immediateFailedFuture(e); + } + }); } @Override public ListenableFuture uploadBlob( RemoteActionExecutionContext context, Digest digest, ByteString data) { - return uploadAsync( - digest.getHash(), digest.getSizeBytes(), data.newInput(), /* casUpload= */ true); + return retrier.executeAsync(() -> uploadAsync( + digest.getHash(), digest.getSizeBytes(), data.newInput(), /* casUpload= */ true)); } @Override diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java b/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java index 50d83d138a1d66..f38dad965fb6e4 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java +++ b/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java @@ -20,24 +20,18 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; -import io.netty.handler.codec.http.DefaultFullHttpRequest; -import io.netty.handler.codec.http.HttpContent; -import io.netty.handler.codec.http.HttpHeaderNames; -import io.netty.handler.codec.http.HttpHeaderValues; -import io.netty.handler.codec.http.HttpMethod; -import io.netty.handler.codec.http.HttpObject; -import io.netty.handler.codec.http.HttpRequest; -import io.netty.handler.codec.http.HttpResponse; -import io.netty.handler.codec.http.HttpResponseStatus; -import io.netty.handler.codec.http.HttpUtil; -import io.netty.handler.codec.http.HttpVersion; -import io.netty.handler.codec.http.LastHttpContent; +import io.netty.handler.codec.http.*; import io.netty.handler.timeout.ReadTimeoutException; import io.netty.util.internal.StringUtil; + import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; import java.util.Map.Entry; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.regex.Matcher; +import java.util.regex.Pattern; /** ChannelHandler for downloads. */ final class HttpDownloadHandler extends AbstractHttpHandler { @@ -51,6 +45,10 @@ final class HttpDownloadHandler extends AbstractHttpHandler { private long contentLength = -1; /** the path header in the http request */ private String path; + /** the offset at which to download */ + private long offset; + /** the bytes to skip in a full or chunked response */ + private OptionalInt skipBytes; public HttpDownloadHandler( Credentials credentials, ImmutableList> extraHttpHeaders) { @@ -93,7 +91,25 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Ex if (contentLengthSet) { contentLength = HttpUtil.getContentLength(response); } - downloadSucceeded = response.status().equals(HttpResponseStatus.OK); + boolean full_content = response.status().equals(HttpResponseStatus.OK); + boolean partial_content = response.status().equals(HttpResponseStatus.PARTIAL_CONTENT); + if (full_content) { + if (offset != 0) { + // We requested a range but the server replied with a full response. + // We need to skip `offset` bytes of the response. + if (!skipBytes.isPresent()) { + // This is the first chunk, or the full response. + skipBytes = OptionalInt.of((int)offset); + } + } + } else if (partial_content) { + Optional error = validateContentRangeHeader(response.headers()); + if (error.isPresent()) { + failAndClose(error.get(), ctx); + return; + } + } + downloadSucceeded = full_content || partial_content; if (!downloadSucceeded) { out = new ByteArrayOutputStream(); } @@ -105,6 +121,15 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Ex ByteBuf content = ((HttpContent) msg).content(); int readableBytes = content.readableBytes(); + if (skipBytes.isPresent() && skipBytes.getAsInt() > 0) { + int skipNow = skipBytes.getAsInt(); + if (skipNow >= readableBytes) { + skipNow = readableBytes; + } + content.readerIndex(content.readerIndex() + skipNow); + skipBytes = OptionalInt.of(skipBytes.getAsInt() - skipNow); + readableBytes = readableBytes - skipNow; + } content.readBytes(out, readableBytes); bytesReceived += readableBytes; if (msg instanceof LastHttpContent) { @@ -137,7 +162,9 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) DownloadCommand cmd = (DownloadCommand) msg; out = cmd.out(); path = constructPath(cmd.uri(), cmd.digest().getHash(), cmd.casDownload()); - HttpRequest request = buildRequest(path, constructHost(cmd.uri())); + offset = cmd.offset(); + skipBytes = OptionalInt.empty(); + HttpRequest request = buildRequest(path, constructHost(cmd.uri()), cmd.offset()); addCredentialHeaders(request, cmd.uri()); addExtraRemoteHeaders(request); addUserAgentHeader(request); @@ -159,16 +186,36 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) { } } - private HttpRequest buildRequest(String path, String host) { + private HttpRequest buildRequest(String path, String host, long offset) { HttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, path); httpRequest.headers().set(HttpHeaderNames.HOST, host); httpRequest.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); httpRequest.headers().set(HttpHeaderNames.ACCEPT, "*/*"); httpRequest.headers().set(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.GZIP); + if (offset != 0) { + httpRequest.headers().set(HttpHeaderNames.RANGE, String.format("%s=%d-", HttpHeaderValues.BYTES, offset)); + } return httpRequest; } + private Optional validateContentRangeHeader(HttpHeaders headers) { + if (!headers.contains(HttpHeaderNames.CONTENT_RANGE)) { + return Optional.of(new HttpException(response, "Missing 'Content-Range' header", null)); + } + Pattern pattern = Pattern.compile("bytes\\s+(?[0-9]+)-(?[0-9]+)/(?[0-9]*|\\*)"); + Matcher matcher = pattern.matcher(response.headers().get(HttpHeaderNames.CONTENT_RANGE)); + if (!matcher.matches()) { + return Optional.of(new HttpException(response, "Unexpected 'Content-Range' header", null)); + } + long start = Long.valueOf(matcher.group("start")); + if (start != offset) { + return Optional.of(new HttpException( + response, String.format("Unexpected 'Content-Range' start: Expected %d but got %d", offset, start), null)); + } + return Optional.empty(); + } + private void succeedAndReset(ChannelHandlerContext ctx) { // All resets must happen *before* completing the user promise. Otherwise there is a race // condition, where this handler can be reused even though it is closed. In addition, if reset diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/HttpException.java b/src/main/java/com/google/devtools/build/lib/remote/http/HttpException.java index 89fde56046a8d6..6a2bfd5a50b246 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/http/HttpException.java +++ b/src/main/java/com/google/devtools/build/lib/remote/http/HttpException.java @@ -18,7 +18,7 @@ import java.io.IOException; /** An exception that propagates the http status. */ -final class HttpException extends IOException { +public final class HttpException extends IOException { private final HttpResponse response; HttpException(HttpResponse response, String message, Throwable cause) { From 69300dd11901004db54126f70645908d1b2dbca2 Mon Sep 17 00:00:00 2001 From: Andreas Herrmann Date: Fri, 8 Oct 2021 17:33:32 +0200 Subject: [PATCH 02/16] Fix test build --- .../remote/RemoteCacheClientFactoryTest.java | 27 ++++++++++++++----- .../devtools/build/lib/remote/http/BUILD | 1 + .../lib/remote/http/HttpCacheClientTest.java | 14 ++++++++++ 3 files changed, 35 insertions(+), 7 deletions(-) diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactoryTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactoryTest.java index 4a44a5acc7e688..4f4927ca16c048 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactoryTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactoryTest.java @@ -18,6 +18,8 @@ import static org.junit.Assert.assertThrows; import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions; +import com.google.common.util.concurrent.ListeningScheduledExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.google.devtools.build.lib.clock.JavaClock; import com.google.devtools.build.lib.remote.common.RemoteCacheClient; import com.google.devtools.build.lib.remote.disk.DiskAndRemoteCacheClient; @@ -32,6 +34,8 @@ import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem; import com.google.devtools.common.options.Options; import java.io.IOException; +import java.util.concurrent.Executors; + import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -47,6 +51,13 @@ public class RemoteCacheClientFactoryTest { private final AuthAndTLSOptions authAndTlsOptions = Options.getDefaults(AuthAndTLSOptions.class); private Path workingDirectory; private InMemoryFileSystem fs; + private ListeningScheduledExecutorService retryScheduler = + MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1)); + private RemoteRetrier retrier = new RemoteRetrier( + () -> RemoteRetrier.RETRIES_DISABLED, + (e) -> false, + retryScheduler, + Retrier.ALLOW_ALL_CALLS); @Before public final void setUp() { @@ -63,7 +74,7 @@ public void createCombinedCacheWithExistingWorkingDirectory() throws IOException RemoteCacheClient blobStore = RemoteCacheClientFactory.create( - remoteOptions, /* creds= */ null, authAndTlsOptions, workingDirectory, digestUtil); + remoteOptions, /* creds= */ null, authAndTlsOptions, workingDirectory, digestUtil, retrier); assertThat(blobStore).isInstanceOf(DiskAndRemoteCacheClient.class); } @@ -76,7 +87,7 @@ public void createCombinedCacheWithNotExistingWorkingDirectory() throws IOExcept RemoteCacheClient blobStore = RemoteCacheClientFactory.create( - remoteOptions, /* creds= */ null, authAndTlsOptions, workingDirectory, digestUtil); + remoteOptions, /* creds= */ null, authAndTlsOptions, workingDirectory, digestUtil, retrier); assertThat(blobStore).isInstanceOf(DiskAndRemoteCacheClient.class); assertThat(workingDirectory.exists()).isTrue(); @@ -96,7 +107,8 @@ public void createCombinedCacheWithMissingWorkingDirectoryShouldThrowException() /* creds= */ null, authAndTlsOptions, /* workingDirectory= */ null, - digestUtil)); + digestUtil, + retrier)); } @Test @@ -106,7 +118,7 @@ public void createHttpCacheWithProxy() throws IOException { RemoteCacheClient blobStore = RemoteCacheClientFactory.create( - remoteOptions, /* creds= */ null, authAndTlsOptions, workingDirectory, digestUtil); + remoteOptions, /* creds= */ null, authAndTlsOptions, workingDirectory, digestUtil, retrier); assertThat(blobStore).isInstanceOf(HttpCacheClient.class); } @@ -125,7 +137,8 @@ public void createHttpCacheFailsWithUnsupportedProxyProtocol() { /* creds= */ null, authAndTlsOptions, workingDirectory, - digestUtil))) + digestUtil, + retrier))) .hasMessageThat() .contains("Remote cache proxy unsupported: bad-proxy"); } @@ -136,7 +149,7 @@ public void createHttpCacheWithoutProxy() throws IOException { RemoteCacheClient blobStore = RemoteCacheClientFactory.create( - remoteOptions, /* creds= */ null, authAndTlsOptions, workingDirectory, digestUtil); + remoteOptions, /* creds= */ null, authAndTlsOptions, workingDirectory, digestUtil, retrier); assertThat(blobStore).isInstanceOf(HttpCacheClient.class); } @@ -147,7 +160,7 @@ public void createDiskCache() throws IOException { RemoteCacheClient blobStore = RemoteCacheClientFactory.create( - remoteOptions, /* creds= */ null, authAndTlsOptions, workingDirectory, digestUtil); + remoteOptions, /* creds= */ null, authAndTlsOptions, workingDirectory, digestUtil, retrier); assertThat(blobStore).isInstanceOf(DiskCacheClient.class); } diff --git a/src/test/java/com/google/devtools/build/lib/remote/http/BUILD b/src/test/java/com/google/devtools/build/lib/remote/http/BUILD index 1486dda102a960..d0274727b8fca9 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/http/BUILD +++ b/src/test/java/com/google/devtools/build/lib/remote/http/BUILD @@ -21,6 +21,7 @@ java_test( test_class = "com.google.devtools.build.lib.AllTests", deps = [ "//src/main/java/com/google/devtools/build/lib/authandtls", + "//src/main/java/com/google/devtools/build/lib/remote:Retrier", "//src/main/java/com/google/devtools/build/lib/remote/common", "//src/main/java/com/google/devtools/build/lib/remote/http", "//src/main/java/com/google/devtools/build/lib/remote/util", diff --git a/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java b/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java index 41cddce1827849..b92df44c7c8d1d 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java @@ -33,6 +33,10 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions; +import com.google.common.util.concurrent.ListeningScheduledExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.devtools.build.lib.remote.RemoteRetrier; +import com.google.devtools.build.lib.remote.Retrier; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; import com.google.devtools.build.lib.remote.util.DigestUtil; import com.google.devtools.build.lib.remote.util.TracingMetadataUtils; @@ -90,6 +94,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; import java.util.function.IntFunction; import javax.annotation.Nullable; import org.junit.Before; @@ -265,6 +270,13 @@ private HttpCacheClient createHttpBlobStore( AuthAndTLSOptions authAndTlsOptions) throws Exception { SocketAddress socketAddress = serverChannel.localAddress(); + ListeningScheduledExecutorService retryScheduler = + MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1)); + RemoteRetrier retrier = new RemoteRetrier( + () -> RemoteRetrier.RETRIES_DISABLED, + (e) -> false, + retryScheduler, + Retrier.ALLOW_ALL_CALLS); if (socketAddress instanceof DomainSocketAddress) { DomainSocketAddress domainSocketAddress = (DomainSocketAddress) socketAddress; URI uri = new URI("http://localhost"); @@ -276,6 +288,7 @@ private HttpCacheClient createHttpBlobStore( remoteVerifyDownloads, ImmutableList.of(), DIGEST_UTIL, + retrier, creds, authAndTlsOptions); } else if (socketAddress instanceof InetSocketAddress) { @@ -288,6 +301,7 @@ private HttpCacheClient createHttpBlobStore( remoteVerifyDownloads, ImmutableList.of(), DIGEST_UTIL, + retrier, creds, authAndTlsOptions); } else { From 0947c2f35e941b7cbfdd20aa57db5705df954307 Mon Sep 17 00:00:00 2001 From: Claudio Bley Date: Tue, 26 Jul 2022 11:24:35 +0200 Subject: [PATCH 03/16] Retry cache operation for `IOException` "Operation timed out" We have seen cache failures caused by: ``` 2022-06-13T19:02:13.0964410Z NOT RETRYING: java.io.IOException: Operation timed out ``` which later resulted in: ``` 2022-06-13T19:05:42.4633080Z WARNING: Reading from Remote Cache: 2022-06-13T19:05:42.4635340Z com.google.devtools.build.lib.remote.BulkTransferException: Operation timed out 2022-06-13T19:05:42.4636670Z at com.google.devtools.build.lib.remote.RemoteCache.waitForBulkTransfer(RemoteCache.java:291) 2022-06-13T19:05:42.4638600Z at com.google.devtools.build.lib.remote.RemoteCache.download(RemoteCache.java:466) 2022-06-13T19:05:42.4640330Z at com.google.devtools.build.lib.remote.RemoteExecutionService.downloadOutputs(RemoteExecutionService.java:383) 2022-06-13T19:05:42.4643000Z at com.google.devtools.build.lib.remote.RemoteSpawnCache.lookup(RemoteSpawnCache.java:120) 2022-06-13T19:05:42.4645110Z at com.google.devtools.build.lib.exec.AbstractSpawnStrategy.exec(AbstractSpawnStrategy.java:139) 2022-06-13T19:05:42.4647270Z at com.google.devtools.build.lib.exec.AbstractSpawnStrategy.exec(AbstractSpawnStrategy.java:106) 2022-06-13T19:05:42.4649070Z at com.google.devtools.build.lib.actions.SpawnStrategy.beginExecution(SpawnStrategy.java:47) 2022-06-13T19:05:42.4650750Z at com.google.devtools.build.lib.exec.SpawnStrategyResolver.beginExecution(SpawnStrategyResolver.java:65) 2022-06-13T19:05:42.4655610Z at com.google.devtools.build.lib.analysis.actions.SpawnAction.beginExecution(SpawnAction.java:331) 2022-06-13T19:05:42.4657770Z at com.google.devtools.build.lib.actions.Action.execute(Action.java:127) 2022-06-13T19:05:42.4667910Z at com.google.devtools.build.lib.skyframe.SkyframeActionExecutor$5.execute(SkyframeActionExecutor.java:855) 2022-06-13T19:05:42.4669320Z at com.google.devtools.build.lib.skyframe.SkyframeActionExecutor$ActionRunner.continueAction(SkyframeActionExecutor.java:1016) 2022-06-13T19:05:42.4670870Z at com.google.devtools.build.lib.skyframe.SkyframeActionExecutor$ActionRunner.run(SkyframeActionExecutor.java:975) 2022-06-13T19:05:42.4672520Z at com.google.devtools.build.lib.skyframe.ActionExecutionState.runStateMachine(ActionExecutionState.java:129) 2022-06-13T19:05:42.4673950Z at com.google.devtools.build.lib.skyframe.ActionExecutionState.getResultOrDependOnFuture(ActionExecutionState.java:81) 2022-06-13T19:05:42.4675420Z at com.google.devtools.build.lib.skyframe.SkyframeActionExecutor.executeAction(SkyframeActionExecutor.java:472) 2022-06-13T19:05:42.4677050Z at com.google.devtools.build.lib.skyframe.ActionExecutionFunction.checkCacheAndExecuteIfNeeded(ActionExecutionFunction.java:834) 2022-06-13T19:05:42.4678230Z at com.google.devtools.build.lib.skyframe.ActionExecutionFunction.compute(ActionExecutionFunction.java:307) 2022-06-13T19:05:42.4679530Z at com.google.devtools.build.skyframe.AbstractParallelEvaluator$Evaluate.run(AbstractParallelEvaluator.java:477) 2022-06-13T19:05:42.4680390Z at com.google.devtools.build.lib.concurrent.AbstractQueueVisitor$WrappedRunnable.run(AbstractQueueVisitor.java:398) 2022-06-13T19:05:42.4681260Z at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) 2022-06-13T19:05:42.4682100Z at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) 2022-06-13T19:05:42.4683170Z at java.base/java.lang.Thread.run(Thread.java:829) 2022-06-13T19:05:42.4683650Z Suppressed: java.io.IOException: Operation timed out 2022-06-13T19:05:42.4684210Z at java.base/sun.nio.ch.FileDispatcherImpl.read0(Native Method) 2022-06-13T19:05:42.4685100Z at java.base/sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) 2022-06-13T19:05:42.4685990Z at java.base/sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:276) 2022-06-13T19:05:42.4686710Z at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:233) 2022-06-13T19:05:42.4687740Z at java.base/sun.nio.ch.IOUtil.read(IOUtil.java:223) 2022-06-13T19:05:42.4688740Z at java.base/sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:356) 2022-06-13T19:05:42.4689620Z at io.netty.buffer.PooledByteBuf.setBytes(PooledByteBuf.java:253) 2022-06-13T19:05:42.4690290Z at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1133) 2022-06-13T19:05:42.4692060Z at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:350) 2022-06-13T19:05:42.4693020Z at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:148) 2022-06-13T19:05:42.4693700Z at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) 2022-06-13T19:05:42.4694350Z at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) 2022-06-13T19:05:42.4695020Z at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) 2022-06-13T19:05:42.4695740Z at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) 2022-06-13T19:05:42.4696750Z at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) 2022-06-13T19:05:42.4698000Z at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) 2022-06-13T19:05:42.4699170Z at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) 2022-06-13T19:05:42.4700320Z ... 1 more ``` This was caused by a time out to the cache server while downloading cached artifacts and should have been retried. --- .../java/com/google/devtools/build/lib/remote/RemoteModule.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java index 65a216a96ce91c..a6d481b9c81f37 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java @@ -246,6 +246,8 @@ private void initHttpAndDiskCache( String msg = e.getMessage().toLowerCase(); if (msg.contains("connection reset by peer")) { retry = true; + } else if (msg.contains("operation timed out")) { + retry = true; } } return retry; From 2615e7ce528d86a5cb1f2fb7e3f28911cf90993a Mon Sep 17 00:00:00 2001 From: Andreas Herrmann Date: Tue, 10 Jan 2023 14:04:47 +0100 Subject: [PATCH 04/16] Remove RANGE requests on retry Addresses review comment https://github.com/bazelbuild/bazel/pull/14258#discussion_r1049719234 . RANGE requests are an optimization in that they avoid resending previously sent data over the wire. However, they are not necessary to implement retries with the HTTP remote cache. The implementation so far was only correct for uncompressed streams, for compressed streams the offset has to be set in terms of encoded bytes instead of decoded bytes. --- .../lib/remote/http/HttpDownloadHandler.java | 48 ++++--------------- 1 file changed, 9 insertions(+), 39 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java b/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java index f38dad965fb6e4..35bea78d11ca37 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java +++ b/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java @@ -91,25 +91,15 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Ex if (contentLengthSet) { contentLength = HttpUtil.getContentLength(response); } - boolean full_content = response.status().equals(HttpResponseStatus.OK); - boolean partial_content = response.status().equals(HttpResponseStatus.PARTIAL_CONTENT); - if (full_content) { - if (offset != 0) { - // We requested a range but the server replied with a full response. - // We need to skip `offset` bytes of the response. - if (!skipBytes.isPresent()) { - // This is the first chunk, or the full response. - skipBytes = OptionalInt.of((int)offset); - } - } - } else if (partial_content) { - Optional error = validateContentRangeHeader(response.headers()); - if (error.isPresent()) { - failAndClose(error.get(), ctx); - return; + if (offset != 0) { + // We are in a retried download and received a full response. + // We need to skip `offset` bytes of the response to continue writing from the offset. + if (!skipBytes.isPresent()) { + // This is the first chunk, or the full response. + skipBytes = OptionalInt.of((int)offset); } } - downloadSucceeded = full_content || partial_content; + downloadSucceeded = response.status().equals(HttpResponseStatus.OK); if (!downloadSucceeded) { out = new ByteArrayOutputStream(); } @@ -164,7 +154,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) path = constructPath(cmd.uri(), cmd.digest().getHash(), cmd.casDownload()); offset = cmd.offset(); skipBytes = OptionalInt.empty(); - HttpRequest request = buildRequest(path, constructHost(cmd.uri()), cmd.offset()); + HttpRequest request = buildRequest(path, constructHost(cmd.uri())); addCredentialHeaders(request, cmd.uri()); addExtraRemoteHeaders(request); addUserAgentHeader(request); @@ -186,36 +176,16 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) { } } - private HttpRequest buildRequest(String path, String host, long offset) { + private HttpRequest buildRequest(String path, String host) { HttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, path); httpRequest.headers().set(HttpHeaderNames.HOST, host); httpRequest.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE); httpRequest.headers().set(HttpHeaderNames.ACCEPT, "*/*"); httpRequest.headers().set(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.GZIP); - if (offset != 0) { - httpRequest.headers().set(HttpHeaderNames.RANGE, String.format("%s=%d-", HttpHeaderValues.BYTES, offset)); - } return httpRequest; } - private Optional validateContentRangeHeader(HttpHeaders headers) { - if (!headers.contains(HttpHeaderNames.CONTENT_RANGE)) { - return Optional.of(new HttpException(response, "Missing 'Content-Range' header", null)); - } - Pattern pattern = Pattern.compile("bytes\\s+(?[0-9]+)-(?[0-9]+)/(?[0-9]*|\\*)"); - Matcher matcher = pattern.matcher(response.headers().get(HttpHeaderNames.CONTENT_RANGE)); - if (!matcher.matches()) { - return Optional.of(new HttpException(response, "Unexpected 'Content-Range' header", null)); - } - long start = Long.valueOf(matcher.group("start")); - if (start != offset) { - return Optional.of(new HttpException( - response, String.format("Unexpected 'Content-Range' start: Expected %d but got %d", offset, start), null)); - } - return Optional.empty(); - } - private void succeedAndReset(ChannelHandlerContext ctx) { // All resets must happen *before* completing the user promise. Otherwise there is a race // condition, where this handler can be reused even though it is closed. In addition, if reset From 6eaa09ee420a45b2a990c90f568f9f3f0c05229c Mon Sep 17 00:00:00 2001 From: Andreas Herrmann Date: Tue, 10 Jan 2023 14:21:28 +0100 Subject: [PATCH 05/16] Factor out RETRIABLE_HTTP_ERRORS Addresses review comment https://github.com/bazelbuild/bazel/pull/14258#discussion_r1048439302 --- .../build/lib/remote/RemoteModule.java | 36 ++++++++++--------- 1 file changed, 20 insertions(+), 16 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java index a6d481b9c81f37..8463f2e95fccc9 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java @@ -115,6 +115,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; +import java.util.function.Predicate; import java.util.regex.Pattern; import javax.annotation.Nullable; @@ -219,6 +220,24 @@ private static ServerCapabilities getAndVerifyServerCapabilities( return capabilities; } + public static final Predicate RETRIABLE_HTTP_ERRORS = + e -> { + boolean retry = false; + if (e instanceof ClosedChannelException) { + retry = true; + } else if (e instanceof HttpException) { + retry = true; + } else if (e instanceof IOException) { + String msg = e.getMessage().toLowerCase(); + if (msg.contains("connection reset by peer")) { + retry = true; + } else if (msg.contains("operation timed out")) { + retry = true; + } + } + return retry; + }; + private void initHttpAndDiskCache( CommandEnvironment env, Credentials credentials, @@ -236,22 +255,7 @@ private void initHttpAndDiskCache( digestUtil, new RemoteRetrier( remoteOptions, - (e) -> { - boolean retry = false; - if (e instanceof ClosedChannelException) { - retry = true; - } else if (e instanceof HttpException) { - retry = true; - } else if (e instanceof IOException) { - String msg = e.getMessage().toLowerCase(); - if (msg.contains("connection reset by peer")) { - retry = true; - } else if (msg.contains("operation timed out")) { - retry = true; - } - } - return retry; - }, + RETRIABLE_HTTP_ERRORS, retryScheduler, Retrier.ALLOW_ALL_CALLS) ); From 98b606ed456efc7984ddbba34c4ba03ff33161ec Mon Sep 17 00:00:00 2001 From: Andreas Herrmann Date: Wed, 11 Jan 2023 15:23:57 +0100 Subject: [PATCH 06/16] Start fresh ActionResult download on retry Addresses review comment https://github.com/bazelbuild/bazel/pull/14258#issuecomment-1352743891 Continuing to stream from the previously read bytes on retry is safe for CAS downloads because items stored in the CAS can't change (otherwise it would change their content address). However, the same is not true for action cache items. Therefore, we need to start a fresh download on retry for action cache items. To that end we need to lift the retry logic out of the `get` utility function that implements the details of the download, to the separate higher level blob and action result download functions. The counter of bytes downloaded also needs to be lifted and is turned into an optional parameter to double as the flag indicating whether the download is a CAS download or not. --- .../lib/remote/http/HttpCacheClient.java | 127 ++++++++++-------- 1 file changed, 69 insertions(+), 58 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java index 047739ddfe888d..056825b4d56e8c 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java @@ -82,6 +82,7 @@ import java.util.List; import java.util.Map.Entry; import java.util.NoSuchElementException; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -451,8 +452,11 @@ public ListenableFuture downloadBlob( RemoteActionExecutionContext context, Digest digest, OutputStream out) { final DigestOutputStream digestOut = verifyDownloads ? digestUtil.newDigestOutputStream(out) : null; + final AtomicLong casBytesDownloaded = new AtomicLong(); return Futures.transformAsync( - get(digest, digestOut != null ? digestOut : out, /* casDownload= */ true), + retrier.executeAsync(() -> + get(digest, digestOut != null ? digestOut : out, Optional.of(casBytesDownloaded)) + ), (v) -> { try { if (digestOut != null) { @@ -468,9 +472,8 @@ public ListenableFuture downloadBlob( } @SuppressWarnings("FutureReturnValueIgnored") - private ListenableFuture get(Digest digest, final OutputStream out, boolean casDownload) { + private ListenableFuture get(Digest digest, final OutputStream out, Optional casBytesDownloaded) { final AtomicBoolean dataWritten = new AtomicBoolean(); - AtomicLong bytesDownloaded = new AtomicLong(); OutputStream wrappedOut = new OutputStream() { // OutputStream.close() does nothing, which is what we want to ensure that the @@ -480,14 +483,18 @@ private ListenableFuture get(Digest digest, final OutputStream out, boolea @Override public void write(byte[] b, int offset, int length) throws IOException { dataWritten.set(true); - bytesDownloaded.addAndGet(length); + if (casBytesDownloaded.isPresent()) { + casBytesDownloaded.get().addAndGet(length); + } out.write(b, offset, length); } @Override public void write(int b) throws IOException { dataWritten.set(true); - bytesDownloaded.incrementAndGet(); + if (casBytesDownloaded.isPresent()) { + casBytesDownloaded.get().incrementAndGet(); + } out.write(b); } @@ -496,59 +503,61 @@ public void flush() throws IOException { out.flush(); } }; - return retrier.executeAsync(() -> { - DownloadCommand downloadCmd = new DownloadCommand(uri, casDownload, digest, wrappedOut, bytesDownloaded.get()); - SettableFuture outerF = SettableFuture.create(); - acquireDownloadChannel() - .addListener( - (Future channelPromise) -> { - if (!channelPromise.isSuccess()) { - outerF.setException(channelPromise.cause()); - return; - } + long offset = 0; + if (casBytesDownloaded.isPresent()) { + offset = casBytesDownloaded.get().get(); + } + DownloadCommand downloadCmd = new DownloadCommand(uri, casBytesDownloaded.isPresent(), digest, wrappedOut, offset); + SettableFuture outerF = SettableFuture.create(); + acquireDownloadChannel() + .addListener( + (Future channelPromise) -> { + if (!channelPromise.isSuccess()) { + outerF.setException(channelPromise.cause()); + return; + } - Channel ch = channelPromise.getNow(); - ch.writeAndFlush(downloadCmd) - .addListener( - (f) -> { - try { - if (f.isSuccess()) { - outerF.set(null); - } else { - Throwable cause = f.cause(); - // cause can be of type HttpException, because Netty uses - // Unsafe.throwException to - // re-throw a checked exception that hasn't been declared in the method - // signature. - if (cause instanceof HttpException) { - HttpResponse response = ((HttpException) cause).response(); - if (!dataWritten.get() && authTokenExpired(response)) { - // The error is due to an auth token having expired. Let's try - // again. - try { - refreshCredentials(); - getAfterCredentialRefresh(downloadCmd, outerF); - return; - } catch (IOException e) { - cause.addSuppressed(e); - } catch (RuntimeException e) { - logger.atWarning().withCause(e).log("Unexpected exception"); - cause.addSuppressed(e); - } - } else if (cacheMiss(response.status())) { - outerF.setException(new CacheNotFoundException(digest)); - return; - } - } - outerF.setException(cause); - } - } finally { - releaseDownloadChannel(ch); - } - }); + Channel ch = channelPromise.getNow(); + ch.writeAndFlush(downloadCmd) + .addListener( + (f) -> { + try { + if (f.isSuccess()) { + outerF.set(null); + } else { + Throwable cause = f.cause(); + // cause can be of type HttpException, because Netty uses + // Unsafe.throwException to + // re-throw a checked exception that hasn't been declared in the method + // signature. + if (cause instanceof HttpException) { + HttpResponse response = ((HttpException) cause).response(); + if (!dataWritten.get() && authTokenExpired(response)) { + // The error is due to an auth token having expired. Let's try + // again. + try { + refreshCredentials(); + getAfterCredentialRefresh(downloadCmd, outerF); + return; + } catch (IOException e) { + cause.addSuppressed(e); + } catch (RuntimeException e) { + logger.atWarning().withCause(e).log("Unexpected exception"); + cause.addSuppressed(e); + } + } else if (cacheMiss(response.status())) { + outerF.setException(new CacheNotFoundException(digest)); + return; + } + } + outerF.setException(cause); + } + } finally { + releaseDownloadChannel(ch); + } }); - return outerF; - }); + }); + return outerF; } @SuppressWarnings("FutureReturnValueIgnored") @@ -590,8 +599,10 @@ private void getAfterCredentialRefresh(DownloadCommand cmd, SettableFuture public ListenableFuture downloadActionResult( RemoteActionExecutionContext context, ActionKey actionKey, boolean inlineOutErr) { return Futures.transform( - Utils.downloadAsActionResult( - actionKey, (digest, out) -> get(digest, out, /* casDownload= */ false)), + retrier.executeAsync(() -> + Utils.downloadAsActionResult( + actionKey, (digest, out) -> get(digest, out, /* casBytesDownloaded= */ Optional.empty())) + ), CachedActionResult::remote, MoreExecutors.directExecutor()); } From e41c4cb4981a18882872651077086cbec82dead1 Mon Sep 17 00:00:00 2001 From: Andreas Herrmann Date: Wed, 11 Jan 2023 15:40:14 +0100 Subject: [PATCH 07/16] Test download at offset --- .../remote/http/HttpDownloadHandlerTest.java | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/src/test/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandlerTest.java b/src/test/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandlerTest.java index 35eecfbd90bcd6..0bd3b621da49df 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandlerTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandlerTest.java @@ -162,4 +162,29 @@ public void httpErrorsWithContentAreSupported() throws IOException { verify(out, never()).close(); assertThat(ch.isOpen()).isFalse(); } + + /** + * Test that the handler correctly supports downloads at an offset, e.g. on retry. + */ + @Test + public void downloadAtOffsetShouldWork() throws IOException { + EmbeddedChannel ch = new EmbeddedChannel(new HttpDownloadHandler(null, ImmutableList.of())); + ByteArrayOutputStream out = Mockito.spy(new ByteArrayOutputStream()); + DownloadCommand cmd = new DownloadCommand(CACHE_URI, true, DIGEST, out, 2); + ChannelPromise writePromise = ch.newPromise(); + ch.writeOneOutbound(cmd, writePromise); + + HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + response.headers().set(HttpHeaders.CONTENT_LENGTH, 5); + response.headers().set(HttpHeaders.CONNECTION, HttpHeaderValues.KEEP_ALIVE); + ch.writeInbound(response); + ByteBuf content = Unpooled.buffer(); + content.writeBytes(new byte[] {1, 2, 3, 4, 5}); + ch.writeInbound(new DefaultLastHttpContent(content)); + + assertThat(writePromise.isDone()).isTrue(); + assertThat(out.toByteArray()).isEqualTo(new byte[] {3, 4, 5}); + verify(out, never()).close(); + assertThat(ch.isActive()).isTrue(); + } } From 2f56d3585d7e13b3088e6e49f9a8cc6069ec3f1c Mon Sep 17 00:00:00 2001 From: Andreas Herrmann Date: Wed, 11 Jan 2023 15:44:27 +0100 Subject: [PATCH 08/16] Test chunked download at an offset --- .../remote/http/HttpDownloadHandlerTest.java | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/src/test/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandlerTest.java b/src/test/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandlerTest.java index 0bd3b621da49df..eef192e59075ae 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandlerTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandlerTest.java @@ -187,4 +187,35 @@ public void downloadAtOffsetShouldWork() throws IOException { verify(out, never()).close(); assertThat(ch.isActive()).isTrue(); } + + /** + * Test that the handler correctly supports chunked downloads at an offset, e.g. on retry. + */ + @Test + public void chunkedDownloadAtOffsetShouldWork() throws IOException { + EmbeddedChannel ch = new EmbeddedChannel(new HttpDownloadHandler(null, ImmutableList.of())); + ByteArrayOutputStream out = Mockito.spy(new ByteArrayOutputStream()); + DownloadCommand cmd = new DownloadCommand(CACHE_URI, true, DIGEST, out, 3); + ChannelPromise writePromise = ch.newPromise(); + ch.writeOneOutbound(cmd, writePromise); + + HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + response.headers().set(HttpHeaders.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED); + response.headers().set(HttpHeaders.CONNECTION, HttpHeaderValues.KEEP_ALIVE); + ch.writeInbound(response); + ByteBuf content1 = Unpooled.buffer(); + content1.writeBytes(new byte[] {1, 2}); + ch.writeInbound(new DefaultHttpContent(content1)); + ByteBuf content2 = Unpooled.buffer(); + content2.writeBytes(new byte[] {3, 4}); + ch.writeInbound(new DefaultHttpContent(content2)); + ByteBuf content3 = Unpooled.buffer(); + content3.writeBytes(new byte[] {5}); + ch.writeInbound(new DefaultLastHttpContent(content3)); + + assertThat(writePromise.isDone()).isTrue(); + assertThat(out.toByteArray()).isEqualTo(new byte[] {4, 5}); + verify(out, never()).close(); + assertThat(ch.isActive()).isTrue(); + } } From 800bd245d0a15436a28f3711c059ed1e5d69ed8e Mon Sep 17 00:00:00 2001 From: Andreas Herrmann Date: Wed, 11 Jan 2023 17:10:56 +0100 Subject: [PATCH 09/16] Parameterize test http client with retrier --- .../lib/remote/http/HttpCacheClientTest.java | 34 +++++++++---------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java b/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java index b92df44c7c8d1d..6152baf7f56630 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java @@ -87,12 +87,7 @@ import java.net.SocketAddress; import java.net.URI; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.function.IntFunction; @@ -267,16 +262,19 @@ private HttpCacheClient createHttpBlobStore( int timeoutSeconds, boolean remoteVerifyDownloads, @Nullable final Credentials creds, - AuthAndTLSOptions authAndTlsOptions) + AuthAndTLSOptions authAndTlsOptions, + Optional optRetrier) throws Exception { SocketAddress socketAddress = serverChannel.localAddress(); - ListeningScheduledExecutorService retryScheduler = - MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1)); - RemoteRetrier retrier = new RemoteRetrier( - () -> RemoteRetrier.RETRIES_DISABLED, - (e) -> false, - retryScheduler, - Retrier.ALLOW_ALL_CALLS); + RemoteRetrier retrier = optRetrier.orElseGet(() -> { + ListeningScheduledExecutorService retryScheduler = + MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1)); + return new RemoteRetrier( + () -> RemoteRetrier.RETRIES_DISABLED, + (e) -> false, + retryScheduler, + Retrier.ALLOW_ALL_CALLS); + }); if (socketAddress instanceof DomainSocketAddress) { DomainSocketAddress domainSocketAddress = (DomainSocketAddress) socketAddress; URI uri = new URI("http://localhost"); @@ -317,7 +315,7 @@ private HttpCacheClient createHttpBlobStore( AuthAndTLSOptions authAndTlsOptions) throws Exception { return createHttpBlobStore( - serverChannel, timeoutSeconds, /* remoteVerifyDownloads= */ true, creds, authAndTlsOptions); + serverChannel, timeoutSeconds, /* remoteVerifyDownloads= */ true, creds, authAndTlsOptions, Optional.empty()); } @Before @@ -503,7 +501,8 @@ protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) /* timeoutSeconds= */ 1, /* remoteVerifyDownloads= */ true, credentials, - authAndTlsOptions); + authAndTlsOptions, + Optional.empty()); Digest fooDigest = DIGEST_UTIL.compute("foo".getBytes(Charsets.UTF_8)); try (OutputStream out = new ByteArrayOutputStream()) { IOException e = @@ -550,7 +549,8 @@ protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) /* timeoutSeconds= */ 1, /* remoteVerifyDownloads= */ false, credentials, - authAndTlsOptions); + authAndTlsOptions, + Optional.empty()); Digest fooDigest = DIGEST_UTIL.compute("foo".getBytes(Charsets.UTF_8)); try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { getFromFuture(blobStore.downloadBlob(remoteActionExecutionContext, fooDigest, out)); From 92b868cc6961b6bf6efa7396e70198ede9929dfb Mon Sep 17 00:00:00 2001 From: Andreas Herrmann Date: Wed, 11 Jan 2023 16:25:09 +0100 Subject: [PATCH 10/16] Test partial download with and without retry --- .../lib/remote/http/HttpCacheClientTest.java | 115 ++++++++++++++---- 1 file changed, 94 insertions(+), 21 deletions(-) diff --git a/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java b/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java index 6152baf7f56630..55fdd51ca0c21e 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java @@ -16,8 +16,7 @@ import static com.google.common.truth.Truth.assertThat; import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture; import static java.util.Collections.singletonList; -import static org.junit.Assert.assertThrows; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -48,17 +47,9 @@ import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelHandler; +import io.netty.buffer.Unpooled; +import io.netty.channel.*; import io.netty.channel.ChannelHandler.Sharable; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandler; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.ServerChannel; -import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.epoll.Epoll; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.epoll.EpollServerDomainSocketChannel; @@ -69,15 +60,8 @@ import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.unix.DomainSocketAddress; import io.netty.handler.codec.TooLongFrameException; -import io.netty.handler.codec.http.DefaultFullHttpResponse; -import io.netty.handler.codec.http.FullHttpRequest; -import io.netty.handler.codec.http.FullHttpResponse; -import io.netty.handler.codec.http.HttpHeaderNames; -import io.netty.handler.codec.http.HttpObjectAggregator; -import io.netty.handler.codec.http.HttpResponseStatus; -import io.netty.handler.codec.http.HttpServerCodec; -import io.netty.handler.codec.http.HttpUtil; -import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http.*; + import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; @@ -86,6 +70,7 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.URI; +import java.nio.channels.ClosedChannelException; import java.nio.charset.StandardCharsets; import java.util.*; import java.util.concurrent.ConcurrentHashMap; @@ -561,6 +546,63 @@ protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) } } + @Test + public void partialDownloadFailsWithoutRetry() throws Exception { + ServerChannel server = null; + try { + server = testServer.start(new IntermittentFailureHandler()); + Credentials credentials = newCredentials(); + AuthAndTLSOptions authAndTlsOptions = Options.getDefaults(AuthAndTLSOptions.class); + + HttpCacheClient blobStore = + createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials, authAndTlsOptions); + assertThrows( + ClosedChannelException.class, + () -> + getFromFuture( + blobStore.downloadBlob( + remoteActionExecutionContext, DIGEST, new ByteArrayOutputStream()))); + } finally { + testServer.stop(server); + } + } + + @Test + public void partialDownloadSucceedsWithRetry() throws Exception { + ServerChannel server = null; + try { + server = testServer.start(new IntermittentFailureHandler()); + Credentials credentials = newCredentials(); + AuthAndTLSOptions authAndTlsOptions = Options.getDefaults(AuthAndTLSOptions.class); + + ListeningScheduledExecutorService retryScheduler = + MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1)); + RemoteRetrier retrier = new RemoteRetrier( + () -> new Retrier.ZeroBackoff(1), + (e) -> { + return e instanceof ClosedChannelException; + }, + retryScheduler, + Retrier.ALLOW_ALL_CALLS); + HttpCacheClient blobStore = + createHttpBlobStore( + server, + /* timeoutSeconds= */ 1, + /* remoteVerifyDownloads= */ false, + credentials, + authAndTlsOptions, + Optional.of(retrier)); + + ByteArrayOutputStream download = new ByteArrayOutputStream(); + getFromFuture( + blobStore.downloadBlob( + remoteActionExecutionContext, DIGEST, download)); + assertArrayEquals("File Contents".getBytes(Charsets.US_ASCII), download.toByteArray()); + } finally { + testServer.stop(server); + } + } + @Test public void expiredAuthTokensShouldBeRetried_get() throws Exception { expiredAuthTokensShouldBeRetried_get( @@ -783,4 +825,35 @@ protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) } } } + + /** + * {@link ChannelHandler} that on the first request returns a partial response and then closes the stream, + * and on any further requests returns a full response. + */ + @Sharable + static class IntermittentFailureHandler extends SimpleChannelInboundHandler { + private int messageCount; + + @Override + protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) { + DefaultHttpResponse response = new DefaultHttpResponse( + HttpVersion.HTTP_1_1, + HttpResponseStatus.OK); + response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED); + ctx.write(response); + ByteBuf content1 = Unpooled.buffer(); + content1.writeBytes("File ".getBytes(Charsets.US_ASCII)); + ChannelFuture future = ctx.writeAndFlush(new DefaultHttpContent(content1)); + if (messageCount == 0) { + future.addListener(ChannelFutureListener.CLOSE); + } else { + ByteBuf content2 = Unpooled.buffer(); + content2.writeBytes("Contents".getBytes(Charsets.US_ASCII)); + ctx + .writeAndFlush(new DefaultLastHttpContent(content2)) + .addListener(ChannelFutureListener.CLOSE); + } + ++messageCount; + } + } } From c2eec3f7c46c47f7389915b907731eb5b5f68189 Mon Sep 17 00:00:00 2001 From: Andreas Herrmann Date: Thu, 12 Jan 2023 16:46:30 +0100 Subject: [PATCH 11/16] Only retry on certain HTTP error codes --- .../com/google/devtools/build/lib/remote/RemoteModule.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java index 8463f2e95fccc9..823dd1beaf87ce 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java @@ -103,6 +103,7 @@ import io.grpc.Channel; import io.grpc.ClientInterceptor; import io.grpc.ManagedChannel; +import io.netty.handler.codec.http.HttpResponseStatus; import io.reactivex.rxjava3.plugins.RxJavaPlugins; import java.io.IOException; import java.net.URI; @@ -226,7 +227,11 @@ private static ServerCapabilities getAndVerifyServerCapabilities( if (e instanceof ClosedChannelException) { retry = true; } else if (e instanceof HttpException) { - retry = true; + HttpResponseStatus status = ((HttpException) e).response().status(); + retry = status == HttpResponseStatus.INTERNAL_SERVER_ERROR + || status == HttpResponseStatus.BAD_GATEWAY + || status == HttpResponseStatus.SERVICE_UNAVAILABLE + || status == HttpResponseStatus.GATEWAY_TIMEOUT; } else if (e instanceof IOException) { String msg = e.getMessage().toLowerCase(); if (msg.contains("connection reset by peer")) { From c4228bca00f53a8a99066c4efb2ad08934f2612c Mon Sep 17 00:00:00 2001 From: Andreas Herrmann Date: Fri, 13 Jan 2023 11:37:25 +0100 Subject: [PATCH 12/16] Make chunks configurable --- .../lib/remote/http/HttpCacheClientTest.java | 32 +++++++++++++------ 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java b/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java index 55fdd51ca0c21e..e82f325f1cd595 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java @@ -550,7 +550,9 @@ protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) public void partialDownloadFailsWithoutRetry() throws Exception { ServerChannel server = null; try { - server = testServer.start(new IntermittentFailureHandler()); + ByteBuf chunk1 = Unpooled.wrappedBuffer("File ".getBytes(Charsets.US_ASCII)); + ByteBuf chunk2 = Unpooled.wrappedBuffer("Contents".getBytes(Charsets.US_ASCII)); + server = testServer.start(new IntermittentFailureHandler(chunk1, chunk2)); Credentials credentials = newCredentials(); AuthAndTLSOptions authAndTlsOptions = Options.getDefaults(AuthAndTLSOptions.class); @@ -571,7 +573,9 @@ public void partialDownloadFailsWithoutRetry() throws Exception { public void partialDownloadSucceedsWithRetry() throws Exception { ServerChannel server = null; try { - server = testServer.start(new IntermittentFailureHandler()); + ByteBuf chunk1 = Unpooled.wrappedBuffer("File ".getBytes(Charsets.US_ASCII)); + ByteBuf chunk2 = Unpooled.wrappedBuffer("Contents".getBytes(Charsets.US_ASCII)); + server = testServer.start(new IntermittentFailureHandler(chunk1, chunk2)); Credentials credentials = newCredentials(); AuthAndTLSOptions authAndTlsOptions = Options.getDefaults(AuthAndTLSOptions.class); @@ -832,8 +836,19 @@ protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) */ @Sharable static class IntermittentFailureHandler extends SimpleChannelInboundHandler { + private final ByteBuf attempt1_chunk1, attempt2_chunk1, attempt2_chunk2; private int messageCount; + public IntermittentFailureHandler(ByteBuf attempt1_chunk1, ByteBuf attempt2_chunk1, ByteBuf attempt2_chunk2) { + this.attempt1_chunk1 = attempt1_chunk1; + this.attempt2_chunk1 = attempt2_chunk1; + this.attempt2_chunk2 = attempt2_chunk2; + } + + public IntermittentFailureHandler(ByteBuf chunk1, ByteBuf chunk2) { + this(chunk1.copy(), chunk1, chunk2); + } + @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) { DefaultHttpResponse response = new DefaultHttpResponse( @@ -841,16 +856,15 @@ protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) HttpResponseStatus.OK); response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED); ctx.write(response); - ByteBuf content1 = Unpooled.buffer(); - content1.writeBytes("File ".getBytes(Charsets.US_ASCII)); - ChannelFuture future = ctx.writeAndFlush(new DefaultHttpContent(content1)); if (messageCount == 0) { - future.addListener(ChannelFutureListener.CLOSE); + ctx + .writeAndFlush(new DefaultHttpContent(attempt1_chunk1)) + .addListener(ChannelFutureListener.CLOSE); } else { - ByteBuf content2 = Unpooled.buffer(); - content2.writeBytes("Contents".getBytes(Charsets.US_ASCII)); ctx - .writeAndFlush(new DefaultLastHttpContent(content2)) + .writeAndFlush(new DefaultHttpContent(attempt2_chunk1)); + ctx + .writeAndFlush(new DefaultLastHttpContent(attempt2_chunk2)) .addListener(ChannelFutureListener.CLOSE); } ++messageCount; From 58402d6a4fe720496c7da9e2f34dc87a9e7e861a Mon Sep 17 00:00:00 2001 From: Andreas Herrmann Date: Fri, 13 Jan 2023 11:39:53 +0100 Subject: [PATCH 13/16] Test that CAS download retry skips redundant offset --- .../devtools/build/lib/remote/http/HttpCacheClientTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java b/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java index e82f325f1cd595..af5c8253841af8 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java @@ -574,8 +574,10 @@ public void partialDownloadSucceedsWithRetry() throws Exception { ServerChannel server = null; try { ByteBuf chunk1 = Unpooled.wrappedBuffer("File ".getBytes(Charsets.US_ASCII)); + // Replace first chunk to test that the client skips the redundant prefix on retry. + ByteBuf chunk1_attempt2 = Unpooled.wrappedBuffer("abcde".getBytes(Charsets.US_ASCII)); ByteBuf chunk2 = Unpooled.wrappedBuffer("Contents".getBytes(Charsets.US_ASCII)); - server = testServer.start(new IntermittentFailureHandler(chunk1, chunk2)); + server = testServer.start(new IntermittentFailureHandler(chunk1, chunk1_attempt2, chunk2)); Credentials credentials = newCredentials(); AuthAndTLSOptions authAndTlsOptions = Options.getDefaults(AuthAndTLSOptions.class); From e33f62b8e956ab5dc655e71254acc7b6bcba28ff Mon Sep 17 00:00:00 2001 From: Andreas Herrmann Date: Fri, 13 Jan 2023 12:04:49 +0100 Subject: [PATCH 14/16] Test ActionResult download retry --- .../lib/remote/http/HttpCacheClientTest.java | 64 +++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java b/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java index af5c8253841af8..f7e82751df3d59 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java @@ -26,6 +26,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import build.bazel.remote.execution.v2.ActionResult; import build.bazel.remote.execution.v2.Digest; import com.google.auth.Credentials; import com.google.common.base.Charsets; @@ -37,6 +38,7 @@ import com.google.devtools.build.lib.remote.RemoteRetrier; import com.google.devtools.build.lib.remote.Retrier; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; +import com.google.devtools.build.lib.remote.common.RemoteCacheClient; import com.google.devtools.build.lib.remote.util.DigestUtil; import com.google.devtools.build.lib.remote.util.TracingMetadataUtils; import com.google.devtools.build.lib.vfs.DigestHashFunction; @@ -77,6 +79,8 @@ import java.util.concurrent.Executors; import java.util.function.IntFunction; import javax.annotation.Nullable; + +import io.netty.handler.codec.protobuf.ProtobufEncoder; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -609,6 +613,66 @@ public void partialDownloadSucceedsWithRetry() throws Exception { } } + @Test + public void actionResultRetryReadsFromStart() throws Exception { + ServerChannel server = null; + try { + ActionResult.Builder builder1 = ActionResult.newBuilder(); + builder1 + .addOutputFilesBuilder() + .setPath("attempt1/filename") + .setDigest(DIGEST_UTIL.computeAsUtf8("digest1")) + .setIsExecutable(true); + ActionResult action1 = builder1.build(); + ByteArrayOutputStream buffer1 = new ByteArrayOutputStream(); + action1.writeTo(buffer1); + int splitAt = buffer1.size() / 2; + ByteBuf chunk1 = Unpooled.copiedBuffer(buffer1.toByteArray(), 0, splitAt); + + // Replace first chunk to test that the client starts a fresh ActionResult download on retry. + ActionResult.Builder builder2 = ActionResult.newBuilder(); + builder2 + .addOutputFilesBuilder() + .setPath("attempt2/filename") + .setDigest(DIGEST_UTIL.computeAsUtf8("digest2")) + .setIsExecutable(false); + ActionResult action2 = builder2.build(); + ByteArrayOutputStream buffer2 = new ByteArrayOutputStream(); + action2.writeTo(buffer2); + ByteBuf chunk1_attempt2 = Unpooled.copiedBuffer(buffer2.toByteArray(), 0, splitAt); + ByteBuf chunk2 = Unpooled.copiedBuffer(buffer2.toByteArray(), splitAt, buffer2.size() - splitAt); + + server = testServer.start(new IntermittentFailureHandler(chunk1, chunk1_attempt2, chunk2)); + Credentials credentials = newCredentials(); + AuthAndTLSOptions authAndTlsOptions = Options.getDefaults(AuthAndTLSOptions.class); + + ListeningScheduledExecutorService retryScheduler = + MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1)); + RemoteRetrier retrier = new RemoteRetrier( + () -> new Retrier.ZeroBackoff(1), + (e) -> { + return e instanceof ClosedChannelException; + }, + retryScheduler, + Retrier.ALLOW_ALL_CALLS); + HttpCacheClient blobStore = + createHttpBlobStore( + server, + /* timeoutSeconds= */ 1, + /* remoteVerifyDownloads= */ false, + credentials, + authAndTlsOptions, + Optional.of(retrier)); + + RemoteCacheClient.CachedActionResult download = getFromFuture( + blobStore.downloadActionResult( + remoteActionExecutionContext, new RemoteCacheClient.ActionKey(DIGEST), false)); + assertThat(download.actionResult()).isEqualTo(action2); + } finally { + testServer.stop(server); + } + } + @Test public void expiredAuthTokensShouldBeRetried_get() throws Exception { expiredAuthTokensShouldBeRetried_get( From 2690f23cad0624f85fde54786197e2c8a78ebd51 Mon Sep 17 00:00:00 2001 From: Andreas Herrmann Date: Tue, 24 Jan 2023 15:32:40 +0100 Subject: [PATCH 15/16] Remove offset in favor of skipBytes Addressing review comment https://github.com/bazelbuild/bazel/pull/14258#discussion_r1071370986 --- .../lib/remote/http/HttpDownloadHandler.java | 21 +++++-------------- 1 file changed, 5 insertions(+), 16 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java b/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java index 35bea78d11ca37..027e63d05c0f4b 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java +++ b/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java @@ -45,10 +45,8 @@ final class HttpDownloadHandler extends AbstractHttpHandler { private long contentLength = -1; /** the path header in the http request */ private String path; - /** the offset at which to download */ - private long offset; /** the bytes to skip in a full or chunked response */ - private OptionalInt skipBytes; + private int skipBytes; public HttpDownloadHandler( Credentials credentials, ImmutableList> extraHttpHeaders) { @@ -91,14 +89,6 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Ex if (contentLengthSet) { contentLength = HttpUtil.getContentLength(response); } - if (offset != 0) { - // We are in a retried download and received a full response. - // We need to skip `offset` bytes of the response to continue writing from the offset. - if (!skipBytes.isPresent()) { - // This is the first chunk, or the full response. - skipBytes = OptionalInt.of((int)offset); - } - } downloadSucceeded = response.status().equals(HttpResponseStatus.OK); if (!downloadSucceeded) { out = new ByteArrayOutputStream(); @@ -111,13 +101,13 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Ex ByteBuf content = ((HttpContent) msg).content(); int readableBytes = content.readableBytes(); - if (skipBytes.isPresent() && skipBytes.getAsInt() > 0) { - int skipNow = skipBytes.getAsInt(); + if (skipBytes > 0) { + int skipNow = skipBytes; if (skipNow >= readableBytes) { skipNow = readableBytes; } content.readerIndex(content.readerIndex() + skipNow); - skipBytes = OptionalInt.of(skipBytes.getAsInt() - skipNow); + skipBytes -= skipNow; readableBytes = readableBytes - skipNow; } content.readBytes(out, readableBytes); @@ -152,8 +142,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) DownloadCommand cmd = (DownloadCommand) msg; out = cmd.out(); path = constructPath(cmd.uri(), cmd.digest().getHash(), cmd.casDownload()); - offset = cmd.offset(); - skipBytes = OptionalInt.empty(); + skipBytes = (int)cmd.offset(); HttpRequest request = buildRequest(path, constructHost(cmd.uri())); addCredentialHeaders(request, cmd.uri()); addExtraRemoteHeaders(request); From 93315bd72c585494d8b13859cdc56cd15818fed4 Mon Sep 17 00:00:00 2001 From: Andreas Herrmann Date: Wed, 25 Jan 2023 13:54:33 +0100 Subject: [PATCH 16/16] Fix long to int conversion --- .../build/lib/remote/http/HttpDownloadHandler.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java b/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java index 027e63d05c0f4b..e581fd50f4225f 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java +++ b/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java @@ -46,7 +46,7 @@ final class HttpDownloadHandler extends AbstractHttpHandler { /** the path header in the http request */ private String path; /** the bytes to skip in a full or chunked response */ - private int skipBytes; + private long skipBytes; public HttpDownloadHandler( Credentials credentials, ImmutableList> extraHttpHeaders) { @@ -102,8 +102,12 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Ex ByteBuf content = ((HttpContent) msg).content(); int readableBytes = content.readableBytes(); if (skipBytes > 0) { - int skipNow = skipBytes; - if (skipNow >= readableBytes) { + int skipNow; + if (skipBytes < readableBytes) { + // readableBytes is an int, meaning skipBytes < readableBytes <= INT_MAX. + // So, this conversion is safe. + skipNow = (int)skipBytes; + } else { skipNow = readableBytes; } content.readerIndex(content.readerIndex() + skipNow); @@ -142,7 +146,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) DownloadCommand cmd = (DownloadCommand) msg; out = cmd.out(); path = constructPath(cmd.uri(), cmd.digest().getHash(), cmd.casDownload()); - skipBytes = (int)cmd.offset(); + skipBytes = cmd.offset(); HttpRequest request = buildRequest(path, constructHost(cmd.uri())); addCredentialHeaders(request, cmd.uri()); addExtraRemoteHeaders(request);