Skip to content

Commit

Permalink
refactor(deps): Remove get-stream in favor of async iterators (#1925)
Browse files Browse the repository at this point in the history
* refactor(deps): Remove get-stream in favor of async iterators

* explicitly handle stream error for node < 16

* clean up implementation, fix unit tests

* update testbench docker image to 0.20.0
  • Loading branch information
ddelgrosso1 authored May 16, 2022
1 parent c8f6257 commit 5d8bfd0
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 11 deletions.
2 changes: 1 addition & 1 deletion conformance-test/testBenchUtil.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ const PORT = new URL(HOST).port;
const CONTAINER_NAME = 'storage-testbench';
const DEFAULT_IMAGE_NAME =
'gcr.io/cloud-devrel-public-resources/storage-testbench';
const DEFAULT_IMAGE_TAG = 'v0.14.0';
const DEFAULT_IMAGE_TAG = 'v0.20.0';
const DOCKER_IMAGE = `${DEFAULT_IMAGE_NAME}:${DEFAULT_IMAGE_TAG}`;
const PULL_CMD = `docker pull ${DOCKER_IMAGE}`;
const RUN_CMD = `docker run --rm -d -p ${PORT}:${PORT} --name ${CONTAINER_NAME} ${DOCKER_IMAGE} && sleep 1`;
Expand Down
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
"ent": "^2.2.0",
"extend": "^3.0.2",
"gaxios": "^4.0.0",
"get-stream": "^6.0.0",
"google-auth-library": "^7.14.1",
"hash-stream-validation": "^0.2.2",
"mime": "^3.0.0",
Expand Down
20 changes: 14 additions & 6 deletions src/file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import {
import {promisifyAll} from '@google-cloud/promisify';

import compressible = require('compressible');
import getStream = require('get-stream');
import * as crypto from 'crypto';
import * as dateFormat from 'date-and-time';
import * as extend from 'extend';
Expand Down Expand Up @@ -1394,8 +1393,8 @@ class File extends ServiceObject<File> {
) => {
if (err) {
// Get error message from the body.
getStream(rawResponseStream).then(body => {
err.message = body;
this.getBufferFromReadable(rawResponseStream).then(body => {
err.message = body.toString('utf8');
throughStream.destroy(err);
});

Expand Down Expand Up @@ -2172,10 +2171,9 @@ class File extends ServiceObject<File> {
fileStream.pipe(writable).on('error', callback).on('finish', callback);
});
} else {
getStream
.buffer(fileStream)
this.getBufferFromReadable(fileStream)
.then(contents => callback?.(null, contents))
.catch(callback as (error: RequestError) => void);
.catch(callback as (err: RequestError) => void);
}
}

Expand Down Expand Up @@ -4104,6 +4102,15 @@ class File extends ServiceObject<File> {
this.storage.retryOptions.autoRetry = false;
}
}

private async getBufferFromReadable(readable: Readable): Promise<Buffer> {
const buf = [];
for await (const chunk of readable) {
buf.push(chunk);
}

return Buffer.concat(buf);
}
}

/*! Developer Documentation
Expand All @@ -4118,6 +4125,7 @@ promisifyAll(File, {
'save',
'setEncryptionKey',
'shouldRetryBasedOnPreconditionAndIdempotencyStrat',
'getBufferFromReadable',
],
});

Expand Down
17 changes: 14 additions & 3 deletions test/file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ const fakePromisify = {
'save',
'setEncryptionKey',
'shouldRetryBasedOnPreconditionAndIdempotencyStrat',
'getBufferFromReadable',
]);
},
};
Expand Down Expand Up @@ -2651,8 +2652,13 @@ describe('File', () => {
it('should only execute callback once', done => {
Object.assign(fileReadStream, {
_read(this: Readable) {
this.emit('error', new Error('Error.'));
this.emit('error', new Error('Error.'));
// Do not fire the errors immediately as this is a synchronous operation here
// and the iterator getter is also synchronous in file.getBufferFromReadable.
// this is only an issue for <= node 12. This cannot happen in practice.
process.nextTick(() => {
this.emit('error', new Error('Error.'));
this.emit('error', new Error('Error.'));
});
},
});

Expand Down Expand Up @@ -2685,7 +2691,12 @@ describe('File', () => {

Object.assign(fileReadStream, {
_read(this: Readable) {
this.emit('error', error);
// Do not fire the errors immediately as this is a synchronous operation here
// and the iterator getter is also synchronous in file.getBufferFromReadable.
// this is only an issue for <= node 12. This cannot happen in practice.
process.nextTick(() => {
this.emit('error', error);
});
},
});

Expand Down

0 comments on commit 5d8bfd0

Please sign in to comment.