Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-22.2: descs: fix txn commit waiting on wrong lease version #87755

Merged
merged 1 commit into from
Sep 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 33 additions & 23 deletions pkg/sql/catalog/descs/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,20 +237,6 @@ func (tc *Collection) AddUncommittedDescriptor(
desc.DescriptorType(), desc.GetName(), desc.GetID(), desc.GetVersion())
}
}()
original := tc.stored.GetCachedByID(desc.GetID())
if original == nil {
if !desc.IsNew() {
return errors.New("non-new descriptor does not exist in storage yet")
}
if desc.GetVersion() != 1 {
return errors.New("version should be 1, descriptor does not exist in storage yet")
}
} else {
if desc.GetVersion() != original.GetVersion() && desc.GetVersion() != original.GetVersion()+1 {
return errors.Newf("incompatible version, descriptor version %d exists in storage",
original.GetVersion())
}
}
if tc.synthetic.getSyntheticByID(desc.GetID()) != nil {
return errors.AssertionFailedf(
"cannot add uncommitted %s %q (%d) when a synthetic descriptor with the same ID exists",
Expand All @@ -262,7 +248,7 @@ func (tc *Collection) AddUncommittedDescriptor(
desc.DescriptorType(), desc.GetName(), desc.GetID())
}
tc.stored.RemoveFromNameIndex(desc)
return tc.uncommitted.upsert(ctx, original, desc)
return tc.uncommitted.upsert(ctx, desc)
}

// ValidateOnWriteEnabled is the cluster setting used to enable or disable
Expand Down Expand Up @@ -308,16 +294,40 @@ func (tc *Collection) WriteDesc(
return txn.Run(ctx, b)
}

// GetDescriptorsWithNewVersion returns all the IDVersion pairs that have
// undergone a schema change. Returns nil for no schema changes. The version
// returned for each schema change is clusterVersion - 1, because that's the one
// that will be used when checking for table descriptor two version invariance.
func (tc *Collection) GetDescriptorsWithNewVersion() (originalVersions []lease.IDVersion) {
_ = tc.uncommitted.iterateNewVersionByID(func(originalVersion lease.IDVersion) error {
originalVersions = append(originalVersions, originalVersion)
// GetOriginalPreviousIDVersionsForUncommitted returns all the IDVersion
// pairs for descriptors that have undergone a schema change.
// Returns an empty slice for no schema changes.
//
// The version returned for each schema change is clusterVersion - 1, because
// that's the one that will be used when checking for table descriptor
// two-version invariance.
func (tc *Collection) GetOriginalPreviousIDVersionsForUncommitted() (
withNewVersions []lease.IDVersion,
err error,
) {
err = tc.uncommitted.iterateUncommittedByID(func(uncommitted catalog.Descriptor) error {
original := tc.uncommitted.getOriginalByID(uncommitted.GetID())
// Ignore new descriptors.
if original == nil {
return nil
}
// Sanity checks. If AddUncommittedDescriptor is implemented and used
// correctly then these should never fail.
if original.GetVersion() == 0 {
return errors.AssertionFailedf(
"expected original version of uncommitted %s %q (%d) to be non-zero",
uncommitted.DescriptorType(), uncommitted.GetName(), uncommitted.GetID())
}
if expected, actual := uncommitted.GetVersion()-1, original.GetVersion(); expected != actual {
return errors.AssertionFailedf(
"expected original version of uncommitted %s %q (%d) to be %d, instead is %d",
uncommitted.DescriptorType(), uncommitted.GetName(), uncommitted.GetID(), expected, actual)
}
prev := lease.NewIDVersionPrev(original.GetName(), original.GetID(), original.GetVersion())
withNewVersions = append(withNewVersions, prev)
return nil
})
return originalVersions
return withNewVersions, err
}

// GetUncommittedTables returns all the tables updated or created in the
Expand Down
21 changes: 10 additions & 11 deletions pkg/sql/catalog/descs/descriptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,16 @@ func (tc *Collection) finalizeDescriptors(
"len(validationLevels) = %d should be equal to len(descs) = %d",
len(validationLevels), len(descs))
}
// Add the descriptors to the uncommitted layer if we want them to be mutable.
if flags.RequireMutable {
for i, desc := range descs {
mut, err := tc.uncommitted.ensureMutable(ctx, desc)
if err != nil {
return err
}
descs[i] = mut
}
}
// Ensure that all descriptors are sufficiently validated.
requiredLevel := validate.MutableRead
if !flags.RequireMutable && !flags.AvoidLeased {
Expand All @@ -385,17 +395,6 @@ func (tc *Collection) finalizeDescriptors(
tc.stored.UpdateValidationLevel(desc, requiredLevel)
}
}
// Add the descriptors to the uncommitted layer if we want them to be mutable.
if !flags.RequireMutable {
return nil
}
for i, desc := range descs {
mut, err := tc.uncommitted.ensureMutable(ctx, desc)
if err != nil {
return err
}
descs[i] = mut
}
return nil
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/catalog/descs/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ func (tc *Collection) GetLeasedImmutableTableByID(
func (tc *Collection) GetUncommittedMutableTableByID(
id descpb.ID,
) (catalog.TableDescriptor, *tabledesc.Mutable, error) {
original, mut := tc.uncommitted.getUncommittedMutableByID(id)
original := tc.uncommitted.getOriginalByID(id)
mut := tc.uncommitted.getUncommittedMutableByID(id)
if mut == nil {
return nil, nil, nil
}
Expand Down
19 changes: 11 additions & 8 deletions pkg/sql/catalog/descs/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,10 @@ func (cf *CollectionFactory) TxnWithExecutor(
return nil
}
for {
var modifiedDescriptors []lease.IDVersion
var withNewVersion []lease.IDVersion
var deletedDescs catalog.DescriptorIDSet
if err := run(ctx, func(ctx context.Context, txn *kv.Txn) error {
modifiedDescriptors, deletedDescs = nil, catalog.DescriptorIDSet{}
if err := run(ctx, func(ctx context.Context, txn *kv.Txn) (err error) {
withNewVersion, deletedDescs = nil, catalog.DescriptorIDSet{}
descsCol := cf.NewCollection(
ctx, nil, /* temporarySchemaProvider */
cf.ieFactoryWithTxn.MemoryMonitor(),
Expand All @@ -164,13 +164,16 @@ func (cf *CollectionFactory) TxnWithExecutor(
return err
}
deletedDescs = descsCol.deletedDescs
modifiedDescriptors = descsCol.GetDescriptorsWithNewVersion()
withNewVersion, err = descsCol.GetOriginalPreviousIDVersionsForUncommitted()
if err != nil {
return err
}
return commitTxnFn(ctx)
}); IsTwoVersionInvariantViolationError(err) {
continue
} else {
if err == nil {
err = waitForDescriptors(modifiedDescriptors, deletedDescs)
err = waitForDescriptors(withNewVersion, deletedDescs)
}
return err
}
Expand Down Expand Up @@ -210,9 +213,9 @@ func CheckTwoVersionInvariant(
txn *kv.Txn,
onRetryBackoff func(),
) error {
withNewVersion := descsCol.GetDescriptorsWithNewVersion()
if withNewVersion == nil {
return nil
withNewVersion, err := descsCol.GetOriginalPreviousIDVersionsForUncommitted()
if err != nil || withNewVersion == nil {
return err
}
if txn.IsCommitted() {
panic("transaction has already committed")
Expand Down
88 changes: 48 additions & 40 deletions pkg/sql/catalog/descs/uncommitted_descriptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/nstree"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc"
"github.com/cockroachdb/cockroach/pkg/util/mon"
Expand Down Expand Up @@ -74,6 +73,15 @@ func (ud *uncommittedDescriptors) reset(ctx context.Context) {
}
}

// getOriginalByID returns the original version of the uncommitted descriptor
// with this ID, if it exists.
func (ud *uncommittedDescriptors) getOriginalByID(id descpb.ID) catalog.Descriptor {
if e := ud.original.Get(id); e != nil {
return e.(catalog.Descriptor)
}
return nil
}

// getUncommittedByID returns the uncommitted descriptor for this ID, if it
// exists.
func (ud *uncommittedDescriptors) getUncommittedByID(id descpb.ID) catalog.Descriptor {
Expand All @@ -83,21 +91,16 @@ func (ud *uncommittedDescriptors) getUncommittedByID(id descpb.ID) catalog.Descr
return nil
}

// getUncommittedMutableByID returns the original descriptor as well as the
// uncommitted mutable descriptor for this ID if they exist.
// getUncommittedMutableByID returns the uncommitted descriptor for this ID, if
// it exists, in mutable form. This mutable descriptor is owned by the
// collection.
func (ud *uncommittedDescriptors) getUncommittedMutableByID(
id descpb.ID,
) (original catalog.Descriptor, mutable catalog.MutableDescriptor) {
if ud.uncommitted.GetByID(id) == nil {
return nil, nil
}
if me := ud.mutable.Get(id); me != nil {
mutable = me.(catalog.MutableDescriptor)
}
if oe := ud.original.Get(id); oe != nil {
original = oe.(catalog.Descriptor)
) catalog.MutableDescriptor {
if me := ud.mutable.Get(id); me != nil && ud.uncommitted.GetByID(id) != nil {
return me.(catalog.MutableDescriptor)
}
return original, mutable
return nil
}

// getUncommittedByName returns the uncommitted descriptor for this name key, if
Expand All @@ -111,20 +114,6 @@ func (ud *uncommittedDescriptors) getUncommittedByName(
return nil
}

// iterateNewVersionByID applies fn to each lease.IDVersion from the original
// descriptor, if it exists, for each uncommitted descriptor, in ascending
// sequence of IDs.
func (ud *uncommittedDescriptors) iterateNewVersionByID(
fn func(originalVersion lease.IDVersion) error,
) error {
return ud.uncommitted.IterateByID(func(entry catalog.NameEntry) error {
if o := ud.original.Get(entry.GetID()); o != nil {
return fn(lease.NewIDVersionPrev(o.GetName(), o.GetID(), o.(catalog.Descriptor).GetVersion()))
}
return nil
})
}

// iterateUncommittedByID applies fn to the uncommitted descriptors in ascending
// sequence of IDs.
func (ud *uncommittedDescriptors) iterateUncommittedByID(
Expand All @@ -146,23 +135,44 @@ func (ud *uncommittedDescriptors) ensureMutable(
return e.(catalog.MutableDescriptor), nil
}
mut := original.NewBuilder().BuildExistingMutable()
newBytes := mut.ByteSize()
if original.GetID() == keys.SystemDatabaseID {
return mut, nil
}
if err := ud.memAcc.Grow(ctx, mut.ByteSize()); err != nil {
return nil, errors.Wrap(err, "Memory usage exceeds limit for uncommittedDescriptors")
}
ud.original.Upsert(original)
ud.mutable.Upsert(mut)
original = original.NewBuilder().BuildImmutable()
newBytes += original.ByteSize()
ud.original.Upsert(original)
if err := ud.memAcc.Grow(ctx, newBytes); err != nil {
return nil, errors.Wrap(err, "memory usage exceeds limit for uncommittedDescriptors")
}
return mut, nil
}

// upsert adds an uncommitted descriptor to this layer.
// This is called exclusively by the Collection's AddUncommittedDescriptor
// method.
func (ud *uncommittedDescriptors) upsert(
ctx context.Context, original catalog.Descriptor, mut catalog.MutableDescriptor,
ctx context.Context, mut catalog.MutableDescriptor,
) (err error) {
original := ud.getOriginalByID(mut.GetID())
// Perform some sanity checks to ensure the version counters are correct.
if original == nil {
if !mut.IsNew() {
return errors.New("non-new descriptor does not exist in storage yet")
}
if mut.GetVersion() != 1 {
return errors.New("new descriptor version should be 1")
}
} else {
if mut.IsNew() {
return errors.New("new descriptor already exists in storage")
}
if mut.GetVersion() != original.GetVersion()+1 {
return errors.Newf("expected uncommitted version %d, instead got %d",
original.GetVersion()+1, mut.GetVersion())
}
}
// Refresh the cached fields on mutable type descriptors.
if typ, ok := mut.(*typedesc.Mutable); ok {
mut, err = typedesc.UpdateCachedFieldsOnModifiedMutable(typ)
Expand All @@ -171,18 +181,16 @@ func (ud *uncommittedDescriptors) upsert(
}
}
// Add the descriptors to the uncommitted descriptors layer.
imm := mut.ImmutableCopy()
newBytes := imm.ByteSize()
var newBytes int64
if mut.IsNew() {
newBytes += mut.ByteSize()
}
if err = ud.memAcc.Grow(ctx, newBytes); err != nil {
return errors.Wrap(err, "Memory usage exceeds limit for uncommittedDescriptors")
}
ud.mutable.Upsert(mut)
ud.uncommitted.Upsert(imm, imm.SkipNamespace())
if original != nil {
ud.original.Upsert(original)
uncommitted := mut.ImmutableCopy()
newBytes += uncommitted.ByteSize()
ud.uncommitted.Upsert(uncommitted, uncommitted.SkipNamespace())
if err = ud.memAcc.Grow(ctx, newBytes); err != nil {
return errors.Wrap(err, "memory usage exceeds limit for uncommittedDescriptors")
}
return nil
}
16 changes: 8 additions & 8 deletions pkg/sql/check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@ func TestValidateTTLScheduledJobs(t *testing.T) {
{
desc: "not pointing at a valid scheduled job",
setup: func(t *testing.T, sqlDB *gosql.DB, kvDB *kv.DB, s serverutils.TestServerInterface, tableDesc *tabledesc.Mutable, scheduleID int64) {
tableDesc.RowLevelTTL.ScheduleID = 0
require.NoError(t, sql.TestingDescsTxn(ctx, s, func(ctx context.Context, txn *kv.Txn, col *descs.Collection) error {
{
// We need the collection to read the descriptor from storage for
// the subsequent write to succeed.
flags := tree.CommonLookupFlags{Required: true, AvoidLeased: true}
_, err := col.GetImmutableDescriptorByID(ctx, txn, tableDesc.GetID(), flags)
require.NoError(t, err)
require.NoError(t, sql.TestingDescsTxn(ctx, s, func(ctx context.Context, txn *kv.Txn, col *descs.Collection) (err error) {
// We need the collection to read the descriptor from storage for
// the subsequent write to succeed.
tableDesc, err = col.GetMutableTableByID(ctx, txn, tableDesc.GetID(), tree.ObjectLookupFlagsWithRequired())
tableDesc.RowLevelTTL.ScheduleID = 0
tableDesc.Version++
if err != nil {
return err
}
return col.WriteDesc(ctx, false /* kvBatch */, tableDesc, txn)
}))
Expand Down
6 changes: 4 additions & 2 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -979,9 +979,11 @@ func (ex *connExecutor) commitSQLTransactionInternal(ctx context.Context) error
// Now that we've committed, if we modified any descriptor we need to make sure
// to release the leases for them so that the schema change can proceed and
// we don't block the client.
if descs := ex.extraTxnState.descCollection.GetDescriptorsWithNewVersion(); descs != nil {
ex.extraTxnState.descCollection.ReleaseLeases(ctx)
withNewVersion, err := ex.extraTxnState.descCollection.GetOriginalPreviousIDVersionsForUncommitted()
if err != nil || withNewVersion == nil {
return err
}
ex.extraTxnState.descCollection.ReleaseLeases(ctx)
return nil
}

Expand Down
Loading