Skip to content

Commit

Permalink
Merge pull request #294 from jglick/OutputStreamTaskListener-JENKINS-…
Browse files Browse the repository at this point in the history
…52165

Define `OutputStreamTaskListener` & close `BufferedBuildListener`
  • Loading branch information
jglick authored Jul 28, 2023
2 parents 4edc8b4 + 12f6b09 commit 4b91043
Show file tree
Hide file tree
Showing 4 changed files with 215 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,40 +24,40 @@

package org.jenkinsci.plugins.workflow.log;

import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import hudson.CloseProofOutputStream;
import hudson.model.BuildListener;
import hudson.remoting.Channel;
import hudson.remoting.ChannelClosedException;
import hudson.remoting.RemoteOutputStream;
import hudson.util.StreamTaskListener;
import java.io.Closeable;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.util.logging.Logger;
import org.jenkinsci.remoting.SerializableOnlyOverRemoting;

/**
* Unlike {@link StreamTaskListener} this does not set {@code autoflush} on the reconstructed {@link PrintStream}.
* It also wraps on the remote side in {@link DelayBufferedOutputStream}.
*/
final class BufferedBuildListener implements BuildListener, Closeable, SerializableOnlyOverRemoting {
final class BufferedBuildListener extends OutputStreamTaskListener.Default implements BuildListener, Closeable, SerializableOnlyOverRemoting {

private static final Logger LOGGER = Logger.getLogger(BufferedBuildListener.class.getName());

private final OutputStream out;
@SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "using Replacement anyway, fields here are irrelevant")
private final PrintStream ps;

BufferedBuildListener(OutputStream out) throws IOException {
BufferedBuildListener(OutputStream out) {
this.out = out;
ps = new PrintStream(out, false, "UTF-8");
}

@NonNull
@Override public PrintStream getLogger() {
return ps;
@Override public OutputStream getOutputStream() {
return out;
}

@Override public void close() throws IOException {
ps.close();
getLogger().close();
}

private Object writeReplace() {
Expand All @@ -75,8 +75,63 @@ private static final class Replacement implements SerializableOnlyOverRemoting {
this.ros = new RemoteOutputStream(new CloseProofOutputStream(cbl.out));
}

private Object readResolve() throws IOException {
return new BufferedBuildListener(new GCFlushedOutputStream(new DelayBufferedOutputStream(ros, tuning)));
private Object readResolve() {
var cos = new CloseableOutputStream(new GCFlushedOutputStream(new DelayBufferedOutputStream(ros, tuning)));
Channel.currentOrFail().addListener(new Channel.Listener() {
@Override public void onClosed(Channel channel, IOException cause) {
LOGGER.fine(() -> "closing " + channel.getName());
cos.close(channel, cause);
}
});
return new BufferedBuildListener(cos);
}

}

/**
* Output stream which throws {@link ChannelClosedException} when appropriate.
* Otherwise callers could continue trying to write to {@link DelayBufferedOutputStream}
* long after {@link Channel#isClosingOrClosed} without errors.
* In the case of {@code org.jenkinsci.plugins.durabletask.Handler.output},
* this is actively harmful since it would mean that writes apparently succeed
* and {@code last-location.txt} would move forward even though output was lost.
*/
private static final class CloseableOutputStream extends FilterOutputStream {

/** non-null if closed */
private Channel channel;
/** optional close cause */
private IOException cause;

CloseableOutputStream(OutputStream delegate) {
super(delegate);
}

void close(Channel channel, IOException cause) {
this.channel = channel;
this.cause = cause;
// Do not call close(): ProxyOutputStream.doClose would just throw ChannelClosedException: …: channel is already closed
}

private void checkClosed() throws IOException {
if (channel != null) {
throw new ChannelClosedException(channel, cause);
}
LOGGER.finer("not closed yet");
}

@Override public void write(int b) throws IOException {
checkClosed();
out.write(b);
}

@Override public void write(byte[] b, int off, int len) throws IOException {
checkClosed();
out.write(b, off, len);
}

@Override public String toString() {
return "CloseableOutputStream[" + out + "]";
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@
import java.util.logging.Logger;

import edu.umd.cs.findbugs.annotations.NonNull;
import hudson.remoting.ChannelClosedException;
import java.io.EOFException;
import java.nio.channels.ClosedChannelException;
import java.util.stream.Stream;
import jenkins.util.Timer;

/**
Expand All @@ -58,6 +62,21 @@ final class GCFlushedOutputStream extends FilterOutputStream {
return "GCFlushedOutputStream[" + out + "]";
}

// TODO https://github.com/jenkinsci/remoting/pull/657
private static boolean isClosedChannelException(Throwable t) {
if (t instanceof ClosedChannelException) {
return true;
} else if (t instanceof ChannelClosedException) {
return true;
} else if (t instanceof EOFException) {
return true;
} else if (t == null) {
return false;
} else {
return isClosedChannelException(t.getCause()) || Stream.of(t.getSuppressed()).anyMatch(GCFlushedOutputStream::isClosedChannelException);
}
}

/**
* Flushes streams prior to garbage collection.
* ({@link BufferedOutputStream} does not do this automatically.)
Expand All @@ -78,7 +97,7 @@ private static final class FlushRef extends PhantomReference<GCFlushedOutputStre
try {
ref.out.flush();
} catch (IOException x) {
LOGGER.log(Level.WARNING, null, x);
LOGGER.log(isClosedChannelException(x) ? Level.FINE : Level.WARNING, null, x);
}
}
}, 0, 10, TimeUnit.SECONDS);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* The MIT License
*
* Copyright 2023 CloudBees, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/

package org.jenkinsci.plugins.workflow.log;

import edu.umd.cs.findbugs.annotations.NonNull;
import hudson.model.TaskListener;
import java.io.FilterOutputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.lang.reflect.Field;
import java.lang.reflect.InaccessibleObjectException;
import java.nio.charset.StandardCharsets;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.io.output.ClosedOutputStream;
import org.kohsuke.accmod.Restricted;
import org.kohsuke.accmod.restrictions.Beta;

/**
* {@link TaskListener} which can directly return an {@link OutputStream} not wrapped in a {@link PrintStream}.
* This is important for logging since the error-swallowing behavior of {@link PrintStream} is unwanted,
* and {@link PrintStream#checkError} is useless.
*/
@Restricted(Beta.class)
public interface OutputStreamTaskListener extends TaskListener {

/**
* Returns the {@link OutputStream} from which {@link #getLogger} was constructed.
*/
@NonNull OutputStream getOutputStream();

/**
* Tries to call {@link #getOutputStream} and otherwise falls back to reflective access to {@link PrintStream#out} when possible, at worst returning the {@link PrintStream} itself.
*/
static @NonNull OutputStream getOutputStream(@NonNull TaskListener listener) {
if (listener instanceof OutputStreamTaskListener) {
return ((OutputStreamTaskListener) listener).getOutputStream();
}
PrintStream ps = listener.getLogger();
if (ps.getClass() != PrintStream.class) {
Logger.getLogger(OutputStreamTaskListener.class.getName()).warning(() -> "Unexpected PrintStream subclass " + ps.getClass().getName() + " which might override write(…); error handling is degraded unless OutputStreamTaskListener is used: " + listener.getClass().getName());
return ps;
}
if (Runtime.version().compareToIgnoreOptional(Runtime.Version.parse("17")) >= 0) {
Logger.getLogger(OutputStreamTaskListener.class.getName()).warning(() -> "On Java 17+ error handling is degraded unless OutputStreamTaskListener is used: " + listener.getClass().getName());
return ps;
}
Field printStreamDelegate;
try {
printStreamDelegate = FilterOutputStream.class.getDeclaredField("out");
} catch (NoSuchFieldException x) {
Logger.getLogger(OutputStreamTaskListener.class.getName()).log(Level.WARNING, "PrintStream.out defined in Java Platform and protected, so should not happen.", x);
return ps;
}
try {
printStreamDelegate.setAccessible(true);
} catch (InaccessibleObjectException x) {
Logger.getLogger(OutputStreamTaskListener.class.getName()).warning(() -> "Using --illegal-access=deny? Error handling is degraded unless OutputStreamTaskListener is used: " + listener.getClass().getName());
return ps;
}
OutputStream os;
try {
os = (OutputStream) printStreamDelegate.get(ps);
} catch (IllegalAccessException x) {
Logger.getLogger(OutputStreamTaskListener.class.getName()).log(Level.WARNING, "Unexpected failure to access PrintStream.out", x);
return ps;
}
if (os == null) {
// like PrintStream.ensureOpen
return ClosedOutputStream.CLOSED_OUTPUT_STREAM;
}
return os;
}

/**
* Convenience implementation handling {@link #getLogger}.
*/
abstract class Default implements OutputStreamTaskListener {

private transient PrintStream ps;

@Override public synchronized PrintStream getLogger() {
if (ps == null) {
ps = new PrintStream(getOutputStream(), false, StandardCharsets.UTF_8);
}
return ps;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -237,7 +236,7 @@ private static OutputStream decorateAll(OutputStream base, List<TaskListenerDeco
return base;
}

private static final class DecoratedTaskListener implements BuildListener {
private static final class DecoratedTaskListener extends OutputStreamTaskListener.Default implements BuildListener {

private static final long serialVersionUID = 1;

Expand All @@ -253,7 +252,7 @@ private static final class DecoratedTaskListener implements BuildListener {
*/
private final @NonNull List<TaskListenerDecorator> decorators;

private transient PrintStream logger;
private transient OutputStream out;

DecoratedTaskListener(@NonNull TaskListener delegate, @NonNull List<TaskListenerDecorator> decorators) {
this.delegate = delegate;
Expand All @@ -263,15 +262,11 @@ private static final class DecoratedTaskListener implements BuildListener {
}

@NonNull
@Override public PrintStream getLogger() {
if (logger == null) {
try {
logger = new PrintStream(decorateAll(delegate.getLogger(), decorators), false, "UTF-8");
} catch (UnsupportedEncodingException x) {
throw new AssertionError(x);
}
@Override public OutputStream getOutputStream() {
if (out == null) {
out = decorateAll(OutputStreamTaskListener.getOutputStream(delegate), decorators);
}
return logger;
return out;
}

@Override public String toString() {
Expand All @@ -280,7 +275,7 @@ private static final class DecoratedTaskListener implements BuildListener {

}

private static final class CloseableTaskListener implements BuildListener, AutoCloseable {
private static final class CloseableTaskListener implements BuildListener, AutoCloseable, OutputStreamTaskListener {

static BuildListener of(BuildListener mainDelegate, TaskListener closeDelegate) {
if (closeDelegate instanceof AutoCloseable) {
Expand All @@ -301,6 +296,12 @@ private CloseableTaskListener(@NonNull TaskListener mainDelegate, @NonNull TaskL
assert closeDelegate instanceof AutoCloseable;
}

@NonNull
@Override
public OutputStream getOutputStream() {
return OutputStreamTaskListener.getOutputStream(mainDelegate);
}

@NonNull
@Override public PrintStream getLogger() {
return mainDelegate.getLogger();
Expand Down

0 comments on commit 4b91043

Please sign in to comment.