Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bug; Queuing budget no longer exists. Quality of service is not yet integrated #135

Merged
merged 1 commit into from
Jul 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion flytekit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

import flytekit.plugins

__version__ = '0.10.1'
__version__ = '0.10.2'
5 changes: 2 additions & 3 deletions flytekit/common/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -455,17 +455,16 @@ def _discover_workflow_components(workflow_class):
return inputs, outputs, nodes


def build_sdk_workflow_from_metaclass(metaclass, queuing_budget=None, on_failure=None, cls=None):
def build_sdk_workflow_from_metaclass(metaclass, on_failure=None, cls=None):
"""
:param T metaclass:
:param cls: This is the class that will be instantiated from the inputs, outputs, and nodes. This will be used
by users extending the base Flyte programming model. If set, it must be a subclass of SdkWorkflow.
:param queuing_budget datetime.timedelta: [Optional] Budget that specifies the amount of time a workflow can be queued up for execution.
:param on_failure flytekit.models.core.workflow.WorkflowMetadata.OnFailurePolicy: [Optional] The execution policy when the workflow detects a failure.
:rtype: SdkWorkflow
"""
inputs, outputs, nodes = _discover_workflow_components(metaclass)
metadata = _workflow_models.WorkflowMetadata(queuing_budget=queuing_budget if queuing_budget else None, on_failure=on_failure if on_failure else None)
metadata = _workflow_models.WorkflowMetadata(on_failure=on_failure if on_failure else None)
return (cls or SdkWorkflow)(
inputs=[i for i in sorted(inputs, key=lambda x: x.name)],
outputs=[o for o in sorted(outputs, key=lambda x: x.name)],
Expand Down
15 changes: 2 additions & 13 deletions flytekit/models/core/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -465,23 +465,14 @@ class OnFailurePolicy(object):
FAIL_IMMEDIATELY = _core_workflow.WorkflowMetadata.FAIL_IMMEDIATELY
FAIL_AFTER_EXECUTABLE_NODES_COMPLETE = _core_workflow.WorkflowMetadata.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE

def __init__(self, queuing_budget=None, on_failure=None):
def __init__(self, on_failure=None):
"""
Metadata for the workflow.

:param queuing_budget datetime.timedelta: [Optional] Budget that specifies the amount of time a workflow can be queued up for execution.
:param on_failure flytekit.models.core.workflow.WorkflowMetadata.OnFailurePolicy: [Optional] The execution policy when the workflow detects a failure.
"""
self._queuing_budget = queuing_budget
self._on_failure = on_failure

@property
def queuing_budget(self):
"""
:rtype: datetime.timedelta
"""
return self._queuing_budget

@property
def on_failure(self):
"""
Expand All @@ -494,8 +485,6 @@ def to_flyte_idl(self):
:rtype: flyteidl.core.workflow_pb2.WorkflowMetadata
"""
workflow_metadata = _core_workflow.WorkflowMetadata()
if self._queuing_budget:
workflow_metadata.queuing_budget.FromTimedelta(self.queuing_budget)
if self.on_failure:
workflow_metadata.on_failure = self.on_failure
return workflow_metadata
Expand All @@ -507,10 +496,10 @@ def from_flyte_idl(cls, pb2_object):
:rtype: WorkflowMetadata
"""
return cls(
queuing_budget=pb2_object.queuing_budget.ToTimedelta() if pb2_object.queuing_budget else None,
on_failure=pb2_object.on_failure if pb2_object.on_failure else WorkflowMetadata.OnFailurePolicy.FAIL_IMMEDIATELY
)


class WorkflowMetadataDefaults(_common.FlyteIdlEntity):

def __init__(self, interruptible=None):
Expand Down
17 changes: 8 additions & 9 deletions flytekit/sdk/workflow.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
from __future__ import absolute_import

import six as _six

from flytekit.common import workflow as _common_workflow, promise as _promise
from flytekit.common.types import helpers as _type_helpers
import six as _six


class Input(_promise.Input):
Expand Down Expand Up @@ -42,7 +44,7 @@ def __init__(self, value, sdk_type=None, help=None):
)


def workflow_class(_workflow_metaclass=None, cls=None, queuing_budget=None, on_failure=None):
def workflow_class(_workflow_metaclass=None, cls=None, on_failure=None):
"""
This is a decorator for wrapping class definitions into workflows.

Expand All @@ -62,21 +64,20 @@ class MyWorkflow(object):
:param cls: This is the class that will be instantiated from the inputs, outputs, and nodes. This will be used
by users extending the base Flyte programming model. If set, it must be a subclass of
:py:class:`flytekit.common.workflow.SdkWorkflow`.
:param queuing_budget datetime.timedelta: [Optional] Budget that specifies the amount of time a workflow can be queued up for execution.
:param on_failure flytekit.models.core.workflow.WorkflowMetadata.OnFailurePolicy: [Optional] The execution policy when the workflow detects a failure.
:rtype: flytekit.common.workflow.SdkWorkflow
"""

def wrapper(metaclass):
wf = _common_workflow.build_sdk_workflow_from_metaclass(metaclass, cls=cls, queuing_budget=queuing_budget, on_failure=on_failure)
wf = _common_workflow.build_sdk_workflow_from_metaclass(metaclass, cls=cls, on_failure=on_failure)
return wf

if _workflow_metaclass is not None:
return wrapper(_workflow_metaclass)
return wrapper


def workflow(nodes, inputs=None, outputs=None, cls=None, queuing_budget=None, on_failure=None):
def workflow(nodes, inputs=None, outputs=None, cls=None, on_failure=None):
"""
This function provides a user-friendly interface for authoring workflows.

Expand Down Expand Up @@ -109,14 +110,12 @@ def workflow(nodes, inputs=None, outputs=None, cls=None, queuing_budget=None, on
:param T cls: This is the class that will be instantiated from the inputs, outputs, and nodes. This will be used
by users extending the base Flyte programming model. If set, it must be a subclass of
:py:class:`flytekit.common.workflow.SdkWorkflow`.
:param queuing_budget datetime.timedelta: [Optional] Budget that specifies the amount of time a workflow can be queued up for execution.
:param on_failure flytekit.models.core.workflow.WorkflowMetadata.OnFailurePolicy: [Optional] The execution policy when the workflow detects a failure.
:param flytekit.models.core.workflow.WorkflowMetadata.OnFailurePolicy on_failure: [Optional] The execution policy when the workflow detects a failure.
:rtype: flytekit.common.workflow.SdkWorkflow
"""
wf = (cls or _common_workflow.SdkWorkflow)(
inputs=[v.rename_and_return_reference(k) for k, v in sorted(_six.iteritems(inputs or {}))],
outputs=[v.rename_and_return_reference(k) for k, v in sorted(_six.iteritems(outputs or {}))],
nodes=[v.assign_id_and_return(k) for k, v in sorted(_six.iteritems(nodes))],
metadata=_common_workflow._workflow_models.WorkflowMetadata(queuing_budget=queuing_budget) if queuing_budget else None
)
metadata=_common_workflow._workflow_models.WorkflowMetadata(on_failure=on_failure))
return wf
43 changes: 6 additions & 37 deletions tests/flytekit/unit/models/core/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def test_alias():
assert obj2.alias == 'myalias'
assert obj2.var == 'myvar'


def test_workflow_template():
task = _workflow.TaskNode(reference_id=_generic_id)
nm = _get_sample_node_metadata()
Expand All @@ -50,62 +51,30 @@ def test_workflow_template():
)
obj = _workflow.WorkflowTemplate(
id=_generic_id,
metadata=wf_metadata,
metadata=wf_metadata,
metadata_defaults=wf_metadata_defaults,
interface=typed_interface,
nodes=[wf_node],
outputs=[])
obj2 = _workflow.WorkflowTemplate.from_flyte_idl(obj.to_flyte_idl())
assert obj2 == obj

def test_workflow_template_with_queuing_budget():
task = _workflow.TaskNode(reference_id=_generic_id)
nm = _get_sample_node_metadata()
int_type = _types.LiteralType(_types.SimpleType.INTEGER)
wf_metadata = _workflow.WorkflowMetadata(queuing_budget=timedelta(seconds=10))
wf_metadata_defaults = _workflow.WorkflowMetadataDefaults()
typed_interface = _interface.TypedInterface(
{'a': _interface.Variable(int_type, "description1")},
{
'b': _interface.Variable(int_type, "description2"),
'c': _interface.Variable(int_type, "description3")
}
)
wf_node = _workflow.Node(
id='some:node:id',
metadata=nm,
inputs=[],
upstream_node_ids=[],
output_aliases=[],
task_node=task
)
obj = _workflow.WorkflowTemplate(
id=_generic_id,
metadata=wf_metadata,
metadata_defaults=wf_metadata_defaults,
interface=typed_interface,
nodes=[wf_node],
outputs=[])
obj2 = _workflow.WorkflowTemplate.from_flyte_idl(obj.to_flyte_idl())
assert obj2 == obj

def test_workflow_metadata_queuing_budget():
obj = _workflow.WorkflowMetadata(queuing_budget=timedelta(seconds=10))
obj2 = _workflow.WorkflowMetadata.from_flyte_idl(obj.to_flyte_idl())
assert obj == obj2

def test_workflow_metadata_failure_policy():
obj = _workflow.WorkflowMetadata(on_failure=_workflow.WorkflowMetadata.OnFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)
obj = _workflow.WorkflowMetadata(
on_failure=_workflow.WorkflowMetadata.OnFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)
obj2 = _workflow.WorkflowMetadata.from_flyte_idl(obj.to_flyte_idl())
assert obj == obj2
assert obj.on_failure == _workflow.WorkflowMetadata.OnFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE
assert obj2.on_failure == _workflow.WorkflowMetadata.OnFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE


def test_workflow_metadata():
obj = _workflow.WorkflowMetadata()
obj2 = _workflow.WorkflowMetadata.from_flyte_idl(obj.to_flyte_idl())
assert obj == obj2


def test_task_node():
obj = _workflow.TaskNode(reference_id=_generic_id)
assert obj.reference_id == _generic_id
Expand Down
13 changes: 0 additions & 13 deletions tests/flytekit/unit/sdk/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,16 +156,3 @@ class sup(object):
assert _get_node_by_id(sup, 'b').inputs[0].binding.promise.node_id == constants.GLOBAL_INPUT_NODE_ID
assert _get_node_by_id(sup, 'b').inputs[0].binding.promise.var == 'input_2'
assert _get_node_by_id(sup, 'c').inputs[0].binding.scalar.primitive.integer == 100

def test_workflow_queuing_budget():
@inputs(a=Types.Integer)
@outputs(b=Types.Integer)
@python_task
def my_task(wf_params, a, b):
b.set(a + 1)

@workflow_class(queuing_budget=datetime.timedelta(seconds=10))
class my_workflow(object):
b = my_task(a=100)

assert my_workflow.metadata.queuing_budget == datetime.timedelta(seconds=10)