Skip to content

Commit

Permalink
Setup defined (and configurable) behavior if a ZedToken from
Browse files Browse the repository at this point in the history
an older datastore is used

All ZedTokens are now minted with the datastore's unique ID included
in the ZedToken and that ID is checked when the ZedToken is decoded.

In scenarios where the datastore ID does not match, either an error is
raised (watch, at_exact_snapshot) or configurable behavior is used
(at_least_as_fresh)

Fixes #1541
  • Loading branch information
josephschorr committed Mar 13, 2024
1 parent 2ea70ad commit 52895fe
Show file tree
Hide file tree
Showing 33 changed files with 704 additions and 143 deletions.
4 changes: 2 additions & 2 deletions e2e/newenemy/newenemy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,8 +376,8 @@ func checkDataNoNewEnemy(ctx context.Context, t testing.TB, slowNodeID int, crdb
require.NoError(t, err)
t.Log("r2 token: ", r2.WrittenAt.Token)

z1, _ := zedtoken.DecodeRevision(r1.WrittenAt, revisions.CommonDecoder{Kind: revisions.HybridLogicalClock})
z2, _ := zedtoken.DecodeRevision(r2.WrittenAt, revisions.CommonDecoder{Kind: revisions.HybridLogicalClock})
z1, _, _ := zedtoken.DecodeRevision(r1.WrittenAt, revisions.CommonDecoder{Kind: revisions.HybridLogicalClock})
z2, _, _ := zedtoken.DecodeRevision(r2.WrittenAt, revisions.CommonDecoder{Kind: revisions.HybridLogicalClock})

t.Log("z1 revision: ", z1)
t.Log("z2 revision: ", z2)
Expand Down
4 changes: 1 addition & 3 deletions internal/datastore/crdb/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@ const (
colUniqueID = "unique_id"
)

var (
queryReadUniqueID = psql.Select(colUniqueID).From(tableMetadata)
)
var queryReadUniqueID = psql.Select(colUniqueID).From(tableMetadata)

func (cds *crdbDatastore) UniqueID(ctx context.Context) (string, error) {
if cds.uniqueID.Load() == nil {
Expand Down
8 changes: 7 additions & 1 deletion internal/datastore/proxy/proxy_test/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,16 @@ import (

type MockDatastore struct {
mock.Mock

CurrentUniqueID string
}

func (dm *MockDatastore) UniqueID(_ context.Context) (string, error) {
return "mockds", nil
if dm.CurrentUniqueID == "" {
return "mockds", nil
}

return dm.CurrentUniqueID, nil
}

func (dm *MockDatastore) SnapshotReader(rev datastore.Revision) datastore.Reader {
Expand Down
9 changes: 8 additions & 1 deletion internal/datastore/revisions/commonrevision.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package revisions

import (
"context"

"github.com/authzed/spicedb/pkg/datastore"
"github.com/authzed/spicedb/pkg/spiceerrors"
)
Expand Down Expand Up @@ -43,7 +45,12 @@ func RevisionParser(kind RevisionKind) ParsingFunc {

// CommonDecoder is a revision decoder that can decode revisions of a given kind.
type CommonDecoder struct {
Kind RevisionKind
Kind RevisionKind
DatastoreUniqueID string
}

func (cd CommonDecoder) UniqueID(_ context.Context) (string, error) {
return cd.DatastoreUniqueID, nil
}

func (cd CommonDecoder) RevisionFromString(s string) (datastore.Revision, error) {
Expand Down
81 changes: 66 additions & 15 deletions internal/middleware/consistency/consistency.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/authzed/spicedb/internal/services/shared"
"github.com/authzed/spicedb/pkg/cursor"
"github.com/authzed/spicedb/pkg/datastore"
"github.com/authzed/spicedb/pkg/spiceerrors"
"github.com/authzed/spicedb/pkg/zedtoken"
)

Expand Down Expand Up @@ -55,27 +56,47 @@ func RevisionFromContext(ctx context.Context) (datastore.Revision, *v1.ZedToken,
handle := c.(*revisionHandle)
rev := handle.revision
if rev != nil {
return rev, zedtoken.MustNewFromRevision(rev), nil
ds := datastoremw.FromContext(ctx)
if ds == nil {
return nil, nil, spiceerrors.MustBugf("consistency middleware did not inject datastore")
}

zedToken, err := zedtoken.NewFromRevision(ctx, rev, ds)
if err != nil {
return nil, nil, err
}

return rev, zedToken, nil
}
}

return nil, nil, fmt.Errorf("consistency middleware did not inject revision")
}

type MismatchingTokenOption int

const (
TreatMismatchingTokensAsFullConsistency MismatchingTokenOption = iota

TreatMismatchingTokensAsMinLatency

TreatMismatchingTokensAsError
)

// AddRevisionToContext adds a revision to the given context, based on the consistency block found
// in the given request (if applicable).
func AddRevisionToContext(ctx context.Context, req interface{}, ds datastore.Datastore) error {
func AddRevisionToContext(ctx context.Context, req interface{}, ds datastore.Datastore, option MismatchingTokenOption) error {
switch req := req.(type) {
case hasConsistency:
return addRevisionToContextFromConsistency(ctx, req, ds)
return addRevisionToContextFromConsistency(ctx, req, ds, option)
default:
return nil
}
}

// addRevisionToContextFromConsistency adds a revision to the given context, based on the consistency block found
// in the given request (if applicable).
func addRevisionToContextFromConsistency(ctx context.Context, req hasConsistency, ds datastore.Datastore) error {
func addRevisionToContextFromConsistency(ctx context.Context, req hasConsistency, ds datastore.Datastore, option MismatchingTokenOption) error {
handle := ctx.Value(revisionKey)
if handle == nil {
return nil
Expand All @@ -91,7 +112,7 @@ func addRevisionToContextFromConsistency(ctx context.Context, req hasConsistency
// Always use the revision encoded in the cursor.
ConsistentyCounter.WithLabelValues("snapshot", "cursor").Inc()

requestedRev, err := cursor.DecodeToDispatchRevision(withOptionalCursor.GetOptionalCursor(), ds)
requestedRev, _, err := cursor.DecodeToDispatchRevision(ctx, withOptionalCursor.GetOptionalCursor(), ds)
if err != nil {
return rewriteDatastoreError(ctx, err)
}
Expand Down Expand Up @@ -130,7 +151,7 @@ func addRevisionToContextFromConsistency(ctx context.Context, req hasConsistency
case consistency.GetAtLeastAsFresh() != nil:
// At least as fresh as: Pick one of the datastore's revision and that specified, which
// ever is later.
picked, pickedRequest, err := pickBestRevision(ctx, consistency.GetAtLeastAsFresh(), ds)
picked, pickedRequest, err := pickBestRevision(ctx, consistency.GetAtLeastAsFresh(), ds, option)
if err != nil {
return rewriteDatastoreError(ctx, err)
}
Expand All @@ -147,11 +168,16 @@ func addRevisionToContextFromConsistency(ctx context.Context, req hasConsistency
// Exact snapshot: Use the revision as encoded in the zed token.
ConsistentyCounter.WithLabelValues("snapshot", "request").Inc()

requestedRev, err := zedtoken.DecodeRevision(consistency.GetAtExactSnapshot(), ds)
requestedRev, status, err := zedtoken.DecodeRevision(consistency.GetAtExactSnapshot(), ds)
if err != nil {
return errInvalidZedToken
}

if status == zedtoken.StatusMismatchedDatastoreID {
log.Error().Str("zedtoken", consistency.GetAtExactSnapshot().Token).Msg("ZedToken specified references an older datastore but at-exact-snapshot was requested")
return fmt.Errorf("ZedToken specified references an older datastore but at-exact-snapshot was requested")
}

err = ds.CheckRevision(ctx, requestedRev)
if err != nil {
return rewriteDatastoreError(ctx, err)
Expand All @@ -175,7 +201,7 @@ var bypassServiceWhitelist = map[string]struct{}{

// UnaryServerInterceptor returns a new unary server interceptor that performs per-request exchange of
// the specified consistency configuration for the revision at which to perform the request.
func UnaryServerInterceptor() grpc.UnaryServerInterceptor {
func UnaryServerInterceptor(option MismatchingTokenOption) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
for bypass := range bypassServiceWhitelist {
if strings.HasPrefix(info.FullMethod, bypass) {
Expand All @@ -184,7 +210,7 @@ func UnaryServerInterceptor() grpc.UnaryServerInterceptor {
}
ds := datastoremw.MustFromContext(ctx)
newCtx := ContextWithHandle(ctx)
if err := AddRevisionToContext(newCtx, req, ds); err != nil {
if err := AddRevisionToContext(newCtx, req, ds, option); err != nil {
return nil, err
}

Expand All @@ -194,21 +220,22 @@ func UnaryServerInterceptor() grpc.UnaryServerInterceptor {

// StreamServerInterceptor returns a new stream server interceptor that performs per-request exchange of
// the specified consistency configuration for the revision at which to perform the request.
func StreamServerInterceptor() grpc.StreamServerInterceptor {
func StreamServerInterceptor(option MismatchingTokenOption) grpc.StreamServerInterceptor {
return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
for bypass := range bypassServiceWhitelist {
if strings.HasPrefix(info.FullMethod, bypass) {
return handler(srv, stream)
}
}
wrapper := &recvWrapper{stream, ContextWithHandle(stream.Context())}
wrapper := &recvWrapper{stream, ContextWithHandle(stream.Context()), option}
return handler(srv, wrapper)
}
}

type recvWrapper struct {
grpc.ServerStream
ctx context.Context
ctx context.Context
option MismatchingTokenOption
}

func (s *recvWrapper) Context() context.Context { return s.ctx }
Expand All @@ -219,24 +246,48 @@ func (s *recvWrapper) RecvMsg(m interface{}) error {
}
ds := datastoremw.MustFromContext(s.ctx)

return AddRevisionToContext(s.ctx, m, ds)
return AddRevisionToContext(s.ctx, m, ds, s.option)
}

// pickBestRevision compares the provided ZedToken with the optimized revision of the datastore, and returns the most
// recent one. The boolean return value will be true if the provided ZedToken is the most recent, false otherwise.
func pickBestRevision(ctx context.Context, requested *v1.ZedToken, ds datastore.Datastore) (datastore.Revision, bool, error) {
func pickBestRevision(ctx context.Context, requested *v1.ZedToken, ds datastore.Datastore, option MismatchingTokenOption) (datastore.Revision, bool, error) {
// Calculate a revision as we see fit
databaseRev, err := ds.OptimizedRevision(ctx)
if err != nil {
return datastore.NoRevision, false, err
}

if requested != nil {
requestedRev, err := zedtoken.DecodeRevision(requested, ds)
requestedRev, status, err := zedtoken.DecodeRevision(requested, ds)
if err != nil {
return datastore.NoRevision, false, errInvalidZedToken
}

if status == zedtoken.StatusMismatchedDatastoreID {
switch option {
case TreatMismatchingTokensAsFullConsistency:
log.Warn().Str("zedtoken", requested.Token).Msg("ZedToken specified references an older datastore and SpiceDB is configured to treat this as a full consistency request")
headRev, err := ds.HeadRevision(ctx)
if err != nil {
return datastore.NoRevision, false, err
}

return headRev, false, nil

case TreatMismatchingTokensAsMinLatency:
log.Warn().Str("zedtoken", requested.Token).Msg("ZedToken specified references an older datastore and SpiceDB is configured to treat this as a min latency request")
return databaseRev, false, nil

case TreatMismatchingTokensAsError:
log.Error().Str("zedtoken", requested.Token).Msg("ZedToken specified references an older datastore and SpiceDB is configured to raise an error in this scenario")
return datastore.NoRevision, false, fmt.Errorf("ZedToken specified references an older datastore and SpiceDB is configured to raise an error in this scenario")

default:
return datastore.NoRevision, false, spiceerrors.MustBugf("unknown mismatching token option: %v", option)
}
}

if databaseRev.GreaterThan(requestedRev) {
return databaseRev, false, nil
}
Expand Down
Loading

0 comments on commit 52895fe

Please sign in to comment.