Skip to content

Commit

Permalink
Merge #120780
Browse files Browse the repository at this point in the history
120780: sql/catalog: address session based leasing bugs r=fqazi a=fqazi

This patch addresses the following issues:
1. Previously the session based leasing upgrade had a race condition against the leasing count queries, which could cause them to incorrectly expect the wrong version of the descriptor. To address this, we are going to start using the system database descriptor version to determine if synthetic descriptors should be used (instead of cluster settings).
2. The repair_schema logictest had an intermittent failure because it was force deleting and injecting descriptors instantly. This runs into an existing design limitation of the lease manager where deleted descriptors are instantly cleaned up from memory, but asynchronously removed from storage. When we inject the descriptor with the exact same versions, the lease manager would detect a duplicate row and fail to renew (in the session based model). This condition can only be hit by repair queries and in the expiry based model would lead to duplicate rows. For the purpose of testing we are going to intentionally give a new version when adding the descriptor back, since the sequence is in general dangerous with repair queries (i.e. we need to confirm that the lease is no longer in use before injecting the descriptor again, or set a new version).


Fixes: #120445 
Fixes #120735 
Fixes #120675


Co-authored-by: Faizan Qazi <[email protected]>
  • Loading branch information
craig[bot] and fqazi committed Mar 21, 2024
2 parents d8c1276 + 48ebfd1 commit 19d874a
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 19 deletions.
28 changes: 19 additions & 9 deletions pkg/sql/catalog/lease/count.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import (
"fmt"
"strings"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
clustersettings "github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema"
Expand Down Expand Up @@ -43,8 +45,17 @@ func CountLeases(
forAnyVersion bool,
) (int, error) {
// Indicates if the leasing descriptor has been upgraded for session based
// leasing.
leasingDescIsSessionBased := hasSessionBasedLeasingDesc(ctx, settings)
// leasing. Note: Unit tests will never provide cached database regions
// so resolve the version from the cluster settings.
var systemDBVersion *roachpb.Version
if cachedDatabaseRegions != nil {
systemDBVersion = cachedDatabaseRegions.GetSystemDatabaseVersion()
} else {
v := settings.Version.ActiveVersion(ctx).Version
systemDBVersion = &v
}
leasingDescIsSessionBased := systemDBVersion != nil &&
systemDBVersion.AtLeast(clusterversion.V24_1_SessionBasedLeasingUpgradeDescriptor.Version())
leasingMode := readSessionBasedLeasingMode(ctx, settings)
whereClauses := make([][]string, 2)
for _, t := range versions {
Expand All @@ -61,13 +72,12 @@ func CountLeases(
t.ID, versionClause),
)
}

whereClauseIdx := make([]int, 0, 2)
syntheticDescriptors := make(catalog.Descriptors, 0, 2)
if leasingMode != SessionBasedOnly {
// The leasing descriptor is not session based yet, so we need to inject
// in synthetically.
if !leasingDescIsSessionBased {
// The leasing descriptor is session based, so we need to inject
// expiry based descriptor synthetically.
if leasingDescIsSessionBased {
syntheticDescriptors = append(syntheticDescriptors, systemschema.LeaseTable_V23_2())
} else {
syntheticDescriptors = append(syntheticDescriptors, nil)
Expand All @@ -76,9 +86,9 @@ func CountLeases(

}
if leasingMode >= SessionBasedDrain {
// The leasing descriptor has been upgraded to be session based, so
// we need to use a synthetic descriptor for the old expiry based format.
if leasingDescIsSessionBased {
// The leasing descriptor is not yet session based, so inject the session
// based descriptor synthetically.
if !leasingDescIsSessionBased {
syntheticDescriptors = append(syntheticDescriptors, systemschema.LeaseTable())
} else {
syntheticDescriptors = append(syntheticDescriptors, nil)
Expand Down
14 changes: 10 additions & 4 deletions pkg/sql/catalog/lease/kv_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func leaseTableWithID(id descpb.ID, table systemschema.SystemTable) catalog.Tabl
}

func (w *kvWriter) insertLease(ctx context.Context, txn *kv.Txn, l leaseFields) error {
return w.do(ctx, txn, l, func(b *kv.Batch) error {
if err := w.do(ctx, txn, l, func(b *kv.Batch) error {
// We support writing both session based and expiry based leases within
// the KV writer. To be able to support a migration between the two types
// of writer will in some cases need to be able to write both types of leases.
Expand All @@ -91,11 +91,14 @@ func (w *kvWriter) insertLease(ctx context.Context, txn *kv.Txn, l leaseFields)
}
}
return nil
})
}); err != nil {
return errors.Wrapf(err, "failed to insert lease %v", l)
}
return nil
}

func (w *kvWriter) deleteLease(ctx context.Context, txn *kv.Txn, l leaseFields) error {
return w.do(ctx, txn, l, func(b *kv.Batch) error {
if err := w.do(ctx, txn, l, func(b *kv.Batch) error {
// We support deleting both session based and expiry based leases within
// the KV writer. To be able to support a migration between the two types
// of writer will in some cases need to be able to delete both types of leases.
Expand All @@ -118,7 +121,10 @@ func (w *kvWriter) deleteLease(ctx context.Context, txn *kv.Txn, l leaseFields)
}
}
return nil
})
}); err != nil {
return errors.Wrapf(err, "failed to delete lease: %v", l)
}
return nil
}

type addToBatchFunc = func(*kv.Batch) error
Expand Down
4 changes: 0 additions & 4 deletions pkg/sql/catalog/lease/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,6 @@ func (m *Manager) sessionBasedLeasingModeAtLeast(
return m.getSessionBasedLeasingMode(ctx) >= minimumMode
}

func hasSessionBasedLeasingDesc(ctx context.Context, settings *cluster.Settings) bool {
return !settings.Version.IsActive(ctx, clusterversion.V24_1_SessionBasedLeasingUpgradeDescriptor)
}

func readSessionBasedLeasingMode(
ctx context.Context, settings *cluster.Settings,
) SessionBasedLeasingMode {
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/regionliveness/prober.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ type RegionProvider interface {
type CachedDatabaseRegions interface {
IsMultiRegion() bool
GetRegionEnumTypeDesc() catalog.RegionEnumTypeDescriptor
// GetSystemDatabaseVersion returns the system database version as a
// *roachpb.Version.
// NOTE(review): the result is a pointer and presumably may be nil when no
// version is recorded; callers should nil-check it — TODO confirm.
GetSystemDatabaseVersion() *roachpb.Version
}

type livenessProber struct {
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/regions/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ go_library(
deps = [
"//pkg/keys",
"//pkg/kv",
"//pkg/roachpb",
"//pkg/server/serverpb",
"//pkg/sql/catalog",
"//pkg/sql/catalog/descs",
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/regions/cached_db_regions.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
)
Expand Down Expand Up @@ -67,6 +68,11 @@ func (c *CachedDatabaseRegions) GetRegionEnumTypeDesc() catalog.RegionEnumTypeDe
return c.dbRegionEnumDesc.AsRegionEnumTypeDescriptor()
}

// GetSystemDatabaseVersion returns the system database schema version read
// from the database descriptor held by c, i.e. the result of
// DatabaseDesc().GetSystemDatabaseSchemaVersion().
// NOTE(review): the returned pointer is presumably nil when the descriptor
// carries no schema version; callers should nil-check it — TODO confirm.
func (c *CachedDatabaseRegions) GetSystemDatabaseVersion() *roachpb.Version {
return c.dbDesc.DatabaseDesc().GetSystemDatabaseSchemaVersion()
}

func TestingModifyRegionEnum(
c *CachedDatabaseRegions, mutateFn func(catalog.TypeDescriptor) catalog.TypeDescriptor,
) {
Expand Down
18 changes: 18 additions & 0 deletions pkg/sql/repair.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,14 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotice"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/regions"
"github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilege"
"github.com/cockroachdb/cockroach/pkg/util/duration"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/ioctx"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -174,6 +176,22 @@ func (p *planner) UnsafeUpsertDescriptor(
p.Descriptors().SkipValidationOnWrite()
}

// If we are pushing out a brand new descriptor confirm that no leases
// exist before we publish it. This could happen if we did an unsafe delete,
// since we will not wait for all leases to expire. So, as a safety force the
// unsafe upserts to wait for no leases to exist on this descriptor.
if !force &&
mut.GetVersion() == 1 {
execCfg := p.execCfg
regionCache, err := regions.NewCachedDatabaseRegions(ctx, execCfg.DB, execCfg.LeaseManager)
if err != nil {
return err
}
if err := execCfg.LeaseManager.WaitForNoVersion(ctx, mut.GetID(), regionCache, retry.Options{}); err != nil {
return err
}
}

{
b := p.txn.NewBatch()
if err := p.Descriptors().WriteDescToBatch(
Expand Down
7 changes: 5 additions & 2 deletions pkg/upgrade/upgrades/v24_1_session_based_lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func upgradeSystemLeasesDescriptor(
ctx context.Context, version clusterversion.ClusterVersion, deps upgrade.TenantDeps,
) error {
// Upgrade the descriptor in storage to have the new format.
return deps.DB.DescsTxn(ctx, func(ctx context.Context, txn descs.Txn) error {
if err := deps.DB.DescsTxn(ctx, func(ctx context.Context, txn descs.Txn) error {
leaseTable, err := txn.Descriptors().MutableByID(txn.KV()).Table(ctx, keys.LeaseTableID)
if err != nil {
return err
Expand All @@ -146,5 +146,8 @@ func upgradeSystemLeasesDescriptor(
leaseTable.NextColumnID = newLeaseTableFormat.TableDescriptor.GetNextColumnID()
leaseTable.NextIndexID = newLeaseTableFormat.TableDescriptor.GetNextIndexID()
return txn.Descriptors().WriteDesc(ctx, false, leaseTable, txn.KV())
})
}); err != nil {
return err
}
return bumpSystemDatabaseSchemaVersion(ctx, version, deps)
}

0 comments on commit 19d874a

Please sign in to comment.