From edaf65a247c0e086a08a32ace1aad2dfef05057f Mon Sep 17 00:00:00 2001 From: Bernhard Stadlbauer Date: Tue, 25 Jan 2022 13:44:48 +0100 Subject: [PATCH 1/4] Parse duration field from flyteidl to `flytekit.models.execution.ExecutionClosure` Signed-off-by: Bernhard Stadlbauer --- flytekit/models/execution.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/flytekit/models/execution.py b/flytekit/models/execution.py index a29bad80f1..8a46b09335 100644 --- a/flytekit/models/execution.py +++ b/flytekit/models/execution.py @@ -293,15 +293,17 @@ def from_flyte_idl(cls, pb): class ExecutionClosure(_common_models.FlyteIdlEntity): - def __init__(self, phase, started_at, error=None, outputs=None): + def __init__(self, phase, started_at, duration, error=None, outputs=None): """ :param int phase: From the flytekit.models.core.execution.WorkflowExecutionPhase enum :param datetime.datetime started_at: + :param datetime.timedelta duration: Duration for which the execution has been running. :param flytekit.models.core.execution.ExecutionError error: :param LiteralMapBlob outputs: """ self._phase = phase self._started_at = started_at + self._duration = duration self._error = error self._outputs = outputs @@ -327,6 +329,13 @@ def started_at(self): """ return self._started_at + @property + def duration(self): + """ + :rtype: datetime.timedelta + """ + return self._duration + @property def outputs(self): """ @@ -344,6 +353,7 @@ def to_flyte_idl(self): outputs=self.outputs.to_flyte_idl() if self.outputs is not None else None, ) obj.started_at.FromDatetime(self.started_at.astimezone(_pytz.UTC).replace(tzinfo=None)) + obj.duration.FromTimedelta(self.duration) return obj @classmethod @@ -363,6 +373,7 @@ def from_flyte_idl(cls, pb2_object): outputs=outputs, phase=pb2_object.phase, started_at=pb2_object.started_at.ToDatetime().replace(tzinfo=_pytz.UTC), + duration=pb2_object.duration.ToTimedelta(), ) From 455398bb3482955fe54929e0a76fc5ddc11478dc Mon Sep 17 00:00:00 2001 From: Bernhard Stadlbauer Date: Wed, 26 Jan 2022 09:36:29 +0100 Subject: [PATCH 2/4] Add test for execution closure Signed-off-by: Bernhard Stadlbauer --- tests/flytekit/unit/models/test_execution.py | 32 ++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/tests/flytekit/unit/models/test_execution.py b/tests/flytekit/unit/models/test_execution.py index c0fdf5ba2a..c7520f75f5 100644 --- a/tests/flytekit/unit/models/test_execution.py +++ b/tests/flytekit/unit/models/test_execution.py @@ -1,4 +1,7 @@ +import datetime + import pytest +import pytz from flytekit.models import common as _common_models from flytekit.models import execution as _execution @@ -15,6 +18,35 @@ ) +def test_execution_closure(): + test_datetime = datetime.datetime(year=2022, month=1, day=1, tzinfo=pytz.UTC) + test_timedelta = datetime.timedelta(seconds=10) + test_error = _core_exec.ExecutionError( + code="foo", message="bar", error_uri="http://foobar", kind=_core_exec.ExecutionError.ErrorKind.USER + ) + test_outputs = _execution.LiteralMapBlob(values=_OUTPUT_MAP, uri="http://foo/") + + obj = _execution.ExecutionClosure( + phase=_core_exec.WorkflowExecutionPhase.SUCCEEDED, + started_at=test_datetime, + duration=test_timedelta, + error=test_error, + outputs=test_outputs, + ) + assert obj.phase == _core_exec.WorkflowExecutionPhase.SUCCEEDED + assert obj.started_at == test_datetime + assert obj.duration == test_timedelta + assert obj.error == test_error + assert obj.outputs == test_outputs + obj2 = _execution.ExecutionClosure.from_flyte_idl(obj.to_flyte_idl()) + assert obj2 == obj + assert obj2.phase == _core_exec.WorkflowExecutionPhase.SUCCEEDED + assert obj2.started_at == test_datetime + assert obj2.duration == test_timedelta + # assert obj2.error == test_error # FIXME: This won't work for some reason? + assert obj2.outputs == test_outputs + + def test_execution_metadata(): obj = _execution.ExecutionMetadata(_execution.ExecutionMetadata.ExecutionMode.MANUAL, "tester", 1) assert obj.mode == _execution.ExecutionMetadata.ExecutionMode.MANUAL From adea5c0372cf0cbb0af35356a6344e1c234bb754 Mon Sep 17 00:00:00 2001 From: Bernhard Stadlbauer Date: Thu, 27 Jan 2022 11:24:41 +0100 Subject: [PATCH 3/4] Add tests to Flyte remote Signed-off-by: Bernhard Stadlbauer --- tests/flytekit/integration/remote/test_remote.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/flytekit/integration/remote/test_remote.py b/tests/flytekit/integration/remote/test_remote.py index 7784d76d82..829137c928 100644 --- a/tests/flytekit/integration/remote/test_remote.py +++ b/tests/flytekit/integration/remote/test_remote.py @@ -145,6 +145,8 @@ def test_fetch_execute_workflow(flyteclient, flyte_workflows_register): flyte_workflow = remote.fetch_workflow(name="workflows.basic.hello_world.my_wf", version=f"v{VERSION}") execution = remote.execute(flyte_workflow, {}, wait=True) assert execution.outputs["o0"] == "hello world" + assert isinstance(execution.closure.duration, datetime.timedelta) + assert execution.closure.duration > datetime.timedelta(seconds=1) execution_to_terminate = remote.execute(flyte_workflow, {}) remote.terminate(execution_to_terminate, cause="just because") From 1adbbbbb08d3dd5d2e74fe6c4abbd62f392a5743 Mon Sep 17 00:00:00 2001 From: Bernhard Stadlbauer Date: Sat, 29 Jan 2022 10:07:27 +0100 Subject: [PATCH 4/4] Split execution test into with output and with error Signed-off-by: Bernhard Stadlbauer --- tests/flytekit/unit/models/test_execution.py | 31 ++++++++++++++++---- 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/tests/flytekit/unit/models/test_execution.py b/tests/flytekit/unit/models/test_execution.py index c7520f75f5..8c1ac94fce 100644 --- a/tests/flytekit/unit/models/test_execution.py +++ b/tests/flytekit/unit/models/test_execution.py @@ -18,33 +18,52 @@ ) -def test_execution_closure(): +def test_execution_closure_with_output(): + test_datetime = datetime.datetime(year=2022, month=1, day=1, tzinfo=pytz.UTC) + test_timedelta = datetime.timedelta(seconds=10) + test_outputs = _execution.LiteralMapBlob(values=_OUTPUT_MAP, uri="http://foo/") + + obj = _execution.ExecutionClosure( + phase=_core_exec.WorkflowExecutionPhase.SUCCEEDED, + started_at=test_datetime, + duration=test_timedelta, + outputs=test_outputs, + ) + assert obj.phase == _core_exec.WorkflowExecutionPhase.SUCCEEDED + assert obj.started_at == test_datetime + assert obj.duration == test_timedelta + assert obj.outputs == test_outputs + obj2 = _execution.ExecutionClosure.from_flyte_idl(obj.to_flyte_idl()) + assert obj2 == obj + assert obj2.phase == _core_exec.WorkflowExecutionPhase.SUCCEEDED + assert obj2.started_at == test_datetime + assert obj2.duration == test_timedelta + assert obj2.outputs == test_outputs + + +def test_execution_closure_with_error(): test_datetime = datetime.datetime(year=2022, month=1, day=1, tzinfo=pytz.UTC) test_timedelta = datetime.timedelta(seconds=10) test_error = _core_exec.ExecutionError( code="foo", message="bar", error_uri="http://foobar", kind=_core_exec.ExecutionError.ErrorKind.USER ) - test_outputs = _execution.LiteralMapBlob(values=_OUTPUT_MAP, uri="http://foo/") obj = _execution.ExecutionClosure( phase=_core_exec.WorkflowExecutionPhase.SUCCEEDED, started_at=test_datetime, duration=test_timedelta, error=test_error, - outputs=test_outputs, ) assert obj.phase == _core_exec.WorkflowExecutionPhase.SUCCEEDED assert obj.started_at == test_datetime assert obj.duration == test_timedelta assert obj.error == test_error - assert obj.outputs == test_outputs obj2 = _execution.ExecutionClosure.from_flyte_idl(obj.to_flyte_idl()) assert obj2 == obj assert obj2.phase == _core_exec.WorkflowExecutionPhase.SUCCEEDED assert obj2.started_at == test_datetime assert obj2.duration == test_timedelta - # assert obj2.error == test_error # FIXME: This won't work for some reason? - assert obj2.outputs == test_outputs + assert obj2.error == test_error def test_execution_metadata():