Skip to content

Commit

Permalink
fix: disable retry-request, add exponential backoff in mutateRows and…
Browse files Browse the repository at this point in the history
… readRows (#1060)

* fix: add rpc level retries for mutate

* remove debugging logs

* add exponential backoff

* simplify mutate row retry logic

* fix broken tests

* ignore checks for retry options

* fix lint

* comments

* reset retry after a succee response

* fix lint

* fix system test

* clean up

* add rpc status in mutate rows, and remove http status

* remove unnecessary check

* remove decorate status

* update

* fix

* correct retry count
  • Loading branch information
mutianf authored Mar 31, 2022
1 parent 2b175ac commit 3718011
Show file tree
Hide file tree
Showing 9 changed files with 264 additions and 254 deletions.
80 changes: 0 additions & 80 deletions src/decorateStatus.ts

This file was deleted.

2 changes: 0 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import {
CreateInstanceResponse,
IInstance,
} from './instance';
import {shouldRetryRequest} from './decorateStatus';
import {google} from '../protos/protos';
import {ServiceError} from 'google-gax';
import * as v2 from './v2';
Expand Down Expand Up @@ -842,7 +841,6 @@ export class Bigtable {
currentRetryAttempt: 0,
noResponseRetries: 0,
objectMode: true,
shouldRetryFn: shouldRetryRequest,
},
config.retryOpts
);
Expand Down
121 changes: 93 additions & 28 deletions src/table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import {promisifyAll} from '@google-cloud/promisify';
import arrify = require('arrify');
import {ServiceError} from 'google-gax';
import {decorateStatus} from './decorateStatus';
import {BackoffSettings} from 'google-gax/build/src/gax';
import {PassThrough, Transform} from 'stream';

// eslint-disable-next-line @typescript-eslint/no-var-requires
Expand Down Expand Up @@ -46,9 +46,16 @@ import {Duplex} from 'stream';
// See protos/google/rpc/code.proto
// (4=DEADLINE_EXCEEDED, 10=ABORTED, 14=UNAVAILABLE)
const RETRYABLE_STATUS_CODES = new Set([4, 10, 14]);
const IDEMPOTENT_RETRYABLE_STATUS_CODES = new Set([4, 14]);
// (1=CANCELLED)
const IGNORED_STATUS_CODES = new Set([1]);

const DEFAULT_BACKOFF_SETTINGS: BackoffSettings = {
initialRetryDelayMillis: 10,
retryDelayMultiplier: 2,
maxRetryDelayMillis: 60000,
};

/**
* @typedef {object} Policy
* @property {number} [version] Specifies the format of the policy.
Expand Down Expand Up @@ -735,7 +742,8 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
const rowsLimit = options.limit || 0;
const hasLimit = rowsLimit !== 0;
let rowsRead = 0;
let numRequestsMade = 0;
let numConsecutiveErrors = 0;
let retryTimer: NodeJS.Timeout | null;

rowKeys = options.keys || [];

Expand Down Expand Up @@ -788,13 +796,20 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
if (activeRequestStream) {
activeRequestStream.abort();
}
if (retryTimer) {
clearTimeout(retryTimer);
}
return end();
};

let chunkTransformer: ChunkTransformer;
let rowStream: Duplex;

const makeNewRequest = () => {
// Avoid cancelling an expired timer if user
// cancelled the stream in the middle of a retry
retryTimer = null;

const lastRowKey = chunkTransformer ? chunkTransformer.lastRowKey : '';
// eslint-disable-next-line @typescript-eslint/no-explicit-any
chunkTransformer = new ChunkTransformer({decode: options.decode} as any);
Expand All @@ -805,7 +820,13 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
} as google.bigtable.v2.IReadRowsRequest;

const retryOpts = {
currentRetryAttempt: numRequestsMade,
currentRetryAttempt: numConsecutiveErrors,
// Handling retries in this client. Specify the retry options to
// make sure nothing is retried in retry-request.
noResponseRetries: 0,
shouldRetryFn: (_: any) => {
return false;
},
};

if (lastRowKey) {
Expand Down Expand Up @@ -915,7 +936,6 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
) {
return next();
}
numRequestsMade = 0;
rowsRead++;
const row = this.row(rowData.key);
row.data = rowData.data;
Expand All @@ -936,20 +956,32 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
userStream.end();
return;
}
numConsecutiveErrors++;
if (
numRequestsMade <= maxRetries &&
numConsecutiveErrors <= maxRetries &&
RETRYABLE_STATUS_CODES.has(error.code)
) {
makeNewRequest();
const backOffSettings =
options.gaxOptions?.retry?.backoffSettings ||
DEFAULT_BACKOFF_SETTINGS;
const nextRetryDelay = getNextDelay(
numConsecutiveErrors,
backOffSettings
);
retryTimer = setTimeout(makeNewRequest, nextRetryDelay);
} else {
userStream.emit('error', error);
}
})
.on('data', _ => {
// Reset error count after a successful read so the backoff
// time won't keep increasing when as stream had multiple errors
numConsecutiveErrors = 0;
})
.on('end', () => {
activeRequestStream = null;
});
rowStream.pipe(userStream);
numRequestsMade++;
};

makeNewRequest();
Expand Down Expand Up @@ -1504,23 +1536,43 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
);
const mutationErrorsByEntryIndex = new Map();

const onBatchResponse = (
err: ServiceError | PartialFailureError | null
) => {
// TODO: enable retries when the entire RPC fails
if (err) {
// The error happened before a request was even made, don't retry.
const isRetryable = (err: ServiceError | null) => {
// Don't retry if there are no more entries or retry attempts
if (pendingEntryIndices.size === 0 || numRequestsMade >= maxRetries + 1) {
return false;
}
// If the error is empty but there are still outstanding mutations,
// it means that there are retryable errors in the mutate response
// even when the RPC succeeded
return !err || IDEMPOTENT_RETRYABLE_STATUS_CODES.has(err.code);
};

const onBatchResponse = (err: ServiceError | null) => {
// Return if the error happened before a request was made
if (numRequestsMade === 0) {
callback(err);
return;
}
if (pendingEntryIndices.size !== 0 && numRequestsMade <= maxRetries) {
makeNextBatchRequest();

if (isRetryable(err)) {
const backOffSettings =
options.gaxOptions?.retry?.backoffSettings ||
DEFAULT_BACKOFF_SETTINGS;
const nextDelay = getNextDelay(numRequestsMade, backOffSettings);
setTimeout(makeNextBatchRequest, nextDelay);
return;
}

// If there's no more pending mutations, set the error
// to null
if (pendingEntryIndices.size === 0) {
err = null;
}

if (mutationErrorsByEntryIndex.size !== 0) {
const mutationErrors = Array.from(mutationErrorsByEntryIndex.values());
err = new PartialFailureError(mutationErrors);
callback(new PartialFailureError(mutationErrors, err));
return;
}

callback(err);
Expand All @@ -1541,6 +1593,12 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);

const retryOpts = {
currentRetryAttempt: numRequestsMade,
// Handling retries in this client. Specify the retry options to
// make sure nothing is retried in retry-request.
noResponseRetries: 0,
shouldRetryFn: (_: any) => {
return false;
},
};

this.bigtable
Expand All @@ -1552,13 +1610,6 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
retryOpts,
})
.on('error', (err: ServiceError) => {
// TODO: this check doesn't actually do anything, onBatchResponse
// currently doesn't retry RPC errors, only entry failures
if (numRequestsMade === 0) {
callback(err); // Likely a "projectId not detected" error.
return;
}

onBatchResponse(err);
})
.on('data', (obj: google.bigtable.v2.IMutateRowsResponse) => {
Expand All @@ -1572,13 +1623,13 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
mutationErrorsByEntryIndex.delete(originalEntriesIndex);
return;
}
if (!RETRYABLE_STATUS_CODES.has(entry.status!.code!)) {
if (!IDEMPOTENT_RETRYABLE_STATUS_CODES.has(entry.status!.code!)) {
pendingEntryIndices.delete(originalEntriesIndex);
}
const status = decorateStatus(entry.status);
const errorDetails = entry.status;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(status as any).entry = originalEntry;
mutationErrorsByEntryIndex.set(originalEntriesIndex, status);
(errorDetails as any).entry = originalEntry;
mutationErrorsByEntryIndex.set(originalEntriesIndex, errorDetails);
});
})
.on('end', onBatchResponse);
Expand Down Expand Up @@ -1997,14 +2048,25 @@ promisifyAll(Table, {
exclude: ['family', 'row'],
});

function getNextDelay(numConsecutiveErrors: number, config: BackoffSettings) {
// 0 - 100 ms jitter
const jitter = Math.floor(Math.random() * 100);
const calculatedNextRetryDelay =
config.initialRetryDelayMillis *
Math.pow(config.retryDelayMultiplier, numConsecutiveErrors) +
jitter;

return Math.min(calculatedNextRetryDelay, config.maxRetryDelayMillis);
}

export interface GoogleInnerError {
reason?: string;
message?: string;
}

export class PartialFailureError extends Error {
errors?: GoogleInnerError[];
constructor(errors: GoogleInnerError[]) {
constructor(errors: GoogleInnerError[], rpcError?: ServiceError | null) {
super();
this.errors = errors;
this.name = 'PartialFailureError';
Expand All @@ -2017,5 +2079,8 @@ export class PartialFailureError extends Error {
messages.push('\n');
}
this.message = messages.join('\n');
if (rpcError) {
this.message += 'Request failed with: ' + rpcError.message;
}
}
}
2 changes: 1 addition & 1 deletion system-test/data/mutate-rows-retry-test.json
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
],
"responses": [
{ "code": 200, "entry_codes": [ 4, 4, 4, 4, 4, 1 ] },
{ "code": 200, "entry_codes": [ 10, 14, 10, 14, 0 ] },
{ "code": 200, "entry_codes": [ 4, 14, 14, 14, 0 ] },
{ "code": 200, "entry_codes": [ 1, 4, 4, 0 ] },
{ "code": 200, "entry_codes": [ 0, 4 ] },
{ "code": 200, "entry_codes": [ 4 ] },
Expand Down
4 changes: 2 additions & 2 deletions system-test/data/read-rows-retry-test.json
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@

{
"name": "resets the retry counter after a successful read",
"max_retries": 3,
"max_retries": 4,
"request_options": [
{ "rowKeys": [],
"rowRanges": [{}]
Expand Down Expand Up @@ -211,7 +211,7 @@

{
"name": "does the previous 5 things in one giant test case",
"max_retries": 3,
"max_retries": 4,
"createReadStream_options": {
"limit": 10,
"ranges": [{
Expand Down
Loading

0 comments on commit 3718011

Please sign in to comment.