Skip to content

Commit

Permalink
Write to BEP on async crashes / OOMs, selecting the appropriate abort…
Browse files Browse the repository at this point in the history
… reason.

Get rid of the #closeNow() method, which wasn't really being used: It was only called when the Blaze server was being shutdown and the previous set of transports should have already been closed.

RELNOTES: None
PiperOrigin-RevId: 227164858
  • Loading branch information
ericfelly authored and Copybara-Service committed Dec 28, 2018
1 parent f65239f commit 21c2582
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.devtools.build.lib.buildeventstream.BuildEventArtifactUploader;
import com.google.devtools.build.lib.buildeventstream.BuildEventProtocolOptions;
import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos;
import com.google.devtools.build.lib.buildeventstream.BuildEventStreamProtos.Aborted.AbortReason;
import com.google.devtools.build.lib.buildeventstream.BuildEventTransport;
import com.google.devtools.build.lib.buildeventstream.LargeBuildEventSerializedEvent;
import com.google.devtools.build.lib.buildeventstream.transports.BuildEventStreamOptions;
Expand Down Expand Up @@ -61,8 +62,7 @@ public abstract class BuildEventServiceModule<T extends BuildEventServiceOptions
private static final Logger logger = Logger.getLogger(BuildEventServiceModule.class.getName());

private OutErr outErr;

private Set<BuildEventTransport> transports = ImmutableSet.of();
private BuildEventStreamer streamer;

/** Whether an error in the Build Event Service upload causes the build to fail. */
protected boolean errorsShouldFailTheBuild() {
Expand Down Expand Up @@ -95,7 +95,7 @@ public void beforeCommand(CommandEnvironment commandEnvironment) {
return;
}

BuildEventStreamer streamer = tryCreateStreamer(commandEnvironment);
streamer = tryCreateStreamer(commandEnvironment);
if (streamer != null) {
commandEnvironment.getReporter().addHandler(streamer);
commandEnvironment.getEventBus().register(streamer);
Expand Down Expand Up @@ -128,10 +128,17 @@ public OutErr getOutputListener() {
return outErr;
}

@Override
public void blazeShutdownOnCrash() {
if (streamer != null) {
streamer.close(AbortReason.INTERNAL);
}
}

@Override
public void afterCommand() {
this.outErr = null;
this.transports = ImmutableSet.of();
this.streamer = null;
}

/** Returns {@code null} if no stream could be created. */
Expand Down Expand Up @@ -161,7 +168,7 @@ BuildEventStreamer tryCreateStreamer(CommandEnvironment env) {
transportsBuilder.add(besTransport);
}

transports = transportsBuilder.build();
ImmutableSet<BuildEventTransport> transports = transportsBuilder.build();
if (!transports.isEmpty()) {
BuildEventStreamOptions buildEventStreamOptions =
env.getOptions().getOptions(BuildEventStreamOptions.class);
Expand Down Expand Up @@ -263,13 +270,6 @@ private BuildEventTransport tryCreateBesTransport(CommandEnvironment env)
}
}

@Override
public void blazeShutdown() {
for (BuildEventTransport transport : transports) {
transport.closeNow();
}
}

protected abstract Class<T> optionsClass();

protected abstract BuildEventServiceClient createBesClient(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import com.google.devtools.build.lib.buildeventstream.PathConverter;
import com.google.devtools.build.lib.clock.Clock;
import com.google.devtools.build.lib.util.JavaSleeper;
import com.google.devtools.build.lib.util.LoggingUtil;
import com.google.devtools.build.lib.util.Sleeper;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.v1.BuildStatus.Result;
Expand Down Expand Up @@ -161,11 +162,6 @@ public ListenableFuture<Void> close() {
return closeFuture;
}

@Override
public void closeNow() {
besUploader.closeNow(/*causedByTimeout=*/ false);
}

@Override
public String name() {
return "Build Event Service";
Expand Down Expand Up @@ -347,14 +343,14 @@ public ListenableFuture<Void> close() {
}

/** Stops the upload immediately. Enqueued events that have not been sent yet will be lost. */
public void closeNow(boolean causedByTimeout) {
public void closeOnTimeout() {
synchronized (lock) {
if (uploadThread != null) {
if (uploadThread.isInterrupted()) {
return;
}

interruptCausedByTimeout = causedByTimeout;
interruptCausedByTimeout = true;
uploadThread.interrupt();
}
}
Expand Down Expand Up @@ -670,15 +666,16 @@ private void startCloseTimer(ListenableFuture<Void> closeFuture, Duration closeT
Thread closeTimer =
new Thread(
() -> {
// Call closeNow() if the future does not complete within closeTimeout
// Call closeOnTimeout() if the future does not complete within closeTimeout
try {
closeFuture.get(closeTimeout.toMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException | TimeoutException e) {
closeNow(/*causedByTimeout=*/ true);
closeOnTimeout();
} catch (ExecutionException e) {
// Intentionally left empty, because this code only cares about
// calling closeNow() if the closeFuture does not complete within
// closeTimeout.
// This code only cares about calling closeOnTimeout() if the closeFuture does not
// complete within closeTimeout.
logError(e, "closeTimeout failure");
LoggingUtil.logToRemote(Level.SEVERE, "closeTimeout failure", e);
}
},
"bes-uploader-close-timer");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,4 @@ public interface BuildEventTransport {
* <p>This method should not throw any exceptions.
*/
ListenableFuture<Void> close();

/**
* Similar to {@link #close()}. Instructs the transport to close as soon as possible even if
* some build events will be lost.
*
* <p>This method might be called multiple times without any effect after the first call.
*
* <p>This method should not throw any exceptions.
*/
void closeNow();
}
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,6 @@ public ListenableFuture<Void> close() {
return writer.close();
}

@Override
public synchronized void closeNow() {
writer.closeNow();
}

/**
* Converts the given event into a proto object; this may trigger uploading of referenced files as
* a side effect. May return {@code null} if there was an interrupt. This method is not
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
Expand Down Expand Up @@ -106,13 +107,13 @@ public class BuildEventStreamer implements EventHandler {
private int progressCount;
private final CountingArtifactGroupNamer artifactGroupNamer = new CountingArtifactGroupNamer();
private OutErrProvider outErrProvider;
private AbortReason abortReason = AbortReason.UNKNOWN;
private volatile AbortReason abortReason = AbortReason.UNKNOWN;
// Will be set to true if the build was invoked through "bazel test" or "bazel coverage".
private boolean isTestCommand;

// After a BuildCompetingEvent we might expect a whitelisted set of events. If non-null,
// the streamer is restricted to only allow those events and fully close after having seen
// them.
// After #buildComplete is called, contains the set of events that the streamer is expected to
// process. The streamer will fully close after seeing them. This field is null until
// #buildComplete is called.
private Set<BuildEventId> finalEventsToCome = null;

// True, if we already closed the stream.
Expand Down Expand Up @@ -358,18 +359,31 @@ private ScheduledFuture<?> bepUploadWaitEvent(ScheduledExecutorService executor)
TimeUnit.SECONDS);
}

public boolean isClosed() {
public synchronized boolean isClosed() {
return closed;
}

private void close() {
public void close() {
close(null);
}

public void close(@Nullable AbortReason reason) {
synchronized (this) {
if (closed) {
return;
}
closed = true;
if (reason != null) {
abortReason = reason;
}

if (finalEventsToCome == null) {
// This should only happen if there's a crash. Try to clean up as best we can.
clearEventsAndPostFinalProgress(null);
}
}


ScheduledExecutorService executor = null;
try {
executor = Executors.newSingleThreadScheduledExecutor(
Expand Down Expand Up @@ -567,7 +581,7 @@ public ImmutableSet<BuildEventTransport> getTransports() {
return ImmutableSet.copyOf(transports);
}

private void buildComplete(ChainableEvent event) {
private synchronized void clearEventsAndPostFinalProgress(ChainableEvent event) {
clearPendingEvents();
String out = null;
String err = null;
Expand All @@ -576,11 +590,14 @@ private void buildComplete(ChainableEvent event) {
err = outErrProvider.getErr();
}
post(ProgressEvent.finalProgressUpdate(progressCount, out, err));
clearAnnouncedEvents(event.getChildrenEvents());
clearAnnouncedEvents(event == null ? ImmutableList.of() : event.getChildrenEvents());
}

private synchronized void buildComplete(ChainableEvent event) {
clearEventsAndPostFinalProgress(event);

finalEventsToCome = new HashSet<>(announcedEvents);
finalEventsToCome.removeAll(postedEvents);

if (finalEventsToCome.isEmpty()) {
close();
}
Expand Down Expand Up @@ -624,10 +641,7 @@ private boolean shouldPublishActionExecutedEvent(ActionExecutedEvent event) {
// Publish failed actions
return true;
}
if (event.getAction() instanceof ExtraAction) {
return true;
}
return false;
return (event.getAction() instanceof ExtraAction);
}

private boolean bufferUntilPrerequisitesReceived(BuildEvent event) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,6 @@ public ListenableFuture<Void> close() {
return Futures.immediateFuture(null);
}

@Override
public void closeNow() {
}

List<BuildEvent> getEvents() {
return events;
}
Expand Down Expand Up @@ -484,7 +480,7 @@ public void testReodering() {
}

@Test
public void testMissingPrerequisits() {
public void testMissingPrerequisites() {
// Verify that an event where the prerequisite is never coming till the end of
// the build still gets posted, with the prerequisite aborted.

Expand Down Expand Up @@ -637,6 +633,45 @@ public void testStdoutReported() {
verify(outErr, times(1)).getErr();
}

@Test
public void testStdoutReportedAfterCrash() {
// Verify that stdout and stderr are reported in the build-event stream on progress
// events.
RecordingBuildEventTransport transport = new RecordingBuildEventTransport();
BuildEventStreamer streamer =
new BuildEventStreamer(ImmutableSet.<BuildEventTransport>of(transport), reporter);
BuildEventStreamer.OutErrProvider outErr =
Mockito.mock(BuildEventStreamer.OutErrProvider.class);
String stdoutMsg = "Some text that was written to stdout.";
String stderrMsg = "The UI text that bazel wrote to stderr.";
when(outErr.getOut()).thenReturn(stdoutMsg);
when(outErr.getErr()).thenReturn(stderrMsg);
BuildEvent startEvent =
new GenericBuildEvent(
testId("Initial"),
ImmutableSet.<BuildEventId>of(ProgressEvent.INITIAL_PROGRESS_UPDATE));

streamer.registerOutErrProvider(outErr);
streamer.buildEvent(startEvent);
// Simulate a crash with an abrupt call to #close().
streamer.close();
assertThat(streamer.isClosed()).isTrue();

List<BuildEvent> eventsSeen = transport.getEvents();
assertThat(eventsSeen).hasSize(2);
assertThat(eventsSeen.get(0).getEventId()).isEqualTo(startEvent.getEventId());
BuildEvent linkEvent = eventsSeen.get(1);
BuildEventStreamProtos.BuildEvent linkEventProto = transport.getEventProtos().get(1);
assertThat(linkEvent.getEventId()).isEqualTo(ProgressEvent.INITIAL_PROGRESS_UPDATE);
assertThat(linkEventProto.getProgress().getStdout()).isEqualTo(stdoutMsg);
assertThat(linkEventProto.getProgress().getStderr()).isEqualTo(stderrMsg);

// As there is only one progress event, the OutErrProvider should be queried
// only once for stdout and stderr.
verify(outErr, times(1)).getOut();
verify(outErr, times(1)).getErr();
}

@Test
public void testReportedConfigurations() throws Exception {
// Verify that configuration events are posted, but only once.
Expand Down

0 comments on commit 21c2582

Please sign in to comment.