diff --git a/requirements/dev.in b/requirements/dev.in index 22c36c5c..69b949c1 100644 --- a/requirements/dev.in +++ b/requirements/dev.in @@ -20,3 +20,4 @@ sqlalchemy[mypy] holdup respx types-PyYAML +types-redis diff --git a/requirements/dev.txt b/requirements/dev.txt index 0c5c3a72..e6899561 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -16,6 +16,10 @@ certifi==2022.12.7 # -c requirements/main.txt # httpcore # httpx +cffi==1.15.1 + # via + # -c requirements/main.txt + # cryptography cfgv==3.3.1 # via pre-commit click==8.1.3 @@ -26,6 +30,11 @@ coverage[toml]==7.2.3 # via # -r requirements/dev.in # pytest-cov +cryptography==40.0.1 + # via + # -c requirements/main.txt + # types-pyopenssl + # types-redis distlib==0.3.6 # via virtualenv filelock==3.11.0 @@ -83,6 +92,10 @@ pluggy==1.0.0 # via pytest pre-commit==3.2.2 # via -r requirements/dev.in +pycparser==2.21 + # via + # -c requirements/main.txt + # cffi pytest==7.3.0 # via # -r requirements/dev.in @@ -114,8 +127,12 @@ sqlalchemy[asyncio,mypy]==2.0.9 # via # -c requirements/main.txt # -r requirements/dev.in +types-pyopenssl==23.1.0.2 + # via types-redis types-pyyaml==6.0.12.9 # via -r requirements/dev.in +types-redis==4.5.4.1 + # via -r requirements/dev.in typing-extensions==4.5.0 # via # -c requirements/main.txt diff --git a/requirements/main.in b/requirements/main.in index 68cd27f6..d20b26da 100644 --- a/requirements/main.in +++ b/requirements/main.in @@ -20,7 +20,6 @@ nbformat nbconvert jsonschema jinja2 -aioredis gidgethub markdown-it-py[linkify,plugins] mdformat diff --git a/requirements/main.txt b/requirements/main.txt index bc0dd519..a2f67745 100644 --- a/requirements/main.txt +++ b/requirements/main.txt @@ -4,8 +4,6 @@ # # pip-compile --allow-unsafe --output-file=requirements/main.txt requirements/main.in # -aioredis==2.0.1 - # via -r requirements/main.in anyio==3.6.2 # via # httpcore @@ -14,9 +12,7 @@ anyio==3.6.2 arq==0.25.0 # via safir async-timeout==4.0.2 - # via - # aioredis - # redis 
+ # via redis asyncpg==0.27.0 # via safir attrs==22.2.0 @@ -46,7 +42,7 @@ dnspython==2.3.0 # via email-validator email-validator==1.3.1 # via pydantic -fastapi==0.95.0 +fastapi==0.95.1 # via # -r requirements/main.in # safir @@ -201,7 +197,6 @@ traitlets==5.9.0 # nbformat typing-extensions==4.5.0 # via - # aioredis # arq # pydantic # sqlalchemy diff --git a/src/timessquare/cli.py b/src/timessquare/cli.py index a8380ee4..af37d33a 100644 --- a/src/timessquare/cli.py +++ b/src/timessquare/cli.py @@ -7,7 +7,7 @@ import click import structlog import uvicorn -from aioredis import Redis +from redis.asyncio import Redis from safir.asyncio import run_with_asyncio from safir.database import create_database_engine, initialize_database @@ -78,11 +78,9 @@ async def reset_html() -> None: redis = Redis.from_url(config.redis_url, password=None) try: html_store = NbHtmlCacheStore(redis) - record_count = await html_store.delete_all() - if record_count > 0: - click.echo(f"Deleted {record_count} HTML records") - else: - click.echo("No HTML records to delete") + n = len([r async for r in html_store.scan("*")]) + await html_store.delete_all("*") + click.echo(f"Deleted {n} HTML records") except Exception as e: click.echo(str(e)) finally: diff --git a/src/timessquare/dependencies/redis.py b/src/timessquare/dependencies/redis.py index 82741ef1..114da89f 100644 --- a/src/timessquare/dependencies/redis.py +++ b/src/timessquare/dependencies/redis.py @@ -2,13 +2,13 @@ from typing import Optional -from aioredis import Redis +from redis.asyncio import Redis __all__ = ["RedisDependency", "redis_dependency"] class RedisDependency: - """Provides an aioredis pool as a dependency. + """Provides an asyncio-based Redis client as a dependency. 
Notes ----- @@ -17,7 +17,7 @@ class RedisDependency: """ def __init__(self) -> None: - self.redis: Optional[Redis] = None + self.redis: Redis | None = None async def initialize( self, redis_url: str, password: Optional[str] = None diff --git a/src/timessquare/dependencies/requestcontext.py b/src/timessquare/dependencies/requestcontext.py index b06a7c23..4fc70e67 100644 --- a/src/timessquare/dependencies/requestcontext.py +++ b/src/timessquare/dependencies/requestcontext.py @@ -3,9 +3,9 @@ from dataclasses import dataclass from typing import Optional -import aioredis from fastapi import Depends, Request, Response from httpx import AsyncClient +from redis.asyncio import Redis from safir.dependencies.db_session import db_session_dependency from safir.dependencies.http_client import http_client_dependency from safir.dependencies.logger import logger_dependency @@ -48,7 +48,7 @@ class RequestContext: session: async_scoped_session """The database session.""" - redis: aioredis.Redis + redis: Redis """Redis connection pool.""" http_client: AsyncClient @@ -105,7 +105,7 @@ async def context_dependency( response: Response, logger: BoundLogger = Depends(logger_dependency), session: async_scoped_session = Depends(db_session_dependency), - redis: aioredis.Redis = Depends(redis_dependency), + redis: Redis = Depends(redis_dependency), http_client: AsyncClient = Depends(http_client_dependency), ) -> RequestContext: """Provides a RequestContext as a dependency.""" diff --git a/src/timessquare/services/page.py b/src/timessquare/services/page.py index 6f95c64a..a2d375fb 100644 --- a/src/timessquare/services/page.py +++ b/src/timessquare/services/page.py @@ -292,7 +292,7 @@ async def get_html( values=resolved_values, display_settings=display_settings, ) - nbhtml = await self._html_store.get(page_key) + nbhtml = await self._html_store.get_instance(page_key) if nbhtml is not None: self._logger.debug("Got HTML from cache") return nbhtml @@ -333,7 +333,7 @@ async def 
_get_html_from_noteburst_job( not presently available. """ # Is there an existing job in the noteburst job store? - job = await self._job_store.get(page_instance) + job = await self._job_store.get_instance(page_instance) if not job: self._logger.debug("No existing noteburst job available") # A record of a noteburst job is not available. Send a request @@ -370,7 +370,7 @@ async def _get_html_from_noteburst_job( self._logger.warning( "Got a 404 from a noteburst job", job_url=job.job_url ) - await self._job_store.delete(page_instance) + await self._job_store.delete_instance(page_instance) await self.request_noteburst_execution(page_instance) else: # server error from noteburst @@ -456,7 +456,7 @@ async def _create_html_matrix( "Stored new HTML", display_settings=asdict(matrix_key) ) - await self._job_store.delete(page_instance) + await self._job_store.delete_instance(page_instance) self._logger.debug("Deleted old job record") return html_matrix diff --git a/src/timessquare/storage/nbhtmlcache.py b/src/timessquare/storage/nbhtmlcache.py index 100724e5..fdb9386a 100644 --- a/src/timessquare/storage/nbhtmlcache.py +++ b/src/timessquare/storage/nbhtmlcache.py @@ -4,22 +4,24 @@ from typing import Optional -import aioredis +from redis.asyncio import Redis from timessquare.domain.nbhtml import NbHtmlModel -from .redisbase import RedisStore +from .redisbase import RedisPageInstanceStore +__all__ = ["NbHtmlCacheStore"] -class NbHtmlCacheStore(RedisStore[NbHtmlModel]): + +class NbHtmlCacheStore(RedisPageInstanceStore[NbHtmlModel]): """Manages the storage of HTML renderings of notebooks. The domain is `timessquare.domain.nbhtml.NbHtmlModel`. 
""" - def __init__(self, redis: aioredis.Redis) -> None: + def __init__(self, redis: Redis) -> None: super().__init__( - redis=redis, key_prefix="nbhtml", datatype=NbHtmlModel + redis=redis, key_prefix="nbhtml/", datatype=NbHtmlModel ) async def store_nbhtml( @@ -29,11 +31,11 @@ async def store_nbhtml( Parameters ---------- - nbhtml : `timessquare.domain.nbhtml.NbHtmlModel` + nbhtml The HTML page domain model. - lifetime : int, optional + lifetime The lifetime for the record in seconds. `None` to cache the record indefinitely. """ key = nbhtml.create_key() - await super().store(key, nbhtml, lifetime=lifetime) + await super().store_instance(key, nbhtml, lifetime=lifetime) diff --git a/src/timessquare/storage/noteburstjobstore.py b/src/timessquare/storage/noteburstjobstore.py index 77fd1269..0c3b3389 100644 --- a/src/timessquare/storage/noteburstjobstore.py +++ b/src/timessquare/storage/noteburstjobstore.py @@ -2,15 +2,15 @@ from __future__ import annotations -import aioredis +from redis.asyncio import Redis from timessquare.domain.noteburst import NoteburstJobModel from timessquare.domain.page import PageInstanceIdModel -from .redisbase import RedisStore +from .redisbase import RedisPageInstanceStore -class NoteburstJobStore(RedisStore[NoteburstJobModel]): +class NoteburstJobStore(RedisPageInstanceStore[NoteburstJobModel]): """The noteburst job store keeps track of open notebook execution job requests for a given page and set of parameters. @@ -18,9 +18,9 @@ class NoteburstJobStore(RedisStore[NoteburstJobModel]): `timessquare.domain.noteburst.NoteburstJobModel`. 
""" - def __init__(self, redis: aioredis.Redis) -> None: + def __init__(self, redis: Redis) -> None: super().__init__( - redis=redis, key_prefix="noteburst", datatype=NoteburstJobModel + redis=redis, key_prefix="noteburst/", datatype=NoteburstJobModel ) async def store_job( @@ -34,14 +34,14 @@ async def store_job( Parameters ---------- - job : `timessquare.domain.noteburst.NoteburstJobModel` + job The job record. - page_id : `timessquare.domain.page.PageInstanceIdModel` + page_id Identifier of the page instance, composed of the page's name and the values the page instance is rendered with. - lifetime : int + lifetime The lifetime of the record, in seconds. The lifetime should be set so that if it elapses, it can be assumed that noteburst has failed to process the original job and that a new request can be sent. """ - await super().store(page_id, job, lifetime=lifetime) + await super().store_instance(page_id, job, lifetime=lifetime) diff --git a/src/timessquare/storage/redisbase.py b/src/timessquare/storage/redisbase.py index f810f1db..ec10e688 100644 --- a/src/timessquare/storage/redisbase.py +++ b/src/timessquare/storage/redisbase.py @@ -1,66 +1,55 @@ -""""Base functionality for redis storage.""" +""""Base for Redis page instance cache storage.""" from __future__ import annotations -from typing import AsyncIterable, Generic, Optional, Type, TypeVar +from typing import AsyncIterable, Optional -import aioredis -from pydantic import BaseModel +from safir.redis import PydanticRedisStorage, S from timessquare.domain.page import PageInstanceIdModel -T = TypeVar("T", bound="BaseModel") +__all__ = ["RedisPageInstanceStore"] -class RedisStore(Generic[T]): - """A base class for Redis-based storage of Pydantic models as JSON. +class RedisPageInstanceStore(PydanticRedisStorage[S]): + """A base class for Redis-based storage of page-instance-related Pydantic + models as JSON. Parameters ---------- - redis : `aioredis.Redis` + datatype + The class of Pydantic model to store. 
+ redis The Redis client. - key_prefix : `str` + key_prefix The prefix for any data stored in Redis. - datatype : `pydantic.BaseModel` - type - The pydantic class of object stored through this RedisStore. """ - def __init__( - self, - *, - redis: aioredis.Redis, - key_prefix: str, - datatype: Type[T], - ) -> None: - self._redis = redis - self._key_prefix = key_prefix - self._datatype = datatype - def calculate_redis_key(self, page_id: PageInstanceIdModel) -> str: """Create the redis key for given the page's name and - parameter values with a datastore's redis key prefix. + parameter values. Parameters ---------- - page_id : `timessquare.domain.page.PageInstanceIdModel` + page_id Identifier of the page instance, composed of the page's name and the values the page instance is rendered with. Returns ------- - key : `str` + str The unique redis key for this combination of page name and parameter values for a given datastore. """ - return f"{self._key_prefix}/{page_id.cache_key}" + return page_id.cache_key - async def store( + async def store_instance( self, page_id: PageInstanceIdModel, - data: T, + data: S, lifetime: Optional[int] = None, ) -> None: - """Store a pydantic object to Redis. + """Store a pydantic object for a page instance. The data is persisted in Redis as a JSON string. @@ -77,78 +66,57 @@ async def store( `None`, the data is persisted forever. """ key = self.calculate_redis_key(page_id) - serialized_data = data.json() - await self._redis.set(key, serialized_data, ex=lifetime) + await super().store(key, data, lifetime=lifetime) - async def get(self, page_id: PageInstanceIdModel) -> Optional[T]: + async def get_instance(self, page_id: PageInstanceIdModel) -> S | None: """Get the data stored for a page instance, deserializing it into the Pydantic model type. 
Parameters ---------- - page_id : `timessquare.domain.page.PageInstanceIdModel` + page_id Identifier of the page instance, composed of the page's name and the values the page instance is rendered with. Returns ------- - data : `pydantic.BaseModel`, optional + pydantic.BaseModel | None The dataset, as a Pydantic model. If the dataset is not found at the key, `None` is returned. """ key = self.calculate_redis_key(page_id) - serialized_data = await self._redis.get(key) - if not serialized_data: - return None - - return self._datatype.parse_raw(serialized_data.decode()) + return await super().get(key) async def iter_keys_for_page(self, page_name: str) -> AsyncIterable[str]: """Iterate over all keys for a page. Parameters ---------- - page_name : str + page_name The name of the page (corresponds to `timessquare.domain.page.PageModel.name`). Yields ------ - key : str + str Yields keys for a page. """ - pattern = f"{self._key_prefix}/{page_name}/*" - async for key in self._redis.scan_iter(pattern): + pattern = f"{page_name}/*" + async for key in super().scan(pattern): yield key - async def delete(self, page_id: PageInstanceIdModel) -> bool: + async def delete_instance(self, page_id: PageInstanceIdModel) -> bool: """Delete a dataset for a specific page instance. Parameters ---------- - key : `str` + page_id - The key where the data is stored. + Identifier of the page instance, composed of the page's name and + the values the page instance is rendered with. Returns ------- - deleted : bool + bool `True` if a record was deleted, `False` otherwise. """ key = self.calculate_redis_key(page_id) - count = await self._redis.delete(key) - return count > 0 - - async def delete_all(self) -> int: - """Delete all records with the store's key prefix. - - Returns - ------- - count : `int` - The number of records deleted. 
- """ - pattern = f"{self._key_prefix}/*" - count = 0 - async for key in self._redis.scan_iter(pattern): - await self._redis.delete(key) - count += 1 - return count > 0 + return await super().delete(key) diff --git a/tests/handlers/v1/github_test.py b/tests/handlers/v1/github_test.py index 30ee30b5..db9bb16d 100644 --- a/tests/handlers/v1/github_test.py +++ b/tests/handlers/v1/github_test.py @@ -4,8 +4,8 @@ from pathlib import Path import pytest -from aioredis import Redis from httpx import AsyncClient +from redis.asyncio import Redis from safir.database import create_async_session, create_database_engine from structlog import get_logger