From ac17112a7e9b3ae86fd010ef83b99b7ffdfec25d Mon Sep 17 00:00:00 2001 From: jameswsj10 Date: Wed, 22 Sep 2021 10:54:59 -0700 Subject: [PATCH] sql: improving historical descriptor look up efficiency Fixes #70692. The existing implementation for looking up old historical descriptors required multiple round trips to storage. This improvement requires only 1, at most 2, KV calls to storage by using a single ExportRequest. Release note (performance improvement): reduce kv calls when looking up old historical descriptors to 1 or at most 2. --- pkg/sql/catalog/lease/BUILD.bazel | 3 + pkg/sql/catalog/lease/lease.go | 159 ++++++++++++++++++------- pkg/sql/catalog/lease/lease_test.go | 178 ++++++++++++++++------------ pkg/sql/catalog/lease/testutils.go | 19 +++ 4 files changed, 237 insertions(+), 122 deletions(-) diff --git a/pkg/sql/catalog/lease/BUILD.bazel b/pkg/sql/catalog/lease/BUILD.bazel index f76b46893634..7413a8eb4828 100644 --- a/pkg/sql/catalog/lease/BUILD.bazel +++ b/pkg/sql/catalog/lease/BUILD.bazel @@ -25,12 +25,15 @@ go_library( "//pkg/settings", "//pkg/settings/cluster", "//pkg/sql/catalog", + "//pkg/sql/catalog/catalogkeys", "//pkg/sql/catalog/catalogkv", "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/nstree", "//pkg/sql/sem/tree", "//pkg/sql/sessiondata", "//pkg/sql/sqlutil", + "//pkg/storage", + "//pkg/util/encoding", "//pkg/util/grpcutil", "//pkg/util/hlc", "//pkg/util/log", diff --git a/pkg/sql/catalog/lease/lease.go b/pkg/sql/catalog/lease/lease.go index 7091cbdb08a9..455a5c808f87 100644 --- a/pkg/sql/catalog/lease/lease.go +++ b/pkg/sql/catalog/lease/lease.go @@ -27,11 +27,14 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" 
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" + kvstorage "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/log/logcrash" @@ -183,13 +186,88 @@ type historicalDescriptor struct { expiration hlc.Timestamp // ModificationTime of the next descriptor } -// Read an older descriptor version for the particular timestamp -// from the store. We unfortunately need to read more than one descriptor -// version just so that we can set the expiration time on the descriptor -// properly. +// Retrieves historical descriptors of given id within the lower and upper bound +// timestamp from the MVCC key range. +func (m *Manager) getDescriptorsFromStoreForInterval( + ctx context.Context, db *kv.DB, id descpb.ID, lowerBound, upperBound hlc.Timestamp, +) ([]catalog.Descriptor, error) { + // Create an export request (1 kv call) for all descriptors for given + // descriptor ID between the lowerBound and upperBound timestamps. + batchRequestHeader := roachpb.Header{Timestamp: upperBound} + descriptorKey := catalogkeys.MakeDescMetadataKey(m.Codec(), id) + requestHeader := roachpb.RequestHeader{ + Key: descriptorKey, + EndKey: descriptorKey.PrefixEnd(), + } + req := &roachpb.ExportRequest{ + RequestHeader: requestHeader, + StartTime: lowerBound, + MVCCFilter: roachpb.MVCCFilter_All, + ReturnSST: true, + } + res, pErr := kv.SendWrappedWith(ctx, db.NonTransactionalSender(), batchRequestHeader, req) + if pErr != nil { + err := pErr.GoError() + return nil, errors.Wrapf(err, "Error in retrieving latest table descs as of timestamp %s", lowerBound) + } + + // unmarshal key span retrieved from export request to extract descriptors + var descriptorsRead []catalog.Descriptor + for _, file := range
res.(*roachpb.ExportResponse).Files { + if err := func() error { + it, err := kvstorage.NewMemSSTIterator(file.SST, false /* verify */) + if err != nil { + return err + } + defer it.Close() + + // Convert each MVCC key value pair corresponding to the specified + // descriptor ID + for it.SeekGE(kvstorage.NilKey); ; it.Next() { + if ok, err := it.Valid(); err != nil { + return err + } else if !ok { + return nil + } + + // Extract MVCCKey for descriptor in file + k := it.UnsafeKey() + remaining, _, _, err := m.Codec().DecodeIndexPrefix(k.Key) + if err != nil { + return err + } + _, id, err := encoding.DecodeUvarintAscending(remaining) + if err != nil { + return err + } + + // Decode ID and construct descriptor + unsafeValue := it.UnsafeValue() + if unsafeValue == nil { + name := fmt.Sprintf("desc(%d)", id) + return errors.Errorf("%v was dropped or truncated", name) + } + value := roachpb.Value{RawBytes: unsafeValue} + var desc descpb.Descriptor + if err = value.GetProto(&desc); err != nil { + return err + } + b := catalogkv.NewBuilderWithMVCCTimestamp(&desc, k.Timestamp) + descriptorsRead = append(descriptorsRead, b.BuildImmutable()) + } + }(); err != nil { + return nil, err + } + } + return descriptorsRead, nil +} + +// Read an older descriptor version for the particular timestamp from the store +// with at most 2 KV calls. We unfortunately need to read more than one +// descriptor version just so that we can set the expiration time on the +// descriptor properly. // // TODO(vivek): Future work: -// 1. Read multiple versions of a descriptor through one kv call. // 2. Translate multiple simultaneous calls to this method into a single call // as is done for acquireNodeLease(). // 3. Figure out a sane policy on when these descriptors should be purged. 
@@ -197,59 +275,48 @@ type historicalDescriptor struct { func (m *Manager) readOlderVersionForTimestamp( ctx context.Context, id descpb.ID, timestamp hlc.Timestamp, ) ([]historicalDescriptor, error) { - expiration, done := func() (hlc.Timestamp, bool) { - t := m.findDescriptorState(id, false /* create */) + // Retrieve the endTimestamp for our query, which will be the modification + // time of the first descriptor in the manager's active set. + t := m.findDescriptorState(id, false /*create*/) + endTimestamp := func() hlc.Timestamp { t.mu.Lock() defer t.mu.Unlock() - afterIdx := 0 - // Walk back the versions to find one that is valid for the timestamp. - for i := len(t.mu.active.data) - 1; i >= 0; i-- { - // Check to see if the ModificationTime is valid. - if desc := t.mu.active.data[i]; desc.GetModificationTime().LessEq(timestamp) { - if expiration := desc.getExpiration(); timestamp.Less(expiration) { - // Existing valid descriptor version. - return expiration, true - } - // We need a version after data[i], but before data[i+1]. - // We could very well use the timestamp to read the - // descriptor, but unfortunately we will not be able to assign - // it a proper expiration time. Therefore, we read - // descriptor versions one by one from afterIdx back into the - // past until we find a valid one. - afterIdx = i + 1 - break - } + if len(t.mu.active.data) == 0 { + return hlc.Timestamp{} } - - if afterIdx == len(t.mu.active.data) { - return hlc.Timestamp{}, true - } - - // Read descriptor versions one by one into the past until we - // find a valid one. Every version is assigned an expiration time that - // is the ModificationTime of the previous one read. - return t.mu.active.data[afterIdx].GetModificationTime(), false + return t.mu.active.data[0].GetModificationTime() }() - if done { - return nil, nil + + // Make an export request for descriptors between the start and end + // timestamps, returning descriptors in decreasing modification time order. 
+ descs, err := m.getDescriptorsFromStoreForInterval(ctx, m.DB(), id, timestamp, endTimestamp) + if err != nil { + return nil, err } - // Read descriptors from the store. + // Iterate from the latest modification time from retrieved descriptors. var versions []historicalDescriptor - for { - desc, err := m.storage.getForExpiration(ctx, expiration, id) + subsequentTimestamp := endTimestamp + for i := 0; i < len(descs); i++ { + versions = append(versions, historicalDescriptor{ + desc: descs[i], + expiration: subsequentTimestamp, + }) + subsequentTimestamp = descs[i].GetModificationTime() + } + + // In the case where the descriptor we're looking for is modified before the + // earliest retrieved timestamp, we get the descriptor before the first + // descriptor by making another KV call. + if timestamp.Less(subsequentTimestamp) { + desc, err := m.storage.getForExpiration(ctx, subsequentTimestamp, id) if err != nil { return nil, err } versions = append(versions, historicalDescriptor{ desc: desc, - expiration: expiration, + expiration: subsequentTimestamp, }) - if desc.GetModificationTime().LessEq(timestamp) { - break - } - // Set the expiration time for the next descriptor. - expiration = desc.GetModificationTime() } return versions, nil @@ -773,7 +840,7 @@ type LeasedDescriptor interface { } // Acquire acquires a read lease for the specified descriptor ID valid for -// the timestamp. It returns the descriptor and a expiration time. +// the timestamp. It returns the descriptor and an expiration time. // A transaction using this descriptor must ensure that its // commit-timestamp < expiration-time. Care must be taken to not modify // the returned descriptor. 
diff --git a/pkg/sql/catalog/lease/lease_test.go b/pkg/sql/catalog/lease/lease_test.go index 1116755621e3..e57e2b7d34e3 100644 --- a/pkg/sql/catalog/lease/lease_test.go +++ b/pkg/sql/catalog/lease/lease_test.go @@ -52,7 +52,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util" - "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" @@ -2505,99 +2504,126 @@ func TestHistoricalAcquireDroppedDescriptor(t *testing.T) { tdb.CheckQueryResults(t, `WITH a AS (SELECT 'a'::`+typeName+`) SELECT * FROM a AS OF SYSTEM TIME `+now, [][]string{{"a"}}) } -// Test that attempts to use a descriptor at a timestamp that precedes when -// a descriptor is dropped but follows the notification that that descriptor -// was dropped will successfully acquire the lease. -func TestLeaseAcquireAfterDropWithEarlierTimestamp(t *testing.T) { +// Tests retrieving older versions within a given start and end timestamp of a +// table descriptor from store through an ExportRequest. +func TestHistoricalExportRequestForTimeRange(t *testing.T) { defer leaktest.AfterTest(t)() + var stopper *stop.Stopper + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{}, + }) + stopper = tc.Stopper() + ctx := context.Background() + defer stopper.Stop(ctx) + tdb := sqlutils.MakeSQLRunner(tc.ServerConn(0)) + + // Create a schema, create table, alter table a few times to get some history + // of tables while keeping checkpoints (timestamps), and call export request + // to see if contents are matching as expected between specific time + // intervals. 
+ tdb.Exec(t, "CREATE SCHEMA sc") + tdb.Exec(t, "CREATE TABLE sc.foo (i INT PRIMARY KEY)") + tdb.Exec(t, "INSERT INTO sc.foo VALUES (1)") - // descID is the ID of the table we're dropping. - var descID atomic.Value - descID.Store(descpb.ID(0)) - type refreshEvent struct { - unblock chan struct{} - ts hlc.Timestamp + var ts1Str string + tdb.QueryRow(t, "SELECT cluster_logical_timestamp()").Scan(&ts1Str) + ts1, err := sql.ParseHLC(ts1Str) + require.NoError(t, err) + + tdb.Exec(t, "ALTER SCHEMA sc RENAME TO sc2") + tdb.Exec(t, "ALTER TABLE sc2.foo ADD COLUMN id UUID NOT NULL DEFAULT gen_random_uuid()") + tdb.Exec(t, "ALTER TABLE sc2.foo RENAME COLUMN i TO former_id") + tdb.Exec(t, "ALTER TABLE sc2.foo RENAME COLUMN id TO current_id") + tdb.Exec(t, "CREATE TYPE status AS ENUM ('open', 'closed', 'inactive')") + tdb.Exec(t, "ALTER TYPE status DROP VALUE 'inactive'") + tdb.Exec(t, "ALTER TYPE status DROP VALUE 'open'") + + var ts2Str string + tdb.QueryRow(t, "SELECT cluster_logical_timestamp()").Scan(&ts2Str) + ts2, err := sql.ParseHLC(ts2Str) + require.NoError(t, err) + + // Store table descriptor ID + var tableID atomic.Value + storeID := func(val *atomic.Value, name string) { + var id descpb.ID + tdb.QueryRow(t, `SELECT id FROM system.namespace WHERE name = $1`, name).Scan(&id) + require.NotEqual(t, descpb.ID(0), id) + val.Store(id) } - refreshed := make(chan refreshEvent) + storeID(&tableID, "foo") + + // Export Request for descriptor versions between ts1 and ts2. Waits for the + // most recent version with the name col and removes manager's active + // descriptorVersions before doing so for test purposes. 
+ manager := tc.Server(0).LeaseManager().(*lease.Manager) + descriptorID := tableID.Load().(descpb.ID) + _, err = manager.WaitForOneVersion(ctx, descriptorID, base.DefaultRetryOptions()) + require.NoError(t, err) + historicalDescs, err := manager.TestingDescriptorExportRequest(ctx, descriptorID, ts1, ts2) + assert.NoError(t, err) + + // Assert returned descriptors modification times are between ts1 and ts2 and the IDs match our query desc id + for _, desc := range historicalDescs { + modificationTime := desc.GetModificationTime() + assert.Truef(t, ts1.Less(modificationTime), "ts1: %s, modification: %s", ts1.String(), modificationTime.String()) + assert.Truef(t, modificationTime.Less(ts2), "modification: %s, ts2: %s", modificationTime.String(), ts2.String()) + assert.Equalf(t, descriptorID, desc.GetID(), "(ID) Expected: %d, Got: %d", descriptorID, desc.GetID()) + } +} + +// Tests acquiring read leases on previous versions of a table descriptor from +// store. +func TestHistoricalDescriptorAcquire(t *testing.T) { + defer leaktest.AfterTest(t)() var stopper *stop.Stopper tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ - ServerArgs: base.TestServerArgs{ - Knobs: base.TestingKnobs{ - SQLLeaseManager: &lease.ManagerTestingKnobs{ - TestingDescriptorRefreshedEvent: func(descriptor *descpb.Descriptor) { - if descpb.GetDescriptorID(descriptor) != descID.Load().(descpb.ID) { - return - } - unblock := make(chan struct{}) - select { - case refreshed <- refreshEvent{ - unblock: unblock, - ts: descpb.GetDescriptorModificationTime(descriptor), - }: - case <-stopper.ShouldQuiesce(): - } - select { - case <-unblock: - case <-stopper.ShouldQuiesce(): - } - }, - }, - }, - }, + ServerArgs: base.TestServerArgs{}, }) stopper = tc.Stopper() ctx := context.Background() defer stopper.Stop(ctx) tdb := sqlutils.MakeSQLRunner(tc.ServerConn(0)) - // Create a schema, create a table in that schema, insert into it, drop it, - // detect the drop has made its way to the lease manager 
and thus the lease - // has been removed, and note the timestamp at which the drop occurred, then - // ensure that the descriptors can be read at the previous timestamp. + // Create a schema, create table, alter table a few times to get some history + // of tables while keeping timestamp checkpoints for acquire query tdb.Exec(t, "CREATE SCHEMA sc") tdb.Exec(t, "CREATE TABLE sc.foo (i INT PRIMARY KEY)") tdb.Exec(t, "INSERT INTO sc.foo VALUES (1)") - { + + var ts1Str string + tdb.QueryRow(t, "SELECT cluster_logical_timestamp()").Scan(&ts1Str) + ts1, err := sql.ParseHLC(ts1Str) + require.NoError(t, err) + + tdb.Exec(t, "ALTER TABLE sc.foo ADD COLUMN id UUID NOT NULL DEFAULT gen_random_uuid()") + tdb.Exec(t, "ALTER TABLE sc.foo RENAME COLUMN i TO former_id") + tdb.Exec(t, "ALTER TABLE sc.foo RENAME COLUMN id TO current_id") + + // Store table descriptor ID + var tableID atomic.Value + storeID := func(val *atomic.Value, name string) { var id descpb.ID - tdb.QueryRow(t, `SELECT id FROM system.namespace WHERE name = $1`, "sc").Scan(&id) + tdb.QueryRow(t, `SELECT id FROM system.namespace WHERE name = $1`, name).Scan(&id) require.NotEqual(t, descpb.ID(0), id) - descID.Store(id) + val.Store(id) } - dropErr := make(chan error, 1) - go func() { - _, err := tc.ServerConn(0).Exec("DROP SCHEMA sc CASCADE") - dropErr <- err - }() + storeID(&tableID, "foo") + + // Acquire descriptor version valid at timestamp ts1. Waits for the most + // recent version with the name column before doing so. + _, err = tc.Server(0).LeaseManager().(*lease.Manager).WaitForOneVersion(ctx, tableID.Load().(descpb.ID), base.DefaultRetryOptions()) + require.NoError(t, err, "Failed to wait for one version of descriptor: %s", err) + acquiredDescriptor, err := + tc.Server(0).LeaseManager().(*lease.Manager).Acquire(ctx, ts1, tableID.Load().(descpb.ID)) + assert.NoError(t, err) - // Observe that the lease manager has now marked the descriptor as dropped. 
- ev := <-refreshed - - // Ensure that reads at the previous timestamp will succeed. Before the - // commit that introduced this test, they would fail because the fallback - // used to read the table descriptor from the store did not exist for the - // schema. After this commit, there is no fallback and the lease manager - // properly serves the right version for both. - tdb.CheckQueryResults(t, - "SELECT * FROM sc.foo AS OF SYSTEM TIME "+ev.ts.Prev().AsOfSystemTime(), - [][]string{{"1"}}) - - // Test that using a timestamp equal to the timestamp at which the descriptor - // is dropped results in the proper error. - tdb.ExpectErr(t, `relation "sc.foo" does not exist`, - "SELECT * FROM sc.foo AS OF SYSTEM TIME "+ev.ts.AsOfSystemTime()) - - // Also ensure that the subsequent timestamp gets the same error. - tdb.ExpectErr(t, `relation "sc.foo" does not exist`, - "SELECT * FROM sc.foo AS OF SYSTEM TIME "+ev.ts.Next().AsOfSystemTime()) - - // Allow everything to continue. - close(ev.unblock) - require.NoError(t, <-dropErr) - - // Test again, after the namespace entry has been fully removed, that the - // query returns the exact same error. 
- tdb.ExpectErr(t, `relation "sc.foo" does not exist`, - "SELECT * FROM sc.foo AS OF SYSTEM TIME "+ev.ts.AsOfSystemTime()) + // Ensure the modificationTime <= timestamp < expirationTime + modificationTime := acquiredDescriptor.Underlying().GetModificationTime() + assert.Truef(t, modificationTime.LessEq(ts1) && + ts1.Less(acquiredDescriptor.Expiration()), "modification: %s, ts1: %s, "+ + "expiration: %s", modificationTime.String(), ts1.String(), + acquiredDescriptor.Expiration().String()) } func TestDropDescriptorRacesWithAcquisition(t *testing.T) { diff --git a/pkg/sql/catalog/lease/testutils.go b/pkg/sql/catalog/lease/testutils.go index 8d5b7b5d4cde..056f7c29ee13 100644 --- a/pkg/sql/catalog/lease/testutils.go +++ b/pkg/sql/catalog/lease/testutils.go @@ -94,3 +94,22 @@ func (m *Manager) TestingAcquireAndAssertMinVersion( func (m *Manager) TestingOutstandingLeasesGauge() *metric.Gauge { return m.storage.outstandingLeases } + +// TestingDescriptorExportRequest calls an export request for all descriptors +// within the given range from the descriptor KV layer. Before calling export +// request, we empty the manager's active leases to ensure we are retrieving +// from store. +func (m *Manager) TestingDescriptorExportRequest( + ctx context.Context, id descpb.ID, start, end hlc.Timestamp, +) ([]catalog.Descriptor, error) { + // clear out current manager's descriptorStates' active leases, call export request + m.mu.Lock() + for _, mDesc := range m.mu.descriptors { + mDesc.mu.Lock() + mDesc.mu.active.data = nil + mDesc.mu.Unlock() + } + m.mu.Unlock() + + return m.getDescriptorsFromStoreForInterval(ctx, m.DB(), id, start, end) +}