kv: opt into Elastic CPU limiting resume spans #97916

Merged 1 commit on Mar 8, 2023
5 changes: 3 additions & 2 deletions pkg/ccl/backupccl/backup_processor.go
@@ -399,8 +399,9 @@ func runBackupProcessor(
// We set the DistSender response target bytes field to a sentinel
// value. The sentinel value of 1 forces the ExportRequest to paginate
// after creating a single SST.
TargetBytes: 1,
Timestamp: span.end,
TargetBytes: 1,
Timestamp: span.end,
ReturnElasticCPUResumeSpans: true,
}
if priority {
// This re-attempt is reading far enough in the past that we just want
5 changes: 4 additions & 1 deletion pkg/ccl/changefeedccl/schemafeed/schema_feed.go
@@ -579,7 +579,10 @@ func sendExportRequestWithPriorityOverride(
span roachpb.Span,
startTS, endTS hlc.Timestamp,
) (kvpb.Response, error) {
header := kvpb.Header{Timestamp: endTS}
header := kvpb.Header{
Timestamp: endTS,
ReturnElasticCPUResumeSpans: true,
}
req := &kvpb.ExportRequest{
RequestHeader: kvpb.RequestHeaderFromSpan(span),
StartTime: startTS,
5 changes: 3 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -1357,7 +1357,8 @@ func (ds *DistSender) divideAndSendBatchToRanges(
}()

canParallelize := ba.Header.MaxSpanRequestKeys == 0 && ba.Header.TargetBytes == 0 &&
!ba.Header.ReturnOnRangeBoundary
!ba.Header.ReturnOnRangeBoundary &&
!ba.Header.ReturnElasticCPUResumeSpans
if ba.IsSingleCheckConsistencyRequest() {
// Don't parallelize full checksum requests as they have to touch the
// entirety of each replica of each range they touch.
@@ -1447,7 +1448,7 @@ ba.UpdateTxn(resp.reply.Txn)
ba.UpdateTxn(resp.reply.Txn)
}

mightStopEarly := ba.MaxSpanRequestKeys > 0 || ba.TargetBytes > 0 || ba.ReturnOnRangeBoundary
mightStopEarly := ba.MaxSpanRequestKeys > 0 || ba.TargetBytes > 0 || ba.ReturnOnRangeBoundary || ba.ReturnElasticCPUResumeSpans
// Check whether we've received enough responses to exit query loop.
if mightStopEarly {
var replyKeys int64
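The two dist_sender.go hunks above capture the client-side contract: any batch that might stop early — now including batches that opt into elastic CPU resume spans — has to be sent to ranges serially and in key order, so that DistSender can hand back a single contiguous resume span. The toy program below is purely illustrative (none of these names are DistSender internals); it shows why a later range must not be contacted before an earlier one has finished.

```go
package main

import "fmt"

// rangeResult stands in for a per-range response: some keys plus an
// optional resume span if the range stopped early (for example because
// the elastic CPU limiter preempted the request).
type rangeResult struct {
	keys       []string
	resumeSpan string // non-empty when the range stopped early
}

func main() {
	// Pretend responses from three consecutive ranges; the second one is
	// preempted partway through its span.
	ranges := []rangeResult{
		{keys: []string{"a", "b"}},
		{keys: []string{"c"}, resumeSpan: "[d, f)"},
		{keys: []string{"f", "g"}},
	}

	var merged []string
	for _, r := range ranges {
		merged = append(merged, r.keys...)
		if r.resumeSpan != "" {
			// Stop here. Had the third range been contacted in parallel,
			// its results ("f", "g") would lie beyond the resume span the
			// caller is told to retry from.
			fmt.Printf("partial result %v, resume from %s\n", merged, r.resumeSpan)
			return
		}
	}
	fmt.Println("complete result:", merged)
}
```

This serialization is also why the builtins.go hunk further down carries a TODO about the performance cost: opting in forfeits DistSender's asynchronous, parallel fan-out.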
5 changes: 4 additions & 1 deletion pkg/kv/kvclient/revision_reader.go
@@ -38,7 +38,10 @@ func GetAllRevisions(
allRevs chan []VersionedValues,
) error {
for {
header := kvpb.Header{Timestamp: endTime}
header := kvpb.Header{
Timestamp: endTime,
ReturnElasticCPUResumeSpans: true,
}
req := &kvpb.ExportRequest{
RequestHeader: kvpb.RequestHeader{Key: startKey, EndKey: endKey},
StartTime: startTime,
9 changes: 9 additions & 0 deletions pkg/kv/kvpb/api.proto
@@ -2635,6 +2635,15 @@ message Header {
// each Scan and ReverseScan.
sql.sqlbase.IndexFetchSpec index_fetch_spec = 29;

// ReturnElasticCPUResumeSpans, if set, indicates that the caller
// expects early-termination of requests based on the Elastic CPU
// limiter.
//
// Resume spans returned because of the underlying request being
// rate-limited by the ElasticCPU limiter will have a reason of
// RESUME_ELASTIC_CPU_LIMIT.
bool return_elastic_cpu_resume_spans = 30 [(gogoproto.customname) = "ReturnElasticCPUResumeSpans"];

reserved 7, 10, 12, 14, 20;
}

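For context, here is a rough sketch of the pagination loop a caller is expected to drive once it sets this header field. It is stitched together from call patterns elsewhere in this diff (kv.SendWrappedWith, kvpb.RequestHeaderFromSpan, NonTransactionalSender); treat the exportAll helper, the ExportResponse type assertion, and the error handling as assumptions rather than code from this PR.

```go
// exportAll is a hypothetical caller-side helper, not part of this PR.
// Assumed imports from the cockroach tree: context, kv, kvpb, roachpb, hlc.
func exportAll(
	ctx context.Context, db *kv.DB, sp roachpb.Span, startTS, endTS hlc.Timestamp,
) ([]*kvpb.ExportResponse, error) {
	var resps []*kvpb.ExportResponse
	for {
		header := kvpb.Header{
			Timestamp:                   endTS,
			ReturnElasticCPUResumeSpans: true,
		}
		req := &kvpb.ExportRequest{
			RequestHeader: kvpb.RequestHeaderFromSpan(sp),
			StartTime:     startTS,
		}
		raw, pErr := kv.SendWrappedWith(ctx, db.NonTransactionalSender(), header, req)
		if pErr != nil {
			return nil, pErr.GoError()
		}
		resp := raw.(*kvpb.ExportResponse)
		resps = append(resps, resp)
		// A nil resume span means the whole span was exported.
		if resp.ResumeSpan == nil {
			return resps, nil
		}
		// A resume reason of RESUME_ELASTIC_CPU_LIMIT indicates the elastic
		// CPU limiter, rather than a byte or key limit, stopped the request;
		// either way, resume from the returned span.
		sp = *resp.ResumeSpan
	}
}
```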
3 changes: 3 additions & 0 deletions pkg/kv/kvserver/batcheval/BUILD.bazel
@@ -150,9 +150,11 @@ go_test(
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/bootstrap",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/desctestutils",
"//pkg/sql/rowenc",
"//pkg/sql/sem/tree",
"//pkg/storage",
@@ -165,6 +167,7 @@ go_test(
"//pkg/testutils/storageutils",
"//pkg/testutils/testcluster",
"//pkg/util",
"//pkg/util/admission",
"//pkg/util/encoding",
"//pkg/util/hlc",
"//pkg/util/leaktest",
26 changes: 14 additions & 12 deletions pkg/kv/kvserver/batcheval/cmd_export.go
@@ -238,7 +238,7 @@ func evalExport(
// chance to move the goroutine off CPU allowing other processes to make
// progress. The client is responsible for handling pagination of
// ExportRequests.
if resumeInfo.CPUOverlimit {
if resumeInfo.CPUOverlimit && h.ReturnElasticCPUResumeSpans {
// Note, since we have not exported any data we do not populate the
// `Files` field of the ExportResponse.
reply.ResumeSpan = &roachpb.Span{
@@ -248,14 +248,18 @@
reply.ResumeReason = kvpb.RESUME_ELASTIC_CPU_LIMIT
break
} else {
// We should never come here. There should be no condition aside from
// resource constraints that results in an early exit without
// exporting any data. Regardless, if we have a resumeKey we
// immediately retry the ExportRequest from that key and timestamp
// onwards.
if !build.IsRelease() {
return result.Result{}, errors.AssertionFailedf("ExportRequest exited without " +
"exporting any data for an unknown reason; programming error")
if !resumeInfo.CPUOverlimit {
// We should never come here. There should be no condition aside from
// resource constraints that results in an early exit without
// exporting any data. Regardless, if we have a resumeKey we
// immediately retry the ExportRequest from that key and timestamp
// onwards.
if !build.IsRelease() {
return result.Result{}, errors.AssertionFailedf("ExportRequest exited without " +
"exporting any data for an unknown reason; programming error")
} else {
log.Warningf(ctx, "unexpected resume span from ExportRequest without exporting any data for an unknown reason: %v", resumeInfo)
}
}
start = resumeInfo.ResumeKey.Key
resumeKeyTS = resumeInfo.ResumeKey.Timestamp
@@ -303,14 +307,12 @@ func evalExport(
// resuming our export from the resume key. This gives the scheduler a
// chance to take the current goroutine off CPU and allow other processes to
// progress.
if resumeInfo.CPUOverlimit {
if resumeInfo.CPUOverlimit && h.ReturnElasticCPUResumeSpans {
if resumeInfo.ResumeKey.Key != nil {
reply.ResumeSpan = &roachpb.Span{
Key: resumeInfo.ResumeKey.Key,
EndKey: args.EndKey,
}
// TODO(during review): Do we want to add another resume reason
// specifically for CPU preemption.
reply.ResumeReason = kvpb.RESUME_ELASTIC_CPU_LIMIT
}
break
69 changes: 69 additions & 0 deletions pkg/kv/kvserver/batcheval/cmd_export_test.go
@@ -19,22 +19,28 @@ import (
"sort"
"strconv"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
@@ -415,6 +421,69 @@ INTO
})
}

func TestExportRequestWithCPULimitResumeSpans(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
rng, _ := randutil.NewTestRand()
tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
UseDatabase: "test",
Knobs: base.TestingKnobs{Store: &kvserver.StoreTestingKnobs{
TestingRequestFilter: func(ctx context.Context, request *kvpb.BatchRequest) *kvpb.Error {
for _, ru := range request.Requests {
if _, ok := ru.GetInner().(*kvpb.ExportRequest); ok {
h := admission.ElasticCPUWorkHandleFromContext(ctx)
if h == nil {
t.Fatalf("expected context to have CPU work handle")
}
h.TestingOverrideOverLimit(func() (bool, time.Duration) {
if rng.Float32() > 0.5 {
return true, 0
}
return false, 0
})
}
}
return nil
},
}},
},
})

defer tc.Stopper().Stop(context.Background())

s := tc.TenantOrServer(0)
sqlDB := tc.Conns[0]
db := sqlutils.MakeSQLRunner(sqlDB)
execCfg := s.ExecutorConfig().(sql.ExecutorConfig)
kvDB := s.DB()

const (
initRows = 1000
splits = 100
)
db.Exec(t, "CREATE DATABASE IF NOT EXISTS test")
db.Exec(t, "CREATE TABLE test (k PRIMARY KEY) AS SELECT generate_series(1, $1)", initRows)
db.Exec(t, "ALTER TABLE test SPLIT AT (select i*10 from generate_series(1, $1) as i)", initRows/splits)
db.Exec(t, "ALTER TABLE test SCATTER")

desc := desctestutils.TestingGetPublicTableDescriptor(kvDB, execCfg.Codec, "test", "test")
span := desc.TableSpan(execCfg.Codec)

req := &kvpb.ExportRequest{
RequestHeader: kvpb.RequestHeader{
Key: span.Key,
EndKey: span.EndKey},
}
header := kvpb.Header{
ReturnElasticCPUResumeSpans: true,
}
_, err := kv.SendWrappedWith(ctx, kvDB.NonTransactionalSender(), header, req)
require.NoError(t, err.GoError())
}

func TestExportGCThreshold(t *testing.T) {
defer leaktest.AfterTest(t)()

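The new TestExportRequestWithCPULimitResumeSpans above only asserts that a randomly preempted ExportRequest still returns without error. A conceivable follow-up (not part of this PR; the response accessors are assumed from the cmd_export.go hunks) would chase the resume spans to completion and check the advertised reason:

```go
// Hypothetical extension of the test body above: keep following resume
// spans until the whole table span has been exported.
for {
	raw, pErr := kv.SendWrappedWith(ctx, kvDB.NonTransactionalSender(), header, req)
	require.NoError(t, pErr.GoError())
	resp := raw.(*kvpb.ExportResponse)
	if resp.ResumeSpan == nil {
		break
	}
	// With no TargetBytes or MaxSpanRequestKeys set, the only early-stop
	// this test can trigger is the (overridden) elastic CPU limiter.
	require.Equal(t, kvpb.RESUME_ELASTIC_CPU_LIMIT, resp.ResumeReason)
	req.RequestHeader = kvpb.RequestHeaderFromSpan(*resp.ResumeSpan)
}
```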
3 changes: 2 additions & 1 deletion pkg/sql/catalog/lease/lease.go
@@ -252,7 +252,8 @@ func getDescriptorsFromStoreForInterval(
// Create an export request (1 kv call) for all descriptors for given
// descriptor ID written during the interval [timestamp, endTimestamp).
batchRequestHeader := kvpb.Header{
Timestamp: upperBound.Prev(),
Timestamp: upperBound.Prev(),
ReturnElasticCPUResumeSpans: true,
}
descriptorKey := catalogkeys.MakeDescMetadataKey(codec, id)
// Unmarshal key span retrieved from export request to construct historical descs.
4 changes: 4 additions & 0 deletions pkg/sql/sem/builtins/builtins.go
@@ -7563,6 +7563,10 @@ expires until the statement bundle is collected`,
// specially in the future so as to allow the fingerprint to complete
// in the face of intents.
WaitPolicy: lock.WaitPolicy_Error,
// TODO(ssd): Setting this disables async sending in
// DistSender so it likely substantially impacts
// performance.
ReturnElasticCPUResumeSpans: true,
}
admissionHeader := kvpb.AdmissionHeader{
Priority: int32(admissionpb.BulkNormalPri),