Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SR: Implement cached schema registry client #1049

Merged
merged 9 commits into from
Jun 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ test:

test-integration:
# run required docker containers, execute integration tests, stop containers after tests
docker compose -f test/docker-compose-postgres.yml up --quiet-pull -d --wait
docker compose -f test/docker-compose-postgres.yml -f test/docker-compose-schemaregistry.yml up --quiet-pull -d --wait
go test $(GOTEST_FLAGS) -race --tags=integration ./...; ret=$$?; \
docker compose -f test/docker-compose-postgres.yml down; \
docker compose -f test/docker-compose-postgres.yml -f test/docker-compose-schemaregistry.yml down; \
exit $$ret

build-server: check-go-version
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ require (
github.com/jackc/pgx/v5 v5.4.0
github.com/jinzhu/copier v0.3.5
github.com/jpillora/backoff v1.0.0
github.com/lovromazgon/franz-go/pkg/sr v0.0.0-20230605121418-82e53767f0ac
github.com/matryer/is v1.4.1
github.com/piotrkowalczuk/promgrpc/v4 v4.1.0
github.com/prometheus/client_golang v1.16.0
github.com/prometheus/client_model v0.4.0
github.com/prometheus/common v0.44.0
github.com/rs/zerolog v1.29.1
github.com/twmb/go-cache v1.0.0
go.uber.org/goleak v1.2.1
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1
golang.org/x/tools v0.10.0
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,8 @@ github.com/lib/pq v1.1.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.10.2/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/lib/pq v1.10.4 h1:SO9z7FRPzA03QhHKJrH5BXA6HU1rS4V2nIVrrNC1iYk=
github.com/lovromazgon/franz-go/pkg/sr v0.0.0-20230605121418-82e53767f0ac h1:f0RCTaThW3/D5xByrGxfvR3o95UZsrkXFVkKSY+s89w=
github.com/lovromazgon/franz-go/pkg/sr v0.0.0-20230605121418-82e53767f0ac/go.mod h1:iz9EnaFViALD6sVqxYHs8BPC0ZEQtfhTpN7SG5b0Nqo=
github.com/matryer/is v1.4.1 h1:55ehd8zaGABKLXQUe2awZ99BD/PTc2ls+KV/dXphgEQ=
github.com/matryer/is v1.4.1/go.mod h1:8I/i5uYgLzgsgEloJE1U6xx5HkBQpAZvepWuujKwMRU=
github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ=
Expand Down Expand Up @@ -627,6 +629,8 @@ github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/tetratelabs/wazero v1.2.0 h1:I/8LMf4YkCZ3r2XaL9whhA0VMyAvF6QE+O7rco0DCeQ=
github.com/tetratelabs/wazero v1.2.0/go.mod h1:wYx2gNRg8/WihJfSDxA1TIL8H+GkfLYm+bIfbblu9VQ=
github.com/twmb/go-cache v1.0.0 h1:X7tM+xZHNxYxxLZ5jW6spoDbQRxvZ8rgHI6k9G3tynE=
github.com/twmb/go-cache v1.0.0/go.mod h1:1mAga/hGP9Lhki1NCnW3H8qjnaDrYxmFafW6tnakfAw=
github.com/urfave/cli v1.22.12/go.mod h1:sSBEIC79qR6OvcmsD4U3KABeOTxDqQtdDnaFuUN30b8=
github.com/vbatts/tar-split v0.11.3 h1:hLFqsOLQ1SsppQNTMpkpPXClLDfC2A3Zgy9OUU+RVck=
github.com/vbatts/tar-split v0.11.3/go.mod h1:9QlHN18E+fEH7RdG+QAJJcuya3rqT7eXSTY7wGrAokY=
Expand Down
108 changes: 108 additions & 0 deletions pkg/processor/schemaregistry/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright © 2023 Meroxa, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package schemaregistry

import (
"context"

"github.com/conduitio/conduit/pkg/foundation/cerrors"
"github.com/conduitio/conduit/pkg/foundation/log"
"github.com/conduitio/conduit/pkg/processor/schemaregistry/internal"
"github.com/lovromazgon/franz-go/pkg/sr"
)

// Client is a schema registry client that caches schemas. It is safe for
// concurrent use.
type Client struct {
logger log.CtxLogger
client sr.Client

cache internal.SchemaCache
}

// NewClient creates a new client using the provided logger and schema registry
// client options.
func NewClient(logger log.CtxLogger, opts ...sr.Opt) (*Client, error) {
defaultOpts := []sr.Opt{
sr.UserAgent("conduit"),
sr.URLs(), // disable default URL
}

client, err := sr.NewClient(append(defaultOpts, opts...)...)
if err != nil {
return nil, err
}

return &Client{
logger: logger,
client: *client,
}, nil
}

// CreateSchema checks if the schema is already registered in the cache and
// returns the associated sr.SubjectSchema if it is found. Otherwise, the schema
// is sent to the schema registry and stored in the cache, if the registration
// was successful.
func (c *Client) CreateSchema(ctx context.Context, subject string, schema sr.Schema) (sr.SubjectSchema, error) {
logEvent := c.logger.Trace(ctx).Str("operation", "CreateSchema").Str("subject", subject)
ss, err := c.cache.GetBySubjectText(subject, schema.Schema, func() (sr.SubjectSchema, error) {
logEvent.Msg("schema cache miss")
logEvent = nil // disable output for hit
return c.client.CreateSchema(ctx, subject, schema)
})
if err != nil {
return sr.SubjectSchema{}, cerrors.Errorf("failed to create schema with subject %q: %w", subject, err)
}
logEvent.Msg("schema cache hit")
return ss, nil
}

// SchemaByID checks if the schema is already registered in the cache and
// returns the associated sr.Schema if it is found. Otherwise, the schema is
// retrieved from the schema registry and stored in the cache.
// Note that the returned schema does not contain a subject and version, so the
// cache will not have an effect on methods that return a sr.SubjectSchema.
func (c *Client) SchemaByID(ctx context.Context, id int) (sr.Schema, error) {
logEvent := c.logger.Trace(ctx).Str("operation", "SchemaByID").Int("id", id)
s, err := c.cache.GetByID(id, func() (sr.Schema, error) {
logEvent.Msg("schema cache miss")
logEvent = nil // disable output for hit
return c.client.SchemaByID(ctx, id)
})
if err != nil {
return sr.Schema{}, cerrors.Errorf("failed to get schema with ID %q: %w", id, err)
}
logEvent.Msg("schema cache hit")
return s, nil
}

// SchemaBySubjectVersion checks if the schema is already registered in the
// cache and returns the associated sr.SubjectSchema if it is found. Otherwise,
// the schema is retrieved from the schema registry and stored in the cache.
func (c *Client) SchemaBySubjectVersion(ctx context.Context, subject string, version int) (sr.SubjectSchema, error) {
// TODO handle latest version separately, let caller define timeout after
// which the latest cached version should be downloaded again from upstream
logEvent := c.logger.Trace(ctx).Str("operation", "SchemaBySubjectVersion").Str("subject", subject).Int("version", version)
ss, err := c.cache.GetBySubjectVersion(subject, version, func() (sr.SubjectSchema, error) {
logEvent.Msg("schema cache miss")
logEvent = nil // disable output for hit
return c.client.SchemaByVersion(ctx, subject, version, sr.HideDeleted)
})
if err != nil {
return sr.SubjectSchema{}, cerrors.Errorf("failed to get schema with subject %q and version %q: %w", subject, version, err)
}
logEvent.Msg("schema cache hit")
return ss, nil
}
Loading