Skip to content

Commit

Permalink
Issue #18/EP-4049 trigger cache flush when connections change
Browse files Browse the repository at this point in the history
  • Loading branch information
soxofaan committed Nov 4, 2021
1 parent e97f5f9 commit bc21d27
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 5 deletions.
3 changes: 3 additions & 0 deletions src/openeo_aggregator/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class AggregatorCollectionCatalog(AbstractCollectionCatalog):
def __init__(self, backends: MultiBackendConnection):
self.backends = backends
self._cache = TtlCache(default_ttl=CACHE_TTL_DEFAULT)
self.backends.on_connections_change.add(self._cache.flush_all)

def get_all_metadata(self) -> List[dict]:
metadata, internal = self._get_all_metadata_cached()
Expand Down Expand Up @@ -292,6 +293,7 @@ def __init__(
self.backends = backends
# TODO Cache per backend results instead of output?
self._cache = TtlCache(default_ttl=CACHE_TTL_DEFAULT)
self.backends.on_connections_change.add(self._cache.flush_all)
self._catalog = catalog
self._stream_chunk_size = stream_chunk_size

Expand Down Expand Up @@ -557,6 +559,7 @@ def __init__(self, backends: MultiBackendConnection, config: AggregatorConfig):
user_defined_processes=None,
)
self._cache = TtlCache(default_ttl=CACHE_TTL_DEFAULT)
self._backends.on_connections_change.add(self._cache.flush_all)
self._auth_entitlement_check: Union[bool, dict] = config.auth_entitlement_check
self._configured_oidc_providers: List[OidcProvider] = config.configured_oidc_providers

Expand Down
21 changes: 17 additions & 4 deletions src/openeo_aggregator/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from openeo.capabilities import ComparableVersion
from openeo.rest.auth.auth import BearerAuth, OpenEoApiAuthBase
from openeo_aggregator.config import CACHE_TTL_DEFAULT, CONNECTION_TIMEOUT_DEFAULT, STREAM_CHUNK_SIZE_DEFAULT
from openeo_aggregator.utils import TtlCache, _UNSET
from openeo_aggregator.utils import TtlCache, _UNSET, EventHandler
from openeo_driver.backend import OidcProvider
from openeo_driver.errors import OpenEOApiException, AuthenticationRequiredException, \
AuthenticationSchemeInvalidException, InternalException
Expand Down Expand Up @@ -130,6 +130,7 @@ class MultiBackendConnection:

_TIMEOUT = 5

# TODO: API version management: just do single-version aggregation, or also handle version discovery?
# TODO: keep track of (recent) backend failures, e.g. to automatically blacklist a backend
# TODO: synchronized backend connection caching/flushing across gunicorn workers, for better consistency?

Expand All @@ -141,11 +142,13 @@ def __init__(self, backends: Dict[str, str]):
)
# TODO: backend_urls as dict does not have explicit order, while this is important.
self._backend_urls = backends

self._connections_cache = _ConnectionsCache(expiry=0, connections=[])
# TODO: API version management: just do single-version aggregation, or also handle version discovery?
self.api_version = self._get_api_version()
self._cache = TtlCache(default_ttl=CACHE_TTL_DEFAULT)

self.on_connections_change = EventHandler("connection_change")
self.on_connections_change.add(self._cache.flush_all)

def _get_connections(self, skip_failure=False) -> Iterator[BackendConnection]:
"""Create new backend connections."""
for (bid, url) in self._backend_urls.items():
Expand All @@ -163,15 +166,21 @@ def get_connections(self) -> List[BackendConnection]:
"""Get backend connections (re-created automatically if cache ttl expired)"""
now = time.time()
if now > self._connections_cache.expiry:
_log.info(f"Connections cache miss: setting up new connections")
orig_bids = [c.id for c in self._connections_cache.connections]
_log.info(f"Connections cache miss: creating new connections")
self._connections_cache = _ConnectionsCache(
expiry=now + self._CONNECTION_CACHING_TTL,
connections=list(self._get_connections(skip_failure=True))
)
new_bids = [c.id for c in self._connections_cache.connections]
_log.info(
f"Created {len(self._connections_cache.connections)} actual"
f" of {len(self._backend_urls)} configured connections"
)
if orig_bids != new_bids:
_log.info(f"Connections changed {orig_bids} -> {new_bids}: calling on_connections_change callbacks")
self.on_connections_change.trigger(skip_failures=True)

return self._connections_cache.connections

def __iter__(self) -> Iterator[BackendConnection]:
Expand Down Expand Up @@ -206,6 +215,10 @@ def _get_api_version(self) -> ComparableVersion:
raise OpenEOApiException(f"Only single version is supported, but found: {versions}")
return ComparableVersion(versions.pop())

@property
def api_version(self) -> ComparableVersion:
return self._cache.get_or_call(key="api_version", callback=self._get_api_version)

def map(self, callback: Callable[[BackendConnection], Any]) -> Iterator[Tuple[str, Any]]:
"""
Query each backend connection with given callable and return results as iterator
Expand Down
25 changes: 24 additions & 1 deletion src/openeo_aggregator/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
import time
from typing import Callable, Iterable, Iterator
from typing import Callable, Iterable, Iterator, List

# Generic "sentinel object" for unset values (where `None` is valid value)
# https://python-patterns.guide/python/sentinel-object/)
Expand Down Expand Up @@ -126,3 +126,26 @@ def dict_merge(*args, **kwargs) -> dict:
for d in args + (kwargs,):
result.update(d)
return result


class EventHandler:
"""Simple event handler that allows to collect callbacks to call on a certain event."""

def __init__(self, name: str):
self._name = name
self._callbacks: List[Callable[[], None]] = []

def add(self, callback: Callable[[], None]):
"""Add a callback to call when the event is triggered."""
self._callbacks.append(callback)

def trigger(self, skip_failures=False):
"""Call all callbacks."""
_log.info(f"Triggering event {self._name!r}")
for callback in self._callbacks:
try:
callback()
except Exception as e:
_log.error(f"Failure calling event {self._name!r} callback {callback!r}: {e}")
if not skip_failures:
raise

0 comments on commit bc21d27

Please sign in to comment.