Skip to content

Commit

Permalink
fix fabric8io#4910 / fabric8io#4923 addressing inconsistent behavior …
Browse files Browse the repository at this point in the history
…with exec streams
  • Loading branch information
shawkins committed Mar 9, 2023
1 parent f64529c commit c96e3b4
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
* Fix #4931: using coarse grain locking for all mock server operations
* Fix #4947: typo in HttpClient.Factory scoring system logic
* Fix #4928: allows non-okhttp clients to handle invalid status
* Fix #4910 / #4923: addressing inconsistent behavior with pod exec operations

#### Improvements
* Fix #4675: adding a fully client side timeout for informer watches
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,14 @@ void init() {
ws.pause();
listener.onMessage(this, msg);
});
ws.closeHandler(v -> listener.onClose(this, ws.closeStatusCode(), ws.closeReason()));
// if the server sends a ping, we're in trouble with our fetch strategy as there is
// no ping handler to increase the demand - this should not be an immediate issue as
// the api server does not seem to be sending pings

// if for whatever reason we send a ping, pong counts against the demand, so we need more
ws.pongHandler(b -> ws.fetch(1));
// use end, not close, because close is processed immediately vs. end is in frame order
ws.endHandler(v -> listener.onClose(this, ws.closeStatusCode(), ws.closeReason()));
ws.exceptionHandler(err -> listener.onError(this, err));
listener.onOpen(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ void consume(List<ByteBuffer> value) {
request.run();
return;
}
assert !complete || failed == null;
buffers.addAll(value);
buffers.notifyAll();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,8 @@ private void copyFile(String source, File target) {
}

try (OutputStream out = new BufferedOutputStream(new FileOutputStream(destination))) {
readTo(out, readFileCommand(source)).get();
ExecWatch w = writingOutput(out).exec(readFileCommand(source));
w.exitCode().get();
} catch (Exception e) {
throw KubernetesClientException.launderThrowable(e);
}
Expand All @@ -495,20 +496,6 @@ private InputStream read(String... command) {
return watch.getOutput();
}

private Future<?> readTo(OutputStream out, String... cmd) {
ExecWatch w = writingOutput(out).exec(cmd);
CompletableFuture<Integer> result = w.exitCode();
result.whenComplete((i, t) -> {
try {
out.close();
} catch (Exception e) {
result.obtrudeException(e);
}
w.close();
});
return result;
}

private void copyDir(String source, File target) throws Exception {
//Let's wrap the code to a runnable inner class to avoid NoClassDef on Option classes.
try {
Expand Down
24 changes: 20 additions & 4 deletions kubernetes-itests/src/test/java/io/fabric8/kubernetes/PodIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ void uploadFile(String uploadPath) throws IOException {
try (InputStream checkIs = podResource.file(uploadPath).read();
BufferedReader br = new BufferedReader(new InputStreamReader(checkIs, StandardCharsets.UTF_8))) {
String result = br.lines().collect(Collectors.joining(System.lineSeparator()));
assertEquals("I'm uploaded", result);
assertEquals("I'm uploaded", result, () -> checkFile(podResource, null, uploadPath));
}
}

Expand All @@ -315,7 +315,7 @@ void uploadBinaryFile() throws IOException {
try (InputStream checkIs = podResource.file("/tmp/binfile").read();) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
InputStreamPumper.transferTo(checkIs, baos::write);
assertArrayEquals(bytes, baos.toByteArray());
assertArrayEquals(bytes, baos.toByteArray(), () -> checkFile(podResource, null, "/tmp/binfile"));
}
}

Expand Down Expand Up @@ -348,7 +348,7 @@ void copyDir() throws IOException {
PodResource podResource = client.pods().withName("pod-standard");
podResource.dir("/etc").withReadyWaitTimeout(POD_READY_WAIT_IN_MILLIS).copy(tmpDir);

Path msg = tmpDir.resolve("/etc/hosts");
Path msg = tmpDir.resolve("etc/hosts");
assertTrue(Files.exists(msg));
}

Expand All @@ -367,7 +367,23 @@ void copyFile() throws IOException {
try (InputStream is = Files.newInputStream(msg);
BufferedReader br = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
String result = br.lines().collect(Collectors.joining(System.lineSeparator()));
assertEquals("hello", result);
assertEquals("hello", result, () -> checkFile(podResource, msg, "/msg.txt"));
}
}

private String checkFile(PodResource podResource, Path msg, String remoteFile) {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
podResource.writingOutput(baos).withReadyWaitTimeout(POD_READY_WAIT_IN_MILLIS).exec("sh", "-c",
String.format("ls -al %s", remoteFile)).exitCode().get();
String ls = new String(baos.toByteArray());
if (msg != null) {
byte[] bytes = Files.readAllBytes(msg);
return String.format("%s local bytes %s", ls, bytes.length);
}
return ls;
} catch (Exception e) {
throw new RuntimeException(e);
}
}

Expand Down

0 comments on commit c96e3b4

Please sign in to comment.