Skip to content

Commit

Permalink
Merge pull request #1061 from onlyann/improved-type-annotations
Browse files Browse the repository at this point in the history
  • Loading branch information
ewjoachim authored May 21, 2024
2 parents 49a54aa + 7b45614 commit 5c99ca7
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 100 deletions.
134 changes: 101 additions & 33 deletions procrastinate/blueprints.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,21 @@
import functools
import logging
import sys
from typing import TYPE_CHECKING, Any, Callable
from typing import TYPE_CHECKING, Any, Callable, Literal, Union, cast, overload

from typing_extensions import Concatenate, ParamSpec, Unpack

from procrastinate import exceptions, jobs, periodic, retry, utils
from procrastinate.job_context import JobContext

if TYPE_CHECKING:
from procrastinate import tasks
from procrastinate.tasks import ConfigureTaskOptions, Task


logger = logging.getLogger(__name__)

P = ParamSpec("P")


class Blueprint:
"""
Expand Down Expand Up @@ -63,7 +69,7 @@ def my_task():
"""

def __init__(self) -> None:
self.tasks: dict[str, tasks.Task] = {}
self.tasks: dict[str, Task] = {}
self.periodic_registry = periodic.PeriodicRegistry()
self._check_stack()

Expand All @@ -88,7 +94,7 @@ def _check_stack(self):
extra={"action": "app_defined_in___main__"},
)

def _register_task(self, task: tasks.Task) -> None:
def _register_task(self, task: Task) -> None:
"""
Register the task into the blueprint task registry.
Raises exceptions.TaskAlreadyRegistered if the task name
Expand All @@ -98,15 +104,15 @@ def _register_task(self, task: tasks.Task) -> None:
# Each call to _add_task may raise TaskAlreadyRegistered.
# We're using an intermediary dict to make sure that if the registration
# is interrupted midway though, self.tasks is left unmodified.
to_add: dict[str, tasks.Task] = {}
to_add: dict[str, Task] = {}
self._add_task(task=task, name=task.name, to=to_add)

for alias in task.aliases:
self._add_task(task=task, name=alias, to=to_add)

self.tasks.update(to_add)

def _add_task(self, task: tasks.Task, name: str, to: dict | None = None) -> None:
def _add_task(self, task: Task, name: str, to: dict | None = None) -> None:
# Add a task to a dict of task while making
# sure a task of the same name was not already in self.tasks.
# This lets us prepare a dict of tasks we might add while not adding
Expand All @@ -120,7 +126,7 @@ def _add_task(self, task: tasks.Task, name: str, to: dict | None = None) -> None
result_dict = self.tasks if to is None else to
result_dict[name] = task

def add_task_alias(self, task: tasks.Task, alias: str) -> None:
def add_task_alias(self, task: Task, alias: str) -> None:
"""
Add an alias to a task. This can be useful if a task was in a given
Blueprint and moves to a different blueprint.
Expand Down Expand Up @@ -189,37 +195,22 @@ def add_tasks_from(self, blueprint: Blueprint, *, namespace: str) -> None:
configure_kwargs=periodic_task.configure_kwargs,
)

@overload
def task(
self,
_func: Callable[..., Any] | None = None,
*,
_func: None = None,
name: str | None = None,
aliases: list[str] | None = None,
retry: retry.RetryValue = False,
pass_context: bool = False,
pass_context: Literal[False] = False,
queue: str = jobs.DEFAULT_QUEUE,
lock: str | None = None,
queueing_lock: str | None = None,
) -> Any:
"""
Declare a function as a task. This method is meant to be used as a decorator::
@app.task(...)
def my_task(args):
...
or::
@app.task
def my_task(args):
...
The second form will use the default value for all parameters.
) -> Callable[[Callable[P]], Task[P, P]]:
"""Declare a function as a task. This method is meant to be used as a decorator
Parameters
----------
_func :
The decorated function
queue :
The name of the queue in which jobs from this task will be launched, if
the queue is not overridden at launch.
Expand Down Expand Up @@ -251,11 +242,73 @@ def my_task(args):
pass_context :
Passes the task execution context in the task as first
"""
...

@overload
def task(
self,
*,
_func: None = None,
name: str | None = None,
aliases: list[str] | None = None,
retry: retry.RetryValue = False,
pass_context: Literal[True],
queue: str = jobs.DEFAULT_QUEUE,
lock: str | None = None,
queueing_lock: str | None = None,
) -> Callable[
[Callable[Concatenate[JobContext, P]]],
Task[Concatenate[JobContext, P], P],
]:
"""Declare a function as a task. This method is meant to be used as a decorator
Parameters
----------
_func :
The decorated function
"""
...

def _wrap(func: Callable[..., tasks.Task]):
from procrastinate import tasks
@overload
def task(self, _func: Callable[P]) -> Task[P, P]:
"""Declare a function as a task. This method is meant to be used as a decorator
Parameters
----------
_func :
The decorated function
"""
...

def task(
self,
_func: Callable[P] | None = None,
*,
name: str | None = None,
aliases: list[str] | None = None,
retry: retry.RetryValue = False,
pass_context: bool = False,
queue: str = jobs.DEFAULT_QUEUE,
lock: str | None = None,
queueing_lock: str | None = None,
):
from procrastinate.tasks import Task

task = tasks.Task(
"""
Declare a function as a task. This method is meant to be used as a decorator::
@app.task(...)
def my_task(args):
...
or::
@app.task
def my_task(args):
...
The second form will use the default value for all parameters.
"""

def _wrap(func: Callable[P]):
task = Task(
func,
blueprint=self,
queue=queue,
Expand All @@ -271,11 +324,26 @@ def _wrap(func: Callable[..., tasks.Task]):
return functools.update_wrapper(task, func, updated=())

if _func is None: # Called as @app.task(...)
return _wrap
return cast(
Union[
Callable[[Callable[P, Any]], Task[P, P]],
Callable[
[Callable[Concatenate[JobContext, P], Any]],
Task[Concatenate[JobContext, P], P],
],
],
_wrap,
)

return _wrap(_func) # Called as @app.task

def periodic(self, *, cron: str, periodic_id: str = "", **kwargs: dict[str, Any]):
def periodic(
self,
*,
cron: str,
periodic_id: str = "",
**configure_kwargs: Unpack[ConfigureTaskOptions],
):
"""
Task decorator, marks task as being scheduled for periodic deferring (see
`howto/advanced/cron`).
Expand All @@ -290,7 +358,7 @@ def periodic(self, *, cron: str, periodic_id: str = "", **kwargs: dict[str, Any]
Additional parameters are passed to `Task.configure`.
"""
return self.periodic_registry.periodic_decorator(
cron=cron, periodic_id=periodic_id, **kwargs
cron=cron, periodic_id=periodic_id, **configure_kwargs
)

def will_configure_task(self) -> None:
Expand Down
41 changes: 29 additions & 12 deletions procrastinate/periodic.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@
import functools
import logging
import time
from typing import Any, Iterable, Tuple
from typing import Callable, Generic, Iterable, Tuple, cast

import attr
import croniter
from typing_extensions import Concatenate, ParamSpec, Unpack

from procrastinate import exceptions, tasks

P = ParamSpec("P")
Args = ParamSpec("Args")

# The maximum delay after which tasks will be considered as
# outdated, and ignored.
MAX_DELAY = 60 * 10 # 10 minutes
Expand All @@ -24,11 +28,11 @@


@attr.dataclass(frozen=True)
class PeriodicTask:
task: tasks.Task
class PeriodicTask(Generic[P, Args]):
task: tasks.Task[P, Args]
cron: str
periodic_id: str
configure_kwargs: dict[str, Any]
configure_kwargs: tasks.ConfigureTaskOptions

@cached_property
def croniter(self) -> croniter.croniter:
Expand All @@ -42,28 +46,36 @@ class PeriodicRegistry:
def __init__(self):
self.periodic_tasks: dict[tuple[str, str], PeriodicTask] = {}

def periodic_decorator(self, cron: str, periodic_id: str, **kwargs):
def periodic_decorator(
self,
cron: str,
periodic_id: str,
**configure_kwargs: Unpack[tasks.ConfigureTaskOptions],
) -> Callable[[tasks.Task[P, Concatenate[int, Args]]], tasks.Task[P, Args]]:
"""
Decorator over a task definition that registers that task for periodic
launch. This decorator should not be used directly, ``@app.periodic()`` is meant
to be used instead.
"""

def wrapper(task: tasks.Task):
def wrapper(task: tasks.Task[P, Concatenate[int, Args]]) -> tasks.Task[P, Args]:
self.register_task(
task=task, cron=cron, periodic_id=periodic_id, configure_kwargs=kwargs
task=task,
cron=cron,
periodic_id=periodic_id,
configure_kwargs=configure_kwargs,
)
return task
return cast(tasks.Task[P, Args], task)

return wrapper

def register_task(
self,
task: tasks.Task,
task: tasks.Task[P, Concatenate[int, Args]],
cron: str,
periodic_id: str,
configure_kwargs: dict[str, Any],
) -> PeriodicTask:
configure_kwargs: tasks.ConfigureTaskOptions,
) -> PeriodicTask[P, Concatenate[int, Args]]:
key = (task.name, periodic_id)
if key in self.periodic_tasks:
raise exceptions.TaskAlreadyRegistered(
Expand Down Expand Up @@ -190,7 +202,12 @@ async def defer_jobs(self, jobs_to_defer: Iterable[TaskAtTime]) -> None:
task = periodic_task.task
periodic_id = periodic_task.periodic_id
configure_kwargs = periodic_task.configure_kwargs
configure_kwargs.setdefault("task_kwargs", {})["timestamp"] = timestamp
task_kwargs = configure_kwargs.get("task_kwargs")
if task_kwargs is None:
task_kwargs = {}
configure_kwargs["task_kwargs"] = task_kwargs
task_kwargs["timestamp"] = timestamp

description = {
"task_name": task.name,
"periodic_id": periodic_id,
Expand Down
Loading

0 comments on commit 5c99ca7

Please sign in to comment.