diff --git a/packages/job-queue-plugin/src/bullmq/bullmq-job-queue-strategy.ts b/packages/job-queue-plugin/src/bullmq/bullmq-job-queue-strategy.ts index 05aafbdfc7..607b7d987b 100644 --- a/packages/job-queue-plugin/src/bullmq/bullmq-job-queue-strategy.ts +++ b/packages/job-queue-plugin/src/bullmq/bullmq-job-queue-strategy.ts @@ -10,13 +10,22 @@ import { Logger, PaginatedList, } from '@vendure/core'; -import Bull, { ConnectionOptions, JobType, Processor, Queue, Worker, WorkerOptions } from 'bullmq'; +import Bull, { + ConnectionOptions, + JobType, + Processor, + Queue, + Worker, + WorkerOptions, + Job as BullJob, +} from 'bullmq'; import { EventEmitter } from 'events'; import { Cluster, Redis, RedisOptions } from 'ioredis'; import { ALL_JOB_TYPES, BULLMQ_PLUGIN_OPTIONS, loggerCtx } from './constants'; import { RedisHealthIndicator } from './redis-health-indicator'; -import { BullMQPluginOptions } from './types'; +import { getJobsByType } from './scripts/get-jobs-by-type'; +import { BullMQPluginOptions, CustomScriptDefinition } from './types'; const QUEUE_NAME = 'vendure-job-queue'; const DEFAULT_CONCURRENCY = 3; @@ -53,6 +62,8 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy { ? this.connectionOptions : new Redis(this.connectionOptions); + this.defineCustomLuaScripts(); + const redisHealthIndicator = injector.get(RedisHealthIndicator); Logger.info('Checking Redis connection...', loggerCtx); const health = await redisHealthIndicator.isHealthy('redis'); @@ -137,8 +148,8 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy { } async findMany(options?: JobListOptions): Promise> { - const start = options?.skip ?? 0; - const end = start + (options?.take ?? 10); + const skip = options?.skip ?? 0; + const take = options?.take ?? 10; let jobTypes: JobType[] = ALL_JOB_TYPES; const stateFilter = options?.filter?.state; if (stateFilter?.eq) { @@ -170,26 +181,31 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy { ? ['completed', 'failed'] : ['wait', 'waiting-children', 'active', 'repeat', 'delayed', 'paused']; } + let items: Bull.Job[] = []; - let jobCounts: { [index: string]: number } = {}; - try { - items = await this.queue.getJobs(jobTypes, start, end); - } catch (e: any) { - Logger.error(e.message, loggerCtx, e.stack); - } + let totalItems = 0; + try { - jobCounts = await this.queue.getJobCounts(...jobTypes); + const [total, jobIds] = await this.callCustomScript(getJobsByType, [ + skip, + take, + options?.filter?.queueName?.eq ?? '', + ...jobTypes, + ]); + items = ( + await Promise.all( + jobIds.map(id => { + return BullJob.fromId(this.queue, id); + }), + ) + ).filter(notNullOrUndefined); + totalItems = total; } catch (e: any) { - Logger.error(e.message, loggerCtx, e.stack); + throw new InternalServerError(e.message); } - const totalItems = Object.values(jobCounts).reduce((sum, count) => sum + count, 0); return { - items: await Promise.all( - items - .sort((a, b) => b.timestamp - a.timestamp) - .map(bullJob => this.createVendureJob(bullJob)), - ), + items: await Promise.all(items.map(bullJob => this.createVendureJob(bullJob))), totalItems, }; } @@ -252,6 +268,7 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy { } private stopped = false; + async stop = object>( queueName: string, process: (job: Job) => Promise, @@ -308,4 +325,31 @@ export class BullMQJobQueueStrategy implements InspectableJobQueueStrategy { throw new InternalServerError('Could not determine job state'); // TODO: how to handle "cancelled" state? Currently when we cancel a job, we simply remove all record of it. } + + private callCustomScript( + scriptDef: CustomScriptDefinition, + args: Args, + ): Promise { + return new Promise((resolve, reject) => { + (this.redisConnection as any)[scriptDef.name]( + `bull:${this.queue.name}:`, + ...args, + (err: any, result: any) => { + if (err) { + reject(err); + } else { + resolve(result); + } + }, + ); + }); + } + + private defineCustomLuaScripts() { + const redis = this.redisConnection; + redis.defineCommand(getJobsByType.name, { + numberOfKeys: getJobsByType.numberOfKeys, + lua: getJobsByType.script, + }); + } } diff --git a/packages/job-queue-plugin/src/bullmq/scripts/get-jobs-by-type.ts b/packages/job-queue-plugin/src/bullmq/scripts/get-jobs-by-type.ts new file mode 100644 index 0000000000..862d53e48d --- /dev/null +++ b/packages/job-queue-plugin/src/bullmq/scripts/get-jobs-by-type.ts @@ -0,0 +1,117 @@ +// language=Lua +import { CustomScriptDefinition } from '../types'; + +const script = `--[[ + Get job ids per provided states and filter by name + Input: + KEYS[1] 'prefix' + ARGV[1] start + ARGV[2] end + ARGV[3] filterName + ARGV[4...] types +]] +local rcall = redis.call +local prefix = KEYS[1] +local rangeStart = tonumber(ARGV[1]) +local rangeEnd = tonumber(ARGV[2]) +local filterName = ARGV[3] +local results = {} + +local targetSets = {} + +-- Initialize an empty array to hold the sets to unionize. The "completed" and "failed" lists +-- are sorted sets +local setsToUnionize = {} +local typesInUnion = {} + +-- Initialize an empty array to hold lists to include. The "active" and "wait" lists are +-- regular lists +local listsToInclude = {} + + +-- Iterate through ARGV starting from the first element (ARGV[1]) up to the end +for i = 4, #ARGV do + local setKey = prefix .. ARGV[i] + + -- Check if the setKey is valid (e.g., it exists and is a sorted set) + local targetExists = redis.call('EXISTS', setKey) + local listType = redis.call('TYPE', setKey).ok + + if targetExists == 1 and listType == 'zset' then + -- Add the valid set to the array + table.insert(setsToUnionize, setKey) + table.insert(typesInUnion, ARGV[i]) + end + if targetExists == 1 and listType == 'list' then + -- Add the valid set to the array + table.insert(listsToInclude, setKey) + table.insert(typesInUnion, ARGV[i]) + end +end + +-- Define the destination key for the concatenated sorted set +local tempSortedSetUnionKey = prefix .. 'union:' .. table.concat(typesInUnion, ':'); + +if #listsToInclude == 0 and #setsToUnionize == 0 then + return {0, {}} +end + +-- Check if there are valid sets to unionize +if #setsToUnionize > 0 then + -- Use ZUNIONSTORE to concatenate the valid sorted sets into the destination key + local numSets = #setsToUnionize + redis.call('ZUNIONSTORE', tempSortedSetUnionKey, numSets, unpack(setsToUnionize)) +end + +local originalResults = rcall("ZREVRANGE", tempSortedSetUnionKey, 0, -1) + + +if #listsToInclude > 0 then + for _, listKey in ipairs(listsToInclude) do + local list = rcall("LRANGE", listKey, 0, -1) + for _, jobId in ipairs(list) do + table.insert(originalResults, jobId) + end + end +end + + +-- Define a custom comparison function for sorting in descending order +local function compareDescending(a, b) + return tonumber(a) > tonumber(b) +end + +-- Sort the table in descending order +table.sort(originalResults, compareDescending) + +local filteredResults = {} +local totalResults = 0 + +for _, job in ipairs(originalResults) do + local jobName = rcall("HGET", prefix .. job, "name"); + if filterName ~= "" and jobName == filterName then + if rangeStart <= totalResults and #filteredResults < rangeEnd then + table.insert(filteredResults, job) + end + totalResults = totalResults + 1 + elseif filterName == "" then + if rangeStart <= totalResults and #filteredResults < rangeEnd then + table.insert(filteredResults, job) + end + totalResults = totalResults + 1 + end +end + +rcall("DEL", tempSortedSetUnionKey) + +return {totalResults, filteredResults} +`; + +export const getJobsByType: CustomScriptDefinition< + [totalItems: number, jobIds: string[]], + [rangeStart: number, rangeEnd: number, queueName: string | undefined, ...states: string[]] +> = { + script, + numberOfKeys: 1, + name: 'getJobsByType', +}; diff --git a/packages/job-queue-plugin/src/bullmq/types.ts b/packages/job-queue-plugin/src/bullmq/types.ts index e3b7bff8c7..2109e44e25 100644 --- a/packages/job-queue-plugin/src/bullmq/types.ts +++ b/packages/job-queue-plugin/src/bullmq/types.ts @@ -92,3 +92,13 @@ export interface BackoffOptions { type: 'exponential' | 'fixed'; delay: number; } + +/** + * @description + * A definition for a Lua script used to define custom behavior in Redis + */ +export interface CustomScriptDefinition { + name: string; + script: string; + numberOfKeys: number; +}