From 8391ea8ca8d973073c342c51bcd27c5d37dd95e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Wed, 21 Feb 2024 20:00:42 +0100 Subject: [PATCH 1/4] fix: only reset pending value with resume token --- .gitignore | 1 + src/partial-result-stream.ts | 14 ++- test/spanner.ts | 207 +++++++++++++++++++++++++++++++++-- 3 files changed, 210 insertions(+), 12 deletions(-) diff --git a/.gitignore b/.gitignore index d4f03a0df..604f64ee0 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,4 @@ system-test/*key.json .DS_Store package-lock.json __pycache__ +.idea diff --git a/src/partial-result-stream.ts b/src/partial-result-stream.ts index a3d42ee6d..92af4a331 100644 --- a/src/partial-result-stream.ts +++ b/src/partial-result-stream.ts @@ -244,11 +244,15 @@ export class PartialResultStream extends Transform implements ResultEvents { } } - _clearPendingValues() { - this._values = []; - if (this._pendingValueForResume) { - this._pendingValue = this._pendingValueForResume; + _resetPendingValues(hasResumeToken: boolean) { + if (hasResumeToken) { + if (this._pendingValueForResume) { + this._pendingValue = this._pendingValueForResume; + } else { + delete this._pendingValue; + } } else { + this._values = []; delete this._pendingValue; } } @@ -484,7 +488,7 @@ export function partialResultStream( }); }; const makeRequest = (): void => { - partialRSStream._clearPendingValues(); + partialRSStream._resetPendingValues(is.defined(lastResumeToken) && lastResumeToken.length > 0); lastRequestStream = requestFn(lastResumeToken); lastRequestStream.on('end', endListener); requestsStream.add(lastRequestStream); diff --git a/test/spanner.ts b/test/spanner.ts index 315fb0596..32a1b8e58 100644 --- a/test/spanner.ts +++ b/test/spanner.ts @@ -120,7 +120,6 @@ describe('Spanner with mock server', () => { } ); }); - server.start(); spannerMock.putStatementResult( selectSql, mock.StatementResult.resultSet(mock.createSimpleResultSet()) @@ -3695,10 +3694,10 @@ describe('Spanner with mock server', () => { }); it('should return all values from PartialResultSet with chunked string value', async () => { - for (const includeResumeToken in [true, false]) { + for (const includeResumeToken of [true, false]) { // eslint-disable-next-line @typescript-eslint/no-explicit-any let errorOnIndexes: any; - for (errorOnIndexes in [[], [0], [1], [0, 1]]) { + for (errorOnIndexes of [[], [0], [1], [0, 1]]) { const sql = 'SELECT * FROM TestTable'; const prs1 = PartialResultSet.create({ resumeToken: includeResumeToken @@ -3747,10 +3746,10 @@ describe('Spanner with mock server', () => { }); it('should return all values from PartialResultSet with chunked string value in an array', async () => { - for (const includeResumeToken in [true, false]) { + for (const includeResumeToken of [true, false]) { // eslint-disable-next-line @typescript-eslint/no-explicit-any let errorOnIndexes: any; - for (errorOnIndexes in [[], [0], [1], [0, 1]]) { + for (errorOnIndexes of [[], [0], [1], [0, 1]]) { const sql = 'SELECT * FROM TestTable'; const prs1 = PartialResultSet.create({ resumeToken: includeResumeToken @@ -3800,10 +3799,10 @@ describe('Spanner with mock server', () => { }); it('should return all values from PartialResultSet with chunked list value', async () => { - for (const includeResumeToken in [true, false]) { + for (const includeResumeToken of [true, false]) { // eslint-disable-next-line @typescript-eslint/no-explicit-any let errorOnIndexes: any; - for (errorOnIndexes in [[], [0], [1], [0, 1]]) { + for (errorOnIndexes of [[], [0], [1], [0, 1]]) { const sql = 'SELECT * FROM TestTable'; const prs1 = PartialResultSet.create({ resumeToken: includeResumeToken @@ -4047,6 +4046,200 @@ describe('Spanner with mock server', () => { } }); + it('should clear pending values if the last partial result did not have a resume token and was not a complete row', async () => { + const sql = 'SELECT * FROM TestTable'; + const prs1 = PartialResultSet.create({ + resumeToken: undefined, + metadata: createMultiColumnMetadata(), + values: [ + {stringValue: 'id1.1'}, + {stringValue: 'id1.2'}, + {stringValue: '100'}, + ], + chunkedValue: false, + }); + const prs2 = PartialResultSet.create({ + resumeToken: undefined, + values: [ + {boolValue: true}, + {boolValue: true}, + {numberValue: 0.5}, + {stringValue: 'id2.1'}, + {stringValue: 'id2.2'}, + ], + chunkedValue: false, + }); + const prs3 = PartialResultSet.create({ + resumeToken: undefined, + values: [ + {stringValue: '200'}, + {boolValue: true}, + {boolValue: true}, + {numberValue: 0.5}, + ], + }); + // Let the stream return UNAVAILABLE on index 1 (so the second PartialResultSet). + setupResultsAndErrors(sql, [prs1, prs2, prs3], [1]); + const database = newTestDatabase(); + try { + const [rows] = (await database.run({ + sql, + json: true, + })) as Json[][]; + verifyQueryResult(rows); + } finally { + await database.close(); + } + }); + + it('should not clear pending values if the last partial result had a resume token and was not a complete row', async () => { + for (const errorIndexes of [[1], [2]]) { + const sql = 'SELECT * FROM TestTable'; + const prs1 = PartialResultSet.create({ + resumeToken: Buffer.from('00000000'), + metadata: createMultiColumnMetadata(), + values: [ + {stringValue: 'id1.1'}, + {stringValue: 'id1.2'}, + {stringValue: '100'}, + ], + chunkedValue: false, + }); + const prs2 = PartialResultSet.create({ + resumeToken: undefined, + values: [ + {boolValue: true}, + {boolValue: true}, + {numberValue: 0.5}, + {stringValue: 'id2.1'}, + {stringValue: 'id2.2'}, + ], + chunkedValue: false, + }); + const prs3 = PartialResultSet.create({ + resumeToken: undefined, + values: [ + {stringValue: '200'}, + {boolValue: true}, + {boolValue: true}, + {numberValue: 0.5}, + ], + }); + setupResultsAndErrors(sql, [prs1, prs2, prs3], errorIndexes); + const database = newTestDatabase(); + try { + const [rows] = (await database.run({ + sql, + json: true, + })) as Json[][]; + verifyQueryResult(rows); + } finally { + await database.close(); + } + } + }); + + it('should not clear pending values if the last partial result was chunked and had a resume token', async () => { + for (const errorIndexes of [[2]]) { + const sql = 'SELECT * FROM TestTable'; + const prs1 = PartialResultSet.create({ + resumeToken: Buffer.from('00000000'), + metadata: createMultiColumnMetadata(), + values: [ + {stringValue: 'id1.1'}, + {stringValue: 'id1.2'}, + {stringValue: '100'}, + ], + chunkedValue: true, + }); + const prs2 = PartialResultSet.create({ + resumeToken: undefined, + values: [ + // The previous value was chunked, but it is still perfectly possible that it actually contained + // the entire value. So in this case the actual value was '100'. + {stringValue: ''}, + {boolValue: true}, + {boolValue: true}, + {numberValue: 0.5}, + {stringValue: 'id2.1'}, + {stringValue: 'id2.2'}, + ], + chunkedValue: false, + }); + const prs3 = PartialResultSet.create({ + resumeToken: undefined, + values: [ + {stringValue: '200'}, + {boolValue: true}, + {boolValue: true}, + {numberValue: 0.5}, + ], + }); + setupResultsAndErrors(sql, [prs1, prs2, prs3], errorIndexes); + const database = newTestDatabase(); + try { + const [rows] = (await database.run({ + sql, + json: true, + })) as Json[][]; + verifyQueryResult(rows); + } finally { + await database.close(); + } + } + }); + + function verifyQueryResult(rows: Json[]) { + assert.strictEqual(rows.length, 2); + assert.strictEqual(rows[0].col1, 'id1.1'); + assert.strictEqual(rows[0].col2, 'id1.2'); + assert.strictEqual(rows[0].col3, 100); + assert.strictEqual(rows[0].col4, true); + assert.strictEqual(rows[0].col5, true); + assert.strictEqual(rows[0].col6, 0.5); + + assert.strictEqual(rows[1].col1, 'id2.1'); + assert.strictEqual(rows[1].col2, 'id2.2'); + assert.strictEqual(rows[1].col3, 200); + assert.strictEqual(rows[1].col4, true); + assert.strictEqual(rows[1].col5, true); + assert.strictEqual(rows[1].col6, 0.5); + } + + function createMultiColumnMetadata() { + const fields = [ + protobuf.StructType.Field.create({ + name: 'col1', + type: protobuf.Type.create({code: protobuf.TypeCode.STRING}), + }), + protobuf.StructType.Field.create({ + name: 'col2', + type: protobuf.Type.create({code: protobuf.TypeCode.STRING}), + }), + protobuf.StructType.Field.create({ + name: 'col3', + type: protobuf.Type.create({code: protobuf.TypeCode.INT64}), + }), + protobuf.StructType.Field.create({ + name: 'col4', + type: protobuf.Type.create({code: protobuf.TypeCode.BOOL}), + }), + protobuf.StructType.Field.create({ + name: 'col5', + type: protobuf.Type.create({code: protobuf.TypeCode.BOOL}), + }), + protobuf.StructType.Field.create({ + name: 'col6', + type: protobuf.Type.create({code: protobuf.TypeCode.FLOAT64}), + }), + ]; + return new protobuf.ResultSetMetadata({ + rowType: new protobuf.StructType({ + fields, + }), + }); + } + function createMetadata() { const fields = [ protobuf.StructType.Field.create({ From 26f1d3ca67a0cfdfaeecf43b39627878f6412183 Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Wed, 21 Feb 2024 19:05:39 +0000 Subject: [PATCH 2/4] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20po?= =?UTF-8?q?st-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- src/partial-result-stream.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/partial-result-stream.ts b/src/partial-result-stream.ts index 92af4a331..8131ab205 100644 --- a/src/partial-result-stream.ts +++ b/src/partial-result-stream.ts @@ -488,7 +488,9 @@ export function partialResultStream( }); }; const makeRequest = (): void => { - partialRSStream._resetPendingValues(is.defined(lastResumeToken) && lastResumeToken.length > 0); + partialRSStream._resetPendingValues( + is.defined(lastResumeToken) && lastResumeToken.length > 0 + ); lastRequestStream = requestFn(lastResumeToken); lastRequestStream.on('end', endListener); requestsStream.add(lastRequestStream); From 5d1f2a8575b2143946ed5d6084866bdfd894265b Mon Sep 17 00:00:00 2001 From: surbhigarg92 Date: Thu, 22 Feb 2024 16:37:40 +0530 Subject: [PATCH 3/4] Call resetpendingvalues only when there is a resume token --- src/partial-result-stream.ts | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/src/partial-result-stream.ts b/src/partial-result-stream.ts index 8131ab205..04208307e 100644 --- a/src/partial-result-stream.ts +++ b/src/partial-result-stream.ts @@ -244,15 +244,10 @@ export class PartialResultStream extends Transform implements ResultEvents { } } - _resetPendingValues(hasResumeToken: boolean) { - if (hasResumeToken) { - if (this._pendingValueForResume) { - this._pendingValue = this._pendingValueForResume; - } else { - delete this._pendingValue; - } + _resetPendingValues() { + if (this._pendingValueForResume) { + this._pendingValue = this._pendingValueForResume; } else { - this._values = []; delete this._pendingValue; } } @@ -488,9 +483,10 @@ export function partialResultStream( }); }; const makeRequest = (): void => { - partialRSStream._resetPendingValues( - is.defined(lastResumeToken) && lastResumeToken.length > 0 - ); + + if(is.defined(lastResumeToken) && lastResumeToken.length > 0) { + partialRSStream._resetPendingValues(); + } lastRequestStream = requestFn(lastResumeToken); lastRequestStream.on('end', endListener); requestsStream.add(lastRequestStream); From eba384403f59a2c0a2fed3c6847533e8f4beb3ce Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Thu, 22 Feb 2024 11:11:03 +0000 Subject: [PATCH 4/4] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20po?= =?UTF-8?q?st-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- src/partial-result-stream.ts | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/partial-result-stream.ts b/src/partial-result-stream.ts index 04208307e..e03539219 100644 --- a/src/partial-result-stream.ts +++ b/src/partial-result-stream.ts @@ -483,9 +483,8 @@ export function partialResultStream( }); }; const makeRequest = (): void => { - - if(is.defined(lastResumeToken) && lastResumeToken.length > 0) { - partialRSStream._resetPendingValues(); + if (is.defined(lastResumeToken) && lastResumeToken.length > 0) { + partialRSStream._resetPendingValues(); } lastRequestStream = requestFn(lastResumeToken); lastRequestStream.on('end', endListener);