From 42d02533e126be4b4e5b8e6c1124ef818b4d37b5 Mon Sep 17 00:00:00 2001 From: Steve Hawkins Date: Thu, 9 Mar 2023 09:26:14 -0500 Subject: [PATCH] fix #4910 #4923: addressing vertx and general race condition with exec --- CHANGELOG.md | 1 + .../fabric8/kubernetes/client/vertx/VertxWebSocket.java | 9 ++++++++- .../client/dsl/internal/ExecWatchInputStream.java | 1 + .../client/dsl/internal/core/v1/PodOperationsImpl.java | 7 ++++--- .../src/test/java/io/fabric8/kubernetes/PodIT.java | 2 +- 5 files changed, 15 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index db0d2f826ca..96cec9b216c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ * Fix #4931: using coarse grain locking for all mock server operations * Fix #4947: typo in HttpClient.Factory scoring system logic * Fix #4928: allows non-okhttp clients to handle invalid status +* Fix #4910 / #4923: addressing inconsistent behavior with pod exec operations #### Improvements * Fix #4675: adding a fully client side timeout for informer watches diff --git a/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxWebSocket.java b/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxWebSocket.java index 4ea48409bc5..3bbb94dd370 100644 --- a/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxWebSocket.java +++ b/httpclient-vertx/src/main/java/io/fabric8/kubernetes/client/vertx/VertxWebSocket.java @@ -44,7 +44,14 @@ void init() { ws.pause(); listener.onMessage(this, msg); }); - ws.closeHandler(v -> listener.onClose(this, ws.closeStatusCode(), ws.closeReason())); + // if the server sends a ping, we're in trouble with our fetch strategy as there is + // no ping handler to increase the demand - this should not be an immediate issue as + // the api server does not seem to be sending pings + + // if for whatever reason we send a ping, pong counts against the demand, so we need more + ws.pongHandler(b -> ws.fetch(1)); + // use end, not close, because close is processed immediately vs. end is in frame order + ws.endHandler(v -> listener.onClose(this, ws.closeStatusCode(), ws.closeReason())); ws.exceptionHandler(err -> listener.onError(this, err)); listener.onOpen(this); } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWatchInputStream.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWatchInputStream.java index cb98f1c743c..c2e7dbeb6ff 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWatchInputStream.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWatchInputStream.java @@ -66,6 +66,7 @@ void consume(List value) { request.run(); return; } + assert !complete || failed == null; buffers.addAll(value); buffers.notifyAll(); } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/core/v1/PodOperationsImpl.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/core/v1/PodOperationsImpl.java index 0593c7a45d4..819bc212193 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/core/v1/PodOperationsImpl.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/core/v1/PodOperationsImpl.java @@ -498,15 +498,16 @@ private InputStream read(String... command) { private Future readTo(OutputStream out, String... cmd) { ExecWatch w = writingOutput(out).exec(cmd); CompletableFuture result = w.exitCode(); - result.whenComplete((i, t) -> { + // ensure the whenComplete runs prior to downstream actions + return result.whenComplete((i, t) -> { try { out.close(); } catch (Exception e) { result.obtrudeException(e); + } finally { + w.close(); } - w.close(); }); - return result; } private void copyDir(String source, File target) throws Exception { diff --git a/kubernetes-itests/src/test/java/io/fabric8/kubernetes/PodIT.java b/kubernetes-itests/src/test/java/io/fabric8/kubernetes/PodIT.java index 9de8a8e3bab..ff2a4954cb4 100644 --- a/kubernetes-itests/src/test/java/io/fabric8/kubernetes/PodIT.java +++ b/kubernetes-itests/src/test/java/io/fabric8/kubernetes/PodIT.java @@ -348,7 +348,7 @@ void copyDir() throws IOException { PodResource podResource = client.pods().withName("pod-standard"); podResource.dir("/etc").withReadyWaitTimeout(POD_READY_WAIT_IN_MILLIS).copy(tmpDir); - Path msg = tmpDir.resolve("/etc/hosts"); + Path msg = tmpDir.resolve("etc/hosts"); assertTrue(Files.exists(msg)); }