diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java index 8f68923f9fffcd..2fdc420841b39d 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactory.java @@ -59,15 +59,16 @@ public static RemoteCacheClient create( @Nullable Credentials creds, AuthAndTLSOptions authAndTlsOptions, Path workingDirectory, - DigestUtil digestUtil) + DigestUtil digestUtil, + RemoteRetrier retrier) throws IOException { Preconditions.checkNotNull(workingDirectory, "workingDirectory"); if (isHttpCache(options) && isDiskCache(options)) { return createDiskAndHttpCache( - workingDirectory, options.diskCache, options, creds, authAndTlsOptions, digestUtil); + workingDirectory, options.diskCache, options, creds, authAndTlsOptions, digestUtil, retrier); } if (isHttpCache(options)) { - return createHttp(options, creds, authAndTlsOptions, digestUtil); + return createHttp(options, creds, authAndTlsOptions, digestUtil, retrier); } if (isDiskCache(options)) { return createDiskCache( @@ -90,7 +91,8 @@ private static RemoteCacheClient createHttp( RemoteOptions options, Credentials creds, AuthAndTLSOptions authAndTlsOptions, - DigestUtil digestUtil) { + DigestUtil digestUtil, + RemoteRetrier retrier) { Preconditions.checkNotNull(options.remoteCache, "remoteCache"); try { @@ -109,6 +111,7 @@ private static RemoteCacheClient createHttp( options.remoteVerifyDownloads, ImmutableList.copyOf(options.remoteHeaders), digestUtil, + retrier, creds, authAndTlsOptions); } else { @@ -122,6 +125,7 @@ private static RemoteCacheClient createHttp( options.remoteVerifyDownloads, ImmutableList.copyOf(options.remoteHeaders), digestUtil, + retrier, creds, authAndTlsOptions); } @@ -151,7 +155,8 @@ private static RemoteCacheClient createDiskAndHttpCache( RemoteOptions options, Credentials cred, AuthAndTLSOptions authAndTlsOptions, - DigestUtil digestUtil) + DigestUtil digestUtil, + RemoteRetrier retrier) throws IOException { Path cacheDir = workingDirectory.getRelative(Preconditions.checkNotNull(diskCachePath, "diskCachePath")); @@ -159,7 +164,7 @@ private static RemoteCacheClient createDiskAndHttpCache( cacheDir.createDirectoryAndParents(); } - RemoteCacheClient httpCache = createHttp(options, cred, authAndTlsOptions, digestUtil); + RemoteCacheClient httpCache = createHttp(options, cred, authAndTlsOptions, digestUtil, retrier); return createDiskAndRemoteClient( workingDirectory, diskCachePath, diff --git a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java index 8026972836899f..823dd1beaf87ce 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java +++ b/src/main/java/com/google/devtools/build/lib/remote/RemoteModule.java @@ -59,9 +59,11 @@ import com.google.devtools.build.lib.exec.ModuleActionContextRegistry; import com.google.devtools.build.lib.exec.SpawnStrategyRegistry; import com.google.devtools.build.lib.remote.RemoteServerCapabilities.ServerCapabilitiesRequirement; +import com.google.devtools.build.lib.remote.common.CacheNotFoundException; import com.google.devtools.build.lib.remote.common.RemoteCacheClient; import com.google.devtools.build.lib.remote.common.RemoteExecutionClient; import com.google.devtools.build.lib.remote.downloader.GrpcRemoteDownloader; +import com.google.devtools.build.lib.remote.http.HttpException; import com.google.devtools.build.lib.remote.logging.LoggingInterceptor; import com.google.devtools.build.lib.remote.options.RemoteBuildEventUploadMode; import com.google.devtools.build.lib.remote.options.RemoteOptions; @@ -101,10 +103,12 @@ import io.grpc.Channel; import io.grpc.ClientInterceptor; import io.grpc.ManagedChannel; +import io.netty.handler.codec.http.HttpResponseStatus; import io.reactivex.rxjava3.plugins.RxJavaPlugins; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.nio.channels.ClosedChannelException; import java.util.List; import java.util.Optional; import java.util.concurrent.ExecutorService; @@ -112,6 +116,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; +import java.util.function.Predicate; import java.util.regex.Pattern; import javax.annotation.Nullable; @@ -216,6 +221,28 @@ private static ServerCapabilities getAndVerifyServerCapabilities( return capabilities; } + public static final Predicate RETRIABLE_HTTP_ERRORS = + e -> { + boolean retry = false; + if (e instanceof ClosedChannelException) { + retry = true; + } else if (e instanceof HttpException) { + HttpResponseStatus status = ((HttpException) e).response().status(); + retry = status == HttpResponseStatus.INTERNAL_SERVER_ERROR + || status == HttpResponseStatus.BAD_GATEWAY + || status == HttpResponseStatus.SERVICE_UNAVAILABLE + || status == HttpResponseStatus.GATEWAY_TIMEOUT; + } else if (e instanceof IOException) { + String msg = e.getMessage().toLowerCase(); + if (msg.contains("connection reset by peer")) { + retry = true; + } else if (msg.contains("operation timed out")) { + retry = true; + } + } + return retry; + }; + private void initHttpAndDiskCache( CommandEnvironment env, Credentials credentials, @@ -230,7 +257,13 @@ private void initHttpAndDiskCache( credentials, authAndTlsOptions, Preconditions.checkNotNull(env.getWorkingDirectory(), "workingDirectory"), - digestUtil); + digestUtil, + new RemoteRetrier( + remoteOptions, + RETRIABLE_HTTP_ERRORS, + retryScheduler, + Retrier.ALLOW_ALL_CALLS) + ); } catch (IOException e) { handleInitFailure(env, e, Code.CACHE_INIT_FAILURE); return; diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/BUILD b/src/main/java/com/google/devtools/build/lib/remote/http/BUILD index 67e7bd0a4689f4..05eed8d8fbeb93 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/http/BUILD +++ b/src/main/java/com/google/devtools/build/lib/remote/http/BUILD @@ -21,6 +21,7 @@ java_library( "//src/main/java/com/google/devtools/build/lib/analysis:blaze_version_info", "//src/main/java/com/google/devtools/build/lib/authandtls", "//src/main/java/com/google/devtools/build/lib/remote/common", + "//src/main/java/com/google/devtools/build/lib/remote:Retrier", "//src/main/java/com/google/devtools/build/lib/remote/util", "//src/main/java/com/google/devtools/build/lib/vfs", "//third_party:auth", diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/DownloadCommand.java b/src/main/java/com/google/devtools/build/lib/remote/http/DownloadCommand.java index a2e4abf9d83eb1..93843a91dc7966 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/http/DownloadCommand.java +++ b/src/main/java/com/google/devtools/build/lib/remote/http/DownloadCommand.java @@ -25,12 +25,18 @@ final class DownloadCommand { private final boolean casDownload; private final Digest digest; private final OutputStream out; + private final long offset; - DownloadCommand(URI uri, boolean casDownload, Digest digest, OutputStream out) { + DownloadCommand(URI uri, boolean casDownload, Digest digest, OutputStream out, long offset) { this.uri = Preconditions.checkNotNull(uri); this.casDownload = casDownload; this.digest = Preconditions.checkNotNull(digest); this.out = Preconditions.checkNotNull(out); + this.offset = offset; + } + + DownloadCommand(URI uri, boolean casDownload, Digest digest, OutputStream out) { + this(uri, casDownload, digest, out, 0); } public URI uri() { @@ -48,4 +54,6 @@ public Digest digest() { public OutputStream out() { return out; } + + public long offset() { return offset; } } diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java index 6c6d4cc6f897fc..056825b4d56e8c 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java +++ b/src/main/java/com/google/devtools/build/lib/remote/http/HttpCacheClient.java @@ -25,6 +25,7 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions; +import com.google.devtools.build.lib.remote.RemoteRetrier; import com.google.devtools.build.lib.remote.common.CacheNotFoundException; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; import com.google.devtools.build.lib.remote.common.RemoteCacheClient; @@ -81,8 +82,10 @@ import java.util.List; import java.util.Map.Entry; import java.util.NoSuchElementException; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.regex.Pattern; import javax.annotation.Nullable; @@ -129,6 +132,7 @@ public final class HttpCacheClient implements RemoteCacheClient { private final boolean useTls; private final boolean verifyDownloads; private final DigestUtil digestUtil; + private final RemoteRetrier retrier; private final Object closeLock = new Object(); @@ -150,6 +154,7 @@ public static HttpCacheClient create( boolean verifyDownloads, ImmutableList> extraHttpHeaders, DigestUtil digestUtil, + RemoteRetrier retrier, @Nullable final Credentials creds, AuthAndTLSOptions authAndTlsOptions) throws Exception { @@ -162,6 +167,7 @@ public static HttpCacheClient create( verifyDownloads, extraHttpHeaders, digestUtil, + retrier, creds, authAndTlsOptions, null); @@ -175,6 +181,7 @@ public static HttpCacheClient create( boolean verifyDownloads, ImmutableList> extraHttpHeaders, DigestUtil digestUtil, + RemoteRetrier retrier, @Nullable final Credentials creds, AuthAndTLSOptions authAndTlsOptions) throws Exception { @@ -189,6 +196,7 @@ public static HttpCacheClient create( verifyDownloads, extraHttpHeaders, digestUtil, + retrier, creds, authAndTlsOptions, domainSocketAddress); @@ -202,6 +210,7 @@ public static HttpCacheClient create( verifyDownloads, extraHttpHeaders, digestUtil, + retrier, creds, authAndTlsOptions, domainSocketAddress); @@ -219,6 +228,7 @@ private HttpCacheClient( boolean verifyDownloads, ImmutableList> extraHttpHeaders, DigestUtil digestUtil, + RemoteRetrier retrier, @Nullable final Credentials creds, AuthAndTLSOptions authAndTlsOptions, @Nullable SocketAddress socketAddress) @@ -284,6 +294,7 @@ public void channelCreated(Channel ch) { this.extraHttpHeaders = extraHttpHeaders; this.verifyDownloads = verifyDownloads; this.digestUtil = digestUtil; + this.retrier = retrier; } @SuppressWarnings("FutureReturnValueIgnored") @@ -441,8 +452,11 @@ public ListenableFuture downloadBlob( RemoteActionExecutionContext context, Digest digest, OutputStream out) { final DigestOutputStream digestOut = verifyDownloads ? digestUtil.newDigestOutputStream(out) : null; + final AtomicLong casBytesDownloaded = new AtomicLong(); return Futures.transformAsync( - get(digest, digestOut != null ? digestOut : out, /* casDownload= */ true), + retrier.executeAsync(() -> + get(digest, digestOut != null ? digestOut : out, Optional.of(casBytesDownloaded)) + ), (v) -> { try { if (digestOut != null) { @@ -458,7 +472,7 @@ public ListenableFuture downloadBlob( } @SuppressWarnings("FutureReturnValueIgnored") - private ListenableFuture get(Digest digest, final OutputStream out, boolean casDownload) { + private ListenableFuture get(Digest digest, final OutputStream out, Optional casBytesDownloaded) { final AtomicBoolean dataWritten = new AtomicBoolean(); OutputStream wrappedOut = new OutputStream() { @@ -469,12 +483,18 @@ private ListenableFuture get(Digest digest, final OutputStream out, boolea @Override public void write(byte[] b, int offset, int length) throws IOException { dataWritten.set(true); + if (casBytesDownloaded.isPresent()) { + casBytesDownloaded.get().addAndGet(length); + } out.write(b, offset, length); } @Override public void write(int b) throws IOException { dataWritten.set(true); + if (casBytesDownloaded.isPresent()) { + casBytesDownloaded.get().incrementAndGet(); + } out.write(b); } @@ -483,7 +503,11 @@ public void flush() throws IOException { out.flush(); } }; - DownloadCommand downloadCmd = new DownloadCommand(uri, casDownload, digest, wrappedOut); + long offset = 0; + if (casBytesDownloaded.isPresent()) { + offset = casBytesDownloaded.get().get(); + } + DownloadCommand downloadCmd = new DownloadCommand(uri, casBytesDownloaded.isPresent(), digest, wrappedOut, offset); SettableFuture outerF = SettableFuture.create(); acquireDownloadChannel() .addListener( @@ -575,8 +599,10 @@ private void getAfterCredentialRefresh(DownloadCommand cmd, SettableFuture public ListenableFuture downloadActionResult( RemoteActionExecutionContext context, ActionKey actionKey, boolean inlineOutErr) { return Futures.transform( - Utils.downloadAsActionResult( - actionKey, (digest, out) -> get(digest, out, /* casDownload= */ false)), + retrier.executeAsync(() -> + Utils.downloadAsActionResult( + actionKey, (digest, out) -> get(digest, out, /* casBytesDownloaded= */ Optional.empty())) + ), CachedActionResult::remote, MoreExecutors.directExecutor()); } @@ -670,20 +696,21 @@ private void uploadAfterCredentialRefresh(UploadCommand upload, SettableFuture uploadFile( RemoteActionExecutionContext context, Digest digest, Path file) { - try { - return uploadAsync( - digest.getHash(), digest.getSizeBytes(), file.getInputStream(), /* casUpload= */ true); - } catch (IOException e) { - // Can be thrown from file.getInputStream. - return Futures.immediateFailedFuture(e); - } + return retrier.executeAsync(() -> { + try { + return uploadAsync(digest.getHash(), digest.getSizeBytes(), file.getInputStream(), /* casUpload= */ true); + } catch (IOException e) { + // Can be thrown from file.getInputStream. + return Futures.immediateFailedFuture(e); + } + }); } @Override public ListenableFuture uploadBlob( RemoteActionExecutionContext context, Digest digest, ByteString data) { - return uploadAsync( - digest.getHash(), digest.getSizeBytes(), data.newInput(), /* casUpload= */ true); + return retrier.executeAsync(() -> uploadAsync( + digest.getHash(), digest.getSizeBytes(), data.newInput(), /* casUpload= */ true)); } @Override diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java b/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java index 50d83d138a1d66..e581fd50f4225f 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java +++ b/src/main/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandler.java @@ -20,24 +20,18 @@ import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; -import io.netty.handler.codec.http.DefaultFullHttpRequest; -import io.netty.handler.codec.http.HttpContent; -import io.netty.handler.codec.http.HttpHeaderNames; -import io.netty.handler.codec.http.HttpHeaderValues; -import io.netty.handler.codec.http.HttpMethod; -import io.netty.handler.codec.http.HttpObject; -import io.netty.handler.codec.http.HttpRequest; -import io.netty.handler.codec.http.HttpResponse; -import io.netty.handler.codec.http.HttpResponseStatus; -import io.netty.handler.codec.http.HttpUtil; -import io.netty.handler.codec.http.HttpVersion; -import io.netty.handler.codec.http.LastHttpContent; +import io.netty.handler.codec.http.*; import io.netty.handler.timeout.ReadTimeoutException; import io.netty.util.internal.StringUtil; + import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; import java.util.Map.Entry; +import java.util.Optional; +import java.util.OptionalInt; +import java.util.regex.Matcher; +import java.util.regex.Pattern; /** ChannelHandler for downloads. */ final class HttpDownloadHandler extends AbstractHttpHandler { @@ -51,6 +45,8 @@ final class HttpDownloadHandler extends AbstractHttpHandler { private long contentLength = -1; /** the path header in the http request */ private String path; + /** the bytes to skip in a full or chunked response */ + private long skipBytes; public HttpDownloadHandler( Credentials credentials, ImmutableList> extraHttpHeaders) { @@ -105,6 +101,19 @@ protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Ex ByteBuf content = ((HttpContent) msg).content(); int readableBytes = content.readableBytes(); + if (skipBytes > 0) { + int skipNow; + if (skipBytes < readableBytes) { + // readableBytes is an int, meaning skipBytes < readableBytes <= INT_MAX. + // So, this conversion is safe. + skipNow = (int)skipBytes; + } else { + skipNow = readableBytes; + } + content.readerIndex(content.readerIndex() + skipNow); + skipBytes -= skipNow; + readableBytes = readableBytes - skipNow; + } content.readBytes(out, readableBytes); bytesReceived += readableBytes; if (msg instanceof LastHttpContent) { @@ -137,6 +146,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) DownloadCommand cmd = (DownloadCommand) msg; out = cmd.out(); path = constructPath(cmd.uri(), cmd.digest().getHash(), cmd.casDownload()); + skipBytes = cmd.offset(); HttpRequest request = buildRequest(path, constructHost(cmd.uri())); addCredentialHeaders(request, cmd.uri()); addExtraRemoteHeaders(request); diff --git a/src/main/java/com/google/devtools/build/lib/remote/http/HttpException.java b/src/main/java/com/google/devtools/build/lib/remote/http/HttpException.java index 89fde56046a8d6..6a2bfd5a50b246 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/http/HttpException.java +++ b/src/main/java/com/google/devtools/build/lib/remote/http/HttpException.java @@ -18,7 +18,7 @@ import java.io.IOException; /** An exception that propagates the http status. */ -final class HttpException extends IOException { +public final class HttpException extends IOException { private final HttpResponse response; HttpException(HttpResponse response, String message, Throwable cause) { diff --git a/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactoryTest.java b/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactoryTest.java index 4a44a5acc7e688..4f4927ca16c048 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactoryTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/RemoteCacheClientFactoryTest.java @@ -18,6 +18,8 @@ import static org.junit.Assert.assertThrows; import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions; +import com.google.common.util.concurrent.ListeningScheduledExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import com.google.devtools.build.lib.clock.JavaClock; import com.google.devtools.build.lib.remote.common.RemoteCacheClient; import com.google.devtools.build.lib.remote.disk.DiskAndRemoteCacheClient; @@ -32,6 +34,8 @@ import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem; import com.google.devtools.common.options.Options; import java.io.IOException; +import java.util.concurrent.Executors; + import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -47,6 +51,13 @@ public class RemoteCacheClientFactoryTest { private final AuthAndTLSOptions authAndTlsOptions = Options.getDefaults(AuthAndTLSOptions.class); private Path workingDirectory; private InMemoryFileSystem fs; + private ListeningScheduledExecutorService retryScheduler = + MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1)); + private RemoteRetrier retrier = new RemoteRetrier( + () -> RemoteRetrier.RETRIES_DISABLED, + (e) -> false, + retryScheduler, + Retrier.ALLOW_ALL_CALLS); @Before public final void setUp() { @@ -63,7 +74,7 @@ public void createCombinedCacheWithExistingWorkingDirectory() throws IOException RemoteCacheClient blobStore = RemoteCacheClientFactory.create( - remoteOptions, /* creds= */ null, authAndTlsOptions, workingDirectory, digestUtil); + remoteOptions, /* creds= */ null, authAndTlsOptions, workingDirectory, digestUtil, retrier); assertThat(blobStore).isInstanceOf(DiskAndRemoteCacheClient.class); } @@ -76,7 +87,7 @@ public void createCombinedCacheWithNotExistingWorkingDirectory() throws IOExcept RemoteCacheClient blobStore = RemoteCacheClientFactory.create( - remoteOptions, /* creds= */ null, authAndTlsOptions, workingDirectory, digestUtil); + remoteOptions, /* creds= */ null, authAndTlsOptions, workingDirectory, digestUtil, retrier); assertThat(blobStore).isInstanceOf(DiskAndRemoteCacheClient.class); assertThat(workingDirectory.exists()).isTrue(); @@ -96,7 +107,8 @@ public void createCombinedCacheWithMissingWorkingDirectoryShouldThrowException() /* creds= */ null, authAndTlsOptions, /* workingDirectory= */ null, - digestUtil)); + digestUtil, + retrier)); } @Test @@ -106,7 +118,7 @@ public void createHttpCacheWithProxy() throws IOException { RemoteCacheClient blobStore = RemoteCacheClientFactory.create( - remoteOptions, /* creds= */ null, authAndTlsOptions, workingDirectory, digestUtil); + remoteOptions, /* creds= */ null, authAndTlsOptions, workingDirectory, digestUtil, retrier); assertThat(blobStore).isInstanceOf(HttpCacheClient.class); } @@ -125,7 +137,8 @@ public void createHttpCacheFailsWithUnsupportedProxyProtocol() { /* creds= */ null, authAndTlsOptions, workingDirectory, - digestUtil))) + digestUtil, + retrier))) .hasMessageThat() .contains("Remote cache proxy unsupported: bad-proxy"); } @@ -136,7 +149,7 @@ public void createHttpCacheWithoutProxy() throws IOException { RemoteCacheClient blobStore = RemoteCacheClientFactory.create( - remoteOptions, /* creds= */ null, authAndTlsOptions, workingDirectory, digestUtil); + remoteOptions, /* creds= */ null, authAndTlsOptions, workingDirectory, digestUtil, retrier); assertThat(blobStore).isInstanceOf(HttpCacheClient.class); } @@ -147,7 +160,7 @@ public void createDiskCache() throws IOException { RemoteCacheClient blobStore = RemoteCacheClientFactory.create( - remoteOptions, /* creds= */ null, authAndTlsOptions, workingDirectory, digestUtil); + remoteOptions, /* creds= */ null, authAndTlsOptions, workingDirectory, digestUtil, retrier); assertThat(blobStore).isInstanceOf(DiskCacheClient.class); } diff --git a/src/test/java/com/google/devtools/build/lib/remote/http/BUILD b/src/test/java/com/google/devtools/build/lib/remote/http/BUILD index 1486dda102a960..d0274727b8fca9 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/http/BUILD +++ b/src/test/java/com/google/devtools/build/lib/remote/http/BUILD @@ -21,6 +21,7 @@ java_test( test_class = "com.google.devtools.build.lib.AllTests", deps = [ "//src/main/java/com/google/devtools/build/lib/authandtls", + "//src/main/java/com/google/devtools/build/lib/remote:Retrier", "//src/main/java/com/google/devtools/build/lib/remote/common", "//src/main/java/com/google/devtools/build/lib/remote/http", "//src/main/java/com/google/devtools/build/lib/remote/util", diff --git a/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java b/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java index 41cddce1827849..f7e82751df3d59 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/http/HttpCacheClientTest.java @@ -16,8 +16,7 @@ import static com.google.common.truth.Truth.assertThat; import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture; import static java.util.Collections.singletonList; -import static org.junit.Assert.assertThrows; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -27,13 +26,19 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import build.bazel.remote.execution.v2.ActionResult; import build.bazel.remote.execution.v2.Digest; import com.google.auth.Credentials; import com.google.common.base.Charsets; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.devtools.build.lib.authandtls.AuthAndTLSOptions; +import com.google.common.util.concurrent.ListeningScheduledExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.devtools.build.lib.remote.RemoteRetrier; +import com.google.devtools.build.lib.remote.Retrier; import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext; +import com.google.devtools.build.lib.remote.common.RemoteCacheClient; import com.google.devtools.build.lib.remote.util.DigestUtil; import com.google.devtools.build.lib.remote.util.TracingMetadataUtils; import com.google.devtools.build.lib.vfs.DigestHashFunction; @@ -44,17 +49,9 @@ import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFutureListener; -import io.netty.channel.ChannelHandler; +import io.netty.buffer.Unpooled; +import io.netty.channel.*; import io.netty.channel.ChannelHandler.Sharable; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandler; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.EventLoopGroup; -import io.netty.channel.ServerChannel; -import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.epoll.Epoll; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.epoll.EpollServerDomainSocketChannel; @@ -65,15 +62,8 @@ import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.unix.DomainSocketAddress; import io.netty.handler.codec.TooLongFrameException; -import io.netty.handler.codec.http.DefaultFullHttpResponse; -import io.netty.handler.codec.http.FullHttpRequest; -import io.netty.handler.codec.http.FullHttpResponse; -import io.netty.handler.codec.http.HttpHeaderNames; -import io.netty.handler.codec.http.HttpObjectAggregator; -import io.netty.handler.codec.http.HttpResponseStatus; -import io.netty.handler.codec.http.HttpServerCodec; -import io.netty.handler.codec.http.HttpUtil; -import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http.*; + import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; @@ -82,16 +72,15 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.URI; +import java.nio.channels.ClosedChannelException; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; import java.util.function.IntFunction; import javax.annotation.Nullable; + +import io.netty.handler.codec.protobuf.ProtobufEncoder; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -262,9 +251,19 @@ private HttpCacheClient createHttpBlobStore( int timeoutSeconds, boolean remoteVerifyDownloads, @Nullable final Credentials creds, - AuthAndTLSOptions authAndTlsOptions) + AuthAndTLSOptions authAndTlsOptions, + Optional optRetrier) throws Exception { SocketAddress socketAddress = serverChannel.localAddress(); + RemoteRetrier retrier = optRetrier.orElseGet(() -> { + ListeningScheduledExecutorService retryScheduler = + MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1)); + return new RemoteRetrier( + () -> RemoteRetrier.RETRIES_DISABLED, + (e) -> false, + retryScheduler, + Retrier.ALLOW_ALL_CALLS); + }); if (socketAddress instanceof DomainSocketAddress) { DomainSocketAddress domainSocketAddress = (DomainSocketAddress) socketAddress; URI uri = new URI("http://localhost"); @@ -276,6 +275,7 @@ private HttpCacheClient createHttpBlobStore( remoteVerifyDownloads, ImmutableList.of(), DIGEST_UTIL, + retrier, creds, authAndTlsOptions); } else if (socketAddress instanceof InetSocketAddress) { @@ -288,6 +288,7 @@ private HttpCacheClient createHttpBlobStore( remoteVerifyDownloads, ImmutableList.of(), DIGEST_UTIL, + retrier, creds, authAndTlsOptions); } else { @@ -303,7 +304,7 @@ private HttpCacheClient createHttpBlobStore( AuthAndTLSOptions authAndTlsOptions) throws Exception { return createHttpBlobStore( - serverChannel, timeoutSeconds, /* remoteVerifyDownloads= */ true, creds, authAndTlsOptions); + serverChannel, timeoutSeconds, /* remoteVerifyDownloads= */ true, creds, authAndTlsOptions, Optional.empty()); } @Before @@ -489,7 +490,8 @@ protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) /* timeoutSeconds= */ 1, /* remoteVerifyDownloads= */ true, credentials, - authAndTlsOptions); + authAndTlsOptions, + Optional.empty()); Digest fooDigest = DIGEST_UTIL.compute("foo".getBytes(Charsets.UTF_8)); try (OutputStream out = new ByteArrayOutputStream()) { IOException e = @@ -536,7 +538,8 @@ protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) /* timeoutSeconds= */ 1, /* remoteVerifyDownloads= */ false, credentials, - authAndTlsOptions); + authAndTlsOptions, + Optional.empty()); Digest fooDigest = DIGEST_UTIL.compute("foo".getBytes(Charsets.UTF_8)); try (ByteArrayOutputStream out = new ByteArrayOutputStream()) { getFromFuture(blobStore.downloadBlob(remoteActionExecutionContext, fooDigest, out)); @@ -547,6 +550,129 @@ protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) } } + @Test + public void partialDownloadFailsWithoutRetry() throws Exception { + ServerChannel server = null; + try { + ByteBuf chunk1 = Unpooled.wrappedBuffer("File ".getBytes(Charsets.US_ASCII)); + ByteBuf chunk2 = Unpooled.wrappedBuffer("Contents".getBytes(Charsets.US_ASCII)); + server = testServer.start(new IntermittentFailureHandler(chunk1, chunk2)); + Credentials credentials = newCredentials(); + AuthAndTLSOptions authAndTlsOptions = Options.getDefaults(AuthAndTLSOptions.class); + + HttpCacheClient blobStore = + createHttpBlobStore(server, /* timeoutSeconds= */ 1, credentials, authAndTlsOptions); + assertThrows( + ClosedChannelException.class, + () -> + getFromFuture( + blobStore.downloadBlob( + remoteActionExecutionContext, DIGEST, new ByteArrayOutputStream()))); + } finally { + testServer.stop(server); + } + } + + @Test + public void partialDownloadSucceedsWithRetry() throws Exception { + ServerChannel server = null; + try { + ByteBuf chunk1 = Unpooled.wrappedBuffer("File ".getBytes(Charsets.US_ASCII)); + // Replace first chunk to test that the client skips the redundant prefix on retry. + ByteBuf chunk1_attempt2 = Unpooled.wrappedBuffer("abcde".getBytes(Charsets.US_ASCII)); + ByteBuf chunk2 = Unpooled.wrappedBuffer("Contents".getBytes(Charsets.US_ASCII)); + server = testServer.start(new IntermittentFailureHandler(chunk1, chunk1_attempt2, chunk2)); + Credentials credentials = newCredentials(); + AuthAndTLSOptions authAndTlsOptions = Options.getDefaults(AuthAndTLSOptions.class); + + ListeningScheduledExecutorService retryScheduler = + MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1)); + RemoteRetrier retrier = new RemoteRetrier( + () -> new Retrier.ZeroBackoff(1), + (e) -> { + return e instanceof ClosedChannelException; + }, + retryScheduler, + Retrier.ALLOW_ALL_CALLS); + HttpCacheClient blobStore = + createHttpBlobStore( + server, + /* timeoutSeconds= */ 1, + /* remoteVerifyDownloads= */ false, + credentials, + authAndTlsOptions, + Optional.of(retrier)); + + ByteArrayOutputStream download = new ByteArrayOutputStream(); + getFromFuture( + blobStore.downloadBlob( + remoteActionExecutionContext, DIGEST, download)); + assertArrayEquals("File Contents".getBytes(Charsets.US_ASCII), download.toByteArray()); + } finally { + testServer.stop(server); + } + } + + @Test + public void actionResultRetryReadsFromStart() throws Exception { + ServerChannel server = null; + try { + ActionResult.Builder builder1 = ActionResult.newBuilder(); + builder1 + .addOutputFilesBuilder() + .setPath("attempt1/filename") + .setDigest(DIGEST_UTIL.computeAsUtf8("digest1")) + .setIsExecutable(true); + ActionResult action1 = builder1.build(); + ByteArrayOutputStream buffer1 = new ByteArrayOutputStream(); + action1.writeTo(buffer1); + int splitAt = buffer1.size() / 2; + ByteBuf chunk1 = Unpooled.copiedBuffer(buffer1.toByteArray(), 0, splitAt); + + // Replace first chunk to test that the client starts a fresh ActionResult download on retry. + ActionResult.Builder builder2 = ActionResult.newBuilder(); + builder2 + .addOutputFilesBuilder() + .setPath("attempt2/filename") + .setDigest(DIGEST_UTIL.computeAsUtf8("digest2")) + .setIsExecutable(false); + ActionResult action2 = builder2.build(); + ByteArrayOutputStream buffer2 = new ByteArrayOutputStream(); + action2.writeTo(buffer2); + ByteBuf chunk1_attempt2 = Unpooled.copiedBuffer(buffer2.toByteArray(), 0, splitAt); + ByteBuf chunk2 = Unpooled.copiedBuffer(buffer2.toByteArray(), splitAt, buffer2.size() - splitAt); + + server = testServer.start(new IntermittentFailureHandler(chunk1, chunk1_attempt2, chunk2)); + Credentials credentials = newCredentials(); + AuthAndTLSOptions authAndTlsOptions = Options.getDefaults(AuthAndTLSOptions.class); + + ListeningScheduledExecutorService retryScheduler = + MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(1)); + RemoteRetrier retrier = new RemoteRetrier( + () -> new Retrier.ZeroBackoff(1), + (e) -> { + return e instanceof ClosedChannelException; + }, + retryScheduler, + Retrier.ALLOW_ALL_CALLS); + HttpCacheClient blobStore = + createHttpBlobStore( + server, + /* timeoutSeconds= */ 1, + /* remoteVerifyDownloads= */ false, + credentials, + authAndTlsOptions, + Optional.of(retrier)); + + RemoteCacheClient.CachedActionResult download = getFromFuture( + blobStore.downloadActionResult( + remoteActionExecutionContext, new RemoteCacheClient.ActionKey(DIGEST), false)); + assertThat(download.actionResult()).isEqualTo(action2); + } finally { + testServer.stop(server); + } + } + @Test public void expiredAuthTokensShouldBeRetried_get() throws Exception { expiredAuthTokensShouldBeRetried_get( @@ -769,4 +895,45 @@ protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) } } } + + /** + * {@link ChannelHandler} that on the first request returns a partial response and then closes the stream, + * and on any further requests returns a full response. + */ + @Sharable + static class IntermittentFailureHandler extends SimpleChannelInboundHandler { + private final ByteBuf attempt1_chunk1, attempt2_chunk1, attempt2_chunk2; + private int messageCount; + + public IntermittentFailureHandler(ByteBuf attempt1_chunk1, ByteBuf attempt2_chunk1, ByteBuf attempt2_chunk2) { + this.attempt1_chunk1 = attempt1_chunk1; + this.attempt2_chunk1 = attempt2_chunk1; + this.attempt2_chunk2 = attempt2_chunk2; + } + + public IntermittentFailureHandler(ByteBuf chunk1, ByteBuf chunk2) { + this(chunk1.copy(), chunk1, chunk2); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) { + DefaultHttpResponse response = new DefaultHttpResponse( + HttpVersion.HTTP_1_1, + HttpResponseStatus.OK); + response.headers().set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED); + ctx.write(response); + if (messageCount == 0) { + ctx + .writeAndFlush(new DefaultHttpContent(attempt1_chunk1)) + .addListener(ChannelFutureListener.CLOSE); + } else { + ctx + .writeAndFlush(new DefaultHttpContent(attempt2_chunk1)); + ctx + .writeAndFlush(new DefaultLastHttpContent(attempt2_chunk2)) + .addListener(ChannelFutureListener.CLOSE); + } + ++messageCount; + } + } } diff --git a/src/test/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandlerTest.java b/src/test/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandlerTest.java index 35eecfbd90bcd6..eef192e59075ae 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandlerTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/http/HttpDownloadHandlerTest.java @@ -162,4 +162,60 @@ public void httpErrorsWithContentAreSupported() throws IOException { verify(out, never()).close(); assertThat(ch.isOpen()).isFalse(); } + + /** + * Test that the handler correctly supports downloads at an offset, e.g. on retry. + */ + @Test + public void downloadAtOffsetShouldWork() throws IOException { + EmbeddedChannel ch = new EmbeddedChannel(new HttpDownloadHandler(null, ImmutableList.of())); + ByteArrayOutputStream out = Mockito.spy(new ByteArrayOutputStream()); + DownloadCommand cmd = new DownloadCommand(CACHE_URI, true, DIGEST, out, 2); + ChannelPromise writePromise = ch.newPromise(); + ch.writeOneOutbound(cmd, writePromise); + + HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + response.headers().set(HttpHeaders.CONTENT_LENGTH, 5); + response.headers().set(HttpHeaders.CONNECTION, HttpHeaderValues.KEEP_ALIVE); + ch.writeInbound(response); + ByteBuf content = Unpooled.buffer(); + content.writeBytes(new byte[] {1, 2, 3, 4, 5}); + ch.writeInbound(new DefaultLastHttpContent(content)); + + assertThat(writePromise.isDone()).isTrue(); + assertThat(out.toByteArray()).isEqualTo(new byte[] {3, 4, 5}); + verify(out, never()).close(); + assertThat(ch.isActive()).isTrue(); + } + + /** + * Test that the handler correctly supports chunked downloads at an offset, e.g. on retry. + */ + @Test + public void chunkedDownloadAtOffsetShouldWork() throws IOException { + EmbeddedChannel ch = new EmbeddedChannel(new HttpDownloadHandler(null, ImmutableList.of())); + ByteArrayOutputStream out = Mockito.spy(new ByteArrayOutputStream()); + DownloadCommand cmd = new DownloadCommand(CACHE_URI, true, DIGEST, out, 3); + ChannelPromise writePromise = ch.newPromise(); + ch.writeOneOutbound(cmd, writePromise); + + HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + response.headers().set(HttpHeaders.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED); + response.headers().set(HttpHeaders.CONNECTION, HttpHeaderValues.KEEP_ALIVE); + ch.writeInbound(response); + ByteBuf content1 = Unpooled.buffer(); + content1.writeBytes(new byte[] {1, 2}); + ch.writeInbound(new DefaultHttpContent(content1)); + ByteBuf content2 = Unpooled.buffer(); + content2.writeBytes(new byte[] {3, 4}); + ch.writeInbound(new DefaultHttpContent(content2)); + ByteBuf content3 = Unpooled.buffer(); + content3.writeBytes(new byte[] {5}); + ch.writeInbound(new DefaultLastHttpContent(content3)); + + assertThat(writePromise.isDone()).isTrue(); + assertThat(out.toByteArray()).isEqualTo(new byte[] {4, 5}); + verify(out, never()).close(); + assertThat(ch.isActive()).isTrue(); + } }