Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: keep jobs in waiting list when queue is paused #2769

Merged
merged 41 commits into from
Nov 19, 2024
Merged

Conversation

roggervalf
Copy link
Collaborator

@roggervalf roggervalf commented Sep 12, 2024

This PR removes the legacy code regarding the old paused queue functionality, which was based on renaming the wait list key to paused. As paused keys are now handled as a queue state, we do not need the old mechanism anymore and so the code can be removed and simplified. We need some migration steps for old queues that may be in paused status at the time of the upgrade.

@roggervalf roggervalf changed the base branch from master to v6 September 12, 2024 05:43
@roggervalf roggervalf changed the base branch from v6 to master September 13, 2024 00:42
@roggervalf roggervalf changed the base branch from master to v6 September 13, 2024 01:42
@roggervalf roggervalf changed the base branch from v6 to master September 14, 2024 02:51
Copy link
Contributor

@manast manast left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some initial comments, more will come.

@@ -16,13 +16,13 @@ const originalTree = await flow.add({
name,
data: { idx: 0, foo: 'bar' },
queueName: 'childrenQueueName',
opts: { failParentOnFailure: true },
opts: { onChildFailure: 'fail' },
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a nice change.

@@ -451,6 +462,15 @@ async def retryJobs(self, state: str, count: int, timestamp: int):
result = await self.commands["moveJobsToWait"](keys=keys, args=args)
return result

async def repairDeprecatedPausedKey(self, maxCount: int):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need proper documentation on how and when a call to this method is needed.

@@ -39,10 +39,8 @@ const logger = debuglog('bull');

const optsDecodeMap = {
de: 'debounce',
fpof: 'failParentOnFailure',
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't we need to keep this old mappings to not cause a data breaking change?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should introduce a "migration" mechanism here. Something like migration steps for going between versions that are run if required in Lua scripts for atomicity. But we would need to keep a version number in the meta key, adding complexity.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can keep this mappings that will be only used to format options of jobs with these old values, but no migration is needed as we are evaluation old and new option in lua scripts, new jobs will use new option

@@ -1210,27 +1208,6 @@ export class Job<
throw new Error(`Delay and repeat options could not be used together`);
}

if (this.opts.removeDependencyOnFailure && this.opts.failParentOnFailure) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice to be able to remove this

/**
* Remove legacy markers before v5
*/
removeLegacyMarkers(): Promise<void> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Documentation about this is also needed. Most importantly practical information in how to apply something like this in the context of a production deployment.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a me migrations section for it

*
* @param maxCount - Max quantity of jobs to be moved to wait per iteration.
*/
async repairDeprecatedPausedKey(maxCount: number = 1000): Promise<void> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should "repair" be the proper name or actually something like "migrate" would be more accurate?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can change it, let me do it

@roggervalf roggervalf changed the base branch from master to v6 September 20, 2024 12:28
@roggervalf roggervalf changed the base branch from v6 to master September 21, 2024 04:31
@roggervalf roggervalf changed the base branch from master to v6 September 21, 2024 14:10
@roggervalf roggervalf changed the base branch from v6 to master September 24, 2024 03:54
@roggervalf roggervalf changed the base branch from master to v6 September 24, 2024 04:24
@roggervalf roggervalf changed the base branch from v6 to master September 26, 2024 04:58
@roggervalf roggervalf changed the base branch from master to v6 September 26, 2024 04:59
@roggervalf roggervalf changed the base branch from v6 to master September 27, 2024 01:29
@roggervalf roggervalf changed the base branch from master to v6 October 8, 2024 04:26
@roggervalf roggervalf changed the base branch from v6 to master October 12, 2024 19:50
@roggervalf roggervalf changed the base branch from master to v6 October 18, 2024 05:04

If you have paused queues after upgrading to this version. These jobs will be moved to wait state when initializing any of our instances (Worker, Queue, QueueEvents or FlowProducer).

Paused key is not longer needed as this state is already represented by queue meta key. It also improve the process of pausing or resuming a queue as we don't need to rename any key.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

by meta key -> inside the meta key

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"It also improves..."


Paused key is not longer needed as this state is already represented by queue meta key. It also improve the process of pausing or resuming a queue as we don't need to rename any key.

## Remove legacy markers
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we are going to need more detailed information on how to safely perform the migration.


# Migration to v6

Make sure to pass **skipMigrationsExecution** option in any of our instances as false in order to execute all necessary changes when coming from an older version
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you think default should be perform the migration? I think it makes more sense to have the reverse, default is not to do the migration, you must explicitly do it, In fact it would be better to give an error if the queue has not been migrated yet and require you to manually migrate first as part of your deployment steps.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So basically we should have only one way to migrate which would be using the run migrations utility, and not implicitly when the queue is instantiated.


1. Pause your queues.
2. Upgrade to v6.
3. Instantiate any instance passing skipMigrationsExecution option as false where migrations will be executed.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it feels a bit awkward to require to use false to actually trigger a one time behaviour and then after that you must use true.

'Queue has pending migrations. See https://docs.bullmq.io/guide/migrations',
);
} else {
return runMigrations(client, {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not think we should silently run migrations here, this just opens a window to shoot ourselves in the foot.

*
* @param maxCount - Max quantity of jobs to be moved to wait per iteration.
*/
async migrateDeprecatedPausedKey(maxCount = 1000): Promise<void> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be moved to a migration step.

ARGV[1] count
]]

local maxCount = tonumber(ARGV[1])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not understand this script. It should not be possible to have both wait and paused keys at the same time, either it is wait or paused but not both, so basically just renaming the key from paused to wait should be enough as a migration step + setting the queue status to paused.

@roggervalf roggervalf changed the base branch from v6 to master November 16, 2024 14:56
@roggervalf roggervalf changed the base branch from master to v6 November 16, 2024 18:08
Copy link
Contributor

@manast manast left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a small comment in the python version.

@@ -465,6 +463,15 @@ async def retryJobs(self, state: str, count: int, timestamp: int):
result = await self.commands["moveJobsToWait"](keys=keys, args=args)
return result

async def migrateDeprecatedPausedKey(self, maxCount: int):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need this? I think for migration users should use the migration script available in the NodeJs for now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But what I think we need in the python version is to throw an exception if you are trying to run this version on an older BullMQ version that has not yet been migrated.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll add this logic in a following pr

@roggervalf roggervalf changed the base branch from v6 to master November 19, 2024 03:19
@roggervalf roggervalf changed the base branch from master to v6 November 19, 2024 03:38
@roggervalf roggervalf merged commit d304267 into v6 Nov 19, 2024
12 checks passed
@roggervalf roggervalf deleted the refactor-paused branch November 19, 2024 03:40
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants