Skip to content

Commit

Permalink
Merge #92588
Browse files Browse the repository at this point in the history
92588: lease,systemschema: hook up the lease table to regional-by-row partitioning r=ajwerner a=ajwerner

This PR extends the `system.lease` table with REGIONAL BY ROW partitioning for use with #92395. The PR is itself freestanding. 

Epic: CRDB-18596

Release note: None

Co-authored-by: Andrew Werner <[email protected]>
  • Loading branch information
craig[bot] and ajwerner committed Dec 9, 2022
2 parents b68f4a7 + 411c8c2 commit 2485499
Show file tree
Hide file tree
Showing 14 changed files with 301 additions and 72 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/system_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ var systemTableBackupConfiguration = map[string]systemBackupConfiguration{
systemschema.EventLogTable.GetName(): {
shouldIncludeInClusterBackup: optOutOfClusterBackup,
},
systemschema.LeaseTable.GetName(): {
systemschema.LeaseTable().GetName(): {
shouldIncludeInClusterBackup: optOutOfClusterBackup,
},
systemschema.NamespaceTable.GetName(): {
Expand Down
2 changes: 2 additions & 0 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -1339,6 +1339,8 @@ func (s *SQLServer) preStart(
return err
}

s.leaseMgr.SetRegionPrefix(regionPhysicalRep)

// Start the sql liveness subsystem. We'll need it to get a session.
s.sqlLivenessProvider.Start(ctx, regionPhysicalRep)

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/catalog/bootstrap/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func addSystemDescriptorsToSchema(target *MetadataSchema) {
target.AddDescriptorForSystemTenant(systemschema.TenantsTable)

// Add all the other system tables.
target.AddDescriptor(systemschema.LeaseTable)
target.AddDescriptor(systemschema.LeaseTable())
target.AddDescriptor(systemschema.EventLogTable)
target.AddDescriptor(systemschema.RangeEventTable)
target.AddDescriptor(systemschema.UITable)
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/catalog/lease/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ go_library(
"//pkg/sql/catalog/nstree",
"//pkg/sql/catalog/systemschema",
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/enum",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/sem/tree",
Expand Down Expand Up @@ -95,6 +96,7 @@ go_test(
"//pkg/sql/catalog/desctestutils",
"//pkg/sql/catalog/systemschema",
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/enum",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/rowenc/keyside",
"//pkg/sql/sem/tree",
Expand All @@ -111,6 +113,7 @@ go_test(
"//pkg/testutils/testcluster",
"//pkg/util",
"//pkg/util/encoding",
"//pkg/util/envutil",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
Expand Down
15 changes: 11 additions & 4 deletions pkg/sql/catalog/lease/descriptor_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,10 @@ func (t *descriptorState) findForTimestamp(

// upsertLeaseLocked inserts a lease for a particular descriptor version.
// If an existing lease exists for the descriptor version it replaces
// it and returns it.
// it and returns it. The regionEnumPrefix is used if the cluster is configured
// for multi-region system tables.
func (t *descriptorState) upsertLeaseLocked(
ctx context.Context, desc catalog.Descriptor, expiration hlc.Timestamp,
ctx context.Context, desc catalog.Descriptor, expiration hlc.Timestamp, regionEnumPrefix []byte,
) (createdDescriptorVersionState *descriptorVersionState, toRelease *storedLease, _ error) {
if t.mu.maxVersionSeen < desc.GetVersion() {
t.mu.maxVersionSeen = desc.GetVersion()
Expand All @@ -140,7 +141,7 @@ func (t *descriptorState) upsertLeaseLocked(
if t.mu.active.findNewest() != nil {
log.Infof(ctx, "new lease: %s", desc)
}
descState := newDescriptorVersionState(t, desc, expiration, true /* isLease */)
descState := newDescriptorVersionState(t, desc, expiration, regionEnumPrefix, true /* isLease */)
t.mu.active.insert(descState)
return descState, nil, nil
}
Expand All @@ -163,6 +164,7 @@ func (t *descriptorState) upsertLeaseLocked(
s.mu.expiration = expiration
toRelease = s.mu.lease
s.mu.lease = &storedLease{
prefix: regionEnumPrefix,
id: desc.GetID(),
version: int(desc.GetVersion()),
expiration: storedLeaseExpiration(expiration),
Expand All @@ -176,7 +178,11 @@ func (t *descriptorState) upsertLeaseLocked(
var _ redact.SafeMessager = (*descriptorVersionState)(nil)

func newDescriptorVersionState(
t *descriptorState, desc catalog.Descriptor, expiration hlc.Timestamp, isLease bool,
t *descriptorState,
desc catalog.Descriptor,
expiration hlc.Timestamp,
prefix []byte,
isLease bool,
) *descriptorVersionState {
descState := &descriptorVersionState{
t: t,
Expand All @@ -186,6 +192,7 @@ func newDescriptorVersionState(
if isLease {
descState.mu.lease = &storedLease{
id: desc.GetID(),
prefix: prefix,
version: int(desc.GetVersion()),
expiration: storedLeaseExpiration(expiration),
}
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/catalog/lease/descriptor_version_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
// storedLease describes a lease stored in system.lease.
type storedLease struct {
// id is the ID of the leased descriptor.
id descpb.ID
// prefix is the region enum prefix the lease row is written under. It is
// only relevant when the cluster is configured for multi-region system
// tables; it is left nil for descriptor versions that carry no lease.
prefix []byte
// version is the version of the leased descriptor.
version int
// expiration is the expiration timestamp recorded for the lease.
expiration tree.DTimestamp
}
Expand Down
40 changes: 40 additions & 0 deletions pkg/sql/catalog/lease/ie_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"fmt"

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/errors"
)
Expand All @@ -26,6 +27,23 @@ type ieWriter struct {
}

func newInternalExecutorWriter(ie sqlutil.InternalExecutor, tableName string) *ieWriter {
if systemschema.TestSupportMultiRegion() {
const (
deleteLease = `
DELETE FROM %s
WHERE (crdb_region, "descID", version, "nodeID", expiration)
= ($1, $2, $3, $4, $5);`
insertLease = `
INSERT
INTO %s (crdb_region, "descID", version, "nodeID", expiration)
VALUES ($1, $2, $3, $4, $5)`
)
return &ieWriter{
ie: ie,
insertQuery: fmt.Sprintf(insertLease, tableName),
deleteQuery: fmt.Sprintf(deleteLease, tableName),
}
}
const (
deleteLease = `
DELETE FROM %s
Expand All @@ -44,6 +62,16 @@ VALUES ($1, $2, $3, $4)`
}

func (w *ieWriter) deleteLease(ctx context.Context, txn *kv.Txn, l leaseFields) error {
if systemschema.TestSupportMultiRegion() {
_, err := w.ie.Exec(
ctx,
"lease-release",
nil, /* txn */
w.deleteQuery,
l.regionPrefix, l.descID, l.version, l.instanceID, &l.expiration,
)
return err
}
_, err := w.ie.Exec(
ctx,
"lease-release",
Expand All @@ -55,6 +83,18 @@ func (w *ieWriter) deleteLease(ctx context.Context, txn *kv.Txn, l leaseFields)
}

func (w *ieWriter) insertLease(ctx context.Context, txn *kv.Txn, l leaseFields) error {
if systemschema.TestSupportMultiRegion() {
count, err := w.ie.Exec(ctx, "lease-insert", txn, w.insertQuery,
l.regionPrefix, l.descID, l.version, l.instanceID, &l.expiration,
)
if err != nil {
return err
}
if count != 1 {
return errors.Errorf("%s: expected 1 result, found %d", w.insertQuery, count)
}
return nil
}
count, err := w.ie.Exec(ctx, "lease-insert", txn, w.insertQuery,
l.descID, l.version, l.instanceID, &l.expiration,
)
Expand Down
28 changes: 20 additions & 8 deletions pkg/sql/catalog/lease/kv_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ func newKVWriter(codec keys.SQLCodec, db *kv.DB, id descpb.ID) *kvWriter {

func leaseTableWithID(id descpb.ID) catalog.TableDescriptor {
if id == keys.LeaseTableID {
return systemschema.LeaseTable
return systemschema.LeaseTable()
}
// Custom IDs are only used for testing.
mut := systemschema.LeaseTable.NewBuilder().
mut := systemschema.LeaseTable().NewBuilder().
BuildExistingMutable().(*tabledesc.Mutable)
mut.ID = id
return mut.ImmutableCopy().(catalog.TableDescriptor)
Expand Down Expand Up @@ -79,14 +79,26 @@ func (w *kvWriter) do(ctx context.Context, txn *kv.Txn, l leaseFields, f addToBa
}

func newBatch(txn *kv.Txn, l leaseFields, f addToBatchFunc) (*kv.Batch, error) {
entries := [...]tree.Datum{
tree.NewDInt(tree.DInt(l.descID)),
tree.NewDInt(tree.DInt(l.version)),
tree.NewDInt(tree.DInt(l.instanceID)),
&l.expiration,

var entries []tree.Datum
if systemschema.TestSupportMultiRegion() {
entries = []tree.Datum{
tree.NewDInt(tree.DInt(l.descID)),
tree.NewDInt(tree.DInt(l.version)),
tree.NewDInt(tree.DInt(l.instanceID)),
&l.expiration,
tree.NewDBytes(tree.DBytes(l.regionPrefix)),
}
} else {
entries = []tree.Datum{
tree.NewDInt(tree.DInt(l.descID)),
tree.NewDInt(tree.DInt(l.version)),
tree.NewDInt(tree.DInt(l.instanceID)),
&l.expiration,
}
}
b := txn.NewBatch()
if err := f(b, entries[:]...); err != nil {
if err := f(b, entries...); err != nil {
return nil, errors.NewAssertionErrorWithWrappedErrf(err, "failed to encode lease entry")
}
return b, nil
Expand Down
24 changes: 22 additions & 2 deletions pkg/sql/catalog/lease/kv_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
"github.com/cockroachdb/cockroach/pkg/sql/enum"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
kvstorage "github.com/cockroachdb/cockroach/pkg/storage"
Expand All @@ -35,6 +36,13 @@ import (
"github.com/stretchr/testify/require"
)

// MoveTablePrimaryIndexIDto2 is used to move the primary index of the created
// lease table from 1 to 2. It is injected from the lease_test package so that
// it can use sql primitives, which means it is nil until that package assigns
// it. TestKVWriterMatchesIEWriter calls it for each table it creates when
// systemschema.TestSupportMultiRegion() reports true.
var MoveTablePrimaryIndexIDto2 func(
context.Context, *testing.T, serverutils.TestServerInterface, descpb.ID,
)

// TestKVWriterMatchesIEWriter is a rather involved test to exercise the
// kvWriter and ieWriter and confirm that they write exactly the same thing
// to the underlying key-value store. It does this by teeing operations to
Expand All @@ -52,9 +60,17 @@ func TestKVWriterMatchesIEWriter(t *testing.T) {
// Otherwise, we wouldn't get complete SSTs in our export under stress.
tdb.Exec(t, "SET CLUSTER SETTING admission.elastic_cpu.enabled = false")

schema := systemschema.LeaseTableSchema
if systemschema.TestSupportMultiRegion() {
schema = systemschema.MRLeaseTableSchema
}
makeTable := func(name string) (id descpb.ID) {
tdb.Exec(t, strings.Replace(systemschema.LeaseTableSchema, "system.lease", name, 1))
tdb.Exec(t, strings.Replace(schema, "system.lease", name, 1))
tdb.QueryRow(t, "SELECT id FROM system.namespace WHERE name = $1", name).Scan(&id)
// The MR variant of the table uses a non-default primary index ID, so move
// the created table's primary index from 1 to 2.
if systemschema.TestSupportMultiRegion() {
MoveTablePrimaryIndexIDto2(ctx, t, s, id)
}
return id
}
lease1ID := makeTable("lease1")
Expand Down Expand Up @@ -185,12 +201,16 @@ func generateWriteOps(n, numGroups int) func() (_ []writeOp, wantMore bool) {
if err != nil {
panic(err)
}
return leaseFields{
lf := leaseFields{
descID: descpb.ID(rand.Intn(vals)),
version: descpb.DescriptorVersion(rand.Intn(vals)),
instanceID: base.SQLInstanceID(rand.Intn(vals)),
expiration: *ts,
}
if systemschema.TestSupportMultiRegion() {
lf.regionPrefix = enum.One
}
return lf
}
var existing []leaseFields
return func() ([]writeOp, bool) {
Expand Down
47 changes: 42 additions & 5 deletions pkg/sql/catalog/lease/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descbuilder"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
"github.com/cockroachdb/cockroach/pkg/sql/enum"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
Expand Down Expand Up @@ -435,7 +437,7 @@ func (m *Manager) insertDescriptorVersions(id descpb.ID, versions []historicalDe
existingVersion := t.mu.active.findVersion(versions[i].desc.GetVersion())
if existingVersion == nil {
t.mu.active.insert(
newDescriptorVersionState(t, versions[i].desc, versions[i].expiration, false))
newDescriptorVersionState(t, versions[i].desc, versions[i].expiration, nil, false))
}
}
}
Expand Down Expand Up @@ -509,7 +511,7 @@ func acquireNodeLease(
if newest != nil {
minExpiration = newest.getExpiration()
}
desc, expiration, err := m.storage.acquire(ctx, minExpiration, id)
desc, expiration, regionPrefix, err := m.storage.acquire(ctx, minExpiration, id)
if err != nil {
return nil, err
}
Expand All @@ -518,7 +520,7 @@ func acquireNodeLease(
t.mu.takenOffline = false
defer t.mu.Unlock()
var newDescVersionState *descriptorVersionState
newDescVersionState, toRelease, err = t.upsertLeaseLocked(ctx, desc, expiration)
newDescVersionState, toRelease, err = t.upsertLeaseLocked(ctx, desc, expiration, regionPrefix)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -734,6 +736,8 @@ func NewLeaseManager(
stopper: stopper,
sem: quotapool.NewIntPool("lease manager", leaseConcurrencyLimit),
}
lm.storage.regionPrefix = &atomic.Value{}
lm.storage.regionPrefix.Store(enum.One)
lm.stopper.AddCloser(lm.sem.Closer("stopper"))
lm.mu.descriptors = make(map[descpb.ID]*descriptorState)
lm.mu.updatesResolvedTimestamp = db.Clock().Now()
Expand Down Expand Up @@ -763,6 +767,22 @@ func (m *Manager) findNewest(id descpb.ID) *descriptorVersionState {
return t.mu.active.findNewest()
}

// SetRegionPrefix configures the region prefix under which this Manager
// writes its leases. An empty val leaves the current prefix untouched.
//
// The prefix starts out as enum.One, so a lease acquired before this method
// has been called during startup may have its entry written to a remote
// region.
//
// TODO(ajwerner): We ought to reject attempts to lease descriptors before
// we've set the prefix if the table is partitioned. In principle, this should
// just mean returning ErrDescriptorNotFound. The challenge will be to sort
// out all the conditions in which we expect, or don't expect to get a prefix
// in a timely manner.
func (m *Manager) SetRegionPrefix(val []byte) {
	if len(val) == 0 {
		// Nothing to set; keep whatever prefix is currently stored.
		return
	}
	m.storage.regionPrefix.Store(val)
}

// AcquireByName returns a version for the specified descriptor valid for
// the timestamp. It returns the descriptor and an expiration time.
// A transaction using this descriptor must ensure that its
Expand Down Expand Up @@ -1290,9 +1310,19 @@ func (m *Manager) DeleteOrphanedLeases(ctx context.Context, timeThreshold int64)
// doesn't implement AS OF SYSTEM TIME.

// Read orphaned leases.
sqlQuery := fmt.Sprintf(`
const (
queryWithMR = `
SELECT "descID", version, expiration, crdb_region FROM system.public.lease AS OF SYSTEM TIME %d WHERE "nodeID" = %d
`
queryWithoutMR = `
SELECT "descID", version, expiration FROM system.public.lease AS OF SYSTEM TIME %d WHERE "nodeID" = %d
`, timeThreshold, instanceID)
`
)
query := queryWithoutMR
if systemschema.TestSupportMultiRegion() {
query = queryWithMR
}
sqlQuery := fmt.Sprintf(query, timeThreshold, instanceID)
var rows []tree.Datums
retryOptions := base.DefaultRetryOptions()
retryOptions.Closer = m.stopper.ShouldQuiesce()
Expand All @@ -1319,6 +1349,13 @@ SELECT "descID", version, expiration FROM system.public.lease AS OF SYSTEM TIME
version: int(tree.MustBeDInt(row[1])),
expiration: tree.MustBeDTimestamp(row[2]),
}
if len(row) == 4 {
if ed, ok := row[3].(*tree.DEnum); ok {
lease.prefix = ed.PhysicalRep
} else if bd, ok := row[3].(*tree.DBytes); ok {
lease.prefix = []byte((*bd))
}
}
if err := m.stopper.RunAsyncTaskEx(
ctx,
stop.TaskOpts{
Expand Down
Loading

0 comments on commit 2485499

Please sign in to comment.