diff --git a/docs/gitbook/SUMMARY.md b/docs/gitbook/SUMMARY.md index 32944bac16..82a163692c 100644 --- a/docs/gitbook/SUMMARY.md +++ b/docs/gitbook/SUMMARY.md @@ -70,7 +70,8 @@ - [Producers](guide/nestjs/producers.md) - [Queue Events Listeners](guide/nestjs/queue-events-listeners.md) - [Going to production](guide/going-to-production.md) -- [Migration to newer versions](guide/migration-to-newer-versions.md) +- [Migration to newer versions](guide/migrations/migration-to-newer-versions.md) + - [Version 6](guide/migrations/v6.md) - [Troubleshooting](guide/troubleshooting.md) ## Patterns diff --git a/docs/gitbook/guide/migration-to-newer-versions.md b/docs/gitbook/guide/migrations/migration-to-newer-versions.md similarity index 99% rename from docs/gitbook/guide/migration-to-newer-versions.md rename to docs/gitbook/guide/migrations/migration-to-newer-versions.md index 0b8ca2a175..ad2c27916b 100644 --- a/docs/gitbook/guide/migration-to-newer-versions.md +++ b/docs/gitbook/guide/migrations/migration-to-newer-versions.md @@ -55,4 +55,3 @@ Since BullMQ supports global pause, one possible strategy, if suitable for your ### Use new queues altogether This drastic solution involves discontinuing use of older queues and creating new ones. You could rename older queues (e.g., "myQueueV2"), use a new Redis host, or maintain two versions of the service—one running an older BullMQ version with old queues, and a newer one with the latest BullMQ and a different set of queues. When the older version has no more jobs to process, it can be retired, leaving only the upgraded version. - diff --git a/docs/gitbook/guide/migrations/v6.md b/docs/gitbook/guide/migrations/v6.md new file mode 100644 index 0000000000..b73e60bf46 --- /dev/null +++ b/docs/gitbook/guide/migrations/v6.md @@ -0,0 +1,25 @@ +--- +description: Tips and hints on how to migrate to v6. +--- + +# Migration to v6 + +Make sure to call **runMigrations** method from Queue class in order to execute all necessary changes when coming from an older version. + +## Migration of deprecated paused key + +If you have paused queues after upgrading to this version. These jobs will be moved to wait state when initializing any of our instances (Worker, Queue, QueueEvents or FlowProducer). + +Paused key is not longer needed as this state is already represented inside meta key. It also improves the process of pausing or resuming a queue as we don't need to rename any key. + +## Remove legacy markers + +When migrating from versions before v5. +It's recommended to do this process: + +1. Pause your queues. +2. Upgrade to v6. +3. Instantiate a Queue instance and execute runMigrations method where migrations will be executed. +4. Resume your queues. + +This way you will prevent that your workers pick a legacy marker that is no longer used because new markers are added in a different structure. diff --git a/python/bullmq/queue.py b/python/bullmq/queue.py index 0bcffc1fae..91e18a37a9 100644 --- a/python/bullmq/queue.py +++ b/python/bullmq/queue.py @@ -141,7 +141,7 @@ async def getJobLogs(self, job_id:str, start = 0, end = -1, asc = True): "logs": result[0], "count": result[1] } - + async def obliterate(self, force: bool = False): """ Completely destroys the queue and all of its contents irreversibly. diff --git a/python/bullmq/scripts.py b/python/bullmq/scripts.py index a6475fd4e5..34cd018014 100644 --- a/python/bullmq/scripts.py +++ b/python/bullmq/scripts.py @@ -38,11 +38,11 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection self.redisConnection = redisConnection self.redisClient = redisConnection.conn self.commands = { - "addStandardJob": self.redisClient.register_script(self.getScript("addStandardJob-8.lua")), + "addStandardJob": self.redisClient.register_script(self.getScript("addStandardJob-7.lua")), "addDelayedJob": self.redisClient.register_script(self.getScript("addDelayedJob-6.lua")), "addParentJob": self.redisClient.register_script(self.getScript("addParentJob-4.lua")), "addPrioritizedJob": self.redisClient.register_script(self.getScript("addPrioritizedJob-8.lua")), - "changePriority": self.redisClient.register_script(self.getScript("changePriority-7.lua")), + "changePriority": self.redisClient.register_script(self.getScript("changePriority-6.lua")), "cleanJobsInSet": self.redisClient.register_script(self.getScript("cleanJobsInSet-3.lua")), "extendLock": self.redisClient.register_script(self.getScript("extendLock-2.lua")), "getCounts": self.redisClient.register_script(self.getScript("getCounts-1.lua")), @@ -51,18 +51,19 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection "getState": self.redisClient.register_script(self.getScript("getState-8.lua")), "getStateV2": self.redisClient.register_script(self.getScript("getStateV2-8.lua")), "isJobInList": self.redisClient.register_script(self.getScript("isJobInList-1.lua")), - "moveStalledJobsToWait": self.redisClient.register_script(self.getScript("moveStalledJobsToWait-9.lua")), - "moveToActive": self.redisClient.register_script(self.getScript("moveToActive-11.lua")), + "moveStalledJobsToWait": self.redisClient.register_script(self.getScript("moveStalledJobsToWait-8.lua")), + "moveToActive": self.redisClient.register_script(self.getScript("moveToActive-10.lua")), "moveToDelayed": self.redisClient.register_script(self.getScript("moveToDelayed-8.lua")), - "moveToFinished": self.redisClient.register_script(self.getScript("moveToFinished-14.lua")), + "moveToFinished": self.redisClient.register_script(self.getScript("moveToFinished-13.lua")), "moveToWaitingChildren": self.redisClient.register_script(self.getScript("moveToWaitingChildren-5.lua")), "obliterate": self.redisClient.register_script(self.getScript("obliterate-2.lua")), "pause": self.redisClient.register_script(self.getScript("pause-7.lua")), - "promote": self.redisClient.register_script(self.getScript("promote-9.lua")), + "promote": self.redisClient.register_script(self.getScript("promote-8.lua")), "removeJob": self.redisClient.register_script(self.getScript("removeJob-2.lua")), - "reprocessJob": self.redisClient.register_script(self.getScript("reprocessJob-8.lua")), - "retryJob": self.redisClient.register_script(self.getScript("retryJob-11.lua")), - "moveJobsToWait": self.redisClient.register_script(self.getScript("moveJobsToWait-8.lua")), + "migrateDeprecatedPausedKey": self.redisClient.register_script(self.getScript("migrateDeprecatedPausedKey-2.lua")), + "reprocessJob": self.redisClient.register_script(self.getScript("reprocessJob-7.lua")), + "retryJob": self.redisClient.register_script(self.getScript("retryJob-10.lua")), + "moveJobsToWait": self.redisClient.register_script(self.getScript("moveJobsToWait-7.lua")), "saveStacktrace": self.redisClient.register_script(self.getScript("saveStacktrace-1.lua")), "updateData": self.redisClient.register_script(self.getScript("updateData-1.lua")), "updateProgress": self.redisClient.register_script(self.getScript("updateProgress-3.lua")), @@ -131,7 +132,7 @@ def addStandardJob(self, job: Job, timestamp: int, pipe = None): """ Add a standard job to the queue """ - keys = self.getKeys(['wait', 'paused', 'meta', 'id', + keys = self.getKeys(['wait', 'meta', 'id', 'completed', 'active', 'events', 'marker']) args = self.addJobArgs(job, None) args.append(timestamp) @@ -259,15 +260,15 @@ def saveStacktraceArgs(self, job_id: str, stacktrace: str, failedReason: str): return (keys, args) def retryJobArgs(self, job_id: str, lifo: bool, token: str, opts: dict = {}): - keys = self.getKeys(['active', 'wait', 'paused']) + keys = self.getKeys(['active', 'wait']) keys.append(self.toKey(job_id)) keys.append(self.keys['meta']) keys.append(self.keys['events']) keys.append(self.keys['delayed']) keys.append(self.keys['prioritized']) keys.append(self.keys['pc']) - keys.append(self.keys['marker']) keys.append(self.keys['stalled']) + keys.append(self.keys['marker']) push_cmd = "RPUSH" if lifo else "LPUSH" @@ -302,7 +303,6 @@ def promoteArgs(self, job_id: str): keys = self.getKeys(['delayed', 'wait', 'paused', 'meta', 'prioritized', 'active', 'pc', 'events', 'marker']) keys.append(self.toKey(job_id)) keys.append(self.keys['events']) - keys.append(self.keys['paused']) keys.append(self.keys['meta']) args = [self.keys[''], job_id] @@ -374,7 +374,6 @@ async def isJobInList(self, list_key: str, job_id: str): async def changePriority(self, job_id: str, priority:int = 0, lifo:bool = False): keys = [self.keys['wait'], - self.keys['paused'], self.keys['meta'], self.keys['prioritized'], self.keys['active'], @@ -408,7 +407,6 @@ async def reprocessJob(self, job: Job, state: str): keys.append(self.keys[state]) keys.append(self.keys['wait']) keys.append(self.keys['meta']) - keys.append(self.keys['paused']) keys.append(self.keys['active']) keys.append(self.keys['marker']) @@ -450,7 +448,7 @@ async def obliterate(self, count: int, force: bool = False): def moveJobsToWaitArgs(self, state: str, count: int, timestamp: int) -> int: keys = self.getKeys( - ['', 'events', state, 'wait', 'paused', 'meta', 'active', 'marker']) + ['', 'events', state, 'wait', 'meta', 'active', 'marker']) args = [count or 1000, timestamp or round(time.time()*1000), state] return (keys, args) @@ -483,7 +481,7 @@ async def moveToActive(self, token: str, opts: dict) -> list[Any]: limiter = opts.get("limiter", None) keys = self.getKeys(['wait', 'active', 'prioritized', 'events', - 'stalled', 'limiter', 'delayed', 'paused', 'meta', 'pc', 'marker']) + 'stalled', 'limiter', 'delayed', 'meta', 'pc', 'marker']) packedOpts = msgpack.packb( {"token": token, "lockDuration": lockDuration, "limiter": limiter}, use_bin_type=True) args = [self.keys[''], timestamp, packedOpts] @@ -516,7 +514,7 @@ def moveToFinishedArgs(self, job: Job, val: Any, propVal: str, shouldRemove, tar metricsKey = self.toKey('metrics:' + target) keys = self.getKeys(['wait', 'active', 'prioritized', 'events', - 'stalled', 'limiter', 'delayed', 'paused', 'meta', 'pc', target]) + 'stalled', 'limiter', 'delayed', 'meta', 'pc', target]) keys.append(self.toKey(job.id)) keys.append(metricsKey) keys.append(self.keys['marker']) @@ -580,7 +578,7 @@ def extendLock(self, jobId: str, token: str, duration: int, client: Redis = None def moveStalledJobsToWait(self, maxStalledCount: int, stalledInterval: int): keys = self.getKeys(['stalled', 'wait', 'active', 'failed', - 'stalled-check', 'meta', 'paused', 'marker', 'events']) + 'stalled-check', 'meta', 'marker', 'events']) args = [maxStalledCount, self.keys[''], round( time.time() * 1000), stalledInterval] return self.commands["moveStalledJobsToWait"](keys, args) diff --git a/src/classes/job.ts b/src/classes/job.ts index 3975628feb..1a16bc3c13 100644 --- a/src/classes/job.ts +++ b/src/classes/job.ts @@ -41,10 +41,10 @@ const logger = debuglog('bull'); const optsDecodeMap = { de: 'deduplication', - ocf: 'onChildFailure', fpof: 'failParentOnFailure', idof: 'ignoreDependencyOnFailure', kl: 'keepLogs', + ocf: 'onChildFailure', rdof: 'removeDependencyOnFailure', tm: 'telemetryMetadata' }; diff --git a/src/classes/migrations.ts b/src/classes/migrations.ts new file mode 100644 index 0000000000..7a25315b4f --- /dev/null +++ b/src/classes/migrations.ts @@ -0,0 +1,133 @@ +import { RedisClient } from '../interfaces'; +import { isVersionLowerThan } from '../utils'; + +export interface MigrationOptions { + prefix: string; + queueName: string; + packageVersion?: string; +} + +export type MigrationFunction = ( + client: RedisClient, + opts: MigrationOptions, +) => Promise; + +export const checkPendingMigrations = async ( + client: RedisClient, + opts: MigrationOptions, +) => { + const metaKey = getRedisKeyFromOpts(opts, 'meta'); + const currentVersion = await client.hget(metaKey, 'version'); + + // If version is not set yet, it means it's an enterily new user + if (!currentVersion) { + return false; + } + + if (isVersionLowerThan(currentVersion, '6.0.0')) { + const migrationsKey = getRedisKeyFromOpts(opts, 'migrations'); + const existingMigrations = await client.zrange(migrationsKey, 0, -1); + return migrations.some( + migration => + !existingMigrations.includes(`${migration.version}-${migration.name}`), + ); + } + + return false; +}; + +const getCommandName = (commandName: string, packageVersion: string) => + `${commandName}:${packageVersion}`; + +export const migrations: { + name: string; + version: string; + migrate: MigrationFunction; +}[] = [ + { + name: 'remove-legacy-markers', + version: '6.0.0', + migrate: async (client: RedisClient, opts: MigrationOptions) => { + const keys: (string | number)[] = [ + getRedisKeyFromOpts(opts, 'wait'), + getRedisKeyFromOpts(opts, 'paused'), + getRedisKeyFromOpts(opts, 'meta'), + getRedisKeyFromOpts(opts, 'completed'), + getRedisKeyFromOpts(opts, 'failed'), + ]; + const args = [getRedisKeyFromOpts(opts, '')]; + + await (client)[ + getCommandName('removeLegacyMarkers', opts.packageVersion) + ](keys.concat(args)); + }, + }, + { + name: 'migrate-paused-jobs', + version: '6.0.0', + migrate: async (client: RedisClient, opts: MigrationOptions) => { + const keys: (string | number)[] = [ + getRedisKeyFromOpts(opts, 'paused'), + getRedisKeyFromOpts(opts, 'wait'), + ]; + await (client)[ + getCommandName('migrateDeprecatedPausedKey', opts.packageVersion) + ](keys); + }, + }, +]; + +/** + * Run Migrations. + * + * This method is used to run possibly existing migrations for the queue. + * + * Normally, if there are pending migrations, the Queue, Worker and QueueEvents instances + * will throw an error when they are instantiated. Use then this method to run the migrations + * before instantiating the instances. + * + * @param redisClient The Redis client instance + * @param opts The options for the migration + * + * @sa https://docs.bullmq.io/guide/migrations + */ +export const runMigrations = async ( + redisClient: RedisClient, + opts: { + prefix?: string; + queueName: string; + packageVersion: string; + }, +) => { + const prefix = opts.prefix || 'bull'; + const migrationsKey = getRedisKeyFromOpts({ prefix, ...opts }, 'migrations'); + + // The migrations key is a ZSET with the migration timestamp as the score + for (const migration of migrations) { + const migrationId = `${migration.version}-${migration.name}`; + const pendingMigration = !!(await redisClient.zscore( + migrationsKey, + migrationId, + )); + if (pendingMigration) { + continue; + } + console.log(`[BULLMQ] Running migration ${migrationId}`); + try { + await migration.migrate(redisClient, { + prefix, + queueName: opts.queueName, + packageVersion: opts.packageVersion, + }); + await redisClient.zadd(migrationsKey, Date.now(), migrationId); + } catch (err) { + console.error(`[BULLMQ] Migration ${migrationId} failed: ${err}`); + break; + } + console.log(`[BULLMQ] Migration ${migrationId} completed`); + } +}; + +function getRedisKeyFromOpts(opts: MigrationOptions, key: string): string { + return `${opts.prefix}:${opts.queueName}:${key}`; +} diff --git a/src/classes/queue-base.ts b/src/classes/queue-base.ts index 2760a7b108..fc9e738f2c 100644 --- a/src/classes/queue-base.ts +++ b/src/classes/queue-base.ts @@ -12,7 +12,9 @@ import { RedisConnection } from './redis-connection'; import { Job } from './job'; import { KeysMap, QueueKeys } from './queue-keys'; import { Scripts } from './scripts'; -import { TelemetryAttributes, SpanKind } from '../enums'; +import { checkPendingMigrations, runMigrations } from './migrations'; +import { SpanKind } from '../enums'; +import { version as packageVersion } from '../version'; /** * @class QueueBase @@ -23,6 +25,8 @@ import { TelemetryAttributes, SpanKind } from '../enums'; * */ export class QueueBase extends EventEmitter implements MinimalQueue { + public readonly qualifiedName: string; + toKey: (type: string) => string; keys: KeysMap; closing: Promise | undefined; @@ -31,7 +35,9 @@ export class QueueBase extends EventEmitter implements MinimalQueue { protected hasBlockingConnection: boolean = false; protected scripts: Scripts; protected connection: RedisConnection; - public readonly qualifiedName: string; + + protected checkedPendingMigrations: boolean = false; + protected packageVersion = packageVersion; /** * @@ -85,9 +91,37 @@ export class QueueBase extends EventEmitter implements MinimalQueue { /** * Returns a promise that resolves to a redis client. Normally used only by subclasses. + * This method will also check if there are pending migrations, if so it will throw an error. */ get client(): Promise { - return this.connection.client; + if (this.checkedPendingMigrations) { + return this.connection.client; + } else { + return this.connection.client.then(client => { + return checkPendingMigrations(client, { + prefix: this.opts.prefix, + queueName: this.name, + }).then(hasPendingMigrations => { + if (hasPendingMigrations) { + throw new Error( + 'Queue has pending migrations. See https://docs.bullmq.io/guide/migrations', + ); + } + this.checkedPendingMigrations = true; + return client; + }); + }); + } + } + + async runMigrations() { + const client = await this.client; + await runMigrations(client, { + prefix: this.opts.prefix, + queueName: this.name, + packageVersion: this.packageVersion, + }); + this.checkedPendingMigrations = true; } protected setScripts() { diff --git a/src/classes/queue-keys.ts b/src/classes/queue-keys.ts index aaea64a130..44e7ae7df3 100644 --- a/src/classes/queue-keys.ts +++ b/src/classes/queue-keys.ts @@ -21,6 +21,7 @@ export class QueueKeys { 'repeat', 'limiter', 'meta', + 'migrations', 'events', 'pc', // priority counter key 'marker', // marker key diff --git a/src/classes/queue.ts b/src/classes/queue.ts index c681ae940c..723860c0da 100644 --- a/src/classes/queue.ts +++ b/src/classes/queue.ts @@ -14,7 +14,6 @@ import { Repeat } from './repeat'; import { RedisConnection } from './redis-connection'; import { SpanKind, TelemetryAttributes } from '../enums'; import { JobScheduler } from './job-scheduler'; -import { version } from '../version'; export interface ObliterateOpts { /** @@ -225,7 +224,7 @@ export class Queue< get metaValues(): Record { return { 'opts.maxLenEvents': this.opts?.streams?.events?.maxLen ?? 10000, - version: `${this.libName}:${version}`, + version: `${this.libName}:${this.packageVersion}`, }; } diff --git a/src/classes/redis-connection.ts b/src/classes/redis-connection.ts index 9621f4d8be..354d95cb95 100644 --- a/src/classes/redis-connection.ts +++ b/src/classes/redis-connection.ts @@ -10,7 +10,7 @@ import { isNotConnectionError, isRedisCluster, isRedisInstance, - isRedisVersionLowerThan, + isVersionLowerThan, } from '../utils'; import { version as packageVersion } from '../version'; import * as scripts from '../scripts'; @@ -235,9 +235,7 @@ export class RedisConnection extends EventEmitter { if (this._client['status'] !== 'end') { this.version = await this.getRedisVersion(); if (this.skipVersionCheck !== true && !this.closing) { - if ( - isRedisVersionLowerThan(this.version, RedisConnection.minimumVersion) - ) { + if (isVersionLowerThan(this.version, RedisConnection.minimumVersion)) { throw new Error( `Redis version needs to be greater or equal than ${RedisConnection.minimumVersion} ` + `Current: ${this.version}`, @@ -245,7 +243,7 @@ export class RedisConnection extends EventEmitter { } if ( - isRedisVersionLowerThan( + isVersionLowerThan( this.version, RedisConnection.recommendedMinimumVersion, ) @@ -258,8 +256,8 @@ export class RedisConnection extends EventEmitter { } this.capabilities = { - canDoubleTimeout: !isRedisVersionLowerThan(this.version, '6.0.0'), - canBlockFor1Ms: !isRedisVersionLowerThan(this.version, '7.0.8'), + canDoubleTimeout: !isVersionLowerThan(this.version, '6.0.0'), + canBlockFor1Ms: !isVersionLowerThan(this.version, '7.0.8'), }; this.status = 'ready'; diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index 9b5e37ce7e..9d8d9de1d5 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -34,7 +34,7 @@ import { RedisJobOptions, } from '../types'; import { ErrorCode } from '../enums'; -import { array2obj, getParentKey, isRedisVersionLowerThan } from '../utils'; +import { array2obj, getParentKey, isVersionLowerThan } from '../utils'; import { ChainableCommander } from 'ioredis'; import { version as packageVersion } from '../version'; export type JobData = [JobJsonRaw | number, string?]; @@ -62,7 +62,6 @@ export class Scripts { queueKeys.stalled, queueKeys.limiter, queueKeys.delayed, - queueKeys.paused, queueKeys.meta, queueKeys.pc, undefined, @@ -84,7 +83,7 @@ export class Scripts { async isJobInList(listKey: string, jobId: string): Promise { const client = await this.queue.client; let result; - if (isRedisVersionLowerThan(this.queue.redisVersion, '6.0.6')) { + if (isVersionLowerThan(this.queue.redisVersion, '6.0.6')) { result = await this.execCommand(client, 'isJobInList', [listKey, jobId]); } else { result = await client.lpos(listKey, jobId); @@ -161,7 +160,6 @@ export class Scripts { const queueKeys = this.queue.keys; const keys: (string | Buffer)[] = [ queueKeys.wait, - queueKeys.paused, queueKeys.meta, queueKeys.id, queueKeys.completed, @@ -553,10 +551,10 @@ export class Scripts { const metricsKey = this.queue.toKey(`metrics:${target}`); const keys = this.moveToFinishedKeys; - keys[10] = queueKeys[target]; - keys[11] = this.queue.toKey(job.id ?? ''); - keys[12] = metricsKey; - keys[13] = this.queue.keys.marker; + keys[9] = queueKeys[target]; + keys[10] = this.queue.toKey(job.id ?? ''); + keys[11] = metricsKey; + keys[12] = this.queue.keys.marker; const keepJobs = this.getKeepJobs(shouldRemove, workerKeepJobs); @@ -642,29 +640,6 @@ export class Scripts { return this.execCommand(client, 'drain', args); } - private removeLegacyMarkersArgs(): (string | number)[] { - const queueKeys = this.queue.keys; - - const keys: string[] = [ - queueKeys.wait, - queueKeys.paused, - queueKeys.meta, - queueKeys.completed, - queueKeys.failed, - ]; - - const args = [queueKeys['']]; - - return keys.concat(args); - } - - async removeLegacyMarkers(): Promise { - const client = await this.queue.client; - const args = this.removeLegacyMarkersArgs(); - - return (client).removeLegacyMarkers(args); - } - private removeChildDependencyArgs( jobId: string, parentKey: string, @@ -851,7 +826,7 @@ export class Scripts { return this.queue.toKey(key); }); - if (isRedisVersionLowerThan(this.queue.redisVersion, '6.0.6')) { + if (isVersionLowerThan(this.queue.redisVersion, '6.0.6')) { return this.execCommand(client, 'getState', keys.concat([jobId])); } return this.execCommand(client, 'getStateV2', keys.concat([jobId])); @@ -916,7 +891,6 @@ export class Scripts { ): (string | number)[] { const keys: (string | number)[] = [ this.queue.keys.wait, - this.queue.keys.paused, this.queue.keys.meta, this.queue.keys.prioritized, this.queue.keys.active, @@ -1113,15 +1087,14 @@ export class Scripts { const keys: (string | number)[] = [ this.queue.keys.active, this.queue.keys.wait, - this.queue.keys.paused, this.queue.toKey(jobId), this.queue.keys.meta, this.queue.keys.events, this.queue.keys.delayed, this.queue.keys.prioritized, this.queue.keys.pc, - this.queue.keys.marker, this.queue.keys.stalled, + this.queue.keys.marker, ]; const pushCmd = (lifo ? 'R' : 'L') + 'PUSH'; @@ -1145,7 +1118,6 @@ export class Scripts { this.queue.keys.events, this.queue.toKey(state), this.queue.toKey('wait'), - this.queue.toKey('paused'), this.queue.keys.meta, this.queue.keys.active, this.queue.keys.marker, @@ -1201,7 +1173,6 @@ export class Scripts { this.queue.toKey(state), this.queue.keys.wait, this.queue.keys.meta, - this.queue.keys.paused, this.queue.keys.active, this.queue.keys.marker, ]; @@ -1244,7 +1215,6 @@ export class Scripts { queueKeys.stalled, queueKeys.limiter, queueKeys.delayed, - queueKeys.paused, queueKeys.meta, queueKeys.pc, queueKeys.marker, @@ -1276,7 +1246,6 @@ export class Scripts { const keys = [ this.queue.keys.delayed, this.queue.keys.wait, - this.queue.keys.paused, this.queue.keys.meta, this.queue.keys.prioritized, this.queue.keys.active, @@ -1307,7 +1276,6 @@ export class Scripts { this.queue.keys.failed, this.queue.keys['stalled-check'], this.queue.keys.meta, - this.queue.keys.paused, this.queue.keys.marker, this.queue.keys.events, ]; @@ -1356,7 +1324,6 @@ export class Scripts { this.queue.keys.wait, this.queue.keys.stalled, lockKey, - this.queue.keys.paused, this.queue.keys.meta, this.queue.keys.limiter, this.queue.keys.prioritized, @@ -1485,6 +1452,27 @@ export class Scripts { } } + protected executeMigrationsArgs( + currentMigrationExecution = 1, + ): (string | number)[] { + const keys: (string | number)[] = [ + this.queue.keys.meta, + this.queue.keys.migrations, + this.queue.toKey(''), + ]; + const args = [6, Date.now(), currentMigrationExecution]; + + return keys.concat(args); + } + + async executeMigrations(currentMigrationExecution: number): Promise { + const client = await this.queue.client; + + const args = this.executeMigrationsArgs(currentMigrationExecution); + + return (client).executeMigrations(args); + } + finishedErrors({ code, jobId, diff --git a/src/commands/addStandardJob-8.lua b/src/commands/addStandardJob-7.lua similarity index 84% rename from src/commands/addStandardJob-8.lua rename to src/commands/addStandardJob-7.lua index 7005e91af7..34c6cbf8ad 100644 --- a/src/commands/addStandardJob-8.lua +++ b/src/commands/addStandardJob-7.lua @@ -16,13 +16,12 @@ Input: KEYS[1] 'wait', - KEYS[2] 'paused' - KEYS[3] 'meta' - KEYS[4] 'id' - KEYS[5] 'completed' - KEYS[6] 'active' - KEYS[7] events stream key - KEYS[8] marker key + KEYS[2] 'meta' + KEYS[3] 'id' + KEYS[4] 'completed' + KEYS[5] 'active' + KEYS[6] events stream key + KEYS[7] marker key ARGV[1] msgpacked arguments array [1] key prefix, @@ -43,7 +42,7 @@ jobId - OK -5 - Missing parent key ]] -local eventsKey = KEYS[7] +local eventsKey = KEYS[6] local jobId local jobIdKey @@ -64,8 +63,8 @@ local parentData --- @include "includes/addJobInTargetList" --- @include "includes/deduplicateJob" --- @include "includes/getOrSetMaxEvents" ---- @include "includes/getTargetQueueList" --- @include "includes/handleDuplicatedJob" +--- @include "includes/isQueuePausedOrMaxed" --- @include "includes/storeJob" if parentKey ~= nil then @@ -74,9 +73,10 @@ if parentKey ~= nil then parentData = cjson.encode(parent) end -local jobCounter = rcall("INCR", KEYS[4]) +local jobCounter = rcall("INCR", KEYS[3]) -local metaKey = KEYS[3] +local waitKey = KEYS[1] +local metaKey = KEYS[2] local maxEvents = getOrSetMaxEvents(metaKey) local parentDependenciesKey = args[7] @@ -89,7 +89,7 @@ else jobIdKey = args[1] .. jobId if rcall("EXISTS", jobIdKey) == 1 then return handleDuplicatedJob(jobIdKey, jobId, parentKey, parent, - parentData, parentDependenciesKey, KEYS[5], eventsKey, + parentData, parentDependenciesKey, KEYS[4], eventsKey, maxEvents, timestamp) end end @@ -104,11 +104,11 @@ end storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], opts, timestamp, parentKey, parentData, repeatJobKey) -local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[6], KEYS[1], KEYS[2]) +local isPausedOrMaxed = isQueuePausedOrMaxed(metaKey, KEYS[5]) -- LIFO or FIFO local pushCmd = opts['lifo'] and 'RPUSH' or 'LPUSH' -addJobInTargetList(target, KEYS[8], pushCmd, isPausedOrMaxed, jobId) +addJobInTargetList(waitKey, KEYS[7], pushCmd, isPausedOrMaxed, jobId) -- Emit waiting event rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "waiting", diff --git a/src/commands/changePriority-7.lua b/src/commands/changePriority-6.lua similarity index 65% rename from src/commands/changePriority-7.lua rename to src/commands/changePriority-6.lua index 3350f54fbd..e5bc4947b1 100644 --- a/src/commands/changePriority-7.lua +++ b/src/commands/changePriority-6.lua @@ -2,12 +2,11 @@ Change job priority Input: KEYS[1] 'wait', - KEYS[2] 'paused' - KEYS[3] 'meta' - KEYS[4] 'prioritized' - KEYS[5] 'active' - KEYS[6] 'pc' priority counter - KEYS[7] 'marker' + KEYS[2] 'meta' + KEYS[3] 'prioritized' + KEYS[4] 'active' + KEYS[5] 'pc' priority counter + KEYS[6] 'marker' ARGV[1] priority value ARGV[2] prefix key @@ -26,14 +25,14 @@ local rcall = redis.call -- Includes --- @include "includes/addJobInTargetList" --- @include "includes/addJobWithPriority" ---- @include "includes/getTargetQueueList" +--- @include "includes/isQueuePausedOrMaxed" --- @include "includes/pushBackJobWithPriority" -local function reAddJobWithNewPriority( prioritizedKey, markerKey, targetKey, +local function reAddJobWithNewPriority( prioritizedKey, markerKey, waitKey, priorityCounter, lifo, priority, jobId, isPausedOrMaxed) if priority == 0 then local pushCmd = lifo and 'RPUSH' or 'LPUSH' - addJobInTargetList(targetKey, markerKey, pushCmd, isPausedOrMaxed, jobId) + addJobInTargetList(waitKey, markerKey, pushCmd, isPausedOrMaxed, jobId) else if lifo then pushBackJobWithPriority(prioritizedKey, priority, jobId) @@ -45,18 +44,18 @@ local function reAddJobWithNewPriority( prioritizedKey, markerKey, targetKey, end if rcall("EXISTS", jobKey) == 1 then - local metaKey = KEYS[3] - local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[5], KEYS[1], KEYS[2]) - local prioritizedKey = KEYS[4] - local priorityCounterKey = KEYS[6] - local markerKey = KEYS[7] + local metaKey = KEYS[2] + local isPausedOrMaxed = isQueuePausedOrMaxed(metaKey, KEYS[4]) + local prioritizedKey = KEYS[3] + local priorityCounterKey = KEYS[5] + local markerKey = KEYS[6] -- Re-add with the new priority if rcall("ZREM", prioritizedKey, jobId) > 0 then - reAddJobWithNewPriority( prioritizedKey, markerKey, target, + reAddJobWithNewPriority( prioritizedKey, markerKey, KEYS[1], priorityCounterKey, ARGV[4] == '1', priority, jobId, isPausedOrMaxed) - elseif rcall("LREM", target, -1, jobId) > 0 then - reAddJobWithNewPriority( prioritizedKey, markerKey, target, + elseif rcall("LREM", KEYS[1], -1, jobId) > 0 then + reAddJobWithNewPriority( prioritizedKey, markerKey, KEYS[1], priorityCounterKey, ARGV[4] == '1', priority, jobId, isPausedOrMaxed) end diff --git a/src/commands/executeMigrations-3.lua b/src/commands/executeMigrations-3.lua new file mode 100644 index 0000000000..1ef403629c --- /dev/null +++ b/src/commands/executeMigrations-3.lua @@ -0,0 +1,64 @@ +--[[ + Execute migrations. + + Input: + KEYS[1] meta key + KEYS[2] migrations key + KEYS[3] prefix key + + ARGV[1] current major version + ARGV[2] timestamp + ARGV[3] current migration execution +]] +local rcall = redis.call + +local currentMajorVersion = rcall("HGET", KEYS[1], "mv") + +local function getCurrentMigrationNumber(migrationKey) + local lastExecutedMigration = rcall("LRANGE", migrationKey, -1, -1) + + if #lastExecutedMigration > 0 then + return tonumber(string.match(lastExecutedMigration[1], "(.*)-.*-.*")) + 1 + else + return 1 + end +end + +local function saveMigration(migrationKey, migrationNumber, timestamp, migrationName) + rcall("RPUSH", migrationKey, migrationNumber .. "-" .. timestamp .. "-" .. migrationName) + return migrationNumber + 1 +end + +if currentMajorVersion then + if currentMajorVersion == ARGV[1] then + return 0 + end +else + local currentMigrationNumber = getCurrentMigrationNumber(KEYS[2]) + if currentMigrationNumber == 1 then + -- delete deprecated priority + rcall("DEL", KEYS[3] .. "priority") + currentMigrationNumber = saveMigration(KEYS[2], currentMigrationNumber, ARGV[2], "removeDeprecatedPriorityKey") + end + + local currentMigrationExecutionNumber = tonumber(ARGV[3]) + if currentMigrationNumber == 2 then + -- remove legacy markers + if currentMigrationNumber >= currentMigrationExecutionNumber then + return 2 + else + currentMigrationNumber = saveMigration(KEYS[2], currentMigrationNumber, ARGV[2], "removeLegacyMarkers") + end + end + + if currentMigrationNumber == 3 then + -- migrate deprecated paused key + if currentMigrationNumber >= currentMigrationExecutionNumber then + return 3 + else + currentMigrationNumber = saveMigration(KEYS[2], currentMigrationNumber, ARGV[2], "migrateDeprecatedPausedKey") + end + end + + return 0 +end diff --git a/src/commands/getCountsPerPriority-4.lua b/src/commands/getCountsPerPriority-4.lua index fa24ba2328..60f6a90d4c 100644 --- a/src/commands/getCountsPerPriority-4.lua +++ b/src/commands/getCountsPerPriority-4.lua @@ -21,10 +21,10 @@ local prioritizedKey = KEYS[4] for i = 1, #ARGV do local priority = tonumber(ARGV[i]) if priority == 0 then - if isQueuePaused(KEYS[3]) then - results[#results+1] = rcall("LLEN", pausedKey) + results[#results+1] = rcall("LLEN", waitKey) + if isQueuePaused(KEYS[3]) then -- TODO: remove in next breaking change + results[#results] = results[#results] + rcall("LLEN", pausedKey) else - results[#results+1] = rcall("LLEN", waitKey) end else results[#results+1] = rcall("ZCOUNT", prioritizedKey, diff --git a/src/commands/includes/getTargetQueueList.lua b/src/commands/includes/getTargetQueueList.lua deleted file mode 100644 index 2a7b03571a..0000000000 --- a/src/commands/includes/getTargetQueueList.lua +++ /dev/null @@ -1,22 +0,0 @@ ---[[ - Function to check for the meta.paused key to decide if we are paused or not - (since an empty list and !EXISTS are not really the same). -]] - -local function getTargetQueueList(queueMetaKey, activeKey, waitKey, pausedKey) - local queueAttributes = rcall("HMGET", queueMetaKey, "paused", "concurrency") - - if queueAttributes[1] then - return pausedKey, true - else - if queueAttributes[2] then - local activeCount = rcall("LLEN", activeKey) - if activeCount >= tonumber(queueAttributes[2]) then - return waitKey, true - else - return waitKey, false - end - end - end - return waitKey, false -end diff --git a/src/commands/includes/getWaitPlusPrioritizedCount.lua b/src/commands/includes/getWaitPlusPrioritizedCount.lua new file mode 100644 index 0000000000..019fcbed87 --- /dev/null +++ b/src/commands/includes/getWaitPlusPrioritizedCount.lua @@ -0,0 +1,11 @@ +--[[ + Get count jobs in wait or prioritized. +]] + +local function getWaitPlusPrioritizedCount(waitKey, prioritizedKey) + local waitCount = rcall("LLEN", waitKey) + local prioritizedCount = rcall("ZCARD", prioritizedKey) + + return waitCount + prioritizedCount +end + \ No newline at end of file diff --git a/src/commands/includes/moveParentToWaitIfNeeded.lua b/src/commands/includes/moveParentToWaitIfNeeded.lua index dff73397ea..ca4ee7f97b 100644 --- a/src/commands/includes/moveParentToWaitIfNeeded.lua +++ b/src/commands/includes/moveParentToWaitIfNeeded.lua @@ -7,7 +7,6 @@ --- @include "addJobInTargetList" --- @include "addJobWithPriority" --- @include "isQueuePausedOrMaxed" ---- @include "getTargetQueueList" local function moveParentToWaitIfNeeded(parentQueueKey, parentDependenciesKey, parentKey, parentId, timestamp) @@ -35,10 +34,9 @@ local function moveParentToWaitIfNeeded(parentQueueKey, parentDependenciesKey, addDelayMarkerIfNeeded(parentMarkerKey, parentDelayedKey) else if priority == 0 then - local parentTarget, isParentPausedOrMaxed = - getTargetQueueList(parentMetaKey, parentActiveKey, parentWaitKey, - parentPausedKey) - addJobInTargetList(parentTarget, parentMarkerKey, "RPUSH", isParentPausedOrMaxed, + local isParentPausedOrMaxed = + isQueuePausedOrMaxed(parentMetaKey, parentActiveKey) + addJobInTargetList(parentWaitKey, parentMarkerKey, "RPUSH", isParentPausedOrMaxed, parentId) else local isPausedOrMaxed = isQueuePausedOrMaxed(parentMetaKey, parentActiveKey) diff --git a/src/commands/includes/removeDeduplicationKey.lua b/src/commands/includes/removeDeduplicationKey.lua index b93e9e29f8..a4b2a461d7 100644 --- a/src/commands/includes/removeDeduplicationKey.lua +++ b/src/commands/includes/removeDeduplicationKey.lua @@ -9,4 +9,3 @@ local function removeDeduplicationKey(prefixKey, jobKey) rcall("DEL", deduplicationKey) end end - \ No newline at end of file diff --git a/src/commands/includes/removeParentDependencyKey.lua b/src/commands/includes/removeParentDependencyKey.lua index 87262850c9..7e5b34ce90 100644 --- a/src/commands/includes/removeParentDependencyKey.lua +++ b/src/commands/includes/removeParentDependencyKey.lua @@ -7,13 +7,12 @@ -- Includes --- @include "addJobInTargetList" --- @include "destructureJobKey" ---- @include "getTargetQueueList" +--- @include "isQueuePausedOrMaxed" --- @include "removeJobKeys" local function moveParentToWait(parentPrefix, parentId, emitEvent) - local parentTarget, isPausedOrMaxed = getTargetQueueList(parentPrefix .. "meta", parentPrefix .. "active", - parentPrefix .. "wait", parentPrefix .. "paused") - addJobInTargetList(parentTarget, parentPrefix .. "marker", "RPUSH", isPausedOrMaxed, parentId) + local isPausedOrMaxed = isQueuePausedOrMaxed(parentPrefix .. "meta", parentPrefix .. "active") + addJobInTargetList(parentPrefix .. "wait", parentPrefix .. "marker", "RPUSH", isPausedOrMaxed, parentId) if emitEvent then local parentEventStream = parentPrefix .. "events" diff --git a/src/commands/migrateDeprecatedPausedKey-2.lua b/src/commands/migrateDeprecatedPausedKey-2.lua new file mode 100644 index 0000000000..806039c9a5 --- /dev/null +++ b/src/commands/migrateDeprecatedPausedKey-2.lua @@ -0,0 +1,18 @@ +--[[ + Move paused job ids to wait state to repair these states + + Input: + KEYS[1] paused key + KEYS[2] wait key +]] + +local rcall = redis.call + +local hasJobsInPaused = rcall("EXISTS", KEYS[1]) == 1 + +if hasJobsInPaused then + rcall("RENAME", KEYS[1], KEYS[2]) +end + +return 0 + \ No newline at end of file diff --git a/src/commands/moveJobFromActiveToWait-10.lua b/src/commands/moveJobFromActiveToWait-9.lua similarity index 62% rename from src/commands/moveJobFromActiveToWait-10.lua rename to src/commands/moveJobFromActiveToWait-9.lua index e90d6d2d10..f340021ce7 100644 --- a/src/commands/moveJobFromActiveToWait-10.lua +++ b/src/commands/moveJobFromActiveToWait-9.lua @@ -6,12 +6,11 @@ KEYS[3] stalled key KEYS[4] job lock key - KEYS[5] paused key - KEYS[6] meta key - KEYS[7] limiter key - KEYS[8] prioritized key - KEYS[9] marker key - KEYS[10] event key + KEYS[5] meta key + KEYS[6] limiter key + KEYS[7] prioritized key + KEYS[8] marker key + KEYS[9] event key ARGV[1] job id ARGV[2] lock token @@ -23,28 +22,28 @@ local rcall = redis.call --- @include "includes/addJobInTargetList" --- @include "includes/pushBackJobWithPriority" --- @include "includes/getOrSetMaxEvents" ---- @include "includes/getTargetQueueList" +--- @include "includes/isQueuePausedOrMaxed" local jobId = ARGV[1] local token = ARGV[2] local lockKey = KEYS[4] local lockToken = rcall("GET", lockKey) -local pttl = rcall("PTTL", KEYS[7]) +local pttl = rcall("PTTL", KEYS[6]) if lockToken == token then - local metaKey = KEYS[6] + local metaKey = KEYS[5] local removed = rcall("LREM", KEYS[1], 1, jobId) if removed > 0 then - local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[1], KEYS[2], KEYS[5]) + local isPausedOrMaxed = isQueuePausedOrMaxed(metaKey, KEYS[1]) rcall("SREM", KEYS[3], jobId) local priority = tonumber(rcall("HGET", ARGV[3], "priority")) or 0 if priority > 0 then - pushBackJobWithPriority(KEYS[8], priority, jobId) + pushBackJobWithPriority(KEYS[7], priority, jobId) else - addJobInTargetList(target, KEYS[9], "RPUSH", isPausedOrMaxed, jobId) + addJobInTargetList(KEYS[2], KEYS[8], "RPUSH", isPausedOrMaxed, jobId) end rcall("DEL", lockKey) @@ -52,7 +51,7 @@ if lockToken == token then local maxEvents = getOrSetMaxEvents(metaKey) -- Emit waiting event - rcall("XADD", KEYS[10], "MAXLEN", "~", maxEvents, "*", "event", "waiting", + rcall("XADD", KEYS[9], "MAXLEN", "~", maxEvents, "*", "event", "waiting", "jobId", jobId) end end diff --git a/src/commands/moveJobsToWait-8.lua b/src/commands/moveJobsToWait-7.lua similarity index 82% rename from src/commands/moveJobsToWait-8.lua rename to src/commands/moveJobsToWait-7.lua index 15e99c6295..09cf80708e 100644 --- a/src/commands/moveJobsToWait-8.lua +++ b/src/commands/moveJobsToWait-7.lua @@ -8,10 +8,9 @@ KEYS[2] events stream KEYS[3] state key (failed, completed, delayed) KEYS[4] 'wait' - KEYS[5] 'paused' - KEYS[6] 'meta' - KEYS[7] 'active' - KEYS[8] 'marker' + KEYS[5] 'meta' + KEYS[6] 'active' + KEYS[7] 'marker' ARGV[1] count ARGV[2] timestamp @@ -30,10 +29,10 @@ local rcall = redis.call; --- @include "includes/addBaseMarkerIfNeeded" --- @include "includes/batches" --- @include "includes/getOrSetMaxEvents" ---- @include "includes/getTargetQueueList" +--- @include "includes/isQueuePausedOrMaxed" -local metaKey = KEYS[6] -local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[7], KEYS[4], KEYS[5]) +local metaKey = KEYS[5] +local isPausedOrMaxed = isQueuePausedOrMaxed(metaKey, KEYS[6]) local jobs = rcall('ZRANGEBYSCORE', KEYS[3], 0, timestamp, 'LIMIT', 0, maxCount) if (#jobs > 0) then @@ -60,10 +59,10 @@ if (#jobs > 0) then for from, to in batches(#jobs, 7000) do rcall("ZREM", KEYS[3], unpack(jobs, from, to)) - rcall("LPUSH", target, unpack(jobs, from, to)) + rcall("LPUSH", KEYS[4], unpack(jobs, from, to)) end - addBaseMarkerIfNeeded(KEYS[8], isPausedOrMaxed) + addBaseMarkerIfNeeded(KEYS[7], isPausedOrMaxed) end maxCount = maxCount - #jobs diff --git a/src/commands/moveStalledJobsToWait-9.lua b/src/commands/moveStalledJobsToWait-8.lua similarity index 92% rename from src/commands/moveStalledJobsToWait-9.lua rename to src/commands/moveStalledJobsToWait-8.lua index edb3bc3f59..123118b5d0 100644 --- a/src/commands/moveStalledJobsToWait-9.lua +++ b/src/commands/moveStalledJobsToWait-8.lua @@ -8,9 +8,8 @@ KEYS[4] 'failed', (ZSET) KEYS[5] 'stalled-check', (KEY) KEYS[6] 'meta', (KEY) - KEYS[7] 'paused', (LIST) - KEYS[8] 'marker' - KEYS[9] 'event stream' (STREAM) + KEYS[7] 'marker' + KEYS[8] 'event stream' (STREAM) ARGV[1] Max stalled job count ARGV[2] queue.toKey('') @@ -26,7 +25,7 @@ local rcall = redis.call -- Includes --- @include "includes/addJobInTargetList" --- @include "includes/batches" ---- @include "includes/getTargetQueueList" +--- @include "includes/isQueuePausedOrMaxed" --- @include "includes/moveParentIfNeeded" --- @include "includes/removeDeduplicationKeyIfNeeded" --- @include "includes/removeJob" @@ -40,9 +39,8 @@ local activeKey = KEYS[3] local failedKey = KEYS[4] local stalledCheckKey = KEYS[5] local metaKey = KEYS[6] -local pausedKey = KEYS[7] -local markerKey = KEYS[8] -local eventStreamKey = KEYS[9] +local markerKey = KEYS[7] +local eventStreamKey = KEYS[8] local maxStalledJobCount = tonumber(ARGV[1]) local queueKeyPrefix = ARGV[2] local timestamp = ARGV[3] @@ -124,11 +122,10 @@ if (#stalling > 0) then table.insert(failed, jobId) else - local target, isPausedOrMaxed= - getTargetQueueList(metaKey, activeKey, waitKey, pausedKey) + local isPausedOrMaxed = isQueuePausedOrMaxed(metaKey, activeKey) -- Move the job back to the wait queue, to immediately be picked up by a waiting worker. - addJobInTargetList(target, markerKey, "RPUSH", isPausedOrMaxed, jobId) + addJobInTargetList(waitKey, markerKey, "RPUSH", isPausedOrMaxed, jobId) rcall("XADD", eventStreamKey, "*", "event", "waiting", "jobId", jobId, 'prev', 'active') diff --git a/src/commands/moveToActive-11.lua b/src/commands/moveToActive-10.lua similarity index 85% rename from src/commands/moveToActive-11.lua rename to src/commands/moveToActive-10.lua index 9cca40e92c..d1343cded7 100644 --- a/src/commands/moveToActive-11.lua +++ b/src/commands/moveToActive-10.lua @@ -18,12 +18,11 @@ KEYS[7] delayed key -- Delayed jobs - KEYS[8] paused key - KEYS[9] meta key - KEYS[10] pc priority counter + KEYS[8] meta key + KEYS[9] pc priority counter -- Marker - KEYS[11] marker key + KEYS[10] marker key -- Arguments ARGV[1] key prefix @@ -45,17 +44,17 @@ local opts = cmsgpack.unpack(ARGV[3]) -- Includes --- @include "includes/getNextDelayedTimestamp" --- @include "includes/getRateLimitTTL" ---- @include "includes/getTargetQueueList" +--- @include "includes/isQueuePausedOrMaxed" --- @include "includes/moveJobFromPriorityToActive" --- @include "includes/prepareJobForProcessing" --- @include "includes/promoteDelayedJobs" -local target, isPausedOrMaxed = getTargetQueueList(KEYS[9], activeKey, waitKey, KEYS[8]) +local isPausedOrMaxed = isQueuePausedOrMaxed(KEYS[8], activeKey) -- Check if there are delayed jobs that we can move to wait. -local markerKey = KEYS[11] -promoteDelayedJobs(delayedKey, markerKey, target, KEYS[3], eventStreamKey, ARGV[1], - ARGV[2], KEYS[10], isPausedOrMaxed) +local markerKey = KEYS[10] +promoteDelayedJobs(delayedKey, markerKey, KEYS[1], KEYS[3], eventStreamKey, ARGV[1], + ARGV[2], KEYS[9], isPausedOrMaxed) local maxJobs = tonumber(opts['limiter'] and opts['limiter']['max']) local expireTime = getRateLimitTTL(maxJobs, rateLimiterKey) @@ -73,7 +72,7 @@ if jobId then return prepareJobForProcessing(ARGV[1], rateLimiterKey, eventStreamKey, jobId, ARGV[2], maxJobs, markerKey, opts) else - jobId = moveJobFromPriorityToActive(KEYS[3], activeKey, KEYS[10]) + jobId = moveJobFromPriorityToActive(KEYS[3], activeKey, KEYS[9]) if jobId then return prepareJobForProcessing(ARGV[1], rateLimiterKey, eventStreamKey, jobId, ARGV[2], maxJobs, markerKey, opts) diff --git a/src/commands/moveToFinished-14.lua b/src/commands/moveToFinished-13.lua similarity index 91% rename from src/commands/moveToFinished-14.lua rename to src/commands/moveToFinished-13.lua index 04d914e070..2694136f09 100644 --- a/src/commands/moveToFinished-14.lua +++ b/src/commands/moveToFinished-13.lua @@ -15,14 +15,13 @@ KEYS[6] rate limiter key KEYS[7] delayed key - KEYS[8] paused key - KEYS[9] meta key - KEYS[10] pc priority counter + KEYS[8] meta key + KEYS[9] pc priority counter - KEYS[11] completed/failed key - KEYS[12] jobId key - KEYS[13] metrics key - KEYS[14] marker key + KEYS[10] completed/failed key + KEYS[11] jobId key + KEYS[12] metrics key + KEYS[13] marker key ARGV[1] jobId ARGV[2] timestamp @@ -56,7 +55,7 @@ local rcall = redis.call --- @include "includes/collectMetrics" --- @include "includes/getNextDelayedTimestamp" --- @include "includes/getRateLimitTTL" ---- @include "includes/getTargetQueueList" +--- @include "includes/isQueuePausedOrMaxed" --- @include "includes/moveJobFromPriorityToActive" --- @include "includes/moveParentIfNeeded" --- @include "includes/prepareJobForProcessing" @@ -70,7 +69,7 @@ local rcall = redis.call --- @include "includes/trimEvents" --- @include "includes/updateParentDepsIfNeeded" -local jobIdKey = KEYS[12] +local jobIdKey = KEYS[11] if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists local opts = cmsgpack.unpack(ARGV[8]) @@ -101,7 +100,7 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists if (numRemovedElements < 1) then return -3 end local eventStreamKey = KEYS[4] - local metaKey = KEYS[9] + local metaKey = KEYS[8] -- Trim events before emiting them to avoid trimming events emitted in this script trimEvents(metaKey, eventStreamKey) @@ -137,7 +136,7 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists -- Remove job? if maxCount ~= 0 then - local targetSet = KEYS[11] + local targetSet = KEYS[10] -- Add to complete/failed set rcall("ZADD", targetSet, timestamp, jobId) rcall("HMSET", jobIdKey, ARGV[3], ARGV[4], "finishedOn", timestamp) @@ -173,19 +172,19 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists -- Collect metrics if maxMetricsSize ~= "" then - collectMetrics(KEYS[13], KEYS[13] .. ':data', maxMetricsSize, timestamp) + collectMetrics(KEYS[12], KEYS[12] .. ':data', maxMetricsSize, timestamp) end -- Try to get next job to avoid an extra roundtrip if the queue is not closing, -- and not rate limited. if (ARGV[6] == "1") then - local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[2], KEYS[1], KEYS[8]) + local isPausedOrMaxed = isQueuePausedOrMaxed(metaKey, KEYS[2]) - local markerKey = KEYS[14] + local markerKey = KEYS[13] -- Check if there are delayed jobs that can be promoted - promoteDelayedJobs(KEYS[7], markerKey, target, KEYS[3], eventStreamKey, prefix, - timestamp, KEYS[10], isPausedOrMaxed) + promoteDelayedJobs(KEYS[7], markerKey, KEYS[1], KEYS[3], eventStreamKey, prefix, + timestamp, KEYS[9], isPausedOrMaxed) local maxJobs = tonumber(opts['limiter'] and opts['limiter']['max']) -- Check if we are rate limited first. @@ -202,7 +201,7 @@ if rcall("EXISTS", jobIdKey) == 1 then -- // Make sure job exists return prepareJobForProcessing(prefix, KEYS[6], eventStreamKey, jobId, timestamp, maxJobs, markerKey, opts) else - jobId = moveJobFromPriorityToActive(KEYS[3], KEYS[2], KEYS[10]) + jobId = moveJobFromPriorityToActive(KEYS[3], KEYS[2], KEYS[9]) if jobId then return prepareJobForProcessing(prefix, KEYS[6], eventStreamKey, jobId, timestamp, maxJobs, markerKey, diff --git a/src/commands/obliterate-2.lua b/src/commands/obliterate-2.lua index 1a7be36393..7814f12dbe 100644 --- a/src/commands/obliterate-2.lua +++ b/src/commands/obliterate-2.lua @@ -81,7 +81,7 @@ if(maxCount <= 0) then return 1 end -local waitKey = baseKey .. 'paused' +local waitKey = baseKey .. 'wait' maxCount = removeListJobs(waitKey, true, baseKey, maxCount) if(maxCount <= 0) then return 1 diff --git a/src/commands/pause-7.lua b/src/commands/pause-7.lua index 90d0f945f7..471083b14a 100644 --- a/src/commands/pause-7.lua +++ b/src/commands/pause-7.lua @@ -19,19 +19,27 @@ local rcall = redis.call -- Includes --- @include "includes/addDelayMarkerIfNeeded" +--- @include "includes/getWaitPlusPrioritizedCount" local markerKey = KEYS[7] -local hasJobs = rcall("EXISTS", KEYS[1]) == 1 ---TODO: check this logic to be reused when changing a delay -if hasJobs then rcall("RENAME", KEYS[1], KEYS[2]) end if ARGV[1] == "paused" then rcall("HSET", KEYS[3], "paused", 1) rcall("DEL", markerKey) else rcall("HDEL", KEYS[3], "paused") + --jobs in paused key + local hasJobs = rcall("EXISTS", KEYS[1]) == 1 + + if hasJobs then + --move a maximum of 7000 per resumed call in order to not block + --if users have more jobs in paused state, call resumed multiple times + local jobs = rcall('LRANGE', KEYS[1], 0, 6999) + rcall("RPUSH", KEYS[2], unpack(jobs)) + rcall("LTRIM", KEYS[1], #jobs, -1) + end - if hasJobs or rcall("ZCARD", KEYS[4]) > 0 then + if getWaitPlusPrioritizedCount(KEYS[2], KEYS[4]) > 0 then -- Add marker if there are waiting or priority jobs rcall("ZADD", markerKey, 0, "0") else diff --git a/src/commands/promote-9.lua b/src/commands/promote-8.lua similarity index 56% rename from src/commands/promote-9.lua rename to src/commands/promote-8.lua index f0a273f0d7..f691f13e26 100644 --- a/src/commands/promote-9.lua +++ b/src/commands/promote-8.lua @@ -4,13 +4,12 @@ Input: KEYS[1] 'delayed' KEYS[2] 'wait' - KEYS[3] 'paused' - KEYS[4] 'meta' - KEYS[5] 'prioritized' - KEYS[6] 'active' - KEYS[7] 'pc' priority counter - KEYS[8] 'event stream' - KEYS[9] 'marker' + KEYS[3] 'meta' + KEYS[4] 'prioritized' + KEYS[5] 'active' + KEYS[6] 'pc' priority counter + KEYS[7] 'event stream' + KEYS[8] 'marker' ARGV[1] queue.toKey('') ARGV[2] jobId @@ -28,25 +27,25 @@ local jobId = ARGV[2] -- Includes --- @include "includes/addJobInTargetList" --- @include "includes/addJobWithPriority" ---- @include "includes/getTargetQueueList" +--- @include "includes/isQueuePausedOrMaxed" if rcall("ZREM", KEYS[1], jobId) == 1 then local jobKey = ARGV[1] .. jobId local priority = tonumber(rcall("HGET", jobKey, "priority")) or 0 - local metaKey = KEYS[4] - local markerKey = KEYS[9] + local metaKey = KEYS[3] + local markerKey = KEYS[8] - local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[6], KEYS[2], KEYS[3]) + local isPausedOrMaxed = isQueuePausedOrMaxed(metaKey, KEYS[5]) if priority == 0 then -- LIFO or FIFO - addJobInTargetList(target, markerKey, "LPUSH", isPausedOrMaxed, jobId) + addJobInTargetList(KEYS[2], markerKey, "LPUSH", isPausedOrMaxed, jobId) else - addJobWithPriority(markerKey, KEYS[5], priority, jobId, KEYS[7], isPausedOrMaxed) + addJobWithPriority(markerKey, KEYS[4], priority, jobId, KEYS[6], isPausedOrMaxed) end -- Emit waiting event (wait..ing@token) - rcall("XADD", KEYS[8], "*", "event", "waiting", "jobId", jobId, "prev", + rcall("XADD", KEYS[7], "*", "event", "waiting", "jobId", jobId, "prev", "delayed"); rcall("HSET", jobKey, "delay", 0) diff --git a/src/commands/reprocessJob-8.lua b/src/commands/reprocessJob-7.lua similarity index 78% rename from src/commands/reprocessJob-8.lua rename to src/commands/reprocessJob-7.lua index 300ab6a1e8..f393ede5da 100644 --- a/src/commands/reprocessJob-8.lua +++ b/src/commands/reprocessJob-7.lua @@ -7,9 +7,8 @@ KEYS[3] job state KEYS[4] wait key KEYS[5] meta - KEYS[6] paused key - KEYS[7] active key - KEYS[8] marker key + KEYS[6] active key + KEYS[7] marker key ARGV[1] job.id ARGV[2] (job.opts.lifo ? 'R' : 'L') + 'PUSH' @@ -26,15 +25,15 @@ local rcall = redis.call; -- Includes --- @include "includes/addJobInTargetList" --- @include "includes/getOrSetMaxEvents" ---- @include "includes/getTargetQueueList" +--- @include "includes/isQueuePausedOrMaxed" if rcall("EXISTS", KEYS[1]) == 1 then local jobId = ARGV[1] if (rcall("ZREM", KEYS[3], jobId) == 1) then rcall("HDEL", KEYS[1], "finishedOn", "processedOn", ARGV[3]) - local target, isPausedOrMaxed = getTargetQueueList(KEYS[5], KEYS[7], KEYS[4], KEYS[6]) - addJobInTargetList(target, KEYS[8], ARGV[2], isPausedOrMaxed, jobId) + local isPausedOrMaxed = isQueuePausedOrMaxed(KEYS[5], KEYS[6]) + addJobInTargetList(KEYS[4], KEYS[7], ARGV[2], isPausedOrMaxed, jobId) local maxEvents = getOrSetMaxEvents(KEYS[5]) -- Emit waiting event diff --git a/src/commands/retryJob-11.lua b/src/commands/retryJob-10.lua similarity index 55% rename from src/commands/retryJob-11.lua rename to src/commands/retryJob-10.lua index 33d1f7a85a..d5977b41d2 100644 --- a/src/commands/retryJob-11.lua +++ b/src/commands/retryJob-10.lua @@ -4,15 +4,14 @@ Input: KEYS[1] 'active', KEYS[2] 'wait' - KEYS[3] 'paused' - KEYS[4] job key - KEYS[5] 'meta' - KEYS[6] events stream - KEYS[7] delayed key - KEYS[8] prioritized key - KEYS[9] 'pc' priority counter + KEYS[3] job key + KEYS[4] 'meta' + KEYS[5] events stream + KEYS[6] delayed key + KEYS[7] prioritized key + KEYS[8] 'pc' priority counter + KEYS[9] 'stalled' KEYS[10] 'marker' - KEYS[11] 'stalled' ARGV[1] key prefix ARGV[2] timestamp @@ -35,20 +34,21 @@ local rcall = redis.call --- @include "includes/addJobInTargetList" --- @include "includes/addJobWithPriority" --- @include "includes/getOrSetMaxEvents" ---- @include "includes/getTargetQueueList" --- @include "includes/promoteDelayedJobs" --- @include "includes/removeLock" --- @include "includes/isQueuePausedOrMaxed" -local target, isPausedOrMaxed = getTargetQueueList(KEYS[5], KEYS[1], KEYS[2], KEYS[3]) +local jobKey = KEYS[3] +local metaKey = KEYS[4] +local isPausedOrMaxed = isQueuePausedOrMaxed(metaKey, KEYS[1]) local markerKey = KEYS[10] -- Check if there are delayed jobs that we can move to wait. -- test example: when there are delayed jobs between retries -promoteDelayedJobs(KEYS[7], markerKey, target, KEYS[8], KEYS[6], ARGV[1], ARGV[2], KEYS[9], isPausedOrMaxed) +promoteDelayedJobs(KEYS[6], markerKey, KEYS[2], KEYS[7], KEYS[5], ARGV[1], ARGV[2], KEYS[8], isPausedOrMaxed) -if rcall("EXISTS", KEYS[4]) == 1 then - local errorCode = removeLock(KEYS[4], KEYS[11], ARGV[5], ARGV[4]) +if rcall("EXISTS", jobKey) == 1 then + local errorCode = removeLock(jobKey, KEYS[9], ARGV[5], ARGV[4]) if errorCode < 0 then return errorCode end @@ -56,24 +56,24 @@ if rcall("EXISTS", KEYS[4]) == 1 then local numRemovedElements = rcall("LREM", KEYS[1], -1, ARGV[4]) if (numRemovedElements < 1) then return -3 end - local priority = tonumber(rcall("HGET", KEYS[4], "priority")) or 0 + local priority = tonumber(rcall("HGET", jobKey, "priority")) or 0 --need to re-evaluate after removing job from active - isPausedOrMaxed = isQueuePausedOrMaxed(KEYS[5], KEYS[1]) + isPausedOrMaxed = isQueuePausedOrMaxed(metaKey, KEYS[1]) -- Standard or priority add if priority == 0 then - addJobInTargetList(target, markerKey, ARGV[3], isPausedOrMaxed, ARGV[4]) + addJobInTargetList(KEYS[2], markerKey, ARGV[3], isPausedOrMaxed, ARGV[4]) else - addJobWithPriority(markerKey, KEYS[8], priority, ARGV[4], KEYS[9], isPausedOrMaxed) + addJobWithPriority(markerKey, KEYS[7], priority, ARGV[4], KEYS[8], isPausedOrMaxed) end - rcall("HINCRBY", KEYS[4], "atm", 1) + rcall("HINCRBY", jobKey, "atm", 1) - local maxEvents = getOrSetMaxEvents(KEYS[5]) + local maxEvents = getOrSetMaxEvents(metaKey) -- Emit waiting event - rcall("XADD", KEYS[6], "MAXLEN", "~", maxEvents, "*", "event", "waiting", + rcall("XADD", KEYS[5], "MAXLEN", "~", maxEvents, "*", "event", "waiting", "jobId", ARGV[4], "prev", "failed") return 0 diff --git a/src/utils.ts b/src/utils.ts index 4efa2e3467..aaf998a37c 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -200,7 +200,7 @@ export const childSend = ( msg: ChildMessage, ): Promise => asyncSend(proc, msg); -export const isRedisVersionLowerThan = ( +export const isVersionLowerThan = ( currentVersion: string, minimumVersion: string, ): boolean => { diff --git a/src/version.ts b/src/version.ts index 192be5e052..58016e0cb5 100644 --- a/src/version.ts +++ b/src/version.ts @@ -1 +1 @@ -export const version = '5.27.0'; +export const version = '6.0.0'; diff --git a/tests/test_clean.ts b/tests/test_clean.ts index 1e19b2a79f..f330e09962 100644 --- a/tests/test_clean.ts +++ b/tests/test_clean.ts @@ -28,8 +28,8 @@ describe('Cleaner', () => { queueName = `test-${v4()}`; queue = new Queue(queueName, { connection, prefix }); queueEvents = new QueueEvents(queueName, { connection, prefix }); - await queueEvents.waitUntilReady(); await queue.waitUntilReady(); + await queueEvents.waitUntilReady(); }); afterEach(async function () { diff --git a/tests/test_concurrency.ts b/tests/test_concurrency.ts index 400ae7bbcc..45ed82ccb3 100644 --- a/tests/test_concurrency.ts +++ b/tests/test_concurrency.ts @@ -156,6 +156,7 @@ describe('Concurrency', () => { }); const queueEvents = new QueueEvents(queueName, { connection, prefix }); await queueEvents.waitUntilReady(); + await queue.waitUntilReady(); await queue.setGlobalConcurrency(1); const worker = new Worker( @@ -239,6 +240,7 @@ describe('Concurrency', () => { prefix, }); const queueEvents = new QueueEvents(queueName, { connection, prefix }); + await queue.waitUntilReady(); await queueEvents.waitUntilReady(); await queue.setGlobalConcurrency(1); diff --git a/tests/test_job.ts b/tests/test_job.ts index 039c457572..3b2d810ff2 100644 --- a/tests/test_job.ts +++ b/tests/test_job.ts @@ -1433,7 +1433,7 @@ describe('Job', function () { await queue.pause(); await delayedJob.promote(); - const pausedJobsCount = await queue.getJobCountByTypes('paused'); + const pausedJobsCount = await queue.getWaitingCount(); expect(pausedJobsCount).to.be.equal(2); await queue.resume(); @@ -1455,7 +1455,7 @@ describe('Job', function () { await queue.pause(); await delayedJob.promote(); - const pausedJobsCount = await queue.getJobCountByTypes('paused'); + const pausedJobsCount = await queue.getWaitingCount(); expect(pausedJobsCount).to.be.equal(1); await queue.resume(); diff --git a/tests/test_migrations.ts b/tests/test_migrations.ts new file mode 100644 index 0000000000..7006c1ac16 --- /dev/null +++ b/tests/test_migrations.ts @@ -0,0 +1,90 @@ +import { expect } from 'chai'; +import { default as IORedis } from 'ioredis'; +import { describe, beforeEach, it, before, after as afterAll } from 'mocha'; +import * as sinon from 'sinon'; +import { v4 } from 'uuid'; +import { Queue } from '../src/classes'; +import { removeAllQueueData } from '../src/utils'; + +describe('migrations', function () { + const redisHost = process.env.REDIS_HOST || 'localhost'; + const prefix = process.env.BULLMQ_TEST_PREFIX || 'bull'; + + const sandbox = sinon.createSandbox(); + + let queue: Queue; + let queueName: string; + + let connection; + before(async function () { + connection = new IORedis(redisHost, { maxRetriesPerRequest: null }); + }); + + beforeEach(async function () { + queueName = `test-${v4()}`; + queue = new Queue(queueName, { connection, prefix }); + await queue.waitUntilReady(); + }); + + afterEach(async function () { + sandbox.restore(); + await queue.close(); + await removeAllQueueData(new IORedis(redisHost), queueName); + }); + + afterAll(async function () { + await connection.quit(); + }); + + describe('execute migrations', () => { + describe('removeLegacyMarkers', () => { + it('removes old markers', async () => { + const client = await queue.client; + const completedKey = `${prefix}:${queueName}:completed`; + const failedKey = `${prefix}:${queueName}:failed`; + const waitingKey = `${prefix}:${queueName}:wait`; + await client.zadd(completedKey, 1, '0:2'); + await client.zadd(completedKey, 1, '0:2'); + await client.zadd(failedKey, 2, '0:1'); + await client.rpush(waitingKey, '0:0'); + + await queue.runMigrations(); + + const keys = await client.keys(`${prefix}:${queueName}:*`); + + // meta key, migrations + expect(keys.length).to.be.eql(2); + + const completedCount = await client.zcard(completedKey); + expect(completedCount).to.be.eql(0); + + const failedCount = await client.zcard(failedKey); + expect(failedCount).to.be.eql(0); + + const waitingCount = await client.llen(waitingKey); + expect(waitingCount).to.be.eql(0); + }); + }); + + describe('migratePausedKey', () => { + it('moves jobs from paused to wait', async () => { + const client = await queue.client; + await client.lpush( + `${prefix}:${queueName}:paused`, + '1', + '2', + '3', + '4', + '5', + '6', + ); + + await queue.runMigrations(); + + const jobs = await client.lrange(`${prefix}:${queueName}:wait`, 0, -1); + + expect(jobs).to.be.eql(['6', '5', '4', '3', '2', '1']); + }); + }); + }); +}); diff --git a/tests/test_obliterate.ts b/tests/test_obliterate.ts index 48967bd5df..89b8223b5a 100644 --- a/tests/test_obliterate.ts +++ b/tests/test_obliterate.ts @@ -74,6 +74,7 @@ describe('Obliterate', function () { await queue.obliterate(); const client = await queue.client; const keys = await client.keys(`${prefix}:${queue.name}:*`); + expect(keys.length).to.be.eql(0); await worker.close(); @@ -375,6 +376,7 @@ describe('Obliterate', function () { await queue.obliterate({ force: true }); const client = await queue.client; const keys = await client.keys(`${prefix}:${queue.name}:*`); + // only migration key should be kept expect(keys.length).to.be.eql(0); await worker.close(); diff --git a/tests/test_pause.ts b/tests/test_pause.ts index 9ad1f6e5c5..38e809da18 100644 --- a/tests/test_pause.ts +++ b/tests/test_pause.ts @@ -58,9 +58,8 @@ describe('Pause', function () { if (processed) { throw new Error('should not process delayed jobs in paused queue.'); } - const counts2 = await queue.getJobCounts('waiting', 'paused', 'delayed'); - expect(counts2).to.have.property('waiting', 0); - expect(counts2).to.have.property('paused', 1); + const counts2 = await queue.getJobCounts('waiting', 'delayed'); + expect(counts2).to.have.property('waiting', 1); expect(counts2).to.have.property('delayed', 0); await worker.close(); @@ -380,7 +379,7 @@ describe('Pause', function () { const waitingEvent = new Promise(resolve => { queueEvents.on('waiting', async ({ prev }) => { if (prev === 'failed') { - const count = await queue.getJobCountByTypes('paused'); + const count = await queue.getJobCountByTypes('wait'); expect(count).to.be.equal(1); await queue.resume(); resolve(); diff --git a/tests/test_queue.ts b/tests/test_queue.ts index 6bd97a426a..51c9ed9c51 100644 --- a/tests/test_queue.ts +++ b/tests/test_queue.ts @@ -122,7 +122,14 @@ describe('queues', function () { for (const key of keys) { const type = key.split(':')[2]; - expect(['marker', 'events', 'meta', 'pc', 'id']).to.include(type); + expect([ + 'marker', + 'migrations', + 'events', + 'meta', + 'pc', + 'id', + ]).to.include(type); } }).timeout(10000); @@ -407,7 +414,7 @@ describe('queues', function () { describe('when queue is paused', () => { it('clean queue including paused jobs', async () => { const maxJobs = 50; - const added = []; + const added: Promise[] = []; await queue.pause(); for (let i = 1; i <= maxJobs; i++) { @@ -417,8 +424,8 @@ describe('queues', function () { await Promise.all(added); const count = await queue.count(); expect(count).to.be.eql(maxJobs); - const count2 = await queue.getJobCounts('paused'); - expect(count2.paused).to.be.eql(maxJobs); + const count2 = await queue.getJobCounts('wait'); + expect(count2.wait).to.be.eql(maxJobs); await queue.drain(); const countAfterEmpty = await queue.count(); expect(countAfterEmpty).to.be.eql(0); @@ -620,7 +627,7 @@ describe('queues', function () { }); describe('when queue is paused', () => { - it('moves retried jobs to paused', async () => { + it('moves retried jobs to wait', async () => { await queue.waitUntilReady(); const jobCount = 8; @@ -663,8 +670,8 @@ describe('queues', function () { await queue.pause(); await queue.retryJobs({ count: 2 }); - const pausedCount = await queue.getJobCounts('paused'); - expect(pausedCount.paused).to.be.equal(jobCount); + const pausedCount = await queue.getJobCounts('wait'); + expect(pausedCount.wait).to.be.equal(jobCount); await worker.close(); }); diff --git a/tests/test_rate_limiter.ts b/tests/test_rate_limiter.ts index 28e4c5c9ad..789088bba4 100644 --- a/tests/test_rate_limiter.ts +++ b/tests/test_rate_limiter.ts @@ -28,6 +28,7 @@ describe('Rate Limiter', function () { queueName = `test-${v4()}`; queue = new Queue(queueName, { connection, prefix }); queueEvents = new QueueEvents(queueName, { connection, prefix }); + await queue.waitUntilReady(); await queueEvents.waitUntilReady(); }); @@ -137,7 +138,7 @@ describe('Rate Limiter', function () { }); describe('when queue is paused between rate limit', () => { - it('should add active jobs to paused', async function () { + it('should add active jobs to wait', async function () { this.timeout(20000); const numJobs = 4; @@ -184,10 +185,9 @@ describe('Rate Limiter', function () { await delay(500); - const counts = await queue.getJobCounts('paused', 'completed', 'wait'); - expect(counts).to.have.property('paused', numJobs - 1); + const counts = await queue.getJobCounts('completed', 'wait'); expect(counts).to.have.property('completed', 1); - expect(counts).to.have.property('wait', 0); + expect(counts).to.have.property('wait', numJobs - 1); await worker1.close(); await worker2.close(); @@ -692,6 +692,8 @@ describe('Rate Limiter', function () { }, ); + await worker.waitUntilReady(); + const result = new Promise((resolve, reject) => { queueEvents.on( 'completed', @@ -797,7 +799,7 @@ describe('Rate Limiter', function () { }); describe('when queue is paused', () => { - it('moves job to paused', async function () { + it('moves job to wait', async function () { const dynamicLimit = 250; const duration = 100; @@ -839,7 +841,7 @@ describe('Rate Limiter', function () { await result; - const pausedCount = await queue.getJobCountByTypes('paused'); + const pausedCount = await queue.getJobCountByTypes('wait'); expect(pausedCount).to.equal(1); await worker.close(); diff --git a/tests/test_sandboxed_process.ts b/tests/test_sandboxed_process.ts index c246a06458..b7d9bc65de 100644 --- a/tests/test_sandboxed_process.ts +++ b/tests/test_sandboxed_process.ts @@ -28,6 +28,7 @@ describe('Sandboxed process using child processes', () => { queueName = `test-${v4()}`; queue = new Queue(queueName, { connection, prefix }); queueEvents = new QueueEvents(queueName, { connection, prefix }); + await queue.waitUntilReady(); await queueEvents.waitUntilReady(); }); @@ -462,6 +463,7 @@ function sandboxProcessTests( resolve(); }); }); + await worker.waitUntilReady(); const inspect = stderr.inspect(); await queue.add('test', { foo: 'bar' }); @@ -696,6 +698,7 @@ function sandboxProcessTests( drainDelay: 1, useWorkerThreads, }); + await worker.waitUntilReady(); const completing = new Promise((resolve, reject) => { worker.on('completed', async (job: Job, value: any) => { diff --git a/tests/test_worker.ts b/tests/test_worker.ts index a99dde18dd..f95fd96473 100644 --- a/tests/test_worker.ts +++ b/tests/test_worker.ts @@ -15,11 +15,7 @@ import { } from '../src/classes'; import { KeepJobs, MinimalJob } from '../src/interfaces'; import { JobsOptions } from '../src/types'; -import { - delay, - isRedisVersionLowerThan, - removeAllQueueData, -} from '../src/utils'; +import { delay, isVersionLowerThan, removeAllQueueData } from '../src/utils'; describe('workers', function () { const redisHost = process.env.REDIS_HOST || 'localhost'; @@ -40,6 +36,7 @@ describe('workers', function () { queueName = `test-${v4()}`; queue = new Queue(queueName, { connection, prefix }); queueEvents = new QueueEvents(queueName, { connection, prefix }); + await queue.waitUntilReady(); await queueEvents.waitUntilReady(); }); @@ -99,46 +96,6 @@ describe('workers', function () { await worker.close(); }); - describe('when legacy marker is present', () => { - it('does not get stuck', async () => { - const client = await queue.client; - await client.rpush(`${prefix}:${queue.name}:wait`, '0:0'); - - const worker = new Worker( - queueName, - async () => { - await delay(200); - }, - { autorun: false, connection, prefix }, - ); - await worker.waitUntilReady(); - - const secondJob = await queue.add('test', { foo: 'bar' }); - - const completing = new Promise((resolve, reject) => { - worker.on('completed', async job => { - try { - if (job.id === secondJob.id) { - resolve(); - } - } catch (err) { - reject(err); - } - }); - }); - - worker.run(); - - await completing; - - const completedCount = await queue.getCompletedCount(); - - expect(completedCount).to.be.equal(2); - - await worker.close(); - }); - }); - it('process several jobs serially', async () => { let counter = 1; const maxJobs = 35; @@ -532,8 +489,6 @@ describe('workers', function () { expect(job.data.foo).to.be.eql('bar'); } - expect(bclientSpy.callCount).to.be.equal(1); - await new Promise((resolve, reject) => { worker.on('completed', (_job: Job, _result: any) => { completedJobs++; @@ -576,8 +531,6 @@ describe('workers', function () { expect(job.data.foo).to.be.eql('bar'); } - expect(bclientSpy.callCount).to.be.equal(1); - await new Promise((resolve, reject) => { worker.on('completed', (job: Job, result: any) => { completedJobs++; @@ -868,7 +821,7 @@ describe('workers', function () { }); await worker.waitUntilReady(); const client = await worker.client; - if (isRedisVersionLowerThan(worker.redisVersion, '7.0.8')) { + if (isVersionLowerThan(worker.redisVersion, '7.0.8')) { await client.bzpopmin(`key`, 0.002); } else { await client.bzpopmin(`key`, 0.001); @@ -982,7 +935,7 @@ describe('workers', function () { }); await worker.waitUntilReady(); - if (isRedisVersionLowerThan(worker.redisVersion, '7.0.8')) { + if (isVersionLowerThan(worker.redisVersion, '7.0.8')) { expect(worker['getBlockTimeout'](0)).to.be.equal(0.002); } else { expect(worker['getBlockTimeout'](0)).to.be.equal(0.001); @@ -1018,7 +971,7 @@ describe('workers', function () { }); await worker.waitUntilReady(); - if (isRedisVersionLowerThan(worker.redisVersion, '7.0.8')) { + if (isVersionLowerThan(worker.redisVersion, '7.0.8')) { expect( worker['getBlockTimeout'](Date.now() + 100), ).to.be.greaterThan(0.002); @@ -1854,7 +1807,7 @@ describe('workers', function () { }); describe('when queue is paused and retry a job', () => { - it('moves job to paused', async () => { + it('moves job to wait', async () => { const worker = new Worker( queueName, async () => { @@ -1884,7 +1837,7 @@ describe('workers', function () { await queue.pause(); await job.retry('completed'); - const pausedJobsCount = await queue.getJobCountByTypes('paused'); + const pausedJobsCount = await queue.getJobCountByTypes('wait'); expect(pausedJobsCount).to.be.equal(1); await worker.close(); @@ -4362,7 +4315,7 @@ describe('workers', function () { }, }); - if (isRedisVersionLowerThan(childrenWorker.redisVersion, '7.2.0')) { + if (isVersionLowerThan(childrenWorker.redisVersion, '7.2.0')) { expect(unprocessed1!.length).to.be.greaterThanOrEqual(50); expect(nextCursor1).to.not.be.equal(0); } else { @@ -4378,7 +4331,7 @@ describe('workers', function () { }, }); - if (isRedisVersionLowerThan(childrenWorker.redisVersion, '7.2.0')) { + if (isVersionLowerThan(childrenWorker.redisVersion, '7.2.0')) { expect(unprocessed2!.length).to.be.lessThanOrEqual(15); expect(nextCursor2).to.be.equal(0); } else {