From 5caad7fb1b66337ecc42e5a5f07fd95fa3cb34ed Mon Sep 17 00:00:00 2001 From: Haytham AbuelFutuh Date: Fri, 28 Aug 2020 15:53:35 -0700 Subject: [PATCH 1/5] Add a flag to disable default launch plan generation --- flytekit/common/workflow.py | 36 +++++++++++++------ flytekit/sdk/workflow.py | 10 ++++-- flytekit/tools/module_loader.py | 2 +- .../unit/common_tests/test_workflow.py | 14 ++++++++ 4 files changed, 47 insertions(+), 15 deletions(-) diff --git a/flytekit/common/workflow.py b/flytekit/common/workflow.py index 522baeb387..9a83cab1ab 100644 --- a/flytekit/common/workflow.py +++ b/flytekit/common/workflow.py @@ -93,15 +93,16 @@ class SdkWorkflow( ) ): def __init__( - self, - inputs, - outputs, - nodes, - id=None, - metadata=None, - metadata_defaults=None, - interface=None, - output_bindings=None, + self, + inputs, + outputs, + nodes, + id=None, + metadata=None, + metadata_defaults=None, + interface=None, + output_bindings=None, + disable_default_launch_plan=False, ): """ :param list[flytekit.common.promise.Input] inputs: @@ -117,6 +118,7 @@ def __init__( the interface field must be bound in order for the workflow to be validated. A workflow has an implicit dependency on all of its nodes to execute successfully in order to bind final outputs. + :param bool disable_default_launch_plan: Determines whether to create a default launch plan for the workflow. """ for n in nodes: @@ -164,6 +166,15 @@ def __init__( ) self._user_inputs = inputs self._upstream_entities = set(n.executable_sdk_object for n in nodes) + self._should_create_default_launch_plan = not disable_default_launch_plan + + @property + def should_create_default_launch_plan(self): + """ + Determines whether registration flow should create a default launch plan for this workflow or not. + :rtype: bool + """ + return self._should_create_default_launch_plan @property def upstream_entities(self): @@ -475,12 +486,14 @@ def _discover_workflow_components(workflow_class): return inputs, outputs, nodes -def build_sdk_workflow_from_metaclass(metaclass, on_failure=None, cls=None): +def build_sdk_workflow_from_metaclass(metaclass, on_failure=None, disable_default_launch_plan=False, cls=None): """ :param T metaclass: :param cls: This is the class that will be instantiated from the inputs, outputs, and nodes. This will be used by users extending the base Flyte programming model. If set, it must be a subclass of SdkWorkflow. - :param on_failure flytekit.models.core.workflow.WorkflowMetadata.OnFailurePolicy: [Optional] The execution policy when the workflow detects a failure. + :param flytekit.models.core.workflow.WorkflowMetadata.OnFailurePolicy on_failure: [Optional] The execution policy + when the workflow detects a failure. + :param bool disable_default_launch_plan: Determines whether to create a default launch plan for the workflow or not. :rtype: SdkWorkflow """ inputs, outputs, nodes = _discover_workflow_components(metaclass) @@ -490,4 +503,5 @@ def build_sdk_workflow_from_metaclass(metaclass, on_failure=None, cls=None): outputs=[o for o in sorted(outputs, key=lambda x: x.name)], nodes=[n for n in sorted(nodes, key=lambda x: x.id)], metadata=metadata, + disable_default_launch_plan=disable_default_launch_plan, ) diff --git a/flytekit/sdk/workflow.py b/flytekit/sdk/workflow.py index 5fc1539ee7..223b63081b 100644 --- a/flytekit/sdk/workflow.py +++ b/flytekit/sdk/workflow.py @@ -42,7 +42,7 @@ def __init__(self, value, sdk_type=None, help=None): ) -def workflow_class(_workflow_metaclass=None, cls=None, on_failure=None): +def workflow_class(_workflow_metaclass=None, on_failure=None, disable_default_launch_plan=False, cls=None): """ This is a decorator for wrapping class definitions into workflows. @@ -62,12 +62,16 @@ class MyWorkflow(object): :param cls: This is the class that will be instantiated from the inputs, outputs, and nodes. This will be used by users extending the base Flyte programming model. If set, it must be a subclass of :py:class:`flytekit.common.workflow.SdkWorkflow`. - :param on_failure flytekit.models.core.workflow.WorkflowMetadata.OnFailurePolicy: [Optional] The execution policy when the workflow detects a failure. + :param flytekit.models.core.workflow.WorkflowMetadata.OnFailurePolicy on_failure: [Optional] The execution policy + when the workflow detects a failure. + :param bool disable_default_launch_plan: Determines whether to create a default launch plan for the workflow or not. + :rtype: flytekit.common.workflow.SdkWorkflow """ def wrapper(metaclass): - wf = _common_workflow.build_sdk_workflow_from_metaclass(metaclass, cls=cls, on_failure=on_failure) + wf = _common_workflow.build_sdk_workflow_from_metaclass( + metaclass, on_failure=on_failure, disable_default_launch_plan=disable_default_launch_plan, cls=cls) return wf if _workflow_metaclass is not None: diff --git a/flytekit/tools/module_loader.py b/flytekit/tools/module_loader.py index 59e8d0ce1a..a0e7a4349b 100644 --- a/flytekit/tools/module_loader.py +++ b/flytekit/tools/module_loader.py @@ -110,7 +110,7 @@ def iterate_registerable_entities_in_order( if isinstance(o, _registerable.RegisterableEntity): if o.instantiated_in == m.__name__: entity_to_module_key[o] = (m, k) - if isinstance(o, _SdkWorkflow): + if isinstance(o, _SdkWorkflow) and o.should_create_launch_plan: # SDK should create a default launch plan for a workflow. This is a special-case to simplify # authoring of workflows. entity_to_module_key[o.create_launch_plan()] = (m, k) diff --git a/tests/flytekit/unit/common_tests/test_workflow.py b/tests/flytekit/unit/common_tests/test_workflow.py index 53e1b14ade..e9ce2a1c40 100644 --- a/tests/flytekit/unit/common_tests/test_workflow.py +++ b/tests/flytekit/unit/common_tests/test_workflow.py @@ -137,6 +137,8 @@ class my_workflow(object): my_workflow, on_failure=_workflow_models.WorkflowMetadata.OnFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE, ) + assert w.should_create_default_launch_plan == True + assert w.interface.inputs["input_1"].type == primitives.Integer.to_flyte_literal_type() assert w.interface.inputs["input_2"].type == primitives.Integer.to_flyte_literal_type() assert w.nodes[0].inputs[0].var == "a" @@ -353,3 +355,15 @@ def my_list_task(wf_params, a, b): assert len(serialized.template.nodes) == 6 assert len(serialized.template.interface.inputs.variables.keys()) == 2 assert len(serialized.template.interface.outputs.variables.keys()) == 2 + + +def test_workflow_disable_default_launch_plan(): + class MyWorkflow(object): + input_1 = promise.Input("input_1", primitives.Integer) + input_2 = promise.Input("input_2", primitives.Integer, default=5, help="Not required.") + + w = workflow.build_sdk_workflow_from_metaclass( + MyWorkflow, disable_default_launch_plan=True, + ) + + assert w.should_create_default_launch_plan == False From 5c75f307c95d995c6443df7f4f150ede38225550 Mon Sep 17 00:00:00 2001 From: Haytham AbuelFutuh Date: Tue, 1 Sep 2020 14:41:20 -0700 Subject: [PATCH 2/5] Bump version --- flytekit/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flytekit/__init__.py b/flytekit/__init__.py index 2991e1334a..afd8c1729b 100644 --- a/flytekit/__init__.py +++ b/flytekit/__init__.py @@ -2,4 +2,4 @@ import flytekit.plugins # noqa: F401 -__version__ = "0.12.4" +__version__ = "0.12.5" From 27f1a91916fe75cc001d2517ee3b5b7863092799 Mon Sep 17 00:00:00 2001 From: Haytham AbuelFutuh Date: Tue, 1 Sep 2020 16:33:50 -0700 Subject: [PATCH 3/5] lint --- tests/flytekit/unit/common_tests/test_workflow.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/tests/flytekit/unit/common_tests/test_workflow.py b/tests/flytekit/unit/common_tests/test_workflow.py index e9ce2a1c40..2c2e55a9b5 100644 --- a/tests/flytekit/unit/common_tests/test_workflow.py +++ b/tests/flytekit/unit/common_tests/test_workflow.py @@ -137,7 +137,7 @@ class my_workflow(object): my_workflow, on_failure=_workflow_models.WorkflowMetadata.OnFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE, ) - assert w.should_create_default_launch_plan == True + assert w.should_create_default_launch_plan is True assert w.interface.inputs["input_1"].type == primitives.Integer.to_flyte_literal_type() assert w.interface.inputs["input_2"].type == primitives.Integer.to_flyte_literal_type() @@ -362,8 +362,6 @@ class MyWorkflow(object): input_1 = promise.Input("input_1", primitives.Integer) input_2 = promise.Input("input_2", primitives.Integer, default=5, help="Not required.") - w = workflow.build_sdk_workflow_from_metaclass( - MyWorkflow, disable_default_launch_plan=True, - ) + w = workflow.build_sdk_workflow_from_metaclass(MyWorkflow, disable_default_launch_plan=True,) - assert w.should_create_default_launch_plan == False + assert w.should_create_default_launch_plan is False From 85386920d61234e194b6fca516212cfc0b57dfc9 Mon Sep 17 00:00:00 2001 From: Haytham AbuelFutuh Date: Wed, 2 Sep 2020 11:11:18 -0700 Subject: [PATCH 4/5] typo --- flytekit/tools/module_loader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flytekit/tools/module_loader.py b/flytekit/tools/module_loader.py index a0e7a4349b..d852bf57b7 100644 --- a/flytekit/tools/module_loader.py +++ b/flytekit/tools/module_loader.py @@ -110,7 +110,7 @@ def iterate_registerable_entities_in_order( if isinstance(o, _registerable.RegisterableEntity): if o.instantiated_in == m.__name__: entity_to_module_key[o] = (m, k) - if isinstance(o, _SdkWorkflow) and o.should_create_launch_plan: + if isinstance(o, _SdkWorkflow) and o.should_create_default_launch_plan: # SDK should create a default launch plan for a workflow. This is a special-case to simplify # authoring of workflows. entity_to_module_key[o.create_launch_plan()] = (m, k) From 9fb37a86ce70fdf3733e04919b8b2f978bb75723 Mon Sep 17 00:00:00 2001 From: Haytham AbuelFutuh Date: Wed, 2 Sep 2020 11:23:14 -0700 Subject: [PATCH 5/5] lint --- flytekit/common/workflow.py | 20 ++++++++++---------- flytekit/sdk/workflow.py | 3 ++- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/flytekit/common/workflow.py b/flytekit/common/workflow.py index 9a83cab1ab..4035c97060 100644 --- a/flytekit/common/workflow.py +++ b/flytekit/common/workflow.py @@ -93,16 +93,16 @@ class SdkWorkflow( ) ): def __init__( - self, - inputs, - outputs, - nodes, - id=None, - metadata=None, - metadata_defaults=None, - interface=None, - output_bindings=None, - disable_default_launch_plan=False, + self, + inputs, + outputs, + nodes, + id=None, + metadata=None, + metadata_defaults=None, + interface=None, + output_bindings=None, + disable_default_launch_plan=False, ): """ :param list[flytekit.common.promise.Input] inputs: diff --git a/flytekit/sdk/workflow.py b/flytekit/sdk/workflow.py index 223b63081b..48287d3f2a 100644 --- a/flytekit/sdk/workflow.py +++ b/flytekit/sdk/workflow.py @@ -71,7 +71,8 @@ class MyWorkflow(object): def wrapper(metaclass): wf = _common_workflow.build_sdk_workflow_from_metaclass( - metaclass, on_failure=on_failure, disable_default_launch_plan=disable_default_launch_plan, cls=cls) + metaclass, on_failure=on_failure, disable_default_launch_plan=disable_default_launch_plan, cls=cls + ) return wf if _workflow_metaclass is not None: