Skip to content
This repository has been archived by the owner on Sep 12, 2023. It is now read-only.

Commit

Permalink
feat: Service object callback pattern (#22)
Browse files Browse the repository at this point in the history
* docs: Pattern description

* feat(worker): worker callback and service receiver.

Slight refactor so that service object receive a session, not repository
on instantiation.

Plugin automatically adds callback worker function to worker.

* chore(deps): SQLAlchemy pre-release on pypi

* test(worker): test worker callback and dto cache

* feat(worker): service object `enqueue_callback()` method.

* chore(docs): remove obsoleted arg

* chore(docs): remove obsoleted type

* chore(deps): update deps
  • Loading branch information
peterschutt authored Oct 29, 2022
1 parent c6f807a commit b59c2d5
Show file tree
Hide file tree
Showing 15 changed files with 434 additions and 162 deletions.
23 changes: 23 additions & 0 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,26 @@ Configuration via environment.
```dotenv title="Example .env"
--8<-- ".env.example"
```

## Pattern

``` mermaid
sequenceDiagram
Client ->> Controller: Inbound request data
Controller ->> Service: Invoke service with data validated by DTO
Service ->> Repository: View or modify the collection
Repository ->> Service: Detached SQLAlchemy instance(s)
Service ->> Queue: Enqueue async callback
Service ->> Controller: Outbound data
Controller ->> Client: Serialize via DTO
Queue ->> Worker: Worker invoked
Worker ->> Service: Makes async callback
```

- Request data is deserialized and validated by Starlite before it is received by controller.
- Controller invokes relevant service object method and waits for response.
- Service method handles business logic of the request and triggers an asynchronous callback.
- Service method returns to controller and response is made to client.
- Async worker makes callback to service object where any async tasks can be performed.
Depending on architecture, this may not even be the same instance of the application that handled
the request.
7 changes: 3 additions & 4 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ plugins:
show_root_toc_entry: no
- autorefs

# extra:
# version:
# provider: mike
# default: latest
extra:
version:
provider: mike
232 changes: 137 additions & 95 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ pydantic = "*"
redis = "*"
saq = "*"
sentry-sdk = "*"
sqlalchemy = { git = "https://github.com/sqlalchemy/sqlalchemy.git", branch = "main" }
sqlalchemy = "==2.0.0b2"
starlite = "~=1.32"

[tool.poetry.urls]
Expand Down
8 changes: 5 additions & 3 deletions src/starlite_saqlalchemy/init_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,10 @@ def example_handler() -> dict:
)
from .health import health_check
from .repository.exceptions import RepositoryException
from .service import ServiceException
from .service import ServiceException, make_service_callback
from .worker import create_worker_instance

if TYPE_CHECKING:
from collections import abc

from starlite.config.app import AppConfig

Expand All @@ -71,7 +70,7 @@ class PluginConfig(BaseModel):
application.
"""

worker_functions: "abc.Sequence[WorkerFunction]" = []
worker_functions: "list[WorkerFunction | tuple[str, WorkerFunction]]" = []
"""
Queue worker functions.
"""
Expand Down Expand Up @@ -335,6 +334,9 @@ def configure_worker(self, app_config: "AppConfig") -> None:
app_config: The Starlite application config object.
"""
if self.config.do_worker and self.config.worker_functions:
self.config.worker_functions.append(
(make_service_callback.__qualname__, make_service_callback)
)
worker_instance = create_worker_instance(self.config.worker_functions)
app_config.on_shutdown.append(worker_instance.stop)
app_config.on_startup.append(worker_instance.on_app_startup)
5 changes: 5 additions & 0 deletions src/starlite_saqlalchemy/repository/abc.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from .exceptions import RepositoryNotFoundException

if TYPE_CHECKING:
from sqlalchemy.ext.asyncio import AsyncSession

from .types import FilterTypes

__all__ = ["AbstractRepository"]
Expand All @@ -22,6 +24,9 @@ class AbstractRepository(Generic[T], metaclass=ABCMeta):
id_attribute = "id"
"""Name of the primary identifying attribute on `model_type`."""

def __init__(self, session: "AsyncSession") -> None:
self.session = session

@abstractmethod
async def add(self, data: T) -> T:
"""Add `data` to the collection.
Expand Down
44 changes: 23 additions & 21 deletions src/starlite_saqlalchemy/repository/sqlalchemy.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,31 +63,31 @@ class SQLAlchemyRepository(AbstractRepository[ModelT]):
def __init__(
self, session: "AsyncSession", select_: "Select[tuple[ModelT]] | None" = None
) -> None:
self._session = session
super().__init__(session)
self._select = select(self.model_type) if select_ is None else select_

async def add(self, data: ModelT) -> ModelT:
with wrap_sqlalchemy_exception():
instance = await self._attach_to_session(data)
await self._session.flush()
await self._session.refresh(instance)
self._session.expunge(instance)
await self.session.flush()
await self.session.refresh(instance)
self.session.expunge(instance)
return instance

async def delete(self, id_: Any) -> ModelT:
with wrap_sqlalchemy_exception():
instance = await self.get(id_)
await self._session.delete(instance)
await self._session.flush()
self._session.expunge(instance)
await self.session.delete(instance)
await self.session.flush()
self.session.expunge(instance)
return instance

async def get(self, id_: Any) -> ModelT:
with wrap_sqlalchemy_exception():
self._filter_select_by_kwargs(**{self.id_attribute: id_})
instance = (await self._execute()).scalar_one_or_none()
instance = self.check_not_found(instance)
self._session.expunge(instance)
self.session.expunge(instance)
return instance

async def list(self, *filters: "FilterTypes", **kwargs: Any) -> list[ModelT]:
Expand All @@ -105,7 +105,7 @@ async def list(self, *filters: "FilterTypes", **kwargs: Any) -> list[ModelT]:
result = await self._execute()
instances = list(result.scalars())
for instance in instances:
self._session.expunge(instance)
self.session.expunge(instance)
return instances

async def update(self, data: ModelT) -> ModelT:
Expand All @@ -115,31 +115,31 @@ async def update(self, data: ModelT) -> ModelT:
await self.get(id_)
# this will merge the inbound data to the instance we just put in the session
instance = await self._attach_to_session(data, strategy="merge")
await self._session.flush()
await self._session.refresh(instance)
self._session.expunge(instance)
await self.session.flush()
await self.session.refresh(instance)
self.session.expunge(instance)
return instance

async def upsert(self, data: ModelT) -> ModelT:
with wrap_sqlalchemy_exception():
instance = await self._attach_to_session(data, strategy="merge")
await self._session.flush()
await self._session.refresh(instance)
self._session.expunge(instance)
await self.session.flush()
await self.session.refresh(instance)
self.session.expunge(instance)
return instance

@classmethod
async def check_health(cls, db_session: "AsyncSession") -> bool:
async def check_health(cls, session: "AsyncSession") -> bool:
"""Perform a health check on the database.
Args:
db_session: through which we runa check statement
session: through which we runa check statement
Returns:
`True` if healthy.
"""
return ( # type:ignore[no-any-return] # pragma: no cover
await db_session.execute(text("SELECT 1"))
await session.execute(text("SELECT 1"))
).scalar_one() == 1

# the following is all sqlalchemy implementation detail, and shouldn't be directly accessed
Expand All @@ -154,6 +154,8 @@ async def _attach_to_session(
Parameters
----------
session: AsyncSession
DB transaction.
model : ModelT
The instance to be attached to the session.
strategy : Literal["add", "merge"]
Expand All @@ -165,15 +167,15 @@ async def _attach_to_session(
"""
match strategy: # noqa: R503
case "add":
self._session.add(model)
self.session.add(model)
return model
case "merge":
return await self._session.merge(model)
return await self.session.merge(model)
case _:
raise ValueError("Unexpected value for `strategy`, must be `'add'` or `'merge'`")

async def _execute(self) -> "Result[tuple[ModelT, ...]]":
return await self._session.execute(self._select)
return await self.session.execute(self._select)

def _filter_in_collection(self, field_name: str, values: "abc.Collection[Any]") -> None:
if not values:
Expand Down
Loading

0 comments on commit b59c2d5

Please sign in to comment.