Skip to content

Commit

Permalink
Update telemetry to properly attribute failures to correct path. (#563)
Browse files Browse the repository at this point in the history
Previously it was always being attributed to the root flow path.
  • Loading branch information
bryanatkinson authored Jul 10, 2024
1 parent 8721b4a commit 4182c95
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 86 deletions.
3 changes: 3 additions & 0 deletions js/core/src/tracing/instrumentation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ export async function runInNewSpan<T>(
const start = traceMetadataAls.getStore()?.timestamp || now;
traceMetadataAls.getStore()?.paths?.add({
path: opts.metadata.path,
status: 'success',
latency: now - start,
});
}
Expand All @@ -119,6 +120,8 @@ export async function runInNewSpan<T>(
const start = traceMetadataAls.getStore()?.timestamp || now;
traceMetadataAls.getStore()?.paths?.add({
path: opts.metadata.path,
status: 'failure',
error: (e as any).name,
latency: now - start,
});
opts.metadata.state = 'error';
Expand Down
2 changes: 2 additions & 0 deletions js/core/src/tracing/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ export interface TraceStore {

export const PathMetadataSchema = z.object({
path: z.string(),
status: z.string(),
error: z.string().optional(),
latency: z.number(),
});
export type PathMetadata = z.infer<typeof PathMetadataSchema>;
Expand Down
134 changes: 51 additions & 83 deletions js/flow/src/telemetry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,15 @@ const flowLatencies = new MetricHistogram(_N('latency'), {
});

export function recordError(err: any) {
const qualifiedPath = spanMetadataAls?.getStore()?.path || '';
const path = toDisplayPath(qualifiedPath);
logger.logStructuredError(`Error[${path}, ${err.name}]`, {
path,
qualifiedPath,
const paths = traceMetadataAls?.getStore()?.paths || new Set<PathMetadata>();
const failedPath =
Array.from(paths).find((p) => p.status === 'failure')?.path ||
spanMetadataAls?.getStore()?.path ||
'';
const displayPath = toDisplayPath(failedPath);
logger.logStructuredError(`Error[${displayPath}, ${err.name}]`, {
path: displayPath,
qualifiedPath: failedPath,
name: err.name,
message: err.message,
stack: err.stack,
Expand All @@ -81,35 +85,7 @@ export function writeFlowSuccess(flowName: string, latencyMs: number) {
flowCounter.add(1, dimensions);
flowLatencies.record(latencyMs, dimensions);

const paths = traceMetadataAls.getStore()?.paths || new Set<PathMetadata>();
if (paths) {
const pathDimensions = {
flowName: flowName,
status: 'success',
source: 'ts',
sourceVersion: GENKIT_VERSION,
};
const relevantPaths = Array.from(paths).filter((meta) =>
meta.path.includes(flowName)
);

logger.logStructured(`Paths[${flowName}]`, {
flowName: flowName,
paths: relevantPaths.map((p) => p.path),
});

relevantPaths.forEach((p) => {
pathCounter.add(1, {
...pathDimensions,
path: p.path,
});

pathLatencies.record(p.latency, {
...pathDimensions,
path: p.path,
});
});
}
writePathMetrics(flowName, latencyMs);
}

export function writeFlowFailure(
Expand All @@ -127,55 +103,7 @@ export function writeFlowFailure(
flowCounter.add(1, dimensions);
flowLatencies.record(latencyMs, dimensions);

const allQualifiedPaths =
traceMetadataAls.getStore()?.paths || new Set<PathMetadata>();
if (allQualifiedPaths) {
const qualifiedFailPath = spanMetadataAls?.getStore()?.path || '';
const failPath = toDisplayPath(qualifiedFailPath);
const relevantPaths = Array.from(allQualifiedPaths).filter(
(meta) => meta.path.includes(flowName) && meta.path !== qualifiedFailPath
);

logger.logStructured(`Paths[${flowName}]`, {
flowName: flowName,
paths: relevantPaths.map((p) => toDisplayPath(p.path)),
});

const pathDimensions = {
flowName: flowName,
source: 'ts',
sourceVersion: GENKIT_VERSION,
};
// All paths that have succeeded need to be tracked as succeeded.
relevantPaths.forEach((p) => {
const path = toDisplayPath(p.path);
pathCounter.add(1, {
...pathDimensions,
status: 'success',
path: p.path,
});

pathLatencies.record(p.latency, {
...pathDimensions,
status: 'success',
path: p.path,
});
});

pathCounter.add(1, {
...pathDimensions,
status: 'failure',
error: err.name,
path: qualifiedFailPath,
});

pathLatencies.record(latencyMs, {
...pathDimensions,
status: 'failure',
error: err.name,
path: qualifiedFailPath,
});
}
writePathMetrics(flowName, latencyMs, err);
}

export function logRequest(flowName: string, req: express.Request) {
Expand Down Expand Up @@ -211,3 +139,43 @@ export function logResponse(flowName: string, respCode: number, respBody: any) {
sourceVersion: GENKIT_VERSION,
});
}

/** Writes all path-level metrics stored in the current flow execution. */
function writePathMetrics(flowName: string, latencyMs: number, err?: any) {
const paths = traceMetadataAls.getStore()?.paths || new Set<PathMetadata>();
const flowPaths = Array.from(paths).filter((meta) =>
meta.path.includes(flowName)
);
if (flowPaths) {
logger.logStructured(`Paths[${flowName}]`, {
flowName: flowName,
paths: flowPaths.map((p) => toDisplayPath(p.path)),
});

flowPaths.forEach((p) => writePathMetric(flowName, p));
// If we're writing a failure, but none of the stored paths have failed,
// this means the root flow threw the error.
if (err && !flowPaths.some((p) => p.status === 'failure')) {
writePathMetric(flowName, {
status: 'failure',
path: spanMetadataAls?.getStore()?.path || '',
error: err,
latency: latencyMs,
});
}
}
}

/** Writes metrics for a single PathMetadata */
function writePathMetric(flowName: string, meta: PathMetadata) {
const pathDimensions = {
flowName: flowName,
status: meta.status,
error: meta.error,
path: meta.path,
source: 'ts',
sourceVersion: GENKIT_VERSION,
};
pathCounter.add(1, pathDimensions);
pathLatencies.record(meta.latency, pathDimensions);
}
43 changes: 40 additions & 3 deletions js/plugins/google-cloud/tests/metrics_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,18 @@
import { generate } from '@genkit-ai/ai';
import { defineModel } from '@genkit-ai/ai/model';
import {
configureGenkit,
defineAction,
FlowState,
FlowStateQuery,
FlowStateQueryResponse,
FlowStateStore,
configureGenkit,
defineAction,
} from '@genkit-ai/core';
import { registerFlowStateStore } from '@genkit-ai/core/registry';
import { defineFlow, run, runAction, runFlow } from '@genkit-ai/flow';
import {
GcpOpenTelemetry,
__getMetricExporterForTesting,
GcpOpenTelemetry,
googleCloud,
} from '@genkit-ai/google-cloud';
import {
Expand Down Expand Up @@ -440,6 +440,43 @@ describe('GoogleCloudMetrics', () => {
]);
});

it('writes flow path failure in sub-action metrics', async () => {
const flow = createFlow('testFlow', async () => {
const subPath1 = await run('sub-action-1', async () => {
return 'done';
});
const subPath2 = await run('sub-action-2', async () => {
return Promise.reject(new Error('failed'));
});
return 'done';
});

assert.rejects(async () => {
await runFlow(flow);
});

const reqPoints = await getCounterDataPoints('genkit/flow/path/requests');
const reqStatuses = reqPoints.map((p) => [
p.attributes.path,
p.attributes.status,
]);
assert.deepEqual(reqStatuses, [
['/{testFlow,t:flow}/{sub-action-1,t:flowStep}', 'success'],
['/{testFlow,t:flow}/{sub-action-2,t:flowStep}', 'failure'],
]);
const latencyPoints = await getHistogramDataPoints(
'genkit/flow/path/latency'
);
const latencyStatuses = latencyPoints.map((p) => [
p.attributes.path,
p.attributes.status,
]);
assert.deepEqual(latencyStatuses, [
['/{testFlow,t:flow}/{sub-action-1,t:flowStep}', 'success'],
['/{testFlow,t:flow}/{sub-action-2,t:flowStep}', 'failure'],
]);
});

describe('Configuration', () => {
it('should export only traces', async () => {
const telemetry = new GcpOpenTelemetry({
Expand Down

0 comments on commit 4182c95

Please sign in to comment.