Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: migration reflection groups to pg #9514

Merged
merged 7 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 31 additions & 16 deletions packages/server/graphql/private/mutations/checkRethinkPgEquality.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import fs from 'fs'
import path from 'path'
import getRethink from '../../../database/rethinkDriver'
import getMeetingTemplatesByIds from '../../../postgres/queries/getMeetingTemplatesByIds'
import getKysely from '../../../postgres/getKysely'
import {checkRowCount, checkTableEq} from '../../../postgres/utils/checkEqBase'
import {
compareDateAlmostEqual,
compareRValUndefinedAsFalse,
compareRValUndefinedAsNull,
compareRValUndefinedAsNullAndTruncateRVal,
defaultEqFn
} from '../../../postgres/utils/rethinkEqualityFns'
import {MutationResolvers} from '../resolverTypes'
Expand Down Expand Up @@ -35,22 +34,38 @@ const checkRethinkPgEquality: MutationResolvers['checkRethinkPgEquality'] = asyn
) => {
const r = await getRethink()

if (tableName === 'MeetingTemplate') {
if (tableName === 'RetroReflectionGroup') {
const rowCountResult = await checkRowCount(tableName)
const rethinkQuery = r.table('MeetingTemplate').orderBy('updatedAt', {index: 'updatedAt'})
const errors = await checkTableEq(rethinkQuery, getMeetingTemplatesByIds, {
const rethinkQuery = (updatedAt: Date, id: string | number) => {
return r
.table('RetroReflectionGroup')
.between([updatedAt, id], [r.maxval, r.maxval], {
index: 'updatedAtId',
leftBound: 'open',
rightBound: 'closed'
})
.orderBy({index: 'updatedAtId'}) as any
}
const pgQuery = (ids: string[]) => {
return getKysely()
.selectFrom('RetroReflectionGroup')
.selectAll()
.where('id', 'in', ids)
.execute()
}
const errors = await checkTableEq(rethinkQuery, pgQuery, {
id: defaultEqFn,
createdAt: defaultEqFn,
isActive: defaultEqFn,
name: defaultEqFn,
teamId: defaultEqFn,
updatedAt: compareDateAlmostEqual,
scope: defaultEqFn,
orgId: defaultEqFn,
parentTemplateId: compareRValUndefinedAsNull,
lastUsedAt: compareRValUndefinedAsNull,
type: defaultEqFn,
isStarter: compareRValUndefinedAsFalse,
isFree: compareRValUndefinedAsFalse
isActive: defaultEqFn,
meetingId: defaultEqFn,
promptId: defaultEqFn,
sortOrder: defaultEqFn,
voterIds: defaultEqFn,
smartTitle: compareRValUndefinedAsNullAndTruncateRVal(255),
title: compareRValUndefinedAsNullAndTruncateRVal(255),
summary: compareRValUndefinedAsNullAndTruncateRVal(2000),
discussionPromptQuestion: compareRValUndefinedAsNullAndTruncateRVal(2000)
})
return handleResult(tableName, rowCountResult, errors, writeToFile)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@ import getPgConfig from '../getPgConfig'
export async function up() {
const client = new Client(getPgConfig())
await client.connect()
await client.query(`
try {
await client.query(`
ALTER TABLE "EmbeddingsMetadata" RENAME COLUMN "embedText" TO "fullText";
`)
} catch {
// noop
}
await client.end()
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import {Kysely, PostgresDialect} from 'kysely'
import {r} from 'rethinkdb-ts'
import connectRethinkDB from '../../database/connectRethinkDB'
import getPg from '../getPg'

export async function up() {
await connectRethinkDB()
const pg = new Kysely<any>({
dialect: new PostgresDialect({
pool: getPg()
})
})
try {
await r
.table('RetroReflectionGroup')
.indexCreate('updatedAtId', (row: any) => [row('updatedAt'), row('id')])
.run()
await r.table('RetroReflectionGroup').indexWait().run()
} catch {
// index already exists
}

const MAX_PG_PARAMS = 65545
const PG_COLS = [
'id',
'createdAt',
'updatedAt',
'isActive',
'meetingId',
'promptId',
'sortOrder',
'voterIds',
'smartTitle',
'title',
'summary',
'discussionPromptQuestion'
] as const
type RetroReflectionGroup = {
[K in (typeof PG_COLS)[number]]: any
}
const BATCH_SIZE = Math.trunc(MAX_PG_PARAMS / PG_COLS.length)

let curUpdatedAt = r.minval
let curId = r.minval
for (let i = 0; i < 1e6; i++) {
const rawRowsToInsert = (await r
.table('RetroReflectionGroup')
.between([curUpdatedAt, curId], [r.maxval, r.maxval], {
index: 'updatedAtId',
leftBound: 'open',
rightBound: 'closed'
})
.orderBy({index: 'updatedAtId'})
.limit(BATCH_SIZE)
.pluck(...PG_COLS)
.run()) as RetroReflectionGroup[]

const rowsToInsert = rawRowsToInsert.map((row) => ({
...row,
title: row.title?.slice(0, 255),
smartTitle: row.smartTitle?.slice(0, 255),
summary: row.summary?.slice(0, 2000)
}))
if (rowsToInsert.length === 0) break
const lastRow = rowsToInsert[rowsToInsert.length - 1]
curUpdatedAt = lastRow.updatedAt
curId = lastRow.id
try {
await pg
.insertInto('RetroReflectionGroup')
.values(rowsToInsert)
.onConflict((oc) => oc.doNothing())
.execute()
} catch (e) {
console.log({lastRow}, rowsToInsert.length)
throw e
}
}
}

export async function down() {
await connectRethinkDB()
try {
await r.table('RetroReflectionGroup').indexDrop('updatedAtId').run()
} catch {
// index already dropped
}
}
18 changes: 12 additions & 6 deletions packages/server/postgres/utils/checkEqBase.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {RSelection} from 'rethinkdb-ts'
import getRethink from '../../database/rethinkDriver'
import {RTable, TableSchema} from '../../database/stricterR'
import getPg from '../getPg'

interface DBDoc {
Expand Down Expand Up @@ -33,19 +33,25 @@ export const checkRowCount = async (tableName: string) => {
}

export async function checkTableEq(
rethinkQuery: RTable<TableSchema>,
rethinkQuery: (updatedAt: Date, id: string | number) => RSelection,
pgQuery: (ids: string[]) => Promise<PGDoc[] | null>,
equalityMap: Record<string, (a: unknown, b: unknown) => boolean>,
maxErrors = 10
) {
const batchSize = 3000
const errors = [] as Diff[]
const propsToCheck = Object.keys(equalityMap)

const r = await getRethink()
let curUpdatedDate = r.minval
let curId = r.minval
for (let i = 0; i < 1e6; i++) {
const offset = batchSize * i
const rethinkRows = (await rethinkQuery.skip(offset).limit(batchSize).run()) as RethinkDoc[]
if (!rethinkRows.length) break
const rethinkRows = (await rethinkQuery(curUpdatedDate, curId)
.limit(batchSize)
.run()) as RethinkDoc[]
if (rethinkRows.length === 0) break
const lastRow = rethinkRows[rethinkRows.length - 1]!
curUpdatedDate = lastRow.updatedAt
curId = lastRow.id
const ids = rethinkRows.map((t) => t.id)
const pgRows = (await pgQuery(ids)) ?? []
const pgRowsById = {} as {[key: string]: PGDoc}
Expand Down
8 changes: 8 additions & 0 deletions packages/server/postgres/utils/rethinkEqualityFns.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import isValidDate from 'parabol-client/utils/isValidDate'

export const defaultEqFn = (a: unknown, b: unknown) => {
if (a instanceof Date && b instanceof Date) return a.getTime() === b.getTime()
if (Array.isArray(a) && Array.isArray(b)) return JSON.stringify(a) === JSON.stringify(b)
return a === b
}
export const compareDateAlmostEqual = (rVal: unknown, pgVal: unknown) => {
Expand All @@ -19,3 +20,10 @@ export const compareRValUndefinedAsFalse = (rVal: unknown, pgVal: unknown) => {
const normalizedRVal = rVal === undefined ? false : rVal
return normalizedRVal === pgVal
}

export const compareRValUndefinedAsNullAndTruncateRVal =
(length: number) => (rVal: unknown, pgVal: unknown) => {
const truncatedRVal = typeof rVal === 'string' ? rVal.slice(0, length) : rVal
const normalizedRVal = truncatedRVal === undefined ? null : truncatedRVal
return defaultEqFn(normalizedRVal, pgVal)
}
4 changes: 2 additions & 2 deletions packages/server/utils/PubSubPromise.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import ms from 'ms'
import GQLExecutorChannelId from '../../client/shared/gqlIds/GQLExecutorChannelId'
import numToBase64 from './numToBase64'
import RedisInstance from './RedisInstance'
import numToBase64 from './numToBase64'
import sendToSentry from './sendToSentry'

const STANDARD_TIMEOUT = ms('10s')
const ADHOC_TIMEOUT = ms('1m')
const ADHOC_TIMEOUT = ms('10m')

interface Job {
resolve: (payload: any) => void
Expand Down
Loading