Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv/kvserver/rangefeed: fix catchupIter construction synchronization #69613

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/rangefeed/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
)

var (
metaRangeFeedCatchupScanNanos = metric.Metadata{
metaRangeFeedCatchUpScanNanos = metric.Metadata{
Name: "kv.rangefeed.catchup_scan_nanos",
Help: "Time spent in RangeFeed catchup scan",
Measurement: "Nanoseconds",
Expand All @@ -29,7 +29,7 @@ var (

// Metrics are for production monitoring of RangeFeeds.
type Metrics struct {
RangeFeedCatchupScanNanos *metric.Counter
RangeFeedCatchUpScanNanos *metric.Counter

RangeFeedSlowClosedTimestampLogN log.EveryN
RangeFeedSlowClosedTimestampNudge singleflight.Group
Expand All @@ -46,7 +46,7 @@ func (*Metrics) MetricStruct() {}
// NewMetrics makes the metrics for RangeFeeds monitoring.
func NewMetrics() *Metrics {
return &Metrics{
RangeFeedCatchupScanNanos: metric.NewCounter(metaRangeFeedCatchupScanNanos),
RangeFeedCatchUpScanNanos: metric.NewCounter(metaRangeFeedCatchUpScanNanos),
RangeFeedSlowClosedTimestampLogN: log.Every(5 * time.Second),
RangeFeedSlowClosedTimestampNudgeSem: make(chan struct{}, 1024),
}
Expand Down
9 changes: 7 additions & 2 deletions pkg/kv/kvserver/rangefeed/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,11 @@ func (p *Processor) run(
log.Fatalf(ctx, "registration %s not in Processor's key range %v", r, p.Span)
}

// Construct the catchUpIter before notifying the registration that it
// has been registered. Note that if the catchUpScan is never run, then
// the iterator constructed here will be closed in disconnect.
r.maybeConstructCatchUpIter()

// Add the new registration to the registry.
p.reg.Register(&r)

Expand Down Expand Up @@ -394,7 +399,7 @@ func (p *Processor) sendStop(pErr *roachpb.Error) {
func (p *Processor) Register(
span roachpb.RSpan,
startTS hlc.Timestamp,
catchupIterConstructor IteratorConstructor,
catchUpIterConstructor IteratorConstructor,
withDiff bool,
stream Stream,
errC chan<- *roachpb.Error,
Expand All @@ -405,7 +410,7 @@ func (p *Processor) Register(
p.syncEventC()

r := newRegistration(
span.AsRawSpanWithNoLocals(), startTS, catchupIterConstructor, withDiff,
span.AsRawSpanWithNoLocals(), startTS, catchUpIterConstructor, withDiff,
p.Config.EventChanCap, p.Metrics, stream, errC,
)
select {
Expand Down
102 changes: 71 additions & 31 deletions pkg/kv/kvserver/rangefeed/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,19 @@ type Stream interface {
// has finished.
type registration struct {
// Input.
span roachpb.Span
catchupTimestamp hlc.Timestamp
catchupIterConstructor func() storage.SimpleMVCCIterator
withDiff bool
metrics *Metrics
span roachpb.Span
catchUpTimestamp hlc.Timestamp
withDiff bool
metrics *Metrics

// catchUpIterConstructor is used to construct the catchUpIter if necessary.
// The reason this constructor is plumbed down is to make sure that the
// iterator does not get constructed too late in server shutdown. However,
// it must also be stored in the struct to ensure that it is not constructed
// too late, after the raftMu has been dropped. Thus, this function, if
// non-nil, will be used to populate mu.catchUpIter while the registration
// is being registered by the processor.
catchUpIterConstructor func() storage.SimpleMVCCIterator

// Output.
stream Stream
Expand All @@ -80,13 +88,18 @@ type registration struct {
// Management of the output loop goroutine, used to ensure proper teardown.
outputLoopCancelFn func()
disconnected bool

// catchUpIter is populated on the Processor's goroutine while the
// Replica.raftMu is still held. If it is non-nil at the time that
// disconnect is called, it is closed by disconnect.
catchUpIter storage.SimpleMVCCIterator
}
}

func newRegistration(
span roachpb.Span,
startTS hlc.Timestamp,
catchupIterConstructor func() storage.SimpleMVCCIterator,
catchUpIterConstructor func() storage.SimpleMVCCIterator,
withDiff bool,
bufferSz int,
metrics *Metrics,
Expand All @@ -95,8 +108,8 @@ func newRegistration(
) registration {
r := registration{
span: span,
catchupTimestamp: startTS,
catchupIterConstructor: catchupIterConstructor,
catchUpTimestamp: startTS,
catchUpIterConstructor: catchUpIterConstructor,
withDiff: withDiff,
metrics: metrics,
stream: stream,
Expand All @@ -110,7 +123,7 @@ func newRegistration(

// publish attempts to send a single event to the output buffer for this
// registration. If the output buffer is full, the overflowed flag is set,
// indicating that live events were lost and a catchup scan should be initiated.
// indicating that live events were lost and a catch-up scan should be initiated.
// If overflowed is already set, events are ignored and not written to the
// buffer.
func (r *registration) publish(event *roachpb.RangeFeedEvent) {
Expand Down Expand Up @@ -210,6 +223,10 @@ func (r *registration) disconnect(pErr *roachpb.Error) {
r.mu.Lock()
defer r.mu.Unlock()
if !r.mu.disconnected {
if r.mu.catchUpIter != nil {
r.mu.catchUpIter.Close()
r.mu.catchUpIter = nil
}
if r.mu.outputLoopCancelFn != nil {
r.mu.outputLoopCancelFn()
}
Expand All @@ -232,7 +249,7 @@ func (r *registration) disconnect(pErr *roachpb.Error) {
// have been emitted.
func (r *registration) outputLoop(ctx context.Context) error {
// If the registration has a catch-up scan, run it.
if err := r.maybeRunCatchupScan(); err != nil {
if err := r.maybeRunCatchUpScan(); err != nil {
err = errors.Wrap(err, "catch-up scan failed")
log.Errorf(ctx, "%v", err)
return err
Expand Down Expand Up @@ -277,23 +294,22 @@ func (r *registration) runOutputLoop(ctx context.Context, _forStacks roachpb.Ran
r.disconnect(roachpb.NewError(err))
}

// maybeRunCatchupScan starts a catchup scan which will output entries for all
// recorded changes in the replica that are newer than the catchupTimestamp.
// maybeRunCatchUpScan starts a catch-up scan which will output entries for all
// recorded changes in the replica that are newer than the catchUpTimestamp.
// This uses the iterator provided when the registration was originally created;
// after the scan completes, the iterator will be closed.
//
// If the registration has no catchUpIter attached (detachCatchUpIter returns
// nil), this method is a no-op.
func (r *registration) maybeRunCatchupScan() error {
if r.catchupIterConstructor == nil {
func (r *registration) maybeRunCatchUpScan() error {
catchUpIter := r.detachCatchUpIter()
if catchUpIter == nil {
return nil
}
catchupIter := r.catchupIterConstructor()
r.catchupIterConstructor = nil
start := timeutil.Now()
defer func() {
catchupIter.Close()
r.metrics.RangeFeedCatchupScanNanos.Inc(timeutil.Since(start).Nanoseconds())
catchUpIter.Close()
r.metrics.RangeFeedCatchUpScanNanos.Inc(timeutil.Since(start).Nanoseconds())
}()

var a bufalloc.ByteAllocator
Expand Down Expand Up @@ -330,23 +346,23 @@ func (r *registration) maybeRunCatchupScan() error {
// versions of each key that are after the registration's startTS, so we
// can't use NextKey.
var meta enginepb.MVCCMetadata
catchupIter.SeekGE(startKey)
catchUpIter.SeekGE(startKey)
for {
if ok, err := catchupIter.Valid(); err != nil {
if ok, err := catchUpIter.Valid(); err != nil {
return err
} else if !ok || !catchupIter.UnsafeKey().Less(endKey) {
} else if !ok || !catchUpIter.UnsafeKey().Less(endKey) {
break
}

unsafeKey := catchupIter.UnsafeKey()
unsafeVal := catchupIter.UnsafeValue()
unsafeKey := catchUpIter.UnsafeKey()
unsafeVal := catchUpIter.UnsafeValue()
if !unsafeKey.IsValue() {
// Found a metadata key.
if err := protoutil.Unmarshal(unsafeVal, &meta); err != nil {
return errors.Wrapf(err, "unmarshaling mvcc meta: %v", unsafeKey)
}
if !meta.IsInline() {
// This is an MVCCMetadata key for an intent. The catchup scan
// This is an MVCCMetadata key for an intent. The catch-up scan
// only cares about committed values, so ignore this and skip
// past the corresponding provisional key-value. To do this,
// scan to the timestamp immediately before (i.e. the key
Expand All @@ -355,7 +371,7 @@ func (r *registration) maybeRunCatchupScan() error {
// Make a copy of the key: an unsafe key must not be passed back to the
// iterator that provided it when asking that iterator to seek.
a, unsafeKey.Key = a.Copy(unsafeKey.Key, 0)
catchupIter.SeekGE(storage.MVCCKey{
catchUpIter.SeekGE(storage.MVCCKey{
Key: unsafeKey.Key,
Timestamp: meta.Timestamp.ToTimestamp().Prev(),
})
Expand All @@ -382,11 +398,11 @@ func (r *registration) maybeRunCatchupScan() error {

// Ignore the version if it's not inline and its timestamp is at
// or before the registration's (exclusive) starting timestamp.
ignore := !(ts.IsEmpty() || r.catchupTimestamp.Less(ts))
ignore := !(ts.IsEmpty() || r.catchUpTimestamp.Less(ts))
if ignore && !r.withDiff {
// Skip all the way to the next key.
// NB: fast-path to avoid value copy when !r.withDiff.
catchupIter.NextKey()
catchUpIter.NextKey()
continue
}

Expand All @@ -399,10 +415,10 @@ func (r *registration) maybeRunCatchupScan() error {

if ignore {
// Skip all the way to the next key.
catchupIter.NextKey()
catchUpIter.NextKey()
} else {
// Move to the next version of this key.
catchupIter.Next()
catchUpIter.Next()

var event roachpb.RangeFeedEvent
event.MustSetValue(&roachpb.RangeFeedValue{
Expand Down Expand Up @@ -431,7 +447,7 @@ func (r *registration) Range() interval.Range {
}

func (r registration) String() string {
return fmt.Sprintf("[%s @ %s+]", r.span, r.catchupTimestamp)
return fmt.Sprintf("[%s @ %s+]", r.span, r.catchUpTimestamp)
}

// registry holds a set of registrations and manages their lifecycle.
Expand Down Expand Up @@ -496,7 +512,7 @@ func (reg *registry) PublishToOverlapping(span roachpb.Span, event *roachpb.Rang
reg.forOverlappingRegs(span, func(r *registration) (bool, *roachpb.Error) {
// Don't publish events if they are equal to or less
// than the registration's starting timestamp.
if r.catchupTimestamp.Less(minTS) {
if r.catchUpTimestamp.Less(minTS) {
r.publish(event)
}
return false, nil
Expand Down Expand Up @@ -587,6 +603,30 @@ func (r *registration) waitForCaughtUp() error {
return errors.Errorf("registration %v failed to empty in time", r.Range())
}

// maybeConstructCatchUpIter builds the catch-up iterator, if a constructor was
// supplied, and attaches it to the registration as r.mu.catchUpIter.
//
// The constructor is invoked at most once: it is cleared after being called,
// so later invocations of this method return without doing anything. The
// attached iterator is subsequently either taken by detachCatchUpIter or
// closed by disconnect.
func (r *registration) maybeConstructCatchUpIter() {
	construct := r.catchUpIterConstructor
	if construct == nil {
		// No catch-up scan was requested, or the iterator was already built.
		return
	}

	// Build the iterator before taking the lock, then forget the constructor.
	iter := construct()
	r.catchUpIterConstructor = nil

	r.mu.Lock()
	r.mu.catchUpIter = iter
	r.mu.Unlock()
}

// detachCatchUpIter hands back the iterator currently stored in
// r.mu.catchUpIter and clears the field, all while holding r.mu. It returns
// nil when no iterator is attached.
func (r *registration) detachCatchUpIter() storage.SimpleMVCCIterator {
	r.mu.Lock()
	iter := r.mu.catchUpIter
	// Clear the field so that the stored reference is handed out only once.
	r.mu.catchUpIter = nil
	r.mu.Unlock()
	return iter
}

// waitForCaughtUp waits for all registrations overlapping the given span to
// completely process their internal buffers.
func (reg *registry) waitForCaughtUp(span roachpb.Span) error {
Expand Down
36 changes: 19 additions & 17 deletions pkg/kv/kvserver/rangefeed/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,19 +100,21 @@ func newTestRegistration(
) *testRegistration {
s := newTestStream()
errC := make(chan *roachpb.Error, 1)
r := newRegistration(
span,
ts,
makeIteratorConstructor(catchup),
withDiff,
5,
NewMetrics(),
s,
errC,
)
r.maybeConstructCatchUpIter()
return &testRegistration{
registration: newRegistration(
span,
ts,
makeIteratorConstructor(catchup),
withDiff,
5,
NewMetrics(),
s,
errC,
),
stream: s,
errC: errC,
registration: r,
stream: s,
errC: errC,
}
}

Expand Down Expand Up @@ -263,10 +265,10 @@ func TestRegistrationCatchUpScan(t *testing.T) {
EndKey: roachpb.Key("w"),
}, hlc.Timestamp{WallTime: 4}, iter, true /* withDiff */)

require.Zero(t, r.metrics.RangeFeedCatchupScanNanos.Count())
require.NoError(t, r.maybeRunCatchupScan())
require.Zero(t, r.metrics.RangeFeedCatchUpScanNanos.Count())
require.NoError(t, r.maybeRunCatchUpScan())
require.True(t, iter.closed)
require.NotZero(t, r.metrics.RangeFeedCatchupScanNanos.Count())
require.NotZero(t, r.metrics.RangeFeedCatchUpScanNanos.Count())

// Compare the events sent on the registration's Stream to the expected events.
expEvents := []*roachpb.RangeFeedEvent{
Expand Down Expand Up @@ -564,14 +566,14 @@ func TestRegistrationString(t *testing.T) {
{
r: registration{
span: roachpb.Span{Key: roachpb.Key("d")},
catchupTimestamp: hlc.Timestamp{WallTime: 10, Logical: 1},
catchUpTimestamp: hlc.Timestamp{WallTime: 10, Logical: 1},
},
exp: `[d @ 0.000000010,1+]`,
},
{
r: registration{span: roachpb.Span{
Key: roachpb.Key("d"), EndKey: roachpb.Key("z")},
catchupTimestamp: hlc.Timestamp{WallTime: 40, Logical: 9},
catchUpTimestamp: hlc.Timestamp{WallTime: 40, Logical: 9},
},
exp: `[{d-z} @ 0.000000040,9+]`,
},
Expand Down
Loading