diff --git a/package.json b/package.json index 74611d88e..0496fd489 100644 --- a/package.json +++ b/package.json @@ -53,28 +53,32 @@ "@google-cloud/common": "^3.8.1", "@google-cloud/paginator": "^3.0.0", "@google-cloud/promisify": "^2.0.0", + "abort-controller": "^3.0.0", "arrify": "^2.0.0", - "async-retry": "^1.3.1", + "async-retry": "^1.3.3", "compressible": "^2.0.12", + "configstore": "^5.0.0", "date-and-time": "^2.0.0", "duplexify": "^4.0.0", "extend": "^3.0.2", - "gcs-resumable-upload": "^3.6.0", + "gaxios": "^4.0.0", "get-stream": "^6.0.0", + "google-auth-library": "^7.0.0", "hash-stream-validation": "^0.2.2", "mime": "^3.0.0", "mime-types": "^2.0.8", "p-limit": "^3.0.1", "pumpify": "^2.0.0", "snakeize": "^0.1.0", - "stream-events": "^1.0.1", + "stream-events": "^1.0.4", "xdg-basedir": "^4.0.0" }, "devDependencies": { + "@compodoc/compodoc": "^1.1.7", "@google-cloud/pubsub": "^2.0.0", "@grpc/grpc-js": "^1.0.3", "@grpc/proto-loader": "^0.6.0", - "@types/async-retry": "^1.4.2", + "@types/async-retry": "^1.4.3", "@types/compressible": "^2.0.0", "@types/concat-stream": "^1.6.0", "@types/configstore": "^5.0.0", @@ -83,6 +87,7 @@ "@types/mime": "^2.0.0", "@types/mime-types": "^2.1.0", "@types/mocha": "^8.0.0", + "@types/mockery": "^1.4.29", "@types/node": "^16.0.0", "@types/node-fetch": "^2.1.3", "@types/proxyquire": "^1.3.28", @@ -99,6 +104,7 @@ "jsdoc-region-tag": "^1.0.2", "linkinator": "^2.0.0", "mocha": "^8.0.0", + "mockery": "^2.1.0", "nock": "~13.2.0", "node-fetch": "^2.2.0", "proxyquire": "^2.1.3", diff --git a/src/file.ts b/src/file.ts index dc6c95812..f05924a9a 100644 --- a/src/file.ts +++ b/src/file.ts @@ -35,7 +35,7 @@ import * as mime from 'mime'; import * as os from 'os'; // eslint-disable-next-line @typescript-eslint/no-var-requires const pumpify = require('pumpify'); -import * as resumableUpload from 'gcs-resumable-upload'; +import * as resumableUpload from './gcs-resumable-upload'; import {Duplex, Writable, Readable, PassThrough} from 'stream'; import * as streamEvents from 'stream-events'; import * as xdgBasedir from 'xdg-basedir'; diff --git a/src/gcs-resumable-upload/index.ts b/src/gcs-resumable-upload/index.ts new file mode 100644 index 000000000..4da2dbf40 --- /dev/null +++ b/src/gcs-resumable-upload/index.ts @@ -0,0 +1,1177 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +import AbortController from 'abort-controller'; +import * as ConfigStore from 'configstore'; +import {createHash} from 'crypto'; +import * as extend from 'extend'; +import { + GaxiosOptions, + GaxiosPromise, + GaxiosResponse, + GaxiosError, +} from 'gaxios'; +import * as gaxios from 'gaxios'; +import {GoogleAuth, GoogleAuthOptions} from 'google-auth-library'; +import * as Pumpify from 'pumpify'; +import {Duplex, PassThrough, Readable} from 'stream'; +import * as streamEvents from 'stream-events'; +import retry = require('async-retry'); + +const NOT_FOUND_STATUS_CODE = 404; +const TERMINATED_UPLOAD_STATUS_CODE = 410; +const RESUMABLE_INCOMPLETE_STATUS_CODE = 308; +const RETRY_LIMIT = 5; +const DEFAULT_API_ENDPOINT_REGEX = /.*\.googleapis\.com/; +const MAX_RETRY_DELAY = 64; +const RETRY_DELAY_MULTIPLIER = 2; +const MAX_TOTAL_RETRY_TIMEOUT = 600; +const AUTO_RETRY_VALUE = true; + +export const PROTOCOL_REGEX = /^(\w*):\/\//; + +export interface ErrorWithCode extends Error { + code: number; +} + +export type CreateUriCallback = (err: Error | null, uri?: string) => void; + +export interface Encryption { + key: {}; + hash: {}; +} + +export type PredefinedAcl = + | 'authenticatedRead' + | 'bucketOwnerFullControl' + | 'bucketOwnerRead' + | 'private' + | 'projectPrivate' + | 'publicRead'; + +export interface QueryParameters { + contentEncoding?: string; + ifGenerationMatch?: number; + ifGenerationNotMatch?: number; + ifMetagenerationMatch?: number; + ifMetagenerationNotMatch?: number; + kmsKeyName?: string; + predefinedAcl?: PredefinedAcl; + projection?: 'full' | 'noAcl'; + userProject?: string; +} + +export interface UploadConfig { + /** + * The API endpoint used for the request. + * Defaults to `storage.googleapis.com`. + * **Warning**: + * If this value does not match the pattern *.googleapis.com, + * an emulator context will be assumed and authentication will be bypassed. + */ + apiEndpoint?: string; + + /** + * The name of the destination bucket. + */ + bucket: string; + + /** + * The name of the destination file. + */ + file: string; + + /** + * The GoogleAuthOptions passed to google-auth-library + */ + authConfig?: GoogleAuthOptions; + + /** + * If you want to re-use an auth client from google-auto-auth, pass an + * instance here. + * Defaults to GoogleAuth and gets automatically overridden if an + * emulator context is detected. + */ + authClient?: { + request: ( + opts: GaxiosOptions + ) => Promise> | GaxiosPromise; + }; + + /** + * Where the gcs-resumable-upload configuration file should be stored on your + * system. This maps to the configstore option by the same name. + */ + configPath?: string; + + /** + * Create a separate request per chunk. + * + * Should be a multiple of 256 KiB (2^18). + * We recommend using at least 8 MiB for the chunk size. + * + * @link https://cloud.google.com/storage/docs/performing-resumable-uploads#chunked-upload + */ + chunkSize?: number; + + /** + * For each API request we send, you may specify custom request options that + * we'll add onto the request. The request options follow the gaxios API: + * https://github.com/googleapis/gaxios#request-options. + */ + customRequestOptions?: GaxiosOptions; + + /** + * This will cause the upload to fail if the current generation of the remote + * object does not match the one provided here. + */ + generation?: number; + + /** + * A customer-supplied encryption key. See + * https://cloud.google.com/storage/docs/encryption#customer-supplied. 
+ */ + key?: string | Buffer; + + /** + * Resource name of the Cloud KMS key, of the form + * `projects/my-project/locations/global/keyRings/my-kr/cryptoKeys/my-key`, + * that will be used to encrypt the object. Overrides the object metadata's + * `kms_key_name` value, if any. + */ + kmsKeyName?: string; + + /** + * Any metadata you wish to set on the object. + */ + metadata?: ConfigMetadata; + + /** + * The starting byte of the upload stream, for resuming an interrupted upload. + * See + * https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload#resume-upload. + */ + offset?: number; + + /** + * Set an Origin header when creating the resumable upload URI. + */ + origin?: string; + + /** + * Specify query parameters that go along with the initial upload request. See + * https://cloud.google.com/storage/docs/json_api/v1/objects/insert#parameters + */ + params?: QueryParameters; + + /** + * Apply a predefined set of access controls to the created file. + */ + predefinedAcl?: PredefinedAcl; + + /** + * Make the uploaded file private. (Alias for config.predefinedAcl = + * 'private') + */ + private?: boolean; + + /** + * Make the uploaded file public. (Alias for config.predefinedAcl = + * 'publicRead') + */ + public?: boolean; + + /** + * If you already have a resumable URI from a previously-created resumable + * upload, just pass it in here and we'll use that. + */ + uri?: string; + + /** + * If the bucket being accessed has requesterPays functionality enabled, this + * can be set to control which project is billed for the access of this file. + */ + userProject?: string; + + /** + * Configuration options for retrying retryable errors. + */ + retryOptions?: RetryOptions; +} + +export interface ConfigMetadata { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + [key: string]: any; + + /** + * Set the length of the file being uploaded. + */ + contentLength?: number; + + /** + * Set the content type of the incoming data. + */ + contentType?: string; +} + +export interface RetryOptions { + retryDelayMultiplier?: number; + totalTimeout?: number; + maxRetryDelay?: number; + autoRetry?: boolean; + maxRetries?: number; + retryableErrorFn?: (err: ApiError) => boolean; +} + +export interface GoogleInnerError { + reason?: string; +} + +export interface ApiError extends Error { + code?: number; + errors?: GoogleInnerError[]; +} + +export class Upload extends Pumpify { + bucket: string; + file: string; + apiEndpoint: string; + baseURI: string; + authConfig?: {scopes?: string[]}; + /* + * Defaults to GoogleAuth and gets automatically overridden if an + * emulator context is detected. 
+ */ + authClient: { + request: ( + opts: GaxiosOptions + ) => Promise> | GaxiosPromise; + }; + cacheKey: string; + chunkSize?: number; + customRequestOptions: GaxiosOptions; + generation?: number; + key?: string | Buffer; + kmsKeyName?: string; + metadata: ConfigMetadata; + offset?: number; + origin?: string; + params: QueryParameters; + predefinedAcl?: PredefinedAcl; + private?: boolean; + public?: boolean; + uri?: string; + userProject?: string; + encryption?: Encryption; + configStore: ConfigStore; + uriProvidedManually: boolean; + numBytesWritten = 0; + numRetries = 0; + contentLength: number | '*'; + retryLimit: number = RETRY_LIMIT; + maxRetryDelay: number = MAX_RETRY_DELAY; + retryDelayMultiplier: number = RETRY_DELAY_MULTIPLIER; + maxRetryTotalTimeout: number = MAX_TOTAL_RETRY_TIMEOUT; + timeOfFirstRequest: number; + retryableErrorFn?: (err: ApiError) => boolean; + private upstreamChunkBuffer: Buffer = Buffer.alloc(0); + private chunkBufferEncoding?: BufferEncoding = undefined; + private numChunksReadInRequest = 0; + /** + * A chunk used for caching the most recent upload chunk. + * We should not assume that the server received all bytes sent in the request. + * - https://cloud.google.com/storage/docs/performing-resumable-uploads#chunked-upload + */ + private lastChunkSent = Buffer.alloc(0); + private upstreamEnded = false; + + constructor(cfg: UploadConfig) { + super(); + streamEvents(this); + + cfg = cfg || {}; + + if (!cfg.bucket || !cfg.file) { + throw new Error('A bucket and file name are required'); + } + + cfg.authConfig = cfg.authConfig || {}; + cfg.authConfig.scopes = [ + 'https://www.googleapis.com/auth/devstorage.full_control', + ]; + this.authClient = cfg.authClient || new GoogleAuth(cfg.authConfig); + + this.apiEndpoint = 'https://storage.googleapis.com'; + if (cfg.apiEndpoint) { + this.apiEndpoint = this.sanitizeEndpoint(cfg.apiEndpoint); + if (!DEFAULT_API_ENDPOINT_REGEX.test(cfg.apiEndpoint)) { + this.authClient = gaxios; + } + } + + this.baseURI = `${this.apiEndpoint}/upload/storage/v1/b`; + this.bucket = cfg.bucket; + + const cacheKeyElements = [cfg.bucket, cfg.file]; + if (typeof cfg.generation === 'number') { + cacheKeyElements.push(`${cfg.generation}`); + } + + this.cacheKey = cacheKeyElements.join('/'); + + this.customRequestOptions = cfg.customRequestOptions || {}; + this.file = cfg.file; + this.generation = cfg.generation; + this.kmsKeyName = cfg.kmsKeyName; + this.metadata = cfg.metadata || {}; + this.offset = cfg.offset; + this.origin = cfg.origin; + this.params = cfg.params || {}; + this.userProject = cfg.userProject; + this.chunkSize = cfg.chunkSize; + + if (cfg.key) { + /** + * NOTE: This is `as string` because there appears to be some weird kind + * of TypeScript bug as 2.8. 
Tracking the issue here: + * https://github.com/Microsoft/TypeScript/issues/23155 + */ + const base64Key = Buffer.from(cfg.key as string).toString('base64'); + this.encryption = { + key: base64Key, + hash: createHash('sha256').update(cfg.key).digest('base64'), + }; + } + + this.predefinedAcl = cfg.predefinedAcl; + if (cfg.private) this.predefinedAcl = 'private'; + if (cfg.public) this.predefinedAcl = 'publicRead'; + + const configPath = cfg.configPath; + this.configStore = new ConfigStore('gcs-resumable-upload', null, { + configPath, + }); + + const autoRetry = cfg?.retryOptions?.autoRetry || AUTO_RETRY_VALUE; + this.uriProvidedManually = !!cfg.uri; + this.uri = cfg.uri || this.get('uri'); + this.numBytesWritten = 0; + this.numRetries = 0; //counter for number of retries currently executed + + if (autoRetry && cfg?.retryOptions?.maxRetries !== undefined) { + this.retryLimit = cfg.retryOptions.maxRetries; + } else if (!autoRetry) { + this.retryLimit = 0; + } + + if (cfg?.retryOptions?.maxRetryDelay !== undefined) { + this.maxRetryDelay = cfg.retryOptions.maxRetryDelay; + } + + if (cfg?.retryOptions?.retryDelayMultiplier !== undefined) { + this.retryDelayMultiplier = cfg.retryOptions.retryDelayMultiplier; + } + + if (cfg?.retryOptions?.totalTimeout !== undefined) { + this.maxRetryTotalTimeout = cfg.retryOptions.totalTimeout; + } + + this.timeOfFirstRequest = Date.now(); + this.retryableErrorFn = cfg?.retryOptions?.retryableErrorFn; + + const contentLength = cfg.metadata + ? Number(cfg.metadata.contentLength) + : NaN; + this.contentLength = isNaN(contentLength) ? '*' : contentLength; + + this.upstream.on('end', () => { + this.upstreamEnded = true; + }); + + this.on('prefinish', () => { + this.upstreamEnded = true; + }); + + this.once('writing', () => { + // Now that someone is writing to this object, let's attach + // some duplexes. These duplexes enable this object to be + // better managed in terms of 'end'/'finish' control and + // buffering writes downstream if someone enables multi- + // chunk upload support (`chunkSize`) w/o adding too much into + // memory. + this.setPipeline(this.upstream, new PassThrough()); + + if (this.uri) { + this.continueUploading(); + } else { + this.createURI((err, uri) => { + if (err) { + return this.destroy(err); + } + this.set({uri}); + this.startUploading(); + }); + } + }); + } + + /** A stream representing the incoming data to upload */ + private readonly upstream = new Duplex({ + read: async () => { + this.once('prepareFinish', () => { + // Allows this (`Upload`) to finish/end once the upload has succeeded. + this.upstream.push(null); + }); + }, + write: this.writeToChunkBuffer.bind(this), + }); + + /** + * A handler for `upstream` to write and buffer its data. + * + * @param chunk The chunk to append to the buffer + * @param encoding The encoding of the chunk + * @param readCallback A callback for when the buffer has been read downstream + */ + private writeToChunkBuffer( + chunk: Buffer | string, + encoding: BufferEncoding, + readCallback: () => void + ) { + this.upstreamChunkBuffer = Buffer.concat([ + this.upstreamChunkBuffer, + typeof chunk === 'string' ? Buffer.from(chunk, encoding) : chunk, + ]); + this.chunkBufferEncoding = encoding; + + this.once('readFromChunkBuffer', readCallback); + process.nextTick(() => this.emit('wroteToChunkBuffer')); + } + + /** + * Prepends data back to the upstream chunk buffer. 
+ * + * @param chunk The data to prepend + */ + private unshiftChunkBuffer(chunk: Buffer) { + this.upstreamChunkBuffer = Buffer.concat([chunk, this.upstreamChunkBuffer]); + } + + /** + * Retrieves data from upstream's buffer. + * + * @param limit The maximum amount to return from the buffer. + * @returns The data requested. + */ + private pullFromChunkBuffer(limit: number) { + const chunk = this.upstreamChunkBuffer.slice(0, limit); + this.upstreamChunkBuffer = this.upstreamChunkBuffer.slice(limit); + + // notify upstream we've read from the buffer so it can potentially + // send more data down. + process.nextTick(() => this.emit('readFromChunkBuffer')); + + return chunk; + } + + /** + * A handler for determining if data is ready to be read from upstream. + * + * @returns If there will be more chunks to read in the future + */ + private async waitForNextChunk(): Promise { + const willBeMoreChunks = await new Promise(resolve => { + // There's data available - it should be digested + if (this.upstreamChunkBuffer.byteLength) { + return resolve(true); + } + + // The upstream writable ended, we shouldn't expect any more data. + if (this.upstreamEnded) { + return resolve(false); + } + + // Nothing immediate seems to be determined. We need to prepare some + // listeners to determine next steps... + + const wroteToChunkBufferCallback = () => { + removeListeners(); + return resolve(true); + }; + + const upstreamFinishedCallback = () => { + removeListeners(); + + // this should be the last chunk, if there's anything there + if (this.upstreamChunkBuffer.length) return resolve(true); + + return resolve(false); + }; + + // Remove listeners when we're ready to callback. + // It's important to clean-up listeners as Node has a default max number of + // event listeners. Notably, The number of requests can be greater than the + // number of potential listeners. + // - https://nodejs.org/api/events.html#eventsdefaultmaxlisteners + const removeListeners = () => { + this.removeListener('wroteToChunkBuffer', wroteToChunkBufferCallback); + this.upstream.removeListener('finish', upstreamFinishedCallback); + this.removeListener('prefinish', upstreamFinishedCallback); + }; + + // If there's data recently written it should be digested + this.once('wroteToChunkBuffer', wroteToChunkBufferCallback); + + // If the upstream finishes let's see if there's anything to grab + this.upstream.once('finish', upstreamFinishedCallback); + this.once('prefinish', upstreamFinishedCallback); + }); + + return willBeMoreChunks; + } + + /** + * Reads data from upstream up to the provided `limit`. + * Ends when the limit has reached or no data is expected to be pushed from upstream. + * + * @param limit The most amount of data this iterator should return. `Infinity` by default. 
+ * @param oneChunkMode Determines if one, exhaustive chunk is yielded for the iterator + */ + private async *upstreamIterator(limit = Infinity, oneChunkMode?: boolean) { + let completeChunk = Buffer.alloc(0); + + // read from upstream chunk buffer + while (limit && (await this.waitForNextChunk())) { + // read until end or limit has been reached + const chunk = this.pullFromChunkBuffer(limit); + + limit -= chunk.byteLength; + if (oneChunkMode) { + // return 1 chunk at the end of iteration + completeChunk = Buffer.concat([completeChunk, chunk]); + } else { + // return many chunks throughout iteration + yield { + chunk, + encoding: this.chunkBufferEncoding, + }; + } + } + + if (oneChunkMode) { + yield { + chunk: completeChunk, + encoding: this.chunkBufferEncoding, + }; + } + } + + createURI(): Promise; + createURI(callback: CreateUriCallback): void; + createURI(callback?: CreateUriCallback): void | Promise { + if (!callback) { + return this.createURIAsync(); + } + this.createURIAsync().then(r => callback(null, r), callback); + } + + protected async createURIAsync(): Promise { + const metadata = this.metadata; + + const reqOpts: GaxiosOptions = { + method: 'POST', + url: [this.baseURI, this.bucket, 'o'].join('/'), + params: Object.assign( + { + name: this.file, + uploadType: 'resumable', + }, + this.params + ), + data: metadata, + headers: {}, + }; + + if (metadata.contentLength) { + reqOpts.headers!['X-Upload-Content-Length'] = + metadata.contentLength.toString(); + } + + if (metadata.contentType) { + reqOpts.headers!['X-Upload-Content-Type'] = metadata.contentType; + } + + if (typeof this.generation !== 'undefined') { + reqOpts.params.ifGenerationMatch = this.generation; + } + + if (this.kmsKeyName) { + reqOpts.params.kmsKeyName = this.kmsKeyName; + } + + if (this.predefinedAcl) { + reqOpts.params.predefinedAcl = this.predefinedAcl; + } + + if (this.origin) { + reqOpts.headers!.Origin = this.origin; + } + const uri = await retry( + async (bail: (err: Error) => void) => { + try { + const res = await this.makeRequest(reqOpts); + return res.headers.location; + } catch (err) { + const e = err as GaxiosError; + const apiError = { + code: e.response?.status, + name: e.response?.statusText, + message: e.response?.statusText, + errors: [ + { + reason: e.code as string, + }, + ], + }; + if ( + this.retryLimit > 0 && + this.retryableErrorFn && + this.retryableErrorFn!(apiError as ApiError) + ) { + throw e; + } else { + return bail(e); + } + } + }, + { + retries: this.retryLimit, + factor: this.retryDelayMultiplier, + maxTimeout: this.maxRetryDelay! * 1000, //convert to milliseconds + maxRetryTime: this.maxRetryTotalTimeout! * 1000, //convert to milliseconds + } + ); + + this.uri = uri; + this.offset = 0; + return uri; + } + + private async continueUploading() { + if (typeof this.offset === 'number') { + this.startUploading(); + return; + } + await this.getAndSetOffset(); + this.startUploading(); + } + + async startUploading() { + const multiChunkMode = !!this.chunkSize; + let responseReceived = false; + this.numChunksReadInRequest = 0; + + if (!this.offset) { + this.offset = 0; + } + + // Check if we're uploading the expected object + if (this.numBytesWritten === 0) { + const isSameObject = await this.ensureUploadingSameObject(); + if (!isSameObject) { + // `ensureUploadingSameObject` will restart the upload. 
+ return; + } + } + + // Check if the offset (server) is too far behind the current stream + if (this.offset < this.numBytesWritten) { + this.emit( + 'error', + new RangeError('The offset is lower than the number of bytes written') + ); + return; + } + + // Check if we should 'fast-forward' to the relevant data to upload + if (this.numBytesWritten < this.offset) { + // 'fast-forward' to the byte where we need to upload. + // only push data from the byte after the one we left off on + const fastForwardBytes = this.offset - this.numBytesWritten; + + for await (const _chunk of this.upstreamIterator(fastForwardBytes)) { + _chunk; // discard the data up until the point we want + } + + this.numBytesWritten = this.offset; + } + + let expectedUploadSize: number | undefined = undefined; + + // Set `expectedUploadSize` to `contentLength` if available + if (typeof this.contentLength === 'number') { + expectedUploadSize = this.contentLength - this.numBytesWritten; + } + + // `expectedUploadSize` should be no more than the `chunkSize`. + // It's possible this is the last chunk request for a multiple + // chunk upload, thus smaller than the chunk size. + if (this.chunkSize) { + expectedUploadSize = expectedUploadSize + ? Math.min(this.chunkSize, expectedUploadSize) + : this.chunkSize; + } + + // A queue for the upstream data + const upstreamQueue = this.upstreamIterator( + expectedUploadSize, + multiChunkMode // multi-chunk mode should return 1 chunk per request + ); + + // The primary read stream for this request. This stream retrieves no more + // than the exact requested amount from upstream. + const requestStream = new Readable({ + read: async () => { + // Don't attempt to retrieve data upstream if we already have a response + if (responseReceived) requestStream.push(null); + + const result = await upstreamQueue.next(); + + if (result.value) { + this.numChunksReadInRequest++; + this.lastChunkSent = result.value.chunk; + this.numBytesWritten += result.value.chunk.byteLength; + + this.emit('progress', { + bytesWritten: this.numBytesWritten, + contentLength: this.contentLength, + }); + + requestStream.push(result.value.chunk, result.value.encoding); + } + + if (result.done) { + requestStream.push(null); + } + }, + }); + + // This should be 'once' as `startUploading` can be called again for + // multi chunk uploads and each request would have its own response. + this.once('response', resp => { + responseReceived = true; + this.responseHandler(resp); + }); + let headers: GaxiosOptions['headers'] = {}; + + // If using multiple chunk upload, set appropriate header + if (multiChunkMode && expectedUploadSize) { + // The '-1' is because the ending byte is inclusive in the request. + const endingByte = expectedUploadSize + this.numBytesWritten - 1; + headers = { + 'Content-Length': expectedUploadSize, + 'Content-Range': `bytes ${this.offset}-${endingByte}/${this.contentLength}`, + }; + } else { + headers = { + 'Content-Range': `bytes ${this.offset}-*/${this.contentLength}`, + }; + } + + const reqOpts: GaxiosOptions = { + method: 'PUT', + url: this.uri, + headers, + body: requestStream, + }; + + try { + await this.makeRequestStream(reqOpts); + } catch (err) { + const e = err as Error; + this.destroy(e); + } + } + + // Process the API response to look for errors that came in + // the response body. 
+ private responseHandler(resp: GaxiosResponse) { + if (resp.data.error) { + this.destroy(resp.data.error); + return; + } + + const shouldContinueWithNextMultiChunkRequest = + this.chunkSize && + resp.status === RESUMABLE_INCOMPLETE_STATUS_CODE && + resp.headers.range; + + if (shouldContinueWithNextMultiChunkRequest) { + // Use the upper value in this header to determine where to start the next chunk. + // We should not assume that the server received all bytes sent in the request. + // https://cloud.google.com/storage/docs/performing-resumable-uploads#chunked-upload + const range: string = resp.headers.range; + this.offset = Number(range.split('-')[1]) + 1; + + // We should not assume that the server received all bytes sent in the request. + // - https://cloud.google.com/storage/docs/performing-resumable-uploads#chunked-upload + const missingBytes = this.numBytesWritten - this.offset; + if (missingBytes) { + const dataToPrependForResending = this.lastChunkSent.slice( + -missingBytes + ); + // As multi-chunk uploads send one chunk per request and pulls one + // chunk into the pipeline, prepending the missing bytes back should + // be fine for the next request. + this.unshiftChunkBuffer(dataToPrependForResending); + this.numBytesWritten -= missingBytes; + this.lastChunkSent = Buffer.alloc(0); + } + + // continue uploading next chunk + this.continueUploading(); + } else if (!this.isSuccessfulResponse(resp.status)) { + const err: ApiError = { + code: resp.status, + name: 'Upload failed', + message: 'Upload failed', + }; + this.destroy(err); + } else { + // remove the last chunk sent + this.lastChunkSent = Buffer.alloc(0); + + if (resp && resp.data) { + resp.data.size = Number(resp.data.size); + } + this.emit('metadata', resp.data); + this.deleteConfig(); + + // Allow the object (Upload) to continue naturally so the user's + // "finish" event fires. + this.emit('prepareFinish'); + } + } + + /** + * Check if this is the same content uploaded previously. This caches a + * slice of the first chunk, then compares it with the first byte of + * incoming data. + * + * @returns if the request is ok to continue as-is + */ + private async ensureUploadingSameObject() { + // A queue for the upstream data + const upstreamQueue = this.upstreamIterator( + 16, + true // we just want one chunk for this validation + ); + + const upstreamChunk = await upstreamQueue.next(); + const chunk = upstreamChunk.value + ? upstreamChunk.value.chunk + : Buffer.alloc(0); + + // Put the original chunk back into the buffer as we just wanted to 'peek' + // at the stream for validation. + this.unshiftChunkBuffer(chunk); + + let cachedFirstChunk = this.get('firstChunk'); + const firstChunk = chunk.valueOf(); + + if (!cachedFirstChunk) { + // This is a new upload. Cache the first chunk. + this.set({uri: this.uri, firstChunk}); + } else { + // this continues an upload in progress. check if the bytes are the same + cachedFirstChunk = Buffer.from(cachedFirstChunk); + const nextChunk = Buffer.from(firstChunk); + if (Buffer.compare(cachedFirstChunk, nextChunk) !== 0) { + // this data is not the same. 
start a new upload + this.restart(); + return false; + } + } + + return true; + } + + private async getAndSetOffset() { + const opts: GaxiosOptions = { + method: 'PUT', + url: this.uri!, + headers: {'Content-Length': 0, 'Content-Range': 'bytes */*'}, + }; + try { + const resp = await this.makeRequest(opts); + if (resp.status === RESUMABLE_INCOMPLETE_STATUS_CODE) { + if (resp.headers.range) { + const range = resp.headers.range as string; + this.offset = Number(range.split('-')[1]) + 1; + return; + } + } + this.offset = 0; + } catch (e) { + const err = e as GaxiosError; + const resp = err.response; + // we don't return a 404 to the user if they provided the resumable + // URI. if we're just using the configstore file to tell us that this + // file exists, and it turns out that it doesn't (the 404), that's + // probably stale config data. + if ( + resp && + resp.status === NOT_FOUND_STATUS_CODE && + !this.uriProvidedManually + ) { + this.restart(); + return; + } + + // this resumable upload is unrecoverable (bad data or service error). + // - + // https://github.com/googleapis/gcs-resumable-upload/issues/15 + // - + // https://github.com/googleapis/gcs-resumable-upload/pull/16#discussion_r80363774 + if (resp && resp.status === TERMINATED_UPLOAD_STATUS_CODE) { + this.restart(); + return; + } + + this.destroy(err); + } + } + + private async makeRequest(reqOpts: GaxiosOptions): GaxiosPromise { + if (this.encryption) { + reqOpts.headers = reqOpts.headers || {}; + reqOpts.headers['x-goog-encryption-algorithm'] = 'AES256'; + reqOpts.headers['x-goog-encryption-key'] = this.encryption.key.toString(); + reqOpts.headers['x-goog-encryption-key-sha256'] = + this.encryption.hash.toString(); + } + + if (this.userProject) { + reqOpts.params = reqOpts.params || {}; + reqOpts.params.userProject = this.userProject; + } + // Let gaxios know we will handle a 308 error code ourselves. + reqOpts.validateStatus = (status: number) => { + return ( + this.isSuccessfulResponse(status) || + status === RESUMABLE_INCOMPLETE_STATUS_CODE + ); + }; + + const combinedReqOpts = extend( + true, + {}, + this.customRequestOptions, + reqOpts + ); + const res = await this.authClient.request<{error?: object}>( + combinedReqOpts + ); + if (res.data && res.data.error) { + throw res.data.error; + } + return res; + } + + private async makeRequestStream(reqOpts: GaxiosOptions): GaxiosPromise { + const controller = new AbortController(); + const errorCallback = () => controller.abort(); + this.once('error', errorCallback); + + if (this.userProject) { + reqOpts.params = reqOpts.params || {}; + reqOpts.params.userProject = this.userProject; + } + reqOpts.signal = controller.signal; + reqOpts.validateStatus = () => true; + + const combinedReqOpts = extend( + true, + {}, + this.customRequestOptions, + reqOpts + ); + const res = await this.authClient.request(combinedReqOpts); + this.onResponse(res); + this.removeListener('error', errorCallback); + + return res; + } + + private restart() { + if (this.numBytesWritten) { + let message = + 'Attempting to restart an upload after unrecoverable bytes have been written from upstream. '; + message += 'Stopping as this could result in data loss. 
'; + message += 'Create a new upload object to continue.'; + + this.emit('error', new RangeError(message)); + return; + } + + this.lastChunkSent = Buffer.alloc(0); + this.deleteConfig(); + this.createURI((err, uri) => { + if (err) { + return this.destroy(err); + } + this.set({uri}); + this.startUploading(); + }); + } + + private get(prop: string) { + const store = this.configStore.get(this.cacheKey); + return store && store[prop]; + } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + private set(props: any) { + this.configStore.set(this.cacheKey, props); + } + + deleteConfig() { + this.configStore.delete(this.cacheKey); + } + + /** + * @return {bool} is the request good? + */ + private onResponse(resp: GaxiosResponse) { + if ( + (this.retryableErrorFn && + this.retryableErrorFn({ + code: resp.status, + message: resp.statusText, + name: resp.statusText, + })) || + resp.status === NOT_FOUND_STATUS_CODE || + this.isServerErrorResponse(resp.status) + ) { + this.attemptDelayedRetry(resp); + return false; + } + + this.emit('response', resp); + return true; + } + + /** + * @param resp GaxiosResponse object from previous attempt + */ + private attemptDelayedRetry(resp: GaxiosResponse) { + if (this.numRetries < this.retryLimit) { + if ( + resp.status === NOT_FOUND_STATUS_CODE && + this.numChunksReadInRequest === 0 + ) { + this.startUploading(); + } else { + const retryDelay = this.getRetryDelay(); + + if (retryDelay <= 0) { + this.destroy( + new Error(`Retry total time limit exceeded - ${resp.data}`) + ); + return; + } + + // Unshift the most recent chunk back in case it's needed for the next + // request. + this.numBytesWritten -= this.lastChunkSent.byteLength; + this.unshiftChunkBuffer(this.lastChunkSent); + this.lastChunkSent = Buffer.alloc(0); + + // We don't know how much data has been received by the server. + // `continueUploading` will recheck the offset via `getAndSetOffset`. + // If `offset` < `numberBytesReceived` then we will raise a RangeError + // as we've streamed too much data that has been missed - this should + // not be the case for multi-chunk uploads as `lastChunkSent` is the + // body of the entire request. + this.offset = undefined; + + setTimeout(this.continueUploading.bind(this), retryDelay); + } + this.numRetries++; + } else { + this.destroy(new Error('Retry limit exceeded - ' + resp.data)); + } + } + + /** + * @returns {number} the amount of time to wait before retrying the request + */ + private getRetryDelay(): number { + const randomMs = Math.round(Math.random() * 1000); + const waitTime = + Math.pow(this.retryDelayMultiplier, this.numRetries) * 1000 + randomMs; + const maxAllowableDelayMs = + this.maxRetryTotalTimeout * 1000 - (Date.now() - this.timeOfFirstRequest); + const maxRetryDelayMs = this.maxRetryDelay * 1000; + + return Math.min(waitTime, maxRetryDelayMs, maxAllowableDelayMs); + } + + /* + * Prepare user-defined API endpoint for compatibility with our API. 
+ */ + private sanitizeEndpoint(url: string) { + if (!PROTOCOL_REGEX.test(url)) { + url = `https://${url}`; + } + return url.replace(/\/+$/, ''); // Remove trailing slashes + } + + /** + * Check if a given status code is 2xx + * + * @param status The status code to check + * @returns if the status is 2xx + */ + public isSuccessfulResponse(status: number): boolean { + return status >= 200 && status < 300; + } + + /** + * Check if a given status code is 5xx + * + * @param status The status code to check + * @returns if the status is 5xx + */ + public isServerErrorResponse(status: number): boolean { + return status >= 500 && status < 600; + } +} + +export function upload(cfg: UploadConfig) { + return new Upload(cfg); +} + +export function createURI(cfg: UploadConfig): Promise; +export function createURI(cfg: UploadConfig, callback: CreateUriCallback): void; +export function createURI( + cfg: UploadConfig, + callback?: CreateUriCallback +): void | Promise { + const up = new Upload(cfg); + if (!callback) { + return up.createURI(); + } + up.createURI().then(r => callback(null, r), callback); +} diff --git a/system-test/kitchen.ts b/system-test/kitchen.ts new file mode 100644 index 000000000..90a463d8e --- /dev/null +++ b/system-test/kitchen.ts @@ -0,0 +1,191 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import * as assert from 'assert'; +import {describe, it, beforeEach} from 'mocha'; +import * as fs from 'fs'; +import * as path from 'path'; +import * as tmp from 'tmp'; +import * as crypto from 'crypto'; +import * as os from 'os'; +import {Readable} from 'stream'; +import {createURI, ErrorWithCode, upload} from '../src/gcs-resumable-upload'; + +const bucketName = process.env.BUCKET_NAME || 'gcs-resumable-upload-test'; +tmp.setGracefulCleanup(); +const tmpFileContents = crypto.randomBytes(1024 * 1024 * 20); +const filePath = path.join(os.tmpdir(), '20MB.zip'); +const writeStream = fs.createWriteStream(filePath); +writeStream.write(tmpFileContents); +writeStream.close(); + +async function delay(title: string, retries: number, done: Function) { + if (retries === 0) return done(); // no retry on the first failure. 
+ // see: https://cloud.google.com/storage/docs/exponential-backoff: + const ms = Math.pow(2, retries) * 1000 + Math.random() * 2000; + console.info(`retrying "${title}" in ${ms}ms`); + setTimeout(done(), ms); +} + +describe('gcs-resumable-upload', () => { + beforeEach(() => { + upload({bucket: bucketName, file: filePath}).deleteConfig(); + }); + + it('should work', done => { + let uploadSucceeded = false; + fs.createReadStream(filePath) + .on('error', done) + .pipe( + upload({ + bucket: bucketName, + file: filePath, + metadata: {contentType: 'image/jpg'}, + }) + ) + .on('error', done) + .on('response', resp => { + uploadSucceeded = resp.status === 200; + }) + .on('finish', () => { + assert.strictEqual(uploadSucceeded, true); + done(); + }); + }); + + let retries = 0; + it('should resume an interrupted upload', function (done) { + this.retries(3); + delay(this.test!.title, retries, () => { + retries++; + // If we've retried, delay. + fs.stat(filePath, (err, fd) => { + assert.ifError(err); + + const size = fd.size; + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + type DoUploadCallback = (...args: any[]) => void; + const doUpload = ( + opts: {interrupt?: boolean}, + callback: DoUploadCallback + ) => { + let sizeStreamed = 0; + let destroyed = false; + + const ws = upload({ + bucket: bucketName, + file: filePath, + metadata: {contentType: 'image/jpg'}, + }); + + fs.createReadStream(filePath) + .on('error', callback) + .on('data', function (this: Readable, chunk) { + sizeStreamed += chunk.length; + + if (!destroyed && opts.interrupt && sizeStreamed >= size / 2) { + // stop sending data half way through + destroyed = true; + this.destroy(); + process.nextTick(() => ws.destroy(new Error('Interrupted'))); + } + }) + .pipe(ws) + .on('error', callback) + .on('metadata', callback.bind(null, null)); + }; + + doUpload({interrupt: true}, (err: Error) => { + assert.strictEqual(err.message, 'Interrupted'); + + doUpload( + {interrupt: false}, + (err: Error, metadata: {size: number}) => { + assert.ifError(err); + assert.strictEqual(metadata.size, size); + assert.strictEqual(typeof metadata.size, 'number'); + done(); + } + ); + }); + }); + }); + }); + + it('should just make an upload URI', done => { + createURI( + { + bucket: bucketName, + file: filePath, + metadata: {contentType: 'image/jpg'}, + }, + done + ); + }); + + it('should return a non-resumable failed upload', done => { + const metadata = { + metadata: {largeString: 'a'.repeat(2.1e6)}, + }; + + fs.createReadStream(filePath) + .on('error', done) + .pipe( + upload({ + bucket: bucketName, + file: filePath, + metadata, + }) + ) + .on('error', (err: ErrorWithCode) => { + assert.strictEqual(err.code, '400'); + done(); + }); + }); + + it('should set custom config file', done => { + const uploadOptions = { + bucket: bucketName, + file: filePath, + metadata: {contentType: 'image/jpg'}, + configPath: path.join( + os.tmpdir(), + `test-gcs-resumable-${Date.now()}.json` + ), + }; + let uploadSucceeded = false; + + fs.createReadStream(filePath) + .on('error', done) + .pipe(upload(uploadOptions)) + .on('error', done) + .on('response', resp => { + uploadSucceeded = resp.status === 200; + }) + .on('finish', () => { + assert.strictEqual(uploadSucceeded, true); + + const configData = JSON.parse( + fs.readFileSync(uploadOptions.configPath, 'utf8') + ); + const keyName = `${uploadOptions.bucket}/${uploadOptions.file}`.replace( + path.extname(filePath), + '' + ); + assert.ok(Object.keys(configData).includes(keyName)); + done(); + }); + }); +}); diff 
--git a/test/file.ts b/test/file.ts index 8d0e298da..96a60d22a 100644 --- a/test/file.ts +++ b/test/file.ts @@ -32,7 +32,7 @@ import * as fs from 'fs'; import * as os from 'os'; import * as path from 'path'; import * as proxyquire from 'proxyquire'; -import * as resumableUpload from 'gcs-resumable-upload'; +import * as resumableUpload from '../src/gcs-resumable-upload'; import * as sinon from 'sinon'; import * as tmp from 'tmp'; import * as zlib from 'zlib'; @@ -210,7 +210,7 @@ describe('File', () => { }, '@google-cloud/promisify': fakePromisify, fs: fakeFs, - 'gcs-resumable-upload': fakeResumableUpload, + '../src/gcs-resumable-upload': fakeResumableUpload, 'hash-stream-validation': fakeHashStreamValidation, os: fakeOs, './signer': fakeSigner, diff --git a/test/fixtures/keys.json b/test/fixtures/keys.json new file mode 100644 index 000000000..073edb707 --- /dev/null +++ b/test/fixtures/keys.json @@ -0,0 +1,12 @@ +{ + "type": "service_account", + "project_id": "project-id", + "private_key_id": "12345", + "private_key": "-----BEGIN PRIVATE KEY-----\nMIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQC5z21IhrvlHBj7\nifRhobA9ibn25Od7DpE5OauGmqy7B+A9LQOsk1ZujAFdHItnBPcjihSVHpiYxf1a\nLpFbM8z/hRvDvYS3Hs1pyRejmpGiznoOjCyUf6Wv3T1xKelbgn0twHHjqD1o0xzW\njyUILl7yuCbsAf8QlsV6ewS3IqO3i5A9RNHfKjeap8e6A7U3s9QBtR58RrxaMQpM\nz72gw7yOdJRfElkerQfyZTbtu/EBfE6CcskOyoMoRN3YgkjQqLMr1yVdL5/phEcQ\n5hpbDN5lrafGHN7FUtsrMge2iIuIYFfWQUTqu7HtnNXVmwj1LJNq5WeI1iInWaGz\nb7c1rUT9AgMBAAECggEAEB0FicqVX3L7qk9LsBkeItgKFnfB/eaaKsTuM7K/fqCv\njjPpzlIgprQ20g+i+dYbuytC9Fjo5tFp/SNuBji3Ha7kuil56Yoe9NOJh0M6P0zP\nQj+0W1Rj1p0bB5tDhLoLh6eEDjgNde+zioUeCFhCck4MogmHnbVVfspNnba/99oD\nl36heAioqj/KODdkbQ83+ByiH+3BzqblqJ4TR/3y0wOUXtlQvCHko1qximJFIM0z\n3TNoPiit74hTiFFOYfJyHpmRsiEJ5FUUImkmCJz2gk4fbpafKrgxxOMo1m7GqlsE\nE+ybHxyAq61HYbZOoUOO8B4md1/52QXP7DgPvV7JyQKBgQD+JS5nsR4TXRl61c9G\nNxoPW9yCMCoarIjkpyPmhh0uJ7y68cj9wHFgX6ATi1QuTnG9BzJ4z27PMgvv70N+\nAK6k74sdIT2ts8wYsD8H0UyuxDxeKiAnb2JW2f5GTcXNmELQi6rKkMNMoS8jv00d\ngzLCV7UbCbdf+ng9uRPs+Fvk9wKBgQC7KpNaeYFf5dmIYRWQhlZWBRoftdm1ROH/\n5GJsURkzlEjUH1g1y9eAigBn5I+Z9hylX2q1vHLpUHqONWwDz8oQ1L1o2iLz+tkp\nkNoaLSAb9uCl6t8tpqCG2dqUrxOmy1+xj3G8KI8XuYb+IwVSy6KK2df8fWN4d+i0\ng+TBb75MqwKBgEezwcXriKq554hqblJHFYkjx7DLWfWwm+a26UAOsojlGTA9KxG8\ni8A++nDJLHTsGNbWAv1muMKoQgntnUMdeih6lOshB7/MLFcC0qWn/VSJdOa0R+IY\nYMxUMJMxOg9pV+BypzsDYLZr+1rAjEc5TsbZ6/S25w+jIO15HBANeg+9AoGAZulz\nGkVDCLq2UJGpLM1gvW2Svqrb6RrV9UDbiVlSNRUssk4Fz5akiM3YiUeYWfyEJb4A\nS6sxt+4DZRwkpzfikDyZZQTEQUjFjWBTPB9hz16AiVpKmqxLCbrRv/1AHe8nT9di\nnyXiABaIDkatT6geWKCNbQx43C16a382EdJiXX8CgYEAqyAS2xuDi2+uoljRm1Bp\naz7Q2UBtBbcBr/CQmagEacWPXsSyCL6EySOH0e985k7ABZiW+AzWlOwKS5WMWAIb\nkncmxP0SU6WQDWl8xGbXAQ8Dw+HTu5G1n0vrl1rRO5FPwRs3pbV94ML+d5eoai6D\njHs1asOGIpdQ3OGpBpNRub0=\n-----END PRIVATE KEY-----\n", + "client_email": "some-email@example.com", + "client_id": "12345", + "auth_uri": "https://accounts.google.com/o/oauth2/auth", + "token_uri": "https://accounts.google.com/o/oauth2/token", + "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", + "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/el-gato%40el-gato.iam.gserviceaccount.com" +} diff --git a/test/gcs-resumable-upload.ts b/test/gcs-resumable-upload.ts new file mode 100644 index 000000000..f22480121 --- /dev/null +++ b/test/gcs-resumable-upload.ts @@ -0,0 +1,2489 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import * as assert from 'assert'; +import {describe, it, beforeEach, before, afterEach, after} from 'mocha'; +import * as crypto from 'crypto'; +import * as mockery from 'mockery'; +import * as nock from 'nock'; +import * as path from 'path'; +import * as sinon from 'sinon'; +import {PassThrough, Readable} from 'stream'; + +import { + ApiError, + CreateUriCallback, + PROTOCOL_REGEX, +} from '../src/gcs-resumable-upload/index'; +import {GaxiosOptions, GaxiosError, GaxiosResponse} from 'gaxios'; + +nock.disableNetConnect(); + +class AbortController { + aborted = false; + signal = this; + abort() { + this.aborted = true; + } +} + +let configData = {} as {[index: string]: {}}; +class ConfigStore { + constructor(packageName: string, defaults: object, config: object) { + this.set('packageName', packageName); + this.set('config', config); + } + delete(key: string) { + delete configData[key]; + } + get(key: string) { + return configData[key]; + } + set(key: string, value: {}) { + configData[key] = value; + } +} + +const RESUMABLE_INCOMPLETE_STATUS_CODE = 308; +/** 256 KiB */ +const CHUNK_SIZE_MULTIPLE = 2 ** 18; +const queryPath = '/?userProject=user-project-id'; + +function mockAuthorizeRequest( + code = 200, + data: {} | string = { + access_token: 'abc123', + } +) { + return nock('https://www.googleapis.com') + .post('/oauth2/v4/token') + .reply(code, data); +} + +describe('gcs-resumable-upload', () => { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + let upload: any; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + let up: any; + + const BUCKET = 'bucket-name'; + const CUSTOM_REQUEST_OPTIONS = {headers: {'X-My-Header': 'My custom value'}}; + const FILE = 'file-name'; + const GENERATION = Date.now(); + const METADATA = {contentLength: 1024, contentType: 'application/json'}; + const ORIGIN = '*'; + const PARAMS = {ifMetagenerationNotMatch: 3}; + const PREDEFINED_ACL = 'authenticatedRead'; + const USER_PROJECT = 'user-project-id'; + const API_ENDPOINT = 'https://fake.googleapis.com'; + const BASE_URI = `${API_ENDPOINT}/upload/storage/v1/b`; + let REQ_OPTS: GaxiosOptions; + const keyFile = path.join(__dirname, '../../test/fixtures/keys.json'); + + before(() => { + mockery.registerMock('abort-controller', {default: AbortController}); + mockery.registerMock('configstore', ConfigStore); + mockery.enable({useCleanCache: true, warnOnUnregistered: false}); + upload = require('../src/gcs-resumable-upload').upload; + }); + + beforeEach(() => { + configData = {}; + REQ_OPTS = {url: 'http://fake.local'}; + up = upload({ + bucket: BUCKET, + file: FILE, + customRequestOptions: CUSTOM_REQUEST_OPTIONS, + generation: GENERATION, + metadata: METADATA, + origin: ORIGIN, + params: PARAMS, + predefinedAcl: PREDEFINED_ACL, + userProject: USER_PROJECT, + authConfig: {keyFile}, + apiEndpoint: API_ENDPOINT, + }); + }); + + afterEach(() => { + nock.cleanAll(); + }); + + after(() => { + mockery.deregisterAll(); + mockery.disable(); + }); + + describe('ctor', () => { + it('should throw if a bucket or file is not given', () => { + assert.throws(() => { + upload(); + }, 
/A bucket and file name are required/); + }); + + it('should localize the bucket', () => { + assert.strictEqual(up.bucket, BUCKET); + }); + + it('should localize the cacheKey', () => { + assert.strictEqual(up.cacheKey, [BUCKET, FILE, GENERATION].join('/')); + }); + + it('should localize customRequestOptions', () => { + assert.strictEqual(up.customRequestOptions, CUSTOM_REQUEST_OPTIONS); + }); + + it('should default customRequestOptions to empty object', () => { + const up = upload({bucket: BUCKET, file: FILE}); + assert.deepStrictEqual(up.customRequestOptions, {}); + }); + + it('should include ZERO generation value in the cacheKey', () => { + const upWithZeroGeneration = upload({ + bucket: BUCKET, + file: FILE, + generation: 0, + metadata: METADATA, + origin: ORIGIN, + predefinedAcl: PREDEFINED_ACL, + userProject: USER_PROJECT, + authConfig: {keyFile}, + apiEndpoint: API_ENDPOINT, + }); + assert.strictEqual( + upWithZeroGeneration.cacheKey, + [BUCKET, FILE, 0].join('/') + ); + }); + + it('should not include a generation in the cacheKey if it was not set', () => { + const up = upload({ + bucket: BUCKET, + file: FILE, + }); + + assert.strictEqual(up.cacheKey, [BUCKET, FILE].join('/')); + }); + + it('should localize the file', () => { + assert.strictEqual(up.file, FILE); + }); + + it('should localize the generation', () => { + assert.strictEqual(up.generation, GENERATION); + }); + + it('should localize the apiEndpoint', () => { + assert.strictEqual(up.apiEndpoint, API_ENDPOINT); + assert.strictEqual(up.baseURI, BASE_URI); + }); + + it('should prepend https:// to apiEndpoint if not present', () => { + const up = upload({ + bucket: BUCKET, + file: FILE, + apiEndpoint: 'fake.googleapis.com', + }); + assert.strictEqual(up.apiEndpoint, API_ENDPOINT); + assert.strictEqual(up.baseURI, BASE_URI); + }); + + it('should localize the KMS key name', () => { + const kmsKeyName = 'kms-key-name'; + const up = upload({bucket: 'BUCKET', file: FILE, kmsKeyName}); + assert.strictEqual(up.kmsKeyName, kmsKeyName); + }); + + it('should localize metadata or default to empty object', () => { + assert.strictEqual(up.metadata, METADATA); + + const upWithoutMetadata = upload({bucket: BUCKET, file: FILE}); + assert.deepStrictEqual(upWithoutMetadata.metadata, {}); + }); + + it('should set the offset if it is provided', () => { + const offset = 10; + const up = upload({bucket: BUCKET, file: FILE, offset}); + + assert.strictEqual(up.offset, offset); + }); + + it('should localize the origin', () => { + assert.strictEqual(up.origin, ORIGIN); + }); + + it('should localize the params', () => { + assert.strictEqual(up.params, PARAMS); + }); + + it('should localize userProject', () => { + assert.strictEqual(up.userProject, USER_PROJECT); + }); + + it('should localize an encryption object from a key', () => { + const key = crypto.randomBytes(32); + const up = upload({bucket: BUCKET, file: FILE, key}); + const expectedKey = key.toString('base64'); + const expectedHash = crypto + .createHash('sha256') + .update(key) + .digest('base64'); + assert.deepStrictEqual(up.encryption, { + key: expectedKey, + hash: expectedHash, + }); + }); + + it('should localize the predefinedAcl', () => { + assert.strictEqual(up.predefinedAcl, PREDEFINED_ACL); + }); + + it('should set the predefinedAcl with public: true', () => { + const up = upload({bucket: BUCKET, file: FILE, public: true}); + assert.strictEqual(up.predefinedAcl, 'publicRead'); + }); + + it('should set the predefinedAcl with private: true', () => { + const up = upload({bucket: BUCKET, 
file: FILE, private: true}); + assert.strictEqual(up.predefinedAcl, 'private'); + }); + + it('should create a ConfigStore instance', () => { + assert.strictEqual(configData.packageName, 'gcs-resumable-upload'); + }); + + it('should set the configPath', () => { + const configPath = '/custom/config/path'; + // eslint-disable-next-line @typescript-eslint/no-unused-vars + const up = upload({bucket: BUCKET, file: FILE, configPath}); + assert.deepStrictEqual(configData.config, {configPath}); + }); + + it('should set numBytesWritten to 0', () => { + assert.strictEqual(up.numBytesWritten, 0); + }); + + it('should set numRetries to 0', () => { + assert.strictEqual(up.numRetries, 0); + }); + + it('should set the contentLength if provided', () => { + const up = upload({ + bucket: BUCKET, + file: FILE, + metadata: {contentLength: METADATA.contentLength}, + }); + assert.strictEqual(up.contentLength, METADATA.contentLength); + }); + + it('should default the contentLength to *', () => { + const up = upload({bucket: BUCKET, file: FILE}); + assert.strictEqual(up.contentLength, '*'); + }); + + it('should localize the uri or get one from config', () => { + const uri = 'http://www.blah.com/'; + const upWithUri = upload({bucket: BUCKET, file: FILE, uri}); + assert.strictEqual(upWithUri.uriProvidedManually, true); + assert.strictEqual(upWithUri.uri, uri); + + configData[`${BUCKET}/${FILE}`] = {uri: 'fake-uri'}; + const up = upload({bucket: BUCKET, file: FILE}); + assert.strictEqual(up.uriProvidedManually, false); + assert.strictEqual(up.uri, 'fake-uri'); + }); + + it('should not have `chunkSize` by default', () => { + const up = upload({bucket: BUCKET, file: FILE}); + assert.strictEqual(up.chunkSize, undefined); + }); + + it('should accept and set `chunkSize`', () => { + const up = upload({bucket: BUCKET, file: FILE, chunkSize: 123}); + assert.strictEqual(up.chunkSize, 123); + }); + + it('should set `upstreamEnded` to `true` on `prefinish`', () => { + const up = upload({bucket: BUCKET, file: FILE, chunkSize: 123}); + + assert.strictEqual(up.upstreamEnded, false); + + up.emit('prefinish'); + + assert.strictEqual(up.upstreamEnded, true); + }); + + describe('on write', () => { + let uri = ''; + + beforeEach(() => { + uri = 'uri'; + }); + + it("should emit 'writing' when piped", done => { + let read = false; + const upstreamBuffer = new Readable({ + read() { + if (!read) { + this.push(Buffer.alloc(1)); + read = true; + } + }, + }); + + up.createURI = () => {}; + up.once('writing', () => { + upstreamBuffer.push(null); + done(); + }); + upstreamBuffer.pipe(up); + }); + + it("should emit 'finished' after 'prepareFinish'", async () => { + const upstreamBuffer = new Readable({ + read() { + this.push(null); + }, + }); + + up.createURI = () => {}; + await new Promise(resolve => { + up.once('writing', resolve); + upstreamBuffer.pipe(up); + }); + + assert(up.upstream.readable); + + await new Promise(resolve => { + up.once('finish', resolve); + up.emit('prepareFinish'); + }); + }); + + it('should continue uploading', done => { + up.uri = uri; + up.continueUploading = done; + up.emit('writing'); + }); + + it('should create an upload', done => { + up.startUploading = done; + up.createURI = (callback: CreateUriCallback) => { + callback(null); + }; + up.emit('writing'); + }); + + it('should destroy the stream from an error', done => { + const error: ApiError = { + message: ':(', + name: ':(', + code: 123, + }; + up.destroy = (err: ApiError) => { + assert(err.message.indexOf(error.message) > -1); + 
assert(err.name.indexOf(error.name) > -1); + assert.strictEqual(err.code, 123); + done(); + }; + up.createURI = (callback: CreateUriCallback) => { + callback(error); + }; + up.emit('writing'); + }); + + it('should save the uri to config on first write event', done => { + const uri = 'http://newly-created-uri'; + up.createURI = (callback: CreateUriCallback) => { + callback(null, uri); + }; + up.set = (props: {}) => { + assert.deepStrictEqual(props, {uri}); + done(); + }; + up.emit('writing'); + }); + }); + }); + + describe('#upstream', () => { + beforeEach(() => { + up.createURI = () => {}; + }); + + it('should write to `writeToChunkBuffer`', done => { + up.on('wroteToChunkBuffer', () => { + assert.equal(up.upstreamChunkBuffer.byteLength, 16); + assert.equal(up.chunkBufferEncoding, 'buffer'); + done(); + }); + + up.write(Buffer.alloc(16)); + }); + + it("should setup a 'prepareFinish' handler", done => { + assert.equal(up.eventNames().includes('prepareFinish'), false); + + up.on('wroteToChunkBuffer', () => { + assert.equal(up.eventNames().includes('prepareFinish'), true); + done(); + }); + + up.write(Buffer.alloc(16)); + }); + + it("should finish only after 'prepareFinish' is emitted", done => { + const upstreamBuffer = new Readable({ + read() { + this.push(Buffer.alloc(1)); + this.push(null); + }, + }); + + // Readable has ended + upstreamBuffer.on('end', () => { + // The data has been written to the buffer + up.on('wroteToChunkBuffer', () => { + // Allow the writer's callback be called immediately + up.emit('readFromChunkBuffer'); + + // setting up the listener now to prove it hasn't been fired before + up.on('finish', done); + up.emit('prepareFinish'); + }); + }); + + upstreamBuffer.pipe(up); + }); + }); + + describe('#writeToChunkBuffer', () => { + it('should append buffer to existing `upstreamChunkBuffer`', () => { + up.upstreamChunkBuffer = Buffer.from('abc'); + up.writeToChunkBuffer(Buffer.from('def'), 'buffer', () => {}); + + assert.equal( + Buffer.compare(up.upstreamChunkBuffer, Buffer.from('abcdef')), + 0 + ); + }); + + it('should convert string with encoding to Buffer and append to existing `upstreamChunkBuffer`', () => { + const sample = '🦃'; + + assert.equal(up.chunkBufferEncoding, undefined); + up.writeToChunkBuffer(sample, 'utf-8', () => {}); + + assert(Buffer.isBuffer(up.upstreamChunkBuffer)); + assert.equal(up.upstreamChunkBuffer.toString(), sample); + assert.equal(up.chunkBufferEncoding, 'utf-8'); + }); + + it("should callback on 'readFromChunkBuffer'", done => { + up.writeToChunkBuffer('sample', 'utf-8', done); + up.emit('readFromChunkBuffer'); + }); + + it("should emit 'wroteToChunkBuffer' asynchronously", done => { + up.writeToChunkBuffer('sample', 'utf-8', () => {}); + + // setting this here proves it's async + up.on('wroteToChunkBuffer', done); + }); + }); + + describe('#unshiftChunkBuffer', () => { + it('should synchronously prepend to existing buffer', () => { + up.upstreamChunkBuffer = Buffer.from('456'); + + up.unshiftChunkBuffer(Buffer.from('123')); + assert.equal( + Buffer.compare(up.upstreamChunkBuffer, Buffer.from('123456')), + 0 + ); + }); + }); + + describe('#pullFromChunkBuffer', () => { + it('should retrieve from the beginning of the `upstreamChunkBuffer`', () => { + up.upstreamChunkBuffer = Buffer.from('ab'); + + const chunk = up.pullFromChunkBuffer(1); + assert.equal(chunk.toString(), 'a'); + assert.equal(up.upstreamChunkBuffer.toString(), 'b'); + }); + + it('should retrieve no more than the limit provided', () => { + up.upstreamChunkBuffer = 
Buffer.from('0123456789'); + + const chunk = up.pullFromChunkBuffer(4); + assert.equal(chunk.toString(), '0123'); + assert.equal(up.upstreamChunkBuffer.toString(), '456789'); + }); + + it('should retrieve less than the limit if no more data is available', () => { + up.upstreamChunkBuffer = Buffer.from('0123456789'); + + const chunk = up.pullFromChunkBuffer(512); + assert.equal(chunk.toString(), '0123456789'); + assert.equal(up.upstreamChunkBuffer.toString(), ''); + }); + + it('should return all data if `Infinity` is provided', () => { + up.upstreamChunkBuffer = Buffer.from('0123456789'); + const chunk = up.pullFromChunkBuffer(Infinity); + assert.equal(chunk.toString(), '0123456789'); + assert.equal(up.upstreamChunkBuffer.toString(), ''); + }); + + it("should emit 'readFromChunkBuffer' asynchronously", done => { + up.pullFromChunkBuffer(0); + + // setting this here proves it's async + up.on('readFromChunkBuffer', done); + }); + }); + + describe('#waitForNextChunk', () => { + it('should resolve `true` asynchronously if `upstreamChunkBuffer.byteLength` has data', async () => { + up.upstreamChunkBuffer = Buffer.from('ab'); + + assert(await up.waitForNextChunk()); + }); + + it('should resolve `false` asynchronously if `upstreamEnded`', async () => { + up.upstreamEnded = true; + + assert.equal(await up.waitForNextChunk(), false); + }); + + it('should resolve `true` asynchronously if `upstreamChunkBuffer.byteLength` and `upstreamEnded`', async () => { + up.upstreamChunkBuffer = Buffer.from('ab'); + up.upstreamEnded = true; + + assert(await up.waitForNextChunk()); + }); + + it('should wait for `wroteToChunkBuffer` if !`upstreamChunkBuffer.byteLength` && !`upstreamEnded`', async () => { + const result = await new Promise(resolve => { + up.waitForNextChunk().then(resolve); + up.emit('wroteToChunkBuffer'); + }); + + assert(result); + }); + + it("should wait for upstream to 'finish' if !`upstreamChunkBuffer.byteLength` && !`upstreamEnded`", async () => { + await new Promise(resolve => { + up.waitForNextChunk().then(resolve); + up.upstream.emit('finish'); + }); + }); + + it("should wait for upstream to 'finish' and resolve `false` if data is not available", async () => { + const result = await new Promise(resolve => { + up.waitForNextChunk().then(resolve); + up.upstream.emit('finish'); + }); + + assert.equal(result, false); + }); + + it("should wait for upstream to 'finish' and resolve `true` if data is available", async () => { + const result = await new Promise(resolve => { + up.upstream.on('newListener', (event: string) => { + if (event === 'finish') { + // Update the `upstreamChunkBuffer` before emitting 'finish' + up.upstreamChunkBuffer = Buffer.from('abc'); + + process.nextTick(() => up.upstream.emit('finish')); + } + }); + + up.waitForNextChunk().then(resolve); + }); + + assert.equal(result, true); + }); + + it("should wait for 'prefinish' if !`upstreamChunkBuffer.byteLength` && !`upstreamEnded`", async () => { + await new Promise(resolve => { + up.waitForNextChunk().then(resolve); + up.emit('prefinish'); + }); + }); + + it("should wait for 'prefinish' and resolve `false` if data is not available", async () => { + const result = await new Promise(resolve => { + up.waitForNextChunk().then(resolve); + up.emit('prefinish'); + }); + + assert.equal(result, false); + }); + + it("should wait for 'prefinish' and resolve `true` if data is available", async () => { + const result = await new Promise(resolve => { + up.on('newListener', (event: string) => { + if (event === 'prefinish') { + // Update the 
`upstreamChunkBuffer` before emitting 'prefinish' + up.upstreamChunkBuffer = Buffer.from('abc'); + + process.nextTick(() => up.emit('prefinish')); + } + }); + + up.waitForNextChunk().then(resolve); + }); + + assert.equal(result, true); + }); + + it('should remove listeners after calling back from `wroteToChunkBuffer`', async () => { + assert.equal(up.listenerCount('finish'), 0); + assert.equal(up.listenerCount('wroteToChunkBuffer'), 0); + assert.equal(up.listenerCount('prefinish'), 1); + + await new Promise(resolve => { + up.on('newListener', (event: string) => { + if (event === 'wroteToChunkBuffer') { + process.nextTick(() => up.emit('wroteToChunkBuffer')); + } + }); + + up.waitForNextChunk().then(resolve); + }); + + assert.equal(up.listenerCount('finish'), 0); + assert.equal(up.listenerCount('wroteToChunkBuffer'), 0); + assert.equal(up.listenerCount('prefinish'), 1); + }); + + it("should remove listeners after calling back from upstream to 'finish'", async () => { + assert.equal(up.listenerCount('finish'), 0); + assert.equal(up.listenerCount('wroteToChunkBuffer'), 0); + assert.equal(up.listenerCount('prefinish'), 1); + + await new Promise(resolve => { + up.upstream.on('newListener', (event: string) => { + if (event === 'finish') { + process.nextTick(() => up.upstream.emit('finish')); + } + }); + + up.waitForNextChunk().then(resolve); + }); + + assert.equal(up.listenerCount('finish'), 0); + assert.equal(up.listenerCount('wroteToChunkBuffer'), 0); + assert.equal(up.listenerCount('prefinish'), 1); + }); + + it("should remove listeners after calling back from 'prefinish'", async () => { + assert.equal(up.listenerCount('finish'), 0); + assert.equal(up.listenerCount('wroteToChunkBuffer'), 0); + assert.equal(up.listenerCount('prefinish'), 1); + + await new Promise(resolve => { + up.on('newListener', (event: string) => { + if (event === 'prefinish') { + process.nextTick(() => up.emit('prefinish')); + } + }); + + up.waitForNextChunk().then(resolve); + }); + + assert.equal(up.listenerCount('finish'), 0); + assert.equal(up.listenerCount('wroteToChunkBuffer'), 0); + assert.equal(up.listenerCount('prefinish'), 1); + }); + }); + + describe('#upstreamIterator', () => { + it('should yield all data from upstream by default', done => { + up.upstreamChunkBuffer = Buffer.alloc(1); + up.pullFromChunkBuffer = (limit: number) => { + assert.equal(limit, Infinity); + done(); + }; + + const iterator = up.upstreamIterator(); + iterator.next(); + }); + + it('should yield up to limit if provided', async () => { + up.upstreamChunkBuffer = Buffer.alloc(16); + + let data = Buffer.alloc(0); + + for await (const {chunk} of up.upstreamIterator(8)) { + data = Buffer.concat([data, chunk]); + } + + assert.equal(data.byteLength, 8); + }); + + it("should yield less than the limit if that's all that's available", async () => { + up.upstreamChunkBuffer = Buffer.alloc(8); + up.upstreamEnded = true; + + let data = Buffer.alloc(0); + + for await (const {chunk} of up.upstreamIterator(16)) { + data = Buffer.concat([data, chunk]); + } + + assert.equal(data.byteLength, 8); + }); + + it('should yield many, arbitrarily sized chunks by default', async () => { + up.waitForNextChunk = () => true; + up.pullFromChunkBuffer = () => Buffer.from('a'); + + let data = Buffer.alloc(0); + let count = 0; + + for await (const {chunk} of up.upstreamIterator(16)) { + data = Buffer.concat([data, chunk]); + count++; + } + + assert.equal(data.toString(), 'a'.repeat(16)); + assert.equal(count, 16); + }); + + it('should yield one single chunk if 
`oneChunkMode`', async () => { + up.waitForNextChunk = () => true; + up.pullFromChunkBuffer = () => Buffer.from('b'); + + let data = Buffer.alloc(0); + let count = 0; + + for await (const {chunk} of up.upstreamIterator(16, true)) { + data = Buffer.concat([data, chunk]); + count++; + } + + assert.equal(data.toString(), 'b'.repeat(16)); + assert.equal(count, 1); + }); + }); + + describe('#createURI', () => { + it('should make the correct request', done => { + up.makeRequest = async (reqOpts: GaxiosOptions) => { + assert.strictEqual(reqOpts.method, 'POST'); + assert.strictEqual(reqOpts.url, `${BASE_URI}/${BUCKET}/o`); + assert.deepStrictEqual(reqOpts.params, { + predefinedAcl: up.predefinedAcl, + name: FILE, + uploadType: 'resumable', + ifGenerationMatch: GENERATION, + ifMetagenerationNotMatch: PARAMS.ifMetagenerationNotMatch, + }); + assert.strictEqual(reqOpts.data, up.metadata); + done(); + return {headers: {location: '/foo'}}; + }; + up.createURI(); + }); + + it('should pass through the KMS key name', done => { + const kmsKeyName = 'kms-key-name'; + const up = upload({bucket: BUCKET, file: FILE, kmsKeyName}); + + up.makeRequest = async (reqOpts: GaxiosOptions) => { + assert.strictEqual(reqOpts.params.kmsKeyName, kmsKeyName); + done(); + return {headers: {location: '/foo'}}; + }; + + up.createURI(); + }); + + it('should respect 0 as a generation', done => { + up.makeRequest = async (reqOpts: GaxiosOptions) => { + assert.strictEqual(reqOpts.params.ifGenerationMatch, 0); + done(); + return {headers: {location: '/foo'}}; + }; + up.generation = 0; + up.createURI(); + }); + + describe('error', () => { + const error = new Error(':('); + + beforeEach(() => { + up.makeRequest = async () => { + throw error; + }; + }); + + it('should exec callback with error', done => { + up.createURI((err: Error) => { + assert.strictEqual(err, error); + done(); + }); + }); + }); + + describe('success', () => { + const URI = 'uri'; + const RESP = {headers: {location: URI}}; + + beforeEach(() => { + up.makeRequest = async () => { + return RESP; + }; + }); + + it('should localize the uri', done => { + up.createURI((err: Error) => { + assert.ifError(err); + assert.strictEqual(up.uri, URI); + assert.strictEqual(up.offset, 0); + done(); + }); + }); + + it('should default the offset to 0', done => { + up.createURI((err: Error) => { + assert.ifError(err); + assert.strictEqual(up.offset, 0); + done(); + }); + }); + + it('should exec callback with URI', done => { + up.createURI((err: Error, uri: string) => { + assert.ifError(err); + assert.strictEqual(uri, URI); + done(); + }); + }); + }); + }); + + describe('#continueUploading', () => { + it('should start uploading if an offset was set', done => { + up.offset = 0; + up.startUploading = async () => { + done(); + }; + up.continueUploading(); + }); + + it('should get and set offset if no offset was set', done => { + up.getAndSetOffset = async () => { + done(); + }; + up.startUploading = () => Promise.resolve(); + up.continueUploading(); + }); + + it('should start uploading when done', done => { + up.startUploading = async function () { + assert.strictEqual(this, up); + done(); + }; + up.getAndSetOffset = () => Promise.resolve(); + up.continueUploading(); + }); + }); + + describe('#startUploading', () => { + beforeEach(() => { + up.makeRequestStream = async () => new PassThrough(); + up.upstreamChunkBuffer = Buffer.alloc(16); + }); + + it('should reset `numChunksReadInRequest` to 0', async () => { + up.numChunksReadInRequest = 1; + + await up.startUploading(); + + 
assert.equal(up.numChunksReadInRequest, 0); + }); + + it('should set `offset` to 0 when not set', async () => { + assert.equal(up.offset, undefined); + + await up.startUploading(); + + assert.equal(up.offset, 0); + }); + + it('should emit error if `offset` < `numBytesWritten`', done => { + up.numBytesWritten = 1; + + up.on('error', (error: Error) => { + assert(error instanceof RangeError); + assert( + /The offset is lower than the number of bytes written/.test( + error.message + ) + ); + done(); + }); + + up.startUploading(); + }); + + it("should 'fast-forward' upstream if `numBytesWritten` < `offset`", async () => { + up.upstreamChunkBuffer = Buffer.alloc(24); + + up.offset = 9; + up.numBytesWritten = 1; + + await up.startUploading(); + + // Should fast-forward (9-1) bytes + assert.equal(up.offset, 9); + assert.equal(up.numBytesWritten, 9); + assert.equal(up.upstreamChunkBuffer.byteLength, 16); + }); + + it('should emit a progress event with the bytes written', done => { + up.upstreamChunkBuffer = Buffer.alloc(24); + up.upstreamEnded = true; + up.contentLength = 24; + + up.on( + 'progress', + (data: {bytesWritten: number; contentLength: number}) => { + assert.equal(data.bytesWritten, 24); + assert.equal(data.contentLength, 24); + + done(); + } + ); + + up.makeRequestStream = async (reqOpts: GaxiosOptions) => { + reqOpts.body.on('data', () => {}); + }; + + up.startUploading(); + }); + + it("should setup a 'response' listener", async () => { + assert.equal(up.eventNames().includes('response'), false); + + await up.startUploading(); + + assert.equal(up.eventNames().includes('response'), true); + }); + + it('should destroy the stream if the request failed', done => { + const error = new Error('Error.'); + up.on('error', (e: Error) => { + assert.strictEqual(e, error); + done(); + }); + + up.makeRequestStream = async () => { + throw error; + }; + up.startUploading(); + }); + + describe('request preparation', () => { + // a convenient handle for getting the request options + let reqOpts: GaxiosOptions; + + async function getAllDataFromRequest() { + let payload = Buffer.alloc(0); + + await new Promise(resolve => { + reqOpts.body.on('data', (data: Buffer) => { + payload = Buffer.concat([payload, data]); + }); + + reqOpts.body.on('end', () => { + resolve(payload); + }); + }); + + return payload; + } + + beforeEach(() => { + reqOpts = {}; + up.makeRequestStream = async (requestOptions: GaxiosOptions) => { + assert.equal(requestOptions.method, 'PUT'); + assert.equal(requestOptions.url, up.uri); + assert.equal(typeof requestOptions.headers, 'object'); + assert(requestOptions.body instanceof Readable); + + reqOpts = requestOptions; + }; + up.upstreamChunkBuffer = Buffer.alloc(512); + up.upstreamEnded = true; + }); + + describe('single chunk', () => { + it('should use `contentLength` and `offset` if set', async () => { + const OFFSET = 100; + const CONTENT_LENGTH = 123; + + up.offset = OFFSET; + up.contentLength = CONTENT_LENGTH; + + await up.startUploading(); + + assert.deepEqual(reqOpts.headers, { + 'Content-Range': `bytes ${OFFSET}-*/${CONTENT_LENGTH}`, + }); + + const data = await getAllDataFromRequest(); + + assert.equal(data.byteLength, 23); + }); + + it('should prepare a valid request if `contentLength` is unknown', async () => { + up.contentLength = '*'; + + await up.startUploading(); + + assert.deepEqual(reqOpts.headers, { + 'Content-Range': 'bytes 0-*/*', + }); + + const data = await getAllDataFromRequest(); + + assert.equal(data.byteLength, 512); + }); + }); + + describe('multiple chunk', () 
=> { + const CHUNK_SIZE = 256; + + beforeEach(() => { + up.chunkSize = CHUNK_SIZE; + }); + + it('should use `chunkSize` if less than `contentLength`', async () => { + const OFFSET = 100; + const CONTENT_LENGTH = 512; + + up.offset = OFFSET; + up.contentLength = CONTENT_LENGTH; + + await up.startUploading(); + + const endByte = OFFSET + CHUNK_SIZE - 1; + assert.deepEqual(reqOpts.headers, { + 'Content-Length': CHUNK_SIZE, + 'Content-Range': `bytes ${OFFSET}-${endByte}/${CONTENT_LENGTH}`, + }); + + const data = await getAllDataFromRequest(); + + assert.equal(data.byteLength, CHUNK_SIZE); + }); + + it('should prepare a valid request if `contentLength` is unknown', async () => { + const OFFSET = 100; + + up.offset = OFFSET; + up.contentLength = '*'; + + await up.startUploading(); + + const endByte = OFFSET + CHUNK_SIZE - 1; + assert.deepEqual(reqOpts.headers, { + 'Content-Length': CHUNK_SIZE, + 'Content-Range': `bytes ${OFFSET}-${endByte}/*`, + }); + + const data = await getAllDataFromRequest(); + + assert.equal(data.byteLength, CHUNK_SIZE); + }); + + it('should prepare a valid request if the remaining data is less than `chunkSize`', async () => { + const NUM_BYTES_WRITTEN = 400; + const OFFSET = NUM_BYTES_WRITTEN; + const CONTENT_LENGTH = 512; + + up.offset = OFFSET; + up.numBytesWritten = NUM_BYTES_WRITTEN; + up.contentLength = CONTENT_LENGTH; + + await up.startUploading(); + + const endByte = CONTENT_LENGTH - NUM_BYTES_WRITTEN + OFFSET - 1; + assert.deepEqual(reqOpts.headers, { + 'Content-Length': CONTENT_LENGTH - NUM_BYTES_WRITTEN, + 'Content-Range': `bytes ${OFFSET}-${endByte}/${CONTENT_LENGTH}`, + }); + const data = await getAllDataFromRequest(); + + assert.equal(data.byteLength, CONTENT_LENGTH - NUM_BYTES_WRITTEN); + }); + }); + }); + }); + + describe('#responseHandler', () => { + it('should emit the metadata', done => { + const BODY = {hi: 1}; + const RESP = {data: BODY, status: 200}; + up.on('metadata', (body: {}) => { + assert.strictEqual(body, BODY); + done(); + }); + + up.responseHandler(RESP); + }); + + it('should return response data size as number', done => { + const metadata = { + size: '0', + }; + const RESP = {data: metadata, status: 200}; + up.on('metadata', (data: {size: number}) => { + assert.strictEqual(Number(metadata.size), data.size); + assert.strictEqual(typeof data.size, 'number'); + done(); + }); + + up.responseHandler(RESP); + }); + + it('should destroy the stream if an error occurred', done => { + const RESP = {data: {error: new Error('Error.')}}; + up.on('metadata', done); + // metadata shouldn't be emitted... will blow up test if called + up.destroy = (err: Error) => { + assert.strictEqual(err, RESP.data.error); + done(); + }; + up.responseHandler(RESP); + }); + + it('should destroy the stream if the status code is out of range', done => { + const RESP = {data: {}, status: 300}; + up.on('metadata', done); + // metadata shouldn't be emitted... 
will blow up test if called + up.destroy = (err: Error) => { + assert.strictEqual(err.message, 'Upload failed'); + done(); + }; + up.responseHandler(RESP); + }); + + it('should delete the config', done => { + const RESP = {data: '', status: 200}; + up.deleteConfig = done; + up.responseHandler(RESP); + }); + + it('should emit `prepareFinish` when request succeeds', done => { + const RESP = {data: '', status: 200}; + up.once('prepareFinish', done); + + up.responseHandler(RESP); + }); + + it('should continue with multi-chunk upload when incomplete', done => { + const lastByteReceived = 9; + + const RESP = { + data: '', + status: RESUMABLE_INCOMPLETE_STATUS_CODE, + headers: { + range: `bytes=0-${lastByteReceived}`, + }, + }; + + up.chunkSize = 1; + + up.continueUploading = () => { + assert.equal(up.offset, lastByteReceived + 1); + + done(); + }; + + up.responseHandler(RESP); + }); + + it('should unshift missing data if server did not receive the entire chunk', done => { + const NUM_BYTES_WRITTEN = 20; + const LAST_CHUNK_LENGTH = 256; + const UPSTREAM_BUFFER_LENGTH = 1024; + const lastByteReceived = 9; + const expectedUnshiftAmount = NUM_BYTES_WRITTEN - lastByteReceived - 1; + + const RESP = { + data: '', + status: RESUMABLE_INCOMPLETE_STATUS_CODE, + headers: { + range: `bytes=0-${lastByteReceived}`, + }, + }; + + up.chunkSize = 256; + up.numBytesWritten = NUM_BYTES_WRITTEN; + up.upstreamChunkBuffer = Buffer.alloc(UPSTREAM_BUFFER_LENGTH, 'b'); + + up.lastChunkSent = Buffer.concat([ + Buffer.alloc(LAST_CHUNK_LENGTH, 'c'), + // different to ensure this is the data that's prepended + Buffer.alloc(expectedUnshiftAmount, 'a'), + ]); + + up.continueUploading = () => { + assert.equal(up.offset, lastByteReceived + 1); + assert.equal( + up.upstreamChunkBuffer.byteLength, + UPSTREAM_BUFFER_LENGTH + expectedUnshiftAmount + ); + assert.equal( + up.upstreamChunkBuffer.slice(0, expectedUnshiftAmount).toString(), + 'a'.repeat(expectedUnshiftAmount) + ); + + // we should discard part of the last chunk, as we know what the server + // has at this point. 
+ assert.equal(up.lastChunkSent.byteLength, 0);
+
+ done();
+ };
+
+ up.responseHandler(RESP);
+ });
+ });
+
+ describe('#ensureUploadingSameObject', () => {
+ let chunk = Buffer.alloc(0);
+
+ beforeEach(() => {
+ chunk = crypto.randomBytes(512);
+ up.upstreamChunkBuffer = chunk;
+ });
+
+ it('should not alter the chunk buffer', async () => {
+ await up.ensureUploadingSameObject();
+
+ assert.equal(Buffer.compare(up.upstreamChunkBuffer, chunk), 0);
+ });
+
+ describe('first write', () => {
+ it('should get the first chunk', async () => {
+ let calledGet = false;
+ up.get = (prop: string) => {
+ assert.strictEqual(prop, 'firstChunk');
+ calledGet = true;
+ };
+
+ const result = await up.ensureUploadingSameObject();
+
+ assert(result);
+ assert(calledGet);
+ });
+
+ describe('new upload', () => {
+ it("should save the uri and first chunk (16 bytes) if it's not cached", done => {
+ const URI = 'uri';
+ up.uri = URI;
+ up.get = () => {};
+ up.set = (props: {uri?: string; firstChunk: Buffer}) => {
+ const firstChunk = chunk.slice(0, 16);
+ assert.deepStrictEqual(props.uri, URI);
+ assert.strictEqual(Buffer.compare(props.firstChunk, firstChunk), 0);
+ done();
+ };
+ up.ensureUploadingSameObject();
+ });
+ });
+
+ describe('continued upload', () => {
+ beforeEach(() => {
+ up.restart = () => {};
+ });
+
+ it('should not `#restart` and return `true` if cache is the same', async () => {
+ up.upstreamChunkBuffer = Buffer.alloc(512, 'a');
+ up.get = (param: string) => {
+ return param === 'firstChunk' ? Buffer.alloc(16, 'a') : undefined;
+ };
+
+ let calledRestart = false;
+ up.restart = () => {
+ calledRestart = true;
+ };
+
+ const result = await up.ensureUploadingSameObject();
+
+ assert(result);
+ assert.equal(calledRestart, false);
+ });
+
+ it('should `#restart` and return `false` if different', async () => {
+ up.upstreamChunkBuffer = Buffer.alloc(512, 'a');
+ up.get = (param: string) => {
+ return param === 'firstChunk' ?
Buffer.alloc(16, 'b') : undefined; + }; + + let calledRestart = false; + up.restart = () => { + calledRestart = true; + }; + + const result = await up.ensureUploadingSameObject(); + + assert(calledRestart); + assert.equal(result, false); + }); + }); + }); + }); + + describe('#getAndSetOffset', () => { + const RANGE = 123456; + const RESP = {status: 308, headers: {range: `range-${RANGE}`}}; + + it('should make the correct request', done => { + const URI = 'uri'; + up.uri = URI; + up.makeRequest = async (reqOpts: GaxiosOptions) => { + assert.strictEqual(reqOpts.method, 'PUT'); + assert.strictEqual(reqOpts.url, URI); + assert.deepStrictEqual(reqOpts.headers, { + 'Content-Length': 0, + 'Content-Range': 'bytes */*', + }); + done(); + return {}; + }; + up.getAndSetOffset(); + }); + + describe('restart on 404', () => { + const RESP = {status: 404} as GaxiosResponse; + const ERROR = new Error(':(') as GaxiosError; + ERROR.response = RESP; + + beforeEach(() => { + up.makeRequest = async () => { + throw ERROR; + }; + }); + + it('should restart the upload', done => { + up.restart = done; + up.getAndSetOffset(); + }); + + it('should not restart if URI provided manually', done => { + up.uriProvidedManually = true; + up.restart = done; // will cause test to fail + up.on('error', (err: Error) => { + assert.strictEqual(err, ERROR); + done(); + }); + up.getAndSetOffset(); + }); + }); + + describe('restart on 410', () => { + const ERROR = new Error(':(') as GaxiosError; + const RESP = {status: 410} as GaxiosResponse; + ERROR.response = RESP; + + beforeEach(() => { + up.makeRequest = async () => { + throw ERROR; + }; + }); + + it('should restart the upload', done => { + up.restart = done; + up.getAndSetOffset(); + }); + }); + + it('should set the offset from the range', async () => { + up.makeRequest = async () => RESP; + await up.getAndSetOffset(); + assert.strictEqual(up.offset, RANGE + 1); + }); + + it('should set the offset to 0 if no range is back from the API', async () => { + up.makeRequest = async () => { + return {}; + }; + await up.getAndSetOffset(); + assert.strictEqual(up.offset, 0); + }); + }); + + describe('#makeRequest', () => { + it('should set encryption headers', async () => { + const key = crypto.randomBytes(32); + const up = upload({ + bucket: 'BUCKET', + file: FILE, + key, + authConfig: {keyFile}, + }); + const scopes = [ + mockAuthorizeRequest(), + nock(REQ_OPTS.url!).get('/').reply(200, {}), + ]; + const res = await up.makeRequest(REQ_OPTS); + scopes.forEach(x => x.done()); + const headers = res.config.headers; + assert.strictEqual(headers['x-goog-encryption-algorithm'], 'AES256'); + assert.strictEqual(headers['x-goog-encryption-key'], up.encryption.key); + assert.strictEqual( + headers['x-goog-encryption-key-sha256'], + up.encryption.hash + ); + }); + + it('should set userProject', async () => { + const scopes = [ + mockAuthorizeRequest(), + nock(REQ_OPTS.url!).get(queryPath).reply(200, {}), + ]; + const res: GaxiosResponse = await up.makeRequest(REQ_OPTS); + assert.strictEqual(res.config.url, REQ_OPTS.url + queryPath.slice(1)); + scopes.forEach(x => x.done()); + }); + + it('should set validate status', done => { + up.authClient = { + request: (reqOpts: GaxiosOptions) => { + assert.strictEqual(reqOpts.validateStatus!(100), false); + assert.strictEqual(reqOpts.validateStatus!(199), false); + assert.strictEqual(reqOpts.validateStatus!(300), false); + assert.strictEqual(reqOpts.validateStatus!(400), false); + assert.strictEqual(reqOpts.validateStatus!(500), false); + + 
assert.strictEqual(reqOpts.validateStatus!(200), true); + assert.strictEqual(reqOpts.validateStatus!(299), true); + assert.strictEqual(reqOpts.validateStatus!(308), true); + + done(); + + return {}; + }, + }; + up.makeRequest(REQ_OPTS); + }); + + it('should make the correct request', async () => { + const scopes = [ + mockAuthorizeRequest(), + nock(REQ_OPTS.url!).get(queryPath).reply(200, undefined, {}), + ]; + const res = await up.makeRequest(REQ_OPTS); + scopes.forEach(x => x.done()); + assert.strictEqual(res.config.url, REQ_OPTS.url + queryPath.slice(1)); + assert.deepStrictEqual(res.headers, {}); + }); + + it('should bypass authentication if emulator context detected', async () => { + up = upload({ + bucket: BUCKET, + file: FILE, + customRequestOptions: CUSTOM_REQUEST_OPTIONS, + generation: GENERATION, + metadata: METADATA, + origin: ORIGIN, + params: PARAMS, + predefinedAcl: PREDEFINED_ACL, + userProject: USER_PROJECT, + authConfig: {keyFile}, + apiEndpoint: 'https://fake.endpoint.com', + }); + const scopes = [ + nock(REQ_OPTS.url!).get(queryPath).reply(200, undefined, {}), + ]; + const res = await up.makeRequest(REQ_OPTS); + scopes.forEach(x => x.done()); + assert.strictEqual(res.config.url, REQ_OPTS.url + queryPath.slice(1)); + assert.deepStrictEqual(res.headers, {}); + }); + + it('should combine customRequestOptions', done => { + const up = upload({ + bucket: BUCKET, + file: FILE, + customRequestOptions: { + headers: { + 'X-My-Header': 'My custom value', + }, + }, + }); + mockAuthorizeRequest(); + up.authClient = { + request: (reqOpts: GaxiosOptions) => { + const customHeader = + reqOpts.headers && reqOpts.headers['X-My-Header']; + assert.strictEqual(customHeader, 'My custom value'); + setImmediate(done); + return {}; + }, + }; + up.makeRequest(REQ_OPTS); + }); + + it('should execute the callback with a body error & response', async () => { + const error = new GaxiosError('Error message', {}, { + config: {}, + data: {}, + status: 500, + statusText: 'sad trombone', + headers: {}, + } as GaxiosResponse); + mockAuthorizeRequest(); + const scope = nock(REQ_OPTS.url!).get(queryPath).reply(500, {error}); + await assert.rejects(up.makeRequest(REQ_OPTS), (err: GaxiosError) => { + scope.done(); + assert.strictEqual(err.code, '500'); + return true; + }); + }); + + it('should execute the callback with a body error & response for non-2xx status codes', async () => { + const error = new GaxiosError('Error message', {}, { + config: {}, + data: {}, + status: 500, + statusText: 'sad trombone', + headers: {}, + } as GaxiosResponse); + mockAuthorizeRequest(); + const scope = nock(REQ_OPTS.url!).get(queryPath).reply(500, {error}); + await assert.rejects(up.makeRequest(REQ_OPTS), (err: GaxiosError) => { + scope.done(); + assert.deepStrictEqual(err.code, '500'); + return true; + }); + }); + + it('should execute the callback', async () => { + const data = {red: 'tape'}; + mockAuthorizeRequest(); + up.onResponse = () => true; + const scope = nock(REQ_OPTS.url!).get(queryPath).reply(200, data); + const res = await up.makeRequest(REQ_OPTS); + scope.done(); + assert.strictEqual(res.status, 200); + assert.deepStrictEqual(res.data, data); + }); + }); + + describe('#makeRequestStream', () => { + beforeEach(() => { + up.authClient = {request: () => {}}; + up.onResponse = () => {}; + }); + + it('should pass a signal from the abort controller', done => { + up.authClient = { + request: (reqOpts: GaxiosOptions) => { + assert(reqOpts.signal instanceof AbortController); + done(); + }, + }; + 
up.makeRequestStream(REQ_OPTS); + }); + + it('should abort on an error', done => { + up.on('error', () => {}); + + let abortController: AbortController; + up.authClient = { + request: (reqOpts: GaxiosOptions) => { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + abortController = reqOpts.signal as any; + }, + }; + + up.makeRequestStream(REQ_OPTS); + up.emit('error', new Error('Error.')); + + setImmediate(() => { + assert.strictEqual(abortController.aborted, true); + done(); + }); + }); + + it('should set userProject', done => { + up.userProject = 'user-project'; + up.authClient = { + request: (reqOpts: GaxiosOptions) => { + assert.deepStrictEqual(reqOpts.params, {userProject: 'user-project'}); + done(); + }, + }; + up.makeRequestStream(REQ_OPTS); + }); + + it('should not remove existing params when userProject is set', done => { + REQ_OPTS.params = {a: 'b', c: 'd'}; + up.userProject = 'user-project'; + up.authClient = { + request: (reqOpts: GaxiosOptions) => { + assert.deepStrictEqual(reqOpts.params, { + userProject: 'user-project', + a: 'b', + c: 'd', + }); + done(); + }, + }; + up.makeRequestStream(REQ_OPTS); + }); + + it('should always validate the status', done => { + up.authClient = { + request: (reqOpts: GaxiosOptions) => { + assert.strictEqual(reqOpts.validateStatus!(0), true); + done(); + }, + }; + up.makeRequestStream(REQ_OPTS); + }); + + it('should combine customRequestOptions', done => { + const up = upload({ + bucket: BUCKET, + file: FILE, + customRequestOptions: { + headers: { + 'X-My-Header': 'My custom value', + }, + }, + }); + mockAuthorizeRequest(); + up.authClient = { + request: (reqOpts: GaxiosOptions) => { + const customHeader = + reqOpts.headers && reqOpts.headers['X-My-Header']; + assert.strictEqual(customHeader, 'My custom value'); + setImmediate(done); + return {}; + }, + }; + up.makeRequestStream(REQ_OPTS); + }); + + it('should pass the response to the handler', done => { + const response = {}; + up.authClient = { + request: async () => response, + }; + up.onResponse = (res: GaxiosResponse) => { + assert.strictEqual(res, response); + done(); + }; + up.makeRequestStream(REQ_OPTS); + }); + + it('should return the response', async () => { + const response = {}; + up.authClient = { + request: async () => response, + }; + const stream = await up.makeRequestStream(REQ_OPTS); + assert.strictEqual(stream, response); + }); + }); + + describe('#restart', () => { + beforeEach(() => { + up.createURI = () => {}; + }); + + it('should throw if `numBytesWritten` is not 0', done => { + up.numBytesWritten = 8; + + up.on('error', (error: Error) => { + assert(error instanceof RangeError); + assert( + /Attempting to restart an upload after unrecoverable bytes have been written/.test( + error.message + ) + ); + done(); + }); + + up.restart(); + }); + + it('should delete the config', done => { + up.deleteConfig = done; + up.restart(); + }); + + describe('starting a new upload', () => { + it('should create a new URI', done => { + up.createURI = () => { + done(); + }; + + up.restart(); + }); + + it('should destroy stream if it cannot create a URI', done => { + const error = new Error(':('); + + up.createURI = (callback: Function) => { + callback(error); + }; + + up.destroy = (err: Error) => { + assert.strictEqual(err, error); + done(); + }; + + up.restart(); + }); + + it('should save the uri to config when restarting', done => { + const uri = 'http://newly-created-uri'; + + up.createURI = (callback: Function) => { + callback(null, uri); + }; + + up.set = (props: {}) => { 
+ assert.deepStrictEqual(props, {uri}); + done(); + }; + + up.restart(); + }); + + it('should start uploading', done => { + up.createURI = (callback: Function) => { + up.startUploading = done; + callback(); + }; + up.restart(); + }); + }); + }); + + describe('#get', () => { + it('should return the value from the config store', () => { + const prop = 'property'; + const value = 'abc'; + up.configStore = { + get(name: string) { + assert.strictEqual(name, up.cacheKey); + const obj: {[i: string]: string} = {}; + obj[prop] = value; + return obj; + }, + }; + assert.strictEqual(up.get(prop), value); + }); + }); + + describe('#set', () => { + it('should set the value to the config store', done => { + const props = {setting: true}; + up.configStore = { + set(name: string, prps: {}) { + assert.strictEqual(name, up.cacheKey); + assert.strictEqual(prps, props); + done(); + }, + }; + up.set(props); + }); + }); + + describe('#deleteConfig', () => { + it('should delete the entry from the config store', done => { + const props = {setting: true}; + + up.configStore = { + delete(name: string) { + assert.strictEqual(name, up.cacheKey); + done(); + }, + }; + + up.deleteConfig(props); + }); + }); + + describe('#onResponse', () => { + beforeEach(() => { + up.numRetries = 0; + up.startUploading = () => {}; + up.continueUploading = () => {}; + }); + + describe('404', () => { + const RESP = {status: 404, data: 'error message from server'}; + + it('should increase the retry count if less than limit', () => { + assert.strictEqual(up.numRetries, 0); + assert.strictEqual(up.onResponse(RESP), false); + assert.strictEqual(up.numRetries, 1); + }); + + it('should destroy the stream if gte limit', done => { + up.destroy = (err: Error) => { + assert.strictEqual( + err.message, + `Retry limit exceeded - ${RESP.data}` + ); + done(); + }; + + up.onResponse(RESP); + up.onResponse(RESP); + up.onResponse(RESP); + up.onResponse(RESP); + up.onResponse(RESP); + up.onResponse(RESP); + }); + + it('should start an upload', done => { + up.startUploading = done; + up.onResponse(RESP); + }); + }); + + describe('500s', () => { + const RESP = {status: 500, data: 'error message from server'}; + + it('should increase the retry count if less than limit', () => { + up.getRetryDelay = () => 1; + assert.strictEqual(up.numRetries, 0); + assert.strictEqual(up.onResponse(RESP), false); + assert.strictEqual(up.numRetries, 1); + }); + + it('should destroy the stream if greater than limit', done => { + up.getRetryDelay = () => 1; + up.destroy = (err: Error) => { + assert.strictEqual( + err.message, + `Retry limit exceeded - ${RESP.data}` + ); + done(); + }; + + up.onResponse(RESP); + up.onResponse(RESP); + up.onResponse(RESP); + up.onResponse(RESP); + up.onResponse(RESP); + up.onResponse(RESP); + }); + + describe('exponential back off', () => { + let clock: sinon.SinonFakeTimers; + let setTimeoutSpy: sinon.SinonSpy; + beforeEach(() => { + clock = sinon.useFakeTimers({toFake: ['setTimeout']}); + setTimeoutSpy = sinon.spy(global, 'setTimeout'); + }); + afterEach(() => { + clock.restore(); + }); + + it('should continue uploading after retry count^2 * random', done => { + up.continueUploading = function () { + assert.strictEqual(this, up); + + const minTime = Math.pow(2, up.numRetries - 1) * 1000; + const maxTime = minTime + 1000; + + const delay = setTimeoutSpy.lastCall.args[1]; + assert(delay >= minTime); + assert(delay <= maxTime); + + // make it keep retrying until the limit is reached + up.onResponse(RESP); + }; + + up.on('error', (err: Error) => { + 
assert.strictEqual(up.numRetries, 5); + assert.strictEqual( + err.message, + `Retry limit exceeded - ${RESP.data}` + ); + done(); + }); + + up.onResponse(RESP); + clock.runAll(); + }); + }); + }); + + describe('all others', () => { + const RESP = {status: 200}; + + it('should emit the response on the stream', done => { + up.getRetryDelay = () => 1; + up.on('response', (resp: {}) => { + assert.strictEqual(resp, RESP); + done(); + }); + up.onResponse(RESP); + }); + + it('should return true', () => { + up.getRetryDelay = () => 1; + assert.strictEqual(up.onResponse(RESP), true); + }); + + it('should handle a custom status code when passed a retry function', () => { + up.getRetryDelay = () => 1; + const RESP = {status: 1000}; + const customHandlerFunction = (err: ApiError) => { + return err.code === 1000; + }; + up.retryableErrorFn = customHandlerFunction; + + assert.strictEqual(up.onResponse(RESP), false); + }); + }); + }); + + describe('#attemptDelayedRetry', () => { + beforeEach(() => { + up.startUploading = () => {}; + up.continueUploading = () => {}; + up.getRetryDelay = () => 1; + }); + + it('should increment numRetries', () => { + assert.equal(up.numRetries, 0); + + up.attemptDelayedRetry({}); + + assert.equal(up.numRetries, 1); + }); + + it('should call `startUploading` on 404 && !this.numChunksReadInRequest', done => { + up.startUploading = done; + up.continueUploading = () => done('wanted `startUploading`'); + + up.attemptDelayedRetry({status: 404}); + }); + + it('should not call `startUploading` when on 404 && this.numChunksReadInRequest != 0', done => { + up.startUploading = () => done('wanted `continueUploading`'); + up.continueUploading = done; + + up.numChunksReadInRequest = 1; + up.attemptDelayedRetry({status: 404}); + }); + + it('should not call `startUploading` when !this.numChunksReadInRequest && status != 404', done => { + up.startUploading = () => done('wanted `continueUploading`'); + up.continueUploading = done; + + up.attemptDelayedRetry({status: 400}); + }); + + it('should call `getRetryDelay` when not calling `startUploading`', done => { + up.startUploading = () => done('wanted `continueUploading`'); + up.getRetryDelay = () => { + process.nextTick(done); + return 1; + }; + + up.attemptDelayedRetry({}); + }); + + it('should unshift last buffer, unset `offset`, and call `continueUploading` when not calling `startUploading`', done => { + up.startUploading = () => done('wanted `continueUploading`'); + up.continueUploading = () => { + assert.equal(up.numBytesWritten, 4); + assert.equal(up.lastChunkSent.byteLength, 0); + assert.equal( + up.upstreamChunkBuffer.toString(), + 'a'.repeat(12) + 'b'.repeat(10) + ); + assert.equal(up.offset, undefined); + + done(); + }; + + up.numBytesWritten = 16; + up.lastChunkSent = Buffer.alloc(12, 'a'); + up.upstreamChunkBuffer = Buffer.alloc(10, 'b'); + up.offset = 16; + + up.attemptDelayedRetry({}); + }); + + it('should destroy if retry total time limit exceeded (0)', done => { + up.getRetryDelay = () => 0; + up.on('error', (error: Error) => { + assert(error.message.match(/Retry total time limit exceeded/)); + done(); + }); + + up.attemptDelayedRetry({}); + }); + + it('should destroy if retry total time limit exceeded (< 0)', done => { + up.getRetryDelay = () => -123; + up.on('error', (error: Error) => { + assert(error.message.match(/Retry total time limit exceeded/)); + done(); + }); + + up.attemptDelayedRetry({}); + }); + + it('should destroy the object if this.numRetries > this.retryLimit', done => { + up.startUploading = () => 
done("shouldn't have called this"); + up.continueUploading = () => done("shouldn't have called this"); + up.getRetryDelay = () => done("shouldn't have called this"); + + up.on('error', (error: Error) => { + assert(error.message.match(/Retry limit exceeded/)); + done(); + }); + + up.numRetries = 4; + up.retryLimit = 3; + + up.attemptDelayedRetry({}); + }); + + it('should destroy the object if this.numRetries === this.retryLimit', done => { + up.startUploading = () => done("shouldn't have called this"); + up.continueUploading = () => done("shouldn't have called this"); + up.getRetryDelay = () => done("shouldn't have called this"); + + up.on('error', (error: Error) => { + assert(error.message.match(/Retry limit exceeded/)); + done(); + }); + + up.numRetries = 3; + up.retryLimit = 3; + + up.attemptDelayedRetry({}); + }); + }); + + describe('PROTOCOL_REGEX', () => { + it('should match a protocol', () => { + const urls = [ + {input: 'http://www.hi.com', match: 'http'}, + {input: 'mysite://www.hi.com', match: 'mysite'}, + {input: 'www.hi.com', match: null}, + ]; + + for (const url of urls) { + assert.strictEqual( + url.input.match(PROTOCOL_REGEX) && + url.input.match(PROTOCOL_REGEX)![1], + url.match + ); + } + }); + }); + + describe('#sanitizeEndpoint', () => { + const USER_DEFINED_SHORT_API_ENDPOINT = 'myapi.com:8080'; + const USER_DEFINED_PROTOCOL = 'myproto'; + const USER_DEFINED_FULL_API_ENDPOINT = `${USER_DEFINED_PROTOCOL}://myapi.com:8080`; + + it('should default protocol to https', () => { + const endpoint = up.sanitizeEndpoint(USER_DEFINED_SHORT_API_ENDPOINT); + assert.strictEqual(endpoint.match(PROTOCOL_REGEX)![1], 'https'); + }); + + it('should not override protocol', () => { + const endpoint = up.sanitizeEndpoint(USER_DEFINED_FULL_API_ENDPOINT); + assert.strictEqual( + endpoint.match(PROTOCOL_REGEX)![1], + USER_DEFINED_PROTOCOL + ); + }); + + it('should remove trailing slashes from URL', () => { + const endpointsWithTrailingSlashes = [ + `${USER_DEFINED_FULL_API_ENDPOINT}/`, + `${USER_DEFINED_FULL_API_ENDPOINT}//`, + ]; + for (const endpointWithTrailingSlashes of endpointsWithTrailingSlashes) { + const endpoint = up.sanitizeEndpoint(endpointWithTrailingSlashes); + assert.strictEqual(endpoint.endsWith('/'), false); + } + }); + }); + + describe('#getRetryDelay', () => { + beforeEach(() => { + up.timeOfFirstRequest = Date.now(); + }); + + it('should return exponential retry delay', () => { + const min = Math.pow(up.retryDelayMultiplier, up.numRetries) * 1000; + const max = + Math.pow(up.retryDelayMultiplier, up.numRetries) * 1000 + 1000; + const delayValue = up.getRetryDelay(); + + assert(delayValue >= min && delayValue <= max); + }); + + it('allows overriding the delay multiplier', () => { + [1, 2, 3].forEach(delayMultiplier => { + up.retryDelayMultiplier = delayMultiplier; + const min = Math.pow(up.retryDelayMultiplier, up.numRetries) * 1000; + const max = + Math.pow(up.retryDelayMultiplier, up.numRetries) * 1000 + 1000; + const delayValue = up.getRetryDelay(); + + assert(delayValue >= min && delayValue <= max); + }); + }); + + it('allows overriding the number of retries', () => { + [1, 2, 3].forEach(numRetry => { + up.numRetries = numRetry; + const min = Math.pow(up.retryDelayMultiplier, up.numRetries) * 1000; + const max = + Math.pow(up.retryDelayMultiplier, up.numRetries) * 1000 + 1000; + const delayValue = up.getRetryDelay(); + + assert(delayValue >= min && delayValue <= max); + }); + }); + + it('returns the value of maxRetryDelay when calculated values are larger', () => { + 
up.maxRetryDelay = 1;
+ const delayValue = up.getRetryDelay();
+
+ assert.strictEqual(delayValue, 1000);
+ });
+ });
+
+ describe('upload', () => {
+ describe('single chunk', () => {
+ let uri = '';
+
+ beforeEach(() => {
+ uri = 'uri';
+
+ up.contentLength = CHUNK_SIZE_MULTIPLE * 8;
+ up.createURI = (
+ callback: (error: Error | null, uri: string) => void
+ ) => {
+ up.uri = uri;
+ up.offset = 0;
+ callback(null, uri);
+ };
+ });
+
+ it('should make the correct request', done => {
+ // For additional information:
+ // - https://cloud.google.com/storage/docs/performing-resumable-uploads#single-chunk-upload
+
+ const CHUNK_SIZE = CHUNK_SIZE_MULTIPLE * 2;
+ const NON_CHUNK_SIZE_DIVISIBLE_AMOUNT = 2;
+ const CONTENT_LENGTH = CHUNK_SIZE * 8 + NON_CHUNK_SIZE_DIVISIBLE_AMOUNT;
+ const EXPECTED_NUM_REQUESTS = 1;
+
+ // We want the class to be able to handle varying chunk sizes uniformly.
+ let wrote = 0;
+ let wroteChunkLargerThanChunkSize = false;
+ let wroteChunkEqualToChunkSize = false;
+ let wroteChunkLessThanChunkSize = false;
+
+ const upstreamBuffer = new Readable({
+ read() {
+ const remainingToWrite = CONTENT_LENGTH - wrote;
+
+ if (!remainingToWrite) {
+ // signal finish
+ this.push(null);
+ } else if (remainingToWrite > CHUNK_SIZE * 3) {
+ // write large chunk
+ const LARGE_CHUNK = Buffer.alloc(CHUNK_SIZE * 2);
+
+ wrote += LARGE_CHUNK.byteLength;
+ wroteChunkLargerThanChunkSize = true;
+
+ this.push(LARGE_CHUNK);
+ } else if (remainingToWrite > CHUNK_SIZE) {
+ // write chunk-sized chunk
+ const EQUAL_CHUNK = Buffer.alloc(CHUNK_SIZE);
+
+ wrote += EQUAL_CHUNK.byteLength;
+ wroteChunkEqualToChunkSize = true;
+
+ this.push(EQUAL_CHUNK);
+ } else {
+ // write small chunk
+ const SMALL_CHUNK = Buffer.alloc(remainingToWrite);
+
+ wrote += SMALL_CHUNK.byteLength;
+ wroteChunkLessThanChunkSize = true;
+
+ this.push(SMALL_CHUNK);
+ }
+ },
+ });
+
+ const requests: {
+ dataReceived: number;
+ opts: GaxiosOptions;
+ chunkWritesInRequest: number;
+ }[] = [];
+ let overallDataReceived = 0;
+
+ up.contentLength = CONTENT_LENGTH;
+
+ up.makeRequestStream = async (opts: GaxiosOptions) => {
+ let dataReceived = 0;
+ let chunkWritesInRequest = 0;
+
+ await new Promise(resolve => {
+ opts.body.on('data', (data: Buffer) => {
+ dataReceived += data.byteLength;
+ overallDataReceived += data.byteLength;
+ chunkWritesInRequest++;
+ });
+
+ opts.body.on('end', () => {
+ requests.push({dataReceived, opts, chunkWritesInRequest});
+
+ up.emit('response', {
+ status: 200,
+ data: {},
+ });
+
+ resolve(null);
+ });
+ });
+ };
+
+ up.on('error', done);
+
+ up.on('finish', () => {
+ // Ensure the correct number of requests and data look correct
+ assert.equal(requests.length, EXPECTED_NUM_REQUESTS);
+ assert.equal(overallDataReceived, CONTENT_LENGTH);
+
+ // Make sure we wrote the desired mix of chunk sizes
+ assert(wroteChunkLargerThanChunkSize);
+ assert(wroteChunkEqualToChunkSize);
+ assert(wroteChunkLessThanChunkSize);
+
+ // Validate the single request
+ const request = requests[0];
+
+ assert.strictEqual(request.opts.method, 'PUT');
+ assert.strictEqual(request.opts.url, uri);
+
+ // We should be writing multiple chunks down the wire
+ assert(request.chunkWritesInRequest > 1);
+
+ assert.equal(request.dataReceived, CONTENT_LENGTH);
+ assert.deepStrictEqual(request.opts.headers, {
+ 'Content-Range': `bytes 0-*/${CONTENT_LENGTH}`,
+ });
+
+ done();
+ });
+
+ // init the request
+ upstreamBuffer.pipe(up);
+ });
+ });
+
+ describe('multiple chunk', () => {
+ let uri = '';
+
+ beforeEach(() => {
+ uri = 'uri';
+
+
up.chunkSize = CHUNK_SIZE_MULTIPLE; + up.contentLength = CHUNK_SIZE_MULTIPLE * 8; + up.createURI = ( + callback: (error: Error | null, uri: string) => void + ) => { + up.uri = uri; + up.offset = 0; + callback(null, uri); + }; + }); + + it('should make the correct requests', done => { + // For additional information: + // - https://cloud.google.com/storage/docs/performing-resumable-uploads#chunked-upload + // - https://cloud.google.com/storage/docs/resumable-uploads#resent-data + + const CHUNK_SIZE = CHUNK_SIZE_MULTIPLE * 2; + // This is important - we want to make sure requests + // where `CONTENT_LENGTH % CHUNK_SIZE !== 0` are fine. + const LAST_REQUEST_SIZE = 2; + const CONTENT_LENGTH = CHUNK_SIZE * 8 + LAST_REQUEST_SIZE; + const EXPECTED_NUM_REQUESTS = + Math.floor(CONTENT_LENGTH / CHUNK_SIZE) + 1; + + // We want the class to be able to handle varying chunk sizes uniformly. + let wrote = 0; + let wroteChunkLargerThanChunkSize = false; + let wroteChunkEqualToChunkSize = false; + let wroteChunkLessThanChunkSize = false; + + const upstreamBuffer = new Readable({ + read() { + const remainingToWrite = CONTENT_LENGTH - wrote; + + if (!remainingToWrite) { + // signal finish + this.push(null); + } else if (remainingToWrite > CHUNK_SIZE * 3) { + // write large chunk + const LARGE_CHUNK = Buffer.alloc(CHUNK_SIZE * 2); + + wrote += LARGE_CHUNK.byteLength; + wroteChunkLargerThanChunkSize = true; + + this.push(LARGE_CHUNK); + } else if (remainingToWrite > CHUNK_SIZE) { + // write chunk-sized chunk + const EQUAL_CHUNK = Buffer.alloc(CHUNK_SIZE); + + wrote += EQUAL_CHUNK.byteLength; + wroteChunkEqualToChunkSize = true; + + this.push(EQUAL_CHUNK); + } else { + // write small chunk + const SMALL_CHUNK = Buffer.alloc(remainingToWrite); + + wrote += SMALL_CHUNK.byteLength; + wroteChunkLessThanChunkSize = true; + + this.push(SMALL_CHUNK); + } + }, + }); + + const requests: { + dataReceived: number; + opts: GaxiosOptions; + chunkWritesInRequest: number; + }[] = []; + let overallDataReceived = 0; + + up.chunkSize = CHUNK_SIZE; + up.contentLength = CONTENT_LENGTH; + + up.makeRequestStream = async (opts: GaxiosOptions) => { + let dataReceived = 0; + let chunkWritesInRequest = 0; + + await new Promise(resolve => { + opts.body.on('data', (data: Buffer) => { + dataReceived += data.byteLength; + overallDataReceived += data.byteLength; + chunkWritesInRequest++; + }); + + opts.body.on('end', () => { + requests.push({dataReceived, opts, chunkWritesInRequest}); + + if (overallDataReceived < CONTENT_LENGTH) { + const lastByteReceived = overallDataReceived + ? 
overallDataReceived - 1
+ : 0;
+
+ up.emit('response', {
+ status: RESUMABLE_INCOMPLETE_STATUS_CODE,
+ headers: {
+ range: `bytes=0-${lastByteReceived}`,
+ },
+ data: {},
+ });
+ } else {
+ up.emit('response', {
+ status: 200,
+ data: {},
+ });
+ }
+
+ resolve(null);
+ });
+ });
+ };
+
+ up.on('error', done);
+
+ up.on('finish', () => {
+ // Ensure the correct number of requests and data look correct
+ assert.equal(requests.length, EXPECTED_NUM_REQUESTS);
+ assert.equal(overallDataReceived, CONTENT_LENGTH);
+
+ // Make sure we wrote the desired mix of chunk sizes
+ assert(wroteChunkLargerThanChunkSize);
+ assert(wroteChunkEqualToChunkSize);
+ assert(wroteChunkLessThanChunkSize);
+
+ // Validate each request
+ for (let i = 0; i < requests.length; i++) {
+ const request = requests[i];
+ const offset = i * CHUNK_SIZE;
+
+ assert.strictEqual(request.opts.method, 'PUT');
+ assert.strictEqual(request.opts.url, uri);
+
+ // We should be writing a single chunk down the wire
+ assert.strictEqual(request.chunkWritesInRequest, 1);
+
+ if (requests.length - i === 1) {
+ // The last chunk
+ const endByte = offset + LAST_REQUEST_SIZE - 1;
+
+ assert.equal(request.dataReceived, LAST_REQUEST_SIZE);
+ assert.deepStrictEqual(request.opts.headers, {
+ 'Content-Length': LAST_REQUEST_SIZE,
+ 'Content-Range': `bytes ${offset}-${endByte}/${CONTENT_LENGTH}`,
+ });
+ } else {
+ // The preceding chunks
+ const endByte = offset + CHUNK_SIZE - 1;
+
+ assert.equal(request.dataReceived, CHUNK_SIZE);
+ assert.deepStrictEqual(request.opts.headers, {
+ 'Content-Length': CHUNK_SIZE,
+ 'Content-Range': `bytes ${offset}-${endByte}/${CONTENT_LENGTH}`,
+ });
+ }
+ }
+
+ done();
+ });
+
+ // init the request
+ upstreamBuffer.pipe(up);
+ });
+ });
+ });
+});