Skip to content

Commit

Permalink
OpenTelemetry support (#104)
Browse files Browse the repository at this point in the history
* OpenTelemetry support

* OpenTelemetry tests

Add test for OpenTelemetry error tracking

* Set error.type on OpenTelemetry spans

* Provide client version when getting tracer

Co-authored-by: Trent Mick <[email protected]>

* Represent OTel dependencies more accurately

* Fetch OTel tracer at instantiation

* Refactor _request into private method

* Collect OTel attributes before starting span

---------

Co-authored-by: Trent Mick <[email protected]>
  • Loading branch information
JoshMock and trentm authored Jun 28, 2024
1 parent 59f6829 commit 4f7b1ef
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 7 deletions.
1 change: 1 addition & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ export type {

export type {
TransportOptions,
TransportRequestMetadata,
TransportRequestParams,
TransportRequestOptions,
TransportRequestOptionsWithMeta,
Expand Down
5 changes: 4 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
"node": ">=18"
},
"devDependencies": {
"@opentelemetry/sdk-trace-base": "^1.25.0",
"@sinonjs/fake-timers": "github:sinonjs/fake-timers#0bfffc1",
"@tapjs/clock": "^1.1.24",
"@types/debug": "^4.1.7",
Expand All @@ -58,6 +59,7 @@
"workq": "^3.0.0"
},
"dependencies": {
"@opentelemetry/api": "1.x",
"debug": "^4.3.4",
"hpagent": "^1.0.0",
"ms": "^2.1.3",
Expand All @@ -68,7 +70,8 @@
"tap": {
"allow-incomplete-coverage": true,
"plugin": [
"@tapjs/clock"
"@tapjs/clock",
"@tapjs/before"
]
}
}
84 changes: 78 additions & 6 deletions src/Transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,11 @@ import {
kNdjsonContentType,
kAcceptHeader,
kRedaction,
kRetryBackoff
kRetryBackoff,
kOtelTracer
} from './symbols'
import { setTimeout as setTimeoutPromise } from 'node:timers/promises'
import opentelemetry, { Attributes, Exception, SpanKind, SpanStatusCode, Span, Tracer } from '@opentelemetry/api'

const { version: clientVersion } = require('../package.json') // eslint-disable-line
const debug = Debug('elasticsearch')
Expand Down Expand Up @@ -119,12 +121,18 @@ export interface TransportOptions {
retryBackoff?: (min: number, max: number, attempt: number) => number
}

/**
 * Descriptive metadata a caller can attach to a request. `Transport.request`
 * reads it to build the OpenTelemetry span for that request.
 */
export interface TransportRequestMetadata {
/**
 * Name of the operation being performed. A span is only created when this is
 * non-null; it becomes the span name and the `db.operation.name` attribute.
 */
name: string
/**
 * Path parameters of the request. Each entry is recorded on the span as a
 * `db.elasticsearch.path_parts.<key>` attribute.
 */
pathParts?: Record<string, any>
}

/**
 * Parameters describing a single request handed to `Transport.request`.
 */
export interface TransportRequestParams {
/** HTTP method; also recorded as the `http.request.method` span attribute when the request is traced. */
method: string
/** Request path, forwarded to the connection together with `method`. */
path: string
/** Request body. */
body?: RequestBody
/** NOTE(review): presumably a newline-delimited body for bulk-style APIs; its handling is not visible here, so confirm. */
bulkBody?: RequestNDBody
/** Query string, given either as a key/value record or as a string. */
querystring?: Record<string, any> | string
/** Optional request metadata; when `meta.name` is set the request is wrapped in an OpenTelemetry span. */
meta?: TransportRequestMetadata
}

export interface TransportRequestOptions {
Expand Down Expand Up @@ -221,6 +229,7 @@ export default class Transport {
[kAcceptHeader]: string
[kRedaction]: RedactionOptions
[kRetryBackoff]: (min: number, max: number, attempt: number) => number
[kOtelTracer]: Tracer

static sniffReasons = {
SNIFF_ON_START: 'sniff-on-start',
Expand Down Expand Up @@ -283,6 +292,7 @@ export default class Transport {
this[kAcceptHeader] = opts.vendoredHeaders?.accept ?? 'application/json, text/plain'
this[kRedaction] = opts.redaction ?? { type: 'replace', additionalKeys: [] }
this[kRetryBackoff] = opts.retryBackoff ?? retryBackoff
this[kOtelTracer] = opentelemetry.trace.getTracer('@elastic/transport', clientVersion)

if (opts.sniffOnStart === true) {
this.sniff({
Expand Down Expand Up @@ -327,10 +337,10 @@ export default class Transport {
return this[kDiagnostic]
}

async request<TResponse = unknown> (params: TransportRequestParams, options?: TransportRequestOptionsWithOutMeta): Promise<TResponse>
async request<TResponse = unknown, TContext = any> (params: TransportRequestParams, options?: TransportRequestOptionsWithMeta): Promise<TransportResult<TResponse, TContext>>
async request<TResponse = unknown> (params: TransportRequestParams, options?: TransportRequestOptions): Promise<TResponse>
async request (params: TransportRequestParams, options: TransportRequestOptions = {}): Promise<any> {
private async _request<TResponse = unknown> (params: TransportRequestParams, options?: TransportRequestOptionsWithOutMeta, otelSpan?: Span): Promise<TResponse>
private async _request<TResponse = unknown, TContext = any> (params: TransportRequestParams, options?: TransportRequestOptionsWithMeta, otelSpan?: Span): Promise<TransportResult<TResponse, TContext>>
private async _request<TResponse = unknown> (params: TransportRequestParams, options?: TransportRequestOptions, otelSpan?: Span): Promise<TResponse>
private async _request (params: TransportRequestParams, options: TransportRequestOptions = {}, otelSpan?: Span): Promise<any> {
const connectionParams: ConnectionRequestParams = {
method: params.method,
path: params.path
Expand Down Expand Up @@ -370,7 +380,6 @@ export default class Transport {
if (this.headers?.warning == null) {
return null
}

const { warning } = this.headers
// if multiple HTTP headers have the same name, Undici represents them as an array
const warnings: string[] = Array.isArray(warning) ? warning : [warning]
Expand Down Expand Up @@ -489,6 +498,22 @@ export default class Transport {
throw new NoLivingConnectionsError('There are no living connections', result, errorOptions)
}

// generate required OpenTelemetry attributes from the request URL
const requestUrl = meta.connection.url
otelSpan?.setAttributes({
'url.full': requestUrl.toString(),
'server.address': requestUrl.hostname
})
if (requestUrl.port === '') {
if (requestUrl.protocol === 'https:') {
otelSpan?.setAttribute('server.port', 443)
} else if (requestUrl.protocol === 'http:') {
otelSpan?.setAttribute('server.port', 80)
}
} else if (requestUrl.port !== '9200') {
otelSpan?.setAttribute('server.port', parseInt(requestUrl.port, 10))
}

this[kDiagnostic].emit('request', null, result)

// perform the actual http request
Expand All @@ -505,6 +530,14 @@ export default class Transport {
result.statusCode = statusCode
result.headers = headers

if (headers['x-found-handling-cluster'] != null) {
otelSpan?.setAttribute('db.elasticsearch.cluster.name', headers['x-found-handling-cluster'])
}

if (headers['x-found-handling-instance'] != null) {
otelSpan?.setAttribute('db.elasticsearch.node.name', headers['x-found-handling-instance'])
}

if (this[kProductCheck] != null && headers['x-elastic-product'] !== this[kProductCheck] && statusCode >= 200 && statusCode < 300) {
/* eslint-disable @typescript-eslint/prefer-ts-expect-error */
// @ts-ignore
Expand Down Expand Up @@ -647,6 +680,45 @@ export default class Transport {
return returnMeta ? result : result.body
}

/**
 * Sends a request through the transport, tracing it with OpenTelemetry when
 * the caller supplies an operation name in `params.meta.name`.
 *
 * When traced, a span of kind CLIENT named after the operation is started
 * with the `db.system`, `http.request.method` and `db.operation.name`
 * attributes, plus one `db.elasticsearch.path_parts.<key>` attribute per
 * entry of `params.meta.pathParts`. The span is handed to `_request` so it
 * can add connection-level attributes, and it is always ended, whether the
 * request resolves or rejects.
 *
 * @param params - the request to perform; `params.meta` controls tracing
 * @param options - per-request options, forwarded untouched to `_request`
 * @returns whatever `_request` resolves with
 * @throws rethrows, unchanged, anything `_request` rejects with; when traced
 * the failure is first recorded on the span (exception event, ERROR status
 * and an `error.type` attribute)
 */
async request<TResponse = unknown> (params: TransportRequestParams, options?: TransportRequestOptionsWithOutMeta): Promise<TResponse>
async request<TResponse = unknown, TContext = any> (params: TransportRequestParams, options?: TransportRequestOptionsWithMeta): Promise<TransportResult<TResponse, TContext>>
async request<TResponse = unknown> (params: TransportRequestParams, options?: TransportRequestOptions): Promise<TResponse>
async request (params: TransportRequestParams, options: TransportRequestOptions = {}): Promise<any> {
  const meta = params.meta

  // without an operation name there is nothing to name a span after: skip tracing
  if (meta?.name == null) {
    return await this._request(params, options)
  }

  // gather OpenTelemetry attributes before the span starts so they are
  // available to samplers at span creation time
  const attributes: Attributes = {
    'db.system': 'elasticsearch',
    'http.request.method': params.method,
    'db.operation.name': meta.name
  }
  if (meta.pathParts != null) {
    for (const [key, value] of Object.entries(meta.pathParts)) {
      attributes[`db.elasticsearch.path_parts.${key}`] = value
    }
  }

  return await this[kOtelTracer].startActiveSpan(meta.name, { attributes, kind: SpanKind.CLIENT }, async (otelSpan: Span) => {
    try {
      return await this._request(params, options, otelSpan)
    } catch (err: any) {
      otelSpan.recordException(err as Exception)
      otelSpan.setStatus({ code: SpanStatusCode.ERROR })
      // `err` may be any thrown value, including null/undefined; guard the
      // property access so the original rejection is never replaced by a
      // TypeError raised from inside this handler
      otelSpan.setAttribute('error.type', err?.name ?? 'Error')

      throw err
    } finally {
      // end the span on both the success and the failure path
      otelSpan.end()
    }
  })
}

getConnection (opts: GetConnectionOptions): Connection | null {
const now = Date.now()
if (this[kSniffEnabled] && now > this[kNextSniff]) {
Expand Down
1 change: 1 addition & 0 deletions src/symbols.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,4 @@ export const kNdjsonContentType = Symbol('ndjson content type')
// key under which Transport stores the Accept header value it was configured with
export const kAcceptHeader = Symbol('accept header')
// key under which Transport stores its redaction options
export const kRedaction = Symbol('redaction')
// key under which Transport stores its retry backoff function
export const kRetryBackoff = Symbol('retry backoff')
// key under which Transport stores the OpenTelemetry tracer it fetches at instantiation
export const kOtelTracer = Symbol('opentelemetry tracer')
121 changes: 121 additions & 0 deletions test/unit/transport.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import os from 'os'
import { Readable } from 'stream'
import intoStream from 'into-stream'
import * as http from 'http'
import { BasicTracerProvider, InMemorySpanExporter, SimpleSpanProcessor } from '@opentelemetry/sdk-trace-base'
import {
Transport,
Serializer,
Expand Down Expand Up @@ -2269,3 +2270,123 @@ test('redaction does not get leaked to original object', async t => {
}
server.stop()
})

// Verifies the spans Transport.request emits when a request carries `meta.name`:
// the base attributes, the Elastic Cloud cluster/instance attributes taken
// from response headers, and the error state recorded for a failed request.
test('OpenTelemetry', t => {
  let spanProcessor: SimpleSpanProcessor
  let tracerProvider: BasicTracerProvider
  let spanExporter: InMemorySpanExporter

  // starts a local HTTP server answering with `handler` and returns it along
  // with a Transport whose only connection points at that server
  async function connectTo (handler: (req: http.IncomingMessage, res: http.ServerResponse) => void) {
    const [{ port }, server] = await buildServer(handler)
    const connectionPool = new WeightedConnectionPool({ Connection: UndiciConnection })
    connectionPool.addConnection(`http://localhost:${port}`)
    const transport = new Transport({ connectionPool })
    return { port, server, transport }
  }

  // register a tracer provider that keeps finished spans in memory
  t.before(() => {
    spanExporter = new InMemorySpanExporter()
    spanProcessor = new SimpleSpanProcessor(spanExporter)
    tracerProvider = new BasicTracerProvider()
    tracerProvider.addSpanProcessor(spanProcessor)
    tracerProvider.register()
  })

  // drop collected spans so every subtest only sees its own
  t.afterEach(async () => {
    await tracerProvider.forceFlush()
    spanExporter.reset()
  })

  t.after(async () => {
    await tracerProvider.shutdown()
  })

  t.test('basic details', async t => {
    t.plan(2)

    const { port, server, transport } = await connectTo((req: http.IncomingMessage, res: http.ServerResponse) => {
      res.end('ok')
    })

    const params = {
      path: '/hello',
      method: 'GET',
      meta: { name: 'hello' }
    }
    await transport.request(params)

    const [span] = spanExporter.getFinishedSpans()

    const expectedAttributes = {
      'db.system': 'elasticsearch',
      'http.request.method': 'GET',
      'db.operation.name': 'hello',
      'url.full': `http://localhost:${port}/`,
      'server.address': 'localhost',
      'server.port': port
    }
    t.same(span.attributes, expectedAttributes)
    t.equal(span.status.code, 0)

    server.stop()
  })

  t.test('cloud cluster and instance details', async t => {
    t.plan(2)

    const { port, server, transport } = await connectTo((_req: http.IncomingMessage, res: http.ServerResponse) => {
      res.setHeader('x-found-handling-cluster', 'foobar')
      res.setHeader('x-found-handling-instance', 'instance-1')
      res.end('ok')
    })

    const params = {
      path: '/hello2',
      method: 'GET',
      meta: { name: 'hello.2' }
    }
    await transport.request(params)

    const [span] = spanExporter.getFinishedSpans()

    const expectedAttributes = {
      'db.system': 'elasticsearch',
      'http.request.method': 'GET',
      'db.operation.name': 'hello.2',
      'url.full': `http://localhost:${port}/`,
      'server.address': 'localhost',
      'server.port': port,
      'db.elasticsearch.cluster.name': 'foobar',
      'db.elasticsearch.node.name': 'instance-1'
    }
    t.same(span.attributes, expectedAttributes)
    t.equal(span.status.code, 0)

    server.stop()
  })

  t.test('span records error state', async t => {
    t.plan(3)

    // every request through this connection type times out
    const connectionPool = new WeightedConnectionPool({ Connection: MockConnectionTimeout })
    connectionPool.addConnection('http://localhost:9200')

    const transport = new Transport({ connectionPool })

    const params = {
      path: '/hello2',
      method: 'GET',
      meta: { name: 'hello.2' }
    }
    try {
      await transport.request(params)
    } catch (err: any) {
      t.ok(err instanceof Error)
    }

    const [span] = spanExporter.getFinishedSpans()

    t.equal(span.attributes['error.type'], 'TimeoutError')
    t.not(span.status.code, 0)
  })

  t.end()
})

0 comments on commit 4f7b1ef

Please sign in to comment.