Skip to content

Commit

Permalink
implement schema registry encoder and decoder
Browse files Browse the repository at this point in the history
  • Loading branch information
lovromazgon committed Jun 30, 2023
1 parent 9e0fcbd commit 43feb98
Show file tree
Hide file tree
Showing 4 changed files with 397 additions and 0 deletions.
90 changes: 90 additions & 0 deletions pkg/processor/schemaregistry/decoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright © 2023 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package schemaregistry

import (
"context"

"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/record"
"github.com/lovromazgon/franz-go/pkg/sr"
)

type Decoder struct {
client *Client
serde *sr.Serde
logger log.CtxLogger
}

func NewDecoder(client *Client, logger log.CtxLogger, serde *sr.Serde) *Decoder {
return &Decoder{
client: client,
serde: serde,
logger: logger.WithComponent("schemaregistry.Decoder"),
}
}

func (d *Decoder) Decode(ctx context.Context, b record.RawData) (record.StructuredData, error) {
var out record.StructuredData
err := d.serde.Decode(b.Raw, &out)
if cerrors.Is(err, sr.ErrNotRegistered) {
err = d.findAndRegisterSchema(ctx, b)
if err != nil {
return nil, err
}
// retry decoding
err = d.serde.Decode(b.Raw, &out)
}
if err != nil {
return nil, cerrors.Errorf("failed to decode raw data: %w", err)
}

return out, nil
}

func (d *Decoder) findAndRegisterSchema(ctx context.Context, b record.RawData) error {
id, _, _ := d.serde.Header().DecodeID(b.Raw) // we know this won't throw an error since Decode didn't return ErrBadHeader
s, err := d.client.SchemaByID(ctx, id)
if err != nil {
return cerrors.Errorf("failed to get schema: %w", err)
}
sf, ok := DefaultSchemaFactories[s.Type]
if !ok {
return cerrors.Errorf("unknown schema type %q (%d)", s.Type.String(), s.Type)
}
schema, err := sf.Parse(s.Schema)
if err != nil {
return cerrors.Errorf("failed to parse schema: %w", err)
}

d.serde.Register(
id,
record.StructuredData{},
sr.EncodeFn(encodeFn(schema, sr.SubjectSchema{ID: id})),
sr.DecodeFn(decodeFn(schema, sr.SubjectSchema{ID: id})),
)
return nil
}

func decodeFn(schema Schema, ss sr.SubjectSchema) func(b []byte, a any) error {
return func(b []byte, a any) error {
err := schema.Unmarshal(b, a)
if err != nil {
return cerrors.Errorf("failed to unmarshal data with schema (ID: %v, subject: %v, version: %v): %w", ss.ID, ss.Subject, ss.Version, err)
}
return nil
}
}
133 changes: 133 additions & 0 deletions pkg/processor/schemaregistry/encoder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Copyright © 2023 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package schemaregistry

import (
"context"

"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/record"
"github.com/lovromazgon/franz-go/pkg/sr"
)

type Encoder struct {
client *Client
serde *sr.Serde
logger log.CtxLogger

SchemaStrategy
}

type SchemaStrategy interface {
GetSchema(context.Context, *Client, log.CtxLogger, record.StructuredData) (Schema, sr.SubjectSchema, error)
}

func NewEncoder(client *Client, logger log.CtxLogger, serde *sr.Serde, strategy SchemaStrategy) *Encoder {
return &Encoder{
client: client,
serde: serde,
logger: logger.WithComponent("schemaregistry.Encoder"),
SchemaStrategy: strategy,
}
}

func (e *Encoder) Encode(ctx context.Context, sd record.StructuredData) (record.RawData, error) {
s, ss, err := e.GetSchema(ctx, e.client, e.logger, sd)
if err != nil {
return record.RawData{}, cerrors.Errorf("failed to get schema: %w", err)
}

b, err := e.serde.Encode(sd, sr.ID(ss.ID))
if cerrors.Is(err, sr.ErrNotRegistered) {
// TODO note that we need to register specific indexes when adding support for protobuf
e.serde.Register(
ss.ID,
record.StructuredData{},
sr.EncodeFn(encodeFn(s, ss)),
sr.DecodeFn(decodeFn(s, ss)),
)

// try to encode again
b, err = e.serde.Encode(sd, sr.ID(ss.ID))
}
if err != nil {
return record.RawData{}, cerrors.Errorf("failed to encode data: %w", err)
}
return record.RawData{Raw: b}, nil
}

type ExtractAndUploadSchemaStrategy struct {
Type sr.SchemaType
Subject string
}

func (str ExtractAndUploadSchemaStrategy) GetSchema(ctx context.Context, client *Client, _ log.CtxLogger, sd record.StructuredData) (Schema, sr.SubjectSchema, error) {
sf, ok := DefaultSchemaFactories[str.Type]
if !ok {
return nil, sr.SubjectSchema{}, cerrors.Errorf("unknown schema type %q (%d)", str.Type.String(), str.Type)
}

s, err := sf.SchemaForType(sd)
if err != nil {
return nil, sr.SubjectSchema{}, cerrors.Errorf("could not extract avro schema: %w", err)
}

ss, err := client.CreateSchema(ctx, str.Subject, sr.Schema{
Schema: s.String(),
Type: str.Type,
References: nil,
})
if err != nil {
return nil, sr.SubjectSchema{}, cerrors.Errorf("could not create schema: %w", err)
}

return s, ss, nil
}

type DownloadSchemaStrategy struct {
Subject string
// TODO add support for specifying "latest"
Version int
}

func (str DownloadSchemaStrategy) GetSchema(ctx context.Context, client *Client, _ log.CtxLogger, _ record.StructuredData) (Schema, sr.SubjectSchema, error) {
// fetch schema from registry
ss, err := client.SchemaBySubjectVersion(ctx, str.Subject, str.Version)
if err != nil {
return nil, sr.SubjectSchema{}, cerrors.Errorf("could not fetch schema with subject %q and version %q: %w", str.Subject, str.Version, err)
}

sf, ok := DefaultSchemaFactories[ss.Type]
if !ok {
return nil, sr.SubjectSchema{}, cerrors.Errorf("unknown schema type %q (%d)", ss.Type.String(), ss.Type)
}

s, err := sf.Parse(ss.Schema.Schema)
if err != nil {
return nil, sr.SubjectSchema{}, err
}
return s, ss, nil
}

func encodeFn(schema Schema, ss sr.SubjectSchema) func(v any) ([]byte, error) {
return func(v any) ([]byte, error) {
b, err := schema.Marshal(v)
if err != nil {
return nil, cerrors.Errorf("failed to marshal data with schema (ID: %v, subject: %v, version: %v): %w", ss.ID, ss.Subject, ss.Version, err)
}
return b, nil
}
}
129 changes: 129 additions & 0 deletions pkg/processor/schemaregistry/encoder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
// Copyright © 2023 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package schemaregistry

import (
"context"
"testing"

"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/record"
"github.com/lovromazgon/franz-go/pkg/sr"
"github.com/matryer/is"
)

func TestEncodeDecode_ExtractAndUploadSchemaStrategy(t *testing.T) {
is := is.New(t)
ctx := context.Background()
logger := log.Nop()

var serde sr.Serde
client, err := NewClient(logger, sr.URLs(testSchemaRegistryURL(t)))
is.NoErr(err)

have := record.StructuredData{
"myString": "bar",
"myInt": 1,
"myFloat": 2.3,
"myMap": map[string]any{
"foo": true,
"bar": 2.2,
},
"myStruct": record.StructuredData{
"foo": 1,
"bar": false,
},
"mySlice": []int{1, 2, 3},
}
want := record.StructuredData{
"myString": "bar",
"myInt": 1,
"myFloat": 2.3,
"myMap": map[string]any{
"foo": true,
"bar": 2.2,
},
"myStruct": map[string]any{ // records are unmarshaled into a map
"foo": 1,
"bar": false,
},
"mySlice": []any{1, 2, 3}, // slice without type
}

for schemaType := range DefaultSchemaFactories {
t.Run(schemaType.String(), func(t *testing.T) {
is := is.New(t)
enc := NewEncoder(client, logger, &serde, ExtractAndUploadSchemaStrategy{
Type: schemaType,
Subject: "test1" + schemaType.String(),
})
dec := NewDecoder(client, logger, &serde)

bytes, err := enc.Encode(ctx, have)
is.NoErr(err)

got, err := dec.Decode(ctx, bytes)
is.NoErr(err)

is.Equal(want, got)
})
}
}

func TestEncodeDecode_DownloadStrategy_Avro(t *testing.T) {
is := is.New(t)
ctx := context.Background()
logger := log.Nop()

var serde sr.Serde
client, err := NewClient(logger, sr.URLs(testSchemaRegistryURL(t)))
is.NoErr(err)

have := record.StructuredData{
"myString": "bar",
"myInt": 1,
}
want := record.StructuredData{
"myString": "bar",
"myInt": 1,
}
ss, err := client.CreateSchema(ctx, "test2", sr.Schema{
Type: sr.TypeAvro,
Schema: `
{
"type":"record",
"name":"record",
"fields":[
{"name":"myString","type":"string"},
{"name":"myInt","type":"int"}
]
}`,
})
is.NoErr(err)

enc := NewEncoder(client, logger, &serde, DownloadSchemaStrategy{
Subject: ss.Subject,
Version: ss.Version,
})
dec := NewDecoder(client, logger, &serde)

bytes, err := enc.Encode(ctx, have)
is.NoErr(err)

got, err := dec.Decode(ctx, bytes)
is.NoErr(err)

is.Equal(want, got)
}
Loading

0 comments on commit 43feb98

Please sign in to comment.