Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wip] Binary IDL With MessagePack Bytes #5744

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 27 additions & 11 deletions flyteidl/clients/go/assets/admin.swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions flyteidl/gen/pb-es/flyteidl/core/literals_pb.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions flyteidl/gen/pb-go/flyteidl/core/literals.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions flyteidl/gen/pb_rust/flyteidl.core.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions flyteidl/protos/flyteidl/core/literals.proto
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ message BlobMetadata {
// A simple byte array with a tag to help different parts of the system communicate about what is in the byte array.
// It's strongly advisable that consumers of this type define a unique tag and validate the tag before parsing the data.
message Binary {
bytes value = 1;
string tag = 2;
bytes value = 1; // Serialized data (MessagePack) for supported types like Dataclass, Pydantic BaseModel, and dict.
string tag = 2; // The serialization format identifier (e.g., MessagePack). Consumers must define unique tags and validate them before deserialization.
}

// A strongly typed schema that defines the interface of data retrieved from the underlying storage medium.
Expand Down
1 change: 1 addition & 0 deletions flytepropeller/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ require (
github.com/mitchellh/mapstructure v1.5.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.16.0
github.com/shamaton/msgpack/v2 v2.2.2
github.com/sirupsen/logrus v1.9.3
github.com/spf13/cobra v1.7.0
github.com/spf13/pflag v1.0.5
Expand Down
2 changes: 2 additions & 0 deletions flytepropeller/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,8 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/shamaton/msgpack/v2 v2.2.2 h1:GOIg0c9LV04VwzOOqZSrmsv/JzjNOOMxnS/HvOHGdgs=
github.com/shamaton/msgpack/v2 v2.2.2/go.mod h1:6khjYnkx73f7VQU7wjcFS9DFjs+59naVWJv1TB7qdOI=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
Expand Down
6 changes: 5 additions & 1 deletion flytepropeller/pkg/compiler/validators/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@

literalType = &core.LiteralType{Type: &core.LiteralType_Blob{Blob: scalar.GetBlob().GetMetadata().GetType()}}
case *core.Scalar_Binary:
literalType = &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_BINARY}}
if len(v.Binary.Tag) > 0 {
literalType = &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_STRUCT}}
} else {
literalType = &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_BINARY}}

Check warning on line 50 in flytepropeller/pkg/compiler/validators/utils.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/compiler/validators/utils.go#L47-L50

Added lines #L47 - L50 were not covered by tests
}
case *core.Scalar_Schema:
literalType = &core.LiteralType{
Type: &core.LiteralType_Schema{
Expand Down
95 changes: 88 additions & 7 deletions flytepropeller/pkg/controller/nodes/attr_path_resolver.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nodes

import (
"github.com/shamaton/msgpack/v2"
"google.golang.org/protobuf/types/known/structpb"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
Expand Down Expand Up @@ -37,13 +38,20 @@
}
}

// resolve dataclass
if currVal.GetScalar() != nil && currVal.GetScalar().GetGeneric() != nil {
st := currVal.GetScalar().GetGeneric()
// start from index "count"
currVal, err = resolveAttrPathInPbStruct(nodeID, st, bindAttrPath[count:])
if err != nil {
return nil, err
// resolve dataclass and Pydantic BaseModel
if scalar := currVal.GetScalar(); scalar != nil {
if binary := scalar.GetBinary(); binary != nil {
// Start from index "count"
currVal, err = resolveAttrPathInBinary(nodeID, binary, bindAttrPath[count:])
if err != nil {
return nil, err

Check warning on line 47 in flytepropeller/pkg/controller/nodes/attr_path_resolver.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/attr_path_resolver.go#L45-L47

Added lines #L45 - L47 were not covered by tests
}
} else if generic := scalar.GetGeneric(); generic != nil {
// Start from index "count"
currVal, err = resolveAttrPathInPbStruct(nodeID, generic, bindAttrPath[count:])
if err != nil {
return nil, err
}
}
}

Expand Down Expand Up @@ -84,6 +92,79 @@
return literal, err
}

// resolveAttrPathInBinary resolves the protobuf binary (e.g. dataclass, pydantic basemodel) with attribute path
func resolveAttrPathInBinary(nodeID string, binaryIDL *core.Binary, bindAttrPath []*core.PromiseAttribute) (*core.
Literal,
error) {

Check warning on line 98 in flytepropeller/pkg/controller/nodes/attr_path_resolver.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/attr_path_resolver.go#L98

Added line #L98 was not covered by tests

binaryBytes := binaryIDL.GetValue()
serializationFormat := binaryIDL.GetTag()

Check warning on line 101 in flytepropeller/pkg/controller/nodes/attr_path_resolver.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/attr_path_resolver.go#L100-L101

Added lines #L100 - L101 were not covered by tests

var currVal interface{}
var tmpVal interface{}
var exist bool

Check warning on line 105 in flytepropeller/pkg/controller/nodes/attr_path_resolver.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/attr_path_resolver.go#L103-L105

Added lines #L103 - L105 were not covered by tests

if serializationFormat == "msgpack" {
err := msgpack.Unmarshal(binaryBytes, &currVal)
if err != nil {
return nil, err

Check warning on line 110 in flytepropeller/pkg/controller/nodes/attr_path_resolver.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/attr_path_resolver.go#L107-L110

Added lines #L107 - L110 were not covered by tests

}
} else {
return nil, errors.Errorf(errors.PromiseAttributeResolveError, nodeID,
"Unsupported format '%v' found for literal value.\n"+
"Please ensure the serialization format is supported.", serializationFormat)

Check warning on line 116 in flytepropeller/pkg/controller/nodes/attr_path_resolver.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/attr_path_resolver.go#L113-L116

Added lines #L113 - L116 were not covered by tests
}

// Turn the current value to a map, so it can be resolved more easily
for _, attr := range bindAttrPath {
switch resolvedVal := currVal.(type) {

Check warning on line 121 in flytepropeller/pkg/controller/nodes/attr_path_resolver.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/attr_path_resolver.go#L120-L121

Added lines #L120 - L121 were not covered by tests
// map
case map[interface{}]interface{}:
tmpVal, exist = resolvedVal[attr.GetStringValue()]
if !exist {
return nil, errors.Errorf(errors.PromiseAttributeResolveError, nodeID, "key [%v] does not exist in literal %v", attr.GetStringValue(), currVal)

Check warning on line 126 in flytepropeller/pkg/controller/nodes/attr_path_resolver.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/attr_path_resolver.go#L123-L126

Added lines #L123 - L126 were not covered by tests
}
currVal = tmpVal

Check warning on line 128 in flytepropeller/pkg/controller/nodes/attr_path_resolver.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/attr_path_resolver.go#L128

Added line #L128 was not covered by tests
// list
case []interface{}:
if int(attr.GetIntValue()) >= len(resolvedVal) {
return nil, errors.Errorf(errors.PromiseAttributeResolveError, nodeID, "index [%v] is out of range of %v", attr.GetIntValue(), currVal)

Check warning on line 132 in flytepropeller/pkg/controller/nodes/attr_path_resolver.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/attr_path_resolver.go#L130-L132

Added lines #L130 - L132 were not covered by tests
}
currVal = resolvedVal[attr.GetIntValue()]

Check warning on line 134 in flytepropeller/pkg/controller/nodes/attr_path_resolver.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/attr_path_resolver.go#L134

Added line #L134 was not covered by tests
}
}

if serializationFormat == "msgpack" {

Check warning on line 138 in flytepropeller/pkg/controller/nodes/attr_path_resolver.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/attr_path_resolver.go#L138

Added line #L138 was not covered by tests
// Marshal the current value to MessagePack bytes
resolvedBinaryBytes, err := msgpack.Marshal(currVal)
if err != nil {
return nil, err

Check warning on line 142 in flytepropeller/pkg/controller/nodes/attr_path_resolver.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/attr_path_resolver.go#L140-L142

Added lines #L140 - L142 were not covered by tests
}
// Construct and return the binary-encoded literal
return constructResolvedBinary(resolvedBinaryBytes, serializationFormat), nil

Check warning on line 145 in flytepropeller/pkg/controller/nodes/attr_path_resolver.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/attr_path_resolver.go#L145

Added line #L145 was not covered by tests
}
// Unsupported serialization format
return nil, errors.Errorf(errors.PromiseAttributeResolveError, nodeID,
"Unsupported format '%v' found for literal value.\n"+
"Please ensure the serialization format is supported.", serializationFormat)

Check warning on line 150 in flytepropeller/pkg/controller/nodes/attr_path_resolver.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/attr_path_resolver.go#L148-L150

Added lines #L148 - L150 were not covered by tests
}

func constructResolvedBinary(resolvedBinaryBytes []byte, serializationFormat string) *core.Literal {
return &core.Literal{
Value: &core.Literal_Scalar{
Scalar: &core.Scalar{
Value: &core.Scalar_Binary{
Binary: &core.Binary{
Value: resolvedBinaryBytes,
Tag: serializationFormat,
},
},
},
},

Check warning on line 164 in flytepropeller/pkg/controller/nodes/attr_path_resolver.go

View check run for this annotation

Codecov / codecov/patch

flytepropeller/pkg/controller/nodes/attr_path_resolver.go#L153-L164

Added lines #L153 - L164 were not covered by tests
}
}

// convertInterfaceToLiteral converts the protobuf struct (e.g. dataclass) to literal
func convertInterfaceToLiteral(nodeID string, obj interface{}) (*core.Literal, error) {

Expand Down
2 changes: 2 additions & 0 deletions flytepropeller/pkg/controller/nodes/branch/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ const ErrorCodeMalformedBranch = "MalformedBranchUserError"
const ErrorCodeCompilerError = "CompilerError"
const ErrorCodeFailedFetchOutputs = "FailedFetchOutputs"

// TODO: IF THERE'S BINARY, CONVERT IT TO PRIMITIVE.
// NEED ERROR HANDLING FOR NON-PRIMITIVE TYPES
func EvaluateComparison(expr *core.ComparisonExpression, nodeInputs *core.LiteralMap) (bool, error) {
var lValue *core.Literal
var rValue *core.Literal
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ require (
github.com/robfig/cron/v3 v3.0.0 // indirect
github.com/sendgrid/rest v2.6.9+incompatible // indirect
github.com/sendgrid/sendgrid-go v3.10.0+incompatible // indirect
github.com/shamaton/msgpack/v2 v2.2.2 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spf13/afero v1.9.2 // indirect
github.com/spf13/cast v1.4.1 // indirect
Expand Down
Loading
Loading