Skip to content

Commit

Permalink
Merge #35470
Browse files Browse the repository at this point in the history
35470: rangefeed: stop using time-bound iterator for catchup scan r=tbg a=danhhz

RangeFeed originally intended to use the time-bound iterator
performance optimization. However, they've had correctness issues in
the past (#28358, #34819) and no-one has the time for the due-diligence
necessary to be confidant in their correctness going forward. Not using
them causes the total time spent in RangeFeed catchup on changefeed
over tpcc-1000 to go from 40s -> 4853s, which is quite large but still
workable.

Closes #35122

Release note (enterprise change): In exchange for increased correctness
confidance, `CHANGEFEED`s using `changefeed.push.enabled` (the default)
now take slightly more resources on startup and range
rebalancing/splits.

Co-authored-by: Daniel Harrison <[email protected]>
  • Loading branch information
craig[bot] and danhhz committed Mar 12, 2019
2 parents 51179af + 1041ca6 commit 57c5b2b
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 3 deletions.
7 changes: 7 additions & 0 deletions pkg/storage/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/rangefeed"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
Expand Down Expand Up @@ -1139,6 +1140,9 @@ type StoreMetrics struct {
// EncryptionAlgorithm is an enum representing the cipher in use, so we use a gauge.
EncryptionAlgorithm *metric.Gauge

// RangeFeed counts.
RangeFeedMetrics *rangefeed.Metrics

// Stats for efficient merges.
mu struct {
syncutil.Mutex
Expand Down Expand Up @@ -1335,6 +1339,9 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {

// Encryption-at-rest.
EncryptionAlgorithm: metric.NewGauge(metaEncryptionAlgorithm),

// RangeFeed counters.
RangeFeedMetrics: rangefeed.NewMetrics(),
}

sm.raftRcvdMessages[raftpb.MsgProp] = sm.RaftRcvdMsgProp
Expand Down
41 changes: 41 additions & 0 deletions pkg/storage/rangefeed/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright 2019 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package rangefeed

import "github.com/cockroachdb/cockroach/pkg/util/metric"

var (
metaRangeFeedCatchupScanNanos = metric.Metadata{
Name: "kv.rangefeed.catchup_scan_nanos",
Help: "Time spent in RangeFeed catchup scan",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}
)

// Metrics are for production monitoring of RangeFeeds.
type Metrics struct {
RangeFeedCatchupScanNanos *metric.Counter
}

// MetricStruct implements the metric.Struct interface.
func (*Metrics) MetricStruct() {}

// NewMetrics makes the metrics for RangeFeeds monitoring.
func NewMetrics() *Metrics {
return &Metrics{
RangeFeedCatchupScanNanos: metric.NewCounter(metaRangeFeedCatchupScanNanos),
}
}
6 changes: 5 additions & 1 deletion pkg/storage/rangefeed/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ type Config struct {
// CheckStreamsInterval specifies interval at which a Processor will check
// all streams to make sure they have not been canceled.
CheckStreamsInterval time.Duration

// Metrics is for production monitoring of RangeFeeds.
Metrics *Metrics
}

// SetDefaults initializes unset fields in Config to values
Expand Down Expand Up @@ -354,7 +357,8 @@ func (p *Processor) Register(
p.syncEventC()

r := newRegistration(
span.AsRawSpanWithNoLocals(), startTS, catchupIter, p.Config.EventChanCap, stream, errC,
span.AsRawSpanWithNoLocals(), startTS, catchupIter, p.Config.EventChanCap,
p.Metrics, stream, errC,
)
select {
case p.regC <- r:
Expand Down
6 changes: 6 additions & 0 deletions pkg/storage/rangefeed/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -59,6 +60,7 @@ type registration struct {
span roachpb.Span
catchupIter engine.SimpleIterator
catchupTimestamp hlc.Timestamp
metrics *Metrics

// Output.
stream Stream
Expand Down Expand Up @@ -89,12 +91,14 @@ func newRegistration(
startTS hlc.Timestamp,
catchupIter engine.SimpleIterator,
bufferSz int,
metrics *Metrics,
stream Stream,
errC chan<- *roachpb.Error,
) registration {
r := registration{
span: span,
catchupIter: catchupIter,
metrics: metrics,
stream: stream,
errC: errC,
buf: make(chan *roachpb.RangeFeedEvent, bufferSz),
Expand Down Expand Up @@ -206,9 +210,11 @@ func (r *registration) runCatchupScan() error {
if r.catchupIter == nil {
return nil
}
start := timeutil.Now()
defer func() {
r.catchupIter.Close()
r.catchupIter = nil
r.metrics.RangeFeedCatchupScanNanos.Inc(timeutil.Since(start).Nanoseconds())
}()

var a bufalloc.ByteAllocator
Expand Down
3 changes: 3 additions & 0 deletions pkg/storage/rangefeed/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func newTestRegistration(
ts,
catchup,
5,
NewMetrics(),
s,
errC,
),
Expand Down Expand Up @@ -244,8 +245,10 @@ func TestRegistrationCatchUpScan(t *testing.T) {
EndKey: roachpb.Key("w"),
}, hlc.Timestamp{WallTime: 4}, iter)

require.Zero(t, r.metrics.RangeFeedCatchupScanNanos.Count())
require.NoError(t, r.runCatchupScan())
require.True(t, iter.closed)
require.NotZero(t, r.metrics.RangeFeedCatchupScanNanos.Count())

// Compare the events sent on the registration's Stream to the expected events.
expEvents := []*roachpb.RangeFeedEvent{
Expand Down
12 changes: 10 additions & 2 deletions pkg/storage/replica_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,15 @@ func (r *Replica) RangeFeed(
var catchUpIter engine.SimpleIterator
if usingCatchupIter {
innerIter := r.Engine().NewIterator(engine.IterOptions{
UpperBound: args.Span.EndKey,
MinTimestampHint: args.Timestamp,
UpperBound: args.Span.EndKey,
// RangeFeed originally intended to use the time-bound iterator
// performance optimization. However, they've had correctness issues in
// the past (#28358, #34819) and no-one has the time for the due-diligence
// necessary to be confidant in their correctness going forward. Not using
// them causes the total time spent in RangeFeed catchup on changefeed
// over tpcc-1000 to go from 40s -> 4853s, which is quite large but still
// workable. See #35122 for details.
// MinTimestampHint: args.Timestamp,
})
catchUpIter = iteratorWithCloser{
SimpleIterator: innerIter,
Expand Down Expand Up @@ -254,6 +261,7 @@ func (r *Replica) maybeInitRangefeedRaftMuLocked() *rangefeed.Processor {
TxnPusher: &tp,
EventChanCap: 256,
EventChanTimeout: 50 * time.Millisecond,
Metrics: r.store.metrics.RangeFeedMetrics,
}
r.raftMu.rangefeed = rangefeed.NewProcessor(cfg)
r.store.addReplicaWithRangefeed(r.RangeID)
Expand Down

0 comments on commit 57c5b2b

Please sign in to comment.