diff --git a/src/openeo_aggregator/partitionedjobs/tracking.py b/src/openeo_aggregator/partitionedjobs/tracking.py index 1335bd1f..6c63ce68 100644 --- a/src/openeo_aggregator/partitionedjobs/tracking.py +++ b/src/openeo_aggregator/partitionedjobs/tracking.py @@ -22,6 +22,7 @@ STATUS_INSERTED, STATUS_RUNNING, PartitionedJob, + PartitionedJobFailure, SubJob, ) from openeo_aggregator.partitionedjobs.crossbackend import ( @@ -103,9 +104,20 @@ def create_crossbackend_pjob( def get_replacement(node_id: str, node: dict, subgraph_id: SubGraphId) -> dict: nonlocal batch_jobs - # TODO: use canonical URL to circumvent auth issues - # but how does `parial` work there? (https://github.com/Open-EO/openeo-api/issues/507) - stac_url = batch_jobs[subgraph_id].get_results_metadata_url(full=True) + "?partial=true" + try: + job: BatchJob = batch_jobs[subgraph_id] + with job.connection.authenticated_from_request(flask.request): + result_url = job.get_results_metadata_url(full=True) + result_metadata = job.connection.get( + result_url, params={"partial": "true"}, expected_status=200 + ).json() + # Will canonical link also be partial? (https://github.com/Open-EO/openeo-api/issues/507) + canonical_links = [link for link in result_metadata.get("links", {}) if link.get("rel") == "canonical"] + stac_url = canonical_links[0]["href"] + except Exception as e: + msg = f"Failed to obtain partial canonical batch job result URL for {subgraph_id=}: {e}" + _log.exception(msg) + raise PartitionedJobFailure(msg) from e return { node_id: { "process_id": "load_stac", diff --git a/tests/partitionedjobs/test_api.py b/tests/partitionedjobs/test_api.py index dfc8b941..20807c02 100644 --- a/tests/partitionedjobs/test_api.py +++ b/tests/partitionedjobs/test_api.py @@ -695,7 +695,7 @@ def test_create_job_simple(self, flask_app, api100, zk_db, dummy1): assert pg == {"lc1": {"arguments": {"id": "S2"}, "process_id": "load_collection", "result": True}} @now.mock - def test_create_job_basic(self, flask_app, api100, zk_db, dummy1): + def test_create_job_basic(self, flask_app, api100, zk_db, dummy1, requests_mock): api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN) pg = { @@ -708,6 +708,11 @@ def test_create_job_basic(self, flask_app, api100, zk_db, dummy1): }, } + requests_mock.get( + "https://b1.test/v1/jobs/1-jb-0/results?partial=true", + json={"links": [{"rel": "canonical", "href": "https://data.b1.test/123abc"}]}, + ) + res = api100.post( "/jobs", json={"process": {"process_graph": pg}, "job_options": {"split_strategy": "crossbackend"}}, @@ -765,7 +770,7 @@ def test_create_job_basic(self, flask_app, api100, zk_db, dummy1): "lc1": {"process_id": "load_collection", "arguments": {"id": "S2"}}, "lc2": { "process_id": "load_stac", - "arguments": {"url": "https://b1.test/v1/jobs/1-jb-0/results?partial=true"}, + "arguments": {"url": "https://data.b1.test/123abc"}, }, "merge": { "process_id": "merge_cubes", @@ -790,7 +795,7 @@ def test_create_job_basic(self, flask_app, api100, zk_db, dummy1): "lc1": {"process_id": "load_collection", "arguments": {"id": "S2"}}, "lc2": { "process_id": "load_stac", - "arguments": {"url": "https://b1.test/v1/jobs/1-jb-0/results?partial=true"}, + "arguments": {"url": "https://data.b1.test/123abc"}, }, "merge": { "process_id": "merge_cubes", @@ -818,7 +823,7 @@ def test_create_job_basic(self, flask_app, api100, zk_db, dummy1): } @now.mock - def test_start_and_job_results(self, flask_app, api100, zk_db, dummy1): + def test_start_and_job_results(self, flask_app, api100, zk_db, dummy1, requests_mock): """Run the jobs and get results""" api100.set_auth_bearer_token(token=TEST_USER_BEARER_TOKEN) @@ -832,6 +837,11 @@ def test_start_and_job_results(self, flask_app, api100, zk_db, dummy1): }, } + requests_mock.get( + "https://b1.test/v1/jobs/1-jb-0/results?partial=true", + json={"links": [{"rel": "canonical", "href": "https://data.b1.test/123abc"}]}, + ) + res = api100.post( "/jobs", json={