From afd7a94ead056534d8234a398f4bdc068ac4fed5 Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Fri, 16 Dec 2022 10:13:12 -0800 Subject: [PATCH 1/3] adding created and updated at to execution closure Signed-off-by: Yee Hing Tong --- flytekit/models/execution.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/flytekit/models/execution.py b/flytekit/models/execution.py index 75e040891b..6c34672186 100644 --- a/flytekit/models/execution.py +++ b/flytekit/models/execution.py @@ -441,6 +441,8 @@ def __init__( error: typing.Optional[flytekit.models.core.execution.ExecutionError] = None, outputs: typing.Optional[LiteralMapBlob] = None, abort_metadata: typing.Optional[AbortMetadata] = None, + created_at: typing.Optional[datetime.datetime] = None, + updated_at: typing.Optional[datetime.datetime] = None, ): """ :param phase: From the flytekit.models.core.execution.WorkflowExecutionPhase enum @@ -456,6 +458,8 @@ def __init__( self._error = error self._outputs = outputs self._abort_metadata = abort_metadata + self._created_at = created_at + self._updated_at = updated_at @property def error(self) -> flytekit.models.core.execution.ExecutionError: @@ -476,6 +480,14 @@ def started_at(self) -> datetime.datetime: def duration(self) -> datetime.timedelta: return self._duration + @property + def created_at(self) -> datetime.datetime: + return self._created_at + + @property + def updated_at(self) -> datetime.datetime: + return self._updated_at + @property def outputs(self) -> LiteralMapBlob: return self._outputs @@ -496,6 +508,10 @@ def to_flyte_idl(self): ) obj.started_at.FromDatetime(self.started_at.astimezone(_pytz.UTC).replace(tzinfo=None)) obj.duration.FromTimedelta(self.duration) + if self.created_at: + obj.created_at.FromDatetime(self.created_at.astimezone(_pytz.UTC).replace(tzinfo=None)) + if self.updated_at: + obj.updated_at.FromDatetime(self.updated_at.astimezone(_pytz.UTC).replace(tzinfo=None)) return obj @classmethod @@ -520,6 +536,8 @@ def from_flyte_idl(cls, pb2_object): started_at=pb2_object.started_at.ToDatetime().replace(tzinfo=_pytz.UTC), duration=pb2_object.duration.ToTimedelta(), abort_metadata=abort_metadata, + created_at=pb2_object.created_at.ToDatetime().replace(tzinfo=_pytz.UTC) if pb2_object.HasField("created_at") else None, + updated_at=pb2_object.updated_at.ToDatetime().replace(tzinfo=_pytz.UTC) if pb2_object.HasField("updated_at") else None, ) From 49efa397283fc66ebe82ad1097ba51fa797cd56a Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Fri, 16 Dec 2022 10:16:54 -0800 Subject: [PATCH 2/3] test Signed-off-by: Yee Hing Tong --- flytekit/models/execution.py | 8 ++++++-- tests/flytekit/unit/models/test_execution.py | 8 ++++++++ 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/flytekit/models/execution.py b/flytekit/models/execution.py index 6c34672186..c6fe20bd72 100644 --- a/flytekit/models/execution.py +++ b/flytekit/models/execution.py @@ -536,8 +536,12 @@ def from_flyte_idl(cls, pb2_object): started_at=pb2_object.started_at.ToDatetime().replace(tzinfo=_pytz.UTC), duration=pb2_object.duration.ToTimedelta(), abort_metadata=abort_metadata, - created_at=pb2_object.created_at.ToDatetime().replace(tzinfo=_pytz.UTC) if pb2_object.HasField("created_at") else None, - updated_at=pb2_object.updated_at.ToDatetime().replace(tzinfo=_pytz.UTC) if pb2_object.HasField("updated_at") else None, + created_at=pb2_object.created_at.ToDatetime().replace(tzinfo=_pytz.UTC) + if pb2_object.HasField("created_at") + else None, + updated_at=pb2_object.updated_at.ToDatetime().replace(tzinfo=_pytz.UTC) + if pb2_object.HasField("updated_at") + else None, ) diff --git a/tests/flytekit/unit/models/test_execution.py b/tests/flytekit/unit/models/test_execution.py index b327d5e9d6..fac5604543 100644 --- a/tests/flytekit/unit/models/test_execution.py +++ b/tests/flytekit/unit/models/test_execution.py @@ -28,6 +28,8 @@ def test_execution_closure_with_output(): started_at=test_datetime, duration=test_timedelta, outputs=test_outputs, + created_at=None, + updated_at=test_datetime, ) assert obj.phase == _core_exec.WorkflowExecutionPhase.SUCCEEDED assert obj.started_at == test_datetime @@ -39,6 +41,8 @@ def test_execution_closure_with_output(): assert obj2.started_at == test_datetime assert obj2.duration == test_timedelta assert obj2.outputs == test_outputs + assert obj2.created_at is None + assert obj2.updated_at == test_datetime def test_execution_closure_with_error(): @@ -53,6 +57,8 @@ def test_execution_closure_with_error(): started_at=test_datetime, duration=test_timedelta, error=test_error, + created_at=test_datetime, + updated_at=None, ) assert obj.phase == _core_exec.WorkflowExecutionPhase.SUCCEEDED assert obj.started_at == test_datetime @@ -62,6 +68,8 @@ def test_execution_closure_with_error(): assert obj2 == obj assert obj2.phase == _core_exec.WorkflowExecutionPhase.SUCCEEDED assert obj2.started_at == test_datetime + assert obj2.created_at == test_datetime + assert obj2.updated_at is None assert obj2.duration == test_timedelta assert obj2.error == test_error From f5418ef436423d7c41427780ed687852456d3fbc Mon Sep 17 00:00:00 2001 From: Yee Hing Tong Date: Fri, 16 Dec 2022 10:36:26 -0800 Subject: [PATCH 3/3] add to nodeexecutionclosure too Signed-off-by: Yee Hing Tong --- flytekit/models/execution.py | 4 ++-- flytekit/models/node_execution.py | 19 +++++++++++++++++++ 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/flytekit/models/execution.py b/flytekit/models/execution.py index c6fe20bd72..08fb3c938d 100644 --- a/flytekit/models/execution.py +++ b/flytekit/models/execution.py @@ -481,11 +481,11 @@ def duration(self) -> datetime.timedelta: return self._duration @property - def created_at(self) -> datetime.datetime: + def created_at(self) -> typing.Optional[datetime.datetime]: return self._created_at @property - def updated_at(self) -> datetime.datetime: + def updated_at(self) -> typing.Optional[datetime.datetime]: return self._updated_at @property diff --git a/flytekit/models/node_execution.py b/flytekit/models/node_execution.py index 220db5cc5f..335a793db6 100644 --- a/flytekit/models/node_execution.py +++ b/flytekit/models/node_execution.py @@ -1,3 +1,4 @@ +import datetime import typing import flyteidl.admin.node_execution_pb2 as _node_execution_pb2 @@ -96,6 +97,8 @@ def __init__( error=None, workflow_node_metadata: typing.Optional[WorkflowNodeMetadata] = None, task_node_metadata: typing.Optional[TaskNodeMetadata] = None, + created_at: typing.Optional[datetime.datetime] = None, + updated_at: typing.Optional[datetime.datetime] = None, ): """ :param int phase: @@ -113,6 +116,8 @@ def __init__( self._workflow_node_metadata = workflow_node_metadata self._task_node_metadata = task_node_metadata # TODO: Add output_data field as well. + self._created_at = created_at + self._updated_at = updated_at @property def phase(self): @@ -135,6 +140,14 @@ def duration(self): """ return self._duration + @property + def created_at(self) -> typing.Optional[datetime.datetime]: + return self._created_at + + @property + def updated_at(self) -> typing.Optional[datetime.datetime]: + return self._updated_at + @property def output_uri(self): """ @@ -184,6 +197,10 @@ def to_flyte_idl(self): ) obj.started_at.FromDatetime(self.started_at.astimezone(_pytz.UTC).replace(tzinfo=None)) obj.duration.FromTimedelta(self.duration) + if self.created_at: + obj.created_at.FromDatetime(self.created_at.astimezone(_pytz.UTC).replace(tzinfo=None)) + if self.updated_at: + obj.updated_at.FromDatetime(self.updated_at.astimezone(_pytz.UTC).replace(tzinfo=None)) return obj @classmethod @@ -205,6 +222,8 @@ def from_flyte_idl(cls, p): task_node_metadata=TaskNodeMetadata.from_flyte_idl(p.task_node_metadata) if p.HasField("task_node_metadata") else None, + created_at=p.created_at.ToDatetime().replace(tzinfo=_pytz.UTC) if p.HasField("created_at") else None, + updated_at=p.updated_at.ToDatetime().replace(tzinfo=_pytz.UTC) if p.HasField("updated_at") else None, )