Skip to content

Commit

Permalink
ptstorage: change Protect and GetRecord to work with target column
Browse files Browse the repository at this point in the history
Protected timestamp records now apply on a `ptpb.Target` instead
of `roachpb.Spans`. This change modifies the `Protect` and `GetRecord`
methods of the ptstorage interface. Protect will persist the
`ptpb.Target` specified on the passed in `ptpb.Record` as an encoded
protocol buffer in the `target` column, and `GetRecord` will read the
`target` column when constructing a `ptpb.Record` to return to the caller.

If we are running in a mixed version state where the migration that adds
the `target` column to the system.pts_records table has not run,
we use the existing logic to protect Spans. This logic has wholesale
been moved to `deprecatedProtect` method.

Once the migration has run, all calls to protect must
have a non-nil `ptpb.Target` field on the record, as we will no longer
be persisting the `DeprecatedSpans` field in the underlying system table.
Post migration, we do not use the `kv.protectedts.max_spans` to enforce
any limits, but continue to use `kv.protectedts.max_bytes` to enforce a
ceiling on the total size of encoded targets we can store in the subsystem.

Most tests in `ptstorage_test` continue to run in both a pre and post migration
state to ensure that `Storage` is handling both span backed and target backed
records correctly. This can be cleaned up in a future version when we get
rid of spans from the pts record entirely. All other PTS tests in the `kvserver`
package run on a version prior to the pts migration since the remainder of the
new pts system is yet to be implemented. These tests will be revisited as and
when we develop those pieces of the subsystem.

Informs: #73727

Release note: None
  • Loading branch information
adityamaru committed Jan 14, 2022
1 parent ac5f637 commit 6ea1c5e
Show file tree
Hide file tree
Showing 17 changed files with 502 additions and 120 deletions.
1 change: 1 addition & 0 deletions pkg/base/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,5 @@ type TestingKnobs struct {
SQLLivenessKnobs ModuleTestingKnobs
TelemetryLoggingKnobs ModuleTestingKnobs
DialerKnobs ModuleTestingKnobs
ProtectedTS ModuleTestingKnobs
}
2 changes: 2 additions & 0 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6747,6 +6747,8 @@ func TestRestoreErrorPropagates(t *testing.T) {

// TestProtectedTimestampsFailDueToLimits ensures that when creating a protected
// timestamp record fails, we return the correct error.
//
// TODO(adityamaru): Remove in 22.2 once no records protect spans.
func TestProtectedTimestampsFailDueToLimits(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/client_protectedts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ func TestProtectedTimestamps(t *testing.T) {
beforeWrites := s0.Clock().Now()
gcSoon()

pts := ptstorage.New(s0.ClusterSettings(), s0.InternalExecutor().(*sql.InternalExecutor))
pts := ptstorage.New(s0.ClusterSettings(), s0.InternalExecutor().(*sql.InternalExecutor),
nil /* knobs */)
ptsWithDB := ptstorage.WithDatabase(pts, s0.DB())
startKey := getTableStartKey("foo")
ptsRec := ptpb.Record{
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvserver/protectedts/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ go_library(
srcs = [
"protectedts.go",
"settings.go",
"testing_knobs.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts",
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/kv",
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/roachpb:with-mocks",
Expand Down
1 change: 0 additions & 1 deletion pkg/kv/kvserver/protectedts/ptcache/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ go_test(
"//pkg/base",
"//pkg/keys",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/kv/kvserver/protectedts",
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/kv/kvserver/protectedts/ptstorage",
Expand Down
18 changes: 8 additions & 10 deletions pkg/kv/kvserver/protectedts/ptcache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptcache"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
Expand Down Expand Up @@ -48,8 +47,8 @@ func TestCacheBasic(t *testing.T) {
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{})
defer tc.Stopper().Stop(ctx)
s := tc.Server(0)
p := ptstorage.WithDatabase(ptstorage.New(s.ClusterSettings(),
s.InternalExecutor().(sqlutil.InternalExecutor)), s.DB())
p := ptstorage.WithDatabase(ptstorage.New(s.ClusterSettings(), s.InternalExecutor().(sqlutil.InternalExecutor),
nil /* knobs */), s.DB())

// Set the poll interval to be very short.
protectedts.PollInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Microsecond)
Expand Down Expand Up @@ -110,15 +109,15 @@ func TestRefresh(t *testing.T) {
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
TestingRequestFilter: kvserverbase.ReplicaRequestFilter(st.requestFilter),
TestingRequestFilter: st.requestFilter,
},
},
},
})
defer tc.Stopper().Stop(ctx)
s := tc.Server(0)
p := ptstorage.WithDatabase(ptstorage.New(s.ClusterSettings(),
s.InternalExecutor().(sqlutil.InternalExecutor)), s.DB())
s.InternalExecutor().(sqlutil.InternalExecutor), nil /* knobs */), s.DB())

// Set the poll interval to be very long.
protectedts.PollInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Hour)
Expand Down Expand Up @@ -225,8 +224,7 @@ func TestStart(t *testing.T) {
setup := func() (*testcluster.TestCluster, *ptcache.Cache) {
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{})
s := tc.Server(0)
p := ptstorage.New(s.ClusterSettings(),
s.InternalExecutor().(sqlutil.InternalExecutor))
p := ptstorage.New(s.ClusterSettings(), s.InternalExecutor().(sqlutil.InternalExecutor), nil /* knobs */)
// Set the poll interval to be very long.
protectedts.PollInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Hour)
c := ptcache.New(ptcache.Config{
Expand Down Expand Up @@ -259,7 +257,7 @@ func TestQueryRecord(t *testing.T) {
defer tc.Stopper().Stop(ctx)
s := tc.Server(0)
p := ptstorage.WithDatabase(ptstorage.New(s.ClusterSettings(),
s.InternalExecutor().(sqlutil.InternalExecutor)), s.DB())
s.InternalExecutor().(sqlutil.InternalExecutor), nil /* knobs */), s.DB())
// Set the poll interval to be very long.
protectedts.PollInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Hour)
c := ptcache.New(ptcache.Config{
Expand Down Expand Up @@ -316,7 +314,7 @@ func TestIterate(t *testing.T) {
defer tc.Stopper().Stop(ctx)
s := tc.Server(0)
p := ptstorage.WithDatabase(ptstorage.New(s.ClusterSettings(),
s.InternalExecutor().(sqlutil.InternalExecutor)), s.DB())
s.InternalExecutor().(sqlutil.InternalExecutor), nil /* knobs */), s.DB())

// Set the poll interval to be very long.
protectedts.PollInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Hour)
Expand Down Expand Up @@ -381,7 +379,7 @@ func TestSettingChangedLeadsToFetch(t *testing.T) {
defer tc.Stopper().Stop(ctx)
s := tc.Server(0)
p := ptstorage.WithDatabase(ptstorage.New(s.ClusterSettings(),
s.InternalExecutor().(sqlutil.InternalExecutor)), s.DB())
s.InternalExecutor().(sqlutil.InternalExecutor), nil /* knobs */), s.DB())

// Set the poll interval to be very long.
protectedts.PollInterval.Override(ctx, &s.ClusterSettings().SV, 500*time.Hour)
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/protectedts/ptprovider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Config struct {
Stores *kvserver.Stores
ReconcileStatusFuncs ptreconcile.StatusFuncs
InternalExecutor sqlutil.InternalExecutor
Knobs *protectedts.TestingKnobs
}

type provider struct {
Expand All @@ -48,7 +49,7 @@ func New(cfg Config) (protectedts.Provider, error) {
if err := validateConfig(cfg); err != nil {
return nil, err
}
storage := ptstorage.New(cfg.Settings, cfg.InternalExecutor)
storage := ptstorage.New(cfg.Settings, cfg.InternalExecutor, cfg.Knobs)
verifier := ptverifier.New(cfg.DB, storage)
return &provider{
Storage: storage,
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/protectedts/ptstorage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptstorage",
visibility = ["//visibility:public"],
deps = [
"//pkg/clusterversion",
"//pkg/kv",
"//pkg/kv/kvserver/protectedts",
"//pkg/kv/kvserver/protectedts/ptpb",
Expand Down Expand Up @@ -48,12 +49,15 @@ go_test(
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/colinfo",
"//pkg/sql/catalog/descpb",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlutil",
"//pkg/sql/tests",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/testcluster",
Expand Down
54 changes: 53 additions & 1 deletion pkg/kv/kvserver/protectedts/ptstorage/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,26 @@ WITH
checks AS (` + protectChecksCTE + `),
updated_meta AS (` + protectUpsertMetaCTE + `),
new_record AS (` + protectInsertRecordCTE + `)
SELECT
failed,
total_bytes AS prev_total_bytes,
version AS prev_version
FROM
checks, current_meta;`

// The `target` column was added to `system.protected_ts_records` as part of
// the tenant migration `AlterSystemProtectedTimestampAddColumn` necessitating
// queries that write to the table with and without the column. In a
// mixed-version state (prior to the migration) we use the queries without tha
// target column.
//
// TODO(adityamaru): Delete this in 22.2.
protectQueryWithoutTarget = `
WITH
current_meta AS (` + currentMetaCTE + `),
checks AS (` + protectChecksCTE + `),
updated_meta AS (` + protectUpsertMetaCTE + `),
new_record AS (` + protectInsertRecordWithoutTargetCTE + `)
SELECT
failed,
num_spans AS prev_spans,
Expand Down Expand Up @@ -84,6 +104,20 @@ RETURNING

protectInsertRecordCTE = `
INSERT
INTO
system.protected_ts_records (id, ts, meta_type, meta, num_spans, spans, target)
(
SELECT
$4, $5, $6, $7, $8, $9, $10
WHERE
NOT EXISTS(SELECT * FROM checks WHERE failed)
)
RETURNING
id
`

protectInsertRecordWithoutTargetCTE = `
INSERT
INTO
system.protected_ts_records (id, ts, meta_type, meta, num_spans, spans)
(
Expand All @@ -96,12 +130,30 @@ RETURNING
id
`

getRecordsQueryBase = `
// The `target` column was added to `system.protected_ts_records` as part of
// the tenant migration `AlterSystemProtectedTimestampAddColumn` necessitating
// queries that read from the table with and without the column. In a
// mixed-version state (prior to the migration) we use the queries without tha
// target column.
//
// TODO(adityamaru): Delete this in 22.2.
getRecordsWithoutTargetQueryBase = `
SELECT
id, ts, meta_type, meta, spans, verified
FROM
system.protected_ts_records`

getRecordsWithoutTargetQuery = getRecordsWithoutTargetQueryBase + ";"
getRecordWithoutTargetQuery = getRecordsWithoutTargetQueryBase + `
WHERE
id = $1;`

getRecordsQueryBase = `
SELECT
id, ts, meta_type, meta, spans, verified, target
FROM
system.protected_ts_records`

getRecordsQuery = getRecordsQueryBase + ";"
getRecordQuery = getRecordsQueryBase + `
WHERE
Expand Down
Loading

0 comments on commit 6ea1c5e

Please sign in to comment.