diff --git a/packages/ingest/src/index.ts b/packages/ingest/src/index.ts index 34d1d84d4..8c79e2135 100644 --- a/packages/ingest/src/index.ts +++ b/packages/ingest/src/index.ts @@ -5,6 +5,7 @@ import { prisma } from "@oakai/db"; import { aiLogger } from "@oakai/logger"; +import { IngestError } from "./IngestError"; import { getLatestIngestId } from "./db-helpers/getLatestIngestId"; import { ingestStart } from "./steps/0-start"; import { captions } from "./steps/1-captions"; @@ -59,6 +60,11 @@ async function main() { } main().catch((error) => { - log.error("Error running command", error); + log.info(error.toString()); + if (error instanceof IngestError) { + log.info("Ingest ID " + error.ingestId); + log.info("Lesson ID " + error.lessonId); + } + log.error("Error running command, see above"); process.exit(1); }); diff --git a/packages/ingest/src/steps/7-publish.ts b/packages/ingest/src/steps/7-publish.ts index 708ee6c77..8f1aca46e 100644 --- a/packages/ingest/src/steps/7-publish.ts +++ b/packages/ingest/src/steps/7-publish.ts @@ -2,11 +2,9 @@ import { LessonPlanSchema, type LooseLessonPlan, } from "@oakai/aila/src/protocol/schema"; -import type { PrismaClientWithAccelerate } from "@oakai/db"; -import type { Prisma } from "@oakai/db"; +import type { Prisma, PrismaClientWithAccelerate } from "@oakai/db"; import { createId } from "@paralleldrive/cuid2"; import { isTruthy } from "remeda"; -import invariant from "tiny-invariant"; import { z } from "zod"; import { IngestError } from "../IngestError"; @@ -32,7 +30,7 @@ export async function publishToRag({ ingestId: string; }) { log.info("Publishing lesson plans and parts to RAG schema"); - // const ingest = await getIngestById({ prisma, ingestId }); + const lessons = await loadLessonsAndUpdateState({ prisma, ingestId, @@ -42,6 +40,9 @@ export async function publishToRag({ log.info(`Loaded ${lessons.length} lessons`); + /** + * Build list of lesson plans to publish + */ const ragLessonPlans: { oakLessonId?: number; oakLessonSlug: string; @@ -53,7 +54,7 @@ export async function publishToRag({ for (const lesson of lessons) { if (!lesson.lessonPlan) { - throw new IngestError("Lessin is missing lesson plan", { + throw new IngestError("Lesson is missing lesson plan", { ingestId, lessonId: lesson.id, }); @@ -71,6 +72,8 @@ export async function publishToRag({ }); } + log.info("About to chunk"); + /** * Add lesson plans to RAG schema */ @@ -78,9 +81,16 @@ export async function publishToRag({ data: ragLessonPlans, chunkSize: 500, fn: async (data) => { - await prisma.ragLessonPlan.createMany({ - data, - }); + try { + log.info(`Writing ${data.length} lesson plans`); + await prisma.ragLessonPlan.createMany({ + data, + }); + log.info(`Written ${data.length} lesson plans`); + } catch (error) { + log.error(error); + throw error; + } }, }); @@ -114,6 +124,9 @@ export async function publishToRag({ const ingestLessonId = ragLessonPlan.ingestLessonId; const lesson = lessons.find((l) => l.id === ingestLessonId); + /** + * @todo this takes ages one by one. group these queries + */ const lessonPlanParts = await prisma.$queryRaw` SELECT key, value_text, value_json, embedding::text FROM ingest.ingest_lesson_plan_part @@ -164,23 +177,84 @@ export async function publishToRag({ const now = new Date().toISOString(); // Need to use $queryRaw because Prisma doesn't support the vector type await prisma.$queryRaw` - INSERT INTO rag.rag_lesson_plan_parts (id, rag_lesson_plan_id, key, value_text, value_json, created_at, updated_at, embedding) - SELECT * - FROM UNNEST ( - ARRAY[${data.map(() => createId())}]::text[], - ARRAY[${data.map((p) => p.ragLessonPlanId)}]::text[], - ARRAY[${data.map((p) => p.key)}]::text[], - ARRAY[${data.map((p) => p.valueText)}]::text[], - ARRAY[${data.map((p) => JSON.stringify(p.valueJson))}]::jsonb[], - ARRAY[${data.map(() => now)}]::timestamp[], - ARRAY[${data.map(() => now)}]::timestamp[], - ARRAY[${data.map((p) => `[${p.embedding.join(",")}]`)}]::vector(256)[] - ); - `; + INSERT INTO rag.rag_lesson_plan_parts (id, rag_lesson_plan_id, key, value_text, value_json, created_at, updated_at, embedding) + SELECT * + FROM UNNEST ( + ARRAY[${data.map(() => createId())}]::text[], + ARRAY[${data.map((p) => p.ragLessonPlanId)}]::text[], + ARRAY[${data.map((p) => p.key)}]::text[], + ARRAY[${data.map((p) => p.valueText)}]::text[], + ARRAY[${data.map((p) => JSON.stringify(p.valueJson))}]::jsonb[], + ARRAY[${data.map(() => now)}]::timestamp[], + ARRAY[${data.map(() => now)}]::timestamp[], + ARRAY[${data.map((p) => { + return "[" + p.embedding.join(",") + "]"; + })}]::vector[] + ); + `; log.info(prisma.$queryRawUnsafe.toString()); }, }); + /** + * In a transaction, update old versions of lesson plans to be unpublished + * and the new versions to be published + */ + await prisma.$transaction([ + prisma.ragLessonPlan.updateMany({ + where: { + oakLessonSlug: { + in: lessons.map((l) => l.data.lessonSlug), + }, + id: { + notIn: lessons.map((l) => l.id), + }, + }, + data: { + isPublished: false, + }, + }), + prisma.ragLessonPlan.updateMany({ + where: { + id: { + in: lessons.map((l) => l.id), + }, + }, + data: { + isPublished: true, + }, + }), + ]); + + // /** + // * Perform check to ensure no duplicate lesson plans are created + // */ + // const duplicateResults = await chunkAndPromiseAll({ + // data: lessons, + // chunkSize: 500, + // fn: async (data) => + // prisma.ragLessonPlan.findMany({ + // where: { + // oakLessonSlug: { + // in: data.map((l) => l.data.lessonSlug), + // }, + // }, + // select: { + // ingestLessonId: true, + // oakLessonId: true, + // oakLessonSlug: true, + // }, + // }), + // }); + // const duplicateLessonPlans = duplicateResults.flat(); + + // if (duplicateLessonPlans.length > 0) { + // log.error(`Duplicate lesson plans found: ${duplicateLessonPlans.length}`); + // throw new IngestError("Duplicate lesson plans found", { + // ingestId, + // }); + // } + log.info("Published lesson plans and parts to RAG schema"); }