Skip to content

Commit

Permalink
feat(queue): support global concurrency (#2496) ref #2465
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Jul 15, 2024
1 parent 7fdd892 commit 47ba055
Show file tree
Hide file tree
Showing 32 changed files with 896 additions and 98 deletions.
24 changes: 22 additions & 2 deletions docs/gitbook/guide/workers/concurrency.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,29 @@

There are basically two ways to achieve concurrency with BullMQ. You can run a worker with a concurrency factor larger than 1 \(which is the default value\), or you can run several workers in different node processes.

#### Concurrency factor
#### Global Concurrency factor

The concurrency factor is a worker option that determines how many jobs are allowed to be processed in parallel. This means that the same worker is able to process several jobs in parallel, however the queue guarantees such as "at-least-once" and order of processing are still preserved.
The global concurrency factor is a queue option that determines how many jobs are allowed to be processed in parallel across all your worker instances.

```typescript
import { Queue } from 'bullmq';

await queue.setGlobalConcurrency(4);
```

And in order to get this value:

```typescript
const globalConcurrency = await queue.getGlobalConcurrency();
```

{% hint style="info" %}
Note that if you choose a concurrency level in your workers, it will not override the global one, it will just be the maximum jobs a given worker can process in parallel but never more than the global one.
{% endhint %}

#### Local Concurrency factor

The local concurrency factor is a worker option that determines how many jobs are allowed to be processed in parallel for that instance. This means that the same worker is able to process several jobs in parallel, however the queue guarantees such as "at-least-once" and order of processing are still preserved.

```typescript
import { Worker, Job } from 'bullmq';
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
"nyc": "^15.1.0",
"prettier": "^2.7.1",
"pretty-quick": "^3.1.3",
"progress": "^2.0.3",
"rimraf": "^3.0.2",
"rrule": "^2.6.9",
"semantic-release": "^19.0.3",
Expand Down
26 changes: 14 additions & 12 deletions python/bullmq/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection
self.redisConnection = redisConnection
self.redisClient = redisConnection.conn
self.commands = {
"addStandardJob": self.redisClient.register_script(self.getScript("addStandardJob-7.lua")),
"addStandardJob": self.redisClient.register_script(self.getScript("addStandardJob-8.lua")),
"addDelayedJob": self.redisClient.register_script(self.getScript("addDelayedJob-6.lua")),
"addParentJob": self.redisClient.register_script(self.getScript("addParentJob-4.lua")),
"addPrioritizedJob": self.redisClient.register_script(self.getScript("addPrioritizedJob-7.lua")),
"changePriority": self.redisClient.register_script(self.getScript("changePriority-6.lua")),
"addPrioritizedJob": self.redisClient.register_script(self.getScript("addPrioritizedJob-8.lua")),
"changePriority": self.redisClient.register_script(self.getScript("changePriority-7.lua")),
"cleanJobsInSet": self.redisClient.register_script(self.getScript("cleanJobsInSet-2.lua")),
"extendLock": self.redisClient.register_script(self.getScript("extendLock-2.lua")),
"getCounts": self.redisClient.register_script(self.getScript("getCounts-1.lua")),
Expand All @@ -51,11 +51,11 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection
"moveToWaitingChildren": self.redisClient.register_script(self.getScript("moveToWaitingChildren-5.lua")),
"obliterate": self.redisClient.register_script(self.getScript("obliterate-2.lua")),
"pause": self.redisClient.register_script(self.getScript("pause-7.lua")),
"promote": self.redisClient.register_script(self.getScript("promote-8.lua")),
"removeJob": self.redisClient.register_script(self.getScript("removeJob-1.lua")),
"reprocessJob": self.redisClient.register_script(self.getScript("reprocessJob-7.lua")),
"promote": self.redisClient.register_script(self.getScript("promote-9.lua")),
"removeJob": self.redisClient.register_script(self.getScript("removeJob-2.lua")),
"reprocessJob": self.redisClient.register_script(self.getScript("reprocessJob-8.lua")),
"retryJob": self.redisClient.register_script(self.getScript("retryJob-11.lua")),
"moveJobsToWait": self.redisClient.register_script(self.getScript("moveJobsToWait-7.lua")),
"moveJobsToWait": self.redisClient.register_script(self.getScript("moveJobsToWait-8.lua")),
"saveStacktrace": self.redisClient.register_script(self.getScript("saveStacktrace-1.lua")),
"updateData": self.redisClient.register_script(self.getScript("updateData-1.lua")),
"updateProgress": self.redisClient.register_script(self.getScript("updateProgress-3.lua")),
Expand Down Expand Up @@ -119,7 +119,7 @@ def addStandardJob(self, job: Job, timestamp: int, pipe = None):
Add a standard job to the queue
"""
keys = self.getKeys(['wait', 'paused', 'meta', 'id',
'completed', 'events', 'marker'])
'completed', 'active', 'events', 'marker'])
args = self.addJobArgs(job, None)
args.append(timestamp)

Expand All @@ -141,7 +141,7 @@ def addPrioritizedJob(self, job: Job, timestamp: int, pipe = None):
Add a prioritized job to the queue
"""
keys = self.getKeys(['marker', 'meta', 'id',
'prioritized', 'completed', 'events', 'pc'])
'prioritized', 'completed', 'active', 'events', 'pc'])
args = self.addJobArgs(job, None)
args.append(timestamp)

Expand Down Expand Up @@ -285,7 +285,7 @@ async def moveToDelayed(self, job_id: str, timestamp: int, delay: int, token: st
return None

def promoteArgs(self, job_id: str):
keys = self.getKeys(['delayed', 'wait', 'paused', 'meta', 'prioritized', 'pc', 'events', 'marker'])
keys = self.getKeys(['delayed', 'wait', 'paused', 'meta', 'prioritized', 'active', 'pc', 'events', 'marker'])
keys.append(self.toKey(job_id))
keys.append(self.keys['events'])
keys.append(self.keys['paused'])
Expand All @@ -306,7 +306,7 @@ async def promote(self, job_id: str):
return None

def remove(self, job_id: str, remove_children: bool):
keys = self.getKeys([''])
keys = self.getKeys(['', 'meta'])
args = [job_id, 1 if remove_children else 0]

return self.commands["removeJob"](keys=keys, args=args)
Expand Down Expand Up @@ -363,6 +363,7 @@ async def changePriority(self, job_id: str, priority:int = 0, lifo:bool = False)
self.keys['paused'],
self.keys['meta'],
self.keys['prioritized'],
self.keys['active'],
self.keys['pc'],
self.keys['marker']]

Expand Down Expand Up @@ -394,6 +395,7 @@ async def reprocessJob(self, job: Job, state: str):
keys.append(self.keys['wait'])
keys.append(self.keys['meta'])
keys.append(self.keys['paused'])
keys.append(self.keys['active'])
keys.append(self.keys['marker'])

args = [
Expand Down Expand Up @@ -434,7 +436,7 @@ async def obliterate(self, count: int, force: bool = False):

def moveJobsToWaitArgs(self, state: str, count: int, timestamp: int) -> int:
keys = self.getKeys(
['', 'events', state, 'wait', 'paused', 'meta', 'marker'])
['', 'events', state, 'wait', 'paused', 'meta', 'active', 'marker'])

args = [count or 1000, timestamp or round(time.time()*1000), state]
return (keys, args)
Expand Down
33 changes: 33 additions & 0 deletions src/classes/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,32 @@ export class Queue<
});
}

/**
* Get global concurrency value.
* Returns null in case no value is set.
*/
async getGlobalConcurrency():Promise<number|null> {
const client = await this.client;
const concurrency = await client.hget(this.keys.meta, 'concurrency');
if(concurrency){
return Number(concurrency);
}
return null;
}

/**
* Enable and set global concurrency value.
* @param concurrency - Maximum number of simultaneous jobs that the workers can handle.
* For instance, setting this value to 1 ensures that no more than one job
* is processed at any given time. If this limit is not defined, there will be no
* restriction on the number of concurrent jobs.
*/
async setGlobalConcurrency(concurrency: number) {
const client = await this.client;
return client.hset(this.keys.meta, 'concurrency', concurrency);
}


/**
* Adds a new job to the queue.
*
Expand Down Expand Up @@ -301,6 +327,13 @@ export class Queue<
return pausedKeyExists === 1;
}

/**
* Returns true if the queue is currently maxed.
*/
isMaxed(): Promise<boolean> {
return this.scripts.isMaxed();
}

/**
* Get all repeatable meta jobs.
*
Expand Down
30 changes: 27 additions & 3 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ export class Scripts {
queueKeys.id,
queueKeys.prioritized,
queueKeys.completed,
queueKeys.active,
queueKeys.events,
queueKeys.pc,
];
Expand Down Expand Up @@ -148,6 +149,7 @@ export class Scripts {
queueKeys.meta,
queueKeys.id,
queueKeys.completed,
queueKeys.active,
queueKeys.events,
queueKeys.marker,
];
Expand Down Expand Up @@ -283,7 +285,9 @@ export class Scripts {
async remove(jobId: string, removeChildren: boolean): Promise<number> {
const client = await this.queue.client;

const keys: (string | number)[] = [''].map(name => this.queue.toKey(name));
const keys: (string | number)[] = ['', 'meta'].map(name =>
this.queue.toKey(name),
);
return (<any>client).removeJob(
keys.concat([jobId, removeChildren ? 1 : 0]),
);
Expand Down Expand Up @@ -614,7 +618,9 @@ export class Scripts {
return (<any>client).getCounts(args);
}

protected getCountsPerPriorityArgs(priorities: number[]): (string | number)[] {
protected getCountsPerPriorityArgs(
priorities: number[],
): (string | number)[] {
const keys: (string | number)[] = [
this.queue.keys.wait,
this.queue.keys.paused,
Expand Down Expand Up @@ -772,6 +778,7 @@ export class Scripts {
this.queue.keys.paused,
this.queue.keys.meta,
this.queue.keys.prioritized,
this.queue.keys.active,
this.queue.keys.pc,
this.queue.keys.marker,
];
Expand Down Expand Up @@ -850,6 +857,20 @@ export class Scripts {
]);
}

isMaxedArgs(): string[] {
const queueKeys = this.queue.keys;
const keys: string[] = [queueKeys.meta, queueKeys.active];

return keys;
}

async isMaxed(): Promise<boolean> {
const client = await this.queue.client;

const args = this.isMaxedArgs();
return !!(await (<any>client).isMaxed(args));
}

async moveToDelayed(
jobId: string,
timestamp: number,
Expand Down Expand Up @@ -984,6 +1005,7 @@ export class Scripts {
this.queue.toKey('wait'),
this.queue.toKey('paused'),
this.queue.keys.meta,
this.queue.keys.active,
this.queue.keys.marker,
];

Expand Down Expand Up @@ -1038,6 +1060,7 @@ export class Scripts {
this.queue.keys.wait,
this.queue.keys.meta,
this.queue.keys.paused,
this.queue.keys.active,
this.queue.keys.marker,
];

Expand Down Expand Up @@ -1108,6 +1131,7 @@ export class Scripts {
this.queue.keys.paused,
this.queue.keys.meta,
this.queue.keys.prioritized,
this.queue.keys.active,
this.queue.keys.pc,
this.queue.keys.events,
this.queue.keys.marker,
Expand Down Expand Up @@ -1179,7 +1203,7 @@ export class Scripts {
const client = await this.queue.client;
const lockKey = `${this.queue.toKey(jobId)}:lock`;

const keys = [
const keys: (string | number)[] = [
this.queue.keys.active,
this.queue.keys.wait,
this.queue.keys.stalled,
Expand Down
2 changes: 1 addition & 1 deletion src/classes/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -619,7 +619,7 @@ will never work with more accuracy than 1ms. */

let timeout: NodeJS.Timeout;
try {
if (!this.closing) {
if (!this.closing && !this.limitUntil) {
let blockTimeout = this.getBlockTimeout(blockUntil);

if (blockTimeout > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@
KEYS[3] 'id'
KEYS[4] 'prioritized'
KEYS[5] 'completed'
KEYS[6] events stream key
KEYS[7] 'pc' priority counter
KEYS[6] 'active'
KEYS[7] events stream key
KEYS[8] 'pc' priority counter
ARGV[1] msgpacked arguments array
[1] key prefix,
Expand All @@ -36,8 +37,9 @@ local idKey = KEYS[3]
local priorityKey = KEYS[4]

local completedKey = KEYS[5]
local eventsKey = KEYS[6]
local priorityCounterKey = KEYS[7]
local activeKey = KEYS[6]
local eventsKey = KEYS[7]
local priorityCounterKey = KEYS[8]

local jobId
local jobIdKey
Expand All @@ -58,7 +60,7 @@ local parentData
--- @include "includes/storeJob"
--- @include "includes/getOrSetMaxEvents"
--- @include "includes/handleDuplicatedJob"
--- @include "includes/isQueuePaused"
--- @include "includes/isQueuePausedOrMaxed"

if parentKey ~= nil then
if rcall("EXISTS", parentKey) ~= 1 then return -5 end
Expand Down Expand Up @@ -91,8 +93,8 @@ local delay, priority = storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2],
repeatJobKey)

-- Add the job to the prioritized set
local isPause = isQueuePaused(metaKey)
addJobWithPriority( KEYS[1], priorityKey, priority, jobId, priorityCounterKey, isPause)
local isPausedOrMaxed = isQueuePausedOrMaxed(metaKey, activeKey)
addJobWithPriority( KEYS[1], priorityKey, priority, jobId, priorityCounterKey, isPausedOrMaxed)

-- Emit waiting event
rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "waiting",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@
KEYS[3] 'meta'
KEYS[4] 'id'
KEYS[5] 'completed'
KEYS[6] events stream key
KEYS[7] marker key
KEYS[6] 'active'
KEYS[7] events stream key
KEYS[8] marker key
ARGV[1] msgpacked arguments array
[1] key prefix,
Expand All @@ -41,7 +42,7 @@
jobId - OK
-5 - Missing parent key
]]
local eventsKey = KEYS[6]
local eventsKey = KEYS[7]

local jobId
local jobIdKey
Expand Down Expand Up @@ -94,11 +95,11 @@ end
storeJob(eventsKey, jobIdKey, jobId, args[3], ARGV[2], opts, timestamp,
parentKey, parentData, repeatJobKey)

local target, paused = getTargetQueueList(metaKey, KEYS[1], KEYS[2])
local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[6], KEYS[1], KEYS[2])

-- LIFO or FIFO
local pushCmd = opts['lifo'] and 'RPUSH' or 'LPUSH'
addJobInTargetList(target, KEYS[7], pushCmd, paused, jobId)
addJobInTargetList(target, KEYS[8], pushCmd, isPausedOrMaxed, jobId)

-- Emit waiting event
rcall("XADD", eventsKey, "MAXLEN", "~", maxEvents, "*", "event", "waiting",
Expand Down
Loading

0 comments on commit 47ba055

Please sign in to comment.