diff --git a/observability-test/database.ts b/observability-test/database.ts index fce3ef743..b14813cce 100644 --- a/observability-test/database.ts +++ b/observability-test/database.ts @@ -21,7 +21,12 @@ import {EventEmitter} from 'events'; import * as assert from 'assert'; import * as extend from 'extend'; import {google} from '../protos/protos'; -import {CommitCallback, CommitOptions, MutationSet} from '../src/transaction'; +import { + BatchWriteOptions, + CommitCallback, + CommitOptions, + MutationSet, +} from '../src/transaction'; import {util} from '@google-cloud/common'; import {Transform} from 'stream'; import * as proxyquire from 'proxyquire'; @@ -35,7 +40,7 @@ const { // eslint-disable-next-line n/no-extraneous-require const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base'); import * as db from '../src/database'; -import {Instance, Spanner} from '../src'; +import {Instance, MutationGroup, Spanner} from '../src'; import * as pfy from '@google-cloud/promisify'; import {grpc} from 'google-gax'; import {MockError} from '../test/mockserver/mockspanner'; @@ -1215,6 +1220,224 @@ describe('Database', () => { }); }); + describe('batchWriteAtLeastOnce', () => { + const mutationGroup1 = new MutationGroup(); + mutationGroup1.insert('MyTable', { + Key: 'ks1', + Thing: 'abc', + }); + const mutationGroup2 = new MutationGroup(); + mutationGroup2.insert('MyTable', { + Key: 'ks2', + Thing: 'xyz', + }); + + const mutationGroups = [mutationGroup1, mutationGroup2]; + + let fakePool: FakeSessionPool; + let fakeSession: FakeSession; + let fakeDataStream: Transform; + let getSessionStub: sinon.SinonStub; + let requestStreamStub: sinon.SinonStub; + + const options = { + requestOptions: { + transactionTag: 'batch-write-tag', + }, + excludeTxnFromChangeStream: true, + gaxOptions: {autoPaginate: false}, + } as BatchWriteOptions; + + beforeEach(() => { + fakePool = database.pool_; + fakeSession = new FakeSession(); + fakeDataStream = through.obj(); + + getSessionStub = ( + sandbox.stub(fakePool, 'getSession') as sinon.SinonStub + ).callsFake(callback => callback(null, fakeSession)); + + requestStreamStub = sandbox + .stub(database, 'requestStream') + .returns(fakeDataStream); + }); + + it('on retry with "Session not found" error', done => { + const sessionNotFoundError = { + code: grpc.status.NOT_FOUND, + message: 'Session not found', + } as grpc.ServiceError; + let retryCount = 0; + + database + .batchWriteAtLeastOnce(mutationGroups, options) + .on('data', () => {}) + .on('error', err => { + assert.fail(err); + }) + .on('end', () => { + assert.strictEqual(retryCount, 1); + + const spans = traceExporter.getFinishedSpans(); + withAllSpansHaveDBName(spans); + + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = [ + 'CloudSpanner.Database.batchWriteAtLeastOnce', + 'CloudSpanner.Database.batchWriteAtLeastOnce', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Ensure that the span actually produced an error that was recorded. + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.ERROR, + firstSpan.status.code, + 'Expected an ERROR span status' + ); + + const errorMessage = firstSpan.status.message; + assert.deepStrictEqual( + firstSpan.status.message, + sessionNotFoundError.message + ); + + // The last span should not have an error status. + const lastSpan = spans[spans.length - 1]; + assert.strictEqual( + SpanStatusCode.UNSET, + lastSpan.status.code, + 'Unexpected span status' + ); + + assert.deepStrictEqual(lastSpan.status.message, undefined); + + const expectedEventNames = [ + 'Using Session', + 'No session available', + 'Using Session', + ]; + assert.deepStrictEqual(actualEventNames, expectedEventNames); + + done(); + }); + + fakeDataStream.emit('error', sessionNotFoundError); + retryCount++; + }); + + it('on getSession errors', done => { + const fakeError = new Error('err'); + + getSessionStub.callsFake(callback => callback(fakeError)); + database + .batchWriteAtLeastOnce(mutationGroups, options) + .on('error', err => { + assert.strictEqual(err, fakeError); + + const spans = traceExporter.getFinishedSpans(); + withAllSpansHaveDBName(spans); + + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = [ + 'CloudSpanner.Database.batchWriteAtLeastOnce', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Ensure that the span actually produced an error that was recorded. + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.ERROR, + firstSpan.status.code, + 'Expected an ERROR span status' + ); + + assert.deepStrictEqual(firstSpan.status.message, fakeError.message); + + const expectedEventNames = []; + assert.deepStrictEqual(expectedEventNames, actualEventNames); + + done(); + }); + }); + + it('with no errors', done => { + getSessionStub.callsFake(callback => callback(null, {})); + database + .batchWriteAtLeastOnce(mutationGroups, options) + .on('data', () => {}) + .on('error', assert.ifError) + .on('end', () => { + const spans = traceExporter.getFinishedSpans(); + withAllSpansHaveDBName(spans); + + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = [ + 'CloudSpanner.Database.batchWriteAtLeastOnce', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Ensure that the span actually produced an error that was recorded. + const firstSpan = spans[0]; + assert.strictEqual( + SpanStatusCode.UNSET, + firstSpan.status.code, + 'Unexpected span status code' + ); + + assert.strictEqual( + undefined, + firstSpan.status.message, + 'Unexpected span status message' + ); + + const expectedEventNames = ['Using Session']; + assert.deepStrictEqual(actualEventNames, expectedEventNames); + + done(); + }); + + fakeDataStream.emit('data', 'response'); + fakeDataStream.end('end'); + }); + }); + describe('runTransaction', () => { const SESSION = new FakeSession(); const TRANSACTION = new FakeTransaction( diff --git a/src/database.ts b/src/database.ts index 9a2b703a0..ba46d691d 100644 --- a/src/database.ts +++ b/src/database.ts @@ -3469,66 +3469,79 @@ class Database extends common.GrpcServiceObject { ): NodeJS.ReadableStream { const proxyStream: Transform = through.obj(); - const span = getActiveOrNoopSpan(); - - this.pool_.getSession((err, session) => { - if (err) { - proxyStream.destroy(err); - return; - } - - span.addEvent('Using Session', {'session.id': session?.id}); - const gaxOpts = extend(true, {}, options?.gaxOptions); - const reqOpts = Object.assign( - {} as spannerClient.spanner.v1.BatchWriteRequest, - { - session: session!.formattedName_!, - mutationGroups: mutationGroups.map(mg => mg.proto()), - requestOptions: options?.requestOptions, - excludeTxnFromChangeStream: options?.excludeTxnFromChangeStreams, - } - ); - let dataReceived = false; - let dataStream = this.requestStream({ - client: 'SpannerClient', - method: 'batchWrite', - reqOpts, - gaxOpts, - headers: this.resourceHeader_, - }); - dataStream - .once('data', () => (dataReceived = true)) - .once('error', err => { - if ( - !dataReceived && - isSessionNotFoundError(err as grpc.ServiceError) - ) { - // If there's a 'Session not found' error and we have not yet received - // any data, we can safely retry the writes on a new session. - // Register the error on the session so the pool can discard it. - if (session) { - session.lastError = err as grpc.ServiceError; - } - span.addEvent('No session available', { - 'session.id': session?.id, - }); - // Remove the current data stream from the end user stream. - dataStream.unpipe(proxyStream); - dataStream.end(); - // Create a new stream and add it to the end user stream. - dataStream = this.batchWriteAtLeastOnce(mutationGroups, options); - dataStream.pipe(proxyStream); - } else { + return startTrace( + 'Database.batchWriteAtLeastOnce', + this._traceConfig, + span => { + this.pool_.getSession((err, session) => { + if (err) { proxyStream.destroy(err); + setSpanError(span, err); + span.end(); + return; } - }) - .once('end', () => { - this.pool_.release(session!); - }) - .pipe(proxyStream); - }); - return proxyStream as NodeJS.ReadableStream; + span.addEvent('Using Session', {'session.id': session?.id}); + const gaxOpts = extend(true, {}, options?.gaxOptions); + const reqOpts = Object.assign( + {} as spannerClient.spanner.v1.BatchWriteRequest, + { + session: session!.formattedName_!, + mutationGroups: mutationGroups.map(mg => mg.proto()), + requestOptions: options?.requestOptions, + excludeTxnFromChangeStream: options?.excludeTxnFromChangeStreams, + } + ); + let dataReceived = false; + let dataStream = this.requestStream({ + client: 'SpannerClient', + method: 'batchWrite', + reqOpts, + gaxOpts, + headers: this.resourceHeader_, + }); + dataStream + .once('data', () => (dataReceived = true)) + .once('error', err => { + setSpanError(span, err); + + if ( + !dataReceived && + isSessionNotFoundError(err as grpc.ServiceError) + ) { + // If there's a 'Session not found' error and we have not yet received + // any data, we can safely retry the writes on a new session. + // Register the error on the session so the pool can discard it. + if (session) { + session.lastError = err as grpc.ServiceError; + } + span.addEvent('No session available', { + 'session.id': session?.id, + }); + // Remove the current data stream from the end user stream. + dataStream.unpipe(proxyStream); + dataStream.end(); + // Create a new stream and add it to the end user stream. + dataStream = this.batchWriteAtLeastOnce( + mutationGroups, + options + ); + dataStream.pipe(proxyStream); + } else { + span.end(); + proxyStream.destroy(err); + } + }) + .once('end', () => { + span.end(); + this.pool_.release(session!); + }) + .pipe(proxyStream); + }); + + return proxyStream as NodeJS.ReadableStream; + } + ); } /**