Updated definition of a workflow (flyteorg#470)
* Updated definition of a workflow

Signed-off-by: Ketan Umare <[email protected]>

* update docs

Signed-off-by: Samhita Alla <[email protected]>

* incorporate @cosmicBboy's suggestions

Signed-off-by: Samhita Alla <[email protected]>

* fix keras version

Signed-off-by: Samhita Alla <[email protected]>

Co-authored-by: Samhita Alla <[email protected]>
kumare3 and samhita-alla authored Nov 8, 2021
1 parent c91def6 commit ab5a943
Showing 3 changed files with 59 additions and 24 deletions.
1 change: 1 addition & 0 deletions cookbook/case_studies/ml_training/spark_horovod/Dockerfile
@@ -68,6 +68,7 @@ RUN sed -i 's/[ #]\(.*StrictHostKeyChecking \).*/ \1no/g' /etc/ssh/ssh_config &&

# Horovod-related installations
RUN pip install tensorflow==2.6.0
RUN pip install keras==2.6.0
# Enable GPU
# ENV HOROVOD_GPU_OPERATIONS NCCL
RUN HOROVOD_WITH_MPI=1 HOROVOD_WITH_TENSORFLOW=1 pip install --no-cache-dir horovod[spark,tensorflow]==0.22.1
26 changes: 19 additions & 7 deletions cookbook/core/control_flow/dynamics.py
@@ -2,12 +2,24 @@
Dynamic Workflows
------------------
A workflow is typically static where the directed acyclic graph's (DAG) structure is known at compile-time.
However, in cases where a run-time parameter (e.g. the output of an earlier task) determines the full DAG structure, you can use dynamic workflows by decorating a function with ``@dynamic``.
A dynamic workflow is similar to a :py:func:`~flytekit.workflow`, in that it represents a Python-esque DSL to
declare task interactions or new workflows. One significant difference between a regular workflow and a dynamic workflow is that
the latter is evaluated at runtime. This means that the inputs are first materialized and sent to the actual function,
as if it were a task. However, the return value from a dynamic workflow is a promise rather than an actual value,
which is fulfilled by evaluating the various tasks invoked in the dynamic workflow.

Within the ``@dynamic`` context (function), every invocation of a :py:func:`~flytekit.task` or a derivative of the
:py:class:`~flytekit.core.base_task.Task` class results in deferred evaluation using a promise, instead
of the actual value being materialized. You can also nest other ``@dynamic`` and ``@workflow`` constructs within this
task, but it is not possible to interact with the outputs of a ``task``/``workflow``, as they are lazily evaluated.
If you want to interact with the outputs, break up the logic in the dynamic workflow and create a new task to read and resolve the outputs.

Refer to :py:func:`~flytekit.dynamic` for more documentation.
Here's a code example that counts the common characters between any two strings.
"""

# %%
@@ -62,12 +74,12 @@ def derive_count(freq1: typing.List[int], freq2: typing.List[int]) -> int:
# Therefore, the ``@dynamic`` decorator has to be used.
#
# A dynamic workflow is effectively both a task and a workflow. The key thing to note is that the body of tasks is run at run time and the
# body of workflows is run at compile (aka registration) time. Essentially, this is what a dynamic workflow leverages -- it’s a workflow that is compiled at run time (the best of both worlds)!
#
# At execution (run) time, Flytekit runs the compilation step, and produces
# a ``WorkflowTemplate`` (from the dynamic workflow), which Flytekit then passes back to Flyte Propeller for further running, exactly how sub-workflows are handled.
#
# .. note:: For iterating over a list, the dynamic pattern is not always the most efficient method. `Map tasks <https://github.com/flyteorg/flytekit/blob/8528268a29a07fe7e9ce9f7f08fea68c41b6a60b/flytekit/core/map_task.py/>`_ might be more efficient in certain cases, keeping in mind they only work for Python tasks (tasks decorated with the @task decorator, not sql/spark/etc).
#
# We now define the dynamic workflow encapsulating the above mentioned points.
@dynamic
@@ -104,7 +116,7 @@ def count_characters(s1: str, s2: str) -> int:
# Because of this fact, operations on the ``index`` variable like ``index + 1`` are not valid.
# To manage this problem, the values need to be passed to the other tasks to unwrap them.
#
# .. note:: The local execution will work when a ``@dynamic`` decorator is used because Flytekit treats it like a ``task`` that will run with the Python native inputs.
# Therefore, there are no Promise objects locally within the function decorated with ``@dynamic`` as it is treated as a ``task``.
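To see why arithmetic on a promise fails while passing it into a task works, here is a toy stand-in for a deferred value — this is not flytekit's actual ``Promise`` class, just an illustrative sketch:

```python
# Toy model of a deferred value; flytekit's real Promise is far richer.
class ToyPromise:
    def __init__(self, thunk):
        self._thunk = thunk  # computation is deferred, not run yet

    def resolve(self):
        return self._thunk()  # materialize the actual value


def increment(index: int) -> int:
    # In Flyte this would be a @task; inside the task body the input
    # is a plain int, so ``index + 1`` is valid here.
    return index + 1


p = ToyPromise(lambda: 41)
# ``p + 1`` would raise a TypeError: the promise itself is not an int.
# Passing the materialized value into a task "unwraps" it:
result = increment(p.resolve())
```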

# %%
56 changes: 39 additions & 17 deletions cookbook/core/flyte_basics/basic_workflow.py
@@ -2,19 +2,19 @@
Workflows
----------
Once you have a handle on tasks, we can dive into Flyte workflows. Together, Flyte tasks and workflows make up the fundamental building blocks of Flyte.
Workflows string together two or more tasks. They are also written as Python functions, but it is essential to make a
critical distinction between tasks and workflows.
The body of a task's function runs at "run time", i.e., on a K8s cluster (using the task's container), in a query
engine like BigQuery, or on some other hosted service like AWS Batch, Sagemaker, etc. The body of a
workflow is not used for computation; it is only used to structure tasks.
The body of a workflow runs at "registration" time, i.e., the workflow unwraps during registration.
Registration refers to uploading the packaged (serialized) code to the Flyte backend so that the workflow can be triggered.
Please refer to the :std:ref:`registration docs <flyte:divedeep-registration>` to understand registration in Flyte.
Now, let's get started with a simple workflow.
"""
import typing

@@ -32,9 +32,6 @@ def t2(a: str, b: str) -> str:
return b + a


@workflow
def my_wf(a: int, b: str) -> Tuple[int, str]:
x, y = t1(a=a)
@@ -43,13 +40,38 @@ def my_wf(a: int, b: str) -> Tuple[int, str]:


# %%
# ``my_wf`` is the Flyte workflow that accepts two inputs and calls two tasks from within it.
#
# How does a Flyte workflow work?
# ===============================
#
# The ``@workflow`` decorator wraps Python functions (Flyte tasks), which can be thought of as ``lazily`` evaluated promises. The functions are
# parsed, and each function call is deferred until execution time. The function calls return "promises", which can be
# passed downstream to other functions but cannot be accessed within the workflow.
# The actual ``evaluation`` (the execution of a task within the workflow) happens when the
# workflow executes.
#
# A workflow can be executed locally, where the evaluation happens immediately, or via the CLI, UI, etc., which triggers an evaluation.
# Although Flyte workflows decorated with ``@workflow`` look like Python functions, they are actually Python-esque, domain-specific language (DSL) entities
# that recognize the ``@task`` decorators. When a workflow encounters a ``@task``-decorated Python function, it creates a
# :py:class:`flytekit.core.promise.Promise` object. This promise doesn't contain the actual output of the task and is only fulfilled at execution time.
#
# .. note::
# Refer to :py:func:`flytekit.dynamic` to create Flyte workflows dynamically. In a dynamic workflow, unlike a simple workflow,
# the actual inputs are pre-materialized; however, every invocation of a task still
# results in a promise to be evaluated lazily.
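The compile-then-execute behaviour described above can be mimicked with a toy decorator (purely illustrative; flytekit's actual machinery is much more involved): each "task" call returns a thunk instead of running, and the wired-up graph is only evaluated later.

```python
# Toy model of deferred task evaluation; not flytekit's implementation.
def toy_task(fn):
    def deferred(**kwargs):
        # Instead of running fn now, return a thunk (a "promise") that
        # resolves any promised inputs before calling fn.
        return lambda: fn(
            **{k: v() if callable(v) else v for k, v in kwargs.items()}
        )
    return deferred


@toy_task
def plus_two(a):
    return a + 2


@toy_task
def concat(a, b):
    return b + str(a)


# "Compiling" the workflow wires promises together; nothing runs yet:
promise = concat(a=plus_two(a=5), b="hello-")
# "Executing" evaluates the recorded graph:
result = promise()  # "hello-7"
```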


# %%
# Now, we can execute the workflow by invoking it like a function and sending in the required inputs.
#
# .. note::
#
# Currently we only support ``keyword arguments``, so
# every argument should be passed in the form of ``arg=value``. Failure to do so
# will result in an error.
if __name__ == "__main__":
print(f"Running my_wf(a=50, b='hello') {my_wf(a=50, b='hello')}")

# %%
# To know more about workflows, take a look at the conceptual :std:ref:`discussion <flyte:divedeep-workflow-nodes>`.
