Skip to content

Commit

Permalink
Issue #84: Implement caching for AggregatorSecondaryServices.service_…
Browse files Browse the repository at this point in the history
…types
  • Loading branch information
JohanKJSchreurs committed Dec 8, 2022
1 parent effd2a9 commit a67cdbf
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 18 deletions.
13 changes: 11 additions & 2 deletions src/openeo_aggregator/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -648,10 +648,16 @@ class AggregatorSecondaryServices(SecondaryServices):
def __init__(
self,
backends: MultiBackendConnection,
processing: AggregatorProcessing
processing: AggregatorProcessing,
config: AggregatorConfig
):
super(AggregatorSecondaryServices, self).__init__()
self._backends = backends

self._memoizer = memoizer_from_config(config=config, namespace="SecondaryServices")
self._backends.on_connections_change.add(self._memoizer.invalidate)

# TODO Issue #84 Decide which backend based on service type. Will need to remove self._processing for this.
self._processing = processing

def _get_connection_and_backend_service_id(
Expand All @@ -671,6 +677,9 @@ def _get_connection_and_backend_service_id(
return con, backend_service_id

def service_types(self) -> dict:
return self._memoizer.get_or_call(key=("all_service_types",), callback=self._get_service_types)

def _get_service_types(self) -> dict:
"""https://openeo.org/documentation/1.0/developers/api/reference.html#operation/list-service-types"""
# TODO: add caching. Also see https://github.com/Open-EO/openeo-aggregator/issues/78#issuecomment-1326180557
service_types = {}
Expand Down Expand Up @@ -848,7 +857,7 @@ def __init__(self, backends: MultiBackendConnection, config: AggregatorConfig):
partitioned_job_tracker=partitioned_job_tracker
)

secondary_services = AggregatorSecondaryServices(backends=backends, processing=processing)
secondary_services = AggregatorSecondaryServices(backends=backends, processing=processing, config=config)

super().__init__(
catalog=catalog,
Expand Down
81 changes: 65 additions & 16 deletions tests/test_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,60 @@ def test_service_types_simple(
requests_mock.get(backend1 + "/service_types", json=single_service_type)
requests_mock.get(backend2 + "/service_types", json={})
processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config)
implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing)
implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config)

service_types = implementation.service_types()
assert service_types == single_service_type

def test_service_types_simple_cached(
self, multi_backend_connection, config, catalog, backend1, backend2, requests_mock
):
"""The service_types call is cached:
When we get the service types several times, the second call that happens before the cache expires,
doesn't hit the backend.
But the third call that happens that happens after the cache has expired does hit the backend again.
"""
single_service_type = {
"WMTS": {
"configuration": {
"colormap": {
"default": "YlGn",
"description":
"The colormap to apply to single band layers",
"type": "string"
},
"version": {
"default": "1.0.0",
"description": "The WMTS version to use.",
"enum": ["1.0.0"],
"type": "string"
}
},
"links": [],
"process_parameters": [],
"title": "Web Map Tile Service"
}
}
mock_be1 = requests_mock.get(backend1 + "/service_types", json=single_service_type)
processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config)
implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config)

service_types = implementation.service_types()
assert mock_be1.call_count == 1
assert service_types == single_service_type

# Second call happens before the cache item expires: it should not query the backend.
service_types = implementation.service_types()
assert mock_be1.call_count == 1
assert service_types == single_service_type

# Third call happens when the cached item has expired: it should query the backend.
with clock_mock(offset=100):
service_types = implementation.service_types()
assert mock_be1.call_count == 2
assert service_types == single_service_type

# TODO: Issue #84 Check if we should remove this merging behavior.
def test_service_types_merging(self, multi_backend_connection, config, catalog,
backend1, backend2, requests_mock
):
Expand Down Expand Up @@ -205,7 +254,7 @@ def test_service_types_merging(self, multi_backend_connection, config, catalog,
requests_mock.get(backend1 + "/service_types", json=service_type_1)
requests_mock.get(backend2 + "/service_types", json=service_type_2)
processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config)
implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing)
implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config)

actual_service_types = implementation.service_types()

Expand Down Expand Up @@ -251,7 +300,7 @@ def test_service_info_succeeds(
requests_mock.get(backend1 + "/services/wmts-foo", json=json_wmts_foo)
requests_mock.get(backend2 + "/services/wms-bar", json=json_wms_bar)
processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config)
implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing)
implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config)

# Check the expected metadata on *both* of the services.
with flask_app.test_request_context(headers=TEST_USER_AUTH_HEADER):
Expand Down Expand Up @@ -280,7 +329,7 @@ def test_service_info_wrong_backend_id(

requests_mock.get(backend1 + "/services/wmts-foo", json=service_metadata_wmts_foo.prepare_for_json())
processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config)
implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing)
implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config)

with flask_app.test_request_context(headers=TEST_USER_AUTH_HEADER):
with pytest.raises(ServiceNotFoundException):
Expand All @@ -293,7 +342,7 @@ def test_service_info_wrong_service_id(

requests_mock.get(backend1 + "/services/service-does-not-exist", status_code=404)
processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config)
implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing)
implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config)

with flask_app.test_request_context(headers=TEST_USER_AUTH_HEADER):
with pytest.raises(ServiceNotFoundException):
Expand Down Expand Up @@ -322,7 +371,7 @@ def test_create_service_succeeds(
status_code=201
)
processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config)
implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing)
implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config)

with flask_app.test_request_context(headers=TEST_USER_AUTH_HEADER):
actual_openeo_id = implementation.create_service(
Expand All @@ -349,7 +398,7 @@ def test_create_service_backend_raises_openeoapiexception(
exc=exception_class("Some server error"),
)
processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config)
implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing)
implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config)

with flask_app.test_request_context(headers=TEST_USER_AUTH_HEADER):
with pytest.raises(OpenEOApiException):
Expand Down Expand Up @@ -380,7 +429,7 @@ def test_create_service_backend_reraises(
exc=exception_class("Some server error")
)
processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config)
implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing)
implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config)

with flask_app.test_request_context(headers=TEST_USER_AUTH_HEADER):
# These exception types should be re-raised, not become an OpenEOApiException.
Expand All @@ -400,7 +449,7 @@ def test_remove_service_succeeds(

mock_delete = requests_mock.delete(backend1 + "/services/wmts-foo", status_code=204)
processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config)
implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing)
implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config)

with flask_app.test_request_context(headers=TEST_USER_AUTH_HEADER):
implementation.remove_service(user_id=TEST_USER, service_id="b1-wmts-foo")
Expand All @@ -414,7 +463,7 @@ def test_remove_service_but_backend_id_not_found(
"""When the backend ID/prefix does not exist then the aggregator raises an ServiceNotFoundException."""

processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config)
implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing)
implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config)

# Case 1: the backend doesn't even exist
with flask_app.test_request_context(headers=TEST_USER_AUTH_HEADER):
Expand All @@ -428,7 +477,7 @@ def test_remove_service_but_service_id_not_found(
"""When the service ID does not exist for the specified backend then the aggregator raises an ServiceNotFoundException."""

processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config)
implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing)
implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config)

# The backend exists but the service ID does not.
mock_delete1 = requests_mock.delete(
Expand All @@ -450,7 +499,7 @@ def test_remove_service_backend_response_is_an_error_status(

requests_mock.delete(backend1 + "/services/wmts-foo", status_code=500)
processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config)
implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing)
implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config)

with flask_app.test_request_context(headers=TEST_USER_AUTH_HEADER):
with pytest.raises(OpenEoApiError) as e:
Expand All @@ -471,7 +520,7 @@ def test_update_service_succeeds(
status_code=204,
)
processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config)
implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing)
implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config)
process_graph_after = {"bar": {"process_id": "bar", "arguments": {"arg1": "bar"}}}

with flask_app.test_request_context(headers=TEST_USER_AUTH_HEADER):
Expand All @@ -490,7 +539,7 @@ def test_update_service_but_backend_id_does_not_exist(
"""When the backend ID/prefix does not exist then the aggregator raises an ServiceNotFoundException."""

processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config)
implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing)
implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config)
process_graph_after = {"bar": {"process_id": "bar", "arguments": {"arg1": "bar"}}}

with flask_app.test_request_context(headers=TEST_USER_AUTH_HEADER):
Expand All @@ -509,7 +558,7 @@ def test_update_service_but_service_id_not_found(
status_code=404,
)
processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config)
implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing)
implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config)
process_graph_after = {"bar": {"process_id": "bar", "arguments": {"arg1": "bar"}}}

with flask_app.test_request_context(headers=TEST_USER_AUTH_HEADER):
Expand All @@ -529,7 +578,7 @@ def test_update_service_backend_response_is_an_error_status(
status_code=500,
)
processing = AggregatorProcessing(backends=multi_backend_connection, catalog=catalog, config=config)
implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing)
implementation = AggregatorSecondaryServices(backends=multi_backend_connection, processing=processing, config=config)
new_process_graph = {"bar": {"process_id": "bar", "arguments": {"arg1": "bar"}}}

with flask_app.test_request_context(headers=TEST_USER_AUTH_HEADER):
Expand Down

0 comments on commit a67cdbf

Please sign in to comment.