Skip to content

Commit

Permalink
descs: upgrade WriteDescToBatch to use CPut
Browse files Browse the repository at this point in the history
Previously, `WriteDescToBatch`, which is called to
update a descriptor in storage by a `desc.Collection`, uses `Put`
primitive to directly modify the storage layer. We would like to tighten
it to use a `CPut` to prevent (in)advertent clobbering of that table.

This PR does that by book-keeping the raw bytes of the to-be-updated
descriptor in the descriptor, acquired when we first read it into the
`desc.Collection` from the storage layer. Then, the infrastructural
work done in the previous commit allows us to carry over these raw bytes
as we are preparing the `MutableDescriptor` that we pass into this
`WriteDescToBatch` method.

One additional difficulty is that what if we will need to call
`WriteDescToBatch` more than once in one transaction. For example,
in the new schema changer, statement phase and precommit phase will both
be in one transaction, but we call `WriteDescToBatch` at the end of
each stage. Hence, for some DDL stmts (e.g. `DROP COLUMN`), we will
call `WriteDescToBatch` twice in one transaction. The first call will
modify the descriptor in storage and also added this descriptor to
`desc.Collection.uncommitted` set, so, the second call will get it from
there. To make `CPut` work correctly for the second call, we will need
to get the expected byte slice from the `uncommitted` descriptor set.
This motivates the change to update a descriptor's byte slice field when
it's added to the `uncommitted` descriptor sett.
  • Loading branch information
Xiang-Gu committed Oct 3, 2022
1 parent 241d790 commit aa3c59c
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 5 deletions.
1 change: 1 addition & 0 deletions pkg/sql/catalog/descs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ go_library(
"//pkg/clusterversion",
"//pkg/keys",
"//pkg/kv",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/spanconfig",
Expand Down
16 changes: 15 additions & 1 deletion pkg/sql/catalog/descs/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,20 @@ func (tc *Collection) WriteDescToBatch(
return err
}
}
// Retrieve the expected bytes of `desc` in storage.
// If this is the first time we write to `desc` in the transaction, its
// expected bytes will be retrieved when we read it into this desc.Collection,
// and carried over in it.
// If, however, this is not the first time we write to `desc` in the transaction,
// which means it has existed in `tc.uncommitted`, we will retrieve the expected
// bytes from there.
var expected []byte
if exist := tc.uncommitted.getUncommittedByID(desc.GetID()); exist != nil {
expected = exist.GetRawBytesInStorage()
} else {
expected = desc.GetRawBytesInStorage()
}

if err := tc.AddUncommittedDescriptor(ctx, desc); err != nil {
return err
}
Expand All @@ -277,7 +291,7 @@ func (tc *Collection) WriteDescToBatch(
if kvTrace {
log.VEventf(ctx, 2, "Put %s -> %s", descKey, proto)
}
b.Put(descKey, proto)
b.CPut(descKey, proto, expected)
return nil
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/catalog/descs/collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func TestAddUncommittedDescriptorAndMutableResolution(t *testing.T) {
immByNameAfter, err := descriptors.GetImmutableDatabaseByName(ctx, txn, "new_name", flags)
require.NoError(t, err)
require.Equal(t, db.GetVersion(), immByNameAfter.GetVersion())
require.Equal(t, mut.ImmutableCopy(), immByNameAfter)
require.Equal(t, mut.ImmutableCopy().DescriptorProto(), immByNameAfter.DescriptorProto())

_, immByIDAfter, err := descriptors.GetImmutableDatabaseByID(ctx, txn, db.GetID(), flags)
require.NoError(t, err)
Expand Down Expand Up @@ -733,7 +733,7 @@ func TestDescriptorCache(t *testing.T) {
}
found := cat.LookupDescriptorEntry(mut.ID)
require.NotEmpty(t, found)
require.Equal(t, mut.ImmutableCopy(), found)
require.Equal(t, mut.ImmutableCopy().DescriptorProto(), found.DescriptorProto())
return nil
}))
})
Expand Down Expand Up @@ -768,7 +768,7 @@ func TestDescriptorCache(t *testing.T) {
return err
}
require.Len(t, dbDescs, 4)
require.Equal(t, mut.ImmutableCopy(), dbDescs[0])
require.Equal(t, mut.ImmutableCopy().DescriptorProto(), dbDescs[0].DescriptorProto())
return nil
}))
})
Expand Down
16 changes: 15 additions & 1 deletion pkg/sql/catalog/descs/uncommitted_descriptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/nstree"
Expand Down Expand Up @@ -186,7 +187,20 @@ func (ud *uncommittedDescriptors) upsert(
newBytes += mut.ByteSize()
}
ud.mutable.Upsert(mut)
uncommitted := mut.ImmutableCopy()
// Upserting a descriptor into the "uncommitted" set implies
// this descriptor is going to be written to storage very soon. We
// compute what the raw bytes of this descriptor in storage is going to
// look like when that write happens, so that any further update to this
// descriptor, which will be retrieved from the "uncommitted" set, will
// carry the correct raw bytes in storage with it.
var val roachpb.Value
if err = val.SetProto(mut.DescriptorProto()); err != nil {
return err
}
rawBytesInStorageAfterPendingWrite := val.TagAndDataBytes()
uncommittedBuilder := mut.NewBuilder()
uncommittedBuilder.SetRawBytesInStorage(rawBytesInStorageAfterPendingWrite)
uncommitted := uncommittedBuilder.BuildImmutable()
newBytes += uncommitted.ByteSize()
ud.uncommitted.Upsert(uncommitted, uncommitted.SkipNamespace())
if err = ud.memAcc.Grow(ctx, newBytes); err != nil {
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/catalog/internal/catkv/catalog_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func build(
if err := b.RunPostDeserializationChanges(); err != nil {
return nil, errors.NewAssertionErrorWithWrappedErrf(err, "error during RunPostDeserializationChanges")
}
b.SetRawBytesInStorage(rowValue.TagAndDataBytes())
desc := b.BuildImmutable()
if id != desc.GetID() {
return nil, errors.AssertionFailedf("unexpected ID %d in descriptor", desc.GetID())
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/repair.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,7 @@ func unsafeReadDescriptor(
if err != nil || b == nil {
return nil, notice, err
}
b.SetRawBytesInStorage(descRow.Value.TagAndDataBytes())
return b.BuildExistingMutable(), notice, nil
}

Expand Down

0 comments on commit aa3c59c

Please sign in to comment.