Skip to content

Commit

Permalink
lease: migrate lease to rbr compatible index
Browse files Browse the repository at this point in the history
Migrate the system.lease table to an index that is byte compatible with
a regional by row index. The version gates are intended to follow the
protocol discussed in the comment at the top of
upgrades/system_rbr_indexes.go

Writing to the lease table always occurs throught he kv API. The table
is read via the SQL api. The SQL uses of the system.lease table are
forwards compatible as long as they do not need to read the crdb_region.
Queries that interact with individual leases need to retrieve the
crdb_region and must consult the version gate.

Part of cockroachdb#94843

Release Note: None
  • Loading branch information
jeffswenson committed Mar 16, 2023
1 parent a426d93 commit ee55464
Show file tree
Hide file tree
Showing 18 changed files with 182 additions and 176 deletions.
1 change: 1 addition & 0 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
cfg.internalDB,
cfg.clock,
cfg.Settings,
settingsWatcher,
codec,
lmKnobs,
cfg.stopper,
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,7 @@ go_test(
"//pkg/security/username",
"//pkg/server",
"//pkg/server/serverpb",
"//pkg/server/settingswatcher",
"//pkg/server/status/statuspb",
"//pkg/server/telemetry",
"//pkg/settings",
Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/catalog/bootstrap/testdata/testdata

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions pkg/sql/catalog/lease/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/base",
"//pkg/clusterversion",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvclient/rangefeed",
"//pkg/kv/kvpb",
"//pkg/multitenant",
"//pkg/roachpb",
"//pkg/server/settingswatcher",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql/catalog",
Expand Down Expand Up @@ -86,6 +88,7 @@ go_test(
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/server/settingswatcher",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/catalog",
Expand Down
50 changes: 5 additions & 45 deletions pkg/sql/catalog/lease/ie_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"fmt"

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/errors"
)
Expand All @@ -27,32 +26,15 @@ type ieWriter struct {
}

func newInternalExecutorWriter(ie isql.Executor, tableName string) *ieWriter {
if systemschema.TestSupportMultiRegion() {
const (
deleteLease = `
const (
deleteLease = `
DELETE FROM %s
WHERE (crdb_region, "descID", version, "nodeID", expiration)
= ($1, $2, $3, $4, $5);`
insertLease = `
insertLease = `
INSERT
INTO %s (crdb_region, "descID", version, "nodeID", expiration)
VALUES ($1, $2, $3, $4, $5)`
)
return &ieWriter{
ie: ie,
insertQuery: fmt.Sprintf(insertLease, tableName),
deleteQuery: fmt.Sprintf(deleteLease, tableName),
}
}
const (
deleteLease = `
DELETE FROM %s
WHERE ("descID", version, "nodeID", expiration)
= ($1, $2, $3, $4);`
insertLease = `
INSERT
INTO %s ("descID", version, "nodeID", expiration)
VALUES ($1, $2, $3, $4)`
)
return &ieWriter{
ie: ie,
Expand All @@ -62,41 +44,19 @@ VALUES ($1, $2, $3, $4)`
}

func (w *ieWriter) deleteLease(ctx context.Context, txn *kv.Txn, l leaseFields) error {
if systemschema.TestSupportMultiRegion() {
_, err := w.ie.Exec(
ctx,
"lease-release",
nil, /* txn */
w.deleteQuery,
l.regionPrefix, l.descID, l.version, l.instanceID, &l.expiration,
)
return err
}
_, err := w.ie.Exec(
ctx,
"lease-release",
nil, /* txn */
w.deleteQuery,
l.descID, l.version, l.instanceID, &l.expiration,
l.regionPrefix, l.descID, l.version, l.instanceID, &l.expiration,
)
return err
}

func (w *ieWriter) insertLease(ctx context.Context, txn *kv.Txn, l leaseFields) error {
if systemschema.TestSupportMultiRegion() {
count, err := w.ie.Exec(ctx, "lease-insert", txn, w.insertQuery,
l.regionPrefix, l.descID, l.version, l.instanceID, &l.expiration,
)
if err != nil {
return err
}
if count != 1 {
return errors.Errorf("%s: expected 1 result, found %d", w.insertQuery, count)
}
return nil
}
count, err := w.ie.Exec(ctx, "lease-insert", txn, w.insertQuery,
l.descID, l.version, l.instanceID, &l.expiration,
l.regionPrefix, l.descID, l.version, l.instanceID, &l.expiration,
)
if err != nil {
return err
Expand Down
108 changes: 75 additions & 33 deletions pkg/sql/catalog/lease/kv_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ package lease
import (
"context"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/server/settingswatcher"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
Expand All @@ -27,13 +29,21 @@ import (
// kvWriter implements writer using the raw KV API.
type kvWriter struct {
db *kv.DB
w bootstrap.KVWriter

oldWriter bootstrap.KVWriter
newWriter bootstrap.KVWriter

settingsWatcher *settingswatcher.SettingsWatcher
}

func newKVWriter(codec keys.SQLCodec, db *kv.DB, id descpb.ID) *kvWriter {
func newKVWriter(
codec keys.SQLCodec, db *kv.DB, id descpb.ID, settingsWatcher *settingswatcher.SettingsWatcher,
) *kvWriter {
return &kvWriter{
db: db,
w: bootstrap.MakeKVWriter(codec, leaseTableWithID(id)),
db: db,
newWriter: bootstrap.MakeKVWriter(codec, leaseTableWithID(id)),
oldWriter: bootstrap.MakeKVWriter(codec, systemschema.V22_2_LeaseTable()),
settingsWatcher: settingsWatcher,
}
}

Expand All @@ -48,27 +58,65 @@ func leaseTableWithID(id descpb.ID) catalog.TableDescriptor {
return mut.ImmutableCopy().(catalog.TableDescriptor)
}

func (w *kvWriter) versionGuard(
ctx context.Context, txn *kv.Txn,
) (settingswatcher.VersionGuard, error) {
return w.settingsWatcher.MakeVersionGuard(ctx, txn, clusterversion.V23_1_SystemRbrCleanup)
}

func (w *kvWriter) insertLease(ctx context.Context, txn *kv.Txn, l leaseFields) error {
return w.do(ctx, txn, l, func(b *kv.Batch, datum ...tree.Datum) error {
return w.w.Insert(ctx, b, false /* kvTrace */, datum...)
return w.do(ctx, txn, l, func(guard settingswatcher.VersionGuard, b *kv.Batch) error {
if guard.IsActive(clusterversion.V23_1_SystemRbrDualWrite) {
err := w.newWriter.Insert(ctx, b, false /*kvTrace */, leaseAsRbrDatum(l)...)
if err != nil {
return err
}
}
if !guard.IsActive(clusterversion.V23_1_SystemRbrSingleWrite) {
err := w.oldWriter.Insert(ctx, b, false /*kvTrace */, leaseAsRbtDatum(l)...)
if err != nil {
return err
}
}
return nil
})
}

func (w *kvWriter) deleteLease(ctx context.Context, txn *kv.Txn, l leaseFields) error {
return w.do(ctx, txn, l, func(b *kv.Batch, datum ...tree.Datum) error {
return w.w.Delete(ctx, b, false /* kvTrace */, datum...)
return w.do(ctx, txn, l, func(guard settingswatcher.VersionGuard, b *kv.Batch) error {
if guard.IsActive(clusterversion.V23_1_SystemRbrDualWrite) {
err := w.newWriter.Delete(ctx, b, false /*kvTrace */, leaseAsRbrDatum(l)...)
if err != nil {
return err
}
}
if !guard.IsActive(clusterversion.V23_1_SystemRbrSingleWrite) {
err := w.oldWriter.Delete(ctx, b, false /*kvTrace */, leaseAsRbtDatum(l)...)
if err != nil {
return err
}
}
return nil
})
}

type addToBatchFunc = func(*kv.Batch, ...tree.Datum) error
type addToBatchFunc = func(settingswatcher.VersionGuard, *kv.Batch) error

func (w *kvWriter) do(ctx context.Context, txn *kv.Txn, l leaseFields, f addToBatchFunc) error {
func (w *kvWriter) do(
ctx context.Context, txn *kv.Txn, lease leaseFields, addToBatch addToBatchFunc,
) error {
run := (*kv.Txn).Run
do := func(ctx context.Context, txn *kv.Txn) error {
b, err := newBatch(txn, l, f)
guard, err := w.versionGuard(ctx, txn)
if err != nil {
return err
}

b := txn.NewBatch()
err = addToBatch(guard, b)
if err != nil {
return errors.NewAssertionErrorWithWrappedErrf(err, "failed to encode lease entry")
}
return run(txn, ctx, b)
}
if txn != nil {
Expand All @@ -78,28 +126,22 @@ func (w *kvWriter) do(ctx context.Context, txn *kv.Txn, l leaseFields, f addToBa
return w.db.Txn(ctx, do)
}

func newBatch(txn *kv.Txn, l leaseFields, f addToBatchFunc) (*kv.Batch, error) {

var entries []tree.Datum
if systemschema.TestSupportMultiRegion() {
entries = []tree.Datum{
tree.NewDInt(tree.DInt(l.descID)),
tree.NewDInt(tree.DInt(l.version)),
tree.NewDInt(tree.DInt(l.instanceID)),
&l.expiration,
tree.NewDBytes(tree.DBytes(l.regionPrefix)),
}
} else {
entries = []tree.Datum{
tree.NewDInt(tree.DInt(l.descID)),
tree.NewDInt(tree.DInt(l.version)),
tree.NewDInt(tree.DInt(l.instanceID)),
&l.expiration,
}
func leaseAsRbrDatum(l leaseFields) []tree.Datum {
return []tree.Datum{
tree.NewDInt(tree.DInt(l.descID)),
tree.NewDInt(tree.DInt(l.version)),
tree.NewDInt(tree.DInt(l.instanceID)),
&l.expiration,
tree.NewDBytes(tree.DBytes(l.regionPrefix)),
}
b := txn.NewBatch()
if err := f(b, entries...); err != nil {
return nil, errors.NewAssertionErrorWithWrappedErrf(err, "failed to encode lease entry")

}

func leaseAsRbtDatum(l leaseFields) []tree.Datum {
return []tree.Datum{
tree.NewDInt(tree.DInt(l.descID)),
tree.NewDInt(tree.DInt(l.version)),
tree.NewDInt(tree.DInt(l.instanceID)),
&l.expiration,
}
return b, nil
}
23 changes: 9 additions & 14 deletions pkg/sql/catalog/lease/kv_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/settingswatcher"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
"github.com/cockroachdb/cockroach/pkg/sql/enum"
Expand Down Expand Up @@ -62,26 +63,22 @@ func TestKVWriterMatchesIEWriter(t *testing.T) {
tdb.Exec(t, "SET CLUSTER SETTING admission.elastic_cpu.enabled = false")

schema := systemschema.LeaseTableSchema
if systemschema.TestSupportMultiRegion() {
schema = systemschema.MRLeaseTableSchema
}
makeTable := func(name string) (id descpb.ID) {
tdb.Exec(t, strings.Replace(schema, "system.lease", name, 1))
tdb.QueryRow(t, "SELECT id FROM system.namespace WHERE name = $1", name).Scan(&id)
// The MR variant of the table uses a non-
if systemschema.TestSupportMultiRegion() {
MoveTablePrimaryIndexIDto2(ctx, t, s, id)
}
MoveTablePrimaryIndexIDto2(ctx, t, s, id)
return id
}
lease1ID := makeTable("lease1")
lease2ID := makeTable("lease2")

ie := s.InternalExecutor().(isql.Executor)
codec := s.LeaseManager().(*Manager).Codec()
settingsWatcher := s.SettingsWatcher().(*settingswatcher.SettingsWatcher)
w := teeWriter{
a: newInternalExecutorWriter(ie, "defaultdb.public.lease1"),
b: newKVWriter(codec, kvDB, lease2ID),
b: newKVWriter(codec, kvDB, lease2ID, settingsWatcher),
}
start := kvDB.Clock().Now()
groups := generateWriteOps(2<<10, 1<<10)
Expand Down Expand Up @@ -205,13 +202,11 @@ func generateWriteOps(n, numGroups int) func() (_ []writeOp, wantMore bool) {
panic(err)
}
lf := leaseFields{
descID: descpb.ID(rand.Intn(vals)),
version: descpb.DescriptorVersion(rand.Intn(vals)),
instanceID: base.SQLInstanceID(rand.Intn(vals)),
expiration: *ts,
}
if systemschema.TestSupportMultiRegion() {
lf.regionPrefix = enum.One
descID: descpb.ID(rand.Intn(vals)),
version: descpb.DescriptorVersion(rand.Intn(vals)),
instanceID: base.SQLInstanceID(rand.Intn(vals)),
expiration: *ts,
regionPrefix: enum.One,
}
return lf
}
Expand Down
Loading

0 comments on commit ee55464

Please sign in to comment.