fix: properly handle asynchronous read from stream #1284

Merged · 18 commits · May 30, 2023
16 changes: 13 additions & 3 deletions src/chunktransformer.ts
@@ -129,10 +129,10 @@ export class ChunkTransformer extends Transform {
    * @public
    *
    * @param {object} data readrows response containing array of chunks.
-   * @param {object} [enc] encoding options.
+   * @param {object} [_encoding] encoding options.
    * @param {callback} next callback will be called once data is processed, with error if any error in processing
    */
-  _transform(data: Data, enc: string, next: Function): void {
+  _transform(data: Data, _encoding: string, next: Function): void {
     for (const chunk of data.chunks!) {
       switch (this.state) {
         case RowStateEnum.NEW_ROW:
@@ -148,6 +148,7 @@ export class ChunkTransformer extends Transform {
           break;
       }
       if (this._destroyed) {
+        next();
         return;
       }
     }
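Why the added next() matters: Node's Transform contract requires the callback to fire for every incoming chunk, even when the output is dropped; returning without it stalls the pipeline behind backpressure. A minimal sketch of the pattern (generic stream names, not the Bigtable code):

import {Transform} from 'stream';

const dropWhenDestroyed = new Transform({
  objectMode: true,
  transform(chunk, _encoding, next) {
    if (this.destroyed) {
      // Acknowledge the chunk even though we discard it; skipping next()
      // here would leave upstream writers waiting on backpressure forever.
      next();
      return;
    }
    next(null, chunk);
  },
});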
@@ -226,7 +227,16 @@ export class ChunkTransformer extends Transform {
       chunk.familyName ||
       chunk.qualifier ||
       (chunk.value && chunk.value.length !== 0) ||
-      chunk.timestampMicros! > 0;
+      // timestampMicros is an int64 in the protobuf definition,
+      // which can be either a number or an instance of Long.
+      // If it's a number...
+      (typeof chunk.timestampMicros === 'number' &&
+        chunk.timestampMicros! > 0) ||
+      // If it's an instance of Long...
+      (typeof chunk.timestampMicros === 'object' &&
+        'compare' in chunk.timestampMicros &&
+        typeof chunk.timestampMicros.compare === 'function' &&
+        chunk.timestampMicros.compare(0) === 1);
     if (chunk.resetRow && containsData) {
       this.destroy(
         new TransformError({
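The new containsData check is duck-typed because protobuf int64 fields may decode as a JavaScript number or as a Long object. A standalone sketch of the same logic; LongLike is a hypothetical stand-in for Long, and only its compare method is assumed:

interface LongLike {
  // Long#compare behaves like a comparator: returns 1, 0, or -1.
  compare(other: number | LongLike): number;
}

function isPositiveTimestamp(ts?: number | LongLike | null): boolean {
  if (typeof ts === 'number') {
    return ts > 0;
  }
  if (ts && typeof ts === 'object' && typeof ts.compare === 'function') {
    return ts.compare(0) === 1; // strictly greater than zero
  }
  return false;
}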
49 changes: 39 additions & 10 deletions src/table.ts
@@ -745,22 +745,51 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
       filter = Filter.parse(options.filter);
     }

-    const userStream = new PassThrough({objectMode: true});
-    const end = userStream.end.bind(userStream);
-    userStream.end = () => {
+    let chunkTransformer: ChunkTransformer;
+    let rowStream: Duplex;
+
+    let userCanceled = false;
+    const userStream = new PassThrough({
+      objectMode: true,
+      transform(row, _encoding, callback) {
+        if (userCanceled) {
+          callback();
+          return;
+        }
+        callback(null, row);
Contributor: Do we add this logic here to prevent the user from receiving rows when they make it past the race condition in toRowStream? If so, do we need the userCanceled || check in toRowStream at all?

Contributor (author): Yes. We also keep the condition in toRowStream.transform simply because it makes little sense to pipe more data through toRowStream after the user has explicitly asked us to stop.

+      },
+    });
+
+    // The caller should be able to call userStream.end() to stop receiving
+    // more rows and cancel the stream prematurely. But also, the 'end' event
+    // will be emitted if the stream ended normally. To tell these two
+    // situations apart, we'll save the "original" end() function, and
+    // will call it on rowStream.on('end').
+    const originalEnd = userStream.end.bind(userStream);
+
+    // Taking care of this extra listener when piping and unpiping userStream:
+    const rowStreamPipe = (rowStream: Duplex, userStream: PassThrough) => {
+      rowStream.pipe(userStream, {end: false});
+      rowStream.on('end', originalEnd);
+    };
+    const rowStreamUnpipe = (rowStream: Duplex, userStream: PassThrough) => {
+      rowStream?.unpipe(userStream);
+      rowStream?.removeListener('end', originalEnd);
+    };
+
+    // eslint-disable-next-line @typescript-eslint/no-explicit-any
+    userStream.end = (chunk?: any, encoding?: any, cb?: () => void) => {
Contributor: Please add a comment explaining that this is needed to fulfill a pre-existing contract with the caller, where the caller can prematurely cancel the stream by calling end(). We need to be able to disambiguate between a user cancellation and a normal end, so we capture originalEnd for normal termination while the overridden end() is meant for end users.

+      rowStreamUnpipe(rowStream, userStream);
Contributor: Does this prevent the data from being passed on from rowStream to userStream? It looks like we are using both userCanceled and unpipe to stop passing data along. Should we also unpipe the data being passed from the chunk transformer to toRowStream?

Contributor (author): We don't really care about the data passed from chunkTransformer to toRowStream, because all of that happens inside rowStream; we only need to stop the data from flowing into the user stream. Since this ends the user stream, it does not matter how much data has flowed inside the rowStream pipeline.

+      userCanceled = true;
       if (activeRequestStream) {
         activeRequestStream.abort();
       }
       if (retryTimer) {
         clearTimeout(retryTimer);
       }
-      return end();
+      return originalEnd(chunk, encoding, cb);
     };

-    let chunkTransformer: ChunkTransformer;
-    let rowStream: Duplex;
-
     const makeNewRequest = () => {
       // Avoid cancelling an expired timer if user
       // cancelled the stream in the middle of a retry
@@ -882,7 +911,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
     const toRowStream = new Transform({
       transform: (rowData, _, next) => {
         if (
-          chunkTransformer._destroyed ||
+          userCanceled ||
           // eslint-disable-next-line @typescript-eslint/no-explicit-any
           (userStream as any)._writableState.ended
         ) {
@@ -913,7 +942,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);

     rowStream
       .on('error', (error: ServiceError) => {
-        rowStream.unpipe(userStream);
+        rowStreamUnpipe(rowStream, userStream);
         activeRequestStream = null;
         if (IGNORED_STATUS_CODES.has(error.code)) {
           // We ignore the `cancelled` "error", since we are the ones who cause
@@ -947,7 +976,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
       .on('end', () => {
         activeRequestStream = null;
       });
-    rowStream.pipe(userStream);
+    rowStreamPipe(rowStream, userStream);
   };

   makeNewRequest();
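Taken together, the overridden end() and the pipe/unpipe helpers implement one pattern: pipe with {end: false} so that only the saved originalEnd can finish the user stream, and route both normal termination and user cancellation through it. A self-contained sketch under generic names (pipeCancelable, source, and sink are illustrative, not part of table.ts):

import {PassThrough, Readable} from 'stream';

function pipeCancelable(source: Readable, sink: PassThrough): () => boolean {
  let userCanceled = false;
  const originalEnd = sink.end.bind(sink);

  // {end: false} stops the source's natural 'end' from closing the sink
  // directly; only originalEnd, attached below, may finish the sink.
  source.pipe(sink, {end: false});
  source.on('end', originalEnd);

  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  sink.end = ((chunk?: any, encoding?: any, cb?: () => void) => {
    // The caller asked us to stop: detach from the source first so no
    // further data (or its 'end') reaches the sink, then finish normally.
    userCanceled = true;
    source.unpipe(sink);
    source.removeListener('end', originalEnd);
    return originalEnd(chunk, encoding, cb);
  }) as typeof sink.end;

  return () => userCanceled;
}

Here the flag is exposed through a getter; in the PR itself, userCanceled is a closure variable consulted directly by the transforms.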
6 changes: 0 additions & 6 deletions test/chunktransformer.ts
@@ -997,12 +997,6 @@ describe('Bigtable/ChunkTransformer', () => {
       const err = callback.getCall(0).args[0];
       assert(!err, 'did not expect error');
     });
-    it('should return when stream is destroyed', () => {
-      chunkTransformer._destroyed = true;
-      const chunks = [{key: 'key'}];
-      chunkTransformer._transform({chunks}, {}, callback);
-      assert(!callback.called, 'unexpected call to next');
-    });
     it('should change the `lastRowKey` value for `data.lastScannedRowKey`', () => {
       chunkTransformer._transform(
         {chunks: [], lastScannedRowKey: 'foo'},
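Because _transform now invokes next() even when the stream is destroyed, the deleted assertion inverts rather than disappears. A hypothetical replacement test (not part of this PR) might read:

it('should call next even when stream is destroyed', () => {
  chunkTransformer._destroyed = true;
  const chunks = [{key: 'key'}];
  chunkTransformer._transform({chunks}, {}, callback);
  // The fix calls next() before returning, so the callback fires
  // without an error argument.
  assert(callback.called, 'expected next to be called');
  assert(!callback.getCall(0).args[0], 'did not expect error');
});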
5 changes: 2 additions & 3 deletions test/readrows.ts
@@ -122,8 +122,7 @@ describe('Bigtable/ReadRows', () => {
       pipeline(readStream, transform, passThrough, () => {});
     });

-    // TODO(@alexander-fenster): enable after https://github.com/googleapis/nodejs-bigtable/issues/607 is fixed
-    it.skip('should create read stream and read asynchronously using Transform stream', function (done) {
+    it('should create read stream and read asynchronously using Transform stream', function (done) {
       if (process.platform === 'win32') {
         this.timeout(60000); // it runs much slower on Windows!
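For context, "read asynchronously" means the Transform defers its callback to a later tick instead of completing synchronously; this is the consumption pattern tracked by issue #607. A sketch of such a transform (illustrative names, not the test's exact code):

import {Transform} from 'stream';

const asyncTransform = new Transform({
  objectMode: true,
  transform(row, _encoding, next) {
    // Defer completion to the next macrotask, as an awaited call would,
    // so more data arrives from the gRPC stream while rows are in flight.
    setTimeout(() => next(null, row), 0);
  },
});

// e.g. pipeline(readStream, asyncTransform, passThrough, done);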
}
Expand Down Expand Up @@ -222,7 +221,7 @@ describe('Bigtable/ReadRows', () => {
});
});

// TODO(@alexander-fenster): enable after it's fixed
// TODO: enable after https://github.com/googleapis/nodejs-bigtable/issues/1286 is fixed
it.skip('should be able to stop reading from the read stream when reading asynchronously', function (done) {
if (process.platform === 'win32') {
this.timeout(60000); // it runs much slower on Windows!
Expand Down