Skip to content

Commit

Permalink
feat: add queue dump count to logs (#13)
Browse files Browse the repository at this point in the history
* feat: add example

* feat: change settings

* fix: pydantic factory class annotation

* feat: add clear queue and logs

* chore: add backend/Taskfile descriptions
  • Loading branch information
ovsds authored Apr 10, 2024
1 parent bc84d02 commit a088379
Show file tree
Hide file tree
Showing 14 changed files with 121 additions and 24 deletions.
4 changes: 4 additions & 0 deletions backend/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,7 @@ htmlcov/

# Environment variables
.env
.secrets

# Local app state
example/state
15 changes: 15 additions & 0 deletions backend/Taskfile.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,15 @@ tasks:
IGNORE_ERRORS: '{{.IGNORE_ERRORS | default "true"}}'

init:
desc: Initialize environment
cmds:
- echo 'Installing python dependencies...'
- poetry install
--no-root
--with dev

lint:
desc: Run lint checks
cmds:
- echo 'Running poetry checks...'
- poetry check --lock
Expand Down Expand Up @@ -70,6 +72,7 @@ tasks:
- task: _pyright

lint-fix:
desc: Fix lint issues
cmds:
- echo 'Running poetry fixes...'
- poetry lock --no-update
Expand Down Expand Up @@ -108,11 +111,13 @@ tasks:
.

test:
desc: Run tests
cmds:
- echo 'Running pytest...'
- "{{.PENV}}/bin/python -m pytest tests"

test-container:
desc: Run tests in container
cmds:
- task: image-build
vars: { TARGET: tests }
Expand All @@ -122,16 +127,19 @@ tasks:
{{.IMAGE_NAME}}:tests

test-coverage-run:
desc: Run tests with coverage
cmds:
- echo 'Running test coverage...'
- "{{.PENV}}/bin/python -m coverage run -m pytest tests"

test-coverage-report:
desc: Show test coverage report
cmds:
- echo 'Reporting test coverage...'
- "{{.PENV}}/bin/python -m coverage report -m"

test-coverage-html:
desc: Show test coverage report in browser
cmds:
- echo 'Generating test coverage report...'
- "{{.PENV}}/bin/python -m coverage html"
Expand All @@ -141,6 +149,7 @@ tasks:
sh: "[ $(uname) = 'Darwin' ] && echo 'file://$(pwd)/htmlcov/index.html' || echo 'htmlcov/index.html'"

clean:
desc: Clean environment
cmds:
- echo 'Cleaning python dependencies...'
- rm -rf {{.PENV}}
Expand All @@ -156,17 +165,20 @@ tasks:
- rm -rf htmlcov

dependencies-update:
desc: Update python dependencies
cmds:
- echo 'Updating python dependencies...'
- poetry update
- poetry show --outdated

dev-server-start:
desc: Start development application
cmds:
- echo 'Starting server...'
- "{{.PENV}}/bin/python -m bin.main"

dev-server-container-start:
desc: Start development application in container
cmds:
- task: image-build
vars: { TARGET: runtime }
Expand All @@ -176,13 +188,15 @@ tasks:
{{.IMAGE_NAME}}:runtime

ci-init:
desc: CI-specific environment initialization
cmds:
- echo 'Installing python dependencies...'
- poetry install
--no-root
--with dev

ci-test:
desc: CI-specific test run
cmds:
- task: test-container

Expand All @@ -199,6 +213,7 @@ tasks:
TAG: "{{.IMAGE_REGISTRY}}/{{.IMAGE_NAME}}:{{.IMAGE_TAG}}"

ci-image-push:
desc: Push image to registry for CI usage
requires:
vars:
- IMAGE_TAG
Expand Down
25 changes: 25 additions & 0 deletions backend/example/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
tasks:
- id: example_task
triggers:
- id: example_trigger
type: github
token_secret:
type: env
key: GITHUB_TOKEN
owner: ovsds
repos:
- github-watcher
- github-watcher-action
subtriggers:
- type: repository_issue_created
- type: repository_pr_created
- type: repository_failed_workflow_run
actions:
- id: example_action
type: telegram_webhook
chat_id_secret:
type: env
key: TELEGRAM_CHAT_ID
token_secret:
type: env
key: TELEGRAM_TOKEN
11 changes: 11 additions & 0 deletions backend/example/settings.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
logs:
level: DEBUG
tasks:
config_backend:
type: yaml_file
path: example/config.yaml
queue_backend:
type: memory
state_backend:
type: local_dir
path: example/state
2 changes: 1 addition & 1 deletion backend/lib/app/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def from_settings(cls, settings: app_settings.Settings) -> typing.Self:
# Logging

logging.basicConfig(
level=settings.logs.min_level,
level=settings.logs.level,
format=settings.logs.format,
)

Expand Down
4 changes: 2 additions & 2 deletions backend/lib/app/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class AppSettings(pydantic_settings.BaseSettings):
env: str = "development"
name: str = "github-watcher-backend"
version: str = "0.0.1"
debug: bool = True
debug: bool = False

@property
def is_development(self) -> bool:
Expand All @@ -29,7 +29,7 @@ def is_debug(self) -> bool:


class LoggingSettings(pydantic_settings.BaseSettings):
min_level: str = "INFO"
level: str = "INFO"
format: str = "%(asctime)s | %(name)s | %(levelname)s | %(message)s"


Expand Down
16 changes: 16 additions & 0 deletions backend/lib/task/base/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,17 @@ def factory(cls, v: typing.Any, info: pydantic.ValidationInfo) -> "BaseActionCon
return action_config_factory(v)


ActionConfigPydanticAnnotation = typing.Annotated[
pydantic.SerializeAsAny[BaseActionConfig],
pydantic.BeforeValidator(BaseActionConfig.factory),
]
ActionConfigListPydanticAnnotation = typing.Annotated[
list[pydantic.SerializeAsAny[BaseActionConfig]],
pydantic.BeforeValidator(pydantic_utils.make_list_factory(BaseActionConfig.factory)),
pydantic.AfterValidator(pydantic_utils.check_unique_ids),
]


ConfigT = typing.TypeVar("ConfigT", bound=BaseActionConfig)


Expand Down Expand Up @@ -57,6 +68,9 @@ def register_action(


def action_config_factory(data: typing.Any) -> BaseActionConfig:
if isinstance(data, BaseActionConfig):
return data

assert isinstance(data, dict)
assert "type" in data

Expand All @@ -72,6 +86,8 @@ def action_processor_factory(


__all__ = [
"ActionConfigListPydanticAnnotation",
"ActionConfigPydanticAnnotation",
"ActionProcessor",
"ActionProcessorProtocol",
"BaseActionConfig",
Expand Down
3 changes: 3 additions & 0 deletions backend/lib/task/base/secret.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ def register_secret(


def action_config_factory(data: typing.Any) -> BaseSecretConfig:
if isinstance(data, BaseSecretConfig):
return data

assert isinstance(data, dict)
assert "type" in data

Expand Down
18 changes: 3 additions & 15 deletions backend/lib/task/base/task.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,11 @@
import typing

import pydantic

import lib.task.base.action as action_base
import lib.task.base.trigger as task_trigger_base
import lib.task.base.trigger as trigger_base
import lib.utils.pydantic as pydantic_utils


class TaskConfig(pydantic_utils.BaseModel, pydantic_utils.IDMixinModel):
triggers: typing.Annotated[
list[pydantic.SerializeAsAny[task_trigger_base.BaseTriggerConfig]],
pydantic.BeforeValidator(pydantic_utils.make_list_factory(task_trigger_base.BaseTriggerConfig.factory)),
pydantic.AfterValidator(pydantic_utils.check_unique_ids),
]
actions: typing.Annotated[
list[pydantic.SerializeAsAny[action_base.BaseActionConfig]],
pydantic.BeforeValidator(pydantic_utils.make_list_factory(action_base.BaseActionConfig.factory)),
pydantic.AfterValidator(pydantic_utils.check_unique_ids),
]
triggers: trigger_base.TriggerConfigListPydanticAnnotation
actions: action_base.ActionConfigListPydanticAnnotation


__all__ = [
Expand Down
16 changes: 16 additions & 0 deletions backend/lib/task/base/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,17 @@ def factory(cls, v: typing.Any, info: pydantic.ValidationInfo) -> "BaseTriggerCo
return trigger_config_factory(v)


TriggerConfigPydanticAnnotation = typing.Annotated[
pydantic.SerializeAsAny[BaseTriggerConfig],
pydantic.BeforeValidator(BaseTriggerConfig.factory),
]
TriggerConfigListPydanticAnnotation = typing.Annotated[
list[pydantic.SerializeAsAny[BaseTriggerConfig]],
pydantic.BeforeValidator(pydantic_utils.make_list_factory(BaseTriggerConfig.factory)),
pydantic.AfterValidator(pydantic_utils.check_unique_ids),
]


ConfigT = typing.TypeVar("ConfigT", bound=BaseTriggerConfig)


Expand Down Expand Up @@ -58,6 +69,9 @@ def register_trigger(


def trigger_config_factory(data: typing.Any) -> BaseTriggerConfig:
if isinstance(data, BaseTriggerConfig):
return data

assert isinstance(data, dict)
assert "type" in data

Expand All @@ -75,6 +89,8 @@ def trigger_processor_factory(

__all__ = [
"BaseTriggerConfig",
"TriggerConfigListPydanticAnnotation",
"TriggerConfigPydanticAnnotation",
"TriggerProcessor",
"TriggerProcessorProtocol",
"register_trigger",
Expand Down
8 changes: 3 additions & 5 deletions backend/lib/task/jobs/models.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import typing

import pydantic

import lib.task.base as task_base
import lib.utils.json as json_utils
import lib.utils.pydantic as pydantic_utils
Expand Down Expand Up @@ -34,13 +32,13 @@ class TaskJob(BaseJob):

class TriggerJob(BaseJob):
task_id: str
trigger: pydantic.SerializeAsAny[task_base.BaseTriggerConfig]
actions: list[pydantic.SerializeAsAny[task_base.BaseActionConfig]]
trigger: task_base.TriggerConfigPydanticAnnotation
actions: task_base.ActionConfigListPydanticAnnotation


class EventJob(BaseJob):
event: task_base.Event
action: pydantic.SerializeAsAny[task_base.BaseActionConfig]
action: task_base.ActionConfigPydanticAnnotation


__all__ = [
Expand Down
9 changes: 9 additions & 0 deletions backend/lib/task/repositories/state/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ async def get(self) -> StateData | None: ...

async def set(self, value: StateData) -> None: ...

async def clear(self) -> None: ...

def acquire(self) -> typing.AsyncContextManager[StateData | None]: ...


Expand All @@ -26,6 +28,8 @@ async def get(self, path: str) -> StateData | None: ...

async def set(self, path: str, value: StateData) -> None: ...

async def clear(self, path: str) -> None: ...

def acquire(self, path: str) -> typing.AsyncContextManager[StateData | None]: ...

async def get_state(self, path: str) -> StateProtocol: ...
Expand Down Expand Up @@ -53,6 +57,9 @@ async def get(self) -> StateData | None:
async def set(self, value: StateData) -> None:
await self._repository.set(self._path, value)

async def clear(self) -> None:
await self._repository.clear(self._path)

@contextlib.asynccontextmanager
async def acquire(self) -> typing.AsyncIterator[StateData | None]:
async with self._repository.acquire(self._path) as state:
Expand All @@ -72,6 +79,8 @@ async def get(self, path: str) -> StateData | None: ...
@abc.abstractmethod
async def set(self, path: str, value: StateData) -> None: ...

async def clear(self, path: str) -> None: ...

@abc.abstractmethod
def acquire(self, path: str) -> typing.AsyncContextManager[StateData | None]: ...

Expand Down
9 changes: 9 additions & 0 deletions backend/lib/task/repositories/state/local/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,19 @@ async def set(self, path: str, value: state_base.StateData) -> None:
async with aiofile.async_open(f"{self._root_path}/{path}.json", "w+") as file:
await file.write(json_utils.dumps_str(value))

async def clear(self, path: str) -> None:
logger.debug("Clearing State(%s)", path)
try:
self._root_path.joinpath(f"{path}.json").unlink()
except FileNotFoundError:
pass

@contextlib.asynccontextmanager
async def acquire(self, path: str) -> typing.AsyncIterator[state_base.StateData | None]:
async with asyncio_utils.acquire_file_lock(f"{self._root_path}/{path}.lock"):
logger.debug("Acquired lock for State(%s)", path)
yield await self.get(path)
logger.debug("Released lock for State(%s)", path)


__all__ = [
Expand Down
5 changes: 4 additions & 1 deletion backend/lib/task/services/queue_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,12 @@ async def _dump_topic(
await self._queue_repository.consume(topic, job)

if len(jobs) == 0:
logger.info("No jobs to dump from Topic(%s)", topic)
await self._state_repository.clear(state_path)
return

await self._state_repository.set(state_path, {"jobs": jobs})
logger.info("%s jobs dumped to Topic(%s)", len(jobs), topic)
logger.info("%s jobs dumped from Topic(%s)", len(jobs), topic)

async def _load_job_topics(
self,
Expand Down Expand Up @@ -127,6 +129,7 @@ async def _load_topic(

state = await self._state_repository.get(state_path)
if state is None:
logger.info("State(%s) not found", state_path)
return

assert isinstance(state, dict)
Expand Down

0 comments on commit a088379

Please sign in to comment.