Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: (observability) Add support for OpenTelemetry traces and allow observability options to be passed. #2131

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion observability-test/batch-transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@
batchTransaction = new BatchTransaction(SESSION as {} as Session);
batchTransaction.session = SESSION as {} as Session;
batchTransaction.id = ID;
batchTransaction.observabilityOptions = {tracerProvider: provider};
batchTransaction._observabilityOptions = {tracerProvider: provider};
REQUEST.callsFake((_, callback) => callback(null, RESPONSE));
});

Expand All @@ -153,11 +153,11 @@
};

it('createQueryPartitions', done => {
const REQUEST = sandbox.stub();

Check warning on line 156 in observability-test/batch-transaction.ts

View workflow job for this annotation

GitHub Actions / lint

'REQUEST' is assigned a value but never used

const res = batchTransaction.createQueryPartitions(

Check warning on line 158 in observability-test/batch-transaction.ts

View workflow job for this annotation

GitHub Actions / lint

'res' is assigned a value but never used
QUERY,
(err, part, resp) => {

Check warning on line 160 in observability-test/batch-transaction.ts

View workflow job for this annotation

GitHub Actions / lint

'part' is defined but never used

Check warning on line 160 in observability-test/batch-transaction.ts

View workflow job for this annotation

GitHub Actions / lint

'resp' is defined but never used
assert.ifError(err);
traceExporter.forceFlush();
const spans = traceExporter.getFinishedSpans();
Expand Down Expand Up @@ -222,9 +222,9 @@
const response = {};
REQUEST.callsFake((_, callback) => callback(null, response));

const res = batchTransaction.createReadPartitions(

Check warning on line 225 in observability-test/batch-transaction.ts

View workflow job for this annotation

GitHub Actions / lint

'res' is assigned a value but never used
QUERY,
(err, part, resp) => {

Check warning on line 227 in observability-test/batch-transaction.ts

View workflow job for this annotation

GitHub Actions / lint

'part' is defined but never used

Check warning on line 227 in observability-test/batch-transaction.ts

View workflow job for this annotation

GitHub Actions / lint

'resp' is defined but never used
assert.ifError(err);
traceExporter.forceFlush();
const spans = traceExporter.getFinishedSpans();
Expand Down
2 changes: 1 addition & 1 deletion observability-test/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@

class FakeSession {
calledWith_: IArguments;
formattedName_: any;

Check warning on line 87 in observability-test/database.ts

View workflow job for this annotation

GitHub Actions / lint

Unexpected any. Specify a different type
constructor() {
this.calledWith_ = arguments;
}
Expand Down Expand Up @@ -241,7 +241,7 @@
database = new Database(INSTANCE, NAME, POOL_OPTIONS);
database.parent = INSTANCE;
database.databaseRole = 'parent_role';
database.observabilityConfig = {
database._observabilityOptions = {
tracerProvider: provider,
enableExtendedTracing: false,
};
Expand Down Expand Up @@ -382,7 +382,7 @@

let beginSnapshotStub: sinon.SinonStub;
let getSessionStub: sinon.SinonStub;
let snapshotStub: sinon.SinonStub;

Check warning on line 385 in observability-test/database.ts

View workflow job for this annotation

GitHub Actions / lint

'snapshotStub' is assigned a value but never used

beforeEach(() => {
fakePool = database.pool_;
Expand All @@ -409,7 +409,7 @@

getSessionStub.callsFake(callback => callback(fakeError, null));

database.getSnapshot((err, snapshot) => {

Check warning on line 412 in observability-test/database.ts

View workflow job for this annotation

GitHub Actions / lint

'snapshot' is defined but never used
assert.strictEqual(err, fakeError);
traceExporter.forceFlush();
const spans = traceExporter.getFinishedSpans();
Expand Down
171 changes: 157 additions & 14 deletions observability-test/spanner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import * as assert from 'assert';
import {grpc} from 'google-gax';
import {google} from '../protos/protos';
import {Database, Spanner} from '../src';
import {Database, Instance, Spanner} from '../src';
import {MutationSet} from '../src/transaction';
import protobuf = google.spanner.v1;
import * as mock from '../test/mockserver/mockspanner';
Expand All @@ -35,6 +35,8 @@ const {
AsyncHooksContextManager,
} = require('@opentelemetry/context-async-hooks');

const {ObservabilityOptions} = require('../src/instrument');

/** A simple result set for SELECT 1. */
function createSelect1ResultSet(): protobuf.ResultSet {
const fields = [
Expand All @@ -60,7 +62,9 @@ interface setupResults {
spannerMock: mock.MockSpanner;
}

async function setup(): Promise<setupResults> {
async function setup(
observabilityOptions?: typeof ObservabilityOptions
): Promise<setupResults> {
const server = new grpc.Server();

const spannerMock = mock.createMockSpanner(server);
Expand Down Expand Up @@ -97,6 +101,7 @@ async function setup(): Promise<setupResults> {
servicePath: 'localhost',
port,
sslCreds: grpc.credentials.createInsecure(),
observabilityOptions: observabilityOptions,
});

return Promise.resolve({
Expand All @@ -122,7 +127,16 @@ describe('EndToEnd', () => {
});

beforeEach(async () => {
const setupResult = await setup();
traceExporter = new InMemorySpanExporter();
const sampler = new AlwaysOnSampler();
const provider = new NodeTracerProvider({
sampler: sampler,
exporter: traceExporter,
});
const setupResult = await setup({
tracerProvider: provider,
enableExtendedTracing: false,
});
spanner = setupResult.spanner;
server = setupResult.server;
spannerMock = setupResult.spannerMock;
Expand All @@ -138,21 +152,10 @@ describe('EndToEnd', () => {
mock.StatementResult.updateCount(1)
);

traceExporter = new InMemorySpanExporter();
const sampler = new AlwaysOnSampler();

const provider = new NodeTracerProvider({
sampler: sampler,
exporter: traceExporter,
});
provider.addSpanProcessor(new SimpleSpanProcessor(traceExporter));

const instance = spanner.instance('instance');
database = instance.database('database');
database.observabilityConfig = {
tracerProvider: provider,
enableExtendedTracing: false,
};
});

afterEach(() => {
Expand Down Expand Up @@ -440,3 +443,143 @@ describe('EndToEnd', () => {
});
});
});

describe('ObservabilityOptions injection and propagation', async () => {
const globalTraceExporter = new InMemorySpanExporter();
const globalTracerProvider = new NodeTracerProvider({
sampler: new AlwaysOnSampler(),
exporter: globalTraceExporter,
});
globalTracerProvider.addSpanProcessor(
new SimpleSpanProcessor(globalTraceExporter)
);
globalTracerProvider.register();

const injectedTraceExporter = new InMemorySpanExporter();
const injectedTracerProvider = new NodeTracerProvider({
sampler: new AlwaysOnSampler(),
exporter: injectedTraceExporter,
});
injectedTracerProvider.addSpanProcessor(
new SimpleSpanProcessor(injectedTraceExporter)
);

const observabilityOptions: typeof ObservabilityOptions = {
tracerProvider: injectedTracerProvider,
enableExtendedTracing: true,
};

const setupResult = await setup(observabilityOptions);
const spanner = setupResult.spanner;
const server = setupResult.server;
const spannerMock = setupResult.spannerMock;

after(async () => {
globalTraceExporter.reset();
injectedTraceExporter.reset();
await globalTracerProvider.shutdown();
await injectedTracerProvider.shutdown();
spannerMock.resetRequests();
spanner.close();
server.tryShutdown(() => {});
});

it('Passed into Spanner, Instance and Database', done => {
// Ensure that the same observability configuration is set on the Spanner client.
assert.deepStrictEqual(spanner._observabilityOptions, observabilityOptions);

// Acquire a handle to the Instance through spanner.instance.
const instanceByHandle = spanner.instance('instance');
assert.deepStrictEqual(
instanceByHandle._observabilityOptions,
observabilityOptions
);

// Create the Instance by means of a constructor directly.
const instanceByConstructor = new Instance(spanner, 'myInstance');
assert.deepStrictEqual(
instanceByConstructor._observabilityOptions,
observabilityOptions
);

// Acquire a handle to the Database through instance.database.
const databaseByHandle = instanceByHandle.database('database');
assert.deepStrictEqual(
databaseByHandle._observabilityOptions,
observabilityOptions
);

// Create the Database by means of a constructor directly.
const databaseByConstructor = new Database(
instanceByConstructor,
'myDatabase'
);
assert.deepStrictEqual(
databaseByConstructor._observabilityOptions,
observabilityOptions
);

done();
});

it('Propagates spans to the injected not global TracerProvider', done => {
const instance = spanner.instance('instance');
const database = instance.database('database');

database.run('SELECT 1', (err, rows) => {
assert.ifError(err);

injectedTraceExporter.forceFlush();
globalTraceExporter.forceFlush();
const spansFromInjected = injectedTraceExporter.getFinishedSpans();
const spansFromGlobal = globalTraceExporter.getFinishedSpans();

assert.strictEqual(
spansFromGlobal.length,
0,
'Expecting no spans from the global exporter'
);
assert.strictEqual(
spansFromInjected.length > 0,
true,
'Expecting spans from the injected exporter'
);

spansFromInjected.sort((spanA, spanB) => {
spanA.startTime < spanB.startTime;
});
const actualSpanNames: string[] = [];
const actualEventNames: string[] = [];
spansFromInjected.forEach(span => {
actualSpanNames.push(span.name);
span.events.forEach(event => {
actualEventNames.push(event.name);
});
});

const expectedSpanNames = [
'CloudSpanner.Database.runStream',
'CloudSpanner.Database.run',
];
assert.deepStrictEqual(
actualSpanNames,
expectedSpanNames,
`span names mismatch:\n\tGot: ${actualSpanNames}\n\tWant: ${expectedSpanNames}`
);

const expectedEventNames = [
'Acquiring session',
'Waiting for a session to become available',
'Acquired session',
'Using Session',
];
assert.deepStrictEqual(
actualEventNames,
expectedEventNames,
`Unexpected events:\n\tGot: ${actualEventNames}\n\tWant: ${expectedEventNames}`
);

done();
});
});
});
2 changes: 1 addition & 1 deletion observability-test/table.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ describe('Table', () => {
extend(Table, TableCached);
table = new Table(DATABASE, NAME);
transaction = new FakeTransaction();
table.observabilityOptions = {tracerProvider: provider};
table._observabilityOptions = {tracerProvider: provider};
});

afterEach(() => {
Expand Down
6 changes: 3 additions & 3 deletions src/batch-transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ class BatchTransaction extends Snapshot {

const traceConfig: traceConfig = {
sql: query,
opts: this.observabilityOptions,
opts: this._observabilityOptions,
};
return startTrace(
'BatchTransaction.createQueryPartitions',
Expand Down Expand Up @@ -182,7 +182,7 @@ class BatchTransaction extends Snapshot {
*/
createPartitions_(config, callback) {
const traceConfig: traceConfig = {
opts: this.observabilityOptions,
opts: this._observabilityOptions,
};

return startTrace(
Expand Down Expand Up @@ -259,7 +259,7 @@ class BatchTransaction extends Snapshot {
*/
createReadPartitions(options, callback) {
const traceConfig: traceConfig = {
opts: this.observabilityOptions,
opts: this._observabilityOptions,
};

return startTrace(
Expand Down
26 changes: 14 additions & 12 deletions src/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ class Database extends common.GrpcServiceObject {
databaseDialect?: EnumKey<
typeof databaseAdmin.spanner.admin.database.v1.DatabaseDialect
> | null;
observabilityConfig: ObservabilityOptions | undefined;
_observabilityOptions?: ObservabilityOptions;
constructor(
instance: Instance,
name: string,
Expand Down Expand Up @@ -467,7 +467,7 @@ class Database extends common.GrpcServiceObject {
Object.assign({}, queryOptions),
Database.getEnvironmentQueryOptions()
);
this.observabilityConfig = instance.observabilityConfig;
this._observabilityOptions = instance._observabilityOptions;
}
/**
* @typedef {array} SetDatabaseMetadataResponse
Expand Down Expand Up @@ -693,7 +693,7 @@ class Database extends common.GrpcServiceObject {

const sessions = (resp!.session || []).map(metadata => {
const session = this.session(metadata.name!);
session.observabilityConfig = this.observabilityConfig;
session._observabilityOptions = this._observabilityOptions;
session.metadata = metadata;
return session;
});
Expand Down Expand Up @@ -738,6 +738,7 @@ class Database extends common.GrpcServiceObject {
const id = identifier.transaction;
const transaction = new BatchTransaction(session, options);
transaction.id = id;
transaction._observabilityOptions = this._observabilityOptions;
transaction.readTimestamp = identifier.timestamp as PreciseDate;
return transaction;
}
Expand Down Expand Up @@ -827,7 +828,7 @@ class Database extends common.GrpcServiceObject {
? (optionsOrCallback as TimestampBounds)
: {};

const q = {opts: this.observabilityConfig};
const q = {opts: this._observabilityOptions};
return startTrace('Database.createBatchTransaction', q, span => {
this.pool_.getSession((err, session) => {
if (err) {
Expand Down Expand Up @@ -1085,6 +1086,7 @@ class Database extends common.GrpcServiceObject {
/CREATE TABLE `*([^\s`(]+)/
)![1];
const table = this.table(tableName!);
table._observabilityOptions = this._observabilityOptions;
callback!(null, table, operation!, resp!);
});
}
Expand Down Expand Up @@ -1873,7 +1875,7 @@ class Database extends common.GrpcServiceObject {
delete (gaxOpts as GetSessionsOptions).pageToken;
}

const q = {opts: this.observabilityConfig};
const q = {opts: this._observabilityOptions};
return startTrace('Database.getSessions', q, span => {
this.request<
google.spanner.v1.ISession,
Expand All @@ -1895,7 +1897,7 @@ class Database extends common.GrpcServiceObject {
sessionInstances = sessions.map(metadata => {
const session = self.session(metadata.name!);
session.metadata = metadata;
session.observabilityConfig = this.observabilityConfig;
session._observabilityOptions = this._observabilityOptions;
return session;
});
}
Expand Down Expand Up @@ -2056,7 +2058,7 @@ class Database extends common.GrpcServiceObject {
? (optionsOrCallback as TimestampBounds)
: {};

const q = {opts: this.observabilityConfig};
const q = {opts: this._observabilityOptions};
return startTrace('Database.getSnapshot', q, span => {
this.pool_.getSession((err, session) => {
if (err) {
Expand Down Expand Up @@ -2157,7 +2159,7 @@ class Database extends common.GrpcServiceObject {
? (optionsOrCallback as GetTransactionOptions)
: {};

const q = {opts: this.observabilityConfig};
const q = {opts: this._observabilityOptions};
return startTrace('Database.getTransaction', q, span => {
this.pool_.getSession((err, session, transaction) => {
if (options.requestOptions) {
Expand Down Expand Up @@ -2784,7 +2786,7 @@ class Database extends common.GrpcServiceObject {
? (optionsOrCallback as TimestampBounds)
: {};

const q = {sql: query, opts: this.observabilityConfig};
const q = {sql: query, opts: this._observabilityOptions};
return startTrace('Database.run', q, span => {
this.runStream(query, options)
.on('error', err => {
Expand Down Expand Up @@ -3005,7 +3007,7 @@ class Database extends common.GrpcServiceObject {
options?: TimestampBounds
): PartialResultStream {
const proxyStream: Transform = through.obj();
const q = {sql: query, opts: this.observabilityConfig};
const q = {sql: query, opts: this._observabilityOptions};
return startTrace('Database.runStream', q, span => {
this.pool_.getSession((err, session) => {
if (err) {
Expand Down Expand Up @@ -3183,7 +3185,7 @@ class Database extends common.GrpcServiceObject {
? (optionsOrRunFn as RunTransactionOptions)
: {};

const q = {opts: this.observabilityConfig};
const q = {opts: this._observabilityOptions};
startTrace('Database.runTransaction', q, span => {
this.pool_.getSession((err, session?, transaction?) => {
if (err) {
Expand Down Expand Up @@ -3576,7 +3578,7 @@ class Database extends common.GrpcServiceObject {
? (optionsOrCallback as CallOptions)
: {};

const q = {opts: this.observabilityConfig};
const q = {opts: this._observabilityOptions};
return startTrace('Database.writeAtLeastOnce', q, span => {
this.pool_.getSession((err, session?, transaction?) => {
if (err && isSessionNotFoundError(err as grpc.ServiceError)) {
Expand Down
Loading
Loading