Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Pydantic Plugin V2 #2577

Closed
wants to merge 48 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
48 commits
Select commit Hold shift + click to select a range
2a56265
Pydantic Plugin V2
Future-Outlier Jul 10, 2024
69af91d
need to reconstruct flytetypes, for example, flytefile needs method
Future-Outlier Jul 15, 2024
0d4a96b
Customize FlyteFile Serialization Method Succeed
Future-Outlier Jul 15, 2024
4528744
Add Deserialization Method in FlyteFile
Future-Outlier Jul 15, 2024
7055eec
revert FlyteFile's change
Future-Outlier Jul 15, 2024
edb13f7
Add FlyteDirectory, but it doesn't work
Future-Outlier Jul 15, 2024
0a15356
Fix FlyteDirectory Bug
Future-Outlier Jul 16, 2024
620082f
Fix FlyteDirectory Bug and lint
Future-Outlier Jul 16, 2024
c4d8be3
add print
Future-Outlier Jul 16, 2024
403361b
print upload or not
Future-Outlier Jul 16, 2024
5f37eee
debuggin upload file
Future-Outlier Jul 16, 2024
4e7ac59
make remote_directory=False to default value in FlyteDirectory
Future-Outlier Jul 16, 2024
aa59f9f
use type(self) in _serialize function
Future-Outlier Jul 16, 2024
3c5169d
test flyteschema
Future-Outlier Jul 16, 2024
dfe57da
Update FlyteDirectory Behavior
Future-Outlier Jul 16, 2024
84a424a
Update FlyteFile Behavior
Future-Outlier Jul 16, 2024
63e5ea5
Add FlyteSchema in Dataclass Test
Future-Outlier Jul 16, 2024
70848e7
Remove redundant code
Future-Outlier Jul 16, 2024
c9c5128
Support Strucutured Dataset
Future-Outlier Jul 16, 2024
0058e6b
push pydantic v1 and v2 plugin to same plugin folder
Future-Outlier Jul 17, 2024
dc65354
move back v1
Future-Outlier Jul 17, 2024
19b4f2f
support v2 first
Future-Outlier Jul 17, 2024
6f7e695
JSON IDL
Future-Outlier Jul 22, 2024
4f047b3
dict works
Future-Outlier Jul 23, 2024
abe398b
add click types
Future-Outlier Jul 23, 2024
fe05e1a
fix json from_flyte_idl error
Future-Outlier Jul 23, 2024
232144b
fix JSON IDL bug
Future-Outlier Jul 23, 2024
4795f8e
use msgpack encode and decode
Future-Outlier Jul 30, 2024
55a16ba
Add msgpack to pyproject.toml
Future-Outlier Aug 1, 2024
33a2c34
Merge branch 'master' into json-idl
Future-Outlier Aug 5, 2024
3eaf10f
support dict json pickle cases
Future-Outlier Aug 5, 2024
3a689e1
dict success
Future-Outlier Aug 5, 2024
c14d4bf
Raise an exception when filters' value isn't a list. (#2576)
arbaobao Aug 5, 2024
095ba7f
Update error message for TypeTransformerFailedError (#2648)
wayner0628 Aug 5, 2024
8e30116
[Error Message] Dataclasses Mismatched Type (#2650)
Future-Outlier Aug 6, 2024
2aff851
Merge branch 'master' into json-idl
Future-Outlier Aug 6, 2024
7451c6f
need local execution attribute access
Future-Outlier Aug 6, 2024
0328a6a
trying local execution attribute access
Future-Outlier Aug 6, 2024
9ba30a5
Need to support nested case, can refer
Future-Outlier Aug 6, 2024
2a8fe3c
Support attr path local execution now
Future-Outlier Aug 6, 2024
e0f9034
use dict
Future-Outlier Aug 7, 2024
b80c762
lint
Future-Outlier Aug 7, 2024
5256e6d
update dict behavior
Future-Outlier Aug 8, 2024
ea1c766
Skip fix dataclass int for JSON Type
Future-Outlier Aug 8, 2024
c01d85a
Support Local Execution Attribute Access 100% Correct
Future-Outlier Aug 8, 2024
19e3abc
Merge branch 'master' into pydantic-plugin
Future-Outlier Aug 8, 2024
719713e
Merge branch 'json-idl' into pydantic-plugin
Future-Outlier Aug 8, 2024
aa54263
Merge Json IDL FLYTEKIT
Future-Outlier Aug 8, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ docs/source/_tags/
.hypothesis
.npm
/**/target
coverage.xml

# Version file is auto-generated by setuptools_scm
flytekit/_version.py
2 changes: 1 addition & 1 deletion Dockerfile.dev
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ COPY . /flytekit
RUN SETUPTOOLS_SCM_PRETEND_VERSION_FOR_FLYTEKIT=$PSEUDO_VERSION \
SETUPTOOLS_SCM_PRETEND_VERSION_FOR_FLYTEIDL=3.0.0dev0 \
uv pip install --system --no-cache-dir -U \
"git+https://github.com/flyteorg/flyte.git@master#subdirectory=flyteidl" \
"git+https://github.com/flyteorg/flyte.git@32f929a6184d001cd7b471a9b19839649560cc06#subdirectory=flyteidl" \
-e /flytekit \
-e /flytekit/plugins/flytekit-k8s-pod \
-e /flytekit/plugins/flytekit-deck-standard \
Expand Down
2 changes: 1 addition & 1 deletion dev-requirements.in
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
-e file:.
flyteidl @ git+https://github.com/flyteorg/flyte.git@master#subdirectory=flyteidl
flyteidl @ git+https://github.com/flyteorg/flyte.git@32f929a6184d001cd7b471a9b19839649560cc06#subdirectory=flyteidl

coverage[toml]
hypothesis
Expand Down
48 changes: 36 additions & 12 deletions flytekit/core/promise.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,15 +91,15 @@ def my_wf(in1: int, in2: int) -> int:
t = native_types[k]
try:
if type(v) is Promise:
v = resolve_attr_path_in_promise(v)
v = resolve_attr_path_in_promise(v, t)
result[k] = TypeEngine.to_literal(ctx, v, t, var.type)
except TypeTransformerFailedError as exc:
raise TypeTransformerFailedError(f"Failed argument '{k}': {exc}") from None

return result


def resolve_attr_path_in_promise(p: Promise) -> Promise:
def resolve_attr_path_in_promise(p: Promise, t: typing.Type) -> Promise:
"""
resolve_attr_path_in_promise resolves the attribute path in a promise and returns a new promise with the resolved value
This is for local execution only. The remote execution will be resolved in flytepropeller.
Expand Down Expand Up @@ -134,16 +134,26 @@ def resolve_attr_path_in_promise(p: Promise) -> Promise:
break

# If the current value is a dataclass, resolve the dataclass with the remaining path
if (
len(p.attr_path) > 0
and type(curr_val.value) is _literals_models.Scalar
and type(curr_val.value.value) is _struct.Struct
):
st = curr_val.value.value
new_st = resolve_attr_path_in_pb_struct(st, attr_path=p.attr_path[used:])
literal_type = TypeEngine.to_literal_type(type(new_st))
# Reconstruct the resolved result to flyte literal (because the resolved result might not be struct)
curr_val = TypeEngine.to_literal(FlyteContextManager.current_context(), new_st, type(new_st), literal_type)
if len(p.attr_path) > 0 and type(curr_val.value) is _literals_models.Scalar:
import json

import msgpack

from flytekit.models.literals import Json

if type(curr_val.value.value) is _struct.Struct:
st = curr_val.value.value
new_st = resolve_attr_path_in_pb_struct(st, attr_path=p.attr_path[used:])
literal_type = TypeEngine.to_literal_type(type(new_st))
# Reconstruct the resolved result to flyte literal (because the resolved result might not be struct)
curr_val = TypeEngine.to_literal(FlyteContextManager.current_context(), new_st, type(new_st), literal_type)
elif type(curr_val.value.value) is Json:
json_bytes = curr_val.value.json.value
json_str = msgpack.loads(json_bytes)
dict_obj = json.loads(json_str)
v = resolve_attr_path_in_dict(dict_obj, attr_path=p.attr_path[used:])
literal_type = TypeEngine.to_literal_type(t)
curr_val = TypeEngine.to_literal(FlyteContextManager.current_context(), v, t, literal_type)

p._val = curr_val
return p
Expand All @@ -160,6 +170,20 @@ def resolve_attr_path_in_pb_struct(st: _struct.Struct, attr_path: List[Union[str
return curr_val


def resolve_attr_path_in_dict(d: dict, attr_path: List[Union[str, int]]) -> Any:
curr_val = d
for attr in attr_path:
try:
curr_val = curr_val[attr]
except (KeyError, IndexError, TypeError) as e:
raise FlytePromiseAttributeResolveException(
f"Failed to resolve attribute path {attr_path} in dict {curr_val}, attribute {attr} not found.\n"
f"Error Message: {e}"
)

return curr_val


def get_primitive_val(prim: Primitive) -> Any:
for value in [
prim.integer,
Expand Down
97 changes: 79 additions & 18 deletions flytekit/core/type_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -487,12 +487,17 @@ def get_literal_type(self, t: Type[T]) -> LiteralType:

ts = TypeStructure(tag="", dataclass_type=literal_type)

return _type_models.LiteralType(simple=_type_models.SimpleType.STRUCT, metadata=schema, structure=ts)
return _type_models.LiteralType(simple=_type_models.SimpleType.JSON, metadata=schema, structure=ts)

def to_literal(self, ctx: FlyteContext, python_val: T, python_type: Type[T], expected: LiteralType) -> Literal:
import msgpack

from flytekit.models.literals import Json

if isinstance(python_val, dict):
json_str = json.dumps(python_val)
return Literal(scalar=Scalar(generic=_json_format.Parse(json_str, _struct.Struct())))
json_bytes = msgpack.dumps(json_str)
return Literal(scalar=Scalar(json=Json(value=json_bytes)))

if not dataclasses.is_dataclass(python_val):
raise TypeTransformerFailedError(
Expand All @@ -519,7 +524,8 @@ def to_literal(self, ctx: FlyteContext, python_val: T, python_type: Type[T], exp
f" and implement _serialize and _deserialize methods."
)

return Literal(scalar=Scalar(generic=_json_format.Parse(json_str, _struct.Struct()))) # type: ignore
json_bytes = msgpack.dumps(json_str)
return Literal(scalar=Scalar(json=Json(value=json_bytes)))

def _get_origin_type_in_annotation(self, python_type: Type[T]) -> Type[T]:
# dataclass will try to hash python type when calling dataclass.schema(), but some types in the annotation is
Expand Down Expand Up @@ -653,13 +659,21 @@ def _fix_dataclass_int(self, dc_type: Type[dataclasses.dataclass], dc: typing.An
return dc

def to_python_value(self, ctx: FlyteContext, lv: Literal, expected_python_type: Type[T]) -> T:
import msgpack

if not dataclasses.is_dataclass(expected_python_type):
raise TypeTransformerFailedError(
f"{expected_python_type} is not of type @dataclass, only Dataclasses are supported for "
"user defined datatypes in Flytekit"
)

json_str = _json_format.MessageToJson(lv.scalar.generic)
scalar = lv.scalar
json_str = ""
if scalar.json:
json_bytes = lv.scalar.json.value
json_str = msgpack.loads(json_bytes)
elif scalar.generic:
json_str = _json_format.MessageToJson(scalar.generic)

# The function looks up or creates a JSONDecoder specifically designed for the object's type.
# This decoder is then used to convert a JSON string into a data class.
Expand All @@ -670,9 +684,12 @@ def to_python_value(self, ctx: FlyteContext, lv: Literal, expected_python_type:
self._decoder[expected_python_type] = decoder

dc = decoder.decode(json_str)

dc = self._fix_structured_dataset_type(expected_python_type, dc)
return self._fix_dataclass_int(expected_python_type, dc)

if scalar.generic:
self._fix_dataclass_int(expected_python_type, dc)

return dc

# This ensures that calls with the same literal type returns the same dataclass. For example, `pyflyte run``
# command needs to call guess_python_type to get the TypeEngine-derived dataclass. Without caching here, separate
Expand Down Expand Up @@ -1636,6 +1653,28 @@ def dict_to_generic_literal(ctx: FlyteContext, v: dict, allow_pickle: bool) -> L
)
raise e

@staticmethod
def dict_to_json_literal(ctx: FlyteContext, v: dict, allow_pickle: bool) -> Literal:
"""
Creates a flyte-specific ``Literal`` value from a native python dictionary.
"""
import msgpack

from flytekit.models.literals import Json
from flytekit.types.pickle import FlytePickle

try:
json_str = json.dumps(v)
json_bytes = msgpack.dumps(json_str)
return Literal(scalar=Scalar(json=Json(value=json_bytes)), metadata={"format": "json"})
except TypeError as e:
if allow_pickle:
remote_path = FlytePickle.to_pickle(ctx, v)
json_str = json.dumps({"pickle_file": remote_path})
json_bytes = msgpack.dumps(json_str)
return Literal(scalar=Scalar(json=Json(value=json_bytes)), metadata={"format": "pickle"})
raise e

@staticmethod
def is_pickle(python_type: Type[dict]) -> typing.Tuple[bool, Type]:
base_type, *metadata = DictTransformer.extract_types_or_metadata(python_type)
Expand Down Expand Up @@ -1670,7 +1709,7 @@ def get_literal_type(self, t: Type[dict]) -> LiteralType:
return _type_models.LiteralType(map_value_type=sub_type)
except Exception as e:
raise ValueError(f"Type of Generic List type is not supported, {e}")
return _type_models.LiteralType(simple=_type_models.SimpleType.STRUCT)
return _type_models.LiteralType(simple=_type_models.SimpleType.JSON)

def to_literal(
self, ctx: FlyteContext, python_val: typing.Any, python_type: Type[dict], expected: LiteralType
Expand All @@ -1684,8 +1723,11 @@ def to_literal(
if get_origin(python_type) is Annotated:
allow_pickle, base_type = DictTransformer.is_pickle(python_type)

if expected and expected.simple and expected.simple == SimpleType.STRUCT:
return self.dict_to_generic_literal(ctx, python_val, allow_pickle)
if expected and expected.simple:
if expected.simple == SimpleType.STRUCT:
return self.dict_to_generic_literal(ctx, python_val, allow_pickle)
elif expected.simple == SimpleType.JSON:
return self.dict_to_json_literal(ctx, python_val, allow_pickle)

lit_map = {}
for k, v in python_val.items():
Expand Down Expand Up @@ -1720,17 +1762,36 @@ def to_python_value(self, ctx: FlyteContext, lv: Literal, expected_python_type:

# for empty generic we have to explicitly test for lv.scalar.generic is not None as empty dict
# evaluates to false
if lv and lv.scalar and lv.scalar.generic is not None:
if lv.metadata and lv.metadata.get("format", None) == "pickle":
from flytekit.types.pickle import FlytePickle
if lv and lv.scalar:
if lv.scalar.generic is not None:
if lv.metadata and lv.metadata.get("format", None) == "pickle":
from flytekit.types.pickle import FlytePickle

uri = json.loads(_json_format.MessageToJson(lv.scalar.generic)).get("pickle_file")
return FlytePickle.from_pickle(uri)
uri = json.loads(_json_format.MessageToJson(lv.scalar.generic)).get("pickle_file")
return FlytePickle.from_pickle(uri)

try:
return json.loads(_json_format.MessageToJson(lv.scalar.generic))
except TypeError:
raise TypeTransformerFailedError(f"Cannot convert from {lv} to {expected_python_type}")
try:
return json.loads(_json_format.MessageToJson(lv.scalar.generic))
except TypeError:
raise TypeTransformerFailedError(f"Cannot convert from {lv} to {expected_python_type}")
elif lv.scalar.json is not None:
import msgpack

if lv.metadata and lv.metadata.get("format", None) == "pickle":
from flytekit.types.pickle import FlytePickle

json_bytes = lv.scalar.json.value
json_str = msgpack.loads(json_bytes)
uri = json.loads(json_str).get("pickle_file")
return FlytePickle.from_pickle(uri)

try:
json_bytes = lv.scalar.json.value
json_str = msgpack.loads(json_bytes)
return json.loads(json_str)

except TypeError:
raise TypeTransformerFailedError(f"Cannot convert from {lv} to {expected_python_type}")

raise TypeTransformerFailedError(f"Cannot convert from {lv} to {expected_python_type}")

Expand Down
2 changes: 1 addition & 1 deletion flytekit/interaction/click_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ def literal_type_to_click_type(lt: LiteralType, python_type: typing.Type) -> cli
Converts a Flyte LiteralType given a python_type to a click.ParamType
"""
if lt.simple:
if lt.simple == SimpleType.STRUCT:
if lt.simple == SimpleType.STRUCT or lt.simple == SimpleType.JSON:
ct = JsonParamType(python_type)
ct.name = f"JSON object {python_type.__name__}"
return ct
Expand Down
2 changes: 2 additions & 0 deletions flytekit/interaction/string_literals.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ def scalar_to_string(scalar: Scalar) -> typing.Any:
return MessageToDict(scalar.generic)
if scalar.union:
return literal_string_repr(scalar.union.value)
if scalar.json:
return scalar.json.value
raise ValueError(f"Unknown scalar type {scalar}")


Expand Down
36 changes: 36 additions & 0 deletions flytekit/models/literals.py
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,32 @@ def from_flyte_idl(cls, proto):
return cls()


class Json(_common.FlyteIdlEntity):
def __init__(self, value: bytes):
self._value = value

@property
def value(self):
"""
:rtype: bytes
"""
return self._value

def to_flyte_idl(self):
"""
:rtype: flyteidl.core.literals_pb2.Json
"""
return _literals_pb2.Json(value=self.value)

@classmethod
def from_flyte_idl(cls, proto):
"""
:param flyteidl.core.literals_pb2.Json proto:
:rtype: Json
"""
return cls(value=proto.value)


class BindingDataMap(_common.FlyteIdlEntity):
def __init__(self, bindings):
"""
Expand Down Expand Up @@ -712,6 +738,7 @@ def __init__(
error: Error = None,
generic: Struct = None,
structured_dataset: StructuredDataset = None,
json: Json = None,
):
"""
Scalar wrapper around Flyte types. Only one can be specified.
Expand All @@ -724,6 +751,7 @@ def __init__(
:param Error error:
:param google.protobuf.struct_pb2.Struct generic:
:param StructuredDataset structured_dataset:
:param Json json:
"""

self._primitive = primitive
Expand All @@ -735,6 +763,7 @@ def __init__(
self._error = error
self._generic = generic
self._structured_dataset = structured_dataset
self._json = json

@property
def primitive(self):
Expand Down Expand Up @@ -796,6 +825,10 @@ def generic(self):
def structured_dataset(self) -> StructuredDataset:
return self._structured_dataset

@property
def json(self) -> Json:
return self._json

@property
def value(self):
"""
Expand All @@ -812,6 +845,7 @@ def value(self):
or self.error
or self.generic
or self.structured_dataset
or self.json
)

def to_flyte_idl(self):
Expand All @@ -828,6 +862,7 @@ def to_flyte_idl(self):
error=self.error.to_flyte_idl() if self.error is not None else None,
generic=self.generic,
structured_dataset=self.structured_dataset.to_flyte_idl() if self.structured_dataset is not None else None,
json=self.json.to_flyte_idl() if self.json is not None else None,
)

@classmethod
Expand All @@ -849,6 +884,7 @@ def from_flyte_idl(cls, pb2_object):
structured_dataset=StructuredDataset.from_flyte_idl(pb2_object.structured_dataset)
if pb2_object.HasField("structured_dataset")
else None,
json=Json.from_flyte_idl(pb2_object.json) if pb2_object.HasField("json") else None,
)


Expand Down
1 change: 1 addition & 0 deletions flytekit/models/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class SimpleType(object):
BINARY = _types_pb2.BINARY
ERROR = _types_pb2.ERROR
STRUCT = _types_pb2.STRUCT
JSON = _types_pb2.JSON


class SchemaType(_common.FlyteIdlEntity):
Expand Down
2 changes: 1 addition & 1 deletion flytekit/types/directory/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import typing

from .types import FlyteDirectory
from .types import FlyteDirectory, FlyteDirToMultipartBlobTransformer

# The following section provides some predefined aliases for commonly used FlyteDirectory formats.

Expand Down
Loading
Loading