Skip to content

Commit

Permalink
sql: improve historical descriptor look up efficiency
Browse files Browse the repository at this point in the history
Fixes #70692. The existing implementation for looking up old
historical descriptors required multiple round trips to storage.
This improvement requires only 1, at most 2, KV calls to storage
by using a single ExportRequest.

Release note (performance improvement): improve efficiency of
looking up old historical descriptors.
  • Loading branch information
jameswsj10 committed Oct 27, 2021
1 parent 8f546a6 commit e96ddba
Show file tree
Hide file tree
Showing 13 changed files with 362 additions and 166 deletions.
13 changes: 10 additions & 3 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6753,12 +6753,19 @@ func TestPaginatedBackupTenant(t *testing.T) {
return fmt.Sprintf("%v%s", span.String(), spanStr)
}

// Check if export request is from a lease for a descriptor to avoid picking
// up on wrong export requests
isLeasingExportRequest := func(r *roachpb.ExportRequest) bool {
_, tenantID, _ := keys.DecodeTenantPrefix(r.Key)
codec := keys.MakeSQLCodec(tenantID)
return bytes.HasPrefix(r.Key, codec.DescMetadataPrefix()) &&
r.EndKey.Equal(r.Key.PrefixEnd())
}
params.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{
TestingRequestFilter: func(ctx context.Context, request roachpb.BatchRequest) *roachpb.Error {
for _, ru := range request.Requests {
switch ru.GetInner().(type) {
case *roachpb.ExportRequest:
exportRequest := ru.GetInner().(*roachpb.ExportRequest)
if exportRequest, ok := ru.GetInner().(*roachpb.ExportRequest); ok &&
!isLeasingExportRequest(exportRequest) {
exportRequestSpans = append(
exportRequestSpans,
requestSpanStr(roachpb.Span{Key: exportRequest.Key, EndKey: exportRequest.EndKey}, exportRequest.ResumeKeyTS),
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/cdctest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ go_library(
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/roachpb:with-mocks",
"//pkg/sql",
"//pkg/sql/sem/tree",
"//pkg/testutils/serverutils",
"//pkg/util/fsm",
"//pkg/util/hlc",
Expand Down Expand Up @@ -45,7 +45,7 @@ go_test(
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/sql",
"//pkg/sql/sem/tree",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/changefeedccl/cdctest/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"strings"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -675,14 +675,14 @@ func ParseJSONValueTimestamps(v []byte) (updated, resolved hlc.Timestamp, err er
}
if valueRaw.Updated != `` {
var err error
updated, err = sql.ParseHLC(valueRaw.Updated)
updated, err = tree.ParseHLC(valueRaw.Updated)
if err != nil {
return hlc.Timestamp{}, hlc.Timestamp{}, err
}
}
if valueRaw.Resolved != `` {
var err error
resolved, err = sql.ParseHLC(valueRaw.Resolved)
resolved, err = tree.ParseHLC(valueRaw.Resolved)
if err != nil {
return hlc.Timestamp{}, hlc.Timestamp{}, err
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/ccl/changefeedccl/cdctest/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
Expand Down Expand Up @@ -127,7 +127,7 @@ func TestBeforeAfterValidator(t *testing.T) {
ts := make([]hlc.Timestamp, len(tsRaw))
for i := range tsRaw {
var err error
ts[i], err = sql.ParseHLC(tsRaw[i])
ts[i], err = tree.ParseHLC(tsRaw[i])
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -280,7 +280,7 @@ func TestFingerprintValidator(t *testing.T) {
ts := make([]hlc.Timestamp, len(tsRaw))
for i := range tsRaw {
var err error
ts[i], err = sql.ParseHLC(tsRaw[i])
ts[i], err = tree.ParseHLC(tsRaw[i])
if err != nil {
t.Fatal(err)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_rangefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
Expand Down Expand Up @@ -873,7 +873,7 @@ func TestReplicaRangefeedPushesTransactions(t *testing.T) {
// if it ever gets pushed.
var ts2Str string
require.NoError(t, tx1.QueryRowContext(ctx, "SELECT cluster_logical_timestamp()").Scan(&ts2Str))
ts2, err := sql.ParseHLC(ts2Str)
ts2, err := tree.ParseHLC(ts2Str)
require.NoError(t, err)

// Wait for the RangeFeed checkpoint on each RangeFeed to exceed this timestamp.
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/catalog/lease/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ go_library(
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql/catalog",
"//pkg/sql/catalog/catalogkeys",
"//pkg/sql/catalog/catalogkv",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/nstree",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlutil",
"//pkg/storage",
"//pkg/util/grpcutil",
"//pkg/util/hlc",
"//pkg/util/log",
Expand Down
196 changes: 144 additions & 52 deletions pkg/sql/catalog/lease/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
kvstorage "github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/logcrash"
Expand Down Expand Up @@ -183,76 +185,166 @@ type historicalDescriptor struct {
expiration hlc.Timestamp // ModificationTime of the next descriptor
}

// Read an older descriptor version for the particular timestamp
// from the store. We unfortunately need to read more than one descriptor
// version just so that we can set the expiration time on the descriptor
// properly.
// Retrieves historical descriptors of given id within the lower and upper bound
// timestamp from the MVCC key range in decreasing modification time order. Any
// descriptor versions that were modified in the range [lower, upper) will be
// retrieved through an export request.
//
// TODO(vivek): Future work:
// 1. Read multiple versions of a descriptor through one kv call.
// 2. Translate multiple simultaneous calls to this method into a single call
// In the following scenario v4 is our oldest active lease
// [v1@t1 ][v2@t3 ][v3@t5 ][v4@t7
// [start end]
// getDescriptorsFromStoreForInterval(..., start, end) will get back:
// [v3, v2] (reverse order)
//
// Note that this does not necessarily retrieve a descriptor version that was
// alive at the lower bound timestamp.
func getDescriptorsFromStoreForInterval(
ctx context.Context,
db *kv.DB,
codec keys.SQLCodec,
id descpb.ID,
lowerBound, upperBound hlc.Timestamp,
) ([]historicalDescriptor, error) {
// Create an export request (1 kv call) for all descriptors for given
// descriptor ID written during the interval [timestamp, endTimestamp).
batchRequestHeader := roachpb.Header{}
if upperBound.WallTime != 0 {
batchRequestHeader = roachpb.Header{Timestamp: upperBound.Prev()}
}
descriptorKey := catalogkeys.MakeDescMetadataKey(codec, id)
requestHeader := roachpb.RequestHeader{
Key: descriptorKey,
EndKey: descriptorKey.PrefixEnd(),
}
req := &roachpb.ExportRequest{
RequestHeader: requestHeader,
StartTime: lowerBound.Prev(),
MVCCFilter: roachpb.MVCCFilter_All,
ReturnSST: true,
}

// Export request returns descriptors in decreasing modification time.
res, pErr := kv.SendWrappedWith(ctx, db.NonTransactionalSender(), batchRequestHeader, req)
if pErr != nil {
return nil, errors.Wrapf(pErr.GoError(), "error in retrieving descs between %s, %s",
lowerBound, upperBound)
}

// Unmarshal key span retrieved from export request to construct historical descs.
var descriptorsRead []historicalDescriptor
// Keep track of the most recently processed descriptor's modification time to
// set as the expiration for the next descriptor to process. Recall we process
// descriptors in decreasing modification time.
subsequentModificationTime := upperBound
for _, file := range res.(*roachpb.ExportResponse).Files {
if err := func() error {
it, err := kvstorage.NewMemSSTIterator(file.SST, false /* verify */)
if err != nil {
return err
}
defer it.Close()

// Convert each MVCC key value pair corresponding to the specified
// descriptor ID.
for it.SeekGE(kvstorage.NilKey); ; it.Next() {
if ok, err := it.Valid(); err != nil {
return err
} else if !ok {
return nil
}

// Decode key and value of descriptor.
k := it.UnsafeKey()
descContent := it.UnsafeValue()
if descContent == nil {
return errors.Wrapf(errors.New("unsafe value error"), "error "+
"extracting raw bytes of descriptor with key %s modified between "+
"%s, %s", k.String(), k.Timestamp, subsequentModificationTime)
}

// Construct a plain descriptor.
value := roachpb.Value{RawBytes: descContent}
var desc descpb.Descriptor
if err := value.GetProto(&desc); err != nil {
return err
}
descBuilder := catalogkv.NewBuilderWithMVCCTimestamp(&desc, k.Timestamp)

// Construct a historical descriptor with expiration.
histDesc := historicalDescriptor{
desc: descBuilder.BuildImmutable(),
expiration: subsequentModificationTime,
}
descriptorsRead = append(descriptorsRead, histDesc)

// Update the expiration time for next descriptor.
subsequentModificationTime = k.Timestamp
}
}(); err != nil {
return nil, err
}
}
return descriptorsRead, nil
}

// Read older descriptor versions for the particular timestamp from store
// through an ExportRequest. The ExportRequest queries the key span for versions
// in range [timestamp, earliest modification time in memory) or [timestamp:] if
// there are no active leases in memory. This is followed by a call to
// getForExpiration in case the ExportRequest doesn't grab the earliest
// descriptor version we are interested in, resulting at most 2 KV calls.
//
// TODO(vivek, james): Future work:
// 1. Translate multiple simultaneous calls to this method into a single call
// as is done for acquireNodeLease().
// 3. Figure out a sane policy on when these descriptors should be purged.
// 2. Figure out a sane policy on when these descriptors should be purged.
// They are currently purged in PurgeOldVersions.
func (m *Manager) readOlderVersionForTimestamp(
ctx context.Context, id descpb.ID, timestamp hlc.Timestamp,
) ([]historicalDescriptor, error) {
expiration, done := func() (hlc.Timestamp, bool) {
t := m.findDescriptorState(id, false /* create */)
// Retrieve the endTimestamp for our query, which will be the modification
// time of the first descriptor in the manager's active set.
t := m.findDescriptorState(id, false /*create*/)
endTimestamp := func() hlc.Timestamp {
t.mu.Lock()
defer t.mu.Unlock()
afterIdx := 0
// Walk back the versions to find one that is valid for the timestamp.
for i := len(t.mu.active.data) - 1; i >= 0; i-- {
// Check to see if the ModificationTime is valid.
if desc := t.mu.active.data[i]; desc.GetModificationTime().LessEq(timestamp) {
if expiration := desc.getExpiration(); timestamp.Less(expiration) {
// Existing valid descriptor version.
return expiration, true
}
// We need a version after data[i], but before data[i+1].
// We could very well use the timestamp to read the
// descriptor, but unfortunately we will not be able to assign
// it a proper expiration time. Therefore, we read
// descriptor versions one by one from afterIdx back into the
// past until we find a valid one.
afterIdx = i + 1
break
}
if len(t.mu.active.data) == 0 {
return hlc.Timestamp{}
}

if afterIdx == len(t.mu.active.data) {
return hlc.Timestamp{}, true
}

// Read descriptor versions one by one into the past until we
// find a valid one. Every version is assigned an expiration time that
// is the ModificationTime of the previous one read.
return t.mu.active.data[afterIdx].GetModificationTime(), false
return t.mu.active.data[0].GetModificationTime()
}()
if done {
return nil, nil

// Retrieve descriptors in range [timestamp, endTimestamp) in decreasing
// modification time order.
descs, err := getDescriptorsFromStoreForInterval(ctx, m.DB(), m.Codec(), id, timestamp, endTimestamp)
if err != nil {
return nil, err
}
if len(descs) == 0 {
return descs, nil
}

// Read descriptors from the store.
var versions []historicalDescriptor
for {
desc, err := m.storage.getForExpiration(ctx, expiration, id)
// In the case where the descriptor we're looking for is modified before the
// input timestamp, we get the descriptor before the earliest descriptor
// retrieved from getDescriptorsFromStoreForInterval by making another KV
// call.
earliestModificationTime := descs[len(descs)-1].desc.GetModificationTime()

// Unless the timestamp is exactly at the earliest modification time from
// ExportRequest, we'll invoke another call to retrieve the descriptor with
// modification time prior to the timestamp.
if timestamp.Less(earliestModificationTime) {
desc, err := m.storage.getForExpiration(ctx, earliestModificationTime, id)
if err != nil {
return nil, err
}
versions = append(versions, historicalDescriptor{
descs = append(descs, historicalDescriptor{
desc: desc,
expiration: expiration,
expiration: earliestModificationTime,
})
if desc.GetModificationTime().LessEq(timestamp) {
break
}
// Set the expiration time for the next descriptor.
expiration = desc.GetModificationTime()
}

return versions, nil
return descs, nil
}

// Insert descriptor versions. The versions provided are not in
Expand Down Expand Up @@ -773,7 +865,7 @@ type LeasedDescriptor interface {
}

// Acquire acquires a read lease for the specified descriptor ID valid for
// the timestamp. It returns the descriptor and a expiration time.
// the timestamp. It returns the descriptor and an expiration time.
// A transaction using this descriptor must ensure that its
// commit-timestamp < expiration-time. Care must be taken to not modify
// the returned descriptor.
Expand Down
Loading

0 comments on commit e96ddba

Please sign in to comment.