Skip to content

Commit

Permalink
fix: resumable uploads should respect autoRetry & multipart uploads s…
Browse files Browse the repository at this point in the history
…hould correctly use preconditions (#1779)

* chore: began refactoring gcs-resumable-upload

* 🦉 Updates from OwlBot

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* fix: consistent defaults

* fixed tests

* gcs-resumable-upload should use retry interface from storage

* 🦉 Updates from OwlBot

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* refactored code

* 🦉 Updates from OwlBot

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* fixed tests

* create shallow copy of retry options before passing to resumable upload

* fix unit test with retry options and resumable upload

* 🦉 Updates from OwlBot

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* trying to fix build issues that i cannot repro

* tried changing paginator version

* changed version back

* tried changing paginator version

* fix resumable upload conformance test

* fixed bug in preconditions

* update paginator to 3.0.7

* make instanceRetryValue private

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
Co-authored-by: Denis DelGrosso <[email protected]>
  • Loading branch information
3 people authored Feb 16, 2022
1 parent 6ce9a2a commit 1e72586
Show file tree
Hide file tree
Showing 9 changed files with 190 additions and 155 deletions.
15 changes: 7 additions & 8 deletions conformance-test/libraryMethods.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,9 @@ export async function bucketSetStorageClass(bucket: Bucket) {
}

export async function bucketUploadResumable(bucket: Bucket) {
if (bucket.instancePreconditionOpts) {
bucket.instancePreconditionOpts.ifGenerationMatch = 0;
}
await bucket.upload(
path.join(
__dirname,
Expand All @@ -189,14 +192,10 @@ export async function bucketUploadResumable(bucket: Bucket) {
}

export async function bucketUploadMultipart(bucket: Bucket) {
// If we are using a precondition we must make sure the file exists and the metageneration matches that provided as a query parameter
bucket = new Bucket(bucket.storage, bucket.name, {
preconditionOpts: {
ifMetagenerationMatch: 1,
},
});
const fileToSave = bucket.file('retryStrategyTestData.json');
await fileToSave.save('fileToSave contents');
if (bucket.instancePreconditionOpts) {
delete bucket.instancePreconditionOpts.ifMetagenerationMatch;
bucket.instancePreconditionOpts.ifGenerationMatch = 0;
}
await bucket.upload(
path.join(
__dirname,
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
},
"dependencies": {
"@google-cloud/common": "^3.8.1",
"@google-cloud/paginator": "^3.0.0",
"@google-cloud/paginator": "^3.0.7",
"@google-cloud/promisify": "^2.0.0",
"abort-controller": "^3.0.0",
"arrify": "^2.0.0",
Expand Down
9 changes: 5 additions & 4 deletions src/bucket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ class Bucket extends ServiceObject {
signer?: URLSigner;

private instanceRetryValue?: boolean;
private instancePreconditionOpts?: PreconditionOptions;
instancePreconditionOpts?: PreconditionOptions;

constructor(storage: Storage, name: string, options?: BucketOptions) {
options = options || {};
Expand Down Expand Up @@ -3991,11 +3991,12 @@ class Bucket extends ServiceObject {
options
);

// Do not retry if precondition option ifMetagenerationMatch is not set
// Do not retry if precondition option ifGenerationMatch is not set
// because this is a file operation
let maxRetries = this.storage.retryOptions.maxRetries;
if (
(options?.preconditionOpts?.ifMetagenerationMatch === undefined &&
this.instancePreconditionOpts?.ifMetagenerationMatch === undefined &&
(options?.preconditionOpts?.ifGenerationMatch === undefined &&
this.instancePreconditionOpts?.ifGenerationMatch === undefined &&
this.storage.retryOptions.idempotencyStrategy ===
IdempotencyStrategy.RetryConditional) ||
this.storage.retryOptions.idempotencyStrategy ===
Expand Down
2 changes: 1 addition & 1 deletion src/file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3982,7 +3982,7 @@ class File extends ServiceObject<File> {
public: options.public,
uri: options.uri,
userProject: options.userProject || this.userProject,
retryOptions: retryOptions,
retryOptions: {...retryOptions},
params: options?.preconditionOpts || this.instancePreconditionOpts,
chunkSize: options?.chunkSize,
});
Expand Down
99 changes: 26 additions & 73 deletions src/gcs-resumable-upload/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,12 @@ import * as Pumpify from 'pumpify';
import {Duplex, PassThrough, Readable} from 'stream';
import * as streamEvents from 'stream-events';
import retry = require('async-retry');
import {RetryOptions, PreconditionOptions} from '../storage';

const NOT_FOUND_STATUS_CODE = 404;
const TERMINATED_UPLOAD_STATUS_CODE = 410;
const RESUMABLE_INCOMPLETE_STATUS_CODE = 308;
const RETRY_LIMIT = 5;
const DEFAULT_API_ENDPOINT_REGEX = /.*\.googleapis\.com/;
const MAX_RETRY_DELAY = 64;
const RETRY_DELAY_MULTIPLIER = 2;
const MAX_TOTAL_RETRY_TIMEOUT = 600;
const AUTO_RETRY_VALUE = true;

export const PROTOCOL_REGEX = /^(\w*):\/\//;

Expand All @@ -60,12 +56,8 @@ export type PredefinedAcl =
| 'projectPrivate'
| 'publicRead';

export interface QueryParameters {
export interface QueryParameters extends PreconditionOptions {
contentEncoding?: string;
ifGenerationMatch?: number;
ifGenerationNotMatch?: number;
ifMetagenerationMatch?: number;
ifMetagenerationNotMatch?: number;
kmsKeyName?: string;
predefinedAcl?: PredefinedAcl;
projection?: 'full' | 'noAcl';
Expand Down Expand Up @@ -207,7 +199,7 @@ export interface UploadConfig {
/**
* Configuration options for retrying retryable errors.
*/
retryOptions?: RetryOptions;
retryOptions: RetryOptions;
}

export interface ConfigMetadata {
Expand All @@ -225,15 +217,6 @@ export interface ConfigMetadata {
contentType?: string;
}

export interface RetryOptions {
retryDelayMultiplier?: number;
totalTimeout?: number;
maxRetryDelay?: number;
autoRetry?: boolean;
maxRetries?: number;
retryableErrorFn?: (err: ApiError) => boolean;
}

export interface GoogleInnerError {
reason?: string;
}
Expand Down Expand Up @@ -279,12 +262,8 @@ export class Upload extends Pumpify {
numBytesWritten = 0;
numRetries = 0;
contentLength: number | '*';
retryLimit: number = RETRY_LIMIT;
maxRetryDelay: number = MAX_RETRY_DELAY;
retryDelayMultiplier: number = RETRY_DELAY_MULTIPLIER;
maxRetryTotalTimeout: number = MAX_TOTAL_RETRY_TIMEOUT;
retryOptions: RetryOptions;
timeOfFirstRequest: number;
retryableErrorFn?: (err: ApiError) => boolean;
private upstreamChunkBuffer: Buffer = Buffer.alloc(0);
private chunkBufferEncoding?: BufferEncoding = undefined;
private numChunksReadInRequest = 0;
Expand Down Expand Up @@ -340,6 +319,7 @@ export class Upload extends Pumpify {
this.params = cfg.params || {};
this.userProject = cfg.userProject;
this.chunkSize = cfg.chunkSize;
this.retryOptions = cfg.retryOptions;

if (cfg.key) {
/**
Expand All @@ -363,32 +343,16 @@ export class Upload extends Pumpify {
configPath,
});

const autoRetry = cfg?.retryOptions?.autoRetry || AUTO_RETRY_VALUE;
const autoRetry = cfg.retryOptions.autoRetry;
this.uriProvidedManually = !!cfg.uri;
this.uri = cfg.uri || this.get('uri');
this.numBytesWritten = 0;
this.numRetries = 0; //counter for number of retries currently executed

if (autoRetry && cfg?.retryOptions?.maxRetries !== undefined) {
this.retryLimit = cfg.retryOptions.maxRetries;
} else if (!autoRetry) {
this.retryLimit = 0;
}

if (cfg?.retryOptions?.maxRetryDelay !== undefined) {
this.maxRetryDelay = cfg.retryOptions.maxRetryDelay;
}

if (cfg?.retryOptions?.retryDelayMultiplier !== undefined) {
this.retryDelayMultiplier = cfg.retryOptions.retryDelayMultiplier;
}

if (cfg?.retryOptions?.totalTimeout !== undefined) {
this.maxRetryTotalTimeout = cfg.retryOptions.totalTimeout;
if (!autoRetry) {
cfg.retryOptions.maxRetries = 0;
}

this.timeOfFirstRequest = Date.now();
this.retryableErrorFn = cfg?.retryOptions?.retryableErrorFn;

const contentLength = cfg.metadata
? Number(cfg.metadata.contentLength)
Expand Down Expand Up @@ -646,9 +610,8 @@ export class Upload extends Pumpify {
],
};
if (
this.retryLimit > 0 &&
this.retryableErrorFn &&
this.retryableErrorFn!(apiError as ApiError)
this.retryOptions.maxRetries! > 0 &&
this.retryOptions.retryableErrorFn!(apiError as ApiError)
) {
throw e;
} else {
Expand All @@ -657,10 +620,10 @@ export class Upload extends Pumpify {
}
},
{
retries: this.retryLimit,
factor: this.retryDelayMultiplier,
maxTimeout: this.maxRetryDelay! * 1000, //convert to milliseconds
maxRetryTime: this.maxRetryTotalTimeout! * 1000, //convert to milliseconds
retries: this.retryOptions.maxRetries,
factor: this.retryOptions.retryDelayMultiplier,
maxTimeout: this.retryOptions.maxRetryDelay! * 1000, //convert to milliseconds
maxRetryTime: this.retryOptions.totalTimeout! * 1000, //convert to milliseconds
}
);

Expand Down Expand Up @@ -1055,14 +1018,11 @@ export class Upload extends Pumpify {
*/
private onResponse(resp: GaxiosResponse) {
if (
(this.retryableErrorFn &&
this.retryableErrorFn({
code: resp.status,
message: resp.statusText,
name: resp.statusText,
})) ||
resp.status === NOT_FOUND_STATUS_CODE ||
this.isServerErrorResponse(resp.status)
this.retryOptions.retryableErrorFn!({
code: resp.status,
message: resp.statusText,
name: resp.statusText,
})
) {
this.attemptDelayedRetry(resp);
return false;
Expand All @@ -1076,7 +1036,7 @@ export class Upload extends Pumpify {
* @param resp GaxiosResponse object from previous attempt
*/
private attemptDelayedRetry(resp: GaxiosResponse) {
if (this.numRetries < this.retryLimit) {
if (this.numRetries < this.retryOptions.maxRetries!) {
if (
resp.status === NOT_FOUND_STATUS_CODE &&
this.numChunksReadInRequest === 0
Expand Down Expand Up @@ -1120,10 +1080,13 @@ export class Upload extends Pumpify {
private getRetryDelay(): number {
const randomMs = Math.round(Math.random() * 1000);
const waitTime =
Math.pow(this.retryDelayMultiplier, this.numRetries) * 1000 + randomMs;
Math.pow(this.retryOptions.retryDelayMultiplier!, this.numRetries) *
1000 +
randomMs;
const maxAllowableDelayMs =
this.maxRetryTotalTimeout * 1000 - (Date.now() - this.timeOfFirstRequest);
const maxRetryDelayMs = this.maxRetryDelay * 1000;
this.retryOptions.totalTimeout! * 1000 -
(Date.now() - this.timeOfFirstRequest);
const maxRetryDelayMs = this.retryOptions.maxRetryDelay! * 1000;

return Math.min(waitTime, maxRetryDelayMs, maxAllowableDelayMs);
}
Expand All @@ -1147,16 +1110,6 @@ export class Upload extends Pumpify {
public isSuccessfulResponse(status: number): boolean {
return status >= 200 && status < 300;
}

/**
* Check if a given status code is 5xx
*
* @param status The status code to check
* @returns if the status is 5xx
*/
public isServerErrorResponse(status: number): boolean {
return status >= 500 && status < 600;
}
}

export function upload(cfg: UploadConfig) {
Expand Down
18 changes: 6 additions & 12 deletions src/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -205,42 +205,37 @@ export const PROTOCOL_REGEX = /^(\w*):\/\//;
* Default behavior: Automatically retry retriable server errors.
*
* @const {boolean}
* @private
*/
const AUTO_RETRY_DEFAULT = true;
export const AUTO_RETRY_DEFAULT = true;

/**
* Default behavior: Only attempt to retry retriable errors 3 times.
*
* @const {number}
* @private
*/
const MAX_RETRY_DEFAULT = 3;
export const MAX_RETRY_DEFAULT = 3;

/**
* Default behavior: Wait twice as long as previous retry before retrying.
*
* @const {number}
* @private
*/
const RETRY_DELAY_MULTIPLIER_DEFAULT = 2;
export const RETRY_DELAY_MULTIPLIER_DEFAULT = 2;

/**
* Default behavior: If the operation doesn't succeed after 600 seconds,
* stop retrying.
*
* @const {number}
* @private
*/
const TOTAL_TIMEOUT_DEFAULT = 600;
export const TOTAL_TIMEOUT_DEFAULT = 600;

/**
* Default behavior: Wait no more than 64 seconds between retries.
*
* @const {number}
* @private
*/
const MAX_RETRY_DELAY_DEFAULT = 64;
export const MAX_RETRY_DELAY_DEFAULT = 64;

/**
* Default behavior: Retry conditionally idempotent operations if correct preconditions are set.
Expand All @@ -254,11 +249,10 @@ const IDEMPOTENCY_STRATEGY_DEFAULT = IdempotencyStrategy.RetryConditional;
* Returns true if the API request should be retried, given the error that was
* given the first time the request was attempted.
* @const
* @private
* @param {error} err - The API error to check if it is appropriate to retry.
* @return {boolean} True if the API request should be retried, false otherwise.
*/
const RETRYABLE_ERR_FN_DEFAULT = function (err?: ApiError) {
export const RETRYABLE_ERR_FN_DEFAULT = function (err?: ApiError) {
if (err) {
if ([408, 429, 500, 502, 503, 504].indexOf(err.code!) !== -1) {
return true;
Expand Down
Loading

0 comments on commit 1e72586

Please sign in to comment.