diff --git a/src/transaction.ts b/src/transaction.ts index 212698e4c..3a64b7730 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -17,6 +17,7 @@ import {DateStruct, PreciseDate} from '@google-cloud/precise-date'; import {promisifyAll} from '@google-cloud/promisify'; import arrify = require('arrify'); +import Long = require('long'); import {EventEmitter} from 'events'; import {grpc, CallOptions, ServiceError, Status, GoogleError} from 'google-gax'; import * as is from 'is'; @@ -101,7 +102,7 @@ export interface ReadRequest extends RequestOptions { keys?: string[] | string[][]; ranges?: KeyRange[]; keySet?: spannerClient.spanner.v1.IKeySet | null; - limit?: number | Long | null; + limit?: number | Long | string | null; resumeToken?: Uint8Array | null; partitionToken?: Uint8Array | null; requestOptions?: Omit; @@ -223,7 +224,7 @@ export type CommitCallback = export class Snapshot extends EventEmitter { protected _options!: spannerClient.spanner.v1.ITransactionOptions; protected _seqno = 1; - protected _idWaiter: Readable; + protected _waitingRequests: Array<() => void>; protected _inlineBeginStarted; protected _useInRunner = false; id?: Uint8Array | string; @@ -299,9 +300,7 @@ export class Snapshot extends EventEmitter { this.resourceHeader_ = { [CLOUD_RESOURCE_HEADER]: (this.session.parent as Database).formattedName_, }; - this._idWaiter = new Readable({ - read() {}, - }); + this._waitingRequests = []; this._inlineBeginStarted = false; } @@ -1306,7 +1305,7 @@ export class Snapshot extends EventEmitter { this.readTimestampProto = readTimestamp; this.readTimestamp = new PreciseDate(readTimestamp as DateStruct); } - this._idWaiter.emit('notify'); + this._releaseWaitingRequests(); } /** @@ -1326,12 +1325,28 @@ export class Snapshot extends EventEmitter { this._inlineBeginStarted = true; return makeRequest; } - return (resumeToken?: ResumeToken): Readable => - this._idWaiter.once('notify', () => + + // Queue subsequent requests. + return (resumeToken?: ResumeToken): Readable => { + const streamProxy = new Readable({ + read() {}, + }); + + this._waitingRequests.push(() => { makeRequest(resumeToken) - .on('data', chunk => this._idWaiter.emit('data', chunk)) - .once('end', () => this._idWaiter.emit('end')) - ); + .on('data', chunk => streamProxy.emit('data', chunk)) + .on('end', () => streamProxy.emit('end')); + }); + + return streamProxy; + }; + } + + _releaseWaitingRequests() { + while (this._waitingRequests.length > 0) { + const request = this._waitingRequests.shift(); + request?.(); + } } /** diff --git a/test/spanner.ts b/test/spanner.ts index 2ca4697f0..a01338ac5 100644 --- a/test/spanner.ts +++ b/test/spanner.ts @@ -3296,6 +3296,40 @@ describe('Spanner with mock server', () => { assert.ok(!beginTxnRequest, 'beginTransaction was called'); }); + it('should handle parallel request with inline begin transaction', async () => { + const database = newTestDatabase(); + await database.runTransactionAsync(async tx => { + const rowCount1 = getRowCountFromStreamingSql(tx!, {sql: selectSql}); + const rowCount2 = getRowCountFromStreamingSql(tx!, {sql: selectSql}); + const rowCount3 = getRowCountFromStreamingSql(tx!, {sql: selectSql}); + await Promise.all([rowCount1, rowCount2, rowCount3]); + await tx.commit(); + }); + await database.close(); + + let request = spannerMock.getRequests().find(val => { + return (val as v1.ExecuteSqlRequest).sql; + }) as v1.ExecuteSqlRequest; + assert.ok(request, 'no ExecuteSqlRequest found'); + assert.ok(request.transaction!.begin!.readWrite, 'ReadWrite is not set'); + assert.strictEqual(request.sql, selectSql); + + request = spannerMock + .getRequests() + .slice() + .reverse() + .find(val => { + return (val as v1.ExecuteSqlRequest).sql; + }) as v1.ExecuteSqlRequest; + assert.ok(request, 'no ExecuteSqlRequest found'); + assert.strictEqual(request.sql, selectSql); + assert.ok(request.transaction!.id, 'TransactionID is not set.'); + const beginTxnRequest = spannerMock.getRequests().find(val => { + return (val as v1.BeginTransactionRequest).options?.readWrite; + }) as v1.BeginTransactionRequest; + assert.ok(!beginTxnRequest, 'beginTransaction was called'); + }); + it('should use beginTransaction on retry', async () => { const database = newTestDatabase(); let attempts = 0;