diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java index 56991dfa044eb6..430484102653ec 100644 --- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java +++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceModule.java @@ -13,6 +13,7 @@ // limitations under the License. package com.google.devtools.build.lib.buildeventservice; +import static com.google.common.collect.ImmutableMap.toImmutableMap; import static java.util.concurrent.TimeUnit.MILLISECONDS; import com.google.common.annotations.VisibleForTesting; @@ -80,9 +81,11 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.time.Duration; +import java.util.AbstractMap.SimpleEntry; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -113,6 +116,9 @@ public abstract class BuildEventServiceModule new BufferedOutputStream(Files.newOutputStream(Paths.get(file))); + /** * Holds the close futures for the upload of each transport with timeouts attached to them using * {@link #constructCloseFuturesMapWithTimeouts(ImmutableMap)} obtained from {@link @@ -133,8 +139,6 @@ public abstract class BuildEventServiceModule> halfCloseFuturesWithTimeoutsMap = ImmutableMap.of(); - private BesUploadMode previousUploadMode = BesUploadMode.WAIT_FOR_UPLOAD_COMPLETE; - // TODO(lpino): Use Optional instead of @Nullable for the members below. @Nullable private OutErr outErr; @Nullable private ImmutableSet bepTransports; @@ -198,6 +202,17 @@ private void cancelAndResetPendingUploads() { resetPendingUploads(); } + private void removeFromPendingUploads( + Map> transportFutures) { + transportFutures + .values() + .forEach(closeFuture -> closeFuture.cancel(/* mayInterruptIfRunning= */ true)); + closeFuturesWithTimeoutsMap = + closeFuturesWithTimeoutsMap.entrySet().stream() + .filter(entry -> !transportFutures.containsKey(entry.getKey())) + .collect(toImmutableMap(Entry::getKey, Entry::getValue)); + } + private static boolean isTimeoutException(ExecutionException e) { return e.getCause() instanceof TimeoutException; } @@ -219,20 +234,31 @@ private void waitForPreviousInvocation(boolean isShutdown) { return; } - ImmutableMap> waitingFutureMap = null; - boolean cancelCloseFutures = true; - switch (previousUploadMode) { - case FULLY_ASYNC: - waitingFutureMap = - isShutdown ? closeFuturesWithTimeoutsMap : halfCloseFuturesWithTimeoutsMap; - cancelCloseFutures = false; - break; - case WAIT_FOR_UPLOAD_COMPLETE: - case NOWAIT_FOR_UPLOAD_COMPLETE: - waitingFutureMap = closeFuturesWithTimeoutsMap; - cancelCloseFutures = true; - break; - } + ImmutableMap> waitingFutureMap = + closeFuturesWithTimeoutsMap.entrySet().stream() + .map( + entry -> { + var transport = entry.getKey(); + var closeFuture = entry.getValue(); + ListenableFuture future = closeFuture; + if (transport.getBesUploadMode() == BesUploadMode.FULLY_ASYNC) { + future = + isShutdown ? closeFuture : halfCloseFuturesWithTimeoutsMap.get(transport); + if (future == null) { + future = closeFuture; + } + } + return new SimpleEntry<>(transport, future); + }) + .collect(toImmutableMap(Entry::getKey, Entry::getValue)); + ImmutableMap> cancelCloseFutures = + closeFuturesWithTimeoutsMap.entrySet().stream() + .filter( + entry -> { + var transport = entry.getKey(); + return transport.getBesUploadMode() != BesUploadMode.FULLY_ASYNC; + }) + .collect(toImmutableMap(Entry::getKey, Entry::getValue)); Stopwatch stopwatch = Stopwatch.createStarted(); try { @@ -261,7 +287,7 @@ private void waitForPreviousInvocation(boolean isShutdown) { waitedMillis / 1000, waitedMillis % 1000); reporter.handle(Event.warn(msg)); logger.atWarning().withCause(exception).log("%s", msg); - cancelCloseFutures = true; + cancelCloseFutures = closeFuturesWithTimeoutsMap; } catch (ExecutionException e) { String msg; // Futures.withTimeout wraps the TimeoutException in an ExecutionException when the future @@ -281,13 +307,12 @@ private void waitForPreviousInvocation(boolean isShutdown) { } reporter.handle(Event.warn(msg)); logger.atWarning().withCause(e).log("%s", msg); - cancelCloseFutures = true; + cancelCloseFutures = closeFuturesWithTimeoutsMap; } finally { - if (cancelCloseFutures) { - cancelAndResetPendingUploads(); - } else { - resetPendingUploads(); - } + cancelCloseFutures + .values() + .forEach(closeFuture -> closeFuture.cancel(/* mayInterruptIfRunning= */ true)); + resetPendingUploads(); } } @@ -481,8 +506,7 @@ public void blazeShutdown() { } private void waitForBuildEventTransportsToClose( - Map> transportFutures, - boolean besUploadModeIsSynchronous) + Map> transportFutures) throws AbruptExitException { final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor( @@ -519,9 +543,7 @@ private void waitForBuildEventTransportsToClose( e.getCause().getMessage()), e); } finally { - if (besUploadModeIsSynchronous) { - cancelAndResetPendingUploads(); - } + removeFromPendingUploads(transportFutures); executor.shutdown(); } } @@ -579,17 +601,16 @@ public boolean cancel(boolean mayInterruptIfRunning) { } private void closeBepTransports() throws AbruptExitException { - previousUploadMode = besOptions.besUploadMode; closeFuturesWithTimeoutsMap = constructCloseFuturesMapWithTimeouts(streamer.getCloseFuturesMap()); halfCloseFuturesWithTimeoutsMap = constructCloseFuturesMapWithTimeouts(streamer.getHalfClosedMap()); - boolean besUploadModeIsSynchronous = - besOptions.besUploadMode == BesUploadMode.WAIT_FOR_UPLOAD_COMPLETE; Map> blockingTransportFutures = new HashMap<>(); for (Map.Entry> entry : closeFuturesWithTimeoutsMap.entrySet()) { BuildEventTransport bepTransport = entry.getKey(); + boolean besUploadModeIsSynchronous = + bepTransport.getBesUploadMode() == BesUploadMode.WAIT_FOR_UPLOAD_COMPLETE; if (!bepTransport.mayBeSlow() || besUploadModeIsSynchronous) { blockingTransportFutures.put(bepTransport, entry.getValue()); } else { @@ -599,7 +620,7 @@ private void closeBepTransports() throws AbruptExitException { } } if (!blockingTransportFutures.isEmpty()) { - waitForBuildEventTransportsToClose(blockingTransportFutures, besUploadModeIsSynchronous); + waitForBuildEventTransportsToClose(blockingTransportFutures); } } @@ -764,16 +785,18 @@ private ImmutableSet createBepTransports( if (!Strings.isNullOrEmpty(besStreamOptions.buildEventTextFile)) { try { BufferedOutputStream bepTextOutputStream = - new BufferedOutputStream( - Files.newOutputStream(Paths.get(besStreamOptions.buildEventTextFile))); - + buildEventOutputStreamFactory.create(besStreamOptions.buildEventTextFile); BuildEventArtifactUploader localFileUploader = besStreamOptions.buildEventTextFilePathConversion ? uploaderSupplier.get() : new LocalFilesArtifactUploader(); bepTransportsBuilder.add( new TextFormatFileTransport( - bepTextOutputStream, bepOptions, localFileUploader, artifactGroupNamer)); + bepTextOutputStream, + bepOptions, + localFileUploader, + artifactGroupNamer, + besStreamOptions.buildEventTextFileUploadMode)); } catch (IOException exception) { // TODO(b/125216340): Consider making this a warning instead of an error once the // associated bug has been resolved. @@ -791,16 +814,18 @@ private ImmutableSet createBepTransports( if (!Strings.isNullOrEmpty(besStreamOptions.buildEventBinaryFile)) { try { BufferedOutputStream bepBinaryOutputStream = - new BufferedOutputStream( - Files.newOutputStream(Paths.get(besStreamOptions.buildEventBinaryFile))); - + buildEventOutputStreamFactory.create(besStreamOptions.buildEventBinaryFile); BuildEventArtifactUploader localFileUploader = besStreamOptions.buildEventBinaryFilePathConversion ? uploaderSupplier.get() : new LocalFilesArtifactUploader(); bepTransportsBuilder.add( new BinaryFormatFileTransport( - bepBinaryOutputStream, bepOptions, localFileUploader, artifactGroupNamer)); + bepBinaryOutputStream, + bepOptions, + localFileUploader, + artifactGroupNamer, + besStreamOptions.buildEventBinaryFileUploadMode)); } catch (IOException exception) { // TODO(b/125216340): Consider making this a warning instead of an error once the // associated bug has been resolved. @@ -818,8 +843,7 @@ private ImmutableSet createBepTransports( if (!Strings.isNullOrEmpty(besStreamOptions.buildEventJsonFile)) { try { BufferedOutputStream bepJsonOutputStream = - new BufferedOutputStream( - Files.newOutputStream(Paths.get(besStreamOptions.buildEventJsonFile))); + buildEventOutputStreamFactory.create(besStreamOptions.buildEventJsonFile); BuildEventArtifactUploader localFileUploader = besStreamOptions.buildEventJsonFilePathConversion ? uploaderSupplier.get() @@ -830,7 +854,8 @@ private ImmutableSet createBepTransports( bepOptions, localFileUploader, artifactGroupNamer, - makeJsonTypeRegistry())); + makeJsonTypeRegistry(), + besStreamOptions.buildEventJsonFileUploadMode)); } catch (IOException exception) { // TODO(b/125216340): Consider making this a warning instead of an error once the // associated bug has been resolved. @@ -877,6 +902,11 @@ protected abstract BuildEventServiceClient getBesClient( protected abstract Set allowedCommands(OptionsT besOptions); + @VisibleForTesting + void setBuildEventOutputStreamFactory(BuildEventOutputStreamFactory factory) { + this.buildEventOutputStreamFactory = factory; + } + protected ImmutableSet getBesKeywords( OptionsT besOptions, @Nullable OptionsParsingResult startupOptionsProvider) { List userKeywords = besOptions.besKeywords; @@ -933,4 +963,9 @@ BuildEventArtifactUploader get() throws IOException { throw exception; } } + + @VisibleForTesting + interface BuildEventOutputStreamFactory { + BufferedOutputStream create(String file) throws IOException; + } } diff --git a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java index e47edc79000b6e..7511621ebc30c9 100644 --- a/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java +++ b/src/main/java/com/google/devtools/build/lib/buildeventservice/BuildEventServiceTransport.java @@ -19,6 +19,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.eventbus.EventBus; import com.google.common.util.concurrent.ListenableFuture; +import com.google.devtools.build.lib.buildeventservice.BuildEventServiceOptions.BesUploadMode; import com.google.devtools.build.lib.buildeventservice.client.BuildEventServiceClient; import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer; import com.google.devtools.build.lib.buildeventstream.BuildEvent; @@ -37,6 +38,7 @@ public class BuildEventServiceTransport implements BuildEventTransport { private final BuildEventServiceUploader besUploader; private final Duration besTimeout; + private final BesUploadMode besUploadMode; private BuildEventServiceTransport( BuildEventServiceClient besClient, @@ -49,7 +51,8 @@ private BuildEventServiceTransport( EventBus eventBus, Duration closeTimeout, Sleeper sleeper, - Timestamp commandStartTime) { + Timestamp commandStartTime, + BesUploadMode besUploadMode) { this.besTimeout = closeTimeout; this.besUploader = new BuildEventServiceUploader.Builder() @@ -64,6 +67,7 @@ private BuildEventServiceTransport( .eventBus(eventBus) .commandStartTime(commandStartTime) .build(); + this.besUploadMode = besUploadMode; } @Override @@ -91,6 +95,11 @@ public boolean mayBeSlow() { return true; } + @Override + public BesUploadMode getBesUploadMode() { + return besUploadMode; + } + @Override public void sendBuildEvent(BuildEvent event) { besUploader.enqueueEvent(event); @@ -188,7 +197,8 @@ public BuildEventServiceTransport build() { checkNotNull(eventBus), (besOptions.besTimeout != null) ? besOptions.besTimeout : Duration.ZERO, sleeper != null ? sleeper : new JavaSleeper(), - checkNotNull(commandStartTime)); + checkNotNull(commandStartTime), + besOptions.besUploadMode); } } } diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/BUILD b/src/main/java/com/google/devtools/build/lib/buildeventstream/BUILD index 856292047af2dc..be3e7876fedec7 100644 --- a/src/main/java/com/google/devtools/build/lib/buildeventstream/BUILD +++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/BUILD @@ -21,6 +21,7 @@ java_library( deps = [ "//src/main/java/com/google/devtools/build/lib/actions:artifacts", "//src/main/java/com/google/devtools/build/lib/actions:file_metadata", + "//src/main/java/com/google/devtools/build/lib/buildeventservice:buildeventservice-options", "//src/main/java/com/google/devtools/build/lib/buildeventstream/proto:build_event_stream_java_proto", "//src/main/java/com/google/devtools/build/lib/cmdline", "//src/main/java/com/google/devtools/build/lib/collect/nestedset", diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventTransport.java index 5b2d2b31192f7a..945afedc2a4034 100644 --- a/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventTransport.java +++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/BuildEventTransport.java @@ -13,8 +13,8 @@ // limitations under the License. package com.google.devtools.build.lib.buildeventstream; -import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.ListenableFuture; +import com.google.devtools.build.lib.buildeventservice.BuildEventServiceOptions.BesUploadMode; import java.time.Duration; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; @@ -24,14 +24,12 @@ * *

All implementations need to be thread-safe. All methods are expected to return quickly. * - *

Notice that this interface does not provide any error handling API. A transport may choose - * to log interesting errors to the command line and/or abort the whole build. + *

Notice that this interface does not provide any error handling API. A transport may choose to + * log interesting errors to the command line and/or abort the whole build. */ @ThreadSafe public interface BuildEventTransport { - /** - * The name of this transport as can be displayed to a user. - */ + /** The name of this transport as can be displayed to a user. */ String name(); /** @@ -87,7 +85,9 @@ default Duration getTimeout() { */ boolean mayBeSlow(); - @VisibleForTesting + /** Returns the desired {@link BesUploadMode} for the transport. */ + BesUploadMode getBesUploadMode(); + @Nullable BuildEventArtifactUploader getUploader(); } diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/BUILD b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/BUILD index b74731e3b3481f..fcab7f71bf7fbc 100644 --- a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/BUILD +++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/BUILD @@ -15,6 +15,7 @@ java_library( name = "transports", srcs = glob(["*.java"]), deps = [ + "//src/main/java/com/google/devtools/build/lib/buildeventservice:buildeventservice-options", "//src/main/java/com/google/devtools/build/lib/buildeventstream", "//src/main/java/com/google/devtools/build/lib/buildeventstream/proto:build_event_stream_java_proto", "//src/main/java/com/google/devtools/build/lib/util:abrupt_exit_exception", diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/BinaryFormatFileTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/BinaryFormatFileTransport.java index c3a83c0a1f8ea8..c335338b25ff99 100644 --- a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/BinaryFormatFileTransport.java +++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/BinaryFormatFileTransport.java @@ -14,6 +14,7 @@ package com.google.devtools.build.lib.buildeventstream.transports; +import com.google.devtools.build.lib.buildeventservice.BuildEventServiceOptions.BesUploadMode; import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer; import com.google.devtools.build.lib.buildeventstream.BuildEvent; import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader; @@ -34,8 +35,9 @@ public BinaryFormatFileTransport( BufferedOutputStream outputStream, BuildEventProtocolOptions options, BuildEventArtifactUploader uploader, - ArtifactGroupNamer namer) { - super(outputStream, options, uploader, namer); + ArtifactGroupNamer namer, + BesUploadMode besUploadMode) { + super(outputStream, options, uploader, namer, besUploadMode); } @Override diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/BuildEventStreamOptions.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/BuildEventStreamOptions.java index 6d38019da6650e..15ef42036fdb86 100644 --- a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/BuildEventStreamOptions.java +++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/BuildEventStreamOptions.java @@ -14,6 +14,8 @@ package com.google.devtools.build.lib.buildeventstream.transports; +import com.google.devtools.build.lib.buildeventservice.BuildEventServiceOptions.BesUploadMode; +import com.google.devtools.build.lib.buildeventservice.BuildEventServiceOptions.BesUploadModeConverter; import com.google.devtools.common.options.Option; import com.google.devtools.common.options.OptionDocumentationCategory; import com.google.devtools.common.options.OptionEffectTag; @@ -46,7 +48,6 @@ public class BuildEventStreamOptions extends OptionsBase { name = "build_event_binary_file", oldName = "experimental_build_event_binary_file", defaultValue = "", - implicitRequirements = {"--bes_upload_mode=wait_for_upload_complete"}, documentationCategory = OptionDocumentationCategory.LOGGING, effectTags = {OptionEffectTag.AFFECTS_OUTPUTS}, help = @@ -59,7 +60,6 @@ public class BuildEventStreamOptions extends OptionsBase { name = "build_event_json_file", oldName = "experimental_build_event_json_file", defaultValue = "", - implicitRequirements = {"--bes_upload_mode=wait_for_upload_complete"}, documentationCategory = OptionDocumentationCategory.LOGGING, effectTags = {OptionEffectTag.AFFECTS_OUTPUTS}, help = @@ -67,15 +67,55 @@ public class BuildEventStreamOptions extends OptionsBase { + " This option implies --bes_upload_mode=wait_for_upload_complete.") public String buildEventJsonFile; + @Option( + name = "build_event_text_file_upload_mode", + defaultValue = "wait_for_upload_complete", + converter = BesUploadModeConverter.class, + documentationCategory = OptionDocumentationCategory.LOGGING, + effectTags = {OptionEffectTag.EAGERNESS_TO_EXIT}, + help = + "Specifies whether the Build Event Service upload for --build_event_text_file should" + + " block the build completion or should end the invocation immediately and finish" + + " the upload in the background. Either 'wait_for_upload_complete' (default)," + + " 'nowait_for_upload_complete', or 'fully_async'.") + public BesUploadMode buildEventTextFileUploadMode; + + @Option( + name = "build_event_binary_file_upload_mode", + defaultValue = "wait_for_upload_complete", + converter = BesUploadModeConverter.class, + documentationCategory = OptionDocumentationCategory.LOGGING, + effectTags = {OptionEffectTag.EAGERNESS_TO_EXIT}, + help = + "Specifies whether the Build Event Service upload for --build_event_binary_file should" + + " block the build completion or should end the invocation immediately and finish" + + " the upload in the background. Either 'wait_for_upload_complete' (default)," + + " 'nowait_for_upload_complete', or 'fully_async'.") + public BesUploadMode buildEventBinaryFileUploadMode; + + @Option( + name = "build_event_json_file_upload_mode", + defaultValue = "wait_for_upload_complete", + converter = BesUploadModeConverter.class, + documentationCategory = OptionDocumentationCategory.LOGGING, + effectTags = {OptionEffectTag.EAGERNESS_TO_EXIT}, + help = + "Specifies whether the Build Event Service upload for --build_event_json_file should" + + " block the build completion or should end the invocation immediately and finish" + + " the upload in the background. Either 'wait_for_upload_complete' (default)," + + " 'nowait_for_upload_complete', or 'fully_async'.") + public BesUploadMode buildEventJsonFileUploadMode; + @Option( name = "build_event_text_file_path_conversion", oldName = "experimental_build_event_text_file_path_conversion", defaultValue = "true", documentationCategory = OptionDocumentationCategory.LOGGING, effectTags = {OptionEffectTag.AFFECTS_OUTPUTS}, - help = "Convert paths in the text file representation of the build event protocol to more " - + "globally valid URIs whenever possible; if disabled, the file:// uri scheme will " - + "always be used") + help = + "Convert paths in the text file representation of the build event protocol to more " + + "globally valid URIs whenever possible; if disabled, the file:// uri scheme will " + + "always be used") public boolean buildEventTextFilePathConversion; @Option( @@ -84,9 +124,10 @@ public class BuildEventStreamOptions extends OptionsBase { defaultValue = "true", documentationCategory = OptionDocumentationCategory.LOGGING, effectTags = {OptionEffectTag.AFFECTS_OUTPUTS}, - help = "Convert paths in the binary file representation of the build event protocol to more " - + "globally valid URIs whenever possible; if disabled, the file:// uri scheme will " - + "always be used") + help = + "Convert paths in the binary file representation of the build event protocol to more " + + "globally valid URIs whenever possible; if disabled, the file:// uri scheme will " + + "always be used") public boolean buildEventBinaryFilePathConversion; @Option( diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java index 7bc427d7849342..da3c2b7250e43d 100644 --- a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java +++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/FileTransport.java @@ -24,6 +24,7 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.devtools.build.lib.buildeventservice.BuildEventServiceOptions.BesUploadMode; import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer; import com.google.devtools.build.lib.buildeventstream.BuildEvent; import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader; @@ -66,6 +67,7 @@ abstract class FileTransport implements BuildEventTransport { private final BuildEventArtifactUploader uploader; private final SequentialWriter writer; private final ArtifactGroupNamer namer; + private final BesUploadMode besUploadMode; private final ScheduledExecutorService timeoutExecutor = MoreExecutors.listeningDecorator( @@ -76,12 +78,14 @@ abstract class FileTransport implements BuildEventTransport { BufferedOutputStream outputStream, BuildEventProtocolOptions options, BuildEventArtifactUploader uploader, - ArtifactGroupNamer namer) { + ArtifactGroupNamer namer, + BesUploadMode besUploadMode) { this.uploader = uploader; this.options = options; this.writer = new SequentialWriter(outputStream, this::serializeEvent, uploader, timeoutExecutor); this.namer = namer; + this.besUploadMode = besUploadMode; } @ThreadSafe @@ -170,8 +174,7 @@ private void exitFailure(Throwable e) { // Print a more useful error message when the upload times out. // An {@link ExecutionException} may be wrapping a {@link TimeoutException} if the // Future was created with {@link Futures#withTimeout}. - if (e instanceof ExecutionException - && e.getCause() instanceof TimeoutException) { + if (e instanceof ExecutionException && e.getCause() instanceof TimeoutException) { message = "Unable to write all BEP events to file due to timeout"; } else { message = @@ -319,6 +322,11 @@ public boolean mayBeSlow() { return uploader.mayBeSlow(); } + @Override + public BesUploadMode getBesUploadMode() { + return besUploadMode; + } + @Override public BuildEventArtifactUploader getUploader() { return uploader; @@ -329,4 +337,3 @@ Duration getFlushInterval() { return writer.getFlushInterval(); } } - diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/JsonFormatFileTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/JsonFormatFileTransport.java index 5d7af3a024eec4..b6033d8175768a 100644 --- a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/JsonFormatFileTransport.java +++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/JsonFormatFileTransport.java @@ -18,6 +18,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.flogger.GoogleLogger; +import com.google.devtools.build.lib.buildeventservice.BuildEventServiceOptions.BesUploadMode; import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer; import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader; import com.google.devtools.build.lib.buildeventstream.BuildEventProtocolOptions; @@ -47,8 +48,9 @@ public JsonFormatFileTransport( BuildEventProtocolOptions options, BuildEventArtifactUploader uploader, ArtifactGroupNamer namer, - TypeRegistry typeRegistry) { - super(outputStream, options, uploader, namer); + TypeRegistry typeRegistry, + BesUploadMode besUploadMode) { + super(outputStream, options, uploader, namer, besUploadMode); jsonPrinter = JsonFormat.printer().usingTypeRegistry(typeRegistry).omittingInsignificantWhitespace(); } diff --git a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/TextFormatFileTransport.java b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/TextFormatFileTransport.java index 34e26e67c1c33b..02086a8d820660 100644 --- a/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/TextFormatFileTransport.java +++ b/src/main/java/com/google/devtools/build/lib/buildeventstream/transports/TextFormatFileTransport.java @@ -16,6 +16,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; +import com.google.devtools.build.lib.buildeventservice.BuildEventServiceOptions.BesUploadMode; import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer; import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader; import com.google.devtools.build.lib.buildeventstream.BuildEventProtocolOptions; @@ -35,8 +36,9 @@ public TextFormatFileTransport( BufferedOutputStream outputStream, BuildEventProtocolOptions options, BuildEventArtifactUploader uploader, - ArtifactGroupNamer namer) { - super(outputStream, options, uploader, namer); + ArtifactGroupNamer namer, + BesUploadMode besUploadMode) { + super(outputStream, options, uploader, namer, besUploadMode); } @Override diff --git a/src/test/java/com/google/devtools/build/lib/buildeventservice/BUILD b/src/test/java/com/google/devtools/build/lib/buildeventservice/BUILD index 0c2a557c007e9d..6ec644c9ab0cdb 100644 --- a/src/test/java/com/google/devtools/build/lib/buildeventservice/BUILD +++ b/src/test/java/com/google/devtools/build/lib/buildeventservice/BUILD @@ -74,6 +74,7 @@ java_test( "@googleapis//:google_devtools_build_v1_build_events_java_proto", "@googleapis//:google_devtools_build_v1_publish_build_event_java_grpc", "@googleapis//:google_devtools_build_v1_publish_build_event_java_proto", + "@maven//:com_google_testparameterinjector_test_parameter_injector", "@remoteapis//:build_bazel_remote_execution_v2_remote_execution_java_proto", ], ) diff --git a/src/test/java/com/google/devtools/build/lib/buildeventservice/BazelBuildEventServiceModuleTest.java b/src/test/java/com/google/devtools/build/lib/buildeventservice/BazelBuildEventServiceModuleTest.java index dfea5f77af83bd..1115c3612d1c78 100644 --- a/src/test/java/com/google/devtools/build/lib/buildeventservice/BazelBuildEventServiceModuleTest.java +++ b/src/test/java/com/google/devtools/build/lib/buildeventservice/BazelBuildEventServiceModuleTest.java @@ -37,6 +37,7 @@ import com.google.devtools.build.lib.bugreport.Crash; import com.google.devtools.build.lib.bugreport.CrashContext; import com.google.devtools.build.lib.buildeventservice.BazelBuildEventServiceModule.BackendConfig; +import com.google.devtools.build.lib.buildeventservice.BuildEventServiceModule.BuildEventOutputStreamFactory; import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader; import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos; import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.Aborted; @@ -74,6 +75,8 @@ import com.google.devtools.build.v1.PublishLifecycleEventRequest; import com.google.devtools.build.v1.StreamId; import com.google.protobuf.Empty; +import com.google.testing.junit.testparameterinjector.TestParameter; +import com.google.testing.junit.testparameterinjector.TestParameterInjector; import io.grpc.ManagedChannel; import io.grpc.Metadata; import io.grpc.Server; @@ -84,11 +87,15 @@ import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.stub.StreamObserver; import io.grpc.util.MutableHandlerRegistry; +import java.io.BufferedOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.lang.Thread.UncaughtExceptionHandler; +import java.nio.file.Files; +import java.nio.file.Paths; import java.time.Duration; import java.util.ArrayDeque; import java.util.ArrayList; @@ -102,6 +109,7 @@ import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; import javax.annotation.concurrent.GuardedBy; import org.junit.After; @@ -111,10 +119,9 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; /** Tests for {@link BazelBuildEventServiceModule}. */ -@RunWith(JUnit4.class) +@RunWith(TestParameterInjector.class) public final class BazelBuildEventServiceModuleTest extends BuildIntegrationTestCase { private static final Duration WAIT_FOR_LAST_INVOCATION_TIMEOUT = Duration.ofSeconds(2); @@ -130,6 +137,8 @@ public final class BazelBuildEventServiceModuleTest extends BuildIntegrationTest @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); + @Nullable private BuildEventOutputStreamFactory buildEventOutputStreamFactory; + @Override protected BlazeModule getConnectivityModule() { return connectivityModule; @@ -167,6 +176,9 @@ protected Duration getMaxWaitForPreviousInvocation() { private void runBuildWithOptions(String... options) throws Exception { addOptions(options); besModule = runtimeWrapper.getRuntime().getBlazeModule(BazelBuildEventServiceModule.class); + if (buildEventOutputStreamFactory != null) { + besModule.setBuildEventOutputStreamFactory(buildEventOutputStreamFactory); + } runtimeWrapper.newCommand(); buildTarget(); } @@ -443,6 +455,65 @@ public void testAfterCommand_fullyAsync() throws Exception { events.assertNoWarningsOrErrors(); } + enum BuildEventFile { + TEXT, + JSON, + BINARY; + + String getBuildEventFileFlag(String file) { + switch (this) { + case TEXT: + return "--build_event_text_file=" + file; + case JSON: + return "--build_event_json_file=" + file; + case BINARY: + return "--build_event_binary_file=" + file; + } + throw new IllegalStateException(); + } + + String getBuildEventFileUploadModeFlag(String mode) { + switch (this) { + case TEXT: + return "--build_event_text_file_upload_mode=" + mode; + case JSON: + return "--build_event_json_file_upload_mode=" + mode; + case BINARY: + return "--build_event_binary_file_upload_mode=" + mode; + } + throw new IllegalStateException(); + } + } + + @Test + public void testAfterCommand_buildEventFile_waitForUploadComplete( + @TestParameter BuildEventFile buildEventFile) throws Exception { + AtomicReference outRef = new AtomicReference<>(null); + buildEventOutputStreamFactory = + (file) -> { + var out = + new DelayingCloseBufferedOutputStream( + Files.newOutputStream(Paths.get(file)), Duration.ofSeconds(1)); + outRef.set(out); + return out; + }; + buildEventService.setDelayBeforeClosingStream(Duration.ofSeconds(10)); + var file = tmpFolder.newFile(); + + runBuildWithOptions( + "--bes_backend=inprocess", + "--bes_upload_mode=FULLY_ASYNC", + "--bes_timeout=1s", + buildEventFile.getBuildEventFileFlag(file.getAbsolutePath()), + buildEventFile.getBuildEventFileUploadModeFlag("wait_for_upload_complete")); + afterBuildCommand(); + + assertThat(outRef.get().isClosed()).isTrue(); + // Expect Bazel doesn't wait for uploading to bes_backend, otherwise there will be a timeout + // error. + events.assertNoWarningsOrErrors(); + } + @Test public void testAfterCommand_fullyAsync_slowHalfCloseIgnored() throws Exception { buildEventService.setDelayBeforeHalfClosingStream(Duration.ofSeconds(10)); @@ -1055,4 +1126,26 @@ public void onCompleted() { new StatusRuntimeException(Status.DATA_LOSS.withDescription(errorMessage))); } } + + private static final class DelayingCloseBufferedOutputStream extends BufferedOutputStream { + private final Duration delay; + private final AtomicBoolean closed = new AtomicBoolean(false); + + DelayingCloseBufferedOutputStream(OutputStream out, Duration delay) { + super(out); + this.delay = delay; + this.out = out; + } + + @Override + public void close() throws IOException { + Uninterruptibles.sleepUninterruptibly(delay); + super.close(); + closed.set(true); + } + + public boolean isClosed() { + return closed.get(); + } + } } diff --git a/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/BUILD b/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/BUILD index 3caaa10c75bf09..12727404752373 100644 --- a/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/BUILD +++ b/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/BUILD @@ -19,6 +19,7 @@ java_test( test_class = "com.google.devtools.build.lib.AllTests", runtime_deps = ["//src/test/java/com/google/devtools/build/lib:test_runner"], deps = [ + "//src/main/java/com/google/devtools/build/lib/buildeventservice:buildeventservice-options", "//src/main/java/com/google/devtools/build/lib/buildeventstream", "//src/main/java/com/google/devtools/build/lib/buildeventstream/proto:build_event_stream_java_proto", "//src/main/java/com/google/devtools/build/lib/buildeventstream/transports", diff --git a/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/BinaryFormatFileTransportTest.java b/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/BinaryFormatFileTransportTest.java index 527bccebc526ea..916d563d5e05c8 100644 --- a/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/BinaryFormatFileTransportTest.java +++ b/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/BinaryFormatFileTransportTest.java @@ -25,6 +25,7 @@ import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.SettableFuture; +import com.google.devtools.build.lib.buildeventservice.BuildEventServiceOptions.BesUploadMode; import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer; import com.google.devtools.build.lib.buildeventstream.BuildEvent; import com.google.devtools.build.lib.buildeventstream.BuildEvent.LocalFile; @@ -107,7 +108,8 @@ public void testCreatesFileAndWritesProtoBinaryFormat() throws Exception { outputStream, defaultOpts, new LocalFilesArtifactUploader(), - artifactGroupNamer); + artifactGroupNamer, + BesUploadMode.WAIT_FOR_UPLOAD_COMPLETE); transport.sendBuildEvent(buildEvent); BuildEventStreamProtos.BuildEvent progress = @@ -157,7 +159,12 @@ public boolean mayBeSlow() { BufferedOutputStream outputStream = new BufferedOutputStream(Files.newOutputStream(Paths.get(output.getAbsolutePath()))); BinaryFormatFileTransport transport = - new BinaryFormatFileTransport(outputStream, defaultOpts, uploader, artifactGroupNamer); + new BinaryFormatFileTransport( + outputStream, + defaultOpts, + uploader, + artifactGroupNamer, + BesUploadMode.WAIT_FOR_UPLOAD_COMPLETE); transport.sendBuildEvent(event1); ExecutionException expected = @@ -189,7 +196,8 @@ public void testWriteWhenFileClosed() throws Exception { outputStream, defaultOpts, new LocalFilesArtifactUploader(), - artifactGroupNamer); + artifactGroupNamer, + BesUploadMode.WAIT_FOR_UPLOAD_COMPLETE); transport.close().get(); @@ -220,7 +228,8 @@ public void testWriteWhenTransportClosed() throws Exception { outputStream, defaultOpts, new LocalFilesArtifactUploader(), - artifactGroupNamer); + artifactGroupNamer, + BesUploadMode.WAIT_FOR_UPLOAD_COMPLETE); transport.sendBuildEvent(buildEvent); Future closeFuture = transport.close(); @@ -267,7 +276,12 @@ public boolean mayBeSlow() { BufferedOutputStream outputStream = new BufferedOutputStream(Files.newOutputStream(Paths.get(output.getAbsolutePath()))); BinaryFormatFileTransport transport = - new BinaryFormatFileTransport(outputStream, defaultOpts, uploader, artifactGroupNamer); + new BinaryFormatFileTransport( + outputStream, + defaultOpts, + uploader, + artifactGroupNamer, + BesUploadMode.WAIT_FOR_UPLOAD_COMPLETE); transport.sendBuildEvent(event1); transport.sendBuildEvent(event2); transport.close().get(); @@ -307,7 +321,12 @@ public boolean mayBeSlow() { BufferedOutputStream outputStream = new BufferedOutputStream(Files.newOutputStream(Paths.get(output.getAbsolutePath()))); BinaryFormatFileTransport transport = - new BinaryFormatFileTransport(outputStream, defaultOpts, uploader, artifactGroupNamer); + new BinaryFormatFileTransport( + outputStream, + defaultOpts, + uploader, + artifactGroupNamer, + BesUploadMode.WAIT_FOR_UPLOAD_COMPLETE); transport.sendBuildEvent(event1); transport.close().get(); @@ -345,7 +364,12 @@ public boolean mayBeSlow() { BufferedOutputStream outputStream = new BufferedOutputStream(Files.newOutputStream(Paths.get(output.getAbsolutePath()))); BinaryFormatFileTransport transport = - new BinaryFormatFileTransport(outputStream, defaultOpts, uploader, artifactGroupNamer); + new BinaryFormatFileTransport( + outputStream, + defaultOpts, + uploader, + artifactGroupNamer, + BesUploadMode.WAIT_FOR_UPLOAD_COMPLETE); transport.sendBuildEvent(event); ListenableFuture closeFuture = transport.close(); diff --git a/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/JsonFormatFileTransportTest.java b/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/JsonFormatFileTransportTest.java index fe9dd01265d09b..3217f890a1b39c 100644 --- a/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/JsonFormatFileTransportTest.java +++ b/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/JsonFormatFileTransportTest.java @@ -18,6 +18,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.mockito.Mockito.when; +import com.google.devtools.build.lib.buildeventservice.BuildEventServiceOptions.BesUploadMode; import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer; import com.google.devtools.build.lib.buildeventstream.BuildEvent; import com.google.devtools.build.lib.buildeventstream.BuildEventProtocolOptions; @@ -90,7 +91,8 @@ public void setUp() throws IOException { defaultOpts, new LocalFilesArtifactUploader(), artifactGroupNamer, - SPAWN_EXEC_TYPE_REGISTRY); + SPAWN_EXEC_TYPE_REGISTRY, + BesUploadMode.WAIT_FOR_UPLOAD_COMPLETE); } @Test @@ -171,7 +173,8 @@ public void testFlushesStreamAfterSmallWrites() throws Exception { defaultOpts, new LocalFilesArtifactUploader(), artifactGroupNamer, - SPAWN_EXEC_TYPE_REGISTRY); + SPAWN_EXEC_TYPE_REGISTRY, + BesUploadMode.WAIT_FOR_UPLOAD_COMPLETE); BuildEventStreamProtos.BuildEvent started = BuildEventStreamProtos.BuildEvent.newBuilder() diff --git a/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/TextFormatFileTransportTest.java b/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/TextFormatFileTransportTest.java index a307a129ebc92e..146076ff08870c 100644 --- a/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/TextFormatFileTransportTest.java +++ b/src/test/java/com/google/devtools/build/lib/buildeventstream/transports/TextFormatFileTransportTest.java @@ -19,6 +19,7 @@ import com.google.common.base.Joiner; import com.google.common.io.Files; +import com.google.devtools.build.lib.buildeventservice.BuildEventServiceOptions.BesUploadMode; import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer; import com.google.devtools.build.lib.buildeventstream.BuildEvent; import com.google.devtools.build.lib.buildeventstream.BuildEventContext; @@ -88,7 +89,8 @@ public void testCreatesFileAndWritesProtoTextFormat() throws Exception { outputStream, defaultOpts, new LocalFilesArtifactUploader(), - artifactGroupNamer); + artifactGroupNamer, + BesUploadMode.WAIT_FOR_UPLOAD_COMPLETE); transport.sendBuildEvent(buildEvent); BuildEventStreamProtos.BuildEvent progress = diff --git a/src/test/java/com/google/devtools/build/lib/runtime/BUILD b/src/test/java/com/google/devtools/build/lib/runtime/BUILD index 07cf803cf72b80..32d672d7cbb116 100644 --- a/src/test/java/com/google/devtools/build/lib/runtime/BUILD +++ b/src/test/java/com/google/devtools/build/lib/runtime/BUILD @@ -51,6 +51,7 @@ java_library( "//src/main/java/com/google/devtools/build/lib/bazel/repository/downloader", "//src/main/java/com/google/devtools/build/lib/bazel/rules", "//src/main/java/com/google/devtools/build/lib/bugreport", + "//src/main/java/com/google/devtools/build/lib/buildeventservice:buildeventservice-options", "//src/main/java/com/google/devtools/build/lib/buildeventstream", "//src/main/java/com/google/devtools/build/lib/buildeventstream/proto:build_event_stream_java_proto", "//src/main/java/com/google/devtools/build/lib/buildeventstream/transports", diff --git a/src/test/java/com/google/devtools/build/lib/runtime/BuildEventStreamerTest.java b/src/test/java/com/google/devtools/build/lib/runtime/BuildEventStreamerTest.java index d224ac5201efbe..648d7892b7f049 100644 --- a/src/test/java/com/google/devtools/build/lib/runtime/BuildEventStreamerTest.java +++ b/src/test/java/com/google/devtools/build/lib/runtime/BuildEventStreamerTest.java @@ -45,6 +45,7 @@ import com.google.devtools.build.lib.analysis.config.FragmentFactory; import com.google.devtools.build.lib.analysis.config.FragmentRegistry; import com.google.devtools.build.lib.bugreport.BugReport; +import com.google.devtools.build.lib.buildeventservice.BuildEventServiceOptions.BesUploadMode; import com.google.devtools.build.lib.buildeventstream.AnnounceBuildEventTransportsEvent; import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer; import com.google.devtools.build.lib.buildeventstream.BuildEvent; @@ -172,6 +173,11 @@ public boolean mayBeSlow() { return false; } + @Override + public BesUploadMode getBesUploadMode() { + return BesUploadMode.WAIT_FOR_UPLOAD_COMPLETE; + } + @Override public synchronized void sendBuildEvent(BuildEvent event) { events.add(event);