From 7f58f30588735d129fda0503e1daec5f605f8447 Mon Sep 17 00:00:00 2001 From: Denis DelGrosso <85250797+ddelgrosso1@users.noreply.github.com> Date: Thu, 21 Sep 2023 13:48:52 -0400 Subject: [PATCH] feat: add headers option to MPU (#2303) * feat: add headers option to MPU * remove proxyquire from transfer manager tests * add abort option to MPU * turn auto abort on by default and add option to disable * change wording in docs --- src/transfer-manager.ts | 53 +++++- test/transfer-manager.ts | 340 ++++++++++++++++++--------------------- 2 files changed, 206 insertions(+), 187 deletions(-) diff --git a/src/transfer-manager.ts b/src/transfer-manager.ts index d1d7dcd7b..af0a2d3ae 100644 --- a/src/transfer-manager.ts +++ b/src/transfer-manager.ts @@ -91,8 +91,10 @@ export interface UploadFileInChunksOptions { uploadName?: string; maxQueueSize?: number; uploadId?: string; + autoAbortFailure?: boolean; partsMap?: Map; validation?: 'md5' | false; + headers?: {[key: string]: string}; } export interface MultiPartUploadHelper { @@ -100,13 +102,14 @@ export interface MultiPartUploadHelper { fileName: string; uploadId?: string; partsMap?: Map; - initiateUpload(): Promise; + initiateUpload(headers?: {[key: string]: string}): Promise; uploadPart( partNumber: number, chunk: Buffer, validation?: 'md5' | false ): Promise; completeUpload(): Promise; + abortUpload(): Promise; } export type MultiPartHelperGenerator = ( @@ -185,13 +188,14 @@ class XMLMultiPartUploadHelper implements MultiPartUploadHelper { * * @returns {Promise} */ - async initiateUpload(): Promise { + async initiateUpload(headers: {[key: string]: string} = {}): Promise { const url = `${this.baseUrl}?uploads`; return retry(async bail => { try { const res = await this.authClient.request({ method: 'POST', url, + headers, }); if (res.data && res.data.error) { throw res.data.error; @@ -285,6 +289,30 @@ class XMLMultiPartUploadHelper implements MultiPartUploadHelper { }, this.retryOptions); } + /** + * Aborts an multipart upload that is in progress. Once aborted, any parts in the process of being uploaded fail, + * and future requests using the upload ID fail. + * + * @returns {Promise} + */ + async abortUpload(): Promise { + const url = `${this.baseUrl}?uploadId=${this.uploadId}`; + return retry(async bail => { + try { + const res = await this.authClient.request({ + url, + method: 'DELETE', + }); + if (res.data && res.data.error) { + throw res.data.error; + } + } catch (e) { + this.#handleErrorResponse(e as Error, bail); + return; + } + }, this.retryOptions); + } + /** * Handles error responses and calls the bail function if the error should not be retried. * @@ -615,6 +643,10 @@ export class TransferManager { * @property {string} [uploadId] If specified attempts to resume a previous upload. * @property {Map} [partsMap] If specified alongside uploadId, attempts to resume a previous upload from the last chunk * specified in partsMap + * @property {object} [headers] headers to be sent when initiating the multipart upload. + * See {@link https://cloud.google.com/storage/docs/xml-api/post-object-multipart#request_headers| Request Headers: Initiate a Multipart Upload} + * @property {boolean} [autoAbortFailure] boolean to indicate if an in progress upload session will be automatically aborted upon failure. If not set, + * failures will be automatically aborted. * @experimental */ /** @@ -669,7 +701,7 @@ export class TransferManager { let promises: Promise[] = []; try { if (options.uploadId === undefined) { - await mpuHelper.initiateUpload(); + await mpuHelper.initiateUpload(options.headers); } const startOrResumptionByte = mpuHelper.partsMap!.size * chunkSize; const readStream = createReadStream(filePath, { @@ -692,6 +724,21 @@ export class TransferManager { await Promise.all(promises); return await mpuHelper.completeUpload(); } catch (e) { + if ( + (options.autoAbortFailure === undefined || options.autoAbortFailure) && + mpuHelper.uploadId + ) { + try { + await mpuHelper.abortUpload(); + return; + } catch (e) { + throw new MultiPartUploadError( + (e as Error).message, + mpuHelper.uploadId!, + mpuHelper.partsMap! + ); + } + } throw new MultiPartUploadError( (e as Error).message, mpuHelper.uploadId!, diff --git a/test/transfer-manager.ts b/test/transfer-manager.ts index ab9efb4c4..9eb2d7877 100644 --- a/test/transfer-manager.ts +++ b/test/transfer-manager.ts @@ -15,169 +15,48 @@ */ /* eslint-disable @typescript-eslint/no-explicit-any */ -import { - BaseMetadata, - ServiceObject, - ServiceObjectConfig, - util, -} from '../src/nodejs-common'; -import * as pLimit from 'p-limit'; -import * as proxyquire from 'proxyquire'; +import {ApiError} from '../src/nodejs-common'; import { Bucket, + File, CRC32C, - CreateWriteStreamOptions, DownloadOptions, - FileOptions, IdempotencyStrategy, MultiPartHelperGenerator, MultiPartUploadError, MultiPartUploadHelper, UploadOptions, + TransferManager, + Storage, } from '../src'; import * as assert from 'assert'; import * as path from 'path'; import * as stream from 'stream'; import * as fs from 'fs'; +import * as fsp from 'fs/promises'; import * as sinon from 'sinon'; import {GaxiosResponse} from 'gaxios'; -import {FileMetadata} from '../src/file'; - -const fakeUtil = Object.assign({}, util); -fakeUtil.noop = util.noop; - -class FakeServiceObject extends ServiceObject { - calledWith_: IArguments; - constructor(config: ServiceObjectConfig) { - super(config); - // eslint-disable-next-line prefer-rest-params - this.calledWith_ = arguments; - } -} - -class FakeAcl { - calledWith_: Array<{}>; - constructor(...args: Array<{}>) { - this.calledWith_ = args; - } -} - -class FakeFile { - calledWith_: IArguments; - bucket: Bucket; - name: string; - options: FileOptions; - metadata: FileMetadata; - createWriteStream: Function; - isSameFile = () => false; - constructor(bucket: Bucket, name: string, options?: FileOptions) { - // eslint-disable-next-line prefer-rest-params - this.calledWith_ = arguments; - this.bucket = bucket; - this.name = name; - this.options = options || {}; - this.metadata = {}; - - this.createWriteStream = (options: CreateWriteStreamOptions) => { - this.metadata = options.metadata!; - const ws = new stream.Writable(); - ws.write = () => { - ws.emit('complete'); - ws.end(); - return true; - }; - return ws; - }; - } -} - -class HTTPError extends Error { - code: number; - constructor(message: string, code: number) { - super(message); - this.code = code; - } -} - -let pLimitOverride: Function | null; -const fakePLimit = (limit: number) => (pLimitOverride || pLimit)(limit); -const fakeFs = { - ...fs, - get promises() { - return { - open: () => { - return { - close: () => {}, - write: (buffer: Buffer) => { - return Promise.resolve({buffer}); - }, - }; - }, - lstat: () => { - return { - isDirectory: () => { - return false; - }, - }; - }, - }; - }, -}; describe('Transfer Manager', () => { - let TransferManager: any; - let transferManager: any; - let Bucket: any; - let bucket: any; - let File: any; - - const STORAGE: any = { - createBucket: util.noop, - apiEndpoint: 'https://test.notvalid.com', - retryOptions: { - autoRetry: true, - maxRetries: 3, - retryDelayMultipier: 2, - totalTimeout: 600, - maxRetryDelay: 60, - retryableErrorFn: (err: HTTPError) => { - return err.code === 500; - }, - idempotencyStrategy: IdempotencyStrategy.RetryConditional, + const BUCKET_NAME = 'test-bucket'; + const STORAGE = sinon.createStubInstance(Storage); + STORAGE.retryOptions = { + autoRetry: true, + maxRetries: 3, + retryDelayMultiplier: 2, + totalTimeout: 600, + maxRetryDelay: 60, + retryableErrorFn: (err: ApiError) => { + return err.code === 500; }, - crc32cGenerator: () => new CRC32C(), + idempotencyStrategy: IdempotencyStrategy.RetryConditional, }; - const BUCKET_NAME = 'test-bucket'; + let sandbox: sinon.SinonSandbox; + let transferManager: TransferManager; + let bucket: Bucket; before(() => { - Bucket = proxyquire('../src/bucket.js', { - 'p-limit': fakePLimit, - './nodejs-common': { - ServiceObject: FakeServiceObject, - util: fakeUtil, - }, - './acl.js': {Acl: FakeAcl}, - './file.js': {File: FakeFile}, - }).Bucket; - - File = proxyquire('../src/file.js', { - './nodejs-common': { - ServiceObject: FakeServiceObject, - util: fakeUtil, - }, - }).File; - - TransferManager = proxyquire('../src/transfer-manager.js', { - 'p-limit': fakePLimit, - './nodejs-common': { - ServiceObject: FakeServiceObject, - util: fakeUtil, - }, - './acl.js': {Acl: FakeAcl}, - './file.js': {File: FakeFile}, - fs: fakeFs, - fsp: fakeFs, - }).TransferManager; + sandbox = sinon.createSandbox(); }); beforeEach(() => { @@ -185,6 +64,10 @@ describe('Transfer Manager', () => { transferManager = new TransferManager(bucket); }); + afterEach(() => { + sandbox.restore(); + }); + describe('instantiation', () => { it('should correctly set the bucket', () => { assert.strictEqual(transferManager.bucket, bucket); @@ -192,14 +75,21 @@ describe('Transfer Manager', () => { }); describe('uploadManyFiles', () => { + beforeEach(() => { + sandbox.stub(fsp, 'lstat').resolves({ + isDirectory: () => { + return false; + }, + } as fs.Stats); + }); + it('calls upload with the provided file paths', async () => { const paths = ['/a/b/c', '/d/e/f', '/h/i/j']; let count = 0; - - bucket.upload = (path: string) => { + sandbox.stub(bucket, 'upload').callsFake(path => { count++; assert(paths.includes(path)); - }; + }); await transferManager.uploadManyFiles(paths); assert.strictEqual(count, paths.length); @@ -207,10 +97,12 @@ describe('Transfer Manager', () => { it('sets ifGenerationMatch to 0 if skipIfExists is set', async () => { const paths = ['/a/b/c']; - - bucket.upload = (_path: string, options: UploadOptions) => { - assert.strictEqual(options.preconditionOpts?.ifGenerationMatch, 0); - }; + sandbox.stub(bucket, 'upload').callsFake((path, options) => { + assert.strictEqual( + (options as UploadOptions).preconditionOpts?.ifGenerationMatch, + 0 + ); + }); await transferManager.uploadManyFiles(paths, {skipIfExists: true}); }); @@ -218,16 +110,23 @@ describe('Transfer Manager', () => { it('sets destination to prefix + filename when prefix is supplied', async () => { const paths = ['/a/b/foo/bar.txt']; const expectedDestination = path.normalize('hello/world/a/b/foo/bar.txt'); - - bucket.upload = (_path: string, options: UploadOptions) => { - assert.strictEqual(options.destination, expectedDestination); - }; + sandbox.stub(bucket, 'upload').callsFake((path, options) => { + assert.strictEqual( + (options as UploadOptions).destination, + expectedDestination + ); + }); await transferManager.uploadManyFiles(paths, {prefix: 'hello/world'}); }); it('returns a promise with the uploaded file if there is no callback', async () => { const paths = [path.join(__dirname, '../../test/testdata/testfile.json')]; + sandbox.stub(bucket, 'upload').callsFake(() => { + const resp = [{name: paths[0]}]; + return Promise.resolve(resp); + }); + const result = await transferManager.uploadManyFiles(paths); assert.strictEqual(result[0][0].name, paths[0]); }); @@ -236,13 +135,14 @@ describe('Transfer Manager', () => { describe('downloadManyFiles', () => { it('calls download for each provided file', async () => { let count = 0; - const download = () => { - count++; - }; const firstFile = new File(bucket, 'first.txt'); - firstFile.download = download; + sandbox.stub(firstFile, 'download').callsFake(() => { + count++; + }); const secondFile = new File(bucket, 'second.txt'); - secondFile.download = download; + sandbox.stub(secondFile, 'download').callsFake(() => { + count++; + }); const files = [firstFile, secondFile]; await transferManager.downloadManyFiles(files); @@ -253,12 +153,14 @@ describe('Transfer Manager', () => { const prefix = 'test-prefix'; const filename = 'first.txt'; const expectedDestination = path.normalize(`${prefix}/${filename}`); - const download = (options: DownloadOptions) => { - assert.strictEqual(options.destination, expectedDestination); - }; const file = new File(bucket, filename); - file.download = download; + sandbox.stub(file, 'download').callsFake(options => { + assert.strictEqual( + (options as DownloadOptions).destination, + expectedDestination + ); + }); await transferManager.downloadManyFiles([file], {prefix}); }); @@ -266,38 +168,43 @@ describe('Transfer Manager', () => { const stripPrefix = 'should-be-removed/'; const filename = 'should-be-removed/first.txt'; const expectedDestination = 'first.txt'; - const download = (options: DownloadOptions) => { - assert.strictEqual(options.destination, expectedDestination); - }; const file = new File(bucket, filename); - file.download = download; + sandbox.stub(file, 'download').callsFake(options => { + assert.strictEqual( + (options as DownloadOptions).destination, + expectedDestination + ); + }); await transferManager.downloadManyFiles([file], {stripPrefix}); }); }); describe('downloadFileInChunks', () => { - let file: any; + let file: File; beforeEach(() => { + sandbox.stub(fsp, 'open').resolves({ + close: () => Promise.resolve(), + write: (buffer: any) => Promise.resolve({buffer}), + } as fsp.FileHandle); + file = new File(bucket, 'some-large-file'); - file.get = () => { - return [ - { - metadata: { - size: 1024, - }, + sandbox.stub(file, 'get').resolves([ + { + metadata: { + size: 1024, }, - ]; - }; + }, + ]); }); it('should download a single chunk if file size is below threshold', async () => { let downloadCallCount = 0; - file.download = () => { + sandbox.stub(file, 'download').callsFake(() => { downloadCallCount++; return Promise.resolve([Buffer.alloc(100)]); - }; + }); await transferManager.downloadFileInChunks(file); assert.strictEqual(downloadCallCount, 1); @@ -321,7 +228,6 @@ describe('Transfer Manager', () => { describe('uploadFileInChunks', () => { let mockGeneratorFunction: MultiPartHelperGenerator; let fakeHelper: sinon.SinonStubbedInstance; - let sandbox: sinon.SinonSandbox; let readStreamStub: sinon.SinonStub; const path = '/a/b/c.txt'; const pThrough = new stream.PassThrough(); @@ -344,12 +250,14 @@ describe('Transfer Manager', () => { completeUpload(): Promise { throw new Error('Method not implemented.'); } + abortUpload(): Promise { + throw new Error('Method not implemented.'); + } } beforeEach(() => { - sandbox = sinon.createSandbox(); readStreamStub = sandbox - .stub(fakeFs, 'createReadStream') + .stub(fs, 'createReadStream') .returns(pThrough as unknown as fs.ReadStream); mockGeneratorFunction = (bucket, fileName, uploadId, partsMap) => { fakeHelper = sandbox.createStubInstance(FakeXMLHelper); @@ -358,14 +266,11 @@ describe('Transfer Manager', () => { fakeHelper.initiateUpload.resolves(); fakeHelper.uploadPart.resolves(); fakeHelper.completeUpload.resolves(); + fakeHelper.abortUpload.resolves(); return fakeHelper; }; }); - afterEach(() => { - sandbox.restore(); - }); - it('should call initiateUpload, uploadPart, and completeUpload', async () => { process.nextTick(() => { pThrough.push('hello world'); @@ -443,12 +348,79 @@ describe('Transfer Manager', () => { fakeHelper.initiateUpload.rejects(new Error(expectedErr.message)); fakeHelper.uploadPart.resolves(); fakeHelper.completeUpload.resolves(); + fakeHelper.abortUpload.resolves(); return fakeHelper; }; assert.rejects( - transferManager.uploadFileInChunks(path, {}, mockGeneratorFunction), + transferManager.uploadFileInChunks( + path, + {autoAbortFailure: false}, + mockGeneratorFunction + ), expectedErr ); }); + + it('should pass through headers to initiateUpload', async () => { + const headersToAdd = { + 'Content-Type': 'foo/bar', + 'x-goog-meta-foo': 'foobar', + }; + + mockGeneratorFunction = (bucket, fileName, uploadId, partsMap) => { + fakeHelper = sandbox.createStubInstance(FakeXMLHelper); + fakeHelper.uploadId = uploadId || ''; + fakeHelper.partsMap = partsMap || new Map(); + fakeHelper.initiateUpload.callsFake(headers => { + assert.deepStrictEqual(headers, headersToAdd); + return Promise.resolve(); + }); + fakeHelper.uploadPart.resolves(); + fakeHelper.completeUpload.resolves(); + fakeHelper.abortUpload.resolves(); + return fakeHelper; + }; + + await transferManager.uploadFileInChunks( + path, + {headers: headersToAdd}, + mockGeneratorFunction + ); + }); + + it('should call abortUpload when a failure occurs after an uploadID is established', async () => { + const expectedErr = new MultiPartUploadError( + 'Hello World', + '', + new Map() + ); + const fakeId = '123'; + + mockGeneratorFunction = (bucket, fileName, uploadId, partsMap) => { + fakeHelper = sandbox.createStubInstance(FakeXMLHelper); + fakeHelper.uploadId = uploadId || ''; + fakeHelper.partsMap = partsMap || new Map(); + fakeHelper.initiateUpload.resolves(); + fakeHelper.uploadPart.callsFake(() => { + fakeHelper.uploadId = fakeId; + return Promise.reject(expectedErr); + }); + fakeHelper.completeUpload.resolves(); + fakeHelper.abortUpload.callsFake(() => { + assert.strictEqual(fakeHelper.uploadId, fakeId); + return Promise.resolve(); + }); + return fakeHelper; + }; + + process.nextTick(() => { + pThrough.push('hello world'); + pThrough.end(); + }); + + assert.doesNotThrow(() => + transferManager.uploadFileInChunks(path, {}, mockGeneratorFunction) + ); + }); }); });