Commit 9f11345

kvclient: use NormalPri for system-table rangefeed catch-up scans,
introduce a private cluster setting
(kvadmission.rangefeed_catchup_scan_elastic_control.enabled) to
selectively switch off catch-up scan integration if needed, and plumb
kvadmission.Pacer explicitly into the rangefeed catch-up scan loop
instead of opaquely through the surrounding context.

Release note: None
irfansharif committed Oct 26, 2022
1 parent fd15a1c commit 9f11345
Showing 20 changed files with 114 additions and 54 deletions.
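
Before the per-file diffs, a minimal sketch of the caller-facing shape of the change: system-internal rangefeed consumers can now opt their catch-up scans into NormalPri admission instead of the default BulkNormalPri. The option names come from this commit; the factory call and helper below are illustrative assumptions, not code from the diff.

package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// startSystemTableWatcher is a hypothetical helper: it starts a rangefeed
// whose catch-up scan is admitted at admissionpb.NormalPri (via
// rangefeed.WithSystemTablePriority, added in this commit) rather than the
// admissionpb.BulkNormalPri used for changefeed backfills.
func startSystemTableWatcher(
	ctx context.Context,
	f *rangefeed.Factory,
	spans []roachpb.Span,
	startFrom hlc.Timestamp,
	onValue rangefeed.OnValue,
) (*rangefeed.RangeFeed, error) {
	return f.RangeFeed(ctx, "system-table-watcher", spans, startFrom, onValue,
		// New option: marks this feed as system-internal so the DistSender
		// stamps its catch-up scan AdmissionHeader with NormalPri.
		rangefeed.WithSystemTablePriority(),
	)
}

Internally, pkg/kv/kvclient/rangefeed translates the option into kvcoord.WithSystemTablePriority(), and the DistSender picks the admission priority when issuing the RangeFeedRequest, as the diffs below show.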
3 changes: 3 additions & 0 deletions pkg/ccl/changefeedccl/changefeedbase/settings.go
@@ -122,6 +122,9 @@ var ScanRequestLimit = settings.RegisterIntSetting(
)

// ScanRequestSize is the target size of the scan request response.
//
// TODO(cdc,yevgeniy,irfansharif): 16 MiB is too large for "elastic" work such
// as this; reduce the default. Evaluate this as part of #90089.
var ScanRequestSize = settings.RegisterIntSetting(
settings.TenantWritable,
"changefeed.backfill.scan_request_size",
9 changes: 8 additions & 1 deletion pkg/ccl/changefeedccl/kvfeed/scanner.go
@@ -142,7 +142,14 @@ func (p *scanRequestScanner) exportSpan(
r.ScanFormat = roachpb.BATCH_RESPONSE
b.Header.TargetBytes = targetBytesPerScan
b.AdmissionHeader = roachpb.AdmissionHeader{
Priority: int32(admissionpb.BulkNormalPri),
// TODO(irfansharif): Make this configurable if we want system-table
// scanners, or "high priority" changefeeds, to run at higher
// priorities. We use higher AC priorities for system-internal
// rangefeeds listening in on system table changes.
Priority: int32(admissionpb.BulkNormalPri),
// We specify a creation time for each batch (as opposed to at the
// txn level) -- this way, later batches from earlier txns don't just
// outcompete batches from newer txns.
CreateTime: start.UnixNano(),
Source: roachpb.AdmissionHeader_FROM_SQL,
NoMemoryReservedAtSource: true,
21 changes: 18 additions & 3 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
@@ -86,6 +86,7 @@ func maxConcurrentCatchupScans(sv *settings.Values) int {

type rangeFeedConfig struct {
useMuxRangeFeed bool
overSystemTable bool
}

// RangeFeedOption configures a RangeFeed.
@@ -104,6 +105,14 @@ func WithMuxRangeFeed() RangeFeedOption {
})
}

// WithSystemTablePriority is used for system-internal rangefeeds; it uses a
// higher admission priority during catch-up scans.
func WithSystemTablePriority() RangeFeedOption {
return optionFunc(func(c *rangeFeedConfig) {
c.overSystemTable = true
})
}

// A "kill switch" to disable multiplexing rangefeed if severe issues discovered with new implementation.
var enableMuxRangeFeed = envutil.EnvOrDefaultBool("COCKROACH_ENABLE_MULTIPLEXING_RANGEFEED", true)

@@ -196,7 +205,7 @@ func (ds *DistSender) RangeFeedSpans(
// Spawn a child goroutine to process this feed.
g.GoCtx(func(ctx context.Context) error {
return ds.partialRangeFeed(ctx, rr, eventProducer, sri.rs, sri.startAfter,
sri.token, withDiff, &catchupSem, rangeCh, eventCh)
sri.token, withDiff, &catchupSem, rangeCh, eventCh, cfg)
})
case <-ctx.Done():
return ctx.Err()
@@ -372,6 +381,7 @@ func (ds *DistSender) partialRangeFeed(
catchupSem *limit.ConcurrentRequestLimiter,
rangeCh chan<- singleRangeInfo,
eventCh chan<- RangeFeedMessage,
cfg rangeFeedConfig,
) error {
// Bound the partial rangefeed to the partial span.
span := rs.AsRawSpanWithNoLocals()
@@ -408,7 +418,7 @@
// Establish a RangeFeed for a single Range.
maxTS, err := ds.singleRangeFeed(
ctx, span, startAfter, withDiff, token.Desc(),
catchupSem, eventCh, streamProducerFactory, active.onRangeEvent)
catchupSem, eventCh, streamProducerFactory, active.onRangeEvent, cfg)

// Forward the timestamp in case we end up sending it again.
startAfter.Forward(maxTS)
@@ -496,11 +506,16 @@ func (ds *DistSender) singleRangeFeed(
eventCh chan<- RangeFeedMessage,
streamProducerFactory rangeFeedEventProducerFactory,
onRangeEvent onRangeEventCb,
cfg rangeFeedConfig,
) (hlc.Timestamp, error) {
// Ensure context is cancelled on all errors, to prevent gRPC stream leaks.
ctx, cancelFeed := context.WithCancel(ctx)
defer cancelFeed()

admissionPri := admissionpb.BulkNormalPri
if cfg.overSystemTable {
admissionPri = admissionpb.NormalPri
}
args := roachpb.RangeFeedRequest{
Span: span,
Header: roachpb.Header{
@@ -511,7 +526,7 @@
AdmissionHeader: roachpb.AdmissionHeader{
// NB: AdmissionHeader is used only at the start of the range feed
// stream since the initial catch-up scan is expensive.
Priority: int32(admissionpb.BulkNormalPri),
Priority: int32(admissionPri),
CreateTime: timeutil.Now().UnixNano(),
Source: roachpb.AdmissionHeader_FROM_SQL,
NoMemoryReservedAtSource: true,
10 changes: 10 additions & 0 deletions pkg/kv/kvclient/rangefeed/config.go
@@ -43,6 +43,7 @@ type config struct {
onSSTable OnSSTable
onDeleteRange OnDeleteRange
extraPProfLabels []string
overSystemTable bool
}

type scanConfig struct {
@@ -287,3 +288,12 @@ func WithPProfLabel(key, value string) Option {
c.extraPProfLabels = append(c.extraPProfLabels, key, value)
})
}

// WithSystemTablePriority communicates that the rangefeed is over a system
// table and thus operates at a higher priority (this primarily affects
// admission control).
func WithSystemTablePriority() Option {
return optionFunc(func(c *config) {
c.overSystemTable = true
})
}
3 changes: 2 additions & 1 deletion pkg/kv/kvclient/rangefeed/db_adapter.go
@@ -74,8 +74,9 @@ func (dbc *dbAdapter) RangeFeed(
startFrom hlc.Timestamp,
withDiff bool,
eventC chan<- kvcoord.RangeFeedMessage,
opts ...kvcoord.RangeFeedOption,
) error {
return dbc.distSender.RangeFeed(ctx, spans, startFrom, withDiff, eventC)
return dbc.distSender.RangeFeed(ctx, spans, startFrom, withDiff, eventC, opts...)
}

// concurrentBoundAccount is a thread safe bound account.
13 changes: 9 additions & 4 deletions pkg/kv/kvclient/rangefeed/mocks_generated_test.go

Some generated files are not rendered by default.

8 changes: 7 additions & 1 deletion pkg/kv/kvclient/rangefeed/rangefeed.go
@@ -55,6 +55,7 @@ type DB interface {
startFrom hlc.Timestamp,
withDiff bool,
eventC chan<- kvcoord.RangeFeedMessage,
opts ...kvcoord.RangeFeedOption,
) error

// Scan encapsulates scanning a key span at a given point in time. The method
@@ -287,6 +288,11 @@ func (f *RangeFeed) run(ctx context.Context, frontier *span.Frontier) {
// draining when the rangefeed fails.
eventCh := make(chan kvcoord.RangeFeedMessage)

var rangefeedOpts []kvcoord.RangeFeedOption
if f.overSystemTable {
rangefeedOpts = append(rangefeedOpts, kvcoord.WithSystemTablePriority())
}

for i := 0; r.Next(); i++ {
ts := frontier.Frontier()
if log.ExpensiveLogEnabled(ctx, 1) {
@@ -296,7 +302,7 @@ func (f *RangeFeed) run(ctx context.Context, frontier *span.Frontier) {
start := timeutil.Now()

rangeFeedTask := func(ctx context.Context) error {
return f.client.RangeFeed(ctx, f.spans, ts, f.withDiff, eventCh)
return f.client.RangeFeed(ctx, f.spans, ts, f.withDiff, eventCh, rangefeedOpts...)
}
processEventsTask := func(ctx context.Context) error {
return f.processEvents(ctx, frontier, eventCh)
3 changes: 2 additions & 1 deletion pkg/kv/kvclient/rangefeed/rangefeed_mock_test.go
@@ -54,6 +54,7 @@ func (m *mockClient) RangeFeed(
startFrom hlc.Timestamp,
withDiff bool,
eventC chan<- kvcoord.RangeFeedMessage,
opts ...kvcoord.RangeFeedOption,
) error {
return m.rangefeed(ctx, spans, startFrom, withDiff, eventC)
}
@@ -364,7 +365,7 @@ func TestBackoffOnRangefeedFailure(t *testing.T) {
Times(3).
Return(errors.New("rangefeed failed"))
db.EXPECT().RangeFeed(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Do(func(context.Context, []roachpb.Span, hlc.Timestamp, bool, chan<- kvcoord.RangeFeedMessage) {
Do(func(context.Context, []roachpb.Span, hlc.Timestamp, bool, chan<- kvcoord.RangeFeedMessage, ...kvcoord.RangeFeedOption) {
cancel()
}).
Return(nil)
4 changes: 4 additions & 0 deletions pkg/kv/kvclient/rangefeed/rangefeedcache/watcher.go
@@ -292,6 +292,10 @@ func (s *Watcher) Run(ctx context.Context) error {
case frontierBumpedCh <- struct{}{}:
}
}),
// TODO(irfansharif): Consider making this configurable on the Watcher
// type. As of 2022-11 all uses of this type are system-internal ones
// where a higher admission-pri makes sense.
rangefeed.WithSystemTablePriority(),
rangefeed.WithDiff(s.withPrevValue),
rangefeed.WithRowTimestampInInitialScan(true),
rangefeed.WithOnInitialScanError(func(ctx context.Context, err error) (shouldFail bool) {
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/client_replica_circuit_breaker_test.go
@@ -442,7 +442,7 @@ func TestReplicaCircuitBreaker_RangeFeed(t *testing.T) {
defer cancel()
stream1 := &dummyStream{t: t, ctx: ctx, name: "rangefeed1", recv: make(chan *roachpb.RangeFeedEvent)}
require.NoError(t, tc.Stopper().RunAsyncTask(ctx, "stream1", func(ctx context.Context) {
err := tc.repls[0].RangeFeed(ctx, args, stream1).GoError()
err := tc.repls[0].RangeFeed(args, stream1, nil /* pacer */).GoError()
if ctx.Err() != nil {
return // main goroutine stopping
}
@@ -496,7 +496,7 @@
// the breaker.
stream2 := &dummyStream{t: t, ctx: ctx, name: "rangefeed2", recv: make(chan *roachpb.RangeFeedEvent)}
require.NoError(t, tc.Stopper().RunAsyncTask(ctx, "stream2", func(ctx context.Context) {
err := tc.repls[0].RangeFeed(ctx, args, stream2).GoError()
err := tc.repls[0].RangeFeed(args, stream2, nil /* pacer */).GoError()
if ctx.Err() != nil {
return // main goroutine stopping
}
44 changes: 17 additions & 27 deletions pkg/kv/kvserver/kvadmission/kvadmission.go
@@ -52,7 +52,8 @@ var elasticCPUDurationPerExportRequest = settings.RegisterDurationSetting(
)

// elasticCPUDurationPerRangefeedScanUnit controls how many CPU tokens are
// allotted for each unit of work during rangefeed catchup scans.
// allotted for each unit of work during rangefeed catchup scans. Only takes
// effect if kvadmission.rangefeed_catchup_scan_elastic_control.enabled is set.
var elasticCPUDurationPerRangefeedScanUnit = settings.RegisterDurationSetting(
settings.SystemOnly,
"kvadmission.elastic_cpu.duration_per_rangefeed_scan_unit",
@@ -71,6 +72,15 @@ var elasticCPUDurationPerRangefeedScanUnit = settings.RegisterDurationSetting(
},
)

// rangefeedCatchupScanElasticControlEnabled determines whether rangefeed catch
// up scans integrate with elastic CPU control.
var rangefeedCatchupScanElasticControlEnabled = settings.RegisterBoolSetting(
settings.SystemOnly,
"kvadmission.rangefeed_catchup_scan_elastic_control.enabled",
"determines whether rangefeed catchup scans integrate with the elastic CPU control",
true,
)

// Controller provides admission control for the KV layer.
type Controller interface {
// AdmitKVWork must be called before performing KV work.
@@ -83,8 +93,9 @@ type Controller interface {
// executing.
AdmittedKVWorkDone(Handle, *StoreWriteBytes)
// AdmitRangefeedRequest must be called before serving rangefeed requests.
// It returns a Pacer that's used within rangefeed catchup scans (typically
// CPU-intensive and affects scheduling latencies negatively).
// If enabled, it returns a non-nil Pacer that's to be used within rangefeed
// catchup scans (typically CPU-intensive and affecting scheduling
// latencies).
AdmitRangefeedRequest(roachpb.TenantID, *roachpb.RangeFeedRequest) *Pacer
// SetTenantWeightProvider is used to set the provider that will be
// periodically polled for weights. The stopper should be used to terminate
@@ -292,10 +303,9 @@ func (n *controllerImpl) AdmittedKVWorkDone(ah Handle, writeBytes *StoreWriteByt
func (n *controllerImpl) AdmitRangefeedRequest(
tenantID roachpb.TenantID, request *roachpb.RangeFeedRequest,
) *Pacer {
// TODO(irfansharif): We need to version gate/be defensive when integrating
// rangefeeds since admission headers will not be fully set on older version
// nodes. See EnableRangefeedElasticCPUControl in cockroach_versions.go.
// Consider a cluster setting too.
if !rangefeedCatchupScanElasticControlEnabled.Get(&n.settings.SV) {
return nil
}

return &Pacer{
unit: elasticCPUDurationPerRangefeedScanUnit.Get(&n.settings.SV),
@@ -469,23 +479,3 @@ func (p *Pacer) Close() {
p.wq.AdmittedWorkDone(p.cur)
p.cur = nil
}

type pacerKey struct{}

// ContextWithPacer returns a Context wrapping the supplied Pacer, if any.
func ContextWithPacer(ctx context.Context, h *Pacer) context.Context {
if h == nil {
return ctx
}
return context.WithValue(ctx, pacerKey{}, h)
}

// PacerFromContext returns the Pacer contained in the Context, if any.
func PacerFromContext(ctx context.Context) *Pacer {
val := ctx.Value(pacerKey{})
h, ok := val.(*Pacer)
if !ok {
return nil
}
return h
}
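
With the ContextWithPacer/PacerFromContext plumbing above removed, AdmitRangefeedRequest simply returns nil when the setting is off, and the catch-up scan loop in the next file calls Pace unconditionally. That implies Pace is safe on a nil receiver; a hedged, standalone sketch of that assumed contract (the real kvadmission.Pacer wraps an elastic CPU work queue):

package example

import "context"

// pacer is a stand-in for kvadmission.Pacer, written only to illustrate the
// nil-receiver contract the catch-up scan loop relies on.
type pacer struct {
	// Elastic CPU admission state elided.
}

// Pace is a no-op on a nil receiver, so callers handed a nil *pacer (pacing
// disabled) pay only a nil check per iteration.
func (p *pacer) Pace(ctx context.Context) error {
	if p == nil {
		return nil
	}
	// Acquire or extend elastic CPU tokens here; return an error only if
	// admission itself fails.
	return nil
}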
16 changes: 12 additions & 4 deletions pkg/kv/kvserver/rangefeed/catchup_scan.go
@@ -13,13 +13,15 @@ package rangefeed
import (
"bytes"
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvadmission"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/bufalloc"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/errors"
)
@@ -154,13 +156,23 @@ func (i *CatchUpIterator) CatchUpScan(
var lastKey roachpb.Key
var meta enginepb.MVCCMetadata
i.SeekGE(storage.MVCCKey{Key: i.span.Key})

every := log.Every(100 * time.Millisecond)
for {
if ok, err := i.Valid(); err != nil {
return err
} else if !ok {
break
}

if err := i.pacer.Pace(ctx); err != nil {
// We're unable to pace things automatically -- shout loudly
// semi-infrequently but don't fail the rangefeed itself.
if every.ShouldLog() {
log.Errorf(ctx, "automatic pacing: %v", err)
}
}

// Emit any new MVCC range tombstones when their start key is encountered.
// Range keys can currently only be MVCC range tombstones.
//
@@ -330,10 +342,6 @@ func (i *CatchUpIterator) CatchUpScan(
i.Next()
}
}

if err := i.pacer.Pace(ctx); err != nil {
return errors.Wrap(err, "automatic pacing: %v")
}
}

// Output events for the last key encountered.
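
The loop above treats pacing as best-effort: errors are logged at most every 100ms and never abort the scan (previously a pacing error returned from CatchUpScan). A standalone sketch of the same shape, using only the standard library and hypothetical names (the real code uses util/log.Every and the kvadmission.Pacer):

package main

import (
	"context"
	"errors"
	"log"
	"time"
)

// paceFn stands in for (*kvadmission.Pacer).Pace.
type paceFn func(context.Context) error

// catchUpLoop paces every iteration but treats pacing errors as advisory:
// it logs them at most once per interval and keeps scanning.
func catchUpLoop(ctx context.Context, keys []string, pace paceFn) {
	const logInterval = 100 * time.Millisecond
	var lastLogged time.Time
	for _, k := range keys {
		if err := pace(ctx); err != nil {
			if now := time.Now(); now.Sub(lastLogged) >= logInterval {
				lastLogged = now
				log.Printf("automatic pacing: %v", err)
			}
		}
		_ = k // emit rangefeed events for k here
	}
}

func main() {
	catchUpLoop(context.Background(), []string{"a", "b", "c"},
		func(context.Context) error { return errors.New("elastic CPU tokens unavailable") })
}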
9 changes: 5 additions & 4 deletions pkg/kv/kvserver/replica_rangefeed.go
@@ -135,21 +135,22 @@ func (tp *rangefeedTxnPusher) ResolveIntents(
// complete. The surrounding store's ConcurrentRequestLimiter is used to limit
// the number of rangefeeds using catch-up iterators at the same time.
func (r *Replica) RangeFeed(
ctx context.Context, args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink,
args *roachpb.RangeFeedRequest, stream roachpb.RangeFeedEventSink, pacer *kvadmission.Pacer,
) *roachpb.Error {
return r.rangeFeedWithRangeID(ctx, r.RangeID, args, stream)
return r.rangeFeedWithRangeID(r.RangeID, args, stream, pacer)
}

func (r *Replica) rangeFeedWithRangeID(
ctx context.Context,
_forStacks roachpb.RangeID,
args *roachpb.RangeFeedRequest,
stream roachpb.RangeFeedEventSink,
pacer *kvadmission.Pacer,
) *roachpb.Error {
if !r.isRangefeedEnabled() && !RangefeedEnabled.Get(&r.store.cfg.Settings.SV) {
return roachpb.NewErrorf("rangefeeds require the kv.rangefeed.enabled setting. See %s",
docs.URL(`change-data-capture.html#enable-rangefeeds-to-reduce-latency`))
}
ctx := r.AnnotateCtx(stream.Context())

rSpan, err := keys.SpanAddr(args.Span)
if err != nil {
@@ -225,7 +226,7 @@ func (r *Replica) rangeFeedWithRangeID(
// Assert that we still hold the raftMu when this is called to ensure
// that the catchUpIter reads from the current snapshot.
r.raftMu.AssertHeld()
return rangefeed.NewCatchUpIterator(r.Engine(), span, startTime, iterSemRelease, kvadmission.PacerFromContext(ctx))
return rangefeed.NewCatchUpIterator(r.Engine(), span, startTime, iterSemRelease, pacer)
}
}
p := r.registerWithRangefeedRaftMuLocked(