Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
76350: sql: introduce new FIFO cache for txnID cache r=Azhng a=Azhng

Reviewer note: first commit belongs to cockroachdb#76244

---

Previously, txnID cache relied on cache.UnorderedCache for storage and
FIFO eviction behavior. However, due to unintended allocation behavior
within cache.UnorderedCache, this yielded about 20% throughput drop in
kv95 benchmark.

This commit introduces a new FIFO cache, built to remove all allocation
during write operation. This commit removes all performance hit caused
by the previous use of cache.UnorderedCache.

Resolves cockroachdb#76013

Release note: None

76590: sql/catalog,migrations: propagate HasPostDeserializationChanges, simplify migrations r=ajwerner a=ajwerner

This is an imperfect change that feels a bit ad-hoc for my liking, but does
improve the situation. The problem is that in recent changes, we no longer
just retrieve mutable descriptors and then sometimes clone Immutable
descriptors. Instead we create the more general Immutable descriptors and
clone out mutables. The problem is that when we did this, we threw away
information regarding whether the descriptor itself had been modified
by post-deserialization. In general, this is mostly an exercise in plumbing.
The key feature is that when we retreive a Mutable descriptor and it was
changed, we need to know that. This change tests that.

There is some ad-hoc code to propagate isUncommitedVersion in various places
which I don't feel great about, but it also felt wrong not to. A follow-up
should come through to really nail down the properties here.

The existence of NewBuilder and the fact that it's called in various places
is another mechanism by which this information could be erased, but that's
less of a concern for me. This change makes it possible to simplify migrations
around descriptor upgrades.

### migrations: simplify descriptor migrations

We have two migrations which have the goal of iterating through the descriptors
and re-writing them if there are any changes due to the post-deserialization
logic. They were needlessly complex and low-level. Probably part of the reason
was that we were swallowing this information. This commit reworks them to use
shared library code.

Probably one of them could be eliminated altogether.

Release note: None

Co-authored-by: Azhng <[email protected]>
Co-authored-by: Andrew Werner <[email protected]>
  • Loading branch information
3 people committed Feb 16, 2022
3 parents b429f25 + 8c35183 + 698b97c commit 9f98cde
Show file tree
Hide file tree
Showing 36 changed files with 1,117 additions and 551 deletions.
3 changes: 0 additions & 3 deletions pkg/migration/migrations/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ go_library(
"//pkg/sql/catalog",
"//pkg/sql/catalog/catalogkeys",
"//pkg/sql/catalog/dbdesc",
"//pkg/sql/catalog/descbuilder",
"//pkg/sql/catalog/descidgen",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/descs",
Expand All @@ -48,8 +47,6 @@ go_library(
"//pkg/sql/privilege",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlutil",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/protoutil",
"//pkg/util/retry",
Expand Down
73 changes: 73 additions & 0 deletions pkg/migration/migrations/descriptor_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@ import (

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/migration"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descidgen"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
)

// CreateSystemTable is a function to inject a new system table. If the table
Expand Down Expand Up @@ -63,3 +66,73 @@ func createSystemTable(
return txn.Run(ctx, b)
})
}

// runPostDeserializationChangesOnAllDescriptors will paginate through the
// descriptor table and upgrade all descriptors in need of upgrading.
func runPostDeserializationChangesOnAllDescriptors(
ctx context.Context, d migration.TenantDeps,
) error {
// maybeUpgradeDescriptors writes the descriptors with the given IDs
// and writes new versions for all descriptors which required post
// deserialization changes.
maybeUpgradeDescriptors := func(
ctx context.Context, d migration.TenantDeps, toUpgrade []descpb.ID,
) error {
return d.CollectionFactory.Txn(ctx, d.InternalExecutor, d.DB, func(
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) error {
descs, err := descriptors.GetMutableDescriptorsByID(ctx, txn, toUpgrade...)
if err != nil {
return err
}
batch := txn.NewBatch()
for _, desc := range descs {
if !desc.GetPostDeserializationChanges().HasChanges() {
continue
}
if err := descriptors.WriteDescToBatch(
ctx, false, desc, batch,
); err != nil {
return err
}
}
return txn.Run(ctx, batch)
})
}

query := `SELECT id, length(descriptor) FROM system.descriptor ORDER BY id DESC`
rows, err := d.InternalExecutor.QueryIterator(
ctx, "retrieve-descriptors-for-upgrade", nil /* txn */, query,
)
if err != nil {
return err
}
defer func() { _ = rows.Close() }()
var toUpgrade []descpb.ID
var curBatchBytes int
const maxBatchSize = 1 << 19 // 512 KiB
ok, err := rows.Next(ctx)
for ; ok && err == nil; ok, err = rows.Next(ctx) {
datums := rows.Cur()
id := tree.MustBeDInt(datums[0])
size := tree.MustBeDInt(datums[1])
if curBatchBytes+int(size) > maxBatchSize && curBatchBytes > 0 {
if err := maybeUpgradeDescriptors(ctx, d, toUpgrade); err != nil {
return err
}
toUpgrade = toUpgrade[:0]
}
curBatchBytes += int(size)
toUpgrade = append(toUpgrade, descpb.ID(id))
}
if err != nil {
return err
}
if err := rows.Close(); err != nil {
return err
}
if len(toUpgrade) == 0 {
return nil
}
return maybeUpgradeDescriptors(ctx, d, toUpgrade)
}
134 changes: 1 addition & 133 deletions pkg/migration/migrations/grant_option_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,145 +15,13 @@ import (

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/migration"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descbuilder"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/errors"
)

// grantOptionMigration iterates through every descriptor and sets a user's grant option bits
// equal to its privilege bits if it holds the "GRANT" privilege.
func grantOptionMigration(
ctx context.Context, _ clusterversion.ClusterVersion, d migration.TenantDeps, _ *jobs.Job,
) error {
query := `SELECT id, descriptor, crdb_internal_mvcc_timestamp FROM system.descriptor ORDER BY ID ASC`
rows, err := d.InternalExecutor.QueryIterator(
ctx, "retrieve-grant-options", nil /* txn */, query,
)
if err != nil {
return err
}

addGrantOptionFunc := func(ids []descpb.ID, descs []descpb.Descriptor, timestamps []hlc.Timestamp) error {
var modifiedDescs []catalog.MutableDescriptor
for i, id := range ids {
b := descbuilder.NewBuilderWithMVCCTimestamp(&descs[i], timestamps[i])
if b == nil {
return errors.Newf("unable to find descriptor for id %d", id)
}

b.RunPostDeserializationChanges()
mutableDesc := b.BuildExistingMutable()

modifiedDescs = append(modifiedDescs, mutableDesc)
}
if err := writeModifiedDescriptors(ctx, d, modifiedDescs); err != nil {
return err
}
return nil
}

return addGrantOptionMigration(ctx, rows, addGrantOptionFunc, 1<<19 /* 512 KiB batch size */)
}

// addGrantOptionFunction is used in addGrantOptionMigration to maybe add grant options
// of descriptors specified by the id.
type addGrantOptionFunction func(ids []descpb.ID, descs []descpb.Descriptor, timestamps []hlc.Timestamp) error

// addGrantOptionMigration is an abstraction for adding grant options.
// The rows provided should be the result of a select ID, descriptor, crdb_internal_mvcc_timestamp
// from system.descriptor table.
// The datums returned from the query are parsed to grab the descpb.Descriptor
// and addGrantOptionFunction is called on the desc.
// If minBatchSizeInBytes is specified, fixDescriptors will only be called once the
// size of the descriptors in the id array surpasses minBatchSizeInBytes.
func addGrantOptionMigration(
ctx context.Context,
rows sqlutil.InternalRows,
grantOptionFunc addGrantOptionFunction,
minBatchSizeInBytes int,
) error {
defer func() { _ = rows.Close() }()
ok, err := rows.Next(ctx)
if err != nil {
return err
}
currSize := 0 // in bytes.
var ids []descpb.ID
var descs []descpb.Descriptor
var timestamps []hlc.Timestamp
for ; ok; ok, err = rows.Next(ctx) {
if err != nil {
return err
}
datums := rows.Cur()
id, desc, ts, err := unmarshalDescFromDescriptorRow(datums)
if err != nil {
return err
}
ids = append(ids, id)
descs = append(descs, desc)
timestamps = append(timestamps, ts)
currSize += desc.Size()
if currSize > minBatchSizeInBytes || minBatchSizeInBytes == 0 {
err = grantOptionFunc(ids, descs, timestamps)
if err != nil {
return err
}
// Reset size and id array after the batch is fixed.
currSize = 0
ids = nil
descs = nil
timestamps = nil
}
}
// Fix remaining descriptors.
return grantOptionFunc(ids, descs, timestamps)
}

// unmarshalDescFromDescriptorRow takes in an InternalRow from a query that gets:
// ID, descriptor, crdb_internal_mvcc_timestamp from the system.descriptor table.
// ie: SELECT id, descriptor, crdb_internal_mvcc_timestamp FROM system.descriptor ORDER BY ID ASC
// and parses the id, descriptor and mvcc_timestamp fields.
func unmarshalDescFromDescriptorRow(
datums tree.Datums,
) (descpb.ID, descpb.Descriptor, hlc.Timestamp, error) {
id := descpb.ID(*datums[0].(*tree.DInt))
ts, err := tree.DecimalToHLC(&datums[2].(*tree.DDecimal).Decimal)
if err != nil {
return id, descpb.Descriptor{}, ts, errors.Wrapf(err,
"failed to convert MVCC timestamp decimal to HLC for id %d", id)
}
var desc descpb.Descriptor
if err := protoutil.Unmarshal(([]byte)(*datums[1].(*tree.DBytes)), &desc); err != nil {
return id, descpb.Descriptor{}, ts, errors.Wrapf(err,
"failed to unmarshal descriptor with ID %d", id)
}
return id, desc, ts, nil
}

// writeModifiedDescriptors writes the descriptors that we have given grant option privileges
// to back to batch
func writeModifiedDescriptors(
ctx context.Context, d migration.TenantDeps, modifiedDescs []catalog.MutableDescriptor,
) error {
return d.CollectionFactory.Txn(ctx, d.InternalExecutor, d.DB, func(
ctx context.Context, txn *kv.Txn, descriptors *descs.Collection,
) error {
batch := txn.NewBatch()
for _, desc := range modifiedDescs {
err := descriptors.WriteDescToBatch(ctx, false, desc, batch)
if err != nil {
return err
}
}
return txn.Run(ctx, batch)
})
return runPostDeserializationChangesOnAllDescriptors(ctx, d)
}
Loading

0 comments on commit 9f98cde

Please sign in to comment.