diff --git a/src/decorateStatus.ts b/src/decorateStatus.ts deleted file mode 100644 index 079778a31..000000000 --- a/src/decorateStatus.ts +++ /dev/null @@ -1,80 +0,0 @@ -// Copyright 2019 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -import * as extend from 'extend'; -import {google} from '../protos/protos'; - -/** - * @const {object} - A map of protobuf codes to HTTP status codes. - * @private - */ -const GRPC_ERROR_CODE_TO_HTTP = [ - {code: 200, message: 'OK'}, - {code: 499, message: 'Client Closed Request'}, - {code: 500, message: 'Internal Server Error'}, - {code: 400, message: 'Bad Request'}, - {code: 504, message: 'Gateway Timeout'}, - {code: 404, message: 'Not Found'}, - {code: 409, message: 'Conflict'}, - {code: 403, message: 'Forbidden'}, - {code: 429, message: 'Too Many Requests'}, - {code: 412, message: 'Precondition Failed'}, - {code: 409, message: 'Conflict'}, - {code: 400, message: 'Bad Request'}, - {code: 501, message: 'Not Implemented'}, - {code: 500, message: 'Internal Server Error'}, - {code: 503, message: 'Service Unavailable'}, - {code: 500, message: 'Internal Server Error'}, - {code: 401, message: 'Unauthorized'}, -]; - -export type DecoratedStatus = google.rpc.IStatus & { - code: number; - message: string; -}; - -/** - * Checks for a grpc status code and extends the supplied object with - * additional information. - * - * @param {object} obj - The object to be extended. - * @param {object} response - The grpc response. - * @return {object|null} - */ -export function decorateStatus( - response?: google.rpc.IStatus | null -): DecoratedStatus | null { - const obj = {}; - if (response && GRPC_ERROR_CODE_TO_HTTP[response.code!]) { - const defaultResponseDetails = GRPC_ERROR_CODE_TO_HTTP[response.code!]; - let message = defaultResponseDetails.message; - if (response.message) { - // gRPC error messages can be either stringified JSON or strings. - try { - message = JSON.parse(response.message).description; - } catch (e) { - message = response.message; - } - } - return extend(true, obj, response, { - code: defaultResponseDetails.code, - message, - }); - } - return null; -} - -export function shouldRetryRequest(r: {code: number}) { - return [429, 500, 502, 503].includes(r.code); -} diff --git a/src/index.ts b/src/index.ts index 997824b82..506c072ea 100644 --- a/src/index.ts +++ b/src/index.ts @@ -29,7 +29,6 @@ import { CreateInstanceResponse, IInstance, } from './instance'; -import {shouldRetryRequest} from './decorateStatus'; import {google} from '../protos/protos'; import {ServiceError} from 'google-gax'; import * as v2 from './v2'; @@ -842,7 +841,6 @@ export class Bigtable { currentRetryAttempt: 0, noResponseRetries: 0, objectMode: true, - shouldRetryFn: shouldRetryRequest, }, config.retryOpts ); diff --git a/src/table.ts b/src/table.ts index bcf7606c7..2518ff06c 100644 --- a/src/table.ts +++ b/src/table.ts @@ -15,7 +15,7 @@ import {promisifyAll} from '@google-cloud/promisify'; import arrify = require('arrify'); import {ServiceError} from 'google-gax'; -import {decorateStatus} from './decorateStatus'; +import {BackoffSettings} from 'google-gax/build/src/gax'; import {PassThrough, Transform} from 'stream'; // eslint-disable-next-line @typescript-eslint/no-var-requires @@ -46,9 +46,16 @@ import {Duplex} from 'stream'; // See protos/google/rpc/code.proto // (4=DEADLINE_EXCEEDED, 10=ABORTED, 14=UNAVAILABLE) const RETRYABLE_STATUS_CODES = new Set([4, 10, 14]); +const IDEMPOTENT_RETRYABLE_STATUS_CODES = new Set([4, 14]); // (1=CANCELLED) const IGNORED_STATUS_CODES = new Set([1]); +const DEFAULT_BACKOFF_SETTINGS: BackoffSettings = { + initialRetryDelayMillis: 10, + retryDelayMultiplier: 2, + maxRetryDelayMillis: 60000, +}; + /** * @typedef {object} Policy * @property {number} [version] Specifies the format of the policy. @@ -735,7 +742,8 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); const rowsLimit = options.limit || 0; const hasLimit = rowsLimit !== 0; let rowsRead = 0; - let numRequestsMade = 0; + let numConsecutiveErrors = 0; + let retryTimer: NodeJS.Timeout | null; rowKeys = options.keys || []; @@ -788,6 +796,9 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); if (activeRequestStream) { activeRequestStream.abort(); } + if (retryTimer) { + clearTimeout(retryTimer); + } return end(); }; @@ -795,6 +806,10 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); let rowStream: Duplex; const makeNewRequest = () => { + // Avoid cancelling an expired timer if user + // cancelled the stream in the middle of a retry + retryTimer = null; + const lastRowKey = chunkTransformer ? chunkTransformer.lastRowKey : ''; // eslint-disable-next-line @typescript-eslint/no-explicit-any chunkTransformer = new ChunkTransformer({decode: options.decode} as any); @@ -805,7 +820,13 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); } as google.bigtable.v2.IReadRowsRequest; const retryOpts = { - currentRetryAttempt: numRequestsMade, + currentRetryAttempt: numConsecutiveErrors, + // Handling retries in this client. Specify the retry options to + // make sure nothing is retried in retry-request. + noResponseRetries: 0, + shouldRetryFn: (_: any) => { + return false; + }, }; if (lastRowKey) { @@ -915,7 +936,6 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); ) { return next(); } - numRequestsMade = 0; rowsRead++; const row = this.row(rowData.key); row.data = rowData.data; @@ -936,20 +956,32 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); userStream.end(); return; } + numConsecutiveErrors++; if ( - numRequestsMade <= maxRetries && + numConsecutiveErrors <= maxRetries && RETRYABLE_STATUS_CODES.has(error.code) ) { - makeNewRequest(); + const backOffSettings = + options.gaxOptions?.retry?.backoffSettings || + DEFAULT_BACKOFF_SETTINGS; + const nextRetryDelay = getNextDelay( + numConsecutiveErrors, + backOffSettings + ); + retryTimer = setTimeout(makeNewRequest, nextRetryDelay); } else { userStream.emit('error', error); } }) + .on('data', _ => { + // Reset error count after a successful read so the backoff + // time won't keep increasing when as stream had multiple errors + numConsecutiveErrors = 0; + }) .on('end', () => { activeRequestStream = null; }); rowStream.pipe(userStream); - numRequestsMade++; }; makeNewRequest(); @@ -1504,23 +1536,43 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); ); const mutationErrorsByEntryIndex = new Map(); - const onBatchResponse = ( - err: ServiceError | PartialFailureError | null - ) => { - // TODO: enable retries when the entire RPC fails - if (err) { - // The error happened before a request was even made, don't retry. + const isRetryable = (err: ServiceError | null) => { + // Don't retry if there are no more entries or retry attempts + if (pendingEntryIndices.size === 0 || numRequestsMade >= maxRetries + 1) { + return false; + } + // If the error is empty but there are still outstanding mutations, + // it means that there are retryable errors in the mutate response + // even when the RPC succeeded + return !err || IDEMPOTENT_RETRYABLE_STATUS_CODES.has(err.code); + }; + + const onBatchResponse = (err: ServiceError | null) => { + // Return if the error happened before a request was made + if (numRequestsMade === 0) { callback(err); return; } - if (pendingEntryIndices.size !== 0 && numRequestsMade <= maxRetries) { - makeNextBatchRequest(); + + if (isRetryable(err)) { + const backOffSettings = + options.gaxOptions?.retry?.backoffSettings || + DEFAULT_BACKOFF_SETTINGS; + const nextDelay = getNextDelay(numRequestsMade, backOffSettings); + setTimeout(makeNextBatchRequest, nextDelay); return; } + // If there's no more pending mutations, set the error + // to null + if (pendingEntryIndices.size === 0) { + err = null; + } + if (mutationErrorsByEntryIndex.size !== 0) { const mutationErrors = Array.from(mutationErrorsByEntryIndex.values()); - err = new PartialFailureError(mutationErrors); + callback(new PartialFailureError(mutationErrors, err)); + return; } callback(err); @@ -1541,6 +1593,12 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); const retryOpts = { currentRetryAttempt: numRequestsMade, + // Handling retries in this client. Specify the retry options to + // make sure nothing is retried in retry-request. + noResponseRetries: 0, + shouldRetryFn: (_: any) => { + return false; + }, }; this.bigtable @@ -1552,13 +1610,6 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); retryOpts, }) .on('error', (err: ServiceError) => { - // TODO: this check doesn't actually do anything, onBatchResponse - // currently doesn't retry RPC errors, only entry failures - if (numRequestsMade === 0) { - callback(err); // Likely a "projectId not detected" error. - return; - } - onBatchResponse(err); }) .on('data', (obj: google.bigtable.v2.IMutateRowsResponse) => { @@ -1572,13 +1623,13 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); mutationErrorsByEntryIndex.delete(originalEntriesIndex); return; } - if (!RETRYABLE_STATUS_CODES.has(entry.status!.code!)) { + if (!IDEMPOTENT_RETRYABLE_STATUS_CODES.has(entry.status!.code!)) { pendingEntryIndices.delete(originalEntriesIndex); } - const status = decorateStatus(entry.status); + const errorDetails = entry.status; // eslint-disable-next-line @typescript-eslint/no-explicit-any - (status as any).entry = originalEntry; - mutationErrorsByEntryIndex.set(originalEntriesIndex, status); + (errorDetails as any).entry = originalEntry; + mutationErrorsByEntryIndex.set(originalEntriesIndex, errorDetails); }); }) .on('end', onBatchResponse); @@ -1997,6 +2048,17 @@ promisifyAll(Table, { exclude: ['family', 'row'], }); +function getNextDelay(numConsecutiveErrors: number, config: BackoffSettings) { + // 0 - 100 ms jitter + const jitter = Math.floor(Math.random() * 100); + const calculatedNextRetryDelay = + config.initialRetryDelayMillis * + Math.pow(config.retryDelayMultiplier, numConsecutiveErrors) + + jitter; + + return Math.min(calculatedNextRetryDelay, config.maxRetryDelayMillis); +} + export interface GoogleInnerError { reason?: string; message?: string; @@ -2004,7 +2066,7 @@ export interface GoogleInnerError { export class PartialFailureError extends Error { errors?: GoogleInnerError[]; - constructor(errors: GoogleInnerError[]) { + constructor(errors: GoogleInnerError[], rpcError?: ServiceError | null) { super(); this.errors = errors; this.name = 'PartialFailureError'; @@ -2017,5 +2079,8 @@ export class PartialFailureError extends Error { messages.push('\n'); } this.message = messages.join('\n'); + if (rpcError) { + this.message += 'Request failed with: ' + rpcError.message; + } } } diff --git a/system-test/data/mutate-rows-retry-test.json b/system-test/data/mutate-rows-retry-test.json index 7ecc1b01d..e88fbc4bb 100644 --- a/system-test/data/mutate-rows-retry-test.json +++ b/system-test/data/mutate-rows-retry-test.json @@ -70,7 +70,7 @@ ], "responses": [ { "code": 200, "entry_codes": [ 4, 4, 4, 4, 4, 1 ] }, - { "code": 200, "entry_codes": [ 10, 14, 10, 14, 0 ] }, + { "code": 200, "entry_codes": [ 4, 14, 14, 14, 0 ] }, { "code": 200, "entry_codes": [ 1, 4, 4, 0 ] }, { "code": 200, "entry_codes": [ 0, 4 ] }, { "code": 200, "entry_codes": [ 4 ] }, diff --git a/system-test/data/read-rows-retry-test.json b/system-test/data/read-rows-retry-test.json index 037c0e1f6..aad5178c6 100644 --- a/system-test/data/read-rows-retry-test.json +++ b/system-test/data/read-rows-retry-test.json @@ -76,7 +76,7 @@ { "name": "resets the retry counter after a successful read", - "max_retries": 3, + "max_retries": 4, "request_options": [ { "rowKeys": [], "rowRanges": [{}] @@ -211,7 +211,7 @@ { "name": "does the previous 5 things in one giant test case", - "max_retries": 3, + "max_retries": 4, "createReadStream_options": { "limit": 10, "ranges": [{ diff --git a/system-test/mutate-rows.ts b/system-test/mutate-rows.ts index f7e15ec8e..a5c9729b4 100644 --- a/system-test/mutate-rows.ts +++ b/system-test/mutate-rows.ts @@ -27,7 +27,6 @@ import {Entry, PartialFailureError} from '../src/table'; import {CancellableStream, GrpcClient, GoogleAuth} from 'google-gax'; import {BigtableClient} from '../src/v2'; import {PassThrough} from 'stream'; -import {shouldRetryRequest} from '../src/decorateStatus'; const {grpc} = new GrpcClient(); @@ -94,20 +93,26 @@ describe('Bigtable/Table', () => { responses = null; bigtable.api.BigtableClient = { mutateRows: (reqOpts, options) => { - const retryRequestOptions = { - noResponseRetries: 0, - objectMode: true, - shouldRetryFn: shouldRetryRequest, - currentRetryAttempt: currentRetryAttempt++, - }; + // TODO: Currently retry options for retry-request are ignored. + // Retry-request is not handling grpc errors correctly, so + // we are handling retries in table.ts and disabling retries in + // gax to avoid a request getting retried in multiple places. + // Re-enable this test after switching back to using the retry + // logic in gax + // const retryRequestOptions = { + // noResponseRetries: 0, + // objectMode: true, + // shouldRetryFn: shouldRetryRequest, + // currentRetryAttempt: currentRetryAttempt++, + // }; mutationBatchesInvoked.push( // eslint-disable-next-line @typescript-eslint/no-explicit-any reqOpts!.entries!.map(entry => (entry.rowKey as any).asciiSlice()) ); - assert.deepStrictEqual( - options!.retryRequestOptions, - retryRequestOptions - ); + // assert.deepStrictEqual( + // options!.retryRequestOptions, + // retryRequestOptions + // ); mutationCallTimes.push(new Date().getTime()); const emitter = new PassThrough({objectMode: true}); dispatch(emitter, responses!.shift()); diff --git a/test/common.ts b/test/common.ts deleted file mode 100644 index de7b50d97..000000000 --- a/test/common.ts +++ /dev/null @@ -1,58 +0,0 @@ -// Copyright 2019 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -import * as assert from 'assert'; -import {describe, it} from 'mocha'; -import {shouldRetryRequest, decorateStatus} from '../src/decorateStatus'; - -describe('decorateStatus', () => { - it('should attach the correct HTTP code', () => { - const grpcStatus = {code: 0}; - const status = decorateStatus(grpcStatus); - assert.strictEqual(status!.message, 'OK'); - }); - - it('should return null if the code doesnt match', () => { - const grpcStatus = {code: 999}; - const status = decorateStatus(grpcStatus); - assert.strictEqual(status, null); - }); - - it('should accept a basic message', () => { - const message = 'QUACK!'; - const grpcStatus = {code: 1, message}; - const status = decorateStatus(grpcStatus); - assert.strictEqual(status!.message, message); - }); - - it('should parse JSON from the response message', () => { - const message = { - description: { - rubber: '🦆', - }, - }; - const grpcStatus = {code: 1, message: JSON.stringify(message)}; - const status = decorateStatus(grpcStatus); - assert.deepStrictEqual(status!.message, message.description); - }); -}); - -describe('shouldRetryRequest_', () => { - it('should retry on 429, 500, 502, and 503', () => { - const s1 = shouldRetryRequest({code: 429}); - assert.strictEqual(s1, true); - const s2 = shouldRetryRequest({code: 444}); - assert.strictEqual(s2, false); - }); -}); diff --git a/test/index.ts b/test/index.ts index 5b7301347..147a76001 100644 --- a/test/index.ts +++ b/test/index.ts @@ -23,7 +23,6 @@ import * as sn from 'sinon'; import {Cluster} from '../src/cluster.js'; import {Instance} from '../src/instance.js'; import {PassThrough} from 'stream'; -import {shouldRetryRequest} from '../src/decorateStatus.js'; // eslint-disable-next-line @typescript-eslint/no-var-requires const v2 = require('../src/v2'); @@ -895,51 +894,55 @@ describe('Bigtable', () => { }; }); - it('should pass retryRequestOptions', done => { - const expectedRetryRequestOptions = { - currentRetryAttempt: 0, - noResponseRetries: 0, - objectMode: true, - shouldRetryFn: shouldRetryRequest, - }; - - bigtable.api[CONFIG.client] = { - [CONFIG.method]: (reqOpts: {}, options: gax.CallOptions) => { - assert.deepStrictEqual( - options.retryRequestOptions, - expectedRetryRequestOptions - ); - done(); - }, - }; - - const requestStream = bigtable.request(CONFIG); - requestStream.emit('reading'); - }); - - it('should set gaxOpts.retryRequestOptions when gaxOpts undefined', done => { - const expectedRetryRequestOptions = { - currentRetryAttempt: 0, - noResponseRetries: 0, - objectMode: true, - shouldRetryFn: shouldRetryRequest, - }; - - bigtable.api[CONFIG.client] = { - [CONFIG.method]: (reqOpts: {}, options: gax.CallOptions) => { - assert.deepStrictEqual( - options.retryRequestOptions, - expectedRetryRequestOptions - ); - done(); - }, - }; - - const config = Object.assign({}, CONFIG); - delete config.gaxOpts; - const requestStream = bigtable.request(config); - requestStream.emit('reading'); - }); + // TODO: retry request options are currently ignored + // Re-enable after retry logic is fixed in gax / retry-request + // it('should pass retryRequestOptions', done => { + // const expectedRetryRequestOptions = { + // currentRetryAttempt: 0, + // noResponseRetries: 0, + // objectMode: true, + // shouldRetryFn: shouldRetryRequest, + // }; + + // bigtable.api[CONFIG.client] = { + // [CONFIG.method]: (reqOpts: {}, options: gax.CallOptions) => { + // assert.deepStrictEqual( + // options.retryRequestOptions, + // expectedRetryRequestOptions + // ); + // done(); + // }, + // }; + + // const requestStream = bigtable.request(CONFIG); + // requestStream.emit('reading'); + // }); + + // TODO: retry request options are currently ignored + // Re-enable after retry logic is fixed in gax / retry-request + // it('should set gaxOpts.retryRequestOptions when gaxOpts undefined', done => { + // const expectedRetryRequestOptions = { + // currentRetryAttempt: 0, + // noResponseRetries: 0, + // objectMode: true, + // shouldRetryFn: shouldRetryRequest, + // }; + + // bigtable.api[CONFIG.client] = { + // [CONFIG.method]: (reqOpts: {}, options: gax.CallOptions) => { + // assert.deepStrictEqual( + // options.retryRequestOptions, + // expectedRetryRequestOptions + // ); + // done(); + // }, + // }; + + // const config = Object.assign({}, CONFIG); + // delete config.gaxOpts; + // const requestStream = bigtable.request(config); + // requestStream.emit('reading'); + // }); it('should expose an abort function', done => { GAX_STREAM.cancel = done; diff --git a/test/table.ts b/test/table.ts index 50b8064d4..f25dac613 100644 --- a/test/table.ts +++ b/test/table.ts @@ -20,7 +20,6 @@ import * as pumpify from 'pumpify'; import * as sinon from 'sinon'; import {PassThrough, Writable, Duplex} from 'stream'; import {ServiceError} from 'google-gax'; -import {DecoratedStatus} from '../src/decorateStatus'; import * as inst from '../src/instance'; import {ChunkTransformer} from '../src/chunktransformer.js'; @@ -28,7 +27,6 @@ import {Family} from '../src/family.js'; import {Mutation} from '../src/mutation.js'; import {Row} from '../src/row.js'; import * as tblTypes from '../src/table'; -import * as ds from '../src/decorateStatus.js'; import {Bigtable} from '../src'; import {EventEmitter} from 'events'; @@ -2457,18 +2455,18 @@ describe('Bigtable/Table', () => { index: 0, status: { code: 1, + message: 'CANCELLED', }, }, { index: 1, status: { - code: 1, + code: 10, + message: 'ABORTED', }, }, ]; - const parsedStatuses = [{} as DecoratedStatus, {} as DecoratedStatus]; - beforeEach(() => { table.bigtable.request = () => { const stream = new PassThrough({objectMode: true}); @@ -2478,32 +2476,31 @@ describe('Bigtable/Table', () => { }); return stream; }; - - let statusCount = 0; - sandbox.stub(ds, 'decorateStatus').callsFake(status => { - assert.strictEqual(status, fakeStatuses[statusCount].status); - return parsedStatuses[statusCount++]; - }); }); it('should return a PartialFailureError', done => { - table.mutate(entries, (err: Error) => { + const newEntries = [ + { + key: 'a', + }, + { + key: 'b', + }, + ]; + table.mutate(newEntries, (err: Error) => { assert.strictEqual(err.name, 'PartialFailureError'); // eslint-disable-next-line @typescript-eslint/no-explicit-any assert.deepStrictEqual((err as any).errors, [ - Object.assign( - { - entry: entries[0], - }, - parsedStatuses[0] - ), - - Object.assign( - { - entry: entries[1], - }, - parsedStatuses[1] - ), + Object.assign({ + entry: newEntries[0], + code: fakeStatuses[0].status.code, + message: fakeStatuses[0].status.message, + }), + Object.assign({ + entry: newEntries[1], + code: fakeStatuses[1].status.code, + message: fakeStatuses[1].status.message, + }), ]); done(); @@ -2578,7 +2575,6 @@ describe('Bigtable/Table', () => { }, ], ]; - sandbox.stub(ds, 'decorateStatus').returns({} as DecoratedStatus); // eslint-disable-next-line @typescript-eslint/no-explicit-any table.bigtable.request = (config: any) => { entryRequests.push(config.reqOpts.entries); @@ -2609,6 +2605,87 @@ describe('Bigtable/Table', () => { }); }); }); + + describe('rpc level retries', () => { + let emitters: EventEmitter[] | null; // = [((stream: Writable) => { stream.push([{ key: 'a' }]); + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + let entryRequests: any; + + beforeEach(() => { + emitters = null; // This needs to be assigned in each test case. + + entryRequests = []; + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + table.bigtable.request = (config: any) => { + entryRequests.push(config.reqOpts.entries); + const stream = new PassThrough({ + objectMode: true, + }); + + setImmediate(() => { + (emitters!.shift() as any)(stream); + }); + + return stream; + }; + }); + + it('should not retry unretriable errors', done => { + const unretriableError = new Error('not retryable') as ServiceError; + unretriableError.code = 10; // Aborted + emitters = [ + ((stream: Writable) => { + stream.emit('error', unretriableError); + }) as {} as EventEmitter, + ]; + table.maxRetries = 1; + table.mutate(entries, () => { + assert.strictEqual(entryRequests.length, 1); + done(); + }); + }); + + it('should retry retryable errors', done => { + const error = new Error('retryable') as ServiceError; + error.code = 14; // Unavailable + emitters = [ + ((stream: Writable) => { + stream.emit('error', error); + }) as {} as EventEmitter, + ((stream: Writable) => { + stream.end(); + }) as {} as EventEmitter, + ]; + table.maxRetries = 1; + table.mutate(entries, () => { + assert.strictEqual(entryRequests.length, 2); + done(); + }); + }); + + it('should not retry more than maxRetries times', done => { + const error = new Error('retryable') as ServiceError; + error.code = 14; // Unavailable + emitters = [ + ((stream: Writable) => { + stream.emit('error', error); + }) as {} as EventEmitter, + ((stream: Writable) => { + stream.emit('error', error); + }) as {} as EventEmitter, + ((stream: Writable) => { + stream.end(); + }) as {} as EventEmitter, + ]; + table.maxRetries = 1; + table.mutate(entries, () => { + assert.strictEqual(entryRequests.length, 2); + done(); + }); + }); + }); }); describe('row', () => {