From bc2cbf646085775ed0080c6cb69b11333bd5479c Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Mon, 28 Oct 2024 06:48:24 -0700 Subject: [PATCH] feat: (observability): trace Database.runPartitionedUpdate (#2176) This change traces Database.runPartitionedUpdate along with the appropriate tests for it with and without errors. Updates #2079 --- observability-test/database.ts | 221 ++++++++++++++++++++++++++++++++- observability-test/spanner.ts | 47 +++++++ src/database.ts | 26 +++- 3 files changed, 287 insertions(+), 7 deletions(-) diff --git a/observability-test/database.ts b/observability-test/database.ts index f2884be1d..7351f5297 100644 --- a/observability-test/database.ts +++ b/observability-test/database.ts @@ -38,7 +38,10 @@ const { InMemorySpanExporter, } = require('@opentelemetry/sdk-trace-node'); // eslint-disable-next-line n/no-extraneous-require -const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base'); +const { + ReadableSpan, + SimpleSpanProcessor, +} = require('@opentelemetry/sdk-trace-base'); import * as db from '../src/database'; import {Instance, MutationGroup, Spanner} from '../src'; import * as pfy from '@google-cloud/promisify'; @@ -1953,4 +1956,220 @@ describe('Database', () => { fakeStream2.push(null); }); }); + + describe('runPartitionedUpdate', () => { + const QUERY = { + sql: 'INSERT INTO `MyTable` (Key, Thing) VALUES(@key, @thing)', + params: { + key: 'k999', + thing: 'abc', + }, + }; + + let fakePool: FakeSessionPool; + let fakeSession: FakeSession; + let fakePartitionedDml = new FakeTransaction( + {} as google.spanner.v1.TransactionOptions.PartitionedDml + ); + + let getSessionStub; + let beginStub; + let runUpdateStub; + + beforeEach(() => { + fakePool = database.pool_; + fakeSession = new FakeSession(); + fakePartitionedDml = new FakeTransaction( + {} as google.spanner.v1.TransactionOptions.PartitionedDml + ); + + getSessionStub = ( + sandbox.stub(fakePool, 'getSession') as sinon.SinonStub + ).callsFake(callback => { + callback(null, fakeSession); + }); + + sandbox.stub(fakeSession, 'partitionedDml').returns(fakePartitionedDml); + + beginStub = ( + sandbox.stub(fakePartitionedDml, 'begin') as sinon.SinonStub + ).callsFake(callback => callback(null)); + + runUpdateStub = ( + sandbox.stub(fakePartitionedDml, 'runUpdate') as sinon.SinonStub + ).callsFake((_, callback) => callback(null)); + }); + + interface traceExportResults { + spanNames: string[]; + spans: (typeof ReadableSpan)[]; + eventNames: string[]; + } + + async function getTraceExportResults(): Promise { + await provider.forceFlush(); + await traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + withAllSpansHaveDBName(spans); + + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + return Promise.resolve({ + spanNames: actualSpanNames, + spans: spans, + eventNames: actualEventNames, + }); + } + + it('with pool errors', done => { + const fakeError = new Error('err'); + const fakeCallback = sandbox.spy(); + + getSessionStub.callsFake(callback => callback(fakeError)); + database.runPartitionedUpdate(QUERY, async (err, rowCount) => { + assert.strictEqual(err, fakeError); + assert.strictEqual(rowCount, 0); + + const exportResults = await getTraceExportResults(); + const actualSpanNames = exportResults.spanNames; + const spans = exportResults.spans; + const actualEventNames = exportResults.eventNames; + + const expectedSpanNames = [ + 'CloudSpanner.Database.runPartitionedUpdate', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Ensure that the first span actually produced an error that was recorded. + const parentSpan = spans[0]; + assert.deepStrictEqual( + SpanStatusCode.ERROR, + parentSpan.status.code, + 'Expected an ERROR span status' + ); + assert.deepStrictEqual( + fakeError.message, + parentSpan.status.message.toString(), + 'Mismatched span status message' + ); + + const expectedEventNames = []; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + done(); + }); + }); + + it('with begin errors', done => { + const fakeError = new Error('err'); + + beginStub.callsFake(callback => callback(fakeError)); + + const releaseStub = ( + sandbox.stub(fakePool, 'release') as sinon.SinonStub + ).withArgs(fakeSession); + + database.runPartitionedUpdate(QUERY, async (err, rowCount) => { + assert.strictEqual(err, fakeError); + assert.strictEqual(rowCount, 0); + assert.strictEqual(releaseStub.callCount, 1); + + const exportResults = await getTraceExportResults(); + const actualSpanNames = exportResults.spanNames; + const spans = exportResults.spans; + const actualEventNames = exportResults.eventNames; + + const expectedSpanNames = [ + 'CloudSpanner.Database.runPartitionedUpdate', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Ensure that the first span actually produced an error that was recorded. + const parentSpan = spans[0]; + assert.deepStrictEqual( + SpanStatusCode.ERROR, + parentSpan.status.code, + 'Expected an ERROR span status' + ); + assert.deepStrictEqual( + fakeError.message, + parentSpan.status.message.toString(), + 'Mismatched span status message' + ); + + const expectedEventNames = []; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + done(); + }); + }); + + it('session released on transaction end', done => { + const releaseStub = ( + sandbox.stub(fakePool, 'release') as sinon.SinonStub + ).withArgs(fakeSession); + + database.runPartitionedUpdate(QUERY, async (err, rowCount) => { + const exportResults = await getTraceExportResults(); + const actualSpanNames = exportResults.spanNames; + const spans = exportResults.spans; + const actualEventNames = exportResults.eventNames; + + const expectedSpanNames = [ + 'CloudSpanner.Database.runPartitionedUpdate', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Ensure that the first span actually produced an error that was recorded. + const parentSpan = spans[0]; + assert.deepStrictEqual( + SpanStatusCode.UNSET, + parentSpan.status.code, + 'Unexpected span status' + ); + assert.deepStrictEqual( + undefined, + parentSpan.status.message, + 'Mismatched span status message' + ); + + const expectedEventNames = []; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + done(); + }); + + fakePartitionedDml.emit('end'); + assert.strictEqual(releaseStub.callCount, 1); + }); + }); }); diff --git a/observability-test/spanner.ts b/observability-test/spanner.ts index d5fcd409a..e94e5bc23 100644 --- a/observability-test/spanner.ts +++ b/observability-test/spanner.ts @@ -422,6 +422,53 @@ describe('EndToEnd', async () => { done(); }); }); + + it('runPartitionedUpdate', async () => { + const [rowCount] = await database.runPartitionedUpdate({ + sql: updateSql, + }); + + await tracerProvider.forceFlush(); + await traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + + const actualEventNames: string[] = []; + const actualSpanNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = [ + 'CloudSpanner.Snapshot.begin', + 'CloudSpanner.Snapshot.runStream', + 'CloudSpanner.Snapshot.run', + 'CloudSpanner.Dml.runUpdate', + 'CloudSpanner.PartitionedDml.runUpdate', + 'CloudSpanner.Database.runPartitionedUpdate', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = [ + 'Begin Transaction', + 'Transaction Creation Done', + 'Starting stream', + 'Acquiring session', + 'Cache hit: has usable session', + 'Acquired session', + ]; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + }); }); }); diff --git a/src/database.ts b/src/database.ts index 3d8321355..12db176c6 100644 --- a/src/database.ts +++ b/src/database.ts @@ -2858,13 +2858,27 @@ class Database extends common.GrpcServiceObject { query: string | RunPartitionedUpdateOptions, callback?: RunUpdateCallback ): void | Promise<[number]> { - this.pool_.getSession((err, session) => { - if (err) { - callback!(err as ServiceError, 0); - return; - } + const traceConfig = { + sql: query, + ...this._traceConfig, + }; + return startTrace('Database.runPartitionedUpdate', traceConfig, span => { + this.pool_.getSession((err, session) => { + if (err) { + setSpanError(span, err); + span.end(); + callback!(err as ServiceError, 0); + return; + } - this._runPartitionedUpdate(session!, query, callback); + this._runPartitionedUpdate(session!, query, (err, count) => { + if (err) { + setSpanError(span, err); + } + span.end(); + callback!(err, count); + }); + }); }); }