diff --git a/pkg/ccl/changefeedccl/changefeedbase/settings.go b/pkg/ccl/changefeedccl/changefeedbase/settings.go index 385b0243a67e..d2eb509d1a7c 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/settings.go +++ b/pkg/ccl/changefeedccl/changefeedbase/settings.go @@ -80,3 +80,12 @@ var FrontierCheckpointMaxBytes = settings.RegisterByteSizeSetting( "controls the maximum size of the checkpoint as a total size of key bytes", 1<<20, ) + +// ScanRequestLimit is the number of Scan requests that can run at once. +// Scan requests are issued when changefeed performs the backfill. +// If set to 0, a reasonable default will be chosen. +var ScanRequestLimit = settings.RegisterIntSetting( + "changefeed.backfill.concurrent_scan_requests", + "number of concurrent scan requests per node issued during a backfill", + 0, +) diff --git a/pkg/ccl/changefeedccl/kvfeed/BUILD.bazel b/pkg/ccl/changefeedccl/kvfeed/BUILD.bazel index dd79b6a21e0a..5789db4986f8 100644 --- a/pkg/ccl/changefeedccl/kvfeed/BUILD.bazel +++ b/pkg/ccl/changefeedccl/kvfeed/BUILD.bazel @@ -19,8 +19,8 @@ go_library( "//pkg/keys", "//pkg/kv", "//pkg/kv/kvclient/kvcoord", - "//pkg/kv/kvserver", "//pkg/roachpb", + "//pkg/settings", "//pkg/settings/cluster", "//pkg/sql/covering", "//pkg/storage/enginepb", diff --git a/pkg/ccl/changefeedccl/kvfeed/scanner.go b/pkg/ccl/changefeedccl/kvfeed/scanner.go index 9abca43a9d5c..8e05c08e80cc 100644 --- a/pkg/ccl/changefeedccl/kvfeed/scanner.go +++ b/pkg/ccl/changefeedccl/kvfeed/scanner.go @@ -13,14 +13,15 @@ import ( "sync/atomic" "time" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent" "github.com/cockroachdb/cockroach/pkg/gossip" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver" 
"github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/covering" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" @@ -64,17 +65,7 @@ func (p *scanRequestScanner) Scan( return err } - // Export requests for the various watched spans are executed in parallel, - // with a semaphore-enforced limit based on a cluster setting. - // The spans here generally correspond with range boundaries. - approxNodeCount, err := clusterNodeCount(p.gossip) - if err != nil { - // can't count nodes in tenants - approxNodeCount = 1 - } - - maxConcurrentExports := approxNodeCount * - int(kvserver.ExportRequestsLimit.Get(&p.settings.SV)) + maxConcurrentExports := maxConcurrentExportRequests(p.gossip, &p.settings.SV) exportLim := limit.MakeConcurrentRequestLimiter("changefeedExportRequestLimiter", maxConcurrentExports) g := ctxgroup.WithContext(ctx) // atomicFinished is used only to enhance debugging messages. @@ -267,15 +258,34 @@ func allRangeSpans( } // clusterNodeCount returns the approximate number of nodes in the cluster. -func clusterNodeCount(gw gossip.OptionalGossip) (int, error) { +func clusterNodeCount(gw gossip.OptionalGossip) int { g, err := gw.OptionalErr(47971) if err != nil { - return 0, err + // can't count nodes in tenants + return 1 } var nodes int _ = g.IterateInfos(gossip.KeyNodeIDPrefix, func(_ string, _ gossip.Info) error { nodes++ return nil }) - return nodes, nil + return nodes +} + +// maxConcurrentExportRequests returns the number of concurrent scan requests. +func maxConcurrentExportRequests(gw gossip.OptionalGossip, sv *settings.Values) int { + // If the user specified ScanRequestLimit -- use that value. + if max := changefeedbase.ScanRequestLimit.Get(sv); max > 0 { + return int(max) + } + + nodes := clusterNodeCount(gw) + // This is all hand-wavy: 3 per node used to be the default for a very long time. 
+ // However, this could get out of hand if the clusters are large. + // So cap the max to an arbitrary value of 100. + max := 3 * nodes + if max > 100 { + max = 100 + } + return max }