Skip to content

Commit

Permalink
Merge #90565
Browse files Browse the repository at this point in the history
90565: rangefeed,storage: use time-bound iterator for with-diff catchup scan r=jayshrivastava a=jayshrivastava

Backport of #80591

/cc https://github.com/orgs/cockroachdb/teams/release

Release note: None
Epic: None

Co-authored-by: sumeerbhola <[email protected]>
  • Loading branch information
craig[bot] and sumeerbhola committed Oct 28, 2022
2 parents 5523ed8 + 2cc03c1 commit 9fd098d
Show file tree
Hide file tree
Showing 7 changed files with 293 additions and 74 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/rangefeed/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ go_test(
size = "small",
srcs = [
"catchup_scan_bench_test.go",
"catchup_scan_test.go",
"processor_test.go",
"registry_test.go",
"resolved_timestamp_test.go",
Expand Down
86 changes: 60 additions & 26 deletions pkg/kv/kvserver/rangefeed/catchup_scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,34 @@ import (

// A CatchUpIterator is an iterator for catchUp-scans.
type CatchUpIterator struct {
storage.SimpleMVCCIterator
simpleCatchupIter
close func()
}

// simpleCatchupIter is an extension of SimpleMVCCIterator that allows for the
// primary iterator to be implemented using a regular MVCCIterator or a
// (often) more efficient MVCCIncrementalIterator. When the caller wants to
// iterate to see older versions of a key, the desire of the caller needs to
// be expressed using one of two methods:
// - Next: when it wants to omit any versions that are not within the time
// bounds.
// - NextIgnoringTime: when it wants to see the next older version even if it
// is not within the time bounds.
type simpleCatchupIter interface {
storage.SimpleMVCCIterator
NextIgnoringTime()
}

type simpleCatchupIterAdapter struct {
storage.SimpleMVCCIterator
}

func (i simpleCatchupIterAdapter) NextIgnoringTime() {
i.SimpleMVCCIterator.Next()
}

var _ simpleCatchupIter = simpleCatchupIterAdapter{}

// NewCatchUpIterator returns a CatchUpIterator for the given Reader.
// If useTBI is true, a time-bound iterator will be used if possible,
// configured with a start time taken from the RangeFeedRequest.
Expand All @@ -37,13 +61,8 @@ func NewCatchUpIterator(
ret := &CatchUpIterator{
close: closer,
}
// TODO(ssd): The withDiff option requires us to iterate over
// values arbitrarily in the past so that we can populate the
// previous value of a key. This is possible since the
// IncrementalIterator has a non-timebound iterator
// internally, but it is not yet implemented.
if useTBI && !args.WithDiff {
ret.SimpleMVCCIterator = storage.NewMVCCIncrementalIterator(reader, storage.MVCCIncrementalIterOptions{
if useTBI {
ret.simpleCatchupIter = storage.NewMVCCIncrementalIterator(reader, storage.MVCCIncrementalIterOptions{
EnableTimeBoundIteratorOptimization: true,
EndKey: args.Span.EndKey,
// StartTime is exclusive but args.Timestamp
Expand All @@ -63,9 +82,10 @@ func NewCatchUpIterator(
InlinePolicy: storage.MVCCIncrementalIterInlinePolicyEmit,
})
} else {
ret.SimpleMVCCIterator = reader.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{
iter := reader.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{
UpperBound: args.Span.EndKey,
})
ret.simpleCatchupIter = simpleCatchupIterAdapter{SimpleMVCCIterator: iter}
}

return ret
Expand All @@ -74,7 +94,7 @@ func NewCatchUpIterator(
// Close closes the iterator and calls the instantiator-supplied close
// callback.
func (i *CatchUpIterator) Close() {
i.SimpleMVCCIterator.Close()
i.simpleCatchupIter.Close()
if i.close != nil {
i.close()
}
Expand All @@ -86,8 +106,8 @@ func (i *CatchUpIterator) Close() {
type outputEventFn func(e *roachpb.RangeFeedEvent) error

// CatchUpScan iterates over all changes for the given span of keys,
// starting at catchUpTimestamp. Keys and Values are emitted as
// RangeFeedEvents passed to the given outputFn.
// starting at catchUpTimestamp. Keys and Values are emitted as
// RangeFeedEvents passed to the given outputFn. catchUpTimestamp is exclusive.
func (i *CatchUpIterator) CatchUpScan(
startKey, endKey storage.MVCCKey,
catchUpTimestamp hlc.Timestamp,
Expand All @@ -107,6 +127,8 @@ func (i *CatchUpIterator) CatchUpScan(
if reorderBuf[l-1].Val.PrevValue.IsPresent() {
panic("RangeFeedValue.PrevVal unexpectedly set")
}
// TODO(sumeer): find out if it is deliberate that we are not populating
// PrevValue.Timestamp.
reorderBuf[l-1].Val.PrevValue.RawBytes = val
}
}
Expand Down Expand Up @@ -143,18 +165,23 @@ func (i *CatchUpIterator) CatchUpScan(
}
if !meta.IsInline() {
// This is an MVCCMetadata key for an intent. The catchUp scan
// only cares about committed values, so ignore this and skip
// past the corresponding provisional key-value. To do this,
// scan to the timestamp immediately before (i.e. the key
// immediately after) the provisional key.
//
// Make a copy since should not pass an unsafe key from the iterator
// that provided it, when asking it to seek.
a, unsafeKey.Key = a.Copy(unsafeKey.Key, 0)
i.SeekGE(storage.MVCCKey{
Key: unsafeKey.Key,
Timestamp: meta.Timestamp.ToTimestamp().Prev(),
})
// only cares about committed values, so ignore this and skip past
// the corresponding provisional key-value. To do this, iterate to
// the provisional key-value, validate its timestamp, then iterate
// again. When using MVCCIncrementalIterator we know that the
// provisional value will also be within the time bounds so we use
// Next.
i.Next()
if ok, err := i.Valid(); err != nil {
return errors.Wrap(err, "iterating to provisional value for intent")
} else if !ok {
return errors.Errorf("expected provisional value for intent")
}
if !meta.Timestamp.ToTimestamp().EqOrdering(i.UnsafeKey().Timestamp) {
return errors.Errorf("expected provisional value for intent with ts %s, found %s",
meta.Timestamp, i.UnsafeKey().Timestamp)
}
i.Next()
continue
}

Expand Down Expand Up @@ -231,8 +258,15 @@ func (i *CatchUpIterator) CatchUpScan(
// Skip all the way to the next key.
i.NextKey()
} else {
// Move to the next version of this key.
i.Next()
// Move to the next version of this key (there may not be one, in which
// case it will move to the next key).
if withDiff {
// Need to see the next version even if it is older than the time
// bounds.
i.NextIgnoringTime()
} else {
i.Next()
}
}
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,7 @@ func BenchmarkCatchUpScan(b *testing.B) {
b.Run(name, func(b *testing.B) {
for _, useTBI := range []bool{true, false} {
b.Run(fmt.Sprintf("useTBI=%v", useTBI), func(b *testing.B) {
// TODO(ssd): withDiff isn't currently supported by the TBI optimization.
for _, withDiff := range []bool{false} {
for _, withDiff := range []bool{true, false} {
b.Run(fmt.Sprintf("withDiff=%v", withDiff), func(b *testing.B) {
for _, tsExcludePercent := range []float64{0.0, 0.50, 0.75, 0.95, 0.99} {
wallTime := int64((5 * (float64(numKeys)*tsExcludePercent + 1)))
Expand Down
136 changes: 136 additions & 0 deletions pkg/kv/kvserver/rangefeed/catchup_scan_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package rangefeed

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/stretchr/testify/require"
)

func TestCatchupScan(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

var (
testKey1 = roachpb.Key("/db1")
testKey2 = roachpb.Key("/db2")

testValue1 = []byte("val1")
testValue2 = []byte("val2")
testValue3 = []byte("val3")
testValue4 = []byte("val4")

ts1 = hlc.Timestamp{WallTime: 1, Logical: 0}
ts2 = hlc.Timestamp{WallTime: 2, Logical: 0}
ts3 = hlc.Timestamp{WallTime: 3, Logical: 0}
ts4 = hlc.Timestamp{WallTime: 4, Logical: 0}
ts5 = hlc.Timestamp{WallTime: 4, Logical: 0}
)

makeTxn := func(key roachpb.Key, val []byte, ts hlc.Timestamp,
) (roachpb.Transaction, roachpb.Value) {
txnID := uuid.MakeV4()
txnMeta := enginepb.TxnMeta{
Key: key,
ID: txnID,
Epoch: 1,
WriteTimestamp: ts,
}
return roachpb.Transaction{
TxnMeta: txnMeta,
ReadTimestamp: ts,
}, roachpb.Value{
RawBytes: val,
}
}

makeKTV := func(key roachpb.Key, ts hlc.Timestamp, value []byte) storage.MVCCKeyValue {
return storage.MVCCKeyValue{Key: storage.MVCCKey{Key: key, Timestamp: ts}, Value: value}
}
// testKey1 has an intent and provisional value that will be skipped. Both
// testKey1 and testKey2 have a value that is older than what we need with
// the catchup scan, but will be read if a diff is desired.
kv1_1_1 := makeKTV(testKey1, ts1, testValue1)
kv1_2_2 := makeKTV(testKey1, ts2, testValue2)
kv1_3_3 := makeKTV(testKey1, ts3, testValue3)
kv1_4_4 := makeKTV(testKey1, ts4, testValue4)
txn, val := makeTxn(testKey1, testValue4, ts4)
kv2_1_1 := makeKTV(testKey2, ts1, testValue1)
kv2_2_2 := makeKTV(testKey2, ts2, testValue2)
kv2_5_3 := makeKTV(testKey2, ts5, testValue3)

eng := storage.NewDefaultInMemForTesting()
defer eng.Close()
// Put with no intent.
for _, kv := range []storage.MVCCKeyValue{kv1_1_1, kv1_2_2, kv1_3_3, kv2_1_1, kv2_2_2, kv2_5_3} {
v := roachpb.Value{RawBytes: kv.Value}
if err := storage.MVCCPut(ctx, eng, nil, kv.Key.Key, kv.Key.Timestamp, v, nil); err != nil {
t.Fatal(err)
}
}
// Put with an intent.
if err := storage.MVCCPut(ctx, eng, nil, kv1_4_4.Key.Key, txn.ReadTimestamp, val, &txn); err != nil {
t.Fatal(err)
}
testutils.RunTrueAndFalse(t, "useTBI", func(t *testing.T, useTBI bool) {
testutils.RunTrueAndFalse(t, "withDiff", func(t *testing.T, withDiff bool) {
iter := NewCatchUpIterator(eng, &roachpb.RangeFeedRequest{
Header: roachpb.Header{
// Inclusive, so want everything >= ts2
Timestamp: ts2,
},
Span: roachpb.Span{
EndKey: roachpb.KeyMax,
},
WithDiff: withDiff,
}, useTBI, nil)
defer iter.Close()
var events []roachpb.RangeFeedValue
// ts1 here is exclusive, so we do not want the versions at ts1.
require.NoError(t, iter.CatchUpScan(storage.MakeMVCCMetadataKey(testKey1),
storage.MakeMVCCMetadataKey(roachpb.KeyMax), ts1, withDiff,
func(e *roachpb.RangeFeedEvent) error {
events = append(events, *e.Val)
return nil
}))
require.Equal(t, 4, len(events))
checkEquality := func(
kv storage.MVCCKeyValue, prevKV storage.MVCCKeyValue, event roachpb.RangeFeedValue) {
require.Equal(t, string(kv.Key.Key), string(event.Key))
require.Equal(t, kv.Key.Timestamp, event.Value.Timestamp)
require.Equal(t, string(kv.Value), string(event.Value.RawBytes))
if withDiff {
// TODO(sumeer): uncomment after clarifying CatchUpScan behavior.
// require.Equal(t, prevKV.Key.Timestamp, event.PrevValue.Timestamp)
require.Equal(t, string(prevKV.Value), string(event.PrevValue.RawBytes))
} else {
require.Equal(t, hlc.Timestamp{}, event.PrevValue.Timestamp)
require.Equal(t, 0, len(event.PrevValue.RawBytes))
}
}
checkEquality(kv1_2_2, kv1_1_1, events[0])
checkEquality(kv1_3_3, kv1_2_2, events[1])
checkEquality(kv2_2_2, kv2_1_1, events[2])
checkEquality(kv2_5_3, kv2_2_2, events[3])
})
})
}
4 changes: 3 additions & 1 deletion pkg/kv/kvserver/rangefeed/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ func makeCatchUpIteratorConstructor(iter storage.SimpleMVCCIterator) CatchUpIter
if iter == nil {
return nil
}
return func() *CatchUpIterator { return &CatchUpIterator{SimpleMVCCIterator: iter} }
return func() *CatchUpIterator {
return &CatchUpIterator{simpleCatchupIter: simpleCatchupIterAdapter{iter}}
}
}

func newTestRegistration(
Expand Down
Loading

0 comments on commit 9fd098d

Please sign in to comment.