Skip to content

Commit

Permalink
fix fabric8io#4910 / fabric8io#4923 adding some additional messages /…
Browse files Browse the repository at this point in the history
… wait to upload
  • Loading branch information
shawkins committed Mar 9, 2023
1 parent 3985d24 commit 5ed5ab3
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class ExecWebSocketListener implements ExecWatch, AutoCloseable, WebSocke
static final String REASON_NON_ZERO_EXIT_CODE = "NonZeroExitCode";
static final String STATUS_SUCCESS = "Success";

private static final long MAX_QUEUE_SIZE = 16 * 1024 * 1024L;
public static final int MAX_QUEUE_SIZE = 16 * 1024 * 1024;

private final class SimpleResponse implements Response {
private final HttpResponse<?> response;
Expand Down Expand Up @@ -324,6 +324,19 @@ public void onMessage(WebSocket webSocket, ByteBuffer bytes) {
close = true;
} else {
error.handle(byteString, webSocket);
// String stringValue = toString(bytes);
// if (stringValue.startsWith("fabric8 exit")) {
// System.out.println(new Date(System.currentTimeMillis()) + " stdErr done");
// close = true;
// int code = Integer.valueOf(stringValue.substring("fabric8 exit".length(), stringValue.indexOf("=")).trim());
// try {
// if (this.listener != null) {
// this.listener.onExit(code, null);
// }
// } finally {
// this.exitCode.complete(code);
// }
// }
}
break;
case 3:
Expand Down Expand Up @@ -420,7 +433,7 @@ public void resize(int cols, int rows) {
map.put(HEIGHT, rows);
map.put(WIDTH, cols);
byte[] bytes = objectMapper.writeValueAsBytes(map);
send(bytes, 0, bytes.length, (byte) 4);
send(bytes, 0, bytes.length, (byte) 5);
} catch (Exception e) {
throw KubernetesClientException.launderThrowable(e);
}
Expand Down Expand Up @@ -476,7 +489,7 @@ public CompletableFuture<Integer> exitCode() {
return exitCode;
}

final void waitForQueue(int length) {
public final void waitForQueue(int length) {
try {
while (webSocketRef.get().queueSize() + length > MAX_QUEUE_SIZE && !Thread.interrupted()) {
checkError();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ public PodOperationsImpl inContainer(
}

@Override
public ExecWatch exec(String... command) {
public ExecWebSocketListener exec(String... command) {
String[] actualCommands = command.length >= 1 ? command : EMPTY_COMMAND;
try {
URL url = getURL("exec", actualCommands);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
package io.fabric8.kubernetes.client.dsl.internal.uploadable;

import io.fabric8.kubernetes.client.dsl.ExecWatch;
import io.fabric8.kubernetes.client.dsl.internal.ExecWebSocketListener;
import io.fabric8.kubernetes.client.dsl.internal.core.v1.PodOperationsImpl;
import io.fabric8.kubernetes.client.utils.InputStreamPumper;
import io.fabric8.kubernetes.client.utils.Utils;
Expand Down Expand Up @@ -63,10 +63,21 @@ private interface UploadProcessor {
private static boolean upload(PodOperationsImpl operation, String command, UploadProcessor processor) throws IOException {
operation = operation.redirectingInput().terminateOnError();
CompletableFuture<Integer> exitFuture;
try (ExecWatch execWatch = operation.exec("sh", "-c", command)) {
try (ExecWebSocketListener execWatch = operation.exec("sh", "-c", command)) {
OutputStream out = execWatch.getInput();
processor.process(out);
out.close(); // also flushes
// attempt at a workaround to #4910 send some additional messages
// and wait(s) to hopefully force the processing of what was already sent
for (int i = 0; i < 5; i++) {
execWatch.resize(Integer.MAX_VALUE, Integer.MAX_VALUE);
}
execWatch.waitForQueue(ExecWebSocketListener.MAX_QUEUE_SIZE);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
exitFuture = execWatch.exitCode();
}
// TODO: should this timeout be from the start of the upload?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,13 @@ void copyFile() throws IOException {
}
}

@Test
void copyManyFiles() throws Exception {
for (int i = 0; i < 100; i++) {
copyFile();
}
}

private String checkFile(PodResource podResource, Path msg, String remoteFile) {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Expand Down

0 comments on commit 5ed5ab3

Please sign in to comment.