Skip to content

Commit

Permalink
feat: Add MeetingTemplate update embeddings trigger (#9838)
Browse files Browse the repository at this point in the history
  • Loading branch information
Dschoordsch authored Jun 17, 2024
1 parent 71b17c2 commit 87e0d86
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 9 deletions.
4 changes: 2 additions & 2 deletions packages/embedder/ai_models/AbstractEmbeddingsModel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,9 @@ export abstract class AbstractEmbeddingsModel extends AbstractModel {
.columns(['jobType', 'priority', 'embeddingsMetadataId', 'model'])
.expression(({selectFrom}) =>
selectFrom('EmbeddingsMetadata')
.select(({lit, ref}) => [
.select(({ref}) => [
sql.lit('embed:start').as('jobType'),
lit(priority).as('priority'),
priority.as('priority'),
ref('id').as('embeddingsMetadataId'),
sql.lit(this.tableName).as('model')
])
Expand Down
6 changes: 3 additions & 3 deletions packages/embedder/getEmbedderPriority.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import ms from 'ms'
import {sql} from 'kysely'

/*
The Job Queue has a first in first out (FIFO) strategy with a few exceptions:
- Realtime requests from the app should come before historal data processing
Expand All @@ -16,6 +17,5 @@ e.g. Process realtime requests immediately, but start processing this historical
In 5 days, that historical data will be a higher priority than new realtime requests.
*/
export const getEmbedderPriority = (maxDelayInDays: number) => {
const maxDelayInSeconds = maxDelayInDays * ms('1d')
return -(2 ** 31) + ~~(Date.now() / 1000) + maxDelayInSeconds
return sql<number>`"getEmbedderPriority"(${maxDelayInDays})`
}
4 changes: 2 additions & 2 deletions packages/embedder/insertDiscussionsIntoMetadataAndQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ export const insertDiscussionsIntoMetadataAndQueue = async (
.insertInto('EmbeddingsJobQueue')
.columns(['jobType', 'priority', 'embeddingsMetadataId', 'model'])
.expression(({selectFrom}) =>
selectFrom('Metadata').select(({lit, ref}) => [
selectFrom('Metadata').select(({ref}) => [
sql.lit('embed:start').as('jobType'),
lit(priority).as('priority'),
priority.as('priority'),
ref('Metadata.id').as('embeddingsMetadataId'),
ref('Metadata.model').as('model')
])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ export const insertMeetingTemplatesIntoMetadataAndQueue = async (
.insertInto('EmbeddingsJobQueue')
.columns(['jobType', 'priority', 'embeddingsMetadataId', 'model'])
.expression(({selectFrom}) =>
selectFrom('Metadata').select(({lit, ref}) => [
selectFrom('Metadata').select(({ref}) => [
sql.lit('embed:start').as('jobType'),
lit(priority).as('priority'),
priority.as('priority'),
ref('Metadata.id').as('embeddingsMetadataId'),
ref('Metadata.model').as('model')
])
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import {Client} from 'pg'
import getPgConfig from '../getPgConfig'

export async function up() {
const client = new Client(getPgConfig())
await client.connect()
await client.query(`
CREATE OR REPLACE FUNCTION "getEmbedderPriority"(IN "maxDelayInDays" INTEGER)
RETURNS INTEGER LANGUAGE PLPGSQL AS $$
BEGIN
RETURN -(2 ^ 31) + FLOOR(EXTRACT(EPOCH FROM CURRENT_TIMESTAMP) / 1000) + "maxDelayInDays" * 86400;
END
$$;
`)
await client.end()
}

export async function down() {
const client = new Client(getPgConfig())
await client.connect()
await client.query(`
DROP FUNCTION IF EXISTS "getEmbedderPriority"(IN INTEGER);
`)
await client.end()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import {Client} from 'pg'
import getPgConfig from '../getPgConfig'

export async function up() {
const client = new Client(getPgConfig())
await client.connect()
await client.query(`
CREATE OR REPLACE FUNCTION "updateEmbedding"()
RETURNS TRIGGER LANGUAGE PLPGSQL AS $$
DECLARE
"metadataId" INTEGER;
BEGIN
BEGIN
SELECT id FROM "EmbeddingsMetadata" WHERE "objectType" = 'meetingTemplate' AND "refId" = NEW.id INTO STRICT "metadataId";
EXCEPTION
WHEN NO_DATA_FOUND THEN
INSERT INTO "EmbeddingsMetadata" ("objectType", "refId", "teamId", "refUpdatedAt") VALUES ('meetingTemplate', NEW.id, NEW."teamId", NEW."updatedAt") RETURNING id INTO "metadataId";
END;
INSERT INTO "EmbeddingsJobQueue" ("embeddingsMetadataId", "jobType", "priority", "model") VALUES ("metadataId", 'embed:start', "getEmbedderPriority"(1), 'Embeddings_ember_1') ON CONFLICT DO NOTHING;
RETURN NEW;
END
$$;
DROP TRIGGER IF EXISTS "update_embedding_on_MeetingTemplate" ON "MeetingTemplate";
CREATE TRIGGER "update_embedding_on_MeetingTemplate" AFTER UPDATE ON "MeetingTemplate" FOR EACH ROW EXECUTE PROCEDURE "updateEmbedding"();
`)
await client.end()
}

export async function down() {
const client = new Client(getPgConfig())
await client.connect()
await client.query(`
DROP TRIGGER IF EXISTS "update_embedding_on_MeetingTemplate" ON "User";
DROP FUNCTION IF EXISTS "update_embedding_on_MeetingTemplate"();
`)
await client.end()
}

0 comments on commit 87e0d86

Please sign in to comment.