From d15341cd0b8eb30b9186583cde1010ef5720bffb Mon Sep 17 00:00:00 2001 From: Steve Hawkins Date: Fri, 24 Mar 2023 12:54:36 -0400 Subject: [PATCH] fix #4910: using tar for most uploads and verifying upload --- CHANGELOG.md | 5 +- .../dsl/internal/uploadable/PodUpload.java | 136 ++++++++++---- .../internal/uploadable/PodUploadTest.java | 175 ++++++------------ .../java/io/fabric8/kubernetes/PodIT.java | 33 +++- 4 files changed, 189 insertions(+), 160 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2d21db45dde..03f502257be 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ### 6.6-SNAPSHOT #### Bugs +* Fix #4910: Pod file upload will now detect if it's not completely sent to the api server * Fix #4963: Openshift Client return 403 when use websocket * Fix #4985: triggering the immediate cleanup of the okhttp idle task * fix #5002: Jetty response completion accounts for header processing @@ -21,9 +22,9 @@ #### New Features #### _**Note**_: Breaking changes -* Fix #4998: Serialization.yamlMapper and Serialization.clearYamlMapper have been deprecated - * Fix #4875: Removed unused options from the java-generator +* Fix #4910: all Pod file uploads not require commons-compress +* Fix #4998: Serialization.yamlMapper and Serialization.clearYamlMapper have been deprecated ### 6.5.1 (2023-03-20) diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/uploadable/PodUpload.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/uploadable/PodUpload.java index a6362752e17..6c601707df7 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/uploadable/PodUpload.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/uploadable/PodUpload.java @@ -21,23 +21,29 @@ import io.fabric8.kubernetes.client.utils.Utils; import org.apache.commons.compress.archivers.tar.TarArchiveEntry; import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; +import org.apache.commons.compress.utils.CountingOutputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.ByteArrayOutputStream; import java.io.File; -import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.zip.GZIPOutputStream; import static io.fabric8.kubernetes.client.dsl.internal.core.v1.PodOperationsImpl.shellQuote; public class PodUpload { + private static final Logger LOG = LoggerFactory.getLogger(PodUpload.class); + private static final String TAR_PATH_DELIMITER = "/"; private PodUpload() { @@ -47,81 +53,137 @@ public static boolean upload(PodOperationsImpl operation, Path pathToUpload) throws IOException { if (Utils.isNotNullOrEmpty(operation.getContext().getFile()) && pathToUpload.toFile().isFile()) { - return uploadFile(operation, pathToUpload); + return uploadTar(operation, getDirectoryFromFile(operation), + tar -> addFileToTar(null, new File(operation.getContext().getFile()).getName(), pathToUpload.toFile(), tar)); } else if (Utils.isNotNullOrEmpty(operation.getContext().getDir()) && pathToUpload.toFile().isDirectory()) { - return uploadDirectory(operation, pathToUpload); + return uploadTar(operation, operation.getContext().getDir(), tar -> { + for (File file : pathToUpload.toFile().listFiles()) { + addFileToTar(null, file.getName(), file, tar); + } + }); } throw new IllegalArgumentException("Provided arguments are not valid (file, directory, path)"); } - private interface UploadProcessor { + private static String getDirectoryFromFile(PodOperationsImpl operation) { + String file = operation.getContext().getFile(); + String directoryTrimmedFromFilePath = file.substring(0, file.lastIndexOf('/')); + return directoryTrimmedFromFilePath.isEmpty() ? "/" : directoryTrimmedFromFilePath; + } - void process(OutputStream out) throws IOException; + private interface UploadProcessor { + + void process(T out) throws IOException; } - private static boolean upload(PodOperationsImpl operation, String command, UploadProcessor processor) throws IOException { - operation = operation.redirectingInput().terminateOnError(); + private static boolean upload(PodOperationsImpl operation, String file, UploadProcessor processor) + throws IOException { + + String command = createExecCommandForUpload(file); + CompletableFuture exitFuture; - try (ExecWatch execWatch = operation.exec("sh", "-c", command)) { + + int uploadRequestTimeout = operation.getRequestConfig().getUploadRequestTimeout(); + long uploadRequestTimeoutEnd = uploadRequestTimeout < 0 ? Long.MAX_VALUE + : uploadRequestTimeout + System.currentTimeMillis(); + long expected = 0; + try (ExecWatch execWatch = operation.redirectingInput().terminateOnError().exec("sh", "-c", command)) { OutputStream out = execWatch.getInput(); - processor.process(out); + CountingOutputStream countingStream = new CountingOutputStream(out); + processor.process(countingStream); out.close(); // also flushes + expected = countingStream.getBytesWritten(); exitFuture = execWatch.exitCode(); } - // TODO: should this timeout be from the start of the upload? - if (!Utils.waitUntilReady(exitFuture, operation.getRequestConfig().getUploadRequestTimeout(), + + // enforce the timeout after we've written everything - generally this won't fail, but + // we may have already exceeded the timeout because of how long it took to write + if (!Utils.waitUntilReady(exitFuture, Math.max(0, uploadRequestTimeoutEnd - System.currentTimeMillis()), TimeUnit.MILLISECONDS)) { + LOG.debug("failed to complete upload before timeout expired"); return false; } Integer exitCode = exitFuture.getNow(null); - return exitCode == null || exitCode.intValue() == 0; - } - - public static boolean uploadFileData(PodOperationsImpl operation, InputStream inputStream) - throws IOException { - String command = createExecCommandForUpload(operation.getContext().getFile()); + if (exitCode != null && exitCode.intValue() != 0) { + LOG.debug("upload process failed with exit code {}", exitCode); + return false; + } - return upload(operation, command, os -> InputStreamPumper.transferTo(inputStream, os::write)); + ByteArrayOutputStream byteCount = new ByteArrayOutputStream(); + try (ExecWatch countWatch = operation.writingOutput(byteCount).exec("sh", "-c", + String.format("wc -c < %s", shellQuote(file)))) { + CompletableFuture countExitFuture = countWatch.exitCode(); + if (!Utils.waitUntilReady(countExitFuture, Math.max(0, uploadRequestTimeoutEnd - System.currentTimeMillis()), + TimeUnit.MILLISECONDS) || !Integer.valueOf(0).equals(countExitFuture.getNow(null))) { + LOG.debug("failed to validate the upload size, exit code {}", countExitFuture.getNow(null)); + return false; + } + String remoteSize = new String(byteCount.toByteArray(), StandardCharsets.UTF_8); + if (!String.valueOf(expected).equals(remoteSize.trim())) { + LOG.debug("upload file size validation failed, expected {}, but was {}", expected, remoteSize); + return false; + } + } + return true; } - private static boolean uploadFile(PodOperationsImpl operation, Path pathToUpload) + public static boolean uploadFileData(PodOperationsImpl operation, InputStream inputStream) throws IOException { - try (final FileInputStream fis = new FileInputStream(pathToUpload.toFile())) { - return uploadFileData(operation, fis); - } + return upload(operation, operation.getContext().getFile(), os -> InputStreamPumper.transferTo(inputStream, os::write)); } - private static boolean uploadDirectory(PodOperationsImpl operation, Path pathToUpload) + private static boolean uploadTar(PodOperationsImpl operation, String directory, + UploadProcessor processor) throws IOException { - final String command = String.format( - "mkdir -p %1$s && tar -C %1$s -xzf -", shellQuote(operation.getContext().getDir())); + String fileName = String.format("/tmp/fabric8-%s.tar", UUID.randomUUID()); - return upload(operation, command, os -> { - try (final GZIPOutputStream gzip = new GZIPOutputStream(os); - final TarArchiveOutputStream tar = new TarArchiveOutputStream(gzip)) { + boolean uploaded = upload(operation, fileName, os -> { + try (final TarArchiveOutputStream tar = new TarArchiveOutputStream(os)) { tar.setLongFileMode(TarArchiveOutputStream.LONGFILE_POSIX); - for (File file : pathToUpload.toFile().listFiles()) { - addFileToTar(null, file, tar); - } - tar.flush(); + processor.process(tar); } }); + + if (!uploaded) { + // best effort delete of the failed upload + try (ExecWatch rm = operation.writingOutput(new ByteArrayOutputStream()).exec("sh", "-c", + String.format("rm %s", fileName))) { + if (!Utils.waitUntilReady(rm.exitCode(), operation.getRequestConfig().getUploadRequestTimeout(), TimeUnit.MILLISECONDS) + || !Integer.valueOf(0).equals(rm.exitCode().getNow(null))) { + LOG.warn("delete of temporary tar file {} may not have completed", fileName); + } + } + return false; + } + + final String command = extractTarCommand(directory, fileName); + + try (ExecWatch execWatch = operation.redirectingInput().exec("sh", "-c", command)) { + CompletableFuture countExitFuture = execWatch.exitCode(); + // TODO: this enforcement duplicates the timeout + return Utils.waitUntilReady(countExitFuture, operation.getRequestConfig().getUploadRequestTimeout(), + TimeUnit.MILLISECONDS) && Integer.valueOf(0).equals(countExitFuture.getNow(null)); + } + } - private static void addFileToTar(String rootTarPath, File file, TarArchiveOutputStream tar) - throws IOException { + static String extractTarCommand(String directory, String tar) { + return String.format("mkdir -p %1$s; tar -C %1$s -xmf %2$s; e=$?; rm %2$s; exit $e", shellQuote(directory), tar); + } - final String fileName = Optional.ofNullable(rootTarPath).orElse("") + TAR_PATH_DELIMITER + file.getName(); + private static void addFileToTar(String rootTarPath, String fileName, File file, TarArchiveOutputStream tar) + throws IOException { tar.putArchiveEntry(new TarArchiveEntry(file, fileName)); if (file.isFile()) { Files.copy(file.toPath(), tar); tar.closeArchiveEntry(); } else if (file.isDirectory()) { tar.closeArchiveEntry(); + String dirRootPath = Optional.ofNullable(rootTarPath).orElse("") + TAR_PATH_DELIMITER + fileName; for (File fileInDirectory : file.listFiles()) { - addFileToTar(fileName, fileInDirectory, tar); + addFileToTar(dirRootPath, fileInDirectory.getName(), fileInDirectory, tar); } } } diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/uploadable/PodUploadTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/uploadable/PodUploadTest.java index 2615ecd66be..a5551640fff 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/uploadable/PodUploadTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/uploadable/PodUploadTest.java @@ -15,61 +15,37 @@ */ package io.fabric8.kubernetes.client.dsl.internal.uploadable; -import io.fabric8.kubernetes.api.model.Pod; -import io.fabric8.kubernetes.api.model.PodBuilder; -import io.fabric8.kubernetes.api.model.PodConditionBuilder; -import io.fabric8.kubernetes.api.model.WatchEventBuilder; -import io.fabric8.kubernetes.client.Config; -import io.fabric8.kubernetes.client.dsl.internal.ExecWebSocketListener; -import io.fabric8.kubernetes.client.dsl.internal.OperationContext; -import io.fabric8.kubernetes.client.dsl.internal.PodOperationContext; +import io.fabric8.kubernetes.client.dsl.TtyExecErrorable; import io.fabric8.kubernetes.client.dsl.internal.core.v1.PodOperationsImpl; -import io.fabric8.kubernetes.client.http.HttpClient; -import io.fabric8.kubernetes.client.http.TestHttpResponse; -import io.fabric8.kubernetes.client.http.WebSocket; -import io.fabric8.kubernetes.client.impl.BaseClient; -import io.fabric8.kubernetes.client.utils.CommonThreadPool; import io.fabric8.kubernetes.client.utils.InputStreamPumper; -import io.fabric8.kubernetes.client.utils.Serialization; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; import java.io.InputStream; -import java.net.URI; -import java.nio.ByteBuffer; +import java.io.OutputStream; import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.util.Arrays; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.atLeast; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.junit.jupiter.api.Assertions.assertTrue; class PodUploadTest { - private HttpClient mockClient; - private WebSocket mockWebSocket; private PodOperationsImpl operation; - private Pod item; @FunctionalInterface public interface PodUploadTester { @@ -78,29 +54,7 @@ public interface PodUploadTester { @BeforeEach void setUp() { - mockClient = mock(HttpClient.class, Mockito.RETURNS_DEEP_STUBS); - mockWebSocket = mock(WebSocket.class, Mockito.RETURNS_DEEP_STUBS); - when(mockWebSocket.send(any())).thenReturn(true); - when(mockClient.newBuilder().readTimeout(anyLong(), any(TimeUnit.class)).build()).thenReturn(mockClient); - - BaseClient client = mock(BaseClient.class, Mockito.RETURNS_SELF); - Mockito.when(client.adapt(BaseClient.class).getExecutor()).thenReturn(CommonThreadPool.get()); - Config config = mock(Config.class, Mockito.RETURNS_DEEP_STUBS); - when(config.getRequestConfig().getUploadRequestTimeout()).thenReturn(10); - when(config.getMasterUrl()).thenReturn("https://openshift.com:8443"); - when(config.getNamespace()).thenReturn("default"); - when(client.getConfiguration()).thenReturn(config); - when(client.getHttpClient()).thenReturn(mockClient); - item = new PodBuilder() - .withNewMetadata().withName("pod").endMetadata() - .withNewSpec().addNewContainer().withName("container").endContainer().endSpec() - .withNewStatus().withConditions(new PodConditionBuilder().withType("Ready").withStatus("True").build()).endStatus() - .build(); - - this.operation = (PodOperationsImpl) new PodOperationsImpl( - new PodOperationContext(), new OperationContext().withClient(client)).resource(item); - when(mockClient.sendAsync(Mockito.any(), Mockito.eq(byte[].class))) - .thenReturn(CompletableFuture.completedFuture(TestHttpResponse.from(200, Serialization.asJson(item)))); + this.operation = Mockito.mock(PodOperationsImpl.class, Mockito.RETURNS_DEEP_STUBS); } @Test @@ -116,27 +70,27 @@ void uploadInvalidParametersShouldThrowException(@TempDir Path pathToUpload) { void upload_withFile_shouldUploadFile() throws IOException, InterruptedException { final Path toUpload = new File(PodUpload.class.getResource("/upload/upload-sample.txt").getFile()) .toPath(); - uploadFileAndVerify(() -> PodUpload.upload(operation, toUpload)); + uploadFileAndVerify(() -> PodUpload.upload(operation, toUpload), false, 2560); } @Test void uploadFileData_whenByteArrayInputStreamProvided_shouldUploadFile() throws IOException, InterruptedException { InputStream inputStream = new ByteArrayInputStream("test data".getBytes()); - uploadFileAndVerify(() -> PodUpload.uploadFileData(operation, inputStream)); + uploadFileAndVerify(() -> PodUpload.uploadFileData(operation, inputStream), true, 9); } @Test void upload_withDirectory_shouldUploadDirectory() throws Exception { final Path toUpload = new File(PodUpload.class.getResource("/upload").getFile()) .toPath(); - uploadDirectoryAndVerify(() -> PodUpload.upload(operation, toUpload)); + uploadDirectoryAndVerify(() -> PodUpload.upload(operation, toUpload), 2560); } @Test void upload_withDirectoryAndLongFileNames_shouldUploadDirectory() throws Exception { final Path toUpload = new File(PodUpload.class.getResource("/upload_long").getFile()) .toPath(); - uploadDirectoryAndVerify(() -> PodUpload.upload(operation, toUpload)); + uploadDirectoryAndVerify(() -> PodUpload.upload(operation, toUpload), 4096); } @Test @@ -183,75 +137,66 @@ void createExecCommandForUpload_withMultipleSingleQuotesInPath() { assertThat(result).isEqualTo("mkdir -p '/tmp/f\'\\'\'o\'\\'\'o' && cat - > '/tmp/f\'\\'\'o\'\\'\'o/c\'\\'\'p.log'"); } - void uploadFileAndVerify(PodUploadTester fileUploadMethodToTest) throws IOException, InterruptedException { - operation = operation.file("/mock/dir/file"); - WebSocket.Builder builder = mock(WebSocket.Builder.class, Mockito.RETURNS_SELF); - when(builder.buildAsync(any())).thenAnswer(newWebSocket -> { - final WebSocket.Listener wsl = newWebSocket.getArgument(0, WebSocket.Listener.class); - // Set ready status - wsl.onOpen(mockWebSocket); - if (wsl instanceof ExecWebSocketListener) { - wsl.onMessage(mockWebSocket, ByteBuffer.wrap(new byte[] { (byte) 0 })); - } else { - wsl.onMessage(mockWebSocket, Serialization.asJson(new WatchEventBuilder().withType("ADDED").withObject(item).build())); + void uploadFileAndVerify(PodUploadTester fileUploadMethodToTest, boolean stream, long size) + throws IOException, InterruptedException { + Mockito.when(this.operation.getContext().getFile()).thenReturn("/mock/file"); + if (!stream) { + verifyTar(fileUploadMethodToTest, size, "/mock"); + return; + } + Mockito.when(this.operation.writingOutput(Mockito.any())).then(new Answer() { + @Override + public TtyExecErrorable answer(InvocationOnMock invocation) throws Throwable { + OutputStream out = (OutputStream) invocation.getArgument(0); + out.write((size + "\n").getBytes(StandardCharsets.UTF_8)); + return operation; } - // Set complete status - Mockito.doAnswer(close -> { - wsl.onClose(mockWebSocket, close.getArgument(0), close.getArgument(1)); - return null; - }).when(mockWebSocket).sendClose(anyInt(), anyString()); - return CompletableFuture.completedFuture(mockWebSocket); }); - when(mockClient.newWebSocketBuilder()).thenReturn(builder); + boolean result = fileUploadMethodToTest.apply(); + assertThat(result).isTrue(); - final boolean result = fileUploadMethodToTest.apply(); + ArgumentCaptor captorUpload = ArgumentCaptor.forClass(String[].class); + Mockito.verify(operation.redirectingInput().terminateOnError(), Mockito.times(1)).exec(captorUpload.capture()); + assertEquals("mkdir -p '/mock' && cat - > '/mock/file'", captorUpload.getValue()[2]); - assertThat(result).isTrue(); - ArgumentCaptor captor = ArgumentCaptor.forClass(URI.class); - verify(builder, times(2)).uri(captor.capture()); - assertEquals( - "https://openshift.com:8443/api/v1/namespaces/default/pods?fieldSelector=metadata.name%3Dpod&timeoutSeconds=600&allowWatchBookmarks=true&watch=true", - captor.getAllValues().get(0).toString()); - assertEquals( - "https://openshift.com:8443/api/v1/namespaces/default/pods/pod/exec?command=sh&command=-c&command=mkdir%20-p%20%27%2Fmock%2Fdir%27%20%26%26%20cat%20-%20%3E%20%27%2Fmock%2Fdir%2Ffile%27&container=container&stdin=true&stderr=true", - captor.getAllValues().get(1).toString()); - verify(mockWebSocket, atLeast(1)).send(any(ByteBuffer.class)); + ArgumentCaptor captor = ArgumentCaptor.forClass(String[].class); + Mockito.verify(operation, Mockito.times(1)).exec(captor.capture()); + assertEquals("wc -c < '/mock/file'", captor.getValue()[2]); + } + + private void uploadDirectoryAndVerify(PodUploadTester directoryUpload, long size) + throws IOException, InterruptedException { + Mockito.when(this.operation.getContext().getDir()).thenReturn("/mock/dir"); + verifyTar(directoryUpload, size, "/mock/dir"); } - private void uploadDirectoryAndVerify(PodUploadTester directoryUpload) + private void verifyTar(PodUploadTester directoryUpload, long size, String dir) throws IOException, InterruptedException { - this.operation = operation.dir("/mock/dir"); - WebSocket.Builder builder = mock(WebSocket.Builder.class, Mockito.RETURNS_SELF); - when(builder.buildAsync(any())).thenAnswer(newWebSocket -> { - final WebSocket.Listener wsl = newWebSocket.getArgument(0, WebSocket.Listener.class); - // Set ready status - wsl.onOpen(mockWebSocket); - if (wsl instanceof ExecWebSocketListener) { - wsl.onMessage(mockWebSocket, ByteBuffer.wrap(new byte[] { (byte) 0 })); - } else { - wsl.onMessage(mockWebSocket, Serialization.asJson(new WatchEventBuilder().withType("ADDED").withObject(item).build())); + Mockito.when(this.operation.writingOutput(Mockito.any())).then(new Answer() { + @Override + public TtyExecErrorable answer(InvocationOnMock invocation) throws Throwable { + OutputStream out = (OutputStream) invocation.getArgument(0); + out.write((size + "\n").getBytes(StandardCharsets.UTF_8)); + return operation; } - // Set complete status - Mockito.doAnswer(close -> { - wsl.onClose(mockWebSocket, close.getArgument(0), close.getArgument(1)); - return null; - }).when(mockWebSocket).sendClose(anyInt(), anyString()); - return CompletableFuture.completedFuture(mockWebSocket); }); - when(mockClient.newWebSocketBuilder()).thenReturn(builder); + boolean result = directoryUpload.apply(); + assertThat(result).isTrue(); - final boolean result = directoryUpload.apply(); + ArgumentCaptor captorUpload = ArgumentCaptor.forClass(String[].class); + Mockito.verify(operation.redirectingInput().terminateOnError(), Mockito.times(1)).exec(captorUpload.capture()); + assertTrue(captorUpload.getValue()[2].startsWith("mkdir -p '/tmp' && cat - > '/tmp/fabric8-")); - assertThat(result).isTrue(); - ArgumentCaptor captor = ArgumentCaptor.forClass(URI.class); - verify(builder, times(2)).uri(captor.capture()); - assertEquals( - "https://openshift.com:8443/api/v1/namespaces/default/pods?fieldSelector=metadata.name%3Dpod&timeoutSeconds=600&allowWatchBookmarks=true&watch=true", - captor.getAllValues().get(0).toString()); - assertEquals( - "https://openshift.com:8443/api/v1/namespaces/default/pods/pod/exec?command=sh&command=-c&command=mkdir%20-p%20%27%2Fmock%2Fdir%27%20%26%26%20tar%20-C%20%27%2Fmock%2Fdir%27%20-xzf%20-&container=container&stdin=true&stderr=true", - captor.getAllValues().get(1).toString()); - verify(mockWebSocket, atLeast(1)).send(any(ByteBuffer.class)); + ArgumentCaptor captorCount = ArgumentCaptor.forClass(String[].class); + Mockito.verify(operation, Mockito.times(1)).exec(captorCount.capture()); + assertTrue(captorCount.getValue()[2].startsWith("wc -c < '/tmp/fabric8-")); + + ArgumentCaptor captorExtract = ArgumentCaptor.forClass(String[].class); + Mockito.verify(operation.redirectingInput()).exec(captorExtract.capture()); + + String extractCommand = captorExtract.getValue()[2]; + assertTrue(extractCommand.startsWith(String.format("mkdir -p '%1$s'; tar -C '%1$s' -xmf /tmp/fabric8-", dir)), + extractCommand); } } diff --git a/kubernetes-itests/src/test/java/io/fabric8/kubernetes/PodIT.java b/kubernetes-itests/src/test/java/io/fabric8/kubernetes/PodIT.java index 7229db00df5..8e33d9e5a66 100644 --- a/kubernetes-itests/src/test/java/io/fabric8/kubernetes/PodIT.java +++ b/kubernetes-itests/src/test/java/io/fabric8/kubernetes/PodIT.java @@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory; import java.io.BufferedReader; +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; @@ -54,6 +55,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BooleanSupplier; import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; @@ -289,9 +291,8 @@ void uploadFile(String uploadPath) throws IOException { final PodResource podResource = client.pods().withName("pod-standard"); podResource.waitUntilReady(POD_READY_WAIT_IN_MILLIS, TimeUnit.SECONDS); // When - final boolean uploadResult = podResource.file(uploadPath).upload(tempFile); - // Then - assertThat(uploadResult).isTrue(); + retryUpload(() -> podResource.file(uploadPath).upload(tempFile)); + try (InputStream checkIs = podResource.file(uploadPath).read(); BufferedReader br = new BufferedReader(new InputStreamReader(checkIs, StandardCharsets.UTF_8))) { String result = br.lines().collect(Collectors.joining(System.lineSeparator())); @@ -299,6 +300,27 @@ void uploadFile(String uploadPath) throws IOException { } } + void retryUpload(BooleanSupplier operation) { + Awaitility.await().atMost(60, TimeUnit.SECONDS).until(operation::getAsBoolean); + } + + @Test + void uploadBinaryStream() throws Exception { + byte[] bytes = new byte[16385]; + for (int i = 0; i < bytes.length; i++) { + bytes[i] = (byte) i; + } + final PodResource podResource = client.pods().withName("pod-standard"); + // When + retryUpload(() -> podResource.file("/tmp/binstream").upload(new ByteArrayInputStream(bytes))); + // Then + try (InputStream checkIs = podResource.file("/tmp/binstream").read();) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + InputStreamPumper.transferTo(checkIs, baos::write); + assertArrayEquals(bytes, baos.toByteArray(), () -> checkFile(podResource, null, "/tmp/binstream")); + } + } + @Test void uploadBinaryFile() throws IOException { // Given @@ -309,9 +331,8 @@ void uploadBinaryFile() throws IOException { final Path tempFile = Files.write(tempDir.resolve("file.toBeUploaded"), bytes); final PodResource podResource = client.pods().withName("pod-standard"); // When - final boolean uploadResult = podResource.file("/tmp/binfile").upload(tempFile); + retryUpload(() -> podResource.file("/tmp/binfile").upload(tempFile)); // Then - assertThat(uploadResult).isTrue(); try (InputStream checkIs = podResource.file("/tmp/binfile").read();) { ByteArrayOutputStream baos = new ByteArrayOutputStream(); InputStreamPumper.transferTo(checkIs, baos::write); @@ -330,7 +351,7 @@ void uploadDir() throws IOException { PodResource podResource = client.pods().withName("pod-standard"); - podResource.dir("/tmp/uploadDir").withReadyWaitTimeout(POD_READY_WAIT_IN_MILLIS).upload(tmpDir); + retryUpload(() -> podResource.dir("/tmp/uploadDir").withReadyWaitTimeout(POD_READY_WAIT_IN_MILLIS).upload(tmpDir)); for (String fileName : files) { try (InputStream checkIs = podResource.file("/tmp/uploadDir/" + fileName).read();