Skip to content

Commit

Permalink
Issue #90 Add basic UDP support
Browse files Browse the repository at this point in the history
Just forward to "first" upstream backend
  • Loading branch information
soxofaan committed Sep 13, 2023
1 parent 47b8ef4 commit ec21bc4
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 3 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
"requests",
"attrs",
"openeo>=0.17.0",
"openeo_driver>=0.65.0.dev",
"openeo_driver>=0.66.0.dev",
"flask~=2.0",
"gunicorn~=20.0",
"python-json-logger>=2.0.0",
Expand Down
43 changes: 42 additions & 1 deletion src/openeo_aggregator/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
)

import flask
import openeo
import openeo_driver.util.view_helpers
from openeo.capabilities import ComparableVersion
from openeo.rest import OpenEoApiError, OpenEoClientException, OpenEoRestError
Expand All @@ -36,6 +37,8 @@
Processing,
SecondaryServices,
ServiceMetadata,
UserDefinedProcesses,
UserDefinedProcessMetadata,
)
from openeo_driver.datacube import DriverDataCube
from openeo_driver.errors import (
Expand All @@ -47,6 +50,7 @@
PermissionsInsufficientException,
ProcessGraphInvalidException,
ProcessGraphMissingException,
ProcessGraphNotFoundException,
ServiceNotFoundException,
ServiceUnsupportedException,
)
Expand Down Expand Up @@ -1172,6 +1176,42 @@ def update_service(self, user_id: str, service_id: str, process_graph: dict) ->
) from e


class AggregatorUserDefinedProcesses(UserDefinedProcesses):
def __init__(self, backends: MultiBackendConnection):
super(AggregatorUserDefinedProcesses, self).__init__()
self._backends = backends

@contextlib.contextmanager
def _get_connection(self, process_graph_id: Optional[str] = None) -> Iterator[openeo.Connection]:
"""Get connection and handle/translate common errors"""
try:
# TODO: we blindly pick "first" upstream backend for now. Do better!
with self._backends.first().authenticated_from_request(request=flask.request) as con:
yield con
except OpenEoApiError as e:
if e.code == ProcessGraphNotFoundException.code:
raise ProcessGraphNotFoundException(process_graph_id=process_graph_id)
raise

def get(self, user_id: str, process_id: str) -> Union[UserDefinedProcessMetadata, None]:
with self._get_connection(process_graph_id=process_id) as con:
metadata = con.get(f"/process_graphs/{process_id}", expected_status=200).json()
return UserDefinedProcessMetadata.from_dict(metadata)

def get_for_user(self, user_id: str) -> List[UserDefinedProcessMetadata]:
with self._get_connection() as con:
data = con.get(f"/process_graphs", expected_status=200).json()
return [UserDefinedProcessMetadata.from_dict(p) for p in data["processes"]]

def save(self, user_id: str, process_id: str, spec: dict) -> None:
with self._get_connection(process_graph_id=process_id) as con:
con.put(f"/process_graphs/{process_id}", json=spec, expected_status=200)

def delete(self, user_id: str, process_id: str) -> None:
with self._get_connection(process_graph_id=process_id) as con:
con.delete(f"/process_graphs/{process_id}", expected_status=204)


class AggregatorBackendImplementation(OpenEoBackendImplementation):
# No basic auth: OIDC auth is required (to get EGI Check-in eduperson_entitlement data)
enable_basic_auth = False
Expand Down Expand Up @@ -1200,13 +1240,14 @@ def __init__(self, backends: MultiBackendConnection, config: AggregatorConfig):
)

secondary_services = AggregatorSecondaryServices(backends=backends, processing=processing, config=config)
user_defined_processes = AggregatorUserDefinedProcesses(backends=backends)

super().__init__(
catalog=catalog,
processing=processing,
secondary_services=secondary_services,
batch_jobs=batch_jobs,
user_defined_processes=None,
user_defined_processes=user_defined_processes,
)
self._configured_oidc_providers: List[OidcProvider] = config.configured_oidc_providers
self._auth_entitlement_check: Union[bool, dict] = config.auth_entitlement_check
Expand Down
4 changes: 3 additions & 1 deletion src/openeo_aggregator/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,9 @@ def _get_bearer(self, request: flask.Request) -> str:
raise AuthenticationSchemeInvalidException

@contextlib.contextmanager
def authenticated_from_request(self, request: flask.Request, user: Optional[User] = None):
def authenticated_from_request(
self, request: flask.Request, user: Optional[User] = None
) -> Iterator["BackendConnection"]:
"""
Context manager to temporarily authenticate upstream connection based on current incoming flask request.
"""
Expand Down
114 changes: 114 additions & 0 deletions tests/test_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
JobNotFoundException,
ProcessGraphInvalidException,
ProcessGraphMissingException,
ProcessGraphNotFoundException,
)
from openeo_driver.testing import (
TEST_USER,
Expand Down Expand Up @@ -2363,6 +2364,119 @@ def test_update_service_backend_response_is_an_error_status(
assert mock_patch.last_request.json() == json_payload


class TestUserDefinedProcesses:
_UDP_EVI = {
"id": "evi",
"summary": "Enhanced Vegetation Index",
"description": "Computes the Enhanced Vegetation Index (EVI).",
"parameters": [
{"name": "red", "description": "Value from the red band.", "schema": {"type": "number"}},
{"name": "blue", "description": "Value from the blue band.", "schema": {"type": "number"}},
{"name": "nir", "description": "Value from the near infrared band.", "schema": {"type": "number"}},
],
"returns": {"description": "Computed EVI.", "schema": {"type": "number"}},
}

def _with_expected_auth_headers(self, data: dict):
"""Helper to build a dynamic requests_mock response handler that also checks for auth headers"""

def handle(request, context):
assert request.headers["Authorization"] == TEST_USER_AUTH_HEADER["Authorization"]
return data

return handle

def test_list_udps_no_auth(self, api100):
api100.get("/process_graphs").assert_error(401, "AuthenticationRequired")

def test_list_udps_empty(self, api100, requests_mock, backend1):
upstream = requests_mock.get(
backend1 + "/process_graphs", status_code=200, json=self._with_expected_auth_headers({"processes": []})
)
api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN)
res = api100.get("/process_graphs").assert_status_code(200).json
assert res == {"processes": [], "links": []}
assert upstream.call_count == 1

def test_list_udps_existing(self, api100, requests_mock, backend1):
upstream = requests_mock.get(
backend1 + "/process_graphs",
json=self._with_expected_auth_headers(
{
"processes": [
# A full UDP metadata entry
self._UDP_EVI,
# A Minimal UDP metadata entry
{"id": "somethingelse"},
],
"links": [],
}
),
)
api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN)
res = api100.get("/process_graphs").assert_status_code(200).json
assert res == {
"processes": [
self._UDP_EVI,
{"id": "somethingelse"},
],
"links": [],
}
assert upstream.call_count == 1

def test_get_existing(self, api100, requests_mock, backend1):
upstream = requests_mock.get(
backend1 + "/process_graphs/evi", json=self._with_expected_auth_headers(self._UDP_EVI)
)
api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN)
res = api100.get("/process_graphs/evi").assert_status_code(200).json
expected = self._UDP_EVI.copy()
assert res == expected
assert upstream.call_count == 1

def test_get_non_existing(self, api100, requests_mock, backend1):
upstream = requests_mock.get(
backend1 + "/process_graphs/evi",
status_code=ProcessGraphNotFoundException.status_code,
json=self._with_expected_auth_headers(ProcessGraphNotFoundException(process_graph_id="dummy").to_dict()),
)
api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN)
api100.get("/process_graphs/evi").assert_error(
status_code=404, error_code="ProcessGraphNotFound", message="'evi' does not exist"
)
assert upstream.call_count == 1

def test_store(self, api100, requests_mock, backend1):
udp_id = "add35"
data = {
"id": udp_id,
"process_graph": {
"add": {"process_id": "add", "arguments": {"x": 3, "y": 5}, "result": True},
},
}

def handle_put(request, context):
assert request.headers["Authorization"] == TEST_USER_AUTH_HEADER["Authorization"]
assert request.json() == data
context.status_code = 200
return {}

upstream = requests_mock.put(backend1 + f"/process_graphs/{udp_id}", json=handle_put)

api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN)
api100.put(f"/process_graphs/{udp_id}", json=data).assert_status_code(200)
assert upstream.call_count == 1

def test_delete_existing(self, api100, requests_mock, backend1):
upstream = requests_mock.delete(
backend1 + f"/process_graphs/evi", status_code=204, json=self._with_expected_auth_headers({})
)

api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN)
api100.delete(f"/process_graphs/evi").assert_status_code(204)
assert upstream.call_count == 1


class TestResilience:

@pytest.fixture
Expand Down

0 comments on commit ec21bc4

Please sign in to comment.