Skip to content

Commit

Permalink
sql: improve historical descriptor look up efficiency
Browse files Browse the repository at this point in the history
Fixes #70692. The existing implementation for looking up old
historical descriptors required multiple round trips to storage.
This improvement requires only 1, at most 2, KV calls to storage
by using a single ExportRequest.

Release note (performance improvement): improve efficiency of
looking up old historical descriptors.
  • Loading branch information
jameswsj10 committed Oct 15, 2021
1 parent 8f546a6 commit 3b7fa11
Show file tree
Hide file tree
Showing 5 changed files with 264 additions and 130 deletions.
13 changes: 10 additions & 3 deletions pkg/ccl/backupccl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6753,12 +6753,19 @@ func TestPaginatedBackupTenant(t *testing.T) {
return fmt.Sprintf("%v%s", span.String(), spanStr)
}

// Check if export request is from a lease for a descriptor to avoid picking
// up on wrong export requests
isLeasingExportRequest := func(r *roachpb.ExportRequest) bool {
_, tenantID, _ := keys.DecodeTenantPrefix(r.Key)
codec := keys.MakeSQLCodec(tenantID)
return bytes.HasPrefix(r.Key, codec.DescMetadataPrefix()) &&
r.EndKey.Equal(r.Key.PrefixEnd())
}
params.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{
TestingRequestFilter: func(ctx context.Context, request roachpb.BatchRequest) *roachpb.Error {
for _, ru := range request.Requests {
switch ru.GetInner().(type) {
case *roachpb.ExportRequest:
exportRequest := ru.GetInner().(*roachpb.ExportRequest)
if exportRequest, ok := ru.GetInner().(*roachpb.ExportRequest); ok &&
!isLeasingExportRequest(exportRequest) {
exportRequestSpans = append(
exportRequestSpans,
requestSpanStr(roachpb.Span{Key: exportRequest.Key, EndKey: exportRequest.EndKey}, exportRequest.ResumeKeyTS),
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/catalog/lease/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ go_library(
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql/catalog",
"//pkg/sql/catalog/catalogkeys",
"//pkg/sql/catalog/catalogkv",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/nstree",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlutil",
"//pkg/storage",
"//pkg/util/grpcutil",
"//pkg/util/hlc",
"//pkg/util/log",
Expand Down Expand Up @@ -98,6 +100,7 @@ go_test(
"//pkg/util/timeutil",
"//pkg/util/tracing",
"//pkg/util/uuid",
"@com_github_cockroachdb_apd_v2//:apd",
"@com_github_cockroachdb_cockroach_go_v2//crdb",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
Expand Down
174 changes: 125 additions & 49 deletions pkg/sql/catalog/lease/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
kvstorage "github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/logcrash"
Expand Down Expand Up @@ -183,76 +185,150 @@ type historicalDescriptor struct {
expiration hlc.Timestamp // ModificationTime of the next descriptor
}

// Read an older descriptor version for the particular timestamp
// from the store. We unfortunately need to read more than one descriptor
// version just so that we can set the expiration time on the descriptor
// properly.
// Retrieves historical descriptors of given id within the lower and upper bound
// timestamp from the MVCC key range. Any descriptor versions that were modified
// after the lower bound timestamp and before the upper bound timestamp will be
// retrieved through an export request.
//
// Note that this does not necessarily retrieve a descriptor version that was
// alive at the lower bound timestamp.
func getDescriptorsFromStoreForInterval(
ctx context.Context,
db *kv.DB,
codec keys.SQLCodec,
id descpb.ID,
lowerBound, upperBound hlc.Timestamp,
) ([]historicalDescriptor, error) {
// Create an export request (1 kv call) for all descriptors for given
// descriptor ID written during the interval [timestamp, endTimestamp].
batchRequestHeader := roachpb.Header{Timestamp: upperBound}
descriptorKey := catalogkeys.MakeDescMetadataKey(codec, id)
requestHeader := roachpb.RequestHeader{
Key: descriptorKey,
EndKey: descriptorKey.PrefixEnd(),
}
req := &roachpb.ExportRequest{
RequestHeader: requestHeader,
StartTime: lowerBound,
MVCCFilter: roachpb.MVCCFilter_All,
ReturnSST: true,
}

// Export request returns descriptors in decreasing modification time.
res, pErr := kv.SendWrappedWith(ctx, db.NonTransactionalSender(), batchRequestHeader, req)
if pErr != nil {
return nil, errors.Wrapf(pErr.GoError(), "error in retrieving descs between %s, %s",
lowerBound, upperBound)
}

// Unmarshal key span retrieved from export request to construct historical descs.
var descriptorsRead []historicalDescriptor
subsequentModificationTime := upperBound
for _, file := range res.(*roachpb.ExportResponse).Files {
if err := func() error {
it, err := kvstorage.NewMemSSTIterator(file.SST, false /* verify */)
if err != nil {
return err
}
defer it.Close()

// Convert each MVCC key value pair corresponding to the specified
// descriptor ID.
for it.SeekGE(kvstorage.NilKey); ; it.Next() {
if ok, err := it.Valid(); err != nil {
return err
} else if !ok {
return nil
}

// Decode key and value of descriptor.
k := it.UnsafeKey()
descContent := it.UnsafeValue()
if descContent == nil {
return errors.Wrapf(err, "error extracting raw bytes of descriptor with "+
"key %s modified between %s, %s", k.String(), k.Timestamp, subsequentModificationTime)
}

// Construct a plain descriptor.
value := roachpb.Value{RawBytes: descContent}
var desc descpb.Descriptor
if err := value.GetProto(&desc); err != nil {
return err
}
descBuilder := catalogkv.NewBuilderWithMVCCTimestamp(&desc, k.Timestamp)

// Construct a historical descriptor with expiration.
histDesc := historicalDescriptor{
desc: descBuilder.BuildImmutable(),
expiration: subsequentModificationTime,
}
descriptorsRead = append(descriptorsRead, histDesc)

// update the expiration time for next iteration.
subsequentModificationTime = k.Timestamp
}
}(); err != nil {
return nil, err
}
}
return descriptorsRead, nil
}

// Read an older descriptor version for the particular timestamp from the store
// with at most 2 KV calls. We unfortunately need to read more than one
// descriptor version just so that we can set the expiration time on the
// descriptor properly.
//
// TODO(vivek): Future work:
// 1. Read multiple versions of a descriptor through one kv call.
// 2. Translate multiple simultaneous calls to this method into a single call
// as is done for acquireNodeLease().
// 3. Figure out a sane policy on when these descriptors should be purged.
// They are currently purged in PurgeOldVersions.
func (m *Manager) readOlderVersionForTimestamp(
ctx context.Context, id descpb.ID, timestamp hlc.Timestamp,
) ([]historicalDescriptor, error) {
expiration, done := func() (hlc.Timestamp, bool) {
t := m.findDescriptorState(id, false /* create */)
// Retrieve the endTimestamp for our query, which will be the modification
// time of the first descriptor in the manager's active set.
t := m.findDescriptorState(id, false /*create*/)
endTimestamp := func() hlc.Timestamp {
t.mu.Lock()
defer t.mu.Unlock()
afterIdx := 0
// Walk back the versions to find one that is valid for the timestamp.
for i := len(t.mu.active.data) - 1; i >= 0; i-- {
// Check to see if the ModificationTime is valid.
if desc := t.mu.active.data[i]; desc.GetModificationTime().LessEq(timestamp) {
if expiration := desc.getExpiration(); timestamp.Less(expiration) {
// Existing valid descriptor version.
return expiration, true
}
// We need a version after data[i], but before data[i+1].
// We could very well use the timestamp to read the
// descriptor, but unfortunately we will not be able to assign
// it a proper expiration time. Therefore, we read
// descriptor versions one by one from afterIdx back into the
// past until we find a valid one.
afterIdx = i + 1
break
}
if len(t.mu.active.data) == 0 {
return hlc.Timestamp{}
}

if afterIdx == len(t.mu.active.data) {
return hlc.Timestamp{}, true
}

// Read descriptor versions one by one into the past until we
// find a valid one. Every version is assigned an expiration time that
// is the ModificationTime of the previous one read.
return t.mu.active.data[afterIdx].GetModificationTime(), false
return t.mu.active.data[0].GetModificationTime()
}()
if done {
return nil, nil

// Make an export request for descriptors between the start and end
// timestamps, returning descriptors in decreasing modification time order.
//
// In the following scenario v4 is our oldest active lease
// [v1@t1 ][v2@t3 ][v3@t5 ][v4@t7
// [start end]
// getDescriptorsFromStoreForInterval(..., start, end) will get back:
// [v3, v2] (reverse order)
descs, err := getDescriptorsFromStoreForInterval(ctx, m.DB(), m.Codec(), id, timestamp, endTimestamp)
if err != nil {
return nil, err
}

// Read descriptors from the store.
var versions []historicalDescriptor
for {
desc, err := m.storage.getForExpiration(ctx, expiration, id)
// In the case where the descriptor we're looking for is modified before the
// earliest retrieved timestamp, we get the descriptor before the earliest
// descriptor retrieved from getDescriptorsFromStoreForInterval by making
// another KV call.
earliestModificationTime := descs[len(descs)-1].desc.GetModificationTime()
if timestamp.Less(earliestModificationTime) {
desc, err := m.storage.getForExpiration(ctx, earliestModificationTime, id)
if err != nil {
return nil, err
}
versions = append(versions, historicalDescriptor{
descs = append(descs, historicalDescriptor{
desc: desc,
expiration: expiration,
expiration: earliestModificationTime,
})
if desc.GetModificationTime().LessEq(timestamp) {
break
}
// Set the expiration time for the next descriptor.
expiration = desc.GetModificationTime()
}

return versions, nil
return descs, nil
}

// Insert descriptor versions. The versions provided are not in
Expand Down Expand Up @@ -773,7 +849,7 @@ type LeasedDescriptor interface {
}

// Acquire acquires a read lease for the specified descriptor ID valid for
// the timestamp. It returns the descriptor and a expiration time.
// the timestamp. It returns the descriptor and an expiration time.
// A transaction using this descriptor must ensure that its
// commit-timestamp < expiration-time. Care must be taken to not modify
// the returned descriptor.
Expand Down
90 changes: 90 additions & 0 deletions pkg/sql/catalog/lease/lease_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,23 @@ import (
"testing"
"time"

"github.com/cockroachdb/apd/v2"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"github.com/stretchr/testify/require"
)

func TestTableSet(t *testing.T) {
Expand Down Expand Up @@ -1051,3 +1055,89 @@ func TestLeaseAcquireAndReleaseConcurrently(t *testing.T) {
})
}
}

// Tests retrieving older versions within a given start and end timestamp of a
// table descriptor from store through an ExportRequest.
func TestHistoricalExportRequestForTimeRange(t *testing.T) {
defer leaktest.AfterTest(t)()
var stopper *stop.Stopper
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{})
stopper = s.Stopper()
ctx := context.Background()
defer stopper.Stop(ctx)

parseHLC := func(ts string) (hlc.Timestamp, error) {
dec, _, err := apd.NewFromString(ts)
if err != nil {
return hlc.Timestamp{}, err
}
return tree.DecimalToHLC(dec)
}

// Create a schema, create table, alter table a few times to get some history
// of tables while keeping checkpoints (timestamps), and call export request
// to see if contents are matching as expected between specific time
// intervals.
sqlDB.Exec("CREATE SCHEMA sc")
sqlDB.Exec("CREATE TABLE sc.foo (i INT PRIMARY KEY)")
sqlDB.Exec("INSERT INTO sc.foo VALUES (1)")

var ts1Str string
sqlDB.QueryRow("SELECT cluster_logical_timestamp()").Scan(&ts1Str)
ts1, err := parseHLC(ts1Str)
require.NoError(t, err)

sqlDB.Exec("ALTER SCHEMA sc RENAME TO sc2")
sqlDB.Exec("ALTER TABLE sc2.foo ADD COLUMN id UUID NOT NULL DEFAULT gen_random_uuid()")
sqlDB.Exec("ALTER TABLE sc2.foo RENAME COLUMN i TO former_id")
sqlDB.Exec("ALTER TABLE sc2.foo RENAME COLUMN id TO current_id")
sqlDB.Exec("CREATE TYPE status AS ENUM ('open', 'closed', 'inactive')")
sqlDB.Exec("ALTER TYPE status DROP VALUE 'inactive'")
sqlDB.Exec("ALTER TYPE status DROP VALUE 'open'")

var ts2Str string
sqlDB.QueryRow("SELECT cluster_logical_timestamp()").Scan(&ts2Str)
ts2, err := parseHLC(ts2Str)
require.NoError(t, err)

// Store table descriptor ID
var tableID atomic.Value
storeID := func(val *atomic.Value, name string) {
var id descpb.ID
sqlDB.QueryRow(`SELECT id FROM system.namespace WHERE name = $1`, name).Scan(&id)
require.NotEqual(t, descpb.ID(0), id)
val.Store(id)
}
storeID(&tableID, "foo")

// Export Request for descriptor versions between ts1 and ts2. Waits for the
// most recent version with the name col and removes manager's active
// descriptorVersions before doing so for test purposes.
manager := s.LeaseManager().(*Manager)
descriptorID := tableID.Load().(descpb.ID)
_, err = manager.WaitForOneVersion(ctx, descriptorID, base.DefaultRetryOptions())
require.NoError(t, err)

manager.mu.Lock()
for _, mDesc := range manager.mu.descriptors {
mDesc.mu.Lock()
if mDesc.id == descriptorID {
mDesc.mu.active.data = nil
}
mDesc.mu.Unlock()
}
manager.mu.Unlock()

historicalDescs, err := getDescriptorsFromStoreForInterval(ctx, manager.DB(),
manager.Codec(), descriptorID, ts1, ts2)
require.NoError(t, err)

// Assert returned descriptors modification times are between ts1 and ts2 and the IDs match our query historicalDesc id
for _, historicalDesc := range historicalDescs {
modificationTime := historicalDesc.desc.GetModificationTime()
id := historicalDesc.desc.GetID()
require.Truef(t, ts1.Less(modificationTime), "ts1: %s, modification: %s", ts1.String(), modificationTime.String())
require.Truef(t, modificationTime.Less(ts2), "modification: %s, ts2: %s", modificationTime.String(), ts2.String())
require.Equalf(t, descriptorID, id, "(ID) Expected: %d, Got: %d", descriptorID, id)
}
}
Loading

0 comments on commit 3b7fa11

Please sign in to comment.