Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add suspend command #658

Merged
merged 3 commits into from
May 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 28 additions & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ for your jobs. For example:
from django_rq.queues import get_queue
queue = get_queue('default')
job = queue.enqueue_at(datetime(2020, 10, 10), func)

If you are using built-in scheduler you have to start workers with scheduler support::

python manage.py rqworker --with-scheduler
Expand Down Expand Up @@ -307,6 +307,33 @@ Here is an example settings fragment for `django-redis`:
},
}


Suspending and Resuming Workers
----------------

Sometimes you may want to suspend RQ to prevent it from processing new jobs.
A classic example is during the initial phase of a deployment script or in advance
of putting your site into maintenance mode. This is particularly helpful when
you have jobs that are relatively long-running and might otherwise be forcibly
killed during the deploy.

The `suspend` command stops workers on _all_ queues (in a single Redis database)
from picking up new jobs. However currently running jobs will continue until
completion.

.. code-block:: bash

# Suspend indefinitely
python manage.py rqsuspend

# Suspend for a specific duration (in seconds) then automatically
# resume work again.
python manage.py rqsuspend -d 600

# Resume work again.
python manage.py rqresume


Queue Statistics
----------------

Expand Down
12 changes: 12 additions & 0 deletions django_rq/management/commands/rqresume.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from django.core.management.base import BaseCommand
from rq.suspension import resume

from ...queues import get_connection


class Command(BaseCommand):
help = "Resume all queues."

def handle(self, *args, **options):
connection = get_connection()
resume(connection)
36 changes: 36 additions & 0 deletions django_rq/management/commands/rqsuspend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import logging
import sys

from django.core.management.base import BaseCommand
from rq.suspension import suspend

from ...queues import get_connection

log = logging.getLogger(__name__)

class Command(BaseCommand):
help = "Suspend all queues."

def add_arguments(self, parser):
parser.add_argument(
"--duration",
"-d",
type=int,
help="The duration in seconds to suspend the workers. If not provided, workers will be suspended indefinitely",
)

def handle(self, *args, **options):
connection = get_connection()
duration = options.get("duration")

if duration is not None and duration < 1:
log.error("Duration must be an integer greater than 1")
sys.exit(1)

if duration:
suspend(connection, duration)
msg = f"Suspending workers for {duration} seconds. No new jobs will be started during that time, but then will automatically resume"
log.info(msg)
else:
suspend(connection)
log.info("Suspending workers. No new jobs will be started. But current jobs will be completed")
10 changes: 10 additions & 0 deletions django_rq/tests/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from rq.exceptions import NoSuchJobError
from rq.job import Job
from rq.registry import FinishedJobRegistry, ScheduledJobRegistry
from rq.suspension import is_suspended
from rq.worker import Worker
from rq.serializers import DefaultSerializer, JSONSerializer

Expand Down Expand Up @@ -790,6 +791,15 @@ def test_custom_class(self):
def test_local_override(self):
self.assertIs(get_job_class('django_rq.tests.fixtures.DummyJob'), DummyJob)

class SuspendResumeTest(TestCase):
def test_suspend_and_resume_commands(self):
connection = get_connection()
self.assertEqual(is_suspended(connection), 0)
call_command('rqsuspend')
self.assertEqual(is_suspended(connection), 1)
call_command('rqresume')
self.assertEqual(is_suspended(connection), 0)


class QueueClassTest(TestCase):
def test_default_queue_class(self):
Expand Down
Loading