From d328d13e6d945d9fe967da4035509933dfd63ee2 Mon Sep 17 00:00:00 2001 From: Daniel Harrison Date: Wed, 6 Mar 2019 10:34:29 -0800 Subject: [PATCH 1/2] rangefeed: add `kv.rangefeed.catchup_scan_nanos` metric The RangeFeed catchup scan will be much more expensive without the time-bound iterator. If a user is having changefeed performance issues, this metric will quickly identify if that's because of RangeFeed catchup scans. The reason for the catchup scans (splits, rebalances) should be pretty obvious from other graphs at that point. Release note: None --- pkg/storage/metrics.go | 7 +++++ pkg/storage/rangefeed/metrics.go | 41 ++++++++++++++++++++++++++ pkg/storage/rangefeed/processor.go | 6 +++- pkg/storage/rangefeed/registry.go | 6 ++++ pkg/storage/rangefeed/registry_test.go | 3 ++ pkg/storage/replica_rangefeed.go | 1 + 6 files changed, 63 insertions(+), 1 deletion(-) create mode 100644 pkg/storage/rangefeed/metrics.go diff --git a/pkg/storage/metrics.go b/pkg/storage/metrics.go index fb28bc6dddf9..c2397e2f39d9 100644 --- a/pkg/storage/metrics.go +++ b/pkg/storage/metrics.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/batcheval/result" "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" + "github.com/cockroachdb/cockroach/pkg/storage/rangefeed" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/syncutil" @@ -1139,6 +1140,9 @@ type StoreMetrics struct { // EncryptionAlgorithm is an enum representing the cipher in use, so we use a gauge. EncryptionAlgorithm *metric.Gauge + // RangeFeed counts. + RangeFeedMetrics *rangefeed.Metrics + // Stats for efficient merges. mu struct { syncutil.Mutex @@ -1335,6 +1339,9 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics { // Encryption-at-rest. EncryptionAlgorithm: metric.NewGauge(metaEncryptionAlgorithm), + + // RangeFeed counters. 
+ RangeFeedMetrics: rangefeed.NewMetrics(), } sm.raftRcvdMessages[raftpb.MsgProp] = sm.RaftRcvdMsgProp diff --git a/pkg/storage/rangefeed/metrics.go b/pkg/storage/rangefeed/metrics.go new file mode 100644 index 000000000000..4a766d785680 --- /dev/null +++ b/pkg/storage/rangefeed/metrics.go @@ -0,0 +1,41 @@ +// Copyright 2019 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package rangefeed + +import "github.com/cockroachdb/cockroach/pkg/util/metric" + +var ( + metaRangeFeedCatchupScanNanos = metric.Metadata{ + Name: "kv.rangefeed.catchup_scan_nanos", + Help: "Time spent in RangeFeed catchup scan", + Measurement: "Nanoseconds", + Unit: metric.Unit_NANOSECONDS, + } +) + +// Metrics are for production monitoring of RangeFeeds. +type Metrics struct { + RangeFeedCatchupScanNanos *metric.Counter +} + +// MetricStruct implements the metric.Struct interface. +func (*Metrics) MetricStruct() {} + +// NewMetrics makes the metrics for RangeFeeds monitoring. 
+func NewMetrics() *Metrics { + return &Metrics{ + RangeFeedCatchupScanNanos: metric.NewCounter(metaRangeFeedCatchupScanNanos), + } +} diff --git a/pkg/storage/rangefeed/processor.go b/pkg/storage/rangefeed/processor.go index bbb4b6063e58..5cc8a7489743 100644 --- a/pkg/storage/rangefeed/processor.go +++ b/pkg/storage/rangefeed/processor.go @@ -75,6 +75,9 @@ type Config struct { // CheckStreamsInterval specifies interval at which a Processor will check // all streams to make sure they have not been canceled. CheckStreamsInterval time.Duration + + // Metrics is for production monitoring of RangeFeeds. + Metrics *Metrics } // SetDefaults initializes unset fields in Config to values @@ -354,7 +357,8 @@ func (p *Processor) Register( p.syncEventC() r := newRegistration( - span.AsRawSpanWithNoLocals(), startTS, catchupIter, p.Config.EventChanCap, stream, errC, + span.AsRawSpanWithNoLocals(), startTS, catchupIter, p.Config.EventChanCap, + p.Metrics, stream, errC, ) select { case p.regC <- r: diff --git a/pkg/storage/rangefeed/registry.go b/pkg/storage/rangefeed/registry.go index 966a7afbefd3..4208d8b668f6 100644 --- a/pkg/storage/rangefeed/registry.go +++ b/pkg/storage/rangefeed/registry.go @@ -31,6 +31,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/pkg/errors" ) @@ -59,6 +60,7 @@ type registration struct { span roachpb.Span catchupIter engine.SimpleIterator catchupTimestamp hlc.Timestamp + metrics *Metrics // Output. 
stream Stream @@ -89,12 +91,14 @@ func newRegistration( startTS hlc.Timestamp, catchupIter engine.SimpleIterator, bufferSz int, + metrics *Metrics, stream Stream, errC chan<- *roachpb.Error, ) registration { r := registration{ span: span, catchupIter: catchupIter, + metrics: metrics, stream: stream, errC: errC, buf: make(chan *roachpb.RangeFeedEvent, bufferSz), @@ -206,9 +210,11 @@ func (r *registration) runCatchupScan() error { if r.catchupIter == nil { return nil } + start := timeutil.Now() defer func() { r.catchupIter.Close() r.catchupIter = nil + r.metrics.RangeFeedCatchupScanNanos.Inc(timeutil.Since(start).Nanoseconds()) }() var a bufalloc.ByteAllocator diff --git a/pkg/storage/rangefeed/registry_test.go b/pkg/storage/rangefeed/registry_test.go index 073e4c241b03..09f47ac747b3 100644 --- a/pkg/storage/rangefeed/registry_test.go +++ b/pkg/storage/rangefeed/registry_test.go @@ -110,6 +110,7 @@ func newTestRegistration( ts, catchup, 5, + NewMetrics(), s, errC, ), @@ -244,8 +245,10 @@ func TestRegistrationCatchUpScan(t *testing.T) { EndKey: roachpb.Key("w"), }, hlc.Timestamp{WallTime: 4}, iter) + require.Zero(t, r.metrics.RangeFeedCatchupScanNanos.Count()) require.NoError(t, r.runCatchupScan()) require.True(t, iter.closed) + require.NotZero(t, r.metrics.RangeFeedCatchupScanNanos.Count()) // Compare the events sent on the registration's Stream to the expected events. 
expEvents := []*roachpb.RangeFeedEvent{ diff --git a/pkg/storage/replica_rangefeed.go b/pkg/storage/replica_rangefeed.go index db2c1dbdcccc..c41cc154b60a 100644 --- a/pkg/storage/replica_rangefeed.go +++ b/pkg/storage/replica_rangefeed.go @@ -254,6 +254,7 @@ func (r *Replica) maybeInitRangefeedRaftMuLocked() *rangefeed.Processor { TxnPusher: &tp, EventChanCap: 256, EventChanTimeout: 50 * time.Millisecond, + Metrics: r.store.metrics.RangeFeedMetrics, } r.raftMu.rangefeed = rangefeed.NewProcessor(cfg) r.store.addReplicaWithRangefeed(r.RangeID) From 1041ca63182813c6057105eae41985730dd23b45 Mon Sep 17 00:00:00 2001 From: Daniel Harrison Date: Wed, 6 Mar 2019 10:54:41 -0800 Subject: [PATCH 2/2] rangefeed: stop using time-bound iterator for catchup scan RangeFeed originally intended to use the time-bound iterator performance optimization. However, they've had correctness issues in the past (#28358, #34819) and no-one has the time for the due-diligence necessary to be confident in their correctness going forward. Not using them causes the total time spent in RangeFeed catchup on changefeed over tpcc-1000 to go from 40s -> 4853s, which is quite large but still workable. Closes #35122 Release note (enterprise change): In exchange for increased correctness confidence, `CHANGEFEED`s using `changefeed.push.enabled` (the default) now take slightly more resources on startup and range rebalancing/splits. 
--- pkg/storage/replica_rangefeed.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/pkg/storage/replica_rangefeed.go b/pkg/storage/replica_rangefeed.go index c41cc154b60a..f961920b958b 100644 --- a/pkg/storage/replica_rangefeed.go +++ b/pkg/storage/replica_rangefeed.go @@ -212,8 +212,15 @@ func (r *Replica) RangeFeed( var catchUpIter engine.SimpleIterator if usingCatchupIter { innerIter := r.Engine().NewIterator(engine.IterOptions{ - UpperBound: args.Span.EndKey, - MinTimestampHint: args.Timestamp, + UpperBound: args.Span.EndKey, + // RangeFeed originally intended to use the time-bound iterator + // performance optimization. However, they've had correctness issues in + // the past (#28358, #34819) and no-one has the time for the due-diligence + // necessary to be confident in their correctness going forward. Not using + // them causes the total time spent in RangeFeed catchup on changefeed + // over tpcc-1000 to go from 40s -> 4853s, which is quite large but still + // workable. See #35122 for details. + // MinTimestampHint: args.Timestamp, }) catchUpIter = iteratorWithCloser{ SimpleIterator: innerIter,