Skip to content

Commit

Permalink
experimental AWS Lambda Function support (#70)
Browse files Browse the repository at this point in the history
  • Loading branch information
tom-pytel authored Feb 22, 2022
1 parent 8045364 commit b29d5ea
Show file tree
Hide file tree
Showing 13 changed files with 419 additions and 86 deletions.
3 changes: 3 additions & 0 deletions .eslintrc.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,8 @@ module.exports = {
},
env: {
node: true
},
globals: {
"NodeJS": true
}
};
21 changes: 20 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ Environment Variable | Description | Default
| `SW_AGENT_AUTHENTICATION` | The authentication token to verify that the agent is trusted by the backend OAP, as for how to configure the backend, refer to [the yaml](https://github.com/apache/skywalking/blob/4f0f39ffccdc9b41049903cc540b8904f7c9728e/oap-server/server-bootstrap/src/main/resources/application.yml#L155-L158). | not set |
| `SW_AGENT_LOGGING_LEVEL` | The logging level, could be one of `error`, `warn`, `info`, `debug` | `info` |
| `SW_AGENT_DISABLE_PLUGINS` | Comma-delimited list of plugins to disable in the plugins directory (e.g. "mysql", "express") | `` |
| `SW_COLD_ENDPOINT` | Cold start detection is as follows: First span to run within 1 second of skywalking init is considered a cold start. This span gets the tag `coldStart` set to 'true'. This span also optionally gets the text '\<cold\>' appended to the endpoint name if SW_COLD_ENDPOINT is set to 'true'. | `false` |
| `SW_COLD_ENDPOINT` | Cold start detection is as follows: First span to run is considered a cold start. This span gets the tag `coldStart` set to 'true'. This span also optionally gets the text '\<cold\>' appended to the endpoint name if SW_COLD_ENDPOINT is set to 'true'. | `false` |
| `SW_IGNORE_SUFFIX` | The suffices of endpoints that will be ignored (not traced), comma separated | `.jpg,.jpeg,.js,.css,.png,.bmp,.gif,.ico,.mp3,.mp4,.html,.svg` |
| `SW_TRACE_IGNORE_PATH` | The paths of endpoints that will be ignored (not traced), comma separated | `` |
| `SW_HTTP_IGNORE_METHOD` | Comma-delimited list of http methods to ignore (GET, POST, HEAD, OPTIONS, etc...) | `` |
Expand Down Expand Up @@ -117,6 +117,25 @@ module.exports = AzureHttpTriggerPlugin.wrap(async function (context, req) {

All that needs to be done is the actual trigger function needs to be wrapped with `azureHttpTriggerPlugin.wrap()`, whether that function is a default export or an explicitly named `entryPoint` or `run` or `index`.

## Experimental AWS Lambda Functions Support

The plugins `AWSLambdaTriggerPlugin`, `AWSLambdaGatewayAPIHTTP` and `AWSLambdaGatewayAPIREST` provide a wrapper functions for AWS Lambda Functions endpoints. `AWSLambdaTriggerPlugin` is a generic wrapper plugin which should work with any kind of Lambda trigger but also stores the least amount of informations since it does not know anything about the incoming data format. For this reason this type of trigger also can not link back to the caller, but it can create a new segment which will be propagated to all downstream children, this starting its own trace. `AWSLambdaGatewayAPIHTTP` and `AWSLambdaGatewayAPIREST` are specific wrappers for Lambda functions triggered by the GatewayAPI HTTP or REST triggers. They have the advantage of knowing the incoming data format and can thus extract existing trace segment information from incoming requests and chain correctly from upstream to any downstream endpoints.

### Usage:

```javascript
const {default: agent, AWSLambdaGatewayAPIHTTP} = require('skywalking-backend-js');

agent.start({ ... });

exports.handler = AWSLambdaGatewayAPIHTTP.wrap(async function (event, context, callback) {

/* contents of http trigger function */

});
```

This is similar to Azure Functions wrapping, just wrap your handler function with `AWSLambdaTriggerPlugin.wrap()` or `AWSLambdaGatewayAPIHTTP.wrap()` or `AWSLambdaGatewayAPIREST.wrap()`. One thing to note is that AWS freezes processes in between invocations of lambda functions so whether you are doing async or sync handler functions with callbacks, you should make sure everything you need to do finishes before returning control to AWS or calling the synchronous callback. These plugins take this into account and automatically flush the segment buffers before closing a trace span.

## Contact Us
* Submit [an issue](https://github.com/apache/skywalking/issues/new) by using [Nodejs] as title prefix.
Expand Down
2 changes: 2 additions & 0 deletions src/agent/protocol/Protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,6 @@ export default interface Protocol {
heartbeat(): this;

report(): this;

flush(): Promise<any> | null;
}
4 changes: 4 additions & 0 deletions src/agent/protocol/grpc/GrpcProtocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,8 @@ export default class GrpcProtocol implements Protocol {
this.traceReportClient.start();
return this;
}

flush(): Promise<any> | null {
return this.traceReportClient.flush();
}
}
2 changes: 2 additions & 0 deletions src/agent/protocol/grpc/clients/Client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,6 @@ export default interface Client {
readonly isConnected: boolean;

start(): void;

flush(): Promise<any> | null;
}
55 changes: 26 additions & 29 deletions src/agent/protocol/grpc/clients/HeartbeatClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export default class HeartbeatClient implements Client {
constructor() {
this.managementServiceClient = new ManagementServiceClient(
config.collectorAddress,
config.secure ? grpc.credentials.createSsl() : grpc.credentials.createInsecure()
config.secure ? grpc.credentials.createSsl() : grpc.credentials.createInsecure(),
);
}

Expand All @@ -58,38 +58,35 @@ export default class HeartbeatClient implements Client {
}

const keepAlivePkg = new InstancePingPkg()
.setService(config.serviceName)
.setServiceinstance(config.serviceInstance);
.setService(config.serviceName)
.setServiceinstance(config.serviceInstance);

const instanceProperties = new InstanceProperties()
.setService(config.serviceName)
.setServiceinstance(config.serviceInstance)
.setPropertiesList([
new KeyStringValuePair().setKey('language').setValue('NodeJS'),
new KeyStringValuePair().setKey('OS Name').setValue(os.platform()),
new KeyStringValuePair().setValue('hostname').setValue(os.hostname()),
new KeyStringValuePair().setValue('Process No.').setValue(`${process.pid}`),
]);
.setService(config.serviceName)
.setServiceinstance(config.serviceInstance)
.setPropertiesList([
new KeyStringValuePair().setKey('language').setValue('NodeJS'),
new KeyStringValuePair().setKey('OS Name').setValue(os.platform()),
new KeyStringValuePair().setValue('hostname').setValue(os.hostname()),
new KeyStringValuePair().setValue('Process No.').setValue(`${process.pid}`),
]);

this.heartbeatTimer = setInterval(() => {
this.managementServiceClient.reportInstanceProperties(
instanceProperties,
AuthInterceptor(),
(error, _) => {
if (error) {
logger.error('Failed to send heartbeat', error);
}
},
);
this.managementServiceClient.keepAlive(
keepAlivePkg,
AuthInterceptor(),
(error, _) => {
if (error) {
logger.error('Failed to send heartbeat', error);
}
},
);
this.managementServiceClient.reportInstanceProperties(instanceProperties, AuthInterceptor(), (error, _) => {
if (error) {
logger.error('Failed to send heartbeat', error);
}
});
this.managementServiceClient.keepAlive(keepAlivePkg, AuthInterceptor(), (error, _) => {
if (error) {
logger.error('Failed to send heartbeat', error);
}
});
}, 20000).unref();
}

flush(): Promise<any> | null {
logger.warn('HeartbeatClient does not need flush().');
return null;
}
}
70 changes: 43 additions & 27 deletions src/agent/protocol/grpc/clients/TraceReportClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ export default class TraceReportClient implements Client {
constructor() {
this.reporterClient = new TraceSegmentReportServiceClient(
config.collectorAddress,
config.secure ? grpc.credentials.createSsl() : grpc.credentials.createInsecure()
config.secure ? grpc.credentials.createSsl() : grpc.credentials.createInsecure(),
);
emitter.on('segment-finished', (segment) => {
this.buffer.push(segment);
Expand All @@ -50,41 +50,57 @@ export default class TraceReportClient implements Client {
return this.reporterClient?.getChannel().getConnectivityState(true) === connectivityState.READY;
}

start() {
const reportFunction = () => {
emitter.emit('segments-sent'); // reset limiter in SpanContext
private reportFunction(callback?: any) {
emitter.emit('segments-sent'); // reset limiter in SpanContext

try {
if (this.buffer.length === 0) {
return;
}
try {
if (this.buffer.length === 0) {
if (callback) callback();

const stream = this.reporterClient.collect(AuthInterceptor(), (error, _) => {
if (error) {
logger.error('Failed to report trace data', error);
}
});
return;
}

const stream = this.reporterClient.collect(AuthInterceptor(), (error, _) => {
if (error) {
logger.error('Failed to report trace data', error);
}

try {
for (const segment of this.buffer) {
if (segment) {
if (logger._isDebugEnabled) {
logger.debug('Sending segment ', { segment });
}
if (callback) callback();
});

stream.write(new SegmentObjectAdapter(segment));
try {
for (const segment of this.buffer) {
if (segment) {
if (logger._isDebugEnabled) {
logger.debug('Sending segment ', { segment });
}

stream.write(new SegmentObjectAdapter(segment));
}
} finally {
this.buffer.length = 0;
}

stream.end();
} finally {
this.timeout = setTimeout(reportFunction, 1000).unref();
this.buffer.length = 0;
}
};

this.timeout = setTimeout(reportFunction, 1000).unref();
stream.end();
} finally {
this.timeout = setTimeout(this.reportFunction.bind(this), 1000).unref();
}
}

start() {
this.timeout = setTimeout(this.reportFunction.bind(this), 1000).unref();
}

flush(): Promise<any> | null {
// This function explicitly returns null instead of a resolved Promise in case of nothing to flush so that in this
// case passing control back to the event loop can be avoided. Even a resolved Promise will run other things in
// the event loop when it is awaited and before it continues.

return this.buffer.length === 0
? null
: new Promise((resolve) => {
this.reportFunction(resolve);
});
}
}
85 changes: 85 additions & 0 deletions src/aws/AWSLambdaGatewayAPIHTTP.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*!
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

import { URL } from 'url';
import ContextManager from '../trace/context/ContextManager';
import { Component } from '../trace/Component';
import Tag from '../Tag';
import { SpanLayer } from '../proto/language-agent/Tracing_pb';
import { ContextCarrier } from '../trace/context/ContextCarrier';
import Span from '../trace/span/Span';
import DummySpan from '../trace/span/DummySpan';
import { ignoreHttpMethodCheck } from '../config/AgentConfig';
import { AWSLambdaTriggerPlugin } from './AWSLambdaTriggerPlugin';

class AWSLambdaGatewayAPIHTTP extends AWSLambdaTriggerPlugin {
start(event: any, context: any): Span {
const headers = event.headers;
const reqCtx = event.requestContext;
const http = reqCtx?.http;
const method = http?.method;
const proto = http?.protocol ? http.protocol.split('/')[0].toLowerCase() : headers?.['x-forwarded-proto'];
const port = headers?.['x-forwarded-port'] || '';
const host = headers?.host ?? (reqCtx?.domainName || '');
const hostport = host ? (port ? `${host}:${port}` : host) : port;
const operation = http?.path ?? event.rawPath ?? (context.functionName ? `/${context.functionName}` : '/');

const query = event.rawQueryString
? `?${event.rawQueryString}`
: event.queryStringParameters
? '?' +
Object.entries(event.queryStringParameters)
.map(([k, v]) => `${k}=${v}`)
.join('&')
: '';

const carrier = headers && ContextCarrier.from(headers);

const span =
method && ignoreHttpMethodCheck(method)
? DummySpan.create()
: ContextManager.current.newEntrySpan(operation, carrier);

span.layer = SpanLayer.HTTP;
span.component = Component.AWSLAMBDA_GATEWAYAPIHTTP;
span.peer = http?.sourceIp ?? headers?.['x-forwarded-for'] ?? 'Unknown';

if (method) span.tag(Tag.httpMethod(method));

if (hostport && proto) span.tag(Tag.httpURL(new URL(`${proto}://${hostport}${operation}${query}`).toString()));

span.start();

return span;
}

stop(span: Span, err: Error | null, res: any): void {
const statusCode = res?.statusCode || (typeof res === 'number' ? res : err ? 500 : null);

if (statusCode) {
if (statusCode >= 400) span.errored = true;

span.tag(Tag.httpStatusCode(statusCode));
}

span.stop();
}
}

export default new AWSLambdaGatewayAPIHTTP();
Loading

0 comments on commit b29d5ea

Please sign in to comment.