Skip to content

Commit

Permalink
Automated rollback of commit 353000e.
Browse files Browse the repository at this point in the history
*** Reason for rollback ***

b/312694497

*** Original change description ***

Make the BesUploadMode per transport instead of per invocation.

As a result, `--build_event_[text|json|binary]_file_upload_mode` are introduced to set the upload mode for build event file respectively. The default value for them is `wait_for_upload_complete`. The implicit requirement on `--bes_upload_mode=wait_for_upload_complete` is also removed.

Alternate to #19322.

PiperOrigin-RevId: 586076379
Change-Id: I020c969497fcf72fe77754a5438ff5353ab606cb
  • Loading branch information
justinhorvitz authored and copybara-github committed Nov 28, 2023
1 parent 6b00b77 commit af4d29e
Show file tree
Hide file tree
Showing 18 changed files with 82 additions and 314 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.
package com.google.devtools.build.lib.buildeventservice;

import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -81,11 +80,9 @@
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -116,9 +113,6 @@ public abstract class BuildEventServiceModule<OptionsT extends BuildEventService
private boolean isRunsPerTestOverTheLimit;
private BuildEventArtifactUploaderFactory uploaderFactoryToCleanup;

private BuildEventOutputStreamFactory buildEventOutputStreamFactory =
file -> new BufferedOutputStream(Files.newOutputStream(Paths.get(file)));

/**
* Holds the close futures for the upload of each transport with timeouts attached to them using
* {@link #constructCloseFuturesMapWithTimeouts(ImmutableMap)} obtained from {@link
Expand All @@ -139,6 +133,8 @@ public abstract class BuildEventServiceModule<OptionsT extends BuildEventService
private ImmutableMap<BuildEventTransport, ListenableFuture<Void>>
halfCloseFuturesWithTimeoutsMap = ImmutableMap.of();

private BesUploadMode previousUploadMode = BesUploadMode.WAIT_FOR_UPLOAD_COMPLETE;

// TODO(lpino): Use Optional instead of @Nullable for the members below.
@Nullable private OutErr outErr;
@Nullable private ImmutableSet<BuildEventTransport> bepTransports;
Expand Down Expand Up @@ -202,17 +198,6 @@ private void cancelAndResetPendingUploads() {
resetPendingUploads();
}

private void removeFromPendingUploads(
Map<BuildEventTransport, ListenableFuture<Void>> transportFutures) {
transportFutures
.values()
.forEach(closeFuture -> closeFuture.cancel(/* mayInterruptIfRunning= */ true));
closeFuturesWithTimeoutsMap =
closeFuturesWithTimeoutsMap.entrySet().stream()
.filter(entry -> !transportFutures.containsKey(entry.getKey()))
.collect(toImmutableMap(Entry::getKey, Entry::getValue));
}

private static boolean isTimeoutException(ExecutionException e) {
return e.getCause() instanceof TimeoutException;
}
Expand All @@ -234,31 +219,20 @@ private void waitForPreviousInvocation(boolean isShutdown) {
return;
}

ImmutableMap<BuildEventTransport, ListenableFuture<Void>> waitingFutureMap =
closeFuturesWithTimeoutsMap.entrySet().stream()
.map(
entry -> {
var transport = entry.getKey();
var closeFuture = entry.getValue();
ListenableFuture<Void> future = closeFuture;
if (transport.getBesUploadMode() == BesUploadMode.FULLY_ASYNC) {
future =
isShutdown ? closeFuture : halfCloseFuturesWithTimeoutsMap.get(transport);
if (future == null) {
future = closeFuture;
}
}
return new SimpleEntry<>(transport, future);
})
.collect(toImmutableMap(Entry::getKey, Entry::getValue));
ImmutableMap<BuildEventTransport, ListenableFuture<Void>> cancelCloseFutures =
closeFuturesWithTimeoutsMap.entrySet().stream()
.filter(
entry -> {
var transport = entry.getKey();
return transport.getBesUploadMode() != BesUploadMode.FULLY_ASYNC;
})
.collect(toImmutableMap(Entry::getKey, Entry::getValue));
ImmutableMap<BuildEventTransport, ListenableFuture<Void>> waitingFutureMap = null;
boolean cancelCloseFutures = true;
switch (previousUploadMode) {
case FULLY_ASYNC:
waitingFutureMap =
isShutdown ? closeFuturesWithTimeoutsMap : halfCloseFuturesWithTimeoutsMap;
cancelCloseFutures = false;
break;
case WAIT_FOR_UPLOAD_COMPLETE:
case NOWAIT_FOR_UPLOAD_COMPLETE:
waitingFutureMap = closeFuturesWithTimeoutsMap;
cancelCloseFutures = true;
break;
}

Stopwatch stopwatch = Stopwatch.createStarted();
try {
Expand Down Expand Up @@ -287,7 +261,7 @@ private void waitForPreviousInvocation(boolean isShutdown) {
waitedMillis / 1000, waitedMillis % 1000);
reporter.handle(Event.warn(msg));
logger.atWarning().withCause(exception).log("%s", msg);
cancelCloseFutures = closeFuturesWithTimeoutsMap;
cancelCloseFutures = true;
} catch (ExecutionException e) {
String msg;
// Futures.withTimeout wraps the TimeoutException in an ExecutionException when the future
Expand All @@ -307,12 +281,13 @@ private void waitForPreviousInvocation(boolean isShutdown) {
}
reporter.handle(Event.warn(msg));
logger.atWarning().withCause(e).log("%s", msg);
cancelCloseFutures = closeFuturesWithTimeoutsMap;
cancelCloseFutures = true;
} finally {
cancelCloseFutures
.values()
.forEach(closeFuture -> closeFuture.cancel(/* mayInterruptIfRunning= */ true));
resetPendingUploads();
if (cancelCloseFutures) {
cancelAndResetPendingUploads();
} else {
resetPendingUploads();
}
}
}

Expand Down Expand Up @@ -506,7 +481,8 @@ public void blazeShutdown() {
}

private void waitForBuildEventTransportsToClose(
Map<BuildEventTransport, ListenableFuture<Void>> transportFutures)
Map<BuildEventTransport, ListenableFuture<Void>> transportFutures,
boolean besUploadModeIsSynchronous)
throws AbruptExitException {
final ScheduledExecutorService executor =
Executors.newSingleThreadScheduledExecutor(
Expand Down Expand Up @@ -543,7 +519,9 @@ private void waitForBuildEventTransportsToClose(
e.getCause().getMessage()),
e);
} finally {
removeFromPendingUploads(transportFutures);
if (besUploadModeIsSynchronous) {
cancelAndResetPendingUploads();
}
executor.shutdown();
}
}
Expand Down Expand Up @@ -601,16 +579,17 @@ public boolean cancel(boolean mayInterruptIfRunning) {
}

private void closeBepTransports() throws AbruptExitException {
previousUploadMode = besOptions.besUploadMode;
closeFuturesWithTimeoutsMap =
constructCloseFuturesMapWithTimeouts(streamer.getCloseFuturesMap());
halfCloseFuturesWithTimeoutsMap =
constructCloseFuturesMapWithTimeouts(streamer.getHalfClosedMap());
boolean besUploadModeIsSynchronous =
besOptions.besUploadMode == BesUploadMode.WAIT_FOR_UPLOAD_COMPLETE;
Map<BuildEventTransport, ListenableFuture<Void>> blockingTransportFutures = new HashMap<>();
for (Map.Entry<BuildEventTransport, ListenableFuture<Void>> entry :
closeFuturesWithTimeoutsMap.entrySet()) {
BuildEventTransport bepTransport = entry.getKey();
boolean besUploadModeIsSynchronous =
bepTransport.getBesUploadMode() == BesUploadMode.WAIT_FOR_UPLOAD_COMPLETE;
if (!bepTransport.mayBeSlow() || besUploadModeIsSynchronous) {
blockingTransportFutures.put(bepTransport, entry.getValue());
} else {
Expand All @@ -620,7 +599,7 @@ private void closeBepTransports() throws AbruptExitException {
}
}
if (!blockingTransportFutures.isEmpty()) {
waitForBuildEventTransportsToClose(blockingTransportFutures);
waitForBuildEventTransportsToClose(blockingTransportFutures, besUploadModeIsSynchronous);
}
}

Expand Down Expand Up @@ -785,18 +764,16 @@ private ImmutableSet<BuildEventTransport> createBepTransports(
if (!Strings.isNullOrEmpty(besStreamOptions.buildEventTextFile)) {
try {
BufferedOutputStream bepTextOutputStream =
buildEventOutputStreamFactory.create(besStreamOptions.buildEventTextFile);
new BufferedOutputStream(
Files.newOutputStream(Paths.get(besStreamOptions.buildEventTextFile)));

BuildEventArtifactUploader localFileUploader =
besStreamOptions.buildEventTextFilePathConversion
? uploaderSupplier.get()
: new LocalFilesArtifactUploader();
bepTransportsBuilder.add(
new TextFormatFileTransport(
bepTextOutputStream,
bepOptions,
localFileUploader,
artifactGroupNamer,
besStreamOptions.buildEventTextFileUploadMode));
bepTextOutputStream, bepOptions, localFileUploader, artifactGroupNamer));
} catch (IOException exception) {
// TODO(b/125216340): Consider making this a warning instead of an error once the
// associated bug has been resolved.
Expand All @@ -814,18 +791,16 @@ private ImmutableSet<BuildEventTransport> createBepTransports(
if (!Strings.isNullOrEmpty(besStreamOptions.buildEventBinaryFile)) {
try {
BufferedOutputStream bepBinaryOutputStream =
buildEventOutputStreamFactory.create(besStreamOptions.buildEventBinaryFile);
new BufferedOutputStream(
Files.newOutputStream(Paths.get(besStreamOptions.buildEventBinaryFile)));

BuildEventArtifactUploader localFileUploader =
besStreamOptions.buildEventBinaryFilePathConversion
? uploaderSupplier.get()
: new LocalFilesArtifactUploader();
bepTransportsBuilder.add(
new BinaryFormatFileTransport(
bepBinaryOutputStream,
bepOptions,
localFileUploader,
artifactGroupNamer,
besStreamOptions.buildEventBinaryFileUploadMode));
bepBinaryOutputStream, bepOptions, localFileUploader, artifactGroupNamer));
} catch (IOException exception) {
// TODO(b/125216340): Consider making this a warning instead of an error once the
// associated bug has been resolved.
Expand All @@ -843,7 +818,8 @@ private ImmutableSet<BuildEventTransport> createBepTransports(
if (!Strings.isNullOrEmpty(besStreamOptions.buildEventJsonFile)) {
try {
BufferedOutputStream bepJsonOutputStream =
buildEventOutputStreamFactory.create(besStreamOptions.buildEventJsonFile);
new BufferedOutputStream(
Files.newOutputStream(Paths.get(besStreamOptions.buildEventJsonFile)));
BuildEventArtifactUploader localFileUploader =
besStreamOptions.buildEventJsonFilePathConversion
? uploaderSupplier.get()
Expand All @@ -854,8 +830,7 @@ private ImmutableSet<BuildEventTransport> createBepTransports(
bepOptions,
localFileUploader,
artifactGroupNamer,
makeJsonTypeRegistry(),
besStreamOptions.buildEventJsonFileUploadMode));
makeJsonTypeRegistry()));
} catch (IOException exception) {
// TODO(b/125216340): Consider making this a warning instead of an error once the
// associated bug has been resolved.
Expand Down Expand Up @@ -902,11 +877,6 @@ protected abstract BuildEventServiceClient getBesClient(

protected abstract Set<String> allowedCommands(OptionsT besOptions);

@VisibleForTesting
void setBuildEventOutputStreamFactory(BuildEventOutputStreamFactory factory) {
this.buildEventOutputStreamFactory = factory;
}

protected ImmutableSet<String> getBesKeywords(
OptionsT besOptions, @Nullable OptionsParsingResult startupOptionsProvider) {
List<String> userKeywords = besOptions.besKeywords;
Expand Down Expand Up @@ -963,9 +933,4 @@ BuildEventArtifactUploader get() throws IOException {
throw exception;
}
}

@VisibleForTesting
interface BuildEventOutputStreamFactory {
BufferedOutputStream create(String file) throws IOException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.eventbus.EventBus;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.lib.buildeventservice.BuildEventServiceOptions.BesUploadMode;
import com.google.devtools.build.lib.buildeventservice.client.BuildEventServiceClient;
import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer;
import com.google.devtools.build.lib.buildeventstream.BuildEvent;
Expand All @@ -38,7 +37,6 @@
public class BuildEventServiceTransport implements BuildEventTransport {
private final BuildEventServiceUploader besUploader;
private final Duration besTimeout;
private final BesUploadMode besUploadMode;

private BuildEventServiceTransport(
BuildEventServiceClient besClient,
Expand All @@ -51,8 +49,7 @@ private BuildEventServiceTransport(
EventBus eventBus,
Duration closeTimeout,
Sleeper sleeper,
Timestamp commandStartTime,
BesUploadMode besUploadMode) {
Timestamp commandStartTime) {
this.besTimeout = closeTimeout;
this.besUploader =
new BuildEventServiceUploader.Builder()
Expand All @@ -67,7 +64,6 @@ private BuildEventServiceTransport(
.eventBus(eventBus)
.commandStartTime(commandStartTime)
.build();
this.besUploadMode = besUploadMode;
}

@Override
Expand Down Expand Up @@ -95,11 +91,6 @@ public boolean mayBeSlow() {
return true;
}

@Override
public BesUploadMode getBesUploadMode() {
return besUploadMode;
}

@Override
public void sendBuildEvent(BuildEvent event) {
besUploader.enqueueEvent(event);
Expand Down Expand Up @@ -197,8 +188,7 @@ public BuildEventServiceTransport build() {
checkNotNull(eventBus),
(besOptions.besTimeout != null) ? besOptions.besTimeout : Duration.ZERO,
sleeper != null ? sleeper : new JavaSleeper(),
checkNotNull(commandStartTime),
besOptions.besUploadMode);
checkNotNull(commandStartTime));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ java_library(
deps = [
"//src/main/java/com/google/devtools/build/lib/actions:artifacts",
"//src/main/java/com/google/devtools/build/lib/actions:file_metadata",
"//src/main/java/com/google/devtools/build/lib/buildeventservice:buildeventservice-options",
"//src/main/java/com/google/devtools/build/lib/buildeventstream/proto:build_event_stream_java_proto",
"//src/main/java/com/google/devtools/build/lib/cmdline",
"//src/main/java/com/google/devtools/build/lib/collect/nestedset",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
// limitations under the License.
package com.google.devtools.build.lib.buildeventstream;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.lib.buildeventservice.BuildEventServiceOptions.BesUploadMode;
import java.time.Duration;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
Expand All @@ -24,12 +24,14 @@
*
* <p>All implementations need to be thread-safe. All methods are expected to return quickly.
*
* <p>Notice that this interface does not provide any error handling API. A transport may choose to
* log interesting errors to the command line and/or abort the whole build.
* <p>Notice that this interface does not provide any error handling API. A transport may choose
* to log interesting errors to the command line and/or abort the whole build.
*/
@ThreadSafe
public interface BuildEventTransport {
/** The name of this transport as can be displayed to a user. */
/**
* The name of this transport as can be displayed to a user.
*/
String name();

/**
Expand Down Expand Up @@ -85,9 +87,7 @@ default Duration getTimeout() {
*/
boolean mayBeSlow();

/** Returns the desired {@link BesUploadMode} for the transport. */
BesUploadMode getBesUploadMode();

@VisibleForTesting
@Nullable
BuildEventArtifactUploader getUploader();
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ java_library(
name = "transports",
srcs = glob(["*.java"]),
deps = [
"//src/main/java/com/google/devtools/build/lib/buildeventservice:buildeventservice-options",
"//src/main/java/com/google/devtools/build/lib/buildeventstream",
"//src/main/java/com/google/devtools/build/lib/buildeventstream/proto:build_event_stream_java_proto",
"//src/main/java/com/google/devtools/build/lib/util:abrupt_exit_exception",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

package com.google.devtools.build.lib.buildeventstream.transports;

import com.google.devtools.build.lib.buildeventservice.BuildEventServiceOptions.BesUploadMode;
import com.google.devtools.build.lib.buildeventstream.ArtifactGroupNamer;
import com.google.devtools.build.lib.buildeventstream.BuildEvent;
import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader;
Expand All @@ -35,9 +34,8 @@ public BinaryFormatFileTransport(
BufferedOutputStream outputStream,
BuildEventProtocolOptions options,
BuildEventArtifactUploader uploader,
ArtifactGroupNamer namer,
BesUploadMode besUploadMode) {
super(outputStream, options, uploader, namer, besUploadMode);
ArtifactGroupNamer namer) {
super(outputStream, options, uploader, namer);
}

@Override
Expand Down
Loading

0 comments on commit af4d29e

Please sign in to comment.