Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheduled jobs support #537

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
946 changes: 694 additions & 252 deletions package-lock.json

Large diffs are not rendered by default.

12 changes: 8 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@
"version": "0.27.0",
"private": true,
"scripts": {
"dev": "next dev",
"build": "next build",
"dev": "concurrently -k \"npm run dev:worker\" \"next dev\"",
"build": "concurrently \"next build\" \"npm run build:worker\"",
"build:worker": "tsc -p ./src/utils/worker",
"dev:worker": "tsx -r dotenv/config src/utils/worker/index.ts",
"build:styles": "npx sass --no-source-map ./src/styles/react-big-calendar/styles.scss ./react-big-calendar.css",
"start": "next start",
"lint": "next lint && eslint --ext .js,.ts,.tsx .",
Expand All @@ -18,7 +20,7 @@
"type-check": "npx tsc"
},
"prisma": {
"seed": "ts-node --files --compiler-options {\"module\":\"CommonJS\"} prisma/seed.ts"
"seed": "tsx -r dotenv/config prisma/seed.ts"
},
"dependencies": {
"@aws-sdk/client-s3": "3.451.0",
Expand All @@ -39,6 +41,7 @@
"body-parser": "1.20.2",
"color-contrast-checker": "2.1.0",
"concurrently": "8.2.2",
"cron-parser": "4.9.0",
"date-fns": "2.30.0",
"easy-typed-intl": "1.0.3",
"formidable": "2.1.1",
Expand Down Expand Up @@ -102,6 +105,7 @@
"@welldone-software/why-did-you-render": "8.0.1",
"babel-eslint": "10.1.0",
"babel-loader": "9.1.2",
"dotenv": "16.4.1",
"eslint": "8.48.0",
"eslint-config-airbnb": "19.0.4",
"eslint-config-next": "14.1.0",
Expand All @@ -116,7 +120,7 @@
"lint-staged": "14.0.1",
"prettier": "2.8.8",
"prisma": "5.2.0",
"ts-node": "10.9.2",
"tsx": "4.7.0",
"typescript": "5.3.3",
"webpack": "5.90.1"
}
Expand Down
18 changes: 18 additions & 0 deletions prisma/migrations/20240202101116_jobs/migration.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
-- CreateTable
CREATE TABLE "Job" (
"id" TEXT NOT NULL,
"state" TEXT NOT NULL,
"priority" INTEGER NOT NULL DEFAULT 0,
"kind" TEXT NOT NULL,
"data" JSONB NOT NULL,
"delay" INTEGER,
"retry" INTEGER,
"runs" INTEGER NOT NULL DEFAULT 0,
"force" BOOLEAN NOT NULL DEFAULT false,
"cron" TEXT,
"error" TEXT,
"createdAt" TIMESTAMP NOT NULL DEFAULT timezone('utc'::text, now()),
"updatedAt" TIMESTAMP(3) NOT NULL DEFAULT timezone('utc'::text, now()),

CONSTRAINT "Job_pkey" PRIMARY KEY ("id")
);
17 changes: 17 additions & 0 deletions prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -453,3 +453,20 @@ model ApiToken {
id String @id @default(dbgenerated("gen_random_uuid()")) @db.Uuid
description String
}

model Job {
id String @id @default(cuid())
state String
priority Int @default(0)
kind String
data Json
delay Int?
retry Int?
runs Int @default(0)
force Boolean @default(false)
cron String?
error String?

createdAt DateTime @default(dbgenerated("timezone('utc'::text, now())")) @db.Timestamp()
updatedAt DateTime @default(dbgenerated("timezone('utc'::text, now())")) @updatedAt
}
5 changes: 5 additions & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ export default {
authUser: process.env.MAIL_USER,
enabled: process.env.MAIL_ENABLE,
},
worker: {
queueInterval: process.env.WORKER_JOBS_INTERVAL,
retryLimit: process.env.WORKER_JOBS_RETRY,
workerJobsDelay: process.env.WORKER_JOBS_DELAY,
},
pluginMenuItems: parsePluginMenuItems(process.env.NEXT_PUBLIC_PLUGIN_MENU_ITEMS),
debugCookieEnabled: process.env.DEBUG_COOKIE_ENABLE,
nextAuthEnabled: process.env.NEXT_AUTH_ENABLE,
Expand Down
26 changes: 13 additions & 13 deletions src/modules/calendarEventMethods.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { RRule } from 'rrule';
import { prisma } from '../utils/prisma';
import { calendarEvents, createIcalEventData } from '../utils/ical';
import { userOfEvent } from '../utils/calendar';
import { createEmailJob } from '../utils/worker/create';

import { calendarRecurrenceMethods } from './calendarRecurrenceMethods';
import { calendarMethods } from './calendarMethods';
Expand All @@ -18,7 +19,6 @@ import {
UpdateCalendarEvent,
UpdateCalendarException,
} from './calendarTypes';
import { sendMail } from './nodemailer';

async function createEvent(params: CreateCalendarEvent, user: User): Promise<CalendarEventCreateResult> {
const { date, title, duration, description = '', recurrence } = params;
Expand Down Expand Up @@ -46,7 +46,7 @@ async function createEvent(params: CreateCalendarEvent, user: User): Promise<Cal
rule: rRule.options.freq,
});

await sendMail({
await createEmailJob({
to: user.email,
subject: title,
text: '',
Expand Down Expand Up @@ -124,7 +124,7 @@ async function updateEventSeries(params: UpdateCalendarEvent, user: User): Promi
sequence: event.sequence + 1,
});

await sendMail({
await createEmailJob({
to: user.email,
subject: event.eventDetails.title,
text: '',
Expand Down Expand Up @@ -246,7 +246,7 @@ async function splitEventSeries(params: UpdateCalendarEvent, user: User): Promis
until: newRRule.options.until || undefined,
});

await sendMail({
await createEmailJob({
to: user.email,
subject: oldEvent.eventDetails.title,
text: '',
Expand All @@ -255,7 +255,7 @@ async function splitEventSeries(params: UpdateCalendarEvent, user: User): Promis
events: [icalEventDataOldEvent],
}),
});
await sendMail({
await createEmailJob({
to: user.email,
subject: newEvent.eventDetails.title,
text: '',
Expand Down Expand Up @@ -323,7 +323,7 @@ async function createEventException(params: UpdateCalendarEvent, user: User): Pr
summary: title ?? eventDetails.title,
});

await sendMail({
await createEmailJob({
to: user.email,
subject: eventDetails.title,
text: '',
Expand All @@ -333,7 +333,7 @@ async function createEventException(params: UpdateCalendarEvent, user: User): Pr
}),
});

await sendMail({
await createEmailJob({
to: user.email,
subject: title ?? eventDetails.title,
text: '',
Expand Down Expand Up @@ -373,7 +373,7 @@ async function updateEventException(params: UpdateCalendarException, user: User)
sequence: eventException.sequence,
});

await sendMail({
await createEmailJob({
to: user.email,
subject: title ?? eventDetails.title,
text: '',
Expand Down Expand Up @@ -432,7 +432,7 @@ async function stopEventSeries(eventId: string, originalDate: Date, user: User):
sequence,
});

await sendMail({
await createEmailJob({
to: user.email,
subject: eventDetails.title,
text: '',
Expand Down Expand Up @@ -466,7 +466,7 @@ async function cancelEventException(eventId: string, exceptionId: string, user:
status: ICalEventStatus.CANCELLED,
});

await sendMail({
await createEmailJob({
to: user.email,
subject: restException.eventDetails.title,
text: '',
Expand Down Expand Up @@ -518,7 +518,7 @@ async function createEventCancellation(eventId: string, originalDate: Date, user
status: ICalEventStatus.CANCELLED,
});

await sendMail({
await createEmailJob({
to: user.email,
subject: eventDetails.title,
text: '',
Expand All @@ -528,7 +528,7 @@ async function createEventCancellation(eventId: string, originalDate: Date, user
}),
});

await sendMail({
await createEmailJob({
to: user.email,
subject: eventDetails.title,
text: '',
Expand Down Expand Up @@ -556,7 +556,7 @@ async function removeEventSeries(eventId: string, user: User): Promise<void> {
sequence,
});

await sendMail({
await createEmailJob({
to: creator?.email || user.email,
subject: eventDetails.title,
text: '',
Expand Down
10 changes: 5 additions & 5 deletions src/modules/emailMethods.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import { formatDateToLocaleString } from '../utils/date';
import { Paths, generatePath } from '../utils/paths';
import { calendarEvents, createIcalEventData } from '../utils/ical';
import { userOfEvent } from '../utils/calendar';
import { createEmailJob } from '../utils/worker/create';

import { UpdateSection } from './sectionTypes';
import { tr } from './modules.i18n';
import { sendMail } from './nodemailer';
import { calendarMethods } from './calendarMethods';

export const notifyHR = async (id: number, data: UpdateSection) => {
Expand All @@ -28,7 +28,7 @@ export const notifyHR = async (id: number, data: UpdateSection) => {
});
if (section?.interview?.creator?.email) {
// TODO: add localization after https://github.com/taskany-inc/hire/issues/191
return sendMail({
return createEmailJob({
to: section?.interview?.creator?.email,
subject: `Interviewer ${section.interviewer.name || ''} left feedback for the section with ${
section.interview.candidate.name
Expand Down Expand Up @@ -90,7 +90,7 @@ export const cancelSectionEmail = async (sectionId: number) => {
cancelEmailSubject = restException.eventDetails.title;
}

return sendMail({
return createEmailJob({
to: section.interviewer.email,
subject: cancelEmailSubject,
text: `${tr('Canceled section with')} ${section.interview.candidate.name} ${date}
Expand Down Expand Up @@ -163,7 +163,7 @@ export const assignSectionEmail = async (
sequence,
});

await sendMail({
await createEmailJob({
to: interviewer.email,
subject: eventDetails.title,
text: '',
Expand All @@ -174,7 +174,7 @@ export const assignSectionEmail = async (
});
}

await sendMail({
await createEmailJob({
to: interviewer.email,
subject: `${tr('Interview with')} ${candidateName}`,
text: `${config.defaultPageURL}${generatePath(Paths.SECTION, {
Expand Down
46 changes: 46 additions & 0 deletions src/utils/worker/create.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import { prisma } from '../prisma';
import { MessageBody } from '../../modules/nodemailer';
import config from '../../config';

export const defaultJobDelay = config.worker.workerJobsDelay ? parseInt(config.worker.workerJobsDelay, 10) : 1000;

export enum jobState {
scheduled = 'scheduled',
pending = 'pending',
completed = 'completed',
}

export interface JobDataMap {
email: {
data: MessageBody;
};
}

export type JobKind = keyof JobDataMap;

interface CreateJobProps<K extends keyof JobDataMap> {
data: JobDataMap[K];
priority?: number;
delay?: number;
cron?: string;
}

export function createJob<K extends keyof JobDataMap>(
kind: K,
{ data, priority, delay = defaultJobDelay, cron }: CreateJobProps<K>,
) {
return prisma.job.create({
data: {
state: jobState.scheduled,
data,
kind,
priority,
delay,
cron,
},
});
}

export function createEmailJob(data: MessageBody) {
return createJob('email', { data: { data } });
}
48 changes: 48 additions & 0 deletions src/utils/worker/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import * as Sentry from '@sentry/nextjs';

import config from '../../config';

import { worker, Job } from './worker';
import { defaultJobDelay } from './create';
import * as resolve from './resolve';
import { getNextJob, jobDelete, jobUpdate } from './jobOperations';

const queueInterval = config.worker.queueInterval ? parseInt(config.worker.queueInterval, 10) : 3000;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could be parsed in config

const retryLimit = config.worker.retryLimit ? parseInt(config.worker.retryLimit, 10) : 3;

// eslint-disable-next-line no-console
const log = (...rest: unknown[]) => console.log('[WORKER]:', ...rest);

log('Worker started successfully');

const onRetryLimitExeed = (error: any, job: Job) =>
Sentry.captureException(error, {
fingerprint: ['worker', 'resolve', 'retry'],
extra: {
job,
},
});

const onQueeTooLong = () => Sentry.captureMessage('Queue too long. Smth went wrong.');

// eslint-disable-next-line no-console
const onError = (error: any) => console.log('onerror', error.message);

const init = () =>
worker(
getNextJob,
jobUpdate,
jobDelete,
resolve,
onRetryLimitExeed,
onQueeTooLong,
log,
onError,
defaultJobDelay,
retryLimit,
);

(() =>
setInterval(async () => {
await init();
}, queueInterval))();
Loading
Loading