diff --git a/plugins/node/opentelemetry-instrumentation-pg/src/instrumentation.ts b/plugins/node/opentelemetry-instrumentation-pg/src/instrumentation.ts index 2df376d29f..d1f0a3f0cb 100644 --- a/plugins/node/opentelemetry-instrumentation-pg/src/instrumentation.ts +++ b/plugins/node/opentelemetry-instrumentation-pg/src/instrumentation.ts @@ -26,6 +26,7 @@ import { Span, SpanStatusCode, SpanKind, + UpDownCounter, } from '@opentelemetry/api'; import type * as pgTypes from 'pg'; import type * as pgPoolTypes from 'pg-pool'; @@ -41,12 +42,53 @@ import * as utils from './utils'; import { addSqlCommenterComment } from '@opentelemetry/sql-common'; import { PACKAGE_NAME, PACKAGE_VERSION } from './version'; import { SpanNames } from './enums/SpanNames'; +import { + METRIC_DB_CLIENT_CONNECTION_COUNT, + METRIC_DB_CLIENT_CONNECTION_PENDING_REQUESTS, +} from '@opentelemetry/semantic-conventions/incubating'; export class PgInstrumentation extends InstrumentationBase { + private _connectionsCount!: UpDownCounter; + private _connectionPendingRequests!: UpDownCounter; + // Pool events connect, acquire, release and remove can be called + // multiple times without changing the values of total, idle and waiting + // connections. The _connectionsCounter is used to keep track of latest + // values and only update the metrics _connectionsCount and _connectionPendingRequests + // when the value change. + private _connectionsCounter: utils.poolConnectionsCounter = { + used: 0, + idle: 0, + pending: 0, + }; + constructor(config: PgInstrumentationConfig = {}) { super(PACKAGE_NAME, PACKAGE_VERSION, config); } + override _updateMetricInstruments() { + this._connectionsCounter = { + idle: 0, + pending: 0, + used: 0, + }; + this._connectionsCount = this.meter.createUpDownCounter( + METRIC_DB_CLIENT_CONNECTION_COUNT, + { + description: + 'The number of connections that are currently in state described by the state attribute.', + unit: '{connection}', + } + ); + this._connectionPendingRequests = this.meter.createUpDownCounter( + METRIC_DB_CLIENT_CONNECTION_PENDING_REQUESTS, + { + description: + 'The number of current pending requests for an open connection.', + unit: '{connection}', + } + ); + } + protected init() { const modulePG = new InstrumentationNodeModuleDefinition( 'pg', @@ -334,6 +376,42 @@ export class PgInstrumentation extends InstrumentationBase { + plugin._connectionsCounter = utils.updateCounter( + this, + plugin._connectionsCount, + plugin._connectionPendingRequests, + plugin._connectionsCounter + ); + }); + + this.on('acquire', () => { + plugin._connectionsCounter = utils.updateCounter( + this, + plugin._connectionsCount, + plugin._connectionPendingRequests, + plugin._connectionsCounter + ); + }); + + this.on('remove', () => { + plugin._connectionsCounter = utils.updateCounter( + this, + plugin._connectionsCount, + plugin._connectionPendingRequests, + plugin._connectionsCounter + ); + }); + + this.on('release' as any, () => { + plugin._connectionsCounter = utils.updateCounter( + this, + plugin._connectionsCount, + plugin._connectionPendingRequests, + plugin._connectionsCounter + ); + }); + if (callback) { const parentSpan = trace.getSpan(context.active()); callback = utils.patchCallbackPGPool( diff --git a/plugins/node/opentelemetry-instrumentation-pg/src/utils.ts b/plugins/node/opentelemetry-instrumentation-pg/src/utils.ts index a16ccc9cd0..08f6bbe56c 100644 --- a/plugins/node/opentelemetry-instrumentation-pg/src/utils.ts +++ b/plugins/node/opentelemetry-instrumentation-pg/src/utils.ts @@ -22,6 +22,7 @@ import { Tracer, SpanKind, diag, + UpDownCounter, } from '@opentelemetry/api'; import { AttributeNames } from './enums/AttributeNames'; import { @@ -34,6 +35,12 @@ import { SEMATTRS_DB_STATEMENT, DBSYSTEMVALUES_POSTGRESQL, } from '@opentelemetry/semantic-conventions'; +import { + ATTR_DB_CLIENT_CONNECTION_POOL_NAME, + ATTR_DB_CLIENT_CONNECTION_STATE, + DB_CLIENT_CONNECTION_STATE_VALUE_USED, + DB_CLIENT_CONNECTION_STATE_VALUE_IDLE, +} from '@opentelemetry/semantic-conventions/incubating'; import { PgClientExtended, PostgresCallback, @@ -258,6 +265,50 @@ export function patchCallback( }; } +export function getPoolName(pool: PgPoolOptionsParams): string { + let poolName = ''; + poolName += (pool?.host ? `${pool.host}` : 'unknown_host') + ':'; + poolName += (pool?.port ? `${pool.port}` : 'unknown_port') + '/'; + poolName += pool?.database ? `${pool.database}` : 'unknown_database'; + + return poolName.trim(); +} + +export interface poolConnectionsCounter { + used: number; + idle: number; + pending: number; +} + +export function updateCounter( + pool: PgPoolExtended, + connectionCount: UpDownCounter, + connectionPendingRequests: UpDownCounter, + latestCounter: poolConnectionsCounter +): poolConnectionsCounter { + const poolName = getPoolName(pool.options); + const all = pool.totalCount; + const pending = pool.waitingCount; + const idle = pool.idleCount; + const used = all - idle; + + connectionCount.add(used - latestCounter.used, { + [ATTR_DB_CLIENT_CONNECTION_STATE]: DB_CLIENT_CONNECTION_STATE_VALUE_USED, + [ATTR_DB_CLIENT_CONNECTION_POOL_NAME]: poolName, + }); + + connectionCount.add(idle - latestCounter.idle, { + [ATTR_DB_CLIENT_CONNECTION_STATE]: DB_CLIENT_CONNECTION_STATE_VALUE_IDLE, + [ATTR_DB_CLIENT_CONNECTION_POOL_NAME]: poolName, + }); + + connectionPendingRequests.add(pending - latestCounter.pending, { + [ATTR_DB_CLIENT_CONNECTION_POOL_NAME]: poolName, + }); + + return { used: used, idle: idle, pending: pending }; +} + export function patchCallbackPGPool( span: Span, cb: PgPoolCallback diff --git a/plugins/node/opentelemetry-instrumentation-pg/test/pg-pool.test.ts b/plugins/node/opentelemetry-instrumentation-pg/test/pg-pool.test.ts index 62bfaf1711..ab57a0fb05 100644 --- a/plugins/node/opentelemetry-instrumentation-pg/test/pg-pool.test.ts +++ b/plugins/node/opentelemetry-instrumentation-pg/test/pg-pool.test.ts @@ -50,6 +50,7 @@ import { SEMATTRS_DB_USER, SEMATTRS_DB_STATEMENT, } from '@opentelemetry/semantic-conventions'; +import { ATTR_DB_CLIENT_CONNECTION_STATE } from '@opentelemetry/semantic-conventions/incubating'; const memoryExporter = new InMemorySpanExporter(); @@ -180,7 +181,7 @@ describe('pg-pool', () => { describe('#pool.connect()', () => { // promise - checkout a client it('should intercept pool.connect()', async () => { - const pgPoolattributes = { + const pgPoolAttributes = { ...DEFAULT_PGPOOL_ATTRIBUTES, }; const pgAttributes = { @@ -191,7 +192,7 @@ describe('pg-pool', () => { const span = provider.getTracer('test-pg-pool').startSpan('test span'); await context.with(trace.setSpan(context.active(), span), async () => { const client = await pool.connect(); - runCallbackTest(span, pgPoolattributes, events, unsetStatus, 2, 1); + runCallbackTest(span, pgPoolAttributes, events, unsetStatus, 2, 1); const [connectSpan, poolConnectSpan] = memoryExporter.getFinishedSpans(); @@ -212,7 +213,7 @@ describe('pg-pool', () => { // callback - checkout a client it('should not return a promise if callback is provided', done => { - const pgPoolattributes = { + const pgPoolAttributes = { ...DEFAULT_PGPOOL_ATTRIBUTES, }; const pgAttributes = { @@ -237,7 +238,7 @@ describe('pg-pool', () => { assert.ok(client); runCallbackTest( parentSpan, - pgPoolattributes, + pgPoolAttributes, events, unsetStatus, 1, @@ -285,7 +286,7 @@ describe('pg-pool', () => { describe('#pool.query()', () => { // promise it('should call patched client.query()', async () => { - const pgPoolattributes = { + const pgPoolAttributes = { ...DEFAULT_PGPOOL_ATTRIBUTES, }; const pgAttributes = { @@ -296,7 +297,7 @@ describe('pg-pool', () => { const span = provider.getTracer('test-pg-pool').startSpan('test span'); await context.with(trace.setSpan(context.active(), span), async () => { const result = await pool.query('SELECT NOW()'); - runCallbackTest(span, pgPoolattributes, events, unsetStatus, 2, 0); + runCallbackTest(span, pgPoolAttributes, events, unsetStatus, 2, 0); runCallbackTest(span, pgAttributes, events, unsetStatus, 2, 1); assert.ok(result, 'pool.query() returns a promise'); }); @@ -304,7 +305,7 @@ describe('pg-pool', () => { // callback it('should not return a promise if callback is provided', done => { - const pgPoolattributes = { + const pgPoolAttributes = { ...DEFAULT_PGPOOL_ATTRIBUTES, }; const pgAttributes = { @@ -322,7 +323,7 @@ describe('pg-pool', () => { } runCallbackTest( parentSpan, - pgPoolattributes, + pgPoolAttributes, events, unsetStatus, 2, @@ -341,7 +342,7 @@ describe('pg-pool', () => { const events: TimedEvent[] = []; describe('AND valid responseHook', () => { - const pgPoolattributes = { + const pgPoolAttributes = { ...DEFAULT_PGPOOL_ATTRIBUTES, }; const pgAttributes = { @@ -375,7 +376,7 @@ describe('pg-pool', () => { } runCallbackTest( parentSpan, - pgPoolattributes, + pgPoolAttributes, events, unsetStatus, 2, @@ -409,7 +410,7 @@ describe('pg-pool', () => { const result = await pool.query(query); runCallbackTest( span, - pgPoolattributes, + pgPoolAttributes, events, unsetStatus, 2, @@ -423,7 +424,7 @@ describe('pg-pool', () => { }); describe('AND invalid responseHook', () => { - const pgPoolattributes = { + const pgPoolAttributes = { ...DEFAULT_PGPOOL_ATTRIBUTES, }; const pgAttributes = { @@ -456,7 +457,7 @@ describe('pg-pool', () => { runCallbackTest( parentSpan, - pgPoolattributes, + pgPoolAttributes, events, unsetStatus, 2, @@ -482,4 +483,88 @@ describe('pg-pool', () => { }); }); }); + + describe('pg metrics', () => { + let metricReader: testUtils.TestMetricReader; + + beforeEach(() => { + metricReader = testUtils.initMeterProvider(instrumentation); + }); + + it('should generate `db.client.connection.count` and `db.client.connection.pending_requests` metrics', async () => { + pool.connect((err, client, release) => { + if (err) { + throw new Error(err.message); + } + if (!release) { + throw new Error('Did not receive release function'); + } + if (!client) { + throw new Error('No client received'); + } + assert.ok(client); + + client.query('SELECT NOW()', async (err, ret) => { + release(); + if (err) { + throw new Error(err.message); + } + assert.ok(ret); + + const { resourceMetrics, errors } = await metricReader.collect(); + assert.deepEqual( + errors, + [], + 'expected no errors from the callback during metric collection' + ); + + const metrics = resourceMetrics.scopeMetrics[0].metrics; + assert.strictEqual( + metrics[0].descriptor.name, + 'db.client.connection.count' + ); + assert.strictEqual( + metrics[0].descriptor.description, + 'The number of connections that are currently in state described by the state attribute.' + ); + assert.strictEqual( + metrics[0].dataPoints[0].attributes[ + ATTR_DB_CLIENT_CONNECTION_STATE + ], + 'used' + ); + assert.strictEqual( + metrics[0].dataPoints[0].value, + 1, + 'expected to have 1 used connection' + ); + assert.strictEqual( + metrics[0].dataPoints[1].attributes[ + ATTR_DB_CLIENT_CONNECTION_STATE + ], + 'idle' + ); + assert.strictEqual( + metrics[0].dataPoints[1].value, + 0, + 'expected to have 0 idle connections' + ); + + assert.strictEqual( + metrics[1].descriptor.name, + 'db.client.connection.pending_requests' + ); + assert.strictEqual( + metrics[1].descriptor.description, + 'The number of current pending requests for an open connection.' + ); + assert.strictEqual( + metrics[1].dataPoints[0].value, + 0, + 'expected to have 0 pending requests' + ); + }); + }); + }); + }); }); diff --git a/plugins/node/opentelemetry-instrumentation-pg/test/utils.test.ts b/plugins/node/opentelemetry-instrumentation-pg/test/utils.test.ts index 41ab6e5621..a76bb61988 100644 --- a/plugins/node/opentelemetry-instrumentation-pg/test/utils.test.ts +++ b/plugins/node/opentelemetry-instrumentation-pg/test/utils.test.ts @@ -26,7 +26,7 @@ import * as assert from 'assert'; import * as pg from 'pg'; import { PgInstrumentationConfig } from '../src'; import { AttributeNames } from '../src/enums/AttributeNames'; -import { PgClientExtended } from '../src/internal-types'; +import { PgClientExtended, PgPoolOptionsParams } from '../src/internal-types'; import * as utils from '../src/utils'; import { SEMATTRS_NET_PEER_PORT } from '@opentelemetry/semantic-conventions'; @@ -238,4 +238,22 @@ describe('utils.ts', () => { ); }); }); + + describe('.getPoolName()', () => { + it('creation of pool name based on pool config', () => { + const dummyPool: PgPoolOptionsParams = { + host: 'host_name', + port: 1234, + user: 'username', + database: 'database_name', + idleTimeoutMillis: 10, + maxClient: 5, + }; + + assert.strictEqual( + utils.getPoolName(dummyPool), + 'host_name:1234/database_name' + ); + }); + }); });