Skip to content

Commit

Permalink
BatchJobMetadata: better separation of API related fields and interna…
Browse files Browse the repository at this point in the history
…l fields

BatchJobMetadata was originally closely tied to the API definition of batch metadata.
Over time, various internal/non-standard fields were added,
and the unclear separation between API fields and internal fields started causing issues.
BatchJobMetadata now has a clearer and more explicit interface to convert from/to the API subset of metadata.

related: Open-EO/openeo-aggregator#31
  • Loading branch information
soxofaan committed Jan 18, 2022
1 parent baf7b0e commit 10e4553
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 62 deletions.
2 changes: 1 addition & 1 deletion openeo_driver/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.14.13a1'
__version__ = '0.15.1a1'
91 changes: 62 additions & 29 deletions openeo_driver/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from openeo.capabilities import ComparableVersion
from openeo.internal.process_graph_visitor import ProcessGraphVisitor
from openeo.util import rfc3339
from openeo.util import rfc3339, dict_no_none
from openeo_driver.datacube import DriverDataCube
from openeo_driver.datastructs import SarBackscatterArgs
from openeo_driver.dry_run import SourceConstraint
Expand Down Expand Up @@ -239,6 +239,7 @@ class BatchJobMetadata(NamedTuple):
budget = None
started: datetime = None
finished: datetime = None
duration_: timedelta = None
memory_time_megabyte: timedelta = None
cpu_time: timedelta = None
geometry: dict = None
Expand All @@ -250,39 +251,71 @@ class BatchJobMetadata(NamedTuple):
links: List[Dict] = None

@property
def duration(self) -> timedelta:
def duration(self) -> Union[timedelta, None]:
"""Returns the job duration if possible, else None."""
return self.finished - self.started if self.started and self.finished else None
if self.duration_:
return self.duration_
elif self.finished and self.started:
return self.finished - self.started

@classmethod
def from_dict(cls, d: dict) -> 'BatchJobMetadata':
d = extract_namedtuple_fields_from_dict(d, BatchJobMetadata)
created = d.get("created")
if isinstance(created, str):
d["created"] = rfc3339.parse_datetime(created)
return cls(**d)

def prepare_for_json(self) -> dict:
"""Prepare metadata for JSON serialization"""
d = self._asdict() # pylint: disable=no-member
d["created"] = rfc3339.datetime(self.created) if self.created else None
d["updated"] = rfc3339.datetime(self.updated) if self.updated else None

usage = {}

if self.cpu_time:
usage["cpu"] = {"value": int(round(self.cpu_time.total_seconds())), "unit": "cpu-seconds"}

if self.duration:
usage["duration"] = {"value": int(round(self.duration.total_seconds())), "unit": "seconds"}

if self.memory_time_megabyte:
usage["memory"] = {"value": int(round(self.memory_time_megabyte.total_seconds())), "unit": "mb-seconds"}
def from_api_dict(cls, d: dict) -> 'BatchJobMetadata':
"""Populate from an openEO API compatible dictionary."""
kwargs = extract_namedtuple_fields_from_dict(
d, BatchJobMetadata, convert_datetime=True, convert_timedelta=True
)

usage = d.get("usage")
if usage:
d["usage"] = usage

return d
if usage.get("cpu"):
# TODO: support other units too
assert usage["cpu"]["unit"] == "cpu-seconds"
kwargs["cpu_time"] = timedelta(seconds=usage["cpu"]["value"])
if usage.get("memory"):
assert usage["memory"]["unit"] == "mb-seconds"
kwargs["memory_time_megabyte"] = timedelta(seconds=usage["memory"]["value"])
if usage.get("duration"):
assert usage["duration"]["unit"] == "seconds"
kwargs["duration_"] = timedelta(seconds=usage["duration"]["value"])

return cls(**kwargs)

def to_api_dict(self, full=True, api_version: ComparableVersion = None) -> dict:
    """
    API-version-aware conversion of batch job metadata to jsonable openEO API compatible dict.
    see https://openeo.org/documentation/1.0/developers/api/reference.html#operation/describe-job

    :param full: when true, also export the extended fields
        (title, description, process, progress, plan, costs, budget)
        and a "usage" section (cpu, duration, memory) for the values that are set.
    :param api_version: API version to target. For versions below 1.0.0 the legacy keys
        "process_graph" and "submitted" are used and status "created" becomes "submitted".
        No version specific adaptation is done when not given.
    :return: dictionary of the exported fields, with `None` valued items left out.
    """
    # Basic/full fields to export
    fields = ["id", "status", "created"]
    if full:
        fields.extend([
            "title", "description", "process", "progress", "updated", "plan", "costs", "budget",
        ])
    result = {f: getattr(self, f) for f in fields}

    # Additional cleaning and massaging.
    # Note: "updated" is exported whenever it is set, also in the basic (non-full) listing.
    result["created"] = rfc3339.datetime(self.created) if self.created else None
    result["updated"] = rfc3339.datetime(self.updated) if self.updated else None

    if full:
        usage = {}
        if self.cpu_time:
            usage["cpu"] = {"value": int(round(self.cpu_time.total_seconds())), "unit": "cpu-seconds"}
        if self.duration:
            usage["duration"] = {"value": int(round(self.duration.total_seconds())), "unit": "seconds"}
        if self.memory_time_megabyte:
            usage["memory"] = {"value": int(round(self.memory_time_megabyte.total_seconds())), "unit": "mb-seconds"}
        if usage:
            result["usage"] = usage

    if api_version and api_version.below("1.0.0"):
        # "process" can be present with value None (full export of a job without process):
        # a plain `pop("process", {})` would return that None and break the `.get()` call.
        process = result.pop("process", None) or {}
        result["process_graph"] = process.get("process_graph")
        result["submitted"] = result.pop("created", None)
        # TODO wider status checking coverage?
        if result["status"] == "created":
            result["status"] = "submitted"

    return dict_no_none(result)


class BatchJobs(MicroService):
Expand Down
22 changes: 21 additions & 1 deletion openeo_driver/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""
Small general utilities and helper functions
"""
import datetime

import json
import time
import typing
Expand All @@ -13,6 +15,8 @@
import shapely.geometry
import shapely.ops

from openeo.util import rfc3339


class EvalEnv:
"""
Expand Down Expand Up @@ -309,7 +313,10 @@ def __set__(self, instance, value):
instance[self.key] = value


def extract_namedtuple_fields_from_dict(d: dict, named_tuple_class: typing.Type[typing.NamedTuple]) -> dict:
def extract_namedtuple_fields_from_dict(
d: dict, named_tuple_class: typing.Type[typing.NamedTuple],
convert_datetime: bool = False, convert_timedelta: bool = False,
) -> dict:
"""
Extract `typing.NamedTuple` fields from given dictionary,
silently skipping items not defined as field.
Expand All @@ -329,6 +336,19 @@ def extract_namedtuple_fields_from_dict(d: dict, named_tuple_class: typing.Type[
f"Missing {named_tuple_class.__name__} field{'s' if len(missing) > 1 else ''}: {', '.join(sorted(missing))}."
)

# Additional auto-conversions (by type annotation)
converters = {}
if convert_datetime:
converters[datetime.datetime] = lambda v: rfc3339.parse_datetime(v)
if convert_timedelta:
converters[datetime.timedelta] = lambda v: datetime.timedelta(seconds=v)

if converters:
for k in result:
converter = converters.get(named_tuple_class.__annotations__.get(k))
if converter:
result[k] = converter(result[k])

return result


Expand Down
30 changes: 6 additions & 24 deletions openeo_driver/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -700,27 +700,6 @@ def processes_details(namespace, process_id):
return jsonify(process)


def _jsonable_batch_job_metadata(metadata: BatchJobMetadata, full=True) -> dict:
    """
    API-version-aware conversion of batch job metadata to jsonable dict

    :param metadata: batch job metadata to export
    :param full: when true, also export the 'process', 'progress' and 'usage' fields
    :return: dictionary of the exported fields (passed through `dict_no_none`)
    """
    d = metadata.prepare_for_json()
    # Fields to export
    fields = ['id', 'title', 'description', 'status', 'created', 'updated', 'plan', 'costs', 'budget']
    if full:
        fields.extend([
            'process', 'progress', 'usage'
        ])
    d = {k: v for (k, v) in d.items() if k in fields}

    if requested_api_version().below("1.0.0"):
        # 'process' can be present with value None:
        # a plain `pop("process", {})` would return that None and break the `.get()` call.
        process = d.pop("process", None) or {}
        d["process_graph"] = process.get("process_graph")
        d["submitted"] = d.pop("created", None)
        # TODO wider status checking coverage?
        if d["status"] == "created":
            d["status"] = "submitted"

    return dict_no_none(**d)


def _properties_from_job_info(job_info: BatchJobMetadata) -> dict:
to_datetime = Rfc3339(propagate_none=True).datetime

Expand Down Expand Up @@ -806,7 +785,10 @@ def list_jobs(user: User):
raise InternalException(f"Invalid user jobs listing {type(listing)}")

resp = dict(
jobs=[_jsonable_batch_job_metadata(m, full=False) for m in jobs],
jobs=[
m.to_api_dict(full=False, api_version=requested_api_version())
for m in jobs
],
links=links,
**extra
)
Expand All @@ -816,8 +798,8 @@ def list_jobs(user: User):
@blueprint.route('/jobs/<job_id>', methods=['GET'])
@auth_handler.requires_bearer_auth
def get_job_info(job_id, user: User):
job_info = backend_implementation.batch_jobs.get_job_info(job_id, user)
return jsonify(_jsonable_batch_job_metadata(job_info))
job_info: BatchJobMetadata = backend_implementation.batch_jobs.get_job_info(job_id, user)
return jsonify(job_info.to_api_dict(full=True, api_version=requested_api_version()))

@api_endpoint()
@blueprint.route('/jobs/<job_id>', methods=['DELETE'])
Expand Down
86 changes: 79 additions & 7 deletions tests/test_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import pytest

from openeo.capabilities import ComparableVersion
from openeo_driver.backend import CollectionCatalog, LoadParameters, UserDefinedProcessMetadata, ServiceMetadata, \
BatchJobMetadata
from openeo_driver.errors import CollectionNotFoundException
Expand Down Expand Up @@ -112,17 +113,88 @@ def test_service_metadata_from_dict_created_date():
assert service.created == datetime.datetime(2020, 5, 18, 12, 34, 56)


def test_batch_job_metadata_from_dict_emtpy():
def test_batch_job_metadata_from_api_dict_emtpy():
with pytest.raises(KeyError, match="Missing BatchJobMetadata fields: created, id, status"):
_ = BatchJobMetadata.from_dict({})
_ = BatchJobMetadata.from_api_dict({})


def test_batch_job_metadata_from_dict_basic():
job = BatchJobMetadata.from_dict({
"id": "ba7c470b", "created": "2021-06-18T12:34:56Z",
"process": {"id": "ndvi", "process_graph": {}}, "status": "running"
def test_batch_job_metadata_from_api_dict_basic():
job = BatchJobMetadata.from_api_dict({
"id": "ba7c470b", "created": "2021-06-18T12:34:56Z", "status": "running",
})
assert job.id == "ba7c470b"
assert job.created == datetime.datetime(2021, 6, 18, 12, 34, 56)
assert job.process == {"id": "ndvi", "process_graph": {}}
assert job.status == "running"

# Full round trip check
assert job == BatchJobMetadata.from_api_dict(job.to_api_dict())


def test_batch_job_metadata_from_api_dict_auto_conversions():
    """The "created" and "updated" date strings are converted to `datetime` objects."""
    api_dict = {
        "id": "ba7c470b",
        "status": "running",
        "created": "2021-06-18T12:34:56Z",
        "updated": "2021-06-20T20:20:20Z",
    }
    metadata = BatchJobMetadata.from_api_dict(api_dict)

    expected_created = datetime.datetime(2021, 6, 18, 12, 34, 56)
    expected_updated = datetime.datetime(2021, 6, 20, 20, 20, 20)
    assert metadata.created == expected_created
    assert metadata.updated == expected_updated

    # Exporting and loading again must give an equal object
    round_tripped = BatchJobMetadata.from_api_dict(metadata.to_api_dict())
    assert metadata == round_tripped


def test_batch_job_metadata_from_api_dict_usage():
    """The "usage" section is mapped to the cpu/memory/duration `timedelta` fields."""
    usage = {
        "cpu": {"value": 1000, "unit": "cpu-seconds"},
        "memory": {"value": 2000, "unit": "mb-seconds"},
        "duration": {"value": 3000, "unit": "seconds"},
    }
    api_dict = {"id": "ba7c470b", "created": "2021-06-18T12:34:56Z", "status": "running", "usage": usage}
    metadata = BatchJobMetadata.from_api_dict(api_dict)

    seconds = datetime.timedelta(seconds=1)
    assert metadata.id == "ba7c470b"
    assert metadata.created == datetime.datetime(2021, 6, 18, 12, 34, 56)
    assert metadata.status == "running"
    assert metadata.cpu_time == 1000 * seconds
    assert metadata.memory_time_megabyte == 2000 * seconds
    assert metadata.duration == 3000 * seconds
    assert metadata.duration_ == 3000 * seconds

    # Exporting and loading again must give an equal object
    round_tripped = BatchJobMetadata.from_api_dict(metadata.to_api_dict())
    assert metadata == round_tripped


def test_batch_job_metadata_to_api_dict():
    """Basic and full export only contain API fields; duration is derived from started/finished."""
    process = {"add": {"process_id": "add", "arguments": {"x": 3, "y": 5}, "result": True}}
    metadata = BatchJobMetadata(
        id="123",
        status="running",
        created=datetime.datetime(2022, 1, 18, 16, 42, 0),
        process=process,
        title="Untitled01",
        description="Lorem ipsum.",
        progress=0.3,
        cpu_time=datetime.timedelta(seconds=1000),
        memory_time_megabyte=datetime.timedelta(seconds=2000),
        started=datetime.datetime(2022, 1, 18, 17, 0, 0),
        finished=datetime.datetime(2022, 1, 18, 17, 20, 0),
        epsg=4326,
        links=[{}],
    )
    version = ComparableVersion("1.0.0")

    expected_basic = {
        "id": "123",
        "created": "2022-01-18T16:42:00Z",
        "status": "running",
    }
    expected_full = dict(
        expected_basic,
        process={"add": {"process_id": "add", "arguments": {"x": 3, "y": 5}, "result": True}},
        title="Untitled01",
        description="Lorem ipsum.",
        progress=0.3,
        usage={
            "cpu": {"value": 1000, "unit": "cpu-seconds"},
            "memory": {"value": 2000, "unit": "mb-seconds"},
            # 17:00:00 -> 17:20:00 is 20 minutes
            "duration": {"value": 1200, "unit": "seconds"},
        },
    )

    basic = metadata.to_api_dict(full=False, api_version=version)
    assert basic == expected_basic
    complete = metadata.to_api_dict(full=True, api_version=version)
    assert complete == expected_full

0 comments on commit 10e4553

Please sign in to comment.