fix: properly handle asynchronous read from stream #1284

Merged
merged 18 commits into from
May 30, 2023

@alexander-fenster alexander-fenster commented May 19, 2023

Fixes #607.

Note: I'm currently comparing this pull request to #1282, I will change the base to main after #1282 is merged. This way it's easier to review.

Several changes here:

  1. Remove the logic in table.ts that would skip the unprocessed records if a ChunkTransformer has _destroyed flag set. There are two separate cases when the ChunkTransformer gets destroyed:

    1. if it's a network problem or anything else that forced gRPC to emit an error event. In such case, it always makes sense to process all the rows we have received before giving up.

    2. if it's a user cancellation (by calling stream.end() from the user code). In this case we need to stop emitting new rows.

So let's introduce a new flag, _canceled, and ignore the incoming rows only if the stream is canceled from .end(). Let's have the new flag, userCanceled, and use it in the userStream transform method to stop emitting new rows when the user doesn't need them, but we'll still keep emitting all rows if there was no user cancellation. This change fixes #607.

  2. When a network error happens, we need to update the lastRowKey only if the row processing has completed (the commit bit was set in the last chunk). Until then, don't update lastRowKey. The current code makes recovery skip one record - the one that was incomplete. This was actually a test problem; there is no problem here.

  3. Minor change to make the modern TypeScript compiler happy when comparing chunk.timestampMicros to zero. chunk.timestampMicros can theoretically be an instance of Long, so let's handle this accordingly. It was showing as an error in my editor and it makes sense, so I'm fixing it in the safest way possible. In the default case, the || expression will short-circuit in the typeof chunk.timestampMicros === 'number' branch.
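
A minimal sketch of the cancellation rule from item 1, not the library's actual code. The `createRowGate` helper and the `Row` and `RowCallback` types are hypothetical names introduced for illustration; only `userCanceled` comes from the description above. The point is that rows are dropped only after an explicit user cancellation, never merely because the source failed.

```typescript
// Hypothetical sketch: createRowGate, Row and RowCallback are illustrative names.
type Row = {key: string};
type RowCallback = (err?: Error | null, row?: Row) => void;

function createRowGate() {
  // Set only when the user calls end(); a gRPC error must not set it.
  let userCanceled = false;
  return {
    cancel(): void {
      userCanceled = true;
    },
    // Same shape as the `transform` option of a Node.js Transform stream.
    transform(row: Row, _encoding: string, callback: RowCallback): void {
      if (userCanceled) {
        callback(); // drop the row: the user no longer wants data
        return;
      }
      callback(null, row); // emit the row, even if the source has since failed
    },
  };
}

// Usage: rows seen before cancellation are emitted, later ones are dropped.
const gate = createRowGate();
const emitted: string[] = [];
const collect: RowCallback = (_err, row) => {
  if (row) emitted.push(row.key);
};
gate.transform({key: 'a'}, 'utf8', collect);
gate.transform({key: 'b'}, 'utf8', collect);
gate.cancel();
gate.transform({key: 'c'}, 'utf8', collect);
console.log(emitted); // ['a', 'b']
```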

@alexander-fenster alexander-fenster requested review from a team as code owners May 19, 2023 22:15
@product-auto-label product-auto-label bot added size: m Pull request size is medium. api: bigtable Issues related to the googleapis/nodejs-bigtable API. labels May 19, 2023
@alexander-fenster alexander-fenster added the kokoro:run Add this label to force Kokoro to re-run the tests. label May 19, 2023
@alexander-fenster
Contributor Author

Note: the system and samples tests won't run here until I rebase this PR to be against the main branch, but I ran them locally and they pass.

Also note: I cherry-picked a commit from #1285 to avoid an unrelated Windows test failure.

this.lastRowKey = undefined;
this.reset();
}

get canceled() {
Contributor

It seems like we expose ChunkTransformer as part of our public API, which I think is incorrect to begin with. To minimize further leakage of implementation details, can we mark this method as internal?

Comment on lines 225 to 231
(typeof chunk.timestampMicros === 'number' &&
chunk.timestampMicros! > 0) ||
// if it's an instance of Long
(typeof chunk.timestampMicros === 'object' &&
'compare' in chunk.timestampMicros &&
typeof chunk.timestampMicros.compare === 'function' &&
chunk.timestampMicros.compare(0) === 1);
Contributor

This is a bit strange: chunk is a protobuf, so shouldn't it have a stable type?

Contributor Author

@alexander-fenster alexander-fenster May 22, 2023

This is specific to how protobufjs handles 64-bit integers. In JavaScript, all numbers are 64-bit IEEE 754 floats, so they can reliably represent integers only up to 2⁵³ – 1. protobufjs therefore accepts and may emit an object of type Long (from https://www.npmjs.com/package/long). As far as I understand, in this case it's always a number in practice, but in my clean workspace VSCode shows an error here, so it should be safe to fix.
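
To illustrate the point about 64-bit values, here is a small sketch of a positivity check that accepts either representation. `LongLike` and `isPositive` are hypothetical names; the only assumption about the `long` package is that `compare` returns -1, 0, or 1, which is what the reviewed condition relies on.

```typescript
// Minimal structural type covering just the part of Long used here
// (assumption: mirrors `compare` from the `long` package).
interface LongLike {
  compare(other: number): number;
}

function isPositive(value: number | LongLike | null | undefined): boolean {
  // Common case: a plain number short-circuits here.
  if (typeof value === 'number') return value > 0;
  // Otherwise accept anything that behaves like a Long.
  return (
    typeof value === 'object' &&
    value !== null &&
    typeof value.compare === 'function' &&
    value.compare(0) === 1
  );
}

// Why Long exists at all: above 2^53 - 1, distinct integers collapse
// to the same double.
console.log(Number.MAX_SAFE_INTEGER); // 9007199254740991
console.log(Math.pow(2, 53) === Math.pow(2, 53) + 1); // true

// A stand-in object representing the value 5.
const longLikeFive: LongLike = {compare: other => Math.sign(5 - other)};
console.log(isPositive(42)); // true
console.log(isPositive(0)); // false
console.log(isPositive(longLikeFive)); // true
```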

Contributor

please leave a comment in the code explaining this

src/table.ts Outdated
@@ -882,7 +885,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
const toRowStream = new Transform({
transform: (rowData, _, next) => {
if (
- chunkTransformer._destroyed ||
+ chunkTransformer.canceled ||
Contributor

It seems like there are only 2 things that care about the canceled flag:

  • the closure for userStream.end()
  • the closure for toRowStream

ChunkTransformer seems like an innocent bystander here. Why not make the userCanceled flag a local variable in createReadStream(), whose scope is shared by the 2 closures that care about the flag?

Contributor

Alternatively, maybe create a subclass for userStream that holds the state

Contributor Author

Moving the flag closer to the consumer makes sense to me; I'll try to do it. I only put it here to replace the _destroyed logic, but you are right, it will make more sense downstream.

Contributor Author

Done

Comment on lines 154 to 156
if (data.lastScannedRowKey && data.lastScannedRowKey.length > 0) {
this.lastRowKey = Mutation.convertFromBytes(
data.lastScannedRowKey as Bytes,
{
userOptions: this.options,
}
);
}
Contributor

I think this breaks request resumption logic in makeNewRequest

const lastRowKey = chunkTransformer ? chunkTransformer.lastRowKey : '';

Contributor Author

lastRowKey is set when committing, it should not be set here because the row is still incomplete at this moment.

Contributor

This is my fault: I forgot to mention one other detail of the ReadRows protocol. There are 2 ways that a ReadRows resumption request can be built:

  1. Based on the last received row key of a committed row chunk (this is the one you know about).
  2. As a heartbeat sent via an empty ReadRowsResponse. This will be used when a caller has very restrictive filters that cause the scan to omit entire tablets. The protocol allows the server to emit a heartbeat with the last scanned (as opposed to returned) row key.

This is not currently enabled on the server side, but it is specified by the protocol.
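
The two resumption sources can be sketched as follows. This is an illustration under simplifying assumptions, not the library's ChunkTransformer: `ResumeKeyTracker` is a hypothetical class, the local `CellChunk` and `ReadRowsResponse` interfaces are cut-down stand-ins, and row keys are plain strings rather than bytes.

```typescript
// Hypothetical, simplified shapes: the real messages use bytes for row keys
// and carry more fields.
interface CellChunk {
  rowKey?: string;
  commitRow?: boolean;
}
interface ReadRowsResponse {
  chunks: CellChunk[];
  lastScannedRowKey?: string;
}

class ResumeKeyTracker {
  lastRowKey: string | undefined;
  private pendingRowKey: string | undefined;

  handle(response: ReadRowsResponse): void {
    // Way 2: a heartbeat reports how far the server has scanned,
    // even though no row was returned.
    if (response.lastScannedRowKey && response.lastScannedRowKey.length > 0) {
      this.lastRowKey = response.lastScannedRowKey;
    }
    for (const chunk of response.chunks) {
      if (chunk.rowKey) this.pendingRowKey = chunk.rowKey;
      // Way 1: only a committed row may advance the resumption point.
      if (chunk.commitRow && this.pendingRowKey !== undefined) {
        this.lastRowKey = this.pendingRowKey;
        this.pendingRowKey = undefined;
      }
    }
  }
}

const tracker = new ResumeKeyTracker();
tracker.handle({chunks: [{rowKey: 'a'}]});
console.log(tracker.lastRowKey); // undefined (row 'a' is still incomplete)
tracker.handle({chunks: [{commitRow: true}]});
console.log(tracker.lastRowKey); // 'a'
tracker.handle({chunks: [], lastScannedRowKey: 'm'});
console.log(tracker.lastRowKey); // 'm'
```

A retry would then start strictly after `lastRowKey`, so an incomplete row is requested again rather than skipped.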

Contributor Author

Fixed this

@igorbernstein2
Contributor

Also, I think we still have a problem of end() on the userStream being delayed. I think we can deal with it in a separate PR. When the consumer is slower than the gRPC producer (i.e. the user transformer calls the callback in a setTimeout()), rows can get buffered in the userStream. When the end user cancels the stream early via end(), the rows buffered in the userStream will still be emitted.

I'm not sure how to deal with this. We would need the userStream to signal the end of the stream on read() when canceled.
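
The buffering effect can be reproduced with a plain PassThrough standing in for userStream. This is only a sketch of the symptom, with `bufferedStream` as a hypothetical name; it does not model the gRPC producer or a slow consumer, just the fact that rows written before end() stay readable afterwards.

```typescript
import {PassThrough} from 'stream';

// Stand-in for userStream; nothing consumes it yet, so rows accumulate
// in its readable buffer.
const bufferedStream = new PassThrough({objectMode: true});
bufferedStream.write({key: 'row1'});
bufferedStream.write({key: 'row2'});

// The user cancels early...
bufferedStream.end();

// ...but rows that were already buffered are still handed to the reader.
const afterEnd = [bufferedStream.read(), bufferedStream.read()];
console.log(afterEnd.map(row => row.key)); // ['row1', 'row2']
```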

Base automatically changed from stream-tests to main May 22, 2023 23:15
@product-auto-label product-auto-label bot added size: s Pull request size is small. and removed size: m Pull request size is medium. labels May 22, 2023
@yoshi-kokoro yoshi-kokoro removed the kokoro:run Add this label to force Kokoro to re-run the tests. label May 22, 2023
@product-auto-label product-auto-label bot added size: m Pull request size is medium. and removed size: s Pull request size is small. labels May 23, 2023
});
const originalEnd = userStream.end.bind(userStream);
// eslint-disable-next-line @typescript-eslint/no-explicit-any
userStream.end = (chunk?: any, encoding?: any, cb?: () => void) => {
Contributor

Please add a comment explaining that this is needed to fulfill a pre-existing contract with the caller, where the caller can prematurely cancel the stream by calling end(). We need to be able to disambiguate between a user cancellation and a normal end, so we capture originalEnd to be used for normal termination, and the overwritten end is meant for end users.
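
A sketch of the contract described in this comment, assuming a hypothetical `createUserStream` factory (the real code lives inside createReadStream() and also aborts the active request). It shows how the captured `originalEnd` and the overridden `end` let the code tell a normal end from a user cancellation.

```typescript
import {PassThrough} from 'stream';

// Hypothetical factory for illustration; not the library's API.
function createUserStream() {
  let userCanceled = false;
  const userStream = new PassThrough({objectMode: true});

  // Keep the real end() so the internal pipeline can terminate the stream
  // normally without it being treated as a cancellation.
  const originalEnd = userStream.end.bind(userStream);

  // Callers may cancel the stream early by calling end(); the override
  // records that the user asked for it and then ends the stream as usual.
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  userStream.end = (chunk?: any, encoding?: any, cb?: () => void) => {
    userCanceled = true;
    return originalEnd(chunk, encoding, cb);
  };

  return {userStream, originalEnd, isUserCanceled: () => userCanceled};
}

// Normal termination: the pipeline calls originalEnd, the flag stays false.
const normal = createUserStream();
normal.originalEnd();
console.log(normal.isUserCanceled()); // false

// User cancellation: the caller calls end(), the flag is set.
const canceled = createUserStream();
canceled.userStream.end();
console.log(canceled.isUserCanceled()); // true
```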

src/table.ts Outdated
Comment on lines 765 to 766
rowStream?.unpipe(userStream);
rowStream.removeListener('end', originalEnd);
Contributor

Can you add 2 helper functions, rowStreamPipe and rowStreamUnpipe, and add comments that we need special handling of the end event to redirect the handler to originalEnd?
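
One possible shape for the two helpers, as a sketch only. The unpipe half follows the two lines under review; piping with `{end: false}` is an assumption made here so that pipe() does not call the overridden end() on its own.

```typescript
import {PassThrough, Readable} from 'stream';

const userStream = new PassThrough({objectMode: true});
const originalEnd = userStream.end.bind(userStream);

// Pipe rows to the user, but route the source's 'end' event to originalEnd
// so that a normal end is not mistaken for a user cancellation.
function rowStreamPipe(rowStream: Readable, userStream: PassThrough): void {
  // Assumption: {end: false} keeps pipe() from ending userStream itself.
  rowStream.pipe(userStream, {end: false});
  rowStream.on('end', originalEnd);
}

// Undo both steps: stop the data flow and detach the redirected handler.
function rowStreamUnpipe(rowStream: Readable, userStream: PassThrough): void {
  rowStream.unpipe(userStream);
  rowStream.removeListener('end', originalEnd);
}

const rowStream = new Readable({objectMode: true, read() {}});
rowStreamPipe(rowStream, userStream);
console.log(rowStream.listeners('end').indexOf(originalEnd) !== -1); // true
rowStreamUnpipe(rowStream, userStream);
console.log(rowStream.listeners('end').indexOf(originalEnd) !== -1); // false
```

Keeping both steps in one helper means a retry that replaces rowStream cannot forget to detach the old end handler.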

src/table.ts Outdated
Comment on lines 929 to 930
rowStream.unpipe(userStream);
rowStream.removeListener('end', originalEnd);
Contributor

please delegate to a helper

Contributor

@danieljbruce danieljbruce left a comment

Looks good. A couple of questions/comments about specific details.

callback();
return;
}
callback(null, row);
Contributor

Do we add this logic here to prevent the user from receiving rows when they make it past the race condition in toRowStream? If so then do we need the userCanceled || check in toRowStream at all?

Contributor Author

Yes. The reason we also have the condition in toRowStream.transform is just because it does not make much sense to pipe more data through toRowStream after the user explicitly asked us to stop.


// eslint-disable-next-line @typescript-eslint/no-explicit-any
userStream.end = (chunk?: any, encoding?: any, cb?: () => void) => {
rowStreamUnpipe(rowStream, userStream);
Contributor

Does this prevent the data from being passed on from rowStream to userStream? It looks like we are using both userCanceled and unpipe to stop passing data along. Should we also unpipe the data being passed from the chunk transformer to toRowStream?

Contributor Author

We don't really care about the data passed from chunkTransformer to toRowStream because this is all inside rowStream. We only need to stop that data from flowing into the user stream. Since it's the end of the user stream, it's not really important whether more or less data has flowed inside the rowStream pipeline.

@danieljbruce danieljbruce mentioned this pull request May 29, 2023
@alexander-fenster alexander-fenster merged commit 55d86ba into main May 30, 2023
@alexander-fenster alexander-fenster deleted the fix-streams branch May 30, 2023 18:08
Labels
api: bigtable Issues related to the googleapis/nodejs-bigtable API. size: m Pull request size is medium.
Development

Successfully merging this pull request may close these issues.

createReadStream didn't returns all data when stream is pause and resumed
4 participants