diff --git a/packages/server/graphql/private/mutations/checkRethinkPgEquality.ts b/packages/server/graphql/private/mutations/checkRethinkPgEquality.ts index 137316624e8..818af10d9e6 100644 --- a/packages/server/graphql/private/mutations/checkRethinkPgEquality.ts +++ b/packages/server/graphql/private/mutations/checkRethinkPgEquality.ts @@ -1,12 +1,11 @@ import fs from 'fs' import path from 'path' import getRethink from '../../../database/rethinkDriver' -import getMeetingTemplatesByIds from '../../../postgres/queries/getMeetingTemplatesByIds' +import getKysely from '../../../postgres/getKysely' import {checkRowCount, checkTableEq} from '../../../postgres/utils/checkEqBase' import { compareDateAlmostEqual, - compareRValUndefinedAsFalse, - compareRValUndefinedAsNull, + compareRValUndefinedAsNullAndTruncateRVal, defaultEqFn } from '../../../postgres/utils/rethinkEqualityFns' import {MutationResolvers} from '../resolverTypes' @@ -35,22 +34,38 @@ const checkRethinkPgEquality: MutationResolvers['checkRethinkPgEquality'] = asyn ) => { const r = await getRethink() - if (tableName === 'MeetingTemplate') { + if (tableName === 'RetroReflectionGroup') { const rowCountResult = await checkRowCount(tableName) - const rethinkQuery = r.table('MeetingTemplate').orderBy('updatedAt', {index: 'updatedAt'}) - const errors = await checkTableEq(rethinkQuery, getMeetingTemplatesByIds, { + const rethinkQuery = (updatedAt: Date, id: string | number) => { + return r + .table('RetroReflectionGroup') + .between([updatedAt, id], [r.maxval, r.maxval], { + index: 'updatedAtId', + leftBound: 'open', + rightBound: 'closed' + }) + .orderBy({index: 'updatedAtId'}) as any + } + const pgQuery = (ids: string[]) => { + return getKysely() + .selectFrom('RetroReflectionGroup') + .selectAll() + .where('id', 'in', ids) + .execute() + } + const errors = await checkTableEq(rethinkQuery, pgQuery, { + id: defaultEqFn, createdAt: defaultEqFn, - isActive: defaultEqFn, - name: defaultEqFn, - teamId: defaultEqFn, updatedAt: compareDateAlmostEqual, - scope: defaultEqFn, - orgId: defaultEqFn, - parentTemplateId: compareRValUndefinedAsNull, - lastUsedAt: compareRValUndefinedAsNull, - type: defaultEqFn, - isStarter: compareRValUndefinedAsFalse, - isFree: compareRValUndefinedAsFalse + isActive: defaultEqFn, + meetingId: defaultEqFn, + promptId: defaultEqFn, + sortOrder: defaultEqFn, + voterIds: defaultEqFn, + smartTitle: compareRValUndefinedAsNullAndTruncateRVal(255), + title: compareRValUndefinedAsNullAndTruncateRVal(255), + summary: compareRValUndefinedAsNullAndTruncateRVal(2000), + discussionPromptQuestion: compareRValUndefinedAsNullAndTruncateRVal(2000) }) return handleResult(tableName, rowCountResult, errors, writeToFile) } diff --git a/packages/server/postgres/migrations/1708127504000_updateEmbeddingMetadata.ts b/packages/server/postgres/migrations/1708127504000_updateEmbeddingMetadata.ts index 3c93617c0b1..279dd6f90b7 100644 --- a/packages/server/postgres/migrations/1708127504000_updateEmbeddingMetadata.ts +++ b/packages/server/postgres/migrations/1708127504000_updateEmbeddingMetadata.ts @@ -4,9 +4,13 @@ import getPgConfig from '../getPgConfig' export async function up() { const client = new Client(getPgConfig()) await client.connect() - await client.query(` + try { + await client.query(` ALTER TABLE "EmbeddingsMetadata" RENAME COLUMN "embedText" TO "fullText"; `) + } catch { + // noop + } await client.end() } diff --git a/packages/server/postgres/migrations/1712075131388_retroReflectionGroups2.ts b/packages/server/postgres/migrations/1712075131388_retroReflectionGroups2.ts new file mode 100644 index 00000000000..c0027a0073d --- /dev/null +++ b/packages/server/postgres/migrations/1712075131388_retroReflectionGroups2.ts @@ -0,0 +1,88 @@ +import {Kysely, PostgresDialect} from 'kysely' +import {r} from 'rethinkdb-ts' +import connectRethinkDB from '../../database/connectRethinkDB' +import getPg from '../getPg' + +export async function up() { + await connectRethinkDB() + const pg = new Kysely({ + dialect: new PostgresDialect({ + pool: getPg() + }) + }) + try { + await r + .table('RetroReflectionGroup') + .indexCreate('updatedAtId', (row: any) => [row('updatedAt'), row('id')]) + .run() + await r.table('RetroReflectionGroup').indexWait().run() + } catch { + // index already exists + } + + const MAX_PG_PARAMS = 65545 + const PG_COLS = [ + 'id', + 'createdAt', + 'updatedAt', + 'isActive', + 'meetingId', + 'promptId', + 'sortOrder', + 'voterIds', + 'smartTitle', + 'title', + 'summary', + 'discussionPromptQuestion' + ] as const + type RetroReflectionGroup = { + [K in (typeof PG_COLS)[number]]: any + } + const BATCH_SIZE = Math.trunc(MAX_PG_PARAMS / PG_COLS.length) + + let curUpdatedAt = r.minval + let curId = r.minval + for (let i = 0; i < 1e6; i++) { + const rawRowsToInsert = (await r + .table('RetroReflectionGroup') + .between([curUpdatedAt, curId], [r.maxval, r.maxval], { + index: 'updatedAtId', + leftBound: 'open', + rightBound: 'closed' + }) + .orderBy({index: 'updatedAtId'}) + .limit(BATCH_SIZE) + .pluck(...PG_COLS) + .run()) as RetroReflectionGroup[] + + const rowsToInsert = rawRowsToInsert.map((row) => ({ + ...row, + title: row.title?.slice(0, 255), + smartTitle: row.smartTitle?.slice(0, 255), + summary: row.summary?.slice(0, 2000) + })) + if (rowsToInsert.length === 0) break + const lastRow = rowsToInsert[rowsToInsert.length - 1] + curUpdatedAt = lastRow.updatedAt + curId = lastRow.id + try { + await pg + .insertInto('RetroReflectionGroup') + .values(rowsToInsert) + .onConflict((oc) => oc.doNothing()) + .execute() + } catch (e) { + console.log({lastRow}, rowsToInsert.length) + throw e + } + } +} + +export async function down() { + await connectRethinkDB() + try { + await r.table('RetroReflectionGroup').indexDrop('updatedAtId').run() + } catch { + // index already dropped + } +} diff --git a/packages/server/postgres/utils/checkEqBase.ts b/packages/server/postgres/utils/checkEqBase.ts index c56da959aa3..20bc9914f43 100644 --- a/packages/server/postgres/utils/checkEqBase.ts +++ b/packages/server/postgres/utils/checkEqBase.ts @@ -1,5 +1,5 @@ +import {RSelection} from 'rethinkdb-ts' import getRethink from '../../database/rethinkDriver' -import {RTable, TableSchema} from '../../database/stricterR' import getPg from '../getPg' interface DBDoc { @@ -33,7 +33,7 @@ export const checkRowCount = async (tableName: string) => { } export async function checkTableEq( - rethinkQuery: RTable, + rethinkQuery: (updatedAt: Date, id: string | number) => RSelection, pgQuery: (ids: string[]) => Promise, equalityMap: Record boolean>, maxErrors = 10 @@ -41,11 +41,17 @@ export async function checkTableEq( const batchSize = 3000 const errors = [] as Diff[] const propsToCheck = Object.keys(equalityMap) - + const r = await getRethink() + let curUpdatedDate = r.minval + let curId = r.minval for (let i = 0; i < 1e6; i++) { - const offset = batchSize * i - const rethinkRows = (await rethinkQuery.skip(offset).limit(batchSize).run()) as RethinkDoc[] - if (!rethinkRows.length) break + const rethinkRows = (await rethinkQuery(curUpdatedDate, curId) + .limit(batchSize) + .run()) as RethinkDoc[] + if (rethinkRows.length === 0) break + const lastRow = rethinkRows[rethinkRows.length - 1]! + curUpdatedDate = lastRow.updatedAt + curId = lastRow.id const ids = rethinkRows.map((t) => t.id) const pgRows = (await pgQuery(ids)) ?? [] const pgRowsById = {} as {[key: string]: PGDoc} diff --git a/packages/server/postgres/utils/rethinkEqualityFns.ts b/packages/server/postgres/utils/rethinkEqualityFns.ts index 8cecaf562c1..69f0ec2bb20 100644 --- a/packages/server/postgres/utils/rethinkEqualityFns.ts +++ b/packages/server/postgres/utils/rethinkEqualityFns.ts @@ -2,6 +2,7 @@ import isValidDate from 'parabol-client/utils/isValidDate' export const defaultEqFn = (a: unknown, b: unknown) => { if (a instanceof Date && b instanceof Date) return a.getTime() === b.getTime() + if (Array.isArray(a) && Array.isArray(b)) return JSON.stringify(a) === JSON.stringify(b) return a === b } export const compareDateAlmostEqual = (rVal: unknown, pgVal: unknown) => { @@ -19,3 +20,10 @@ export const compareRValUndefinedAsFalse = (rVal: unknown, pgVal: unknown) => { const normalizedRVal = rVal === undefined ? false : rVal return normalizedRVal === pgVal } + +export const compareRValUndefinedAsNullAndTruncateRVal = + (length: number) => (rVal: unknown, pgVal: unknown) => { + const truncatedRVal = typeof rVal === 'string' ? rVal.slice(0, length) : rVal + const normalizedRVal = truncatedRVal === undefined ? null : truncatedRVal + return defaultEqFn(normalizedRVal, pgVal) + } diff --git a/packages/server/utils/PubSubPromise.ts b/packages/server/utils/PubSubPromise.ts index 8768ef1eb9d..cd95cbaf325 100644 --- a/packages/server/utils/PubSubPromise.ts +++ b/packages/server/utils/PubSubPromise.ts @@ -5,7 +5,7 @@ import numToBase64 from './numToBase64' import sendToSentry from './sendToSentry' const STANDARD_TIMEOUT = ms('10s') -const ADHOC_TIMEOUT = ms('1m') +const ADHOC_TIMEOUT = ms('10m') interface Job { resolve: (payload: any) => void