forked from procrastinate-org/procrastinate
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request procrastinate-org#1070 from openradx/job-priority
- Loading branch information
Showing
25 changed files
with
401 additions
and
35 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
# Provide a job priority | ||
|
||
We can assign an optional priority to a job. Jobs with higher priority will be | ||
preferred by a worker. Priority is represented as an (positive or negative) | ||
integer, where a larger number indicates a higher priority. If no priority is | ||
specified, it defaults to 0. | ||
|
||
Priority is used as a way to order available jobs. If a procrastinate worker | ||
requests a job, and there is a high-priority job scheduled that is blocked by a | ||
lock, and a low-priority job that is available, the worker will take | ||
the low-priority job. Procrastinate will never wait for a high-priority job to | ||
become available if there are lower-priority jobs already available. | ||
|
||
## From the code | ||
|
||
Launch a task with a priority of 5: | ||
|
||
```python | ||
my_task.configure(priority=5).defer() | ||
``` | ||
|
||
## From the command line | ||
|
||
```console | ||
$ procrastinate defer --priority=5 path.to.my_task | ||
``` | ||
|
||
:::{warning} | ||
If your setup involves a continuous influx of jobs where workers are | ||
perpetually busy (i.e., jobs are always queuing and workers are never idle), | ||
using priorities could lead to excessive delays in job execution. For example, | ||
if you have a job assigned a priority of -1 while all other jobs have a | ||
priority of 0, the lower priority job might experience significant delays. In | ||
a continuously busy system, this job could potentially take months to execute. | ||
::: |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
14 changes: 14 additions & 0 deletions
14
procrastinate/contrib/django/migrations/0026_add_job_priority.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
from __future__ import annotations | ||
|
||
from django.db import migrations, models | ||
|
||
from .. import migrations_utils | ||
|
||
|
||
class Migration(migrations.Migration): | ||
operations = [ | ||
migrations_utils.RunProcrastinateSQL(name="02.00.03_01_add_job_priority.sql"), | ||
migrations.AddField("procrastinatejob", "priority", models.IntegerField()), | ||
] | ||
name = "0026_add_job_priority" | ||
dependencies = [("procrastinate", "0025_add_models")] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
121 changes: 121 additions & 0 deletions
121
procrastinate/sql/migrations/02.00.03_01_add_job_priority.sql
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
ALTER TABLE procrastinate_jobs ADD COLUMN priority integer DEFAULT 0 NOT NULL; | ||
|
||
CREATE OR REPLACE FUNCTION procrastinate_defer_job( | ||
queue_name character varying, | ||
task_name character varying, | ||
priority integer, | ||
lock text, | ||
queueing_lock text, | ||
args jsonb, | ||
scheduled_at timestamp with time zone | ||
) | ||
RETURNS bigint | ||
LANGUAGE plpgsql | ||
AS $$ | ||
DECLARE | ||
job_id bigint; | ||
BEGIN | ||
INSERT INTO procrastinate_jobs (queue_name, task_name, priority, lock, queueing_lock, args, scheduled_at) | ||
VALUES (queue_name, task_name, priority, lock, queueing_lock, args, scheduled_at) | ||
RETURNING id INTO job_id; | ||
|
||
RETURN job_id; | ||
END; | ||
$$; | ||
|
||
DROP FUNCTION IF EXISTS procrastinate_defer_periodic_job(character varying, character varying, character varying, character varying, character varying, bigint, jsonb); | ||
CREATE OR REPLACE FUNCTION procrastinate_defer_periodic_job( | ||
_queue_name character varying, | ||
_lock character varying, | ||
_queueing_lock character varying, | ||
_task_name character varying, | ||
_periodic_id character varying, | ||
_defer_timestamp bigint, | ||
_args jsonb | ||
) | ||
RETURNS bigint | ||
LANGUAGE plpgsql | ||
AS $$ | ||
DECLARE | ||
_job_id bigint; | ||
_defer_id bigint; | ||
BEGIN | ||
|
||
INSERT | ||
INTO procrastinate_periodic_defers (task_name, periodic_id, defer_timestamp) | ||
VALUES (_task_name, _periodic_id, _defer_timestamp) | ||
ON CONFLICT DO NOTHING | ||
RETURNING id into _defer_id; | ||
|
||
IF _defer_id IS NULL THEN | ||
RETURN NULL; | ||
END IF; | ||
|
||
UPDATE procrastinate_periodic_defers | ||
SET job_id = procrastinate_defer_job( | ||
_queue_name, | ||
_task_name, | ||
0, | ||
_lock, | ||
_queueing_lock, | ||
_args, | ||
NULL | ||
) | ||
WHERE id = _defer_id | ||
RETURNING job_id INTO _job_id; | ||
|
||
DELETE | ||
FROM procrastinate_periodic_defers | ||
USING ( | ||
SELECT id | ||
FROM procrastinate_periodic_defers | ||
WHERE procrastinate_periodic_defers.task_name = _task_name | ||
AND procrastinate_periodic_defers.periodic_id = _periodic_id | ||
AND procrastinate_periodic_defers.defer_timestamp < _defer_timestamp | ||
ORDER BY id | ||
FOR UPDATE | ||
) to_delete | ||
WHERE procrastinate_periodic_defers.id = to_delete.id; | ||
|
||
RETURN _job_id; | ||
END; | ||
$$; | ||
|
||
DROP FUNCTION IF EXISTS procrastinate_fetch_job(character varying[]); | ||
CREATE OR REPLACE FUNCTION procrastinate_fetch_job( | ||
target_queue_names character varying[] | ||
) | ||
RETURNS procrastinate_jobs | ||
LANGUAGE plpgsql | ||
AS $$ | ||
DECLARE | ||
found_jobs procrastinate_jobs; | ||
BEGIN | ||
WITH candidate AS ( | ||
SELECT jobs.* | ||
FROM procrastinate_jobs AS jobs | ||
WHERE | ||
-- reject the job if its lock has earlier jobs | ||
NOT EXISTS ( | ||
SELECT 1 | ||
FROM procrastinate_jobs AS earlier_jobs | ||
WHERE | ||
jobs.lock IS NOT NULL | ||
AND earlier_jobs.lock = jobs.lock | ||
AND earlier_jobs.status IN ('todo', 'doing') | ||
AND earlier_jobs.id < jobs.id) | ||
AND jobs.status = 'todo' | ||
AND (target_queue_names IS NULL OR jobs.queue_name = ANY( target_queue_names )) | ||
AND (jobs.scheduled_at IS NULL OR jobs.scheduled_at <= now()) | ||
ORDER BY jobs.priority DESC, jobs.id ASC LIMIT 1 | ||
FOR UPDATE OF jobs SKIP LOCKED | ||
) | ||
UPDATE procrastinate_jobs | ||
SET status = 'doing' | ||
FROM candidate | ||
WHERE procrastinate_jobs.id = candidate.id | ||
RETURNING procrastinate_jobs.* INTO found_jobs; | ||
|
||
RETURN found_jobs; | ||
END; | ||
$$; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.