From 189c91c03a80de5909d73bce5f358ddb072cdc94 Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Thu, 10 Nov 2022 11:19:11 -0500 Subject: [PATCH] admission: make Pacer type available in SQL server config Currently, the Pacer type is only used within KV, but will be used by SQL in future changes. For example, code for encoding/decoding CDC events resides in distSQL and is CPU intensive, so there is a plan to integrate admission control into it (https://github.com/cockroachdb/cockroach/issues/90089). This change makes the Pacer type available to the SQL layer via the `execinfra.ServerConfig`. Because the Pacer was previously only used by KV, it lived in the `kvadmission` package. Since this change makes it available outside of KV, it is moved to the `admission` package. Furthermore, this change adds a new method, `ElasticCPUGrantCoordinator.NewPacer`, to instantiate new Pacer structs. Since the `ElasticCPUGrantCoordinator` implements several features not relevant to SQL, this change passes the coordinator to the SQL server config as the interface `PacerFactory`, which makes only the `NewPacer` method accessible. Currently tenant servers do not create grant coordinators for admission control. This change retains that behavior, except it passes a `nil ElasticCPUGrantCoordinator` which creates `nil`/noop Pacers. Adding these coordinators to tenant servers is a change outside the scope of this commit and is left as a `TODO`.
Release note: None --- pkg/kv/kvserver/kvadmission/kvadmission.go | 88 ++++++---------------- pkg/kv/kvserver/rangefeed/BUILD.bazel | 2 +- pkg/kv/kvserver/rangefeed/catchup_scan.go | 6 +- pkg/kv/kvserver/replica_rangefeed.go | 6 +- pkg/server/node.go | 2 +- pkg/server/server.go | 1 + pkg/server/server_sql.go | 5 ++ pkg/server/tenant.go | 5 ++ pkg/sql/execinfra/server_config.go | 4 + pkg/util/admission/BUILD.bazel | 1 + pkg/util/admission/admission.go | 7 ++ pkg/util/admission/grant_coordinator.go | 12 +++ pkg/util/admission/pacer.go | 60 +++++++++++++++ 13 files changed, 124 insertions(+), 75 deletions(-) create mode 100644 pkg/util/admission/pacer.go diff --git a/pkg/kv/kvserver/kvadmission/kvadmission.go b/pkg/kv/kvserver/kvadmission/kvadmission.go index 5c826b651f61..eb8bf09dd400 100644 --- a/pkg/kv/kvserver/kvadmission/kvadmission.go +++ b/pkg/kv/kvserver/kvadmission/kvadmission.go @@ -96,7 +96,7 @@ type Controller interface { // If enabled, it returns a non-nil Pacer that's to be used within rangefeed // catchup scans (typically CPU-intensive and affecting scheduling // latencies). - AdmitRangefeedRequest(roachpb.TenantID, *roachpb.RangeFeedRequest) *Pacer + AdmitRangefeedRequest(roachpb.TenantID, *roachpb.RangeFeedRequest) *admission.Pacer // SetTenantWeightProvider is used to set the provider that will be // periodically polled for weights. The stopper should be used to terminate // the periodic polling. @@ -135,11 +135,11 @@ type TenantWeightsForStore struct { type controllerImpl struct { // Admission control queues and coordinators. All three should be nil or // non-nil. 
- kvAdmissionQ *admission.WorkQueue - storeGrantCoords *admission.StoreGrantCoordinators - elasticCPUWorkQueue *admission.ElasticCPUWorkQueue - settings *cluster.Settings - every log.EveryN + kvAdmissionQ *admission.WorkQueue + storeGrantCoords *admission.StoreGrantCoordinators + elasticCPUGrantCoordinator *admission.ElasticCPUGrantCoordinator + settings *cluster.Settings + every log.EveryN } var _ Controller = &controllerImpl{} @@ -162,16 +162,16 @@ type Handle struct { // nil or non-nil. func MakeController( kvAdmissionQ *admission.WorkQueue, - elasticCPUWorkQueue *admission.ElasticCPUWorkQueue, + elasticCPUGrantCoordinator *admission.ElasticCPUGrantCoordinator, storeGrantCoords *admission.StoreGrantCoordinators, settings *cluster.Settings, ) Controller { return &controllerImpl{ - kvAdmissionQ: kvAdmissionQ, - storeGrantCoords: storeGrantCoords, - elasticCPUWorkQueue: elasticCPUWorkQueue, - settings: settings, - every: log.Every(10 * time.Second), + kvAdmissionQ: kvAdmissionQ, + storeGrantCoords: storeGrantCoords, + elasticCPUGrantCoordinator: elasticCPUGrantCoordinator, + settings: settings, + every: log.Every(10 * time.Second), } } @@ -257,7 +257,7 @@ func (n *controllerImpl) AdmitKVWork( // handed out through this mechanism, as a way to provide latency // isolation to non-elastic ("latency sensitive") work running on // the same machine. - elasticWorkHandle, err := n.elasticCPUWorkQueue.Admit( + elasticWorkHandle, err := n.elasticCPUGrantCoordinator.ElasticCPUWorkQueue.Admit( ctx, elasticCPUDurationPerExportRequest.Get(&n.settings.SV), admissionInfo, ) if err != nil { @@ -267,7 +267,7 @@ func (n *controllerImpl) AdmitKVWork( defer func() { if retErr != nil { // No elastic work was done. 
- n.elasticCPUWorkQueue.AdmittedWorkDone(ah.ElasticCPUWorkHandle) + n.elasticCPUGrantCoordinator.ElasticCPUWorkQueue.AdmittedWorkDone(ah.ElasticCPUWorkHandle) } }() } else { @@ -283,7 +283,7 @@ func (n *controllerImpl) AdmitKVWork( // AdmittedKVWorkDone implements the Controller interface. func (n *controllerImpl) AdmittedKVWorkDone(ah Handle, writeBytes *StoreWriteBytes) { - n.elasticCPUWorkQueue.AdmittedWorkDone(ah.ElasticCPUWorkHandle) + n.elasticCPUGrantCoordinator.ElasticCPUWorkQueue.AdmittedWorkDone(ah.ElasticCPUWorkHandle) if ah.callAdmittedWorkDoneOnKVAdmissionQ { n.kvAdmissionQ.AdmittedWorkDone(ah.tenantID) } @@ -308,21 +308,19 @@ func (n *controllerImpl) AdmittedKVWorkDone(ah Handle, writeBytes *StoreWriteByt // AdmitRangefeedRequest implements the Controller interface. func (n *controllerImpl) AdmitRangefeedRequest( tenantID roachpb.TenantID, request *roachpb.RangeFeedRequest, -) *Pacer { +) *admission.Pacer { if !rangefeedCatchupScanElasticControlEnabled.Get(&n.settings.SV) { return nil } - return &Pacer{ - unit: elasticCPUDurationPerRangefeedScanUnit.Get(&n.settings.SV), - wi: admission.WorkInfo{ + return n.elasticCPUGrantCoordinator.NewPacer( + elasticCPUDurationPerRangefeedScanUnit.Get(&n.settings.SV), + admission.WorkInfo{ TenantID: tenantID, Priority: admissionpb.WorkPriority(request.AdmissionHeader.Priority), CreateTime: request.AdmissionHeader.CreateTime, BypassAdmission: false, - }, - wq: n.elasticCPUWorkQueue, - } + }) } // SetTenantWeightProvider implements the Controller interface. 
@@ -350,7 +348,7 @@ func (n *controllerImpl) SetTenantWeightProvider( weights.Node = nil } n.kvAdmissionQ.SetTenantWeights(weights.Node) - n.elasticCPUWorkQueue.SetTenantWeights(weights.Node) + n.elasticCPUGrantCoordinator.ElasticCPUWorkQueue.SetTenantWeights(weights.Node) for _, storeWeights := range weights.Stores { q := n.storeGrantCoords.TryGetQueueForStore(int32(storeWeights.StoreID)) @@ -441,47 +439,3 @@ func (wb *StoreWriteBytes) Release() { } storeWriteBytesPool.Put(wb) } - -// Pacer is used in tight loops (CPU-bound) for non-premptible elastic work. -// Callers are expected to invoke Pace() every loop iteration and Close() once -// done. Internally this type integrates with elastic CPU work queue, acquiring -// tokens for the CPU work being done, and blocking if tokens are unavailable. -// This allows for a form of cooperative scheduling with elastic CPU granters. -type Pacer struct { - unit time.Duration - wi admission.WorkInfo - wq *admission.ElasticCPUWorkQueue - - cur *admission.ElasticCPUWorkHandle -} - -// Pace is part of the Pacer interface. -func (p *Pacer) Pace(ctx context.Context) error { - if p == nil { - return nil - } - - if overLimit, _ := p.cur.OverLimit(); overLimit { - p.wq.AdmittedWorkDone(p.cur) - p.cur = nil - } - - if p.cur == nil { - handle, err := p.wq.Admit(ctx, p.unit, p.wi) - if err != nil { - return err - } - p.cur = handle - } - return nil -} - -// Close is part of the Pacer interface. 
-func (p *Pacer) Close() { - if p == nil || p.cur == nil { - return - } - - p.wq.AdmittedWorkDone(p.cur) - p.cur = nil -} diff --git a/pkg/kv/kvserver/rangefeed/BUILD.bazel b/pkg/kv/kvserver/rangefeed/BUILD.bazel index b971df3eae0c..e9f1abf12e20 100644 --- a/pkg/kv/kvserver/rangefeed/BUILD.bazel +++ b/pkg/kv/kvserver/rangefeed/BUILD.bazel @@ -17,11 +17,11 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/keys", - "//pkg/kv/kvserver/kvadmission", "//pkg/roachpb", "//pkg/settings", "//pkg/storage", "//pkg/storage/enginepb", + "//pkg/util/admission", "//pkg/util/bufalloc", "//pkg/util/envutil", "//pkg/util/hlc", diff --git a/pkg/kv/kvserver/rangefeed/catchup_scan.go b/pkg/kv/kvserver/rangefeed/catchup_scan.go index 06a2fd974cec..1726d672187d 100644 --- a/pkg/kv/kvserver/rangefeed/catchup_scan.go +++ b/pkg/kv/kvserver/rangefeed/catchup_scan.go @@ -15,10 +15,10 @@ import ( "context" "time" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" + "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/bufalloc" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -66,7 +66,7 @@ type CatchUpIterator struct { close func() span roachpb.Span startTime hlc.Timestamp // exclusive - pacer *kvadmission.Pacer + pacer *admission.Pacer } // NewCatchUpIterator returns a CatchUpIterator for the given Reader over the @@ -79,7 +79,7 @@ func NewCatchUpIterator( span roachpb.Span, startTime hlc.Timestamp, closer func(), - pacer *kvadmission.Pacer, + pacer *admission.Pacer, ) *CatchUpIterator { return &CatchUpIterator{ simpleCatchupIter: storage.NewMVCCIncrementalIterator(reader, diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index a8b829eb9f6c..b11106475cca 100644 --- 
a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -21,13 +21,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" + "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -135,7 +135,7 @@ func (tp *rangefeedTxnPusher) ResolveIntents( // complete. The surrounding store's ConcurrentRequestLimiter is used to limit // the number of rangefeeds using catch-up iterators at the same time. func (r *Replica) RangeFeed( - args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink, pacer *kvadmission.Pacer, + args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink, pacer *admission.Pacer, ) *roachpb.Error { return r.rangeFeedWithRangeID(r.RangeID, args, stream, pacer) } @@ -144,7 +144,7 @@ func (r *Replica) rangeFeedWithRangeID( _forStacks roachpb.RangeID, args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink, - pacer *kvadmission.Pacer, + pacer *admission.Pacer, ) *roachpb.Error { if !r.isRangefeedEnabled() && !RangefeedEnabled.Get(&r.store.cfg.Settings.SV) { return roachpb.NewErrorf("rangefeeds require the kv.rangefeed.enabled setting. 
See %s", diff --git a/pkg/server/node.go b/pkg/server/node.go index 51e505299c58..b6a339517aef 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -388,7 +388,7 @@ func NewNode( testingErrorEvent: cfg.TestingKnobs.TestingResponseErrorEvent, } n.storeCfg.KVAdmissionController = kvadmission.MakeController( - kvAdmissionQ, elasticCPUGrantCoord.ElasticCPUWorkQueue, storeGrantCoords, cfg.Settings, + kvAdmissionQ, elasticCPUGrantCoord, storeGrantCoords, cfg.Settings, ) n.storeCfg.SchedulerLatencyListener = elasticCPUGrantCoord.SchedulerLatencyListener n.perReplicaServer = kvserver.MakeServer(&n.Descriptor, n.stores) diff --git a/pkg/server/server.go b/pkg/server/server.go index 705809477482..3bd6c9f8442b 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -851,6 +851,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { monitorAndMetrics: sqlMonitorAndMetrics, settingsStorage: settingsWriter, eventsServer: eventsServer, + admissionPacerFactory: gcoords.Elastic, }) if err != nil { return nil, err diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 9e7576920d6e..69f66eebf9b3 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -355,6 +355,10 @@ type sqlServerArgs struct { // eventsServer communicates with the Observability Service. eventsServer *obs.EventsServer + + // admissionPacerFactory is used for elastic CPU control when performing + // CPU intensive operations, such as CDC event encoding/decoding. 
+ admissionPacerFactory admission.PacerFactory } type monitorAndMetrics struct { @@ -681,6 +685,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { CollectionFactory: collectionFactory, ExternalIORecorder: cfg.costController, RangeStatsFetcher: rangeStatsFetcher, + AdmissionPacerFactory: cfg.admissionPacerFactory, } cfg.TempStorageConfig.Mon.SetMetrics(distSQLMetrics.CurDiskBytesCount, distSQLMetrics.MaxDiskBytesHist) if distSQLTestingKnobs := cfg.TestingKnobs.DistSQL; distSQLTestingKnobs != nil { diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go index 09722e9c1572..668dd57d698c 100644 --- a/pkg/server/tenant.go +++ b/pkg/server/tenant.go @@ -48,6 +48,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlinstance" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" @@ -574,6 +575,9 @@ func makeTenantSQLServerArgs( ) obspb.RegisterObsServer(grpcServer.Server, obsServer) + // TODO(irfansharif): hook up NewGrantCoordinatorSQL. + var noopElasticCPUGrantCoord *admission.ElasticCPUGrantCoordinator = nil + return sqlServerArgs{ sqlServerOptionalKVArgs: sqlServerOptionalKVArgs{ nodesStatusServer: serverpb.MakeOptionalNodesStatusServer(nil), @@ -621,6 +625,7 @@ func makeTenantSQLServerArgs( monitorAndMetrics: monitorAndMetrics, grpc: grpcServer, eventsServer: obsServer, + admissionPacerFactory: noopElasticCPUGrantCoord, }, nil } diff --git a/pkg/sql/execinfra/server_config.go b/pkg/sql/execinfra/server_config.go index 323b04d71136..a34f481666dc 100644 --- a/pkg/sql/execinfra/server_config.go +++ b/pkg/sql/execinfra/server_config.go @@ -192,6 +192,10 @@ type ServerConfig struct { // RangeStatsFetcher is used to fetch range stats for keys. 
RangeStatsFetcher eval.RangeStatsFetcher + + // AdmissionPacerFactory is used to integrate CPU-intensive work + // with elastic CPU control. + AdmissionPacerFactory admission.PacerFactory } // RuntimeStats is an interface through which the rowexec layer can get diff --git a/pkg/util/admission/BUILD.bazel b/pkg/util/admission/BUILD.bazel index 552d269225ff..bcd7c2d9f3eb 100644 --- a/pkg/util/admission/BUILD.bazel +++ b/pkg/util/admission/BUILD.bazel @@ -13,6 +13,7 @@ go_library( "granter.go", "io_load_listener.go", "kv_slot_adjuster.go", + "pacer.go", "scheduler_latency_listener.go", "sql_cpu_overload_indicator.go", "store_token_estimation.go", diff --git a/pkg/util/admission/admission.go b/pkg/util/admission/admission.go index 998bc1065dc9..7ad56fcd9d10 100644 --- a/pkg/util/admission/admission.go +++ b/pkg/util/admission/admission.go @@ -561,3 +561,10 @@ type storeRequestEstimates struct { // writeTokens is the tokens to request at admission time. Must be > 0. writeTokens int64 } + +// PacerFactory is used to construct a new admission.Pacer. +type PacerFactory interface { + NewPacer(unit time.Duration, wi WorkInfo) *Pacer +} + +var _ PacerFactory = &ElasticCPUGrantCoordinator{} diff --git a/pkg/util/admission/grant_coordinator.go b/pkg/util/admission/grant_coordinator.go index 67b187a27966..c4c78d3030bb 100644 --- a/pkg/util/admission/grant_coordinator.go +++ b/pkg/util/admission/grant_coordinator.go @@ -1014,3 +1014,15 @@ func (e *ElasticCPUGrantCoordinator) close() { func (e *ElasticCPUGrantCoordinator) tryGrant() { e.elasticCPUGranter.tryGrant() } + +// NewPacer implements the PacerFactory interface.
+func (e *ElasticCPUGrantCoordinator) NewPacer(unit time.Duration, wi WorkInfo) *Pacer { + if e == nil { + return nil + } + return &Pacer{ + unit: unit, + wi: wi, + wq: e.ElasticCPUWorkQueue, + } +} diff --git a/pkg/util/admission/pacer.go b/pkg/util/admission/pacer.go new file mode 100644 index 000000000000..ea5eabcd1685 --- /dev/null +++ b/pkg/util/admission/pacer.go @@ -0,0 +1,60 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package admission + +import ( + "context" + "time" +) + +// Pacer is used in tight loops (CPU-bound) for non-preemptible elastic work. +// Callers are expected to invoke Pace() every loop iteration and Close() once +// done. Internally this type integrates with elastic CPU work queue, acquiring +// tokens for the CPU work being done, and blocking if tokens are unavailable. +// This allows for a form of cooperative scheduling with elastic CPU granters. +type Pacer struct { + unit time.Duration + wi WorkInfo + wq *ElasticCPUWorkQueue + + cur *ElasticCPUWorkHandle +} + +// Pace is part of the Pacer interface. +func (p *Pacer) Pace(ctx context.Context) error { + if p == nil { + return nil + } + + if overLimit, _ := p.cur.OverLimit(); overLimit { + p.wq.AdmittedWorkDone(p.cur) + p.cur = nil + } + + if p.cur == nil { + handle, err := p.wq.Admit(ctx, p.unit, p.wi) + if err != nil { + return err + } + p.cur = handle + } + return nil +} + +// Close is part of the Pacer interface. +func (p *Pacer) Close() { + if p == nil || p.cur == nil { + return + } + + p.wq.AdmittedWorkDone(p.cur) + p.cur = nil +}