diff --git a/db/db.go b/db/db.go index 5cde749a4..7b584ecd2 100644 --- a/db/db.go +++ b/db/db.go @@ -222,11 +222,12 @@ func createTables(MetadataDbClient MetadataStorage) error { ALTER TABLE schemas DROP CONSTRAINT IF EXISTS name; ALTER TABLE schemas DROP CONSTRAINT IF EXISTS schemas_name_tenant_name_key; ALTER TABLE schemas ADD CONSTRAINT schemas_name_tenant_name_key UNIQUE(name, tenant_name); + ALTER TYPE enum_type ADD VALUE 'avro'; END IF; END $$;` schemasTable := ` - CREATE TYPE enum_type AS ENUM ('json', 'graphql', 'protobuf'); + CREATE TYPE enum_type AS ENUM ('json', 'graphql', 'protobuf', 'avro'); CREATE TABLE IF NOT EXISTS schemas( id SERIAL NOT NULL, name VARCHAR NOT NULL, diff --git a/go.mod b/go.mod index 71604d225..901a27617 100644 --- a/go.mod +++ b/go.mod @@ -36,6 +36,7 @@ require ( github.com/docker/docker v20.10.24+incompatible github.com/golang-jwt/jwt/v4 v4.5.0 github.com/graph-gophers/graphql-go v1.5.0 + github.com/hamba/avro/v2 v2.12.0 github.com/jackc/pgx/v5 v5.3.1 github.com/santhosh-tekuri/jsonschema/v5 v5.1.0 github.com/slack-go/slack v0.11.4 @@ -46,6 +47,7 @@ require ( require ( github.com/jackc/puddle/v2 v2.2.0 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect + github.com/mitchellh/mapstructure v1.5.0 // indirect ) require ( diff --git a/go.sum b/go.sum index e987d30e3..55b576c80 100644 --- a/go.sum +++ b/go.sum @@ -225,6 +225,8 @@ github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0U github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/graph-gophers/graphql-go v1.5.0 h1:fDqblo50TEpD0LY7RXk/LFVYEVqo3+tXMNMPSVXA1yc= github.com/graph-gophers/graphql-go v1.5.0/go.mod h1:YtmJZDLbF1YYNrlNAuiO5zAStUWc3XZT07iGsVqe1Os= +github.com/hamba/avro/v2 v2.12.0 h1:QZvbrfOfHQ7kZnlxRdwRU0opSf9ZrqlzpKzJuIUjIjU= +github.com/hamba/avro/v2 v2.12.0/go.mod h1:Q9YK+qxAhtVrNqOhwlZTATLgLA8qxG2vtvkhK8fJ7Jo= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= @@ -282,6 +284,8 @@ github.com/memphisdev/memphis.go v1.0.5 h1:sotMXz7p5j6ZUjYReiTy4uRohQvbT9YRboLIH github.com/memphisdev/memphis.go v1.0.5/go.mod h1:VsFe2Wrght9LnzP2JWRA+tDeJS/12+3xxREWYieV6FQ= github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= +github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= +github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/moby/spdystream v0.2.0 h1:cjW1zVyyoiM0T7b6UoySUFqzXMoqRckQtXwGPiBhOM8= github.com/moby/spdystream v0.2.0/go.mod h1:f7i0iNDQJ059oMTcWxx8MA/zKFIuD/lY+0GqbN2Wy8c= github.com/moby/term v0.0.0-20221205130635-1aeaba878587 h1:HfkjXDfhgVaN5rmueG8cL8KKeFNecRCXFhaJ2qZ5SKA= diff --git a/server/memphis_handlers_schemas.go b/server/memphis_handlers_schemas.go index 2997f6af3..78381db3d 100644 --- a/server/memphis_handlers_schemas.go +++ b/server/memphis_handlers_schemas.go @@ -31,6 +31,7 @@ import ( "github.com/graph-gophers/graphql-go" "github.com/jhump/protoreflect/desc/protoparse" "github.com/santhosh-tekuri/jsonschema/v5" + "github.com/hamba/avro/v2" ) type SchemasHandler struct{ S *Server } @@ -76,6 +77,14 @@ func validateGraphqlSchemaContent(schemaContent string) error { return nil } +func validateAvroSchemaContent(schemaContent string) error { + _, err := avro.Parse(schemaContent) + if err != nil { + return fmt.Errorf("your Avro file is invalid: %v", err.Error()) + } + return nil +} + func generateProtobufDescriptor(schemaName string, schemaVersionNum int, schemaContent string) ([]byte, error) { filename := fmt.Sprintf("%v_%v.proto", schemaName, schemaVersionNum) descFilename := fmt.Sprintf("%v_%v_desc", schemaName, schemaVersionNum) @@ -114,13 +123,9 @@ func validateSchemaName(schemaName string) error { func validateSchemaType(schemaType string) error { invalidTypeErrStr := "unsupported schema type" invalidTypeErr := errors.New(invalidTypeErrStr) - invalidSupportTypeErrStr := "avro is not supported at this time" - invalidSupportTypeErr := errors.New(invalidSupportTypeErrStr) - if schemaType == "protobuf" || schemaType == "json" || schemaType == "graphql" { + if schemaType == "protobuf" || schemaType == "json" || schemaType == "graphql" || schemaType == "avro" { return nil - } else if schemaType == "avro" { - return invalidSupportTypeErr } else { return invalidTypeErr } @@ -148,7 +153,10 @@ func validateSchemaContent(schemaContent, schemaType string) error { return err } case "avro": - break + err := validateAvroSchemaContent(schemaContent) + if err != nil { + return err + } } return nil } diff --git a/server/memphis_handlers_user_mgmt.go b/server/memphis_handlers_user_mgmt.go index c7cac26bb..f0bc9b28d 100644 --- a/server/memphis_handlers_user_mgmt.go +++ b/server/memphis_handlers_user_mgmt.go @@ -729,7 +729,7 @@ func (umh UserMgmtHandler) GetFilterDetails(c *gin.Context) { return } - schemaType := []string{"protobuf", "json", "graphql"} + schemaType := []string{"protobuf", "json", "graphql", "avro"} usage := []string{"used", "not used"} c.IndentedJSON(200, gin.H{"tags": tags, "users": users, "type": schemaType, "usage": usage}) return diff --git a/ui_src/package.json b/ui_src/package.json index d91932d82..c495600a3 100644 --- a/ui_src/package.json +++ b/ui_src/package.json @@ -13,6 +13,7 @@ "ajv": "^8.11.2", "ajv-draft-04": "^1.0.0", "antd": "4.23.1", + "avro-js": "^1.11.2", "axios": "^0.25.0", "buffer": "^6.0.3", "chart.js": "^2.9.4", diff --git a/ui_src/src/domain/schema/components/createSchema/index.js b/ui_src/src/domain/schema/components/createSchema/index.js index 80da479ae..7525b8044 100644 --- a/ui_src/src/domain/schema/components/createSchema/index.js +++ b/ui_src/src/domain/schema/components/createSchema/index.js @@ -43,6 +43,7 @@ import { Context } from '../../../../hooks/store'; import Input from '../../../../components/Input'; import Modal from '../../../../components/modal'; import AttachStationModal from '../attachStationModal'; +const avro = require('avro-js') loader.init(); loader.config({ monaco }); @@ -73,15 +74,14 @@ const schemaTypes = [ }, { id: 4, - value: 'avro', - label: 'Avro (Coming soon)', + value: 'Avro', + label: 'Avro', description: ( The popular. Apache Avro™ is the leading serialization format for record data, and first choice for streaming data pipelines. It offers excellent schema evolution. - ), - disabled: true + ) } ]; @@ -96,15 +96,15 @@ message Test { }` }, Avro: { - language: 'avro', + language: 'json', // Avro stores the data definition in JSON format. value: `{ "type": "record", "namespace": "com.example", - "name": "test-schema", + "name": "test_schema", "fields": [ { "name": "username", "type": "string", "default": "-2" }, - { "name": "age", "type": "int", "default": "none" }, - { "name": "phone", "type": "int", "default": "NONE" }, + { "name": "age", "type": "int" }, + { "name": "phone", "type": "long" }, { "name": "country", "type": "string", "default": "NONE" } ] }` @@ -355,6 +355,17 @@ function CreateSchema({ createNew }) { } }; + const validateAvroSchema = (value) => { + try { + avro.parse(value); + setValidateSuccess(''); + setValidateError(''); + } catch (error) { + setValidateSuccess(''); + setValidateError('Your schema is invalid'); + } + }; + const checkContent = (value) => { const { type } = formFields; if (value === ' ' || value === '') { @@ -368,6 +379,8 @@ function CreateSchema({ createNew }) { validateJsonSchema(value); } else if (type === 'GraphQL') { validateGraphQlSchema(value); + } else if (type === 'Avro') { + validateAvroSchema(value); } } }; @@ -536,6 +549,7 @@ function CreateSchema({ createNew }) { {formFields?.type === 'Protobuf' && schemaContentEditor} {formFields?.type === 'Json' && schemaContentEditor} {formFields?.type === 'GraphQL' && schemaContentEditor} + {formFields?.type === 'Avro' && schemaContentEditor}