From dab432a7258c2b8ad2c713b269c32da2366aa012 Mon Sep 17 00:00:00 2001 From: Marius Posta Date: Fri, 9 Sep 2022 14:04:48 +0000 Subject: [PATCH] descs: fix txn commit waiting on wrong lease version Recent work on the descs.Collection (PR #87067) introduced a regression in which it would return bad lease.IDVersion versions for the uncommitted descriptors. In addition to fixing this, this commit corrects the lifecycle of the `original` descriptors in the uncommitted layer, which now remain unaffected by changes in the storage layer. This commit tightens the constraints on what can be passed to AddUncommittedDescriptor. Fixes #87672. Release justification: important bug fix Release note: None --- pkg/sql/catalog/descs/collection.go | 56 +++++++----- pkg/sql/catalog/descs/descriptor.go | 21 +++-- pkg/sql/catalog/descs/table.go | 3 +- pkg/sql/catalog/descs/txn.go | 19 ++-- .../catalog/descs/uncommitted_descriptors.go | 88 ++++++++++--------- pkg/sql/check_test.go | 16 ++-- pkg/sql/conn_executor_exec.go | 6 +- pkg/sql/create_view.go | 35 ++++++-- .../testdata/logic_test/schema_change_in_txn | 29 ++++++ pkg/sql/schema_changer_test.go | 12 +-- 10 files changed, 175 insertions(+), 110 deletions(-) diff --git a/pkg/sql/catalog/descs/collection.go b/pkg/sql/catalog/descs/collection.go index 04799213aefd..99756499fd34 100644 --- a/pkg/sql/catalog/descs/collection.go +++ b/pkg/sql/catalog/descs/collection.go @@ -237,20 +237,6 @@ func (tc *Collection) AddUncommittedDescriptor( desc.DescriptorType(), desc.GetName(), desc.GetID(), desc.GetVersion()) } }() - original := tc.stored.GetCachedByID(desc.GetID()) - if original == nil { - if !desc.IsNew() { - return errors.New("non-new descriptor does not exist in storage yet") - } - if desc.GetVersion() != 1 { - return errors.New("version should be 1, descriptor does not exist in storage yet") - } - } else { - if desc.GetVersion() != original.GetVersion() && desc.GetVersion() != original.GetVersion()+1 { - return errors.Newf("incompatible version, descriptor version %d exists in storage", - original.GetVersion()) - } - } if tc.synthetic.getSyntheticByID(desc.GetID()) != nil { return errors.AssertionFailedf( "cannot add uncommitted %s %q (%d) when a synthetic descriptor with the same ID exists", @@ -262,7 +248,7 @@ func (tc *Collection) AddUncommittedDescriptor( desc.DescriptorType(), desc.GetName(), desc.GetID()) } tc.stored.RemoveFromNameIndex(desc) - return tc.uncommitted.upsert(ctx, original, desc) + return tc.uncommitted.upsert(ctx, desc) } // ValidateOnWriteEnabled is the cluster setting used to enable or disable @@ -308,16 +294,40 @@ func (tc *Collection) WriteDesc( return txn.Run(ctx, b) } -// GetDescriptorsWithNewVersion returns all the IDVersion pairs that have -// undergone a schema change. Returns nil for no schema changes. The version -// returned for each schema change is clusterVersion - 1, because that's the one -// that will be used when checking for table descriptor two version invariance. -func (tc *Collection) GetDescriptorsWithNewVersion() (originalVersions []lease.IDVersion) { - _ = tc.uncommitted.iterateNewVersionByID(func(originalVersion lease.IDVersion) error { - originalVersions = append(originalVersions, originalVersion) +// GetOriginalPreviousIDVersionsForUncommitted returns all the IDVersion +// pairs for descriptors that have undergone a schema change. +// Returns an empty slice for no schema changes. +// +// The version returned for each schema change is clusterVersion - 1, because +// that's the one that will be used when checking for table descriptor +// two-version invariance. +func (tc *Collection) GetOriginalPreviousIDVersionsForUncommitted() ( + withNewVersions []lease.IDVersion, + err error, +) { + err = tc.uncommitted.iterateUncommittedByID(func(uncommitted catalog.Descriptor) error { + original := tc.uncommitted.getOriginalByID(uncommitted.GetID()) + // Ignore new descriptors. + if original == nil { + return nil + } + // Sanity checks. If AddUncommittedDescriptor is implemented and used + // correctly then these should never fail. + if original.GetVersion() == 0 { + return errors.AssertionFailedf( + "expected original version of uncommitted %s %q (%d) to be non-zero", + uncommitted.DescriptorType(), uncommitted.GetName(), uncommitted.GetID()) + } + if expected, actual := uncommitted.GetVersion()-1, original.GetVersion(); expected != actual { + return errors.AssertionFailedf( + "expected original version of uncommitted %s %q (%d) to be %d, instead is %d", + uncommitted.DescriptorType(), uncommitted.GetName(), uncommitted.GetID(), expected, actual) + } + prev := lease.NewIDVersionPrev(original.GetName(), original.GetID(), original.GetVersion()) + withNewVersions = append(withNewVersions, prev) return nil }) - return originalVersions + return withNewVersions, err } // GetUncommittedTables returns all the tables updated or created in the diff --git a/pkg/sql/catalog/descs/descriptor.go b/pkg/sql/catalog/descs/descriptor.go index 05576b524134..ada1681117f9 100644 --- a/pkg/sql/catalog/descs/descriptor.go +++ b/pkg/sql/catalog/descs/descriptor.go @@ -366,6 +366,16 @@ func (tc *Collection) finalizeDescriptors( "len(validationLevels) = %d should be equal to len(descs) = %d", len(validationLevels), len(descs)) } + // Add the descriptors to the uncommitted layer if we want them to be mutable. + if flags.RequireMutable { + for i, desc := range descs { + mut, err := tc.uncommitted.ensureMutable(ctx, desc) + if err != nil { + return err + } + descs[i] = mut + } + } // Ensure that all descriptors are sufficiently validated. requiredLevel := validate.MutableRead if !flags.RequireMutable && !flags.AvoidLeased { @@ -385,17 +395,6 @@ func (tc *Collection) finalizeDescriptors( tc.stored.UpdateValidationLevel(desc, requiredLevel) } } - // Add the descriptors to the uncommitted layer if we want them to be mutable. - if !flags.RequireMutable { - return nil - } - for i, desc := range descs { - mut, err := tc.uncommitted.ensureMutable(ctx, desc) - if err != nil { - return err - } - descs[i] = mut - } return nil } diff --git a/pkg/sql/catalog/descs/table.go b/pkg/sql/catalog/descs/table.go index ab434e47388f..d4f31d9a76b6 100644 --- a/pkg/sql/catalog/descs/table.go +++ b/pkg/sql/catalog/descs/table.go @@ -83,7 +83,8 @@ func (tc *Collection) GetLeasedImmutableTableByID( func (tc *Collection) GetUncommittedMutableTableByID( id descpb.ID, ) (catalog.TableDescriptor, *tabledesc.Mutable, error) { - original, mut := tc.uncommitted.getUncommittedMutableByID(id) + original := tc.uncommitted.getOriginalByID(id) + mut := tc.uncommitted.getUncommittedMutableByID(id) if mut == nil { return nil, nil, nil } diff --git a/pkg/sql/catalog/descs/txn.go b/pkg/sql/catalog/descs/txn.go index 30dbef9427fc..0b3bba3168ef 100644 --- a/pkg/sql/catalog/descs/txn.go +++ b/pkg/sql/catalog/descs/txn.go @@ -150,10 +150,10 @@ func (cf *CollectionFactory) TxnWithExecutor( return nil } for { - var modifiedDescriptors []lease.IDVersion + var withNewVersion []lease.IDVersion var deletedDescs catalog.DescriptorIDSet - if err := run(ctx, func(ctx context.Context, txn *kv.Txn) error { - modifiedDescriptors, deletedDescs = nil, catalog.DescriptorIDSet{} + if err := run(ctx, func(ctx context.Context, txn *kv.Txn) (err error) { + withNewVersion, deletedDescs = nil, catalog.DescriptorIDSet{} descsCol := cf.NewCollection( ctx, nil, /* temporarySchemaProvider */ cf.ieFactoryWithTxn.MemoryMonitor(), @@ -164,13 +164,16 @@ func (cf *CollectionFactory) TxnWithExecutor( return err } deletedDescs = descsCol.deletedDescs - modifiedDescriptors = descsCol.GetDescriptorsWithNewVersion() + withNewVersion, err = descsCol.GetOriginalPreviousIDVersionsForUncommitted() + if err != nil { + return err + } return commitTxnFn(ctx) }); IsTwoVersionInvariantViolationError(err) { continue } else { if err == nil { - err = waitForDescriptors(modifiedDescriptors, deletedDescs) + err = waitForDescriptors(withNewVersion, deletedDescs) } return err } @@ -210,9 +213,9 @@ func CheckTwoVersionInvariant( txn *kv.Txn, onRetryBackoff func(), ) error { - withNewVersion := descsCol.GetDescriptorsWithNewVersion() - if withNewVersion == nil { - return nil + withNewVersion, err := descsCol.GetOriginalPreviousIDVersionsForUncommitted() + if err != nil || withNewVersion == nil { + return err } if txn.IsCommitted() { panic("transaction has already committed") diff --git a/pkg/sql/catalog/descs/uncommitted_descriptors.go b/pkg/sql/catalog/descs/uncommitted_descriptors.go index 05d9768b09e6..b398e1167c65 100644 --- a/pkg/sql/catalog/descs/uncommitted_descriptors.go +++ b/pkg/sql/catalog/descs/uncommitted_descriptors.go @@ -16,7 +16,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" "github.com/cockroachdb/cockroach/pkg/sql/catalog/nstree" "github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc" "github.com/cockroachdb/cockroach/pkg/util/mon" @@ -74,6 +73,15 @@ func (ud *uncommittedDescriptors) reset(ctx context.Context) { } } +// getOriginalByID returns the original version of the uncommitted descriptor +// with this ID, if it exists. +func (ud *uncommittedDescriptors) getOriginalByID(id descpb.ID) catalog.Descriptor { + if e := ud.original.Get(id); e != nil { + return e.(catalog.Descriptor) + } + return nil +} + // getUncommittedByID returns the uncommitted descriptor for this ID, if it // exists. func (ud *uncommittedDescriptors) getUncommittedByID(id descpb.ID) catalog.Descriptor { @@ -83,21 +91,16 @@ func (ud *uncommittedDescriptors) getUncommittedByID(id descpb.ID) catalog.Descr return nil } -// getUncommittedMutableByID returns the original descriptor as well as the -// uncommitted mutable descriptor for this ID if they exist. +// getUncommittedMutableByID returns the uncommitted descriptor for this ID, if +// it exists, in mutable form. This mutable descriptor is owned by the +// collection. func (ud *uncommittedDescriptors) getUncommittedMutableByID( id descpb.ID, -) (original catalog.Descriptor, mutable catalog.MutableDescriptor) { - if ud.uncommitted.GetByID(id) == nil { - return nil, nil - } - if me := ud.mutable.Get(id); me != nil { - mutable = me.(catalog.MutableDescriptor) - } - if oe := ud.original.Get(id); oe != nil { - original = oe.(catalog.Descriptor) +) catalog.MutableDescriptor { + if me := ud.mutable.Get(id); me != nil && ud.uncommitted.GetByID(id) != nil { + return me.(catalog.MutableDescriptor) } - return original, mutable + return nil } // getUncommittedByName returns the uncommitted descriptor for this name key, if @@ -111,20 +114,6 @@ func (ud *uncommittedDescriptors) getUncommittedByName( return nil } -// iterateNewVersionByID applies fn to each lease.IDVersion from the original -// descriptor, if it exists, for each uncommitted descriptor, in ascending -// sequence of IDs. -func (ud *uncommittedDescriptors) iterateNewVersionByID( - fn func(originalVersion lease.IDVersion) error, -) error { - return ud.uncommitted.IterateByID(func(entry catalog.NameEntry) error { - if o := ud.original.Get(entry.GetID()); o != nil { - return fn(lease.NewIDVersionPrev(o.GetName(), o.GetID(), o.(catalog.Descriptor).GetVersion())) - } - return nil - }) -} - // iterateUncommittedByID applies fn to the uncommitted descriptors in ascending // sequence of IDs. func (ud *uncommittedDescriptors) iterateUncommittedByID( @@ -146,14 +135,17 @@ func (ud *uncommittedDescriptors) ensureMutable( return e.(catalog.MutableDescriptor), nil } mut := original.NewBuilder().BuildExistingMutable() + newBytes := mut.ByteSize() if original.GetID() == keys.SystemDatabaseID { return mut, nil } - if err := ud.memAcc.Grow(ctx, mut.ByteSize()); err != nil { - return nil, errors.Wrap(err, "Memory usage exceeds limit for uncommittedDescriptors") - } - ud.original.Upsert(original) ud.mutable.Upsert(mut) + original = original.NewBuilder().BuildImmutable() + newBytes += original.ByteSize() + ud.original.Upsert(original) + if err := ud.memAcc.Grow(ctx, newBytes); err != nil { + return nil, errors.Wrap(err, "memory usage exceeds limit for uncommittedDescriptors") + } return mut, nil } @@ -161,8 +153,26 @@ func (ud *uncommittedDescriptors) ensureMutable( // This is called exclusively by the Collection's AddUncommittedDescriptor // method. func (ud *uncommittedDescriptors) upsert( - ctx context.Context, original catalog.Descriptor, mut catalog.MutableDescriptor, + ctx context.Context, mut catalog.MutableDescriptor, ) (err error) { + original := ud.getOriginalByID(mut.GetID()) + // Perform some sanity checks to ensure the version counters are correct. + if original == nil { + if !mut.IsNew() { + return errors.New("non-new descriptor does not exist in storage yet") + } + if mut.GetVersion() != 1 { + return errors.New("new descriptor version should be 1") + } + } else { + if mut.IsNew() { + return errors.New("new descriptor already exists in storage") + } + if mut.GetVersion() != original.GetVersion()+1 { + return errors.Newf("expected uncommitted version %d, instead got %d", + original.GetVersion()+1, mut.GetVersion()) + } + } // Refresh the cached fields on mutable type descriptors. if typ, ok := mut.(*typedesc.Mutable); ok { mut, err = typedesc.UpdateCachedFieldsOnModifiedMutable(typ) @@ -171,18 +181,16 @@ func (ud *uncommittedDescriptors) upsert( } } // Add the descriptors to the uncommitted descriptors layer. - imm := mut.ImmutableCopy() - newBytes := imm.ByteSize() + var newBytes int64 if mut.IsNew() { newBytes += mut.ByteSize() } - if err = ud.memAcc.Grow(ctx, newBytes); err != nil { - return errors.Wrap(err, "Memory usage exceeds limit for uncommittedDescriptors") - } ud.mutable.Upsert(mut) - ud.uncommitted.Upsert(imm, imm.SkipNamespace()) - if original != nil { - ud.original.Upsert(original) + uncommitted := mut.ImmutableCopy() + newBytes += uncommitted.ByteSize() + ud.uncommitted.Upsert(uncommitted, uncommitted.SkipNamespace()) + if err = ud.memAcc.Grow(ctx, newBytes); err != nil { + return errors.Wrap(err, "memory usage exceeds limit for uncommittedDescriptors") } return nil } diff --git a/pkg/sql/check_test.go b/pkg/sql/check_test.go index 2e06417edc69..c264605acd78 100644 --- a/pkg/sql/check_test.go +++ b/pkg/sql/check_test.go @@ -53,14 +53,14 @@ func TestValidateTTLScheduledJobs(t *testing.T) { { desc: "not pointing at a valid scheduled job", setup: func(t *testing.T, sqlDB *gosql.DB, kvDB *kv.DB, s serverutils.TestServerInterface, tableDesc *tabledesc.Mutable, scheduleID int64) { - tableDesc.RowLevelTTL.ScheduleID = 0 - require.NoError(t, sql.TestingDescsTxn(ctx, s, func(ctx context.Context, txn *kv.Txn, col *descs.Collection) error { - { - // We need the collection to read the descriptor from storage for - // the subsequent write to succeed. - flags := tree.CommonLookupFlags{Required: true, AvoidLeased: true} - _, err := col.GetImmutableDescriptorByID(ctx, txn, tableDesc.GetID(), flags) - require.NoError(t, err) + require.NoError(t, sql.TestingDescsTxn(ctx, s, func(ctx context.Context, txn *kv.Txn, col *descs.Collection) (err error) { + // We need the collection to read the descriptor from storage for + // the subsequent write to succeed. + tableDesc, err = col.GetMutableTableByID(ctx, txn, tableDesc.GetID(), tree.ObjectLookupFlagsWithRequired()) + tableDesc.RowLevelTTL.ScheduleID = 0 + tableDesc.Version++ + if err != nil { + return err } return col.WriteDesc(ctx, false /* kvBatch */, tableDesc, txn) })) diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 04bb1ffd82bb..07be53e49572 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -979,9 +979,11 @@ func (ex *connExecutor) commitSQLTransactionInternal(ctx context.Context) error // Now that we've committed, if we modified any descriptor we need to make sure // to release the leases for them so that the schema change can proceed and // we don't block the client. - if descs := ex.extraTxnState.descCollection.GetDescriptorsWithNewVersion(); descs != nil { - ex.extraTxnState.descCollection.ReleaseLeases(ctx) + withNewVersion, err := ex.extraTxnState.descCollection.GetOriginalPreviousIDVersionsForUncommitted() + if err != nil || withNewVersion == nil { + return err } + ex.extraTxnState.descCollection.ReleaseLeases(ctx) return nil } diff --git a/pkg/sql/create_view.go b/pkg/sql/create_view.go index 77845b370d82..85a4e6887226 100644 --- a/pkg/sql/create_view.go +++ b/pkg/sql/create_view.go @@ -107,18 +107,35 @@ func (n *createViewNode) startExec(params runParams) error { // If so, promote this view to temporary. backRefMutables := make(map[descpb.ID]*tabledesc.Mutable, len(n.planDeps)) hasTempBackref := false - for id, updated := range n.planDeps { - _, backRefMutable, err := params.p.Descriptors().GetUncommittedMutableTableByID(id) - if err != nil { + { + var ids catalog.DescriptorIDSet + for id := range n.planDeps { + ids.Add(id) + } + flags := tree.CommonLookupFlags{ + Required: true, + RequireMutable: true, + AvoidLeased: true, + AvoidSynthetic: true, + } + // Lookup the dependent tables in bulk to minimize round-trips to KV. + if _, err := params.p.Descriptors().GetImmutableDescriptorsByID( + params.ctx, params.p.Txn(), flags, ids.Ordered()..., + ); err != nil { return err } - if backRefMutable == nil { - backRefMutable = tabledesc.NewBuilder(updated.desc.TableDesc()).BuildExistingMutableTable() - } - if !n.persistence.IsTemporary() && backRefMutable.Temporary { - hasTempBackref = true + for id := range n.planDeps { + backRefMutable, err := params.p.Descriptors().GetMutableTableByID( + params.ctx, params.p.Txn(), id, tree.ObjectLookupFlagsWithRequired(), + ) + if err != nil { + return err + } + if !n.persistence.IsTemporary() && backRefMutable.Temporary { + hasTempBackref = true + } + backRefMutables[id] = backRefMutable } - backRefMutables[id] = backRefMutable } if hasTempBackref { n.persistence = tree.PersistenceTemporary diff --git a/pkg/sql/logictest/testdata/logic_test/schema_change_in_txn b/pkg/sql/logictest/testdata/logic_test/schema_change_in_txn index 5ce148643e75..0ad6c45e63f4 100644 --- a/pkg/sql/logictest/testdata/logic_test/schema_change_in_txn +++ b/pkg/sql/logictest/testdata/logic_test/schema_change_in_txn @@ -1921,3 +1921,32 @@ DROP INDEX t1_57592@idx; statement error pgcode XXA00 there is no unique constraint matching given keys for referenced table t1_57592 COMMIT + +# Regression test for issue #87672 in which the lease manager would wait on +# the wrong version to be released. +subtest regression_87672 + +statement ok +CREATE DATABASE db_87672; + +statement ok +USE db_87672; + +statement ok +CREATE SCHEMA sc; + +statement ok +BEGIN; + +statement ok +CREATE SCHEMA sc2; + +statement ok +SELECT * FROM crdb_internal.tables; + +statement ok +DROP SCHEMA sc; + +# Prior to fixing this bug, this commit would hang indefinitely. +statement ok +COMMIT; diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go index 0ef776d3319b..1650215aa21b 100644 --- a/pkg/sql/schema_changer_test.go +++ b/pkg/sql/schema_changer_test.go @@ -1520,20 +1520,16 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v INT); Required: true, AvoidLeased: true, }, } - tbl, err := col.GetImmutableTableByID(ctx, txn, tableDesc.GetID(), flags) + tbl, err := col.GetMutableTableByID(ctx, txn, tableDesc.GetID(), flags) if err != nil { return err } - table := tabledesc.NewBuilder(tbl.TableDesc()).BuildExistingMutableTable() - if err != nil { - return err - } - table.MaybeIncrementVersion() + tbl.Version++ ba := txn.NewBatch() - if err := col.WriteDescToBatch(ctx, false /* kvTrace */, table, ba); err != nil { + if err := col.WriteDescToBatch(ctx, false /* kvTrace */, tbl, ba); err != nil { return err } - version = table.GetVersion() + version = tbl.GetVersion() // Here we don't want to actually wait for the backfill to drop its lease. // To avoid that, we hack the machinery which tries oh so hard to make it