Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add download implementation for API requests #1231

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changes/next-release/bugfix-AWSSDKforJavav2-f0e1099.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"category": "AWS SDK for Java v2",
"type": "bugfix",
"description": "Fix a bug where events in an event stream were being signed with the request date, and not with the current system time."
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,26 @@ static <ResponseT> AsyncResponseTransformer<ResponseT, ResponseT> toFile(Path pa
return new FileAsyncResponseTransformer<>(path);
}

/**
* Creates an {@link AsyncResponseTransformer} that writes all the content to the given file.
*
* @param path Path to file to write to.
* @param position The value for the data between the current end of the file and the starting position is
* undefined.
* @param isNewFile Whether this is a new file. If this is {@code true} and the file already exists, the
* transformer will complete with an exception.
* @param deleteOnFailure Whether the file on disk should be deleted in the event of a failure when writing the
* stream.
* @param <ResponseT> Pojo Response type.
* @return AsyncResponseTransformer instance.
*/
static <ResponseT> AsyncResponseTransformer<ResponseT, ResponseT> toFile(Path path,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: I have another PR for this in #1222 that will be merged to master. I will update the s3-transfermanager branch once it goes through. Just didn't want it to hold this PR up.

long position,
boolean isNewFile,
boolean deleteOnFailure) {
return new FileAsyncResponseTransformer<>(path, position, isNewFile, deleteOnFailure);
}

/**
* Creates an {@link AsyncResponseTransformer} that writes all the content to the given file. In the event of an error,
* the SDK will attempt to delete the file (whatever has been written to it so far). If the file already exists, an
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.utils.Validate;

/**
* {@link AsyncResponseTransformer} that writes the data to the specified file.
Expand All @@ -40,16 +41,29 @@
@SdkInternalApi
public final class FileAsyncResponseTransformer<ResponseT> implements AsyncResponseTransformer<ResponseT, ResponseT> {
private final Path path;
private final long offset;
private final boolean isNewFile;
private final boolean deleteOnFailure;
private volatile AsynchronousFileChannel fileChannel;
private volatile CompletableFuture<Void> cf;
private volatile ResponseT response;

public FileAsyncResponseTransformer(Path path) {
this(path, 0L, true, true);
}

public FileAsyncResponseTransformer(Path path, long offset, boolean isNewFile, boolean deleteOnFailure) {
this.path = path;
this.offset = Validate.isNotNegative(offset, "offset");
this.isNewFile = isNewFile;
this.deleteOnFailure = deleteOnFailure;
}

private AsynchronousFileChannel createChannel(Path path) throws IOException {
return AsynchronousFileChannel.open(path, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
if (isNewFile) {
return AsynchronousFileChannel.open(path, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
}
return AsynchronousFileChannel.open(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
}

@Override
Expand All @@ -72,15 +86,17 @@ public void onResponse(ResponseT response) {
public void onStream(SdkPublisher<ByteBuffer> publisher) {
// onStream may be called multiple times so reset the file channel every time
this.fileChannel = invokeSafely(() -> createChannel(path));
publisher.subscribe(new FileSubscriber(this.fileChannel, path, cf));
publisher.subscribe(new FileSubscriber(offset, this.fileChannel, path, cf));
}

@Override
public void exceptionOccurred(Throwable throwable) {
try {
invokeSafely(fileChannel::close);
} finally {
invokeSafely(() -> Files.deleteIfExists(path));
if (deleteOnFailure) {
invokeSafely(() -> Files.deleteIfExists(path));
}
}
cf.completeExceptionally(throwable);
}
Expand All @@ -89,7 +105,7 @@ public void exceptionOccurred(Throwable throwable) {
* {@link Subscriber} implementation that writes chunks to a file.
*/
static class FileSubscriber implements Subscriber<ByteBuffer> {
private final AtomicLong position = new AtomicLong();
private final AtomicLong position;

private final AsynchronousFileChannel fileChannel;
private final Path path;
Expand All @@ -99,12 +115,17 @@ static class FileSubscriber implements Subscriber<ByteBuffer> {
private volatile boolean closeOnLastWrite = false;
private Subscription subscription;

FileSubscriber(AsynchronousFileChannel fileChannel, Path path, CompletableFuture<Void> future) {
FileSubscriber(long position, AsynchronousFileChannel fileChannel, Path path, CompletableFuture<Void> future) {
this.position = new AtomicLong(position);
this.fileChannel = fileChannel;
this.path = path;
this.future = future;
}

FileSubscriber(AsynchronousFileChannel fileChannel, Path path, CompletableFuture<Void> future) {
this(0, fileChannel, path, future);
}

@Override
public void onSubscribe(Subscription s) {
if (this.subscription != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
/*
* Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.internal.async;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import com.google.common.jimfs.Configuration;
import com.google.common.jimfs.Jimfs;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.FileSystem;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CompletableFuture;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.SdkPublisher;

/**
* Tests for {@link FileAsyncResponseTransformer}.
*/
public class FileAsyncResponseTransformerTest {
private FileSystem testFs;

@Before
public void setup() {
testFs = Jimfs.newFileSystem(Configuration.forCurrentPlatform());
}

@After
public void teardown() throws IOException {
testFs.close();
}

@Test
public void defaultCreatesNewWritableFile() {
byte[] content = "test".getBytes(StandardCharsets.UTF_8);
Path testFile = testFs.getPath("testFile");
AsyncResponseTransformer<String, String> transformer = AsyncResponseTransformer.toFile(testFile);
CompletableFuture<String> transformFuture = transformer.prepare();
transformer.onResponse("some response");
transformer.onStream(AsyncRequestBody.fromBytes(content));
transformFuture.join();

assertFileContentsEquals(testFile, "test");
}

@Test
public void honorsPosition() throws IOException {
byte[] content = "test".getBytes(StandardCharsets.UTF_8);
Path testFile = testFs.getPath("testFile");
AsyncResponseTransformer<String, String> transformer = AsyncResponseTransformer.toFile(testFile, content.length, true, true);
CompletableFuture<String> transformFuture = transformer.prepare();
transformer.onResponse("some response");
transformer.onStream(AsyncRequestBody.fromBytes(content));
transformFuture.join();

assertThat(Files.size(testFile)).isEqualTo(content.length * 2);
}

@Test
public void honorsNewFileFlags_False() throws IOException {
Path exists = testFs.getPath("exists");
createFileWithContents(exists, "Hello".getBytes(StandardCharsets.UTF_8));

honorsNewFileFlagTest(exists, 5, false, "Test", "HelloTest");
}

@Test
public void honorsNewFileFlag_True_FileNotExists() {
Path notExists = testFs.getPath("notExists");
honorsNewFileFlagTest(notExists, 0, true, "Test", "Test");
}

@Test
public void honorsNewFileFlag_True_FileExists() throws IOException {
Path exists = testFs.getPath("exists");
createFileWithContents(exists, "Hello".getBytes(StandardCharsets.UTF_8));
assertThatThrownBy(() -> honorsNewFileFlagTest(exists, 5, true, "Test", null))
.hasCauseInstanceOf(FileAlreadyExistsException.class);
}

@Test
public void honorsDeleteOnFailure_True_NoExistingFile() {
Path notExists = testFs.getPath("notExists");
honorsDeleteOnFailureTest(notExists, true, true);
}

@Test
public void honorsDeleteOnFailure_True_ExistingFile() throws IOException {
Path exists = testFs.getPath("exists");
createFileWithContents(exists, "Hello".getBytes(StandardCharsets.UTF_8));
honorsDeleteOnFailureTest(exists, false, true);
}

@Test
public void honorsDeleteOnFailure_False_NonExistingFile() {
Path notExists = testFs.getPath("notExists");
honorsDeleteOnFailureTest(notExists, true, false);
}

@Test
public void honorsDeleteOnFailure_False_ExistingFile() throws IOException {
Path exists = testFs.getPath("exists");
createFileWithContents(exists, "Hello".getBytes(StandardCharsets.UTF_8));
honorsDeleteOnFailureTest(exists, false, false);
}

private void honorsNewFileFlagTest(Path file, long position, boolean isNewFile, String streamContents, String expectedContents) {
AsyncResponseTransformer<String, String> transformer = AsyncResponseTransformer.toFile(file, position, isNewFile, true);
CompletableFuture<String> transformFuture = transformer.prepare();
transformer.onResponse("some response");
transformer.onStream(AsyncRequestBody.fromString(streamContents));
transformFuture.join();

if (expectedContents != null) {
assertFileContentsEquals(file, expectedContents);
}
}

private void honorsDeleteOnFailureTest(Path file, boolean isNewFile, boolean deleteOnFailure) {
AsyncResponseTransformer<String, String> transformer = AsyncResponseTransformer.toFile(file, 0, isNewFile, deleteOnFailure);
CompletableFuture<String> transformFuture = transformer.prepare();
IOException error = new IOException("Something went wrong");
transformer.onResponse("some response");
transformer.onStream(new ErrorPublisher<>(error));
transformer.exceptionOccurred(error);
assertThatThrownBy(transformFuture::join).hasCause(error);
if (deleteOnFailure) {
assertThat(Files.exists(file)).isFalse();
} else {
assertThat(Files.exists(file)).isTrue();
}
}

private static void createFileWithContents(Path file, byte[] contents) throws IOException {
OutputStream os = Files.newOutputStream(file, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
os.write(contents);
os.close();
}

private static void assertFileContentsEquals(Path file, String expected) {
StringBuilder sb = new StringBuilder();
try {
BufferedReader reader = new BufferedReader(new InputStreamReader(Files.newInputStream(file)));
String s;
while ((s = reader.readLine()) != null) {
sb.append(s);
}
} catch (IOException ioe) {
throw new UncheckedIOException(ioe);
}
assertThat(sb.toString()).isEqualTo(expected);
}

private static final class ErrorPublisher<T> implements SdkPublisher<T> {
private final Throwable error;

private ErrorPublisher(Throwable error) {
this.error = error;
}

@Override
public void subscribe(Subscriber<? super T> subscriber) {
subscriber.onSubscribe(new ErrorSubscription(subscriber, error));
}
}

private static final class ErrorSubscription implements Subscription {
private final Subscriber<?> subscriber;
private final Throwable error;

public ErrorSubscription(Subscriber<?> subscriber, Throwable error) {
this.subscriber = subscriber;
this.error = error;
}

@Override
public void request(long l) {
subscriber.onError(error);
}

@Override
public void cancel() {

}
}
}
15 changes: 10 additions & 5 deletions docs/design/services/s3/transfermanager/prototype.java
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,16 @@ interface Builder {
*/
Builder maxDownloadBytesPerSecond(Long maxDownloadBytesPerSecond);

/**
* The multipart download configuration.
*/
Builder multipartDownloadConfiguration(MultipartDownloadConfiguration multipartDownloadConfiguration);

/**
* The multipart upload configuration.
*/
Builder multipartUploadConfiguration(MultipartUploadConfiguration multipartUploadConfiguration);

/**
* Add a progress listener to the currently configured list of
* listeners.
Expand Down Expand Up @@ -445,11 +455,6 @@ public interface SinglePartDownloadContext {
* The original download request given to the Transfer Manager.
*/
DownloadObjectRequest downloadRequest();

/**
* The request sent to S3 for this object. This is empty if downloading a presigned URL.
*/
GetObjectRequest objectRequest();
}

/**
Expand Down
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
<module>aws-sdk-java</module>
<module>core</module>
<module>services</module>
<module>services-custom/s3-transfermanager</module>
<module>bom</module>
<module>bom-internal</module>
<module>codegen</module>
Expand Down
Loading