Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parse duration field from flyteidl to flytekit.models.execution.ExecutionClosure #829

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion flytekit/models/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,15 +293,17 @@ def from_flyte_idl(cls, pb):


class ExecutionClosure(_common_models.FlyteIdlEntity):
def __init__(self, phase, started_at, error=None, outputs=None):
def __init__(self, phase, started_at, duration, error=None, outputs=None):
"""
:param int phase: From the flytekit.models.core.execution.WorkflowExecutionPhase enum
:param datetime.datetime started_at:
:param datetime.timedelta duration: Duration for which the execution has been running.
:param flytekit.models.core.execution.ExecutionError error:
:param LiteralMapBlob outputs:
"""
self._phase = phase
self._started_at = started_at
self._duration = duration
self._error = error
self._outputs = outputs

Expand All @@ -327,6 +329,13 @@ def started_at(self):
"""
return self._started_at

@property
def duration(self):
"""
:rtype: datetime.timedelta
"""
return self._duration

@property
def outputs(self):
"""
Expand All @@ -344,6 +353,7 @@ def to_flyte_idl(self):
outputs=self.outputs.to_flyte_idl() if self.outputs is not None else None,
)
obj.started_at.FromDatetime(self.started_at.astimezone(_pytz.UTC).replace(tzinfo=None))
obj.duration.FromTimedelta(self.duration)
return obj

@classmethod
Expand All @@ -363,6 +373,7 @@ def from_flyte_idl(cls, pb2_object):
outputs=outputs,
phase=pb2_object.phase,
started_at=pb2_object.started_at.ToDatetime().replace(tzinfo=_pytz.UTC),
duration=pb2_object.duration.ToTimedelta(),
)


Expand Down
2 changes: 2 additions & 0 deletions tests/flytekit/integration/remote/test_remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,8 @@ def test_fetch_execute_workflow(flyteclient, flyte_workflows_register):
flyte_workflow = remote.fetch_workflow(name="workflows.basic.hello_world.my_wf", version=f"v{VERSION}")
execution = remote.execute(flyte_workflow, {}, wait=True)
assert execution.outputs["o0"] == "hello world"
assert isinstance(execution.closure.duration, datetime.timedelta)
assert execution.closure.duration > datetime.timedelta(seconds=1)

execution_to_terminate = remote.execute(flyte_workflow, {})
remote.terminate(execution_to_terminate, cause="just because")
Expand Down
51 changes: 51 additions & 0 deletions tests/flytekit/unit/models/test_execution.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import datetime

import pytest
import pytz

from flytekit.models import common as _common_models
from flytekit.models import execution as _execution
Expand All @@ -15,6 +18,54 @@
)


def test_execution_closure_with_output():
test_datetime = datetime.datetime(year=2022, month=1, day=1, tzinfo=pytz.UTC)
test_timedelta = datetime.timedelta(seconds=10)
test_outputs = _execution.LiteralMapBlob(values=_OUTPUT_MAP, uri="http://foo/")

obj = _execution.ExecutionClosure(
phase=_core_exec.WorkflowExecutionPhase.SUCCEEDED,
started_at=test_datetime,
duration=test_timedelta,
outputs=test_outputs,
)
assert obj.phase == _core_exec.WorkflowExecutionPhase.SUCCEEDED
assert obj.started_at == test_datetime
assert obj.duration == test_timedelta
assert obj.outputs == test_outputs
obj2 = _execution.ExecutionClosure.from_flyte_idl(obj.to_flyte_idl())
assert obj2 == obj
assert obj2.phase == _core_exec.WorkflowExecutionPhase.SUCCEEDED
assert obj2.started_at == test_datetime
assert obj2.duration == test_timedelta
assert obj2.outputs == test_outputs


def test_execution_closure_with_error():
test_datetime = datetime.datetime(year=2022, month=1, day=1, tzinfo=pytz.UTC)
test_timedelta = datetime.timedelta(seconds=10)
test_error = _core_exec.ExecutionError(
code="foo", message="bar", error_uri="http://foobar", kind=_core_exec.ExecutionError.ErrorKind.USER
)

obj = _execution.ExecutionClosure(
phase=_core_exec.WorkflowExecutionPhase.SUCCEEDED,
started_at=test_datetime,
duration=test_timedelta,
error=test_error,
)
assert obj.phase == _core_exec.WorkflowExecutionPhase.SUCCEEDED
assert obj.started_at == test_datetime
assert obj.duration == test_timedelta
assert obj.error == test_error
obj2 = _execution.ExecutionClosure.from_flyte_idl(obj.to_flyte_idl())
assert obj2 == obj
assert obj2.phase == _core_exec.WorkflowExecutionPhase.SUCCEEDED
assert obj2.started_at == test_datetime
assert obj2.duration == test_timedelta
assert obj2.error == test_error


def test_execution_metadata():
obj = _execution.ExecutionMetadata(_execution.ExecutionMetadata.ExecutionMode.MANUAL, "tester", 1)
assert obj.mode == _execution.ExecutionMetadata.ExecutionMode.MANUAL
Expand Down