Skip to content

Commit

Permalink
spanconfigprotectedts: introduce a pts table reader
Browse files Browse the repository at this point in the history
This change introduces a `ProtectedTimestampTableReader`
that provides a txn scoped, in-memory view of the system
table that stores protected timestamp records.

The `SQLTranslator` will use this table reader to generate
SpanConfigs and SystemSpanConfigs in a follow up PR.

Informs: #73727

Release note: None
  • Loading branch information
adityamaru committed Jan 12, 2022
1 parent 1384cfe commit 080e7ae
Show file tree
Hide file tree
Showing 5 changed files with 421 additions and 0 deletions.
23 changes: 23 additions & 0 deletions pkg/spanconfig/spanconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,29 @@ type StoreReader interface {
GetSpanConfigForKey(ctx context.Context, key roachpb.RKey) (roachpb.SpanConfig, error)
}

// TenantProtectedTimestamps represents all the Protections that apply to a
// tenant's keyspace.
type TenantProtectedTimestamps struct {
Protections []hlc.Timestamp
TenantID roachpb.TenantID
}

// ProtectedTimestampTableReader represents a table reader that returns the
// protected timestamps that apply to the different targets that can be
// protected from GC.
type ProtectedTimestampTableReader interface {
// GetProtectedTimestampsForCluster returns all the protected timestamps that
// apply to the entire cluster's keyspace.
GetProtectedTimestampsForCluster() []hlc.Timestamp
// GetProtectedTimestampsForTenants returns all the protected timestamps that
// apply to a paritcular tenant's keyspace. It returns this for all tenants
// that have protected timestamp records.
GetProtectedTimestampsForTenants() []TenantProtectedTimestamps
// GetProtectedTimestampsForSchemaObject returns all the protected timestamps
// that apply to the descID's keyspan.
GetProtectedTimestampsForSchemaObject(descID descpb.ID) []hlc.Timestamp
}

// DescriptorUpdate captures the ID and the type of descriptor or zone that been
// updated. It's the unit of what the SQLWatcher emits.
type DescriptorUpdate struct {
Expand Down
53 changes: 53 additions & 0 deletions pkg/spanconfig/spanconfigprotectedts/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "spanconfigprotectedts",
srcs = ["protectedts_table_reader.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigprotectedts",
visibility = ["//visibility:public"],
deps = [
"//pkg/kv",
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/roachpb:with-mocks",
"//pkg/security",
"//pkg/spanconfig",
"//pkg/sql/catalog/colinfo",
"//pkg/sql/catalog/descpb",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlutil",
"//pkg/util/hlc",
"//pkg/util/protoutil",
"@com_github_cockroachdb_errors//:errors",
],
)

go_test(
name = "spanconfigprotectedts_test",
srcs = [
"main_test.go",
"protectedts_table_reader_test.go",
],
embed = [":spanconfigprotectedts"],
deps = [
"//pkg/base",
"//pkg/jobs",
"//pkg/jobs/jobsprotectedts",
"//pkg/kv",
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/roachpb:with-mocks",
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/spanconfig",
"//pkg/sql",
"//pkg/sql/catalog/descpb",
"//pkg/sql/distsql",
"//pkg/testutils/serverutils",
"//pkg/testutils/testcluster",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/uuid",
"@com_github_stretchr_testify//require",
],
)
31 changes: 31 additions & 0 deletions pkg/spanconfig/spanconfigprotectedts/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package spanconfigprotectedts

import (
"os"
"testing"

"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/security/securitytest"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
)

func TestMain(m *testing.M) {
security.SetAssetLoader(securitytest.EmbeddedAssets)
serverutils.InitTestServerFactory(server.TestServerFactory)
serverutils.InitTestClusterFactory(testcluster.TestClusterFactory)
os.Exit(m.Run())
}

//go:generate ../../util/leaktest/add-leaktest.sh *_test.go
199 changes: 199 additions & 0 deletions pkg/spanconfig/spanconfigprotectedts/protectedts_table_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package spanconfigprotectedts

import (
"context"
"fmt"
"strconv"
"strings"

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/errors"
)

const (
// ProdSystemProtectedTimestampTable is the system table where we store
// protected timestamp records.
ProdSystemProtectedTimestampTable = "system.protected_ts_records"
tenantKeyPrefix = "tenant"
)

type key string

func makeSchemaObjectKey(id descpb.ID) key {
return key(strconv.FormatUint(uint64(id), 10))
}

func makeTenantKey(tenantID roachpb.TenantID) key {
return key(fmt.Sprintf("%s%d", tenantKeyPrefix, tenantID.ToUint64()))
}

func isTenantKey(k key) bool {
return strings.HasPrefix(string(k), tenantKeyPrefix)
}

func getTenantIDFromKey(k key) roachpb.TenantID {
tenantIDStr := strings.TrimPrefix(string(k), tenantKeyPrefix)
tenantID, err := strconv.ParseUint(tenantIDStr, 10, 64)
if err != nil {
// this should never fail.
return roachpb.TenantID{}
}
return roachpb.MakeTenantID(tenantID)
}

func makeClusterKey() key {
return "cluster"
}

// ProtectedTimestampTableReader provides an in-memory, transaction scoped view
// of the system table that stores protected timestamp records.
//
// ProtectedTimestampTableReader implements the
// spanconfig.ProtectedTimestampTableReader interface with which the user can
// read from the in-memory store.
type ProtectedTimestampTableReader struct {
protections map[key][]hlc.Timestamp
}

var _ spanconfig.ProtectedTimestampTableReader = &ProtectedTimestampTableReader{}

// New returns an instance of a ProtectedTimestampTableReader, with the
// in-memory state representing the transaction scoped view of the
// `ptsRecordSystemTable`.
func New(
ctx context.Context, ptsRecordSystemTable string, ie sqlutil.InternalExecutor, txn *kv.Txn,
) (*ProtectedTimestampTableReader, error) {
reader := &ProtectedTimestampTableReader{protections: make(map[key][]hlc.Timestamp)}
if err := reader.loadProtectedTimestampRecords(ctx, ptsRecordSystemTable, ie, txn); err != nil {
return nil, err
}
return reader, nil
}

// GetProtectedTimestampsForCluster implements the ProtectedTimestampTableReader
// interface.
func (p *ProtectedTimestampTableReader) GetProtectedTimestampsForCluster() []hlc.Timestamp {
return p.protections[makeClusterKey()]
}

// GetProtectedTimestampsForTenants implements the ProtectedTimestampTableReader
// interface.
func (p *ProtectedTimestampTableReader) GetProtectedTimestampsForTenants() []spanconfig.TenantProtectedTimestamps {
var res []spanconfig.TenantProtectedTimestamps
for k, v := range p.protections {
if isTenantKey(k) {
res = append(res, spanconfig.TenantProtectedTimestamps{Protections: v, TenantID: getTenantIDFromKey(k)})
}
}
return res
}

// GetProtectedTimestampsForSchemaObject implements the ProtectedTimestampTableReader interface.
func (p *ProtectedTimestampTableReader) GetProtectedTimestampsForSchemaObject(
descID descpb.ID,
) []hlc.Timestamp {
return p.protections[makeSchemaObjectKey(descID)]
}

type protectedTimestampRow struct {
ts hlc.Timestamp
target *ptpb.Target
}

func (p *ProtectedTimestampTableReader) unmarshalProtectedTimestampRecord(
row []tree.Datum, cols []colinfo.ResultColumn,
) (protectedTimestampRow, error) {
var res protectedTimestampRow
if len(row) != len(cols) {
return res, errors.AssertionFailedf("expected only %d columns but found %d", len(cols), len(row))
}

for i := range cols {
datum := tree.UnwrapDatum(nil, row[i])
if datum == tree.DNull {
// We should never see a null value.
return res, errors.AssertionFailedf("unexpected NULL in column: %d of row: %+v", i, row)
}

switch d := datum.(type) {
case *tree.DDecimal:
ts, err := tree.DecimalToHLC(&d.Decimal)
if err != nil {
return res, err
}
res.ts = ts
case *tree.DBytes:
targetBytes := []byte(*d)
target := &ptpb.Target{}
if err := protoutil.Unmarshal(targetBytes, target); err != nil {
return res, errors.Wrapf(err, "failed to unmarshal target column for row: %v", row)
}
res.target = target
default:
return res, errors.Newf("cannot handle type %T", datum)
}
}
return res, nil
}

func (p *ProtectedTimestampTableReader) loadProtectedTimestampRecords(
ctx context.Context, ptsRecordsSystemTable string, ie sqlutil.InternalExecutor, txn *kv.Txn,
) (retErr error) {
getProtectedTimestampRecordStmt := fmt.Sprintf(`SELECT ts, target FROM %s`, ptsRecordsSystemTable)
it, err := ie.QueryIteratorEx(ctx, "load-pts-records", txn,
sessiondata.InternalExecutorOverride{User: security.RootUserName()},
getProtectedTimestampRecordStmt)
if err != nil {
return err
}

defer func() { retErr = errors.CombineErrors(retErr, it.Close()) }()

var ok bool
for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) {
row := it.Cur()
ptsRecord, err := p.unmarshalProtectedTimestampRecord(row, it.Types())
if err != nil {
return err
}

// Add an entry to the in-memory mapping.
switch t := ptsRecord.target.Union.(type) {
case *ptpb.Target_Cluster:
clusterKey := makeClusterKey()
p.protections[clusterKey] = append(p.protections[clusterKey], ptsRecord.ts)
case *ptpb.Target_Tenants:
for _, tenID := range t.Tenants.IDs {
tenantKey := makeTenantKey(tenID)
p.protections[tenantKey] = append(p.protections[tenantKey], ptsRecord.ts)
}
case *ptpb.Target_SchemaObjects:
for _, descID := range t.SchemaObjects.IDs {
descKey := makeSchemaObjectKey(descID)
p.protections[descKey] = append(p.protections[descKey], ptsRecord.ts)
}
}
}
return err
}
Loading

0 comments on commit 080e7ae

Please sign in to comment.