diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/uploadable/PodUpload.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/uploadable/PodUpload.java index a6362752e17..722f434116f 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/uploadable/PodUpload.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/uploadable/PodUpload.java @@ -21,18 +21,18 @@ import io.fabric8.kubernetes.client.utils.Utils; import org.apache.commons.compress.archivers.tar.TarArchiveEntry; import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; +import org.apache.commons.compress.utils.CountingOutputStream; import java.io.File; -import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.nio.file.Files; import java.nio.file.Path; import java.util.Optional; +import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.zip.GZIPOutputStream; import static io.fabric8.kubernetes.client.dsl.internal.core.v1.PodOperationsImpl.shellQuote; @@ -47,30 +47,59 @@ public static boolean upload(PodOperationsImpl operation, Path pathToUpload) throws IOException { if (Utils.isNotNullOrEmpty(operation.getContext().getFile()) && pathToUpload.toFile().isFile()) { - return uploadFile(operation, pathToUpload); + return uploadTar(operation, getDirectoryFromFile(operation), + tar -> addFileToTar(null, new File(operation.getContext().getFile()).getName(), pathToUpload.toFile(), tar)); } else if (Utils.isNotNullOrEmpty(operation.getContext().getDir()) && pathToUpload.toFile().isDirectory()) { - return uploadDirectory(operation, pathToUpload); + return uploadTar(operation, operation.getContext().getDir(), tar -> { + for (File file : pathToUpload.toFile().listFiles()) { + addFileToTar(null, file.getName(), file, tar); + } + }); } throw new IllegalArgumentException("Provided arguments are not valid (file, directory, path)"); } - private interface UploadProcessor { + private static String getDirectoryFromFile(PodOperationsImpl operation) { + return Optional.ofNullable(new File(operation.getContext().getFile()).getParent()).orElse("/"); + } + + private interface UploadProcessor { - void process(OutputStream out) throws IOException; + void process(T out) throws IOException; } - private static boolean upload(PodOperationsImpl operation, String command, UploadProcessor processor) throws IOException { + private static boolean upload(PodOperationsImpl operation, String file, UploadProcessor processor) + throws IOException { + + String command = createExecCommandForUpload(file); + operation = operation.redirectingInput().terminateOnError(); CompletableFuture exitFuture; + + int uploadRequestTimeout = operation.getRequestConfig().getUploadRequestTimeout(); + long uploadRequestTimeoutEnd = uploadRequestTimeout < 0 ? Long.MAX_VALUE + : uploadRequestTimeout + System.currentTimeMillis(); try (ExecWatch execWatch = operation.exec("sh", "-c", command)) { OutputStream out = execWatch.getInput(); - processor.process(out); + CountingOutputStream countingStream = new CountingOutputStream(out); + processor.process(countingStream); out.close(); // also flushes + long expected = countingStream.getBytesWritten(); exitFuture = execWatch.exitCode(); + + // confirm upload + try (ExecWatch countWatch = operation.exec("sh", "-c", + String.format("until [ %s -eq $(wc -c < %s) ]; do sleep .1; done", expected, shellQuote(file)))) { + CompletableFuture countExitFuture = countWatch.exitCode(); + if (!Utils.waitUntilReady(countExitFuture, Math.max(0, uploadRequestTimeoutEnd - System.currentTimeMillis()), + TimeUnit.MILLISECONDS) || !Integer.valueOf(0).equals(countExitFuture.getNow(null))) { + return false; + } + } } - // TODO: should this timeout be from the start of the upload? - if (!Utils.waitUntilReady(exitFuture, operation.getRequestConfig().getUploadRequestTimeout(), + // not expected to fail here or on the final exit code, which is not typically received + if (!Utils.waitUntilReady(exitFuture, Math.max(0, uploadRequestTimeoutEnd - System.currentTimeMillis()), TimeUnit.MILLISECONDS)) { return false; } @@ -80,48 +109,53 @@ private static boolean upload(PodOperationsImpl operation, String command, Uploa public static boolean uploadFileData(PodOperationsImpl operation, InputStream inputStream) throws IOException { - String command = createExecCommandForUpload(operation.getContext().getFile()); - - return upload(operation, command, os -> InputStreamPumper.transferTo(inputStream, os::write)); + return upload(operation, operation.getContext().getFile(), os -> InputStreamPumper.transferTo(inputStream, os::write)); } - private static boolean uploadFile(PodOperationsImpl operation, Path pathToUpload) + private static boolean uploadTar(PodOperationsImpl operation, String directory, + UploadProcessor processor) throws IOException { - try (final FileInputStream fis = new FileInputStream(pathToUpload.toFile())) { - return uploadFileData(operation, fis); - } - } - private static boolean uploadDirectory(PodOperationsImpl operation, Path pathToUpload) - throws IOException { + String fileName = String.format("/tmp/fabric8-%s.tar", UUID.randomUUID()); - final String command = String.format( - "mkdir -p %1$s && tar -C %1$s -xzf -", shellQuote(operation.getContext().getDir())); - - return upload(operation, command, os -> { - try (final GZIPOutputStream gzip = new GZIPOutputStream(os); - final TarArchiveOutputStream tar = new TarArchiveOutputStream(gzip)) { + boolean uploaded = upload(operation, fileName, os -> { + try (final TarArchiveOutputStream tar = new TarArchiveOutputStream(os)) { tar.setLongFileMode(TarArchiveOutputStream.LONGFILE_POSIX); - for (File file : pathToUpload.toFile().listFiles()) { - addFileToTar(null, file, tar); - } - tar.flush(); + processor.process(tar); } }); + + if (!uploaded) { + // TODO: delete tmp file + return false; + } + + final String command = extractTarCommand(directory, fileName); + + try (ExecWatch execWatch = operation.exec("sh", "-c", command)) { + CompletableFuture countExitFuture = execWatch.exitCode(); + // TODO: this enforcement duplicates the timeout + return Utils.waitUntilReady(countExitFuture, operation.getRequestConfig().getUploadRequestTimeout(), + TimeUnit.MILLISECONDS) && Integer.valueOf(0).equals(countExitFuture.getNow(null)); + } + } - private static void addFileToTar(String rootTarPath, File file, TarArchiveOutputStream tar) - throws IOException { + static String extractTarCommand(String directory, String tar) { + return String.format("mkdir -p %1$s; tar -C %1$s -xmf %2$s; e=$?; rm %2$s; exit $e", shellQuote(directory), tar); + } - final String fileName = Optional.ofNullable(rootTarPath).orElse("") + TAR_PATH_DELIMITER + file.getName(); + private static void addFileToTar(String rootTarPath, String fileName, File file, TarArchiveOutputStream tar) + throws IOException { tar.putArchiveEntry(new TarArchiveEntry(file, fileName)); if (file.isFile()) { Files.copy(file.toPath(), tar); tar.closeArchiveEntry(); } else if (file.isDirectory()) { tar.closeArchiveEntry(); + String dirRootPath = Optional.ofNullable(rootTarPath).orElse("") + TAR_PATH_DELIMITER + fileName; for (File fileInDirectory : file.listFiles()) { - addFileToTar(fileName, fileInDirectory, tar); + addFileToTar(dirRootPath, fileInDirectory.getName(), fileInDirectory, tar); } } } diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/uploadable/PodUploadTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/uploadable/PodUploadTest.java index 2615ecd66be..29fdf588913 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/uploadable/PodUploadTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/uploadable/PodUploadTest.java @@ -116,13 +116,13 @@ void uploadInvalidParametersShouldThrowException(@TempDir Path pathToUpload) { void upload_withFile_shouldUploadFile() throws IOException, InterruptedException { final Path toUpload = new File(PodUpload.class.getResource("/upload/upload-sample.txt").getFile()) .toPath(); - uploadFileAndVerify(() -> PodUpload.upload(operation, toUpload)); + uploadFileAndVerify(() -> PodUpload.upload(operation, toUpload), false); } @Test void uploadFileData_whenByteArrayInputStreamProvided_shouldUploadFile() throws IOException, InterruptedException { InputStream inputStream = new ByteArrayInputStream("test data".getBytes()); - uploadFileAndVerify(() -> PodUpload.uploadFileData(operation, inputStream)); + uploadFileAndVerify(() -> PodUpload.uploadFileData(operation, inputStream), true); } @Test @@ -183,7 +183,8 @@ void createExecCommandForUpload_withMultipleSingleQuotesInPath() { assertThat(result).isEqualTo("mkdir -p '/tmp/f\'\\'\'o\'\\'\'o' && cat - > '/tmp/f\'\\'\'o\'\\'\'o/c\'\\'\'p.log'"); } - void uploadFileAndVerify(PodUploadTester fileUploadMethodToTest) throws IOException, InterruptedException { + void uploadFileAndVerify(PodUploadTester fileUploadMethodToTest, boolean stream) + throws IOException, InterruptedException { operation = operation.file("/mock/dir/file"); WebSocket.Builder builder = mock(WebSocket.Builder.class, Mockito.RETURNS_SELF); when(builder.buildAsync(any())).thenAnswer(newWebSocket -> { @@ -212,9 +213,15 @@ void uploadFileAndVerify(PodUploadTester fileUploadMethodToTest) throws assertEquals( "https://openshift.com:8443/api/v1/namespaces/default/pods?fieldSelector=metadata.name%3Dpod&timeoutSeconds=600&allowWatchBookmarks=true&watch=true", captor.getAllValues().get(0).toString()); - assertEquals( - "https://openshift.com:8443/api/v1/namespaces/default/pods/pod/exec?command=sh&command=-c&command=mkdir%20-p%20%27%2Fmock%2Fdir%27%20%26%26%20cat%20-%20%3E%20%27%2Fmock%2Fdir%2Ffile%27&container=container&stdin=true&stderr=true", - captor.getAllValues().get(1).toString()); + if (stream) { + assertEquals( + "https://openshift.com:8443/api/v1/namespaces/default/pods/pod/exec?command=sh&command=-c&command=mkdir%20-p%20%27%2Fmock%2Fdir%27%20%26%26%20cat%20-%20%3E%20%27%2Fmock%2Fdir%2Ffile%27&container=container&stdin=true&stderr=true", + captor.getAllValues().get(1).toString()); + } else { + assertEquals( + "https://openshift.com:8443/api/v1/namespaces/default/pods/pod/exec?command=sh&command=-c&command=mkdir%20-p%20%27%2Fmock%2Fdir%27%20%26%26%20tar%20-C%20%27%2Fmock%2Fdir%27%20-xmf%20-&container=container&stdin=true&stderr=true", + captor.getAllValues().get(1).toString()); + } verify(mockWebSocket, atLeast(1)).send(any(ByteBuffer.class)); } @@ -249,7 +256,7 @@ private void uploadDirectoryAndVerify(PodUploadTester directoryUpload) "https://openshift.com:8443/api/v1/namespaces/default/pods?fieldSelector=metadata.name%3Dpod&timeoutSeconds=600&allowWatchBookmarks=true&watch=true", captor.getAllValues().get(0).toString()); assertEquals( - "https://openshift.com:8443/api/v1/namespaces/default/pods/pod/exec?command=sh&command=-c&command=mkdir%20-p%20%27%2Fmock%2Fdir%27%20%26%26%20tar%20-C%20%27%2Fmock%2Fdir%27%20-xzf%20-&container=container&stdin=true&stderr=true", + "https://openshift.com:8443/api/v1/namespaces/default/pods/pod/exec?command=sh&command=-c&command=mkdir%20-p%20%27%2Fmock%2Fdir%27%20%26%26%20tar%20-C%20%27%2Fmock%2Fdir%27%20-xmf%20-&container=container&stdin=true&stderr=true", captor.getAllValues().get(1).toString()); verify(mockWebSocket, atLeast(1)).send(any(ByteBuffer.class)); } diff --git a/kubernetes-itests/src/test/java/io/fabric8/kubernetes/PodIT.java b/kubernetes-itests/src/test/java/io/fabric8/kubernetes/PodIT.java index 7229db00df5..ae29c54d526 100644 --- a/kubernetes-itests/src/test/java/io/fabric8/kubernetes/PodIT.java +++ b/kubernetes-itests/src/test/java/io/fabric8/kubernetes/PodIT.java @@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory; import java.io.BufferedReader; +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; @@ -299,6 +300,24 @@ void uploadFile(String uploadPath) throws IOException { } } + @Test + void uploadBinaryStream() throws Exception { + byte[] bytes = new byte[16385]; + for (int i = 0; i < bytes.length; i++) { + bytes[i] = (byte) i; + } + final PodResource podResource = client.pods().withName("pod-standard"); + // When + final boolean uploadResult = podResource.file("/tmp/binstream").upload(new ByteArrayInputStream(bytes)); + // Then + assertThat(uploadResult).isTrue(); + try (InputStream checkIs = podResource.file("/tmp/binstream").read();) { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + InputStreamPumper.transferTo(checkIs, baos::write); + assertArrayEquals(bytes, baos.toByteArray(), () -> checkFile(podResource, null, "/tmp/binstream")); + } + } + @Test void uploadBinaryFile() throws IOException { // Given