changefeedccl: Introduce a knob to control concurrency of scan requests.
Stop relying on ExportRequestsLimit to determine the number of concurrent
export requests, and introduce a dedicated ScanRequestLimit setting.

If the setting is specified, use that value; otherwise, the default
is computed as 3 * (number of nodes in the cluster), matching the old
behavior, but the result is capped so that concurrency does not get
out of hand when running in a very large cluster.

Fixes #67190

Release Notes: Provide better configurability of scan request
concurrency.  Scan requests are issued by changefeeds during the
backfill.
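
For illustration only, the policy described above boils down to roughly the
following sketch. The function name concurrentScanLimit is hypothetical; the
commit's actual implementation is maxConcurrentExportRequests in
pkg/ccl/changefeedccl/kvfeed/scanner.go further down. Operators would
typically override the default with
SET CLUSTER SETTING changefeed.backfill.concurrent_scan_requests = <n>.

// Sketch of the default-selection policy described above (hypothetical name;
// the commit adds maxConcurrentExportRequests in scanner.go with this logic).
func concurrentScanLimit(explicitLimit int64, nodeCount int) int {
	if explicitLimit > 0 {
		// changefeed.backfill.concurrent_scan_requests was set explicitly.
		return int(explicitLimit)
	}
	// Old behavior: 3 concurrent scan requests per node...
	limit := 3 * nodeCount
	if limit > 100 {
		// ...capped so a very large cluster does not issue an unbounded
		// number of concurrent scans.
		limit = 100
	}
	return limit
}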
Yevgeniy Miretskiy committed Jul 6, 2021
1 parent 0682747 commit 96e41c9
Showing 3 changed files with 35 additions and 16 deletions.
9 changes: 9 additions & 0 deletions pkg/ccl/changefeedccl/changefeedbase/settings.go
@@ -80,3 +80,12 @@ var FrontierCheckpointMaxBytes = settings.RegisterByteSizeSetting(
 	"controls the maximum size of the checkpoint as a total size of key bytes",
 	1<<20,
 )
+
+// ScanRequestLimit is the number of Scan requests that can run at once.
+// Scan requests are issued when changefeed performs the backfill.
+// If set to 0, a reasonable default will be chosen.
+var ScanRequestLimit = settings.RegisterIntSetting(
+	"changefeed.backfill.concurrent_scan_requests",
+	"number of concurrent scan requests per node issued during a backfill",
+	0,
+)
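
The zero default acts as an "unset" sentinel, so callers are expected to read
the setting and fall back to their own default when it is zero. A minimal
sketch of that consumption pattern (hypothetical function and parameter names,
assuming the changefeedbase and settings imports; the real caller is
maxConcurrentExportRequests in scanner.go below):

// scanConcurrency is a hypothetical caller: an explicit setting wins,
// otherwise the caller derives its own default.
func scanConcurrency(sv *settings.Values, defaultLimit int) int {
	if limit := changefeedbase.ScanRequestLimit.Get(sv); limit > 0 {
		return int(limit)
	}
	return defaultLimit
}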
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/kvfeed/BUILD.bazel
@@ -19,8 +19,8 @@ go_library(
         "//pkg/keys",
         "//pkg/kv",
         "//pkg/kv/kvclient/kvcoord",
-        "//pkg/kv/kvserver",
         "//pkg/roachpb",
+        "//pkg/settings",
         "//pkg/settings/cluster",
         "//pkg/sql/covering",
         "//pkg/storage/enginepb",
40 changes: 25 additions & 15 deletions pkg/ccl/changefeedccl/kvfeed/scanner.go
@@ -13,14 +13,15 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
 	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
 	"github.com/cockroachdb/cockroach/pkg/gossip"
 	"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
 	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
-	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/sql/covering"
 	"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
@@ -64,17 +65,7 @@ func (p *scanRequestScanner) Scan(
 		return err
 	}
 
-	// Export requests for the various watched spans are executed in parallel,
-	// with a semaphore-enforced limit based on a cluster setting.
-	// The spans here generally correspond with range boundaries.
-	approxNodeCount, err := clusterNodeCount(p.gossip)
-	if err != nil {
-		// can't count nodes in tenants
-		approxNodeCount = 1
-	}
-
-	maxConcurrentExports := approxNodeCount *
-		int(kvserver.ExportRequestsLimit.Get(&p.settings.SV))
+	maxConcurrentExports := maxConcurrentExportRequests(p.gossip, &p.settings.SV)
 	exportLim := limit.MakeConcurrentRequestLimiter("changefeedExportRequestLimiter", maxConcurrentExports)
 	g := ctxgroup.WithContext(ctx)
 	// atomicFinished is used only to enhance debugging messages.
@@ -267,15 +258,34 @@ func allRangeSpans(
 }
 
 // clusterNodeCount returns the approximate number of nodes in the cluster.
-func clusterNodeCount(gw gossip.OptionalGossip) (int, error) {
+func clusterNodeCount(gw gossip.OptionalGossip) int {
 	g, err := gw.OptionalErr(47971)
 	if err != nil {
-		return 0, err
+		// can't count nodes in tenants
+		return 1
 	}
 	var nodes int
 	_ = g.IterateInfos(gossip.KeyNodeIDPrefix, func(_ string, _ gossip.Info) error {
 		nodes++
 		return nil
 	})
-	return nodes, nil
+	return nodes
 }
+
+// maxConcurrentExportRequests returns the number of concurrent scan requests.
+func maxConcurrentExportRequests(gw gossip.OptionalGossip, sv *settings.Values) int {
+	// If the user specified ScanRequestLimit -- use that value.
+	if max := changefeedbase.ScanRequestLimit.Get(sv); max > 0 {
+		return int(max)
+	}
+
+	nodes := clusterNodeCount(gw)
+	// This is all hand-wavy: 3 per node was the default for a very long time.
+	// However, this could get out of hand if the cluster is large,
+	// so cap the max at an arbitrary value of 100.
+	max := 3 * nodes
+	if max > 100 {
+		max = 100
+	}
+	return max
+}
