Skip to content
This repository has been archived by the owner on Sep 12, 2023. It is now read-only.

Commit

Permalink
feat(worker): adds job config.
Browse files Browse the repository at this point in the history
Default job config can be setup by environment, and individual jobs can
be explicitly configured with `JobConfig`.

Closes #169
  • Loading branch information
peterschutt committed Jan 11, 2023
1 parent 87de918 commit 5543a9a
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 20 deletions.
10 changes: 9 additions & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ API_HEALTH_PATH=/health
# Log
LOG_EXCLUDE_PATHS="\A(?!x)x"
LOG_HTTP_EVENT="HTTP"
LOG_INCLUDE_COMPRESSED_BODY=false
LOG_INCLUDE_COMPRESSED_BODY=false
LOG_LEVEL=20
LOG_OBFUSCATE_COOKIES='["session"]'
LOG_OBFUSCATE_HEADERS='["Authorization","X-API-KEY"]'
Expand Down Expand Up @@ -56,3 +56,11 @@ SERVER_KEEPALIVE=65
SERVER_PORT=8000
SERVER_RELOAD=false
SERVER_RELOAD_DIRS='[]'

# Worker
WORKER_JOB_TIMEOUT=10
WORKER_JOB_HEARTBEAT=0
WORKER_JOB_RETRIES=10
WORKER_JOB_TTL=600
WORKER_JOB_RETRY_DELAY=1.0
WORKER_JOB_RETRY_BACKOFF=60
35 changes: 23 additions & 12 deletions src/starlite_saqlalchemy/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@
import logging
from typing import TYPE_CHECKING, Any, ClassVar, Generic, TypeVar

from saq.job import Job

from starlite_saqlalchemy import utils
from starlite_saqlalchemy.db import async_session_factory
from starlite_saqlalchemy.exceptions import NotFoundError
from starlite_saqlalchemy.repository.sqlalchemy import ModelT
from starlite_saqlalchemy.worker import queue
from starlite_saqlalchemy.worker import default_job_config_dict, queue

if TYPE_CHECKING:
from collections.abc import AsyncIterator
Expand All @@ -23,6 +26,7 @@

from starlite_saqlalchemy.repository.abc import AbstractRepository
from starlite_saqlalchemy.repository.types import FilterTypes
from starlite_saqlalchemy.worker import JobConfig


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -121,23 +125,34 @@ async def delete(self, id_: Any) -> T:
"""
raise NotFoundError

async def enqueue_background_task(self, method_name: str, **kwargs: Any) -> None:
async def enqueue_background_task(
    self, method_name: str, job_config: JobConfig | None = None, **kwargs: Any
) -> None:
    """Enqueue an async callback for the operation and data.

    Args:
        method_name: Method on the service object that should be called by the async worker.
        job_config: Configuration object to control the job that is enqueued.
        **kwargs: Arguments to be passed to the method when called. Must be JSON serializable.
    """
    # No resolvable module for this instance means the callback can't be
    # dispatched later — warn and skip rather than enqueue a dead job.
    if inspect.getmodule(self) is None:  # pragma: no cover
        logger.warning("Callback not enqueued, no module resolved for %s", self)
        return

    # Explicit per-job config wins; otherwise fall back to the
    # environment-derived defaults.
    job_config_dict: dict[str, Any] = (
        default_job_config_dict
        if job_config is None
        else utils.dataclass_as_dict_shallow(job_config, exclude_none=True)
    )

    # The worker-side callback needs the service identity and target method
    # alongside the caller-supplied arguments.
    callback_kwargs = {
        **kwargs,
        "service_type_id": self.__id__,
        "service_method_name": method_name,
    }
    await queue.enqueue(
        Job(
            function=make_service_callback.__qualname__,
            kwargs=callback_kwargs,
            **job_config_dict,
        )
    )

@classmethod
@contextlib.asynccontextmanager
Expand Down Expand Up @@ -249,11 +264,7 @@ async def new(cls: type[RepoServiceT]) -> AsyncIterator[RepoServiceT]:


async def make_service_callback(
_ctx: Context,
*,
service_type_id: str,
service_method_name: str,
**kwargs: Any,
_ctx: Context, *, service_type_id: str, service_method_name: str, **kwargs: Any
) -> None:
"""Make an async service callback.
Expand Down
40 changes: 40 additions & 0 deletions src/starlite_saqlalchemy/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,44 @@ class Config:
EXPONENTIAL_BACKOFF_MULTIPLIER: float = 1


class WorkerSettings(BaseSettings):
    """Global SAQ Job configuration.

    Values are read from the environment (``WORKER_`` prefix) or a ``.env``
    file, falling back to the defaults declared below.
    """

    class Config:
        case_sensitive = True
        env_file = ".env"
        env_prefix = "WORKER_"

    JOB_TIMEOUT: int = 10
    """Max time a job can run for, in seconds.

    Set to `0` for no timeout.
    """
    JOB_HEARTBEAT: int = 0
    """Max time a job can survive without emitting a heartbeat. `0` to disable.

    `job.update()` will trigger a heartbeat.
    """
    JOB_RETRIES: int = 10
    """Max attempts for any job."""
    JOB_TTL: int = 600
    """Lifetime of available job information, in seconds.

    0: indefinite
    -1: disabled (no info retained)
    """
    JOB_RETRY_DELAY: float = 1.0
    """Seconds to delay before retrying a job."""
    JOB_RETRY_BACKOFF: bool | float = 60
    """If true, use exponential backoff for retry delays.

    - The first retry will have whatever retry_delay is.
    - The second retry will have retry_delay*2. The third retry will have retry_delay*4. And so on.
    - This always includes jitter, where the final retry delay is a random number between 0 and the calculated retry delay.
    - If retry_backoff is set to a number, that number is the maximum retry delay, in seconds.
    """


# `.parse_obj()` thing is a workaround for pyright and pydantic interplay, see:
# https://github.com/pydantic/pydantic/issues/3753#issuecomment-1087417884
api = APISettings.parse_obj({})
Expand All @@ -275,3 +313,5 @@ class Config:
"""Sentry settings."""
server = ServerSettings.parse_obj({})
"""Server settings."""
worker = WorkerSettings.parse_obj({})
"""Worker settings."""
14 changes: 14 additions & 0 deletions src/starlite_saqlalchemy/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
"""General utility functions."""
import dataclasses
from typing import Any


def dataclass_as_dict_shallow(dataclass: Any, *, exclude_none: bool = False) -> dict[str, Any]:
    """Convert a dataclass to dict, without deepcopy.

    Args:
        dataclass: The dataclass instance whose fields should be extracted.
        exclude_none: If `True`, fields whose value is `None` are omitted.

    Returns:
        Mapping of field name to its current value (references, not copies).
    """
    pairs = ((f.name, getattr(dataclass, f.name)) for f in dataclasses.fields(dataclass))
    if exclude_none:
        return {name: value for name, value in pairs if value is not None}
    return dict(pairs)
52 changes: 51 additions & 1 deletion src/starlite_saqlalchemy/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,26 @@
from __future__ import annotations

import asyncio
import dataclasses
from functools import partial
from typing import TYPE_CHECKING, Any

import msgspec
import saq
from starlite.utils.serialization import default_serializer

from starlite_saqlalchemy import redis, settings, type_encoders
from starlite_saqlalchemy import redis, settings, type_encoders, utils

if TYPE_CHECKING:
from collections.abc import Awaitable, Callable, Collection
from signal import Signals

__all__ = [
"JobConfig",
"Queue",
"Worker",
"create_worker_instance",
"default_job_config_dict",
"queue",
]

Expand Down Expand Up @@ -70,6 +73,53 @@ async def on_app_startup(self) -> None: # pragma: no cover
"""


@dataclasses.dataclass()
class JobConfig:
    """Configure a Job.

    Used to configure jobs enqueued via
    `Service.enqueue_background_task()`
    """

    # pylint:disable=too-many-instance-attributes

    queue: Queue = queue
    """Queue associated with the job."""
    key: str | None = None
    """Pass in to control duplicate jobs."""
    timeout: int = settings.worker.JOB_TIMEOUT
    """Max time a job can run for, in seconds.

    Set to `0` for no timeout.
    """
    heartbeat: int = settings.worker.JOB_HEARTBEAT
    """Max time a job can survive without emitting a heartbeat. `0` to disable.

    `job.update()` will trigger a heartbeat.
    """
    retries: int = settings.worker.JOB_RETRIES
    """Max attempts for any job."""
    ttl: int = settings.worker.JOB_TTL
    """Lifetime of available job information, in seconds.

    0: indefinite
    -1: disabled (no info retained)
    """
    # BUG FIX: was `settings.worker.JOB_TTL`, which made the per-retry delay
    # default to the job TTL (600s) instead of the dedicated
    # WORKER_JOB_RETRY_DELAY setting (default 1.0s).
    retry_delay: float = settings.worker.JOB_RETRY_DELAY
    """Seconds to delay before retrying a job."""
    retry_backoff: bool | float = settings.worker.JOB_RETRY_BACKOFF
    """If true, use exponential backoff for retry delays.

    - The first retry will have whatever retry_delay is.
    - The second retry will have retry_delay*2. The third retry will have retry_delay*4. And so on.
    - This always includes jitter, where the final retry delay is a random number between 0 and the calculated retry delay.
    - If retry_backoff is set to a number, that number is the maximum retry delay, in seconds.
    """


# Snapshot of the default JobConfig as a kwargs dict, computed once at import
# time; used for jobs enqueued without an explicit `JobConfig`.
default_job_config_dict = utils.dataclass_as_dict_shallow(JobConfig(), exclude_none=True)


def create_worker_instance(
functions: Collection[Callable[..., Any] | tuple[str, Callable]],
before_process: Callable[[dict[str, Any]], Awaitable[Any]] | None = None,
Expand Down
16 changes: 10 additions & 6 deletions tests/unit/test_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from uuid import uuid4

import pytest
from saq import Job

from starlite_saqlalchemy import db, service, worker
from starlite_saqlalchemy.exceptions import NotFoundError
Expand Down Expand Up @@ -119,12 +120,15 @@ async def test_enqueue_service_callback(monkeypatch: "MonkeyPatch") -> None:
monkeypatch.setattr(worker.queue, "enqueue", enqueue_mock)
service_instance = domain.authors.Service(session=db.async_session_factory())
await service_instance.enqueue_background_task("receive_callback", raw_obj={"a": "b"})
enqueue_mock.assert_called_once_with(
"make_service_callback",
service_type_id="tests.utils.domain.authors.Service",
service_method_name="receive_callback",
raw_obj={"a": "b"},
)
enqueue_mock.assert_called_once()
assert isinstance(enqueue_mock.mock_calls[0].args[0], Job)
job = enqueue_mock.mock_calls[0].args[0]
assert job.function == service.make_service_callback.__qualname__
assert job.kwargs == {
"service_type_id": "tests.utils.domain.authors.Service",
"service_method_name": "receive_callback",
"raw_obj": {"a": "b"},
}


async def test_service_new_context_manager() -> None:
Expand Down

0 comments on commit 5543a9a

Please sign in to comment.