Skip to content

Commit

Permalink
[IMP] queue_job: remove cron garbage collector and automatically requ…
Browse files Browse the repository at this point in the history
…eue jobs in timeout

[IMP] queue_job: increment 'retry' when re-queuing job that have been killed
  • Loading branch information
AnizR committed Dec 9, 2024
1 parent 20b0e93 commit 13e159b
Show file tree
Hide file tree
Showing 11 changed files with 185 additions and 118 deletions.
13 changes: 1 addition & 12 deletions queue_job/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -137,18 +137,7 @@ Configuration
.. [1] It works with the threaded Odoo server too, although this way
of running Odoo is obviously not for production purposes.
* Be sure to check out *Jobs Garbage Collector* CRON and change *enqueued_delta* and *started_delta* parameters to your needs.

* ``enqueued_delta``: Spent time in minutes after which an enqueued job is considered stuck.
Set it to 0 to disable this check.
* ``started_delta``: Spent time in minutes after which a started job is considered stuck.
This parameter should not be less than ``--limit-time-real // 60`` parameter in your configuration.
Set it to 0 to disable this check. Set it to -1 to automate it, based in the server's ``--limit-time-real`` config parameter.

.. code-block:: python
# `model` corresponds to 'queue.job' model
model.requeue_stuck_jobs(enqueued_delta=1, started_delta=-1)
* Jobs that remain in `enqueued` or `started` state (because, for instance, their worker has been killed) will be automatically re-queued.

Usage
=====
Expand Down
2 changes: 1 addition & 1 deletion queue_job/__manifest__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

{
"name": "Job Queue",
"version": "16.0.2.6.8",
"version": "16.0.2.7.0",
"author": "Camptocamp,ACSONE SA/NV,Odoo Community Association (OCA)",
"website": "https://github.com/OCA/queue",
"license": "LGPL-3",
Expand Down
2 changes: 2 additions & 0 deletions queue_job/controllers/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ def _try_perform_job(self, env, job):
job.set_started()
job.store()
env.cr.commit()
job.lock()

Check warning on line 34 in queue_job/controllers/main.py

View check run for this annotation

Codecov / codecov/patch

queue_job/controllers/main.py#L34

Added line #L34 was not covered by tests

_logger.debug("%s started", job)

job.perform()
Expand Down
11 changes: 0 additions & 11 deletions queue_job/data/queue_data.xml
Original file line number Diff line number Diff line change
@@ -1,17 +1,6 @@
<?xml version="1.0" encoding="utf-8" ?>
<odoo>
<data noupdate="1">
<record id="ir_cron_queue_job_garbage_collector" model="ir.cron">
<field name="name">Jobs Garbage Collector</field>
<field name="interval_number">5</field>
<field name="interval_type">minutes</field>
<field name="numbercall">-1</field>
<field ref="model_queue_job" name="model_id" />
<field name="state">code</field>
<field
name="code"
>model.requeue_stuck_jobs(enqueued_delta=1, started_delta=-1)</field>
</record>
<!-- Queue-job-related subtypes for messaging / Chatter -->
<record id="mt_job_failed" model="mail.message.subtype">
<field name="name">Job failed</field>
Expand Down
46 changes: 46 additions & 0 deletions queue_job/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,32 @@ def load_many(cls, env, job_uuids):
recordset = cls.db_records_from_uuids(env, job_uuids)
return {cls._load_from_db_record(record) for record in recordset}

def lock(self):
self.env.cr.execute(

Check warning on line 242 in queue_job/job.py

View check run for this annotation

Codecov / codecov/patch

queue_job/job.py#L242

Added line #L242 was not covered by tests
"""
SELECT
*
FROM
queue_job_locks
WHERE
id in (
SELECT
id
FROM
queue_job
WHERE
uuid = %s
AND state='started'
)
FOR UPDATE;
""",
[self.uuid],
)

# 1 job should be locked
if not 1 == len(self.env.cr.fetchall()):
raise RetryableJobError("Trying to lock job that wasn't started")

Check warning on line 265 in queue_job/job.py

View check run for this annotation

Codecov / codecov/patch

queue_job/job.py#L265

Added line #L265 was not covered by tests

@classmethod
def _load_from_db_record(cls, job_db_record):
stored = job_db_record
Expand Down Expand Up @@ -517,6 +543,9 @@ def perform(self):
The job is executed with the user which has initiated it.
"""
if self.max_retries and self.retry >= self.max_retries:
raise FailedJobError("Max. retries (%d) reached" % (self.max_retries))

Check warning on line 547 in queue_job/job.py

View check run for this annotation

Codecov / codecov/patch

queue_job/job.py#L547

Added line #L547 was not covered by tests

self.retry += 1
try:
self.result = self.func(*tuple(self.args), **self.kwargs)
Expand Down Expand Up @@ -820,6 +849,23 @@ def set_started(self):
self.date_started = datetime.now()
self.worker_pid = os.getpid()

# add job to list of lockable jobs
self.env.cr.execute(
"""
INSERT INTO
queue_job_locks (id)
SELECT
id
FROM
queue_job
WHERE
uuid = %s
ON CONFLICT(id)
DO NOTHING;
""",
[self.uuid],
)

def set_done(self, result=None):
self.state = DONE
self.exc_name = None
Expand Down
101 changes: 85 additions & 16 deletions queue_job/jobrunner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,22 +114,6 @@
* After creating a new database or installing queue_job on an
existing database, Odoo must be restarted for the runner to detect it.
* When Odoo shuts down normally, it waits for running jobs to finish.
However, when the Odoo server crashes or is otherwise force-stopped,
running jobs are interrupted while the runner has no chance to know
they have been aborted. In such situations, jobs may remain in
``started`` or ``enqueued`` state after the Odoo server is halted.
Since the runner has no way to know if they are actually running or
not, and does not know for sure if it is safe to restart the jobs,
it does not attempt to restart them automatically. Such stale jobs
therefore fill the running queue and prevent other jobs to start.
You must therefore requeue them manually, either from the Jobs view,
or by running the following SQL statement *before starting Odoo*:
.. code-block:: sql
update queue_job set state='pending' where state in ('started', 'enqueued')
.. rubric:: Footnotes
.. [1] From a security standpoint, it is safe to have an anonymous HTTP
Expand Down Expand Up @@ -343,6 +327,85 @@ def set_job_enqueued(self, uuid):
(ENQUEUED, uuid),
)

def requeue_dead_jobs(self):
"""
Set started and enqueued jobs but not locked to pending
A job is locked when it's being executed
When a job is killed, it releases the lock
Adding a buffer on 'date_enqueued' to check
that it has been enqueued for more than 10sec.
This prevents from requeuing jobs before they are actually started.
When Odoo shuts down normally, it waits for running jobs to finish.
However, when the Odoo server crashes or is otherwise force-stopped,
running jobs are interrupted while the runner has no chance to know
they have been aborted.
"""

# identify job to requeue
query_id_state_job_to_requeue = """

Check warning on line 348 in queue_job/jobrunner/runner.py

View check run for this annotation

Codecov / codecov/patch

queue_job/jobrunner/runner.py#L348

Added line #L348 was not covered by tests
SELECT
job.id, job.state
FROM
queue_job_locks AS lock
INNER JOIN
(
SELECT
id,state
FROM
queue_job
WHERE
state IN ('enqueued','started')
AND date_enqueued <
(now() AT TIME ZONE 'utc' - INTERVAL '10 sec')
) AS job
ON
lock.id = job.id
FOR UPDATE SKIP LOCKED
"""
with closing(self.conn.cursor()) as cr:
cr.execute(query_id_state_job_to_requeue)
job_to_requeue = cr.fetchall()
enqueued_jobs, started_jobs = [

Check warning on line 371 in queue_job/jobrunner/runner.py

View check run for this annotation

Codecov / codecov/patch

queue_job/jobrunner/runner.py#L368-L371

Added lines #L368 - L371 were not covered by tests
job[0] for job in job_to_requeue if job[1] == "pending"
], [job[0] for job in job_to_requeue if job[1] == "started"]
# dead enqueued jobs are set to pending without incrementing retry
if enqueued_jobs:
query_requeue_pending_job = """

Check warning on line 376 in queue_job/jobrunner/runner.py

View check run for this annotation

Codecov / codecov/patch

queue_job/jobrunner/runner.py#L376

Added line #L376 was not covered by tests
UPDATE
queue_job
SET
state='pending'
WHERE
id in %s;
"""
cr.execute(query_requeue_pending_job, [tuple(enqueued_jobs)])

Check warning on line 384 in queue_job/jobrunner/runner.py

View check run for this annotation

Codecov / codecov/patch

queue_job/jobrunner/runner.py#L384

Added line #L384 was not covered by tests

_logger.warning(

Check warning on line 386 in queue_job/jobrunner/runner.py

View check run for this annotation

Codecov / codecov/patch

queue_job/jobrunner/runner.py#L386

Added line #L386 was not covered by tests
"queue_job: requeuing 'enqueued' jobs with ids= %s",
str(enqueued_jobs),
)

# dead started jobs are set to pending without incrementing retry
if started_jobs:
query_started_pending_job = """

Check warning on line 393 in queue_job/jobrunner/runner.py

View check run for this annotation

Codecov / codecov/patch

queue_job/jobrunner/runner.py#L393

Added line #L393 was not covered by tests
UPDATE
queue_job
SET
state='pending',
retry=retry+1
WHERE
id in %s;
"""
cr.execute(query_started_pending_job, [tuple(started_jobs)])

Check warning on line 402 in queue_job/jobrunner/runner.py

View check run for this annotation

Codecov / codecov/patch

queue_job/jobrunner/runner.py#L402

Added line #L402 was not covered by tests

_logger.warning(

Check warning on line 404 in queue_job/jobrunner/runner.py

View check run for this annotation

Codecov / codecov/patch

queue_job/jobrunner/runner.py#L404

Added line #L404 was not covered by tests
"queue_job: requeuing 'started' jobs with ids= %s",
str(started_jobs),
)


class QueueJobRunner(object):
def __init__(
Expand Down Expand Up @@ -424,6 +487,11 @@ def initialize_databases(self):
self.channel_manager.notify(db_name, *job_data)
_logger.info("queue job runner ready for db %s", db_name)

def requeue_dead_jobs(self):
for db in self.db_by_name.values():
if db.has_queue_job:
db.requeue_dead_jobs()

Check warning on line 493 in queue_job/jobrunner/runner.py

View check run for this annotation

Codecov / codecov/patch

queue_job/jobrunner/runner.py#L493

Added line #L493 was not covered by tests

def run_jobs(self):
now = _odoo_now()
for job in self.channel_manager.get_jobs_to_run(now):
Expand Down Expand Up @@ -516,6 +584,7 @@ def run(self):
_logger.info("database connections ready")
# inner loop does the normal processing
while not self._stop:
self.requeue_dead_jobs()

Check warning on line 587 in queue_job/jobrunner/runner.py

View check run for this annotation

Codecov / codecov/patch

queue_job/jobrunner/runner.py#L587

Added line #L587 was not covered by tests
self.process_notifications()
self.run_jobs()
self.wait_notification()
Expand Down
35 changes: 35 additions & 0 deletions queue_job/migrations/16.0.2.6.9/pre-migration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html)


def migrate(cr, version):
# Create job lock table
cr.execute(
"""
CREATE TABLE IF NOT EXISTS queue_job_locks (
id INT PRIMARY KEY,
CONSTRAINT
foreign_key_queue_job
FOREIGN KEY (id)
REFERENCES queue_job (id) ON DELETE CASCADE
);
"""
)

# Deactivate cron garbage collector
cr.execute(
"""
UPDATE
ir_cron
SET
active=False
WHERE id IN (
SELECT res_id
FROM
ir_model_data
WHERE
module='queue_job'
AND model='ir.cron'
AND name='ir_cron_queue_job_garbage_collector'
);
"""
)
53 changes: 0 additions & 53 deletions queue_job/models/queue_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from datetime import datetime, timedelta

from odoo import _, api, exceptions, fields, models
from odoo.osv import expression
from odoo.tools import config, html_escape

from odoo.addons.base_sparse_field.models.fields import Serialized
Expand Down Expand Up @@ -417,58 +416,6 @@ def autovacuum(self):
break
return True

def requeue_stuck_jobs(self, enqueued_delta=1, started_delta=0):
"""Fix jobs that are in a bad states
:param in_queue_delta: lookup time in minutes for jobs
that are in enqueued state,
0 means that it is not checked
:param started_delta: lookup time in minutes for jobs
that are in started state,
0 means that it is not checked,
-1 will use `--limit-time-real` config value
"""
if started_delta == -1:
started_delta = (config["limit_time_real"] // 60) + 1
return self._get_stuck_jobs_to_requeue(
enqueued_delta=enqueued_delta, started_delta=started_delta
).requeue()

def _get_stuck_jobs_domain(self, queue_dl, started_dl):
domain = []
now = fields.datetime.now()
if queue_dl:
queue_dl = now - timedelta(minutes=queue_dl)
domain.append(
[
"&",
("date_enqueued", "<=", fields.Datetime.to_string(queue_dl)),
("state", "=", "enqueued"),
]
)
if started_dl:
started_dl = now - timedelta(minutes=started_dl)
domain.append(
[
"&",
("date_started", "<=", fields.Datetime.to_string(started_dl)),
("state", "=", "started"),
]
)
if not domain:
raise exceptions.ValidationError(
_("If both parameters are 0, ALL jobs will be requeued!")
)
return expression.OR(domain)

def _get_stuck_jobs_to_requeue(self, enqueued_delta, started_delta):
job_model = self.env["queue.job"]
stuck_jobs = job_model.search(
self._get_stuck_jobs_domain(enqueued_delta, started_delta)
)
return stuck_jobs

def related_action_open_record(self):
"""Open a form view with the record(s) of the job.
Expand Down
13 changes: 13 additions & 0 deletions queue_job/post_init_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,16 @@ def post_init_hook(cr, registry):
FOR EACH ROW EXECUTE PROCEDURE queue_job_notify();
"""
)

# Create job lock table
cr.execute(
"""
CREATE TABLE IF NOT EXISTS queue_job_locks (
id INT PRIMARY KEY,
CONSTRAINT
foreign_key_queue_job
FOREIGN KEY (id)
REFERENCES queue_job (id) ON DELETE CASCADE
);
"""
)
13 changes: 1 addition & 12 deletions queue_job/readme/CONFIGURE.rst
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,4 @@
.. [1] It works with the threaded Odoo server too, although this way
of running Odoo is obviously not for production purposes.
* Be sure to check out *Jobs Garbage Collector* CRON and change *enqueued_delta* and *started_delta* parameters to your needs.

* ``enqueued_delta``: Spent time in minutes after which an enqueued job is considered stuck.
Set it to 0 to disable this check.
* ``started_delta``: Spent time in minutes after which a started job is considered stuck.
This parameter should not be less than ``--limit-time-real // 60`` parameter in your configuration.
Set it to 0 to disable this check. Set it to -1 to automate it, based in the server's ``--limit-time-real`` config parameter.

.. code-block:: python
# `model` corresponds to 'queue.job' model
model.requeue_stuck_jobs(enqueued_delta=1, started_delta=-1)
* Jobs that remain in `enqueued` or `started` state (because, for instance, their worker has been killed) will be automatically re-queued.
Loading

0 comments on commit 13e159b

Please sign in to comment.