Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #4910: making upload more reliable #4968

Merged
merged 1 commit into from
Apr 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### 6.6-SNAPSHOT

#### Bugs
* Fix #4910: Pod file upload will now detect if it's not completely sent to the api server
* Fix #4963: Openshift Client return 403 when use websocket
* Fix #4985: triggering the immediate cleanup of the okhttp idle task
* fix #5002: Jetty response completion accounts for header processing
Expand All @@ -21,9 +22,9 @@
#### New Features

#### _**Note**_: Breaking changes
* Fix #4998: Serialization.yamlMapper and Serialization.clearYamlMapper have been deprecated

* Fix #4875: Removed unused options from the java-generator
* Fix #4910: all Pod file uploads not require commons-compress
* Fix #4998: Serialization.yamlMapper and Serialization.clearYamlMapper have been deprecated

### 6.5.1 (2023-03-20)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,29 @@
import io.fabric8.kubernetes.client.utils.Utils;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.utils.CountingOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPOutputStream;

import static io.fabric8.kubernetes.client.dsl.internal.core.v1.PodOperationsImpl.shellQuote;

public class PodUpload {

private static final Logger LOG = LoggerFactory.getLogger(PodUpload.class);

private static final String TAR_PATH_DELIMITER = "/";

private PodUpload() {
Expand All @@ -47,81 +53,137 @@ public static boolean upload(PodOperationsImpl operation, Path pathToUpload)
throws IOException {

if (Utils.isNotNullOrEmpty(operation.getContext().getFile()) && pathToUpload.toFile().isFile()) {
return uploadFile(operation, pathToUpload);
return uploadTar(operation, getDirectoryFromFile(operation),
tar -> addFileToTar(null, new File(operation.getContext().getFile()).getName(), pathToUpload.toFile(), tar));
} else if (Utils.isNotNullOrEmpty(operation.getContext().getDir()) && pathToUpload.toFile().isDirectory()) {
return uploadDirectory(operation, pathToUpload);
return uploadTar(operation, operation.getContext().getDir(), tar -> {
for (File file : pathToUpload.toFile().listFiles()) {
addFileToTar(null, file.getName(), file, tar);
}
});
}
throw new IllegalArgumentException("Provided arguments are not valid (file, directory, path)");
}

private interface UploadProcessor {
private static String getDirectoryFromFile(PodOperationsImpl operation) {
String file = operation.getContext().getFile();
String directoryTrimmedFromFilePath = file.substring(0, file.lastIndexOf('/'));
return directoryTrimmedFromFilePath.isEmpty() ? "/" : directoryTrimmedFromFilePath;
}

void process(OutputStream out) throws IOException;
private interface UploadProcessor<T extends OutputStream> {

void process(T out) throws IOException;

}

private static boolean upload(PodOperationsImpl operation, String command, UploadProcessor processor) throws IOException {
operation = operation.redirectingInput().terminateOnError();
private static boolean upload(PodOperationsImpl operation, String file, UploadProcessor<OutputStream> processor)
throws IOException {

String command = createExecCommandForUpload(file);

CompletableFuture<Integer> exitFuture;
try (ExecWatch execWatch = operation.exec("sh", "-c", command)) {

int uploadRequestTimeout = operation.getRequestConfig().getUploadRequestTimeout();
long uploadRequestTimeoutEnd = uploadRequestTimeout < 0 ? Long.MAX_VALUE
: uploadRequestTimeout + System.currentTimeMillis();
long expected = 0;
try (ExecWatch execWatch = operation.redirectingInput().terminateOnError().exec("sh", "-c", command)) {
OutputStream out = execWatch.getInput();
processor.process(out);
CountingOutputStream countingStream = new CountingOutputStream(out);
processor.process(countingStream);
out.close(); // also flushes
expected = countingStream.getBytesWritten();
exitFuture = execWatch.exitCode();
}
// TODO: should this timeout be from the start of the upload?
if (!Utils.waitUntilReady(exitFuture, operation.getRequestConfig().getUploadRequestTimeout(),

// enforce the timeout after we've written everything - generally this won't fail, but
// we may have already exceeded the timeout because of how long it took to write
if (!Utils.waitUntilReady(exitFuture, Math.max(0, uploadRequestTimeoutEnd - System.currentTimeMillis()),
TimeUnit.MILLISECONDS)) {
LOG.debug("failed to complete upload before timeout expired");
return false;
}
Integer exitCode = exitFuture.getNow(null);
return exitCode == null || exitCode.intValue() == 0;
}

public static boolean uploadFileData(PodOperationsImpl operation, InputStream inputStream)
throws IOException {
String command = createExecCommandForUpload(operation.getContext().getFile());
if (exitCode != null && exitCode.intValue() != 0) {
LOG.debug("upload process failed with exit code {}", exitCode);
return false;
}

return upload(operation, command, os -> InputStreamPumper.transferTo(inputStream, os::write));
ByteArrayOutputStream byteCount = new ByteArrayOutputStream();
try (ExecWatch countWatch = operation.writingOutput(byteCount).exec("sh", "-c",
String.format("wc -c < %s", shellQuote(file)))) {
CompletableFuture<Integer> countExitFuture = countWatch.exitCode();
if (!Utils.waitUntilReady(countExitFuture, Math.max(0, uploadRequestTimeoutEnd - System.currentTimeMillis()),
TimeUnit.MILLISECONDS) || !Integer.valueOf(0).equals(countExitFuture.getNow(null))) {
LOG.debug("failed to validate the upload size, exit code {}", countExitFuture.getNow(null));
return false;
}
String remoteSize = new String(byteCount.toByteArray(), StandardCharsets.UTF_8);
if (!String.valueOf(expected).equals(remoteSize.trim())) {
LOG.debug("upload file size validation failed, expected {}, but was {}", expected, remoteSize);
return false;
}
}
return true;
}

private static boolean uploadFile(PodOperationsImpl operation, Path pathToUpload)
public static boolean uploadFileData(PodOperationsImpl operation, InputStream inputStream)
throws IOException {
try (final FileInputStream fis = new FileInputStream(pathToUpload.toFile())) {
return uploadFileData(operation, fis);
}
return upload(operation, operation.getContext().getFile(), os -> InputStreamPumper.transferTo(inputStream, os::write));
}

private static boolean uploadDirectory(PodOperationsImpl operation, Path pathToUpload)
private static boolean uploadTar(PodOperationsImpl operation, String directory,
UploadProcessor<TarArchiveOutputStream> processor)
throws IOException {

final String command = String.format(
"mkdir -p %1$s && tar -C %1$s -xzf -", shellQuote(operation.getContext().getDir()));
String fileName = String.format("/tmp/fabric8-%s.tar", UUID.randomUUID());

return upload(operation, command, os -> {
try (final GZIPOutputStream gzip = new GZIPOutputStream(os);
final TarArchiveOutputStream tar = new TarArchiveOutputStream(gzip)) {
boolean uploaded = upload(operation, fileName, os -> {
try (final TarArchiveOutputStream tar = new TarArchiveOutputStream(os)) {
tar.setLongFileMode(TarArchiveOutputStream.LONGFILE_POSIX);
for (File file : pathToUpload.toFile().listFiles()) {
addFileToTar(null, file, tar);
}
tar.flush();
processor.process(tar);
}
});

if (!uploaded) {
// best effort delete of the failed upload
try (ExecWatch rm = operation.writingOutput(new ByteArrayOutputStream()).exec("sh", "-c",
String.format("rm %s", fileName))) {
if (!Utils.waitUntilReady(rm.exitCode(), operation.getRequestConfig().getUploadRequestTimeout(), TimeUnit.MILLISECONDS)
|| !Integer.valueOf(0).equals(rm.exitCode().getNow(null))) {
LOG.warn("delete of temporary tar file {} may not have completed", fileName);
}
}
return false;
}

final String command = extractTarCommand(directory, fileName);

try (ExecWatch execWatch = operation.redirectingInput().exec("sh", "-c", command)) {
CompletableFuture<Integer> countExitFuture = execWatch.exitCode();
// TODO: this enforcement duplicates the timeout
return Utils.waitUntilReady(countExitFuture, operation.getRequestConfig().getUploadRequestTimeout(),
TimeUnit.MILLISECONDS) && Integer.valueOf(0).equals(countExitFuture.getNow(null));
}

}

private static void addFileToTar(String rootTarPath, File file, TarArchiveOutputStream tar)
throws IOException {
static String extractTarCommand(String directory, String tar) {
return String.format("mkdir -p %1$s; tar -C %1$s -xmf %2$s; e=$?; rm %2$s; exit $e", shellQuote(directory), tar);
}

final String fileName = Optional.ofNullable(rootTarPath).orElse("") + TAR_PATH_DELIMITER + file.getName();
private static void addFileToTar(String rootTarPath, String fileName, File file, TarArchiveOutputStream tar)
throws IOException {
tar.putArchiveEntry(new TarArchiveEntry(file, fileName));
if (file.isFile()) {
Files.copy(file.toPath(), tar);
tar.closeArchiveEntry();
} else if (file.isDirectory()) {
tar.closeArchiveEntry();
String dirRootPath = Optional.ofNullable(rootTarPath).orElse("") + TAR_PATH_DELIMITER + fileName;
for (File fileInDirectory : file.listFiles()) {
addFileToTar(fileName, fileInDirectory, tar);
addFileToTar(dirRootPath, fileInDirectory.getName(), fileInDirectory, tar);
}
}
}
Expand Down
Loading