Skip to content

Commit

Permalink
Merge branch 'master' into allow-runtime-settings-overrides
Browse files Browse the repository at this point in the history
  • Loading branch information
PeterJCLaw committed Jan 27, 2023
2 parents f1e12d6 + f881e7a commit 5f5acbe
Show file tree
Hide file tree
Showing 13 changed files with 216 additions and 120 deletions.
2 changes: 2 additions & 0 deletions django_lightweight_queue/app_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class Settings(Protocol):
REDIS_HOST: str
REDIS_PORT: int
REDIS_PASSWORD: Optional[str]
REDIS_DATABASE: int
REDIS_PREFIX: str

ENABLE_PROMETHEUS: bool
Expand All @@ -57,6 +58,7 @@ class Defaults(Settings):
REDIS_HOST = '127.0.0.1'
REDIS_PORT = 6379
REDIS_PASSWORD = None
REDIS_DATABASE = 0
REDIS_PREFIX = ""

ENABLE_PROMETHEUS = False
Expand Down
6 changes: 6 additions & 0 deletions django_lightweight_queue/backends/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ def deduplicate(
raise NotImplementedError()


class BackendWithClear(BaseBackend, metaclass=ABCMeta):
@abstractmethod
def clear(self, queue: QueueName) -> None:
raise NotImplementedError()


class BackendWithPause(BaseBackend, metaclass=ABCMeta):
@abstractmethod
def pause(self, queue: QueueName, until: datetime.datetime) -> None:
Expand Down
8 changes: 6 additions & 2 deletions django_lightweight_queue/backends/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@
import redis

from ..job import Job
from .base import BackendWithPauseResume
from .base import BackendWithClear, BackendWithPauseResume
from ..types import QueueName, WorkerNumber
from ..utils import block_for_time
from ..app_settings import app_settings


class RedisBackend(BackendWithPauseResume):
class RedisBackend(BackendWithPauseResume, BackendWithClear):
"""
This backend has at-most-once semantics.
"""
Expand All @@ -20,6 +20,7 @@ def __init__(self) -> None:
host=app_settings.REDIS_HOST,
port=app_settings.REDIS_PORT,
password=app_settings.REDIS_PASSWORD,
db=app_settings.REDIS_DATABASE,
)

def enqueue(self, job: Job, queue: QueueName) -> None:
Expand Down Expand Up @@ -78,6 +79,9 @@ def resume(self, queue: QueueName) -> None:
def is_paused(self, queue: QueueName) -> bool:
return bool(self.client.exists(self._pause_key(queue)))

def clear(self, queue: QueueName) -> None:
self.client.delete(self._key(queue))

def _key(self, queue: QueueName) -> str:
if app_settings.REDIS_PREFIX:
return '{}:django_lightweight_queue:{}'.format(
Expand Down
12 changes: 10 additions & 2 deletions django_lightweight_queue/backends/reliable_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
import redis

from ..job import Job
from .base import BackendWithDeduplicate, BackendWithPauseResume
from .base import (
BackendWithClear,
BackendWithDeduplicate,
BackendWithPauseResume,
)
from ..types import QueueName, WorkerNumber
from ..utils import block_for_time, get_worker_numbers
from ..app_settings import app_settings
Expand All @@ -15,7 +19,7 @@
T = TypeVar('T')


class ReliableRedisBackend(BackendWithDeduplicate, BackendWithPauseResume):
class ReliableRedisBackend(BackendWithClear, BackendWithDeduplicate, BackendWithPauseResume):
"""
This backend manages a per-queue-per-worker 'processing' queue. E.g. if we
had a queue called 'django_lightweight_queue:things', and two workers, we
Expand All @@ -42,6 +46,7 @@ def __init__(self) -> None:
host=app_settings.REDIS_HOST,
port=app_settings.REDIS_PORT,
password=app_settings.REDIS_PASSWORD,
db=app_settings.REDIS_DATABASE,
)

def startup(self, queue: QueueName) -> None:
Expand Down Expand Up @@ -228,6 +233,9 @@ def resume(self, queue: QueueName) -> None:
def is_paused(self, queue: QueueName) -> bool:
return bool(self.client.exists(self._pause_key(queue)))

def clear(self, queue: QueueName) -> None:
self.client.delete(self._key(queue))

def _key(self, queue: QueueName) -> str:
key = 'django_lightweight_queue:{}'.format(queue)

Expand Down
67 changes: 67 additions & 0 deletions django_lightweight_queue/command_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import warnings
from typing import Any, Optional

from django.core.management.base import BaseCommand, CommandParser

from .utils import load_extra_settings
from .constants import SETTING_NAME_PREFIX


class CommandWithExtraSettings(BaseCommand):
"""
Base class for handling `--extra-settings`.
Derived classes must call `handle_extra_settings` at the top of their
`handle` method. For example:
class Command(CommandWithExtraSettings):
def handle(self, **options: Any) -> None:
super().handle_extra_settings(**options)
...
"""

def add_arguments(self, parser: CommandParser) -> None:
super().add_arguments(parser)

extra_settings_group = parser.add_mutually_exclusive_group()
extra_settings_group.add_argument(
'--config',
action='store',
default=None,
help="The path to an additional django-style config file to load "
"(this spelling is deprecated in favour of '--extra-settings')",
)
extra_settings_group.add_argument(
'--extra-settings',
action='store',
default=None,
help="The path to an additional django-style settings file to load. "
f"{SETTING_NAME_PREFIX}* settings discovered in this file will "
"override those from the default Django settings.",
)

def handle_extra_settings(
self,
*,
config: Optional[str] = None,
extra_settings: Optional[str],
**_: Any
) -> Optional[str]:
"""
Load extra settings if there are any.
Returns the filename (if any) of the extra settings that have been loaded.
"""

if config is not None:
warnings.warn(
"Use of '--config' is deprecated in favour of '--extra-settings'.",
category=DeprecationWarning,
)
extra_settings = config

# Configuration overrides
if extra_settings is not None:
load_extra_settings(extra_settings)

return extra_settings
51 changes: 51 additions & 0 deletions django_lightweight_queue/management/commands/queue_clear.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import argparse

from django.core.management.base import BaseCommand, CommandError

from ...types import QueueName
from ...utils import get_backend
from ...backends.base import BackendWithClear


class Command(BaseCommand):
help = """
Command to clear work on a redis-backed queue.
All pending jobs will be deleted from the queue. In flight jobs won't be
affected.
""" # noqa:A003 # inherited name

def add_arguments(self, parser: argparse.ArgumentParser) -> None:
parser.add_argument(
'queue',
action='store',
help="The queue to pause.",
)

parser.add_argument(
'--yes',
dest='skip_prompt',
action='store_true',
help="Skip confirmation prompt.",
)

def handle(self, queue: QueueName, skip_prompt: bool = False, **options: object) -> None:

backend = get_backend(queue)

if not isinstance(backend, BackendWithClear):
raise CommandError(
"Configured backend '{}.{}' doesn't support clearing".format(
type(backend).__module__,
type(backend).__name__,
),
)

if not skip_prompt:
prompt = "Clear all jobs from queue {}) [y/N] ".format(queue)
choice = input(prompt).lower()

if choice != "y":
raise CommandError("Aborting")

backend.clear(queue)
Original file line number Diff line number Diff line change
@@ -1,46 +1,14 @@
import warnings
from typing import Any

from django.core.management.base import BaseCommand, CommandParser

from ...utils import get_backend, get_queue_counts, load_extra_settings
from ...constants import SETTING_NAME_PREFIX
from ...utils import get_backend, get_queue_counts
from ...app_settings import app_settings
from ...command_utils import CommandWithExtraSettings
from ...cron_scheduler import get_cron_config


class Command(BaseCommand):
def add_arguments(self, parser: CommandParser) -> None:
extra_settings_group = parser.add_mutually_exclusive_group()
extra_settings_group.add_argument(
'--config',
action='store',
default=None,
help="The path to an additional django-style config file to load "
"(this spelling is deprecated in favour of '--extra-settings')",
)
extra_settings_group.add_argument(
'--extra-settings',
action='store',
default=None,
help="The path to an additional django-style settings file to load. "
f"{SETTING_NAME_PREFIX}* settings discovered in this file will "
"override those from the default Django settings.",
)

class Command(CommandWithExtraSettings):
def handle(self, **options: Any) -> None:
extra_config = options.pop('config')
if extra_config is not None:
warnings.warn(
"Use of '--config' is deprecated in favour of '--extra-settings'.",
category=DeprecationWarning,
)
options['extra_settings'] = extra_config

# Configuration overrides
extra_settings = options['extra_settings']
if extra_settings is not None:
load_extra_settings(extra_settings)
super().handle_extra_settings(**options)

print("django-lightweight-queue")
print("========================")
Expand Down
64 changes: 16 additions & 48 deletions django_lightweight_queue/management/commands/queue_runner.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,21 @@
import warnings
from typing import Any, Dict, Optional

import daemonize

from django.apps import apps
from django.core.management.base import (
BaseCommand,
CommandError,
CommandParser,
)
from django.core.management.base import CommandError, CommandParser

from ...types import QueueName
from ...utils import (
get_logger,
get_backend,
get_middleware,
load_extra_settings,
)
from ...utils import get_logger, get_backend, get_middleware
from ...runner import runner
from ...constants import SETTING_NAME_PREFIX
from ...command_utils import CommandWithExtraSettings
from ...machine_types import Machine, PooledMachine, DirectlyConfiguredMachine


class Command(BaseCommand):
class Command(CommandWithExtraSettings):
def add_arguments(self, parser: CommandParser) -> None:
super().add_arguments(parser)

parser.add_argument(
'--pidfile',
action='store',
Expand Down Expand Up @@ -58,22 +50,6 @@ def add_arguments(self, parser: CommandParser) -> None:
default=None,
help="Only run the given queue, useful for local debugging",
)
extra_settings_group = parser.add_mutually_exclusive_group()
extra_settings_group.add_argument(
'--config',
action='store',
default=None,
help="The path to an additional django-style config file to load "
"(this spelling is deprecated in favour of '--extra-settings')",
)
extra_settings_group.add_argument(
'--extra-settings',
action='store',
default=None,
help="The path to an additional django-style settings file to load. "
f"{SETTING_NAME_PREFIX}* settings discovered in this file will "
"override those from the default Django settings.",
)
parser.add_argument(
'--exact-configuration',
action='store_true',
Expand All @@ -83,19 +59,11 @@ def add_arguments(self, parser: CommandParser) -> None:
" '--of'.",
)

def validate_and_normalise(self, options: Dict[str, Any]) -> None:
extra_config = options.pop('config')
if extra_config is not None:
warnings.warn(
"Use of '--config' is deprecated in favour of '--extra-settings'.",
category=DeprecationWarning,
)
options['extra_settings'] = extra_config

def validate_and_normalise(self, options: Dict[str, Any], had_extra_settings: bool) -> None:
if options['exact_configuration']:
if not options['extra_settings']:
if not had_extra_settings:
raise CommandError(
"Must provide a value for '--config' when using "
"Must provide a value for '--extra-settings' when using "
"'--exact-configuration'.",
)

Expand Down Expand Up @@ -127,19 +95,19 @@ def validate_and_normalise(self, options: Dict[str, Any]) -> None:
def handle(self, **options: Any) -> None:
logger = get_logger('dlq.master')

self.validate_and_normalise(options)
extra_settings = super().handle_extra_settings(**options)

self.validate_and_normalise(
options,
had_extra_settings=extra_settings is not None,
)

def touch_filename(name: str) -> Optional[str]:
try:
return options['touchfile'] % name
except TypeError:
return None

# Configuration overrides
extra_config = options['extra_settings']
if extra_config is not None:
load_extra_settings(extra_config)

logger.info("Starting queue master")

# Ensure children will be able to import our backend
Expand All @@ -164,7 +132,7 @@ def touch_filename(name: str) -> Optional[str]:
)

def run() -> None:
runner(touch_filename, machine, logger)
runner(touch_filename, machine, logger, extra_settings)

# fork() only after we have started enough to catch failure, including
# being able to write to our pidfile.
Expand Down
Loading

0 comments on commit 5f5acbe

Please sign in to comment.