From a3a825bcfe31e8c99555a81f54275e9e4d7dc3a6 Mon Sep 17 00:00:00 2001 From: irfan sharif Date: Mon, 17 Oct 2022 20:39:39 +0000 Subject: [PATCH] changefeed,kvcoord: populate AC headers for backfill work This is an opportunistic change and something to backport to v22.2. In v22.2 we introduced a disabled-by-default elastic CPU limiter (#86638) to dynamically grant CPU time for background work like backups -- something hope to enable-by-default in CC and under observation for select 22.2 clusters. Recently we found another use for this limiter -- rangefeed catchup scans (#89709). It's unclear yet whether we want that integration to make it back to v22.2 but this commit leaves that option open by populating the right AC headers we'd need for the integration. Populating these AC headers are safe -- the Rangefeed RPC is not hooked into AC yet, so these headers are not looked at. For the initial scan requests we're only setting a lower priority bit, something 22.1 nodes already know to handle (they do so for all batch requests). Release note: None --- pkg/ccl/changefeedccl/kvfeed/BUILD.bazel | 1 + pkg/ccl/changefeedccl/kvfeed/scanner.go | 7 +++++++ pkg/kv/kvclient/kvcoord/BUILD.bazel | 1 + pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go | 9 +++++++++ 4 files changed, 18 insertions(+) diff --git a/pkg/ccl/changefeedccl/kvfeed/BUILD.bazel b/pkg/ccl/changefeedccl/kvfeed/BUILD.bazel index a5efc9e0041e..a305023d835a 100644 --- a/pkg/ccl/changefeedccl/kvfeed/BUILD.bazel +++ b/pkg/ccl/changefeedccl/kvfeed/BUILD.bazel @@ -25,6 +25,7 @@ go_library( "//pkg/settings/cluster", "//pkg/sql/covering", "//pkg/storage/enginepb", + "//pkg/util/admission/admissionpb", "//pkg/util/ctxgroup", "//pkg/util/hlc", "//pkg/util/limit", diff --git a/pkg/ccl/changefeedccl/kvfeed/scanner.go b/pkg/ccl/changefeedccl/kvfeed/scanner.go index 36c3f6887c72..2c1f2195b7ed 100644 --- a/pkg/ccl/changefeedccl/kvfeed/scanner.go +++ b/pkg/ccl/changefeedccl/kvfeed/scanner.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/covering" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/limit" @@ -140,6 +141,12 @@ func (p *scanRequestScanner) exportSpan( r := roachpb.NewScan(remaining.Key, remaining.EndKey, false /* forUpdate */).(*roachpb.ScanRequest) r.ScanFormat = roachpb.BATCH_RESPONSE b.Header.TargetBytes = targetBytesPerScan + b.AdmissionHeader = roachpb.AdmissionHeader{ + Priority: int32(admissionpb.BulkNormalPri), + CreateTime: start.UnixNano(), + Source: roachpb.AdmissionHeader_FROM_SQL, + NoMemoryReservedAtSource: true, + } // NB: We use a raw request rather than the Scan() method because we want // the MVCC timestamps which are encoded in the response but are filtered // during result parsing. diff --git a/pkg/kv/kvclient/kvcoord/BUILD.bazel b/pkg/kv/kvclient/kvcoord/BUILD.bazel index da1f5bfb569f..e032761d174e 100644 --- a/pkg/kv/kvclient/kvcoord/BUILD.bazel +++ b/pkg/kv/kvclient/kvcoord/BUILD.bazel @@ -61,6 +61,7 @@ go_library( "//pkg/sql/pgwire/pgerror", "//pkg/storage/enginepb", "//pkg/util", + "//pkg/util/admission/admissionpb", "//pkg/util/buildutil", "//pkg/util/contextutil", "//pkg/util/ctxgroup", diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index 9ce54fa84766..f9d4e113e1e7 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/server/telemetry" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/envutil" "github.com/cockroachdb/cockroach/pkg/util/grpcutil" @@ -506,6 +507,14 @@ func (ds *DistSender) singleRangeFeed( RangeID: desc.RangeID, }, WithDiff: withDiff, + AdmissionHeader: roachpb.AdmissionHeader{ + // NB: AdmissionHeader is used only at the start of the range feed + // stream since the initial catch-up scan is expensive. + Priority: int32(admissionpb.BulkNormalPri), + CreateTime: timeutil.Now().UnixNano(), + Source: roachpb.AdmissionHeader_FROM_SQL, + NoMemoryReservedAtSource: true, + }, } var latencyFn LatencyFunc