diff --git a/flytekit/models/execution.py b/flytekit/models/execution.py index a29bad80f1..8a46b09335 100644 --- a/flytekit/models/execution.py +++ b/flytekit/models/execution.py @@ -293,15 +293,17 @@ def from_flyte_idl(cls, pb): class ExecutionClosure(_common_models.FlyteIdlEntity): - def __init__(self, phase, started_at, error=None, outputs=None): + def __init__(self, phase, started_at, duration, error=None, outputs=None): """ :param int phase: From the flytekit.models.core.execution.WorkflowExecutionPhase enum :param datetime.datetime started_at: + :param datetime.timedelta duration: Duration for which the execution has been running. :param flytekit.models.core.execution.ExecutionError error: :param LiteralMapBlob outputs: """ self._phase = phase self._started_at = started_at + self._duration = duration self._error = error self._outputs = outputs @@ -327,6 +329,13 @@ def started_at(self): """ return self._started_at + @property + def duration(self): + """ + :rtype: datetime.timedelta + """ + return self._duration + @property def outputs(self): """ @@ -344,6 +353,7 @@ def to_flyte_idl(self): outputs=self.outputs.to_flyte_idl() if self.outputs is not None else None, ) obj.started_at.FromDatetime(self.started_at.astimezone(_pytz.UTC).replace(tzinfo=None)) + obj.duration.FromTimedelta(self.duration) return obj @classmethod @@ -363,6 +373,7 @@ def from_flyte_idl(cls, pb2_object): outputs=outputs, phase=pb2_object.phase, started_at=pb2_object.started_at.ToDatetime().replace(tzinfo=_pytz.UTC), + duration=pb2_object.duration.ToTimedelta(), ) diff --git a/tests/flytekit/integration/remote/test_remote.py b/tests/flytekit/integration/remote/test_remote.py index 7784d76d82..829137c928 100644 --- a/tests/flytekit/integration/remote/test_remote.py +++ b/tests/flytekit/integration/remote/test_remote.py @@ -145,6 +145,8 @@ def test_fetch_execute_workflow(flyteclient, flyte_workflows_register): flyte_workflow = remote.fetch_workflow(name="workflows.basic.hello_world.my_wf", version=f"v{VERSION}") execution = remote.execute(flyte_workflow, {}, wait=True) assert execution.outputs["o0"] == "hello world" + assert isinstance(execution.closure.duration, datetime.timedelta) + assert execution.closure.duration > datetime.timedelta(seconds=1) execution_to_terminate = remote.execute(flyte_workflow, {}) remote.terminate(execution_to_terminate, cause="just because") diff --git a/tests/flytekit/unit/models/test_execution.py b/tests/flytekit/unit/models/test_execution.py index c0fdf5ba2a..8c1ac94fce 100644 --- a/tests/flytekit/unit/models/test_execution.py +++ b/tests/flytekit/unit/models/test_execution.py @@ -1,4 +1,7 @@ +import datetime + import pytest +import pytz from flytekit.models import common as _common_models from flytekit.models import execution as _execution @@ -15,6 +18,54 @@ ) +def test_execution_closure_with_output(): + test_datetime = datetime.datetime(year=2022, month=1, day=1, tzinfo=pytz.UTC) + test_timedelta = datetime.timedelta(seconds=10) + test_outputs = _execution.LiteralMapBlob(values=_OUTPUT_MAP, uri="http://foo/") + + obj = _execution.ExecutionClosure( + phase=_core_exec.WorkflowExecutionPhase.SUCCEEDED, + started_at=test_datetime, + duration=test_timedelta, + outputs=test_outputs, + ) + assert obj.phase == _core_exec.WorkflowExecutionPhase.SUCCEEDED + assert obj.started_at == test_datetime + assert obj.duration == test_timedelta + assert obj.outputs == test_outputs + obj2 = _execution.ExecutionClosure.from_flyte_idl(obj.to_flyte_idl()) + assert obj2 == obj + assert obj2.phase == _core_exec.WorkflowExecutionPhase.SUCCEEDED + assert obj2.started_at == test_datetime + assert obj2.duration == test_timedelta + assert obj2.outputs == test_outputs + + +def test_execution_closure_with_error(): + test_datetime = datetime.datetime(year=2022, month=1, day=1, tzinfo=pytz.UTC) + test_timedelta = datetime.timedelta(seconds=10) + test_error = _core_exec.ExecutionError( + code="foo", message="bar", error_uri="http://foobar", kind=_core_exec.ExecutionError.ErrorKind.USER + ) + + obj = _execution.ExecutionClosure( + phase=_core_exec.WorkflowExecutionPhase.SUCCEEDED, + started_at=test_datetime, + duration=test_timedelta, + error=test_error, + ) + assert obj.phase == _core_exec.WorkflowExecutionPhase.SUCCEEDED + assert obj.started_at == test_datetime + assert obj.duration == test_timedelta + assert obj.error == test_error + obj2 = _execution.ExecutionClosure.from_flyte_idl(obj.to_flyte_idl()) + assert obj2 == obj + assert obj2.phase == _core_exec.WorkflowExecutionPhase.SUCCEEDED + assert obj2.started_at == test_datetime + assert obj2.duration == test_timedelta + assert obj2.error == test_error + + def test_execution_metadata(): obj = _execution.ExecutionMetadata(_execution.ExecutionMetadata.ExecutionMode.MANUAL, "tester", 1) assert obj.mode == _execution.ExecutionMetadata.ExecutionMode.MANUAL