From 5ed5ab305ac699e0dadc8eb7ca04c890cf239703 Mon Sep 17 00:00:00 2001 From: Steve Hawkins Date: Thu, 9 Mar 2023 12:04:11 -0500 Subject: [PATCH] fix #4910 / #4923 adding some additional messages / wait to upload --- .../dsl/internal/ExecWebSocketListener.java | 19 ++++++++++++++++--- .../internal/core/v1/PodOperationsImpl.java | 2 +- .../dsl/internal/uploadable/PodUpload.java | 15 +++++++++++++-- .../java/io/fabric8/kubernetes/PodIT.java | 7 +++++++ 4 files changed, 37 insertions(+), 6 deletions(-) diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java index 5a0a5dd2ec9..6c26c1575aa 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java @@ -70,7 +70,7 @@ public class ExecWebSocketListener implements ExecWatch, AutoCloseable, WebSocke static final String REASON_NON_ZERO_EXIT_CODE = "NonZeroExitCode"; static final String STATUS_SUCCESS = "Success"; - private static final long MAX_QUEUE_SIZE = 16 * 1024 * 1024L; + public static final int MAX_QUEUE_SIZE = 16 * 1024 * 1024; private final class SimpleResponse implements Response { private final HttpResponse response; @@ -324,6 +324,19 @@ public void onMessage(WebSocket webSocket, ByteBuffer bytes) { close = true; } else { error.handle(byteString, webSocket); + // String stringValue = toString(bytes); + // if (stringValue.startsWith("fabric8 exit")) { + // System.out.println(new Date(System.currentTimeMillis()) + " stdErr done"); + // close = true; + // int code = Integer.valueOf(stringValue.substring("fabric8 exit".length(), stringValue.indexOf("=")).trim()); + // try { + // if (this.listener != null) { + // this.listener.onExit(code, null); + // } + // } finally { + // this.exitCode.complete(code); + // } + // } } break; case 3: @@ -420,7 +433,7 @@ public void resize(int cols, int rows) { map.put(HEIGHT, rows); map.put(WIDTH, cols); byte[] bytes = objectMapper.writeValueAsBytes(map); - send(bytes, 0, bytes.length, (byte) 4); + send(bytes, 0, bytes.length, (byte) 5); } catch (Exception e) { throw KubernetesClientException.launderThrowable(e); } @@ -476,7 +489,7 @@ public CompletableFuture exitCode() { return exitCode; } - final void waitForQueue(int length) { + public final void waitForQueue(int length) { try { while (webSocketRef.get().queueSize() + length > MAX_QUEUE_SIZE && !Thread.interrupted()) { checkError(); diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/core/v1/PodOperationsImpl.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/core/v1/PodOperationsImpl.java index a0866c186bf..fe5bf8a1f31 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/core/v1/PodOperationsImpl.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/core/v1/PodOperationsImpl.java @@ -288,7 +288,7 @@ public PodOperationsImpl inContainer( } @Override - public ExecWatch exec(String... command) { + public ExecWebSocketListener exec(String... command) { String[] actualCommands = command.length >= 1 ? command : EMPTY_COMMAND; try { URL url = getURL("exec", actualCommands); diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/uploadable/PodUpload.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/uploadable/PodUpload.java index a6362752e17..f33c6b90789 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/uploadable/PodUpload.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/uploadable/PodUpload.java @@ -15,7 +15,7 @@ */ package io.fabric8.kubernetes.client.dsl.internal.uploadable; -import io.fabric8.kubernetes.client.dsl.ExecWatch; +import io.fabric8.kubernetes.client.dsl.internal.ExecWebSocketListener; import io.fabric8.kubernetes.client.dsl.internal.core.v1.PodOperationsImpl; import io.fabric8.kubernetes.client.utils.InputStreamPumper; import io.fabric8.kubernetes.client.utils.Utils; @@ -63,10 +63,21 @@ private interface UploadProcessor { private static boolean upload(PodOperationsImpl operation, String command, UploadProcessor processor) throws IOException { operation = operation.redirectingInput().terminateOnError(); CompletableFuture exitFuture; - try (ExecWatch execWatch = operation.exec("sh", "-c", command)) { + try (ExecWebSocketListener execWatch = operation.exec("sh", "-c", command)) { OutputStream out = execWatch.getInput(); processor.process(out); out.close(); // also flushes + // attempt at a workaround to #4910 send some additional messages + // and wait(s) to hopefully force the processing of what was already sent + for (int i = 0; i < 5; i++) { + execWatch.resize(Integer.MAX_VALUE, Integer.MAX_VALUE); + } + execWatch.waitForQueue(ExecWebSocketListener.MAX_QUEUE_SIZE); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } exitFuture = execWatch.exitCode(); } // TODO: should this timeout be from the start of the upload? diff --git a/kubernetes-itests/src/test/java/io/fabric8/kubernetes/PodIT.java b/kubernetes-itests/src/test/java/io/fabric8/kubernetes/PodIT.java index 7229db00df5..ef3ef16b1f4 100644 --- a/kubernetes-itests/src/test/java/io/fabric8/kubernetes/PodIT.java +++ b/kubernetes-itests/src/test/java/io/fabric8/kubernetes/PodIT.java @@ -371,6 +371,13 @@ void copyFile() throws IOException { } } + @Test + void copyManyFiles() throws Exception { + for (int i = 0; i < 100; i++) { + copyFile(); + } + } + private String checkFile(PodResource podResource, Path msg, String remoteFile) { try { ByteArrayOutputStream baos = new ByteArrayOutputStream();