From aedcfd40a0d5cb11c60f97e71b711988e8d5cf76 Mon Sep 17 00:00:00 2001 From: Katrina Rogan Date: Thu, 15 Sep 2022 16:36:57 -0700 Subject: [PATCH] Execution model fields (#1164) --- flytekit/models/execution.py | 189 +++++++++++++++---- tests/flytekit/unit/models/test_execution.py | 75 +++++++- 2 files changed, 228 insertions(+), 36 deletions(-) diff --git a/flytekit/models/execution.py b/flytekit/models/execution.py index 3dbdfb8564..75e040891b 100644 --- a/flytekit/models/execution.py +++ b/flytekit/models/execution.py @@ -1,10 +1,15 @@ +from __future__ import annotations + +import datetime import typing +import flyteidl import flyteidl.admin.execution_pb2 as _execution_pb2 import flyteidl.admin.node_execution_pb2 as _node_execution_pb2 import flyteidl.admin.task_execution_pb2 as _task_execution_pb2 import pytz as _pytz +import flytekit from flytekit.models import common as _common_models from flytekit.models import literals as _literals_models from flytekit.models import security @@ -13,52 +18,126 @@ from flytekit.models.node_execution import DynamicWorkflowNodeMetadata +class SystemMetadata(_common_models.FlyteIdlEntity): + def __init__(self, execution_cluster: str): + self._execution_cluster = execution_cluster + + @property + def execution_cluster(self) -> str: + return self._execution_cluster + + def to_flyte_idl(self) -> flyteidl.admin.execution_pb2.SystemMetadata: + return _execution_pb2.SystemMetadata(execution_cluster=self.execution_cluster) + + @classmethod + def from_flyte_idl(cls, pb2_object: flyteidl.admin.execution_pb2.SystemMetadata) -> SystemMetadata: + return cls( + execution_cluster=pb2_object.execution_cluster, + ) + + class ExecutionMetadata(_common_models.FlyteIdlEntity): class ExecutionMode(object): MANUAL = 0 SCHEDULED = 1 SYSTEM = 2 - def __init__(self, mode, principal, nesting): + def __init__( + self, + mode: int, + principal: str, + nesting: int, + scheduled_at: typing.Optional[datetime.datetime] = None, + parent_node_execution: typing.Optional[_identifier.NodeExecutionIdentifier] = None, + reference_execution: typing.Optional[_identifier.WorkflowExecutionIdentifier] = None, + system_metadata: typing.Optional[SystemMetadata] = None, + ): """ - :param int mode: An enum value from ExecutionMetadata.ExecutionMode which specifies how the job started. - :param Text principal: The entity that triggered the execution - :param int nesting: An integer representing how deeply nested the workflow is (i.e. was it triggered by a parent + :param mode: An enum value from ExecutionMetadata.ExecutionMode which specifies how the job started. + :param principal: The entity that triggered the execution + :param nesting: An integer representing how deeply nested the workflow is (i.e. was it triggered by a parent workflow) + :param scheduled_at: For scheduled executions, the requested time for execution for this specific schedule invocation. + :param parent_node_execution: Which subworkflow node (if any) launched this execution + :param reference_execution: Optional, reference workflow execution related to this execution + :param system_metadata: Optional, platform-specific metadata about the execution. """ self._mode = mode self._principal = principal self._nesting = nesting + self._scheduled_at = scheduled_at + self._parent_node_execution = parent_node_execution + self._reference_execution = reference_execution + self._system_metadata = system_metadata @property - def mode(self): + def mode(self) -> int: """ An enum value from ExecutionMetadata.ExecutionMode which specifies how the job started. - :rtype: int """ return self._mode @property - def principal(self): + def principal(self) -> str: """ The entity that triggered the execution - :rtype: Text """ return self._principal @property - def nesting(self): + def nesting(self) -> int: """ An integer representing how deeply nested the workflow is (i.e. was it triggered by a parent workflow) - :rtype: int """ return self._nesting + @property + def scheduled_at(self) -> datetime.datetime: + """ + For scheduled executions, the requested time for execution for this specific schedule invocation. + """ + return self._scheduled_at + + @property + def parent_node_execution(self) -> _identifier.NodeExecutionIdentifier: + """ + Which subworkflow node (if any) launched this execution + """ + return self._parent_node_execution + + @property + def reference_execution(self) -> _identifier.WorkflowExecutionIdentifier: + """ + Optional, reference workflow execution related to this execution + """ + return self._reference_execution + + @property + def system_metadata(self) -> SystemMetadata: + """ + Optional, platform-specific metadata about the execution. + """ + return self._system_metadata + def to_flyte_idl(self): """ :rtype: flyteidl.admin.execution_pb2.ExecutionMetadata """ - return _execution_pb2.ExecutionMetadata(mode=self.mode, principal=self.principal, nesting=self.nesting) + p = _execution_pb2.ExecutionMetadata( + mode=self.mode, + principal=self.principal, + nesting=self.nesting, + parent_node_execution=self.parent_node_execution.to_flyte_idl() + if self.parent_node_execution is not None + else None, + reference_execution=self.reference_execution.to_flyte_idl() + if self.reference_execution is not None + else None, + system_metadata=self.system_metadata.to_flyte_idl() if self.system_metadata is not None else None, + ) + if self.scheduled_at is not None: + p.scheduled_at.FromDatetime(self.scheduled_at) + return p @classmethod def from_flyte_idl(cls, pb2_object): @@ -70,6 +149,16 @@ def from_flyte_idl(cls, pb2_object): mode=pb2_object.mode, principal=pb2_object.principal, nesting=pb2_object.nesting, + scheduled_at=pb2_object.scheduled_at.ToDatetime() if pb2_object.HasField("scheduled_at") else None, + parent_node_execution=_identifier.NodeExecutionIdentifier.from_flyte_idl(pb2_object.parent_node_execution) + if pb2_object.HasField("parent_node_execution") + else None, + reference_execution=_identifier.WorkflowExecutionIdentifier.from_flyte_idl(pb2_object.reference_execution) + if pb2_object.HasField("reference_execution") + else None, + system_metadata=SystemMetadata.from_flyte_idl(pb2_object.system_metadata) + if pb2_object.HasField("system_metadata") + else None, ) @@ -319,57 +408,82 @@ def from_flyte_idl(cls, pb): ) +class AbortMetadata(_common_models.FlyteIdlEntity): + def __init__(self, cause: str, principal: str): + self._cause = cause + self._principal = principal + + @property + def cause(self) -> str: + return self._cause + + @property + def principal(self) -> str: + return self._principal + + def to_flyte_idl(self) -> flyteidl.admin.execution_pb2.AbortMetadata: + return _execution_pb2.AbortMetadata(cause=self.cause, principal=self.principal) + + @classmethod + def from_flyte_idl(cls, pb2_object: flyteidl.admin.execution_pb2.AbortMetadata) -> AbortMetadata: + return cls( + cause=pb2_object.cause, + principal=pb2_object.principal, + ) + + class ExecutionClosure(_common_models.FlyteIdlEntity): - def __init__(self, phase, started_at, duration, error=None, outputs=None): + def __init__( + self, + phase: int, + started_at: datetime.datetime, + duration: datetime.timedelta, + error: typing.Optional[flytekit.models.core.execution.ExecutionError] = None, + outputs: typing.Optional[LiteralMapBlob] = None, + abort_metadata: typing.Optional[AbortMetadata] = None, + ): """ - :param int phase: From the flytekit.models.core.execution.WorkflowExecutionPhase enum - :param datetime.datetime started_at: - :param datetime.timedelta duration: Duration for which the execution has been running. - :param flytekit.models.core.execution.ExecutionError error: - :param LiteralMapBlob outputs: + :param phase: From the flytekit.models.core.execution.WorkflowExecutionPhase enum + :param started_at: + :param duration: Duration for which the execution has been running. + :param error: + :param outputs: + :param abort_metadata: Specifies metadata around an aborted workflow execution. """ self._phase = phase self._started_at = started_at self._duration = duration self._error = error self._outputs = outputs + self._abort_metadata = abort_metadata @property - def error(self): - """ - :rtype: flytekit.models.core.execution.ExecutionError - """ + def error(self) -> flytekit.models.core.execution.ExecutionError: return self._error @property - def phase(self): + def phase(self) -> int: """ From the flytekit.models.core.execution.WorkflowExecutionPhase enum - :rtype: int """ return self._phase @property - def started_at(self): - """ - :rtype: datetime.datetime - """ + def started_at(self) -> datetime.datetime: return self._started_at @property - def duration(self): - """ - :rtype: datetime.timedelta - """ + def duration(self) -> datetime.timedelta: return self._duration @property - def outputs(self): - """ - :rtype: LiteralMapBlob - """ + def outputs(self) -> LiteralMapBlob: return self._outputs + @property + def abort_metadata(self) -> AbortMetadata: + return self._abort_metadata + def to_flyte_idl(self): """ :rtype: flyteidl.admin.execution_pb2.ExecutionClosure @@ -378,6 +492,7 @@ def to_flyte_idl(self): phase=self.phase, error=self.error.to_flyte_idl() if self.error is not None else None, outputs=self.outputs.to_flyte_idl() if self.outputs is not None else None, + abort_metadata=self.abort_metadata.to_flyte_idl() if self.abort_metadata is not None else None, ) obj.started_at.FromDatetime(self.started_at.astimezone(_pytz.UTC).replace(tzinfo=None)) obj.duration.FromTimedelta(self.duration) @@ -395,12 +510,16 @@ def from_flyte_idl(cls, pb2_object): outputs = None if pb2_object.HasField("outputs"): outputs = LiteralMapBlob.from_flyte_idl(pb2_object.outputs) + abort_metadata = None + if pb2_object.HasField("abort_metadata"): + abort_metadata = AbortMetadata.from_flyte_idl(pb2_object.abort_metadata) return cls( error=error, outputs=outputs, phase=pb2_object.phase, started_at=pb2_object.started_at.ToDatetime().replace(tzinfo=_pytz.UTC), duration=pb2_object.duration.ToTimedelta(), + abort_metadata=abort_metadata, ) diff --git a/tests/flytekit/unit/models/test_execution.py b/tests/flytekit/unit/models/test_execution.py index 3db0a8eb23..b327d5e9d6 100644 --- a/tests/flytekit/unit/models/test_execution.py +++ b/tests/flytekit/unit/models/test_execution.py @@ -66,16 +66,79 @@ def test_execution_closure_with_error(): assert obj2.error == test_error +def test_execution_closure_with_abort_metadata(): + test_datetime = datetime.datetime(year=2022, month=1, day=1, tzinfo=pytz.UTC) + test_timedelta = datetime.timedelta(seconds=10) + abort_metadata = _execution.AbortMetadata(cause="cause", principal="skinner") + + obj = _execution.ExecutionClosure( + phase=_core_exec.WorkflowExecutionPhase.SUCCEEDED, + started_at=test_datetime, + duration=test_timedelta, + abort_metadata=abort_metadata, + ) + assert obj.phase == _core_exec.WorkflowExecutionPhase.SUCCEEDED + assert obj.started_at == test_datetime + assert obj.duration == test_timedelta + assert obj.abort_metadata == abort_metadata + obj2 = _execution.ExecutionClosure.from_flyte_idl(obj.to_flyte_idl()) + assert obj2 == obj + assert obj2.phase == _core_exec.WorkflowExecutionPhase.SUCCEEDED + assert obj2.started_at == test_datetime + assert obj2.duration == test_timedelta + assert obj2.abort_metadata == abort_metadata + + +def test_system_metadata(): + obj = _execution.SystemMetadata(execution_cluster="my_cluster") + assert obj.execution_cluster == "my_cluster" + obj2 = _execution.SystemMetadata.from_flyte_idl(obj.to_flyte_idl()) + assert obj == obj2 + assert obj2.execution_cluster == "my_cluster" + + def test_execution_metadata(): - obj = _execution.ExecutionMetadata(_execution.ExecutionMetadata.ExecutionMode.MANUAL, "tester", 1) + scheduled_at = datetime.datetime.now() + system_metadata = _execution.SystemMetadata(execution_cluster="my_cluster") + parent_node_execution = _identifier.NodeExecutionIdentifier( + node_id="node_id", + execution_id=_identifier.WorkflowExecutionIdentifier( + project="project", + domain="domain", + name="parent", + ), + ) + reference_execution = _identifier.WorkflowExecutionIdentifier( + project="project", + domain="domain", + name="reference", + ) + + obj = _execution.ExecutionMetadata( + _execution.ExecutionMetadata.ExecutionMode.MANUAL, + "tester", + 1, + scheduled_at=scheduled_at, + parent_node_execution=parent_node_execution, + reference_execution=reference_execution, + system_metadata=system_metadata, + ) assert obj.mode == _execution.ExecutionMetadata.ExecutionMode.MANUAL assert obj.principal == "tester" assert obj.nesting == 1 + assert obj.scheduled_at == scheduled_at + assert obj.parent_node_execution == parent_node_execution + assert obj.reference_execution == reference_execution + assert obj.system_metadata == system_metadata obj2 = _execution.ExecutionMetadata.from_flyte_idl(obj.to_flyte_idl()) assert obj == obj2 assert obj2.mode == _execution.ExecutionMetadata.ExecutionMode.MANUAL assert obj2.principal == "tester" assert obj2.nesting == 1 + assert obj2.scheduled_at == scheduled_at + assert obj2.parent_node_execution == parent_node_execution + assert obj2.reference_execution == reference_execution + assert obj2.system_metadata == system_metadata @pytest.mark.parametrize("literal_value_pair", _parameterizers.LIST_OF_SCALAR_LITERALS_AND_PYTHON_VALUE) @@ -198,3 +261,13 @@ def test_task_execution_data_response(): assert obj2.outputs == output_blob assert obj2.full_inputs == _INPUT_MAP assert obj2.full_outputs == _OUTPUT_MAP + + +def test_abort_metadata(): + obj = _execution.AbortMetadata(cause="cause", principal="skinner") + assert obj.cause == "cause" + assert obj.principal == "skinner" + obj2 = _execution.AbortMetadata.from_flyte_idl(obj.to_flyte_idl()) + assert obj == obj2 + assert obj2.cause == "cause" + assert obj2.principal == "skinner"