From fc44f51e8993b3b49f8c218b4a43874a13a4c40f Mon Sep 17 00:00:00 2001
From: Jayant Shrivastava
Date: Thu, 10 Nov 2022 11:19:11 -0500
Subject: [PATCH] admission: make Pacer type available in SQL server config

Currently, the Pacer type is only used within KV, but will be used by SQL
in future changes. For example, code for encoding/decoding CDC events
resides in distSQL and is CPU intensive, so there is a plan to integrate
admission control into it in
(https://github.com/cockroachdb/cockroach/issues/90089). This change makes
the Pacer type available to the SQL layer via the `execinfra.ServerConfig`.

Because the Pacer was previously only used by KV, it lived in the
`kvadmission` package. Since this change makes it available outside of KV,
it is moved to the `admission` package.

Furthermore, this change adds a new method,
`ElasticCPUGrantCoordinator.NewPacer`, to instantiate new Pacer structs.
Since the `ElasticCPUGrantCoordinator` implements several features not
relevant to SQL, this change passes the coordinator to the SQL server
config as the interface `PacerMaker`, which makes only the `NewPacer`
method accessible.

Currently tenant servers do not create grant coordinators for admission
control. This change retains that behavior, except it passes a
`nil ElasticCPUGrantCoordinator` which creates `nil`/noop Pacers. Adding
these coordinators to tenant servers is a change outside the scope of this
commit and is left as a `TODO`.
Release note: None --- pkg/kv/kvserver/kvadmission/kvadmission.go | 88 ++++++---------------- pkg/kv/kvserver/rangefeed/BUILD.bazel | 2 +- pkg/kv/kvserver/rangefeed/catchup_scan.go | 6 +- pkg/kv/kvserver/replica_rangefeed.go | 6 +- pkg/server/node.go | 2 +- pkg/server/server.go | 1 + pkg/server/server_sql.go | 5 ++ pkg/server/tenant.go | 6 ++ pkg/sql/execinfra/server_config.go | 2 + pkg/util/admission/BUILD.bazel | 1 + pkg/util/admission/grant_coordinator.go | 22 ++++++ pkg/util/admission/pacer.go | 60 +++++++++++++++ 12 files changed, 126 insertions(+), 75 deletions(-) create mode 100644 pkg/util/admission/pacer.go diff --git a/pkg/kv/kvserver/kvadmission/kvadmission.go b/pkg/kv/kvserver/kvadmission/kvadmission.go index b682b584648e..c9d4a98b30cb 100644 --- a/pkg/kv/kvserver/kvadmission/kvadmission.go +++ b/pkg/kv/kvserver/kvadmission/kvadmission.go @@ -96,7 +96,7 @@ type Controller interface { // If enabled, it returns a non-nil Pacer that's to be used within rangefeed // catchup scans (typically CPU-intensive and affecting scheduling // latencies). - AdmitRangefeedRequest(roachpb.TenantID, *roachpb.RangeFeedRequest) *Pacer + AdmitRangefeedRequest(roachpb.TenantID, *roachpb.RangeFeedRequest) *admission.Pacer // SetTenantWeightProvider is used to set the provider that will be // periodically polled for weights. The stopper should be used to terminate // the periodic polling. @@ -135,11 +135,11 @@ type TenantWeightsForStore struct { type controllerImpl struct { // Admission control queues and coordinators. All three should be nil or // non-nil. 
- kvAdmissionQ *admission.WorkQueue - storeGrantCoords *admission.StoreGrantCoordinators - elasticCPUWorkQueue *admission.ElasticCPUWorkQueue - settings *cluster.Settings - every log.EveryN + kvAdmissionQ *admission.WorkQueue + storeGrantCoords *admission.StoreGrantCoordinators + elasticCPUGrantCoordinator *admission.ElasticCPUGrantCoordinator + settings *cluster.Settings + every log.EveryN } var _ Controller = &controllerImpl{} @@ -162,16 +162,16 @@ type Handle struct { // nil or non-nil. func MakeController( kvAdmissionQ *admission.WorkQueue, - elasticCPUWorkQueue *admission.ElasticCPUWorkQueue, + elasticCPUGrantCoordinator *admission.ElasticCPUGrantCoordinator, storeGrantCoords *admission.StoreGrantCoordinators, settings *cluster.Settings, ) Controller { return &controllerImpl{ - kvAdmissionQ: kvAdmissionQ, - storeGrantCoords: storeGrantCoords, - elasticCPUWorkQueue: elasticCPUWorkQueue, - settings: settings, - every: log.Every(10 * time.Second), + kvAdmissionQ: kvAdmissionQ, + storeGrantCoords: storeGrantCoords, + elasticCPUGrantCoordinator: elasticCPUGrantCoordinator, + settings: settings, + every: log.Every(10 * time.Second), } } @@ -257,7 +257,7 @@ func (n *controllerImpl) AdmitKVWork( // handed out through this mechanism, as a way to provide latency // isolation to non-elastic ("latency sensitive") work running on // the same machine. - elasticWorkHandle, err := n.elasticCPUWorkQueue.Admit( + elasticWorkHandle, err := n.elasticCPUGrantCoordinator.ElasticCPUWorkQueue.Admit( ctx, elasticCPUDurationPerExportRequest.Get(&n.settings.SV), admissionInfo, ) if err != nil { @@ -267,7 +267,7 @@ func (n *controllerImpl) AdmitKVWork( defer func() { if retErr != nil { // No elastic work was done. 
- n.elasticCPUWorkQueue.AdmittedWorkDone(ah.ElasticCPUWorkHandle) + n.elasticCPUGrantCoordinator.ElasticCPUWorkQueue.AdmittedWorkDone(ah.ElasticCPUWorkHandle) } }() } else { @@ -283,7 +283,7 @@ func (n *controllerImpl) AdmitKVWork( // AdmittedKVWorkDone implements the Controller interface. func (n *controllerImpl) AdmittedKVWorkDone(ah Handle, writeBytes *StoreWriteBytes) { - n.elasticCPUWorkQueue.AdmittedWorkDone(ah.ElasticCPUWorkHandle) + n.elasticCPUGrantCoordinator.ElasticCPUWorkQueue.AdmittedWorkDone(ah.ElasticCPUWorkHandle) if ah.callAdmittedWorkDoneOnKVAdmissionQ { n.kvAdmissionQ.AdmittedWorkDone(ah.tenantID) } @@ -308,21 +308,19 @@ func (n *controllerImpl) AdmittedKVWorkDone(ah Handle, writeBytes *StoreWriteByt // AdmitRangefeedRequest implements the Controller interface. func (n *controllerImpl) AdmitRangefeedRequest( tenantID roachpb.TenantID, request *roachpb.RangeFeedRequest, -) *Pacer { +) *admission.Pacer { if !rangefeedCatchupScanElasticControlEnabled.Get(&n.settings.SV) { return nil } - return &Pacer{ - unit: elasticCPUDurationPerRangefeedScanUnit.Get(&n.settings.SV), - wi: admission.WorkInfo{ + return n.elasticCPUGrantCoordinator.NewPacer( + elasticCPUDurationPerRangefeedScanUnit.Get(&n.settings.SV), + admission.WorkInfo{ TenantID: tenantID, Priority: admissionpb.WorkPriority(request.AdmissionHeader.Priority), CreateTime: request.AdmissionHeader.CreateTime, BypassAdmission: false, - }, - wq: n.elasticCPUWorkQueue, - } + }) } // SetTenantWeightProvider implements the Controller interface. 
@@ -350,7 +348,7 @@ func (n *controllerImpl) SetTenantWeightProvider( weights.Node = nil } n.kvAdmissionQ.SetTenantWeights(weights.Node) - n.elasticCPUWorkQueue.SetTenantWeights(weights.Node) + n.elasticCPUGrantCoordinator.ElasticCPUWorkQueue.SetTenantWeights(weights.Node) for _, storeWeights := range weights.Stores { q := n.storeGrantCoords.TryGetQueueForStore(int32(storeWeights.StoreID)) @@ -441,47 +439,3 @@ func (wb *StoreWriteBytes) Release() { } storeWriteBytesPool.Put(wb) } - -// Pacer is used in tight loops (CPU-bound) for non-premptible elastic work. -// Callers are expected to invoke Pace() every loop iteration and Close() once -// done. Internally this type integrates with elastic CPU work queue, acquiring -// tokens for the CPU work being done, and blocking if tokens are unavailable. -// This allows for a form of cooperative scheduling with elastic CPU granters. -type Pacer struct { - unit time.Duration - wi admission.WorkInfo - wq *admission.ElasticCPUWorkQueue - - cur *admission.ElasticCPUWorkHandle -} - -// Pace is part of the Pacer interface. -func (p *Pacer) Pace(ctx context.Context) error { - if p == nil { - return nil - } - - if overLimit, _ := p.cur.OverLimit(); overLimit { - p.wq.AdmittedWorkDone(p.cur) - p.cur = nil - } - - if p.cur == nil { - handle, err := p.wq.Admit(ctx, p.unit, p.wi) - if err != nil { - return err - } - p.cur = handle - } - return nil -} - -// Close is part of the Pacer interface. 
-func (p *Pacer) Close() { - if p == nil || p.cur == nil { - return - } - - p.wq.AdmittedWorkDone(p.cur) - p.cur = nil -} diff --git a/pkg/kv/kvserver/rangefeed/BUILD.bazel b/pkg/kv/kvserver/rangefeed/BUILD.bazel index 674602f4682f..bac4f6ebbc90 100644 --- a/pkg/kv/kvserver/rangefeed/BUILD.bazel +++ b/pkg/kv/kvserver/rangefeed/BUILD.bazel @@ -17,11 +17,11 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/keys", - "//pkg/kv/kvserver/kvadmission", "//pkg/roachpb", "//pkg/settings", "//pkg/storage", "//pkg/storage/enginepb", + "//pkg/util/admission", "//pkg/util/bufalloc", "//pkg/util/envutil", "//pkg/util/hlc", diff --git a/pkg/kv/kvserver/rangefeed/catchup_scan.go b/pkg/kv/kvserver/rangefeed/catchup_scan.go index 06a2fd974cec..1726d672187d 100644 --- a/pkg/kv/kvserver/rangefeed/catchup_scan.go +++ b/pkg/kv/kvserver/rangefeed/catchup_scan.go @@ -15,10 +15,10 @@ import ( "context" "time" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" + "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/bufalloc" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -66,7 +66,7 @@ type CatchUpIterator struct { close func() span roachpb.Span startTime hlc.Timestamp // exclusive - pacer *kvadmission.Pacer + pacer *admission.Pacer } // NewCatchUpIterator returns a CatchUpIterator for the given Reader over the @@ -79,7 +79,7 @@ func NewCatchUpIterator( span roachpb.Span, startTime hlc.Timestamp, closer func(), - pacer *kvadmission.Pacer, + pacer *admission.Pacer, ) *CatchUpIterator { return &CatchUpIterator{ simpleCatchupIter: storage.NewMVCCIncrementalIterator(reader, diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index 9945ba0a548a..d31a687ac57e 100644 --- 
a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -21,13 +21,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" + "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -135,7 +135,7 @@ func (tp *rangefeedTxnPusher) ResolveIntents( // complete. The surrounding store's ConcurrentRequestLimiter is used to limit // the number of rangefeeds using catch-up iterators at the same time. func (r *Replica) RangeFeed( - args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink, pacer *kvadmission.Pacer, + args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink, pacer *admission.Pacer, ) *roachpb.Error { return r.rangeFeedWithRangeID(r.RangeID, args, stream, pacer) } @@ -144,7 +144,7 @@ func (r *Replica) rangeFeedWithRangeID( _forStacks roachpb.RangeID, args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink, - pacer *kvadmission.Pacer, + pacer *admission.Pacer, ) *roachpb.Error { if !r.isRangefeedEnabled() && !RangefeedEnabled.Get(&r.store.cfg.Settings.SV) { return roachpb.NewErrorf("rangefeeds require the kv.rangefeed.enabled setting. 
See %s", diff --git a/pkg/server/node.go b/pkg/server/node.go index 59112fa09e48..e9b91ca5fd2e 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -388,7 +388,7 @@ func NewNode( testingErrorEvent: cfg.TestingKnobs.TestingResponseErrorEvent, } n.storeCfg.KVAdmissionController = kvadmission.MakeController( - kvAdmissionQ, elasticCPUGrantCoord.ElasticCPUWorkQueue, storeGrantCoords, cfg.Settings, + kvAdmissionQ, elasticCPUGrantCoord, storeGrantCoords, cfg.Settings, ) n.storeCfg.SchedulerLatencyListener = elasticCPUGrantCoord.SchedulerLatencyListener n.perReplicaServer = kvserver.MakeServer(&n.Descriptor, n.stores) diff --git a/pkg/server/server.go b/pkg/server/server.go index 3d0a3b47cba4..864c2632f151 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -879,6 +879,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { monitorAndMetrics: sqlMonitorAndMetrics, settingsStorage: settingsWriter, eventsServer: eventsServer, + pacerMaker: gcoords.Elastic, }) if err != nil { return nil, err diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 978af8456059..0cc59f9c649e 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -365,6 +365,10 @@ type sqlServerArgs struct { // externalStorageBuilder is the constructor for accesses to external // storage. externalStorageBuilder *externalStorageBuilder + + // pacerMaker is used for elastic CPU control when performing + // CPU intensive operations, such as CDC event encoding/decoding. 
+ pacerMaker admission.PacerMaker } type monitorAndMetrics struct { @@ -741,6 +745,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { CollectionFactory: collectionFactory, ExternalIORecorder: cfg.costController, RangeStatsFetcher: rangeStatsFetcher, + PacerMaker: cfg.pacerMaker, } cfg.TempStorageConfig.Mon.SetMetrics(distSQLMetrics.CurDiskBytesCount, distSQLMetrics.MaxDiskBytesHist) if distSQLTestingKnobs := cfg.TestingKnobs.DistSQL; distSQLTestingKnobs != nil { diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go index b89c387fc85b..f8de3de1151a 100644 --- a/pkg/server/tenant.go +++ b/pkg/server/tenant.go @@ -54,6 +54,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlinstance" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" @@ -876,6 +877,10 @@ func makeTenantSQLServerArgs( eventsServer.TestingKnobs = knobs.(obs.EventServerTestingKnobs) } + // TODO(jayant): generate admission.NewGrantCoordinators and pass them + // as server args, following the same pattern as server.NewServer. + var noopElasticCPUGrantCoord *admission.ElasticCPUGrantCoordinator = nil + return sqlServerArgs{ sqlServerOptionalKVArgs: sqlServerOptionalKVArgs{ nodesStatusServer: serverpb.MakeOptionalNodesStatusServer(nil), @@ -925,6 +930,7 @@ func makeTenantSQLServerArgs( grpc: grpcServer, eventsServer: eventsServer, externalStorageBuilder: esb, + pacerMaker: noopElasticCPUGrantCoord, }, nil } diff --git a/pkg/sql/execinfra/server_config.go b/pkg/sql/execinfra/server_config.go index c74c3ca46442..1de6272c4b0a 100644 --- a/pkg/sql/execinfra/server_config.go +++ b/pkg/sql/execinfra/server_config.go @@ -192,6 +192,8 @@ type ServerConfig struct { // RangeStatsFetcher is used to fetch range stats for keys. 
RangeStatsFetcher eval.RangeStatsFetcher + + PacerMaker admission.PacerMaker } // RuntimeStats is an interface through which the rowexec layer can get diff --git a/pkg/util/admission/BUILD.bazel b/pkg/util/admission/BUILD.bazel index 4d30bc57e5ac..718eab88af70 100644 --- a/pkg/util/admission/BUILD.bazel +++ b/pkg/util/admission/BUILD.bazel @@ -13,6 +13,7 @@ go_library( "granter.go", "io_load_listener.go", "kv_slot_adjuster.go", + "pacer.go", "scheduler_latency_listener.go", "sql_cpu_overload_indicator.go", "store_token_estimation.go", diff --git a/pkg/util/admission/grant_coordinator.go b/pkg/util/admission/grant_coordinator.go index 45ebe90aaff7..dfa8b40cdbb4 100644 --- a/pkg/util/admission/grant_coordinator.go +++ b/pkg/util/admission/grant_coordinator.go @@ -1014,3 +1014,25 @@ func (e *ElasticCPUGrantCoordinator) close() { func (e *ElasticCPUGrantCoordinator) tryGrant() { e.elasticCPUGranter.tryGrant() } + +// PacerMaker is used to construct a new admission.Pacer. +// +// PacerMaker interface should be used in the SQL layer in lieu of +// ElasticCPUGrantCoordinator because the ElasticCPUGrantCoordinator +// implements several features which are irrelevant to SQL. +type PacerMaker interface { + // NewPacer constructs a new admission.Pacer, which may be nil. + NewPacer(unit time.Duration, wi WorkInfo) *Pacer +} + +// NewPacer implements the PacerMaker interface. +func (e *ElasticCPUGrantCoordinator) NewPacer(unit time.Duration, wi WorkInfo) *Pacer { + if e == nil { + return nil + } + return &Pacer{ + unit: unit, + wi: wi, + wq: e.ElasticCPUWorkQueue, + } +} diff --git a/pkg/util/admission/pacer.go b/pkg/util/admission/pacer.go new file mode 100644 index 000000000000..ea5eabcd1685 --- /dev/null +++ b/pkg/util/admission/pacer.go @@ -0,0 +1,60 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. 
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package admission
+
+import (
+	"context"
+	"time"
+)
+
+// Pacer is used in tight loops (CPU-bound) for non-preemptible elastic work.
+// Callers are expected to invoke Pace() every loop iteration and Close() once
+// done. Internally this type integrates with elastic CPU work queue, acquiring
+// tokens for the CPU work being done, and blocking if tokens are unavailable.
+// This allows for a form of cooperative scheduling with elastic CPU granters.
+type Pacer struct {
+	unit time.Duration
+	wi   WorkInfo
+	wq   *ElasticCPUWorkQueue
+
+	cur *ElasticCPUWorkHandle
+}
+
+// Pace is part of the Pacer interface.
+func (p *Pacer) Pace(ctx context.Context) error {
+	if p == nil {
+		return nil
+	}
+
+	if overLimit, _ := p.cur.OverLimit(); overLimit {
+		p.wq.AdmittedWorkDone(p.cur)
+		p.cur = nil
+	}
+
+	if p.cur == nil {
+		handle, err := p.wq.Admit(ctx, p.unit, p.wi)
+		if err != nil {
+			return err
+		}
+		p.cur = handle
+	}
+	return nil
+}
+
+// Close is part of the Pacer interface.
+func (p *Pacer) Close() {
+	if p == nil || p.cur == nil {
+		return
+	}
+
+	p.wq.AdmittedWorkDone(p.cur)
+	p.cur = nil
+}