From 54266b8345d1fd31f944834d9ff7748b15611bac Mon Sep 17 00:00:00 2001 From: Abhijeet Prasad Date: Tue, 6 Dec 2022 14:39:18 +0100 Subject: [PATCH 1/7] fix(instrumentation-amqplib): move `@types/amqplib` into dev deps --- plugins/node/instrumentation-amqplib/package.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugins/node/instrumentation-amqplib/package.json b/plugins/node/instrumentation-amqplib/package.json index 6a4282a7e6..632e7ee23d 100644 --- a/plugins/node/instrumentation-amqplib/package.json +++ b/plugins/node/instrumentation-amqplib/package.json @@ -48,12 +48,12 @@ "dependencies": { "@opentelemetry/core": "^1.8.0", "@opentelemetry/instrumentation": "^0.34.0", - "@opentelemetry/semantic-conventions": "^1.0.0", - "@types/amqplib": "^0.5.17" + "@opentelemetry/semantic-conventions": "^1.0.0" }, "devDependencies": { "@opentelemetry/api": "^1.3.0", "@opentelemetry/contrib-test-utils": "^0.33.0", + "@types/amqplib": "^0.5.17", "@types/lodash": "4.14.178", "@types/mocha": "8.2.3", "@types/sinon": "10.0.2", From b66d179ea60ea2359df88520d2be465d67ee2d0f Mon Sep 17 00:00:00 2001 From: Abhijeet Prasad Date: Mon, 9 Jan 2023 14:50:51 +0100 Subject: [PATCH 2/7] refactor(instrumentation-amqplib): vendor in types into internal-types --- .../instrumentation-amqplib/src/amqplib.ts | 175 +++++++------ .../src/internal-types.ts | 245 ++++++++++++++++++ .../node/instrumentation-amqplib/src/types.ts | 30 +-- 3 files changed, 349 insertions(+), 101 deletions(-) create mode 100644 plugins/node/instrumentation-amqplib/src/internal-types.ts diff --git a/plugins/node/instrumentation-amqplib/src/amqplib.ts b/plugins/node/instrumentation-amqplib/src/amqplib.ts index 5d51a3dde5..fe9c8c954b 100644 --- a/plugins/node/instrumentation-amqplib/src/amqplib.ts +++ b/plugins/node/instrumentation-amqplib/src/amqplib.ts @@ -22,31 +22,36 @@ import { SpanKind, SpanStatusCode, ROOT_CONTEXT, -} from '@opentelemetry/api'; +} from "@opentelemetry/api"; import { hrTime, hrTimeDuration, hrTimeToMilliseconds, -} from '@opentelemetry/core'; +} from "@opentelemetry/core"; import { InstrumentationBase, - InstrumentationModuleDefinition, InstrumentationNodeModuleDefinition, InstrumentationNodeModuleFile, isWrapped, safeExecuteInTheMiddle, -} from '@opentelemetry/instrumentation'; +} from "@opentelemetry/instrumentation"; import { SemanticAttributes, MessagingOperationValues, MessagingDestinationKindValues, -} from '@opentelemetry/semantic-conventions'; -import type * as amqp from 'amqplib'; +} from "@opentelemetry/semantic-conventions"; +import { + Connection, + ConsumeMessage, + Message, + Options, + Replies, +} from "./internal-types"; import { AmqplibInstrumentationConfig, DEFAULT_CONFIG, EndOperation, -} from './types'; +} from "./types"; import { CHANNEL_CONSUME_TIMEOUT_TIMER, CHANNEL_SPANS_NOT_ENDED, @@ -61,15 +66,15 @@ import { MESSAGE_STORED_SPAN, normalizeExchange, unmarkConfirmChannelTracing, -} from './utils'; -import { VERSION } from './version'; +} from "./utils"; +import { VERSION } from "./version"; -export class AmqplibInstrumentation extends InstrumentationBase { +export class AmqplibInstrumentation extends InstrumentationBase { protected override _config!: AmqplibInstrumentationConfig; constructor(config?: AmqplibInstrumentationConfig) { super( - '@opentelemetry/instrumentation-amqplib', + "@opentelemetry/instrumentation-amqplib", VERSION, Object.assign({}, DEFAULT_CONFIG, config) ); @@ -79,33 +84,31 @@ export class AmqplibInstrumentation extends InstrumentationBase { this._config = Object.assign({}, DEFAULT_CONFIG, config); } - protected init(): InstrumentationModuleDefinition { - const channelModelModuleFile = - new InstrumentationNodeModuleFile( - 'amqplib/lib/channel_model.js', - ['>=0.5.5'], - this.patchChannelModel.bind(this), - this.unpatchChannelModel.bind(this) - ); + protected init() { + const channelModelModuleFile = new InstrumentationNodeModuleFile( + "amqplib/lib/channel_model.js", + [">=0.5.5"], + this.patchChannelModel.bind(this), + this.unpatchChannelModel.bind(this) + ); - const callbackModelModuleFile = - new InstrumentationNodeModuleFile( - 'amqplib/lib/callback_model.js', - ['>=0.5.5'], - this.patchChannelModel.bind(this), - this.unpatchChannelModel.bind(this) - ); + const callbackModelModuleFile = new InstrumentationNodeModuleFile( + "amqplib/lib/callback_model.js", + [">=0.5.5"], + this.patchChannelModel.bind(this), + this.unpatchChannelModel.bind(this) + ); - const connectModuleFile = new InstrumentationNodeModuleFile( - 'amqplib/lib/connect.js', - ['>=0.5.5'], + const connectModuleFile = new InstrumentationNodeModuleFile( + "amqplib/lib/connect.js", + [">=0.5.5"], this.patchConnect.bind(this), this.unpatchConnect.bind(this) ); - const module = new InstrumentationNodeModuleDefinition( - 'amqplib', - ['>=0.5.5'], + const module = new InstrumentationNodeModuleDefinition( + "amqplib", + [">=0.5.5"], undefined, undefined, [channelModelModuleFile, connectModuleFile, callbackModelModuleFile] @@ -116,14 +119,14 @@ export class AmqplibInstrumentation extends InstrumentationBase { private patchConnect(moduleExports: any) { moduleExports = this.unpatchConnect(moduleExports); if (!isWrapped(moduleExports.connect)) { - this._wrap(moduleExports, 'connect', this.getConnectPatch.bind(this)); + this._wrap(moduleExports, "connect", this.getConnectPatch.bind(this)); } return moduleExports; } private unpatchConnect(moduleExports: any) { if (isWrapped(moduleExports.connect)) { - this._unwrap(moduleExports, 'connect'); + this._unwrap(moduleExports, "connect"); } return moduleExports; } @@ -135,63 +138,63 @@ export class AmqplibInstrumentation extends InstrumentationBase { if (!isWrapped(moduleExports.Channel.prototype.publish)) { this._wrap( moduleExports.Channel.prototype, - 'publish', + "publish", this.getPublishPatch.bind(this, moduleVersion) ); } if (!isWrapped(moduleExports.Channel.prototype.consume)) { this._wrap( moduleExports.Channel.prototype, - 'consume', + "consume", this.getConsumePatch.bind(this, moduleVersion) ); } if (!isWrapped(moduleExports.Channel.prototype.ack)) { this._wrap( moduleExports.Channel.prototype, - 'ack', + "ack", this.getAckPatch.bind(this, false, EndOperation.Ack) ); } if (!isWrapped(moduleExports.Channel.prototype.nack)) { this._wrap( moduleExports.Channel.prototype, - 'nack', + "nack", this.getAckPatch.bind(this, true, EndOperation.Nack) ); } if (!isWrapped(moduleExports.Channel.prototype.reject)) { this._wrap( moduleExports.Channel.prototype, - 'reject', + "reject", this.getAckPatch.bind(this, true, EndOperation.Reject) ); } if (!isWrapped(moduleExports.Channel.prototype.ackAll)) { this._wrap( moduleExports.Channel.prototype, - 'ackAll', + "ackAll", this.getAckAllPatch.bind(this, false, EndOperation.AckAll) ); } if (!isWrapped(moduleExports.Channel.prototype.nackAll)) { this._wrap( moduleExports.Channel.prototype, - 'nackAll', + "nackAll", this.getAckAllPatch.bind(this, true, EndOperation.NackAll) ); } if (!isWrapped(moduleExports.Channel.prototype.emit)) { this._wrap( moduleExports.Channel.prototype, - 'emit', + "emit", this.getChannelEmitPatch.bind(this) ); } if (!isWrapped(moduleExports.ConfirmChannel.prototype.publish)) { this._wrap( moduleExports.ConfirmChannel.prototype, - 'publish', + "publish", this.getConfirmedPublishPatch.bind(this, moduleVersion) ); } @@ -200,45 +203,45 @@ export class AmqplibInstrumentation extends InstrumentationBase { private unpatchChannelModel(moduleExports: any) { if (isWrapped(moduleExports.Channel.prototype.publish)) { - this._unwrap(moduleExports.Channel.prototype, 'publish'); + this._unwrap(moduleExports.Channel.prototype, "publish"); } if (isWrapped(moduleExports.Channel.prototype.consume)) { - this._unwrap(moduleExports.Channel.prototype, 'consume'); + this._unwrap(moduleExports.Channel.prototype, "consume"); } if (isWrapped(moduleExports.Channel.prototype.ack)) { - this._unwrap(moduleExports.Channel.prototype, 'ack'); + this._unwrap(moduleExports.Channel.prototype, "ack"); } if (isWrapped(moduleExports.Channel.prototype.nack)) { - this._unwrap(moduleExports.Channel.prototype, 'nack'); + this._unwrap(moduleExports.Channel.prototype, "nack"); } if (isWrapped(moduleExports.Channel.prototype.reject)) { - this._unwrap(moduleExports.Channel.prototype, 'reject'); + this._unwrap(moduleExports.Channel.prototype, "reject"); } if (isWrapped(moduleExports.Channel.prototype.ackAll)) { - this._unwrap(moduleExports.Channel.prototype, 'ackAll'); + this._unwrap(moduleExports.Channel.prototype, "ackAll"); } if (isWrapped(moduleExports.Channel.prototype.nackAll)) { - this._unwrap(moduleExports.Channel.prototype, 'nackAll'); + this._unwrap(moduleExports.Channel.prototype, "nackAll"); } if (isWrapped(moduleExports.Channel.prototype.emit)) { - this._unwrap(moduleExports.Channel.prototype, 'emit'); + this._unwrap(moduleExports.Channel.prototype, "emit"); } if (isWrapped(moduleExports.ConfirmChannel.prototype.publish)) { - this._unwrap(moduleExports.ConfirmChannel.prototype, 'publish'); + this._unwrap(moduleExports.ConfirmChannel.prototype, "publish"); } return moduleExports; } private getConnectPatch( original: ( - url: string | amqp.Options.Connect, + url: string | Options.Connect, socketOptions: any, - openCallback: (err: any, connection: amqp.Connection) => void - ) => amqp.Connection + openCallback: (err: any, connection: Connection) => void + ) => Connection ) { return function patchedConnect( this: unknown, - url: string | amqp.Options.Connect, + url: string | Options.Connect, socketOptions: any, openCallback: Function ) { @@ -246,7 +249,7 @@ export class AmqplibInstrumentation extends InstrumentationBase { this, url, socketOptions, - function (this: unknown, err, conn: amqp.Connection) { + function (this: unknown, err, conn: Connection) { if (err == null) { const urlAttributes = getConnectionAttributesFromUrl(url); // the type of conn in @types/amqplib is amqp.Connection, but in practice the library send the @@ -274,7 +277,7 @@ export class AmqplibInstrumentation extends InstrumentationBase { this: InstrumentationConsumeChannel, eventName: string ) { - if (eventName === 'close') { + if (eventName === "close") { self.endAllSpansOnChannel( this, true, @@ -286,7 +289,7 @@ export class AmqplibInstrumentation extends InstrumentationBase { clearInterval(activeTimer); } this[CHANNEL_CONSUME_TIMEOUT_TIMER] = undefined; - } else if (eventName === 'error') { + } else if (eventName === "error") { self.endAllSpansOnChannel( this, true, @@ -321,7 +324,7 @@ export class AmqplibInstrumentation extends InstrumentationBase { const self = this; return function ack( this: InstrumentationConsumeChannel, - message: amqp.Message, + message: Message, allUpToOrRequeue?: boolean, requeue?: boolean ): void { @@ -330,10 +333,10 @@ export class AmqplibInstrumentation extends InstrumentationBase { const requeueResolved = endOperation === EndOperation.Reject ? allUpToOrRequeue : requeue; - const spansNotEnded: { msg: amqp.Message }[] = + const spansNotEnded: { msg: Message }[] = channel[CHANNEL_SPANS_NOT_ENDED] ?? []; const msgIndex = spansNotEnded.findIndex( - msgDetails => msgDetails.msg === message + (msgDetails) => msgDetails.msg === message ); if (msgIndex < 0) { // should not happen in happy flow @@ -375,9 +378,9 @@ export class AmqplibInstrumentation extends InstrumentationBase { return function consume( this: InstrumentationConsumeChannel, queue: string, - onMessage: (msg: amqp.ConsumeMessage | null) => void, - options?: amqp.Options.Consume - ): Promise { + onMessage: (msg: ConsumeMessage | null) => void, + options?: Options.Consume + ): Promise { const channel = this; if ( !Object.prototype.hasOwnProperty.call(channel, CHANNEL_SPANS_NOT_ENDED) @@ -431,9 +434,9 @@ export class AmqplibInstrumentation extends InstrumentationBase { if (self._config.consumeHook) { safeExecuteInTheMiddle( () => self._config.consumeHook!(span, { moduleVersion, msg }), - e => { + (e) => { if (e) { - diag.error('amqplib instrumentation: consumerHook error', e); + diag.error("amqplib instrumentation: consumerHook error", e); } }, true @@ -475,8 +478,8 @@ export class AmqplibInstrumentation extends InstrumentationBase { exchange: string, routingKey: string, content: Buffer, - options?: amqp.Options.Publish, - callback?: (err: any, ok: amqp.Replies.Empty) => void + options?: Options.Publish, + callback?: (err: any, ok: Replies.Empty) => void ): boolean { const channel = this; const { span, modifiedOptions } = self.createPublishSpan( @@ -498,9 +501,9 @@ export class AmqplibInstrumentation extends InstrumentationBase { options: modifiedOptions, isConfirmChannel: true, }), - e => { + (e) => { if (e) { - diag.error('amqplib instrumentation: publishHook error', e); + diag.error("amqplib instrumentation: publishHook error", e); } }, true @@ -510,7 +513,7 @@ export class AmqplibInstrumentation extends InstrumentationBase { const patchedOnConfirm = function ( this: unknown, err: any, - ok: amqp.Replies.Empty + ok: Replies.Empty ) { try { callback?.call(this, err, ok); @@ -527,10 +530,10 @@ export class AmqplibInstrumentation extends InstrumentationBase { isConfirmChannel: true, confirmError: err, }), - e => { + (e) => { if (e) { diag.error( - 'amqplib instrumentation: publishConfirmHook error', + "amqplib instrumentation: publishConfirmHook error", e ); } @@ -572,7 +575,7 @@ export class AmqplibInstrumentation extends InstrumentationBase { exchange: string, routingKey: string, content: Buffer, - options?: amqp.Options.Publish + options?: Options.Publish ): boolean { if (isConfirmChannelTracing(context.active())) { // work already done @@ -598,9 +601,9 @@ export class AmqplibInstrumentation extends InstrumentationBase { options: modifiedOptions, isConfirmChannel: false, }), - e => { + (e) => { if (e) { - diag.error('amqplib instrumentation: publishHook error', e); + diag.error("amqplib instrumentation: publishHook error", e); } }, true @@ -623,7 +626,7 @@ export class AmqplibInstrumentation extends InstrumentationBase { exchange: string, routingKey: string, channel: InstrumentationPublishChannel, - options?: amqp.Options.Publish + options?: Options.Publish ) { const normalizedExchange = normalizeExchange(exchange); @@ -670,10 +673,10 @@ export class AmqplibInstrumentation extends InstrumentationBase { operation !== EndOperation.ChannelError ? `${operation} called on message${ requeue === true - ? ' with requeue' + ? " with requeue" : requeue === false - ? ' without requeue' - : '' + ? " without requeue" + : "" }` : operation, }); @@ -689,9 +692,9 @@ export class AmqplibInstrumentation extends InstrumentationBase { operation: EndOperation, requeue: boolean | undefined ) { - const spansNotEnded: { msg: amqp.Message }[] = + const spansNotEnded: { msg: Message }[] = channel[CHANNEL_SPANS_NOT_ENDED] ?? []; - spansNotEnded.forEach(msgDetails => { + spansNotEnded.forEach((msgDetails) => { this.endConsumerSpan(msgDetails.msg, isRejected, operation, requeue); }); channel[CHANNEL_SPANS_NOT_ENDED] = []; @@ -699,7 +702,7 @@ export class AmqplibInstrumentation extends InstrumentationBase { private callConsumeEndHook( span: Span, - msg: amqp.ConsumeMessage, + msg: ConsumeMessage, rejected: boolean | null, endOperation: EndOperation ) { @@ -707,9 +710,9 @@ export class AmqplibInstrumentation extends InstrumentationBase { safeExecuteInTheMiddle( () => this._config.consumeEndHook!(span, { msg, rejected, endOperation }), - e => { + (e) => { if (e) { - diag.error('amqplib instrumentation: consumerEndHook error', e); + diag.error("amqplib instrumentation: consumerEndHook error", e); } }, true diff --git a/plugins/node/instrumentation-amqplib/src/internal-types.ts b/plugins/node/instrumentation-amqplib/src/internal-types.ts new file mode 100644 index 0000000000..b646f1b871 --- /dev/null +++ b/plugins/node/instrumentation-amqplib/src/internal-types.ts @@ -0,0 +1,245 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +export namespace Options { + export interface Connect { + /** + * The to be used protocol + * + * Default value: 'amqp' + */ + protocol?: string; + /** + * Hostname used for connecting to the server. + * + * Default value: 'localhost' + */ + hostname?: string; + /** + * Port used for connecting to the server. + * + * Default value: 5672 + */ + port?: number; + /** + * Username used for authenticating against the server. + * + * Default value: 'guest' + */ + username?: string; + /** + * Password used for authenticating against the server. + * + * Default value: 'guest' + */ + password?: string; + /** + * The desired locale for error messages. RabbitMQ only ever uses en_US + * + * Default value: 'en_US' + */ + locale?: string; + /** + * The size in bytes of the maximum frame allowed over the connection. 0 means + * no limit (but since frames have a size field which is an unsigned 32 bit integer, it’s perforce 2^32 - 1). + * + * Default value: 0x1000 (4kb) - That's the allowed minimum, it will fit many purposes + */ + frameMax?: number; + /** + * The period of the connection heartbeat in seconds. + * + * Default value: 0 + */ + heartbeat?: number; + /** + * What VHost shall be used. + * + * Default value: '/' + */ + vhost?: string; + } + + export interface AssertQueue { + exclusive?: boolean; + durable?: boolean; + autoDelete?: boolean; + arguments?: any; + messageTtl?: number; + expires?: number; + deadLetterExchange?: string; + deadLetterRoutingKey?: string; + maxLength?: number; + maxPriority?: number; + } + export interface DeleteQueue { + ifUnused?: boolean; + ifEmpty?: boolean; + } + export interface AssertExchange { + durable?: boolean; + internal?: boolean; + autoDelete?: boolean; + alternateExchange?: string; + arguments?: any; + } + export interface DeleteExchange { + ifUnused?: boolean; + } + export interface Publish { + expiration?: string | number; + userId?: string; + CC?: string | string[]; + + mandatory?: boolean; + persistent?: boolean; + deliveryMode?: boolean | number; + BCC?: string | string[]; + + contentType?: string; + contentEncoding?: string; + headers?: any; + priority?: number; + correlationId?: string; + replyTo?: string; + messageId?: string; + timestamp?: number; + type?: string; + appId?: string; + } + + export interface Consume { + consumerTag?: string; + noLocal?: boolean; + noAck?: boolean; + exclusive?: boolean; + priority?: number; + arguments?: any; + } + + export interface Get { + noAck?: boolean; + } +} + +interface ServerProperties { + host: string; + product: string; + version: string; + platform: string; + copyright?: string; + information: string; + [key: string]: string | undefined; +} + +export namespace Replies { + export interface Empty {} + export interface AssertQueue { + queue: string; + messageCount: number; + consumerCount: number; + } + export interface PurgeQueue { + messageCount: number; + } + export interface DeleteQueue { + messageCount: number; + } + export interface AssertExchange { + exchange: string; + } + export interface Consume { + consumerTag: string; + } +} + +export interface ConfirmChannel { + publish( + exchange: string, + routingKey: string, + content: Buffer, + options?: Options.Publish, + callback?: (err: any, ok: Replies.Empty) => void + ): boolean; + sendToQueue( + queue: string, + content: Buffer, + options?: Options.Publish, + callback?: (err: any, ok: Replies.Empty) => void + ): boolean; + + waitForConfirms(): Promise; +} + +export interface Connection { + close(): Promise; + createChannel(): Promise; + createConfirmChannel(): Promise; + connection: { + serverProperties: ServerProperties; + }; +} + +export interface Message { + content: Buffer; + fields: MessageFields; + properties: MessageProperties; +} + +export interface GetMessage extends Message { + fields: GetMessageFields; +} + +export interface ConsumeMessage extends Message { + fields: ConsumeMessageFields; +} + +export interface CommonMessageFields { + deliveryTag: number; + redelivered: boolean; + exchange: string; + routingKey: string; +} + +export interface MessageFields extends CommonMessageFields { + messageCount?: number; + consumerTag?: string; +} + +export interface GetMessageFields extends CommonMessageFields { + messageCount: number; +} + +export interface ConsumeMessageFields extends CommonMessageFields { + deliveryTag: number; +} + +export interface MessageProperties { + contentType: any | undefined; + contentEncoding: any | undefined; + headers: any; + deliveryMode: any | undefined; + priority: any | undefined; + correlationId: any | undefined; + replyTo: any | undefined; + expiration: any | undefined; + messageId: any | undefined; + timestamp: any | undefined; + type: any | undefined; + userId: any | undefined; + appId: any | undefined; + clusterId: any | undefined; +} diff --git a/plugins/node/instrumentation-amqplib/src/types.ts b/plugins/node/instrumentation-amqplib/src/types.ts index 66c6140eeb..d726bbf325 100644 --- a/plugins/node/instrumentation-amqplib/src/types.ts +++ b/plugins/node/instrumentation-amqplib/src/types.ts @@ -13,16 +13,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import { Span } from '@opentelemetry/api'; -import { InstrumentationConfig } from '@opentelemetry/instrumentation'; -import type * as amqp from 'amqplib'; +import { Span } from "@opentelemetry/api"; +import { InstrumentationConfig } from "@opentelemetry/instrumentation"; +import { ConsumeMessage, Options } from "./internal-types"; export interface PublishInfo { moduleVersion: string | undefined; exchange: string; routingKey: string; content: Buffer; - options?: amqp.Options.Publish; + options?: Options.Publish; isConfirmChannel?: boolean; } @@ -32,11 +32,11 @@ export interface PublishConfirmedInfo extends PublishInfo { export interface ConsumeInfo { moduleVersion: string | undefined; - msg: amqp.ConsumeMessage; + msg: ConsumeMessage; } export interface ConsumeEndInfo { - msg: amqp.ConsumeMessage; + msg: ConsumeMessage; rejected: boolean | null; endOperation: EndOperation; } @@ -58,15 +58,15 @@ export interface AmqplibConsumeEndCustomAttributeFunction { } export enum EndOperation { - AutoAck = 'auto ack', - Ack = 'ack', - AckAll = 'ackAll', - Reject = 'reject', - Nack = 'nack', - NackAll = 'nackAll', - ChannelClosed = 'channel closed', - ChannelError = 'channel error', - InstrumentationTimeout = 'instrumentation timeout', + AutoAck = "auto ack", + Ack = "ack", + AckAll = "ackAll", + Reject = "reject", + Nack = "nack", + NackAll = "nackAll", + ChannelClosed = "channel closed", + ChannelError = "channel error", + InstrumentationTimeout = "instrumentation timeout", } export interface AmqplibInstrumentationConfig extends InstrumentationConfig { From d7b573af9d9e3e9d827bc179391894f31a328cd7 Mon Sep 17 00:00:00 2001 From: Abhijeet Prasad Date: Mon, 9 Jan 2023 15:45:02 +0100 Subject: [PATCH 3/7] refactor(instrumentation-amqplib): revert prettier formatting --- .../instrumentation-amqplib/src/amqplib.ts | 100 +++++++++--------- .../node/instrumentation-amqplib/src/types.ts | 24 ++--- 2 files changed, 62 insertions(+), 62 deletions(-) diff --git a/plugins/node/instrumentation-amqplib/src/amqplib.ts b/plugins/node/instrumentation-amqplib/src/amqplib.ts index fe9c8c954b..0473758f3d 100644 --- a/plugins/node/instrumentation-amqplib/src/amqplib.ts +++ b/plugins/node/instrumentation-amqplib/src/amqplib.ts @@ -22,36 +22,36 @@ import { SpanKind, SpanStatusCode, ROOT_CONTEXT, -} from "@opentelemetry/api"; +} from '@opentelemetry/api'; import { hrTime, hrTimeDuration, hrTimeToMilliseconds, -} from "@opentelemetry/core"; +} from '@opentelemetry/core'; import { InstrumentationBase, InstrumentationNodeModuleDefinition, InstrumentationNodeModuleFile, isWrapped, safeExecuteInTheMiddle, -} from "@opentelemetry/instrumentation"; +} from '@opentelemetry/instrumentation'; import { SemanticAttributes, MessagingOperationValues, MessagingDestinationKindValues, -} from "@opentelemetry/semantic-conventions"; +} from '@opentelemetry/semantic-conventions'; import { Connection, ConsumeMessage, Message, Options, Replies, -} from "./internal-types"; +} from './internal-types'; import { AmqplibInstrumentationConfig, DEFAULT_CONFIG, EndOperation, -} from "./types"; +} from './types'; import { CHANNEL_CONSUME_TIMEOUT_TIMER, CHANNEL_SPANS_NOT_ENDED, @@ -66,15 +66,15 @@ import { MESSAGE_STORED_SPAN, normalizeExchange, unmarkConfirmChannelTracing, -} from "./utils"; -import { VERSION } from "./version"; +} from './utils'; +import { VERSION } from './version'; export class AmqplibInstrumentation extends InstrumentationBase { protected override _config!: AmqplibInstrumentationConfig; constructor(config?: AmqplibInstrumentationConfig) { super( - "@opentelemetry/instrumentation-amqplib", + '@opentelemetry/instrumentation-amqplib', VERSION, Object.assign({}, DEFAULT_CONFIG, config) ); @@ -86,29 +86,29 @@ export class AmqplibInstrumentation extends InstrumentationBase { protected init() { const channelModelModuleFile = new InstrumentationNodeModuleFile( - "amqplib/lib/channel_model.js", - [">=0.5.5"], + 'amqplib/lib/channel_model.js', + ['>=0.5.5'], this.patchChannelModel.bind(this), this.unpatchChannelModel.bind(this) ); const callbackModelModuleFile = new InstrumentationNodeModuleFile( - "amqplib/lib/callback_model.js", - [">=0.5.5"], + 'amqplib/lib/callback_model.js', + ['>=0.5.5'], this.patchChannelModel.bind(this), this.unpatchChannelModel.bind(this) ); const connectModuleFile = new InstrumentationNodeModuleFile( - "amqplib/lib/connect.js", - [">=0.5.5"], + 'amqplib/lib/connect.js', + ['>=0.5.5'], this.patchConnect.bind(this), this.unpatchConnect.bind(this) ); const module = new InstrumentationNodeModuleDefinition( - "amqplib", - [">=0.5.5"], + 'amqplib', + ['>=0.5.5'], undefined, undefined, [channelModelModuleFile, connectModuleFile, callbackModelModuleFile] @@ -119,14 +119,14 @@ export class AmqplibInstrumentation extends InstrumentationBase { private patchConnect(moduleExports: any) { moduleExports = this.unpatchConnect(moduleExports); if (!isWrapped(moduleExports.connect)) { - this._wrap(moduleExports, "connect", this.getConnectPatch.bind(this)); + this._wrap(moduleExports, 'connect', this.getConnectPatch.bind(this)); } return moduleExports; } private unpatchConnect(moduleExports: any) { if (isWrapped(moduleExports.connect)) { - this._unwrap(moduleExports, "connect"); + this._unwrap(moduleExports, 'connect'); } return moduleExports; } @@ -138,63 +138,63 @@ export class AmqplibInstrumentation extends InstrumentationBase { if (!isWrapped(moduleExports.Channel.prototype.publish)) { this._wrap( moduleExports.Channel.prototype, - "publish", + 'publish', this.getPublishPatch.bind(this, moduleVersion) ); } if (!isWrapped(moduleExports.Channel.prototype.consume)) { this._wrap( moduleExports.Channel.prototype, - "consume", + 'consume', this.getConsumePatch.bind(this, moduleVersion) ); } if (!isWrapped(moduleExports.Channel.prototype.ack)) { this._wrap( moduleExports.Channel.prototype, - "ack", + 'ack', this.getAckPatch.bind(this, false, EndOperation.Ack) ); } if (!isWrapped(moduleExports.Channel.prototype.nack)) { this._wrap( moduleExports.Channel.prototype, - "nack", + 'nack', this.getAckPatch.bind(this, true, EndOperation.Nack) ); } if (!isWrapped(moduleExports.Channel.prototype.reject)) { this._wrap( moduleExports.Channel.prototype, - "reject", + 'reject', this.getAckPatch.bind(this, true, EndOperation.Reject) ); } if (!isWrapped(moduleExports.Channel.prototype.ackAll)) { this._wrap( moduleExports.Channel.prototype, - "ackAll", + 'ackAll', this.getAckAllPatch.bind(this, false, EndOperation.AckAll) ); } if (!isWrapped(moduleExports.Channel.prototype.nackAll)) { this._wrap( moduleExports.Channel.prototype, - "nackAll", + 'nackAll', this.getAckAllPatch.bind(this, true, EndOperation.NackAll) ); } if (!isWrapped(moduleExports.Channel.prototype.emit)) { this._wrap( moduleExports.Channel.prototype, - "emit", + 'emit', this.getChannelEmitPatch.bind(this) ); } if (!isWrapped(moduleExports.ConfirmChannel.prototype.publish)) { this._wrap( moduleExports.ConfirmChannel.prototype, - "publish", + 'publish', this.getConfirmedPublishPatch.bind(this, moduleVersion) ); } @@ -203,31 +203,31 @@ export class AmqplibInstrumentation extends InstrumentationBase { private unpatchChannelModel(moduleExports: any) { if (isWrapped(moduleExports.Channel.prototype.publish)) { - this._unwrap(moduleExports.Channel.prototype, "publish"); + this._unwrap(moduleExports.Channel.prototype, 'publish'); } if (isWrapped(moduleExports.Channel.prototype.consume)) { - this._unwrap(moduleExports.Channel.prototype, "consume"); + this._unwrap(moduleExports.Channel.prototype, 'consume'); } if (isWrapped(moduleExports.Channel.prototype.ack)) { - this._unwrap(moduleExports.Channel.prototype, "ack"); + this._unwrap(moduleExports.Channel.prototype, 'ack'); } if (isWrapped(moduleExports.Channel.prototype.nack)) { - this._unwrap(moduleExports.Channel.prototype, "nack"); + this._unwrap(moduleExports.Channel.prototype, 'nack'); } if (isWrapped(moduleExports.Channel.prototype.reject)) { - this._unwrap(moduleExports.Channel.prototype, "reject"); + this._unwrap(moduleExports.Channel.prototype, 'reject'); } if (isWrapped(moduleExports.Channel.prototype.ackAll)) { - this._unwrap(moduleExports.Channel.prototype, "ackAll"); + this._unwrap(moduleExports.Channel.prototype, 'ackAll'); } if (isWrapped(moduleExports.Channel.prototype.nackAll)) { - this._unwrap(moduleExports.Channel.prototype, "nackAll"); + this._unwrap(moduleExports.Channel.prototype, 'nackAll'); } if (isWrapped(moduleExports.Channel.prototype.emit)) { - this._unwrap(moduleExports.Channel.prototype, "emit"); + this._unwrap(moduleExports.Channel.prototype, 'emit'); } if (isWrapped(moduleExports.ConfirmChannel.prototype.publish)) { - this._unwrap(moduleExports.ConfirmChannel.prototype, "publish"); + this._unwrap(moduleExports.ConfirmChannel.prototype, 'publish'); } return moduleExports; } @@ -277,7 +277,7 @@ export class AmqplibInstrumentation extends InstrumentationBase { this: InstrumentationConsumeChannel, eventName: string ) { - if (eventName === "close") { + if (eventName === 'close') { self.endAllSpansOnChannel( this, true, @@ -289,7 +289,7 @@ export class AmqplibInstrumentation extends InstrumentationBase { clearInterval(activeTimer); } this[CHANNEL_CONSUME_TIMEOUT_TIMER] = undefined; - } else if (eventName === "error") { + } else if (eventName === 'error') { self.endAllSpansOnChannel( this, true, @@ -436,7 +436,7 @@ export class AmqplibInstrumentation extends InstrumentationBase { () => self._config.consumeHook!(span, { moduleVersion, msg }), (e) => { if (e) { - diag.error("amqplib instrumentation: consumerHook error", e); + diag.error('amqplib instrumentation: consumerHook error', e); } }, true @@ -501,9 +501,9 @@ export class AmqplibInstrumentation extends InstrumentationBase { options: modifiedOptions, isConfirmChannel: true, }), - (e) => { + e => { if (e) { - diag.error("amqplib instrumentation: publishHook error", e); + diag.error('amqplib instrumentation: publishHook error', e); } }, true @@ -530,10 +530,10 @@ export class AmqplibInstrumentation extends InstrumentationBase { isConfirmChannel: true, confirmError: err, }), - (e) => { + e => { if (e) { diag.error( - "amqplib instrumentation: publishConfirmHook error", + 'amqplib instrumentation: publishConfirmHook error', e ); } @@ -601,9 +601,9 @@ export class AmqplibInstrumentation extends InstrumentationBase { options: modifiedOptions, isConfirmChannel: false, }), - (e) => { + e => { if (e) { - diag.error("amqplib instrumentation: publishHook error", e); + diag.error('amqplib instrumentation: publishHook error', e); } }, true @@ -673,10 +673,10 @@ export class AmqplibInstrumentation extends InstrumentationBase { operation !== EndOperation.ChannelError ? `${operation} called on message${ requeue === true - ? " with requeue" + ? ' with requeue' : requeue === false - ? " without requeue" - : "" + ? ' without requeue' + : '' }` : operation, }); @@ -712,7 +712,7 @@ export class AmqplibInstrumentation extends InstrumentationBase { () => this._config.consumeEndHook!(span, { msg, rejected, endOperation }), (e) => { if (e) { - diag.error("amqplib instrumentation: consumerEndHook error", e); + diag.error('amqplib instrumentation: consumerEndHook error', e); } }, true diff --git a/plugins/node/instrumentation-amqplib/src/types.ts b/plugins/node/instrumentation-amqplib/src/types.ts index d726bbf325..81231620da 100644 --- a/plugins/node/instrumentation-amqplib/src/types.ts +++ b/plugins/node/instrumentation-amqplib/src/types.ts @@ -13,9 +13,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import { Span } from "@opentelemetry/api"; -import { InstrumentationConfig } from "@opentelemetry/instrumentation"; -import { ConsumeMessage, Options } from "./internal-types"; +import { Span } from '@opentelemetry/api'; +import { InstrumentationConfig } from '@opentelemetry/instrumentation'; +import { ConsumeMessage, Options } from './internal-types'; export interface PublishInfo { moduleVersion: string | undefined; @@ -58,15 +58,15 @@ export interface AmqplibConsumeEndCustomAttributeFunction { } export enum EndOperation { - AutoAck = "auto ack", - Ack = "ack", - AckAll = "ackAll", - Reject = "reject", - Nack = "nack", - NackAll = "nackAll", - ChannelClosed = "channel closed", - ChannelError = "channel error", - InstrumentationTimeout = "instrumentation timeout", + AutoAck = 'auto ack', + Ack = 'ack', + AckAll = 'ackAll', + Reject = 'reject', + Nack = 'nack', + NackAll = 'nackAll', + ChannelClosed = 'channel closed', + ChannelError = 'channel error', + InstrumentationTimeout = 'instrumentation timeout', } export interface AmqplibInstrumentationConfig extends InstrumentationConfig { From 657f1696c7d06bfaf4d021854e719037b4668036 Mon Sep 17 00:00:00 2001 From: Abhijeet Prasad Date: Mon, 16 Jan 2023 13:04:59 +0100 Subject: [PATCH 4/7] refactor(instrumentation-amqplib): add code comments for vendoring --- .../instrumentation-amqplib/src/amqplib.ts | 8 +- .../src/internal-types.ts | 230 ---------------- .../node/instrumentation-amqplib/src/types.ts | 257 +++++++++++++++++- 3 files changed, 250 insertions(+), 245 deletions(-) diff --git a/plugins/node/instrumentation-amqplib/src/amqplib.ts b/plugins/node/instrumentation-amqplib/src/amqplib.ts index 0473758f3d..4fcafa1d3f 100644 --- a/plugins/node/instrumentation-amqplib/src/amqplib.ts +++ b/plugins/node/instrumentation-amqplib/src/amqplib.ts @@ -41,16 +41,14 @@ import { MessagingDestinationKindValues, } from '@opentelemetry/semantic-conventions'; import { + AmqplibInstrumentationConfig, Connection, ConsumeMessage, + DEFAULT_CONFIG, + EndOperation, Message, Options, Replies, -} from './internal-types'; -import { - AmqplibInstrumentationConfig, - DEFAULT_CONFIG, - EndOperation, } from './types'; import { CHANNEL_CONSUME_TIMEOUT_TIMER, diff --git a/plugins/node/instrumentation-amqplib/src/internal-types.ts b/plugins/node/instrumentation-amqplib/src/internal-types.ts index b646f1b871..bcd7cadc04 100644 --- a/plugins/node/instrumentation-amqplib/src/internal-types.ts +++ b/plugins/node/instrumentation-amqplib/src/internal-types.ts @@ -13,233 +13,3 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - -export namespace Options { - export interface Connect { - /** - * The to be used protocol - * - * Default value: 'amqp' - */ - protocol?: string; - /** - * Hostname used for connecting to the server. - * - * Default value: 'localhost' - */ - hostname?: string; - /** - * Port used for connecting to the server. - * - * Default value: 5672 - */ - port?: number; - /** - * Username used for authenticating against the server. - * - * Default value: 'guest' - */ - username?: string; - /** - * Password used for authenticating against the server. - * - * Default value: 'guest' - */ - password?: string; - /** - * The desired locale for error messages. RabbitMQ only ever uses en_US - * - * Default value: 'en_US' - */ - locale?: string; - /** - * The size in bytes of the maximum frame allowed over the connection. 0 means - * no limit (but since frames have a size field which is an unsigned 32 bit integer, it’s perforce 2^32 - 1). - * - * Default value: 0x1000 (4kb) - That's the allowed minimum, it will fit many purposes - */ - frameMax?: number; - /** - * The period of the connection heartbeat in seconds. - * - * Default value: 0 - */ - heartbeat?: number; - /** - * What VHost shall be used. - * - * Default value: '/' - */ - vhost?: string; - } - - export interface AssertQueue { - exclusive?: boolean; - durable?: boolean; - autoDelete?: boolean; - arguments?: any; - messageTtl?: number; - expires?: number; - deadLetterExchange?: string; - deadLetterRoutingKey?: string; - maxLength?: number; - maxPriority?: number; - } - export interface DeleteQueue { - ifUnused?: boolean; - ifEmpty?: boolean; - } - export interface AssertExchange { - durable?: boolean; - internal?: boolean; - autoDelete?: boolean; - alternateExchange?: string; - arguments?: any; - } - export interface DeleteExchange { - ifUnused?: boolean; - } - export interface Publish { - expiration?: string | number; - userId?: string; - CC?: string | string[]; - - mandatory?: boolean; - persistent?: boolean; - deliveryMode?: boolean | number; - BCC?: string | string[]; - - contentType?: string; - contentEncoding?: string; - headers?: any; - priority?: number; - correlationId?: string; - replyTo?: string; - messageId?: string; - timestamp?: number; - type?: string; - appId?: string; - } - - export interface Consume { - consumerTag?: string; - noLocal?: boolean; - noAck?: boolean; - exclusive?: boolean; - priority?: number; - arguments?: any; - } - - export interface Get { - noAck?: boolean; - } -} - -interface ServerProperties { - host: string; - product: string; - version: string; - platform: string; - copyright?: string; - information: string; - [key: string]: string | undefined; -} - -export namespace Replies { - export interface Empty {} - export interface AssertQueue { - queue: string; - messageCount: number; - consumerCount: number; - } - export interface PurgeQueue { - messageCount: number; - } - export interface DeleteQueue { - messageCount: number; - } - export interface AssertExchange { - exchange: string; - } - export interface Consume { - consumerTag: string; - } -} - -export interface ConfirmChannel { - publish( - exchange: string, - routingKey: string, - content: Buffer, - options?: Options.Publish, - callback?: (err: any, ok: Replies.Empty) => void - ): boolean; - sendToQueue( - queue: string, - content: Buffer, - options?: Options.Publish, - callback?: (err: any, ok: Replies.Empty) => void - ): boolean; - - waitForConfirms(): Promise; -} - -export interface Connection { - close(): Promise; - createChannel(): Promise; - createConfirmChannel(): Promise; - connection: { - serverProperties: ServerProperties; - }; -} - -export interface Message { - content: Buffer; - fields: MessageFields; - properties: MessageProperties; -} - -export interface GetMessage extends Message { - fields: GetMessageFields; -} - -export interface ConsumeMessage extends Message { - fields: ConsumeMessageFields; -} - -export interface CommonMessageFields { - deliveryTag: number; - redelivered: boolean; - exchange: string; - routingKey: string; -} - -export interface MessageFields extends CommonMessageFields { - messageCount?: number; - consumerTag?: string; -} - -export interface GetMessageFields extends CommonMessageFields { - messageCount: number; -} - -export interface ConsumeMessageFields extends CommonMessageFields { - deliveryTag: number; -} - -export interface MessageProperties { - contentType: any | undefined; - contentEncoding: any | undefined; - headers: any; - deliveryMode: any | undefined; - priority: any | undefined; - correlationId: any | undefined; - replyTo: any | undefined; - expiration: any | undefined; - messageId: any | undefined; - timestamp: any | undefined; - type: any | undefined; - userId: any | undefined; - appId: any | undefined; - clusterId: any | undefined; -} diff --git a/plugins/node/instrumentation-amqplib/src/types.ts b/plugins/node/instrumentation-amqplib/src/types.ts index 81231620da..686bfeab22 100644 --- a/plugins/node/instrumentation-amqplib/src/types.ts +++ b/plugins/node/instrumentation-amqplib/src/types.ts @@ -15,7 +15,6 @@ */ import { Span } from '@opentelemetry/api'; import { InstrumentationConfig } from '@opentelemetry/instrumentation'; -import { ConsumeMessage, Options } from './internal-types'; export interface PublishInfo { moduleVersion: string | undefined; @@ -58,15 +57,15 @@ export interface AmqplibConsumeEndCustomAttributeFunction { } export enum EndOperation { - AutoAck = 'auto ack', - Ack = 'ack', - AckAll = 'ackAll', - Reject = 'reject', - Nack = 'nack', - NackAll = 'nackAll', - ChannelClosed = 'channel closed', - ChannelError = 'channel error', - InstrumentationTimeout = 'instrumentation timeout', + AutoAck = "auto ack", + Ack = "ack", + AckAll = "ackAll", + Reject = "reject", + Nack = "nack", + NackAll = "nackAll", + ChannelClosed = "channel closed", + ChannelError = "channel error", + InstrumentationTimeout = "instrumentation timeout", } export interface AmqplibInstrumentationConfig extends InstrumentationConfig { @@ -102,3 +101,241 @@ export interface AmqplibInstrumentationConfig extends InstrumentationConfig { export const DEFAULT_CONFIG: AmqplibInstrumentationConfig = { consumeTimeoutMs: 1000 * 60, // 1 minute }; + +// The following types are vendored from `@types/amqplib@0.10.1` - commit SHA: 4205e03127692a40b4871709a7134fe4e2ed5510 + +// Vendored from: https://github.com/DefinitelyTyped/DefinitelyTyped/blob/4205e03127692a40b4871709a7134fe4e2ed5510/types/amqplib/properties.d.ts#LL23C18-L23C25 +export namespace Options { + export interface Connect { + /** + * The to be used protocol + * + * Default value: 'amqp' + */ + protocol?: string; + /** + * Hostname used for connecting to the server. + * + * Default value: 'localhost' + */ + hostname?: string; + /** + * Port used for connecting to the server. + * + * Default value: 5672 + */ + port?: number; + /** + * Username used for authenticating against the server. + * + * Default value: 'guest' + */ + username?: string; + /** + * Password used for authenticating against the server. + * + * Default value: 'guest' + */ + password?: string; + /** + * The desired locale for error messages. RabbitMQ only ever uses en_US + * + * Default value: 'en_US' + */ + locale?: string; + /** + * The size in bytes of the maximum frame allowed over the connection. 0 means + * no limit (but since frames have a size field which is an unsigned 32 bit integer, it’s perforce 2^32 - 1). + * + * Default value: 0x1000 (4kb) - That's the allowed minimum, it will fit many purposes + */ + frameMax?: number; + /** + * The period of the connection heartbeat in seconds. + * + * Default value: 0 + */ + heartbeat?: number; + /** + * What VHost shall be used. + * + * Default value: '/' + */ + vhost?: string; + } + + export interface AssertQueue { + exclusive?: boolean; + durable?: boolean; + autoDelete?: boolean; + arguments?: any; + messageTtl?: number; + expires?: number; + deadLetterExchange?: string; + deadLetterRoutingKey?: string; + maxLength?: number; + maxPriority?: number; + } + export interface DeleteQueue { + ifUnused?: boolean; + ifEmpty?: boolean; + } + export interface AssertExchange { + durable?: boolean; + internal?: boolean; + autoDelete?: boolean; + alternateExchange?: string; + arguments?: any; + } + export interface DeleteExchange { + ifUnused?: boolean; + } + export interface Publish { + expiration?: string | number; + userId?: string; + CC?: string | string[]; + + mandatory?: boolean; + persistent?: boolean; + deliveryMode?: boolean | number; + BCC?: string | string[]; + + contentType?: string; + contentEncoding?: string; + headers?: any; + priority?: number; + correlationId?: string; + replyTo?: string; + messageId?: string; + timestamp?: number; + type?: string; + appId?: string; + } + + export interface Consume { + consumerTag?: string; + noLocal?: boolean; + noAck?: boolean; + exclusive?: boolean; + priority?: number; + arguments?: any; + } + + export interface Get { + noAck?: boolean; + } +} + +// Vendored from: https://github.com/DefinitelyTyped/DefinitelyTyped/blob/4205e03127692a40b4871709a7134fe4e2ed5510/types/amqplib/properties.d.ts#L214 +interface ServerProperties { + host: string; + product: string; + version: string; + platform: string; + copyright?: string; + information: string; + [key: string]: string | undefined; +} + +// Vendored from: https://github.com/DefinitelyTyped/DefinitelyTyped/blob/4205e03127692a40b4871709a7134fe4e2ed5510/types/amqplib/properties.d.ts#L1 +export namespace Replies { + export interface Empty {} + export interface AssertQueue { + queue: string; + messageCount: number; + consumerCount: number; + } + export interface PurgeQueue { + messageCount: number; + } + export interface DeleteQueue { + messageCount: number; + } + export interface AssertExchange { + exchange: string; + } + export interface Consume { + consumerTag: string; + } +} + +// Vendored from: https://github.com/DefinitelyTyped/DefinitelyTyped/blob/4205e03127692a40b4871709a7134fe4e2ed5510/types/amqplib/callback_api.d.ts#L55 +export interface ConfirmChannel { + publish( + exchange: string, + routingKey: string, + content: Buffer, + options?: Options.Publish, + callback?: (err: any, ok: Replies.Empty) => void + ): boolean; + sendToQueue( + queue: string, + content: Buffer, + options?: Options.Publish, + callback?: (err: any, ok: Replies.Empty) => void + ): boolean; + + waitForConfirms(): Promise; +} + +// Vendored from: https://github.com/DefinitelyTyped/DefinitelyTyped/blob/4205e03127692a40b4871709a7134fe4e2ed5510/types/amqplib/callback_api.d.ts#L5 +export interface Connection { + close(): Promise; + createChannel(): Promise; + createConfirmChannel(): Promise; + connection: { + serverProperties: ServerProperties; + }; +} + +// Vendored from: https://github.com/DefinitelyTyped/DefinitelyTyped/blob/4205e03127692a40b4871709a7134fe4e2ed5510/types/amqplib/properties.d.ts#L142 +export interface Message { + content: Buffer; + fields: MessageFields; + properties: MessageProperties; +} + +export interface GetMessage extends Message { + fields: GetMessageFields; +} + +export interface ConsumeMessage extends Message { + fields: ConsumeMessageFields; +} + +export interface CommonMessageFields { + deliveryTag: number; + redelivered: boolean; + exchange: string; + routingKey: string; +} + +export interface MessageFields extends CommonMessageFields { + messageCount?: number; + consumerTag?: string; +} + +export interface GetMessageFields extends CommonMessageFields { + messageCount: number; +} + +export interface ConsumeMessageFields extends CommonMessageFields { + deliveryTag: number; +} + +export interface MessageProperties { + contentType: any | undefined; + contentEncoding: any | undefined; + headers: any; + deliveryMode: any | undefined; + priority: any | undefined; + correlationId: any | undefined; + replyTo: any | undefined; + expiration: any | undefined; + messageId: any | undefined; + timestamp: any | undefined; + type: any | undefined; + userId: any | undefined; + appId: any | undefined; + clusterId: any | undefined; +} From 104ff7ee5607395ca4afe524c444ae0bce48cf0f Mon Sep 17 00:00:00 2001 From: Abhijeet Prasad Date: Mon, 16 Jan 2023 13:17:00 +0100 Subject: [PATCH 5/7] refactor(instrumentation-amqplib): delete unused file + run lint fixer --- .../instrumentation-amqplib/src/amqplib.ts | 8 ++++---- .../src/internal-types.ts | 15 --------------- .../node/instrumentation-amqplib/src/types.ts | 18 +++++++++--------- 3 files changed, 13 insertions(+), 28 deletions(-) delete mode 100644 plugins/node/instrumentation-amqplib/src/internal-types.ts diff --git a/plugins/node/instrumentation-amqplib/src/amqplib.ts b/plugins/node/instrumentation-amqplib/src/amqplib.ts index 4fcafa1d3f..e990561df4 100644 --- a/plugins/node/instrumentation-amqplib/src/amqplib.ts +++ b/plugins/node/instrumentation-amqplib/src/amqplib.ts @@ -334,7 +334,7 @@ export class AmqplibInstrumentation extends InstrumentationBase { const spansNotEnded: { msg: Message }[] = channel[CHANNEL_SPANS_NOT_ENDED] ?? []; const msgIndex = spansNotEnded.findIndex( - (msgDetails) => msgDetails.msg === message + msgDetails => msgDetails.msg === message ); if (msgIndex < 0) { // should not happen in happy flow @@ -432,7 +432,7 @@ export class AmqplibInstrumentation extends InstrumentationBase { if (self._config.consumeHook) { safeExecuteInTheMiddle( () => self._config.consumeHook!(span, { moduleVersion, msg }), - (e) => { + e => { if (e) { diag.error('amqplib instrumentation: consumerHook error', e); } @@ -692,7 +692,7 @@ export class AmqplibInstrumentation extends InstrumentationBase { ) { const spansNotEnded: { msg: Message }[] = channel[CHANNEL_SPANS_NOT_ENDED] ?? []; - spansNotEnded.forEach((msgDetails) => { + spansNotEnded.forEach(msgDetails => { this.endConsumerSpan(msgDetails.msg, isRejected, operation, requeue); }); channel[CHANNEL_SPANS_NOT_ENDED] = []; @@ -708,7 +708,7 @@ export class AmqplibInstrumentation extends InstrumentationBase { safeExecuteInTheMiddle( () => this._config.consumeEndHook!(span, { msg, rejected, endOperation }), - (e) => { + e => { if (e) { diag.error('amqplib instrumentation: consumerEndHook error', e); } diff --git a/plugins/node/instrumentation-amqplib/src/internal-types.ts b/plugins/node/instrumentation-amqplib/src/internal-types.ts deleted file mode 100644 index bcd7cadc04..0000000000 --- a/plugins/node/instrumentation-amqplib/src/internal-types.ts +++ /dev/null @@ -1,15 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ diff --git a/plugins/node/instrumentation-amqplib/src/types.ts b/plugins/node/instrumentation-amqplib/src/types.ts index 686bfeab22..3e2220840c 100644 --- a/plugins/node/instrumentation-amqplib/src/types.ts +++ b/plugins/node/instrumentation-amqplib/src/types.ts @@ -57,15 +57,15 @@ export interface AmqplibConsumeEndCustomAttributeFunction { } export enum EndOperation { - AutoAck = "auto ack", - Ack = "ack", - AckAll = "ackAll", - Reject = "reject", - Nack = "nack", - NackAll = "nackAll", - ChannelClosed = "channel closed", - ChannelError = "channel error", - InstrumentationTimeout = "instrumentation timeout", + AutoAck = 'auto ack', + Ack = 'ack', + AckAll = 'ackAll', + Reject = 'reject', + Nack = 'nack', + NackAll = 'nackAll', + ChannelClosed = 'channel closed', + ChannelError = 'channel error', + InstrumentationTimeout = 'instrumentation timeout', } export interface AmqplibInstrumentationConfig extends InstrumentationConfig { From 75566b7041ce6600159c77f802205bd7daca2aef Mon Sep 17 00:00:00 2001 From: Abhijeet Prasad Date: Mon, 16 Jan 2023 17:39:37 +0100 Subject: [PATCH 6/7] refactor(instrumentation-amqplib): fix ts eslint errorsOC --- .../node/instrumentation-amqplib/src/types.ts | 26 +++++++++++-------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/plugins/node/instrumentation-amqplib/src/types.ts b/plugins/node/instrumentation-amqplib/src/types.ts index 3e2220840c..0e5fb3391a 100644 --- a/plugins/node/instrumentation-amqplib/src/types.ts +++ b/plugins/node/instrumentation-amqplib/src/types.ts @@ -13,8 +13,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import { Span } from '@opentelemetry/api'; -import { InstrumentationConfig } from '@opentelemetry/instrumentation'; +import { Span } from "@opentelemetry/api"; +import { InstrumentationConfig } from "@opentelemetry/instrumentation"; export interface PublishInfo { moduleVersion: string | undefined; @@ -57,15 +57,15 @@ export interface AmqplibConsumeEndCustomAttributeFunction { } export enum EndOperation { - AutoAck = 'auto ack', - Ack = 'ack', - AckAll = 'ackAll', - Reject = 'reject', - Nack = 'nack', - NackAll = 'nackAll', - ChannelClosed = 'channel closed', - ChannelError = 'channel error', - InstrumentationTimeout = 'instrumentation timeout', + AutoAck = "auto ack", + Ack = "ack", + AckAll = "ackAll", + Reject = "reject", + Nack = "nack", + NackAll = "nackAll", + ChannelClosed = "channel closed", + ChannelError = "channel error", + InstrumentationTimeout = "instrumentation timeout", } export interface AmqplibInstrumentationConfig extends InstrumentationConfig { @@ -105,6 +105,7 @@ export const DEFAULT_CONFIG: AmqplibInstrumentationConfig = { // The following types are vendored from `@types/amqplib@0.10.1` - commit SHA: 4205e03127692a40b4871709a7134fe4e2ed5510 // Vendored from: https://github.com/DefinitelyTyped/DefinitelyTyped/blob/4205e03127692a40b4871709a7134fe4e2ed5510/types/amqplib/properties.d.ts#LL23C18-L23C25 +// eslint-disable-next-line @typescript-eslint/no-namespace export namespace Options { export interface Connect { /** @@ -238,8 +239,11 @@ interface ServerProperties { } // Vendored from: https://github.com/DefinitelyTyped/DefinitelyTyped/blob/4205e03127692a40b4871709a7134fe4e2ed5510/types/amqplib/properties.d.ts#L1 +// eslint-disable-next-line @typescript-eslint/no-namespace export namespace Replies { + // eslint-disable-next-line @typescript-eslint/no-empty-interface export interface Empty {} + export interface AssertQueue { queue: string; messageCount: number; From c684f93600d8f6f6fc49ba483fbe3f79630d67ae Mon Sep 17 00:00:00 2001 From: Abhijeet Prasad Date: Mon, 16 Jan 2023 18:00:31 +0100 Subject: [PATCH 7/7] refactor(instrumentation-amqplib): fix lint --- .../node/instrumentation-amqplib/src/types.ts | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/plugins/node/instrumentation-amqplib/src/types.ts b/plugins/node/instrumentation-amqplib/src/types.ts index 0e5fb3391a..fb170571dc 100644 --- a/plugins/node/instrumentation-amqplib/src/types.ts +++ b/plugins/node/instrumentation-amqplib/src/types.ts @@ -13,8 +13,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -import { Span } from "@opentelemetry/api"; -import { InstrumentationConfig } from "@opentelemetry/instrumentation"; +import { Span } from '@opentelemetry/api'; +import { InstrumentationConfig } from '@opentelemetry/instrumentation'; export interface PublishInfo { moduleVersion: string | undefined; @@ -57,15 +57,15 @@ export interface AmqplibConsumeEndCustomAttributeFunction { } export enum EndOperation { - AutoAck = "auto ack", - Ack = "ack", - AckAll = "ackAll", - Reject = "reject", - Nack = "nack", - NackAll = "nackAll", - ChannelClosed = "channel closed", - ChannelError = "channel error", - InstrumentationTimeout = "instrumentation timeout", + AutoAck = 'auto ack', + Ack = 'ack', + AckAll = 'ackAll', + Reject = 'reject', + Nack = 'nack', + NackAll = 'nackAll', + ChannelClosed = 'channel closed', + ChannelError = 'channel error', + InstrumentationTimeout = 'instrumentation timeout', } export interface AmqplibInstrumentationConfig extends InstrumentationConfig {