Skip to content

Commit

Permalink
fix publishing step
Browse files Browse the repository at this point in the history
  • Loading branch information
mantagen committed Dec 18, 2024
1 parent dba54e4 commit 7485394
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 22 deletions.
8 changes: 7 additions & 1 deletion packages/ingest/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import { prisma } from "@oakai/db";
import { aiLogger } from "@oakai/logger";

import { IngestError } from "./IngestError";
import { getLatestIngestId } from "./db-helpers/getLatestIngestId";
import { ingestStart } from "./steps/0-start";
import { captions } from "./steps/1-captions";
Expand Down Expand Up @@ -59,6 +60,11 @@ async function main() {
}

/**
 * Entry point: runs the ingest command and, if it rejects, logs what failed
 * before terminating the process with exit code 1.
 *
 * When the rejection is an `IngestError`, its `ingestId` and `lessonId` are
 * logged as well.
 */
main().catch((error) => {
  log.error("Error running command", error);
  // String() rather than error.toString(): a promise may reject with null or
  // undefined, and calling a method on those would throw inside this handler,
  // skipping the remaining log lines and the explicit exit below.
  log.info(String(error));
  if (error instanceof IngestError) {
    log.info("Ingest ID " + error.ingestId);
    log.info("Lesson ID " + error.lessonId);
  }
  log.error("Error running command, see above");
  process.exit(1);
});
116 changes: 95 additions & 21 deletions packages/ingest/src/steps/7-publish.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@ import {
LessonPlanSchema,
type LooseLessonPlan,
} from "@oakai/aila/src/protocol/schema";
import type { PrismaClientWithAccelerate } from "@oakai/db";
import type { Prisma } from "@oakai/db";
import type { Prisma, PrismaClientWithAccelerate } from "@oakai/db";
import { createId } from "@paralleldrive/cuid2";
import { isTruthy } from "remeda";
import invariant from "tiny-invariant";
import { z } from "zod";

import { IngestError } from "../IngestError";
Expand All @@ -32,7 +30,7 @@ export async function publishToRag({
ingestId: string;
}) {
log.info("Publishing lesson plans and parts to RAG schema");
// const ingest = await getIngestById({ prisma, ingestId });

const lessons = await loadLessonsAndUpdateState({
prisma,
ingestId,
Expand All @@ -42,6 +40,9 @@ export async function publishToRag({

log.info(`Loaded ${lessons.length} lessons`);

/**
* Build list of lesson plans to publish
*/
const ragLessonPlans: {
oakLessonId?: number;
oakLessonSlug: string;
Expand All @@ -53,7 +54,7 @@ export async function publishToRag({

for (const lesson of lessons) {
if (!lesson.lessonPlan) {
throw new IngestError("Lessin is missing lesson plan", {
throw new IngestError("Lesson is missing lesson plan", {
ingestId,
lessonId: lesson.id,
});
Expand All @@ -71,16 +72,25 @@ export async function publishToRag({
});
}

log.info("About to chunk");

/**
* Add lesson plans to RAG schema
*/
await chunkAndPromiseAll({
data: ragLessonPlans,
chunkSize: 500,
fn: async (data) => {
await prisma.ragLessonPlan.createMany({
data,
});
try {
log.info(`Writing ${data.length} lesson plans`);
await prisma.ragLessonPlan.createMany({
data,
});
log.info(`Written ${data.length} lesson plans`);
} catch (error) {
log.error(error);
throw error;
}
},
});

Expand Down Expand Up @@ -114,6 +124,9 @@ export async function publishToRag({
const ingestLessonId = ragLessonPlan.ingestLessonId;
const lesson = lessons.find((l) => l.id === ingestLessonId);

/**
* @todo Running this query once per lesson plan is very slow; batch the queries across lesson plans.
*/
const lessonPlanParts = await prisma.$queryRaw`
SELECT key, value_text, value_json, embedding::text
FROM ingest.ingest_lesson_plan_part
Expand Down Expand Up @@ -164,23 +177,84 @@ export async function publishToRag({
const now = new Date().toISOString();
// Need to use $queryRaw because Prisma doesn't support the vector type
await prisma.$queryRaw`
INSERT INTO rag.rag_lesson_plan_parts (id, rag_lesson_plan_id, key, value_text, value_json, created_at, updated_at, embedding)
SELECT *
FROM UNNEST (
ARRAY[${data.map(() => createId())}]::text[],
ARRAY[${data.map((p) => p.ragLessonPlanId)}]::text[],
ARRAY[${data.map((p) => p.key)}]::text[],
ARRAY[${data.map((p) => p.valueText)}]::text[],
ARRAY[${data.map((p) => JSON.stringify(p.valueJson))}]::jsonb[],
ARRAY[${data.map(() => now)}]::timestamp[],
ARRAY[${data.map(() => now)}]::timestamp[],
ARRAY[${data.map((p) => `[${p.embedding.join(",")}]`)}]::vector(256)[]
);
`;
INSERT INTO rag.rag_lesson_plan_parts (id, rag_lesson_plan_id, key, value_text, value_json, created_at, updated_at, embedding)
SELECT *
FROM UNNEST (
ARRAY[${data.map(() => createId())}]::text[],
ARRAY[${data.map((p) => p.ragLessonPlanId)}]::text[],
ARRAY[${data.map((p) => p.key)}]::text[],
ARRAY[${data.map((p) => p.valueText)}]::text[],
ARRAY[${data.map((p) => JSON.stringify(p.valueJson))}]::jsonb[],
ARRAY[${data.map(() => now)}]::timestamp[],
ARRAY[${data.map(() => now)}]::timestamp[],
ARRAY[${data.map((p) => {
return "[" + p.embedding.join(",") + "]";
})}]::vector[]
);
`;

log.info(prisma.$queryRawUnsafe.toString());
},
});

/**
* In a transaction, update old versions of lesson plans to be unpublished
* and the new versions to be published
*/
await prisma.$transaction([
prisma.ragLessonPlan.updateMany({
where: {
oakLessonSlug: {
in: lessons.map((l) => l.data.lessonSlug),
},
id: {
notIn: lessons.map((l) => l.id),
},
},
data: {
isPublished: false,
},
}),
prisma.ragLessonPlan.updateMany({
where: {
id: {
in: lessons.map((l) => l.id),
},
},
data: {
isPublished: true,
},
}),
]);

// /**
// * Perform check to ensure no duplicate lesson plans are created
// */
// const duplicateResults = await chunkAndPromiseAll({
// data: lessons,
// chunkSize: 500,
// fn: async (data) =>
// prisma.ragLessonPlan.findMany({
// where: {
// oakLessonSlug: {
// in: data.map((l) => l.data.lessonSlug),
// },
// },
// select: {
// ingestLessonId: true,
// oakLessonId: true,
// oakLessonSlug: true,
// },
// }),
// });
// const duplicateLessonPlans = duplicateResults.flat();

// if (duplicateLessonPlans.length > 0) {
// log.error(`Duplicate lesson plans found: ${duplicateLessonPlans.length}`);
// throw new IngestError("Duplicate lesson plans found", {
// ingestId,
// });
// }

log.info("Published lesson plans and parts to RAG schema");
}

0 comments on commit 7485394

Please sign in to comment.