Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pg): add requireParentSpan option #1199

Merged
merged 11 commits into from
Oct 4, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,7 @@ import {
InstrumentationNodeModuleDefinition,
} from '@opentelemetry/instrumentation';

import {
context,
diag,
trace,
Span,
SpanKind,
SpanStatusCode,
} from '@opentelemetry/api';
import { context, diag, trace, Span, SpanStatusCode } from '@opentelemetry/api';
import * as pgTypes from 'pg';
import * as pgPoolTypes from 'pg-pool';
import {
Expand All @@ -46,6 +39,7 @@ import {
DbSystemValues,
} from '@opentelemetry/semantic-conventions';
import { VERSION } from './version';
import { startSpan } from './utils';

const PG_POOL_COMPONENT = 'pg-pool';

Expand Down Expand Up @@ -122,26 +116,33 @@ export class PgInstrumentation extends InstrumentationBase {
return [modulePG, modulePGPool];
}

override setConfig(config: PgInstrumentationConfig = {}) {
this._config = Object.assign({}, config);
}

override getConfig(): PgInstrumentationConfig {
return this._config as PgInstrumentationConfig;
}

private _getClientConnectPatch() {
const plugin = this;
return (original: PgClientConnect) => {
return function connect(
this: pgTypes.Client,
callback?: PgErrorCallback
) {
const span = plugin.tracer.startSpan(
const span = startSpan(
plugin.tracer,
plugin.getConfig(),
`${PgInstrumentation.COMPONENT}.connect`,
{
kind: SpanKind.CLIENT,
attributes: {
[SemanticAttributes.DB_SYSTEM]: DbSystemValues.POSTGRESQL,
[SemanticAttributes.DB_NAME]: this.database,
[SemanticAttributes.NET_PEER_NAME]: this.host,
[SemanticAttributes.DB_CONNECTION_STRING]:
utils.getConnectionString(this),
[SemanticAttributes.NET_PEER_PORT]: this.port,
[SemanticAttributes.DB_USER]: this.user,
},
[SemanticAttributes.DB_SYSTEM]: DbSystemValues.POSTGRESQL,
[SemanticAttributes.DB_NAME]: this.database,
[SemanticAttributes.NET_PEER_NAME]: this.host,
[SemanticAttributes.DB_CONNECTION_STRING]:
utils.getConnectionString(this),
[SemanticAttributes.NET_PEER_PORT]: this.port,
[SemanticAttributes.DB_USER]: this.user,
}
);

Expand Down Expand Up @@ -182,25 +183,31 @@ export class PgInstrumentation extends InstrumentationBase {
span = utils.handleParameterizedQuery.call(
this,
plugin.tracer,
plugin.getConfig() as PgInstrumentationConfig,
plugin.getConfig(),
query,
params
);
} else {
span = utils.handleTextQuery.call(this, plugin.tracer, query);
span = utils.handleTextQuery.call(
this,
plugin.tracer,
plugin.getConfig(),
query
);
}
} else if (typeof args[0] === 'object') {
const queryConfig = args[0] as NormalizedQueryConfig;
span = utils.handleConfigQuery.call(
this,
plugin.tracer,
plugin.getConfig() as PgInstrumentationConfig,
plugin.getConfig(),
queryConfig
);
} else {
return utils.handleInvalidQuery.call(
this,
plugin.tracer,
plugin.getConfig(),
original,
...args
);
Expand All @@ -212,7 +219,7 @@ export class PgInstrumentation extends InstrumentationBase {
if (typeof args[args.length - 1] === 'function') {
// Patch ParameterQuery callback
args[args.length - 1] = utils.patchCallback(
plugin.getConfig() as PgInstrumentationConfig,
plugin.getConfig(),
span,
args[args.length - 1] as PostgresCallback
);
Expand All @@ -228,7 +235,7 @@ export class PgInstrumentation extends InstrumentationBase {
) {
// Patch ConfigQuery callback
let callback = utils.patchCallback(
plugin.getConfig() as PgInstrumentationConfig,
plugin.getConfig(),
span,
(args[0] as NormalizedQueryConfig).callback!
);
Expand All @@ -252,11 +259,7 @@ export class PgInstrumentation extends InstrumentationBase {
.then((result: unknown) => {
// Return a pass-along promise which ends the span and then goes to user's orig resolvers
return new Promise(resolve => {
utils.handleExecutionResult(
plugin.getConfig() as PgInstrumentationConfig,
span,
result
);
utils.handleExecutionResult(plugin.getConfig(), span, result);
span.end();
resolve(result);
});
Expand Down Expand Up @@ -285,9 +288,11 @@ export class PgInstrumentation extends InstrumentationBase {
return function connect(this: PgPoolExtended, callback?: PgPoolCallback) {
const connString = utils.getConnectionString(this.options);
// setup span
const span = plugin.tracer.startSpan(`${PG_POOL_COMPONENT}.connect`, {
kind: SpanKind.CLIENT,
attributes: {
const span = startSpan(
plugin.tracer,
plugin.getConfig(),
`${PG_POOL_COMPONENT}.connect`,
{
[SemanticAttributes.DB_SYSTEM]: DbSystemValues.POSTGRESQL,
[SemanticAttributes.DB_NAME]: this.options.database, // required
[SemanticAttributes.NET_PEER_NAME]: this.options.host, // required
Expand All @@ -297,8 +302,8 @@ export class PgInstrumentation extends InstrumentationBase {
[AttributeNames.IDLE_TIMEOUT_MILLIS]:
this.options.idleTimeoutMillis,
[AttributeNames.MAX_CLIENT]: this.options.maxClient,
},
});
}
);

if (callback) {
const parentSpan = trace.getSpan(context.active());
Expand Down
7 changes: 7 additions & 0 deletions plugins/node/opentelemetry-instrumentation-pg/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ export interface PgInstrumentationConfig extends InstrumentationConfig {
* @default undefined
*/
responseHook?: PgInstrumentationExecutionResponseHook;

/**
* If true, requires a parent span to create new spans.
*
* @default false
*/
requireParentSpan?: boolean;
}

export type PostgresCallback = (err: Error, res: object) => unknown;
Expand Down
63 changes: 48 additions & 15 deletions plugins/node/opentelemetry-instrumentation-pg/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@
*/

import {
context,
trace,
Span,
SpanStatusCode,
Tracer,
SpanKind,
diag,
INVALID_SPAN_CONTEXT,
Attributes,
} from '@opentelemetry/api';
import { AttributeNames } from './enums/AttributeNames';
import {
Expand Down Expand Up @@ -58,19 +62,41 @@ export function getConnectionString(params: PgClientConnectionParams) {
return `postgresql://${host}:${port}/${database}`;
}

// Private helper function to start a span
function pgStartSpan(tracer: Tracer, client: PgClientExtended, name: string) {
const jdbcString = getConnectionString(client.connectionParameters);
export function startSpan(
tracer: Tracer,
instrumentationConfig: PgInstrumentationConfig,
name: string,
attributes: Attributes
): Span {
// If a parent span is required but not present, use a noop span to propagate
// context without recording it. Adapted from opentelemetry-instrumentation-http:
// https://github.com/open-telemetry/opentelemetry-js/blob/597ea98e58a4f68bcd9aec5fd283852efe444cd6/experimental/packages/opentelemetry-instrumentation-http/src/http.ts#L660
const currentSpan = trace.getSpan(context.active());
if (instrumentationConfig.requireParentSpan && currentSpan === undefined) {
return trace.wrapSpanContext(INVALID_SPAN_CONTEXT);
}

return tracer.startSpan(name, {
kind: SpanKind.CLIENT,
attributes: {
[SemanticAttributes.DB_NAME]: client.connectionParameters.database, // required
[SemanticAttributes.DB_SYSTEM]: DbSystemValues.POSTGRESQL, // required
[SemanticAttributes.DB_CONNECTION_STRING]: jdbcString, // required
[SemanticAttributes.NET_PEER_NAME]: client.connectionParameters.host, // required
[SemanticAttributes.NET_PEER_PORT]: client.connectionParameters.port,
[SemanticAttributes.DB_USER]: client.connectionParameters.user,
},
attributes,
});
}

// Private helper function to start a span after a connection has been established
function startQuerySpan(
client: PgClientExtended,
tracer: Tracer,
instrumentationConfig: PgInstrumentationConfig,
name: string
) {
const jdbcString = getConnectionString(client.connectionParameters);
return startSpan(tracer, instrumentationConfig, name, {
[SemanticAttributes.DB_NAME]: client.connectionParameters.database, // required
[SemanticAttributes.DB_SYSTEM]: DbSystemValues.POSTGRESQL, // required
[SemanticAttributes.DB_CONNECTION_STRING]: jdbcString, // required
[SemanticAttributes.NET_PEER_NAME]: client.connectionParameters.host, // required
[SemanticAttributes.NET_PEER_PORT]: client.connectionParameters.port,
[SemanticAttributes.DB_USER]: client.connectionParameters.user,
});
}

Expand All @@ -84,7 +110,7 @@ export function handleConfigQuery(
// Set child span name
const queryCommand = getCommandFromText(queryConfig.name || queryConfig.text);
const name = PgInstrumentation.BASE_SPAN_NAME + ':' + queryCommand;
const span = pgStartSpan(tracer, this, name);
const span = startQuerySpan(this, tracer, instrumentationConfig, name);

// Set attributes
if (queryConfig.text) {
Expand Down Expand Up @@ -118,7 +144,7 @@ export function handleParameterizedQuery(
// Set child span name
const queryCommand = getCommandFromText(query);
const name = PgInstrumentation.BASE_SPAN_NAME + ':' + queryCommand;
const span = pgStartSpan(tracer, this, name);
const span = startQuerySpan(this, tracer, instrumentationConfig, name);

// Set attributes
span.setAttribute(SemanticAttributes.DB_STATEMENT, query);
Expand All @@ -133,12 +159,13 @@ export function handleParameterizedQuery(
export function handleTextQuery(
this: PgClientExtended,
tracer: Tracer,
instrumentationConfig: PgInstrumentationConfig,
query: string
) {
// Set child span name
const queryCommand = getCommandFromText(query);
const name = PgInstrumentation.BASE_SPAN_NAME + ':' + queryCommand;
const span = pgStartSpan(tracer, this, name);
const span = startQuerySpan(this, tracer, instrumentationConfig, name);

// Set attributes
span.setAttribute(SemanticAttributes.DB_STATEMENT, query);
Expand All @@ -153,11 +180,17 @@ export function handleTextQuery(
export function handleInvalidQuery(
this: PgClientExtended,
tracer: Tracer,
instrumentationConfig: PgInstrumentationConfig,
originalQuery: typeof pgTypes.Client.prototype.query,
...args: unknown[]
) {
let result;
const span = pgStartSpan(tracer, this, PgInstrumentation.BASE_SPAN_NAME);
const span = startQuerySpan(
this,
tracer,
instrumentationConfig,
PgInstrumentation.BASE_SPAN_NAME
);
try {
result = originalQuery.apply(this, args as never);
} catch (e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ describe('pg-pool', () => {
};

beforeEach(async () => {
const config: PgInstrumentationConfig = {
create({
enhancedDatabaseReporting: true,
responseHook: (
span: Span,
Expand All @@ -350,9 +350,7 @@ describe('pg-pool', () => {
dataAttributeName,
JSON.stringify({ rowCount: responseInfo?.data.rowCount })
),
};

create(config);
});
});

it('should attach response hook data to resulting spans for query with callback ', done => {
Expand Down
29 changes: 26 additions & 3 deletions plugins/node/opentelemetry-instrumentation-pg/test/pg.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,16 @@ describe('pg', () => {
testUtils.assertPropagation(connectSpan, span);
});
});

it('should not generate traces when requireParentSpan=true is specified', async () => {
instrumentation.setConfig({
requireParentSpan: true,
});
memoryExporter.reset();
await connClient.connect();
const spans = memoryExporter.getFinishedSpans();
assert.strictEqual(spans.length, 0);
});
});

describe('#client.query(...)', () => {
Expand Down Expand Up @@ -482,7 +492,7 @@ describe('pg', () => {
[dataAttributeName]: '{"rowCount":1}',
};
beforeEach(async () => {
const config: PgInstrumentationConfig = {
create({
enhancedDatabaseReporting: true,
responseHook: (
span: Span,
Expand All @@ -492,8 +502,7 @@ describe('pg', () => {
dataAttributeName,
JSON.stringify({ rowCount: responseInfo?.data.rowCount })
),
};
create(config);
});
});

it('should attach response hook data to resulting spans for query with callback ', done => {
Expand Down Expand Up @@ -639,5 +648,19 @@ describe('pg', () => {
client.query('SELECT NOW()').then(queryHandler);
});
});

it('should not generate traces for client.query() when requireParentSpan=true is specified', done => {
instrumentation.setConfig({
requireParentSpan: true,
});
memoryExporter.reset();
client.query('SELECT NOW()', (err, res) => {
assert.strictEqual(err, null);
assert.ok(res);
const spans = memoryExporter.getFinishedSpans();
assert.strictEqual(spans.length, 0);
done();
});
});
});
});
Loading