diff --git a/src/openeo_aggregator/config.py b/src/openeo_aggregator/config.py
index bdf50a4c..03cb6b7d 100644
--- a/src/openeo_aggregator/config.py
+++ b/src/openeo_aggregator/config.py
@@ -58,6 +58,9 @@ def from_py_file(path: Union[str, Path]) -> 'AggregatorConfig':
             raise ConfigException(f"Variable 'config' from {path} is not AggregatorConfig but {type(config)}")
         return config
 
+    def copy(self) -> 'AggregatorConfig':
+        return AggregatorConfig(self)
+
 
 def get_config(x: Any) -> AggregatorConfig:
     """
diff --git a/tests/test_views.py b/tests/test_views.py
index 9fea76c2..0e5dbe65 100644
--- a/tests/test_views.py
+++ b/tests/test_views.py
@@ -2,10 +2,12 @@ import logging
 
 import pytest
 import requests
+from typing import Tuple
 
 from openeo_aggregator.backend import AggregatorCollectionCatalog
+from openeo_aggregator.config import AggregatorConfig
 from openeo_aggregator.connection import MultiBackendConnection
-from openeo_driver.errors import JobNotFoundException, ProcessGraphMissingException, JobNotFinishedException, \
+from openeo_driver.errors import JobNotFoundException, JobNotFinishedException, \
     ProcessGraphInvalidException
 from openeo_driver.testing import ApiTester, TEST_USER_AUTH_HEADER, TEST_USER, TEST_USER_BEARER_TOKEN, DictSubSet
 from .conftest import assert_dict_subset, get_api100, get_flask_app
@@ -593,6 +595,48 @@ def test_list_jobs(self, api100, requests_mock, backend1, backend2):
             {"id": "b2-job05", "status": "running", "created": "2021-06-05T12:34:56Z"},
         ]
 
+    @pytest.mark.parametrize("b2_oidc_pid", ["egi", "aho"])
+    def test_list_jobs_oidc_pid_mapping(self, config, requests_mock, backend1, backend2, b2_oidc_pid):
+        # Override /credentials/oidc of backend2 before building flask app and ApiTester
+        requests_mock.get(backend2 + "/credentials/oidc", json={"providers": [
+            {"id": b2_oidc_pid, "issuer": "https://egi.test", "title": "EGI"}
+        ]})
+        api100 = get_api100(get_flask_app(config))
+
+        # OIDC setup
+        def get_userinfo(request: requests.Request, context):
+            assert request.headers["Authorization"] == "Bearer t0k3n"
+            return {"sub": "john"}
+
+        requests_mock.get("https://egi.test/.well-known/openid-configuration", json={
+            "userinfo_endpoint": "https://egi.test/userinfo"
+        })
+        requests_mock.get("https://egi.test/userinfo", json=get_userinfo)
+
+        def b1_get_jobs(request, context):
+            assert request.headers["Authorization"] == "Bearer oidc/egi/t0k3n"
+            return {"jobs": [
+                {"id": "job03", "status": "running", "created": "2021-06-03T12:34:56Z"},
+                {"id": "job08", "status": "running", "created": "2021-06-08T12:34:56Z"},
+            ]}
+
+        def b2_get_jobs(request, context):
+            assert request.headers["Authorization"] == f"Bearer oidc/{b2_oidc_pid}/t0k3n"
+            return {"jobs": [
+                {"id": "job05", "status": "running", "created": "2021-06-05T12:34:56Z"},
+            ]}
+
+        requests_mock.get(backend1 + "/jobs", json=b1_get_jobs)
+        requests_mock.get(backend2 + "/jobs", json=b2_get_jobs)
+
+        api100.set_auth_bearer_token(token="oidc/egi/t0k3n")
+        res = api100.get("/jobs").assert_status_code(200).json
+        assert res["jobs"] == [
+            {"id": "b1-job03", "status": "running", "created": "2021-06-03T12:34:56Z"},
+            {"id": "b1-job08", "status": "running", "created": "2021-06-08T12:34:56Z"},
+            {"id": "b2-job05", "status": "running", "created": "2021-06-05T12:34:56Z"},
+        ]
+
     @pytest.mark.parametrize("status_code", [204, 303, 404, 500])
     def test_list_jobs_failing_backend(self, api100, requests_mock, backend1, backend2, caplog, status_code):
         requests_mock.get(backend1 + "/jobs", json={"jobs": [
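
Reviewer note, not part of the patch: the `test_list_jobs_oidc_pid_mapping` hunk above exercises the openEO bearer token convention `oidc/<provider_id>/<token>`, visible in the mocked `Authorization` headers. When backend2 registers the shared issuer `https://egi.test` under a different provider id (`aho` instead of `egi`), the aggregator must rewrite the provider id before forwarding the token. A minimal sketch of that rewrite, assuming a plain dict id mapping; `map_oidc_token` is a hypothetical helper name, not the aggregator's actual API:

    def map_oidc_token(token: str, pid_mapping: dict) -> str:
        """Rewrite the provider id in an 'oidc/<provider_id>/<token>' bearer token."""
        prefix, pid, value = token.split("/", 2)
        assert prefix == "oidc"
        # Fall back to the original provider id if the backend uses the same one.
        return f"oidc/{pid_mapping.get(pid, pid)}/{value}"

    assert map_oidc_token("oidc/egi/t0k3n", {"egi": "aho"}) == "oidc/aho/t0k3n"
    assert map_oidc_token("oidc/egi/t0k3n", {}) == "oidc/egi/t0k3n"
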
@@ -886,20 +930,29 @@ def test_get_logs_not_found_on_aggregator(self, api100):
 
 
 class TestResilience:
 
-    def test_startup_during_backend_downtime(self, backend1, base_config, requests_mock, caplog):
+    @pytest.fixture
+    def broken_backend2(
+            self, backend1, requests_mock, base_config
+    ) -> Tuple[str, AggregatorConfig, 'requests_mock.adapter._Matcher']:
+        """Fixture to quickly set up a config with broken backend2"""
+        backend2 = "https://b2.test/v1"
+        # TODO: return 500 on all requests?
+        root_mock = requests_mock.get(backend2 + "/", status_code=500)
+
+        config = base_config.copy()
+        config.aggregator_backends = {"b1": backend1, "b2": backend2}
+        return backend2, config, root_mock
+
+    def test_startup_during_backend_downtime(self, backend1, broken_backend2, requests_mock, caplog):
         caplog.set_level(logging.WARNING)
 
-        # Backend1 is up
+        # Initial backend setup with broken backend2
         requests_mock.get(backend1 + "/health", text="OK")
-
-        # Instead of `backend2` from fixture, set up a broken one
-        backend2 = "https://b2.test/v1"
-        m = requests_mock.get(backend2 + "/", status_code=500)
-        base_config.aggregator_backends = {"b1": backend1, "b2": backend2}
-        api100 = get_api100(get_flask_app(base_config))
+        backend2, config, b2_root = broken_backend2
+        api100 = get_api100(get_flask_app(config))
 
         assert "Failed to create backend 'b2' connection" in caplog.text
-        assert m.call_count == 1
+        assert b2_root.call_count == 1
 
         api100.get("/").assert_status_code(200)
@@ -911,17 +964,14 @@ def test_startup_during_backend_downtime(self, backend1, base_config, requests_m
             "status_code": 200,
         }
 
-    def test_startup_during_backend_downtime_and_recover(self, backend1, base_config, requests_mock):
+    def test_startup_during_backend_downtime_and_recover(self, backend1, broken_backend2, requests_mock):
         # Set up fake clock
         MultiBackendConnection._clock = itertools.count(1).__next__
 
+        # Initial backend setup with broken backend2
        requests_mock.get(backend1 + "/health", text="OK")
-
-        # Instead of `backend2` from fixture, set up a broken one
-        backend2 = "https://b2.test/v1"
-        m = requests_mock.get(backend2 + "/", status_code=500)
-        base_config.aggregator_backends = {"b1": backend1, "b2": backend2}
-        api100 = get_api100(get_flask_app(base_config))
+        backend2, config, b2_root = broken_backend2
+        api100 = get_api100(get_flask_app(config))
 
         assert api100.get("/health").assert_status_code(200).json["backend_status"] == {
             "b1": {"status_code": 200, "text": "OK", "response_time": pytest.approx(0.1, abs=0.1)},
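
Reviewer note, not part of the patch: the resilience tests above drive cache expiry by swapping `MultiBackendConnection._clock` for `itertools.count(...).__next__`, so every clock read advances the fake time by one second, and re-seeding the counter at 1000 jumps past the cache's time-to-live. The aggregator's actual cache implementation is not shown in this diff; as an illustration of the pattern only, a minimal TTL cache with an injectable clock (all names here are hypothetical):

    import itertools
    import time

    class TtlCache:
        """Cache entries for a fixed time-to-live, measured by a pluggable clock."""

        def __init__(self, ttl: float, clock=time.time):
            self.ttl = ttl
            self.clock = clock
            self._data = {}  # key -> (expiry, value)

        def get_or_call(self, key, callback):
            now = self.clock()
            if key not in self._data or self._data[key][0] <= now:
                self._data[key] = (now + self.ttl, callback())
            return self._data[key][1]

    # Each clock() call advances the fake time by one second.
    cache = TtlCache(ttl=60, clock=itertools.count(1).__next__)
    assert cache.get_or_call("conn", lambda: "v1") == "v1"
    assert cache.get_or_call("conn", lambda: "v2") == "v1"  # within TTL: cached
    # Re-seed the clock at 1000, like the tests do, to jump past the TTL.
    cache.clock = itertools.count(1000).__next__
    assert cache.get_or_call("conn", lambda: "v3") == "v3"  # expired: refreshed
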
@@ -935,8 +985,67 @@ def test_startup_during_backend_downtime_and_recover(self, backend1, base_config
         }
 
         # Wait a bit so that cache is flushed
-        MultiBackendConnection._clock = itertools.count(10 * 60).__next__
+        MultiBackendConnection._clock = itertools.count(1000).__next__
         assert api100.get("/health").assert_status_code(200).json["backend_status"] == {
             "b1": {"status_code": 200, "text": "OK", "response_time": pytest.approx(0.1, abs=0.1)},
             "b2": {"status_code": 200, "text": "ok again", "response_time": pytest.approx(0.1, abs=0.1)},
         }
+
+    @pytest.mark.parametrize("b2_oidc_provider_id", ["egi", "aho"])
+    def test_oidc_mapping_after_recover(self, backend1, broken_backend2, requests_mock, b2_oidc_provider_id):
+        # Set up fake clock
+        MultiBackendConnection._clock = itertools.count(1).__next__
+
+        # Initial backend setup with broken backend2
+        backend2, config, b2_root = broken_backend2
+        api100 = get_api100(get_flask_app(config))
+
+        # OIDC setup
+        def get_userinfo(request: requests.Request, context):
+            assert request.headers["Authorization"] == "Bearer t0k3n"
+            return {"sub": "john"}
+
+        requests_mock.get("https://egi.test/.well-known/openid-configuration", json={
+            "userinfo_endpoint": "https://egi.test/userinfo"
+        })
+        requests_mock.get("https://egi.test/userinfo", json=get_userinfo)
+
+        # Job listings: backend1 works, backend2 is down
+        requests_mock.get(backend1 + "/jobs", json={"jobs": [
+            {"id": "j0b1", "status": "running", "created": "2021-01-11T11:11:11Z"}
+        ]})
+        requests_mock.get(backend2 + "/jobs", status_code=500, text="nope")
+
+        api100.set_auth_bearer_token(token="oidc/egi/t0k3n")
+        jobs = api100.get("/jobs").assert_status_code(200).json
+        assert jobs["jobs"] == [
+            {"id": "b1-j0b1", "status": "running", "created": "2021-01-11T11:11:11Z"}
+        ]
+
+        # Backend2 is up again (but still cached as down)
+        requests_mock.get(backend2 + "/", json={"api_version": "1.0.0"})
+        requests_mock.get(backend2 + "/credentials/oidc", json={"providers": [
+            {"id": b2_oidc_provider_id, "issuer": "https://egi.test", "title": "EGI"}
+        ]})
+
+        def get_jobs(request, context):
+            assert request.headers["Authorization"] == f"Bearer oidc/{b2_oidc_provider_id}/t0k3n"
+            return {"jobs": [
+                {"id": "j0b2", "status": "running", "created": "2021-02-22T22:22:22Z"}
+            ]}
+
+        requests_mock.get(backend2 + "/jobs", json=get_jobs)
+
+        jobs = api100.get("/jobs").assert_status_code(200).json
+        assert jobs["jobs"] == [
+            {"id": "b1-j0b1", "status": "running", "created": "2021-01-11T11:11:11Z"}
+        ]
+
+        # Skip time so that connection cache is cleared
+        MultiBackendConnection._clock = itertools.count(1000).__next__
+        jobs = api100.get("/jobs").assert_status_code(200).json
+        assert jobs["jobs"] == [
+            {"id": "b1-j0b1", "status": "running", "created": "2021-01-11T11:11:11Z"},
+            {"id": "b2-j0b2", "status": "running", "created": "2021-02-22T22:22:22Z"},
+        ]
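
Reviewer note, not part of the patch: the recovery scenario in the last hunk works because requests-mock allows re-registering a matcher for an already-mocked URL, with the most recently registered matcher taking precedence. That is what flips backend2's `/` and `/jobs` endpoints from failing to healthy mid-test. A self-contained illustration of that behavior:

    import requests
    import requests_mock

    with requests_mock.Mocker() as m:
        m.get("https://b2.test/v1/", status_code=500)  # backend down
        assert requests.get("https://b2.test/v1/").status_code == 500
        # Re-register the same URL: the newest matcher wins.
        m.get("https://b2.test/v1/", json={"api_version": "1.0.0"})
        assert requests.get("https://b2.test/v1/").json() == {"api_version": "1.0.0"}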