Skip to content
This repository has been archived by the owner on Dec 16, 2022. It is now read-only.

Commit

Permalink
Merge pull request #1 from paperless-ngx/apply-pull-patches
Browse files Browse the repository at this point in the history
Chore: Apply useful patches from upstream
  • Loading branch information
stumpylog authored Aug 10, 2022
2 parents bf20d57 + 81f9639 commit 9838d0d
Show file tree
Hide file tree
Showing 7 changed files with 131 additions and 41 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@
- Dropped support for Django 2.2
- Removed Django 2.2 testing
- Added testing for Python 3.10
- Applied the following parent fork PRs:
- https://github.com/Koed00/django-q/pull/603
- https://github.com/Koed00/django-q/pull/604
- https://github.com/Koed00/django-q/pull/605
- https://github.com/Koed00/django-q/pull/423
- https://github.com/Koed00/django-q/pull/672
- https://github.com/Koed00/django-q/pull/659

Full Changes: https://github.com/Koed00/django-q/compare/master...paperless-ngx:paperless-main

Expand Down
12 changes: 0 additions & 12 deletions IDEAS.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@
- [Cons](#cons)
- [Simplify Human Hash](#simplify-human-hash)
- [Event Driven vs Polling](#event-driven-vs-polling)
- [Add .editorconfig](#add-editorconfig)
- [Add .pre-commit-config.yaml](#add-pre-commit-configyaml)
- [Sub-classing Process](#sub-classing-process)
- [Better Process Naming](#better-process-naming)
- [File Organization](#file-organization)
- [Useful PRs](#useful-prs)

# Ideas

Expand Down Expand Up @@ -64,10 +62,6 @@ task in the future, instead of polling continuously? Set the future timeout in
This would be a very large change in the architecture, but if it (or even portions of it) is possible, it would be
a much better solution than consistently polling and eating up cycles.

## Add .editorconfig

Pretty easy, adding an .editorconfig file will help keep the styling consistent across multiple authors.

## Add .pre-commit-config.yaml

Along the same idea as above, adding a pre-commit configuration will help enforce styling and formatting,
Expand All @@ -93,9 +87,3 @@ an idea of how many times a task has been re-incarnated or recycled)
The `cluster.py` file contains a lot more than Cluster. Simplify the file by moving other classes to their own
files and common functionality to an appropriate file as well.

## Useful PRs

- https://github.com/Koed00/django-q/pull/603
- https://github.com/Koed00/django-q/pull/604
- https://github.com/Koed00/django-q/pull/605
- https://github.com/Koed00/django-q/pull/423
42 changes: 34 additions & 8 deletions django_q/admin.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,19 @@
"""Admin module for Django."""
from django.urls import reverse
from django.utils.html import format_html
from django.contrib import admin
from django.utils.translation import gettext_lazy as _
from django.db.models.expressions import OuterRef, Subquery

from django_q.conf import Conf, croniter
from django_q.models import Failure, OrmQ, Schedule, Success
from django_q.models import Failure, OrmQ, Schedule, Success, Task
from django_q.tasks import async_task


class TaskAdmin(admin.ModelAdmin):
"""model admin for success tasks."""

list_display = ("name", "func", "started", "stopped", "time_taken", "group")
list_display = ("name", "group", "func", "started", "stopped", "time_taken")

def has_add_permission(self, request):
"""Don't allow adds."""
Expand Down Expand Up @@ -43,14 +46,14 @@ def retry_failed(FailAdmin, request, queryset):
class FailAdmin(admin.ModelAdmin):
"""model admin for failed tasks."""

list_display = ("name", "func", "started", "stopped", "short_result")
list_display = ("name", "group", "func", "started", "stopped", "short_result")

def has_add_permission(self, request):
"""Don't allow adds."""
return False

actions = [retry_failed]
search_fields = ("name", "func")
search_fields = ("name", "func", "group")
list_filter = ("group",)
readonly_fields = []

Expand All @@ -70,23 +73,46 @@ class ScheduleAdmin(admin.ModelAdmin):
"repeats",
"cluster",
"next_run",
"last_run",
"success",
"get_last_run",
"get_success",
)

# optional cron strings
if not croniter:
readonly_fields = ("cron",)

list_filter = ("next_run", "schedule_type", "cluster")
search_fields = ("func",)
search_fields = ("name", "func",)
list_display_links = ("id", "name")

def get_queryset(self, request):
    """Return the admin queryset annotated with details of the linked task.

    Every row gains ``task_id``, ``task_name`` and ``task_success``, each read
    through a subquery from the ``Task`` whose ``id`` equals the row's ``task``
    value.

    :param request: the current admin request
    :return: the annotated queryset
    """
    linked_task = Task.objects.filter(id=OuterRef("task")).values(
        "id", "name", "success"
    )
    return super().get_queryset(request).annotate(
        task_id=Subquery(linked_task.values("id")),
        task_name=Subquery(linked_task.values("name")),
        task_success=Subquery(linked_task.values("success")),
    )

def get_success(self, obj):
    """Return the ``task_success`` value annotated onto the row.

    :param obj: the annotated row being rendered
    :return: whatever the ``task_success`` annotation holds for this row
    """
    outcome = obj.task_success
    return outcome
# Column options read by the admin changelist: translated header text and
# boolean-style rendering of the value.
get_success.short_description = _("success")
get_success.boolean = True

def get_last_run(self, obj):
    """Return an admin link to the task recorded for this row's last run.

    Uses the ``task_id``, ``task_name`` and ``task_success`` values annotated
    onto the row. A truthy ``task_success`` links to the success change page,
    anything else to the failure change page.

    :param obj: the annotated row being rendered
    :return: an HTML-safe anchor, or None when the row has no task name
    """
    if obj.task_name is None:
        return None
    if obj.task_success:
        url = reverse("admin:django_q_success_change", args=(obj.task_id,))
    else:
        url = reverse("admin:django_q_failure_change", args=(obj.task_id,))
    # Hand the values to format_html as arguments so they are HTML-escaped.
    # Interpolating them with an f-string first would mark the raw task name
    # as safe markup and allow HTML injection through task names.
    return format_html('<a href="{}">[{}]</a>', url, obj.task_name)
# NOTE(review): allow_tags looks to be ignored by current Django releases
# (format_html already returns safe markup) - kept as-is, confirm before removing.
get_last_run.allow_tags = True
get_last_run.short_description = _("last_run")


class QueueAdmin(admin.ModelAdmin):
"""queue admin for ORM broker"""

list_display = ("id", "key", "task_id", "name", "func", "lock")
list_display = ("id", "key", "name", "group", "func", "lock", "task_id")

def save_model(self, request, obj, form, change):
obj.save(using=Conf.ORM)
Expand Down
56 changes: 37 additions & 19 deletions django_q/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,9 @@ def reincarnate(self, process):
:param process: the process to reincarnate
:type process: Process or None
"""
# close connections before spawning new process
if not Conf.SYNC:
db.connections.close_all() # Close any old connections
db.connections.close_all()
if process == self.monitor:
self.monitor = self.spawn_monitor()
logger.error(_(f"reincarnated monitor {process.name} after sudden death"))
Expand All @@ -238,8 +239,9 @@ def reincarnate(self, process):
def spawn_cluster(self):
self.pool = []
Stat(self).save()
# close connections before spawning new process
if not Conf.SYNC:
db.connection.close()
db.connections.close_all()
# spawn worker pool
for __ in range(self.pool_size):
self.spawn_worker()
Expand Down Expand Up @@ -474,12 +476,21 @@ def save_task(task, broker: Broker):
# SAVE LIMIT > 0: Prune database, SAVE_LIMIT 0: No pruning
close_old_django_connections()
try:
with db.transaction.atomic():
last = Success.objects.select_for_update().last()
if task["success"] and 0 < Conf.SAVE_LIMIT <= Success.objects.count():
last.delete()
if task["success"]:
# first apply per group success history limit
if "group" in task:
with db.transaction.atomic():
qs = Success.objects.filter(group=task["group"])
last = qs.select_for_update().last()
if Conf.SAVE_LIMIT_PER_GROUP <= qs.count():
last.delete()
# then apply global success history limit
with db.transaction.atomic():
last = Success.objects.select_for_update().last()
if Conf.SAVE_LIMIT <= Success.objects.count():
last.delete()
# check if this task has previous results
if Task.objects.filter(id=task["id"], name=task["name"]).exists():
try:
existing_task = Task.objects.get(id=task["id"], name=task["name"])
# only update the result if it hasn't succeeded yet
if not existing_task.success:
Expand All @@ -494,21 +505,22 @@ def save_task(task, broker: Broker):
and existing_task.attempt_count >= Conf.MAX_ATTEMPTS
):
broker.acknowledge(task["ack_id"])

else:
except Task.DoesNotExist:
func = task["func"]
# convert func to string
if inspect.isfunction(func):
func = f"{func.__module__}.{func.__name__}"
elif inspect.ismethod(func):
func = (
func_name = f"{func.__module__}.{func.__name__}"
elif inspect.ismethod(func) and hasattr(func.__self__, '__name__'):
func_name = (
f"{func.__self__.__module__}."
f"{func.__self__.__name__}.{func.__name__}"
)
else:
func_name = str(func)
Task.objects.create(
id=task["id"],
name=task["name"],
func=func,
func=func_name,
hook=task.get("hook"),
args=task["args"],
kwargs=task["kwargs"],
Expand Down Expand Up @@ -598,10 +610,15 @@ def scheduler(broker: Broker = None):
# get args, kwargs and hook
if s.kwargs:
try:
# eval should be safe here because dict()
kwargs = eval(f"dict({s.kwargs})")
except SyntaxError:
kwargs = {}
# first try the dict syntax
kwargs = ast.literal_eval(s.kwargs)
except (SyntaxError, ValueError):
# else use the kwargs syntax
try:
parsed_kwargs = ast.parse(f"f({s.kwargs})").body[0].value.keywords
kwargs = {kwarg.arg: ast.literal_eval(kwarg.value) for kwarg in parsed_kwargs}
except (SyntaxError, ValueError):
kwargs = {}
if s.args:
args = ast.literal_eval(s.args)
# single value won't eval to tuple, so:
Expand Down Expand Up @@ -647,7 +664,8 @@ def scheduler(broker: Broker = None):
if settings.USE_TZ
else next_run.datetime.replace(tzinfo=None)
)
s.repeats += -1
if s.repeats > 0:
s.repeats -= 1
# send it to the cluster
scheduled_broker = broker
try:
Expand Down Expand Up @@ -692,7 +710,7 @@ def close_old_django_connections():
logger.warning(
"Preserving django database connections because sync=True. Beware "
"that tasks are now injected in the calling context/transactions "
"which may result in unexpected bahaviour."
"which may result in unexpected behaviour."
)
else:
db.close_old_connections()
Expand Down
8 changes: 6 additions & 2 deletions django_q/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ class Conf:
# Failures are always saved
SAVE_LIMIT = conf.get("save_limit", 250)

# Maximum number of successful tasks of the same group kept in the database. 0 saves everything. -1 saves none
# Failures are always saved
SAVE_LIMIT_PER_GROUP = conf.get("save_limit_per_group", 5)

# Guard loop sleep in seconds. Should be between 0 and 60 seconds.
GUARD_CYCLE = conf.get("guard_cycle", 0.5)

Expand Down Expand Up @@ -137,8 +141,8 @@ class Conf:
# Verify if retry and timeout settings are correct
if not TIMEOUT or (TIMEOUT > RETRY):
warn(
"""Retry and timeout are misconfigured. Set retry larger than timeout,
failure to do so will cause the tasks to be retriggered before completion.
"""Retry and timeout are misconfigured. Set retry larger than timeout,
failure to do so will cause the tasks to be retriggered before completion.
See https://django-q.readthedocs.io/en/latest/configure.html#retry for details."""
)

Expand Down
6 changes: 6 additions & 0 deletions django_q/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,10 @@ def __str__(self):
return self.func

success.boolean = True
success.short_description = _("success")
last_run.allow_tags = True
last_run.short_description = _("last_run")


class Meta:
app_label = "django_q"
Expand All @@ -246,6 +249,9 @@ def task_id(self):
def name(self):
return self.task()["name"]

def group(self):
    """Return the ``"group"`` entry of the queued task.

    The value is looked up with ``.get`` on whatever ``self.task()`` returns
    (assumed to be a dict-like mapping), so a missing key yields ``None``
    instead of raising.
    """
    payload = self.task()
    return payload.get("group")

class Meta:
app_label = "django_q"
verbose_name = _("Queued task")
Expand Down
41 changes: 41 additions & 0 deletions django_q/tests/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,47 @@ def test_enqueue(broker, admin_user):
broker.delete_queue()


class TestAsyncSelf:
    """Run a bound instance method through the pusher, worker and monitor."""

    # No ``__name__`` attribute is defined on this class body, so instances
    # expose none - presumably intentional, so the test covers bound methods
    # of plain objects (TODO confirm).

    def run(self):
        """Task payload: always returns 5."""
        return 5

    @pytest.mark.django_db
    def test_async_self_method(self, broker):
        """Queue ``self.run`` and check it is executed and stored as a success."""
        broker.list_key = "cluster_test:q"
        broker.delete_queue()
        task_id = async_task(self.run, broker=broker)
        assert isinstance(task_id, str)

        # The stop event is set before pusher runs - presumably so it exits
        # after a single pass. Afterwards the broker queue is empty and the
        # task sits on the in-process task queue.
        pending = Queue()
        stop = Event()
        stop.set()
        pusher(pending, stop, broker=broker)
        assert broker.queue_size() == 0
        assert pending.qsize() == 1
        pending.put("STOP")

        # The worker consumes the task and emits exactly one result.
        finished = Queue()
        worker(pending, finished, Value("f", -1))
        assert finished.qsize() == 1
        finished.put("STOP")

        # The monitor drains the result queue.
        monitor(finished)
        assert finished.qsize() == 0

        # The stored task must be a success carrying run()'s return value.
        stored = fetch(task_id)
        assert stored is not None
        assert stored.success is True
        assert result(task_id) == 5
        broker.delete_queue()


@pytest.mark.django_db
@pytest.mark.parametrize(
"cluster_config_timeout, async_task_kwargs",
Expand Down

0 comments on commit 9838d0d

Please sign in to comment.