Skip to content

Commit

Permalink
Issue #18/EP-4049 Finetune from self-review #21
Browse files — browse the repository at this point in the history
soxofaan committed Nov 4, 2021
1 parent bc21d27 commit 65c1c4e
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 17 deletions.
7 changes: 5 additions & 2 deletions src/openeo_aggregator/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,9 @@ class AggregatorBackendImplementation(OpenEoBackendImplementation):
# No basic auth: OIDC auth is required (to get EGI Check-in eduperson_entitlement data)
enable_basic_auth = False

# Simplify mocking time for unit tests.
_clock = time.time

def __init__(self, backends: MultiBackendConnection, config: AggregatorConfig):
self._backends = backends
catalog = AggregatorCollectionCatalog(backends=backends)
Expand Down Expand Up @@ -631,10 +634,10 @@ def health_check(self) -> Union[str, dict, flask.Response]:
for con in self._backends:
backend_status[con.id] = {}
try:
start_time = time.time()
start_time = self._clock()
# TODO: this `/health` endpoint is not standardized. Get it from `aggregator_backends` config?
resp = con.get("/health", check_error=False)
elapsed = time.time() - start_time
elapsed = self._clock() - start_time
backend_status[con.id]["status_code"] = resp.status_code
backend_status[con.id]["response_time"] = elapsed
if resp.status_code >= 400:
Expand Down
1 change: 1 addition & 0 deletions src/openeo_aggregator/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

CACHE_TTL_DEFAULT = 6 * 60 * 60

# Timeouts for requests to back-ends
CONNECTION_TIMEOUT_DEFAULT = 30
CONNECTION_TIMEOUT_RESULT = 15 * 60

Expand Down
33 changes: 18 additions & 15 deletions src/openeo_aggregator/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,17 @@ class MultiBackendConnection:
"""
Collection of multiple connections to different backends
"""
# TODO: API version management: just do single/fixed-version federation, or also handle version discovery?
# TODO: keep track of (recent) backend failures, e.g. to automatically blacklist a backend
# TODO: synchronized backend connection caching/flushing across gunicorn workers, for better consistency?

# TODO: move this caching ttl to config?
_CONNECTION_CACHING_TTL = 5 * 60
# TODO: move this connections caching ttl to config?
_CONNECTIONS_CACHING_TTL = 5 * 60

_TIMEOUT = 5

# TODO: API version management: just do single-version aggregation, or also handle version discovery?
# TODO: keep track of (recent) backend failures, e.g. to automatically blacklist a backend
# TODO: synchronized backend connection caching/flushing across gunicorn workers, for better consistency?
# Simplify mocking time for unit tests.
_clock = time.time

def __init__(self, backends: Dict[str, str]):
if any(not re.match(r"^[a-z0-9]+$", bid) for bid in backends.keys()):
Expand All @@ -146,36 +148,37 @@ def __init__(self, backends: Dict[str, str]):
self._connections_cache = _ConnectionsCache(expiry=0, connections=[])
self._cache = TtlCache(default_ttl=CACHE_TTL_DEFAULT)

self.on_connections_change = EventHandler("connection_change")
self.on_connections_change = EventHandler("connections_change")
self.on_connections_change.add(self._cache.flush_all)

def _get_connections(self, skip_failure=False) -> Iterator[BackendConnection]:
def _get_connections(self, skip_failures=False) -> Iterator[BackendConnection]:
"""Create new backend connections."""
for (bid, url) in self._backend_urls.items():
try:
_log.info(f"Create backend {bid!r} connection to {url!r}")
# TODO: also do a health check on the connection?
# TODO: Creating connection usually involves version discovery and request of capability doc.
# Additional health check necessary?
yield BackendConnection(id=bid, url=url)
except Exception as e:
if skip_failure:
_log.warning(f"Failed to create backend {bid!r} connection to {url!r}: {e!r}")
else:
_log.warning(f"Failed to create backend {bid!r} connection to {url!r}: {e!r}")
if not skip_failures:
raise

def get_connections(self) -> List[BackendConnection]:
"""Get backend connections (re-created automatically if cache ttl expired)"""
now = time.time()
now = self._clock()
if now > self._connections_cache.expiry:
_log.info(f"Connections cache expired ({now:.2f}>{self._connections_cache.expiry:.2f})")
orig_bids = [c.id for c in self._connections_cache.connections]
_log.info(f"Connections cache miss: creating new connections")
self._connections_cache = _ConnectionsCache(
expiry=now + self._CONNECTION_CACHING_TTL,
connections=list(self._get_connections(skip_failure=True))
expiry=now + self._CONNECTIONS_CACHING_TTL,
connections=list(self._get_connections(skip_failures=True))
)
new_bids = [c.id for c in self._connections_cache.connections]
_log.info(
f"Created {len(self._connections_cache.connections)} actual"
f" of {len(self._backend_urls)} configured connections"
f" (TTL {self._CONNECTIONS_CACHING_TTL}s)"
)
if orig_bids != new_bids:
_log.info(f"Connections changed {orig_bids} -> {new_bids}: calling on_connections_change callbacks")
Expand Down

0 comments on commit 65c1c4e

Please sign in to comment.