Commit

69445: server: buff timeout of server_test in BUILD.bazel r=otan a=otan

Release justification: test only change
Release note: None

69608: sql/catalog/multiregion: move multiregion validation to its package r=otan a=ajwerner

This code and these invariants are the responsibility of the
multiregion team, so let's move them to a package that team owns.

Release justification: Low risk code movement.

Release note: None

69613: kv/kvserver/rangefeed: fix catchupIter construction synchronization r=nvanbenschoten a=ajwerner

The catchupIter could have been constructed after the state of the underlying
store had changed. In general this doesn't seem like a disaster, unless the
range has been removed or the GC threshold has changed and the data is gone.

So, maybe it is a disaster.

Release justification: fixes for high-priority or high-severity bugs in
existing functionality

Release note: None
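
For context, here is a minimal, self-contained Go sketch of the lifecycle this fix enforces, mirroring the maybeConstructCatchUpIter / detachCatchUpIter / disconnect helpers added to pkg/kv/kvserver/rangefeed/registry.go below. The iterator and registration types here are simplified stand-ins, not the real storage.SimpleMVCCIterator or rangefeed APIs.

package main

import (
	"fmt"
	"sync"
)

// iterator stands in for storage.SimpleMVCCIterator.
type iterator struct{ closed bool }

func (it *iterator) Close() { it.closed = true }

// registration mirrors the shape of the fix: the constructor is invoked
// eagerly, while the registration is being set up (i.e. while the caller
// still holds the relevant lock), and the resulting iterator is either
// detached by the catch-up scan or closed on disconnect. It is never
// constructed later, against a store whose state may have changed.
type registration struct {
	iterConstructor func() *iterator

	mu struct {
		sync.Mutex
		catchUpIter  *iterator
		disconnected bool
	}
}

// maybeConstructCatchUpIter runs the constructor exactly once, up front.
func (r *registration) maybeConstructCatchUpIter() {
	if r.iterConstructor == nil {
		return
	}
	it := r.iterConstructor()
	r.iterConstructor = nil

	r.mu.Lock()
	defer r.mu.Unlock()
	r.mu.catchUpIter = it
}

// detachCatchUpIter hands ownership of the iterator to the catch-up scan.
func (r *registration) detachCatchUpIter() *iterator {
	r.mu.Lock()
	defer r.mu.Unlock()
	it := r.mu.catchUpIter
	r.mu.catchUpIter = nil
	return it
}

// disconnect closes the iterator if the catch-up scan never took it.
func (r *registration) disconnect() {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.mu.disconnected {
		return
	}
	r.mu.disconnected = true
	if r.mu.catchUpIter != nil {
		r.mu.catchUpIter.Close()
		r.mu.catchUpIter = nil
	}
}

func main() {
	r := &registration{iterConstructor: func() *iterator { return &iterator{} }}
	r.maybeConstructCatchUpIter() // done while registering, not later

	// Either the catch-up scan takes the iterator...
	if it := r.detachCatchUpIter(); it != nil {
		defer it.Close()
		fmt.Println("running catch-up scan")
	}
	// ...or disconnect closes it.
	r.disconnect()
}

The point of the pattern is that construction happens exactly once, up front, and ownership of the iterator is always clear: either the catch-up scan takes it or disconnect closes it.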

Co-authored-by: Oliver Tan <[email protected]>
Co-authored-by: Andrew Werner <[email protected]>
3 people committed Aug 31, 2021
4 parents c1ef81f + 83a2ce5 + e8e963b + 9ff249e commit 0c50eee
Showing 21 changed files with 425 additions and 337 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
@@ -48,6 +48,7 @@
/pkg/sql/tests/rsg_test.go @cockroachdb/sql-experience

/pkg/sql/catalog/ @cockroachdb/sql-schema
/pkg/sql/catalog/multiregion @cockroachdb/multiregion
/pkg/sql/doctor/ @cockroachdb/sql-schema
/pkg/sql/gcjob/ @cockroachdb/sql-schema
/pkg/sql/gcjob_test/ @cockroachdb/sql-schema
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/backupresolver/targets.go
@@ -434,7 +434,7 @@ func DescriptorsMatchingTargets(
// Get all the types used by this table.
desc := r.DescByID[tableDesc.GetParentID()]
dbDesc := desc.(catalog.DatabaseDescriptor)
typeIDs, err := tableDesc.GetAllReferencedTypeIDs(dbDesc, getTypeByID)
typeIDs, _, err := tableDesc.GetAllReferencedTypeIDs(dbDesc, getTypeByID)
if err != nil {
return ret, err
}
@@ -526,7 +526,7 @@ func DescriptorsMatchingTargets(
// Get all the types used by this table.
dbRaw := r.DescByID[desc.GetParentID()]
dbDesc := dbRaw.(catalog.DatabaseDescriptor)
typeIDs, err := desc.GetAllReferencedTypeIDs(dbDesc, getTypeByID)
typeIDs, _, err := desc.GetAllReferencedTypeIDs(dbDesc, getTypeByID)
if err != nil {
return err
}
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/restore_job.go
@@ -1315,7 +1315,7 @@ func createImportingDescriptors(
if err != nil {
return err
}
typeIDs, err := table.GetAllReferencedTypeIDs(dbDesc, func(id descpb.ID) (catalog.TypeDescriptor, error) {
typeIDs, _, err := table.GetAllReferencedTypeIDs(dbDesc, func(id descpb.ID) (catalog.TypeDescriptor, error) {
return typesByID[id], nil
})
if err != nil {
@@ -2343,7 +2343,7 @@ func (r *restoreResumer) removeExistingTypeBackReferences(
}

// Get all types that this descriptor references.
referencedTypes, err := tbl.GetAllReferencedTypeIDs(dbDesc, lookup)
referencedTypes, _, err := tbl.GetAllReferencedTypeIDs(dbDesc, lookup)
if err != nil {
return err
}
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/rangefeed/metrics.go
@@ -19,7 +19,7 @@ import (
)

var (
metaRangeFeedCatchupScanNanos = metric.Metadata{
metaRangeFeedCatchUpScanNanos = metric.Metadata{
Name: "kv.rangefeed.catchup_scan_nanos",
Help: "Time spent in RangeFeed catchup scan",
Measurement: "Nanoseconds",
@@ -29,7 +29,7 @@ var (

// Metrics are for production monitoring of RangeFeeds.
type Metrics struct {
RangeFeedCatchupScanNanos *metric.Counter
RangeFeedCatchUpScanNanos *metric.Counter

RangeFeedSlowClosedTimestampLogN log.EveryN
RangeFeedSlowClosedTimestampNudge singleflight.Group
@@ -46,7 +46,7 @@ func (*Metrics) MetricStruct() {}
// NewMetrics makes the metrics for RangeFeeds monitoring.
func NewMetrics() *Metrics {
return &Metrics{
RangeFeedCatchupScanNanos: metric.NewCounter(metaRangeFeedCatchupScanNanos),
RangeFeedCatchUpScanNanos: metric.NewCounter(metaRangeFeedCatchUpScanNanos),
RangeFeedSlowClosedTimestampLogN: log.Every(5 * time.Second),
RangeFeedSlowClosedTimestampNudgeSem: make(chan struct{}, 1024),
}
9 changes: 7 additions & 2 deletions pkg/kv/kvserver/rangefeed/processor.go
@@ -247,6 +247,11 @@ func (p *Processor) run(
log.Fatalf(ctx, "registration %s not in Processor's key range %v", r, p.Span)
}

// Construct the catchUpIter before notifying the registration that it
// has been registered. Note that if the catchUpScan is never run, then
// the iterator constructed here will be closed in disconnect.
r.maybeConstructCatchUpIter()

// Add the new registration to the registry.
p.reg.Register(&r)

@@ -394,7 +399,7 @@ func (p *Processor) sendStop(pErr *roachpb.Error) {
func (p *Processor) Register(
span roachpb.RSpan,
startTS hlc.Timestamp,
catchupIterConstructor IteratorConstructor,
catchUpIterConstructor IteratorConstructor,
withDiff bool,
stream Stream,
errC chan<- *roachpb.Error,
@@ -405,7 +410,7 @@
p.syncEventC()

r := newRegistration(
span.AsRawSpanWithNoLocals(), startTS, catchupIterConstructor, withDiff,
span.AsRawSpanWithNoLocals(), startTS, catchUpIterConstructor, withDiff,
p.Config.EventChanCap, p.Metrics, stream, errC,
)
select {
102 changes: 71 additions & 31 deletions pkg/kv/kvserver/rangefeed/registry.go
@@ -53,11 +53,19 @@ type Stream interface {
// has finished.
type registration struct {
// Input.
span roachpb.Span
catchupTimestamp hlc.Timestamp
catchupIterConstructor func() storage.SimpleMVCCIterator
withDiff bool
metrics *Metrics
span roachpb.Span
catchUpTimestamp hlc.Timestamp
withDiff bool
metrics *Metrics

// catchUpIterConstructor is used to construct the catchUpIter if necessary.
// The reason this constructor is plumbed down is to make sure that the
// iterator does not get constructed too late in server shutdown. However,
// it must also be stored in the struct to ensure that it is not constructed
// too late, after the raftMu has been dropped. Thus, this function, if
// non-nil, will be used to populate mu.catchUpIter while the registration
// is being registered by the processor.
catchUpIterConstructor func() storage.SimpleMVCCIterator

// Output.
stream Stream
@@ -80,13 +88,18 @@ type registration struct {
// Management of the output loop goroutine, used to ensure proper teardown.
outputLoopCancelFn func()
disconnected bool

// catchUpIter is populated on the Processor's goroutine while the
// Replica.raftMu is still held. If it is non-nil at the time that
// disconnect is called, it is closed by disconnect.
catchUpIter storage.SimpleMVCCIterator
}
}

func newRegistration(
span roachpb.Span,
startTS hlc.Timestamp,
catchupIterConstructor func() storage.SimpleMVCCIterator,
catchUpIterConstructor func() storage.SimpleMVCCIterator,
withDiff bool,
bufferSz int,
metrics *Metrics,
@@ -95,8 +108,8 @@ func newRegistration(
) registration {
r := registration{
span: span,
catchupTimestamp: startTS,
catchupIterConstructor: catchupIterConstructor,
catchUpTimestamp: startTS,
catchUpIterConstructor: catchUpIterConstructor,
withDiff: withDiff,
metrics: metrics,
stream: stream,
@@ -110,7 +123,7 @@

// publish attempts to send a single event to the output buffer for this
// registration. If the output buffer is full, the overflowed flag is set,
// indicating that live events were lost and a catchup scan should be initiated.
// indicating that live events were lost and a catch-up scan should be initiated.
// If overflowed is already set, events are ignored and not written to the
// buffer.
func (r *registration) publish(event *roachpb.RangeFeedEvent) {
@@ -210,6 +223,10 @@ func (r *registration) disconnect(pErr *roachpb.Error) {
r.mu.Lock()
defer r.mu.Unlock()
if !r.mu.disconnected {
if r.mu.catchUpIter != nil {
r.mu.catchUpIter.Close()
r.mu.catchUpIter = nil
}
if r.mu.outputLoopCancelFn != nil {
r.mu.outputLoopCancelFn()
}
@@ -232,7 +249,7 @@ func (r *registration) disconnect(pErr *roachpb.Error) {
// have been emitted.
func (r *registration) outputLoop(ctx context.Context) error {
// If the registration has a catch-up scan, run it.
if err := r.maybeRunCatchupScan(); err != nil {
if err := r.maybeRunCatchUpScan(); err != nil {
err = errors.Wrap(err, "catch-up scan failed")
log.Errorf(ctx, "%v", err)
return err
@@ -277,23 +294,22 @@ func (r *registration) runOutputLoop(ctx context.Context, _forStacks roachpb.Ran
r.disconnect(roachpb.NewError(err))
}

// maybeRunCatchupScan starts a catchup scan which will output entries for all
// recorded changes in the replica that are newer than the catchupTimestamp.
// maybeRunCatchUpScan starts a catch-up scan which will output entries for all
// recorded changes in the replica that are newer than the catchUpTimestamp.
// This uses the iterator provided when the registration was originally created;
// after the scan completes, the iterator will be closed.
//
// If the registration does not have a catchUpIteratorConstructor, this method
// is a no-op.
func (r *registration) maybeRunCatchupScan() error {
if r.catchupIterConstructor == nil {
func (r *registration) maybeRunCatchUpScan() error {
catchUpIter := r.detachCatchUpIter()
if catchUpIter == nil {
return nil
}
catchupIter := r.catchupIterConstructor()
r.catchupIterConstructor = nil
start := timeutil.Now()
defer func() {
catchupIter.Close()
r.metrics.RangeFeedCatchupScanNanos.Inc(timeutil.Since(start).Nanoseconds())
catchUpIter.Close()
r.metrics.RangeFeedCatchUpScanNanos.Inc(timeutil.Since(start).Nanoseconds())
}()

var a bufalloc.ByteAllocator
@@ -330,23 +346,23 @@ func (r *registration) maybeRunCatchupScan() error {
// versions of each key that are after the registration's startTS, so we
// can't use NextKey.
var meta enginepb.MVCCMetadata
catchupIter.SeekGE(startKey)
catchUpIter.SeekGE(startKey)
for {
if ok, err := catchupIter.Valid(); err != nil {
if ok, err := catchUpIter.Valid(); err != nil {
return err
} else if !ok || !catchupIter.UnsafeKey().Less(endKey) {
} else if !ok || !catchUpIter.UnsafeKey().Less(endKey) {
break
}

unsafeKey := catchupIter.UnsafeKey()
unsafeVal := catchupIter.UnsafeValue()
unsafeKey := catchUpIter.UnsafeKey()
unsafeVal := catchUpIter.UnsafeValue()
if !unsafeKey.IsValue() {
// Found a metadata key.
if err := protoutil.Unmarshal(unsafeVal, &meta); err != nil {
return errors.Wrapf(err, "unmarshaling mvcc meta: %v", unsafeKey)
}
if !meta.IsInline() {
// This is an MVCCMetadata key for an intent. The catchup scan
// This is an MVCCMetadata key for an intent. The catch-up scan
// only cares about committed values, so ignore this and skip
// past the corresponding provisional key-value. To do this,
// scan to the timestamp immediately before (i.e. the key
@@ -355,7 +371,7 @@ func (r *registration) maybeRunCatchupScan() error {
// Make a copy since should not pass an unsafe key from the iterator
// that provided it, when asking it to seek.
a, unsafeKey.Key = a.Copy(unsafeKey.Key, 0)
catchupIter.SeekGE(storage.MVCCKey{
catchUpIter.SeekGE(storage.MVCCKey{
Key: unsafeKey.Key,
Timestamp: meta.Timestamp.ToTimestamp().Prev(),
})
@@ -382,11 +398,11 @@

// Ignore the version if it's not inline and its timestamp is at
// or before the registration's (exclusive) starting timestamp.
ignore := !(ts.IsEmpty() || r.catchupTimestamp.Less(ts))
ignore := !(ts.IsEmpty() || r.catchUpTimestamp.Less(ts))
if ignore && !r.withDiff {
// Skip all the way to the next key.
// NB: fast-path to avoid value copy when !r.withDiff.
catchupIter.NextKey()
catchUpIter.NextKey()
continue
}

@@ -399,10 +415,10 @@

if ignore {
// Skip all the way to the next key.
catchupIter.NextKey()
catchUpIter.NextKey()
} else {
// Move to the next version of this key.
catchupIter.Next()
catchUpIter.Next()

var event roachpb.RangeFeedEvent
event.MustSetValue(&roachpb.RangeFeedValue{
@@ -431,7 +447,7 @@ func (r *registration) Range() interval.Range {
}

func (r registration) String() string {
return fmt.Sprintf("[%s @ %s+]", r.span, r.catchupTimestamp)
return fmt.Sprintf("[%s @ %s+]", r.span, r.catchUpTimestamp)
}

// registry holds a set of registrations and manages their lifecycle.
@@ -496,7 +512,7 @@ func (reg *registry) PublishToOverlapping(span roachpb.Span, event *roachpb.Rang
reg.forOverlappingRegs(span, func(r *registration) (bool, *roachpb.Error) {
// Don't publish events if they are equal to or less
// than the registration's starting timestamp.
if r.catchupTimestamp.Less(minTS) {
if r.catchUpTimestamp.Less(minTS) {
r.publish(event)
}
return false, nil
@@ -587,6 +603,30 @@ func (r *registration) waitForCaughtUp() error {
return errors.Errorf("registration %v failed to empty in time", r.Range())
}

// maybeConstructCatchUpIter calls the catchUpIterConstructor and attaches
// the catchUpIter to be detached in the catchUpScan or closed on disconnect.
func (r *registration) maybeConstructCatchUpIter() {
if r.catchUpIterConstructor == nil {
return
}

catchUpIter := r.catchUpIterConstructor()
r.catchUpIterConstructor = nil

r.mu.Lock()
defer r.mu.Unlock()
r.mu.catchUpIter = catchUpIter
}

// detachCatchUpIter detaches the catchUpIter that was previously attached.
func (r *registration) detachCatchUpIter() storage.SimpleMVCCIterator {
r.mu.Lock()
defer r.mu.Unlock()
catchUpIter := r.mu.catchUpIter
r.mu.catchUpIter = nil
return catchUpIter
}

// waitForCaughtUp waits for all registrations overlapping the given span to
// completely process their internal buffers.
func (reg *registry) waitForCaughtUp(span roachpb.Span) error {
36 changes: 19 additions & 17 deletions pkg/kv/kvserver/rangefeed/registry_test.go
@@ -100,19 +100,21 @@ func newTestRegistration(
) *testRegistration {
s := newTestStream()
errC := make(chan *roachpb.Error, 1)
r := newRegistration(
span,
ts,
makeIteratorConstructor(catchup),
withDiff,
5,
NewMetrics(),
s,
errC,
)
r.maybeConstructCatchUpIter()
return &testRegistration{
registration: newRegistration(
span,
ts,
makeIteratorConstructor(catchup),
withDiff,
5,
NewMetrics(),
s,
errC,
),
stream: s,
errC: errC,
registration: r,
stream: s,
errC: errC,
}
}

@@ -263,10 +265,10 @@ func TestRegistrationCatchUpScan(t *testing.T) {
EndKey: roachpb.Key("w"),
}, hlc.Timestamp{WallTime: 4}, iter, true /* withDiff */)

require.Zero(t, r.metrics.RangeFeedCatchupScanNanos.Count())
require.NoError(t, r.maybeRunCatchupScan())
require.Zero(t, r.metrics.RangeFeedCatchUpScanNanos.Count())
require.NoError(t, r.maybeRunCatchUpScan())
require.True(t, iter.closed)
require.NotZero(t, r.metrics.RangeFeedCatchupScanNanos.Count())
require.NotZero(t, r.metrics.RangeFeedCatchUpScanNanos.Count())

// Compare the events sent on the registration's Stream to the expected events.
expEvents := []*roachpb.RangeFeedEvent{
@@ -564,14 +566,14 @@ func TestRegistrationString(t *testing.T) {
{
r: registration{
span: roachpb.Span{Key: roachpb.Key("d")},
catchupTimestamp: hlc.Timestamp{WallTime: 10, Logical: 1},
catchUpTimestamp: hlc.Timestamp{WallTime: 10, Logical: 1},
},
exp: `[d @ 0.000000010,1+]`,
},
{
r: registration{span: roachpb.Span{
Key: roachpb.Key("d"), EndKey: roachpb.Key("z")},
catchupTimestamp: hlc.Timestamp{WallTime: 40, Logical: 9},
catchUpTimestamp: hlc.Timestamp{WallTime: 40, Logical: 9},
},
exp: `[{d-z} @ 0.000000040,9+]`,
},