Skip to content

Commit

Permalink
feat: parallel worker support
Browse files Browse the repository at this point in the history
  • Loading branch information
asabotovich committed Jan 25, 2024
1 parent aad15dd commit d49769c
Showing 1 changed file with 140 additions and 77 deletions.
217 changes: 140 additions & 77 deletions src/utils/worker/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { PrismaClient } from '@prisma/client';
import { PrismaClient, Job, Prisma } from '@prisma/client';
import * as Sentry from '@sentry/nextjs';
import parser from 'cron-parser';

Expand All @@ -10,95 +10,158 @@ const queueInterval = process.env.WORKER_JOBS_INTERVAL ? parseInt(process.env.WO
const retryLimit = process.env.WORKER_JOBS_RETRY ? parseInt(process.env.WORKER_JOBS_RETRY, 10) : 3;

// eslint-disable-next-line no-console
console.log('Worker started successfully');
const log = (...rest: unknown[]) => console.log('[WORKER]:', ...rest);

(() =>
setInterval(async () => {
try {
const jobs = await prisma.job.findMany({
orderBy: {
priority: 'desc',
},
});

if (jobs.length > 300) {
Sentry.captureMessage('Queue too long. Smth went wrong.');
log('Worker started successfully');

const getNextJob = async (state: jobState, exclude: string[]) => {
// get first job with state
// update Status to pending
// lock before updating

const [job] = (await prisma.$queryRaw(Prisma.sql`
WITH cte AS (
SELECT *
FROM "Job"
WHERE "state" = ${state} ${
exclude.length ? Prisma.sql`AND "id" NOT IN (${Prisma.join(exclude)})` : Prisma.empty
}
ORDER BY "priority" DESC
LIMIT 1
FOR UPDATE
SKIP LOCKED
)
UPDATE "Job" job
SET "state" = ${jobState.pending}
FROM cte
WHERE job.id = cte.id
RETURNING *
`)) as Job[];

return job || null;
};

const iterateJobQueue = async (state: jobState, cb: (job: Job) => Promise<void>): Promise<number> => {
const watchedIds: string[] = [];

while (true) {
// eslint-disable-next-line no-await-in-loop
const job = await getNextJob(state, watchedIds);

if (!job) {
break;
}

watchedIds.push(job.id);
// eslint-disable-next-line no-await-in-loop
await cb(job);
}

return watchedIds.length;
};

const worker = async () => {
try {
const completedCount = await iterateJobQueue(jobState.completed, async (job) => {
setTimeout(async () => {
if (job.cron) {
log(`plan cron ${job.id}`);
await prisma.job.update({
where: { id: job.id },
data: {
state: jobState.scheduled,
},
});
} else {
log(`delete job ${job.id}`);
await prisma.job.delete({ where: { id: job.id } });
}
}, 0);
});

const scheduledCount = await iterateJobQueue(jobState.scheduled, async (job) => {
const planJob = () =>
prisma.job.update({
where: { id: job.id },
data: {
state: jobState.scheduled,
},
});

if (job.cron) {
const interval = parser.parseExpression(job.cron, {
currentDate: new Date(job.updatedAt),
});

if (Number(interval.next().toDate()) > Date.now() && !job.force) {
await planJob();

return;
}
}

jobs.forEach(async (job) => {
if (job.state === jobState.completed) {
setTimeout(async () => {
if (job.cron) {
if (job.delay && Date.now() - new Date(job.createdAt).valueOf() < job.delay) {
await planJob();

return;
}

setTimeout(async () => {
try {
log(`resolve job ${job.kind} ${job.id}`);

await resolve[job.kind as jobKind](job.data as any);
await prisma.job.update({
where: { id: job.id },
data: { state: jobState.completed, runs: { increment: 1 }, force: false },
});
// eslint-disable-next-line @typescript-eslint/no-explicit-any
} catch (error: any) {
if (job.retry !== retryLimit) {
const retry = job.retry ? job.retry + 1 : 1;

log(`error job ${job.id}`, error);
log(`retry job ${job.id}`);

setTimeout(async () => {
await prisma.job.update({
where: { id: job.id },
data: {
state: jobState.scheduled,
error: error?.message,
retry,
delay: defaultJobDelay * retry,
},
});
} else {
await prisma.job.delete({ where: { id: job.id } });
}
}, 0);
}

if (job.state === jobState.scheduled) {
if (job.cron) {
const interval = parser.parseExpression(job.cron, {
currentDate: new Date(job.updatedAt),
}, 0);
} else {
Sentry.captureException(error, {
fingerprint: ['worker', 'resolve', 'retry'],
extra: {
job,
},
});

if (Number(interval.next().toDate()) > Date.now() && !job.force) {
return;
}
}
log(`delete job ${job.id} after ${retryLimit} retries`);

if (job.delay && Date.now() - new Date(job.createdAt).valueOf() < job.delay) {
return;
await prisma.job.delete({ where: { id: job.id } });
}

setTimeout(async () => {
await prisma.job.update({ where: { id: job.id }, data: { state: jobState.pending } });
}, 0);

setTimeout(async () => {
try {
await resolve[job.kind as jobKind](job.data as any);
await prisma.job.update({
where: { id: job.id },
data: { state: jobState.completed, runs: { increment: 1 }, force: false },
});
// eslint-disable-next-line @typescript-eslint/no-explicit-any
} catch (error: any) {
if (job.retry !== retryLimit) {
const retry = job.retry ? job.retry + 1 : 1;
setTimeout(async () => {
await prisma.job.update({
where: { id: job.id },
data: {
state: jobState.scheduled,
error: error?.message,
retry,
delay: defaultJobDelay * retry,
},
});
}, 0);
} else {
Sentry.captureException(error, {
fingerprint: ['worker', 'resolve', 'retry'],
extra: {
job,
},
});

await prisma.job.delete({ where: { id: job.id } });
}
}
}, 0);
}
});
// eslint-disable-next-line @typescript-eslint/no-explicit-any
} catch (error: any) {
// eslint-disable-next-line no-console
console.error(error.message);
}, 0);
});

if (completedCount + scheduledCount > 300) {
Sentry.captureMessage('Queue too long. Smth went wrong.');
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
} catch (error: any) {
// eslint-disable-next-line no-console
console.error(error.message);
}
};

(() =>
setInterval(async () => {
await worker();
}, queueInterval))();

0 comments on commit d49769c

Please sign in to comment.