From 10e4553690ea5ef18f02e74def1c6a3e60f2d30d Mon Sep 17 00:00:00 2001
From: Stefaan Lippens
Date: Tue, 18 Jan 2022 17:56:34 +0100
Subject: [PATCH] BatchJobMetadata: better separation of API related fields and internal fields

BatchJobMetadata was originally closely tied to the API definition of batch
job metadata. Over time, various internal/non-standard fields have been
added, and the unclear separation between API fields and internal fields
started causing issues. BatchJobMetadata now has a clearer and more explicit
interface to convert from/to the API subset of metadata.

related: Open-EO/openeo-aggregator#31
---
 openeo_driver/_version.py |  2 +-
 openeo_driver/backend.py  | 91 ++++++++++++++++++++++++++-------------
 openeo_driver/utils.py    | 22 +++++++++-
 openeo_driver/views.py    | 30 +++----------
 tests/test_backend.py     | 86 +++++++++++++++++++++++++++++++++---
 5 files changed, 169 insertions(+), 62 deletions(-)

diff --git a/openeo_driver/_version.py b/openeo_driver/_version.py
index 799da1b1..b06608fe 100644
--- a/openeo_driver/_version.py
+++ b/openeo_driver/_version.py
@@ -1 +1 @@
-__version__ = '0.14.13a1'
+__version__ = '0.15.1a1'
diff --git a/openeo_driver/backend.py b/openeo_driver/backend.py
index 6b33ad09..1a39e70d 100644
--- a/openeo_driver/backend.py
+++ b/openeo_driver/backend.py
@@ -20,7 +20,7 @@
 
 from openeo.capabilities import ComparableVersion
 from openeo.internal.process_graph_visitor import ProcessGraphVisitor
-from openeo.util import rfc3339
+from openeo.util import rfc3339, dict_no_none
 from openeo_driver.datacube import DriverDataCube
 from openeo_driver.datastructs import SarBackscatterArgs
 from openeo_driver.dry_run import SourceConstraint
@@ -239,6 +239,7 @@ class BatchJobMetadata(NamedTuple):
     budget = None
     started: datetime = None
     finished: datetime = None
+    duration_: timedelta = None
     memory_time_megabyte: timedelta = None
     cpu_time: timedelta = None
     geometry: dict = None
@@ -250,39 +251,71 @@ class BatchJobMetadata(NamedTuple):
     links: List[Dict] = None
 
     @property
-    def duration(self) -> timedelta:
+    def duration(self) -> Union[timedelta, None]:
         """Returns the job duration if possible, else None."""
-        return self.finished - self.started if self.started and self.finished else None
+        if self.duration_:
+            return self.duration_
+        elif self.finished and self.started:
+            return self.finished - self.started
 
     @classmethod
-    def from_dict(cls, d: dict) -> 'BatchJobMetadata':
-        d = extract_namedtuple_fields_from_dict(d, BatchJobMetadata)
-        created = d.get("created")
-        if isinstance(created, str):
-            d["created"] = rfc3339.parse_datetime(created)
-        return cls(**d)
-
-    def prepare_for_json(self) -> dict:
-        """Prepare metadata for JSON serialization"""
-        d = self._asdict()  # pylint: disable=no-member
-        d["created"] = rfc3339.datetime(self.created) if self.created else None
-        d["updated"] = rfc3339.datetime(self.updated) if self.updated else None
-
-        usage = {}
-
-        if self.cpu_time:
-            usage["cpu"] = {"value": int(round(self.cpu_time.total_seconds())), "unit": "cpu-seconds"}
-
-        if self.duration:
-            usage["duration"] = {"value": int(round(self.duration.total_seconds())), "unit": "seconds"}
-
-        if self.memory_time_megabyte:
-            usage["memory"] = {"value": int(round(self.memory_time_megabyte.total_seconds())), "unit": "mb-seconds"}
+    def from_api_dict(cls, d: dict) -> 'BatchJobMetadata':
+        """Populate from an openEO API compatible dictionary."""
+        kwargs = extract_namedtuple_fields_from_dict(
+            d, BatchJobMetadata, convert_datetime=True, convert_timedelta=True
+        )
+        usage = d.get("usage")
 
         if usage:
-            d["usage"] = usage
-
-        return d
+            if usage.get("cpu"):
+                # TODO: support other units too
+                assert usage["cpu"]["unit"] == "cpu-seconds"
+                kwargs["cpu_time"] = timedelta(seconds=usage["cpu"]["value"])
+            if usage.get("memory"):
+                assert usage["memory"]["unit"] == "mb-seconds"
+                kwargs["memory_time_megabyte"] = timedelta(seconds=usage["memory"]["value"])
+            if usage.get("duration"):
+                assert usage["duration"]["unit"] == "seconds"
+                kwargs["duration_"] = timedelta(seconds=usage["duration"]["value"])
+
+        return cls(**kwargs)
+
+    def to_api_dict(self, full=True, api_version: ComparableVersion = None) -> dict:
+        """
+        API-version-aware conversion of batch job metadata to jsonable openEO API compatible dict.
+        see https://openeo.org/documentation/1.0/developers/api/reference.html#operation/describe-job
+        """
+        # Basic/full fields to export
+        fields = ["id", "status", "created"]
+        if full:
+            fields.extend([
+                "title", "description", "process", "progress", "created", "updated", "plan", "costs", "budget",
+            ])
+        result = {f: getattr(self, f) for f in fields}
+
+        # Additional cleaning and massaging.
+        result["created"] = rfc3339.datetime(self.created) if self.created else None
+        result["updated"] = rfc3339.datetime(self.updated) if self.updated else None
+
+        if full:
+            usage = {}
+            if self.cpu_time:
+                usage["cpu"] = {"value": int(round(self.cpu_time.total_seconds())), "unit": "cpu-seconds"}
+            if self.duration:
+                usage["duration"] = {"value": int(round(self.duration.total_seconds())), "unit": "seconds"}
+            if self.memory_time_megabyte:
+                usage["memory"] = {"value": int(round(self.memory_time_megabyte.total_seconds())), "unit": "mb-seconds"}
+            if usage:
+                result["usage"] = usage
+
+        if api_version and api_version.below("1.0.0"):
+            result["process_graph"] = result.pop("process", {}).get("process_graph")
+            result["submitted"] = result.pop("created", None)
+            # TODO wider status checking coverage?
+            if result["status"] == "created":
+                result["status"] = "submitted"
+
+        return dict_no_none(result)
 
 
 class BatchJobs(MicroService):
diff --git a/openeo_driver/utils.py b/openeo_driver/utils.py
index df212008..8387cabd 100644
--- a/openeo_driver/utils.py
+++ b/openeo_driver/utils.py
@@ -1,6 +1,8 @@
 """
 Small general utilities and helper functions
 """
+import datetime
+
 import json
 import time
 import typing
@@ -13,6 +15,8 @@
 import shapely.geometry
 import shapely.ops
 
+from openeo.util import rfc3339
+
 
 class EvalEnv:
     """
@@ -309,7 +313,10 @@ def __set__(self, instance, value):
         instance[self.key] = value
 
 
-def extract_namedtuple_fields_from_dict(d: dict, named_tuple_class: typing.Type[typing.NamedTuple]) -> dict:
+def extract_namedtuple_fields_from_dict(
+        d: dict, named_tuple_class: typing.Type[typing.NamedTuple],
+        convert_datetime: bool = False, convert_timedelta: bool = False,
+) -> dict:
     """
     Extract `typing.NamedTuple` fields from given dictionary,
     silently skipping items not defined as field.
@@ -329,6 +336,19 @@ def extract_namedtuple_fields_from_dict(
             f"Missing {named_tuple_class.__name__} field{'s' if len(missing) > 1 else ''}: {', '.join(sorted(missing))}."
         )
 
+    # Additional auto-conversions (by type annotation)
+    converters = {}
+    if convert_datetime:
+        converters[datetime.datetime] = lambda v: rfc3339.parse_datetime(v)
+    if convert_timedelta:
+        converters[datetime.timedelta] = lambda v: datetime.timedelta(seconds=v)
+
+    if converters:
+        for k in result:
+            converter = converters.get(named_tuple_class.__annotations__.get(k))
+            if converter:
+                result[k] = converter(result[k])
+
     return result
 
 
diff --git a/openeo_driver/views.py b/openeo_driver/views.py
index f78cd743..04dce043 100644
--- a/openeo_driver/views.py
+++ b/openeo_driver/views.py
@@ -700,27 +700,6 @@ def processes_details(namespace, process_id):
     return jsonify(process)
 
 
-def _jsonable_batch_job_metadata(metadata: BatchJobMetadata, full=True) -> dict:
-    """API-version-aware conversion of batch job metadata to jsonable dict"""
-    d = metadata.prepare_for_json()
-    # Fields to export
-    fields = ['id', 'title', 'description', 'status', 'created', 'updated', 'plan', 'costs', 'budget']
-    if full:
-        fields.extend([
-            'process', 'progress', 'usage'
-        ])
-    d = {k: v for (k, v) in d.items() if k in fields}
-
-    if requested_api_version().below("1.0.0"):
-        d["process_graph"] = d.pop("process", {}).get("process_graph")
-        d["submitted"] = d.pop("created", None)
-        # TODO wider status checking coverage?
-        if d["status"] == "created":
-            d["status"] = "submitted"
-
-    return dict_no_none(**d)
-
-
 def _properties_from_job_info(job_info: BatchJobMetadata) -> dict:
     to_datetime = Rfc3339(propagate_none=True).datetime
 
@@ -806,7 +785,10 @@
         raise InternalException(f"Invalid user jobs listing {type(listing)}")
 
     resp = dict(
-        jobs=[_jsonable_batch_job_metadata(m, full=False) for m in jobs],
+        jobs=[
+            m.to_api_dict(full=False, api_version=requested_api_version())
+            for m in jobs
+        ],
         links=links,
         **extra
     )
@@ -816,8 +798,8 @@
 @blueprint.route('/jobs/<job_id>', methods=['GET'])
 @auth_handler.requires_bearer_auth
 def get_job_info(job_id, user: User):
-    job_info = backend_implementation.batch_jobs.get_job_info(job_id, user)
-    return jsonify(_jsonable_batch_job_metadata(job_info))
+    job_info: BatchJobMetadata = backend_implementation.batch_jobs.get_job_info(job_id, user)
+    return jsonify(job_info.to_api_dict(full=True, api_version=requested_api_version()))
 
 @api_endpoint()
 @blueprint.route('/jobs/<job_id>', methods=['DELETE'])
diff --git a/tests/test_backend.py b/tests/test_backend.py
index 27db14ee..bb17ddac 100644
--- a/tests/test_backend.py
+++ b/tests/test_backend.py
@@ -2,6 +2,7 @@
 
 import pytest
 
+from openeo.capabilities import ComparableVersion
 from openeo_driver.backend import CollectionCatalog, LoadParameters, UserDefinedProcessMetadata, ServiceMetadata, \
     BatchJobMetadata
 from openeo_driver.errors import CollectionNotFoundException
@@ -112,17 +113,88 @@ def test_service_metadata_from_dict_created_date():
     assert service.created == datetime.datetime(2020, 5, 18, 12, 34, 56)
 
 
-def test_batch_job_metadata_from_dict_emtpy():
+def test_batch_job_metadata_from_api_dict_empty():
     with pytest.raises(KeyError, match="Missing BatchJobMetadata fields: created, id, status"):
-        _ = BatchJobMetadata.from_dict({})
+        _ = BatchJobMetadata.from_api_dict({})
 
 
-def test_batch_job_metadata_from_dict_basic():
-    job = BatchJobMetadata.from_dict({
-        "id": "ba7c470b", "created": "2021-06-18T12:34:56Z",
-        "process": {"id": "ndvi", "process_graph": {}}, "status": "running"
+def test_batch_job_metadata_from_api_dict_basic():
+    job = BatchJobMetadata.from_api_dict({
+        "id": "ba7c470b", "created": "2021-06-18T12:34:56Z", "status": "running",
     })
     assert job.id == "ba7c470b"
     assert job.created == datetime.datetime(2021, 6, 18, 12, 34, 56)
-    assert job.process == {"id": "ndvi", "process_graph": {}}
     assert job.status == "running"
+
+    # Full round trip check
+    assert job == BatchJobMetadata.from_api_dict(job.to_api_dict())
+
+
+def test_batch_job_metadata_from_api_dict_auto_conversions():
+    job = BatchJobMetadata.from_api_dict({
+        "id": "ba7c470b",
+        "status": "running",
+        "created": "2021-06-18T12:34:56Z",
+        "updated": "2021-06-20T20:20:20Z",
+    })
+    assert job.created == datetime.datetime(2021, 6, 18, 12, 34, 56)
+    assert job.updated == datetime.datetime(2021, 6, 20, 20, 20, 20)
+
+    # Full round trip check
+    assert job == BatchJobMetadata.from_api_dict(job.to_api_dict())
+
+
+def test_batch_job_metadata_from_api_dict_usage():
+    job = BatchJobMetadata.from_api_dict({
+        "id": "ba7c470b", "created": "2021-06-18T12:34:56Z", "status": "running",
+        "usage": {
+            "cpu": {"value": 1000, "unit": "cpu-seconds"},
+            "memory": {"value": 2000, "unit": "mb-seconds"},
+            "duration": {"value": 3000, "unit": "seconds"},
+        }
+    })
+    assert job.id == "ba7c470b"
+    assert job.created == datetime.datetime(2021, 6, 18, 12, 34, 56)
+    assert job.status == "running"
+    assert job.cpu_time == datetime.timedelta(seconds=1000)
+    assert job.memory_time_megabyte == datetime.timedelta(seconds=2000)
+    assert job.duration == datetime.timedelta(seconds=3000)
+    assert job.duration_ == datetime.timedelta(seconds=3000)
+
+    # Full round trip check
+    assert job == BatchJobMetadata.from_api_dict(job.to_api_dict())
+
+
+def test_batch_job_metadata_to_api_dict():
+    api_version = ComparableVersion("1.0.0")
+    job = BatchJobMetadata(
+        id="123", status="running", created=datetime.datetime(2022, 1, 18, 16, 42, 0),
+        process={"add": {"process_id": "add", "arguments": {"x": 3, "y": 5}, "result": True}},
+        title="Untitled01", description="Lorem ipsum.",
+        progress=0.3,
+        cpu_time=datetime.timedelta(seconds=1000),
+        memory_time_megabyte=datetime.timedelta(seconds=2000),
+        started=datetime.datetime(2022, 1, 18, 17, 0, 0),
+        finished=datetime.datetime(2022, 1, 18, 17, 20, 0),
+        epsg=4326,
+        links=[{}],
+    )
+
+    assert job.to_api_dict(full=False, api_version=api_version) == {
+        "id": "123",
+        "created": "2022-01-18T16:42:00Z",
+        "status": "running",
+    }
+    assert job.to_api_dict(full=True, api_version=api_version) == {
+        "id": "123",
+        "created": "2022-01-18T16:42:00Z",
+        "status": "running",
+        "process": {"add": {"process_id": "add", "arguments": {"x": 3, "y": 5}, "result": True}},
+        "title": "Untitled01", "description": "Lorem ipsum.",
+        "progress": 0.3,
+        "usage": {
+            "cpu": {"value": 1000, "unit": "cpu-seconds"},
+            "memory": {"value": 2000, "unit": "mb-seconds"},
+            "duration": {"value": 1200, "unit": "seconds"},
+        }
+    }