Skip to content

Commit

Permalink
Merge pull request #832 from Infomaniak/cancel-all-pending-upload-ses…
Browse files Browse the repository at this point in the history
…sions

Cancel all pending upload sessions if needed
  • Loading branch information
JorisBodin authored Jul 12, 2022
2 parents ba94f85 + 68e0b32 commit b2a1510
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 63 deletions.
45 changes: 21 additions & 24 deletions app/src/main/java/com/infomaniak/drive/data/api/UploadTask.kt
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import com.infomaniak.drive.data.models.upload.ValidChunks
import com.infomaniak.drive.data.services.UploadWorker
import com.infomaniak.drive.data.sync.UploadNotifications
import com.infomaniak.drive.ui.MainActivity
import com.infomaniak.drive.utils.KDriveHttpClient
import com.infomaniak.drive.utils.NotificationUtils.CURRENT_UPLOAD_ID
import com.infomaniak.drive.utils.NotificationUtils.ELAPSED_TIME
import com.infomaniak.drive.utils.NotificationUtils.uploadProgressNotification
Expand All @@ -49,7 +48,6 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.Semaphore
import kotlinx.coroutines.sync.withLock
import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.RequestBody
import okhttp3.RequestBody.Companion.toRequestBody
Expand Down Expand Up @@ -88,7 +86,7 @@ class UploadTask(
launchTask(this)
return@withContext true
} catch (exception: FileNotFoundException) {
UploadFile.deleteIfExists(uploadFile.getUriObject(), keepFile = uploadFile.isSync())
uploadFile.deleteIfExists(keepFile = uploadFile.isSync())
Sentry.withScope { scope ->
scope.level = SentryLevel.WARNING
scope.setExtra("data", gson.toJson(uploadFile))
Expand Down Expand Up @@ -125,22 +123,21 @@ class UploadTask(
private suspend fun launchTask(coroutineScope: CoroutineScope) = withContext(Dispatchers.IO) {
val uri = uploadFile.getUriObject()
val fileInputStream = context.contentResolver.openInputStream(uploadFile.getOriginalUri(context))
val okHttpClient = KDriveHttpClient.getHttpClient(userId = uploadFile.userId, timeout = 120)

initChunkSize(uploadFile.fileSize)
checkLimitParallelRequest()

BufferedInputStream(fileInputStream, chunkSize).use { input ->
val waitingCoroutines = arrayListOf<Job>()
val waitingCoroutines = mutableListOf<Job>()
val requestSemaphore = Semaphore(limitParallelRequest)
val totalChunks = ceil(uploadFile.fileSize.toDouble() / chunkSize).toInt()

if (totalChunks > TOTAL_CHUNKS) throw TotalChunksExceededException()

val uploadedChunks = uploadFile.getValidChunks(okHttpClient)
val uploadedChunks = uploadFile.getValidChunks()
val isNewUploadSession = uploadedChunks?.let { needToResetUpload(it) } ?: true

if (isNewUploadSession) uploadFile.prepareUploadSession(totalChunks, okHttpClient)
if (isNewUploadSession) uploadFile.prepareUploadSession(totalChunks)

Sentry.addBreadcrumb(Breadcrumb().apply {
category = UploadWorker.BREADCRUMB_TAG
Expand Down Expand Up @@ -176,19 +173,19 @@ class UploadTask(
Log.d("kDrive", "Upload > Start upload ${uploadFile.fileName} to $url data size:${data.size}")

waitingCoroutines.add(
coroutineScope.uploadChunkRequest(requestSemaphore, data.toRequestBody(), url, okHttpClient)
coroutineScope.uploadChunkRequest(requestSemaphore, data.toRequestBody(), url)
)
}
waitingCoroutines.joinAll()
}

coroutineScope.ensureActive()
onFinish(uri, okHttpClient)
onFinish(uri)
}

private suspend fun onFinish(uri: Uri, okHttpClient: OkHttpClient) {
with(ApiRepository.finishSession(uploadFile.driveId, uploadFile.uploadToken!!, okHttpClient)) {
if (!isSuccess()) manageUploadErrors(okHttpClient)
private suspend fun onFinish(uri: Uri) = with(uploadFile) {
with(ApiRepository.finishSession(driveId, uploadToken!!, okHttpClient)) {
if (!isSuccess()) manageUploadErrors()
}
uploadNotification.apply {
setOngoing(false)
Expand All @@ -205,8 +202,7 @@ class UploadTask(
private fun CoroutineScope.uploadChunkRequest(
requestSemaphore: Semaphore,
requestBody: RequestBody,
url: String,
okHttpClient: OkHttpClient
url: String
) = launch(Dispatchers.IO) {
val uploadRequestBody = ProgressRequestBody(requestBody) { currentBytes, bytesWritten, contentLength ->
launch {
Expand All @@ -220,8 +216,8 @@ class UploadTask(
.headers(HttpUtils.getHeaders(contentType = null))
.post(uploadRequestBody).build()

val response = okHttpClient.newCall(request).execute()
manageApiResponse(response, okHttpClient)
val response = uploadFile.okHttpClient.newCall(request).execute()
manageApiResponse(response)
requestSemaphore.release()
}

Expand Down Expand Up @@ -267,7 +263,7 @@ class UploadTask(
return false
}

private fun manageApiResponse(response: Response, okHttpClient: OkHttpClient) {
private fun manageApiResponse(response: Response) {
response.use {
val bodyResponse = it.body?.string()
Log.i("UploadTask", "response successful ${it.isSuccessful}")
Expand All @@ -278,7 +274,7 @@ class UploadTask(
} catch (e: Exception) {
null
}
apiResponse.manageUploadErrors(okHttpClient)
apiResponse.manageUploadErrors()
}
}
}
Expand Down Expand Up @@ -357,11 +353,11 @@ class UploadTask(
}
}

private fun UploadFile.getValidChunks(okHttpClient: OkHttpClient): ValidChunks? {
private fun UploadFile.getValidChunks(): ValidChunks? {
return uploadToken?.let { ApiRepository.getValidChunks(uploadFile.driveId, it, okHttpClient).data }
}

private fun UploadFile.prepareUploadSession(totalChunks: Int, okHttpClient: OkHttpClient) {
private fun UploadFile.prepareUploadSession(totalChunks: Int) {
val sessionBody = UploadSession.StartSessionBody(
conflict = if (replaceOnConflict()) ConflictOption.VERSION else ConflictOption.RENAME,
createdAt = if (fileCreatedAt == null) null else fileCreatedAt!!.time / 1000,
Expand All @@ -375,11 +371,11 @@ class UploadTask(

with(ApiRepository.startUploadSession(driveId, sessionBody, okHttpClient)) {
if (isSuccess()) data?.token?.let { uploadFile.updateUploadToken(it) }
else manageUploadErrors(okHttpClient)
else manageUploadErrors()
}
}

private fun <T> ApiResponse<T>?.manageUploadErrors(okHttpClient: OkHttpClient) {
private fun <T> ApiResponse<T>?.manageUploadErrors() {
if (this?.translatedError == R.string.connectionError) throw NetworkException()
when (this?.error?.code) {
"file_already_exists_error" -> Unit
Expand All @@ -391,9 +387,10 @@ class UploadTask(
// Upload finish with 0 chunks uploaded
// Upload finish with a different expected number of chunks
uploadFile.uploadToken?.let {
ApiRepository.cancelSession(uploadFile.driveId, it, okHttpClient)
with(ApiRepository.cancelSession(uploadFile.driveId, it, uploadFile.okHttpClient)) {
if (data == true) uploadFile.resetUploadToken()
}
}
uploadFile.resetUploadToken()
throw UploadNotTerminated("Upload finish with 0 chunks uploaded or a different expected number of chunks")
}
"upload_error" -> {
Expand Down
78 changes: 58 additions & 20 deletions app/src/main/java/com/infomaniak/drive/data/models/UploadFile.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,20 @@ import android.provider.DocumentsContract
import android.provider.MediaStore
import androidx.core.net.toFile
import androidx.core.net.toUri
import com.infomaniak.drive.data.api.ApiRepository
import com.infomaniak.drive.data.api.UploadTask
import com.infomaniak.drive.data.cache.DriveInfosController
import com.infomaniak.drive.data.sync.UploadMigration
import com.infomaniak.drive.utils.AccountUtils
import com.infomaniak.drive.utils.KDriveHttpClient
import com.infomaniak.drive.utils.RealmModules
import com.infomaniak.lib.core.R
import com.infomaniak.lib.core.utils.format
import io.realm.*
import io.realm.annotations.Ignore
import io.realm.annotations.PrimaryKey
import io.realm.kotlin.oneOf
import okhttp3.OkHttpClient
import java.io.File
import java.util.*

Expand All @@ -52,6 +58,14 @@ open class UploadFile(
var userId: Int = -1
) : RealmObject() {

@Ignore
lateinit var okHttpClient: OkHttpClient
private set

suspend fun initOkHttpClient() {
okHttpClient = KDriveHttpClient.getHttpClient(userId = userId, timeout = 120)
}

fun createSubFolder(parent: String, createDatedSubFolders: Boolean) {
remoteSubFolder = parent + if (createDatedSubFolders) "/${fileModifiedAt.format("yyyy/MM")}" else ""
}
Expand Down Expand Up @@ -109,6 +123,25 @@ open class UploadFile(
uploadToken = newUploadToken
}

fun deleteIfExists(keepFile: Boolean = false) {
getRealmInstance().use { realm ->
syncFileByUriQuery(realm, uri).findFirst()?.let { uploadFileProxy ->
// Cancel session if exists
uploadFileProxy.uploadToken?.let {
with(ApiRepository.cancelSession(uploadFileProxy.driveId, it, okHttpClient)) {
if (translatedError == R.string.connectionError) throw UploadTask.NetworkException()
}
}
// Delete in realm
realm.executeTransaction {
if (uploadFileProxy.isValid) {
if (keepFile) uploadFileProxy.deletedAt = Date() else uploadFileProxy.deleteFromRealm()
}
}
}
}
}

enum class Type {
SYNC, UPLOAD, SHARED_FILE, SYNC_OFFLINE, CLOUD_STORAGE
}
Expand All @@ -122,10 +155,12 @@ open class UploadFile(
.migration(UploadMigration())
.build()

private inline val Realm.uploadTable get() = where(UploadFile::class.java)

fun getRealmInstance(): Realm = Realm.getInstance(realmConfiguration)

private fun syncFileByUriQuery(realm: Realm, uri: String): RealmQuery<UploadFile> {
return realm.where(UploadFile::class.java).equalTo(UploadFile::uri.name, uri)
return realm.uploadTable.equalTo(UploadFile::uri.name, uri)
}

private fun pendingUploadsQuery(
Expand All @@ -134,7 +169,7 @@ open class UploadFile(
onlyCurrentUser: Boolean = false,
driveIds: Array<Int>? = null
): RealmQuery<UploadFile> {
return realm.where(UploadFile::class.java).apply {
return realm.uploadTable.apply {
folderId?.let { equalTo(UploadFile::remoteFolder.name, it) }
if (onlyCurrentUser) equalTo(UploadFile::userId.name, AccountUtils.currentUserId)
driveIds?.let { oneOf(UploadFile::driveId.name, it) }
Expand Down Expand Up @@ -196,7 +231,7 @@ open class UploadFile(
}

fun getAllUploadedFiles(type: String = Type.SYNC.name): ArrayList<UploadFile>? = getRealmInstance().use { realm ->
realm.where(UploadFile::class.java)
realm.uploadTable
.equalTo(UploadFile::type.name, type)
.isNull(UploadFile::deletedAt.name)
.isNotNull(UploadFile::uploadAt.name)
Expand Down Expand Up @@ -235,19 +270,7 @@ open class UploadFile(
}
}

fun deleteIfExists(uri: Uri, keepFile: Boolean = false) {
getRealmInstance().use { realm ->
syncFileByUriQuery(realm, uri.toString()).findFirst()?.let { syncFile ->
realm.executeTransaction {
if (syncFile.isValid) {
if (keepFile) syncFile.deletedAt = Date() else syncFile.deleteFromRealm()
}
}
}
}
}

fun deleteAll(uploadFiles: ArrayList<UploadFile>) {
fun deleteAll(uploadFiles: List<UploadFile>) {
getRealmInstance().use {
it.executeTransaction { realm ->
uploadFiles.forEach { uploadFile ->
Expand All @@ -269,19 +292,34 @@ open class UploadFile(
}
}

suspend fun cancelAllPendingFilesSessions(folderId: Int) {
getRealmInstance().use { realm ->
realm.uploadTable
.equalTo(UploadFile::remoteFolder.name, folderId)
.isNull(UploadFile::uploadAt.name)
.isNotNull(UploadFile::uploadToken.name)
.findAll()?.onEach { uploadFileProxy ->
with(uploadFileProxy) {
initOkHttpClient()
ApiRepository.cancelSession(driveId, uploadToken!!, okHttpClient)
}
}
}
}

fun deleteAll(folderId: Int?, permanently: Boolean = false) {
getRealmInstance().use {
it.executeTransaction { realm ->
// Delete all data files for all uploads with scheme FILE
realm.where(UploadFile::class.java)
realm.uploadTable
.apply { folderId?.let { equalTo(UploadFile::remoteFolder.name, folderId) } }
.beginsWith(UploadFile::uri.name, ContentResolver.SCHEME_FILE)
.findAll().forEach { uploadFile ->
if (!uploadFile.isSyncOffline()) uploadFile.getUriObject().toFile().apply { if (exists()) delete() }
}

if (permanently) {
realm.where(UploadFile::class.java)
realm.uploadTable
.apply { folderId?.let { equalTo(UploadFile::remoteFolder.name, folderId) } }
.isNull(UploadFile::uploadAt.name)
.findAll().deleteAllFromRealm()
Expand All @@ -293,7 +331,7 @@ open class UploadFile(
.findAll().forEach { uploadFile -> uploadFile.deletedAt = Date() }

// Delete all uploads without type SYNC
realm.where(UploadFile::class.java)
realm.uploadTable
.apply { folderId?.let { equalTo(UploadFile::remoteFolder.name, folderId) } }
.notEqualTo(UploadFile::type.name, Type.SYNC.name)
.findAll().deleteAllFromRealm()
Expand All @@ -305,7 +343,7 @@ open class UploadFile(
fun deleteAllSyncFile() {
getRealmInstance().use { realm ->
realm.executeTransaction {
it.where(UploadFile::class.java)
it.uploadTable
.equalTo(UploadFile::type.name, Type.SYNC.name)
.findAll()?.deleteAllFromRealm()
}
Expand Down
Loading

0 comments on commit b2a1510

Please sign in to comment.