Skip to content

Commit

Permalink
middle
Browse files Browse the repository at this point in the history
Signed-off-by: wild-endeavor <[email protected]>

rename sqlalchemy

entrypoint flyte task

Signed-off-by: wild-endeavor <[email protected]>

init

Signed-off-by: wild-endeavor <[email protected]>

bump

Signed-off-by: wild-endeavor <[email protected]>
  • Loading branch information
wild-endeavor committed Apr 29, 2021
1 parent 371f386 commit 760da2b
Show file tree
Hide file tree
Showing 14 changed files with 342 additions and 217 deletions.
5 changes: 2 additions & 3 deletions flytekit/bin/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from flytekit.configuration import internal as _internal_config
from flytekit.configuration import platform as _platform_config
from flytekit.configuration import sdk as _sdk_config
from flytekit.control_plane.tasks.task import FlyteTask
from flytekit.core.base_task import IgnoreOutputs, PythonTask, TaskResolverMixin
from flytekit.core.context_manager import ExecutionState, FlyteContext, SerializationSettings, get_image_config
from flytekit.core.map_task import MapPythonTask
Expand All @@ -34,7 +35,6 @@
from flytekit.interfaces.stats.taggable import get_stats as _get_stats
from flytekit.models import dynamic_job as _dynamic_job
from flytekit.models import literals as _literal_models
from flytekit.models import task as task_models
from flytekit.models.core import errors as _error_models
from flytekit.models.core import identifier as _identifier
from flytekit.tools.fast_registration import download_distribution as _download_distribution
Expand Down Expand Up @@ -96,12 +96,11 @@ def _dispatch_execute(
ctx.file_access.get_data(inputs_path, local_inputs_file)
input_proto = _utils.load_proto_from_file(_literals_pb2.LiteralMap, local_inputs_file)
idl_input_literals = _literal_models.LiteralMap.from_flyte_idl(input_proto)
from flytekit.core.python_third_party_task import ExecutorTask

# Step2
if isinstance(task_def, PythonTask):
outputs = task_def.dispatch_execute(ctx, idl_input_literals)
elif isinstance(task_def, ExecutorTask):
elif isinstance(task_def, FlyteTask):
outputs = task_def.dispatch_execute(ctx, idl_input_literals)
else:
raise Exception("Task def was neither PythonTask nor TaskTemplate")
Expand Down
Empty file.
109 changes: 109 additions & 0 deletions flytekit/control_plane/tasks/executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
from __future__ import annotations

from typing import Any, Generic, TypeVar, Union

from flytekit.common.tasks.sdk_runnable import ExecutionParameters
from flytekit.core.context_manager import FlyteContext
from flytekit.core.tracker import TrackedInstance
from flytekit.core.type_engine import TypeEngine
from flytekit.loggers import logger
from flytekit.models import dynamic_job as _dynamic_job
from flytekit.models import literals as _literal_models
from flytekit.models import task as _task_model

T = TypeVar("T")


class FlyteTaskExecutor(TrackedInstance, Generic[T]):
@classmethod
def execute_from_model(cls, tt: _task_model.TaskTemplate, **kwargs) -> Any:
raise NotImplementedError

@classmethod
def pre_execute(cls, user_params: ExecutionParameters) -> ExecutionParameters:
"""
This function is a stub, just here to keep dispatch_execute compatibility between this class and PythonTask.
"""
return user_params

@classmethod
def post_execute(cls, user_params: ExecutionParameters, rval: Any) -> Any:
"""
This function is a stub, just here to keep dispatch_execute compatibility between this class and PythonTask.
"""
return rval

@classmethod
def dispatch_execute(
cls, ctx: FlyteContext, tt: _task_model.TaskTemplate, input_literal_map: _literal_models.LiteralMap
) -> Union[_literal_models.LiteralMap, _dynamic_job.DynamicJobSpec]:
"""
This function is copied from PythonTask.dispatch_execute. Will need to make it a mixin and refactor in the
future.
"""

# Invoked before the task is executed
new_user_params = cls.pre_execute(ctx.user_space_params)

# Create another execution context with the new user params, but let's keep the same working dir
with ctx.new_execution_context(
mode=ctx.execution_state.mode,
execution_params=new_user_params,
working_dir=ctx.execution_state.working_dir,
) as exec_ctx:
# Added: Have to reverse the Python interface from the task template Flyte interface
# This will be moved into the FlyteTask promote logic instead
guessed_python_input_types = TypeEngine.guess_python_types(tt.interface.inputs)
native_inputs = TypeEngine.literal_map_to_kwargs(exec_ctx, input_literal_map, guessed_python_input_types)

logger.info(f"Invoking FlyteTask executor {tt.id.name} with inputs: {native_inputs}")
try:
native_outputs = cls.execute_from_model(tt, **native_inputs)
except Exception as e:
logger.exception(f"Exception when executing {e}")
raise e

logger.info(f"Task executed successfully in user level, outputs: {native_outputs}")
# Lets run the post_execute method. This may result in a IgnoreOutputs Exception, which is
# bubbled up to be handled at the callee layer.
native_outputs = cls.post_execute(new_user_params, native_outputs)

# Short circuit the translation to literal map because what's returned may be a dj spec (or an
# already-constructed LiteralMap if the dynamic task was a no-op), not python native values
if isinstance(native_outputs, _literal_models.LiteralMap) or isinstance(
native_outputs, _dynamic_job.DynamicJobSpec
):
return native_outputs

expected_output_names = list(tt.interface.outputs.keys())
if len(expected_output_names) == 1:
# Here we have to handle the fact that the task could've been declared with a typing.NamedTuple of
# length one. That convention is used for naming outputs - and single-length-NamedTuples are
# particularly troublesome but elegant handling of them is not a high priority
# Again, we're using the output_tuple_name as a proxy.
# Deleted some stuff
native_outputs_as_map = {expected_output_names[0]: native_outputs}
elif len(expected_output_names) == 0:
native_outputs_as_map = {}
else:
native_outputs_as_map = {
expected_output_names[i]: native_outputs[i] for i, _ in enumerate(native_outputs)
}

# We manually construct a LiteralMap here because task inputs and outputs actually violate the assumption
# built into the IDL that all the values of a literal map are of the same type.
literals = {}
for k, v in native_outputs_as_map.items():
literal_type = tt.interface.outputs[k].type
py_type = type(v)

if isinstance(v, tuple):
raise AssertionError(f"Output({k}) in task{tt.id.name} received a tuple {v}, instead of {py_type}")
try:
literals[k] = TypeEngine.to_literal(exec_ctx, v, py_type, literal_type)
except Exception as e:
raise AssertionError(f"failed to convert return value for var {k}") from e

outputs_literal_map = _literal_models.LiteralMap(literals=literals)
# After the execute has been successfully completed
return outputs_literal_map
49 changes: 47 additions & 2 deletions flytekit/control_plane/tasks/task.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,35 @@
from typing import Any, Optional, Union

from flytekit.common.exceptions import scopes as _exception_scopes
from flytekit.common.exceptions import user as _user_exceptions
from flytekit.common.mixins import hash as _hash_mixin
from flytekit.control_plane import identifier as _identifier
from flytekit.control_plane import interface as _interfaces
from flytekit.core.base_task import ExecutableTaskMixin
from flytekit.core.context_manager import FlyteContext
from flytekit.core.task_executor import FlyteTaskExecutor
from flytekit.engines.flyte import engine as _flyte_engine
from flytekit.models import common as _common_model
from flytekit.models import dynamic_job as dynamic_job
from flytekit.models import literals as literal_models
from flytekit.models import task as _task_model
from flytekit.models.admin import common as _admin_common
from flytekit.models.core import identifier as _identifier_model


class FlyteTask(_hash_mixin.HashOnReferenceMixin, _task_model.TaskTemplate):
def __init__(self, id, type, metadata, interface, custom, container=None, task_type_version=0, config=None):
class FlyteTask(ExecutableTaskMixin, _hash_mixin.HashOnReferenceMixin, _task_model.TaskTemplate):
def __init__(
self,
id,
type,
metadata,
interface,
custom,
container=None,
task_type_version=0,
config=None,
executor: Optional[FlyteTaskExecutor] = None,
):
super(FlyteTask, self).__init__(
id,
type,
Expand All @@ -22,6 +41,12 @@ def __init__(self, id, type, metadata, interface, custom, container=None, task_t
config=config,
)

self._executor = executor

@property
def executor(self) -> Optional[FlyteTaskExecutor]:
return self._executor

@property
def interface(self) -> _interfaces.TypedInterface:
return super(FlyteTask, self).interface
Expand Down Expand Up @@ -93,3 +118,23 @@ def fetch_latest(cls, project: str, domain: str, name: str) -> "FlyteTask":
flyte_task = cls.promote_from_model(admin_task.closure.compiled_task.template)
flyte_task._id = admin_task.id
return flyte_task

def execute(self, **kwargs) -> Any:
"""
This function directs execute to the executor instead of attempting to run itself.
"""
if self.executor is None:
raise ValueError(f"Cannot execute without an executor")

return self.executor.execute_from_model(self, **kwargs)

def dispatch_execute(
self, ctx: FlyteContext, input_literal_map: literal_models.LiteralMap
) -> Union[literal_models.LiteralMap, dynamic_job.DynamicJobSpec]:
"""
This function directs execute to the executor instead of attempting to run itself.
"""
if self.executor is None:
raise ValueError(f"Cannot run dispatch_execute without an executor")

return self.executor.dispatch_execute(ctx, self, input_literal_map)
109 changes: 50 additions & 59 deletions flytekit/core/base_task.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import collections
import datetime
from abc import abstractmethod
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Dict, Generic, List, Optional, Tuple, Type, TypeVar, Union

Expand All @@ -21,6 +21,7 @@
create_task_output,
translate_inputs_to_literals,
)
from flytekit.core.tracked_abc import FlyteTrackedABC
from flytekit.core.tracker import TrackedInstance
from flytekit.core.type_engine import TypeEngine
from flytekit.loggers import logger
Expand Down Expand Up @@ -110,7 +111,52 @@ class IgnoreOutputs(Exception):
pass


class Task(object):
class ExecutableTaskMixin(object):
def pre_execute(self, user_params: ExecutionParameters) -> ExecutionParameters:
"""
This is the method that will be invoked directly before executing the task method and before all the inputs
are converted. One particular case where this is useful is if the context is to be modified for the user process
to get some user space parameters. This also ensures that things like SparkSession are already correctly
setup before the type transformers are called
This should return either the same context of the mutated context
"""
return user_params

def post_execute(self, user_params: ExecutionParameters, rval: Any) -> Any:
"""
Post execute is called after the execution has completed, with the user_params and can be used to clean-up,
or alter the outputs to match the intended tasks outputs. If not overriden, then this function is a No-op
Args:
rval is returned value from call to execute
user_params: are the modified user params as created during the pre_execute step
"""
return rval

@abstractmethod
def dispatch_execute(
self, ctx: FlyteContext, input_literal_map: _literal_models.LiteralMap
) -> Union[_literal_models.LiteralMap, _dynamic_job.DynamicJobSpec]:
"""
This method translates Flyte's Type system based input values and invokes the actual call to the executor
This method is also invoked during runtime. It takes in a Flyte LiteralMap and returns a Flyte LiteralMap.
This should call execute below.
To support dynamic tasks, it can optionally return a DynamicJobSpec instead.
"""
raise NotImplementedError

@abstractmethod
def execute(self, **kwargs) -> Any:
"""
This function should take in Python native kwargs and return Python native values. This should be called by
dispatch execute.
"""
raise NotImplementedError


class Task(ExecutableTaskMixin, ABC):
"""
The base of all Tasks in flytekit. This task is closest to the FlyteIDL TaskTemplate and captures information in
FlyteIDL specification and does not have python native interfaces associated. For any real extension please
Expand Down Expand Up @@ -276,39 +322,11 @@ def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]:
def get_config(self, settings: SerializationSettings) -> Dict[str, str]:
return None

@abstractmethod
def dispatch_execute(
self,
ctx: FlyteContext,
input_literal_map: _literal_models.LiteralMap,
) -> _literal_models.LiteralMap:
"""
This method translates Flyte's Type system based input values and invokes the actual call to the executor
This method is also invoked during runtime.
"""
pass

@abstractmethod
def pre_execute(self, user_params: ExecutionParameters) -> ExecutionParameters:
"""
This is the method that will be invoked directly before executing the task method and before all the inputs
are converted. One particular case where this is useful is if the context is to be modified for the user process
to get some user space parameters. This also ensures that things like SparkSession are already correctly
setup before the type transformers are called
This should return either the same context of the mutated context
"""
pass

@abstractmethod
def execute(self, **kwargs) -> Any:
pass


T = TypeVar("T")


class PythonTask(TrackedInstance, Task, Generic[T]):
class PythonTask(TrackedInstance, Task, Generic[T], metaclass=FlyteTrackedABC): # noqa: doesn't realize it's ABC
"""
Base Class for all Tasks with a Python native ``Interface``. This should be directly used for task types, that do
not have a python function to be executed. Otherwise refer to :py:class:`flytekit.PythonFunctionTask`.
Expand Down Expand Up @@ -343,7 +361,6 @@ def __init__(
self._environment = environment if environment else {}
self._task_config = task_config

# TODO lets call this interface and the other as flyte_interface?
@property
def python_interface(self) -> Interface:
return self._python_interface
Expand Down Expand Up @@ -457,32 +474,6 @@ def dispatch_execute(
# After the execute has been successfully completed
return outputs_literal_map

def pre_execute(self, user_params: ExecutionParameters) -> ExecutionParameters:
"""
This is the method that will be invoked directly before executing the task method and before all the inputs
are converted. One particular case where this is useful is if the context is to be modified for the user process
to get some user space parameters. This also ensures that things like SparkSession are already correctly
setup before the type transformers are called
This should return either the same context of the mutated context
"""
return user_params

@abstractmethod
def execute(self, **kwargs) -> Any:
pass

def post_execute(self, user_params: ExecutionParameters, rval: Any) -> Any:
"""
Post execute is called after the execution has completed, with the user_params and can be used to clean-up,
or alter the outputs to match the intended tasks outputs. If not overriden, then this function is a No-op
Args:
rval is returned value from call to execute
user_params: are the modified user params as created during the pre_execute step
"""
return rval

@property
def environment(self) -> Dict[str, str]:
return self._environment
Expand Down Expand Up @@ -536,7 +527,7 @@ def name(self) -> str:
pass

@abstractmethod
def load_task(self, loader_args: List[str]) -> Task:
def load_task(self, loader_args: List[str]) -> ExecutableTaskMixin:
"""
Given the set of identifier keys, should return one Python Task or raise an error if not found
"""
Expand Down
4 changes: 2 additions & 2 deletions flytekit/core/python_auto_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import importlib
import re
from abc import abstractmethod
from abc import ABC
from typing import Dict, List, Optional, TypeVar

from flytekit.common.tasks.raw_container import _get_container_definition
Expand All @@ -18,7 +18,7 @@
T = TypeVar("T")


class PythonAutoContainerTask(PythonTask[T], metaclass=FlyteTrackedABC):
class PythonAutoContainerTask(PythonTask[T], metaclass=FlyteTrackedABC): # noqa: doesn't realize it's ABC
"""
A Python AutoContainer task should be used as the base for all extensions that want the user's code to be in the
container and the container information to be automatically captured.
Expand Down
Loading

0 comments on commit 760da2b

Please sign in to comment.