From 83a3724a534324d97130b3c4dd4408c03192ba2a Mon Sep 17 00:00:00 2001
From: Steven Danna
Date: Thu, 2 Mar 2023 13:18:37 +0000
Subject: [PATCH] kv: opt into Elastic CPU limiting resume spans

In #97886, we changed ExportRequest so that resume spans produced by the
elastic CPU limiter are returned all the way to the caller. This has at
least two problems:

1) In a mixed-version state, the caller might not yet know how to handle
   resume spans. This could result in incomplete responses erroneously
   being used as if they were full responses.

2) The DistSender inspects a request to determine whether it may stop
   early. If it cannot stop early, the request is split up, possibly sent
   in parallel, and all responses are combined. The code that combines
   responses asserts that neither side has a resume span. As a result,
   since the change was made we have seen failures such as:

       crdb_internal.fingerprint(): combining /Tenant/2/Table/106/1/-{8403574544142222370/0-37656332809536692} with /Tenant/{2/Table/106/1/436440321206557763/0-3}

Here, we add a new request header field, ReturnElasticCPUResumeSpans, that
lets callers indicate whether they are prepared to accept resume spans. We
also wire that field into the DistSender logic that decides how to process
requests: if ReturnElasticCPUResumeSpans is set, the DistSender no longer
sends requests in parallel and knows to expect a possible early exit.
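To illustrate, a caller that opts in and handles the resulting pagination
could look roughly like the sketch below. The kvpb request/response types
and the ReturnElasticCPUResumeSpans field are the ones touched by this
patch, but the exportAll function, its parameters, and the surrounding
plumbing are hypothetical and assume the usual kv, kvpb, roachpb, and hlc
imports:

    // exportAll is an illustrative sketch (not part of this change): it
    // opts into elastic CPU resume spans and paginates until the whole
    // span has been exported.
    func exportAll(
        ctx context.Context,
        db *kv.DB,
        startKey, endKey roachpb.Key,
        startTime, endTime hlc.Timestamp,
    ) error {
        for {
            header := kvpb.Header{
                Timestamp: endTime,
                // Opt in: this caller knows how to handle resume spans
                // produced by the elastic CPU limiter.
                ReturnElasticCPUResumeSpans: true,
            }
            req := &kvpb.ExportRequest{
                RequestHeader: kvpb.RequestHeader{Key: startKey, EndKey: endKey},
                StartTime:     startTime,
            }
            rawResp, pErr := kv.SendWrappedWith(ctx, db.NonTransactionalSender(), header, req)
            if pErr != nil {
                return pErr.GoError()
            }
            resp := rawResp.(*kvpb.ExportResponse)
            _ = resp.Files // consume the exported files here
            if resp.ResumeSpan == nil {
                return nil
            }
            // The request was cut short (ResumeReason is
            // RESUME_ELASTIC_CPU_LIMIT); continue from the resume key.
            startKey = resp.ResumeSpan.Key
        }
    }

This mirrors the loop that GetAllRevisions already has below; a caller that
does not loop like this simply leaves the field unset and never sees an
elastic CPU resume span.
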
The downside is that crdb_internal.fingerprint won't have its requests
sent in parallel.

Release note: None
Epic: none
---
 pkg/ccl/backupccl/backup_processor.go           |  5 +-
 .../changefeedccl/schemafeed/schema_feed.go     |  5 +-
 pkg/kv/kvclient/kvcoord/dist_sender.go          |  5 +-
 pkg/kv/kvclient/revision_reader.go              |  5 +-
 pkg/kv/kvpb/api.proto                           |  9 +++
 pkg/kv/kvserver/batcheval/BUILD.bazel           |  3 +
 pkg/kv/kvserver/batcheval/cmd_export.go         | 26 +++----
 pkg/kv/kvserver/batcheval/cmd_export_test.go    | 69 +++++++++++++++++++
 pkg/sql/catalog/lease/lease.go                  |  3 +-
 pkg/sql/sem/builtins/builtins.go                |  4 ++
 10 files changed, 115 insertions(+), 19 deletions(-)

diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go
index abaf65c9dcd5..439789a6155a 100644
--- a/pkg/ccl/backupccl/backup_processor.go
+++ b/pkg/ccl/backupccl/backup_processor.go
@@ -399,8 +399,9 @@ func runBackupProcessor(
             // We set the DistSender response target bytes field to a sentinel
             // value. The sentinel value of 1 forces the ExportRequest to paginate
             // after creating a single SST.
-            TargetBytes: 1,
-            Timestamp:   span.end,
+            TargetBytes:                 1,
+            Timestamp:                   span.end,
+            ReturnElasticCPUResumeSpans: true,
         }
         if priority {
             // This re-attempt is reading far enough in the past that we just want
diff --git a/pkg/ccl/changefeedccl/schemafeed/schema_feed.go b/pkg/ccl/changefeedccl/schemafeed/schema_feed.go
index 27602393a50d..a999b74e731b 100644
--- a/pkg/ccl/changefeedccl/schemafeed/schema_feed.go
+++ b/pkg/ccl/changefeedccl/schemafeed/schema_feed.go
@@ -579,7 +579,10 @@ func sendExportRequestWithPriorityOverride(
     span roachpb.Span,
     startTS, endTS hlc.Timestamp,
 ) (kvpb.Response, error) {
-    header := kvpb.Header{Timestamp: endTS}
+    header := kvpb.Header{
+        Timestamp:                   endTS,
+        ReturnElasticCPUResumeSpans: true,
+    }
     req := &kvpb.ExportRequest{
         RequestHeader: kvpb.RequestHeaderFromSpan(span),
         StartTime:     startTS,
diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go
index a0300f31ac5a..262bcca1382c 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -1357,7 +1357,8 @@ func (ds *DistSender) divideAndSendBatchToRanges(
     }()
 
     canParallelize := ba.Header.MaxSpanRequestKeys == 0 && ba.Header.TargetBytes == 0 &&
-        !ba.Header.ReturnOnRangeBoundary
+        !ba.Header.ReturnOnRangeBoundary &&
+        !ba.Header.ReturnElasticCPUResumeSpans
     if ba.IsSingleCheckConsistencyRequest() {
         // Don't parallelize full checksum requests as they have to touch the
         // entirety of each replica of each range they touch.
@@ -1447,7 +1448,7 @@ func (ds *DistSender) divideAndSendBatchToRanges(
             ba.UpdateTxn(resp.reply.Txn)
         }
 
-        mightStopEarly := ba.MaxSpanRequestKeys > 0 || ba.TargetBytes > 0 || ba.ReturnOnRangeBoundary
+        mightStopEarly := ba.MaxSpanRequestKeys > 0 || ba.TargetBytes > 0 || ba.ReturnOnRangeBoundary || ba.ReturnElasticCPUResumeSpans
         // Check whether we've received enough responses to exit query loop.
         if mightStopEarly {
             var replyKeys int64
diff --git a/pkg/kv/kvclient/revision_reader.go b/pkg/kv/kvclient/revision_reader.go
index 8724b4ebac89..3b6e8e84ad11 100644
--- a/pkg/kv/kvclient/revision_reader.go
+++ b/pkg/kv/kvclient/revision_reader.go
@@ -38,7 +38,10 @@ func GetAllRevisions(
     allRevs chan []VersionedValues,
 ) error {
     for {
-        header := kvpb.Header{Timestamp: endTime}
+        header := kvpb.Header{
+            Timestamp:                   endTime,
+            ReturnElasticCPUResumeSpans: true,
+        }
         req := &kvpb.ExportRequest{
             RequestHeader: kvpb.RequestHeader{Key: startKey, EndKey: endKey},
             StartTime:     startTime,
diff --git a/pkg/kv/kvpb/api.proto b/pkg/kv/kvpb/api.proto
index d7b6ee6e7e78..c4d7436ad0e8 100644
--- a/pkg/kv/kvpb/api.proto
+++ b/pkg/kv/kvpb/api.proto
@@ -2635,6 +2635,15 @@ message Header {
   // each Scan and ReverseScan.
   sql.sqlbase.IndexFetchSpec index_fetch_spec = 29;
 
+  // ReturnElasticCPUResumeSpans, if set, indicates that the caller
+  // expects early-termination of requests based on the Elastic CPU
+  // limiter.
+  //
+  // Resume spans returned because of the underlying request being
+  // rate-limited by the ElasticCPU limiter will have a reason of
+  // RESUME_ELASTIC_CPU_LIMIT.
+  bool return_elastic_cpu_resume_spans = 30 [(gogoproto.customname) = "ReturnElasticCPUResumeSpans"];
+
   reserved 7, 10, 12, 14, 20;
 }
 
diff --git a/pkg/kv/kvserver/batcheval/BUILD.bazel b/pkg/kv/kvserver/batcheval/BUILD.bazel
index a66b5e087097..7d0d1155d4b6 100644
--- a/pkg/kv/kvserver/batcheval/BUILD.bazel
+++ b/pkg/kv/kvserver/batcheval/BUILD.bazel
@@ -150,9 +150,11 @@ go_test(
         "//pkg/security/securitytest",
         "//pkg/server",
         "//pkg/settings/cluster",
+        "//pkg/sql",
         "//pkg/sql/catalog",
         "//pkg/sql/catalog/bootstrap",
         "//pkg/sql/catalog/descpb",
+        "//pkg/sql/catalog/desctestutils",
         "//pkg/sql/rowenc",
         "//pkg/sql/sem/tree",
         "//pkg/storage",
@@ -165,6 +167,7 @@ go_test(
         "//pkg/testutils/storageutils",
         "//pkg/testutils/testcluster",
         "//pkg/util",
+        "//pkg/util/admission",
         "//pkg/util/encoding",
         "//pkg/util/hlc",
         "//pkg/util/leaktest",
diff --git a/pkg/kv/kvserver/batcheval/cmd_export.go b/pkg/kv/kvserver/batcheval/cmd_export.go
index ef9bcce22193..cf6cbfbb985a 100644
--- a/pkg/kv/kvserver/batcheval/cmd_export.go
+++ b/pkg/kv/kvserver/batcheval/cmd_export.go
@@ -238,7 +238,7 @@ func evalExport(
             // chance to move the goroutine off CPU allowing other processes to make
             // progress. The client is responsible for handling pagination of
             // ExportRequests.
-            if resumeInfo.CPUOverlimit {
+            if resumeInfo.CPUOverlimit && h.ReturnElasticCPUResumeSpans {
                 // Note, since we have not exported any data we do not populate the
                 // `Files` field of the ExportResponse.
                 reply.ResumeSpan = &roachpb.Span{
@@ -248,14 +248,18 @@ func evalExport(
                 reply.ResumeReason = kvpb.RESUME_ELASTIC_CPU_LIMIT
                 break
             } else {
-                // We should never come here. There should be no condition aside from
-                // resource constraints that results in an early exit without
-                // exporting any data. Regardless, if we have a resumeKey we
-                // immediately retry the ExportRequest from that key and timestamp
-                // onwards.
-                if !build.IsRelease() {
-                    return result.Result{}, errors.AssertionFailedf("ExportRequest exited without " +
-                        "exporting any data for an unknown reason; programming error")
+                if !resumeInfo.CPUOverlimit {
+                    // We should never come here. There should be no condition aside from
+                    // resource constraints that results in an early exit without
+                    // exporting any data. Regardless, if we have a resumeKey we
+                    // immediately retry the ExportRequest from that key and timestamp
+                    // onwards.
+                    if !build.IsRelease() {
+                        return result.Result{}, errors.AssertionFailedf("ExportRequest exited without " +
+                            "exporting any data for an unknown reason; programming error")
+                    } else {
+                        log.Warningf(ctx, "unexpected resume span from ExportRequest without exporting any data for an unknown reason: %v", resumeInfo)
+                    }
                 }
                 start = resumeInfo.ResumeKey.Key
                 resumeKeyTS = resumeInfo.ResumeKey.Timestamp
@@ -303,14 +307,12 @@ func evalExport(
         // resuming our export from the resume key. This gives the scheduler a
         // chance to take the current goroutine off CPU and allow other processes to
         // progress.
-        if resumeInfo.CPUOverlimit {
+        if resumeInfo.CPUOverlimit && h.ReturnElasticCPUResumeSpans {
             if resumeInfo.ResumeKey.Key != nil {
                 reply.ResumeSpan = &roachpb.Span{
                     Key:    resumeInfo.ResumeKey.Key,
                     EndKey: args.EndKey,
                 }
-                // TODO(during review): Do we want to add another resume reason
-                // specifically for CPU preemption.
                 reply.ResumeReason = kvpb.RESUME_ELASTIC_CPU_LIMIT
             }
             break
diff --git a/pkg/kv/kvserver/batcheval/cmd_export_test.go b/pkg/kv/kvserver/batcheval/cmd_export_test.go
index c4440fa57408..87754c46a133 100644
--- a/pkg/kv/kvserver/batcheval/cmd_export_test.go
+++ b/pkg/kv/kvserver/batcheval/cmd_export_test.go
@@ -19,22 +19,28 @@ import (
     "sort"
     "strconv"
     "testing"
+    "time"
 
     "github.com/cockroachdb/cockroach/pkg/base"
     "github.com/cockroachdb/cockroach/pkg/keys"
     "github.com/cockroachdb/cockroach/pkg/kv"
     "github.com/cockroachdb/cockroach/pkg/kv/kvpb"
+    "github.com/cockroachdb/cockroach/pkg/kv/kvserver"
    "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
     "github.com/cockroachdb/cockroach/pkg/roachpb"
     "github.com/cockroachdb/cockroach/pkg/settings/cluster"
+    "github.com/cockroachdb/cockroach/pkg/sql"
     "github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap"
     "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
+    "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
     "github.com/cockroachdb/cockroach/pkg/storage"
     "github.com/cockroachdb/cockroach/pkg/testutils"
     "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
     "github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
+    "github.com/cockroachdb/cockroach/pkg/util/admission"
     "github.com/cockroachdb/cockroach/pkg/util/hlc"
     "github.com/cockroachdb/cockroach/pkg/util/leaktest"
+    "github.com/cockroachdb/cockroach/pkg/util/log"
     "github.com/cockroachdb/cockroach/pkg/util/randutil"
     "github.com/cockroachdb/errors"
     "github.com/stretchr/testify/require"
@@ -415,6 +421,69 @@ INTO
     })
 }
 
+func TestExportRequestWithCPULimitResumeSpans(t *testing.T) {
+    defer leaktest.AfterTest(t)()
+    defer log.Scope(t).Close(t)
+
+    ctx := context.Background()
+    rng, _ := randutil.NewTestRand()
+    tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
+        ServerArgs: base.TestServerArgs{
+            UseDatabase: "test",
+            Knobs: base.TestingKnobs{Store: &kvserver.StoreTestingKnobs{
+                TestingRequestFilter: func(ctx context.Context, request *kvpb.BatchRequest) *kvpb.Error {
+                    for _, ru := range request.Requests {
+                        if _, ok := ru.GetInner().(*kvpb.ExportRequest); ok {
+                            h := admission.ElasticCPUWorkHandleFromContext(ctx)
+                            if h == nil {
+                                t.Fatalf("expected context to have CPU work handle")
+                            }
+                            h.TestingOverrideOverLimit(func() (bool, time.Duration) {
+                                if rng.Float32() > 0.5 {
+                                    return true, 0
+                                }
+                                return false, 0
+                            })
+                        }
+                    }
+                    return nil
+                },
+            }},
+        },
+    })
+
+    defer tc.Stopper().Stop(context.Background())
+
+    s := tc.TenantOrServer(0)
+    sqlDB := tc.Conns[0]
+    db := sqlutils.MakeSQLRunner(sqlDB)
+    execCfg := s.ExecutorConfig().(sql.ExecutorConfig)
+    kvDB := s.DB()
+
+    const (
+        initRows = 1000
+        splits   = 100
+    )
+    db.Exec(t, "CREATE DATABASE IF NOT EXISTS test")
+    db.Exec(t, "CREATE TABLE test (k PRIMARY KEY) AS SELECT generate_series(1, $1)", initRows)
+    db.Exec(t, "ALTER TABLE test SPLIT AT (select i*10 from generate_series(1, $1) as i)", initRows/splits)
+    db.Exec(t, "ALTER TABLE test SCATTER")
+
+    desc := desctestutils.TestingGetPublicTableDescriptor(kvDB, execCfg.Codec, "test", "test")
+    span := desc.TableSpan(execCfg.Codec)
+
+    req := &kvpb.ExportRequest{
+        RequestHeader: kvpb.RequestHeader{
+            Key:    span.Key,
+            EndKey: span.EndKey},
+    }
+    header := kvpb.Header{
+        ReturnElasticCPUResumeSpans: true,
+    }
+    _, err := kv.SendWrappedWith(ctx, kvDB.NonTransactionalSender(), header, req)
+    require.NoError(t, err.GoError())
+}
+
 func TestExportGCThreshold(t *testing.T) {
     defer leaktest.AfterTest(t)()
diff --git a/pkg/sql/catalog/lease/lease.go b/pkg/sql/catalog/lease/lease.go
index 9780d502231c..a99d68f904fa 100644
--- a/pkg/sql/catalog/lease/lease.go
+++ b/pkg/sql/catalog/lease/lease.go
@@ -252,7 +252,8 @@ func getDescriptorsFromStoreForInterval(
     // Create an export request (1 kv call) for all descriptors for given
     // descriptor ID written during the interval [timestamp, endTimestamp).
     batchRequestHeader := kvpb.Header{
-        Timestamp: upperBound.Prev(),
+        Timestamp:                   upperBound.Prev(),
+        ReturnElasticCPUResumeSpans: true,
     }
     descriptorKey := catalogkeys.MakeDescMetadataKey(codec, id)
     // Unmarshal key span retrieved from export request to construct historical descs.
diff --git a/pkg/sql/sem/builtins/builtins.go b/pkg/sql/sem/builtins/builtins.go
index cb2840303a5b..aafb12fd7cd6 100644
--- a/pkg/sql/sem/builtins/builtins.go
+++ b/pkg/sql/sem/builtins/builtins.go
@@ -7563,6 +7563,10 @@ expires until the statement bundle is collected`,
             // specially in the future so as to allow the fingerprint to complete
             // in the face of intents.
             WaitPolicy: lock.WaitPolicy_Error,
+            // TODO(ssd): Setting this disables async sending in
+            // DistSender so it likely substantially impacts
+            // performance.
+            ReturnElasticCPUResumeSpans: true,
         }
         admissionHeader := kvpb.AdmissionHeader{
             Priority: int32(admissionpb.BulkNormalPri),