Skip to content

Commit

Permalink
fix: do not cancel stream after server returned ok or cancelled status (
Browse files Browse the repository at this point in the history
googleapis#1029)

* fix: do not cancel stream after server returned ok or cancelled status

* remove comment

* lint

* lint
  • Loading branch information
mutianf authored Mar 16, 2022
1 parent 2576d14 commit 33754a2
Showing 1 changed file with 23 additions and 19 deletions.
42 changes: 23 additions & 19 deletions src/table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,7 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
createReadStream(opts?: GetRowsOptions) {
const options = opts || {};
const maxRetries = is.number(this.maxRetries) ? this.maxRetries! : 3;
let activeRequestStream: AbortableDuplex;
let activeRequestStream: AbortableDuplex | null;
let rowKeys: string[];
const ranges = options.ranges || [];
let filter: {} | null;
Expand Down Expand Up @@ -786,7 +786,6 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);
userStream.end = () => {
rowStream?.unpipe(userStream);
if (activeRequestStream) {
// TODO: properly end the stream instead of abort
activeRequestStream.abort();
}
return end();
Expand Down Expand Up @@ -927,23 +926,28 @@ Please use the format 'prezzy' or '${instance.name}/tables/prezzy'.`);

rowStream = pumpify.obj([requestStream, chunkTransformer, toRowStream]);

rowStream.on('error', (error: ServiceError) => {
if (IGNORED_STATUS_CODES.has(error.code)) {
// We ignore the `cancelled` "error", since we are the ones who cause
// it when the user calls `.abort()`.
userStream.end();
return;
}
rowStream.unpipe(userStream);
if (
numRequestsMade <= maxRetries &&
RETRYABLE_STATUS_CODES.has(error.code)
) {
makeNewRequest();
} else {
userStream.emit('error', error);
}
});
rowStream
.on('error', (error: ServiceError) => {
rowStream.unpipe(userStream);
activeRequestStream = null;
if (IGNORED_STATUS_CODES.has(error.code)) {
// We ignore the `cancelled` "error", since we are the ones who cause
// it when the user calls `.abort()`.
userStream.end();
return;
}
if (
numRequestsMade <= maxRetries &&
RETRYABLE_STATUS_CODES.has(error.code)
) {
makeNewRequest();
} else {
userStream.emit('error', error);
}
})
.on('end', () => {
activeRequestStream = null;
});
rowStream.pipe(userStream);
numRequestsMade++;
};
Expand Down

0 comments on commit 33754a2

Please sign in to comment.