Skip to content

Commit

Permalink
Merge #97916
Browse files Browse the repository at this point in the history
97916: kv: opt into Elastic CPU limiting resume spans r=adityamaru,arulajmani a=stevendanna

In #96691, we changed ExportRequest such that it returns resume spans that result from the elastic CPU limiter all the way to the caller.

This has at least two problems:

1) In a mixed-version state, the caller might not yet know how to
   handle resume spans. This could result in incomplete responses
   erroneous being used as if they were full responses.

2) The DistSender inspects a request to determine whether it may stop
   early. If it shouldn't be able to stop early, then the request is
   split up, possibly sent in parallel, and all responses are
   combined. The code which combines responses asserts that neither side
   has a resume span.  As a result, we've seen failures such as

        crdb_internal.fingerprint(): combining
        /Tenant/2/Table/106/1/-{8403574544142222370/0-37656332809536692}
        with /Tenant/{2/Table/106/1/436440321206557763/0-3}

   since the change was made.

Here, we add a new request header field to allow callers to indicate whether they are prepared to accept resume spans. Further, we add that new field into the logic in DistSender which decides how to process requests.

The downside here is that crdb_internal.fingerprint won't have its requests sent in parallel.

Release note: None

Fixes #97886,  #97903

Epic: none

Co-authored-by: Steven Danna <[email protected]>
  • Loading branch information
craig[bot] and stevendanna committed Mar 7, 2023
2 parents fa0440b + 83a3724 commit 0dd1e91
Show file tree
Hide file tree
Showing 10 changed files with 115 additions and 19 deletions.
5 changes: 3 additions & 2 deletions pkg/ccl/backupccl/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,8 +399,9 @@ func runBackupProcessor(
// We set the DistSender response target bytes field to a sentinel
// value. The sentinel value of 1 forces the ExportRequest to paginate
// after creating a single SST.
TargetBytes: 1,
Timestamp: span.end,
TargetBytes: 1,
Timestamp: span.end,
ReturnElasticCPUResumeSpans: true,
}
if priority {
// This re-attempt is reading far enough in the past that we just want
Expand Down
5 changes: 4 additions & 1 deletion pkg/ccl/changefeedccl/schemafeed/schema_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,7 +579,10 @@ func sendExportRequestWithPriorityOverride(
span roachpb.Span,
startTS, endTS hlc.Timestamp,
) (kvpb.Response, error) {
header := kvpb.Header{Timestamp: endTS}
header := kvpb.Header{
Timestamp: endTS,
ReturnElasticCPUResumeSpans: true,
}
req := &kvpb.ExportRequest{
RequestHeader: kvpb.RequestHeaderFromSpan(span),
StartTime: startTS,
Expand Down
5 changes: 3 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1357,7 +1357,8 @@ func (ds *DistSender) divideAndSendBatchToRanges(
}()

canParallelize := ba.Header.MaxSpanRequestKeys == 0 && ba.Header.TargetBytes == 0 &&
!ba.Header.ReturnOnRangeBoundary
!ba.Header.ReturnOnRangeBoundary &&
!ba.Header.ReturnElasticCPUResumeSpans
if ba.IsSingleCheckConsistencyRequest() {
// Don't parallelize full checksum requests as they have to touch the
// entirety of each replica of each range they touch.
Expand Down Expand Up @@ -1447,7 +1448,7 @@ func (ds *DistSender) divideAndSendBatchToRanges(
ba.UpdateTxn(resp.reply.Txn)
}

mightStopEarly := ba.MaxSpanRequestKeys > 0 || ba.TargetBytes > 0 || ba.ReturnOnRangeBoundary
mightStopEarly := ba.MaxSpanRequestKeys > 0 || ba.TargetBytes > 0 || ba.ReturnOnRangeBoundary || ba.ReturnElasticCPUResumeSpans
// Check whether we've received enough responses to exit query loop.
if mightStopEarly {
var replyKeys int64
Expand Down
5 changes: 4 additions & 1 deletion pkg/kv/kvclient/revision_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ func GetAllRevisions(
allRevs chan []VersionedValues,
) error {
for {
header := kvpb.Header{Timestamp: endTime}
header := kvpb.Header{
Timestamp: endTime,
ReturnElasticCPUResumeSpans: true,
}
req := &kvpb.ExportRequest{
RequestHeader: kvpb.RequestHeader{Key: startKey, EndKey: endKey},
StartTime: startTime,
Expand Down
9 changes: 9 additions & 0 deletions pkg/kv/kvpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2635,6 +2635,15 @@ message Header {
// each Scan and ReverseScan.
sql.sqlbase.IndexFetchSpec index_fetch_spec = 29;

// ReturnElasticCPUResumeSpans, if set, indicates that the caller
// expects early-termination of requests based on the Elastic CPU
// limiter.
//
// Resume spans returned because of the underlying request being
// rate-limited by the ElasticCPU limiter will have a reason of
// RESUME_ELASTIC_CPU_LIMIT.
bool return_elastic_cpu_resume_spans = 30 [(gogoproto.customname) = "ReturnElasticCPUResumeSpans"];

reserved 7, 10, 12, 14, 20;
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/batcheval/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,11 @@ go_test(
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/bootstrap",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/desctestutils",
"//pkg/sql/rowenc",
"//pkg/sql/sem/tree",
"//pkg/storage",
Expand All @@ -165,6 +167,7 @@ go_test(
"//pkg/testutils/storageutils",
"//pkg/testutils/testcluster",
"//pkg/util",
"//pkg/util/admission",
"//pkg/util/encoding",
"//pkg/util/hlc",
"//pkg/util/leaktest",
Expand Down
26 changes: 14 additions & 12 deletions pkg/kv/kvserver/batcheval/cmd_export.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func evalExport(
// chance to move the goroutine off CPU allowing other processes to make
// progress. The client is responsible for handling pagination of
// ExportRequests.
if resumeInfo.CPUOverlimit {
if resumeInfo.CPUOverlimit && h.ReturnElasticCPUResumeSpans {
// Note, since we have not exported any data we do not populate the
// `Files` field of the ExportResponse.
reply.ResumeSpan = &roachpb.Span{
Expand All @@ -248,14 +248,18 @@ func evalExport(
reply.ResumeReason = kvpb.RESUME_ELASTIC_CPU_LIMIT
break
} else {
// We should never come here. There should be no condition aside from
// resource constraints that results in an early exit without
// exporting any data. Regardless, if we have a resumeKey we
// immediately retry the ExportRequest from that key and timestamp
// onwards.
if !build.IsRelease() {
return result.Result{}, errors.AssertionFailedf("ExportRequest exited without " +
"exporting any data for an unknown reason; programming error")
if !resumeInfo.CPUOverlimit {
// We should never come here. There should be no condition aside from
// resource constraints that results in an early exit without
// exporting any data. Regardless, if we have a resumeKey we
// immediately retry the ExportRequest from that key and timestamp
// onwards.
if !build.IsRelease() {
return result.Result{}, errors.AssertionFailedf("ExportRequest exited without " +
"exporting any data for an unknown reason; programming error")
} else {
log.Warningf(ctx, "unexpected resume span from ExportRequest without exporting any data for an unknown reason: %v", resumeInfo)
}
}
start = resumeInfo.ResumeKey.Key
resumeKeyTS = resumeInfo.ResumeKey.Timestamp
Expand Down Expand Up @@ -303,14 +307,12 @@ func evalExport(
// resuming our export from the resume key. This gives the scheduler a
// chance to take the current goroutine off CPU and allow other processes to
// progress.
if resumeInfo.CPUOverlimit {
if resumeInfo.CPUOverlimit && h.ReturnElasticCPUResumeSpans {
if resumeInfo.ResumeKey.Key != nil {
reply.ResumeSpan = &roachpb.Span{
Key: resumeInfo.ResumeKey.Key,
EndKey: args.EndKey,
}
// TODO(during review): Do we want to add another resume reason
// specifically for CPU preemption.
reply.ResumeReason = kvpb.RESUME_ELASTIC_CPU_LIMIT
}
break
Expand Down
69 changes: 69 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,28 @@ import (
"sort"
"strconv"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -415,6 +421,69 @@ INTO
})
}

func TestExportRequestWithCPULimitResumeSpans(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
rng, _ := randutil.NewTestRand()
tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
UseDatabase: "test",
Knobs: base.TestingKnobs{Store: &kvserver.StoreTestingKnobs{
TestingRequestFilter: func(ctx context.Context, request *kvpb.BatchRequest) *kvpb.Error {
for _, ru := range request.Requests {
if _, ok := ru.GetInner().(*kvpb.ExportRequest); ok {
h := admission.ElasticCPUWorkHandleFromContext(ctx)
if h == nil {
t.Fatalf("expected context to have CPU work handle")
}
h.TestingOverrideOverLimit(func() (bool, time.Duration) {
if rng.Float32() > 0.5 {
return true, 0
}
return false, 0
})
}
}
return nil
},
}},
},
})

defer tc.Stopper().Stop(context.Background())

s := tc.TenantOrServer(0)
sqlDB := tc.Conns[0]
db := sqlutils.MakeSQLRunner(sqlDB)
execCfg := s.ExecutorConfig().(sql.ExecutorConfig)
kvDB := s.DB()

const (
initRows = 1000
splits = 100
)
db.Exec(t, "CREATE DATABASE IF NOT EXISTS test")
db.Exec(t, "CREATE TABLE test (k PRIMARY KEY) AS SELECT generate_series(1, $1)", initRows)
db.Exec(t, "ALTER TABLE test SPLIT AT (select i*10 from generate_series(1, $1) as i)", initRows/splits)
db.Exec(t, "ALTER TABLE test SCATTER")

desc := desctestutils.TestingGetPublicTableDescriptor(kvDB, execCfg.Codec, "test", "test")
span := desc.TableSpan(execCfg.Codec)

req := &kvpb.ExportRequest{
RequestHeader: kvpb.RequestHeader{
Key: span.Key,
EndKey: span.EndKey},
}
header := kvpb.Header{
ReturnElasticCPUResumeSpans: true,
}
_, err := kv.SendWrappedWith(ctx, kvDB.NonTransactionalSender(), header, req)
require.NoError(t, err.GoError())
}

func TestExportGCThreshold(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/catalog/lease/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,8 @@ func getDescriptorsFromStoreForInterval(
// Create an export request (1 kv call) for all descriptors for given
// descriptor ID written during the interval [timestamp, endTimestamp).
batchRequestHeader := kvpb.Header{
Timestamp: upperBound.Prev(),
Timestamp: upperBound.Prev(),
ReturnElasticCPUResumeSpans: true,
}
descriptorKey := catalogkeys.MakeDescMetadataKey(codec, id)
// Unmarshal key span retrieved from export request to construct historical descs.
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/sem/builtins/builtins.go
Original file line number Diff line number Diff line change
Expand Up @@ -7563,6 +7563,10 @@ expires until the statement bundle is collected`,
// specially in the future so as to allow the fingerprint to complete
// in the face of intents.
WaitPolicy: lock.WaitPolicy_Error,
// TODO(ssd): Setting this disables async sending in
// DistSender so it likely substantially impacts
// performance.
ReturnElasticCPUResumeSpans: true,
}
admissionHeader := kvpb.AdmissionHeader{
Priority: int32(admissionpb.BulkNormalPri),
Expand Down

0 comments on commit 0dd1e91

Please sign in to comment.