Skip to content

Commit

Permalink
fix #4112: cleaning up exec streams
Browse files Browse the repository at this point in the history
removing usage of piped streams
adding backpressure support
adding javadocs to stream methods
exposing terminate on error
also updating the exec examples
  • Loading branch information
shawkins committed May 5, 2022
1 parent eeb5c2b commit 1d65573
Show file tree
Hide file tree
Showing 33 changed files with 926 additions and 600 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ And Store.getKey can be used rather than directly referencing static Cache funct
There is also client.resourceList(...).resources() and client.configMaps().resources() - that will provide a Resource stream.
This allows you to implement composite operations easily with lambda: client.secrets().resources().forEach(r -> r.delete());
* Fix #3472 #3587: Allowing for customization of the Informer store/cache key function and how state is stored. See BasicItemStore and ReducedStateItemStore and the SharedIndexInformer.itemStore function.
* Fix #4112: Added TtyExecErrorable.terminateOnError to produce an exceptional outcome to the exitCode when a message is seen on stdErr.
* Fix #3854: Camel-K: Missing method for manipulating KameletBindings

#### _**Note**_: Breaking changes in the API
Expand Down
17 changes: 14 additions & 3 deletions doc/MIGRATION-v6.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
- [DSL Interface Changes](#dsl-interface-changes)
- [evict Changes](#evict-changes)
- [Delete Behavior](#delete-behavior)
- [Stream Changes](#stream-changes)

## Backwards Compatibility Interceptor

Expand Down Expand Up @@ -259,7 +260,17 @@ Evictable.evict will throw an exception rather than returning false if the pod i

Deleting a collection is now implemented using a single delete call, rather than for each item. When the collection is namespaced and inAnyNamespace is used, in which case a call will be made to first determine the affected namespaces, and then a collection delete issued against each namespace.

The result of the delete calls will be a List of StatusDetails rather than a boolean value. A best
effort is made to process the response from the server to populate which items are deleted. This information is generally useful if you wish to implement some kind of blocking delete behavior - that is ensure the returned resources based upon a matching uid have been deleted.
The result of the delete calls will be a List of StatusDetails rather than a boolean value. A best effort is made to process the response from the server to populate which items are deleted. This information is generally useful if you wish to implement some kind of blocking delete behavior - that is ensure the returned resources based upon a matching uid have been deleted.

/ list will always return true and 404s on individual items will simply be ignored.
delete(List<T>) and delete(T[]) returning boolean have been deprecated. They will always return true and 404s on individual items will simply be ignored.

## Stream Changes

The usage of Piped streams is no longer supported - they make assumptions about reading and writing threads, which the client no longer honors. They should not be passed into
the methods accepting InputStreams and OutputStreams.

ContainerResource.writingInput(PipedOutputStream in) and readingXXX(PipedInputStream out) have been removed - use the redirecting methods instead.

TtyExecErrorChannelable methods have been deprecated in favor of ExecWatch.exitCode and ExecListener.onExit.

ContainerResource.readingInput(InputStream in) has been deprecated - use redirectingInput instead.
Original file line number Diff line number Diff line change
Expand Up @@ -106,23 +106,25 @@ public CompletionStage<?> onBinary(java.net.http.WebSocket webSocket, ByteBuffer
} catch (IOException e) {
throw new RuntimeException(e);
}
webSocket.request(1);
if (last) {
ByteBuffer value = ByteBuffer.wrap(byteArrayOutputStream.toByteArray());
byteArrayOutputStream.reset();
listener.onMessage(new JdkWebSocketImpl(queueSize, webSocket), value);
} else {
webSocket.request(1);
}
return null;
}

@Override
public CompletionStage<?> onText(java.net.http.WebSocket webSocket, CharSequence data, boolean last) {
stringBuilder.append(data);
webSocket.request(1);
if (last) {
String value = stringBuilder.toString();
stringBuilder.setLength(0);
listener.onMessage(new JdkWebSocketImpl(queueSize, webSocket), value);
} else {
webSocket.request(1);
}
return null;
}
Expand Down Expand Up @@ -188,4 +190,9 @@ public long queueSize() {
return queueSize.get();
}

@Override
public void request() {
this.webSocket.request(1);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.fabric8.kubernetes.client.okhttp;

import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.http.WebSocket;
import io.fabric8.kubernetes.client.http.WebSocketHandshakeException;
import io.fabric8.kubernetes.client.okhttp.OkHttpClientImpl.OkHttpResponseImpl;
Expand All @@ -30,6 +31,8 @@
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class OkHttpWebSocketImpl implements WebSocket {

Expand All @@ -54,6 +57,9 @@ public CompletableFuture<WebSocket> buildAsync(Listener listener) {
CompletableFuture<WebSocket> future = new CompletableFuture<>();
httpClient.newWebSocket(request, new WebSocketListener() {
private volatile boolean opened;
private boolean more = true;
private ReentrantLock lock = new ReentrantLock();
private Condition moreRequested = lock.newCondition();

@Override
public void onFailure(okhttp3.WebSocket webSocket, Throwable t, Response response) {
Expand All @@ -72,7 +78,7 @@ public void onFailure(okhttp3.WebSocket webSocket, Throwable t, Response respons
future.completeExceptionally(t);
}
} else {
listener.onError(new OkHttpWebSocketImpl(webSocket), t);
listener.onError(new OkHttpWebSocketImpl(webSocket, this::request), t);
}
}

Expand All @@ -82,24 +88,56 @@ public void onOpen(okhttp3.WebSocket webSocket, Response response) {
if (response != null) {
response.close();
}
OkHttpWebSocketImpl value = new OkHttpWebSocketImpl(webSocket);
OkHttpWebSocketImpl value = new OkHttpWebSocketImpl(webSocket, this::request);
listener.onOpen(value);
future.complete(value);
}

@Override
public void onMessage(okhttp3.WebSocket webSocket, ByteString bytes) {
listener.onMessage(new OkHttpWebSocketImpl(webSocket), bytes.asByteBuffer());
awaitMoreRequest();
listener.onMessage(new OkHttpWebSocketImpl(webSocket, this::request), bytes.asByteBuffer());
}

@Override
public void onMessage(okhttp3.WebSocket webSocket, String text) {
listener.onMessage(new OkHttpWebSocketImpl(webSocket), text);
awaitMoreRequest();
listener.onMessage(new OkHttpWebSocketImpl(webSocket, this::request), text);
}

/**
* To back pressure OkHttp, we need to hold the socket processing thread before
* it delivers more results than expected
*/
private void awaitMoreRequest() {
lock.lock();
try {
while (!more) {
moreRequested.await();
}
more = false;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw KubernetesClientException.launderThrowable(e);
} finally {
lock.unlock();
}
}

private void request() {
lock.lock();
try {
more = true;
moreRequested.signalAll();
} finally {
lock.unlock();
}
}

@Override
public void onClosing(okhttp3.WebSocket webSocket, int code, String reason) {
listener.onClose(new OkHttpWebSocketImpl(webSocket), code, reason);
awaitMoreRequest();
listener.onClose(new OkHttpWebSocketImpl(webSocket, this::request), code, reason);
}

});
Expand Down Expand Up @@ -127,9 +165,11 @@ public Builder subprotocol(String protocol) {
}

private okhttp3.WebSocket webSocket;
private Runnable requestMethod;

public OkHttpWebSocketImpl(okhttp3.WebSocket webSocket) {
public OkHttpWebSocketImpl(okhttp3.WebSocket webSocket, Runnable requestMethod) {
this.webSocket = webSocket;
this.requestMethod = requestMethod;
}

@Override
Expand All @@ -147,4 +187,9 @@ public long queueSize() {
return webSocket.queueSize();
}

@Override
public void request() {
requestMethod.run();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
Expand Down Expand Up @@ -116,6 +118,38 @@ public void onOpen(WebSocket webSocket) {
startedFuture.get(10, TimeUnit.SECONDS);
}

@Test
void testRequest() throws Exception {
server.expect().withPath("/foo")
.andUpgradeToWebSocket()
.open()
.waitFor(0)
.andEmit("hello")
.waitFor(0)
.andEmit("world")
.done().always();

CountDownLatch latch = new CountDownLatch(2);

CompletableFuture<WebSocket> startedFuture = client.getHttpClient().newWebSocketBuilder()
.uri(URI.create(client.getConfiguration().getMasterUrl() + "foo"))
.buildAsync(new Listener() {

@Override
public void onMessage(WebSocket webSocket, String text) {
latch.countDown();
}

});

assertFalse(latch.await(2, TimeUnit.SECONDS));
assertEquals(1, latch.getCount());

startedFuture.get().request();

assertTrue(latch.await(1, TimeUnit.SECONDS));
}

@Test
void testAsyncBody() throws Exception {
server.expect().withPath("/async").andReturn(200, "hello world").always();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,39 @@
*/
package io.fabric8.kubernetes.client.dsl;

import io.fabric8.kubernetes.client.utils.InputStreamPumper;

import java.io.InputStream;
import java.io.PipedOutputStream;
import java.io.OutputStream;

public interface ContainerResource
extends TtyExecOutputErrorable,
TimestampBytesLimitTerminateTimeTailPrettyLoggable {

/**
* Will send the given input stream via a polling mechanism.
*
* @deprecated use redirectingInput and the resulting {@link ExecWatch#getOutput()} with
* InputStream#transferTo(java.io.OutputStream) on JDK 9+
* or
* {@link InputStreamPumper#transferTo(InputStream, io.fabric8.kubernetes.client.utils.InputStreamPumper.Writable)}
* @param in the {@link InputStream}
*/
@Deprecated
TtyExecOutputErrorable readingInput(InputStream in);

TtyExecOutputErrorable writingInput(PipedOutputStream in);

/**
* Will provide an {@link OutputStream} via {@link ExecWatch#getInput()} with the
* default buffer size.
*/
TtyExecOutputErrorable redirectingInput();

/**
* Will provide an {@link OutputStream} via {@link ExecWatch#getInput()} with the
* given buffer size.
*
* @param bufferSize if null will use the default
*/
TtyExecOutputErrorable redirectingInput(Integer bufferSize);

CopyOrReadable file(String path);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public interface Response {

/**
* May be null if not provided by the underlying implementation.
*
* @return the body as a String
* @throws IOException
*/
Expand All @@ -37,31 +38,38 @@ public interface Response {
/**
* Called when the request has successfully been upgraded to a web socket.
*/
default void onOpen() {}
default void onOpen() {
}

/**
* Called when the transport or protocol layer of this web socket errors during communication.
*
* @param t Throwable
* @param failureResponse non-null if the failure is caused by the handshake
*/
default void onFailure(Throwable t, Response failureResponse) {}
default void onFailure(Throwable t, Response failureResponse) {
}

/**
* Called when the server sends a close message.
*
* @param code The <a href="http://tools.ietf.org/html/rfc6455#section-7.4.1">RFC-compliant</a>
* status code.
* @param reason Reason for close or an empty string.
*/
void onClose(int code, String reason);
/**
* Called when the server sends a close message.
*
* @param code The <a href="http://tools.ietf.org/html/rfc6455#section-7.4.1">RFC-compliant</a>
* status code.
* @param reason Reason for close or an empty string.
*/
void onClose(int code, String reason);

/**
* Called after a Status message is seen on channel 3.
*
* Use {@link ExecWatch#getErrorChannel()} if you need the raw channel 3 contents.
* <p>
* See https://github.com/kubernetes/kubernetes/issues/89899 - which explains there's currently no way to indicate
* end of input over a websocket, so you may not get an exit code when using stdIn.
* <p>
* See also {@link ExecWatch#exitCode()}
*
* @param code the exit code, -1 will be used if the code cannot be determined
* @param status may be null if no valid status was received
*/
default void onExit(int code, Status status) {}
default void onExit(int code, Status status) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,24 @@
public interface ExecWatch extends Closeable {

/**
* Gets the {@link OutputStream} for stdIn if one is associated
* Gets the {@link OutputStream} for stdIn if {@link ContainerResource#redirectingInput()} has been called.
* <p>
* Closing this stream does not immediately force sending. You will typically call {@link #close()} after
* you are finished writing - the close message will not be sent until all pending messages have been sent.
*
* @return the stdIn stream
*/
OutputStream getInput();

/**
* Gets the {@link InputStream} for stdOut if one is associated
* Gets the {@link InputStream} for stdOut if {@link TtyExecOutputErrorable#redirectingOutput()} has been called.
*
* @return the stdErr stream
*/
InputStream getOutput();

/**
* Gets the {@link InputStream} for stdErr if one is associated
* Gets the {@link InputStream} for stdErr if {@link TtyExecErrorable#redirectingError()} has been called.
*
* @return the stdErr stream
*/
Expand All @@ -55,7 +58,7 @@ public interface ExecWatch extends Closeable {
InputStream getErrorChannel();

/**
* Gracefully close the Watch.
* Gracefully close the Watch - the close message will not be sent until all pending messages have been sent.
*/
@Override
void close();
Expand All @@ -66,6 +69,9 @@ public interface ExecWatch extends Closeable {
* Get a future that will be completed with the exit code.
* <p>
* Will be -1 if the exit code can't be determined from the status, or null if close is received before the exit code.
* <br>
* See https://github.com/kubernetes/kubernetes/issues/89899 - which explains there's currently no way to indicate
* end of input over a websocket, so you may not get an exit code when using stdIn.
* <p>
* Can be used as an alternative to
* {@link ExecListener#onFailure(Throwable, io.fabric8.kubernetes.client.dsl.ExecListener.Response)}
Expand Down
Loading

0 comments on commit 1d65573

Please sign in to comment.