From 369b16f9525f9ac9a0811c66ce61eda9f6c566e4 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Thu, 26 Jan 2023 14:03:32 -0800 Subject: [PATCH] feat(pubsub): add support for schema revisions (#7295) * feat(pubsub): add support for schema revisions * update comments * make ListSchemaRevisions in fake more efficient * fix failing tests related to revisionID being populated * fix conversion of SchemaSettings to proto * support GetSchema with revision id * support editing schemas in topic update * add tests for updating topic with schema --- pubsub/integration_test.go | 49 +++++++++++++ pubsub/pstest/fake.go | 147 +++++++++++++++++++++++++++++++++---- pubsub/schema.go | 122 ++++++++++++++++++++++++------ pubsub/schema_test.go | 98 +++++++++++++++++++++++-- pubsub/topic.go | 10 ++- pubsub/topic_test.go | 32 ++++++++ 6 files changed, 413 insertions(+), 45 deletions(-) diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go index 11b8963648a0..854a014e4917 100644 --- a/pubsub/integration_test.go +++ b/pubsub/integration_test.go @@ -36,6 +36,7 @@ import ( "cloud.google.com/go/kms/apiv1/kmspb" pb "cloud.google.com/go/pubsub/apiv1/pubsubpb" testutil2 "cloud.google.com/go/pubsub/internal/testutil" + "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" gax "github.com/googleapis/gax-go/v2" "golang.org/x/oauth2/google" @@ -1956,6 +1957,8 @@ func TestIntegration_TopicRetention(t *testing.T) { if err != nil { t.Fatal(err) } + defer topic.Delete(ctx) + defer topic.Stop() cfg, err := topic.Config(ctx) if err != nil { @@ -2011,3 +2014,49 @@ func TestExactlyOnceDelivery_PublishReceive(t *testing.T) { // Tests for large messages (larger than the 4MB gRPC limit). testPublishAndReceive(t, client, 0, false, true, 1, 5*1024*1024) } + +func TestIntegration_TopicUpdateSchema(t *testing.T) { + ctx := context.Background() + // TODO(hongalex): update these staging endpoints after schema evolution is GA. + c := integrationTestClient(ctx, t, option.WithEndpoint("staging-pubsub.sandbox.googleapis.com:443")) + defer c.Close() + + sc := integrationTestSchemaClient(ctx, t, option.WithEndpoint("staging-pubsub.sandbox.googleapis.com:443")) + defer sc.Close() + + schemaContent, err := ioutil.ReadFile("testdata/schema/us-states.avsc") + if err != nil { + t.Fatal(err) + } + + schemaID := schemaIDs.New() + schemaCfg, err := sc.CreateSchema(ctx, schemaID, SchemaConfig{ + Type: SchemaAvro, + Definition: string(schemaContent), + }) + if err != nil { + t.Fatal(err) + } + defer sc.DeleteSchema(ctx, schemaID) + + topic, err := c.CreateTopic(ctx, topicIDs.New()) + if err != nil { + t.Fatal(err) + } + defer topic.Delete(ctx) + defer topic.Stop() + + schema := &SchemaSettings{ + Schema: schemaCfg.Name, + Encoding: EncodingJSON, + } + cfg, err := topic.Update(ctx, TopicConfigToUpdate{ + SchemaSettings: schema, + }) + if err != nil { + t.Fatal(err) + } + if diff := cmp.Diff(cfg.SchemaSettings, schema); diff != "" { + t.Fatalf("schema settings for update -want, +got: %v", diff) + } +} diff --git a/pubsub/pstest/fake.go b/pubsub/pstest/fake.go index c767eb4db20b..d407675aa26d 100644 --- a/pubsub/pstest/fake.go +++ b/pubsub/pstest/fake.go @@ -26,6 +26,7 @@ import ( "context" "fmt" "io" + "math/rand" "path" "sort" "strings" @@ -90,9 +91,9 @@ type Server struct { // GServer is the underlying service implementor. It is not intended to be used // directly. type GServer struct { - pb.PublisherServer - pb.SubscriberServer - pb.SchemaServiceServer + pb.UnimplementedPublisherServer + pb.UnimplementedSubscriberServer + pb.UnimplementedSchemaServiceServer mu sync.Mutex topics map[string]*topic @@ -104,7 +105,9 @@ type GServer struct { streamTimeout time.Duration timeNowFunc func() time.Time reactorOptions ReactorOptions - schemas map[string]*pb.Schema + // schemas is a map of schemaIDs to a slice of schema revisions. + // the last element in the slice is the most recent schema. + schemas map[string][]*pb.Schema // PublishResponses is a channel of responses to use for Publish. publishResponses chan *publishResponse @@ -140,7 +143,7 @@ func NewServerWithPort(port int, opts ...ServerReactorOption) *Server { reactorOptions: reactorOptions, publishResponses: make(chan *publishResponse, 100), autoPublishResponse: true, - schemas: map[string]*pb.Schema{}, + schemas: map[string][]*pb.Schema{}, }, } pb.RegisterPublisherServer(srv.Gsrv, &s.GServer) @@ -365,6 +368,8 @@ func (s *GServer) UpdateTopic(_ context.Context, req *pb.UpdateTopicRequest) (*p return nil, err } t.proto.MessageRetentionDuration = req.Topic.MessageRetentionDuration + case "schema_settings": + t.proto.SchemaSettings = req.Topic.SchemaSettings default: return nil, status.Errorf(codes.InvalidArgument, "unknown field name %q", path) } @@ -1381,6 +1386,16 @@ func WithErrorInjection(funcName string, code codes.Code, msg string) ServerReac } } +const letters = "abcdef1234567890" + +func genRevID() string { + id := make([]byte, 8) + for i := range id { + id[i] = letters[rand.Intn(len(letters))] + } + return string(id) +} + func (s *GServer) CreateSchema(_ context.Context, req *pb.CreateSchemaRequest) (*pb.Schema, error) { s.mu.Lock() defer s.mu.Unlock() @@ -1391,17 +1406,18 @@ func (s *GServer) CreateSchema(_ context.Context, req *pb.CreateSchemaRequest) ( name := fmt.Sprintf("%s/schemas/%s", req.Parent, req.SchemaId) sc := &pb.Schema{ - Name: name, - Type: req.Schema.Type, - Definition: req.Schema.Definition, + Name: name, + Type: req.Schema.Type, + Definition: req.Schema.Definition, + RevisionId: genRevID(), + RevisionCreateTime: timestamppb.Now(), } - s.schemas[name] = sc + s.schemas[name] = append(s.schemas[name], sc) return sc, nil } func (s *GServer) GetSchema(_ context.Context, req *pb.GetSchemaRequest) (*pb.Schema, error) { - s.mu.Lock() defer s.mu.Unlock() @@ -1409,11 +1425,33 @@ func (s *GServer) GetSchema(_ context.Context, req *pb.GetSchemaRequest) (*pb.Sc return ret.(*pb.Schema), err } - sc, ok := s.schemas[req.Name] + ss := strings.Split(req.Name, "@") + var schemaName, revisionID string + if len := len(ss); len == 1 { + schemaName = ss[0] + } else if len == 2 { + schemaName = ss[0] + revisionID = ss[1] + } else { + return nil, status.Errorf(codes.InvalidArgument, "schema(%q) name parse error", req.Name) + } + + schemaRev, ok := s.schemas[schemaName] if !ok { return nil, status.Errorf(codes.NotFound, "schema(%q) not found", req.Name) } - return sc, nil + + if revisionID == "" { + return schemaRev[len(schemaRev)-1], nil + } + + for _, sc := range schemaRev { + if sc.RevisionId == revisionID { + return sc, nil + } + } + + return nil, status.Errorf(codes.NotFound, "schema %q not found", req.Name) } func (s *GServer) ListSchemas(_ context.Context, req *pb.ListSchemasRequest) (*pb.ListSchemasResponse, error) { @@ -1425,13 +1463,93 @@ func (s *GServer) ListSchemas(_ context.Context, req *pb.ListSchemasRequest) (*p } ss := make([]*pb.Schema, 0) for _, sc := range s.schemas { - ss = append(ss, sc) + ss = append(ss, sc[len(sc)-1]) } return &pb.ListSchemasResponse{ Schemas: ss, }, nil } +func (s *GServer) ListSchemaRevisions(_ context.Context, req *pb.ListSchemaRevisionsRequest) (*pb.ListSchemaRevisionsResponse, error) { + s.mu.Lock() + defer s.mu.Unlock() + + if handled, ret, err := s.runReactor(req, "ListSchemaRevisions", &pb.ListSchemasResponse{}); handled || err != nil { + return ret.(*pb.ListSchemaRevisionsResponse), err + } + ss := make([]*pb.Schema, 0) + ss = append(ss, s.schemas[req.Name]...) + return &pb.ListSchemaRevisionsResponse{ + Schemas: ss, + }, nil +} + +func (s *GServer) CommitSchema(_ context.Context, req *pb.CommitSchemaRequest) (*pb.Schema, error) { + s.mu.Lock() + defer s.mu.Unlock() + + if handled, ret, err := s.runReactor(req, "CommitSchema", &pb.Schema{}); handled || err != nil { + return ret.(*pb.Schema), err + } + + sc := &pb.Schema{ + Name: req.Name, + Type: req.Schema.Type, + Definition: req.Schema.Definition, + } + sc.RevisionId = genRevID() + sc.RevisionCreateTime = timestamppb.Now() + + s.schemas[req.Name] = append(s.schemas[req.Name], sc) + + return sc, nil +} + +// RollbackSchema rolls back the current schema to a previous revision by copying and creating a new revision. +func (s *GServer) RollbackSchema(_ context.Context, req *pb.RollbackSchemaRequest) (*pb.Schema, error) { + s.mu.Lock() + defer s.mu.Unlock() + + if handled, ret, err := s.runReactor(req, "RollbackSchema", &pb.Schema{}); handled || err != nil { + return ret.(*pb.Schema), err + } + + for _, sc := range s.schemas[req.Name] { + if sc.RevisionId == req.RevisionId { + newSchema := *sc + newSchema.RevisionId = genRevID() + newSchema.RevisionCreateTime = timestamppb.Now() + s.schemas[req.Name] = append(s.schemas[req.Name], &newSchema) + return &newSchema, nil + } + } + return nil, status.Errorf(codes.NotFound, "schema %q@%q not found", req.Name, req.RevisionId) +} + +func (s *GServer) DeleteSchemaRevision(_ context.Context, req *pb.DeleteSchemaRevisionRequest) (*pb.Schema, error) { + s.mu.Lock() + defer s.mu.Unlock() + + if handled, ret, err := s.runReactor(req, "DeleteSchemaRevision", &pb.Schema{}); handled || err != nil { + return ret.(*pb.Schema), err + } + + if sc, ok := s.schemas[req.Name]; ok { + if len(sc) == 1 { + return nil, status.Errorf(codes.InvalidArgument, "cannot delete last revision for schema %q@%q", req.Name, req.RevisionId) + } + } + + schema := s.schemas[req.Name] + for i, sc := range schema { + if sc.RevisionId == req.RevisionId { + s.schemas[req.Name] = append(schema[:i], schema[i+1:]...) + return schema[len(schema)-1], nil + } + } + return nil, status.Errorf(codes.NotFound, "schema %q@%q not found", req.Name, req.RevisionId) +} + func (s *GServer) DeleteSchema(_ context.Context, req *pb.DeleteSchemaRequest) (*emptypb.Empty, error) { s.mu.Lock() defer s.mu.Unlock() @@ -1480,7 +1598,8 @@ func (s *GServer) ValidateMessage(_ context.Context, req *pb.ValidateMessageRequ if !ok { return nil, status.Errorf(codes.NotFound, "schema(%q) not found", valReq.Name) } - if sc.Definition == "" { + schema := sc[len(sc)-1] + if schema.Definition == "" { return nil, status.Error(codes.InvalidArgument, "schema definition cannot be empty") } } diff --git a/pubsub/schema.go b/pubsub/schema.go index 10bea6e74e0b..efb410b66d5d 100644 --- a/pubsub/schema.go +++ b/pubsub/schema.go @@ -17,6 +17,7 @@ package pubsub import ( "context" "fmt" + "time" "google.golang.org/api/option" @@ -56,9 +57,17 @@ type SchemaConfig struct { // the full definition of the schema that is a valid schema definition of // the type specified in `type`. Definition string + + // RevisionID is the revision ID of the schema. + // This field is output only. + RevisionID string + + // RevisionCreateTime is the timestamp that the revision was created. + // This field is output only. + RevisionCreateTime time.Time } -// SchemaType is the possible shcema definition types. +// SchemaType is the possible schema definition types. type SchemaType pb.Schema_Type const ( @@ -86,8 +95,22 @@ const ( // SchemaSettings are settings for validating messages // published against a schema. type SchemaSettings struct { - Schema string + // The name of the schema that messages published should be + // validated against. Format is `projects/{project}/schemas/{schema}` + Schema string + + // The encoding of messages validated against the schema. Encoding SchemaEncoding + + // The minimum (inclusive) revision allowed for validating messages. If empty + // or not present, allow any revision to be validated against LastRevisionID or + // any revision created before. + FirstRevisionID string + + // The maximum (inclusive) revision allowed for validating messages. If empty + // or not present, allow any revision to be validated against FirstRevisionID + // or any revision created after. + LastRevisionID string } func schemaSettingsToProto(schema *SchemaSettings) *pb.SchemaSettings { @@ -95,8 +118,10 @@ func schemaSettingsToProto(schema *SchemaSettings) *pb.SchemaSettings { return nil } return &pb.SchemaSettings{ - Schema: schema.Schema, - Encoding: pb.Encoding(schema.Encoding), + Schema: schema.Schema, + Encoding: pb.Encoding(schema.Encoding), + FirstRevisionId: schema.FirstRevisionID, + LastRevisionId: schema.LastRevisionID, } } @@ -105,8 +130,10 @@ func protoToSchemaSettings(pbs *pb.SchemaSettings) *SchemaSettings { return nil } return &SchemaSettings{ - Schema: pbs.Schema, - Encoding: SchemaEncoding(pbs.Encoding), + Schema: pbs.Schema, + Encoding: SchemaEncoding(pbs.Encoding), + FirstRevisionID: pbs.FirstRevisionId, + LastRevisionID: pbs.LastRevisionId, } } @@ -134,9 +161,11 @@ func (s *SchemaConfig) toProto() *pb.Schema { func protoToSchemaConfig(pbs *pb.Schema) *SchemaConfig { return &SchemaConfig{ - Name: pbs.Name, - Type: SchemaType(pbs.Type), - Definition: pbs.Definition, + Name: pbs.Name, + Type: SchemaType(pbs.Type), + Definition: pbs.Definition, + RevisionID: pbs.RevisionId, + RevisionCreateTime: pbs.RevisionCreateTime.AsTime(), } } @@ -197,10 +226,59 @@ func (s *SchemaIterator) Next() (*SchemaConfig, error) { return protoToSchemaConfig(pbs), nil } +// ListSchemaRevisions lists all schema revisions for the named schema. +func (c *SchemaClient) ListSchemaRevisions(ctx context.Context, schemaID string, view SchemaView) *SchemaIterator { + return &SchemaIterator{ + it: c.sc.ListSchemaRevisions(ctx, &pb.ListSchemaRevisionsRequest{ + Name: fmt.Sprintf("projects/%s/schemas/%s", c.projectID, schemaID), + View: pb.SchemaView(view), + }), + } +} + +// CommitSchema commits a new schema revision to an existing schema. +func (c *SchemaClient) CommitSchema(ctx context.Context, schemaID string, s SchemaConfig) (*SchemaConfig, error) { + req := &pb.CommitSchemaRequest{ + Name: fmt.Sprintf("projects/%s/schemas/%s", c.projectID, schemaID), + Schema: s.toProto(), + } + pbs, err := c.sc.CommitSchema(ctx, req) + if err != nil { + return nil, err + } + return protoToSchemaConfig(pbs), nil +} + +// RollbackSchema creates a new schema revision that is a copy of the provided revision. +func (c *SchemaClient) RollbackSchema(ctx context.Context, schemaID, revisionID string) (*SchemaConfig, error) { + req := &pb.RollbackSchemaRequest{ + Name: fmt.Sprintf("projects/%s/schemas/%s", c.projectID, schemaID), + RevisionId: revisionID, + } + pbs, err := c.sc.RollbackSchema(ctx, req) + if err != nil { + return nil, err + } + return protoToSchemaConfig(pbs), nil +} + +// DeleteSchemaRevision deletes a specific schema revision. +func (c *SchemaClient) DeleteSchemaRevision(ctx context.Context, schemaID, revisionID string) (*SchemaConfig, error) { + schemaPath := fmt.Sprintf("projects/%s/schemas/%s", c.projectID, schemaID) + schema, err := c.sc.DeleteSchemaRevision(ctx, &pb.DeleteSchemaRevisionRequest{ + Name: schemaPath, + RevisionId: revisionID, + }) + if err != nil { + return nil, err + } + return protoToSchemaConfig(schema), nil +} + // DeleteSchema deletes an existing schema given a schema ID. -func (s *SchemaClient) DeleteSchema(ctx context.Context, schemaID string) error { - schemaPath := fmt.Sprintf("projects/%s/schemas/%s", s.projectID, schemaID) - return s.sc.DeleteSchema(ctx, &pb.DeleteSchemaRequest{ +func (c *SchemaClient) DeleteSchema(ctx context.Context, schemaID string) error { + schemaPath := fmt.Sprintf("projects/%s/schemas/%s", c.projectID, schemaID) + return c.sc.DeleteSchema(ctx, &pb.DeleteSchemaRequest{ Name: schemaPath, }) } @@ -210,12 +288,12 @@ func (s *SchemaClient) DeleteSchema(ctx context.Context, schemaID string) error type ValidateSchemaResult struct{} // ValidateSchema validates a schema config and returns an error if invalid. -func (s *SchemaClient) ValidateSchema(ctx context.Context, schema SchemaConfig) (*ValidateSchemaResult, error) { +func (c *SchemaClient) ValidateSchema(ctx context.Context, schema SchemaConfig) (*ValidateSchemaResult, error) { req := &pb.ValidateSchemaRequest{ - Parent: fmt.Sprintf("projects/%s", s.projectID), + Parent: fmt.Sprintf("projects/%s", c.projectID), Schema: schema.toProto(), } - _, err := s.sc.ValidateSchema(ctx, req) + _, err := c.sc.ValidateSchema(ctx, req) if err != nil { return nil, err } @@ -228,16 +306,16 @@ type ValidateMessageResult struct{} // ValidateMessageWithConfig validates a message against an schema specified // by a schema config. -func (s *SchemaClient) ValidateMessageWithConfig(ctx context.Context, msg []byte, encoding SchemaEncoding, config SchemaConfig) (*ValidateMessageResult, error) { +func (c *SchemaClient) ValidateMessageWithConfig(ctx context.Context, msg []byte, encoding SchemaEncoding, config SchemaConfig) (*ValidateMessageResult, error) { req := &pb.ValidateMessageRequest{ - Parent: fmt.Sprintf("projects/%s", s.projectID), + Parent: fmt.Sprintf("projects/%s", c.projectID), SchemaSpec: &pb.ValidateMessageRequest_Schema{ Schema: config.toProto(), }, Message: msg, Encoding: pb.Encoding(encoding), } - _, err := s.sc.ValidateMessage(ctx, req) + _, err := c.sc.ValidateMessage(ctx, req) if err != nil { return nil, err } @@ -246,16 +324,16 @@ func (s *SchemaClient) ValidateMessageWithConfig(ctx context.Context, msg []byte // ValidateMessageWithID validates a message against an schema specified // by the schema ID of an existing schema. -func (s *SchemaClient) ValidateMessageWithID(ctx context.Context, msg []byte, encoding SchemaEncoding, schemaID string) (*ValidateMessageResult, error) { +func (c *SchemaClient) ValidateMessageWithID(ctx context.Context, msg []byte, encoding SchemaEncoding, schemaID string) (*ValidateMessageResult, error) { req := &pb.ValidateMessageRequest{ - Parent: fmt.Sprintf("projects/%s", s.projectID), + Parent: fmt.Sprintf("projects/%s", c.projectID), SchemaSpec: &pb.ValidateMessageRequest_Name{ - Name: fmt.Sprintf("projects/%s/schemas/%s", s.projectID, schemaID), + Name: fmt.Sprintf("projects/%s/schemas/%s", c.projectID, schemaID), }, Message: msg, Encoding: pb.Encoding(encoding), } - _, err := s.sc.ValidateMessage(ctx, req) + _, err := c.sc.ValidateMessage(ctx, req) if err != nil { return nil, err } diff --git a/pubsub/schema_test.go b/pubsub/schema_test.go index bc0b7efba44e..6f306511e194 100644 --- a/pubsub/schema_test.go +++ b/pubsub/schema_test.go @@ -55,13 +55,19 @@ func TestSchemaBasicCreateGetDelete(t *testing.T) { admin, _ := newSchemaFake(t) defer admin.Close() - if gotConfig, err := admin.CreateSchema(ctx, schemaID, schemaConfig); err != nil { - t.Errorf("CreateSchema() got err: %v", err) - } else if diff := cmp.Diff(*gotConfig, schemaConfig); diff != "" { + gotConfig, err := admin.CreateSchema(ctx, schemaID, schemaConfig) + if err != nil { + t.Fatalf("CreateSchema() got err: %v", err) + } + // Don't compare revisionID / create time since that isn't known + // until after it is created. + schemaConfig.RevisionID = gotConfig.RevisionID + schemaConfig.RevisionCreateTime = gotConfig.RevisionCreateTime + if diff := cmp.Diff(*gotConfig, schemaConfig); diff != "" { t.Errorf("CreateSchema() -want, +got: %v", diff) } - gotConfig, err := admin.Schema(ctx, schemaID, SchemaViewFull) + gotConfig, err = admin.Schema(ctx, schemaID, SchemaViewFull) if err != nil { t.Errorf("Schema() got err: %v", err) } @@ -106,10 +112,9 @@ func TestSchemaListSchemas(t *testing.T) { break } if err != nil { - t.Errorf("SchemaIterator.Next() got err: %v", err) - } else { - gotSchemaConfigs = append(gotSchemaConfigs, *schema) + t.Fatalf("SchemaIterator.Next() got err: %v", err) } + gotSchemaConfigs = append(gotSchemaConfigs, *schema) } got := len(gotSchemaConfigs) @@ -119,6 +124,85 @@ func TestSchemaListSchemas(t *testing.T) { } } +func TestSchema_SchemaRevisions(t *testing.T) { + ctx := context.Background() + admin, _ := newSchemaFake(t) + defer admin.Close() + + // Inputs + schemaID := "my-schema" + schemaPath := fmt.Sprintf("projects/my-proj/schemas/%s", schemaID) + schemaConfig := SchemaConfig{ + Name: schemaPath, + Type: SchemaAvro, + Definition: "def1", + } + + gotConfig, err := admin.CreateSchema(ctx, schemaID, schemaConfig) + if err != nil { + t.Fatalf("CreateSchema() got err: %v", err) + } + // Update the original config with populated revision ID/create time from server. + // Testing this isn't important since the values aren't known beforehand. + schemaConfig.RevisionID = gotConfig.RevisionID + schemaConfig.RevisionCreateTime = gotConfig.RevisionCreateTime + if diff := cmp.Diff(*gotConfig, schemaConfig); diff != "" { + t.Fatalf("CreateSchema() -want, +got: %v", diff) + } + + schemaConfig.Definition = "def2" + revConfig, err := admin.CommitSchema(ctx, schemaID, schemaConfig) + if err != nil { + t.Fatalf("CommitSchema() got err: %v", err) + } + schemaConfig.RevisionID = revConfig.RevisionID + schemaConfig.RevisionCreateTime = revConfig.RevisionCreateTime + if diff := cmp.Diff(*revConfig, schemaConfig); diff != "" { + t.Fatalf("CommitSchema() -want, +got: %v", diff) + } + + rbConfig, err := admin.RollbackSchema(ctx, schemaID, gotConfig.RevisionID) + if err != nil { + t.Fatalf("RollbackSchema() got err: %v", err) + } + schemaConfig.RevisionID = rbConfig.RevisionID + schemaConfig.RevisionCreateTime = rbConfig.RevisionCreateTime + schemaConfig.Definition = "def1" + if diff := cmp.Diff(*rbConfig, schemaConfig); diff != "" { + t.Fatalf("RollbackSchema() -want, +got: %v", diff) + } + + if _, err := admin.DeleteSchemaRevision(ctx, schemaID, gotConfig.RevisionID); err != nil { + t.Fatalf("DeleteSchemaRevision() got err: %v", err) + } + + var got []*SchemaConfig + it := admin.ListSchemaRevisions(ctx, schemaID, SchemaViewFull) + for { + sc, err := it.Next() + if err == iterator.Done { + break + } + if err != nil { + t.Fatalf("SchemaIterator.Next() got err: %v", err) + } + got = append(got, sc) + } + if gotLen, wantLen := len(got), 2; gotLen != wantLen { + t.Errorf("lListSchemaRevisions() got %d revisions, want: %d", gotLen, wantLen) + } +} + +func TestSchemaRollbackSchema(t *testing.T) { + admin, _ := newSchemaFake(t) + defer admin.Close() +} + +func TestSchemaDeleteSchemaRevision(t *testing.T) { + admin, _ := newSchemaFake(t) + defer admin.Close() +} + func TestSchemaValidateSchema(t *testing.T) { ctx := context.Background() admin, _ := newSchemaFake(t) diff --git a/pubsub/topic.go b/pubsub/topic.go index 3073bd6498d3..9e6caf02f4e9 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -218,8 +218,7 @@ type TopicConfig struct { // "projects/P/locations/L/keyRings/R/cryptoKeys/K". KMSKeyName string - // Schema defines the schema settings upon topic creation. This cannot - // be modified after a topic has been created. + // Schema defines the schema settings upon topic creation. SchemaSettings *SchemaSettings // RetentionDuration configures the minimum duration to retain a message @@ -292,6 +291,9 @@ type TopicConfigToUpdate struct { // If set to a negative value, this clears RetentionDuration from the topic. // If nil, the retention duration remains unchanged. RetentionDuration optional.Duration + + // Schema defines the schema settings upon topic creation. + SchemaSettings *SchemaSettings } func protoToTopicConfig(pbt *pb.Topic) TopicConfig { @@ -403,6 +405,10 @@ func (t *Topic) updateRequest(cfg TopicConfigToUpdate) *pb.UpdateTopicRequest { } paths = append(paths, "message_retention_duration") } + if cfg.SchemaSettings != nil { + pt.SchemaSettings = schemaSettingsToProto(cfg.SchemaSettings) + paths = append(paths, "schema_settings") + } return &pb.UpdateTopicRequest{ Topic: pt, UpdateMask: &fmpb.FieldMask{Paths: paths}, diff --git a/pubsub/topic_test.go b/pubsub/topic_test.go index 84878b33c9b9..92e46400bdd9 100644 --- a/pubsub/topic_test.go +++ b/pubsub/topic_test.go @@ -304,6 +304,38 @@ func TestUpdateTopic_MessageStoragePolicy(t *testing.T) { } } +func TestUpdateTopic_SchemaSettings(t *testing.T) { + ctx := context.Background() + client, srv := newFake(t) + defer client.Close() + defer srv.Close() + + topic := mustCreateTopic(t, client, "T") + config, err := topic.Config(ctx) + if err != nil { + t.Fatal(err) + } + want := TopicConfig{} + opt := cmpopts.IgnoreUnexported(TopicConfig{}) + if !testutil.Equal(config, want, opt) { + t.Errorf("\ngot %+v\nwant %+v", config, want) + } + + // Update schema settings. + settings := &SchemaSettings{ + Schema: "some-schema", + Encoding: EncodingJSON, + FirstRevisionID: "1234", + } + config2, err := topic.Update(ctx, TopicConfigToUpdate{SchemaSettings: settings}) + if err != nil { + t.Fatal(err) + } + if !testutil.Equal(config2.SchemaSettings, settings) { + t.Errorf("\ngot %+v\nwant %+v", config2, settings) + } +} + type alwaysFailPublish struct { pubsubpb.PublisherServer }