diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java index 815a84a77ed871..14f1190cdfad2c 100644 --- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java +++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java @@ -853,7 +853,8 @@ private static class ThrowingBuildEventArtifactUploaderSupplier { } BuildEventArtifactUploader get() throws IOException { - if (memoizedValue == null && exception == null) { + boolean needsInitialization = memoizedValue == null; + if (needsInitialization && exception == null) { try { memoizedValue = callable.call(); } catch (IOException e) { @@ -864,6 +865,9 @@ BuildEventArtifactUploader get() throws IOException { } } if (memoizedValue != null) { + if (!needsInitialization) { + memoizedValue.retain(); + } return memoizedValue; } throw exception; diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java index 98064b64aaaff6..e5234c7589ece5 100644 --- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java +++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceUploader.java @@ -355,7 +355,7 @@ public void run() { logger.atSevere().log("BES upload failed due to a RuntimeException / Error. This is a bug."); throw e; } finally { - buildEventUploader.shutdown(); + buildEventUploader.release(); MoreExecutors.shutdownAndAwaitTermination(timeoutExecutor, 0, TimeUnit.MILLISECONDS); closeFuture.set(null); } diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/BUILD b/src/main/java/com/google/devtools/build/lib/buildeventstream/BUILD index cc0ffd813f6e2d..e7f4606cd24580 100644 --- a/src/main/java/com/google/devtools/build/lib/buildeventstream/BUILD +++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/BUILD @@ -29,6 +29,7 @@ java_library( "//third_party:flogger", "//third_party:guava", "//third_party:jsr305", + "//third_party:netty", "//third_party/protobuf:protobuf_java", ], ) diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventArtifactUploader.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventArtifactUploader.java index aeb6afd6b8afc7..fe8796d8189c77 100644 --- a/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventArtifactUploader.java +++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventArtifactUploader.java @@ -20,6 +20,7 @@ import com.google.devtools.build.lib.buildeventstream.BuildEvent.LocalFile; import com.google.devtools.build.lib.buildeventstream.BuildEvent.LocalFile.LocalFileType; import com.google.devtools.build.lib.vfs.Path; +import io.netty.util.ReferenceCounted; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -30,7 +31,7 @@ import javax.annotation.Nullable; /** Uploads artifacts referenced by the Build Event Protocol (BEP). */ -public interface BuildEventArtifactUploader { +public interface BuildEventArtifactUploader extends ReferenceCounted { /** * Asynchronously uploads a set of files referenced by the protobuf representation of a {@link * BuildEvent}. This method is expected to return quickly. @@ -78,11 +79,6 @@ public ListenableFuture uriFuture() { } }; - /** - * Shutdown any resources associated with the uploader. - */ - void shutdown(); - /** * Return true if the upload may be "slow". Examples of slowness include writes to remote storage. */ diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/LocalFilesArtifactUploader.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/LocalFilesArtifactUploader.java index c20be67aec887e..502eb01e7624c5 100644 --- a/src/main/java/com/google/devtools/build/lib/buildeventstream/LocalFilesArtifactUploader.java +++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/LocalFilesArtifactUploader.java @@ -18,13 +18,16 @@ import com.google.devtools.build.lib.buildeventstream.BuildEvent.LocalFile; import com.google.devtools.build.lib.buildeventstream.PathConverter.FileUriPathConverter; import com.google.devtools.build.lib.vfs.Path; +import io.netty.util.AbstractReferenceCounted; +import io.netty.util.ReferenceCounted; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import javax.annotation.Nullable; /** An uploader that simply turns paths into local file URIs. */ -public class LocalFilesArtifactUploader implements BuildEventArtifactUploader { +public class LocalFilesArtifactUploader extends AbstractReferenceCounted + implements BuildEventArtifactUploader { private static final FileUriPathConverter FILE_URI_PATH_CONVERTER = new FileUriPathConverter(); private final ConcurrentHashMap fileIsDirectory = new ConcurrentHashMap<>(); @@ -34,10 +37,15 @@ public ListenableFuture upload(Map files) { } @Override - public void shutdown() { + protected void deallocate() { // Intentionally left empty } + @Override + public ReferenceCounted touch(Object o) { + return this; + } + @Override public boolean mayBeSlow() { return false; diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java index 140224cb3905e9..9078d0937a4e52 100644 --- a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java +++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java @@ -156,7 +156,7 @@ public void run() { } catch (IOException e) { logger.atSevere().withCause(e).log("Failed to close BEP file output stream."); } finally { - uploader.shutdown(); + uploader.release(); timeoutExecutor.shutdown(); } closeFuture.set(null); diff --git a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java index 89fa1f91e89d53..1b2fd7c4a539c5 100644 --- a/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java +++ b/src/main/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploader.java @@ -32,6 +32,8 @@ import com.google.devtools.build.lib.remote.util.DigestUtil; import com.google.devtools.build.lib.vfs.Path; import io.grpc.Context; +import io.netty.util.AbstractReferenceCounted; +import io.netty.util.ReferenceCounted; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -43,10 +45,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nullable; -/** - * A {@link BuildEventArtifactUploader} backed by {@link ByteStreamUploader}. - */ -class ByteStreamBuildEventArtifactUploader implements BuildEventArtifactUploader { +/** A {@link BuildEventArtifactUploader} backed by {@link ByteStreamUploader}. */ +class ByteStreamBuildEventArtifactUploader extends AbstractReferenceCounted + implements BuildEventArtifactUploader { private final ListeningExecutorService uploadExecutor; private final Context ctx; @@ -243,7 +244,7 @@ public boolean mayBeSlow() { } @Override - public void shutdown() { + protected void deallocate() { if (shutdown.getAndSet(true)) { return; } @@ -251,6 +252,11 @@ public void shutdown() { uploadExecutor.shutdown(); } + @Override + public ReferenceCounted touch(Object o) { + return this; + } + private static class PathConverterImpl implements PathConverter { private final String remoteServerInstanceName; diff --git a/src/main/java/com/google/devtools/build/lib/runtime/BuildEventArtifactUploaderFactory.java b/src/main/java/com/google/devtools/build/lib/runtime/BuildEventArtifactUploaderFactory.java index d840fc602993d7..c793ecc08bf0f6 100644 --- a/src/main/java/com/google/devtools/build/lib/runtime/BuildEventArtifactUploaderFactory.java +++ b/src/main/java/com/google/devtools/build/lib/runtime/BuildEventArtifactUploaderFactory.java @@ -26,7 +26,7 @@ public interface BuildEventArtifactUploaderFactory { /** * Returns a new instance of a {@link BuildEventArtifactUploader}. The call is responsible for - * calling {@link BuildEventArtifactUploader#shutdown()} on the returned instance. + * calling {@link BuildEventArtifactUploader#release()} on the returned instance. */ BuildEventArtifactUploader create(CommandEnvironment env) throws InvalidPackagePathSymlinkException; diff --git a/src/test/java/com/google/devtools/build/lib/buildeventstream/BUILD b/src/test/java/com/google/devtools/build/lib/buildeventstream/BUILD index 29e146431a3b25..197af076cf554a 100644 --- a/src/test/java/com/google/devtools/build/lib/buildeventstream/BUILD +++ b/src/test/java/com/google/devtools/build/lib/buildeventstream/BUILD @@ -30,6 +30,7 @@ java_test( "//src/main/java/com/google/devtools/build/lib/vfs/inmemoryfs", "//third_party:guava", "//third_party:junit4", + "//third_party:netty", "//third_party:truth", ], ) diff --git a/src/test/java/com/google/devtools/build/lib/buildeventstream/BuildEventArtifactUploaderFactoryMapTest.java b/src/test/java/com/google/devtools/build/lib/buildeventstream/BuildEventArtifactUploaderFactoryMapTest.java index a760ef07947506..6c92e6542d7ff8 100644 --- a/src/test/java/com/google/devtools/build/lib/buildeventstream/BuildEventArtifactUploaderFactoryMapTest.java +++ b/src/test/java/com/google/devtools/build/lib/buildeventstream/BuildEventArtifactUploaderFactoryMapTest.java @@ -22,6 +22,7 @@ import com.google.devtools.build.lib.runtime.BuildEventArtifactUploaderFactoryMap; import com.google.devtools.build.lib.runtime.CommandEnvironment; import com.google.devtools.build.lib.vfs.Path; +import io.netty.util.ReferenceCounted; import java.io.IOException; import java.util.Map; import org.junit.Before; @@ -51,8 +52,38 @@ public boolean mayBeSlow() { } @Override - public void shutdown() { - // Intentionally left empty. + public int refCnt() { + return 0; + } + + @Override + public ReferenceCounted retain() { + return this; + } + + @Override + public ReferenceCounted retain(int i) { + return this; + } + + @Override + public ReferenceCounted touch() { + return this; + } + + @Override + public ReferenceCounted touch(Object o) { + return this; + } + + @Override + public boolean release() { + return false; + } + + @Override + public boolean release(int i) { + return false; } }; uploaderFactories = diff --git a/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/BUILD b/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/BUILD index 4bbc6a416c0593..6527e0c0d6720a 100644 --- a/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/BUILD +++ b/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/BUILD @@ -26,6 +26,7 @@ java_test( "//third_party:guava", "//third_party:junit4", "//third_party:mockito", + "//third_party:netty", "//third_party:truth", "//third_party/protobuf:protobuf_java", "//third_party/protobuf:protobuf_java_util", diff --git a/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/BinaryFormatFileTransportTest.java b/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/BinaryFormatFileTransportTest.java index f5ca2f9103c2bd..82b3271c576bc1 100644 --- a/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/BinaryFormatFileTransportTest.java +++ b/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/BinaryFormatFileTransportTest.java @@ -43,6 +43,8 @@ import com.google.devtools.build.lib.buildeventstream.PathConverter.FileUriPathConverter; import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.common.options.Options; +import io.netty.util.AbstractReferenceCounted; +import io.netty.util.ReferenceCounted; import java.io.BufferedOutputStream; import java.io.File; import java.io.FileInputStream; @@ -137,7 +139,7 @@ public void testCancelledUpload() throws Exception { BuildEventArtifactUploader uploader = Mockito.spy( - new BuildEventArtifactUploader() { + new BuildEventArtifactUploaderWithRefCounting() { @Override public ListenableFuture upload(Map files) { return Futures.immediateCancelledFuture(); @@ -147,9 +149,6 @@ public ListenableFuture upload(Map files) { public boolean mayBeSlow() { return false; } - - @Override - public void shutdown() {} }); File output = tmp.newFile(); @@ -248,7 +247,7 @@ public void testWritesWithUploadDelays() throws Exception { BuildEventArtifactUploader uploader = Mockito.spy( - new BuildEventArtifactUploader() { + new BuildEventArtifactUploaderWithRefCounting() { @Override public ListenableFuture upload(Map files) { if (files.containsKey(file1)) { @@ -261,11 +260,6 @@ public ListenableFuture upload(Map files) { public boolean mayBeSlow() { return true; } - - @Override - public void shutdown() { - // Intentionally left empty. - } }); File output = tmp.newFile(); BufferedOutputStream outputStream = @@ -284,7 +278,7 @@ public void shutdown() { assertThat(in.available()).isEqualTo(0); } - verify(uploader).shutdown(); + verify(uploader).release(); } /** Regression test for b/207287675 */ @@ -296,7 +290,7 @@ public void testHandlesDuplicateFiles() throws Exception { BuildEventArtifactUploader uploader = Mockito.spy( - new BuildEventArtifactUploader() { + new BuildEventArtifactUploaderWithRefCounting() { @Override public ListenableFuture upload(Map files) { return Futures.immediateFuture(new FileUriPathConverter()); @@ -306,11 +300,6 @@ public ListenableFuture upload(Map files) { public boolean mayBeSlow() { return false; } - - @Override - public void shutdown() { - // Intentionally left empty. - } }); File output = tmp.newFile(); BufferedOutputStream outputStream = @@ -338,7 +327,7 @@ public void testCloseWaitsForWritesToFinish() throws Exception { SettableFuture upload = SettableFuture.create(); BuildEventArtifactUploader uploader = Mockito.spy( - new BuildEventArtifactUploader() { + new BuildEventArtifactUploaderWithRefCounting() { @Override public ListenableFuture upload(Map files) { return upload; @@ -348,11 +337,6 @@ public ListenableFuture upload(Map files) { public boolean mayBeSlow() { return false; } - - @Override - public void shutdown() { - // Intentionally left empty. - } }); File output = tmp.newFile(); @@ -373,7 +357,7 @@ public void shutdown() { assertThat(in.available()).isEqualTo(0); } - verify(uploader).shutdown(); + verify(uploader).release(); } private static class WithLocalFilesEvent implements BuildEvent { @@ -418,4 +402,16 @@ public Collection getChildrenEvents() { return ImmutableList.of(BuildEventIdUtil.progressId(id + 1)); } } + + private abstract static class BuildEventArtifactUploaderWithRefCounting + extends AbstractReferenceCounted implements BuildEventArtifactUploader { + + @Override + protected void deallocate() {} + + @Override + public ReferenceCounted touch(Object o) { + return this; + } + } } diff --git a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java index 5c44697da14a97..da6daeebd53f57 100644 --- a/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java +++ b/src/test/java/com/google/devtools/build/lib/remote/ByteStreamBuildEventArtifactUploaderTest.java @@ -181,7 +181,7 @@ public void uploadsShouldWork() throws Exception { .isEqualTo("bytestream://localhost/instance/blobs/" + hash + "/" + size); } - artifactUploader.shutdown(); + artifactUploader.release(); assertThat(uploader.refCnt()).isEqualTo(0); assertThat(refCntChannel.isShutdown()).isTrue(); @@ -198,7 +198,7 @@ public void testUploadDirectoryDoesNotCrash() throws Exception { PathConverter pathConverter = artifactUploader.upload(filesToUpload).get(); assertThat(pathConverter.apply(dir)).isNull(); - artifactUploader.shutdown(); + artifactUploader.release(); } @Test @@ -267,7 +267,7 @@ public void onCompleted() { assertThat(e.getCause().getCause()).isInstanceOf(StatusRuntimeException.class); assertThat(Status.fromThrowable(e).getCode()).isEqualTo(Status.CANCELLED.getCode()); - artifactUploader.shutdown(); + artifactUploader.release(); assertThat(uploader.refCnt()).isEqualTo(0); assertThat(refCntChannel.isShutdown()).isTrue();