From 80a8d20b6e7c0588c9f2d0c692aeeda4c1159771 Mon Sep 17 00:00:00 2001 From: surbhigarg92 Date: Tue, 29 Aug 2023 14:37:49 +0530 Subject: [PATCH 1/7] fix: idwaiter with multiple requests --- src/transaction.ts | 36 ++++++++++++++++++++++++++---------- 1 file changed, 26 insertions(+), 10 deletions(-) diff --git a/src/transaction.ts b/src/transaction.ts index 212698e4c..e413d33c6 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -223,7 +223,7 @@ export type CommitCallback = export class Snapshot extends EventEmitter { protected _options!: spannerClient.spanner.v1.ITransactionOptions; protected _seqno = 1; - protected _idWaiter: Readable; + protected _waitingRequests: Array<() => void>; protected _inlineBeginStarted; protected _useInRunner = false; id?: Uint8Array | string; @@ -299,9 +299,7 @@ export class Snapshot extends EventEmitter { this.resourceHeader_ = { [CLOUD_RESOURCE_HEADER]: (this.session.parent as Database).formattedName_, }; - this._idWaiter = new Readable({ - read() {}, - }); + this._waitingRequests = [] this._inlineBeginStarted = false; } @@ -1306,7 +1304,7 @@ export class Snapshot extends EventEmitter { this.readTimestampProto = readTimestamp; this.readTimestamp = new PreciseDate(readTimestamp as DateStruct); } - this._idWaiter.emit('notify'); + this._releaseWaitingRequests(); } /** @@ -1326,12 +1324,30 @@ export class Snapshot extends EventEmitter { this._inlineBeginStarted = true; return makeRequest; } - return (resumeToken?: ResumeToken): Readable => - this._idWaiter.once('notify', () => + + // Queue subsequent requests. + return (resumeToken?: ResumeToken): Readable => { + const streamProxy = new Readable({ + read() { + } + }); + + this._waitingRequests.push(() => { makeRequest(resumeToken) - .on('data', chunk => this._idWaiter.emit('data', chunk)) - .once('end', () => this._idWaiter.emit('end')) - ); + .on('data', (chunk) => streamProxy.emit('data', chunk)) + .on('end', () => streamProxy.emit('end')); + }); + + return streamProxy; + }; + } + + _releaseWaitingRequests() { + this._inlineBeginStarted = false; + while (this._waitingRequests.length > 0) { + const request = this._waitingRequests.shift(); + request?.(); + } } /** From 4f92751be3153fb643762a7a35f3d0d3807616e6 Mon Sep 17 00:00:00 2001 From: surbhigarg92 Date: Tue, 29 Aug 2023 16:42:30 +0530 Subject: [PATCH 2/7] test: for parallel request --- test/spanner.ts | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/test/spanner.ts b/test/spanner.ts index 2ca4697f0..a01338ac5 100644 --- a/test/spanner.ts +++ b/test/spanner.ts @@ -3296,6 +3296,40 @@ describe('Spanner with mock server', () => { assert.ok(!beginTxnRequest, 'beginTransaction was called'); }); + it('should handle parallel request with inline begin transaction', async () => { + const database = newTestDatabase(); + await database.runTransactionAsync(async tx => { + const rowCount1 = getRowCountFromStreamingSql(tx!, {sql: selectSql}); + const rowCount2 = getRowCountFromStreamingSql(tx!, {sql: selectSql}); + const rowCount3 = getRowCountFromStreamingSql(tx!, {sql: selectSql}); + await Promise.all([rowCount1, rowCount2, rowCount3]); + await tx.commit(); + }); + await database.close(); + + let request = spannerMock.getRequests().find(val => { + return (val as v1.ExecuteSqlRequest).sql; + }) as v1.ExecuteSqlRequest; + assert.ok(request, 'no ExecuteSqlRequest found'); + assert.ok(request.transaction!.begin!.readWrite, 'ReadWrite is not set'); + assert.strictEqual(request.sql, selectSql); + + request = spannerMock + .getRequests() + .slice() + .reverse() + .find(val => { + return (val as v1.ExecuteSqlRequest).sql; + }) as v1.ExecuteSqlRequest; + assert.ok(request, 'no ExecuteSqlRequest found'); + assert.strictEqual(request.sql, selectSql); + assert.ok(request.transaction!.id, 'TransactionID is not set.'); + const beginTxnRequest = spannerMock.getRequests().find(val => { + return (val as v1.BeginTransactionRequest).options?.readWrite; + }) as v1.BeginTransactionRequest; + assert.ok(!beginTxnRequest, 'beginTransaction was called'); + }); + it('should use beginTransaction on retry', async () => { const database = newTestDatabase(); let attempts = 0; From 3f17fa6d47a2fca47db970188948588706fd56eb Mon Sep 17 00:00:00 2001 From: surbhigarg92 Date: Tue, 29 Aug 2023 18:11:01 +0530 Subject: [PATCH 3/7] fix: review comments --- src/transaction.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/transaction.ts b/src/transaction.ts index e413d33c6..baa6e327f 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -1343,7 +1343,6 @@ export class Snapshot extends EventEmitter { } _releaseWaitingRequests() { - this._inlineBeginStarted = false; while (this._waitingRequests.length > 0) { const request = this._waitingRequests.shift(); request?.(); From 4c6d673ae43ef7846851c40c0c2ab5ca523ba922 Mon Sep 17 00:00:00 2001 From: surbhigarg92 Date: Wed, 30 Aug 2023 10:19:49 +0530 Subject: [PATCH 4/7] fix: lint --- src/transaction.ts | 13 ++++++------- system-test/spanner.ts | 10 ++++------ test/spanner.ts | 5 ++--- 3 files changed, 12 insertions(+), 16 deletions(-) diff --git a/src/transaction.ts b/src/transaction.ts index baa6e327f..0fa537c34 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -299,7 +299,7 @@ export class Snapshot extends EventEmitter { this.resourceHeader_ = { [CLOUD_RESOURCE_HEADER]: (this.session.parent as Database).formattedName_, }; - this._waitingRequests = [] + this._waitingRequests = []; this._inlineBeginStarted = false; } @@ -1328,14 +1328,13 @@ export class Snapshot extends EventEmitter { // Queue subsequent requests. return (resumeToken?: ResumeToken): Readable => { const streamProxy = new Readable({ - read() { - } + read() {}, }); this._waitingRequests.push(() => { makeRequest(resumeToken) - .on('data', (chunk) => streamProxy.emit('data', chunk)) - .on('end', () => streamProxy.emit('end')); + .on('data', chunk => streamProxy.emit('data', chunk)) + .on('end', () => streamProxy.emit('end')); }); return streamProxy; @@ -1344,8 +1343,8 @@ export class Snapshot extends EventEmitter { _releaseWaitingRequests() { while (this._waitingRequests.length > 0) { - const request = this._waitingRequests.shift(); - request?.(); + const request = this._waitingRequests.shift(); + request?.(); } } diff --git a/system-test/spanner.ts b/system-test/spanner.ts index ddacbf344..fa5878f04 100644 --- a/system-test/spanner.ts +++ b/system-test/spanner.ts @@ -158,9 +158,8 @@ describe('Spanner', () => { PRIMARY KEY (SingerId) );`, ]; - const [postgreSqlOperationUpdateDDL] = await pg_database.updateSchema( - schema - ); + const [postgreSqlOperationUpdateDDL] = + await pg_database.updateSchema(schema); await postgreSqlOperationUpdateDDL.promise(); RESOURCES_TO_CLEAN.push(PG_DATABASE); @@ -2617,9 +2616,8 @@ describe('Spanner', () => { ); await operation.promise(); - const [operationUpdateDDL] = await database.updateSchema( - database_schema - ); + const [operationUpdateDDL] = + await database.updateSchema(database_schema); await operationUpdateDDL.promise(); const [schema] = await database.getSchema(); diff --git a/test/spanner.ts b/test/spanner.ts index a01338ac5..87e1493c9 100644 --- a/test/spanner.ts +++ b/test/spanner.ts @@ -4254,9 +4254,8 @@ describe('Spanner with mock server', () => { const dbSpecificQuery: GetDatabaseOperationsOptions = { filter: dbSpecificFilter, }; - const [operations1] = await instance.getDatabaseOperations( - dbSpecificQuery - ); + const [operations1] = + await instance.getDatabaseOperations(dbSpecificQuery); const database = instance.database('test-database'); const [operations2] = await database.getOperations(); From a34a29da4aaf0b799ac9b391934ac893c613d7ad Mon Sep 17 00:00:00 2001 From: surbhigarg92 Date: Wed, 30 Aug 2023 10:28:32 +0530 Subject: [PATCH 5/7] fix: compile error --- src/transaction.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/transaction.ts b/src/transaction.ts index 0fa537c34..1112700f5 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -101,7 +101,7 @@ export interface ReadRequest extends RequestOptions { keys?: string[] | string[][]; ranges?: KeyRange[]; keySet?: spannerClient.spanner.v1.IKeySet | null; - limit?: number | Long | null; + limit?: number | Long | string | null; resumeToken?: Uint8Array | null; partitionToken?: Uint8Array | null; requestOptions?: Omit; From 01405694c370397e02b272caea6c5fe97cf7b70a Mon Sep 17 00:00:00 2001 From: surbhigarg92 Date: Wed, 30 Aug 2023 10:41:37 +0530 Subject: [PATCH 6/7] fix: typescript error --- src/transaction.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/transaction.ts b/src/transaction.ts index 1112700f5..55dad8b5a 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -17,6 +17,7 @@ import {DateStruct, PreciseDate} from '@google-cloud/precise-date'; import {promisifyAll} from '@google-cloud/promisify'; import arrify = require('arrify'); +import Long = require("long") import {EventEmitter} from 'events'; import {grpc, CallOptions, ServiceError, Status, GoogleError} from 'google-gax'; import * as is from 'is'; From 423e299e0fbce5fc782863c32c76a64322dc7a1b Mon Sep 17 00:00:00 2001 From: surbhigarg92 Date: Wed, 30 Aug 2023 10:47:36 +0530 Subject: [PATCH 7/7] fix: lint --- src/transaction.ts | 2 +- system-test/spanner.ts | 10 ++++++---- test/spanner.ts | 5 +++-- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/src/transaction.ts b/src/transaction.ts index 55dad8b5a..3a64b7730 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -17,7 +17,7 @@ import {DateStruct, PreciseDate} from '@google-cloud/precise-date'; import {promisifyAll} from '@google-cloud/promisify'; import arrify = require('arrify'); -import Long = require("long") +import Long = require('long'); import {EventEmitter} from 'events'; import {grpc, CallOptions, ServiceError, Status, GoogleError} from 'google-gax'; import * as is from 'is'; diff --git a/system-test/spanner.ts b/system-test/spanner.ts index fa5878f04..ddacbf344 100644 --- a/system-test/spanner.ts +++ b/system-test/spanner.ts @@ -158,8 +158,9 @@ describe('Spanner', () => { PRIMARY KEY (SingerId) );`, ]; - const [postgreSqlOperationUpdateDDL] = - await pg_database.updateSchema(schema); + const [postgreSqlOperationUpdateDDL] = await pg_database.updateSchema( + schema + ); await postgreSqlOperationUpdateDDL.promise(); RESOURCES_TO_CLEAN.push(PG_DATABASE); @@ -2616,8 +2617,9 @@ describe('Spanner', () => { ); await operation.promise(); - const [operationUpdateDDL] = - await database.updateSchema(database_schema); + const [operationUpdateDDL] = await database.updateSchema( + database_schema + ); await operationUpdateDDL.promise(); const [schema] = await database.getSchema(); diff --git a/test/spanner.ts b/test/spanner.ts index 87e1493c9..a01338ac5 100644 --- a/test/spanner.ts +++ b/test/spanner.ts @@ -4254,8 +4254,9 @@ describe('Spanner with mock server', () => { const dbSpecificQuery: GetDatabaseOperationsOptions = { filter: dbSpecificFilter, }; - const [operations1] = - await instance.getDatabaseOperations(dbSpecificQuery); + const [operations1] = await instance.getDatabaseOperations( + dbSpecificQuery + ); const database = instance.database('test-database'); const [operations2] = await database.getOperations();