Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsub): add support for schema revisions #7295

Merged
merged 11 commits into from
Jan 26, 2023
49 changes: 49 additions & 0 deletions pubsub/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"cloud.google.com/go/kms/apiv1/kmspb"
pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
testutil2 "cloud.google.com/go/pubsub/internal/testutil"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
gax "github.com/googleapis/gax-go/v2"
"golang.org/x/oauth2/google"
Expand Down Expand Up @@ -1956,6 +1957,8 @@ func TestIntegration_TopicRetention(t *testing.T) {
if err != nil {
t.Fatal(err)
}
defer topic.Delete(ctx)
defer topic.Stop()

cfg, err := topic.Config(ctx)
if err != nil {
Expand Down Expand Up @@ -2011,3 +2014,49 @@ func TestExactlyOnceDelivery_PublishReceive(t *testing.T) {
// Tests for large messages (larger than the 4MB gRPC limit).
testPublishAndReceive(t, client, 0, false, true, 1, 5*1024*1024)
}

func TestIntegration_TopicUpdateSchema(t *testing.T) {
ctx := context.Background()
// TODO(hongalex): update these staging endpoints after schema evolution is GA.
c := integrationTestClient(ctx, t, option.WithEndpoint("staging-pubsub.sandbox.googleapis.com:443"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's just not even merge until we can remove this. That should hopefully be next week.

defer c.Close()

sc := integrationTestSchemaClient(ctx, t, option.WithEndpoint("staging-pubsub.sandbox.googleapis.com:443"))
defer sc.Close()

schemaContent, err := ioutil.ReadFile("testdata/schema/us-states.avsc")
if err != nil {
t.Fatal(err)
}

schemaID := schemaIDs.New()
schemaCfg, err := sc.CreateSchema(ctx, schemaID, SchemaConfig{
Type: SchemaAvro,
Definition: string(schemaContent),
})
if err != nil {
t.Fatal(err)
}
defer sc.DeleteSchema(ctx, schemaID)

topic, err := c.CreateTopic(ctx, topicIDs.New())
if err != nil {
t.Fatal(err)
}
defer topic.Delete(ctx)
defer topic.Stop()

schema := &SchemaSettings{
Schema: schemaCfg.Name,
Encoding: EncodingJSON,
}
cfg, err := topic.Update(ctx, TopicConfigToUpdate{
SchemaSettings: schema,
})
if err != nil {
t.Fatal(err)
}
if diff := cmp.Diff(cfg.SchemaSettings, schema); diff != "" {
t.Fatalf("schema settings for update -want, +got: %v", diff)
}
}
147 changes: 133 additions & 14 deletions pubsub/pstest/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"context"
"fmt"
"io"
"math/rand"
"path"
"sort"
"strings"
Expand Down Expand Up @@ -90,9 +91,9 @@ type Server struct {
// GServer is the underlying service implementor. It is not intended to be used
// directly.
type GServer struct {
pb.PublisherServer
pb.SubscriberServer
pb.SchemaServiceServer
pb.UnimplementedPublisherServer
pb.UnimplementedSubscriberServer
pb.UnimplementedSchemaServiceServer

mu sync.Mutex
topics map[string]*topic
Expand All @@ -104,7 +105,9 @@ type GServer struct {
streamTimeout time.Duration
timeNowFunc func() time.Time
reactorOptions ReactorOptions
schemas map[string]*pb.Schema
// schemas maps a full schema name to a slice of that schema's revisions.
// The last element in the slice is the most recent revision.
schemas map[string][]*pb.Schema

// PublishResponses is a channel of responses to use for Publish.
publishResponses chan *publishResponse
Expand Down Expand Up @@ -140,7 +143,7 @@ func NewServerWithPort(port int, opts ...ServerReactorOption) *Server {
reactorOptions: reactorOptions,
publishResponses: make(chan *publishResponse, 100),
autoPublishResponse: true,
schemas: map[string]*pb.Schema{},
schemas: map[string][]*pb.Schema{},
},
}
pb.RegisterPublisherServer(srv.Gsrv, &s.GServer)
Expand Down Expand Up @@ -365,6 +368,8 @@ func (s *GServer) UpdateTopic(_ context.Context, req *pb.UpdateTopicRequest) (*p
return nil, err
}
t.proto.MessageRetentionDuration = req.Topic.MessageRetentionDuration
case "schema_settings":
t.proto.SchemaSettings = req.Topic.SchemaSettings
default:
return nil, status.Errorf(codes.InvalidArgument, "unknown field name %q", path)
}
Expand Down Expand Up @@ -1381,6 +1386,16 @@ func WithErrorInjection(funcName string, code codes.Code, msg string) ServerReac
}
}

// letters is the alphabet revision IDs are drawn from.
const letters = "abcdef1234567890"

// genRevID returns a pseudo-random 8-character revision ID in which every
// character is picked from letters using math/rand.
func genRevID() string {
	const idLen = 8
	var buf [idLen]byte
	for pos := 0; pos < idLen; pos++ {
		buf[pos] = letters[rand.Intn(len(letters))]
	}
	return string(buf[:])
}

func (s *GServer) CreateSchema(_ context.Context, req *pb.CreateSchemaRequest) (*pb.Schema, error) {
s.mu.Lock()
defer s.mu.Unlock()
Expand All @@ -1391,29 +1406,52 @@ func (s *GServer) CreateSchema(_ context.Context, req *pb.CreateSchemaRequest) (

name := fmt.Sprintf("%s/schemas/%s", req.Parent, req.SchemaId)
sc := &pb.Schema{
Name: name,
Type: req.Schema.Type,
Definition: req.Schema.Definition,
Name: name,
Type: req.Schema.Type,
Definition: req.Schema.Definition,
RevisionId: genRevID(),
RevisionCreateTime: timestamppb.Now(),
}
s.schemas[name] = sc
s.schemas[name] = append(s.schemas[name], sc)

return sc, nil
}

func (s *GServer) GetSchema(_ context.Context, req *pb.GetSchemaRequest) (*pb.Schema, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GetSchema also allows one to pass in a path with the revision ID, e.g., projects/myproject/schemas/myschema@myrevision

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


s.mu.Lock()
defer s.mu.Unlock()

if handled, ret, err := s.runReactor(req, "GetSchema", &pb.Schema{}); handled || err != nil {
return ret.(*pb.Schema), err
}

sc, ok := s.schemas[req.Name]
ss := strings.Split(req.Name, "@")
var schemaName, revisionID string
if len := len(ss); len == 1 {
schemaName = ss[0]
} else if len == 2 {
schemaName = ss[0]
revisionID = ss[1]
} else {
return nil, status.Errorf(codes.InvalidArgument, "schema(%q) name parse error", req.Name)
}

schemaRev, ok := s.schemas[schemaName]
if !ok {
return nil, status.Errorf(codes.NotFound, "schema(%q) not found", req.Name)
}
return sc, nil

if revisionID == "" {
return schemaRev[len(schemaRev)-1], nil
}

for _, sc := range schemaRev {
if sc.RevisionId == revisionID {
return sc, nil
}
}

return nil, status.Errorf(codes.NotFound, "schema %q not found", req.Name)
}

func (s *GServer) ListSchemas(_ context.Context, req *pb.ListSchemasRequest) (*pb.ListSchemasResponse, error) {
Expand All @@ -1425,13 +1463,93 @@ func (s *GServer) ListSchemas(_ context.Context, req *pb.ListSchemasRequest) (*p
}
ss := make([]*pb.Schema, 0)
for _, sc := range s.schemas {
ss = append(ss, sc)
ss = append(ss, sc[len(sc)-1])
}
return &pb.ListSchemasResponse{
Schemas: ss,
}, nil
}

// ListSchemaRevisions returns all stored revisions of the schema named by
// req.Name, in the order they are kept in s.schemas (revisions are appended,
// so the last entry is the most recent). A name with no stored revisions
// yields an empty list rather than an error.
func (s *GServer) ListSchemaRevisions(_ context.Context, req *pb.ListSchemaRevisionsRequest) (*pb.ListSchemaRevisionsResponse, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	// The fallback object handed to runReactor must have the same type as the
	// assertion below; a *pb.ListSchemasResponse here would make the type
	// assertion panic whenever the fallback is what comes back.
	if handled, ret, err := s.runReactor(req, "ListSchemaRevisions", &pb.ListSchemaRevisionsResponse{}); handled || err != nil {
		return ret.(*pb.ListSchemaRevisionsResponse), err
	}

	// Copy the slice so the response does not alias the server's own storage.
	stored := s.schemas[req.Name]
	revisions := make([]*pb.Schema, 0, len(stored))
	revisions = append(revisions, stored...)
	return &pb.ListSchemaRevisionsResponse{
		Schemas: revisions,
	}, nil
}

// CommitSchema appends a new revision, built from req.Schema's type and
// definition, to the revisions stored under req.Name and returns it. The new
// revision gets a freshly generated revision ID and the current time as its
// creation time.
func (s *GServer) CommitSchema(_ context.Context, req *pb.CommitSchemaRequest) (*pb.Schema, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	handled, ret, err := s.runReactor(req, "CommitSchema", &pb.Schema{})
	if handled || err != nil {
		return ret.(*pb.Schema), err
	}

	revision := &pb.Schema{
		Name:               req.Name,
		Type:               req.Schema.Type,
		Definition:         req.Schema.Definition,
		RevisionId:         genRevID(),
		RevisionCreateTime: timestamppb.Now(),
	}

	// Newest revision goes last.
	revisions := append(s.schemas[req.Name], revision)
	s.schemas[req.Name] = revisions

	return revision, nil
}

// RollbackSchema rolls back the current schema to a previous revision by
// copying that revision's name, type and definition into a brand-new revision
// (with a fresh revision ID and creation time) appended as the latest one.
// It returns the new revision, or a NotFound error when no stored revision of
// req.Name has the ID req.RevisionId.
func (s *GServer) RollbackSchema(_ context.Context, req *pb.RollbackSchemaRequest) (*pb.Schema, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	if handled, ret, err := s.runReactor(req, "RollbackSchema", &pb.Schema{}); handled || err != nil {
		return ret.(*pb.Schema), err
	}

	for _, sc := range s.schemas[req.Name] {
		if sc.RevisionId != req.RevisionId {
			continue
		}
		// Build the new revision field by field rather than copying *sc by
		// value: generated protobuf structs carry internal bookkeeping state
		// that is not meant to be copied (go vet's copylocks check reports
		// such copies).
		rolledBack := &pb.Schema{
			Name:               sc.Name,
			Type:               sc.Type,
			Definition:         sc.Definition,
			RevisionId:         genRevID(),
			RevisionCreateTime: timestamppb.Now(),
		}
		s.schemas[req.Name] = append(s.schemas[req.Name], rolledBack)
		return rolledBack, nil
	}
	return nil, status.Errorf(codes.NotFound, "schema %q@%q not found", req.Name, req.RevisionId)
}

// DeleteSchemaRevision removes the revision with ID req.RevisionId from the
// revisions stored under req.Name. It refuses with InvalidArgument when the
// schema has exactly one revision, and reports NotFound when no stored
// revision matches.
func (s *GServer) DeleteSchemaRevision(_ context.Context, req *pb.DeleteSchemaRevisionRequest) (*pb.Schema, error) {
	s.mu.Lock()
	defer s.mu.Unlock()

	handled, ret, err := s.runReactor(req, "DeleteSchemaRevision", &pb.Schema{})
	if handled || err != nil {
		return ret.(*pb.Schema), err
	}

	revisions, found := s.schemas[req.Name]
	if found && len(revisions) == 1 {
		return nil, status.Errorf(codes.InvalidArgument, "cannot delete last revision for schema %q@%q", req.Name, req.RevisionId)
	}

	for idx := range revisions {
		if revisions[idx].RevisionId != req.RevisionId {
			continue
		}
		// The value returned is whichever revision was last in the list
		// before the removal.
		// NOTE(review): when the newest revision is the one being deleted,
		// this hands back the deleted revision itself; confirm that is the
		// intended result.
		tail := revisions[len(revisions)-1]
		s.schemas[req.Name] = append(revisions[:idx], revisions[idx+1:]...)
		return tail, nil
	}
	return nil, status.Errorf(codes.NotFound, "schema %q@%q not found", req.Name, req.RevisionId)
}

func (s *GServer) DeleteSchema(_ context.Context, req *pb.DeleteSchemaRequest) (*emptypb.Empty, error) {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down Expand Up @@ -1480,7 +1598,8 @@ func (s *GServer) ValidateMessage(_ context.Context, req *pb.ValidateMessageRequ
if !ok {
return nil, status.Errorf(codes.NotFound, "schema(%q) not found", valReq.Name)
}
if sc.Definition == "" {
schema := sc[len(sc)-1]
if schema.Definition == "" {
return nil, status.Error(codes.InvalidArgument, "schema definition cannot be empty")
}
}
Expand Down
Loading