diff --git a/pkg/processor/schemaregistry/decoder.go b/pkg/processor/schemaregistry/decoder.go
new file mode 100644
index 000000000..14dbd2165
--- /dev/null
+++ b/pkg/processor/schemaregistry/decoder.go
@@ -0,0 +1,108 @@
+// Copyright © 2023 Meroxa, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package schemaregistry
+
+import (
+	"context"
+
+	"github.com/conduitio/conduit/pkg/foundation/cerrors"
+	"github.com/conduitio/conduit/pkg/foundation/log"
+	"github.com/conduitio/conduit/pkg/record"
+	"github.com/lovromazgon/franz-go/pkg/sr"
+)
+
+// Decoder decodes raw data into structured data using schemas that it looks
+// up through client and registers in serde on demand.
+type Decoder struct {
+	client *Client
+	serde  *sr.Serde
+	logger log.CtxLogger
+}
+
+// NewDecoder returns a Decoder that uses the supplied client and serde. The
+// logger is tagged with the component name "schemaregistry.Decoder".
+func NewDecoder(client *Client, logger log.CtxLogger, serde *sr.Serde) *Decoder {
+	return &Decoder{
+		client: client,
+		serde:  serde,
+		logger: logger.WithComponent("schemaregistry.Decoder"),
+	}
+}
+
+// Decode decodes b into structured data. If the serde reports that the schema
+// referenced by b is not registered, Decode fetches and registers it and then
+// retries decoding once.
+func (d *Decoder) Decode(ctx context.Context, b record.RawData) (record.StructuredData, error) {
+	var out record.StructuredData
+	err := d.serde.Decode(b.Raw, &out)
+	if cerrors.Is(err, sr.ErrNotRegistered) {
+		err = d.findAndRegisterSchema(ctx, b)
+		if err != nil {
+			return nil, err
+		}
+		// retry decoding
+		err = d.serde.Decode(b.Raw, &out)
+	}
+	if err != nil {
+		return nil, cerrors.Errorf("failed to decode raw data: %w", err)
+	}
+
+	return out, nil
+}
+
+// findAndRegisterSchema reads the schema ID from the header of b, fetches the
+// schema with that ID through the client, parses it with the factory
+// registered for its type and registers the encode/decode functions in serde.
+func (d *Decoder) findAndRegisterSchema(ctx context.Context, b record.RawData) error {
+	// Decode only reaches this point after reporting ErrNotRegistered, so the
+	// header is expected to be valid; the error is still checked so that a
+	// broken assumption does not silently look up the wrong schema ID.
+	id, _, err := d.serde.Header().DecodeID(b.Raw)
+	if err != nil {
+		return cerrors.Errorf("failed to decode schema ID: %w", err)
+	}
+	s, err := d.client.SchemaByID(ctx, id)
+	if err != nil {
+		return cerrors.Errorf("failed to get schema: %w", err)
+	}
+	sf, ok := DefaultSchemaFactories[s.Type]
+	if !ok {
+		return cerrors.Errorf("unknown schema type %q (%d)", s.Type.String(), s.Type)
+	}
+	schema, err := sf.Parse(s.Schema)
+	if err != nil {
+		return cerrors.Errorf("failed to parse schema: %w", err)
+	}
+
+	d.serde.Register(
+		id,
+		record.StructuredData{},
+		sr.EncodeFn(encodeFn(schema, sr.SubjectSchema{ID: id})),
+		sr.DecodeFn(decodeFn(schema, sr.SubjectSchema{ID: id})),
+	)
+	return nil
+}
+
+// decodeFn returns a function that unmarshals bytes with schema and wraps any
+// failure in an error naming the ID, subject and version taken from ss.
+func decodeFn(schema Schema, ss sr.SubjectSchema) func(b []byte, a any) error {
+	return func(b []byte, a any) error {
+		err := schema.Unmarshal(b, a)
+		if err != nil {
+			return cerrors.Errorf("failed to unmarshal data with schema (ID: %v, subject: %v, version: %v): %w", ss.ID, ss.Subject, ss.Version, err)
+		}
+		return nil
+	}
+}
diff --git a/pkg/processor/schemaregistry/encoder.go b/pkg/processor/schemaregistry/encoder.go
new file mode 100644
index 000000000..f69a5a66d
--- /dev/null
+++ b/pkg/processor/schemaregistry/encoder.go
@@ -0,0 +1,156 @@
+// Copyright © 2023 Meroxa, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package schemaregistry
+
+import (
+	"context"
+
+	"github.com/conduitio/conduit/pkg/foundation/cerrors"
+	"github.com/conduitio/conduit/pkg/foundation/log"
+	"github.com/conduitio/conduit/pkg/record"
+	"github.com/lovromazgon/franz-go/pkg/sr"
+)
+
+// Encoder encodes structured data into raw data using the schema supplied by
+// the embedded SchemaStrategy.
+type Encoder struct {
+	client *Client
+	serde  *sr.Serde
+	logger log.CtxLogger
+
+	SchemaStrategy
+}
+
+// SchemaStrategy determines the schema that is used to encode a value.
+type SchemaStrategy interface {
+	// GetSchema returns the schema for the supplied structured data together
+	// with the subject schema describing it in the schema registry.
+	GetSchema(context.Context, *Client, log.CtxLogger, record.StructuredData) (Schema, sr.SubjectSchema, error)
+}
+
+// NewEncoder returns an Encoder that uses the supplied client, serde and
+// strategy. The logger is tagged with the component name
+// "schemaregistry.Encoder".
+func NewEncoder(client *Client, logger log.CtxLogger, serde *sr.Serde, strategy SchemaStrategy) *Encoder {
+	return &Encoder{
+		client:         client,
+		serde:          serde,
+		logger:         logger.WithComponent("schemaregistry.Encoder"),
+		SchemaStrategy: strategy,
+	}
+}
+
+// Encode obtains a schema through the strategy and encodes sd with it. If the
+// serde reports that the schema ID is not registered, Encode registers the
+// encode/decode functions for it and then retries encoding once.
+func (e *Encoder) Encode(ctx context.Context, sd record.StructuredData) (record.RawData, error) {
+	s, ss, err := e.GetSchema(ctx, e.client, e.logger, sd)
+	if err != nil {
+		return record.RawData{}, cerrors.Errorf("failed to get schema: %w", err)
+	}
+
+	b, err := e.serde.Encode(sd, sr.ID(ss.ID))
+	if cerrors.Is(err, sr.ErrNotRegistered) {
+		// TODO note that we need to register specific indexes when adding support for protobuf
+		e.serde.Register(
+			ss.ID,
+			record.StructuredData{},
+			sr.EncodeFn(encodeFn(s, ss)),
+			sr.DecodeFn(decodeFn(s, ss)),
+		)
+
+		// try to encode again
+		b, err = e.serde.Encode(sd, sr.ID(ss.ID))
+	}
+	if err != nil {
+		return record.RawData{}, cerrors.Errorf("failed to encode data: %w", err)
+	}
+	return record.RawData{Raw: b}, nil
+}
+
+// ExtractAndUploadSchemaStrategy derives a schema of the configured Type from
+// the data being encoded and creates it through the client under Subject.
+type ExtractAndUploadSchemaStrategy struct {
+	Type    sr.SchemaType
+	Subject string
+}
+
+// GetSchema builds a schema matching sd using the factory registered for
+// str.Type and creates it under str.Subject through the client.
+func (str ExtractAndUploadSchemaStrategy) GetSchema(ctx context.Context, client *Client, _ log.CtxLogger, sd record.StructuredData) (Schema, sr.SubjectSchema, error) {
+	sf, ok := DefaultSchemaFactories[str.Type]
+	if !ok {
+		return nil, sr.SubjectSchema{}, cerrors.Errorf("unknown schema type %q (%d)", str.Type.String(), str.Type)
+	}
+
+	s, err := sf.SchemaForType(sd)
+	if err != nil {
+		return nil, sr.SubjectSchema{}, cerrors.Errorf("could not extract schema: %w", err)
+	}
+
+	ss, err := client.CreateSchema(ctx, str.Subject, sr.Schema{
+		Schema:     s.String(),
+		Type:       str.Type,
+		References: nil,
+	})
+	if err != nil {
+		return nil, sr.SubjectSchema{}, cerrors.Errorf("could not create schema: %w", err)
+	}
+
+	return s, ss, nil
+}
+
+// DownloadSchemaStrategy fetches the schema identified by Subject and Version
+// through the client.
+type DownloadSchemaStrategy struct {
+	Subject string
+	// TODO add support for specifying "latest"
+	Version int
+}
+
+// GetSchema fetches the schema identified by str.Subject and str.Version
+// through the client and parses it with the factory registered for its type.
+func (str DownloadSchemaStrategy) GetSchema(ctx context.Context, client *Client, _ log.CtxLogger, _ record.StructuredData) (Schema, sr.SubjectSchema, error) {
+	// fetch schema from registry
+	ss, err := client.SchemaBySubjectVersion(ctx, str.Subject, str.Version)
+	if err != nil {
+		// Version is an int, so it is formatted with %d; %q would render it as
+		// a quoted character literal.
+		return nil, sr.SubjectSchema{}, cerrors.Errorf("could not fetch schema with subject %q and version %d: %w", str.Subject, str.Version, err)
+	}
+
+	sf, ok := DefaultSchemaFactories[ss.Type]
+	if !ok {
+		return nil, sr.SubjectSchema{}, cerrors.Errorf("unknown schema type %q (%d)", ss.Type.String(), ss.Type)
+	}
+
+	s, err := sf.Parse(ss.Schema.Schema)
+	if err != nil {
+		return nil, sr.SubjectSchema{}, cerrors.Errorf("could not parse schema: %w", err)
+	}
+	return s, ss, nil
+}
+
+// encodeFn returns a function that marshals a value with schema and wraps any
+// failure in an error naming the ID, subject and version taken from ss.
+func encodeFn(schema Schema, ss sr.SubjectSchema) func(v any) ([]byte, error) {
+	return func(v any) ([]byte, error) {
+		b, err := schema.Marshal(v)
+		if err != nil {
+			return nil, cerrors.Errorf("failed to marshal data with schema (ID: %v, subject: %v, version: %v): %w", ss.ID, ss.Subject, ss.Version, err)
+		}
+		return b, nil
+	}
+}
diff --git a/pkg/processor/schemaregistry/encoder_test.go b/pkg/processor/schemaregistry/encoder_test.go
new file mode 100644
index 000000000..9200a0d19
--- /dev/null
+++ b/pkg/processor/schemaregistry/encoder_test.go
@@ -0,0 +1,129 @@
+// Copyright © 2023 Meroxa, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package schemaregistry
+
+import (
+	"context"
+	"testing"
+
+	"github.com/conduitio/conduit/pkg/foundation/log"
+	"github.com/conduitio/conduit/pkg/record"
+	"github.com/lovromazgon/franz-go/pkg/sr"
+	"github.com/matryer/is"
+)
+
+func TestEncodeDecode_ExtractAndUploadSchemaStrategy(t *testing.T) {
+	is := is.New(t)
+	ctx := context.Background()
+	logger := log.Nop()
+
+	var serde sr.Serde
+	client, err := NewClient(logger, sr.URLs(testSchemaRegistryURL(t)))
+	is.NoErr(err)
+
+	have := record.StructuredData{
+		"myString": "bar",
+		"myInt":    1,
+		"myFloat":  2.3,
+		"myMap": map[string]any{
+			"foo": true,
+			"bar": 2.2,
+		},
+		"myStruct": record.StructuredData{
+			"foo": 1,
+			"bar": false,
+		},
+		"mySlice": []int{1, 2, 3},
+	}
+	want := record.StructuredData{
+		"myString": "bar",
+		"myInt":    1,
+		"myFloat":  2.3,
+		"myMap": map[string]any{
+			"foo": true,
+			"bar": 2.2,
+		},
+		"myStruct": map[string]any{ // records are unmarshaled into a map
+			"foo": 1,
+			"bar": false,
+		},
+		"mySlice": []any{1, 2, 3}, // slice without type
+	}
+
+	for schemaType := range DefaultSchemaFactories {
+		t.Run(schemaType.String(), func(t *testing.T) {
+			is := is.New(t)
+			enc := NewEncoder(client, logger, &serde, ExtractAndUploadSchemaStrategy{
+				Type:    schemaType,
+				Subject: "test1" + schemaType.String(),
+			})
+			dec := NewDecoder(client, logger, &serde)
+
+			bytes, err := enc.Encode(ctx, have)
+			is.NoErr(err)
+
+			got, err := dec.Decode(ctx, bytes)
+			is.NoErr(err)
+
+			is.Equal(want, got)
+		})
+	}
+}
+
+func TestEncodeDecode_DownloadStrategy_Avro(t *testing.T) {
+	is := is.New(t)
+	ctx := context.Background()
+	logger := log.Nop()
+
+	var serde sr.Serde
+	client, err := NewClient(logger, sr.URLs(testSchemaRegistryURL(t)))
+	is.NoErr(err)
+
+	have := record.StructuredData{
+		"myString": "bar",
+		"myInt":    1,
+	}
+	want := record.StructuredData{
+		"myString": "bar",
+		"myInt":    1,
+	}
+	ss, err := client.CreateSchema(ctx, "test2", sr.Schema{
+		Type: sr.TypeAvro,
+		Schema: `
+{
+  "type":"record",
+  "name":"record",
+  "fields":[
+    {"name":"myString","type":"string"},
+    {"name":"myInt","type":"int"}
+  ]
+}`,
+	})
+	is.NoErr(err)
+
+	enc := NewEncoder(client, logger, &serde, DownloadSchemaStrategy{
+		Subject: ss.Subject,
+		Version: ss.Version,
+	})
+	dec := NewDecoder(client, logger, &serde)
+
+	bytes, err := enc.Encode(ctx, have)
+	is.NoErr(err)
+
+	got, err := dec.Decode(ctx, bytes)
+	is.NoErr(err)
+
+	is.Equal(want, got)
+}
diff --git a/pkg/processor/schemaregistry/schema.go b/pkg/processor/schemaregistry/schema.go
new file mode 100644
index 000000000..69b276d86
--- /dev/null
+++ b/pkg/processor/schemaregistry/schema.go
@@ -0,0 +1,64 @@
+// Copyright © 2023 Meroxa, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package schemaregistry
+
+import (
+	"github.com/conduitio/conduit/pkg/processor/schemaregistry/avro"
+	"github.com/lovromazgon/franz-go/pkg/sr"
+)
+
+// Schema is a parsed schema that can marshal and unmarshal values.
+type Schema interface {
+	// Marshal returns the encoded representation of v.
+	Marshal(v any) ([]byte, error)
+	// Unmarshal parses encoded data and stores the result in the value pointed
+	// to by v. If v is nil or not a pointer, Unmarshal returns an error.
+	Unmarshal(b []byte, v any) error
+	// String returns the textual representation of the schema.
+	String() string
+}
+
+// SchemaFactory bundles the functions needed to obtain a Schema of one schema
+// type.
+type SchemaFactory struct {
+	// Parse takes the textual representation of the schema and parses it into
+	// a Schema.
+	Parse func(string) (Schema, error)
+	// SchemaForType returns a Schema that matches the structure of v.
+	SchemaForType func(v any) (Schema, error)
+}
+
+// DefaultSchemaFactories maps each supported schema type to its factory.
+//
+// The wrappers return a literal nil Schema on failure so that callers never
+// receive a non-nil interface wrapping a nil value alongside an error.
+var DefaultSchemaFactories = map[sr.SchemaType]SchemaFactory{
+	avro.Type: {
+		Parse: func(s string) (Schema, error) {
+			schema, err := avro.Parse(s)
+			if err != nil {
+				return nil, err
+			}
+			return schema, nil
+		},
+		SchemaForType: func(v any) (Schema, error) {
+			schema, err := avro.SchemaForType(v)
+			if err != nil {
+				return nil, err
+			}
+			return schema, nil
+		},
+	},
+}