Skip to content

Commit

Permalink
[IMP] queue_job: Add split method
Browse files Browse the repository at this point in the history
  • Loading branch information
paradoxxxzero committed Dec 4, 2024
1 parent eb58253 commit 0c63fd6
Show file tree
Hide file tree
Showing 6 changed files with 349 additions and 108 deletions.
228 changes: 130 additions & 98 deletions queue_job/README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -61,20 +61,20 @@ instantaneous if no other job is running.

Features:

- Views for jobs, jobs are stored in PostgreSQL
- Jobrunner: execute the jobs, highly efficient thanks to PostgreSQL's
NOTIFY
- Channels: give a capacity for the root channel and its sub-channels
and segregate jobs in them. Allow for instance to restrict heavy jobs
to be executed one at a time while little ones are executed 4 at a
times.
- Retries: Ability to retry jobs by raising a type of exception
- Retry Pattern: the 3 first tries, retry after 10 seconds, the 5 next
tries, retry after 1 minutes, ...
- Job properties: priorities, estimated time of arrival (ETA), custom
description, number of retries
- Related Actions: link an action on the job view, such as open the
record concerned by the job
- Views for jobs, jobs are stored in PostgreSQL
- Jobrunner: execute the jobs, highly efficient thanks to PostgreSQL's
NOTIFY
- Channels: give a capacity for the root channel and its sub-channels
and segregate jobs in them. Allow for instance to restrict heavy jobs
to be executed one at a time while little ones are executed 4 at a
times.
- Retries: Ability to retry jobs by raising a type of exception
- Retry Pattern: the 3 first tries, retry after 10 seconds, the 5 next
tries, retry after 1 minutes, ...
- Job properties: priorities, estimated time of arrival (ETA), custom
description, number of retries
- Related Actions: link an action on the job view, such as open the
record concerned by the job

**Table of contents**

Expand All @@ -89,18 +89,18 @@ Be sure to have the ``requests`` library.
Configuration
=============

- Using environment variables and command line:
- Using environment variables and command line:

- Adjust environment variables (optional):
- Adjust environment variables (optional):

- ``ODOO_QUEUE_JOB_CHANNELS=root:4`` or any other channels
configuration. The default is ``root:1``
- if ``xmlrpc_port`` is not set: ``ODOO_QUEUE_JOB_PORT=8069``
- ``ODOO_QUEUE_JOB_CHANNELS=root:4`` or any other channels
configuration. The default is ``root:1``
- if ``xmlrpc_port`` is not set: ``ODOO_QUEUE_JOB_PORT=8069``

- Start Odoo with ``--load=web,queue_job`` and ``--workers`` greater
than 1. [1]_
- Start Odoo with ``--load=web,queue_job`` and ``--workers`` greater
than 1. [1]_

- Using the Odoo configuration file:
- Using the Odoo configuration file:

.. code:: ini
Expand All @@ -113,8 +113,8 @@ Configuration
[queue_job]
channels = root:2
- Confirm the runner is starting correctly by checking the odoo log
file:
- Confirm the runner is starting correctly by checking the odoo log
file:

::

Expand All @@ -123,10 +123,10 @@ Configuration
...INFO...queue_job.jobrunner.runner: queue job runner ready for db <dbname>
...INFO...queue_job.jobrunner.runner: database connections ready

- Create jobs (eg using ``base_import_async``) and observe they start
immediately and in parallel.
- Tip: to enable debug logging for the queue job, use
``--log-handler=odoo.addons.queue_job:DEBUG``
- Create jobs (eg using ``base_import_async``) and observe they start
immediately and in parallel.
- Tip: to enable debug logging for the queue job, use
``--log-handler=odoo.addons.queue_job:DEBUG``

.. [1]
It works with the threaded Odoo server too, although this way of
Expand Down Expand Up @@ -247,23 +247,56 @@ is at the top of the graph. In the example above, if it was called on
``group_a``, then ``group_b`` would never be delayed (but a warning
would be shown).

It is also possible to split a job into several jobs, each one
processing a part of the work. This can be useful to avoid very long
jobs, parallelize some task and get more specific errors. Usage is as
follows:

.. code:: python
def button_split_delayable(self):
(
self # Can be a big recordset, let's say 1000 records
.delayable()
.generate_thumbnail((50, 50))
.set(priority=30)
.set(description=_("generate xxx"))
.split(50) # Split the job in 20 jobs of 50 records each
.delay()
)
The ``split()`` method takes a ``chain`` boolean keyword argument. If
set to True, the jobs will be chained, meaning that the next job will
only start when the previous one is done:

.. code:: python
def button_increment_var(self):
(
self
.delayable()
.increment_counter()
.split(1, chain=True) # Will exceute the jobs one after the other
.delay()
)
Enqueing Job Options
~~~~~~~~~~~~~~~~~~~~

- priority: default is 10, the closest it is to 0, the faster it will
be executed
- eta: Estimated Time of Arrival of the job. It will not be executed
before this date/time
- max_retries: default is 5, maximum number of retries before giving up
and set the job state to 'failed'. A value of 0 means infinite
retries.
- description: human description of the job. If not set, description is
computed from the function doc or method name
- channel: the complete name of the channel to use to process the
function. If specified it overrides the one defined on the function
- identity_key: key uniquely identifying the job, if specified and a
job with the same key has not yet been run, the new job will not be
created
- priority: default is 10, the closest it is to 0, the faster it will be
executed
- eta: Estimated Time of Arrival of the job. It will not be executed
before this date/time
- max_retries: default is 5, maximum number of retries before giving up
and set the job state to 'failed'. A value of 0 means infinite
retries.
- description: human description of the job. If not set, description is
computed from the function doc or method name
- channel: the complete name of the channel to use to process the
function. If specified it overrides the one defined on the function
- identity_key: key uniquely identifying the job, if specified and a job
with the same key has not yet been run, the new job will not be
created

Configure default options for jobs
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Expand Down Expand Up @@ -334,11 +367,11 @@ dictionary on the job function:
"kwargs": {"name": "Partner"},
}
- ``enable``: when ``False``, the button has no effect (default:
``True``)
- ``func_name``: name of the method on ``queue.job`` that returns an
action
- ``kwargs``: extra arguments to pass to the related action method
- ``enable``: when ``False``, the button has no effect (default:
``True``)
- ``func_name``: name of the method on ``queue.job`` that returns an
action
- ``kwargs``: extra arguments to pass to the related action method

Example of related action code:

Expand Down Expand Up @@ -382,10 +415,10 @@ integers:
Based on this configuration, we can tell that:

- 5 first retries are postponed 10 seconds later
- retries 5 to 10 postponed 20 seconds later
- retries 10 to 15 postponed 30 seconds later
- all subsequent retries postponed 5 minutes later
- 5 first retries are postponed 10 seconds later
- retries 5 to 10 postponed 20 seconds later
- retries 10 to 15 postponed 30 seconds later
- all subsequent retries postponed 5 minutes later

**Job Context**

Expand Down Expand Up @@ -432,11 +465,11 @@ Testing
The recommended way to test jobs, rather than running them directly and
synchronously is to split the tests in two parts:

- one test where the job is mocked (trap jobs with ``trap_jobs()``
and the test only verifies that the job has been delayed with the
expected arguments
- one test that only calls the method of the job synchronously, to
validate the proper behavior of this method only
- one test where the job is mocked (trap jobs with ``trap_jobs()``
and the test only verifies that the job has been delayed with the
expected arguments
- one test that only calls the method of the job synchronously, to
validate the proper behavior of this method only

Proceeding this way means that you can prove that jobs will be enqueued
properly at runtime, and it ensures your code does not have a different
Expand Down Expand Up @@ -560,14 +593,14 @@ synchronously
Tips and tricks
~~~~~~~~~~~~~~~
- **Idempotency**
(https://www.restapitutorial.com/lessons/idempotency.html): The
queue_job should be idempotent so they can be retried several times
without impact on the data.
- **The job should test at the very beginning its relevance**: the
moment the job will be executed is unknown by design. So the first
task of a job should be to check if the related work is still
relevant at the moment of the execution.
- **Idempotency**
(https://www.restapitutorial.com/lessons/idempotency.html): The
queue_job should be idempotent so they can be retried several times
without impact on the data.
- **The job should test at the very beginning its relevance**: the
moment the job will be executed is unknown by design. So the first
task of a job should be to check if the related work is still relevant
at the moment of the execution.
Patterns
~~~~~~~~
Expand All @@ -584,20 +617,19 @@ Through the time, two main patterns emerged:
Known issues / Roadmap
======================
- After creating a new database or installing ``queue_job`` on an
existing database, Odoo must be restarted for the runner to detect
it.
- When Odoo shuts down normally, it waits for running jobs to finish.
However, when the Odoo server crashes or is otherwise force-stopped,
running jobs are interrupted while the runner has no chance to know
they have been aborted. In such situations, jobs may remain in
``started`` or ``enqueued`` state after the Odoo server is halted.
Since the runner has no way to know if they are actually running or
not, and does not know for sure if it is safe to restart the jobs, it
does not attempt to restart them automatically. Such stale jobs
therefore fill the running queue and prevent other jobs to start. You
must therefore requeue them manually, either from the Jobs view, or
by running the following SQL statement *before starting Odoo*:
- After creating a new database or installing ``queue_job`` on an
existing database, Odoo must be restarted for the runner to detect it.
- When Odoo shuts down normally, it waits for running jobs to finish.
However, when the Odoo server crashes or is otherwise force-stopped,
running jobs are interrupted while the runner has no chance to know
they have been aborted. In such situations, jobs may remain in
``started`` or ``enqueued`` state after the Odoo server is halted.
Since the runner has no way to know if they are actually running or
not, and does not know for sure if it is safe to restart the jobs, it
does not attempt to restart them automatically. Such stale jobs
therefore fill the running queue and prevent other jobs to start. You
must therefore requeue them manually, either from the Jobs view, or by
running the following SQL statement *before starting Odoo*:
.. code:: sql
Expand All @@ -609,11 +641,11 @@ Changelog
Next
----
- [ADD] Run jobrunner as a worker process instead of a thread in the
main process (when running with --workers > 0)
- [REF] ``@job`` and ``@related_action`` deprecated, any method can be
delayed, and configured using ``queue.job.function`` records
- [MIGRATION] from 13.0 branched at rev. e24ff4b
- [ADD] Run jobrunner as a worker process instead of a thread in the
main process (when running with --workers > 0)
- [REF] ``@job`` and ``@related_action`` deprecated, any method can be
delayed, and configured using ``queue.job.function`` records
- [MIGRATION] from 13.0 branched at rev. e24ff4b
Bug Tracker
===========
Expand All @@ -637,21 +669,21 @@ Authors
Contributors
------------
- Guewen Baconnier <guewen.baconnier@camptocamp.com>
- Stéphane Bidoul <stephane.bidoul@acsone.eu>
- Matthieu Dietrich <matthieu.dietrich@camptocamp.com>
- Jos De Graeve <Jos.DeGraeve@apertoso.be>
- David Lefever <dl@taktik.be>
- Laurent Mignon <laurent.mignon@acsone.eu>
- Laetitia Gangloff <laetitia.gangloff@acsone.eu>
- Cédric Pigeon <cedric.pigeon@acsone.eu>
- Tatiana Deribina <tatiana.deribina@avoin.systems>
- Souheil Bejaoui <souheil.bejaoui@acsone.eu>
- Eric Antones <eantones@nuobit.com>
- Simone Orsi <simone.orsi@camptocamp.com>
- Nguyen Minh Chien <chien@trobz.com>
- Tran Quoc Duong <duongtq@trobz.com>
- Vo Hong Thien <thienvh@trobz.com>
- Guewen Baconnier <guewen.baconnier@camptocamp.com>
- Stéphane Bidoul <stephane.bidoul@acsone.eu>
- Matthieu Dietrich <matthieu.dietrich@camptocamp.com>
- Jos De Graeve <Jos.DeGraeve@apertoso.be>
- David Lefever <dl@taktik.be>
- Laurent Mignon <laurent.mignon@acsone.eu>
- Laetitia Gangloff <laetitia.gangloff@acsone.eu>
- Cédric Pigeon <cedric.pigeon@acsone.eu>
- Tatiana Deribina <tatiana.deribina@avoin.systems>
- Souheil Bejaoui <souheil.bejaoui@acsone.eu>
- Eric Antones <eantones@nuobit.com>
- Simone Orsi <simone.orsi@camptocamp.com>
- Nguyen Minh Chien <chien@trobz.com>
- Tran Quoc Duong <duongtq@trobz.com>
- Vo Hong Thien <thienvh@trobz.com>
Other credits
-------------
Expand Down
45 changes: 45 additions & 0 deletions queue_job/delay.py
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,51 @@ def delay(self):
"""Delay the whole graph"""
self._graph.delay()

def split(self, size, chain=False):
"""Split the Delayables.
Use `DelayableGroup` or `DelayableChain`
if `chain` is True containing batches of size `size`
"""
if not self._job_method:
raise ValueError("No method set on the Delayable")

total_records = len(self.recordset)

delayables = []
for index in range(0, total_records, size):
recordset = self.recordset[index : index + size]
delayable = Delayable(
recordset,
priority=self.priority,
eta=self.eta,
max_retries=self.max_retries,
description=self.description,
channel=self.channel,
identity_key=self.identity_key,
)
# Update the __self__
delayable._job_method = getattr(recordset, self._job_method.__name__)
delayable._job_args = self._job_args
delayable._job_kwargs = self._job_kwargs

delayables.append(delayable)

description = self.description or (
self._job_method.__doc__.splitlines()[0].strip()
if self._job_method.__doc__
else f"{self.recordset._name}.{self._job_method.__name__}"
)
for index, delayable in enumerate(delayables):
delayable.set(
description=f"{description} (split {index + 1}/{len(delayables)})"
)

# Prevent warning on deletion
self._generated_job = True

return (DelayableChain if chain else DelayableGroup)(*delayables)

def _build_job(self):
if self._generated_job:
return self._generated_job
Expand Down
Loading

0 comments on commit 0c63fd6

Please sign in to comment.