diff --git a/cookbook/core/extend_flyte/README.rst b/cookbook/core/extend_flyte/README.rst index 1ed078f4e..abb2acbfb 100644 --- a/cookbook/core/extend_flyte/README.rst +++ b/cookbook/core/extend_flyte/README.rst @@ -5,16 +5,13 @@ Extending Flyte ############### The core of Flyte is a container execution engine, where you can write one or more tasks and compose them together to -form a data dependency DAG, called a ``workflow``. If your work involves writing simple python or java tasks that can -either perform operations on their own or can call out to :ref:`supported external services `, +form a data dependency DAG, called a ``workflow``. If your work involves writing simple Python or Java tasks that can +either perform operations on their own or call out to :ref:`Supported external services `, then there's *no need to extend Flyte*. -================= -Why Extend Flyte? -================= - -Define Custom Types -=================== +==================== +Define a Custom Type +==================== Flyte, just like a programming language, has a core type-system, which can be extended by adding user-defined data types. For example, Flyte supports adding support for a dataframe type from a new library, a custom user data structure, or a @@ -23,9 +20,8 @@ grouping of images in a specific encoding. Flytekit natively supports structured data like :py:func:`~dataclasses.dataclass` using JSON as the representation format (see :ref:`Using Custom Python Objects `). -For types that are not simply representable as JSON documents, Flytekit allows users to extend Flyte's type system and -implement these types in Python. The user has to implement a :py:class:`~flytekit.extend.TypeTransformer` -class to enable translation of the type from the user type to the Flyte-understood type. +Flytekit allows users to extend Flyte's type system and implement types in Python that are not representable as JSON documents. The user has to implement a :py:class:`~flytekit.extend.TypeTransformer` +class to enable the translation of type from user type to Flyte-understood type. As an example, instead of using :py:class:`pandas.DataFrame` directly, you may want to use `Pandera `__ to perform validation of an input or output dataframe @@ -33,22 +29,22 @@ As an example, instead of using :py:class:`pandas.DataFrame` directly, you may w To extend the type system, refer to :std:ref:`advanced_custom_types`. +===================== +Add a New Task Plugin +===================== -Adding a New Task Type -====================== - -Often times you want to interact with services like: +Often you want to interact with services like: -- Databases (e.g. Postgres, MySQL, etc) -- DataWarehouses (e.g. Snowflake, BigQuery, Redshift etc) -- Computation (e.g. AWS EMR, Databricks etc) +- Databases (e.g., Postgres, MySQL, etc.) +- DataWarehouses (e.g., Snowflake, BigQuery, Redshift etc.) +- Computation (e.g., AWS EMR, Databricks etc.) -You might want this to be available like a template for the open source community or within your organization. This +You might want this interaction to be available as a template for the open-source community or in your organization. This can be done by creating a task plugin, which makes it possible to reuse the task's underlying functionality within Flyte workflows. -If you want users to write code simply using the :py:func:`~flytekit.task` decorator, but you want to provide the -capability of running the function as a spark job or a sagemaker training job, then you can extend Flyte's task system: +If you want users to write code simply using the :py:func:`~flytekit.task` decorator, but want to provide the +capability of running the function as a spark job or a sagemaker training job, then you can extend Flyte's task system. .. code-block:: python @@ -75,14 +71,17 @@ Alternatively, you can provide an interface like this: df = query_task(time=t) return process(df=df) -There are two options when writing a new task type: you can write a task plugin as an extension in Flytekit or you +There are two options when writing a new task plugin: you can write a task plugin as an extension in Flytekit or you can go deeper and write a plugin in the Flyte backend. -Flytekit-only plugin --------------------- +Flytekit-Only Task Plugin +========================= + +Flytekit is designed to be extremely extensible. You can add new task-types that are useful only for your use-case. +Flyte does come with the capability of extending the backend, but that is only required if you want the capability to be +extended to all users of Flyte, or there is a cost/visibility benefit of doing so. -:std:ref:`Writing your own Flytekit plugin ` is simple and is typically where you want to -start when enabling custom task functionality. +Writing your own Flytekit plugin is simple and is typically where you want to start when enabling custom task functionality. .. list-table:: :widths: 50 50 @@ -90,29 +89,57 @@ start when enabling custom task functionality. * - Pros - Cons - * - Simple to write, just implement in python. Flyte will treat it like a container execution and blindly pass - control to the plugin - - Limited ways of providing additional visibility in progress, or external links etc + * - Simple to write — implement in Python. Flyte will treat it like a container execution and blindly pass + control to the plugin. + - Limited ways of providing additional visibility in progress, external links, etc. * - Simple to publish: ``flytekitplugins`` can be published as independent libraries and they follow a simple API. - - Has to be implemented again in every language as these are SDK side plugins only - * - Simple to perform testing: just test locally in flytekit - - In case of side-effects, potential of causing resource leaks. For example if the plugins runs a BigQuery job, - it is possible that the plugin may crash after running the Job and Flyte cannot guarantee that the BigQuery job + - Has to be implemented in every language as these are SDK-side plugins only. + * - Simple to perform testing: test locally in flytekit. + - In case of side-effects, it could lead to resource leaks. For example, if the plugin runs a BigQuery job, + it is possible that the plugin may crash after running the job and Flyte cannot guarantee that the BigQuery job will be successfully terminated. * - - Potentially expensive: in cases where the plugin runs a remote job, running a new pod for every task execution - causes severe strain on k8s and the task itself uses almost no CPUs. Also because of its stateful nature, + causes severe strain on Kubernetes and the task itself uses almost no CPUs. Also because of its stateful nature, using spot-instances is not trivial. - * - - - A bug fix to the runtime, needs a new library version of the plugin - * - - - Not trivial to implement resource controls - e.g. throttling, resource pooling etc + * - + - A bug fix to the runtime needs a new library version of the plugin. + * - + - Not trivial to implement resource controls, like throttling, resource pooling, etc. + +User Container vs. Pre-built Container Task Plugin +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +A Flytekit-only task plugin can be a :ref:`user container ` or :ref:`pre-built container ` task plugin. + +.. list-table:: + :widths: 10 50 50 + :header-rows: 1 + + * - + - User Container + - Pre-built Container + * - Serialization + - At serialization time, a Docker container image is required. The assumption is that this Docker image has the task code. + - The Docker container image is hardcoded at serialization time into the task definition by the author of that task plugin. + * - Serialization + - The serialized task contains instructions to the container on how to reconstitute the task. + - Serialized task should contain all the information needed to run that task instance (but not necessarily to reconstitute it). + * - Run-time + - When Flyte runs the task, the container is launched, and the user-given instructions recreate a Python object representing the task. + - When Flyte runs the task, the container is launched. The container should have an executor built into it that knows how to execute the task. + * - Run-time + - The task object that gets serialized at compile-time is recreated using the user's code at run time. + - The task object that gets serialized at compile-time does not exist at run time. + * - Run-time + - At platform-run-time, the user-decorated function is executed. + - At platform-run-time, there is no user function, and the executor is responsible for producing outputs, given the inputs to the task. Backend Plugin --------------- +============== :std:ref:`Writing a Backend plugin ` makes it possible for users to write extensions for -FlytePropeller, which is Flyte's scheduling engine. This enables complete control on the visualization and availability +FlytePropeller - Flyte's scheduling engine. This enables complete control of the visualization and availability of the plugin. .. list-table:: @@ -121,17 +148,17 @@ of the plugin. * - Pros - Cons - * - Service oriented way of deploying new plugins - strong contracts. Maintainers can deploy new versions of the backend plugin, fix bugs, without needing the users to upgrade Libraries etc - - Need to be implemented in golang - * - Drastically cheaper and more efficient to execute. FlytePropeller is written in Golang and uses an event loop model. Each process of FlytePropeller can execute 1000's of tasks concurrently. - - Needs a FlytePropeller build - *currently* - * - Flyte will guarantee resource cleanup - - Need to implement contract in some spec language like protobuf, openAPI etc - * - Flyteconsole plugins (capability coming soon) can be added to customize visualization and progress tracking of the execution - - Development cycle can be much slower than flytekit only plugins - * - Resource controls and backpressure management is available + * - Service oriented way of deploying new plugins - strong contracts. Maintainers can deploy new versions of the backend plugin, fix bugs, without needing the users to upgrade libraries, etc. + - Need to be implemented in GOlang. + * - Drastically cheaper and more efficient to execute. FlytePropeller is written in GOlang and uses an event loop model. Each process of FlytePropeller can execute thousands of tasks concurrently. + - Needs a FlytePropeller build (*currently*). + * - Flyte guarantees resource cleanup. + - Need to implement contract in a spec language like protobuf, OpenAPI, etc. + * - Flyteconsole plugins (capability coming soon!) can be added to customize visualization and progress tracking of the execution. + - Development cycle can be much slower than flytekit-only plugins. + * - Resource controls and backpressure management is available. - - * - Implement once, use in any SDK or language + * - Implement once, use in any SDK or language! - ======= diff --git a/cookbook/core/extend_flyte/container_interface.py b/cookbook/core/extend_flyte/container_interface.py index 8d42f164f..714495666 100644 --- a/cookbook/core/extend_flyte/container_interface.py +++ b/cookbook/core/extend_flyte/container_interface.py @@ -3,6 +3,7 @@ Container Interface ------------------- + Flyte typically interacts with containers in the course of its task execution (since most tasks are container tasks). This is what that process looks like: diff --git a/cookbook/core/extend_flyte/custom_types.py b/cookbook/core/extend_flyte/custom_types.py index fe0883fdc..3add80624 100644 --- a/cookbook/core/extend_flyte/custom_types.py +++ b/cookbook/core/extend_flyte/custom_types.py @@ -4,16 +4,16 @@ Writing Custom Flyte Types -------------------------- -Flyte is a strongly-typed framework for authoring tasks and workflows. But there are situations when the existing set -of types do not directly work. In fact, this is true with any programming language! +Flyte is a strongly-typed framework for authoring tasks and workflows. But there are situations when the existing +types do not directly work. This is true with any programming language! -Just like a programming language enabling higher-level concepts to describe user-specific objects - like classes in Python/Java/C++, struct in C/Golang, etc., +Similar to a programming language enabling higher-level concepts to describe user-specific objects such as classes in Python/Java/C++, struct in C/Golang, etc., Flytekit allows modeling user classes. The idea is to make an interface that is more productive for the -use case, but write a transformer that transforms the user-defined type to one of the generic constructs in Flyte's type system. +use case, while writing a transformer that converts the user-defined type into one of the generic constructs in Flyte's type system. -This example will try to model an example user-defined dataset and show how it can be integrated seamlessly with Flytekit's typing engine. +This example will try to model an example user-defined dataset and show how it can be seamlessly integrated with Flytekit's type engine. -The video below will walk you through the example. +The example is demonstrated in the video below: .. youtube:: 1xExpRzz8Tw @@ -38,7 +38,7 @@ # Defined type here represents a list of files on the disk. We will refer to it as ``MyDataset``. class MyDataset(object): """ - MyDataset is a collection of files. In Flyte, this maps to a multi-part blob or directory + ``MyDataset`` is a collection of files. In Flyte, this maps to a multi-part blob or directory. """ def __init__(self, base_dir: str = None): @@ -66,8 +66,8 @@ def new_file(self, name: str) -> str: # %% -# ``MyDataset`` represents a set of files locally, but when a workflow consists of multiple steps, we want the data to -# flow between the different steps. To achieve this, it is necessary to explain how the data will be transformed to +# ``MyDataset`` represents a set of files locally. However, when a workflow consists of multiple steps, we want the data to +# flow between different steps. To achieve this, it is necessary to explain how the data will be transformed to # Flyte's remote references. To do this, we create a new instance of # :py:class:`~flytekit:flytekit.extend.TypeTransformer`, for the type ``MyDataset`` as follows: # @@ -102,10 +102,10 @@ def to_literal( """ This method is used to convert from the given python type object ``MyDataset`` to the Literal representation. """ - # Step 1: let's upload all the data into a remote place recommended by Flyte + # Step 1: Upload all the data into a remote place recommended by Flyte remote_dir = ctx.file_access.get_random_remote_directory() ctx.file_access.upload_directory(python_val.base_dir, remote_dir) - # Step 2: let's return a pointer to this remote_dir in the form of a Literal + # Step 2: Return a pointer to this remote_dir in the form of a Literal return Literal( scalar=Scalar( blob=Blob(uri=remote_dir, metadata=BlobMetadata(type=self._TYPE_INFO)) @@ -118,10 +118,10 @@ def to_python_value( """ In this method, we want to be able to re-hydrate the custom object from Flyte Literal value. """ - # Step 1: let's download remote data locally + # Step 1: Download remote data locally local_dir = ctx.file_access.get_random_local_directory() ctx.file_access.download_directory(lv.scalar.blob.uri, local_dir) - # Step 2: create the ``MyDataset`` object + # Step 2: Create the ``MyDataset`` object return MyDataset(base_dir=local_dir) @@ -160,6 +160,6 @@ def wf() -> str: # %% -# We can run this workflow locally and test it. Even when you run it locally, Flytekit will exercise the entire path. +# This workflow can be executed and tested locally. Flytekit will exercise the entire path even if you run it locally. if __name__ == "__main__": print(wf()) diff --git a/cookbook/core/extend_flyte/prebuilt_container.py b/cookbook/core/extend_flyte/prebuilt_container.py new file mode 100644 index 000000000..acb72f6cf --- /dev/null +++ b/cookbook/core/extend_flyte/prebuilt_container.py @@ -0,0 +1,90 @@ +""" +.. _prebuilt_container: + +Pre-built Container Task Plugin +------------------------------- + +A pre-built container task plugin runs a pre-built container. The following are the advantages of using a pre-built container in comparison to a user-defined container: + +- Shifts the burden of writing Dockerfile from the user who uses the task in workflows to the author of the task type. +- Allows the author to optimize the image that the task runs on. +- Makes it possible to (largely) extend the Flyte task execution behavior without using the backend GOlang plugin. + The caveat is that these tasks can't access the K8s cluster, so you'll still need a backend plugin if you want a custom task type that generates CRD. + +Usage +***** + +Take a look at the `example PR `__, where we switched the built-in SQLite3 task from the old (user-container) to the new style of writing tasks. + +There aren't many changes from the user's standpoint: +- Install whichever Python library has the task type definition (in the case of SQLite3, it's bundled in Flytekit, but this isn't always the case + (for example, `SQLAlchemy `__)). +- Import and instantiate the task as you would for any other type of non-function-based task. + +How to Write a Task +******************* + +Writing a pre-built container task consists of three steps: + +1. Defining a Task class +2. Defining an Executor class +3. Creating a Dockerfile that is executed when any user runs your task. It'll most likely include Flytekit, Python, and your task extension code. + +To follow along, use the `PR (mentioned above) `__ where we migrate the SQLite3 task. + +Python Library +************** + +Task +==== + +New tasks of this type must be created as a subclass of the ``PythonCustomizedContainerTask`` class. + +Specifically, you need to customize the following three arguments which would be sent to the parent class constructor: + +* ``container_image``: This is the container image that will run on a Flyte platform when the user invokes the job. +* ``executor_type``: This should be the Python class that inherits the ``ShimTaskExecutor``. +* ``task_type``: All types have a task type. Flyte engine uses this string to identify which plugin to use when running a task. + +The container plugin will be used for everything that doesn't have an explicit match (which is correct in this case). +So you may call it whatever you want, just not something that's already been claimed (like "spark"). + +Referring to the SQLite3 example, :: + + container_image="ghcr.io/flyteorg/flytekit:py38-v0.19.0b7", + executor_type=SQLite3TaskExecutor, + task_type="sqlite", + +Note that the container is special in this case since we utilize the Flytekit image. + +Furthermore, you need to override the ``get_custom`` function to include all the information the executor will need to run. + +Keep in mind that the task's execution behavior is entirely defined by the task's serialized form (that is, the serialized ``TaskTemplate``). +This function stores and inserts the data into the template's `custom field `__. +However, keep the task template's overall size to a minimum. + +Executor +======== + +You must subclass and override the ``execute_from_model`` function for the ``ShimTaskExecutor`` abstract class. +This function will be invoked in both local workflow execution and platform-run-time execution, and will include all of the business logic of your task. + +The signature of this execute function differs from the ``execute`` functions of most other tasks since the ``TaskTemplate`` determines all the business logic, including how the task is run. + +Image +===== + +This is the custom image that you specified in the subclass ``PythonCustomizedContainerTask``. Out of the box, when Flyte runs the container, these tasks will run a command that looks like this :: + + pyflyte-execute --inputs s3://inputs.pb --output-prefix s3://outputs --raw-output-data-prefix s3://user-data --resolver flytekit.core.python_customized_container_task.default_task_template_resolver -- {{.taskTemplatePath}} path.to.your.executor.subclass + +This means that your `Docker image `_ will need Python and Flytekit installed. +The container's Python interpreter should be able to find your custom executor class at the import path ``path.to.your.executor.subclass``. + +---- + +The key takeaways of a pre-built container task plugin are: + +- The task object serialized at compile time does not exist at run time. +- There is no user function at platform run time, and the executor is responsible for producing outputs based on the task's inputs. +""" diff --git a/cookbook/core/extend_flyte/custom_task_plugin.py b/cookbook/core/extend_flyte/user_container.py similarity index 52% rename from cookbook/core/extend_flyte/custom_task_plugin.py rename to cookbook/core/extend_flyte/user_container.py index 9f5f95e73..ec76f0ef0 100644 --- a/cookbook/core/extend_flyte/custom_task_plugin.py +++ b/cookbook/core/extend_flyte/user_container.py @@ -1,28 +1,23 @@ """ -.. _advanced_custom_task_plugin: +.. _user_container: -Writing Your Own Flytekit Task Plugins ---------------------------------------- +User Container Task Plugin +-------------------------- -Flytekit is designed to be extremely extensible. You can add new task-types that are useful only for your use-cases. -Flyte does come with the capability of extending the backend, but that is only required if you want the capability to be -extended to all users of Flyte, or there is a cost/visibility benefit of doing so. +A user container task plugin runs a user-defined container that has the user code. -The following demo shows how to build Flyte container task extensions, with an SQLAlchemy extension as an example: - -.. youtube:: hfKbfcJbawE - -This tutorial will walk you through writing your own Sensor style plugin, that allows users to wait for a file to land -in the object store. Remember, if you follow the Flyte/flytekit constructs, you will automatically make your plugin portable +This tutorial will walk you through writing your own sensor-style plugin that allows users to wait for a file to land +in the object store. Remember that if you follow the flyte/flytekit constructs, you will automatically make your plugin portable across all cloud platforms that Flyte supports. Sensor Plugin -^^^^^^^^^^^^^^ -A sensor Plugin waits for some event to happen, before marking the task as success. You do not need to worry about the -timeout as that will be handled by the flyte engine itself, when running in production +************* + +A sensor plugin waits for some event to happen before marking the task as success. You need not worry about the +timeout as that will be handled by the flyte engine itself when running in production. Plugin API -^^^^^^^^^^^^ +^^^^^^^^^^ .. code-block:: python @@ -46,14 +41,15 @@ def wait_and_run(path: str) -> int: # %% # Plugin Structure -# ^^^^^^^^^^^^^^^^^ -# As illustrated above to achieve this structure we need to create a Class called ``WaitForObjectStoreFile``, which +# ^^^^^^^^^^^^^^^^ +# +# As illustrated above, to achieve this structure we need to create a class named ``WaitForObjectStoreFile``, which # derives from :py:class:`flytekit.PythonFunctionTask` as follows. # class WaitForObjectStoreFile(PythonTask): """ - Add documentation here for your Plugin. - This plugin creates an object store file sensor, that waits and exits only when the file exists. + Add documentation here for your plugin. + This plugin creates an object store file sensor that waits and exits only when the file exists. """ _VAR_NAME: str = "path" @@ -76,7 +72,7 @@ def __init__( self._poll_interval = poll_interval def execute(self, **kwargs) -> typing.Any: - # No need to check for existence, as that is guaranteed + # No need to check for existence, as that is guaranteed. path = kwargs[self._VAR_NAME] ctx = context_manager.FlyteContext.current_context() user_context = ctx.user_space_params @@ -90,28 +86,28 @@ def execute(self, **kwargs) -> typing.Any: # %% -# Note about Config Objects -# ^^^^^^^^^^^^^^^^^^^^^^^^^^ +# Config Objects +# ============== # -# Flytekit routes to the right plugin only based on the type of task_config class, if using the @task decorator. -# Config is very useful for cases when one wants to customize the behavior of the plugin or pass the config information -# to the backend plugin, but in this case we have no real configuration. The config object is any class, that your -# plugin understands +# Flytekit routes to the right plugin based on the type of ``task_config`` class if using the ``@task`` decorator. +# Config is very useful for cases when you want to customize the behavior of the plugin or pass the config information +# to the backend plugin; however, in this case there's no real configuration. The config object can be any class that your +# plugin understands. # # .. note:: # -# Observe that the base class is Generic, it is parameterized with the desired config class +# Observe that the base class is Generic; it is parameterized with the desired config class. # # .. note:: # -# To create a task decorator based plugin the Config is required In this example, we are creating a named class plugin -# This construct does not need a plugin +# To create a task decorator-based plugin, ``task_config`` is required. +# In this example, we are creating a named class plugin, and hence, this construct does not need a plugin. # -# We will try to cover an example of Config objects in a subsequent tutorial +# Refer to the `spark plugin `__ for an example of a config object. # %% # Actual Usage -# ^^^^^^^^^^^^^ +# ^^^^^^^^^^^^ sensor = WaitForObjectStoreFile( name="my-objectstore-sensor", @@ -132,10 +128,16 @@ def my_workflow(path: str) -> str: # %% -# Ofcourse you can run the workflow using your own new shiny plugin locally +# And of course, you can run the workflow locally using your own new shiny plugin! if __name__ == "__main__": f = "/tmp/some-file" with open(f, "w") as w: w.write("Hello World!") print(my_workflow(path=f)) + +# %% +# The key takeaways of a user container task plugin are: +# +# - The task object that gets serialized at compile-time is recreated using the user's code at run time. +# - At platform-run-time, the user-decorated function is executed. \ No newline at end of file diff --git a/cookbook/docs/_static/sphx_gallery_autogen.css b/cookbook/docs/_static/sphx_gallery_autogen.css index 30a0f5368..b7e81f497 100644 --- a/cookbook/docs/_static/sphx_gallery_autogen.css +++ b/cookbook/docs/_static/sphx_gallery_autogen.css @@ -12,6 +12,7 @@ #sphx-glr-download-auto-remote-access-remote-task-py, #sphx-glr-download-auto-core-extend-flyte-backend-plugins-py, #sphx-glr-download-auto-core-extend-flyte-container-interface-py, +#sphx-glr-download-auto-core-extend-flyte-prebuilt-container-py, #sphx-glr-download-auto-larger-apps-larger-apps-iterate-py, #sphx-glr-download-auto-larger-apps-larger-apps-deploy-py { height: 0px;