Skip to content

Commit

Permalink
spanconfig,protectedts: introduce the ProtectedTSReader interface
Browse files Browse the repository at this point in the history
This patch introduces the `spanconfig.ProtectedTSReader` interface.
It is inteded to replace the `protectedts.Cache` as KVs integration
point for querying protectedTS metadata in the v2 of the protectedTS
subsystem.

For now, the `protectedts.Cache` is the only implementor of it. We will
have the `KVSubscriber` implement it as well once we start shipping
protected timestamp information on span configurations.

The `ProtectedTSReader` interface is also inteded to serve as an adapter
interface between v1 and v2 of the PTS subsystem. In particular, for
v21.2, we will consult both the `protectedts.Cache` and `KVSubscriber`
for PTS information. The logic here will be gated behind a call to
`GetProtectionTimetamps`, which is the only method this interface
provides.

Release note: None

Release justification: low risk, high benefit changes to existing
functionality.
  • Loading branch information
arulajmani committed Mar 1, 2022
1 parent 93b815f commit 365521a
Show file tree
Hide file tree
Showing 18 changed files with 486 additions and 203 deletions.
4 changes: 1 addition & 3 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,6 @@ go_library(
"//pkg/kv/kvserver/kvserverpb",
"//pkg/kv/kvserver/liveness",
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/kv/kvserver/protectedts",
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/kv/kvserver/raftentry",
"//pkg/kv/kvserver/rangefeed",
"//pkg/kv/kvserver/rditer",
Expand Down Expand Up @@ -336,7 +334,6 @@ go_test(
"//pkg/kv/kvserver/kvserverpb",
"//pkg/kv/kvserver/liveness",
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/kv/kvserver/protectedts",
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/kv/kvserver/protectedts/ptstorage",
"//pkg/kv/kvserver/raftentry",
Expand All @@ -359,6 +356,7 @@ go_test(
"//pkg/server/telemetry",
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/spanconfig/spanconfigptsreader",
"//pkg/spanconfig/spanconfigstore",
"//pkg/sql",
"//pkg/sql/catalog/bootstrap",
Expand Down
29 changes: 18 additions & 11 deletions pkg/kv/kvserver/client_protectedts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptstorage"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigptsreader"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
Expand Down Expand Up @@ -212,10 +213,10 @@ func TestProtectedTimestamps(t *testing.T) {

// Verify that the record did indeed make its way down into KV where the
// replica can read it from.
ptp := tc.Server(0).ExecutorConfig().(sql.ExecutorConfig).ProtectedTimestampProvider
ptsReader := tc.GetFirstStoreFromServer(t, 0).GetStoreConfig().ProtectedTimestampReader
require.NoError(
t,
verifyProtectionTimestampExistsOnSpans(ctx, tc, ptp, ptsRec.Timestamp, ptsRec.DeprecatedSpans),
verifyProtectionTimestampExistsOnSpans(ctx, tc, ptsReader, ptsRec.Timestamp, ptsRec.DeprecatedSpans),
)

// Make a new record that is doomed to fail.
Expand All @@ -227,11 +228,12 @@ func TestProtectedTimestamps(t *testing.T) {
_, err = ptsWithDB.GetRecord(ctx, nil /* txn */, failedRec.ID.GetUUID())
require.NoError(t, err)

// Verify that the record did indeed make its way down into KV where the
// replica can read it from. We then verify (below) that the failed record
// does not affect the ability to GC.
require.NoError(
t,
verifyProtectionTimestampExistsOnSpans(ctx, tc, ptp, failedRec.Timestamp, failedRec.DeprecatedSpans),
verifyProtectionTimestampExistsOnSpans(ctx, tc, ptsReader, failedRec.Timestamp, failedRec.DeprecatedSpans),
)

// Add a new record that is after the old record.
Expand All @@ -242,7 +244,7 @@ func TestProtectedTimestamps(t *testing.T) {
require.NoError(t, ptsWithDB.Protect(ctx, nil /* txn */, &laterRec))
require.NoError(
t,
verifyProtectionTimestampExistsOnSpans(ctx, tc, ptp, laterRec.Timestamp, laterRec.DeprecatedSpans),
verifyProtectionTimestampExistsOnSpans(ctx, tc, ptsReader, laterRec.Timestamp, laterRec.DeprecatedSpans),
)

// Release the record that had succeeded and ensure that GC eventually
Expand Down Expand Up @@ -271,27 +273,32 @@ func TestProtectedTimestamps(t *testing.T) {
require.Equal(t, int(state.NumRecords), len(state.Records))
}

// verifyProtectionTimestampExistsOnSpans refreshes the PTS state in KV and
// ensures a protection at the given protectionTimestamp exists for all the
// supplied spans.
func verifyProtectionTimestampExistsOnSpans(
ctx context.Context,
tc *testcluster.TestCluster,
provider protectedts.Provider,
pts hlc.Timestamp,
ptsReader spanconfig.ProtectedTSReader,
protectionTimestamp hlc.Timestamp,
spans roachpb.Spans,
) error {
if err := provider.Refresh(ctx, tc.Server(0).Clock().Now()); err != nil {
if err := spanconfigptsreader.TestingRefreshPTSState(
ctx, ptsReader, tc.Server(0).Clock().Now(),
); err != nil {
return err
}
for _, sp := range spans {
timestamps, _ := provider.GetProtectionTimestamps(ctx, sp)
timestamps, _ := ptsReader.GetProtectionTimestamps(ctx, sp)
found := false
for _, ts := range timestamps {
if ts.Equal(pts) {
if ts.Equal(protectionTimestamp) {
found = true
break
}
}
if !found {
return errors.Newf("protection timestamp %s does not exist on span %s", pts, sp)
return errors.Newf("protection timestamp %s does not exist on span %s", protectionTimestamp, sp)
}
}
return nil
Expand Down
15 changes: 9 additions & 6 deletions pkg/kv/kvserver/client_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigptsreader"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap"
"github.com/cockroachdb/cockroach/pkg/storage"
Expand Down Expand Up @@ -3517,10 +3518,9 @@ func TestStrictGCEnforcement(t *testing.T) {
t.Helper()
testutils.SucceedsSoon(t, func() error {
for i := 0; i < tc.NumServers(); i++ {
ptp := tc.Server(i).ExecutorConfig().(sql.ExecutorConfig).ProtectedTimestampProvider
if ptp.Iterate(ctx, tableKey, tableKey, func(record *ptpb.Record) (wantMore bool) {
return false
}).Less(min) {
ptsReader := tc.GetFirstStoreFromServer(t, 0).GetStoreConfig().ProtectedTimestampReader
_, asOf := ptsReader.GetProtectionTimestamps(ctx, tableSpan)
if asOf.Less(min) {
return errors.Errorf("not yet read")
}
}
Expand Down Expand Up @@ -3572,10 +3572,13 @@ func TestStrictGCEnforcement(t *testing.T) {
}
refreshPastLeaseStart = func(t *testing.T) {
for i := 0; i < tc.NumServers(); i++ {
ptp := tc.Server(i).ExecutorConfig().(sql.ExecutorConfig).ProtectedTimestampProvider
ptsReader := tc.GetFirstStoreFromServer(t, 0).GetStoreConfig().ProtectedTimestampReader
_, r := getFirstStoreReplica(t, tc.Server(i), tableKey)
l, _ := r.GetLease()
require.NoError(t, ptp.Refresh(ctx, l.Start.ToTimestamp().Next()))
require.NoError(
t,
spanconfigptsreader.TestingRefreshPTSState(ctx, ptsReader, l.Start.ToTimestamp().Next()),
)
r.ReadProtectedTimestamps(ctx)
}
}
Expand Down
8 changes: 7 additions & 1 deletion pkg/kv/kvserver/client_split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigptsreader"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
"github.com/cockroachdb/cockroach/pkg/storage"
Expand Down Expand Up @@ -2431,7 +2432,12 @@ func TestUnsplittableRange(t *testing.T) {
// row. Once the first version of the row is cleaned up, the range should
// exit the split queue purgatory. We need to tickle the protected timestamp
// subsystem to release a timestamp at which we get to actually remove the data.
require.NoError(t, store.GetStoreConfig().ProtectedTimestampCache.Refresh(ctx, s.Clock().Now()))
require.NoError(
t,
spanconfigptsreader.TestingRefreshPTSState(
ctx, store.GetStoreConfig().ProtectedTimestampReader, s.Clock().Now(),
),
)
repl := store.LookupReplica(tableKey)
if err := store.ManualMVCCGC(repl); err != nil {
t.Fatal(err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ func (r *Replica) ReadProtectedTimestamps(ctx context.Context) {
defer r.maybeUpdateCachedProtectedTS(&ts)
r.mu.RLock()
defer r.mu.RUnlock()
ts = r.readProtectedTimestampsRLocked(ctx, nil /* f */)
ts = r.readProtectedTimestampsRLocked(ctx)
}

// ClosedTimestampPolicy returns the closed timestamp policy of the range, which
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/protectedts/ptcache/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/roachpb",
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/stop",
Expand Down
17 changes: 17 additions & 0 deletions pkg/kv/kvserver/protectedts/ptcache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
Expand Down Expand Up @@ -73,6 +74,7 @@ func New(config Config) *Cache {
}

var _ protectedts.Cache = (*Cache)(nil)
var _ spanconfig.ProtectedTSReader = (*Cache)(nil)

// Iterate is part of the protectedts.Cache interface.
func (c *Cache) Iterate(
Expand Down Expand Up @@ -126,6 +128,21 @@ func (c *Cache) Refresh(ctx context.Context, asOf hlc.Timestamp) error {
return nil
}

// GetProtectionTimestamps is part of the spanconfig.ProtectedTSReader
// interface.
func (c *Cache) GetProtectionTimestamps(
ctx context.Context, sp roachpb.Span,
) (protectionTimestamps []hlc.Timestamp, asOf hlc.Timestamp) {
readAt := c.Iterate(ctx,
sp.Key,
sp.EndKey,
func(rec *ptpb.Record) (wantMore bool) {
protectionTimestamps = append(protectionTimestamps, rec.Timestamp)
return true
})
return protectionTimestamps, readAt
}

// Start starts the periodic fetching of the Cache. A Cache must not be used
// until after it has been started. An error will be returned if it has
// already been started.
Expand Down
Loading

0 comments on commit 365521a

Please sign in to comment.