Skip to content

Commit

Permalink
feat: add clear queue and logs
Browse files Browse the repository at this point in the history
  • Loading branch information
ovsds committed Apr 10, 2024
1 parent b39e9ed commit cfee2b7
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 1 deletion.
9 changes: 9 additions & 0 deletions backend/lib/task/repositories/state/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ async def get(self) -> StateData | None: ...

async def set(self, value: StateData) -> None: ...

async def clear(self) -> None: ...

def acquire(self) -> typing.AsyncContextManager[StateData | None]: ...


Expand All @@ -26,6 +28,8 @@ async def get(self, path: str) -> StateData | None: ...

async def set(self, path: str, value: StateData) -> None: ...

async def clear(self, path: str) -> None: ...

def acquire(self, path: str) -> typing.AsyncContextManager[StateData | None]: ...

async def get_state(self, path: str) -> StateProtocol: ...
Expand Down Expand Up @@ -53,6 +57,9 @@ async def get(self) -> StateData | None:
async def set(self, value: StateData) -> None:
await self._repository.set(self._path, value)

async def clear(self) -> None:
await self._repository.clear(self._path)

@contextlib.asynccontextmanager
async def acquire(self) -> typing.AsyncIterator[StateData | None]:
async with self._repository.acquire(self._path) as state:
Expand All @@ -72,6 +79,8 @@ async def get(self, path: str) -> StateData | None: ...
@abc.abstractmethod
async def set(self, path: str, value: StateData) -> None: ...

async def clear(self, path: str) -> None: ...

@abc.abstractmethod
def acquire(self, path: str) -> typing.AsyncContextManager[StateData | None]: ...

Expand Down
9 changes: 9 additions & 0 deletions backend/lib/task/repositories/state/local/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,19 @@ async def set(self, path: str, value: state_base.StateData) -> None:
async with aiofile.async_open(f"{self._root_path}/{path}.json", "w+") as file:
await file.write(json_utils.dumps_str(value))

async def clear(self, path: str) -> None:
logger.debug("Clearing State(%s)", path)
try:
self._root_path.joinpath(f"{path}.json").unlink()
except FileNotFoundError:
pass

@contextlib.asynccontextmanager
async def acquire(self, path: str) -> typing.AsyncIterator[state_base.StateData | None]:
async with asyncio_utils.acquire_file_lock(f"{self._root_path}/{path}.lock"):
logger.debug("Acquired lock for State(%s)", path)
yield await self.get(path)
logger.debug("Released lock for State(%s)", path)


__all__ = [
Expand Down
5 changes: 4 additions & 1 deletion backend/lib/task/services/queue_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,12 @@ async def _dump_topic(
await self._queue_repository.consume(topic, job)

if len(jobs) == 0:
logger.info("No jobs to dump from Topic(%s)", topic)
await self._state_repository.clear(state_path)
return

await self._state_repository.set(state_path, {"jobs": jobs})
logger.info("%s jobs dumped to Topic(%s)", len(jobs), topic)
logger.info("%s jobs dumped from Topic(%s)", len(jobs), topic)

async def _load_job_topics(
self,
Expand Down Expand Up @@ -127,6 +129,7 @@ async def _load_topic(

state = await self._state_repository.get(state_path)
if state is None:
logger.info("State(%s) not found", state_path)
return

assert isinstance(state, dict)
Expand Down

0 comments on commit cfee2b7

Please sign in to comment.