From 7d8e166ea68856965f0aad3e08034715d8996f61 Mon Sep 17 00:00:00 2001
From: ovsds
Date: Wed, 10 Apr 2024 22:14:21 +0200
Subject: [PATCH] feat: add exit codes on timeout and failed jobs

---
 backend/bin/main/__main__.py                |  6 ++-
 backend/lib/app/__init__.py                 | 14 ++---
 backend/lib/app/app.py                      | 18 +++++--
 backend/lib/app/errors.py                   | 18 ++++++-
 backend/lib/app/settings.py                 |  2 +
 backend/lib/task/jobs/event_processor.py    | 10 ++--
 backend/lib/task/jobs/task_processor.py     | 14 ++---
 backend/lib/task/jobs/task_spawner.py       |  4 +-
 backend/lib/task/jobs/trigger_processor.py  | 14 ++---
 backend/lib/task/repositories/queue/base.py | 53 ++++++++++---------
 .../task/repositories/queue/local/memory.py | 20 ++++---
 backend/lib/task/services/queue_state.py    | 34 ++++++------
 12 files changed, 120 insertions(+), 87 deletions(-)

diff --git a/backend/bin/main/__main__.py b/backend/bin/main/__main__.py
index fb759c9..d84c25e 100644
--- a/backend/bin/main/__main__.py
+++ b/backend/bin/main/__main__.py
@@ -11,7 +11,11 @@ async def run() -> None:
     plugin_registration.register_plugins()
 
     settings = app.Settings()
-    application = app.Application.from_settings(settings)
+    try:
+        application = app.Application.from_settings(settings)
+    except Exception as exc:
+        logger.exception("Failed to initialize application settings")
+        raise app.ServerStartError("Failed to initialize application settings") from exc
 
     try:
         await application.start()
diff --git a/backend/lib/app/__init__.py b/backend/lib/app/__init__.py
index f535efa..aa0b399 100644
--- a/backend/lib/app/__init__.py
+++ b/backend/lib/app/__init__.py
@@ -1,11 +1,3 @@
-from .app import Application
-from .errors import ApplicationError, DisposeError, StartServerError
-from .settings import Settings
-
-__all__ = [
-    "Application",
-    "ApplicationError",
-    "DisposeError",
-    "Settings",
-    "StartServerError",
-]
+from .app import *
+from .errors import *
+from .settings import *
diff --git a/backend/lib/app/app.py b/backend/lib/app/app.py
index 8d9ba96..9c2b8b6 100644
--- a/backend/lib/app/app.py
+++ b/backend/lib/app/app.py
@@ -170,7 +170,7 @@ async def start(self) -> None:
             await self._lifecycle_manager.on_startup()
         except lifecycle_manager_utils.LifecycleManager.StartupError as start_error:
             logger.error("Application has failed to start")
-            raise app_errors.StartServerError("Application has failed to start, see logs above") from start_error
+            raise app_errors.ServerStartError("Application has failed to start, see logs above") from start_error
 
         logger.info("Application is starting")
         try:
@@ -178,22 +178,30 @@ async def start(self) -> None:
         except asyncio.CancelledError:
             logger.info("Application has been interrupted")
         except BaseException as unexpected_error:
-            logger.exception("Application failed to start")
-
-            raise app_errors.StartServerError("Application failed to start") from unexpected_error
+            logger.exception("Application runtime error")
+            raise app_errors.ServerRuntimeError("Application runtime error") from unexpected_error
 
     async def _start(self) -> None:
         timer = asyncio_utils.TimeoutTimer(timeout=self._settings.tasks.timeout)
         while not timer.is_expired:
             all_topics_finished = all(
-                self._queue_repository.is_topic_finished(topic) for topic in task_repositories.TOPICS
+                self._queue_repository.is_topic_finished(topic) for topic in task_repositories.JOB_TOPICS
             )
             if self._aiojobs_scheduler.is_empty and all_topics_finished:
                 break
             await asyncio.sleep(1)
         else:
             logger.warning("Application has timed out and will be stopped prematurely")
+            raise app_errors.ApplicationTimeoutError("Application has timed out")
+
+        logger.info("Application has finished successfully")
+        failed_topics_empty = all(
+            self._queue_repository.is_topic_empty(topic) for topic in task_repositories.FAILED_JOB_TOPICS
+        )
+        if not failed_topics_empty:
+            logger.warning("Application has finished with failed jobs")
+            raise app_errors.ApplicationFailedJobsError("Application has finished with failed jobs")
 
     async def dispose(self) -> None:
         logger.info("Application is shutting down...")
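Note: the subject line promises distinct exit codes, but the hunks above only raise the new error types; the mapping to a process exit status happens outside this excerpt. A minimal sketch of how such a mapping could look — the `EXIT_CODES` table, `exit_code_for` helper, and the specific code values are hypothetical, not taken from the repository:

```python
# Hypothetical mapping from the new error types to process exit codes; the
# patch raises the errors, but the code-to-error assignment here is assumed.
EXIT_CODES: dict[str, int] = {
    "ServerStartError": 2,
    "ServerRuntimeError": 3,
    "ApplicationTimeoutError": 4,
    "ApplicationFailedJobsError": 5,
}

def exit_code_for(error: BaseException) -> int:
    """Resolve an exit code from the error class name, defaulting to 1."""
    return EXIT_CODES.get(type(error).__name__, 1)

# Usage sketch inside bin/main/__main__.py:
#     try:
#         asyncio.run(run())
#     except app.ApplicationError as exc:
#         sys.exit(exit_code_for(exc))
```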
diff --git a/backend/lib/app/errors.py b/backend/lib/app/errors.py
index 855d04e..d968206 100644
--- a/backend/lib/app/errors.py
+++ b/backend/lib/app/errors.py
@@ -11,12 +11,27 @@ class DisposeError(ApplicationError):
     pass
 
 
-class StartServerError(ApplicationError):
+class ServerStartError(ApplicationError):
+    pass
+
+
+class ServerRuntimeError(ApplicationError):
+    pass
+
+
+class ApplicationFailedJobsError(ApplicationError):
+    pass
+
+
+class ApplicationTimeoutError(ApplicationError):
     pass
 
 
 __all__ = [
     "ApplicationError",
+    "ApplicationFailedJobsError",
+    "ApplicationTimeoutError",
     "DisposeError",
-    "StartServerError",
+    "ServerRuntimeError",
+    "ServerStartError",
 ]
diff --git a/backend/lib/app/settings.py b/backend/lib/app/settings.py
index 194ffb6..a87ad47 100644
--- a/backend/lib/app/settings.py
+++ b/backend/lib/app/settings.py
@@ -104,6 +104,8 @@ def settings_customise_sources(
 
 __all__ = [
     "AppSettings",
+    "JobProcessorSettings",
     "LoggingSettings",
     "Settings",
+    "TasksSettings",
 ]
diff --git a/backend/lib/task/jobs/event_processor.py b/backend/lib/task/jobs/event_processor.py
index e3acd1b..9ffcdca 100644
--- a/backend/lib/task/jobs/event_processor.py
+++ b/backend/lib/task/jobs/event_processor.py
@@ -34,7 +34,7 @@ def name(self) -> str:
 
     async def _process(self) -> None:
         try:
-            async with self._queue_repository.acquire(topic=task_repositories.Topic.EVENT_JOB) as event_job:
+            async with self._queue_repository.acquire(topic=task_repositories.JobTopic.EVENT) as event_job:
                 assert isinstance(event_job, task_job_models.EventJob)
                 logger.debug("Processing EventJob(%s)", event_job.id)
                 try:
@@ -43,20 +43,20 @@ async def _process(self) -> None:
                     logger.error("EventJob(%s) has failed", event_job.id)
                     if event_job.retry_count + 1 < self._max_retries:
                         await self._queue_repository.push(
-                            topic=task_repositories.Topic.EVENT_JOB,
+                            topic=task_repositories.JobTopic.EVENT,
                             item=event_job.copy_retry(),
                             validate_not_closed=False,
                         )
                     else:
                         logger.error("EventJob(%s) has reached max retries", event_job.id)
                         await self._queue_repository.push(
-                            topic=task_repositories.Topic.FAILED_EVENT_JOB,
+                            topic=task_repositories.JobTopic.FAILED_EVENT,
                             item=event_job,
                         )
-                    await self._queue_repository.consume(topic=task_repositories.Topic.EVENT_JOB, item=event_job)
+                    await self._queue_repository.consume(topic=task_repositories.JobTopic.EVENT, item=event_job)
                     raise
                 else:
-                    await self._queue_repository.consume(topic=task_repositories.Topic.EVENT_JOB, item=event_job)
+                    await self._queue_repository.consume(topic=task_repositories.JobTopic.EVENT, item=event_job)
                     logger.debug("EventJob(%s) has been processed", event_job.id)
         except task_repositories.QueueRepositoryProtocol.TopicFinished:
             logger.debug("Event queue is closed, finishing job")
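The processor above and the two that follow share the same acquire → process → retry → dead-letter shape. A self-contained sketch of that pattern; the `queue` object, `process` callback, and `MAX_RETRIES` constant are illustrative stand-ins for the repository protocol and processor settings:

```python
MAX_RETRIES = 3  # illustrative; the real limit comes from processor settings

async def handle(queue, topic, failed_topic, item, process) -> None:
    """Process one queue item; requeue it with a bumped retry count on
    failure, or move it to the failed topic once retries are exhausted."""
    try:
        await process(item)
    except Exception:
        if item.retry_count + 1 < MAX_RETRIES:
            # Requeue even if the topic has already been closed to writers.
            await queue.push(topic=topic, item=item.copy_retry(), validate_not_closed=False)
        else:
            await queue.push(topic=failed_topic, item=item)
        await queue.consume(topic=topic, item=item)
        raise
    else:
        await queue.consume(topic=topic, item=item)
```

Items parked on the failed topics are what `_start` inspects via `is_topic_empty` to decide whether the run ends with `ApplicationFailedJobsError`.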
diff --git a/backend/lib/task/jobs/task_processor.py b/backend/lib/task/jobs/task_processor.py
index 14420d4..3cd1ef2 100644
--- a/backend/lib/task/jobs/task_processor.py
+++ b/backend/lib/task/jobs/task_processor.py
@@ -33,7 +33,7 @@ def name(self) -> str:
 
     async def _process(self) -> None:
         try:
-            async with self._queue_repository.acquire(topic=task_repositories.Topic.TASK_JOB) as task_job:
+            async with self._queue_repository.acquire(topic=task_repositories.JobTopic.TASK) as task_job:
                 assert isinstance(task_job, task_job_models.TaskJob)
                 logging.debug("Processing TaskJob(%s)", task_job.id)
                 try:
@@ -42,31 +42,31 @@ async def _process(self) -> None:
                     logger.error("TaskJob(%s) has failed", task_job.id)
                     if task_job.retry_count + 1 < self._max_retries:
                         await self._queue_repository.push(
-                            topic=task_repositories.Topic.TASK_JOB,
+                            topic=task_repositories.JobTopic.TASK,
                             item=task_job.copy_retry(),
                             validate_not_closed=False,
                         )
                     else:
                         logger.error("TaskJob(%s) has reached max retries", task_job.id)
                         await self._queue_repository.push(
-                            topic=task_repositories.Topic.FAILED_TASK_JOB,
+                            topic=task_repositories.JobTopic.FAILED_TASK,
                             item=task_job,
                         )
-                    await self._queue_repository.consume(topic=task_repositories.Topic.TASK_JOB, item=task_job)
+                    await self._queue_repository.consume(topic=task_repositories.JobTopic.TASK, item=task_job)
                     raise
                 else:
-                    await self._queue_repository.consume(topic=task_repositories.Topic.TASK_JOB, item=task_job)
+                    await self._queue_repository.consume(topic=task_repositories.JobTopic.TASK, item=task_job)
                     logger.debug("TaskJob(%s) has been processed", task_job.id)
         except task_repositories.QueueRepositoryProtocol.TopicFinished:
             logger.debug("Task queue is closed, finishing job")
-            await self._queue_repository.close_topic(topic=task_repositories.Topic.TRIGGER_JOB)
+            await self._queue_repository.close_topic(topic=task_repositories.JobTopic.TRIGGER)
             self.finish()
 
     async def _process_task(self, task_job: task_job_models.TaskJob) -> None:
         task = task_job.task
         for trigger in task.triggers:
             await self._queue_repository.push(
-                topic=task_repositories.Topic.TRIGGER_JOB,
+                topic=task_repositories.JobTopic.TRIGGER,
                 item=task_job_models.TriggerJob(
                     id=f"{task.id}/{trigger.id}",
                     task_id=task.id,
diff --git a/backend/lib/task/jobs/task_spawner.py b/backend/lib/task/jobs/task_spawner.py
index f659f89..94b3217 100644
--- a/backend/lib/task/jobs/task_spawner.py
+++ b/backend/lib/task/jobs/task_spawner.py
@@ -30,7 +30,7 @@ async def _process(self) -> None:
 
         for task in config.tasks:
             await self._queue_repository.push(
-                topic=task_repositories.Topic.TASK_JOB,
+                topic=task_repositories.JobTopic.TASK,
                 item=task_jobs_models.TaskJob(
                     id=task.id,
                     task=task,
@@ -40,7 +40,7 @@ async def _process(self) -> None:
             logger.debug("Task(%s) was spawned", task.id)
 
         logger.debug("All tasks have been spawned, closing task topic")
-        await self._queue_repository.close_topic(task_repositories.Topic.TASK_JOB)
+        await self._queue_repository.close_topic(task_repositories.JobTopic.TASK)
 
 
 __all__ = [
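The spawner closes the task topic once everything is enqueued, the task processor closes the trigger topic when the task topic finishes, and (below) the trigger processor closes the event topic in turn, so the pipeline winds down front to back. A toy model of that cascading shutdown, using plain `asyncio.Queue`s with a `None` sentinel standing in for `close_topic`; all names here are illustrative:

```python
import asyncio

async def stage(name: str, inbox: asyncio.Queue, outbox: asyncio.Queue | None) -> None:
    # Drain the inbox; on the close sentinel, close the downstream topic too.
    while True:
        item = await inbox.get()
        if item is None:  # sentinel standing in for "topic closed and drained"
            if outbox is not None:
                await outbox.put(None)
            print(f"{name} finished")
            return
        if outbox is not None:
            await outbox.put(f"{name}:{item}")

async def main() -> None:
    tasks_q: asyncio.Queue = asyncio.Queue()
    triggers_q: asyncio.Queue = asyncio.Queue()
    events_q: asyncio.Queue = asyncio.Queue()
    await tasks_q.put("t1")
    await tasks_q.put(None)  # the spawner closes the task topic when done
    await asyncio.gather(
        stage("task_processor", tasks_q, triggers_q),
        stage("trigger_processor", triggers_q, events_q),
        stage("event_processor", events_q, None),
    )

asyncio.run(main())
```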
diff --git a/backend/lib/task/jobs/trigger_processor.py b/backend/lib/task/jobs/trigger_processor.py
index 2784af9..6581321 100644
--- a/backend/lib/task/jobs/trigger_processor.py
+++ b/backend/lib/task/jobs/trigger_processor.py
@@ -36,7 +36,7 @@ def name(self) -> str:
 
     async def _process(self) -> None:
         try:
-            async with self._queue_repository.acquire(topic=task_repositories.Topic.TRIGGER_JOB) as trigger_job:
+            async with self._queue_repository.acquire(topic=task_repositories.JobTopic.TRIGGER) as trigger_job:
                 assert isinstance(trigger_job, task_job_models.TriggerJob)
                 logger.debug("Processing TriggerJob(%s)", trigger_job.id)
                 try:
@@ -45,24 +45,24 @@ async def _process(self) -> None:
                     logger.exception("Error processing TriggerJob(%s)", trigger_job.id)
                     if trigger_job.retry_count + 1 < self._max_retries:
                         await self._queue_repository.push(
-                            topic=task_repositories.Topic.TRIGGER_JOB,
+                            topic=task_repositories.JobTopic.TRIGGER,
                             item=trigger_job.copy_retry(),
                             validate_not_closed=False,
                         )
                     else:
                         logger.error("TriggerJob(%s) has reached max retries", trigger_job.id)
                         await self._queue_repository.push(
-                            topic=task_repositories.Topic.FAILED_TRIGGER_JOB,
+                            topic=task_repositories.JobTopic.FAILED_TRIGGER,
                             item=trigger_job,
                         )
-                    await self._queue_repository.consume(topic=task_repositories.Topic.TRIGGER_JOB, item=trigger_job)
+                    await self._queue_repository.consume(topic=task_repositories.JobTopic.TRIGGER, item=trigger_job)
                     raise
                 else:
-                    await self._queue_repository.consume(topic=task_repositories.Topic.TRIGGER_JOB, item=trigger_job)
+                    await self._queue_repository.consume(topic=task_repositories.JobTopic.TRIGGER, item=trigger_job)
                     logger.debug("TriggerJob(%s) has been processed", trigger_job.id)
         except task_repositories.QueueRepositoryProtocol.TopicFinished:
             logger.debug("Trigger queue is closed, finishing job")
-            await self._queue_repository.close_topic(topic=task_repositories.Topic.EVENT_JOB)
+            await self._queue_repository.close_topic(topic=task_repositories.JobTopic.EVENT)
             self.finish()
 
     async def _process_trigger(self, trigger_job: task_job_models.TriggerJob) -> None:
@@ -79,7 +79,7 @@ async def _process_trigger(self, trigger_job: task_job_models.TriggerJob) -> Non
         async for raw_event in trigger_processor.produce_events():
             for action in trigger_job.actions:
                 await self._queue_repository.push(
-                    topic=task_repositories.Topic.EVENT_JOB,
+                    topic=task_repositories.JobTopic.EVENT,
                     item=task_job_models.EventJob(
                         id=f"{task_id}/{trigger.id}/{action.id}/{raw_event.id}",
                         event=raw_event,
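`_process_trigger` fans each raw event out into one `EventJob` per action, with an id composed so the job stays traceable back to its task, trigger, action, and event. A standalone sketch of that fan-out; the function name and simplified string types are illustrative:

```python
def event_job_ids(task_id: str, trigger_id: str, action_ids: list[str], event_ids: list[str]) -> list[str]:
    # One EventJob per (event, action) pair, with a fully qualified id.
    return [
        f"{task_id}/{trigger_id}/{action_id}/{event_id}"
        for event_id in event_ids
        for action_id in action_ids
    ]

print(event_job_ids("backup", "cron", ["notify", "archive"], ["e1"]))
# ['backup/cron/notify/e1', 'backup/cron/archive/e1']
```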
diff --git a/backend/lib/task/repositories/queue/base.py b/backend/lib/task/repositories/queue/base.py
index d7f86e7..fe82922 100644
--- a/backend/lib/task/repositories/queue/base.py
+++ b/backend/lib/task/repositories/queue/base.py
@@ -9,19 +9,19 @@
 import lib.utils.json as json_utils
 
 
-class Topic(str, enum.Enum):
-    TASK_JOB = "task_jobs"
-    TRIGGER_JOB = "trigger_jobs"
-    EVENT_JOB = "events_jobs"
+class JobTopic(str, enum.Enum):
+    TASK = "task_jobs"
+    TRIGGER = "trigger_jobs"
+    EVENT = "events_jobs"
 
-    FAILED_TASK_JOB = "failed_tasks_jobs"
-    FAILED_TRIGGER_JOB = "failed_trigger_jobs"
-    FAILED_EVENT_JOB = "failed_event_jobs"
+    FAILED_TASK = "failed_tasks_jobs"
+    FAILED_TRIGGER = "failed_trigger_jobs"
+    FAILED_EVENT = "failed_event_jobs"
 
 
-ALL_TOPICS = frozenset(topic for topic in Topic)
-FAILED_TOPICS = frozenset((Topic.FAILED_TASK_JOB, Topic.FAILED_TRIGGER_JOB, Topic.FAILED_EVENT_JOB))
-TOPICS = ALL_TOPICS - FAILED_TOPICS
+ALL_JOB_TOPICS = frozenset(topic for topic in JobTopic)
+FAILED_JOB_TOPICS = frozenset((JobTopic.FAILED_TASK, JobTopic.FAILED_TRIGGER, JobTopic.FAILED_EVENT))
+JOB_TOPICS = ALL_JOB_TOPICS - FAILED_JOB_TOPICS
 
 
 class QueueItem(typing.Protocol):
@@ -44,21 +44,23 @@ async def dispose(self) -> None: ...
     @property
     def is_finished(self) -> bool: ...
 
-    def is_topic_finished(self, topic: Topic) -> bool: ...
+    def is_topic_finished(self, topic: JobTopic) -> bool: ...
 
-    async def push(self, topic: Topic, item: QueueItem, validate_not_closed: bool = True) -> None:
+    def is_topic_empty(self, topic: JobTopic) -> bool: ...
+
+    async def push(self, topic: JobTopic, item: QueueItem, validate_not_closed: bool = True) -> None:
         """
         :raises TopicClosed: if topic is closed
         """
 
-    def acquire(self, topic: Topic) -> typing.AsyncContextManager[QueueItem]:  # pyright: ignore[reportReturnType]
+    def acquire(self, topic: JobTopic) -> typing.AsyncContextManager[QueueItem]:  # pyright: ignore[reportReturnType]
         """
         :raises TopicFinished: if topic is finished
        """
 
-    async def consume(self, topic: Topic, item: QueueItem) -> None: ...
+    async def consume(self, topic: JobTopic, item: QueueItem) -> None: ...
 
-    async def close_topic(self, topic: Topic) -> None: ...
+    async def close_topic(self, topic: JobTopic) -> None: ...
 
 
 class BaseQueueSettings(pydantic_settings.BaseSettings):
@@ -87,19 +89,22 @@ async def dispose(self) -> None: ...
     def is_finished(self) -> bool: ...
 
     @abc.abstractmethod
-    def is_topic_finished(self, topic: Topic) -> bool: ...
+    def is_topic_finished(self, topic: JobTopic) -> bool: ...
+
+    @abc.abstractmethod
+    def is_topic_empty(self, topic: JobTopic) -> bool: ...
 
     @abc.abstractmethod
-    async def push(self, topic: Topic, item: QueueItem, validate_not_closed: bool = True) -> None: ...
+    async def push(self, topic: JobTopic, item: QueueItem, validate_not_closed: bool = True) -> None: ...
 
     @abc.abstractmethod
-    def acquire(self, topic: Topic) -> typing.AsyncContextManager[QueueItem]: ...
+    def acquire(self, topic: JobTopic) -> typing.AsyncContextManager[QueueItem]: ...
 
     @abc.abstractmethod
-    async def consume(self, topic: Topic, item: QueueItem) -> None: ...
+    async def consume(self, topic: JobTopic, item: QueueItem) -> None: ...
 
     @abc.abstractmethod
-    async def close_topic(self, topic: Topic) -> None: ...
+    async def close_topic(self, topic: JobTopic) -> None: ...
 
 
 @dataclasses.dataclass
@@ -137,15 +142,15 @@ def queue_repository_factory(settings: BaseQueueSettings) -> QueueRepositoryProt
 
 
 __all__ = [
-    "ALL_TOPICS",
+    "ALL_JOB_TOPICS",
     "BaseQueueRepository",
     "BaseQueueSettings",
-    "FAILED_TOPICS",
+    "FAILED_JOB_TOPICS",
+    "JOB_TOPICS",
+    "JobTopic",
     "QueueItem",
     "QueueRepositoryProtocol",
     "RegistryRecord",
-    "TOPICS",
-    "Topic",
     "queue_repository_factory",
     "queue_settings_factory",
     "register_queue_backend",
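For quick reference, the derived sets above behave as follows; this is a standalone restatement of the same construction, runnable on its own:

```python
import enum

class JobTopic(str, enum.Enum):
    TASK = "task_jobs"
    TRIGGER = "trigger_jobs"
    EVENT = "events_jobs"
    FAILED_TASK = "failed_tasks_jobs"
    FAILED_TRIGGER = "failed_trigger_jobs"
    FAILED_EVENT = "failed_event_jobs"

ALL_JOB_TOPICS = frozenset(JobTopic)
FAILED_JOB_TOPICS = frozenset({JobTopic.FAILED_TASK, JobTopic.FAILED_TRIGGER, JobTopic.FAILED_EVENT})
JOB_TOPICS = ALL_JOB_TOPICS - FAILED_JOB_TOPICS

# The application waits for JOB_TOPICS to finish, then fails the run if any
# topic in FAILED_JOB_TOPICS still holds items (see app._start above).
assert JOB_TOPICS == {JobTopic.TASK, JobTopic.TRIGGER, JobTopic.EVENT}
```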
diff --git a/backend/lib/task/repositories/queue/local/memory.py b/backend/lib/task/repositories/queue/local/memory.py
index a9abdc1..7829780 100644
--- a/backend/lib/task/repositories/queue/local/memory.py
+++ b/backend/lib/task/repositories/queue/local/memory.py
@@ -104,11 +104,14 @@ def from_settings(cls, settings: MemoryQueueSettings) -> typing.Self:
     def is_finished(self) -> bool:
         return all(topic.is_finished for topic in self._topics.values())
 
-    def is_topic_finished(self, topic: queue_base.Topic) -> bool:
+    def is_topic_finished(self, topic: queue_base.JobTopic) -> bool:
         return self._topics[topic].is_finished
 
+    def is_topic_empty(self, topic: queue_base.JobTopic) -> bool:
+        return self._topics[topic].empty()
+
     @contextlib.contextmanager
-    def _wrap_queue_errors(self, topic: queue_base.Topic) -> typing.Iterator[None]:
+    def _wrap_queue_errors(self, topic: queue_base.JobTopic) -> typing.Iterator[None]:
         try:
             yield
         except Topic.TopicClosed as exc:
@@ -116,22 +119,27 @@ def _wrap_queue_errors(self, topic: queue_base.Topic) -> typing.Iterator[None]:
         except Topic.TopicFinished as exc:
             raise self.TopicFinished(f"Topic({topic}) is already finished.") from exc
 
-    async def push(self, topic: queue_base.Topic, item: queue_base.QueueItem, validate_not_closed: bool = True) -> None:
+    async def push(
+        self,
+        topic: queue_base.JobTopic,
+        item: queue_base.QueueItem,
+        validate_not_closed: bool = True,
+    ) -> None:
         logger.debug("Pushing item to topic %s: %s", topic, item)
         with self._wrap_queue_errors(topic):
             await self._topics[topic].put(item, validate_not_closed=validate_not_closed)
 
     @contextlib.asynccontextmanager
-    async def acquire(self, topic: queue_base.Topic) -> typing.AsyncIterator[queue_base.QueueItem]:
+    async def acquire(self, topic: queue_base.JobTopic) -> typing.AsyncIterator[queue_base.QueueItem]:
         with self._wrap_queue_errors(topic):
             async with self._topics[topic].acquire() as task:
                 yield task
 
-    async def consume(self, topic: queue_base.Topic, item: queue_base.QueueItem) -> None:
+    async def consume(self, topic: queue_base.JobTopic, item: queue_base.QueueItem) -> None:
         logger.debug("Consuming item from topic %s: %s", topic, item)
         await self._topics[topic].consume(item)
 
-    async def close_topic(self, topic: queue_base.Topic) -> None:
+    async def close_topic(self, topic: queue_base.JobTopic) -> None:
         await self._topics[topic].close()
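A usage sketch of the repository surface after the rename. It assumes an already-constructed repository `repo` and that `queue_base` is imported as in the module above; the `drain_tasks` helper itself is hypothetical:

```python
async def drain_tasks(repo) -> None:
    # Keep acquiring until the topic is closed and drained; acquire() raises
    # TopicFinished at that point, mirroring the processors above.
    try:
        while True:
            async with repo.acquire(topic=queue_base.JobTopic.TASK) as job:
                ...  # process the job
                await repo.consume(topic=queue_base.JobTopic.TASK, item=job)
    except repo.TopicFinished:
        pass
```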
diff --git a/backend/lib/task/services/queue_state.py b/backend/lib/task/services/queue_state.py
index 0e2582d..18860c3 100644
--- a/backend/lib/task/services/queue_state.py
+++ b/backend/lib/task/services/queue_state.py
@@ -33,45 +33,45 @@ def __init__(
 
     async def dump(self) -> None:
         if self._task_queue_mode != JobProcessorQueueStateMode.NONE:
-            await self._dump_topic(topic=task_repositories.Topic.TASK_JOB)
-            await self._dump_topic(topic=task_repositories.Topic.FAILED_TASK_JOB)
+            await self._dump_topic(topic=task_repositories.JobTopic.TASK)
+            await self._dump_topic(topic=task_repositories.JobTopic.FAILED_TASK)
 
         if self._trigger_queue_mode != JobProcessorQueueStateMode.NONE:
-            await self._dump_topic(topic=task_repositories.Topic.TRIGGER_JOB)
-            await self._dump_topic(topic=task_repositories.Topic.FAILED_TRIGGER_JOB)
+            await self._dump_topic(topic=task_repositories.JobTopic.TRIGGER)
+            await self._dump_topic(topic=task_repositories.JobTopic.FAILED_TRIGGER)
 
         if self._event_queue_mode != JobProcessorQueueStateMode.NONE:
-            await self._dump_topic(topic=task_repositories.Topic.EVENT_JOB)
-            await self._dump_topic(topic=task_repositories.Topic.FAILED_EVENT_JOB)
+            await self._dump_topic(topic=task_repositories.JobTopic.EVENT)
+            await self._dump_topic(topic=task_repositories.JobTopic.FAILED_EVENT)
 
     async def load(self):
         await self._load_job_topics(
             queue_mode=self._task_queue_mode,
-            topic=task_repositories.Topic.TASK_JOB,
-            failed_topic=task_repositories.Topic.FAILED_TASK_JOB,
+            topic=task_repositories.JobTopic.TASK,
+            failed_topic=task_repositories.JobTopic.FAILED_TASK,
             model=task_jobs.TaskJob,
         )
         await self._load_job_topics(
             queue_mode=self._trigger_queue_mode,
-            topic=task_repositories.Topic.TRIGGER_JOB,
-            failed_topic=task_repositories.Topic.FAILED_TRIGGER_JOB,
+            topic=task_repositories.JobTopic.TRIGGER,
+            failed_topic=task_repositories.JobTopic.FAILED_TRIGGER,
             model=task_jobs.TriggerJob,
         )
         await self._load_job_topics(
             queue_mode=self._event_queue_mode,
-            topic=task_repositories.Topic.EVENT_JOB,
-            failed_topic=task_repositories.Topic.FAILED_EVENT_JOB,
+            topic=task_repositories.JobTopic.EVENT,
+            failed_topic=task_repositories.JobTopic.FAILED_EVENT,
             model=task_jobs.EventJob,
         )
 
-    def _get_default_state_path(self, topic: task_repositories.Topic) -> str:
+    def _get_default_state_path(self, topic: task_repositories.JobTopic) -> str:
         return f"topics/{topic.value}"
 
     async def _dump_topic(
         self,
-        topic: task_repositories.Topic,
+        topic: task_repositories.JobTopic,
         state_path: str | None = None,
     ) -> None:
         state_path = state_path or self._get_default_state_path(topic)
@@ -97,8 +97,8 @@ async def _dump_topic(
     async def _load_job_topics(
         self,
         queue_mode: JobProcessorQueueStateMode,
-        topic: task_repositories.Topic,
-        failed_topic: task_repositories.Topic,
+        topic: task_repositories.JobTopic,
+        failed_topic: task_repositories.JobTopic,
         model: type[task_jobs.BaseJob],
     ):
         if queue_mode == JobProcessorQueueStateMode.PRESERVE:
@@ -118,7 +118,7 @@ async def _load_job_topics(
 
     async def _load_topic(
         self,
-        topic: task_repositories.Topic,
+        topic: task_repositories.JobTopic,
         model: type[task_jobs.BaseJob],
         state_path: str | None = None,
         reset_retry_count: bool = False,
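One property worth noting: the enum members were renamed but their string values were not, and `_get_default_state_path` builds paths from `topic.value`, so state dumped before this patch remains loadable after it. A small standalone check of that invariant (the helper name is illustrative):

```python
def default_state_path(topic_value: str) -> str:
    # Mirrors _get_default_state_path: the path depends only on the value.
    return f"topics/{topic_value}"

# Topic.TASK_JOB became JobTopic.TASK, but both map to "task_jobs", so the
# on-disk layout is stable across the rename:
assert default_state_path("task_jobs") == "topics/task_jobs"
assert default_state_path("failed_tasks_jobs") == "topics/failed_tasks_jobs"
```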