Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Add missing apis in storage Kotlin & RxJava facade #2160

Merged
merged 5 commits into from
Dec 8, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package com.amplifyframework.kotlin.storage

import com.amplifyframework.core.Amplify
import com.amplifyframework.core.async.Cancelable
import com.amplifyframework.kotlin.storage.Storage.InProgressStorageOperation
import com.amplifyframework.storage.StorageCategoryBehavior as Delegate
import com.amplifyframework.storage.StorageException
Expand Down Expand Up @@ -75,10 +74,11 @@ class KotlinStorageFacade(private val delegate: Delegate = Amplify.Storage) : St
{ errors.tryEmit(it) }
)
return InProgressStorageOperation(
operation.transferId,
results.asSharedFlow(),
progress.asSharedFlow(),
errors.asSharedFlow(),
operation as Cancelable
operation
)
}

Expand All @@ -98,10 +98,11 @@ class KotlinStorageFacade(private val delegate: Delegate = Amplify.Storage) : St
{ errors.tryEmit(it) }
)
return InProgressStorageOperation(
operation.transferId,
results.asSharedFlow(),
progress.asSharedFlow(),
errors.asSharedFlow(),
operation as Cancelable
operation
)
}

Expand All @@ -125,10 +126,11 @@ class KotlinStorageFacade(private val delegate: Delegate = Amplify.Storage) : St
{ errors.tryEmit(it) }
)
return InProgressStorageOperation(
cancelable.transferId,
results.asSharedFlow(),
progress.asSharedFlow(),
errors.asSharedFlow(),
cancelable as Cancelable
cancelable
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.amplifyframework.kotlin.storage

import com.amplifyframework.core.async.Cancelable
import com.amplifyframework.core.async.Resumable
import com.amplifyframework.storage.StorageException
import com.amplifyframework.storage.operation.StorageTransferOperation
import com.amplifyframework.storage.options.StorageDownloadFileOptions
Expand Down Expand Up @@ -91,11 +92,12 @@ interface Storage {

@FlowPreview
data class InProgressStorageOperation<T>(
val transferId: String,
private val results: Flow<T>,
private val progress: Flow<StorageTransferProgress>,
private val errors: Flow<StorageException>,
private val delegate: Cancelable?
) : Cancelable {
private val delegate: StorageTransferOperation<*, *>?
) : Cancelable, Resumable {

override fun cancel() {
delegate?.cancel()
Expand All @@ -120,5 +122,13 @@ interface Storage {
.map { it as T }
.first()
}

override fun pause() {
delegate?.pause()
}

override fun resume() {
delegate?.resume()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import com.amplifyframework.storage.result.StorageUploadFileResult
import com.amplifyframework.storage.result.StorageUploadInputStreamResult
import io.mockk.every
import io.mockk.mockk
import io.mockk.verifyOrder
import java.io.File
import java.io.InputStream
import java.net.URL
Expand Down Expand Up @@ -102,12 +103,13 @@ class KotlinStorageFacadeTest {
fun downloadFileSucceeds(): Unit = runBlocking {
val fromRemoteKey = "kool-pic.png"
val toLocalFile = File("/local/path/kool-pic.png")

val transferId = UUID.randomUUID().toString()
val progressEvents = (0L until 101 step 50)
.map { amount -> StorageTransferProgress(amount, 100) }

val cancelable = mockk<StorageDownloadFileOperation<*>>()
every { cancelable.cancel() } answers {}
every { cancelable.transferId } answers { transferId }

every {
delegate.downloadFile(eq(fromRemoteKey), eq(toLocalFile), any(), any(), any(), any())
Expand All @@ -127,11 +129,45 @@ class KotlinStorageFacadeTest {
}

val download = storage.downloadFile(fromRemoteKey, toLocalFile)
assertEquals(transferId, download.transferId)
val actualProgressEvents = download.progress().take(progressEvents.size).toList()
assertEquals(progressEvents, actualProgressEvents)
assertEquals(toLocalFile, download.result().file)
}

/**
* When the downloadFile() kotlin operation invokes pause, resume & cancel operation then corresponding
* delegate apis are invoked.
*/
@Test
fun performActionsOnDownloadFile(): Unit = runBlocking {
val fromRemoteKey = "kool-pic.png"
val toLocalFile = File("/local/path/kool-pic.png")
val transferId = UUID.randomUUID().toString()

val cancelable = mockk<StorageDownloadFileOperation<*>>()
every { cancelable.cancel() } answers {}
every { cancelable.pause() } answers {}
every { cancelable.resume() } answers {}
every { cancelable.transferId } answers { transferId }

every {
delegate.downloadFile(eq(fromRemoteKey), eq(toLocalFile), any(), any(), any(), any())
} answers {
cancelable
}

val download = storage.downloadFile(fromRemoteKey, toLocalFile)
download.pause()
download.resume()
download.cancel()
verifyOrder {
cancelable.pause()
cancelable.resume()
cancelable.cancel()
}
}

/**
* When the downloadFile() API emits an error, it should be thrown from
* the coroutine API.
Expand All @@ -141,10 +177,10 @@ class KotlinStorageFacadeTest {
val fromRemoteKey = "kool-pic.png"
val toLocalFile = File("/local/path/kool-pic.png")
val error = StorageException("uh", "oh")

val transferId = UUID.randomUUID().toString()
val cancelable = mockk<StorageDownloadFileOperation<*>>()
every { cancelable.cancel() } answers {}

every { cancelable.transferId } answers { transferId }
every {
delegate.downloadFile(eq(fromRemoteKey), eq(toLocalFile), any(), any(), any(), any())
} answers {
Expand All @@ -169,10 +205,10 @@ class KotlinStorageFacadeTest {

val progressEvents = (0L until 101 step 50)
.map { amount -> StorageTransferProgress(amount, 100) }

val transferId = UUID.randomUUID().toString()
val cancelable = mockk<StorageUploadFileOperation<*>>()
every { cancelable.cancel() } answers {}

every { cancelable.transferId } answers { transferId }
every {
delegate.uploadFile(eq(toRemoteKey), eq(fromLocalFile), any(), any(), any(), any())
} answers {
Expand All @@ -191,7 +227,9 @@ class KotlinStorageFacadeTest {
}

val upload = storage.uploadFile(toRemoteKey, fromLocalFile)
assertEquals(transferId, upload.transferId)
val receivedProgressEvents = upload.progress().take(3).toList()
assertEquals(transferId, upload.transferId)
assertEquals(progressEvents, receivedProgressEvents)
assertEquals(toRemoteKey, upload.result().key)
}
Expand All @@ -205,10 +243,10 @@ class KotlinStorageFacadeTest {
val toRemoteKey = "kool-pic.png"
val fromLocalFile = File("/local/path/kool-pic.png")
val error = StorageException("uh", "oh")

val transferId = UUID.randomUUID().toString()
val cancelable = mockk<StorageUploadFileOperation<*>>()
every { cancelable.cancel() } answers {}

every { cancelable.transferId } answers { transferId }
every {
delegate.uploadFile(eq(toRemoteKey), eq(fromLocalFile), any(), any(), any(), any())
} answers {
Expand All @@ -222,6 +260,37 @@ class KotlinStorageFacadeTest {
.result()
}

/**
* When the uploadFile() kotlin operation invokes pause, resume & cancel operation then corresponding
* delegate apis are invoked.
*/
@Test
fun performActionOnUploadFileSucceeds() = runBlocking {
val toRemoteKey = "kool-pic.png"
val fromLocalFile = File("/local/path/kool-pic.png")
val transferId = UUID.randomUUID().toString()
val cancelable = mockk<StorageUploadFileOperation<*>>()
every { cancelable.cancel() } answers {}
every { cancelable.pause() } answers {}
every { cancelable.resume() } answers {}
every { cancelable.transferId } answers { transferId }
every {
delegate.uploadFile(eq(toRemoteKey), eq(fromLocalFile), any(), any(), any(), any())
} answers {
cancelable
}

val upload = storage.uploadFile(toRemoteKey, fromLocalFile)
upload.pause()
upload.resume()
upload.cancel()
verifyOrder {
cancelable.pause()
cancelable.resume()
cancelable.cancel()
}
}

/**
* When the underlying uploadInputStream() delegate emits a result,
* it should be returned from the coroutine API.
Expand All @@ -230,13 +299,13 @@ class KotlinStorageFacadeTest {
fun uploadInputStreamSucceeds() = runBlocking {
val toRemoteKey = "kool-pic.png"
val fromStream = mockk<InputStream>()

val transferId = UUID.randomUUID().toString()
val progressEvents = (0L until 101 step 50)
.map { amount -> StorageTransferProgress(amount, 100) }

val cancelable = mockk<StorageUploadInputStreamOperation<*>>()
every { cancelable.cancel() } answers {}

every { cancelable.transferId } answers { transferId }
every {
delegate.uploadInputStream(eq(toRemoteKey), eq(fromStream), any(), any(), any(), any())
} answers {
Expand All @@ -255,6 +324,7 @@ class KotlinStorageFacadeTest {
}

val upload = storage.uploadInputStream(toRemoteKey, fromStream)
assertEquals(transferId, upload.transferId)
val receivedProgressEvents = upload.progress().take(3).toList()
assertEquals(progressEvents, receivedProgressEvents)
assertEquals(toRemoteKey, upload.result().key)
Expand All @@ -269,9 +339,10 @@ class KotlinStorageFacadeTest {
val toRemoteKey = "kool-pic.png"
val fromStream = mockk<InputStream>()
val error = StorageException("uh", "oh")

val transferId = UUID.randomUUID().toString()
val cancelable = mockk<StorageUploadInputStreamOperation<*>>()
every { cancelable.cancel() } answers {}
every { cancelable.transferId } answers { transferId }

every {
delegate.uploadInputStream(eq(toRemoteKey), eq(fromStream), any(), any(), any(), any())
Expand All @@ -286,6 +357,38 @@ class KotlinStorageFacadeTest {
.result()
}

/**
* When the underlying uploadInputStream() kotlin operation invokes pause, resume & cancel operation then the
* corresponding delegate apis are invoked.
*/
@Test
fun performActionOnUploadInputStream() = runBlocking {
val toRemoteKey = "kool-pic.png"
val fromStream = mockk<InputStream>()
val transferId = UUID.randomUUID().toString()
val cancelable = mockk<StorageUploadInputStreamOperation<*>>()
every { cancelable.cancel() } answers {}
every { cancelable.pause() } answers {}
every { cancelable.resume() } answers {}
every { cancelable.transferId } answers { transferId }

every {
delegate.uploadInputStream(eq(toRemoteKey), eq(fromStream), any(), any(), any(), any())
} answers {
cancelable
}

val upload = storage.uploadInputStream(toRemoteKey, fromStream)
upload.pause()
upload.resume()
upload.cancel()
verifyOrder {
cancelable.pause()
cancelable.resume()
cancelable.cancel()
}
}

/**
* When the remove() delegate emits a result, it should be returned
* from the coroutine API.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

import com.amplifyframework.core.Amplify;
import com.amplifyframework.core.Consumer;
import com.amplifyframework.core.async.Cancelable;
import com.amplifyframework.core.async.NoOpCancelable;
import com.amplifyframework.core.async.Resumable;
import com.amplifyframework.rx.RxAdapters.CancelableBehaviors;
import com.amplifyframework.storage.StorageCategory;
import com.amplifyframework.storage.StorageCategoryBehavior;
Expand Down Expand Up @@ -186,10 +186,10 @@ private <T> Single<T> toSingle(CancelableBehaviors.ResultEmitter<T, StorageExcep
* progress information and returns a single.
* @param <T> The type that represents the result of a given operation.
*/
public static final class RxProgressAwareSingleOperation<T> implements RxAdapters.RxSingleOperation<T> {
public static final class RxProgressAwareSingleOperation<T> implements RxAdapters.RxSingleOperation<T>, Resumable {
private final PublishSubject<StorageTransferProgress> progressSubject;
private final ReplaySubject<T> resultSubject;
private final Cancelable amplifyOperation;
private final StorageTransferOperation<?, ?> amplifyOperation;

RxProgressAwareSingleOperation(RxStorageTransferCallbackMapper<T> callbacks) {
progressSubject = PublishSubject.create();
Expand All @@ -199,6 +199,26 @@ public static final class RxProgressAwareSingleOperation<T> implements RxAdapter
resultSubject::onError);
}

/**
* Return the transfer ID for this operation.
*
* @return unique transferId for this operation
*/
@NonNull
public String getTransferId() {
return amplifyOperation.getTransferId();
}

@Override
public void resume() {
amplifyOperation.resume();
}

@Override
public void pause() {
amplifyOperation.pause();
}

@Override
public void cancel() {
amplifyOperation.cancel();
Expand Down Expand Up @@ -235,7 +255,7 @@ public Observable<StorageTransferProgress> observeProgress() {
* @param <T> The type that represents the result of a given operation.
*/
interface RxStorageTransferCallbackMapper<T> {
Cancelable emitTo(
StorageTransferOperation<?, ?> emitTo(
Consumer<StorageTransferProgress> onProgress,
Consumer<T> onItem,
Consumer<StorageException> onError
Expand Down
Loading