Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Remove pumpify #2029

Merged
merged 7 commits
Aug 12, 2022
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@
"mime": "^3.0.0",
"mime-types": "^2.0.8",
"p-limit": "^3.0.1",
"pumpify": "^2.0.0",
"retry-request": "^5.0.0",
"teeny-request": "^8.0.0",
"uuid": "^8.0.0"
Expand All @@ -85,7 +84,6 @@
"@types/node": "^17.0.30",
"@types/node-fetch": "^2.1.3",
"@types/proxyquire": "^1.3.28",
"@types/pumpify": "^1.4.1",
"@types/request": "^2.48.4",
"@types/sinon": "^10.0.0",
"@types/tmp": "0.2.3",
Expand Down
246 changes: 149 additions & 97 deletions src/file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,8 @@ import * as crypto from 'crypto';
import * as extend from 'extend';
import * as fs from 'fs';
import * as mime from 'mime';
// eslint-disable-next-line @typescript-eslint/no-var-requires
const pumpify = require('pumpify');
import * as resumableUpload from './resumable-upload';
import {Writable, Readable, PassThrough} from 'stream';
import {Writable, Readable, pipeline, Transform, PassThrough} from 'stream';
import * as zlib from 'zlib';
import * as http from 'http';

Expand Down Expand Up @@ -1495,7 +1493,7 @@ class File extends ServiceObject<File> {

const headers = rawResponseStream.toJSON().headers;
isServedCompressed = headers['content-encoding'] === 'gzip';
const throughStreams: Writable[] = [];
const transformStreams: Transform[] = [];

if (shouldRunValidation) {
// The x-goog-hash header should be set with a crc32c and md5 hash.
Expand All @@ -1517,28 +1515,32 @@ class File extends ServiceObject<File> {
crc32cGenerator: this.crc32cGenerator,
});

throughStreams.push(validateStream);
transformStreams.push(validateStream);
}

if (isServedCompressed && options.decompress) {
throughStreams.push(zlib.createGunzip());
transformStreams.push(zlib.createGunzip());
}

if (throughStreams.length === 1) {
rawResponseStream =
// eslint-disable-next-line @typescript-eslint/no-explicit-any
rawResponseStream.pipe(throughStreams[0]) as any;
} else if (throughStreams.length > 1) {
rawResponseStream = rawResponseStream.pipe(
pumpify.obj(throughStreams)
);
}
const handoffStream = new PassThrough({
final: async cb => {
// Preserving `onComplete`'s ability to
// close `throughStream` before pipeline
// attempts to.
await onComplete(null);
cb();
},
});

rawResponseStream
.on('error', onComplete)
.on('end', onComplete)
.pipe(throughStream, {end: false});
pipeline(
rawResponseStream,
...(transformStreams as [Transform]),
handoffStream,
throughStream,
onComplete
);
};

// This is hooked to the `complete` event from the request stream. This is
// our chance to validate the data and let the user know if anything went
// wrong.
Expand Down Expand Up @@ -1948,101 +1950,92 @@ class File extends ServiceObject<File> {
crc32c = false;
}

// Collect data as it comes in to store in a hash. This is compared to the
// checksum value on the returned metadata from the API.
const validateStream = new HashStreamValidator({
/**
* A callback for determining when the underlying pipeline is complete.
* It's possible the pipeline callback could error before the write stream
* calls `final` so by default this will destroy the write stream unless the
* write stream sets this callback via it's `final` handler.
danielbankhead marked this conversation as resolved.
Show resolved Hide resolved
* @param error An optional error
*/
let pipelineCallback: (error?: Error | null) => void = error => {
writeStream.destroy(error || undefined);
};

// A stream for consumer to write to
const writeStream = new Writable({
final(cb) {
// Set the pipeline callback to this callback so the pipeline's results
// can be populated to the consumer
pipelineCallback = cb;

emitStream.end();
},
write(chunk, encoding, cb) {
emitStream.write(chunk, encoding, cb);
},
});

const emitStream = new PassThroughShim();
const hashCalculatingStream = new HashStreamValidator({
crc32c,
md5,
crc32cGenerator: this.crc32cGenerator,
});

const fileWriteStream = duplexify();

fileWriteStream.on('progress', evt => {
stream.emit('progress', evt);
});

const passThroughShim = new PassThroughShim();

passThroughShim.on('writing', () => {
stream.emit('writing');
let fileWriteStreamMetadataReceived = false;

// Handing off emitted events to users
emitStream.on('reading', () => writeStream.emit('reading'));
emitStream.on('writing', () => writeStream.emit('writing'));
fileWriteStream.on('progress', evt => writeStream.emit('progress', evt));
fileWriteStream.on('response', resp => writeStream.emit('response', resp));
fileWriteStream.once('metadata', () => {
fileWriteStreamMetadataReceived = true;
});

const stream = pumpify([
passThroughShim,
gzip ? zlib.createGzip() : new PassThrough(),
validateStream,
fileWriteStream,
]);

// Wait until we've received data to determine what upload technique to use.
stream.on('writing', () => {
writeStream.on('writing', () => {
if (options.resumable === false) {
this.startSimpleUpload_(fileWriteStream, options);
return;
}
this.startResumableUpload_(fileWriteStream, options);
});

fileWriteStream.on('response', stream.emit.bind(stream, 'response'));

// This is to preserve the `finish` event. We wait until the request stream
// emits "complete", as that is when we do validation of the data. After
// that is successful, we can allow the stream to naturally finish.
//
// Reference for tracking when we can use a non-hack solution:
// https://github.com/nodejs/node/pull/2314
fileWriteStream.on('prefinish', () => {
stream.cork();
});

// Compare our hashed version vs the completed upload's version.
fileWriteStream.on('complete', () => {
const metadata = this.metadata;

// If we're doing validation, assume the worst-- a data integrity
// mismatch. If not, these tests won't be performed, and we can assume the
// best.
let failed = crc32c || md5;

if (crc32c && metadata.crc32c) {
failed = !validateStream.test('crc32c', metadata.crc32c);
}

if (md5 && metadata.md5Hash) {
failed = !validateStream.test('md5', metadata.md5Hash);
} else {
this.startResumableUpload_(fileWriteStream, options);
}

if (failed) {
this.delete((err: ApiError) => {
let code;
let message;

if (err) {
code = 'FILE_NO_UPLOAD_DELETE';
message = `${FileExceptionMessages.UPLOAD_MISMATCH_DELETE_FAIL}${err.message}`;
} else if (md5 && !metadata.md5Hash) {
code = 'MD5_NOT_AVAILABLE';
message = FileExceptionMessages.MD5_NOT_AVAILABLE;
} else {
code = 'FILE_NO_UPLOAD';
message = FileExceptionMessages.UPLOAD_MISMATCH;
pipeline(
emitStream,
gzip ? zlib.createGzip() : new PassThrough(),
hashCalculatingStream,
fileWriteStream,
async e => {
if (e) {
return pipelineCallback(e);
}

const error = new RequestError(message);
error.code = code;
error.errors = [err!];

fileWriteStream.destroy(error);
});

return;
}
// We want to make sure we've received the metadata from the server in order
// to properly validate the object's integrity. Depending on the type of upload,
// the stream could close before the response is returned.
if (!fileWriteStreamMetadataReceived) {
try {
await new Promise((resolve, reject) => {
fileWriteStream.once('metadata', resolve);
fileWriteStream.once('error', reject);
});
} catch (e) {
return pipelineCallback(e as Error);
}
}

stream.uncork();
try {
await this.#validateIntegrity(hashCalculatingStream, {crc32c, md5});
pipelineCallback();
} catch (e) {
pipelineCallback(e as Error);
}
}
);
});

return stream as Writable;
return writeStream;
}

/**
Expand Down Expand Up @@ -3932,6 +3925,7 @@ class File extends ServiceObject<File> {
})
.on('metadata', metadata => {
this.metadata = metadata;
dup.emit('metadata');
})
.on('finish', () => {
dup.emit('complete');
Expand Down Expand Up @@ -4011,6 +4005,7 @@ class File extends ServiceObject<File> {
}

this.metadata = body;
dup.emit('metadata', body);
dup.emit('response', resp);
dup.emit('complete');
});
Expand Down Expand Up @@ -4049,6 +4044,63 @@ class File extends ServiceObject<File> {

return Buffer.concat(buf);
}

/**
*
* @param hashCalculatingStream
* @param verify
* @returns {boolean} Returns `true` if valid, throws with error otherwise
*/
async #validateIntegrity(
hashCalculatingStream: HashStreamValidator,
verify: {crc32c?: boolean; md5?: boolean} = {}
) {
const metadata = this.metadata;

// If we're doing validation, assume the worst
let dataMismatch = !!(verify.crc32c || verify.md5);

if (verify.crc32c && metadata.crc32c) {
dataMismatch = !hashCalculatingStream.test('crc32c', metadata.crc32c);
}

if (verify.md5 && metadata.md5Hash) {
dataMismatch = !hashCalculatingStream.test('md5', metadata.md5Hash);
}

if (dataMismatch) {
const errors: Error[] = [];
let code = '';
let message = '';

try {
await this.delete();

if (verify.md5 && !metadata.md5Hash) {
code = 'MD5_NOT_AVAILABLE';
message = FileExceptionMessages.MD5_NOT_AVAILABLE;
} else {
code = 'FILE_NO_UPLOAD';
message = FileExceptionMessages.UPLOAD_MISMATCH;
}
} catch (e) {
const error = e as Error;

code = 'FILE_NO_UPLOAD_DELETE';
message = `${FileExceptionMessages.UPLOAD_MISMATCH_DELETE_FAIL}${error.message}`;

errors.push(error);
}

const error = new RequestError(message);
error.code = code;
error.errors = errors;

throw error;
}

return true;
}
}

/*! Developer Documentation
Expand Down
Loading