Skip to content

Commit

Permalink
Merge pull request #175 from grillazz/174-implement-scheduler
Browse files Browse the repository at this point in the history
174 implement scheduler
  • Loading branch information
grillazz authored Oct 16, 2024
2 parents db3f728 + 7634531 commit bd9d7ac
Show file tree
Hide file tree
Showing 8 changed files with 349 additions and 203 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ I've included a few of my favorites to kick things off!
- **[MAR 15 2024]** add polars and calamine to project :heart_eyes_cat:
- **[JUN 8 2024]** implement asyncpg connection pool :fast_forward:
- **[AUG 17 2024]** granian use case implemented with docker compose and rich logger :fast_forward:
- **[OCT 16 2024]** apscheduler added to project :fast_forward:

<p align="right">(<a href="#readme-top">back to top</a>)</p>

Expand Down
23 changes: 21 additions & 2 deletions app/main.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import asyncpg
from contextlib import asynccontextmanager

from apscheduler.eventbrokers.redis import RedisEventBroker
from apscheduler.datastores.sqlalchemy import SQLAlchemyDataStore
from fastapi import FastAPI, Depends
from fastapi_cache import FastAPICache
from fastapi_cache.backends.redis import RedisBackend
Expand All @@ -9,11 +9,17 @@
from app.api.shakespeare import router as shakespeare_router
from app.api.stuff import router as stuff_router
from app.config import settings as global_settings
from app.database import engine
from app.utils.logging import AppLogger
from app.api.user import router as user_router
from app.api.health import router as health_router
from app.redis import get_redis, get_cache
from app.services.auth import AuthBearer
from app.services.scheduler import SchedulerMiddleware

from contextlib import asynccontextmanager

from apscheduler import AsyncScheduler

logger = AppLogger().get_logger()

Expand Down Expand Up @@ -60,3 +66,16 @@ async def lifespan(_app: FastAPI):
tags=["Health, Bearer"],
dependencies=[Depends(AuthBearer())],
)

_scheduler_data_store = SQLAlchemyDataStore(engine)
_scheduler_event_broker = RedisEventBroker(
client_or_url=global_settings.redis_url.unicode_string()
)
_scheduler_himself = AsyncScheduler(_scheduler_data_store, _scheduler_event_broker)

app.add_middleware(SchedulerMiddleware, scheduler=_scheduler_himself)


# TODO: every not GET meth should reset cache
# TODO: every scheduler task which needs to act on database should have access to connection pool via request - maybe ?
# TODO: https://stackoverflow.com/questions/16053364/make-sure-only-one-worker-launches-the-apscheduler-event-in-a-pyramid-web-app-ru
4 changes: 3 additions & 1 deletion app/models/stuff.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ def compile_sql_or_scalar(func):
async def wrapper(cls, db_session, name, compile_sql=False, *args, **kwargs):
stmt = await func(cls, db_session, name, *args, **kwargs)
if compile_sql:
return stmt.compile(dialect=postgresql.dialect(), compile_kwargs={"literal_binds": True})
return stmt.compile(
dialect=postgresql.dialect(), compile_kwargs={"literal_binds": True}
)
result = await db_session.execute(stmt)
return result.scalars().first()

Expand Down
8 changes: 4 additions & 4 deletions app/models/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ def password(self):
@password.setter
def password(self, password: SecretStr):
_password_string = password.get_secret_value().encode("utf-8")
self._password = bcrypt.hashpw(
_password_string, bcrypt.gensalt()
)
self._password = bcrypt.hashpw(_password_string, bcrypt.gensalt())

def check_password(self, password: SecretStr):
return bcrypt.checkpw(password.get_secret_value().encode("utf-8"), self._password)
return bcrypt.checkpw(
password.get_secret_value().encode("utf-8"), self._password
)

@classmethod
async def find(cls, database_session: AsyncSession, where_conditions: list[Any]):
Expand Down
41 changes: 41 additions & 0 deletions app/services/scheduler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
from datetime import datetime

from sqlalchemy import text
from starlette.types import ASGIApp, Receive, Scope, Send
from apscheduler import AsyncScheduler
from apscheduler.triggers.interval import IntervalTrigger

from app.database import AsyncSessionFactory
from app.utils.logging import AppLogger

logger = AppLogger().get_logger()


async def tick():
async with AsyncSessionFactory() as session:
stmt = text("select 1;")
logger.info(f">>>> Be or not to be...{datetime.now()}")
result = await session.execute(stmt)
logger.info(f">>>> Result: {result.scalar()}")
return True


class SchedulerMiddleware:
def __init__(
self,
app: ASGIApp,
scheduler: AsyncScheduler,
) -> None:
self.app = app
self.scheduler = scheduler

async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
if scope["type"] == "lifespan":
async with self.scheduler:
await self.scheduler.add_schedule(
tick, IntervalTrigger(seconds=25), id="tick-sql-25"
)
await self.scheduler.start_in_background()
await self.app(scope, receive, send)
else:
await self.app(scope, receive, send)
3 changes: 2 additions & 1 deletion app/utils/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ def get_logger(self):
class RichConsoleHandler(RichHandler):
def __init__(self, width=200, style=None, **kwargs):
super().__init__(
console=Console(color_system="256", width=width, style=style, stderr=True), **kwargs
console=Console(color_system="256", width=width, style=style, stderr=True),
**kwargs
)
Loading

0 comments on commit bd9d7ac

Please sign in to comment.