diff --git a/plugins/node/instrumentation-amqplib/src/amqplib.ts b/plugins/node/instrumentation-amqplib/src/amqplib.ts index e990561df4..f07663a292 100644 --- a/plugins/node/instrumentation-amqplib/src/amqplib.ts +++ b/plugins/node/instrumentation-amqplib/src/amqplib.ts @@ -452,7 +452,7 @@ export class AmqplibInstrumentation extends InstrumentationBase { msg[MESSAGE_STORED_SPAN] = span; } - context.with(trace.setSpan(context.active(), span), () => { + context.with(trace.setSpan(parentContext, span), () => { onMessage.call(this, msg); }); diff --git a/plugins/node/instrumentation-amqplib/test/amqplib-callbacks.test.ts b/plugins/node/instrumentation-amqplib/test/amqplib-callbacks.test.ts index e9fbdf96e8..9f2ab1d81e 100644 --- a/plugins/node/instrumentation-amqplib/test/amqplib-callbacks.test.ts +++ b/plugins/node/instrumentation-amqplib/test/amqplib-callbacks.test.ts @@ -28,7 +28,7 @@ import { MessagingDestinationKindValues, SemanticAttributes, } from '@opentelemetry/semantic-conventions'; -import { context, SpanKind } from '@opentelemetry/api'; +import { Baggage, context, propagation, SpanKind } from '@opentelemetry/api'; import { asyncConfirmSend, asyncConsume, shouldTest } from './utils'; import { censoredUrl, @@ -36,12 +36,27 @@ import { TEST_RABBITMQ_HOST, TEST_RABBITMQ_PORT, } from './config'; +import { + CompositePropagator, + W3CBaggagePropagator, + W3CTraceContextPropagator, +} from '@opentelemetry/core'; const msgPayload = 'payload from test'; const queueName = 'queue-name-from-unittest'; describe('amqplib instrumentation callback model', () => { let conn: amqpCallback.Connection; + before(() => { + propagation.setGlobalPropagator( + new CompositePropagator({ + propagators: [ + new W3CBaggagePropagator(), + new W3CTraceContextPropagator(), + ], + }) + ); + }); before(function (done) { if (!shouldTest) { this.skip(); @@ -186,6 +201,35 @@ describe('amqplib instrumentation callback model', () => { }); }); + it('baggage is available while consuming', done => { + const baggageContext = propagation.setBaggage( + context.active(), + propagation.createBaggage({ + key1: { value: 'value1' }, + }) + ); + context.with(baggageContext, () => { + channel.sendToQueue(queueName, Buffer.from(msgPayload)); + let extractedBaggage: Baggage | undefined; + asyncConsume( + channel, + queueName, + [ + msg => { + extractedBaggage = propagation.getActiveBaggage(); + }, + ], + { + noAck: true, + } + ).then(() => { + expect(extractedBaggage).toBeDefined(); + expect(extractedBaggage!.getEntry('key1')).toBeDefined(); + done(); + }); + }); + }); + it('end span with ack sync', done => { channel.sendToQueue(queueName, Buffer.from(msgPayload));