diff --git a/flytekit/__init__.py b/flytekit/__init__.py index 4ebe7115b2..1679c05c5a 100644 --- a/flytekit/__init__.py +++ b/flytekit/__init__.py @@ -2,4 +2,4 @@ import flytekit.plugins -__version__ = '0.10.1' +__version__ = '0.10.2' diff --git a/flytekit/common/workflow.py b/flytekit/common/workflow.py index 2eec22ed44..cbf1368945 100644 --- a/flytekit/common/workflow.py +++ b/flytekit/common/workflow.py @@ -455,17 +455,16 @@ def _discover_workflow_components(workflow_class): return inputs, outputs, nodes -def build_sdk_workflow_from_metaclass(metaclass, queuing_budget=None, on_failure=None, cls=None): +def build_sdk_workflow_from_metaclass(metaclass, on_failure=None, cls=None): """ :param T metaclass: :param cls: This is the class that will be instantiated from the inputs, outputs, and nodes. This will be used by users extending the base Flyte programming model. If set, it must be a subclass of SdkWorkflow. - :param queuing_budget datetime.timedelta: [Optional] Budget that specifies the amount of time a workflow can be queued up for execution. :param on_failure flytekit.models.core.workflow.WorkflowMetadata.OnFailurePolicy: [Optional] The execution policy when the workflow detects a failure. :rtype: SdkWorkflow """ inputs, outputs, nodes = _discover_workflow_components(metaclass) - metadata = _workflow_models.WorkflowMetadata(queuing_budget=queuing_budget if queuing_budget else None, on_failure=on_failure if on_failure else None) + metadata = _workflow_models.WorkflowMetadata(on_failure=on_failure if on_failure else None) return (cls or SdkWorkflow)( inputs=[i for i in sorted(inputs, key=lambda x: x.name)], outputs=[o for o in sorted(outputs, key=lambda x: x.name)], diff --git a/flytekit/models/core/workflow.py b/flytekit/models/core/workflow.py index 6e70016108..fdf12c694f 100644 --- a/flytekit/models/core/workflow.py +++ b/flytekit/models/core/workflow.py @@ -465,23 +465,14 @@ class OnFailurePolicy(object): FAIL_IMMEDIATELY = _core_workflow.WorkflowMetadata.FAIL_IMMEDIATELY FAIL_AFTER_EXECUTABLE_NODES_COMPLETE = _core_workflow.WorkflowMetadata.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE - def __init__(self, queuing_budget=None, on_failure=None): + def __init__(self, on_failure=None): """ Metadata for the workflow. - :param queuing_budget datetime.timedelta: [Optional] Budget that specifies the amount of time a workflow can be queued up for execution. :param on_failure flytekit.models.core.workflow.WorkflowMetadata.OnFailurePolicy: [Optional] The execution policy when the workflow detects a failure. """ - self._queuing_budget = queuing_budget self._on_failure = on_failure - @property - def queuing_budget(self): - """ - :rtype: datetime.timedelta - """ - return self._queuing_budget - @property def on_failure(self): """ @@ -494,8 +485,6 @@ def to_flyte_idl(self): :rtype: flyteidl.core.workflow_pb2.WorkflowMetadata """ workflow_metadata = _core_workflow.WorkflowMetadata() - if self._queuing_budget: - workflow_metadata.queuing_budget.FromTimedelta(self.queuing_budget) if self.on_failure: workflow_metadata.on_failure = self.on_failure return workflow_metadata @@ -507,10 +496,10 @@ def from_flyte_idl(cls, pb2_object): :rtype: WorkflowMetadata """ return cls( - queuing_budget=pb2_object.queuing_budget.ToTimedelta() if pb2_object.queuing_budget else None, on_failure=pb2_object.on_failure if pb2_object.on_failure else WorkflowMetadata.OnFailurePolicy.FAIL_IMMEDIATELY ) + class WorkflowMetadataDefaults(_common.FlyteIdlEntity): def __init__(self, interruptible=None): diff --git a/flytekit/sdk/workflow.py b/flytekit/sdk/workflow.py index 57c9c2d71b..a6abe2bb26 100644 --- a/flytekit/sdk/workflow.py +++ b/flytekit/sdk/workflow.py @@ -1,7 +1,9 @@ from __future__ import absolute_import + +import six as _six + from flytekit.common import workflow as _common_workflow, promise as _promise from flytekit.common.types import helpers as _type_helpers -import six as _six class Input(_promise.Input): @@ -42,7 +44,7 @@ def __init__(self, value, sdk_type=None, help=None): ) -def workflow_class(_workflow_metaclass=None, cls=None, queuing_budget=None, on_failure=None): +def workflow_class(_workflow_metaclass=None, cls=None, on_failure=None): """ This is a decorator for wrapping class definitions into workflows. @@ -62,13 +64,12 @@ class MyWorkflow(object): :param cls: This is the class that will be instantiated from the inputs, outputs, and nodes. This will be used by users extending the base Flyte programming model. If set, it must be a subclass of :py:class:`flytekit.common.workflow.SdkWorkflow`. - :param queuing_budget datetime.timedelta: [Optional] Budget that specifies the amount of time a workflow can be queued up for execution. :param on_failure flytekit.models.core.workflow.WorkflowMetadata.OnFailurePolicy: [Optional] The execution policy when the workflow detects a failure. :rtype: flytekit.common.workflow.SdkWorkflow """ def wrapper(metaclass): - wf = _common_workflow.build_sdk_workflow_from_metaclass(metaclass, cls=cls, queuing_budget=queuing_budget, on_failure=on_failure) + wf = _common_workflow.build_sdk_workflow_from_metaclass(metaclass, cls=cls, on_failure=on_failure) return wf if _workflow_metaclass is not None: @@ -76,7 +77,7 @@ def wrapper(metaclass): return wrapper -def workflow(nodes, inputs=None, outputs=None, cls=None, queuing_budget=None, on_failure=None): +def workflow(nodes, inputs=None, outputs=None, cls=None, on_failure=None): """ This function provides a user-friendly interface for authoring workflows. @@ -109,14 +110,12 @@ def workflow(nodes, inputs=None, outputs=None, cls=None, queuing_budget=None, on :param T cls: This is the class that will be instantiated from the inputs, outputs, and nodes. This will be used by users extending the base Flyte programming model. If set, it must be a subclass of :py:class:`flytekit.common.workflow.SdkWorkflow`. - :param queuing_budget datetime.timedelta: [Optional] Budget that specifies the amount of time a workflow can be queued up for execution. - :param on_failure flytekit.models.core.workflow.WorkflowMetadata.OnFailurePolicy: [Optional] The execution policy when the workflow detects a failure. + :param flytekit.models.core.workflow.WorkflowMetadata.OnFailurePolicy on_failure: [Optional] The execution policy when the workflow detects a failure. :rtype: flytekit.common.workflow.SdkWorkflow """ wf = (cls or _common_workflow.SdkWorkflow)( inputs=[v.rename_and_return_reference(k) for k, v in sorted(_six.iteritems(inputs or {}))], outputs=[v.rename_and_return_reference(k) for k, v in sorted(_six.iteritems(outputs or {}))], nodes=[v.assign_id_and_return(k) for k, v in sorted(_six.iteritems(nodes))], - metadata=_common_workflow._workflow_models.WorkflowMetadata(queuing_budget=queuing_budget) if queuing_budget else None - ) + metadata=_common_workflow._workflow_models.WorkflowMetadata(on_failure=on_failure)) return wf diff --git a/tests/flytekit/unit/models/core/test_workflow.py b/tests/flytekit/unit/models/core/test_workflow.py index c5a34ed094..8a3cb26cae 100644 --- a/tests/flytekit/unit/models/core/test_workflow.py +++ b/tests/flytekit/unit/models/core/test_workflow.py @@ -27,6 +27,7 @@ def test_alias(): assert obj2.alias == 'myalias' assert obj2.var == 'myvar' + def test_workflow_template(): task = _workflow.TaskNode(reference_id=_generic_id) nm = _get_sample_node_metadata() @@ -50,7 +51,7 @@ def test_workflow_template(): ) obj = _workflow.WorkflowTemplate( id=_generic_id, - metadata=wf_metadata, + metadata=wf_metadata, metadata_defaults=wf_metadata_defaults, interface=typed_interface, nodes=[wf_node], @@ -58,54 +59,22 @@ def test_workflow_template(): obj2 = _workflow.WorkflowTemplate.from_flyte_idl(obj.to_flyte_idl()) assert obj2 == obj -def test_workflow_template_with_queuing_budget(): - task = _workflow.TaskNode(reference_id=_generic_id) - nm = _get_sample_node_metadata() - int_type = _types.LiteralType(_types.SimpleType.INTEGER) - wf_metadata = _workflow.WorkflowMetadata(queuing_budget=timedelta(seconds=10)) - wf_metadata_defaults = _workflow.WorkflowMetadataDefaults() - typed_interface = _interface.TypedInterface( - {'a': _interface.Variable(int_type, "description1")}, - { - 'b': _interface.Variable(int_type, "description2"), - 'c': _interface.Variable(int_type, "description3") - } - ) - wf_node = _workflow.Node( - id='some:node:id', - metadata=nm, - inputs=[], - upstream_node_ids=[], - output_aliases=[], - task_node=task - ) - obj = _workflow.WorkflowTemplate( - id=_generic_id, - metadata=wf_metadata, - metadata_defaults=wf_metadata_defaults, - interface=typed_interface, - nodes=[wf_node], - outputs=[]) - obj2 = _workflow.WorkflowTemplate.from_flyte_idl(obj.to_flyte_idl()) - assert obj2 == obj - -def test_workflow_metadata_queuing_budget(): - obj = _workflow.WorkflowMetadata(queuing_budget=timedelta(seconds=10)) - obj2 = _workflow.WorkflowMetadata.from_flyte_idl(obj.to_flyte_idl()) - assert obj == obj2 def test_workflow_metadata_failure_policy(): - obj = _workflow.WorkflowMetadata(on_failure=_workflow.WorkflowMetadata.OnFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE) + obj = _workflow.WorkflowMetadata( + on_failure=_workflow.WorkflowMetadata.OnFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE) obj2 = _workflow.WorkflowMetadata.from_flyte_idl(obj.to_flyte_idl()) assert obj == obj2 assert obj.on_failure == _workflow.WorkflowMetadata.OnFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE assert obj2.on_failure == _workflow.WorkflowMetadata.OnFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE + def test_workflow_metadata(): obj = _workflow.WorkflowMetadata() obj2 = _workflow.WorkflowMetadata.from_flyte_idl(obj.to_flyte_idl()) assert obj == obj2 + def test_task_node(): obj = _workflow.TaskNode(reference_id=_generic_id) assert obj.reference_id == _generic_id diff --git a/tests/flytekit/unit/sdk/test_workflow.py b/tests/flytekit/unit/sdk/test_workflow.py index 75daae0303..eee4efda91 100644 --- a/tests/flytekit/unit/sdk/test_workflow.py +++ b/tests/flytekit/unit/sdk/test_workflow.py @@ -156,16 +156,3 @@ class sup(object): assert _get_node_by_id(sup, 'b').inputs[0].binding.promise.node_id == constants.GLOBAL_INPUT_NODE_ID assert _get_node_by_id(sup, 'b').inputs[0].binding.promise.var == 'input_2' assert _get_node_by_id(sup, 'c').inputs[0].binding.scalar.primitive.integer == 100 - -def test_workflow_queuing_budget(): - @inputs(a=Types.Integer) - @outputs(b=Types.Integer) - @python_task - def my_task(wf_params, a, b): - b.set(a + 1) - - @workflow_class(queuing_budget=datetime.timedelta(seconds=10)) - class my_workflow(object): - b = my_task(a=100) - - assert my_workflow.metadata.queuing_budget == datetime.timedelta(seconds=10)