From 52895fee8f070cd624aa5ff50267dc788188fbef Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Mon, 29 Jan 2024 15:58:39 -0500 Subject: [PATCH] Setup defined (and configurable) behavior if a ZedToken from an older datastore is used All ZedTokens are now minted with the datastore's unique ID included in the ZedToken and that ID is checked when the ZedToken is decoded. In scenarios where the datastore ID does not match, either an error is raised (watch, at_exact_snapshot) or configurable behavior is used (at_least_as_fresh) Fixes #1541 --- e2e/newenemy/newenemy_test.go | 4 +- internal/datastore/crdb/stats.go | 4 +- internal/datastore/proxy/proxy_test/mock.go | 8 +- .../datastore/revisions/commonrevision.go | 9 +- .../middleware/consistency/consistency.go | 81 ++++++-- .../consistency/consistency_test.go | 190 +++++++++++++++++- .../services/integrationtesting/cert_test.go | 8 +- .../consistencytestutil/servicetester.go | 12 +- .../services/integrationtesting/perf_test.go | 2 +- internal/services/v1/debug_test.go | 2 +- internal/services/v1/metadata_test.go | 8 +- internal/services/v1/permissions_test.go | 32 +-- internal/services/v1/relationships.go | 14 +- internal/services/v1/relationships_test.go | 15 +- internal/services/v1/schema.go | 14 +- internal/services/v1/watch.go | 13 +- internal/services/v1/watch_test.go | 2 +- internal/testserver/server.go | 4 +- pkg/cmd/serve.go | 1 + pkg/cmd/server/defaults.go | 19 +- pkg/cmd/server/server.go | 21 ++ pkg/cmd/server/server_test.go | 5 +- pkg/cmd/server/zz_generated.options.go | 9 + pkg/cmd/testserver/testserver.go | 8 +- pkg/cursor/cursor.go | 26 ++- pkg/cursor/cursor_test.go | 5 +- pkg/development/devcontext.go | 4 +- pkg/proto/impl/v1/impl.pb.go | 36 +++- pkg/proto/impl/v1/impl.pb.validate.go | 4 + pkg/proto/impl/v1/impl_vtproto.pb.go | 94 +++++++++ pkg/zedtoken/zedtoken.go | 73 +++++-- pkg/zedtoken/zedtoken_test.go | 112 +++++++++-- proto/internal/impl/v1/impl.proto | 8 + 33 files changed, 704 insertions(+), 143 deletions(-) diff --git a/e2e/newenemy/newenemy_test.go b/e2e/newenemy/newenemy_test.go index 7c37737442..e2352367d0 100644 --- a/e2e/newenemy/newenemy_test.go +++ b/e2e/newenemy/newenemy_test.go @@ -376,8 +376,8 @@ func checkDataNoNewEnemy(ctx context.Context, t testing.TB, slowNodeID int, crdb require.NoError(t, err) t.Log("r2 token: ", r2.WrittenAt.Token) - z1, _ := zedtoken.DecodeRevision(r1.WrittenAt, revisions.CommonDecoder{Kind: revisions.HybridLogicalClock}) - z2, _ := zedtoken.DecodeRevision(r2.WrittenAt, revisions.CommonDecoder{Kind: revisions.HybridLogicalClock}) + z1, _, _ := zedtoken.DecodeRevision(r1.WrittenAt, revisions.CommonDecoder{Kind: revisions.HybridLogicalClock}) + z2, _, _ := zedtoken.DecodeRevision(r2.WrittenAt, revisions.CommonDecoder{Kind: revisions.HybridLogicalClock}) t.Log("z1 revision: ", z1) t.Log("z2 revision: ", z2) diff --git a/internal/datastore/crdb/stats.go b/internal/datastore/crdb/stats.go index 3dffe03574..a8a7e25261 100644 --- a/internal/datastore/crdb/stats.go +++ b/internal/datastore/crdb/stats.go @@ -18,9 +18,7 @@ const ( colUniqueID = "unique_id" ) -var ( - queryReadUniqueID = psql.Select(colUniqueID).From(tableMetadata) -) +var queryReadUniqueID = psql.Select(colUniqueID).From(tableMetadata) func (cds *crdbDatastore) UniqueID(ctx context.Context) (string, error) { if cds.uniqueID.Load() == nil { diff --git a/internal/datastore/proxy/proxy_test/mock.go b/internal/datastore/proxy/proxy_test/mock.go index 02ac011b3f..422f6011fb 100644 --- a/internal/datastore/proxy/proxy_test/mock.go +++ b/internal/datastore/proxy/proxy_test/mock.go @@ -13,10 +13,16 @@ import ( type MockDatastore struct { mock.Mock + + CurrentUniqueID string } func (dm *MockDatastore) UniqueID(_ context.Context) (string, error) { - return "mockds", nil + if dm.CurrentUniqueID == "" { + return "mockds", nil + } + + return dm.CurrentUniqueID, nil } func (dm *MockDatastore) SnapshotReader(rev datastore.Revision) datastore.Reader { diff --git a/internal/datastore/revisions/commonrevision.go b/internal/datastore/revisions/commonrevision.go index de85496d9b..c1adc51fca 100644 --- a/internal/datastore/revisions/commonrevision.go +++ b/internal/datastore/revisions/commonrevision.go @@ -1,6 +1,8 @@ package revisions import ( + "context" + "github.com/authzed/spicedb/pkg/datastore" "github.com/authzed/spicedb/pkg/spiceerrors" ) @@ -43,7 +45,12 @@ func RevisionParser(kind RevisionKind) ParsingFunc { // CommonDecoder is a revision decoder that can decode revisions of a given kind. type CommonDecoder struct { - Kind RevisionKind + Kind RevisionKind + DatastoreUniqueID string +} + +func (cd CommonDecoder) UniqueID(_ context.Context) (string, error) { + return cd.DatastoreUniqueID, nil } func (cd CommonDecoder) RevisionFromString(s string) (datastore.Revision, error) { diff --git a/internal/middleware/consistency/consistency.go b/internal/middleware/consistency/consistency.go index c928e6b88a..ac6e222870 100644 --- a/internal/middleware/consistency/consistency.go +++ b/internal/middleware/consistency/consistency.go @@ -18,6 +18,7 @@ import ( "github.com/authzed/spicedb/internal/services/shared" "github.com/authzed/spicedb/pkg/cursor" "github.com/authzed/spicedb/pkg/datastore" + "github.com/authzed/spicedb/pkg/spiceerrors" "github.com/authzed/spicedb/pkg/zedtoken" ) @@ -55,19 +56,39 @@ func RevisionFromContext(ctx context.Context) (datastore.Revision, *v1.ZedToken, handle := c.(*revisionHandle) rev := handle.revision if rev != nil { - return rev, zedtoken.MustNewFromRevision(rev), nil + ds := datastoremw.FromContext(ctx) + if ds == nil { + return nil, nil, spiceerrors.MustBugf("consistency middleware did not inject datastore") + } + + zedToken, err := zedtoken.NewFromRevision(ctx, rev, ds) + if err != nil { + return nil, nil, err + } + + return rev, zedToken, nil } } return nil, nil, fmt.Errorf("consistency middleware did not inject revision") } +type MismatchingTokenOption int + +const ( + TreatMismatchingTokensAsFullConsistency MismatchingTokenOption = iota + + TreatMismatchingTokensAsMinLatency + + TreatMismatchingTokensAsError +) + // AddRevisionToContext adds a revision to the given context, based on the consistency block found // in the given request (if applicable). -func AddRevisionToContext(ctx context.Context, req interface{}, ds datastore.Datastore) error { +func AddRevisionToContext(ctx context.Context, req interface{}, ds datastore.Datastore, option MismatchingTokenOption) error { switch req := req.(type) { case hasConsistency: - return addRevisionToContextFromConsistency(ctx, req, ds) + return addRevisionToContextFromConsistency(ctx, req, ds, option) default: return nil } @@ -75,7 +96,7 @@ func AddRevisionToContext(ctx context.Context, req interface{}, ds datastore.Dat // addRevisionToContextFromConsistency adds a revision to the given context, based on the consistency block found // in the given request (if applicable). -func addRevisionToContextFromConsistency(ctx context.Context, req hasConsistency, ds datastore.Datastore) error { +func addRevisionToContextFromConsistency(ctx context.Context, req hasConsistency, ds datastore.Datastore, option MismatchingTokenOption) error { handle := ctx.Value(revisionKey) if handle == nil { return nil @@ -91,7 +112,7 @@ func addRevisionToContextFromConsistency(ctx context.Context, req hasConsistency // Always use the revision encoded in the cursor. ConsistentyCounter.WithLabelValues("snapshot", "cursor").Inc() - requestedRev, err := cursor.DecodeToDispatchRevision(withOptionalCursor.GetOptionalCursor(), ds) + requestedRev, _, err := cursor.DecodeToDispatchRevision(ctx, withOptionalCursor.GetOptionalCursor(), ds) if err != nil { return rewriteDatastoreError(ctx, err) } @@ -130,7 +151,7 @@ func addRevisionToContextFromConsistency(ctx context.Context, req hasConsistency case consistency.GetAtLeastAsFresh() != nil: // At least as fresh as: Pick one of the datastore's revision and that specified, which // ever is later. - picked, pickedRequest, err := pickBestRevision(ctx, consistency.GetAtLeastAsFresh(), ds) + picked, pickedRequest, err := pickBestRevision(ctx, consistency.GetAtLeastAsFresh(), ds, option) if err != nil { return rewriteDatastoreError(ctx, err) } @@ -147,11 +168,16 @@ func addRevisionToContextFromConsistency(ctx context.Context, req hasConsistency // Exact snapshot: Use the revision as encoded in the zed token. ConsistentyCounter.WithLabelValues("snapshot", "request").Inc() - requestedRev, err := zedtoken.DecodeRevision(consistency.GetAtExactSnapshot(), ds) + requestedRev, status, err := zedtoken.DecodeRevision(consistency.GetAtExactSnapshot(), ds) if err != nil { return errInvalidZedToken } + if status == zedtoken.StatusMismatchedDatastoreID { + log.Error().Str("zedtoken", consistency.GetAtExactSnapshot().Token).Msg("ZedToken specified references an older datastore but at-exact-snapshot was requested") + return fmt.Errorf("ZedToken specified references an older datastore but at-exact-snapshot was requested") + } + err = ds.CheckRevision(ctx, requestedRev) if err != nil { return rewriteDatastoreError(ctx, err) @@ -175,7 +201,7 @@ var bypassServiceWhitelist = map[string]struct{}{ // UnaryServerInterceptor returns a new unary server interceptor that performs per-request exchange of // the specified consistency configuration for the revision at which to perform the request. -func UnaryServerInterceptor() grpc.UnaryServerInterceptor { +func UnaryServerInterceptor(option MismatchingTokenOption) grpc.UnaryServerInterceptor { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { for bypass := range bypassServiceWhitelist { if strings.HasPrefix(info.FullMethod, bypass) { @@ -184,7 +210,7 @@ func UnaryServerInterceptor() grpc.UnaryServerInterceptor { } ds := datastoremw.MustFromContext(ctx) newCtx := ContextWithHandle(ctx) - if err := AddRevisionToContext(newCtx, req, ds); err != nil { + if err := AddRevisionToContext(newCtx, req, ds, option); err != nil { return nil, err } @@ -194,21 +220,22 @@ func UnaryServerInterceptor() grpc.UnaryServerInterceptor { // StreamServerInterceptor returns a new stream server interceptor that performs per-request exchange of // the specified consistency configuration for the revision at which to perform the request. -func StreamServerInterceptor() grpc.StreamServerInterceptor { +func StreamServerInterceptor(option MismatchingTokenOption) grpc.StreamServerInterceptor { return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { for bypass := range bypassServiceWhitelist { if strings.HasPrefix(info.FullMethod, bypass) { return handler(srv, stream) } } - wrapper := &recvWrapper{stream, ContextWithHandle(stream.Context())} + wrapper := &recvWrapper{stream, ContextWithHandle(stream.Context()), option} return handler(srv, wrapper) } } type recvWrapper struct { grpc.ServerStream - ctx context.Context + ctx context.Context + option MismatchingTokenOption } func (s *recvWrapper) Context() context.Context { return s.ctx } @@ -219,12 +246,12 @@ func (s *recvWrapper) RecvMsg(m interface{}) error { } ds := datastoremw.MustFromContext(s.ctx) - return AddRevisionToContext(s.ctx, m, ds) + return AddRevisionToContext(s.ctx, m, ds, s.option) } // pickBestRevision compares the provided ZedToken with the optimized revision of the datastore, and returns the most // recent one. The boolean return value will be true if the provided ZedToken is the most recent, false otherwise. -func pickBestRevision(ctx context.Context, requested *v1.ZedToken, ds datastore.Datastore) (datastore.Revision, bool, error) { +func pickBestRevision(ctx context.Context, requested *v1.ZedToken, ds datastore.Datastore, option MismatchingTokenOption) (datastore.Revision, bool, error) { // Calculate a revision as we see fit databaseRev, err := ds.OptimizedRevision(ctx) if err != nil { @@ -232,11 +259,35 @@ func pickBestRevision(ctx context.Context, requested *v1.ZedToken, ds datastore. } if requested != nil { - requestedRev, err := zedtoken.DecodeRevision(requested, ds) + requestedRev, status, err := zedtoken.DecodeRevision(requested, ds) if err != nil { return datastore.NoRevision, false, errInvalidZedToken } + if status == zedtoken.StatusMismatchedDatastoreID { + switch option { + case TreatMismatchingTokensAsFullConsistency: + log.Warn().Str("zedtoken", requested.Token).Msg("ZedToken specified references an older datastore and SpiceDB is configured to treat this as a full consistency request") + headRev, err := ds.HeadRevision(ctx) + if err != nil { + return datastore.NoRevision, false, err + } + + return headRev, false, nil + + case TreatMismatchingTokensAsMinLatency: + log.Warn().Str("zedtoken", requested.Token).Msg("ZedToken specified references an older datastore and SpiceDB is configured to treat this as a min latency request") + return databaseRev, false, nil + + case TreatMismatchingTokensAsError: + log.Error().Str("zedtoken", requested.Token).Msg("ZedToken specified references an older datastore and SpiceDB is configured to raise an error in this scenario") + return datastore.NoRevision, false, fmt.Errorf("ZedToken specified references an older datastore and SpiceDB is configured to raise an error in this scenario") + + default: + return datastore.NoRevision, false, spiceerrors.MustBugf("unknown mismatching token option: %v", option) + } + } + if databaseRev.GreaterThan(requestedRev) { return databaseRev, false, nil } diff --git a/internal/middleware/consistency/consistency_test.go b/internal/middleware/consistency/consistency_test.go index 58a2555246..e28a816805 100644 --- a/internal/middleware/consistency/consistency_test.go +++ b/internal/middleware/consistency/consistency_test.go @@ -10,6 +10,7 @@ import ( "github.com/authzed/spicedb/internal/datastore/proxy/proxy_test" "github.com/authzed/spicedb/internal/datastore/revisions" + datastoremw "github.com/authzed/spicedb/internal/middleware/datastore" "github.com/authzed/spicedb/pkg/cursor" dispatch "github.com/authzed/spicedb/pkg/proto/dispatch/v1" "github.com/authzed/spicedb/pkg/zedtoken" @@ -29,7 +30,9 @@ func TestAddRevisionToContextNoneSupplied(t *testing.T) { ds.On("OptimizedRevision").Return(optimized, nil).Once() updated := ContextWithHandle(context.Background()) - err := AddRevisionToContext(updated, &v1.ReadRelationshipsRequest{}, ds) + updated = datastoremw.ContextWithDatastore(updated, ds) + + err := AddRevisionToContext(updated, &v1.ReadRelationshipsRequest{}, ds, TreatMismatchingTokensAsError) require.NoError(err) rev, _, err := RevisionFromContext(updated) @@ -46,13 +49,15 @@ func TestAddRevisionToContextMinimizeLatency(t *testing.T) { ds.On("OptimizedRevision").Return(optimized, nil).Once() updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + err := AddRevisionToContext(updated, &v1.ReadRelationshipsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_MinimizeLatency{ MinimizeLatency: true, }, }, - }, ds) + }, ds, TreatMismatchingTokensAsError) require.NoError(err) rev, _, err := RevisionFromContext(updated) @@ -69,13 +74,15 @@ func TestAddRevisionToContextFullyConsistent(t *testing.T) { ds.On("HeadRevision").Return(head, nil).Once() updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + err := AddRevisionToContext(updated, &v1.ReadRelationshipsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_FullyConsistent{ FullyConsistent: true, }, }, - }, ds) + }, ds, TreatMismatchingTokensAsError) require.NoError(err) rev, _, err := RevisionFromContext(updated) @@ -93,13 +100,15 @@ func TestAddRevisionToContextAtLeastAsFresh(t *testing.T) { ds.On("RevisionFromString", exact.String()).Return(exact, nil).Once() updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + err := AddRevisionToContext(updated, &v1.ReadRelationshipsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(exact), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(exact), }, }, - }, ds) + }, ds, TreatMismatchingTokensAsError) require.NoError(err) rev, _, err := RevisionFromContext(updated) @@ -117,13 +126,15 @@ func TestAddRevisionToContextAtValidExactSnapshot(t *testing.T) { ds.On("RevisionFromString", exact.String()).Return(exact, nil).Once() updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + err := AddRevisionToContext(updated, &v1.ReadRelationshipsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtExactSnapshot{ - AtExactSnapshot: zedtoken.MustNewFromRevision(exact), + AtExactSnapshot: zedtoken.MustNewFromRevisionForTesting(exact), }, }, - }, ds) + }, ds, TreatMismatchingTokensAsError) require.NoError(err) rev, _, err := RevisionFromContext(updated) @@ -141,13 +152,15 @@ func TestAddRevisionToContextAtInvalidExactSnapshot(t *testing.T) { ds.On("RevisionFromString", zero.String()).Return(zero, nil).Once() updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + err := AddRevisionToContext(updated, &v1.ReadRelationshipsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtExactSnapshot{ - AtExactSnapshot: zedtoken.MustNewFromRevision(zero), + AtExactSnapshot: zedtoken.MustNewFromRevisionForTesting(zero), }, }, - }, ds) + }, ds, TreatMismatchingTokensAsError) require.Error(err) ds.AssertExpectations(t) } @@ -155,7 +168,10 @@ func TestAddRevisionToContextAtInvalidExactSnapshot(t *testing.T) { func TestAddRevisionToContextNoConsistencyAPI(t *testing.T) { require := require.New(t) + ds := &proxy_test.MockDatastore{} + updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) _, _, err := RevisionFromContext(updated) require.Error(err) @@ -174,14 +190,16 @@ func TestAddRevisionToContextWithCursor(t *testing.T) { // revision in context is at `exact` updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + err = AddRevisionToContext(updated, &v1.LookupResourcesRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtExactSnapshot{ - AtExactSnapshot: zedtoken.MustNewFromRevision(exact), + AtExactSnapshot: zedtoken.MustNewFromRevisionForTesting(exact), }, }, OptionalCursor: cursor, - }, ds) + }, ds, TreatMismatchingTokensAsError) require.NoError(err) // ensure we get back `optimized` from the cursor @@ -191,3 +209,153 @@ func TestAddRevisionToContextWithCursor(t *testing.T) { require.True(optimized.Equal(rev)) ds.AssertExpectations(t) } + +func TestAtExactSnapshotWithMismatchedToken(t *testing.T) { + require := require.New(t) + + ds := &proxy_test.MockDatastore{} + ds.On("RevisionFromString", optimized.String()).Return(optimized, nil).Once() + + // revision in context is at `exact` + updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + + // mint a token with a different datastore ID. + ds.CurrentUniqueID = "foo" + zedToken, err := zedtoken.NewFromRevision(context.Background(), optimized, ds) + require.NoError(err) + + ds.CurrentUniqueID = "bar" + err = AddRevisionToContext(updated, &v1.LookupResourcesRequest{ + Consistency: &v1.Consistency{ + Requirement: &v1.Consistency_AtExactSnapshot{ + AtExactSnapshot: zedToken, + }, + }, + }, ds, TreatMismatchingTokensAsError) + require.Error(err) + require.ErrorContains(err, "ZedToken specified references an older datastore but at-exact-snapshot") +} + +func TestAtLeastAsFreshWithMismatchedTokenExpectError(t *testing.T) { + require := require.New(t) + + ds := &proxy_test.MockDatastore{} + ds.On("OptimizedRevision").Return(optimized, nil).Once() + ds.On("RevisionFromString", optimized.String()).Return(optimized, nil).Once() + + // revision in context is at `exact` + updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + + // mint a token with a different datastore ID. + ds.CurrentUniqueID = "foo" + zedToken, err := zedtoken.NewFromRevision(context.Background(), optimized, ds) + require.NoError(err) + + ds.CurrentUniqueID = "bar" + err = AddRevisionToContext(updated, &v1.LookupResourcesRequest{ + Consistency: &v1.Consistency{ + Requirement: &v1.Consistency_AtLeastAsFresh{ + AtLeastAsFresh: zedToken, + }, + }, + }, ds, TreatMismatchingTokensAsError) + require.Error(err) + require.ErrorContains(err, "ZedToken specified references an older datastore and SpiceDB is configured to raise an error in this scenario") +} + +func TestAtLeastAsFreshWithMismatchedTokenExpectMinLatency(t *testing.T) { + require := require.New(t) + + ds := &proxy_test.MockDatastore{} + ds.On("OptimizedRevision").Return(optimized, nil).Once() + ds.On("RevisionFromString", optimized.String()).Return(optimized, nil).Once() + + // revision in context is at `exact` + updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + + // mint a token with a different datastore ID. + ds.CurrentUniqueID = "foo" + zedToken, err := zedtoken.NewFromRevision(context.Background(), optimized, ds) + require.NoError(err) + + ds.CurrentUniqueID = "bar" + err = AddRevisionToContext(updated, &v1.LookupResourcesRequest{ + Consistency: &v1.Consistency{ + Requirement: &v1.Consistency_AtLeastAsFresh{ + AtLeastAsFresh: zedToken, + }, + }, + }, ds, TreatMismatchingTokensAsMinLatency) + require.NoError(err) + + rev, _, err := RevisionFromContext(updated) + require.NoError(err) + + require.True(optimized.Equal(rev)) + ds.AssertExpectations(t) +} + +func TestAtLeastAsFreshWithMismatchedTokenExpectFullConsistency(t *testing.T) { + require := require.New(t) + + ds := &proxy_test.MockDatastore{} + ds.On("HeadRevision").Return(head, nil).Once() + ds.On("OptimizedRevision").Return(optimized, nil).Once() + ds.On("RevisionFromString", optimized.String()).Return(optimized, nil).Once() + + // revision in context is at `exact` + updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + + // mint a token with a different datastore ID. + ds.CurrentUniqueID = "foo" + zedToken, err := zedtoken.NewFromRevision(context.Background(), optimized, ds) + require.NoError(err) + + ds.CurrentUniqueID = "bar" + err = AddRevisionToContext(updated, &v1.LookupResourcesRequest{ + Consistency: &v1.Consistency{ + Requirement: &v1.Consistency_AtLeastAsFresh{ + AtLeastAsFresh: zedToken, + }, + }, + }, ds, TreatMismatchingTokensAsFullConsistency) + require.NoError(err) + + rev, _, err := RevisionFromContext(updated) + require.NoError(err) + + require.True(head.Equal(rev)) + ds.AssertExpectations(t) +} + +func TestAddRevisionToContextAtLeastAsFreshMatchingIDs(t *testing.T) { + require := require.New(t) + + ds := &proxy_test.MockDatastore{} + ds.On("OptimizedRevision").Return(optimized, nil).Once() + ds.On("RevisionFromString", exact.String()).Return(exact, nil).Once() + + ds.CurrentUniqueID = "foo" + + updated := ContextWithHandle(context.Background()) + updated = datastoremw.ContextWithDatastore(updated, ds) + + err := AddRevisionToContext(updated, &v1.ReadRelationshipsRequest{ + Consistency: &v1.Consistency{ + Requirement: &v1.Consistency_AtLeastAsFresh{ + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(exact), + }, + }, + }, ds, TreatMismatchingTokensAsError) + require.NoError(err) + + rev, _, err := RevisionFromContext(updated) + require.NoError(err) + + require.True(exact.Equal(rev)) + ds.AssertExpectations(t) +} diff --git a/internal/services/integrationtesting/cert_test.go b/internal/services/integrationtesting/cert_test.go index 516aa6de19..4ccdf27271 100644 --- a/internal/services/integrationtesting/cert_test.go +++ b/internal/services/integrationtesting/cert_test.go @@ -146,7 +146,7 @@ func TestCertRotation(t *testing.T) { }, { Name: "consistency", - Middleware: consistency.UnaryServerInterceptor(), + Middleware: consistency.UnaryServerInterceptor(consistency.TreatMismatchingTokensAsError), }, { Name: "servicespecific", @@ -165,7 +165,7 @@ func TestCertRotation(t *testing.T) { }, { Name: "consistency", - Middleware: consistency.StreamServerInterceptor(), + Middleware: consistency.StreamServerInterceptor(consistency.TreatMismatchingTokensAsError), }, { Name: "servicespecific", @@ -209,7 +209,7 @@ func TestCertRotation(t *testing.T) { _, err = client.CheckPermission(ctx, &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: rel.Resource, @@ -262,7 +262,7 @@ func TestCertRotation(t *testing.T) { _, err = client.CheckPermission(ctx, &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: rel.Resource, diff --git a/internal/services/integrationtesting/consistencytestutil/servicetester.go b/internal/services/integrationtesting/consistencytestutil/servicetester.go index 0dbc26a2c6..24a4786ee9 100644 --- a/internal/services/integrationtesting/consistencytestutil/servicetester.go +++ b/internal/services/integrationtesting/consistencytestutil/servicetester.go @@ -78,7 +78,7 @@ func (v1st v1ServiceTester) Check(ctx context.Context, resource *core.ObjectAndR }, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(atRevision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(atRevision), }, }, Context: context, @@ -98,7 +98,7 @@ func (v1st v1ServiceTester) Expand(ctx context.Context, resource *core.ObjectAnd Permission: resource.Relation, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(atRevision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(atRevision), }, }, }) @@ -128,7 +128,7 @@ func (v1st v1ServiceTester) Read(_ context.Context, namespaceName string, atRevi }, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(atRevision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(atRevision), }, }, }) @@ -166,7 +166,7 @@ func (v1st v1ServiceTester) LookupResources(_ context.Context, resourceRelation }, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(atRevision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(atRevision), }, }, OptionalLimit: limit, @@ -214,7 +214,7 @@ func (v1st v1ServiceTester) LookupSubjects(_ context.Context, resource *core.Obj OptionalSubjectRelation: optionalizeRelation(subjectRelation.Relation), Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(atRevision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(atRevision), }, }, Context: builtContext, @@ -244,7 +244,7 @@ func (v1st v1ServiceTester) BulkCheck(ctx context.Context, items []*v1.BulkCheck Items: items, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(atRevision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(atRevision), }, }, }) diff --git a/internal/services/integrationtesting/perf_test.go b/internal/services/integrationtesting/perf_test.go index f411bce80e..d2ab1786b9 100644 --- a/internal/services/integrationtesting/perf_test.go +++ b/internal/services/integrationtesting/perf_test.go @@ -58,7 +58,7 @@ func TestBurst(t *testing.T) { _, err := client.CheckPermission(context.Background(), &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: rel.Resource, diff --git a/internal/services/v1/debug_test.go b/internal/services/v1/debug_test.go index b925a33609..0fc8c65915 100644 --- a/internal/services/v1/debug_test.go +++ b/internal/services/v1/debug_test.go @@ -468,7 +468,7 @@ func TestCheckPermissionWithDebug(t *testing.T) { checkResp, err := client.CheckPermission(ctx, &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: stc.checkRequest.resource, diff --git a/internal/services/v1/metadata_test.go b/internal/services/v1/metadata_test.go index 90ffc962b6..82d195ea23 100644 --- a/internal/services/v1/metadata_test.go +++ b/internal/services/v1/metadata_test.go @@ -38,7 +38,7 @@ func TestAllMethodsReturnMetadata(t *testing.T) { _, err := client.CheckPermission(ctx, &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "masterplan"), @@ -96,7 +96,7 @@ func TestAllMethodsReturnMetadata(t *testing.T) { _, err := client.ExpandPermissionTree(ctx, &v1.ExpandPermissionTreeRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "masterplan"), @@ -130,7 +130,7 @@ func TestAllMethodsReturnMetadata(t *testing.T) { stream, err := client.LookupResources(ctx, &v1.LookupResourcesRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, ResourceObjectType: "document", @@ -155,7 +155,7 @@ func TestAllMethodsReturnMetadata(t *testing.T) { stream, err := client.LookupSubjects(ctx, &v1.LookupSubjectsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "masterplan"), diff --git a/internal/services/v1/permissions_test.go b/internal/services/v1/permissions_test.go index f653154a2f..f7ec048b81 100644 --- a/internal/services/v1/permissions_test.go +++ b/internal/services/v1/permissions_test.go @@ -274,7 +274,7 @@ func TestCheckPermissions(t *testing.T) { checkResp, err := client.CheckPermission(ctx, &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: tc.resource, @@ -331,7 +331,7 @@ func TestCheckPermissionWithDebugInfo(t *testing.T) { checkResp, err := client.CheckPermission(ctx, &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "masterplan"), @@ -548,7 +548,7 @@ func TestLookupResources(t *testing.T) { Subject: tc.subject, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, }, grpc.Trailer(&trailer)) @@ -623,7 +623,7 @@ func TestExpand(t *testing.T) { Permission: tc.startPermission, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, }, grpc.Trailer(&trailer)) @@ -867,7 +867,7 @@ func TestLookupSubjects(t *testing.T) { OptionalSubjectRelation: tc.subjectRelation, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, }, grpc.Trailer(&trailer)) @@ -915,7 +915,7 @@ func TestCheckWithCaveats(t *testing.T) { request := &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "companyplan"), @@ -1027,7 +1027,7 @@ func TestCheckWithCaveatErrors(t *testing.T) { request := &v1.CheckPermissionRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "firstdoc"), @@ -1080,7 +1080,7 @@ func TestLookupResourcesWithCaveats(t *testing.T) { request := &v1.LookupResourcesRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, ResourceObjectType: "document", @@ -1126,7 +1126,7 @@ func TestLookupResourcesWithCaveats(t *testing.T) { request = &v1.LookupResourcesRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, ResourceObjectType: "document", @@ -1199,7 +1199,7 @@ func TestLookupSubjectsWithCaveats(t *testing.T) { request := &v1.LookupSubjectsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "first"), @@ -1244,7 +1244,7 @@ func TestLookupSubjectsWithCaveats(t *testing.T) { request = &v1.LookupSubjectsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "first"), @@ -1289,7 +1289,7 @@ func TestLookupSubjectsWithCaveats(t *testing.T) { request = &v1.LookupSubjectsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "first"), @@ -1363,7 +1363,7 @@ func TestLookupSubjectsWithCaveatedWildcards(t *testing.T) { request := &v1.LookupSubjectsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "first"), @@ -1402,7 +1402,7 @@ func TestLookupSubjectsWithCaveatedWildcards(t *testing.T) { request = &v1.LookupSubjectsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, Resource: obj("document", "first"), @@ -1546,7 +1546,7 @@ func TestLookupResourcesWithCursors(t *testing.T) { Subject: tc.subject, Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, OptionalLimit: uint32(limit), @@ -1615,7 +1615,7 @@ func TestLookupResourcesDeduplication(t *testing.T) { Subject: sub("user", "tom", ""), Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, }) diff --git a/internal/services/v1/relationships.go b/internal/services/v1/relationships.go index 7d3f8b6ce3..b23b9aebf8 100644 --- a/internal/services/v1/relationships.go +++ b/internal/services/v1/relationships.go @@ -329,8 +329,13 @@ func (ps *permissionServer) WriteRelationships(ctx context.Context, req *v1.Writ writeUpdateCounter.WithLabelValues(v1.RelationshipUpdate_Operation_name[int32(kind)]).Observe(float64(count)) } + zedToken, err := zedtoken.NewFromRevision(ctx, revision, ds) + if err != nil { + return nil, ps.rewriteError(ctx, err) + } + return &v1.WriteRelationshipsResponse{ - WrittenAt: zedtoken.MustNewFromRevision(revision), + WrittenAt: zedToken, }, nil } @@ -424,8 +429,13 @@ func (ps *permissionServer) DeleteRelationships(ctx context.Context, req *v1.Del return nil, ps.rewriteError(ctx, err) } + zedToken, err := zedtoken.NewFromRevision(ctx, revision, ds) + if err != nil { + return nil, ps.rewriteError(ctx, err) + } + return &v1.DeleteRelationshipsResponse{ - DeletedAt: zedtoken.MustNewFromRevision(revision), + DeletedAt: zedToken, DeletionProgress: deletionProgress, }, nil } diff --git a/internal/services/v1/relationships_test.go b/internal/services/v1/relationships_test.go index 3c1bedcc04..662a610ef4 100644 --- a/internal/services/v1/relationships_test.go +++ b/internal/services/v1/relationships_test.go @@ -296,7 +296,7 @@ func TestReadRelationships(t *testing.T) { stream, err := client.ReadRelationships(context.Background(), &v1.ReadRelationshipsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, RelationshipFilter: tc.filter, @@ -1151,7 +1151,7 @@ func TestDeleteRelationships(t *testing.T) { } require.NoError(err) require.NotNil(resp.DeletedAt) - rev, err := zedtoken.DecodeRevision(resp.DeletedAt, ds) + rev, _, err := zedtoken.DecodeRevision(resp.DeletedAt, ds) require.NoError(err) require.True(rev.GreaterThan(revision)) require.EqualValues(standardTuplesWithout(tc.deleted), readAll(require, client, resp.DeletedAt)) @@ -1227,7 +1227,7 @@ func TestDeleteRelationshipsBeyondLimitPartial(t *testing.T) { headRev, err := ds.HeadRevision(context.Background()) require.NoError(err) - beforeDelete := readOfType(require, "document", client, zedtoken.MustNewFromRevision(headRev)) + beforeDelete := readOfType(require, "document", client, zedtoken.MustNewFromRevisionForTesting(headRev)) resp, err := client.DeleteRelationships(context.Background(), &v1.DeleteRelationshipsRequest{ RelationshipFilter: &v1.RelationshipFilter{ @@ -1238,7 +1238,10 @@ func TestDeleteRelationshipsBeyondLimitPartial(t *testing.T) { }) require.NoError(err) - afterDelete := readOfType(require, "document", client, resp.DeletedAt) + headRev, err = ds.HeadRevision(context.Background()) + require.NoError(err) + + afterDelete := readOfType(require, "document", client, zedtoken.MustNewFromRevisionForTesting(headRev)) require.LessOrEqual(len(beforeDelete)-len(afterDelete), batchSize) if i == 0 { @@ -1249,7 +1252,7 @@ func TestDeleteRelationshipsBeyondLimitPartial(t *testing.T) { require.NoError(err) require.NotNil(resp.DeletedAt) - rev, err := zedtoken.DecodeRevision(resp.DeletedAt, ds) + rev, _, err := zedtoken.DecodeRevision(resp.DeletedAt, ds) require.NoError(err) require.True(rev.GreaterThan(revision)) require.EqualValues(standardTuplesWithout(expected), readAll(require, client, resp.DeletedAt)) @@ -1506,7 +1509,7 @@ func TestReadRelationshipsInvalidCursor(t *testing.T) { stream, err := client.ReadRelationships(context.Background(), &v1.ReadRelationshipsRequest{ Consistency: &v1.Consistency{ Requirement: &v1.Consistency_AtLeastAsFresh{ - AtLeastAsFresh: zedtoken.MustNewFromRevision(revision), + AtLeastAsFresh: zedtoken.MustNewFromRevisionForTesting(revision), }, }, RelationshipFilter: &v1.RelationshipFilter{ diff --git a/internal/services/v1/schema.go b/internal/services/v1/schema.go index 25cfc72ba9..8638949dd6 100644 --- a/internal/services/v1/schema.go +++ b/internal/services/v1/schema.go @@ -97,9 +97,14 @@ func (ss *schemaServer) ReadSchema(ctx context.Context, _ *v1.ReadSchemaRequest) DispatchCount: dispatchCount, }) + zedToken, err := zedtoken.NewFromRevision(ctx, headRevision, ds) + if err != nil { + return nil, ss.rewriteError(ctx, err) + } + return &v1.ReadSchemaResponse{ SchemaText: schemaText, - ReadAt: zedtoken.MustNewFromRevision(headRevision), + ReadAt: zedToken, }, nil } @@ -145,7 +150,12 @@ func (ss *schemaServer) WriteSchema(ctx context.Context, in *v1.WriteSchemaReque return nil, ss.rewriteError(ctx, err) } + zedToken, err := zedtoken.NewFromRevision(ctx, revision, ds) + if err != nil { + return nil, ss.rewriteError(ctx, err) + } + return &v1.WriteSchemaResponse{ - WrittenAt: zedtoken.MustNewFromRevision(revision), + WrittenAt: zedToken, }, nil } diff --git a/internal/services/v1/watch.go b/internal/services/v1/watch.go index 0bb42fc733..8c8bbc7d74 100644 --- a/internal/services/v1/watch.go +++ b/internal/services/v1/watch.go @@ -52,11 +52,15 @@ func (ws *watchServer) Watch(req *v1.WatchRequest, stream v1.WatchService_WatchS var afterRevision datastore.Revision if req.OptionalStartCursor != nil && req.OptionalStartCursor.Token != "" { - decodedRevision, err := zedtoken.DecodeRevision(req.OptionalStartCursor, ds) + decodedRevision, tokenStatus, err := zedtoken.DecodeRevision(req.OptionalStartCursor, ds) if err != nil { return status.Errorf(codes.InvalidArgument, "failed to decode start revision: %s", err) } + if tokenStatus == zedtoken.StatusMismatchedDatastoreID { + return status.Errorf(codes.InvalidArgument, "start revision was generated by a different datastore") + } + afterRevision = decodedRevision } else { var err error @@ -95,9 +99,14 @@ func (ws *watchServer) Watch(req *v1.WatchRequest, stream v1.WatchService_WatchS if ok { filtered := filterUpdates(objectTypes, filters, update.RelationshipChanges) if len(filtered) > 0 { + zedToken, err := zedtoken.NewFromRevision(ctx, update.Revision, ds) + if err != nil { + return err + } + if err := stream.Send(&v1.WatchResponse{ Updates: filtered, - ChangesThrough: zedtoken.MustNewFromRevision(update.Revision), + ChangesThrough: zedToken, }); err != nil { return status.Errorf(codes.Canceled, "watch canceled by user: %s", err) } diff --git a/internal/services/v1/watch_test.go b/internal/services/v1/watch_test.go index 09de4fcecd..97f499dfae 100644 --- a/internal/services/v1/watch_test.go +++ b/internal/services/v1/watch_test.go @@ -210,7 +210,7 @@ func TestWatch(t *testing.T) { t.Cleanup(cleanup) client := v1.NewWatchServiceClient(conn) - cursor := zedtoken.MustNewFromRevision(revision) + cursor := zedtoken.MustNewFromRevisionForTesting(revision) if tc.startCursor != nil { cursor = tc.startCursor } diff --git a/internal/testserver/server.go b/internal/testserver/server.go index 108d3156c3..e4dbc25919 100644 --- a/internal/testserver/server.go +++ b/internal/testserver/server.go @@ -90,7 +90,7 @@ func NewTestServerWithConfig(require *require.Assertions, }, { Name: "consistency", - Middleware: consistency.UnaryServerInterceptor(), + Middleware: consistency.UnaryServerInterceptor(consistency.TreatMismatchingTokensAsError), }, { Name: "servicespecific", @@ -113,7 +113,7 @@ func NewTestServerWithConfig(require *require.Assertions, }, { Name: "consistency", - Middleware: consistency.StreamServerInterceptor(), + Middleware: consistency.StreamServerInterceptor(consistency.TreatMismatchingTokensAsError), }, { Name: "servicespecific", diff --git a/pkg/cmd/serve.go b/pkg/cmd/serve.go index ffdcc7dbf5..54ee2da3eb 100644 --- a/pkg/cmd/serve.go +++ b/pkg/cmd/serve.go @@ -132,6 +132,7 @@ func RegisterServeFlags(cmd *cobra.Command, config *server.Config) error { cmd.Flags().IntVar(&config.MaxRelationshipContextSize, "max-relationship-context-size", 25000, "maximum allowed size of the context to be stored in a relationship") cmd.Flags().DurationVar(&config.StreamingAPITimeout, "streaming-api-response-delay-timeout", 30*time.Second, "max duration time elapsed between messages sent by the server-side to the client (responses) before the stream times out") cmd.Flags().DurationVar(&config.WatchHeartbeat, "watch-api-heartbeat", 1*time.Second, "heartbeat time on the watch in the API. 0 means to default to the datastore's minimum.") + cmd.Flags().StringVar(&config.MismatchZedTokenBehavior, "mismatch-zed-token-behavior", "full-consistency", "behavior when a mismatched zedtoken is encountered. One of: full-consistency (treat as a full-consistency call), min-latency (treat as a min-latency call), error (return an error). defaults to full-consistency for safety.") cmd.Flags().BoolVar(&config.V1SchemaAdditiveOnly, "testing-only-schema-additive-writes", false, "append new definitions to the existing schema, rather than overwriting it") if err := cmd.Flags().MarkHidden("testing-only-schema-additive-writes"); err != nil { diff --git a/pkg/cmd/server/defaults.go b/pkg/cmd/server/defaults.go index 1212fcd84e..b1828ede91 100644 --- a/pkg/cmd/server/defaults.go +++ b/pkg/cmd/server/defaults.go @@ -149,13 +149,14 @@ const ( ) type MiddlewareOption struct { - logger zerolog.Logger - authFunc grpcauth.AuthFunc - enableVersionResponse bool - dispatcher dispatch.Dispatcher - ds datastore.Datastore - enableRequestLog bool - enableResponseLog bool + logger zerolog.Logger + authFunc grpcauth.AuthFunc + enableVersionResponse bool + dispatcher dispatch.Dispatcher + ds datastore.Datastore + enableRequestLog bool + enableResponseLog bool + mismatchingZedTokenOption consistencymw.MismatchingTokenOption } // GRPCMetricsUnaryInterceptor creates the default prometheus metrics interceptor for unary gRPCs @@ -223,7 +224,7 @@ func DefaultUnaryMiddleware(opts MiddlewareOption) (*MiddlewareChain[grpc.UnaryS NewUnaryMiddleware(). WithName(DefaultInternalMiddlewareConsistency). WithInternal(true). - WithInterceptor(consistencymw.UnaryServerInterceptor()). + WithInterceptor(consistencymw.UnaryServerInterceptor(opts.mismatchingZedTokenOption)). Done(), NewUnaryMiddleware(). @@ -290,7 +291,7 @@ func DefaultStreamingMiddleware(opts MiddlewareOption) (*MiddlewareChain[grpc.St NewStreamMiddleware(). WithName(DefaultInternalMiddlewareConsistency). WithInternal(true). - WithInterceptor(consistencymw.StreamServerInterceptor()). + WithInterceptor(consistencymw.StreamServerInterceptor(opts.mismatchingZedTokenOption)). Done(), NewStreamMiddleware(). diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go index a3a90e45b1..fad770a303 100644 --- a/pkg/cmd/server/server.go +++ b/pkg/cmd/server/server.go @@ -33,6 +33,7 @@ import ( "github.com/authzed/spicedb/internal/dispatch/graph" "github.com/authzed/spicedb/internal/gateway" log "github.com/authzed/spicedb/internal/logging" + "github.com/authzed/spicedb/internal/middleware/consistency" "github.com/authzed/spicedb/internal/services" dispatchSvc "github.com/authzed/spicedb/internal/services/dispatch" "github.com/authzed/spicedb/internal/services/health" @@ -110,6 +111,7 @@ type Config struct { MaxDatastoreReadPageSize uint64 `debugmap:"visible"` StreamingAPITimeout time.Duration `debugmap:"visible"` WatchHeartbeat time.Duration `debugmap:"visible"` + MismatchZedTokenBehavior string `debugmap:"visible"` // Additional Services MetricsAPI util.HTTPServerConfig `debugmap:"visible"` @@ -352,6 +354,24 @@ func (c *Config) Complete(ctx context.Context) (RunnableServer, error) { watchServiceOption = services.WatchServiceDisabled } + var mismatchZedTokenOption consistency.MismatchingTokenOption + switch c.MismatchZedTokenBehavior { + case "": + fallthrough + + case "full-consistency": + mismatchZedTokenOption = consistency.TreatMismatchingTokensAsFullConsistency + + case "min-latency": + mismatchZedTokenOption = consistency.TreatMismatchingTokensAsMinLatency + + case "error": + mismatchZedTokenOption = consistency.TreatMismatchingTokensAsError + + default: + return nil, fmt.Errorf("unknown mismatched zedtoken behavior: %s", c.MismatchZedTokenBehavior) + } + opts := MiddlewareOption{ log.Logger, c.GRPCAuthFunc, @@ -360,6 +380,7 @@ func (c *Config) Complete(ctx context.Context) (RunnableServer, error) { ds, c.EnableRequestLogs, c.EnableResponseLogs, + mismatchZedTokenOption, } defaultUnaryMiddlewareChain, err := DefaultUnaryMiddleware(opts) if err != nil { diff --git a/pkg/cmd/server/server_test.go b/pkg/cmd/server/server_test.go index f87f3f3847..2e30207bfe 100644 --- a/pkg/cmd/server/server_test.go +++ b/pkg/cmd/server/server_test.go @@ -9,6 +9,7 @@ import ( "github.com/authzed/spicedb/internal/datastore/memdb" "github.com/authzed/spicedb/internal/logging" + "github.com/authzed/spicedb/internal/middleware/consistency" "github.com/authzed/spicedb/pkg/cmd/datastore" "github.com/authzed/spicedb/pkg/cmd/util" @@ -230,7 +231,7 @@ func TestModifyUnaryMiddleware(t *testing.T) { }, }} - opt := MiddlewareOption{logging.Logger, nil, false, nil, nil, false, false} + opt := MiddlewareOption{logging.Logger, nil, false, nil, nil, false, false, consistency.TreatMismatchingTokensAsFullConsistency} defaultMw, err := DefaultUnaryMiddleware(opt) require.NoError(t, err) @@ -256,7 +257,7 @@ func TestModifyStreamingMiddleware(t *testing.T) { }, }} - opt := MiddlewareOption{logging.Logger, nil, false, nil, nil, false, false} + opt := MiddlewareOption{logging.Logger, nil, false, nil, nil, false, false, consistency.TreatMismatchingTokensAsFullConsistency} defaultMw, err := DefaultStreamingMiddleware(opt) require.NoError(t, err) diff --git a/pkg/cmd/server/zz_generated.options.go b/pkg/cmd/server/zz_generated.options.go index 95b0d9f439..96cfd39684 100644 --- a/pkg/cmd/server/zz_generated.options.go +++ b/pkg/cmd/server/zz_generated.options.go @@ -81,6 +81,7 @@ func (c *Config) ToOption() ConfigOption { to.MaxDatastoreReadPageSize = c.MaxDatastoreReadPageSize to.StreamingAPITimeout = c.StreamingAPITimeout to.WatchHeartbeat = c.WatchHeartbeat + to.MismatchZedTokenBehavior = c.MismatchZedTokenBehavior to.MetricsAPI = c.MetricsAPI to.UnaryMiddlewareModification = c.UnaryMiddlewareModification to.StreamingMiddlewareModification = c.StreamingMiddlewareModification @@ -141,6 +142,7 @@ func (c Config) DebugMap() map[string]any { debugMap["MaxDatastoreReadPageSize"] = helpers.DebugValue(c.MaxDatastoreReadPageSize, false) debugMap["StreamingAPITimeout"] = helpers.DebugValue(c.StreamingAPITimeout, false) debugMap["WatchHeartbeat"] = helpers.DebugValue(c.WatchHeartbeat, false) + debugMap["MismatchZedTokenBehavior"] = helpers.DebugValue(c.MismatchZedTokenBehavior, false) debugMap["MetricsAPI"] = helpers.DebugValue(c.MetricsAPI, false) debugMap["SilentlyDisableTelemetry"] = helpers.DebugValue(c.SilentlyDisableTelemetry, false) debugMap["TelemetryCAOverridePath"] = helpers.DebugValue(c.TelemetryCAOverridePath, false) @@ -496,6 +498,13 @@ func WithWatchHeartbeat(watchHeartbeat time.Duration) ConfigOption { } } +// WithMismatchZedTokenBehavior returns an option that can set MismatchZedTokenBehavior on a Config +func WithMismatchZedTokenBehavior(mismatchZedTokenBehavior string) ConfigOption { + return func(c *Config) { + c.MismatchZedTokenBehavior = mismatchZedTokenBehavior + } +} + // WithMetricsAPI returns an option that can set MetricsAPI on a Config func WithMetricsAPI(metricsAPI util.HTTPServerConfig) ConfigOption { return func(c *Config) { diff --git a/pkg/cmd/testserver/testserver.go b/pkg/cmd/testserver/testserver.go index 47b3f22877..c90a87b263 100644 --- a/pkg/cmd/testserver/testserver.go +++ b/pkg/cmd/testserver/testserver.go @@ -78,13 +78,13 @@ func (c *Config) Complete() (RunnableTestServer, error) { grpc.ChainUnaryInterceptor( datastoreMiddleware.UnaryServerInterceptor(), dispatchmw.UnaryServerInterceptor(dispatcher), - consistencymw.UnaryServerInterceptor(), + consistencymw.UnaryServerInterceptor(consistencymw.TreatMismatchingTokensAsError), servicespecific.UnaryServerInterceptor, ), grpc.ChainStreamInterceptor( datastoreMiddleware.StreamServerInterceptor(), dispatchmw.StreamServerInterceptor(dispatcher), - consistencymw.StreamServerInterceptor(), + consistencymw.StreamServerInterceptor(consistencymw.TreatMismatchingTokensAsError), servicespecific.StreamServerInterceptor, ), ) @@ -97,14 +97,14 @@ func (c *Config) Complete() (RunnableTestServer, error) { datastoreMiddleware.UnaryServerInterceptor(), readonly.UnaryServerInterceptor(), dispatchmw.UnaryServerInterceptor(dispatcher), - consistencymw.UnaryServerInterceptor(), + consistencymw.UnaryServerInterceptor(consistencymw.TreatMismatchingTokensAsError), servicespecific.UnaryServerInterceptor, ), grpc.ChainStreamInterceptor( datastoreMiddleware.StreamServerInterceptor(), readonly.StreamServerInterceptor(), dispatchmw.StreamServerInterceptor(dispatcher), - consistencymw.StreamServerInterceptor(), + consistencymw.StreamServerInterceptor(consistencymw.TreatMismatchingTokensAsError), servicespecific.StreamServerInterceptor, ), ) diff --git a/pkg/cursor/cursor.go b/pkg/cursor/cursor.go index 38df2353ee..cc81b31be9 100644 --- a/pkg/cursor/cursor.go +++ b/pkg/cursor/cursor.go @@ -1,6 +1,7 @@ package cursor import ( + "context" "encoding/base64" "errors" "fmt" @@ -11,6 +12,7 @@ import ( dispatch "github.com/authzed/spicedb/pkg/proto/dispatch/v1" impl "github.com/authzed/spicedb/pkg/proto/impl/v1" "github.com/authzed/spicedb/pkg/spiceerrors" + "github.com/authzed/spicedb/pkg/zedtoken" ) // Encode converts a decoded cursor to its opaque version. @@ -92,25 +94,39 @@ func DecodeToDispatchCursor(encoded *v1.Cursor, callAndParameterHash string) (*d // DecodeToDispatchRevision decodes an encoded API cursor into an internal dispatch revision. // NOTE: this method does *not* verify the caller's method signature. -func DecodeToDispatchRevision(encoded *v1.Cursor, ds revisionDecoder) (datastore.Revision, error) { +func DecodeToDispatchRevision(ctx context.Context, encoded *v1.Cursor, ds revisionDecoder) (datastore.Revision, zedtoken.TokenStatus, error) { decoded, err := Decode(encoded) if err != nil { - return nil, err + return nil, zedtoken.StatusUnknown, err } v1decoded := decoded.GetV1() if v1decoded == nil { - return nil, ErrNilCursor + return nil, zedtoken.StatusUnknown, ErrNilCursor + } + + datastoreUniqueID, err := ds.UniqueID(ctx) + if err != nil { + return nil, zedtoken.StatusUnknown, fmt.Errorf(errEncodeError, err) } parsed, err := ds.RevisionFromString(v1decoded.Revision) if err != nil { - return datastore.NoRevision, fmt.Errorf(errDecodeError, err) + return datastore.NoRevision, zedtoken.StatusUnknown, fmt.Errorf(errDecodeError, err) + } + + if v1decoded.DatastoreUniqueId == "" { + return parsed, zedtoken.StatusLegacyEmptyDatastoreID, nil + } + + if v1decoded.DatastoreUniqueId != datastoreUniqueID { + return parsed, zedtoken.StatusMismatchedDatastoreID, nil } - return parsed, nil + return parsed, zedtoken.StatusValid, nil } type revisionDecoder interface { + UniqueID(_ context.Context) (string, error) RevisionFromString(string) (datastore.Revision, error) } diff --git a/pkg/cursor/cursor_test.go b/pkg/cursor/cursor_test.go index 9013fb51df..1c66863024 100644 --- a/pkg/cursor/cursor_test.go +++ b/pkg/cursor/cursor_test.go @@ -1,6 +1,7 @@ package cursor import ( + "context" "fmt" "testing" @@ -58,7 +59,7 @@ func TestEncodeDecode(t *testing.T) { require.Equal(tc.sections, decoded.Sections) - decodedRev, err := DecodeToDispatchRevision(encoded, revisions.CommonDecoder{ + decodedRev, _, err := DecodeToDispatchRevision(context.Background(), encoded, revisions.CommonDecoder{ Kind: revisions.TransactionID, }) require.NoError(err) @@ -136,7 +137,7 @@ func TestDecode(t *testing.T) { require.NotNil(decoded) require.Equal(testCase.expectedSections, decoded.Sections) - decodedRev, err := DecodeToDispatchRevision(&v1.Cursor{ + decodedRev, _, err := DecodeToDispatchRevision(context.Background(), &v1.Cursor{ Token: testCase.token, }, revisions.CommonDecoder{ Kind: revisions.TransactionID, diff --git a/pkg/development/devcontext.go b/pkg/development/devcontext.go index 4a93fa1445..c7ff66ed5c 100644 --- a/pkg/development/devcontext.go +++ b/pkg/development/devcontext.go @@ -124,11 +124,11 @@ func (dc *DevContext) RunV1InMemoryService() (*grpc.ClientConn, func(), error) { s := grpc.NewServer( grpc.ChainUnaryInterceptor( datastoremw.UnaryServerInterceptor(dc.Datastore), - consistency.UnaryServerInterceptor(), + consistency.UnaryServerInterceptor(consistency.TreatMismatchingTokensAsError), ), grpc.ChainStreamInterceptor( datastoremw.StreamServerInterceptor(dc.Datastore), - consistency.StreamServerInterceptor(), + consistency.StreamServerInterceptor(consistency.TreatMismatchingTokensAsError), ), ) ps := v1svc.NewPermissionsServer(dc.Dispatcher, v1svc.PermissionsServerConfig{ diff --git a/pkg/proto/impl/v1/impl.pb.go b/pkg/proto/impl/v1/impl.pb.go index f802925911..44e9ef927a 100644 --- a/pkg/proto/impl/v1/impl.pb.go +++ b/pkg/proto/impl/v1/impl.pb.go @@ -400,6 +400,9 @@ type V1Cursor struct { CallAndParametersHash string `protobuf:"bytes,3,opt,name=call_and_parameters_hash,json=callAndParametersHash,proto3" json:"call_and_parameters_hash,omitempty"` // dispatch_version is the version of the dispatcher which created the cursor. DispatchVersion uint32 `protobuf:"varint,4,opt,name=dispatch_version,json=dispatchVersion,proto3" json:"dispatch_version,omitempty"` + // datastore_unique_id is the unique ID for the datastore. Will be empty for legacy + // cursors. + DatastoreUniqueId string `protobuf:"bytes,5,opt,name=datastore_unique_id,json=datastoreUniqueId,proto3" json:"datastore_unique_id,omitempty"` } func (x *V1Cursor) Reset() { @@ -462,6 +465,13 @@ func (x *V1Cursor) GetDispatchVersion() uint32 { return 0 } +func (x *V1Cursor) GetDatastoreUniqueId() string { + if x != nil { + return x.DatastoreUniqueId + } + return "" +} + type DocComment struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -805,6 +815,9 @@ type DecodedZedToken_V1ZedToken struct { unknownFields protoimpl.UnknownFields Revision string `protobuf:"bytes,1,opt,name=revision,proto3" json:"revision,omitempty"` + // datastore_unique_id is the unique ID for the datastore. Will be empty for legacy + // tokens. + DatastoreUniqueId string `protobuf:"bytes,2,opt,name=datastore_unique_id,json=datastoreUniqueId,proto3" json:"datastore_unique_id,omitempty"` } func (x *DecodedZedToken_V1ZedToken) Reset() { @@ -846,6 +859,13 @@ func (x *DecodedZedToken_V1ZedToken) GetRevision() string { return "" } +func (x *DecodedZedToken_V1ZedToken) GetDatastoreUniqueId() string { + if x != nil { + return x.DatastoreUniqueId + } + return "" +} + var File_impl_v1_impl_proto protoreflect.FileDescriptor var file_impl_v1_impl_proto_rawDesc = []byte{ @@ -875,7 +895,7 @@ var file_impl_v1_impl_proto_rawDesc = []byte{ 0x08, 0x56, 0x32, 0x5a, 0x6f, 0x6f, 0x6b, 0x69, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x42, 0x0f, 0x0a, 0x0d, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, - 0x5f, 0x6f, 0x6e, 0x65, 0x6f, 0x66, 0x22, 0x82, 0x02, 0x0a, 0x0f, 0x44, 0x65, 0x63, 0x6f, 0x64, + 0x5f, 0x6f, 0x6e, 0x65, 0x6f, 0x66, 0x22, 0xb2, 0x02, 0x0a, 0x0f, 0x44, 0x65, 0x63, 0x6f, 0x64, 0x65, 0x64, 0x5a, 0x65, 0x64, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x55, 0x0a, 0x14, 0x64, 0x65, 0x70, 0x72, 0x65, 0x63, 0x61, 0x74, 0x65, 0x64, 0x5f, 0x76, 0x31, 0x5f, 0x7a, 0x6f, 0x6f, 0x6b, 0x69, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x21, 0x2e, 0x69, 0x6d, 0x70, 0x6c, 0x2e, @@ -888,15 +908,18 @@ var file_impl_v1_impl_proto_rawDesc = []byte{ 0x65, 0x6e, 0x48, 0x00, 0x52, 0x02, 0x76, 0x31, 0x1a, 0x26, 0x0a, 0x08, 0x56, 0x31, 0x5a, 0x6f, 0x6f, 0x6b, 0x69, 0x65, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, - 0x1a, 0x28, 0x0a, 0x0a, 0x56, 0x31, 0x5a, 0x65, 0x64, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x1a, + 0x1a, 0x58, 0x0a, 0x0a, 0x56, 0x31, 0x5a, 0x65, 0x64, 0x54, 0x6f, 0x6b, 0x65, 0x6e, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x42, 0x0f, 0x0a, 0x0d, 0x76, 0x65, + 0x52, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x2e, 0x0a, 0x13, 0x64, 0x61, + 0x74, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x5f, 0x75, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x5f, 0x69, + 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x11, 0x64, 0x61, 0x74, 0x61, 0x73, 0x74, 0x6f, + 0x72, 0x65, 0x55, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x49, 0x64, 0x42, 0x0f, 0x0a, 0x0d, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x5f, 0x6f, 0x6e, 0x65, 0x6f, 0x66, 0x22, 0x45, 0x0a, 0x0d, 0x44, 0x65, 0x63, 0x6f, 0x64, 0x65, 0x64, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x12, 0x23, 0x0a, 0x02, 0x76, 0x31, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x11, 0x2e, 0x69, 0x6d, 0x70, 0x6c, 0x2e, 0x76, 0x31, 0x2e, 0x56, 0x31, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x48, 0x00, 0x52, 0x02, 0x76, 0x31, 0x42, 0x0f, 0x0a, 0x0d, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x5f, 0x6f, 0x6e, 0x65, - 0x6f, 0x66, 0x22, 0xa6, 0x01, 0x0a, 0x08, 0x56, 0x31, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x12, + 0x6f, 0x66, 0x22, 0xd6, 0x01, 0x0a, 0x08, 0x56, 0x31, 0x43, 0x75, 0x72, 0x73, 0x6f, 0x72, 0x12, 0x1a, 0x0a, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x72, 0x65, 0x76, 0x69, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x08, 0x73, @@ -906,7 +929,10 @@ var file_impl_v1_impl_proto_rawDesc = []byte{ 0x6e, 0x64, 0x50, 0x61, 0x72, 0x61, 0x6d, 0x65, 0x74, 0x65, 0x72, 0x73, 0x48, 0x61, 0x73, 0x68, 0x12, 0x29, 0x0a, 0x10, 0x64, 0x69, 0x73, 0x70, 0x61, 0x74, 0x63, 0x68, 0x5f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0f, 0x64, 0x69, 0x73, 0x70, - 0x61, 0x74, 0x63, 0x68, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x26, 0x0a, 0x0a, 0x44, + 0x61, 0x74, 0x63, 0x68, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x2e, 0x0a, 0x13, 0x64, + 0x61, 0x74, 0x61, 0x73, 0x74, 0x6f, 0x72, 0x65, 0x5f, 0x75, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x5f, + 0x69, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x11, 0x64, 0x61, 0x74, 0x61, 0x73, 0x74, + 0x6f, 0x72, 0x65, 0x55, 0x6e, 0x69, 0x71, 0x75, 0x65, 0x49, 0x64, 0x22, 0x26, 0x0a, 0x0a, 0x44, 0x6f, 0x63, 0x43, 0x6f, 0x6d, 0x6d, 0x65, 0x6e, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x65, 0x6e, 0x74, 0x22, 0x8e, 0x01, 0x0a, 0x10, 0x52, 0x65, 0x6c, 0x61, 0x74, 0x69, 0x6f, 0x6e, diff --git a/pkg/proto/impl/v1/impl.pb.validate.go b/pkg/proto/impl/v1/impl.pb.validate.go index 43a753a435..d46c680943 100644 --- a/pkg/proto/impl/v1/impl.pb.validate.go +++ b/pkg/proto/impl/v1/impl.pb.validate.go @@ -733,6 +733,8 @@ func (m *V1Cursor) validate(all bool) error { // no validation rules for DispatchVersion + // no validation rules for DatastoreUniqueId + if len(errors) > 0 { return V1CursorMultiError(errors) } @@ -1589,6 +1591,8 @@ func (m *DecodedZedToken_V1ZedToken) validate(all bool) error { // no validation rules for Revision + // no validation rules for DatastoreUniqueId + if len(errors) > 0 { return DecodedZedToken_V1ZedTokenMultiError(errors) } diff --git a/pkg/proto/impl/v1/impl_vtproto.pb.go b/pkg/proto/impl/v1/impl_vtproto.pb.go index b33b124c04..e39a6cddaa 100644 --- a/pkg/proto/impl/v1/impl_vtproto.pb.go +++ b/pkg/proto/impl/v1/impl_vtproto.pb.go @@ -154,6 +154,7 @@ func (m *DecodedZedToken_V1ZedToken) CloneVT() *DecodedZedToken_V1ZedToken { } r := new(DecodedZedToken_V1ZedToken) r.Revision = m.Revision + r.DatastoreUniqueId = m.DatastoreUniqueId if len(m.unknownFields) > 0 { r.unknownFields = make([]byte, len(m.unknownFields)) copy(r.unknownFields, m.unknownFields) @@ -242,6 +243,7 @@ func (m *V1Cursor) CloneVT() *V1Cursor { r.Revision = m.Revision r.CallAndParametersHash = m.CallAndParametersHash r.DispatchVersion = m.DispatchVersion + r.DatastoreUniqueId = m.DatastoreUniqueId if rhs := m.Sections; rhs != nil { tmpContainer := make([]string, len(rhs)) copy(tmpContainer, rhs) @@ -542,6 +544,9 @@ func (this *DecodedZedToken_V1ZedToken) EqualVT(that *DecodedZedToken_V1ZedToken if this.Revision != that.Revision { return false } + if this.DatastoreUniqueId != that.DatastoreUniqueId { + return false + } return string(this.unknownFields) == string(that.unknownFields) } @@ -707,6 +712,9 @@ func (this *V1Cursor) EqualVT(that *V1Cursor) bool { if this.DispatchVersion != that.DispatchVersion { return false } + if this.DatastoreUniqueId != that.DatastoreUniqueId { + return false + } return string(this.unknownFields) == string(that.unknownFields) } @@ -1121,6 +1129,13 @@ func (m *DecodedZedToken_V1ZedToken) MarshalToSizedBufferVT(dAtA []byte) (int, e i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if len(m.DatastoreUniqueId) > 0 { + i -= len(m.DatastoreUniqueId) + copy(dAtA[i:], m.DatastoreUniqueId) + i = protohelpers.EncodeVarint(dAtA, i, uint64(len(m.DatastoreUniqueId))) + i-- + dAtA[i] = 0x12 + } if len(m.Revision) > 0 { i -= len(m.Revision) copy(dAtA[i:], m.Revision) @@ -1302,6 +1317,13 @@ func (m *V1Cursor) MarshalToSizedBufferVT(dAtA []byte) (int, error) { i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if len(m.DatastoreUniqueId) > 0 { + i -= len(m.DatastoreUniqueId) + copy(dAtA[i:], m.DatastoreUniqueId) + i = protohelpers.EncodeVarint(dAtA, i, uint64(len(m.DatastoreUniqueId))) + i-- + dAtA[i] = 0x2a + } if m.DispatchVersion != 0 { i = protohelpers.EncodeVarint(dAtA, i, uint64(m.DispatchVersion)) i-- @@ -1628,6 +1650,10 @@ func (m *DecodedZedToken_V1ZedToken) SizeVT() (n int) { if l > 0 { n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) } + l = len(m.DatastoreUniqueId) + if l > 0 { + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } n += len(m.unknownFields) return n } @@ -1717,6 +1743,10 @@ func (m *V1Cursor) SizeVT() (n int) { if m.DispatchVersion != 0 { n += 1 + protohelpers.SizeOfVarint(uint64(m.DispatchVersion)) } + l = len(m.DatastoreUniqueId) + if l > 0 { + n += 1 + l + protohelpers.SizeOfVarint(uint64(l)) + } n += len(m.unknownFields) return n } @@ -2358,6 +2388,38 @@ func (m *DecodedZedToken_V1ZedToken) UnmarshalVT(dAtA []byte) error { } m.Revision = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field DatastoreUniqueId", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.DatastoreUniqueId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) @@ -2749,6 +2811,38 @@ func (m *V1Cursor) UnmarshalVT(dAtA []byte) error { break } } + case 5: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field DatastoreUniqueId", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return protohelpers.ErrInvalidLength + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return protohelpers.ErrInvalidLength + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.DatastoreUniqueId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) diff --git a/pkg/zedtoken/zedtoken.go b/pkg/zedtoken/zedtoken.go index 797b3e184b..f655b74565 100644 --- a/pkg/zedtoken/zedtoken.go +++ b/pkg/zedtoken/zedtoken.go @@ -2,6 +2,7 @@ package zedtoken import ( + "context" "encoding/base64" "errors" "fmt" @@ -22,9 +23,32 @@ const ( // zedtoken argument to Decode var ErrNilZedToken = errors.New("zedtoken pointer was nil") -// MustNewFromRevision generates an encoded zedtoken from an integral revision. -func MustNewFromRevision(revision datastore.Revision) *v1.ZedToken { - encoded, err := NewFromRevision(revision) +// legacyEmptyDatastoreID is the empty datastore ID for legacy tokens and cursors. +const legacyEmptyDatastoreID = "" + +// TokenStatus is the status of a zedtoken. +type TokenStatus int + +const ( + // StatusUnknown indicates that the status of the zedtoken is unknown. + StatusUnknown TokenStatus = iota + + // StatusLegacyEmptyDatastoreID indicates that the zedtoken is a legacy token + // with an empty datastore ID. + StatusLegacyEmptyDatastoreID + + // StatusValid indicates that the zedtoken is valid. + StatusValid + + // StatusMismatchedDatastoreID indicates that the zedtoken is valid, but the + // datastore ID does not match the current datastore, indicating that the + // token was generated by a different datastore. + StatusMismatchedDatastoreID +) + +// MustNewFromRevisionForTesting generates an encoded zedtoken from an integral revision. +func MustNewFromRevisionForTesting(revision datastore.Revision) *v1.ZedToken { + encoded, err := newFromRevision(revision, legacyEmptyDatastoreID) if err != nil { panic(err) } @@ -32,11 +56,21 @@ func MustNewFromRevision(revision datastore.Revision) *v1.ZedToken { } // NewFromRevision generates an encoded zedtoken from an integral revision. -func NewFromRevision(revision datastore.Revision) (*v1.ZedToken, error) { +func NewFromRevision(ctx context.Context, revision datastore.Revision, ds datastore.Datastore) (*v1.ZedToken, error) { + datastoreUniqueID, err := ds.UniqueID(ctx) + if err != nil { + return nil, fmt.Errorf(errEncodeError, err) + } + + return newFromRevision(revision, datastoreUniqueID) +} + +func newFromRevision(revision datastore.Revision, datastoreUniqueID string) (*v1.ZedToken, error) { toEncode := &zedtoken.DecodedZedToken{ VersionOneof: &zedtoken.DecodedZedToken_V1{ V1: &zedtoken.DecodedZedToken_V1ZedToken{ - Revision: revision.String(), + Revision: revision.String(), + DatastoreUniqueId: datastoreUniqueID, }, }, } @@ -77,10 +111,10 @@ func Decode(encoded *v1.ZedToken) (*zedtoken.DecodedZedToken, error) { } // DecodeRevision converts and extracts the revision from a zedtoken or legacy zookie. -func DecodeRevision(encoded *v1.ZedToken, ds revisionDecoder) (datastore.Revision, error) { +func DecodeRevision(encoded *v1.ZedToken, ds revisionDecoder) (datastore.Revision, TokenStatus, error) { decoded, err := Decode(encoded) if err != nil { - return datastore.NoRevision, err + return datastore.NoRevision, StatusUnknown, err } switch ver := decoded.VersionOneof.(type) { @@ -88,21 +122,36 @@ func DecodeRevision(encoded *v1.ZedToken, ds revisionDecoder) (datastore.Revisio revString := fmt.Sprintf("%d", ver.DeprecatedV1Zookie.Revision) parsed, err := ds.RevisionFromString(revString) if err != nil { - return datastore.NoRevision, fmt.Errorf(errDecodeError, err) + return datastore.NoRevision, StatusUnknown, fmt.Errorf(errDecodeError, err) } - return parsed, nil + return parsed, StatusLegacyEmptyDatastoreID, nil case *zedtoken.DecodedZedToken_V1: parsed, err := ds.RevisionFromString(ver.V1.Revision) if err != nil { - return datastore.NoRevision, fmt.Errorf(errDecodeError, err) + return datastore.NoRevision, StatusUnknown, fmt.Errorf(errDecodeError, err) } - return parsed, nil + + if ver.V1.DatastoreUniqueId == legacyEmptyDatastoreID { + return parsed, StatusLegacyEmptyDatastoreID, nil + } + + datastoreUniqueID, err := ds.UniqueID(context.Background()) + if err != nil { + return datastore.NoRevision, StatusUnknown, fmt.Errorf(errDecodeError, err) + } + + if ver.V1.DatastoreUniqueId != datastoreUniqueID { + return parsed, StatusMismatchedDatastoreID, nil + } + + return parsed, StatusValid, nil default: - return datastore.NoRevision, fmt.Errorf(errDecodeError, fmt.Errorf("unknown zookie version: %T", decoded.VersionOneof)) + return datastore.NoRevision, StatusUnknown, fmt.Errorf(errDecodeError, fmt.Errorf("unknown zookie version: %T", decoded.VersionOneof)) } } type revisionDecoder interface { + UniqueID(context.Context) (string, error) RevisionFromString(string) (datastore.Revision, error) } diff --git a/pkg/zedtoken/zedtoken_test.go b/pkg/zedtoken/zedtoken_test.go index 09bbc4d48e..85df77aa4d 100644 --- a/pkg/zedtoken/zedtoken_test.go +++ b/pkg/zedtoken/zedtoken_test.go @@ -41,10 +41,9 @@ func TestZedTokenEncode(t *testing.T) { rev := rev t.Run(rev.String(), func(t *testing.T) { require := require.New(t) - encoded, err := NewFromRevision(rev) - require.NoError(err) + encoded := MustNewFromRevisionForTesting(rev) - decoded, err := DecodeRevision(encoded, revisions.CommonDecoder{ + decoded, _, err := DecodeRevision(encoded, revisions.CommonDecoder{ Kind: revisions.TransactionID, }) require.NoError(err) @@ -58,10 +57,9 @@ func TestZedTokenEncodeHLC(t *testing.T) { rev := rev t.Run(rev.String(), func(t *testing.T) { require := require.New(t) - encoded, err := NewFromRevision(rev) - require.NoError(err) + encoded := MustNewFromRevisionForTesting(rev) - decoded, err := DecodeRevision(encoded, revisions.CommonDecoder{ + decoded, _, err := DecodeRevision(encoded, revisions.CommonDecoder{ Kind: revisions.HybridLogicalClock, }) require.NoError(err) @@ -71,65 +69,92 @@ func TestZedTokenEncodeHLC(t *testing.T) { } var decodeTests = []struct { - format string - token string - expectedRevision datastore.Revision - expectError bool + format string + token string + datastoreUniqueID string + expectedRevision datastore.Revision + expectedStatus TokenStatus + expectError bool }{ { format: "invalid", token: "abc", expectedRevision: datastore.NoRevision, + expectedStatus: StatusUnknown, expectError: true, }, { format: "V1 Zookie", token: "CAESAA==", expectedRevision: revisions.NewForTransactionID(0), + expectedStatus: StatusLegacyEmptyDatastoreID, expectError: false, }, { format: "V1 Zookie", token: "CAESAggB", expectedRevision: revisions.NewForTransactionID(1), + expectedStatus: StatusLegacyEmptyDatastoreID, expectError: false, }, { format: "V1 Zookie", token: "CAESAggC", expectedRevision: revisions.NewForTransactionID(2), + expectedStatus: StatusLegacyEmptyDatastoreID, expectError: false, }, { format: "V1 Zookie", token: "CAESAwiAAg==", expectedRevision: revisions.NewForTransactionID(256), + expectedStatus: StatusLegacyEmptyDatastoreID, expectError: false, }, { format: "V1 Zookie", token: "CAIaAwoBMA==", expectedRevision: revisions.NewForTransactionID(0), + expectedStatus: StatusLegacyEmptyDatastoreID, expectError: false, }, { format: "V1 ZedToken", token: "CAIaAwoBMQ==", expectedRevision: revisions.NewForTransactionID(1), + expectedStatus: StatusLegacyEmptyDatastoreID, expectError: false, }, { format: "V1 ZedToken", token: "CAIaAwoBMg==", expectedRevision: revisions.NewForTransactionID(2), + expectedStatus: StatusLegacyEmptyDatastoreID, expectError: false, }, { format: "V1 ZedToken", token: "CAIaAwoBNA==", expectedRevision: revisions.NewForTransactionID(4), + expectedStatus: StatusLegacyEmptyDatastoreID, expectError: false, }, + { + format: "V1 ZedToken with matching datastore unique ID", + token: "GhIKAjQyEgxzb21ldW5pcXVlaWQ=", + datastoreUniqueID: "someuniqueid", + expectedRevision: revisions.NewForTransactionID(42), + expectedStatus: StatusValid, + expectError: false, + }, + { + format: "V1 ZedToken with mismatched datastore unique ID", + token: "GhIKAjQyEgxzb21ldW5pcXVlaWQ=", + datastoreUniqueID: "anotheruniqueid", + expectedRevision: revisions.NewForTransactionID(42), + expectedStatus: StatusMismatchedDatastoreID, + expectError: false, + }, } func TestDecode(t *testing.T) { @@ -139,15 +164,17 @@ func TestDecode(t *testing.T) { t.Run(testName, func(t *testing.T) { require := require.New(t) - decoded, err := DecodeRevision(&v1.ZedToken{ + decoded, status, err := DecodeRevision(&v1.ZedToken{ Token: testCase.token, }, revisions.CommonDecoder{ - Kind: revisions.TransactionID, + DatastoreUniqueID: testCase.datastoreUniqueID, + Kind: revisions.TransactionID, }) if testCase.expectError { require.Error(err) } else { require.NoError(err) + require.Equal(testCase.expectedStatus, status) require.True( testCase.expectedRevision.Equal(decoded), "%s != %s", @@ -160,14 +187,17 @@ func TestDecode(t *testing.T) { } var hlcDecodeTests = []struct { - format string - token string - expectedRevision datastore.Revision - expectError bool + format string + token string + datastoreUniqueID string + expectedRevision datastore.Revision + expectedStatus TokenStatus + expectError bool }{ { - format: "V1 ZedToken", - token: "CAIaFQoTMTYyMTUzODE4OTAyODkyODAwMA==", + format: "V1 ZedToken", + token: "CAIaFQoTMTYyMTUzODE4OTAyODkyODAwMA==", + expectedStatus: StatusLegacyEmptyDatastoreID, expectedRevision: func() datastore.Revision { r, err := revisions.NewForHLC(decimal.NewFromInt(1621538189028928000)) if err != nil { @@ -175,11 +205,47 @@ var hlcDecodeTests = []struct { } return r }(), + }, + { + format: "V1 ZedToken", + token: "GiAKHjE2OTM1NDA5NDAzNzMwNDU3MjcuMDAwMDAwMDAwMQ==", + expectedStatus: StatusLegacyEmptyDatastoreID, + expectedRevision: (func() datastore.Revision { + v, err := decimal.NewFromString("1693540940373045727.0000000001") + if err != nil { + panic(err) + } + r, err := revisions.NewForHLC(v) + if err != nil { + panic(err) + } + return r + })(), expectError: false, }, { - format: "V1 ZedToken", - token: "GiAKHjE2OTM1NDA5NDAzNzMwNDU3MjcuMDAwMDAwMDAwMQ==", + format: "V1 ZedToken with matching datastore unique ID", + token: "GkYKHjE2OTM1NDA5NDAzNzMwNDU3MjcuMDAwMDAwMDAwMRIkNjM0OWFhZjItMzdjZC00N2I5LTg0ZTgtZmU1ZmE2ZTJkZWFk", + datastoreUniqueID: "6349aaf2-37cd-47b9-84e8-fe5fa6e2dead", + expectedStatus: StatusValid, + expectedRevision: (func() datastore.Revision { + v, err := decimal.NewFromString("1693540940373045727.0000000001") + if err != nil { + panic(err) + } + r, err := revisions.NewForHLC(v) + if err != nil { + panic(err) + } + return r + })(), + expectError: false, + }, + { + format: "V1 ZedToken with mismatched datastore unique ID", + token: "GkYKHjE2OTM1NDA5NDAzNzMwNDU3MjcuMDAwMDAwMDAwMRIkNjM0OWFhZjItMzdjZC00N2I5LTg0ZTgtZmU1ZmE2ZTJkZWFk", + datastoreUniqueID: "arrrg-6349aaf2-37cd-47b9-84e8-fe5fa6e2dead", + expectedStatus: StatusMismatchedDatastoreID, expectedRevision: (func() datastore.Revision { v, err := decimal.NewFromString("1693540940373045727.0000000001") if err != nil { @@ -204,15 +270,17 @@ func TestHLCDecode(t *testing.T) { t.Run(testName, func(t *testing.T) { require := require.New(t) - decoded, err := DecodeRevision(&v1.ZedToken{ + decoded, status, err := DecodeRevision(&v1.ZedToken{ Token: testCase.token, }, revisions.CommonDecoder{ - Kind: revisions.HybridLogicalClock, + DatastoreUniqueID: testCase.datastoreUniqueID, + Kind: revisions.HybridLogicalClock, }) if testCase.expectError { require.Error(err) } else { require.NoError(err) + require.Equal(testCase.expectedStatus, status) require.True( testCase.expectedRevision.Equal(decoded), "%s != %s", diff --git a/proto/internal/impl/v1/impl.proto b/proto/internal/impl/v1/impl.proto index c6e871a1ed..0e3f1356e6 100644 --- a/proto/internal/impl/v1/impl.proto +++ b/proto/internal/impl/v1/impl.proto @@ -33,6 +33,10 @@ message DecodedZedToken { } message V1ZedToken { string revision = 1; + + // datastore_unique_id is the unique ID for the datastore. Will be empty for legacy + // tokens. + string datastore_unique_id = 2; } oneof version_oneof { V1Zookie deprecated_v1_zookie = 2; @@ -60,6 +64,10 @@ message V1Cursor { // dispatch_version is the version of the dispatcher which created the cursor. uint32 dispatch_version = 4; + + // datastore_unique_id is the unique ID for the datastore. Will be empty for legacy + // cursors. + string datastore_unique_id = 5; } message DocComment {