diff --git a/js/core/src/tracing/instrumentation.ts b/js/core/src/tracing/instrumentation.ts index db091b4af..b1ff3b49b 100644 --- a/js/core/src/tracing/instrumentation.ts +++ b/js/core/src/tracing/instrumentation.ts @@ -108,6 +108,7 @@ export async function runInNewSpan( const start = traceMetadataAls.getStore()?.timestamp || now; traceMetadataAls.getStore()?.paths?.add({ path: opts.metadata.path, + status: 'success', latency: now - start, }); } @@ -119,6 +120,8 @@ export async function runInNewSpan( const start = traceMetadataAls.getStore()?.timestamp || now; traceMetadataAls.getStore()?.paths?.add({ path: opts.metadata.path, + status: 'failure', + error: (e as any).name, latency: now - start, }); opts.metadata.state = 'error'; diff --git a/js/core/src/tracing/types.ts b/js/core/src/tracing/types.ts index 1025310a3..c18791abe 100644 --- a/js/core/src/tracing/types.ts +++ b/js/core/src/tracing/types.ts @@ -37,6 +37,8 @@ export interface TraceStore { export const PathMetadataSchema = z.object({ path: z.string(), + status: z.string(), + error: z.string().optional(), latency: z.number(), }); export type PathMetadata = z.infer; diff --git a/js/flow/src/telemetry.ts b/js/flow/src/telemetry.ts index 2a37df90a..39e4ec879 100644 --- a/js/flow/src/telemetry.ts +++ b/js/flow/src/telemetry.ts @@ -58,11 +58,15 @@ const flowLatencies = new MetricHistogram(_N('latency'), { }); export function recordError(err: any) { - const qualifiedPath = spanMetadataAls?.getStore()?.path || ''; - const path = toDisplayPath(qualifiedPath); - logger.logStructuredError(`Error[${path}, ${err.name}]`, { - path, - qualifiedPath, + const paths = traceMetadataAls?.getStore()?.paths || new Set(); + const failedPath = + Array.from(paths).find((p) => p.status === 'failure')?.path || + spanMetadataAls?.getStore()?.path || + ''; + const displayPath = toDisplayPath(failedPath); + logger.logStructuredError(`Error[${displayPath}, ${err.name}]`, { + path: displayPath, + qualifiedPath: failedPath, name: err.name, message: err.message, stack: err.stack, @@ -81,35 +85,7 @@ export function writeFlowSuccess(flowName: string, latencyMs: number) { flowCounter.add(1, dimensions); flowLatencies.record(latencyMs, dimensions); - const paths = traceMetadataAls.getStore()?.paths || new Set(); - if (paths) { - const pathDimensions = { - flowName: flowName, - status: 'success', - source: 'ts', - sourceVersion: GENKIT_VERSION, - }; - const relevantPaths = Array.from(paths).filter((meta) => - meta.path.includes(flowName) - ); - - logger.logStructured(`Paths[${flowName}]`, { - flowName: flowName, - paths: relevantPaths.map((p) => p.path), - }); - - relevantPaths.forEach((p) => { - pathCounter.add(1, { - ...pathDimensions, - path: p.path, - }); - - pathLatencies.record(p.latency, { - ...pathDimensions, - path: p.path, - }); - }); - } + writePathMetrics(flowName, latencyMs); } export function writeFlowFailure( @@ -127,55 +103,7 @@ export function writeFlowFailure( flowCounter.add(1, dimensions); flowLatencies.record(latencyMs, dimensions); - const allQualifiedPaths = - traceMetadataAls.getStore()?.paths || new Set(); - if (allQualifiedPaths) { - const qualifiedFailPath = spanMetadataAls?.getStore()?.path || ''; - const failPath = toDisplayPath(qualifiedFailPath); - const relevantPaths = Array.from(allQualifiedPaths).filter( - (meta) => meta.path.includes(flowName) && meta.path !== qualifiedFailPath - ); - - logger.logStructured(`Paths[${flowName}]`, { - flowName: flowName, - paths: relevantPaths.map((p) => toDisplayPath(p.path)), - }); - - const pathDimensions = { - flowName: flowName, - source: 'ts', - sourceVersion: GENKIT_VERSION, - }; - // All paths that have succeeded need to be tracked as succeeded. - relevantPaths.forEach((p) => { - const path = toDisplayPath(p.path); - pathCounter.add(1, { - ...pathDimensions, - status: 'success', - path: p.path, - }); - - pathLatencies.record(p.latency, { - ...pathDimensions, - status: 'success', - path: p.path, - }); - }); - - pathCounter.add(1, { - ...pathDimensions, - status: 'failure', - error: err.name, - path: qualifiedFailPath, - }); - - pathLatencies.record(latencyMs, { - ...pathDimensions, - status: 'failure', - error: err.name, - path: qualifiedFailPath, - }); - } + writePathMetrics(flowName, latencyMs, err); } export function logRequest(flowName: string, req: express.Request) { @@ -211,3 +139,43 @@ export function logResponse(flowName: string, respCode: number, respBody: any) { sourceVersion: GENKIT_VERSION, }); } + +/** Writes all path-level metrics stored in the current flow execution. */ +function writePathMetrics(flowName: string, latencyMs: number, err?: any) { + const paths = traceMetadataAls.getStore()?.paths || new Set(); + const flowPaths = Array.from(paths).filter((meta) => + meta.path.includes(flowName) + ); + if (flowPaths) { + logger.logStructured(`Paths[${flowName}]`, { + flowName: flowName, + paths: flowPaths.map((p) => toDisplayPath(p.path)), + }); + + flowPaths.forEach((p) => writePathMetric(flowName, p)); + // If we're writing a failure, but none of the stored paths have failed, + // this means the root flow threw the error. + if (err && !flowPaths.some((p) => p.status === 'failure')) { + writePathMetric(flowName, { + status: 'failure', + path: spanMetadataAls?.getStore()?.path || '', + error: err, + latency: latencyMs, + }); + } + } +} + +/** Writes metrics for a single PathMetadata */ +function writePathMetric(flowName: string, meta: PathMetadata) { + const pathDimensions = { + flowName: flowName, + status: meta.status, + error: meta.error, + path: meta.path, + source: 'ts', + sourceVersion: GENKIT_VERSION, + }; + pathCounter.add(1, pathDimensions); + pathLatencies.record(meta.latency, pathDimensions); +} diff --git a/js/plugins/google-cloud/tests/metrics_test.ts b/js/plugins/google-cloud/tests/metrics_test.ts index 316de55ba..f278e269a 100644 --- a/js/plugins/google-cloud/tests/metrics_test.ts +++ b/js/plugins/google-cloud/tests/metrics_test.ts @@ -17,18 +17,18 @@ import { generate } from '@genkit-ai/ai'; import { defineModel } from '@genkit-ai/ai/model'; import { + configureGenkit, + defineAction, FlowState, FlowStateQuery, FlowStateQueryResponse, FlowStateStore, - configureGenkit, - defineAction, } from '@genkit-ai/core'; import { registerFlowStateStore } from '@genkit-ai/core/registry'; import { defineFlow, run, runAction, runFlow } from '@genkit-ai/flow'; import { - GcpOpenTelemetry, __getMetricExporterForTesting, + GcpOpenTelemetry, googleCloud, } from '@genkit-ai/google-cloud'; import { @@ -440,6 +440,43 @@ describe('GoogleCloudMetrics', () => { ]); }); + it('writes flow path failure in sub-action metrics', async () => { + const flow = createFlow('testFlow', async () => { + const subPath1 = await run('sub-action-1', async () => { + return 'done'; + }); + const subPath2 = await run('sub-action-2', async () => { + return Promise.reject(new Error('failed')); + }); + return 'done'; + }); + + assert.rejects(async () => { + await runFlow(flow); + }); + + const reqPoints = await getCounterDataPoints('genkit/flow/path/requests'); + const reqStatuses = reqPoints.map((p) => [ + p.attributes.path, + p.attributes.status, + ]); + assert.deepEqual(reqStatuses, [ + ['/{testFlow,t:flow}/{sub-action-1,t:flowStep}', 'success'], + ['/{testFlow,t:flow}/{sub-action-2,t:flowStep}', 'failure'], + ]); + const latencyPoints = await getHistogramDataPoints( + 'genkit/flow/path/latency' + ); + const latencyStatuses = latencyPoints.map((p) => [ + p.attributes.path, + p.attributes.status, + ]); + assert.deepEqual(latencyStatuses, [ + ['/{testFlow,t:flow}/{sub-action-1,t:flowStep}', 'success'], + ['/{testFlow,t:flow}/{sub-action-2,t:flowStep}', 'failure'], + ]); + }); + describe('Configuration', () => { it('should export only traces', async () => { const telemetry = new GcpOpenTelemetry({