From 3c01ccca11f5703710892ad887b195f4dd6bb31c Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Sun, 9 Oct 2022 19:48:57 -0400 Subject: [PATCH 1/5] kvadmission: carve out package for AC<->KV integration Pure code movement/renaming. We also renamed a cluster setting kv.store.admission.provisioned_bandwidth to kvadmission.store.provisioned_bandwidth. Release note (general note): We renamed a cluster setting kv.store.admission.provisioned_bandwidth to kvadmission.store.provisioned_bandwidth. --- docs/generated/settings/settings.html | 2 +- pkg/BUILD.bazel | 2 + pkg/kv/kvserver/BUILD.bazel | 1 + pkg/kv/kvserver/kvadmission/BUILD.bazel | 24 ++ pkg/kv/kvserver/kvadmission/kvadmission.go | 390 ++++++++++++++++++ pkg/kv/kvserver/mvcc_gc_queue.go | 5 +- .../replica_application_state_machine.go | 23 +- pkg/kv/kvserver/replica_raft.go | 13 +- pkg/kv/kvserver/replica_read.go | 8 +- pkg/kv/kvserver/replica_send.go | 11 +- pkg/kv/kvserver/replica_write.go | 8 +- pkg/kv/kvserver/store.go | 331 +-------------- pkg/kv/kvserver/store_send.go | 5 +- pkg/kv/kvserver/stores.go | 28 +- pkg/server/BUILD.bazel | 1 + pkg/server/node.go | 13 +- 16 files changed, 471 insertions(+), 394 deletions(-) create mode 100644 pkg/kv/kvserver/kvadmission/BUILD.bazel create mode 100644 pkg/kv/kvserver/kvadmission/kvadmission.go diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 0651ccb93275..e9867f87fbc8 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -63,10 +63,10 @@ kv.snapshot_delegation.enabledbooleanfalseset to true to allow snapshots from follower replicas kv.snapshot_rebalance.max_ratebyte size32 MiBthe rate limit (bytes/sec) to use for rebalance and upreplication snapshots kv.snapshot_recovery.max_ratebyte size32 MiBthe rate limit (bytes/sec) to use for recovery snapshots -kv.store.admission.provisioned_bandwidthbyte size0 Bif set to a non-zero value, this is used as the provisioned bandwidth (in bytes/s), for each store. It can be over-ridden on a per-store basis using the --store flag kv.transaction.max_intents_bytesinteger4194304maximum number of bytes used to track locks in transactions kv.transaction.max_refresh_spans_bytesinteger4194304maximum number of bytes used to track refresh spans in serializable transactions kv.transaction.reject_over_max_intents_budget.enabledbooleanfalseif set, transactions that exceed their lock tracking budget (kv.transaction.max_intents_bytes) are rejected instead of having their lock spans imprecisely compressed +kvadmission.store.provisioned_bandwidthbyte size0 Bif set to a non-zero value, this is used as the provisioned bandwidth (in bytes/s), for each store. It can be over-ridden on a per-store basis using the --store flag schedules.backup.gc_protection.enabledbooleantrueenable chaining of GC protection across backups run as part of a schedule security.ocsp.modeenumerationoffuse OCSP to check whether TLS certificates are revoked. If the OCSP server is unreachable, in strict mode all certificates will be rejected and in lax mode all certificates will be accepted. [off = 0, lax = 1, strict = 2] security.ocsp.timeoutduration3stimeout before considering the OCSP server unreachable diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 055c51ff2ff7..1326a7b003b5 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -1142,6 +1142,7 @@ GO_TARGETS = [ "//pkg/kv/kvserver/idalloc:idalloc_test", "//pkg/kv/kvserver/intentresolver:intentresolver", "//pkg/kv/kvserver/intentresolver:intentresolver_test", + "//pkg/kv/kvserver/kvadmission:kvadmission", "//pkg/kv/kvserver/kvserverbase:kvserverbase", "//pkg/kv/kvserver/kvserverpb:kvserverpb", "//pkg/kv/kvserver/liveness/livenesspb:livenesspb", @@ -2439,6 +2440,7 @@ GET_X_DATA_TARGETS = [ "//pkg/kv/kvserver/gc:get_x_data", "//pkg/kv/kvserver/idalloc:get_x_data", "//pkg/kv/kvserver/intentresolver:get_x_data", + "//pkg/kv/kvserver/kvadmission:get_x_data", "//pkg/kv/kvserver/kvserverbase:get_x_data", "//pkg/kv/kvserver/kvserverpb:get_x_data", "//pkg/kv/kvserver/liveness:get_x_data", diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 37adbb41f394..b2b79c085ea1 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -132,6 +132,7 @@ go_library( "//pkg/kv/kvserver/gc", "//pkg/kv/kvserver/idalloc", "//pkg/kv/kvserver/intentresolver", + "//pkg/kv/kvserver/kvadmission", "//pkg/kv/kvserver/kvserverbase", "//pkg/kv/kvserver/kvserverpb", "//pkg/kv/kvserver/liveness", diff --git a/pkg/kv/kvserver/kvadmission/BUILD.bazel b/pkg/kv/kvserver/kvadmission/BUILD.bazel new file mode 100644 index 000000000000..44573b5e43fc --- /dev/null +++ b/pkg/kv/kvserver/kvadmission/BUILD.bazel @@ -0,0 +1,24 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "kvadmission", + srcs = ["kvadmission.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission", + visibility = ["//visibility:public"], + deps = [ + "//pkg/roachpb", + "//pkg/settings", + "//pkg/settings/cluster", + "//pkg/util/admission", + "//pkg/util/admission/admissionpb", + "//pkg/util/buildutil", + "//pkg/util/log", + "//pkg/util/stop", + "//pkg/util/timeutil", + "@com_github_cockroachdb_errors//:errors", + "@com_github_cockroachdb_pebble//:pebble", + ], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/kv/kvserver/kvadmission/kvadmission.go b/pkg/kv/kvserver/kvadmission/kvadmission.go new file mode 100644 index 000000000000..5e35934945c7 --- /dev/null +++ b/pkg/kv/kvserver/kvadmission/kvadmission.go @@ -0,0 +1,390 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +// Package kvadmission is the integration layer between KV and admission +// control. +package kvadmission + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/admission" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/buildutil" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/errors" + "github.com/cockroachdb/pebble" +) + +// elasticCPUDurationPerExportRequest controls how many CPU tokens are allotted +// for each export request. +var elasticCPUDurationPerExportRequest = settings.RegisterDurationSetting( + settings.SystemOnly, + "kvadmission.elastic_cpu.duration_per_export_request", + "controls how many CPU tokens are allotted for each export request", + admission.MaxElasticCPUDuration, + func(duration time.Duration) error { + if duration < admission.MinElasticCPUDuration { + return fmt.Errorf("minimum CPU duration allowed per export request is %s, got %s", + admission.MinElasticCPUDuration, duration) + } + if duration > admission.MaxElasticCPUDuration { + return fmt.Errorf("maximum CPU duration allowed per export request is %s, got %s", + admission.MaxElasticCPUDuration, duration) + } + return nil + }, +) + +// Controller provides admission control for the KV layer. +type Controller interface { + // AdmitKVWork must be called before performing KV work. + // BatchRequest.AdmissionHeader and BatchRequest.Replica.StoreID must be + // populated for admission to work correctly. If err is non-nil, the + // returned handle can be ignored. If err is nil, AdmittedKVWorkDone must be + // called after the KV work is done executing. + AdmitKVWork( + ctx context.Context, tenantID roachpb.TenantID, ba *roachpb.BatchRequest, + ) (Handle, error) + // AdmittedKVWorkDone is called after the admitted KV work is done + // executing. + AdmittedKVWorkDone(Handle, *StoreWriteBytes) + // SetTenantWeightProvider is used to set the provider that will be + // periodically polled for weights. The stopper should be used to terminate + // the periodic polling. + SetTenantWeightProvider(provider TenantWeightProvider, stopper *stop.Stopper) + // SnapshotIngested informs admission control about a range snapshot + // ingestion. + SnapshotIngested(storeID roachpb.StoreID, ingestStats pebble.IngestOperationStats) + // FollowerStoreWriteBytes informs admission control about writes + // replicated to a raft follower, that have not been subject to admission + // control. + FollowerStoreWriteBytes(storeID roachpb.StoreID, followerWriteBytes FollowerStoreWriteBytes) +} + +// TenantWeightProvider can be periodically asked to provide the tenant +// weights. +type TenantWeightProvider interface { + GetTenantWeights() TenantWeights +} + +// TenantWeights contains the various tenant weights. +type TenantWeights struct { + // Node is the node level tenant ID => weight. + Node map[uint64]uint32 + // Stores contains the per-store tenant weights. + Stores []TenantWeightsForStore +} + +// TenantWeightsForStore contains the tenant weights for a store. +type TenantWeightsForStore struct { + roachpb.StoreID + // Weights is tenant ID => weight. + Weights map[uint64]uint32 +} + +// controllerImpl implements Controller interface. +type controllerImpl struct { + // Admission control queues and coordinators. All three should be nil or + // non-nil. + kvAdmissionQ *admission.WorkQueue + storeGrantCoords *admission.StoreGrantCoordinators + elasticCPUWorkQueue *admission.ElasticCPUWorkQueue + settings *cluster.Settings + every log.EveryN +} + +var _ Controller = &controllerImpl{} + +// Handle groups data around some piece admitted work. Depending on the +// type of work, it holds (a) references to specific work queues, (b) state +// needed to inform said work queues of what work was done after the fact, and +// (c) information around how much work a request is allowed to do (used for +// cooperative scheduling with elastic CPU granters). +type Handle struct { + tenantID roachpb.TenantID + storeAdmissionQ *admission.StoreWorkQueue + storeWorkHandle admission.StoreWorkHandle + ElasticCPUWorkHandle *admission.ElasticCPUWorkHandle + + callAdmittedWorkDoneOnKVAdmissionQ bool +} + +// MakeController returns a Controller. All three parameters must together be +// nil or non-nil. +func MakeController( + kvAdmissionQ *admission.WorkQueue, + elasticCPUWorkQueue *admission.ElasticCPUWorkQueue, + storeGrantCoords *admission.StoreGrantCoordinators, + settings *cluster.Settings, +) Controller { + return &controllerImpl{ + kvAdmissionQ: kvAdmissionQ, + storeGrantCoords: storeGrantCoords, + elasticCPUWorkQueue: elasticCPUWorkQueue, + settings: settings, + every: log.Every(10 * time.Second), + } +} + +// AdmitKVWork implements the Controller interface. +// +// TODO(irfansharif): There's a fair bit happening here and there's no test +// coverage. Fix that. +func (n *controllerImpl) AdmitKVWork( + ctx context.Context, tenantID roachpb.TenantID, ba *roachpb.BatchRequest, +) (handle Handle, retErr error) { + ah := Handle{tenantID: tenantID} + if n.kvAdmissionQ == nil { + return ah, nil + } + + bypassAdmission := ba.IsAdmin() + source := ba.AdmissionHeader.Source + if !roachpb.IsSystemTenantID(tenantID.ToUint64()) { + // Request is from a SQL node. + bypassAdmission = false + source = roachpb.AdmissionHeader_FROM_SQL + } + if source == roachpb.AdmissionHeader_OTHER { + bypassAdmission = true + } + // TODO(abaptist): Revisit and deprecate this setting in v23.1. + if admission.KVBulkOnlyAdmissionControlEnabled.Get(&n.settings.SV) { + if admissionpb.WorkPriority(ba.AdmissionHeader.Priority) >= admissionpb.NormalPri { + bypassAdmission = true + } + } + createTime := ba.AdmissionHeader.CreateTime + if !bypassAdmission && createTime == 0 { + // TODO(sumeer): revisit this for multi-tenant. Specifically, the SQL use + // of zero CreateTime needs to be revisited. It should use high priority. + createTime = timeutil.Now().UnixNano() + } + admissionInfo := admission.WorkInfo{ + TenantID: tenantID, + Priority: admissionpb.WorkPriority(ba.AdmissionHeader.Priority), + CreateTime: createTime, + BypassAdmission: bypassAdmission, + } + + admissionEnabled := true + // Don't subject HeartbeatTxnRequest to the storeAdmissionQ. Even though + // it would bypass admission, it would consume a slot. When writes are + // throttled, we start generating more txn heartbeats, which then consume + // all the slots, causing no useful work to happen. We do want useful work + // to continue even when throttling since there are often significant + // number of tokens available. + if ba.IsWrite() && !ba.IsSingleHeartbeatTxnRequest() { + storeAdmissionQ := n.storeGrantCoords.TryGetQueueForStore(int32(ba.Replica.StoreID)) + if storeAdmissionQ != nil { + storeWorkHandle, err := storeAdmissionQ.Admit( + ctx, admission.StoreWriteWorkInfo{WorkInfo: admissionInfo}) + if err != nil { + return Handle{}, err + } + admissionEnabled = storeWorkHandle.AdmissionEnabled() + if admissionEnabled { + defer func() { + if retErr != nil { + // No bytes were written. + _ = storeAdmissionQ.AdmittedWorkDone(ah.storeWorkHandle, admission.StoreWorkDoneInfo{}) + } + }() + ah.storeAdmissionQ, ah.storeWorkHandle = storeAdmissionQ, storeWorkHandle + } + } + } + if admissionEnabled { + if ba.IsSingleExportRequest() { + // Backups generate batches with single export requests, which we + // admit through the elastic CPU work queue. We grant this + // CPU-intensive work a set amount of CPU time and expect it to + // terminate (cooperatively) once it exceeds its grant. The amount + // disbursed is 100ms, which we've experimentally found to be long + // enough to do enough useful work per-request while not causing too + // much in the way of scheduling delays on individual cores. Within + // admission control we have machinery that observes scheduling + // latencies periodically and reduces the total amount of CPU time + // handed out through this mechanism, as a way to provide latency + // isolation to non-elastic ("latency sensitive") work running on + // the same machine. + elasticWorkHandle, err := n.elasticCPUWorkQueue.Admit( + ctx, elasticCPUDurationPerExportRequest.Get(&n.settings.SV), admissionInfo, + ) + if err != nil { + return Handle{}, err + } + ah.ElasticCPUWorkHandle = elasticWorkHandle + defer func() { + if retErr != nil { + // No elastic work was done. + n.elasticCPUWorkQueue.AdmittedWorkDone(ah.ElasticCPUWorkHandle) + } + }() + } else { + callAdmittedWorkDoneOnKVAdmissionQ, err := n.kvAdmissionQ.Admit(ctx, admissionInfo) + if err != nil { + return Handle{}, err + } + ah.callAdmittedWorkDoneOnKVAdmissionQ = callAdmittedWorkDoneOnKVAdmissionQ + } + } + return ah, nil +} + +// AdmittedKVWorkDone implements the Controller interface. +func (n *controllerImpl) AdmittedKVWorkDone(ah Handle, writeBytes *StoreWriteBytes) { + n.elasticCPUWorkQueue.AdmittedWorkDone(ah.ElasticCPUWorkHandle) + if ah.callAdmittedWorkDoneOnKVAdmissionQ { + n.kvAdmissionQ.AdmittedWorkDone(ah.tenantID) + } + if ah.storeAdmissionQ != nil { + var doneInfo admission.StoreWorkDoneInfo + if writeBytes != nil { + doneInfo = admission.StoreWorkDoneInfo(*writeBytes) + } + err := ah.storeAdmissionQ.AdmittedWorkDone(ah.storeWorkHandle, doneInfo) + if err != nil { + // This shouldn't be happening. + if buildutil.CrdbTestBuild { + log.Fatalf(context.Background(), "%s", errors.WithAssertionFailure(err)) + } + if n.every.ShouldLog() { + log.Errorf(context.Background(), "%s", err) + } + } + } +} + +// SetTenantWeightProvider implements the Controller interface. +func (n *controllerImpl) SetTenantWeightProvider( + provider TenantWeightProvider, stopper *stop.Stopper, +) { + // TODO(irfansharif): Use a stopper here instead. + go func() { + const weightCalculationPeriod = 10 * time.Minute + ticker := time.NewTicker(weightCalculationPeriod) + // Used for short-circuiting the weights calculation if all weights are + // disabled. + allWeightsDisabled := false + for { + select { + case <-ticker.C: + kvDisabled := !admission.KVTenantWeightsEnabled.Get(&n.settings.SV) + kvStoresDisabled := !admission.KVStoresTenantWeightsEnabled.Get(&n.settings.SV) + if allWeightsDisabled && kvDisabled && kvStoresDisabled { + // Have already transitioned to disabled, so noop. + continue + } + weights := provider.GetTenantWeights() + if kvDisabled { + weights.Node = nil + } + n.kvAdmissionQ.SetTenantWeights(weights.Node) + n.elasticCPUWorkQueue.SetTenantWeights(weights.Node) + + for _, storeWeights := range weights.Stores { + q := n.storeGrantCoords.TryGetQueueForStore(int32(storeWeights.StoreID)) + if q != nil { + if kvStoresDisabled { + storeWeights.Weights = nil + } + q.SetTenantWeights(storeWeights.Weights) + } + } + allWeightsDisabled = kvDisabled && kvStoresDisabled + case <-stopper.ShouldQuiesce(): + ticker.Stop() + return + } + } + }() +} + +// SnapshotIngested implements the Controller interface. +func (n *controllerImpl) SnapshotIngested( + storeID roachpb.StoreID, ingestStats pebble.IngestOperationStats, +) { + storeAdmissionQ := n.storeGrantCoords.TryGetQueueForStore(int32(storeID)) + if storeAdmissionQ == nil { + return + } + storeAdmissionQ.StatsToIgnore(ingestStats) +} + +// FollowerStoreWriteBytes implements the Controller interface. +func (n *controllerImpl) FollowerStoreWriteBytes( + storeID roachpb.StoreID, followerWriteBytes FollowerStoreWriteBytes, +) { + if followerWriteBytes.WriteBytes == 0 && followerWriteBytes.IngestedBytes == 0 { + return + } + storeAdmissionQ := n.storeGrantCoords.TryGetQueueForStore(int32(storeID)) + if storeAdmissionQ == nil { + return + } + storeAdmissionQ.BypassedWorkDone( + followerWriteBytes.NumEntries, followerWriteBytes.StoreWorkDoneInfo) +} + +// ProvisionedBandwidth set a value of the provisioned +// bandwidth for each store in the cluster. +var ProvisionedBandwidth = settings.RegisterByteSizeSetting( + settings.SystemOnly, "kvadmission.store.provisioned_bandwidth", + "if set to a non-zero value, this is used as the provisioned bandwidth (in bytes/s), "+ + "for each store. It can be over-ridden on a per-store basis using the --store flag", + 0).WithPublic() + +// FollowerStoreWriteBytes captures stats about writes done to a store by a +// replica that is not the leaseholder. These are used for admission control. +type FollowerStoreWriteBytes struct { + NumEntries int64 + admission.StoreWorkDoneInfo +} + +// Merge follower store write statistics using the given data. +func (f *FollowerStoreWriteBytes) Merge(from FollowerStoreWriteBytes) { + f.NumEntries += from.NumEntries + f.WriteBytes += from.WriteBytes + f.IngestedBytes += from.IngestedBytes +} + +// StoreWriteBytes aliases admission.StoreWorkDoneInfo, since the notion of +// "work is done" is specific to admission control and doesn't need to leak +// everywhere. +type StoreWriteBytes admission.StoreWorkDoneInfo + +var storeWriteBytesPool = sync.Pool{ + New: func() interface{} { return &StoreWriteBytes{} }, +} + +// NewStoreWriteBytes constructs a new StoreWriteBytes. +func NewStoreWriteBytes() *StoreWriteBytes { + wb := storeWriteBytesPool.Get().(*StoreWriteBytes) + *wb = StoreWriteBytes{} + return wb +} + +// Release returns the *StoreWriteBytes to the pool. +func (wb *StoreWriteBytes) Release() { + if wb == nil { + return + } + storeWriteBytesPool.Put(wb) +} diff --git a/pkg/kv/kvserver/mvcc_gc_queue.go b/pkg/kv/kvserver/mvcc_gc_queue.go index b02fbda23ad4..c21ed2c4b262 100644 --- a/pkg/kv/kvserver/mvcc_gc_queue.go +++ b/pkg/kv/kvserver/mvcc_gc_queue.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/gc" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -506,7 +507,7 @@ func suspectedFullRangeDeletion(ms enginepb.MVCCStats) bool { type replicaGCer struct { repl *Replica count int32 // update atomically - admissionController KVAdmissionController + admissionController kvadmission.Controller storeID roachpb.StoreID } @@ -533,7 +534,7 @@ func (r *replicaGCer) send(ctx context.Context, req roachpb.GCRequest) error { ba.Add(&req) // Since we are talking directly to the replica, we need to explicitly do // admission control here, as we are bypassing server.Node. - var admissionHandle AdmissionHandle + var admissionHandle kvadmission.Handle if r.admissionController != nil { pri := admissionpb.WorkPriority(gc.AdmissionPriority.Get(&r.repl.ClusterSettings().SV)) ba.AdmissionHeader = roachpb.AdmissionHeader{ diff --git a/pkg/kv/kvserver/replica_application_state_machine.go b/pkg/kv/kvserver/replica_application_state_machine.go index 266aede2d050..3e4a1862339f 100644 --- a/pkg/kv/kvserver/replica_application_state_machine.go +++ b/pkg/kv/kvserver/replica_application_state_machine.go @@ -17,13 +17,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/apply" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" - "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" @@ -57,20 +57,7 @@ type applyCommittedEntriesStats struct { stateAssertions int numEmptyEntries int numConfChangeEntries int - followerStoreWriteBytes followerStoreWriteBytes -} - -// followerStoreWriteBytes captures stats about writes done to a store by a -// replica that is not the leaseholder. These are used for admission control. -type followerStoreWriteBytes struct { - numEntries int64 - admission.StoreWorkDoneInfo -} - -func (f *followerStoreWriteBytes) merge(from followerStoreWriteBytes) { - f.numEntries += from.numEntries - f.WriteBytes += from.WriteBytes - f.IngestedBytes += from.IngestedBytes + followerStoreWriteBytes kvadmission.FollowerStoreWriteBytes } // nonDeterministicFailure is an error type that indicates that a state machine @@ -450,7 +437,7 @@ type replicaAppBatch struct { emptyEntries int mutations int start time.Time - followerStoreWriteBytes followerStoreWriteBytes + followerStoreWriteBytes kvadmission.FollowerStoreWriteBytes // Reused by addAppliedStateKeyToBatch to avoid heap allocations. asAlloc enginepb.RangeAppliedState @@ -551,7 +538,7 @@ func (b *replicaAppBatch) Stage( // nils the AddSSTable field. if !cmd.IsLocal() { writeBytes, ingestedBytes := cmd.getStoreWriteByteSizes() - b.followerStoreWriteBytes.numEntries++ + b.followerStoreWriteBytes.NumEntries++ b.followerStoreWriteBytes.WriteBytes += writeBytes b.followerStoreWriteBytes.IngestedBytes += ingestedBytes } @@ -1077,7 +1064,7 @@ func (b *replicaAppBatch) recordStatsOnCommit() { b.sm.stats.entriesProcessedBytes += b.entryBytes b.sm.stats.numEmptyEntries += b.emptyEntries b.sm.stats.batchesProcessed++ - b.sm.stats.followerStoreWriteBytes.merge(b.followerStoreWriteBytes) + b.sm.stats.followerStoreWriteBytes.Merge(b.followerStoreWriteBytes) elapsed := timeutil.Since(b.start) b.r.store.metrics.RaftCommandCommitLatency.RecordValue(elapsed.Nanoseconds()) diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index 96a41f1cde47..e082c0db604f 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/apply" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/poison" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" @@ -106,7 +107,13 @@ func (r *Replica) evalAndPropose( st *kvserverpb.LeaseStatus, ui uncertainty.Interval, tok TrackedRequestToken, -) (chan proposalResult, func(), kvserverbase.CmdIDKey, *StoreWriteBytes, *roachpb.Error) { +) ( + chan proposalResult, + func(), + kvserverbase.CmdIDKey, + *kvadmission.StoreWriteBytes, + *roachpb.Error, +) { defer tok.DoneIfNotMoved(ctx) idKey := makeIDKey() proposal, pErr := r.requestToProposal(ctx, idKey, ba, g, st, ui) @@ -167,7 +174,7 @@ func (r *Replica) evalAndPropose( // typical lag in consensus is expected to be small compared to the time // granularity of admission control doing token and size estimation (which // is 15s). Also, admission control corrects for gaps in reporting. - writeBytes := newStoreWriteBytes() + writeBytes := kvadmission.NewStoreWriteBytes() if proposal.command.WriteBatch != nil { writeBytes.WriteBytes = int64(len(proposal.command.WriteBatch.Data)) } @@ -1046,7 +1053,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked( return stats, getNonDeterministicFailureExplanation(err), err } if r.store.cfg.KVAdmissionController != nil && - stats.apply.followerStoreWriteBytes.numEntries > 0 { + stats.apply.followerStoreWriteBytes.NumEntries > 0 { r.store.cfg.KVAdmissionController.FollowerStoreWriteBytes( r.store.StoreID(), stats.apply.followerStoreWriteBytes) } diff --git a/pkg/kv/kvserver/replica_read.go b/pkg/kv/kvserver/replica_read.go index 7f2830853047..f63b0666cc82 100644 --- a/pkg/kv/kvserver/replica_read.go +++ b/pkg/kv/kvserver/replica_read.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" @@ -38,7 +39,12 @@ import ( // reflect the key spans that it read. func (r *Replica) executeReadOnlyBatch( ctx context.Context, ba *roachpb.BatchRequest, g *concurrency.Guard, -) (br *roachpb.BatchResponse, _ *concurrency.Guard, _ *StoreWriteBytes, pErr *roachpb.Error) { +) ( + br *roachpb.BatchResponse, + _ *concurrency.Guard, + _ *kvadmission.StoreWriteBytes, + pErr *roachpb.Error, +) { r.readOnlyCmdMu.RLock() defer r.readOnlyCmdMu.RUnlock() diff --git a/pkg/kv/kvserver/replica_send.go b/pkg/kv/kvserver/replica_send.go index 89c05635011d..ab5a705ff0a2 100644 --- a/pkg/kv/kvserver/replica_send.go +++ b/pkg/kv/kvserver/replica_send.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/poison" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/replicastats" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" @@ -120,7 +121,7 @@ func (r *Replica) Send( // *StoreWriteBytes return value. func (r *Replica) SendWithWriteBytes( ctx context.Context, req roachpb.BatchRequest, -) (*roachpb.BatchResponse, *StoreWriteBytes, *roachpb.Error) { +) (*roachpb.BatchResponse, *kvadmission.StoreWriteBytes, *roachpb.Error) { if r.store.cfg.Settings.CPUProfileType() == cluster.CPUProfileWithLabels { defer pprof.SetGoroutineLabels(ctx) // Note: the defer statement captured the previous context. @@ -168,7 +169,7 @@ func (r *Replica) SendWithWriteBytes( // Differentiate between read-write, read-only, and admin. var br *roachpb.BatchResponse var pErr *roachpb.Error - var writeBytes *StoreWriteBytes + var writeBytes *kvadmission.StoreWriteBytes if isReadOnly { log.Event(ctx, "read-only path") fn := (*Replica).executeReadOnlyBatch @@ -373,7 +374,7 @@ func (r *Replica) maybeAddRangeInfoToResponse( // concurrency guard back to the caller. type batchExecutionFn func( *Replica, context.Context, *roachpb.BatchRequest, *concurrency.Guard, -) (*roachpb.BatchResponse, *concurrency.Guard, *StoreWriteBytes, *roachpb.Error) +) (*roachpb.BatchResponse, *concurrency.Guard, *kvadmission.StoreWriteBytes, *roachpb.Error) var _ batchExecutionFn = (*Replica).executeWriteBatch var _ batchExecutionFn = (*Replica).executeReadOnlyBatch @@ -394,7 +395,7 @@ var _ batchExecutionFn = (*Replica).executeReadOnlyBatch // handles the process of retrying batch execution after addressing the error. func (r *Replica) executeBatchWithConcurrencyRetries( ctx context.Context, ba *roachpb.BatchRequest, fn batchExecutionFn, -) (br *roachpb.BatchResponse, writeBytes *StoreWriteBytes, pErr *roachpb.Error) { +) (br *roachpb.BatchResponse, writeBytes *kvadmission.StoreWriteBytes, pErr *roachpb.Error) { // Try to execute command; exit retry loop on success. var latchSpans, lockSpans *spanset.SpanSet var requestEvalKind concurrency.RequestEvalKind @@ -1046,7 +1047,7 @@ func (r *Replica) getBatchRequestQPS(ctx context.Context, ba *roachpb.BatchReque // recordRequestWriteBytes records the write bytes from a replica batch // request. -func (r *Replica) recordRequestWriteBytes(writeBytes *StoreWriteBytes) { +func (r *Replica) recordRequestWriteBytes(writeBytes *kvadmission.StoreWriteBytes) { if writeBytes == nil { return } diff --git a/pkg/kv/kvserver/replica_write.go b/pkg/kv/kvserver/replica_write.go index 3924b896a170..7acac0ff25fb 100644 --- a/pkg/kv/kvserver/replica_write.go +++ b/pkg/kv/kvserver/replica_write.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset" @@ -77,7 +78,12 @@ var migrateApplicationTimeout = settings.RegisterDurationSetting( // call to applyTimestampCache). func (r *Replica) executeWriteBatch( ctx context.Context, ba *roachpb.BatchRequest, g *concurrency.Guard, -) (br *roachpb.BatchResponse, _ *concurrency.Guard, _ *StoreWriteBytes, pErr *roachpb.Error) { +) ( + br *roachpb.BatchResponse, + _ *concurrency.Guard, + _ *kvadmission.StoreWriteBytes, + pErr *roachpb.Error, +) { startTime := timeutil.Now() // Even though we're not a read-only operation by definition, we have to diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 03161f2da852..e174c887e270 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -40,6 +40,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/sidetransport" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/idalloc" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/multiqueue" @@ -62,7 +63,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" - "github.com/cockroachdb/cockroach/pkg/util/buildutil" "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/envutil" @@ -85,7 +85,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/cockroachdb/logtags" - "github.com/cockroachdb/pebble" "github.com/cockroachdb/redact" "go.etcd.io/etcd/raft/v3" "golang.org/x/time/rate" @@ -1120,7 +1119,7 @@ type StoreConfig struct { SpanConfigSubscriber spanconfig.KVSubscriber // KVAdmissionController is an optional field used for admission control. - KVAdmissionController KVAdmissionController + KVAdmissionController kvadmission.Controller // SchedulerLatencyListener listens in on scheduling latencies, information // that's then used to adjust various admission control components (like how @@ -3807,329 +3806,3 @@ func min(a, b int) int { } return b } - -// elasticCPUDurationPerExportRequest controls how many CPU tokens are allotted -// for each export request. -var elasticCPUDurationPerExportRequest = settings.RegisterDurationSetting( - settings.SystemOnly, - "kvadmission.elastic_cpu.duration_per_export_request", - "controls how many CPU tokens are allotted for each export request", - admission.MaxElasticCPUDuration, - func(duration time.Duration) error { - if duration < admission.MinElasticCPUDuration { - return fmt.Errorf("minimum CPU duration allowed per export request is %s, got %s", - admission.MinElasticCPUDuration, duration) - } - if duration > admission.MaxElasticCPUDuration { - return fmt.Errorf("maximum CPU duration allowed per export request is %s, got %s", - admission.MaxElasticCPUDuration, duration) - } - return nil - }, -) - -// KVAdmissionController provides admission control for the KV layer. -type KVAdmissionController interface { - // AdmitKVWork must be called before performing KV work. - // BatchRequest.AdmissionHeader and BatchRequest.Replica.StoreID must be - // populated for admission to work correctly. If err is non-nil, the - // returned handle can be ignored. If err is nil, AdmittedKVWorkDone must be - // called after the KV work is done executing. - AdmitKVWork( - ctx context.Context, tenantID roachpb.TenantID, ba *roachpb.BatchRequest, - ) (AdmissionHandle, error) - // AdmittedKVWorkDone is called after the admitted KV work is done - // executing. - AdmittedKVWorkDone(AdmissionHandle, *StoreWriteBytes) - // SetTenantWeightProvider is used to set the provider that will be - // periodically polled for weights. The stopper should be used to terminate - // the periodic polling. - SetTenantWeightProvider(provider TenantWeightProvider, stopper *stop.Stopper) - // SnapshotIngested informs admission control about a range snapshot - // ingestion. - SnapshotIngested(storeID roachpb.StoreID, ingestStats pebble.IngestOperationStats) - // FollowerStoreWriteBytes informs admission control about writes - // replicated to a raft follower, that have not been subject to admission - // control. - FollowerStoreWriteBytes(storeID roachpb.StoreID, followerWriteBytes followerStoreWriteBytes) -} - -// TenantWeightProvider can be periodically asked to provide the tenant -// weights. -type TenantWeightProvider interface { - GetTenantWeights() TenantWeights -} - -// TenantWeights contains the various tenant weights. -type TenantWeights struct { - // Node is the node level tenant ID => weight. - Node map[uint64]uint32 - // Stores contains the per-store tenant weights. - Stores []TenantWeightsForStore -} - -// TenantWeightsForStore contains the tenant weights for a store. -type TenantWeightsForStore struct { - roachpb.StoreID - // Weights is tenant ID => weight. - Weights map[uint64]uint32 -} - -// KVAdmissionControllerImpl implements KVAdmissionController interface. -type KVAdmissionControllerImpl struct { - // Admission control queues and coordinators. All three should be nil or - // non-nil. - kvAdmissionQ *admission.WorkQueue - storeGrantCoords *admission.StoreGrantCoordinators - elasticCPUWorkQueue *admission.ElasticCPUWorkQueue - settings *cluster.Settings - every log.EveryN -} - -var _ KVAdmissionController = &KVAdmissionControllerImpl{} - -// AdmissionHandle groups data around some piece admitted work. Depending on the -// type of work, it holds (a) references to specific work queues, (b) state -// needed to inform said work queues of what work was done after the fact, and -// (c) information around how much work a request is allowed to do (used for -// cooperative scheduling with elastic CPU granters). -// -// TODO(irfansharif): Consider moving KVAdmissionController and adjacent types -// into a kvserver/kvadmission package. -type AdmissionHandle struct { - tenantID roachpb.TenantID - storeAdmissionQ *admission.StoreWorkQueue - storeWorkHandle admission.StoreWorkHandle - ElasticCPUWorkHandle *admission.ElasticCPUWorkHandle - - callAdmittedWorkDoneOnKVAdmissionQ bool -} - -// MakeKVAdmissionController returns a KVAdmissionController. All three -// parameters must together be nil or non-nil. -func MakeKVAdmissionController( - kvAdmissionQ *admission.WorkQueue, - elasticCPUWorkQueue *admission.ElasticCPUWorkQueue, - storeGrantCoords *admission.StoreGrantCoordinators, - settings *cluster.Settings, -) KVAdmissionController { - return &KVAdmissionControllerImpl{ - kvAdmissionQ: kvAdmissionQ, - storeGrantCoords: storeGrantCoords, - elasticCPUWorkQueue: elasticCPUWorkQueue, - settings: settings, - every: log.Every(10 * time.Second), - } -} - -// AdmitKVWork implements the KVAdmissionController interface. -// -// TODO(irfansharif): There's a fair bit happening here and there's no test -// coverage. Fix that. -func (n *KVAdmissionControllerImpl) AdmitKVWork( - ctx context.Context, tenantID roachpb.TenantID, ba *roachpb.BatchRequest, -) (handle AdmissionHandle, retErr error) { - ah := AdmissionHandle{tenantID: tenantID} - if n.kvAdmissionQ == nil { - return ah, nil - } - - bypassAdmission := ba.IsAdmin() - source := ba.AdmissionHeader.Source - if !roachpb.IsSystemTenantID(tenantID.ToUint64()) { - // Request is from a SQL node. - bypassAdmission = false - source = roachpb.AdmissionHeader_FROM_SQL - } - if source == roachpb.AdmissionHeader_OTHER { - bypassAdmission = true - } - // TODO(abaptist): Revisit and deprecate this setting in v23.1. - if admission.KVBulkOnlyAdmissionControlEnabled.Get(&n.settings.SV) { - if admissionpb.WorkPriority(ba.AdmissionHeader.Priority) >= admissionpb.NormalPri { - bypassAdmission = true - } - } - - createTime := ba.AdmissionHeader.CreateTime - if !bypassAdmission && createTime == 0 { - // TODO(sumeer): revisit this for multi-tenant. Specifically, the SQL use - // of zero CreateTime needs to be revisited. It should use high priority. - createTime = timeutil.Now().UnixNano() - } - admissionInfo := admission.WorkInfo{ - TenantID: tenantID, - Priority: admissionpb.WorkPriority(ba.AdmissionHeader.Priority), - CreateTime: createTime, - BypassAdmission: bypassAdmission, - } - - admissionEnabled := true - // Don't subject HeartbeatTxnRequest to the storeAdmissionQ. Even though - // it would bypass admission, it would consume a slot. When writes are - // throttled, we start generating more txn heartbeats, which then consume - // all the slots, causing no useful work to happen. We do want useful work - // to continue even when throttling since there are often significant - // number of tokens available. - if ba.IsWrite() && !ba.IsSingleHeartbeatTxnRequest() { - storeAdmissionQ := n.storeGrantCoords.TryGetQueueForStore(int32(ba.Replica.StoreID)) - if storeAdmissionQ != nil { - storeWorkHandle, err := storeAdmissionQ.Admit( - ctx, admission.StoreWriteWorkInfo{WorkInfo: admissionInfo}) - if err != nil { - return AdmissionHandle{}, err - } - admissionEnabled = storeWorkHandle.AdmissionEnabled() - if admissionEnabled { - defer func() { - if retErr != nil { - // No bytes were written. - _ = storeAdmissionQ.AdmittedWorkDone(ah.storeWorkHandle, admission.StoreWorkDoneInfo{}) - } - }() - ah.storeAdmissionQ, ah.storeWorkHandle = storeAdmissionQ, storeWorkHandle - } - } - } - if admissionEnabled { - if ba.IsSingleExportRequest() { - // Backups generate batches with single export requests, which we - // admit through the elastic CPU work queue. We grant this - // CPU-intensive work a set amount of CPU time and expect it to - // terminate (cooperatively) once it exceeds its grant. The amount - // disbursed is 100ms, which we've experimentally found to be long - // enough to do enough useful work per-request while not causing too - // much in the way of scheduling delays on individual cores. Within - // admission control we have machinery that observes scheduling - // latencies periodically and reduces the total amount of CPU time - // handed out through this mechanism, as a way to provide latency - // isolation to non-elastic ("latency sensitive") work running on - // the same machine. - elasticWorkHandle, err := n.elasticCPUWorkQueue.Admit( - ctx, elasticCPUDurationPerExportRequest.Get(&n.settings.SV), admissionInfo, - ) - if err != nil { - return AdmissionHandle{}, err - } - ah.ElasticCPUWorkHandle = elasticWorkHandle - defer func() { - if retErr != nil { - // No elastic work was done. - n.elasticCPUWorkQueue.AdmittedWorkDone(ah.ElasticCPUWorkHandle) - } - }() - } else { - callAdmittedWorkDoneOnKVAdmissionQ, err := n.kvAdmissionQ.Admit(ctx, admissionInfo) - if err != nil { - return AdmissionHandle{}, err - } - ah.callAdmittedWorkDoneOnKVAdmissionQ = callAdmittedWorkDoneOnKVAdmissionQ - } - } - return ah, nil -} - -// AdmittedKVWorkDone implements the KVAdmissionController interface. -func (n *KVAdmissionControllerImpl) AdmittedKVWorkDone( - ah AdmissionHandle, writeBytes *StoreWriteBytes, -) { - n.elasticCPUWorkQueue.AdmittedWorkDone(ah.ElasticCPUWorkHandle) - if ah.callAdmittedWorkDoneOnKVAdmissionQ { - n.kvAdmissionQ.AdmittedWorkDone(ah.tenantID) - } - if ah.storeAdmissionQ != nil { - var doneInfo admission.StoreWorkDoneInfo - if writeBytes != nil { - doneInfo = admission.StoreWorkDoneInfo(*writeBytes) - } - err := ah.storeAdmissionQ.AdmittedWorkDone(ah.storeWorkHandle, doneInfo) - if err != nil { - // This shouldn't be happening. - if buildutil.CrdbTestBuild { - log.Fatalf(context.Background(), "%s", errors.WithAssertionFailure(err)) - } - if n.every.ShouldLog() { - log.Errorf(context.Background(), "%s", err) - } - } - } -} - -// SetTenantWeightProvider implements the KVAdmissionController interface. -func (n *KVAdmissionControllerImpl) SetTenantWeightProvider( - provider TenantWeightProvider, stopper *stop.Stopper, -) { - // TODO(irfansharif): Use a stopper here instead. - go func() { - const weightCalculationPeriod = 10 * time.Minute - ticker := time.NewTicker(weightCalculationPeriod) - // Used for short-circuiting the weights calculation if all weights are - // disabled. - allWeightsDisabled := false - for { - select { - case <-ticker.C: - kvDisabled := !admission.KVTenantWeightsEnabled.Get(&n.settings.SV) - kvStoresDisabled := !admission.KVStoresTenantWeightsEnabled.Get(&n.settings.SV) - if allWeightsDisabled && kvDisabled && kvStoresDisabled { - // Have already transitioned to disabled, so noop. - continue - } - weights := provider.GetTenantWeights() - if kvDisabled { - weights.Node = nil - } - n.kvAdmissionQ.SetTenantWeights(weights.Node) - n.elasticCPUWorkQueue.SetTenantWeights(weights.Node) - - for _, storeWeights := range weights.Stores { - q := n.storeGrantCoords.TryGetQueueForStore(int32(storeWeights.StoreID)) - if q != nil { - if kvStoresDisabled { - storeWeights.Weights = nil - } - q.SetTenantWeights(storeWeights.Weights) - } - } - allWeightsDisabled = kvDisabled && kvStoresDisabled - case <-stopper.ShouldQuiesce(): - ticker.Stop() - return - } - } - }() -} - -// SnapshotIngested implements the KVAdmissionController interface. -func (n *KVAdmissionControllerImpl) SnapshotIngested( - storeID roachpb.StoreID, ingestStats pebble.IngestOperationStats, -) { - storeAdmissionQ := n.storeGrantCoords.TryGetQueueForStore(int32(storeID)) - if storeAdmissionQ == nil { - return - } - storeAdmissionQ.StatsToIgnore(ingestStats) -} - -// FollowerStoreWriteBytes implements the KVAdmissionController interface. -func (n *KVAdmissionControllerImpl) FollowerStoreWriteBytes( - storeID roachpb.StoreID, followerWriteBytes followerStoreWriteBytes, -) { - if followerWriteBytes.WriteBytes == 0 && followerWriteBytes.IngestedBytes == 0 { - return - } - storeAdmissionQ := n.storeGrantCoords.TryGetQueueForStore(int32(storeID)) - if storeAdmissionQ == nil { - return - } - storeAdmissionQ.BypassedWorkDone( - followerWriteBytes.numEntries, followerWriteBytes.StoreWorkDoneInfo) -} - -// ProvisionedBandwidthForAdmissionControl set a value of the provisioned -// bandwidth for each store in the cluster. -var ProvisionedBandwidthForAdmissionControl = settings.RegisterByteSizeSetting( - settings.SystemOnly, "kv.store.admission.provisioned_bandwidth", - "if set to a non-zero value, this is used as the provisioned bandwidth (in bytes/s), "+ - "for each store. It can be over-ridden on a per-store basis using the --store flag", - 0).WithPublic() diff --git a/pkg/kv/kvserver/store_send.go b/pkg/kv/kvserver/store_send.go index 44c97c591844..cbb4b499552e 100644 --- a/pkg/kv/kvserver/store_send.go +++ b/pkg/kv/kvserver/store_send.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/limit" @@ -45,7 +46,7 @@ import ( func (s *Store) Send( ctx context.Context, ba roachpb.BatchRequest, ) (br *roachpb.BatchResponse, pErr *roachpb.Error) { - var writeBytes *StoreWriteBytes + var writeBytes *kvadmission.StoreWriteBytes br, writeBytes, pErr = s.SendWithWriteBytes(ctx, ba) writeBytes.Release() return br, pErr @@ -55,7 +56,7 @@ func (s *Store) Send( // *StoreWriteBytes return value. func (s *Store) SendWithWriteBytes( ctx context.Context, ba roachpb.BatchRequest, -) (br *roachpb.BatchResponse, writeBytes *StoreWriteBytes, pErr *roachpb.Error) { +) (br *roachpb.BatchResponse, writeBytes *kvadmission.StoreWriteBytes, pErr *roachpb.Error) { // Attach any log tags from the store to the context (which normally // comes from gRPC). ctx = s.AnnotateCtx(ctx) diff --git a/pkg/kv/kvserver/stores.go b/pkg/kv/kvserver/stores.go index 60cdd1b6d8fc..b18c2c24468a 100644 --- a/pkg/kv/kvserver/stores.go +++ b/pkg/kv/kvserver/stores.go @@ -14,7 +14,6 @@ import ( "context" "fmt" math "math" - "sync" "unsafe" "github.com/cockroachdb/cockroach/pkg/clusterversion" @@ -22,9 +21,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" - "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" @@ -187,34 +186,11 @@ func (ls *Stores) Send( return br, pErr } -// StoreWriteBytes aliases admission.StoreWorkDoneInfo, since the notion of -// "work is done" is specific to admission control and doesn't need to leak -// everywhere. -type StoreWriteBytes admission.StoreWorkDoneInfo - -var storeWriteBytesPool = sync.Pool{ - New: func() interface{} { return &StoreWriteBytes{} }, -} - -func newStoreWriteBytes() *StoreWriteBytes { - wb := storeWriteBytesPool.Get().(*StoreWriteBytes) - *wb = StoreWriteBytes{} - return wb -} - -// Release returns the *StoreWriteBytes to the pool. -func (wb *StoreWriteBytes) Release() { - if wb == nil { - return - } - storeWriteBytesPool.Put(wb) -} - // SendWithWriteBytes is the implementation of Send with an additional // *StoreWriteBytes return value. func (ls *Stores) SendWithWriteBytes( ctx context.Context, ba roachpb.BatchRequest, -) (*roachpb.BatchResponse, *StoreWriteBytes, *roachpb.Error) { +) (*roachpb.BatchResponse, *kvadmission.StoreWriteBytes, *roachpb.Error) { if err := ba.ValidateForEvaluation(); err != nil { log.Fatalf(ctx, "invalid batch (%s): %s", ba, err) } diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 542f9c411788..a578b6b50cb2 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -102,6 +102,7 @@ go_library( "//pkg/kv/kvserver/allocator/storepool", "//pkg/kv/kvserver/closedts/ctpb", "//pkg/kv/kvserver/closedts/sidetransport", + "//pkg/kv/kvserver/kvadmission", "//pkg/kv/kvserver/kvserverbase", "//pkg/kv/kvserver/kvserverpb", "//pkg/kv/kvserver/liveness", diff --git a/pkg/server/node.go b/pkg/server/node.go index def0e0a53e1d..51e505299c58 100644 --- a/pkg/server/node.go +++ b/pkg/server/node.go @@ -31,6 +31,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission" "github.com/cockroachdb/cockroach/pkg/multitenant" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" @@ -386,7 +387,7 @@ func NewNode( spanConfigAccessor: spanConfigAccessor, testingErrorEvent: cfg.TestingKnobs.TestingResponseErrorEvent, } - n.storeCfg.KVAdmissionController = kvserver.MakeKVAdmissionController( + n.storeCfg.KVAdmissionController = kvadmission.MakeController( kvAdmissionQ, elasticCPUGrantCoord.ElasticCPUWorkQueue, storeGrantCoords, cfg.Settings, ) n.storeCfg.SchedulerLatencyListener = elasticCPUGrantCoord.SchedulerLatencyListener @@ -846,7 +847,7 @@ func (n *Node) registerEnginesForDiskStatsMap( // GetPebbleMetrics implements admission.PebbleMetricsProvider. func (n *Node) GetPebbleMetrics() []admission.StoreMetrics { - clusterProvisionedBandwidth := kvserver.ProvisionedBandwidthForAdmissionControl.Get( + clusterProvisionedBandwidth := kvadmission.ProvisionedBandwidth.Get( &n.storeCfg.Settings.SV) storeIDToDiskStats, err := n.diskStatsMap.tryPopulateAdmissionDiskStats( context.Background(), clusterProvisionedBandwidth, status.GetDiskCounters) @@ -874,13 +875,13 @@ func (n *Node) GetPebbleMetrics() []admission.StoreMetrics { } // GetTenantWeights implements kvserver.TenantWeightProvider. -func (n *Node) GetTenantWeights() kvserver.TenantWeights { - weights := kvserver.TenantWeights{ +func (n *Node) GetTenantWeights() kvadmission.TenantWeights { + weights := kvadmission.TenantWeights{ Node: make(map[uint64]uint32), } _ = n.stores.VisitStores(func(store *kvserver.Store) error { sw := make(map[uint64]uint32) - weights.Stores = append(weights.Stores, kvserver.TenantWeightsForStore{ + weights.Stores = append(weights.Stores, kvadmission.TenantWeightsForStore{ StoreID: store.StoreID(), Weights: sw, }) @@ -1082,7 +1083,7 @@ func (n *Node) batchInternal( ctx = admission.ContextWithElasticCPUWorkHandle(ctx, handle.ElasticCPUWorkHandle) } - var writeBytes *kvserver.StoreWriteBytes + var writeBytes *kvadmission.StoreWriteBytes defer func() { n.storeCfg.KVAdmissionController.AdmittedKVWorkDone(handle, writeBytes) writeBytes.Release() From 25b3a8f18edab32f319ce0d278e194739139ae1f Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Mon, 10 Oct 2022 14:25:21 -0400 Subject: [PATCH 2/5] roachprod: simplify grafana-url computation There's only ever one URL. Release note: None --- pkg/cmd/roachprod/main.go | 7 ++----- pkg/roachprod/roachprod.go | 20 +++++++++++--------- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/pkg/cmd/roachprod/main.go b/pkg/cmd/roachprod/main.go index b8de605e2803..820d257723ee 100644 --- a/pkg/cmd/roachprod/main.go +++ b/pkg/cmd/roachprod/main.go @@ -925,15 +925,12 @@ var grafanaURLCmd = &cobra.Command{ Short: `returns a url to the grafana dashboard`, Args: cobra.ExactArgs(1), Run: wrap(func(cmd *cobra.Command, args []string) error { - urls, err := roachprod.GrafanaURL(context.Background(), roachprodLibraryLogger, args[0], + url, err := roachprod.GrafanaURL(context.Background(), roachprodLibraryLogger, args[0], grafanaurlOpen) if err != nil { return err } - for _, url := range urls { - fmt.Println(url) - } - fmt.Println("username: admin; pwd: admin") + fmt.Println(url) return nil }), } diff --git a/pkg/roachprod/roachprod.go b/pkg/roachprod/roachprod.go index 1382f4b9bb93..90c5ab364d33 100644 --- a/pkg/roachprod/roachprod.go +++ b/pkg/roachprod/roachprod.go @@ -1401,13 +1401,11 @@ func StartGrafana( if err != nil { return err } - urls, err := GrafanaURL(ctx, l, clusterName, false) + url, err := GrafanaURL(ctx, l, clusterName, false) if err != nil { return err } - for i, url := range urls { - fmt.Printf("Grafana dashboard %d: %s\n", i, url) - } + fmt.Printf("Grafana dashboard: %s\n", url) return nil } @@ -1434,17 +1432,17 @@ func StopGrafana(ctx context.Context, l *logger.Logger, clusterName string, dump // GrafanaURL returns a url to the grafana dashboard func GrafanaURL( ctx context.Context, l *logger.Logger, clusterName string, openInBrowser bool, -) ([]string, error) { +) (string, error) { if err := LoadClusters(); err != nil { - return nil, err + return "", err } c, err := newCluster(l, clusterName) if err != nil { - return nil, err + return "", err } nodes, err := install.ListNodes("all", len(c.VMs)) if err != nil { - return nil, err + return "", err } // grafana is assumed to be running on the last node in the target grafanaNode := install.Nodes{nodes[len(nodes)-1]} @@ -1455,5 +1453,9 @@ func GrafanaURL( secure: false, port: 3000, } - return urlGenerator(c, l, grafanaNode, uConfig) + urls, err := urlGenerator(c, l, grafanaNode, uConfig) + if err != nil { + return "", err + } + return urls[0], nil } From 1753e543559cee3edaf0a93acbf9b7e382a1bd74 Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Mon, 10 Oct 2022 14:07:27 -0400 Subject: [PATCH 3/5] kv,rangefeed: integrate catchup scans with elastic cpu Part of #65957. Changefeed backfills, given their scan-heavy nature, can be fairly CPU-intensive. In #89656 we introduced a roachtest demonstrating the latency impact backfills can have on a moderately CPU-saturated cluster. Similar to what we saw for backups, this CPU heavy nature can elevate Go scheduling latencies which in turn translates to foreground latency impact. This commit integrates rangefeed catchup scan with the elastic CPU limiter we introduced in #86638; this is one of two optional halves of changefeed backfills. The second half is the initial scan -- scan requests issued over some keyspan as of some timestamp. For that we simply rely on the existing slots mechanism but now setting a lower priority bit (BulkNormalPri) -- #88733. Experimentally we observed that during initial scans the encoding routines in changefeedccl are the most impactful CPU-wise, something #89589 can help with. We leave admission integration of parallel worker goroutines to future work (#90089). Unlike export requests rangefeed catchup scans are non-premptible. The rangefeed RPC is a streaming one, and the catchup scan is done during stream setup. So we don't have resumption tokens to propagate up to the caller like we did for backups. We still want CPU-bound work going through admission control to only use 100ms of CPU time, to exercise better control over scheduling latencies. To that end, we introduce the following component used within the rangefeed catchup iterator. // Pacer is used in tight loops (CPU-bound) for non-premptible // elastic work. Callers are expected to invoke Pace() every loop // iteration and Close() once done. Internally this type integrates // with elastic CPU work queue, acquiring tokens for the CPU work // being done, and blocking if tokens are unavailable. This allows // for a form of cooperative scheduling with elastic CPU granters. type Pacer struct func (p *Pacer) Pace(ctx context.Context) error { ... } func (p *Pacer) Close() { ... } Release note: None --- .../changefeedccl/changefeedbase/settings.go | 4 +- pkg/clusterversion/cockroach_versions.go | 1 - .../client_replica_circuit_breaker_test.go | 4 +- pkg/kv/kvserver/kvadmission/kvadmission.go | 119 +++++++++++++++++- pkg/kv/kvserver/rangefeed/BUILD.bazel | 1 + pkg/kv/kvserver/rangefeed/catchup_scan.go | 19 ++- .../rangefeed/catchup_scan_bench_test.go | 5 +- .../kvserver/rangefeed/catchup_scan_test.go | 12 +- pkg/kv/kvserver/rangefeed/registry.go | 6 +- pkg/kv/kvserver/rangefeed/registry_test.go | 2 +- pkg/kv/kvserver/replica_rangefeed.go | 17 +-- pkg/kv/kvserver/store.go | 8 +- 12 files changed, 165 insertions(+), 33 deletions(-) diff --git a/pkg/ccl/changefeedccl/changefeedbase/settings.go b/pkg/ccl/changefeedccl/changefeedbase/settings.go index 41b4b0f141cd..0f7bdb8d635a 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/settings.go +++ b/pkg/ccl/changefeedccl/changefeedbase/settings.go @@ -108,7 +108,7 @@ var FrontierCheckpointMaxBytes = settings.RegisterByteSizeSetting( settings.TenantWritable, "changefeed.frontier_checkpoint_max_bytes", "controls the maximum size of the checkpoint as a total size of key bytes", - 1<<20, + 1<<20, // 1 MiB ) // ScanRequestLimit is the number of Scan requests that can run at once. @@ -126,7 +126,7 @@ var ScanRequestSize = settings.RegisterIntSetting( settings.TenantWritable, "changefeed.backfill.scan_request_size", "the maximum number of bytes returned by each scan request", - 16<<20, + 16<<20, // 16 MiB ) // SinkThrottleConfig describes throttling configuration for the sink. diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index ce0991f328e6..ceed8eed156c 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -302,7 +302,6 @@ const ( // supported in cloud storage and KMS. SupportAssumeRoleAuth - // FixUserfileRelatedDescriptorCorruption adds a migration which uses // heuristics to identify invalid table descriptors for userfile-related // descriptors. FixUserfileRelatedDescriptorCorruption diff --git a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go index 472c52cbc71f..bfe0b928a2ec 100644 --- a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go +++ b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go @@ -442,7 +442,7 @@ func TestReplicaCircuitBreaker_RangeFeed(t *testing.T) { defer cancel() stream1 := &dummyStream{t: t, ctx: ctx, name: "rangefeed1", recv: make(chan *roachpb.RangeFeedEvent)} require.NoError(t, tc.Stopper().RunAsyncTask(ctx, "stream1", func(ctx context.Context) { - err := tc.repls[0].RangeFeed(args, stream1).GoError() + err := tc.repls[0].RangeFeed(ctx, args, stream1).GoError() if ctx.Err() != nil { return // main goroutine stopping } @@ -496,7 +496,7 @@ func TestReplicaCircuitBreaker_RangeFeed(t *testing.T) { // the breaker. stream2 := &dummyStream{t: t, ctx: ctx, name: "rangefeed2", recv: make(chan *roachpb.RangeFeedEvent)} require.NoError(t, tc.Stopper().RunAsyncTask(ctx, "stream2", func(ctx context.Context) { - err := tc.repls[0].RangeFeed(args, stream2).GoError() + err := tc.repls[0].RangeFeed(ctx, args, stream2).GoError() if ctx.Err() != nil { return // main goroutine stopping } diff --git a/pkg/kv/kvserver/kvadmission/kvadmission.go b/pkg/kv/kvserver/kvadmission/kvadmission.go index 5e35934945c7..ad37c3352ac1 100644 --- a/pkg/kv/kvserver/kvadmission/kvadmission.go +++ b/pkg/kv/kvserver/kvadmission/kvadmission.go @@ -51,6 +51,26 @@ var elasticCPUDurationPerExportRequest = settings.RegisterDurationSetting( }, ) +// elasticCPUDurationPerRangefeedScanUnit controls how many CPU tokens are +// allotted for each unit of work during rangefeed catchup scans. +var elasticCPUDurationPerRangefeedScanUnit = settings.RegisterDurationSetting( + settings.SystemOnly, + "kvadmission.elastic_cpu.duration_per_rangefeed_scan_unit", + "controls how many CPU tokens are allotted for each unit of work during rangefeed catchup scans", + admission.MaxElasticCPUDuration, + func(duration time.Duration) error { + if duration < admission.MinElasticCPUDuration { + return fmt.Errorf("minimum CPU duration allowed is %s, got %s", + admission.MinElasticCPUDuration, duration) + } + if duration > admission.MaxElasticCPUDuration { + return fmt.Errorf("maximum CPU duration allowed is %s, got %s", + admission.MaxElasticCPUDuration, duration) + } + return nil + }, +) + // Controller provides admission control for the KV layer. type Controller interface { // AdmitKVWork must be called before performing KV work. @@ -58,23 +78,25 @@ type Controller interface { // populated for admission to work correctly. If err is non-nil, the // returned handle can be ignored. If err is nil, AdmittedKVWorkDone must be // called after the KV work is done executing. - AdmitKVWork( - ctx context.Context, tenantID roachpb.TenantID, ba *roachpb.BatchRequest, - ) (Handle, error) + AdmitKVWork(context.Context, roachpb.TenantID, *roachpb.BatchRequest) (Handle, error) // AdmittedKVWorkDone is called after the admitted KV work is done // executing. AdmittedKVWorkDone(Handle, *StoreWriteBytes) + // AdmitRangefeedRequest must be called before serving rangefeed requests. + // It returns a Pacer that's used within rangefeed catchup scans (typically + // CPU-intensive and affects scheduling latencies negatively). + AdmitRangefeedRequest(roachpb.TenantID, *roachpb.RangeFeedRequest) *Pacer // SetTenantWeightProvider is used to set the provider that will be // periodically polled for weights. The stopper should be used to terminate // the periodic polling. - SetTenantWeightProvider(provider TenantWeightProvider, stopper *stop.Stopper) + SetTenantWeightProvider(TenantWeightProvider, *stop.Stopper) // SnapshotIngested informs admission control about a range snapshot // ingestion. - SnapshotIngested(storeID roachpb.StoreID, ingestStats pebble.IngestOperationStats) + SnapshotIngested(roachpb.StoreID, pebble.IngestOperationStats) // FollowerStoreWriteBytes informs admission control about writes // replicated to a raft follower, that have not been subject to admission // control. - FollowerStoreWriteBytes(storeID roachpb.StoreID, followerWriteBytes FollowerStoreWriteBytes) + FollowerStoreWriteBytes(roachpb.StoreID, FollowerStoreWriteBytes) } // TenantWeightProvider can be periodically asked to provide the tenant @@ -272,6 +294,27 @@ func (n *controllerImpl) AdmittedKVWorkDone(ah Handle, writeBytes *StoreWriteByt } } +// AdmitRangefeedRequest implements the Controller interface. +func (n *controllerImpl) AdmitRangefeedRequest( + tenantID roachpb.TenantID, request *roachpb.RangeFeedRequest, +) *Pacer { + // TODO(irfansharif): We need to version gate/be defensive when integrating + // rangefeeds since admission headers will not be fully set on older version + // nodes. See EnableRangefeedElasticCPUControl in cockroach_versions.go. + // Consider a cluster setting too. + + return &Pacer{ + unit: elasticCPUDurationPerRangefeedScanUnit.Get(&n.settings.SV), + wi: admission.WorkInfo{ + TenantID: tenantID, + Priority: admissionpb.WorkPriority(request.AdmissionHeader.Priority), + CreateTime: request.AdmissionHeader.CreateTime, + BypassAdmission: false, + }, + wq: n.elasticCPUWorkQueue, + } +} + // SetTenantWeightProvider implements the Controller interface. func (n *controllerImpl) SetTenantWeightProvider( provider TenantWeightProvider, stopper *stop.Stopper, @@ -388,3 +431,67 @@ func (wb *StoreWriteBytes) Release() { } storeWriteBytesPool.Put(wb) } + +// Pacer is used in tight loops (CPU-bound) for non-premptible elastic work. +// Callers are expected to invoke Pace() every loop iteration and Close() once +// done. Internally this type integrates with elastic CPU work queue, acquiring +// tokens for the CPU work being done, and blocking if tokens are unavailable. +// This allows for a form of cooperative scheduling with elastic CPU granters. +type Pacer struct { + unit time.Duration + wi admission.WorkInfo + wq *admission.ElasticCPUWorkQueue + + cur *admission.ElasticCPUWorkHandle +} + +// Pace is part of the Pacer interface. +func (p *Pacer) Pace(ctx context.Context) error { + if p == nil { + return nil + } + + if overLimit, _ := p.cur.OverLimit(); overLimit { + p.wq.AdmittedWorkDone(p.cur) + p.cur = nil + } + + if p.cur == nil { + handle, err := p.wq.Admit(ctx, p.unit, p.wi) + if err != nil { + return err + } + p.cur = handle + } + return nil +} + +// Close is part of the Pacer interface. +func (p *Pacer) Close() { + if p == nil || p.cur == nil { + return + } + + p.wq.AdmittedWorkDone(p.cur) + p.cur = nil +} + +type pacerKey struct{} + +// ContextWithPacer returns a Context wrapping the supplied Pacer, if any. +func ContextWithPacer(ctx context.Context, h *Pacer) context.Context { + if h == nil { + return ctx + } + return context.WithValue(ctx, pacerKey{}, h) +} + +// PacerFromContext returns the Pacer contained in the Context, if any. +func PacerFromContext(ctx context.Context) *Pacer { + val := ctx.Value(pacerKey{}) + h, ok := val.(*Pacer) + if !ok { + return nil + } + return h +} diff --git a/pkg/kv/kvserver/rangefeed/BUILD.bazel b/pkg/kv/kvserver/rangefeed/BUILD.bazel index 2ecebc5d8d1f..b971df3eae0c 100644 --- a/pkg/kv/kvserver/rangefeed/BUILD.bazel +++ b/pkg/kv/kvserver/rangefeed/BUILD.bazel @@ -17,6 +17,7 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/keys", + "//pkg/kv/kvserver/kvadmission", "//pkg/roachpb", "//pkg/settings", "//pkg/storage", diff --git a/pkg/kv/kvserver/rangefeed/catchup_scan.go b/pkg/kv/kvserver/rangefeed/catchup_scan.go index 242a03537172..d6af20efc097 100644 --- a/pkg/kv/kvserver/rangefeed/catchup_scan.go +++ b/pkg/kv/kvserver/rangefeed/catchup_scan.go @@ -12,7 +12,9 @@ package rangefeed import ( "bytes" + "context" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" @@ -62,6 +64,7 @@ type CatchUpIterator struct { close func() span roachpb.Span startTime hlc.Timestamp // exclusive + pacer *kvadmission.Pacer } // NewCatchUpIterator returns a CatchUpIterator for the given Reader over the @@ -70,7 +73,11 @@ type CatchUpIterator struct { // NB: startTime is exclusive, i.e. the first possible event will be emitted at // Timestamp.Next(). func NewCatchUpIterator( - reader storage.Reader, span roachpb.Span, startTime hlc.Timestamp, closer func(), + reader storage.Reader, + span roachpb.Span, + startTime hlc.Timestamp, + closer func(), + pacer *kvadmission.Pacer, ) *CatchUpIterator { return &CatchUpIterator{ simpleCatchupIter: storage.NewMVCCIncrementalIterator(reader, @@ -89,6 +96,7 @@ func NewCatchUpIterator( close: closer, span: span, startTime: startTime, + pacer: pacer, } } @@ -96,6 +104,7 @@ func NewCatchUpIterator( // callback. func (i *CatchUpIterator) Close() { i.simpleCatchupIter.Close() + i.pacer.Close() if i.close != nil { i.close() } @@ -117,7 +126,9 @@ type outputEventFn func(e *roachpb.RangeFeedEvent) error // For example, with MVCC range tombstones [a-f)@5 and [a-f)@3 overlapping point // keys a@6, a@4, and b@2, the emitted order is [a-f)@3,[a-f)@5,a@4,a@6,b@2 because // the start key "a" is ordered before all of the timestamped point keys. -func (i *CatchUpIterator) CatchUpScan(outputFn outputEventFn, withDiff bool) error { +func (i *CatchUpIterator) CatchUpScan( + ctx context.Context, outputFn outputEventFn, withDiff bool, +) error { var a bufalloc.ByteAllocator // MVCCIterator will encounter historical values for each key in // reverse-chronological order. To output in chronological order, store @@ -319,6 +330,10 @@ func (i *CatchUpIterator) CatchUpScan(outputFn outputEventFn, withDiff bool) err i.Next() } } + + if err := i.pacer.Pace(ctx); err != nil { + return errors.Wrap(err, "automatic pacing: %v") + } } // Output events for the last key encountered. diff --git a/pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go b/pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go index 103db0386136..30684c509c09 100644 --- a/pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go +++ b/pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go @@ -45,13 +45,14 @@ func runCatchUpBenchmark(b *testing.B, emk engineMaker, opts benchOptions) (numE EndKey: endKey, } + ctx := context.Background() b.ResetTimer() for i := 0; i < b.N; i++ { func() { - iter := rangefeed.NewCatchUpIterator(eng, span, opts.ts, nil) + iter := rangefeed.NewCatchUpIterator(eng, span, opts.ts, nil, nil) defer iter.Close() counter := 0 - err := iter.CatchUpScan(func(*roachpb.RangeFeedEvent) error { + err := iter.CatchUpScan(ctx, func(*roachpb.RangeFeedEvent) error { counter++ return nil }, opts.withDiff) diff --git a/pkg/kv/kvserver/rangefeed/catchup_scan_test.go b/pkg/kv/kvserver/rangefeed/catchup_scan_test.go index 95acf1917c7c..657d694c7aa4 100644 --- a/pkg/kv/kvserver/rangefeed/catchup_scan_test.go +++ b/pkg/kv/kvserver/rangefeed/catchup_scan_test.go @@ -106,11 +106,11 @@ func TestCatchupScan(t *testing.T) { } testutils.RunTrueAndFalse(t, "withDiff", func(t *testing.T, withDiff bool) { span := roachpb.Span{Key: testKey1, EndKey: roachpb.KeyMax} - iter := NewCatchUpIterator(eng, span, ts1, nil) + iter := NewCatchUpIterator(eng, span, ts1, nil, nil) defer iter.Close() var events []roachpb.RangeFeedValue // ts1 here is exclusive, so we do not want the versions at ts1. - require.NoError(t, iter.CatchUpScan(func(e *roachpb.RangeFeedEvent) error { + require.NoError(t, iter.CatchUpScan(ctx, func(e *roachpb.RangeFeedEvent) error { events = append(events, *e.Val) return nil }, withDiff)) @@ -149,10 +149,10 @@ func TestCatchupScanInlineError(t *testing.T) { // Run a catchup scan across the span and watch it error. span := roachpb.Span{Key: keys.LocalMax, EndKey: keys.MaxKey} - iter := NewCatchUpIterator(eng, span, hlc.Timestamp{}, nil) + iter := NewCatchUpIterator(eng, span, hlc.Timestamp{}, nil, nil) defer iter.Close() - err := iter.CatchUpScan(nil, false) + err := iter.CatchUpScan(ctx, nil, false) require.Error(t, err) require.Contains(t, err.Error(), "unexpected inline value") } @@ -189,11 +189,11 @@ func TestCatchupScanSeesOldIntent(t *testing.T) { // Run a catchup scan across the span and watch it succeed. span := roachpb.Span{Key: keys.LocalMax, EndKey: keys.MaxKey} - iter := NewCatchUpIterator(eng, span, tsCutoff, nil) + iter := NewCatchUpIterator(eng, span, tsCutoff, nil, nil) defer iter.Close() keys := map[string]struct{}{} - require.NoError(t, iter.CatchUpScan(func(e *roachpb.RangeFeedEvent) error { + require.NoError(t, iter.CatchUpScan(ctx, func(e *roachpb.RangeFeedEvent) error { keys[string(e.Val.Key)] = struct{}{} return nil }, true /* withDiff */)) diff --git a/pkg/kv/kvserver/rangefeed/registry.go b/pkg/kv/kvserver/rangefeed/registry.go index a945160e95f6..5c27c95f98ab 100644 --- a/pkg/kv/kvserver/rangefeed/registry.go +++ b/pkg/kv/kvserver/rangefeed/registry.go @@ -300,7 +300,7 @@ func (r *registration) disconnect(pErr *roachpb.Error) { // have been emitted. func (r *registration) outputLoop(ctx context.Context) error { // If the registration has a catch-up scan, run it. - if err := r.maybeRunCatchUpScan(); err != nil { + if err := r.maybeRunCatchUpScan(ctx); err != nil { err = errors.Wrap(err, "catch-up scan failed") log.Errorf(ctx, "%v", err) return err @@ -372,7 +372,7 @@ func (r *registration) drainAllocations(ctx context.Context) { // // If the registration does not have a catchUpIteratorConstructor, this method // is a no-op. -func (r *registration) maybeRunCatchUpScan() error { +func (r *registration) maybeRunCatchUpScan(ctx context.Context) error { catchUpIter := r.detachCatchUpIter() if catchUpIter == nil { return nil @@ -383,7 +383,7 @@ func (r *registration) maybeRunCatchUpScan() error { r.metrics.RangeFeedCatchUpScanNanos.Inc(timeutil.Since(start).Nanoseconds()) }() - return catchUpIter.CatchUpScan(r.stream.Send, r.withDiff) + return catchUpIter.CatchUpScan(ctx, r.stream.Send, r.withDiff) } // ID implements interval.Interface. diff --git a/pkg/kv/kvserver/rangefeed/registry_test.go b/pkg/kv/kvserver/rangefeed/registry_test.go index c88ec2d41677..34e2c824c766 100644 --- a/pkg/kv/kvserver/rangefeed/registry_test.go +++ b/pkg/kv/kvserver/rangefeed/registry_test.go @@ -278,7 +278,7 @@ func TestRegistrationCatchUpScan(t *testing.T) { }, hlc.Timestamp{WallTime: 4}, iter, true /* withDiff */) require.Zero(t, r.metrics.RangeFeedCatchUpScanNanos.Count()) - require.NoError(t, r.maybeRunCatchUpScan()) + require.NoError(t, r.maybeRunCatchUpScan(context.Background())) require.True(t, iter.closed) require.NotZero(t, r.metrics.RangeFeedCatchUpScanNanos.Count()) diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index b1955eb52f2f..401ca55771e5 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/intentresolver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -131,22 +132,24 @@ func (tp *rangefeedTxnPusher) ResolveIntents( // RangeFeed registers a rangefeed over the specified span. It sends updates to // the provided stream and returns with an optional error when the rangefeed is -// complete. The provided ConcurrentRequestLimiter is used to limit the number -// of rangefeeds using catch-up iterators at the same time. +// complete. The surrounding store's ConcurrentRequestLimiter is used to limit +// the number of rangefeeds using catch-up iterators at the same time. func (r *Replica) RangeFeed( - args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink, + ctx context.Context, args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink, ) *roachpb.Error { - return r.rangeFeedWithRangeID(r.RangeID, args, stream) + return r.rangeFeedWithRangeID(ctx, r.RangeID, args, stream) } func (r *Replica) rangeFeedWithRangeID( - _forStacks roachpb.RangeID, args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink, + ctx context.Context, + _forStacks roachpb.RangeID, + args *roachpb.RangeFeedRequest, + stream roachpb.RangeFeedEventSink, ) *roachpb.Error { if !r.isRangefeedEnabled() && !RangefeedEnabled.Get(&r.store.cfg.Settings.SV) { return roachpb.NewErrorf("rangefeeds require the kv.rangefeed.enabled setting. See %s", docs.URL(`change-data-capture.html#enable-rangefeeds-to-reduce-latency`)) } - ctx := r.AnnotateCtx(stream.Context()) rSpan, err := keys.SpanAddr(args.Span) if err != nil { @@ -222,7 +225,7 @@ func (r *Replica) rangeFeedWithRangeID( // Assert that we still hold the raftMu when this is called to ensure // that the catchUpIter reads from the current snapshot. r.raftMu.AssertHeld() - return rangefeed.NewCatchUpIterator(r.Engine(), span, startTime, iterSemRelease) + return rangefeed.NewCatchUpIterator(r.Engine(), span, startTime, iterSemRelease, kvadmission.PacerFromContext(ctx)) } } p := r.registerWithRangefeedRaftMuLocked( diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index e174c887e270..f7b25199fd3f 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -3183,7 +3183,13 @@ func (s *Store) RangeFeed( // one here. return roachpb.NewError(roachpb.NewRangeNotFoundError(args.RangeID, s.StoreID())) } - return repl.RangeFeed(args, stream) + + ctx := repl.AnnotateCtx(stream.Context()) + tenID, _ := repl.TenantID() + if pacer := s.cfg.KVAdmissionController.AdmitRangefeedRequest(tenID, args); pacer != nil { + ctx = kvadmission.ContextWithPacer(ctx, pacer) + } + return repl.RangeFeed(ctx, args, stream) } // updateReplicationGauges counts a number of simple replication statistics for From 69e24478ba24b0d79fd7d36bc9c94f8d7b1bdff7 Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Mon, 24 Oct 2022 15:11:15 -0400 Subject: [PATCH 4/5] kvclient: use NormalPri for system-table rangefeed .. .. catch-up scans, introduce a private cluster setting (kvadmission.rangefeed_catchup_scan_elastic_control.enabled) to selectively switch off catch-up scan integration if needed, and plumb kvadmission.Pacer in explicitly to rangefeed catchup scan loop instead of opaquely through the surrounding context. Release note: None --- .../changefeedccl/changefeedbase/settings.go | 3 + pkg/ccl/changefeedccl/kvfeed/scanner.go | 9 ++- pkg/kv/db.go | 5 ++ .../kvclient/kvcoord/dist_sender_rangefeed.go | 21 +++++- pkg/kv/kvclient/rangefeed/BUILD.bazel | 1 + pkg/kv/kvclient/rangefeed/config.go | 15 ++++ pkg/kv/kvclient/rangefeed/db_adapter.go | 73 +++++++++++-------- .../rangefeed/mocks_generated_test.go | 13 +++- pkg/kv/kvclient/rangefeed/rangefeed.go | 8 +- .../kvclient/rangefeed/rangefeed_mock_test.go | 3 +- .../rangefeed/rangefeedcache/watcher.go | 4 + .../client_replica_circuit_breaker_test.go | 4 +- pkg/kv/kvserver/kvadmission/kvadmission.go | 44 +++++------ pkg/kv/kvserver/rangefeed/catchup_scan.go | 16 +++- pkg/kv/kvserver/replica_rangefeed.go | 9 ++- pkg/kv/kvserver/store.go | 7 +- .../spanconfigsqlwatcher/sqlwatcher.go | 3 + pkg/sql/catalog/lease/lease.go | 1 + .../instancestorage/instancereader.go | 1 + pkg/sql/stats/stats_cache.go | 1 + 20 files changed, 158 insertions(+), 83 deletions(-) diff --git a/pkg/ccl/changefeedccl/changefeedbase/settings.go b/pkg/ccl/changefeedccl/changefeedbase/settings.go index 0f7bdb8d635a..22841234c7e1 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/settings.go +++ b/pkg/ccl/changefeedccl/changefeedbase/settings.go @@ -122,6 +122,9 @@ var ScanRequestLimit = settings.RegisterIntSetting( ) // ScanRequestSize is the target size of the scan request response. +// +// TODO(cdc,yevgeniy,irfansharif): 16 MiB is too large for "elastic" work such +// as this; reduce the default. Evaluate this as part of #90089. var ScanRequestSize = settings.RegisterIntSetting( settings.TenantWritable, "changefeed.backfill.scan_request_size", diff --git a/pkg/ccl/changefeedccl/kvfeed/scanner.go b/pkg/ccl/changefeedccl/kvfeed/scanner.go index d53f92336dcb..ff248c40943e 100644 --- a/pkg/ccl/changefeedccl/kvfeed/scanner.go +++ b/pkg/ccl/changefeedccl/kvfeed/scanner.go @@ -142,7 +142,14 @@ func (p *scanRequestScanner) exportSpan( r.ScanFormat = roachpb.BATCH_RESPONSE b.Header.TargetBytes = targetBytesPerScan b.AdmissionHeader = roachpb.AdmissionHeader{ - Priority: int32(admissionpb.BulkNormalPri), + // TODO(irfansharif): Make this configurable if we want system table + // scanners or support "high priority" changefeeds to run at higher + // priorities. We use higher AC priorities for system-internal + // rangefeeds listening in on system table changes. + Priority: int32(admissionpb.BulkNormalPri), + // We specify a creation time for each batch (as opposed to at the + // txn level) -- this way later batches from earlier txns don't just + // out compete batches from newer txns. CreateTime: start.UnixNano(), Source: roachpb.AdmissionHeader_FROM_SQL, NoMemoryReservedAtSource: true, diff --git a/pkg/kv/db.go b/pkg/kv/db.go index a0bce7f5f1be..6aa209f5cb24 100644 --- a/pkg/kv/db.go +++ b/pkg/kv/db.go @@ -912,6 +912,11 @@ func (db *DB) NewTxn(ctx context.Context, debugName string) *Txn { // is marked as poisoned and all future ops fail fast until the retry. The // callback may return either nil or the retryable error. Txn is responsible for // resetting the transaction and retrying the callback. +// +// TODO(irfansharif): Audit uses of this since API since it bypasses AC. Make +// the other variant (TxnWithAdmissionControl) the default, or maybe rename this +// to be more explicit (TxnWithoutAdmissionControl) so new callers have to be +// conscious about what they want. func (db *DB) Txn(ctx context.Context, retryable func(context.Context, *Txn) error) error { return db.TxnWithAdmissionControl( ctx, roachpb.AdmissionHeader_OTHER, admissionpb.NormalPri, retryable) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index ddf2659a79ac..d343335215f1 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -86,6 +86,7 @@ func maxConcurrentCatchupScans(sv *settings.Values) int { type rangeFeedConfig struct { useMuxRangeFeed bool + overSystemTable bool } // RangeFeedOption configures a RangeFeed. @@ -104,6 +105,14 @@ func WithMuxRangeFeed() RangeFeedOption { }) } +// WithSystemTablePriority is used for system-internal rangefeeds, it uses a +// higher admission priority during catch up scans. +func WithSystemTablePriority() RangeFeedOption { + return optionFunc(func(c *rangeFeedConfig) { + c.overSystemTable = true + }) +} + // A "kill switch" to disable multiplexing rangefeed if severe issues discovered with new implementation. var enableMuxRangeFeed = envutil.EnvOrDefaultBool("COCKROACH_ENABLE_MULTIPLEXING_RANGEFEED", true) @@ -196,7 +205,7 @@ func (ds *DistSender) RangeFeedSpans( // Spawn a child goroutine to process this feed. g.GoCtx(func(ctx context.Context) error { return ds.partialRangeFeed(ctx, rr, eventProducer, sri.rs, sri.startAfter, - sri.token, withDiff, &catchupSem, rangeCh, eventCh) + sri.token, withDiff, &catchupSem, rangeCh, eventCh, cfg) }) case <-ctx.Done(): return ctx.Err() @@ -372,6 +381,7 @@ func (ds *DistSender) partialRangeFeed( catchupSem *limit.ConcurrentRequestLimiter, rangeCh chan<- singleRangeInfo, eventCh chan<- RangeFeedMessage, + cfg rangeFeedConfig, ) error { // Bound the partial rangefeed to the partial span. span := rs.AsRawSpanWithNoLocals() @@ -408,7 +418,7 @@ func (ds *DistSender) partialRangeFeed( // Establish a RangeFeed for a single Range. maxTS, err := ds.singleRangeFeed( ctx, span, startAfter, withDiff, token.Desc(), - catchupSem, eventCh, streamProducerFactory, active.onRangeEvent) + catchupSem, eventCh, streamProducerFactory, active.onRangeEvent, cfg) // Forward the timestamp in case we end up sending it again. startAfter.Forward(maxTS) @@ -496,11 +506,16 @@ func (ds *DistSender) singleRangeFeed( eventCh chan<- RangeFeedMessage, streamProducerFactory rangeFeedEventProducerFactory, onRangeEvent onRangeEventCb, + cfg rangeFeedConfig, ) (hlc.Timestamp, error) { // Ensure context is cancelled on all errors, to prevent gRPC stream leaks. ctx, cancelFeed := context.WithCancel(ctx) defer cancelFeed() + admissionPri := admissionpb.BulkNormalPri + if cfg.overSystemTable { + admissionPri = admissionpb.NormalPri + } args := roachpb.RangeFeedRequest{ Span: span, Header: roachpb.Header{ @@ -511,7 +526,7 @@ func (ds *DistSender) singleRangeFeed( AdmissionHeader: roachpb.AdmissionHeader{ // NB: AdmissionHeader is used only at the start of the range feed // stream since the initial catch-up scan is expensive. - Priority: int32(admissionpb.BulkNormalPri), + Priority: int32(admissionPri), CreateTime: timeutil.Now().UnixNano(), Source: roachpb.AdmissionHeader_FROM_SQL, NoMemoryReservedAtSource: true, diff --git a/pkg/kv/kvclient/rangefeed/BUILD.bazel b/pkg/kv/kvclient/rangefeed/BUILD.bazel index 7103f774f392..20e32afa0b38 100644 --- a/pkg/kv/kvclient/rangefeed/BUILD.bazel +++ b/pkg/kv/kvclient/rangefeed/BUILD.bazel @@ -21,6 +21,7 @@ go_library( "//pkg/roachpb", "//pkg/settings", "//pkg/settings/cluster", + "//pkg/util/admission/admissionpb", "//pkg/util/ctxgroup", "//pkg/util/hlc", "//pkg/util/limit", diff --git a/pkg/kv/kvclient/rangefeed/config.go b/pkg/kv/kvclient/rangefeed/config.go index c3d5621f421a..2be820c90b49 100644 --- a/pkg/kv/kvclient/rangefeed/config.go +++ b/pkg/kv/kvclient/rangefeed/config.go @@ -63,6 +63,12 @@ type scanConfig struct { // configures retry behavior retryBehavior ScanRetryBehavior + + // overSystemTable indicates whether this rangefeed is over a system table + // (used internally for CRDB's own functioning) and therefore should be + // treated with a more appropriate admission pri (NormalPri instead of + // BulkNormalPri). + overSystemTable bool } type optionFunc func(*config) @@ -287,3 +293,12 @@ func WithPProfLabel(key, value string) Option { c.extraPProfLabels = append(c.extraPProfLabels, key, value) }) } + +// WithSystemTablePriority communicates that the rangefeed is over a system +// table and thus operates at a higher priority (this primarily affects +// admission control). +func WithSystemTablePriority() Option { + return optionFunc(func(c *config) { + c.overSystemTable = true + }) +} diff --git a/pkg/kv/kvclient/rangefeed/db_adapter.go b/pkg/kv/kvclient/rangefeed/db_adapter.go index 759091e9909b..7bcb92110fc9 100644 --- a/pkg/kv/kvclient/rangefeed/db_adapter.go +++ b/pkg/kv/kvclient/rangefeed/db_adapter.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/limit" @@ -74,8 +75,9 @@ func (dbc *dbAdapter) RangeFeed( startFrom hlc.Timestamp, withDiff bool, eventC chan<- kvcoord.RangeFeedMessage, + opts ...kvcoord.RangeFeedOption, ) error { - return dbc.distSender.RangeFeed(ctx, spans, startFrom, withDiff, eventC) + return dbc.distSender.RangeFeed(ctx, spans, startFrom, withDiff, eventC, opts...) } // concurrentBoundAccount is a thread safe bound account. @@ -118,7 +120,7 @@ func (dbc *dbAdapter) Scan( // If we don't have parallelism configured, just scan each span in turn. if cfg.scanParallelism == nil { for _, sp := range spans { - if err := dbc.scanSpan(ctx, sp, asOf, rowFn, cfg.targetScanBytes, cfg.onSpanDone, acc); err != nil { + if err := dbc.scanSpan(ctx, sp, asOf, rowFn, cfg.targetScanBytes, cfg.onSpanDone, cfg.overSystemTable, acc); err != nil { return err } } @@ -154,7 +156,7 @@ func (dbc *dbAdapter) Scan( g := ctxgroup.WithContext(ctx) err := dbc.divideAndSendScanRequests( ctx, &g, spans, asOf, rowFn, - parallelismFn, cfg.targetScanBytes, cfg.onSpanDone, acc) + parallelismFn, cfg.targetScanBytes, cfg.onSpanDone, cfg.overSystemTable, acc) if err != nil { cancel() } @@ -168,6 +170,7 @@ func (dbc *dbAdapter) scanSpan( rowFn func(value roachpb.KeyValue), targetScanBytes int64, onScanDone OnScanCompleted, + overSystemTable bool, acc *concurrentBoundAccount, ) error { if acc != nil { @@ -177,39 +180,46 @@ func (dbc *dbAdapter) scanSpan( defer acc.Shrink(ctx, targetScanBytes) } - return dbc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - if err := txn.SetFixedTimestamp(ctx, asOf); err != nil { - return err - } - sp := span - var b kv.Batch - for { - b.Header.TargetBytes = targetScanBytes - b.Scan(sp.Key, sp.EndKey) - if err := txn.Run(ctx, &b); err != nil { + admissionPri := admissionpb.BulkNormalPri + if overSystemTable { + admissionPri = admissionpb.NormalPri + } + return dbc.db.TxnWithAdmissionControl(ctx, + roachpb.AdmissionHeader_ROOT_KV, + admissionPri, + func(ctx context.Context, txn *kv.Txn) error { + if err := txn.SetFixedTimestamp(ctx, asOf); err != nil { return err } - res := b.Results[0] - for _, row := range res.Rows { - rowFn(roachpb.KeyValue{Key: row.Key, Value: *row.Value}) - } - if res.ResumeSpan == nil { - if onScanDone != nil { - return onScanDone(ctx, sp) + sp := span + var b kv.Batch + for { + b.Header.TargetBytes = targetScanBytes + b.Scan(sp.Key, sp.EndKey) + if err := txn.Run(ctx, &b); err != nil { + return err + } + res := b.Results[0] + for _, row := range res.Rows { + rowFn(roachpb.KeyValue{Key: row.Key, Value: *row.Value}) + } + if res.ResumeSpan == nil { + if onScanDone != nil { + return onScanDone(ctx, sp) + } + return nil } - return nil - } - if onScanDone != nil { - if err := onScanDone(ctx, roachpb.Span{Key: sp.Key, EndKey: res.ResumeSpan.Key}); err != nil { - return err + if onScanDone != nil { + if err := onScanDone(ctx, roachpb.Span{Key: sp.Key, EndKey: res.ResumeSpan.Key}); err != nil { + return err + } } - } - sp = res.ResumeSpanAsValue() - b = kv.Batch{} - } - }) + sp = res.ResumeSpanAsValue() + b = kv.Batch{} + } + }) } // divideAndSendScanRequests divides spans into small ranges based on range boundaries, @@ -224,6 +234,7 @@ func (dbc *dbAdapter) divideAndSendScanRequests( parallelismFn func() int, targetScanBytes int64, onSpanDone OnScanCompleted, + overSystemTable bool, acc *concurrentBoundAccount, ) error { // Build a span group so that we can iterate spans in order. @@ -261,7 +272,7 @@ func (dbc *dbAdapter) divideAndSendScanRequests( sp := partialRS.AsRawSpanWithNoLocals() workGroup.GoCtx(func(ctx context.Context) error { defer limAlloc.Release() - return dbc.scanSpan(ctx, sp, asOf, rowFn, targetScanBytes, onSpanDone, acc) + return dbc.scanSpan(ctx, sp, asOf, rowFn, targetScanBytes, onSpanDone, overSystemTable, acc) }) if !ri.NeedAnother(nextRS) { diff --git a/pkg/kv/kvclient/rangefeed/mocks_generated_test.go b/pkg/kv/kvclient/rangefeed/mocks_generated_test.go index b0472217ad47..8c211dcb108d 100644 --- a/pkg/kv/kvclient/rangefeed/mocks_generated_test.go +++ b/pkg/kv/kvclient/rangefeed/mocks_generated_test.go @@ -38,17 +38,22 @@ func (m *MockDB) EXPECT() *MockDBMockRecorder { } // RangeFeed mocks base method. -func (m *MockDB) RangeFeed(arg0 context.Context, arg1 []roachpb.Span, arg2 hlc.Timestamp, arg3 bool, arg4 chan<- kvcoord.RangeFeedMessage) error { +func (m *MockDB) RangeFeed(arg0 context.Context, arg1 []roachpb.Span, arg2 hlc.Timestamp, arg3 bool, arg4 chan<- kvcoord.RangeFeedMessage, arg5 ...kvcoord.RangeFeedOption) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "RangeFeed", arg0, arg1, arg2, arg3, arg4) + varargs := []interface{}{arg0, arg1, arg2, arg3, arg4} + for _, a := range arg5 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "RangeFeed", varargs...) ret0, _ := ret[0].(error) return ret0 } // RangeFeed indicates an expected call of RangeFeed. -func (mr *MockDBMockRecorder) RangeFeed(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call { +func (mr *MockDBMockRecorder) RangeFeed(arg0, arg1, arg2, arg3, arg4 interface{}, arg5 ...interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RangeFeed", reflect.TypeOf((*MockDB)(nil).RangeFeed), arg0, arg1, arg2, arg3, arg4) + varargs := append([]interface{}{arg0, arg1, arg2, arg3, arg4}, arg5...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RangeFeed", reflect.TypeOf((*MockDB)(nil).RangeFeed), varargs...) } // Scan mocks base method. diff --git a/pkg/kv/kvclient/rangefeed/rangefeed.go b/pkg/kv/kvclient/rangefeed/rangefeed.go index 389717380d33..aaf2e75ecd76 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed.go @@ -55,6 +55,7 @@ type DB interface { startFrom hlc.Timestamp, withDiff bool, eventC chan<- kvcoord.RangeFeedMessage, + opts ...kvcoord.RangeFeedOption, ) error // Scan encapsulates scanning a key span at a given point in time. The method @@ -287,6 +288,11 @@ func (f *RangeFeed) run(ctx context.Context, frontier *span.Frontier) { // draining when the rangefeed fails. eventCh := make(chan kvcoord.RangeFeedMessage) + var rangefeedOpts []kvcoord.RangeFeedOption + if f.scanConfig.overSystemTable { + rangefeedOpts = append(rangefeedOpts, kvcoord.WithSystemTablePriority()) + } + for i := 0; r.Next(); i++ { ts := frontier.Frontier() if log.ExpensiveLogEnabled(ctx, 1) { @@ -296,7 +302,7 @@ func (f *RangeFeed) run(ctx context.Context, frontier *span.Frontier) { start := timeutil.Now() rangeFeedTask := func(ctx context.Context) error { - return f.client.RangeFeed(ctx, f.spans, ts, f.withDiff, eventCh) + return f.client.RangeFeed(ctx, f.spans, ts, f.withDiff, eventCh, rangefeedOpts...) } processEventsTask := func(ctx context.Context) error { return f.processEvents(ctx, frontier, eventCh) diff --git a/pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go b/pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go index 8565b10b4baa..67bfd19e51a8 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go +++ b/pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go @@ -54,6 +54,7 @@ func (m *mockClient) RangeFeed( startFrom hlc.Timestamp, withDiff bool, eventC chan<- kvcoord.RangeFeedMessage, + opts ...kvcoord.RangeFeedOption, ) error { return m.rangefeed(ctx, spans, startFrom, withDiff, eventC) } @@ -364,7 +365,7 @@ func TestBackoffOnRangefeedFailure(t *testing.T) { Times(3). Return(errors.New("rangefeed failed")) db.EXPECT().RangeFeed(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). - Do(func(context.Context, []roachpb.Span, hlc.Timestamp, bool, chan<- kvcoord.RangeFeedMessage) { + Do(func(context.Context, []roachpb.Span, hlc.Timestamp, bool, chan<- kvcoord.RangeFeedMessage, ...kvcoord.RangeFeedOption) { cancel() }). Return(nil) diff --git a/pkg/kv/kvclient/rangefeed/rangefeedcache/watcher.go b/pkg/kv/kvclient/rangefeed/rangefeedcache/watcher.go index 33e5d52f3acd..32b7eaed15da 100644 --- a/pkg/kv/kvclient/rangefeed/rangefeedcache/watcher.go +++ b/pkg/kv/kvclient/rangefeed/rangefeedcache/watcher.go @@ -292,6 +292,10 @@ func (s *Watcher) Run(ctx context.Context) error { case frontierBumpedCh <- struct{}{}: } }), + // TODO(irfansharif): Consider making this configurable on the Watcher + // type. As of 2022-11 all uses of this type are system-internal ones + // where a higher admission-pri makes sense. + rangefeed.WithSystemTablePriority(), rangefeed.WithDiff(s.withPrevValue), rangefeed.WithRowTimestampInInitialScan(true), rangefeed.WithOnInitialScanError(func(ctx context.Context, err error) (shouldFail bool) { diff --git a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go index bfe0b928a2ec..3cdd064831d6 100644 --- a/pkg/kv/kvserver/client_replica_circuit_breaker_test.go +++ b/pkg/kv/kvserver/client_replica_circuit_breaker_test.go @@ -442,7 +442,7 @@ func TestReplicaCircuitBreaker_RangeFeed(t *testing.T) { defer cancel() stream1 := &dummyStream{t: t, ctx: ctx, name: "rangefeed1", recv: make(chan *roachpb.RangeFeedEvent)} require.NoError(t, tc.Stopper().RunAsyncTask(ctx, "stream1", func(ctx context.Context) { - err := tc.repls[0].RangeFeed(ctx, args, stream1).GoError() + err := tc.repls[0].RangeFeed(args, stream1, nil /* pacer */).GoError() if ctx.Err() != nil { return // main goroutine stopping } @@ -496,7 +496,7 @@ func TestReplicaCircuitBreaker_RangeFeed(t *testing.T) { // the breaker. stream2 := &dummyStream{t: t, ctx: ctx, name: "rangefeed2", recv: make(chan *roachpb.RangeFeedEvent)} require.NoError(t, tc.Stopper().RunAsyncTask(ctx, "stream2", func(ctx context.Context) { - err := tc.repls[0].RangeFeed(ctx, args, stream2).GoError() + err := tc.repls[0].RangeFeed(args, stream2, nil /* pacer */).GoError() if ctx.Err() != nil { return // main goroutine stopping } diff --git a/pkg/kv/kvserver/kvadmission/kvadmission.go b/pkg/kv/kvserver/kvadmission/kvadmission.go index ad37c3352ac1..b682b584648e 100644 --- a/pkg/kv/kvserver/kvadmission/kvadmission.go +++ b/pkg/kv/kvserver/kvadmission/kvadmission.go @@ -52,7 +52,8 @@ var elasticCPUDurationPerExportRequest = settings.RegisterDurationSetting( ) // elasticCPUDurationPerRangefeedScanUnit controls how many CPU tokens are -// allotted for each unit of work during rangefeed catchup scans. +// allotted for each unit of work during rangefeed catchup scans. Only takes +// effect if kvadmission.rangefeed_catchup_scan_elastic_control.enabled is set. var elasticCPUDurationPerRangefeedScanUnit = settings.RegisterDurationSetting( settings.SystemOnly, "kvadmission.elastic_cpu.duration_per_rangefeed_scan_unit", @@ -71,6 +72,15 @@ var elasticCPUDurationPerRangefeedScanUnit = settings.RegisterDurationSetting( }, ) +// rangefeedCatchupScanElasticControlEnabled determines whether rangefeed catch +// up scans integrate with elastic CPU control. +var rangefeedCatchupScanElasticControlEnabled = settings.RegisterBoolSetting( + settings.SystemOnly, + "kvadmission.rangefeed_catchup_scan_elastic_control.enabled", + "determines whether rangefeed catchup scans integrate with the elastic CPU control", + true, +) + // Controller provides admission control for the KV layer. type Controller interface { // AdmitKVWork must be called before performing KV work. @@ -83,8 +93,9 @@ type Controller interface { // executing. AdmittedKVWorkDone(Handle, *StoreWriteBytes) // AdmitRangefeedRequest must be called before serving rangefeed requests. - // It returns a Pacer that's used within rangefeed catchup scans (typically - // CPU-intensive and affects scheduling latencies negatively). + // If enabled, it returns a non-nil Pacer that's to be used within rangefeed + // catchup scans (typically CPU-intensive and affecting scheduling + // latencies). AdmitRangefeedRequest(roachpb.TenantID, *roachpb.RangeFeedRequest) *Pacer // SetTenantWeightProvider is used to set the provider that will be // periodically polled for weights. The stopper should be used to terminate @@ -298,10 +309,9 @@ func (n *controllerImpl) AdmittedKVWorkDone(ah Handle, writeBytes *StoreWriteByt func (n *controllerImpl) AdmitRangefeedRequest( tenantID roachpb.TenantID, request *roachpb.RangeFeedRequest, ) *Pacer { - // TODO(irfansharif): We need to version gate/be defensive when integrating - // rangefeeds since admission headers will not be fully set on older version - // nodes. See EnableRangefeedElasticCPUControl in cockroach_versions.go. - // Consider a cluster setting too. + if !rangefeedCatchupScanElasticControlEnabled.Get(&n.settings.SV) { + return nil + } return &Pacer{ unit: elasticCPUDurationPerRangefeedScanUnit.Get(&n.settings.SV), @@ -475,23 +485,3 @@ func (p *Pacer) Close() { p.wq.AdmittedWorkDone(p.cur) p.cur = nil } - -type pacerKey struct{} - -// ContextWithPacer returns a Context wrapping the supplied Pacer, if any. -func ContextWithPacer(ctx context.Context, h *Pacer) context.Context { - if h == nil { - return ctx - } - return context.WithValue(ctx, pacerKey{}, h) -} - -// PacerFromContext returns the Pacer contained in the Context, if any. -func PacerFromContext(ctx context.Context) *Pacer { - val := ctx.Value(pacerKey{}) - h, ok := val.(*Pacer) - if !ok { - return nil - } - return h -} diff --git a/pkg/kv/kvserver/rangefeed/catchup_scan.go b/pkg/kv/kvserver/rangefeed/catchup_scan.go index d6af20efc097..06a2fd974cec 100644 --- a/pkg/kv/kvserver/rangefeed/catchup_scan.go +++ b/pkg/kv/kvserver/rangefeed/catchup_scan.go @@ -13,6 +13,7 @@ package rangefeed import ( "bytes" "context" + "time" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -20,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/bufalloc" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/errors" ) @@ -154,6 +156,8 @@ func (i *CatchUpIterator) CatchUpScan( var lastKey roachpb.Key var meta enginepb.MVCCMetadata i.SeekGE(storage.MVCCKey{Key: i.span.Key}) + + every := log.Every(100 * time.Millisecond) for { if ok, err := i.Valid(); err != nil { return err @@ -161,6 +165,14 @@ func (i *CatchUpIterator) CatchUpScan( break } + if err := i.pacer.Pace(ctx); err != nil { + // We're unable to pace things automatically -- shout loudly + // semi-infrequently but don't fail the rangefeed itself. + if every.ShouldLog() { + log.Errorf(ctx, "automatic pacing: %v", err) + } + } + // Emit any new MVCC range tombstones when their start key is encountered. // Range keys can currently only be MVCC range tombstones. // @@ -330,10 +342,6 @@ func (i *CatchUpIterator) CatchUpScan( i.Next() } } - - if err := i.pacer.Pace(ctx); err != nil { - return errors.Wrap(err, "automatic pacing: %v") - } } // Output events for the last key encountered. diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go index 401ca55771e5..a8b829eb9f6c 100644 --- a/pkg/kv/kvserver/replica_rangefeed.go +++ b/pkg/kv/kvserver/replica_rangefeed.go @@ -135,21 +135,22 @@ func (tp *rangefeedTxnPusher) ResolveIntents( // complete. The surrounding store's ConcurrentRequestLimiter is used to limit // the number of rangefeeds using catch-up iterators at the same time. func (r *Replica) RangeFeed( - ctx context.Context, args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink, + args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink, pacer *kvadmission.Pacer, ) *roachpb.Error { - return r.rangeFeedWithRangeID(ctx, r.RangeID, args, stream) + return r.rangeFeedWithRangeID(r.RangeID, args, stream, pacer) } func (r *Replica) rangeFeedWithRangeID( - ctx context.Context, _forStacks roachpb.RangeID, args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink, + pacer *kvadmission.Pacer, ) *roachpb.Error { if !r.isRangefeedEnabled() && !RangefeedEnabled.Get(&r.store.cfg.Settings.SV) { return roachpb.NewErrorf("rangefeeds require the kv.rangefeed.enabled setting. See %s", docs.URL(`change-data-capture.html#enable-rangefeeds-to-reduce-latency`)) } + ctx := r.AnnotateCtx(stream.Context()) rSpan, err := keys.SpanAddr(args.Span) if err != nil { @@ -225,7 +226,7 @@ func (r *Replica) rangeFeedWithRangeID( // Assert that we still hold the raftMu when this is called to ensure // that the catchUpIter reads from the current snapshot. r.raftMu.AssertHeld() - return rangefeed.NewCatchUpIterator(r.Engine(), span, startTime, iterSemRelease, kvadmission.PacerFromContext(ctx)) + return rangefeed.NewCatchUpIterator(r.Engine(), span, startTime, iterSemRelease, pacer) } } p := r.registerWithRangefeedRaftMuLocked( diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index f7b25199fd3f..8aa7addcbb7b 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -3184,12 +3184,9 @@ func (s *Store) RangeFeed( return roachpb.NewError(roachpb.NewRangeNotFoundError(args.RangeID, s.StoreID())) } - ctx := repl.AnnotateCtx(stream.Context()) tenID, _ := repl.TenantID() - if pacer := s.cfg.KVAdmissionController.AdmitRangefeedRequest(tenID, args); pacer != nil { - ctx = kvadmission.ContextWithPacer(ctx, pacer) - } - return repl.RangeFeed(ctx, args, stream) + pacer := s.cfg.KVAdmissionController.AdmitRangefeedRequest(tenID, args) + return repl.RangeFeed(args, stream, pacer) } // updateReplicationGauges counts a number of simple replication statistics for diff --git a/pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher.go b/pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher.go index 99639056b8ed..745117208136 100644 --- a/pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher.go +++ b/pkg/spanconfig/spanconfigsqlwatcher/sqlwatcher.go @@ -231,6 +231,7 @@ func (s *SQLWatcher) watchForDescriptorUpdates( []roachpb.Span{descriptorTableSpan}, startTS, handleEvent, + rangefeed.WithSystemTablePriority(), rangefeed.WithDiff(true), rangefeed.WithOnFrontierAdvance(func(ctx context.Context, resolvedTS hlc.Timestamp) { onFrontierAdvance(ctx, descriptorsRangefeed, resolvedTS) @@ -290,6 +291,7 @@ func (s *SQLWatcher) watchForZoneConfigUpdates( []roachpb.Span{zoneTableSpan}, startTS, handleEvent, + rangefeed.WithSystemTablePriority(), rangefeed.WithOnFrontierAdvance(func(ctx context.Context, resolvedTS hlc.Timestamp) { onFrontierAdvance(ctx, zonesRangefeed, resolvedTS) }), @@ -385,6 +387,7 @@ func (s *SQLWatcher) watchForProtectedTimestampUpdates( []roachpb.Span{ptsRecordsTableSpan}, startTS, handleEvent, + rangefeed.WithSystemTablePriority(), rangefeed.WithOnFrontierAdvance(func(ctx context.Context, resolvedTS hlc.Timestamp) { onFrontierAdvance(ctx, protectedTimestampRangefeed, resolvedTS) }), diff --git a/pkg/sql/catalog/lease/lease.go b/pkg/sql/catalog/lease/lease.go index 13ebcc0a1f8a..35f7cff7b4ef 100644 --- a/pkg/sql/catalog/lease/lease.go +++ b/pkg/sql/catalog/lease/lease.go @@ -1155,6 +1155,7 @@ func (m *Manager) watchForUpdates(ctx context.Context, descUpdateCh chan<- *desc // shuts down, so we don't need to call Close() ourselves. _, _ = m.rangeFeedFactory.RangeFeed( ctx, "lease", []roachpb.Span{descriptorTableSpan}, hlc.Timestamp{}, handleEvent, + rangefeed.WithSystemTablePriority(), ) } diff --git a/pkg/sql/sqlinstance/instancestorage/instancereader.go b/pkg/sql/sqlinstance/instancestorage/instancereader.go index 5118e76999cb..4c189dcae085 100644 --- a/pkg/sql/sqlinstance/instancestorage/instancereader.go +++ b/pkg/sql/sqlinstance/instancestorage/instancereader.go @@ -160,6 +160,7 @@ func (r *Reader) maybeStartRangeFeed(ctx context.Context) *rangefeed.RangeFeed { []roachpb.Span{instancesTableSpan}, r.clock.Now(), updateCacheFn, + rangefeed.WithSystemTablePriority(), rangefeed.WithInitialScan(initialScanDoneFn), rangefeed.WithOnInitialScanError(initialScanErrFn), rangefeed.WithRowTimestampInInitialScan(true), diff --git a/pkg/sql/stats/stats_cache.go b/pkg/sql/stats/stats_cache.go index 85cce35b958b..5a13cdb0af11 100644 --- a/pkg/sql/stats/stats_cache.go +++ b/pkg/sql/stats/stats_cache.go @@ -179,6 +179,7 @@ func (sc *TableStatisticsCache) Start( []roachpb.Span{statsTableSpan}, sc.ClientDB.Clock().Now(), handleEvent, + rangefeed.WithSystemTablePriority(), ) return err } From 03b2bbd20cc8a52d99f7173fae5bc6827263fdf1 Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Wed, 23 Nov 2022 20:53:32 -0500 Subject: [PATCH 5/5] kvadmission: disable AC-integration of rangefeed catch-up scans We want this to be disabled by default in 22.2, used only selectively. Release note: None --- pkg/kv/kvserver/kvadmission/kvadmission.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/kv/kvserver/kvadmission/kvadmission.go b/pkg/kv/kvserver/kvadmission/kvadmission.go index b682b584648e..5c826b651f61 100644 --- a/pkg/kv/kvserver/kvadmission/kvadmission.go +++ b/pkg/kv/kvserver/kvadmission/kvadmission.go @@ -78,7 +78,7 @@ var rangefeedCatchupScanElasticControlEnabled = settings.RegisterBoolSetting( settings.SystemOnly, "kvadmission.rangefeed_catchup_scan_elastic_control.enabled", "determines whether rangefeed catchup scans integrate with the elastic CPU control", - true, + false, ) // Controller provides admission control for the KV layer.