From 828b962dfa28751b83cfbbbbd1892647a2a0ca27 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Sat, 12 Oct 2024 02:24:12 -0700 Subject: [PATCH 01/18] feat(observability): fix bugs found from product review + negative cases This change adds recording of retry span annotations, catching cases in which exceptions where thrown but spans were not ended while testing out and visually confirming the results. --- observability-test/spanner.ts | 1074 ++++++++++++++++++++++++++++- observability-test/transaction.ts | 2 +- src/database.ts | 85 ++- src/index.ts | 6 +- src/instrument.ts | 22 +- src/table.ts | 1 + src/transaction.ts | 169 +++-- test/spanner.ts | 1 + 8 files changed, 1228 insertions(+), 132 deletions(-) diff --git a/observability-test/spanner.ts b/observability-test/spanner.ts index fca1b30f7..96abffd69 100644 --- a/observability-test/spanner.ts +++ b/observability-test/spanner.ts @@ -20,9 +20,14 @@ import {google} from '../protos/protos'; import {Database, Instance, Spanner} from '../src'; import {MutationSet} from '../src/transaction'; import protobuf = google.spanner.v1; +import v1 = google.spanner.v1; +import PartialResultSet = google.spanner.v1.PartialResultSet; import * as mock from '../test/mockserver/mockspanner'; import * as mockInstanceAdmin from '../test/mockserver/mockinstanceadmin'; import * as mockDatabaseAdmin from '../test/mockserver/mockdatabaseadmin'; +import * as sinon from 'sinon'; +import {Row} from '../src/partial-result-stream'; +import {Json} from '../src/codec'; const { AlwaysOnSampler, NodeTracerProvider, @@ -30,6 +35,7 @@ const { } = require('@opentelemetry/sdk-trace-node'); // eslint-disable-next-line n/no-extraneous-require const {SimpleSpanProcessor} = require('@opentelemetry/sdk-trace-base'); +const {SpanStatusCode} = require('@opentelemetry/api'); const { disableContextAndManager, generateWithAllSpansHaveDBName, @@ -246,6 +252,7 @@ describe('EndToEnd', () => { 'Acquiring session', 'Cache hit: has usable session', 'Acquired session', + 'Starting stream', ]; assert.deepStrictEqual( actualEventNames, @@ -335,6 +342,7 @@ describe('EndToEnd', () => { ); const expectedEventNames = [ + 'Starting stream', 'Acquiring session', 'Cache hit: has usable session', 'Acquired session', @@ -406,6 +414,7 @@ describe('EndToEnd', () => { ); const expectedEventNames = [ + 'Starting stream', 'Acquiring session', 'Cache hit: has usable session', 'Acquired session', @@ -422,49 +431,52 @@ describe('EndToEnd', () => { const withAllSpansHaveDBName = generateWithAllSpansHaveDBName( database.formattedName_ ); - database.runTransaction((err, transaction) => { + + database.runTransaction(async (err, transaction) => { assert.ifError(err); - transaction!.run('SELECT 1', (err, rows) => { - assert.ifError(err); + const [rows] = await transaction!.run('SELECT 1'); + await transaction!.commit(); - traceExporter.forceFlush(); - const spans = traceExporter.getFinishedSpans(); - withAllSpansHaveDBName(spans); + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + withAllSpansHaveDBName(spans); - const actualEventNames: string[] = []; - const actualSpanNames: string[] = []; - spans.forEach(span => { - actualSpanNames.push(span.name); - span.events.forEach(event => { - actualEventNames.push(event.name); - }); + const actualEventNames: string[] = []; + const actualSpanNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); }); + }); - const expectedSpanNames = [ - 'CloudSpanner.Database.runTransaction', - 'CloudSpanner.Snapshot.runStream', - 'CloudSpanner.Snapshot.run', - ]; - assert.deepStrictEqual( - actualSpanNames, - expectedSpanNames, - `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` - ); - - const expectedEventNames = [ - 'Acquiring session', - 'Cache hit: has usable session', - 'Acquired session', - 'Transaction Creation Done', - ]; - assert.deepStrictEqual( - actualEventNames, - expectedEventNames, - `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` - ); + const expectedSpanNames = [ + 'CloudSpanner.Snapshot.runStream', + 'CloudSpanner.Snapshot.run', + 'CloudSpanner.Transaction.commit', + 'CloudSpanner.Database.runTransaction', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); - done(); - }); + const expectedEventNames = [ + 'Starting stream', + 'Transaction Creation Done', + 'Starting Commit', + 'Commit Done', + 'Acquiring session', + 'Cache hit: has usable session', + 'Acquired session', + ]; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + done(); }); }); @@ -821,6 +833,7 @@ describe('ObservabilityOptions injection and propagation', async () => { 'Cache hit: has usable session', 'Acquired session', 'Using Session', + 'Starting stream', 'Transaction Creation Done', ]; assert.strictEqual( @@ -849,7 +862,6 @@ describe('ObservabilityOptions injection and propagation', async () => { const spans = traceExporter.getFinishedSpans(); withAllSpansHaveDBName(spans); - assert.strictEqual(spans.length, 4); const actualSpanNames: string[] = []; const actualEventNames: string[] = []; @@ -875,6 +887,7 @@ describe('ObservabilityOptions injection and propagation', async () => { const expectedEventNames = [ 'Begin Transaction', 'Transaction Creation Done', + 'Starting stream', ]; assert.deepStrictEqual( actualEventNames.every(value => expectedEventNames.includes(value)), @@ -928,6 +941,7 @@ describe('ObservabilityOptions injection and propagation', async () => { 'Cache hit: has usable session', 'Acquired session', 'Using Session', + 'Starting stream', ]; assert.deepStrictEqual( actualEventNames, @@ -982,6 +996,7 @@ describe('ObservabilityOptions injection and propagation', async () => { const expectedEventNames = [ 'Begin Transaction', 'Transaction Creation Done', + 'Starting stream', ]; assert.strictEqual( actualEventNames.every(value => @@ -1091,6 +1106,7 @@ describe('ObservabilityOptions injection and propagation', async () => { 'Requesting 25 sessions', 'Creating 25 sessions', 'Requested for 25 sessions returned 25', + 'Starting stream', 'Acquiring session', 'Waiting for a session to become available', 'Acquired session', @@ -1238,6 +1254,7 @@ describe('E2E traces with async/await', async () => { 'Requesting 25 sessions', 'Creating 25 sessions', 'Requested for 25 sessions returned 25', + 'Starting stream', 'Acquiring session', 'Waiting for a session to become available', 'Acquired session', @@ -1298,3 +1315,984 @@ describe('E2E traces with async/await', async () => { }); }); }); + +describe('Negative cases', async () => { + let server: grpc.Server; + let spanner: Spanner; + let database: Database; + let spannerMock: mock.MockSpanner; + let traceExporter: typeof InMemorySpanExporter; + let provider: typeof TracerProvider; + let observabilityOptions: typeof ObservabilityOptions; + + const selectSql1p = 'SELECT 1p'; + const messageBadSelect1p = `Missing whitespace between literal and alias [at 1:9] +SELECT 1p + ^`; + const insertAlreadyExistentDataSql = + "INSERT INTO Singers(firstName, SingerId) VALUES('Foo', 1)"; + const messageBadInsertAlreadyExistent = + 'Failed to insert row with primary key ({pk#SingerId:1}) due to previously existing row'; + + beforeEach(async () => { + traceExporter = new InMemorySpanExporter(); + provider = new NodeTracerProvider({ + sampler: new AlwaysOnSampler(), + exporter: traceExporter, + }); + provider.addSpanProcessor(new SimpleSpanProcessor(traceExporter)); + + observabilityOptions = { + tracerProvider: provider, + enableExtendedTracing: true, + }; + const setupResult = await setup(observabilityOptions); + spanner = setupResult.spanner; + server = setupResult.server; + spannerMock = setupResult.spannerMock; + + const serverErr = { + message: messageBadSelect1p, + code: grpc.status.INVALID_ARGUMENT, + } as mock.MockError; + spannerMock.putStatementResult( + selectSql1p, + mock.StatementResult.error(serverErr) + ); + + const insertAlreadyExistentErr = { + message: messageBadInsertAlreadyExistent, + code: grpc.status.ALREADY_EXISTS, + } as mock.MockError; + spannerMock.putStatementResult( + insertAlreadyExistentDataSql, + mock.StatementResult.error(insertAlreadyExistentErr) + ); + }); + + afterEach(async () => { + traceExporter.reset(); + provider.shutdown(); + spannerMock.resetRequests(); + spanner.close(); + server.tryShutdown(() => {}); + }); + + function assertRunBadSyntaxExpectations() { + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + spans.sort((spanA, spanB) => { + return spanA.startTime < spanB.startTime; + }); + + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = [ + 'CloudSpanner.Database.batchCreateSessions', + 'CloudSpanner.SessionPool.createSessions', + 'CloudSpanner.Snapshot.runStream', + 'CloudSpanner.Database.runStream', + 'CloudSpanner.Database.run', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // We need to ensure a strict relationship between the spans. + // runSpan -------------------| + // |-runStream ----------| + const runStreamSpan = spans[spans.length - 2]; + const runSpan = spans[spans.length - 1]; + assert.ok( + runSpan.spanContext().traceId, + 'Expected that runSpan has a defined traceId' + ); + assert.ok( + runStreamSpan.spanContext().traceId, + 'Expected that runStreamSpan has a defined traceId' + ); + assert.deepStrictEqual( + runStreamSpan.parentSpanId, + runSpan.spanContext().spanId, + `Expected that runSpan(spanId=${runSpan.spanContext().spanId}) is the parent to runStreamSpan(parentSpanId=${runStreamSpan.parentSpanId})` + ); + assert.deepStrictEqual( + runSpan.spanContext().traceId, + runStreamSpan.spanContext().traceId, + 'Expected that both spans share a traceId' + ); + assert.ok( + runStreamSpan.spanContext().spanId, + 'Expected that runStreamSpan has a defined spanId' + ); + assert.ok( + runSpan.spanContext().spanId, + 'Expected that runSpan has a defined spanId' + ); + + const databaseBatchCreateSessionsSpan = spans[0]; + assert.strictEqual( + databaseBatchCreateSessionsSpan.name, + 'CloudSpanner.Database.batchCreateSessions' + ); + const sessionPoolCreateSessionsSpan = spans[1]; + assert.strictEqual( + sessionPoolCreateSessionsSpan.name, + 'CloudSpanner.SessionPool.createSessions' + ); + assert.ok( + sessionPoolCreateSessionsSpan.spanContext().traceId, + 'Expecting a defined sessionPoolCreateSessions traceId' + ); + assert.deepStrictEqual( + sessionPoolCreateSessionsSpan.spanContext().traceId, + databaseBatchCreateSessionsSpan.spanContext().traceId, + 'Expected the same traceId' + ); + assert.deepStrictEqual( + databaseBatchCreateSessionsSpan.parentSpanId, + sessionPoolCreateSessionsSpan.spanContext().spanId, + 'Expected that sessionPool.createSessions is the parent to db.batchCreassionSessions' + ); + + // Assert that despite all being exported, SessionPool.createSessions + // is not in the same trace as runStream, createSessions is invoked at + // Spanner Client instantiation, thus before database.run is invoked. + assert.notEqual( + sessionPoolCreateSessionsSpan.spanContext().traceId, + runSpan.spanContext().traceId, + 'Did not expect the same traceId' + ); + + // Ensure that the last span has an error. + assert.deepStrictEqual( + runStreamSpan.status.code, + SpanStatusCode.ERROR, + 'Expected an error status' + ); + + const want = '3 INVALID_ARGUMENT: ' + messageBadSelect1p; + assert.deepStrictEqual( + runStreamSpan.status.message, + want, + `Mismatched status message:\n\n\tGot: '${runStreamSpan.status.message}'\n\tWant: '${want}'` + ); + + // Finally check for the collective expected event names. + const expectedEventNames = [ + 'Requesting 25 sessions', + 'Creating 25 sessions', + 'Requested for 25 sessions returned 25', + 'Starting stream', + 'Acquiring session', + 'Waiting for a session to become available', + 'Acquired session', + 'Using Session', + ]; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + } + + it('database.run with bad syntax: async/await', async () => { + const instance = spanner.instance('instance'); + const database = instance.database('database'); + + try { + const [rows] = await database.run(selectSql1p); + } catch (e) { + // This catch is meant to ensure that we + // can assert on the generated spans. + } finally { + provider.forceFlush(); + } + + assertRunBadSyntaxExpectations(); + }); + + it('database.run with bad syntax: callback', done => { + const instance = spanner.instance('instance'); + const database = instance.database('database'); + + database.run(selectSql1p, (err, rows) => { + assert.ok(err); + provider.forceFlush(); + assertRunBadSyntaxExpectations(); + done(); + }); + }); + + function assertDatabaseRunPlusAwaitTransactionForAlreadyExistentData() { + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + spans.sort((spanA, spanB) => { + return spanA.startTime < spanB.startTime; + }); + + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = [ + 'CloudSpanner.Database.batchCreateSessions', + 'CloudSpanner.SessionPool.createSessions', + 'CloudSpanner.Snapshot.run', + 'CloudSpanner.Snapshot.begin', + 'CloudSpanner.Snapshot.runStream', + 'CloudSpanner.Snapshot.begin', + 'CloudSpanner.Transaction.commit', + 'CloudSpanner.Transaction.commit', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + const spanSnapshotRun = spans[2]; + assert.strictEqual(spanSnapshotRun.name, 'CloudSpanner.Snapshot.run'); + const wantSpanErr = '6 ALREADY_EXISTS: ' + messageBadInsertAlreadyExistent; + assert.deepStrictEqual( + spanSnapshotRun.status.code, + SpanStatusCode.ERROR, + 'Unexpected status code' + ); + assert.deepStrictEqual( + spanSnapshotRun.status.message, + wantSpanErr, + 'Unexpexcted error message' + ); + + const databaseBatchCreateSessionsSpan = spans[0]; + assert.strictEqual( + databaseBatchCreateSessionsSpan.name, + 'CloudSpanner.Database.batchCreateSessions' + ); + const sessionPoolCreateSessionsSpan = spans[1]; + assert.strictEqual( + sessionPoolCreateSessionsSpan.name, + 'CloudSpanner.SessionPool.createSessions' + ); + assert.ok( + sessionPoolCreateSessionsSpan.spanContext().traceId, + 'Expecting a defined sessionPoolCreateSessions traceId' + ); + assert.deepStrictEqual( + sessionPoolCreateSessionsSpan.spanContext().traceId, + databaseBatchCreateSessionsSpan.spanContext().traceId, + 'Expected the same traceId' + ); + assert.deepStrictEqual( + databaseBatchCreateSessionsSpan.parentSpanId, + sessionPoolCreateSessionsSpan.spanContext().spanId, + 'Expected that sessionPool.createSessions is the parent to db.batchCreassionSessions' + ); + + // TODO: Uncomment once we've merged Database.runTransactionAsync tracing. + // We need to ensure a strict relationship between the spans. + // |-Database.runTransactionAsync |-------------------------------------| + // |-Snapshot.run |------------------------| + // |-Snapshot.runStream |---------------------| + // |-Transaction.commit |--------| + // |-Snapshot.begin |------| + // |-Snapshot.commit |-----| + /* + const spanDatabaseRunTransactionAsync = spans[spans.length - 1]; + assert.deepStrictEqual( + spanDatabaseRunTransactionAsync.name, + 'CloudSpanner.Database.runTransactionAsync', + `${actualSpanNames}` + ); + const spanTransactionCommit0 = spans[spans.length - 1]; + assert.strictEqual( + spanTransactionCommit0.name, + 'CloudSpanner.Transaction.commit' + ); + assert.deepStrictEqual( + spanTransactionCommit0.parentSpanId, + spanDatabaseRunTransactionAsync.spanContext().spanId, + 'Expected that Database.runTransaction is the parent to Transaction.commmit' + ); + + assert.deepStrictEqual( + spanSnapshotRun.parentSpanId, + spanDatabaseRunTransactionAsync.spanContext().spanId, + 'Expected that Database.runTransaction is the parent to Snapshot.run' + ); + + // Assert that despite all being exported, SessionPool.createSessions + // is not in the same trace as runStream, createSessions is invoked at + // Spanner Client instantiation, thus before database.run is invoked. + assert.notEqual( + sessionPoolCreateSessionsSpan.spanContext().traceId, + spanDatabaseRunTransactionAsync.spanContext().traceId, + 'Did not expect the same traceId' + ); + */ + + // Finally check for the collective expected event names. + const expectedEventNames = [ + 'Requesting 25 sessions', + 'Creating 25 sessions', + 'Requested for 25 sessions returned 25', + 'Begin Transaction', + 'Transaction Creation Done', + 'Starting stream', + 'Stream broken. Safe to retry', + 'Begin Transaction', + 'Transaction Creation Done', + 'Starting Commit', + 'Commit Done', + + // TODO: Uncomment once we've merged Database.runTransactionAsync tracing. + /* + 'Acquiring session', + 'Waiting for a session to become available', + 'Acquired session', + 'Using Session', + 'Transaction Attempt Failed', + 'Transaction Attempt Aborted', + 'exception', + 'exception', + */ + ]; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + } + + it('database.runTransaction with async/await for INSERT with existent data + transaction.commit', async () => { + const instance = spanner.instance('instance'); + const database = instance.database('database'); + + const update = { + sql: insertAlreadyExistentDataSql, + }; + + try { + await database.runTransactionAsync(async transaction => { + try { + await transaction!.run(update); + } finally { + await transaction!.commit(); + } + }); + } catch (e) { + assert.strictEqual( + (e as grpc.ServiceError).code, + grpc.status.ALREADY_EXISTS + ); + } + + provider.forceFlush(); + assertDatabaseRunPlusAwaitTransactionForAlreadyExistentData(); + }); +}); + +describe('Traces for ExecuteStream broken stream retries', () => { + let sandbox: sinon.SinonSandbox; + const selectSql = 'SELECT NUM, NAME FROM NUMBERS'; + const select1 = 'SELECT 1'; + const invalidSql = 'SELECT * FROM FOO'; + const insertSql = "INSERT INTO NUMBER (NUM, NAME) VALUES (4, 'Four')"; + const selectAllTypes = 'SELECT * FROM TABLE_WITH_ALL_TYPES'; + const insertSqlForAllTypes = `INSERT INTO TABLE_WITH_ALL_TYPES( + COLBOOL, COLINT64, COLFLOAT64, COLNUMERIC, COLSTRING, COLBYTES, COLJSON, COLDATE, COLTIMESTAMP + ) VALUES ( + @bool, @int64, @float64, @numeric, @string, @bytes, @json, @date, @timestamp + )`; + const updateSql = "UPDATE NUMBER SET NAME='Unknown' WHERE NUM IN (5, 6)"; + const fooNotFoundErr = Object.assign(new Error('Table FOO not found'), { + code: grpc.status.NOT_FOUND, + }); + const server = new grpc.Server(); + const spannerMock = mock.createMockSpanner(server); + mockInstanceAdmin.createMockInstanceAdmin(server); + mockDatabaseAdmin.createMockDatabaseAdmin(server); + let port: number; + let spanner: Spanner; + let instance: Instance; + let dbCounter = 1; + + const traceExporter = new InMemorySpanExporter(); + const tracerProvider = new NodeTracerProvider({ + sampler: new AlwaysOnSampler(), + exporter: traceExporter, + }); + tracerProvider.addSpanProcessor(new SimpleSpanProcessor(traceExporter)); + + function newTestDatabase(): Database { + return instance.database(`database-${dbCounter++}`); + } + + before(async () => { + sandbox = sinon.createSandbox(); + port = await new Promise((resolve, reject) => { + server.bindAsync( + '0.0.0.0:0', + grpc.ServerCredentials.createInsecure(), + (err, assignedPort) => { + if (err) { + reject(err); + } else { + resolve(assignedPort); + } + } + ); + }); + spannerMock.putStatementResult( + selectSql, + mock.StatementResult.resultSet(mock.createSimpleResultSet()) + ); + spannerMock.putStatementResult( + select1, + mock.StatementResult.resultSet(mock.createSelect1ResultSet()) + ); + spannerMock.putStatementResult( + selectAllTypes, + mock.StatementResult.resultSet(mock.createResultSetWithAllDataTypes()) + ); + spannerMock.putStatementResult( + invalidSql, + mock.StatementResult.error(fooNotFoundErr) + ); + spannerMock.putStatementResult( + insertSql, + mock.StatementResult.updateCount(1) + ); + spannerMock.putStatementResult( + insertSqlForAllTypes, + mock.StatementResult.updateCount(1) + ); + spannerMock.putStatementResult( + updateSql, + mock.StatementResult.updateCount(2) + ); + + const observabilityOptions: typeof ObservabilityOptions = { + tracerProvider: tracerProvider, + enableExtendedTracing: true, + }; + + spanner = new Spanner({ + servicePath: 'localhost', + port, + sslCreds: grpc.credentials.createInsecure(), + observabilityOptions: observabilityOptions, + }); + // Gets a reference to a Cloud Spanner instance and database + instance = spanner.instance('instance'); + }); + + after(() => { + spanner.close(); + server.tryShutdown(() => {}); + sandbox.restore(); + }); + + beforeEach(async () => { + spannerMock.resetRequests(); + spannerMock.removeExecutionTimes(); + await tracerProvider.forceFlush(); + await traceExporter.forceFlush(); + await traceExporter.reset(); + }); + + describe('PartialResultStream', () => { + const streamIndexes = [1, 2]; + streamIndexes.forEach(index => { + it('should retry UNAVAILABLE during streaming', async () => { + const database = newTestDatabase(); + const err = { + message: 'Temporary unavailable', + code: grpc.status.UNAVAILABLE, + streamIndex: index, + } as mock.MockError; + spannerMock.setExecutionTime( + spannerMock.executeStreamingSql, + mock.SimulatedExecutionTime.ofError(err) + ); + const [rows] = await database.run(selectSql); + assert.strictEqual(rows.length, 3); + await database.close(); + }); + + it('should retry UNAVAILABLE during streaming with txn ID from inline begin response', async () => { + const err = { + message: 'Temporary unavailable', + code: grpc.status.UNAVAILABLE, + streamIndex: index, + } as mock.MockError; + spannerMock.setExecutionTime( + spannerMock.executeStreamingSql, + mock.SimulatedExecutionTime.ofError(err) + ); + const database = newTestDatabase(); + + await database.runTransactionAsync(async tx => { + await tx.run(selectSql); + await tx.commit(); + }); + await database.close(); + + const requests = spannerMock + .getRequests() + .filter(val => (val as v1.ExecuteSqlRequest).sql) + .map(req => req as v1.ExecuteSqlRequest); + assert.strictEqual(requests.length, 2); + assert.ok( + requests[0].transaction?.begin!.readWrite, + 'inline txn is not set.' + ); + assert.ok( + requests[1].transaction!.id, + 'Transaction ID is not used for retries.' + ); + assert.ok( + requests[1].resumeToken, + 'Resume token is not set for the retried' + ); + }); + + it('should retry UNAVAILABLE during streaming with txn ID from inline begin response with parallel queries', async () => { + const err = { + message: 'Temporary unavailable', + code: grpc.status.UNAVAILABLE, + streamIndex: index, + } as mock.MockError; + spannerMock.setExecutionTime( + spannerMock.executeStreamingSql, + mock.SimulatedExecutionTime.ofError(err) + ); + const database = newTestDatabase(); + + await database.runTransactionAsync(async tx => { + const [rows1, rows2] = await Promise.all([ + tx!.run(selectSql), + tx!.run(selectSql), + ]); + assert.equal(rows1.length, 3); + assert.equal(rows2.length, 3); + await tx.commit(); + }); + await database.close(); + + const requests = spannerMock + .getRequests() + .filter(val => (val as v1.ExecuteSqlRequest).sql) + .map(req => req as v1.ExecuteSqlRequest); + assert.strictEqual(requests.length, 3); + assert.ok( + requests[0].transaction?.begin!.readWrite, + 'inline txn is not set.' + ); + assert.ok( + requests[1].transaction!.id, + 'Transaction ID is not used for retries.' + ); + assert.ok( + requests[1].resumeToken, + 'Resume token is not set for the retried' + ); + const commitRequests = spannerMock + .getRequests() + .filter(val => (val as v1.CommitRequest).mutations) + .map(req => req as v1.CommitRequest); + assert.strictEqual(commitRequests.length, 1); + assert.deepStrictEqual( + requests[1].transaction!.id, + requests[2].transaction!.id + ); + assert.deepStrictEqual( + requests[1].transaction!.id, + commitRequests[0].transactionId + ); + const beginTxnRequests = spannerMock + .getRequests() + .filter(val => (val as v1.BeginTransactionRequest).options?.readWrite) + .map(req => req as v1.BeginTransactionRequest); + assert.deepStrictEqual(beginTxnRequests.length, 0); + }); + + it('should not retry non-retryable error during streaming', async () => { + const database = newTestDatabase(); + const err = { + message: 'Test error', + streamIndex: index, + } as mock.MockError; + spannerMock.setExecutionTime( + spannerMock.executeStreamingSql, + mock.SimulatedExecutionTime.ofError(err) + ); + try { + await database.run(selectSql); + assert.fail('missing expected error'); + } catch (e) { + assert.strictEqual( + (e as grpc.ServiceError).message, + '2 UNKNOWN: Test error' + ); + } + await database.close(); + }); + + it('should retry UNAVAILABLE during streaming with a callback', done => { + const database = newTestDatabase(); + const err = { + message: 'Temporary unavailable', + code: grpc.status.UNAVAILABLE, + streamIndex: index, + } as mock.MockError; + spannerMock.setExecutionTime( + spannerMock.executeStreamingSql, + mock.SimulatedExecutionTime.ofError(err) + ); + database.run(selectSql, (err, rows) => { + assert.ifError(err); + assert.strictEqual(rows!.length, 3); + database + .close() + .catch(done) + .then(() => done()); + }); + }); + + it('should not retry non-retryable error during streaming with a callback', done => { + const database = newTestDatabase(); + const err = { + message: 'Non-retryable error', + streamIndex: index, + } as mock.MockError; + spannerMock.setExecutionTime( + spannerMock.executeStreamingSql, + mock.SimulatedExecutionTime.ofError(err) + ); + database.run(selectSql, err => { + assert.ok(err, 'Missing expected error'); + assert.strictEqual(err!.message, '2 UNKNOWN: Non-retryable error'); + database + .close() + .catch(done) + .then(() => done()); + }); + }); + + it('should emit non-retryable error during streaming to stream', done => { + const database = newTestDatabase(); + + const err = { + message: 'Non-retryable error', + streamIndex: index, + } as mock.MockError; + spannerMock.setExecutionTime( + spannerMock.executeStreamingSql, + mock.SimulatedExecutionTime.ofError(err) + ); + const receivedRows: Row[] = []; + database + .runStream(selectSql) + // We will receive data for the partial result sets that are + // returned before the error occurs. + .on('data', row => { + receivedRows.push(row); + }) + .on('end', () => { + assert.fail('Missing expected error'); + }) + .on('error', err => { + assert.strictEqual(err.message, '2 UNKNOWN: Non-retryable error'); + database + .close() + .catch(done) + .then(() => { + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + spans.sort((spanA, spanB) => { + return spanA.startTime < spanB.startTime; + }); + + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = [ + 'CloudSpanner.Database.batchCreateSessions', + 'CloudSpanner.SessionPool.createSessions', + 'CloudSpanner.Snapshot.runStream', + 'CloudSpanner.Database.runStream', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Finally check for the collective expected event names. + const expectedEventNames = [ + 'Requesting 25 sessions', + 'Creating 25 sessions', + 'Requested for 25 sessions returned 25', + 'Starting stream', + 'Acquiring session', + 'Waiting for a session to become available', + 'Acquired session', + 'Using Session', + 'Transaction Creation Done', + ]; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + + done(); + }); + }); + }); + }); + }); + + it('should retry UNAVAILABLE from executeStreamingSql with multiple errors during streaming', async () => { + const database = newTestDatabase(); + const errors: mock.MockError[] = []; + for (const index of [0, 1, 1, 2, 2]) { + errors.push({ + message: 'Temporary unavailable', + code: grpc.status.UNAVAILABLE, + streamIndex: index, + } as mock.MockError); + } + spannerMock.setExecutionTime( + spannerMock.executeStreamingSql, + mock.SimulatedExecutionTime.ofErrors(errors) + ); + const [rows] = await database.run(selectSql); + assert.strictEqual(rows.length, 3); + await database.close(); + + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + spans.sort((spanA, spanB) => { + return spanA.startTime < spanB.startTime; + }); + + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = [ + 'CloudSpanner.Database.batchCreateSessions', + 'CloudSpanner.SessionPool.createSessions', + 'CloudSpanner.Snapshot.runStream', + 'CloudSpanner.Database.runStream', + 'CloudSpanner.Database.run', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Finally check for the collective expected event names. + const expectedEventNames = [ + 'Requesting 25 sessions', + 'Creating 25 sessions', + 'Requested for 25 sessions returned 25', + 'Starting stream', + 'Re-attempting start stream', + 'Resuming stream', + 'Resuming stream', + 'Resuming stream', + 'Resuming stream', + 'Acquiring session', + 'Waiting for a session to become available', + 'Acquired session', + 'Using Session', + ]; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + }); + + it('should retry UNAVAILABLE on update', async () => { + const database = newTestDatabase(); + const err = { + message: 'Temporary unavailable', + code: grpc.status.UNAVAILABLE, + } as mock.MockError; + spannerMock.setExecutionTime( + spannerMock.executeStreamingSql, + mock.SimulatedExecutionTime.ofError(err) + ); + + await database.runTransactionAsync(async tx => { + const [updateCount] = await tx!.runUpdate(insertSql); + assert.strictEqual(updateCount, 1); + await tx!.commit(); + }); + await database.close(); + + // The span for a successful invocation of database.runTransaction + // can only be ended after the calling function is completed. + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = [ + 'CloudSpanner.Database.batchCreateSessions', + 'CloudSpanner.SessionPool.createSessions', + 'CloudSpanner.Snapshot.runStream', + 'CloudSpanner.Snapshot.run', + 'CloudSpanner.Dml.runUpdate', + 'CloudSpanner.Snapshot.begin', + 'CloudSpanner.Transaction.commit', + 'CloudSpanner.Transaction.commit', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + // Finally check for the collective expected event names. + const expectedEventNames = [ + 'Requesting 25 sessions', + 'Creating 25 sessions', + 'Requested for 25 sessions returned 25', + 'Starting stream', + 'Re-attempting start stream', + 'Begin Transaction', + 'Transaction Creation Done', + 'Starting Commit', + 'Commit Done', + + // TODO: Uncomment once we've merged Database.runTransactionAsync tracing. + /* + 'Acquiring session', + 'Waiting for a session to become available', + 'Acquired session', + 'Using Session', + 'Transaction Attempt Succeeded', + */ + ]; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + }); + + it('should not retry non-retryable error on update', async () => { + const database = newTestDatabase(); + const err = { + message: 'Permanent error', + // We need to specify a non-retryable error code to prevent the entire + // transaction to retry. Not specifying an error code, will result in + // an error with code UNKNOWN, which again will retry the transaction. + code: grpc.status.INVALID_ARGUMENT, + } as mock.MockError; + spannerMock.setExecutionTime( + spannerMock.executeStreamingSql, + mock.SimulatedExecutionTime.ofError(err) + ); + let attempts = 0; + + await database.runTransactionAsync(async tx => { + attempts++; + await tx!.runUpdate(insertSql, err => { + assert.ok(err, 'Missing expected error'); + assert.strictEqual(err!.code, grpc.status.INVALID_ARGUMENT); + assert.strictEqual(attempts, 1); + tx! + .commit() + .then(() => { + database.close().catch(assert.ifError); + }) + .catch(assert.ifError); + }); + }); + assert.deepStrictEqual( + attempts, + 1, + 'runTransactionAsync.attempt must be 1' + ); + + traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); + + const actualSpanNames: string[] = []; + const actualEventNames: string[] = []; + spans.forEach(span => { + actualSpanNames.push(span.name); + span.events.forEach(event => { + actualEventNames.push(event.name); + }); + }); + + const expectedSpanNames = [ + 'CloudSpanner.Database.batchCreateSessions', + 'CloudSpanner.SessionPool.createSessions', + ]; + assert.deepStrictEqual( + actualSpanNames, + expectedSpanNames, + `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` + ); + + const expectedEventNames = [ + 'Requesting 25 sessions', + 'Creating 25 sessions', + 'Requested for 25 sessions returned 25', + + // TODO: Uncomment once we've merged Database.runTransactionAsync tracing. + /* + 'Acquiring session', + 'Waiting for a session to become available', + 'Acquired session', + 'Using Session', + 'Transaction Attempt Succeeded', + */ + ]; + assert.deepStrictEqual( + actualEventNames, + expectedEventNames, + `Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}` + ); + }); +}); diff --git a/observability-test/transaction.ts b/observability-test/transaction.ts index 550f85fc3..233728784 100644 --- a/observability-test/transaction.ts +++ b/observability-test/transaction.ts @@ -479,7 +479,7 @@ describe('Transaction', () => { `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` ); - const expectedEventNames = ['exception']; + const expectedEventNames = ['Starting stream', 'exception']; assert.deepStrictEqual( actualEventNames, expectedEventNames, diff --git a/src/database.ts b/src/database.ts index 4db9fdfb2..e8d48c05f 100644 --- a/src/database.ts +++ b/src/database.ts @@ -1130,6 +1130,8 @@ class Database extends common.GrpcServiceObject { } catch (e) { setSpanErrorAndException(span, e as Error); this.emit('error', e); + } finally { + span.end(); } }); } @@ -2101,6 +2103,9 @@ class Database extends common.GrpcServiceObject { session!.lastError = err; this.pool_.release(session!); this.getSnapshot(options, (err, snapshot) => { + if (err) { + setSpanError(span, err); + } span.end(); callback!(err, snapshot); }); @@ -2815,6 +2820,7 @@ class Database extends common.GrpcServiceObject { this.runStream(query, options) .on('error', err => { setSpanError(span, err); + span.end(); callback!(err as grpc.ServiceError, rows, stats, metadata); }) .on('response', response => { @@ -3054,7 +3060,6 @@ class Database extends common.GrpcServiceObject { let dataStream = snapshot.runStream(query); const endListener = () => { - span.end(); snapshot.end(); }; dataStream @@ -3083,6 +3088,11 @@ class Database extends common.GrpcServiceObject { // Create a new data stream and add it to the end user stream. dataStream = this.runStream(query, options); dataStream.pipe(proxyStream); + dataStream.on('end', () => span.end()); + dataStream.on('error', err => { + setSpanError(span, err); + span.end(); + }); } else { proxyStream.destroy(err); snapshot.end(); @@ -3098,7 +3108,9 @@ class Database extends common.GrpcServiceObject { if (err) { setSpanError(span, err); } - span.end(); + if (span.isRecording()) { + span.end(); + } }); return proxyStream as PartialResultStream; @@ -3212,6 +3224,26 @@ class Database extends common.GrpcServiceObject { ? (optionsOrRunFn as RunTransactionOptions) : {}; + const retry = (span: Span) => { + this.runTransaction(options, (err, txn) => { + if (err) { + setSpanError(span, err); + span.end(); + runFn!(err, null); + return; + } + + txn!.once('end', () => { + span.end(); + }); + txn!.once('error', err => { + setSpanError(span, err!); + span.end(); + }); + runFn!(null, txn!); + }); + }; + startTrace('Database.runTransaction', this._traceConfig, span => { this.pool_.getSession((err, session?, transaction?) => { if (err) { @@ -3222,8 +3254,7 @@ class Database extends common.GrpcServiceObject { span.addEvent('No session available', { 'session.id': session?.id, }); - this.runTransaction(options, runFn!); - span.end(); + retry(span); return; } @@ -3241,41 +3272,42 @@ class Database extends common.GrpcServiceObject { transaction!.excludeTxnFromChangeStreams(); } - const release = () => { + // Our span should only be ended if the + // transaction either errored or was ended. + transaction!.once('error', err => { + setSpanError(span, err!); + span.end(); + }); + + transaction!.once('end', err => { + setSpanError(span, err!); span.end(); + }); + + const release = () => { this.pool_.release(session!); }; const runner = new TransactionRunner( session!, transaction!, - (err, resp) => { - if (err) { - setSpanError(span, err!); - } - span.end(); - runFn!(err, resp); - }, + runFn!, options ); runner.run().then(release, err => { - if (err) { - setSpanError(span, err!); - } + setSpanError(span, err); if (isSessionNotFoundError(err)) { span.addEvent('No session available', { 'session.id': session?.id, }); release(); - this.runTransaction(options, runFn!); + retry(span); } else { - if (!err) { - span.addEvent('Using Session', {'session.id': session!.id}); - } setImmediate(runFn!, err); release(); + span.end(); } }); }); @@ -3536,6 +3568,11 @@ class Database extends common.GrpcServiceObject { mutationGroups, options ); + dataStream.once('end', () => span.end()); + dataStream.once('error', err => { + setSpanError(span, err!); + span.end(); + }); dataStream.pipe(proxyStream); } else { span.end(); @@ -3635,8 +3672,14 @@ class Database extends common.GrpcServiceObject { span.addEvent('No session available', { 'session.id': session?.id, }); - this.writeAtLeastOnce(mutations, options, cb!); - span.end(); + // Retry this method. + this.writeAtLeastOnce(mutations, options, (err, resp) => { + if (err) { + setSpanError(span, err); + } + span.end(); + cb!(err, resp); + }); return; } if (err) { diff --git a/src/index.ts b/src/index.ts index db3110568..0bbdccd01 100644 --- a/src/index.ts +++ b/src/index.ts @@ -80,7 +80,10 @@ import { import grpcGcpModule = require('grpc-gcp'); const grpcGcp = grpcGcpModule(grpc); import * as v1 from './v1'; -import {ObservabilityOptions} from './instrument'; +import { + ObservabilityOptions, + ensureInitialContextManagerSet, +} from './instrument'; // eslint-disable-next-line @typescript-eslint/no-var-requires const gcpApiConfig = require('./spanner_grpc_config.json'); @@ -370,6 +373,7 @@ class Spanner extends GrpcService { }; this.directedReadOptions = directedReadOptions; this._observabilityOptions = options.observabilityOptions; + ensureInitialContextManagerSet(); } /** diff --git a/src/instrument.ts b/src/instrument.ts index 99b260bf4..a7812d473 100644 --- a/src/instrument.ts +++ b/src/instrument.ts @@ -117,6 +117,7 @@ function ensureInitialContextManagerSet() { context.setGlobalContextManager(contextManager); } } +export {ensureInitialContextManagerSet}; /** * startTrace begins an active span in the current active context @@ -136,8 +137,6 @@ export function startTrace( config = {} as traceConfig; } - ensureInitialContextManagerSet(); - return getTracer(config.opts?.tracerProvider).startActiveSpan( SPAN_NAMESPACE_PREFIX + '.' + spanNameSuffix, {kind: SpanKind.CLIENT}, @@ -165,11 +164,20 @@ export function startTrace( } } - if (config.that) { - const fn = cb.bind(config.that); - return fn(span); - } else { - return cb(span); + // If at all the invoked function throws an exception, + // record the exception and then end this span. + try { + if (config.that) { + const fn = cb.bind(config.that); + return fn(span); + } else { + return cb(span); + } + } catch (e) { + setSpanErrorAndException(span, e as Error); + span.end(); + // Finally re-throw the exception. + throw e; } } ); diff --git a/src/table.ts b/src/table.ts index 227f8d107..8f87ba00b 100644 --- a/src/table.ts +++ b/src/table.ts @@ -35,6 +35,7 @@ import { ObservabilityOptions, startTrace, setSpanError, + setSpanErrorAndException, traceConfig, } from './instrument'; diff --git a/src/transaction.ts b/src/transaction.ts index e7993e74c..eddf8f61c 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -456,19 +456,21 @@ export class Snapshot extends EventEmitter { gaxOpts, headers: headers, }, - ( + async ( err: null | grpc.ServiceError, resp: spannerClient.spanner.v1.ITransaction ) => { if (err) { setSpanError(span, err); - span.end(); - callback!(err, resp); - return; + } else { + this._update(resp); } - this._update(resp); + + // begin simply issues an RPC to signal a state change to the backend, + // and doesn't invoke any other subsequent code hence it is paramount + // to invoke span.end() before returning results to the callback. span.end(); - callback!(null, resp); + callback!(err, resp); } ); }); @@ -708,26 +710,37 @@ export class Snapshot extends EventEmitter { addLeaderAwareRoutingHeader(headers); } - const makeRequest = (resumeToken?: ResumeToken): Readable => { - if (this.id && transaction.begin) { - delete transaction.begin; - transaction.id = this.id; - } - return this.requestStream({ - client: 'SpannerClient', - method: 'streamingRead', - reqOpts: Object.assign({}, reqOpts, {resumeToken}), - gaxOpts: gaxOptions, - headers: headers, - }); - }; - const traceConfig = { tableName: table, opts: this._observabilityOptions, dbName: this._dbName!, }; return startTrace('Snapshot.createReadStream', traceConfig, span => { + let attempt = 0; + const makeRequest = (resumeToken?: ResumeToken): Readable => { + if (this.id && transaction.begin) { + delete transaction.begin; + transaction.id = this.id; + } + + if (resumeToken) { + span.addEvent('Resuming stream', { + resume_token: resumeToken!.toString(), + attempt: attempt, + }); + } + + attempt++; + + return this.requestStream({ + client: 'SpannerClient', + method: 'streamingRead', + reqOpts: Object.assign({}, reqOpts, {resumeToken}), + gaxOpts: gaxOptions, + headers: headers, + }); + }; + const resultStream = partialResultStream( this._wrapWithIdWaiter(makeRequest), { @@ -743,20 +756,21 @@ export class Snapshot extends EventEmitter { this._update(response.metadata!.transaction); } }) - .on('error', err => { - const isServiceError = - err && typeof err === 'object' && 'code' in err; - if ( - !this.id && - this._useInRunner && - !( - isServiceError && - (err as grpc.ServiceError).code === grpc.status.ABORTED - ) - ) { - this.begin(); - } + .on('error', async err => { setSpanError(span, err); + const wasAborted = isErrorAborted(err); + if (!this.id && this._useInRunner && !wasAborted) { + // It is paramount for us to await this invocation to + // .begin() to complete before we invoke span.end(); + await this.begin(); + } else { + if (wasAborted) { + span.addEvent('Stream broken. Not safe to retry', { + 'transaction.id': this.id?.toString(), + }); + } + } + span.end(); }) .on('end', err => { if (err) { @@ -770,7 +784,9 @@ export class Snapshot extends EventEmitter { if (err) { setSpanError(span, err); } - span.end(); + if (span.isRecording()) { + span.end(); + } }); } @@ -1281,7 +1297,21 @@ export class Snapshot extends EventEmitter { ...query, }; return startTrace('Snapshot.runStream', traceConfig, span => { + let attempt = 0; const makeRequest = (resumeToken?: ResumeToken): Readable => { + if (!resumeToken) { + if (attempt === 0) { + span.addEvent('Starting stream'); + } else { + span.addEvent('Re-attempting start stream', {attempt: attempt}); + } + } else { + span.addEvent('Resuming stream', { + resume_token: resumeToken!.toString(), + attempt: attempt, + }); + } + if (!reqOpts || (this.id && !reqOpts.transaction.id)) { try { sanitizeRequest(); @@ -1294,6 +1324,8 @@ export class Snapshot extends EventEmitter { } } + attempt++; + return this.requestStream({ client: 'SpannerClient', method: 'executeStreamingSql', @@ -1318,36 +1350,29 @@ export class Snapshot extends EventEmitter { this._update(response.metadata!.transaction); } }) - .on('error', err => { + .on('error', async err => { setSpanError(span, err as Error); - const isServiceError = - err && typeof err === 'object' && 'code' in err; - if ( - !this.id && - this._useInRunner && - !( - isServiceError && - (err as grpc.ServiceError).code === grpc.status.ABORTED - ) - ) { - this.begin(); + const wasAborted = isErrorAborted(err); + if (!this.id && this._useInRunner && !wasAborted) { + span.addEvent('Stream broken. Safe to retry'); + await this.begin(); + } else { + if (wasAborted) { + span.addEvent('Stream broken. Not safe to retry', { + 'transaction.id': this.id?.toString(), + }); + } } + span.end(); }) .on('end', err => { if (err) { setSpanError(span, err as Error); } - span.end(); - }); - - if (resultStream instanceof Stream) { - finished(resultStream, err => { - if (err) { - setSpanError(span, err); + if (span.isRecording()) { + span.end(); } - span.end(); }); - } return resultStream; }); @@ -2106,21 +2131,28 @@ export class Transaction extends Dml { opts: this._observabilityOptions, dbName: this._dbName!, }; - return startTrace('Transaction.commit', traceConfig, span => { + startTrace('Transaction.commit', traceConfig, async span => { if (this.id) { reqOpts.transactionId = this.id as Uint8Array; } else if (!this._useInRunner) { reqOpts.singleUseTransaction = this._options; } else { - this.begin().then(() => { - this.commit(options, (err, resp) => { - if (err) { - setSpanError(span, err); - } + this.begin().then( + () => { + this.commit(options, (err, resp) => { + if (err) { + setSpanError(span, err); + } + span.end(); + callback(err, resp); + }); + }, + err => { + setSpanError(span, err); span.end(); - callback(err, resp); - }); - }, callback); + callback(err, null); + } + ); return; } @@ -2985,6 +3017,15 @@ export class PartitionedDml extends Dml { } } +function isErrorAborted(err): boolean { + return ( + err && + typeof err === 'object' && + 'code' in err && + (err as grpc.ServiceError).code === grpc.status.ABORTED + ); +} + /*! Developer Documentation * * All async methods (except for streams) return a Promise in the event diff --git a/test/spanner.ts b/test/spanner.ts index 032d18493..d324ed911 100644 --- a/test/spanner.ts +++ b/test/spanner.ts @@ -5062,6 +5062,7 @@ describe('Spanner with mock server', () => { 'Requesting 25 sessions', 'Creating 25 sessions', 'Requested for 25 sessions returned 25', + 'Starting stream', 'Acquiring session', 'Waiting for a session to become available', 'Acquired session', From f3df28844a5e94894508043fb0b1fd0bfa6c3de4 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Mon, 21 Oct 2024 06:14:51 -0700 Subject: [PATCH 02/18] Address merge from main --- observability-test/spanner.ts | 26 +++++--------------------- 1 file changed, 5 insertions(+), 21 deletions(-) diff --git a/observability-test/spanner.ts b/observability-test/spanner.ts index 96abffd69..14176cd5e 100644 --- a/observability-test/spanner.ts +++ b/observability-test/spanner.ts @@ -513,6 +513,7 @@ describe('EndToEnd', () => { ); const expectedEventNames = [ + 'Starting stream', 'Transaction Creation Done', 'Acquiring session', 'Cache hit: has usable session', @@ -1558,6 +1559,7 @@ SELECT 1p 'CloudSpanner.Snapshot.begin', 'CloudSpanner.Transaction.commit', 'CloudSpanner.Transaction.commit', + 'CloudSpanner.Database.runTransactionAsync', ]; assert.deepStrictEqual( actualSpanNames, @@ -1603,7 +1605,6 @@ SELECT 1p 'Expected that sessionPool.createSessions is the parent to db.batchCreassionSessions' ); - // TODO: Uncomment once we've merged Database.runTransactionAsync tracing. // We need to ensure a strict relationship between the spans. // |-Database.runTransactionAsync |-------------------------------------| // |-Snapshot.run |------------------------| @@ -1611,14 +1612,13 @@ SELECT 1p // |-Transaction.commit |--------| // |-Snapshot.begin |------| // |-Snapshot.commit |-----| - /* const spanDatabaseRunTransactionAsync = spans[spans.length - 1]; assert.deepStrictEqual( spanDatabaseRunTransactionAsync.name, 'CloudSpanner.Database.runTransactionAsync', `${actualSpanNames}` ); - const spanTransactionCommit0 = spans[spans.length - 1]; + const spanTransactionCommit0 = spans[spans.length - 2]; assert.strictEqual( spanTransactionCommit0.name, 'CloudSpanner.Transaction.commit' @@ -1643,7 +1643,6 @@ SELECT 1p spanDatabaseRunTransactionAsync.spanContext().traceId, 'Did not expect the same traceId' ); - */ // Finally check for the collective expected event names. const expectedEventNames = [ @@ -1658,18 +1657,11 @@ SELECT 1p 'Transaction Creation Done', 'Starting Commit', 'Commit Done', - - // TODO: Uncomment once we've merged Database.runTransactionAsync tracing. - /* 'Acquiring session', 'Waiting for a session to become available', 'Acquired session', 'Using Session', - 'Transaction Attempt Failed', - 'Transaction Attempt Aborted', - 'exception', 'exception', - */ ]; assert.deepStrictEqual( actualEventNames, @@ -2183,6 +2175,7 @@ describe('Traces for ExecuteStream broken stream retries', () => { 'CloudSpanner.Snapshot.begin', 'CloudSpanner.Transaction.commit', 'CloudSpanner.Transaction.commit', + 'CloudSpanner.Database.runTransactionAsync', ]; assert.deepStrictEqual( actualSpanNames, @@ -2201,15 +2194,10 @@ describe('Traces for ExecuteStream broken stream retries', () => { 'Transaction Creation Done', 'Starting Commit', 'Commit Done', - - // TODO: Uncomment once we've merged Database.runTransactionAsync tracing. - /* 'Acquiring session', 'Waiting for a session to become available', 'Acquired session', 'Using Session', - 'Transaction Attempt Succeeded', - */ ]; assert.deepStrictEqual( actualEventNames, @@ -2268,6 +2256,7 @@ describe('Traces for ExecuteStream broken stream retries', () => { const expectedSpanNames = [ 'CloudSpanner.Database.batchCreateSessions', 'CloudSpanner.SessionPool.createSessions', + 'CloudSpanner.Database.runTransactionAsync', ]; assert.deepStrictEqual( actualSpanNames, @@ -2279,15 +2268,10 @@ describe('Traces for ExecuteStream broken stream retries', () => { 'Requesting 25 sessions', 'Creating 25 sessions', 'Requested for 25 sessions returned 25', - - // TODO: Uncomment once we've merged Database.runTransactionAsync tracing. - /* 'Acquiring session', 'Waiting for a session to become available', 'Acquired session', 'Using Session', - 'Transaction Attempt Succeeded', - */ ]; assert.deepStrictEqual( actualEventNames, From ed4a3457cd0a4877400933ec9122764a3d24c8f1 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Mon, 21 Oct 2024 07:00:58 -0700 Subject: [PATCH 03/18] Ensure Database.getSnapshot correct span ending + tests --- observability-test/spanner.ts | 18 +++++++++++------- src/database.ts | 13 ++++++++----- src/transaction.ts | 16 +++++++++------- 3 files changed, 28 insertions(+), 19 deletions(-) diff --git a/observability-test/spanner.ts b/observability-test/spanner.ts index 14176cd5e..74d4964c5 100644 --- a/observability-test/spanner.ts +++ b/observability-test/spanner.ts @@ -129,6 +129,7 @@ describe('EndToEnd', () => { let database: Database; let spannerMock: mock.MockSpanner; let traceExporter: typeof InMemorySpanExporter; + let tracerProvider: typeof NodeTracerProvider; const contextManager = new AsyncHooksContextManager(); setGlobalContextManager(contextManager); @@ -140,14 +141,14 @@ describe('EndToEnd', () => { beforeEach(async () => { traceExporter = new InMemorySpanExporter(); const sampler = new AlwaysOnSampler(); - const provider = new NodeTracerProvider({ + tracerProvider = new NodeTracerProvider({ sampler: sampler, exporter: traceExporter, }); - provider.addSpanProcessor(new SimpleSpanProcessor(traceExporter)); + tracerProvider.addSpanProcessor(new SimpleSpanProcessor(traceExporter)); const setupResult = await setup({ - tracerProvider: provider, + tracerProvider: tracerProvider, enableExtendedTracing: false, }); @@ -167,7 +168,8 @@ describe('EndToEnd', () => { traceExporter.reset(); }); - afterEach(() => { + afterEach(async () => { + await tracerProvider.forceFlush(); traceExporter.reset(); spannerMock.resetRequests(); spanner.close(); @@ -218,9 +220,11 @@ describe('EndToEnd', () => { database.getSnapshot((err, transaction) => { assert.ifError(err); - transaction!.run('SELECT 1', (err, rows) => { + transaction!.run('SELECT 1', async (err, rows) => { assert.ifError(err); + transaction!.end(); + await tracerProvider.forceFlush(); traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); withAllSpansHaveDBName(spans); @@ -236,9 +240,9 @@ describe('EndToEnd', () => { const expectedSpanNames = [ 'CloudSpanner.Snapshot.begin', - 'CloudSpanner.Database.getSnapshot', 'CloudSpanner.Snapshot.runStream', 'CloudSpanner.Snapshot.run', + 'CloudSpanner.Database.getSnapshot', ]; assert.deepStrictEqual( actualSpanNames, @@ -249,10 +253,10 @@ describe('EndToEnd', () => { const expectedEventNames = [ 'Begin Transaction', 'Transaction Creation Done', + 'Starting stream', 'Acquiring session', 'Cache hit: has usable session', 'Acquired session', - 'Starting stream', ]; assert.deepStrictEqual( actualEventNames, diff --git a/src/database.ts b/src/database.ts index e8d48c05f..cca9dcf44 100644 --- a/src/database.ts +++ b/src/database.ts @@ -2105,8 +2105,14 @@ class Database extends common.GrpcServiceObject { this.getSnapshot(options, (err, snapshot) => { if (err) { setSpanError(span, err); + span.end(); + } else { + snapshot!.once('end', () => span.end()); + snapshot!.once('error', err => { + setSpanError(span, err); + span.end(); + }); } - span.end(); callback!(err, snapshot); }); } else { @@ -2119,7 +2125,6 @@ class Database extends common.GrpcServiceObject { } this._releaseOnEnd(session!, snapshot, span); - span.end(); callback!(err, snapshot); }); }); @@ -3108,9 +3113,7 @@ class Database extends common.GrpcServiceObject { if (err) { setSpanError(span, err); } - if (span.isRecording()) { - span.end(); - } + span.end(); }); return proxyStream as PartialResultStream; diff --git a/src/transaction.ts b/src/transaction.ts index eddf8f61c..06b53a28a 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -456,7 +456,7 @@ export class Snapshot extends EventEmitter { gaxOpts, headers: headers, }, - async ( + ( err: null | grpc.ServiceError, resp: spannerClient.spanner.v1.ITransaction ) => { @@ -784,9 +784,7 @@ export class Snapshot extends EventEmitter { if (err) { setSpanError(span, err); } - if (span.isRecording()) { - span.end(); - } + span.end(); }); } @@ -1369,11 +1367,15 @@ export class Snapshot extends EventEmitter { if (err) { setSpanError(span, err as Error); } - if (span.isRecording()) { - span.end(); - } + span.end(); }); + finished(resultStream, err => { + if (err) { + setSpanError(span, err); + } + span.end(); + }); return resultStream; }); } From 1d17cbbe6d54fc57aa504acedc12d3fd0ca5bd9e Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Mon, 21 Oct 2024 18:40:25 -0700 Subject: [PATCH 04/18] Proper placement of Transaction span.end() --- src/database.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/database.ts b/src/database.ts index cca9dcf44..a274a833f 100644 --- a/src/database.ts +++ b/src/database.ts @@ -874,7 +874,6 @@ class Database extends common.GrpcServiceObject { return; } span.addEvent('Using Session', {'session.id': session?.id}); - span.end(); callback!(null, transaction, resp!); }); }); @@ -2214,10 +2213,11 @@ class Database extends common.GrpcServiceObject { 'session.id': session?.id, }); setSpanError(span, err); + span.end(); } else { setSpanError(span, err); + span.end(); } - span.end(); cb!(err as grpc.ServiceError | null, transaction); }); }); From b8d91b84505fe77167a3ba764ab8b184e5200fb8 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Mon, 21 Oct 2024 23:26:45 -0700 Subject: [PATCH 05/18] Adjust tests to ensure transaction.end() where necessary --- observability-test/database.ts | 16 ++++++++++++---- observability-test/spanner.ts | 30 +++++++++++++++++++++--------- src/database.ts | 9 ++++++--- src/transaction.ts | 16 +++++++++------- 4 files changed, 48 insertions(+), 23 deletions(-) diff --git a/observability-test/database.ts b/observability-test/database.ts index c951b7b1c..eb9309fdc 100644 --- a/observability-test/database.ts +++ b/observability-test/database.ts @@ -774,6 +774,7 @@ describe('Database', () => { callback(null, RESPONSE); }, once() {}, + end() {}, }; database.batchTransaction = (identifier, options) => { @@ -782,10 +783,14 @@ describe('Database', () => { return fakeTransaction; }; - database.createBatchTransaction(opts, (err, transaction, resp) => { + database.createBatchTransaction(opts, async (err, transaction, resp) => { assert.strictEqual(err, null); assert.strictEqual(transaction, fakeTransaction); assert.strictEqual(resp, RESPONSE); + transaction!.end(); + + await provider.forceFlush(); + traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); withAllSpansHaveDBName(spans); @@ -839,8 +844,8 @@ describe('Database', () => { begin(callback) { callback(error, RESPONSE); }, - once() {}, + end() {}, }; database.batchTransaction = () => { @@ -926,9 +931,11 @@ describe('Database', () => { getSessionStub.callsFake(callback => callback(fakeError)); - database.getTransaction(err => { + database.getTransaction(async err => { assert.strictEqual(err, fakeError); + await provider.forceFlush(); + traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); assert.strictEqual(spans.length, 1, 'Exactly 1 span expected'); withAllSpansHaveDBName(spans); @@ -975,9 +982,10 @@ describe('Database', () => { }); it('with no errors', done => { - database.getTransaction((err, transaction) => { + database.getTransaction(async (err, transaction) => { assert.ifError(err); assert.strictEqual(transaction, fakeTransaction); + transaction!.end(); const spans = traceExporter.getFinishedSpans(); withAllSpansHaveDBName(spans); diff --git a/observability-test/spanner.ts b/observability-test/spanner.ts index 74d4964c5..96fa280a9 100644 --- a/observability-test/spanner.ts +++ b/observability-test/spanner.ts @@ -273,13 +273,17 @@ describe('EndToEnd', () => { const withAllSpansHaveDBName = generateWithAllSpansHaveDBName( database.formattedName_ ); - database.getTransaction((err, transaction) => { + database.getTransaction(async (err, transaction) => { assert.ifError(err); assert.ok(transaction); + transaction!.end(); + transaction!.commit(); + await tracerProvider.forceFlush(); traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); withAllSpansHaveDBName(spans); + console.log(`flushed spans: ${spans.toString()}`); const actualEventNames: string[] = []; const actualSpanNames: string[] = []; @@ -807,7 +811,10 @@ describe('ObservabilityOptions injection and propagation', async () => { database.getTransaction((err, tx) => { assert.ifError(err); - tx!.run('SELECT 1', (err, rows) => { + tx!.run('SELECT 1', async (err, rows) => { + tx!.end(); + + await tracerProvider.forceFlush(); traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); @@ -860,9 +867,11 @@ describe('ObservabilityOptions injection and propagation', async () => { traceExporter.reset(); tx!.begin(); - tx!.runUpdate(updateSql, (err, rowCount) => { + tx!.runUpdate(updateSql, async (err, rowCount) => { assert.ifError(err); + tx!.end(); + await tracerProvider.forceFlush(); traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); @@ -914,9 +923,10 @@ describe('ObservabilityOptions injection and propagation', async () => { .on('data', () => rowCount++) .on('error', assert.ifError) .on('stats', _stats => {}) - .on('end', () => { + .on('end', async () => { tx!.end(); + await tracerProvider.forceFlush(); traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); @@ -970,7 +980,9 @@ describe('ObservabilityOptions injection and propagation', async () => { tx!.runUpdate(updateSql, async (err, rowCount) => { assert.ifError(err); - tx!.rollback(err => { + tx!.rollback(async err => { + tx!.end(); + await tracerProvider.forceFlush(); traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); @@ -1557,9 +1569,9 @@ SELECT 1p const expectedSpanNames = [ 'CloudSpanner.Database.batchCreateSessions', 'CloudSpanner.SessionPool.createSessions', + 'CloudSpanner.Snapshot.runStream', 'CloudSpanner.Snapshot.run', 'CloudSpanner.Snapshot.begin', - 'CloudSpanner.Snapshot.runStream', 'CloudSpanner.Snapshot.begin', 'CloudSpanner.Transaction.commit', 'CloudSpanner.Transaction.commit', @@ -1570,7 +1582,7 @@ SELECT 1p expectedSpanNames, `span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}` ); - const spanSnapshotRun = spans[2]; + const spanSnapshotRun = spans[3]; assert.strictEqual(spanSnapshotRun.name, 'CloudSpanner.Snapshot.run'); const wantSpanErr = '6 ALREADY_EXISTS: ' + messageBadInsertAlreadyExistent; assert.deepStrictEqual( @@ -1653,12 +1665,12 @@ SELECT 1p 'Requesting 25 sessions', 'Creating 25 sessions', 'Requested for 25 sessions returned 25', - 'Begin Transaction', - 'Transaction Creation Done', 'Starting stream', 'Stream broken. Safe to retry', 'Begin Transaction', 'Transaction Creation Done', + 'Begin Transaction', + 'Transaction Creation Done', 'Starting Commit', 'Commit Done', 'Acquiring session', diff --git a/src/database.ts b/src/database.ts index a274a833f..be50f5c5c 100644 --- a/src/database.ts +++ b/src/database.ts @@ -856,6 +856,7 @@ class Database extends common.GrpcServiceObject { callback!(err as ServiceError, null, undefined); return; } + const transaction = this.batchTransaction( {session: session!}, options @@ -874,6 +875,7 @@ class Database extends common.GrpcServiceObject { return; } span.addEvent('Using Session', {'session.id': session?.id}); + span.end(); callback!(null, transaction, resp!); }); }); @@ -1130,7 +1132,9 @@ class Database extends common.GrpcServiceObject { setSpanErrorAndException(span, e as Error); this.emit('error', e); } finally { - span.end(); + if (span.isRecording()) { + span.end(); + } } }); } @@ -2213,11 +2217,10 @@ class Database extends common.GrpcServiceObject { 'session.id': session?.id, }); setSpanError(span, err); - span.end(); } else { setSpanError(span, err); - span.end(); } + span.end(); cb!(err as grpc.ServiceError | null, transaction); }); }); diff --git a/src/transaction.ts b/src/transaction.ts index 06b53a28a..3d0778fed 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -1370,12 +1370,14 @@ export class Snapshot extends EventEmitter { span.end(); }); - finished(resultStream, err => { - if (err) { - setSpanError(span, err); - } - span.end(); - }); + if (resultStream instanceof Stream) { + finished(resultStream, err => { + if (err) { + setSpanError(span, err); + } + span.end(); + }); + } return resultStream; }); } @@ -2133,7 +2135,7 @@ export class Transaction extends Dml { opts: this._observabilityOptions, dbName: this._dbName!, }; - startTrace('Transaction.commit', traceConfig, async span => { + return startTrace('Transaction.commit', traceConfig, span => { if (this.id) { reqOpts.transactionId = this.id as Uint8Array; } else if (!this._useInRunner) { From 0205732faaad699aefda0278dd9caf33238b3ce5 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Tue, 22 Oct 2024 00:00:35 -0700 Subject: [PATCH 06/18] Address some review feedback --- src/instrument.ts | 8 +------- src/transaction.ts | 39 ++++++++++++++++++++++++--------------- 2 files changed, 25 insertions(+), 22 deletions(-) diff --git a/src/instrument.ts b/src/instrument.ts index a7812d473..15fe7b8e3 100644 --- a/src/instrument.ts +++ b/src/instrument.ts @@ -89,7 +89,6 @@ interface traceConfig { tableName?: string; dbName?: string; opts?: ObservabilityOptions; - that?: Object; } const SPAN_NAMESPACE_PREFIX = 'CloudSpanner'; // TODO: discuss & standardize this prefix. @@ -167,12 +166,7 @@ export function startTrace( // If at all the invoked function throws an exception, // record the exception and then end this span. try { - if (config.that) { - const fn = cb.bind(config.that); - return fn(span); - } else { - return cb(span); - } + return cb(span); } catch (e) { setSpanErrorAndException(span, e as Error); span.end(); diff --git a/src/transaction.ts b/src/transaction.ts index 3d0778fed..5a119f5d0 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -466,9 +466,6 @@ export class Snapshot extends EventEmitter { this._update(resp); } - // begin simply issues an RPC to signal a state change to the backend, - // and doesn't invoke any other subsequent code hence it is paramount - // to invoke span.end() before returning results to the callback. span.end(); callback!(err, resp); } @@ -723,15 +720,21 @@ export class Snapshot extends EventEmitter { transaction.id = this.id; } - if (resumeToken) { + attempt++; + + if (!resumeToken) { + if (attempt === 1) { + span.addEvent('Starting stream'); + } else { + span.addEvent('Re-attempting start stream', {attempt: attempt}); + } + } else { span.addEvent('Resuming stream', { resume_token: resumeToken!.toString(), attempt: attempt, }); } - attempt++; - return this.requestStream({ client: 'SpannerClient', method: 'streamingRead', @@ -756,13 +759,15 @@ export class Snapshot extends EventEmitter { this._update(response.metadata!.transaction); } }) - .on('error', async err => { + .on('error', err => { setSpanError(span, err); const wasAborted = isErrorAborted(err); if (!this.id && this._useInRunner && !wasAborted) { - // It is paramount for us to await this invocation to - // .begin() to complete before we invoke span.end(); - await this.begin(); + // TODO: re-examine if this.begin() should still exist and if + // an await is needed: with await the generated begin span + // will look wonky and out of order. Please ses + // https://github.com/googleapis/nodejs-spanner/issues/2170 + this.begin(); } else { if (wasAborted) { span.addEvent('Stream broken. Not safe to retry', { @@ -1297,8 +1302,10 @@ export class Snapshot extends EventEmitter { return startTrace('Snapshot.runStream', traceConfig, span => { let attempt = 0; const makeRequest = (resumeToken?: ResumeToken): Readable => { + attempt++; + if (!resumeToken) { - if (attempt === 0) { + if (attempt === 1) { span.addEvent('Starting stream'); } else { span.addEvent('Re-attempting start stream', {attempt: attempt}); @@ -1322,8 +1329,6 @@ export class Snapshot extends EventEmitter { } } - attempt++; - return this.requestStream({ client: 'SpannerClient', method: 'executeStreamingSql', @@ -1348,12 +1353,15 @@ export class Snapshot extends EventEmitter { this._update(response.metadata!.transaction); } }) - .on('error', async err => { + .on('error', err => { setSpanError(span, err as Error); const wasAborted = isErrorAborted(err); if (!this.id && this._useInRunner && !wasAborted) { span.addEvent('Stream broken. Safe to retry'); - await this.begin(); + // TODO: Examine the consequence of not awaiting this method, + // as the span might appear out of order. + // Please see https://github.com/googleapis/nodejs-spanner/issues/2170 + this.begin(); } else { if (wasAborted) { span.addEvent('Stream broken. Not safe to retry', { @@ -1378,6 +1386,7 @@ export class Snapshot extends EventEmitter { span.end(); }); } + return resultStream; }); } From e7e84e36f407fb87aa6f129a7e5d83a64e66365a Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Tue, 22 Oct 2024 00:40:48 -0700 Subject: [PATCH 07/18] Handle moving 2nd getSnapshot() retry to itself --- observability-test/database.ts | 5 +++-- observability-test/spanner.ts | 7 +++---- src/database.ts | 18 +++++------------- 3 files changed, 11 insertions(+), 19 deletions(-) diff --git a/observability-test/database.ts b/observability-test/database.ts index eb9309fdc..ddf94de95 100644 --- a/observability-test/database.ts +++ b/observability-test/database.ts @@ -605,7 +605,7 @@ describe('Database', () => { // pool, so that the pool can remove it from its inventory. const releaseStub = sandbox.stub(fakePool, 'release'); - database.getSnapshot((err, snapshot) => { + database.getSnapshot(async (err, snapshot) => { assert.ifError(err); assert.strictEqual(snapshot, fakeSnapshot2); // The first session that error should already have been released back @@ -616,8 +616,9 @@ describe('Database', () => { snapshot.emit('end'); assert.strictEqual(releaseStub.callCount, 2); + await provider.forceFlush(); + await traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); - assert.strictEqual(spans.length, 2, 'Exactly 2 spans expected'); withAllSpansHaveDBName(spans); const actualSpanNames: string[] = []; diff --git a/observability-test/spanner.ts b/observability-test/spanner.ts index 96fa280a9..f3487f953 100644 --- a/observability-test/spanner.ts +++ b/observability-test/spanner.ts @@ -283,7 +283,6 @@ describe('EndToEnd', () => { traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); withAllSpansHaveDBName(spans); - console.log(`flushed spans: ${spans.toString()}`); const actualEventNames: string[] = []; const actualSpanNames: string[] = []; @@ -442,10 +441,10 @@ describe('EndToEnd', () => { database.runTransaction(async (err, transaction) => { assert.ifError(err); - const [rows] = await transaction!.run('SELECT 1'); + await transaction!.run('SELECT 1'); await transaction!.commit(); + await traceExporter.forceFlush(); - traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); withAllSpansHaveDBName(spans); @@ -872,7 +871,7 @@ describe('ObservabilityOptions injection and propagation', async () => { tx!.end(); await tracerProvider.forceFlush(); - traceExporter.forceFlush(); + await traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); withAllSpansHaveDBName(spans); diff --git a/src/database.ts b/src/database.ts index be50f5c5c..a7fd777e5 100644 --- a/src/database.ts +++ b/src/database.ts @@ -2105,19 +2105,11 @@ class Database extends common.GrpcServiceObject { }); session!.lastError = err; this.pool_.release(session!); - this.getSnapshot(options, (err, snapshot) => { - if (err) { - setSpanError(span, err); - span.end(); - } else { - snapshot!.once('end', () => span.end()); - snapshot!.once('error', err => { - setSpanError(span, err); - span.end(); - }); - } - callback!(err, snapshot); - }); + this.getSnapshot(options, callback!); + // Explicitly requested in code review that this span.end() be + // moved out of this.getSnapshot, and that there will a later refactor, + // similar to https://github.com/googleapis/nodejs-spanner/issues/2159 + span.end(); } else { span.addEvent('Using Session', {'session.id': session?.id}); this.pool_.release(session!); From d1ec79b957dbf8ab3d10fccc9dfb093afe4efa4d Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Tue, 22 Oct 2024 01:14:02 -0700 Subject: [PATCH 08/18] Remove dataStream.once span nesting --- observability-test/database.ts | 25 ++++++++++++++----------- src/database.ts | 9 +++------ 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/observability-test/database.ts b/observability-test/database.ts index ddf94de95..0157f0995 100644 --- a/observability-test/database.ts +++ b/observability-test/database.ts @@ -1878,11 +1878,14 @@ describe('Database', () => { .on('error', err => { assert.fail(err); }) - .on('end', () => { + .on('end', async () => { assert.strictEqual(endStub.callCount, 1); assert.strictEqual(endStub2.callCount, 1); assert.strictEqual(rows, 1); + await provider.forceFlush(); + await traceExporter.forceFlush(); + const spans = traceExporter.getFinishedSpans(); assert.strictEqual(spans.length, 2, 'Exactly 1 span expected'); withAllSpansHaveDBName(spans); @@ -1907,35 +1910,35 @@ describe('Database', () => { ); // Ensure that the span actually produced an error that was recorded. - const secondSpan = spans[1]; - assert.strictEqual( + const lastSpan = spans[0]; + assert.deepStrictEqual( SpanStatusCode.ERROR, - secondSpan.status.code, + lastSpan.status.code, 'Expected an ERROR span status' ); - assert.strictEqual( + assert.deepStrictEqual( 'Session not found', - secondSpan.status.message, + lastSpan.status.message, 'Mismatched span status message' ); // Ensure that the final span that got retries did not error. - const firstSpan = spans[0]; - assert.strictEqual( + const firstSpan = spans[1]; + assert.deepStrictEqual( SpanStatusCode.UNSET, firstSpan.status.code, - 'Unexpected an span status code' + 'Unexpected span status code' ); - assert.strictEqual( + assert.deepStrictEqual( undefined, firstSpan.status.message, 'Unexpected span status message' ); const expectedEventNames = [ - 'Using Session', 'Using Session', 'No session available', + 'Using Session', ]; assert.deepStrictEqual( actualEventNames, diff --git a/src/database.ts b/src/database.ts index a7fd777e5..7f7419487 100644 --- a/src/database.ts +++ b/src/database.ts @@ -856,7 +856,6 @@ class Database extends common.GrpcServiceObject { callback!(err as ServiceError, null, undefined); return; } - const transaction = this.batchTransaction( {session: session!}, options @@ -3088,11 +3087,9 @@ class Database extends common.GrpcServiceObject { // Create a new data stream and add it to the end user stream. dataStream = this.runStream(query, options); dataStream.pipe(proxyStream); - dataStream.on('end', () => span.end()); - dataStream.on('error', err => { - setSpanError(span, err); - span.end(); - }); + // Explicitly invoking span.end() here, + // instead of inside dataStream.on('error'). + span.end(); } else { proxyStream.destroy(err); snapshot.end(); From abfc3736217ddf5d1fcb82970469a4775730863c Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Tue, 22 Oct 2024 02:15:46 -0700 Subject: [PATCH 09/18] End spans even on dataStream retry --- src/database.ts | 6 +----- src/transaction.ts | 1 - 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/src/database.ts b/src/database.ts index 7f7419487..e5226b47c 100644 --- a/src/database.ts +++ b/src/database.ts @@ -3563,12 +3563,8 @@ class Database extends common.GrpcServiceObject { mutationGroups, options ); - dataStream.once('end', () => span.end()); - dataStream.once('error', err => { - setSpanError(span, err!); - span.end(); - }); dataStream.pipe(proxyStream); + span.end(); } else { span.end(); proxyStream.destroy(err); diff --git a/src/transaction.ts b/src/transaction.ts index 5a119f5d0..9ac07f81a 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -465,7 +465,6 @@ export class Snapshot extends EventEmitter { } else { this._update(resp); } - span.end(); callback!(err, resp); } From 13cc5092ff9e2a4c5e251c01726ee4e657ec9972 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Tue, 22 Oct 2024 03:23:37 -0700 Subject: [PATCH 10/18] De-flake EndToEnd Database.runStream test with by ignoring grpc.CANCELLED --- observability-test/spanner.ts | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/observability-test/spanner.ts b/observability-test/spanner.ts index f3487f953..4049ea7a1 100644 --- a/observability-test/spanner.ts +++ b/observability-test/spanner.ts @@ -170,6 +170,7 @@ describe('EndToEnd', () => { afterEach(async () => { await tracerProvider.forceFlush(); + await tracerProvider.shutdown(); traceExporter.reset(); spannerMock.resetRequests(); spanner.close(); @@ -323,7 +324,16 @@ describe('EndToEnd', () => { database .runStream('SELECT 1') .on('data', row => {}) - .on('error', assert.ifError) + .on('error', err => { + // De-flake by ignoring grpc.status.CANCELLED as we've + // seen on the Github test runners, due to timing. + const grpcErr = err as grpc.ServiceError; + if (!grpcErr) { + assert.ifError(err); + } else if (grpcErr.code != grpc.status.CANCELLED) { + assert.ifError(err); + } + }) .on('end', () => { traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); From d86986b348a744db2dee39e0c770aa5d0fe58e40 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Tue, 22 Oct 2024 22:11:15 -0700 Subject: [PATCH 11/18] test: remove async vestige --- observability-test/database.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/observability-test/database.ts b/observability-test/database.ts index 0157f0995..d4f3aa9ef 100644 --- a/observability-test/database.ts +++ b/observability-test/database.ts @@ -983,7 +983,7 @@ describe('Database', () => { }); it('with no errors', done => { - database.getTransaction(async (err, transaction) => { + database.getTransaction((err, transaction) => { assert.ifError(err); assert.strictEqual(transaction, fakeTransaction); transaction!.end(); From 76bd38d5bdbf3dca5a9277103094486bd6c4cdf1 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Wed, 23 Oct 2024 02:39:28 -0700 Subject: [PATCH 12/18] Attempt to isolate and de-flake --- observability-test/spanner.ts | 14 ++++++++++++-- src/database.ts | 3 +-- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/observability-test/spanner.ts b/observability-test/spanner.ts index 4049ea7a1..45756cad6 100644 --- a/observability-test/spanner.ts +++ b/observability-test/spanner.ts @@ -172,6 +172,7 @@ describe('EndToEnd', () => { await tracerProvider.forceFlush(); await tracerProvider.shutdown(); traceExporter.reset(); + database.close(); spannerMock.resetRequests(); spanner.close(); server.tryShutdown(() => {}); @@ -324,8 +325,12 @@ describe('EndToEnd', () => { database .runStream('SELECT 1') .on('data', row => {}) - .on('error', err => { - // De-flake by ignoring grpc.status.CANCELLED as we've + .once('error', err => { + // TODO: Examine why this error condition is triggered + // after the test has finished running. + console.log(`\x1b[31mRogue error: ${err}\x1b[00m`); + /* + // De-flake by ignoring grpc.status.CANCELLED as we've // seen on the Github test runners, due to timing. const grpcErr = err as grpc.ServiceError; if (!grpcErr) { @@ -333,6 +338,7 @@ describe('EndToEnd', () => { } else if (grpcErr.code != grpc.status.CANCELLED) { assert.ifError(err); } + */ }) .on('end', () => { traceExporter.forceFlush(); @@ -816,6 +822,10 @@ describe('ObservabilityOptions injection and propagation', async () => { db.formattedName_ ); + after(() => { + db.close(); + }); + it('run', done => { database.getTransaction((err, tx) => { assert.ifError(err); diff --git a/src/database.ts b/src/database.ts index e5226b47c..ecfbee71b 100644 --- a/src/database.ts +++ b/src/database.ts @@ -3274,8 +3274,7 @@ class Database extends common.GrpcServiceObject { span.end(); }); - transaction!.once('end', err => { - setSpanError(span, err!); + transaction!.once('end', () => { span.end(); }); From bea444c3ba4dae5e040be4a8f01106e57a641854 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Wed, 23 Oct 2024 02:56:52 -0700 Subject: [PATCH 13/18] More test failure debugs --- observability-test/spanner.ts | 5 ----- package.json | 2 +- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/observability-test/spanner.ts b/observability-test/spanner.ts index 45756cad6..2e9b9897e 100644 --- a/observability-test/spanner.ts +++ b/observability-test/spanner.ts @@ -172,7 +172,6 @@ describe('EndToEnd', () => { await tracerProvider.forceFlush(); await tracerProvider.shutdown(); traceExporter.reset(); - database.close(); spannerMock.resetRequests(); spanner.close(); server.tryShutdown(() => {}); @@ -822,10 +821,6 @@ describe('ObservabilityOptions injection and propagation', async () => { db.formattedName_ ); - after(() => { - db.close(); - }); - it('run', done => { database.getTransaction((err, tx) => { assert.ifError(err); diff --git a/package.json b/package.json index 27ed2f743..6834b2951 100644 --- a/package.json +++ b/package.json @@ -35,7 +35,7 @@ "system-test": "mocha build/system-test --timeout 1600000", "observability-test": "mocha build/observability-test --timeout 1600000", "cleanup": "mocha scripts/cleanup.js --timeout 30000", - "test": "mocha build/test build/test/common build/observability-test", + "test": "mocha build/test build/test/common build/observability-test --trace-warnings --timeout 1600000", "ycsb": "node ./benchmark/ycsb.js run -P ./benchmark/workloada -p table=usertable -p cloudspanner.instance=ycsb-instance -p operationcount=100 -p cloudspanner.database=ycsb", "fix": "gts fix", "clean": "gts clean", From 7128de7e48c9dffc5e3690af199a698399cff960 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Wed, 23 Oct 2024 04:59:25 -0700 Subject: [PATCH 14/18] hotfix to allow tests to run without blocking testers --- observability-test/spanner.ts | 3 ++- package.json | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/observability-test/spanner.ts b/observability-test/spanner.ts index 2e9b9897e..dae6c9dc8 100644 --- a/observability-test/spanner.ts +++ b/observability-test/spanner.ts @@ -173,7 +173,8 @@ describe('EndToEnd', () => { await tracerProvider.shutdown(); traceExporter.reset(); spannerMock.resetRequests(); - spanner.close(); + // Hot-fix, do not close spanner. + // spanner.close(); server.tryShutdown(() => {}); }); diff --git a/package.json b/package.json index 6834b2951..27ed2f743 100644 --- a/package.json +++ b/package.json @@ -35,7 +35,7 @@ "system-test": "mocha build/system-test --timeout 1600000", "observability-test": "mocha build/observability-test --timeout 1600000", "cleanup": "mocha scripts/cleanup.js --timeout 30000", - "test": "mocha build/test build/test/common build/observability-test --trace-warnings --timeout 1600000", + "test": "mocha build/test build/test/common build/observability-test", "ycsb": "node ./benchmark/ycsb.js run -P ./benchmark/workloada -p table=usertable -p cloudspanner.instance=ycsb-instance -p operationcount=100 -p cloudspanner.database=ycsb", "fix": "gts fix", "clean": "gts clean", From d10a888980ac9f2443574c0ba55f33f9cbcef6b3 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Wed, 23 Oct 2024 06:56:37 -0700 Subject: [PATCH 15/18] Address feedback from code review --- observability-test/database.ts | 4 +- observability-test/spanner.ts | 1 + src/database.ts | 93 ++++++++++------------------------ src/transaction.ts | 9 +--- 4 files changed, 33 insertions(+), 74 deletions(-) diff --git a/observability-test/database.ts b/observability-test/database.ts index d4f3aa9ef..5ce075aa1 100644 --- a/observability-test/database.ts +++ b/observability-test/database.ts @@ -641,7 +641,7 @@ describe('Database', () => { ); // Ensure that the first span actually produced an error that was recorded. - const parentSpan = spans[1]; + const parentSpan = spans[0]; assert.strictEqual( SpanStatusCode.ERROR, parentSpan.status.code, @@ -654,7 +654,7 @@ describe('Database', () => { ); // Ensure that the second span is a child of the first span. - const secondRetrySpan = spans[0]; + const secondRetrySpan = spans[1]; assert.ok( parentSpan.spanContext().traceId, 'Expected that the initial parent span has a defined traceId' diff --git a/observability-test/spanner.ts b/observability-test/spanner.ts index dae6c9dc8..6a3c8cef5 100644 --- a/observability-test/spanner.ts +++ b/observability-test/spanner.ts @@ -459,6 +459,7 @@ describe('EndToEnd', () => { assert.ifError(err); await transaction!.run('SELECT 1'); await transaction!.commit(); + await transaction!.end(); await traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); diff --git a/src/database.ts b/src/database.ts index ecfbee71b..babc278ba 100644 --- a/src/database.ts +++ b/src/database.ts @@ -1131,9 +1131,7 @@ class Database extends common.GrpcServiceObject { setSpanErrorAndException(span, e as Error); this.emit('error', e); } finally { - if (span.isRecording()) { - span.end(); - } + span.end(); } }); } @@ -2104,11 +2102,8 @@ class Database extends common.GrpcServiceObject { }); session!.lastError = err; this.pool_.release(session!); - this.getSnapshot(options, callback!); - // Explicitly requested in code review that this span.end() be - // moved out of this.getSnapshot, and that there will a later refactor, - // similar to https://github.com/googleapis/nodejs-spanner/issues/2159 span.end(); + this.getSnapshot(options, callback!); } else { span.addEvent('Using Session', {'session.id': session?.id}); this.pool_.release(session!); @@ -3084,12 +3079,10 @@ class Database extends common.GrpcServiceObject { dataStream.removeListener('end', endListener); dataStream.end(); snapshot.end(); + span.end(); // Create a new data stream and add it to the end user stream. dataStream = this.runStream(query, options); dataStream.pipe(proxyStream); - // Explicitly invoking span.end() here, - // instead of inside dataStream.on('error'). - span.end(); } else { proxyStream.destroy(err); snapshot.end(); @@ -3219,26 +3212,6 @@ class Database extends common.GrpcServiceObject { ? (optionsOrRunFn as RunTransactionOptions) : {}; - const retry = (span: Span) => { - this.runTransaction(options, (err, txn) => { - if (err) { - setSpanError(span, err); - span.end(); - runFn!(err, null); - return; - } - - txn!.once('end', () => { - span.end(); - }); - txn!.once('error', err => { - setSpanError(span, err!); - span.end(); - }); - runFn!(null, txn!); - }); - }; - startTrace('Database.runTransaction', this._traceConfig, span => { this.pool_.getSession((err, session?, transaction?) => { if (err) { @@ -3249,7 +3222,8 @@ class Database extends common.GrpcServiceObject { span.addEvent('No session available', { 'session.id': session?.id, }); - retry(span); + span.end(); + this.runTransaction(options, runFn!); return; } @@ -3267,19 +3241,9 @@ class Database extends common.GrpcServiceObject { transaction!.excludeTxnFromChangeStreams(); } - // Our span should only be ended if the - // transaction either errored or was ended. - transaction!.once('error', err => { - setSpanError(span, err!); - span.end(); - }); - - transaction!.once('end', () => { - span.end(); - }); - const release = () => { this.pool_.release(session!); + span.end(); }; const runner = new TransactionRunner( @@ -3289,21 +3253,26 @@ class Database extends common.GrpcServiceObject { options ); - runner.run().then(release, err => { - setSpanError(span, err); + runner + .run() + .then(release, err => { + setSpanError(span, err); - if (isSessionNotFoundError(err)) { - span.addEvent('No session available', { - 'session.id': session?.id, - }); - release(); - retry(span); - } else { - setImmediate(runFn!, err); - release(); - span.end(); - } - }); + if (isSessionNotFoundError(err)) { + span.addEvent('No session available', { + 'session.id': session?.id, + }); + release(); + this.runTransaction(options, runFn!); + } else { + release(); + setImmediate(runFn!, err); + } + }) + .catch(e => { + setSpanErrorAndException(span, e as Error); + throw e; + }); }); }); } @@ -3557,13 +3526,13 @@ class Database extends common.GrpcServiceObject { // Remove the current data stream from the end user stream. dataStream.unpipe(proxyStream); dataStream.end(); + span.end(); // Create a new stream and add it to the end user stream. dataStream = this.batchWriteAtLeastOnce( mutationGroups, options ); dataStream.pipe(proxyStream); - span.end(); } else { span.end(); proxyStream.destroy(err); @@ -3662,14 +3631,8 @@ class Database extends common.GrpcServiceObject { span.addEvent('No session available', { 'session.id': session?.id, }); - // Retry this method. - this.writeAtLeastOnce(mutations, options, (err, resp) => { - if (err) { - setSpanError(span, err); - } - span.end(); - cb!(err, resp); - }); + span.end(); + this.writeAtLeastOnce(mutations, options, cb!); return; } if (err) { diff --git a/src/transaction.ts b/src/transaction.ts index 9ac07f81a..fa1f10814 100644 --- a/src/transaction.ts +++ b/src/transaction.ts @@ -762,10 +762,7 @@ export class Snapshot extends EventEmitter { setSpanError(span, err); const wasAborted = isErrorAborted(err); if (!this.id && this._useInRunner && !wasAborted) { - // TODO: re-examine if this.begin() should still exist and if - // an await is needed: with await the generated begin span - // will look wonky and out of order. Please ses - // https://github.com/googleapis/nodejs-spanner/issues/2170 + // TODO: resolve https://github.com/googleapis/nodejs-spanner/issues/2170 this.begin(); } else { if (wasAborted) { @@ -1357,9 +1354,7 @@ export class Snapshot extends EventEmitter { const wasAborted = isErrorAborted(err); if (!this.id && this._useInRunner && !wasAborted) { span.addEvent('Stream broken. Safe to retry'); - // TODO: Examine the consequence of not awaiting this method, - // as the span might appear out of order. - // Please see https://github.com/googleapis/nodejs-spanner/issues/2170 + // TODO: resolve https://github.com/googleapis/nodejs-spanner/issues/2170 this.begin(); } else { if (wasAborted) { From 9b0a4588653756d2abcbc6712dc920da02e8a589 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Wed, 23 Oct 2024 23:38:01 -0700 Subject: [PATCH 16/18] EndToEnd test setup reduced to avoid flakes --- observability-test/spanner.ts | 81 +++++++++++++---------------------- 1 file changed, 30 insertions(+), 51 deletions(-) diff --git a/observability-test/spanner.ts b/observability-test/spanner.ts index 6a3c8cef5..cc5e8420e 100644 --- a/observability-test/spanner.ts +++ b/observability-test/spanner.ts @@ -123,42 +123,45 @@ async function setup( }); } -describe('EndToEnd', () => { - let server: grpc.Server; - let spanner: Spanner; - let database: Database; - let spannerMock: mock.MockSpanner; - let traceExporter: typeof InMemorySpanExporter; - let tracerProvider: typeof NodeTracerProvider; - +describe('EndToEnd', async () => { const contextManager = new AsyncHooksContextManager(); setGlobalContextManager(contextManager); - afterEach(() => { disableContextAndManager(contextManager); }); - beforeEach(async () => { - traceExporter = new InMemorySpanExporter(); - const sampler = new AlwaysOnSampler(); - tracerProvider = new NodeTracerProvider({ - sampler: sampler, - exporter: traceExporter, - }); - tracerProvider.addSpanProcessor(new SimpleSpanProcessor(traceExporter)); + const traceExporter = new InMemorySpanExporter(); + const sampler = new AlwaysOnSampler(); + const tracerProvider = new NodeTracerProvider({ + sampler: sampler, + exporter: traceExporter, + }); + tracerProvider.addSpanProcessor(new SimpleSpanProcessor(traceExporter)); - const setupResult = await setup({ - tracerProvider: tracerProvider, - enableExtendedTracing: false, - }); + const setupResult = await setup({ + tracerProvider: tracerProvider, + enableExtendedTracing: false, + }); - spanner = setupResult.spanner; - server = setupResult.server; - spannerMock = setupResult.spannerMock; + const server = setupResult.server; + const spannerMock = setupResult.spannerMock; + const spanner = setupResult.spanner; + const instance = spanner.instance('instance'); - const instance = spanner.instance('instance'); - database = instance.database('database'); + after(async () => { + spanner.close(); + await server.tryShutdown(() => {}); + }); + afterEach(async () => { + await tracerProvider.forceFlush(); + await traceExporter.reset(); + spannerMock.resetRequests(); + }); + + const database = instance.database('database'); + + beforeEach(async () => { // To deflake expectations of session creation, let's // issue out a warm-up request request that'll ensure // that the SessionPool is created deterministically. @@ -168,16 +171,6 @@ describe('EndToEnd', () => { traceExporter.reset(); }); - afterEach(async () => { - await tracerProvider.forceFlush(); - await tracerProvider.shutdown(); - traceExporter.reset(); - spannerMock.resetRequests(); - // Hot-fix, do not close spanner. - // spanner.close(); - server.tryShutdown(() => {}); - }); - describe('Database', () => { it('getSessions', async () => { const [rows] = await database.getSessions(); @@ -325,21 +318,7 @@ describe('EndToEnd', () => { database .runStream('SELECT 1') .on('data', row => {}) - .once('error', err => { - // TODO: Examine why this error condition is triggered - // after the test has finished running. - console.log(`\x1b[31mRogue error: ${err}\x1b[00m`); - /* - // De-flake by ignoring grpc.status.CANCELLED as we've - // seen on the Github test runners, due to timing. - const grpcErr = err as grpc.ServiceError; - if (!grpcErr) { - assert.ifError(err); - } else if (grpcErr.code != grpc.status.CANCELLED) { - assert.ifError(err); - } - */ - }) + .once('error', assert.ifError) .on('end', () => { traceExporter.forceFlush(); const spans = traceExporter.getFinishedSpans(); From 0458064cdf197f839a5b077a1e0f9b6507ea1c82 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Wed, 23 Oct 2024 23:57:35 -0700 Subject: [PATCH 17/18] Revert Database.runTransaction runner ordering --- src/database.ts | 32 +++++++++++++------------------- 1 file changed, 13 insertions(+), 19 deletions(-) diff --git a/src/database.ts b/src/database.ts index babc278ba..79c5ceaa7 100644 --- a/src/database.ts +++ b/src/database.ts @@ -3253,26 +3253,20 @@ class Database extends common.GrpcServiceObject { options ); - runner - .run() - .then(release, err => { - setSpanError(span, err); + runner.run().then(release, err => { + setSpanError(span, err!); - if (isSessionNotFoundError(err)) { - span.addEvent('No session available', { - 'session.id': session?.id, - }); - release(); - this.runTransaction(options, runFn!); - } else { - release(); - setImmediate(runFn!, err); - } - }) - .catch(e => { - setSpanErrorAndException(span, e as Error); - throw e; - }); + if (isSessionNotFoundError(err)) { + span.addEvent('No session available', { + 'session.id': session?.id, + }); + release(); + this.runTransaction(options, runFn!); + } else { + setImmediate(runFn!, err); + release(); + } + }); }); }); } From f0b87458701108f8235690bba0f630be9b9563e4 Mon Sep 17 00:00:00 2001 From: Emmanuel T Odeke Date: Thu, 24 Oct 2024 00:34:59 -0700 Subject: [PATCH 18/18] Uncommitted change for Database.getSnapshot: end span before sending back response --- observability-test/spanner.ts | 4 ++-- src/database.ts | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/observability-test/spanner.ts b/observability-test/spanner.ts index cc5e8420e..2e90b3044 100644 --- a/observability-test/spanner.ts +++ b/observability-test/spanner.ts @@ -235,9 +235,9 @@ describe('EndToEnd', async () => { const expectedSpanNames = [ 'CloudSpanner.Snapshot.begin', + 'CloudSpanner.Database.getSnapshot', 'CloudSpanner.Snapshot.runStream', 'CloudSpanner.Snapshot.run', - 'CloudSpanner.Database.getSnapshot', ]; assert.deepStrictEqual( actualSpanNames, @@ -248,10 +248,10 @@ describe('EndToEnd', async () => { const expectedEventNames = [ 'Begin Transaction', 'Transaction Creation Done', - 'Starting stream', 'Acquiring session', 'Cache hit: has usable session', 'Acquired session', + 'Starting stream', ]; assert.deepStrictEqual( actualEventNames, diff --git a/src/database.ts b/src/database.ts index 79c5ceaa7..3d8321355 100644 --- a/src/database.ts +++ b/src/database.ts @@ -2114,6 +2114,7 @@ class Database extends common.GrpcServiceObject { } this._releaseOnEnd(session!, snapshot, span); + span.end(); callback!(err, snapshot); }); });