Skip to content

Commit

Permalink
sql: improving historical descriptor look up efficiency
Browse files Browse the repository at this point in the history
Fixes #70692. The existing implementation for looking up old
historical descriptors required multiple round trips to storage.
This improvement requires only 1, at most 2, KV calls to storage
by using a single ExportRequest.

Release note (performance improvement): reduce kv calls when
looking up old historical descriptors to 1 or at most 2.
  • Loading branch information
jameswsj10 committed Oct 12, 2021
1 parent 8f546a6 commit ac17112
Show file tree
Hide file tree
Showing 4 changed files with 237 additions and 122 deletions.
3 changes: 3 additions & 0 deletions pkg/sql/catalog/lease/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@ go_library(
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql/catalog",
"//pkg/sql/catalog/catalogkeys",
"//pkg/sql/catalog/catalogkv",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/nstree",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlutil",
"//pkg/storage",
"//pkg/util/encoding",
"//pkg/util/grpcutil",
"//pkg/util/hlc",
"//pkg/util/log",
Expand Down
159 changes: 113 additions & 46 deletions pkg/sql/catalog/lease/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,14 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
kvstorage "github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/logcrash"
Expand Down Expand Up @@ -183,73 +186,137 @@ type historicalDescriptor struct {
expiration hlc.Timestamp // ModificationTime of the next descriptor
}

// Read an older descriptor version for the particular timestamp
// from the store. We unfortunately need to read more than one descriptor
// version just so that we can set the expiration time on the descriptor
// properly.
// Retrieves historical descriptors of given id within the lower and upper bound
// timestamp from the MVCC key range.
func (m *Manager) getDescriptorsFromStoreForInterval(
ctx context.Context, db *kv.DB, id descpb.ID, lowerBound, upperBound hlc.Timestamp,
) ([]catalog.Descriptor, error) {
// Create an export request (1 kv call) for all descriptors for given
// descriptor ID in range [timestamp, endTimestamp]
batchRequestHeader := roachpb.Header{Timestamp: upperBound}
descriptorKey := catalogkeys.MakeDescMetadataKey(m.Codec(), id)
requestHeader := roachpb.RequestHeader{
Key: descriptorKey,
EndKey: descriptorKey.PrefixEnd(),
}
req := &roachpb.ExportRequest{
RequestHeader: requestHeader,
StartTime: lowerBound,
MVCCFilter: roachpb.MVCCFilter_All,
ReturnSST: true,
}
res, pErr := kv.SendWrappedWith(ctx, db.NonTransactionalSender(), batchRequestHeader, req)
if pErr != nil {
err := pErr.GoError()
return nil, errors.Wrapf(err, "Error in retrieving latest table descs as of timestamp %s", lowerBound)
}

// unmarshal key span retrieved from export request to extract descriptors
var descriptorsRead []catalog.Descriptor
for _, file := range res.(*roachpb.ExportResponse).Files {
if err := func() error {
it, err := kvstorage.NewMemSSTIterator(file.SST, false /* verify */)
if err != nil {
return err
}
defer it.Close()

// Convert each MVCC key value pair corresponding to the specified
// descriptor ID
for it.SeekGE(kvstorage.NilKey); ; it.Next() {
if ok, err := it.Valid(); err != nil {
return err
} else if !ok {
return nil
}

// Extract MVCCKey for descriptor in file
k := it.UnsafeKey()
remaining, _, _, err := m.Codec().DecodeIndexPrefix(k.Key)
if err != nil {
return err
}
_, id, err := encoding.DecodeUvarintAscending(remaining)
if err != nil {
return err
}

// Decode ID and construct descriptor
unsafeValue := it.UnsafeValue()
if unsafeValue == nil {
name := fmt.Sprintf("desc(%d)", id)
return errors.Errorf("%v was dropped or truncated", name)
}
value := roachpb.Value{RawBytes: unsafeValue}
var desc descpb.Descriptor
if err = value.GetProto(&desc); err != nil {
return err
}
b := catalogkv.NewBuilderWithMVCCTimestamp(&desc, k.Timestamp)
descriptorsRead = append(descriptorsRead, b.BuildImmutable())
}
}(); err != nil {
return nil, err
}
}
return descriptorsRead, nil
}

// Read an older descriptor version for the particular timestamp from the store
// with at most 2 KV calls. We unfortunately need to read more than one
// descriptor version just so that we can set the expiration time on the
// descriptor properly.
//
// TODO(vivek): Future work:
// 1. Read multiple versions of a descriptor through one kv call.
// 2. Translate multiple simultaneous calls to this method into a single call
// as is done for acquireNodeLease().
// 3. Figure out a sane policy on when these descriptors should be purged.
// They are currently purged in PurgeOldVersions.
func (m *Manager) readOlderVersionForTimestamp(
ctx context.Context, id descpb.ID, timestamp hlc.Timestamp,
) ([]historicalDescriptor, error) {
expiration, done := func() (hlc.Timestamp, bool) {
t := m.findDescriptorState(id, false /* create */)
// Retrieve the endTimestamp for our query, which will be the modification
// time of the first descriptor in the manager's active set.
t := m.findDescriptorState(id, false /*create*/)
endTimestamp := func() hlc.Timestamp {
t.mu.Lock()
defer t.mu.Unlock()
afterIdx := 0
// Walk back the versions to find one that is valid for the timestamp.
for i := len(t.mu.active.data) - 1; i >= 0; i-- {
// Check to see if the ModificationTime is valid.
if desc := t.mu.active.data[i]; desc.GetModificationTime().LessEq(timestamp) {
if expiration := desc.getExpiration(); timestamp.Less(expiration) {
// Existing valid descriptor version.
return expiration, true
}
// We need a version after data[i], but before data[i+1].
// We could very well use the timestamp to read the
// descriptor, but unfortunately we will not be able to assign
// it a proper expiration time. Therefore, we read
// descriptor versions one by one from afterIdx back into the
// past until we find a valid one.
afterIdx = i + 1
break
}
if len(t.mu.active.data) == 0 {
return hlc.Timestamp{}
}

if afterIdx == len(t.mu.active.data) {
return hlc.Timestamp{}, true
}

// Read descriptor versions one by one into the past until we
// find a valid one. Every version is assigned an expiration time that
// is the ModificationTime of the previous one read.
return t.mu.active.data[afterIdx].GetModificationTime(), false
return t.mu.active.data[0].GetModificationTime()
}()
if done {
return nil, nil

// Make an export request for descriptors between the start and end
// timestamps, returning descriptors in decreasing modification time order.
descs, err := m.getDescriptorsFromStoreForInterval(ctx, m.DB(), id, timestamp, endTimestamp)
if err != nil {
return nil, err
}

// Read descriptors from the store.
// Iterate from the latest modification time from retrieved descriptors.
var versions []historicalDescriptor
for {
desc, err := m.storage.getForExpiration(ctx, expiration, id)
subsequentTimestamp := endTimestamp
for i := 0; i < len(descs); i++ {
versions = append(versions, historicalDescriptor{
desc: descs[i],
expiration: subsequentTimestamp,
})
subsequentTimestamp = descs[i].GetModificationTime()
}

// In the case where the descriptor we're looking for is modified before the
// earliest retrieved timestamp, we get the descriptor before the first
// descriptor by making another KV call.
if timestamp.Less(subsequentTimestamp) {
desc, err := m.storage.getForExpiration(ctx, subsequentTimestamp, id)
if err != nil {
return nil, err
}
versions = append(versions, historicalDescriptor{
desc: desc,
expiration: expiration,
expiration: subsequentTimestamp,
})
if desc.GetModificationTime().LessEq(timestamp) {
break
}
// Set the expiration time for the next descriptor.
expiration = desc.GetModificationTime()
}

return versions, nil
Expand Down Expand Up @@ -773,7 +840,7 @@ type LeasedDescriptor interface {
}

// Acquire acquires a read lease for the specified descriptor ID valid for
// the timestamp. It returns the descriptor and a expiration time.
// the timestamp. It returns the descriptor and an expiration time.
// A transaction using this descriptor must ensure that its
// commit-timestamp < expiration-time. Care must be taken to not modify
// the returned descriptor.
Expand Down
Loading

0 comments on commit ac17112

Please sign in to comment.