Commit
Merge pull request #109257 from irfansharif/backport23.1-108815
release-23.1: kv,sql: integrate row-level TTL reads with CPU limiter
irfansharif authored Aug 22, 2023
2 parents 6b03a1a + ab6a1a2 commit d7c6fdb
Showing 6 changed files with 236 additions and 33 deletions.
7 changes: 6 additions & 1 deletion pkg/kv/db.go
@@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/admission"
@@ -263,8 +264,12 @@ type DB struct {
// SQLKVResponseAdmissionQ is for use by SQL clients of the DB, and is
// placed here simply for plumbing convenience, as there is a diversity of
// SQL code that all uses kv.DB.
// TODO(sumeer): find a home for this in the SQL layer.
//
// TODO(sumeer,irfansharif): Find a home for these in the SQL layer.
// Especially SettingsValues.
SQLKVResponseAdmissionQ *admission.WorkQueue
AdmissionPacerFactory admission.PacerFactory
SettingsValues *settings.Values
}

// NonTransactionalSender returns a Sender that can be used for sending
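
Aside: a minimal sketch of the plumbing pattern this hunk follows (toy stand-in types, not the real kv.DB): the new admission-related fields are optional, the server wires them up at startup, and consumers are expected to tolerate nil values (tests and SQL pods leave them unset).

package main

import "fmt"

// Stand-ins; the real field types are *admission.WorkQueue,
// admission.PacerFactory, and *settings.Values.
type workQueue struct{}
type pacerFactory struct{}
type settingsValues struct{}

// DB mirrors the shape of the fields added to kv.DB above.
type DB struct {
	SQLKVResponseAdmissionQ *workQueue
	AdmissionPacerFactory   *pacerFactory
	SettingsValues          *settingsValues
}

func main() {
	db := &DB{} // e.g. a test DB with no admission wiring
	if db.AdmissionPacerFactory == nil || db.SettingsValues == nil {
		fmt.Println("admission pacing disabled: fields not plumbed")
	}
}
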
88 changes: 74 additions & 14 deletions pkg/kv/kvserver/kvadmission/kvadmission.go
@@ -54,6 +54,44 @@ var elasticCPUDurationPerExportRequest = settings.RegisterDurationSetting(
},
)

// elasticCPUDurationPerInternalLowPriRead controls how many CPU tokens are
// allotted for each internally submitted low priority read request.
var elasticCPUDurationPerInternalLowPriRead = settings.RegisterDurationSetting(
settings.SystemOnly,
"kvadmission.elastic_cpu.duration_per_low_pri_read",
"controls how many CPU tokens are allotted for each internally submitted low priority read request",
10*time.Millisecond,
func(duration time.Duration) error {
if duration < admission.MinElasticCPUDuration {
return fmt.Errorf("minimum CPU duration allowed is %s, got %s",
admission.MinElasticCPUDuration, duration)
}
if duration > admission.MaxElasticCPUDuration {
return fmt.Errorf("maximum CPU duration allowed is %s, got %s",
admission.MaxElasticCPUDuration, duration)
}
return nil
},
)

// internalLowPriReadElasticControlEnabled determines whether internally
// submitted low pri reads integrate with elastic CPU control.
var internalLowPriReadElasticControlEnabled = settings.RegisterBoolSetting(
settings.SystemOnly,
"kvadmission.low_pri_read_elastic_control.enabled",
"determines whether the internally submitted low priority reads reads integrate with elastic CPU control",
false,
)

// exportRequestElasticControlEnabled determines whether export requests
// integrate with elastic CPU control.
var exportRequestElasticControlEnabled = settings.RegisterBoolSetting(
settings.SystemOnly,
"kvadmission.export_request_elastic_control.enabled",
"determines whether the export requests integrate with elastic CPU control",
true,
)

// elasticCPUDurationPerRangefeedScanUnit controls how many CPU tokens are
// allotted for each unit of work during rangefeed catchup scans. Only takes
// effect if kvadmission.rangefeed_catchup_scan_elastic_control.enabled is set.
@@ -258,21 +296,42 @@ func (n *controllerImpl) AdmitKVWork(
}
}
if admissionEnabled {
if ba.IsSingleExportRequest() {
// Backups generate batches with single export requests, which we
// admit through the elastic CPU work queue. We grant this
// CPU-intensive work a set amount of CPU time and expect it to
// terminate (cooperatively) once it exceeds its grant. The amount
// disbursed is 100ms, which we've experimentally found to be long
// enough to do enough useful work per-request while not causing too
// much in the way of scheduling delays on individual cores. Within
// admission control we have machinery that observes scheduling
// latencies periodically and reduces the total amount of CPU time
// handed out through this mechanism, as a way to provide latency
// isolation to non-elastic ("latency sensitive") work running on
// the same machine.
// - Backups generate batches with single export requests, which we
// admit through the elastic CPU work queue. We grant this
// CPU-intensive work a set amount of CPU time and expect it to
// terminate (cooperatively) once it exceeds its grant. The amount
// disbursed is 100ms, which we've experimentally found to be long
// enough to do enough useful work per-request while not causing too
// much in the way of scheduling delays on individual cores. Within
// admission control we have machinery that observes scheduling
// latencies periodically and reduces the total amount of CPU time
// handed out through this mechanism, as a way to provide latency
// isolation to non-elastic ("latency sensitive") work running on the
// same machine.
// - We do the same for internally submitted low priority reads in
// general (notably, for KV work done on behalf of row-level TTL
// reads). Everything admissionpb.UserLowPri and above uses the slots
// mechanism.
isInternalLowPriRead := ba.IsReadOnly() && admissionInfo.Priority < admissionpb.UserLowPri
shouldUseElasticCPU :=
(exportRequestElasticControlEnabled.Get(&n.settings.SV) && ba.IsSingleExportRequest()) ||
(internalLowPriReadElasticControlEnabled.Get(&n.settings.SV) && isInternalLowPriRead)

if shouldUseElasticCPU {
var admitDuration time.Duration
if ba.IsSingleExportRequest() {
admitDuration = elasticCPUDurationPerExportRequest.Get(&n.settings.SV)
} else if isInternalLowPriRead {
admitDuration = elasticCPUDurationPerInternalLowPriRead.Get(&n.settings.SV)
}

// TODO(irfansharif): For export requests it's possible to preempt,
// i.e. once the CPU slice is used up we terminate the work. We
// don't do this for the general case of low priority internal
// reads, so in some sense, the integration is incomplete. This is
// probably harmless.
elasticWorkHandle, err := n.elasticCPUGrantCoordinator.ElasticCPUWorkQueue.Admit(
ctx, elasticCPUDurationPerExportRequest.Get(&n.settings.SV), admissionInfo,
ctx, admitDuration, admissionInfo,
)
if err != nil {
return Handle{}, err
@@ -285,6 +344,7 @@ func (n *controllerImpl) AdmitKVWork(
}
}()
} else {
// Use the slots-based mechanism for everything else.
callAdmittedWorkDoneOnKVAdmissionQ, err := n.kvAdmissionQ.Admit(ctx, admissionInfo)
if err != nil {
return Handle{}, err
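
Aside: a self-contained sketch (illustrative only, not the kvadmission package; the priority constants are assumed stand-ins) of the routing decision added to AdmitKVWork above: read-only work below UserLowPri goes to the elastic CPU queue when its setting is enabled, single export requests likewise, and everything else stays on the slots-based queue.

package main

import "fmt"

type workPriority int8

// Assumed stand-in values; the real constants live in admissionpb.
const (
	ttlLowPri  workPriority = -100
	userLowPri workPriority = -50
	normalPri  workPriority = 0
)

// admissionPath mirrors the shouldUseElasticCPU decision in the diff.
func admissionPath(
	isReadOnly, isSingleExport bool,
	exportElasticEnabled, lowPriReadElasticEnabled bool,
	pri workPriority,
) string {
	isInternalLowPriRead := isReadOnly && pri < userLowPri
	useElasticCPU := (exportElasticEnabled && isSingleExport) ||
		(lowPriReadElasticEnabled && isInternalLowPriRead)
	if useElasticCPU {
		return "elastic CPU work queue"
	}
	return "slots-based KV work queue"
}

func main() {
	// A row-level TTL read with kvadmission.low_pri_read_elastic_control.enabled set.
	fmt.Println(admissionPath(true, false, true, true, ttlLowPri)) // elastic CPU work queue
	// The same read with the setting off falls back to slots.
	fmt.Println(admissionPath(true, false, true, false, ttlLowPri)) // slots-based KV work queue
	// Ordinary user-priority reads are unaffected.
	fmt.Println(admissionPath(true, false, true, true, normalPri)) // slots-based KV work queue
}
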
2 changes: 2 additions & 0 deletions pkg/server/server.go
@@ -457,6 +457,8 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
dbCtx.Stopper = stopper
db := kv.NewDBWithContext(cfg.AmbientCtx, tcsFactory, clock, dbCtx)
db.SQLKVResponseAdmissionQ = gcoords.Regular.GetWorkQueue(admission.SQLKVResponseWork)
db.AdmissionPacerFactory = gcoords.Elastic
db.SettingsValues = &cfg.Settings.SV

nlActive, nlRenewal := cfg.NodeLivenessDurations()
if knobs := cfg.TestingKnobs.NodeLiveness; knobs != nil {
6 changes: 4 additions & 2 deletions pkg/sql/row/fetcher.go
@@ -436,8 +436,10 @@ func (rf *Fetcher) Init(ctx context.Context, args FetcherInitArgs) error {
}
if args.Txn != nil {
fetcherArgs.sendFn = makeTxnKVFetcherDefaultSendFunc(args.Txn, &batchRequestsIssued)
fetcherArgs.requestAdmissionHeader = args.Txn.AdmissionHeader()
fetcherArgs.responseAdmissionQ = args.Txn.DB().SQLKVResponseAdmissionQ
fetcherArgs.admission.requestHeader = args.Txn.AdmissionHeader()
fetcherArgs.admission.responseQ = args.Txn.DB().SQLKVResponseAdmissionQ
fetcherArgs.admission.pacerFactory = args.Txn.DB().AdmissionPacerFactory
fetcherArgs.admission.settingsValues = args.Txn.DB().SettingsValues
}
rf.kvFetcher = newKVFetcher(newTxnKVFetcherInternal(fetcherArgs))
}
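
Aside: the args change above moves the admission-control plumbing under a single nested field. A tiny sketch of that Go pattern with hypothetical field names (not the real newTxnKVFetcherArgs):

package main

import "fmt"

type fetcherArgs struct {
	// admission groups the admission-control-related knobs so call sites
	// read as args.admission.<field>, mirroring the diff.
	admission struct {
		priority   int64
		createTime int64
	}
}

func main() {
	var args fetcherArgs
	args.admission.priority = -100 // e.g. a low-priority TTL read
	args.admission.createTime = 12345
	fmt.Printf("%+v\n", args.admission)
}
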
160 changes: 146 additions & 14 deletions pkg/sql/row/kv_batch_fetcher.go
@@ -12,6 +12,7 @@ package row

import (
"context"
"fmt"
"sync/atomic"
"time"
"unsafe"
@@ -21,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/fetchpb"
@@ -57,6 +59,42 @@ var defaultKVBatchSize = rowinfra.KeyLimit(util.ConstantWithMetamorphicTestValue(
1, /* metamorphicValue */
))

var logAdmissionPacerErr = log.Every(100 * time.Millisecond)

// elasticCPUDurationPerLowPriReadResponse controls how many CPU tokens are allotted
// each time we seek admission for response handling during internally submitted
// low priority reads (like row-level TTL selects).
var elasticCPUDurationPerLowPriReadResponse = settings.RegisterDurationSetting(
settings.SystemOnly,
"sqladmission.elastic_cpu.duration_per_low_pri_read_response",
"controls how many CPU tokens are allotted for handling responses for internally submitted low priority reads",
// NB: Experimentally, during TTL reads, we observed cumulative on-CPU time
// by SQL processors >> 100ms, over the course of a single select fetching
// many rows. So we pick a relatively high duration here.
100*time.Millisecond,
func(duration time.Duration) error {
if duration < admission.MinElasticCPUDuration {
return fmt.Errorf("minimum CPU duration allowed is %s, got %s",
admission.MinElasticCPUDuration, duration)
}
if duration > admission.MaxElasticCPUDuration {
return fmt.Errorf("maximum CPU duration allowed is %s, got %s",
admission.MaxElasticCPUDuration, duration)
}
return nil
},
)

// internalLowPriReadElasticControlEnabled determines whether the sql portion of
// internally submitted low-priority reads (like row-level TTL selects)
// integrates with elastic CPU control.
var internalLowPriReadElasticControlEnabled = settings.RegisterBoolSetting(
settings.SystemOnly,
"sqladmission.low_pri_read_response_elastic_control.enabled",
"determines whether the sql portion of internally submitted reads integrate with elastic CPU controller",
false,
)

// sendFunc is the function used to execute a KV batch; normally
// wraps (*client.Txn).Send.
type sendFunc func(
@@ -185,6 +223,7 @@ type txnKVFetcher struct {
// For request and response admission control.
requestAdmissionHeader kvpb.AdmissionHeader
responseAdmissionQ *admission.WorkQueue
admissionPacer *admission.Pacer
}

var _ KVBatchFetcher = &txnKVFetcher{}
@@ -262,8 +301,13 @@ type newTxnKVFetcherArgs struct {
acc *mon.BoundAccount
forceProductionKVBatchSize bool
batchRequestsIssued *int64
requestAdmissionHeader kvpb.AdmissionHeader
responseAdmissionQ *admission.WorkQueue

admission struct { // groups AC-related fields
requestHeader kvpb.AdmissionHeader
responseQ *admission.WorkQueue
pacerFactory admission.PacerFactory
settingsValues *settings.Values
}
}

// newTxnKVFetcherInternal initializes a txnKVFetcher.
@@ -282,9 +326,14 @@ func newTxnKVFetcherInternal(args newTxnKVFetcherArgs) *txnKVFetcher {
lockTimeout: args.lockTimeout,
acc: args.acc,
forceProductionKVBatchSize: args.forceProductionKVBatchSize,
requestAdmissionHeader: args.requestAdmissionHeader,
responseAdmissionQ: args.responseAdmissionQ,
}
requestAdmissionHeader: args.admission.requestHeader,
responseAdmissionQ: args.admission.responseQ,
}
f.maybeInitAdmissionPacer(
args.admission.requestHeader,
args.admission.pacerFactory,
args.admission.settingsValues,
)
f.kvBatchFetcherHelper.init(f.nextBatch, args.batchRequestsIssued)
return f
}
@@ -295,6 +344,42 @@ func (f *txnKVFetcher) setTxnAndSendFn(txn *kv.Txn, sendFn sendFunc) {
f.sendFn = sendFn
f.requestAdmissionHeader = txn.AdmissionHeader()
f.responseAdmissionQ = txn.DB().SQLKVResponseAdmissionQ

if f.admissionPacer != nil {
f.admissionPacer.Close()
}
f.maybeInitAdmissionPacer(txn.AdmissionHeader(), txn.DB().AdmissionPacerFactory, txn.DB().SettingsValues)
}

// maybeInitAdmissionPacer selectively initializes an admission.Pacer for work
// done as part of internally submitted low-priority reads (like row-level TTL
// selects).
func (f *txnKVFetcher) maybeInitAdmissionPacer(
admissionHeader kvpb.AdmissionHeader, pacerFactory admission.PacerFactory, sv *settings.Values,
) {
if sv == nil {
// Only nil in tests and in SQL pods (we don't have admission pacing in
// the latter anyway).
return
}
admissionPri := admissionpb.WorkPriority(admissionHeader.Priority)
if internalLowPriReadElasticControlEnabled.Get(sv) &&
admissionPri < admissionpb.UserLowPri &&
pacerFactory != nil {

f.admissionPacer = pacerFactory.NewPacer(
elasticCPUDurationPerLowPriReadResponse.Get(sv),
admission.WorkInfo{
// NB: This is either code that runs in physically isolated SQL
// pods for secondary tenants, or for the system tenant, in
// nodes running colocated SQL+KV code where all SQL code is run
// on behalf of the one tenant. So from an AC perspective, the
// tenant ID we pass through here is irrelevant.
TenantID: roachpb.SystemTenantID,
Priority: admissionPri,
CreateTime: admissionHeader.CreateTime,
})
}
}

// SetupNextFetch sets up the Fetcher for the next set of spans.
@@ -533,16 +618,10 @@ func (f *txnKVFetcher) fetch(ctx context.Context) error {
}
f.batchResponseAccountedFor = returnedBytes
}

// Do admission control after we've accounted for the response bytes.
if br != nil && f.responseAdmissionQ != nil {
responseAdmission := admission.WorkInfo{
TenantID: roachpb.SystemTenantID,
Priority: admissionpb.WorkPriority(f.requestAdmissionHeader.Priority),
CreateTime: f.requestAdmissionHeader.CreateTime,
}
if _, err := f.responseAdmissionQ.Admit(ctx, responseAdmission); err != nil {
return err
}
if err := f.maybeAdmitBatchResponse(ctx, br); err != nil {
return err
}

f.batchIdx++
@@ -570,6 +649,58 @@ func (f *txnKVFetcher) fetch(ctx context.Context) error {
return nil
}

func (f *txnKVFetcher) maybeAdmitBatchResponse(ctx context.Context, br *kvpb.BatchResponse) error {
if br == nil {
return nil // nothing to do
}

if f.admissionPacer != nil {
// If admissionPacer is initialized, we're using the elastic CPU control
// mechanism (the work is elastic in nature and using the slots based
// mechanism would permit high scheduling latencies). We want to limit
// the CPU% used by SQL during internally submitted reads, like
// row-level TTL selects. All that work happens on the same goroutine
// doing this fetch, so is accounted for when invoking .Pace() as we
// fetch KVs as part of our volcano operator iteration. See CPU profiles
// posted on #98722.
//
// TODO(irfansharif): At the time of writing, SELECTs done by the TTL
// job are not distributed at SQL level (since our DistSQL physical
// planning heuristics deems it not worthy of distribution), and with
// the local plan we only have a single goroutine (unless
// maybeParallelizeLocalScans splits up the single scan into multiple
// TableReader processors). This may change as part of
// https://github.com/cockroachdb/cockroach/issues/82164 where CPU
// intensive SQL work will happen on a different goroutine from the ones
// that evaluate the BatchRequests, so the integration is trickier there.
// If we're unable to integrate it well, we could disable usage of the
// streamer to preserve this current form of pacing.
//
// TODO(irfansharif): Add tests for the SELECT queries issued by the TTL job
// to ensure that they have local plans with a single TableReader
// processor in multi-node clusters.
if err := f.admissionPacer.Pace(ctx); err != nil {
// We're unable to pace things automatically -- shout loudly
// semi-infrequently but don't fail the kv fetcher itself. At
// worst we'd be over-admitting.
if logAdmissionPacerErr.ShouldLog() {
log.Errorf(ctx, "automatic pacing: %v", err)
}
}
} else if f.responseAdmissionQ != nil {
responseAdmission := admission.WorkInfo{
TenantID: roachpb.SystemTenantID,
Priority: admissionpb.WorkPriority(f.requestAdmissionHeader.Priority),
CreateTime: f.requestAdmissionHeader.CreateTime,
}
if _, err := f.responseAdmissionQ.Admit(ctx, responseAdmission); err != nil {
return err
}
}

return nil
}

// popBatch returns the 0th "batch" in a slice of "batches", as well as the rest
// of the slice of the "batches". It nils the pointer to the 0th element before
// reslicing the outer slice.
@@ -746,6 +877,7 @@ func (f *txnKVFetcher) reset(ctx context.Context) {
// Close releases the resources of this txnKVFetcher.
func (f *txnKVFetcher) Close(ctx context.Context) {
f.reset(ctx)
f.admissionPacer.Close()
}

const requestUnionOverhead = int64(unsafe.Sizeof(kvpb.RequestUnion{}))
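
Aside: an illustrative sketch of the pacer lifecycle the fetcher now follows (toy types, not the real admission package): construct the pacer once if the read qualifies as low-priority internal work, call Pace once per batch response while fetching, and Close it when the fetcher is closed; a nil pacer falls back to the response-admission queue path.

package main

import (
	"context"
	"fmt"
	"log"
	"time"
)

// pacer is a toy stand-in for admission.Pacer; the real Pace blocks until an
// elastic CPU grant is available.
type pacer struct{ unit time.Duration }

func (p *pacer) Pace(ctx context.Context) error {
	if p == nil {
		return nil // no pacing configured
	}
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
		return nil
	}
}

func (p *pacer) Close() {
	// The real implementation releases any outstanding grant; no-op here.
}

type fetcher struct{ admissionPacer *pacer }

func (f *fetcher) handleBatchResponse(ctx context.Context) {
	if err := f.admissionPacer.Pace(ctx); err != nil {
		// Mirror the diff: log instead of failing the fetch; at worst we over-admit.
		log.Printf("automatic pacing: %v", err)
	}
	fmt.Println("processed one batch response")
}

func main() {
	ctx := context.Background()
	// 100ms mirrors the default of sqladmission.elastic_cpu.duration_per_low_pri_read_response.
	f := &fetcher{admissionPacer: &pacer{unit: 100 * time.Millisecond}}
	defer f.admissionPacer.Close()
	for i := 0; i < 3; i++ { // one Pace call per response handled
		f.handleBatchResponse(ctx)
	}
}
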
6 changes: 4 additions & 2 deletions pkg/sql/row/kv_fetcher.go
@@ -102,8 +102,10 @@ func newTxnKVFetcher(
// In most cases, the txn is non-nil; however, in some code paths (e.g.
// when executing EXPLAIN (VEC)) it might be nil, so we need to have
// this check.
fetcherArgs.requestAdmissionHeader = txn.AdmissionHeader()
fetcherArgs.responseAdmissionQ = txn.DB().SQLKVResponseAdmissionQ
fetcherArgs.admission.requestHeader = txn.AdmissionHeader()
fetcherArgs.admission.responseQ = txn.DB().SQLKVResponseAdmissionQ
fetcherArgs.admission.pacerFactory = txn.DB().AdmissionPacerFactory
fetcherArgs.admission.settingsValues = txn.DB().SettingsValues
}
return newTxnKVFetcherInternal(fetcherArgs)
}
