From e96ddba39f923ce75e84622248664921cae3074d Mon Sep 17 00:00:00 2001 From: jameswsj10 Date: Wed, 22 Sep 2021 10:54:59 -0700 Subject: [PATCH] sql: improve historical descriptor look up efficiency Fixes #70692. The existing implementation for looking up old historical descriptors required multiple round trips to storage. This improvement requires only 1, at most 2, KV calls to storage by using a single ExportRequest. Release note (performance improvement): improve efficiency of looking up old historical descriptors. --- pkg/ccl/backupccl/backup_test.go | 13 +- pkg/ccl/changefeedccl/cdctest/BUILD.bazel | 4 +- pkg/ccl/changefeedccl/cdctest/validator.go | 6 +- .../changefeedccl/cdctest/validator_test.go | 6 +- pkg/kv/kvserver/replica_rangefeed_test.go | 4 +- pkg/sql/catalog/lease/BUILD.bazel | 2 + pkg/sql/catalog/lease/lease.go | 196 +++++++++++++----- pkg/sql/catalog/lease/lease_internal_test.go | 135 ++++++++++++ pkg/sql/catalog/lease/lease_test.go | 114 ++++------ pkg/sql/drop_test.go | 3 +- pkg/sql/exec_util.go | 21 -- pkg/sql/revert_test.go | 3 +- pkg/sql/sem/tree/as_of.go | 21 ++ 13 files changed, 362 insertions(+), 166 deletions(-) diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index 65e17da5480b..9e5eb9989ab9 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -6753,12 +6753,19 @@ func TestPaginatedBackupTenant(t *testing.T) { return fmt.Sprintf("%v%s", span.String(), spanStr) } + // Check if export request is from a lease for a descriptor to avoid picking + // up on wrong export requests + isLeasingExportRequest := func(r *roachpb.ExportRequest) bool { + _, tenantID, _ := keys.DecodeTenantPrefix(r.Key) + codec := keys.MakeSQLCodec(tenantID) + return bytes.HasPrefix(r.Key, codec.DescMetadataPrefix()) && + r.EndKey.Equal(r.Key.PrefixEnd()) + } params.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{ TestingRequestFilter: func(ctx context.Context, request roachpb.BatchRequest) *roachpb.Error { for _, ru := range request.Requests { - switch ru.GetInner().(type) { - case *roachpb.ExportRequest: - exportRequest := ru.GetInner().(*roachpb.ExportRequest) + if exportRequest, ok := ru.GetInner().(*roachpb.ExportRequest); ok && + !isLeasingExportRequest(exportRequest) { exportRequestSpans = append( exportRequestSpans, requestSpanStr(roachpb.Span{Key: exportRequest.Key, EndKey: exportRequest.EndKey}, exportRequest.ResumeKeyTS), diff --git a/pkg/ccl/changefeedccl/cdctest/BUILD.bazel b/pkg/ccl/changefeedccl/cdctest/BUILD.bazel index d654a9e8ab06..e2d4f68cb935 100644 --- a/pkg/ccl/changefeedccl/cdctest/BUILD.bazel +++ b/pkg/ccl/changefeedccl/cdctest/BUILD.bazel @@ -17,7 +17,7 @@ go_library( "//pkg/jobs", "//pkg/jobs/jobspb", "//pkg/roachpb:with-mocks", - "//pkg/sql", + "//pkg/sql/sem/tree", "//pkg/testutils/serverutils", "//pkg/util/fsm", "//pkg/util/hlc", @@ -45,7 +45,7 @@ go_test( "//pkg/security", "//pkg/security/securitytest", "//pkg/server", - "//pkg/sql", + "//pkg/sql/sem/tree", "//pkg/testutils", "//pkg/testutils/serverutils", "//pkg/testutils/sqlutils", diff --git a/pkg/ccl/changefeedccl/cdctest/validator.go b/pkg/ccl/changefeedccl/cdctest/validator.go index 78f360205a74..eb9561e5fbc5 100644 --- a/pkg/ccl/changefeedccl/cdctest/validator.go +++ b/pkg/ccl/changefeedccl/cdctest/validator.go @@ -17,7 +17,7 @@ import ( "strings" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/errors" ) @@ -675,14 +675,14 @@ func ParseJSONValueTimestamps(v []byte) (updated, resolved hlc.Timestamp, err er } if valueRaw.Updated != `` { var err error - updated, err = sql.ParseHLC(valueRaw.Updated) + updated, err = tree.ParseHLC(valueRaw.Updated) if err != nil { return hlc.Timestamp{}, hlc.Timestamp{}, err } } if valueRaw.Resolved != `` { var err error - resolved, err = sql.ParseHLC(valueRaw.Resolved) + resolved, err = tree.ParseHLC(valueRaw.Resolved) if err != nil { return hlc.Timestamp{}, hlc.Timestamp{}, err } diff --git a/pkg/ccl/changefeedccl/cdctest/validator_test.go b/pkg/ccl/changefeedccl/cdctest/validator_test.go index af4786fa110e..3990672bdaa5 100644 --- a/pkg/ccl/changefeedccl/cdctest/validator_test.go +++ b/pkg/ccl/changefeedccl/cdctest/validator_test.go @@ -15,7 +15,7 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" @@ -127,7 +127,7 @@ func TestBeforeAfterValidator(t *testing.T) { ts := make([]hlc.Timestamp, len(tsRaw)) for i := range tsRaw { var err error - ts[i], err = sql.ParseHLC(tsRaw[i]) + ts[i], err = tree.ParseHLC(tsRaw[i]) if err != nil { t.Fatal(err) } @@ -280,7 +280,7 @@ func TestFingerprintValidator(t *testing.T) { ts := make([]hlc.Timestamp, len(tsRaw)) for i := range tsRaw { var err error - ts[i], err = sql.ParseHLC(tsRaw[i]) + ts[i], err = tree.ParseHLC(tsRaw[i]) if err != nil { t.Fatal(err) } diff --git a/pkg/kv/kvserver/replica_rangefeed_test.go b/pkg/kv/kvserver/replica_rangefeed_test.go index bf87f2a0a1ad..181281781790 100644 --- a/pkg/kv/kvserver/replica_rangefeed_test.go +++ b/pkg/kv/kvserver/replica_rangefeed_test.go @@ -27,7 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" @@ -873,7 +873,7 @@ func TestReplicaRangefeedPushesTransactions(t *testing.T) { // if it ever gets pushed. var ts2Str string require.NoError(t, tx1.QueryRowContext(ctx, "SELECT cluster_logical_timestamp()").Scan(&ts2Str)) - ts2, err := sql.ParseHLC(ts2Str) + ts2, err := tree.ParseHLC(ts2Str) require.NoError(t, err) // Wait for the RangeFeed checkpoint on each RangeFeed to exceed this timestamp. diff --git a/pkg/sql/catalog/lease/BUILD.bazel b/pkg/sql/catalog/lease/BUILD.bazel index f76b46893634..4234d159929d 100644 --- a/pkg/sql/catalog/lease/BUILD.bazel +++ b/pkg/sql/catalog/lease/BUILD.bazel @@ -25,12 +25,14 @@ go_library( "//pkg/settings", "//pkg/settings/cluster", "//pkg/sql/catalog", + "//pkg/sql/catalog/catalogkeys", "//pkg/sql/catalog/catalogkv", "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/nstree", "//pkg/sql/sem/tree", "//pkg/sql/sessiondata", "//pkg/sql/sqlutil", + "//pkg/storage", "//pkg/util/grpcutil", "//pkg/util/hlc", "//pkg/util/log", diff --git a/pkg/sql/catalog/lease/lease.go b/pkg/sql/catalog/lease/lease.go index 7091cbdb08a9..072b595bc523 100644 --- a/pkg/sql/catalog/lease/lease.go +++ b/pkg/sql/catalog/lease/lease.go @@ -27,11 +27,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" + kvstorage "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/log/logcrash" @@ -183,76 +185,166 @@ type historicalDescriptor struct { expiration hlc.Timestamp // ModificationTime of the next descriptor } -// Read an older descriptor version for the particular timestamp -// from the store. We unfortunately need to read more than one descriptor -// version just so that we can set the expiration time on the descriptor -// properly. +// Retrieves historical descriptors of given id within the lower and upper bound +// timestamp from the MVCC key range in decreasing modification time order. Any +// descriptor versions that were modified in the range [lower, upper) will be +// retrieved through an export request. // -// TODO(vivek): Future work: -// 1. Read multiple versions of a descriptor through one kv call. -// 2. Translate multiple simultaneous calls to this method into a single call +// In the following scenario v4 is our oldest active lease +// [v1@t1 ][v2@t3 ][v3@t5 ][v4@t7 +// [start end] +// getDescriptorsFromStoreForInterval(..., start, end) will get back: +// [v3, v2] (reverse order) +// +// Note that this does not necessarily retrieve a descriptor version that was +// alive at the lower bound timestamp. +func getDescriptorsFromStoreForInterval( + ctx context.Context, + db *kv.DB, + codec keys.SQLCodec, + id descpb.ID, + lowerBound, upperBound hlc.Timestamp, +) ([]historicalDescriptor, error) { + // Create an export request (1 kv call) for all descriptors for given + // descriptor ID written during the interval [timestamp, endTimestamp). + batchRequestHeader := roachpb.Header{} + if upperBound.WallTime != 0 { + batchRequestHeader = roachpb.Header{Timestamp: upperBound.Prev()} + } + descriptorKey := catalogkeys.MakeDescMetadataKey(codec, id) + requestHeader := roachpb.RequestHeader{ + Key: descriptorKey, + EndKey: descriptorKey.PrefixEnd(), + } + req := &roachpb.ExportRequest{ + RequestHeader: requestHeader, + StartTime: lowerBound.Prev(), + MVCCFilter: roachpb.MVCCFilter_All, + ReturnSST: true, + } + + // Export request returns descriptors in decreasing modification time. + res, pErr := kv.SendWrappedWith(ctx, db.NonTransactionalSender(), batchRequestHeader, req) + if pErr != nil { + return nil, errors.Wrapf(pErr.GoError(), "error in retrieving descs between %s, %s", + lowerBound, upperBound) + } + + // Unmarshal key span retrieved from export request to construct historical descs. + var descriptorsRead []historicalDescriptor + // Keep track of the most recently processed descriptor's modification time to + // set as the expiration for the next descriptor to process. Recall we process + // descriptors in decreasing modification time. + subsequentModificationTime := upperBound + for _, file := range res.(*roachpb.ExportResponse).Files { + if err := func() error { + it, err := kvstorage.NewMemSSTIterator(file.SST, false /* verify */) + if err != nil { + return err + } + defer it.Close() + + // Convert each MVCC key value pair corresponding to the specified + // descriptor ID. + for it.SeekGE(kvstorage.NilKey); ; it.Next() { + if ok, err := it.Valid(); err != nil { + return err + } else if !ok { + return nil + } + + // Decode key and value of descriptor. + k := it.UnsafeKey() + descContent := it.UnsafeValue() + if descContent == nil { + return errors.Wrapf(errors.New("unsafe value error"), "error "+ + "extracting raw bytes of descriptor with key %s modified between "+ + "%s, %s", k.String(), k.Timestamp, subsequentModificationTime) + } + + // Construct a plain descriptor. + value := roachpb.Value{RawBytes: descContent} + var desc descpb.Descriptor + if err := value.GetProto(&desc); err != nil { + return err + } + descBuilder := catalogkv.NewBuilderWithMVCCTimestamp(&desc, k.Timestamp) + + // Construct a historical descriptor with expiration. + histDesc := historicalDescriptor{ + desc: descBuilder.BuildImmutable(), + expiration: subsequentModificationTime, + } + descriptorsRead = append(descriptorsRead, histDesc) + + // Update the expiration time for next descriptor. + subsequentModificationTime = k.Timestamp + } + }(); err != nil { + return nil, err + } + } + return descriptorsRead, nil +} + +// Read older descriptor versions for the particular timestamp from store +// through an ExportRequest. The ExportRequest queries the key span for versions +// in range [timestamp, earliest modification time in memory) or [timestamp:] if +// there are no active leases in memory. This is followed by a call to +// getForExpiration in case the ExportRequest doesn't grab the earliest +// descriptor version we are interested in, resulting at most 2 KV calls. +// +// TODO(vivek, james): Future work: +// 1. Translate multiple simultaneous calls to this method into a single call // as is done for acquireNodeLease(). -// 3. Figure out a sane policy on when these descriptors should be purged. +// 2. Figure out a sane policy on when these descriptors should be purged. // They are currently purged in PurgeOldVersions. func (m *Manager) readOlderVersionForTimestamp( ctx context.Context, id descpb.ID, timestamp hlc.Timestamp, ) ([]historicalDescriptor, error) { - expiration, done := func() (hlc.Timestamp, bool) { - t := m.findDescriptorState(id, false /* create */) + // Retrieve the endTimestamp for our query, which will be the modification + // time of the first descriptor in the manager's active set. + t := m.findDescriptorState(id, false /*create*/) + endTimestamp := func() hlc.Timestamp { t.mu.Lock() defer t.mu.Unlock() - afterIdx := 0 - // Walk back the versions to find one that is valid for the timestamp. - for i := len(t.mu.active.data) - 1; i >= 0; i-- { - // Check to see if the ModificationTime is valid. - if desc := t.mu.active.data[i]; desc.GetModificationTime().LessEq(timestamp) { - if expiration := desc.getExpiration(); timestamp.Less(expiration) { - // Existing valid descriptor version. - return expiration, true - } - // We need a version after data[i], but before data[i+1]. - // We could very well use the timestamp to read the - // descriptor, but unfortunately we will not be able to assign - // it a proper expiration time. Therefore, we read - // descriptor versions one by one from afterIdx back into the - // past until we find a valid one. - afterIdx = i + 1 - break - } + if len(t.mu.active.data) == 0 { + return hlc.Timestamp{} } - - if afterIdx == len(t.mu.active.data) { - return hlc.Timestamp{}, true - } - - // Read descriptor versions one by one into the past until we - // find a valid one. Every version is assigned an expiration time that - // is the ModificationTime of the previous one read. - return t.mu.active.data[afterIdx].GetModificationTime(), false + return t.mu.active.data[0].GetModificationTime() }() - if done { - return nil, nil + + // Retrieve descriptors in range [timestamp, endTimestamp) in decreasing + // modification time order. + descs, err := getDescriptorsFromStoreForInterval(ctx, m.DB(), m.Codec(), id, timestamp, endTimestamp) + if err != nil { + return nil, err + } + if len(descs) == 0 { + return descs, nil } - // Read descriptors from the store. - var versions []historicalDescriptor - for { - desc, err := m.storage.getForExpiration(ctx, expiration, id) + // In the case where the descriptor we're looking for is modified before the + // input timestamp, we get the descriptor before the earliest descriptor + // retrieved from getDescriptorsFromStoreForInterval by making another KV + // call. + earliestModificationTime := descs[len(descs)-1].desc.GetModificationTime() + + // Unless the timestamp is exactly at the earliest modification time from + // ExportRequest, we'll invoke another call to retrieve the descriptor with + // modification time prior to the timestamp. + if timestamp.Less(earliestModificationTime) { + desc, err := m.storage.getForExpiration(ctx, earliestModificationTime, id) if err != nil { return nil, err } - versions = append(versions, historicalDescriptor{ + descs = append(descs, historicalDescriptor{ desc: desc, - expiration: expiration, + expiration: earliestModificationTime, }) - if desc.GetModificationTime().LessEq(timestamp) { - break - } - // Set the expiration time for the next descriptor. - expiration = desc.GetModificationTime() } - return versions, nil + return descs, nil } // Insert descriptor versions. The versions provided are not in @@ -773,7 +865,7 @@ type LeasedDescriptor interface { } // Acquire acquires a read lease for the specified descriptor ID valid for -// the timestamp. It returns the descriptor and a expiration time. +// the timestamp. It returns the descriptor and an expiration time. // A transaction using this descriptor must ensure that its // commit-timestamp < expiration-time. Care must be taken to not modify // the returned descriptor. diff --git a/pkg/sql/catalog/lease/lease_internal_test.go b/pkg/sql/catalog/lease/lease_internal_test.go index 834d82b95105..175080bf4efd 100644 --- a/pkg/sql/catalog/lease/lease_internal_test.go +++ b/pkg/sql/catalog/lease/lease_internal_test.go @@ -29,10 +29,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/errors" "github.com/cockroachdb/logtags" + "github.com/stretchr/testify/require" ) func TestTableSet(t *testing.T) { @@ -1051,3 +1054,135 @@ func TestLeaseAcquireAndReleaseConcurrently(t *testing.T) { }) } } + +// Tests retrieving older versions within a given start and end timestamp of a +// table descriptor from store through an ExportRequest. +func TestReadOlderVersionForTimestamp(t *testing.T) { + defer leaktest.AfterTest(t)() + var stopper *stop.Stopper + s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{}) + stopper = s.Stopper() + ctx := context.Background() + defer stopper.Stop(ctx) + + tdb := sqlutils.MakeSQLRunner(sqlDB) + // Prevent non-explicit Acquire to leases for testing purposes. + tdb.Exec(t, "SET CLUSTER SETTING sql.tablecache.lease.refresh_limit = 0") + tdb.Exec(t, "CREATE TABLE foo (i INT PRIMARY KEY)") + var tableID descpb.ID + tdb.QueryRow(t, "SELECT id FROM system.namespace WHERE name = 'foo'").Scan(&tableID) + + manager := s.LeaseManager().(*Manager) + const N = 5 + descs := make([]catalog.Descriptor, N+1) + + // Create N versions of table descriptor + for i := 0; i < N; i++ { + _, err := manager.Publish(ctx, tableID, func(desc catalog.MutableDescriptor) error { + descs[i] = desc.ImmutableCopy() // will be the previous version + return nil + }, nil) + require.NoError(t, err) + } + { + last, err := manager.Acquire(ctx, s.Clock().Now(), tableID) + require.NoError(t, err) + descs[N] = last.Underlying() + last.Release(ctx) + } + + type version int + versionTS := func(v version) hlc.Timestamp { + return descs[v].GetModificationTime() + } + versionDesc := func(v version) catalog.Descriptor { + return descs[v] + } + type testCase struct { + before []version + ts hlc.Timestamp + expected []version + } + + // Test historical read for descriptors as of specific timestamps and confirm + // expected data. + // [v0 ---)[v1 --)[v2 ---)[v3 ----)[v4 -----)[v5 ------) + for _, tc := range []testCase{ + { + before: []version{}, + ts: versionTS(0), + expected: []version{0, 1, 2, 3, 4, 5}, + }, + { + before: []version{}, + ts: versionTS(3), + expected: []version{3, 4, 5}, + }, + { + before: []version{}, + ts: versionTS(5), + expected: []version{5}, + }, + { + before: []version{}, + ts: versionTS(5).Prev(), + expected: []version{4, 5}, + }, + { + before: []version{5}, + ts: versionTS(3).Prev(), + expected: []version{2, 3, 4}, + }, + { + before: []version{5}, + ts: versionTS(4), + expected: []version{4}, + }, + { + before: []version{4, 5}, + ts: versionTS(2).Prev(), + expected: []version{1, 2, 3}, + }, + { + before: []version{0, 1, 2, 3, 4, 5}, + ts: versionTS(3), + expected: []version{}, + }, + } { + t.Run(fmt.Sprintf("%v@%v->%v", tc.before, tc.ts, tc.expected), func(t *testing.T) { + // Reset the descriptor state to before versions. + manager.mu.Lock() + descStates := manager.mu.descriptors + descStates[tableID] = nil + descStates[tableID] = &descriptorState{m: manager, id: tableID} + for _, v := range tc.before { + descStates[tableID].mu.active.insert( + &descriptorVersionState{ + t: descStates[tableID], + Descriptor: versionDesc(v), + }, + ) + } + manager.mu.Unlock() + + // Retrieve historicalDescriptors modification times. + retrieved, err := manager.readOlderVersionForTimestamp(ctx, tableID, tc.ts) + require.NoError(t, err) + var actualTS []hlc.Timestamp + for _, desc := range retrieved { + actualTS = append(actualTS, desc.desc.GetModificationTime()) + } + + // Validate retrieved descriptors match expected versions. + var expectedTS []hlc.Timestamp + for _, e := range tc.expected { + expectedTS = append(expectedTS, versionTS(e)) + } + require.Equal(t, len(tc.expected), len(actualTS), "\nexpected: %v\nactual: %v\n", expectedTS, actualTS) + for i, eTS := range expectedTS { + aTS := actualTS[len(actualTS)-i-1] + require.Equal(t, eTS, aTS, "expected: %v, actual: %v\n", eTS, actualTS) + } + }) + } +} diff --git a/pkg/sql/catalog/lease/lease_test.go b/pkg/sql/catalog/lease/lease_test.go index 1116755621e3..c7e05520debf 100644 --- a/pkg/sql/catalog/lease/lease_test.go +++ b/pkg/sql/catalog/lease/lease_test.go @@ -52,7 +52,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util" - "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" @@ -2505,99 +2504,58 @@ func TestHistoricalAcquireDroppedDescriptor(t *testing.T) { tdb.CheckQueryResults(t, `WITH a AS (SELECT 'a'::`+typeName+`) SELECT * FROM a AS OF SYSTEM TIME `+now, [][]string{{"a"}}) } -// Test that attempts to use a descriptor at a timestamp that precedes when -// a descriptor is dropped but follows the notification that that descriptor -// was dropped will successfully acquire the lease. -func TestLeaseAcquireAfterDropWithEarlierTimestamp(t *testing.T) { +// Tests acquiring read leases on previous versions of a table descriptor from +// store. +func TestHistoricalDescriptorAcquire(t *testing.T) { defer leaktest.AfterTest(t)() - - // descID is the ID of the table we're dropping. - var descID atomic.Value - descID.Store(descpb.ID(0)) - type refreshEvent struct { - unblock chan struct{} - ts hlc.Timestamp - } - refreshed := make(chan refreshEvent) var stopper *stop.Stopper tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ - ServerArgs: base.TestServerArgs{ - Knobs: base.TestingKnobs{ - SQLLeaseManager: &lease.ManagerTestingKnobs{ - TestingDescriptorRefreshedEvent: func(descriptor *descpb.Descriptor) { - if descpb.GetDescriptorID(descriptor) != descID.Load().(descpb.ID) { - return - } - unblock := make(chan struct{}) - select { - case refreshed <- refreshEvent{ - unblock: unblock, - ts: descpb.GetDescriptorModificationTime(descriptor), - }: - case <-stopper.ShouldQuiesce(): - } - select { - case <-unblock: - case <-stopper.ShouldQuiesce(): - } - }, - }, - }, - }, + ServerArgs: base.TestServerArgs{}, }) stopper = tc.Stopper() ctx := context.Background() defer stopper.Stop(ctx) tdb := sqlutils.MakeSQLRunner(tc.ServerConn(0)) - // Create a schema, create a table in that schema, insert into it, drop it, - // detect the drop has made its way to the lease manager and thus the lease - // has been removed, and note the timestamp at which the drop occurred, then - // ensure that the descriptors can be read at the previous timestamp. + // Create a schema, create table, alter table a few times to get some history + // of tables while keeping timestamp checkpoints for acquire query tdb.Exec(t, "CREATE SCHEMA sc") tdb.Exec(t, "CREATE TABLE sc.foo (i INT PRIMARY KEY)") tdb.Exec(t, "INSERT INTO sc.foo VALUES (1)") - { + + var ts1Str string + tdb.QueryRow(t, "SELECT cluster_logical_timestamp()").Scan(&ts1Str) + ts1, err := tree.ParseHLC(ts1Str) + require.NoError(t, err) + + tdb.Exec(t, "ALTER TABLE sc.foo ADD COLUMN id UUID NOT NULL DEFAULT gen_random_uuid()") + tdb.Exec(t, "ALTER TABLE sc.foo RENAME COLUMN i TO former_id") + tdb.Exec(t, "ALTER TABLE sc.foo RENAME COLUMN id TO current_id") + + // Store table descriptor ID + var tableID atomic.Value + storeID := func(val *atomic.Value, name string) { var id descpb.ID - tdb.QueryRow(t, `SELECT id FROM system.namespace WHERE name = $1`, "sc").Scan(&id) + tdb.QueryRow(t, `SELECT id FROM system.namespace WHERE name = $1`, name).Scan(&id) require.NotEqual(t, descpb.ID(0), id) - descID.Store(id) + val.Store(id) } - dropErr := make(chan error, 1) - go func() { - _, err := tc.ServerConn(0).Exec("DROP SCHEMA sc CASCADE") - dropErr <- err - }() + storeID(&tableID, "foo") + + // Acquire descriptor version valid at timestamp ts1. Waits for the most + // recent version with the name column before doing so. + _, err = tc.Server(0).LeaseManager().(*lease.Manager).WaitForOneVersion(ctx, tableID.Load().(descpb.ID), base.DefaultRetryOptions()) + require.NoError(t, err, "Failed to wait for one version of descriptor: %s", err) + acquiredDescriptor, err := + tc.Server(0).LeaseManager().(*lease.Manager).Acquire(ctx, ts1, tableID.Load().(descpb.ID)) + assert.NoError(t, err) - // Observe that the lease manager has now marked the descriptor as dropped. - ev := <-refreshed - - // Ensure that reads at the previous timestamp will succeed. Before the - // commit that introduced this test, they would fail because the fallback - // used to read the table descriptor from the store did not exist for the - // schema. After this commit, there is no fallback and the lease manager - // properly serves the right version for both. - tdb.CheckQueryResults(t, - "SELECT * FROM sc.foo AS OF SYSTEM TIME "+ev.ts.Prev().AsOfSystemTime(), - [][]string{{"1"}}) - - // Test that using a timestamp equal to the timestamp at which the descriptor - // is dropped results in the proper error. - tdb.ExpectErr(t, `relation "sc.foo" does not exist`, - "SELECT * FROM sc.foo AS OF SYSTEM TIME "+ev.ts.AsOfSystemTime()) - - // Also ensure that the subsequent timestamp gets the same error. - tdb.ExpectErr(t, `relation "sc.foo" does not exist`, - "SELECT * FROM sc.foo AS OF SYSTEM TIME "+ev.ts.Next().AsOfSystemTime()) - - // Allow everything to continue. - close(ev.unblock) - require.NoError(t, <-dropErr) - - // Test again, after the namespace entry has been fully removed, that the - // query returns the exact same error. - tdb.ExpectErr(t, `relation "sc.foo" does not exist`, - "SELECT * FROM sc.foo AS OF SYSTEM TIME "+ev.ts.AsOfSystemTime()) + // Ensure the modificationTime <= timestamp < expirationTime + modificationTime := acquiredDescriptor.Underlying().GetModificationTime() + assert.Truef(t, modificationTime.LessEq(ts1) && + ts1.Less(acquiredDescriptor.Expiration()), "modification: %s, ts1: %s, "+ + "expiration: %s", modificationTime.String(), ts1.String(), + acquiredDescriptor.Expiration().String()) } func TestDropDescriptorRacesWithAcquisition(t *testing.T) { diff --git a/pkg/sql/drop_test.go b/pkg/sql/drop_test.go index d6943fcd1d45..70475a0640ea 100644 --- a/pkg/sql/drop_test.go +++ b/pkg/sql/drop_test.go @@ -37,6 +37,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/gcjob" "github.com/cockroachdb/cockroach/pkg/sql/row" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqltestutils" "github.com/cockroachdb/cockroach/pkg/sql/tests" "github.com/cockroachdb/cockroach/pkg/startupmigrations" @@ -1206,7 +1207,7 @@ WHERE tdb.Exec(t, "INSERT INTO foo VALUES (1)") var afterInsertStr string tdb.QueryRow(t, "SELECT cluster_logical_timestamp()").Scan(&afterInsertStr) - afterInsert, err := sql.ParseHLC(afterInsertStr) + afterInsert, err := tree.ParseHLC(afterInsertStr) require.NoError(t, err) // Now set up a filter to detect when the DROP INDEX execution will begin and diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 0688229d3a19..e2eb4483ee0a 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -1605,27 +1605,6 @@ func (p *planner) EvalAsOfTimestamp( return asOf, nil } -// ParseHLC parses a string representation of an `hlc.Timestamp`. -// This differs from hlc.ParseTimestamp in that it parses the decimal -// serialization of an hlc timestamp as opposed to the string serialization -// performed by hlc.Timestamp.String(). -// -// This function is used to parse: -// -// 1580361670629466905.0000000001 -// -// hlc.ParseTimestamp() would be used to parse: -// -// 1580361670.629466905,1 -// -func ParseHLC(s string) (hlc.Timestamp, error) { - dec, _, err := apd.NewFromString(s) - if err != nil { - return hlc.Timestamp{}, err - } - return tree.DecimalToHLC(dec) -} - // isAsOf analyzes a statement to bypass the logic in newPlan(), since // that requires the transaction to be started already. If the returned // timestamp is not nil, it is the timestamp to which a transaction diff --git a/pkg/sql/revert_test.go b/pkg/sql/revert_test.go index 15d2e1599938..df2f6e59591b 100644 --- a/pkg/sql/revert_test.go +++ b/pkg/sql/revert_test.go @@ -23,6 +23,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkv" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" @@ -56,7 +57,7 @@ func TestRevertTable(t *testing.T) { var ts string var before int db.QueryRow(t, `SELECT cluster_logical_timestamp(), xor_agg(k # rev) FROM test`).Scan(&ts, &before) - targetTime, err := sql.ParseHLC(ts) + targetTime, err := tree.ParseHLC(ts) require.NoError(t, err) const ignoreGC = false diff --git a/pkg/sql/sem/tree/as_of.go b/pkg/sql/sem/tree/as_of.go index 641fd61e5eb5..55ee64aa7dbe 100644 --- a/pkg/sql/sem/tree/as_of.go +++ b/pkg/sql/sem/tree/as_of.go @@ -313,3 +313,24 @@ func DecimalToHLC(d *apd.Decimal) (hlc.Timestamp, error) { Logical: int32(logical), }, nil } + +// ParseHLC parses a string representation of an `hlc.Timestamp`. +// This differs from hlc.ParseTimestamp in that it parses the decimal +// serialization of an hlc timestamp as opposed to the string serialization +// performed by hlc.Timestamp.String(). +// +// This function is used to parse: +// +// 1580361670629466905.0000000001 +// +// hlc.ParseTimestamp() would be used to parse: +// +// 1580361670.629466905,1 +// +func ParseHLC(s string) (hlc.Timestamp, error) { + dec, _, err := apd.NewFromString(s) + if err != nil { + return hlc.Timestamp{}, err + } + return DecimalToHLC(dec) +}