Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsub): add support for schema revisions #7295

Merged
merged 11 commits into from
Jan 26, 2023
117 changes: 106 additions & 11 deletions pubsub/pstest/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"context"
"fmt"
"io"
"math/rand"
"path"
"sort"
"strings"
Expand Down Expand Up @@ -104,7 +105,9 @@ type GServer struct {
streamTimeout time.Duration
timeNowFunc func() time.Time
reactorOptions ReactorOptions
schemas map[string]*pb.Schema
// schemas is a map of schemaIDs to a slice of schema revisions.
// the last element in the slice is the most recent schema.
schemas map[string][]*pb.Schema

// PublishResponses is a channel of responses to use for Publish.
publishResponses chan *publishResponse
Expand Down Expand Up @@ -140,7 +143,7 @@ func NewServerWithPort(port int, opts ...ServerReactorOption) *Server {
reactorOptions: reactorOptions,
publishResponses: make(chan *publishResponse, 100),
autoPublishResponse: true,
schemas: map[string]*pb.Schema{},
schemas: map[string][]*pb.Schema{},
},
}
pb.RegisterPublisherServer(srv.Gsrv, &s.GServer)
Expand Down Expand Up @@ -1381,6 +1384,16 @@ func WithErrorInjection(funcName string, code codes.Code, msg string) ServerReac
}
}

const letters = "abcdef1234567890"

func genRevID() string {
id := make([]byte, 8)
for i := range id {
id[i] = letters[rand.Intn(len(letters))]
}
return string(id)
}

func (s *GServer) CreateSchema(_ context.Context, req *pb.CreateSchemaRequest) (*pb.Schema, error) {
s.mu.Lock()
defer s.mu.Unlock()
Expand All @@ -1391,29 +1404,30 @@ func (s *GServer) CreateSchema(_ context.Context, req *pb.CreateSchemaRequest) (

name := fmt.Sprintf("%s/schemas/%s", req.Parent, req.SchemaId)
sc := &pb.Schema{
Name: name,
Type: req.Schema.Type,
Definition: req.Schema.Definition,
Name: name,
Type: req.Schema.Type,
Definition: req.Schema.Definition,
RevisionId: genRevID(),
RevisionCreateTime: timestamppb.Now(),
}
s.schemas[name] = sc
s.schemas[name] = append(s.schemas[name], sc)

return sc, nil
}

func (s *GServer) GetSchema(_ context.Context, req *pb.GetSchemaRequest) (*pb.Schema, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GetSchema also allows one to pass in a path with the revision ID, e.g., projects/myproject/schemas/myschema@myrevision

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


s.mu.Lock()
defer s.mu.Unlock()

if handled, ret, err := s.runReactor(req, "GetSchema", &pb.Schema{}); handled || err != nil {
return ret.(*pb.Schema), err
}

sc, ok := s.schemas[req.Name]
schemas, ok := s.schemas[req.Name]
if !ok {
return nil, status.Errorf(codes.NotFound, "schema(%q) not found", req.Name)
}
return sc, nil
return schemas[len(schemas)-1], nil
}

func (s *GServer) ListSchemas(_ context.Context, req *pb.ListSchemasRequest) (*pb.ListSchemasResponse, error) {
Expand All @@ -1425,13 +1439,93 @@ func (s *GServer) ListSchemas(_ context.Context, req *pb.ListSchemasRequest) (*p
}
ss := make([]*pb.Schema, 0)
for _, sc := range s.schemas {
ss = append(ss, sc)
ss = append(ss, sc[len(sc)-1])
}
return &pb.ListSchemasResponse{
Schemas: ss,
}, nil
}

func (s *GServer) ListSchemaRevisions(_ context.Context, req *pb.ListSchemaRevisionsRequest) (*pb.ListSchemaRevisionsResponse, error) {
s.mu.Lock()
defer s.mu.Unlock()

if handled, ret, err := s.runReactor(req, "ListSchemaRevisions", &pb.ListSchemasResponse{}); handled || err != nil {
return ret.(*pb.ListSchemaRevisionsResponse), err
}
ss := make([]*pb.Schema, 0)
ss = append(ss, s.schemas[req.Name]...)
return &pb.ListSchemaRevisionsResponse{
Schemas: ss,
}, nil
}

func (s *GServer) CommitSchema(_ context.Context, req *pb.CommitSchemaRequest) (*pb.Schema, error) {
s.mu.Lock()
defer s.mu.Unlock()

if handled, ret, err := s.runReactor(req, "CommitSchema", &pb.Schema{}); handled || err != nil {
return ret.(*pb.Schema), err
}

sc := &pb.Schema{
Name: req.Name,
Type: req.Schema.Type,
Definition: req.Schema.Definition,
}
sc.RevisionId = genRevID()
sc.RevisionCreateTime = timestamppb.Now()

s.schemas[req.Name] = append(s.schemas[req.Name], sc)

return sc, nil
}

// RollbackSchema rolls back the current schema to a previous revision by copying and creating a new revision.
func (s *GServer) RollbackSchema(_ context.Context, req *pb.RollbackSchemaRequest) (*pb.Schema, error) {
s.mu.Lock()
defer s.mu.Unlock()

if handled, ret, err := s.runReactor(req, "RollbackSchema", &pb.Schema{}); handled || err != nil {
return ret.(*pb.Schema), err
}

for _, sc := range s.schemas[req.Name] {
if sc.RevisionId == req.RevisionId {
newSchema := *sc
newSchema.RevisionId = genRevID()
newSchema.RevisionCreateTime = timestamppb.Now()
s.schemas[req.Name] = append(s.schemas[req.Name], &newSchema)
return &newSchema, nil
}
}
return nil, status.Errorf(codes.NotFound, "schema %q@%q not found", req.Name, req.RevisionId)
}

func (s *GServer) DeleteSchemaRevision(_ context.Context, req *pb.DeleteSchemaRevisionRequest) (*pb.Schema, error) {
s.mu.Lock()
defer s.mu.Unlock()

if handled, ret, err := s.runReactor(req, "DeleteSchemaRevision", &pb.Schema{}); handled || err != nil {
return ret.(*pb.Schema), err
}

if sc, ok := s.schemas[req.Name]; ok {
if len(sc) == 1 {
return nil, status.Errorf(codes.InvalidArgument, "cannot delete last revision for schema %q@%q", req.Name, req.RevisionId)
}
}

schema := s.schemas[req.Name]
for i, sc := range schema {
if sc.RevisionId == req.RevisionId {
s.schemas[req.Name] = append(schema[:i], schema[i+1:]...)
return schema[len(schema)-1], nil
}
}
return nil, status.Errorf(codes.NotFound, "schema %q@%q not found", req.Name, req.RevisionId)
}

func (s *GServer) DeleteSchema(_ context.Context, req *pb.DeleteSchemaRequest) (*emptypb.Empty, error) {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down Expand Up @@ -1480,7 +1574,8 @@ func (s *GServer) ValidateMessage(_ context.Context, req *pb.ValidateMessageRequ
if !ok {
return nil, status.Errorf(codes.NotFound, "schema(%q) not found", valReq.Name)
}
if sc.Definition == "" {
schema := sc[len(sc)-1]
if schema.Definition == "" {
return nil, status.Error(codes.InvalidArgument, "schema definition cannot be empty")
}
}
Expand Down
122 changes: 100 additions & 22 deletions pubsub/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package pubsub
import (
"context"
"fmt"
"time"

"google.golang.org/api/option"

Expand Down Expand Up @@ -56,9 +57,17 @@ type SchemaConfig struct {
// the full definition of the schema that is a valid schema definition of
// the type specified in `type`.
Definition string

// RevisionID is the revision ID of the schema.
// This field is output only.
RevisionID string

// RevisionCreateTime is the timestamp that the revision was created.
// This field is output only.
RevisionCreateTime time.Time
}

// SchemaType is the possible shcema definition types.
// SchemaType is the possible schema definition types.
type SchemaType pb.Schema_Type

const (
Expand Down Expand Up @@ -86,17 +95,33 @@ const (
// SchemaSettings are settings for validating messages
// published against a schema.
type SchemaSettings struct {
Schema string
// The name of the schema that messages published should be
// validated against. Format is `projects/{project}/schemas/{schema}`
Schema string

// The encoding of messages validated against the schema.
Encoding SchemaEncoding

// The minimum (inclusive) revision allowed for validating messages. If empty
// or not present, allow any revision to be validated against LastRevisionID or
// any revision created before.
FirstRevisionID string

// The maximum (inclusive) revision allowed for validating messages. If empty
// or not present, allow any revision to be validated against FirstRevisionID
// or any revision created after.
LastRevisionID string
}

func schemaSettingsToProto(schema *SchemaSettings) *pb.SchemaSettings {
if schema == nil {
return nil
}
return &pb.SchemaSettings{
Schema: schema.Schema,
Encoding: pb.Encoding(schema.Encoding),
Schema: schema.Schema,
Encoding: pb.Encoding(schema.Encoding),
FirstRevisionId: schema.FirstRevisionID,
LastRevisionId: schema.LastRevisionID,
}
}

Expand All @@ -105,8 +130,10 @@ func protoToSchemaSettings(pbs *pb.SchemaSettings) *SchemaSettings {
return nil
}
return &SchemaSettings{
Schema: pbs.Schema,
Encoding: SchemaEncoding(pbs.Encoding),
Schema: pbs.Schema,
Encoding: SchemaEncoding(pbs.Encoding),
FirstRevisionID: pbs.FirstRevisionId,
LastRevisionID: pbs.LastRevisionId,
}
}

Expand Down Expand Up @@ -134,9 +161,11 @@ func (s *SchemaConfig) toProto() *pb.Schema {

func protoToSchemaConfig(pbs *pb.Schema) *SchemaConfig {
return &SchemaConfig{
Name: pbs.Name,
Type: SchemaType(pbs.Type),
Definition: pbs.Definition,
Name: pbs.Name,
Type: SchemaType(pbs.Type),
Definition: pbs.Definition,
RevisionID: pbs.RevisionId,
RevisionCreateTime: pbs.RevisionCreateTime.AsTime(),
}
}

Expand Down Expand Up @@ -197,10 +226,59 @@ func (s *SchemaIterator) Next() (*SchemaConfig, error) {
return protoToSchemaConfig(pbs), nil
}

// ListSchemaRevisions lists all schema revisions for the named schema.
func (c *SchemaClient) ListSchemaRevisions(ctx context.Context, schemaID string, view SchemaView) *SchemaIterator {
return &SchemaIterator{
it: c.sc.ListSchemaRevisions(ctx, &pb.ListSchemaRevisionsRequest{
Name: fmt.Sprintf("projects/%s/schemas/%s", c.projectID, schemaID),
View: pb.SchemaView(view),
}),
}
}

// CommitSchema commits a new schema revision to an existing schema.
func (c *SchemaClient) CommitSchema(ctx context.Context, schemaID string, s SchemaConfig) (*SchemaConfig, error) {
req := &pb.CommitSchemaRequest{
Name: fmt.Sprintf("projects/%s/schemas/%s", c.projectID, schemaID),
Schema: s.toProto(),
}
pbs, err := c.sc.CommitSchema(ctx, req)
if err != nil {
return nil, err
}
return protoToSchemaConfig(pbs), nil
}

// RollbackSchema creates a new schema revision that is a copy of the provided revision.
func (c *SchemaClient) RollbackSchema(ctx context.Context, schemaID, revisionID string) (*SchemaConfig, error) {
req := &pb.RollbackSchemaRequest{
Name: fmt.Sprintf("projects/%s/schemas/%s", c.projectID, schemaID),
RevisionId: revisionID,
}
pbs, err := c.sc.RollbackSchema(ctx, req)
if err != nil {
return nil, err
}
return protoToSchemaConfig(pbs), nil
}

// DeleteSchemaRevision deletes a specific schema revision.
func (c *SchemaClient) DeleteSchemaRevision(ctx context.Context, schemaID, revisionID string) (*SchemaConfig, error) {
schemaPath := fmt.Sprintf("projects/%s/schemas/%s", c.projectID, schemaID)
schema, err := c.sc.DeleteSchemaRevision(ctx, &pb.DeleteSchemaRevisionRequest{
Name: schemaPath,
RevisionId: revisionID,
})
if err != nil {
return nil, err
}
return protoToSchemaConfig(schema), nil
}

// DeleteSchema deletes an existing schema given a schema ID.
func (s *SchemaClient) DeleteSchema(ctx context.Context, schemaID string) error {
schemaPath := fmt.Sprintf("projects/%s/schemas/%s", s.projectID, schemaID)
return s.sc.DeleteSchema(ctx, &pb.DeleteSchemaRequest{
func (c *SchemaClient) DeleteSchema(ctx context.Context, schemaID string) error {
schemaPath := fmt.Sprintf("projects/%s/schemas/%s", c.projectID, schemaID)
return c.sc.DeleteSchema(ctx, &pb.DeleteSchemaRequest{
Name: schemaPath,
})
}
Expand All @@ -210,12 +288,12 @@ func (s *SchemaClient) DeleteSchema(ctx context.Context, schemaID string) error
type ValidateSchemaResult struct{}

// ValidateSchema validates a schema config and returns an error if invalid.
func (s *SchemaClient) ValidateSchema(ctx context.Context, schema SchemaConfig) (*ValidateSchemaResult, error) {
func (c *SchemaClient) ValidateSchema(ctx context.Context, schema SchemaConfig) (*ValidateSchemaResult, error) {
req := &pb.ValidateSchemaRequest{
Parent: fmt.Sprintf("projects/%s", s.projectID),
Parent: fmt.Sprintf("projects/%s", c.projectID),
Schema: schema.toProto(),
}
_, err := s.sc.ValidateSchema(ctx, req)
_, err := c.sc.ValidateSchema(ctx, req)
if err != nil {
return nil, err
}
Expand All @@ -228,16 +306,16 @@ type ValidateMessageResult struct{}

// ValidateMessageWithConfig validates a message against an schema specified
// by a schema config.
func (s *SchemaClient) ValidateMessageWithConfig(ctx context.Context, msg []byte, encoding SchemaEncoding, config SchemaConfig) (*ValidateMessageResult, error) {
func (c *SchemaClient) ValidateMessageWithConfig(ctx context.Context, msg []byte, encoding SchemaEncoding, config SchemaConfig) (*ValidateMessageResult, error) {
req := &pb.ValidateMessageRequest{
Parent: fmt.Sprintf("projects/%s", s.projectID),
Parent: fmt.Sprintf("projects/%s", c.projectID),
SchemaSpec: &pb.ValidateMessageRequest_Schema{
Schema: config.toProto(),
},
Message: msg,
Encoding: pb.Encoding(encoding),
}
_, err := s.sc.ValidateMessage(ctx, req)
_, err := c.sc.ValidateMessage(ctx, req)
if err != nil {
return nil, err
}
Expand All @@ -246,16 +324,16 @@ func (s *SchemaClient) ValidateMessageWithConfig(ctx context.Context, msg []byte

// ValidateMessageWithID validates a message against an schema specified
// by the schema ID of an existing schema.
func (s *SchemaClient) ValidateMessageWithID(ctx context.Context, msg []byte, encoding SchemaEncoding, schemaID string) (*ValidateMessageResult, error) {
func (c *SchemaClient) ValidateMessageWithID(ctx context.Context, msg []byte, encoding SchemaEncoding, schemaID string) (*ValidateMessageResult, error) {
req := &pb.ValidateMessageRequest{
Parent: fmt.Sprintf("projects/%s", s.projectID),
Parent: fmt.Sprintf("projects/%s", c.projectID),
SchemaSpec: &pb.ValidateMessageRequest_Name{
Name: fmt.Sprintf("projects/%s/schemas/%s", s.projectID, schemaID),
Name: fmt.Sprintf("projects/%s/schemas/%s", c.projectID, schemaID),
},
Message: msg,
Encoding: pb.Encoding(encoding),
}
_, err := s.sc.ValidateMessage(ctx, req)
_, err := c.sc.ValidateMessage(ctx, req)
if err != nil {
return nil, err
}
Expand Down
Loading