Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

spanconfig: implement the ProtectedTSReader interface on the KVSubscriber #77338

Merged
merged 1 commit into from
Mar 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ ALL_TESTS = [
"//pkg/spanconfig/spanconfigkvaccessor:spanconfigkvaccessor_test",
"//pkg/spanconfig/spanconfigkvsubscriber:spanconfigkvsubscriber_test",
"//pkg/spanconfig/spanconfigmanager:spanconfigmanager_test",
"//pkg/spanconfig/spanconfigptsreader:spanconfigptsreader_test",
"//pkg/spanconfig/spanconfigreconciler:spanconfigreconciler_test",
"//pkg/spanconfig/spanconfigsqltranslator:spanconfigsqltranslator_test",
"//pkg/spanconfig/spanconfigsqlwatcher:spanconfigsqlwatcher_test",
Expand Down
20 changes: 15 additions & 5 deletions pkg/kv/kvserver/client_protectedts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,9 @@ func TestProtectedTimestamps(t *testing.T) {
ptsReader := tc.GetFirstStoreFromServer(t, 0).GetStoreConfig().ProtectedTimestampReader
require.NoError(
t,
verifyProtectionTimestampExistsOnSpans(ctx, tc, ptsReader, ptsRec.Timestamp, ptsRec.DeprecatedSpans),
verifyProtectionTimestampExistsOnSpans(
ctx, t, tc, ptsReader, ptsRec.Timestamp, ptsRec.DeprecatedSpans,
),
)

// Make a new record that is doomed to fail.
Expand All @@ -233,7 +235,9 @@ func TestProtectedTimestamps(t *testing.T) {
// does not affect the ability to GC.
require.NoError(
t,
verifyProtectionTimestampExistsOnSpans(ctx, tc, ptsReader, failedRec.Timestamp, failedRec.DeprecatedSpans),
verifyProtectionTimestampExistsOnSpans(
ctx, t, tc, ptsReader, failedRec.Timestamp, failedRec.DeprecatedSpans,
),
)

// Add a new record that is after the old record.
Expand All @@ -244,7 +248,9 @@ func TestProtectedTimestamps(t *testing.T) {
require.NoError(t, ptsWithDB.Protect(ctx, nil /* txn */, &laterRec))
require.NoError(
t,
verifyProtectionTimestampExistsOnSpans(ctx, tc, ptsReader, laterRec.Timestamp, laterRec.DeprecatedSpans),
verifyProtectionTimestampExistsOnSpans(
ctx, t, tc, ptsReader, laterRec.Timestamp, laterRec.DeprecatedSpans,
),
)

// Release the record that had succeeded and ensure that GC eventually
Expand Down Expand Up @@ -278,18 +284,22 @@ func TestProtectedTimestamps(t *testing.T) {
// supplied spans.
func verifyProtectionTimestampExistsOnSpans(
ctx context.Context,
t *testing.T,
tc *testcluster.TestCluster,
ptsReader spanconfig.ProtectedTSReader,
protectionTimestamp hlc.Timestamp,
spans roachpb.Spans,
) error {
if err := spanconfigptsreader.TestingRefreshPTSState(
ctx, ptsReader, tc.Server(0).Clock().Now(),
ctx, t, ptsReader, tc.Server(0).Clock().Now(),
); err != nil {
return err
}
for _, sp := range spans {
timestamps, _ := ptsReader.GetProtectionTimestamps(ctx, sp)
timestamps, _, err := ptsReader.GetProtectionTimestamps(ctx, sp)
if err != nil {
return err
}
found := false
for _, ts := range timestamps {
if ts.Equal(protectionTimestamp) {
Expand Down
13 changes: 9 additions & 4 deletions pkg/kv/kvserver/client_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3519,7 +3519,10 @@ func TestStrictGCEnforcement(t *testing.T) {
testutils.SucceedsSoon(t, func() error {
for i := 0; i < tc.NumServers(); i++ {
ptsReader := tc.GetFirstStoreFromServer(t, 0).GetStoreConfig().ProtectedTimestampReader
_, asOf := ptsReader.GetProtectionTimestamps(ctx, tableSpan)
_, asOf, err := ptsReader.GetProtectionTimestamps(ctx, tableSpan)
if err != nil {
return err
}
if asOf.Less(min) {
return errors.Errorf("not yet read")
}
Expand Down Expand Up @@ -3577,9 +3580,10 @@ func TestStrictGCEnforcement(t *testing.T) {
l, _ := r.GetLease()
require.NoError(
t,
spanconfigptsreader.TestingRefreshPTSState(ctx, ptsReader, l.Start.ToTimestamp().Next()),
spanconfigptsreader.TestingRefreshPTSState(ctx, t, ptsReader, l.Start.ToTimestamp().Next()),
)
r.ReadProtectedTimestamps(ctx)
err := r.ReadProtectedTimestamps(ctx)
require.NoError(t, err)
}
}
refreshCacheAndUpdatePTSState = func(t *testing.T, nodeID roachpb.NodeID) {
Expand All @@ -3590,7 +3594,8 @@ func TestStrictGCEnforcement(t *testing.T) {
ptp := tc.Server(i).ExecutorConfig().(sql.ExecutorConfig).ProtectedTimestampProvider
require.NoError(t, ptp.Refresh(ctx, tc.Server(i).Clock().Now()))
_, r := getFirstStoreReplica(t, tc.Server(i), tableKey)
r.ReadProtectedTimestamps(ctx)
err := r.ReadProtectedTimestamps(ctx)
require.NoError(t, err)
}
}
)
Expand Down
7 changes: 7 additions & 0 deletions pkg/kv/kvserver/client_spanconfigs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func TestSpanConfigUpdateAppliedToReplica(t *testing.T) {
Store: &kvserver.StoreTestingKnobs{
DisableMergeQueue: true,
DisableSplitQueue: true,
DisableGCQueue: true,
},
SpanConfig: &spanconfig.TestingKnobs{
StoreKVSubscriberOverride: mockSubscriber,
Expand Down Expand Up @@ -118,6 +119,12 @@ func (m *mockSpanConfigSubscriber) GetSpanConfigForKey(
return m.Store.GetSpanConfigForKey(ctx, key)
}

func (m *mockSpanConfigSubscriber) GetProtectionTimestamps(
context.Context, roachpb.Span,
) ([]hlc.Timestamp, hlc.Timestamp, error) {
panic("unimplemented")
}

func (m *mockSpanConfigSubscriber) LastUpdated() hlc.Timestamp {
panic("unimplemented")
}
Expand Down
14 changes: 5 additions & 9 deletions pkg/kv/kvserver/client_split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigptsreader"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
"github.com/cockroachdb/cockroach/pkg/storage"
Expand Down Expand Up @@ -2385,6 +2385,9 @@ func TestUnsplittableRange(t *testing.T) {
DefaultZoneConfigOverride: &zoneConfig,
DefaultSystemZoneConfigOverride: &zoneSystemConfig,
},
SpanConfig: &spanconfig.TestingKnobs{
ProtectedTSReaderOverrideFn: spanconfig.EmptyProtectedTSReader,
},
},
})
s := serv.(*server.TestServer)
Expand Down Expand Up @@ -2430,14 +2433,7 @@ func TestUnsplittableRange(t *testing.T) {
manualClock.Increment(10 * ttl.Nanoseconds())
// Trigger the MVCC GC queue, which should clean up the earlier version of the
// row. Once the first version of the row is cleaned up, the range should
// exit the split queue purgatory. We need to tickle the protected timestamp
// subsystem to release a timestamp at which we get to actually remove the data.
require.NoError(
t,
spanconfigptsreader.TestingRefreshPTSState(
ctx, store.GetStoreConfig().ProtectedTimestampReader, s.Clock().Now(),
),
)
// exit the split queue purgatory.
repl := store.LookupReplica(tableKey)
if err := store.ManualMVCCGC(repl); err != nil {
t.Fatal(err)
Expand Down
6 changes: 4 additions & 2 deletions pkg/kv/kvserver/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,12 +483,14 @@ func (r *Replica) MaybeUnquiesceAndWakeLeader() bool {
return r.maybeUnquiesceAndWakeLeaderLocked()
}

func (r *Replica) ReadProtectedTimestamps(ctx context.Context) {
func (r *Replica) ReadProtectedTimestamps(ctx context.Context) error {
var ts cachedProtectedTimestampState
defer r.maybeUpdateCachedProtectedTS(&ts)
r.mu.RLock()
defer r.mu.RUnlock()
ts = r.readProtectedTimestampsRLocked(ctx)
var err error
ts, err = r.readProtectedTimestampsRLocked(ctx)
return err
}

// ClosedTimestampPolicy returns the closed timestamp policy of the range, which
Expand Down
11 changes: 9 additions & 2 deletions pkg/kv/kvserver/mvcc_gc_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,11 @@ func (mgcq *mvccGCQueue) shouldQueue(
// Consult the protected timestamp state to determine whether we can GC and
// the timestamp which can be used to calculate the score.
_, conf := repl.DescAndSpanConfig()
canGC, _, gcTimestamp, oldThreshold, newThreshold := repl.checkProtectedTimestampsForGC(ctx, conf.TTL())
canGC, _, gcTimestamp, oldThreshold, newThreshold, err := repl.checkProtectedTimestampsForGC(ctx, conf.TTL())
if err != nil {
log.VErrEventf(ctx, 2, "failed to check protected timestamp for gc: %v", err)
return false, 0
}
if !canGC {
return false, 0
}
Expand Down Expand Up @@ -525,7 +529,10 @@ func (mgcq *mvccGCQueue) process(
// Consult the protected timestamp state to determine whether we can GC and
// the timestamp which can be used to calculate the score and updated GC
// threshold.
canGC, cacheTimestamp, gcTimestamp, oldThreshold, newThreshold := repl.checkProtectedTimestampsForGC(ctx, conf.TTL())
canGC, cacheTimestamp, gcTimestamp, oldThreshold, newThreshold, err := repl.checkProtectedTimestampsForGC(ctx, conf.TTL())
if err != nil {
return false, err
}
if !canGC {
return false, nil
}
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/protectedts/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_library(
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/spanconfig",
"//pkg/util/hlc",
"//pkg/util/metric",
"//pkg/util/stop",
Expand Down
8 changes: 8 additions & 0 deletions pkg/kv/kvserver/protectedts/protectedts.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/stop"
Expand Down Expand Up @@ -115,6 +116,7 @@ type Iterator func(*ptpb.Record) (wantMore bool)
// by any Records at a given asOf can move its GC threshold up to that
// timestamp less its GC TTL.
type Cache interface {
spanconfig.ProtectedTSReader

// Iterate examines the records with spans which overlap with [from, to).
// Nil values for from or to are equivalent to Key{}. The order of records
Expand Down Expand Up @@ -178,3 +180,9 @@ func (c *emptyCache) QueryRecord(
func (c *emptyCache) Refresh(_ context.Context, asOf hlc.Timestamp) error {
return nil
}

func (c *emptyCache) GetProtectionTimestamps(
context.Context, roachpb.Span,
) (protectionTimestamps []hlc.Timestamp, asOf hlc.Timestamp, err error) {
return protectionTimestamps, (*hlc.Clock)(c).Now(), nil
}
1 change: 0 additions & 1 deletion pkg/kv/kvserver/protectedts/ptcache/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ go_library(
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/roachpb",
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/stop",
Expand Down
6 changes: 2 additions & 4 deletions pkg/kv/kvserver/protectedts/ptcache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
Expand Down Expand Up @@ -74,7 +73,6 @@ func New(config Config) *Cache {
}

var _ protectedts.Cache = (*Cache)(nil)
var _ spanconfig.ProtectedTSReader = (*Cache)(nil)

// Iterate is part of the protectedts.Cache interface.
func (c *Cache) Iterate(
Expand Down Expand Up @@ -132,15 +130,15 @@ func (c *Cache) Refresh(ctx context.Context, asOf hlc.Timestamp) error {
// interface.
func (c *Cache) GetProtectionTimestamps(
ctx context.Context, sp roachpb.Span,
) (protectionTimestamps []hlc.Timestamp, asOf hlc.Timestamp) {
) (protectionTimestamps []hlc.Timestamp, asOf hlc.Timestamp, err error) {
readAt := c.Iterate(ctx,
sp.Key,
sp.EndKey,
func(rec *ptpb.Record) (wantMore bool) {
protectionTimestamps = append(protectionTimestamps, rec.Timestamp)
return true
})
return protectionTimestamps, readAt
return protectionTimestamps, readAt, nil
}

// Start starts the periodic fetching of the Cache. A Cache must not be used
Expand Down
9 changes: 6 additions & 3 deletions pkg/kv/kvserver/protectedts/ptcache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,8 @@ func TestGetProtectionTimestamps(t *testing.T) {
r3, _ := protect(t, s, p, ts(6), sp42)
require.NoError(t, c.Refresh(ctx, s.Clock().Now()))

protectionTimestamps, _ := c.GetProtectionTimestamps(ctx, sp42)
protectionTimestamps, _, err := c.GetProtectionTimestamps(ctx, sp42)
require.NoError(t, err)
sort.Slice(protectionTimestamps, func(i, j int) bool {
return protectionTimestamps[i].Less(protectionTimestamps[j])
})
Expand All @@ -417,7 +418,8 @@ func TestGetProtectionTimestamps(t *testing.T) {
r1, _ := protect(t, s, p, ts(5), sp43)
r2, _ := protect(t, s, p, ts(10), sp44)
require.NoError(t, c.Refresh(ctx, s.Clock().Now()))
protectionTimestamps, _ := c.GetProtectionTimestamps(ctx, sp42)
protectionTimestamps, _, err := c.GetProtectionTimestamps(ctx, sp42)
require.NoError(t, err)
require.Equal(t, []hlc.Timestamp(nil), protectionTimestamps)
cleanup(r1, r2)
},
Expand All @@ -435,7 +437,8 @@ func TestGetProtectionTimestamps(t *testing.T) {
r6, _ := protect(t, s, p, ts(20), sp44)
require.NoError(t, c.Refresh(ctx, s.Clock().Now()))

protectionTimestamps, _ := c.GetProtectionTimestamps(ctx, sp4243)
protectionTimestamps, _, err := c.GetProtectionTimestamps(ctx, sp4243)
require.NoError(t, err)
sort.Slice(protectionTimestamps, func(i, j int) bool {
return protectionTimestamps[i].Less(protectionTimestamps[j])
})
Expand Down
22 changes: 15 additions & 7 deletions pkg/kv/kvserver/replica_protected_timestamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (r *Replica) maybeUpdateCachedProtectedTS(ts *cachedProtectedTimestampState

func (r *Replica) readProtectedTimestampsRLocked(
ctx context.Context,
) (ts cachedProtectedTimestampState) {
) (ts cachedProtectedTimestampState, _ error) {
desc := r.descRLocked()
gcThreshold := *r.mu.state.GCThreshold

Expand All @@ -75,7 +75,11 @@ func (r *Replica) readProtectedTimestampsRLocked(
EndKey: roachpb.Key(desc.EndKey),
}
var protectionTimestamps []hlc.Timestamp
protectionTimestamps, ts.readAt = r.store.protectedtsReader.GetProtectionTimestamps(ctx, sp)
var err error
protectionTimestamps, ts.readAt, err = r.store.protectedtsReader.GetProtectionTimestamps(ctx, sp)
if err != nil {
return ts, err
}
earliestTS := hlc.Timestamp{}
for _, protectionTimestamp := range protectionTimestamps {
// Check if the timestamp the record was trying to protect is strictly
Expand All @@ -88,7 +92,7 @@ func (r *Replica) readProtectedTimestampsRLocked(
}
}
ts.earliestProtectionTimestamp = earliestTS
return ts
return ts, nil
}

// checkProtectedTimestampsForGC determines whether the Replica can run GC. If
Expand All @@ -104,7 +108,7 @@ func (r *Replica) readProtectedTimestampsRLocked(
// old gc threshold, and the new gc threshold.
func (r *Replica) checkProtectedTimestampsForGC(
ctx context.Context, gcTTL time.Duration,
) (canGC bool, cacheTimestamp, gcTimestamp, oldThreshold, newThreshold hlc.Timestamp) {
) (canGC bool, cacheTimestamp, gcTimestamp, oldThreshold, newThreshold hlc.Timestamp, _ error) {

// We may be reading the protected timestamp cache while we're holding
// the Replica.mu for reading. If we do so and find newer state in the cache
Expand All @@ -123,7 +127,11 @@ func (r *Replica) checkProtectedTimestampsForGC(

// read.earliestRecord is the record with the earliest timestamp which is
// greater than the existing gcThreshold.
read = r.readProtectedTimestampsRLocked(ctx)
var err error
read, err = r.readProtectedTimestampsRLocked(ctx)
if err != nil {
return false, hlc.Timestamp{}, hlc.Timestamp{}, hlc.Timestamp{}, hlc.Timestamp{}, err
}
gcTimestamp = read.readAt
if !read.earliestProtectionTimestamp.IsEmpty() {
// NB: we want to allow GC up to the timestamp preceding the earliest valid
Expand All @@ -137,12 +145,12 @@ func (r *Replica) checkProtectedTimestampsForGC(
if gcTimestamp.Less(lease.Start.ToTimestamp()) {
log.VEventf(ctx, 1, "not gc'ing replica %v due to new lease %v started after %v",
r, lease, gcTimestamp)
return false, hlc.Timestamp{}, hlc.Timestamp{}, hlc.Timestamp{}, hlc.Timestamp{}
return false, hlc.Timestamp{}, hlc.Timestamp{}, hlc.Timestamp{}, hlc.Timestamp{}, nil
}

newThreshold = gc.CalculateThreshold(gcTimestamp, gcTTL)

return true, read.readAt, gcTimestamp, oldThreshold, newThreshold
return true, read.readAt, gcTimestamp, oldThreshold, newThreshold, nil
}

// markPendingGC is called just prior to sending the GC request to increase the
Expand Down
Loading