From 4f9ec3468b98b04ce6e5ffff4950999bc60cd18a Mon Sep 17 00:00:00 2001 From: adityamaru Date: Thu, 9 Feb 2023 09:33:33 -0500 Subject: [PATCH] *: enables elastic CPU limiter for all users of ExportRequest Previously, there was a strange coupling between the elastic CPU limiter and the `header.TargetBytes` DistSender limit set on each ExportRequest. Even if a request was preempted on exhausting its allotted CPU tokens, it would only return from kvserver by virtue of its `header.TargetBytes` being set to a non-zero value. Out of the four users of ExportRequest, only backup set this field to a sentinel value of 1 to limit the number of SSTs we send back in an ExportResponse. The remaining callers of ExportRequest would not return from the kvserver. Instead they would evaluate the request from the resume key immediately, not giving the scheduler a chance to take the goroutine off CPU. This change breaks this coupling by introducing a `resumeInfo` object that indicates whether the resumption was because we were over our CPU limit. If it was, we return an ExportResponse with our progress so far. This change shifts the burden of handling pagination to the client. This seems better than having the server sleep or wait around until its CPU tokens are replenished as the client would be left wondering why a request is taking so long. To that effect this change adds pagination support to the other callers of ExportRequest. Note, we do not set `SplitMidKey` at these other callsites yet. Thus, all pagination will happen at key boundaries in the ExportRequest. A follow-up will add support for `SplitMidKey` to these callers. Informs: #96684 Release note: None --- pkg/BUILD.bazel | 2 + pkg/ccl/backupccl/BUILD.bazel | 1 + pkg/ccl/backupccl/backup_test.go | 47 ++++++ pkg/ccl/backupccl/targets.go | 76 +++++---- pkg/ccl/changefeedccl/schemafeed/BUILD.bazel | 5 + .../changefeedccl/schemafeed/schema_feed.go | 154 ++++++++++-------- .../schemafeed/schema_feed_test.go | 89 +++++++++- pkg/kv/kvclient/BUILD.bazel | 33 +++- pkg/kv/kvclient/main_test.go | 38 +++++ pkg/kv/kvclient/revision_reader.go | 116 +++++++------ pkg/kv/kvclient/revision_reader_test.go | 90 ++++++++++ pkg/kv/kvpb/api.proto | 4 + pkg/kv/kvserver/batcheval/BUILD.bazel | 1 + pkg/kv/kvserver/batcheval/cmd_export.go | 83 ++++++++-- pkg/kv/kvserver/batcheval/cmd_export_test.go | 4 +- pkg/sql/catalog/lease/BUILD.bazel | 1 + pkg/sql/catalog/lease/lease.go | 149 +++++++++-------- pkg/sql/catalog/lease/lease_internal_test.go | 57 +++++++ pkg/sql/delete_preserving_index_test.go | 20 ++- pkg/storage/mvcc.go | 78 +++++---- pkg/storage/mvcc_history_test.go | 10 +- pkg/storage/mvcc_test.go | 20 +-- pkg/util/admission/elastic_cpu_work_handle.go | 6 + 23 files changed, 793 insertions(+), 291 deletions(-) create mode 100644 pkg/kv/kvclient/main_test.go create mode 100644 pkg/kv/kvclient/revision_reader_test.go diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 972889f84109..3ae16b294262 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -183,6 +183,7 @@ ALL_TESTS = [ "//pkg/kv/kvclient/rangefeed/rangefeedbuffer:rangefeedbuffer_test", "//pkg/kv/kvclient/rangefeed/rangefeedcache:rangefeedcache_test", "//pkg/kv/kvclient/rangefeed:rangefeed_test", + "//pkg/kv/kvclient:kvclient_test", "//pkg/kv/kvnemesis:kvnemesis_test", "//pkg/kv/kvpb:kvpb_disallowed_imports_test", "//pkg/kv/kvpb:kvpb_test", @@ -1179,6 +1180,7 @@ GO_TARGETS = [ "//pkg/kv/kvclient/rangefeed:rangefeed_test", "//pkg/kv/kvclient/rangestats:rangestats", "//pkg/kv/kvclient:kvclient", + 
"//pkg/kv/kvclient:kvclient_test", "//pkg/kv/kvnemesis/kvnemesisutil:kvnemesisutil", "//pkg/kv/kvnemesis:kvnemesis", "//pkg/kv/kvnemesis:kvnemesis_test", diff --git a/pkg/ccl/backupccl/BUILD.bazel b/pkg/ccl/backupccl/BUILD.bazel index 6b2d002a19a5..395a046d3362 100644 --- a/pkg/ccl/backupccl/BUILD.bazel +++ b/pkg/ccl/backupccl/BUILD.bazel @@ -281,6 +281,7 @@ go_test( "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", "//pkg/util", + "//pkg/util/admission", "//pkg/util/ctxgroup", "//pkg/util/encoding", "//pkg/util/hlc", diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index 7d61e0476a44..923d31128214 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -95,6 +95,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/ioctx" @@ -10829,3 +10830,49 @@ func TestBackupInLocality(t *testing.T) { db.ExpectErr(t, tc.err, "BACKUP system.users INTO $1 WITH coordinator_locality = $2", fmt.Sprintf("userfile:///tc%d", i), tc.filter) } } + +// TestExportResponseDataSizeZeroCPUPagination verifies that an ExportRequest +// that is preempted by the elastic CPU limiter and has DataSize = 0, is +// returned to the client to handle pagination. +func TestExportResponseDataSizeZeroCPUPagination(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + first := true + var numRequests int + externalDir, dirCleanup := testutils.TempDir(t) + defer dirCleanup() + s, db, _ := serverutils.StartServer(t, base.TestServerArgs{ + ExternalIODir: externalDir, + Knobs: base.TestingKnobs{Store: &kvserver.StoreTestingKnobs{ + TestingRequestFilter: func(ctx context.Context, request *kvpb.BatchRequest) *kvpb.Error { + for _, ru := range request.Requests { + if _, ok := ru.GetInner().(*kvpb.ExportRequest); ok { + numRequests++ + h := admission.ElasticCPUWorkHandleFromContext(ctx) + if h == nil { + t.Fatalf("expected context to have CPU work handle") + } + h.TestingOverrideOverLimit(func() (bool, time.Duration) { + if first { + first = false + return true, 0 + } + return false, 0 + }) + } + } + return nil + }, + }}, + }) + defer s.Stopper().Stop(ctx) + + sqlDB := sqlutils.MakeSQLRunner(db) + sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`) + sqlDB.Exec(t, `INSERT INTO foo VALUES (1), (2)`) + sqlDB.Exec(t, `DELETE FROM foo WHERE a = 1`) + sqlDB.Exec(t, `BACKUP TABLE foo INTO 'nodelocal://1/foo'`) + require.Equal(t, 2, numRequests) +} diff --git a/pkg/ccl/backupccl/targets.go b/pkg/ccl/backupccl/targets.go index 2525602dafb9..51bb3fc1aa0d 100644 --- a/pkg/ccl/backupccl/targets.go +++ b/pkg/ccl/backupccl/targets.go @@ -34,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/iterutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" @@ -191,46 +192,57 @@ func getAllDescChanges( startKey := codec.TablePrefix(keys.DescriptorTableID) endKey := startKey.PrefixEnd() - allRevs, err := 
kvclient.GetAllRevisions(ctx, db, startKey, endKey, startTime, endTime) - if err != nil { - return nil, err - } + g := ctxgroup.WithContext(ctx) + allRevs := make(chan []kvclient.VersionedValues) + g.GoCtx(func(ctx context.Context) error { + defer close(allRevs) + return kvclient.GetAllRevisions(ctx, db, startKey, endKey, startTime, endTime, allRevs) + }) var res []backuppb.BackupManifest_DescriptorRevision - - for _, revs := range allRevs { - id, err := codec.DecodeDescMetadataID(revs.Key) - if err != nil { - return nil, err - } - for _, rev := range revs.Values { - r := backuppb.BackupManifest_DescriptorRevision{ID: descpb.ID(id), Time: rev.Timestamp} - if len(rev.RawBytes) != 0 { - // We update the modification time for the descriptors here with the - // timestamp of the KV row so that we can identify the appropriate - // descriptors to use during restore. - // Note that the modification time of descriptors on disk is usually 0. - // See the comment on descpb.FromSerializedValue for more details. - b, err := descbuilder.FromSerializedValue(&rev) + g.GoCtx(func(ctx context.Context) error { + for revs := range allRevs { + for _, rev := range revs { + id, err := codec.DecodeDescMetadataID(rev.Key) if err != nil { - return nil, err - } - if b == nil { - continue + return err } - desc := b.BuildCreatedMutable() - r.Desc = desc.DescriptorProto() - // Collect the prior IDs of table descriptors, as the ID may have been - // changed during truncate prior to 20.2. - switch t := desc.(type) { - case *tabledesc.Mutable: - if priorIDs != nil && t.ReplacementOf.ID != descpb.InvalidID { - priorIDs[t.ID] = t.ReplacementOf.ID + for _, values := range rev.Values { + r := backuppb.BackupManifest_DescriptorRevision{ID: descpb.ID(id), Time: values.Timestamp} + if len(values.RawBytes) != 0 { + // We update the modification time for the descriptors here with the + // timestamp of the KV row so that we can identify the appropriate + // descriptors to use during restore. + // Note that the modification time of descriptors on disk is usually 0. + // See the comment on descpb.FromSerializedValue for more details. + b, err := descbuilder.FromSerializedValue(&values) + if err != nil { + return err + } + if b == nil { + continue + } + desc := b.BuildCreatedMutable() + r.Desc = desc.DescriptorProto() + // Collect the prior IDs of table descriptors, as the ID may have been + // changed during truncate prior to 20.2. 
+ switch t := desc.(type) { + case *tabledesc.Mutable: + if priorIDs != nil && t.ReplacementOf.ID != descpb.InvalidID { + priorIDs[t.ID] = t.ReplacementOf.ID + } + } } + res = append(res, r) } } - res = append(res, r) } + + return nil + }) + + if err := g.Wait(); err != nil { + return nil, err } return res, nil } diff --git a/pkg/ccl/changefeedccl/schemafeed/BUILD.bazel b/pkg/ccl/changefeedccl/schemafeed/BUILD.bazel index d4f406646d59..db2af26d435a 100644 --- a/pkg/ccl/changefeedccl/schemafeed/BUILD.bazel +++ b/pkg/ccl/changefeedccl/schemafeed/BUILD.bazel @@ -63,6 +63,8 @@ go_test( "//pkg/jobs/jobspb", "//pkg/keys", "//pkg/kv/kvpb", + "//pkg/kv/kvserver", + "//pkg/roachpb", "//pkg/security/securityassets", "//pkg/security/securitytest", "//pkg/server", @@ -70,15 +72,18 @@ go_test( "//pkg/sql/catalog", "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/tabledesc", + "//pkg/sql/pgwire", "//pkg/testutils", "//pkg/testutils/datapathutils", "//pkg/testutils/serverutils", "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", + "//pkg/util/admission", "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/log", "//pkg/util/randutil", + "//pkg/util/timeutil", "@com_github_cockroachdb_datadriven//:datadriven", "@com_github_cockroachdb_errors//:errors", "@com_github_gogo_protobuf//proto", diff --git a/pkg/ccl/changefeedccl/schemafeed/schema_feed.go b/pkg/ccl/changefeedccl/schemafeed/schema_feed.go index 8d6563a259cb..27602393a50d 100644 --- a/pkg/ccl/changefeedccl/schemafeed/schema_feed.go +++ b/pkg/ccl/changefeedccl/schemafeed/schema_feed.go @@ -572,15 +572,13 @@ var highPriorityAfter = settings.RegisterDurationSetting( time.Minute, ).WithPublic() -func fetchDescriptorsWithPriorityOverride( +func sendExportRequestWithPriorityOverride( ctx context.Context, st *cluster.Settings, sender kv.Sender, - codec keys.SQLCodec, + span roachpb.Span, startTS, endTS hlc.Timestamp, ) (kvpb.Response, error) { - span := roachpb.Span{Key: codec.TablePrefix(keys.DescriptorTableID)} - span.EndKey = span.Key.PrefixEnd() header := kvpb.Header{Timestamp: endTS} req := &kvpb.ExportRequest{ RequestHeader: kvpb.RequestHeaderFromSpan(span), @@ -588,7 +586,7 @@ func fetchDescriptorsWithPriorityOverride( MVCCFilter: kvpb.MVCCFilter_All, } - fetchDescriptors := func(ctx context.Context) (kvpb.Response, error) { + sendRequest := func(ctx context.Context) (kvpb.Response, error) { resp, pErr := kv.SendWrappedWith(ctx, sender, header, req) if pErr != nil { err := pErr.GoError() @@ -599,7 +597,7 @@ func fetchDescriptorsWithPriorityOverride( priorityAfter := highPriorityAfter.Get(&st.SV) if priorityAfter == 0 { - return fetchDescriptors(ctx) + return sendRequest(ctx) } var resp kvpb.Response @@ -607,7 +605,7 @@ func fetchDescriptorsWithPriorityOverride( ctx, "schema-feed", priorityAfter, func(ctx context.Context) error { var err error - resp, err = fetchDescriptors(ctx) + resp, err = sendRequest(ctx) return err }, ) @@ -616,7 +614,7 @@ func fetchDescriptorsWithPriorityOverride( } if errors.HasType(err, (*contextutil.TimeoutError)(nil)) { header.UserPriority = roachpb.MaxUserPriority - return fetchDescriptors(ctx) + return sendRequest(ctx) } return nil, err } @@ -629,85 +627,97 @@ func (tf *schemaFeed) fetchDescriptorVersions( } codec := tf.leaseMgr.Codec() start := timeutil.Now() - res, err := fetchDescriptorsWithPriorityOverride( - ctx, tf.settings, tf.db.KV().NonTransactionalSender(), codec, startTS, endTS) - if log.ExpensiveLogEnabled(ctx, 2) { - log.Infof(ctx, `fetched table descs (%s,%s] took %s err=%s`, startTS, endTS, 
timeutil.Since(start), err) - } - if err != nil { - return nil, err - } + span := roachpb.Span{Key: codec.TablePrefix(keys.DescriptorTableID)} + span.EndKey = span.Key.PrefixEnd() tf.mu.Lock() defer tf.mu.Unlock() var descriptors []catalog.Descriptor - found := errors.New(``) - for _, file := range res.(*kvpb.ExportResponse).Files { - if err := func() error { - it, err := storage.NewMemSSTIterator(file.SST, false /* verify */, storage.IterOptions{ - // NB: We assume there will be no MVCC range tombstones here. - KeyTypes: storage.IterKeyTypePointsOnly, - LowerBound: keys.MinKey, - UpperBound: keys.MaxKey, - }) - if err != nil { - return err - } - defer it.Close() - for it.SeekGE(storage.NilKey); ; it.Next() { - if ok, err := it.Valid(); err != nil { - return err - } else if !ok { - return nil - } - k := it.UnsafeKey() - remaining, _, _, err := codec.DecodeIndexPrefix(k.Key) - if err != nil { - return err - } - _, id, err := encoding.DecodeUvarintAscending(remaining) - if err != nil { - return err - } - var origName changefeedbase.StatementTimeName - isTable, _ := tf.targets.EachHavingTableID(descpb.ID(id), func(t changefeedbase.Target) error { - origName = t.StatementTimeName - return found // sentinel error to break the loop - }) - isType := tf.mu.typeDeps.containsType(descpb.ID(id)) - // Check if the descriptor is an interesting table or type. - if !(isTable || isType) { - // Uninteresting descriptor. - continue - } + for { + res, err := sendExportRequestWithPriorityOverride( + ctx, tf.settings, tf.db.KV().NonTransactionalSender(), span, startTS, endTS) + if log.ExpensiveLogEnabled(ctx, 2) { + log.Infof(ctx, `fetched table descs (%s,%s] took %s err=%s`, startTS, endTS, timeutil.Since(start), err) + } + if err != nil { + return nil, err + } - unsafeValue, err := it.UnsafeValue() + found := errors.New(``) + exportResp := res.(*kvpb.ExportResponse) + for _, file := range exportResp.Files { + if err := func() error { + it, err := storage.NewMemSSTIterator(file.SST, false /* verify */, storage.IterOptions{ + // NB: We assume there will be no MVCC range tombstones here. + KeyTypes: storage.IterKeyTypePointsOnly, + LowerBound: keys.MinKey, + UpperBound: keys.MaxKey, + }) if err != nil { return err } - if unsafeValue == nil { - name := origName - if name == "" { - name = changefeedbase.StatementTimeName(fmt.Sprintf("desc(%d)", id)) + defer it.Close() + for it.SeekGE(storage.NilKey); ; it.Next() { + if ok, err := it.Valid(); err != nil { + return err + } else if !ok { + return nil + } + k := it.UnsafeKey() + remaining, _, _, err := codec.DecodeIndexPrefix(k.Key) + if err != nil { + return err + } + _, id, err := encoding.DecodeUvarintAscending(remaining) + if err != nil { + return err + } + var origName changefeedbase.StatementTimeName + isTable, _ := tf.targets.EachHavingTableID(descpb.ID(id), func(t changefeedbase.Target) error { + origName = t.StatementTimeName + return found // sentinel error to break the loop + }) + isType := tf.mu.typeDeps.containsType(descpb.ID(id)) + // Check if the descriptor is an interesting table or type. + if !(isTable || isType) { + // Uninteresting descriptor. + continue } - return errors.Errorf(`"%v" was dropped or truncated`, name) - } - // Unmarshal the descriptor. 
- value := roachpb.Value{RawBytes: unsafeValue, Timestamp: k.Timestamp} - b, err := descbuilder.FromSerializedValue(&value) - if err != nil { - return err - } - if b != nil && (b.DescriptorType() == catalog.Table || b.DescriptorType() == catalog.Type) { - descriptors = append(descriptors, b.BuildImmutable()) + unsafeValue, err := it.UnsafeValue() + if err != nil { + return err + } + if unsafeValue == nil { + name := origName + if name == "" { + name = changefeedbase.StatementTimeName(fmt.Sprintf("desc(%d)", id)) + } + return errors.Errorf(`"%v" was dropped or truncated`, name) + } + + // Unmarshal the descriptor. + value := roachpb.Value{RawBytes: unsafeValue, Timestamp: k.Timestamp} + b, err := descbuilder.FromSerializedValue(&value) + if err != nil { + return err + } + if b != nil && (b.DescriptorType() == catalog.Table || b.DescriptorType() == catalog.Type) { + descriptors = append(descriptors, b.BuildImmutable()) + } } + }(); err != nil { + return nil, err } - }(); err != nil { - return nil, err } + + if exportResp.ResumeSpan == nil { + break + } + span.Key = exportResp.ResumeSpan.Key } + return descriptors, nil } diff --git a/pkg/ccl/changefeedccl/schemafeed/schema_feed_test.go b/pkg/ccl/changefeedccl/schemafeed/schema_feed_test.go index ac3ceb32d6c7..c3e2fdb00799 100644 --- a/pkg/ccl/changefeedccl/schemafeed/schema_feed_test.go +++ b/pkg/ccl/changefeedccl/schemafeed/schema_feed_test.go @@ -14,17 +14,25 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) @@ -169,8 +177,10 @@ func TestIssuesHighPriorityReadsIfBlocked(t *testing.T) { highPriorityAfter.Override(ctx, &s.ClusterSettings().SV, priorityAfter) var responseFiles []kvpb.ExportResponse_File testutils.SucceedsWithin(t, func() error { - resp, err := fetchDescriptorsWithPriorityOverride(ctx, s.ClusterSettings(), - kvDB.NonTransactionalSender(), keys.SystemSQLCodec, hlc.Timestamp{}, s.Clock().Now()) + span := roachpb.Span{Key: keys.SystemSQLCodec.TablePrefix(keys.DescriptorTableID)} + span.EndKey = span.Key.PrefixEnd() + resp, err := sendExportRequestWithPriorityOverride(ctx, s.ClusterSettings(), + kvDB.NonTransactionalSender(), span, hlc.Timestamp{}, s.Clock().Now()) if err != nil { return err } @@ -179,3 +189,78 @@ func TestIssuesHighPriorityReadsIfBlocked(t *testing.T) { }, 10*priorityAfter) require.Less(t, 0, len(responseFiles)) } + +func TestFetchDescriptorVersionsCPULimiterPagination(t *testing.T) { + defer leaktest.AfterTest(t)() + defer 
log.Scope(t).Close(t) + + ctx := context.Background() + var numRequests int + first := true + s, db, _ := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{Store: &kvserver.StoreTestingKnobs{ + TestingRequestFilter: func(ctx context.Context, request *kvpb.BatchRequest) *kvpb.Error { + for _, ru := range request.Requests { + if _, ok := ru.GetInner().(*kvpb.ExportRequest); ok { + numRequests++ + h := admission.ElasticCPUWorkHandleFromContext(ctx) + if h == nil { + t.Fatalf("expected context to have CPU work handle") + } + h.TestingOverrideOverLimit(func() (bool, time.Duration) { + if first { + first = false + return true, 0 + } + return false, 0 + }) + } + } + return nil + }, + }}, + }) + defer s.Stopper().Stop(ctx) + sqlServer := s.SQLServer().(*sql.Server) + if len(s.TestTenants()) != 0 { + sqlServer = s.TestTenants()[0].PGServer().(*pgwire.Server).SQLServer + } + + sqlDB := sqlutils.MakeSQLRunner(db) + beforeCreate := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} + sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`) + sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`) + sqlDB.Exec(t, `CREATE TABLE baz (a INT PRIMARY KEY)`) + afterCreate := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} + + var targets changefeedbase.Targets + var tableID descpb.ID + var statementTimeName changefeedbase.StatementTimeName + sqlDB.QueryRow(t, "SELECT $1::regclass::int, $1::regclass::string", "foo").Scan( + &tableID, &statementTimeName) + targets.Add(changefeedbase.Target{ + Type: jobspb.ChangefeedTargetSpecification_PRIMARY_FAMILY_ONLY, + TableID: tableID, + FamilyName: "primary", + StatementTimeName: statementTimeName, + }) + sqlDB.QueryRow(t, "SELECT $1::regclass::int, $1::regclass::string", "bar").Scan( + &tableID, &statementTimeName) + targets.Add(changefeedbase.Target{ + Type: jobspb.ChangefeedTargetSpecification_PRIMARY_FAMILY_ONLY, + TableID: tableID, + FamilyName: "primary", + StatementTimeName: statementTimeName, + }) + now := s.Clock().Now() + sf := New(ctx, &sqlServer.GetExecutorConfig().DistSQLSrv.ServerConfig, + TestingAllEventFilter, targets, now, nil, changefeedbase.CanHandle{ + MultipleColumnFamilies: true, + VirtualColumns: true, + }) + scf := sf.(*schemaFeed) + desc, err := scf.fetchDescriptorVersions(ctx, beforeCreate, afterCreate) + require.NoError(t, err) + require.Len(t, desc, 2) + require.Equal(t, 2, numRequests) +} diff --git a/pkg/kv/kvclient/BUILD.bazel b/pkg/kv/kvclient/BUILD.bazel index ea0689a8b271..c916b7b36589 100644 --- a/pkg/kv/kvclient/BUILD.bazel +++ b/pkg/kv/kvclient/BUILD.bazel @@ -1,5 +1,5 @@ load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "kvclient", @@ -20,4 +20,35 @@ go_library( ], ) +go_test( + name = "kvclient_test", + srcs = [ + "main_test.go", + "revision_reader_test.go", + ], + args = ["-test.timeout=295s"], + embed = [":kvclient"], + deps = [ + "//pkg/base", + "//pkg/keys", + "//pkg/kv/kvpb", + "//pkg/kv/kvserver", + "//pkg/roachpb", + "//pkg/security/securityassets", + "//pkg/security/securitytest", + "//pkg/server", + "//pkg/testutils/serverutils", + "//pkg/testutils/sqlutils", + "//pkg/testutils/testcluster", + "//pkg/util/admission", + "//pkg/util/ctxgroup", + "//pkg/util/hlc", + "//pkg/util/leaktest", + "//pkg/util/log", + "//pkg/util/randutil", + "//pkg/util/timeutil", + "@com_github_stretchr_testify//require", + ], +) + get_x_data(name = "get_x_data") diff --git 
a/pkg/kv/kvclient/main_test.go b/pkg/kv/kvclient/main_test.go new file mode 100644 index 000000000000..a16c21848ef0 --- /dev/null +++ b/pkg/kv/kvclient/main_test.go @@ -0,0 +1,38 @@ +// Copyright 2015 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvclient_test + +import ( + "os" + "testing" + + "github.com/cockroachdb/cockroach/pkg/security/securityassets" + "github.com/cockroachdb/cockroach/pkg/security/securitytest" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/randutil" +) + +//go:generate ../../util/leaktest/add-leaktest.sh *_test.go + +func init() { + securityassets.SetLoader(securitytest.EmbeddedAssets) +} +func TestMain(m *testing.M) { + randutil.SeedForTests() + serverutils.InitTestServerFactory(server.TestServerFactory) + serverutils.InitTestClusterFactory(testcluster.TestClusterFactory) + + code := m.Run() + + os.Exit(code) +} diff --git a/pkg/kv/kvclient/revision_reader.go b/pkg/kv/kvclient/revision_reader.go index f4ed3896e82b..8724b4ebac89 100644 --- a/pkg/kv/kvclient/revision_reader.go +++ b/pkg/kv/kvclient/revision_reader.go @@ -30,60 +30,84 @@ type VersionedValues struct { // GetAllRevisions scans all keys between startKey and endKey getting all // revisions between startTime and endTime. -// TODO(dt): if/when client gets a ScanRevisionsRequest or similar, use that. func GetAllRevisions( - ctx context.Context, db *kv.DB, startKey, endKey roachpb.Key, startTime, endTime hlc.Timestamp, -) ([]VersionedValues, error) { - // TODO(dt): version check. 
- header := kvpb.Header{Timestamp: endTime} - req := &kvpb.ExportRequest{ - RequestHeader: kvpb.RequestHeader{Key: startKey, EndKey: endKey}, - StartTime: startTime, - MVCCFilter: kvpb.MVCCFilter_All, - } - resp, pErr := kv.SendWrappedWith(ctx, db.NonTransactionalSender(), header, req) - if pErr != nil { - return nil, pErr.GoError() - } - - var res []VersionedValues - for _, file := range resp.(*kvpb.ExportResponse).Files { - iterOpts := storage.IterOptions{ - KeyTypes: storage.IterKeyTypePointsOnly, - LowerBound: file.Span.Key, - UpperBound: file.Span.EndKey, + ctx context.Context, + db *kv.DB, + startKey, endKey roachpb.Key, + startTime, endTime hlc.Timestamp, + allRevs chan []VersionedValues, +) error { + for { + header := kvpb.Header{Timestamp: endTime} + req := &kvpb.ExportRequest{ + RequestHeader: kvpb.RequestHeader{Key: startKey, EndKey: endKey}, + StartTime: startTime, + MVCCFilter: kvpb.MVCCFilter_All, } - iter, err := storage.NewMemSSTIterator(file.SST, true, iterOpts) - if err != nil { - return nil, err + resp, pErr := kv.SendWrappedWith(ctx, db.NonTransactionalSender(), header, req) + if pErr != nil { + return pErr.GoError() } - defer iter.Close() - iter.SeekGE(storage.MVCCKey{Key: startKey}) - for ; ; iter.Next() { - if valid, err := iter.Valid(); !valid || err != nil { - if err != nil { - return nil, err - } - break - } else if iter.UnsafeKey().Key.Compare(endKey) >= 0 { - break + exportResp := resp.(*kvpb.ExportResponse) + var res []VersionedValues + for _, file := range exportResp.Files { + iterOpts := storage.IterOptions{ + KeyTypes: storage.IterKeyTypePointsOnly, + LowerBound: file.Span.Key, + UpperBound: file.Span.EndKey, } - key := iter.UnsafeKey() - keyCopy := make([]byte, len(key.Key)) - copy(keyCopy, key.Key) - key.Key = keyCopy - v, err := iter.UnsafeValue() + iter, err := storage.NewMemSSTIterator(file.SST, true, iterOpts) if err != nil { - return nil, err + return err } - value := make([]byte, len(v)) - copy(value, v) - if len(res) == 0 || !res[len(res)-1].Key.Equal(key.Key) { - res = append(res, VersionedValues{Key: key.Key}) + defer func() { + if iter != nil { + iter.Close() + } + }() + iter.SeekGE(storage.MVCCKey{Key: startKey}) + + for ; ; iter.Next() { + if valid, err := iter.Valid(); !valid || err != nil { + if err != nil { + return err + } + break + } else if iter.UnsafeKey().Key.Compare(endKey) >= 0 { + break + } + key := iter.UnsafeKey() + keyCopy := make([]byte, len(key.Key)) + copy(keyCopy, key.Key) + key.Key = keyCopy + v, err := iter.UnsafeValue() + if err != nil { + return err + } + value := make([]byte, len(v)) + copy(value, v) + if len(res) == 0 || !res[len(res)-1].Key.Equal(key.Key) { + res = append(res, VersionedValues{Key: key.Key}) + } + res[len(res)-1].Values = append(res[len(res)-1].Values, roachpb.Value{Timestamp: key.Timestamp, RawBytes: value}) } - res[len(res)-1].Values = append(res[len(res)-1].Values, roachpb.Value{Timestamp: key.Timestamp, RawBytes: value}) + + // Close and nil out the iter to release the underlying resources. + iter.Close() + iter = nil + } + + select { + case <-ctx.Done(): + return ctx.Err() + case allRevs <- res: + } + + // Check if the ExportRequest paginated with a resume span. 
+ if exportResp.ResumeSpan == nil { + return nil } + startKey = exportResp.ResumeSpan.Key } - return res, nil } diff --git a/pkg/kv/kvclient/revision_reader_test.go b/pkg/kv/kvclient/revision_reader_test.go new file mode 100644 index 000000000000..52faf18b03e8 --- /dev/null +++ b/pkg/kv/kvclient/revision_reader_test.go @@ -0,0 +1,90 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvclient + +import ( + "context" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/admission" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/stretchr/testify/require" +) + +func TestGetAllRevisionsCPULimiterPagination(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + first := true + var numRequests int + s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{Store: &kvserver.StoreTestingKnobs{ + TestingRequestFilter: func(ctx context.Context, request *kvpb.BatchRequest) *kvpb.Error { + for _, ru := range request.Requests { + if _, ok := ru.GetInner().(*kvpb.ExportRequest); ok { + numRequests++ + h := admission.ElasticCPUWorkHandleFromContext(ctx) + if h == nil { + t.Fatalf("expected context to have CPU work handle") + } + h.TestingOverrideOverLimit(func() (bool, time.Duration) { + if first { + first = false + return true, 0 + } + return false, 0 + }) + } + } + return nil + }, + }}, + }) + defer s.Stopper().Stop(ctx) + + sqlDB := sqlutils.MakeSQLRunner(db) + beforeCreate := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} + sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`) + sqlDB.Exec(t, `CREATE TABLE bar (b INT PRIMARY KEY)`) + afterCreate := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} + + k := s.Codec().TablePrefix(keys.DescriptorTableID) + tableSpan := roachpb.Span{Key: k, EndKey: k.PrefixEnd()} + allRevs := make(chan []VersionedValues) + g := ctxgroup.WithContext(ctx) + g.GoCtx(func(ctx context.Context) error { + defer close(allRevs) + return GetAllRevisions(ctx, kvDB, tableSpan.Key, tableSpan.EndKey, beforeCreate, afterCreate, allRevs) + }) + var numResponses int + g.GoCtx(func(ctx context.Context) error { + for range allRevs { + numResponses++ + } + return nil + }) + require.NoError(t, g.Wait()) + require.Equal(t, 2, numRequests) + require.Equal(t, 2, numResponses) +} diff --git a/pkg/kv/kvpb/api.proto b/pkg/kv/kvpb/api.proto index e3e0900a1618..328c201696f9 100644 --- a/pkg/kv/kvpb/api.proto +++ b/pkg/kv/kvpb/api.proto @@ -80,6 +80,10 @@ enum ResumeReason { // The DistSender encountered a range boundary and returned a partial result, // in response to 
return_on_range_boundary. RESUME_RANGE_BOUNDARY = 4; + // The ElasticCPUHandle signaled that the command evaluation exceeded its + // allotted CPU time. It is the caller's responsibility to resume from the + // returned resume key. + RESUME_ELASTIC_CPU_LIMIT = 5; } // RequestHeaderPure is not to be used directly. It's generated only for use of diff --git a/pkg/kv/kvserver/batcheval/BUILD.bazel b/pkg/kv/kvserver/batcheval/BUILD.bazel index c10f21d60ca8..f821c54e7a27 100644 --- a/pkg/kv/kvserver/batcheval/BUILD.bazel +++ b/pkg/kv/kvserver/batcheval/BUILD.bazel @@ -56,6 +56,7 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval", visibility = ["//visibility:public"], deps = [ + "//pkg/build", "//pkg/clusterversion", "//pkg/keys", "//pkg/kv/kvpb", diff --git a/pkg/kv/kvserver/batcheval/cmd_export.go b/pkg/kv/kvserver/batcheval/cmd_export.go index 8ff1e07f7e66..4b0fd5620647 100644 --- a/pkg/kv/kvserver/batcheval/cmd_export.go +++ b/pkg/kv/kvserver/batcheval/cmd_export.go @@ -15,6 +15,7 @@ import ( "fmt" "time" + "github.com/cockroachdb/cockroach/pkg/build" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" @@ -184,7 +185,7 @@ func evalExport( StopMidKey: args.SplitMidKey, } var summary kvpb.BulkOpSummary - var resume storage.MVCCKey + var resumeInfo storage.ExportRequestResumeInfo var fingerprint uint64 var err error if args.ExportFingerprint { @@ -196,7 +197,7 @@ func evalExport( StripValueChecksum: true, } var hasRangeKeys bool - summary, resume, fingerprint, hasRangeKeys, err = storage.MVCCExportFingerprint(ctx, + summary, resumeInfo, fingerprint, hasRangeKeys, err = storage.MVCCExportFingerprint(ctx, cArgs.EvalCtx.ClusterSettings(), reader, opts, destFile) if err != nil { return result.Result{}, maybeAnnotateExceedMaxSizeError(err) @@ -210,7 +211,7 @@ func evalExport( destFile = &storage.MemFile{} } } else { - summary, resume, err = storage.MVCCExportToSST(ctx, cArgs.EvalCtx.ClusterSettings(), reader, + summary, resumeInfo, err = storage.MVCCExportToSST(ctx, cArgs.EvalCtx.ClusterSettings(), reader, opts, destFile) if err != nil { return result.Result{}, maybeAnnotateExceedMaxSizeError(err) @@ -226,18 +227,48 @@ func evalExport( // early exit and thus have a resume key despite // not having data. if summary.DataSize == 0 { - if resume.Key != nil { - start = resume.Key - resumeKeyTS = resume.Timestamp - continue - } else { - break + hasResumeKey := resumeInfo.ResumeKey.Key != nil + + // If we have a resumeKey, it means that we must have hit a resource + // constraint before exporting any data. + if hasResumeKey { + // If we hit our CPU limit we must return the response to the client + // instead of retrying immediately. This will give the scheduler a + // chance to move the goroutine off CPU, allowing other processes to + // make progress. The client is responsible for handling pagination of + // ExportRequests. + if resumeInfo.CPUOverlimit { + // Note that since we have not exported any data, we do not populate + // the `Files` field of the ExportResponse. + reply.ResumeSpan = &roachpb.Span{ + Key: resumeInfo.ResumeKey.Key, + EndKey: args.EndKey, + } + reply.ResumeReason = kvpb.RESUME_ELASTIC_CPU_LIMIT + break + } else { + // We should never get here. There should be no condition aside from + // resource constraints that results in an early exit without + // exporting any data. Regardless, if we have a resumeKey we + // immediately retry the ExportRequest from that key and timestamp + // onwards. + if !build.IsRelease() { + return result.Result{}, errors.AssertionFailedf("ExportRequest exited without " + + "exporting any data for an unknown reason; programming error") + } + start = resumeInfo.ResumeKey.Key + resumeKeyTS = resumeInfo.ResumeKey.Timestamp + continue + } } + // If we do not have a resumeKey, it indicates that there is no data to + // be exported in this span. + break } span := roachpb.Span{Key: start} - if resume.Key != nil { - span.EndKey = resume.Key + if resumeInfo.ResumeKey.Key != nil { + span.EndKey = resumeInfo.ResumeKey.Key } else { span.EndKey = args.EndKey } @@ -250,21 +281,39 @@ func evalExport( // `Fingerprint` for point-keys and the SST file that contains the // rangekeys we encountered during ExportRequest evaluation. exported = kvpb.ExportResponse_File{ - EndKeyTS: resume.Timestamp, + EndKeyTS: resumeInfo.ResumeKey.Timestamp, SST: data, Fingerprint: fingerprint, } } else { exported = kvpb.ExportResponse_File{ Span: span, - EndKeyTS: resume.Timestamp, + EndKeyTS: resumeInfo.ResumeKey.Timestamp, Exported: summary, SST: data, } } reply.Files = append(reply.Files, exported) - start = resume.Key - resumeKeyTS = resume.Timestamp + start = resumeInfo.ResumeKey.Key + resumeKeyTS = resumeInfo.ResumeKey.Timestamp + + // If we paginated because we are over our allotted CPU limit, we must break + // from command evaluation and return a response to the client before + // resuming our export from the resume key. This gives the scheduler a + // chance to take the current goroutine off CPU and allow other processes to + // make progress. + if resumeInfo.CPUOverlimit { + if resumeInfo.ResumeKey.Key != nil { + reply.ResumeSpan = &roachpb.Span{ + Key: resumeInfo.ResumeKey.Key, + EndKey: args.EndKey, + } + // We return RESUME_ELASTIC_CPU_LIMIT so that the client can + // distinguish CPU preemption from the byte-limit pagination below. + reply.ResumeReason = kvpb.RESUME_ELASTIC_CPU_LIMIT + } + break + } if h.TargetBytes > 0 { curSizeOfExportedSSTs += summary.DataSize @@ -295,9 +344,9 @@ func evalExport( // the next SST. In the worst case this could lead to us exceeding our // TargetBytes by SST target size + overage. if reply.NumBytes == h.TargetBytes { - if resume.Key != nil { + if resumeInfo.ResumeKey.Key != nil { reply.ResumeSpan = &roachpb.Span{ - Key: resume.Key, + Key: resumeInfo.ResumeKey.Key, EndKey: args.EndKey, } reply.ResumeReason = kvpb.RESUME_BYTE_LIMIT diff --git a/pkg/kv/kvserver/batcheval/cmd_export_test.go b/pkg/kv/kvserver/batcheval/cmd_export_test.go index f67a0743a7c8..36b6eecb07f6 100644 --- a/pkg/kv/kvserver/batcheval/cmd_export_test.go +++ b/pkg/kv/kvserver/batcheval/cmd_export_test.go @@ -591,11 +591,10 @@ func assertEqualKVs( start := storage.MVCCKey{Key: startKey} for start.Key != nil { var sst []byte - var summary kvpb.BulkOpSummary maxSize := uint64(0) prevStart := start sstFile := &storage.MemFile{} - summary, start, err = storage.MVCCExportToSST(ctx, st, e, storage.MVCCExportOptions{ + summary, resumeInfo, err := storage.MVCCExportToSST(ctx, st, e, storage.MVCCExportOptions{ StartKey: start, EndKey: endKey, StartTS: startTime, @@ -606,6 +605,7 @@ func assertEqualKVs( StopMidKey: bool(stopMidKey), }, sstFile) require.NoError(t, err) + start = resumeInfo.ResumeKey sst = sstFile.Data() loaded := loadSST(t, sst, startKey, endKey) // Ensure that the pagination worked properly.
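Every caller updated in this patch drives the same client-side loop: send the ExportRequest, consume the response, and reissue the request from ResumeSpan.Key until the server stops returning one. A minimal sketch of that loop, mirroring the pattern at the updated call sites (the process helper and the enclosing function are illustrative placeholders, not part of the change):

	for {
		req := &kvpb.ExportRequest{
			RequestHeader: kvpb.RequestHeaderFromSpan(span),
			StartTime:     startTS,
			MVCCFilter:    kvpb.MVCCFilter_All,
		}
		resp, pErr := kv.SendWrappedWith(ctx, sender, kvpb.Header{Timestamp: endTS}, req)
		if pErr != nil {
			return pErr.GoError()
		}
		exportResp := resp.(*kvpb.ExportResponse)
		if err := process(exportResp.Files); err != nil { // caller-specific handling
			return err
		}
		// A nil ResumeSpan means the span has been fully exported; otherwise
		// resume from wherever the server stopped, e.g. because of
		// RESUME_ELASTIC_CPU_LIMIT.
		if exportResp.ResumeSpan == nil {
			return nil
		}
		span.Key = exportResp.ResumeSpan.Key
	}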
diff --git a/pkg/sql/catalog/lease/BUILD.bazel b/pkg/sql/catalog/lease/BUILD.bazel index b719478ea754..84fe16951580 100644 --- a/pkg/sql/catalog/lease/BUILD.bazel +++ b/pkg/sql/catalog/lease/BUILD.bazel @@ -113,6 +113,7 @@ go_test( "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", "//pkg/util", + "//pkg/util/admission", "//pkg/util/encoding", "//pkg/util/envutil", "//pkg/util/hlc", diff --git a/pkg/sql/catalog/lease/lease.go b/pkg/sql/catalog/lease/lease.go index 6fcc43436482..9780d502231c 100644 --- a/pkg/sql/catalog/lease/lease.go +++ b/pkg/sql/catalog/lease/lease.go @@ -255,85 +255,102 @@ func getDescriptorsFromStoreForInterval( Timestamp: upperBound.Prev(), } descriptorKey := catalogkeys.MakeDescMetadataKey(codec, id) - requestHeader := kvpb.RequestHeader{ - Key: descriptorKey, - EndKey: descriptorKey.PrefixEnd(), - } - req := &kvpb.ExportRequest{ - RequestHeader: requestHeader, - StartTime: lowerBound.Prev(), - MVCCFilter: kvpb.MVCCFilter_All, - } - - // Export request returns descriptors in decreasing modification time. - res, pErr := kv.SendWrappedWith(ctx, db.NonTransactionalSender(), batchRequestHeader, req) - if pErr != nil { - return nil, errors.Wrapf(pErr.GoError(), "error in retrieving descs between %s, %s", - lowerBound, upperBound) - } - // Unmarshal key span retrieved from export request to construct historical descs. var descriptorsRead []historicalDescriptor - // Keep track of the most recently processed descriptor's modification time to - // set as the expiration for the next descriptor to process. Recall we process - // descriptors in decreasing modification time. - subsequentModificationTime := upperBound - for _, file := range res.(*kvpb.ExportResponse).Files { - if err := func() error { - it, err := kvstorage.NewMemSSTIterator(file.SST, false, /* verify */ - kvstorage.IterOptions{ - // NB: We assume there will be no MVCC range tombstones here. - KeyTypes: kvstorage.IterKeyTypePointsOnly, - LowerBound: keys.MinKey, - UpperBound: keys.MaxKey, - }) - if err != nil { - return err - } - defer it.Close() + for { + requestHeader := kvpb.RequestHeader{ + Key: descriptorKey, + EndKey: descriptorKey.PrefixEnd(), + } + req := &kvpb.ExportRequest{ + RequestHeader: requestHeader, + StartTime: lowerBound.Prev(), + MVCCFilter: kvpb.MVCCFilter_All, + } - // Convert each MVCC key value pair corresponding to the specified - // descriptor ID. - for it.SeekGE(kvstorage.NilKey); ; it.Next() { - if ok, err := it.Valid(); err != nil { - return err - } else if !ok { - return nil - } + // Export request returns descriptors in decreasing modification time. + res, pErr := kv.SendWrappedWith(ctx, db.NonTransactionalSender(), batchRequestHeader, req) + if pErr != nil { + return nil, errors.Wrapf(pErr.GoError(), "error in retrieving descs between %s, %s", + lowerBound, upperBound) + } - // Decode key and value of descriptor. - k := it.UnsafeKey() - descContent, err := it.UnsafeValue() + // Keep track of the most recently processed descriptor's modification time to + // set as the expiration for the next descriptor to process. Recall we process + // descriptors in decreasing modification time. + subsequentModificationTime := upperBound + exportResp := res.(*kvpb.ExportResponse) + for _, file := range exportResp.Files { + if err := func() error { + it, err := kvstorage.NewMemSSTIterator(file.SST, false, /* verify */ + kvstorage.IterOptions{ + // NB: We assume there will be no MVCC range tombstones here. 
+ KeyTypes: kvstorage.IterKeyTypePointsOnly, + LowerBound: keys.MinKey, + UpperBound: keys.MaxKey, + }) if err != nil { return err } - if descContent == nil { - return errors.Wrapf(errors.New("unsafe value error"), "error "+ - "extracting raw bytes of descriptor with key %s modified between "+ - "%s, %s", k.String(), k.Timestamp, subsequentModificationTime) - } + defer func() { + if it != nil { + it.Close() + } + }() - // Construct a plain descriptor. - value := roachpb.Value{RawBytes: descContent, Timestamp: k.Timestamp} - descBuilder, err := descbuilder.FromSerializedValue(&value) - if err != nil { - return err - } + // Convert each MVCC key value pair corresponding to the specified + // descriptor ID. + for it.SeekGE(kvstorage.NilKey); ; it.Next() { + if ok, err := it.Valid(); err != nil { + return err + } else if !ok { + // Close and nil out the iter to release the underlying resources. + it.Close() + it = nil + return nil + } - // Construct a historical descriptor with expiration. - histDesc := historicalDescriptor{ - desc: descBuilder.BuildImmutable(), - expiration: subsequentModificationTime, - } - descriptorsRead = append(descriptorsRead, histDesc) + // Decode key and value of descriptor. + k := it.UnsafeKey() + descContent, err := it.UnsafeValue() + if err != nil { + return err + } + if descContent == nil { + return errors.Wrapf(errors.New("unsafe value error"), "error "+ + "extracting raw bytes of descriptor with key %s modified between "+ + "%s, %s", k.String(), k.Timestamp, subsequentModificationTime) + } + + // Construct a plain descriptor. + value := roachpb.Value{RawBytes: descContent, Timestamp: k.Timestamp} + descBuilder, err := descbuilder.FromSerializedValue(&value) + if err != nil { + return err + } + + // Construct a historical descriptor with expiration. + histDesc := historicalDescriptor{ + desc: descBuilder.BuildImmutable(), + expiration: subsequentModificationTime, + } + descriptorsRead = append(descriptorsRead, histDesc) - // Update the expiration time for next descriptor. - subsequentModificationTime = k.Timestamp + // Update the expiration time for next descriptor. + subsequentModificationTime = k.Timestamp + } + }(); err != nil { + return nil, err } - }(); err != nil { - return nil, err } + + // Check if the ExportRequest paginated with a resume span. 
+ if exportResp.ResumeSpan == nil { + break + } + descriptorKey = exportResp.ResumeSpan.Key } + return descriptorsRead, nil } diff --git a/pkg/sql/catalog/lease/lease_internal_test.go b/pkg/sql/catalog/lease/lease_internal_test.go index 8ae9a144f5ff..5c5a46d95e09 100644 --- a/pkg/sql/catalog/lease/lease_internal_test.go +++ b/pkg/sql/catalog/lease/lease_internal_test.go @@ -24,6 +24,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" @@ -31,9 +33,12 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" "github.com/cockroachdb/logtags" "github.com/stretchr/testify/require" @@ -1476,3 +1481,55 @@ func TestLeasedDescriptorByteSizeBaseline(t *testing.T) { }) } } + +// TODO(adityamaru): We do not set SplitMidKey to true for ExportRequests sent +// in getDescriptorsFromStoreForInterval. This disallows the elastic CPU limiter +// from preempting the ExportRequest unless we are on a key boundary. In this +// test we are only exporting revisions of the same key so we should never be +// allowed to paginate because of exhausted CPU tokens. Once we do add support +// for SplitMidKey, we should change the test to verify the correctness of our +// pagination logic. +// +// For now, assert that all revisions are fetched in a single ExportRequest even +// though we are always OverLimit according to the elastic CPU limiter. 
+func TestGetDescriptorsFromStoreForIntervalCPULimiterPagination(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + var numRequests int + s, db, kvDB := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{Store: &kvserver.StoreTestingKnobs{ + TestingRequestFilter: func(ctx context.Context, request *kvpb.BatchRequest) *kvpb.Error { + for _, ru := range request.Requests { + if _, ok := ru.GetInner().(*kvpb.ExportRequest); ok { + numRequests++ + h := admission.ElasticCPUWorkHandleFromContext(ctx) + if h == nil { + t.Fatalf("expected context to have CPU work handle") + } + h.TestingOverrideOverLimit(func() (bool, time.Duration) { + return true, 0 + }) + } + } + return nil + }, + }}, + }) + defer s.Stopper().Stop(ctx) + + sqlDB := sqlutils.MakeSQLRunner(db) + beforeCreate := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} + sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`) + sqlDB.Exec(t, `ALTER TABLE foo RENAME TO bar`) + sqlDB.Exec(t, `ALTER TABLE bar RENAME TO baz`) + afterCreate := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} + var tableID int + sqlDB.QueryRow(t, `SELECT id FROM system.namespace WHERE name = 'baz'`).Scan(&tableID) + descs, err := getDescriptorsFromStoreForInterval(ctx, kvDB, s.Codec(), descpb.ID(tableID), + beforeCreate, afterCreate) + require.NoError(t, err) + require.Len(t, descs, 3) + require.Equal(t, 1, numRequests) +} diff --git a/pkg/sql/delete_preserving_index_test.go b/pkg/sql/delete_preserving_index_test.go index b3d7e169a039..5e5c4b6112f2 100644 --- a/pkg/sql/delete_preserving_index_test.go +++ b/pkg/sql/delete_preserving_index_test.go @@ -46,6 +46,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/tests" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/intsets" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -129,10 +130,21 @@ func TestDeletePreservingIndexEncoding(t *testing.T) { prefix := rowenc.MakeIndexKeyPrefix(keys.SystemSQLCodec, tableDesc.GetID(), index.ID) prefixEnd := append(prefix, []byte("\xff")...) - revisions, err := kvclient.GetAllRevisions(context.Background(), kvDB, prefix, prefixEnd, now, end) - if err != nil { - return nil, nil, err - } + revisionsCh := make(chan []kvclient.VersionedValues) + g := ctxgroup.WithContext(ctx) + g.GoCtx(func(ctx context.Context) error { + defer close(revisionsCh) + return kvclient.GetAllRevisions(ctx, kvDB, prefix, prefixEnd, now, end, revisionsCh) + }) + + var revisions []kvclient.VersionedValues + g.GoCtx(func(ctx context.Context) error { + for r := range revisionsCh { + revisions = append(revisions, r...) + } + return nil + }) + require.NoError(t, g.Wait()) completeSchemaChange <- struct{}{} finishedSchemaChange.Wait() diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index b1973155166f..7de80b84170e 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -6186,7 +6186,7 @@ func MVCCIsSpanEmpty( // allocations. 
func MVCCExportFingerprint( ctx context.Context, cs *cluster.Settings, reader Reader, opts MVCCExportOptions, dest io.Writer, -) (kvpb.BulkOpSummary, MVCCKey, uint64, bool, error) { +) (kvpb.BulkOpSummary, ExportRequestResumeInfo, uint64, bool, error) { ctx, span := tracing.ChildSpan(ctx, "storage.MVCCExportFingerprint") defer span.Finish() @@ -6194,18 +6194,19 @@ func MVCCExportFingerprint( fingerprintWriter := makeFingerprintWriter(ctx, hasher, cs, dest, opts.FingerprintOptions) defer fingerprintWriter.Close() - summary, resumeKey, err := mvccExportToWriter(ctx, reader, opts, &fingerprintWriter) - if err != nil { - return kvpb.BulkOpSummary{}, MVCCKey{}, 0, false, err + summary, resumeInfo, exportErr := mvccExportToWriter(ctx, reader, opts, &fingerprintWriter) + if exportErr != nil { + return kvpb.BulkOpSummary{}, ExportRequestResumeInfo{}, 0, false, exportErr } fingerprint, err := fingerprintWriter.Finish() if err != nil { - return kvpb.BulkOpSummary{}, MVCCKey{}, 0, false, err + return kvpb.BulkOpSummary{}, ExportRequestResumeInfo{}, 0, false, err } hasRangeKeys := fingerprintWriter.sstWriter.DataSize != 0 - return summary, resumeKey, fingerprint, hasRangeKeys, err + + return summary, resumeInfo, fingerprint, hasRangeKeys, nil } // MVCCExportToSST exports changes to the keyrange [StartKey, EndKey) over the @@ -6213,29 +6214,34 @@ func MVCCExportFingerprint( // details. func MVCCExportToSST( ctx context.Context, cs *cluster.Settings, reader Reader, opts MVCCExportOptions, dest io.Writer, -) (kvpb.BulkOpSummary, MVCCKey, error) { +) (kvpb.BulkOpSummary, ExportRequestResumeInfo, error) { ctx, span := tracing.ChildSpan(ctx, "storage.MVCCExportToSST") defer span.Finish() sstWriter := MakeBackupSSTWriter(ctx, cs, dest) defer sstWriter.Close() - summary, resumeKey, err := mvccExportToWriter(ctx, reader, opts, &sstWriter) - if err != nil { - return kvpb.BulkOpSummary{}, MVCCKey{}, err + summary, resumeInfo, exportErr := mvccExportToWriter(ctx, reader, opts, &sstWriter) + if exportErr != nil { + return kvpb.BulkOpSummary{}, ExportRequestResumeInfo{}, exportErr } if summary.DataSize == 0 { // If no records were added to the sstable, skip // completing it and return an empty summary. // - // We still propagate the resumeKey because our - // iteration may have been halted because of resource - // limitations before any keys were added to the - // returned SST. - return kvpb.BulkOpSummary{}, resumeKey, nil + // We still propagate the resumeInfo because our iteration may have been + // halted because of resource limitations before any keys were added to the + // returned SST. In particular, its CPUOverlimit flag signals that we should + // paginate and return a response to the client, instead of retrying + // immediately. + return summary, resumeInfo, nil + } + + if err := sstWriter.Finish(); err != nil { + return kvpb.BulkOpSummary{}, ExportRequestResumeInfo{}, err } - return summary, resumeKey, sstWriter.Finish() + return summary, resumeInfo, nil } // ExportWriter is a trimmed down version of the Writer interface. It contains @@ -6268,6 +6274,11 @@ type ExportWriter interface { PutUnversioned(key roachpb.Key, value []byte) error } +type ExportRequestResumeInfo struct { + ResumeKey MVCCKey + CPUOverlimit bool +} + // mvccExportToWriter exports changes to the keyrange [StartKey, EndKey) over // the interval (StartTS, EndTS] to the passed in writer. See MVCCExportOptions // for options. StartTS may be zero. 
@@ -6295,7 +6306,7 @@ type ExportWriter interface { // responsibility of the caller to Finish() / Close() the passed in writer. func mvccExportToWriter( ctx context.Context, reader Reader, opts MVCCExportOptions, writer ExportWriter, -) (kvpb.BulkOpSummary, MVCCKey, error) { +) (kvpb.BulkOpSummary, ExportRequestResumeInfo, error) { // If we're not exporting all revisions then we can mask point keys below any // MVCC range tombstones, since we don't care about them. var rangeKeyMasking hlc.Timestamp @@ -6379,7 +6390,7 @@ func mvccExportToWriter( iter.SeekGE(opts.StartKey) for { if ok, err := iter.Valid(); err != nil { - return kvpb.BulkOpSummary{}, MVCCKey{}, err + return kvpb.BulkOpSummary{}, ExportRequestResumeInfo{}, err } else if !ok { break } else if iter.NumCollectedIntents() > 0 { @@ -6412,8 +6423,7 @@ func mvccExportToWriter( if isNewKey { resumeKey.Timestamp = hlc.Timestamp{} } - log.VInfof(ctx, 2, "paginating ExportRequest: CPU over-limit") - break + return rows.BulkOpSummary, ExportRequestResumeInfo{ResumeKey: resumeKey, CPUOverlimit: true}, nil } } @@ -6430,13 +6440,13 @@ func mvccExportToWriter( mvccValue, err = decodeExtendedMVCCValue(v.Value) } if err != nil { - return kvpb.BulkOpSummary{}, MVCCKey{}, errors.Wrapf(err, + return kvpb.BulkOpSummary{}, ExportRequestResumeInfo{}, errors.Wrapf(err, "decoding mvcc value %s", v.Value) } // Export only the inner roachpb.Value, not the MVCCValue header. rawValue := mvccValue.Value.RawBytes if err := writer.PutRawMVCCRangeKey(rangeKeys.AsRangeKey(v), rawValue); err != nil { - return kvpb.BulkOpSummary{}, MVCCKey{}, err + return kvpb.BulkOpSummary{}, ExportRequestResumeInfo{}, err } } rows.BulkOpSummary.DataSize += rangeKeysSize @@ -6481,7 +6491,7 @@ func mvccExportToWriter( break } if reachedMaxSize { - return kvpb.BulkOpSummary{}, MVCCKey{}, &ExceedMaxSizeError{ + return kvpb.BulkOpSummary{}, ExportRequestResumeInfo{}, &ExceedMaxSizeError{ reached: newSize, maxSize: opts.MaxSize} } } @@ -6497,7 +6507,7 @@ func mvccExportToWriter( // Process point keys. unsafeValue, err := iter.UnsafeValue() if err != nil { - return kvpb.BulkOpSummary{}, MVCCKey{}, err + return kvpb.BulkOpSummary{}, ExportRequestResumeInfo{}, err } skip := false if unsafeKey.IsValue() { @@ -6506,7 +6516,7 @@ func mvccExportToWriter( mvccValue, err = decodeExtendedMVCCValue(unsafeValue) } if err != nil { - return kvpb.BulkOpSummary{}, MVCCKey{}, errors.Wrapf(err, "decoding mvcc value %s", unsafeKey) + return kvpb.BulkOpSummary{}, ExportRequestResumeInfo{}, errors.Wrapf(err, "decoding mvcc value %s", unsafeKey) } // Export only the inner roachpb.Value, not the MVCCValue header. @@ -6519,7 +6529,7 @@ func mvccExportToWriter( if !skip { if err := rows.Count(unsafeKey.Key); err != nil { - return kvpb.BulkOpSummary{}, MVCCKey{}, errors.Wrapf(err, "decoding %s", unsafeKey) + return kvpb.BulkOpSummary{}, ExportRequestResumeInfo{}, errors.Wrapf(err, "decoding %s", unsafeKey) } curSize := rows.BulkOpSummary.DataSize curSizeWithRangeKeys := curSize + maxRangeKeysSizeIfTruncated(unsafeKey.Key) @@ -6528,7 +6538,7 @@ func mvccExportToWriter( kvSize := int64(len(unsafeKey.Key) + len(unsafeValue)) if curSize == 0 && opts.MaxSize > 0 && kvSize > int64(opts.MaxSize) { // This single key exceeds the MaxSize. Even if we paginate below, this will still fail. 
-				return kvpb.BulkOpSummary{}, MVCCKey{}, &ExceedMaxSizeError{reached: kvSize, maxSize: opts.MaxSize}
+				return kvpb.BulkOpSummary{}, ExportRequestResumeInfo{}, &ExceedMaxSizeError{reached: kvSize, maxSize: opts.MaxSize}
 			}
 			newSize := curSize + kvSize
 			newSizeWithRangeKeys := curSizeWithRangeKeys + kvSize
@@ -6547,18 +6557,18 @@ func mvccExportToWriter(
 				break
 			}
 			if reachedMaxSize {
-				return kvpb.BulkOpSummary{}, MVCCKey{}, &ExceedMaxSizeError{
+				return kvpb.BulkOpSummary{}, ExportRequestResumeInfo{}, &ExceedMaxSizeError{
 					reached: newSizeWithRangeKeys, maxSize: opts.MaxSize}
 			}
 			if unsafeKey.Timestamp.IsEmpty() {
 				// This should never be an intent since the incremental iterator returns
 				// an error when encountering intents.
 				if err := writer.PutUnversioned(unsafeKey.Key, unsafeValue); err != nil {
-					return kvpb.BulkOpSummary{}, MVCCKey{}, errors.Wrapf(err, "adding key %s", unsafeKey)
+					return kvpb.BulkOpSummary{}, ExportRequestResumeInfo{}, errors.Wrapf(err, "adding key %s", unsafeKey)
 				}
 			} else {
 				if err := writer.PutRawMVCC(unsafeKey, unsafeValue); err != nil {
-					return kvpb.BulkOpSummary{}, MVCCKey{}, errors.Wrapf(err, "adding key %s", unsafeKey)
+					return kvpb.BulkOpSummary{}, ExportRequestResumeInfo{}, errors.Wrapf(err, "adding key %s", unsafeKey)
 				}
 			}
 			rows.BulkOpSummary.DataSize = newSize
@@ -6585,7 +6595,7 @@ func mvccExportToWriter(
 			}
 		}
 		err := iter.TryGetIntentError()
-		return kvpb.BulkOpSummary{}, MVCCKey{}, err
+		return kvpb.BulkOpSummary{}, ExportRequestResumeInfo{}, err
 	}
 
 	// Flush any pending buffered range keys, truncated to the resume key (if
@@ -6610,19 +6620,19 @@ func mvccExportToWriter(
 				mvccValue, err = decodeExtendedMVCCValue(v.Value)
 			}
 			if err != nil {
-				return kvpb.BulkOpSummary{}, MVCCKey{}, errors.Wrapf(err,
+				return kvpb.BulkOpSummary{}, ExportRequestResumeInfo{}, errors.Wrapf(err,
 					"decoding mvcc value %s", v.Value)
 			}
 			// Export only the inner roachpb.Value, not the MVCCValue header.
 			rawValue := mvccValue.Value.RawBytes
 			if err := writer.PutRawMVCCRangeKey(rangeKeys.AsRangeKey(v), rawValue); err != nil {
-				return kvpb.BulkOpSummary{}, MVCCKey{}, err
+				return kvpb.BulkOpSummary{}, ExportRequestResumeInfo{}, err
 			}
 		}
 		rows.BulkOpSummary.DataSize += rangeKeysSize
 	}
 
-	return rows.BulkOpSummary, resumeKey, nil
+	return rows.BulkOpSummary, ExportRequestResumeInfo{ResumeKey: resumeKey}, nil
 }
 
 // MVCCExportOptions contains options for MVCCExportToSST.
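// NOTE (review sketch, not part of the applied diff): since pagination is now
// the client's responsibility, callers loop and feed ResumeKey back in as the
// next StartKey until it comes back empty, mirroring the loop in
// TestMVCCExportToSSTExhaustedAtStart below; `process` is a hypothetical
// consumer of each SST chunk:
//
//	resume := MVCCKey{Key: startKey}
//	for resume.Key != nil {
//		opts.StartKey = resume
//		sstFile := &MemFile{}
//		_, resumeInfo, err := MVCCExportToSST(ctx, st, engine, opts, sstFile)
//		if err != nil {
//			return err
//		}
//		process(sstFile.Data())
//		resume = resumeInfo.ResumeKey
//	}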
diff --git a/pkg/storage/mvcc_history_test.go b/pkg/storage/mvcc_history_test.go
index e06a204eb040..d9ccf98e4917 100644
--- a/pkg/storage/mvcc_history_test.go
+++ b/pkg/storage/mvcc_history_test.go
@@ -1386,12 +1386,12 @@ func cmdExport(e *evalCtx) error {
 
 	sstFile := &storage.MemFile{}
 	var summary kvpb.BulkOpSummary
-	var resume storage.MVCCKey
+	var resumeInfo storage.ExportRequestResumeInfo
 	var fingerprint uint64
 	var hasRangeKeys bool
 	var err error
 	if shouldFingerprint {
-		summary, resume, fingerprint, hasRangeKeys, err = storage.MVCCExportFingerprint(e.ctx, e.st, r,
+		summary, resumeInfo, fingerprint, hasRangeKeys, err = storage.MVCCExportFingerprint(e.ctx, e.st, r,
 			opts, sstFile)
 		if err != nil {
 			return err
@@ -1402,15 +1402,15 @@ func cmdExport(e *evalCtx) error {
 		e.results.buf.Printf("export: %s", &summary)
 		e.results.buf.Print(" fingerprint=true")
 	} else {
-		summary, resume, err = storage.MVCCExportToSST(e.ctx, e.st, r, opts, sstFile)
+		summary, resumeInfo, err = storage.MVCCExportToSST(e.ctx, e.st, r, opts, sstFile)
 		if err != nil {
 			return err
 		}
 		e.results.buf.Printf("export: %s", &summary)
 	}
 
-	if resume.Key != nil {
-		e.results.buf.Printf(" resume=%s", resume)
+	if resumeInfo.ResumeKey.Key != nil {
+		e.results.buf.Printf(" resume=%s", resumeInfo.ResumeKey)
 	}
 	e.results.buf.Printf("\n")
diff --git a/pkg/storage/mvcc_test.go b/pkg/storage/mvcc_test.go
index 2f202c82936a..2e01d64c0eda 100644
--- a/pkg/storage/mvcc_test.go
+++ b/pkg/storage/mvcc_test.go
@@ -6043,7 +6043,7 @@ func TestMVCCExportToSSTExhaustedAtStart(t *testing.T) {
 			sstFile := &MemFile{}
 			opts := initialOpts
 			opts.StartKey = startKey
-			_, resumeKey, err := MVCCExportToSST(ctx, st, engine, opts, sstFile)
+			_, resumeInfo, err := MVCCExportToSST(ctx, st, engine, opts, sstFile)
 			require.NoError(t, err)
 			chunk := sstToKeys(t, sstFile.Data())
 			require.LessOrEqual(t, len(chunk), len(expectedData)-dataIndex, "remaining test data")
@@ -6051,7 +6051,7 @@ func TestMVCCExportToSSTExhaustedAtStart(t *testing.T) {
 				require.True(t, key.Equal(expectedData[dataIndex]), "returned key is not equal")
 				dataIndex++
 			}
-			startKey = resumeKey
+			startKey = resumeInfo.ResumeKey
 		}
 		require.Equal(t, len(expectedData), dataIndex, "not all expected data was consumed")
 	}
@@ -6076,11 +6076,9 @@ func TestMVCCExportToSSTExhaustedAtStart(t *testing.T) {
 			latest: false,
 		})
-		// Our ElasticCPUWorkHandle will fail on the
-		// very first call. As a result, the very
-		// first resturn from MVCCExportToSST will
-		// actually contain no data but _should_
-		// return a resume key.
+		// Our ElasticCPUWorkHandle will fail on the very first call. As a result,
+		// the very first return from MVCCExportToSST will actually contain no
+		// data but _should_ return a resume key.
 		firstCall := true
 		ctx := admission.ContextWithElasticCPUWorkHandle(context.Background(),
 			admission.TestingNewElasticCPUHandleWithCallback(func() (bool, time.Duration) {
 				if firstCall {
@@ -6378,7 +6376,7 @@ func TestMVCCExportToSSTSplitMidKey(t *testing.T) {
 			}
 			for !resumeKey.Equal(MVCCKey{}) {
 				dest := &MemFile{}
-				_, resumeKey, _ = MVCCExportToSST(
+				_, resumeInfo, err := MVCCExportToSST(
 					ctx, st, engine, MVCCExportOptions{
 						StartKey:           resumeKey,
 						EndKey:             key(3).Next(),
@@ -6389,6 +6387,8 @@ func TestMVCCExportToSSTSplitMidKey(t *testing.T) {
 						MaxSize:            maxSize,
 						StopMidKey:         test.stopMidKey,
 					}, dest)
+				require.NoError(t, err)
+				resumeKey = resumeInfo.ResumeKey
 				if !resumeKey.Timestamp.IsEmpty() {
 					resumeWithTs++
 				}
@@ -6444,13 +6444,13 @@ func TestMVCCExportFingerprint(t *testing.T) {
 	fingerprint := func(opts MVCCExportOptions, engine Engine) (uint64, []byte, kvpb.BulkOpSummary, MVCCKey) {
 		dest := &MemFile{}
 		var err error
-		res, resumeKey, fingerprint, hasRangeKeys, err := MVCCExportFingerprint(
+		res, resumeInfo, fingerprint, hasRangeKeys, err := MVCCExportFingerprint(
 			ctx, st, engine, opts, dest)
 		require.NoError(t, err)
 		if !hasRangeKeys {
 			dest = &MemFile{}
 		}
-		return fingerprint, dest.Data(), res, resumeKey
+		return fingerprint, dest.Data(), res, resumeInfo.ResumeKey
 	}
 
 	// verifyFingerprintAgainstOracle uses the `fingerprintOracle` to compute a
diff --git a/pkg/util/admission/elastic_cpu_work_handle.go b/pkg/util/admission/elastic_cpu_work_handle.go
index caa561952fef..85c5561304b5 100644
--- a/pkg/util/admission/elastic_cpu_work_handle.go
+++ b/pkg/util/admission/elastic_cpu_work_handle.go
@@ -117,6 +117,12 @@ func (h *ElasticCPUWorkHandle) overLimitInner() (overLimit bool, difference time
 	return false, h.differenceWithAllottedAtLastCheck
 }
 
+// TestingOverrideOverLimit allows tests to override the behaviour of
+// OverLimit().
+func (h *ElasticCPUWorkHandle) TestingOverrideOverLimit(f func() (bool, time.Duration)) {
+	h.testingOverrideOverLimit = f
+}
+
 type handleKey struct{}
 
 // ContextWithElasticCPUWorkHandle returns a Context wrapping the supplied elastic
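// NOTE (review sketch, not part of the applied diff): TestingOverrideOverLimit
// lets a test force deterministic preemption on a handle pulled out of the
// request context, e.g. inside a testing request filter; the nil check matters
// because not every request carries a handle:
//
//	if h := admission.ElasticCPUWorkHandleFromContext(ctx); h != nil {
//		h.TestingOverrideOverLimit(func() (bool, time.Duration) {
//			return true, 0 // always report over-limit, forcing pagination
//		})
//	}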