Skip to content

Commit

Permalink
kv/kvserver: catchup -> catchUp
Browse files Browse the repository at this point in the history
Release justification: non-production code change

Release note: None
  • Loading branch information
ajwerner committed Aug 31, 2021
1 parent 2e70853 commit 9ff249e
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 63 deletions.
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/rangefeed/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
)

var (
metaRangeFeedCatchupScanNanos = metric.Metadata{
metaRangeFeedCatchUpScanNanos = metric.Metadata{
Name: "kv.rangefeed.catchup_scan_nanos",
Help: "Time spent in RangeFeed catchup scan",
Measurement: "Nanoseconds",
Expand All @@ -29,7 +29,7 @@ var (

// Metrics are for production monitoring of RangeFeeds.
type Metrics struct {
RangeFeedCatchupScanNanos *metric.Counter
RangeFeedCatchUpScanNanos *metric.Counter

RangeFeedSlowClosedTimestampLogN log.EveryN
RangeFeedSlowClosedTimestampNudge singleflight.Group
Expand All @@ -46,7 +46,7 @@ func (*Metrics) MetricStruct() {}
// NewMetrics makes the metrics for RangeFeeds monitoring.
func NewMetrics() *Metrics {
return &Metrics{
RangeFeedCatchupScanNanos: metric.NewCounter(metaRangeFeedCatchupScanNanos),
RangeFeedCatchUpScanNanos: metric.NewCounter(metaRangeFeedCatchUpScanNanos),
RangeFeedSlowClosedTimestampLogN: log.Every(5 * time.Second),
RangeFeedSlowClosedTimestampNudgeSem: make(chan struct{}, 1024),
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/rangefeed/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func (p *Processor) run(
log.Fatalf(ctx, "registration %s not in Processor's key range %v", r, p.Span)
}

// Construct the catchupIter before notifying the registration that it
// Construct the catchUpIter before notifying the registration that it
// has been registered. Note that if the catchUpScan is never run, then
// the iterator constructed here will be closed in disconnect.
r.maybeConstructCatchUpIter()
Expand Down Expand Up @@ -399,7 +399,7 @@ func (p *Processor) sendStop(pErr *roachpb.Error) {
func (p *Processor) Register(
span roachpb.RSpan,
startTS hlc.Timestamp,
catchupIterConstructor IteratorConstructor,
catchUpIterConstructor IteratorConstructor,
withDiff bool,
stream Stream,
errC chan<- *roachpb.Error,
Expand All @@ -410,7 +410,7 @@ func (p *Processor) Register(
p.syncEventC()

r := newRegistration(
span.AsRawSpanWithNoLocals(), startTS, catchupIterConstructor, withDiff,
span.AsRawSpanWithNoLocals(), startTS, catchUpIterConstructor, withDiff,
p.Config.EventChanCap, p.Metrics, stream, errC,
)
select {
Expand Down
84 changes: 42 additions & 42 deletions pkg/kv/kvserver/rangefeed/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,18 @@ type Stream interface {
type registration struct {
// Input.
span roachpb.Span
catchupTimestamp hlc.Timestamp
catchUpTimestamp hlc.Timestamp
withDiff bool
metrics *Metrics

// catchupIterConstructor is used to construct the catchupIter if necessary.
// catchUpIterConstructor is used to construct the catchUpIter if necessary.
// The reason this constructor is plumbed down is to make sure that the
// iterator does not get constructed too late in server shutdown. However,
// it must also be stored in the struct to ensure that it is not constructed
// too late, after the raftMu has been dropped. Thus, this function, if
// non-nil, will be used to populate mu.catchupIter while the registration
// non-nil, will be used to populate mu.catchUpIter while the registration
// is being registered by the processor.
catchupIterConstructor func() storage.SimpleMVCCIterator
catchUpIterConstructor func() storage.SimpleMVCCIterator

// Output.
stream Stream
Expand All @@ -89,17 +89,17 @@ type registration struct {
outputLoopCancelFn func()
disconnected bool

// catchupIter is populated on the Processor's goroutine while the
// catchUpIter is populated on the Processor's goroutine while the
// Replica.raftMu is still held. If it is non-nil at the time that
// disconnect is called, it is closed by disconnect.
catchupIter storage.SimpleMVCCIterator
catchUpIter storage.SimpleMVCCIterator
}
}

func newRegistration(
span roachpb.Span,
startTS hlc.Timestamp,
catchupIterConstructor func() storage.SimpleMVCCIterator,
catchUpIterConstructor func() storage.SimpleMVCCIterator,
withDiff bool,
bufferSz int,
metrics *Metrics,
Expand All @@ -108,8 +108,8 @@ func newRegistration(
) registration {
r := registration{
span: span,
catchupTimestamp: startTS,
catchupIterConstructor: catchupIterConstructor,
catchUpTimestamp: startTS,
catchUpIterConstructor: catchUpIterConstructor,
withDiff: withDiff,
metrics: metrics,
stream: stream,
Expand All @@ -123,7 +123,7 @@ func newRegistration(

// publish attempts to send a single event to the output buffer for this
// registration. If the output buffer is full, the overflowed flag is set,
// indicating that live events were lost and a catchup scan should be initiated.
// indicating that live events were lost and a catch-up scan should be initiated.
// If overflowed is already set, events are ignored and not written to the
// buffer.
func (r *registration) publish(event *roachpb.RangeFeedEvent) {
Expand Down Expand Up @@ -223,9 +223,9 @@ func (r *registration) disconnect(pErr *roachpb.Error) {
r.mu.Lock()
defer r.mu.Unlock()
if !r.mu.disconnected {
if r.mu.catchupIter != nil {
r.mu.catchupIter.Close()
r.mu.catchupIter = nil
if r.mu.catchUpIter != nil {
r.mu.catchUpIter.Close()
r.mu.catchUpIter = nil
}
if r.mu.outputLoopCancelFn != nil {
r.mu.outputLoopCancelFn()
Expand All @@ -249,7 +249,7 @@ func (r *registration) disconnect(pErr *roachpb.Error) {
// have been emitted.
func (r *registration) outputLoop(ctx context.Context) error {
// If the registration has a catch-up scan, run it.
if err := r.maybeRunCatchupScan(); err != nil {
if err := r.maybeRunCatchUpScan(); err != nil {
err = errors.Wrap(err, "catch-up scan failed")
log.Errorf(ctx, "%v", err)
return err
Expand Down Expand Up @@ -294,22 +294,22 @@ func (r *registration) runOutputLoop(ctx context.Context, _forStacks roachpb.Ran
r.disconnect(roachpb.NewError(err))
}

// maybeRunCatchupScan starts a catchup scan which will output entries for all
// recorded changes in the replica that are newer than the catchupTimestamp.
// maybeRunCatchUpScan starts a catch-up scan which will output entries for all
// recorded changes in the replica that are newer than the catchUpTimestamp.
// This uses the iterator provided when the registration was originally created;
// after the scan completes, the iterator will be closed.
//
// If the registration does not have a catchUpIteratorConstructor, this method
// is a no-op.
func (r *registration) maybeRunCatchupScan() error {
catchupIter := r.detachCatchUpIter()
if catchupIter == nil {
func (r *registration) maybeRunCatchUpScan() error {
catchUpIter := r.detachCatchUpIter()
if catchUpIter == nil {
return nil
}
start := timeutil.Now()
defer func() {
catchupIter.Close()
r.metrics.RangeFeedCatchupScanNanos.Inc(timeutil.Since(start).Nanoseconds())
catchUpIter.Close()
r.metrics.RangeFeedCatchUpScanNanos.Inc(timeutil.Since(start).Nanoseconds())
}()

var a bufalloc.ByteAllocator
Expand Down Expand Up @@ -346,23 +346,23 @@ func (r *registration) maybeRunCatchupScan() error {
// versions of each key that are after the registration's startTS, so we
// can't use NextKey.
var meta enginepb.MVCCMetadata
catchupIter.SeekGE(startKey)
catchUpIter.SeekGE(startKey)
for {
if ok, err := catchupIter.Valid(); err != nil {
if ok, err := catchUpIter.Valid(); err != nil {
return err
} else if !ok || !catchupIter.UnsafeKey().Less(endKey) {
} else if !ok || !catchUpIter.UnsafeKey().Less(endKey) {
break
}

unsafeKey := catchupIter.UnsafeKey()
unsafeVal := catchupIter.UnsafeValue()
unsafeKey := catchUpIter.UnsafeKey()
unsafeVal := catchUpIter.UnsafeValue()
if !unsafeKey.IsValue() {
// Found a metadata key.
if err := protoutil.Unmarshal(unsafeVal, &meta); err != nil {
return errors.Wrapf(err, "unmarshaling mvcc meta: %v", unsafeKey)
}
if !meta.IsInline() {
// This is an MVCCMetadata key for an intent. The catchup scan
// This is an MVCCMetadata key for an intent. The catch-up scan
// only cares about committed values, so ignore this and skip
// past the corresponding provisional key-value. To do this,
// scan to the timestamp immediately before (i.e. the key
Expand All @@ -371,7 +371,7 @@ func (r *registration) maybeRunCatchupScan() error {
// Make a copy since should not pass an unsafe key from the iterator
// that provided it, when asking it to seek.
a, unsafeKey.Key = a.Copy(unsafeKey.Key, 0)
catchupIter.SeekGE(storage.MVCCKey{
catchUpIter.SeekGE(storage.MVCCKey{
Key: unsafeKey.Key,
Timestamp: meta.Timestamp.ToTimestamp().Prev(),
})
Expand All @@ -398,11 +398,11 @@ func (r *registration) maybeRunCatchupScan() error {

// Ignore the version if it's not inline and its timestamp is at
// or before the registration's (exclusive) starting timestamp.
ignore := !(ts.IsEmpty() || r.catchupTimestamp.Less(ts))
ignore := !(ts.IsEmpty() || r.catchUpTimestamp.Less(ts))
if ignore && !r.withDiff {
// Skip all the way to the next key.
// NB: fast-path to avoid value copy when !r.withDiff.
catchupIter.NextKey()
catchUpIter.NextKey()
continue
}

Expand All @@ -415,10 +415,10 @@ func (r *registration) maybeRunCatchupScan() error {

if ignore {
// Skip all the way to the next key.
catchupIter.NextKey()
catchUpIter.NextKey()
} else {
// Move to the next version of this key.
catchupIter.Next()
catchUpIter.Next()

var event roachpb.RangeFeedEvent
event.MustSetValue(&roachpb.RangeFeedValue{
Expand Down Expand Up @@ -447,7 +447,7 @@ func (r *registration) Range() interval.Range {
}

func (r registration) String() string {
return fmt.Sprintf("[%s @ %s+]", r.span, r.catchupTimestamp)
return fmt.Sprintf("[%s @ %s+]", r.span, r.catchUpTimestamp)
}

// registry holds a set of registrations and manages their lifecycle.
Expand Down Expand Up @@ -512,7 +512,7 @@ func (reg *registry) PublishToOverlapping(span roachpb.Span, event *roachpb.Rang
reg.forOverlappingRegs(span, func(r *registration) (bool, *roachpb.Error) {
// Don't publish events if they are equal to or less
// than the registration's starting timestamp.
if r.catchupTimestamp.Less(minTS) {
if r.catchUpTimestamp.Less(minTS) {
r.publish(event)
}
return false, nil
Expand Down Expand Up @@ -606,25 +606,25 @@ func (r *registration) waitForCaughtUp() error {
// maybeConstructCatchUpIter calls the catchUpIterConstructor and attaches
// the catchUpIter to be detached in the catchUpScan or closed on disconnect.
func (r *registration) maybeConstructCatchUpIter() {
if r.catchupIterConstructor == nil {
if r.catchUpIterConstructor == nil {
return
}

ci := r.catchupIterConstructor()
r.catchupIterConstructor = nil
catchUpIter := r.catchUpIterConstructor()
r.catchUpIterConstructor = nil

r.mu.Lock()
defer r.mu.Unlock()
r.mu.catchupIter = ci
r.mu.catchUpIter = catchUpIter
}

// detachCatchUpIter detaches the catchupIter that was previously attached.
// detachCatchUpIter detaches the catchUpIter that was previously attached.
func (r *registration) detachCatchUpIter() storage.SimpleMVCCIterator {
r.mu.Lock()
defer r.mu.Unlock()
catchupIter := r.mu.catchupIter
r.mu.catchupIter = nil
return catchupIter
catchUpIter := r.mu.catchUpIter
r.mu.catchUpIter = nil
return catchUpIter
}

// waitForCaughtUp waits for all registrations overlapping the given span to
Expand Down
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/rangefeed/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,10 +265,10 @@ func TestRegistrationCatchUpScan(t *testing.T) {
EndKey: roachpb.Key("w"),
}, hlc.Timestamp{WallTime: 4}, iter, true /* withDiff */)

require.Zero(t, r.metrics.RangeFeedCatchupScanNanos.Count())
require.NoError(t, r.maybeRunCatchupScan())
require.Zero(t, r.metrics.RangeFeedCatchUpScanNanos.Count())
require.NoError(t, r.maybeRunCatchUpScan())
require.True(t, iter.closed)
require.NotZero(t, r.metrics.RangeFeedCatchupScanNanos.Count())
require.NotZero(t, r.metrics.RangeFeedCatchUpScanNanos.Count())

// Compare the events sent on the registration's Stream to the expected events.
expEvents := []*roachpb.RangeFeedEvent{
Expand Down Expand Up @@ -566,14 +566,14 @@ func TestRegistrationString(t *testing.T) {
{
r: registration{
span: roachpb.Span{Key: roachpb.Key("d")},
catchupTimestamp: hlc.Timestamp{WallTime: 10, Logical: 1},
catchUpTimestamp: hlc.Timestamp{WallTime: 10, Logical: 1},
},
exp: `[d @ 0.000000010,1+]`,
},
{
r: registration{span: roachpb.Span{
Key: roachpb.Key("d"), EndKey: roachpb.Key("z")},
catchupTimestamp: hlc.Timestamp{WallTime: 40, Logical: 9},
catchUpTimestamp: hlc.Timestamp{WallTime: 40, Logical: 9},
},
exp: `[{d-z} @ 0.000000040,9+]`,
},
Expand Down
20 changes: 10 additions & 10 deletions pkg/kv/kvserver/replica_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (i iteratorWithCloser) Close() {
// RangeFeed registers a rangefeed over the specified span. It sends updates to
// the provided stream and returns with an optional error when the rangefeed is
// complete. The provided ConcurrentRequestLimiter is used to limit the number
// of rangefeeds using catchup iterators at the same time.
// of rangefeeds using catch-up iterators at the same time.
func (r *Replica) RangeFeed(
args *roachpb.RangeFeedRequest, stream roachpb.Internal_RangeFeedServer,
) *roachpb.Error {
Expand Down Expand Up @@ -188,21 +188,21 @@ func (r *Replica) rangeFeedWithRangeID(

// If we will be using a catch-up iterator, wait for the limiter here before
// locking raftMu.
usingCatchupIter := false
usingCatchUpIter := false
var iterSemRelease func()
if !args.Timestamp.IsEmpty() {
usingCatchupIter = true
usingCatchUpIter = true
alloc, err := r.store.limiters.ConcurrentRangefeedIters.Begin(ctx)
if err != nil {
return roachpb.NewError(err)
}
// Finish the iterator limit if we exit before the iterator finishes.
// The release function will be hooked into the Close method on the
// iterator below. The sync.Once prevents any races between exiting early
// from this call and finishing the catchup scan underneath the
// from this call and finishing the catch-up scan underneath the
// rangefeed.Processor. We need to release here in case we fail to
// register the processor, or, more perniciously, in the case where the
// processor gets registered by shut down before starting the catchup
// processor gets registered by shut down before starting the catch-up
// scan.
var iterSemReleaseOnce sync.Once
iterSemRelease = func() {
Expand All @@ -223,7 +223,7 @@ func (r *Replica) rangeFeedWithRangeID(

// Register the stream with a catch-up iterator.
var catchUpIterFunc rangefeed.IteratorConstructor
if usingCatchupIter {
if usingCatchUpIter {
catchUpIterFunc = func() storage.SimpleMVCCIterator {
// Assert that we still hold the raftMu when this is called to ensure
// that the catchUpIter reads from the current snapshot.
Expand All @@ -234,7 +234,7 @@ func (r *Replica) rangeFeedWithRangeID(
// performance optimization. However, they've had correctness issues in
// the past (#28358, #34819) and no-one has the time for the due-diligence
// necessary to be confidant in their correctness going forward. Not using
// them causes the total time spent in RangeFeed catchup on changefeed
// them causes the total time spent in RangeFeed catch-up on changefeed
// over tpcc-1000 to go from 40s -> 4853s, which is quite large but still
// workable. See #35122 for details.
// MinTimestampHint: args.Timestamp,
Expand Down Expand Up @@ -328,7 +328,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked(
ctx context.Context,
span roachpb.RSpan,
startTS hlc.Timestamp,
catchupIter rangefeed.IteratorConstructor,
catchUpIter rangefeed.IteratorConstructor,
withDiff bool,
stream rangefeed.Stream,
errC chan<- *roachpb.Error,
Expand All @@ -339,7 +339,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked(
r.rangefeedMu.Lock()
p := r.rangefeedMu.proc
if p != nil {
reg, filter := p.Register(span, startTS, catchupIter, withDiff, stream, errC)
reg, filter := p.Register(span, startTS, catchUpIter, withDiff, stream, errC)
if reg {
// Registered successfully with an existing processor.
// Update the rangefeed filter to avoid filtering ops
Expand Down Expand Up @@ -398,7 +398,7 @@ func (r *Replica) registerWithRangefeedRaftMuLocked(
// any other goroutines are able to stop the processor. In other words,
// this ensures that the only time the registration fails is during
// server shutdown.
reg, filter := p.Register(span, startTS, catchupIter, withDiff, stream, errC)
reg, filter := p.Register(span, startTS, catchUpIter, withDiff, stream, errC)
if !reg {
select {
case <-r.store.Stopper().ShouldQuiesce():
Expand Down

0 comments on commit 9ff249e

Please sign in to comment.