Skip to content

Commit

Permalink
feat(instrumentation-pg): implementation of metric operation duration (
Browse files Browse the repository at this point in the history
…#2380)

Co-authored-by: Marc Pichler <[email protected]>
  • Loading branch information
maryliag and pichlermarc authored Sep 25, 2024
1 parent 0d6ebde commit 050fee0
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@ import {
InstrumentationNodeModuleDefinition,
safeExecuteInTheMiddle,
} from '@opentelemetry/instrumentation';

import {
context,
trace,
Span,
SpanStatusCode,
SpanKind,
Histogram,
ValueType,
Attributes,
HrTime,
UpDownCounter,
} from '@opentelemetry/api';
import type * as pgTypes from 'pg';
Expand All @@ -42,12 +45,27 @@ import * as utils from './utils';
import { addSqlCommenterComment } from '@opentelemetry/sql-common';
import { PACKAGE_NAME, PACKAGE_VERSION } from './version';
import { SpanNames } from './enums/SpanNames';
import {
hrTime,
hrTimeDuration,
hrTimeToMilliseconds,
} from '@opentelemetry/core';
import {
DBSYSTEMVALUES_POSTGRESQL,
SEMATTRS_DB_SYSTEM,
ATTR_ERROR_TYPE,
ATTR_SERVER_PORT,
ATTR_SERVER_ADDRESS,
} from '@opentelemetry/semantic-conventions';
import {
METRIC_DB_CLIENT_CONNECTION_COUNT,
METRIC_DB_CLIENT_CONNECTION_PENDING_REQUESTS,
METRIC_DB_CLIENT_OPERATION_DURATION,
ATTR_DB_NAMESPACE,
} from '@opentelemetry/semantic-conventions/incubating';

export class PgInstrumentation extends InstrumentationBase<PgInstrumentationConfig> {
private _operationDuration!: Histogram;
private _connectionsCount!: UpDownCounter;
private _connectionPendingRequests!: UpDownCounter;
// Pool events connect, acquire, release and remove can be called
Expand All @@ -66,6 +84,20 @@ export class PgInstrumentation extends InstrumentationBase<PgInstrumentationConf
}

override _updateMetricInstruments() {
this._operationDuration = this.meter.createHistogram(
METRIC_DB_CLIENT_OPERATION_DURATION,
{
description: 'Duration of database client operations.',
unit: 's',
valueType: ValueType.DOUBLE,
advice: {
explicitBucketBoundaries: [
0.001, 0.005, 0.01, 0.05, 0.1, 0.5, 1, 5, 10,
],
},
}
);

this._connectionsCounter = {
idle: 0,
pending: 0,
Expand Down Expand Up @@ -188,6 +220,27 @@ export class PgInstrumentation extends InstrumentationBase<PgInstrumentationConf
};
}

private recordOperationDuration(attributes: Attributes, startTime: HrTime) {
const metricsAttributes: Attributes = {};
const keysToCopy = [
SEMATTRS_DB_SYSTEM,
ATTR_DB_NAMESPACE,
ATTR_ERROR_TYPE,
ATTR_SERVER_PORT,
ATTR_SERVER_ADDRESS,
];

keysToCopy.forEach(key => {
if (key in attributes) {
metricsAttributes[key] = attributes[key];
}
});

const durationSeconds =
hrTimeToMilliseconds(hrTimeDuration(startTime, hrTime())) / 1000;
this._operationDuration.record(durationSeconds, metricsAttributes);
}

private _getClientQueryPatch() {
const plugin = this;
return (original: typeof pgTypes.Client.prototype.query) => {
Expand All @@ -196,6 +249,7 @@ export class PgInstrumentation extends InstrumentationBase<PgInstrumentationConf
if (utils.shouldSkipInstrumentation(plugin.getConfig())) {
return original.apply(this, args as never);
}
const startTime = hrTime();

// client.query(text, cb?), client.query(text, values, cb?), and
// client.query(configObj, cb?) are all valid signatures. We construct
Expand All @@ -221,6 +275,17 @@ export class PgInstrumentation extends InstrumentationBase<PgInstrumentationConf
? (arg0 as utils.ObjectWithText)
: undefined;

const attributes: Attributes = {
[SEMATTRS_DB_SYSTEM]: DBSYSTEMVALUES_POSTGRESQL,
[ATTR_DB_NAMESPACE]: this.database,
[ATTR_SERVER_PORT]: this.connectionParameters.port,
[ATTR_SERVER_ADDRESS]: this.connectionParameters.host,
};

const recordDuration = () => {
plugin.recordOperationDuration(attributes, startTime);
};

const instrumentationConfig = plugin.getConfig();

const span = utils.handleConfigQuery.call(
Expand Down Expand Up @@ -251,7 +316,8 @@ export class PgInstrumentation extends InstrumentationBase<PgInstrumentationConf
args[args.length - 1] = utils.patchCallback(
instrumentationConfig,
span,
args[args.length - 1] as PostgresCallback // nb: not type safe.
args[args.length - 1] as PostgresCallback, // nb: not type safe.
recordDuration
);

// If a parent span exists, bind the callback
Expand All @@ -266,7 +332,8 @@ export class PgInstrumentation extends InstrumentationBase<PgInstrumentationConf
let callback = utils.patchCallback(
plugin.getConfig(),
span,
queryConfig.callback as PostgresCallback // nb: not type safe.
queryConfig.callback as PostgresCallback, // nb: not type safe.
recordDuration
);

// If a parent span existed, bind the callback
Expand Down Expand Up @@ -324,7 +391,6 @@ export class PgInstrumentation extends InstrumentationBase<PgInstrumentationConf
try {
result = original.apply(this, args as never);
} catch (e: unknown) {
// span.recordException(e);
span.setStatus({
code: SpanStatusCode.ERROR,
message: utils.getErrorMessage(e),
Expand Down
5 changes: 3 additions & 2 deletions plugins/node/opentelemetry-instrumentation-pg/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -243,15 +243,15 @@ export function handleExecutionResult(
export function patchCallback(
instrumentationConfig: PgInstrumentationConfig,
span: Span,
cb: PostgresCallback
cb: PostgresCallback,
recordDuration: { (): void }
): PostgresCallback {
return function patchedCallback(
this: PgClientExtended,
err: Error,
res: object
) {
if (err) {
// span.recordException(err);
span.setStatus({
code: SpanStatusCode.ERROR,
message: err.message,
Expand All @@ -260,6 +260,7 @@ export function patchCallback(
handleExecutionResult(instrumentationConfig, span, res);
}

recordDuration();
span.end();
cb.call(this, err, res);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -520,46 +520,46 @@ describe('pg-pool', () => {

const metrics = resourceMetrics.scopeMetrics[0].metrics;
assert.strictEqual(
metrics[0].descriptor.name,
metrics[1].descriptor.name,
'db.client.connection.count'
);
assert.strictEqual(
metrics[0].descriptor.description,
metrics[1].descriptor.description,
'The number of connections that are currently in state described by the state attribute.'
);
assert.strictEqual(
metrics[0].dataPoints[0].attributes[
metrics[1].dataPoints[0].attributes[
ATTR_DB_CLIENT_CONNECTION_STATE
],
'used'
);
assert.strictEqual(
metrics[0].dataPoints[0].value,
metrics[1].dataPoints[0].value,
1,
'expected to have 1 used connection'
);
assert.strictEqual(
metrics[0].dataPoints[1].attributes[
metrics[1].dataPoints[1].attributes[
ATTR_DB_CLIENT_CONNECTION_STATE
],
'idle'
);
assert.strictEqual(
metrics[0].dataPoints[1].value,
metrics[1].dataPoints[1].value,
0,
'expected to have 0 idle connections'
);

assert.strictEqual(
metrics[1].descriptor.name,
metrics[2].descriptor.name,
'db.client.connection.pending_requests'
);
assert.strictEqual(
metrics[1].descriptor.description,
metrics[2].descriptor.description,
'The number of current pending requests for an open connection.'
);
assert.strictEqual(
metrics[1].dataPoints[0].value,
metrics[2].dataPoints[0].value,
0,
'expected to have 0 pending requests'
);
Expand Down
54 changes: 54 additions & 0 deletions plugins/node/opentelemetry-instrumentation-pg/test/pg.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import {
InMemorySpanExporter,
SimpleSpanProcessor,
} from '@opentelemetry/sdk-trace-base';
import { DataPoint, Histogram } from '@opentelemetry/sdk-metrics';
import * as assert from 'assert';
import type * as pg from 'pg';
import * as sinon from 'sinon';
Expand All @@ -50,6 +51,7 @@ import {
SEMATTRS_NET_PEER_PORT,
SEMATTRS_DB_USER,
DBSYSTEMVALUES_POSTGRESQL,
ATTR_ERROR_TYPE,
} from '@opentelemetry/semantic-conventions';
import { addSqlCommenterComment } from '@opentelemetry/sql-common';

Expand Down Expand Up @@ -960,4 +962,56 @@ describe('pg', () => {
});
});
});

describe('pg metrics', () => {
let metricReader: testUtils.TestMetricReader;

beforeEach(() => {
metricReader = testUtils.initMeterProvider(instrumentation);
});

it('should generate db.client.operation.duration metric', done => {
client.query('SELECT NOW()', async (_, ret) => {
assert.ok(ret, 'query should be executed');

const { resourceMetrics, errors } = await metricReader.collect();
assert.deepEqual(
errors,
[],
'expected no errors from the callback during metric collection'
);

const metrics = resourceMetrics.scopeMetrics[0].metrics;
assert.strictEqual(
metrics[0].descriptor.name,
'db.client.operation.duration'
);
assert.strictEqual(
metrics[0].descriptor.description,
'Duration of database client operations.'
);
const dataPoint = metrics[0].dataPoints[0];
assert.strictEqual(
dataPoint.attributes[SEMATTRS_DB_SYSTEM],
DBSYSTEMVALUES_POSTGRESQL
);
assert.strictEqual(dataPoint.attributes[ATTR_ERROR_TYPE], undefined);

const v = (dataPoint as DataPoint<Histogram>).value;
v.min = v.min ? v.min : 0;
v.max = v.max ? v.max : 0;
assert.equal(
v.min > 0,
true,
'expect min value for Histogram to be greater than 0'
);
assert.equal(
v.max > 0,
true,
'expect max value for Histogram to be greater than 0'
);
done();
});
});
});
});

0 comments on commit 050fee0

Please sign in to comment.