Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Flyte][3][flytepropeller][Attribute Access][flytectl] Binary IDL With MessagePack #5763

Merged
merged 11 commits into from
Oct 5, 2024
1 change: 1 addition & 0 deletions flyteadmin/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ require (
github.com/prometheus/procfs v0.10.1 // indirect
github.com/rcrowley/go-metrics v0.0.0-20200313005456-10cdbea86bc0 // indirect
github.com/sendgrid/rest v2.6.9+incompatible // indirect
github.com/shamaton/msgpack/v2 v2.2.2 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spf13/afero v1.8.2 // indirect
github.com/spf13/cast v1.4.1 // indirect
Expand Down
1 change: 1 addition & 0 deletions flytecopilot/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ require (
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.10.1 // indirect
github.com/shamaton/msgpack/v2 v2.2.2 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spf13/afero v1.8.2 // indirect
github.com/spf13/cast v1.4.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions flytecopilot/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,8 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/shamaton/msgpack/v2 v2.2.2 h1:GOIg0c9LV04VwzOOqZSrmsv/JzjNOOMxnS/HvOHGdgs=
github.com/shamaton/msgpack/v2 v2.2.2/go.mod h1:6khjYnkx73f7VQU7wjcFS9DFjs+59naVWJv1TB7qdOI=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=
Expand Down
1 change: 1 addition & 0 deletions flytectl/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ require (
github.com/prometheus/procfs v0.10.1 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/shamaton/msgpack/v2 v2.2.2 // indirect
github.com/spf13/afero v1.9.2 // indirect
github.com/spf13/cast v1.4.1 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions flytectl/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,8 @@ github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/shamaton/msgpack/v2 v2.2.2 h1:GOIg0c9LV04VwzOOqZSrmsv/JzjNOOMxnS/HvOHGdgs=
github.com/shamaton/msgpack/v2 v2.2.2/go.mod h1:6khjYnkx73f7VQU7wjcFS9DFjs+59naVWJv1TB7qdOI=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
Expand Down
2 changes: 2 additions & 0 deletions flyteidl/clients/go/coreutils/extract_literal.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ func ExtractFromLiteral(literal *core.Literal) (interface{}, error) {
default:
return nil, fmt.Errorf("unsupported literal scalar primitive type %T", scalarValue)
}
case *core.Scalar_Binary:
return scalarValue.Binary, nil
case *core.Scalar_Blob:
return scalarValue.Blob.Uri, nil
case *core.Scalar_Schema:
Expand Down
30 changes: 1 addition & 29 deletions flyteidl/clients/go/coreutils/extract_literal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func TestFetchLiteral(t *testing.T) {
s := MakeBinaryLiteral([]byte{'h'})
assert.Equal(t, []byte{'h'}, s.GetScalar().GetBinary().GetValue())
_, err := ExtractFromLiteral(s)
assert.NotNil(t, err)
assert.Nil(t, err)
})

t.Run("NoneType", func(t *testing.T) {
Expand All @@ -124,34 +124,6 @@ func TestFetchLiteral(t *testing.T) {
assert.Nil(t, err)
})

t.Run("Generic", func(t *testing.T) {
literalVal := map[string]interface{}{
"x": 1,
"y": "ystringvalue",
}
var literalType = &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_STRUCT}}
lit, err := MakeLiteralForType(literalType, literalVal)
Comment on lines -127 to -133
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assert.NoError(t, err)
extractedLiteralVal, err := ExtractFromLiteral(lit)
assert.NoError(t, err)
fieldsMap := map[string]*structpb.Value{
"x": {
Kind: &structpb.Value_NumberValue{NumberValue: 1},
},
"y": {
Kind: &structpb.Value_StringValue{StringValue: "ystringvalue"},
},
}
expectedStructVal := &structpb.Struct{
Fields: fieldsMap,
}
extractedStructValue := extractedLiteralVal.(*structpb.Struct)
assert.Equal(t, len(expectedStructVal.Fields), len(extractedStructValue.Fields))
for key, val := range expectedStructVal.Fields {
assert.Equal(t, val.Kind, extractedStructValue.Fields[key].Kind)
}
})

t.Run("Generic Passed As String", func(t *testing.T) {
literalVal := "{\"x\": 1,\"y\": \"ystringvalue\"}"
var literalType = &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_STRUCT}}
Expand Down
34 changes: 30 additions & 4 deletions flyteidl/clients/go/coreutils/literals.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
package coreutils

import (
"encoding/json"
"fmt"
"math"
"reflect"
Expand All @@ -14,11 +13,14 @@ import (
"github.com/golang/protobuf/ptypes"
structpb "github.com/golang/protobuf/ptypes/struct"
"github.com/pkg/errors"
"github.com/shamaton/msgpack/v2"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytestdlib/storage"
)

const messagepack = "msgpack"

func MakePrimitive(v interface{}) (*core.Primitive, error) {
switch p := v.(type) {
case int:
Expand Down Expand Up @@ -144,6 +146,7 @@ func MakeBinaryLiteral(v []byte) *core.Literal {
Value: &core.Scalar_Binary{
Binary: &core.Binary{
Value: v,
Tag: messagepack,
},
},
},
Expand Down Expand Up @@ -389,7 +392,7 @@ func MakeLiteralForSimpleType(t core.SimpleType, s string) (*core.Literal, error
scalar.Value = &core.Scalar_Binary{
Binary: &core.Binary{
Value: []byte(s),
// TODO Tag not supported at the moment
Tag: messagepack,
},
}
case core.SimpleType_ERROR:
Expand Down Expand Up @@ -559,12 +562,35 @@ func MakeLiteralForType(t *core.LiteralType, v interface{}) (*core.Literal, erro
strValue = fmt.Sprintf("%.0f", math.Trunc(f))
}
if newT.Simple == core.SimpleType_STRUCT {
// If the type is a STRUCT, we expect the input to be a complex object
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will this not break if the clients or otherside have the old view of struct?
i.e, this seems backwards incompatible?
cc @EngHabu

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this will not break JSON string case, since this is not a string type.

// like the following example:
// inputs:
// dc:
// a: 1
// b: 3.14
// c: "example_string"
// Instead of storing it directly as a structured value, we will serialize
// the input object using MsgPack and return it as a binary IDL object.

// If the value is not already a string (meaning it's not already serialized),
// proceed with serialization.
if _, isValueStringType := v.(string); !isValueStringType {
byteValue, err := json.Marshal(v)
byteValue, err := msgpack.Marshal(v)
if err != nil {
return nil, fmt.Errorf("unable to marshal to json string for struct value %v", v)
}
strValue = string(byteValue)
return &core.Literal{
Value: &core.Literal_Scalar{
Scalar: &core.Scalar{
Value: &core.Scalar_Binary{
Binary: &core.Binary{
Value: byteValue,
Tag: messagepack,
},
},
},
},
}, nil
}
}
lv, err := MakeLiteralForSimpleType(newT.Simple, strValue)
Expand Down
73 changes: 73 additions & 0 deletions flyteidl/clients/go/coreutils/literals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/golang/protobuf/ptypes"
structpb "github.com/golang/protobuf/ptypes/struct"
"github.com/pkg/errors"
"github.com/shamaton/msgpack/v2"
"github.com/stretchr/testify/assert"

"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
Expand Down Expand Up @@ -242,6 +243,16 @@ func TestMakeDefaultLiteralForType(t *testing.T) {
assert.NotNil(t, l.GetScalar().GetError())
})

t.Run("binary", func(t *testing.T) {
l, err := MakeDefaultLiteralForType(&core.LiteralType{Type: &core.LiteralType_Simple{
Simple: core.SimpleType_BINARY,
}})
assert.NoError(t, err)
assert.NotNil(t, l.GetScalar().GetBinary())
assert.NotNil(t, l.GetScalar().GetBinary().GetValue())
assert.NotNil(t, l.GetScalar().GetBinary().GetTag())
})

t.Run("struct", func(t *testing.T) {
l, err := MakeDefaultLiteralForType(&core.LiteralType{Type: &core.LiteralType_Simple{
Simple: core.SimpleType_STRUCT,
Expand Down Expand Up @@ -444,6 +455,68 @@ func TestMakeLiteralForType(t *testing.T) {
assert.Equal(t, expectedVal, actualVal)
})

t.Run("SimpleBinary", func(t *testing.T) {
// We compare the deserialized values instead of the raw msgpack bytes because Go does not guarantee the order
// of map keys during serialization. This means that while the serialized bytes may differ, the deserialized
// values should be logically equivalent.

var literalType = &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_STRUCT}}
v := map[string]interface{}{
"a": int64(1),
"b": 3.14,
"c": "example_string",
"d": map[string]interface{}{
"1": int64(100),
"2": int64(200),
},
"e": map[string]interface{}{
"a": int64(1),
"b": 3.14,
},
"f": []string{"a", "b", "c"},
}

val, err := MakeLiteralForType(literalType, v)
assert.NoError(t, err)

msgpackBytes, err := msgpack.Marshal(v)
assert.NoError(t, err)

literalVal := &core.Literal{
Value: &core.Literal_Scalar{
Scalar: &core.Scalar{
Value: &core.Scalar_Binary{
Binary: &core.Binary{
Value: msgpackBytes,
Tag: messagepack,
},
},
},
},
}

expectedLiteralVal, err := ExtractFromLiteral(literalVal)
assert.NoError(t, err)
actualLiteralVal, err := ExtractFromLiteral(val)
assert.NoError(t, err)

// Check if the extracted value is of type *core.Binary (not []byte)
expectedBinary, ok := expectedLiteralVal.(*core.Binary)
assert.True(t, ok, "expectedLiteralVal is not of type *core.Binary")
actualBinary, ok := actualLiteralVal.(*core.Binary)
assert.True(t, ok, "actualLiteralVal is not of type *core.Binary")

// Now check if the Binary values match
var expectedVal, actualVal map[string]interface{}
err = msgpack.Unmarshal(expectedBinary.Value, &expectedVal)
assert.NoError(t, err)
err = msgpack.Unmarshal(actualBinary.Value, &actualVal)
assert.NoError(t, err)

// Finally, assert that the deserialized values are equal
assert.Equal(t, expectedVal, actualVal)
})

t.Run("ArrayStrings", func(t *testing.T) {
var literalType = &core.LiteralType{Type: &core.LiteralType_CollectionType{
CollectionType: &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_STRING}}}}
Expand Down
1 change: 1 addition & 0 deletions flyteidl/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/mitchellh/mapstructure v1.5.0
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c
github.com/pkg/errors v0.9.1
github.com/shamaton/msgpack/v2 v2.2.2
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.9.0
golang.org/x/net v0.27.0
Expand Down
2 changes: 2 additions & 0 deletions flyteidl/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPH
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/shamaton/msgpack/v2 v2.2.2 h1:GOIg0c9LV04VwzOOqZSrmsv/JzjNOOMxnS/HvOHGdgs=
github.com/shamaton/msgpack/v2 v2.2.2/go.mod h1:6khjYnkx73f7VQU7wjcFS9DFjs+59naVWJv1TB7qdOI=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
Expand Down
1 change: 1 addition & 0 deletions flyteplugins/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ require (
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.10.1 // indirect
github.com/shamaton/msgpack/v2 v2.2.2 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spf13/afero v1.8.2 // indirect
github.com/spf13/cast v1.4.1 // indirect
Expand Down
2 changes: 2 additions & 0 deletions flyteplugins/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,8 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/shamaton/msgpack/v2 v2.2.2 h1:GOIg0c9LV04VwzOOqZSrmsv/JzjNOOMxnS/HvOHGdgs=
github.com/shamaton/msgpack/v2 v2.2.2/go.mod h1:6khjYnkx73f7VQU7wjcFS9DFjs+59naVWJv1TB7qdOI=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=
Expand Down
4 changes: 3 additions & 1 deletion flytepropeller/pkg/compiler/validators/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
)

const messagepack = "msgpack"

func containsBindingByVariableName(bindings []*core.Binding, name string) (found bool) {
for _, b := range bindings {
if b.Var == name {
Expand Down Expand Up @@ -47,7 +49,7 @@ func literalTypeForScalar(scalar *core.Scalar) *core.LiteralType {
// If the binary has a tag, treat it as a structured type (e.g., dict, dataclass, Pydantic BaseModel).
// Otherwise, treat it as raw binary data.
// Reference: https://github.com/flyteorg/flyte/blob/master/rfc/system/5741-binary-idl-with-message-pack.md
if len(v.Binary.Tag) > 0 {
if v.Binary.Tag == messagepack {
literalType = &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_STRUCT}}
} else {
literalType = &core.LiteralType{Type: &core.LiteralType_Simple{Simple: core.SimpleType_BINARY}}
Expand Down
4 changes: 2 additions & 2 deletions flytepropeller/pkg/compiler/validators/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestLiteralTypeForLiterals(t *testing.T) {
Value: &core.Scalar_Binary{
Binary: &core.Binary{
Value: serializedBinaryData,
Tag: "msgpack",
Tag: messagepack,
},
},
},
Expand Down Expand Up @@ -83,7 +83,7 @@ func TestLiteralTypeForLiterals(t *testing.T) {
Value: &core.Scalar_Binary{
Binary: &core.Binary{
Value: serializedBinaryData,
Tag: "msgpack",
Tag: messagepack,
},
},
},
Expand Down
Loading
Loading