From 365521a9d9a263c226948b570ca844d2371bb3b2 Mon Sep 17 00:00:00 2001
From: arulajmani
Date: Fri, 21 Jan 2022 12:58:20 -0500
Subject: [PATCH] spanconfig,protectedts: introduce the `ProtectedTSReader` interface

This patch introduces the `spanconfig.ProtectedTSReader` interface. It is
intended to replace the `protectedts.Cache` as KV's integration point for
querying protectedTS metadata in v2 of the protectedTS subsystem. For now,
the `protectedts.Cache` is the only implementor of it. We will have the
`KVSubscriber` implement it as well once we start shipping protected
timestamp information on span configurations.

The `ProtectedTSReader` interface is also intended to serve as an adapter
interface between v1 and v2 of the PTS subsystem. In particular, for v22.1,
we will consult both the `protectedts.Cache` and `KVSubscriber` for PTS
information. The logic here will be gated behind a call to
`GetProtectionTimestamps`, which is the only method this interface provides.

Release note: None

Release justification: low risk, high benefit changes to existing
functionality.
---
 pkg/kv/kvserver/BUILD.bazel                   |   4 +-
 pkg/kv/kvserver/client_protectedts_test.go    |  29 ++-
 pkg/kv/kvserver/client_replica_test.go        |  15 +-
 pkg/kv/kvserver/client_split_test.go          |   8 +-
 pkg/kv/kvserver/helpers_test.go               |   2 +-
 .../kvserver/protectedts/ptcache/BUILD.bazel  |   1 +
 pkg/kv/kvserver/protectedts/ptcache/cache.go  |  17 ++
 .../protectedts/ptcache/cache_test.go         | 122 ++++++++--
 .../protectedts/ptprovider/provider.go        |  25 ++-
 pkg/kv/kvserver/replica.go                    |   4 +-
 .../kvserver/replica_protected_timestamp.go   |  57 +++--
 .../replica_protected_timestamp_test.go       | 209 ++++++++++--------
 pkg/kv/kvserver/store.go                      |  14 +-
 pkg/server/BUILD.bazel                        |   2 +
 pkg/server/server.go                          |  59 ++---
 pkg/spanconfig/spanconfig.go                  |  28 +++
 .../spanconfigptsreader/BUILD.bazel           |  15 ++
 pkg/spanconfig/spanconfigptsreader/adapter.go |  78 +++++++
 18 files changed, 486 insertions(+), 203 deletions(-)
 create mode 100644 pkg/spanconfig/spanconfigptsreader/BUILD.bazel
 create mode 100644 pkg/spanconfig/spanconfigptsreader/adapter.go

diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel
index ecf5ab794434..fca39bff411c 100644
--- a/pkg/kv/kvserver/BUILD.bazel
+++ b/pkg/kv/kvserver/BUILD.bazel
@@ -132,8 +132,6 @@ go_library(
         "//pkg/kv/kvserver/kvserverpb",
         "//pkg/kv/kvserver/liveness",
         "//pkg/kv/kvserver/liveness/livenesspb",
-        "//pkg/kv/kvserver/protectedts",
-        "//pkg/kv/kvserver/protectedts/ptpb",
         "//pkg/kv/kvserver/raftentry",
         "//pkg/kv/kvserver/rangefeed",
         "//pkg/kv/kvserver/rditer",
@@ -336,7 +334,6 @@ go_test(
         "//pkg/kv/kvserver/kvserverpb",
         "//pkg/kv/kvserver/liveness",
         "//pkg/kv/kvserver/liveness/livenesspb",
-        "//pkg/kv/kvserver/protectedts",
         "//pkg/kv/kvserver/protectedts/ptpb",
         "//pkg/kv/kvserver/protectedts/ptstorage",
         "//pkg/kv/kvserver/raftentry",
@@ -359,6 +356,7 @@ go_test(
         "//pkg/server/telemetry",
         "//pkg/settings/cluster",
         "//pkg/spanconfig",
+        "//pkg/spanconfig/spanconfigptsreader",
         "//pkg/spanconfig/spanconfigstore",
         "//pkg/sql",
         "//pkg/sql/catalog/bootstrap",
diff --git a/pkg/kv/kvserver/client_protectedts_test.go b/pkg/kv/kvserver/client_protectedts_test.go
index ce55646c9696..6622d7ff993a 100644
--- a/pkg/kv/kvserver/client_protectedts_test.go
+++ b/pkg/kv/kvserver/client_protectedts_test.go
@@ -18,10 +18,11 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
-	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptstorage" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/spanconfig" + "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigptsreader" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" @@ -212,10 +213,10 @@ func TestProtectedTimestamps(t *testing.T) { // Verify that the record did indeed make its way down into KV where the // replica can read it from. - ptp := tc.Server(0).ExecutorConfig().(sql.ExecutorConfig).ProtectedTimestampProvider + ptsReader := tc.GetFirstStoreFromServer(t, 0).GetStoreConfig().ProtectedTimestampReader require.NoError( t, - verifyProtectionTimestampExistsOnSpans(ctx, tc, ptp, ptsRec.Timestamp, ptsRec.DeprecatedSpans), + verifyProtectionTimestampExistsOnSpans(ctx, tc, ptsReader, ptsRec.Timestamp, ptsRec.DeprecatedSpans), ) // Make a new record that is doomed to fail. @@ -227,11 +228,12 @@ func TestProtectedTimestamps(t *testing.T) { _, err = ptsWithDB.GetRecord(ctx, nil /* txn */, failedRec.ID.GetUUID()) require.NoError(t, err) + // Verify that the record did indeed make its way down into KV where the // replica can read it from. We then verify (below) that the failed record // does not affect the ability to GC. require.NoError( t, - verifyProtectionTimestampExistsOnSpans(ctx, tc, ptp, failedRec.Timestamp, failedRec.DeprecatedSpans), + verifyProtectionTimestampExistsOnSpans(ctx, tc, ptsReader, failedRec.Timestamp, failedRec.DeprecatedSpans), ) // Add a new record that is after the old record. @@ -242,7 +244,7 @@ func TestProtectedTimestamps(t *testing.T) { require.NoError(t, ptsWithDB.Protect(ctx, nil /* txn */, &laterRec)) require.NoError( t, - verifyProtectionTimestampExistsOnSpans(ctx, tc, ptp, laterRec.Timestamp, laterRec.DeprecatedSpans), + verifyProtectionTimestampExistsOnSpans(ctx, tc, ptsReader, laterRec.Timestamp, laterRec.DeprecatedSpans), ) // Release the record that had succeeded and ensure that GC eventually @@ -271,27 +273,32 @@ func TestProtectedTimestamps(t *testing.T) { require.Equal(t, int(state.NumRecords), len(state.Records)) } +// verifyProtectionTimestampExistsOnSpans refreshes the PTS state in KV and +// ensures a protection at the given protectionTimestamp exists for all the +// supplied spans. 
func verifyProtectionTimestampExistsOnSpans( ctx context.Context, tc *testcluster.TestCluster, - provider protectedts.Provider, - pts hlc.Timestamp, + ptsReader spanconfig.ProtectedTSReader, + protectionTimestamp hlc.Timestamp, spans roachpb.Spans, ) error { - if err := provider.Refresh(ctx, tc.Server(0).Clock().Now()); err != nil { + if err := spanconfigptsreader.TestingRefreshPTSState( + ctx, ptsReader, tc.Server(0).Clock().Now(), + ); err != nil { return err } for _, sp := range spans { - timestamps, _ := provider.GetProtectionTimestamps(ctx, sp) + timestamps, _ := ptsReader.GetProtectionTimestamps(ctx, sp) found := false for _, ts := range timestamps { - if ts.Equal(pts) { + if ts.Equal(protectionTimestamp) { found = true break } } if !found { - return errors.Newf("protection timestamp %s does not exist on span %s", pts, sp) + return errors.Newf("protection timestamp %s does not exist on span %s", protectionTimestamp, sp) } } return nil diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go index 3ead2166f4be..427f011593a6 100644 --- a/pkg/kv/kvserver/client_replica_test.go +++ b/pkg/kv/kvserver/client_replica_test.go @@ -38,6 +38,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/spanconfig" + "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigptsreader" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap" "github.com/cockroachdb/cockroach/pkg/storage" @@ -3517,10 +3518,9 @@ func TestStrictGCEnforcement(t *testing.T) { t.Helper() testutils.SucceedsSoon(t, func() error { for i := 0; i < tc.NumServers(); i++ { - ptp := tc.Server(i).ExecutorConfig().(sql.ExecutorConfig).ProtectedTimestampProvider - if ptp.Iterate(ctx, tableKey, tableKey, func(record *ptpb.Record) (wantMore bool) { - return false - }).Less(min) { + ptsReader := tc.GetFirstStoreFromServer(t, 0).GetStoreConfig().ProtectedTimestampReader + _, asOf := ptsReader.GetProtectionTimestamps(ctx, tableSpan) + if asOf.Less(min) { return errors.Errorf("not yet read") } } @@ -3572,10 +3572,13 @@ func TestStrictGCEnforcement(t *testing.T) { } refreshPastLeaseStart = func(t *testing.T) { for i := 0; i < tc.NumServers(); i++ { - ptp := tc.Server(i).ExecutorConfig().(sql.ExecutorConfig).ProtectedTimestampProvider + ptsReader := tc.GetFirstStoreFromServer(t, 0).GetStoreConfig().ProtectedTimestampReader _, r := getFirstStoreReplica(t, tc.Server(i), tableKey) l, _ := r.GetLease() - require.NoError(t, ptp.Refresh(ctx, l.Start.ToTimestamp().Next())) + require.NoError( + t, + spanconfigptsreader.TestingRefreshPTSState(ctx, ptsReader, l.Start.ToTimestamp().Next()), + ) r.ReadProtectedTimestamps(ctx) } } diff --git a/pkg/kv/kvserver/client_split_test.go b/pkg/kv/kvserver/client_split_test.go index 1847bf028ae5..2957aba56cbd 100644 --- a/pkg/kv/kvserver/client_split_test.go +++ b/pkg/kv/kvserver/client_split_test.go @@ -38,6 +38,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigptsreader" "github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap" "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" "github.com/cockroachdb/cockroach/pkg/storage" @@ -2431,7 +2432,12 @@ func TestUnsplittableRange(t *testing.T) { // row. 
Once the first version of the row is cleaned up, the range should // exit the split queue purgatory. We need to tickle the protected timestamp // subsystem to release a timestamp at which we get to actually remove the data. - require.NoError(t, store.GetStoreConfig().ProtectedTimestampCache.Refresh(ctx, s.Clock().Now())) + require.NoError( + t, + spanconfigptsreader.TestingRefreshPTSState( + ctx, store.GetStoreConfig().ProtectedTimestampReader, s.Clock().Now(), + ), + ) repl := store.LookupReplica(tableKey) if err := store.ManualMVCCGC(repl); err != nil { t.Fatal(err) diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index a6e92183026d..916ce3afc2c8 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -488,7 +488,7 @@ func (r *Replica) ReadProtectedTimestamps(ctx context.Context) { defer r.maybeUpdateCachedProtectedTS(&ts) r.mu.RLock() defer r.mu.RUnlock() - ts = r.readProtectedTimestampsRLocked(ctx, nil /* f */) + ts = r.readProtectedTimestampsRLocked(ctx) } // ClosedTimestampPolicy returns the closed timestamp policy of the range, which diff --git a/pkg/kv/kvserver/protectedts/ptcache/BUILD.bazel b/pkg/kv/kvserver/protectedts/ptcache/BUILD.bazel index 1a46390e209c..0b20be4a049f 100644 --- a/pkg/kv/kvserver/protectedts/ptcache/BUILD.bazel +++ b/pkg/kv/kvserver/protectedts/ptcache/BUILD.bazel @@ -11,6 +11,7 @@ go_library( "//pkg/kv/kvserver/protectedts/ptpb", "//pkg/roachpb", "//pkg/settings/cluster", + "//pkg/spanconfig", "//pkg/util/hlc", "//pkg/util/log", "//pkg/util/stop", diff --git a/pkg/kv/kvserver/protectedts/ptcache/cache.go b/pkg/kv/kvserver/protectedts/ptcache/cache.go index b379cdf49871..e99d012fdb1b 100644 --- a/pkg/kv/kvserver/protectedts/ptcache/cache.go +++ b/pkg/kv/kvserver/protectedts/ptcache/cache.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" @@ -73,6 +74,7 @@ func New(config Config) *Cache { } var _ protectedts.Cache = (*Cache)(nil) +var _ spanconfig.ProtectedTSReader = (*Cache)(nil) // Iterate is part of the protectedts.Cache interface. func (c *Cache) Iterate( @@ -126,6 +128,21 @@ func (c *Cache) Refresh(ctx context.Context, asOf hlc.Timestamp) error { return nil } +// GetProtectionTimestamps is part of the spanconfig.ProtectedTSReader +// interface. +func (c *Cache) GetProtectionTimestamps( + ctx context.Context, sp roachpb.Span, +) (protectionTimestamps []hlc.Timestamp, asOf hlc.Timestamp) { + readAt := c.Iterate(ctx, + sp.Key, + sp.EndKey, + func(rec *ptpb.Record) (wantMore bool) { + protectionTimestamps = append(protectionTimestamps, rec.Timestamp) + return true + }) + return protectionTimestamps, readAt +} + // Start starts the periodic fetching of the Cache. A Cache must not be used // until after it has been started. An error will be returned if it has // already been started. diff --git a/pkg/kv/kvserver/protectedts/ptcache/cache_test.go b/pkg/kv/kvserver/protectedts/ptcache/cache_test.go index cd251dd372b6..1a5e44862237 100644 --- a/pkg/kv/kvserver/protectedts/ptcache/cache_test.go +++ b/pkg/kv/kvserver/protectedts/ptcache/cache_test.go @@ -68,7 +68,7 @@ func TestCacheBasic(t *testing.T) { // Then we'll add a record and make sure it gets seen. 
sp := tableSpan(42) - r, createdAt := protect(t, tc.Server(0), p, sp) + r, createdAt := protect(t, tc.Server(0), p, s.Clock().Now(), sp) testutils.SucceedsSoon(t, func() error { var coveredBy []*ptpb.Record seenTS := c.Iterate(ctx, sp.Key, sp.EndKey, @@ -140,7 +140,7 @@ func TestRefresh(t *testing.T) { st.verifyCounters(t, 1, 0) // just need to scan meta }) t.Run("needs refresh, with change", func(t *testing.T) { - _, createdAt := protect(t, s, p, metaTableSpan) + _, createdAt := protect(t, s, p, s.Clock().Now(), metaTableSpan) st.resetCounters() require.NoError(t, c.Refresh(ctx, createdAt)) st.verifyCounters(t, 2, 1) // need to scan meta and then scan everything @@ -177,7 +177,7 @@ func TestRefresh(t *testing.T) { require.Regexp(t, "boom", c.Refresh(ctx, s.Clock().Now()).Error()) }) t.Run("error propagates while fetching records", func(t *testing.T) { - protect(t, s, p, metaTableSpan) + protect(t, s, p, s.Clock().Now(), metaTableSpan) st.setFilter(func(ba roachpb.BatchRequest) *roachpb.Error { if scanReq, ok := ba.GetArg(roachpb.Scan); ok { scan := scanReq.(*roachpb.ScanRequest) @@ -192,7 +192,7 @@ func TestRefresh(t *testing.T) { }) t.Run("Iterate does not hold mutex", func(t *testing.T) { inIterate := make(chan chan struct{}) - rec, createdAt := protect(t, s, p, metaTableSpan) + rec, createdAt := protect(t, s, p, s.Clock().Now(), metaTableSpan) require.NoError(t, c.Refresh(ctx, createdAt)) go c.Iterate(ctx, keys.MinKey, keys.MaxKey, func(r *ptpb.Record) (wantMore bool) { if r.ID.GetUUID() != rec.ID.GetUUID() { @@ -271,8 +271,8 @@ func TestQueryRecord(t *testing.T) { waitForAsOfAfter(t, c, hlc.Timestamp{}) // Create two records. sp42 := tableSpan(42) - r1, createdAt1 := protect(t, s, p, sp42) - r2, createdAt2 := protect(t, s, p, sp42) + r1, createdAt1 := protect(t, s, p, s.Clock().Now(), sp42) + r2, createdAt2 := protect(t, s, p, s.Clock().Now(), sp42) // Ensure they both don't exist and that the read timestamps precede the // create timestamps. exists1, asOf := c.QueryRecord(ctx, r1.ID.GetUUID()) @@ -291,7 +291,7 @@ func TestQueryRecord(t *testing.T) { require.True(t, !asOf.Less(createdAt2)) // Release 2 and then create 3. require.NoError(t, p.Release(ctx, nil /* txn */, r2.ID.GetUUID())) - r3, createdAt3 := protect(t, s, p, sp42) + r3, createdAt3 := protect(t, s, p, s.Clock().Now(), sp42) exists2, asOf = c.QueryRecord(ctx, r2.ID.GetUUID()) require.True(t, exists2) require.True(t, asOf.Less(createdAt3)) @@ -329,10 +329,10 @@ func TestIterate(t *testing.T) { sp42 := tableSpan(42) sp43 := tableSpan(43) sp44 := tableSpan(44) - r1, _ := protect(t, s, p, sp42) - r2, _ := protect(t, s, p, sp43) - r3, _ := protect(t, s, p, sp44) - r4, _ := protect(t, s, p, sp42, sp43) + r1, _ := protect(t, s, p, s.Clock().Now(), sp42) + r2, _ := protect(t, s, p, s.Clock().Now(), sp43) + r3, _ := protect(t, s, p, s.Clock().Now(), sp44) + r4, _ := protect(t, s, p, s.Clock().Now(), sp42, sp43) require.NoError(t, c.Refresh(ctx, s.Clock().Now())) t.Run("all", func(t *testing.T) { var recs records @@ -373,6 +373,99 @@ func (recs *records) sorted() []*ptpb.Record { return *recs } +func TestGetProtectionTimestamps(t *testing.T) { + ctx := context.Background() + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) + defer tc.Stopper().Stop(ctx) + // Set the poll interval to be very long. 
+ s := tc.Server(0) + protectedts.PollInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Hour) + + ts := func(nanos int) hlc.Timestamp { + return hlc.Timestamp{ + WallTime: int64(nanos), + } + } + sp42 := tableSpan(42) + sp43 := tableSpan(43) + sp44 := tableSpan(44) + sp4243 := roachpb.Span{Key: sp42.Key, EndKey: sp43.EndKey} + + for _, testCase := range []struct { + name string + test func(t *testing.T, p protectedts.Storage, c *ptcache.Cache, cleanup func(...*ptpb.Record)) + }{ + { + name: "multiple records apply to a single span", + test: func(t *testing.T, p protectedts.Storage, c *ptcache.Cache, cleanup func(...*ptpb.Record)) { + r1, _ := protect(t, s, p, ts(10), sp42) + r2, _ := protect(t, s, p, ts(11), sp42) + r3, _ := protect(t, s, p, ts(6), sp42) + require.NoError(t, c.Refresh(ctx, s.Clock().Now())) + + protectionTimestamps, _ := c.GetProtectionTimestamps(ctx, sp42) + sort.Slice(protectionTimestamps, func(i, j int) bool { + return protectionTimestamps[i].Less(protectionTimestamps[j]) + }) + require.Equal(t, []hlc.Timestamp{ts(6), ts(10), ts(11)}, protectionTimestamps) + cleanup(r1, r2, r3) + }, + }, + { + name: "no records apply", + test: func(t *testing.T, p protectedts.Storage, c *ptcache.Cache, cleanup func(...*ptpb.Record)) { + r1, _ := protect(t, s, p, ts(5), sp43) + r2, _ := protect(t, s, p, ts(10), sp44) + require.NoError(t, c.Refresh(ctx, s.Clock().Now())) + protectionTimestamps, _ := c.GetProtectionTimestamps(ctx, sp42) + require.Equal(t, []hlc.Timestamp(nil), protectionTimestamps) + cleanup(r1, r2) + }, + }, + { + name: "multiple overlapping spans multiple records", + test: func(t *testing.T, p protectedts.Storage, c *ptcache.Cache, cleanup func(...*ptpb.Record)) { + r1, _ := protect(t, s, p, ts(10), sp42) + r2, _ := protect(t, s, p, ts(15), sp42) + r3, _ := protect(t, s, p, ts(5), sp43) + r4, _ := protect(t, s, p, ts(6), sp43) + r5, _ := protect(t, s, p, ts(25), keys.EverythingSpan) + // Also add a record that doesn't overlap with the requested span and + // ensure it isn't retrieved below. 
+ r6, _ := protect(t, s, p, ts(20), sp44) + require.NoError(t, c.Refresh(ctx, s.Clock().Now())) + + protectionTimestamps, _ := c.GetProtectionTimestamps(ctx, sp4243) + sort.Slice(protectionTimestamps, func(i, j int) bool { + return protectionTimestamps[i].Less(protectionTimestamps[j]) + }) + require.Equal( + t, []hlc.Timestamp{ts(5), ts(6), ts(10), ts(15), ts(25)}, protectionTimestamps, + ) + cleanup(r1, r2, r3, r4, r5, r6) + }, + }, + } { + t.Run(testCase.name, func(t *testing.T) { + p := ptstorage.WithDatabase(ptstorage.New(s.ClusterSettings(), + s.InternalExecutor().(sqlutil.InternalExecutor), nil /* knobs */), s.DB()) + + c := ptcache.New(ptcache.Config{ + Settings: s.ClusterSettings(), + DB: s.DB(), + Storage: p, + }) + require.NoError(t, c.Start(ctx, tc.Stopper())) + + testCase.test(t, p, c, func(records ...*ptpb.Record) { + for _, r := range records { + require.NoError(t, p.Release(ctx, nil, r.ID.GetUUID())) + } + }) + }) + } +} + func TestSettingChangedLeadsToFetch(t *testing.T) { ctx := context.Background() tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{}) @@ -425,9 +518,12 @@ func tableSpan(tableID uint32) roachpb.Span { } func protect( - t *testing.T, s serverutils.TestServerInterface, p protectedts.Storage, spans ...roachpb.Span, + t *testing.T, + s serverutils.TestServerInterface, + p protectedts.Storage, + protectTS hlc.Timestamp, + spans ...roachpb.Span, ) (r *ptpb.Record, createdAt hlc.Timestamp) { - protectTS := s.Clock().Now() r = &ptpb.Record{ ID: uuid.MakeV4().GetBytes(), Timestamp: protectTS, diff --git a/pkg/kv/kvserver/protectedts/ptprovider/provider.go b/pkg/kv/kvserver/protectedts/ptprovider/provider.go index c537151bf20e..1f55a2557808 100644 --- a/pkg/kv/kvserver/protectedts/ptprovider/provider.go +++ b/pkg/kv/kvserver/protectedts/ptprovider/provider.go @@ -39,7 +39,8 @@ type Config struct { Knobs *protectedts.TestingKnobs } -type provider struct { +// Provider is the concrete implementation of protectedts.Provider interface. +type Provider struct { protectedts.Storage protectedts.Verifier protectedts.Cache @@ -55,13 +56,15 @@ func New(cfg Config) (protectedts.Provider, error) { storage := ptstorage.New(cfg.Settings, cfg.InternalExecutor, cfg.Knobs) verifier := ptverifier.New(cfg.DB, storage) reconciler := ptreconcile.New(cfg.Settings, cfg.DB, storage, cfg.ReconcileStatusFuncs) - return &provider{ - Storage: storage, - Cache: ptcache.New(ptcache.Config{ - DB: cfg.DB, - Storage: storage, - Settings: cfg.Settings, - }), + cache := ptcache.New(ptcache.Config{ + DB: cfg.DB, + Storage: storage, + Settings: cfg.Settings, + }) + + return &Provider{ + Storage: storage, + Cache: cache, Verifier: verifier, Reconciler: reconciler, Struct: reconciler.Metrics(), @@ -81,13 +84,15 @@ func validateConfig(cfg Config) error { } } -func (p *provider) Start(ctx context.Context, stopper *stop.Stopper) error { +// Start implements the protectedts.Provider interface. +func (p *Provider) Start(ctx context.Context, stopper *stop.Stopper) error { if cache, ok := p.Cache.(*ptcache.Cache); ok { return cache.Start(ctx, stopper) } return nil } -func (p *provider) Metrics() metric.Struct { +// Metrics implements the protectedts.Provider interface. 
+func (p *Provider) Metrics() metric.Struct { return p.Struct } diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 65a593eabf67..bf67cf5926af 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -1024,8 +1024,8 @@ func (r *Replica) getImpliedGCThresholdRLocked( // If we have a protected timestamp record which precedes the implied // threshold, use the threshold it implies instead. - if c.earliestRecord != nil && c.earliestRecord.Timestamp.Less(threshold) { - return c.earliestRecord.Timestamp.Prev() + if !c.earliestProtectionTimestamp.IsEmpty() && c.earliestProtectionTimestamp.Less(threshold) { + return c.earliestProtectionTimestamp.Prev() } return threshold } diff --git a/pkg/kv/kvserver/replica_protected_timestamp.go b/pkg/kv/kvserver/replica_protected_timestamp.go index efab180fdc65..48a5eb132424 100644 --- a/pkg/kv/kvserver/replica_protected_timestamp.go +++ b/pkg/kv/kvserver/replica_protected_timestamp.go @@ -15,7 +15,6 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/gc" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -25,8 +24,8 @@ import ( // cachedProtectedTimestampState is used to cache information about the state // of protected timestamps as they pertain to this replica. The data is // refreshed when the replica examines protected timestamps when being -// considered for gc or when verifying a protected timestamp record. -// It is consulted when determining whether a request can be served. +// considered for gc. It is consulted when determining whether a request can be +// served. type cachedProtectedTimestampState struct { // readAt denotes the timestamp at which this record was read. // It is used to coordinate updates to this field. It is also used to @@ -35,8 +34,8 @@ type cachedProtectedTimestampState struct { // that protected timestamps have not been observed. In this case we must // assume that any protected timestamp could exist to provide the contract // on verify. - readAt hlc.Timestamp - earliestRecord *ptpb.Record + readAt hlc.Timestamp + earliestProtectionTimestamp hlc.Timestamp } // clearIfNotNewer clears the state in ts if it is not newer than the passed @@ -66,31 +65,29 @@ func (r *Replica) maybeUpdateCachedProtectedTS(ts *cachedProtectedTimestampState } func (r *Replica) readProtectedTimestampsRLocked( - ctx context.Context, f func(r *ptpb.Record), + ctx context.Context, ) (ts cachedProtectedTimestampState) { desc := r.descRLocked() gcThreshold := *r.mu.state.GCThreshold - ts.readAt = r.store.protectedtsCache.Iterate(ctx, - roachpb.Key(desc.StartKey), - roachpb.Key(desc.EndKey), - func(rec *ptpb.Record) (wantMore bool) { - // Check if we've already GC'd past the timestamp this record was trying - // to protect, in which case we know that the record does not apply. - // Note that when we implement PROTECT_AT, we'll need to consult some - // replica state here to determine whether the record indeed has been - // applied. 
- if isValid := gcThreshold.LessEq(rec.Timestamp); !isValid { - return true - } - if f != nil { - f(rec) - } - if ts.earliestRecord == nil || rec.Timestamp.Less(ts.earliestRecord.Timestamp) { - ts.earliestRecord = rec - } - return true - }) + sp := roachpb.Span{ + Key: roachpb.Key(desc.StartKey), + EndKey: roachpb.Key(desc.EndKey), + } + var protectionTimestamps []hlc.Timestamp + protectionTimestamps, ts.readAt = r.store.protectedtsReader.GetProtectionTimestamps(ctx, sp) + earliestTS := hlc.Timestamp{} + for _, protectionTimestamp := range protectionTimestamps { + // Check if the timestamp the record was trying to protect is strictly + // below the GCThreshold, in which case, we know the record does not apply. + if isValid := gcThreshold.LessEq(protectionTimestamp); !isValid { + continue + } + if earliestTS.IsEmpty() || protectionTimestamp.Less(earliestTS) { + earliestTS = protectionTimestamp + } + } + ts.earliestProtectionTimestamp = earliestTS return ts } @@ -126,12 +123,12 @@ func (r *Replica) checkProtectedTimestampsForGC( // read.earliestRecord is the record with the earliest timestamp which is // greater than the existing gcThreshold. - read = r.readProtectedTimestampsRLocked(ctx, nil) + read = r.readProtectedTimestampsRLocked(ctx) gcTimestamp = read.readAt - if read.earliestRecord != nil { + if !read.earliestProtectionTimestamp.IsEmpty() { // NB: we want to allow GC up to the timestamp preceding the earliest valid - // record. - impliedGCTimestamp := gc.TimestampForThreshold(read.earliestRecord.Timestamp.Prev(), gcTTL) + // protection timestamp. + impliedGCTimestamp := gc.TimestampForThreshold(read.earliestProtectionTimestamp.Prev(), gcTTL) if impliedGCTimestamp.Less(gcTimestamp) { gcTimestamp = impliedGCTimestamp } diff --git a/pkg/kv/kvserver/replica_protected_timestamp_test.go b/pkg/kv/kvserver/replica_protected_timestamp_test.go index a70be6270156..d47d29851cf0 100644 --- a/pkg/kv/kvserver/replica_protected_timestamp_test.go +++ b/pkg/kv/kvserver/replica_protected_timestamp_test.go @@ -12,26 +12,23 @@ package kvserver import ( "context" - "fmt" + "math/rand" "testing" "time" "github.com/cockroachdb/cockroach/pkg/keys" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" - "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/stretchr/testify/require" ) // TestCheckProtectedTimestampsForGC exercises -// Replica.checkProtectedTimestampsForGC() at a low level. -// It does so by passing a Replica connected to an already -// shut down store to a variety of test cases. +// Replica.checkProtectedTimestampsForGC() at a low level. It does so by passing +// a Replica connected to an already shut down store to a variety of test cases. func TestCheckProtectedTimestampsForGC(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -44,11 +41,11 @@ func TestCheckProtectedTimestampsForGC(t *testing.T) { name string // Note that the store underneath the passed in Replica has been stopped. // This leaves the test to mutate the Replica state as it sees fit. 
- test func(t *testing.T, r *Replica, mt *manualCache) + test func(t *testing.T, r *Replica, mp *manualPTSReader) }{ { name: "lease is too new", - test: func(t *testing.T, r *Replica, mt *manualCache) { + test: func(t *testing.T, r *Replica, _ *manualPTSReader) { r.mu.state.Lease.Start = r.store.Clock().NowAsClockTimestamp() canGC, _, gcTimestamp, _, _ := r.checkProtectedTimestampsForGC(ctx, makeTTLDuration(10)) require.False(t, canGC) @@ -57,42 +54,30 @@ func TestCheckProtectedTimestampsForGC(t *testing.T) { }, { name: "have overlapping but new enough that it's okay", - test: func(t *testing.T, r *Replica, mt *manualCache) { + test: func(t *testing.T, r *Replica, mp *manualPTSReader) { ts := r.store.Clock().Now() - mt.asOf = r.store.Clock().Now().Next() - mt.records = append(mt.records, &ptpb.Record{ - ID: uuid.MakeV4().GetBytes(), - Timestamp: ts, - DeprecatedSpans: []roachpb.Span{ - { - Key: keys.MinKey, - EndKey: keys.MaxKey, - }, - }, + mp.asOf = r.store.Clock().Now().Next() + mp.protections = append(mp.protections, manualPTSReaderProtection{ + sp: roachpb.Span{Key: keys.MinKey, EndKey: keys.MaxKey}, + protectionTimestamps: []hlc.Timestamp{ts}, }) // We should allow gc to proceed with the normal new threshold if that // threshold is earlier than all of the records. canGC, _, gcTimestamp, _, _ := r.checkProtectedTimestampsForGC(ctx, makeTTLDuration(10)) require.True(t, canGC) - require.Equal(t, mt.asOf, gcTimestamp) + require.Equal(t, mp.asOf, gcTimestamp) }, }, { // In this case we have a record which protects some data but we can // set the threshold to a later point. name: "have overlapping but can still GC some", - test: func(t *testing.T, r *Replica, mt *manualCache) { + test: func(t *testing.T, r *Replica, mp *manualPTSReader) { ts := r.store.Clock().Now().Add(-11*time.Second.Nanoseconds(), 0) - mt.asOf = r.store.Clock().Now().Next() - mt.records = append(mt.records, &ptpb.Record{ - ID: uuid.MakeV4().GetBytes(), - Timestamp: ts, - DeprecatedSpans: []roachpb.Span{ - { - Key: keys.MinKey, - EndKey: keys.MaxKey, - }, - }, + mp.asOf = r.store.Clock().Now().Next() + mp.protections = append(mp.protections, manualPTSReaderProtection{ + sp: roachpb.Span{Key: keys.MinKey, EndKey: keys.MaxKey}, + protectionTimestamps: []hlc.Timestamp{ts}, }) // We should allow gc to proceed up to the timestamp which precedes the // protected timestamp. This means we expect a GC timestamp 10 seconds @@ -107,20 +92,14 @@ func TestCheckProtectedTimestampsForGC(t *testing.T) { // In this case we have a record which is right up against the GC // threshold. name: "have overlapping but have already GC'd right up to the threshold", - test: func(t *testing.T, r *Replica, mt *manualCache) { + test: func(t *testing.T, r *Replica, mp *manualPTSReader) { r.mu.Lock() th := *r.mu.state.GCThreshold r.mu.Unlock() - mt.asOf = r.store.Clock().Now().Next() - mt.records = append(mt.records, &ptpb.Record{ - ID: uuid.MakeV4().GetBytes(), - Timestamp: th.Next(), - DeprecatedSpans: []roachpb.Span{ - { - Key: keys.MinKey, - EndKey: keys.MaxKey, - }, - }, + mp.asOf = r.store.Clock().Now().Next() + mp.protections = append(mp.protections, manualPTSReaderProtection{ + sp: roachpb.Span{Key: keys.MinKey, EndKey: keys.MaxKey}, + protectionTimestamps: []hlc.Timestamp{th.Next()}, }) // We should allow GC even if the threshold is already the // predecessor of the earliest valid record. However, the GC @@ -128,85 +107,135 @@ func TestCheckProtectedTimestampsForGC(t *testing.T) { // applicable to manually enqueued ranges. 
canGC, _, gcTimestamp, oldThreshold, newThreshold := r.checkProtectedTimestampsForGC(ctx, makeTTLDuration(10)) require.True(t, canGC) + require.Equal(t, newThreshold, oldThreshold) require.True(t, newThreshold.Equal(oldThreshold)) require.Equal(t, th.Add(10*time.Second.Nanoseconds(), 0), gcTimestamp) }, }, { name: "failed record does not prevent GC", - test: func(t *testing.T, r *Replica, mt *manualCache) { + test: func(t *testing.T, r *Replica, mp *manualPTSReader) { ts := r.store.Clock().Now() - id := uuid.MakeV4() thresh := ts.Next() r.mu.state.GCThreshold = &thresh - mt.asOf = thresh.Next() - mt.records = append(mt.records, &ptpb.Record{ - ID: id.GetBytes(), - Timestamp: ts, - DeprecatedSpans: []roachpb.Span{ - { - Key: keys.MinKey, - EndKey: keys.MaxKey, - }, - }, + mp.asOf = thresh.Next() + mp.protections = append(mp.protections, manualPTSReaderProtection{ + sp: roachpb.Span{Key: keys.MinKey, EndKey: keys.MaxKey}, + protectionTimestamps: []hlc.Timestamp{ts}, }) canGC, _, gcTimestamp, _, _ := r.checkProtectedTimestampsForGC(ctx, makeTTLDuration(10)) require.True(t, canGC) - require.Equal(t, mt.asOf, gcTimestamp) + require.Equal(t, mp.asOf, gcTimestamp) + }, + }, + { + name: "earliest timestamp is picked when multiple records exist", + test: func(t *testing.T, r *Replica, mp *manualPTSReader) { + ts1 := r.store.Clock().Now().Add(-11*time.Second.Nanoseconds(), 0) + ts2 := r.store.Clock().Now().Add(-20*time.Second.Nanoseconds(), 0) + ts3 := r.store.Clock().Now().Add(-30*time.Second.Nanoseconds(), 0) + mp.asOf = r.store.Clock().Now().Next() + mp.protections = append(mp.protections, manualPTSReaderProtection{ + sp: roachpb.Span{Key: keys.MinKey, EndKey: keys.MaxKey}, + protectionTimestamps: []hlc.Timestamp{ts1, ts2, ts3}, + }) + + // Shuffle the protection timestamps for good measure. + mp.shuffleAllProtectionTimestamps() + // We should allow gc to proceed up to the timestamp which precedes the + // earliest protected timestamp (t3). This means we expect a GC + // timestamp 10 seconds after ts3.Prev() given the policy. + canGC, _, gcTimestamp, oldThreshold, newThreshold := r.checkProtectedTimestampsForGC(ctx, makeTTLDuration(10)) + require.True(t, canGC) + require.False(t, newThreshold.Equal(oldThreshold)) + require.Equal(t, ts3.Prev().Add(10*time.Second.Nanoseconds(), 0), gcTimestamp) + }, + }, + { + // We should be able to move the GC timestamp up if no protection + // timestamps apply. The timestamp moves up till how fresh our reading of + // PTS state is. + name: "no protections apply", + test: func(t *testing.T, r *Replica, mp *manualPTSReader) { + mp.asOf = r.store.Clock().Now().Next() + canGC, _, gcTimestamp, _, _ := r.checkProtectedTimestampsForGC(ctx, makeTTLDuration(10)) + require.True(t, canGC) + require.Equal(t, mp.asOf, gcTimestamp) + }, + }, + { + // Set up such that multiple timestamps are present including timestamps + // from failed records (i.e below the GCThreshold). We should be able to + // move the GC timestamp using the earliest protection timestamp that is + // still above the GCThreshold in such a case. 
+ name: "multiple timestamps present including failed", + test: func(t *testing.T, r *Replica, mp *manualPTSReader) { + mp.asOf = r.store.Clock().Now().Next() + thresh := r.mu.state.GCThreshold + ts1 := thresh.Add(-7*time.Second.Nanoseconds(), 0) + ts2 := thresh.Add(-4*time.Second.Nanoseconds(), 0) + ts3 := thresh.Add(14*time.Second.Nanoseconds(), 0) + ts4 := thresh.Add(20*time.Second.Nanoseconds(), 0) + mp.protections = append(mp.protections, manualPTSReaderProtection{ + sp: roachpb.Span{Key: keys.MinKey, EndKey: keys.MaxKey}, + protectionTimestamps: []hlc.Timestamp{ts1, ts2, ts3, ts4}, + }) + mp.shuffleAllProtectionTimestamps() + // We should allow gc to proceed up to the timestamp which precedes the + // earliest protected timestamp (t3) that is still valid. This means we + // expect a GC timestamp 10 seconds after ts3.Prev() given the policy. + canGC, _, gcTimestamp, oldThreshold, newThreshold := r.checkProtectedTimestampsForGC(ctx, makeTTLDuration(10)) + require.True(t, canGC) + require.False(t, newThreshold.Equal(oldThreshold)) + require.Equal(t, ts3.Prev().Add(10*time.Second.Nanoseconds(), 0), gcTimestamp) }, }, } { t.Run(testCase.name, func(t *testing.T) { tc := testContext{} tsc := TestStoreConfig(nil) - mc := &manualCache{} - tsc.ProtectedTimestampCache = mc + mp := &manualPTSReader{} + tsc.ProtectedTimestampReader = mp stopper := stop.NewStopper() tc.StartWithStoreConfig(ctx, t, stopper, tsc) stopper.Stop(ctx) - testCase.test(t, tc.repl, mc) + testCase.test(t, tc.repl, mp) }) } } -type manualCache struct { - asOf hlc.Timestamp - records []*ptpb.Record - refresh func(ctx context.Context, asOf hlc.Timestamp) error +type manualPTSReaderProtection struct { + sp roachpb.Span + protectionTimestamps []hlc.Timestamp } -func (c *manualCache) Iterate( - ctx context.Context, start, end roachpb.Key, it protectedts.Iterator, -) hlc.Timestamp { - query := roachpb.Span{Key: start, EndKey: end} - for _, r := range c.records { - for _, sp := range r.DeprecatedSpans { - if query.Overlaps(sp) { - it(r) - break - } - } - } - return c.asOf +type manualPTSReader struct { + asOf hlc.Timestamp + protections []manualPTSReaderProtection } -func (c *manualCache) Refresh(ctx context.Context, asOf hlc.Timestamp) error { - if c.refresh == nil { - c.asOf = asOf - return nil +// GetProtectionTimestamps is part of the spanconfig.ProtectedTSReader +// interface. +func (mp *manualPTSReader) GetProtectionTimestamps( + _ context.Context, sp roachpb.Span, +) (protectionTimestamps []hlc.Timestamp, asOf hlc.Timestamp) { + for _, protection := range mp.protections { + if protection.sp.Overlaps(sp) { + protectionTimestamps = append(protectionTimestamps, protection.protectionTimestamps...) + } } - return c.refresh(ctx, asOf) + return protectionTimestamps, mp.asOf } -func (c *manualCache) QueryRecord( - ctx context.Context, id uuid.UUID, -) (exists bool, asOf hlc.Timestamp) { - for _, r := range c.records { - if r.ID.GetUUID() == id { - return true, c.asOf - } +// shuffleAllProtectionTimestamps shuffles protection timestamps associated with +// all spans. 
+func (mp *manualPTSReader) shuffleAllProtectionTimestamps() { + for i := range mp.protections { + rand.Shuffle(len(mp.protections[i].protectionTimestamps), func(a, b int) { + mp.protections[i].protectionTimestamps[a], mp.protections[i].protectionTimestamps[b] = + mp.protections[i].protectionTimestamps[b], mp.protections[i].protectionTimestamps[a] + }) } - return false, c.asOf } -var _ protectedts.Cache = (*manualCache)(nil) +var _ spanconfig.ProtectedTSReader = (*manualPTSReader)(nil) diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index cb2c980c64e7..8191ecca6952 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -40,7 +40,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftentry" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/tenantrate" @@ -246,7 +245,7 @@ func testStoreConfig(clock *hlc.Clock, version roachpb.Version) StoreConfig { CoalescedHeartbeatsInterval: 50 * time.Millisecond, ScanInterval: 10 * time.Minute, HistogramWindowInterval: metric.TestSampleInterval, - ProtectedTimestampCache: protectedts.EmptyCache(clock), + ProtectedTimestampReader: spanconfig.EmptyProtectedTSReader(clock), // Use a constant empty system config, which mirrors the previously // existing logic to install an empty system config in gossip. @@ -732,7 +731,7 @@ type Store struct { limiters batcheval.Limiters txnWaitMetrics *txnwait.Metrics sstSnapshotStorage SSTSnapshotStorage - protectedtsCache protectedts.Cache + protectedtsReader spanconfig.ProtectedTSReader ctSender *sidetransport.Sender // gossipRangeCountdown and leaseRangeCountdown are countdowns of @@ -1044,10 +1043,9 @@ type StoreConfig struct { ExternalStorage cloud.ExternalStorageFactory ExternalStorageFromURI cloud.ExternalStorageFromURIFactory - // ProtectedTimestampCache maintains the state of the protected timestamp - // subsystem. It is queried during the GC process and in the handling of - // AdminVerifyProtectedTimestampRequest. - ProtectedTimestampCache protectedts.Cache + // ProtectedTimestampReader provides a read-only view into the protected + // timestamp subsystem. It is queried during the GC process. + ProtectedTimestampReader spanconfig.ProtectedTSReader // KV Memory Monitor. Must be non-nil for production, and can be nil in some // tests. @@ -1227,7 +1225,7 @@ func NewStore( if err := s.sstSnapshotStorage.Clear(); err != nil { log.Warningf(ctx, "failed to clear snapshot storage: %v", err) } - s.protectedtsCache = cfg.ProtectedTimestampCache + s.protectedtsReader = cfg.ProtectedTimestampReader // On low-CPU instances, a default limit value may still allow ExportRequests // to tie up all cores so cap limiter at cores-1 when setting value is higher. 
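The replica- and store-level changes above reduce to a small piece of selection logic over the new interface: fetch every protection timestamp that overlaps the replica's span, drop the ones already below the GC threshold (they no longer apply), and GC up to the predecessor of the earliest survivor, or up to the reader's `asOf` if none survive. Below is a minimal standalone sketch of that logic, assuming only the `spanconfig.ProtectedTSReader` interface added by this patch; the package and helper names are illustrative and not part of the change.

package example // illustrative only, not part of this patch

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/spanconfig"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// earliestValidProtection mirrors what readProtectedTimestampsRLocked now
// does: it queries the reader for all protections overlapping sp, skips any
// protection timestamp that has already fallen below the GC threshold, and
// returns the earliest remaining one together with the asOf timestamp at
// which the protection state was read. An empty result means GC may proceed
// up to asOf; otherwise GC may proceed up to earliest.Prev().
func earliestValidProtection(
	ctx context.Context,
	reader spanconfig.ProtectedTSReader,
	sp roachpb.Span,
	gcThreshold hlc.Timestamp,
) (hlc.Timestamp, hlc.Timestamp) {
	protectionTimestamps, asOf := reader.GetProtectionTimestamps(ctx, sp)
	var earliest hlc.Timestamp
	for _, protectionTimestamp := range protectionTimestamps {
		// Protections below the GC threshold have already been GC'd past and
		// no longer apply.
		if protectionTimestamp.Less(gcThreshold) {
			continue
		}
		if earliest.IsEmpty() || protectionTimestamp.Less(earliest) {
			earliest = protectionTimestamp
		}
	}
	return earliest, asOf
}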
diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 815c2406666c..9f7b394f28ba 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -103,6 +103,7 @@ go_library( "//pkg/kv/kvserver/loqrecovery", "//pkg/kv/kvserver/loqrecovery/loqrecoverypb", "//pkg/kv/kvserver/protectedts", + "//pkg/kv/kvserver/protectedts/ptcache", "//pkg/kv/kvserver/protectedts/ptpb", "//pkg/kv/kvserver/protectedts/ptprovider", "//pkg/kv/kvserver/protectedts/ptreconcile", @@ -138,6 +139,7 @@ go_library( "//pkg/spanconfig/spanconfigkvaccessor", "//pkg/spanconfig/spanconfigkvsubscriber", "//pkg/spanconfig/spanconfigmanager", + "//pkg/spanconfig/spanconfigptsreader", "//pkg/spanconfig/spanconfigreconciler", "//pkg/spanconfig/spanconfigsqltranslator", "//pkg/spanconfig/spanconfigsqlwatcher", diff --git a/pkg/server/server.go b/pkg/server/server.go index e2219ffe5a33..ed7629ab468a 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -37,6 +37,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptcache" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptprovider" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptreconcile" serverrangefeed "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed" @@ -56,6 +57,7 @@ import ( _ "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigjob" // register jobs declared outside of pkg/sql "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigkvaccessor" "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigkvsubscriber" + "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigptsreader" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" "github.com/cockroachdb/cockroach/pkg/sql/flowinfra" @@ -511,36 +513,37 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { systemConfigWatcher := systemconfigwatcher.New( keys.SystemSQLCodec, clock, rangeFeedFactory, &cfg.DefaultZoneConfig, ) + protectedTSReader := spanconfigptsreader.NewAdapter(protectedtsProvider.(*ptprovider.Provider).Cache.(*ptcache.Cache)) storeCfg := kvserver.StoreConfig{ - DefaultSpanConfig: cfg.DefaultZoneConfig.AsSpanConfig(), - Settings: st, - AmbientCtx: cfg.AmbientCtx, - RaftConfig: cfg.RaftConfig, - Clock: clock, - DB: db, - Gossip: g, - NodeLiveness: nodeLiveness, - Transport: raftTransport, - NodeDialer: nodeDialer, - RPCContext: rpcContext, - ScanInterval: cfg.ScanInterval, - ScanMinIdleTime: cfg.ScanMinIdleTime, - ScanMaxIdleTime: cfg.ScanMaxIdleTime, - HistogramWindowInterval: cfg.HistogramWindowInterval(), - StorePool: storePool, - SQLExecutor: internalExecutor, - LogRangeEvents: cfg.EventLogEnabled, - RangeDescriptorCache: distSender.RangeDescriptorCache(), - TimeSeriesDataStore: tsDB, - ClosedTimestampSender: ctSender, - ClosedTimestampReceiver: ctReceiver, - ExternalStorage: externalStorage, - ExternalStorageFromURI: externalStorageFromURI, - ProtectedTimestampCache: protectedtsProvider, - KVMemoryMonitor: kvMemoryMonitor, - RangefeedBudgetFactory: rangeReedBudgetFactory, - SystemConfigProvider: systemConfigWatcher, + DefaultSpanConfig: cfg.DefaultZoneConfig.AsSpanConfig(), + Settings: st, + AmbientCtx: cfg.AmbientCtx, + RaftConfig: cfg.RaftConfig, + Clock: clock, + DB: db, + Gossip: g, + NodeLiveness: nodeLiveness, + Transport: 
raftTransport, + NodeDialer: nodeDialer, + RPCContext: rpcContext, + ScanInterval: cfg.ScanInterval, + ScanMinIdleTime: cfg.ScanMinIdleTime, + ScanMaxIdleTime: cfg.ScanMaxIdleTime, + HistogramWindowInterval: cfg.HistogramWindowInterval(), + StorePool: storePool, + SQLExecutor: internalExecutor, + LogRangeEvents: cfg.EventLogEnabled, + RangeDescriptorCache: distSender.RangeDescriptorCache(), + TimeSeriesDataStore: tsDB, + ClosedTimestampSender: ctSender, + ClosedTimestampReceiver: ctReceiver, + ExternalStorage: externalStorage, + ExternalStorageFromURI: externalStorageFromURI, + ProtectedTimestampReader: protectedTSReader, + KVMemoryMonitor: kvMemoryMonitor, + RangefeedBudgetFactory: rangeReedBudgetFactory, + SystemConfigProvider: systemConfigWatcher, } var spanConfig struct { diff --git a/pkg/spanconfig/spanconfig.go b/pkg/spanconfig/spanconfig.go index 16df0ebdd5ca..4d74301ace69 100644 --- a/pkg/spanconfig/spanconfig.go +++ b/pkg/spanconfig/spanconfig.go @@ -397,3 +397,31 @@ func (u Update) Deletion() bool { func (u Update) Addition() bool { return !u.Deletion() } + +// ProtectedTSReader is the read-only portion for querying protected +// timestamp information. It doubles up as an adaptor interface for +// protectedts.Cache. +type ProtectedTSReader interface { + // GetProtectionTimestamps returns all protected timestamps that apply to any + // part of the given key span. The time at which this protected timestamp + // state is valid is returned as well. + GetProtectionTimestamps(ctx context.Context, sp roachpb.Span) ( + protectionTimestamps []hlc.Timestamp, asOf hlc.Timestamp, + ) +} + +// EmptyProtectedTSReader returns a ProtectedTSReader which contains no records +// and is always up-to date. This is intended for testing. +func EmptyProtectedTSReader(c *hlc.Clock) ProtectedTSReader { + return (*emptyProtectedTSReader)(c) +} + +type emptyProtectedTSReader hlc.Clock + +// GetProtectionTimestamps is part of the spanconfig.ProtectedTSReader +// interface. +func (r *emptyProtectedTSReader) GetProtectionTimestamps( + _ context.Context, _ roachpb.Span, +) ([]hlc.Timestamp, hlc.Timestamp) { + return nil, (*hlc.Clock)(r).Now() +} diff --git a/pkg/spanconfig/spanconfigptsreader/BUILD.bazel b/pkg/spanconfig/spanconfigptsreader/BUILD.bazel new file mode 100644 index 000000000000..5f3af5de4f52 --- /dev/null +++ b/pkg/spanconfig/spanconfigptsreader/BUILD.bazel @@ -0,0 +1,15 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "spanconfigptsreader", + srcs = ["adapter.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigptsreader", + visibility = ["//visibility:public"], + deps = [ + "//pkg/kv/kvserver/protectedts/ptcache", + "//pkg/roachpb", + "//pkg/spanconfig", + "//pkg/util/hlc", + "@com_github_cockroachdb_errors//:errors", + ], +) diff --git a/pkg/spanconfig/spanconfigptsreader/adapter.go b/pkg/spanconfig/spanconfigptsreader/adapter.go new file mode 100644 index 000000000000..7ebf4f66a1f1 --- /dev/null +++ b/pkg/spanconfig/spanconfigptsreader/adapter.go @@ -0,0 +1,78 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+
+package spanconfigptsreader
+
+import (
+	"context"
+
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptcache"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/spanconfig"
+	"github.com/cockroachdb/cockroach/pkg/util/hlc"
+	"github.com/cockroachdb/errors"
+)
+
+// adapter implements the spanconfig.ProtectedTSReader interface and is intended
+// as a bridge between the old and new protected timestamp subsystems in KV.
+//
+// V1 of the protected timestamp subsystem only allowed protections to be set
+// over spans belonging to the system tenant. These protections were cached on
+// each node in the cluster by the ptcache.Cache. V2 of the subsystem allows
+// protections to be set on all spans (including secondary tenant spans); these
+// are cached in the spanconfig.Store maintained by the spanconfig.KVSubscriber.
+// In release 22.1 both the old and new subsystems co-exist, and as such,
+// protections that apply to system tenant spans may be present in either the
+// ptcache.Cache or the spanconfig.KVSubscriber. This adapter struct
+// encapsulates protected timestamp information from both these sources behind
+// a single interface.
+//
+// TODO(arul): In 22.2, we will have completely migrated away from the old
+// subsystem, and we'll be able to get rid of this interface.
+//
+// TODO(arul): Add the KVSubscriber here as well and actually encapsulate PTS
+// information from both these sources as described above; this will happen
+// once we make the KVSubscriber implement the spanconfig.ProtectedTSReader
+// interface.
+type adapter struct {
+	cache *ptcache.Cache
+}
+
+var _ spanconfig.ProtectedTSReader = &adapter{}
+
+// NewAdapter returns an adapter that implements spanconfig.ProtectedTSReader.
+func NewAdapter(cache *ptcache.Cache) spanconfig.ProtectedTSReader {
+	return &adapter{
+		cache: cache,
+	}
+}
+
+// GetProtectionTimestamps is part of the spanconfig.ProtectedTSReader
+// interface.
+func (a *adapter) GetProtectionTimestamps(
+	ctx context.Context, sp roachpb.Span,
+) (protectionTimestamps []hlc.Timestamp, asOf hlc.Timestamp) {
+	return a.cache.GetProtectionTimestamps(ctx, sp)
+}
+
+// TestingRefreshPTSState refreshes the in-memory protected timestamp state to
+// at least asOf.
+// TODO(arul): Once we wrap the KVSubscriber in this adapter interface, we'll
+// need to ensure that the subscriber is at least as caught up as the supplied
+// asOf timestamp as well.
+func TestingRefreshPTSState(
+	ctx context.Context, protectedTSReader spanconfig.ProtectedTSReader, asOf hlc.Timestamp,
+) error {
+	a, ok := protectedTSReader.(*adapter)
+	if !ok {
+		return errors.AssertionFailedf("could not convert spanconfig.ProtectedTSReader to spanconfigptsreader.adapter")
+	}
+	return a.cache.Refresh(ctx, asOf)
+}
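
For reviewers, here is a rough end-to-end sketch (not part of the patch) of how the pieces introduced here are wired together: the `ptcache.Cache` is wrapped by the adapter via `NewAdapter`, KV consumers only ever see the `spanconfig.ProtectedTSReader` interface, and tests force a refresh through `TestingRefreshPTSState`. The test server, protected timestamp `Storage`, stopper, and span are assumed to come from an existing test harness; the package and function names below are illustrative.

package example // illustrative only, not part of this patch

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptcache"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/spanconfig"
	"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigptsreader"
	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	"github.com/cockroachdb/cockroach/pkg/util/stop"
)

// exampleUsage builds a ptcache.Cache, hands KV the read-only adapter, forces
// a refresh (as the tests in this patch do), and queries the protections
// overlapping sp along with the asOf timestamp describing how fresh the
// protection state is.
func exampleUsage(
	ctx context.Context,
	s serverutils.TestServerInterface,
	storage protectedts.Storage,
	stopper *stop.Stopper,
	sp roachpb.Span,
) ([]hlc.Timestamp, hlc.Timestamp, error) {
	cache := ptcache.New(ptcache.Config{
		Settings: s.ClusterSettings(),
		DB:       s.DB(),
		Storage:  storage,
	})
	if err := cache.Start(ctx, stopper); err != nil {
		return nil, hlc.Timestamp{}, err
	}

	// KV is handed the adapter, not the cache itself.
	var reader spanconfig.ProtectedTSReader = spanconfigptsreader.NewAdapter(cache)

	// Tests refresh the underlying state to at least the given timestamp.
	if err := spanconfigptsreader.TestingRefreshPTSState(ctx, reader, s.Clock().Now()); err != nil {
		return nil, hlc.Timestamp{}, err
	}

	protectionTimestamps, asOf := reader.GetProtectionTimestamps(ctx, sp)
	return protectionTimestamps, asOf, nil
}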