From 20767c4fffee34bc51392894001bbb667576e91d Mon Sep 17 00:00:00 2001 From: Sami Musallam Date: Fri, 28 Oct 2022 12:58:53 +0300 Subject: [PATCH] feat(cassandra-responsehook): added response hook to execute func (#1180) --- .../README.md | 10 +-- .../src/instrumentation.ts | 37 +++++++++- .../src/types.ts | 32 +++++++++ .../test/cassandra-driver.test.ts | 71 ++++++++++++++++++- 4 files changed, 140 insertions(+), 10 deletions(-) diff --git a/plugins/node/opentelemetry-instrumentation-cassandra/README.md b/plugins/node/opentelemetry-instrumentation-cassandra/README.md index 90411b6cee..637bcf4fa2 100644 --- a/plugins/node/opentelemetry-instrumentation-cassandra/README.md +++ b/plugins/node/opentelemetry-instrumentation-cassandra/README.md @@ -39,11 +39,11 @@ await client.execute('select * from foo'); ### Instrumentation options -| Option | Type | Default | Description | -| ------ | ---- | ------- | ----------- | -| `enhancedDatabaseReporting` | `boolean` | `false` | Whether to include database queries with spans. These can contain sensitive information when using unescaped parameters - i.e. `insert into persons (name) values ('Bob')` instead of `insert into persons (name) values (?)`. | -| `maxQueryLength` | `number` | `65536` | If `enhancedDatabaseReporting` is enabled, limits the attached query strings -to this length. | +| Option | Type | Default | Description | +|-----------------------------|--------------------------------------------------|-------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `enhancedDatabaseReporting` | `boolean` | `false` | Whether to include database queries with spans. These can contain sensitive information when using unescaped parameters - i.e. `insert into persons (name) values ('Bob')` instead of `insert into persons (name) values (?)`. | +| `responseHook` | `CassandraDriverResponseCustomAttributeFunction` | `undefined` | Hook for adding custom attributes before response is handled | +| `maxQueryLength` | `number` | `65536` | If `enhancedDatabaseReporting` is enabled, limits the attached query strings to this length. | ### Supported versions diff --git a/plugins/node/opentelemetry-instrumentation-cassandra/src/instrumentation.ts b/plugins/node/opentelemetry-instrumentation-cassandra/src/instrumentation.ts index 6982586ba6..68f8975ece 100644 --- a/plugins/node/opentelemetry-instrumentation-cassandra/src/instrumentation.ts +++ b/plugins/node/opentelemetry-instrumentation-cassandra/src/instrumentation.ts @@ -29,7 +29,7 @@ import { isWrapped, safeExecuteInTheMiddle, } from '@opentelemetry/instrumentation'; -import { CassandraDriverInstrumentationConfig } from './types'; +import { CassandraDriverInstrumentationConfig, ResultSet } from './types'; import { SemanticAttributes, DbSystemValues, @@ -41,6 +41,8 @@ import type * as CassandraDriver from 'cassandra-driver'; const supportedVersions = ['>=4.4 <5.0']; export class CassandraDriverInstrumentation extends InstrumentationBase { + protected override _config!: CassandraDriverInstrumentationConfig; + constructor(config: CassandraDriverInstrumentationConfig = {}) { super('@opentelemetry/instrumentation-cassandra-driver', VERSION, config); } @@ -147,7 +149,13 @@ export class CassandraDriverInstrumentation extends InstrumentationBase { } ); - const wrappedPromise = wrapPromise(span, execPromise); + const wrappedPromise = wrapPromise( + span, + execPromise, + (span, result) => { + plugin._callResponseHook(span, result); + } + ); return context.bind(execContext, wrappedPromise); }; @@ -319,6 +327,22 @@ export class CassandraDriverInstrumentation extends InstrumentationBase { attributes, }); } + + private _callResponseHook(span: Span, response: ResultSet) { + if (!this._config.responseHook) { + return; + } + + safeExecuteInTheMiddle( + () => this._config.responseHook!(span, { response: response }), + e => { + if (e) { + this._diag.error('responseHook error', e); + } + }, + true + ); + } } function failSpan(span: Span, error: Error) { @@ -336,10 +360,17 @@ function combineQueries(queries: Array) { .join('\n'); } -function wrapPromise(span: Span, promise: Promise): Promise { +function wrapPromise( + span: Span, + promise: Promise, + successCallback?: (span: Span, result: T) => void +): Promise { return promise .then(result => { return new Promise(resolve => { + if (successCallback) { + successCallback(span, result); + } span.end(); resolve(result); }); diff --git a/plugins/node/opentelemetry-instrumentation-cassandra/src/types.ts b/plugins/node/opentelemetry-instrumentation-cassandra/src/types.ts index eff54d1786..0ddabb3585 100644 --- a/plugins/node/opentelemetry-instrumentation-cassandra/src/types.ts +++ b/plugins/node/opentelemetry-instrumentation-cassandra/src/types.ts @@ -15,6 +15,32 @@ */ import { InstrumentationConfig } from '@opentelemetry/instrumentation'; +import { Span } from '@opentelemetry/api'; + +export interface Row { + get(columnName: string | number): any; + + keys(): string[]; + + forEach(callback: (row: Row) => void): void; + + values(): any[]; + + [key: string]: any; +} + +// https://github.com/datastax/nodejs-driver/blob/d42176e4baa1cfc3df79699cc3b5d575c86e3cec/lib/types/index.d.ts#L323 +export interface ResultSet { + rows: Row[]; +} + +export interface ResponseHookInfo { + response: ResultSet; +} + +export interface CassandraDriverResponseCustomAttributeFunction { + (span: Span, responseInfo: ResponseHookInfo): void; +} export interface CassandraDriverInstrumentationConfig extends InstrumentationConfig { @@ -29,4 +55,10 @@ export interface CassandraDriverInstrumentationConfig * @default 65536 */ maxQueryLength?: number; + /** + * Function for adding custom attributes before response is handled. + * @param span the current span + * @param responseInfo array of the resulting rows. This will only return the first page of results + */ + responseHook?: CassandraDriverResponseCustomAttributeFunction; } diff --git a/plugins/node/opentelemetry-instrumentation-cassandra/test/cassandra-driver.test.ts b/plugins/node/opentelemetry-instrumentation-cassandra/test/cassandra-driver.test.ts index 42b8f9082c..bd696df85f 100644 --- a/plugins/node/opentelemetry-instrumentation-cassandra/test/cassandra-driver.test.ts +++ b/plugins/node/opentelemetry-instrumentation-cassandra/test/cassandra-driver.test.ts @@ -20,7 +20,9 @@ import { ReadableSpan, } from '@opentelemetry/sdk-trace-base'; import { + Attributes, context, + Span, SpanKind, SpanStatus, SpanStatusCode, @@ -38,6 +40,7 @@ import { CassandraDriverInstrumentation, CassandraDriverInstrumentationConfig, } from '../src'; +import { ResponseHookInfo } from '../src/types'; const memoryExporter = new InMemorySpanExporter(); const provider = new NodeTracerProvider(); @@ -53,11 +56,13 @@ function assertSpan( span: ReadableSpan, name: string, query?: string, - status?: SpanStatus + status?: SpanStatus, + customAttributes?: Attributes ) { - const attributes = { + const attributes: Attributes = { [SemanticAttributes.DB_SYSTEM]: DbSystemValues.CASSANDRA, [SemanticAttributes.DB_USER]: 'cassandra', + ...customAttributes, }; if (query !== undefined) { @@ -76,6 +81,13 @@ function assertSingleSpan(name: string, query?: string, status?: SpanStatus) { assertSpan(span, name, query, status); } +function assertAttributeInSingleSpan(name: string, attributes?: Attributes) { + const spans = memoryExporter.getFinishedSpans(); + assert.strictEqual(spans.length, 1); + const [span] = spans; + assertSpan(span, name, undefined, undefined, attributes); +} + function assertErrorSpan( name: string, error: Error & { code?: number }, @@ -131,6 +143,7 @@ describe('CassandraDriverInstrumentation', () => { } instrumentation = new CassandraDriverInstrumentation(); + instrumentation.setTracerProvider(provider); const cassandra = require('cassandra-driver'); @@ -228,6 +241,60 @@ describe('CassandraDriverInstrumentation', () => { assertSingleSpan('cassandra-driver.execute', query.substr(0, 25)); }); }); + + describe('responseHook', () => { + after(() => { + instrumentation.setConfig({}); + }); + + it('adds custom attributes to span', async () => { + const responseAttributeName = 'response.attribute'; + const customAttributeName = 'custom.attribute'; + const customAttributeValue = 'custom attribute value'; + + const config: CassandraDriverInstrumentationConfig = { + responseHook: (span: Span, responseInfo: ResponseHookInfo) => { + const row = responseInfo.response.rows[0]; + const responseValue = row.count.toNumber(); + + span.setAttribute(responseAttributeName, responseValue); + span.setAttribute(customAttributeName, customAttributeValue); + }, + }; + + instrumentation.setConfig(config); + + await client.execute( + "SELECT count(*) FROM system_schema.columns WHERE keyspace_name = 'ot' AND table_name = 'test';" + ); + + assertAttributeInSingleSpan('cassandra-driver.execute', { + [customAttributeName]: customAttributeValue, + [responseAttributeName]: 2, + }); + }); + + it('throws and should not affect user flow or span creation', async () => { + const hookAttributeName = 'hook.attribute'; + const hookAttributeValue = 'hook attribute value'; + + const config: CassandraDriverInstrumentationConfig = { + responseHook: (span: Span, responseInfo: ResponseHookInfo): void => { + span.setAttribute(hookAttributeName, hookAttributeValue); + throw new Error('error inside hook'); + }, + }; + + instrumentation.setConfig(config); + + const query = 'select * from ot.test'; + await client.execute(query); + + assertAttributeInSingleSpan('cassandra-driver.execute', { + [hookAttributeName]: hookAttributeValue, + }); + }); + }); }); describe('batch', () => {