Skip to content

Commit

Permalink
chore: migrate ScheduledJob from rethinkdb to pg
Browse files Browse the repository at this point in the history
  • Loading branch information
jordanh committed Mar 1, 2024
1 parent 4e2e2ca commit 4cd113e
Show file tree
Hide file tree
Showing 8 changed files with 164 additions and 36 deletions.
16 changes: 9 additions & 7 deletions packages/server/billing/helpers/removeTeamsLimitObjects.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
import getKysely from '../../postgres/getKysely'
import {DB} from '../../postgres/pg'
import {r} from 'rethinkdb-ts'
import {RValue} from '../../database/stricterR'
import {DataLoaderWorker} from '../../graphql/graphql'
import updateNotification from '../../graphql/public/mutations/helpers/updateNotification'

const removeTeamsLimitObjects = async (orgId: string, dataLoader: DataLoaderWorker) => {
const removeJobTypes = ['LOCK_ORGANIZATION', 'WARN_ORGANIZATION']
const removeJobTypes = ['LOCK_ORGANIZATION', 'WARN_ORGANIZATION'] as DB['ScheduledJob']['type'][]
const removeNotificationTypes = ['TEAMS_LIMIT_EXCEEDED', 'TEAMS_LIMIT_REMINDER']
const pg = getKysely()

// Remove team limits jobs and existing notifications
const [, updateNotificationsChanges] = await Promise.all([
r
.table('ScheduledJob')
.getAll(orgId, {index: 'orgId'})
.filter((row: RValue) => r.expr(removeJobTypes).contains(row('type')))
.delete()
.run(),
pg
.deleteFrom('ScheduledJob')
.where('orgId', '=', orgId)
.where('type', 'in', removeJobTypes)
.execute(),
r
.table('Notification')
.getAll(orgId, {index: 'orgId'})
Expand Down
3 changes: 0 additions & 3 deletions packages/server/database/types/ScheduledJob.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
import generateUID from '../../generateUID'

export type ScheduledJobType =
| 'MEETING_STAGE_TIME_LIMIT_END'
| 'LOCK_ORGANIZATION'
| 'WARN_ORGANIZATION'

export default abstract class ScheduledJob {
id = generateUID()
protected constructor(public type: ScheduledJobType, public runAt: Date) {}
}
20 changes: 11 additions & 9 deletions packages/server/database/types/scheduleTeamLimitsJobs.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
import ms from 'ms'
import {r} from 'rethinkdb-ts'
import getKysely from '../../postgres/getKysely'
import {Threshold} from '../../../client/types/constEnums'
import ScheduledTeamLimitsJob from './ScheduledTeamLimitsJob'

const scheduleTeamLimitsJobs = async (scheduledLockAt: Date, orgId: string) => {
const scheduledLock = r
.table('ScheduledJob')
.insert(new ScheduledTeamLimitsJob(scheduledLockAt, orgId, 'LOCK_ORGANIZATION'))
.run()
const pg = getKysely()
const scheduledLock = pg
.insertInto('ScheduledJob')
.values(new ScheduledTeamLimitsJob(scheduledLockAt, orgId, 'LOCK_ORGANIZATION'))
.execute()

const oneWeekBeforeLock = new Date(
scheduledLockAt.getTime() - ms(`${Threshold.FINAL_WARNING_DAYS_BEFORE_LOCK}d`)
)
const scheduledWarn = r
.table('ScheduledJob')
.insert(new ScheduledTeamLimitsJob(oneWeekBeforeLock, orgId, 'WARN_ORGANIZATION'))
.run()

const scheduledWarn = pg
.insertInto('ScheduledJob')
.values(new ScheduledTeamLimitsJob(oneWeekBeforeLock, orgId, 'WARN_ORGANIZATION'))
.execute()

await Promise.all([scheduledLock, scheduledWarn])
}
Expand Down
19 changes: 15 additions & 4 deletions packages/server/graphql/mutations/helpers/removeScheduledJobs.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,19 @@
import getRethink from '../../../database/rethinkDriver'
import {Updateable} from 'kysely'
import {DB} from '../../../postgres/pg'
import getKysely from '../../../postgres/getKysely'

const removeScheduledJobs = async (runAt: Date, filter: {[key: string]: any}) => {
const r = await getRethink()
return r.table('ScheduledJob').getAll(runAt, {index: 'runAt'}).filter(filter).delete().run()
type FilterType = Omit<Updateable<DB['ScheduledJob']>, 'runAt'>

const removeScheduledJobs = async (runAt: Date, filter?: FilterType) => {
const pg = getKysely()
let query = pg.deleteFrom('ScheduledJob').where('runAt', '=', runAt)
if (filter) {
Object.keys(filter).forEach((key) => {
const value = filter[key as keyof FilterType]
if (value) query = query.where(key as keyof FilterType, '=', value)
})
}
return query.execute()
}

export default removeScheduledJobs
10 changes: 6 additions & 4 deletions packages/server/graphql/mutations/setStageTimer.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {GraphQLFloat, GraphQLID, GraphQLNonNull} from 'graphql'
import {SubscriptionChannel} from 'parabol-client/types/constEnums'
import findStageById from 'parabol-client/utils/meetings/findStageById'
import getKysely from '../../postgres/getKysely'
import getRethink from '../../database/rethinkDriver'
import ScheduledJobMeetingStageTimeLimit from '../../database/types/ScheduledJobMetingStageTimeLimit'
import {getUserId, isTeamMember} from '../../utils/authorization'
Expand Down Expand Up @@ -90,12 +91,13 @@ export default {
? new Date(now.getTime() + timeRemaining - AVG_PING)
: newScheduledEndTime
} else {
const pg = getKysely()
stage.isAsync = true
stage.scheduledEndTime = newScheduledEndTime
await r
.table('ScheduledJob')
.insert(new ScheduledJobMeetingStageTimeLimit(newScheduledEndTime, meetingId))
.run()
await pg
.insertInto('ScheduledJob')
.values(new ScheduledJobMeetingStageTimeLimit(newScheduledEndTime, meetingId))
.execute()
IntegrationNotifier.startTimeLimit(dataLoader, newScheduledEndTime, meetingId, teamId)
}
} else {
Expand Down
23 changes: 14 additions & 9 deletions packages/server/graphql/private/mutations/runScheduledJobs.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import {Selectable} from 'kysely'
import {SubscriptionChannel} from 'parabol-client/types/constEnums'
import getKysely from '../../../postgres/getKysely'
import {DB} from '../../../postgres/pg'
import getRethink from '../../../database/rethinkDriver'
import NotificationMeetingStageTimeLimitEnd from '../../../database/types/NotificationMeetingStageTimeLimitEnd'
import processTeamsLimitsJob from '../../../database/types/processTeamsLimitsJob'
Expand Down Expand Up @@ -39,11 +42,11 @@ const processMeetingStageTimeLimits = async (

export type ScheduledJobUnion = ScheduledJobMeetingStageTimeLimit | ScheduledTeamLimitsJob

const processJob = async (job: ScheduledJobUnion, dataLoader: DataLoaderWorker) => {
const r = await getRethink()
const res = await r.table('ScheduledJob').get(job.id).delete().run()
const processJob = async (job: Selectable<DB['ScheduledJob']>, dataLoader: DataLoaderWorker) => {
const pg = getKysely()
const res = await pg.deleteFrom('ScheduledJob').where('id', '=', job.id).executeTakeFirst()
// prevent duplicates. after this point, we assume the job finishes to completion (ignores server crashes, etc.)
if (res.deleted !== 1) return
if (res.numDeletedRows !== BigInt(1)) return

if (job.type === 'MEETING_STAGE_TIME_LIMIT_END') {
return processMeetingStageTimeLimits(
Expand All @@ -60,15 +63,17 @@ const runScheduledJobs: MutationResolvers['runScheduledJobs'] = async (
{seconds},
{dataLoader}
) => {
const r = await getRethink()
const pg = getKysely()
const now = new Date()

// RESOLUTION
const before = new Date(now.getTime() + seconds * 1000)
const upcomingJobs = (await r
.table('ScheduledJob')
.between(r.minval, before, {index: 'runAt'})
.run()) as ScheduledJobUnion[]
const upcomingJobs = await pg
.selectFrom('ScheduledJob')
.selectAll()
.where('runAt', '>=', new Date(0))
.where('runAt', '<', before)
.execute()

upcomingJobs.forEach((job) => {
const {runAt} = job
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import {Client} from 'pg'
import getPgConfig from '../getPgConfig'

export async function up() {
const client = new Client(getPgConfig())
await client.connect()
await client.query(`
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'ScheduledJobTypeEnum') THEN
EXECUTE 'CREATE TYPE "ScheduledJobTypeEnum" AS ENUM (''MEETING_STAGE_TIME_LIMIT_END'', ''LOCK_ORGANIZATION'', ''WARN_ORGANIZATION'')';
END IF;
END $$;
CREATE TABLE "ScheduledJob" (
"id" SERIAL PRIMARY KEY,
"runAt" TIMESTAMP WITH TIME ZONE DEFAULT NOW() NOT NULL,
"type" "ScheduledJobTypeEnum" NOT NULL,
"orgId" VARCHAR(100),
"meetingId" VARCHAR(100)
);
CREATE INDEX IF NOT EXISTS "idx_ScheduledJob_orgId" ON "ScheduledJob"("orgId");
CREATE INDEX IF NOT EXISTS "idx_ScheduledJob_runAt" ON "ScheduledJob"("runAt");
CREATE INDEX IF NOT EXISTS "idx_ScheduledJob_type" ON "ScheduledJob"("type");
`)
await client.end()
}

export async function down() {
const client = new Client(getPgConfig())
await client.connect()
await client.query(`
DROP TABLE IF EXISTS "ScheduledJob";
DROP TYPE IF EXISTS "ScheduledJobTypeEnum";
`)
await client.end()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import {FirstParam} from 'parabol-client/types/generics'
import {Client} from 'pg'
import {r} from 'rethinkdb-ts'
import getPgConfig from '../getPgConfig'
import connectRethinkDB from '../../database/connectRethinkDB'
import getPgp from '../getPgp'

export async function up() {
await connectRethinkDB()
const {pgp, pg} = getPgp()
const batchSize = 1000

const columnSet = new pgp.helpers.ColumnSet(
['runAt', 'type', {name: 'orgId', def: null}, {name: 'meetingId', def: null}],
{table: 'ScheduledJob'}
)

const transformRethinkRow = (row: any) => {
const {runAt, type, orgId, meetingId} = row
return {
runAt,
type,
orgId,
meetingId
}
}

const getNextData = async (leftBoundCursor: Date | undefined) => {
const startAt = leftBoundCursor || r.minval
const nextBatch = (
await r
.table('ScheduledJob')
.between(startAt, r.maxval, {index: 'runAt', leftBound: 'open'})
.orderBy({index: 'runAt'})
.limit(batchSize)
.run()
).map(transformRethinkRow)
if (nextBatch.length === 0) return null
if (nextBatch.length < batchSize) return nextBatch
const lastItem = nextBatch.pop()
const lastMatchingRunAt = nextBatch.findLastIndex((item) => item.runAt !== lastItem!.runAt)
if (lastMatchingRunAt === -1) {
throw new Error(
'batchSize is smaller than the number of items that share the same cursor. Increase batchSize'
)
}
return nextBatch.slice(0, lastMatchingRunAt + 1)
}

await pg.tx('ScheduledJob', (task) => {
const fetchAndProcess: FirstParam<typeof task.sequence> = async (
_index,
leftBoundCursor: undefined | Date
) => {
const nextData = await getNextData(leftBoundCursor)
if (!nextData) return undefined
const insert = pgp.helpers.insert(nextData, columnSet)
await task.none(insert)
return nextData.at(-1)!.runAt
}
return task.sequence(fetchAndProcess)
})
await r.getPoolMaster()?.drain()
}

export async function down() {
const client = new Client(getPgConfig())
await client.connect()
await client.query(`DELETE FROM "ScheduledJob"`)
await client.end()
}

0 comments on commit 4cd113e

Please sign in to comment.