Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
Closes #9033 improve resume downloads in slow networks
Browse files Browse the repository at this point in the history
  • Loading branch information
Amejia481 committed Nov 30, 2020
1 parent 4f91495 commit fa9caeb
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -597,8 +597,8 @@ abstract class AbstractFetchDownloadService : Service() {
}
}

@Suppress("ComplexCondition")
internal fun performDownload(currentDownloadJobState: DownloadJobState) {
@Suppress("ComplexCondition", "ComplexMethod")
internal fun performDownload(currentDownloadJobState: DownloadJobState, useHttpClient: Boolean = false) {
val download = currentDownloadJobState.state
val isResumingDownload = currentDownloadJobState.currentBytesCopied > 0L
val headers = MutableHeaders()
Expand All @@ -613,13 +613,15 @@ abstract class AbstractFetchDownloadService : Service() {
Request.CookiePolicy.INCLUDE
}

var isUsingHttpClient = false
val request = Request(download.url.sanitizeURL(), headers = headers, cookiePolicy = cookiePolicy)
// When resuming a download we need to use the httpClient as
// download.response doesn't support adding headers.
val response = if (isResumingDownload) {
val response = if (isResumingDownload || useHttpClient || download.response == null) {
isUsingHttpClient = true
httpClient.fetch(request)
} else {
download.response ?: httpClient.fetch(request)
requireNotNull(download.response)
}
logger.debug("Fetching download for ${currentDownloadJobState.state.id} ")

Expand All @@ -636,14 +638,17 @@ abstract class AbstractFetchDownloadService : Service() {
}

response.body.useStream { inStream ->
var copyInChuckStatus: CopyInChuckStatus? = null
val newDownloadState = download.withResponse(response.headers, inStream)
currentDownloadJobState.state = newDownloadState

useFileStream(newDownloadState, isResumingDownload) { outStream ->
copyInChunks(currentDownloadJobState, inStream, outStream)
copyInChuckStatus = copyInChunks(currentDownloadJobState, inStream, outStream, isUsingHttpClient)
}

verifyDownload(currentDownloadJobState)
if (copyInChuckStatus != CopyInChuckStatus.ERROR_IN_STREAM_CLOSED) {
verifyDownload(currentDownloadJobState)
}
}
}

Expand Down Expand Up @@ -672,7 +677,18 @@ abstract class AbstractFetchDownloadService : Service() {
}

@VisibleForTesting
internal fun copyInChunks(downloadJobState: DownloadJobState, inStream: InputStream, outStream: OutputStream) {
internal enum class CopyInChuckStatus {
COMPLETED, ERROR_IN_STREAM_CLOSED
}

@VisibleForTesting
@Suppress("MaxLineLength")
internal fun copyInChunks(
downloadJobState: DownloadJobState,
inStream: InputStream,
outStream: OutputStream,
downloadWithHttpClient: Boolean = false
): CopyInChuckStatus {
val data = ByteArray(CHUNK_SIZE)
logger.debug("starting copyInChunks ${downloadJobState.state.id}" +
" currentBytesCopied ${downloadJobState.state.currentBytesCopied}")
Expand All @@ -685,10 +701,19 @@ abstract class AbstractFetchDownloadService : Service() {
updateDownloadState(newState)
}

var isInStreamClosed = false
// To ensure that we copy all files (even ones that don't have fileSize, we must NOT check < fileSize
while (getDownloadJobStatus(downloadJobState) == DOWNLOADING) {
val bytesRead = inStream.read(data)

var bytesRead: Int
try {
bytesRead = inStream.read(data)
} catch (e: IOException) {
if (downloadWithHttpClient) {
throw e
}
isInStreamClosed = true
break
}
// If bytesRead is -1, there's no data left to read from the stream
if (bytesRead == -1) { break }
downloadJobState.currentBytesCopied += bytesRead
Expand All @@ -697,10 +722,20 @@ abstract class AbstractFetchDownloadService : Service() {

outStream.write(data, 0, bytesRead)
}
if (isInStreamClosed) {
// In cases where [download.response] is available and users with slow
// networks start a download but quickly press pause and then resume
// [isResumingDownload] will be false as there will be not enough time
// for bytes to be copied, but the stream in [download.response] will be closed,
// we have to fallback to [httpClient]
performDownload(downloadJobState, useHttpClient = true)
return CopyInChuckStatus.ERROR_IN_STREAM_CLOSED
}
logger.debug(
"Finishing copyInChunks ${downloadJobState.state.id} " +
"currentBytesCopied ${downloadJobState.currentBytesCopied}"
)
return CopyInChuckStatus.COMPLETED
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import mozilla.components.feature.downloads.AbstractFetchDownloadService.Compani
import mozilla.components.feature.downloads.AbstractFetchDownloadService.Companion.ACTION_RESUME
import mozilla.components.feature.downloads.AbstractFetchDownloadService.Companion.ACTION_TRY_AGAIN
import mozilla.components.feature.downloads.AbstractFetchDownloadService.Companion.PROGRESS_UPDATE_INTERVAL
import mozilla.components.feature.downloads.AbstractFetchDownloadService.CopyInChuckStatus.ERROR_IN_STREAM_CLOSED
import mozilla.components.feature.downloads.AbstractFetchDownloadService.DownloadJobState
import mozilla.components.feature.downloads.DownloadNotification.NOTIFICATION_DOWNLOAD_GROUP_ID
import mozilla.components.feature.downloads.facts.DownloadsFacts.Items.NOTIFICATION
Expand Down Expand Up @@ -72,6 +73,7 @@ import org.mockito.ArgumentMatchers.anyLong
import org.mockito.ArgumentMatchers.isNull
import org.mockito.Mock
import org.mockito.Mockito.atLeastOnce
import org.mockito.Mockito.doAnswer
import org.mockito.Mockito.doCallRealMethod
import org.mockito.Mockito.doNothing
import org.mockito.Mockito.doReturn
Expand Down Expand Up @@ -146,7 +148,7 @@ class AbstractFetchDownloadServiceTest {
service.downloadJobs.values.forEach { it.job?.join() }

val providedDownload = argumentCaptor<DownloadJobState>()
verify(service).performDownload(providedDownload.capture())
verify(service).performDownload(providedDownload.capture(), anyBoolean())

assertEquals(download.url, providedDownload.value.state.url)
assertEquals(download.fileName, providedDownload.value.state.fileName)
Expand Down Expand Up @@ -333,7 +335,7 @@ class AbstractFetchDownloadServiceTest {
service.downloadJobs.values.forEach { it.job?.join() }

val providedDownload = argumentCaptor<DownloadJobState>()
verify(service).performDownload(providedDownload.capture())
verify(service).performDownload(providedDownload.capture(), anyBoolean())

val pauseIntent = Intent(ACTION_PAUSE).apply {
setPackage(testContext.applicationContext.packageName)
Expand Down Expand Up @@ -372,7 +374,7 @@ class AbstractFetchDownloadServiceTest {
service.downloadJobs.values.forEach { it.job?.join() }

val providedDownload = argumentCaptor<DownloadJobState>()
verify(service).performDownload(providedDownload.capture())
verify(service).performDownload(providedDownload.capture(), anyBoolean())

val cancelIntent = Intent(ACTION_CANCEL).apply {
setPackage(testContext.applicationContext.packageName)
Expand Down Expand Up @@ -419,7 +421,7 @@ class AbstractFetchDownloadServiceTest {
service.downloadJobs.values.forEach { it.job?.join() }

val providedDownload = argumentCaptor<DownloadJobState>()
verify(service).performDownload(providedDownload.capture())
verify(service).performDownload(providedDownload.capture(), anyBoolean())

// Simulate a pause
var downloadJobState = service.downloadJobs[providedDownload.value.state.id]!!
Expand Down Expand Up @@ -472,7 +474,7 @@ class AbstractFetchDownloadServiceTest {
service.downloadJobs.values.forEach { it.job?.join() }

val providedDownload = argumentCaptor<DownloadJobState>()
verify(service).performDownload(providedDownload.capture())
verify(service).performDownload(providedDownload.capture(), anyBoolean())
service.downloadJobs[providedDownload.value.state.id]?.job?.join()

// Simulate a failure
Expand Down Expand Up @@ -523,7 +525,7 @@ class AbstractFetchDownloadServiceTest {
service.downloadJobs.values.forEach { it.job?.join() }

val providedDownload = argumentCaptor<DownloadJobState>()
verify(service).performDownload(providedDownload.capture())
verify(service).performDownload(providedDownload.capture(), anyBoolean())

service.downloadJobs[providedDownload.value.state.id]?.job?.join()
val downloadJobState = service.downloadJobs[providedDownload.value.state.id]!!
Expand Down Expand Up @@ -577,7 +579,7 @@ class AbstractFetchDownloadServiceTest {
service.downloadJobs.values.forEach { it.job?.join() }

val providedDownload = argumentCaptor<DownloadJobState>()
verify(service).performDownload(providedDownload.capture())
verify(service).performDownload(providedDownload.capture(), anyBoolean())

service.downloadJobs[providedDownload.value.state.id]?.job?.join()
val downloadJobState = service.downloadJobs[providedDownload.value.state.id]!!
Expand All @@ -597,7 +599,7 @@ class AbstractFetchDownloadServiceTest {
val downloadIntent = Intent("ACTION_DOWNLOAD")
downloadIntent.putExtra(EXTRA_DOWNLOAD_ID, download.id)

doNothing().`when`(service).performDownload(any())
doNothing().`when`(service).performDownload(any(), anyBoolean())

browserStore.dispatch(DownloadAction.AddDownloadAction(download)).joinBlocking()
service.onStartCommand(downloadIntent, 0, 0)
Expand Down Expand Up @@ -628,7 +630,7 @@ class AbstractFetchDownloadServiceTest {
val downloadIntent = Intent("ACTION_DOWNLOAD")
downloadIntent.putExtra(EXTRA_DOWNLOAD_ID, download.id)

doNothing().`when`(service).performDownload(any())
doNothing().`when`(service).performDownload(any(), anyBoolean())

browserStore.dispatch(DownloadAction.AddDownloadAction(download)).joinBlocking()
service.onStartCommand(downloadIntent, 0, 0)
Expand Down Expand Up @@ -708,7 +710,7 @@ class AbstractFetchDownloadServiceTest {
val downloadIntent = Intent("ACTION_DOWNLOAD")
downloadIntent.putExtra(EXTRA_DOWNLOAD_ID, download.id)

doNothing().`when`(service).performDownload(any())
doNothing().`when`(service).performDownload(any(), anyBoolean())

service.onStartCommand(downloadIntent, 0, 0)

Expand All @@ -723,7 +725,7 @@ class AbstractFetchDownloadServiceTest {
val downloadIntent = Intent("ACTION_DOWNLOAD")
downloadIntent.putExtra(EXTRA_DOWNLOAD_ID, download.id)

doNothing().`when`(service).performDownload(any())
doNothing().`when`(service).performDownload(any(), anyBoolean())

browserStore.dispatch(DownloadAction.AddDownloadAction(download)).joinBlocking()
service.onStartCommand(downloadIntent, 0, 0)
Expand Down Expand Up @@ -932,7 +934,7 @@ class AbstractFetchDownloadServiceTest {
service.downloadJobs.values.forEach { it.job?.join() }

val providedDownload = argumentCaptor<DownloadJobState>()
verify(service).performDownload(providedDownload.capture())
verify(service).performDownload(providedDownload.capture(), anyBoolean())

service.downloadJobs[providedDownload.value.state.id]?.job?.join()
val downloadJobState = service.downloadJobs[providedDownload.value.state.id]!!
Expand Down Expand Up @@ -964,7 +966,7 @@ class AbstractFetchDownloadServiceTest {
service.downloadJobs.values.forEach { it.job?.join() }

val providedDownload = argumentCaptor<DownloadJobState>()
verify(service).performDownload(providedDownload.capture())
verify(service).performDownload(providedDownload.capture(), anyBoolean())

service.downloadJobs[providedDownload.value.state.id]?.job?.join()
val downloadJobState = service.downloadJobs[providedDownload.value.state.id]!!
Expand Down Expand Up @@ -995,7 +997,7 @@ class AbstractFetchDownloadServiceTest {
service.downloadJobs.values.forEach { it.job?.join() }

val providedDownload = argumentCaptor<DownloadJobState>()
verify(service).performDownload(providedDownload.capture())
verify(service).performDownload(providedDownload.capture(), anyBoolean())

service.downloadJobs[providedDownload.value.state.id]?.job?.join()
val downloadJobState = service.downloadJobs[providedDownload.value.state.id]!!
Expand Down Expand Up @@ -1027,7 +1029,7 @@ class AbstractFetchDownloadServiceTest {
service.downloadJobs.values.forEach { it.job?.join() }

val providedDownload = argumentCaptor<DownloadJobState>()
verify(service).performDownload(providedDownload.capture())
verify(service).performDownload(providedDownload.capture(), anyBoolean())

service.downloadJobs[providedDownload.value.state.id]?.job?.join()
val downloadJobState = service.downloadJobs[providedDownload.value.state.id]!!
Expand All @@ -1053,7 +1055,7 @@ class AbstractFetchDownloadServiceTest {
service.downloadJobs.values.forEach { it.job?.join() }

val providedDownload = argumentCaptor<DownloadJobState>()
verify(service).performDownload(providedDownload.capture())
verify(service).performDownload(providedDownload.capture(), anyBoolean())

val downloadJobState = service.downloadJobs[providedDownload.value.state.id]!!
assertEquals(FAILED, service.getDownloadJobStatus(downloadJobState))
Expand Down Expand Up @@ -1160,7 +1162,7 @@ class AbstractFetchDownloadServiceTest {
service.downloadJobs.values.forEach { assertTrue(it.job!!.isActive) }

val providedDownload = argumentCaptor<DownloadJobState>()
verify(service).performDownload(providedDownload.capture())
verify(service).performDownload(providedDownload.capture(), anyBoolean())

// Advance the clock so that the puller posts a notification.
testDispatcher.advanceTimeBy(PROGRESS_UPDATE_INTERVAL)
Expand Down Expand Up @@ -1219,7 +1221,7 @@ class AbstractFetchDownloadServiceTest {
service.downloadJobs.values.forEach { it.job?.join() }

val providedDownload = argumentCaptor<DownloadJobState>()
verify(service).performDownload(providedDownload.capture())
verify(service).performDownload(providedDownload.capture(), anyBoolean())

service.setDownloadJobStatus(service.downloadJobs[download.id]!!, DownloadState.Status.PAUSED)

Expand Down Expand Up @@ -1502,7 +1504,7 @@ class AbstractFetchDownloadServiceTest {

val providedDownload = argumentCaptor<DownloadJobState>()

verify(service).performDownload(providedDownload.capture())
verify(service).performDownload(providedDownload.capture(), anyBoolean())
service.downloadJobs[providedDownload.value.state.id]?.job?.join()

val cancelledDownloadJobState = service.downloadJobs[providedDownload.value.state.id]!!
Expand All @@ -1521,7 +1523,7 @@ class AbstractFetchDownloadServiceTest {
browserStore.dispatch(DownloadAction.AddDownloadAction(download)).joinBlocking()
service.onStartCommand(downloadIntent, 0, 0)
service.downloadJobs.values.forEach { it.job?.join() }
verify(service, times(2)).performDownload(providedDownload.capture())
verify(service, times(2)).performDownload(providedDownload.capture(), anyBoolean())
service.downloadJobs[providedDownload.value.state.id]?.job?.join()

val downloadJobState = service.downloadJobs[providedDownload.value.state.id]!!
Expand Down Expand Up @@ -1619,4 +1621,61 @@ class AbstractFetchDownloadServiceTest {

assertEquals(15, downloadJobState.currentBytesCopied)
}

@Test
fun `copyInChunks - must return ERROR_IN_STREAM_CLOSED when inStream is closed`() = runBlocking {
val downloadJobState = DownloadJobState(state = mock(), status = DOWNLOADING)
val inputStream = mock<InputStream>()

assertEquals(0, downloadJobState.currentBytesCopied)

doAnswer { throw IOException() }.`when`(inputStream).read(any())
doNothing().`when`(service).updateDownloadState(any())
doNothing().`when`(service).performDownload(any(), anyBoolean())

val status = service.copyInChunks(downloadJobState, inputStream, mock())

verify(service).performDownload(downloadJobState, true)
assertEquals(ERROR_IN_STREAM_CLOSED, status)
}

@Test
fun `copyInChunks - must throw when inStream is closed and download was performed using http client`() = runBlocking {
val downloadJobState = DownloadJobState(state = mock(), status = DOWNLOADING)
val inputStream = mock<InputStream>()
var exceptionWasThrown = false

assertEquals(0, downloadJobState.currentBytesCopied)

doAnswer { throw IOException() }.`when`(inputStream).read(any())
doNothing().`when`(service).updateDownloadState(any())
doNothing().`when`(service).performDownload(any(), anyBoolean())

try {
service.copyInChunks(downloadJobState, inputStream, mock(), true)
} catch (e: IOException) {
exceptionWasThrown = true
}

verify(service, times(0)).performDownload(downloadJobState, true)
assertTrue(exceptionWasThrown)
}

@Test
fun `copyInChunks - must return COMPLETED when finish copying bytes`() = runBlocking {
val downloadJobState = DownloadJobState(state = mock(), status = DOWNLOADING)
val inputStream = mock<InputStream>()

assertEquals(0, downloadJobState.currentBytesCopied)

doReturn(15, -1).`when`(inputStream).read(any())
doNothing().`when`(service).updateDownloadState(any())

val status = service.copyInChunks(downloadJobState, inputStream, mock())

verify(service, never()).performDownload(any(), anyBoolean())

assertEquals(15, downloadJobState.currentBytesCopied)
assertEquals(AbstractFetchDownloadService.CopyInChuckStatus.COMPLETED, status)
}
}
3 changes: 3 additions & 0 deletions docs/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ permalink: /changelog/
* **browser-engine-gecko**, **browser-engine-gecko-beta**, **browser-engine-gecko-nightly**
* 🚒 Bug fixed [issue #8464](https://github.com/mozilla-mobile/android-components/issues/8464) - Crash when confirming a prompt that was already confirmed

* **feature-downloads**
* 🚒 Bug fixed [issue #9033](https://github.com/mozilla-mobile/android-components/issues/9033) - Fix resuming downloads in slow networks more details see the [Fenix issue](https://github.com/mozilla-mobile/fenix/issues/9354#issuecomment-731267368).

* **feature-app-links**
* Added handling of PackageItemInfo.packageName NullPointerException on some Xiaomi and TCL devices

Expand Down

0 comments on commit fa9caeb

Please sign in to comment.