Skip to content

Commit

Permalink
feat(pubsub): add support for schema revisions (#7295)
Browse files Browse the repository at this point in the history
* feat(pubsub): add support for schema revisions

* update comments

* make ListSchemaRevisions in fake more efficient

* fix failing tests related to revisionID being populated

* fix conversion of SchemaSettings to proto

* support GetSchema with revision id

* support editing schemas in topic update

* add tests for updating topic with schema
  • Loading branch information
hongalex authored Jan 26, 2023
1 parent d1db4b2 commit 369b16f
Show file tree
Hide file tree
Showing 6 changed files with 413 additions and 45 deletions.
49 changes: 49 additions & 0 deletions pubsub/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"cloud.google.com/go/kms/apiv1/kmspb"
pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
testutil2 "cloud.google.com/go/pubsub/internal/testutil"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
gax "github.com/googleapis/gax-go/v2"
"golang.org/x/oauth2/google"
Expand Down Expand Up @@ -1956,6 +1957,8 @@ func TestIntegration_TopicRetention(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer topic.Delete(ctx)
defer topic.Stop()

cfg, err := topic.Config(ctx)
if err != nil {
Expand Down Expand Up @@ -2011,3 +2014,49 @@ func TestExactlyOnceDelivery_PublishReceive(t *testing.T) {
// Tests for large messages (larger than the 4MB gRPC limit).
testPublishAndReceive(t, client, 0, false, true, 1, 5*1024*1024)
}

func TestIntegration_TopicUpdateSchema(t *testing.T) {
ctx := context.Background()
// TODO(hongalex): update these staging endpoints after schema evolution is GA.
c := integrationTestClient(ctx, t, option.WithEndpoint("staging-pubsub.sandbox.googleapis.com:443"))
defer c.Close()

sc := integrationTestSchemaClient(ctx, t, option.WithEndpoint("staging-pubsub.sandbox.googleapis.com:443"))
defer sc.Close()

schemaContent, err := ioutil.ReadFile("testdata/schema/us-states.avsc")
if err != nil {
t.Fatal(err)
}

schemaID := schemaIDs.New()
schemaCfg, err := sc.CreateSchema(ctx, schemaID, SchemaConfig{
Type: SchemaAvro,
Definition: string(schemaContent),
})
if err != nil {
t.Fatal(err)
}
defer sc.DeleteSchema(ctx, schemaID)

topic, err := c.CreateTopic(ctx, topicIDs.New())
if err != nil {
t.Fatal(err)
}
defer topic.Delete(ctx)
defer topic.Stop()

schema := &SchemaSettings{
Schema: schemaCfg.Name,
Encoding: EncodingJSON,
}
cfg, err := topic.Update(ctx, TopicConfigToUpdate{
SchemaSettings: schema,
})
if err != nil {
t.Fatal(err)
}
if diff := cmp.Diff(cfg.SchemaSettings, schema); diff != "" {
t.Fatalf("schema settings for update -want, +got: %v", diff)
}
}
147 changes: 133 additions & 14 deletions pubsub/pstest/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"context"
"fmt"
"io"
"math/rand"
"path"
"sort"
"strings"
Expand Down Expand Up @@ -90,9 +91,9 @@ type Server struct {
// GServer is the underlying service implementor. It is not intended to be used
// directly.
type GServer struct {
pb.PublisherServer
pb.SubscriberServer
pb.SchemaServiceServer
pb.UnimplementedPublisherServer
pb.UnimplementedSubscriberServer
pb.UnimplementedSchemaServiceServer

mu sync.Mutex
topics map[string]*topic
Expand All @@ -104,7 +105,9 @@ type GServer struct {
streamTimeout time.Duration
timeNowFunc func() time.Time
reactorOptions ReactorOptions
schemas map[string]*pb.Schema
// schemas is a map of schemaIDs to a slice of schema revisions.
// the last element in the slice is the most recent schema.
schemas map[string][]*pb.Schema

// PublishResponses is a channel of responses to use for Publish.
publishResponses chan *publishResponse
Expand Down Expand Up @@ -140,7 +143,7 @@ func NewServerWithPort(port int, opts ...ServerReactorOption) *Server {
reactorOptions: reactorOptions,
publishResponses: make(chan *publishResponse, 100),
autoPublishResponse: true,
schemas: map[string]*pb.Schema{},
schemas: map[string][]*pb.Schema{},
},
}
pb.RegisterPublisherServer(srv.Gsrv, &s.GServer)
Expand Down Expand Up @@ -365,6 +368,8 @@ func (s *GServer) UpdateTopic(_ context.Context, req *pb.UpdateTopicRequest) (*p
return nil, err
}
t.proto.MessageRetentionDuration = req.Topic.MessageRetentionDuration
case "schema_settings":
t.proto.SchemaSettings = req.Topic.SchemaSettings
default:
return nil, status.Errorf(codes.InvalidArgument, "unknown field name %q", path)
}
Expand Down Expand Up @@ -1381,6 +1386,16 @@ func WithErrorInjection(funcName string, code codes.Code, msg string) ServerReac
}
}

const letters = "abcdef1234567890"

func genRevID() string {
id := make([]byte, 8)
for i := range id {
id[i] = letters[rand.Intn(len(letters))]
}
return string(id)
}

func (s *GServer) CreateSchema(_ context.Context, req *pb.CreateSchemaRequest) (*pb.Schema, error) {
s.mu.Lock()
defer s.mu.Unlock()
Expand All @@ -1391,29 +1406,52 @@ func (s *GServer) CreateSchema(_ context.Context, req *pb.CreateSchemaRequest) (

name := fmt.Sprintf("%s/schemas/%s", req.Parent, req.SchemaId)
sc := &pb.Schema{
Name: name,
Type: req.Schema.Type,
Definition: req.Schema.Definition,
Name: name,
Type: req.Schema.Type,
Definition: req.Schema.Definition,
RevisionId: genRevID(),
RevisionCreateTime: timestamppb.Now(),
}
s.schemas[name] = sc
s.schemas[name] = append(s.schemas[name], sc)

return sc, nil
}

func (s *GServer) GetSchema(_ context.Context, req *pb.GetSchemaRequest) (*pb.Schema, error) {

s.mu.Lock()
defer s.mu.Unlock()

if handled, ret, err := s.runReactor(req, "GetSchema", &pb.Schema{}); handled || err != nil {
return ret.(*pb.Schema), err
}

sc, ok := s.schemas[req.Name]
ss := strings.Split(req.Name, "@")
var schemaName, revisionID string
if len := len(ss); len == 1 {
schemaName = ss[0]
} else if len == 2 {
schemaName = ss[0]
revisionID = ss[1]
} else {
return nil, status.Errorf(codes.InvalidArgument, "schema(%q) name parse error", req.Name)
}

schemaRev, ok := s.schemas[schemaName]
if !ok {
return nil, status.Errorf(codes.NotFound, "schema(%q) not found", req.Name)
}
return sc, nil

if revisionID == "" {
return schemaRev[len(schemaRev)-1], nil
}

for _, sc := range schemaRev {
if sc.RevisionId == revisionID {
return sc, nil
}
}

return nil, status.Errorf(codes.NotFound, "schema %q not found", req.Name)
}

func (s *GServer) ListSchemas(_ context.Context, req *pb.ListSchemasRequest) (*pb.ListSchemasResponse, error) {
Expand All @@ -1425,13 +1463,93 @@ func (s *GServer) ListSchemas(_ context.Context, req *pb.ListSchemasRequest) (*p
}
ss := make([]*pb.Schema, 0)
for _, sc := range s.schemas {
ss = append(ss, sc)
ss = append(ss, sc[len(sc)-1])
}
return &pb.ListSchemasResponse{
Schemas: ss,
}, nil
}

func (s *GServer) ListSchemaRevisions(_ context.Context, req *pb.ListSchemaRevisionsRequest) (*pb.ListSchemaRevisionsResponse, error) {
s.mu.Lock()
defer s.mu.Unlock()

if handled, ret, err := s.runReactor(req, "ListSchemaRevisions", &pb.ListSchemasResponse{}); handled || err != nil {
return ret.(*pb.ListSchemaRevisionsResponse), err
}
ss := make([]*pb.Schema, 0)
ss = append(ss, s.schemas[req.Name]...)
return &pb.ListSchemaRevisionsResponse{
Schemas: ss,
}, nil
}

func (s *GServer) CommitSchema(_ context.Context, req *pb.CommitSchemaRequest) (*pb.Schema, error) {
s.mu.Lock()
defer s.mu.Unlock()

if handled, ret, err := s.runReactor(req, "CommitSchema", &pb.Schema{}); handled || err != nil {
return ret.(*pb.Schema), err
}

sc := &pb.Schema{
Name: req.Name,
Type: req.Schema.Type,
Definition: req.Schema.Definition,
}
sc.RevisionId = genRevID()
sc.RevisionCreateTime = timestamppb.Now()

s.schemas[req.Name] = append(s.schemas[req.Name], sc)

return sc, nil
}

// RollbackSchema rolls back the current schema to a previous revision by copying and creating a new revision.
func (s *GServer) RollbackSchema(_ context.Context, req *pb.RollbackSchemaRequest) (*pb.Schema, error) {
s.mu.Lock()
defer s.mu.Unlock()

if handled, ret, err := s.runReactor(req, "RollbackSchema", &pb.Schema{}); handled || err != nil {
return ret.(*pb.Schema), err
}

for _, sc := range s.schemas[req.Name] {
if sc.RevisionId == req.RevisionId {
newSchema := *sc
newSchema.RevisionId = genRevID()
newSchema.RevisionCreateTime = timestamppb.Now()
s.schemas[req.Name] = append(s.schemas[req.Name], &newSchema)
return &newSchema, nil
}
}
return nil, status.Errorf(codes.NotFound, "schema %q@%q not found", req.Name, req.RevisionId)
}

func (s *GServer) DeleteSchemaRevision(_ context.Context, req *pb.DeleteSchemaRevisionRequest) (*pb.Schema, error) {
s.mu.Lock()
defer s.mu.Unlock()

if handled, ret, err := s.runReactor(req, "DeleteSchemaRevision", &pb.Schema{}); handled || err != nil {
return ret.(*pb.Schema), err
}

if sc, ok := s.schemas[req.Name]; ok {
if len(sc) == 1 {
return nil, status.Errorf(codes.InvalidArgument, "cannot delete last revision for schema %q@%q", req.Name, req.RevisionId)
}
}

schema := s.schemas[req.Name]
for i, sc := range schema {
if sc.RevisionId == req.RevisionId {
s.schemas[req.Name] = append(schema[:i], schema[i+1:]...)
return schema[len(schema)-1], nil
}
}
return nil, status.Errorf(codes.NotFound, "schema %q@%q not found", req.Name, req.RevisionId)
}

func (s *GServer) DeleteSchema(_ context.Context, req *pb.DeleteSchemaRequest) (*emptypb.Empty, error) {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down Expand Up @@ -1480,7 +1598,8 @@ func (s *GServer) ValidateMessage(_ context.Context, req *pb.ValidateMessageRequ
if !ok {
return nil, status.Errorf(codes.NotFound, "schema(%q) not found", valReq.Name)
}
if sc.Definition == "" {
schema := sc[len(sc)-1]
if schema.Definition == "" {
return nil, status.Error(codes.InvalidArgument, "schema definition cannot be empty")
}
}
Expand Down
Loading

0 comments on commit 369b16f

Please sign in to comment.