From 02c3557c71d2755a0161b60b2361b0df434b2fbd Mon Sep 17 00:00:00 2001 From: Steve Hawkins Date: Thu, 9 Mar 2023 10:42:18 -0500 Subject: [PATCH] fix #4910 / #4923 addressing inconsistent behavior with exec streams --- CHANGELOG.md | 2 ++ .../client/vertx/VertxWebSocket.java | 9 ++++++- .../dsl/internal/ExecWatchInputStream.java | 1 + .../internal/core/v1/PodOperationsImpl.java | 18 ++------------ .../java/io/fabric8/kubernetes/PodIT.java | 24 +++++++++++++++---- 5 files changed, 33 insertions(+), 21 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4489df3c4e6..5001c4d8723 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,8 @@ * Fix #4891: address vertx not completely reading exec streams * Fix #4899: BuildConfigs.instantiateBinary().fromFile() does not time out * Fix #4908: using the response headers in the vertx response +* Fix #4910: addressing inconsistent behavior with pod exec operations +* Fix #4923: addressing inconsistent behavior with pod exec operations * Fix #4931: using coarse grain locking for all mock server operations * Fix #4947: typo in HttpClient.Factory scoring system logic * Fix #4928: allows non-okhttp clients to handle invalid status diff --git a/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxWebSocket.java b/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxWebSocket.java index 4ea48409bc5..3bbb94dd370 100644 --- a/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxWebSocket.java +++ b/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxWebSocket.java @@ -44,7 +44,14 @@ void init() { ws.pause(); listener.onMessage(this, msg); }); - ws.closeHandler(v -> listener.onClose(this, ws.closeStatusCode(), ws.closeReason())); + // if the server sends a ping, we're in trouble with our fetch strategy as there is + // no ping handler to increase the demand - this should not be an immediate issue as + // the api server does not seem to be sending pings + + // if for whatever reason we send a ping, pong counts against the demand, so we need more + ws.pongHandler(b -> ws.fetch(1)); + // use end, not close, because close is processed immediately vs. end is in frame order + ws.endHandler(v -> listener.onClose(this, ws.closeStatusCode(), ws.closeReason())); ws.exceptionHandler(err -> listener.onError(this, err)); listener.onOpen(this); } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWatchInputStream.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWatchInputStream.java index cb98f1c743c..c2e7dbeb6ff 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWatchInputStream.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWatchInputStream.java @@ -66,6 +66,7 @@ void consume(List value) { request.run(); return; } + assert !complete || failed == null; buffers.addAll(value); buffers.notifyAll(); } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/core/v1/PodOperationsImpl.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/core/v1/PodOperationsImpl.java index 0593c7a45d4..a0866c186bf 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/core/v1/PodOperationsImpl.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/core/v1/PodOperationsImpl.java @@ -85,7 +85,6 @@ import java.nio.file.StandardCopyOption; import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import static io.fabric8.kubernetes.client.utils.internal.OptionalDependencyWrapper.wrapRunWithOptionalDependency; @@ -480,7 +479,8 @@ private void copyFile(String source, File target) { } try (OutputStream out = new BufferedOutputStream(new FileOutputStream(destination))) { - readTo(out, readFileCommand(source)).get(); + ExecWatch w = writingOutput(out).exec(readFileCommand(source)); + w.exitCode().get(); } catch (Exception e) { throw KubernetesClientException.launderThrowable(e); } @@ -495,20 +495,6 @@ private InputStream read(String... command) { return watch.getOutput(); } - private Future readTo(OutputStream out, String... cmd) { - ExecWatch w = writingOutput(out).exec(cmd); - CompletableFuture result = w.exitCode(); - result.whenComplete((i, t) -> { - try { - out.close(); - } catch (Exception e) { - result.obtrudeException(e); - } - w.close(); - }); - return result; - } - private void copyDir(String source, File target) throws Exception { //Let's wrap the code to a runnable inner class to avoid NoClassDef on Option classes. try { diff --git a/kubernetes-itests/src/test/java/io/fabric8/kubernetes/PodIT.java b/kubernetes-itests/src/test/java/io/fabric8/kubernetes/PodIT.java index 9de8a8e3bab..7229db00df5 100644 --- a/kubernetes-itests/src/test/java/io/fabric8/kubernetes/PodIT.java +++ b/kubernetes-itests/src/test/java/io/fabric8/kubernetes/PodIT.java @@ -295,7 +295,7 @@ void uploadFile(String uploadPath) throws IOException { try (InputStream checkIs = podResource.file(uploadPath).read(); BufferedReader br = new BufferedReader(new InputStreamReader(checkIs, StandardCharsets.UTF_8))) { String result = br.lines().collect(Collectors.joining(System.lineSeparator())); - assertEquals("I'm uploaded", result); + assertEquals("I'm uploaded", result, () -> checkFile(podResource, null, uploadPath)); } } @@ -315,7 +315,7 @@ void uploadBinaryFile() throws IOException { try (InputStream checkIs = podResource.file("/tmp/binfile").read();) { ByteArrayOutputStream baos = new ByteArrayOutputStream(); InputStreamPumper.transferTo(checkIs, baos::write); - assertArrayEquals(bytes, baos.toByteArray()); + assertArrayEquals(bytes, baos.toByteArray(), () -> checkFile(podResource, null, "/tmp/binfile")); } } @@ -348,7 +348,7 @@ void copyDir() throws IOException { PodResource podResource = client.pods().withName("pod-standard"); podResource.dir("/etc").withReadyWaitTimeout(POD_READY_WAIT_IN_MILLIS).copy(tmpDir); - Path msg = tmpDir.resolve("/etc/hosts"); + Path msg = tmpDir.resolve("etc/hosts"); assertTrue(Files.exists(msg)); } @@ -367,7 +367,23 @@ void copyFile() throws IOException { try (InputStream is = Files.newInputStream(msg); BufferedReader br = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) { String result = br.lines().collect(Collectors.joining(System.lineSeparator())); - assertEquals("hello", result); + assertEquals("hello", result, () -> checkFile(podResource, msg, "/msg.txt")); + } + } + + private String checkFile(PodResource podResource, Path msg, String remoteFile) { + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + podResource.writingOutput(baos).withReadyWaitTimeout(POD_READY_WAIT_IN_MILLIS).exec("sh", "-c", + String.format("ls -al %s", remoteFile)).exitCode().get(); + String ls = new String(baos.toByteArray()); + if (msg != null) { + byte[] bytes = Files.readAllBytes(msg); + return String.format("%s local bytes %s", ls, bytes.length); + } + return ls; + } catch (Exception e) { + throw new RuntimeException(e); } }