feat: add exit codes on timeout and failed jobs #20

Merged · 1 commit · Apr 10, 2024
6 changes: 5 additions & 1 deletion backend/bin/main/__main__.py
@@ -11,7 +11,11 @@
 async def run() -> None:
     plugin_registration.register_plugins()
     settings = app.Settings()
-    application = app.Application.from_settings(settings)
+    try:
+        application = app.Application.from_settings(settings)
+    except Exception as exc:
+        logger.exception("Failed to initialize application settings")
+        raise app.ServerStartError("Failed to initialize application settings") from exc
 
     try:
         await application.start()
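The hunk above only wraps settings initialization; the exit-code mapping promised in the PR title is not visible in the excerpts shown here. As a rough sketch of how the entry point could translate the new error classes into distinct process exit codes (the numbers 2 and 3, the `main()` wrapper, and the stub `run` are assumptions, not part of this diff):

import asyncio
import sys

import lib.app as app  # assumed import path for the error classes in errors.py


async def run() -> None:
    """Stub standing in for the real `run` coroutine shown in the diff above."""


def main() -> None:
    try:
        asyncio.run(run())
    except app.ApplicationTimeoutError:
        sys.exit(2)  # assumed code: job topics never finished before the timeout
    except app.ApplicationFailedJobsError:
        sys.exit(3)  # assumed code: a failed-job topic was non-empty at shutdown
    except app.ApplicationError:
        sys.exit(1)  # any other startup or runtime failure


if __name__ == "__main__":
    main()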
14 changes: 3 additions & 11 deletions backend/lib/app/__init__.py
@@ -1,11 +1,3 @@
-from .app import Application
-from .errors import ApplicationError, DisposeError, StartServerError
-from .settings import Settings
-
-__all__ = [
-    "Application",
-    "ApplicationError",
-    "DisposeError",
-    "Settings",
-    "StartServerError",
-]
+from .app import *
+from .errors import *
+from .settings import *
18 changes: 13 additions & 5 deletions backend/lib/app/app.py
@@ -170,30 +170,38 @@ async def start(self) -> None:
             await self._lifecycle_manager.on_startup()
         except lifecycle_manager_utils.LifecycleManager.StartupError as start_error:
             logger.error("Application has failed to start")
-            raise app_errors.StartServerError("Application has failed to start, see logs above") from start_error
+            raise app_errors.ServerStartError("Application has failed to start, see logs above") from start_error
 
         logger.info("Application is starting")
         try:
             await self._start()
         except asyncio.CancelledError:
             logger.info("Application has been interrupted")
         except BaseException as unexpected_error:
-            logger.exception("Application failed to start")
-
-            raise app_errors.StartServerError("Application failed to start") from unexpected_error
+            logger.exception("Application runtime error")
+            raise app_errors.ServerRuntimeError("Application runtime error") from unexpected_error
 
     async def _start(self) -> None:
         timer = asyncio_utils.TimeoutTimer(timeout=self._settings.tasks.timeout)
 
         while not timer.is_expired:
             all_topics_finished = all(
-                self._queue_repository.is_topic_finished(topic) for topic in task_repositories.TOPICS
+                self._queue_repository.is_topic_finished(topic) for topic in task_repositories.JOB_TOPICS
             )
             if self._aiojobs_scheduler.is_empty and all_topics_finished:
                 break
             await asyncio.sleep(1)
         else:
             logger.warning("Application has timed out and will be stopped prematurely")
             raise app_errors.ApplicationTimeoutError("Application has timed out")
 
         logger.info("Application has finished successfully")
+        failed_topics_empty = all(
+            self._queue_repository.is_topic_empty(topic) for topic in task_repositories.FAILED_JOB_TOPICS
+        )
+        if not failed_topics_empty:
+            logger.warning("Application has finished with failed jobs")
+            raise app_errors.ApplicationFailedJobsError("Application has finished with failed jobs")
 
     async def dispose(self) -> None:
         logger.info("Application is shutting down...")
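A note on the `_start` control flow above: it relies on Python's `while`/`else`, where the `else` block runs only when the loop condition goes false without a `break`. Here the `break` fires when the scheduler is empty and every job topic is finished, so the `else` branch is exactly the timeout path. A minimal standalone illustration (all names are placeholders):

import itertools

_ticks = itertools.count()


def is_expired() -> bool:  # stands in for timer.is_expired
    return next(_ticks) >= 3


work_done = False  # stands in for "scheduler empty and all topics finished"

while not is_expired():
    if work_done:
        break  # success path: the else block below is skipped
else:
    print("timed out")  # runs only when the loop exits without a break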
18 changes: 16 additions & 2 deletions backend/lib/app/errors.py
@@ -11,12 +11,26 @@ class DisposeError(ApplicationError):
     pass
 
 
-class StartServerError(ApplicationError):
+class ServerStartError(ApplicationError):
     pass
 
 
+class ServerRuntimeError(ApplicationError):
+    pass
+
+
+class ApplicationFailedJobsError(ApplicationError):
+    pass
+
+
+class ApplicationTimeoutError(ApplicationError):
+    pass
+
+
 __all__ = [
     "ApplicationError",
+    "ApplicationFailedJobsError",
+    "ApplicationTimeoutError",
     "DisposeError",
-    "StartServerError",
+    "ServerStartError",
 ]
2 changes: 2 additions & 0 deletions backend/lib/app/settings.py
@@ -104,6 +104,8 @@ def settings_customise_sources(
 
 __all__ = [
     "AppSettings",
+    "JobProcessorSettings",
+    "LoggingSettings",
     "Settings",
     "TasksSettings",
 ]
10 changes: 5 additions & 5 deletions backend/lib/task/jobs/event_processor.py
@@ -34,7 +34,7 @@ def name(self) -> str:
 
     async def _process(self) -> None:
         try:
-            async with self._queue_repository.acquire(topic=task_repositories.Topic.EVENT_JOB) as event_job:
+            async with self._queue_repository.acquire(topic=task_repositories.JobTopic.EVENT) as event_job:
                 assert isinstance(event_job, task_job_models.EventJob)
                 logger.debug("Processing EventJob(%s)", event_job.id)
                 try:
@@ -43,20 +43,20 @@ async def _process(self) -> None:
                     logger.error("EventJob(%s) has failed", event_job.id)
                     if event_job.retry_count + 1 < self._max_retries:
                         await self._queue_repository.push(
-                            topic=task_repositories.Topic.EVENT_JOB,
+                            topic=task_repositories.JobTopic.EVENT,
                             item=event_job.copy_retry(),
                             validate_not_closed=False,
                         )
                     else:
                         logger.error("EventJob(%s) has reached max retries", event_job.id)
                         await self._queue_repository.push(
-                            topic=task_repositories.Topic.FAILED_EVENT_JOB,
+                            topic=task_repositories.JobTopic.FAILED_EVENT,
                             item=event_job,
                         )
-                    await self._queue_repository.consume(topic=task_repositories.Topic.EVENT_JOB, item=event_job)
+                    await self._queue_repository.consume(topic=task_repositories.JobTopic.EVENT, item=event_job)
                     raise
                 else:
-                    await self._queue_repository.consume(topic=task_repositories.Topic.EVENT_JOB, item=event_job)
+                    await self._queue_repository.consume(topic=task_repositories.JobTopic.EVENT, item=event_job)
                     logger.debug("EventJob(%s) has been processed", event_job.id)
         except task_repositories.QueueRepositoryProtocol.TopicFinished:
             logger.debug("Event queue is closed, finishing job")
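The event, task, and trigger processors all share the acquire/retry/consume shape shown above. A condensed sketch of that flow, with placeholder names (`repo`, `handle`, `process_one`, and the topic arguments are illustrative, not the project's API):

async def handle(job) -> None:
    """Placeholder for the per-job processing step."""


async def process_one(repo, job_topic, failed_topic, max_retries: int) -> None:
    async with repo.acquire(topic=job_topic) as job:
        try:
            await handle(job)
        except Exception:
            if job.retry_count + 1 < max_retries:
                # re-queue a fresh copy with an incremented retry counter
                await repo.push(topic=job_topic, item=job.copy_retry(), validate_not_closed=False)
            else:
                # out of retries: park the job on the failed topic so
                # Application._start can detect it via is_topic_empty
                await repo.push(topic=failed_topic, item=job)
            await repo.consume(topic=job_topic, item=job)  # the original item is done either way
            raise
        else:
            await repo.consume(topic=job_topic, item=job)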
14 changes: 7 additions & 7 deletions backend/lib/task/jobs/task_processor.py
@@ -33,7 +33,7 @@ def name(self) -> str:
 
     async def _process(self) -> None:
         try:
-            async with self._queue_repository.acquire(topic=task_repositories.Topic.TASK_JOB) as task_job:
+            async with self._queue_repository.acquire(topic=task_repositories.JobTopic.TASK) as task_job:
                 assert isinstance(task_job, task_job_models.TaskJob)
                 logging.debug("Processing TaskJob(%s)", task_job.id)
                 try:
@@ -42,31 +42,31 @@ async def _process(self) -> None:
                     logger.error("TaskJob(%s) has failed", task_job.id)
                     if task_job.retry_count + 1 < self._max_retries:
                         await self._queue_repository.push(
-                            topic=task_repositories.Topic.TASK_JOB,
+                            topic=task_repositories.JobTopic.TASK,
                             item=task_job.copy_retry(),
                             validate_not_closed=False,
                         )
                     else:
                         logger.error("TaskJob(%s) has reached max retries", task_job.id)
                         await self._queue_repository.push(
-                            topic=task_repositories.Topic.FAILED_TASK_JOB,
+                            topic=task_repositories.JobTopic.FAILED_TASK,
                             item=task_job,
                         )
-                    await self._queue_repository.consume(topic=task_repositories.Topic.TASK_JOB, item=task_job)
+                    await self._queue_repository.consume(topic=task_repositories.JobTopic.TASK, item=task_job)
                     raise
                 else:
-                    await self._queue_repository.consume(topic=task_repositories.Topic.TASK_JOB, item=task_job)
+                    await self._queue_repository.consume(topic=task_repositories.JobTopic.TASK, item=task_job)
                     logger.debug("TaskJob(%s) has been processed", task_job.id)
         except task_repositories.QueueRepositoryProtocol.TopicFinished:
             logger.debug("Task queue is closed, finishing job")
-            await self._queue_repository.close_topic(topic=task_repositories.Topic.TRIGGER_JOB)
+            await self._queue_repository.close_topic(topic=task_repositories.JobTopic.TRIGGER)
             self.finish()
 
     async def _process_task(self, task_job: task_job_models.TaskJob) -> None:
         task = task_job.task
         for trigger in task.triggers:
             await self._queue_repository.push(
-                topic=task_repositories.Topic.TRIGGER_JOB,
+                topic=task_repositories.JobTopic.TRIGGER,
                 item=task_job_models.TriggerJob(
                     id=f"{task.id}/{trigger.id}",
                     task_id=task.id,
4 changes: 2 additions & 2 deletions backend/lib/task/jobs/task_spawner.py
@@ -30,7 +30,7 @@ async def _process(self) -> None:
 
         for task in config.tasks:
             await self._queue_repository.push(
-                topic=task_repositories.Topic.TASK_JOB,
+                topic=task_repositories.JobTopic.TASK,
                 item=task_jobs_models.TaskJob(
                     id=task.id,
                     task=task,
@@ -40,7 +40,7 @@ async def _process(self) -> None:
             logger.debug("Task(%s) was spawned", task.id)
 
         logger.debug("All tasks have been spawned, closing task topic")
-        await self._queue_repository.close_topic(task_repositories.Topic.TASK_JOB)
+        await self._queue_repository.close_topic(task_repositories.JobTopic.TASK)
 
 
 __all__ = [
14 changes: 7 additions & 7 deletions backend/lib/task/jobs/trigger_processor.py
@@ -36,7 +36,7 @@ def name(self) -> str:
 
     async def _process(self) -> None:
         try:
-            async with self._queue_repository.acquire(topic=task_repositories.Topic.TRIGGER_JOB) as trigger_job:
+            async with self._queue_repository.acquire(topic=task_repositories.JobTopic.TRIGGER) as trigger_job:
                 assert isinstance(trigger_job, task_job_models.TriggerJob)
                 logger.debug("Processing TriggerJob(%s)", trigger_job.id)
                 try:
@@ -45,24 +45,24 @@ async def _process(self) -> None:
                     logger.exception("Error processing TriggerJob(%s)", trigger_job.id)
                     if trigger_job.retry_count + 1 < self._max_retries:
                         await self._queue_repository.push(
-                            topic=task_repositories.Topic.TRIGGER_JOB,
+                            topic=task_repositories.JobTopic.TRIGGER,
                             item=trigger_job.copy_retry(),
                             validate_not_closed=False,
                         )
                     else:
                         logger.error("TriggerJob(%s) has reached max retries", trigger_job.id)
                         await self._queue_repository.push(
-                            topic=task_repositories.Topic.FAILED_TRIGGER_JOB,
+                            topic=task_repositories.JobTopic.FAILED_TRIGGER,
                             item=trigger_job,
                         )
-                    await self._queue_repository.consume(topic=task_repositories.Topic.TRIGGER_JOB, item=trigger_job)
+                    await self._queue_repository.consume(topic=task_repositories.JobTopic.TRIGGER, item=trigger_job)
                     raise
                 else:
-                    await self._queue_repository.consume(topic=task_repositories.Topic.TRIGGER_JOB, item=trigger_job)
+                    await self._queue_repository.consume(topic=task_repositories.JobTopic.TRIGGER, item=trigger_job)
                     logger.debug("TriggerJob(%s) has been processed", trigger_job.id)
         except task_repositories.QueueRepositoryProtocol.TopicFinished:
             logger.debug("Trigger queue is closed, finishing job")
-            await self._queue_repository.close_topic(topic=task_repositories.Topic.EVENT_JOB)
+            await self._queue_repository.close_topic(topic=task_repositories.JobTopic.EVENT)
             self.finish()
 
     async def _process_trigger(self, trigger_job: task_job_models.TriggerJob) -> None:
@@ -79,7 +79,7 @@ async def _process_trigger(self, trigger_job: task_job_models.TriggerJob) -> None:
         async for raw_event in trigger_processor.produce_events():
             for action in trigger_job.actions:
                 await self._queue_repository.push(
-                    topic=task_repositories.Topic.EVENT_JOB,
+                    topic=task_repositories.JobTopic.EVENT,
                     item=task_job_models.EventJob(
                         id=f"{task_id}/{trigger.id}/{action.id}/{raw_event.id}",
                         event=raw_event,
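Taken together, the spawner and the three processors drain the pipeline in order: each stage closes the next topic downstream once its own topic is finished, so a single close from the spawner eventually winds the whole application down. As read from the diffs above:

# Shutdown cascade, as read from the diffs above:
# TaskSpawner      finishes spawning            -> close_topic(JobTopic.TASK)
# TaskProcessor    sees TopicFinished (TASK)    -> close_topic(JobTopic.TRIGGER)
# TriggerProcessor sees TopicFinished (TRIGGER) -> close_topic(JobTopic.EVENT)
# EventProcessor   sees TopicFinished (EVENT)   -> finish()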
53 changes: 29 additions & 24 deletions backend/lib/task/repositories/queue/base.py
@@ -9,19 +9,19 @@
 import lib.utils.json as json_utils
 
 
-class Topic(str, enum.Enum):
-    TASK_JOB = "task_jobs"
-    TRIGGER_JOB = "trigger_jobs"
-    EVENT_JOB = "events_jobs"
+class JobTopic(str, enum.Enum):
+    TASK = "task_jobs"
+    TRIGGER = "trigger_jobs"
+    EVENT = "events_jobs"
 
-    FAILED_TASK_JOB = "failed_tasks_jobs"
-    FAILED_TRIGGER_JOB = "failed_trigger_jobs"
-    FAILED_EVENT_JOB = "failed_event_jobs"
+    FAILED_TASK = "failed_tasks_jobs"
+    FAILED_TRIGGER = "failed_trigger_jobs"
+    FAILED_EVENT = "failed_event_jobs"
 
 
-ALL_TOPICS = frozenset(topic for topic in Topic)
-FAILED_TOPICS = frozenset((Topic.FAILED_TASK_JOB, Topic.FAILED_TRIGGER_JOB, Topic.FAILED_EVENT_JOB))
-TOPICS = ALL_TOPICS - FAILED_TOPICS
+ALL_JOB_TOPICS = frozenset(topic for topic in JobTopic)
+FAILED_JOB_TOPICS = frozenset((JobTopic.FAILED_TASK, JobTopic.FAILED_TRIGGER, JobTopic.FAILED_EVENT))
+JOB_TOPICS = ALL_JOB_TOPICS - FAILED_JOB_TOPICS
 
 
 class QueueItem(typing.Protocol):
@@ -44,21 +44,23 @@ async def dispose(self) -> None: ...
     @property
     def is_finished(self) -> bool: ...
 
-    def is_topic_finished(self, topic: Topic) -> bool: ...
+    def is_topic_finished(self, topic: JobTopic) -> bool: ...
+
+    def is_topic_empty(self, topic: JobTopic) -> bool: ...
 
-    async def push(self, topic: Topic, item: QueueItem, validate_not_closed: bool = True) -> None:
+    async def push(self, topic: JobTopic, item: QueueItem, validate_not_closed: bool = True) -> None:
         """
         :raises TopicClosed: if topic is closed
         """
 
-    def acquire(self, topic: Topic) -> typing.AsyncContextManager[QueueItem]:  # pyright: ignore[reportReturnType]
+    def acquire(self, topic: JobTopic) -> typing.AsyncContextManager[QueueItem]:  # pyright: ignore[reportReturnType]
         """
         :raises TopicFinished: if topic is finished
         """
 
-    async def consume(self, topic: Topic, item: QueueItem) -> None: ...
+    async def consume(self, topic: JobTopic, item: QueueItem) -> None: ...
 
-    async def close_topic(self, topic: Topic) -> None: ...
+    async def close_topic(self, topic: JobTopic) -> None: ...
 
 
 class BaseQueueSettings(pydantic_settings.BaseSettings):
@@ -87,19 +89,22 @@ async def dispose(self) -> None: ...
     def is_finished(self) -> bool: ...
 
     @abc.abstractmethod
-    def is_topic_finished(self, topic: Topic) -> bool: ...
+    def is_topic_finished(self, topic: JobTopic) -> bool: ...
+
+    @abc.abstractmethod
+    def is_topic_empty(self, topic: JobTopic) -> bool: ...
 
     @abc.abstractmethod
-    async def push(self, topic: Topic, item: QueueItem, validate_not_closed: bool = True) -> None: ...
+    async def push(self, topic: JobTopic, item: QueueItem, validate_not_closed: bool = True) -> None: ...
 
     @abc.abstractmethod
-    def acquire(self, topic: Topic) -> typing.AsyncContextManager[QueueItem]: ...
+    def acquire(self, topic: JobTopic) -> typing.AsyncContextManager[QueueItem]: ...
 
     @abc.abstractmethod
-    async def consume(self, topic: Topic, item: QueueItem) -> None: ...
+    async def consume(self, topic: JobTopic, item: QueueItem) -> None: ...
 
     @abc.abstractmethod
-    async def close_topic(self, topic: Topic) -> None: ...
+    async def close_topic(self, topic: JobTopic) -> None: ...
 
 
 @dataclasses.dataclass
@@ -137,15 +142,15 @@ def queue_repository_factory(settings: BaseQueueSettings) -> QueueRepositoryProtocol:
 
 
 __all__ = [
-    "ALL_TOPICS",
+    "ALL_JOB_TOPICS",
     "BaseQueueRepository",
     "BaseQueueSettings",
-    "FAILED_TOPICS",
+    "FAILED_JOB_TOPICS",
+    "JOB_TOPICS",
+    "JobTopic",
     "QueueItem",
     "QueueRepositoryProtocol",
     "RegistryRecord",
-    "TOPICS",
-    "Topic",
     "queue_repository_factory",
     "queue_settings_factory",
     "register_queue_backend",
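For reference, the renamed topic sets split cleanly into a "work" half and a "failure" half, which Application._start uses for two different checks. A self-contained sketch (the repo calls in the trailing comment are illustrative):

import enum


class JobTopic(str, enum.Enum):
    TASK = "task_jobs"
    TRIGGER = "trigger_jobs"
    EVENT = "events_jobs"
    FAILED_TASK = "failed_tasks_jobs"
    FAILED_TRIGGER = "failed_trigger_jobs"
    FAILED_EVENT = "failed_event_jobs"


ALL_JOB_TOPICS = frozenset(JobTopic)
FAILED_JOB_TOPICS = frozenset((JobTopic.FAILED_TASK, JobTopic.FAILED_TRIGGER, JobTopic.FAILED_EVENT))
JOB_TOPICS = ALL_JOB_TOPICS - FAILED_JOB_TOPICS

assert JOB_TOPICS == {JobTopic.TASK, JobTopic.TRIGGER, JobTopic.EVENT}

# JOB_TOPICS drives the completion check; FAILED_JOB_TOPICS drives the failure check:
#     all(repo.is_topic_finished(t) for t in JOB_TOPICS)      -> break out of the wait loop
#     all(repo.is_topic_empty(t) for t in FAILED_JOB_TOPICS)  -> exit cleanly, else raise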