Skip to content

Commit

Permalink
@uppy/tus: pause all requests in response to server rate limiting (#3394
Browse files Browse the repository at this point in the history
)

* @uppy/tus: pause all requests in response to server rate limiting

When the remote server responds with HTTP 429, all requests are
paused for a while in the hope that it can resolve the rate limiting.
Failed requests are also now queued up after the retry delay. Before
that, they were simply scheduled which would sometimes end up
overflowing the `limit` option.

* Address review comments

* fix requests bypassing queue pause state

* Auto rate limiting

* fix `RateLimitedQueue`
  • Loading branch information
aduh95 authored Jan 10, 2022
1 parent fa140ea commit a08ec4e
Show file tree
Hide file tree
Showing 2 changed files with 142 additions and 24 deletions.
89 changes: 67 additions & 22 deletions packages/@uppy/tus/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ const tusDefaultOptions = {
addRequestId: false,

chunkSize: Infinity,
retryDelays: [0, 1000, 3000, 5000],
retryDelays: [100, 1000, 3000, 5000],
parallelUploads: 1,
removeFingerprintOnSuccess: false,
uploadLengthDeferred: false,
Expand All @@ -51,8 +51,11 @@ const tusDefaultOptions = {
* Tus resumable file uploader
*/
module.exports = class Tus extends BasePlugin {
// eslint-disable-next-line global-require
static VERSION = require('../package.json').version

#retryDelayIterator

/**
* @param {Uppy} uppy
* @param {TusOptions} opts
Expand All @@ -66,8 +69,8 @@ module.exports = class Tus extends BasePlugin {
// set default options
const defaultOptions = {
useFastRemoteRetry: true,
limit: 5,
retryDelays: [0, 1000, 3000, 5000],
limit: 20,
retryDelays: tusDefaultOptions.retryDelays,
withCredentials: false,
}

Expand All @@ -85,6 +88,7 @@ module.exports = class Tus extends BasePlugin {
* @type {RateLimitedQueue}
*/
this.requests = new RateLimitedQueue(this.opts.limit)
this.#retryDelayIterator = this.opts.retryDelays?.values()

this.uploaders = Object.create(null)
this.uploaderEvents = Object.create(null)
Expand Down Expand Up @@ -178,6 +182,9 @@ module.exports = class Tus extends BasePlugin {

// Create a new tus upload
return new Promise((resolve, reject) => {
let queuedRequest
let qRequest

this.uppy.emit('upload-started', file)

const opts = {
Expand Down Expand Up @@ -219,7 +226,7 @@ module.exports = class Tus extends BasePlugin {
}

this.resetUploaderReferences(file.id)
queuedRequest.done()
queuedRequest.abort()

this.uppy.emit('upload-error', file, err)

Expand Down Expand Up @@ -252,6 +259,46 @@ module.exports = class Tus extends BasePlugin {
resolve(upload)
}

uploadOptions.onShouldRetry = (err, retryAttempt, options) => {
const status = err?.originalResponse?.getStatus()
if (status === 429) {
// HTTP 429 Too Many Requests => to avoid the whole download to fail, pause all requests.
if (!this.requests.isPaused) {
const next = this.#retryDelayIterator?.next()
if (next == null || next.done) {
return false
}
this.requests.rateLimit(next.value)
}
queuedRequest.abort()
queuedRequest = this.requests.run(qRequest)
} else if (status > 400 && status < 500 && status !== 409) {
// HTTP 4xx, the server won't send anything, it's doesn't make sense to retry
return false
} else if (typeof navigator !== 'undefined' && navigator.onLine === false) {
// The navigator is offline, let's wait for it to come back online.
if (!this.requests.isPaused) {
this.requests.pause()
window.addEventListener('online', () => {
this.requests.resume()
}, { once: true })
}
queuedRequest.abort()
queuedRequest = this.requests.run(qRequest)
} else {
// For a non-4xx error, we can re-queue the request.
setTimeout(() => {
queuedRequest.abort()
queuedRequest = this.requests.run(qRequest)
}, options.retryDelays[retryAttempt])
}
// Aborting the timeout set by tus-js-client to not short-circuit the rate limiting.
// eslint-disable-next-line no-underscore-dangle
queueMicrotask(() => clearTimeout(queuedRequest._retryTimeout))
// We need to return true here so tus-js-client increments the retryAttempt and do not emit an error event.
return true
}

const copyProp = (obj, srcProp, destProp) => {
if (hasProperty(obj, srcProp) && !hasProperty(obj, destProp)) {
obj[destProp] = obj[srcProp]
Expand All @@ -278,15 +325,7 @@ module.exports = class Tus extends BasePlugin {
this.uploaders[file.id] = upload
this.uploaderEvents[file.id] = new EventTracker(this.uppy)

upload.findPreviousUploads().then((previousUploads) => {
const previousUpload = previousUploads[0]
if (previousUpload) {
this.uppy.log(`[Tus] Resuming upload of ${file.id} started at ${previousUpload.creationTime}`)
upload.resumeFromPreviousUpload(previousUpload)
}
})

let queuedRequest = this.requests.run(() => {
qRequest = () => {
if (!file.isPaused) {
upload.start()
}
Expand All @@ -297,8 +336,18 @@ module.exports = class Tus extends BasePlugin {
// Also, we need to remove the request from the queue _without_ destroying everything
// related to this upload to handle pauses.
return () => {}
}

upload.findPreviousUploads().then((previousUploads) => {
const previousUpload = previousUploads[0]
if (previousUpload) {
this.uppy.log(`[Tus] Resuming upload of ${file.id} started at ${previousUpload.creationTime}`)
upload.resumeFromPreviousUpload(previousUpload)
}
})

queuedRequest = this.requests.run(qRequest)

this.onFileRemove(file.id, (targetFileID) => {
queuedRequest.abort()
this.resetUploaderReferences(file.id, { abort: !!upload.url })
Expand All @@ -314,10 +363,7 @@ module.exports = class Tus extends BasePlugin {
// Resuming an upload should be queued, else you could pause and then
// resume a queued upload to make it skip the queue.
queuedRequest.abort()
queuedRequest = this.requests.run(() => {
upload.start()
return () => {}
})
queuedRequest = this.requests.run(qRequest)
}
})

Expand All @@ -337,10 +383,7 @@ module.exports = class Tus extends BasePlugin {
if (file.error) {
upload.abort()
}
queuedRequest = this.requests.run(() => {
upload.start()
return () => {}
})
queuedRequest = this.requests.run(qRequest)
})
}).catch((err) => {
this.uppy.emit('upload-error', file, err)
Expand Down Expand Up @@ -412,6 +455,8 @@ module.exports = class Tus extends BasePlugin {
this.uploaderSockets[file.id] = socket
this.uploaderEvents[file.id] = new EventTracker(this.uppy)

let queuedRequest

this.onFileRemove(file.id, () => {
queuedRequest.abort()
socket.send('cancel', {})
Expand Down Expand Up @@ -512,7 +557,7 @@ module.exports = class Tus extends BasePlugin {
resolve()
})

let queuedRequest = this.requests.run(() => {
queuedRequest = this.requests.run(() => {
socket.open()
if (file.isPaused) {
socket.send('pause', {})
Expand Down
77 changes: 75 additions & 2 deletions packages/@uppy/utils/src/RateLimitedQueue.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,16 @@ class RateLimitedQueue {

#queuedHandlers = []

#paused = false

#pauseTimer

#downLimit = 1

#upperLimit

#rateLimitingTimer

constructor (limit) {
if (typeof limit !== 'number' || limit === 0) {
this.limit = Infinity
Expand Down Expand Up @@ -54,7 +64,7 @@ class RateLimitedQueue {
}

#next () {
if (this.#activeRequests >= this.limit) {
if (this.#paused || this.#activeRequests >= this.limit) {
return
}
if (this.#queuedHandlers.length === 0) {
Expand Down Expand Up @@ -101,7 +111,7 @@ class RateLimitedQueue {
}

run (fn, queueOptions) {
if (this.#activeRequests < this.limit) {
if (!this.#paused && this.#activeRequests < this.limit) {
return this.#call(fn)
}
return this.#queue(fn, queueOptions)
Expand Down Expand Up @@ -149,6 +159,69 @@ class RateLimitedQueue {
return outerPromise
}
}

resume () {
this.#paused = false
clearTimeout(this.#pauseTimer)
for (let i = 0; i < this.limit; i++) {
this.#queueNext()
}
}

#resume = () => this.resume()

/**
* Freezes the queue for a while or indefinitely.
*
* @param {number | null } [duration] Duration for the pause to happen, in milliseconds.
* If omitted, the queue won't resume automatically.
*/
pause (duration = null) {
this.#paused = true
clearTimeout(this.#pauseTimer)
if (duration != null) {
this.#pauseTimer = setTimeout(this.#resume, duration)
}
}

/**
* Pauses the queue for a duration, and lower the limit of concurrent requests
* when the queue resumes. When the queue resumes, it tries to progressively
* increase the limit in `this.#increaseLimit` until another call is made to
* `this.rateLimit`.
* Call this function when using the RateLimitedQueue for network requests and
* the remote server responds with 429 HTTP code.
*
* @param {number} duration in milliseconds.
*/
rateLimit (duration) {
clearTimeout(this.#rateLimitingTimer)
this.pause(duration)
if (this.limit > 1 && Number.isFinite(this.limit)) {
this.#upperLimit = this.limit - 1
this.limit = this.#downLimit
this.#rateLimitingTimer = setTimeout(this.#increaseLimit, duration)
}
}

#increaseLimit = () => {
if (this.#paused) {
this.#rateLimitingTimer = setTimeout(this.#increaseLimit, 0)
return
}
this.#downLimit = this.limit
this.limit = Math.ceil((this.#upperLimit + this.#downLimit) / 2)
for (let i = this.#downLimit; i <= this.limit; i++) {
this.#queueNext()
}
if (this.#upperLimit - this.#downLimit > 3) {
this.#rateLimitingTimer = setTimeout(this.#increaseLimit, 2000)
} else {
this.#downLimit = Math.floor(this.#downLimit / 2)
}
}

get isPaused () { return this.#paused }
}

module.exports = {
Expand Down

0 comments on commit a08ec4e

Please sign in to comment.