From 59ee7a1cfa499c10c6fc4dc7973eca436f963dd8 Mon Sep 17 00:00:00 2001 From: Tomasz Pytel Date: Sat, 12 Nov 2022 16:19:05 -0300 Subject: [PATCH] AWS DynamoDB, Lambda, SQS and SNS plugins --- README.md | 14 +- src/Tag.ts | 10 +- src/agent/protocol/grpc/AuthInterceptor.ts | 7 +- .../protocol/grpc/SegmentObjectAdapter.ts | 95 +++----- src/aws/AWSLambdaGatewayAPIHTTP.ts | 5 +- src/aws/AWSLambdaGatewayAPIREST.ts | 5 +- src/aws/AWSLambdaTriggerPlugin.ts | 118 ++++++--- src/aws/SDK2.ts | 230 ++++++++++++++++++ src/config/AgentConfig.ts | 8 +- src/core/PluginInstaller.ts | 198 ++++++++++++++- src/core/SwPlugin.ts | 1 + src/index.ts | 16 +- src/plugins/AMQPLibPlugin.ts | 2 +- src/plugins/AWS2DynamoDBPlugin.ts | 89 +++++++ src/plugins/AWS2LambdaPlugin.ts | 128 ++++++++++ src/plugins/AWS2SNSPlugin.ts | 127 ++++++++++ src/plugins/AWS2SQSPlugin.ts | 218 +++++++++++++++++ src/plugins/AxiosPlugin.ts | 2 +- src/plugins/ExpressPlugin.ts | 4 +- src/plugins/HttpPlugin.ts | 1 + src/plugins/IORedisPlugin.ts | 2 +- src/plugins/MongoDBPlugin.ts | 6 +- src/plugins/MongoosePlugin.ts | 2 +- src/plugins/MySQL2Plugin.ts | 15 +- src/plugins/MySQLPlugin.ts | 2 +- src/plugins/PgPlugin.ts | 4 +- src/trace/context/ContextManager.ts | 14 +- src/trace/context/SpanContext.ts | 23 +- src/trace/span/DummySpan.ts | 6 +- src/trace/span/Span.ts | 17 +- 30 files changed, 1218 insertions(+), 151 deletions(-) create mode 100644 src/aws/SDK2.ts create mode 100644 src/plugins/AWS2DynamoDBPlugin.ts create mode 100644 src/plugins/AWS2LambdaPlugin.ts create mode 100644 src/plugins/AWS2SNSPlugin.ts create mode 100644 src/plugins/AWS2SQSPlugin.ts diff --git a/README.md b/README.md index c3408d5..dedb188 100644 --- a/README.md +++ b/README.md @@ -65,7 +65,9 @@ Environment Variable | Description | Default | `SW_SQL_PARAMETERS_MAX_LENGTH` | The maximum string length of SQL parameters to log | `512` | | `SW_MONGO_TRACE_PARAMETERS` | If set to 'true' then mongodb query parameters will be included | `false` | | `SW_MONGO_PARAMETERS_MAX_LENGTH` | The maximum string length of mongodb parameters to log | `512` | -| `SW_AWSLAMBDA_FLUSH` | If set to 'true' then AWS Lambda functions will flush segment data when the handler finishes. `false` will be more optimal for high throughput applications but may lose spans. | `true` | +| `SW_AWS_LAMBDA_FLUSH` | Maximum number of float seconds allowed to pass between invocations before consecutive Lambda function calls flush automatically upon exit, 0 means always flush, -1 means never. | `2` | +| `SW_AWS_LAMBDA_CHAIN` | Pass trace ID to AWS Lambda function in its parameters (to allow linking). Only use if both caller and callee will be instrumented. | `false` | +| `SW_AWS_SQS_CHECK_BODY` | Incoming SQS messages check inside the body for trace ID in order to allow linking outgoing SNS messages to incoming SQS. | `false` | | `SW_AGENT_MAX_BUFFER_SIZE` | The maximum buffer size before sending the segment data to backend | `'1000'` | Note that the various ignore options like `SW_IGNORE_SUFFIX`, `SW_TRACE_IGNORE_PATH` and `SW_HTTP_IGNORE_METHOD` as well as endpoints which are not recorded due to exceeding `SW_AGENT_MAX_BUFFER_SIZE` all propagate their ignored status downstream to any other endpoints they may call. If that endpoint is running the Node Skywalking agent then regardless of its ignore settings it will not be recorded since its upstream parent was not recorded. This allows the elimination of entire trees of endpoints you are not interested in as well as eliminating partial traces if a span in the chain is ignored but calls out to other endpoints which are recorded as children of ROOT instead of the actual parent. @@ -87,6 +89,10 @@ Library | Plugin Name | [`Mongoose`](https://github.com/Automattic/mongoose) | `mongoose` | | [`RabbitMQ`](https://github.com/squaremo/amqp.node) | `amqplib` | | [`Redis`](https://github.com/luin/ioredis) | `ioredis` | +| [`AWS2DynamoDB](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/DynamoDB/DocumentClient.html) | `aws-sdk` | +| [`AWS2Lambda](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Lambda.html) | `aws-sdk` | +| [`AWS2SNS](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/SNS.html) | `aws-sdk` | +| [`AWS2SQS](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/SQS.html) | `aws-sdk` | ### Compatible Libraries @@ -138,6 +144,12 @@ exports.handler = AWSLambdaGatewayAPIHTTP.wrap(async function (event, context, c This is similar to Azure Functions wrapping, just wrap your handler function with `AWSLambdaTriggerPlugin.wrap()` or `AWSLambdaGatewayAPIHTTP.wrap()` or `AWSLambdaGatewayAPIREST.wrap()`. One thing to note is that AWS freezes processes in between invocations of lambda functions so whether you are doing async or sync handler functions with callbacks, you should make sure everything you need to do finishes before returning control to AWS or calling the synchronous callback. These plugins take this into account and automatically flush the segment buffers before closing a trace span. +## Experimental Webpack Support + +Webpack requires that all imports be statically defined at compile-time and so was not compatible with the dynamic search and loading done by the standard `PluginInstaller`. This has been extended to attempt static imports if the application is determined to be running out of a webpack bundle. This requires that any new plugins be manually added to `PluginInstaller.installBundled()`. Only plugins which allow a `require('module/package.json')` will work with this method as `package.json` needs to be loaded to determine the version of the plugin module present. Some modules specifically disallow import of their package.json and so can not be loaded like this. + +Upon compile with `webpack` it will complain about missing modules for which imports are attempted in the sw agent but which are not present. Simply add these modules to the list of modules to be ignored by webpack, for example by `resolve: {alias: {'module': false}}`. + ## Contact Us * Submit [an issue](https://github.com/apache/skywalking/issues/new) by using [Nodejs] as title prefix. * Mail list: **dev@skywalking.apache.org**. Mail to `dev-subscribe@skywalking.apache.org`, follow the reply to subscribe the mail list. diff --git a/src/Tag.ts b/src/Tag.ts index 8009ce6..2001fc9 100644 --- a/src/Tag.ts +++ b/src/Tag.ts @@ -25,7 +25,7 @@ export interface Tag { export default { coldStartKey: 'coldStart', - httpStatusCodeKey: 'http.status.code', // TODO: maybe find a better place to put these? + httpStatusCodeKey: 'http.status.code', // TODO: maybe find a better place to put these? httpStatusMsgKey: 'http.status.msg', httpURLKey: 'http.url', httpMethodKey: 'http.method', @@ -37,6 +37,7 @@ export default { mqBrokerKey: 'mq.broker', mqTopicKey: 'mq.topic', mqQueueKey: 'mq.queue', + arnKey: 'arn', coldStart(val: boolean = true): Tag { return { @@ -129,4 +130,11 @@ export default { val: `${val}`, } as Tag; }, + arn(val: string | undefined): Tag { + return { + key: this.arnKey, + overridable: true, + val: `${val}`, + } as Tag; + }, }; diff --git a/src/agent/protocol/grpc/AuthInterceptor.ts b/src/agent/protocol/grpc/AuthInterceptor.ts index 424c626..0f5ae4d 100644 --- a/src/agent/protocol/grpc/AuthInterceptor.ts +++ b/src/agent/protocol/grpc/AuthInterceptor.ts @@ -20,11 +20,10 @@ import * as grpc from '@grpc/grpc-js'; import config from '../../../config/AgentConfig'; - -export default function AuthInterceptor() { - const mata = new grpc.Metadata() +export default function AuthInterceptor() { + const mata = new grpc.Metadata(); if (config.authorization) { mata.add('Authentication', config.authorization); - } + } return mata; } diff --git a/src/agent/protocol/grpc/SegmentObjectAdapter.ts b/src/agent/protocol/grpc/SegmentObjectAdapter.ts index 94cc0ee..6559f20 100644 --- a/src/agent/protocol/grpc/SegmentObjectAdapter.ts +++ b/src/agent/protocol/grpc/SegmentObjectAdapter.ts @@ -20,13 +20,7 @@ import config from '../../../config/AgentConfig'; import { KeyStringValuePair } from '../../../proto/common/Common_pb'; import Segment from '../../../trace/context/Segment'; -import { - Log, - RefType, - SegmentObject, - SegmentReference, - SpanObject, -} from '../../../proto/language-agent/Tracing_pb'; +import { Log, RefType, SegmentObject, SegmentReference, SpanObject } from '../../../proto/language-agent/Tracing_pb'; /** * An adapter that adapts {@link Segment} objects to gRPC object {@link SegmentObject}. @@ -35,55 +29,46 @@ export default class SegmentObjectAdapter extends SegmentObject { constructor(segment: Segment) { super(); super - .setService(config.serviceName) - .setServiceinstance(config.serviceInstance) - .setTraceid(segment.relatedTraces[0].toString()) - .setTracesegmentid(segment.segmentId.toString()) - .setSpansList( - segment.spans.map((span) => - new SpanObject() - .setSpanid(span.id) - .setParentspanid(span.parentId) - .setStarttime(span.startTime) - .setEndtime(span.endTime) - .setOperationname(span.operation) - .setPeer(span.peer) - .setSpantype(span.type) - .setSpanlayer(span.layer) - .setComponentid(span.component.id) - .setIserror(span.errored) - .setLogsList( - span.logs.map((log) => - new Log() - .setTime(log.timestamp) - .setDataList( - log.items.map((logItem) => - new KeyStringValuePair() - .setKey(logItem.key) - .setValue(logItem.val)), + .setService(config.serviceName) + .setServiceinstance(config.serviceInstance) + .setTraceid(segment.relatedTraces[0].toString()) + .setTracesegmentid(segment.segmentId.toString()) + .setSpansList( + segment.spans.map((span) => + new SpanObject() + .setSpanid(span.id) + .setParentspanid(span.parentId) + .setStarttime(span.startTime) + .setEndtime(span.endTime) + .setOperationname(span.operation) + .setPeer(span.peer) + .setSpantype(span.type) + .setSpanlayer(span.layer) + .setComponentid(span.component.id) + .setIserror(span.errored) + .setLogsList( + span.logs.map((log) => + new Log() + .setTime(log.timestamp) + .setDataList( + log.items.map((logItem) => new KeyStringValuePair().setKey(logItem.key).setValue(logItem.val)), + ), + ), + ) + .setTagsList(span.tags.map((tag) => new KeyStringValuePair().setKey(tag.key).setValue(tag.val))) + .setRefsList( + span.refs.map((ref) => + new SegmentReference() + .setReftype(RefType.CROSSPROCESS) + .setTraceid(ref.traceId.toString()) + .setParenttracesegmentid(ref.segmentId.toString()) + .setParentspanid(ref.spanId) + .setParentservice(ref.service) + .setParentserviceinstance(ref.serviceInstance) + .setNetworkaddressusedatpeer(ref.clientAddress), + ), ), - ), - ) - .setTagsList( - span.tags.map((tag) => - new KeyStringValuePair() - .setKey(tag.key) - .setValue(tag.val), - ), - ) - .setRefsList( - span.refs.map((ref) => - new SegmentReference() - .setReftype(RefType.CROSSPROCESS) - .setTraceid(ref.traceId.toString()) - .setParenttracesegmentid(ref.segmentId.toString()) - .setParentspanid(ref.spanId) - .setParentservice(ref.service) - .setParentserviceinstance(ref.serviceInstance) - .setNetworkaddressusedatpeer(ref.clientAddress), - ), ), - ), - ); + ); } } diff --git a/src/aws/AWSLambdaGatewayAPIHTTP.ts b/src/aws/AWSLambdaGatewayAPIHTTP.ts index 9422358..33f5868 100644 --- a/src/aws/AWSLambdaGatewayAPIHTTP.ts +++ b/src/aws/AWSLambdaGatewayAPIHTTP.ts @@ -29,7 +29,7 @@ import { ignoreHttpMethodCheck } from '../config/AgentConfig'; import { AWSLambdaTriggerPlugin } from './AWSLambdaTriggerPlugin'; class AWSLambdaGatewayAPIHTTP extends AWSLambdaTriggerPlugin { - start(event: any, context: any): Span { + start(event: any, context: any): [Span, any] { const headers = event.headers; const reqCtx = event.requestContext; const http = reqCtx?.http; @@ -56,7 +56,6 @@ class AWSLambdaGatewayAPIHTTP extends AWSLambdaTriggerPlugin { ? DummySpan.create() : ContextManager.current.newEntrySpan(operation, carrier); - span.layer = SpanLayer.HTTP; span.component = Component.AWSLAMBDA_GATEWAYAPIHTTP; span.peer = http?.sourceIp ?? headers?.['x-forwarded-for'] ?? 'Unknown'; @@ -66,7 +65,7 @@ class AWSLambdaGatewayAPIHTTP extends AWSLambdaTriggerPlugin { span.start(); - return span; + return [span, event]; } stop(span: Span, err: Error | null, res: any): void { diff --git a/src/aws/AWSLambdaGatewayAPIREST.ts b/src/aws/AWSLambdaGatewayAPIREST.ts index 7208133..277e85c 100644 --- a/src/aws/AWSLambdaGatewayAPIREST.ts +++ b/src/aws/AWSLambdaGatewayAPIREST.ts @@ -29,7 +29,7 @@ import { ignoreHttpMethodCheck } from '../config/AgentConfig'; import { AWSLambdaTriggerPlugin } from './AWSLambdaTriggerPlugin'; class AWSLambdaGatewayAPIREST extends AWSLambdaTriggerPlugin { - start(event: any, context: any): Span { + start(event: any, context: any): [Span, any] { const headers = event.headers; const reqCtx = event.requestContext; const method = reqCtx?.httpMethod ?? event.httpMethod; @@ -58,7 +58,6 @@ class AWSLambdaGatewayAPIREST extends AWSLambdaTriggerPlugin { ? DummySpan.create() : ContextManager.current.newEntrySpan(operation, carrier); - span.layer = SpanLayer.HTTP; span.component = Component.AWSLAMBDA_GATEWAYAPIREST; span.peer = reqCtx?.identity?.sourceIp ?? headers?.['X-Forwarded-For'] ?? 'Unknown'; @@ -68,7 +67,7 @@ class AWSLambdaGatewayAPIREST extends AWSLambdaTriggerPlugin { span.start(); - return span; + return [span, event]; } stop(span: Span, err: Error | null, res: any): void { diff --git a/src/aws/AWSLambdaTriggerPlugin.ts b/src/aws/AWSLambdaTriggerPlugin.ts index bd91e8b..c08858b 100644 --- a/src/aws/AWSLambdaTriggerPlugin.ts +++ b/src/aws/AWSLambdaTriggerPlugin.ts @@ -17,23 +17,60 @@ * */ +import { performance } from 'perf_hooks'; import config from '../config/AgentConfig'; +import { ContextCarrier } from '../trace/context/ContextCarrier'; import ContextManager from '../trace/context/ContextManager'; import { Component } from '../trace/Component'; +import Tag from '../Tag'; import Span from '../trace/span/Span'; +import { SpanLayer } from '../proto/language-agent/Tracing_pb'; import { default as agent } from '../index'; +let _lastTimestamp = -Infinity; + +const KeyTrace = '__revdTraceId'; +const KeyParams = '__revdParams'; // original params (if not originally an object) + class AWSLambdaTriggerPlugin { // default working start function, should be overridden by the various types of lambda trigger subclasses - start(event: any, context: any): Span { - const span = ContextManager.current.newEntrySpan(context.functionName ? `/${context.functionName}` : '/'); + start(event: any, context: any): [Span, any] { + let peer = 'Unknown'; + let carrier: ContextCarrier | undefined = undefined; + + if (event && typeof event === 'object') { + // pull traceid out of params if it is in there + let traceId = event[KeyTrace]; + + if (traceId && typeof traceId === 'string') { + const idx = traceId.lastIndexOf('/'); + + if (idx !== -1) { + peer = traceId.slice(idx + 1); + traceId = traceId.slice(0, idx); + carrier = ContextCarrier.from({ sw8: traceId }); + + if (carrier) { + if (!carrier.isValid()) carrier = undefined; + else { + const params = event[KeyParams]; + + if (params !== undefined) event = params; + else delete event[KeyTrace]; + } + } + } + } + } + + const span = ContextManager.current.newEntrySpan('AWS/Lambda/' + (context.functionName || ''), carrier); span.component = Component.AWSLAMBDA_FUNCTION; - span.peer = 'Unknown'; + span.peer = peer; span.start(); - return span; + return [span, event]; } // default working stop function @@ -43,54 +80,69 @@ class AWSLambdaTriggerPlugin { wrap(func: any) { return async (event: any, context: any, callback: any) => { - ContextManager.removeTailFinishedContexts(); // need this because AWS seems to chain sequential independent operations linearly instead of hierarchically + const ts = performance.now() / 1000; - const span = this.start(event, context); - let ret: any; + let done = async (err: Error | null, res?: any) => { + done = async (err: Error | null, res?: any) => res; - let stop = async (err: Error | null, res: any) => { - stop = async (err: Error | null, res: any) => {}; + if (err) span.error(err); this.stop(span, err, res); - if (config.awsLambdaFlush) { - await new Promise((resolve) => setTimeout(resolve, 0)); // child spans of this span may have finalization waiting in the event loop in which case we give them a chance to run so that the segment can be archived properly for flushing + if (config.awsLambdaFlush >= 0) { + if (ts - _lastTimestamp >= config.awsLambdaFlush) { + await new Promise((resolve) => setTimeout(resolve, 0)); // child spans of this span may have finalization waiting in the event loop in which case we give them a chance to run so that the segment can be archived properly for flushing - const p = agent.flush(); // flush all data before aws freezes the process on exit + const p = agent.flush(); // flush all data before aws freezes the process on exit - if (p) await p; + if (p) await p; + } + + _lastTimestamp = performance.now() / 1000; } return res; }; - let resolve: any; - let reject: any; - let callbackDone = false; + let cbdone = (err: Error | null, res?: any): any => { + // for the weird AWS done function behaviors + cbdone = (err: Error | null, res?: any) => ({ finally: () => undefined }); - const callbackPromise = new Promise((_resolve: any, _reject: any) => { - resolve = _resolve; - reject = _reject; - }); + return done(err, res); + }; - try { - ret = func(event, context, (err: Error | null, res: any) => { - if (!callbackDone) { - callbackDone = true; + ContextManager.clearAll(); // need this because AWS seems to chain sequential independent operations linearly instead of hierarchically - if (err) reject(err); - else resolve(res); - } + const _done = context.done; + const [span, _event] = this.start(event, context); + + try { + event = _event; + span.layer = SpanLayer.HTTP; + + if (context.invokedFunctionArn) span.tag(Tag.arn(context.invokedFunctionArn)); + + context.done = (err: Error | null, res: any) => { + cbdone(err, res).finally(() => _done(err, res)); + }; + context.succeed = (res: any) => { + cbdone(null, res).finally(() => _done(null, res)); + }; + context.fail = (err: Error | null) => { + cbdone(err).finally(() => _done(err)); + }; + + let ret = func(event, context, (err: Error | null, res: any) => { + cbdone(err, res).finally(() => callback(err, res)); }); - if (typeof ret?.then !== 'function') + if (typeof ret?.then === 'function') // generic Promise check - ret = callbackPromise; + ret = await ret; - return await stop(null, await ret); + return await done(null, ret); } catch (e) { - span.error(e); - await stop(e, null); + await done(e, null); throw e; } @@ -101,4 +153,4 @@ class AWSLambdaTriggerPlugin { // noinspection JSUnusedGlobalSymbols export default new AWSLambdaTriggerPlugin(); -export { AWSLambdaTriggerPlugin }; +export { AWSLambdaTriggerPlugin, KeyTrace, KeyParams }; diff --git a/src/aws/SDK2.ts b/src/aws/SDK2.ts new file mode 100644 index 0000000..2a8a17d --- /dev/null +++ b/src/aws/SDK2.ts @@ -0,0 +1,230 @@ +/*! + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import Tag from '../Tag'; +import Span from '../trace/span/Span'; +import PluginInstaller from '../core/PluginInstaller'; + +let _AWS: any = null; +let _runTo: any, _send: any, _promise: any; + +// XXX: Special versions of wrapCallback() and wrapPromise() which allows wrapping successful callback. This is used +// when an Exit span is converted to an Entry span on success of getting SQS messages. It does some extra stuff which is +// harmless in this context because it comes from a specialized fork of agent. + +const wrapCallback = (span: Span, callback: any, idxError: any = false, state?: any) => { + return function (this: any, ...args: any[]) { + if (state) { + if (state.stopped) + // don't proc span stuff if span already stopped + return callback.apply(this, args); + + if (!state.noStop) { + state.stopped = true; + + if (state.timeouts) { + for (const timeout of state.timeouts) clearTimeout(timeout); + } + } + } + + span.resync(); + + let ret; + let isErrorOrPostCall = idxError !== false && args[idxError]; + + try { + if (isErrorOrPostCall) span.error(args[idxError]); + else if (state?.beforeCB) { + isErrorOrPostCall = true; + span = state.beforeCB.call(this, span, ...args); + isErrorOrPostCall = false; + + return callback.apply(this, args); + } else isErrorOrPostCall = true; + } catch (err) { + span.error(err); + + throw err; + } finally { + if (!state?.noStop) span.stop(); + else span.async(); + + if (isErrorOrPostCall) ret = callback.apply(this, args); + } + + return ret; + }; +}; + +const wrapPromise = (span: Span, promise: any, state?: any, initialized?: boolean) => { + if (!initialized) { + state = { ...(state || {}), stopped: 0, thend: 0, catched: 0, timeouts: [] }; + + promise = promise.then( + // make sure span ends even if there is no user .then(), .catch() or .finally() + (res: any) => { + state.timeouts.push( + setTimeout(() => { + if (!state.stopped++) { + span.stop(); + } + }), + ); + + return res; + }, + + (err: any) => { + state.timeouts.push( + setTimeout(() => { + if (!state.stopped++) { + span.error(err); + span.stop(); + } + }), + ); + + return Promise.reject(err); + }, + ); + } + + const _then = promise.then; + const _catch = promise.catch; + + Object.defineProperty(promise, 'then', { + configurable: true, + writable: true, + value: function (this: any, ...args: any[]): any { + if (args[0] && !state.thend++) args[0] = wrapCallback(span, args[0], false, state); + if (args[1] && !state.catched++) args[1] = wrapCallback(span, args[1], 0, state); + + const _promise = _then.apply(this, args); + + if (state.thend && state.catched) return _promise; + else return wrapPromise(span, _promise, state, true); + }, + }); + + Object.defineProperty(promise, 'catch', { + configurable: true, + writable: true, + value: function (this: any, err: any): any { + if (!state.catched++) err = wrapCallback(span, err, 0, state); + + const _promise = _catch.call(this, err); + + if (state.thend && state.catched) return _promise; + else return wrapPromise(span, _promise, state, true); + }, + }); + + return promise; +}; + +export function getAWS(installer: PluginInstaller): any { + if (!_AWS) { + _AWS = installer.require?.('aws-sdk') ?? require('aws-sdk'); + _runTo = _AWS.Request.prototype.runTo; + _send = _AWS.Request.prototype.send; + _promise = _AWS.Request.prototype.promise; + } + + return _AWS; +} + +export const execute = ( + span: Span, + _this: any, + func: any, + params: any, + callback: any, + hostTag?: string | null, + beforeCB?: (this: any, span: Span, ...args: any[]) => Span, +) => { + span.start(); + const state = beforeCB ? { beforeCB } : null; + + try { + if (callback) callback = wrapCallback(span, callback, 0, state); + + const req = func.call(_this, params, callback); + + if (hostTag) span.tag((Tag[hostTag as keyof typeof Tag] as any)(req.httpRequest?.endpoint?.href)); + else if (!span.peer && hostTag !== null) span.peer = req.httpRequest?.endpoint?.href; + // span.peer = `${req.httpRequest?.endpoint?.hostname ?? '???'}:${req.httpRequest?.endpoint?.port ?? '???'}`; + + if (!callback) { + req.send = function (send_callback: any) { + if (send_callback) send_callback = callback = wrapCallback(span, send_callback, 0, state); + + _send.call(this, send_callback); + }; + + req.promise = function () { + let ret = _promise.apply(this, arguments); + + if (!callback) { + // we check just in case a .send() was done, which shouldn't be done but users... + callback = true; + ret = wrapPromise( + span, + ret, + // convert from Promise.then(res) success args to aws-sdk2 callback(err, res) success args + beforeCB ? { beforeCB: (span: Span, res: any) => beforeCB(span, null, res) } : null, + ); + } + + return ret; + }; + + req.on('complete', function (res: any) { + if (!callback) { + // we check again because .send() might have introduced a callback + const block = span.resync(); + + if (res.error) span.error(res.error); + + span.stop(); + } + }); + } + + req.runTo = function () { + // we need to resync for this so that the http client picks up our exit span and sees that it inherits from it and doesn't do a whole new span + span.resync(); + + try { + _runTo.apply(this, arguments); + } finally { + span.async(); + } + }; + + span.async(); + + return req; + } catch (e) { + span.error(e); + span.stop(); + + throw e; + } +}; diff --git a/src/config/AgentConfig.ts b/src/config/AgentConfig.ts index dcfc110..ad82b59 100644 --- a/src/config/AgentConfig.ts +++ b/src/config/AgentConfig.ts @@ -35,7 +35,9 @@ export type AgentConfig = { sqlParametersMaxLength?: number; mongoTraceParameters?: boolean; mongoParametersMaxLength?: number; - awsLambdaFlush?: boolean; + awsLambdaFlush?: number; + awsLambdaChain?: boolean; + awsSQSCheckBody?: boolean; // the following is internal state computed from config values reDisablePlugins?: RegExp; reIgnoreOperation?: RegExp; @@ -128,7 +130,9 @@ const _config = { sqlParametersMaxLength: Math.trunc(Math.max(0, Number(process.env.SW_SQL_PARAMETERS_MAX_LENGTH))) || 512, mongoTraceParameters: (process.env.SW_MONGO_TRACE_PARAMETERS || '').toLowerCase() === 'true', mongoParametersMaxLength: Math.trunc(Math.max(0, Number(process.env.SW_MONGO_PARAMETERS_MAX_LENGTH))) || 512, - awsLambdaFlush: (process.env.SW_AWSLAMBDA_FLUSH || 'true').toLowerCase() === 'true', + awsLambdaFlush: ((n) => (Number.isNaN(n) ? -1 : n))(Number(process.env.SW_AWS_LAMBDA_FLUSH || 2)), + awsLambdaChain: (process.env.SW_AWS_LAMBDA_CHAIN || 'false').toLowerCase() === 'true', + awsSQSCheckBody: (process.env.SW_AWS_SQS_CHECK_BODY || 'false').toLowerCase() === 'true', reDisablePlugins: RegExp(''), // temporary placeholder so Typescript doesn't throw a fit reIgnoreOperation: RegExp(''), reHttpIgnoreMethod: RegExp(''), diff --git a/src/core/PluginInstaller.ts b/src/core/PluginInstaller.ts index 58e8e82..fab9b06 100644 --- a/src/core/PluginInstaller.ts +++ b/src/core/PluginInstaller.ts @@ -32,25 +32,24 @@ while (topModule.parent) { topModule = topModule.parent; - if (filename.endsWith('/skywalking-nodejs/lib/index.js')) + if (filename.endsWith('/skywalking-backend-js/lib/index.js')) // stop at the appropriate level in case app is being run by some other framework break; } export default class PluginInstaller { private readonly pluginDir: string; - readonly require: (name: string) => any = topModule.require.bind(topModule); + // if we are running bundled then topModule.require and module.constructor._resolveFilename are undefined (in webpack at least) + readonly require: (name: string) => any = topModule.require?.bind(topModule); readonly resolve = (request: string) => (module.constructor as any)._resolveFilename(request, topModule); constructor() { this.pluginDir = path.resolve(__dirname, '..', 'plugins'); } - private isBuiltIn = (module: string): boolean => this.resolve(module) === module; - private checkModuleVersion = (plugin: SwPlugin): { version: string; isSupported: boolean } => { try { - if (this.isBuiltIn(plugin.module)) { + if (plugin.isBuiltIn) { return { version: '*', isSupported: true, @@ -73,9 +72,8 @@ export default class PluginInstaller { } if (!semver.satisfies(version, plugin.versions)) { - logger.info(`Plugin ${plugin.module} ${version} doesn't satisfy the supported version ${plugin.versions}`); return { - version, + version: version || 'not found,', isSupported: false, }; } @@ -87,7 +85,7 @@ export default class PluginInstaller { isPluginEnabled = (name: string): boolean => !name.match(config.reDisablePlugins); - install(): void { + installNormal(): void { fs.readdirSync(this.pluginDir) .filter((file) => !(file.endsWith('.d.ts') || file.endsWith('.js.map'))) .forEach((file) => { @@ -120,4 +118,188 @@ export default class PluginInstaller { } }); } + + private checkBundledModuleVersion = ( + plugin: SwPlugin, + version: string, + ): { version: string; isSupported: boolean } => { + try { + if (plugin.versions === '!' || plugin.isBuiltIn || version === '*') { + return { + version: '*', + isSupported: true, + }; + } + } catch { + // module not found + return { + version: 'not found,', + isSupported: false, + }; + } + + if (!semver.satisfies(version, plugin.versions)) { + return { + version, + isSupported: false, + }; + } + return { + version, + isSupported: true, + }; + }; + + private installBundledPlugin = (pluginFile: string, plugin: SwPlugin, packageVersion: string) => { + if (pluginFile.match(config.reDisablePlugins)) { + logger.info(`Plugin ${pluginFile} not installed because it is disabled`); + return; + } + + try { + const { isSupported, version } = this.checkBundledModuleVersion(plugin, packageVersion); + + if (!isSupported) { + logger.info(`Plugin ${plugin.module} ${version} doesn't satisfy the supported version ${plugin.versions}`); + return; + } + + if (plugin.versions === '!') { + logger.info(`Explicit instrumentation plugin ${plugin.module} available`); + } else { + logger.info(`Installing plugin ${plugin.module} ${plugin.versions}`); + } + + plugin.install(this); + } catch (e) { + console.error(e); + logger.error(`Error installing plugin ${plugin.module} ${plugin.versions}`); + } + }; + + installBundled(): void { + // XXX: Initial support for running in a bundle, not ideal and doesn't support some plugins but at least it works. + // Webpack does not support dynamic `require(var)`, all imports must be of static form `require('module')`. + + try { + this.installBundledPlugin( + 'AMQPLibPlugin', + require('../plugins/AMQPLibPlugin').default, + require('amqplib/package.json').version, + ); + } catch { + // ESLINT SUCKS! + } + + try { + this.installBundledPlugin( + 'AWS2DynamoDBPlugin', + require('../plugins/AWS2DynamoDBPlugin').default, + require('aws-sdk/package.json').version, + ); + } catch { + // ESLINT SUCKS! + } + + try { + this.installBundledPlugin( + 'AWS2LambdaPlugin', + require('../plugins/AWS2LambdaPlugin').default, + require('aws-sdk/package.json').version, + ); + } catch { + // ESLINT SUCKS! + } + + try { + this.installBundledPlugin( + 'AWS2SNSPlugin', + require('../plugins/AWS2SNSPlugin').default, + require('aws-sdk/package.json').version, + ); + } catch { + // ESLINT SUCKS! + } + + try { + this.installBundledPlugin( + 'AWS2SQSPlugin', + require('../plugins/AWS2SQSPlugin').default, + require('aws-sdk/package.json').version, + ); + } catch { + // ESLINT SUCKS! + } + + // this.installBundledPlugin('AxiosPlugin', require('../plugins/AxiosPlugin').default, require('axios/package.json').version); // this package in all its wisdom disallows import of its package.json where the version number lives + + try { + this.installBundledPlugin( + 'ExpressPlugin', + require('../plugins/ExpressPlugin').default, + require('express/package.json').version, + ); + } catch { + // ESLINT SUCKS! + } + + try { + this.installBundledPlugin('HttpPlugin', require('../plugins/HttpPlugin').default, '*'); + } catch { + // ESLINT SUCKS! + } + + try { + this.installBundledPlugin( + 'IORedisPlugin', + require('../plugins/IORedisPlugin').default, + require('ioredis/package.json').version, + ); + } catch { + // ESLINT SUCKS! + } + + try { + this.installBundledPlugin( + 'MongoDBPlugin', + require('../plugins/MongoDBPlugin').default, + require('mongodb/package.json').version, + ); + } catch { + // ESLINT SUCKS! + } + + try { + this.installBundledPlugin( + 'MongoosePlugin', + require('../plugins/MongoosePlugin').default, + require('mongoose/package.json').version, + ); + } catch { + // ESLINT SUCKS! + } + + // this.installBundledPlugin('MySQL2Plugin', require('../plugins/MySQL2Plugin').default, require('mysql2/package.json').version); // this package in all its wisdom disallows import of its package.json where the version number lives + + try { + this.installBundledPlugin( + 'MySQLPlugin', + require('../plugins/MySQLPlugin').default, + require('mysql/package.json').version, + ); + } catch { + // ESLINT SUCKS! + } + + try { + this.installBundledPlugin('PgPlugin', require('../plugins/PgPlugin').default, require('pg/package.json').version); + } catch { + // ESLINT SUCKS! + } + } + + install(): void { + if (this.require as any) this.installNormal(); + else this.installBundled(); + } } diff --git a/src/core/SwPlugin.ts b/src/core/SwPlugin.ts index 7816fc4..8b26732 100644 --- a/src/core/SwPlugin.ts +++ b/src/core/SwPlugin.ts @@ -24,6 +24,7 @@ import OptionMethods from './OptionMethods'; export default interface SwPlugin extends OptionMethods { readonly module: string; readonly versions: string; + readonly isBuiltIn?: boolean; install(installer: PluginInstaller): void; } diff --git a/src/index.ts b/src/index.ts index 041ee94..a228620 100644 --- a/src/index.ts +++ b/src/index.ts @@ -22,6 +22,7 @@ import Protocol from './agent/protocol/Protocol'; import GrpcProtocol from './agent/protocol/grpc/GrpcProtocol'; import { createLogger } from './logging'; import PluginInstaller from './core/PluginInstaller'; +import SpanContext from './trace/context/SpanContext'; const logger = createLogger(__filename); @@ -57,11 +58,24 @@ class Agent { return null; } - return this.protocol.flush(); + const spanContextFlush = SpanContext.flush(); // if there are spans which haven't finished then wait for them + const protocol = this.protocol; + + if (!spanContextFlush) return protocol.flush(); + + return new Promise((resolve) => { + spanContextFlush.then(() => { + const protocolFlush = protocol.flush(); + + if (!protocolFlush) resolve(null); + else protocolFlush.then(() => resolve(null)); + }); + }); } } export default new Agent(); +export { default as config } from './config/AgentConfig'; export { default as ContextManager } from './trace/context/ContextManager'; export { default as AzureHttpTriggerPlugin } from './azure/AzureHttpTriggerPlugin'; export { default as AWSLambdaTriggerPlugin } from './aws/AWSLambdaTriggerPlugin'; diff --git a/src/plugins/AMQPLibPlugin.ts b/src/plugins/AMQPLibPlugin.ts index f7f6842..fa408f2 100644 --- a/src/plugins/AMQPLibPlugin.ts +++ b/src/plugins/AMQPLibPlugin.ts @@ -30,7 +30,7 @@ class AMQPLibPlugin implements SwPlugin { readonly versions = '*'; install(installer: PluginInstaller): void { - const { BaseChannel } = installer.require('amqplib/lib/channel'); + const { BaseChannel } = installer.require?.('amqplib/lib/channel') ?? require('amqplib/lib/channel'); this.interceptProducer(BaseChannel); this.interceptConsumer(BaseChannel); diff --git a/src/plugins/AWS2DynamoDBPlugin.ts b/src/plugins/AWS2DynamoDBPlugin.ts new file mode 100644 index 0000000..3008555 --- /dev/null +++ b/src/plugins/AWS2DynamoDBPlugin.ts @@ -0,0 +1,89 @@ +/*! + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import SwPlugin from '../core/SwPlugin'; +import ContextManager from '../trace/context/ContextManager'; +import { Component } from '../trace/Component'; +import Tag from '../Tag'; +import { SpanLayer } from '../proto/language-agent/Tracing_pb'; +import PluginInstaller from '../core/PluginInstaller'; +import { getAWS, execute } from '../aws/SDK2'; + +class AWS2DynamoDBPlugin implements SwPlugin { + readonly module = 'aws-sdk'; + readonly versions = '2.*'; + + install(installer: PluginInstaller): void { + const AWS = getAWS(installer); + const DocumentClient = AWS.DynamoDB.DocumentClient; + + function instrument(name: string): void { + const _func = DocumentClient.prototype[name]; + + DocumentClient.prototype[name] = function (params: any, callback?: any): any { + const span = ContextManager.current.newExitSpan(`AWS/DynamoDB/${name}`, Component.POSTGRESQL, Component.HTTP); + + span.component = Component.POSTGRESQL; + span.layer = SpanLayer.DATABASE; + // span.peer = `${this.service.endpoint.host ?? ''}:${this.service.endpoint.port ?? ''}`; + + span.tag(Tag.dbType('DynamoDB')); + span.tag(Tag.dbStatement(name)); + + return execute(span, this, _func, params, callback); + }; + } + + instrument('batchGet'); + instrument('batchWrite'); + instrument('delete'); + instrument('get'); + instrument('put'); + instrument('query'); + instrument('scan'); + instrument('update'); + instrument('transactGet'); + instrument('transactWrite'); + } +} + +// noinspection JSUnusedGlobalSymbols +export default new AWS2DynamoDBPlugin(); + +// // Example code for test maybe: +// const AWS = require("aws-sdk"); + +// AWS.config.update({region: 'your-region'}); + +// const dynamo = new AWS.DynamoDB.DocumentClient(); + +// function callback(err, data) { +// console.log('... callback err:', err); +// console.log('... callback data:', data); +// } + +// const data = {TableName: "table-name", Item: {id: 1, name: 'Bob'}}; + +// dynamo.put(data, callback); +// // OR: +// dynamo.put(data).send(callback); +// // OR: +// dynamo.put(data).promise() +// .then(r => { console.log('... promise res:', r); }) +// .catch(e => { console.log('... promise err:', e); }); diff --git a/src/plugins/AWS2LambdaPlugin.ts b/src/plugins/AWS2LambdaPlugin.ts new file mode 100644 index 0000000..5085481 --- /dev/null +++ b/src/plugins/AWS2LambdaPlugin.ts @@ -0,0 +1,128 @@ +/*! + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { hostname } from 'os'; +import config from '../config/AgentConfig'; +import SwPlugin from '../core/SwPlugin'; +import ContextManager from '../trace/context/ContextManager'; +import { Component } from '../trace/Component'; +import Tag from '../Tag'; +import { SpanLayer } from '../proto/language-agent/Tracing_pb'; +import PluginInstaller from '../core/PluginInstaller'; +import { KeyTrace, KeyParams } from '../aws/AWSLambdaTriggerPlugin'; +import { getAWS, execute } from '../aws/SDK2'; + +class AWS2LambdaPlugin implements SwPlugin { + readonly module = 'aws-sdk'; + readonly versions = '2.*'; + + install(installer: PluginInstaller): void { + const AWS = getAWS(installer); + const _Lambda = AWS.Lambda; + + function Lambda(this: any) { + const lambda = _Lambda.apply(this, arguments); + const _invoke = lambda.invoke; + + lambda.invoke = function (params: any, callback: any) { + if (params.InvocationType === 'DryRun') return _invoke.call(this, params, callback); + + let funcName = params.FunctionName; + const li = funcName.lastIndexOf(':'); + let name; + + if (li === -1) name = funcName; + else { + // my-function:v1 + if (funcName.indexOf(':') === -1) name = funcName.slice(li); + else name = funcName.slice(li + 1); // 123456789012:function:my-function, arn:aws:lambda:us-west-2:123456789012:function:my-function + } + + const span = ContextManager.current.newExitSpan( + `AWS/Lambda/invoke/${name || ''}`, + Component.AWSLAMBDA_FUNCTION, + Component.HTTP, + ); + + span.component = Component.AWSLAMBDA_FUNCTION; + span.layer = SpanLayer.HTTP; + + if (li !== -1) span.tag(Tag.arn(funcName)); + + if (config.awsLambdaChain) { + let payload = params.Payload; + + if (payload instanceof Buffer) payload = payload.toString(); + + if (typeof payload === 'string') { + const traceid = JSON.stringify(`${span.inject().value}/${hostname()}`); + const keyTrace = JSON.stringify(KeyTrace); + const keyParams = JSON.stringify(KeyParams); + + if (payload.match(/^\s*{\s*}\s*$/)) payload = `{${keyTrace}:${traceid}}`; + else if (payload.match(/^\s*{/)) + payload = `{${keyTrace}:${traceid},${payload.slice(payload.indexOf('{') + 1)}`; + else payload = `{${keyTrace}:${traceid},${keyParams}:${payload}}`; + + params = Object.assign({}, params, { Payload: payload }); + } + } + + return execute(span, this, _invoke, params, callback); + }; + + return lambda; + } + + Object.assign(Lambda, _Lambda); + + Lambda.prototype = _Lambda.prototype; + AWS.Lambda = Lambda; + } +} + +// noinspection JSUnusedGlobalSymbols +export default new AWS2LambdaPlugin(); + +// // Example code for test maybe: +// const AWS = require("aws-sdk"); + +// AWS.config.update({region: 'your-region'}); + +// const lambda = new AWS.Lambda(); + +// function callback(err, data) { +// console.log('... callback err:', err); +// console.log('... callback data:', data); +// } + +// const params = { +// FunctionName: 'function_to_call', +// InvocationType: 'RequestResponse', // or 'Event', +// // LogType: 'Tail', +// Payload: JSON.stringify({arg1: 'args to function'}), +// }; + +// lambda.invoke(params, callback); +// // OR: +// lambda.invoke(params).send(callback); +// // OR: +// lambda.invoke(params).promise() +// .then(r => { console.log('... promise res:', r); }) +// .catch(e => { console.log('... promise err:', e); }); diff --git a/src/plugins/AWS2SNSPlugin.ts b/src/plugins/AWS2SNSPlugin.ts new file mode 100644 index 0000000..55df582 --- /dev/null +++ b/src/plugins/AWS2SNSPlugin.ts @@ -0,0 +1,127 @@ +/*! + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { hostname } from 'os'; +import SwPlugin from '../core/SwPlugin'; +import ContextManager from '../trace/context/ContextManager'; +import { Component } from '../trace/Component'; +import Tag from '../Tag'; +import Span from '../trace/span/Span'; +import { SpanLayer } from '../proto/language-agent/Tracing_pb'; +import PluginInstaller from '../core/PluginInstaller'; +import { getAWS, execute } from '../aws/SDK2'; + +class AWS2SNSPlugin implements SwPlugin { + readonly module = 'aws-sdk'; + readonly versions = '2.*'; + + install(installer: PluginInstaller): void { + const AWS = getAWS(installer); + const _SNS = AWS.SNS; + + function SNS(this: any) { + const sns = _SNS.apply(this, arguments); + + function instrument(name: string, addTraceId: any): void { + const _func = sns[name]; + + sns[name] = function (params: any, callback: any) { + const to = params.TopicArn + ? `Topic/${params.TopicArn.slice(params.TopicArn.lastIndexOf(':') + 1)}` + : params.TargetArn + ? `Target/${params.TargetArn.slice(params.TargetArn.lastIndexOf(':') + 1)}` + : params.PhoneNumber + ? `Phone/${params.PhoneNumber}` + : '???'; + const operation = `AWS/SNS/${name}/${to}`; + const span = ContextManager.current.newExitSpan(operation, Component.AWSLAMBDA_FUNCTION, Component.HTTP); + const arn = params.TopicArn || params.TargetArn; + + span.component = Component.AWSLAMBDA_FUNCTION; + span.layer = SpanLayer.MQ; + + if (arn) span.tag(Tag.arn(arn)); + + if (params.TopicArn) params = addTraceId(params, span); + + return execute(span, this, _func, params, callback, 'mqBroker'); + }; + } + + instrument('publish', (params: any, span: Span) => { + params = Object.assign({}, params); + params.MessageAttributes = params.MessageAttributes ? Object.assign({}, params.MessageAttributes) : {}; + params.MessageAttributes.__revdTraceId = { + DataType: 'String', + StringValue: `${span.inject().value}/${hostname()}`, + }; + + return params; + }); + + instrument('publishBatch', (params: any, span: Span) => { + const traceId = { __revdTraceId: { DataType: 'String', StringValue: `${span.inject().value}/${hostname()}` } }; + params = Object.assign({}, params); + params.PublishBatchRequestEntries = params.PublishBatchRequestEntries.map( + (e: any) => + (e = Object.assign({}, e, { + MessageAttributes: e.MessageAttributes ? Object.assign({}, e.MessageAttributes, traceId) : traceId, + })), + ); + + return params; + }); + + return sns; + } + + Object.assign(SNS, _SNS); + + SNS.prototype = _SNS.prototype; + AWS.SNS = SNS; + } +} + +// noinspection JSUnusedGlobalSymbols +export default new AWS2SNSPlugin(); + +// // Example code for test maybe: +// const AWS = require("aws-sdk"); + +// AWS.config.update({region: 'your-region'}); + +// const sns = new AWS.SNS(); + +// function callback(err, data) { +// console.log('... callback err:', err); +// console.log('... callback data:', data); +// } + +// const message = { +// Message: 'MESSAGE_TEXT', /* required */ +// TopicArn: 'topic_arn', /* or other destinations */ +// }; + +// sns.publish(message, callback); +// // OR: +// sns.publish(message).send(callback); +// // OR: +// sns.publish(message).promise() +// .then(r => { console.log('... promise res:', r); }) +// .catch(e => { console.log('... promise err:', e); }); diff --git a/src/plugins/AWS2SQSPlugin.ts b/src/plugins/AWS2SQSPlugin.ts new file mode 100644 index 0000000..078c315 --- /dev/null +++ b/src/plugins/AWS2SQSPlugin.ts @@ -0,0 +1,218 @@ +/*! + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { hostname } from 'os'; +import config from '../config/AgentConfig'; +import SwPlugin from '../core/SwPlugin'; +import { ContextCarrier } from '../trace/context/ContextCarrier'; +import ContextManager from '../trace/context/ContextManager'; +import { Component } from '../trace/Component'; +import Tag from '../Tag'; +import Span from '../trace/span/Span'; +import { SpanLayer } from '../proto/language-agent/Tracing_pb'; +import PluginInstaller from '../core/PluginInstaller'; +import { getAWS, execute } from '../aws/SDK2'; + +class AWS2SQSPlugin implements SwPlugin { + readonly module = 'aws-sdk'; + readonly versions = '2.*'; + + install(installer: PluginInstaller): void { + const AWS = getAWS(installer); + const _SQS = AWS.SQS; + + function SQS(this: any) { + const sqs = _SQS.apply(this, arguments); + + function instrumentSend(name: string, addTraceId: any): void { + const _func = sqs[name]; + + sqs[name] = function (params: any, callback: any) { + const queueUrl = params.QueueUrl; + const operation = `AWS/SQS/${name}/${queueUrl.slice(queueUrl.lastIndexOf('/') + 1)}`; + const span = ContextManager.current.newExitSpan(operation, Component.AWSLAMBDA_FUNCTION, Component.HTTP); + + span.component = Component.AWSLAMBDA_FUNCTION; + span.layer = SpanLayer.MQ; + + return execute(span, this, _func, addTraceId(params, span), callback, 'mqBroker'); + }; + } + + instrumentSend('sendMessage', (params: any, span: Span) => { + params = Object.assign({}, params); + params.MessageAttributes = params.MessageAttributes ? Object.assign({}, params.MessageAttributes) : {}; + params.MessageAttributes.__revdTraceId = { + DataType: 'String', + StringValue: `${span.inject().value}/${hostname()}`, + }; + + return params; + }); + + instrumentSend('sendMessageBatch', (params: any, span: Span) => { + const traceId = { __revdTraceId: { DataType: 'String', StringValue: `${span.inject().value}/${hostname()}` } }; + params = Object.assign({}, params); + params.Entries = params.Entries.map( + (e: any) => + (e = Object.assign({}, e, { + MessageAttributes: e.MessageAttributes ? Object.assign({}, e.MessageAttributes, traceId) : traceId, + })), + ); + + return params; + }); + + const _receiveMessage = sqs.receiveMessage; + + sqs.receiveMessage = function (params: any, callback: any) { + params = Object.assign({}, params); + const _MessageAttributeNames = params.MessageAttributeNames; + params.MessageAttributeNames = _MessageAttributeNames + ? _MessageAttributeNames.concat(['__revdTraceId']) + : ['__revdTraceId']; + + delete params.MaxNumberOfMessages; // limit to 1 message in order to be able to link all Exit and Entry spans + + const queueUrl = params.QueueUrl; + const operation = `AWS/SQS/receiveMessage/${queueUrl.slice(queueUrl.lastIndexOf('/') + 1)}`; + const span = ContextManager.current.newExitSpan( + `${operation}`, + Component.AWSLAMBDA_FUNCTION, + Component.HTTP, + ); + + span.component = Component.AWSLAMBDA_FUNCTION; + span.layer = SpanLayer.MQ; + + // should always be called on success only, with no err + function beforeCB(this: any, span: Span, err: any, res: any): Span { + if (res.Messages?.length) { + const delall = !_MessageAttributeNames || !_MessageAttributeNames.length; + let traceId; + + // should only be 1 + for (let msg of res.Messages) { + if (msg.MessageAttributes !== undefined || !config.awsSQSCheckBody) + traceId = msg.MessageAttributes?.__revdTraceId?.StringValue; + else { + try { + msg = JSON.parse(msg.Body); + traceId = msg.MessageAttributes?.__revdTraceId?.Value; + } catch { + // NOOP + } + } + + if (traceId) { + if (delall) { + delete msg.MD5OfMessageAttributes; + delete msg.MessageAttributes; + } else { + delete msg.MessageAttributes.__revdTraceId; + + if (!Object.keys(msg.MessageAttributes).length) { + delete msg.MD5OfMessageAttributes; + delete msg.MessageAttributes; + } + } + } + } + + let peer = 'Unknown'; + let carrier: ContextCarrier | undefined = undefined; + + if (traceId) { + const idx = traceId.lastIndexOf('/'); + + if (idx !== -1) { + peer = traceId.slice(idx + 1); + traceId = traceId.slice(0, idx); + carrier = ContextCarrier.from({ sw8: traceId }); + } + } + + span.stop(); + + span = ContextManager.current.newEntrySpan(operation, carrier); + + span.component = Component.AWSLAMBDA_FUNCTION; + span.layer = SpanLayer.MQ; + span.peer = peer; + + span.tag(Tag.mqBroker(queueUrl)); + + span.start(); + } + + return span; + } + + return execute(span, this, _receiveMessage, params, callback, 'mqBroker', beforeCB); + }; + + return sqs; + } + + Object.assign(SQS, _SQS); + + SQS.prototype = _SQS.prototype; + AWS.SQS = SQS; + } +} + +// noinspection JSUnusedGlobalSymbols +export default new AWS2SQSPlugin(); + +// // Example code for test maybe: +// const AWS = require("aws-sdk"); + +// AWS.config.update({region: 'your-region'}); + +// const sqs = new AWS.SQS(); + +// function callback(err, data) { +// console.log('... callback err:', err); +// console.log('... callback data:', data); +// } + +// const send = { +// MessageBody: 'Hello World...', /* required */ +// QueueUrl: 'https://queue_url', /* required */ +// }; + +// sqs.sendMessage(send, callback); +// // OR: +// sqs.sendMessage(send).send(callback); +// // OR: +// sqs.sendMessage(send).promise() +// .then(r => { console.log('... promise res:', r); }) +// .catch(e => { console.log('... promise err:', e); }); + +// const recv = { +// QueueUrl: 'https://queue_url', /* required */ +// }; + +// sqs.receiveMessage(recv, callback); +// // OR: +// sqs.receiveMessage(recv).send(callback); +// // OR: +// sqs.receiveMessage(recv).promise() +// .then(r => { console.log('... promise res:', r); }) +// .catch(e => { console.log('... promise err:', e); }); diff --git a/src/plugins/AxiosPlugin.ts b/src/plugins/AxiosPlugin.ts index 2889105..abe06f7 100644 --- a/src/plugins/AxiosPlugin.ts +++ b/src/plugins/AxiosPlugin.ts @@ -36,7 +36,7 @@ class AxiosPlugin implements SwPlugin { } private interceptClientRequest(installer: PluginInstaller): void { - const Axios = installer.require('axios/lib/core/Axios'); + const Axios = installer.require?.('axios/lib/core/Axios') ?? require('axios/lib/core/Axios'); const _request = Axios.prototype.request; Axios.prototype.request = function (url?: any, config?: any) { diff --git a/src/plugins/ExpressPlugin.ts b/src/plugins/ExpressPlugin.ts index cfbd0d2..d8e7651 100644 --- a/src/plugins/ExpressPlugin.ts +++ b/src/plugins/ExpressPlugin.ts @@ -18,7 +18,7 @@ */ import SwPlugin from '../core/SwPlugin'; -import { IncomingMessage, ServerResponse } from 'http'; +import { ServerResponse } from 'http'; import ContextManager from '../trace/context/ContextManager'; import { Component } from '../trace/Component'; import Tag from '../Tag'; @@ -38,7 +38,7 @@ class ExpressPlugin implements SwPlugin { } private interceptServerRequest(installer: PluginInstaller) { - const router = installer.require('express/lib/router'); + const router = installer.require?.('express/lib/router') ?? require('express/lib/router'); const _handle = router.handle; router.handle = function (req: Request, res: ServerResponse, next: any) { diff --git a/src/plugins/HttpPlugin.ts b/src/plugins/HttpPlugin.ts index 858833a..84c80e1 100644 --- a/src/plugins/HttpPlugin.ts +++ b/src/plugins/HttpPlugin.ts @@ -32,6 +32,7 @@ import { ignoreHttpMethodCheck } from '../config/AgentConfig'; class HttpPlugin implements SwPlugin { readonly module = 'http'; readonly versions = '*'; + readonly isBuiltIn = true; install(): void { const http = require('http'); diff --git a/src/plugins/IORedisPlugin.ts b/src/plugins/IORedisPlugin.ts index d7ce3aa..c496857 100644 --- a/src/plugins/IORedisPlugin.ts +++ b/src/plugins/IORedisPlugin.ts @@ -29,7 +29,7 @@ class IORedisPlugin implements SwPlugin { readonly versions = '*'; install(installer: PluginInstaller): void { - const Redis = installer.require('ioredis'); + const Redis = installer.require?.('ioredis') ?? require('ioredis'); this.interceptOperation(Redis, 'sendCommand'); } diff --git a/src/plugins/MongoDBPlugin.ts b/src/plugins/MongoDBPlugin.ts index 8d022fc..a28b0ec 100644 --- a/src/plugins/MongoDBPlugin.ts +++ b/src/plugins/MongoDBPlugin.ts @@ -43,9 +43,9 @@ class MongoDBPlugin implements SwPlugin { install(installer: PluginInstaller): void { const plugin = this; - this.Collection = installer.require('mongodb/lib/collection'); - this.Cursor = installer.require('mongodb/lib/cursor'); - this.Db = installer.require('mongodb/lib/db'); + this.Collection = installer.require?.('mongodb/lib/collection') ?? require('mongodb/lib/collection'); + this.Cursor = installer.require?.('mongodb/lib/cursor') ?? require('mongodb/lib/cursor'); + this.Db = installer.require?.('mongodb/lib/db') ?? require('mongodb/lib/db'); const wrapCallbackWithCursorMaybe = (span: any, args: any[], idx: number): boolean => { const callback = args.length > idx && typeof args[(idx = args.length - 1)] === 'function' ? args[idx] : null; diff --git a/src/plugins/MongoosePlugin.ts b/src/plugins/MongoosePlugin.ts index 52aa349..234abd5 100644 --- a/src/plugins/MongoosePlugin.ts +++ b/src/plugins/MongoosePlugin.ts @@ -30,7 +30,7 @@ class MongoosePlugin implements SwPlugin { mongodbEnabled?: boolean; install(installer: PluginInstaller): void { - const { Model } = installer.require('mongoose'); + const { Model } = installer.require?.('mongoose') ?? require('mongoose'); this.interceptOperation(Model, 'aggregate'); this.interceptOperation(Model, 'bulkWrite'); diff --git a/src/plugins/MySQL2Plugin.ts b/src/plugins/MySQL2Plugin.ts index 1adc5dd..cddebd7 100644 --- a/src/plugins/MySQL2Plugin.ts +++ b/src/plugins/MySQL2Plugin.ts @@ -32,14 +32,19 @@ class MySQL2Plugin implements SwPlugin { readonly versions = '*'; getVersion(installer: PluginInstaller): string { - let indexPath = installer.resolve(this.module); - let packageSJonStr = fs.readFileSync(`${path.dirname(indexPath)}${path.sep}package.json`, { encoding: 'utf-8' }); - const pkg = JSON.parse(packageSJonStr); - return pkg.version; + // TODO: this method will not work in a bundle + try { + let indexPath = installer.resolve(this.module); + let packageSJonStr = fs.readFileSync(`${path.dirname(indexPath)}${path.sep}package.json`, { encoding: 'utf-8' }); + const pkg = JSON.parse(packageSJonStr); + return pkg.version; + } catch { + return ''; + } } install(installer: PluginInstaller): void { - const Connection = installer.require('mysql2').Connection; + const Connection = (installer.require?.('mysql2') ?? require('mysql2')).Connection; const _query = Connection.prototype.query; Connection.prototype.query = function (sql: any, values: any, cb: any) { diff --git a/src/plugins/MySQLPlugin.ts b/src/plugins/MySQLPlugin.ts index 1e08033..23a6b26 100644 --- a/src/plugins/MySQLPlugin.ts +++ b/src/plugins/MySQLPlugin.ts @@ -30,7 +30,7 @@ class MySQLPlugin implements SwPlugin { readonly versions = '*'; install(installer: PluginInstaller): void { - const Connection = installer.require('mysql/lib/Connection'); + const Connection = installer.require?.('mysql/lib/Connection') ?? require('mysql/lib/Connection'); const _query = Connection.prototype.query; Connection.prototype.query = function (sql: any, values: any, cb: any) { diff --git a/src/plugins/PgPlugin.ts b/src/plugins/PgPlugin.ts index 3addc39..757fead 100644 --- a/src/plugins/PgPlugin.ts +++ b/src/plugins/PgPlugin.ts @@ -30,12 +30,12 @@ class MySQLPlugin implements SwPlugin { readonly versions = '*'; install(installer: PluginInstaller): void { - const Client = installer.require('pg/lib/client'); + const Client = installer.require?.('pg/lib/client') ?? require('pg/lib/client'); let Cursor: any; try { - Cursor = installer.require('pg-cursor'); + Cursor = installer.require?.('pg-cursor') ?? require('pg-cursor'); } catch { /* Linter food */ } diff --git a/src/trace/context/ContextManager.ts b/src/trace/context/ContextManager.ts index cb5da3b..2713369 100644 --- a/src/trace/context/ContextManager.ts +++ b/src/trace/context/ContextManager.ts @@ -133,14 +133,12 @@ class ContextManager { if (spans.indexOf(span) === -1) spans.push(span); } - removeTailFinishedContexts(): void { - // XXX: Normally, SpanContexts that finish and send their segments can remain in the span lists of async contexts. - // This is so that if an async child that was spawned by the original span code and is executed after the parent - // finishes and creates its own span can be linked to the parent segment and span correctly. But in some situations - // where successive independent operations are chained linearly instead of hierarchically (AWS Lambda functions), - // this can cause a false reference by a subsequent operation as if it were a child of the finished previous span. - - for (const spans = this.asyncState.spans; spans.length && spans[spans.length - 1].context.finished; spans.pop()); + clearAll(): void { + // This is for situations where successive independent operations are chained linearly instead of hierarchically + // (AWS Lambda functions), this can cause a false reference by a subsequent operation as if it were a child of the + // previous span. + + this.spansDup().splice(0); } withSpan(span: Span, callback: (...args: any[]) => any, ...args: any[]): any { diff --git a/src/trace/context/SpanContext.ts b/src/trace/context/SpanContext.ts index 7d9f9bf..bd19926 100644 --- a/src/trace/context/SpanContext.ts +++ b/src/trace/context/SpanContext.ts @@ -42,6 +42,8 @@ emitter.on('segments-sent', () => { export default class SpanContext implements Context { static nActiveSegments = 0; // counter to allow only config.maxBufferSize active (non-dummy) segments per reporting frame + static nTotalSegments = 0; // counter of total number of unfinished segments + static flushResolve: ((value: unknown) => void)[] = []; // functions to be called on nTotalSegments reaching 0 spanId = 0; nSpans = 0; finished = false; @@ -114,7 +116,7 @@ export default class SpanContext implements Context { !this.finished && parent?.type === SpanType.ENTRY && inherit && - (inherit instanceof Component ? inherit === parent.component : inherit.indexOf(parent.component) != -1) + (inherit instanceof Component ? inherit === parent.component : inherit.indexOf(parent.component) !== -1) ) { span = parent; parent.operation = operation; @@ -173,6 +175,7 @@ export default class SpanContext implements Context { if (!this.nSpans++) { SpanContext.nActiveSegments += 1; + SpanContext.nTotalSegments += 1; span.isCold = ContextManager.checkCold(); if (span.isCold) span.tag(Tag.coldStart(), true); @@ -198,6 +201,12 @@ export default class SpanContext implements Context { emitter.emit('segment-finished', this.segment); + if (!--SpanContext.nTotalSegments && SpanContext.flushResolve.length) { + let resolve: ((value: unknown) => void) | undefined; + + while ((resolve = SpanContext.flushResolve.pop())) resolve(null); + } + return true; } @@ -224,6 +233,18 @@ export default class SpanContext implements Context { ContextManager.restore(span); } + static flush(): Promise | null { + // This function explicitly returns null instead of a resolved Promise in case of nothing to flush so that in this + // case passing control back to the event loop can be avoided. Even a resolved Promise will run other things in + // the event loop when it is awaited and before it continues. + + return !SpanContext.nTotalSegments + ? null + : new Promise((resolve: (value: unknown) => void) => { + SpanContext.flushResolve.push(resolve); + }); + } + traceId(): string { if (!this.segment.relatedTraces) { return 'N/A'; diff --git a/src/trace/span/DummySpan.ts b/src/trace/span/DummySpan.ts index c75f84a..29afff3 100644 --- a/src/trace/span/DummySpan.ts +++ b/src/trace/span/DummySpan.ts @@ -33,13 +33,11 @@ export default class DummySpan extends Span { } start(): any { - if (!this.depth++) - this.context.start(this); + if (!this.depth++) this.context.start(this); } stop(block?: any): void { - if (!--this.depth) - this.context.stop(this); + if (!--this.depth) this.context.stop(this); } async(block?: any): void { diff --git a/src/trace/span/Span.ts b/src/trace/span/Span.ts index e5fc638..743c898 100644 --- a/src/trace/span/Span.ts +++ b/src/trace/span/Span.ts @@ -84,8 +84,7 @@ export default abstract class Span { } stop(): void { - if (--this.depth === 0) - this.context.stop(this); + if (--this.depth === 0) this.context.stop(this); } async(): void { @@ -97,8 +96,7 @@ export default abstract class Span { } finish(segment: Segment): boolean { - if (this.isCold && config.coldEndpoint) - this.operation = this.operation + ''; + if (this.isCold && config.coldEndpoint) this.operation = this.operation + ''; this.endTime = new Date().getTime(); segment.archive(this); @@ -136,10 +134,8 @@ export default abstract class Span { const tagObj = Object.assign({}, tag); - if (!insert) - this.tags.push(tagObj); - else - this.tags.unshift(tagObj) + if (!insert) this.tags.push(tagObj); + else this.tags.unshift(tagObj); return this; } @@ -147,14 +143,15 @@ export default abstract class Span { log(key: string, val: any): this { this.logs.push({ timestamp: new Date().getTime(), - items: [{key, val: `${val}`}] + items: [{ key, val: `${val}` }], } as Log); return this; } error(error: Error): this { - if (error === this.lastError) // don't store duplicate identical error twice + if (error === this.lastError) + // don't store duplicate identical error twice return this; this.errored = true;