diff --git a/dev/src/bulk-writer.ts b/dev/src/bulk-writer.ts index d4d2ac598..1ddad577d 100644 --- a/dev/src/bulk-writer.ts +++ b/dev/src/bulk-writer.ts @@ -684,7 +684,7 @@ export class BulkWriter { */ flush(): Promise { this._verifyNotClosed(); - this._sendCurrentBatch(/* flush= */ true); + this._scheduleCurrentBatch(/* flush= */ true); return this._lastOp; } @@ -738,50 +738,52 @@ export class BulkWriter { * `flush()` or `close()` call. * @private */ - private _sendCurrentBatch(flush = false): void { + private _scheduleCurrentBatch(flush = false): void { if (this._bulkCommitBatch._opCount === 0) return; - const tag = requestTag(); const pendingBatch = this._bulkCommitBatch; + this._bulkCommitBatch = new BulkCommitBatch(this.firestore); // Use the write with the longest backoff duration when determining backoff. const highestBackoffDuration = pendingBatch.pendingOps.reduce((prev, cur) => prev.backoffDuration > cur.backoffDuration ? prev : cur ).backoffDuration; - const backoffMsWithJitter = BulkWriter.applyJitter(highestBackoffDuration); - const delayMs = this._rateLimiter.getNextRequestDelayMs( - pendingBatch._opCount - ); - const finalDelayMs = Math.max(backoffMsWithJitter, delayMs); - + const backoffMsWithJitter = BulkWriter._applyJitter(highestBackoffDuration); const delayedExecution = new Deferred(); - this._bulkCommitBatch = new BulkCommitBatch(this.firestore); + + if (backoffMsWithJitter > 0) { + delayExecution(() => delayedExecution.resolve(), backoffMsWithJitter); + } else { + delayedExecution.resolve(); + } + + delayedExecution.promise.then(() => this._sendBatch(pendingBatch, flush)); + } + + /** + * Sends the provided batch once the rate limiter does not require any delay. + */ + private async _sendBatch( + batch: BulkCommitBatch, + flush = false + ): Promise { + const tag = requestTag(); // Send the batch if it is does not require any delay, or schedule another // attempt after the appropriate timeout. - if (finalDelayMs === 0) { - const underRateLimit = this._rateLimiter.tryMakeRequest( - pendingBatch._opCount - ); - assert( - underRateLimit, - 'RateLimiter should allow request if delayMs === 0' - ); - delayedExecution.resolve(); + const underRateLimit = this._rateLimiter.tryMakeRequest(batch._opCount); + if (underRateLimit) { + await batch.bulkCommit({requestTag: tag}); + if (flush) this._scheduleCurrentBatch(flush); } else { + const delayMs = this._rateLimiter.getNextRequestDelayMs(batch._opCount); logger( - 'BulkWriter._sendCurrentBatch', + 'BulkWriter._sendBatch', tag, - `Backing off for ${finalDelayMs} seconds` + `Backing off for ${delayMs} seconds` ); - delayExecution(() => delayedExecution.resolve(), finalDelayMs); + delayExecution(() => this._sendBatch(batch, flush), delayMs); } - - delayedExecution.promise.then(async () => { - // This should subtract rate limit, but it's not. - await pendingBatch.bulkCommit({requestTag: tag}); - if (flush) this._sendCurrentBatch(flush); - }); } /** @@ -789,7 +791,7 @@ export class BulkWriter { * * @private */ - private static applyJitter(backoffMs: number): number { + private static _applyJitter(backoffMs: number): number { if (backoffMs === 0) return 0; // Random value in [-0.3, 0.3]. const jitter = DEFAULT_JITTER_FACTOR * (Math.random() * 2 - 1); @@ -832,7 +834,7 @@ export class BulkWriter { if (this._bulkCommitBatch.has(op.ref)) { // Create a new batch since the backend doesn't support batches with two // writes to the same document. - this._sendCurrentBatch(); + this._scheduleCurrentBatch(); } // Run the operation on the current batch and advance the `_lastOp` pointer. @@ -843,7 +845,7 @@ export class BulkWriter { this._lastOp = this._lastOp.then(() => silencePromise(op.promise)); if (this._bulkCommitBatch._opCount === this._maxBatchSize) { - this._sendCurrentBatch(); + this._scheduleCurrentBatch(); } } }