From cfee2b75a17255776083050b68b5487d9ad80108 Mon Sep 17 00:00:00 2001 From: ovsds Date: Wed, 10 Apr 2024 20:43:23 +0200 Subject: [PATCH] feat: add clear queue and logs --- backend/lib/task/repositories/state/base.py | 9 +++++++++ backend/lib/task/repositories/state/local/file.py | 9 +++++++++ backend/lib/task/services/queue_state.py | 5 ++++- 3 files changed, 22 insertions(+), 1 deletion(-) diff --git a/backend/lib/task/repositories/state/base.py b/backend/lib/task/repositories/state/base.py index 65b88ff..fa45369 100644 --- a/backend/lib/task/repositories/state/base.py +++ b/backend/lib/task/repositories/state/base.py @@ -16,6 +16,8 @@ async def get(self) -> StateData | None: ... async def set(self, value: StateData) -> None: ... + async def clear(self) -> None: ... + def acquire(self) -> typing.AsyncContextManager[StateData | None]: ... @@ -26,6 +28,8 @@ async def get(self, path: str) -> StateData | None: ... async def set(self, path: str, value: StateData) -> None: ... + async def clear(self, path: str) -> None: ... + def acquire(self, path: str) -> typing.AsyncContextManager[StateData | None]: ... async def get_state(self, path: str) -> StateProtocol: ... @@ -53,6 +57,9 @@ async def get(self) -> StateData | None: async def set(self, value: StateData) -> None: await self._repository.set(self._path, value) + async def clear(self) -> None: + await self._repository.clear(self._path) + @contextlib.asynccontextmanager async def acquire(self) -> typing.AsyncIterator[StateData | None]: async with self._repository.acquire(self._path) as state: @@ -72,6 +79,8 @@ async def get(self, path: str) -> StateData | None: ... @abc.abstractmethod async def set(self, path: str, value: StateData) -> None: ... + async def clear(self, path: str) -> None: ... + @abc.abstractmethod def acquire(self, path: str) -> typing.AsyncContextManager[StateData | None]: ... diff --git a/backend/lib/task/repositories/state/local/file.py b/backend/lib/task/repositories/state/local/file.py index b381985..6754b13 100644 --- a/backend/lib/task/repositories/state/local/file.py +++ b/backend/lib/task/repositories/state/local/file.py @@ -46,10 +46,19 @@ async def set(self, path: str, value: state_base.StateData) -> None: async with aiofile.async_open(f"{self._root_path}/{path}.json", "w+") as file: await file.write(json_utils.dumps_str(value)) + async def clear(self, path: str) -> None: + logger.debug("Clearing State(%s)", path) + try: + self._root_path.joinpath(f"{path}.json").unlink() + except FileNotFoundError: + pass + @contextlib.asynccontextmanager async def acquire(self, path: str) -> typing.AsyncIterator[state_base.StateData | None]: async with asyncio_utils.acquire_file_lock(f"{self._root_path}/{path}.lock"): + logger.debug("Acquired lock for State(%s)", path) yield await self.get(path) + logger.debug("Released lock for State(%s)", path) __all__ = [ diff --git a/backend/lib/task/services/queue_state.py b/backend/lib/task/services/queue_state.py index a69bb24..0e2582d 100644 --- a/backend/lib/task/services/queue_state.py +++ b/backend/lib/task/services/queue_state.py @@ -87,10 +87,12 @@ async def _dump_topic( await self._queue_repository.consume(topic, job) if len(jobs) == 0: + logger.info("No jobs to dump from Topic(%s)", topic) + await self._state_repository.clear(state_path) return await self._state_repository.set(state_path, {"jobs": jobs}) - logger.info("%s jobs dumped to Topic(%s)", len(jobs), topic) + logger.info("%s jobs dumped from Topic(%s)", len(jobs), topic) async def _load_job_topics( self, @@ -127,6 +129,7 @@ async def _load_topic( state = await self._state_repository.get(state_path) if state is None: + logger.info("State(%s) not found", state_path) return assert isinstance(state, dict)