From 78cd4e118e5a41107d84dfd1ae8c4c28e885b27e Mon Sep 17 00:00:00 2001 From: Michael Haberman Date: Fri, 12 Nov 2021 09:04:45 +0200 Subject: [PATCH] feat: AWS-SDK SNS Context propagation (#728) --- .../README.md | 5 +- .../doc/sns.md | 15 ++ .../package.json | 14 +- .../src/services/MessageAttributes.ts | 75 ++++++++ .../src/services/ServicesExtensions.ts | 2 + .../src/services/sns.ts | 96 +++++++++++ .../src/services/sqs.ts | 50 +----- .../test/sns.test.ts | 162 ++++++++++++++++++ 8 files changed, 363 insertions(+), 56 deletions(-) create mode 100644 plugins/node/opentelemetry-instrumentation-aws-sdk/doc/sns.md create mode 100644 plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/MessageAttributes.ts create mode 100644 plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/sns.ts create mode 100644 plugins/node/opentelemetry-instrumentation-aws-sdk/test/sns.test.ts diff --git a/plugins/node/opentelemetry-instrumentation-aws-sdk/README.md b/plugins/node/opentelemetry-instrumentation-aws-sdk/README.md index 04587729eb..fa6673fee0 100644 --- a/plugins/node/opentelemetry-instrumentation-aws-sdk/README.md +++ b/plugins/node/opentelemetry-instrumentation-aws-sdk/README.md @@ -96,12 +96,9 @@ AWS contains dozens of services accessible with the JS SDK. For many services, t Specific service logic currently implemented for: - [SQS](./docs/sqs.md) +- [SNS](./docs/sns.md) - DynamoDb ---- - -This instrumentation is a work in progress. We implemented some of the specific trace semantics for some of the services, and strive to support more services and extend the already supported services in the future. You can [Open an Issue](https://github.com/aspecto-io/opentelemetry-ext-js/issues), or [Submit a Pull Request](https://github.com/aspecto-io/opentelemetry-ext-js/pulls) if you want to contribute. - ## Potential Side Effects The instrumentation is doing best effort to support the trace specification of OpenTelemetry. For SQS, it involves defining new attributes on the `Messages` array, as well as on the manipulated types generated from this array (to set correct trace context for a single SQS message operation). Those properties are defined as [non-enumerable](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Enumerability_and_ownership_of_properties) properties, so they have minimum side effect on the app. They will, however, show when using the `Object.getOwnPropertyDescriptors` and `Reflect.ownKeys` functions on SQS `Messages` array and for each `Message` in the array. diff --git a/plugins/node/opentelemetry-instrumentation-aws-sdk/doc/sns.md b/plugins/node/opentelemetry-instrumentation-aws-sdk/doc/sns.md new file mode 100644 index 0000000000..5c62318de3 --- /dev/null +++ b/plugins/node/opentelemetry-instrumentation-aws-sdk/doc/sns.md @@ -0,0 +1,15 @@ +# SNS + +SNS is amazon's managed pub/sub system. Thus, it should follow the [OpenTelemetry specification for Messaging systems](https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/trace/semantic_conventions/messaging.md). + +## Specific trace semantic + +The following methods are automatically enhanced: + +### Publish messages + +- [Messaging Attributes](https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/trace/semantic_conventions/messaging.md#messaging-attributes) are added by this instrumentation according to the spec. +- OpenTelemetry trace context is injected as SNS MessageAttributes, so the service receiving the message can link cascading spans to the trace which created the message. + +### Consumers +There are many potential consumers: SQS, Lambda, HTTP/S, Email, SMS, mobile notifications. each one of them will received the propagated context in its own way. diff --git a/plugins/node/opentelemetry-instrumentation-aws-sdk/package.json b/plugins/node/opentelemetry-instrumentation-aws-sdk/package.json index ddbb0a3429..8aff4c465c 100644 --- a/plugins/node/opentelemetry-instrumentation-aws-sdk/package.json +++ b/plugins/node/opentelemetry-instrumentation-aws-sdk/package.json @@ -60,21 +60,23 @@ "@opentelemetry/sdk-trace-base": "1.0.0", "@types/mocha": "^8.2.2", "@types/node": "^14.0.0", + "@types/sinon": "^10.0.6", "aws-sdk": "2.1008.0", + "eslint": "^7.32.0", "expect": "^25", "mocha": "7.2.0", - "ts-mocha": "8.0.0", "nock": "^13.0.11", + "nyc": "^15.1.0", + "rimraf": "^3.0.2", + "sinon": "^12.0.0", "gts": "3.1.0", "@opentelemetry/contrib-test-utils": "^0.27.0", "test-all-versions": "^5.0.1", + "ts-mocha": "8.0.0", "ts-node": "^9.1.1", - "typescript": "4.3.4", - "eslint": "^7.32.0", - "nyc": "^15.1.0", - "rimraf": "^3.0.2" + "typescript": "4.3.4" }, "engines": { "node": ">=8.5.0" } -} \ No newline at end of file +} diff --git a/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/MessageAttributes.ts b/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/MessageAttributes.ts new file mode 100644 index 0000000000..67b864daa1 --- /dev/null +++ b/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/MessageAttributes.ts @@ -0,0 +1,75 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import { + TextMapGetter, + TextMapSetter, + context, + propagation, + diag, +} from '@opentelemetry/api'; +import type { SQS, SNS } from 'aws-sdk'; + +// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-quotas.html +export const MAX_MESSAGE_ATTRIBUTES = 10; +class ContextSetter + implements + TextMapSetter +{ + set( + carrier: SQS.MessageBodyAttributeMap | SNS.MessageAttributeMap, + key: string, + value: string + ) { + carrier[key] = { + DataType: 'String', + StringValue: value as string, + }; + } +} +export const contextSetter = new ContextSetter(); + +class ContextGetter + implements + TextMapGetter +{ + keys( + carrier: SQS.MessageBodyAttributeMap | SNS.MessageAttributeMap + ): string[] { + return Object.keys(carrier); + } + + get( + carrier: SQS.MessageBodyAttributeMap | SNS.MessageAttributeMap, + key: string + ): undefined | string | string[] { + return carrier?.[key]?.StringValue; + } +} +export const contextGetter = new ContextGetter(); + +export const injectPropagationContext = ( + attributesMap?: SQS.MessageBodyAttributeMap | SNS.MessageAttributeMap +): SQS.MessageBodyAttributeMap | SNS.MessageAttributeMap => { + const attributes = attributesMap ?? {}; + if (Object.keys(attributes).length < MAX_MESSAGE_ATTRIBUTES) { + propagation.inject(context.active(), attributes, contextSetter); + } else { + diag.warn( + 'aws-sdk instrumentation: cannot set context propagation on SQS/SNS message due to maximum amount of MessageAttributes' + ); + } + return attributes; +}; diff --git a/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/ServicesExtensions.ts b/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/ServicesExtensions.ts index 48c1e389b4..6796783013 100644 --- a/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/ServicesExtensions.ts +++ b/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/ServicesExtensions.ts @@ -22,12 +22,14 @@ import { NormalizedResponse, } from '../types'; import { DynamodbServiceExtension } from './dynamodb'; +import { SnsServiceExtension } from './sns'; export class ServicesExtensions implements ServiceExtension { services: Map = new Map(); constructor() { this.services.set('SQS', new SqsServiceExtension()); + this.services.set('SNS', new SnsServiceExtension()); this.services.set('DynamoDB', new DynamodbServiceExtension()); } diff --git a/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/sns.ts b/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/sns.ts new file mode 100644 index 0000000000..61b2effce5 --- /dev/null +++ b/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/sns.ts @@ -0,0 +1,96 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import { Span, Tracer, SpanKind } from '@opentelemetry/api'; +import { + MessagingDestinationKindValues, + SemanticAttributes, +} from '@opentelemetry/semantic-conventions'; +import { + NormalizedRequest, + NormalizedResponse, + AwsSdkInstrumentationConfig, +} from '../types'; +import { injectPropagationContext } from './MessageAttributes'; +import { RequestMetadata, ServiceExtension } from './ServiceExtension'; + +export class SnsServiceExtension implements ServiceExtension { + requestPreSpanHook(request: NormalizedRequest): RequestMetadata { + let spanKind: SpanKind = SpanKind.CLIENT; + let spanName = `SNS ${request.commandName}`; + const spanAttributes = { + [SemanticAttributes.MESSAGING_SYSTEM]: 'aws.sns', + }; + + if (request.commandName === 'Publish') { + spanKind = SpanKind.PRODUCER; + + spanAttributes[SemanticAttributes.MESSAGING_DESTINATION_KIND] = + MessagingDestinationKindValues.TOPIC; + const { TopicArn, TargetArn, PhoneNumber } = request.commandInput; + spanAttributes[SemanticAttributes.MESSAGING_DESTINATION] = + this.extractDestinationName(TopicArn, TargetArn, PhoneNumber); + + spanName = `${spanAttributes[SemanticAttributes.MESSAGING_DESTINATION]} ${ + request.commandName + }`; + } + + return { + isIncoming: false, + spanAttributes, + spanKind, + spanName, + }; + } + + requestPostSpanHook(request: NormalizedRequest): void { + if (request.commandName === 'Publish') { + const origMessageAttributes = + request.commandInput['MessageAttributes'] ?? {}; + if (origMessageAttributes) { + request.commandInput['MessageAttributes'] = injectPropagationContext( + origMessageAttributes + ); + } + } + } + + responseHook( + response: NormalizedResponse, + span: Span, + tracer: Tracer, + config: AwsSdkInstrumentationConfig + ): void {} + + extractDestinationName( + topicArn: string, + targetArn: string, + phoneNumber: string + ): string { + if (topicArn || targetArn) { + const arn = topicArn ?? targetArn; + try { + return arn.substr(arn.lastIndexOf(':') + 1); + } catch (err) { + return arn; + } + } else if (phoneNumber) { + return phoneNumber; + } else { + return 'unknown'; + } + } +} diff --git a/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/sqs.ts b/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/sqs.ts index 9cbc63fcf8..d2c06cf640 100644 --- a/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/sqs.ts +++ b/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/sqs.ts @@ -18,9 +18,6 @@ import { SpanKind, Span, propagation, - diag, - TextMapGetter, - TextMapSetter, trace, context, ROOT_CONTEXT, @@ -37,32 +34,7 @@ import { MessagingDestinationKindValues, SemanticAttributes, } from '@opentelemetry/semantic-conventions'; - -// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-quotas.html -const SQS_MAX_MESSAGE_ATTRIBUTES = 10; -class SqsContextSetter implements TextMapSetter { - set(carrier: SQS.MessageBodyAttributeMap, key: string, value: string) { - carrier[key] = { - DataType: 'String', - StringValue: value as string, - }; - } -} -const sqsContextSetter = new SqsContextSetter(); - -class SqsContextGetter implements TextMapGetter { - keys(carrier: SQS.MessageBodyAttributeMap): string[] { - return Object.keys(carrier); - } - - get( - carrier: SQS.MessageBodyAttributeMap, - key: string - ): undefined | string | string[] { - return carrier?.[key]?.StringValue; - } -} -const sqsContextGetter = new SqsContextGetter(); +import { contextGetter, injectPropagationContext } from './MessageAttributes'; export class SqsServiceExtension implements ServiceExtension { requestPreSpanHook(request: NormalizedRequest): RequestMetadata { @@ -118,7 +90,7 @@ export class SqsServiceExtension implements ServiceExtension { request.commandInput['MessageAttributes'] ?? {}; if (origMessageAttributes) { request.commandInput['MessageAttributes'] = - this.InjectPropagationContext(origMessageAttributes); + injectPropagationContext(origMessageAttributes); } } break; @@ -127,7 +99,7 @@ export class SqsServiceExtension implements ServiceExtension { { request.commandInput?.Entries?.forEach( (messageParams: SQS.SendMessageBatchRequestEntry) => { - messageParams.MessageAttributes = this.InjectPropagationContext( + messageParams.MessageAttributes = injectPropagationContext( messageParams.MessageAttributes ?? {} ); } @@ -157,7 +129,7 @@ export class SqsServiceExtension implements ServiceExtension { parentContext: propagation.extract( ROOT_CONTEXT, message.MessageAttributes, - sqsContextGetter + contextGetter ), attributes: { [SemanticAttributes.MESSAGING_SYSTEM]: 'aws.sqs', @@ -193,18 +165,4 @@ export class SqsServiceExtension implements ServiceExtension { return segments[segments.length - 1]; }; - - InjectPropagationContext( - attributesMap?: SQS.MessageBodyAttributeMap - ): SQS.MessageBodyAttributeMap { - const attributes = attributesMap ?? {}; - if (Object.keys(attributes).length < SQS_MAX_MESSAGE_ATTRIBUTES) { - propagation.inject(context.active(), attributes, sqsContextSetter); - } else { - diag.warn( - 'aws-sdk instrumentation: cannot set context propagation on SQS message due to maximum amount of MessageAttributes' - ); - } - return attributes; - } } diff --git a/plugins/node/opentelemetry-instrumentation-aws-sdk/test/sns.test.ts b/plugins/node/opentelemetry-instrumentation-aws-sdk/test/sns.test.ts new file mode 100644 index 0000000000..d667d9bd37 --- /dev/null +++ b/plugins/node/opentelemetry-instrumentation-aws-sdk/test/sns.test.ts @@ -0,0 +1,162 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import { AwsInstrumentation } from '../src'; +import { + getTestSpans, + registerInstrumentationTesting, +} from '@opentelemetry/contrib-test-utils'; +const instrumentation = registerInstrumentationTesting( + new AwsInstrumentation() +); +import * as AWS from 'aws-sdk'; + +import { mockV2AwsSend } from './testing-utils'; +import * as expect from 'expect'; +import { ReadableSpan } from '@opentelemetry/sdk-trace-base'; +import * as sinon from 'sinon'; +import { + MessagingDestinationKindValues, + SemanticAttributes, +} from '@opentelemetry/semantic-conventions'; +import { SpanKind } from '@opentelemetry/api'; + +const responseMockSuccess = { + requestId: '0000000000000', + error: null, +}; + +const topicName = 'topic'; +const fakeARN = `arn:aws:sns:region:000000000:${topicName}`; + +describe('SNS', () => { + before(() => { + AWS.config.credentials = { + accessKeyId: 'test key id', + expired: false, + expireTime: new Date(), + secretAccessKey: 'test acc key', + sessionToken: 'test token', + }; + }); + + beforeEach(() => { + mockV2AwsSend(responseMockSuccess, { + MessageId: '1', + } as AWS.SNS.Types.PublishResponse); + }); + + describe('publish', () => { + it('topic arn', async () => { + const sns = new AWS.SNS(); + + await sns + .publish({ + Message: 'sns message', + TopicArn: fakeARN, + }) + .promise(); + + const publishSpans = getTestSpans().filter( + (s: ReadableSpan) => s.name === `${topicName} Publish` + ); + expect(publishSpans.length).toBe(1); + + const publishSpan = publishSpans[0]; + expect( + publishSpan.attributes[SemanticAttributes.MESSAGING_DESTINATION_KIND] + ).toBe(MessagingDestinationKindValues.TOPIC); + expect( + publishSpan.attributes[SemanticAttributes.MESSAGING_DESTINATION] + ).toBe(topicName); + expect(publishSpan.attributes[SemanticAttributes.RPC_METHOD]).toBe( + 'Publish' + ); + expect(publishSpan.attributes[SemanticAttributes.MESSAGING_SYSTEM]).toBe( + 'aws.sns' + ); + expect(publishSpan.kind).toBe(SpanKind.PRODUCER); + }); + + it('phone number', async () => { + const sns = new AWS.SNS(); + const PhoneNumber = 'my phone number'; + await sns + .publish({ + Message: 'sns message', + PhoneNumber, + }) + .promise(); + + const publishSpans = getTestSpans().filter( + (s: ReadableSpan) => s.name === `${PhoneNumber} Publish` + ); + expect(publishSpans.length).toBe(1); + const publishSpan = publishSpans[0]; + expect( + publishSpan.attributes[SemanticAttributes.MESSAGING_DESTINATION] + ).toBe(PhoneNumber); + }); + + it('inject context propagation', async () => { + const sns = new AWS.SNS(); + const hookSpy = sinon.spy( + (instrumentation['servicesExtensions'] as any)['services'].get('SNS'), + 'requestPostSpanHook' + ); + + await sns + .publish({ + Message: 'sns message', + TopicArn: fakeARN, + }) + .promise(); + + const publishSpans = getTestSpans().filter( + (s: ReadableSpan) => s.name === `${topicName} Publish` + ); + expect(publishSpans.length).toBe(1); + expect( + hookSpy.args[0][0].commandInput.MessageAttributes.traceparent + ).toBeDefined(); + }); + }); + + describe('createTopic', () => { + it('basic createTopic creates a valid span', async () => { + const sns = new AWS.SNS(); + + const Name = 'my new topic'; + await sns.createTopic({ Name }).promise(); + + const spans = getTestSpans(); + const createTopicSpans = spans.filter( + (s: ReadableSpan) => s.name === 'SNS CreateTopic' + ); + expect(createTopicSpans.length).toBe(1); + + const createTopicSpan = createTopicSpans[0]; + expect( + createTopicSpan.attributes[ + SemanticAttributes.MESSAGING_DESTINATION_KIND + ] + ).toBeUndefined(); + expect( + createTopicSpan.attributes[SemanticAttributes.MESSAGING_DESTINATION] + ).toBeUndefined(); + expect(createTopicSpan.kind).toBe(SpanKind.CLIENT); + }); + }); +});