From ae83421e617a5761c75a7c8a15eaa1ea7c7fb1de Mon Sep 17 00:00:00 2001 From: Denis DelGrosso <85250797+ddelgrosso1@users.noreply.github.com> Date: Wed, 12 Jul 2023 11:10:39 -0400 Subject: [PATCH] feat: MPU for transfer manager (#2192) * feat: MPU for transfer manager * add tests for MPU upload * naming fix in tests * return full response from completeUpload * add comment about queueing mechanism * fix typo * add md5 validation to uploadFileInChunks --- .../performTransferManagerTest.ts | 62 +++- package.json | 1 + src/transfer-manager.ts | 331 +++++++++++++++++- test/transfer-manager.ts | 144 ++++++++ 4 files changed, 519 insertions(+), 19 deletions(-) diff --git a/internal-tooling/performTransferManagerTest.ts b/internal-tooling/performTransferManagerTest.ts index e1f2e5bf5..5094eea6d 100644 --- a/internal-tooling/performTransferManagerTest.ts +++ b/internal-tooling/performTransferManagerTest.ts @@ -51,7 +51,7 @@ const argv = yargs(process.argv.slice(2)) * to the parent thread. */ async function main() { - let result: TestResult | undefined = undefined; + let results: TestResult[] = []; ({bucket, transferManager} = await performanceTestSetup( argv.project!, @@ -60,18 +60,18 @@ async function main() { switch (argv.test_type) { case PERFORMANCE_TEST_TYPES.TRANSFER_MANAGER_UPLOAD_MANY_FILES: - result = await performUploadManyFilesTest(); + results = await performUploadManyFilesTest(); break; case PERFORMANCE_TEST_TYPES.TRANSFER_MANAGER_DOWNLOAD_MANY_FILES: - result = await performDownloadManyFilesTest(); + results = await performDownloadManyFilesTest(); break; case PERFORMANCE_TEST_TYPES.TRANSFER_MANAGER_CHUNKED_FILE_DOWNLOAD: - result = await performDownloadFileInChunksTest(); + results = await performChunkUploadDownloadTest(); break; default: break; } - parentPort?.postMessage(result); + parentPort?.postMessage(results); await performTestCleanup(); } @@ -87,7 +87,7 @@ async function performTestCleanup() { * * @returns {Promise} A promise that resolves containing information about the test results. */ -async function performUploadManyFilesTest(): Promise { +async function performUploadManyFilesTest(): Promise { const fileSizeRange = getLowHighFileSize(argv.object_size); const creationInfo = generateRandomDirectoryStructure( argv.num_objects, @@ -126,15 +126,15 @@ async function performUploadManyFilesTest(): Promise { bucketName: bucket.name, }; - return result; + return [result]; } /** * Performs a test where multiple objects are downloaded in parallel from a bucket. * - * @returns {Promise} A promise that resolves containing information about the test results. + * @returns {Promise} A promise that resolves containing information about the test results. */ -async function performDownloadManyFilesTest(): Promise { +async function performDownloadManyFilesTest(): Promise { const fileSizeRange = getLowHighFileSize(argv.object_size); const creationInfo = generateRandomDirectoryStructure( argv.num_objects, @@ -179,15 +179,17 @@ async function performDownloadManyFilesTest(): Promise { transferOffset: 0, bucketName: bucket.name, }; - return result; + + return [result]; } /** - * Performs a test where a large file is downloaded as chunks in parallel. + * Performs a test where a large file is uploaded and downloaded as chunks in parallel. * * @returns {Promise} A promise that resolves containing information about the test results. 
*/ -async function performDownloadFileInChunksTest(): Promise { +async function performChunkUploadDownloadTest(): Promise { + const results: TestResult[] = []; const fileSizeRange = getLowHighFileSize(argv.object_size); const fileName = generateRandomFileName(TEST_NAME_STRING); const sizeInBytes = generateRandomFile( @@ -197,21 +199,46 @@ async function performDownloadFileInChunksTest(): Promise { __dirname ); const file = bucket.file(`${fileName}`); + let result: TestResult = { + op: 'WRITE', + objectSize: sizeInBytes, + appBufferSize: NODE_DEFAULT_HIGHWATER_MARK_BYTES, + crc32cEnabled: checkType === 'crc32c', + md5Enabled: false, + api: 'JSON', + elapsedTimeUs: -1, + cpuTimeUs: -1, + status: 'OK', + chunkSize: argv.range_read_size, + workers: argv.workers, + library: 'nodejs', + transferSize: sizeInBytes, + transferOffset: 0, + bucketName: bucket.name, + }; - await bucket.upload(`${__dirname}/${fileName}`); + let start = performance.now(); + await transferManager.uploadFileInChunks(`${__dirname}/${fileName}`, { + concurrencyLimit: argv.workers, + chunkSizeBytes: argv.range_read_size, + }); + let end = performance.now(); + result.elapsedTimeUs = Math.round((end - start) * 1000); + results.push(result); cleanupFile(fileName); - const start = performance.now(); + + start = performance.now(); await transferManager.downloadFileInChunks(file, { concurrencyLimit: argv.workers, chunkSizeBytes: argv.range_read_size, destination: path.join(__dirname, fileName), validation: checkType === 'crc32c' ? checkType : false, }); - const end = performance.now(); + end = performance.now(); cleanupFile(fileName); - const result: TestResult = { + result = { op: 'READ[0]', objectSize: sizeInBytes, appBufferSize: NODE_DEFAULT_HIGHWATER_MARK_BYTES, @@ -228,8 +255,9 @@ async function performDownloadFileInChunksTest(): Promise { transferOffset: 0, bucketName: bucket.name, }; + results.push(result); - return result; + return results; } main(); diff --git a/package.json b/package.json index bf51cd3fe..f210c0425 100644 --- a/package.json +++ b/package.json @@ -59,6 +59,7 @@ "duplexify": "^4.0.0", "ent": "^2.2.0", "extend": "^3.0.2", + "fast-xml-parser": "^4.2.2", "gaxios": "^5.0.0", "google-auth-library": "^8.0.1", "mime": "^3.0.0", diff --git a/src/transfer-manager.ts b/src/transfer-manager.ts index 58cd75c11..deab9082d 100644 --- a/src/transfer-manager.ts +++ b/src/transfer-manager.ts @@ -19,8 +19,14 @@ import {DownloadOptions, DownloadResponse, File} from './file'; import * as pLimit from 'p-limit'; import * as path from 'path'; import * as extend from 'extend'; -import {promises as fsp} from 'fs'; +import {createReadStream, promises as fsp} from 'fs'; import {CRC32C} from './crc32c'; +import {GoogleAuth} from 'google-auth-library'; +import {XMLParser, XMLBuilder} from 'fast-xml-parser'; +import * as retry from 'async-retry'; +import {ApiError} from './nodejs-common'; +import {GaxiosResponse, Headers} from 'gaxios'; +import {createHash} from 'crypto'; /** * Default number of concurrently executing promises to use when calling uploadManyFiles. @@ -47,6 +53,17 @@ const DOWNLOAD_IN_CHUNKS_FILE_SIZE_THRESHOLD = 32 * 1024 * 1024; * @experimental */ const DOWNLOAD_IN_CHUNKS_DEFAULT_CHUNK_SIZE = 10 * 1024 * 1024; +/** + * The chunk size in bytes to use when calling uploadFileInChunks. + * @experimental + */ +const UPLOAD_IN_CHUNKS_DEFAULT_CHUNK_SIZE = 32 * 1024 * 1024; +/** + * Default number of concurrently executing promises to use when calling uploadFileInChunks. 
+ * @experimental
+ */
+const DEFAULT_PARALLEL_CHUNKED_UPLOAD_LIMIT = 2;
+
 const EMPTY_REGEX = '(?:)';
 export interface UploadManyFilesOptions {
   concurrencyLimit?: number;
@@ -69,6 +86,220 @@ export interface DownloadFileInChunksOptions {
   validation?: 'crc32c' | false;
 }
 
+export interface UploadFileInChunksOptions {
+  concurrencyLimit?: number;
+  chunkSizeBytes?: number;
+  uploadName?: string;
+  maxQueueSize?: number;
+  uploadId?: string;
+  partsMap?: Map<number, string>;
+  validation?: 'md5' | false;
+}
+
+export interface MultiPartUploadHelper {
+  bucket: Bucket;
+  fileName: string;
+  uploadId?: string;
+  partsMap?: Map<number, string>;
+  initiateUpload(): Promise<void>;
+  uploadPart(
+    partNumber: number,
+    chunk: Buffer,
+    validation?: 'md5' | false
+  ): Promise<void>;
+  completeUpload(): Promise<GaxiosResponse | undefined>;
+}
+
+export type MultiPartHelperGenerator = (
+  bucket: Bucket,
+  fileName: string,
+  uploadId?: string,
+  partsMap?: Map<number, string>
+) => MultiPartUploadHelper;
+
+const defaultMultiPartGenerator: MultiPartHelperGenerator = (
+  bucket,
+  fileName,
+  uploadId,
+  partsMap
+) => {
+  return new XMLMultiPartUploadHelper(bucket, fileName, uploadId, partsMap);
+};
+
+export class MultiPartUploadError extends Error {
+  private uploadId: string;
+  private partsMap: Map<number, string>;
+
+  constructor(
+    message: string,
+    uploadId: string,
+    partsMap: Map<number, string>
+  ) {
+    super(message);
+    this.uploadId = uploadId;
+    this.partsMap = partsMap;
+  }
+}
+/**
+ * Class representing an implementation of MPU in the XML API. This class is not meant for public usage.
+ *
+ * @private
+ * @experimental
+ */
+class XMLMultiPartUploadHelper implements MultiPartUploadHelper {
+  public partsMap;
+  public uploadId;
+  public bucket;
+  public fileName;
+
+  private authClient;
+  private xmlParser;
+  private xmlBuilder;
+  private baseUrl;
+  private retryOptions;
+
+  constructor(
+    bucket: Bucket,
+    fileName: string,
+    uploadId?: string,
+    partsMap?: Map<number, string>
+  ) {
+    this.authClient = bucket.storage.authClient || new GoogleAuth();
+    this.uploadId = uploadId || '';
+    this.bucket = bucket;
+    this.fileName = fileName;
+    // eslint-disable-next-line prettier/prettier
+    this.baseUrl = `https://${bucket.name}.${new URL(this.bucket.storage.apiEndpoint).hostname}/${fileName}`;
+    this.xmlBuilder = new XMLBuilder({arrayNodeName: 'Part'});
+    this.xmlParser = new XMLParser();
+    this.partsMap = partsMap || new Map<number, string>();
+    this.retryOptions = {
+      retries: this.bucket.storage.retryOptions.maxRetries,
+      factor: this.bucket.storage.retryOptions.retryDelayMultiplier,
+      maxTimeout: this.bucket.storage.retryOptions.maxRetryDelay! * 1000,
+      maxRetryTime: this.bucket.storage.retryOptions.totalTimeout! * 1000,
+    };
+  }
+
+  /**
+   * Initiates a multipart upload (MPU) to the XML API and stores the resultant upload id.
+   *
+   * @returns {Promise<void>}
+   */
+  async initiateUpload(): Promise<void> {
+    const url = `${this.baseUrl}?uploads`;
+    return retry(async bail => {
+      try {
+        const res = await this.authClient.request({
+          method: 'POST',
+          url,
+        });
+        if (res.data && res.data.error) {
+          throw res.data.error;
+        }
+        const parsedXML = this.xmlParser.parse(res.data);
+        this.uploadId = parsedXML.InitiateMultipartUploadResult.UploadId;
+      } catch (e) {
+        this.#handleErrorResponse(e as Error, bail);
+      }
+    }, this.retryOptions);
+  }
+
+  /**
+   * Uploads the provided chunk of data to the XML API using the previously created upload id.
+   *
+   * @param {number} partNumber the sequence number of this chunk.
+   * @param {Buffer} chunk the chunk of data to be uploaded.
+   * @param {string | false} validation whether or not to include the md5 hash in the headers to cause the server
+   * to validate the chunk was not corrupted.
+   * @returns {Promise<void>}
+   */
+  async uploadPart(
+    partNumber: number,
+    chunk: Buffer,
+    validation?: 'md5' | false
+  ): Promise<void> {
+    const url = `${this.baseUrl}?partNumber=${partNumber}&uploadId=${this.uploadId}`;
+    let headers: Headers = {};
+
+    if (validation === 'md5') {
+      const hash = createHash('md5').update(chunk).digest('base64');
+      headers = {
+        'Content-MD5': hash,
+      };
+    }
+
+    return retry(async bail => {
+      try {
+        const res = await this.authClient.request({
+          url,
+          method: 'PUT',
+          body: chunk,
+          headers,
+        });
+        if (res.data && res.data.error) {
+          throw res.data.error;
+        }
+        this.partsMap.set(partNumber, res.headers['etag']);
+      } catch (e) {
+        this.#handleErrorResponse(e as Error, bail);
+      }
+    }, this.retryOptions);
+  }
+
+  /**
+   * Sends the final request of the MPU to tell GCS the upload is now complete.
+   *
+   * @returns {Promise<GaxiosResponse | undefined>}
+   */
+  async completeUpload(): Promise<GaxiosResponse | undefined> {
+    const url = `${this.baseUrl}?uploadId=${this.uploadId}`;
+    const sortedMap = new Map(
+      [...this.partsMap.entries()].sort((a, b) => a[0] - b[0])
+    );
+    const parts = [];
+    for (const entry of sortedMap.entries()) {
+      parts.push({PartNumber: entry[0], ETag: entry[1]});
+    }
+    const body = `<CompleteMultipartUpload>${this.xmlBuilder.build(
+      parts
+    )}</CompleteMultipartUpload>`;
+    return retry(async bail => {
+      try {
+        const res = await this.authClient.request({
+          url,
+          method: 'POST',
+          body,
+        });
+        if (res.data && res.data.error) {
+          throw res.data.error;
+        }
+        return res;
+      } catch (e) {
+        this.#handleErrorResponse(e as Error, bail);
+        return;
+      }
+    }, this.retryOptions);
+  }
+
+  /**
+   * Handles error responses and calls the bail function if the error should not be retried.
+   *
+   * @param {Error} err the thrown error
+   * @param {Function} bail if the error cannot be retried, the function to be called.
+   */
+  #handleErrorResponse(err: Error, bail: Function) {
+    if (
+      this.bucket.storage.retryOptions.autoRetry &&
+      this.bucket.storage.retryOptions.retryableErrorFn!(err as ApiError)
+    ) {
+      throw err;
+    } else {
+      bail(err as Error);
+    }
+  }
+}
+
 /**
  * Create a TransferManager object to perform parallel transfer operations on a Cloud Storage bucket.
  *
@@ -300,7 +531,7 @@ export class TransferManager {
    * //-
    * // Download a large file in chunks utilizing parallel operations.
    * //-
-   * const response = await transferManager.downloadLargeFile(bucket.file('large-file.txt');
+   * const response = await transferManager.downloadFileInChunks(bucket.file('large-file.txt'));
    * // Your local directory now contains:
    * // - "large-file.txt" (with the contents from my-bucket.large-file.txt)
    * ```
@@ -369,6 +600,102 @@ export class TransferManager {
     });
   }
 
+  /**
+   * @typedef {object} UploadFileInChunksOptions
+   * @property {number} [concurrencyLimit] The number of concurrently executing promises
+   * to use when uploading the file.
+   * @property {number} [chunkSizeBytes] The size in bytes of each chunk to be uploaded.
+   * @property {string} [uploadName] Name of the file when saving to GCS. If omitted the name is taken from the file path.
+   * @property {number} [maxQueueSize] The number of chunks to be uploaded to hold in memory concurrently. If not specified
+   * defaults to the specified concurrency limit.
+   * @property {string} [uploadId] If specified attempts to resume a previous upload.
+   * @property {Map<number, string>} [partsMap] If specified alongside uploadId, attempts to resume a previous upload from the last chunk
+   * specified in partsMap.
+   * @experimental
+   */
+  /**
+   * Upload a large file in chunks utilizing parallel upload operations. If the upload fails, an uploadId and
+   * map containing all the successfully uploaded parts will be returned to the caller. These arguments can be used to
+   * resume the upload.
+   *
+   * @param {string} [filePath] The path of the file to be uploaded.
+   * @param {UploadFileInChunksOptions} [options] Configuration options.
+   * @param {MultiPartHelperGenerator} [generator] A function that will return an instance that implements the MPU interface. Most users will not need to use this.
+   * @returns {Promise<GaxiosResponse | undefined>} If successful, a promise resolving to the response from the complete upload request; otherwise an error containing the message, uploadId, and parts map.
+   *
+   * @example
+   * ```
+   * const {Storage} = require('@google-cloud/storage');
+   * const storage = new Storage();
+   * const bucket = storage.bucket('my-bucket');
+   * const transferManager = new TransferManager(bucket);
+   *
+   * //-
+   * // Upload a large file in chunks utilizing parallel operations.
+   * //-
+   * const response = await transferManager.uploadFileInChunks('large-file.txt');
+   * // Your bucket now contains:
+   * // - "large-file.txt"
+   * ```
+   *
+   * @experimental
+   */
+  async uploadFileInChunks(
+    filePath: string,
+    options: UploadFileInChunksOptions = {},
+    generator: MultiPartHelperGenerator = defaultMultiPartGenerator
+  ): Promise<GaxiosResponse | undefined> {
+    const chunkSize =
+      options.chunkSizeBytes || UPLOAD_IN_CHUNKS_DEFAULT_CHUNK_SIZE;
+    const limit = pLimit(
+      options.concurrencyLimit || DEFAULT_PARALLEL_CHUNKED_UPLOAD_LIMIT
+    );
+    const maxQueueSize =
+      options.maxQueueSize ||
+      options.concurrencyLimit ||
+      DEFAULT_PARALLEL_CHUNKED_UPLOAD_LIMIT;
+    const fileName = options.uploadName || path.basename(filePath);
+    const mpuHelper = generator(
+      this.bucket,
+      fileName,
+      options.uploadId,
+      options.partsMap
+    );
+    let partNumber = 1;
+    let promises = [];
+    try {
+      if (options.uploadId === undefined) {
+        await mpuHelper.initiateUpload();
+      }
+      const startOrResumptionByte = mpuHelper.partsMap!.size * chunkSize;
+      const readStream = createReadStream(filePath, {
+        highWaterMark: chunkSize,
+        start: startOrResumptionByte,
+      });
+      // p-limit only limits the number of running promises. We do not want to hold an entire
+      // large file in memory at once so promises acts as a queue that will hold only maxQueueSize in memory.
+      for await (const curChunk of readStream) {
+        if (promises.length >= maxQueueSize) {
+          await Promise.all(promises);
+          promises = [];
+        }
+        promises.push(
+          limit(() =>
+            mpuHelper.uploadPart(partNumber++, curChunk, options.validation)
+          )
+        );
+      }
+      await Promise.all(promises);
+      return await mpuHelper.completeUpload();
+    } catch (e) {
+      throw new MultiPartUploadError(
+        (e as Error).message,
+        mpuHelper.uploadId!,
+        mpuHelper.partsMap!
+ ); + } + } + private async *getPathsFromDirectory( directory: string ): AsyncGenerator { diff --git a/test/transfer-manager.ts b/test/transfer-manager.ts index be9d56cd2..e713d000c 100644 --- a/test/transfer-manager.ts +++ b/test/transfer-manager.ts @@ -25,6 +25,9 @@ import { DownloadOptions, FileOptions, IdempotencyStrategy, + MultiPartHelperGenerator, + MultiPartUploadError, + MultiPartUploadHelper, UploadOptions, } from '../src'; import * as assert from 'assert'; @@ -32,6 +35,8 @@ import * as path from 'path'; import * as stream from 'stream'; import * as extend from 'extend'; import * as fs from 'fs'; +import * as sinon from 'sinon'; +import {GaxiosResponse} from 'gaxios'; const fakeUtil = Object.assign({}, util); fakeUtil.noop = util.noop; @@ -92,6 +97,10 @@ class HTTPError extends Error { let pLimitOverride: Function | null; const fakePLimit = (limit: number) => (pLimitOverride || pLimit)(limit); const fakeFs = extend(true, {}, fs, { + // eslint-disable-next-line @typescript-eslint/no-unused-vars + createReadStream(path: string, _options: {}): stream.Stream { + return new stream.PassThrough(); + }, get promises() { return { open: () => { @@ -122,6 +131,7 @@ describe('Transfer Manager', () => { const STORAGE: any = { createBucket: util.noop, + apiEndpoint: 'https://test.notvalid.com', retryOptions: { autoRetry: true, maxRetries: 3, @@ -305,4 +315,138 @@ describe('Transfer Manager', () => { assert.strictEqual(callCount, 1); }); }); + + describe('uploadFileInChunks', () => { + let mockGeneratorFunction: MultiPartHelperGenerator; + let fakeHelper: sinon.SinonStubbedInstance; + let sandbox: sinon.SinonSandbox; + let readStreamStub: sinon.SinonStub; + const path = '/a/b/c.txt'; + const pThrough = new stream.PassThrough(); + class FakeXMLHelper implements MultiPartUploadHelper { + bucket: Bucket; + fileName: string; + uploadId?: string | undefined; + partsMap?: Map | undefined; + constructor(bucket: Bucket, fileName: string) { + this.bucket = bucket; + this.fileName = fileName; + } + initiateUpload(): Promise { + throw new Error('Method not implemented.'); + } + // eslint-disable-next-line @typescript-eslint/no-unused-vars + uploadPart(partNumber: number, chunk: Buffer): Promise { + throw new Error('Method not implemented.'); + } + completeUpload(): Promise { + throw new Error('Method not implemented.'); + } + } + + beforeEach(() => { + sandbox = sinon.createSandbox(); + readStreamStub = sandbox + .stub(fakeFs, 'createReadStream') + .returns(pThrough); + mockGeneratorFunction = (bucket, fileName, uploadId, partsMap) => { + fakeHelper = sandbox.createStubInstance(FakeXMLHelper); + fakeHelper.uploadId = uploadId || ''; + fakeHelper.partsMap = partsMap || new Map(); + fakeHelper.initiateUpload.resolves(); + fakeHelper.uploadPart.resolves(); + fakeHelper.completeUpload.resolves(); + return fakeHelper; + }; + }); + + afterEach(() => { + sandbox.restore(); + }); + + it('should call initiateUpload, uploadPart, and completeUpload', async () => { + process.nextTick(() => { + pThrough.push('hello world'); + pThrough.end(); + }); + await transferManager.uploadFileInChunks(path, {}, mockGeneratorFunction); + assert.strictEqual(fakeHelper.initiateUpload.calledOnce, true); + assert.strictEqual(fakeHelper.uploadPart.calledOnce, true); + assert.strictEqual(fakeHelper.completeUpload.calledOnce, true); + }); + + it('should call createReadStream with a highWaterMark equal to chunkSize', async () => { + const options = {highWaterMark: 32 * 1024 * 1024, start: 0}; + + await transferManager.uploadFileInChunks( 
+ path, + { + chunkSizeBytes: 32 * 1024 * 1024, + }, + mockGeneratorFunction + ); + + assert.strictEqual(readStreamStub.calledOnceWith(path, options), true); + }); + + it('should set the correct start offset when called with an existing parts map', async () => { + const options = { + highWaterMark: 32 * 1024 * 1024, + start: 64 * 1024 * 1024, + }; + + await transferManager.uploadFileInChunks( + path, + { + uploadId: '123', + partsMap: new Map([ + [1, '123'], + [2, '321'], + ]), + chunkSizeBytes: 32 * 1024 * 1024, + }, + mockGeneratorFunction + ); + + assert.strictEqual(readStreamStub.calledOnceWith(path, options), true); + }); + + it('should not call initiateUpload if an uploadId is provided', async () => { + await transferManager.uploadFileInChunks( + path, + { + uploadId: '123', + partsMap: new Map([ + [1, '123'], + [2, '321'], + ]), + }, + mockGeneratorFunction + ); + + assert.strictEqual(fakeHelper.uploadId, '123'); + assert.strictEqual(fakeHelper.initiateUpload.notCalled, true); + }); + + it('should reject with an error with empty uploadId and partsMap', async () => { + const expectedErr = new MultiPartUploadError( + 'Hello World', + '', + new Map() + ); + mockGeneratorFunction = (bucket, fileName, uploadId, partsMap) => { + fakeHelper = sandbox.createStubInstance(FakeXMLHelper); + fakeHelper.uploadId = uploadId || ''; + fakeHelper.partsMap = partsMap || new Map(); + fakeHelper.initiateUpload.rejects(new Error(expectedErr.message)); + fakeHelper.uploadPart.resolves(); + fakeHelper.completeUpload.resolves(); + return fakeHelper; + }; + assert.rejects( + transferManager.uploadFileInChunks(path, {}, mockGeneratorFunction), + expectedErr + ); + }); + }); });
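Note for readers trying out the new API: the sketch below shows how uploadFileInChunks is meant to be called and how an interrupted upload can be resumed with the uploadId and partsMap carried by MultiPartUploadError. It assumes TransferManager and MultiPartUploadError are exported from the package root (as the test file's imports from '../src' suggest); the bucket and file names are placeholders, and because uploadId and partsMap are declared private on the error class in this patch, the sketch reads them through a cast.

```ts
import {Storage, TransferManager, MultiPartUploadError} from '@google-cloud/storage';

const storage = new Storage();
const bucket = storage.bucket('my-bucket'); // placeholder bucket name
const transferManager = new TransferManager(bucket);

async function uploadWithResume(filePath: string): Promise<void> {
  try {
    // First attempt: 32 MiB chunks (the default) with two concurrent part uploads.
    await transferManager.uploadFileInChunks(filePath, {
      chunkSizeBytes: 32 * 1024 * 1024,
      concurrencyLimit: 2,
      validation: 'md5',
    });
  } catch (e) {
    if (!(e instanceof MultiPartUploadError)) throw e;
    // The error carries the upload id and the parts that already succeeded.
    // Passing them back skips initiateUpload and resumes after the last uploaded chunk.
    const {uploadId, partsMap} = e as unknown as {
      uploadId: string;
      partsMap: Map<number, string>;
    };
    await transferManager.uploadFileInChunks(filePath, {uploadId, partsMap});
  }
}
```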
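The optional third argument to uploadFileInChunks makes the MPU helper injectable, which is how the unit tests above avoid real XML API calls. A minimal sketch of that pattern follows, assuming the same root exports; RecordingHelper is a hypothetical stand-in written for illustration, not part of the library.

```ts
import {Bucket, MultiPartUploadHelper, MultiPartHelperGenerator} from '@google-cloud/storage';
import {GaxiosResponse} from 'gaxios';

// Records the call sequence instead of talking to the XML API.
class RecordingHelper implements MultiPartUploadHelper {
  bucket: Bucket;
  fileName: string;
  uploadId?: string;
  partsMap?: Map<number, string>;
  calls: string[] = [];

  constructor(
    bucket: Bucket,
    fileName: string,
    uploadId?: string,
    partsMap?: Map<number, string>
  ) {
    this.bucket = bucket;
    this.fileName = fileName;
    this.uploadId = uploadId || '';
    this.partsMap = partsMap || new Map();
  }
  async initiateUpload(): Promise<void> {
    this.calls.push('initiateUpload');
  }
  async uploadPart(partNumber: number): Promise<void> {
    this.calls.push(`uploadPart:${partNumber}`);
  }
  async completeUpload(): Promise<GaxiosResponse | undefined> {
    this.calls.push('completeUpload');
    return undefined;
  }
}

const recordingGenerator: MultiPartHelperGenerator = (
  bucket,
  fileName,
  uploadId,
  partsMap
) => new RecordingHelper(bucket, fileName, uploadId, partsMap);

// Usage: await transferManager.uploadFileInChunks('/tmp/big-file.bin', {}, recordingGenerator);
```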