Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding created and updated at to ExecutionClosure model #1371

Merged
merged 3 commits into from
Dec 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions flytekit/models/execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,8 @@ def __init__(
error: typing.Optional[flytekit.models.core.execution.ExecutionError] = None,
outputs: typing.Optional[LiteralMapBlob] = None,
abort_metadata: typing.Optional[AbortMetadata] = None,
created_at: typing.Optional[datetime.datetime] = None,
updated_at: typing.Optional[datetime.datetime] = None,
):
"""
:param phase: From the flytekit.models.core.execution.WorkflowExecutionPhase enum
Expand All @@ -456,6 +458,8 @@ def __init__(
self._error = error
self._outputs = outputs
self._abort_metadata = abort_metadata
self._created_at = created_at
self._updated_at = updated_at

@property
def error(self) -> flytekit.models.core.execution.ExecutionError:
Expand All @@ -476,6 +480,14 @@ def started_at(self) -> datetime.datetime:
def duration(self) -> datetime.timedelta:
return self._duration

@property
def created_at(self) -> typing.Optional[datetime.datetime]:
return self._created_at

@property
def updated_at(self) -> typing.Optional[datetime.datetime]:
return self._updated_at

@property
def outputs(self) -> LiteralMapBlob:
return self._outputs
Expand All @@ -496,6 +508,10 @@ def to_flyte_idl(self):
)
obj.started_at.FromDatetime(self.started_at.astimezone(_pytz.UTC).replace(tzinfo=None))
obj.duration.FromTimedelta(self.duration)
if self.created_at:
obj.created_at.FromDatetime(self.created_at.astimezone(_pytz.UTC).replace(tzinfo=None))
if self.updated_at:
obj.updated_at.FromDatetime(self.updated_at.astimezone(_pytz.UTC).replace(tzinfo=None))
return obj

@classmethod
Expand All @@ -520,6 +536,12 @@ def from_flyte_idl(cls, pb2_object):
started_at=pb2_object.started_at.ToDatetime().replace(tzinfo=_pytz.UTC),
duration=pb2_object.duration.ToTimedelta(),
abort_metadata=abort_metadata,
created_at=pb2_object.created_at.ToDatetime().replace(tzinfo=_pytz.UTC)
if pb2_object.HasField("created_at")
else None,
updated_at=pb2_object.updated_at.ToDatetime().replace(tzinfo=_pytz.UTC)
if pb2_object.HasField("updated_at")
else None,
)


Expand Down
19 changes: 19 additions & 0 deletions flytekit/models/node_execution.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
import typing

import flyteidl.admin.node_execution_pb2 as _node_execution_pb2
Expand Down Expand Up @@ -96,6 +97,8 @@ def __init__(
error=None,
workflow_node_metadata: typing.Optional[WorkflowNodeMetadata] = None,
task_node_metadata: typing.Optional[TaskNodeMetadata] = None,
created_at: typing.Optional[datetime.datetime] = None,
updated_at: typing.Optional[datetime.datetime] = None,
):
"""
:param int phase:
Expand All @@ -113,6 +116,8 @@ def __init__(
self._workflow_node_metadata = workflow_node_metadata
self._task_node_metadata = task_node_metadata
# TODO: Add output_data field as well.
self._created_at = created_at
self._updated_at = updated_at

@property
def phase(self):
Expand All @@ -135,6 +140,14 @@ def duration(self):
"""
return self._duration

@property
def created_at(self) -> typing.Optional[datetime.datetime]:
return self._created_at

@property
def updated_at(self) -> typing.Optional[datetime.datetime]:
return self._updated_at

@property
def output_uri(self):
"""
Expand Down Expand Up @@ -184,6 +197,10 @@ def to_flyte_idl(self):
)
obj.started_at.FromDatetime(self.started_at.astimezone(_pytz.UTC).replace(tzinfo=None))
obj.duration.FromTimedelta(self.duration)
if self.created_at:
obj.created_at.FromDatetime(self.created_at.astimezone(_pytz.UTC).replace(tzinfo=None))
if self.updated_at:
obj.updated_at.FromDatetime(self.updated_at.astimezone(_pytz.UTC).replace(tzinfo=None))
return obj

@classmethod
Expand All @@ -205,6 +222,8 @@ def from_flyte_idl(cls, p):
task_node_metadata=TaskNodeMetadata.from_flyte_idl(p.task_node_metadata)
if p.HasField("task_node_metadata")
else None,
created_at=p.created_at.ToDatetime().replace(tzinfo=_pytz.UTC) if p.HasField("created_at") else None,
updated_at=p.updated_at.ToDatetime().replace(tzinfo=_pytz.UTC) if p.HasField("updated_at") else None,
)


Expand Down
8 changes: 8 additions & 0 deletions tests/flytekit/unit/models/test_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ def test_execution_closure_with_output():
started_at=test_datetime,
duration=test_timedelta,
outputs=test_outputs,
created_at=None,
updated_at=test_datetime,
)
assert obj.phase == _core_exec.WorkflowExecutionPhase.SUCCEEDED
assert obj.started_at == test_datetime
Expand All @@ -39,6 +41,8 @@ def test_execution_closure_with_output():
assert obj2.started_at == test_datetime
assert obj2.duration == test_timedelta
assert obj2.outputs == test_outputs
assert obj2.created_at is None
assert obj2.updated_at == test_datetime


def test_execution_closure_with_error():
Expand All @@ -53,6 +57,8 @@ def test_execution_closure_with_error():
started_at=test_datetime,
duration=test_timedelta,
error=test_error,
created_at=test_datetime,
updated_at=None,
)
assert obj.phase == _core_exec.WorkflowExecutionPhase.SUCCEEDED
assert obj.started_at == test_datetime
Expand All @@ -62,6 +68,8 @@ def test_execution_closure_with_error():
assert obj2 == obj
assert obj2.phase == _core_exec.WorkflowExecutionPhase.SUCCEEDED
assert obj2.started_at == test_datetime
assert obj2.created_at == test_datetime
assert obj2.updated_at is None
assert obj2.duration == test_timedelta
assert obj2.error == test_error

Expand Down