Skip to content

Commit

Permalink
fix #3856: removing the PodUploadWebSocketListener
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins committed May 2, 2022
1 parent 2b735ba commit eeb5c2b
Show file tree
Hide file tree
Showing 12 changed files with 375 additions and 625 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.nio.channels.WritableByteChannel;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class JdkWebSocketImpl implements WebSocket {
Expand Down Expand Up @@ -177,6 +178,8 @@ private boolean asBoolean(CompletableFuture<java.net.http.WebSocket> cf) {
@Override
public boolean sendClose(int code, String reason) {
CompletableFuture<java.net.http.WebSocket> cf = webSocket.sendClose(code, reason == null ? "Closing" : reason);
// matches the behavior of the okhttp implementation and will ensure input closure after 1 minute
cf.thenRunAsync(() -> webSocket.abort(), CompletableFuture.delayedExecutor(1, TimeUnit.MINUTES));
return asBoolean(cf);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,40 @@

public interface ExecWatch extends Closeable {

/**
* Gets the {@link OutputStream} for stdIn if one is associated
*
* @return the stdIn stream
*/
OutputStream getInput();

/**
* Gets the {@link InputStream} for stdOut if one is associated
*
* @return the stdErr stream
*/
InputStream getOutput();

/**
* Gets the {@link InputStream} for stdErr if one is associated
*
* @return the stdErr stream
*/
InputStream getError();

/**
* Gets the {@link InputStream} associated with channel 3, which
* returns the final Status containing the exit code, which
* could indicate abnormal termination.
* <p>
* See also {@link #exitCode()}
*
* @return the channel 3 stream
*/
InputStream getErrorChannel();

/**
* Close the Watch.
* Gracefully close the Watch.
*/
@Override
void close();
Expand All @@ -41,7 +65,7 @@ public interface ExecWatch extends Closeable {
/**
* Get a future that will be completed with the exit code.
* <p>
* Will be -1 if the exit code can't be determined.
* Will be -1 if the exit code can't be determined from the status, or null if close is received before the exit code.
* <p>
* Can be used as an alternative to
* {@link ExecListener#onFailure(Throwable, io.fabric8.kubernetes.client.dsl.ExecListener.Response)}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,10 @@ interface Builder extends BasicBuilder {
boolean send(ByteBuffer buffer);

/**
* Send a close message
* Send a close message. If successful, the output side
* will then be closed. After a timeout the input side will
* automatically be shutdown if it isn't already shutdown by
* the remote side.
*
* @return true if the message was successfully enqueued.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -71,6 +72,8 @@ public class ExecWebSocketListener implements ExecWatch, AutoCloseable, WebSocke
static final String REASON_NON_ZERO_EXIT_CODE = "NonZeroExitCode";
static final String STATUS_SUCCESS = "Success";

private static final long MAX_QUEUE_SIZE = 16 * 1024 * 1024L;

private final class SimpleResponse implements Response {
private final HttpResponse<?> response;

Expand Down Expand Up @@ -98,11 +101,13 @@ public String body() throws IOException {
private final OutputStream err;
private final OutputStream errChannel;

private final PipedOutputStream input;
private final OutputStream input;
private final PipedInputStream output;
private final PipedInputStream error;
private final PipedInputStream errorChannel;

private final boolean forUpload;

private final AtomicReference<WebSocket> webSocketRef = new AtomicReference<>();
private final ExecutorService executorService = Executors.newSingleThreadExecutor();

Expand All @@ -114,35 +119,36 @@ public String body() throws IOException {

private final CompletableFuture<Integer> exitCode = new CompletableFuture<>();

private ObjectMapper objectMapper;

public ExecWebSocketListener(InputStream in, OutputStream out, OutputStream err, OutputStream errChannel,
PipedOutputStream inputPipe, PipedInputStream outputPipe, PipedInputStream errorPipe, PipedInputStream errorChannelPipe,
ExecListener listener, Integer bufferSize) {
this.listener = listener;
this.in = inputStreamOrPipe(in, inputPipe, toClose, bufferSize);
this.out = outputStreamOrPipe(out, outputPipe, toClose);
this.err = outputStreamOrPipe(err, errorPipe, toClose);
this.errChannel = outputStreamOrPipe(errChannel, errorChannelPipe, toClose);

this.input = inputPipe;
private ObjectMapper objectMapper = new ObjectMapper();

public ExecWebSocketListener(PodOperationContext context) {
this.listener = context.getExecListener();
this.forUpload = context.isForUpload();
PipedOutputStream inputPipe = context.getInPipe();
PipedInputStream outputPipe = context.getOutPipe();
PipedInputStream errorPipe = context.getErrPipe();
PipedInputStream errorChannelPipe = context.getErrChannelPipe();
this.in = inputStreamOrPipe(context.getIn(), inputPipe, toClose, context.getBufferSize());
this.out = outputStreamOrPipe(context.getOut(), outputPipe, toClose);
this.err = outputStreamOrPipe(context.getErr(), errorPipe, toClose);
this.errChannel = outputStreamOrPipe(context.getErrChannel(), errorChannelPipe, toClose);

if (inputPipe == null && in == null && forUpload) {
// if there's no explicit in, then we create an OutputStream that writes directly to send
this.input = InputStreamPumper.writableOutputStream(this::sendWithErrorChecking);
} else {
this.input = inputPipe;
}
this.output = outputPipe;
this.error = errorPipe;
this.errorChannel = errorChannelPipe;
this.objectMapper = new ObjectMapper();
}

@Override
public void close() {
close(1000, "Closing...");
}

private void close(int code, String reason) {
if (!exitCode.isDone()) {
exitCode.completeExceptionally(new KubernetesClientException("Closed before exit code recieved"));
}
closeWebSocketOnce(code, reason);
onClosed(code, reason);
// simply sends a close, which will shut down the output
// it's expected that the server will respond with a close, but if not the input will be shutdown implicitly
closeWebSocketOnce(1000, "Closing...");
}

/**
Expand Down Expand Up @@ -176,8 +182,8 @@ private void closeWebSocketOnce(int code, String reason) {
@Override
public void onOpen(WebSocket webSocket) {
try {
if (in instanceof PipedInputStream && input != null) {
input.connect((PipedInputStream) in);
if (in instanceof PipedInputStream && input instanceof PipedOutputStream) {
((PipedOutputStream) input).connect((PipedInputStream) in);
}
if (out instanceof PipedOutputStream && output != null) {
output.connect((PipedOutputStream) out);
Expand Down Expand Up @@ -208,7 +214,7 @@ public void onOpen(WebSocket webSocket) {
public void onError(WebSocket webSocket, Throwable t) {

//If we already called onClosed() or onFailed() before, we need to abort.
if (closed.compareAndSet(false, true)) {
if (!closed.compareAndSet(false, true)) {
//We are not going to notify the listener, sicne we've already called onClose(), so let's log a debug/warning.
if (LOGGER.isWarnEnabled()) {
LOGGER.warn("Received [{}], with message:[{}] after ExecWebSocketListener is closed, Ignoring.",
Expand All @@ -224,7 +230,6 @@ public void onError(WebSocket webSocket, Throwable t) {
}
Status status = OperationSupport.createStatus(response);
status.setMessage(t.getMessage());
LOGGER.error("Exec Failure", t);
cleanUpOnce();
} finally {
try {
Expand All @@ -234,6 +239,8 @@ public void onError(WebSocket webSocket, Throwable t) {
execResponse = new SimpleResponse(response);
}
listener.onFailure(t, execResponse);
} else {
LOGGER.error("Exec Failure", t);
}
} finally {
exitCode.completeExceptionally(t);
Expand All @@ -256,6 +263,11 @@ public void onMessage(WebSocket webSocket, ByteBuffer bytes) {
break;
case 2:
writeAndFlush(err, byteString);
if (forUpload) {
String stringValue = StandardCharsets.UTF_8.decode(bytes).toString();
exitCode.completeExceptionally(new KubernetesClientException(stringValue));
this.close();
}
break;
case 3:
handleExitStatus(bytes);
Expand Down Expand Up @@ -304,10 +316,10 @@ private void writeAndFlush(OutputStream stream, ByteBuffer byteString) throws IO

@Override
public void onClose(WebSocket webSocket, int code, String reason) {
ExecWebSocketListener.this.close(code, reason);
}

private void onClosed(int code, String reason) {
if (!exitCode.isDone()) {
exitCode.complete(null);
}
closeWebSocketOnce(code, reason);
//If we already called onClosed() or onFailed() before, we need to abort.
if (!closed.compareAndSet(false, true)) {
return;
Expand Down Expand Up @@ -360,12 +372,13 @@ public void resize(int cols, int rows) {

private void send(byte[] bytes, int offset, int length, byte flag) {
if (length > 0) {
waitForQueue(length);
WebSocket ws = webSocketRef.get();
if (ws != null) {
byte[] toSend = new byte[length + 1];
toSend[0] = flag;
System.arraycopy(bytes, offset, toSend, 1, length);
ws.send(ByteBuffer.wrap(toSend));
byte[] toSend = new byte[length + 1];
toSend[0] = flag;
System.arraycopy(bytes, offset, toSend, 1, length);
if (!ws.send(ByteBuffer.wrap(toSend))) {
this.exitCode.completeExceptionally(new IOException("could not send"));
}
}
}
Expand All @@ -374,6 +387,12 @@ private void send(byte[] bytes, int offset, int length) {
send(bytes, offset, length, (byte) 0);
}

void sendWithErrorChecking(byte[] bytes, int offset, int length) {
checkError();
send(bytes, offset, length);
checkError();
}

private static InputStream inputStreamOrPipe(InputStream stream, PipedOutputStream out, Set<Closeable> toClose,
Integer bufferSize) {
if (stream != null) {
Expand Down Expand Up @@ -426,4 +445,25 @@ public CompletableFuture<Integer> exitCode() {
return exitCode;
}

final void waitForQueue(int length) {
try {
while (webSocketRef.get().queueSize() + length > MAX_QUEUE_SIZE && !Thread.interrupted()) {
checkError();
Thread.sleep(50L);
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}

final void checkError() {
if (exitCode.isDone()) {
try {
exitCode.getNow(null);
} catch (CompletionException e) {
throw KubernetesClientException.launderThrowable(e.getCause());
}
}
}

}
Loading

0 comments on commit eeb5c2b

Please sign in to comment.