Skip to content

Commit

Permalink
fix: callbacks not invoked when attached using getTransfer api (#2111)
Browse files Browse the repository at this point in the history
* fix: listener not invoked when attached using getTransfer api

* address PR comments

* default state to unknown

* add comment

* add comment

* override getRequest in storage operation & make SocketExcpetion retryable

* add nullable annotation

* address nullable request

Co-authored-by: Tyler Roach <[email protected]>
  • Loading branch information
sdhuka and tylerjroach authored Nov 15, 2022
1 parent 9a25914 commit 610edc7
Show file tree
Hide file tree
Showing 19 changed files with 314 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
import com.amplifyframework.storage.StorageChannelEventName;
import com.amplifyframework.storage.TransferState;
import com.amplifyframework.storage.operation.StorageUploadFileOperation;
import com.amplifyframework.storage.operation.StorageUploadInputStreamOperation;
import com.amplifyframework.storage.options.StorageUploadFileOptions;
import com.amplifyframework.storage.options.StorageUploadInputStreamOptions;
import com.amplifyframework.storage.s3.test.R;
import com.amplifyframework.storage.s3.util.WorkmanagerTestUtils;
import com.amplifyframework.testutils.random.RandomTempFile;
Expand All @@ -43,6 +45,7 @@
import org.junit.Test;

import java.io.File;
import java.io.FileInputStream;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -122,6 +125,21 @@ public void testUploadSmallFile() throws Exception {
synchronousStorage.uploadFile(fileName, uploadFile, options);
}

/**
* Tests that small file (single-part) uploads using input stream successfully.
*
* @throws Exception if upload fails
*/
@Test
public void testUploadSmallFileStream() throws Exception {
File uploadFile = new RandomTempFile(4 * 1024 * 1024);
String fileName = uploadFile.getName();
StorageUploadInputStreamOptions options = StorageUploadInputStreamOptions.builder()
.accessLevel(TESTING_ACCESS_LEVEL)
.build();
synchronousStorage.uploadInputStream(fileName, new FileInputStream(uploadFile), options);
}

/**
* Tests that large file (multi-part) uploads successfully.
*
Expand Down Expand Up @@ -234,4 +252,130 @@ public void testUploadFileIsResumable() throws Exception {
assertTrue(completed.await(EXTENDED_TIMEOUT_MS, TimeUnit.MILLISECONDS));
assertNull(errorContainer.get());
}

/**
* Tests that file upload operation can be paused and resumed
* using getTransfer API.
*
* @throws Exception if upload is not paused, resumed, and
* completed successfully before timeout
*/
@SuppressWarnings("unchecked")
@Test
public void testUploadFileGetTransferOnPause() throws Exception {
final CountDownLatch completed = new CountDownLatch(1);
final CountDownLatch resumed = new CountDownLatch(1);
final AtomicReference<String> transferId = new AtomicReference<>();
final AtomicReference<StorageUploadFileOperation<?>> opContainer = new AtomicReference<>();
final AtomicReference<Throwable> errorContainer = new AtomicReference<>();

// Create a file large enough that transfer won't finish before being paused
File uploadFile = new RandomTempFile(LARGE_FILE_SIZE);

// Listen to Hub events to resume when operation has been paused
SubscriptionToken resumeToken = Amplify.Hub.subscribe(HubChannel.STORAGE, hubEvent -> {
if (StorageChannelEventName.UPLOAD_STATE.toString().equals(hubEvent.getName())) {
HubEvent<String> stateEvent = (HubEvent<String>) hubEvent;
TransferState state = TransferState.getState(stateEvent.getData());
if (TransferState.PAUSED.equals(state)) {
opContainer.get().clearAllListeners();
storageCategory.getTransfer(transferId.get(),
operation -> {
StorageUploadFileOperation<?> getOp = (StorageUploadFileOperation<?>) operation;
getOp.resume();
resumed.countDown();
getOp.setOnSuccess(result -> {
completed.countDown();
});
}, errorContainer::set);
}
}
});
subscriptions.add(resumeToken);

// Begin uploading a large file
StorageUploadFileOperation<?> op = storageCategory.uploadFile(
uploadFile.getName(),
uploadFile,
options,
progress -> {
if (progress.getCurrentBytes() > 0 && resumed.getCount() > 0) {
opContainer.get().pause();
}
},
result -> { },
errorContainer::set
);
opContainer.set(op);
transferId.set(op.getTransferId());

// Assert that all the required conditions have been met
assertTrue(resumed.await(EXTENDED_TIMEOUT_MS, TimeUnit.MILLISECONDS));
assertTrue(completed.await(EXTENDED_TIMEOUT_MS, TimeUnit.MILLISECONDS));
assertNull(errorContainer.get());
}

/**
* Tests that input stream upload operation can be paused and resumed
* using getTransfer API.
*
* @throws Exception if upload is not paused, resumed, and
* completed successfully before timeout
*/
@SuppressWarnings("unchecked")
@Test
public void testUploadInputStreamGetTransferOnPause() throws Exception {
final CountDownLatch completed = new CountDownLatch(1);
final CountDownLatch resumed = new CountDownLatch(1);
final AtomicReference<String> transferId = new AtomicReference<>();
final AtomicReference<StorageUploadInputStreamOperation<?>> opContainer = new AtomicReference<>();
final AtomicReference<Throwable> errorContainer = new AtomicReference<>();

// Create a file large enough that transfer won't finish before being paused
File uploadFile = new RandomTempFile(LARGE_FILE_SIZE);

// Listen to Hub events to resume when operation has been paused
SubscriptionToken resumeToken = Amplify.Hub.subscribe(HubChannel.STORAGE, hubEvent -> {
if (StorageChannelEventName.UPLOAD_STATE.toString().equals(hubEvent.getName())) {
HubEvent<String> stateEvent = (HubEvent<String>) hubEvent;
TransferState state = TransferState.getState(stateEvent.getData());
if (TransferState.PAUSED.equals(state)) {
opContainer.get().clearAllListeners();
storageCategory.getTransfer(transferId.get(),
operation -> {
StorageUploadFileOperation<?> getOp = (StorageUploadFileOperation<?>) operation;
getOp.resume();
resumed.countDown();
getOp.setOnSuccess(result -> {
completed.countDown();
});
}, errorContainer::set);
}
}
});
subscriptions.add(resumeToken);
StorageUploadInputStreamOptions inputStreamOptions = StorageUploadInputStreamOptions.builder()
.accessLevel(TESTING_ACCESS_LEVEL)
.build();
// Begin uploading a large file
StorageUploadInputStreamOperation<?> op = storageCategory.uploadInputStream(
uploadFile.getName(),
new FileInputStream(uploadFile),
inputStreamOptions,
progress -> {
if (progress.getCurrentBytes() > 0 && resumed.getCount() > 0) {
opContainer.get().pause();
}
},
result -> { },
errorContainer::set
);
opContainer.set(op);
transferId.set(op.getTransferId());

// Assert that all the required conditions have been met
assertTrue(resumed.await(EXTENDED_TIMEOUT_MS, TimeUnit.MILLISECONDS));
assertTrue(completed.await(EXTENDED_TIMEOUT_MS, TimeUnit.MILLISECONDS));
assertNull(errorContainer.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.amplifyframework.storage.StorageAccessLevel;
import com.amplifyframework.storage.StorageException;
import com.amplifyframework.storage.StoragePlugin;
import com.amplifyframework.storage.TransferState;
import com.amplifyframework.storage.operation.StorageDownloadFileOperation;
import com.amplifyframework.storage.operation.StorageGetUrlOperation;
import com.amplifyframework.storage.operation.StorageListOperation;
Expand Down Expand Up @@ -509,61 +510,67 @@ public void getTransfer(
@NonNull Consumer<StorageTransferOperation<?, ? extends StorageTransferResult>> onReceived,
@NonNull Consumer<StorageException> onError) {
executorService.submit(() -> {
TransferRecord transferRecord = storageService.getTransfer(transferId);
if (transferRecord != null) {
TransferObserver transferObserver =
new TransferObserver(
transferRecord.getId(),
storageService.getTransferManager().getTransferStatusUpdater(),
transferRecord.getBucketName(),
transferRecord.getKey(),
transferRecord.getFile(),
null,
transferRecord.getState());
TransferType transferType = transferRecord.getType();
switch (Objects.requireNonNull(transferType)) {
case UPLOAD:
if (transferRecord.getFile().startsWith(TransferStatusUpdater.TEMP_FILE_PREFIX)) {
AWSS3StorageUploadInputStreamOperation operation =
new AWSS3StorageUploadInputStreamOperation(
transferId,
storageService,
executorService,
authCredentialsProvider,
awsS3StoragePluginConfiguration,
null,
transferObserver);
onReceived.accept(operation);
} else {
AWSS3StorageUploadFileOperation operation =
new AWSS3StorageUploadFileOperation(
transferId,
storageService,
executorService,
authCredentialsProvider,
awsS3StoragePluginConfiguration,
null,
transferObserver);
onReceived.accept(operation);
}
break;
case DOWNLOAD:
AWSS3StorageDownloadFileOperation
downloadFileOperation = new AWSS3StorageDownloadFileOperation(
transferId,
new File(transferRecord.getFile()),
storageService,
executorService,
authCredentialsProvider,
awsS3StoragePluginConfiguration,
try {
TransferRecord transferRecord = storageService.getTransfer(transferId);
if (transferRecord != null) {
TransferObserver transferObserver =
new TransferObserver(
transferRecord.getId(),
storageService.getTransferManager().getTransferStatusUpdater(),
transferRecord.getBucketName(),
transferRecord.getKey(),
transferRecord.getFile(),
null,
transferObserver);
onReceived.accept(downloadFileOperation);
break;
default:
transferRecord.getState() != null ? transferRecord.getState() : TransferState.UNKNOWN);
TransferType transferType = transferRecord.getType();
switch (Objects.requireNonNull(transferType)) {
case UPLOAD:
if (transferRecord.getFile().startsWith(TransferStatusUpdater.TEMP_FILE_PREFIX)) {
AWSS3StorageUploadInputStreamOperation operation =
new AWSS3StorageUploadInputStreamOperation(
transferId,
storageService,
executorService,
authCredentialsProvider,
awsS3StoragePluginConfiguration,
null,
transferObserver);
onReceived.accept(operation);
} else {
AWSS3StorageUploadFileOperation operation =
new AWSS3StorageUploadFileOperation(
transferId,
storageService,
executorService,
authCredentialsProvider,
awsS3StoragePluginConfiguration,
null,
transferObserver);
onReceived.accept(operation);
}
break;
case DOWNLOAD:
AWSS3StorageDownloadFileOperation
downloadFileOperation = new AWSS3StorageDownloadFileOperation(
transferId,
new File(transferRecord.getFile()),
storageService,
executorService,
authCredentialsProvider,
awsS3StoragePluginConfiguration,
null,
transferObserver);
onReceived.accept(downloadFileOperation);
break;
default:
}
} else {
onError.accept(new StorageException("Get transfer failed",
"Please verify that the transfer id is valid and the transfer is not completed"));
}
} else {
} catch (Exception exception) {
onError.accept(new StorageException("Get transfer failed",
exception,
"Please verify that the transfer id is valid and the transfer is not completed"));
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,13 @@ internal object TransferOperations {
workManager: WorkManager
): Boolean {
if (!TransferState.isInTerminalState(transferRecord.state)) {
val nextState: TransferState = TransferState.PENDING_CANCEL
var nextState: TransferState = TransferState.PENDING_CANCEL
if (TransferState.isPaused(transferRecord.state)) {
if (transferRecord.isMultipart == 1) {
abortMultipartUploadRequest(transferRecord, pluginKey, workManager)
} else {
// transfer is paused so directly mark it as canceled
nextState = TransferState.CANCELED
}
} else {
workManager.cancelUniqueWork(transferRecord.id.toString())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
*/
package com.amplifyframework.storage.s3.operation

import android.util.Log
import com.amplifyframework.auth.AuthCredentialsProvider
import com.amplifyframework.core.Amplify
import com.amplifyframework.core.Consumer
Expand Down Expand Up @@ -53,14 +52,7 @@ class AWSS3StorageDownloadFileOperation @JvmOverloads internal constructor(
) : StorageDownloadFileOperation<AWSS3StorageDownloadFileRequest>(request, transferId, onProgress, onSuccess, onError) {

init {
transferObserver?.let {
val listener = DownloadTransferListener()
Log.d(
"AWSS3StorageDownloadFileOperation",
"Setting up new transfer listener ${listener.hashCode()} for operation ${hashCode()}"
)
it.setTransferListener(listener)
}
transferObserver?.setTransferListener(DownloadTransferListener())
}

constructor(
Expand Down Expand Up @@ -88,18 +80,19 @@ class AWSS3StorageDownloadFileOperation @JvmOverloads internal constructor(

override fun start() {
// Only start if it hasn't already been started
if (transferObserver != null || request == null) {
if (transferObserver != null) {
return
}
val downloadRequest = request ?: return
executorService.submit(
Runnable {
awsS3StoragePluginConfiguration.getAWSS3PluginPrefixResolver(authCredentialsProvider).resolvePrefix(
request.accessLevel,
request.targetIdentityId,
downloadRequest.accessLevel,
downloadRequest.targetIdentityId,
Consumer { prefix: String ->
try {
val serviceKey = prefix + request.key
this.file = request.local
val serviceKey = prefix + downloadRequest.key
this.file = downloadRequest.local
transferObserver = storageService.downloadToFile(transferId, serviceKey, file)
transferObserver?.setTransferListener(DownloadTransferListener())
} catch (exception: Exception) {
Expand Down Expand Up @@ -188,7 +181,7 @@ class AWSS3StorageDownloadFileOperation @JvmOverloads internal constructor(
}

inner class DownloadTransferListener : TransferListener {
override fun onStateChanged(transferId: Int, state: TransferState) {
override fun onStateChanged(transferId: Int, state: TransferState, key: String) {
Amplify.Hub.publish(
HubChannel.STORAGE,
HubEvent.create(StorageChannelEventName.DOWNLOAD_STATE, state.name)
Expand Down
Loading

0 comments on commit 610edc7

Please sign in to comment.