diff --git a/packages/opentelemetry-plugin-postgres/opentelemetry-plugin-pg-pool/src/enums.ts b/packages/opentelemetry-plugin-postgres/opentelemetry-plugin-pg-pool/src/enums.ts new file mode 100644 index 0000000000..06be4a2d4c --- /dev/null +++ b/packages/opentelemetry-plugin-postgres/opentelemetry-plugin-pg-pool/src/enums.ts @@ -0,0 +1,36 @@ +/*! + * Copyright 2019, OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +export enum AttributeNames { + // required by https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/data-semantic-conventions.md#databases-client-calls + COMPONENT = 'component', + DB_TYPE = 'db.type', + DB_INSTANCE = 'db.instance', + DB_STATEMENT = 'db.statement', + PEER_ADDRESS = 'peer.address', + PEER_HOSTNAME = 'peer.host', + + // optional + DB_USER = 'db.user', + PEER_PORT = 'peer.port', + PEER_IPV4 = 'peer.ipv4', + PEER_IPV6 = 'peer.ipv6', + PEER_SERVICE = 'peer.service', + + // PG-POOL specific -- not specified by spec + IDLE_TIMEOUT_MILLIS = 'idle.timeout.millis', + MAX_CLIENT = 'max', +} diff --git a/packages/opentelemetry-plugin-postgres/opentelemetry-plugin-pg-pool/src/index.ts b/packages/opentelemetry-plugin-postgres/opentelemetry-plugin-pg-pool/src/index.ts new file mode 100644 index 0000000000..80fda80839 --- /dev/null +++ b/packages/opentelemetry-plugin-postgres/opentelemetry-plugin-pg-pool/src/index.ts @@ -0,0 +1,17 @@ +/*! + * Copyright 2019, OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +export * from './pg-pool'; diff --git a/packages/opentelemetry-plugin-postgres/opentelemetry-plugin-pg-pool/src/pg-pool.ts b/packages/opentelemetry-plugin-postgres/opentelemetry-plugin-pg-pool/src/pg-pool.ts index ae225f6b52..be335c1203 100644 --- a/packages/opentelemetry-plugin-postgres/opentelemetry-plugin-pg-pool/src/pg-pool.ts +++ b/packages/opentelemetry-plugin-postgres/opentelemetry-plugin-pg-pool/src/pg-pool.ts @@ -13,3 +13,120 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + +import { BasePlugin } from '@opentelemetry/core'; +import { CanonicalCode, SpanKind } from '@opentelemetry/types'; +import { AttributeNames } from './enums'; +import * as shimmer from 'shimmer'; +import * as pgPoolTypes from 'pg-pool'; +import { + PostgresPoolPluginOptions, + PgPoolCallback, + PgPoolExtended, +} from './types'; +import * as utils from './utils'; + +export class PostgresPoolPlugin extends BasePlugin { + protected _config: PostgresPoolPluginOptions; + + static readonly COMPONENT = 'pg-pool'; + static readonly DB_TYPE = 'sql'; + + readonly supportedVersions = ['2.*']; + + constructor(readonly moduleName: string) { + super(); + this._config = {}; + } + + protected patch(): typeof pgPoolTypes { + shimmer.wrap( + this._moduleExports.prototype, + 'connect', + this._getPoolConnectPatch() as never + ); + + return this._moduleExports; + } + + protected unpatch(): void { + shimmer.unwrap(this._moduleExports.prototype, 'connect'); + } + + private _getPoolConnectPatch() { + const plugin = this; + return (originalConnect: typeof pgPoolTypes.prototype.connect) => { + plugin._logger.debug( + `Patching ${PostgresPoolPlugin.COMPONENT}.prototype.connect` + ); + return function connect(this: PgPoolExtended, callback?: PgPoolCallback) { + const jdbcString = utils.getJDBCString(this.options); + // setup span + const span = plugin._tracer.startSpan( + `${PostgresPoolPlugin.COMPONENT}.connect`, + { + kind: SpanKind.CLIENT, + parent: plugin._tracer.getCurrentSpan() || undefined, + attributes: { + [AttributeNames.COMPONENT]: PostgresPoolPlugin.COMPONENT, // required + [AttributeNames.DB_TYPE]: PostgresPoolPlugin.DB_TYPE, // required + [AttributeNames.DB_INSTANCE]: this.options.database, // required + [AttributeNames.PEER_HOSTNAME]: this.options.host, // required + [AttributeNames.PEER_ADDRESS]: jdbcString, // required + [AttributeNames.PEER_PORT]: this.options.port, + [AttributeNames.DB_USER]: this.options.user, + [AttributeNames.IDLE_TIMEOUT_MILLIS]: this.options + .idleTimeoutMillis, + [AttributeNames.MAX_CLIENT]: this.options.maxClient, + }, + } + ); + + if (callback) { + const parentSpan = plugin._tracer.getCurrentSpan(); + callback = utils.patchCallback(span, callback) as PgPoolCallback; + // If a parent span exists, bind the callback + if (parentSpan) { + callback = plugin._tracer.bind(callback); + } + } + + const connectResult: unknown = originalConnect.call( + this, + callback as never + ); + + // No callback was provided, return a promise instead + if (connectResult instanceof Promise) { + const connectResultPromise = connectResult as Promise; + return plugin._tracer.bind( + connectResultPromise + .then((result: any) => { + // Resturn a pass-along promise which ends the span and then goes to user's orig resolvers + return new Promise((resolve, _) => { + span.setStatus({ code: CanonicalCode.OK }); + span.end(); + resolve(result); + }); + }) + .catch((error: Error) => { + return new Promise((_, reject) => { + span.setStatus({ + code: CanonicalCode.UNKNOWN, + message: error.message, + }); + span.end(); + reject(error); + }); + }) + ); + } + + // Else a callback was provided, so just return the result + return connectResult; + }; + }; + } +} + +export const plugin = new PostgresPoolPlugin(PostgresPoolPlugin.COMPONENT); diff --git a/packages/opentelemetry-plugin-postgres/opentelemetry-plugin-pg-pool/src/types.ts b/packages/opentelemetry-plugin-postgres/opentelemetry-plugin-pg-pool/src/types.ts new file mode 100644 index 0000000000..5f9648faf2 --- /dev/null +++ b/packages/opentelemetry-plugin-postgres/opentelemetry-plugin-pg-pool/src/types.ts @@ -0,0 +1,39 @@ +/*! + * Copyright 2019, OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as pgTypes from 'pg'; +import * as pgPoolTypes from 'pg-pool'; + +export interface PostgresPoolPluginOptions {} + +export type PgPoolCallback = ( + err: Error, + client: any, + done: (release?: any) => void +) => void; + +export interface PgPoolOptionsParams { + database: string; + host: string; + port: number; + user: string; + idleTimeoutMillis: number; // the minimum amount of time that an object may sit idle in the pool before it is eligible for eviction due to idle time + maxClient: number; // maximum size of the pool +} + +export interface PgPoolExtended extends pgPoolTypes { + options: PgPoolOptionsParams; +} diff --git a/packages/opentelemetry-plugin-postgres/opentelemetry-plugin-pg-pool/src/utils.ts b/packages/opentelemetry-plugin-postgres/opentelemetry-plugin-pg-pool/src/utils.ts new file mode 100644 index 0000000000..dce3f8e2b8 --- /dev/null +++ b/packages/opentelemetry-plugin-postgres/opentelemetry-plugin-pg-pool/src/utils.ts @@ -0,0 +1,45 @@ +/*! + * Copyright 2019, OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { Span, CanonicalCode } from '@opentelemetry/types'; +import { PgPoolOptionsParams, PgPoolCallback, PgPoolExtended } from './types'; + +export function getJDBCString(params: PgPoolOptionsParams) { + const host = params.host || 'localhost'; // postgres defaults to localhost + const port = params.port || 5432; // postgres defaults to port 5432 + const database = params.database || ''; + return `jdbc:postgresql://${host}:${port}/${database}`; +} + +export function patchCallback(span: Span, cb: PgPoolCallback): PgPoolCallback { + return function patchedCallback( + this: PgPoolExtended, + err: Error, + res: object, + done: any + ) { + if (err) { + span.setStatus({ + code: CanonicalCode.UNKNOWN, + message: err.message, + }); + } else if (res) { + span.setStatus({ code: CanonicalCode.OK }); + } + span.end(); + cb.call(this, err, res, done); + }; +} diff --git a/packages/opentelemetry-plugin-postgres/opentelemetry-plugin-pg-pool/test/assertionUtils.ts b/packages/opentelemetry-plugin-postgres/opentelemetry-plugin-pg-pool/test/assertionUtils.ts new file mode 100644 index 0000000000..a964d47a2f --- /dev/null +++ b/packages/opentelemetry-plugin-postgres/opentelemetry-plugin-pg-pool/test/assertionUtils.ts @@ -0,0 +1,79 @@ +/*! + * Copyright 2019, OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { + SpanKind, + Attributes, + Event, + Span, + TimedEvent, +} from '@opentelemetry/types'; +import * as assert from 'assert'; +import { ReadableSpan } from '@opentelemetry/tracing'; +import { + hrTimeToMilliseconds, + hrTimeToMicroseconds, +} from '@opentelemetry/core'; + +export const assertSpan = ( + span: ReadableSpan, + kind: SpanKind, + attributes: Attributes, + events: Event[] +) => { + assert.strictEqual(span.spanContext.traceId.length, 32); + assert.strictEqual(span.spanContext.spanId.length, 16); + assert.strictEqual(span.kind, kind); + + // check all the AttributeNames fields + Object.keys(span.attributes).forEach(key => { + assert.deepStrictEqual(span.attributes[key], attributes[key]); + }); + + assert.ok(span.endTime); + assert.strictEqual(span.links.length, 0); + + assert.ok( + hrTimeToMicroseconds(span.startTime) < hrTimeToMicroseconds(span.endTime) + ); + assert.ok(hrTimeToMilliseconds(span.endTime) > 0); + + // events + assert.strictEqual( + span.events.length, + events.length, + 'Should contain same number of events' + ); + span.events.forEach((_: TimedEvent, index: number) => { + assert.deepStrictEqual(span.events[index], events[index]); + }); +}; + +// Check if sourceSpan was propagated to targetSpan +export const assertPropagation = ( + childSpan: ReadableSpan, + parentSpan: Span +) => { + const targetSpanContext = childSpan.spanContext; + const sourceSpanContext = parentSpan.context(); + assert.strictEqual(targetSpanContext.traceId, sourceSpanContext.traceId); + assert.strictEqual(childSpan.parentSpanId, sourceSpanContext.spanId); + assert.strictEqual( + targetSpanContext.traceFlags, + sourceSpanContext.traceFlags + ); + assert.notStrictEqual(targetSpanContext.spanId, sourceSpanContext.spanId); +}; diff --git a/packages/opentelemetry-plugin-postgres/opentelemetry-plugin-pg-pool/test/pg-pool.test.ts b/packages/opentelemetry-plugin-postgres/opentelemetry-plugin-pg-pool/test/pg-pool.test.ts index ae225f6b52..73ca6650a1 100644 --- a/packages/opentelemetry-plugin-postgres/opentelemetry-plugin-pg-pool/test/pg-pool.test.ts +++ b/packages/opentelemetry-plugin-postgres/opentelemetry-plugin-pg-pool/test/pg-pool.test.ts @@ -13,3 +13,229 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + +import { NoopLogger } from '@opentelemetry/core'; +import { NodeTracer } from '@opentelemetry/node'; +import { + InMemorySpanExporter, + SimpleSpanProcessor, +} from '@opentelemetry/tracing'; +import { SpanKind, Attributes, TimedEvent, Span } from '@opentelemetry/types'; +import { plugin as pgPlugin, PostgresPlugin } from '@opentelemetry/plugin-pg'; +import { plugin, PostgresPoolPlugin } from '../src'; +import { AttributeNames } from '../src/enums'; +import * as assert from 'assert'; +import * as pg from 'pg'; +import * as pgPool from 'pg-pool'; +import * as assertionUtils from './assertionUtils'; +import * as testUtils from './testUtils'; + +const memoryExporter = new InMemorySpanExporter(); + +const CONFIG = { + user: process.env.POSTGRES_USER || 'postgres', + database: process.env.POSTGRES_DB || 'postgres', + host: process.env.POSTGRES_HOST || 'localhost', + port: process.env.POSTGRES_PORT + ? parseInt(process.env.POSTGRES_PORT, 10) + : 54320, + maxClient: 1, + idleTimeoutMillis: 10000, +}; + +const DEFAULT_PGPOOL_ATTRIBUTES = { + [AttributeNames.COMPONENT]: PostgresPoolPlugin.COMPONENT, + [AttributeNames.DB_INSTANCE]: CONFIG.database, + [AttributeNames.DB_TYPE]: PostgresPoolPlugin.DB_TYPE, + [AttributeNames.PEER_HOSTNAME]: CONFIG.host, + [AttributeNames.PEER_ADDRESS]: `jdbc:postgresql://${CONFIG.host}:${CONFIG.port}/${CONFIG.database}`, + [AttributeNames.PEER_PORT]: CONFIG.port, + [AttributeNames.DB_USER]: CONFIG.user, + [AttributeNames.MAX_CLIENT]: CONFIG.maxClient, + [AttributeNames.IDLE_TIMEOUT_MILLIS]: CONFIG.idleTimeoutMillis, +}; + +const DEFAULT_PG_ATTRIBUTES = { + [AttributeNames.COMPONENT]: PostgresPlugin.COMPONENT, + [AttributeNames.DB_INSTANCE]: CONFIG.database, + [AttributeNames.DB_TYPE]: PostgresPlugin.DB_TYPE, + [AttributeNames.PEER_HOSTNAME]: CONFIG.host, + [AttributeNames.PEER_ADDRESS]: `jdbc:postgresql://${CONFIG.host}:${CONFIG.port}/${CONFIG.database}`, + [AttributeNames.PEER_PORT]: CONFIG.port, + [AttributeNames.DB_USER]: CONFIG.user, +}; + +const runCallbackTest = ( + parentSpan: Span, + attributes: Attributes, + events: TimedEvent[], + spansLength = 1, + spansIndex = 0 +) => { + const spans = memoryExporter.getFinishedSpans(); + assert.strictEqual(spans.length, spansLength); + const pgSpan = spans[spansIndex]; + assertionUtils.assertSpan(pgSpan, SpanKind.CLIENT, attributes, events); + assertionUtils.assertPropagation(pgSpan, parentSpan); +}; + +describe('pg-pool@2.x', () => { + let pool: pgPool; + const tracer = new NodeTracer(); + const logger = new NoopLogger(); + const testPostgres = process.env.TEST_POSTGRES; // For CI: assumes local postgres db is already available + const testPostgresLocally = process.env.TEST_POSTGRES_LOCAL; // For local: spins up local postgres db via docker + const shouldTest = testPostgres || testPostgresLocally; // Skips these tests if false (default) + + before(function(done) { + if (!shouldTest) { + // this.skip() workaround + // https://github.com/mochajs/mocha/issues/2683#issuecomment-375629901 + this.test!.parent!.pending = true; + this.skip(); + } + pool = new pgPool(CONFIG); + tracer.addSpanProcessor(new SimpleSpanProcessor(memoryExporter)); + if (testPostgresLocally) { + testUtils.startDocker(); + } + done(); + }); + + after(function(done) { + if (testPostgresLocally) { + testUtils.cleanUpDocker(); + } + pool.end(() => { + done(); + }); + }); + + beforeEach(function() { + plugin.enable(pgPool, tracer, logger); + pgPlugin.enable(pg, tracer, logger); + }); + + afterEach(() => { + memoryExporter.reset(); + plugin.disable(); + pgPlugin.disable(); + }); + + it('should return a plugin', () => { + assert.ok(plugin instanceof PostgresPoolPlugin); + }); + + it('should have correct moduleName', () => { + assert.strictEqual(plugin.moduleName, 'pg-pool'); + }); + + describe('#pool.connect()', () => { + // promise - checkout a client + it('should intercept pool.connect()', async () => { + const pgPoolattributes = { + ...DEFAULT_PGPOOL_ATTRIBUTES, + }; + const pgAttributes = { + ...DEFAULT_PG_ATTRIBUTES, + [AttributeNames.DB_STATEMENT]: 'SELECT NOW()', + }; + const events: TimedEvent[] = []; + const span = tracer.startSpan('test span'); + await tracer.withSpan(span, async () => { + const client = await pool.connect(); + runCallbackTest(span, pgPoolattributes, events, 1, 0); + assert.ok(client, 'pool.connect() returns a promise'); + try { + await client.query('SELECT NOW()'); + runCallbackTest(span, pgAttributes, events, 2, 1); + } catch (e) { + throw e; + } finally { + client.release(); + } + }); + }); + + // callback - checkout a client + it('should not return a promise if callback is provided', done => { + const pgPoolattributes = { + ...DEFAULT_PGPOOL_ATTRIBUTES, + }; + const pgAttributes = { + ...DEFAULT_PG_ATTRIBUTES, + [AttributeNames.DB_STATEMENT]: 'SELECT NOW()', + }; + const events: TimedEvent[] = []; + const parentSpan = tracer.startSpan('test span'); + tracer.withSpan(parentSpan, () => { + const resNoPromise = pool.connect((err, client, release) => { + if (err) { + return done(err); + } + release(); + assert.ok(client); + runCallbackTest(parentSpan, pgPoolattributes, events, 1, 0); + client.query('SELECT NOW()', (err, ret) => { + if (err) { + return done(err); + } + assert.ok(ret); + runCallbackTest(parentSpan, pgAttributes, events, 2, 1); + done(); + }); + }); + assert.strictEqual(resNoPromise, undefined, 'No promise is returned'); + }); + }); + }); + + describe('#pool.query()', () => { + // promise + it('should call patched client.query()', async () => { + const pgPoolattributes = { + ...DEFAULT_PGPOOL_ATTRIBUTES, + }; + const pgAttributes = { + ...DEFAULT_PG_ATTRIBUTES, + [AttributeNames.DB_STATEMENT]: 'SELECT NOW()', + }; + const events: TimedEvent[] = []; + const span = tracer.startSpan('test span'); + await tracer.withSpan(span, async () => { + try { + const result = await pool.query('SELECT NOW()'); + runCallbackTest(span, pgPoolattributes, events, 2, 0); + runCallbackTest(span, pgAttributes, events, 2, 1); + assert.ok(result, 'pool.query() returns a promise'); + } catch (e) { + throw e; + } + }); + }); + + // callback + it('should not return a promise if callback is provided', done => { + const pgPoolattributes = { + ...DEFAULT_PGPOOL_ATTRIBUTES, + }; + const pgAttributes = { + ...DEFAULT_PG_ATTRIBUTES, + [AttributeNames.DB_STATEMENT]: 'SELECT NOW()', + }; + const events: TimedEvent[] = []; + const parentSpan = tracer.startSpan('test span'); + tracer.withSpan(parentSpan, () => { + const resNoPromise = pool.query('SELECT NOW()', (err, result) => { + if (err) { + return done(err); + } + runCallbackTest(parentSpan, pgPoolattributes, events, 2, 0); + runCallbackTest(parentSpan, pgAttributes, events, 2, 1); + done(); + }); + assert.strictEqual(resNoPromise, undefined, 'No promise is returned'); + }); + }); + }); +}); diff --git a/packages/opentelemetry-plugin-postgres/opentelemetry-plugin-pg-pool/test/testUtils.ts b/packages/opentelemetry-plugin-postgres/opentelemetry-plugin-pg-pool/test/testUtils.ts new file mode 100644 index 0000000000..eec866fc41 --- /dev/null +++ b/packages/opentelemetry-plugin-postgres/opentelemetry-plugin-pg-pool/test/testUtils.ts @@ -0,0 +1,54 @@ +/*! + * Copyright 2019, OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import * as childProcess from 'child_process'; +export function startDocker() { + const tasks = [ + run('docker run -d -p 54320:5432 --name otpostgres postgres:alpine'), + ]; + + for (let i = 0; i < tasks.length; i++) { + const task = tasks[i]; + if (task && task.code !== 0) { + console.error('Failed to start container!'); + console.error(task.output); + return false; + } + } + return true; +} + +export function cleanUpDocker() { + run('docker stop otpostgres'); + run('docker rm otpostgres'); +} + +function run(cmd: string) { + try { + const proc = childProcess.spawnSync(cmd, { + shell: true, + }); + return { + code: proc.status, + output: proc.output + .map(v => String.fromCharCode.apply(null, v as any)) + .join(''), + }; + } catch (e) { + console.log(e); + return; + } +}