Skip to content

Commit

Permalink
kvclient: use NormalPri for system-table rangefeed ..
Browse files Browse the repository at this point in the history
.. catch-up scans, introduce a private cluster setting
(kvadmission.rangefeed_catchup_scan_elastic_control.enabled) to
selectively switch off catch-up scan integration if needed, and plumb
kvadmission.Pacer in explicitly to rangefeed catchup scan loop instead
of opaquely through the surrounding context.

Release note: None
  • Loading branch information
irfansharif committed Nov 3, 2022
1 parent 15d53db commit a042b60
Show file tree
Hide file tree
Showing 22 changed files with 165 additions and 84 deletions.
3 changes: 3 additions & 0 deletions pkg/ccl/changefeedccl/changefeedbase/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ var ScanRequestLimit = settings.RegisterIntSetting(
)

// ScanRequestSize is the target size of the scan request response.
//
// TODO(cdc,yevgeniy,irfansharif): 16 MiB is too large for "elastic" work such
// as this; reduce the default. Evaluate this as part of #90089.
var ScanRequestSize = settings.RegisterIntSetting(
settings.TenantWritable,
"changefeed.backfill.scan_request_size",
Expand Down
9 changes: 8 additions & 1 deletion pkg/ccl/changefeedccl/kvfeed/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,14 @@ func (p *scanRequestScanner) exportSpan(
r.ScanFormat = roachpb.BATCH_RESPONSE
b.Header.TargetBytes = targetBytesPerScan
b.AdmissionHeader = roachpb.AdmissionHeader{
Priority: int32(admissionpb.BulkNormalPri),
// TODO(irfansharif): Make this configurable if we want system table
// scanners or support "high priority" changefeeds to run at higher
// priorities. We use higher AC priorities for system-internal
// rangefeeds listening in on system table changes.
Priority: int32(admissionpb.BulkNormalPri),
// We specify a creation time for each batch (as opposed to at the
// txn level) -- this way later batches from earlier txns don't just
// outcompete batches from newer txns.
CreateTime: start.UnixNano(),
Source: roachpb.AdmissionHeader_FROM_SQL,
NoMemoryReservedAtSource: true,
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -889,6 +889,11 @@ func (db *DB) NewTxn(ctx context.Context, debugName string) *Txn {
// is marked as poisoned and all future ops fail fast until the retry. The
// callback may return either nil or the retryable error. Txn is responsible for
// resetting the transaction and retrying the callback.
//
// TODO(irfansharif): Audit uses of this API since it bypasses AC. Make
// the other variant (TxnWithAdmissionControl) the default, or maybe rename this
// to be more explicit (TxnWithoutAdmissionControl) so new callers have to be
// conscious about what they want.
func (db *DB) Txn(ctx context.Context, retryable func(context.Context, *Txn) error) error {
return db.TxnWithAdmissionControl(
ctx, roachpb.AdmissionHeader_OTHER, admissionpb.NormalPri, retryable)
Expand Down
21 changes: 18 additions & 3 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func maxConcurrentCatchupScans(sv *settings.Values) int {

// rangeFeedConfig collects the optional knobs for a rangefeed. Fields are
// populated by RangeFeedOption values.
type rangeFeedConfig struct {
// useMuxRangeFeed is presumably set by WithMuxRangeFeed to opt into the
// multiplexing rangefeed implementation.
// NOTE(review): the setter body is not visible here -- confirm.
useMuxRangeFeed bool
// overSystemTable is set by WithSystemTablePriority. When true, the
// per-range RangeFeedRequest is sent with admissionpb.NormalPri instead of
// the default admissionpb.BulkNormalPri.
overSystemTable bool
}

// RangeFeedOption configures a RangeFeed.
Expand All @@ -104,6 +105,14 @@ func WithMuxRangeFeed() RangeFeedOption {
})
}

// WithSystemTablePriority marks a rangefeed as system-internal. Such
// rangefeeds use a higher admission priority during catch-up scans.
func WithSystemTablePriority() RangeFeedOption {
	markSystemTable := func(cfg *rangeFeedConfig) {
		cfg.overSystemTable = true
	}
	return optionFunc(markSystemTable)
}

// A "kill switch" to disable the multiplexing rangefeed if severe issues are discovered with the new implementation.
var enableMuxRangeFeed = envutil.EnvOrDefaultBool("COCKROACH_ENABLE_MULTIPLEXING_RANGEFEED", true)

Expand Down Expand Up @@ -196,7 +205,7 @@ func (ds *DistSender) RangeFeedSpans(
// Spawn a child goroutine to process this feed.
g.GoCtx(func(ctx context.Context) error {
return ds.partialRangeFeed(ctx, rr, eventProducer, sri.rs, sri.startAfter,
sri.token, withDiff, &catchupSem, rangeCh, eventCh)
sri.token, withDiff, &catchupSem, rangeCh, eventCh, cfg)
})
case <-ctx.Done():
return ctx.Err()
Expand Down Expand Up @@ -372,6 +381,7 @@ func (ds *DistSender) partialRangeFeed(
catchupSem *limit.ConcurrentRequestLimiter,
rangeCh chan<- singleRangeInfo,
eventCh chan<- RangeFeedMessage,
cfg rangeFeedConfig,
) error {
// Bound the partial rangefeed to the partial span.
span := rs.AsRawSpanWithNoLocals()
Expand Down Expand Up @@ -408,7 +418,7 @@ func (ds *DistSender) partialRangeFeed(
// Establish a RangeFeed for a single Range.
maxTS, err := ds.singleRangeFeed(
ctx, span, startAfter, withDiff, token.Desc(),
catchupSem, eventCh, streamProducerFactory, active.onRangeEvent)
catchupSem, eventCh, streamProducerFactory, active.onRangeEvent, cfg)

// Forward the timestamp in case we end up sending it again.
startAfter.Forward(maxTS)
Expand Down Expand Up @@ -496,11 +506,16 @@ func (ds *DistSender) singleRangeFeed(
eventCh chan<- RangeFeedMessage,
streamProducerFactory rangeFeedEventProducerFactory,
onRangeEvent onRangeEventCb,
cfg rangeFeedConfig,
) (hlc.Timestamp, error) {
// Ensure context is cancelled on all errors, to prevent gRPC stream leaks.
ctx, cancelFeed := context.WithCancel(ctx)
defer cancelFeed()

admissionPri := admissionpb.BulkNormalPri
if cfg.overSystemTable {
admissionPri = admissionpb.NormalPri
}
args := roachpb.RangeFeedRequest{
Span: span,
Header: roachpb.Header{
Expand All @@ -511,7 +526,7 @@ func (ds *DistSender) singleRangeFeed(
AdmissionHeader: roachpb.AdmissionHeader{
// NB: AdmissionHeader is used only at the start of the range feed
// stream since the initial catch-up scan is expensive.
Priority: int32(admissionpb.BulkNormalPri),
Priority: int32(admissionPri),
CreateTime: timeutil.Now().UnixNano(),
Source: roachpb.AdmissionHeader_FROM_SQL,
NoMemoryReservedAtSource: true,
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvclient/rangefeed/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ go_library(
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/util/admission/admissionpb",
"//pkg/util/ctxgroup",
"//pkg/util/hlc",
"//pkg/util/limit",
Expand Down
15 changes: 15 additions & 0 deletions pkg/kv/kvclient/rangefeed/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ type scanConfig struct {

// configures retry behavior
retryBehavior ScanRetryBehavior

// overSystemTable indicates whether this rangefeed is over a system table
// (used internally for CRDB's own functioning) and therefore should be
// treated with a more appropriate admission pri (NormalPri instead of
// BulkNormalPri).
overSystemTable bool
}

type optionFunc func(*config)
Expand Down Expand Up @@ -287,3 +293,12 @@ func WithPProfLabel(key, value string) Option {
c.extraPProfLabels = append(c.extraPProfLabels, key, value)
})
}

// WithSystemTablePriority flags the rangefeed as running over a system table.
// Such rangefeeds operate at a higher priority; the main effect is on
// admission control.
func WithSystemTablePriority() Option {
	var markSystemTable optionFunc = func(cfg *config) {
		cfg.overSystemTable = true
	}
	return markSystemTable
}
73 changes: 42 additions & 31 deletions pkg/kv/kvclient/rangefeed/db_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/limit"
Expand Down Expand Up @@ -74,8 +75,9 @@ func (dbc *dbAdapter) RangeFeed(
startFrom hlc.Timestamp,
withDiff bool,
eventC chan<- kvcoord.RangeFeedMessage,
opts ...kvcoord.RangeFeedOption,
) error {
return dbc.distSender.RangeFeed(ctx, spans, startFrom, withDiff, eventC)
return dbc.distSender.RangeFeed(ctx, spans, startFrom, withDiff, eventC, opts...)
}

// concurrentBoundAccount is a thread safe bound account.
Expand Down Expand Up @@ -118,7 +120,7 @@ func (dbc *dbAdapter) Scan(
// If we don't have parallelism configured, just scan each span in turn.
if cfg.scanParallelism == nil {
for _, sp := range spans {
if err := dbc.scanSpan(ctx, sp, asOf, rowFn, cfg.targetScanBytes, cfg.onSpanDone, acc); err != nil {
if err := dbc.scanSpan(ctx, sp, asOf, rowFn, cfg.targetScanBytes, cfg.onSpanDone, cfg.overSystemTable, acc); err != nil {
return err
}
}
Expand Down Expand Up @@ -154,7 +156,7 @@ func (dbc *dbAdapter) Scan(
g := ctxgroup.WithContext(ctx)
err := dbc.divideAndSendScanRequests(
ctx, &g, spans, asOf, rowFn,
parallelismFn, cfg.targetScanBytes, cfg.onSpanDone, acc)
parallelismFn, cfg.targetScanBytes, cfg.onSpanDone, cfg.overSystemTable, acc)
if err != nil {
cancel()
}
Expand All @@ -168,6 +170,7 @@ func (dbc *dbAdapter) scanSpan(
rowFn func(value roachpb.KeyValue),
targetScanBytes int64,
onScanDone OnScanCompleted,
overSystemTable bool,
acc *concurrentBoundAccount,
) error {
if acc != nil {
Expand All @@ -177,39 +180,46 @@ func (dbc *dbAdapter) scanSpan(
defer acc.Shrink(ctx, targetScanBytes)
}

return dbc.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := txn.SetFixedTimestamp(ctx, asOf); err != nil {
return err
}
sp := span
var b kv.Batch
for {
b.Header.TargetBytes = targetScanBytes
b.Scan(sp.Key, sp.EndKey)
if err := txn.Run(ctx, &b); err != nil {
admissionPri := admissionpb.BulkNormalPri
if overSystemTable {
admissionPri = admissionpb.NormalPri
}
return dbc.db.TxnWithAdmissionControl(ctx,
roachpb.AdmissionHeader_ROOT_KV,
admissionPri,
func(ctx context.Context, txn *kv.Txn) error {
if err := txn.SetFixedTimestamp(ctx, asOf); err != nil {
return err
}
res := b.Results[0]
for _, row := range res.Rows {
rowFn(roachpb.KeyValue{Key: row.Key, Value: *row.Value})
}
if res.ResumeSpan == nil {
if onScanDone != nil {
return onScanDone(ctx, sp)
sp := span
var b kv.Batch
for {
b.Header.TargetBytes = targetScanBytes
b.Scan(sp.Key, sp.EndKey)
if err := txn.Run(ctx, &b); err != nil {
return err
}
res := b.Results[0]
for _, row := range res.Rows {
rowFn(roachpb.KeyValue{Key: row.Key, Value: *row.Value})
}
if res.ResumeSpan == nil {
if onScanDone != nil {
return onScanDone(ctx, sp)
}
return nil
}
return nil
}

if onScanDone != nil {
if err := onScanDone(ctx, roachpb.Span{Key: sp.Key, EndKey: res.ResumeSpan.Key}); err != nil {
return err
if onScanDone != nil {
if err := onScanDone(ctx, roachpb.Span{Key: sp.Key, EndKey: res.ResumeSpan.Key}); err != nil {
return err
}
}
}

sp = res.ResumeSpanAsValue()
b = kv.Batch{}
}
})
sp = res.ResumeSpanAsValue()
b = kv.Batch{}
}
})
}

// divideAndSendScanRequests divides spans into small ranges based on range boundaries,
Expand All @@ -224,6 +234,7 @@ func (dbc *dbAdapter) divideAndSendScanRequests(
parallelismFn func() int,
targetScanBytes int64,
onSpanDone OnScanCompleted,
overSystemTable bool,
acc *concurrentBoundAccount,
) error {
// Build a span group so that we can iterate spans in order.
Expand Down Expand Up @@ -261,7 +272,7 @@ func (dbc *dbAdapter) divideAndSendScanRequests(
sp := partialRS.AsRawSpanWithNoLocals()
workGroup.GoCtx(func(ctx context.Context) error {
defer limAlloc.Release()
return dbc.scanSpan(ctx, sp, asOf, rowFn, targetScanBytes, onSpanDone, acc)
return dbc.scanSpan(ctx, sp, asOf, rowFn, targetScanBytes, onSpanDone, overSystemTable, acc)
})

if !ri.NeedAnother(nextRS) {
Expand Down
13 changes: 9 additions & 4 deletions pkg/kv/kvclient/rangefeed/mocks_generated_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 7 additions & 1 deletion pkg/kv/kvclient/rangefeed/rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type DB interface {
startFrom hlc.Timestamp,
withDiff bool,
eventC chan<- kvcoord.RangeFeedMessage,
opts ...kvcoord.RangeFeedOption,
) error

// Scan encapsulates scanning a key span at a given point in time. The method
Expand Down Expand Up @@ -287,6 +288,11 @@ func (f *RangeFeed) run(ctx context.Context, frontier *span.Frontier) {
// draining when the rangefeed fails.
eventCh := make(chan kvcoord.RangeFeedMessage)

var rangefeedOpts []kvcoord.RangeFeedOption
if f.scanConfig.overSystemTable {
rangefeedOpts = append(rangefeedOpts, kvcoord.WithSystemTablePriority())
}

for i := 0; r.Next(); i++ {
ts := frontier.Frontier()
if log.ExpensiveLogEnabled(ctx, 1) {
Expand All @@ -296,7 +302,7 @@ func (f *RangeFeed) run(ctx context.Context, frontier *span.Frontier) {
start := timeutil.Now()

rangeFeedTask := func(ctx context.Context) error {
return f.client.RangeFeed(ctx, f.spans, ts, f.withDiff, eventCh)
return f.client.RangeFeed(ctx, f.spans, ts, f.withDiff, eventCh, rangefeedOpts...)
}
processEventsTask := func(ctx context.Context) error {
return f.processEvents(ctx, frontier, eventCh)
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func (m *mockClient) RangeFeed(
startFrom hlc.Timestamp,
withDiff bool,
eventC chan<- kvcoord.RangeFeedMessage,
opts ...kvcoord.RangeFeedOption,
) error {
return m.rangefeed(ctx, spans, startFrom, withDiff, eventC)
}
Expand Down Expand Up @@ -364,7 +365,7 @@ func TestBackoffOnRangefeedFailure(t *testing.T) {
Times(3).
Return(errors.New("rangefeed failed"))
db.EXPECT().RangeFeed(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Do(func(context.Context, []roachpb.Span, hlc.Timestamp, bool, chan<- kvcoord.RangeFeedMessage) {
Do(func(context.Context, []roachpb.Span, hlc.Timestamp, bool, chan<- kvcoord.RangeFeedMessage, ...kvcoord.RangeFeedOption) {
cancel()
}).
Return(nil)
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvclient/rangefeed/rangefeedcache/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,10 @@ func (s *Watcher) Run(ctx context.Context) error {
case frontierBumpedCh <- struct{}{}:
}
}),
// TODO(irfansharif): Consider making this configurable on the Watcher
// type. As of 2022-11 all uses of this type are system-internal ones
// where a higher admission-pri makes sense.
rangefeed.WithSystemTablePriority(),
rangefeed.WithDiff(s.withPrevValue),
rangefeed.WithRowTimestampInInitialScan(true),
rangefeed.WithOnInitialScanError(func(ctx context.Context, err error) (shouldFail bool) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/client_replica_circuit_breaker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ func TestReplicaCircuitBreaker_RangeFeed(t *testing.T) {
defer cancel()
stream1 := &dummyStream{t: t, ctx: ctx, name: "rangefeed1", recv: make(chan *roachpb.RangeFeedEvent)}
require.NoError(t, tc.Stopper().RunAsyncTask(ctx, "stream1", func(ctx context.Context) {
err := tc.repls[0].RangeFeed(ctx, args, stream1).GoError()
err := tc.repls[0].RangeFeed(args, stream1, nil /* pacer */).GoError()
if ctx.Err() != nil {
return // main goroutine stopping
}
Expand Down Expand Up @@ -496,7 +496,7 @@ func TestReplicaCircuitBreaker_RangeFeed(t *testing.T) {
// the breaker.
stream2 := &dummyStream{t: t, ctx: ctx, name: "rangefeed2", recv: make(chan *roachpb.RangeFeedEvent)}
require.NoError(t, tc.Stopper().RunAsyncTask(ctx, "stream2", func(ctx context.Context) {
err := tc.repls[0].RangeFeed(ctx, args, stream2).GoError()
err := tc.repls[0].RangeFeed(args, stream2, nil /* pacer */).GoError()
if ctx.Err() != nil {
return // main goroutine stopping
}
Expand Down
Loading

0 comments on commit a042b60

Please sign in to comment.