From f39d89c2de15fe33c911422206a4e85e33a8bceb Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Mon, 18 Nov 2024 12:02:14 +0800 Subject: [PATCH 1/9] [Docs] MessagePack IDL, Pydantic Support, and Attribute Access Signed-off-by: Future-Outlier --- .../data_types_and_io/attribute_access.py | 88 ------ .../dataclass_attribute_access.py | 286 ++++++++++++++++++ .../pydantic_basemodel_attribute_access.py | 238 +++++++++++++++ 3 files changed, 524 insertions(+), 88 deletions(-) delete mode 100644 examples/data_types_and_io/data_types_and_io/attribute_access.py create mode 100644 examples/data_types_and_io/data_types_and_io/dataclass_attribute_access.py create mode 100644 examples/data_types_and_io/data_types_and_io/pydantic_basemodel_attribute_access.py diff --git a/examples/data_types_and_io/data_types_and_io/attribute_access.py b/examples/data_types_and_io/data_types_and_io/attribute_access.py deleted file mode 100644 index cb430683b..000000000 --- a/examples/data_types_and_io/data_types_and_io/attribute_access.py +++ /dev/null @@ -1,88 +0,0 @@ -from dataclasses import dataclass - -from dataclasses_json import dataclass_json -from flytekit import task, workflow - - -@task -def print_message(message: str): - print(message) - return - - -# Access an output list using index notation -@task -def list_task() -> list[str]: - return ["apple", "banana"] - - -@workflow -def list_wf(): - items = list_task() - first_item = items[0] - print_message(message=first_item) - - -# Access the output dictionary by specifying the key -@task -def dict_task() -> dict[str, str]: - return {"fruit": "banana"} - - -@workflow -def dict_wf(): - fruit_dict = dict_task() - print_message(message=fruit_dict["fruit"]) - - -# Directly access an attribute of a dataclass -@dataclass_json -@dataclass -class Fruit: - name: str - - -@task -def dataclass_task() -> Fruit: - return Fruit(name="banana") - - -@workflow -def dataclass_wf(): - fruit_instance = dataclass_task() - 
print_message(message=fruit_instance.name) - - -# Combinations of list, dict and dataclass also work effectively -@task -def advance_task() -> (dict[str, list[str]], list[dict[str, str]], dict[str, Fruit]): - return {"fruits": ["banana"]}, [{"fruit": "banana"}], {"fruit": Fruit(name="banana")} - - -@task -def print_list(fruits: list[str]): - print(fruits) - - -@task -def print_dict(fruit_dict: dict[str, str]): - print(fruit_dict) - - -@workflow -def advanced_workflow(): - dictionary_list, list_dict, dict_dataclass = advance_task() - print_message(message=dictionary_list["fruits"][0]) - print_message(message=list_dict[0]["fruit"]) - print_message(message=dict_dataclass["fruit"].name) - - print_list(fruits=dictionary_list["fruits"]) - print_dict(fruit_dict=list_dict[0]) - - -# Run the workflows locally -if __name__ == "__main__": - list_wf() - dict_wf() - dataclass_wf() - advanced_workflow() diff --git a/examples/data_types_and_io/data_types_and_io/dataclass_attribute_access.py b/examples/data_types_and_io/data_types_and_io/dataclass_attribute_access.py new file mode 100644 index 000000000..404243551 --- /dev/null +++ b/examples/data_types_and_io/data_types_and_io/dataclass_attribute_access.py @@ -0,0 +1,286 @@ +from dataclasses import dataclass, field +from typing import Dict, List +from flytekit.types.file import FlyteFile +from flytekit import task, workflow, ImageSpec +from enum import Enum + +image_spec = ImageSpec( + registry="ghcr.io/flyteorg", +) + +class Status(Enum): + PENDING = "pending" + APPROVED = "approved" + REJECTED = "rejected" + + +@dataclass +class InnerDC: + a: int = -1 + b: float = 2.1 + c: str = "Hello, Flyte" + d: bool = False + e: List[int] = field(default_factory=lambda: [0, 1, 2, -1, -2]) + f: List[FlyteFile] = field( + default_factory=lambda: [ + FlyteFile( + "https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/" + "examples/data_types_and_io/data_types_and_io/attribute_access.py" + ) + ] + ) + g: List[List[int]] = 
field(default_factory=lambda: [[0], [1], [-1]]) + h: List[Dict[int, bool]] = field( + default_factory=lambda: [{0: False}, {1: True}, {-1: True}] + ) + i: Dict[int, bool] = field( + default_factory=lambda: {0: False, 1: True, -1: False} + ) + j: Dict[int, FlyteFile] = field( + default_factory=lambda: { + 0: FlyteFile( + "https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/" + "examples/data_types_and_io/data_types_and_io/attribute_access.py" + ), + 1: FlyteFile( + "https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/" + "examples/data_types_and_io/data_types_and_io/attribute_access.py" + ), + -1: FlyteFile( + "https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/" + "examples/data_types_and_io/data_types_and_io/attribute_access.py" + ), + } + ) + k: Dict[int, List[int]] = field( + default_factory=lambda: {0: [0, 1, -1]} + ) + l: Dict[int, Dict[int, int]] = field( + default_factory=lambda: {1: {-1: 0}} + ) + m: dict = field(default_factory=lambda: {"key": "value"}) + n: FlyteFile = field( + default_factory=lambda: FlyteFile( + "https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/" + "examples/data_types_and_io/data_types_and_io/attribute_access.py" + ) + ) + enum_status: Status = field(default=Status.PENDING) + + +@dataclass +class DC: + a: int = -1 + b: float = 2.1 + c: str = "Hello, Flyte" + d: bool = False + e: List[int] = field(default_factory=lambda: [0, 1, 2, -1, -2]) + f: List[FlyteFile] = field( + default_factory=lambda: [ + FlyteFile( + "https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/" + "examples/data_types_and_io/data_types_and_io/attribute_access.py" + ) + ] + ) + g: List[List[int]] = field(default_factory=lambda: [[0], [1], [-1]]) + h: List[Dict[int, bool]] = field( + default_factory=lambda: [{0: False}, {1: True}, {-1: True}] + ) + i: Dict[int, bool] = field( + default_factory=lambda: {0: False, 1: True, -1: False} + ) + j: Dict[int, FlyteFile] 
= field( + default_factory=lambda: { + 0: FlyteFile( + "https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/" + "examples/data_types_and_io/data_types_and_io/attribute_access.py" + ), + 1: FlyteFile( + "https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/" + "examples/data_types_and_io/data_types_and_io/attribute_access.py" + ), + -1: FlyteFile( + "https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/" + "examples/data_types_and_io/data_types_and_io/attribute_access.py" + ), + } + ) + k: Dict[int, List[int]] = field( + default_factory=lambda: {0: [0, 1, -1]} + ) + l: Dict[int, Dict[int, int]] = field( + default_factory=lambda: {1: {-1: 0}} + ) + m: dict = field(default_factory=lambda: {"key": "value"}) + n: FlyteFile = field( + default_factory=lambda: FlyteFile( + "https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/" + "examples/data_types_and_io/data_types_and_io/attribute_access.py" + ) + ) + inner_dc: InnerDC = field(default_factory=lambda: InnerDC()) + enum_status: Status = field(default=Status.PENDING) + + +@task(container_image=image_spec) +def t_dc(dc: DC) -> DC: + return dc + + +@task(container_image=image_spec) +def t_inner(inner_dc: InnerDC) -> InnerDC: + assert isinstance(inner_dc, InnerDC), "inner_dc is not of type InnerDC" + + # f: List[FlyteFile] + for ff in inner_dc.f: + assert isinstance(ff, FlyteFile), "Expected FlyteFile" + with open(ff, "r") as f: + print(f.read()) + + # j: Dict[int, FlyteFile] + for _, ff in inner_dc.j.items(): + assert isinstance(ff, FlyteFile), "Expected FlyteFile in j" + with open(ff, "r") as f: + print(f.read()) + + # n: FlyteFile + assert isinstance(inner_dc.n, FlyteFile), "n is not FlyteFile" + with open(inner_dc.n, "r") as f: + print(f.read()) + + assert inner_dc.enum_status == Status.PENDING, "enum_status does not match" + + return inner_dc + + +@task(container_image=image_spec) +def t_test_all_attributes( + a: int, + b: float, + c: str, 
+ d: bool, + e: List[int], + f: List[FlyteFile], + g: List[List[int]], + h: List[Dict[int, bool]], + i: Dict[int, bool], + j: Dict[int, FlyteFile], + k: Dict[int, List[int]], + l: Dict[int, Dict[int, int]], + m: dict, + n: FlyteFile, + enum_status: Status, +): + + # Strict type checks for simple types + assert isinstance(a, int), f"a is not int, it's {type(a)}" + assert a == -1 + assert isinstance(b, float), f"b is not float, it's {type(b)}" + assert isinstance(c, str), f"c is not str, it's {type(c)}" + assert isinstance(d, bool), f"d is not bool, it's {type(d)}" + + # Strict type checks for List[int] + assert isinstance(e, list) and all( + isinstance(i, int) for i in e + ), "e is not List[int]" + + # Strict type checks for List[FlyteFile] + assert isinstance(f, list) and all( + isinstance(i, FlyteFile) for i in f + ), "f is not List[FlyteFile]" + + # Strict type checks for List[List[int]] + assert isinstance(g, list) and all( + isinstance(i, list) and all(isinstance(j, int) for j in i) for i in g + ), "g is not List[List[int]]" + + # Strict type checks for List[Dict[int, bool]] + assert isinstance(h, list) and all( + isinstance(i, dict) + and all(isinstance(k, int) and isinstance(v, bool) for k, v in i.items()) + for i in h + ), "h is not List[Dict[int, bool]]" + + # Strict type checks for Dict[int, bool] + assert isinstance(i, dict) and all( + isinstance(k, int) and isinstance(v, bool) for k, v in i.items() + ), "i is not Dict[int, bool]" + + # Strict type checks for Dict[int, FlyteFile] + assert isinstance(j, dict) and all( + isinstance(k, int) and isinstance(v, FlyteFile) for k, v in j.items() + ), "j is not Dict[int, FlyteFile]" + + # Strict type checks for Dict[int, List[int]] + assert isinstance(k, dict) and all( + isinstance(k, int) + and isinstance(v, list) + and all(isinstance(i, int) for i in v) + for k, v in k.items() + ), "k is not Dict[int, List[int]]" + + # Strict type checks for Dict[int, Dict[int, int]] + assert isinstance(l, dict) and all( + 
isinstance(k, int) + and isinstance(v, dict) + and all( + isinstance(sub_k, int) and isinstance(sub_v, int) + for sub_k, sub_v in v.items() + ) + for k, v in l.items() + ), "l is not Dict[int, Dict[int, int]]" + + # Strict type check for a generic dict + assert isinstance(m, dict), "m is not dict" + + # Strict type check for FlyteFile + assert isinstance(n, FlyteFile), "n is not FlyteFile" + + # Strict type check for Enum + assert isinstance(enum_status, Status), "enum_status is not Status" + + print("All attributes passed strict type checks.") + + +@workflow +def wf(dc: DC): + new_dc = t_dc(dc=dc) + t_inner(new_dc.inner_dc) + t_test_all_attributes( + a=new_dc.a, + b=new_dc.b, + c=new_dc.c, + d=new_dc.d, + e=new_dc.e, + f=new_dc.f, + g=new_dc.g, + h=new_dc.h, + i=new_dc.i, + j=new_dc.j, + k=new_dc.k, + l=new_dc.l, + m=new_dc.m, + n=new_dc.n, + enum_status=new_dc.enum_status, + ) + t_test_all_attributes( + a=new_dc.inner_dc.a, + b=new_dc.inner_dc.b, + c=new_dc.inner_dc.c, + d=new_dc.inner_dc.d, + e=new_dc.inner_dc.e, + f=new_dc.inner_dc.f, + g=new_dc.inner_dc.g, + h=new_dc.inner_dc.h, + i=new_dc.inner_dc.i, + j=new_dc.inner_dc.j, + k=new_dc.inner_dc.k, + l=new_dc.inner_dc.l, + m=new_dc.inner_dc.m, + n=new_dc.inner_dc.n, + enum_status=new_dc.inner_dc.enum_status, + ) + +if __name__ == "__main__": + wf(dc=DC()) diff --git a/examples/data_types_and_io/data_types_and_io/pydantic_basemodel_attribute_access.py b/examples/data_types_and_io/data_types_and_io/pydantic_basemodel_attribute_access.py new file mode 100644 index 000000000..58e3fdeb9 --- /dev/null +++ b/examples/data_types_and_io/data_types_and_io/pydantic_basemodel_attribute_access.py @@ -0,0 +1,238 @@ +from typing import Dict, List +from flytekit.types.file import FlyteFile +from flytekit import task, workflow, ImageSpec +from enum import Enum +from pydantic import BaseModel, Field + +image_spec = ImageSpec( + registry="ghcr.io/flyteorg", + packages=["pydantic"], +) + +class Status(Enum): + PENDING = "pending" + 
APPROVED = "approved" + REJECTED = "rejected" + + +class InnerBM(BaseModel): + a: int = -1 + b: float = 2.1 + c: str = "Hello, Flyte" + d: bool = False + e: List[int] = Field(default_factory=lambda: [0, 1, 2, -1, -2]) + f: List[FlyteFile] = Field(default_factory=lambda: [ + FlyteFile( + "https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/examples/data_types_and_io/data_types_and_io/attribute_access.py" + ) + ]) + g: List[List[int]] = Field(default_factory=lambda: [[0], [1], [-1]]) + h: List[Dict[int, bool]] = Field(default_factory=lambda: [{0: False}, {1: True}, {-1: True}]) + i: Dict[int, bool] = Field(default_factory=lambda: {0: False, 1: True, -1: False}) + j: Dict[int, FlyteFile] = Field( + default_factory=lambda: { + 0: FlyteFile( + "https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/examples/data_types_and_io/data_types_and_io/attribute_access.py" + ), + 1: FlyteFile( + "https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/examples/data_types_and_io/data_types_and_io/attribute_access.py" + ), + -1: FlyteFile( + "https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/examples/data_types_and_io/data_types_and_io/attribute_access.py" + ), + } + ) + k: Dict[int, List[int]] = Field(default_factory=lambda: {0: [0, 1, -1]}) + l: Dict[int, Dict[int, int]] = Field(default_factory=lambda: {1: {-1: 0}}) + m: dict = Field(default_factory=lambda: {"key": "value"}) + n: FlyteFile = Field( + default_factory=lambda: FlyteFile( + "https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/examples/data_types_and_io/data_types_and_io/attribute_access.py" + ) + ) + enum_status: Status = Status.PENDING + + +class BM(BaseModel): + a: int = -1 + b: float = 2.1 + c: str = "Hello, Flyte" + d: bool = False + e: List[int] = Field(default_factory=lambda: [0, 1, 2, -1, -2]) + f: List[FlyteFile] = Field(default_factory=lambda: [ + FlyteFile( + 
"https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/examples/data_types_and_io/data_types_and_io/attribute_access.py" + ) + ]) + g: List[List[int]] = Field(default_factory=lambda: [[0], [1], [-1]]) + h: List[Dict[int, bool]] = Field(default_factory=lambda: [{0: False}, {1: True}, {-1: True}]) + i: Dict[int, bool] = Field(default_factory=lambda: {0: False, 1: True, -1: False}) + j: Dict[int, FlyteFile] = Field( + default_factory=lambda: { + 0: FlyteFile( + "https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/examples/data_types_and_io/data_types_and_io/attribute_access.py" + ), + 1: FlyteFile( + "https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/examples/data_types_and_io/data_types_and_io/attribute_access.py" + ), + -1: FlyteFile( + "https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/examples/data_types_and_io/data_types_and_io/attribute_access.py" + ), + } + ) + k: Dict[int, List[int]] = Field(default_factory=lambda: {0: [0, 1, -1]}) + l: Dict[int, Dict[int, int]] = Field(default_factory=lambda: {1: {-1: 0}}) + m: dict = Field(default_factory=lambda: {"key": "value"}) + n: FlyteFile = Field( + default_factory=lambda: FlyteFile( + "https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/examples/data_types_and_io/data_types_and_io/attribute_access.py" + ) + ) + inner_bm: InnerBM = Field(default_factory=InnerBM) + enum_status: Status = Status.PENDING + + +@task(container_image=image) +def t_bm(bm: BM) -> BM: + return bm + + +@task(container_image=image) +def t_inner(inner_bm: InnerBM) -> InnerBM: + assert isinstance(inner_bm, InnerBM), "inner_bm is not of type InnerBM" + + # f: List[FlyteFile] + for ff in inner_bm.f: + assert isinstance(ff, FlyteFile), "Expected FlyteFile" + with open(ff, "r") as f: + print(f.read()) + + # j: Dict[int, FlyteFile] + for _, ff in inner_bm.j.items(): + assert isinstance(ff, FlyteFile), "Expected FlyteFile in j" + with open(ff, "r") 
as f: + print(f.read()) + + # n: FlyteFile + assert isinstance(inner_bm.n, FlyteFile), "n is not FlyteFile" + with open(inner_bm.n, "r") as f: + print(f.read()) + + assert inner_bm.enum_status == Status.PENDING, "enum_status does not match" + + return inner_bm + + +@task(container_image=image) +def t_test_all_attributes( + a: int, + b: float, + c: str, + d: bool, + e: List[int], + f: List[FlyteFile], + g: List[List[int]], + h: List[Dict[int, bool]], + i: Dict[int, bool], + j: Dict[int, FlyteFile], + k: Dict[int, List[int]], + l: Dict[int, Dict[int, int]], + m: dict, + n: FlyteFile, + enum_status: Status, +): + + # Strict type checks for simple types + assert isinstance(a, int), f"a is not int, it's {type(a)}" + assert a == -1 + assert isinstance(b, float), f"b is not float, it's {type(b)}" + assert isinstance(c, str), f"c is not str, it's {type(c)}" + assert isinstance(d, bool), f"d is not bool, it's {type(d)}" + + # Strict type checks for List[int] + assert isinstance(e, list) and all(isinstance(i, int) for i in e), "e is not List[int]" + + # Strict type checks for List[FlyteFile] + assert isinstance(f, list) and all(isinstance(i, FlyteFile) for i in f), "f is not List[FlyteFile]" + + # Strict type checks for List[List[int]] + assert isinstance(g, list) and all( + isinstance(i, list) and all(isinstance(j, int) for j in i) for i in g + ), "g is not List[List[int]]" + + # Strict type checks for List[Dict[int, bool]] + assert isinstance(h, list) and all( + isinstance(i, dict) and all(isinstance(k, int) and isinstance(v, bool) for k, v in i.items()) for i in h + ), "h is not List[Dict[int, bool]]" + + # Strict type checks for Dict[int, bool] + assert isinstance(i, dict) and all( + isinstance(k, int) and isinstance(v, bool) for k, v in i.items() + ), "i is not Dict[int, bool]" + + # Strict type checks for Dict[int, FlyteFile] + assert isinstance(j, dict) and all( + isinstance(k, int) and isinstance(v, FlyteFile) for k, v in j.items() + ), "j is not Dict[int, 
FlyteFile]" + + # Strict type checks for Dict[int, List[int]] + assert isinstance(k, dict) and all( + isinstance(k, int) and isinstance(v, list) and all(isinstance(i, int) for i in v) for k, v in k.items() + ), "k is not Dict[int, List[int]]" + + # Strict type checks for Dict[int, Dict[int, int]] + assert isinstance(l, dict) and all( + isinstance(k, int) and isinstance(v, dict) and all(isinstance(sub_k, int) and isinstance(sub_v, int) for sub_k, sub_v in v.items()) + for k, v in l.items() + ), "l is not Dict[int, Dict[int, int]]" + + # Strict type check for a generic dict + assert isinstance(m, dict), "m is not dict" + + # Strict type check for FlyteFile + assert isinstance(n, FlyteFile), "n is not FlyteFile" + + # Strict type check for Enum + assert isinstance(enum_status, Status), "enum_status is not Status" + + print("All attributes passed strict type checks.") + + +@workflow +def wf(bm: BM): + new_bm = t_bm(bm=bm) + t_inner(new_bm.inner_bm) + t_test_all_attributes( + a=new_bm.a, + b=new_bm.b, + c=new_bm.c, + d=new_bm.d, + e=new_bm.e, + f=new_bm.f, + g=new_bm.g, + h=new_bm.h, + i=new_bm.i, + j=new_bm.j, + k=new_bm.k, + l=new_bm.l, + m=new_bm.m, + n=new_bm.n, + enum_status=new_bm.enum_status, + ) + t_test_all_attributes( + a=new_bm.inner_bm.a, + b=new_bm.inner_bm.b, + c=new_bm.inner_bm.c, + d=new_bm.inner_bm.d, + e=new_bm.inner_bm.e, + f=new_bm.inner_bm.f, + g=new_bm.inner_bm.g, + h=new_bm.inner_bm.h, + i=new_bm.inner_bm.i, + j=new_bm.inner_bm.j, + k=new_bm.inner_bm.k, + l=new_bm.inner_bm.l, + m=new_bm.inner_bm.m, + n=new_bm.inner_bm.n, + enum_status=new_bm.inner_bm.enum_status, + ) From bfe15cf2e9b0257482361172f66337ffff228c73 Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Mon, 18 Nov 2024 12:04:12 +0800 Subject: [PATCH 2/9] upadte Signed-off-by: Future-Outlier --- .../{dataclass_attribute_access.py => attribute_access.py} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename examples/data_types_and_io/data_types_and_io/{dataclass_attribute_access.py 
=> attribute_access.py} (100%) diff --git a/examples/data_types_and_io/data_types_and_io/dataclass_attribute_access.py b/examples/data_types_and_io/data_types_and_io/attribute_access.py similarity index 100% rename from examples/data_types_and_io/data_types_and_io/dataclass_attribute_access.py rename to examples/data_types_and_io/data_types_and_io/attribute_access.py From 7a5196a50ff54eeb984f2c1d68d47f6ff75c64f2 Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Wed, 20 Nov 2024 11:39:46 +0800 Subject: [PATCH 3/9] update Signed-off-by: Future-Outlier --- .../data_types_and_io/attribute_access.py | 314 ++++-------------- .../pydantic_basemodel_attribute_access.py | 312 +++++------------ 2 files changed, 141 insertions(+), 485 deletions(-) diff --git a/examples/data_types_and_io/data_types_and_io/attribute_access.py b/examples/data_types_and_io/data_types_and_io/attribute_access.py index 404243551..a89320507 100644 --- a/examples/data_types_and_io/data_types_and_io/attribute_access.py +++ b/examples/data_types_and_io/data_types_and_io/attribute_access.py @@ -1,286 +1,86 @@ -from dataclasses import dataclass, field -from typing import Dict, List -from flytekit.types.file import FlyteFile -from flytekit import task, workflow, ImageSpec -from enum import Enum +from dataclasses import dataclass -image_spec = ImageSpec( - registry="ghcr.io/flyteorg", -) +from flytekit import task, workflow -class Status(Enum): - PENDING = "pending" - APPROVED = "approved" - REJECTED = "rejected" - -@dataclass -class InnerDC: - a: int = -1 - b: float = 2.1 - c: str = "Hello, Flyte" - d: bool = False - e: List[int] = field(default_factory=lambda: [0, 1, 2, -1, -2]) - f: List[FlyteFile] = field( - default_factory=lambda: [ - FlyteFile( - "https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/" - "examples/data_types_and_io/data_types_and_io/attribute_access.py" - ) - ] - ) - g: List[List[int]] = field(default_factory=lambda: [[0], [1], [-1]]) - h: List[Dict[int, bool]] = 
field( - default_factory=lambda: [{0: False}, {1: True}, {-1: True}] - ) - i: Dict[int, bool] = field( - default_factory=lambda: {0: False, 1: True, -1: False} - ) - j: Dict[int, FlyteFile] = field( - default_factory=lambda: { - 0: FlyteFile( - "https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/" - "examples/data_types_and_io/data_types_and_io/attribute_access.py" - ), - 1: FlyteFile( - "https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/" - "examples/data_types_and_io/data_types_and_io/attribute_access.py" - ), - -1: FlyteFile( - "https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/" - "examples/data_types_and_io/data_types_and_io/attribute_access.py" - ), - } - ) - k: Dict[int, List[int]] = field( - default_factory=lambda: {0: [0, 1, -1]} - ) - l: Dict[int, Dict[int, int]] = field( - default_factory=lambda: {1: {-1: 0}} - ) - m: dict = field(default_factory=lambda: {"key": "value"}) - n: FlyteFile = field( - default_factory=lambda: FlyteFile( - "https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/" - "examples/data_types_and_io/data_types_and_io/attribute_access.py" - ) - ) - enum_status: Status = field(default=Status.PENDING) - - -@dataclass -class DC: - a: int = -1 - b: float = 2.1 - c: str = "Hello, Flyte" - d: bool = False - e: List[int] = field(default_factory=lambda: [0, 1, 2, -1, -2]) - f: List[FlyteFile] = field( - default_factory=lambda: [ - FlyteFile( - "https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/" - "examples/data_types_and_io/data_types_and_io/attribute_access.py" - ) - ] - ) - g: List[List[int]] = field(default_factory=lambda: [[0], [1], [-1]]) - h: List[Dict[int, bool]] = field( - default_factory=lambda: [{0: False}, {1: True}, {-1: True}] - ) - i: Dict[int, bool] = field( - default_factory=lambda: {0: False, 1: True, -1: False} - ) - j: Dict[int, FlyteFile] = field( - default_factory=lambda: { - 0: FlyteFile( - 
"https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/" - "examples/data_types_and_io/data_types_and_io/attribute_access.py" - ), - 1: FlyteFile( - "https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/" - "examples/data_types_and_io/data_types_and_io/attribute_access.py" - ), - -1: FlyteFile( - "https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/" - "examples/data_types_and_io/data_types_and_io/attribute_access.py" - ), - } - ) - k: Dict[int, List[int]] = field( - default_factory=lambda: {0: [0, 1, -1]} - ) - l: Dict[int, Dict[int, int]] = field( - default_factory=lambda: {1: {-1: 0}} - ) - m: dict = field(default_factory=lambda: {"key": "value"}) - n: FlyteFile = field( - default_factory=lambda: FlyteFile( - "https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/" - "examples/data_types_and_io/data_types_and_io/attribute_access.py" - ) - ) - inner_dc: InnerDC = field(default_factory=lambda: InnerDC()) - enum_status: Status = field(default=Status.PENDING) - - -@task(container_image=image_spec) -def t_dc(dc: DC) -> DC: - return dc +@task +def print_message(message: str): + print(message) + return -@task(container_image=image_spec) -def t_inner(inner_dc: InnerDC) -> InnerDC: - assert isinstance(inner_dc, InnerDC), "inner_dc is not of type InnerDC" +# Access an output list using index notation +@task +def list_task() -> list[str]: + return ["apple", "banana"] - # f: List[FlyteFile] - for ff in inner_dc.f: - assert isinstance(ff, FlyteFile), "Expected FlyteFile" - with open(ff, "r") as f: - print(f.read()) - # j: Dict[int, FlyteFile] - for _, ff in inner_dc.j.items(): - assert isinstance(ff, FlyteFile), "Expected FlyteFile in j" - with open(ff, "r") as f: - print(f.read()) +@workflow +def list_wf(): + items = list_task() + first_item = items[0] + print_message(message=first_item) - # n: FlyteFile - assert isinstance(inner_dc.n, FlyteFile), "n is not FlyteFile" - with open(inner_dc.n, "r") 
as f: - print(f.read()) - assert inner_dc.enum_status == Status.PENDING, "enum_status does not match" +# Access the output dictionary by specifying the key +@task +def dict_task() -> dict[str, str]: + return {"fruit": "banana"} - return inner_dc +@workflow +def dict_wf(): + fruit_dict = dict_task() + print_message(message=fruit_dict["fruit"]) -@task(container_image=image_spec) -def t_test_all_attributes( - a: int, - b: float, - c: str, - d: bool, - e: List[int], - f: List[FlyteFile], - g: List[List[int]], - h: List[Dict[int, bool]], - i: Dict[int, bool], - j: Dict[int, FlyteFile], - k: Dict[int, List[int]], - l: Dict[int, Dict[int, int]], - m: dict, - n: FlyteFile, - enum_status: Status, -): - # Strict type checks for simple types - assert isinstance(a, int), f"a is not int, it's {type(a)}" - assert a == -1 - assert isinstance(b, float), f"b is not float, it's {type(b)}" - assert isinstance(c, str), f"c is not str, it's {type(c)}" - assert isinstance(d, bool), f"d is not bool, it's {type(d)}" +# Directly access an attribute of a dataclass +@dataclass +class Fruit: + name: str - # Strict type checks for List[int] - assert isinstance(e, list) and all( - isinstance(i, int) for i in e - ), "e is not List[int]" - # Strict type checks for List[FlyteFile] - assert isinstance(f, list) and all( - isinstance(i, FlyteFile) for i in f - ), "f is not List[FlyteFile]" +@task +def dataclass_task() -> Fruit: + return Fruit(name="banana") - # Strict type checks for List[List[int]] - assert isinstance(g, list) and all( - isinstance(i, list) and all(isinstance(j, int) for j in i) for i in g - ), "g is not List[List[int]]" - # Strict type checks for List[Dict[int, bool]] - assert isinstance(h, list) and all( - isinstance(i, dict) - and all(isinstance(k, int) and isinstance(v, bool) for k, v in i.items()) - for i in h - ), "h is not List[Dict[int, bool]]" +@workflow +def dataclass_wf(): + fruit_instance = dataclass_task() + print_message(message=fruit_instance.name) - # Strict type 
checks for Dict[int, bool] - assert isinstance(i, dict) and all( - isinstance(k, int) and isinstance(v, bool) for k, v in i.items() - ), "i is not Dict[int, bool]" - # Strict type checks for Dict[int, FlyteFile] - assert isinstance(j, dict) and all( - isinstance(k, int) and isinstance(v, FlyteFile) for k, v in j.items() - ), "j is not Dict[int, FlyteFile]" +# Combinations of list, dict and dataclass also work effectively +@task +def advance_task() -> (dict[str, list[str]], list[dict[str, str]], dict[str, Fruit]): + return {"fruits": ["banana"]}, [{"fruit": "banana"}], {"fruit": Fruit(name="banana")} - # Strict type checks for Dict[int, List[int]] - assert isinstance(k, dict) and all( - isinstance(k, int) - and isinstance(v, list) - and all(isinstance(i, int) for i in v) - for k, v in k.items() - ), "k is not Dict[int, List[int]]" - # Strict type checks for Dict[int, Dict[int, int]] - assert isinstance(l, dict) and all( - isinstance(k, int) - and isinstance(v, dict) - and all( - isinstance(sub_k, int) and isinstance(sub_v, int) - for sub_k, sub_v in v.items() - ) - for k, v in l.items() - ), "l is not Dict[int, Dict[int, int]]" +@task +def print_list(fruits: list[str]): + print(fruits) - # Strict type check for a generic dict - assert isinstance(m, dict), "m is not dict" - # Strict type check for FlyteFile - assert isinstance(n, FlyteFile), "n is not FlyteFile" +@task +def print_dict(fruit_dict: dict[str, str]): + print(fruit_dict) - # Strict type check for Enum - assert isinstance(enum_status, Status), "enum_status is not Status" - print("All attributes passed strict type checks.") +@workflow +def advanced_workflow(): + dictionary_list, list_dict, dict_dataclass = advance_task() + print_message(message=dictionary_list["fruits"][0]) + print_message(message=list_dict[0]["fruit"]) + print_message(message=dict_dataclass["fruit"].name) + print_list(fruits=dictionary_list["fruits"]) + print_dict(fruit_dict=list_dict[0]) -@workflow -def wf(dc: DC): - new_dc = t_dc(dc=dc) 
- t_inner(new_dc.inner_dc) - t_test_all_attributes( - a=new_dc.a, - b=new_dc.b, - c=new_dc.c, - d=new_dc.d, - e=new_dc.e, - f=new_dc.f, - g=new_dc.g, - h=new_dc.h, - i=new_dc.i, - j=new_dc.j, - k=new_dc.k, - l=new_dc.l, - m=new_dc.m, - n=new_dc.n, - enum_status=new_dc.enum_status, - ) - t_test_all_attributes( - a=new_dc.inner_dc.a, - b=new_dc.inner_dc.b, - c=new_dc.inner_dc.c, - d=new_dc.inner_dc.d, - e=new_dc.inner_dc.e, - f=new_dc.inner_dc.f, - g=new_dc.inner_dc.g, - h=new_dc.inner_dc.h, - i=new_dc.inner_dc.i, - j=new_dc.inner_dc.j, - k=new_dc.inner_dc.k, - l=new_dc.inner_dc.l, - m=new_dc.inner_dc.m, - n=new_dc.inner_dc.n, - enum_status=new_dc.inner_dc.enum_status, - ) +# Run the workflows locally if __name__ == "__main__": - wf(dc=DC()) + list_wf() + dict_wf() + dataclass_wf() + advanced_workflow() diff --git a/examples/data_types_and_io/data_types_and_io/pydantic_basemodel_attribute_access.py b/examples/data_types_and_io/data_types_and_io/pydantic_basemodel_attribute_access.py index 58e3fdeb9..b1b388c39 100644 --- a/examples/data_types_and_io/data_types_and_io/pydantic_basemodel_attribute_access.py +++ b/examples/data_types_and_io/data_types_and_io/pydantic_basemodel_attribute_access.py @@ -1,238 +1,94 @@ -from typing import Dict, List -from flytekit.types.file import FlyteFile -from flytekit import task, workflow, ImageSpec -from enum import Enum -from pydantic import BaseModel, Field +from flytekit import ImageSpec, task, workflow +from pydantic import BaseModel image_spec = ImageSpec( registry="ghcr.io/flyteorg", packages=["pydantic"], ) +image_spec = "localhost:30000/flytekit:dev" -class Status(Enum): - PENDING = "pending" - APPROVED = "approved" - REJECTED = "rejected" - - -class InnerBM(BaseModel): - a: int = -1 - b: float = 2.1 - c: str = "Hello, Flyte" - d: bool = False - e: List[int] = Field(default_factory=lambda: [0, 1, 2, -1, -2]) - f: List[FlyteFile] = Field(default_factory=lambda: [ - FlyteFile( - 
"https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/examples/data_types_and_io/data_types_and_io/attribute_access.py" - ) - ]) - g: List[List[int]] = Field(default_factory=lambda: [[0], [1], [-1]]) - h: List[Dict[int, bool]] = Field(default_factory=lambda: [{0: False}, {1: True}, {-1: True}]) - i: Dict[int, bool] = Field(default_factory=lambda: {0: False, 1: True, -1: False}) - j: Dict[int, FlyteFile] = Field( - default_factory=lambda: { - 0: FlyteFile( - "https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/examples/data_types_and_io/data_types_and_io/attribute_access.py" - ), - 1: FlyteFile( - "https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/examples/data_types_and_io/data_types_and_io/attribute_access.py" - ), - -1: FlyteFile( - "https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/examples/data_types_and_io/data_types_and_io/attribute_access.py" - ), - } - ) - k: Dict[int, List[int]] = Field(default_factory=lambda: {0: [0, 1, -1]}) - l: Dict[int, Dict[int, int]] = Field(default_factory=lambda: {1: {-1: 0}}) - m: dict = Field(default_factory=lambda: {"key": "value"}) - n: FlyteFile = Field( - default_factory=lambda: FlyteFile( - "https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/examples/data_types_and_io/data_types_and_io/attribute_access.py" - ) - ) - enum_status: Status = Status.PENDING - - -class BM(BaseModel): - a: int = -1 - b: float = 2.1 - c: str = "Hello, Flyte" - d: bool = False - e: List[int] = Field(default_factory=lambda: [0, 1, 2, -1, -2]) - f: List[FlyteFile] = Field(default_factory=lambda: [ - FlyteFile( - "https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/examples/data_types_and_io/data_types_and_io/attribute_access.py" - ) - ]) - g: List[List[int]] = Field(default_factory=lambda: [[0], [1], [-1]]) - h: List[Dict[int, bool]] = Field(default_factory=lambda: [{0: False}, {1: True}, {-1: True}]) - i: Dict[int, bool] = 
Field(default_factory=lambda: {0: False, 1: True, -1: False}) - j: Dict[int, FlyteFile] = Field( - default_factory=lambda: { - 0: FlyteFile( - "https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/examples/data_types_and_io/data_types_and_io/attribute_access.py" - ), - 1: FlyteFile( - "https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/examples/data_types_and_io/data_types_and_io/attribute_access.py" - ), - -1: FlyteFile( - "https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/examples/data_types_and_io/data_types_and_io/attribute_access.py" - ), - } - ) - k: Dict[int, List[int]] = Field(default_factory=lambda: {0: [0, 1, -1]}) - l: Dict[int, Dict[int, int]] = Field(default_factory=lambda: {1: {-1: 0}}) - m: dict = Field(default_factory=lambda: {"key": "value"}) - n: FlyteFile = Field( - default_factory=lambda: FlyteFile( - "https://raw.githubusercontent.com/flyteorg/flytesnacks/refs/heads/master/examples/data_types_and_io/data_types_and_io/attribute_access.py" - ) - ) - inner_bm: InnerBM = Field(default_factory=InnerBM) - enum_status: Status = Status.PENDING - - -@task(container_image=image) -def t_bm(bm: BM) -> BM: - return bm - - -@task(container_image=image) -def t_inner(inner_bm: InnerBM) -> InnerBM: - assert isinstance(inner_bm, InnerBM), "inner_bm is not of type InnerBM" - - # f: List[FlyteFile] - for ff in inner_bm.f: - assert isinstance(ff, FlyteFile), "Expected FlyteFile" - with open(ff, "r") as f: - print(f.read()) - - # j: Dict[int, FlyteFile] - for _, ff in inner_bm.j.items(): - assert isinstance(ff, FlyteFile), "Expected FlyteFile in j" - with open(ff, "r") as f: - print(f.read()) - - # n: FlyteFile - assert isinstance(inner_bm.n, FlyteFile), "n is not FlyteFile" - with open(inner_bm.n, "r") as f: - print(f.read()) - - assert inner_bm.enum_status == Status.PENDING, "enum_status does not match" - - return inner_bm - - -@task(container_image=image) -def t_test_all_attributes( - a: int, - b: 
float, - c: str, - d: bool, - e: List[int], - f: List[FlyteFile], - g: List[List[int]], - h: List[Dict[int, bool]], - i: Dict[int, bool], - j: Dict[int, FlyteFile], - k: Dict[int, List[int]], - l: Dict[int, Dict[int, int]], - m: dict, - n: FlyteFile, - enum_status: Status, -): - - # Strict type checks for simple types - assert isinstance(a, int), f"a is not int, it's {type(a)}" - assert a == -1 - assert isinstance(b, float), f"b is not float, it's {type(b)}" - assert isinstance(c, str), f"c is not str, it's {type(c)}" - assert isinstance(d, bool), f"d is not bool, it's {type(d)}" - - # Strict type checks for List[int] - assert isinstance(e, list) and all(isinstance(i, int) for i in e), "e is not List[int]" - - # Strict type checks for List[FlyteFile] - assert isinstance(f, list) and all(isinstance(i, FlyteFile) for i in f), "f is not List[FlyteFile]" - - # Strict type checks for List[List[int]] - assert isinstance(g, list) and all( - isinstance(i, list) and all(isinstance(j, int) for j in i) for i in g - ), "g is not List[List[int]]" - - # Strict type checks for List[Dict[int, bool]] - assert isinstance(h, list) and all( - isinstance(i, dict) and all(isinstance(k, int) and isinstance(v, bool) for k, v in i.items()) for i in h - ), "h is not List[Dict[int, bool]]" - - # Strict type checks for Dict[int, bool] - assert isinstance(i, dict) and all( - isinstance(k, int) and isinstance(v, bool) for k, v in i.items() - ), "i is not Dict[int, bool]" - - # Strict type checks for Dict[int, FlyteFile] - assert isinstance(j, dict) and all( - isinstance(k, int) and isinstance(v, FlyteFile) for k, v in j.items() - ), "j is not Dict[int, FlyteFile]" - - # Strict type checks for Dict[int, List[int]] - assert isinstance(k, dict) and all( - isinstance(k, int) and isinstance(v, list) and all(isinstance(i, int) for i in v) for k, v in k.items() - ), "k is not Dict[int, List[int]]" - - # Strict type checks for Dict[int, Dict[int, int]] - assert isinstance(l, dict) and all( - 
isinstance(k, int) and isinstance(v, dict) and all(isinstance(sub_k, int) and isinstance(sub_v, int) for sub_k, sub_v in v.items()) - for k, v in l.items() - ), "l is not Dict[int, Dict[int, int]]" - - # Strict type check for a generic dict - assert isinstance(m, dict), "m is not dict" - - # Strict type check for FlyteFile - assert isinstance(n, FlyteFile), "n is not FlyteFile" - - # Strict type check for Enum - assert isinstance(enum_status, Status), "enum_status is not Status" - - print("All attributes passed strict type checks.") + +@task(container_image=image_spec) +def print_message(message: str): + print(message) + return + + +# Access an output list using index notation +@task(container_image=image_spec) +def list_task() -> list[str]: + return ["apple", "banana"] @workflow -def wf(bm: BM): - new_bm = t_bm(bm=bm) - t_inner(new_bm.inner_bm) - t_test_all_attributes( - a=new_bm.a, - b=new_bm.b, - c=new_bm.c, - d=new_bm.d, - e=new_bm.e, - f=new_bm.f, - g=new_bm.g, - h=new_bm.h, - i=new_bm.i, - j=new_bm.j, - k=new_bm.k, - l=new_bm.l, - m=new_bm.m, - n=new_bm.n, - enum_status=new_bm.enum_status, - ) - t_test_all_attributes( - a=new_bm.inner_bm.a, - b=new_bm.inner_bm.b, - c=new_bm.inner_bm.c, - d=new_bm.inner_bm.d, - e=new_bm.inner_bm.e, - f=new_bm.inner_bm.f, - g=new_bm.inner_bm.g, - h=new_bm.inner_bm.h, - i=new_bm.inner_bm.i, - j=new_bm.inner_bm.j, - k=new_bm.inner_bm.k, - l=new_bm.inner_bm.l, - m=new_bm.inner_bm.m, - n=new_bm.inner_bm.n, - enum_status=new_bm.inner_bm.enum_status, +def list_wf(): + items = list_task() + first_item = items[0] + print_message(message=first_item) + + +# Access the output dictionary by specifying the key +@task(container_image=image_spec) +def dict_task() -> dict[str, str]: + return {"fruit": "banana"} + + +@workflow +def dict_wf(): + fruit_dict = dict_task() + print_message(message=fruit_dict["fruit"]) + + +# Directly access an attribute of a Pydantic BaseModel +class Fruit(BaseModel): + name: str + + 
+@task(container_image=image_spec) +def basemodel_task() -> Fruit: + return Fruit(name="banana") + + +@workflow +def basemodel_wf(): + fruit_instance = basemodel_task() + print_message(message=fruit_instance.name) + + +# Combinations of list, dict, and BaseModel also work effectively +@task(container_image=image_spec) +def advance_task() -> (dict[str, list[str]], list[dict[str, str]], dict[str, Fruit]): + return ( + {"fruits": ["banana"]}, + [{"fruit": "banana"}], + {"fruit": Fruit(name="banana")}, ) + + +@task(container_image=image_spec) +def print_list(fruits: list[str]): + print(fruits) + + +@task(container_image=image_spec) +def print_dict(fruit_dict: dict[str, str]): + print(fruit_dict) + + +@workflow +def advanced_workflow(): + dictionary_list, list_dict, dict_basemodel = advance_task() + print_message(message=dictionary_list["fruits"][0]) + print_message(message=list_dict[0]["fruit"]) + print_message(message=dict_basemodel["fruit"].name) + + print_list(fruits=dictionary_list["fruits"]) + print_dict(fruit_dict=list_dict[0]) + + +# Run the workflows locally +if __name__ == "__main__": + list_wf() + dict_wf() + basemodel_wf() + advanced_workflow() From c9bbc1cca0e25283c122bf49dcc3f294a0516b16 Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Wed, 20 Nov 2024 11:41:35 +0800 Subject: [PATCH 4/9] add pydantic Signed-off-by: Future-Outlier --- dev-requirements.in | 1 + 1 file changed, 1 insertion(+) diff --git a/dev-requirements.in b/dev-requirements.in index 23bdb828c..d31cfd256 100644 --- a/dev-requirements.in +++ b/dev-requirements.in @@ -4,6 +4,7 @@ coverage pre-commit codespell mock +pydantic pytest mypy mashumaro From 372d720c9c4b34fd8c616130a119c0f00a1b28f9 Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Wed, 20 Nov 2024 11:44:16 +0800 Subject: [PATCH 5/9] Add Pydantic Signed-off-by: Future-Outlier --- examples/data_types_and_io/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/data_types_and_io/Dockerfile 
b/examples/data_types_and_io/Dockerfile index d93ea429b..c4b7079b3 100644 --- a/examples/data_types_and_io/Dockerfile +++ b/examples/data_types_and_io/Dockerfile @@ -19,7 +19,7 @@ RUN python3 -m venv ${VENV} ENV PATH="${VENV}/bin:$PATH" RUN --mount=type=cache,sharing=locked,mode=0777,target=/root/.cache/pip,id=pip \ - pip install flytekit pandas pyarrow + pip install flytekit pandas pyarrow pydantic RUN --mount=type=cache,sharing=locked,mode=0777,target=/root/.cache/pip,id=pip \ pip install torch --index-url https://download.pytorch.org/whl/cpu From 844d2d35e7675752110240588b4a0b5c44b527d2 Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Wed, 20 Nov 2024 11:44:47 +0800 Subject: [PATCH 6/9] add pydantic Signed-off-by: Future-Outlier --- examples/data_types_and_io/requirements.in | 1 + 1 file changed, 1 insertion(+) diff --git a/examples/data_types_and_io/requirements.in b/examples/data_types_and_io/requirements.in index 2bcce8b12..b2026eec6 100644 --- a/examples/data_types_and_io/requirements.in +++ b/examples/data_types_and_io/requirements.in @@ -3,3 +3,4 @@ torch tabulate tensorflow pyarrow +pydantic From 9a03f5112717adc93683b014349ba49aa6553ca7 Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Wed, 20 Nov 2024 11:48:51 +0800 Subject: [PATCH 7/9] add pydantic>2 Signed-off-by: Future-Outlier --- dev-requirements.in | 2 +- examples/data_types_and_io/Dockerfile | 2 +- .../data_types_and_io/pydantic_basemodel_attribute_access.py | 3 +-- examples/data_types_and_io/requirements.in | 2 +- 4 files changed, 4 insertions(+), 5 deletions(-) diff --git a/dev-requirements.in b/dev-requirements.in index d31cfd256..203c9c2f5 100644 --- a/dev-requirements.in +++ b/dev-requirements.in @@ -4,7 +4,7 @@ coverage pre-commit codespell mock -pydantic +pydantic>2 pytest mypy mashumaro diff --git a/examples/data_types_and_io/Dockerfile b/examples/data_types_and_io/Dockerfile index c4b7079b3..1d9af3994 100644 --- a/examples/data_types_and_io/Dockerfile +++ 
b/examples/data_types_and_io/Dockerfile @@ -19,7 +19,7 @@ RUN python3 -m venv ${VENV} ENV PATH="${VENV}/bin:$PATH" RUN --mount=type=cache,sharing=locked,mode=0777,target=/root/.cache/pip,id=pip \ - pip install flytekit pandas pyarrow pydantic + pip install flytekit pandas pyarrow "pydantic>2" RUN --mount=type=cache,sharing=locked,mode=0777,target=/root/.cache/pip,id=pip \ pip install torch --index-url https://download.pytorch.org/whl/cpu diff --git a/examples/data_types_and_io/data_types_and_io/pydantic_basemodel_attribute_access.py b/examples/data_types_and_io/data_types_and_io/pydantic_basemodel_attribute_access.py index b1b388c39..8ab19694c 100644 --- a/examples/data_types_and_io/data_types_and_io/pydantic_basemodel_attribute_access.py +++ b/examples/data_types_and_io/data_types_and_io/pydantic_basemodel_attribute_access.py @@ -3,9 +3,8 @@ image_spec = ImageSpec( registry="ghcr.io/flyteorg", - packages=["pydantic"], + packages=["pydantic>2"], ) -image_spec = "localhost:30000/flytekit:dev" @task(container_image=image_spec) diff --git a/examples/data_types_and_io/requirements.in b/examples/data_types_and_io/requirements.in index b2026eec6..502469658 100644 --- a/examples/data_types_and_io/requirements.in +++ b/examples/data_types_and_io/requirements.in @@ -3,4 +3,4 @@ torch tabulate tensorflow pyarrow -pydantic +pydantic>2 From b71e01d45037cea883883f33d8d93f258b9a5023 Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Wed, 20 Nov 2024 12:33:54 +0800 Subject: [PATCH 8/9] lint Signed-off-by: Future-Outlier --- .../data_types_and_io/pydantic_basemodel.py | 101 ++++++++++++++++++ .../pydantic_basemodel_attribute_access.py | 93 ---------------- 2 files changed, 101 insertions(+), 93 deletions(-) create mode 100644 examples/data_types_and_io/data_types_and_io/pydantic_basemodel.py delete mode 100644 examples/data_types_and_io/data_types_and_io/pydantic_basemodel_attribute_access.py diff --git a/examples/data_types_and_io/data_types_and_io/pydantic_basemodel.py
b/examples/data_types_and_io/data_types_and_io/pydantic_basemodel.py new file mode 100644 index 000000000..b6b4bdebd --- /dev/null +++ b/examples/data_types_and_io/data_types_and_io/pydantic_basemodel.py @@ -0,0 +1,101 @@ +import os +import tempfile + +import pandas as pd +from flytekit import ImageSpec, task, workflow +from flytekit.types.directory import FlyteDirectory +from flytekit.types.file import FlyteFile +from flytekit.types.structured import StructuredDataset +from pydantic import BaseModel + +image_spec = ImageSpec( + registry="ghcr.io/flyteorg", + packages=["pandas", "pyarrow", "pydantic"], +) + + +# Python types +# Define a Pydantic model with `int`, `str`, and `dict` as the data types +class Datum(BaseModel): + x: int + y: str + z: dict[int, str] + + +# Once declared, a Pydantic model can be returned as an output or accepted as an input +@task(container_image=image_spec) +def stringify(s: int) -> Datum: + """ + A Pydantic model return will be treated as a single complex JSON return. + """ + return Datum(x=s, y=str(s), z={s: str(s)}) + + +@task(container_image=image_spec) +def add(x: Datum, y: Datum) -> Datum: + """ + Flytekit automatically converts the provided JSON into a Pydantic model. + If the structures don't match, it triggers a runtime failure. + """ + x.z.update(y.z) + return Datum(x=x.x + y.x, y=x.y + y.y, z=x.z) + + +# Flyte types +class FlyteTypes(BaseModel): + dataframe: StructuredDataset + file: FlyteFile + directory: FlyteDirectory + + +@task(container_image=image_spec) +def upload_data() -> FlyteTypes: + """ + Flytekit will upload FlyteFile, FlyteDirectory, and StructuredDataset to the blob store, + such as GCP or S3. + """ + # 1. StructuredDataset + df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22]}) + + # 2. FlyteDirectory + temp_dir = tempfile.mkdtemp(prefix="flyte-") + df.to_parquet(os.path.join(temp_dir, "df.parquet")) + + # 3. 
FlyteFile + file_path = tempfile.NamedTemporaryFile(delete=False) + file_path.write(b"Hello, World!") + file_path.close() + + fs = FlyteTypes( + dataframe=StructuredDataset(dataframe=df), + file=FlyteFile(file_path.name), + directory=FlyteDirectory(temp_dir), + ) + return fs + + +@task(container_image=image_spec) +def download_data(res: FlyteTypes): + expected_df = pd.DataFrame({"Name": ["Tom", "Joseph"], "Age": [20, 22]}) + actual_df = res.dataframe.open(pd.DataFrame).all() + assert expected_df.equals(actual_df), "DataFrames do not match!" + + with open(res.file, "r") as f: + assert f.read() == "Hello, World!", "File contents do not match!" + + assert os.listdir(res.directory) == ["df.parquet"], "Directory contents do not match!" + + +# Define a workflow that calls the tasks created above +@workflow +def basemodel_wf(x: int, y: int) -> (Datum, FlyteTypes): + o1 = add(x=stringify(s=x), y=stringify(s=y)) + o2 = upload_data() + download_data(res=o2) + return o1, o2 + + +# Run the workflow locally +if __name__ == "__main__": + result = basemodel_wf(x=10, y=20) + print(result) diff --git a/examples/data_types_and_io/data_types_and_io/pydantic_basemodel_attribute_access.py b/examples/data_types_and_io/data_types_and_io/pydantic_basemodel_attribute_access.py deleted file mode 100644 index 8ab19694c..000000000 --- a/examples/data_types_and_io/data_types_and_io/pydantic_basemodel_attribute_access.py +++ /dev/null @@ -1,93 +0,0 @@ -from flytekit import ImageSpec, task, workflow -from pydantic import BaseModel - -image_spec = ImageSpec( - registry="ghcr.io/flyteorg", - packages=["pydantic>2"], -) - - -@task(container_image=image_spec) -def print_message(message: str): - print(message) - return - - -# Access an output list using index notation -@task(container_image=image_spec) -def list_task() -> list[str]: - return ["apple", "banana"] - - -@workflow -def list_wf(): - items = list_task() - first_item = items[0] - print_message(message=first_item) - - -# Access the output 
dictionary by specifying the key -@task(container_image=image_spec) -def dict_task() -> dict[str, str]: - return {"fruit": "banana"} - - -@workflow -def dict_wf(): - fruit_dict = dict_task() - print_message(message=fruit_dict["fruit"]) - - -# Directly access an attribute of a Pydantic BaseModel -class Fruit(BaseModel): - name: str - - -@task(container_image=image_spec) -def basemodel_task() -> Fruit: - return Fruit(name="banana") - - -@workflow -def basemodel_wf(): - fruit_instance = basemodel_task() - print_message(message=fruit_instance.name) - - -# Combinations of list, dict, and BaseModel also work effectively -@task(container_image=image_spec) -def advance_task() -> (dict[str, list[str]], list[dict[str, str]], dict[str, Fruit]): - return ( - {"fruits": ["banana"]}, - [{"fruit": "banana"}], - {"fruit": Fruit(name="banana")}, - ) - - -@task(container_image=image_spec) -def print_list(fruits: list[str]): - print(fruits) - - -@task(container_image=image_spec) -def print_dict(fruit_dict: dict[str, str]): - print(fruit_dict) - - -@workflow -def advanced_workflow(): - dictionary_list, list_dict, dict_basemodel = advance_task() - print_message(message=dictionary_list["fruits"][0]) - print_message(message=list_dict[0]["fruit"]) - print_message(message=dict_basemodel["fruit"].name) - - print_list(fruits=dictionary_list["fruits"]) - print_dict(fruit_dict=list_dict[0]) - - -# Run the workflows locally -if __name__ == "__main__": - list_wf() - dict_wf() - basemodel_wf() - advanced_workflow() From e2b6e95fa93920bff330b94862735572ff1a2b3d Mon Sep 17 00:00:00 2001 From: Future-Outlier Date: Wed, 20 Nov 2024 12:45:43 +0800 Subject: [PATCH 9/9] lint Signed-off-by: Future-Outlier --- .../data_types_and_io/data_types_and_io/pydantic_basemodel.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/examples/data_types_and_io/data_types_and_io/pydantic_basemodel.py b/examples/data_types_and_io/data_types_and_io/pydantic_basemodel.py index b6b4bdebd..0242e0b54 100644 
--- a/examples/data_types_and_io/data_types_and_io/pydantic_basemodel.py +++ b/examples/data_types_and_io/data_types_and_io/pydantic_basemodel.py @@ -97,5 +97,4 @@ def basemodel_wf(x: int, y: int) -> (Datum, FlyteTypes): # Run the workflow locally if __name__ == "__main__": - result = basemodel_wf(x=10, y=20) - print(result) + basemodel_wf(x=10, y=20)