From b29d5eaff62c108dfb7c8e61d600bd275fb7ae12 Mon Sep 17 00:00:00 2001 From: Tomasz Pytel Date: Tue, 22 Feb 2022 11:06:20 -0300 Subject: [PATCH] experimental AWS Lambda Function support (#70) --- .eslintrc.js | 3 + README.md | 21 +++- src/agent/protocol/Protocol.ts | 2 + src/agent/protocol/grpc/GrpcProtocol.ts | 4 + src/agent/protocol/grpc/clients/Client.ts | 2 + .../protocol/grpc/clients/HeartbeatClient.ts | 55 +++++------ .../grpc/clients/TraceReportClient.ts | 70 ++++++++----- src/aws/AWSLambdaGatewayAPIHTTP.ts | 85 ++++++++++++++++ src/aws/AWSLambdaGatewayAPIREST.ts | 87 ++++++++++++++++ src/aws/AWSLambdaTriggerPlugin.ts | 99 +++++++++++++++++++ src/index.ts | 17 +++- src/trace/Component.ts | 3 + src/trace/context/ContextManager.ts | 57 ++++++----- 13 files changed, 419 insertions(+), 86 deletions(-) create mode 100644 src/aws/AWSLambdaGatewayAPIHTTP.ts create mode 100644 src/aws/AWSLambdaGatewayAPIREST.ts create mode 100644 src/aws/AWSLambdaTriggerPlugin.ts diff --git a/.eslintrc.js b/.eslintrc.js index d6defff..c203bb8 100644 --- a/.eslintrc.js +++ b/.eslintrc.js @@ -17,5 +17,8 @@ module.exports = { }, env: { node: true + }, + globals: { + "NodeJS": true } }; diff --git a/README.md b/README.md index a3395f3..07f90b9 100644 --- a/README.md +++ b/README.md @@ -57,7 +57,7 @@ Environment Variable | Description | Default | `SW_AGENT_AUTHENTICATION` | The authentication token to verify that the agent is trusted by the backend OAP, as for how to configure the backend, refer to [the yaml](https://github.com/apache/skywalking/blob/4f0f39ffccdc9b41049903cc540b8904f7c9728e/oap-server/server-bootstrap/src/main/resources/application.yml#L155-L158). | not set | | `SW_AGENT_LOGGING_LEVEL` | The logging level, could be one of `error`, `warn`, `info`, `debug` | `info` | | `SW_AGENT_DISABLE_PLUGINS` | Comma-delimited list of plugins to disable in the plugins directory (e.g. "mysql", "express") | `` | -| `SW_COLD_ENDPOINT` | Cold start detection is as follows: First span to run within 1 second of skywalking init is considered a cold start. This span gets the tag `coldStart` set to 'true'. This span also optionally gets the text '\' appended to the endpoint name if SW_COLD_ENDPOINT is set to 'true'. | `false` | +| `SW_COLD_ENDPOINT` | Cold start detection is as follows: First span to run is considered a cold start. This span gets the tag `coldStart` set to 'true'. This span also optionally gets the text '\' appended to the endpoint name if SW_COLD_ENDPOINT is set to 'true'. | `false` | | `SW_IGNORE_SUFFIX` | The suffices of endpoints that will be ignored (not traced), comma separated | `.jpg,.jpeg,.js,.css,.png,.bmp,.gif,.ico,.mp3,.mp4,.html,.svg` | | `SW_TRACE_IGNORE_PATH` | The paths of endpoints that will be ignored (not traced), comma separated | `` | | `SW_HTTP_IGNORE_METHOD` | Comma-delimited list of http methods to ignore (GET, POST, HEAD, OPTIONS, etc...) | `` | @@ -117,6 +117,25 @@ module.exports = AzureHttpTriggerPlugin.wrap(async function (context, req) { All that needs to be done is the actual trigger function needs to be wrapped with `azureHttpTriggerPlugin.wrap()`, whether that function is a default export or an explicitly named `entryPoint` or `run` or `index`. +## Experimental AWS Lambda Functions Support + +The plugins `AWSLambdaTriggerPlugin`, `AWSLambdaGatewayAPIHTTP` and `AWSLambdaGatewayAPIREST` provide a wrapper functions for AWS Lambda Functions endpoints. `AWSLambdaTriggerPlugin` is a generic wrapper plugin which should work with any kind of Lambda trigger but also stores the least amount of informations since it does not know anything about the incoming data format. For this reason this type of trigger also can not link back to the caller, but it can create a new segment which will be propagated to all downstream children, this starting its own trace. `AWSLambdaGatewayAPIHTTP` and `AWSLambdaGatewayAPIREST` are specific wrappers for Lambda functions triggered by the GatewayAPI HTTP or REST triggers. They have the advantage of knowing the incoming data format and can thus extract existing trace segment information from incoming requests and chain correctly from upstream to any downstream endpoints. + +### Usage: + +```javascript +const {default: agent, AWSLambdaGatewayAPIHTTP} = require('skywalking-backend-js'); + +agent.start({ ... }); + +exports.handler = AWSLambdaGatewayAPIHTTP.wrap(async function (event, context, callback) { + + /* contents of http trigger function */ + +}); +``` + +This is similar to Azure Functions wrapping, just wrap your handler function with `AWSLambdaTriggerPlugin.wrap()` or `AWSLambdaGatewayAPIHTTP.wrap()` or `AWSLambdaGatewayAPIREST.wrap()`. One thing to note is that AWS freezes processes in between invocations of lambda functions so whether you are doing async or sync handler functions with callbacks, you should make sure everything you need to do finishes before returning control to AWS or calling the synchronous callback. These plugins take this into account and automatically flush the segment buffers before closing a trace span. ## Contact Us * Submit [an issue](https://github.com/apache/skywalking/issues/new) by using [Nodejs] as title prefix. diff --git a/src/agent/protocol/Protocol.ts b/src/agent/protocol/Protocol.ts index 778437e..842f3bc 100644 --- a/src/agent/protocol/Protocol.ts +++ b/src/agent/protocol/Protocol.ts @@ -26,4 +26,6 @@ export default interface Protocol { heartbeat(): this; report(): this; + + flush(): Promise | null; } diff --git a/src/agent/protocol/grpc/GrpcProtocol.ts b/src/agent/protocol/grpc/GrpcProtocol.ts index df0bc1f..08ffa63 100644 --- a/src/agent/protocol/grpc/GrpcProtocol.ts +++ b/src/agent/protocol/grpc/GrpcProtocol.ts @@ -43,4 +43,8 @@ export default class GrpcProtocol implements Protocol { this.traceReportClient.start(); return this; } + + flush(): Promise | null { + return this.traceReportClient.flush(); + } } diff --git a/src/agent/protocol/grpc/clients/Client.ts b/src/agent/protocol/grpc/clients/Client.ts index bbd4361..e9916ab 100644 --- a/src/agent/protocol/grpc/clients/Client.ts +++ b/src/agent/protocol/grpc/clients/Client.ts @@ -21,4 +21,6 @@ export default interface Client { readonly isConnected: boolean; start(): void; + + flush(): Promise | null; } diff --git a/src/agent/protocol/grpc/clients/HeartbeatClient.ts b/src/agent/protocol/grpc/clients/HeartbeatClient.ts index 41a7a0e..9d96114 100755 --- a/src/agent/protocol/grpc/clients/HeartbeatClient.ts +++ b/src/agent/protocol/grpc/clients/HeartbeatClient.ts @@ -39,7 +39,7 @@ export default class HeartbeatClient implements Client { constructor() { this.managementServiceClient = new ManagementServiceClient( config.collectorAddress, - config.secure ? grpc.credentials.createSsl() : grpc.credentials.createInsecure() + config.secure ? grpc.credentials.createSsl() : grpc.credentials.createInsecure(), ); } @@ -58,38 +58,35 @@ export default class HeartbeatClient implements Client { } const keepAlivePkg = new InstancePingPkg() - .setService(config.serviceName) - .setServiceinstance(config.serviceInstance); + .setService(config.serviceName) + .setServiceinstance(config.serviceInstance); const instanceProperties = new InstanceProperties() - .setService(config.serviceName) - .setServiceinstance(config.serviceInstance) - .setPropertiesList([ - new KeyStringValuePair().setKey('language').setValue('NodeJS'), - new KeyStringValuePair().setKey('OS Name').setValue(os.platform()), - new KeyStringValuePair().setValue('hostname').setValue(os.hostname()), - new KeyStringValuePair().setValue('Process No.').setValue(`${process.pid}`), - ]); + .setService(config.serviceName) + .setServiceinstance(config.serviceInstance) + .setPropertiesList([ + new KeyStringValuePair().setKey('language').setValue('NodeJS'), + new KeyStringValuePair().setKey('OS Name').setValue(os.platform()), + new KeyStringValuePair().setValue('hostname').setValue(os.hostname()), + new KeyStringValuePair().setValue('Process No.').setValue(`${process.pid}`), + ]); this.heartbeatTimer = setInterval(() => { - this.managementServiceClient.reportInstanceProperties( - instanceProperties, - AuthInterceptor(), - (error, _) => { - if (error) { - logger.error('Failed to send heartbeat', error); - } - }, - ); - this.managementServiceClient.keepAlive( - keepAlivePkg, - AuthInterceptor(), - (error, _) => { - if (error) { - logger.error('Failed to send heartbeat', error); - } - }, - ); + this.managementServiceClient.reportInstanceProperties(instanceProperties, AuthInterceptor(), (error, _) => { + if (error) { + logger.error('Failed to send heartbeat', error); + } + }); + this.managementServiceClient.keepAlive(keepAlivePkg, AuthInterceptor(), (error, _) => { + if (error) { + logger.error('Failed to send heartbeat', error); + } + }); }, 20000).unref(); } + + flush(): Promise | null { + logger.warn('HeartbeatClient does not need flush().'); + return null; + } } diff --git a/src/agent/protocol/grpc/clients/TraceReportClient.ts b/src/agent/protocol/grpc/clients/TraceReportClient.ts index 0296800..1ef0003 100755 --- a/src/agent/protocol/grpc/clients/TraceReportClient.ts +++ b/src/agent/protocol/grpc/clients/TraceReportClient.ts @@ -38,7 +38,7 @@ export default class TraceReportClient implements Client { constructor() { this.reporterClient = new TraceSegmentReportServiceClient( config.collectorAddress, - config.secure ? grpc.credentials.createSsl() : grpc.credentials.createInsecure() + config.secure ? grpc.credentials.createSsl() : grpc.credentials.createInsecure(), ); emitter.on('segment-finished', (segment) => { this.buffer.push(segment); @@ -50,41 +50,57 @@ export default class TraceReportClient implements Client { return this.reporterClient?.getChannel().getConnectivityState(true) === connectivityState.READY; } - start() { - const reportFunction = () => { - emitter.emit('segments-sent'); // reset limiter in SpanContext + private reportFunction(callback?: any) { + emitter.emit('segments-sent'); // reset limiter in SpanContext - try { - if (this.buffer.length === 0) { - return; - } + try { + if (this.buffer.length === 0) { + if (callback) callback(); - const stream = this.reporterClient.collect(AuthInterceptor(), (error, _) => { - if (error) { - logger.error('Failed to report trace data', error); - } - }); + return; + } + + const stream = this.reporterClient.collect(AuthInterceptor(), (error, _) => { + if (error) { + logger.error('Failed to report trace data', error); + } - try { - for (const segment of this.buffer) { - if (segment) { - if (logger._isDebugEnabled) { - logger.debug('Sending segment ', { segment }); - } + if (callback) callback(); + }); - stream.write(new SegmentObjectAdapter(segment)); + try { + for (const segment of this.buffer) { + if (segment) { + if (logger._isDebugEnabled) { + logger.debug('Sending segment ', { segment }); } + + stream.write(new SegmentObjectAdapter(segment)); } - } finally { - this.buffer.length = 0; } - - stream.end(); } finally { - this.timeout = setTimeout(reportFunction, 1000).unref(); + this.buffer.length = 0; } - }; - this.timeout = setTimeout(reportFunction, 1000).unref(); + stream.end(); + } finally { + this.timeout = setTimeout(this.reportFunction.bind(this), 1000).unref(); + } + } + + start() { + this.timeout = setTimeout(this.reportFunction.bind(this), 1000).unref(); + } + + flush(): Promise | null { + // This function explicitly returns null instead of a resolved Promise in case of nothing to flush so that in this + // case passing control back to the event loop can be avoided. Even a resolved Promise will run other things in + // the event loop when it is awaited and before it continues. + + return this.buffer.length === 0 + ? null + : new Promise((resolve) => { + this.reportFunction(resolve); + }); } } diff --git a/src/aws/AWSLambdaGatewayAPIHTTP.ts b/src/aws/AWSLambdaGatewayAPIHTTP.ts new file mode 100644 index 0000000..9422358 --- /dev/null +++ b/src/aws/AWSLambdaGatewayAPIHTTP.ts @@ -0,0 +1,85 @@ +/*! + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { URL } from 'url'; +import ContextManager from '../trace/context/ContextManager'; +import { Component } from '../trace/Component'; +import Tag from '../Tag'; +import { SpanLayer } from '../proto/language-agent/Tracing_pb'; +import { ContextCarrier } from '../trace/context/ContextCarrier'; +import Span from '../trace/span/Span'; +import DummySpan from '../trace/span/DummySpan'; +import { ignoreHttpMethodCheck } from '../config/AgentConfig'; +import { AWSLambdaTriggerPlugin } from './AWSLambdaTriggerPlugin'; + +class AWSLambdaGatewayAPIHTTP extends AWSLambdaTriggerPlugin { + start(event: any, context: any): Span { + const headers = event.headers; + const reqCtx = event.requestContext; + const http = reqCtx?.http; + const method = http?.method; + const proto = http?.protocol ? http.protocol.split('/')[0].toLowerCase() : headers?.['x-forwarded-proto']; + const port = headers?.['x-forwarded-port'] || ''; + const host = headers?.host ?? (reqCtx?.domainName || ''); + const hostport = host ? (port ? `${host}:${port}` : host) : port; + const operation = http?.path ?? event.rawPath ?? (context.functionName ? `/${context.functionName}` : '/'); + + const query = event.rawQueryString + ? `?${event.rawQueryString}` + : event.queryStringParameters + ? '?' + + Object.entries(event.queryStringParameters) + .map(([k, v]) => `${k}=${v}`) + .join('&') + : ''; + + const carrier = headers && ContextCarrier.from(headers); + + const span = + method && ignoreHttpMethodCheck(method) + ? DummySpan.create() + : ContextManager.current.newEntrySpan(operation, carrier); + + span.layer = SpanLayer.HTTP; + span.component = Component.AWSLAMBDA_GATEWAYAPIHTTP; + span.peer = http?.sourceIp ?? headers?.['x-forwarded-for'] ?? 'Unknown'; + + if (method) span.tag(Tag.httpMethod(method)); + + if (hostport && proto) span.tag(Tag.httpURL(new URL(`${proto}://${hostport}${operation}${query}`).toString())); + + span.start(); + + return span; + } + + stop(span: Span, err: Error | null, res: any): void { + const statusCode = res?.statusCode || (typeof res === 'number' ? res : err ? 500 : null); + + if (statusCode) { + if (statusCode >= 400) span.errored = true; + + span.tag(Tag.httpStatusCode(statusCode)); + } + + span.stop(); + } +} + +export default new AWSLambdaGatewayAPIHTTP(); diff --git a/src/aws/AWSLambdaGatewayAPIREST.ts b/src/aws/AWSLambdaGatewayAPIREST.ts new file mode 100644 index 0000000..7208133 --- /dev/null +++ b/src/aws/AWSLambdaGatewayAPIREST.ts @@ -0,0 +1,87 @@ +/*! + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import { URL } from 'url'; +import ContextManager from '../trace/context/ContextManager'; +import { Component } from '../trace/Component'; +import Tag from '../Tag'; +import { SpanLayer } from '../proto/language-agent/Tracing_pb'; +import { ContextCarrier } from '../trace/context/ContextCarrier'; +import Span from '../trace/span/Span'; +import DummySpan from '../trace/span/DummySpan'; +import { ignoreHttpMethodCheck } from '../config/AgentConfig'; +import { AWSLambdaTriggerPlugin } from './AWSLambdaTriggerPlugin'; + +class AWSLambdaGatewayAPIREST extends AWSLambdaTriggerPlugin { + start(event: any, context: any): Span { + const headers = event.headers; + const reqCtx = event.requestContext; + const method = reqCtx?.httpMethod ?? event.httpMethod; + const proto = reqCtx?.protocol ? reqCtx.protocol.split('/')[0].toLowerCase() : headers?.['X-Forwarded-Proto']; + const port = headers?.['X-Forwarded-Port'] || ''; + const host = headers?.Host ?? (reqCtx?.domainName || ''); + const hostport = host ? (port ? `${host}:${port}` : host) : port; + const operation = reqCtx?.path ?? event.path ?? (context.functionName ? `/${context.functionName}` : '/'); + + const query = event.multiValueQueryStringParameters + ? '?' + + Object.entries(event.multiValueQueryStringParameters) + .map(([k, vs]: any[]) => vs.map((v: String) => `${k}=${v}`).join('&')) + .join('&') + : event.queryStringParameters + ? '?' + + Object.entries(event.queryStringParameters) + .map(([k, v]) => `${k}=${v}`) + .join('&') + : ''; + + const carrier = headers && ContextCarrier.from(headers); + + const span = + method && ignoreHttpMethodCheck(method) + ? DummySpan.create() + : ContextManager.current.newEntrySpan(operation, carrier); + + span.layer = SpanLayer.HTTP; + span.component = Component.AWSLAMBDA_GATEWAYAPIREST; + span.peer = reqCtx?.identity?.sourceIp ?? headers?.['X-Forwarded-For'] ?? 'Unknown'; + + if (method) span.tag(Tag.httpMethod(method)); + + if (hostport && proto) span.tag(Tag.httpURL(new URL(`${proto}://${hostport}${operation}${query}`).toString())); + + span.start(); + + return span; + } + + stop(span: Span, err: Error | null, res: any): void { + const statusCode = res?.statusCode || (typeof res === 'number' ? res : err ? 500 : null); + + if (statusCode) { + if (statusCode >= 400) span.errored = true; + + span.tag(Tag.httpStatusCode(statusCode)); + } + + span.stop(); + } +} + +export default new AWSLambdaGatewayAPIREST(); diff --git a/src/aws/AWSLambdaTriggerPlugin.ts b/src/aws/AWSLambdaTriggerPlugin.ts new file mode 100644 index 0000000..a26900b --- /dev/null +++ b/src/aws/AWSLambdaTriggerPlugin.ts @@ -0,0 +1,99 @@ +/*! + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +import ContextManager from '../trace/context/ContextManager'; +import { Component } from '../trace/Component'; +import Span from '../trace/span/Span'; +import { default as agent } from '../index'; + +class AWSLambdaTriggerPlugin { + // default working start function, should be overridden by the various types of lambda trigger subclasses + start(event: any, context: any): Span { + const span = ContextManager.current.newEntrySpan(context.functionName ? `/${context.functionName}` : '/'); + + span.component = Component.AWSLAMBDA_FUNCTION; + span.peer = 'Unknown'; + + span.start(); + + return span; + } + + // default working stop function + stop(span: Span, err: Error | null, res: any): void { + span.stop(); + } + + wrap(func: any) { + return async (event: any, context: any, callback: any) => { + ContextManager.removeTailFinishedContexts(); // need this because AWS seems to chain sequential independent operations linearly instead of hierarchically + + const span = this.start(event, context); + let ret: any; + + let stop = async (err: Error | null, res: any) => { + stop = async (err: Error | null, res: any) => {}; + + this.stop(span, err, res); + + const p = agent.flush(); // flush all data before aws freezes the process on exit + + if (p) await p; + + return res; + }; + + let resolve: any; + let reject: any; + let callbackDone = false; + + const callbackPromise = new Promise((_resolve: any, _reject: any) => { + resolve = _resolve; + reject = _reject; + }); + + try { + ret = func(event, context, (err: Error | null, res: any) => { + if (!callbackDone) { + callbackDone = true; + + if (err) reject(err); + else resolve(res); + } + }); + + if (typeof ret?.then !== 'function') + // generic Promise check + ret = callbackPromise; + + return stop(null, await ret); + } catch (e) { + span.error(e); + stop(e, null); + + throw e; + } + }; + } +} + +// noinspection JSUnusedGlobalSymbols +export default new AWSLambdaTriggerPlugin(); + +export { AWSLambdaTriggerPlugin }; diff --git a/src/index.ts b/src/index.ts index 7d9371b..041ee94 100644 --- a/src/index.ts +++ b/src/index.ts @@ -18,6 +18,7 @@ */ import config, { AgentConfig, finalizeConfig } from './config/AgentConfig'; +import Protocol from './agent/protocol/Protocol'; import GrpcProtocol from './agent/protocol/grpc/GrpcProtocol'; import { createLogger } from './logging'; import PluginInstaller from './core/PluginInstaller'; @@ -26,6 +27,7 @@ const logger = createLogger(__filename); class Agent { private started = false; + private protocol: Protocol | null = null; start(options: AgentConfig = {}): void { if (process.env.SW_DISABLE === 'true') { @@ -43,14 +45,25 @@ class Agent { logger.debug('Starting SkyWalking agent'); + new PluginInstaller().install(); + + this.protocol = new GrpcProtocol().heartbeat().report(); this.started = true; + } - new PluginInstaller().install(); + flush(): Promise | null { + if (this.protocol === null) { + logger.warn('Trying to flush() SkyWalking agent which is not started.'); + return null; + } - new GrpcProtocol().heartbeat().report(); + return this.protocol.flush(); } } export default new Agent(); export { default as ContextManager } from './trace/context/ContextManager'; export { default as AzureHttpTriggerPlugin } from './azure/AzureHttpTriggerPlugin'; +export { default as AWSLambdaTriggerPlugin } from './aws/AWSLambdaTriggerPlugin'; +export { default as AWSLambdaGatewayAPIHTTP } from './aws/AWSLambdaGatewayAPIHTTP'; +export { default as AWSLambdaGatewayAPIREST } from './aws/AWSLambdaGatewayAPIREST'; diff --git a/src/trace/Component.ts b/src/trace/Component.ts index 6b45f17..6c85cff 100644 --- a/src/trace/Component.ts +++ b/src/trace/Component.ts @@ -28,6 +28,9 @@ export class Component { static readonly RABBITMQ_PRODUCER = new Component(52); static readonly RABBITMQ_CONSUMER = new Component(53); static readonly AZURE_HTTPTRIGGER = new Component(111); + static readonly AWSLAMBDA_FUNCTION = new Component(124); + static readonly AWSLAMBDA_GATEWAYAPIHTTP = new Component(125); + static readonly AWSLAMBDA_GATEWAYAPIREST = new Component(126); static readonly EXPRESS = new Component(4002); static readonly AXIOS = new Component(4005); static readonly MONGOOSE = new Component(4006); diff --git a/src/trace/context/ContextManager.ts b/src/trace/context/ContextManager.ts index b0ec89f..cb5da3b 100644 --- a/src/trace/context/ContextManager.ts +++ b/src/trace/context/ContextManager.ts @@ -34,7 +34,8 @@ let store: { if (async_hooks.AsyncLocalStorage) { store = new async_hooks.AsyncLocalStorage(); -} else { // Node 10 doesn't have AsyncLocalStore, so recreate it +} else { + // Node 10 doesn't have AsyncLocalStore, so recreate it const executionAsyncId = async_hooks.executionAsyncId; const asyncLocalStore: { [index: string]: any } = {}; @@ -48,23 +49,21 @@ if (async_hooks.AsyncLocalStorage) { }, }; - async_hooks.createHook({ - init(asyncId: number, type: string, triggerId: number) { - asyncLocalStore[asyncId] = asyncLocalStore[triggerId]; - }, - destroy(asyncId: number) { - delete asyncLocalStore[asyncId]; - }, - }).enable(); + async_hooks + .createHook({ + init(asyncId: number, type: string, triggerId: number) { + asyncLocalStore[asyncId] = asyncLocalStore[triggerId]; + }, + destroy(asyncId: number) { + delete asyncLocalStore[asyncId]; + }, + }) + .enable(); } class ContextManager { isCold = true; - cosntructor() { - setTimeout(() => this.isCold = false, 1000).unref(); - } - checkCold(): boolean { const isCold = this.isCold; this.isCold = false; @@ -87,7 +86,7 @@ class ContextManager { const spans = store.getStore()?.spans; return spans?.[spans.length - 1] as Span; - }; + } get hasContext(): boolean | undefined { return Boolean(store.getStore()?.spans.length); @@ -96,11 +95,9 @@ class ContextManager { get current(): Context { const asyncState = this.asyncState; - if (asyncState.spans.length) - return asyncState.spans[asyncState.spans.length - 1].context; + if (asyncState.spans.length) return asyncState.spans[asyncState.spans.length - 1].context; - if (SpanContext.nActiveSegments < config.maxBufferSize) - return new SpanContext(); + if (SpanContext.nActiveSegments < config.maxBufferSize) return new SpanContext(); return new DummyContext(); } @@ -124,23 +121,30 @@ class ContextManager { } clear(span: Span): void { - const spans = this.spansDup(); // this needed to make sure async tasks created before this call will still have this span at the top of their span list + const spans = this.spansDup(); // this needed to make sure async tasks created before this call will still have this span at the top of their span list const idx = spans.indexOf(span); - if (idx !== -1) - spans.splice(idx, 1); + if (idx !== -1) spans.splice(idx, 1); } restore(span: Span): void { const spans = this.spansDup(); - if (spans.indexOf(span) === -1) - spans.push(span); + if (spans.indexOf(span) === -1) spans.push(span); + } + + removeTailFinishedContexts(): void { + // XXX: Normally, SpanContexts that finish and send their segments can remain in the span lists of async contexts. + // This is so that if an async child that was spawned by the original span code and is executed after the parent + // finishes and creates its own span can be linked to the parent segment and span correctly. But in some situations + // where successive independent operations are chained linearly instead of hierarchically (AWS Lambda functions), + // this can cause a false reference by a subsequent operation as if it were a child of the finished previous span. + + for (const spans = this.asyncState.spans; spans.length && spans[spans.length - 1].context.finished; spans.pop()); } withSpan(span: Span, callback: (...args: any[]) => any, ...args: any[]): any { - if (!span.startTime) - span.start(); + if (!span.startTime) span.start(); try { return callback(...args); } catch (e) { @@ -152,8 +156,7 @@ class ContextManager { } async withSpanAwait(span: Span, callback: (...args: any[]) => any, ...args: any[]): Promise { - if (!span.startTime) - span.start(); + if (!span.startTime) span.start(); try { return await callback(...args); } catch (e) {