diff --git a/plugins/node/opentelemetry-instrumentation-pg/src/instrumentation.ts b/plugins/node/opentelemetry-instrumentation-pg/src/instrumentation.ts index 7260e4fb2c..c5bbad195e 100644 --- a/plugins/node/opentelemetry-instrumentation-pg/src/instrumentation.ts +++ b/plugins/node/opentelemetry-instrumentation-pg/src/instrumentation.ts @@ -30,7 +30,9 @@ import { import * as pgTypes from 'pg'; import * as pgPoolTypes from 'pg-pool'; import { + PgClientConnect, PgClientExtended, + PgErrorCallback, NormalizedQueryConfig, PostgresCallback, PgPoolExtended, @@ -46,6 +48,7 @@ import { import { VERSION } from './version'; const PG_POOL_COMPONENT = 'pg-pool'; + export class PgInstrumentation extends InstrumentationBase { static readonly COMPONENT = 'pg'; @@ -67,11 +70,23 @@ export class PgInstrumentation extends InstrumentationBase { if (isWrapped(moduleExports.Client.prototype.query)) { this._unwrap(moduleExports.Client.prototype, 'query'); } + + if (isWrapped(moduleExports.Client.prototype.connect)) { + this._unwrap(moduleExports.Client.prototype, 'connect'); + } + this._wrap( moduleExports.Client.prototype, 'query', - this._getClientQueryPatch() as never + this._getClientQueryPatch() as any + ); + + this._wrap( + moduleExports.Client.prototype, + 'connect', + this._getClientConnectPatch() as any ); + return moduleExports; }, moduleExports => { @@ -93,7 +108,7 @@ export class PgInstrumentation extends InstrumentationBase { this._wrap( moduleExports.prototype, 'connect', - this._getPoolConnectPatch() as never + this._getPoolConnectPatch() as any ); return moduleExports; }, @@ -107,6 +122,49 @@ export class PgInstrumentation extends InstrumentationBase { return [modulePG, modulePGPool]; } + private _getClientConnectPatch() { + const plugin = this; + return (original: PgClientConnect) => { + return function connect( + this: pgTypes.Client, + callback?: PgErrorCallback + ) { + const span = plugin.tracer.startSpan( + `${PgInstrumentation.COMPONENT}.connect`, + { + kind: SpanKind.CLIENT, + attributes: { + [SemanticAttributes.DB_SYSTEM]: DbSystemValues.POSTGRESQL, + [SemanticAttributes.DB_NAME]: this.database, + [SemanticAttributes.NET_PEER_NAME]: this.host, + [SemanticAttributes.DB_CONNECTION_STRING]: + utils.getConnectionString(this), + [SemanticAttributes.NET_PEER_PORT]: this.port, + [SemanticAttributes.DB_USER]: this.user, + }, + } + ); + + if (callback) { + const parentSpan = trace.getSpan(context.active()); + callback = utils.patchClientConnectCallback(span, callback); + if (parentSpan) { + callback = context.bind(context.active(), callback); + } + } + + const connectResult: unknown = context.with( + trace.setSpan(context.active(), span), + () => { + return original.call(this, callback); + } + ); + + return handleConnectResult(span, connectResult); + }; + }; + } + private _getClientQueryPatch() { const plugin = this; return (original: typeof pgTypes.Client.prototype.query) => { @@ -186,7 +244,7 @@ export class PgInstrumentation extends InstrumentationBase { } // Perform the original query - const result: unknown = original.apply(this, args as never); + const result: unknown = original.apply(this, args as any); // Bind promise to parent span and end the span if (result instanceof Promise) { @@ -225,7 +283,7 @@ export class PgInstrumentation extends InstrumentationBase { const plugin = this; return (originalConnect: typeof pgPoolTypes.prototype.connect) => { return function connect(this: PgPoolExtended, callback?: PgPoolCallback) { - const jdbcString = utils.getJDBCString(this.options); + const connString = utils.getConnectionString(this.options); // setup span const span = plugin.tracer.startSpan(`${PG_POOL_COMPONENT}.connect`, { kind: SpanKind.CLIENT, @@ -233,7 +291,7 @@ export class PgInstrumentation extends InstrumentationBase { [SemanticAttributes.DB_SYSTEM]: DbSystemValues.POSTGRESQL, [SemanticAttributes.DB_NAME]: this.options.database, // required [SemanticAttributes.NET_PEER_NAME]: this.options.host, // required - [SemanticAttributes.DB_CONNECTION_STRING]: jdbcString, // required + [SemanticAttributes.DB_CONNECTION_STRING]: connString, // required [SemanticAttributes.NET_PEER_PORT]: this.options.port, [SemanticAttributes.DB_USER]: this.options.user, [AttributeNames.IDLE_TIMEOUT_MILLIS]: @@ -254,40 +312,39 @@ export class PgInstrumentation extends InstrumentationBase { } } - const connectResult: unknown = originalConnect.call( - this, - callback as never + const connectResult: unknown = context.with( + trace.setSpan(context.active(), span), + () => { + return originalConnect.call(this, callback as any); + } ); - // No callback was provided, return a promise instead - if (connectResult instanceof Promise) { - const connectResultPromise = connectResult as Promise; - return context.bind( - context.active(), - connectResultPromise - .then(result => { - // Return a pass-along promise which ends the span and then goes to user's orig resolvers - return new Promise(resolve => { - span.end(); - resolve(result); - }); - }) - .catch((error: Error) => { - return new Promise((_, reject) => { - span.setStatus({ - code: SpanStatusCode.ERROR, - message: error.message, - }); - span.end(); - reject(error); - }); - }) - ); - } - - // Else a callback was provided, so just return the result - return connectResult; + return handleConnectResult(span, connectResult); }; }; } } + +function handleConnectResult(span: Span, connectResult: unknown) { + if (!(connectResult instanceof Promise)) { + return connectResult; + } + + const connectResultPromise = connectResult as Promise; + return context.bind( + context.active(), + connectResultPromise + .then(result => { + span.end(); + return result; + }) + .catch((error: Error) => { + span.setStatus({ + code: SpanStatusCode.ERROR, + message: error.message, + }); + span.end(); + return Promise.reject(error); + }) + ); +} diff --git a/plugins/node/opentelemetry-instrumentation-pg/src/types.ts b/plugins/node/opentelemetry-instrumentation-pg/src/types.ts index 8728dc1028..19cdef646a 100644 --- a/plugins/node/opentelemetry-instrumentation-pg/src/types.ts +++ b/plugins/node/opentelemetry-instrumentation-pg/src/types.ts @@ -47,10 +47,10 @@ export type PostgresCallback = (err: Error, res: object) => unknown; // These are not included in @types/pg, so manually define them. // https://github.com/brianc/node-postgres/blob/fde5ec586e49258dfc4a2fcd861fcdecb4794fc3/lib/client.js#L25 export interface PgClientConnectionParams { - database: string; - host: string; - port: number; - user: string; + database?: string; + host?: string; + port?: number; + user?: string; } export interface PgClientExtended extends pgTypes.Client { @@ -69,6 +69,8 @@ export type PgPoolCallback = ( done: (release?: any) => void ) => void; +export type PgErrorCallback = (err: Error) => void; + export interface PgPoolOptionsParams { database: string; host: string; @@ -81,3 +83,7 @@ export interface PgPoolOptionsParams { export interface PgPoolExtended extends pgPoolTypes { options: PgPoolOptionsParams; } + +export type PgClientConnect = ( + callback?: (err: Error) => void +) => Promise | void; diff --git a/plugins/node/opentelemetry-instrumentation-pg/src/utils.ts b/plugins/node/opentelemetry-instrumentation-pg/src/utils.ts index 87efaf6400..86b060dd26 100644 --- a/plugins/node/opentelemetry-instrumentation-pg/src/utils.ts +++ b/plugins/node/opentelemetry-instrumentation-pg/src/utils.ts @@ -31,6 +31,7 @@ import { NormalizedQueryConfig, PostgresCallback, PgClientConnectionParams, + PgErrorCallback, PgPoolCallback, PgPoolExtended, PgInstrumentationConfig, @@ -50,16 +51,16 @@ function getCommandFromText(text?: string): string { return words[0].length > 0 ? words[0] : 'unknown'; } -export function getJDBCString(params: PgClientConnectionParams) { - const host = params.host || 'localhost'; // postgres defaults to localhost - const port = params.port || 5432; // postgres defaults to port 5432 +export function getConnectionString(params: PgClientConnectionParams) { + const host = params.host || 'localhost'; + const port = params.port || 5432; const database = params.database || ''; - return `jdbc:postgresql://${host}:${port}/${database}`; + return `postgresql://${host}:${port}/${database}`; } // Private helper function to start a span function pgStartSpan(tracer: Tracer, client: PgClientExtended, name: string) { - const jdbcString = getJDBCString(client.connectionParameters); + const jdbcString = getConnectionString(client.connectionParameters); return tracer.startSpan(name, { kind: SpanKind.CLIENT, attributes: { @@ -236,3 +237,22 @@ export function patchCallbackPGPool( cb.call(this, err, res, done); }; } + +export function patchClientConnectCallback( + span: Span, + cb: PgErrorCallback +): PgErrorCallback { + return function patchedClientConnectCallback( + this: pgTypes.Client, + err: Error + ) { + if (err) { + span.setStatus({ + code: SpanStatusCode.ERROR, + message: err.message, + }); + } + span.end(); + cb.call(this, err); + }; +} diff --git a/plugins/node/opentelemetry-instrumentation-pg/test/pg-pool.test.ts b/plugins/node/opentelemetry-instrumentation-pg/test/pg-pool.test.ts index a39d20887a..7cb29e87d6 100644 --- a/plugins/node/opentelemetry-instrumentation-pg/test/pg-pool.test.ts +++ b/plugins/node/opentelemetry-instrumentation-pg/test/pg-pool.test.ts @@ -67,7 +67,7 @@ const DEFAULT_PGPOOL_ATTRIBUTES = { [SemanticAttributes.DB_SYSTEM]: DbSystemValues.POSTGRESQL, [SemanticAttributes.DB_NAME]: CONFIG.database, [SemanticAttributes.NET_PEER_NAME]: CONFIG.host, - [SemanticAttributes.DB_CONNECTION_STRING]: `jdbc:postgresql://${CONFIG.host}:${CONFIG.port}/${CONFIG.database}`, + [SemanticAttributes.DB_CONNECTION_STRING]: `postgresql://${CONFIG.host}:${CONFIG.port}/${CONFIG.database}`, [SemanticAttributes.NET_PEER_PORT]: CONFIG.port, [SemanticAttributes.DB_USER]: CONFIG.user, [AttributeNames.MAX_CLIENT]: CONFIG.maxClient, @@ -78,7 +78,7 @@ const DEFAULT_PG_ATTRIBUTES = { [SemanticAttributes.DB_SYSTEM]: DbSystemValues.POSTGRESQL, [SemanticAttributes.DB_NAME]: CONFIG.database, [SemanticAttributes.NET_PEER_NAME]: CONFIG.host, - [SemanticAttributes.DB_CONNECTION_STRING]: `jdbc:postgresql://${CONFIG.host}:${CONFIG.port}/${CONFIG.database}`, + [SemanticAttributes.DB_CONNECTION_STRING]: `postgresql://${CONFIG.host}:${CONFIG.port}/${CONFIG.database}`, [SemanticAttributes.NET_PEER_PORT]: CONFIG.port, [SemanticAttributes.DB_USER]: CONFIG.user, }; @@ -197,11 +197,19 @@ describe('pg-pool', () => { const span = provider.getTracer('test-pg-pool').startSpan('test span'); await context.with(trace.setSpan(context.active(), span), async () => { const client = await pool.connect(); - runCallbackTest(span, pgPoolattributes, events, unsetStatus, 1, 0); + runCallbackTest(span, pgPoolattributes, events, unsetStatus, 2, 1); + + const [connectSpan, poolConnectSpan] = + memoryExporter.getFinishedSpans(); + assert.strictEqual( + connectSpan.parentSpanId, + poolConnectSpan.spanContext().spanId + ); + assert.ok(client, 'pool.connect() returns a promise'); try { await client.query('SELECT NOW()'); - runCallbackTest(span, pgAttributes, events, unsetStatus, 2, 1); + runCallbackTest(span, pgAttributes, events, unsetStatus, 3, 2); } finally { client.release(); } diff --git a/plugins/node/opentelemetry-instrumentation-pg/test/pg.test.ts b/plugins/node/opentelemetry-instrumentation-pg/test/pg.test.ts index 2c26cf4c8f..d966eda8d8 100644 --- a/plugins/node/opentelemetry-instrumentation-pg/test/pg.test.ts +++ b/plugins/node/opentelemetry-instrumentation-pg/test/pg.test.ts @@ -64,7 +64,7 @@ const DEFAULT_ATTRIBUTES = { [SemanticAttributes.DB_SYSTEM]: DbSystemValues.POSTGRESQL, [SemanticAttributes.DB_NAME]: CONFIG.database, [SemanticAttributes.NET_PEER_NAME]: CONFIG.host, - [SemanticAttributes.DB_CONNECTION_STRING]: `jdbc:postgresql://${CONFIG.host}:${CONFIG.port}/${CONFIG.database}`, + [SemanticAttributes.DB_CONNECTION_STRING]: `postgresql://${CONFIG.host}:${CONFIG.port}/${CONFIG.database}`, [SemanticAttributes.NET_PEER_PORT]: CONFIG.port, [SemanticAttributes.DB_USER]: CONFIG.user, }; @@ -99,6 +99,7 @@ describe('pg', () => { instrumentation.enable(); } + let postgres: typeof pg; let client: pg.Client; let instrumentation: PgInstrumentation; let contextManager: AsyncHooksContextManager; @@ -140,8 +141,8 @@ describe('pg', () => { context.setGlobalContextManager(contextManager); instrumentation.setTracerProvider(provider); - const pg = require('pg'); - client = new pg.Client(CONFIG); + postgres = require('pg'); + client = new postgres.Client(CONFIG); await client.connect(); }); @@ -205,6 +206,81 @@ describe('pg', () => { ); }); + describe('#client.connect(...)', () => { + let connClient: pg.Client; + + beforeEach(() => { + connClient = new postgres.Client(CONFIG); + }); + + afterEach(async () => { + await connClient.end(); + }); + + it('should not return a promise when callback is provided', done => { + const res = connClient.connect(err => { + assert.strictEqual(err, null); + done(); + }); + assert.strictEqual(res, undefined, 'No promise is returned'); + }); + + it('should return a promise if callback is not provided', done => { + const resPromise = connClient.connect(); + resPromise + .then(res => { + assert.equal(res, undefined); + assert.deepStrictEqual( + memoryExporter.getFinishedSpans()[0].name, + 'pg.connect' + ); + done(); + }) + .catch((err: Error) => { + assert.ok(false, err.message); + }); + }); + + it('should throw on failure', done => { + connClient = new postgres.Client({ ...CONFIG, port: 59999 }); + connClient + .connect() + .then(() => assert.fail('expected connect to throw')) + .catch(err => { + assert(err instanceof Error); + done(); + }); + }); + + it('should call back with an error', done => { + connClient = new postgres.Client({ ...CONFIG, port: 59999 }); + connClient.connect(err => { + assert(err instanceof Error); + done(); + }); + }); + + it('should intercept connect', async () => { + const span = tracer.startSpan('test span'); + context.with(trace.setSpan(context.active(), span), async () => { + await connClient.connect(); + const spans = memoryExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 1); + const connectSpan = spans[0]; + assert.deepStrictEqual(connectSpan.name, 'pg.connect'); + testUtils.assertSpan( + connectSpan, + SpanKind.CLIENT, + DEFAULT_ATTRIBUTES, + [], + { code: SpanStatusCode.UNSET } + ); + + testUtils.assertPropagation(connectSpan, span); + }); + }); + }); + describe('#client.query(...)', () => { it('should not return a promise if callback is provided', done => { const res = client.query('SELECT NOW()', (err, res) => {