Skip to content

Commit

Permalink
Add more options for FileAsyncResponseTransformer
Browse files Browse the repository at this point in the history
Add the following things to control the behavior of the transformer:

 - position parameter to specify where in the file to write the stream
 - isNewFile parameter to specify whether this is a new file
 - deleteOnFailure to specify whether the file should be deleted on failure
  • Loading branch information
dagnir committed Apr 23, 2019
1 parent 66cc220 commit 9749ff7
Show file tree
Hide file tree
Showing 4 changed files with 281 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,26 @@ static <ResponseT> AsyncResponseTransformer<ResponseT, ResponseT> toFile(Path pa
return new FileAsyncResponseTransformer<>(path);
}

/**
* Creates an {@link AsyncResponseTransformer} that writes all the content to the given file.
*
* @param path Path to file to write to.
* @param position The value for the data between the current end of the file and the starting position is
* undefined.
* @param isNewFile Whether this is a new file. If this is {@code true} and the file already exists, the
* transformer will complete with an exception.
* @param deleteOnFailure Whether the file on disk should be deleted in the event of a failure when writing the
* stream.
* @param <ResponseT> Pojo Response type.
* @return AsyncResponseTransformer instance.
*/
static <ResponseT> AsyncResponseTransformer<ResponseT, ResponseT> toFile(Path path,
long position,
boolean isNewFile,
boolean deleteOnFailure) {
return new FileAsyncResponseTransformer<>(path, position, isNewFile, deleteOnFailure);
}

/**
* Creates an {@link AsyncResponseTransformer} that writes all the content to the given file. In the event of an error,
* the SDK will attempt to delete the file (whatever has been written to it so far). If the file already exists, an
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.utils.Validate;

/**
* {@link AsyncResponseTransformer} that writes the data to the specified file.
Expand All @@ -40,16 +41,29 @@
@SdkInternalApi
public final class FileAsyncResponseTransformer<ResponseT> implements AsyncResponseTransformer<ResponseT, ResponseT> {
private final Path path;
private final long offset;
private final boolean isNewFile;
private final boolean deleteOnFailure;
private volatile AsynchronousFileChannel fileChannel;
private volatile CompletableFuture<Void> cf;
private volatile ResponseT response;

public FileAsyncResponseTransformer(Path path) {
this(path, 0L, true, true);
}

public FileAsyncResponseTransformer(Path path, long offset, boolean isNewFile, boolean deleteOnFailure) {
this.path = path;
this.offset = Validate.isNotNegative(offset, "offset");
this.isNewFile = isNewFile;
this.deleteOnFailure = deleteOnFailure;
}

private AsynchronousFileChannel createChannel(Path path) throws IOException {
return AsynchronousFileChannel.open(path, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
if (isNewFile) {
return AsynchronousFileChannel.open(path, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
}
return AsynchronousFileChannel.open(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
}

@Override
Expand All @@ -72,15 +86,17 @@ public void onResponse(ResponseT response) {
public void onStream(SdkPublisher<ByteBuffer> publisher) {
// onStream may be called multiple times so reset the file channel every time
this.fileChannel = invokeSafely(() -> createChannel(path));
publisher.subscribe(new FileSubscriber(this.fileChannel, path, cf));
publisher.subscribe(new FileSubscriber(offset, this.fileChannel, path, cf));
}

@Override
public void exceptionOccurred(Throwable throwable) {
try {
invokeSafely(fileChannel::close);
} finally {
invokeSafely(() -> Files.deleteIfExists(path));
if (deleteOnFailure) {
invokeSafely(() -> Files.deleteIfExists(path));
}
}
cf.completeExceptionally(throwable);
}
Expand All @@ -89,7 +105,7 @@ public void exceptionOccurred(Throwable throwable) {
* {@link Subscriber} implementation that writes chunks to a file.
*/
static class FileSubscriber implements Subscriber<ByteBuffer> {
private final AtomicLong position = new AtomicLong();
private final AtomicLong position;

private final AsynchronousFileChannel fileChannel;
private final Path path;
Expand All @@ -99,12 +115,17 @@ static class FileSubscriber implements Subscriber<ByteBuffer> {
private volatile boolean closeOnLastWrite = false;
private Subscription subscription;

FileSubscriber(AsynchronousFileChannel fileChannel, Path path, CompletableFuture<Void> future) {
FileSubscriber(long position, AsynchronousFileChannel fileChannel, Path path, CompletableFuture<Void> future) {
this.position = new AtomicLong(position);
this.fileChannel = fileChannel;
this.path = path;
this.future = future;
}

FileSubscriber(AsynchronousFileChannel fileChannel, Path path, CompletableFuture<Void> future) {
this(0, fileChannel, path, future);
}

@Override
public void onSubscribe(Subscription s) {
if (this.subscription != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
/*
* Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.internal.async;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import com.google.common.jimfs.Configuration;
import com.google.common.jimfs.Jimfs;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.FileSystem;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CompletableFuture;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.SdkPublisher;

/**
* Tests for {@link FileAsyncResponseTransformer}.
*/
public class FileAsyncResponseTransformerTest {
private FileSystem testFs;

@Before
public void setup() {
testFs = Jimfs.newFileSystem(Configuration.forCurrentPlatform());
}

@After
public void teardown() throws IOException {
testFs.close();
}

@Test
public void defaultCreatesNewWritableFile() {
byte[] content = "test".getBytes(StandardCharsets.UTF_8);
Path testFile = testFs.getPath("testFile");
AsyncResponseTransformer<String, String> transformer = AsyncResponseTransformer.toFile(testFile);
CompletableFuture<String> transformFuture = transformer.prepare();
transformer.onResponse("some response");
transformer.onStream(AsyncRequestBody.fromBytes(content));
transformFuture.join();

assertFileContentsEquals(testFile, "test");
}

@Test
public void honorsPosition() throws IOException {
byte[] content = "test".getBytes(StandardCharsets.UTF_8);
Path testFile = testFs.getPath("testFile");
AsyncResponseTransformer<String, String> transformer = AsyncResponseTransformer.toFile(testFile, content.length, true, true);
CompletableFuture<String> transformFuture = transformer.prepare();
transformer.onResponse("some response");
transformer.onStream(AsyncRequestBody.fromBytes(content));
transformFuture.join();

assertThat(Files.size(testFile)).isEqualTo(content.length * 2);
}

@Test
public void honorsNewFileFlags_False() throws IOException {
Path exists = testFs.getPath("exists");
createFileWithContents(exists, "Hello".getBytes(StandardCharsets.UTF_8));

honorsNewFileFlagTest(exists, 5, false, "Test", "HelloTest");
}

@Test
public void honorsNewFileFlag_True_FileNotExists() {
Path notExists = testFs.getPath("notExists");
honorsNewFileFlagTest(notExists, 0, true, "Test", "Test");
}

@Test
public void honorsNewFileFlag_True_FileExists() throws IOException {
Path exists = testFs.getPath("exists");
createFileWithContents(exists, "Hello".getBytes(StandardCharsets.UTF_8));
assertThatThrownBy(() -> honorsNewFileFlagTest(exists, 5, true, "Test", null))
.hasCauseInstanceOf(FileAlreadyExistsException.class);
}

@Test
public void honorsDeleteOnFailure_True_NoExistingFile() {
Path notExists = testFs.getPath("notExists");
honorsDeleteOnFailureTest(notExists, true, true);
}

@Test
public void honorsDeleteOnFailure_True_ExistingFile() throws IOException {
Path exists = testFs.getPath("exists");
createFileWithContents(exists, "Hello".getBytes(StandardCharsets.UTF_8));
honorsDeleteOnFailureTest(exists, false, true);
}

@Test
public void honorsDeleteOnFailure_False_NoExistingFile() {
Path notExists = testFs.getPath("notExists");
honorsDeleteOnFailureTest(notExists, true, false);
}

@Test
public void honorsDeleteOnFailure_False_ExistingFile() throws IOException {
Path exists = testFs.getPath("exists");
createFileWithContents(exists, "Hello".getBytes(StandardCharsets.UTF_8));
honorsDeleteOnFailureTest(exists, false, false);
}

private void honorsNewFileFlagTest(Path file, long position, boolean isNewFile, String streamContents, String expectedContents) {
AsyncResponseTransformer<String, String> transformer = AsyncResponseTransformer.toFile(file, position, isNewFile, true);
CompletableFuture<String> transformFuture = transformer.prepare();
transformer.onResponse("some response");
transformer.onStream(AsyncRequestBody.fromString(streamContents));
transformFuture.join();

if (expectedContents != null) {
assertFileContentsEquals(file, expectedContents);
}
}

private void honorsDeleteOnFailureTest(Path file, boolean isNewFile, boolean deleteOnFailure) {
AsyncResponseTransformer<String, String> transformer = AsyncResponseTransformer.toFile(file, 0, isNewFile, deleteOnFailure);
CompletableFuture<String> transformFuture = transformer.prepare();
IOException error = new IOException("Something went wrong");
transformer.onResponse("some response");
transformer.onStream(new ErrorPublisher<>(error));
transformer.exceptionOccurred(error);
assertThatThrownBy(transformFuture::join).hasCause(error);
if (deleteOnFailure) {
assertThat(Files.exists(file)).isFalse();
} else {
assertThat(Files.exists(file)).isTrue();
}
}

private static void createFileWithContents(Path file, byte[] contents) throws IOException {
OutputStream os = Files.newOutputStream(file, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
os.write(contents);
os.close();
}

private static void assertFileContentsEquals(Path file, String expected) {
StringBuilder sb = new StringBuilder();
try {
BufferedReader reader = new BufferedReader(new InputStreamReader(Files.newInputStream(file)));
String s;
while ((s = reader.readLine()) != null) {
sb.append(s);
}
} catch (IOException ioe) {
throw new UncheckedIOException(ioe);
}
assertThat(sb.toString()).isEqualTo(expected);
}

private static final class ErrorPublisher<T> implements SdkPublisher<T> {
private final Throwable error;

private ErrorPublisher(Throwable error) {
this.error = error;
}

@Override
public void subscribe(Subscriber<? super T> subscriber) {
subscriber.onSubscribe(new ErrorSubscription(subscriber, error));
}
}

private static final class ErrorSubscription implements Subscription {
private final Subscriber<?> subscriber;
private final Throwable error;

public ErrorSubscription(Subscriber<?> subscriber, Throwable error) {
this.subscriber = subscriber;
this.error = error;
}

@Override
public void request(long l) {
subscriber.onError(error);
}

@Override
public void cancel() {

}
}
}
23 changes: 23 additions & 0 deletions utils/src/main/java/software/amazon/awssdk/utils/Validate.java
Original file line number Diff line number Diff line change
Expand Up @@ -601,6 +601,13 @@ public static long isPositive(long num, String fieldName) {
return num;
}

/**
* Asserts that the given number is non-negative.
*
* @param num Number to validate
* @param fieldName Field name to display in exception message if negative.
* @return Number if not negative.
*/
public static int isNotNegative(int num, String fieldName) {

if (num < 0) {
Expand All @@ -610,6 +617,22 @@ public static int isNotNegative(int num, String fieldName) {
return num;
}

/**
* Asserts that the given number is non-negative.
*
* @param num Number to validate
* @param fieldName Field name to display in exception message if negative.
* @return Number if not negative.
*/
public static long isNotNegative(long num, String fieldName) {

if (num < 0) {
throw new IllegalArgumentException(String.format("%s must not be negative", fieldName));
}

return num;
}

/**
* Asserts that the given duration is positive (non-negative and non-zero).
*
Expand Down

0 comments on commit 9749ff7

Please sign in to comment.