From 69be7366ea192567dd191bf17c90e971389cce5c Mon Sep 17 00:00:00 2001 From: Steve Hawkins Date: Mon, 13 Mar 2023 13:45:23 -0400 Subject: [PATCH] fix #4910: speculative changes for upload --- .../internal/core/v1/PodOperationsImpl.java | 5 +- .../dsl/internal/uploadable/PodUpload.java | 120 +++++++++++------- 2 files changed, 77 insertions(+), 48 deletions(-) diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/core/v1/PodOperationsImpl.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/core/v1/PodOperationsImpl.java index a0866c186bf..73acc27ff57 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/core/v1/PodOperationsImpl.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/core/v1/PodOperationsImpl.java @@ -42,7 +42,6 @@ import io.fabric8.kubernetes.client.dsl.TailPrettyLoggable; import io.fabric8.kubernetes.client.dsl.TimeTailPrettyLoggable; import io.fabric8.kubernetes.client.dsl.TtyExecErrorChannelable; -import io.fabric8.kubernetes.client.dsl.TtyExecErrorable; import io.fabric8.kubernetes.client.dsl.TtyExecOutputErrorable; import io.fabric8.kubernetes.client.dsl.TtyExecable; import io.fabric8.kubernetes.client.dsl.internal.ExecWebSocketListener; @@ -562,13 +561,13 @@ public PodOperationsImpl redirectingInput(Integer bufferSize) { } @Override - public TtyExecErrorable writingOutput(OutputStream out) { + public PodOperationsImpl writingOutput(OutputStream out) { checkForPiped(out); return new PodOperationsImpl(getContext().toBuilder().output(new StreamContext(out)).build(), context); } @Override - public TtyExecErrorable redirectingOutput() { + public PodOperationsImpl redirectingOutput() { return new PodOperationsImpl(getContext().toBuilder().output(new StreamContext()).build(), context); } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/uploadable/PodUpload.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/uploadable/PodUpload.java index a6362752e17..6acb8e16231 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/uploadable/PodUpload.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/uploadable/PodUpload.java @@ -15,29 +15,32 @@ */ package io.fabric8.kubernetes.client.dsl.internal.uploadable; +import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.dsl.ExecWatch; import io.fabric8.kubernetes.client.dsl.internal.core.v1.PodOperationsImpl; import io.fabric8.kubernetes.client.utils.InputStreamPumper; import io.fabric8.kubernetes.client.utils.Utils; import org.apache.commons.compress.archivers.tar.TarArchiveEntry; import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; +import org.apache.commons.compress.archivers.tar.TarConstants; +import org.apache.commons.compress.utils.CountingOutputStream; +import java.io.ByteArrayOutputStream; import java.io.File; -import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.lang.reflect.Field; import java.nio.file.Files; import java.nio.file.Path; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.zip.GZIPOutputStream; import static io.fabric8.kubernetes.client.dsl.internal.core.v1.PodOperationsImpl.shellQuote; public class PodUpload { + private static final String FABRIC8DONE = ".fabric8done"; private static final String TAR_PATH_DELIMITER = "/"; private PodUpload() { @@ -47,90 +50,117 @@ public static boolean upload(PodOperationsImpl operation, Path pathToUpload) throws IOException { if (Utils.isNotNullOrEmpty(operation.getContext().getFile()) && pathToUpload.toFile().isFile()) { - return uploadFile(operation, pathToUpload); + return uploadTar(operation, getDirectoryFromFile(operation), + tar -> addFileToTar(null, new File(operation.getContext().getFile()).getName(), pathToUpload.toFile(), tar)); } else if (Utils.isNotNullOrEmpty(operation.getContext().getDir()) && pathToUpload.toFile().isDirectory()) { - return uploadDirectory(operation, pathToUpload); + return uploadTar(operation, operation.getContext().getDir(), tar -> { + for (File file : pathToUpload.toFile().listFiles()) { + addFileToTar(null, file.getName(), file, tar); + } + }); } throw new IllegalArgumentException("Provided arguments are not valid (file, directory, path)"); } - private interface UploadProcessor { + private static String getDirectoryFromFile(PodOperationsImpl operation) { + return Optional.ofNullable(new File(operation.getContext().getFile()).getParent()).orElse("/"); + } + + private interface UploadProcessor { - void process(OutputStream out) throws IOException; + void process(T out) throws IOException; } - private static boolean upload(PodOperationsImpl operation, String command, UploadProcessor processor) throws IOException { - operation = operation.redirectingInput().terminateOnError(); + private static boolean upload(PodOperationsImpl operation, String command, UploadProcessor processor) + throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + operation = operation.redirectingInput().writingOutput(baos).terminateOnError(); CompletableFuture exitFuture; - try (ExecWatch execWatch = operation.exec("sh", "-c", command)) { - OutputStream out = execWatch.getInput(); - processor.process(out); - out.close(); // also flushes - exitFuture = execWatch.exitCode(); + ExecWatch execWatch = operation.exec("sh", "-c", command); + OutputStream out = execWatch.getInput(); + processor.process(out); + out.close(); // also flushes + exitFuture = execWatch.exitCode(); + while (baos.size() == 0) { + try { + Thread.sleep(50L); + } catch (InterruptedException e) { + } } - // TODO: should this timeout be from the start of the upload? - if (!Utils.waitUntilReady(exitFuture, operation.getRequestConfig().getUploadRequestTimeout(), - TimeUnit.MILLISECONDS)) { + if (!baos.toString().trim().equals(FABRIC8DONE)) { return false; } + + // TODO: should this timeout be from the start of the upload? + //if (!Utils.waitUntilReady(exitFuture, operation.getRequestConfig().getUploadRequestTimeout(), + // TimeUnit.MILLISECONDS)) { + // return false; + //} Integer exitCode = exitFuture.getNow(null); return exitCode == null || exitCode.intValue() == 0; } public static boolean uploadFileData(PodOperationsImpl operation, InputStream inputStream) throws IOException { - String command = createExecCommandForUpload(operation.getContext().getFile()); - - return upload(operation, command, os -> InputStreamPumper.transferTo(inputStream, os::write)); + return uploadTar(operation, getDirectoryFromFile(operation), + tar -> addFileToTar(new File(operation.getContext().getFile()).getName(), inputStream, tar)); } - private static boolean uploadFile(PodOperationsImpl operation, Path pathToUpload) + private static boolean uploadTar(PodOperationsImpl operation, String directory, + UploadProcessor processor) throws IOException { - try (final FileInputStream fis = new FileInputStream(pathToUpload.toFile())) { - return uploadFileData(operation, fis); - } - } - private static boolean uploadDirectory(PodOperationsImpl operation, Path pathToUpload) - throws IOException { - - final String command = String.format( - "mkdir -p %1$s && tar -C %1$s -xzf -", shellQuote(operation.getContext().getDir())); + final String command = uploadTarCommand(directory); return upload(operation, command, os -> { - try (final GZIPOutputStream gzip = new GZIPOutputStream(os); - final TarArchiveOutputStream tar = new TarArchiveOutputStream(gzip)) { + try (final TarArchiveOutputStream tar = new TarArchiveOutputStream(os)) { tar.setLongFileMode(TarArchiveOutputStream.LONGFILE_POSIX); - for (File file : pathToUpload.toFile().listFiles()) { - addFileToTar(null, file, tar); - } + processor.process(tar); + tar.putArchiveEntry(new TarArchiveEntry(FABRIC8DONE)); + tar.closeArchiveEntry(); tar.flush(); } }); } - private static void addFileToTar(String rootTarPath, File file, TarArchiveOutputStream tar) + static String uploadTarCommand(String directory) { + return String.format( + "{ while [[ ! -f %1$s/%2$s ]]; do sleep 1; done; rm %1$s/%2$s; echo %2$s; } & mkdir -p %1$s && tar -C %1$s -xf -", + shellQuote(directory), FABRIC8DONE); + } + + private static void addFileToTar(String fileName, InputStream file, TarArchiveOutputStream tar) throws IOException { + TarArchiveEntry archiveEntry = new TarArchiveEntry(fileName); + archiveEntry.setSize(TarConstants.MAXSIZE); + tar.putArchiveEntry(archiveEntry); + CountingOutputStream countingOutputStream = new CountingOutputStream(tar); + InputStreamPumper.transferTo(file, countingOutputStream::write); + archiveEntry.setSize(countingOutputStream.getBytesWritten()); + try { + Field field = TarArchiveOutputStream.class.getDeclaredField("currSize"); + field.setAccessible(true); + field.set(tar, countingOutputStream.getBytesWritten()); + } catch (Exception e) { + throw KubernetesClientException.launderThrowable(e); + } + tar.closeArchiveEntry(); + } - final String fileName = Optional.ofNullable(rootTarPath).orElse("") + TAR_PATH_DELIMITER + file.getName(); + private static void addFileToTar(String rootTarPath, String fileName, File file, TarArchiveOutputStream tar) + throws IOException { tar.putArchiveEntry(new TarArchiveEntry(file, fileName)); if (file.isFile()) { Files.copy(file.toPath(), tar); tar.closeArchiveEntry(); } else if (file.isDirectory()) { tar.closeArchiveEntry(); + String dirRootPath = Optional.ofNullable(rootTarPath).orElse("") + TAR_PATH_DELIMITER + fileName; for (File fileInDirectory : file.listFiles()) { - addFileToTar(fileName, fileInDirectory, tar); + addFileToTar(dirRootPath, fileInDirectory.getName(), fileInDirectory, tar); } } } - static String createExecCommandForUpload(String file) { - String directoryTrimmedFromFilePath = file.substring(0, file.lastIndexOf('/')); - final String directory = directoryTrimmedFromFilePath.isEmpty() ? "/" : directoryTrimmedFromFilePath; - return String.format( - "mkdir -p %s && cat - > %s", shellQuote(directory), shellQuote(file)); - } - }