Skip to content

Commit

Permalink
fix #4910: using tar for most uploads and verifying upload
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins authored and manusa committed Apr 4, 2023
1 parent 160d9d9 commit d15341c
Show file tree
Hide file tree
Showing 4 changed files with 189 additions and 160 deletions.
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### 6.6-SNAPSHOT

#### Bugs
* Fix #4910: Pod file upload will now detect if it's not completely sent to the api server
* Fix #4963: Openshift Client return 403 when use websocket
* Fix #4985: triggering the immediate cleanup of the okhttp idle task
* fix #5002: Jetty response completion accounts for header processing
Expand All @@ -21,9 +22,9 @@
#### New Features

#### _**Note**_: Breaking changes
* Fix #4998: Serialization.yamlMapper and Serialization.clearYamlMapper have been deprecated

* Fix #4875: Removed unused options from the java-generator
* Fix #4910: all Pod file uploads not require commons-compress
* Fix #4998: Serialization.yamlMapper and Serialization.clearYamlMapper have been deprecated

### 6.5.1 (2023-03-20)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,29 @@
import io.fabric8.kubernetes.client.utils.Utils;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.utils.CountingOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPOutputStream;

import static io.fabric8.kubernetes.client.dsl.internal.core.v1.PodOperationsImpl.shellQuote;

public class PodUpload {

private static final Logger LOG = LoggerFactory.getLogger(PodUpload.class);

private static final String TAR_PATH_DELIMITER = "/";

private PodUpload() {
Expand All @@ -47,81 +53,137 @@ public static boolean upload(PodOperationsImpl operation, Path pathToUpload)
throws IOException {

if (Utils.isNotNullOrEmpty(operation.getContext().getFile()) && pathToUpload.toFile().isFile()) {
return uploadFile(operation, pathToUpload);
return uploadTar(operation, getDirectoryFromFile(operation),
tar -> addFileToTar(null, new File(operation.getContext().getFile()).getName(), pathToUpload.toFile(), tar));
} else if (Utils.isNotNullOrEmpty(operation.getContext().getDir()) && pathToUpload.toFile().isDirectory()) {
return uploadDirectory(operation, pathToUpload);
return uploadTar(operation, operation.getContext().getDir(), tar -> {
for (File file : pathToUpload.toFile().listFiles()) {
addFileToTar(null, file.getName(), file, tar);
}
});
}
throw new IllegalArgumentException("Provided arguments are not valid (file, directory, path)");
}

private interface UploadProcessor {
private static String getDirectoryFromFile(PodOperationsImpl operation) {
String file = operation.getContext().getFile();
String directoryTrimmedFromFilePath = file.substring(0, file.lastIndexOf('/'));
return directoryTrimmedFromFilePath.isEmpty() ? "/" : directoryTrimmedFromFilePath;
}

void process(OutputStream out) throws IOException;
private interface UploadProcessor<T extends OutputStream> {

void process(T out) throws IOException;

}

private static boolean upload(PodOperationsImpl operation, String command, UploadProcessor processor) throws IOException {
operation = operation.redirectingInput().terminateOnError();
private static boolean upload(PodOperationsImpl operation, String file, UploadProcessor<OutputStream> processor)
throws IOException {

String command = createExecCommandForUpload(file);

CompletableFuture<Integer> exitFuture;
try (ExecWatch execWatch = operation.exec("sh", "-c", command)) {

int uploadRequestTimeout = operation.getRequestConfig().getUploadRequestTimeout();
long uploadRequestTimeoutEnd = uploadRequestTimeout < 0 ? Long.MAX_VALUE
: uploadRequestTimeout + System.currentTimeMillis();
long expected = 0;
try (ExecWatch execWatch = operation.redirectingInput().terminateOnError().exec("sh", "-c", command)) {
OutputStream out = execWatch.getInput();
processor.process(out);
CountingOutputStream countingStream = new CountingOutputStream(out);
processor.process(countingStream);
out.close(); // also flushes
expected = countingStream.getBytesWritten();
exitFuture = execWatch.exitCode();
}
// TODO: should this timeout be from the start of the upload?
if (!Utils.waitUntilReady(exitFuture, operation.getRequestConfig().getUploadRequestTimeout(),

// enforce the timeout after we've written everything - generally this won't fail, but
// we may have already exceeded the timeout because of how long it took to write
if (!Utils.waitUntilReady(exitFuture, Math.max(0, uploadRequestTimeoutEnd - System.currentTimeMillis()),
TimeUnit.MILLISECONDS)) {
LOG.debug("failed to complete upload before timeout expired");
return false;
}
Integer exitCode = exitFuture.getNow(null);
return exitCode == null || exitCode.intValue() == 0;
}

public static boolean uploadFileData(PodOperationsImpl operation, InputStream inputStream)
throws IOException {
String command = createExecCommandForUpload(operation.getContext().getFile());
if (exitCode != null && exitCode.intValue() != 0) {
LOG.debug("upload process failed with exit code {}", exitCode);
return false;
}

return upload(operation, command, os -> InputStreamPumper.transferTo(inputStream, os::write));
ByteArrayOutputStream byteCount = new ByteArrayOutputStream();
try (ExecWatch countWatch = operation.writingOutput(byteCount).exec("sh", "-c",
String.format("wc -c < %s", shellQuote(file)))) {
CompletableFuture<Integer> countExitFuture = countWatch.exitCode();
if (!Utils.waitUntilReady(countExitFuture, Math.max(0, uploadRequestTimeoutEnd - System.currentTimeMillis()),
TimeUnit.MILLISECONDS) || !Integer.valueOf(0).equals(countExitFuture.getNow(null))) {
LOG.debug("failed to validate the upload size, exit code {}", countExitFuture.getNow(null));
return false;
}
String remoteSize = new String(byteCount.toByteArray(), StandardCharsets.UTF_8);
if (!String.valueOf(expected).equals(remoteSize.trim())) {
LOG.debug("upload file size validation failed, expected {}, but was {}", expected, remoteSize);
return false;
}
}
return true;
}

private static boolean uploadFile(PodOperationsImpl operation, Path pathToUpload)
public static boolean uploadFileData(PodOperationsImpl operation, InputStream inputStream)
throws IOException {
try (final FileInputStream fis = new FileInputStream(pathToUpload.toFile())) {
return uploadFileData(operation, fis);
}
return upload(operation, operation.getContext().getFile(), os -> InputStreamPumper.transferTo(inputStream, os::write));
}

private static boolean uploadDirectory(PodOperationsImpl operation, Path pathToUpload)
private static boolean uploadTar(PodOperationsImpl operation, String directory,
UploadProcessor<TarArchiveOutputStream> processor)
throws IOException {

final String command = String.format(
"mkdir -p %1$s && tar -C %1$s -xzf -", shellQuote(operation.getContext().getDir()));
String fileName = String.format("/tmp/fabric8-%s.tar", UUID.randomUUID());

return upload(operation, command, os -> {
try (final GZIPOutputStream gzip = new GZIPOutputStream(os);
final TarArchiveOutputStream tar = new TarArchiveOutputStream(gzip)) {
boolean uploaded = upload(operation, fileName, os -> {
try (final TarArchiveOutputStream tar = new TarArchiveOutputStream(os)) {
tar.setLongFileMode(TarArchiveOutputStream.LONGFILE_POSIX);
for (File file : pathToUpload.toFile().listFiles()) {
addFileToTar(null, file, tar);
}
tar.flush();
processor.process(tar);
}
});

if (!uploaded) {
// best effort delete of the failed upload
try (ExecWatch rm = operation.writingOutput(new ByteArrayOutputStream()).exec("sh", "-c",
String.format("rm %s", fileName))) {
if (!Utils.waitUntilReady(rm.exitCode(), operation.getRequestConfig().getUploadRequestTimeout(), TimeUnit.MILLISECONDS)
|| !Integer.valueOf(0).equals(rm.exitCode().getNow(null))) {
LOG.warn("delete of temporary tar file {} may not have completed", fileName);
}
}
return false;
}

final String command = extractTarCommand(directory, fileName);

try (ExecWatch execWatch = operation.redirectingInput().exec("sh", "-c", command)) {
CompletableFuture<Integer> countExitFuture = execWatch.exitCode();
// TODO: this enforcement duplicates the timeout
return Utils.waitUntilReady(countExitFuture, operation.getRequestConfig().getUploadRequestTimeout(),
TimeUnit.MILLISECONDS) && Integer.valueOf(0).equals(countExitFuture.getNow(null));
}

}

private static void addFileToTar(String rootTarPath, File file, TarArchiveOutputStream tar)
throws IOException {
static String extractTarCommand(String directory, String tar) {
return String.format("mkdir -p %1$s; tar -C %1$s -xmf %2$s; e=$?; rm %2$s; exit $e", shellQuote(directory), tar);
}

final String fileName = Optional.ofNullable(rootTarPath).orElse("") + TAR_PATH_DELIMITER + file.getName();
private static void addFileToTar(String rootTarPath, String fileName, File file, TarArchiveOutputStream tar)
throws IOException {
tar.putArchiveEntry(new TarArchiveEntry(file, fileName));
if (file.isFile()) {
Files.copy(file.toPath(), tar);
tar.closeArchiveEntry();
} else if (file.isDirectory()) {
tar.closeArchiveEntry();
String dirRootPath = Optional.ofNullable(rootTarPath).orElse("") + TAR_PATH_DELIMITER + fileName;
for (File fileInDirectory : file.listFiles()) {
addFileToTar(fileName, fileInDirectory, tar);
addFileToTar(dirRootPath, fileInDirectory.getName(), fileInDirectory, tar);
}
}
}
Expand Down
Loading

0 comments on commit d15341c

Please sign in to comment.