From b1ae661e7e0e91e276db3dc4d8afa2d0c5c94e8f Mon Sep 17 00:00:00 2001 From: Shruti Padamata Date: Fri, 16 Aug 2024 14:29:19 -0700 Subject: [PATCH] Local file based DatasetStore implementation and Tools API changes --- genkit-tools/common/src/eval/index.ts | 7 +- .../common/src/eval/localFileDatasetStore.ts | 221 ++++++++++++++++++ genkit-tools/common/src/server/router.ts | 48 +++- genkit-tools/common/src/types/apis.ts | 17 +- genkit-tools/common/src/types/eval.ts | 22 +- 5 files changed, 300 insertions(+), 15 deletions(-) create mode 100644 genkit-tools/common/src/eval/localFileDatasetStore.ts diff --git a/genkit-tools/common/src/eval/index.ts b/genkit-tools/common/src/eval/index.ts index 28f2374b9..51b157ae3 100644 --- a/genkit-tools/common/src/eval/index.ts +++ b/genkit-tools/common/src/eval/index.ts @@ -14,7 +14,8 @@ * limitations under the License. */ -import { EvalStore } from '../types/eval'; +import { DatasetStore, EvalStore } from '../types/eval'; +import { LocalFileDatasetStore } from './localFileDatasetStore'; import { LocalFileEvalStore } from './localFileEvalStore'; export { EvalFlowInput, EvalFlowInputSchema } from '../types/eval'; export * from './exporter'; @@ -24,3 +25,7 @@ export function getEvalStore(): EvalStore { // TODO: This should provide EvalStore, based on tools config. return LocalFileEvalStore.getEvalStore(); } + +export function getDatasetStore(): DatasetStore { + return LocalFileDatasetStore.getDatasetStore(); +} diff --git a/genkit-tools/common/src/eval/localFileDatasetStore.ts b/genkit-tools/common/src/eval/localFileDatasetStore.ts new file mode 100644 index 000000000..0838cc0e6 --- /dev/null +++ b/genkit-tools/common/src/eval/localFileDatasetStore.ts @@ -0,0 +1,221 @@ +/** + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import crypto from 'crypto'; +import fs from 'fs'; +import { appendFile, readFile, rm, writeFile } from 'fs/promises'; +import os from 'os'; +import path from 'path'; +import { v4 as uuidv4 } from 'uuid'; +import { CreateDatasetRequest, UpdateDatasetRequest } from '../types'; +import { + Dataset, + DatasetMetadata, + DatasetMetadataSchema, + DatasetStore, + EvalFlowInputSchema, +} from '../types/eval'; +import { logger } from '../utils/logger'; + +/** + * A local, file-based DatasetStore implementation. + */ +export class LocalFileDatasetStore implements DatasetStore { + private readonly storeRoot; + private readonly indexFile; + private readonly INDEX_DELIMITER = '\n'; + private static cachedDatasetStore: LocalFileDatasetStore | null = null; + + private constructor() { + this.storeRoot = this.generateRootPath(); + this.indexFile = this.getIndexFilePath(); + fs.mkdirSync(this.storeRoot, { recursive: true }); + if (!fs.existsSync(this.indexFile)) { + fs.writeFileSync(path.resolve(this.indexFile), ''); + } + logger.info( + `Initialized local file dataset store at root: ${this.storeRoot}` + ); + } + + static getDatasetStore() { + if (!this.cachedDatasetStore) { + this.cachedDatasetStore = new LocalFileDatasetStore(); + } + return this.cachedDatasetStore; + } + + static reset() { + this.cachedDatasetStore = null; + } + + async createDataset(req: CreateDatasetRequest): Promise { + return this.createDatasetInternal(req.data, req.displayName); + } + + private async createDatasetInternal( + data: Dataset, + displayName?: string + ): Promise { + 
const datasetId = this.generateDatasetId();
+    const filePath = path.resolve(
+      this.storeRoot,
+      this.generateFileName(datasetId)
+    );
+
+    if (fs.existsSync(filePath)) {
+      logger.error(`Dataset already exists at ` + filePath);
+      throw new Error(
+        `Create dataset failed: file already exists at ${filePath}`
+      );
+    }
+
+    logger.info(`Saving Dataset to ` + filePath);
+    await writeFile(filePath, JSON.stringify(data));
+
+    const metadata = {
+      datasetId: datasetId,
+      size: Array.isArray(data) ? data.length : data.samples.length,
+      uri: 'file://' + filePath,
+      version: 1,
+      displayName: displayName,
+      createTime: Date.now().toString(),
+      updateTime: Date.now().toString(),
+    };
+
+    logger.debug(
+      `Saving DatasetMetadata for ID ${datasetId} to ` +
+        path.resolve(this.indexFile)
+    );
+    await appendFile(
+      path.resolve(this.indexFile),
+      JSON.stringify(metadata) + this.INDEX_DELIMITER
+    );
+    return metadata;
+  }
+
+  async updateDataset(req: UpdateDatasetRequest): Promise<DatasetMetadata> {
+    if (!req.datasetId) {
+      return this.createDatasetInternal(req.patch, req.displayName);
+    }
+    const datasetId = req.datasetId;
+    const filePath = path.resolve(
+      this.storeRoot,
+      this.generateFileName(datasetId)
+    );
+    if (!fs.existsSync(filePath)) {
+      return this.createDatasetInternal(req.patch, req.displayName);
+    }
+
+    const metadatas = await this.listDatasets();
+    const prevMetadata = metadatas.find(
+      (metadata) => metadata.datasetId === datasetId
+    );
+    if (!prevMetadata) {
+      throw new Error(`Update dataset failed: dataset metadata not found`);
+    }
+
+    logger.info(`Updating Dataset at ` + filePath);
+    await writeFile(filePath, JSON.stringify(req.patch));
+
+    const newMetadata = {
+      datasetId: datasetId,
+      size: Array.isArray(req.patch)
+        ? 
req.patch.length
+        : req.patch.samples.length,
+      uri: 'file://' + filePath,
+      version: prevMetadata.version + 1,
+      displayName: req.displayName,
+      createTime: prevMetadata.createTime,
+      updateTime: Date.now().toString(),
+    };
+
+    logger.debug(
+      `Updating DatasetMetadata for ID ${datasetId} at ` +
+        path.resolve(this.indexFile)
+    );
+    const prevMetadataIndex = metadatas.indexOf(prevMetadata);
+    // Replace the metadata object with the new metadata
+    metadatas[prevMetadataIndex] = newMetadata;
+    await writeFile(
+      path.resolve(this.indexFile),
+      metadatas
+        .map((metadata) => JSON.stringify(metadata))
+        .join(this.INDEX_DELIMITER)
+    );
+
+    return newMetadata;
+  }
+
+  async getDataset(id: string): Promise<Dataset> {
+    const filePath = path.resolve(this.storeRoot, this.generateFileName(id));
+    if (!fs.existsSync(filePath)) {
+      throw new Error(`Dataset not found for dataset ID ${id}`);
+    }
+    return await readFile(filePath, 'utf8').then((data) =>
+      EvalFlowInputSchema.parse(JSON.parse(data))
+    );
+  }
+
+  async listDatasets(): Promise<DatasetMetadata[]> {
+    let metadatas = await readFile(this.indexFile, 'utf8').then((data) => {
+      if (!data) {
+        return [];
+      }
+      // strip the final carriage return before parsing all lines
+      return data
+        .slice(0, -1)
+        .split(this.INDEX_DELIMITER)
+        .map(this.parseLineToDatasetMetadata);
+    });
+
+    logger.debug(`Found Dataset Metadatas: ${JSON.stringify(metadatas)}`);
+
+    return metadatas;
+  }
+
+  async deleteDataset(id: string): Promise<void> {
+    const filePath = path.resolve(this.storeRoot, this.generateFileName(id));
+    if (!fs.existsSync(filePath)) {
+      return;
+    }
+    return await rm(filePath);
+  }
+
+  private generateRootPath(): string {
+    const rootHash = crypto
+      .createHash('md5')
+      .update(process.cwd() || 'unknown')
+      .digest('hex');
+    return path.resolve(os.tmpdir(), `.genkit/${rootHash}/datasets`);
+  }
+
+  private generateDatasetId(): string {
+    return uuidv4();
+  }
+
+  private generateFileName(datasetId: string): string {
+    return `${datasetId}.json`;
+  }
+
+  
private getIndexFilePath(): string { + return path.resolve(this.storeRoot, 'index.txt'); + } + + private parseLineToDatasetMetadata(key: string): DatasetMetadata { + return DatasetMetadataSchema.parse(JSON.parse(key)); + } +} diff --git a/genkit-tools/common/src/server/router.ts b/genkit-tools/common/src/server/router.ts index 5f1859db8..65f85c6f1 100644 --- a/genkit-tools/common/src/server/router.ts +++ b/genkit-tools/common/src/server/router.ts @@ -14,7 +14,8 @@ * limitations under the License. */ import { initTRPC, TRPCError } from '@trpc/server'; -import { getEvalStore } from '../eval'; +import { z } from 'zod'; +import { getDatasetStore, getEvalStore } from '../eval'; import { Runner } from '../runner/runner'; import { GenkitToolsError } from '../runner/types'; import { Action } from '../types/action'; @@ -202,6 +203,51 @@ export const TOOLS_SERVER_ROUTER = (runner: Runner) => return evalRun; }), + /** Retrieves all eval datasets */ + listDatasets: loggedProcedure + .input(z.void()) + .output(z.array(evals.DatasetMetadataSchema)) + .query(async () => { + const response = await getDatasetStore().listDatasets(); + return response; + }), + + /** Retrieves an existing dataset */ + getDataset: loggedProcedure + .input(z.string()) + .output(evals.EvalFlowInputSchema) + .query(async ({ input }) => { + const response = await getDatasetStore().getDataset(input); + return response; + }), + + /** Creates a new dataset */ + createDataset: loggedProcedure + .input(apis.CreateDatasetRequestSchema) + .output(evals.DatasetMetadataSchema) + .query(async ({ input }) => { + const response = await getDatasetStore().createDataset(input); + return response; + }), + + /** Updates an exsting dataset */ + updateDataset: loggedProcedure + .input(apis.UpdateDatasetRequestSchema) + .output(evals.DatasetMetadataSchema) + .query(async ({ input }) => { + const response = await getDatasetStore().updateDataset(input); + return response; + }), + + /** Deletes an exsting dataset */ + 
deleteDataset: loggedProcedure + .input(z.string()) + .output(z.void()) + .query(async ({ input }) => { + const response = await getDatasetStore().deleteDataset(input); + return response; + }), + /** Send a screen view analytics event */ sendPageView: t.procedure .input(apis.PageViewSchema) diff --git a/genkit-tools/common/src/types/apis.ts b/genkit-tools/common/src/types/apis.ts index 6ed3e4961..2f72557ef 100644 --- a/genkit-tools/common/src/types/apis.ts +++ b/genkit-tools/common/src/types/apis.ts @@ -15,7 +15,7 @@ */ import { z } from 'zod'; -import { EvalRunKeySchema } from './eval'; +import { EvalFlowInputSchema, EvalRunKeySchema } from './eval'; import { FlowStateSchema } from './flow'; import { GenerationCommonConfigSchema, @@ -132,3 +132,18 @@ export const GetEvalRunRequestSchema = z.object({ name: z.string(), }); export type GetEvalRunRequest = z.infer; + +export const CreateDatasetRequestSchema = z.object({ + data: EvalFlowInputSchema, + displayName: z.string().optional(), +}); + +export type CreateDatasetRequest = z.infer; + +export const UpdateDatasetRequestSchema = z.object({ + /** Supports upsert */ + patch: EvalFlowInputSchema, + datasetId: z.string().optional(), + displayName: z.string().optional(), +}); +export type UpdateDatasetRequest = z.infer; diff --git a/genkit-tools/common/src/types/eval.ts b/genkit-tools/common/src/types/eval.ts index d9631d13e..02c943246 100644 --- a/genkit-tools/common/src/types/eval.ts +++ b/genkit-tools/common/src/types/eval.ts @@ -15,7 +15,12 @@ */ import { z } from 'zod'; -import { ListEvalKeysRequest, ListEvalKeysResponse } from './apis'; +import { + CreateDatasetRequest, + ListEvalKeysRequest, + ListEvalKeysResponse, + UpdateDatasetRequest, +} from './apis'; /** * This file defines schema and types that are used by the Eval store. @@ -145,7 +150,7 @@ export interface EvalStore { /** * Metadata for Dataset objects containing version, create and update time, etc. 
*/ -export const DatasetMetadaSchema = z.object({ +export const DatasetMetadataSchema = z.object({ /** autogenerated */ datasetId: z.string(), size: z.number(), @@ -156,14 +161,7 @@ export const DatasetMetadaSchema = z.object({ createTime: z.string(), updateTime: z.string(), }); -export type DatasetMetadata = z.infer; - -export const UpdateDatasetRequestSchema = z.object({ - /** Supports upsert */ - patch: EvalFlowInputSchema, - displayName: z.string().optional(), -}); -export type UpdateDatasetRequest = z.infer; +export type DatasetMetadata = z.infer; /** * Eval dataset store persistence interface. @@ -171,10 +169,10 @@ export type UpdateDatasetRequest = z.infer; export interface DatasetStore { /** * Create new dataset with the given data - * @param data data containing eval flow inputs + * @param req create requeest with the data * @returns dataset metadata */ - createDataset(data: Dataset): Promise; + createDataset(req: CreateDatasetRequest): Promise; /** * Update dataset