perf: keep jobs in waiting list when queue is paused #2769
@@ -0,0 +1,25 @@
---
description: Tips and hints on how to migrate to v6.
---

# Migration to v6

Make sure to call the **runMigrations** method of the Queue class so that all the necessary changes are executed when coming from an older version.
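For instance, a minimal sketch of what this call could look like (the queue name and connection options are placeholders, and we assume runMigrations takes no arguments, which this PR does not spell out):

```ts
import { Queue } from 'bullmq';

const queue = new Queue('my-queue', {
  connection: { host: 'localhost', port: 6379 },
});

// Execute all pending migrations (e.g. moving jobs out of the deprecated
// paused key) before any workers are started.
await queue.runMigrations();

await queue.close();
```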
## Migration of deprecated paused key

If you have paused queues when upgrading to this version, their jobs will be moved to the wait state as soon as any of our instances (Worker, Queue, QueueEvents or FlowProducer) is initialized.

The paused key is no longer needed, as this state is already represented inside the meta key. This also makes pausing or resuming a queue cheaper, since we no longer need to rename any key.
## Remove legacy markers

When migrating from versions older than v5, it is recommended to follow this process (see the sketch at the end of this section):

1. Pause your queues.
2. Upgrade to v6.
3. Instantiate a Queue instance and execute the runMigrations method, which will run all the migrations.
4. Resume your queues.

This way you prevent your workers from picking up a legacy marker that is no longer used, since the new markers are stored in a different structure.
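A sketch of these steps in code, under the same assumptions as above (pause, resume and close are the standard Queue methods; runMigrations is the method this guide describes):

```ts
import { Queue } from 'bullmq';

async function migrateToV6(name: string) {
  const queue = new Queue(name, {
    connection: { host: 'localhost', port: 6379 },
  });

  await queue.pause(); // 1. stop workers from picking up new jobs
  // 2. deploy the v6 upgrade to all producers and workers at this point
  await queue.runMigrations(); // 3. run all pending migrations
  await queue.resume(); // 4. workers can safely pick up jobs again

  await queue.close();
}
```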
@@ -1,4 +1,4 @@
# Redis™ Compatibility

-There are several alternatives to Redis, and even though BullMQ is fully Redis™ compliant, not all of these alternatives are going to work properly. In this section we present the vendors that officially support BullMQ and that we regularly test to verify they stay compatible.
+There are several alternatives to Redis, and even though BullMQ is fully Redis™ compliant with version 6.2.0 or newer, not all of these alternatives are going to work properly. In this section we present the vendors that officially support BullMQ and that we regularly test to verify they stay compatible.
@@ -38,11 +38,11 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection
        self.redisConnection = redisConnection
        self.redisClient = redisConnection.conn
        self.commands = {
-           "addStandardJob": self.redisClient.register_script(self.getScript("addStandardJob-8.lua")),
+           "addStandardJob": self.redisClient.register_script(self.getScript("addStandardJob-7.lua")),
            "addDelayedJob": self.redisClient.register_script(self.getScript("addDelayedJob-6.lua")),
            "addParentJob": self.redisClient.register_script(self.getScript("addParentJob-4.lua")),
            "addPrioritizedJob": self.redisClient.register_script(self.getScript("addPrioritizedJob-8.lua")),
-           "changePriority": self.redisClient.register_script(self.getScript("changePriority-7.lua")),
+           "changePriority": self.redisClient.register_script(self.getScript("changePriority-6.lua")),
            "cleanJobsInSet": self.redisClient.register_script(self.getScript("cleanJobsInSet-3.lua")),
            "extendLock": self.redisClient.register_script(self.getScript("extendLock-2.lua")),
            "getCounts": self.redisClient.register_script(self.getScript("getCounts-1.lua")),
@@ -51,18 +51,19 @@ def __init__(self, prefix: str, queueName: str, redisConnection: RedisConnection
            "getState": self.redisClient.register_script(self.getScript("getState-8.lua")),
            "getStateV2": self.redisClient.register_script(self.getScript("getStateV2-8.lua")),
            "isJobInList": self.redisClient.register_script(self.getScript("isJobInList-1.lua")),
-           "moveStalledJobsToWait": self.redisClient.register_script(self.getScript("moveStalledJobsToWait-9.lua")),
-           "moveToActive": self.redisClient.register_script(self.getScript("moveToActive-11.lua")),
+           "moveStalledJobsToWait": self.redisClient.register_script(self.getScript("moveStalledJobsToWait-8.lua")),
+           "moveToActive": self.redisClient.register_script(self.getScript("moveToActive-10.lua")),
            "moveToDelayed": self.redisClient.register_script(self.getScript("moveToDelayed-8.lua")),
-           "moveToFinished": self.redisClient.register_script(self.getScript("moveToFinished-14.lua")),
+           "moveToFinished": self.redisClient.register_script(self.getScript("moveToFinished-13.lua")),
            "moveToWaitingChildren": self.redisClient.register_script(self.getScript("moveToWaitingChildren-5.lua")),
            "obliterate": self.redisClient.register_script(self.getScript("obliterate-2.lua")),
            "pause": self.redisClient.register_script(self.getScript("pause-7.lua")),
-           "promote": self.redisClient.register_script(self.getScript("promote-9.lua")),
+           "promote": self.redisClient.register_script(self.getScript("promote-8.lua")),
            "removeJob": self.redisClient.register_script(self.getScript("removeJob-2.lua")),
-           "reprocessJob": self.redisClient.register_script(self.getScript("reprocessJob-8.lua")),
-           "retryJob": self.redisClient.register_script(self.getScript("retryJob-11.lua")),
-           "moveJobsToWait": self.redisClient.register_script(self.getScript("moveJobsToWait-8.lua")),
+           "migrateDeprecatedPausedKey": self.redisClient.register_script(self.getScript("migrateDeprecatedPausedKey-2.lua")),
+           "reprocessJob": self.redisClient.register_script(self.getScript("reprocessJob-7.lua")),
+           "retryJob": self.redisClient.register_script(self.getScript("retryJob-10.lua")),
+           "moveJobsToWait": self.redisClient.register_script(self.getScript("moveJobsToWait-7.lua")),
            "saveStacktrace": self.redisClient.register_script(self.getScript("saveStacktrace-1.lua")),
            "updateData": self.redisClient.register_script(self.getScript("updateData-1.lua")),
            "updateProgress": self.redisClient.register_script(self.getScript("updateProgress-3.lua")),
@@ -131,7 +132,7 @@ def addStandardJob(self, job: Job, timestamp: int, pipe = None):
        """
        Add a standard job to the queue
        """
-       keys = self.getKeys(['wait', 'paused', 'meta', 'id',
+       keys = self.getKeys(['wait', 'meta', 'id',
                             'completed', 'active', 'events', 'marker'])
        args = self.addJobArgs(job, None)
        args.append(timestamp)
@@ -259,15 +260,15 @@ def saveStacktraceArgs(self, job_id: str, stacktrace: str, failedReason: str):
        return (keys, args)

    def retryJobArgs(self, job_id: str, lifo: bool, token: str, opts: dict = {}):
-       keys = self.getKeys(['active', 'wait', 'paused'])
+       keys = self.getKeys(['active', 'wait'])
        keys.append(self.toKey(job_id))
        keys.append(self.keys['meta'])
        keys.append(self.keys['events'])
        keys.append(self.keys['delayed'])
        keys.append(self.keys['prioritized'])
        keys.append(self.keys['pc'])
-       keys.append(self.keys['marker'])
        keys.append(self.keys['stalled'])
+       keys.append(self.keys['marker'])

        push_cmd = "RPUSH" if lifo else "LPUSH"
@@ -302,7 +303,6 @@ def promoteArgs(self, job_id: str):
        keys = self.getKeys(['delayed', 'wait', 'paused', 'meta', 'prioritized', 'active', 'pc', 'events', 'marker'])
        keys.append(self.toKey(job_id))
        keys.append(self.keys['events'])
-       keys.append(self.keys['paused'])
        keys.append(self.keys['meta'])

        args = [self.keys[''], job_id]
@@ -374,7 +374,6 @@ async def isJobInList(self, list_key: str, job_id: str):

    async def changePriority(self, job_id: str, priority: int = 0, lifo: bool = False):
        keys = [self.keys['wait'],
-               self.keys['paused'],
                self.keys['meta'],
                self.keys['prioritized'],
                self.keys['active'],
@@ -408,7 +407,6 @@ async def reprocessJob(self, job: Job, state: str):
        keys.append(self.keys[state])
        keys.append(self.keys['wait'])
        keys.append(self.keys['meta'])
-       keys.append(self.keys['paused'])
        keys.append(self.keys['active'])
        keys.append(self.keys['marker'])

@@ -450,7 +448,7 @@ async def obliterate(self, count: int, force: bool = False):

    def moveJobsToWaitArgs(self, state: str, count: int, timestamp: int) -> int:
        keys = self.getKeys(
-           ['', 'events', state, 'wait', 'paused', 'meta', 'active', 'marker'])
+           ['', 'events', state, 'wait', 'meta', 'active', 'marker'])

        args = [count or 1000, timestamp or round(time.time()*1000), state]
        return (keys, args)
@@ -465,6 +463,15 @@ async def retryJobs(self, state: str, count: int, timestamp: int):
        result = await self.commands["moveJobsToWait"](keys=keys, args=args)
        return result

+   async def migrateDeprecatedPausedKey(self, maxCount: int):
+       keys = self.getKeys(
+           ['paused', 'wait'])
+
+       args = [maxCount]
+
+       result = await self.commands["migrateDeprecatedPausedKey"](keys=keys, args=args)
+       return result
+
    async def promoteJobs(self, count: int):
        """
        Promote jobs in delayed state

Review comment: Do we really need this? I think that, for the migration, users should use the migration script available in the Node.js version for now.

Review comment: But what I think we do need in the Python version is to throw an exception if you try to run this version against an older BullMQ deployment that has not been migrated yet.

Reply: I'll add this logic in a following PR.
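The exception the second comment asks for could look roughly like this (a hypothetical sketch, not the actual BullMQ API: assertMigrated is an invented name, and we assume the pre-v6 paused list lives at `<prefix>:<queueName>:paused`):

```ts
import IORedis from 'ioredis';

// Hypothetical guard: fail fast when a deprecated paused key is still
// present, i.e. the Redis data has not been migrated to v6 yet.
async function assertMigrated(
  connection: IORedis,
  prefix: string,
  queueName: string,
): Promise<void> {
  const pausedKey = `${prefix}:${queueName}:paused`; // assumed key layout
  if (await connection.exists(pausedKey)) {
    throw new Error(
      `Queue "${queueName}" still has the deprecated paused key; ` +
        'run the v6 migrations before using this client version.',
    );
  }
}
```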
@@ -483,7 +490,7 @@ async def moveToActive(self, token: str, opts: dict) -> list[Any]:
        limiter = opts.get("limiter", None)

        keys = self.getKeys(['wait', 'active', 'prioritized', 'events',
-                            'stalled', 'limiter', 'delayed', 'paused', 'meta', 'pc', 'marker'])
+                            'stalled', 'limiter', 'delayed', 'meta', 'pc', 'marker'])
        packedOpts = msgpack.packb(
            {"token": token, "lockDuration": lockDuration, "limiter": limiter}, use_bin_type=True)
        args = [self.keys[''], timestamp, packedOpts]
@@ -516,7 +523,7 @@ def moveToFinishedArgs(self, job: Job, val: Any, propVal: str, shouldRemove, tar
        metricsKey = self.toKey('metrics:' + target)

        keys = self.getKeys(['wait', 'active', 'prioritized', 'events',
-                            'stalled', 'limiter', 'delayed', 'paused', 'meta', 'pc', target])
+                            'stalled', 'limiter', 'delayed', 'meta', 'pc', target])
        keys.append(self.toKey(job.id))
        keys.append(metricsKey)
        keys.append(self.keys['marker'])
@@ -580,7 +587,7 @@ def extendLock(self, jobId: str, token: str, duration: int, client: Redis = None

    def moveStalledJobsToWait(self, maxStalledCount: int, stalledInterval: int):
        keys = self.getKeys(['stalled', 'wait', 'active', 'failed',
-                            'stalled-check', 'meta', 'paused', 'marker', 'events'])
+                            'stalled-check', 'meta', 'marker', 'events'])
        args = [maxStalledCount, self.keys[''], round(
            time.time() * 1000), stalledInterval]
        return self.commands["moveStalledJobsToWait"](keys, args)
@@ -41,10 +41,10 @@ const logger = debuglog('bull');

const optsDecodeMap = {
  de: 'deduplication',
- ocf: 'onChildFailure',
  fpof: 'failParentOnFailure',
  idof: 'ignoreDependencyOnFailure',
  kl: 'keepLogs',
+ ocf: 'onChildFailure',
  rdof: 'removeDependencyOnFailure',
  tm: 'telemetryMetadata'
};

Review comment: Won't we need to keep these old mappings to avoid a data-breaking change?

Review comment: Maybe we should introduce a "migration" mechanism here, something like migration steps for going between versions that are run, if required, in Lua scripts for atomicity. But we would need to keep a version number in the meta key, which adds complexity.

Reply: We can keep these mappings; they will only be used to format the options of jobs that still carry the old values. No migration is needed, as we evaluate both the old and the new option in the Lua scripts, and new jobs will use the new option.
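To illustrate the reply above, this is roughly how such a decode map can serve both the legacy keys and the new ocf key when a job's stored options are read back (a simplified sketch, not the exact BullMQ internals):

```ts
const optsDecodeMap: Record<string, string> = {
  de: 'deduplication',
  fpof: 'failParentOnFailure', // legacy, kept so existing jobs still decode
  idof: 'ignoreDependencyOnFailure', // legacy
  kl: 'keepLogs',
  ocf: 'onChildFailure', // new consolidated option, used by new jobs
  rdof: 'removeDependencyOnFailure', // legacy
  tm: 'telemetryMetadata',
};

// Expand abbreviated option keys as stored in Redis into their long form;
// unknown keys are passed through unchanged.
function decodeOpts(raw: Record<string, unknown>): Record<string, unknown> {
  const decoded: Record<string, unknown> = {};
  for (const [key, value] of Object.entries(raw)) {
    decoded[optsDecodeMap[key] ?? key] = value;
  }
  return decoded;
}
```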
Review comment: I think we are going to need more detailed information on how to safely perform the migration.