Skip to content

Commit

Permalink
feat(cassandra-responsehook): added response hook to execute func (#1180
Browse files Browse the repository at this point in the history
)
  • Loading branch information
samimusallam authored Oct 28, 2022
1 parent d0a10eb commit 20767c4
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 10 deletions.
10 changes: 5 additions & 5 deletions plugins/node/opentelemetry-instrumentation-cassandra/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@ await client.execute('select * from foo');

### Instrumentation options

| Option | Type | Default | Description |
| ------ | ---- | ------- | ----------- |
| `enhancedDatabaseReporting` | `boolean` | `false` | Whether to include database queries with spans. These can contain sensitive information when using unescaped parameters - i.e. `insert into persons (name) values ('Bob')` instead of `insert into persons (name) values (?)`. |
| `maxQueryLength` | `number` | `65536` | If `enhancedDatabaseReporting` is enabled, limits the attached query strings
to this length. |
| Option | Type | Default | Description |
|-----------------------------|--------------------------------------------------|-------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `enhancedDatabaseReporting` | `boolean` | `false` | Whether to include database queries with spans. These can contain sensitive information when using unescaped parameters - i.e. `insert into persons (name) values ('Bob')` instead of `insert into persons (name) values (?)`. |
| `responseHook` | `CassandraDriverResponseCustomAttributeFunction` | `undefined` | Hook for adding custom attributes before response is handled |
| `maxQueryLength` | `number` | `65536` | If `enhancedDatabaseReporting` is enabled, limits the attached query strings to this length. |

### Supported versions

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import {
isWrapped,
safeExecuteInTheMiddle,
} from '@opentelemetry/instrumentation';
import { CassandraDriverInstrumentationConfig } from './types';
import { CassandraDriverInstrumentationConfig, ResultSet } from './types';
import {
SemanticAttributes,
DbSystemValues,
Expand All @@ -41,6 +41,8 @@ import type * as CassandraDriver from 'cassandra-driver';
const supportedVersions = ['>=4.4 <5.0'];

export class CassandraDriverInstrumentation extends InstrumentationBase {
protected override _config!: CassandraDriverInstrumentationConfig;

constructor(config: CassandraDriverInstrumentationConfig = {}) {
super('@opentelemetry/instrumentation-cassandra-driver', VERSION, config);
}
Expand Down Expand Up @@ -147,7 +149,13 @@ export class CassandraDriverInstrumentation extends InstrumentationBase {
}
);

const wrappedPromise = wrapPromise(span, execPromise);
const wrappedPromise = wrapPromise(
span,
execPromise,
(span, result) => {
plugin._callResponseHook(span, result);
}
);

return context.bind(execContext, wrappedPromise);
};
Expand Down Expand Up @@ -319,6 +327,22 @@ export class CassandraDriverInstrumentation extends InstrumentationBase {
attributes,
});
}

private _callResponseHook(span: Span, response: ResultSet) {
if (!this._config.responseHook) {
return;
}

safeExecuteInTheMiddle(
() => this._config.responseHook!(span, { response: response }),
e => {
if (e) {
this._diag.error('responseHook error', e);
}
},
true
);
}
}

function failSpan(span: Span, error: Error) {
Expand All @@ -336,10 +360,17 @@ function combineQueries(queries: Array<string | { query: string }>) {
.join('\n');
}

function wrapPromise<T>(span: Span, promise: Promise<T>): Promise<T> {
function wrapPromise<T>(
span: Span,
promise: Promise<T>,
successCallback?: (span: Span, result: T) => void
): Promise<T> {
return promise
.then(result => {
return new Promise<T>(resolve => {
if (successCallback) {
successCallback(span, result);
}
span.end();
resolve(result);
});
Expand Down
32 changes: 32 additions & 0 deletions plugins/node/opentelemetry-instrumentation-cassandra/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,32 @@
*/

import { InstrumentationConfig } from '@opentelemetry/instrumentation';
import { Span } from '@opentelemetry/api';

export interface Row {
get(columnName: string | number): any;

keys(): string[];

forEach(callback: (row: Row) => void): void;

values(): any[];

[key: string]: any;
}

// https://github.com/datastax/nodejs-driver/blob/d42176e4baa1cfc3df79699cc3b5d575c86e3cec/lib/types/index.d.ts#L323
export interface ResultSet {
rows: Row[];
}

export interface ResponseHookInfo {
response: ResultSet;
}

export interface CassandraDriverResponseCustomAttributeFunction {
(span: Span, responseInfo: ResponseHookInfo): void;
}

export interface CassandraDriverInstrumentationConfig
extends InstrumentationConfig {
Expand All @@ -29,4 +55,10 @@ export interface CassandraDriverInstrumentationConfig
* @default 65536
*/
maxQueryLength?: number;
/**
* Function for adding custom attributes before response is handled.
* @param span the current span
* @param responseInfo array of the resulting rows. This will only return the first page of results
*/
responseHook?: CassandraDriverResponseCustomAttributeFunction;
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import {
ReadableSpan,
} from '@opentelemetry/sdk-trace-base';
import {
Attributes,
context,
Span,
SpanKind,
SpanStatus,
SpanStatusCode,
Expand All @@ -38,6 +40,7 @@ import {
CassandraDriverInstrumentation,
CassandraDriverInstrumentationConfig,
} from '../src';
import { ResponseHookInfo } from '../src/types';

const memoryExporter = new InMemorySpanExporter();
const provider = new NodeTracerProvider();
Expand All @@ -53,11 +56,13 @@ function assertSpan(
span: ReadableSpan,
name: string,
query?: string,
status?: SpanStatus
status?: SpanStatus,
customAttributes?: Attributes
) {
const attributes = {
const attributes: Attributes = {
[SemanticAttributes.DB_SYSTEM]: DbSystemValues.CASSANDRA,
[SemanticAttributes.DB_USER]: 'cassandra',
...customAttributes,
};

if (query !== undefined) {
Expand All @@ -76,6 +81,13 @@ function assertSingleSpan(name: string, query?: string, status?: SpanStatus) {
assertSpan(span, name, query, status);
}

function assertAttributeInSingleSpan(name: string, attributes?: Attributes) {
const spans = memoryExporter.getFinishedSpans();
assert.strictEqual(spans.length, 1);
const [span] = spans;
assertSpan(span, name, undefined, undefined, attributes);
}

function assertErrorSpan(
name: string,
error: Error & { code?: number },
Expand Down Expand Up @@ -131,6 +143,7 @@ describe('CassandraDriverInstrumentation', () => {
}

instrumentation = new CassandraDriverInstrumentation();

instrumentation.setTracerProvider(provider);

const cassandra = require('cassandra-driver');
Expand Down Expand Up @@ -228,6 +241,60 @@ describe('CassandraDriverInstrumentation', () => {
assertSingleSpan('cassandra-driver.execute', query.substr(0, 25));
});
});

describe('responseHook', () => {
after(() => {
instrumentation.setConfig({});
});

it('adds custom attributes to span', async () => {
const responseAttributeName = 'response.attribute';
const customAttributeName = 'custom.attribute';
const customAttributeValue = 'custom attribute value';

const config: CassandraDriverInstrumentationConfig = {
responseHook: (span: Span, responseInfo: ResponseHookInfo) => {
const row = responseInfo.response.rows[0];
const responseValue = row.count.toNumber();

span.setAttribute(responseAttributeName, responseValue);
span.setAttribute(customAttributeName, customAttributeValue);
},
};

instrumentation.setConfig(config);

await client.execute(
"SELECT count(*) FROM system_schema.columns WHERE keyspace_name = 'ot' AND table_name = 'test';"
);

assertAttributeInSingleSpan('cassandra-driver.execute', {
[customAttributeName]: customAttributeValue,
[responseAttributeName]: 2,
});
});

it('throws and should not affect user flow or span creation', async () => {
const hookAttributeName = 'hook.attribute';
const hookAttributeValue = 'hook attribute value';

const config: CassandraDriverInstrumentationConfig = {
responseHook: (span: Span, responseInfo: ResponseHookInfo): void => {
span.setAttribute(hookAttributeName, hookAttributeValue);
throw new Error('error inside hook');
},
};

instrumentation.setConfig(config);

const query = 'select * from ot.test';
await client.execute(query);

assertAttributeInSingleSpan('cassandra-driver.execute', {
[hookAttributeName]: hookAttributeValue,
});
});
});
});

describe('batch', () => {
Expand Down

0 comments on commit 20767c4

Please sign in to comment.