-
Notifications
You must be signed in to change notification settings - Fork 59
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: properly handle asynchronous read from stream #1284
Changes from all commits
318aeff
2c229de
1924249
99813aa
3353666
3f2385b
bf950d0
fbd43a0
aaff430
1e52970
b02c587
d944d58
69077d9
56d4364
71b67a1
4099b2b
e54f607
63453b9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -745,22 +745,51 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); | |
filter = Filter.parse(options.filter); | ||
} | ||
|
||
const userStream = new PassThrough({objectMode: true}); | ||
const end = userStream.end.bind(userStream); | ||
userStream.end = () => { | ||
let chunkTransformer: ChunkTransformer; | ||
let rowStream: Duplex; | ||
|
||
let userCanceled = false; | ||
const userStream = new PassThrough({ | ||
objectMode: true, | ||
transform(row, _encoding, callback) { | ||
if (userCanceled) { | ||
callback(); | ||
return; | ||
} | ||
callback(null, row); | ||
}, | ||
}); | ||
|
||
// The caller should be able to call userStream.end() to stop receiving | ||
// more rows and cancel the stream prematurely. But also, the 'end' event | ||
// will be emitted if the stream ended normally. To tell these two | ||
// situations apart, we'll save the "original" end() function, and | ||
// will call it on rowStream.on('end'). | ||
const originalEnd = userStream.end.bind(userStream); | ||
|
||
// Taking care of this extra listener when piping and unpiping userStream: | ||
const rowStreamPipe = (rowStream: Duplex, userStream: PassThrough) => { | ||
rowStream.pipe(userStream, {end: false}); | ||
rowStream.on('end', originalEnd); | ||
}; | ||
const rowStreamUnpipe = (rowStream: Duplex, userStream: PassThrough) => { | ||
rowStream?.unpipe(userStream); | ||
rowStream?.removeListener('end', originalEnd); | ||
}; | ||
|
||
// eslint-disable-next-line @typescript-eslint/no-explicit-any | ||
userStream.end = (chunk?: any, encoding?: any, cb?: () => void) => { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please add a comment explaining that this is needed to fulfill a pre-existing contract with the caller, where the caller can prematurely cancel the stream by calling end. And we need to be able to disambiguate between a user cancellation and the a normal end. So we capture the originalEnd to be used for normal termination and the overwritten end is meant for end users |
||
rowStreamUnpipe(rowStream, userStream); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this prevent the data from being passed on from rowStream to userStream? It looks like we are using both There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We don't really care about the data passed from |
||
userCanceled = true; | ||
if (activeRequestStream) { | ||
activeRequestStream.abort(); | ||
} | ||
if (retryTimer) { | ||
clearTimeout(retryTimer); | ||
} | ||
return end(); | ||
return originalEnd(chunk, encoding, cb); | ||
}; | ||
|
||
let chunkTransformer: ChunkTransformer; | ||
let rowStream: Duplex; | ||
|
||
const makeNewRequest = () => { | ||
// Avoid cancelling an expired timer if user | ||
// cancelled the stream in the middle of a retry | ||
|
@@ -882,7 +911,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); | |
const toRowStream = new Transform({ | ||
transform: (rowData, _, next) => { | ||
if ( | ||
chunkTransformer._destroyed || | ||
userCanceled || | ||
// eslint-disable-next-line @typescript-eslint/no-explicit-any | ||
(userStream as any)._writableState.ended | ||
) { | ||
|
@@ -913,7 +942,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); | |
|
||
rowStream | ||
.on('error', (error: ServiceError) => { | ||
rowStream.unpipe(userStream); | ||
rowStreamUnpipe(rowStream, userStream); | ||
activeRequestStream = null; | ||
if (IGNORED_STATUS_CODES.has(error.code)) { | ||
// We ignore the `cancelled` "error", since we are the ones who cause | ||
|
@@ -947,7 +976,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`); | |
.on('end', () => { | ||
activeRequestStream = null; | ||
}); | ||
rowStream.pipe(userStream); | ||
rowStreamPipe(rowStream, userStream); | ||
}; | ||
|
||
makeNewRequest(); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we add this logic here to prevent the user from receiving rows when they make it past the race condition in
toRowStream
? If so then do we need theuserCanceled ||
check intoRowStream
at all?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. The reason we also have the condition in
toRowStream.transform
is just because it does not make much sense to pipe more data throughtoRowStream
after the user explicitly asked us to stop.