sql: improve historical descriptor look up efficiency #71239

Merged
19 changes: 13 additions & 6 deletions pkg/ccl/backupccl/backup_test.go
@@ -6753,12 +6753,19 @@ func TestPaginatedBackupTenant(t *testing.T) {
return fmt.Sprintf("%v%s", span.String(), spanStr)
}

// Check whether the export request comes from a descriptor lease, to avoid
// picking up the wrong export requests.
isLeasingExportRequest := func(r *roachpb.ExportRequest) bool {
_, tenantID, _ := keys.DecodeTenantPrefix(r.Key)
codec := keys.MakeSQLCodec(tenantID)
return bytes.HasPrefix(r.Key, codec.DescMetadataPrefix()) &&
r.EndKey.Equal(r.Key.PrefixEnd())
}
params.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{
TestingRequestFilter: func(ctx context.Context, request roachpb.BatchRequest) *roachpb.Error {
for _, ru := range request.Requests {
switch ru.GetInner().(type) {
case *roachpb.ExportRequest:
exportRequest := ru.GetInner().(*roachpb.ExportRequest)
if exportRequest, ok := ru.GetInner().(*roachpb.ExportRequest); ok &&
!isLeasingExportRequest(exportRequest) {
exportRequestSpans = append(
exportRequestSpans,
requestSpanStr(roachpb.Span{Key: exportRequest.Key, EndKey: exportRequest.EndKey}, exportRequest.ResumeKeyTS),
@@ -6769,9 +6776,9 @@ func TestPaginatedBackupTenant(t *testing.T) {
return nil
},
TestingResponseFilter: func(ctx context.Context, ba roachpb.BatchRequest, br *roachpb.BatchResponse) *roachpb.Error {
for _, ru := range br.Responses {
switch ru.GetInner().(type) {
case *roachpb.ExportResponse:
for i, ru := range br.Responses {
if exportRequest, ok := ba.Requests[i].GetInner().(*roachpb.ExportRequest); ok &&
!isLeasingExportRequest(exportRequest) {
exportResponse := ru.GetInner().(*roachpb.ExportResponse)
// Every ExportResponse should have a single SST when running backup
// within a tenant.
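With this change the lease manager fetches historical descriptors through ExportRequests over a single descriptor's metadata span, so the paginated-backup test above has to ignore those requests. The standalone sketch below is not part of the diff; it isolates that check, assuming the key helpers used in the diff (DecodeTenantPrefix, MakeSQLCodec, DescMetadataPrefix, MakeDescMetadataKey) behave as on this branch, and the descriptor ID 52 is only an illustrative value.

// Sketch of the span a lease-driven ExportRequest covers and the check the
// test uses to skip it.
package main

import (
	"bytes"
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/keys"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
)

// isLeasingExportSpan reports whether [key, endKey) is the single-descriptor
// metadata span that the lease manager's historical lookup exports.
func isLeasingExportSpan(key, endKey roachpb.Key) bool {
	_, tenantID, _ := keys.DecodeTenantPrefix(key)
	codec := keys.MakeSQLCodec(tenantID)
	return bytes.HasPrefix(key, codec.DescMetadataPrefix()) &&
		endKey.Equal(key.PrefixEnd())
}

func main() {
	// A leasing export targets exactly one descriptor's metadata row range.
	descKey := catalogkeys.MakeDescMetadataKey(keys.SystemSQLCodec, descpb.ID(52))
	fmt.Println(isLeasingExportSpan(descKey, descKey.PrefixEnd())) // true
}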
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/cdctest/BUILD.bazel
@@ -17,7 +17,7 @@ go_library(
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/roachpb:with-mocks",
"//pkg/sql",
"//pkg/sql/sem/tree",
"//pkg/testutils/serverutils",
"//pkg/util/fsm",
"//pkg/util/hlc",
@@ -45,7 +45,7 @@ go_test(
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/sql",
"//pkg/sql/sem/tree",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
6 changes: 3 additions & 3 deletions pkg/ccl/changefeedccl/cdctest/validator.go
@@ -17,7 +17,7 @@ import (
"strings"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/errors"
)
@@ -675,14 +675,14 @@ func ParseJSONValueTimestamps(v []byte) (updated, resolved hlc.Timestamp, err error) {
}
if valueRaw.Updated != `` {
var err error
updated, err = sql.ParseHLC(valueRaw.Updated)
updated, err = tree.ParseHLC(valueRaw.Updated)
if err != nil {
return hlc.Timestamp{}, hlc.Timestamp{}, err
}
}
if valueRaw.Resolved != `` {
var err error
resolved, err = sql.ParseHLC(valueRaw.Resolved)
resolved, err = tree.ParseHLC(valueRaw.Resolved)
if err != nil {
return hlc.Timestamp{}, hlc.Timestamp{}, err
}
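ParseHLC moves from pkg/sql to pkg/sql/sem/tree so that lower-level packages such as the lease manager can use it without importing pkg/sql. Below is a minimal usage sketch, not part of the diff, assuming the relocated tree.ParseHLC keeps the same signature and decimal format (wall-clock nanoseconds, with the logical counter as the fractional part); the timestamp string is made up.

// Minimal sketch of the relocated helper: tree.ParseHLC parses the decimal
// string produced by cluster_logical_timestamp() into an hlc.Timestamp.
package main

import (
	"fmt"
	"log"

	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
)

func main() {
	// 1628000000000000000 nanoseconds of wall time, logical counter 5.
	ts, err := tree.ParseHLC("1628000000000000000.0000000005")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(ts.WallTime, ts.Logical) // 1628000000000000000 5
}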
6 changes: 3 additions & 3 deletions pkg/ccl/changefeedccl/cdctest/validator_test.go
@@ -15,7 +15,7 @@ import (
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
@@ -127,7 +127,7 @@ func TestBeforeAfterValidator(t *testing.T) {
ts := make([]hlc.Timestamp, len(tsRaw))
for i := range tsRaw {
var err error
ts[i], err = sql.ParseHLC(tsRaw[i])
ts[i], err = tree.ParseHLC(tsRaw[i])
if err != nil {
t.Fatal(err)
}
@@ -280,7 +280,7 @@ func TestFingerprintValidator(t *testing.T) {
ts := make([]hlc.Timestamp, len(tsRaw))
for i := range tsRaw {
var err error
ts[i], err = sql.ParseHLC(tsRaw[i])
ts[i], err = tree.ParseHLC(tsRaw[i])
if err != nil {
t.Fatal(err)
}
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_rangefeed_test.go
@@ -27,7 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
@@ -873,7 +873,7 @@ func TestReplicaRangefeedPushesTransactions(t *testing.T) {
// if it ever gets pushed.
var ts2Str string
require.NoError(t, tx1.QueryRowContext(ctx, "SELECT cluster_logical_timestamp()").Scan(&ts2Str))
ts2, err := sql.ParseHLC(ts2Str)
ts2, err := tree.ParseHLC(ts2Str)
require.NoError(t, err)

// Wait for the RangeFeed checkpoint on each RangeFeed to exceed this timestamp.
2 changes: 2 additions & 0 deletions pkg/sql/catalog/lease/BUILD.bazel
@@ -25,12 +25,14 @@ go_library(
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql/catalog",
"//pkg/sql/catalog/catalogkeys",
"//pkg/sql/catalog/catalogkv",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/nstree",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlutil",
"//pkg/storage",
"//pkg/util/grpcutil",
"//pkg/util/hlc",
"//pkg/util/log",
202 changes: 151 additions & 51 deletions pkg/sql/catalog/lease/lease.go
@@ -27,11 +27,13 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
kvstorage "github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/logcrash"
@@ -183,76 +185,174 @@ type historicalDescriptor struct {
expiration hlc.Timestamp // ModificationTime of the next descriptor
}

// Read an older descriptor version for the particular timestamp
// from the store. We unfortunately need to read more than one descriptor
// version just so that we can set the expiration time on the descriptor
// properly.
// Retrieves historical descriptors of the given id within the lower and upper
// bound timestamps from the MVCC key range, in decreasing modification time
// order. Any descriptor versions that were modified in the range [lower, upper)
// will be retrieved through an export request. A lower bound of an empty
// timestamp hlc.Timestamp{} will result in an error.
//
// TODO(vivek): Future work:
// 1. Read multiple versions of a descriptor through one kv call.
// 2. Translate multiple simultaneous calls to this method into a single call
// In the following scenario v4 is our oldest active lease
// [v1@t1 ][v2@t3 ][v3@t5 ][v4@t7
// [start end]
// getDescriptorsFromStoreForInterval(..., start, end) will get back:
// [v3, v2] (reverse order)
//
// Note that this does not necessarily retrieve a descriptor version that was
// alive at the lower bound timestamp.
func getDescriptorsFromStoreForInterval(
ctx context.Context,
db *kv.DB,
codec keys.SQLCodec,
id descpb.ID,
lowerBound, upperBound hlc.Timestamp,
) ([]historicalDescriptor, error) {
// Ensure lower bound is not an empty timestamp (now).
if lowerBound.Logical == 0 && lowerBound.WallTime == 0 {
return nil, errors.New("Lower bound for export request cannot be 0")
}

// Create an export request (1 KV call) for all versions of the descriptor
// with the given ID written during the interval [lowerBound, upperBound).
batchRequestHeader := roachpb.Header{}
if upperBound.WallTime != 0 {
batchRequestHeader = roachpb.Header{Timestamp: upperBound.Prev()}
}
descriptorKey := catalogkeys.MakeDescMetadataKey(codec, id)
requestHeader := roachpb.RequestHeader{
Key: descriptorKey,
EndKey: descriptorKey.PrefixEnd(),
}
req := &roachpb.ExportRequest{
RequestHeader: requestHeader,
StartTime: lowerBound.Prev(),
MVCCFilter: roachpb.MVCCFilter_All,
ReturnSST: true,
}

// Export request returns descriptors in decreasing modification time.
res, pErr := kv.SendWrappedWith(ctx, db.NonTransactionalSender(), batchRequestHeader, req)
if pErr != nil {
return nil, errors.Wrapf(pErr.GoError(), "error in retrieving descs between %s, %s",
lowerBound, upperBound)
}

// Unmarshal key span retrieved from export request to construct historical descs.
var descriptorsRead []historicalDescriptor
// Keep track of the most recently processed descriptor's modification time to
// set as the expiration for the next descriptor to process. Recall we process
// descriptors in decreasing modification time.
subsequentModificationTime := upperBound
for _, file := range res.(*roachpb.ExportResponse).Files {
if err := func() error {
it, err := kvstorage.NewMemSSTIterator(file.SST, false /* verify */)
if err != nil {
return err
}
defer it.Close()

// Convert each MVCC key-value pair corresponding to the specified
// descriptor ID into a historical descriptor.
for it.SeekGE(kvstorage.NilKey); ; it.Next() {
if ok, err := it.Valid(); err != nil {
return err
} else if !ok {
return nil
}

// Decode key and value of descriptor.
k := it.UnsafeKey()
descContent := it.UnsafeValue()
if descContent == nil {
return errors.Wrapf(errors.New("unsafe value error"), "error "+
"extracting raw bytes of descriptor with key %s modified between "+
"%s, %s", k.String(), k.Timestamp, subsequentModificationTime)
}

// Construct a plain descriptor.
value := roachpb.Value{RawBytes: descContent}
var desc descpb.Descriptor
if err := value.GetProto(&desc); err != nil {
return err
}
descBuilder := catalogkv.NewBuilderWithMVCCTimestamp(&desc, k.Timestamp)

// Construct a historical descriptor with expiration.
histDesc := historicalDescriptor{
desc: descBuilder.BuildImmutable(),
expiration: subsequentModificationTime,
}
descriptorsRead = append(descriptorsRead, histDesc)

// Update the expiration time for next descriptor.
subsequentModificationTime = k.Timestamp
}
}(); err != nil {
return nil, err
}
}
return descriptorsRead, nil
}

// Read older descriptor versions for the particular timestamp from the store
// through an ExportRequest. The ExportRequest queries the key span for versions
// in range [timestamp, earliest modification time in memory) or [timestamp:] if
// there are no active leases in memory. This is followed by a call to
// getForExpiration in case the ExportRequest doesn't grab the earliest
// descriptor version we are interested in, resulting in at most 2 KV calls.
//
// TODO(vivek, james): Future work:
// 1. Translate multiple simultaneous calls to this method into a single call
// as is done for acquireNodeLease().
// 3. Figure out a sane policy on when these descriptors should be purged.
// 2. Figure out a sane policy on when these descriptors should be purged.
// They are currently purged in PurgeOldVersions.
func (m *Manager) readOlderVersionForTimestamp(
ctx context.Context, id descpb.ID, timestamp hlc.Timestamp,
) ([]historicalDescriptor, error) {
expiration, done := func() (hlc.Timestamp, bool) {
t := m.findDescriptorState(id, false /* create */)
// Retrieve the endTimestamp for our query, which will be the modification
// time of the first descriptor in the manager's active set.
t := m.findDescriptorState(id, false /*create*/)
endTimestamp := func() hlc.Timestamp {
t.mu.Lock()
defer t.mu.Unlock()
afterIdx := 0
// Walk back the versions to find one that is valid for the timestamp.
for i := len(t.mu.active.data) - 1; i >= 0; i-- {
// Check to see if the ModificationTime is valid.
if desc := t.mu.active.data[i]; desc.GetModificationTime().LessEq(timestamp) {
if expiration := desc.getExpiration(); timestamp.Less(expiration) {
// Existing valid descriptor version.
return expiration, true
}
// We need a version after data[i], but before data[i+1].
// We could very well use the timestamp to read the
// descriptor, but unfortunately we will not be able to assign
// it a proper expiration time. Therefore, we read
// descriptor versions one by one from afterIdx back into the
// past until we find a valid one.
afterIdx = i + 1
break
}
if len(t.mu.active.data) == 0 {
return hlc.Timestamp{}
}
return t.mu.active.data[0].GetModificationTime()
}()

if afterIdx == len(t.mu.active.data) {
return hlc.Timestamp{}, true
}
// Retrieve descriptors in range [timestamp, endTimestamp) in decreasing
// modification time order.
descs, err := getDescriptorsFromStoreForInterval(ctx, m.DB(), m.Codec(), id, timestamp, endTimestamp)
if err != nil {
return nil, err
}

// Read descriptor versions one by one into the past until we
// find a valid one. Every version is assigned an expiration time that
// is the ModificationTime of the previous one read.
return t.mu.active.data[afterIdx].GetModificationTime(), false
}()
if done {
return nil, nil
// In the case where the descriptor we're looking for was modified before the
// input timestamp, we fetch the version preceding the earliest descriptor we
// have, whether that came from memory or from the call to
// getDescriptorsFromStoreForInterval.
var earliestModificationTime hlc.Timestamp
if len(descs) == 0 {
earliestModificationTime = endTimestamp
} else {
earliestModificationTime = descs[len(descs)-1].desc.GetModificationTime()
}

// Read descriptors from the store.
var versions []historicalDescriptor
for {
desc, err := m.storage.getForExpiration(ctx, expiration, id)
// Unless the timestamp is exactly the earliest modification time from the
// ExportRequest, we invoke another call to retrieve the descriptor with a
// modification time prior to the timestamp.
if timestamp.Less(earliestModificationTime) {
desc, err := m.storage.getForExpiration(ctx, earliestModificationTime, id)
if err != nil {
return nil, err
}
versions = append(versions, historicalDescriptor{
descs = append(descs, historicalDescriptor{
desc: desc,
expiration: expiration,
expiration: earliestModificationTime,
})
if desc.GetModificationTime().LessEq(timestamp) {
break
}
// Set the expiration time for the next descriptor.
expiration = desc.GetModificationTime()
}

return versions, nil
return descs, nil
}

// Insert descriptor versions. The versions provided are not in
@@ -773,7 +873,7 @@ type LeasedDescriptor interface {
}

// Acquire acquires a read lease for the specified descriptor ID valid for
// the timestamp. It returns the descriptor and a expiration time.
// the timestamp. It returns the descriptor and an expiration time.
// A transaction using this descriptor must ensure that its
// commit-timestamp < expiration-time. Care must be taken to not modify
// the returned descriptor.
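Taken together, the lease.go changes replace the old version-by-version walk with one interval ExportRequest plus at most one getForExpiration call: versions arrive newest-first, and each version's expiration is the modification time of the next-newer version. The sketch below is not part of the diff; it is a simplified model of that expiration assignment using the v1@t1 … v4@t7 timeline from the new doc comment with a query at t2, where plain int64 timestamps and made-up struct names stand in for hlc.Timestamp and the real historicalDescriptor.

// Simplified, self-contained model of how getDescriptorsFromStoreForInterval
// assigns expirations to the versions returned by the interval export.
package main

import "fmt"

type version struct {
	name             string
	modificationTime int64 // stands in for hlc.Timestamp
	expiration       int64 // modification time of the next-newer version
}

// assignExpirations mirrors the loop over the ExportResponse: versions arrive
// newest-first, and each one expires when the next-newer version begins.
func assignExpirations(newestFirst []version, upperBound int64) []version {
	next := upperBound
	out := make([]version, 0, len(newestFirst))
	for _, v := range newestFirst {
		v.expiration = next
		next = v.modificationTime
		out = append(out, v)
	}
	return out
}

func main() {
	// Timeline [v1@t1][v2@t3][v3@t5][v4@t7: the oldest active lease v4 gives an
	// upper bound of t7, and the query timestamp is t2.
	got := assignExpirations([]version{
		{name: "v3", modificationTime: 5},
		{name: "v2", modificationTime: 3},
	}, 7)
	fmt.Println(got) // v3 valid over [5,7), v2 valid over [3,5)
	// v2 still begins after t2, so the real code would make one follow-up
	// getForExpiration call to fetch v1, giving it expiration t3.
}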