Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing async and preparing child entry/exit #36

Merged
merged 8 commits into from
Mar 8, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ Environment Variable | Description | Default
| `SW_AGENT_INSTANCE` | The name of the service instance | Randomly generated |
| `SW_AGENT_COLLECTOR_BACKEND_SERVICES` | The backend OAP server address | `127.0.0.1:11800` |
| `SW_AGENT_AUTHENTICATION` | The authentication token to verify that the agent is trusted by the backend OAP, as for how to configure the backend, refer to [the yaml](https://github.com/apache/skywalking/blob/4f0f39ffccdc9b41049903cc540b8904f7c9728e/oap-server/server-bootstrap/src/main/resources/application.yml#L155-L158). | not set |
| `SW_AGENT_LOGGING_LEVEL` | The logging level, could be one of `CRITICAL`, `FATAL`, `ERROR`, `WARN`(`WARNING`), `INFO`, `DEBUG` | `INFO` |
| `SW_AGENT_LOGGING_LEVEL` | The logging level, could be one of `error`, `warn`, `info`, `debug` | `info` |
| `SW_AGENT_DISABLE_PLUGINS` | Comma-delimited list of plugins to disable in the plugins directory (e.g. "mysql", "express"). | `` |
| `SW_IGNORE_SUFFIX` | The suffices of endpoints that will be ignored (not traced), comma separated | `.jpg,.jpeg,.js,.css,.png,.bmp,.gif,.ico,.mp3,.mp4,.html,.svg` |
| `SW_TRACE_IGNORE_PATH` | The paths of endpoints that will be ignored (not traced), comma separated | `` |
| `SW_SQL_TRACE_PARAMETERS` | If set to 'true' then SQL query parameters will be included | `false` |
Expand Down
24 changes: 15 additions & 9 deletions src/config/AgentConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,23 @@ export type AgentConfig = {
collectorAddress?: string;
authorization?: string;
maxBufferSize?: number;
disablePlugins?: string;
ignoreSuffix?: string;
traceIgnorePath?: string;
sql_trace_parameters?: boolean;
sql_parameters_max_length?: number;
mongo_trace_parameters?: boolean;
mongo_parameters_max_length?: number;
sqlTraceParameters?: boolean;
sqlParametersMaxLength?: number;
mongoTraceParameters?: boolean;
mongoParametersMaxLength?: number;
// the following is internal state computed from config values
reDisablePlugins?: RegExp;
reIgnoreOperation?: RegExp;
};

export function finalizeConfig(config: AgentConfig): void {
const escapeRegExp = (s: string) => s.replace(/([.*+?^=!:${}()|\[\]\/\\])/g, "\\$1");

config.reDisablePlugins = RegExp(`^(?:${config.disablePlugins!.split(',').map((s) => escapeRegExp(s.trim()) + 'Plugin\\.js').join('|')})$`, 'i');

const ignoreSuffix =`^.+(?:${config.ignoreSuffix!.split(',').map((s) => escapeRegExp(s.trim())).join('|')})$`;
const ignorePath = '^(?:' + config.traceIgnorePath!.split(',').map(
(s1) => s1.trim().split('**').map(
Expand All @@ -61,11 +65,13 @@ export default {
authorization: process.env.SW_AGENT_AUTHENTICATION,
maxBufferSize: Number.isSafeInteger(process.env.SW_AGENT_MAX_BUFFER_SIZE) ?
Number.parseInt(process.env.SW_AGENT_MAX_BUFFER_SIZE as string, 10) : 1000,
disablePlugins: process.env.SW_AGENT_DISABLE_PLUGINS || '',
ignoreSuffix: process.env.SW_IGNORE_SUFFIX ?? '.jpg,.jpeg,.js,.css,.png,.bmp,.gif,.ico,.mp3,.mp4,.html,.svg',
traceIgnorePath: process.env.SW_TRACE_IGNORE_PATH || '',
sql_trace_parameters: (process.env.SW_SQL_TRACE_PARAMETERS || '').toLowerCase() === 'true',
sql_parameters_max_length: Math.trunc(Math.max(0, Number(process.env.SW_SQL_PARAMETERS_MAX_LENGTH))) || 512,
mongo_trace_parameters: (process.env.SW_MONGO_TRACE_PARAMETERS || '').toLowerCase() === 'true',
mongo_parameters_max_length: Math.trunc(Math.max(0, Number(process.env.SW_MONGO_PARAMETERS_MAX_LENGTH))) || 512,
reIgnoreOperation: RegExp(''), // temporary placeholder so Typescript doesn't throw a fit
sqlTraceParameters: (process.env.SW_SQL_TRACE_PARAMETERS || '').toLowerCase() === 'true',
sqlParametersMaxLength: Math.trunc(Math.max(0, Number(process.env.SW_SQL_PARAMETERS_MAX_LENGTH))) || 512,
mongoTraceParameters: (process.env.SW_MONGO_TRACE_PARAMETERS || '').toLowerCase() === 'true',
mongoParametersMaxLength: Math.trunc(Math.max(0, Number(process.env.SW_MONGO_PARAMETERS_MAX_LENGTH))) || 512,
reDisablePlugins: RegExp(''), // temporary placeholder so Typescript doesn't throw a fit
reIgnoreOperation: RegExp(''),
};
6 changes: 6 additions & 0 deletions src/core/PluginInstaller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import * as path from 'path';
import SwPlugin from '../core/SwPlugin';
import { createLogger } from '../logging';
import * as semver from 'semver';
import config from '../config/AgentConfig';

const logger = createLogger(__filename);

Expand Down Expand Up @@ -76,6 +77,11 @@ export default class PluginInstaller {
fs.readdirSync(this.pluginDir)
.filter((file) => !(file.endsWith('.d.ts') || file.endsWith('.js.map')))
.forEach((file) => {
if (file.match(config.reDisablePlugins)) {
logger.info(`Plugin ${file} not installed because it is disabled`);
return;
}

let plugin;
const pluginFile = path.join(this.pluginDir, file);

Expand Down
2 changes: 1 addition & 1 deletion src/plugins/AMQPLibPlugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class AMQPLibPlugin implements SwPlugin {
const queue = fields.routingKey || '';
const peer = `${this.connection.stream.remoteAddress}:${this.connection.stream.remotePort}`;

const span = ContextManager.current.newExitSpan('RabbitMQ/' + topic + '/' + queue + '/Producer', peer).start();
const span = ContextManager.current.newExitSpan('RabbitMQ/' + topic + '/' + queue + '/Producer', peer, Component.RABBITMQ_PRODUCER).start();

try {
span.inject().items.forEach((item) => {
Expand Down
20 changes: 13 additions & 7 deletions src/plugins/AxiosPlugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ class AxiosPlugin implements SwPlugin {

defaults.adapter = (config: any) => {
const { host, pathname: operation } = new URL(config.url); // TODO: this may throw invalid URL
const span = ContextManager.current.newExitSpan(operation, host).start();
const span = ContextManager.current.newExitSpan(operation, host, Component.AXIOS, Component.HTTP).start();

let ret: any;

try {
span.component = Component.AXIOS;
Expand All @@ -51,7 +53,7 @@ class AxiosPlugin implements SwPlugin {
config.headers[item.key] = item.value;
});

const copyStatusAndStop = (response: any) => {
const copyStatus = (response: any) => {
if (response) {
if (response.status) {
span.tag(Tag.httpStatusCode(response.status));
Expand All @@ -64,20 +66,20 @@ class AxiosPlugin implements SwPlugin {
span.tag(Tag.httpStatusMsg(response.statusText));
}
}

span.stop();
};

return defaultAdapter(config).then(
ret = defaultAdapter(config).then(
(response: any) => {
copyStatusAndStop(response);
copyStatus(response);
span.stop();

return response;
},

(error: any) => {
copyStatus(error.response);
span.error(error);
copyStatusAndStop(error.response);
span.stop();

return Promise.reject(error);
}
Expand All @@ -89,6 +91,10 @@ class AxiosPlugin implements SwPlugin {

throw e;
}

span.async();

return ret;
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/plugins/ExpressPlugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class ExpressPlugin implements SwPlugin {

const carrier = ContextCarrier.from(headersMap);
const operation = (req.url || '/').replace(/\?.*/g, '');
const span = ContextManager.current.newEntrySpan(operation, carrier).start();
const span = ContextManager.current.newEntrySpan(operation, carrier, Component.HTTP_SERVER).start();

let stopped = 0;
const stopIfNotStopped = (err: Error | null) => {
Expand All @@ -59,7 +59,7 @@ class ExpressPlugin implements SwPlugin {
if (res.statusCode && res.statusCode >= 400) {
span.errored = true;
}
if (err) {
if (err instanceof Error) {
span.error(err);
}
if (res.statusMessage) {
Expand Down
162 changes: 93 additions & 69 deletions src/plugins/HttpPlugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,14 @@ class HttpPlugin implements SwPlugin {
const http = require('http');
const https = require('https');

this.interceptClientRequest(http);
this.interceptClientRequest(http, 'http');
this.interceptServerRequest(http, 'http');
this.interceptClientRequest(https);
this.interceptClientRequest(https, 'https');
this.interceptServerRequest(https, 'https');
}

private interceptClientRequest(module: any) {
const _request = module.request;
private interceptClientRequest(module: any, protocol: string) {
const _request = module.request; // BUG! this doesn't work with "import {request} from http", but haven't found an alternative yet

module.request = function () {
const url: URL | string | RequestOptions = arguments[0];
Expand All @@ -56,64 +56,98 @@ class HttpPlugin implements SwPlugin {
host: (url.host || url.hostname || 'unknown') + ':' + (url.port || 80),
pathname: url.path || '/',
};
const httpMethod = arguments[url instanceof URL || typeof url === 'string' ? 1 : 0]?.method || 'GET';
const httpURL = host + pathname;
const operation = pathname.replace(/\?.*$/g, '');

let stopped = 0; // compensating if request aborted right after creation 'close' is not emitted
const stopIfNotStopped = () => !stopped++ ? span.stop() : null; // make sure we stop only once
const span: ExitSpan = ContextManager.current.newExitSpan(operation, host).start() as ExitSpan;
const httpMethod = arguments[url instanceof URL || typeof url === 'string' ? 1 : 0]?.method || 'GET';
const httpURL = protocol + '://' + host + pathname;
const operation = pathname.replace(/\?.*$/g, '');

const span: ExitSpan = ContextManager.current.newExitSpan(operation, host, Component.HTTP).start() as ExitSpan;

try {
if (span.depth === 1) { // only set HTTP if this span is not overridden by a higher level one
span.component = Component.HTTP;
span.layer = SpanLayer.HTTP;
}
if (!span.peer) {

if (!span.peer)
span.peer = host;
}
if (!span.hasTag(Tag.httpURLKey)) { // only set if a higher level plugin with more info did not already set

if (!span.hasTag(Tag.httpURLKey)) // only set if a higher level plugin with more info did not already set
span.tag(Tag.httpURL(httpURL));
}
if (!span.hasTag(Tag.httpMethodKey)) {

if (!span.hasTag(Tag.httpMethodKey))
span.tag(Tag.httpMethod(httpMethod));
}

const req: ClientRequest = _request.apply(this, arguments);

span.inject().items.forEach((item) => {
req.setHeader(item.key, item.value);
});
span.inject().items.forEach((item) => req.setHeader(item.key, item.value));

req.on('timeout', () => span.log('Timeout', true));
req.on('abort', () => span.log('Abort', span.errored = true));
req.on('error', (err) => span.error(err));

req.on('close', stopIfNotStopped);
req.on('abort', () => (span.errored = true, stopIfNotStopped()));
req.on('error', (err) => (span.error(err), stopIfNotStopped()));
const _emit = req.emit;

req.emit = function(): any {
const event = arguments[0];

req.prependListener('response', (res) => {
span.resync();
span.tag(Tag.httpStatusCode(res.statusCode));

if (res.statusCode && res.statusCode >= 400) {
span.errored = true;
}
if (res.statusMessage) {
span.tag(Tag.httpStatusMsg(res.statusMessage));
}
try {
if (event === 'response') {
const res = arguments[1];

span.tag(Tag.httpStatusCode(res.statusCode));

if (res.statusCode && res.statusCode >= 400)
span.errored = true;

if (res.statusMessage)
span.tag(Tag.httpStatusMsg(res.statusMessage));

const _emitRes = res.emit;

res.emit = function(): any {
span.resync();

try {
return _emitRes.apply(this, arguments);

} catch (err) {
span.error(err);

throw err;

} finally {
span.async();
}
}
}

res.on('end', stopIfNotStopped);
});
return _emit.apply(this, arguments as any);

} catch (err) {
span.error(err);

throw err;

} finally {
if (event === 'close')
span.stop();
else
span.async();
}
};

span.async();

return req;

} catch (e) {
if (!stopped) { // don't want to set error if exception occurs after clean close
span.error(e);
stopIfNotStopped();
}
} catch (err) {
span.error(err);
span.stop();

throw e;
throw err;
}
};
}
Expand All @@ -129,26 +163,13 @@ class HttpPlugin implements SwPlugin {
const headers = req.rawHeaders || [];
const headersMap: { [key: string]: string } = {};

for (let i = 0; i < headers.length / 2; i += 2) {
for (let i = 0; i < headers.length / 2; i += 2)
headersMap[headers[i]] = headers[i + 1];
}

const carrier = ContextCarrier.from(headersMap);
const operation = (req.url || '/').replace(/\?.*/g, '');
const span = ContextManager.current.newEntrySpan(operation, carrier).start();

const copyStatusAndStop = () => {
span.tag(Tag.httpStatusCode(res.statusCode));
if (res.statusCode && res.statusCode >= 400) {
span.errored = true;
}
if (res.statusMessage) {
span.tag(Tag.httpStatusMsg(res.statusMessage));
}

span.stop();
};

try {
span.component = Component.HTTP_SERVER;
span.layer = SpanLayer.HTTP;
Expand All @@ -157,36 +178,39 @@ class HttpPlugin implements SwPlugin {
|| (req.connection.remoteFamily === 'IPv6'
? `[${req.connection.remoteAddress}]:${req.connection.remotePort}`
: `${req.connection.remoteAddress}:${req.connection.remotePort}`);

span.tag(Tag.httpURL(protocol + '://' + (req.headers.host || '') + req.url));
span.tag(Tag.httpMethod(req.method));

let ret = handler.call(this, req, res, ...reqArgs);
const ret = handler.call(this, req, res, ...reqArgs);

if (!ret || typeof ret.then !== 'function') { // generic Promise check
copyStatusAndStop();
let copyStatusAndStopIfNotStopped = () => {
copyStatusAndStopIfNotStopped = () => undefined;

} else {
ret = ret.then((r: any) => {
copyStatusAndStop();
span.tag(Tag.httpStatusCode(res.statusCode));

return r;
},
if (res.statusCode && res.statusCode >= 400)
span.errored = true;

(error: any) => {
span.error(error);
span.stop();
if (res.statusMessage)
span.tag(Tag.httpStatusMsg(res.statusMessage));

return Promise.reject(error);
})
}
span.stop();
};

req.on('end', copyStatusAndStopIfNotStopped); // this insead of 'close' because Node 10 doesn't emit those
res.on('abort', () => (span.errored = true, span.log('Abort', true), copyStatusAndStopIfNotStopped()));
res.on('error', (err) => (span.error(err), copyStatusAndStopIfNotStopped()));

span.async();

return ret;

} catch (e) {
span.error(e);
} catch (err) {
span.error(err);
span.stop();

throw e;
throw err;
}
}
};
Expand Down
Loading