Skip to content

Commit

Permalink
Add dynamic sidecar tasks (#152)
Browse files Browse the repository at this point in the history
  • Loading branch information
jeevb authored Aug 12, 2020
1 parent 74a8163 commit f451cc5
Show file tree
Hide file tree
Showing 6 changed files with 413 additions and 53 deletions.
2 changes: 1 addition & 1 deletion flytekit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@

import flytekit.plugins

__version__ = '0.11.5'
__version__ = '0.11.6'
118 changes: 67 additions & 51 deletions flytekit/common/tasks/sdk_dynamic.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,64 +51,19 @@ def _append_node(generated_files, node, nodes, sub_task_node):
sub_task_node.inputs})


class SdkDynamicTask(_six.with_metaclass(_sdk_bases.ExtendedSdkType, _sdk_runnable.SdkRunnableTask)):
class SdkDynamicTaskMixin(object):

"""
This class includes the additional logic for building a task that executes parent-child tasks in Python code. It
has even more validation checks to ensure proper behavior than it's superclasses.
This mixin implements logic for building a task that executes
parent-child tasks in Python code.
Since an SdkDynamicTask is assumed to run by hooking into Python code, we will provide additional shortcuts and
methods on this object.
"""

def __init__(
self,
task_function,
task_type,
discovery_version,
retries,
interruptible,
deprecated,
storage_request,
cpu_request,
gpu_request,
memory_request,
storage_limit,
cpu_limit,
gpu_limit,
memory_limit,
discoverable,
timeout,
allowed_failure_ratio,
max_concurrency,
environment,
custom
):
def __init__(self, allowed_failure_ratio, max_concurrency):
"""
:param task_function: Function container user code. This will be executed via the SDK's engine.
:param Text task_type: string describing the task type
:param Text discovery_version: string describing the version for task discovery purposes
:param int retries: Number of retries to attempt
:param bool interruptible: Whether or not task is interruptible
:param Text deprecated:
:param Text storage_request:
:param Text cpu_request:
:param Text gpu_request:
:param Text memory_request:
:param Text storage_limit:
:param Text cpu_limit:
:param Text gpu_limit:
:param Text memory_limit:
:param bool discoverable:
:param datetime.timedelta timeout:
:param float allowed_failure_ratio:
:param int max_concurrency:
:param dict[Text, Text] environment:
:param dict[Text, T] custom:
"""
super(SdkDynamicTask, self).__init__(
task_function, task_type, discovery_version, retries, interruptible, deprecated,
storage_request, cpu_request, gpu_request, memory_request, storage_limit,
cpu_limit, gpu_limit, memory_limit, discoverable, timeout, environment, custom)

# These will only appear in the generated futures
self._allowed_failure_ratio = allowed_failure_ratio
Expand Down Expand Up @@ -166,7 +121,7 @@ def _produce_dynamic_job_spec(self, context, inputs):
# before calling user code
inputs_dict.update(outputs_dict)
yielded_sub_tasks = [sub_task for sub_task in
super(SdkDynamicTask, self)._execute_user_code(context, inputs_dict) or []]
self._execute_user_code(context, inputs_dict) or []]

upstream_nodes = list()
output_bindings = [_literal_models.Binding(var=name, binding=_interface.BindingData.from_python_std(
Expand Down Expand Up @@ -305,3 +260,64 @@ def execute(self, context, inputs):
})

return generated_files


class SdkDynamicTask(SdkDynamicTaskMixin, _sdk_runnable.SdkRunnableTask, metaclass=_sdk_bases.ExtendedSdkType):

"""
This class includes the additional logic for building a task that executes
parent-child tasks in Python code.
"""

def __init__(
self,
task_function,
task_type,
discovery_version,
retries,
interruptible,
deprecated,
storage_request,
cpu_request,
gpu_request,
memory_request,
storage_limit,
cpu_limit,
gpu_limit,
memory_limit,
discoverable,
timeout,
allowed_failure_ratio,
max_concurrency,
environment,
custom
):
"""
:param task_function: Function container user code. This will be executed via the SDK's engine.
:param Text task_type: string describing the task type
:param Text discovery_version: string describing the version for task discovery purposes
:param int retries: Number of retries to attempt
:param bool interruptible: Whether or not task is interruptible
:param Text deprecated:
:param Text storage_request:
:param Text cpu_request:
:param Text gpu_request:
:param Text memory_request:
:param Text storage_limit:
:param Text cpu_limit:
:param Text gpu_limit:
:param Text memory_limit:
:param bool discoverable:
:param datetime.timedelta timeout:
:param float allowed_failure_ratio:
:param int max_concurrency:
:param dict[Text, Text] environment:
:param dict[Text, T] custom:
"""
_sdk_runnable.SdkRunnableTask.__init__(
self, task_function, task_type, discovery_version, retries, interruptible, deprecated,
storage_request, cpu_request, gpu_request, memory_request, storage_limit,
cpu_limit, gpu_limit, memory_limit, discoverable, timeout, environment, custom)

SdkDynamicTaskMixin.__init__(self, allowed_failure_ratio, max_concurrency)
84 changes: 83 additions & 1 deletion flytekit/common/tasks/sidecar_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from flyteidl.core import tasks_pb2 as _core_task

from flytekit.common.exceptions import user as _user_exceptions
from flytekit.common.tasks import sdk_dynamic as _sdk_dynamic
from flytekit.common.tasks import sdk_runnable as _sdk_runnable
from flytekit.common import sdk_bases as _sdk_bases

Expand All @@ -14,7 +15,7 @@
from flytekit.plugins import k8s as _lazy_k8s


class SdkSidecarTask(_six.with_metaclass(_sdk_bases.ExtendedSdkType, _sdk_runnable.SdkRunnableTask)):
class SdkSidecarTask(_sdk_runnable.SdkRunnableTask, metaclass=_sdk_bases.ExtendedSdkType):

"""
This class includes the additional logic for building a task that executes as a Sidecar Job.
Expand Down Expand Up @@ -138,3 +139,84 @@ def reconcile_partial_pod_spec_and_task(self,
).to_flyte_idl()

self.assign_custom_and_return(_MessageToDict(sidecar_job_plugin))


class SdkDynamicSidecarTask(_sdk_dynamic.SdkDynamicTaskMixin, SdkSidecarTask, metaclass=_sdk_bases.ExtendedSdkType):

"""
This class includes the additional logic for building a task that runs as
a Sidecar Job and executes parent-child tasks.
"""

def __init__(self,
task_function,
task_type,
discovery_version,
retries,
interruptible,
deprecated,
storage_request,
cpu_request,
gpu_request,
memory_request,
storage_limit,
cpu_limit,
gpu_limit,
memory_limit,
discoverable,
timeout,
allowed_failure_ratio,
max_concurrency,
environment,
pod_spec=None,
primary_container_name=None):
"""
:param task_function: Function container user code. This will be executed via the SDK's engine.
:param Text task_type: string describing the task type
:param Text discovery_version: string describing the version for task discovery purposes
:param int retries: Number of retries to attempt
:param bool interruptible: Whether or not task is interruptible
:param Text deprecated:
:param Text storage_request:
:param Text cpu_request:
:param Text gpu_request:
:param Text memory_request:
:param Text storage_limit:
:param Text cpu_limit:
:param Text gpu_limit:
:param Text memory_limit:
:param bool discoverable:
:param datetime.timedelta timeout:
:param float allowed_failure_ratio:
:param int max_concurrency:
:param dict[Text, Text] environment:
:param generated_pb2.PodSpec pod_spec:
:param Text primary_container_name:
:raises: flytekit.common.exceptions.user.FlyteValidationException
"""

SdkSidecarTask.__init__(
self,
task_function,
task_type,
discovery_version,
retries,
interruptible,
deprecated,
storage_request,
cpu_request,
gpu_request,
memory_request,
storage_limit,
cpu_limit,
gpu_limit,
memory_limit,
discoverable,
timeout,
environment,
pod_spec=pod_spec,
primary_container_name=primary_container_name
)

_sdk_dynamic.SdkDynamicTaskMixin.__init__(self, allowed_failure_ratio, max_concurrency)
Loading

0 comments on commit f451cc5

Please sign in to comment.