avro schema validation #105

Merged
8 changes: 4 additions & 4 deletions README.md
@@ -220,7 +220,7 @@ c.Produce("station_name_c_produce", "producer_name_a", []byte("Hey There!"), []m

Creating a producer first (receiver function of the producer struct).
```go
-p.Produce("<message in []byte or map[string]interface{}/[]byte or protoreflect.ProtoMessage or map[string]interface{}(schema validated station - protobuf)/struct with json tags or map[string]interface{} or interface{}(schema validated station - json schema) or []byte/string (schema validated station - graphql schema)>", memphis.AckWaitSec(15)) // defaults to 15 seconds
+p.Produce("<message in []byte or map[string]interface{}/[]byte or protoreflect.ProtoMessage or map[string]interface{}(schema validated station - protobuf)/struct with json tags or map[string]interface{} or interface{}(schema validated station - json schema) or []byte/string (schema validated station - graphql schema) or []byte or map[string]interface{} or struct with avro tags(schema validated station - avro schema)>", memphis.AckWaitSec(15)) // defaults to 15 seconds
```

### Add headers
@@ -230,7 +230,7 @@ hdrs := memphis.Headers{}
hdrs.New()
err := hdrs.Add("key", "value")
p.Produce(
-"<message in []byte or map[string]interface{}/[]byte or protoreflect.ProtoMessage or map[string]interface{}(schema validated station - protobuf)/struct with json tags or map[string]interface{} or interface{}(schema validated station - json schema) or []byte/string (schema validated station - graphql schema)>",
+"<message in []byte or map[string]interface{}/[]byte or protoreflect.ProtoMessage or map[string]interface{}(schema validated station - protobuf)/struct with json tags or map[string]interface{} or interface{}(schema validated station - json schema) or []byte/string (schema validated station - graphql schema) or []byte or map[string]interface{} or struct with avro tags(schema validated station - avro schema)>",
memphis.AckWaitSec(15),
memphis.MsgHeaders(hdrs) // defaults to empty
)
@@ -241,7 +241,7 @@ Meaning your application won't wait for broker acknowledgement - use only in cas

```go
p.Produce(
-"<message in []byte or map[string]interface{}/[]byte or protoreflect.ProtoMessage or map[string]interface{}(schema validated station - protobuf)/struct with json tags or map[string]interface{} or interface{}(schema validated station - json schema) or []byte/string (schema validated station - graphql schema)>",
+"<message in []byte or map[string]interface{}/[]byte or protoreflect.ProtoMessage or map[string]interface{}(schema validated station - protobuf)/struct with json tags or map[string]interface{} or interface{}(schema validated station - json schema) or []byte/string (schema validated station - graphql schema) or []byte or map[string]interface{} or struct with avro tags(schema validated station - avro schema)>",
memphis.AckWaitSec(15),
memphis.AsyncProduce()
)
@@ -252,7 +252,7 @@ Stations are idempotent by default for 2 minutes (can be configured), Idempotenc

```go
p.Produce(
-"<message in []byte or map[string]interface{}/[]byte or protoreflect.ProtoMessage or map[string]interface{}(schema validated station - protobuf)/struct with json tags or map[string]interface{} or interface{}(schema validated station - json schema) or []byte/string (schema validated station - graphql schema)>",
+"<message in []byte or map[string]interface{}/[]byte or protoreflect.ProtoMessage or map[string]interface{}(schema validated station - protobuf)/struct with json tags or map[string]interface{} or interface{}(schema validated station - json schema) or []byte/string (schema validated station - graphql schema) or []byte or map[string]interface{} or struct with avro tags(schema validated station - avro schema)>",
memphis.AckWaitSec(15),
memphis.MsgId("343")
)
5 changes: 5 additions & 0 deletions go.mod
@@ -15,7 +15,12 @@ require (

require (
github.com/golang/protobuf v1.5.2 // indirect
github.com/hamba/avro/v2 v2.13.0 // indirect
github.com/jhump/protoreflect v1.13.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/nats-io/nats-server/v2 v2.9.5 // indirect
github.com/nats-io/nkeys v0.4.4 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
13 changes: 13 additions & 0 deletions go.sum
@@ -6,6 +6,7 @@ github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
@@ -37,11 +38,14 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o=
github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/graph-gophers/graphql-go v1.4.0 h1:JE9wveRTSXwJyjdRd6bOQ7Ob5bewTUQ58Jv4OiVdpdE=
github.com/graph-gophers/graphql-go v1.4.0/go.mod h1:YtmJZDLbF1YYNrlNAuiO5zAStUWc3XZT07iGsVqe1Os=
github.com/graph-gophers/graphql-go v1.5.0 h1:fDqblo50TEpD0LY7RXk/LFVYEVqo3+tXMNMPSVXA1yc=
github.com/graph-gophers/graphql-go v1.5.0/go.mod h1:YtmJZDLbF1YYNrlNAuiO5zAStUWc3XZT07iGsVqe1Os=
github.com/hamba/avro/v2 v2.13.0 h1:QY2uX2yvJTW0OoMKelGShvq4v1hqab6CxJrPwh0fnj0=
github.com/hamba/avro/v2 v2.13.0/go.mod h1:Q9YK+qxAhtVrNqOhwlZTATLgLA8qxG2vtvkhK8fJ7Jo=
github.com/jhump/gopoet v0.0.0-20190322174617-17282ff210b3/go.mod h1:me9yfT6IJSlOL3FCfrg+L6yzUEZ+5jW6WHt4Sk+UPUI=
github.com/jhump/gopoet v0.1.0/go.mod h1:me9yfT6IJSlOL3FCfrg+L6yzUEZ+5jW6WHt4Sk+UPUI=
github.com/jhump/goprotoc v0.5.0/go.mod h1:VrbvcYrQOrTi3i0Vf+m+oqQWk9l72mjkJCYo7UvLHRQ=
@@ -50,11 +54,20 @@ github.com/jhump/protoreflect v1.13.0 h1:zrrZqa7JAc2YGgPSzZZkmUXJ5G6NRPdxOg/9t7I
github.com/jhump/protoreflect v1.13.0/go.mod h1:JytZfP5d0r8pVNLZvai7U/MCuTWITgrI4tTg7puQFKI=
github.com/jhump/protoreflect v1.15.1 h1:HUMERORf3I3ZdX05WaQ6MIpd/NJ434hTp5YiKgfCL6c=
github.com/jhump/protoreflect v1.15.1/go.mod h1:jD/2GMKKE6OqX8qTjhADU1e6DShO+gavG9e0Q693nKo=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/klauspost/compress v1.15.11 h1:Lcadnb3RKGin4FYM/orgq0qde+nc15E5Cbqg4B9Sx9c=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI=
github.com/nats-io/nats-server/v2 v2.9.5 h1:TlduKZ9YGoM0n34Lhm6AN0zRFOt/G3jTy9mPxXnE6dU=
github.com/nats-io/nats-server/v2 v2.9.5/go.mod h1:AB6hAnGZDlYfqb7CTAm66ZKMZy9DpfierY1/PbpvI2g=
6 changes: 1 addition & 5 deletions schema.go
@@ -144,13 +144,9 @@ func validateName(name, objectType string) error {
func validateSchemaType(schemaType string) error {
invalidTypeErrStr := "unsupported schema type"
invalidTypeErr := errors.New(invalidTypeErrStr)
-invalidSupportTypeErrStr := "avro is not supported at this time"
-invalidSupportTypeErr := errors.New(invalidSupportTypeErrStr)

-if schemaType == "protobuf" || schemaType == "json" || schemaType == "graphql" {
+if schemaType == "protobuf" || schemaType == "json" || schemaType == "graphql" || schemaType == "avro" {
return nil
-} else if schemaType == "avro" {
-return invalidSupportTypeErr
} else {
return invalidTypeErr
}
6 changes: 6 additions & 0 deletions schema_test.go
@@ -40,4 +40,10 @@ func TestCreateSchema(t *testing.T) {
fmt.Println("json Created!!")
}

err = c.CreateSchema("sdk_test_schema_avro", "avro", "./test_schemas/test.avsc")
if err != nil {
fmt.Println(err.Error())
} else {
fmt.Println("avro Created!!")
}
}
72 changes: 72 additions & 0 deletions station.go
@@ -25,6 +25,7 @@ import (
"sync"
"time"

"github.com/hamba/avro/v2"
"github.com/nats-io/nats.go"

graphqlParse "github.com/graph-gophers/graphql-go"
@@ -314,6 +315,7 @@ type schemaDetails struct {
msgDescriptor protoreflect.MessageDescriptor
jsonSchema *jsonschema.Schema
graphQlSchema *graphqlParse.Schema
avroSchema avro.Schema
}

func (c *Conn) listenToSchemaUpdates(stationName string) error {
@@ -425,6 +427,10 @@ func (sd *schemaDetails) handleSchemaUpdateInit(sui SchemaUpdateInit) {
if err := sd.compileGraphQl(); err != nil {
log.Println(err.Error())
}
} else if sd.schemaType == "avro" {
if err := sd.compileAvroSchema(); err != nil {
log.Println(err.Error())
}
}
}

@@ -480,6 +486,15 @@ func (sd *schemaDetails) compileGraphQl() error {
return nil
}

func (sd *schemaDetails) compileAvroSchema() error {
sch, err := avro.Parse(sd.activeVersion.Content)
if err != nil {
return memphisError(err)
}
sd.avroSchema = sch
return nil
}

func (sd *schemaDetails) validateMsg(msg any) ([]byte, error) {
switch sd.schemaType {
case "protobuf":
@@ -488,6 +503,8 @@ func (sd *schemaDetails) validateMsg(msg any) ([]byte, error) {
return sd.validJsonSchemaMsg(msg)
case "graphql":
return sd.validateGraphQlMsg(msg)
case "avro":
return sd.validAvroSchemaMsg(msg)
default:
return nil, memphisError(errors.New("invalid schema type"))
}
@@ -614,3 +631,58 @@ func (sd *schemaDetails) validateGraphQlMsg(msg any) ([]byte, error) {
}
return msgBytes, nil
}

func (sd *schemaDetails) validAvroSchemaMsg(msg any) ([]byte, error) {
var (
msgBytes []byte
err error
message interface{}
)

if err != nil {
log.Fatal(err)
}

switch msg.(type) {
case []byte:
msgBytes = msg.([]byte)
if err := json.Unmarshal(msgBytes, &message); err != nil {
Contributor Author

myData := []byte(`{"username": "john", "age": 30}`)

When encoding/json unmarshals the data above, the int value is converted to float64, which causes a schema validation error. So when a user defines an Avro schema, it can't use the int type; the field should be double rather than int.

https://github.com/hamba/avro#types-conversions
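
The conversion described in this comment can be reproduced with the standard library alone. The sketch below is illustrative and not part of the PR; the helper name `decodedKind` is hypothetical. It decodes the same payload into an `interface{}` and reports the Go type that `encoding/json` chose for a field.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// decodedKind (hypothetical helper) reports the dynamic Go type that
// encoding/json picks for a field when the payload is decoded into an
// interface{} value.
func decodedKind(payload []byte, field string) (string, error) {
	var message interface{}
	if err := json.Unmarshal(payload, &message); err != nil {
		return "", err
	}
	obj, ok := message.(map[string]interface{})
	if !ok {
		return "", fmt.Errorf("payload is not a JSON object")
	}
	return fmt.Sprintf("%T", obj[field]), nil
}

func main() {
	kind, err := decodedKind([]byte(`{"username": "john", "age": 30}`), "age")
	if err != nil {
		panic(err)
	}
	fmt.Println(kind) // prints "float64"
}
```

Every JSON number decoded into an `interface{}` arrives as `float64`, regardless of whether it was written as an integer.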

Contributor

Any workaround to solve this? Otherwise we should mention it in the UI as well.

Contributor Author

@Big-Vi, Jul 27, 2023

> Any workaround to solve this? Otherwise we should mention it in the UI as well.

I can think of the solution below, but it converts int to int64, so we would still need to add a note in the UI. So I reckon we leave it as it is and add a note in the UI.

https://go.dev/play/p/SkG6qlolz1B

myData := `{"username": "John", "age": 30}`
var m map[string]interface{}

dec := json.NewDecoder(strings.NewReader(myData))
dec.UseNumber()

err := dec.Decode(&m)
if err != nil {
	log.Fatal(err)
}

m["age"], err = m["age"].(json.Number).Int64()
if err != nil {
	log.Fatal(err)
}
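
A self-contained variant of the snippet above, generalized to every top-level field, might look like the following. This is only a sketch under stated assumptions: `decodeKeepingInts` is a hypothetical name, nested objects and arrays are not handled, and, as noted in the comment, integers come out as int64 rather than int.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// decodeKeepingInts (hypothetical helper) decodes a JSON object with
// UseNumber and converts each top-level number: integer literals that
// fit in int64 become int64, all other numbers become float64.
func decodeKeepingInts(payload []byte) (map[string]interface{}, error) {
	dec := json.NewDecoder(bytes.NewReader(payload))
	dec.UseNumber() // keep numbers as json.Number instead of float64

	var m map[string]interface{}
	if err := dec.Decode(&m); err != nil {
		return nil, err
	}
	for k, v := range m {
		n, ok := v.(json.Number)
		if !ok {
			continue // strings, bools, nested values are left as decoded
		}
		if i, err := n.Int64(); err == nil {
			m[k] = i
			continue
		}
		f, err := n.Float64()
		if err != nil {
			return nil, err
		}
		m[k] = f
	}
	return m, nil
}

func main() {
	m, err := decodeKeepingInts([]byte(`{"username": "John", "age": 30, "score": 1.5}`))
	if err != nil {
		panic(err)
	}
	fmt.Printf("%T %T\n", m["age"], m["score"]) // prints "int64 float64"
}
```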

Contributor

OK, totally agree.

Contributor

Does this happen in the other SDKs as well, or only in Go? If it is only Go, adding a note about it to the README of the Go SDK is enough.

err = errors.New("Bad Avro format - " + err.Error())
return nil, memphisError(err)
}
case map[string]interface{}:
msgBytes, err = json.Marshal(msg)
if err != nil {
return nil, memphisError(err)
}
if err := json.Unmarshal(msgBytes, &message); err != nil {
err = errors.New("Bad Avro format - " + err.Error())
return nil, memphisError(err)
}

default:
Contributor

For protobuf we allow users to send map[string]interface{} so that people don't have to hold the proto file locally. Is that possible here as well?

Contributor Author


Yes. I've added it now.

msgType := reflect.TypeOf(msg).Kind()
if msgType == reflect.Struct {
msgBytes, err = avro.Marshal(sd.avroSchema, msg)
if err != nil {
return nil, memphisError(err)
}
if err := avro.Unmarshal(sd.avroSchema, msgBytes, &message); err != nil {
return nil, memphisError(err)
}
// Serialize it back after validation and unmarshalling
msgBytes, err = json.Marshal(message)
if err != nil {
return nil, memphisError(err)
}
} else {
return nil, memphisError(errors.New("unsupported message type"))
}
}

if _, err = avro.Marshal(sd.avroSchema, message); err != nil {
return msgBytes, memphisError(err)
}

return msgBytes, nil
}
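
The input dispatch in validAvroSchemaMsg above (raw bytes, a generic map, or a struct) can be sketched in isolation with the standard library. This is an illustrative sketch, not the SDK's code: `messageKind` and the `user` type are hypothetical, and the nil guard is an addition, since `reflect.TypeOf(nil)` returns nil and calling `Kind` on it would panic.

```go
package main

import (
	"fmt"
	"reflect"
)

// messageKind (hypothetical helper) classifies a message into the three
// input shapes the avro branch accepts, or "unsupported" otherwise.
func messageKind(msg any) string {
	switch msg.(type) {
	case []byte:
		return "bytes"
	case map[string]interface{}:
		return "map"
	}
	// reflect.TypeOf(nil) returns nil, so guard before calling Kind.
	t := reflect.TypeOf(msg)
	if t == nil {
		return "unsupported"
	}
	if t.Kind() == reflect.Struct {
		return "struct"
	}
	return "unsupported"
}

// user is a hypothetical struct with avro tags, loosely modeled on the
// test schema in this PR.
type user struct {
	Username string `avro:"username"`
	Age      int    `avro:"age"`
}

func main() {
	fmt.Println(messageKind([]byte(`{}`)))             // bytes
	fmt.Println(messageKind(map[string]interface{}{})) // map
	fmt.Println(messageKind(user{Username: "john"}))   // struct
	fmt.Println(messageKind(&user{}))                  // unsupported
	fmt.Println(messageKind(nil))                      // unsupported
}
```

Note that a pointer to a struct has kind `reflect.Ptr`, so under this check it counts as unsupported; callers would pass the struct by value.
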
11 changes: 11 additions & 0 deletions test_schemas/test.avsc
@@ -0,0 +1,11 @@
{
"type": "record",
"namespace": "com.example",
"name": "test_schema",
"fields": [
{ "name": "username", "type": "string", "default": "-2" },
{ "name": "age", "type": "int" },
{ "name": "phone", "type": "long" },
{ "name": "country", "type": "string", "default": "NONE" }
]
}