Skip to content

Commit

Permalink
Issue #18/EP-4049 Add resilience tests for collection metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
soxofaan committed Oct 20, 2021
1 parent f718e0b commit b9a2442
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 25 deletions.
38 changes: 20 additions & 18 deletions src/openeo_aggregator/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def _get_all_metadata(self) -> Tuple[List[dict], _InternalCollectionMetadata]:
try:
backend_collections = con.list_collections()
except Exception:
# TODO: instead of log warning: hard fail or log error?
# TODO: user warning https://github.com/Open-EO/openeo-api/issues/412
_log.warning(f"Failed to get collection metadata from {con.id}", exc_info=True)
# On failure: still cache, but with shorter TTL? (#2)
continue
Expand Down Expand Up @@ -227,23 +227,25 @@ def _get_collection_metadata(self, collection_id: str) -> dict:
metadata, internal = self._get_all_metadata_cached()
backends = internal.get_backends_for_collection(collection_id)

if len(backends) == 1:
con = self.backends.get_connection(backend_id=backends[0])
return con.describe_collection(name=collection_id)
elif len(backends) > 1:
by_backend = {}
for bid in backends:
con = self.backends.get_connection(backend_id=bid)
try:
by_backend[bid] = con.describe_collection(name=collection_id)
except OpenEoApiError as e:
_log.warning(f"Failed collection metadata for {collection_id!r} at {con.id}", exc_info=True)
# TODO: avoid caching of final result? (#2)
continue
by_backend = {}
for bid in backends:
con = self.backends.get_connection(backend_id=bid)
try:
by_backend[bid] = con.describe_collection(name=collection_id)
except OpenEoApiError as e:
# TODO: user warning https://github.com/Open-EO/openeo-api/issues/412
_log.warning(f"Failed collection metadata for {collection_id!r} at {con.id}", exc_info=True)
# TODO: avoid caching of final result? (#2)
continue

if len(by_backend) == 0:
raise CollectionNotFoundException(collection_id=collection_id)
elif len(by_backend) == 1:
# TODO: also go through _merge_collection_metadata procedure (for clean up/normalization)?
return by_backend.popitem()[1]
else:
_log.info(f"Merging metadata for collection {collection_id}.")
return self._merge_collection_metadata(by_backend=by_backend)
else:
raise CollectionNotFoundException(collection_id)

def load_collection(self, collection_id: str, load_params: LoadParameters, env: EvalEnv) -> DriverDataCube:
    """Always raises: the aggregator proxies metadata but never loads data itself."""
    message = "openeo-aggregator does not implement concrete collection loading"
    raise RuntimeError(message)
Expand Down Expand Up @@ -306,7 +308,7 @@ def _get_process_registry(self) -> ProcessRegistry:
try:
processes_per_backend[con.id] = {p["id"]: p for p in con.list_processes()}
except Exception:
# TODO: fail instead of warn?
# TODO: user warning https://github.com/Open-EO/openeo-api/issues/412
_log.warning(f"Failed to get processes from {con.id}", exc_info=True)

# TODO #4: combined set of processes: union, intersection or something else?
Expand Down Expand Up @@ -427,8 +429,8 @@ def get_user_jobs(self, user_id: str) -> List[BatchJobMetadata]:
try:
backend_jobs = con.list_jobs()
except OpenEoApiError as e:
# TODO: user warning https://github.com/Open-EO/openeo-api/issues/412
_log.warning(f"Failed to get job listing from backend {con.id!r}: {e!r}")
# TODO attach failure to response? https://github.com/Open-EO/openeo-api/issues/412
backend_jobs = []
for job in backend_jobs:
job["id"] = JobIdMapping.get_aggregator_job_id(backend_job_id=job["id"], backend_id=con.id)
Expand Down
6 changes: 0 additions & 6 deletions tests/test_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,14 +293,8 @@ def test_get_collection_metadata_merging_with_error(
catalog = AggregatorCollectionCatalog(backends=multi_backend_connection)
metadata = catalog.get_collection_metadata("S2")
assert metadata == {
"stac_version": "0.9.0",
"id": "S2",
"description": "S2",
"title": "b2's S2",
"extent": {"spatial": {"bbox": [[-180, -90, 180, 90]]}, "temporal": {"interval": [[None, None]]}},
"license": "proprietary",
"summaries": {"provider:backend": ["b2"]},
"links": [],
}
# TODO: test that caching of result is different from merging without error? (#2)

Expand Down
57 changes: 56 additions & 1 deletion tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from openeo_aggregator.backend import AggregatorCollectionCatalog
from openeo_driver.errors import JobNotFoundException, ProcessGraphMissingException, JobNotFinishedException, \
ProcessGraphInvalidException
from openeo_driver.testing import ApiTester, TEST_USER_AUTH_HEADER, TEST_USER, TEST_USER_BEARER_TOKEN
from openeo_driver.testing import ApiTester, TEST_USER_AUTH_HEADER, TEST_USER, TEST_USER_BEARER_TOKEN, DictSubSet
from .conftest import assert_dict_subset, get_api100, get_flask_app


Expand Down Expand Up @@ -88,6 +88,25 @@ def test_collections_duplicate(self, api100, requests_mock, backend1, backend2):
res = api100.get("/collections").assert_status_code(200).json
assert set(c["id"] for c in res["collections"]) == {"S1", "S2", "S3"}

def test_collection_full_metadata(self, api100, requests_mock, backend1, backend2):
    """Full metadata of each collection is served from the backend that hosts it."""
    # Catalogs: backend1 hosts S1+S2, backend2 hosts S3.
    requests_mock.get(backend1 + "/collections", json={"collections": [{"id": "S1"}, {"id": "S2"}]})
    requests_mock.get(backend1 + "/collections/S1", json={"id": "S1", "title": "b1 S1"})
    requests_mock.get(backend1 + "/collections/S2", json={"id": "S2", "title": "b1 S2"})
    requests_mock.get(backend2 + "/collections", json={"collections": [{"id": "S3"}]})
    requests_mock.get(backend2 + "/collections/S3", json={"id": "S3", "title": "b2 S3"})

    # Each known collection resolves with the owning backend's metadata.
    for cid, title in [("S1", "b1 S1"), ("S2", "b1 S2"), ("S3", "b2 S3")]:
        res = api100.get("/collections/" + cid).assert_status_code(200).json
        assert res == DictSubSet({"id": cid, "title": title})

    # Unknown collection id maps to a 404 CollectionNotFound error.
    res = api100.get("/collections/S4")
    res.assert_error(404, "CollectionNotFound")

def test_collection_items(self, api100, requests_mock, backend1, backend2):
requests_mock.get(backend1 + "/collections", json={"collections": [{"id": "S1"}]})
requests_mock.get(backend2 + "/collections", json={"collections": [{"id": "S2"}]})
Expand All @@ -103,6 +122,42 @@ def collection_items(request, context):
res.assert_status_code(200)
assert res.json == {"type": "FeatureCollection", "features": [{"type": "Feature", "geometry": "blabla"}]}

@pytest.mark.parametrize(["backend1_up", "backend2_up", "expected"], [
    (True, False, {"S1", "S2"}),
    (False, True, {"S3"}),
    (False, False, set()),
])
def test_collections_resilience(
        self, api100, requests_mock, backend1, backend2, backend1_up, backend2_up, expected
):
    """Collection listing skips backends whose /collections endpoint fails."""

    def mock_backend(base_url, up, collections):
        # A healthy backend serves its catalog; a broken one answers HTTP 404.
        if up:
            requests_mock.get(base_url + "/collections", json={"collections": collections})
        else:
            requests_mock.get(base_url + "/collections", status_code=404, text="down")

    mock_backend(backend1, backend1_up, [{"id": "S1"}, {"id": "S2"}])
    mock_backend(backend2, backend2_up, [{"id": "S3"}])

    res = api100.get("/collections").assert_status_code(200).json
    assert {c["id"] for c in res["collections"]} == expected
    # TODO: test caching of results

def test_collection_full_metadata_resilience(self, api100, requests_mock, backend1, backend2):
    """A failing detail endpoint yields CollectionNotFound; healthy ones keep working."""
    requests_mock.get(backend1 + "/collections", json={"collections": [{"id": "S1"}, {"id": "S2"}]})
    requests_mock.get(backend2 + "/collections", json={"collections": [{"id": "S3"}]})
    # Only S1's detail endpoint is healthy; S2 and S3 detail lookups fail with 404.
    requests_mock.get(backend1 + "/collections/S1", json={"id": "S1", "title": "b1 S1"})
    requests_mock.get(backend1 + "/collections/S2", status_code=404, text="down")
    requests_mock.get(backend2 + "/collections/S3", status_code=404, text="down")

    res = api100.get("/collections/S1").assert_status_code(200).json
    assert res == DictSubSet({"id": "S1", "title": "b1 S1"})

    # Broken and unknown collections alike surface as 404 CollectionNotFound.
    for cid in ("S2", "S3", "S4"):
        api100.get("/collections/" + cid).assert_error(404, "CollectionNotFound")
    # TODO: test caching of results


class TestAuthentication:
def test_credentials_oidc_default(self, api100, backend1, backend2):
Expand Down

0 comments on commit b9a2442

Please sign in to comment.