From aa3c59ccb2836ebd04ae89a4a26d14d7ca1bbe0d Mon Sep 17 00:00:00 2001 From: Xiang Gu Date: Tue, 27 Sep 2022 13:44:47 -0400 Subject: [PATCH] descs: upgrade `WriteDescToBatch` to use CPut Previously, `WriteDescToBatch`, which is called to update a descriptor in storage by a `desc.Collection`, uses `Put` primitive to directly modify the storage layer. We would like to tighten it to use a `CPut` to prevent (in)advertent clobbering of that table. This PR does that by book-keeping the raw bytes of the to-be-updated descriptor in the descriptor, acquired when we first read it into the `desc.Collection` from the storage layer. Then, the infrastructural work done in the previous commit allows us to carry over these raw bytes as we are preparing the `MutableDescriptor` that we pass into this `WriteDescToBatch` method. One additional difficulty is that what if we will need to call `WriteDescToBatch` more than once in one transaction. For example, in the new schema changer, statement phase and precommit phase will both be in one transaction, but we call `WriteDescToBatch` at the end of each stage. Hence, for some DDL stmts (e.g. `DROP COLUMN`), we will call `WriteDescToBatch` twice in one transaction. The first call will modify the descriptor in storage and also added this descriptor to `desc.Collection.uncommitted` set, so, the second call will get it from there. To make `CPut` work correctly for the second call, we will need to get the expected byte slice from the `uncommitted` descriptor set. This motivates the change to update a descriptor's byte slice field when it's added to the `uncommitted` descriptor sett. --- pkg/sql/catalog/descs/BUILD.bazel | 1 + pkg/sql/catalog/descs/collection.go | 16 +++++++++++++++- pkg/sql/catalog/descs/collection_test.go | 6 +++--- pkg/sql/catalog/descs/uncommitted_descriptors.go | 16 +++++++++++++++- pkg/sql/catalog/internal/catkv/catalog_query.go | 1 + pkg/sql/repair.go | 1 + 6 files changed, 36 insertions(+), 5 deletions(-) diff --git a/pkg/sql/catalog/descs/BUILD.bazel b/pkg/sql/catalog/descs/BUILD.bazel index 52d4a1e9d4ec..47b00e214850 100644 --- a/pkg/sql/catalog/descs/BUILD.bazel +++ b/pkg/sql/catalog/descs/BUILD.bazel @@ -33,6 +33,7 @@ go_library( "//pkg/clusterversion", "//pkg/keys", "//pkg/kv", + "//pkg/roachpb", "//pkg/settings", "//pkg/settings/cluster", "//pkg/spanconfig", diff --git a/pkg/sql/catalog/descs/collection.go b/pkg/sql/catalog/descs/collection.go index f37a2b5c7cd0..cb12f30ee4c6 100644 --- a/pkg/sql/catalog/descs/collection.go +++ b/pkg/sql/catalog/descs/collection.go @@ -269,6 +269,20 @@ func (tc *Collection) WriteDescToBatch( return err } } + // Retrieve the expected bytes of `desc` in storage. + // If this is the first time we write to `desc` in the transaction, its + // expected bytes will be retrieved when we read it into this desc.Collection, + // and carried over in it. + // If, however, this is not the first time we write to `desc` in the transaction, + // which means it has existed in `tc.uncommitted`, we will retrieve the expected + // bytes from there. + var expected []byte + if exist := tc.uncommitted.getUncommittedByID(desc.GetID()); exist != nil { + expected = exist.GetRawBytesInStorage() + } else { + expected = desc.GetRawBytesInStorage() + } + if err := tc.AddUncommittedDescriptor(ctx, desc); err != nil { return err } @@ -277,7 +291,7 @@ func (tc *Collection) WriteDescToBatch( if kvTrace { log.VEventf(ctx, 2, "Put %s -> %s", descKey, proto) } - b.Put(descKey, proto) + b.CPut(descKey, proto, expected) return nil } diff --git a/pkg/sql/catalog/descs/collection_test.go b/pkg/sql/catalog/descs/collection_test.go index e75b9afd560d..c0f9b157abcd 100644 --- a/pkg/sql/catalog/descs/collection_test.go +++ b/pkg/sql/catalog/descs/collection_test.go @@ -284,7 +284,7 @@ func TestAddUncommittedDescriptorAndMutableResolution(t *testing.T) { immByNameAfter, err := descriptors.GetImmutableDatabaseByName(ctx, txn, "new_name", flags) require.NoError(t, err) require.Equal(t, db.GetVersion(), immByNameAfter.GetVersion()) - require.Equal(t, mut.ImmutableCopy(), immByNameAfter) + require.Equal(t, mut.ImmutableCopy().DescriptorProto(), immByNameAfter.DescriptorProto()) _, immByIDAfter, err := descriptors.GetImmutableDatabaseByID(ctx, txn, db.GetID(), flags) require.NoError(t, err) @@ -733,7 +733,7 @@ func TestDescriptorCache(t *testing.T) { } found := cat.LookupDescriptorEntry(mut.ID) require.NotEmpty(t, found) - require.Equal(t, mut.ImmutableCopy(), found) + require.Equal(t, mut.ImmutableCopy().DescriptorProto(), found.DescriptorProto()) return nil })) }) @@ -768,7 +768,7 @@ func TestDescriptorCache(t *testing.T) { return err } require.Len(t, dbDescs, 4) - require.Equal(t, mut.ImmutableCopy(), dbDescs[0]) + require.Equal(t, mut.ImmutableCopy().DescriptorProto(), dbDescs[0].DescriptorProto()) return nil })) }) diff --git a/pkg/sql/catalog/descs/uncommitted_descriptors.go b/pkg/sql/catalog/descs/uncommitted_descriptors.go index b398e1167c65..4d059cc35205 100644 --- a/pkg/sql/catalog/descs/uncommitted_descriptors.go +++ b/pkg/sql/catalog/descs/uncommitted_descriptors.go @@ -14,6 +14,7 @@ import ( "context" "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/nstree" @@ -186,7 +187,20 @@ func (ud *uncommittedDescriptors) upsert( newBytes += mut.ByteSize() } ud.mutable.Upsert(mut) - uncommitted := mut.ImmutableCopy() + // Upserting a descriptor into the "uncommitted" set implies + // this descriptor is going to be written to storage very soon. We + // compute what the raw bytes of this descriptor in storage is going to + // look like when that write happens, so that any further update to this + // descriptor, which will be retrieved from the "uncommitted" set, will + // carry the correct raw bytes in storage with it. + var val roachpb.Value + if err = val.SetProto(mut.DescriptorProto()); err != nil { + return err + } + rawBytesInStorageAfterPendingWrite := val.TagAndDataBytes() + uncommittedBuilder := mut.NewBuilder() + uncommittedBuilder.SetRawBytesInStorage(rawBytesInStorageAfterPendingWrite) + uncommitted := uncommittedBuilder.BuildImmutable() newBytes += uncommitted.ByteSize() ud.uncommitted.Upsert(uncommitted, uncommitted.SkipNamespace()) if err = ud.memAcc.Grow(ctx, newBytes); err != nil { diff --git a/pkg/sql/catalog/internal/catkv/catalog_query.go b/pkg/sql/catalog/internal/catkv/catalog_query.go index 5c0c6824dc98..0639130275f2 100644 --- a/pkg/sql/catalog/internal/catkv/catalog_query.go +++ b/pkg/sql/catalog/internal/catkv/catalog_query.go @@ -149,6 +149,7 @@ func build( if err := b.RunPostDeserializationChanges(); err != nil { return nil, errors.NewAssertionErrorWithWrappedErrf(err, "error during RunPostDeserializationChanges") } + b.SetRawBytesInStorage(rowValue.TagAndDataBytes()) desc := b.BuildImmutable() if id != desc.GetID() { return nil, errors.AssertionFailedf("unexpected ID %d in descriptor", desc.GetID()) diff --git a/pkg/sql/repair.go b/pkg/sql/repair.go index 8c5ffe75f56b..cf4faba4f9e8 100644 --- a/pkg/sql/repair.go +++ b/pkg/sql/repair.go @@ -659,6 +659,7 @@ func unsafeReadDescriptor( if err != nil || b == nil { return nil, notice, err } + b.SetRawBytesInStorage(descRow.Value.TagAndDataBytes()) return b.BuildExistingMutable(), notice, nil }