diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 98afcacf41a3..d3a7f1b4daaf 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -200,6 +200,7 @@ ALL_TESTS = [ "//pkg/spanconfig/spanconfigkvaccessor:spanconfigkvaccessor_test", "//pkg/spanconfig/spanconfigkvsubscriber:spanconfigkvsubscriber_test", "//pkg/spanconfig/spanconfigmanager:spanconfigmanager_test", + "//pkg/spanconfig/spanconfigptsreader:spanconfigptsreader_test", "//pkg/spanconfig/spanconfigreconciler:spanconfigreconciler_test", "//pkg/spanconfig/spanconfigsqltranslator:spanconfigsqltranslator_test", "//pkg/spanconfig/spanconfigsqlwatcher:spanconfigsqlwatcher_test", diff --git a/pkg/kv/kvserver/client_protectedts_test.go b/pkg/kv/kvserver/client_protectedts_test.go index 6622d7ff993a..74eb5378c892 100644 --- a/pkg/kv/kvserver/client_protectedts_test.go +++ b/pkg/kv/kvserver/client_protectedts_test.go @@ -216,7 +216,9 @@ func TestProtectedTimestamps(t *testing.T) { ptsReader := tc.GetFirstStoreFromServer(t, 0).GetStoreConfig().ProtectedTimestampReader require.NoError( t, - verifyProtectionTimestampExistsOnSpans(ctx, tc, ptsReader, ptsRec.Timestamp, ptsRec.DeprecatedSpans), + verifyProtectionTimestampExistsOnSpans( + ctx, t, tc, ptsReader, ptsRec.Timestamp, ptsRec.DeprecatedSpans, + ), ) // Make a new record that is doomed to fail. @@ -233,7 +235,9 @@ func TestProtectedTimestamps(t *testing.T) { // does not affect the ability to GC. require.NoError( t, - verifyProtectionTimestampExistsOnSpans(ctx, tc, ptsReader, failedRec.Timestamp, failedRec.DeprecatedSpans), + verifyProtectionTimestampExistsOnSpans( + ctx, t, tc, ptsReader, failedRec.Timestamp, failedRec.DeprecatedSpans, + ), ) // Add a new record that is after the old record. @@ -244,7 +248,9 @@ func TestProtectedTimestamps(t *testing.T) { require.NoError(t, ptsWithDB.Protect(ctx, nil /* txn */, &laterRec)) require.NoError( t, - verifyProtectionTimestampExistsOnSpans(ctx, tc, ptsReader, laterRec.Timestamp, laterRec.DeprecatedSpans), + verifyProtectionTimestampExistsOnSpans( + ctx, t, tc, ptsReader, laterRec.Timestamp, laterRec.DeprecatedSpans, + ), ) // Release the record that had succeeded and ensure that GC eventually @@ -278,18 +284,22 @@ func TestProtectedTimestamps(t *testing.T) { // supplied spans. func verifyProtectionTimestampExistsOnSpans( ctx context.Context, + t *testing.T, tc *testcluster.TestCluster, ptsReader spanconfig.ProtectedTSReader, protectionTimestamp hlc.Timestamp, spans roachpb.Spans, ) error { if err := spanconfigptsreader.TestingRefreshPTSState( - ctx, ptsReader, tc.Server(0).Clock().Now(), + ctx, t, ptsReader, tc.Server(0).Clock().Now(), ); err != nil { return err } for _, sp := range spans { - timestamps, _ := ptsReader.GetProtectionTimestamps(ctx, sp) + timestamps, _, err := ptsReader.GetProtectionTimestamps(ctx, sp) + if err != nil { + return err + } found := false for _, ts := range timestamps { if ts.Equal(protectionTimestamp) { diff --git a/pkg/kv/kvserver/client_replica_test.go b/pkg/kv/kvserver/client_replica_test.go index 427f011593a6..681c1ba18854 100644 --- a/pkg/kv/kvserver/client_replica_test.go +++ b/pkg/kv/kvserver/client_replica_test.go @@ -3519,7 +3519,10 @@ func TestStrictGCEnforcement(t *testing.T) { testutils.SucceedsSoon(t, func() error { for i := 0; i < tc.NumServers(); i++ { ptsReader := tc.GetFirstStoreFromServer(t, 0).GetStoreConfig().ProtectedTimestampReader - _, asOf := ptsReader.GetProtectionTimestamps(ctx, tableSpan) + _, asOf, err := ptsReader.GetProtectionTimestamps(ctx, tableSpan) + if err != nil { + return err + } if asOf.Less(min) { return errors.Errorf("not yet read") } @@ -3577,9 +3580,10 @@ func TestStrictGCEnforcement(t *testing.T) { l, _ := r.GetLease() require.NoError( t, - spanconfigptsreader.TestingRefreshPTSState(ctx, ptsReader, l.Start.ToTimestamp().Next()), + spanconfigptsreader.TestingRefreshPTSState(ctx, t, ptsReader, l.Start.ToTimestamp().Next()), ) - r.ReadProtectedTimestamps(ctx) + err := r.ReadProtectedTimestamps(ctx) + require.NoError(t, err) } } refreshCacheAndUpdatePTSState = func(t *testing.T, nodeID roachpb.NodeID) { @@ -3590,7 +3594,8 @@ func TestStrictGCEnforcement(t *testing.T) { ptp := tc.Server(i).ExecutorConfig().(sql.ExecutorConfig).ProtectedTimestampProvider require.NoError(t, ptp.Refresh(ctx, tc.Server(i).Clock().Now())) _, r := getFirstStoreReplica(t, tc.Server(i), tableKey) - r.ReadProtectedTimestamps(ctx) + err := r.ReadProtectedTimestamps(ctx) + require.NoError(t, err) } } ) diff --git a/pkg/kv/kvserver/client_spanconfigs_test.go b/pkg/kv/kvserver/client_spanconfigs_test.go index c9bfd59838fd..3f46b364dfbf 100644 --- a/pkg/kv/kvserver/client_spanconfigs_test.go +++ b/pkg/kv/kvserver/client_spanconfigs_test.go @@ -47,6 +47,7 @@ func TestSpanConfigUpdateAppliedToReplica(t *testing.T) { Store: &kvserver.StoreTestingKnobs{ DisableMergeQueue: true, DisableSplitQueue: true, + DisableGCQueue: true, }, SpanConfig: &spanconfig.TestingKnobs{ StoreKVSubscriberOverride: mockSubscriber, @@ -118,6 +119,12 @@ func (m *mockSpanConfigSubscriber) GetSpanConfigForKey( return m.Store.GetSpanConfigForKey(ctx, key) } +func (m *mockSpanConfigSubscriber) GetProtectionTimestamps( + context.Context, roachpb.Span, +) ([]hlc.Timestamp, hlc.Timestamp, error) { + panic("unimplemented") +} + func (m *mockSpanConfigSubscriber) LastUpdated() hlc.Timestamp { panic("unimplemented") } diff --git a/pkg/kv/kvserver/client_split_test.go b/pkg/kv/kvserver/client_split_test.go index 2957aba56cbd..64430abd5933 100644 --- a/pkg/kv/kvserver/client_split_test.go +++ b/pkg/kv/kvserver/client_split_test.go @@ -38,7 +38,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/server" - "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigptsreader" + "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap" "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" "github.com/cockroachdb/cockroach/pkg/storage" @@ -2385,6 +2385,9 @@ func TestUnsplittableRange(t *testing.T) { DefaultZoneConfigOverride: &zoneConfig, DefaultSystemZoneConfigOverride: &zoneSystemConfig, }, + SpanConfig: &spanconfig.TestingKnobs{ + ProtectedTSReaderOverrideFn: spanconfig.EmptyProtectedTSReader, + }, }, }) s := serv.(*server.TestServer) @@ -2430,14 +2433,7 @@ func TestUnsplittableRange(t *testing.T) { manualClock.Increment(10 * ttl.Nanoseconds()) // Trigger the MVCC GC queue, which should clean up the earlier version of the // row. Once the first version of the row is cleaned up, the range should - // exit the split queue purgatory. We need to tickle the protected timestamp - // subsystem to release a timestamp at which we get to actually remove the data. - require.NoError( - t, - spanconfigptsreader.TestingRefreshPTSState( - ctx, store.GetStoreConfig().ProtectedTimestampReader, s.Clock().Now(), - ), - ) + // exit the split queue purgatory. repl := store.LookupReplica(tableKey) if err := store.ManualMVCCGC(repl); err != nil { t.Fatal(err) diff --git a/pkg/kv/kvserver/helpers_test.go b/pkg/kv/kvserver/helpers_test.go index 916ce3afc2c8..c32d40fe999c 100644 --- a/pkg/kv/kvserver/helpers_test.go +++ b/pkg/kv/kvserver/helpers_test.go @@ -483,12 +483,14 @@ func (r *Replica) MaybeUnquiesceAndWakeLeader() bool { return r.maybeUnquiesceAndWakeLeaderLocked() } -func (r *Replica) ReadProtectedTimestamps(ctx context.Context) { +func (r *Replica) ReadProtectedTimestamps(ctx context.Context) error { var ts cachedProtectedTimestampState defer r.maybeUpdateCachedProtectedTS(&ts) r.mu.RLock() defer r.mu.RUnlock() - ts = r.readProtectedTimestampsRLocked(ctx) + var err error + ts, err = r.readProtectedTimestampsRLocked(ctx) + return err } // ClosedTimestampPolicy returns the closed timestamp policy of the range, which diff --git a/pkg/kv/kvserver/mvcc_gc_queue.go b/pkg/kv/kvserver/mvcc_gc_queue.go index de016745c2bd..d218a1389a6a 100644 --- a/pkg/kv/kvserver/mvcc_gc_queue.go +++ b/pkg/kv/kvserver/mvcc_gc_queue.go @@ -180,7 +180,11 @@ func (mgcq *mvccGCQueue) shouldQueue( // Consult the protected timestamp state to determine whether we can GC and // the timestamp which can be used to calculate the score. _, conf := repl.DescAndSpanConfig() - canGC, _, gcTimestamp, oldThreshold, newThreshold := repl.checkProtectedTimestampsForGC(ctx, conf.TTL()) + canGC, _, gcTimestamp, oldThreshold, newThreshold, err := repl.checkProtectedTimestampsForGC(ctx, conf.TTL()) + if err != nil { + log.VErrEventf(ctx, 2, "failed to check protected timestamp for gc: %v", err) + return false, 0 + } if !canGC { return false, 0 } @@ -525,7 +529,10 @@ func (mgcq *mvccGCQueue) process( // Consult the protected timestamp state to determine whether we can GC and // the timestamp which can be used to calculate the score and updated GC // threshold. - canGC, cacheTimestamp, gcTimestamp, oldThreshold, newThreshold := repl.checkProtectedTimestampsForGC(ctx, conf.TTL()) + canGC, cacheTimestamp, gcTimestamp, oldThreshold, newThreshold, err := repl.checkProtectedTimestampsForGC(ctx, conf.TTL()) + if err != nil { + return false, err + } if !canGC { return false, nil } diff --git a/pkg/kv/kvserver/protectedts/BUILD.bazel b/pkg/kv/kvserver/protectedts/BUILD.bazel index e0b055fb23b1..85d31485cc40 100644 --- a/pkg/kv/kvserver/protectedts/BUILD.bazel +++ b/pkg/kv/kvserver/protectedts/BUILD.bazel @@ -15,6 +15,7 @@ go_library( "//pkg/kv/kvserver/protectedts/ptpb", "//pkg/roachpb", "//pkg/settings", + "//pkg/spanconfig", "//pkg/util/hlc", "//pkg/util/metric", "//pkg/util/stop", diff --git a/pkg/kv/kvserver/protectedts/protectedts.go b/pkg/kv/kvserver/protectedts/protectedts.go index 10349f7e9aa9..5ed957ae2fe1 100644 --- a/pkg/kv/kvserver/protectedts/protectedts.go +++ b/pkg/kv/kvserver/protectedts/protectedts.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/stop" @@ -115,6 +116,7 @@ type Iterator func(*ptpb.Record) (wantMore bool) // by any Records at a given asOf can move its GC threshold up to that // timestamp less its GC TTL. type Cache interface { + spanconfig.ProtectedTSReader // Iterate examines the records with spans which overlap with [from, to). // Nil values for from or to are equivalent to Key{}. The order of records @@ -178,3 +180,9 @@ func (c *emptyCache) QueryRecord( func (c *emptyCache) Refresh(_ context.Context, asOf hlc.Timestamp) error { return nil } + +func (c *emptyCache) GetProtectionTimestamps( + context.Context, roachpb.Span, +) (protectionTimestamps []hlc.Timestamp, asOf hlc.Timestamp, err error) { + return protectionTimestamps, (*hlc.Clock)(c).Now(), nil +} diff --git a/pkg/kv/kvserver/protectedts/ptcache/BUILD.bazel b/pkg/kv/kvserver/protectedts/ptcache/BUILD.bazel index 0b20be4a049f..1a46390e209c 100644 --- a/pkg/kv/kvserver/protectedts/ptcache/BUILD.bazel +++ b/pkg/kv/kvserver/protectedts/ptcache/BUILD.bazel @@ -11,7 +11,6 @@ go_library( "//pkg/kv/kvserver/protectedts/ptpb", "//pkg/roachpb", "//pkg/settings/cluster", - "//pkg/spanconfig", "//pkg/util/hlc", "//pkg/util/log", "//pkg/util/stop", diff --git a/pkg/kv/kvserver/protectedts/ptcache/cache.go b/pkg/kv/kvserver/protectedts/ptcache/cache.go index e99d012fdb1b..6d277ffb6561 100644 --- a/pkg/kv/kvserver/protectedts/ptcache/cache.go +++ b/pkg/kv/kvserver/protectedts/ptcache/cache.go @@ -19,7 +19,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" @@ -74,7 +73,6 @@ func New(config Config) *Cache { } var _ protectedts.Cache = (*Cache)(nil) -var _ spanconfig.ProtectedTSReader = (*Cache)(nil) // Iterate is part of the protectedts.Cache interface. func (c *Cache) Iterate( @@ -132,7 +130,7 @@ func (c *Cache) Refresh(ctx context.Context, asOf hlc.Timestamp) error { // interface. func (c *Cache) GetProtectionTimestamps( ctx context.Context, sp roachpb.Span, -) (protectionTimestamps []hlc.Timestamp, asOf hlc.Timestamp) { +) (protectionTimestamps []hlc.Timestamp, asOf hlc.Timestamp, err error) { readAt := c.Iterate(ctx, sp.Key, sp.EndKey, @@ -140,7 +138,7 @@ func (c *Cache) GetProtectionTimestamps( protectionTimestamps = append(protectionTimestamps, rec.Timestamp) return true }) - return protectionTimestamps, readAt + return protectionTimestamps, readAt, nil } // Start starts the periodic fetching of the Cache. A Cache must not be used diff --git a/pkg/kv/kvserver/protectedts/ptcache/cache_test.go b/pkg/kv/kvserver/protectedts/ptcache/cache_test.go index 1a5e44862237..37a5dd43ad1b 100644 --- a/pkg/kv/kvserver/protectedts/ptcache/cache_test.go +++ b/pkg/kv/kvserver/protectedts/ptcache/cache_test.go @@ -403,7 +403,8 @@ func TestGetProtectionTimestamps(t *testing.T) { r3, _ := protect(t, s, p, ts(6), sp42) require.NoError(t, c.Refresh(ctx, s.Clock().Now())) - protectionTimestamps, _ := c.GetProtectionTimestamps(ctx, sp42) + protectionTimestamps, _, err := c.GetProtectionTimestamps(ctx, sp42) + require.NoError(t, err) sort.Slice(protectionTimestamps, func(i, j int) bool { return protectionTimestamps[i].Less(protectionTimestamps[j]) }) @@ -417,7 +418,8 @@ func TestGetProtectionTimestamps(t *testing.T) { r1, _ := protect(t, s, p, ts(5), sp43) r2, _ := protect(t, s, p, ts(10), sp44) require.NoError(t, c.Refresh(ctx, s.Clock().Now())) - protectionTimestamps, _ := c.GetProtectionTimestamps(ctx, sp42) + protectionTimestamps, _, err := c.GetProtectionTimestamps(ctx, sp42) + require.NoError(t, err) require.Equal(t, []hlc.Timestamp(nil), protectionTimestamps) cleanup(r1, r2) }, @@ -435,7 +437,8 @@ func TestGetProtectionTimestamps(t *testing.T) { r6, _ := protect(t, s, p, ts(20), sp44) require.NoError(t, c.Refresh(ctx, s.Clock().Now())) - protectionTimestamps, _ := c.GetProtectionTimestamps(ctx, sp4243) + protectionTimestamps, _, err := c.GetProtectionTimestamps(ctx, sp4243) + require.NoError(t, err) sort.Slice(protectionTimestamps, func(i, j int) bool { return protectionTimestamps[i].Less(protectionTimestamps[j]) }) diff --git a/pkg/kv/kvserver/replica_protected_timestamp.go b/pkg/kv/kvserver/replica_protected_timestamp.go index 48a5eb132424..fb86e671987f 100644 --- a/pkg/kv/kvserver/replica_protected_timestamp.go +++ b/pkg/kv/kvserver/replica_protected_timestamp.go @@ -66,7 +66,7 @@ func (r *Replica) maybeUpdateCachedProtectedTS(ts *cachedProtectedTimestampState func (r *Replica) readProtectedTimestampsRLocked( ctx context.Context, -) (ts cachedProtectedTimestampState) { +) (ts cachedProtectedTimestampState, _ error) { desc := r.descRLocked() gcThreshold := *r.mu.state.GCThreshold @@ -75,7 +75,11 @@ func (r *Replica) readProtectedTimestampsRLocked( EndKey: roachpb.Key(desc.EndKey), } var protectionTimestamps []hlc.Timestamp - protectionTimestamps, ts.readAt = r.store.protectedtsReader.GetProtectionTimestamps(ctx, sp) + var err error + protectionTimestamps, ts.readAt, err = r.store.protectedtsReader.GetProtectionTimestamps(ctx, sp) + if err != nil { + return ts, err + } earliestTS := hlc.Timestamp{} for _, protectionTimestamp := range protectionTimestamps { // Check if the timestamp the record was trying to protect is strictly @@ -88,7 +92,7 @@ func (r *Replica) readProtectedTimestampsRLocked( } } ts.earliestProtectionTimestamp = earliestTS - return ts + return ts, nil } // checkProtectedTimestampsForGC determines whether the Replica can run GC. If @@ -104,7 +108,7 @@ func (r *Replica) readProtectedTimestampsRLocked( // old gc threshold, and the new gc threshold. func (r *Replica) checkProtectedTimestampsForGC( ctx context.Context, gcTTL time.Duration, -) (canGC bool, cacheTimestamp, gcTimestamp, oldThreshold, newThreshold hlc.Timestamp) { +) (canGC bool, cacheTimestamp, gcTimestamp, oldThreshold, newThreshold hlc.Timestamp, _ error) { // We may be reading the protected timestamp cache while we're holding // the Replica.mu for reading. If we do so and find newer state in the cache @@ -123,7 +127,11 @@ func (r *Replica) checkProtectedTimestampsForGC( // read.earliestRecord is the record with the earliest timestamp which is // greater than the existing gcThreshold. - read = r.readProtectedTimestampsRLocked(ctx) + var err error + read, err = r.readProtectedTimestampsRLocked(ctx) + if err != nil { + return false, hlc.Timestamp{}, hlc.Timestamp{}, hlc.Timestamp{}, hlc.Timestamp{}, err + } gcTimestamp = read.readAt if !read.earliestProtectionTimestamp.IsEmpty() { // NB: we want to allow GC up to the timestamp preceding the earliest valid @@ -137,12 +145,12 @@ func (r *Replica) checkProtectedTimestampsForGC( if gcTimestamp.Less(lease.Start.ToTimestamp()) { log.VEventf(ctx, 1, "not gc'ing replica %v due to new lease %v started after %v", r, lease, gcTimestamp) - return false, hlc.Timestamp{}, hlc.Timestamp{}, hlc.Timestamp{}, hlc.Timestamp{} + return false, hlc.Timestamp{}, hlc.Timestamp{}, hlc.Timestamp{}, hlc.Timestamp{}, nil } newThreshold = gc.CalculateThreshold(gcTimestamp, gcTTL) - return true, read.readAt, gcTimestamp, oldThreshold, newThreshold + return true, read.readAt, gcTimestamp, oldThreshold, newThreshold, nil } // markPendingGC is called just prior to sending the GC request to increase the diff --git a/pkg/kv/kvserver/replica_protected_timestamp_test.go b/pkg/kv/kvserver/replica_protected_timestamp_test.go index d47d29851cf0..f02ca7af3ead 100644 --- a/pkg/kv/kvserver/replica_protected_timestamp_test.go +++ b/pkg/kv/kvserver/replica_protected_timestamp_test.go @@ -47,7 +47,8 @@ func TestCheckProtectedTimestampsForGC(t *testing.T) { name: "lease is too new", test: func(t *testing.T, r *Replica, _ *manualPTSReader) { r.mu.state.Lease.Start = r.store.Clock().NowAsClockTimestamp() - canGC, _, gcTimestamp, _, _ := r.checkProtectedTimestampsForGC(ctx, makeTTLDuration(10)) + canGC, _, gcTimestamp, _, _, err := r.checkProtectedTimestampsForGC(ctx, makeTTLDuration(10)) + require.NoError(t, err) require.False(t, canGC) require.Zero(t, gcTimestamp) }, @@ -63,7 +64,8 @@ func TestCheckProtectedTimestampsForGC(t *testing.T) { }) // We should allow gc to proceed with the normal new threshold if that // threshold is earlier than all of the records. - canGC, _, gcTimestamp, _, _ := r.checkProtectedTimestampsForGC(ctx, makeTTLDuration(10)) + canGC, _, gcTimestamp, _, _, err := r.checkProtectedTimestampsForGC(ctx, makeTTLDuration(10)) + require.NoError(t, err) require.True(t, canGC) require.Equal(t, mp.asOf, gcTimestamp) }, @@ -82,7 +84,8 @@ func TestCheckProtectedTimestampsForGC(t *testing.T) { // We should allow gc to proceed up to the timestamp which precedes the // protected timestamp. This means we expect a GC timestamp 10 seconds // after ts.Prev() given the policy. - canGC, _, gcTimestamp, oldThreshold, newThreshold := r.checkProtectedTimestampsForGC(ctx, makeTTLDuration(10)) + canGC, _, gcTimestamp, oldThreshold, newThreshold, err := r.checkProtectedTimestampsForGC(ctx, makeTTLDuration(10)) + require.NoError(t, err) require.True(t, canGC) require.False(t, newThreshold.Equal(oldThreshold)) require.Equal(t, ts.Prev().Add(10*time.Second.Nanoseconds(), 0), gcTimestamp) @@ -105,7 +108,8 @@ func TestCheckProtectedTimestampsForGC(t *testing.T) { // predecessor of the earliest valid record. However, the GC // queue does not enqueue ranges in such cases, so this is only // applicable to manually enqueued ranges. - canGC, _, gcTimestamp, oldThreshold, newThreshold := r.checkProtectedTimestampsForGC(ctx, makeTTLDuration(10)) + canGC, _, gcTimestamp, oldThreshold, newThreshold, err := r.checkProtectedTimestampsForGC(ctx, makeTTLDuration(10)) + require.NoError(t, err) require.True(t, canGC) require.Equal(t, newThreshold, oldThreshold) require.True(t, newThreshold.Equal(oldThreshold)) @@ -123,7 +127,8 @@ func TestCheckProtectedTimestampsForGC(t *testing.T) { sp: roachpb.Span{Key: keys.MinKey, EndKey: keys.MaxKey}, protectionTimestamps: []hlc.Timestamp{ts}, }) - canGC, _, gcTimestamp, _, _ := r.checkProtectedTimestampsForGC(ctx, makeTTLDuration(10)) + canGC, _, gcTimestamp, _, _, err := r.checkProtectedTimestampsForGC(ctx, makeTTLDuration(10)) + require.NoError(t, err) require.True(t, canGC) require.Equal(t, mp.asOf, gcTimestamp) }, @@ -145,7 +150,8 @@ func TestCheckProtectedTimestampsForGC(t *testing.T) { // We should allow gc to proceed up to the timestamp which precedes the // earliest protected timestamp (t3). This means we expect a GC // timestamp 10 seconds after ts3.Prev() given the policy. - canGC, _, gcTimestamp, oldThreshold, newThreshold := r.checkProtectedTimestampsForGC(ctx, makeTTLDuration(10)) + canGC, _, gcTimestamp, oldThreshold, newThreshold, err := r.checkProtectedTimestampsForGC(ctx, makeTTLDuration(10)) + require.NoError(t, err) require.True(t, canGC) require.False(t, newThreshold.Equal(oldThreshold)) require.Equal(t, ts3.Prev().Add(10*time.Second.Nanoseconds(), 0), gcTimestamp) @@ -158,7 +164,8 @@ func TestCheckProtectedTimestampsForGC(t *testing.T) { name: "no protections apply", test: func(t *testing.T, r *Replica, mp *manualPTSReader) { mp.asOf = r.store.Clock().Now().Next() - canGC, _, gcTimestamp, _, _ := r.checkProtectedTimestampsForGC(ctx, makeTTLDuration(10)) + canGC, _, gcTimestamp, _, _, err := r.checkProtectedTimestampsForGC(ctx, makeTTLDuration(10)) + require.NoError(t, err) require.True(t, canGC) require.Equal(t, mp.asOf, gcTimestamp) }, @@ -184,7 +191,8 @@ func TestCheckProtectedTimestampsForGC(t *testing.T) { // We should allow gc to proceed up to the timestamp which precedes the // earliest protected timestamp (t3) that is still valid. This means we // expect a GC timestamp 10 seconds after ts3.Prev() given the policy. - canGC, _, gcTimestamp, oldThreshold, newThreshold := r.checkProtectedTimestampsForGC(ctx, makeTTLDuration(10)) + canGC, _, gcTimestamp, oldThreshold, newThreshold, err := r.checkProtectedTimestampsForGC(ctx, makeTTLDuration(10)) + require.NoError(t, err) require.True(t, canGC) require.False(t, newThreshold.Equal(oldThreshold)) require.Equal(t, ts3.Prev().Add(10*time.Second.Nanoseconds(), 0), gcTimestamp) @@ -217,14 +225,14 @@ type manualPTSReader struct { // GetProtectionTimestamps is part of the spanconfig.ProtectedTSReader // interface. func (mp *manualPTSReader) GetProtectionTimestamps( - _ context.Context, sp roachpb.Span, -) (protectionTimestamps []hlc.Timestamp, asOf hlc.Timestamp) { + ctx context.Context, sp roachpb.Span, +) (protectionTimestamps []hlc.Timestamp, asOf hlc.Timestamp, err error) { for _, protection := range mp.protections { if protection.sp.Overlaps(sp) { protectionTimestamps = append(protectionTimestamps, protection.protectionTimestamps...) } } - return protectionTimestamps, mp.asOf + return protectionTimestamps, mp.asOf, nil } // shuffleAllProtectionTimestamps shuffles protection timestamps associated with diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 0a53422fce99..4c8b46df94e3 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -103,7 +103,6 @@ go_library( "//pkg/kv/kvserver/loqrecovery", "//pkg/kv/kvserver/loqrecovery/loqrecoverypb", "//pkg/kv/kvserver/protectedts", - "//pkg/kv/kvserver/protectedts/ptcache", "//pkg/kv/kvserver/protectedts/ptpb", "//pkg/kv/kvserver/protectedts/ptprovider", "//pkg/kv/kvserver/protectedts/ptreconcile", diff --git a/pkg/server/server.go b/pkg/server/server.go index ed7629ab468a..859173f37939 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -37,7 +37,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptcache" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptprovider" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptreconcile" serverrangefeed "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed" @@ -135,7 +134,7 @@ type Server struct { replicationReporter *reports.Reporter protectedtsProvider protectedts.Provider - spanConfigSubscriber *spanconfigkvsubscriber.KVSubscriber + spanConfigSubscriber spanconfig.KVSubscriber sqlServer *SQLServer @@ -513,55 +512,21 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { systemConfigWatcher := systemconfigwatcher.New( keys.SystemSQLCodec, clock, rangeFeedFactory, &cfg.DefaultZoneConfig, ) - protectedTSReader := spanconfigptsreader.NewAdapter(protectedtsProvider.(*ptprovider.Provider).Cache.(*ptcache.Cache)) - - storeCfg := kvserver.StoreConfig{ - DefaultSpanConfig: cfg.DefaultZoneConfig.AsSpanConfig(), - Settings: st, - AmbientCtx: cfg.AmbientCtx, - RaftConfig: cfg.RaftConfig, - Clock: clock, - DB: db, - Gossip: g, - NodeLiveness: nodeLiveness, - Transport: raftTransport, - NodeDialer: nodeDialer, - RPCContext: rpcContext, - ScanInterval: cfg.ScanInterval, - ScanMinIdleTime: cfg.ScanMinIdleTime, - ScanMaxIdleTime: cfg.ScanMaxIdleTime, - HistogramWindowInterval: cfg.HistogramWindowInterval(), - StorePool: storePool, - SQLExecutor: internalExecutor, - LogRangeEvents: cfg.EventLogEnabled, - RangeDescriptorCache: distSender.RangeDescriptorCache(), - TimeSeriesDataStore: tsDB, - ClosedTimestampSender: ctSender, - ClosedTimestampReceiver: ctReceiver, - ExternalStorage: externalStorage, - ExternalStorageFromURI: externalStorageFromURI, - ProtectedTimestampReader: protectedTSReader, - KVMemoryMonitor: kvMemoryMonitor, - RangefeedBudgetFactory: rangeReedBudgetFactory, - SystemConfigProvider: systemConfigWatcher, - } var spanConfig struct { // kvAccessor powers the span configuration RPCs and the host tenant's // reconciliation job. kvAccessor spanconfig.KVAccessor - // subscriber is used by stores to subsribe to span configuration - // updates. - subscriber *spanconfigkvsubscriber.KVSubscriber + // subscriber is used by stores to subscribe to span configuration updates. + subscriber spanconfig.KVSubscriber // kvAccessorForTenantRecords is when creating/destroying secondary // tenant records. kvAccessorForTenantRecords spanconfig.KVAccessor } - storeCfg.SpanConfigsDisabled = cfg.SpanConfigsDisabled if !cfg.SpanConfigsDisabled { spanConfigKnobs, _ := cfg.TestingKnobs.SpanConfig.(*spanconfig.TestingKnobs) if spanConfigKnobs != nil && spanConfigKnobs.StoreKVSubscriberOverride != nil { - storeCfg.SpanConfigSubscriber = spanConfigKnobs.StoreKVSubscriberOverride + spanConfig.subscriber = spanConfigKnobs.StoreKVSubscriberOverride } else { // We use the span configs infra to control whether rangefeeds are // enabled on a given range. At the moment this only applies to @@ -578,7 +543,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { // For system table ranges we install configs that allow for // rangefeeds. Until then, we simply allow rangefeeds when a more // targeted config is not found. - fallbackConf := storeCfg.DefaultSpanConfig + fallbackConf := cfg.DefaultZoneConfig.AsSpanConfig() fallbackConf.RangefeedEnabled = true // We do the same for opting out of strict GC enforcement; it // really only applies to user table ranges @@ -592,7 +557,6 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { fallbackConf, spanConfigKnobs, ) - storeCfg.SpanConfigSubscriber = spanConfig.subscriber } scKVAccessor := spanconfigkvaccessor.New( @@ -609,6 +573,50 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { // Use a no-op accessor where tenant records are created/destroyed. spanConfig.kvAccessorForTenantRecords = spanconfigkvaccessor.NoopKVAccessor + + spanConfig.subscriber = spanconfigkvsubscriber.NewNoopSubscriber(clock) + } + + var protectedTSReader spanconfig.ProtectedTSReader + if cfg.TestingKnobs.SpanConfig != nil && + cfg.TestingKnobs.SpanConfig.(*spanconfig.TestingKnobs).ProtectedTSReaderOverrideFn != nil { + fn := cfg.TestingKnobs.SpanConfig.(*spanconfig.TestingKnobs).ProtectedTSReaderOverrideFn + protectedTSReader = fn(clock) + } else { + protectedTSReader = spanconfigptsreader.NewAdapter(protectedtsProvider.(*ptprovider.Provider).Cache, spanConfig.subscriber) + } + + storeCfg := kvserver.StoreConfig{ + DefaultSpanConfig: cfg.DefaultZoneConfig.AsSpanConfig(), + Settings: st, + AmbientCtx: cfg.AmbientCtx, + RaftConfig: cfg.RaftConfig, + Clock: clock, + DB: db, + Gossip: g, + NodeLiveness: nodeLiveness, + Transport: raftTransport, + NodeDialer: nodeDialer, + RPCContext: rpcContext, + ScanInterval: cfg.ScanInterval, + ScanMinIdleTime: cfg.ScanMinIdleTime, + ScanMaxIdleTime: cfg.ScanMaxIdleTime, + HistogramWindowInterval: cfg.HistogramWindowInterval(), + StorePool: storePool, + SQLExecutor: internalExecutor, + LogRangeEvents: cfg.EventLogEnabled, + RangeDescriptorCache: distSender.RangeDescriptorCache(), + TimeSeriesDataStore: tsDB, + ClosedTimestampSender: ctSender, + ClosedTimestampReceiver: ctReceiver, + ExternalStorage: externalStorage, + ExternalStorageFromURI: externalStorageFromURI, + ProtectedTimestampReader: protectedTSReader, + KVMemoryMonitor: kvMemoryMonitor, + RangefeedBudgetFactory: rangeReedBudgetFactory, + SystemConfigProvider: systemConfigWatcher, + SpanConfigSubscriber: spanConfig.subscriber, + SpanConfigsDisabled: cfg.SpanConfigsDisabled, } if storeTestingKnobs := cfg.TestingKnobs.Store; storeTestingKnobs != nil { @@ -1418,8 +1426,10 @@ func (s *Server) PreStart(ctx context.Context) error { } if !s.cfg.SpanConfigsDisabled && s.spanConfigSubscriber != nil { - if err := s.spanConfigSubscriber.Start(ctx, s.stopper); err != nil { - return err + if subscriber, ok := s.spanConfigSubscriber.(*spanconfigkvsubscriber.KVSubscriber); ok { + if err := subscriber.Start(ctx, s.stopper); err != nil { + return err + } } } // Start garbage collecting system events. diff --git a/pkg/spanconfig/protectedts_state_reader.go b/pkg/spanconfig/protectedts_state_reader.go index 7749fac58a2a..a57809f75969 100644 --- a/pkg/spanconfig/protectedts_state_reader.go +++ b/pkg/spanconfig/protectedts_state_reader.go @@ -51,19 +51,19 @@ func (p *ProtectedTimestampStateReader) GetProtectionPoliciesForCluster() []roac // TenantProtectedTimestamps represents all the protections that apply to a // tenant's keyspace. type TenantProtectedTimestamps struct { - protections []roachpb.ProtectionPolicy - tenantID roachpb.TenantID + Protections []roachpb.ProtectionPolicy + TenantID roachpb.TenantID } // GetTenantProtections returns the ProtectionPolicies that apply to this tenant. func (t *TenantProtectedTimestamps) GetTenantProtections() []roachpb.ProtectionPolicy { - return t.protections + return t.Protections } // GetTenantID returns the tenant ID of the tenant that the protected timestamp // records target. func (t *TenantProtectedTimestamps) GetTenantID() roachpb.TenantID { - return t.tenantID + return t.TenantID } // GetProtectionPoliciesForTenants returns all the protected timestamps that @@ -77,7 +77,7 @@ func (p *ProtectedTimestampStateReader) GetProtectionPoliciesForTenants() []Tena // apply to a particular tenant's keyspace. func (p *ProtectedTimestampStateReader) ProtectionExistsForTenant(tenantID roachpb.TenantID) bool { for _, tp := range p.tenantProtections { - if tp.tenantID.Equal(tenantID) { + if tp.TenantID.Equal(tenantID) { return true } } @@ -120,6 +120,6 @@ func (p *ProtectedTimestampStateReader) loadProtectedTimestampRecords(ptsState p for tenID, tenantProtections := range tenantProtections { p.tenantProtections = append(p.tenantProtections, - TenantProtectedTimestamps{tenantID: tenID, protections: tenantProtections}) + TenantProtectedTimestamps{TenantID: tenID, Protections: tenantProtections}) } } diff --git a/pkg/spanconfig/protectedts_state_reader_test.go b/pkg/spanconfig/protectedts_state_reader_test.go index 6c0e290764ab..414a6e995e4a 100644 --- a/pkg/spanconfig/protectedts_state_reader_test.go +++ b/pkg/spanconfig/protectedts_state_reader_test.go @@ -8,7 +8,7 @@ // by the Apache License, Version 2.0, included in the file // licenses/APL.txt. -package spanconfig +package spanconfig_test import ( "context" @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobsprotectedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/spanconfig" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -64,24 +65,24 @@ func TestProtectedTimestampStateReader(t *testing.T) { protectTenants(state, ts(5), []roachpb.TenantID{roachpb.MakeTenantID(2)}) protectTenants(state, ts(6), []roachpb.TenantID{roachpb.MakeTenantID(2)}) - ptsStateReader := NewProtectedTimestampStateReader(context.Background(), *state) + ptsStateReader := spanconfig.NewProtectedTimestampStateReader(context.Background(), *state) clusterTimestamps := ptsStateReader.GetProtectionPoliciesForCluster() require.Len(t, clusterTimestamps, 1) require.Equal(t, []roachpb.ProtectionPolicy{{ProtectedTimestamp: ts(3)}}, clusterTimestamps) tenantTimestamps := ptsStateReader.GetProtectionPoliciesForTenants() sort.Slice(tenantTimestamps, func(i, j int) bool { - return tenantTimestamps[i].tenantID.ToUint64() < tenantTimestamps[j].tenantID.ToUint64() + return tenantTimestamps[i].TenantID.ToUint64() < tenantTimestamps[j].TenantID.ToUint64() }) require.Len(t, tenantTimestamps, 2) - require.Equal(t, []TenantProtectedTimestamps{ + require.Equal(t, []spanconfig.TenantProtectedTimestamps{ { - tenantID: roachpb.MakeTenantID(1), - protections: []roachpb.ProtectionPolicy{{ProtectedTimestamp: ts(4)}}, + TenantID: roachpb.MakeTenantID(1), + Protections: []roachpb.ProtectionPolicy{{ProtectedTimestamp: ts(4)}}, }, { - tenantID: roachpb.MakeTenantID(2), - protections: []roachpb.ProtectionPolicy{{ProtectedTimestamp: ts(5)}, + TenantID: roachpb.MakeTenantID(2), + Protections: []roachpb.ProtectionPolicy{{ProtectedTimestamp: ts(5)}, {ProtectedTimestamp: ts(6)}}, }, }, tenantTimestamps) diff --git a/pkg/spanconfig/spanconfig.go b/pkg/spanconfig/spanconfig.go index 4d74301ace69..8484645a0058 100644 --- a/pkg/spanconfig/spanconfig.go +++ b/pkg/spanconfig/spanconfig.go @@ -48,10 +48,12 @@ type KVAccessor interface { WithTxn(context.Context, *kv.Txn) KVAccessor } -// KVSubscriber presents a consistent[1] snapshot of a StoreReader that's -// incrementally maintained with changes made to the global span configurations -// state (system.span_configurations). The maintenance happens transparently; -// callers can subscribe to learn about what key spans may have seen a +// KVSubscriber presents a consistent[1] snapshot of a StoreReader and +// ProtectedTSReader that's incrementally maintained with changes made to the +// global span configurations state (system.span_configurations). The +// maintenance happens transparently. +// +// Callers can subscribe to learn about what key spans may have seen a // configuration change. After learning about a span update through a callback // invocation, subscribers can consult the embedded StoreReader to retrieve an // up-to-date[2] config for the updated span. The callback is called in a single @@ -66,10 +68,11 @@ type KVAccessor interface { // StoreReader for the given span would still retrieve the last config observed // for the span[3]. // -// [1]: The contents of the StoreReader at t1 corresponds exactly to the -// contents of the global span configuration state at t0 where t0 <= t1. If -// the StoreReader is read from at t2 where t2 > t1, it's guaranteed to -// observe a view of the global state at t >= t0. +// [1]: The contents of the StoreReader and ProtectedTSReader at t1 corresponds +// exactly to the contents of the global span configuration state at t0 +// where t0 <= t1. If the StoreReader or ProtectedTSReader is read from at +// t2 where t2 > t1, it's guaranteed to observe a view of the global state +// at t >= t0. // [2]: For the canonical KVSubscriber implementation, this is typically lagging // by the closed timestamp target duration. // [3]: The canonical KVSubscriber implementation is bounced whenever errors @@ -77,6 +80,8 @@ type KVAccessor interface { // (typically through a coarsely targeted [min,max) span). type KVSubscriber interface { StoreReader + ProtectedTSReader + LastUpdated() hlc.Timestamp Subscribe(func(ctx context.Context, updated roachpb.Span)) } @@ -406,7 +411,7 @@ type ProtectedTSReader interface { // part of the given key span. The time at which this protected timestamp // state is valid is returned as well. GetProtectionTimestamps(ctx context.Context, sp roachpb.Span) ( - protectionTimestamps []hlc.Timestamp, asOf hlc.Timestamp, + protectionTimestamps []hlc.Timestamp, asOf hlc.Timestamp, _ error, ) } @@ -421,7 +426,7 @@ type emptyProtectedTSReader hlc.Clock // GetProtectionTimestamps is part of the spanconfig.ProtectedTSReader // interface. func (r *emptyProtectedTSReader) GetProtectionTimestamps( - _ context.Context, _ roachpb.Span, -) ([]hlc.Timestamp, hlc.Timestamp) { - return nil, (*hlc.Clock)(r).Now() + context.Context, roachpb.Span, +) ([]hlc.Timestamp, hlc.Timestamp, error) { + return nil, (*hlc.Clock)(r).Now(), nil } diff --git a/pkg/spanconfig/spanconfigkvsubscriber/BUILD.bazel b/pkg/spanconfig/spanconfigkvsubscriber/BUILD.bazel index dfb7c1bc4ac2..b7a3f44641bb 100644 --- a/pkg/spanconfig/spanconfigkvsubscriber/BUILD.bazel +++ b/pkg/spanconfig/spanconfigkvsubscriber/BUILD.bazel @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "spanconfigkvsubscriber", srcs = [ + "dummy.go", "kvsubscriber.go", "spanconfigdecoder.go", ], diff --git a/pkg/spanconfig/spanconfigkvsubscriber/dummy.go b/pkg/spanconfig/spanconfigkvsubscriber/dummy.go new file mode 100644 index 000000000000..dbeebf81156e --- /dev/null +++ b/pkg/spanconfig/spanconfigkvsubscriber/dummy.go @@ -0,0 +1,69 @@ +// Copyright 2021 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package spanconfigkvsubscriber + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/spanconfig" + "github.com/cockroachdb/cockroach/pkg/util/hlc" +) + +// noopKVSubscriber is a KVSubscriber that no-ops and is always up-to-date. +// Intended for tests that do not make use of the span configurations +// infrastructure. +type noopKVSubscriber struct { + clock *hlc.Clock +} + +var _ spanconfig.KVSubscriber = &noopKVSubscriber{} + +// NewNoopSubscriber returns a new no-op KVSubscriber. +func NewNoopSubscriber(clock *hlc.Clock) spanconfig.KVSubscriber { + return &noopKVSubscriber{ + clock: clock, + } +} + +// Subscribe is part of the spanconfig.KVSubsriber interface. +func (n *noopKVSubscriber) Subscribe(func(context.Context, roachpb.Span)) {} + +// LastUpdated is part of the spanconfig.KVSubscriber interface. +func (n *noopKVSubscriber) LastUpdated() hlc.Timestamp { + return n.clock.Now() +} + +// NeedsSplit is part of the spanconfig.KVSubscriber interface. +func (n *noopKVSubscriber) NeedsSplit(context.Context, roachpb.RKey, roachpb.RKey) bool { + return false +} + +// ComputeSplitKey is part of the spanconfig.KVSubscriber interface. +func (n *noopKVSubscriber) ComputeSplitKey( + context.Context, roachpb.RKey, roachpb.RKey, +) roachpb.RKey { + return roachpb.RKey{} +} + +// GetSpanConfigForKey is part of the spanconfig.KVSubscriber interface. +func (n *noopKVSubscriber) GetSpanConfigForKey( + context.Context, roachpb.RKey, +) (roachpb.SpanConfig, error) { + return roachpb.SpanConfig{}, nil +} + +// GetProtectionTimestamps is part of the spanconfig.KVSubscriber interface. +func (n *noopKVSubscriber) GetProtectionTimestamps( + context.Context, roachpb.Span, +) ([]hlc.Timestamp, hlc.Timestamp, error) { + return nil, n.LastUpdated(), nil +} diff --git a/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go b/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go index 0714c2bdbcec..2c96f2103719 100644 --- a/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go +++ b/pkg/spanconfig/spanconfigkvsubscriber/kvsubscriber.go @@ -90,7 +90,7 @@ type KVSubscriber struct { // populated while the exposed spanconfig.StoreReader appears static. // Once sufficiently caught up, the fresh spanconfig.Store is swapped in // and the old discarded. See type-level comment for more details. - internal spanconfig.Store + internal *spanconfigstore.Store handlers []handler } } @@ -208,6 +208,26 @@ func (s *KVSubscriber) GetSpanConfigForKey( return s.mu.internal.GetSpanConfigForKey(ctx, key) } +// GetProtectionTimestamps is part of the spanconfig.KVSubscriber interface. +func (s *KVSubscriber) GetProtectionTimestamps( + ctx context.Context, sp roachpb.Span, +) (protectionTimestamps []hlc.Timestamp, asOf hlc.Timestamp, _ error) { + s.mu.RLock() + defer s.mu.RUnlock() + + if err := s.mu.internal.ForEachOverlappingSpanConfig(ctx, sp, + func(_ roachpb.Span, config roachpb.SpanConfig) error { + for _, protection := range config.GCPolicy.ProtectionPolicies { + protectionTimestamps = append(protectionTimestamps, protection.ProtectedTimestamp) + } + return nil + }); err != nil { + return nil, hlc.Timestamp{}, err + } + + return protectionTimestamps, s.mu.lastUpdated, nil +} + func (s *KVSubscriber) handleUpdate(ctx context.Context, u rangefeedcache.Update) { switch u.Type { case rangefeedcache.CompleteUpdate: diff --git a/pkg/spanconfig/spanconfigptsreader/BUILD.bazel b/pkg/spanconfig/spanconfigptsreader/BUILD.bazel index 5f3af5de4f52..c211793c9f7b 100644 --- a/pkg/spanconfig/spanconfigptsreader/BUILD.bazel +++ b/pkg/spanconfig/spanconfigptsreader/BUILD.bazel @@ -1,4 +1,4 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "spanconfigptsreader", @@ -6,10 +6,28 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigptsreader", visibility = ["//visibility:public"], deps = [ - "//pkg/kv/kvserver/protectedts/ptcache", + "//pkg/kv/kvserver/protectedts", "//pkg/roachpb", "//pkg/spanconfig", + "//pkg/testutils", "//pkg/util/hlc", "@com_github_cockroachdb_errors//:errors", ], ) + +go_test( + name = "spanconfigptsreader_test", + srcs = ["adapter_test.go"], + embed = [":spanconfigptsreader"], + deps = [ + "//pkg/keys", + "//pkg/kv/kvserver/protectedts", + "//pkg/roachpb", + "//pkg/spanconfig", + "//pkg/util/hlc", + "//pkg/util/leaktest", + "//pkg/util/stop", + "//pkg/util/uuid", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/spanconfig/spanconfigptsreader/adapter.go b/pkg/spanconfig/spanconfigptsreader/adapter.go index 7ebf4f66a1f1..b6a853cb5566 100644 --- a/pkg/spanconfig/spanconfigptsreader/adapter.go +++ b/pkg/spanconfig/spanconfigptsreader/adapter.go @@ -12,10 +12,12 @@ package spanconfigptsreader import ( "context" + "testing" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptcache" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/spanconfig" + "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/errors" ) @@ -36,21 +38,20 @@ import ( // // TODO(arul): In 22.2, we would have completely migrated away from the old // subsystem, and we'd be able to get rid of this interface. -// -// TODO(arul): Add the KVSubscriber here as well and actually encapsulate PTS -// information from both these sources as described above; This will happen once -// we make the KVSubscriber implement the spanconfig.ProtectedTSReader -// interface. type adapter struct { - cache *ptcache.Cache + cache protectedts.Cache + kvSubscriber spanconfig.KVSubscriber } var _ spanconfig.ProtectedTSReader = &adapter{} // NewAdapter returns an adapter that implements spanconfig.ProtectedTSReader. -func NewAdapter(cache *ptcache.Cache) spanconfig.ProtectedTSReader { +func NewAdapter( + cache protectedts.Cache, kvSubscriber spanconfig.KVSubscriber, +) spanconfig.ProtectedTSReader { return &adapter{ - cache: cache, + cache: cache, + kvSubscriber: kvSubscriber, } } @@ -58,21 +59,43 @@ func NewAdapter(cache *ptcache.Cache) spanconfig.ProtectedTSReader { // interface. func (a *adapter) GetProtectionTimestamps( ctx context.Context, sp roachpb.Span, -) (protectionTimestamps []hlc.Timestamp, asOf hlc.Timestamp) { - return a.cache.GetProtectionTimestamps(ctx, sp) +) (protectionTimestamps []hlc.Timestamp, asOf hlc.Timestamp, err error) { + cacheTimestamps, cacheFreshness, err := a.cache.GetProtectionTimestamps(ctx, sp) + if err != nil { + return nil, hlc.Timestamp{}, err + } + subscriberTimestamps, subscriberFreshness, err := a.kvSubscriber.GetProtectionTimestamps(ctx, sp) + if err != nil { + return nil, hlc.Timestamp{}, err + } + + // The freshness of the adapter is the minimum freshness of the Cache and + // KVSubscriber. + subscriberFreshness.Backward(cacheFreshness) + return append(subscriberTimestamps, cacheTimestamps...), subscriberFreshness, nil } // TestingRefreshPTSState refreshes the in-memory protected timestamp state to // at least asOf. -// TODO(arul): Once we wrap the KVSubscriber in this adapter interface, we'll -// need to ensure that the subscriber is at-least as caught up as the supplied -// asOf timestamp as well. func TestingRefreshPTSState( - ctx context.Context, protectedTSReader spanconfig.ProtectedTSReader, asOf hlc.Timestamp, + ctx context.Context, + t *testing.T, + protectedTSReader spanconfig.ProtectedTSReader, + asOf hlc.Timestamp, ) error { a, ok := protectedTSReader.(*adapter) if !ok { - return errors.AssertionFailedf("could not convert protectedts.Provider to ptprovider.Provider") + return errors.AssertionFailedf("could not convert protectedTSReader to adapter") } + testutils.SucceedsSoon(t, func() error { + _, fresh, err := a.GetProtectionTimestamps(ctx, roachpb.Span{}) + if err != nil { + return err + } + if fresh.Less(asOf) { + return errors.AssertionFailedf("KVSubscriber fresh as of %s; not caught up to %s", fresh, asOf) + } + return nil + }) return a.cache.Refresh(ctx, asOf) } diff --git a/pkg/spanconfig/spanconfigptsreader/adapter_test.go b/pkg/spanconfig/spanconfigptsreader/adapter_test.go new file mode 100644 index 000000000000..ff008f0fb622 --- /dev/null +++ b/pkg/spanconfig/spanconfigptsreader/adapter_test.go @@ -0,0 +1,167 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package spanconfigptsreader + +import ( + "context" + "sort" + "testing" + + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/spanconfig" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/stretchr/testify/require" +) + +// TestAdapter ensures that the spanconfigptsreader.adapter correctly returns +// protected timestamp information +func TestAdapter(t *testing.T) { + defer leaktest.AfterTest(t)() + ts := func(nanos int) hlc.Timestamp { + return hlc.Timestamp{ + WallTime: int64(nanos), + } + } + validateProtectedTimestamps := func(expected, actual []hlc.Timestamp) { + sort.Slice(actual, func(i, j int) bool { + return actual[i].Less(actual[j]) + }) + require.Equal(t, expected, actual) + } + + mc := &manualCache{} + ms := &manualSubscriber{} + + adapter := NewAdapter(mc, ms) + ctx := context.Background() + + // Setup with an empty subscriber and cache; ensure no records are returned + // and the freshness timestamp is the minimum of the two. + mc.asOf = ts(10) + ms.updatedTS = ts(14) + timestamps, asOf, err := adapter.GetProtectionTimestamps(ctx, keys.EverythingSpan) + require.NoError(t, err) + require.Empty(t, timestamps) + require.Equal(t, ts(10), asOf) + + // Forward the freshness of the cache past the subscriber's. + mc.asOf = ts(18) + timestamps, asOf, err = adapter.GetProtectionTimestamps(ctx, keys.EverythingSpan) + require.NoError(t, err) + require.Empty(t, timestamps) + require.Equal(t, ts(14), asOf) + + // Add some records to the cache; ensure they're returned. + mc.protectedTimestamps = append(mc.protectedTimestamps, ts(6), ts(10)) + mc.asOf = ts(20) + + timestamps, asOf, err = adapter.GetProtectionTimestamps(ctx, keys.EverythingSpan) + require.NoError(t, err) + require.Equal(t, ts(14), asOf) + validateProtectedTimestamps([]hlc.Timestamp{ts(6), ts(10)}, timestamps) + + // Add some records to the subscriber, ensure they're returned as well. + ms.protectedTimestamps = append(ms.protectedTimestamps, ts(7), ts(12)) + ms.updatedTS = ts(19) + timestamps, asOf, err = adapter.GetProtectionTimestamps(ctx, keys.EverythingSpan) + require.NoError(t, err) + require.Equal(t, ts(19), asOf) + validateProtectedTimestamps([]hlc.Timestamp{ts(6), ts(7), ts(10), ts(12)}, timestamps) + + // Clear out records from the cache, bump its freshness. + mc.protectedTimestamps = nil + mc.asOf = ts(22) + timestamps, asOf, err = adapter.GetProtectionTimestamps(ctx, keys.EverythingSpan) + require.NoError(t, err) + require.Equal(t, ts(19), asOf) + validateProtectedTimestamps([]hlc.Timestamp{ts(7), ts(12)}, timestamps) + + // Clear out records from the subscriber, bump its freshness. + ms.protectedTimestamps = nil + ms.updatedTS = ts(25) + timestamps, asOf, err = adapter.GetProtectionTimestamps(ctx, keys.EverythingSpan) + require.NoError(t, err) + require.Equal(t, ts(22), asOf) + require.Empty(t, timestamps) +} + +type manualSubscriber struct { + protectedTimestamps []hlc.Timestamp + updatedTS hlc.Timestamp +} + +var _ spanconfig.KVSubscriber = &manualSubscriber{} + +func (m *manualSubscriber) GetProtectionTimestamps( + context.Context, roachpb.Span, +) ([]hlc.Timestamp, hlc.Timestamp, error) { + return m.protectedTimestamps, m.updatedTS, nil +} + +func (m *manualSubscriber) Start(context.Context, *stop.Stopper) error { + panic("unimplemented") +} + +func (m *manualSubscriber) NeedsSplit(ctx context.Context, start, end roachpb.RKey) bool { + panic("unimplemented") +} + +func (m *manualSubscriber) ComputeSplitKey( + context.Context, roachpb.RKey, roachpb.RKey, +) roachpb.RKey { + panic("unimplemented") +} + +func (m *manualSubscriber) GetSpanConfigForKey( + context.Context, roachpb.RKey, +) (roachpb.SpanConfig, error) { + panic("unimplemented") +} + +func (m *manualSubscriber) LastUpdated() hlc.Timestamp { + return m.updatedTS +} + +func (m *manualSubscriber) Subscribe(callback func(context.Context, roachpb.Span)) { + panic("unimplemented") +} + +type manualCache struct { + asOf hlc.Timestamp + protectedTimestamps []hlc.Timestamp +} + +var _ protectedts.Cache = (*manualCache)(nil) + +func (c *manualCache) GetProtectionTimestamps( + context.Context, roachpb.Span, +) (protectionTimestamps []hlc.Timestamp, asOf hlc.Timestamp, err error) { + return c.protectedTimestamps, c.asOf, nil +} + +func (c *manualCache) Iterate( + _ context.Context, start, end roachpb.Key, it protectedts.Iterator, +) hlc.Timestamp { + panic("unimplemented") +} + +func (c *manualCache) Refresh(ctx context.Context, asOf hlc.Timestamp) error { + panic("unimplemented") +} + +func (c *manualCache) QueryRecord(context.Context, uuid.UUID) (exists bool, asOf hlc.Timestamp) { + panic("unimplemented") +} diff --git a/pkg/spanconfig/spanconfigstore/store.go b/pkg/spanconfig/spanconfigstore/store.go index a83c9f5b4903..ab633716562b 100644 --- a/pkg/spanconfig/spanconfigstore/store.go +++ b/pkg/spanconfig/spanconfigstore/store.go @@ -86,6 +86,14 @@ func (s *Store) GetSpanConfigForKey( s.mu.RLock() defer s.mu.RUnlock() + return s.getSpanConfigForKeyRLocked(ctx, key) +} + +// getSpanConfigForKeyRLocked is like GetSpanConfigForKey but requires the +// caller to hold the Store read lock. +func (s *Store) getSpanConfigForKeyRLocked( + ctx context.Context, key roachpb.RKey, +) (roachpb.SpanConfig, error) { conf, found, err := s.mu.spanConfigStore.getSpanConfigForKey(ctx, key) if err != nil { return roachpb.SpanConfig{}, err @@ -107,6 +115,23 @@ func (s *Store) Apply( return deleted, added } +// ForEachOverlappingSpanConfig invokes the supplied callback on each +// span config that overlaps with the supplied span. In addition to the +// SpanConfig, the s pan it applies over is passed into the callback as well. +func (s *Store) ForEachOverlappingSpanConfig( + ctx context.Context, span roachpb.Span, f func(roachpb.Span, roachpb.SpanConfig) error, +) error { + s.mu.RLock() + defer s.mu.RUnlock() + return s.mu.spanConfigStore.forEachOverlapping(span, func(entry spanConfigEntry) error { + config, err := s.getSpanConfigForKeyRLocked(ctx, roachpb.RKey(entry.span.Key)) + if err != nil { + return err + } + return f(entry.span, config) + }) +} + // Copy returns a copy of the Store. func (s *Store) Copy(ctx context.Context) *Store { s.mu.Lock() diff --git a/pkg/spanconfig/spanconfigstore/store_test.go b/pkg/spanconfig/spanconfigstore/store_test.go index defcf67596fa..62c751be37bb 100644 --- a/pkg/spanconfig/spanconfigstore/store_test.go +++ b/pkg/spanconfig/spanconfigstore/store_test.go @@ -33,16 +33,6 @@ func (s *Store) TestingApplyInternal( return s.applyInternal(dryrun, updates...) } -// TestingSpanConfigStoreForEachOverlapping exports an internal method on the -// spanConfigStore for testing purposes. -func (s *Store) TestingSpanConfigStoreForEachOverlapping( - span roachpb.Span, f func(spanConfigEntry) error, -) error { - s.mu.RLock() - defer s.mu.RUnlock() - return s.mu.spanConfigStore.forEachOverlapping(span, f) -} - // TestDataDriven runs datadriven tests against the Store interface. // The syntax is as follows: // @@ -136,12 +126,12 @@ func TestDataDriven(t *testing.T) { span := spanconfigtestutils.ParseSpan(t, spanStr) var results []string - _ = store.TestingSpanConfigStoreForEachOverlapping(span, - func(entry spanConfigEntry) error { + _ = store.ForEachOverlappingSpanConfig(ctx, span, + func(sp roachpb.Span, conf roachpb.SpanConfig) error { results = append(results, spanconfigtestutils.PrintSpanConfigRecord(t, spanconfig.Record{ - Target: spanconfig.MakeTargetFromSpan(entry.span), - Config: entry.config, + Target: spanconfig.MakeTargetFromSpan(sp), + Config: conf, }), ) return nil diff --git a/pkg/spanconfig/spanconfigstore/testdata/batched/system_span_configs_overlap b/pkg/spanconfig/spanconfigstore/testdata/batched/system_span_configs_overlap new file mode 100644 index 000000000000..24e0e8c0e807 --- /dev/null +++ b/pkg/spanconfig/spanconfigstore/testdata/batched/system_span_configs_overlap @@ -0,0 +1,49 @@ +apply +set [b,d):A +set {entire-keyspace}:X +set [e,g):B +---- +added {entire-keyspace}:X +added [b,d):A +added [e,g):B + +# Span encompassing all spans we have inserted. +overlapping span=[a,j) +---- +[b,d):A+X +[e,g):B+X + +apply +set [g,i):C +---- +added [g,i):C + +# Span encompassing all spans we have inserted. +overlapping span=[a,i) +---- +[b,d):A+X +[e,g):B+X +[g,i):C+X + +# Span straddling first and second span. +overlapping span=[b,e) +---- +[b,d):A+X + +# Straddling all 3 spans. +overlapping span=[c,h) +---- +[b,d):A+X +[e,g):B+X +[g,i):C+X + +apply +set {source=1,target=1}:Y +---- +added {source=1,target=1}:Y + +overlapping span=[a,i) +---- +[b,d):A+X+Y +[e,g):B+X+Y +[g,i):C+X+Y diff --git a/pkg/spanconfig/testing_knobs.go b/pkg/spanconfig/testing_knobs.go index 4dfacb96595a..0c755812e5f5 100644 --- a/pkg/spanconfig/testing_knobs.go +++ b/pkg/spanconfig/testing_knobs.go @@ -14,6 +14,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/util/hlc" ) // TestingKnobs provide fine-grained control over the various span config @@ -69,6 +70,10 @@ type TestingKnobs struct { // ReconcilerInitialInterceptor, if set, is invoked at the very outset of // the reconciliation process. ReconcilerInitialInterceptor func() + + // ProtectedTSReaderOverrideFn returns a ProtectedTSReader which is used to + // override the ProtectedTSReader used when setting up a new store. + ProtectedTSReaderOverrideFn func(clock *hlc.Clock) ProtectedTSReader } // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.