From c9be6c4cb85e6ce49345929660f014deb07d3d95 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Mon, 11 Apr 2022 04:19:04 +0000 Subject: [PATCH] Updated dataclass example (#491) * Updated dataclass example Signed-off-by: Kevin Su * Fixed tests Signed-off-by: Kevin Su * Fixed tests Signed-off-by: Kevin Su * Updated example Signed-off-by: Kevin Su * Update flytekit and comment Signed-off-by: Kevin Su * add text Signed-off-by: Samhita Alla * Update dependency Signed-off-by: Kevin Su Co-authored-by: Samhita Alla --- cookbook/core/type_system/custom_objects.py | 113 ++++++++++++++++---- 1 file changed, 92 insertions(+), 21 deletions(-) diff --git a/cookbook/core/type_system/custom_objects.py b/cookbook/core/type_system/custom_objects.py index bbdb0215b8..78d23b342e 100644 --- a/cookbook/core/type_system/custom_objects.py +++ b/cookbook/core/type_system/custom_objects.py @@ -1,34 +1,35 @@ """ Using Custom Python Objects ----------------------------- +--------------------------- -Flyte supports passing JSONs between tasks. But, to simplify the usage for the users and introduce type-safety, -flytekit supports passing custom data objects between tasks. Currently only dataclasses that are decorated with -@dataclasses_json are supported. +Flyte supports passing JSON between tasks. But to simplify the usage for the users and introduce type-safety, +Flytekit supports passing custom data objects between tasks. + +Currently, data classes decorated with ``@dataclass_json`` are supported. +One good use case of a data class is a map task: since a map task can only accept one input and produce one output, +you can wrap all of its inputs in a single data class. This example shows how users can serialize custom JSON-compatible dataclasses between successive tasks using the -excellent `dataclasses_json `__ library +excellent `dataclasses_json `__ library. """ + +# %% +# To get started, let's import the necessary libraries. 
+import os +import tempfile import typing from dataclasses import dataclass +import pandas as pd from dataclasses_json import dataclass_json from flytekit import task, workflow +from flytekit.types.directory import FlyteDirectory +from flytekit.types.file import FlyteFile +from flytekit.types.schema import FlyteSchema # %% -# This Datum is a user defined complex type, which can be used to pass complex data between tasks. -# Moreover, users can also pass this data between different languages and also input through the Flyteconsole as a -# raw JSON. -# -# .. note:: -# -# Only other supported types can be nested in this class, for example it can only contain other ``@dataclass_json`` -# annotated dataclasses if you want to use complex classes. Arbitrary classes will cause a **failure**. -# -# .. note:: -# -# All variables in DataClasses should be **annotated with their type**. Failure to do should will result in an error +# We define a simple data class that can be sent between tasks. @dataclass_json @dataclass class Datum(object): @@ -42,7 +43,32 @@ class Datum(object): # %% -# Once declared, dataclasses can be returned as outputs or accepted as inputs +# ``Datum`` is a user-defined complex type that can be used to pass complex data between tasks. +# Interestingly, users can send this data between different tasks written in different languages and input it through the Flyte Console as raw JSON. +# +# .. note:: +# +# All variables in a data class should be **annotated with their type**. Failure to do so will result in an error. + +# %% +# Next, we define a data class that accepts :std:ref:`FlyteSchema `, :std:ref:`FlyteFile `, +# and :std:ref:`FlyteDirectory `. +@dataclass_json +@dataclass +class Result: + schema: FlyteSchema + file: FlyteFile + directory: FlyteDirectory + + +# %% +# .. note:: +# +# A data class supports the usage of data associated with Python types, data classes, FlyteFile, FlyteDirectory, and FlyteSchema. 
+# +# Once declared, dataclasses can be returned as outputs or accepted as inputs. +# +# 1. Datum Data Class @task def stringify(x: int) -> Datum: """ @@ -62,15 +88,60 @@ def add(x: Datum, y: Datum) -> Datum: # %% -# Workflow creation remains identical +# The ``stringify`` task outputs a data class, and the ``add`` task accepts data classes as inputs. +# +# 2. Result Data Class +@task +def upload_result() -> Result: + """ + Flytekit will upload FlyteFile, FlyteDirectory, and FlyteSchema to blob store (GCP, S3) + """ + df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22]}) + temp_dir = tempfile.mkdtemp(prefix="flyte-") + + schema_path = temp_dir + "/schema.parquet" + df.to_parquet(schema_path) + + file_path = tempfile.NamedTemporaryFile(delete=False) + file_path.write(b"Hello world!") + fs = Result( + schema=FlyteSchema(temp_dir), + file=FlyteFile(file_path.name), + directory=FlyteDirectory(temp_dir), + ) + return fs + + +@task +def download_result(res: Result): + """ + Flytekit will lazily load FlyteSchema. We download the schema only when users invoke open(). + """ + assert pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22]}).equals( + res.schema.open().all() + ) + f = open(res.file, "r") + assert f.read() == "Hello world!" + assert os.listdir(res.directory) == ["schema.parquet"] + + +# %% +# The ``upload_result`` task outputs a data class, and the ``download_result`` task accepts data classes as inputs. + +# %% +# Lastly, we create a workflow. @workflow -def wf(x: int, y: int) -> Datum: +def wf(x: int, y: int) -> (Datum, Result): """ Dataclasses (JSON) can be returned from a workflow as well. """ - return add(x=stringify(x=x), y=stringify(x=y)) + res = upload_result() + download_result(res=res) + return add(x=stringify(x=x), y=stringify(x=y)), res +# %% +# We can run the workflow locally. if __name__ == "__main__": """ This workflow can be run locally. During local execution also, the dataclasses will be marshalled to and from json.