[RFC] JSON IDL #5607
Conversation
Signed-off-by: Future-Outlier <[email protected]>
Codecov Report: All modified and coverable lines are covered by tests ✅

```
@@ Coverage Diff @@
##           master    #5607      +/-   ##
==========================================
+ Coverage   35.89%   36.21%   +0.31%
==========================================
  Files        1301     1303       +2
  Lines      109419   109560     +141
==========================================
+ Hits        39281    39674     +393
+ Misses      66041    65766     -275
- Partials     4097     4120      +23
```
Signed-off-by: Future-Outlier <[email protected]>
rfc/system/5606-json-idl.md (outdated):

```python
Json = "json"

@task
def t1() -> Annotated[dict, Json]:  # Json byte strings
```
Will the user have to use this function signature? As a user I would want to be able to just use -> dict
and not worry about this.
rfc/system/5606-json-idl.md (outdated):

```python
def t2(a: <dict>):
    print(a["integer"])  # wrong
```
Suggested change:

```python
def t2(a: dict):
    print(a["integer"])  # Now actually an integer
```

Is this what you mean? Or is it still wrong?
For the `wrong` one, it means that we will get a float value instead of an int, because the protobuf `Struct` doesn't have an int field.
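The int-to-float coercion can be sketched without Flyte at all: protobuf's `Struct` stores every number as a double (`number_value`), so a Struct-style round trip loses integer-ness while a plain JSON round trip keeps it. A minimal stdlib-only sketch — `struct_like_roundtrip` is a hypothetical stand-in mimicking `Struct`'s number handling, not a real protobuf call:

```python
import json

def struct_like_roundtrip(value):
    # Mimic protobuf Struct semantics: every number becomes a double.
    if isinstance(value, bool):
        return value
    if isinstance(value, (int, float)):
        return float(value)
    if isinstance(value, dict):
        return {k: struct_like_roundtrip(v) for k, v in value.items()}
    if isinstance(value, list):
        return [struct_like_roundtrip(v) for v in value]
    return value

original = {"integer": 1}
via_struct = struct_like_roundtrip(original)  # {'integer': 1.0}
via_json = json.loads(json.dumps(original))   # {'integer': 1}

print(type(via_struct["integer"]))  # <class 'float'>
print(type(via_json["integer"]))    # <class 'int'>
```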
rfc/system/5606-json-idl.md (outdated):

- We use Annotated[dict, Json] instead of dict to ensure backward compatibility.
- This helps us avoid breaking changes.
As a user, I wouldn't want to use Annotated[dict, Json], to be honest, as this requires way too much understanding of the type engine's details :/
Do you think it would be possible that we "put some kind of metadata/version into the Literal" that "tells the type engine to use the new mechanism"? If this "metadata" is not there, fall back to the old transformer.
In other words, "version" the literals/type transformers.
@Future-Outlier is there any way to just make the reading backwards compatible? If this happens to read a cached value from an upstream execution that was written with the old code, fall back to reading it the old way (and just log a warning).
I think most people update admin & propeller at the same time. If they just upgrade flytekit with this new type, it'll fail at registration time right? if they just upgrade flytekit & admin, but not propeller, it'll fail at launch time?
@fg91 @wild-endeavor
For flytekit, there's definitely a way to make it backward compatible by adding metadata.
it'll fail at registration time right?
If we specify a default input and propeller hasn't been upgraded (take the sandbox as an example), propeller will crash. (This is definitely a bug in propeller.)
```python
from flytekit import task, workflow, ImageSpec

flytekit_hash = "4795f8ef3291e5f90730c0a030025ec6bc189190"
flyteidl_hash = "c1182565d62b4885156454da9d0a100b2facf313"

json_idl = f"git+https://github.com/flyteorg/flyte.git@{flyteidl_hash}#subdirectory=flyteidl"
flytekit = f"git+https://github.com/flyteorg/flytekit.git@{flytekit_hash}"

image = ImageSpec(
    packages=[flytekit, json_idl],
    apt_packages=["git"],
    registry="localhost:30000",
)


@task(container_image=image)
def t2(input: dict) -> dict:
    print(type(input["a"]))
    return input


@workflow
def wf(input: dict = {'a': 1}) -> dict:
    return t2(input=input)
```
if they just upgrade flytekit & admin, but not propeller, it'll fail at launch time?
It'll fail before launch time, but I also think it'll fail at launch time too.
The worst part is that the propeller will crash.
Do you think it's ok?
We don't have good error handling here: unknown literal types registered to propeller's compiler should produce a clear error response, but instead propeller just crashes.
#5612 improves the error message at registration time.
update: it's ok to support backward compatibility, but not forward compatibility.
Thank you both for pointing this out.
rfc/system/5606-json-idl.md (outdated):

````
  }
}
```
### FlyteKit
````
Nit: I feel this detail is not needed in the RFC; the addition of the Json message to flyteidl contains all the information.
Yeah, I think this is not needed; we don't want to draw attention to model files in general. But could you add an example showing how `pyflyte run` would work?
No problem, I will delete it and add a `pyflyte run` example.
can you remove this section?
rfc/system/5606-json-idl.md (outdated):

#### Show input/output on FlyteConsole
We will get the node's input/output literal value via FlyteAdmin's API and extract the JSON byte string from the literal value.
We can use MsgPack to dump the JSON byte string into a dictionary and show it in FlyteConsole.
To make sure I understand correctly: the user will still be able to see the content of the dict because Flyte Console will also deserialize it? If yes, this is quite neat.
Yes, you're right, thank you
## 5 Drawbacks
There are no breaking changes if we use `Annotated[dict, Json]`, but we need to be very careful that none are introduced.

## 6 Alternatives
Could you please elaborate why specifically the pydantic transformer might suffer performance issues?
Let me clarify the serialization process (deserialization is the reverse):

- Using msgpack: basemodel -> json string -> dictionary -> msgpack bytes
- Using utf-8: basemodel -> json string -> json byte string
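The extra hop in the msgpack path can be sketched with the standard library alone; the sketch below uses JSON bytes as a stand-in for the final msgpack step (where `msgpack.dumps` would really be called), since the point is the additional string-to-dictionary round trip, not the final wire format:

```python
import json

payload = {"lr": 1e-3, "batch_size": 32}  # stand-in for a BaseModel's fields

# Path 1 (msgpack route): json string -> dictionary -> bytes
json_str = json.dumps(payload)
as_dict = json.loads(json_str)                       # the extra round trip
binary_msgpack_route = json.dumps(as_dict).encode()  # msgpack.dumps(as_dict) in reality

# Path 2 (utf-8 route): json string -> bytes, one step
binary_utf8_route = json_str.encode("utf-8")

# Both routes carry the same data; the msgpack route just does more work here.
assert json.loads(binary_msgpack_route.decode("utf-8")) == payload
assert json.loads(binary_utf8_route.decode("utf-8")) == payload
```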
But the benefit is that if we use msgpack, we get smaller data payloads when uploading to and downloading from remote storage. This is really a HUGE benefit, and lots of people want it.
UTF-8 and msgpack are not the only possible formats, right? Can we add more color to this? What if, instead of dictating msgpack as the serialization format everywhere, we added it as an explicit field in the new literal type? This way we could have more specialized serialization formats used by plugins in case they want/need them.

A while ago we discussed this issue, which is also related to how dataclasses etc. are serialized. @eapolinario remarked:

> Is this RFC the mentioned process to revisit the dataclass transformer?

> yes, exactly. We've been experimenting with this idea and decided to make it official by following the RFC process.

> I can tell what might be a potential solution for it.
Signed-off-by: Future-Outlier <[email protected]>
Force-pushed from 7a8cfe0 to 957f543.
Signed-off-by: Future-Outlier <[email protected]>
Force-pushed from 6ff95c7 to f06c006.
Co-authored-by: Fabio M. Graetz, Ph.D. <[email protected]> Signed-off-by: Future-Outlier <[email protected]>
Co-authored-by: Fabio M. Graetz, Ph.D. <[email protected]> Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
@fg91 @eapolinario @wild-endeavor
Can you remove all uses of strikethrough (i.e. `~~`)?
Also, can we clarify why we need to encode objects to json and then msgpack before storing those values? In other words, why can't we go straight to msgpack-encoded objects whenever applicable?
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Hi all, I recently conducted performance tests; results below. Additionally, we need to handle cases like the ones in this PR.

Serialization behavior and performance, with encode and decode examples.

Encode example:

import msgpack
from dataclasses import dataclass, fields
from typing import List, Dict
from mashumaro.mixins.msgpack import DataClassMessagePackMixin
from mashumaro.codecs.json import JSONEncoder, JSONDecoder
from mashumaro.codecs.msgpack import MessagePackEncoder
import timeit
@dataclass
class InnerDataClass:
inner_attr1: int
inner_attr2: str
@dataclass
class MyDataClassWithMsgPackMixin(DataClassMessagePackMixin):
attr1: int
attr2: str
attr3: float
attr4: bool
attr5: List[int]
attr6: InnerDataClass
attr7: str
attr8: float
attr9: Dict[str, int]
attr10: bool
@dataclass
class MyDataClass:
attr1: int
attr2: str
attr3: float
attr4: bool
attr5: List[int]
attr6: InnerDataClass
attr7: str
attr8: float
attr9: Dict[str, int]
attr10: bool
# Initialize objects
dc_with_mixin = MyDataClassWithMsgPackMixin(
attr1=1, attr2='2', attr3=3.0, attr4=True, attr5=[1, 2, 3],
attr6=InnerDataClass(1, '2'), attr7='7', attr8=8.0,
attr9={'9': 9}, attr10=True
)
dc = MyDataClass(
attr1=1, attr2='2', attr3=3.0, attr4=True, attr5=[1, 2, 3],
attr6=InnerDataClass(1, '2'), attr7='7', attr8=8.0,
attr9={'9': 9}, attr10=True
)
new_dc = type(MyDataClass.__name__ + "WithMessagePackMixin", (MyDataClass, DataClassMessagePackMixin), {})
encoder = JSONEncoder(MyDataClass)
# Test function 1: Directly use msgpack mixin
def test_msgpack_mixin():
return dc_with_mixin.to_msgpack()
# Test function 2: Dynamically create a class with msgpack mixin
def test_dynamic_msgpack_mixin():
new_instance = new_dc(**{field.name: getattr(dc_with_mixin, field.name) for field in fields(dc_with_mixin)})
return new_instance.to_msgpack()
def test_create_new_dc():
return type(MyDataClass.__name__ + "WithMessagePackMixin", (MyDataClass, DataClassMessagePackMixin), {})
# Test function 3: Convert dataclass to JSON string and then encode to msgpack
def test_json_str_to_msgpack():
return msgpack.dumps(encoder.encode(dc))
def test_create_encoder():
return JSONEncoder(MyDataClass)
# Test function 4: Use MessagePackEncoder from mashumaro (without initialization)
def test_mashumaro_msgpack_encoder():
return mashumaro_encoder.encode(dc)
# Test function for initializing the mashumaro MessagePackEncoder
def test_create_mashumaro_encoder():
return MessagePackEncoder(MyDataClass)
# Initialize mashumaro encoder outside the test loop
mashumaro_encoder = test_create_mashumaro_encoder()
# Use timeit to test each method's performance, running 10000 times
iterations = 10000
time_mixin = timeit.timeit(test_msgpack_mixin, number=iterations)
time_dynamic = timeit.timeit(test_dynamic_msgpack_mixin, number=iterations)
time_dynamic_create_base_class = timeit.timeit(test_create_new_dc, number=1)
time_json = timeit.timeit(test_json_str_to_msgpack, number=iterations)
time_create_json_encoder = timeit.timeit(test_create_encoder, number=1)
time_mashumaro_msgpack = timeit.timeit(test_mashumaro_msgpack_encoder, number=iterations)
time_create_mashumaro_encoder = timeit.timeit(test_create_mashumaro_encoder, number=1)
# Output the results
print(f'{iterations} times dataclass with msgpack mixin elapsed time:', time_mixin)
print(f'{iterations} times dynamically create msgpack mixin elapsed time:', time_dynamic + time_dynamic_create_base_class)
print(f'{iterations} times dataclass -> json str -> msgpack bytes elapsed time:', time_json + time_create_json_encoder)
print(f'{iterations} times mashumaro msgpack encoder elapsed time:', time_mashumaro_msgpack + time_create_mashumaro_encoder)
# print(f'1 time creating mashumaro msgpack encoder elapsed time:', time_create_mashumaro_encoder)
import matplotlib.pyplot as plt
# Labels and timings for the four methods
methods = ['Msgpack Mixin', 'Dynamic Msgpack Mixin', 'JSON String to Msgpack', 'Mashumaro Msgpack Encoder']
times = [time_mixin, time_dynamic + time_dynamic_create_base_class, time_json + time_create_json_encoder, time_mashumaro_msgpack + time_create_mashumaro_encoder]
# Plotting the comparison
plt.figure(figsize=(30, 5))
plt.bar(methods, times, color=['blue', 'green', 'red', 'orange'])
plt.xlabel('Methods')
plt.ylabel('Elapsed Time (seconds)')
plt.title('Comparison of Serialization Methods')
plt.savefig('/Users/future-outlier/code/dev/flytekit/build/PR/JSON/eduardo/performance_comparison/encode.png')

Decode example:

import msgpack
from dataclasses import dataclass, fields
from typing import List, Dict
from mashumaro.mixins.msgpack import DataClassMessagePackMixin
from mashumaro.codecs.json import JSONEncoder, JSONDecoder
from mashumaro.codecs.msgpack import MessagePackEncoder, MessagePackDecoder
import timeit
@dataclass
class InnerDataClass(DataClassMessagePackMixin):
inner_attr1: int
inner_attr2: str
@dataclass
class MyDataClassWithMsgPackMixin(DataClassMessagePackMixin):
attr1: int
attr2: str
attr3: float
attr4: bool
attr5: List[int]
attr6: InnerDataClass
attr7: str
attr8: float
attr9: Dict[str, int]
attr10: bool
@dataclass
class MyDataClass:
attr1: int
attr2: str
attr3: float
attr4: bool
attr5: List[int]
attr6: InnerDataClass
attr7: str
attr8: float
attr9: Dict[str, int]
attr10: bool
# Initialize objects
dc_with_mixin = MyDataClassWithMsgPackMixin(
attr1=1, attr2='2', attr3=3.0, attr4=True, attr5=[1, 2, 3],
attr6=InnerDataClass(1, '2'), attr7='7', attr8=8.0,
attr9={'9': 9}, attr10=True
)
dc = MyDataClass(
attr1=1, attr2='2', attr3=3.0, attr4=True, attr5=[1, 2, 3],
attr6=InnerDataClass(1, '2'), attr7='7', attr8=8.0,
attr9={'9': 9}, attr10=True
)
new_dc = type(MyDataClass.__name__ + "WithMessagePackMixin", (MyDataClass, DataClassMessagePackMixin), {})
encoder = JSONEncoder(MyDataClass)
decoder = JSONDecoder(MyDataClass)
# Serialize the object first to test decoding
msgpack_bytes_mixin = dc_with_mixin.to_msgpack()
msgpack_bytes_dynamic = new_dc(**{field.name: getattr(dc_with_mixin, field.name) for field in fields(dc_with_mixin)}).to_msgpack()
msgpack_bytes_json = msgpack.dumps(encoder.encode(dc))
# Test function 1: Decode using msgpack mixin
def test_msgpack_mixin_decode():
return MyDataClassWithMsgPackMixin.from_msgpack(msgpack_bytes_mixin)
# Test function 2: Dynamically create a class and decode using msgpack mixin
def test_dynamic_msgpack_mixin_decode():
new_instance = new_dc(**{field.name: getattr(dc_with_mixin, field.name) for field in fields(dc_with_mixin)})
return new_instance.from_msgpack(msgpack_bytes_dynamic)
def test_create_new_dc():
return type(MyDataClass.__name__ + "WithMessagePackMixin", (MyDataClass, DataClassMessagePackMixin), {})
# Test function 3: Convert msgpack bytes to JSON string and decode back to dataclass
def test_json_str_to_msgpack_decode():
return decoder.decode(msgpack.loads(msgpack_bytes_json))
def test_create_decoder():
return JSONDecoder(MyDataClass)
# Test function 4: Use MessagePackDecoder from mashumaro
def test_create_mashumaro_decoder():
return MessagePackDecoder(MyDataClass)
# Initialize mashumaro decoder outside the test loop
mashumaro_decoder = test_create_mashumaro_decoder()
def test_mashumaro_msgpack_decoder():
return mashumaro_decoder.decode(msgpack_bytes_mixin)
# Test function for initializing the mashumaro MessagePackDecoder
# Use timeit to test each method's performance, running 10000 times
iterations = 10000
time_mixin_decode = timeit.timeit(test_msgpack_mixin_decode, number=iterations)
time_dynamic_decode = timeit.timeit(test_dynamic_msgpack_mixin_decode, number=iterations)
time_dynamic_create_base_class = timeit.timeit(test_create_new_dc, number=1)
time_json_decode = timeit.timeit(test_json_str_to_msgpack_decode, number=iterations)
time_create_json_decoder = timeit.timeit(test_create_decoder, number=1)
time_mashumaro_decode = timeit.timeit(test_mashumaro_msgpack_decoder, number=iterations)
time_create_mashumaro_decoder = timeit.timeit(test_create_mashumaro_decoder, number=1)
# Output the results
print(f'{iterations} times dataclass with msgpack mixin decode elapsed time:', time_mixin_decode)
print(f'{iterations} times dynamically create msgpack mixin decode elapsed time:', time_dynamic_decode + time_dynamic_create_base_class)
print(f'{iterations} times dataclass -> json str -> msgpack bytes decode elapsed time:', time_json_decode + time_create_json_decoder)
print(f'{iterations} times mashumaro msgpack decode elapsed time:', time_mashumaro_decode + time_create_mashumaro_decoder)
import matplotlib.pyplot as plt
# Labels and timings for the four methods
methods = ['Msgpack Mixin Decode', 'Dynamic Msgpack Mixin Decode', 'JSON String to Msgpack Decode', 'Mashumaro Msgpack Decode']
times = [time_mixin_decode, time_dynamic_decode + time_dynamic_create_base_class, time_json_decode + time_create_json_decoder, time_mashumaro_decode + time_create_mashumaro_decoder]
# Plotting the comparison
plt.figure(figsize=(30, 5))
plt.bar(methods, times, color=['blue', 'green', 'red', 'orange'])
plt.xlabel('Methods')
plt.ylabel('Elapsed Time (seconds)')
plt.title('Comparison of Decoding Methods')
plt.savefig('/Users/future-outlier/code/dev/flytekit/build/PR/JSON/eduardo/performance_comparison/decode.png')

Pydantic failure example:

from pydantic import BaseModel
from mashumaro.mixins.msgpack import DataClassMessagePackMixin
from mashumaro.codecs.msgpack import MessagePackEncoder, MessagePackDecoder
# 1. Define a Pydantic model
class TrainConfig(BaseModel):
lr: float = 1e-3 # learning rate
batch_size: int = 32 # batch size for training
# 2. Use Mashumaro's MessagePackEncoder and MessagePackDecoder for serializing and deserializing Pydantic objects
# Serialization function
def serialize_train_config(train_config: TrainConfig) -> bytes:
encoder = MessagePackEncoder(TrainConfig) # Create an encoder for TrainConfig
return encoder.encode(train_config) # Serialize the TrainConfig object to MessagePack format
# Deserialization function
def deserialize_train_config(encoded_data: bytes) -> TrainConfig:
decoder = MessagePackDecoder(TrainConfig) # Create a decoder for TrainConfig
return decoder.decode(encoded_data) # Deserialize the MessagePack data back to a TrainConfig object
# Example usage:
train_config = TrainConfig(lr=0.001, batch_size=64) # Create a TrainConfig object with custom values
# Serialize the TrainConfig object
serialized_data = serialize_train_config(train_config)
print(f"Serialized Data: {serialized_data}")
# Deserialize the data back to a TrainConfig object
deserialized_config = deserialize_train_config(serialized_data)
print(f"Deserialized Config: lr={deserialized_config.lr}, batch_size={deserialized_config.batch_size}")

Pydantic failure example error message:

/Users/future-outlier/miniconda3/envs/dev/bin/python /Users/future-outlier/code/dev/flytekit/build/PR/JSON/eduardo/pydantic_example.py
Traceback (most recent call last):
File "/Users/future-outlier/code/dev/flytekit/build/PR/JSON/eduardo/pydantic_example.py", line 26, in <module>
serialized_data = serialize_train_config(train_config)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/future-outlier/code/dev/flytekit/build/PR/JSON/eduardo/pydantic_example.py", line 14, in serialize_train_config
encoder = MessagePackEncoder(TrainConfig) # Create an encoder for TrainConfig
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/future-outlier/miniconda3/envs/dev/lib/python3.12/site-packages/mashumaro/codecs/msgpack.py", line 107, in __init__
code_builder.add_encode_method(shape_type, self, post_encoder_func)
File "/Users/future-outlier/miniconda3/envs/dev/lib/python3.12/site-packages/mashumaro/codecs/_builder.py", line 78, in add_encode_method
packed_value = PackerRegistry.get(
^^^^^^^^^^^^^^^^^^^
File "/Users/future-outlier/miniconda3/envs/dev/lib/python3.12/site-packages/mashumaro/core/meta/types/common.py", line 241, in get
raise UnserializableField(
mashumaro.exceptions.UnserializableField: Field "" of type TrainConfig in __root__ is not serializable
Process finished with exit code 1

Whether you have a pure dataclass or one with the mashumaro mixins, the type engine produces the same literal type. You can verify this by running the following example:

import os
import tempfile
from dataclasses import dataclass
from flytekit.core.type_engine import TypeEngine
import pandas as pd
from flytekit import task, workflow
from flytekit.types.directory import FlyteDirectory
from flytekit.types.file import FlyteFile
from flytekit.types.structured import StructuredDataset
from mashumaro.mixins.json import DataClassJSONMixin
from mashumaro.mixins.msgpack import DataClassMessagePackMixin
@dataclass
class Datum:
x: int
y: str
z: dict[int, str]
@dataclass
class FlyteTypes:
dataframe: StructuredDataset
file: FlyteFile
directory: FlyteDirectory
transformer = TypeEngine.get_transformer(FlyteTypes)
lt1_1 = transformer.get_literal_type(Datum)
lt1_2 = transformer.get_literal_type(FlyteTypes)
@dataclass
class Datum(DataClassMessagePackMixin, DataClassJSONMixin):
x: int
y: str
z: dict[int, str]
@dataclass
class FlyteTypes(DataClassMessagePackMixin, DataClassJSONMixin):
dataframe: StructuredDataset
file: FlyteFile
directory: FlyteDirectory
lt2_1 = transformer.get_literal_type(Datum)
lt2_2 = transformer.get_literal_type(FlyteTypes)
@dataclass
class Datum(DataClassJSONMixin):
x: int
y: str
z: dict[int, str]
@dataclass
class FlyteTypes(DataClassJSONMixin):
dataframe: StructuredDataset
file: FlyteFile
directory: FlyteDirectory
lt3_1 = transformer.get_literal_type(Datum)
lt3_2 = transformer.get_literal_type(FlyteTypes)
@dataclass
class Datum(DataClassMessagePackMixin):
x: int
y: str
z: dict[int, str]
@dataclass
class FlyteTypes(DataClassMessagePackMixin):
dataframe: StructuredDataset
file: FlyteFile
directory: FlyteDirectory
lt4_1 = transformer.get_literal_type(Datum)
lt4_2 = transformer.get_literal_type(FlyteTypes)
assert lt1_1 == lt2_1 and lt1_1 == lt3_1 and lt1_1 == lt4_1
assert lt1_2 == lt2_2 and lt1_2 == lt3_2 and lt1_2 == lt4_2

I also found that directly encoding the dictionary with msgpack yields smaller bytes than serializing the JSON string (whether via msgpack or UTF-8).

Terminal output (output 1):

MsgPack serialized data size: 15757449 bytes
MsgPack serialized JSON string size: 21777797 bytes
UTF-8 encoded JSON string size: 21777792 bytes

Code example (output 1):

import msgpack
import json
# Step 1: Create a dictionary with 100000 key-value pairs using a for loop
d = {f'key_{i}': i for i in range(1, 1000001)}
# Step 2: Serialize dictionary using msgpack
msgpack_data = msgpack.dumps(d)
msgpack_data_size = len(msgpack_data)
# Step 3: Convert dictionary to JSON string and serialize using msgpack
json_str = json.dumps(d)
msgpack_json_data = msgpack.dumps(json_str)
msgpack_json_data_size = len(msgpack_json_data)
# Step 4: Encode the JSON string using UTF-8 and compare size
encoded_json_str = json_str.encode('utf-8')
encoded_json_str_size = len(encoded_json_str)
# Print size comparison
print(f"MsgPack serialized data size: {msgpack_data_size} bytes")
print(f"MsgPack serialized JSON string size: {msgpack_json_data_size} bytes")
print(f"UTF-8 encoded JSON string size: {encoded_json_str_size} bytes")

Code example (output 2):

import msgpack
import json
import matplotlib.pyplot as plt
# Function to measure sizes for given number of attributes
def measure_sizes(n):
# Create a dictionary with n key-value pairs
d = {f'key_{i}': i for i in range(1, n+1)}
# Serialize dictionary using msgpack
msgpack_data = msgpack.dumps(d)
msgpack_data_size = len(msgpack_data)
# Convert dictionary to JSON string and serialize using msgpack
json_str = json.dumps(d)
msgpack_json_data = msgpack.dumps(json_str)
msgpack_json_data_size = len(msgpack_json_data)
# Encode the JSON string using UTF-8
encoded_json_str = json_str.encode('utf-8')
encoded_json_str_size = len(encoded_json_str)
return msgpack_data_size, msgpack_json_data_size, encoded_json_str_size
# Define test points for 100000, 200000, ..., 1000000 attributes
num_attributes = [100000 * i for i in range(1, 11)]
msgpack_sizes = []
msgpack_json_sizes = []
utf8_json_sizes = []
# Measure sizes for each point
for n in num_attributes:
msgpack_size, msgpack_json_size, utf8_json_size = measure_sizes(n)
msgpack_sizes.append(msgpack_size)
msgpack_json_sizes.append(msgpack_json_size)
utf8_json_sizes.append(utf8_json_size)
# Plot the results
plt.plot(num_attributes, msgpack_sizes, label='MsgPack serialized data size')
plt.plot(num_attributes, msgpack_json_sizes, label='MsgPack serialized JSON string size')
plt.plot(num_attributes, utf8_json_sizes, label='UTF-8 encoded JSON string size')
plt.xlabel('Number of attributes')
plt.ylabel('Size (bytes)')
plt.title('Comparison of Serialized Data Sizes')
plt.legend()
plt.grid(True)
plt.show()

The official msgpack documentation says: […] I also found that if we use […]
## JSON IDL Discussion

### Prerequisite

The idea is to store […]. There are 2 dimensions we can think of: the JSON types, and the way to do serialization and deserialization.

### JSON types

For now we only support Python's default JSON types, see here.

### Serialization and deserialization APIs

We decided to use […], considering, for example, size and speed.

### Discussion

We have 2 ways to implement it: either use a new IDL type, or reuse the Binary IDL.

#### Add a new IDL called JSON

```proto
message Json {
    bytes value = 1;
}
```

propeller compiler: check if the literal scalar simple type is […].

propeller attribute access: we can use:

```proto
message TypeStructure {
    // Must exactly match for types to be castable
    string tag = 1;
    // dataclass_type only exists for dataclasses.
    // This is used to resolve the type of the fields of dataclass.
    // The key is the field name, and the value is the literal type of the field.
    // e.g. For dataclass Foo, with fields a, and a is a string,
    // Foo.a will be resolved as a literal type of string from dataclass_type.
    map<string, LiteralType> dataclass_type = 2;
}
```

If `TypeStructure.tag == "json-msgpack"`, then we deserialize it using the msgpack API into a JSON type.

flytectl: check if the type is […].

flyteconsole: check if the type is […].

#### Reuse the BINARY IDL

```proto
message Binary {
    bytes value = 1;
    string tag = 2;
}
```

propeller compiler: check if the literal scalar simple type is […].

propeller attribute access: the same as above.

flytectl: check if the type is […].

flyteconsole: check if the type is […].
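If the Binary route were chosen, consumers would dispatch on the `tag` field. A rough sketch of that dispatch — the tag values and function name are illustrative, not the real flyteidl API, and the msgpack branch is stubbed so the sketch stays stdlib-only:

```python
import json

MSGPACK_TAG = "msgpack"  # assumed tag value, not necessarily what Flyte would use

def deserialize_binary(value: bytes, tag: str):
    """Dispatch on a Binary literal's tag to pick a deserializer (hypothetical helper)."""
    if tag == MSGPACK_TAG:
        # A real implementation would call msgpack.loads(value) here.
        raise NotImplementedError("msgpack path omitted in this sketch")
    if tag == "json":
        return json.loads(value.decode("utf-8"))
    raise ValueError(f"unknown Binary tag: {tag!r}")

print(deserialize_binary(b'{"a": 1}', "json"))  # {'a': 1}
```

The "type alias" alternative (a dedicated Json message) would avoid this string comparison entirely, which is the trade-off discussed below.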
I have a preference for the "type alias" JSON (why not call it MessagePack?) over reusing Binary, as we'd avoid comparing string tags.
I have one idea, but I'm still implementing it; I will show all of you today. Thank you for your extra work over the weekend, really appreciate it.
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Signed-off-by: Future-Outlier <[email protected]>
Writing this here so others can comment. @Future-Outlier and I had a chat today about the most recent draft. I think we've strayed a bit far from the intent of the original issue. The current implementation now doesn't really consider the JSON component very much at all. I'm basing this off of the conversation we had, but it's also summarized by the first `to_literal` block in the summary...
In the original issue, the JSON component is the primary motivator, and msgpack was only picked because it is a more efficient transport mechanism for bytes across the wire than, say, utf-8 or some other normal encoding format (it's also understandable by browsers, unlike a generic compression format). Now the JSON component seems to be an afterthought, with msgpack being the primary motivator for this work. As far as I know (please correct me if I'm wrong), this is not the intent. If this was switched, and intentionally so, could someone please add the context here? If I understand correctly, the genesis of this project was the realization that JSON […]
For these reasons it was thought (again, correct me if I'm wrong) that this would be a powerful abstraction. A couple of things might be enabled by a stronger JSON notion.

Compatibility

This was covered back in the issue with unions and dataclasses, but the existence of a JSON schema allows for more useful type/compatibility checking, I think.

Attribute access of dataclass-like structures

The second part, which I think is worth thinking about and improving upon, we uncovered in our chat today. The data accessor feature added last year works well for native Flyte IDL container types (like lists and maps) but doesn't work with dataclasses. This example fails both subsequent tasks:

```python
from dataclasses import dataclass

from mashumaro.mixins.json import DataClassJSONMixin

from flytekit import task, workflow, WorkflowFailurePolicy
from flytekit.types.file import FlyteFile


@dataclass
class MyDC(DataClassJSONMixin):
    data: FlyteFile
    number: int = 10


@task
def produce_data() -> MyDC:
    return MyDC(data=FlyteFile(path=__file__))


@task
def t1(input_file: FlyteFile):
    with open(input_file, "r") as f:
        print(f.read())


@task
def t2(nn: int):
    print(f"Number is {nn}")


@workflow(failure_policy=WorkflowFailurePolicy.FAIL_AFTER_EXECUTABLE_NODES_COMPLETE)
def wf():
    my_dc = produce_data()
    t1(my_dc.data)
    t2(my_dc.number)
```

The errors look pretty similar for both tasks.

Error for t1: […]

Error for t2: […]

The second issue (the mismatch in float/int) will be addressed by the PR as it stands, but I don't think the first can be. However, I think teaching type transformers how to handle […] We can somehow instruct flytekit that the incoming data is of JSON form, rather than flyteidl proto binary form. And sure, we can pick some encoding (utf-8, msgpack, doesn't matter).
I agree that having a good abstraction layer in Flytekit would be perfect.
I haven't entirely understood this case; if possible, could someone please explain it? Thank you.
@wild-endeavor @eapolinario @pingsutw

Serialization: Python value -> […]

I think if we require everyone to use […]

Thank you.
To make the JSON IDL more useful for users, I think that if we require users to have a […]

A feasible approach is as follows: […]
But if we convert language value -> JSON string, some serialization APIs may not gain a performance boost, because this adds extra bytes to the string. As mentioned in the official MsgPack documentation: "MessagePack is an efficient binary serialization format. It allows you to exchange data between multiple languages, like JSON, but it is faster and smaller. Small integers are encoded in a single byte, and typical short strings require only one extra byte beyond the string itself."
@Future-Outlier, @wild-endeavor and I met to discuss the direction of this RFC. Here is what we landed on: […]

The rationale: […]
@wild-endeavor and @Future-Outlier will add more details about the implementation and flytekit changes needed... cc @eapolinario
Moved to #5742.
Tracking issue: #5318