From 68cd7beb1800bd32e7bb6b7b77fad7fecec01998 Mon Sep 17 00:00:00 2001 From: Haytham Abuelfutuh Date: Thu, 1 Jul 2021 13:44:13 -0700 Subject: [PATCH] Add Usage Examples to public APIs (#521) Signed-off-by: wild-endeavor --- docs/source/_templates/custom.rst | 37 ++++ docs/source/conf.py | 26 ++- docs/source/design/authoring.rst | 8 +- docs/source/design/clis.rst | 4 + docs/source/reference/index.rst | 24 --- docs/source/tasks.extend.rst | 12 +- flytekit/__init__.py | 52 ++--- flytekit/clients/friendly.py | 11 + flytekit/common/tasks/sdk_runnable.py | 9 +- flytekit/common/translator.py | 1 - flytekit/core/base_sql_task.py | 10 +- flytekit/core/base_task.py | 127 ++++++++--- flytekit/core/condition.py | 8 +- flytekit/core/container_task.py | 6 + flytekit/core/context_manager.py | 156 +++++++++++--- flytekit/core/dynamic_workflow_task.py | 4 +- flytekit/core/launch_plan.py | 57 ++++- flytekit/core/map_task.py | 15 +- flytekit/core/notification.py | 20 +- flytekit/core/python_auto_container.py | 33 ++- .../core/python_customized_container_task.py | 41 ++-- flytekit/core/python_function_task.py | 37 +++- flytekit/core/reference.py | 22 ++ flytekit/core/resources.py | 5 +- flytekit/core/schedule.py | 12 ++ flytekit/core/shim_task.py | 5 +- flytekit/core/task.py | 20 ++ flytekit/core/type_engine.py | 29 ++- flytekit/core/workflow.py | 57 ++++- flytekit/extend/__init__.py | 6 +- flytekit/extras/sqlite3/task.py | 24 ++- flytekit/models/security.py | 7 + flytekit/types/directory/__init__.py | 2 +- flytekit/types/file/__init__.py | 2 +- flytekit/types/file/file.py | 2 +- flytekit/types/schema/__init__.py | 2 +- .../flytekitplugins/athena/task.py | 17 +- tests/flytekit/unit/core/test_imperative.py | 22 +- tests/flytekit/unit/core/test_launch_plan.py | 204 ++++++++++++++---- tests/flytekit/unit/core/test_map_task.py | 25 ++- tests/flytekit/unit/core/test_references.py | 50 +++-- tests/flytekit/unit/core/test_workflows.py | 49 +++++ .../flytekit/unit/extras/sqlite3/test_task.py | 6 +- 43 files changed, 997 insertions(+), 269 deletions(-) create mode 100644 docs/source/_templates/custom.rst delete mode 100644 docs/source/reference/index.rst diff --git a/docs/source/_templates/custom.rst b/docs/source/_templates/custom.rst new file mode 100644 index 0000000000..f7cfbd333b --- /dev/null +++ b/docs/source/_templates/custom.rst @@ -0,0 +1,37 @@ +{{ fullname | escape | underline}} + +.. currentmodule:: {{ module }} + +{% if objtype == 'class' %} + +.. autoclass:: {{ objname }} + + {% block methods %} + {% if methods %} + + .. rubric:: {{ _('Methods') }} + {% for item in methods %} + .. automethod:: {{ item }} + {%- endfor %} + {% endif %} + {% endblock %} + + {% block attributes %} + {% if attributes %} + + .. rubric:: {{ _('Attributes') }} + {% for item in attributes %} + .. autoattribute:: {{ item }} + {%- endfor %} + + {% endif %} + {% endblock %} + + +{% endif %} + +{% if objtype == 'function' %} + +.. autofunction:: {{ objname }} + +{% endif %} diff --git a/docs/source/conf.py b/docs/source/conf.py index 644eb18263..166c464051 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -27,17 +27,16 @@ # The full version, including alpha/beta/rc tags release = "0.16.0b9" - # -- General configuration --------------------------------------------------- # Add any Sphinx extension module names here, as strings. They can be # extensions coming with Sphinx (named 'sphinx.ext.*') or your custom # ones. 
extensions = [ + "sphinx.ext.napoleon", "sphinx.ext.autodoc", "sphinx.ext.autosummary", "sphinx.ext.autosectionlabel", - "sphinx.ext.napoleon", "sphinx.ext.todo", "sphinx.ext.viewcode", "sphinx.ext.doctest", @@ -83,7 +82,6 @@ # This pattern also affects html_static_path and html_extra_path . exclude_patterns = [] - # -- Options for HTML output ------------------------------------------------- # The theme to use for HTML and HTML Help pages. See the documentation for @@ -120,7 +118,7 @@ # Add any paths that contain custom static files (such as style sheets) here, # relative to this directory. They are copied after the builtin static files, # so a file named "default.css" will overwrite the builtin "default.css". -html_static_path = ["_static"] +html_static_path = [] # Custom sidebar templates, must be a dictionary that maps document names # to template names. @@ -134,13 +132,11 @@ "home_page": "https://docs.flyte.org", } - # -- Options for HTMLHelp output --------------------------------------------- # Output file base name for HTML help builder. htmlhelp_basename = "flytekitdoc" - # -- Options for LaTeX output ------------------------------------------------ latex_elements = { @@ -165,14 +161,12 @@ (master_doc, "flytekit.tex", "Flytekit Documentation", "Flyte", "manual"), ] - # -- Options for manual page output ------------------------------------------ # One entry per manual page. List of tuples # (source start file, name, description, authors, manual section). man_pages = [(master_doc, "flytekit", "Flytekit Documentation", [author], 1)] - # -- Options for Texinfo output ---------------------------------------------- # Grouping the document tree into Texinfo files. List of tuples @@ -190,13 +184,27 @@ ), ] - # -- Extension configuration ------------------------------------------------- # intersphinx configuration intersphinx_mapping = { "python": ("https://docs.python.org/3", None), "flytectl": ("https://flytectl.readthedocs.io/en/latest/", None), + "idl": ("https://flyteidl.readthedocs.io/en/latest/", None), # "flytectl": ("/Users/yourusername/go/src/github.com/flyteorg/flytectl/docs/build/html", None), "cookbook": ("https://flytecookbook.readthedocs.io/en/latest/", None), "flyte": ("https://flyte.readthedocs.io/en/latest/", None), } + +inheritance_graph_attrs = { + "resolution": 300.0, +} + +inheritance_node_attrs = { + "bgcolor": "aliceblue", +} + +inheritance_edge_attrs = { + "color": "darkgray", +} + +autoclass_content = "both" diff --git a/docs/source/design/authoring.rst b/docs/source/design/authoring.rst index 62a953bf30..b69d7f270a 100644 --- a/docs/source/design/authoring.rst +++ b/docs/source/design/authoring.rst @@ -41,7 +41,7 @@ Please see the documentation on each of the classes for details. .. autoclass:: flytekit.core.base_task.PythonTask :noindex: -.. autoclass:: flytekit.core.python_function_task.PythonAutoContainerTask +.. autoclass:: flytekit.core.python_auto_container.PythonAutoContainerTask :noindex: .. autoclass:: flytekit.core.python_function_task.PythonFunctionTask @@ -49,11 +49,15 @@ Please see the documentation on each of the classes for details. Workflows ========= -There is currently only one :py:class:`Workflow ` class. +There are two workflow classes, which both inherit from the :py:class:`WorkflowBase ` class. .. autoclass:: flytekit.core.workflow.PythonFunctionWorkflow :noindex: +.. autoclass:: flytekit.core.workflow.ImperativeWorkflow + :noindex: + + Launch Plan =========== There is also only one :py:class:`LaunchPlan ` class. 
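+
+As a quick sketch of how these pieces fit together (``t1`` and ``my_wf`` below are
+placeholder names, not part of flytekit itself), a task, a workflow, and its default
+launch plan might look like:
+
+.. code-block:: python
+
+    from flytekit import LaunchPlan, task, workflow
+
+    @task
+    def t1(a: int) -> str:
+        return str(a)
+
+    @workflow
+    def my_wf(a: int) -> str:
+        return t1(a=a)
+
+    # Every workflow can be paired with a default launch plan.
+    lp = LaunchPlan.get_or_create(workflow=my_wf)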
diff --git a/docs/source/design/clis.rst b/docs/source/design/clis.rst index a84904bcb6..fb01c0a728 100644 --- a/docs/source/design/clis.rst +++ b/docs/source/design/clis.rst @@ -14,6 +14,10 @@ The client code is located in ``flytekit/clients`` and there are two. * Similar to the :ref:`design-models` files, but a bit more complex, the ``raw`` one is basically a wrapper around the Protobuf generated code, with some handling for authentication in place, and acts as a mechanism for autocompletion and comments. * The ``friendly`` client uses the ``raw`` client, adds handling of things like pagination, and is structurally more aligned with the functionality and call pattern of the CLI itself. +.. autoclass:: flytekit.clients.friendly.SynchronousFlyteClient + +.. autoclass:: flytekit.clients.raw.RawSynchronousFlyteClient + *********************** Command Line Interfaces *********************** diff --git a/docs/source/reference/index.rst b/docs/source/reference/index.rst deleted file mode 100644 index f77a447060..0000000000 --- a/docs/source/reference/index.rst +++ /dev/null @@ -1,24 +0,0 @@ -############# -API Reference -############# - -.. toctree:: - :maxdepth: 1 - :caption: API Reference - :name: apitoc - - Flytekit Python - Flytekit Java - FlyteIDL - Flytectl - -.. toctree:: - :maxdepth: 1 - :caption: Component Reference (Code docs) - :name: componentreftoc - - FlytePropeller - FlyteAdmin - FlytePlugins - DataCatalog - \ No newline at end of file diff --git a/docs/source/tasks.extend.rst b/docs/source/tasks.extend.rst index f01e876024..31cfb124da 100644 --- a/docs/source/tasks.extend.rst +++ b/docs/source/tasks.extend.rst @@ -1,11 +1,15 @@ ############################# -Build your custom task types +Build Custom Task Types ############################# - +These modules are useful to extend the base task types. .. automodule:: flytekit.core.base_task - :members: + :no-members: + :no-inherited-members: + :no-special-members: .. automodule:: flytekit.core.python_function_task - :members: + :no-members: + :no-inherited-members: + :no-special-members: diff --git a/flytekit/__init__.py b/flytekit/__init__.py index 7163d302b8..2bfdbae885 100644 --- a/flytekit/__init__.py +++ b/flytekit/__init__.py @@ -10,19 +10,21 @@ Basic Authoring =============== -These are the essentials needed to get started writing tasks and workflows. The elements here correspond well with :std:ref:`Basic ` section of the cookbook. +These are the essentials needed to get started writing tasks and workflows. The elements here correspond well with :std:ref:`Basics ` section of the user guide. .. autosummary:: :nosignatures: + :template: custom.rst :toctree: generated/ - task - This is the basic decorator to use to turn a properly type-annotated function into a Flyte task. - workflow - This will turn a properly type-annotated function into a Flyte workflow. - kwtypes - Helper function that makes it slightly easier to declare certain functions. - current_context - This function returns an ExecutionParameters object. - ExecutionParameters - This object gives the user at Task run time useful information about the execution environment. - FlyteContext - This is an flytekit-internal context that is sometimes useful for end-users to have access to. - + task + workflow + kwtypes + current_context + ExecutionParameters + FlyteContext + map_task + ~core.workflow.ImperativeWorkflow Running Locally ------------------ @@ -37,6 +39,7 @@ .. 
autosummary:: :nosignatures: + :template: custom.rst :toctree: generated/ conditional @@ -47,6 +50,7 @@ .. autosummary:: :nosignatures: + :template: custom.rst :toctree: generated/ TaskMetadata - Wrapper object that allows users to specify Task @@ -60,22 +64,30 @@ .. autosummary:: :nosignatures: + :template: custom.rst :toctree: generated/ dynamic -Scheduling and Notifications +Scheduling ============================ -See the :py:mod:`Notifications Module ` and -:py:mod:`Schedules Module ` for more information. - .. autosummary:: :nosignatures: + :template: custom.rst :toctree: generated/ CronSchedule FixedRate + +Notifications +============================ + +.. autosummary:: + :nosignatures: + :template: custom.rst + :toctree: generated/ + Email PagerDuty Slack @@ -85,6 +97,7 @@ .. autosummary:: :nosignatures: + :template: custom.rst :toctree: generated/ get_reference_entity @@ -99,6 +112,7 @@ .. autosummary:: :nosignatures: + :template: custom.rst :toctree: generated/ SQLTask @@ -112,22 +126,12 @@ .. autosummary:: :nosignatures: + :template: custom.rst :toctree: generated/ Secret SecurityContext -Core Modules -============= - -.. autosummary:: - :nosignatures: - :toctree: generated/ - - core.dynamic_workflow_task - core.notification - core.schedule - """ @@ -161,7 +165,7 @@ def current_context() -> ExecutionParameters: Usage - .. code-block:: + .. code-block:: python flytekit.current_context().logging.info(...) diff --git a/flytekit/clients/friendly.py b/flytekit/clients/friendly.py index 3dc78db395..4a42994ef9 100644 --- a/flytekit/clients/friendly.py +++ b/flytekit/clients/friendly.py @@ -28,6 +28,17 @@ class SynchronousFlyteClient(_RawSynchronousFlyteClient): + """ + This is a low-level client that users can use to make direct gRPC service calls to the control plane. See the + :std:doc:`service spec `. This is more user-friendly interface than the + :py:class:`raw client ` so users should try to use this class + first. Create a client by :: + + SynchronousFlyteClient("your.domain:port", insecure=True) + # insecure should be True if your flyteadmin deployment doesn't have SSL enabled + + """ + @property def raw(self): """ diff --git a/flytekit/common/tasks/sdk_runnable.py b/flytekit/common/tasks/sdk_runnable.py index ec43524408..1bb2ebdb5a 100644 --- a/flytekit/common/tasks/sdk_runnable.py +++ b/flytekit/common/tasks/sdk_runnable.py @@ -91,12 +91,19 @@ def check_group_key(group: str, key: str): # TODO: Clean up working dir name class ExecutionParameters(object): """ - This is the context object that is accessible to every @task method. It can be accessed using + This is a run-time user-centric context object that is accessible to every @task method. It can be accessed using .. code-block:: python flytekit.current_context() + This object provides the following + * a statsd handler + * a logging handler + * the execution ID as an :py:class:`flytekit.models.core.identifier.WorkflowExecutionIdentifier` object + * a working directory for the user to write arbitrary files to + + Please do not confuse this object with the :py:class:`flytekit.FlyteContext` object. """ @dataclass(init=False) diff --git a/flytekit/common/translator.py b/flytekit/common/translator.py index 5d26fb6a59..ae1779a706 100644 --- a/flytekit/common/translator.py +++ b/flytekit/common/translator.py @@ -360,7 +360,6 @@ def get_serializable( the parent entity this function is called with. :param settings: used to pick up project/domain/name - to be deprecated. 
:param entity: The local flyte entity to try to convert (along with its dependencies) - :param fast: For tasks only, fast serialization produces a different command. :return: The resulting control plane entity, in addition to being added to the mutable entity_mapping parameter is also returned. """ diff --git a/flytekit/core/base_sql_task.py b/flytekit/core/base_sql_task.py index d651ecfe1f..7e24cfb886 100644 --- a/flytekit/core/base_sql_task.py +++ b/flytekit/core/base_sql_task.py @@ -9,7 +9,11 @@ class SQLTask(PythonTask[T]): """ - Base task types for all SQL tasks + Base task types for all SQL tasks. See :py:class:`flytekit.extras.sqlite3.task.SQLite3Task` + and :py:class:`flytekitplugins.athena.task.AthenaTask` for examples of how to use it as a base class. + + .. autoclass:: flytekit.extras.sqlite3.task.SQLite3Task + :noindex: """ _INPUT_REGEX = re.compile(r"({{\s*.inputs.(\w+)\s*}})", re.IGNORECASE) @@ -25,6 +29,10 @@ def __init__( outputs: Dict[str, Type] = None, **kwargs, ): + """ + This SQLTask should mostly just be used as a base class for other SQL task types and should not be used + directly. See :py:class:`flytekit.extras.sqlite3.task.SQLite3Task` + """ super().__init__( task_type=task_type, name=name, diff --git a/flytekit/core/base_task.py b/flytekit/core/base_task.py index 567e9f53d3..d851cd702e 100644 --- a/flytekit/core/base_task.py +++ b/flytekit/core/base_task.py @@ -1,3 +1,22 @@ +""" +============================== +:mod:`flytekit.core.base_task` +============================== + +.. currentmodule:: flytekit.core.base_task + +.. autosummary:: + :toctree: generated/ + + kwtypes + PythonTask + Task + TaskMetadata + TaskResolverMixin + IgnoreOutputs + +""" + import collections import datetime from abc import abstractmethod @@ -36,7 +55,11 @@ def kwtypes(**kwargs) -> Dict[str, Type]: """ - Converts the keyword arguments to typed dictionary + This is a small helper function to convert the keyword arguments to an OrderedDict of types. + + .. code-block:: python + + kwtypes(a=int, b=str) """ d = collections.OrderedDict() for k, v in kwargs.items(): @@ -47,19 +70,23 @@ def kwtypes(**kwargs) -> Dict[str, Type]: @dataclass class TaskMetadata(object): """ - Create Metadata to be associated with this Task + Metadata for a Task. Things like retries and whether or not caching is turned on, and cache version are specified + here. + + See the :std:ref:`IDL ` for the protobuf definition. Args: - cache: Boolean that indicates if caching should be enabled - cache_version: Version string to be used for the cached value - interruptible: Boolean that indicates that this task can be interrupted and/or scheduled on nodes - with lower QoS guarantees. This will directly reduce the `$`/`execution cost` associated, - at the cost of performance penalties due to potential interruptions - deprecated: A string that can be used to provide a warning message for deprecated task. Absence / empty str - indicates that the task is active and not deprecated - retries: for retries=n; n > 0, on failures of this task, the task will be retried at-least n number of times. - timeout: the max amount of time for which one execution of this task should be executed for. 
If the execution - will be terminated if the runtime exceeds the given timeout (approximately) + cache (bool): Indicates if caching should be enabled + cache_version (str): Version to be used for the cached value + interruptible (Optional[bool]): Indicates that this task can be interrupted and/or scheduled on nodes with + lower QoS guarantees that can include pre-emption. This can reduce the monetary cost executions incur at the + cost of performance penalties due to potential interruptions + deprecated (str): Can be used to provide a warning message for deprecated task. Absence or empty str indicates + that the task is active and not deprecated + retries (int): for retries=n; n > 0, on failures of this task, the task will be retried at-least n number of times. + timeout (Optional[Union[datetime.timedelta, int]]): the max amount of time for which one execution of this task + should be executed for. The execution will be terminated if the runtime exceeds the given timeout + (approximately) """ cache: bool = False @@ -106,7 +133,8 @@ class IgnoreOutputs(Exception): This exception should be used to indicate that the outputs generated by this can be safely ignored. This is useful in case of distributed training or peer-to-peer parallel algorithms. - For example look at Sagemaker training. + For example look at Sagemaker training, e.g. + :py:class:`plugins.awssagemaker.flytekitplugins.awssagemaker.training.SagemakerBuiltinAlgorithmsTask`. """ pass @@ -115,8 +143,8 @@ class IgnoreOutputs(Exception): class Task(object): """ The base of all Tasks in flytekit. This task is closest to the FlyteIDL TaskTemplate and captures information in - FlyteIDL specification and does not have python native interfaces associated. For any real extension please - refer to the derived classes. + FlyteIDL specification and does not have python native interfaces associated. Refer to the derived classes for + examples of how to extend this class. """ def __init__( @@ -274,15 +302,28 @@ def compile(self, ctx: FlyteContext, *args, **kwargs): raise Exception("not implemented") def get_container(self, settings: SerializationSettings) -> _task_model.Container: + """ + Returns the container definition (if any) that is used to run the task on hosted Flyte. + """ return None def get_k8s_pod(self, settings: SerializationSettings) -> _task_model.K8sPod: + """ + Returns the kubernetes pod definition (if any) that is used to run the task on hosted Flyte. + """ return None def get_custom(self, settings: SerializationSettings) -> Dict[str, Any]: + """ + Return additional plugin-specific custom data (if any) as a serializable dictionary. + """ return None def get_config(self, settings: SerializationSettings) -> Dict[str, str]: + """ + Returns the task config as a serializable dictionary. This task config consists of metadata about the custom + defined for this task. + """ return None @abstractmethod @@ -311,6 +352,9 @@ def pre_execute(self, user_params: ExecutionParameters) -> ExecutionParameters: @abstractmethod def execute(self, **kwargs) -> Any: + """ + This method will be invoked to execute the task. + """ pass @@ -334,13 +378,15 @@ def __init__( ): """ Args: - task_type: a string that defines a unique task-type for every new extension. If a backend plugin is required - then this has to be done in-concert with the backend plugin identifier - name: A unique name for the task instantiation. This is unique for every instance of task. - task_config: Configuration for the task. 
This is used to configure the specific plugin that handles this task - interface: A python native typed interface ``(inputs) -> outputs`` that declares the signature of the task - environment: Any environment variables that should be supplied during the execution of the task. Supplied as - a dictionary of key/value pairs + task_type (str): defines a unique task-type for every new extension. If a backend plugin is required then + this has to be done in-concert with the backend plugin identifier + name (str): A unique name for the task instantiation. This is unique for every instance of task. + task_config (T): Configuration for the task. This is used to configure the specific plugin that handles this + task + interface (Optional[Interface]): A python native typed interface ``(inputs) -> outputs`` that declares the + signature of the task + environment (Optional[Dict[str, str]]): Any environment variables that should be supplied during the + execution of the task. Supplied as a dictionary of key/value pairs """ super().__init__( task_type=task_type, @@ -355,22 +401,40 @@ def __init__( # TODO lets call this interface and the other as flyte_interface? @property def python_interface(self) -> Interface: + """ + Returns this task's python interface. + """ return self._python_interface @property def task_config(self) -> T: + """ + Returns the user-specified task config which is used for plugin-specific handling of the task. + """ return self._task_config def get_type_for_input_var(self, k: str, v: Any) -> Optional[Type[Any]]: + """ + Returns the python type for an input variable by name. + """ return self._python_interface.inputs[k] def get_type_for_output_var(self, k: str, v: Any) -> Optional[Type[Any]]: + """ + Returns the python type for the specified output variable by name. + """ return self._python_interface.outputs[k] def get_input_types(self) -> Optional[Dict[str, type]]: + """ + Returns the names and python types as a dictionary for the inputs of this task. + """ return self._python_interface.inputs def construct_node_metadata(self) -> _workflow_model.NodeMetadata: + """ + Used when constructing the node that encapsulates this task as part of a broader workflow definition. + """ return _workflow_model.NodeMetadata( name=f"{self.__module__}.{self.name}", timeout=self.metadata.timeout, @@ -379,6 +443,9 @@ def construct_node_metadata(self) -> _workflow_model.NodeMetadata: ) def compile(self, ctx: FlyteContext, *args, **kwargs): + """ + Generates a node that encapsulates this task in a workflow definition. + """ return create_and_link_node(ctx, entity=self, **kwargs) @property @@ -478,6 +545,9 @@ def pre_execute(self, user_params: ExecutionParameters) -> ExecutionParameters: @abstractmethod def execute(self, **kwargs) -> Any: + """ + This method will be invoked to execute the task. + """ pass def post_execute(self, user_params: ExecutionParameters, rval: Any) -> Any: @@ -493,6 +563,9 @@ def post_execute(self, user_params: ExecutionParameters, rval: Any) -> Any: @property def environment(self) -> Dict[str, str]: + """ + Any environment variables that supplied during the execution of the task. + """ return self._environment @@ -500,9 +573,9 @@ class TaskResolverMixin(object): """ Flytekit tasks interact with the Flyte platform very, very broadly in two steps. They need to be uploaded to Admin, and then they are run by the user upon request (either as a single task execution or as part of a workflow). 
In any - case, at execution time, the container image containing the task needs to be spun up again (for container tasks at - least which most tasks are) at which point the container needs to know which task it's supposed to run and - how to rehydrate the task object. + case, at execution time, for most tasks (that is those that generate a container target) the container image + containing the task needs to be spun up again at which point the container needs to know which task it's supposed + to run and how to rehydrate the task object. For example, the serialization of a simple task :: @@ -524,12 +597,12 @@ def t1(...) -> ...: ... #. the ``location`` of the task's task resolver, followed by two dashes, followed by #. the arguments provided by calling the ``loader_args`` function below. - The ``default_task_resolver`` declared below knows that :: + The ``default_task_resolver`` declared below knows that * When ``loader_args`` is called on a task, to look up the module the task is in, and the name of the task (the key of the task in the module, either the function name, or the variable it was assigned to). * When ``load_task`` is called, it interprets the first part of the command as the module to call - ``importlib.import_module`` on, and then looks for a key ``t1``. + ``importlib.import_module`` on, and then looks for a key ``t1``. This is just the default behavior. Users should feel free to implement their own resolvers. """ diff --git a/flytekit/core/condition.py b/flytekit/core/condition.py index e69aff8863..9d88910afa 100644 --- a/flytekit/core/condition.py +++ b/flytekit/core/condition.py @@ -399,12 +399,12 @@ def conditional(name: str) -> ConditionalSection: context. Outside of a workflow they will raise an Assertion. The ``conditional`` method returns a new conditional section, that allows to create a - ternary operator like - if-else clauses. The reason why it is called ternary-like is because, it returns the output of the branch result. - So in-effect it is a functional style condition. + if-else clauses. The reason why it is called ternary-like is because, it returns the output of the branch result. + So in-effect it is a functional style condition. - Example of a condition usage. Note the nesting and the assignment to a LHS variable + Example of a condition usage. Note the nesting and the assignment to a LHS variable - .. code-block:: python + .. code-block:: python v = ( conditional("fractions") diff --git a/flytekit/core/container_task.py b/flytekit/core/container_task.py index 13d1a6e731..4c5bad647f 100644 --- a/flytekit/core/container_task.py +++ b/flytekit/core/container_task.py @@ -10,6 +10,12 @@ class ContainerTask(PythonTask): + """ + This is an intermediate class that represents Flyte Tasks that run a container at execution time. This is the vast + majority of tasks - the typical ``@task`` decorated tasks for instance all run a container. An example of + something that doesn't run a container would be something like the Athena SQL task. + """ + class MetadataFormat(Enum): JSON = _task_model.DataLoadingConfig.LITERALMAP_FORMAT_JSON YAML = _task_model.DataLoadingConfig.LITERALMAP_FORMAT_YAML diff --git a/flytekit/core/context_manager.py b/flytekit/core/context_manager.py index 7623780c5a..060750b892 100644 --- a/flytekit/core/context_manager.py +++ b/flytekit/core/context_manager.py @@ -1,3 +1,16 @@ +""" + +.. autoclass:: flytekit.core.context_manager::ExecutionState.Mode + :noindex: +.. 
autoclass:: flytekit.core.context_manager::ExecutionState.Mode.TASK_EXECUTION
+    :noindex:
+.. autoclass:: flytekit.core.context_manager::ExecutionState.Mode.LOCAL_WORKFLOW_EXECUTION
+    :noindex:
+.. autoclass:: flytekit.core.context_manager::ExecutionState.Mode.LOCAL_TASK_EXECUTION
+    :noindex:
+
+"""
+
 from __future__ import annotations
 
 import datetime as _datetime
@@ -31,21 +44,49 @@
 
 @dataclass(init=True, repr=True, eq=True, frozen=True)
 class Image(object):
+    """
+    Image is a structured wrapper for task container images used in object serialization.
+
+    Attributes:
+        name (str): A user-provided name to identify this image.
+        fqn (str): Fully qualified image name. This consists of
+            #. a registry location
+            #. a username
+            #. a repository name
+
+            For example: `hostname/username/reponame`
+        tag (str): Optional tag used to specify which version of an image to pull
+    """
+
     name: str
     fqn: str
     tag: str
 
     @property
-    def full(self):
+    def full(self) -> str:
+        """
+        Return the full image name with tag.
+        """
         return f"{self.fqn}:{self.tag}"
 
 
 @dataclass(init=True, repr=True, eq=True, frozen=True)
 class ImageConfig(object):
+    """
+    ImageConfig holds available images which can be used at registration time. A default image can be specified
+    along with optional additional images. Each image in the config must have a unique name.
+
+    Attributes:
+        default_image (Image): The default image to be used as a container for task serialization.
+        images (List[Image]): Optional, additional images which can be used in task container definitions.
+    """
+
     default_image: Image = None
     images: List[Image] = None
 
     def find_image(self, name) -> Optional[Image]:
+        """
+        Return an image, by name, if it exists.
+        """
         for i in self.images:
             if i.name == name:
                 return i
@@ -85,13 +126,6 @@ def get_image_config(img_name: Optional[str] = None) -> ImageConfig:
     return ImageConfig(default_image=default_img, images=other_images)
 
 
-@dataclass
-class InstanceVar(object):
-    module: str
-    name: str
-    o: Any
-
-
 @dataclass
 class EntrypointSettings(object):
     """
@@ -119,6 +153,23 @@ class SerializationSettings(object):
     """
     These settings are provided while serializing a workflow and task, before registration. This is required to get
     runtime information at serialization time, as well as some defaults.
+
+    Attributes:
+        project (str): The project (if any) with which to register entities under.
+        domain (str): The domain (if any) with which to register entities under.
+        version (str): The version (if any) with which to register entities under.
+        image_config (ImageConfig): The image config used to define task container images.
+        env (Optional[Dict[str, str]]): Environment variables injected into task container definitions.
+        flytekit_virtualenv_root (Optional[str]): During out-of-container serialization, the absolute path of the
+            flytekit virtualenv at serialization time won't match the in-container value at execution time. This
+            optional value is used to provide the in-container virtualenv path.
+        python_interpreter (Optional[str]): The python executable to use. This is used for spark tasks in out of
+            container execution.
+        entrypoint_settings (Optional[EntrypointSettings]): Information about the command, path and version of the
+            entrypoint program.
+        fast_serialization_settings (Optional[FastSerializationSettings]): If the code is being serialized so that it
+            can be fast registered (and thus omit building a Docker image) this object contains additional parameters
+            for serialization.
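+
+    As a rough sketch, a settings object might be constructed as follows (all values below
+    are illustrative placeholders, not required values):
+
+    .. code-block:: python
+
+        SerializationSettings(
+            project="my-project",
+            domain="development",
+            version="abc123",
+            image_config=ImageConfig(
+                default_image=Image(name="default", fqn="docker.io/myorg/myimage", tag="abc123"),
+            ),
+        )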
""" project: str @@ -161,6 +212,10 @@ def build(self) -> SerializationSettings: ) def new_builder(self) -> Builder: + """ + Creates a ``SerializationSettings.Builder`` that copies the existing serialization settings parameters and + allows for customization. + """ return SerializationSettings.Builder( project=self.project, domain=self.domain, @@ -174,6 +229,9 @@ def new_builder(self) -> Builder: ) def should_fast_serialize(self) -> bool: + """ + Whether or not the serialization settings specify that entities should be serialized for fast registration. + """ return self.fast_serialization_settings is not None and self.fast_serialization_settings.enabled @@ -183,10 +241,13 @@ class CompilationState(object): Compilation state is used during the compilation of a workflow or task. It stores the nodes that were created when walking through the workflow graph. - :param prefix: This is because we may one day want to be able to have subworkflows inside other workflows. If - users choose to not specify their node names, then we can end up with multiple "n0"s. This prefix allows - us to give those nested nodes a distinct name, as well as properly identify them in the workflow. - :param task_resolver: Please see :py:class:`flytekit.extend.TaskResolverMixin` + Attributes: + prefix (str): This is because we may one day want to be able to have subworkflows inside other workflows. If + users choose to not specify their node names, then we can end up with multiple "n0"s. This prefix allows + us to give those nested nodes a distinct name, as well as properly identify them in the workflow. + mode (int): refer to :py:class:`flytekit.extend.ExecutionState.Mode` + task_resolver (Optional["TaskResolverMixin"]): Please see :py:class:`flytekit.extend.TaskResolverMixin` + nodes (Optional[List]): Stores currently compiled nodes so far. """ prefix: str @@ -221,11 +282,10 @@ def with_params( class BranchEvalMode(Enum): """ - This is a 4-way class, with the None value meaning that we are not within a conditional context. The other two + This is a 3-way class, with the None value meaning that we are not within a conditional context. The other two values are * Active - This means that the next ``then`` should run * Skipped - The next ``then`` should not run - * Ignored - This is only used in nested conditionals, and it means that the entire conditional should be ignored. """ BRANCH_ACTIVE = "branch active" @@ -239,26 +299,41 @@ class ExecutionState(object): execute. Some required things during execution deal with temporary directories, ExecutionParameters that are passed to the user etc. + + Attributes: + mode (ExecutionState.Mode): Defines the context in which the task is executed (local, hosted, etc). + working_dir (os.PathLike): Specifies the remote, external directory where inputs, outputs and other protobufs + are uploaded + engine_dir (os.PathLike): + additional_context Optional[Dict[Any, Any]]: Free form dictionary used to store additional values, for example + those used for dynamic, fast registration. + branch_eval_mode Optional[BranchEvalMode]: Used to determine whether a branch node should execute. + user_space_params Optional[ExecutionParameters]: Provides run-time, user-centric context such as a statsd + handler, a logging handler, the current execution id and a working directory. """ class Mode(Enum): - # This is the mode that is used when a task execution mimics the actual runtime environment. 
-        # NOTE: This is important to understand the difference between TASK_EXECUTION and LOCAL_TASK_EXECUTION
-        # LOCAL_TASK_EXECUTION, is the mode that is run purely locally and in some cases the difference between local
-        # and runtime environment may be different. For example for Dynamic tasks local_task_execution will just run it
-        # as a regular function, while task_execution will extract a runtime spec
+        """
+        Defines the possible execution modes, which in turn affect execution behavior.
+        """
+
+        #: This is the mode that is used when a task execution mimics the actual runtime environment.
+        #: NOTE: This is important to understand the difference between TASK_EXECUTION and LOCAL_TASK_EXECUTION.
+        #: LOCAL_TASK_EXECUTION is the mode that is run purely locally and in some cases the difference between local
+        #: and runtime environment may be different. For example for Dynamic tasks local_task_execution will just run it
+        #: as a regular function, while task_execution will extract a runtime spec
        TASK_EXECUTION = 1
 
-        # This represents when flytekit is locally running a workflow. The behavior of tasks differs in this case
-        # because instead of running a task's user defined function directly, it'll need to wrap the return values in
-        # NodeOutput
+        #: This represents when flytekit is locally running a workflow. The behavior of tasks differs in this case
+        #: because instead of running a task's user defined function directly, it'll need to wrap the return values in
+        #: NodeOutput
         LOCAL_WORKFLOW_EXECUTION = 2
 
-        # This is the mode that is used to to indicate a purely local task execution - i.e. running without a container
-        # or propeller.
+        #: This is the mode that is used to indicate a purely local task execution - i.e. running without a container
+        #: or propeller.
         LOCAL_TASK_EXECUTION = 3
 
-    mode: Mode
+    mode: ExecutionState.Mode
     working_dir: os.PathLike
     engine_dir: os.PathLike
     additional_context: Optional[Dict[Any, Any]]
@@ -268,7 +343,7 @@ class Mode(Enum):
     def __init__(
         self,
         working_dir: os.PathLike,
-        mode: Optional[Mode] = None,
+        mode: Optional[ExecutionState.Mode] = None,
         engine_dir: Optional[os.PathLike] = None,
         additional_context: Optional[Dict[Any, Any]] = None,
         branch_eval_mode: Optional[BranchEvalMode] = None,
@@ -307,6 +382,9 @@ def with_params(
         branch_eval_mode: Optional[BranchEvalMode] = None,
         user_space_params: Optional[ExecutionParameters] = None,
     ) -> ExecutionState:
+        """
+        Produces a copy of the current execution state and overrides the copy's parameters with passed parameter values.
+        """
         if self.additional_context:
             if additional_context:
                 additional_context = {**self.additional_context, **additional_context}
@@ -326,8 +404,14 @@ def with_params(
 @dataclass(frozen=True)
 class FlyteContext(object):
     """
-    Top level context for FlyteKit. maintains information that is required either to compile or execute a
-    workflow / task
+    This is an internal-facing context object, that most users will not have to deal with. It's essentially a globally
+    available grab bag of settings and objects that allows flytekit to do things like convert complex types, run and
+    compile workflows, serialize Flyte entities, etc.
+
+    Even though this object has a ``current_context`` function on it, it should not be called directly. Please use the
+    :py:class:`flytekit.FlyteContextManager` object instead.
+
+    Please do not confuse this object with the :py:class:`flytekit.ExecutionParameters` object.
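+
+    As a small sketch of the distinction described above, the two "current context" accessors
+    return different kinds of objects:
+
+    .. code-block:: python
+
+        import flytekit
+        from flytekit.core.context_manager import FlyteContextManager
+
+        params = flytekit.current_context()  # run-time ExecutionParameters
+        ctx = FlyteContextManager.current_context()  # internal FlyteContext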
""" file_access: Optional[_data_proxy.FileAccessProvider] @@ -403,12 +487,11 @@ def new_execution_state(self, working_dir: Optional[os.PathLike] = None) -> Exec @staticmethod def current_context() -> FlyteContext: """ - This method exists only to maintain backwards compatibility. Please use FlyteContextManager.current_context() - If you are a user of flytekit, use - ``` - import flytekit - flytekit.current_context() - ``` + This method exists only to maintain backwards compatibility. Please use + ``FlyteContextManager.current_context()`` instead. + + Users of flytekit should be wary not to confuse the object returned from this function + with :py:func:`flytekit.current_context` """ return FlyteContextManager.current_context() @@ -499,7 +582,10 @@ class FlyteContextManager(object): Context's within Flytekit is useful to manage compilation state and execution state. Refer to ``CompilationState`` and ``ExecutionState`` for for information. FlyteContextManager provides a singleton stack to manage these contexts. - Typical usage is: + Typical usage is + + .. code-block:: python + FlyteContextManager.initialize() with FlyteContextManager.with_context(o) as ctx: pass diff --git a/flytekit/core/dynamic_workflow_task.py b/flytekit/core/dynamic_workflow_task.py index bd59c02da0..59ab6fffe8 100644 --- a/flytekit/core/dynamic_workflow_task.py +++ b/flytekit/core/dynamic_workflow_task.py @@ -28,7 +28,7 @@ body is run to produce a workflow. It is almost as if the decorator changed from ``@task`` to ``@workflow`` except workflows cannot make use of their inputs like native Python values whereas dynamic workflows can. The resulting workflow is passed back to the Flyte engine and is -run as a :std:ref:`subworkflow `. Simple usage +run as a :std:ref:`subworkflow `. Simple usage .. code-block:: @@ -49,5 +49,5 @@ def my_dynamic_subwf(a: int, b: int) -> int: x = t1(a=a) return t2(b=b, x=x) -See the :std:ref:`cookbook ` for a longer discussion. +See the :std:ref:`cookbook ` for a longer discussion. """ # noqa: W293 diff --git a/flytekit/core/launch_plan.py b/flytekit/core/launch_plan.py index 32aa2e65e7..481871977e 100644 --- a/flytekit/core/launch_plan.py +++ b/flytekit/core/launch_plan.py @@ -16,6 +16,61 @@ class LaunchPlan(object): + """ + Launch Plans are one of the core constructs of Flyte. Please take a look at the discussion in the + :std:ref:`core concepts ` if you are unfamiliar with them. + + Every workflow is registered with a default launch plan, which is just a launch plan with none of the additional + attributes set - no default values, fixed values, schedules, etc. Assuming you have the following workflow + + .. code-block:: python + + @workflow + def wf(a: int, c: str) -> str: + ... + + Create the default launch plan with + + .. code-block:: python + + LaunchPlan.get_or_create(workflow=my_wf) + + If you specify additional parameters, you'll also have to give the launch plan a unique name. Default and + fixed inputs can be expressed as Python native values like so: + + .. literalinclude:: ../../../tests/flytekit/unit/core/test_launch_plan.py + :start-after: # fixed_and_default_start + :end-before: # fixed_and_default_end + :language: python + :dedent: 4 + + Additionally, a launch plan can be configured to run on a schedule and emit notifications. + + + Please see the relevant Schedule and Notification objects as well. + + To configure the remaining parameters, you'll need to import the relevant model objects as well. + + .. 
literalinclude:: ../../../tests/flytekit/unit/core/test_launch_plan.py + :start-after: # schedule_start + :end-before: # schedule_end + :language: python + :dedent: 4 + + .. code-block:: python + + from flytekit.models.common import Annotations, AuthRole, Labels, RawOutputDataConfig + + Then use as follows + + .. literalinclude:: ../../../tests/flytekit/unit/core/test_launch_plan.py + :start-after: # auth_role_start + :end-before: # auth_role_end + :language: python + :dedent: 4 + + """ + # The reason we cache is simply because users may get the default launch plan twice for a single Workflow. We # don't want to create two defaults, could be confusing. CACHE = {} @@ -185,7 +240,7 @@ def get_or_create( or raw_output_data_config != cached_outputs["_raw_output_data_config"] or max_parallelism != cached_outputs["_max_parallelism"] ): - return AssertionError("The cached values aren't the same as the current call arguments") + raise AssertionError("The cached values aren't the same as the current call arguments") return LaunchPlan.CACHE[name] elif name is None and workflow.name in LaunchPlan.CACHE: diff --git a/flytekit/core/map_task.py b/flytekit/core/map_task.py index 830a1d821d..89ef4343bd 100644 --- a/flytekit/core/map_task.py +++ b/flytekit/core/map_task.py @@ -206,16 +206,11 @@ def map_task(task_function: PythonFunctionTask, concurrency: int = None, min_suc Usage: - .. code-block:: python - - @task - def my_mappable_task(a: int) -> str: - return str(a) - - @workflows - def my_wf(x: typing.List[int]) -> typing.List[str]: - return flytekit.map(my_mappable_task, metadata=TaskMetadata(retries=1), requests=Resources(cpu="10M"), - concurrency=10, min_success_ratio=0.75)(a=x) + .. literalinclude:: ../../../tests/flytekit/unit/core/test_map_task.py + :start-after: # test_map_task_start + :end-before: # test_map_task_end + :language: python + :dedent: 4 At run time, the underlying map task will be run for every value in the input collection. Task-specific attributes such as :py:class:`flytekit.TaskMetadata` and :py:class:`flytekit.Resources` are applied to individual instances diff --git a/flytekit/core/notification.py b/flytekit/core/notification.py index aedff37982..2682a8ea2e 100644 --- a/flytekit/core/notification.py +++ b/flytekit/core/notification.py @@ -5,7 +5,7 @@ .. note:: Notifications require some setup and configuration on the Flyte platform side. Please contact your Flyte platform - admins to get this feature enabled. See :std:doc:`flyte:howto/notifications` + admins to get this feature enabled. See :std:ref:`cookbook:setting up workflow notifications` Each notification type takes a list of :py:class:`flytekit.models.core.execution.WorkflowExecutionPhase` and a list of emails. Even though there are different notification classes in this module, they all just send email. The differentiation @@ -59,6 +59,12 @@ def _validate_phases(self, phases: List[int]): class PagerDuty(Notification): """ This notification should be used when sending emails to the PagerDuty service. + + .. code-block:: python + + from flytekit.models.core.execution import WorkflowExecutionPhase + + PagerDuty(phases=[WorkflowExecutionPhase.SUCCEEDED], recipients_email=["my-team@email.com"]) """ def __init__(self, phases: List[int], recipients_email: List[str]): @@ -73,6 +79,12 @@ def __init__(self, phases: List[int], recipients_email: List[str]): class Email(Notification): """ This notification should be used when sending regular emails to people. + + .. 
code-block:: python + + from flytekit.models.core.execution import WorkflowExecutionPhase + + Email(phases=[WorkflowExecutionPhase.SUCCEEDED], recipients_email=["my-team@email.com"]) """ def __init__(self, phases: List[int], recipients_email: List[str]): @@ -87,6 +99,12 @@ def __init__(self, phases: List[int], recipients_email: List[str]): class Slack(Notification): """ This notification should be used when sending emails to the Slack. + + .. code-block:: python + + from flytekit.models.core.execution import WorkflowExecutionPhase + + Slack(phases=[WorkflowExecutionPhase.SUCCEEDED], recipients_email=["my-team@email.com"]) """ def __init__(self, phases: List[int], recipients_email: List[str]): diff --git a/flytekit/core/python_auto_container.py b/flytekit/core/python_auto_container.py index 97432d3958..38331d59bc 100644 --- a/flytekit/core/python_auto_container.py +++ b/flytekit/core/python_auto_container.py @@ -50,17 +50,17 @@ def __init__( :param task_resolver: Custom resolver - will pick up the default resolver if empty, or the resolver set in the compilation context if one is set. :param List[Secret] secret_requests: Secrets that are requested by this container execution. These secrets will - be mounted based on the configuration in the Secret and available through - the SecretManager using the name of the secret as the group - Ideally the secret keys should also be semi-descriptive. - The key values will be available from runtime, if the backend is configured - to provide secrets and if secrets are available in the configured secrets store. - Possible options for secret stores are - - `Vault `, - - `Confidant `, - - `Kube secrets ` - - `AWS Parameter store `_ - etc + be mounted based on the configuration in the Secret and available through + the SecretManager using the name of the secret as the group + Ideally the secret keys should also be semi-descriptive. + The key values will be available from runtime, if the backend is configured + to provide secrets and if secrets are available in the configured secrets store. + Possible options for secret stores are + + - `Vault `__ + - `Confidant `__ + - `Kube secrets `__ + - `AWS Parameter store `__ """ sec_ctx = None if secret_requests: @@ -108,6 +108,9 @@ def resources(self) -> ResourceSpec: return self._resources def get_default_command(self, settings: SerializationSettings) -> List[str]: + """ + Returns the default pyflyte-execute command used to run this on hosted Flyte platforms. + """ container_args = [ "pyflyte-execute", "--inputs", @@ -133,9 +136,17 @@ def set_command_fn(self, get_command_fn: Optional[Callable[[SerializationSetting self._get_command_fn = get_command_fn def reset_command_fn(self): + """ + Resets the command which should be used in the container definition of this task to the default arguments. + This is useful when the command line is overriden at serialization time. + """ self._get_command_fn = self.get_default_command def get_command(self, settings: SerializationSettings) -> List[str]: + """ + Returns the command which should be used in the container definition for the serialized version of this task + registered on a hosted Flyte platform. 
+ """ return self._get_command_fn(settings) def get_container(self, settings: SerializationSettings) -> _task_model.Container: diff --git a/flytekit/core/python_customized_container_task.py b/flytekit/core/python_customized_container_task.py index a615f2f6f5..8e6afca698 100644 --- a/flytekit/core/python_customized_container_task.py +++ b/flytekit/core/python_customized_container_task.py @@ -23,19 +23,19 @@ class PythonCustomizedContainerTask(ExecutableTemplateShimTask, PythonTask[TC]): """ - Please take a look at the comments for ``ExecutableTemplateShimTask`` as well. This class should be subclassed - and a custom Executor provided as a default to this parent class constructor when building a new external-container - flytekit-only plugin. + Please take a look at the comments for :py:class`flytekit.extend.ExecutableTemplateShimTask` as well. This class + should be subclassed and a custom Executor provided as a default to this parent class constructor + when building a new external-container flytekit-only plugin. This class provides authors of new task types the basic scaffolding to create task-template based tasks. In order - to write such a task, authors need to: + to write such a task, authors need to - * subclass the ``ShimTaskExecutor`` class and override the ``execute_from_model`` function. This function is - where all the business logic should go. Keep in mind though that you, the plugin author, will not have access - to anything that's not serialized within the ``TaskTemplate`` which is why you'll also need to - * subclass this class, and override the ``get_custom`` function to include all the information the executor - will need to run. - * Also pass the executor you created as the ``executor_type`` argument of this class's constructor. + * subclass the ``ShimTaskExecutor`` class and override the ``execute_from_model`` function. This function is + where all the business logic should go. Keep in mind though that you, the plugin author, will not have access + to anything that's not serialized within the ``TaskTemplate`` which is why you'll also need to + * subclass this class, and override the ``get_custom`` function to include all the information the executor + will need to run. + * Also pass the executor you created as the ``executor_type`` argument of this class's constructor. Keep in mind that the total size of the ``TaskTemplate`` still needs to be small, since these will be accessed frequently by the Flyte engine. @@ -76,17 +76,16 @@ def __init__( :param limits: custom resource limit settings. :param environment: Environment variables you want the task to have when run. :param List[Secret] secret_requests: Secrets that are requested by this container execution. These secrets will - be mounted based on the configuration in the Secret and available through - the SecretManager using the name of the secret as the group - Ideally the secret keys should also be semi-descriptive. - The key values will be available from runtime, if the backend is configured - to provide secrets and if secrets are available in the configured secrets store. - Possible options for secret stores are - - `Vault `, - - `Confidant `, - - `Kube secrets ` - - `AWS Parameter store `_ - etc + be mounted based on the configuration in the Secret and available through + the SecretManager using the name of the secret as the group + Ideally the secret keys should also be semi-descriptive. 
+            The key values will be available at runtime, if the backend is configured to provide secrets and
+            if secrets are available in the configured secrets store. Possible options for secret stores are
+
+            - `Vault `__
+            - `Confidant `__
+            - `Kube secrets `__
+            - `AWS Parameter store `__
         """
         sec_ctx = None
         if secret_requests:
diff --git a/flytekit/core/python_function_task.py b/flytekit/core/python_function_task.py
index 726e2f1d6f..43b8e864cb 100644
--- a/flytekit/core/python_function_task.py
+++ b/flytekit/core/python_function_task.py
@@ -1,3 +1,19 @@
+"""
+=========================================
+:mod:`flytekit.core.python_function_task`
+=========================================
+
+.. currentmodule:: flytekit.core.python_function_task
+
+.. autosummary::
+   :toctree: generated/
+
+   PythonFunctionTask
+   PythonInstanceTask
+
+"""
+
+
 import inspect
 from abc import ABC
 from collections import OrderedDict
@@ -53,14 +69,17 @@ def __init__(
         task_resolver: Optional[TaskResolverMixin] = None,
         **kwargs,
     ):
+        """
+        Please see the class level documentation.
+        """
         super().__init__(name=name, task_config=task_config, task_type=task_type, task_resolver=task_resolver, **kwargs)
 
 
 class PythonFunctionTask(PythonAutoContainerTask[T]):
     """
     A Python Function task should be used as the base for all extensions that have a python function. It will
-    automatically detect interface of the python function and also, create the write execution command to execute the
-    function
+    automatically detect the interface of the python function and, when serialized on the hosted Flyte platform,
+    handle writing the execution command used to execute the function.
 
     It is advised this task is used using the @task decorator as follows
 
@@ -89,11 +108,13 @@ def __init__(
         **kwargs,
     ):
         """
-        :param task_config: Configuration object for Task. Should be a unique type for that specific Task
-        :param task_function: Python function that has type annotations and works for the task
-        :param ignore_input_vars: When supplied, these input variables will be removed from the interface. This
+        :param T task_config: Configuration object for Task. Should be a unique type for that specific Task
+        :param Callable task_function: Python function that has type annotations and works for the task
+        :param Optional[List[str]] ignore_input_vars: When supplied, these input variables will be removed from the interface. This
             can be used to inject some client side variables only. Prefer using ExecutionParams
-        :param task_type: String task type to be associated with this Task
+        :param Optional[ExecutionBehavior] execution_mode: Defines how the execution should behave, for example
+            executing normally or specially handling a dynamic case.
+        :param Optional[str] task_type: String task type to be associated with this Task
         """
         if task_function is None:
             raise ValueError("TaskFunction is a required parameter for PythonFunctionTask")
@@ -142,6 +163,10 @@ def execute(self, **kwargs) -> Any:
     def compile_into_workflow(
         self, ctx: FlyteContext, task_function: Callable, **kwargs
     ) -> Union[_dynamic_job.DynamicJobSpec, _literal_models.LiteralMap]:
+        """
+        In the case of dynamic workflows, this function will produce a workflow definition at execution time which will
+        then proceed to be executed.
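+
+        A rough sketch of the kind of task that takes this path (``t1`` and ``my_dynamic``
+        are placeholder names):
+
+        .. code-block:: python
+
+            import typing
+
+            from flytekit import dynamic, task
+
+            @task
+            def t1(a: int) -> str:
+                return str(a)
+
+            @dynamic
+            def my_dynamic(n: int) -> typing.List[str]:
+                # The loop below is compiled into a workflow spec at execution time.
+                return [t1(a=i) for i in range(n)]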
+ """ if not ctx.compilation_state: cs = ctx.new_compilation_state("dynamic") else: diff --git a/flytekit/core/reference.py b/flytekit/core/reference.py index 840d48c3d1..ed16ba6353 100644 --- a/flytekit/core/reference.py +++ b/flytekit/core/reference.py @@ -18,6 +18,28 @@ def get_reference_entity( inputs: Dict[str, Type], outputs: Dict[str, Type], ): + """ + See the documentation for :py:class:`flytekit.reference_task` and :py:class:`flytekit.reference_workflow` as well. + + This function is the general form of the two aforementioned functions. It's better for programmatic usage, as + the interface is passed in as arguments instead of analyzed from type annotations. + + .. literalinclude:: ../../../tests/flytekit/unit/core/test_references.py + :start-after: # docs_ref_start + :end-before: # docs_ref_end + :language: python + :dedent: 4 + + :param resource_type: This is the type of entity it is. Must be one of + :py:class:`flytekit.models.core.identifier.ResourceType` + :param project: The project the entity you're looking for has been registered in. + :param domain: The domain the entity you're looking for has been registered in. + :param name: The name of the registered entity + :param version: The version the entity you're looking for has been registered with. + :param inputs: An ordered dictionary of input names as strings to their Python types. + :param outputs: An ordered dictionary of output names as strings to their Python types. + :return: + """ if resource_type == _identifier_model.ResourceType.TASK: return ReferenceTask(project, domain, name, version, inputs, outputs) elif resource_type == _identifier_model.ResourceType.WORKFLOW: diff --git a/flytekit/core/resources.py b/flytekit/core/resources.py index 14524eb9cb..c110a00e50 100644 --- a/flytekit/core/resources.py +++ b/flytekit/core/resources.py @@ -4,12 +4,15 @@ @dataclass class Resources(object): """ - This class is used to specify both resource requests and resource limits. :: + This class is used to specify both resource requests and resource limits. + + .. code-block:: python Resources(cpu="1", mem="2048") # This is 1 CPU and 2 KB of memory Resources(cpu="100m", mem="2Gi") # This is 1/10th of a CPU and 2 gigabytes of memory .. note:: + Storage is not currently supported on the Flyte backend. Please see the :std:ref:`User Guide ` for detailed examples. diff --git a/flytekit/core/schedule.py b/flytekit/core/schedule.py index c69e8ee105..aa9d35abb0 100644 --- a/flytekit/core/schedule.py +++ b/flytekit/core/schedule.py @@ -147,6 +147,18 @@ def _validate_offset(offset: str): class FixedRate(_schedule_models.Schedule): + """ + Use this class to schedule a fixed-rate interval for a launch plan. + + .. code-block:: python + + from datetime import timedelta + + FixedRate(duration=timedelta(minutes=10)) + + See the :std:ref:`fixed rate intervals` chapter in the cookbook for additional usage examples. + """ + def __init__(self, duration: datetime.timedelta, kickoff_time_input_arg: str = None): """ :param datetime.timedelta duration: diff --git a/flytekit/core/shim_task.py b/flytekit/core/shim_task.py index 5504e212ce..637cfe4b47 100644 --- a/flytekit/core/shim_task.py +++ b/flytekit/core/shim_task.py @@ -17,8 +17,9 @@ class ExecutableTemplateShimTask(object): This class, along with the ``ShimTaskExecutor`` class below, represents another execution pattern. This pattern, has two components: - * The ``TaskTemplate``, or something like it like a ``FlyteTask``. 
- * An executor, which can use information from the task template (including the ``custom`` field) + + * The ``TaskTemplate``, or something like it like a ``FlyteTask``. + * An executor, which can use information from the task template (including the ``custom`` field) Basically at execution time (both locally and on a Flyte cluster), the task template is given to the executor, which is responsible for computing and returning the results. diff --git a/flytekit/core/task.py b/flytekit/core/task.py index cb00264565..24fe283399 100644 --- a/flytekit/core/task.py +++ b/flytekit/core/task.py @@ -21,6 +21,20 @@ class TaskPlugins(object): TaskPlugins.register_pythontask_plugin(config_object_type, plugin_object_type) # config_object_type is any class that will be passed to the plugin_object as task_config # Plugin_object_type is a derivative of ``PythonFunctionTask`` + + Examples of available task plugins include different query-based plugins such as + :py:class:`flytekitplugins.athena.task.AthenaTask` and :py:class:`flytekitplugins.hive.task.HiveTask`, ML tools like + :py:class:`plugins.awssagemaker.flytekitplugins.awssagemaker.training.SagemakerBuiltinAlgorithmsTask`, kubeflow + operators like :py:class:`plugins.kfpytorch.flytekitplugins.kfpytorch.task.PyTorchFunctionTask` and + :py:class:`plugins.kftensorflow.flytekitplugins.kftensorflow.task.TensorflowFunctionTask`, and generic plugins like + :py:class:`flytekitplugins.pod.task.PodFunctionTask` which doesn't integrate with third party tools or services. + + The `task_config` is different for every task plugin type. This is filled out by users when they define a task to + specify plugin-specific behavior and features. For example, with a query type task plugin, the config might store + information related to which database to query. + + The `plugin_object_type` can be used to customize execution behavior and task serialization properties in tandem + with the `task_config`. """ _PYTHONFUNCTION_TASK_PLUGINS: Dict[type, Type[PythonFunctionTask]] = {} @@ -214,6 +228,12 @@ def reference_task( A reference task is a pointer to a task that already exists on your Flyte installation. This object will not initiate a network call to Admin, which is why the user is asked to provide the expected interface. If at registration time the interface provided causes an issue with compilation, an error will be returned. + + Example: + + .. literalinclude:: ../../../tests/flytekit/unit/core/test_references.py + :pyobject: ref_t1 + """ def wrapper(fn) -> ReferenceTask: diff --git a/flytekit/core/type_engine.py b/flytekit/core/type_engine.py index dec2d76816..df175579c0 100644 --- a/flytekit/core/type_engine.py +++ b/flytekit/core/type_engine.py @@ -67,6 +67,9 @@ def get_literal_type(self, t: Type[T]) -> LiteralType: raise NotImplementedError("Conversion to LiteralType should be implemented") def guess_python_type(self, literal_type: LiteralType) -> Type[T]: + """ + Converts the Flyte LiteralType to a python object type. 
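+
+ For example, via the engine-level classmethods (a sketch; this assumes the transformer
+ registered for the requested type implements this method):
+
+ .. code-block:: python
+
+ lt = TypeEngine.to_literal_type(int)
+ assert TypeEngine.guess_python_type(lt) == int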
+ """ raise ValueError("By default, transformers do not translate from Flyte types back to Python types") @abstractmethod @@ -348,11 +351,17 @@ def get_transformer(cls, python_type: Type) -> TypeTransformer[T]: @classmethod def to_literal_type(cls, python_type: Type) -> LiteralType: + """ + Converts a python type into a flyte specific ``LiteralType`` + """ transformer = cls.get_transformer(python_type) return transformer.get_literal_type(python_type) @classmethod def to_literal(cls, ctx: FlyteContext, python_val: typing.Any, python_type: Type, expected: LiteralType) -> Literal: + """ + Converts a python value of a given type and expected ``LiteralType`` into a resolved ``Literal`` value. + """ if python_val is None: raise AssertionError(f"Python value cannot be None, expected {python_type}/{expected}") transformer = cls.get_transformer(python_type) @@ -362,11 +371,17 @@ def to_literal(cls, ctx: FlyteContext, python_val: typing.Any, python_type: Type @classmethod def to_python_value(cls, ctx: FlyteContext, lv: Literal, expected_python_type: Type) -> typing.Any: + """ + Converts a Literal value with an expected python type into a python value. + """ transformer = cls.get_transformer(expected_python_type) return transformer.to_python_value(ctx, lv, expected_python_type) @classmethod def named_tuple_to_variable_map(cls, t: typing.NamedTuple) -> _interface_models.VariableMap: + """ + Converts a python-native ``NamedTuple`` to a flyte-specific VariableMap of named literals. + """ variables = {} for idx, (var_name, var_type) in enumerate(t.__annotations__.items()): literal_type = cls.to_literal_type(var_type) @@ -378,7 +393,7 @@ def literal_map_to_kwargs( cls, ctx: FlyteContext, lm: LiteralMap, python_types: typing.Dict[str, type] ) -> typing.Dict[str, typing.Any]: """ - Given a literal Map (usually an input into a task - intermediate), convert to kwargs for the task + Given a ``LiteralMap`` (usually an input into a task - intermediate), convert to kwargs for the task """ if len(lm.literals) != len(python_types): raise ValueError( @@ -413,6 +428,9 @@ def get_available_transformers(cls) -> typing.KeysView[Type]: def guess_python_types( cls, flyte_variable_dict: typing.Dict[str, _interface_models.Variable] ) -> typing.Dict[str, type]: + """ + Transforms a dictionary of flyte-specific ``Variable`` objects to a dictionary of regular python values. + """ python_types = {} for k, v in flyte_variable_dict.items(): python_types[k] = cls.guess_python_type(v.type) @@ -420,6 +438,9 @@ def guess_python_types( @classmethod def guess_python_type(cls, flyte_type: LiteralType) -> type: + """ + Transforms a flyte-specific ``LiteralType`` to a regular python value. + """ for _, transformer in cls._REGISTRY.items(): try: return transformer.guess_python_type(flyte_type) @@ -493,9 +514,15 @@ def get_dict_types(t: Type[dict]) -> (type, type): @staticmethod def dict_to_generic_literal(v: dict) -> Literal: + """ + Creates a flyte-specific ``Literal`` value from a native python dictionary. 
+ """ return Literal(scalar=Scalar(generic=_json_format.Parse(_json.dumps(v), _struct.Struct()))) def get_literal_type(self, t: Type[dict]) -> LiteralType: + """ + Transforms a native python dictionary to a flyte-specific ``LiteralType`` + """ tp = self.get_dict_types(t) if tp: if tp[0] == str: diff --git a/flytekit/core/workflow.py b/flytekit/core/workflow.py index bc103d25c1..e2fba62704 100644 --- a/flytekit/core/workflow.py +++ b/flytekit/core/workflow.py @@ -55,7 +55,15 @@ class WorkflowFailurePolicy(Enum): + """ + Defines the behavior for a workflow execution in the case of an observed node execution failure. By default, a + workflow execution will immediately enter a failed state if a component node fails. + """ + + #: Causes the entire workflow execution to fail once a component node fails. FAIL_IMMEDIATELY = _workflow_model.WorkflowMetadata.OnFailurePolicy.FAIL_IMMEDIATELY + + #: Will proceed to run any remaining runnable nodes once a component node fails. FAIL_AFTER_EXECUTABLE_NODES_COMPLETE = ( _workflow_model.WorkflowMetadata.OnFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE ) @@ -360,6 +368,29 @@ def _local_execute(self, ctx: FlyteContext, **kwargs) -> Union[Tuple[Promise], P class ImperativeWorkflow(WorkflowBase): + """ + An imperative workflow is a programmatic analogue to the typical ``@workflow`` function-based workflow and is + better suited to programmatic applications. + + Assuming you have some tasks like so + + .. literalinclude:: ../../../tests/flytekit/unit/core/test_imperative.py + :start-after: # docs_tasks_start + :end-before: # docs_tasks_end + :language: python + :dedent: 4 + + You could create a workflow imperatively like so + + .. literalinclude:: ../../../tests/flytekit/unit/core/test_imperative.py + :start-after: # docs_start + :end-before: # docs_start + :language: python + :dedent: 4 + + This workflow would be identical on the backed to the + """ + def __init__( self, name: str, @@ -721,14 +752,25 @@ def workflow( This decorator declares a function to be a Flyte workflow. Workflows are declarative entities that construct a DAG of tasks using the data flow between tasks. - Unlike a task, the function body of a workflow is evaluated at serialization-time (aka compile-time). This is because - while we can determine the entire structure of a task by looking at the function's signature, - workflows need to run through the function itself because the body of the function is what expresses the workflow structure. - It's also important to note that, local execution notwithstanding, it is not evaluated again when the workflow runs on Flyte. + Unlike a task, the function body of a workflow is evaluated at serialization-time (aka compile-time). This is + because while we can determine the entire structure of a task by looking at the function's signature, workflows need + to run through the function itself because the body of the function is what expresses the workflow structure. It's + also important to note that, local execution notwithstanding, it is not evaluated again when the workflow runs on + Flyte. That is, workflows should not call non-Flyte entities since they are only run once (again, this is with respect to the platform, local runs notwithstanding). - Please see the :std:doc:`cookbook:sphx_glr_auto_core_flyte_basics_basic_workflow.py` for more usage examples. + Example: + + .. 
literalinclude:: ../../../tests/flytekit/unit/core/test_workflows.py
+ :pyobject: my_wf_example
+
+ Again, users should keep in mind that even though the body of the function looks like regular Python, it is
+ actually not. When flytekit scans the workflow function, the objects being passed around between the tasks are not
+ your typical Python values. So even though you may have a task ``t1() -> int``, when ``a = t1()`` is called, ``a``
+ will not be an integer so if you try to ``range(a)`` you'll get an error.
+
+ Please see the :std:doc:`user guide ` for more usage examples.

 :param _workflow_function: This argument is implicitly passed and represents the decorated function.
 :param failure_policy: Use the options in flytekit.WorkflowFailurePolicy
@@ -775,6 +817,11 @@ def reference_workflow(
 A reference workflow is a pointer to a workflow that already exists on your Flyte installation. This object will
 not initiate a network call to Admin, which is why the user is asked to provide the expected interface.
 If at registration time the interface provided causes an issue with compilation, an error will be returned.
+
+ Example:
+
+ .. literalinclude:: ../../../tests/flytekit/unit/core/test_references.py
+ :pyobject: ref_wf1
 """

 def wrapper(fn) -> ReferenceWorkflow:
diff --git a/flytekit/extend/__init__.py b/flytekit/extend/__init__.py
index ef0b69d5df..1a89f866fa 100644
--- a/flytekit/extend/__init__.py
+++ b/flytekit/extend/__init__.py
@@ -26,7 +26,9 @@
 T
 TypeEngine
 TypeTransformer
-
+ PythonCustomizedContainerTask
+ ExecutableTemplateShimTask
+ ShimTaskExecutor
 """

 from flytekit.common.translator import get_serializable
@@ -37,5 +39,7 @@
 from flytekit.core.context_manager import ExecutionState, Image, ImageConfig, SerializationSettings
 from flytekit.core.interface import Interface
 from flytekit.core.promise import Promise
+from flytekit.core.python_customized_container_task import PythonCustomizedContainerTask
+from flytekit.core.shim_task import ExecutableTemplateShimTask, ShimTaskExecutor
 from flytekit.core.task import TaskPlugins
 from flytekit.core.type_engine import DictTransformer, T, TypeEngine, TypeTransformer
diff --git a/flytekit/extras/sqlite3/task.py b/flytekit/extras/sqlite3/task.py
index e814f2883a..391b88035b 100644
--- a/flytekit/extras/sqlite3/task.py
+++ b/flytekit/extras/sqlite3/task.py
@@ -37,13 +37,14 @@ class SQLite3Config(object):
 Use this configuration to configure if sqlite3 files that should be loaded by the task. The file itself is
 considered as a database and hence is treated like a configuration
 The path to a static sqlite3 compatible database file can be
- - within the container
- - or from a publicly downloadable source
+
+ - within the container
+ - or from a publicly downloadable source

 Args:
 uri: default FlyteFile that will be downloaded on execute
 compressed: Boolean that indicates if the given file is a compressed archive. Supported file types are
- [zip, tar, gztar, bztar, xztar]
+ [zip, tar, gztar, bztar, xztar]
 """

 uri: str
@@ -52,10 +53,21 @@

 class SQLite3Task(PythonCustomizedContainerTask[SQLite3Config], SQLTask[SQLite3Config]):
 """
- Makes it possible to run client side SQLite3 queries that optionally return a FlyteSchema object
+ Run client-side SQLite3 queries that optionally return a FlyteSchema object.
+
+ .. note::
+
+ This is a pre-built container task. That is, your user container will not be used at task execution time.
+ Instead, the image defined in this task definition will be used.
+
+ ..
literalinclude:: ../../../tests/flytekit/unit/extras/sqlite3/test_task.py + :start-after: # sqlite3_start + :end-before: # sqlite3_end + :language: python + :dedent: 4 - TODO: How should we use pre-built containers for running portable tasks like this. Should this always be a - referenced task type? + See the :std:ref:`cookbook ` for additional usage examples and + the base class :py:class:`flytekit.extend.PythonCustomizedContainerTask` as well. """ _SQLITE_TASK_TYPE = "sqlite" diff --git a/flytekit/models/security.py b/flytekit/models/security.py index e4ea655e22..1973967b20 100644 --- a/flytekit/models/security.py +++ b/flytekit/models/security.py @@ -10,6 +10,8 @@ @dataclass class Secret(_common.FlyteIdlEntity): """ + See :std:ref:`cookbook:secrets` for usage examples. + Args: group is the Name of the secret. For example in kubernetes secrets is the name of the secret key is optional and can be an individual secret identifier within the secret For k8s this is required @@ -138,6 +140,11 @@ def from_flyte_idl(cls, pb2_object: _sec.OAuth2TokenRequest) -> "OAuth2TokenRequ @dataclass class SecurityContext(_common.FlyteIdlEntity): + """ + This is a higher level wrapper object that for the most part users shouldn't have to worry about. You should + be able to just use :py:class:`flytekit.Secret` instead. + """ + run_as: Optional[Identity] = None secrets: Optional[List[Secret]] = None tokens: Optional[List[OAuth2TokenRequest]] = None diff --git a/flytekit/types/directory/__init__.py b/flytekit/types/directory/__init__.py index 4d59b28254..4edb5f9205 100644 --- a/flytekit/types/directory/__init__.py +++ b/flytekit/types/directory/__init__.py @@ -1,5 +1,5 @@ """ -Flytekit Directory Type (:mod:`flytekit.types.directory`) +Flytekit Directory Type ========================================================== .. currentmodule:: flytekit.types.directory diff --git a/flytekit/types/file/__init__.py b/flytekit/types/file/__init__.py index caa7d7d84c..f14015b275 100644 --- a/flytekit/types/file/__init__.py +++ b/flytekit/types/file/__init__.py @@ -1,5 +1,5 @@ """ -Flytekit File Type (:mod:`flytekit.types.file`) +Flytekit File Type ========================================================== .. currentmodule:: flytekit.types.file diff --git a/flytekit/types/file/file.py b/flytekit/types/file/file.py index e445b57c3f..90394e1f0a 100644 --- a/flytekit/types/file/file.py +++ b/flytekit/types/file/file.py @@ -178,7 +178,7 @@ def path(self) -> str: @property def remote_source(self) -> str: """ - If this is an input to a task, and the original path is s3://something, flytekit will download the + If this is an input to a task, and the original path is ``s3://something``, flytekit will download the file for the user. In case the user wants access to the original path, it will be here. """ return self._remote_source diff --git a/flytekit/types/schema/__init__.py b/flytekit/types/schema/__init__.py index 8e83456958..f2c3020722 100644 --- a/flytekit/types/schema/__init__.py +++ b/flytekit/types/schema/__init__.py @@ -1,5 +1,5 @@ """ -Flytekit Schema Type (:mod:`flytekit.types.schema`) +Flytekit Schema Type ========================================================== .. 
currentmodule:: flytekit.types.schema
diff --git a/plugins/flytekit-athena/flytekitplugins/athena/task.py b/plugins/flytekit-athena/flytekitplugins/athena/task.py
index 5399483d61..45ece39106 100644
--- a/plugins/flytekit-athena/flytekitplugins/athena/task.py
+++ b/plugins/flytekit-athena/flytekitplugins/athena/task.py
@@ -40,14 +40,15 @@ def __init__(
 **kwargs,
 ):
 """
- Args:
- name: Name of this task, should be unique in the project
- config: Type AthenaConfig object
- inputs: Name and type of inputs specified as an ordered dictionary
- query_template: The actual query to run. We use Flyte's Golang templating format for Query templating.
- Refer to the templating documentation
- output_schema_type: If some data is produced by this query, then you can specify the output schema type
- **kwargs: All other args required by Parent type - SQLTask
+ To be used to query Athena databases.
+
+ :param name: Name of this task, should be unique in the project
+ :param query_template: The actual query to run. We use Flyte's Golang templating format for Query templating.
+ Refer to the templating documentation
+ :param task_config: AthenaConfig object
+ :param inputs: Name and type of inputs specified as an ordered dictionary
+ :param output_schema_type: If some data is produced by this query, then you can specify the output schema type
+ :param kwargs: All other args required by Parent type - SQLTask
 """
 outputs = None
 if output_schema_type is not None:
diff --git a/tests/flytekit/unit/core/test_imperative.py b/tests/flytekit/unit/core/test_imperative.py
index 8bb50ae5a9..250c0a0e11 100644
--- a/tests/flytekit/unit/core/test_imperative.py
+++ b/tests/flytekit/unit/core/test_imperative.py
@@ -4,13 +4,13 @@
 import pandas as pd
 import pytest

-from flytekit import Workflow, kwtypes, reference_task
 from flytekit.common.exceptions.user import FlyteValidationException
 from flytekit.common.translator import get_serializable
 from flytekit.core import context_manager
+from flytekit.core.base_task import kwtypes
 from flytekit.core.context_manager import Image, ImageConfig
 from flytekit.core.launch_plan import LaunchPlan
-from flytekit.core.task import task
+from flytekit.core.task import reference_task, task
 from flytekit.core.workflow import ImperativeWorkflow, get_promise, workflow
 from flytekit.extras.sqlite3.task import SQLite3Config, SQLite3Task
 from flytekit.models import literals as literal_models
@@ -27,7 +27,12 @@
 )


+# This is used for docs
 def test_imperative():
+ # Re import with alias
+ from flytekit.core.workflow import ImperativeWorkflow as Workflow # noqa
+
+ # docs_tasks_start
 @task
 def t1(a: str) -> str:
 return a + " world"
@@ -36,11 +41,20 @@ def t1(a: str) -> str:
 def t2():
 print("side effect")

- wb = ImperativeWorkflow(name="my.workflow")
+ # docs_tasks_end
+
+ # docs_start
+ # Create the workflow with a name. This needs to be unique within the project and takes the place of the function
+ # name that's used for regular decorated function-based workflows.
+ wb = Workflow(name="my.workflow")
+ # Adds a top level input to the workflow. This is like an input to a workflow function.
 wb.add_workflow_input("in1", str)
+ # Call your tasks.
 node = wb.add_entity(t1, a=wb.inputs["in1"])
 wb.add_entity(t2)
+ # This is analogous to a return statement
 wb.add_workflow_output("from_n0t1", node.outputs["o0"])
+ # docs_end

 assert wb(in1="hello") == "hello world"
@@ -291,7 +305,7 @@ def ref_t2(
 ) -> pd.DataFrame:
 ...
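+ # The imperative workflow below wires these reference tasks together with a SQLite3 task;
+ # the reference entities themselves are only resolved on the backend at registration time.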
- wb = Workflow(name="core.feature_engineering.workflow.fe_wf") + wb = ImperativeWorkflow(name="core.feature_engineering.workflow.fe_wf") wb.add_workflow_input("sqlite_archive", FlyteFile[typing.TypeVar("sqlite")]) sql_task = SQLite3Task( name="dummy.sqlite.task", diff --git a/tests/flytekit/unit/core/test_launch_plan.py b/tests/flytekit/unit/core/test_launch_plan.py index 9171889233..6da9dae876 100644 --- a/tests/flytekit/unit/core/test_launch_plan.py +++ b/tests/flytekit/unit/core/test_launch_plan.py @@ -24,6 +24,58 @@ ) +# This test is used for documentation. +def test_lp_documentation(): + @task + def t1(a: int) -> typing.NamedTuple("OutputsBC", t1_int_output=int, c=str): + a = a + 2 + return a, "world-" + str(a) + + @workflow + def wf(a: int, c: str) -> (int, str): + x, y = t1(a=a) + return x, y + + # fixed_and_default_start + launch_plan.LaunchPlan.get_or_create( + workflow=wf, name="your_lp_name_1", default_inputs={"a": 3}, fixed_inputs={"c": "4"} + ) + # fixed_and_default_end + + # schedule_start + sched = CronSchedule("* * ? * * *", kickoff_time_input_arg="abc") + email_notif = notification.Email( + phases=[_execution_model.WorkflowExecutionPhase.SUCCEEDED], recipients_email=["my-team@email.com"] + ) + launch_plan.LaunchPlan.get_or_create( + workflow=wf, name="your_lp_name_2", schedule=sched, notifications=[email_notif] + ) + # schedule_end + + # auth_role_start + auth_role_model = AuthRole(assumable_iam_role="my:iam:role") + launch_plan.LaunchPlan.get_or_create( + workflow=wf, + name="your_lp_name_3", + ) + + labels_model = Labels({"label": "foo"}) + annotations_model = Annotations({"annotate": "bar"}) + launch_plan.LaunchPlan.get_or_create( + workflow=wf, + name="your_lp_name_4", + auth_role=auth_role_model, + labels=labels_model, + annotations=annotations_model, + ) + + raw_output_data_config = RawOutputDataConfig("s3://foo/output") + launch_plan.LaunchPlan.get_or_create( + workflow=wf, name="your_lp_name_5", raw_output_data_config=raw_output_data_config + ) + # auth_role_end + + def test_lp(): @task def t1(a: int) -> typing.NamedTuple("OutputsBC", t1_int_output=int, c=str): @@ -64,13 +116,9 @@ def wf(a: int, c: str) -> (int, str): return x, y # Fixed Inputs Parameter - fixed_lp = launch_plan.LaunchPlan.get_or_create( - workflow=wf, name="get_or_create_fixed", fixed_inputs={"a": 1, "c": "4"} - ) - fixed_lp1 = launch_plan.LaunchPlan.get_or_create(workflow=wf, name="get_or_create_fixed") - + launch_plan.LaunchPlan.get_or_create(workflow=wf, name="get_or_create_fixed", fixed_inputs={"a": 1, "c": "4"}) with pytest.raises(AssertionError): - assert fixed_lp is fixed_lp1 + launch_plan.LaunchPlan.get_or_create(workflow=wf, name="get_or_create_fixed") # Schedule Parameter obj = CronSchedule("* * ? 
* * *", kickoff_time_input_arg="abc") @@ -80,26 +128,21 @@ def wf(a: int, c: str) -> (int, str): assert schedule_lp is schedule_lp1 # Default Inputs Parameter - default_lp = launch_plan.LaunchPlan.get_or_create( - workflow=wf, name="get_or_create_schedule", default_inputs={"a": 9} - ) - default_lp1 = launch_plan.LaunchPlan.get_or_create( - workflow=wf, name="get_or_create_schedule", default_inputs={"a": 19} - ) - - # Validates both schedule and default inputs owing to the same launch plan + launch_plan.LaunchPlan.get_or_create(workflow=wf, name="get_or_create_schedule_def_inputs", default_inputs={"a": 9}) with pytest.raises(AssertionError): - assert default_lp is default_lp1 + launch_plan.LaunchPlan.get_or_create( + workflow=wf, name="get_or_create_schedule_def_inputs", default_inputs={"a": 19} + ) # Notifications Parameter email_notif = notification.Email( phases=[_execution_model.WorkflowExecutionPhase.SUCCEEDED], recipients_email=["my-team@email.com"] ) notification_lp = launch_plan.LaunchPlan.get_or_create( - workflow=wf, name="get_or_create_notification", notifications=email_notif + workflow=wf, name="get_or_create_notification", notifications=[email_notif] ) notification_lp1 = launch_plan.LaunchPlan.get_or_create( - workflow=wf, name="get_or_create_notification", notifications=email_notif + workflow=wf, name="get_or_create_notification", notifications=[email_notif] ) assert notification_lp is notification_lp1 @@ -107,11 +150,95 @@ def wf(a: int, c: str) -> (int, str): # Auth Parameter auth_role_model1 = AuthRole(assumable_iam_role="my:iam:role") auth_role_model2 = _launch_plan_idl.Auth(kubernetes_service_account="my:service:account") - auth_lp = launch_plan.LaunchPlan.get_or_create(workflow=wf, name="get_or_create_auth", auth_role=auth_role_model1) - auth_lp1 = launch_plan.LaunchPlan.get_or_create(workflow=wf, name="get_or_create_auth", auth_role=auth_role_model2) - + launch_plan.LaunchPlan.get_or_create(workflow=wf, name="get_or_create_auth", auth_role=auth_role_model1) with pytest.raises(AssertionError): - assert auth_lp is auth_lp1 + launch_plan.LaunchPlan.get_or_create(workflow=wf, name="get_or_create_auth", auth_role=auth_role_model2) + + # Labels parameters + labels_model1 = Labels({"label": "foo"}) + labels_model2 = Labels({"label": "foo"}) + labels_lp1 = launch_plan.LaunchPlan.get_or_create(workflow=wf, name="get_or_create_labels", labels=labels_model1) + labels_lp2 = launch_plan.LaunchPlan.get_or_create(workflow=wf, name="get_or_create_labels", labels=labels_model2) + assert labels_lp1 is labels_lp2 + + # Annotations parameters + annotations_model1 = Annotations({"anno": "bar"}) + annotations_model2 = Annotations({"anno": "bar"}) + annotations_lp1 = launch_plan.LaunchPlan.get_or_create( + workflow=wf, name="get_or_create_annotations", annotations=annotations_model1 + ) + annotations_lp2 = launch_plan.LaunchPlan.get_or_create( + workflow=wf, name="get_or_create_annotations", annotations=annotations_model2 + ) + assert annotations_lp1 is annotations_lp2 + + # Raw output prefix parameters + raw_output_data_config1 = RawOutputDataConfig("s3://foo/output") + raw_output_data_config2 = RawOutputDataConfig("s3://foo/output") + raw_output_data_config_lp1 = launch_plan.LaunchPlan.get_or_create( + workflow=wf, name="get_or_create_raw_output_prefix", raw_output_data_config=raw_output_data_config1 + ) + raw_output_data_config_lp2 = launch_plan.LaunchPlan.get_or_create( + workflow=wf, name="get_or_create_raw_output_prefix", raw_output_data_config=raw_output_data_config2 + ) + assert 
raw_output_data_config_lp1 is raw_output_data_config_lp2 + + # Max parallelism + max_parallelism = 100 + max_parallelism_lp1 = launch_plan.LaunchPlan.get_or_create( + workflow=wf, + name="get_or_create_max_parallelism", + max_parallelism=max_parallelism, + ) + max_parallelism_lp2 = launch_plan.LaunchPlan.get_or_create( + workflow=wf, + name="get_or_create_max_parallelism", + max_parallelism=max_parallelism, + ) + assert max_parallelism_lp1 is max_parallelism_lp2 + + # Labels parameters + labels_model1 = Labels({"label": "foo"}) + labels_model2 = Labels({"label": "foo"}) + labels_lp1 = launch_plan.LaunchPlan.get_or_create(workflow=wf, name="get_or_create_labels", labels=labels_model1) + labels_lp2 = launch_plan.LaunchPlan.get_or_create(workflow=wf, name="get_or_create_labels", labels=labels_model2) + assert labels_lp1 is labels_lp2 + + # Annotations parameters + annotations_model1 = Annotations({"anno": "bar"}) + annotations_model2 = Annotations({"anno": "bar"}) + annotations_lp1 = launch_plan.LaunchPlan.get_or_create( + workflow=wf, name="get_or_create_annotations", annotations=annotations_model1 + ) + annotations_lp2 = launch_plan.LaunchPlan.get_or_create( + workflow=wf, name="get_or_create_annotations", annotations=annotations_model2 + ) + assert annotations_lp1 is annotations_lp2 + + # Raw output prefix parameters + raw_output_data_config1 = RawOutputDataConfig("s3://foo/output") + raw_output_data_config2 = RawOutputDataConfig("s3://foo/output") + raw_output_data_config_lp1 = launch_plan.LaunchPlan.get_or_create( + workflow=wf, name="get_or_create_raw_output_prefix", raw_output_data_config=raw_output_data_config1 + ) + raw_output_data_config_lp2 = launch_plan.LaunchPlan.get_or_create( + workflow=wf, name="get_or_create_raw_output_prefix", raw_output_data_config=raw_output_data_config2 + ) + assert raw_output_data_config_lp1 is raw_output_data_config_lp2 + + # Max parallelism + max_parallelism = 100 + max_parallelism_lp1 = launch_plan.LaunchPlan.get_or_create( + workflow=wf, + name="get_or_create_max_parallelism", + max_parallelism=max_parallelism, + ) + max_parallelism_lp2 = launch_plan.LaunchPlan.get_or_create( + workflow=wf, + name="get_or_create_max_parallelism", + max_parallelism=max_parallelism, + ) + assert max_parallelism_lp1 is max_parallelism_lp2 # Labels parameters labels_model1 = Labels({"label": "foo"}) @@ -164,8 +291,10 @@ def wf(a: int, c: str) -> (int, str): def test_lp_all_parameters(): + nt = typing.NamedTuple("OutputsBC", t1_int_output=int, c=str) + @task - def t1(a: int) -> typing.NamedTuple("OutputsBC", t1_int_output=int, c=str): + def t1(a: int) -> nt: a = a + 2 return a, "world-" + str(a) @@ -195,7 +324,7 @@ def wf(a: int, c: str) -> str: default_inputs={"a": 3}, fixed_inputs={"c": "4"}, schedule=obj, - notifications=slack_notif, + notifications=[slack_notif], auth_role=auth_role_model, labels=labels, annotations=annotations, @@ -207,7 +336,7 @@ def wf(a: int, c: str) -> str: default_inputs={"a": 3}, fixed_inputs={"c": "4"}, schedule=obj, - notifications=slack_notif, + notifications=[slack_notif], auth_role=auth_role_model, labels=labels, annotations=annotations, @@ -216,22 +345,19 @@ def wf(a: int, c: str) -> str: assert lp is lp2 - # Check for assertion error when a different scheduler is used - lp3 = launch_plan.LaunchPlan.get_or_create( - workflow=wf, - name="get_or_create", - default_inputs={"a": 3}, - fixed_inputs={"c": "4"}, - schedule=obj1, - notifications=slack_notif, - auth_role=auth_role_model, - labels=labels, - annotations=annotations, - 
raw_output_data_config=raw_output_data_config, - ) - with pytest.raises(AssertionError): - assert lp is lp3 + launch_plan.LaunchPlan.get_or_create( + workflow=wf, + name="get_or_create", + default_inputs={"a": 3}, + fixed_inputs={"c": "4"}, + schedule=obj1, + notifications=[slack_notif], + auth_role=auth_role_model, + labels=labels, + annotations=annotations, + raw_output_data_config=raw_output_data_config, + ) def test_lp_nodes(): diff --git a/tests/flytekit/unit/core/test_map_task.py b/tests/flytekit/unit/core/test_map_task.py index beefbe5288..80ab499416 100644 --- a/tests/flytekit/unit/core/test_map_task.py +++ b/tests/flytekit/unit/core/test_map_task.py @@ -3,7 +3,7 @@ import pytest -from flytekit import LaunchPlan, map_task +from flytekit import LaunchPlan, Resources, map_task from flytekit.common.translator import get_serializable from flytekit.core import context_manager from flytekit.core.context_manager import Image, ImageConfig @@ -18,6 +18,29 @@ def t1(a: int) -> str: return str(b) +# This test is for documentation. +def test_map_docs(): + # test_map_task_start + @task + def my_mappable_task(a: int) -> str: + return str(a) + + @workflow + def my_wf(x: typing.List[int]) -> typing.List[str]: + return map_task( + my_mappable_task, + metadata=TaskMetadata(retries=1), + requests=Resources(cpu="10M"), + concurrency=10, + min_success_ratio=0.75, + )(a=x) + + # test_map_task_end + + res = my_wf(x=[1, 2, 3]) + assert res == ["1", "2", "3"] + + def test_map_task_types(): strs = map_task(t1, metadata=TaskMetadata(retries=1))(a=[5, 6]) assert strs == ["7", "8"] diff --git a/tests/flytekit/unit/core/test_references.py b/tests/flytekit/unit/core/test_references.py index f846ffdd01..900cb2046a 100644 --- a/tests/flytekit/unit/core/test_references.py +++ b/tests/flytekit/unit/core/test_references.py @@ -18,16 +18,41 @@ from flytekit.models.core import identifier as _identifier_model -def test_ref(): - @reference_task( - project="flytesnacks", - domain="development", - name="recipes.aaa.simple.join_strings", - version="553018f39e519bdb2597b652639c30ce16b99c79", +# This is used for docs +def test_ref_docs(): + # docs_ref_start + ref_entity = get_reference_entity( + _identifier_model.ResourceType.WORKFLOW, + "project", + "dev", + "my.other.workflow", + "abc123", + inputs=kwtypes(a=str, b=int), + outputs={}, ) - def ref_t1(a: typing.List[str]) -> str: - ... + # docs_ref_end + with pytest.raises(Exception) as e: + ref_entity() + assert "You must mock this out" in f"{e}" + + +@reference_task( + project="flytesnacks", + domain="development", + name="recipes.aaa.simple.join_strings", + version="553018f39e519bdb2597b652639c30ce16b99c79", +) +def ref_t1(a: typing.List[str]) -> str: + """ + The empty function acts as a convenient skeleton to make it intuitive to call/reference this task from workflows. + The interface of the task must match that of the remote task. Otherwise, remote compilation of the workflow will + fail. + """ + ... + + +def test_ref(): assert ref_t1.id.project == "flytesnacks" assert ref_t1.id.domain == "development" assert ref_t1.id.name == "recipes.aaa.simple.join_strings" @@ -77,16 +102,17 @@ def wf1(in1: typing.List[str]) -> str: assert wf1(in1=["hello", "world"]) == "hello" +@reference_workflow(project="proj", domain="developement", name="wf_name", version="abc") +def ref_wf1(a: int) -> (str, str): + ... 
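+# A reference workflow is invoked from other workflows like any local workflow, e.g.
+# ``x, y = ref_wf1(a=3)`` inside a ``@workflow`` function; the implementation itself lives on
+# the Flyte backend and must exist there by registration time.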
+ + def test_reference_workflow(): @task def t1(a: int) -> typing.NamedTuple("OutputsBC", t1_int_output=int, c=str): a = a + 2 return a, "world-" + str(a) - @reference_workflow(project="proj", domain="developement", name="wf_name", version="abc") - def ref_wf1(a: int) -> (str, str): - ... - @workflow def my_wf(a: int, b: str) -> (int, str, str): x, y = t1(a=a).with_overrides() diff --git a/tests/flytekit/unit/core/test_workflows.py b/tests/flytekit/unit/core/test_workflows.py index 724f27fb4b..865861243e 100644 --- a/tests/flytekit/unit/core/test_workflows.py +++ b/tests/flytekit/unit/core/test_workflows.py @@ -193,3 +193,52 @@ def wf2() -> int: assert len(sub_wf.nodes) == 1 assert sub_wf.nodes[0].id == "wf2-n0" assert sub_wf.nodes[0].task_node.reference_id.name == "test_workflows.t1" + + +@task +def add_5(a: int) -> int: + a = a + 5 + return a + + +@workflow +def simple_wf() -> int: + return add_5(a=1) + + +@workflow +def my_wf_example(a: int) -> (int, int): + """ + Workflows can have inputs and return outputs of simple or complex types. + """ + + x = add_5(a=a) + + # You can use outputs of a previous task as inputs to other nodes. + z = add_5(a=x) + + # You can call other workflows from within this workflow + d = simple_wf() + + # You can add conditions that can run on primitive types and execute different branches + e = conditional("bool").if_(a == 5).then(add_5(a=d)).else_().then(add_5(a=z)) + + # Outputs of the workflow have to be outputs returned by prior nodes. + # No outputs and single or multiple outputs are supported + return x, e + + +def test_all_node_types(): + assert my_wf_example(a=1) == (6, 16) + entity_mapping = OrderedDict() + + model_wf = get_serializable(entity_mapping, serialization_settings, my_wf_example) + + assert len(model_wf.template.interface.outputs) == 2 + assert len(model_wf.template.nodes) == 4 + assert model_wf.template.nodes[2].workflow_node is not None + + sub_wf = model_wf.sub_workflows[0] + assert len(sub_wf.nodes) == 1 + assert sub_wf.nodes[0].id == "n0" + assert sub_wf.nodes[0].task_node.reference_id.name == "test_workflows.add_5" diff --git a/tests/flytekit/unit/extras/sqlite3/test_task.py b/tests/flytekit/unit/extras/sqlite3/test_task.py index 21eedcce2e..365a55cee8 100644 --- a/tests/flytekit/unit/extras/sqlite3/test_task.py +++ b/tests/flytekit/unit/extras/sqlite3/test_task.py @@ -27,16 +27,20 @@ def test_task_static(): def test_task_schema(): + # sqlite3_start + DB_LOCATION = "https://cdn.sqlitetutorial.net/wp-content/uploads/2018/03/chinook.zip" + sql_task = SQLite3Task( "test", query_template="select TrackId, Name from tracks limit {{.inputs.limit}}", inputs=kwtypes(limit=int), output_schema_type=FlyteSchema[kwtypes(TrackId=int, Name=str)], task_config=SQLite3Config( - uri=EXAMPLE_DB, + uri=DB_LOCATION, compressed=True, ), ) + # sqlite3_end assert sql_task.output_columns is not None df = sql_task(limit=1)