Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-22.2: kv,rangefeed: integrate catchup scans with elastic cpu #92439

Merged
merged 5 commits into from
Nov 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,10 @@
<tr><td><code>kv.snapshot_delegation.enabled</code></td><td>boolean</td><td><code>false</code></td><td>set to true to allow snapshots from follower replicas</td></tr>
<tr><td><code>kv.snapshot_rebalance.max_rate</code></td><td>byte size</td><td><code>32 MiB</code></td><td>the rate limit (bytes/sec) to use for rebalance and upreplication snapshots</td></tr>
<tr><td><code>kv.snapshot_recovery.max_rate</code></td><td>byte size</td><td><code>32 MiB</code></td><td>the rate limit (bytes/sec) to use for recovery snapshots</td></tr>
<tr><td><code>kv.store.admission.provisioned_bandwidth</code></td><td>byte size</td><td><code>0 B</code></td><td>if set to a non-zero value, this is used as the provisioned bandwidth (in bytes/s), for each store. It can be over-ridden on a per-store basis using the --store flag</td></tr>
<tr><td><code>kv.transaction.max_intents_bytes</code></td><td>integer</td><td><code>4194304</code></td><td>maximum number of bytes used to track locks in transactions</td></tr>
<tr><td><code>kv.transaction.max_refresh_spans_bytes</code></td><td>integer</td><td><code>4194304</code></td><td>maximum number of bytes used to track refresh spans in serializable transactions</td></tr>
<tr><td><code>kv.transaction.reject_over_max_intents_budget.enabled</code></td><td>boolean</td><td><code>false</code></td><td>if set, transactions that exceed their lock tracking budget (kv.transaction.max_intents_bytes) are rejected instead of having their lock spans imprecisely compressed</td></tr>
<tr><td><code>kvadmission.store.provisioned_bandwidth</code></td><td>byte size</td><td><code>0 B</code></td><td>if set to a non-zero value, this is used as the provisioned bandwidth (in bytes/s), for each store. It can be over-ridden on a per-store basis using the --store flag</td></tr>
<tr><td><code>schedules.backup.gc_protection.enabled</code></td><td>boolean</td><td><code>true</code></td><td>enable chaining of GC protection across backups run as part of a schedule</td></tr>
<tr><td><code>security.ocsp.mode</code></td><td>enumeration</td><td><code>off</code></td><td>use OCSP to check whether TLS certificates are revoked. If the OCSP server is unreachable, in strict mode all certificates will be rejected and in lax mode all certificates will be accepted. [off = 0, lax = 1, strict = 2]</td></tr>
<tr><td><code>security.ocsp.timeout</code></td><td>duration</td><td><code>3s</code></td><td>timeout before considering the OCSP server unreachable</td></tr>
Expand Down
2 changes: 2 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1142,6 +1142,7 @@ GO_TARGETS = [
"//pkg/kv/kvserver/idalloc:idalloc_test",
"//pkg/kv/kvserver/intentresolver:intentresolver",
"//pkg/kv/kvserver/intentresolver:intentresolver_test",
"//pkg/kv/kvserver/kvadmission:kvadmission",
"//pkg/kv/kvserver/kvserverbase:kvserverbase",
"//pkg/kv/kvserver/kvserverpb:kvserverpb",
"//pkg/kv/kvserver/liveness/livenesspb:livenesspb",
Expand Down Expand Up @@ -2439,6 +2440,7 @@ GET_X_DATA_TARGETS = [
"//pkg/kv/kvserver/gc:get_x_data",
"//pkg/kv/kvserver/idalloc:get_x_data",
"//pkg/kv/kvserver/intentresolver:get_x_data",
"//pkg/kv/kvserver/kvadmission:get_x_data",
"//pkg/kv/kvserver/kvserverbase:get_x_data",
"//pkg/kv/kvserver/kvserverpb:get_x_data",
"//pkg/kv/kvserver/liveness:get_x_data",
Expand Down
7 changes: 5 additions & 2 deletions pkg/ccl/changefeedccl/changefeedbase/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ var FrontierCheckpointMaxBytes = settings.RegisterByteSizeSetting(
settings.TenantWritable,
"changefeed.frontier_checkpoint_max_bytes",
"controls the maximum size of the checkpoint as a total size of key bytes",
1<<20,
1<<20, // 1 MiB
)

// ScanRequestLimit is the number of Scan requests that can run at once.
Expand All @@ -122,11 +122,14 @@ var ScanRequestLimit = settings.RegisterIntSetting(
)

// ScanRequestSize is the target size of the scan request response.
//
// TODO(cdc,yevgeniy,irfansharif): 16 MiB is too large for "elastic" work such
// as this; reduce the default. Evaluate this as part of #90089.
var ScanRequestSize = settings.RegisterIntSetting(
settings.TenantWritable,
"changefeed.backfill.scan_request_size",
"the maximum number of bytes returned by each scan request",
16<<20,
16<<20, // 16 MiB
)

// SinkThrottleConfig describes throttling configuration for the sink.
Expand Down
9 changes: 8 additions & 1 deletion pkg/ccl/changefeedccl/kvfeed/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,14 @@ func (p *scanRequestScanner) exportSpan(
r.ScanFormat = roachpb.BATCH_RESPONSE
b.Header.TargetBytes = targetBytesPerScan
b.AdmissionHeader = roachpb.AdmissionHeader{
Priority: int32(admissionpb.BulkNormalPri),
// TODO(irfansharif): Make this configurable if we want system table
// scanners or "high priority" changefeeds to run at higher
// priorities. We use higher AC priorities for system-internal
// rangefeeds listening in on system table changes.
Priority: int32(admissionpb.BulkNormalPri),
// We specify a creation time for each batch (as opposed to at the
// txn level) -- this way later batches from earlier txns don't just
// outcompete batches from newer txns.
CreateTime: start.UnixNano(),
Source: roachpb.AdmissionHeader_FROM_SQL,
NoMemoryReservedAtSource: true,
Expand Down
1 change: 0 additions & 1 deletion pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,6 @@ const (
// supported in cloud storage and KMS.
SupportAssumeRoleAuth

// FixUserfileRelatedDescriptorCorruption adds a migration which uses
// heuristics to identify invalid table descriptors for userfile-related
// descriptors.
FixUserfileRelatedDescriptorCorruption
Expand Down
7 changes: 2 additions & 5 deletions pkg/cmd/roachprod/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -925,15 +925,12 @@ var grafanaURLCmd = &cobra.Command{
Short: `returns a url to the grafana dashboard`,
Args: cobra.ExactArgs(1),
Run: wrap(func(cmd *cobra.Command, args []string) error {
urls, err := roachprod.GrafanaURL(context.Background(), roachprodLibraryLogger, args[0],
url, err := roachprod.GrafanaURL(context.Background(), roachprodLibraryLogger, args[0],
grafanaurlOpen)
if err != nil {
return err
}
for _, url := range urls {
fmt.Println(url)
}
fmt.Println("username: admin; pwd: admin")
fmt.Println(url)
return nil
}),
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -912,6 +912,11 @@ func (db *DB) NewTxn(ctx context.Context, debugName string) *Txn {
// is marked as poisoned and all future ops fail fast until the retry. The
// callback may return either nil or the retryable error. Txn is responsible for
// resetting the transaction and retrying the callback.
//
// TODO(irfansharif): Audit uses of this API since it bypasses AC. Make
// the other variant (TxnWithAdmissionControl) the default, or maybe rename this
// to be more explicit (TxnWithoutAdmissionControl) so new callers have to be
// conscious about what they want.
func (db *DB) Txn(ctx context.Context, retryable func(context.Context, *Txn) error) error {
return db.TxnWithAdmissionControl(
ctx, roachpb.AdmissionHeader_OTHER, admissionpb.NormalPri, retryable)
Expand Down
21 changes: 18 additions & 3 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func maxConcurrentCatchupScans(sv *settings.Values) int {

// rangeFeedConfig collects the optional settings that RangeFeedOption values
// apply to a RangeFeed.
type rangeFeedConfig struct {
// useMuxRangeFeed is presumably set by WithMuxRangeFeed to request the
// multiplexing rangefeed implementation -- TODO confirm, the option's body
// is not visible here.
useMuxRangeFeed bool
// overSystemTable is set by WithSystemTablePriority. When true,
// singleRangeFeed sends its request with admissionpb.NormalPri instead of
// admissionpb.BulkNormalPri.
overSystemTable bool
}

// RangeFeedOption configures a RangeFeed.
Expand All @@ -104,6 +105,14 @@ func WithMuxRangeFeed() RangeFeedOption {
})
}

// WithSystemTablePriority is used for system-internal rangefeeds. It marks
// the rangefeed as being over a system table, which makes singleRangeFeed
// send its request at admissionpb.NormalPri rather than
// admissionpb.BulkNormalPri. The admission header only matters at the start
// of the stream, i.e. for the initial catch-up scan.
func WithSystemTablePriority() RangeFeedOption {
return optionFunc(func(c *rangeFeedConfig) {
c.overSystemTable = true
})
}

// A "kill switch" to disable multiplexing rangefeed if severe issues are discovered with the new implementation.
var enableMuxRangeFeed = envutil.EnvOrDefaultBool("COCKROACH_ENABLE_MULTIPLEXING_RANGEFEED", true)

Expand Down Expand Up @@ -196,7 +205,7 @@ func (ds *DistSender) RangeFeedSpans(
// Spawn a child goroutine to process this feed.
g.GoCtx(func(ctx context.Context) error {
return ds.partialRangeFeed(ctx, rr, eventProducer, sri.rs, sri.startAfter,
sri.token, withDiff, &catchupSem, rangeCh, eventCh)
sri.token, withDiff, &catchupSem, rangeCh, eventCh, cfg)
})
case <-ctx.Done():
return ctx.Err()
Expand Down Expand Up @@ -372,6 +381,7 @@ func (ds *DistSender) partialRangeFeed(
catchupSem *limit.ConcurrentRequestLimiter,
rangeCh chan<- singleRangeInfo,
eventCh chan<- RangeFeedMessage,
cfg rangeFeedConfig,
) error {
// Bound the partial rangefeed to the partial span.
span := rs.AsRawSpanWithNoLocals()
Expand Down Expand Up @@ -408,7 +418,7 @@ func (ds *DistSender) partialRangeFeed(
// Establish a RangeFeed for a single Range.
maxTS, err := ds.singleRangeFeed(
ctx, span, startAfter, withDiff, token.Desc(),
catchupSem, eventCh, streamProducerFactory, active.onRangeEvent)
catchupSem, eventCh, streamProducerFactory, active.onRangeEvent, cfg)

// Forward the timestamp in case we end up sending it again.
startAfter.Forward(maxTS)
Expand Down Expand Up @@ -496,11 +506,16 @@ func (ds *DistSender) singleRangeFeed(
eventCh chan<- RangeFeedMessage,
streamProducerFactory rangeFeedEventProducerFactory,
onRangeEvent onRangeEventCb,
cfg rangeFeedConfig,
) (hlc.Timestamp, error) {
// Ensure context is cancelled on all errors, to prevent gRPC stream leaks.
ctx, cancelFeed := context.WithCancel(ctx)
defer cancelFeed()

admissionPri := admissionpb.BulkNormalPri
if cfg.overSystemTable {
admissionPri = admissionpb.NormalPri
}
args := roachpb.RangeFeedRequest{
Span: span,
Header: roachpb.Header{
Expand All @@ -511,7 +526,7 @@ func (ds *DistSender) singleRangeFeed(
AdmissionHeader: roachpb.AdmissionHeader{
// NB: AdmissionHeader is used only at the start of the range feed
// stream since the initial catch-up scan is expensive.
Priority: int32(admissionpb.BulkNormalPri),
Priority: int32(admissionPri),
CreateTime: timeutil.Now().UnixNano(),
Source: roachpb.AdmissionHeader_FROM_SQL,
NoMemoryReservedAtSource: true,
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvclient/rangefeed/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ go_library(
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/util/admission/admissionpb",
"//pkg/util/ctxgroup",
"//pkg/util/hlc",
"//pkg/util/limit",
Expand Down
15 changes: 15 additions & 0 deletions pkg/kv/kvclient/rangefeed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ type scanConfig struct {

// configures retry behavior
retryBehavior ScanRetryBehavior

// overSystemTable indicates whether this rangefeed is over a system table
// (used internally for CRDB's own functioning) and therefore should be
// treated with a more appropriate admission pri (NormalPri instead of
// BulkNormalPri).
overSystemTable bool
}

type optionFunc func(*config)
Expand Down Expand Up @@ -287,3 +293,12 @@ func WithPProfLabel(key, value string) Option {
c.extraPProfLabels = append(c.extraPProfLabels, key, value)
})
}

// WithSystemTablePriority communicates that the rangefeed is over a system
// table and thus operates at a higher priority. This primarily affects
// admission control: scans issued with this option set run at
// admissionpb.NormalPri instead of admissionpb.BulkNormalPri.
func WithSystemTablePriority() Option {
return optionFunc(func(c *config) {
c.overSystemTable = true
})
}
73 changes: 42 additions & 31 deletions pkg/kv/kvclient/rangefeed/db_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/limit"
Expand Down Expand Up @@ -74,8 +75,9 @@ func (dbc *dbAdapter) RangeFeed(
startFrom hlc.Timestamp,
withDiff bool,
eventC chan<- kvcoord.RangeFeedMessage,
opts ...kvcoord.RangeFeedOption,
) error {
return dbc.distSender.RangeFeed(ctx, spans, startFrom, withDiff, eventC)
return dbc.distSender.RangeFeed(ctx, spans, startFrom, withDiff, eventC, opts...)
}

// concurrentBoundAccount is a thread safe bound account.
Expand Down Expand Up @@ -118,7 +120,7 @@ func (dbc *dbAdapter) Scan(
// If we don't have parallelism configured, just scan each span in turn.
if cfg.scanParallelism == nil {
for _, sp := range spans {
if err := dbc.scanSpan(ctx, sp, asOf, rowFn, cfg.targetScanBytes, cfg.onSpanDone, acc); err != nil {
if err := dbc.scanSpan(ctx, sp, asOf, rowFn, cfg.targetScanBytes, cfg.onSpanDone, cfg.overSystemTable, acc); err != nil {
return err
}
}
Expand Down Expand Up @@ -154,7 +156,7 @@ func (dbc *dbAdapter) Scan(
g := ctxgroup.WithContext(ctx)
err := dbc.divideAndSendScanRequests(
ctx, &g, spans, asOf, rowFn,
parallelismFn, cfg.targetScanBytes, cfg.onSpanDone, acc)
parallelismFn, cfg.targetScanBytes, cfg.onSpanDone, cfg.overSystemTable, acc)
if err != nil {
cancel()
}
Expand All @@ -168,6 +170,7 @@ func (dbc *dbAdapter) scanSpan(
rowFn func(value roachpb.KeyValue),
targetScanBytes int64,
onScanDone OnScanCompleted,
overSystemTable bool,
acc *concurrentBoundAccount,
) error {
if acc != nil {
Expand All @@ -177,39 +180,46 @@ func (dbc *dbAdapter) scanSpan(
defer acc.Shrink(ctx, targetScanBytes)
}

return dbc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := txn.SetFixedTimestamp(ctx, asOf); err != nil {
return err
}
sp := span
var b kv.Batch
for {
b.Header.TargetBytes = targetScanBytes
b.Scan(sp.Key, sp.EndKey)
if err := txn.Run(ctx, &b); err != nil {
admissionPri := admissionpb.BulkNormalPri
if overSystemTable {
admissionPri = admissionpb.NormalPri
}
return dbc.db.TxnWithAdmissionControl(ctx,
roachpb.AdmissionHeader_ROOT_KV,
admissionPri,
func(ctx context.Context, txn *kv.Txn) error {
if err := txn.SetFixedTimestamp(ctx, asOf); err != nil {
return err
}
res := b.Results[0]
for _, row := range res.Rows {
rowFn(roachpb.KeyValue{Key: row.Key, Value: *row.Value})
}
if res.ResumeSpan == nil {
if onScanDone != nil {
return onScanDone(ctx, sp)
sp := span
var b kv.Batch
for {
b.Header.TargetBytes = targetScanBytes
b.Scan(sp.Key, sp.EndKey)
if err := txn.Run(ctx, &b); err != nil {
return err
}
res := b.Results[0]
for _, row := range res.Rows {
rowFn(roachpb.KeyValue{Key: row.Key, Value: *row.Value})
}
if res.ResumeSpan == nil {
if onScanDone != nil {
return onScanDone(ctx, sp)
}
return nil
}
return nil
}

if onScanDone != nil {
if err := onScanDone(ctx, roachpb.Span{Key: sp.Key, EndKey: res.ResumeSpan.Key}); err != nil {
return err
if onScanDone != nil {
if err := onScanDone(ctx, roachpb.Span{Key: sp.Key, EndKey: res.ResumeSpan.Key}); err != nil {
return err
}
}
}

sp = res.ResumeSpanAsValue()
b = kv.Batch{}
}
})
sp = res.ResumeSpanAsValue()
b = kv.Batch{}
}
})
}

// divideAndSendScanRequests divides spans into small ranges based on range boundaries,
Expand All @@ -224,6 +234,7 @@ func (dbc *dbAdapter) divideAndSendScanRequests(
parallelismFn func() int,
targetScanBytes int64,
onSpanDone OnScanCompleted,
overSystemTable bool,
acc *concurrentBoundAccount,
) error {
// Build a span group so that we can iterate spans in order.
Expand Down Expand Up @@ -261,7 +272,7 @@ func (dbc *dbAdapter) divideAndSendScanRequests(
sp := partialRS.AsRawSpanWithNoLocals()
workGroup.GoCtx(func(ctx context.Context) error {
defer limAlloc.Release()
return dbc.scanSpan(ctx, sp, asOf, rowFn, targetScanBytes, onSpanDone, acc)
return dbc.scanSpan(ctx, sp, asOf, rowFn, targetScanBytes, onSpanDone, overSystemTable, acc)
})

if !ri.NeedAnother(nextRS) {
Expand Down
13 changes: 9 additions & 4 deletions pkg/kv/kvclient/rangefeed/mocks_generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading