Skip to content

Commit

Permalink
[Docs] improve data management doc
Browse files Browse the repository at this point in the history
Signed-off-by: DenChenn <[email protected]>
  • Loading branch information
DenChenn committed Oct 16, 2024
1 parent 197ae13 commit c81b534
Showing 1 changed file with 110 additions and 43 deletions.
153 changes: 110 additions & 43 deletions docs/user_guide/concepts/main_concepts/data_management.rst
Original file line number Diff line number Diff line change
Expand Up @@ -9,70 +9,65 @@ Understand How Flyte Handles Data
Types of Data
=============

There are two parts to the data in Flyte:
In Flyte, data is categorized into metadata and raw data to optimize data handling and improve performance and security.

1. Metadata
* **Metadata**: Small values, like integers and strings, are treated as "stack parameters" (passed by value). This metadata is globally accessible to Flyte components (FlytePropeller, FlyteAdmin, and other running pods/jobs). Each entry is limited to 10MB and is passed directly between tasks. On top of that, Metadata allow in-memory computations for branches, partial outputs, and composition of multiple outputs as input for other tasks.

* It consists of data about inputs to a task, and other artifacts.
* It is configured globally for FlytePropeller, FlyteAdmin etc., and the running pods/jobs need access to this bucket to get the data.
* **Raw Data**: Larger data, such as files and dataframes, are treated as "heap parameters" (passed by reference). Flyte stores raw data in an object store (e.g., S3), uploading it on first use and passing only a reference thereafter. Tasks can then access this data via Flyte’s automated download or streaming, enabling efficient access to large datasets without needing to transfer full copies.

2. Raw data
*Source code reference for auto-offloading value sizes limitation:*

* It is the actual data (such as the Pandas DataFrame, Spark DataFrame, etc.).
* Raw data paths are unique for every execution, and the prefixes can be modified per execution.
* None of the Flyte control plane components would access the raw data. This provides great separation of data between the control plane and the data plane.
.. literalinclude:: ../../../../flytepropeller/pkg/controller/config/config.go
:caption: flytepropeller/pkg/controller/config/config.go
:language: go
:lines: 184-192

.. note:
Metadata and raw data can be present in entirely separate buckets.
Data Flow and Security
~~~~~~~~~~~~~~~~~~~~~~

Flyte’s data separation avoids bottlenecks and security risks:

Let us consider a simple Python task:
* **Metadata** remains within Flyte’s control plane, making it accessible through the Flyte Console or CLI.
* **Raw Data** is accessible only by tasks, stored securely in an external blob store, preventing Flyte’s control plane from directly handling large data files.

.. code-block:: python
Moreover, a unique property of this separation is that all meta values are read by FlytePropeller engine and available on the FlyteConsole or CLI from the control plane.

Example
~~~~~~~

Consider a basic Flyte task:

@task
def my_task(m: int, n: str, o: FlyteFile) -> pd.DataFrame:
...
.. code-block:: python
@task
def my_task(m: int, n: str, o: FlyteFile) -> pd.DataFrame:
...
In the above code sample, ``m``, ``n``, ``o`` are inputs to the task.
``m`` of type ``int`` and ``n`` of type ``str`` are simple primitive types, while ``o`` is an arbitrarily sized file.
All of them from Flyte's point of view are ``data``.
The difference lies in how Flyte stores and passes each of these data items.
For every task that receives input, Flyte sends an **Inputs Metadata** object, which contains all the primitive or simple scalar values inlined, but in the case of
complex, large objects, they are offloaded and the `Metadata` simply stores a reference to the object. In our example, ``m`` and ``n`` are inlined while
``o`` and the output ``pd.DataFrame`` are offloaded to an object store, and their reference is captured in the metadata.
In this task, ``m``, ``n``, and ``o`` are inputs: ``m`` (int) and ``n`` (str) are simple types, while ``o`` is a large, arbitrarily sized file.
Flyte treats each differently:

`Flytekit TypeTransformers` make it possible to use complex objects as if they are available locally - just like persistent filehandles. But Flyte backend only deals with
the references.
* Metadata: Small values like ``m`` and ``n`` are inlined within Flyte’s metadata and passed directly between tasks.
* Raw Data: Objects like ``o`` and the output pd.DataFrame are offloaded to an object store (e.g., S3), with only references retained in metadata.

Thus, primitive data types and references to large objects fall under Metadata - `Meta input` or `Meta output`, and the actual large object is known as **Raw data**.
A unique property of this separation is that all `meta values` are read by FlytePropeller engine and available on the FlyteConsole or CLI from the control plane.
`Raw` data is not read by any of the Flyte components and hence it is possible to store it in a completely separate blob storage or alternate stores, which can't be accessed by Flyte control plane components
but can be accessed by users's container/tasks.
Flytekit TypeTransformers make it possible to use complex objects as if they are available locally - just like persistent filehandles. But Flyte backend only deals with the references.

Raw Data Prefix
~~~~~~~~~~~~~~~

Every task can read/write its own data files. If ``FlyteFile`` or any natively supported type like ``pandas.DataFrame`` is used, Flyte will automatically offload and download
data from the configured object-store paths. These paths are completely customizable per `LaunchPlan` or `Execution`.

- The default Rawoutput path (prefix in an object store like S3/GCS) can be configured during registration as shown in :std:ref:`flytectl_register_files`.
* The default Rawoutput path (prefix in an object store like S3/GCS) can be configured during registration as shown in :std:ref:`flytectl_register_files`.
The argument ``--outputLocationPrefix`` allows us to set the destination directory for all the raw data produced. Flyte will create randomized folders in this path to store the data.
- To override the ``RawOutput`` path (prefix in an object store like S3/GCS), you can specify an alternate location when invoking a Flyte execution, as shown in the following screenshot of the LaunchForm in FlyteConsole:
* To override the ``RawOutput`` path (prefix in an object store like S3/GCS),
you can specify an alternate location when invoking a Flyte execution, as shown in the following screenshot of the LaunchForm in FlyteConsole:

.. image:: https://raw.githubusercontent.com/flyteorg/static-resources/main/flyte/concepts/data_movement/launch_raw_output.png
.. image:: https://github.com/flyteorg/static-resources/blob/ddec386ab74897e3c09d89fdaf305e77a2ecc52c/flyte/concepts/data_movement/launch_raw_output.png

- In the sandbox, the default Rawoutput-prefix is configured to be the root of the local bucket. Hence Flyte will write all the raw data (reference types like blob, file, df/schema/parquet, etc.) under a path defined by the execution.
* In the sandbox, the default Rawoutput-prefix is configured to be the root of the local bucket.
Hence Flyte will write all the raw data (reference types like blob, file, df/schema/parquet, etc.) under a path defined by the execution.


Metadata
~~~~~~~~

Metadata in Flyte is critical to enable the passing of data between tasks. It allows to perform in-memory computations for branches or send partial outputs from one task to another or compose outputs from multiple tasks into one input to be sent to a task.

Thus, metadata is restricted due to its omnipresence. Each `meta output`/`input` cannot be larger than 1MB. If you have `List[int]`, it cannot be larger than 1MB, considering other input entities. In scenarios where large lists or strings need to be sent between tasks, file abstraction is preferred.

``LiteralType`` & Literals
~~~~~~~~~~~~~~~~~~~~~~~~~~

Expand Down Expand Up @@ -154,15 +149,15 @@ The illustration below explains how data flows from engine to the task and how t
We could use fast metadata stores to speed up data movement or exploit locality.

Between Flytepropeller and Tasks
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. image:: https://raw.githubusercontent.com/flyteorg/static-resources/main/flyte/concepts/data_movement/flyte_data_movement.png
.. image:: https://github.com/flyteorg/static-resources/blob/ddec386ab74897e3c09d89fdaf305e77a2ecc52c/flyte/concepts/data_movement/flyte_data_movement.png


Between Tasks
~~~~~~~~~~~~~~
~~~~~~~~~~~~~

.. image:: https://raw.githubusercontent.com/flyteorg/static-resources/main/flyte/concepts/data_movement/flyte_data_transfer.png
.. image:: https://github.com/flyteorg/static-resources/blob/ddec386ab74897e3c09d89fdaf305e77a2ecc52c/flyte/concepts/data_movement/flyte_data_transfer.png


Bringing in Your Own Datastores for Raw Data
Expand All @@ -174,3 +169,75 @@ For example, it is theoretically possible to use S3 ``s3://`` for metadata and G
But for Metadata, the data should be accessible to Flyte control plane.

Data persistence is also pluggable. By default, it supports all major blob stores and uses an interface defined in Flytestdlib.

Practical Example
~~~~~~~~~~~~~~~~~

Let's consider a simple example where we have some tasks that needs to operate huge dataframes.

The first task reads a file from the object store, shuffles the data, save to local disk and passes the path to the next task.

.. code-block:: python
@task()
def task_remove_column(input_file: FlyteFile, column_name: str) -> FlyteFile:
"""
Reads the input file as a DataFrame, removes a specified column, and outputs it as a new file.
"""
input_file.download()
df = pd.read_csv(input_file.path)
# remove column
if column_name in df.columns:
df = df.drop(columns=[column_name])
output_file_path = "data_finished.csv"
df.to_csv(output_file_path, index=False)
return FlyteFile(output_file_path)
...
The second task reads the file from the previous task, removes a column, save to local disk and return the path.

.. code-block:: python
@task()
def task_remove_column(input_file: FlyteFile, column_name: str) -> FlyteFile:
"""
Reads the input file as a DataFrame, removes a specified column, and outputs it as a new file.
"""
input_file.download()
df = pd.read_csv(input_file.path)
# remove column
if column_name in df.columns:
df = df.drop(columns=[column_name])
output_file_path = "data_finished.csv"
df.to_csv(output_file_path, index=False)
return FlyteFile(output_file_path)
...
And this is how the workflow looks like:

.. code-block:: python
@workflow
def wf() -> FlyteFile:
existed_file = FlyteFile("s3://custom-bucket/data.csv")
shuffled_file = task_read_and_shuffle_file(input_file=existed_file)
result_file = task_remove_column(input_file=shuffled_file, column_name="County")
return result_file
...
This example shows how to access an existing file in a MinIO bucket from the Flyte Sandbox and pass it between tasks with ``FlyteFile``.
When a workflow outputs a local file as a ``FlyteFile``, Flyte automatically uploads it to MinIO and provides an S3 URL for downstream tasks, no manual uploads needed. Take a look at the following:

First task output metadata:

.. image:: https://github.com/flyteorg/static-resources/blob/ddec386ab74897e3c09d89fdaf305e77a2ecc52c/flyte/concepts/data_movement/flyte_data_movement_example_output.png

Second task input metadata:

.. image:: https://github.com/flyteorg/static-resources/blob/ddec386ab74897e3c09d89fdaf305e77a2ecc52c/flyte/concepts/data_movement/flyte_data_movement_example_input.png

0 comments on commit c81b534

Please sign in to comment.