Skip to content

Commit

Permalink
AWS DynamoDB, Lambda, SQS and SNS plugins
Browse files Browse the repository at this point in the history
  • Loading branch information
tom-pytel committed Nov 12, 2022
1 parent 80d8148 commit 59ee7a1
Show file tree
Hide file tree
Showing 30 changed files with 1,218 additions and 151 deletions.
14 changes: 13 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ Environment Variable | Description | Default
| `SW_SQL_PARAMETERS_MAX_LENGTH` | The maximum string length of SQL parameters to log | `512` |
| `SW_MONGO_TRACE_PARAMETERS` | If set to 'true' then mongodb query parameters will be included | `false` |
| `SW_MONGO_PARAMETERS_MAX_LENGTH` | The maximum string length of mongodb parameters to log | `512` |
| `SW_AWSLAMBDA_FLUSH` | If set to 'true' then AWS Lambda functions will flush segment data when the handler finishes. `false` will be more optimal for high throughput applications but may lose spans. | `true` |
| `SW_AWS_LAMBDA_FLUSH` | Maximum number of float seconds allowed to pass between invocations before consecutive Lambda function calls flush automatically upon exit, 0 means always flush, -1 means never. | `2` |
| `SW_AWS_LAMBDA_CHAIN` | Pass trace ID to AWS Lambda function in its parameters (to allow linking). Only use if both caller and callee will be instrumented. | `false` |
| `SW_AWS_SQS_CHECK_BODY` | Incoming SQS messages check inside the body for trace ID in order to allow linking outgoing SNS messages to incoming SQS. | `false` |
| `SW_AGENT_MAX_BUFFER_SIZE` | The maximum buffer size before sending the segment data to backend | `'1000'` |

Note that the various ignore options like `SW_IGNORE_SUFFIX`, `SW_TRACE_IGNORE_PATH` and `SW_HTTP_IGNORE_METHOD` as well as endpoints which are not recorded due to exceeding `SW_AGENT_MAX_BUFFER_SIZE` all propagate their ignored status downstream to any other endpoints they may call. If that endpoint is running the Node Skywalking agent then regardless of its ignore settings it will not be recorded since its upstream parent was not recorded. This allows the elimination of entire trees of endpoints you are not interested in as well as eliminating partial traces if a span in the chain is ignored but calls out to other endpoints which are recorded as children of ROOT instead of the actual parent.
Expand All @@ -87,6 +89,10 @@ Library | Plugin Name
| [`Mongoose`](https://github.com/Automattic/mongoose) | `mongoose` |
| [`RabbitMQ`](https://github.com/squaremo/amqp.node) | `amqplib` |
| [`Redis`](https://github.com/luin/ioredis) | `ioredis` |
| [`AWS2DynamoDB](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/DynamoDB/DocumentClient.html) | `aws-sdk` |
| [`AWS2Lambda](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/Lambda.html) | `aws-sdk` |
| [`AWS2SNS](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/SNS.html) | `aws-sdk` |
| [`AWS2SQS](https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/SQS.html) | `aws-sdk` |

### Compatible Libraries

Expand Down Expand Up @@ -138,6 +144,12 @@ exports.handler = AWSLambdaGatewayAPIHTTP.wrap(async function (event, context, c

This is similar to Azure Functions wrapping, just wrap your handler function with `AWSLambdaTriggerPlugin.wrap()` or `AWSLambdaGatewayAPIHTTP.wrap()` or `AWSLambdaGatewayAPIREST.wrap()`. One thing to note is that AWS freezes processes in between invocations of lambda functions so whether you are doing async or sync handler functions with callbacks, you should make sure everything you need to do finishes before returning control to AWS or calling the synchronous callback. These plugins take this into account and automatically flush the segment buffers before closing a trace span.

## Experimental Webpack Support

Webpack requires that all imports be statically defined at compile-time and so was not compatible with the dynamic search and loading done by the standard `PluginInstaller`. This has been extended to attempt static imports if the application is determined to be running out of a webpack bundle. This requires that any new plugins be manually added to `PluginInstaller.installBundled()`. Only plugins which allow a `require('module/package.json')` will work with this method as `package.json` needs to be loaded to determine the version of the plugin module present. Some modules specifically disallow import of their package.json and so can not be loaded like this.

Upon compile with `webpack` it will complain about missing modules for which imports are attempted in the sw agent but which are not present. Simply add these modules to the list of modules to be ignored by webpack, for example by `resolve: {alias: {'module': false}}`.

## Contact Us
* Submit [an issue](https://github.com/apache/skywalking/issues/new) by using [Nodejs] as title prefix.
* Mail list: **[email protected]**. Mail to `[email protected]`, follow the reply to subscribe the mail list.
Expand Down
10 changes: 9 additions & 1 deletion src/Tag.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export interface Tag {

export default {
coldStartKey: 'coldStart',
httpStatusCodeKey: 'http.status.code', // TODO: maybe find a better place to put these?
httpStatusCodeKey: 'http.status.code', // TODO: maybe find a better place to put these?
httpStatusMsgKey: 'http.status.msg',
httpURLKey: 'http.url',
httpMethodKey: 'http.method',
Expand All @@ -37,6 +37,7 @@ export default {
mqBrokerKey: 'mq.broker',
mqTopicKey: 'mq.topic',
mqQueueKey: 'mq.queue',
arnKey: 'arn',

coldStart(val: boolean = true): Tag {
return {
Expand Down Expand Up @@ -129,4 +130,11 @@ export default {
val: `${val}`,
} as Tag;
},
arn(val: string | undefined): Tag {
return {
key: this.arnKey,
overridable: true,
val: `${val}`,
} as Tag;
},
};
7 changes: 3 additions & 4 deletions src/agent/protocol/grpc/AuthInterceptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@
import * as grpc from '@grpc/grpc-js';
import config from '../../../config/AgentConfig';


export default function AuthInterceptor() {
const mata = new grpc.Metadata()
export default function AuthInterceptor() {
const mata = new grpc.Metadata();
if (config.authorization) {
mata.add('Authentication', config.authorization);
}
}
return mata;
}
95 changes: 40 additions & 55 deletions src/agent/protocol/grpc/SegmentObjectAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,7 @@
import config from '../../../config/AgentConfig';
import { KeyStringValuePair } from '../../../proto/common/Common_pb';
import Segment from '../../../trace/context/Segment';
import {
Log,
RefType,
SegmentObject,
SegmentReference,
SpanObject,
} from '../../../proto/language-agent/Tracing_pb';
import { Log, RefType, SegmentObject, SegmentReference, SpanObject } from '../../../proto/language-agent/Tracing_pb';

/**
* An adapter that adapts {@link Segment} objects to gRPC object {@link SegmentObject}.
Expand All @@ -35,55 +29,46 @@ export default class SegmentObjectAdapter extends SegmentObject {
constructor(segment: Segment) {
super();
super
.setService(config.serviceName)
.setServiceinstance(config.serviceInstance)
.setTraceid(segment.relatedTraces[0].toString())
.setTracesegmentid(segment.segmentId.toString())
.setSpansList(
segment.spans.map((span) =>
new SpanObject()
.setSpanid(span.id)
.setParentspanid(span.parentId)
.setStarttime(span.startTime)
.setEndtime(span.endTime)
.setOperationname(span.operation)
.setPeer(span.peer)
.setSpantype(span.type)
.setSpanlayer(span.layer)
.setComponentid(span.component.id)
.setIserror(span.errored)
.setLogsList(
span.logs.map((log) =>
new Log()
.setTime(log.timestamp)
.setDataList(
log.items.map((logItem) =>
new KeyStringValuePair()
.setKey(logItem.key)
.setValue(logItem.val)),
.setService(config.serviceName)
.setServiceinstance(config.serviceInstance)
.setTraceid(segment.relatedTraces[0].toString())
.setTracesegmentid(segment.segmentId.toString())
.setSpansList(
segment.spans.map((span) =>
new SpanObject()
.setSpanid(span.id)
.setParentspanid(span.parentId)
.setStarttime(span.startTime)
.setEndtime(span.endTime)
.setOperationname(span.operation)
.setPeer(span.peer)
.setSpantype(span.type)
.setSpanlayer(span.layer)
.setComponentid(span.component.id)
.setIserror(span.errored)
.setLogsList(
span.logs.map((log) =>
new Log()
.setTime(log.timestamp)
.setDataList(
log.items.map((logItem) => new KeyStringValuePair().setKey(logItem.key).setValue(logItem.val)),
),
),
)
.setTagsList(span.tags.map((tag) => new KeyStringValuePair().setKey(tag.key).setValue(tag.val)))
.setRefsList(
span.refs.map((ref) =>
new SegmentReference()
.setReftype(RefType.CROSSPROCESS)
.setTraceid(ref.traceId.toString())
.setParenttracesegmentid(ref.segmentId.toString())
.setParentspanid(ref.spanId)
.setParentservice(ref.service)
.setParentserviceinstance(ref.serviceInstance)
.setNetworkaddressusedatpeer(ref.clientAddress),
),
),
),
)
.setTagsList(
span.tags.map((tag) =>
new KeyStringValuePair()
.setKey(tag.key)
.setValue(tag.val),
),
)
.setRefsList(
span.refs.map((ref) =>
new SegmentReference()
.setReftype(RefType.CROSSPROCESS)
.setTraceid(ref.traceId.toString())
.setParenttracesegmentid(ref.segmentId.toString())
.setParentspanid(ref.spanId)
.setParentservice(ref.service)
.setParentserviceinstance(ref.serviceInstance)
.setNetworkaddressusedatpeer(ref.clientAddress),
),
),
),
);
);
}
}
5 changes: 2 additions & 3 deletions src/aws/AWSLambdaGatewayAPIHTTP.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import { ignoreHttpMethodCheck } from '../config/AgentConfig';
import { AWSLambdaTriggerPlugin } from './AWSLambdaTriggerPlugin';

class AWSLambdaGatewayAPIHTTP extends AWSLambdaTriggerPlugin {
start(event: any, context: any): Span {
start(event: any, context: any): [Span, any] {
const headers = event.headers;
const reqCtx = event.requestContext;
const http = reqCtx?.http;
Expand All @@ -56,7 +56,6 @@ class AWSLambdaGatewayAPIHTTP extends AWSLambdaTriggerPlugin {
? DummySpan.create()
: ContextManager.current.newEntrySpan(operation, carrier);

span.layer = SpanLayer.HTTP;
span.component = Component.AWSLAMBDA_GATEWAYAPIHTTP;
span.peer = http?.sourceIp ?? headers?.['x-forwarded-for'] ?? 'Unknown';

Expand All @@ -66,7 +65,7 @@ class AWSLambdaGatewayAPIHTTP extends AWSLambdaTriggerPlugin {

span.start();

return span;
return [span, event];
}

stop(span: Span, err: Error | null, res: any): void {
Expand Down
5 changes: 2 additions & 3 deletions src/aws/AWSLambdaGatewayAPIREST.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import { ignoreHttpMethodCheck } from '../config/AgentConfig';
import { AWSLambdaTriggerPlugin } from './AWSLambdaTriggerPlugin';

class AWSLambdaGatewayAPIREST extends AWSLambdaTriggerPlugin {
start(event: any, context: any): Span {
start(event: any, context: any): [Span, any] {
const headers = event.headers;
const reqCtx = event.requestContext;
const method = reqCtx?.httpMethod ?? event.httpMethod;
Expand Down Expand Up @@ -58,7 +58,6 @@ class AWSLambdaGatewayAPIREST extends AWSLambdaTriggerPlugin {
? DummySpan.create()
: ContextManager.current.newEntrySpan(operation, carrier);

span.layer = SpanLayer.HTTP;
span.component = Component.AWSLAMBDA_GATEWAYAPIREST;
span.peer = reqCtx?.identity?.sourceIp ?? headers?.['X-Forwarded-For'] ?? 'Unknown';

Expand All @@ -68,7 +67,7 @@ class AWSLambdaGatewayAPIREST extends AWSLambdaTriggerPlugin {

span.start();

return span;
return [span, event];
}

stop(span: Span, err: Error | null, res: any): void {
Expand Down
Loading

0 comments on commit 59ee7a1

Please sign in to comment.