Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
124537: kvserver: allow retrying scatter processing with more errors r=nvanbenschoten a=kvoli

Previously, scatter processing would only be retried when encountering a snapshot error. Other errors commonly occur, which we expect to be transient and retryable, such as the range descriptor changing or rejected lease transfers. The range descriptor change error being most common, due to the proclivity of clients to issue splits alongside scatter requests, which would update the range descriptor.

Retry failed scatter replicate processing if the returned error matches any of `IsRetriableReplicationChangeError`s, similar to range splits. Note the maximum number of retries remains at 5 for scatter.

Resolves: #124522
Release note: None

124751: catalog: collecting virtual schemas can be expensive for some ORM queries  r=fqazi a=fqazi

Previously, when running ORM queries with larger schemas or our ORM query bench test we noticed that aggregating virtual schema objects could take a big chunk of time. This was because converting the internal the VirtualTable / VirtualSchemas into a nstree.Catalog was not cheap. To address this, this patch will:

1. Convert the VirtualTable / VirtualSchemas into a nstree.Catalog which can be used between collections (the entire state here is immutable).
2. Reduce some extra copies that happen when copying betwen nstree.Catalog objects, by allowing references to be ingested.

Fixes: #124750

```Orignal:
BenchmarkORMQueries/asyncpg_types         	       1	16914664491 ns/op	         0 roundtrips	6151796312 B/op	33275332 allocs/op
After having a shared nstree.Catalog object:
BenchmarkORMQueries/asyncpg_types         	       1	13252306945 ns/op	         0 roundtrips	5442736128 B/op	29586155 allocs/op
After optimizing MutableCatalog.AddAll:
BenchmarkORMQueries/asyncpg_types         	       1	12399752839 ns/op	         0 roundtrips	4602288096 B/op	27474078 allocs/op
````

Co-authored-by: Austen McClernon <[email protected]>
Co-authored-by: Faizan Qazi <[email protected]>
  • Loading branch information
3 people committed May 28, 2024
3 parents 878d6be + bde1e38 + e38c88d commit 2cd70a3
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 43 deletions.
8 changes: 6 additions & 2 deletions pkg/kv/kvserver/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -4156,8 +4156,12 @@ func (r *Replica) adminScatter(
ctx, r, desc, conf, true /* scatter */, false, /* dryRun */
)
if err != nil {
// TODO(tbg): can this use IsRetriableReplicationError?
if isSnapshotError(err) {
// If the error is expected to be transient, retry processing the range.
// This is most likely to occur when concurrent split and scatters are
// issued, in which case the scatter may fail due to the range split
// updating the descriptor while processing.
if IsRetriableReplicationChangeError(err) {
log.VEventf(ctx, 1, "retrying scatter process after retryable error: %v", err)
continue
}
break
Expand Down
5 changes: 2 additions & 3 deletions pkg/sql/catalog/descs/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/hydrateddesccache"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/internal/catkv"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
Expand All @@ -30,7 +29,7 @@ type CollectionFactory struct {
settings *cluster.Settings
codec keys.SQLCodec
leaseMgr *lease.Manager
virtualSchemas catalog.VirtualSchemas
virtualSchemas VirtualCatalogHolder
hydrated *hydrateddesccache.Cache
systemDatabase *catkv.SystemDatabaseCache
spanConfigSplitter spanconfig.Splitter
Expand Down Expand Up @@ -72,7 +71,7 @@ func NewCollectionFactory(
ctx context.Context,
settings *cluster.Settings,
leaseMgr *lease.Manager,
virtualSchemas catalog.VirtualSchemas,
virtualSchemas VirtualCatalogHolder,
hydrated *hydrateddesccache.Cache,
spanConfigSplitter spanconfig.Splitter,
spanConfigLimiter spanconfig.Limiter,
Expand Down
36 changes: 10 additions & 26 deletions pkg/sql/catalog/descs/virtual_descriptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,24 @@
package descs

import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/nstree"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// VirtualCatalogHolder holds a catalog of all virtual descriptors.
type VirtualCatalogHolder interface {
catalog.VirtualSchemas
// GetCatalog returns a catalog of vritual descriptors.
GetCatalog() nstree.Catalog
}

type virtualDescriptors struct {
vs catalog.VirtualSchemas
vs VirtualCatalogHolder
}

func makeVirtualDescriptors(schemas catalog.VirtualSchemas) virtualDescriptors {
func makeVirtualDescriptors(schemas VirtualCatalogHolder) virtualDescriptors {
return virtualDescriptors{vs: schemas}
}

Expand Down Expand Up @@ -82,25 +86,5 @@ func (tc virtualDescriptors) getSchemaByID(id descpb.ID) catalog.VirtualSchema {
}

func (tc virtualDescriptors) addAllToCatalog(mc nstree.MutableCatalog) {
_ = tc.vs.Visit(func(vd catalog.Descriptor, comment string) error {
mc.UpsertDescriptor(vd)
if vd.GetID() != keys.PublicSchemaID && !vd.Dropped() && !vd.SkipNamespace() {
mc.UpsertNamespaceEntry(vd, vd.GetID(), hlc.Timestamp{})
}
if comment == "" {
return nil
}
ck := catalogkeys.CommentKey{ObjectID: uint32(vd.GetID())}
switch vd.DescriptorType() {
case catalog.Database:
ck.CommentType = catalogkeys.DatabaseCommentType
case catalog.Schema:
ck.CommentType = catalogkeys.SchemaCommentType
case catalog.Table:
ck.CommentType = catalogkeys.TableCommentType
default:
return nil
}
return mc.UpsertComment(ck, comment)
})
mc.AddAll(tc.vs.GetCatalog())
}
52 changes: 40 additions & 12 deletions pkg/sql/catalog/nstree/catalog_mutable.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,26 @@ func (mc *MutableCatalog) ensureForID(id descpb.ID) *byIDEntry {
newEntry := &byIDEntry{
id: id,
}
if replaced := mc.byID.upsert(newEntry); replaced != nil {
*newEntry = *(replaced.(*byIDEntry))
if replaced := mc.ensureForIDWithEntry(newEntry); replaced != nil {
*newEntry = *replaced
} else {
mc.byteSize += newEntry.ByteSize()
}
return newEntry
}

// ensureForWithEntry upserts the entry and returns either the current entry or
// the one that has replaced.
func (mc *MutableCatalog) ensureForIDWithEntry(newEntry *byIDEntry) *byIDEntry {
mc.maybeInitialize()
if replaced := mc.byID.upsert(newEntry); replaced != nil {
return replaced.(*byIDEntry)
} else {
mc.byteSize += newEntry.ByteSize()
}
return nil
}

func (mc *MutableCatalog) maybeGetByID(id descpb.ID) *byIDEntry {
if !mc.IsInitialized() {
return nil
Expand All @@ -73,12 +85,22 @@ func (mc *MutableCatalog) ensureForName(key catalog.NameKey) *byNameEntry {
parentSchemaID: key.GetParentSchemaID(),
name: key.GetName(),
}
if replaced := mc.ensureForNameWithEntry(newEntry); replaced != nil {
*newEntry = *replaced
}
return newEntry
}

// ensureForNameWithEntry upserts the entry and returns either the current entry or
// the one that has replaced.
func (mc *MutableCatalog) ensureForNameWithEntry(newEntry *byNameEntry) *byNameEntry {
mc.maybeInitialize()
if replaced := mc.byName.upsert(newEntry); replaced != nil {
*newEntry = *(replaced.(*byNameEntry))
return replaced.(*byNameEntry)
} else {
mc.byteSize += newEntry.ByteSize()
}
return newEntry
return nil
}

// DeleteByName removes all by-name mappings from the MutableCatalog.
Expand Down Expand Up @@ -197,17 +219,23 @@ func (mc *MutableCatalog) AddAll(c Catalog) {
return
}
_ = c.byName.ascend(func(entry catalog.NameEntry) error {
e := mc.ensureForName(entry)
mc.byteSize -= e.ByteSize()
*e = *(entry.(*byNameEntry))
mc.byteSize += e.ByteSize()
ne := entry.(*byNameEntry)
e := mc.ensureForNameWithEntry(ne)
if e != nil {
// Update the size since the entry was replaced.
mc.byteSize -= e.ByteSize()
mc.byteSize += ne.ByteSize()
}
return nil
})
_ = c.byID.ascend(func(entry catalog.NameEntry) error {
e := mc.ensureForID(entry.GetID())
mc.byteSize -= e.ByteSize()
*e = *(entry.(*byIDEntry))
mc.byteSize += e.ByteSize()
ne := entry.(*byIDEntry)
e := mc.ensureForIDWithEntry(ne)
if e != nil {
// Update the size since the entry was replaced.
mc.byteSize -= e.ByteSize()
mc.byteSize += ne.ByteSize()
}
return nil
})
}
37 changes: 37 additions & 0 deletions pkg/sql/virtual_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@ import (
"sort"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catsessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/nstree"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/schemadesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/schemaexpr"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
Expand Down Expand Up @@ -409,6 +412,7 @@ type VirtualSchemaHolder struct {
defsByID map[descpb.ID]*virtualDefEntry
orderedNames []string

catalogCache nstree.MutableCatalog
// Needed for backward-compat on crdb_internal.ranges{_no_leases}.
// Remove in v23.2.
st *cluster.Settings
Expand Down Expand Up @@ -452,7 +456,13 @@ func (vs *VirtualSchemaHolder) Visit(fn func(desc catalog.Descriptor, comment st
return nil
}

// GetCatalog makes VirtualSchemaHolder implement descs.VirtualCatalogHolder.
func (vs *VirtualSchemaHolder) GetCatalog() nstree.Catalog {
return vs.catalogCache.Catalog
}

var _ catalog.VirtualSchemas = (*VirtualSchemaHolder)(nil)
var _ descs.VirtualCatalogHolder = (*VirtualSchemaHolder)(nil)

type virtualSchemaEntry struct {
desc catalog.SchemaDescriptor
Expand Down Expand Up @@ -969,6 +979,33 @@ func NewVirtualSchemaHolder(
order++
}
sort.Strings(vs.orderedNames)

// Setup the catalog cache inside the virtual schema holder.
err := vs.Visit(func(vd catalog.Descriptor, comment string) error {
vs.catalogCache.UpsertDescriptor(vd)
if vd.GetID() != keys.PublicSchemaID && !vd.Dropped() && !vd.SkipNamespace() {
vs.catalogCache.UpsertNamespaceEntry(vd, vd.GetID(), hlc.Timestamp{})
}
if comment == "" {
return nil
}
ck := catalogkeys.CommentKey{ObjectID: uint32(vd.GetID())}
switch vd.DescriptorType() {
case catalog.Database:
ck.CommentType = catalogkeys.DatabaseCommentType
case catalog.Schema:
ck.CommentType = catalogkeys.SchemaCommentType
case catalog.Table:
ck.CommentType = catalogkeys.TableCommentType
default:
return errors.AssertionFailedf("unsupported descriptor type for comment: %s", vd.DescriptorType())
}
return vs.catalogCache.UpsertComment(ck, comment)
})
if err != nil {
return nil, err
}

return vs, nil
}

Expand Down

0 comments on commit 2cd70a3

Please sign in to comment.