Updated dataclass example (flyteorg#491)
* Updated dataclass example

Signed-off-by: Kevin Su <[email protected]>

* Fixed tests

Signed-off-by: Kevin Su <[email protected]>

* Fixed tests

Signed-off-by: Kevin Su <[email protected]>

* Updated example

Signed-off-by: Kevin Su <[email protected]>

* Update flytekit and comment

Signed-off-by: Kevin Su <[email protected]>

* add text

Signed-off-by: Samhita Alla <[email protected]>

* Update dependency

Signed-off-by: Kevin Su <[email protected]>

Co-authored-by: Samhita Alla <[email protected]>
pingsutw and samhita-alla authored Apr 11, 2022
1 parent a2b462b commit c9be6c4
Showing 1 changed file with 92 additions and 21 deletions.
113 changes: 92 additions & 21 deletions cookbook/core/type_system/custom_objects.py
@@ -1,34 +1,35 @@
"""
Using Custom Python Objects
----------------------------
---------------------------
Flyte supports passing JSONs between tasks. But, to simplify the usage for the users and introduce type-safety,
flytekit supports passing custom data objects between tasks. Currently only dataclasses that are decorated with
@dataclasses_json are supported.
Flyte supports passing JSON between tasks. But to simplify usage and introduce type safety,
Flytekit supports passing custom data objects between tasks.
Currently, data classes decorated with ``@dataclass_json`` are supported.
One good use case for a data class is a map task, which can accept only one input and produce one output;
wrapping all the inputs in a single data class works around that limitation (a sketch follows this docstring).
This example shows how users can serialize custom JSON-compatible dataclasses between successive tasks using the
excellent `dataclasses_json <https://pypi.org/project/dataclasses-json/>`__ library
excellent `dataclasses_json <https://pypi.org/project/dataclasses-json/>`__ library.
"""

# %%
# To get started, let's import the necessary libraries.
import os
import tempfile
import typing
from dataclasses import dataclass

import pandas as pd
from dataclasses_json import dataclass_json
from flytekit import task, workflow
from flytekit.types.directory import FlyteDirectory
from flytekit.types.file import FlyteFile
from flytekit.types.schema import FlyteSchema


# %%
# This Datum is a user defined complex type, which can be used to pass complex data between tasks.
# Moreover, users can also pass this data between different languages and also input through the Flyteconsole as a
# raw JSON.
#
# .. note::
#
# Only other supported types can be nested in this class, for example it can only contain other ``@dataclass_json``
# annotated dataclasses if you want to use complex classes. Arbitrary classes will cause a **failure**.
#
# .. note::
#
# All variables in DataClasses should be **annotated with their type**. Failure to do should will result in an error
# We define a simple data class that can be sent between tasks.
@dataclass_json
@dataclass
class Datum(object):
@@ -42,7 +43,32 @@ class Datum(object):


# %%
# Once declared, dataclasses can be returned as outputs or accepted as inputs
# ``Datum`` is a user-defined complex type that can be used to pass structured data between tasks.
# Users can also send this data between tasks written in different languages and input it through the Flyte Console as raw JSON.
#
# .. note::
#
# All variables in a data class should be **annotated with their type**. Failure to do so will result in an error.
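
# %%
# .. note::
#
#    The fields of ``Datum`` are collapsed in this diff. Purely as an illustration, if it had the
#    hypothetical fields ``x: int`` and ``y: str``, the raw JSON entered in the Flyte Console would
#    look like::
#
#        {"x": 1, "y": "hello"}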

# %%
# Next, we define a data class that accepts :std:ref:`FlyteSchema <typed_schema>`, :std:ref:`FlyteFile <sphx_glr_auto_core_flyte_basics_files.py>`,
# and :std:ref:`FlyteDirectory <sphx_glr_auto_core_flyte_basics_folders.py>`.
@dataclass_json
@dataclass
class Result:
schema: FlyteSchema
file: FlyteFile
directory: FlyteDirectory


# %%
# .. note::
#
# A data class can contain values of Python types, other data classes, FlyteFile, FlyteDirectory, and FlyteSchema.
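#
#    For example, one data class can nest another (a hypothetical sketch, not taken from this example)::
#
#        @dataclass_json
#        @dataclass
#        class Inner:
#            a: int
#
#        @dataclass_json
#        @dataclass
#        class Outer:
#            # ``Inner`` is itself a ``@dataclass_json``-decorated data class, so it can be nested here.
#            inner: Inner
#            name: str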
#
# Once declared, dataclasses can be returned as outputs or accepted as inputs.
#
# 1. Datum Data Class
@task
def stringify(x: int) -> Datum:
"""
@@ -62,15 +88,60 @@ def add(x: Datum, y: Datum) -> Datum:


# %%
# Workflow creation remains identical
# The ``stringify`` task outputs a data class, and the ``add`` task accepts data classes as inputs.
#
# 2. Result Data Class
@task
def upload_result() -> Result:
"""
Flytekit will upload FlyteFile, FlyteDirectory, and FlyteSchema to the blob store (e.g., GCS or S3).
"""
df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22]})
temp_dir = tempfile.mkdtemp(prefix="flyte-")

schema_path = temp_dir + "/schema.parquet"
df.to_parquet(schema_path)

file_path = tempfile.NamedTemporaryFile(delete=False)
file_path.write(b"Hello world!")
fs = Result(
schema=FlyteSchema(temp_dir),
file=FlyteFile(file_path.name),
directory=FlyteDirectory(temp_dir),
)
return fs


@task
def download_result(res: Result):
"""
Flytekit will lazily load FlyteSchema. We download the schema only when users invoke open().
"""
assert pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22]}).equals(
res.schema.open().all()
)
with open(res.file, "r") as f:
    assert f.read() == "Hello world!"
assert os.listdir(res.directory) == ["schema.parquet"]


# %%
# The ``upload_result`` task outputs a data class, and the ``download_result`` task accepts data classes as inputs.

# %%
# Lastly, we create a workflow.
@workflow
def wf(x: int, y: int) -> Datum:
def wf(x: int, y: int) -> (Datum, Result):
"""
Dataclasses (JSON) can be returned from a workflow as well.
"""
return add(x=stringify(x=x), y=stringify(x=y))
res = upload_result()
download_result(res=res)
return add(x=stringify(x=x), y=stringify(x=y)), res


# %%
# We can run the workflow locally.
if __name__ == "__main__":
"""
This workflow can be run locally. During local execution as well, the dataclasses will be marshalled to and from JSON.
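
The body of the ``__main__`` block is truncated in this view. A minimal local invocation, assuming only
the ``wf`` signature shown above (the argument values are arbitrary, not taken from the original file),
could look like:

    if __name__ == "__main__":
        # Local execution returns the workflow outputs directly as Python objects.
        datum, result = wf(x=10, y=20)
        print(datum)
        print(result)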
