Skip to content

Commit

Permalink
Merge #82450
Browse files Browse the repository at this point in the history
82450: kvserver/rangefeed: always use time-bound iterators for catchup scans r=stevendanna a=erikgrinaker

This patch always uses time-bound iterators for rangefeed catchup scans.
Previously, this was controlled by the default-off cluster setting
`kv.rangefeed.catchup_scan_iterator_optimization.enabled`, which has now
been removed. These have been used in production by several users, who
have seen significant performance improvements without reports of any
issues.

Resolves #79728.
Touches #82454.

Release note (performance improvement): Changefeed catchup scans now use
time-bound iterators, which improves their performance by avoiding
accessing data that is outside the catchup scan time interval.
Previously, this was controlled by the default-off cluster setting
`kv.rangefeed.catchup_scan_iterator_optimization.enabled`, which has now
been removed (it is in effect always enabled).

Co-authored-by: Erik Grinaker <[email protected]>
  • Loading branch information
craig[bot] and erikgrinaker committed Jun 6, 2022
2 parents 2c08deb + 8d3f4cf commit f0372fd
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 115 deletions.
14 changes: 0 additions & 14 deletions pkg/cmd/roachtest/tests/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -90,19 +89,6 @@ type cdcTestArgs struct {
// cdcClusterSettings applies the cluster settings the CDC tests run with. It
// always turns on kv.rangefeed.enabled and, subject to the random draw made by
// randomlyRun, turns off
// kv.rangefeed.catchup_scan_iterator_optimization.enabled.
func cdcClusterSettings(t test.Test, db *sqlutils.SQLRunner) {
	const (
		// kv.rangefeed.enabled is required for changefeeds to run
		enableRangefeeds = "SET CLUSTER SETTING kv.rangefeed.enabled = true"
		// Non-default value that is only applied on some runs.
		disableCatchupOptimization = "SET CLUSTER SETTING kv.rangefeed.catchup_scan_iterator_optimization.enabled = false"
	)

	db.Exec(t, enableRangefeeds)
	randomlyRun(t, db, disableCatchupOptimization)
}

// randomSettingPercent is the threshold that randomlyRun compares against
// rng.Float64(). Despite the name it is expressed as a fraction (0.50), not as
// a percentage.
const randomSettingPercent = 0.50

// rng is the package-level random source consulted by randomlyRun.
// NOTE(review): the second value returned by randutil.NewTestRand is
// discarded here — presumably the seed; confirm against randutil.
var rng, _ = randutil.NewTestRand()

// randomlyRun executes query against db only when a draw from rng falls below
// randomSettingPercent. When the query is executed, it is also written to the
// test log so that the non-default setting is visible in the test output.
func randomlyRun(t test.Test, db *sqlutils.SQLRunner, query string) {
	// Draw at or above the threshold: leave the setting untouched.
	if rng.Float64() >= randomSettingPercent {
		return
	}
	db.Exec(t, query)
	t.L().Printf("setting non-default cluster setting: %s", query)
}

func cdcBasicTest(ctx context.Context, t test.Test, c cluster.Cluster, args cdcTestArgs) {
Expand Down
54 changes: 22 additions & 32 deletions pkg/kv/kvserver/rangefeed/catchup_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,42 +53,32 @@ func (i simpleCatchupIterAdapter) NextIgnoringTime() {
var _ simpleCatchupIter = simpleCatchupIterAdapter{}

// NewCatchUpIterator returns a CatchUpIterator for the given Reader.
// If useTBI is true, a time-bound iterator will be used if possible,
// configured with a start time taken from the RangeFeedRequest.
func NewCatchUpIterator(
reader storage.Reader, args *roachpb.RangeFeedRequest, useTBI bool, closer func(),
reader storage.Reader, args *roachpb.RangeFeedRequest, closer func(),
) *CatchUpIterator {
ret := &CatchUpIterator{
return &CatchUpIterator{
simpleCatchupIter: storage.NewMVCCIncrementalIterator(reader,
storage.MVCCIncrementalIterOptions{
EnableTimeBoundIteratorOptimization: true,
EndKey: args.Span.EndKey,
// StartTime is exclusive but args.Timestamp
// is inclusive.
StartTime: args.Timestamp.Prev(),
EndTime: hlc.MaxTimestamp,
// We want to emit intents rather than error
// (the default behavior) so that we can skip
// over the provisional values during
// iteration.
IntentPolicy: storage.MVCCIncrementalIterIntentPolicyEmit,
// CatchUpScan currently emits all inline
// values it encounters.
//
// TODO(ssd): Re-evaluate if this behavior is
// still needed (#69357).
InlinePolicy: storage.MVCCIncrementalIterInlinePolicyEmit,
}),
close: closer,
}
if useTBI {
ret.simpleCatchupIter = storage.NewMVCCIncrementalIterator(reader, storage.MVCCIncrementalIterOptions{
EnableTimeBoundIteratorOptimization: true,
EndKey: args.Span.EndKey,
// StartTime is exclusive but args.Timestamp
// is inclusive.
StartTime: args.Timestamp.Prev(),
EndTime: hlc.MaxTimestamp,
// We want to emit intents rather than error
// (the default behavior) so that we can skip
// over the provisional values during
// iteration.
IntentPolicy: storage.MVCCIncrementalIterIntentPolicyEmit,
// CatchUpScan currently emits all inline
// values it encounters.
//
// TODO(ssd): Re-evaluate if this behavior is
// still needed (#69357).
InlinePolicy: storage.MVCCIncrementalIterInlinePolicyEmit,
})
} else {
iter := reader.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{
UpperBound: args.Span.EndKey,
})
ret.simpleCatchupIter = simpleCatchupIterAdapter{SimpleMVCCIterator: iter}
}

return ret
}

// Close closes the iterator and calls the instantiator-supplied close
Expand Down
30 changes: 12 additions & 18 deletions pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func runCatchUpBenchmark(b *testing.B, emk engineMaker, opts benchOptions) {
},
WithDiff: opts.withDiff,
Span: span,
}, opts.useTBI, func() {})
}, func() {})
defer iter.Close()
counter := 0
err := iter.CatchUpScan(storage.MakeMVCCMetadataKey(startKey), storage.MakeMVCCMetadataKey(endKey), opts.ts, opts.withDiff, func(*roachpb.RangeFeedEvent) error {
Expand Down Expand Up @@ -136,22 +136,17 @@ func BenchmarkCatchUpScan(b *testing.B) {

for name, do := range dataOpts {
b.Run(name, func(b *testing.B) {
for _, useTBI := range []bool{true, false} {
b.Run(fmt.Sprintf("useTBI=%v", useTBI), func(b *testing.B) {
for _, withDiff := range []bool{true, false} {
b.Run(fmt.Sprintf("withDiff=%v", withDiff), func(b *testing.B) {
for _, tsExcludePercent := range []float64{0.0, 0.50, 0.75, 0.95, 0.99} {
wallTime := int64((5 * (float64(numKeys)*tsExcludePercent + 1)))
ts := hlc.Timestamp{WallTime: wallTime}
b.Run(fmt.Sprintf("perc=%2.2f", tsExcludePercent*100), func(b *testing.B) {
runCatchUpBenchmark(b, setupMVCCPebble, benchOptions{
dataOpts: do,
ts: ts,
useTBI: useTBI,
withDiff: withDiff,
})
})
}
for _, withDiff := range []bool{true, false} {
b.Run(fmt.Sprintf("withDiff=%v", withDiff), func(b *testing.B) {
for _, tsExcludePercent := range []float64{0.0, 0.50, 0.75, 0.95, 0.99} {
wallTime := int64((5 * (float64(numKeys)*tsExcludePercent + 1)))
ts := hlc.Timestamp{WallTime: wallTime}
b.Run(fmt.Sprintf("perc=%2.2f", tsExcludePercent*100), func(b *testing.B) {
runCatchUpBenchmark(b, setupMVCCPebble, benchOptions{
dataOpts: do,
ts: ts,
withDiff: withDiff,
})
})
}
})
Expand All @@ -170,7 +165,6 @@ type benchDataOptions struct {

// benchOptions bundles the per-run parameters consumed by
// runCatchUpBenchmark.
type benchOptions struct {
// ts is the timestamp handed to CatchUpScan.
ts hlc.Timestamp
// useTBI is forwarded to NewCatchUpIterator.
useTBI bool
// withDiff is set on the RangeFeedRequest (WithDiff) and is also passed to
// CatchUpScan.
withDiff bool
// dataOpts carries the benchmark data configuration.
// NOTE(review): how it is consumed is outside the visible hunk — confirm.
dataOpts benchDataOptions
}
Expand Down
78 changes: 38 additions & 40 deletions pkg/kv/kvserver/rangefeed/catchup_scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,46 +91,44 @@ func TestCatchupScan(t *testing.T) {
if err := storage.MVCCPut(ctx, eng, nil, kv1_4_4.Key.Key, txn.ReadTimestamp, hlc.ClockTimestamp{}, val, &txn); err != nil {
t.Fatal(err)
}
testutils.RunTrueAndFalse(t, "useTBI", func(t *testing.T, useTBI bool) {
testutils.RunTrueAndFalse(t, "withDiff", func(t *testing.T, withDiff bool) {
iter := NewCatchUpIterator(eng, &roachpb.RangeFeedRequest{
Header: roachpb.Header{
// Inclusive, so want everything >= ts2
Timestamp: ts2,
},
Span: roachpb.Span{
EndKey: roachpb.KeyMax,
},
WithDiff: withDiff,
}, useTBI, nil)
defer iter.Close()
var events []roachpb.RangeFeedValue
// ts1 here is exclusive, so we do not want the versions at ts1.
require.NoError(t, iter.CatchUpScan(storage.MakeMVCCMetadataKey(testKey1),
storage.MakeMVCCMetadataKey(roachpb.KeyMax), ts1, withDiff,
func(e *roachpb.RangeFeedEvent) error {
events = append(events, *e.Val)
return nil
}))
require.Equal(t, 4, len(events))
checkEquality := func(
kv storage.MVCCKeyValue, prevKV storage.MVCCKeyValue, event roachpb.RangeFeedValue) {
require.Equal(t, string(kv.Key.Key), string(event.Key))
require.Equal(t, kv.Key.Timestamp, event.Value.Timestamp)
require.Equal(t, string(kv.Value), string(event.Value.RawBytes))
if withDiff {
// TODO(sumeer): uncomment after clarifying CatchUpScan behavior.
// require.Equal(t, prevKV.Key.Timestamp, event.PrevValue.Timestamp)
require.Equal(t, string(prevKV.Value), string(event.PrevValue.RawBytes))
} else {
require.Equal(t, hlc.Timestamp{}, event.PrevValue.Timestamp)
require.Equal(t, 0, len(event.PrevValue.RawBytes))
}
testutils.RunTrueAndFalse(t, "withDiff", func(t *testing.T, withDiff bool) {
iter := NewCatchUpIterator(eng, &roachpb.RangeFeedRequest{
Header: roachpb.Header{
// Inclusive, so want everything >= ts2
Timestamp: ts2,
},
Span: roachpb.Span{
EndKey: roachpb.KeyMax,
},
WithDiff: withDiff,
}, nil)
defer iter.Close()
var events []roachpb.RangeFeedValue
// ts1 here is exclusive, so we do not want the versions at ts1.
require.NoError(t, iter.CatchUpScan(storage.MakeMVCCMetadataKey(testKey1),
storage.MakeMVCCMetadataKey(roachpb.KeyMax), ts1, withDiff,
func(e *roachpb.RangeFeedEvent) error {
events = append(events, *e.Val)
return nil
}))
require.Equal(t, 4, len(events))
checkEquality := func(
kv storage.MVCCKeyValue, prevKV storage.MVCCKeyValue, event roachpb.RangeFeedValue) {
require.Equal(t, string(kv.Key.Key), string(event.Key))
require.Equal(t, kv.Key.Timestamp, event.Value.Timestamp)
require.Equal(t, string(kv.Value), string(event.Value.RawBytes))
if withDiff {
// TODO(sumeer): uncomment after clarifying CatchUpScan behavior.
// require.Equal(t, prevKV.Key.Timestamp, event.PrevValue.Timestamp)
require.Equal(t, string(prevKV.Value), string(event.PrevValue.RawBytes))
} else {
require.Equal(t, hlc.Timestamp{}, event.PrevValue.Timestamp)
require.Equal(t, 0, len(event.PrevValue.RawBytes))
}
checkEquality(kv1_2_2, kv1_1_1, events[0])
checkEquality(kv1_3_3, kv1_2_2, events[1])
checkEquality(kv2_2_2, kv2_1_1, events[2])
checkEquality(kv2_5_3, kv2_2_2, events[3])
})
}
checkEquality(kv1_2_2, kv1_1_1, events[0])
checkEquality(kv1_3_3, kv1_2_2, events[1])
checkEquality(kv2_2_2, kv2_1_1, events[2])
checkEquality(kv2_5_3, kv2_2_2, events[3])
})
}
12 changes: 1 addition & 11 deletions pkg/kv/kvserver/replica_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -60,14 +59,6 @@ var RangeFeedRefreshInterval = settings.RegisterDurationSetting(
settings.NonNegativeDuration,
)

// RangefeedTBIEnabled controls whether or not we use a TBI (time-bound
// iterator) during catch-up scan. It is registered as the tenant-writable bool
// setting "kv.rangefeed.catchup_scan_iterator_optimization.enabled"; its value
// is read when building the catch-up iterator and passed to
// rangefeed.NewCatchUpIterator.
//
// NOTE(review): the default is produced by util.ConstantWithMetamorphicTestBool
// with a base value of true — presumably randomized in metamorphic test
// builds; confirm against the util package.
var RangefeedTBIEnabled = settings.RegisterBoolSetting(
settings.TenantWritable,
"kv.rangefeed.catchup_scan_iterator_optimization.enabled",
"if true, rangefeeds will use time-bound iterators for catchup-scans when possible",
util.ConstantWithMetamorphicTestBool("kv.rangefeed.catchup_scan_iterator_optimization.enabled", true),
)

// lockedRangefeedStream is an implementation of rangefeed.Stream which provides
// support for concurrent calls to Send. Note that the default implementation of
// grpc.Stream is not safe for concurrent calls to Send.
Expand Down Expand Up @@ -231,8 +222,7 @@ func (r *Replica) rangeFeedWithRangeID(
// Assert that we still hold the raftMu when this is called to ensure
// that the catchUpIter reads from the current snapshot.
r.raftMu.AssertHeld()
return rangefeed.NewCatchUpIterator(r.Engine(),
args, RangefeedTBIEnabled.Get(&r.store.cfg.Settings.SV), iterSemRelease)
return rangefeed.NewCatchUpIterator(r.Engine(), args, iterSemRelease)
}
}
p := r.registerWithRangefeedRaftMuLocked(
Expand Down
3 changes: 3 additions & 0 deletions pkg/settings/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ var retiredSettings = map[string]struct{}{
"kv.bulk_ingest.buffer_increment": {},
"schemachanger.backfiller.buffer_increment": {},
"kv.rangefeed.separated_intent_scan.enabled": {},

// removed as of 22.2.
"kv.rangefeed.catchup_scan_iterator_optimization.enabled": {},
}

var sqlDefaultSettings = map[string]struct{}{
Expand Down

0 comments on commit f0372fd

Please sign in to comment.