Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(amqplib): use extracted context for message consuming #1354

Merged
2 changes: 1 addition & 1 deletion plugins/node/instrumentation-amqplib/src/amqplib.ts
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ export class AmqplibInstrumentation extends InstrumentationBase<typeof amqp> {
msg[MESSAGE_STORED_SPAN] = span;
}

context.with(trace.setSpan(context.active(), span), () => {
context.with(trace.setSpan(parentContext, span), () => {
onMessage.call(this, msg);
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,35 @@ import {
MessagingDestinationKindValues,
SemanticAttributes,
} from '@opentelemetry/semantic-conventions';
import { context, SpanKind } from '@opentelemetry/api';
import { Baggage, context, propagation, SpanKind } from '@opentelemetry/api';
import { asyncConfirmSend, asyncConsume, shouldTest } from './utils';
import {
censoredUrl,
rabbitMqUrl,
TEST_RABBITMQ_HOST,
TEST_RABBITMQ_PORT,
} from './config';
import {
CompositePropagator,
W3CBaggagePropagator,
W3CTraceContextPropagator,
} from '@opentelemetry/core';

const msgPayload = 'payload from test';
const queueName = 'queue-name-from-unittest';

describe('amqplib instrumentation callback model', () => {
let conn: amqpCallback.Connection;
before(() => {
propagation.setGlobalPropagator(
new CompositePropagator({
propagators: [
new W3CBaggagePropagator(),
new W3CTraceContextPropagator(),
],
})
);
});
before(function (done) {
if (!shouldTest) {
this.skip();
Expand Down Expand Up @@ -186,6 +201,35 @@ describe('amqplib instrumentation callback model', () => {
});
});

it('baggage is available while consuming', done => {
const baggageContext = propagation.setBaggage(
context.active(),
propagation.createBaggage({
key1: { value: 'value1' },
})
);
context.with(baggageContext, () => {
channel.sendToQueue(queueName, Buffer.from(msgPayload));
let extractedBaggage: Baggage | undefined;
asyncConsume(
channel,
queueName,
[
msg => {
extractedBaggage = propagation.getActiveBaggage();
},
],
{
noAck: true,
}
).then(() => {
expect(extractedBaggage).toBeDefined();
expect(extractedBaggage!.getEntry('key1')).toBeDefined();
done();
});
});
});

it('end span with ack sync', done => {
channel.sendToQueue(queueName, Buffer.from(msgPayload));

Expand Down