From 8d3f4cfcfdfc15bd2ed8e735230ec4575e84fba4 Mon Sep 17 00:00:00 2001
From: Erik Grinaker
Date: Sun, 5 Jun 2022 10:20:26 +0000
Subject: [PATCH] kvserver/rangefeed: always use time-bound iterators for
 catchup scans

This patch makes rangefeed catchup scans always use time-bound
iterators. Previously, this was controlled by the cluster setting
`kv.rangefeed.catchup_scan_iterator_optimization.enabled` (enabled by
default), which has now been removed. Time-bound iterators have been
used in production by several users, who have seen significant
performance improvements without any reported issues.

Release note (performance improvement): Changefeed catchup scans now
always use time-bound iterators, which improve performance by avoiding
access to data outside the catchup scan's time interval. Previously,
this was controlled by the cluster setting
`kv.rangefeed.catchup_scan_iterator_optimization.enabled` (enabled by
default), which has now been removed (the optimization is now always
enabled).
---
 pkg/cmd/roachtest/tests/cdc.go               | 14 ----
 pkg/kv/kvserver/rangefeed/catchup_scan.go    | 54 ++++++-------
 .../rangefeed/catchup_scan_bench_test.go     | 30 +++----
 .../kvserver/rangefeed/catchup_scan_test.go  | 78 +++++++++----------
 pkg/kv/kvserver/replica_rangefeed.go         | 12 +--
 pkg/settings/registry.go                     |  3 +
 6 files changed, 76 insertions(+), 115 deletions(-)

diff --git a/pkg/cmd/roachtest/tests/cdc.go b/pkg/cmd/roachtest/tests/cdc.go
index 7a08a3f4d330..d41fdfd12e9a 100644
--- a/pkg/cmd/roachtest/tests/cdc.go
+++ b/pkg/cmd/roachtest/tests/cdc.go
@@ -46,7 +46,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
-	"github.com/cockroachdb/cockroach/pkg/util/randutil"
 	"github.com/cockroachdb/cockroach/pkg/util/retry"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/errors"
@@ -90,19 +89,6 @@ type cdcTestArgs struct {
 
 func cdcClusterSettings(t test.Test, db *sqlutils.SQLRunner) {
 	// kv.rangefeed.enabled is required for changefeeds to run
 	db.Exec(t, "SET CLUSTER SETTING kv.rangefeed.enabled = true")
-	randomlyRun(t, db, "SET CLUSTER SETTING kv.rangefeed.catchup_scan_iterator_optimization.enabled = false")
-}
-
-const randomSettingPercent = 0.50
-
-var rng, _ = randutil.NewTestRand()
-
-func randomlyRun(t test.Test, db *sqlutils.SQLRunner, query string) {
-	if rng.Float64() < randomSettingPercent {
-		db.Exec(t, query)
-		t.L().Printf("setting non-default cluster setting: %s", query)
-	}
-
 }
 
 func cdcBasicTest(ctx context.Context, t test.Test, c cluster.Cluster, args cdcTestArgs) {
diff --git a/pkg/kv/kvserver/rangefeed/catchup_scan.go b/pkg/kv/kvserver/rangefeed/catchup_scan.go
index 357d4c63cfb7..226881a9a342 100644
--- a/pkg/kv/kvserver/rangefeed/catchup_scan.go
+++ b/pkg/kv/kvserver/rangefeed/catchup_scan.go
@@ -53,42 +53,32 @@ func (i simpleCatchupIterAdapter) NextIgnoringTime() {
 
 var _ simpleCatchupIter = simpleCatchupIterAdapter{}
 
 // NewCatchUpIterator returns a CatchUpIterator for the given Reader.
-// If useTBI is true, a time-bound iterator will be used if possible,
-// configured with a start time taken from the RangeFeedRequest.
 func NewCatchUpIterator(
-	reader storage.Reader, args *roachpb.RangeFeedRequest, useTBI bool, closer func(),
+	reader storage.Reader, args *roachpb.RangeFeedRequest, closer func(),
 ) *CatchUpIterator {
-	ret := &CatchUpIterator{
+	return &CatchUpIterator{
+		simpleCatchupIter: storage.NewMVCCIncrementalIterator(reader,
+			storage.MVCCIncrementalIterOptions{
+				EnableTimeBoundIteratorOptimization: true,
+				EndKey:                              args.Span.EndKey,
+				// StartTime is exclusive but args.Timestamp
+				// is inclusive.
+				StartTime: args.Timestamp.Prev(),
+				EndTime:   hlc.MaxTimestamp,
+				// We want to emit intents rather than error
+				// (the default behavior) so that we can skip
+				// over the provisional values during
+				// iteration.
+				IntentPolicy: storage.MVCCIncrementalIterIntentPolicyEmit,
+				// CatchUpScan currently emits all inline
+				// values it encounters.
+				//
+				// TODO(ssd): Re-evalutate if this behavior is
+				// still needed (#69357).
+				InlinePolicy: storage.MVCCIncrementalIterInlinePolicyEmit,
+			}),
 		close: closer,
 	}
-	if useTBI {
-		ret.simpleCatchupIter = storage.NewMVCCIncrementalIterator(reader, storage.MVCCIncrementalIterOptions{
-			EnableTimeBoundIteratorOptimization: true,
-			EndKey:                              args.Span.EndKey,
-			// StartTime is exclusive but args.Timestamp
-			// is inclusive.
-			StartTime: args.Timestamp.Prev(),
-			EndTime:   hlc.MaxTimestamp,
-			// We want to emit intents rather than error
-			// (the default behavior) so that we can skip
-			// over the provisional values during
-			// iteration.
-			IntentPolicy: storage.MVCCIncrementalIterIntentPolicyEmit,
-			// CatchUpScan currently emits all inline
-			// values it encounters.
-			//
-			// TODO(ssd): Re-evalutate if this behavior is
-			// still needed (#69357).
-			InlinePolicy: storage.MVCCIncrementalIterInlinePolicyEmit,
-		})
-	} else {
-		iter := reader.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{
-			UpperBound: args.Span.EndKey,
-		})
-		ret.simpleCatchupIter = simpleCatchupIterAdapter{SimpleMVCCIterator: iter}
-	}
-
-	return ret
 }
 
 // Close closes the iterator and calls the instantiator-supplied close
diff --git a/pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go b/pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go
index 428a6ef0dd16..a3bb5aa11097 100644
--- a/pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go
+++ b/pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go
@@ -52,7 +52,7 @@ func runCatchUpBenchmark(b *testing.B, emk engineMaker, opts benchOptions) {
 		},
 		WithDiff: opts.withDiff,
 		Span:     span,
-	}, opts.useTBI, func() {})
+	}, func() {})
 	defer iter.Close()
 	counter := 0
 	err := iter.CatchUpScan(storage.MakeMVCCMetadataKey(startKey), storage.MakeMVCCMetadataKey(endKey), opts.ts, opts.withDiff, func(*roachpb.RangeFeedEvent) error {
@@ -136,22 +136,17 @@ func BenchmarkCatchUpScan(b *testing.B) {
 
 	for name, do := range dataOpts {
 		b.Run(name, func(b *testing.B) {
-			for _, useTBI := range []bool{true, false} {
-				b.Run(fmt.Sprintf("useTBI=%v", useTBI), func(b *testing.B) {
-					for _, withDiff := range []bool{true, false} {
-						b.Run(fmt.Sprintf("withDiff=%v", withDiff), func(b *testing.B) {
-							for _, tsExcludePercent := range []float64{0.0, 0.50, 0.75, 0.95, 0.99} {
-								wallTime := int64((5 * (float64(numKeys)*tsExcludePercent + 1)))
-								ts := hlc.Timestamp{WallTime: wallTime}
-								b.Run(fmt.Sprintf("perc=%2.2f", tsExcludePercent*100), func(b *testing.B) {
-									runCatchUpBenchmark(b, setupMVCCPebble, benchOptions{
-										dataOpts: do,
-										ts:       ts,
-										useTBI:   useTBI,
-										withDiff: withDiff,
-									})
-								})
-							}
+			for _, withDiff := range []bool{true, false} {
+				b.Run(fmt.Sprintf("withDiff=%v", withDiff), func(b *testing.B) {
+					for _, tsExcludePercent := range []float64{0.0, 0.50, 0.75, 0.95, 0.99} {
+						wallTime := int64((5 * (float64(numKeys)*tsExcludePercent + 1)))
+						ts := hlc.Timestamp{WallTime: wallTime}
+						b.Run(fmt.Sprintf("perc=%2.2f", tsExcludePercent*100), func(b *testing.B) {
+							runCatchUpBenchmark(b, setupMVCCPebble, benchOptions{
+								dataOpts: do,
+								ts:       ts,
+								withDiff: withDiff,
+							})
 						})
 					}
 				})
@@ -170,7 +165,6 @@ type benchDataOptions struct {
 
 type benchOptions struct {
 	ts       hlc.Timestamp
-	useTBI   bool
 	withDiff bool
 	dataOpts benchDataOptions
 }
diff --git a/pkg/kv/kvserver/rangefeed/catchup_scan_test.go b/pkg/kv/kvserver/rangefeed/catchup_scan_test.go
index 7991581958a1..b05684b44374 100644
--- a/pkg/kv/kvserver/rangefeed/catchup_scan_test.go
+++ b/pkg/kv/kvserver/rangefeed/catchup_scan_test.go
@@ -91,46 +91,44 @@ func TestCatchupScan(t *testing.T) {
 	if err := storage.MVCCPut(ctx, eng, nil, kv1_4_4.Key.Key, txn.ReadTimestamp, hlc.ClockTimestamp{}, val, &txn); err != nil {
 		t.Fatal(err)
 	}
-	testutils.RunTrueAndFalse(t, "useTBI", func(t *testing.T, useTBI bool) {
-		testutils.RunTrueAndFalse(t, "withDiff", func(t *testing.T, withDiff bool) {
-			iter := NewCatchUpIterator(eng, &roachpb.RangeFeedRequest{
-				Header: roachpb.Header{
-					// Inclusive, so want everything >= ts2
-					Timestamp: ts2,
-				},
-				Span: roachpb.Span{
-					EndKey: roachpb.KeyMax,
-				},
-				WithDiff: withDiff,
-			}, useTBI, nil)
-			defer iter.Close()
-			var events []roachpb.RangeFeedValue
-			// ts1 here is exclusive, so we do not want the versions at ts1.
-			require.NoError(t, iter.CatchUpScan(storage.MakeMVCCMetadataKey(testKey1),
-				storage.MakeMVCCMetadataKey(roachpb.KeyMax), ts1, withDiff,
-				func(e *roachpb.RangeFeedEvent) error {
-					events = append(events, *e.Val)
-					return nil
-				}))
-			require.Equal(t, 4, len(events))
-			checkEquality := func(
-				kv storage.MVCCKeyValue, prevKV storage.MVCCKeyValue, event roachpb.RangeFeedValue) {
-				require.Equal(t, string(kv.Key.Key), string(event.Key))
-				require.Equal(t, kv.Key.Timestamp, event.Value.Timestamp)
-				require.Equal(t, string(kv.Value), string(event.Value.RawBytes))
-				if withDiff {
-					// TODO(sumeer): uncomment after clarifying CatchUpScan behavior.
-					// require.Equal(t, prevKV.Key.Timestamp, event.PrevValue.Timestamp)
-					require.Equal(t, string(prevKV.Value), string(event.PrevValue.RawBytes))
-				} else {
-					require.Equal(t, hlc.Timestamp{}, event.PrevValue.Timestamp)
-					require.Equal(t, 0, len(event.PrevValue.RawBytes))
-				}
+	testutils.RunTrueAndFalse(t, "withDiff", func(t *testing.T, withDiff bool) {
+		iter := NewCatchUpIterator(eng, &roachpb.RangeFeedRequest{
+			Header: roachpb.Header{
+				// Inclusive, so want everything >= ts2
+				Timestamp: ts2,
+			},
+			Span: roachpb.Span{
+				EndKey: roachpb.KeyMax,
+			},
+			WithDiff: withDiff,
+		}, nil)
+		defer iter.Close()
+		var events []roachpb.RangeFeedValue
+		// ts1 here is exclusive, so we do not want the versions at ts1.
+		require.NoError(t, iter.CatchUpScan(storage.MakeMVCCMetadataKey(testKey1),
+			storage.MakeMVCCMetadataKey(roachpb.KeyMax), ts1, withDiff,
+			func(e *roachpb.RangeFeedEvent) error {
+				events = append(events, *e.Val)
+				return nil
+			}))
+		require.Equal(t, 4, len(events))
+		checkEquality := func(
+			kv storage.MVCCKeyValue, prevKV storage.MVCCKeyValue, event roachpb.RangeFeedValue) {
+			require.Equal(t, string(kv.Key.Key), string(event.Key))
+			require.Equal(t, kv.Key.Timestamp, event.Value.Timestamp)
+			require.Equal(t, string(kv.Value), string(event.Value.RawBytes))
+			if withDiff {
+				// TODO(sumeer): uncomment after clarifying CatchUpScan behavior.
+				// require.Equal(t, prevKV.Key.Timestamp, event.PrevValue.Timestamp)
+				require.Equal(t, string(prevKV.Value), string(event.PrevValue.RawBytes))
+			} else {
+				require.Equal(t, hlc.Timestamp{}, event.PrevValue.Timestamp)
+				require.Equal(t, 0, len(event.PrevValue.RawBytes))
 			}
-			checkEquality(kv1_2_2, kv1_1_1, events[0])
-			checkEquality(kv1_3_3, kv1_2_2, events[1])
-			checkEquality(kv2_2_2, kv2_1_1, events[2])
-			checkEquality(kv2_5_3, kv2_2_2, events[3])
-		})
+		}
+		checkEquality(kv1_2_2, kv1_1_1, events[0])
+		checkEquality(kv1_3_3, kv1_2_2, events[1])
+		checkEquality(kv2_2_2, kv2_1_1, events[2])
+		checkEquality(kv2_5_3, kv2_2_2, events[3])
 	})
 }
diff --git a/pkg/kv/kvserver/replica_rangefeed.go b/pkg/kv/kvserver/replica_rangefeed.go
index 495185544a4a..c98fec3ca9e3 100644
--- a/pkg/kv/kvserver/replica_rangefeed.go
+++ b/pkg/kv/kvserver/replica_rangefeed.go
@@ -27,7 +27,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/storage"
 	"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
-	"github.com/cockroachdb/cockroach/pkg/util"
 	"github.com/cockroachdb/cockroach/pkg/util/contextutil"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -60,14 +59,6 @@ var RangeFeedRefreshInterval = settings.RegisterDurationSetting(
 	settings.NonNegativeDuration,
 )
 
-// RangefeedTBIEnabled controls whether or not we use a TBI during catch-up scan.
-var RangefeedTBIEnabled = settings.RegisterBoolSetting(
-	settings.TenantWritable,
-	"kv.rangefeed.catchup_scan_iterator_optimization.enabled",
-	"if true, rangefeeds will use time-bound iterators for catchup-scans when possible",
-	util.ConstantWithMetamorphicTestBool("kv.rangefeed.catchup_scan_iterator_optimization.enabled", true),
-)
-
 // lockedRangefeedStream is an implementation of rangefeed.Stream which provides
 // support for concurrent calls to Send. Note that the default implementation of
 // grpc.Stream is not safe for concurrent calls to Send.
@@ -231,8 +222,7 @@ func (r *Replica) rangeFeedWithRangeID(
 		// Assert that we still hold the raftMu when this is called to ensure
 		// that the catchUpIter reads from the current snapshot.
 		r.raftMu.AssertHeld()
-		return rangefeed.NewCatchUpIterator(r.Engine(),
-			args, RangefeedTBIEnabled.Get(&r.store.cfg.Settings.SV), iterSemRelease)
+		return rangefeed.NewCatchUpIterator(r.Engine(), args, iterSemRelease)
 		}
 	}
 	p := r.registerWithRangefeedRaftMuLocked(
diff --git a/pkg/settings/registry.go b/pkg/settings/registry.go
index 0c74853a99a8..fdfae86d90a3 100644
--- a/pkg/settings/registry.go
+++ b/pkg/settings/registry.go
@@ -126,6 +126,9 @@ var retiredSettings = map[string]struct{}{
 	"kv.bulk_ingest.buffer_increment":            {},
 	"schemachanger.backfiller.buffer_increment":  {},
 	"kv.rangefeed.separated_intent_scan.enabled": {},
+
+	// removed as of 22.2.
+	"kv.rangefeed.catchup_scan_iterator_optimization.enabled": {},
 }
 
 var sqlDefaultSettings = map[string]struct{}{
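
Note (not part of the patch): below is a minimal sketch of the call pattern after this change, mirroring the call sites touched above. NewCatchUpIterator now takes only a Reader, the RangeFeedRequest, and a closer (e.g. r.Engine(), args, and iterSemRelease in replica_rangefeed.go), and the iterator it returns is always time-bound. The runCatchUpScan helper name, its parameters, and the no-op event handler are illustrative assumptions, not code from the repository.

package example

import (
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/storage"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// runCatchUpScan is a hypothetical helper showing the post-patch usage:
// build the iterator from a Reader, a RangeFeedRequest, and a closer, then
// run the catch-up scan over the request's span. There is no useTBI flag;
// the time-bound optimization is always on.
func runCatchUpScan(
	reader storage.Reader, span roachpb.Span, ts hlc.Timestamp, withDiff bool, closer func(),
) error {
	iter := rangefeed.NewCatchUpIterator(reader, &roachpb.RangeFeedRequest{
		Header:   roachpb.Header{Timestamp: ts}, // inclusive start time of the catch-up scan
		Span:     span,
		WithDiff: withDiff,
	}, closer)
	defer iter.Close()

	return iter.CatchUpScan(
		storage.MakeMVCCMetadataKey(span.Key),
		storage.MakeMVCCMetadataKey(span.EndKey),
		ts, withDiff,
		func(e *roachpb.RangeFeedEvent) error {
			// Handle each catch-up event; the benchmark above simply counts them.
			return nil
		},
	)
}

Because the request's start timestamp is inclusive while MVCCIncrementalIterOptions.StartTime is exclusive, NewCatchUpIterator uses args.Timestamp.Prev() internally, so callers pass the same inclusive timestamp in both places, as the benchmark and test above do.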