Skip to content

Commit

Permalink
add all path variations of CWLProv operations (relates to #673)
Browse files Browse the repository at this point in the history
  • Loading branch information
fmigneault committed Dec 11, 2024
1 parent f15af39 commit 34fe955
Show file tree
Hide file tree
Showing 9 changed files with 786 additions and 100 deletions.
11 changes: 11 additions & 0 deletions docs/source/appendix.rst
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,14 @@ Glossary
Entity that describes the required inputs, produced outputs, and any applicable metadata for the execution of
the defined script, calculation, or operation.

Provenance
Metadata using the :term:`W3C` |prov|_ standard that is applied to a submitted :term:`Job` execution to allow
retrieving its origin, the related :term:`Application Package`, its :term:`I/O` sources and results, as well as
additional details about the server host and runtime user as applicable to replicate the experiment.

.. seealso::
:ref:`proc_op_job_prov`

Provider
Entity that offers an ensemble of :term:`Process` under it. It is typically a reference to a remote service,
where any :term:`Process` it provides is fetched dynamically on demand.
Expand Down Expand Up @@ -331,6 +339,9 @@ Glossary
Since |ogc-api-standards|_ are based on HTTP and web communications, this consortium establishes the
common foundation definitions used by the :term:`API` specifications.

.. seealso::
|w3c|_

WKT
Well-Known Text geometry representation.

Expand Down
4 changes: 4 additions & 0 deletions docs/source/references.rst
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@
.. _openeo-api: https://openeo.org/documentation/1.0/developers/api/reference.html
.. |OpenAPI-spec| replace:: OpenAPI Specification
.. _OpenAPI-spec: https://spec.openapis.org/oas/v3.1.0
.. |prov| replace:: PROV
.. _prov: https://www.w3.org/TR/prov-overview/
.. |prov-ontology| replace:: PROV-O: The PROV Ontology
.. _prov-ontology: https://www.w3.org/TR/2013/REC-prov-o-20130430/
.. |pywps| replace:: PyWPS
Expand All @@ -186,6 +188,8 @@
.. _weaver-issues: https://github.com/crim-ca/weaver/issues
.. |submit-issue| replace:: submit a new issue
.. _submit-issue: https://github.com/crim-ca/weaver/issues/new/choose
.. |w3c| replace:: W3C
.. _w3c: https://www.w3.org/

.. STAC
.. |stac-spec| replace:: STAC Specification
Expand Down
228 changes: 228 additions & 0 deletions tests/functional/test_job_provenance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
import contextlib
import itertools
import uuid
from typing import TYPE_CHECKING

import pytest
from parameterized import parameterized

from status import Status
from tests.utils import mocked_execute_celery, mocked_sub_requests, mocked_wps_output
from tests.functional.utils import WpsConfigBase, ResourcesUtil
from weaver.formats import ContentType, OutputFormat

if TYPE_CHECKING:
from typing import Optional


@pytest.mark.oap_part4
@pytest.mark.functional
class TestJobProvenance(WpsConfigBase, ResourcesUtil):
    """
    Tests to evaluate the various endpoints for :term:`Job` :term:`Provenance`.

    A single ``Echo`` process is deployed and executed once for the whole class (see :meth:`setUpClass`).
    Tests on successful retrieval query the provenance endpoints of that shared job, whereas tests about
    missing provenance register a separate job directly in the job store without executing it.
    """
    job_url = None  # type: Optional[str]
    proc_id = None  # type: Optional[str]

    @classmethod
    def setUpClass(cls) -> None:
        """
        Configure the application with provenance enabled, then deploy and execute the shared test job.
        """
        cls.settings = {
            "weaver.cwl_prov": True,
            "weaver.wps": True,
            "weaver.wps_path": "/ows/wps",
            "weaver.wps_restapi_path": "/",
            "weaver.wps_output_path": "/wpsoutputs",
            "weaver.wps_output_url": "http://localhost/wpsoutputs",
            "weaver.wps_output_dir": "/tmp/weaver-test/wps-outputs",  # nosec: B108 # don't care hardcoded for test
        }
        super(TestJobProvenance, cls).setUpClass()
        cls.job_url = cls.setup_test_job()

    @classmethod
    def tearDownClass(cls) -> None:
        """
        Remove the processes and jobs registered by this test class.
        """
        cls.process_store.clear_processes()
        cls.job_store.clear_jobs()
        super(TestJobProvenance, cls).tearDownClass()

    @classmethod
    def setup_test_job(cls):
        # type: () -> Optional[str]
        """
        Deploy the ``Echo`` process, submit one asynchronous execution and wait for it to complete.

        :returns: Value of the ``Location`` header returned by the execution request (the job status URL).
        """
        cls.proc_id = cls.fully_qualified_test_name("Echo")
        cwl = cls.retrieve_payload("Echo", "package", local=True)
        body = {
            "processDescription": {
                "id": cls.proc_id,
            },
            "executionUnit": [{"unit": cwl}],
        }
        cls.deploy_process(body)
        data = {
            "inputs": {"message": "0123456789"},
        }
        with contextlib.ExitStack() as stack_exec:
            for mock_exec in mocked_execute_celery():
                stack_exec.enter_context(mock_exec)
            stack_exec.enter_context(mocked_wps_output(cls.settings))
            proc_url = f"/processes/{cls.proc_id}/execution"
            headers = {"Prefer": "respond-async"}
            headers.update(cls.json_headers)
            resp = mocked_sub_requests(
                cls.app, "post_json", proc_url,
                data=data, headers=headers,
                timeout=5, only_local=True
            )
            assert resp.status_code == 201, resp.text
            status_url = resp.headers.get("location")
            # wait for job completion while the mocks are still active
            cls.monitor_job(status_url, return_status=True)
        return status_url

    @parameterized.expand([
        ({}, {}),  # default is JSON
        ({"f": OutputFormat.JSON}, {}),
        ({}, {"Accept": ContentType.APP_JSON}),
    ])
    def test_job_prov_json(self, queries, headers):
        """
        Validate the JSON representation, requested by default, by ``f`` query or by ``Accept`` header.
        """
        prov_url = f"{self.job_url}/prov"
        resp = self.app.get(prov_url, params=queries, headers=headers)
        assert resp.status_code == 200
        assert resp.content_type == ContentType.APP_JSON
        prov = resp.json
        assert "prefix" in prov
        assert "wfprov" in prov["prefix"]

    @parameterized.expand([
        ({"f": OutputFormat.XML}, {}),
        ({}, {"Accept": ContentType.TEXT_XML}),
        ({}, {"Accept": ContentType.APP_XML}),
    ])
    def test_job_prov_xml(self, queries, headers):
        """
        Validate the XML representation, requested by ``f`` query or by ``Accept`` header.
        """
        prov_url = f"{self.job_url}/prov"
        resp = self.app.get(prov_url, params=queries, headers=headers)
        assert resp.status_code == 200
        assert resp.content_type in ContentType.ANY_XML
        prov = resp.text
        assert "<prov:document xmlns:wfprov" in prov

    def test_job_prov_ttl(self):
        """
        Validate the Turtle representation requested by ``Accept`` header.
        """
        prov_url = f"{self.job_url}/prov"
        resp = self.app.get(prov_url, headers={"Accept": ContentType.TEXT_TURTLE})
        assert resp.status_code == 200
        assert resp.content_type == ContentType.TEXT_TURTLE
        prov = resp.text
        assert "@prefix cwlprov: " in prov

    def test_job_prov_nt(self):
        """
        Validate the N-Triples representation requested by ``Accept`` header.
        """
        prov_url = f"{self.job_url}/prov"
        resp = self.app.get(prov_url, headers={"Accept": ContentType.APP_NT})
        assert resp.status_code == 200
        assert resp.content_type == ContentType.APP_NT
        prov = resp.text
        assert "_:N" in prov
        assert "wfprov" in prov

    def test_job_prov_provn(self):
        """
        Validate the PROV-N representation requested by ``Accept`` header.
        """
        prov_url = f"{self.job_url}/prov"
        resp = self.app.get(prov_url, headers={"Accept": ContentType.TEXT_PROVN})
        assert resp.status_code == 200
        assert resp.content_type == ContentType.TEXT_PROVN
        prov = resp.text
        assert "prov:type='wfprov:WorkflowEngine'" in prov

    def test_job_prov_info_text(self):
        """
        Validate that the plain text ``info`` contents refer to the :term:`Job` identifier as the run ID.
        """
        prov_url = f"{self.job_url}/prov/info"
        job_id = self.job_url.rsplit("/", 1)[-1]
        resp = self.app.get(prov_url, headers={"Accept": ContentType.TEXT_PLAIN})
        assert resp.status_code == 200
        assert resp.content_type == ContentType.TEXT_PLAIN
        prov = resp.text
        assert f"Workflow run ID: urn:uuid:{job_id}" in prov

    def test_job_prov_info_not_acceptable(self):
        """
        Validate that requesting ``info`` with an unsupported media-type is refused with a JSON error.
        """
        job = self.job_store.save_job(
            "test",
            process=self.proc_id,
            status=Status.SUCCEEDED
        )
        prov_url = job.prov_url(self.settings)
        headers = self.json_headers  # note: this is the test, while only plain text is supported
        resp = self.app.get(f"{prov_url}/info", headers=headers, expect_errors=True)
        assert resp.status_code == 406
        assert resp.content_type == ContentType.APP_JSON, (
            "error should be in JSON regardless of Accept header or the normal contents media-type"
        )

    @parameterized.expand(
        itertools.product(
            ["processes", "jobs"],
            ["info", "who", "inputs", "outputs", "run"],
        )
    )
    def test_job_prov_commands(self, path, cmd):
        """
        Validate every :term:`Provenance` command, using both the process-prefixed and the direct job paths.
        """
        job_id = self.job_url.rsplit("/", 1)[-1]
        # only the 'processes' variation is prefixed by the process reference
        proc_url = f"/{path}/{self.proc_id}" if path == "processes" else ""
        prov_url = f"{proc_url}/jobs/{job_id}/prov/{cmd}"
        resp = self.app.get(prov_url, headers={"Accept": ContentType.TEXT_PLAIN})
        assert resp.status_code == 200
        assert resp.content_type == ContentType.TEXT_PLAIN
        assert resp.text != ""

    @parameterized.expand(
        ["inputs", "outputs", "run"]
    )
    def test_job_prov_run_id(self, path):
        """
        Validate retrieval of :term:`Provenance` nested ``runID``.

        .. note::
            In this case, the ``runID`` is somewhat redundant to the ``jobID`` that is applied identically for
            the "main" :term:`Process` at the root of the :term:`Job`, since only an atomic operation is executed.
            In the case of a :term:`Workflow` however, each step could be retrieved respectively by their ``runID``.
        """
        job_id = self.job_url.rsplit("/", 1)[-1]
        prov_url = f"{self.job_url}/prov/{path}/{job_id}"
        resp = self.app.get(prov_url, headers={"Accept": ContentType.TEXT_PLAIN})
        assert resp.status_code == 200
        assert resp.content_type == ContentType.TEXT_PLAIN
        assert resp.text != ""

    def test_job_prov_run_id_invalid(self):
        """
        Validate that an unknown ``runID`` is reported by a JSON error rather than a plain text response.
        """
        run_id = str(uuid.uuid4())
        prov_url = f"{self.job_url}/prov/run/{run_id}"
        resp = self.app.get(prov_url, headers={"Accept": ContentType.TEXT_PLAIN}, expect_errors=True)
        assert resp.status_code == 404
        assert resp.content_type == ContentType.APP_JSON, (
            "Custom JSON error contents are expected to be returned. "
            "If plain text is returned (as requested by Accept header), "
            "this most probably means an error is raised and caught by "
            "pyramid's \"not found view\" utility instead of our \"not found run\" error"
        )
        assert resp.json["error"] == "No such run ID for specified job provenance."
        assert resp.json["value"] == {"run_id": run_id}

    def test_job_prov_data_generated_missing(self):
        """
        Test that data directly obtained from pre-generated files is handled when no :term:`Provenance` exists.
        """
        job = self.job_store.save_job(
            "test",
            process=self.proc_id,
            status=Status.SUCCEEDED
        )
        prov_url = job.prov_url(self.settings)
        resp = self.app.get(prov_url, headers=self.json_headers, expect_errors=True)
        assert resp.status_code == 410
        assert resp.content_type == ContentType.APP_JSON
        assert resp.json["detail"] == "Job provenance could not be retrieved for the specified job."

    def test_job_prov_data_dynamic_missing(self):
        """
        Test that data generated dynamically by invoking :mod:`cwlprov` is handled when no :term:`Provenance` exists.
        """
        job = self.job_store.save_job(
            "test",
            process=self.proc_id,
            status=Status.SUCCEEDED
        )
        prov_url = job.prov_url(self.settings)
        headers = {"Accept": ContentType.TEXT_PLAIN}
        resp = self.app.get(f"{prov_url}/info", headers=headers, expect_errors=True)
        assert resp.status_code == 410
        assert resp.content_type == ContentType.APP_JSON
        assert resp.json["detail"] == "Job provenance could not be retrieved for the specified job."
33 changes: 21 additions & 12 deletions tests/functional/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
setup_mongodb_servicestore
)
from weaver import WEAVER_ROOT_DIR
from weaver.base import classinstancemethod
from weaver.database import get_db
from weaver.datatype import Job
from weaver.formats import ContentType
Expand Down Expand Up @@ -64,6 +65,7 @@


class GenericUtils(unittest.TestCase):
@classinstancemethod
def fully_qualified_test_name(self, name=""):
"""
Generates a unique name using the current test method full context name and the provided name, if any.
Expand All @@ -72,7 +74,10 @@ def fully_qualified_test_name(self, name=""):
"""
extra_name = f"-{name}" if name else ""
class_name = fully_qualified_name(self)
test_name = f"{class_name}.{self._testMethodName}{extra_name}"
if hasattr(self, "_testMethodName"):
test_name = f"{class_name}.{self._testMethodName}{extra_name}"
else:
test_name = f"{class_name}{extra_name}" # called from class method
test_name = test_name.replace(".", "-").replace("-_", "_").replace("_-", "-")
return test_name

Expand Down Expand Up @@ -449,24 +454,28 @@ def deploy_process(cls,
info.append(deepcopy(resp.json))
return info # type: ignore

def _try_get_logs(self, status_url):
_resp = self.app.get(f"{status_url}/logs", headers=dict(self.json_headers))
@classmethod
def _try_get_logs(cls, status_url):
_resp = cls.app.get(f"{status_url}/logs", headers=dict(cls.json_headers))
if _resp.status_code == 200:
_text = "\n".join(_resp.json)
return f"Error logs:\n{_text}"
return ""

@overload
def monitor_job(self, status_url, **__):
@classmethod
def monitor_job(cls, status_url, **__):
# type: (str, **Any) -> ExecutionResults
...

@overload
def monitor_job(self, status_url, return_status=False, **__):
@classmethod
def monitor_job(cls, status_url, return_status=False, **__):
# type: (str, Literal[True], **Any) -> JobStatusResponse
...

def monitor_job(self,
@classmethod
def monitor_job(cls,
status_url, # type: str
timeout=None, # type: Optional[int]
interval=None, # type: Optional[int]
Expand Down Expand Up @@ -501,17 +510,17 @@ def check_job_status(_resp, running=False):
body = _resp.json
pretty = json.dumps(body, indent=2, ensure_ascii=False)
statuses = [Status.ACCEPTED, Status.RUNNING, final_status] if running else [final_status]
assert _resp.status_code == 200, f"Execution failed:\n{pretty}\n{self._try_get_logs(status_url)}"
assert body["status"] in statuses, f"Error job info:\n{pretty}\n{self._try_get_logs(status_url)}"
assert _resp.status_code == 200, f"Execution failed:\n{pretty}\n{cls._try_get_logs(status_url)}"
assert body["status"] in statuses, f"Error job info:\n{pretty}\n{cls._try_get_logs(status_url)}"
return body["status"] in {final_status, Status.SUCCEEDED, Status.FAILED} # break condition

time.sleep(1) # small delay to ensure process execution had a chance to start before monitoring
left = timeout or self.monitor_timeout
delta = interval or self.monitor_interval
left = timeout or cls.monitor_timeout
delta = interval or cls.monitor_interval
once = True
resp = None
while left >= 0 or once:
resp = self.app.get(status_url, headers=self.json_headers)
resp = cls.app.get(status_url, headers=cls.json_headers)
if check_job_status(resp, running=True):
break
time.sleep(delta)
Expand All @@ -521,7 +530,7 @@ def check_job_status(_resp, running=False):
if return_status or expect_failed:
return resp.json
params = {"schema": JobInputsOutputsSchema.OGC} # not strict to preserve old 'format' field
resp = self.app.get(f"{status_url}/results", params=params, headers=self.json_headers)
resp = cls.app.get(f"{status_url}/results", params=params, headers=cls.json_headers)
assert resp.status_code == 200, f"Error job info:\n{resp.text}"
return resp.json

Expand Down
27 changes: 27 additions & 0 deletions weaver/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,33 @@ def __get__(self, cls, owner): # noqa
return classmethod(self.fget).__get__(None, owner)()


class classinstancemethod(property):  # pylint: disable=C0103,invalid-name
    """
    Combines :class:`classmethod` decorator and instance method behavior to work with either reference simultaneously.

    The decorated function is always invoked as ``method(owner, instance, *args, **kwargs)``, where ``owner`` is
    the class through which the attribute was accessed. When the attribute is accessed from the class rather than
    from an instance, the first positional argument of the call is consumed as the ``instance``.

    .. seealso::
        https://stackoverflow.com/a/48809254/5936364
    """
    def __init__(self, method, instance=None, owner=None):  # noqa
        # type: (Callable[[Type[Any], Any, ...], Any], Any, Any) -> None
        """
        Wrap the function, optionally bound to the instance and owner class it was accessed from.

        :param method: Function to invoke with the owner class and the instance as first two arguments.
        :param instance: Object the descriptor was accessed from, or ``None`` when accessed from the class.
        :param owner: Class the descriptor was accessed through, if known.
        """
        self.method = method
        self.instance = instance
        self.owner = owner
        # expose the documentation of the decorated function rather than the one of this descriptor class
        self.__doc__ = getattr(method, "__doc__", None)

    def __get__(self, instance, owner=None):
        """
        Return a new descriptor of the same type that remembers the instance and owner used for the access.
        """
        return type(self)(self.method, instance, owner)

    def __call__(self, *args, **kwargs):
        """
        Invoke the wrapped function with the owner class followed by the instance and the call arguments.

        :raises TypeError: When accessed without an instance and no positional argument can replace it.
        """
        instance = self.instance
        if instance is None:
            if not args:
                raise TypeError('missing required parameter "self"')
            # accessed from the class: the first positional argument takes the place of the instance
            instance, args = args[0], args[1:]

        cls = self.owner
        if cls is None:
            # the descriptor protocol allows '__get__(instance)' without owner; resolve it from the instance
            cls = type(instance)
        return self.method(cls, instance, *args, **kwargs)


class _EnumMeta(enum.EnumMeta):
def __contains__(cls, member):
"""
Expand Down
Loading

0 comments on commit 34fe955

Please sign in to comment.