From 52136d85064b0d451b3cc67530ee96f8bb8128af Mon Sep 17 00:00:00 2001 From: Abhijeet Prasad Date: Tue, 7 Feb 2023 19:43:42 +0100 Subject: [PATCH] fix(instrumentation-amqplib): move `@types/amqplib` into dev deps (#1320) * fix(instrumentation-amqplib): move `@types/amqplib` into dev deps * refactor(instrumentation-amqplib): vendor in types into internal-types * refactor(instrumentation-amqplib): revert prettier formatting * refactor(instrumentation-amqplib): add code comments for vendoring * refactor(instrumentation-amqplib): delete unused file + run lint fixer * refactor(instrumentation-amqplib): fix ts eslint errorsOC * refactor(instrumentation-amqplib): fix lint --------- Co-authored-by: Amir Blum --- .../node/instrumentation-amqplib/package.json | 4 +- .../instrumentation-amqplib/src/amqplib.ts | 75 +++--- .../node/instrumentation-amqplib/src/types.ts | 249 +++++++++++++++++- 3 files changed, 285 insertions(+), 43 deletions(-) diff --git a/plugins/node/instrumentation-amqplib/package.json b/plugins/node/instrumentation-amqplib/package.json index b56b2b604fe..6ffae057ad9 100644 --- a/plugins/node/instrumentation-amqplib/package.json +++ b/plugins/node/instrumentation-amqplib/package.json @@ -48,12 +48,12 @@ "dependencies": { "@opentelemetry/core": "^1.8.0", "@opentelemetry/instrumentation": "^0.35.1", - "@opentelemetry/semantic-conventions": "^1.0.0", - "@types/amqplib": "^0.5.17" + "@opentelemetry/semantic-conventions": "^1.0.0" }, "devDependencies": { "@opentelemetry/api": "^1.3.0", "@opentelemetry/contrib-test-utils": "^0.33.0", + "@types/amqplib": "^0.5.17", "@types/lodash": "4.14.178", "@types/mocha": "8.2.3", "@types/sinon": "10.0.2", diff --git a/plugins/node/instrumentation-amqplib/src/amqplib.ts b/plugins/node/instrumentation-amqplib/src/amqplib.ts index 643fea0fbcb..f07663a2924 100644 --- a/plugins/node/instrumentation-amqplib/src/amqplib.ts +++ b/plugins/node/instrumentation-amqplib/src/amqplib.ts @@ -30,7 +30,6 @@ import { } from '@opentelemetry/core'; import { InstrumentationBase, - InstrumentationModuleDefinition, InstrumentationNodeModuleDefinition, InstrumentationNodeModuleFile, isWrapped, @@ -41,11 +40,15 @@ import { MessagingOperationValues, MessagingDestinationKindValues, } from '@opentelemetry/semantic-conventions'; -import type * as amqp from 'amqplib'; import { AmqplibInstrumentationConfig, + Connection, + ConsumeMessage, DEFAULT_CONFIG, EndOperation, + Message, + Options, + Replies, } from './types'; import { CHANNEL_CONSUME_TIMEOUT_TIMER, @@ -64,7 +67,7 @@ import { } from './utils'; import { VERSION } from './version'; -export class AmqplibInstrumentation extends InstrumentationBase { +export class AmqplibInstrumentation extends InstrumentationBase { protected override _config!: AmqplibInstrumentationConfig; constructor(config?: AmqplibInstrumentationConfig) { @@ -79,31 +82,29 @@ export class AmqplibInstrumentation extends InstrumentationBase { this._config = Object.assign({}, DEFAULT_CONFIG, config); } - protected init(): InstrumentationModuleDefinition { - const channelModelModuleFile = - new InstrumentationNodeModuleFile( - 'amqplib/lib/channel_model.js', - ['>=0.5.5'], - this.patchChannelModel.bind(this), - this.unpatchChannelModel.bind(this) - ); + protected init() { + const channelModelModuleFile = new InstrumentationNodeModuleFile( + 'amqplib/lib/channel_model.js', + ['>=0.5.5'], + this.patchChannelModel.bind(this), + this.unpatchChannelModel.bind(this) + ); - const callbackModelModuleFile = - new InstrumentationNodeModuleFile( - 'amqplib/lib/callback_model.js', - ['>=0.5.5'], - this.patchChannelModel.bind(this), - this.unpatchChannelModel.bind(this) - ); + const callbackModelModuleFile = new InstrumentationNodeModuleFile( + 'amqplib/lib/callback_model.js', + ['>=0.5.5'], + this.patchChannelModel.bind(this), + this.unpatchChannelModel.bind(this) + ); - const connectModuleFile = new InstrumentationNodeModuleFile( + const connectModuleFile = new InstrumentationNodeModuleFile( 'amqplib/lib/connect.js', ['>=0.5.5'], this.patchConnect.bind(this), this.unpatchConnect.bind(this) ); - const module = new InstrumentationNodeModuleDefinition( + const module = new InstrumentationNodeModuleDefinition( 'amqplib', ['>=0.5.5'], undefined, @@ -231,14 +232,14 @@ export class AmqplibInstrumentation extends InstrumentationBase { private getConnectPatch( original: ( - url: string | amqp.Options.Connect, + url: string | Options.Connect, socketOptions: any, - openCallback: (err: any, connection: amqp.Connection) => void - ) => amqp.Connection + openCallback: (err: any, connection: Connection) => void + ) => Connection ) { return function patchedConnect( this: unknown, - url: string | amqp.Options.Connect, + url: string | Options.Connect, socketOptions: any, openCallback: Function ) { @@ -246,7 +247,7 @@ export class AmqplibInstrumentation extends InstrumentationBase { this, url, socketOptions, - function (this: unknown, err, conn: amqp.Connection) { + function (this: unknown, err, conn: Connection) { if (err == null) { const urlAttributes = getConnectionAttributesFromUrl(url); // the type of conn in @types/amqplib is amqp.Connection, but in practice the library send the @@ -321,7 +322,7 @@ export class AmqplibInstrumentation extends InstrumentationBase { const self = this; return function ack( this: InstrumentationConsumeChannel, - message: amqp.Message, + message: Message, allUpToOrRequeue?: boolean, requeue?: boolean ): void { @@ -330,7 +331,7 @@ export class AmqplibInstrumentation extends InstrumentationBase { const requeueResolved = endOperation === EndOperation.Reject ? allUpToOrRequeue : requeue; - const spansNotEnded: { msg: amqp.Message }[] = + const spansNotEnded: { msg: Message }[] = channel[CHANNEL_SPANS_NOT_ENDED] ?? []; const msgIndex = spansNotEnded.findIndex( msgDetails => msgDetails.msg === message @@ -375,9 +376,9 @@ export class AmqplibInstrumentation extends InstrumentationBase { return function consume( this: InstrumentationConsumeChannel, queue: string, - onMessage: (msg: amqp.ConsumeMessage | null) => void, - options?: amqp.Options.Consume - ): Promise { + onMessage: (msg: ConsumeMessage | null) => void, + options?: Options.Consume + ): Promise { const channel = this; if ( !Object.prototype.hasOwnProperty.call(channel, CHANNEL_SPANS_NOT_ENDED) @@ -475,8 +476,8 @@ export class AmqplibInstrumentation extends InstrumentationBase { exchange: string, routingKey: string, content: Buffer, - options?: amqp.Options.Publish, - callback?: (err: any, ok: amqp.Replies.Empty) => void + options?: Options.Publish, + callback?: (err: any, ok: Replies.Empty) => void ): boolean { const channel = this; const { span, modifiedOptions } = self.createPublishSpan( @@ -510,7 +511,7 @@ export class AmqplibInstrumentation extends InstrumentationBase { const patchedOnConfirm = function ( this: unknown, err: any, - ok: amqp.Replies.Empty + ok: Replies.Empty ) { try { callback?.call(this, err, ok); @@ -572,7 +573,7 @@ export class AmqplibInstrumentation extends InstrumentationBase { exchange: string, routingKey: string, content: Buffer, - options?: amqp.Options.Publish + options?: Options.Publish ): boolean { if (isConfirmChannelTracing(context.active())) { // work already done @@ -623,7 +624,7 @@ export class AmqplibInstrumentation extends InstrumentationBase { exchange: string, routingKey: string, channel: InstrumentationPublishChannel, - options?: amqp.Options.Publish + options?: Options.Publish ) { const normalizedExchange = normalizeExchange(exchange); @@ -689,7 +690,7 @@ export class AmqplibInstrumentation extends InstrumentationBase { operation: EndOperation, requeue: boolean | undefined ) { - const spansNotEnded: { msg: amqp.Message }[] = + const spansNotEnded: { msg: Message }[] = channel[CHANNEL_SPANS_NOT_ENDED] ?? []; spansNotEnded.forEach(msgDetails => { this.endConsumerSpan(msgDetails.msg, isRejected, operation, requeue); @@ -699,7 +700,7 @@ export class AmqplibInstrumentation extends InstrumentationBase { private callConsumeEndHook( span: Span, - msg: amqp.ConsumeMessage, + msg: ConsumeMessage, rejected: boolean | null, endOperation: EndOperation ) { diff --git a/plugins/node/instrumentation-amqplib/src/types.ts b/plugins/node/instrumentation-amqplib/src/types.ts index 66c6140eeba..fb170571dc2 100644 --- a/plugins/node/instrumentation-amqplib/src/types.ts +++ b/plugins/node/instrumentation-amqplib/src/types.ts @@ -15,14 +15,13 @@ */ import { Span } from '@opentelemetry/api'; import { InstrumentationConfig } from '@opentelemetry/instrumentation'; -import type * as amqp from 'amqplib'; export interface PublishInfo { moduleVersion: string | undefined; exchange: string; routingKey: string; content: Buffer; - options?: amqp.Options.Publish; + options?: Options.Publish; isConfirmChannel?: boolean; } @@ -32,11 +31,11 @@ export interface PublishConfirmedInfo extends PublishInfo { export interface ConsumeInfo { moduleVersion: string | undefined; - msg: amqp.ConsumeMessage; + msg: ConsumeMessage; } export interface ConsumeEndInfo { - msg: amqp.ConsumeMessage; + msg: ConsumeMessage; rejected: boolean | null; endOperation: EndOperation; } @@ -102,3 +101,245 @@ export interface AmqplibInstrumentationConfig extends InstrumentationConfig { export const DEFAULT_CONFIG: AmqplibInstrumentationConfig = { consumeTimeoutMs: 1000 * 60, // 1 minute }; + +// The following types are vendored from `@types/amqplib@0.10.1` - commit SHA: 4205e03127692a40b4871709a7134fe4e2ed5510 + +// Vendored from: https://github.com/DefinitelyTyped/DefinitelyTyped/blob/4205e03127692a40b4871709a7134fe4e2ed5510/types/amqplib/properties.d.ts#LL23C18-L23C25 +// eslint-disable-next-line @typescript-eslint/no-namespace +export namespace Options { + export interface Connect { + /** + * The to be used protocol + * + * Default value: 'amqp' + */ + protocol?: string; + /** + * Hostname used for connecting to the server. + * + * Default value: 'localhost' + */ + hostname?: string; + /** + * Port used for connecting to the server. + * + * Default value: 5672 + */ + port?: number; + /** + * Username used for authenticating against the server. + * + * Default value: 'guest' + */ + username?: string; + /** + * Password used for authenticating against the server. + * + * Default value: 'guest' + */ + password?: string; + /** + * The desired locale for error messages. RabbitMQ only ever uses en_US + * + * Default value: 'en_US' + */ + locale?: string; + /** + * The size in bytes of the maximum frame allowed over the connection. 0 means + * no limit (but since frames have a size field which is an unsigned 32 bit integer, it’s perforce 2^32 - 1). + * + * Default value: 0x1000 (4kb) - That's the allowed minimum, it will fit many purposes + */ + frameMax?: number; + /** + * The period of the connection heartbeat in seconds. + * + * Default value: 0 + */ + heartbeat?: number; + /** + * What VHost shall be used. + * + * Default value: '/' + */ + vhost?: string; + } + + export interface AssertQueue { + exclusive?: boolean; + durable?: boolean; + autoDelete?: boolean; + arguments?: any; + messageTtl?: number; + expires?: number; + deadLetterExchange?: string; + deadLetterRoutingKey?: string; + maxLength?: number; + maxPriority?: number; + } + export interface DeleteQueue { + ifUnused?: boolean; + ifEmpty?: boolean; + } + export interface AssertExchange { + durable?: boolean; + internal?: boolean; + autoDelete?: boolean; + alternateExchange?: string; + arguments?: any; + } + export interface DeleteExchange { + ifUnused?: boolean; + } + export interface Publish { + expiration?: string | number; + userId?: string; + CC?: string | string[]; + + mandatory?: boolean; + persistent?: boolean; + deliveryMode?: boolean | number; + BCC?: string | string[]; + + contentType?: string; + contentEncoding?: string; + headers?: any; + priority?: number; + correlationId?: string; + replyTo?: string; + messageId?: string; + timestamp?: number; + type?: string; + appId?: string; + } + + export interface Consume { + consumerTag?: string; + noLocal?: boolean; + noAck?: boolean; + exclusive?: boolean; + priority?: number; + arguments?: any; + } + + export interface Get { + noAck?: boolean; + } +} + +// Vendored from: https://github.com/DefinitelyTyped/DefinitelyTyped/blob/4205e03127692a40b4871709a7134fe4e2ed5510/types/amqplib/properties.d.ts#L214 +interface ServerProperties { + host: string; + product: string; + version: string; + platform: string; + copyright?: string; + information: string; + [key: string]: string | undefined; +} + +// Vendored from: https://github.com/DefinitelyTyped/DefinitelyTyped/blob/4205e03127692a40b4871709a7134fe4e2ed5510/types/amqplib/properties.d.ts#L1 +// eslint-disable-next-line @typescript-eslint/no-namespace +export namespace Replies { + // eslint-disable-next-line @typescript-eslint/no-empty-interface + export interface Empty {} + + export interface AssertQueue { + queue: string; + messageCount: number; + consumerCount: number; + } + export interface PurgeQueue { + messageCount: number; + } + export interface DeleteQueue { + messageCount: number; + } + export interface AssertExchange { + exchange: string; + } + export interface Consume { + consumerTag: string; + } +} + +// Vendored from: https://github.com/DefinitelyTyped/DefinitelyTyped/blob/4205e03127692a40b4871709a7134fe4e2ed5510/types/amqplib/callback_api.d.ts#L55 +export interface ConfirmChannel { + publish( + exchange: string, + routingKey: string, + content: Buffer, + options?: Options.Publish, + callback?: (err: any, ok: Replies.Empty) => void + ): boolean; + sendToQueue( + queue: string, + content: Buffer, + options?: Options.Publish, + callback?: (err: any, ok: Replies.Empty) => void + ): boolean; + + waitForConfirms(): Promise; +} + +// Vendored from: https://github.com/DefinitelyTyped/DefinitelyTyped/blob/4205e03127692a40b4871709a7134fe4e2ed5510/types/amqplib/callback_api.d.ts#L5 +export interface Connection { + close(): Promise; + createChannel(): Promise; + createConfirmChannel(): Promise; + connection: { + serverProperties: ServerProperties; + }; +} + +// Vendored from: https://github.com/DefinitelyTyped/DefinitelyTyped/blob/4205e03127692a40b4871709a7134fe4e2ed5510/types/amqplib/properties.d.ts#L142 +export interface Message { + content: Buffer; + fields: MessageFields; + properties: MessageProperties; +} + +export interface GetMessage extends Message { + fields: GetMessageFields; +} + +export interface ConsumeMessage extends Message { + fields: ConsumeMessageFields; +} + +export interface CommonMessageFields { + deliveryTag: number; + redelivered: boolean; + exchange: string; + routingKey: string; +} + +export interface MessageFields extends CommonMessageFields { + messageCount?: number; + consumerTag?: string; +} + +export interface GetMessageFields extends CommonMessageFields { + messageCount: number; +} + +export interface ConsumeMessageFields extends CommonMessageFields { + deliveryTag: number; +} + +export interface MessageProperties { + contentType: any | undefined; + contentEncoding: any | undefined; + headers: any; + deliveryMode: any | undefined; + priority: any | undefined; + correlationId: any | undefined; + replyTo: any | undefined; + expiration: any | undefined; + messageId: any | undefined; + timestamp: any | undefined; + type: any | undefined; + userId: any | undefined; + appId: any | undefined; + clusterId: any | undefined; +}