diff --git a/observability-test/database.ts b/observability-test/database.ts index c951b7b1c..5ce075aa1 100644 --- a/observability-test/database.ts +++ b/observability-test/database.ts @@ -605,7 +605,7 @@ describe('Database', () => { // pool, so that the pool can remove it from its inventory. const releaseStub = sandbox.stub(fakePool, 'release'); - database.getSnapshot((err, snapshot) => { + database.getSnapshot(async (err, snapshot) => { assert.ifError(err); assert.strictEqual(snapshot, fakeSnapshot2); // The first session that error should already have been released back @@ -616,8 +616,9 @@ describe('Database', () => { snapshot.emit('end'); assert.strictEqual(releaseStub.callCount, 2); + await provider.forceFlush(); + await traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); - assert.strictEqual(spans.length, 2, 'Exactly 2 spans expected'); withAllSpansHaveDBName(spans); const actualSpanNames: string[] = []; @@ -640,7 +641,7 @@ describe('Database', () => { ); // Ensure that the first span actually produced an error that was recorded. - const parentSpan = spans[1]; + const parentSpan = spans[0]; assert.strictEqual( SpanStatusCode.ERROR, parentSpan.status.code, @@ -653,7 +654,7 @@ describe('Database', () => { ); // Ensure that the second span is a child of the first span. - const secondRetrySpan = spans[0]; + const secondRetrySpan = spans[1]; assert.ok( parentSpan.spanContext().traceId, 'Expected that the initial parent span has a defined traceId' @@ -774,6 +775,7 @@ describe('Database', () => { callback(null, RESPONSE); }, once() {}, + end() {}, }; database.batchTransaction = (identifier, options) => { @@ -782,10 +784,14 @@ describe('Database', () => { return fakeTransaction; }; - database.createBatchTransaction(opts, (err, transaction, resp) => { + database.createBatchTransaction(opts, async (err, transaction, resp) => { assert.strictEqual(err, null); assert.strictEqual(transaction, fakeTransaction); assert.strictEqual(resp, RESPONSE); + transaction!.end(); + + await provider.forceFlush(); + traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); withAllSpansHaveDBName(spans); @@ -839,8 +845,8 @@ describe('Database', () => { begin(callback) { callback(error, RESPONSE); }, - once() {}, + end() {}, }; database.batchTransaction = () => { @@ -926,9 +932,11 @@ describe('Database', () => { getSessionStub.callsFake(callback => callback(fakeError)); - database.getTransaction(err => { + database.getTransaction(async err => { assert.strictEqual(err, fakeError); + await provider.forceFlush(); + traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); withAllSpansHaveDBName(spans); @@ -978,6 +986,7 @@ describe('Database', () => { database.getTransaction((err, transaction) => { assert.ifError(err); assert.strictEqual(transaction, fakeTransaction); + transaction!.end(); const spans = traceExporter.getFinishedSpans(); withAllSpansHaveDBName(spans); @@ -1869,11 +1878,14 @@ describe('Database', () => { .on('error', err => { assert.fail(err); }) - .on('end', () => { + .on('end', async () => { assert.strictEqual(endStub.callCount, 1); assert.strictEqual(endStub2.callCount, 1); assert.strictEqual(rows, 1); + await provider.forceFlush(); + await traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); assert.strictEqual(spans.length, 2, 'Exactly 1 span expected'); withAllSpansHaveDBName(spans); @@ -1898,35 +1910,35 @@ describe('Database', () => { ); // Ensure that the span actually produced an error that was recorded. - const secondSpan = spans[1]; - assert.strictEqual( + const lastSpan = spans[0]; + assert.deepStrictEqual( SpanStatusCode.ERROR, - secondSpan.status.code, + lastSpan.status.code, 'Expected an ERROR span status' ); - assert.strictEqual( + assert.deepStrictEqual( 'Session not found', - secondSpan.status.message, + lastSpan.status.message, 'Mismatched span status message' ); // Ensure that the final span that got retries did not error. - const firstSpan = spans[0]; - assert.strictEqual( + const firstSpan = spans[1]; + assert.deepStrictEqual( SpanStatusCode.UNSET, firstSpan.status.code, - 'Unexpected an span status code' + 'Unexpected span status code' ); - assert.strictEqual( + assert.deepStrictEqual( undefined, firstSpan.status.message, 'Unexpected span status message' ); const expectedEventNames = [ - 'Using Session', 'Using Session', 'No session available', + 'Using Session', ]; assert.deepStrictEqual( actualEventNames, diff --git a/observability-test/spanner.ts b/observability-test/spanner.ts index fca1b30f7..2e90b3044 100644 --- a/observability-test/spanner.ts +++ b/observability-test/spanner.ts @@ -20,9 +20,14 @@ import {google} from '../protos/protos'; import {Database, Instance, Spanner} from '../src'; import {MutationSet} from '../src/transaction'; import protobuf = google.spanner.v1; +import v1 = google.spanner.v1; +import PartialResultSet = google.spanner.v1.PartialResultSet; import * as mock from '../test/mockserver/mockspanner'; import * as mockInstanceAdmin from '../test/mockserver/mockinstanceadmin'; import * as mockDatabaseAdmin from '../test/mockserver/mockdatabaseadmin'; +import * as sinon from 'sinon'; +import {Row} from '../src/partial-result-stream'; +import {Json} from '../src/codec'; const { AlwaysOnSampler, NodeTracerProvider, @@ -30,6 +35,7 @@ const { } = require('@opentelemetry/sdk-trace-node'); // eslint-disable-next-line n/no-extraneous-require const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base'); +const {SpanStatusCode} = require('@opentelemetry/api'); const { disableContextAndManager, generateWithAllSpansHaveDBName, @@ -117,41 +123,45 @@ async function setup( }); } -describe('EndToEnd', () => { - let server: grpc.Server; - let spanner: Spanner; - let database: Database; - let spannerMock: mock.MockSpanner; - let traceExporter: typeof InMemorySpanExporter; - +describe('EndToEnd', async () => { const contextManager = new AsyncHooksContextManager(); setGlobalContextManager(contextManager); - afterEach(() => { disableContextAndManager(contextManager); }); - beforeEach(async () => { - traceExporter = new InMemorySpanExporter(); - const sampler = new AlwaysOnSampler(); - const provider = new NodeTracerProvider({ - sampler: sampler, - exporter: traceExporter, - }); - provider.addSpanProcessor(new SimpleSpanProcessor(traceExporter)); + const traceExporter = new InMemorySpanExporter(); + const sampler = new AlwaysOnSampler(); + const tracerProvider = new NodeTracerProvider({ + sampler: sampler, + exporter: traceExporter, + }); + tracerProvider.addSpanProcessor(new SimpleSpanProcessor(traceExporter)); - const setupResult = await setup({ - tracerProvider: provider, - enableExtendedTracing: false, - }); + const setupResult = await setup({ + tracerProvider: tracerProvider, + enableExtendedTracing: false, + }); - spanner = setupResult.spanner; - server = setupResult.server; - spannerMock = setupResult.spannerMock; + const server = setupResult.server; + const spannerMock = setupResult.spannerMock; + const spanner = setupResult.spanner; + const instance = spanner.instance('instance'); - const instance = spanner.instance('instance'); - database = instance.database('database'); + after(async () => { + spanner.close(); + await server.tryShutdown(() => {}); + }); + + afterEach(async () => { + await tracerProvider.forceFlush(); + await traceExporter.reset(); + spannerMock.resetRequests(); + }); + + const database = instance.database('database'); + beforeEach(async () => { // To deflake expectations of session creation, let's // issue out a warm-up request request that'll ensure // that the SessionPool is created deterministically. @@ -161,13 +171,6 @@ describe('EndToEnd', () => { traceExporter.reset(); }); - afterEach(() => { - traceExporter.reset(); - spannerMock.resetRequests(); - spanner.close(); - server.tryShutdown(() => {}); - }); - describe('Database', () => { it('getSessions', async () => { const [rows] = await database.getSessions(); @@ -212,9 +215,11 @@ describe('EndToEnd', () => { database.getSnapshot((err, transaction) => { assert.ifError(err); - transaction!.run('SELECT 1', (err, rows) => { + transaction!.run('SELECT 1', async (err, rows) => { assert.ifError(err); + transaction!.end(); + await tracerProvider.forceFlush(); traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); withAllSpansHaveDBName(spans); @@ -246,6 +251,7 @@ describe('EndToEnd', () => { 'Acquiring session', 'Cache hit: has usable session', 'Acquired session', + 'Starting stream', ]; assert.deepStrictEqual( actualEventNames, @@ -262,10 +268,13 @@ describe('EndToEnd', () => { const withAllSpansHaveDBName = generateWithAllSpansHaveDBName( database.formattedName_ ); - database.getTransaction((err, transaction) => { + database.getTransaction(async (err, transaction) => { assert.ifError(err); assert.ok(transaction); + transaction!.end(); + transaction!.commit(); + await tracerProvider.forceFlush(); traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); withAllSpansHaveDBName(spans); @@ -309,7 +318,7 @@ describe('EndToEnd', () => { database .runStream('SELECT 1') .on('data', row => {}) - .on('error', assert.ifError) + .once('error', assert.ifError) .on('end', () => { traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); @@ -335,6 +344,7 @@ describe('EndToEnd', () => { ); const expectedEventNames = [ + 'Starting stream', 'Acquiring session', 'Cache hit: has usable session', 'Acquired session', @@ -406,6 +416,7 @@ describe('EndToEnd', () => { ); const expectedEventNames = [ + 'Starting stream', 'Acquiring session', 'Cache hit: has usable session', 'Acquired session', @@ -422,49 +433,53 @@ describe('EndToEnd', () => { const withAllSpansHaveDBName = generateWithAllSpansHaveDBName( database.formattedName_ ); - database.runTransaction((err, transaction) => { + + database.runTransaction(async (err, transaction) => { assert.ifError(err); - transaction!.run('SELECT 1', (err, rows) => { - assert.ifError(err); + await transaction!.run('SELECT 1'); + await transaction!.commit(); + await transaction!.end(); + await traceExporter.forceFlush(); - traceExporter.forceFlush(); - const spans = traceExporter.getFinishedSpans(); - withAllSpansHaveDBName(spans); + const spans = traceExporter.getFinishedSpans(); + withAllSpansHaveDBName(spans); - const actualEventNames: string[] = []; - const actualSpanNames: string[] = []; - spans.forEach(span => { - actualSpanNames.push(span.name); - span.events.forEach(event => { - actualEventNames.push(event.name); - }); + const actualEventNames: string[] = []; + const actualSpanNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); }); + }); - const expectedSpanNames = [ - 'CloudSpanner.Database.runTransaction', - 'CloudSpanner.Snapshot.runStream', - 'CloudSpanner.Snapshot.run', - ]; - assert.deepStrictEqual( - actualSpanNames, - expectedSpanNames, - `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` - ); - - const expectedEventNames = [ - 'Acquiring session', - 'Cache hit: has usable session', - 'Acquired session', - 'Transaction Creation Done', - ]; - assert.deepStrictEqual( - actualEventNames, - expectedEventNames, - `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` - ); + const expectedSpanNames = [ + 'CloudSpanner.Snapshot.runStream', + 'CloudSpanner.Snapshot.run', + 'CloudSpanner.Transaction.commit', + 'CloudSpanner.Database.runTransaction', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); - done(); - }); + const expectedEventNames = [ + 'Starting stream', + 'Transaction Creation Done', + 'Starting Commit', + 'Commit Done', + 'Acquiring session', + 'Cache hit: has usable session', + 'Acquired session', + ]; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + done(); }); }); @@ -501,6 +516,7 @@ describe('EndToEnd', () => { ); const expectedEventNames = [ + 'Starting stream', 'Transaction Creation Done', 'Acquiring session', 'Cache hit: has usable session', @@ -790,7 +806,10 @@ describe('ObservabilityOptions injection and propagation', async () => { database.getTransaction((err, tx) => { assert.ifError(err); - tx!.run('SELECT 1', (err, rows) => { + tx!.run('SELECT 1', async (err, rows) => { + tx!.end(); + + await tracerProvider.forceFlush(); traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); @@ -821,6 +840,7 @@ describe('ObservabilityOptions injection and propagation', async () => { 'Cache hit: has usable session', 'Acquired session', 'Using Session', + 'Starting stream', 'Transaction Creation Done', ]; assert.strictEqual( @@ -842,14 +862,15 @@ describe('ObservabilityOptions injection and propagation', async () => { traceExporter.reset(); tx!.begin(); - tx!.runUpdate(updateSql, (err, rowCount) => { + tx!.runUpdate(updateSql, async (err, rowCount) => { assert.ifError(err); + tx!.end(); - traceExporter.forceFlush(); + await tracerProvider.forceFlush(); + await traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); withAllSpansHaveDBName(spans); - assert.strictEqual(spans.length, 4); const actualSpanNames: string[] = []; const actualEventNames: string[] = []; @@ -875,6 +896,7 @@ describe('ObservabilityOptions injection and propagation', async () => { const expectedEventNames = [ 'Begin Transaction', 'Transaction Creation Done', + 'Starting stream', ]; assert.deepStrictEqual( actualEventNames.every(value => expectedEventNames.includes(value)), @@ -896,9 +918,10 @@ describe('ObservabilityOptions injection and propagation', async () => { .on('data', () => rowCount++) .on('error', assert.ifError) .on('stats', _stats => {}) - .on('end', () => { + .on('end', async () => { tx!.end(); + await tracerProvider.forceFlush(); traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); @@ -928,6 +951,7 @@ describe('ObservabilityOptions injection and propagation', async () => { 'Cache hit: has usable session', 'Acquired session', 'Using Session', + 'Starting stream', ]; assert.deepStrictEqual( actualEventNames, @@ -951,7 +975,9 @@ describe('ObservabilityOptions injection and propagation', async () => { tx!.runUpdate(updateSql, async (err, rowCount) => { assert.ifError(err); - tx!.rollback(err => { + tx!.rollback(async err => { + tx!.end(); + await tracerProvider.forceFlush(); traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); @@ -982,6 +1008,7 @@ describe('ObservabilityOptions injection and propagation', async () => { const expectedEventNames = [ 'Begin Transaction', 'Transaction Creation Done', + 'Starting stream', ]; assert.strictEqual( actualEventNames.every(value => @@ -1091,6 +1118,7 @@ describe('ObservabilityOptions injection and propagation', async () => { 'Requesting 25 sessions', 'Creating 25 sessions', 'Requested for 25 sessions returned 25', + 'Starting stream', 'Acquiring session', 'Waiting for a session to become available', 'Acquired session', @@ -1238,6 +1266,7 @@ describe('E2E traces with async/await', async () => { 'Requesting 25 sessions', 'Creating 25 sessions', 'Requested for 25 sessions returned 25', + 'Starting stream', 'Acquiring session', 'Waiting for a session to become available', 'Acquired session', @@ -1298,3 +1327,967 @@ describe('E2E traces with async/await', async () => { }); }); }); + +describe('Negative cases', async () => { + let server: grpc.Server; + let spanner: Spanner; + let database: Database; + let spannerMock: mock.MockSpanner; + let traceExporter: typeof InMemorySpanExporter; + let provider: typeof TracerProvider; + let observabilityOptions: typeof ObservabilityOptions; + + const selectSql1p = 'SELECT 1p'; + const messageBadSelect1p = `Missing whitespace between literal and alias [at 1:9] +SELECT 1p + ^`; + const insertAlreadyExistentDataSql = + "INSERT INTO Singers(firstName, SingerId) VALUES('Foo', 1)"; + const messageBadInsertAlreadyExistent = + 'Failed to insert row with primary key ({pk#SingerId:1}) due to previously existing row'; + + beforeEach(async () => { + traceExporter = new InMemorySpanExporter(); + provider = new NodeTracerProvider({ + sampler: new AlwaysOnSampler(), + exporter: traceExporter, + }); + provider.addSpanProcessor(new SimpleSpanProcessor(traceExporter)); + + observabilityOptions = { + tracerProvider: provider, + enableExtendedTracing: true, + }; + const setupResult = await setup(observabilityOptions); + spanner = setupResult.spanner; + server = setupResult.server; + spannerMock = setupResult.spannerMock; + + const serverErr = { + message: messageBadSelect1p, + code: grpc.status.INVALID_ARGUMENT, + } as mock.MockError; + spannerMock.putStatementResult( + selectSql1p, + mock.StatementResult.error(serverErr) + ); + + const insertAlreadyExistentErr = { + message: messageBadInsertAlreadyExistent, + code: grpc.status.ALREADY_EXISTS, + } as mock.MockError; + spannerMock.putStatementResult( + insertAlreadyExistentDataSql, + mock.StatementResult.error(insertAlreadyExistentErr) + ); + }); + + afterEach(async () => { + traceExporter.reset(); + provider.shutdown(); + spannerMock.resetRequests(); + spanner.close(); + server.tryShutdown(() => {}); + }); + + function assertRunBadSyntaxExpectations() { + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + spans.sort((spanA, spanB) => { + return spanA.startTime < spanB.startTime; + }); + + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = [ + 'CloudSpanner.Database.batchCreateSessions', + 'CloudSpanner.SessionPool.createSessions', + 'CloudSpanner.Snapshot.runStream', + 'CloudSpanner.Database.runStream', + 'CloudSpanner.Database.run', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // We need to ensure a strict relationship between the spans. + // runSpan -------------------| + // |-runStream ----------| + const runStreamSpan = spans[spans.length - 2]; + const runSpan = spans[spans.length - 1]; + assert.ok( + runSpan.spanContext().traceId, + 'Expected that runSpan has a defined traceId' + ); + assert.ok( + runStreamSpan.spanContext().traceId, + 'Expected that runStreamSpan has a defined traceId' + ); + assert.deepStrictEqual( + runStreamSpan.parentSpanId, + runSpan.spanContext().spanId, + `Expected that runSpan(spanId=${runSpan.spanContext().spanId}) is the parent to runStreamSpan(parentSpanId=${runStreamSpan.parentSpanId})` + ); + assert.deepStrictEqual( + runSpan.spanContext().traceId, + runStreamSpan.spanContext().traceId, + 'Expected that both spans share a traceId' + ); + assert.ok( + runStreamSpan.spanContext().spanId, + 'Expected that runStreamSpan has a defined spanId' + ); + assert.ok( + runSpan.spanContext().spanId, + 'Expected that runSpan has a defined spanId' + ); + + const databaseBatchCreateSessionsSpan = spans[0]; + assert.strictEqual( + databaseBatchCreateSessionsSpan.name, + 'CloudSpanner.Database.batchCreateSessions' + ); + const sessionPoolCreateSessionsSpan = spans[1]; + assert.strictEqual( + sessionPoolCreateSessionsSpan.name, + 'CloudSpanner.SessionPool.createSessions' + ); + assert.ok( + sessionPoolCreateSessionsSpan.spanContext().traceId, + 'Expecting a defined sessionPoolCreateSessions traceId' + ); + assert.deepStrictEqual( + sessionPoolCreateSessionsSpan.spanContext().traceId, + databaseBatchCreateSessionsSpan.spanContext().traceId, + 'Expected the same traceId' + ); + assert.deepStrictEqual( + databaseBatchCreateSessionsSpan.parentSpanId, + sessionPoolCreateSessionsSpan.spanContext().spanId, + 'Expected that sessionPool.createSessions is the parent to db.batchCreassionSessions' + ); + + // Assert that despite all being exported, SessionPool.createSessions + // is not in the same trace as runStream, createSessions is invoked at + // Spanner Client instantiation, thus before database.run is invoked. + assert.notEqual( + sessionPoolCreateSessionsSpan.spanContext().traceId, + runSpan.spanContext().traceId, + 'Did not expect the same traceId' + ); + + // Ensure that the last span has an error. + assert.deepStrictEqual( + runStreamSpan.status.code, + SpanStatusCode.ERROR, + 'Expected an error status' + ); + + const want = '3 INVALID_ARGUMENT: ' + messageBadSelect1p; + assert.deepStrictEqual( + runStreamSpan.status.message, + want, + `Mismatched status message:\n\n\tGot: '${runStreamSpan.status.message}'\n\tWant: '${want}'` + ); + + // Finally check for the collective expected event names. + const expectedEventNames = [ + 'Requesting 25 sessions', + 'Creating 25 sessions', + 'Requested for 25 sessions returned 25', + 'Starting stream', + 'Acquiring session', + 'Waiting for a session to become available', + 'Acquired session', + 'Using Session', + ]; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + } + + it('database.run with bad syntax: async/await', async () => { + const instance = spanner.instance('instance'); + const database = instance.database('database'); + + try { + const [rows] = await database.run(selectSql1p); + } catch (e) { + // This catch is meant to ensure that we + // can assert on the generated spans. + } finally { + provider.forceFlush(); + } + + assertRunBadSyntaxExpectations(); + }); + + it('database.run with bad syntax: callback', done => { + const instance = spanner.instance('instance'); + const database = instance.database('database'); + + database.run(selectSql1p, (err, rows) => { + assert.ok(err); + provider.forceFlush(); + assertRunBadSyntaxExpectations(); + done(); + }); + }); + + function assertDatabaseRunPlusAwaitTransactionForAlreadyExistentData() { + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + spans.sort((spanA, spanB) => { + return spanA.startTime < spanB.startTime; + }); + + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = [ + 'CloudSpanner.Database.batchCreateSessions', + 'CloudSpanner.SessionPool.createSessions', + 'CloudSpanner.Snapshot.runStream', + 'CloudSpanner.Snapshot.run', + 'CloudSpanner.Snapshot.begin', + 'CloudSpanner.Snapshot.begin', + 'CloudSpanner.Transaction.commit', + 'CloudSpanner.Transaction.commit', + 'CloudSpanner.Database.runTransactionAsync', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + const spanSnapshotRun = spans[3]; + assert.strictEqual(spanSnapshotRun.name, 'CloudSpanner.Snapshot.run'); + const wantSpanErr = '6 ALREADY_EXISTS: ' + messageBadInsertAlreadyExistent; + assert.deepStrictEqual( + spanSnapshotRun.status.code, + SpanStatusCode.ERROR, + 'Unexpected status code' + ); + assert.deepStrictEqual( + spanSnapshotRun.status.message, + wantSpanErr, + 'Unexpexcted error message' + ); + + const databaseBatchCreateSessionsSpan = spans[0]; + assert.strictEqual( + databaseBatchCreateSessionsSpan.name, + 'CloudSpanner.Database.batchCreateSessions' + ); + const sessionPoolCreateSessionsSpan = spans[1]; + assert.strictEqual( + sessionPoolCreateSessionsSpan.name, + 'CloudSpanner.SessionPool.createSessions' + ); + assert.ok( + sessionPoolCreateSessionsSpan.spanContext().traceId, + 'Expecting a defined sessionPoolCreateSessions traceId' + ); + assert.deepStrictEqual( + sessionPoolCreateSessionsSpan.spanContext().traceId, + databaseBatchCreateSessionsSpan.spanContext().traceId, + 'Expected the same traceId' + ); + assert.deepStrictEqual( + databaseBatchCreateSessionsSpan.parentSpanId, + sessionPoolCreateSessionsSpan.spanContext().spanId, + 'Expected that sessionPool.createSessions is the parent to db.batchCreassionSessions' + ); + + // We need to ensure a strict relationship between the spans. + // |-Database.runTransactionAsync |-------------------------------------| + // |-Snapshot.run |------------------------| + // |-Snapshot.runStream |---------------------| + // |-Transaction.commit |--------| + // |-Snapshot.begin |------| + // |-Snapshot.commit |-----| + const spanDatabaseRunTransactionAsync = spans[spans.length - 1]; + assert.deepStrictEqual( + spanDatabaseRunTransactionAsync.name, + 'CloudSpanner.Database.runTransactionAsync', + `${actualSpanNames}` + ); + const spanTransactionCommit0 = spans[spans.length - 2]; + assert.strictEqual( + spanTransactionCommit0.name, + 'CloudSpanner.Transaction.commit' + ); + assert.deepStrictEqual( + spanTransactionCommit0.parentSpanId, + spanDatabaseRunTransactionAsync.spanContext().spanId, + 'Expected that Database.runTransaction is the parent to Transaction.commmit' + ); + + assert.deepStrictEqual( + spanSnapshotRun.parentSpanId, + spanDatabaseRunTransactionAsync.spanContext().spanId, + 'Expected that Database.runTransaction is the parent to Snapshot.run' + ); + + // Assert that despite all being exported, SessionPool.createSessions + // is not in the same trace as runStream, createSessions is invoked at + // Spanner Client instantiation, thus before database.run is invoked. + assert.notEqual( + sessionPoolCreateSessionsSpan.spanContext().traceId, + spanDatabaseRunTransactionAsync.spanContext().traceId, + 'Did not expect the same traceId' + ); + + // Finally check for the collective expected event names. + const expectedEventNames = [ + 'Requesting 25 sessions', + 'Creating 25 sessions', + 'Requested for 25 sessions returned 25', + 'Starting stream', + 'Stream broken. Safe to retry', + 'Begin Transaction', + 'Transaction Creation Done', + 'Begin Transaction', + 'Transaction Creation Done', + 'Starting Commit', + 'Commit Done', + 'Acquiring session', + 'Waiting for a session to become available', + 'Acquired session', + 'Using Session', + 'exception', + ]; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + } + + it('database.runTransaction with async/await for INSERT with existent data + transaction.commit', async () => { + const instance = spanner.instance('instance'); + const database = instance.database('database'); + + const update = { + sql: insertAlreadyExistentDataSql, + }; + + try { + await database.runTransactionAsync(async transaction => { + try { + await transaction!.run(update); + } finally { + await transaction!.commit(); + } + }); + } catch (e) { + assert.strictEqual( + (e as grpc.ServiceError).code, + grpc.status.ALREADY_EXISTS + ); + } + + provider.forceFlush(); + assertDatabaseRunPlusAwaitTransactionForAlreadyExistentData(); + }); +}); + +describe('Traces for ExecuteStream broken stream retries', () => { + let sandbox: sinon.SinonSandbox; + const selectSql = 'SELECT NUM, NAME FROM NUMBERS'; + const select1 = 'SELECT 1'; + const invalidSql = 'SELECT * FROM FOO'; + const insertSql = "INSERT INTO NUMBER (NUM, NAME) VALUES (4, 'Four')"; + const selectAllTypes = 'SELECT * FROM TABLE_WITH_ALL_TYPES'; + const insertSqlForAllTypes = `INSERT INTO TABLE_WITH_ALL_TYPES( + COLBOOL, COLINT64, COLFLOAT64, COLNUMERIC, COLSTRING, COLBYTES, COLJSON, COLDATE, COLTIMESTAMP + ) VALUES ( + @bool, @int64, @float64, @numeric, @string, @bytes, @json, @date, @timestamp + )`; + const updateSql = "UPDATE NUMBER SET NAME='Unknown' WHERE NUM IN (5, 6)"; + const fooNotFoundErr = Object.assign(new Error('Table FOO not found'), { + code: grpc.status.NOT_FOUND, + }); + const server = new grpc.Server(); + const spannerMock = mock.createMockSpanner(server); + mockInstanceAdmin.createMockInstanceAdmin(server); + mockDatabaseAdmin.createMockDatabaseAdmin(server); + let port: number; + let spanner: Spanner; + let instance: Instance; + let dbCounter = 1; + + const traceExporter = new InMemorySpanExporter(); + const tracerProvider = new NodeTracerProvider({ + sampler: new AlwaysOnSampler(), + exporter: traceExporter, + }); + tracerProvider.addSpanProcessor(new SimpleSpanProcessor(traceExporter)); + + function newTestDatabase(): Database { + return instance.database(`database-${dbCounter++}`); + } + + before(async () => { + sandbox = sinon.createSandbox(); + port = await new Promise((resolve, reject) => { + server.bindAsync( + '0.0.0.0:0', + grpc.ServerCredentials.createInsecure(), + (err, assignedPort) => { + if (err) { + reject(err); + } else { + resolve(assignedPort); + } + } + ); + }); + spannerMock.putStatementResult( + selectSql, + mock.StatementResult.resultSet(mock.createSimpleResultSet()) + ); + spannerMock.putStatementResult( + select1, + mock.StatementResult.resultSet(mock.createSelect1ResultSet()) + ); + spannerMock.putStatementResult( + selectAllTypes, + mock.StatementResult.resultSet(mock.createResultSetWithAllDataTypes()) + ); + spannerMock.putStatementResult( + invalidSql, + mock.StatementResult.error(fooNotFoundErr) + ); + spannerMock.putStatementResult( + insertSql, + mock.StatementResult.updateCount(1) + ); + spannerMock.putStatementResult( + insertSqlForAllTypes, + mock.StatementResult.updateCount(1) + ); + spannerMock.putStatementResult( + updateSql, + mock.StatementResult.updateCount(2) + ); + + const observabilityOptions: typeof ObservabilityOptions = { + tracerProvider: tracerProvider, + enableExtendedTracing: true, + }; + + spanner = new Spanner({ + servicePath: 'localhost', + port, + sslCreds: grpc.credentials.createInsecure(), + observabilityOptions: observabilityOptions, + }); + // Gets a reference to a Cloud Spanner instance and database + instance = spanner.instance('instance'); + }); + + after(() => { + spanner.close(); + server.tryShutdown(() => {}); + sandbox.restore(); + }); + + beforeEach(async () => { + spannerMock.resetRequests(); + spannerMock.removeExecutionTimes(); + await tracerProvider.forceFlush(); + await traceExporter.forceFlush(); + await traceExporter.reset(); + }); + + describe('PartialResultStream', () => { + const streamIndexes = [1, 2]; + streamIndexes.forEach(index => { + it('should retry UNAVAILABLE during streaming', async () => { + const database = newTestDatabase(); + const err = { + message: 'Temporary unavailable', + code: grpc.status.UNAVAILABLE, + streamIndex: index, + } as mock.MockError; + spannerMock.setExecutionTime( + spannerMock.executeStreamingSql, + mock.SimulatedExecutionTime.ofError(err) + ); + const [rows] = await database.run(selectSql); + assert.strictEqual(rows.length, 3); + await database.close(); + }); + + it('should retry UNAVAILABLE during streaming with txn ID from inline begin response', async () => { + const err = { + message: 'Temporary unavailable', + code: grpc.status.UNAVAILABLE, + streamIndex: index, + } as mock.MockError; + spannerMock.setExecutionTime( + spannerMock.executeStreamingSql, + mock.SimulatedExecutionTime.ofError(err) + ); + const database = newTestDatabase(); + + await database.runTransactionAsync(async tx => { + await tx.run(selectSql); + await tx.commit(); + }); + await database.close(); + + const requests = spannerMock + .getRequests() + .filter(val => (val as v1.ExecuteSqlRequest).sql) + .map(req => req as v1.ExecuteSqlRequest); + assert.strictEqual(requests.length, 2); + assert.ok( + requests[0].transaction?.begin!.readWrite, + 'inline txn is not set.' + ); + assert.ok( + requests[1].transaction!.id, + 'Transaction ID is not used for retries.' + ); + assert.ok( + requests[1].resumeToken, + 'Resume token is not set for the retried' + ); + }); + + it('should retry UNAVAILABLE during streaming with txn ID from inline begin response with parallel queries', async () => { + const err = { + message: 'Temporary unavailable', + code: grpc.status.UNAVAILABLE, + streamIndex: index, + } as mock.MockError; + spannerMock.setExecutionTime( + spannerMock.executeStreamingSql, + mock.SimulatedExecutionTime.ofError(err) + ); + const database = newTestDatabase(); + + await database.runTransactionAsync(async tx => { + const [rows1, rows2] = await Promise.all([ + tx!.run(selectSql), + tx!.run(selectSql), + ]); + assert.equal(rows1.length, 3); + assert.equal(rows2.length, 3); + await tx.commit(); + }); + await database.close(); + + const requests = spannerMock + .getRequests() + .filter(val => (val as v1.ExecuteSqlRequest).sql) + .map(req => req as v1.ExecuteSqlRequest); + assert.strictEqual(requests.length, 3); + assert.ok( + requests[0].transaction?.begin!.readWrite, + 'inline txn is not set.' + ); + assert.ok( + requests[1].transaction!.id, + 'Transaction ID is not used for retries.' + ); + assert.ok( + requests[1].resumeToken, + 'Resume token is not set for the retried' + ); + const commitRequests = spannerMock + .getRequests() + .filter(val => (val as v1.CommitRequest).mutations) + .map(req => req as v1.CommitRequest); + assert.strictEqual(commitRequests.length, 1); + assert.deepStrictEqual( + requests[1].transaction!.id, + requests[2].transaction!.id + ); + assert.deepStrictEqual( + requests[1].transaction!.id, + commitRequests[0].transactionId + ); + const beginTxnRequests = spannerMock + .getRequests() + .filter(val => (val as v1.BeginTransactionRequest).options?.readWrite) + .map(req => req as v1.BeginTransactionRequest); + assert.deepStrictEqual(beginTxnRequests.length, 0); + }); + + it('should not retry non-retryable error during streaming', async () => { + const database = newTestDatabase(); + const err = { + message: 'Test error', + streamIndex: index, + } as mock.MockError; + spannerMock.setExecutionTime( + spannerMock.executeStreamingSql, + mock.SimulatedExecutionTime.ofError(err) + ); + try { + await database.run(selectSql); + assert.fail('missing expected error'); + } catch (e) { + assert.strictEqual( + (e as grpc.ServiceError).message, + '2 UNKNOWN: Test error' + ); + } + await database.close(); + }); + + it('should retry UNAVAILABLE during streaming with a callback', done => { + const database = newTestDatabase(); + const err = { + message: 'Temporary unavailable', + code: grpc.status.UNAVAILABLE, + streamIndex: index, + } as mock.MockError; + spannerMock.setExecutionTime( + spannerMock.executeStreamingSql, + mock.SimulatedExecutionTime.ofError(err) + ); + database.run(selectSql, (err, rows) => { + assert.ifError(err); + assert.strictEqual(rows!.length, 3); + database + .close() + .catch(done) + .then(() => done()); + }); + }); + + it('should not retry non-retryable error during streaming with a callback', done => { + const database = newTestDatabase(); + const err = { + message: 'Non-retryable error', + streamIndex: index, + } as mock.MockError; + spannerMock.setExecutionTime( + spannerMock.executeStreamingSql, + mock.SimulatedExecutionTime.ofError(err) + ); + database.run(selectSql, err => { + assert.ok(err, 'Missing expected error'); + assert.strictEqual(err!.message, '2 UNKNOWN: Non-retryable error'); + database + .close() + .catch(done) + .then(() => done()); + }); + }); + + it('should emit non-retryable error during streaming to stream', done => { + const database = newTestDatabase(); + + const err = { + message: 'Non-retryable error', + streamIndex: index, + } as mock.MockError; + spannerMock.setExecutionTime( + spannerMock.executeStreamingSql, + mock.SimulatedExecutionTime.ofError(err) + ); + const receivedRows: Row[] = []; + database + .runStream(selectSql) + // We will receive data for the partial result sets that are + // returned before the error occurs. + .on('data', row => { + receivedRows.push(row); + }) + .on('end', () => { + assert.fail('Missing expected error'); + }) + .on('error', err => { + assert.strictEqual(err.message, '2 UNKNOWN: Non-retryable error'); + database + .close() + .catch(done) + .then(() => { + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + spans.sort((spanA, spanB) => { + return spanA.startTime < spanB.startTime; + }); + + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = [ + 'CloudSpanner.Database.batchCreateSessions', + 'CloudSpanner.SessionPool.createSessions', + 'CloudSpanner.Snapshot.runStream', + 'CloudSpanner.Database.runStream', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Finally check for the collective expected event names. + const expectedEventNames = [ + 'Requesting 25 sessions', + 'Creating 25 sessions', + 'Requested for 25 sessions returned 25', + 'Starting stream', + 'Acquiring session', + 'Waiting for a session to become available', + 'Acquired session', + 'Using Session', + 'Transaction Creation Done', + ]; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + done(); + }); + }); + }); + }); + }); + + it('should retry UNAVAILABLE from executeStreamingSql with multiple errors during streaming', async () => { + const database = newTestDatabase(); + const errors: mock.MockError[] = []; + for (const index of [0, 1, 1, 2, 2]) { + errors.push({ + message: 'Temporary unavailable', + code: grpc.status.UNAVAILABLE, + streamIndex: index, + } as mock.MockError); + } + spannerMock.setExecutionTime( + spannerMock.executeStreamingSql, + mock.SimulatedExecutionTime.ofErrors(errors) + ); + const [rows] = await database.run(selectSql); + assert.strictEqual(rows.length, 3); + await database.close(); + + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + spans.sort((spanA, spanB) => { + return spanA.startTime < spanB.startTime; + }); + + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = [ + 'CloudSpanner.Database.batchCreateSessions', + 'CloudSpanner.SessionPool.createSessions', + 'CloudSpanner.Snapshot.runStream', + 'CloudSpanner.Database.runStream', + 'CloudSpanner.Database.run', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Finally check for the collective expected event names. + const expectedEventNames = [ + 'Requesting 25 sessions', + 'Creating 25 sessions', + 'Requested for 25 sessions returned 25', + 'Starting stream', + 'Re-attempting start stream', + 'Resuming stream', + 'Resuming stream', + 'Resuming stream', + 'Resuming stream', + 'Acquiring session', + 'Waiting for a session to become available', + 'Acquired session', + 'Using Session', + ]; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + }); + + it('should retry UNAVAILABLE on update', async () => { + const database = newTestDatabase(); + const err = { + message: 'Temporary unavailable', + code: grpc.status.UNAVAILABLE, + } as mock.MockError; + spannerMock.setExecutionTime( + spannerMock.executeStreamingSql, + mock.SimulatedExecutionTime.ofError(err) + ); + + await database.runTransactionAsync(async tx => { + const [updateCount] = await tx!.runUpdate(insertSql); + assert.strictEqual(updateCount, 1); + await tx!.commit(); + }); + await database.close(); + + // The span for a successful invocation of database.runTransaction + // can only be ended after the calling function is completed. + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = [ + 'CloudSpanner.Database.batchCreateSessions', + 'CloudSpanner.SessionPool.createSessions', + 'CloudSpanner.Snapshot.runStream', + 'CloudSpanner.Snapshot.run', + 'CloudSpanner.Dml.runUpdate', + 'CloudSpanner.Snapshot.begin', + 'CloudSpanner.Transaction.commit', + 'CloudSpanner.Transaction.commit', + 'CloudSpanner.Database.runTransactionAsync', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Finally check for the collective expected event names. + const expectedEventNames = [ + 'Requesting 25 sessions', + 'Creating 25 sessions', + 'Requested for 25 sessions returned 25', + 'Starting stream', + 'Re-attempting start stream', + 'Begin Transaction', + 'Transaction Creation Done', + 'Starting Commit', + 'Commit Done', + 'Acquiring session', + 'Waiting for a session to become available', + 'Acquired session', + 'Using Session', + ]; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + }); + + it('should not retry non-retryable error on update', async () => { + const database = newTestDatabase(); + const err = { + message: 'Permanent error', + // We need to specify a non-retryable error code to prevent the entire + // transaction to retry. Not specifying an error code, will result in + // an error with code UNKNOWN, which again will retry the transaction. + code: grpc.status.INVALID_ARGUMENT, + } as mock.MockError; + spannerMock.setExecutionTime( + spannerMock.executeStreamingSql, + mock.SimulatedExecutionTime.ofError(err) + ); + let attempts = 0; + + await database.runTransactionAsync(async tx => { + attempts++; + await tx!.runUpdate(insertSql, err => { + assert.ok(err, 'Missing expected error'); + assert.strictEqual(err!.code, grpc.status.INVALID_ARGUMENT); + assert.strictEqual(attempts, 1); + tx! + .commit() + .then(() => { + database.close().catch(assert.ifError); + }) + .catch(assert.ifError); + }); + }); + assert.deepStrictEqual( + attempts, + 1, + 'runTransactionAsync.attempt must be 1' + ); + + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = [ + 'CloudSpanner.Database.batchCreateSessions', + 'CloudSpanner.SessionPool.createSessions', + 'CloudSpanner.Database.runTransactionAsync', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = [ + 'Requesting 25 sessions', + 'Creating 25 sessions', + 'Requested for 25 sessions returned 25', + 'Acquiring session', + 'Waiting for a session to become available', + 'Acquired session', + 'Using Session', + ]; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + }); +}); diff --git a/observability-test/transaction.ts b/observability-test/transaction.ts index 550f85fc3..233728784 100644 --- a/observability-test/transaction.ts +++ b/observability-test/transaction.ts @@ -479,7 +479,7 @@ describe('Transaction', () => { `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` ); - const expectedEventNames = ['exception']; + const expectedEventNames = ['Starting stream', 'exception']; assert.deepStrictEqual( actualEventNames, expectedEventNames, diff --git a/src/database.ts b/src/database.ts index 4db9fdfb2..3d8321355 100644 --- a/src/database.ts +++ b/src/database.ts @@ -1130,6 +1130,8 @@ class Database extends common.GrpcServiceObject { } catch (e) { setSpanErrorAndException(span, e as Error); this.emit('error', e); + } finally { + span.end(); } }); } @@ -2100,10 +2102,8 @@ class Database extends common.GrpcServiceObject { }); session!.lastError = err; this.pool_.release(session!); - this.getSnapshot(options, (err, snapshot) => { - span.end(); - callback!(err, snapshot); - }); + span.end(); + this.getSnapshot(options, callback!); } else { span.addEvent('Using Session', {'session.id': session?.id}); this.pool_.release(session!); @@ -2815,6 +2815,7 @@ class Database extends common.GrpcServiceObject { this.runStream(query, options) .on('error', err => { setSpanError(span, err); + span.end(); callback!(err as grpc.ServiceError, rows, stats, metadata); }) .on('response', response => { @@ -3054,7 +3055,6 @@ class Database extends common.GrpcServiceObject { let dataStream = snapshot.runStream(query); const endListener = () => { - span.end(); snapshot.end(); }; dataStream @@ -3080,6 +3080,7 @@ class Database extends common.GrpcServiceObject { dataStream.removeListener('end', endListener); dataStream.end(); snapshot.end(); + span.end(); // Create a new data stream and add it to the end user stream. dataStream = this.runStream(query, options); dataStream.pipe(proxyStream); @@ -3222,8 +3223,8 @@ class Database extends common.GrpcServiceObject { span.addEvent('No session available', { 'session.id': session?.id, }); - this.runTransaction(options, runFn!); span.end(); + this.runTransaction(options, runFn!); return; } @@ -3242,27 +3243,19 @@ class Database extends common.GrpcServiceObject { } const release = () => { - span.end(); this.pool_.release(session!); + span.end(); }; const runner = new TransactionRunner( session!, transaction!, - (err, resp) => { - if (err) { - setSpanError(span, err!); - } - span.end(); - runFn!(err, resp); - }, + runFn!, options ); runner.run().then(release, err => { - if (err) { - setSpanError(span, err!); - } + setSpanError(span, err!); if (isSessionNotFoundError(err)) { span.addEvent('No session available', { @@ -3271,9 +3264,6 @@ class Database extends common.GrpcServiceObject { release(); this.runTransaction(options, runFn!); } else { - if (!err) { - span.addEvent('Using Session', {'session.id': session!.id}); - } setImmediate(runFn!, err); release(); } @@ -3531,6 +3521,7 @@ class Database extends common.GrpcServiceObject { // Remove the current data stream from the end user stream. dataStream.unpipe(proxyStream); dataStream.end(); + span.end(); // Create a new stream and add it to the end user stream. dataStream = this.batchWriteAtLeastOnce( mutationGroups, @@ -3635,8 +3626,8 @@ class Database extends common.GrpcServiceObject { span.addEvent('No session available', { 'session.id': session?.id, }); - this.writeAtLeastOnce(mutations, options, cb!); span.end(); + this.writeAtLeastOnce(mutations, options, cb!); return; } if (err) { diff --git a/src/index.ts b/src/index.ts index db3110568..0bbdccd01 100644 --- a/src/index.ts +++ b/src/index.ts @@ -80,7 +80,10 @@ import { import grpcGcpModule = require('grpc-gcp'); const grpcGcp = grpcGcpModule(grpc); import * as v1 from './v1'; -import {ObservabilityOptions} from './instrument'; +import { + ObservabilityOptions, + ensureInitialContextManagerSet, +} from './instrument'; // eslint-disable-next-line @typescript-eslint/no-var-requires const gcpApiConfig = require('./spanner_grpc_config.json'); @@ -370,6 +373,7 @@ class Spanner extends GrpcService { }; this.directedReadOptions = directedReadOptions; this._observabilityOptions = options.observabilityOptions; + ensureInitialContextManagerSet(); } /** diff --git a/src/instrument.ts b/src/instrument.ts index 99b260bf4..15fe7b8e3 100644 --- a/src/instrument.ts +++ b/src/instrument.ts @@ -89,7 +89,6 @@ interface traceConfig { tableName?: string; dbName?: string; opts?: ObservabilityOptions; - that?: Object; } const SPAN_NAMESPACE_PREFIX = 'CloudSpanner'; // TODO: discuss & standardize this prefix. @@ -117,6 +116,7 @@ function ensureInitialContextManagerSet() { context.setGlobalContextManager(contextManager); } } +export {ensureInitialContextManagerSet}; /** * startTrace begins an active span in the current active context @@ -136,8 +136,6 @@ export function startTrace( config = {} as traceConfig; } - ensureInitialContextManagerSet(); - return getTracer(config.opts?.tracerProvider).startActiveSpan( SPAN_NAMESPACE_PREFIX + '.' + spanNameSuffix, {kind: SpanKind.CLIENT}, @@ -165,11 +163,15 @@ export function startTrace( } } - if (config.that) { - const fn = cb.bind(config.that); - return fn(span); - } else { + // If at all the invoked function throws an exception, + // record the exception and then end this span. + try { return cb(span); + } catch (e) { + setSpanErrorAndException(span, e as Error); + span.end(); + // Finally re-throw the exception. + throw e; } } ); diff --git a/src/table.ts b/src/table.ts index 227f8d107..8f87ba00b 100644 --- a/src/table.ts +++ b/src/table.ts @@ -35,6 +35,7 @@ import { ObservabilityOptions, startTrace, setSpanError, + setSpanErrorAndException, traceConfig, } from './instrument'; diff --git a/src/transaction.ts b/src/transaction.ts index e7993e74c..fa1f10814 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -462,13 +462,11 @@ export class Snapshot extends EventEmitter { ) => { if (err) { setSpanError(span, err); - span.end(); - callback!(err, resp); - return; + } else { + this._update(resp); } - this._update(resp); span.end(); - callback!(null, resp); + callback!(err, resp); } ); }); @@ -708,26 +706,43 @@ export class Snapshot extends EventEmitter { addLeaderAwareRoutingHeader(headers); } - const makeRequest = (resumeToken?: ResumeToken): Readable => { - if (this.id && transaction.begin) { - delete transaction.begin; - transaction.id = this.id; - } - return this.requestStream({ - client: 'SpannerClient', - method: 'streamingRead', - reqOpts: Object.assign({}, reqOpts, {resumeToken}), - gaxOpts: gaxOptions, - headers: headers, - }); - }; - const traceConfig = { tableName: table, opts: this._observabilityOptions, dbName: this._dbName!, }; return startTrace('Snapshot.createReadStream', traceConfig, span => { + let attempt = 0; + const makeRequest = (resumeToken?: ResumeToken): Readable => { + if (this.id && transaction.begin) { + delete transaction.begin; + transaction.id = this.id; + } + + attempt++; + + if (!resumeToken) { + if (attempt === 1) { + span.addEvent('Starting stream'); + } else { + span.addEvent('Re-attempting start stream', {attempt: attempt}); + } + } else { + span.addEvent('Resuming stream', { + resume_token: resumeToken!.toString(), + attempt: attempt, + }); + } + + return this.requestStream({ + client: 'SpannerClient', + method: 'streamingRead', + reqOpts: Object.assign({}, reqOpts, {resumeToken}), + gaxOpts: gaxOptions, + headers: headers, + }); + }; + const resultStream = partialResultStream( this._wrapWithIdWaiter(makeRequest), { @@ -744,19 +759,19 @@ export class Snapshot extends EventEmitter { } }) .on('error', err => { - const isServiceError = - err && typeof err === 'object' && 'code' in err; - if ( - !this.id && - this._useInRunner && - !( - isServiceError && - (err as grpc.ServiceError).code === grpc.status.ABORTED - ) - ) { + setSpanError(span, err); + const wasAborted = isErrorAborted(err); + if (!this.id && this._useInRunner && !wasAborted) { + // TODO: resolve https://github.com/googleapis/nodejs-spanner/issues/2170 this.begin(); + } else { + if (wasAborted) { + span.addEvent('Stream broken. Not safe to retry', { + 'transaction.id': this.id?.toString(), + }); + } } - setSpanError(span, err); + span.end(); }) .on('end', err => { if (err) { @@ -1281,7 +1296,23 @@ export class Snapshot extends EventEmitter { ...query, }; return startTrace('Snapshot.runStream', traceConfig, span => { + let attempt = 0; const makeRequest = (resumeToken?: ResumeToken): Readable => { + attempt++; + + if (!resumeToken) { + if (attempt === 1) { + span.addEvent('Starting stream'); + } else { + span.addEvent('Re-attempting start stream', {attempt: attempt}); + } + } else { + span.addEvent('Resuming stream', { + resume_token: resumeToken!.toString(), + attempt: attempt, + }); + } + if (!reqOpts || (this.id && !reqOpts.transaction.id)) { try { sanitizeRequest(); @@ -1320,18 +1351,19 @@ export class Snapshot extends EventEmitter { }) .on('error', err => { setSpanError(span, err as Error); - const isServiceError = - err && typeof err === 'object' && 'code' in err; - if ( - !this.id && - this._useInRunner && - !( - isServiceError && - (err as grpc.ServiceError).code === grpc.status.ABORTED - ) - ) { + const wasAborted = isErrorAborted(err); + if (!this.id && this._useInRunner && !wasAborted) { + span.addEvent('Stream broken. Safe to retry'); + // TODO: resolve https://github.com/googleapis/nodejs-spanner/issues/2170 this.begin(); + } else { + if (wasAborted) { + span.addEvent('Stream broken. Not safe to retry', { + 'transaction.id': this.id?.toString(), + }); + } } + span.end(); }) .on('end', err => { if (err) { @@ -2112,15 +2144,22 @@ export class Transaction extends Dml { } else if (!this._useInRunner) { reqOpts.singleUseTransaction = this._options; } else { - this.begin().then(() => { - this.commit(options, (err, resp) => { - if (err) { - setSpanError(span, err); - } + this.begin().then( + () => { + this.commit(options, (err, resp) => { + if (err) { + setSpanError(span, err); + } + span.end(); + callback(err, resp); + }); + }, + err => { + setSpanError(span, err); span.end(); - callback(err, resp); - }); - }, callback); + callback(err, null); + } + ); return; } @@ -2985,6 +3024,15 @@ export class PartitionedDml extends Dml { } } +function isErrorAborted(err): boolean { + return ( + err && + typeof err === 'object' && + 'code' in err && + (err as grpc.ServiceError).code === grpc.status.ABORTED + ); +} + /*! Developer Documentation * * All async methods (except for streams) return a Promise in the event diff --git a/test/spanner.ts b/test/spanner.ts index 032d18493..d324ed911 100644 --- a/test/spanner.ts +++ b/test/spanner.ts @@ -5062,6 +5062,7 @@ describe('Spanner with mock server', () => { 'Requesting 25 sessions', 'Creating 25 sessions', 'Requested for 25 sessions returned 25', + 'Starting stream', 'Acquiring session', 'Waiting for a session to become available', 'Acquired session',