Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
72669: bazel: properly generate `.eg.go` code in `pkg/sql/colconv` via bazel r=rail a=rickystewart

Release note: None

72714: sql/catalog/lease: permit gaps in descriptor history r=ajwerner a=ajwerner

In #71239, we added a new mechanism to look up historical descriptors. I
erroneously informed @jameswsj10 that we would never have gaps in the
descriptor history and that, when looking up historical descriptors, we
could therefore always use the earliest descriptor's modification time as the
upper bound for the relevant query.

This turns out not to be true. Consider the case where version 3 is a
historical version and then version 4 pops up and gets leased. Version 3 will
get removed if it is not referenced. In the meantime, version 3 still existed
when we went to find version 2. At that point, we'll inject version 2 and have
version 4 leased, leaving a gap at version 3. We need to make sure we can
handle the case where we later need to go fetch version 3.

In the meantime, this change also removes some logic added to support the
eventual resurrection of #59606 whereby we'll use the export request to fetch
descriptor history to power historical queries even in the face of descriptors
having been deleted.

Fixes #72706.

Release note: None

72740: sql/catalog/descs: fix perf regression  r=ajwerner a=ajwerner

A commit in #71936 had the unfortunate side effect of allocating and forcing reads on the `uncommittedDescriptors` set even when we aren't looking for the system database. This has an outsized impact on the performance of the single-node, high-core-count KV runs. Instead of always initializing the system database, just do it when we access it.

```
name             old ops/s   new ops/s   delta
KV95-throughput  88.6k ± 0%  94.8k ± 1%   +7.00%  (p=0.008 n=5+5)

name             old ms/s    new ms/s    delta
KV95-P50          1.60 ± 0%   1.40 ± 0%  -12.50%  (p=0.008 n=5+5)
KV95-Avg          0.60 ± 0%   0.50 ± 0%  -16.67%  (p=0.008 n=5+5)
```

The second commit is more speculative and came from looking at a profile where 1.6% of the allocated garbage was due to the `NameInfo` constructed for the `descNames` lookup in `getByName`, even though that lookup will never, ever hit.

<img width="2345" alt="Screen Shot 2021-11-15 at 12 57 31 AM" src="https://user-images.githubusercontent.com/1839234/141729924-d00eebab-b35c-42bd-8d0b-ee39f3ac7d46.png">
 
 Fixes #72499

Co-authored-by: Ricky Stewart <[email protected]>
Co-authored-by: Andrew Werner <[email protected]>
  • Loading branch information
3 people committed Nov 15, 2021
4 parents c968f1f + d58ec7a + 340346d + 322aeaf commit 47a2e2f
Show file tree
Hide file tree
Showing 12 changed files with 241 additions and 74 deletions.
29 changes: 22 additions & 7 deletions pkg/sql/catalog/descs/uncommitted_descriptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package descs

import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/dbdesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
Expand Down Expand Up @@ -100,11 +101,16 @@ type uncommittedDescriptors struct {
//
// TODO(postamar): better uncommitted namespace changes handling after 22.1.
descNames nstree.Set

// addedSystemDatabase is used to mark whether the optimization to add the
// system database to the set of uncommitted descriptors has occurred.
addedSystemDatabase bool
}

// reset empties the uncommitted descriptor set and the descNames set, and
// clears addedSystemDatabase so that the next maybeAddSystemDatabase call
// upserts the system database entry again.
func (ud *uncommittedDescriptors) reset() {
ud.descs.Clear()
ud.descNames.Clear()
ud.addedSystemDatabase = false
}

// add adds a descriptor to the set of uncommitted descriptors and returns
Expand All @@ -124,7 +130,9 @@ func (ud *uncommittedDescriptors) add(mut catalog.MutableDescriptor) (catalog.De
// checkOut checks out an uncommitted mutable descriptor for use in the
// transaction. This descriptor should later be checked in again.
func (ud *uncommittedDescriptors) checkOut(id descpb.ID) (catalog.MutableDescriptor, error) {
ud.maybeInitialize()
if id == keys.SystemDatabaseID {
ud.maybeAddSystemDatabase()
}
entry := ud.descs.GetByID(id)
if entry == nil {
return nil, errors.NewAssertionErrorWithWrappedErrf(
Expand Down Expand Up @@ -182,7 +190,9 @@ func maybeRefreshCachedFieldsOnTypeDescriptor(

// getByID looks up an uncommitted descriptor by ID.
func (ud *uncommittedDescriptors) getByID(id descpb.ID) catalog.Descriptor {
ud.maybeInitialize()
if id == keys.SystemDatabaseID && !ud.addedSystemDatabase {
ud.maybeAddSystemDatabase()
}
entry := ud.descs.GetByID(id)
if entry == nil {
return nil
Expand All @@ -200,12 +210,18 @@ func (ud *uncommittedDescriptors) getByID(id descpb.ID) catalog.Descriptor {
func (ud *uncommittedDescriptors) getByName(
dbID descpb.ID, schemaID descpb.ID, name string,
) (hasKnownRename bool, desc catalog.Descriptor) {
ud.maybeInitialize()
if dbID == 0 && schemaID == 0 && name == systemschema.SystemDatabaseName {
ud.maybeAddSystemDatabase()
}
// Walk latest to earliest so that a DROP followed by a CREATE with the same
// name will result in the CREATE being seen.
if got := ud.descs.GetByName(dbID, schemaID, name); got != nil {
return false, got.(*uncommittedDescriptor).immutable
}
// Check whether the set is empty to avoid allocating the NameInfo.
if ud.descNames.Empty() {
return false, nil
}
return ud.descNames.Contains(descpb.NameInfo{
ParentID: dbID,
ParentSchemaID: schemaID,
Expand All @@ -216,7 +232,6 @@ func (ud *uncommittedDescriptors) getByName(
func (ud *uncommittedDescriptors) iterateNewVersionByID(
fn func(entry catalog.NameEntry, originalVersion lease.IDVersion) error,
) error {
ud.maybeInitialize()
return ud.descs.IterateByID(func(entry catalog.NameEntry) error {
mut := entry.(*uncommittedDescriptor).mutable
if mut == nil || mut.IsNew() || !mut.IsUncommittedVersion() {
Expand All @@ -229,7 +244,6 @@ func (ud *uncommittedDescriptors) iterateNewVersionByID(
// iterateImmutableByID walks ud.descs via IterateByID and invokes fn with the
// immutable descriptor held by each entry. Whatever IterateByID returns is
// returned unchanged.
func (ud *uncommittedDescriptors) iterateImmutableByID(
fn func(imm catalog.Descriptor) error,
) error {
ud.maybeInitialize()
return ud.descs.IterateByID(func(entry catalog.NameEntry) error {
// Entries are assumed to always be *uncommittedDescriptor; the unchecked
// type assertion panics otherwise.
return fn(entry.(*uncommittedDescriptor).immutable)
})
}
Expand Down Expand Up @@ -286,8 +300,9 @@ var systemUncommittedDatabase = &uncommittedDescriptor{
// value lazily when this is needed, which ought to be exceedingly rare.
}

// NOTE(review): this span interleaves the removed maybeInitialize header and
// guard with the added maybeAddSystemDatabase ones — confirm which lines are
// live before relying on it.
func (ud *uncommittedDescriptors) maybeInitialize() {
if ud.descs.Len() == 0 {
// maybeAddSystemDatabase upserts systemUncommittedDatabase into ud.descs
// unless addedSystemDatabase is already set, and sets the flag when it does so.
func (ud *uncommittedDescriptors) maybeAddSystemDatabase() {
if !ud.addedSystemDatabase {
ud.addedSystemDatabase = true
ud.descs.Upsert(systemUncommittedDatabase)
}
}
40 changes: 26 additions & 14 deletions pkg/sql/catalog/lease/descriptor_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ type descriptorState struct {
func (t *descriptorState) findForTimestamp(
ctx context.Context, timestamp hlc.Timestamp,
) (*descriptorVersionState, bool, error) {
expensiveLogEnabled := log.ExpensiveLogEnabled(ctx, 2)
t.mu.Lock()
defer t.mu.Unlock()

Expand All @@ -109,7 +110,7 @@ func (t *descriptorState) findForTimestamp(
latest := i+1 == len(t.mu.active.data)
if !desc.hasExpired(timestamp) {
// Existing valid descriptor version.
desc.incRefCount(ctx)
desc.incRefCount(ctx, expensiveLogEnabled)
return desc, latest, nil
}

Expand Down Expand Up @@ -220,29 +221,37 @@ func (t *descriptorState) release(ctx context.Context, s *descriptorVersionState

// Decrements the refcount and returns true if the lease has to be removed
// from the store.
decRefcount := func(s *descriptorVersionState) *storedLease {
expensiveLoggingEnabled := log.ExpensiveLogEnabled(ctx, 2)
decRefCount := func(s *descriptorVersionState) (shouldRemove bool) {
s.mu.Lock()
defer s.mu.Unlock()
s.mu.refcount--
if expensiveLoggingEnabled {
log.Infof(ctx, "release: %s", s.stringLocked())
}
return s.mu.refcount == 0
}
maybeMarkRemoveStoredLease := func(s *descriptorVersionState) *storedLease {
// Figure out if we'd like to remove the lease from the store asap (i.e.
// when the refcount drops to 0). If so, we'll need to mark the lease as
// invalid.
removeOnceDereferenced := t.m.removeOnceDereferenced() ||
removeOnceDereferenced :=
// Release from the store if the descriptor has been dropped or taken
// offline.
t.mu.takenOffline ||
// Release from the store if the lease is not for the latest
// version; only leases for the latest version can be acquired.
s != t.mu.active.findNewest() ||
s.GetVersion() < t.mu.maxVersionSeen

// Release from the store if the lease is not for the latest
// version; only leases for the latest version can be acquired.
s != t.mu.active.findNewest() ||
s.GetVersion() < t.mu.maxVersionSeen ||
t.m.removeOnceDereferenced()
if !removeOnceDereferenced {
return nil
}
s.mu.Lock()
defer s.mu.Unlock()
s.mu.refcount--
if log.ExpensiveLogEnabled(ctx, 2) {
log.Infof(ctx, "release: %s", s.stringLocked())
}
if s.mu.refcount < 0 {
panic(errors.AssertionFailedf("negative ref count: %s", s))
}

if s.mu.refcount == 0 && s.mu.lease != nil && removeOnceDereferenced {
l := s.mu.lease
s.mu.lease = nil
Expand All @@ -251,9 +260,12 @@ func (t *descriptorState) release(ctx context.Context, s *descriptorVersionState
return nil
}
maybeRemoveLease := func() *storedLease {
if shouldRemove := decRefCount(s); !shouldRemove {
return nil
}
t.mu.Lock()
defer t.mu.Unlock()
if l := decRefcount(s); l != nil {
if l := maybeMarkRemoveStoredLease(s); l != nil {
t.mu.active.remove(s)
return l
}
Expand Down
12 changes: 5 additions & 7 deletions pkg/sql/catalog/lease/descriptor_version_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,7 @@ func (s *descriptorVersionState) Underlying() catalog.Descriptor {
}

// Expiration returns the expiration timestamp of this descriptor version
// state.
func (s *descriptorVersionState) Expiration() hlc.Timestamp {
s.mu.Lock()
defer s.mu.Unlock()
return s.mu.expiration
// getExpiration is not shown in this view; presumably it performs the same
// locked read of s.mu.expiration — TODO confirm.
return s.getExpiration()
}

func (s *descriptorVersionState) SafeMessage() string {
Expand Down Expand Up @@ -113,15 +111,15 @@ func (s *descriptorVersionState) hasExpiredLocked(timestamp hlc.Timestamp) bool
return s.mu.expiration.LessEq(timestamp)
}

// incRefCount acquires s.mu and increments the reference count through
// incRefCountLocked. expensiveLogEnabled is forwarded as-is, so the caller is
// expected to have evaluated log.ExpensiveLogEnabled beforehand.
func (s *descriptorVersionState) incRefCount(ctx context.Context) {
func (s *descriptorVersionState) incRefCount(ctx context.Context, expensiveLogEnabled bool) {
s.mu.Lock()
defer s.mu.Unlock()
s.incRefCountLocked(ctx)
s.incRefCountLocked(ctx, expensiveLogEnabled)
}

// incRefCountLocked increments s.mu.refcount and, when expensive logging is
// enabled, records a verbosity-2 event containing s.stringLocked().
// incRefCount calls it with s.mu held.
func (s *descriptorVersionState) incRefCountLocked(ctx context.Context) {
func (s *descriptorVersionState) incRefCountLocked(ctx context.Context, expensiveLogEnabled bool) {
s.mu.refcount++
if log.ExpensiveLogEnabled(ctx, 2) {
if expensiveLogEnabled {
log.VEventf(ctx, 2, "descriptorVersionState.incRefCount: %s", s.stringLocked())
}
}
Expand Down
68 changes: 55 additions & 13 deletions pkg/sql/catalog/lease/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ package lease
import (
"context"
"fmt"
"sort"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -207,15 +208,23 @@ func getDescriptorsFromStoreForInterval(
lowerBound, upperBound hlc.Timestamp,
) ([]historicalDescriptor, error) {
// Ensure lower bound is not an empty timestamp (now).
if lowerBound.Logical == 0 && lowerBound.WallTime == 0 {
return nil, errors.New("Lower bound for export request cannot be 0")
if lowerBound.IsEmpty() {
return nil, errors.AssertionFailedf(
"getDescriptorsFromStoreForInterval: lower bound cannot be empty")
}
// TODO(ajwerner): We'll want to lift this limitation in order to allow this
// function to find descriptors which could not be found by leasing. This
// will also require some careful managing of expiration timestamps for the
// final descriptor.
if upperBound.IsEmpty() {
return nil, errors.AssertionFailedf(
"getDescriptorsFromStoreForInterval: upper bound cannot be empty")
}

// Create an export request (1 kv call) for all descriptors for given
// descriptor ID written during the interval [timestamp, endTimestamp).
batchRequestHeader := roachpb.Header{}
if upperBound.WallTime != 0 {
batchRequestHeader = roachpb.Header{Timestamp: upperBound.Prev()}
batchRequestHeader := roachpb.Header{
Timestamp: upperBound.Prev(),
}
descriptorKey := catalogkeys.MakeDescMetadataKey(codec, id)
requestHeader := roachpb.RequestHeader{
Expand Down Expand Up @@ -308,17 +317,50 @@ func getDescriptorsFromStoreForInterval(
func (m *Manager) readOlderVersionForTimestamp(
ctx context.Context, id descpb.ID, timestamp hlc.Timestamp,
) ([]historicalDescriptor, error) {
// Retrieve the endTimestamp for our query, which will be the modification
// time of the first descriptor in the manager's active set.
// Retrieve the endTimestamp for our query, which will be the first
// modification timestamp above our query timestamp.
t := m.findDescriptorState(id, false /*create*/)
endTimestamp := func() hlc.Timestamp {
// A missing descriptor state indicates that this descriptor has been
// purged in the meantime. We should go back around in the acquisition
// loop to make the appropriate error appear.
if t == nil {
return nil, nil
}
endTimestamp, done := func() (hlc.Timestamp, bool) {
t.mu.Lock()
defer t.mu.Unlock()

// If there are no descriptors, then we won't have a valid end timestamp.
if len(t.mu.active.data) == 0 {
return hlc.Timestamp{}
return hlc.Timestamp{}, true
}
return t.mu.active.data[0].GetModificationTime()
// We permit gaps in historical versions. We want to find the timestamp
// that represents the start of the validity interval for the known version
// which immediately follows the timestamps we're searching for.
i := sort.Search(len(t.mu.active.data), func(i int) bool {
return timestamp.Less(t.mu.active.data[i].GetModificationTime())
})

// If the timestamp we're searching for is somehow after the last descriptor
// we have in play, then either we have the right descriptor, or some other
// shenanigans where we've evicted the descriptor has occurred.
//
// TODO(ajwerner): When we come to modify this code to allow us to find
// historical descriptors which have been dropped, we'll need to rework
// this case and support providing no upperBound to
// getDescriptorsFromStoreForInterval.
if i == len(t.mu.active.data) ||
// If we found a descriptor that isn't the first descriptor, go and check
// whether the descriptor for which we're searching actually exists. This
// will deal with cases where a concurrent fetch filled it in for us.
i > 0 && timestamp.Less(t.mu.active.data[i-1].getExpiration()) {
return hlc.Timestamp{}, true
}
return t.mu.active.data[i].GetModificationTime(), false
}()
if done {
return nil, nil
}

// Retrieve descriptors in range [timestamp, endTimestamp) in decreasing
// modification time order.
Expand Down Expand Up @@ -666,9 +708,9 @@ func NewLeaseManager(
func NameMatchesDescriptor(
desc catalog.Descriptor, parentID descpb.ID, parentSchemaID descpb.ID, name string,
) bool {
// A match requires the parent ID, the parent schema ID and the name to all
// be equal; both return forms below test the same three conditions and
// differ only in evaluation order.
return desc.GetParentID() == parentID &&
desc.GetParentSchemaID() == parentSchemaID &&
desc.GetName() == name
return desc.GetName() == name &&
desc.GetParentID() == parentID &&
desc.GetParentSchemaID() == parentSchemaID
}

// findNewest returns the newest descriptor version state for the ID.
Expand Down
Loading

0 comments on commit 47a2e2f

Please sign in to comment.