admission: make Pacer type available in SQL server config
Currently, the Pacer type is only used within KV, but it will be used by SQL
in future changes. For example, the code for encoding/decoding CDC events
resides in distSQL and is CPU-intensive, so there is a plan to integrate
admission control into it (cockroachdb#90089).
This change makes the Pacer type available to the SQL layer via the
`execinfra.ServerConfig`.

Because the Pacer was previously only used by KV, it lived in the `kvadmission`
package. Since this change makes it available outside of KV, it is moved to
the `admission` package.

Furthermore, this change adds a new method,
`ElasticCPUGrantCoordinator.NewPacer`, to instantiate new Pacer structs.
Since the `ElasticCPUGrantCoordinator` implements several features not relevant
to SQL, this change passes the coordinator to the SQL server config as
the interface `PacerFactory`, which makes only the `NewPacer` method accessible.

Currently, tenant servers do not create grant coordinators for admission
control. This change retains that behavior, except that it passes a nil
`ElasticCPUGrantCoordinator`, which produces nil/no-op Pacers. Adding these
coordinators to tenant servers is a change outside the scope of this commit and
is left as a `TODO`.

Release note: None
jayshrivastava committed Nov 30, 2022
1 parent 048fc35 commit 189c91c
Showing 13 changed files with 124 additions and 75 deletions.
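
To make the new plumbing concrete, here is a minimal, hypothetical sketch of how a CPU-bound SQL component (for example, a CDC event encoder) might consume the `AdmissionPacerFactory` this change adds to `execinfra.ServerConfig`. The package, function, per-admission duration, and loop body are illustrative assumptions, not part of this commit; only `PacerFactory`, `NewPacer`, `Pace`, and `Close` come from the change, and the `Pace`/`Close` signatures are assumed to carry over unchanged from the relocated Pacer shown in the kvadmission diff below.

```go
package cdcexample // hypothetical package, for illustration only

import (
	"context"
	"time"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
	"github.com/cockroachdb/cockroach/pkg/util/admission"
	"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

// paceCPUBoundLoop paces a CPU-intensive loop through elastic CPU admission
// control, assuming cfg.AdmissionPacerFactory has been populated as in this
// change (a grant coordinator on system-tenant servers, a nil coordinator on
// secondary tenants).
func paceCPUBoundLoop(ctx context.Context, cfg *execinfra.ServerConfig, iters int) error {
	pacer := cfg.AdmissionPacerFactory.NewPacer(
		50*time.Millisecond, // illustrative per-admission CPU allotment
		admission.WorkInfo{
			TenantID:   roachpb.SystemTenantID,
			Priority:   admissionpb.NormalPri,
			CreateTime: timeutil.Now().UnixNano(),
		},
	)
	defer pacer.Close()

	for i := 0; i < iters; i++ {
		// Blocks while elastic CPU tokens are unavailable; on tenant servers
		// the pacer is nil and this call is a no-op.
		if err := pacer.Pace(ctx); err != nil {
			return err
		}
		// ... one iteration of CPU-intensive work, e.g. encoding a CDC event ...
	}
	return nil
}
```

On system-tenant servers the factory is backed by `gcoords.Elastic` (wired up in `pkg/server/server.go` below), so `Pace` actually acquires, and may block on, elastic CPU tokens.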
88 changes: 21 additions & 67 deletions pkg/kv/kvserver/kvadmission/kvadmission.go
@@ -96,7 +96,7 @@ type Controller interface {
// If enabled, it returns a non-nil Pacer that's to be used within rangefeed
// catchup scans (typically CPU-intensive and affecting scheduling
// latencies).
AdmitRangefeedRequest(roachpb.TenantID, *roachpb.RangeFeedRequest) *Pacer
AdmitRangefeedRequest(roachpb.TenantID, *roachpb.RangeFeedRequest) *admission.Pacer
// SetTenantWeightProvider is used to set the provider that will be
// periodically polled for weights. The stopper should be used to terminate
// the periodic polling.
@@ -135,11 +135,11 @@ type TenantWeightsForStore struct {
type controllerImpl struct {
// Admission control queues and coordinators. All three should be nil or
// non-nil.
kvAdmissionQ *admission.WorkQueue
storeGrantCoords *admission.StoreGrantCoordinators
elasticCPUWorkQueue *admission.ElasticCPUWorkQueue
settings *cluster.Settings
every log.EveryN
kvAdmissionQ *admission.WorkQueue
storeGrantCoords *admission.StoreGrantCoordinators
elasticCPUGrantCoordinator *admission.ElasticCPUGrantCoordinator
settings *cluster.Settings
every log.EveryN
}

var _ Controller = &controllerImpl{}
@@ -162,16 +162,16 @@ type Handle struct {
// nil or non-nil.
func MakeController(
kvAdmissionQ *admission.WorkQueue,
elasticCPUWorkQueue *admission.ElasticCPUWorkQueue,
elasticCPUGrantCoordinator *admission.ElasticCPUGrantCoordinator,
storeGrantCoords *admission.StoreGrantCoordinators,
settings *cluster.Settings,
) Controller {
return &controllerImpl{
kvAdmissionQ: kvAdmissionQ,
storeGrantCoords: storeGrantCoords,
elasticCPUWorkQueue: elasticCPUWorkQueue,
settings: settings,
every: log.Every(10 * time.Second),
kvAdmissionQ: kvAdmissionQ,
storeGrantCoords: storeGrantCoords,
elasticCPUGrantCoordinator: elasticCPUGrantCoordinator,
settings: settings,
every: log.Every(10 * time.Second),
}
}

@@ -257,7 +257,7 @@ func (n *controllerImpl) AdmitKVWork(
// handed out through this mechanism, as a way to provide latency
// isolation to non-elastic ("latency sensitive") work running on
// the same machine.
elasticWorkHandle, err := n.elasticCPUWorkQueue.Admit(
elasticWorkHandle, err := n.elasticCPUGrantCoordinator.ElasticCPUWorkQueue.Admit(
ctx, elasticCPUDurationPerExportRequest.Get(&n.settings.SV), admissionInfo,
)
if err != nil {
@@ -267,7 +267,7 @@ func (n *controllerImpl) AdmitKVWork(
defer func() {
if retErr != nil {
// No elastic work was done.
n.elasticCPUWorkQueue.AdmittedWorkDone(ah.ElasticCPUWorkHandle)
n.elasticCPUGrantCoordinator.ElasticCPUWorkQueue.AdmittedWorkDone(ah.ElasticCPUWorkHandle)
}
}()
} else {
@@ -283,7 +283,7 @@ func (n *controllerImpl) AdmitKVWork(

// AdmittedKVWorkDone implements the Controller interface.
func (n *controllerImpl) AdmittedKVWorkDone(ah Handle, writeBytes *StoreWriteBytes) {
n.elasticCPUWorkQueue.AdmittedWorkDone(ah.ElasticCPUWorkHandle)
n.elasticCPUGrantCoordinator.ElasticCPUWorkQueue.AdmittedWorkDone(ah.ElasticCPUWorkHandle)
if ah.callAdmittedWorkDoneOnKVAdmissionQ {
n.kvAdmissionQ.AdmittedWorkDone(ah.tenantID)
}
@@ -308,21 +308,19 @@ func (n *controllerImpl) AdmittedKVWorkDone(ah Handle, writeBytes *StoreWriteByt
// AdmitRangefeedRequest implements the Controller interface.
func (n *controllerImpl) AdmitRangefeedRequest(
tenantID roachpb.TenantID, request *roachpb.RangeFeedRequest,
) *Pacer {
) *admission.Pacer {
if !rangefeedCatchupScanElasticControlEnabled.Get(&n.settings.SV) {
return nil
}

return &Pacer{
unit: elasticCPUDurationPerRangefeedScanUnit.Get(&n.settings.SV),
wi: admission.WorkInfo{
return n.elasticCPUGrantCoordinator.NewPacer(
elasticCPUDurationPerRangefeedScanUnit.Get(&n.settings.SV),
admission.WorkInfo{
TenantID: tenantID,
Priority: admissionpb.WorkPriority(request.AdmissionHeader.Priority),
CreateTime: request.AdmissionHeader.CreateTime,
BypassAdmission: false,
},
wq: n.elasticCPUWorkQueue,
}
})
}

// SetTenantWeightProvider implements the Controller interface.
@@ -350,7 +348,7 @@ func (n *controllerImpl) SetTenantWeightProvider(
weights.Node = nil
}
n.kvAdmissionQ.SetTenantWeights(weights.Node)
n.elasticCPUWorkQueue.SetTenantWeights(weights.Node)
n.elasticCPUGrantCoordinator.ElasticCPUWorkQueue.SetTenantWeights(weights.Node)

for _, storeWeights := range weights.Stores {
q := n.storeGrantCoords.TryGetQueueForStore(int32(storeWeights.StoreID))
@@ -441,47 +439,3 @@ func (wb *StoreWriteBytes) Release() {
}
storeWriteBytesPool.Put(wb)
}

// Pacer is used in tight loops (CPU-bound) for non-preemptible elastic work.
// Callers are expected to invoke Pace() every loop iteration and Close() once
// done. Internally this type integrates with elastic CPU work queue, acquiring
// tokens for the CPU work being done, and blocking if tokens are unavailable.
// This allows for a form of cooperative scheduling with elastic CPU granters.
type Pacer struct {
unit time.Duration
wi admission.WorkInfo
wq *admission.ElasticCPUWorkQueue

cur *admission.ElasticCPUWorkHandle
}

// Pace is part of the Pacer interface.
func (p *Pacer) Pace(ctx context.Context) error {
if p == nil {
return nil
}

if overLimit, _ := p.cur.OverLimit(); overLimit {
p.wq.AdmittedWorkDone(p.cur)
p.cur = nil
}

if p.cur == nil {
handle, err := p.wq.Admit(ctx, p.unit, p.wi)
if err != nil {
return err
}
p.cur = handle
}
return nil
}

// Close is part of the Pacer interface.
func (p *Pacer) Close() {
if p == nil || p.cur == nil {
return
}

p.wq.AdmittedWorkDone(p.cur)
p.cur = nil
}
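
The Pacer removed above moves to a new `pkg/util/admission/pacer.go` (added to the BUILD file further down), which this diff view does not render. Since the commit message describes a straight move into the `admission` package, the relocated type presumably looks roughly like the following reconstruction; the only expected differences from the deleted code are the package clause and the dropped `admission.` qualifiers, so treat it as a sketch rather than the literal new file.

```go
// Reconstruction of the relocated Pacer, for illustration; not the literal
// contents of the new pacer.go.
package admission

import (
	"context"
	"time"
)

// Pacer is used in tight loops (CPU-bound) for non-preemptible elastic work.
// Callers are expected to invoke Pace() every loop iteration and Close() once
// done. Internally this type integrates with the elastic CPU work queue,
// acquiring tokens for the CPU work being done, and blocking if tokens are
// unavailable. This allows for a form of cooperative scheduling with elastic
// CPU granters.
type Pacer struct {
	unit time.Duration
	wi   WorkInfo
	wq   *ElasticCPUWorkQueue

	cur *ElasticCPUWorkHandle
}

// Pace acquires (or re-acquires) elastic CPU tokens for the next unit of work,
// blocking if none are available. It is a no-op on a nil Pacer.
func (p *Pacer) Pace(ctx context.Context) error {
	if p == nil {
		return nil
	}
	if overLimit, _ := p.cur.OverLimit(); overLimit {
		p.wq.AdmittedWorkDone(p.cur)
		p.cur = nil
	}
	if p.cur == nil {
		handle, err := p.wq.Admit(ctx, p.unit, p.wi)
		if err != nil {
			return err
		}
		p.cur = handle
	}
	return nil
}

// Close returns any outstanding handle to the work queue. It is a no-op on a
// nil Pacer.
func (p *Pacer) Close() {
	if p == nil || p.cur == nil {
		return
	}
	p.wq.AdmittedWorkDone(p.cur)
	p.cur = nil
}
```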
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/rangefeed/BUILD.bazel
@@ -17,11 +17,11 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/keys",
"//pkg/kv/kvserver/kvadmission",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/storage",
"//pkg/storage/enginepb",
"//pkg/util/admission",
"//pkg/util/bufalloc",
"//pkg/util/envutil",
"//pkg/util/hlc",
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/rangefeed/catchup_scan.go
@@ -15,10 +15,10 @@ import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/bufalloc"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -66,7 +66,7 @@ type CatchUpIterator struct {
close func()
span roachpb.Span
startTime hlc.Timestamp // exclusive
pacer *kvadmission.Pacer
pacer *admission.Pacer
}

// NewCatchUpIterator returns a CatchUpIterator for the given Reader over the
@@ -79,7 +79,7 @@ func NewCatchUpIterator(
span roachpb.Span,
startTime hlc.Timestamp,
closer func(),
pacer *kvadmission.Pacer,
pacer *admission.Pacer,
) *CatchUpIterator {
return &CatchUpIterator{
simpleCatchupIter: storage.NewMVCCIncrementalIterator(reader,
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/replica_rangefeed.go
@@ -21,13 +21,13 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -135,7 +135,7 @@ func (tp *rangefeedTxnPusher) ResolveIntents(
// complete. The surrounding store's ConcurrentRequestLimiter is used to limit
// the number of rangefeeds using catch-up iterators at the same time.
func (r *Replica) RangeFeed(
args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink, pacer *kvadmission.Pacer,
args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink, pacer *admission.Pacer,
) *roachpb.Error {
return r.rangeFeedWithRangeID(r.RangeID, args, stream, pacer)
}
@@ -144,7 +144,7 @@ func (r *Replica) rangeFeedWithRangeID(
_forStacks roachpb.RangeID,
args *roachpb.RangeFeedRequest,
stream roachpb.RangeFeedEventSink,
pacer *kvadmission.Pacer,
pacer *admission.Pacer,
) *roachpb.Error {
if !r.isRangefeedEnabled() && !RangefeedEnabled.Get(&r.store.cfg.Settings.SV) {
return roachpb.NewErrorf("rangefeeds require the kv.rangefeed.enabled setting. See %s",
2 changes: 1 addition & 1 deletion pkg/server/node.go
@@ -388,7 +388,7 @@ func NewNode(
testingErrorEvent: cfg.TestingKnobs.TestingResponseErrorEvent,
}
n.storeCfg.KVAdmissionController = kvadmission.MakeController(
kvAdmissionQ, elasticCPUGrantCoord.ElasticCPUWorkQueue, storeGrantCoords, cfg.Settings,
kvAdmissionQ, elasticCPUGrantCoord, storeGrantCoords, cfg.Settings,
)
n.storeCfg.SchedulerLatencyListener = elasticCPUGrantCoord.SchedulerLatencyListener
n.perReplicaServer = kvserver.MakeServer(&n.Descriptor, n.stores)
1 change: 1 addition & 0 deletions pkg/server/server.go
@@ -851,6 +851,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
monitorAndMetrics: sqlMonitorAndMetrics,
settingsStorage: settingsWriter,
eventsServer: eventsServer,
admissionPacerFactory: gcoords.Elastic,
})
if err != nil {
return nil, err
5 changes: 5 additions & 0 deletions pkg/server/server_sql.go
@@ -355,6 +355,10 @@ type sqlServerArgs struct {

// eventsServer communicates with the Observability Service.
eventsServer *obs.EventsServer

// admissionPacerFactory is used for elastic CPU control when performing
// CPU intensive operations, such as CDC event encoding/decoding.
admissionPacerFactory admission.PacerFactory
}

type monitorAndMetrics struct {
@@ -681,6 +685,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
CollectionFactory: collectionFactory,
ExternalIORecorder: cfg.costController,
RangeStatsFetcher: rangeStatsFetcher,
AdmissionPacerFactory: cfg.admissionPacerFactory,
}
cfg.TempStorageConfig.Mon.SetMetrics(distSQLMetrics.CurDiskBytesCount, distSQLMetrics.MaxDiskBytesHist)
if distSQLTestingKnobs := cfg.TestingKnobs.DistSQL; distSQLTestingKnobs != nil {
5 changes: 5 additions & 0 deletions pkg/server/tenant.go
@@ -48,6 +48,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sqlinstance"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/admission"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
@@ -574,6 +575,9 @@ func makeTenantSQLServerArgs(
)
obspb.RegisterObsServer(grpcServer.Server, obsServer)

// TODO(irfansharif): hook up NewGrantCoordinatorSQL.
var noopElasticCPUGrantCoord *admission.ElasticCPUGrantCoordinator = nil

return sqlServerArgs{
sqlServerOptionalKVArgs: sqlServerOptionalKVArgs{
nodesStatusServer: serverpb.MakeOptionalNodesStatusServer(nil),
@@ -621,6 +625,7 @@ func makeTenantSQLServerArgs(
monitorAndMetrics: monitorAndMetrics,
grpc: grpcServer,
eventsServer: obsServer,
admissionPacerFactory: noopElasticCPUGrantCoord,
}, nil
}

4 changes: 4 additions & 0 deletions pkg/sql/execinfra/server_config.go
@@ -192,6 +192,10 @@ type ServerConfig struct {

// RangeStatsFetcher is used to fetch range stats for keys.
RangeStatsFetcher eval.RangeStatsFetcher

// AdmissionPacerFactory is used to integrate CPU-intensive work
// with elastic CPU control.
AdmissionPacerFactory admission.PacerFactory
}

// RuntimeStats is an interface through which the rowexec layer can get
1 change: 1 addition & 0 deletions pkg/util/admission/BUILD.bazel
@@ -13,6 +13,7 @@ go_library(
"granter.go",
"io_load_listener.go",
"kv_slot_adjuster.go",
"pacer.go",
"scheduler_latency_listener.go",
"sql_cpu_overload_indicator.go",
"store_token_estimation.go",
7 changes: 7 additions & 0 deletions pkg/util/admission/admission.go
@@ -561,3 +561,10 @@ type storeRequestEstimates struct {
// writeTokens is the tokens to request at admission time. Must be > 0.
writeTokens int64
}

// PacerFactory is used to construct a new admission.Pacer.
type PacerFactory interface {
NewPacer(unit time.Duration, wi WorkInfo) *Pacer
}

var _ PacerFactory = &ElasticCPUGrantCoordinator{}
12 changes: 12 additions & 0 deletions pkg/util/admission/grant_coordinator.go
@@ -1014,3 +1014,15 @@ func (e *ElasticCPUGrantCoordinator) close() {
func (e *ElasticCPUGrantCoordinator) tryGrant() {
e.elasticCPUGranter.tryGrant()
}

// NewPacer implements the PacerFactory interface.
func (e *ElasticCPUGrantCoordinator) NewPacer(unit time.Duration, wi WorkInfo) *Pacer {
if e == nil {
return nil
}
return &Pacer{
unit: unit,
wi: wi,
wq: e.ElasticCPUWorkQueue,
}
}
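
As a small, illustrative-only sketch (not code from this change), the tenant-server path described in the commit message works because a typed-nil `*ElasticCPUGrantCoordinator` still satisfies the `PacerFactory` interface: `NewPacer` sees a nil receiver and returns a nil Pacer, whose `Pace` and `Close` methods are no-ops per the nil checks in the relocated code.

```go
package admissionexample // hypothetical package, for illustration only

import (
	"context"
	"time"

	"github.com/cockroachdb/cockroach/pkg/util/admission"
)

func tenantNoopPacer(ctx context.Context) error {
	// Tenant servers currently construct no elastic grant coordinator.
	var coord *admission.ElasticCPUGrantCoordinator // nil

	// The typed-nil pointer still implements the narrow factory interface,
	// so SQL-side callers never need to nil-check the coordinator itself.
	var factory admission.PacerFactory = coord

	p := factory.NewPacer(10*time.Millisecond, admission.WorkInfo{}) // returns a nil *Pacer
	defer p.Close()                                                  // no-op on a nil Pacer

	return p.Pace(ctx) // also a no-op; returns nil
}
```

Narrowing the coordinator to `PacerFactory` keeps the rest of the grant coordinator's surface (granters, metrics, scheduler-latency hooks) out of reach of SQL, as the commit message describes.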