From c99754fe4b6ed985565fd1049e3a5aa019deb368 Mon Sep 17 00:00:00 2001 From: MaesterChestnut <40321652+MaesterChestnut@users.noreply.github.com> Date: Wed, 10 Apr 2024 09:46:53 -0400 Subject: [PATCH] Refactor Eval for better tracing and error reporting (#3) Co-authored-by: Chris Chestnut Co-authored-by: Cleo Schneider Co-authored-by: Samuel Bushi --- genkit-tools/src/commands/eval-flow-run.ts | 31 +++---- genkit-tools/src/commands/eval-run.ts | 32 +++---- genkit-tools/src/eval/parser.ts | 8 +- genkit-tools/src/eval/types.ts | 2 + genkit-tools/src/types/evaluators.ts | 18 ++-- genkit-tools/tests/eval/parser_test.ts | 4 +- js/ai/src/evaluator.ts | 101 +++++++++++++++++---- js/core/package.json | 1 + js/core/src/index.ts | 3 +- js/core/src/tracing/localFileTraceStore.ts | 50 +++++++--- js/pnpm-lock.yaml | 9 ++ 11 files changed, 175 insertions(+), 84 deletions(-) diff --git a/genkit-tools/src/commands/eval-flow-run.ts b/genkit-tools/src/commands/eval-flow-run.ts index 6fd0d3341..302ab09db 100644 --- a/genkit-tools/src/commands/eval-flow-run.ts +++ b/genkit-tools/src/commands/eval-flow-run.ts @@ -123,25 +123,22 @@ export const evalFlowRun = new Command('eval:flow') return; } - const datasetToEval = await fetchDataSet(runner, flowName, states); + const evalDataset = await fetchDataSet(runner, flowName, states); const scores: Record = {}; - await Promise.all( - filteredEvaluatorActions.map(async (action) => { - const name = evaluatorName(action); - logger.info(`Running evaluator '${name}'...`); - const response = await runner.runAction({ - key: name, - input: { - dataset: datasetToEval, - auth: options.auth ? 
JSON.parse(options.auth) : undefined, - }, - }); - scores[name] = response.result; - }) - ); - - const scoredResults = enrichResultsWithScoring(scores, datasetToEval); + for (const action of filteredEvaluatorActions) { + const name = evaluatorName(action); + logger.info(`Running evaluator '${name}'...`); + const response = await runner.runAction({ + key: name, + input: { + dataset: evalDataset, + auth: options.auth ? JSON.parse(options.auth) : undefined, + }, + }); + scores[name] = response.result; + } + const scoredResults = enrichResultsWithScoring(scores, evalDataset); if (options.output) { logger.info(`Writing results to '${options.output}'...`); diff --git a/genkit-tools/src/commands/eval-run.ts b/genkit-tools/src/commands/eval-run.ts index bf53a578f..f01532b9e 100644 --- a/genkit-tools/src/commands/eval-run.ts +++ b/genkit-tools/src/commands/eval-run.ts @@ -22,7 +22,7 @@ import { enrichResultsWithScoring, getLocalFileEvalStore, } from '../eval'; -import { EvaluatorResponse } from '../types/evaluators'; +import { EvalResponses } from '../types/evaluators'; import { confirmLlmUse, evaluatorName, isEvaluator } from '../utils/eval'; import { logger } from '../utils/logger'; import { runInRunnerThenStop } from '../utils/runner-utils'; @@ -52,7 +52,7 @@ export const evalRun = new Command('eval:run') const evalStore = getLocalFileEvalStore(); logger.debug(`Loading data from '${dataset}'...`); - const datasetToEval: EvalInput[] = JSON.parse( + const evalDataset: EvalInput[] = JSON.parse( (await readFile(dataset)).toString('utf-8') ).map((testCase: any) => ({ ...testCase, @@ -97,22 +97,20 @@ export const evalRun = new Command('eval:run') return; } - const scores: Record = {}; - await Promise.all( - filteredEvaluatorActions.map(async (action) => { - const name = evaluatorName(action); - logger.info(`Running evaluator '${name}'...`); - const response = await runner.runAction({ - key: name, - input: { - dataset: datasetToEval, - }, - }); - scores[name] = response.result 
as EvaluatorResponse; - }) - ); + const scores: Record = {}; + for (const action of filteredEvaluatorActions) { + const name = evaluatorName(action); + logger.info(`Running evaluator '${name}'...`); + const response = await runner.runAction({ + key: name, + input: { + dataset: evalDataset, /* evaluator actions read the 'dataset' input key */ + }, + }); + scores[name] = response.result as EvalResponses; + } - const scoredResults = enrichResultsWithScoring(scores, datasetToEval); + const scoredResults = enrichResultsWithScoring(scores, evalDataset); if (options.output) { logger.info(`Writing results to '${options.output}'...`); diff --git a/genkit-tools/src/eval/parser.ts b/genkit-tools/src/eval/parser.ts index cf0cbbc0e..db5897685 100644 --- a/genkit-tools/src/eval/parser.ts +++ b/genkit-tools/src/eval/parser.ts @@ -15,19 +15,19 @@ */ import { EvalInput, EvalMetric, EvalResult } from '../eval'; -import { EvaluatorResponse } from '../types/evaluators'; +import { EvalResponse, EvalResponses } from '../types/evaluators'; /** * Combines EvalInput with the generated scores to create a storable EvalResult. 
*/ export function enrichResultsWithScoring( - scores: Record, + scores: Record, evalDataset: EvalInput[] ): EvalResult[] { const scoreMap: Record = {}; Object.keys(scores).forEach((evaluator) => { const evaluatorResponse = scores[evaluator]; - evaluatorResponse.forEach((scoredSample) => { + evaluatorResponse.forEach((scoredSample: EvalResponse) => { if (!scoredSample.testCaseId) { throw new Error('testCaseId expected to be present'); } @@ -40,6 +40,8 @@ export function enrichResultsWithScoring( score: score.score, rationale: score.details?.reasoning, error: score.error, + traceId: scoredSample.traceId, + spanId: scoredSample.spanId, }); }); }); diff --git a/genkit-tools/src/eval/types.ts b/genkit-tools/src/eval/types.ts index 1416ca7f4..18705ae91 100644 --- a/genkit-tools/src/eval/types.ts +++ b/genkit-tools/src/eval/types.ts @@ -47,6 +47,8 @@ export const EvalMetricSchema = z.object({ score: z.union([z.number(), z.string(), z.boolean()]).optional(), rationale: z.string().optional(), error: z.string().optional(), + traceId: z.string().optional(), + spanId: z.string().optional(), }); export type EvalMetric = z.infer; diff --git a/genkit-tools/src/types/evaluators.ts b/genkit-tools/src/types/evaluators.ts index accb4a02f..540b6bca4 100644 --- a/genkit-tools/src/types/evaluators.ts +++ b/genkit-tools/src/types/evaluators.ts @@ -28,12 +28,14 @@ export const ScoreSchema = z.object({ .optional(), }); -export const EvaluatorResponseSchema = z.array( - z.object({ - sampleIndex: z.number(), - testCaseId: z.string().optional(), - evaluation: ScoreSchema, - }) -); +export const EvalResponseSchema = z.object({ + sampleIndex: z.number(), + testCaseId: z.string().optional(), + traceId: z.string().optional(), + spanId: z.string().optional(), + evaluation: ScoreSchema, +}); +export type EvalResponse = z.infer; -export type EvaluatorResponse = z.infer; +export const EvalResponsesSchema = z.array(EvalResponseSchema); +export type EvalResponses = z.infer; diff --git 
a/genkit-tools/tests/eval/parser_test.ts b/genkit-tools/tests/eval/parser_test.ts index 87897a187..c12a76598 100644 --- a/genkit-tools/tests/eval/parser_test.ts +++ b/genkit-tools/tests/eval/parser_test.ts @@ -20,7 +20,7 @@ import { EvalResult, enrichResultsWithScoring, } from '../../src/eval'; -import { EvaluatorResponse } from '../../src/types/evaluators'; +import { EvalResponses } from '../../src/types/evaluators'; describe('parser', () => { const evalRunResults: EvalResult[] = [ @@ -50,7 +50,7 @@ describe('parser', () => { }, ]; - const evaluatorOutput: Record = { + const evaluatorOutput: Record = { '/evaluator/ragas/faithfulness': [ { testCaseId: 'case1', diff --git a/js/ai/src/evaluator.ts b/js/ai/src/evaluator.ts index fe3ff5084..842e67548 100644 --- a/js/ai/src/evaluator.ts +++ b/js/ai/src/evaluator.ts @@ -15,10 +15,18 @@ */ import { action, Action } from '@genkit-ai/core'; +import { logger } from '@genkit-ai/core/logging'; import { lookupAction, registerAction } from '@genkit-ai/core/registry'; -import { setCustomMetadataAttributes } from '@genkit-ai/core/tracing'; +import { + runInNewSpan, + setCustomMetadataAttributes, + SPAN_TYPE_ATTR, +} from '@genkit-ai/core/tracing'; import * as z from 'zod'; +export const ATTR_PREFIX = 'genkit'; +export const SPAN_STATE_ATTR = ATTR_PREFIX + ':state'; + export const BaseDataPointSchema = z.object({ input: z.unknown(), output: z.unknown().optional(), @@ -43,33 +51,36 @@ export const ScoreSchema = z.object({ export const EVALUATOR_METADATA_KEY_USES_LLM = 'evaluatorUsesLlm'; export type Score = z.infer; - +export type BaseDataPoint = z.infer; export type Dataset< DataPoint extends typeof BaseDataPointSchema = typeof BaseDataPointSchema, > = Array>; -export const EvaluatorResponseSchema = z.array( - z.object({ - sampleIndex: z.number(), - testCaseId: z.string().optional(), - evaluation: ScoreSchema, - }) -); +export const EvalResponseSchema = z.object({ + sampleIndex: z.number().optional(), + testCaseId: 
z.string().optional(), + traceId: z.string().optional(), + spanId: z.string().optional(), + evaluation: ScoreSchema, +}); +export type EvalResponse = z.infer; -export type EvaluatorResponse = z.infer; +// TODO remove EvalResponses in favor of EvalResponse[] +export const EvalResponsesSchema = z.array(EvalResponseSchema); +export type EvalResponses = z.infer; type EvaluatorFn< DataPoint extends typeof BaseDataPointSchema = typeof BaseDataPointSchema, CustomOptions extends z.ZodTypeAny = z.ZodTypeAny, > = ( - input: Dataset, + input: z.infer, evaluatorOptions?: z.infer -) => Promise; +) => Promise; export type EvaluatorAction< DataPoint extends typeof BaseDataPointSchema = typeof BaseDataPointSchema, CustomOptions extends z.ZodTypeAny = z.ZodTypeAny, -> = Action & { +> = Action & { __dataPointType?: DataPoint; __configSchema?: CustomOptions; }; @@ -78,7 +89,7 @@ function withMetadata< DataPoint extends typeof BaseDataPointSchema = typeof BaseDataPointSchema, CustomOptions extends z.ZodTypeAny = z.ZodTypeAny, >( - evaluator: Action, + evaluator: Action, dataPointType?: DataPoint, configSchema?: CustomOptions ): EvaluatorAction { @@ -120,18 +131,68 @@ export function defineEvaluator< : z.array(BaseDataPointSchema), options: options.configSchema ?? 
z.unknown(), }), - outputSchema: EvaluatorResponseSchema, + outputSchema: EvalResponsesSchema, metadata: metadata, }, - (i) => { + async (i) => { setCustomMetadataAttributes({ subtype: 'evaluator' }); - return runner(i.dataset, i.options); + let evalResponses: EvalResponses = []; + for (let index = 0; index < i.dataset.length; index++) { + const datapoint = i.dataset[index]; + let spanId; + let traceId; + try { + await runInNewSpan( + { + metadata: { + name: `Test Case ${datapoint.testCaseId}`, + }, + labels: { + [SPAN_TYPE_ATTR]: 'evaluator', + }, + }, + async (metadata, otSpan) => { + try { + spanId = otSpan.spanContext().spanId; + traceId = otSpan.spanContext().traceId; + metadata.input = datapoint.input; + const testCaseOutput = await runner(datapoint, i.options); + testCaseOutput.sampleIndex = index; + testCaseOutput.spanId = spanId; + testCaseOutput.traceId = traceId; + metadata.output = testCaseOutput; + evalResponses.push(testCaseOutput); + return testCaseOutput; + } catch (e) { + const err = { + sampleIndex: index, + spanId, + traceId, + testCaseId: datapoint.testCaseId, + evaluation: { + error: `Evaluation of test case ${datapoint.testCaseId} failed: \n${(e as Error).stack}`, + }, + }; + metadata.output = err; + evalResponses.push(err); + throw e; + } + } + ); + } catch (e) { + logger.error( + `Evaluation of test case ${datapoint.testCaseId} failed: \n${(e as Error).stack}` + ); + continue; + } + } + return evalResponses; } ); const ewm = withMetadata( evaluator as any as Action< typeof EvalRequestSchema, - typeof EvaluatorResponseSchema + typeof EvalResponsesSchema >, options.dataPointType, options.configSchema @@ -158,7 +219,7 @@ export async function evaluate< evaluator: EvaluatorArgument; dataset: Dataset; options?: z.infer; -}): Promise { +}): Promise { let evaluator: EvaluatorAction; if (typeof params.evaluator === 'string') { evaluator = await lookupAction(`/evaluator/${params.evaluator}`); @@ -176,7 +237,7 @@ export async function evaluate< return 
(await evaluator({ dataset: params.dataset, options: params.options, - })) as EvaluatorResponse; + })) as EvalResponses; } export const EvaluatorInfoSchema = z.object({ diff --git a/js/core/package.json b/js/core/package.json index 91fe9a550..69a965907 100644 --- a/js/core/package.json +++ b/js/core/package.json @@ -36,6 +36,7 @@ "@opentelemetry/sdk-node": "^0.49.0", "@opentelemetry/sdk-trace-base": "^1.22.0", "ajv": "^8.12.0", + "async-mutex": "^0.5.0", "express": "^4.19.2", "express-openapi-validator": "^5.1.3", "json-schema": "^0.4.0", diff --git a/js/core/src/index.ts b/js/core/src/index.ts index 52fa1332c..2155298f4 100644 --- a/js/core/src/index.ts +++ b/js/core/src/index.ts @@ -19,7 +19,6 @@ export const GENKIT_CLIENT_HEADER = `genkit-node/${GENKIT_VERSION} gl-node/${pro export * from './action.js'; export * from './config.js'; +export { GenkitError } from './error.js'; export * from './flowTypes.js'; export * from './telemetryTypes.js'; - -export { GenkitError } from './error.js'; diff --git a/js/core/src/tracing/localFileTraceStore.ts b/js/core/src/tracing/localFileTraceStore.ts index 05bf3d7db..fc1f31b01 100644 --- a/js/core/src/tracing/localFileTraceStore.ts +++ b/js/core/src/tracing/localFileTraceStore.ts @@ -14,6 +14,7 @@ * limitations under the License. 
*/ +import { Mutex } from 'async-mutex'; import crypto from 'crypto'; import fs from 'fs'; import os from 'os'; @@ -32,6 +33,7 @@ import { */ export class LocalFileTraceStore implements TraceStore { private readonly storeRoot; + private mutexes: Record = {}; constructor() { const rootHash = crypto @@ -59,24 +61,42 @@ export class LocalFileTraceStore implements TraceStore { return TraceDataSchema.parse(parsed); } + getMutex(id: string): Mutex { + if (!this.mutexes[id]) { + this.mutexes[id] = new Mutex(); + } + return this.mutexes[id]; + } + async save(id: string, trace: TraceData): Promise { - const existsing = await this.load(id); - if (existsing) { - Object.keys(trace.spans).forEach( - (spanId) => (existsing.spans[spanId] = trace.spans[spanId]) + const mutex = this.getMutex(id); + await mutex.waitForUnlock(); + const release = await mutex.acquire(); + try { + logger.debug( + `acquired lock to write trace ${id} for trace name ${trace.displayName}` + ); + const existing = await this.load(id); + if (existing) { + Object.keys(trace.spans).forEach( + (spanId) => (existing.spans[spanId] = trace.spans[spanId]) + ); + existing.displayName = trace.displayName; + existing.startTime = trace.startTime; + existing.endTime = trace.endTime; + trace = existing; + } + logger.debug( + `save trace ${id} with name ${trace.displayName} to ` + + path.resolve(this.storeRoot, `${id}`) + ); + fs.writeFileSync( + path.resolve(this.storeRoot, `${id}`), + JSON.stringify(trace) ); - existsing.displayName = trace.displayName; - existsing.startTime = trace.startTime; - existsing.endTime = trace.endTime; - trace = existsing; + } finally { + release(); } - logger.debug( - `save trace ${id} to ` + path.resolve(this.storeRoot, `${id}`) - ); - fs.writeFileSync( - path.resolve(this.storeRoot, `${id}`), - JSON.stringify(trace) - ); } async list(query?: TraceQuery): Promise { diff --git a/js/pnpm-lock.yaml b/js/pnpm-lock.yaml index 1a443365f..35e88fd99 100644 --- a/js/pnpm-lock.yaml +++ 
b/js/pnpm-lock.yaml @@ -90,6 +90,9 @@ importers: ajv: specifier: ^8.12.0 version: 8.12.0 + async-mutex: + specifier: ^0.5.0 + version: 0.5.0 express: specifier: ^4.19.2 version: 4.19.2 @@ -3437,6 +3440,12 @@ packages: shimmer: 1.2.1 dev: false + /async-mutex@0.5.0: + resolution: {integrity: sha512-1A94B18jkJ3DYq284ohPxoXbfTA5HsQ7/Mf4DEhcyLx3Bz27Rh59iScbB6EPiP+B+joue6YCxcMXSbFC1tZKwA==} + dependencies: + tslib: 2.6.2 + dev: false + /async-retry@1.3.3: resolution: {integrity: sha512-wfr/jstw9xNi/0teMHrRW7dsz3Lt5ARhYNZ2ewpadnhaIp5mbALhOAP+EAdsC7t4Z6wqsDVv9+W6gm1Dk9mEyw==} requiresBuild: true