[WIP] Pydantic Plugin V2 #2577
Conversation
pydantic v1

Note: Langchain writes code like this to import pydantic:

```python
try:
    from pydantic.v1 import *  # noqa: F403
except ImportError:
    from pydantic import *  # type: ignore # noqa: F403
```
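If the plugin needs to branch on which pydantic API surface is available without committing to one import style, a small stdlib-only probe can tell the cases apart. The helper name `pydantic_major` below is hypothetical, not part of flytekit or this PR:

```python
import importlib.util


def pydantic_major() -> int:
    """Best-effort probe: 2 if the pydantic.v1 shim exists (pydantic >= 2),
    1 if only the flat pydantic namespace exists, 0 if pydantic is absent."""
    if importlib.util.find_spec("pydantic") is None:
        return 0
    # pydantic >= 2 ships its v1 API as the pydantic.v1 subpackage
    return 2 if importlib.util.find_spec("pydantic.v1") is not None else 1


print(pydantic_major())
```

This avoids the wildcard-import pattern above while still letting one codebase dispatch to v1- or v2-specific transformers.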
I have 2 ways to do this:

1. The old method: pass an encoder to BaseModel (https://github.com/pydantic/pydantic/blob/main/pydantic/main.py#L1097-L1127). Reference: pydantic/pydantic#951
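As a sketch of that "old method": pydantic v1's `json_encoders` config hook maps a type to the function used during `.json()` serialization. The `Event`/`when` names below are illustrative only, and the import fallback assumes either pydantic 1.x or the `pydantic.v1` shim is installed:

```python
from datetime import datetime

try:
    from pydantic.v1 import BaseModel  # pydantic >= 2 ships the v1 API here
except ImportError:
    from pydantic import BaseModel  # plain pydantic 1.x

class Event(BaseModel):
    class Config:
        # v1-style encoder registry: type -> function used by .json()
        json_encoders = {datetime: lambda dt: dt.strftime("%Y-%m-%d")}

    when: datetime

e = Event(when=datetime(2024, 1, 2, 3, 4, 5))
print(e.json())  # the datetime field is rendered through the custom encoder
```

Pydantic v2 replaced this hook with `field_serializer`/`model_serializer` and core-schema serialization, which is part of why the plugin needs a separate v2 path.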
Codecov Report: All modified and coverable lines are covered by tests ✅

```
@@            Coverage Diff             @@
##           master    #2577       +/-  ##
===========================================
+ Coverage   76.22%   93.68%   +17.45%
===========================================
  Files         187       42      -145
  Lines       18938     2263    -16675
  Branches     3706        0     -3706
===========================================
- Hits        14435     2120    -12315
+ Misses       3870      143     -3727
+ Partials      633        0      -633
===========================================
```
Signed-off-by: Future-Outlier <[email protected]>
Force-pushed from d793c02 to 7055eec.
flytekit/types/directory/types.py (outdated)

```python
if not ctx.file_access.is_remote(uri):
    return expected_python_type(uri, remote_directory=False)
return expected_python_type(uri)
```
We have to do this to upload the directory to remote storage.
When pydantic creates a FlyteDirectory, it calls to_python_value to build the Python instance the first time we initialize the FlyteDirectory, which sets the remote_directory variable to False. Later, when we call to_literal, since remote_directory=False, the upload variable will be False, and the directory will not be uploaded.
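The interaction described above can be illustrated with a stdlib-only sketch. `FakeDirectory`, `to_python_value`, and `to_literal` here are simplified stand-ins for flytekit's real type-transformer machinery, not its actual API:

```python
class FakeDirectory:
    """Simplified stand-in for FlyteDirectory."""

    def __init__(self, path: str, remote_directory=None):
        self.path = path
        # remote_directory=False means "do not upload on to_literal"
        self.remote_directory = remote_directory


def to_python_value(uri: str, is_remote: bool) -> FakeDirectory:
    # First deserialization of a local uri pins remote_directory to False ...
    if not is_remote:
        return FakeDirectory(uri, remote_directory=False)
    return FakeDirectory(uri)


def to_literal(d: FakeDirectory) -> dict:
    # ... so a later to_literal skips the upload step entirely.
    upload = d.remote_directory is not False
    return {"uri": d.path, "uploaded": upload}


local = to_python_value("/tmp/data", is_remote=False)
print(to_literal(local))  # the local directory is never marked for upload
```

This is why the check in types.py matters: once the flag is pinned on the first round-trip, a later serialization cannot trigger the upload.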
There's one more alternative to achieve this: add a parameter in user space. But I think that would make the code messier.
Pydantic V2 Example

```python
from dataclasses import dataclass
import typing
import pandas as pd
from flytekit import task, ImageSpec, workflow, kwtypes
from flytekit.types.file import FlyteFile
from flytekit.types.schema import FlyteSchema
from typing import Dict, List, Optional
from flytekit.types.structured import StructuredDataset
from flytekit.types.directory import FlyteDirectory
import os
from pydantic import BaseModel

key = "19b4f2f26d2e0e57e6820c603f52678bf7d0ca4f"
flytekit_dev_version = f"https://github.com/flyteorg/flytekit.git@{key}"
pydantic_dev_version = f"git+https://github.com/flyteorg/flytekit.git@{key}#subdirectory=plugins/flytekit-pydantic"

image = ImageSpec(
    registry="localhost:30000",
    # registry="futureoutlier",
    apt_packages=["git"],
    packages=[
        pydantic_dev_version,
        f"git+{flytekit_dev_version}",
        "pandas",
    ],
    builder="default",
    # builder="envd",
)


@dataclass
class DC:
    a: int
    b: str
    c: Optional[FlyteFile] = None


TestSchema = FlyteSchema[kwtypes(some_str=str)]
CsvFile = FlyteFile[typing.TypeVar("csv")]
SvgDir = FlyteDirectory[typing.TypeVar("svg")]


class CustomType(BaseModel):
    """Custom type that stores the field it was used in."""

    value: int
    field_name: str
    ff: Optional[FlyteFile] = None
    fd: Optional[FlyteDirectory] = None
    list_sd: Optional[List[StructuredDataset]] = None

    def __repr__(self):
        return f"CustomType<{self.value} {self.field_name!r}>"


# Note: we use `__get_pydantic_core_schema__` to construct the schema
class MyModel(BaseModel):
    ct: CustomType
    ff: Optional[FlyteFile] = None
    ff_csv: Optional[CsvFile] = None
    fd: Optional[SvgDir] = None
    fsc: Optional[TestSchema] = None
    sd: Optional[StructuredDataset] = None
    dc: Optional[DC] = None
    list_int: Optional[List[int]] = []
    dict_ff: Optional[Dict[str, FlyteFile]] = None


@task(container_image=image)
def t1() -> MyModel:
    # Create a local directory
    dir_path = "./build"
    os.makedirs(dir_path, exist_ok=True)
    file_path = os.path.join(dir_path, "flytedir_example.txt")
    with open(file_path, "w") as f:
        f.write("FlyteDirectory content")

    # Write a txt file
    file_path = "./build/local_example.txt"
    os.makedirs(os.path.dirname(file_path), exist_ok=True)
    with open(file_path, "w") as f:
        f.write("Default content")

    # Write a csv file
    data = {
        "Column1": ["Row1-Col1", "Row2-Col1", "Row3-Col1"],
        "Column2": ["Row1-Col2", "Row2-Col2", "Row3-Col2"],
        "Column3": ["Row1-Col3", "Row2-Col3", "Row3-Col3"],
    }
    df = pd.DataFrame(data)
    csv_path = "./build/example.csv"
    df.to_csv(csv_path, index=False)

    # For FlyteSchema
    schema = TestSchema()
    df = pd.DataFrame(data={"some_str": ["a", "b", "c"]})
    opened_schema = schema.open()
    opened_schema.write(df)

    # For StructuredDataset
    df = pd.DataFrame({"name": ["Tom", "Joseph"], "age": [20, 22]})

    m = MyModel(
        ct=CustomType(
            value=1,
            field_name="my_field",
            ff=FlyteFile(file_path),
            # fd=FlyteDirectory(path=flytekit.current_context().working_directory)
            fd=FlyteDirectory(dir_path),
            list_sd=[StructuredDataset(dataframe=df), StructuredDataset(dataframe=df)],
        ),
        ff=FlyteFile("s3://my-s3-bucket/a/example.txt"),
        ff_csv=CsvFile(csv_path),
        fd=SvgDir("s3://my-s3-bucket/a/"),
        fsc=schema,
        sd=StructuredDataset(dataframe=df),
        dc=DC(a=1, b="b"),
        list_int=[1, 2, 3],
        dict_ff={"a": FlyteFile("s3://my-s3-bucket/a/example.txt"), "b": FlyteFile(file_path)},
    )
    return m


@task(container_image=image)
def t2(m: MyModel) -> MyModel:
    # print(type(m.ct.ff))
    print("@@@ running t2")
    print(m.ct.ff)
    with open(m.ct.ff, "r") as f:
        print(f"Local FlyteFile {m.ct.ff.path}: {f.read()}")
    with open(m.ff, "r") as f:
        print(f"Remote FlyteFile {m.ff.path}: {f.read()}")
    print(pd.read_csv(m.ff_csv))
    with open(m.dict_ff["a"], "r") as f:
        print(f"Dict Local FlyteFile {m.ct.ff.path}: {f.read()}")
    print(f"Local FlyteDirectory {m.ct.fd}: {os.listdir(m.ct.fd)}")
    print(f"Remote FlyteDirectory {m.fd}: {os.listdir(m.fd)}")
    print(f"Local FlyteSchema:\n {m.fsc.open().all()}")
    print(f"Local StructuredDataset: \n{m.sd.open(pd.DataFrame).all()}")
    print(f"Local StructuredDataset: \n{m.ct.list_sd[0].open(pd.DataFrame).all()}")
    print(f"Local list[int]: {m.list_int}")
    return m


@task(container_image=image)
def t3(m: MyModel) -> MyModel:
    print("@@@ running t3")
    print(m.ct.ff)
    with open(m.ct.ff, "r") as f:
        print(f"Local FlyteFile {m.ct.ff.path}: {f.read()}")
    with open(m.ff, "r") as f:
        print(f"Remote FlyteFile {m.ff.path}: {f.read()}")
    print(pd.read_csv(m.ff_csv))
    with open(m.dict_ff["a"], "r") as f:
        print(f"Dict Local FlyteFile {m.ct.ff.path}: {f.read()}")
    print(f"Local FlyteDirectory {m.ct.fd}: {os.listdir(m.ct.fd)}")
    print(f"Remote FlyteDirectory {m.fd}: {os.listdir(m.fd)}")
    print(f"Local FlyteSchema:\n {m.fsc.open().all()}")
    print(f"Local StructuredDataset: \n{m.sd.open(pd.DataFrame).all()}")
    print(f"Local StructuredDataset: \n{m.ct.list_sd[0].open(pd.DataFrame).all()}")
    print(f"Local list[int]: {m.list_int}")
    return m


@workflow
def wf() -> (MyModel, MyModel):
    m = t1()
    return t2(m=m), t3(m=m)


if __name__ == "__main__":
    wf()
```

Pydantic V1 Example
```python
from dataclasses import dataclass
import typing
import pandas as pd
from flytekit import task, ImageSpec, workflow, kwtypes
from flytekit.types.file import FlyteFile
from flytekit.types.schema import FlyteSchema
from typing import Dict, List, Optional
from flytekit.types.structured import StructuredDataset
from flytekit.types.directory import FlyteDirectory
import os
from pydantic.v1 import BaseModel

key = "19b4f2f26d2e0e57e6820c603f52678bf7d0ca4f"
flytekit_dev_version = f"https://github.com/flyteorg/flytekit.git@{key}"
pydantic_dev_version = f"git+https://github.com/flyteorg/flytekit.git@{key}#subdirectory=plugins/flytekit-pydantic"

image = ImageSpec(
    registry="localhost:30000",
    # registry="futureoutlier",
    apt_packages=["git"],
    packages=[
        pydantic_dev_version,
        f"git+{flytekit_dev_version}",
        "pandas",
    ],
    builder="default",
    # builder="envd",
)


@dataclass
class DC:
    a: int
    b: str
    c: Optional[FlyteFile] = None


TestSchema = FlyteSchema[kwtypes(some_str=str)]
CsvFile = FlyteFile[typing.TypeVar("csv")]
SvgDir = FlyteDirectory[typing.TypeVar("svg")]


class CustomType(BaseModel):
    """Custom type that stores the field it was used in."""

    value: int
    field_name: str
    ff: Optional[FlyteFile] = None
    fd: Optional[FlyteDirectory] = None
    list_sd: Optional[List[StructuredDataset]] = None

    def __repr__(self):
        return f"CustomType<{self.value} {self.field_name!r}>"


class MyModel(BaseModel):
    ct: CustomType
    ff: Optional[FlyteFile] = None
    ff_csv: Optional[CsvFile] = None
    fd: Optional[SvgDir] = None
    fsc: Optional[TestSchema] = None
    sd: Optional[StructuredDataset] = None
    dc: Optional[DC] = None
    list_int: Optional[List[int]] = []
    dict_ff: Optional[Dict[str, FlyteFile]] = None


@task(container_image=image)
def t1() -> MyModel:
    # Create a local directory
    dir_path = "./build"
    os.makedirs(dir_path, exist_ok=True)
    file_path = os.path.join(dir_path, "flytedir_example.txt")
    with open(file_path, "w") as f:
        f.write("FlyteDirectory content")

    # Write a txt file
    file_path = "./build/local_example.txt"
    os.makedirs(os.path.dirname(file_path), exist_ok=True)
    with open(file_path, "w") as f:
        f.write("Default content")

    # Write a csv file
    data = {
        "Column1": ["Row1-Col1", "Row2-Col1", "Row3-Col1"],
        "Column2": ["Row1-Col2", "Row2-Col2", "Row3-Col2"],
        "Column3": ["Row1-Col3", "Row2-Col3", "Row3-Col3"],
    }
    df = pd.DataFrame(data)
    csv_path = "./build/example.csv"
    df.to_csv(csv_path, index=False)

    # For FlyteSchema
    schema = TestSchema()
    df = pd.DataFrame(data={"some_str": ["a", "b", "c"]})
    opened_schema = schema.open()
    opened_schema.write(df)

    # For StructuredDataset
    df = pd.DataFrame({"name": ["Tom", "Joseph"], "age": [20, 22]})

    m = MyModel(
        ct=CustomType(
            value=1,
            field_name="my_field",
            ff=FlyteFile(file_path),
            # fd=FlyteDirectory(path=flytekit.current_context().working_directory)
            fd=FlyteDirectory(dir_path),
            list_sd=[StructuredDataset(dataframe=df), StructuredDataset(dataframe=df)],
        ),
        ff=FlyteFile("s3://my-s3-bucket/a/example.txt"),
        ff_csv=CsvFile(csv_path),
        fd=SvgDir("s3://my-s3-bucket/a/"),
        fsc=schema,
        sd=StructuredDataset(dataframe=df),
        dc=DC(a=1, b="b"),
        list_int=[1, 2, 3],
        dict_ff={"a": FlyteFile("s3://my-s3-bucket/a/example.txt"), "b": FlyteFile(file_path)},
    )
    return m


@task(container_image=image)
def t2(m: MyModel) -> MyModel:
    # print(type(m.ct.ff))
    print("@@@ running t2")
    print(m.ct.ff)
    with open(m.ct.ff, "r") as f:
        print(f"Local FlyteFile {m.ct.ff.path}: {f.read()}")
    with open(m.ff, "r") as f:
        print(f"Remote FlyteFile {m.ff.path}: {f.read()}")
    print(pd.read_csv(m.ff_csv))
    with open(m.dict_ff["a"], "r") as f:
        print(f"Dict Local FlyteFile {m.ct.ff.path}: {f.read()}")
    print(f"Local FlyteDirectory {m.ct.fd}: {os.listdir(m.ct.fd)}")
    print(f"Remote FlyteDirectory {m.fd}: {os.listdir(m.fd)}")
    print(f"Local FlyteSchema:\n {m.fsc.open().all()}")
    print(f"Local StructuredDataset: \n{m.sd.open(pd.DataFrame).all()}")
    print(f"Local StructuredDataset: \n{m.ct.list_sd[0].open(pd.DataFrame).all()}")
    print(f"Local list[int]: {m.list_int}")
    return m


@task(container_image=image)
def t3(m: MyModel) -> MyModel:
    print("@@@ running t3")
    print(m.ct.ff)
    with open(m.ct.ff, "r") as f:
        print(f"Local FlyteFile {m.ct.ff.path}: {f.read()}")
    with open(m.ff, "r") as f:
        print(f"Remote FlyteFile {m.ff.path}: {f.read()}")
    print(pd.read_csv(m.ff_csv))
    with open(m.dict_ff["a"], "r") as f:
        print(f"Dict Local FlyteFile {m.ct.ff.path}: {f.read()}")
    print(f"Local FlyteDirectory {m.ct.fd}: {os.listdir(m.ct.fd)}")
    print(f"Remote FlyteDirectory {m.fd}: {os.listdir(m.fd)}")
    print(f"Local FlyteSchema:\n {m.fsc.open().all()}")
    print(f"Local StructuredDataset: \n{m.sd.open(pd.DataFrame).all()}")
    print(f"Local StructuredDataset: \n{m.ct.list_sd[0].open(pd.DataFrame).all()}")
    print(f"Local list[int]: {m.list_int}")
    return m


@workflow
def wf() -> (MyModel, MyModel):
    m = t1()
    return t2(m=m), t3(m=m)


if __name__ == "__main__":
    wf()
```
Waiting for the Pydantic community to reply on how to support both pydantic v1 and v2.
* Add an exception when a filter's value isn't a list; make the exception more specific; add a unit test for value_in; lint. Signed-off-by: Nelson Chen <[email protected]>, Kevin Su <[email protected]>. Co-authored-by: Kevin Su <[email protected]>
* Signed-off-by: wayner0628 <[email protected]>, Kevin Su <[email protected]>. Co-authored-by: Kevin Su <[email protected]>
* Show the difference of types in a dataclass when a transform error occurs; add tests for dataclass; fix tests. Signed-off-by: Future-Outlier <[email protected]>
Tracking issue
flyteorg/flyte#5033

Notes
- https://docs.pydantic.dev/latest/concepts/serialization/#custom-serializers
- https://stackoverflow.com/questions/67621046/initializing-a-pydantic-dataclass-from-json
- https://github.com/pydantic/bump-pydantic
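Following the custom-serializers doc linked above, a pydantic v2 type can take control of its own validation and serialization via `__get_pydantic_core_schema__`. The `MyPath` type below is a toy illustration of the hook, not flytekit's actual FlyteFile/FlyteDirectory implementation:

```python
from typing import Any

from pydantic import BaseModel, GetCoreSchemaHandler
from pydantic_core import core_schema


class MyPath:
    """Toy wrapper type that pydantic would otherwise not know how to handle."""

    def __init__(self, path: str):
        self.path = path

    @classmethod
    def _validate(cls, value: Any) -> "MyPath":
        # Accept an existing MyPath, or coerce anything string-like into one
        return value if isinstance(value, MyPath) else cls(str(value))

    @classmethod
    def __get_pydantic_core_schema__(
        cls, source_type: Any, handler: GetCoreSchemaHandler
    ) -> core_schema.CoreSchema:
        # Validation: run _validate on the raw input.
        # Serialization: collapse the instance back to its path string.
        return core_schema.no_info_plain_validator_function(
            cls._validate,
            serialization=core_schema.plain_serializer_function_ser_schema(
                lambda p: p.path
            ),
        )


class Model(BaseModel):
    p: MyPath


m = Model(p="/tmp/example")
print(m.model_dump())  # the serializer turns MyPath back into its path string
```

This is the v2 replacement for v1's `__get_validators__`/`json_encoders` pair, which is why a v2-native plugin hooks in here rather than passing encoders to BaseModel.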
Deserialization
Why are the changes needed?
What changes were proposed in this pull request?
How was this patch tested?
Example with local and remote execution
Setup process
Screenshots
Check all the applicable boxes
Related PRs
Docs link