diff --git a/js/core/src/tracing/instrumentation.ts b/js/core/src/tracing/instrumentation.ts index 0641eba17..b4168cc8a 100644 --- a/js/core/src/tracing/instrumentation.ts +++ b/js/core/src/tracing/instrumentation.ts @@ -21,7 +21,8 @@ import { trace, } from '@opentelemetry/api'; import { AsyncLocalStorage } from 'node:async_hooks'; -import { SpanMetadata, TraceMetadata } from './types.js'; +import { performance } from 'node:perf_hooks'; +import { PathMetadata, SpanMetadata, TraceMetadata } from './types.js'; export const spanMetadataAls = new AsyncLocalStorage(); export const traceMetadataAls = new AsyncLocalStorage(); @@ -43,7 +44,8 @@ export async function newTrace( fn: (metadata: SpanMetadata, rootSpan: ApiSpan) => Promise ) { const traceMetadata = traceMetadataAls.getStore() || { - paths: new Set(), + paths: new Set(), + timestamp: performance.now(), }; return await traceMetadataAls.run(traceMetadata, () => runInNewSpan( @@ -99,13 +101,23 @@ export async function runInNewSpan( opts.metadata.path = decoratePathWithSubtype(opts.metadata); if (pathCount == getCurrentPathCount()) { - traceMetadataAls.getStore()?.paths?.add(opts.metadata.path); + const now = performance.now(); + const start = traceMetadataAls.getStore()?.timestamp || now; + traceMetadataAls.getStore()?.paths?.add({ + path: opts.metadata.path, + latency: now - start, + }); } return output; } catch (e) { opts.metadata.path = decoratePathWithSubtype(opts.metadata); - traceMetadataAls.getStore()?.paths?.add(opts.metadata.path); + const now = performance.now(); + const start = traceMetadataAls.getStore()?.timestamp || now; + traceMetadataAls.getStore()?.paths?.add({ + path: opts.metadata.path, + latency: now - start, + }); opts.metadata.state = 'error'; otSpan.setStatus({ code: SpanStatusCode.ERROR, diff --git a/js/core/src/tracing/types.ts b/js/core/src/tracing/types.ts index 008a2e140..19340a64a 100644 --- a/js/core/src/tracing/types.ts +++ b/js/core/src/tracing/types.ts @@ -35,8 +35,15 @@ export 
interface TraceStore { list(query?: TraceQuery): Promise; } +export const PathMetadataSchema = z.object({ + path: z.string(), + latency: z.number(), +}); +export type PathMetadata = z.infer; + export const TraceMetadataSchema = z.object({ - paths: z.set(z.string()).optional(), + paths: z.set(PathMetadataSchema).optional(), + timestamp: z.number(), }); export type TraceMetadata = z.infer; diff --git a/js/flow/src/telemetry.ts b/js/flow/src/telemetry.ts index 1e0492d3e..975977983 100644 --- a/js/flow/src/telemetry.ts +++ b/js/flow/src/telemetry.ts @@ -21,7 +21,11 @@ import { MetricCounter, MetricHistogram, } from '@genkit-ai/core/metrics'; -import { spanMetadataAls, traceMetadataAls } from '@genkit-ai/core/tracing'; +import { + PathMetadata, + spanMetadataAls, + traceMetadataAls, +} from '@genkit-ai/core/tracing'; import { ValueType } from '@opentelemetry/api'; import express from 'express'; @@ -40,6 +44,12 @@ const variantCounter = new MetricCounter(_N('variants'), { valueType: ValueType.INT, }); +const variantLatencies = new MetricHistogram(_N('variants/latency'), { + description: 'Latencies per flow variant.', + valueType: ValueType.DOUBLE, + unit: 'ms', +}); + const flowLatencies = new MetricHistogram(_N('latency'), { description: 'Latencies when calling Genkit flows.', valueType: ValueType.DOUBLE, @@ -67,24 +77,29 @@ export function writeFlowSuccess(flowName: string, latencyMs: number) { flowCounter.add(1, dimensions); flowLatencies.record(latencyMs, dimensions); - const paths = traceMetadataAls.getStore()?.paths || new Set(); + const paths = traceMetadataAls.getStore()?.paths || new Set(); if (paths) { - const relevantVariants = Array.from(paths).filter((path) => - path.includes(flowName) + const relevantVariants = Array.from(paths).filter((meta) => + meta.path.includes(flowName) ); logger.logStructured(`Variants[/${flowName}]`, { flowName: flowName, - variants: relevantVariants, + variants: relevantVariants.map((variant) => variant.path), }); - 
relevantVariants.forEach((variant) => + relevantVariants.forEach((variant) => { variantCounter.add(1, { ...dimensions, success: 'success', - variant, - }) - ); + variant: variant.path, + }); + + variantLatencies.record(variant.latency, { + ...dimensions, + variant: variant.path, + }); + }); } } @@ -102,26 +117,32 @@ export function writeFlowFailure( flowCounter.add(1, dimensions); flowLatencies.record(latencyMs, dimensions); - const allPaths = traceMetadataAls.getStore()?.paths || new Set(); + const allPaths = + traceMetadataAls.getStore()?.paths || new Set(); if (allPaths) { const failPath = spanMetadataAls?.getStore()?.path; const relevantVariants = Array.from(allPaths).filter( - (path) => path.includes(flowName) && path !== failPath + (meta) => meta.path.includes(flowName) && meta.path !== failPath ); logger.logStructured(`Variants[/${flowName}]`, { flowName: flowName, - variants: relevantVariants, + variants: relevantVariants.map((variant) => variant.path), }); // All variants that have succeeded need to be tracked as succeeded. - relevantVariants.forEach((variant) => + relevantVariants.forEach((variant) => { variantCounter.add(1, { flowName: flowName, success: 'success', - variant: variant, - }) - ); + variant: variant.path, + }); + + variantLatencies.record(variant.latency, { + ...dimensions, + variant: variant.path, + }); + }); variantCounter.add(1, { flowName: flowName,