diff --git a/pkg/kv/kvserver/rangefeed/BUILD.bazel b/pkg/kv/kvserver/rangefeed/BUILD.bazel index a019042dd6ce..7c212c7eccb1 100644 --- a/pkg/kv/kvserver/rangefeed/BUILD.bazel +++ b/pkg/kv/kvserver/rangefeed/BUILD.bazel @@ -39,6 +39,7 @@ go_test( size = "small", srcs = [ "catchup_scan_bench_test.go", + "catchup_scan_test.go", "processor_test.go", "registry_test.go", "resolved_timestamp_test.go", diff --git a/pkg/kv/kvserver/rangefeed/catchup_scan.go b/pkg/kv/kvserver/rangefeed/catchup_scan.go index b1dd40b37d0e..0482e0c84bbb 100644 --- a/pkg/kv/kvserver/rangefeed/catchup_scan.go +++ b/pkg/kv/kvserver/rangefeed/catchup_scan.go @@ -24,10 +24,34 @@ import ( // A CatchUpIterator is an iterator for catchUp-scans. type CatchUpIterator struct { - storage.SimpleMVCCIterator + simpleCatchupIter close func() } +// simpleCatchupIter is an extension of SimpleMVCCIterator that allows for the +// primary iterator to be implemented using a regular MVCCIterator or a +// (often) more efficient MVCCIncrementalIterator. When the caller wants to +// iterate to see older versions of a key, the desire of the caller needs to +// be expressed using one of two methods: +// - Next: when it wants to omit any versions that are not within the time +// bounds. +// - NextIgnoringTime: when it wants to see the next older version even if it +// is not within the time bounds. +type simpleCatchupIter interface { + storage.SimpleMVCCIterator + NextIgnoringTime() +} + +type simpleCatchupIterAdapter struct { + storage.SimpleMVCCIterator +} + +func (i simpleCatchupIterAdapter) NextIgnoringTime() { + i.SimpleMVCCIterator.Next() +} + +var _ simpleCatchupIter = simpleCatchupIterAdapter{} + // NewCatchUpIterator returns a CatchUpIterator for the given Reader. // If useTBI is true, a time-bound iterator will be used if possible, // configured with a start time taken from the RangeFeedRequest. @@ -37,13 +61,8 @@ func NewCatchUpIterator( ret := &CatchUpIterator{ close: closer, } - // TODO(ssd): The withDiff option requires us to iterate over - // values arbitrarily in the past so that we can populate the - // previous value of a key. This is possible since the - // IncrementalIterator has a non-timebound iterator - // internally, but it is not yet implemented. - if useTBI && !args.WithDiff { - ret.SimpleMVCCIterator = storage.NewMVCCIncrementalIterator(reader, storage.MVCCIncrementalIterOptions{ + if useTBI { + ret.simpleCatchupIter = storage.NewMVCCIncrementalIterator(reader, storage.MVCCIncrementalIterOptions{ EnableTimeBoundIteratorOptimization: true, EndKey: args.Span.EndKey, // StartTime is exclusive but args.Timestamp @@ -63,9 +82,10 @@ func NewCatchUpIterator( InlinePolicy: storage.MVCCIncrementalIterInlinePolicyEmit, }) } else { - ret.SimpleMVCCIterator = reader.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{ + iter := reader.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{ UpperBound: args.Span.EndKey, }) + ret.simpleCatchupIter = simpleCatchupIterAdapter{SimpleMVCCIterator: iter} } return ret @@ -74,7 +94,7 @@ func NewCatchUpIterator( // Close closes the iterator and calls the instantiator-supplied close // callback. func (i *CatchUpIterator) Close() { - i.SimpleMVCCIterator.Close() + i.simpleCatchupIter.Close() if i.close != nil { i.close() } @@ -86,8 +106,8 @@ func (i *CatchUpIterator) Close() { type outputEventFn func(e *roachpb.RangeFeedEvent) error // CatchUpScan iterates over all changes for the given span of keys, -// starting at catchUpTimestamp. Keys and Values are emitted as -// RangeFeedEvents passed to the given outputFn. +// starting at catchUpTimestamp. Keys and Values are emitted as +// RangeFeedEvents passed to the given outputFn. catchUpTimestamp is exclusive. func (i *CatchUpIterator) CatchUpScan( startKey, endKey storage.MVCCKey, catchUpTimestamp hlc.Timestamp, @@ -107,6 +127,8 @@ func (i *CatchUpIterator) CatchUpScan( if reorderBuf[l-1].Val.PrevValue.IsPresent() { panic("RangeFeedValue.PrevVal unexpectedly set") } + // TODO(sumeer): find out if it is deliberate that we are not populating + // PrevValue.Timestamp. reorderBuf[l-1].Val.PrevValue.RawBytes = val } } @@ -143,18 +165,23 @@ func (i *CatchUpIterator) CatchUpScan( } if !meta.IsInline() { // This is an MVCCMetadata key for an intent. The catchUp scan - // only cares about committed values, so ignore this and skip - // past the corresponding provisional key-value. To do this, - // scan to the timestamp immediately before (i.e. the key - // immediately after) the provisional key. - // - // Make a copy since should not pass an unsafe key from the iterator - // that provided it, when asking it to seek. - a, unsafeKey.Key = a.Copy(unsafeKey.Key, 0) - i.SeekGE(storage.MVCCKey{ - Key: unsafeKey.Key, - Timestamp: meta.Timestamp.ToTimestamp().Prev(), - }) + // only cares about committed values, so ignore this and skip past + // the corresponding provisional key-value. To do this, iterate to + // the provisional key-value, validate its timestamp, then iterate + // again. When using MVCCIncrementalIterator we know that the + // provisional value will also be within the time bounds so we use + // Next. + i.Next() + if ok, err := i.Valid(); err != nil { + return errors.Wrap(err, "iterating to provisional value for intent") + } else if !ok { + return errors.Errorf("expected provisional value for intent") + } + if !meta.Timestamp.ToTimestamp().EqOrdering(i.UnsafeKey().Timestamp) { + return errors.Errorf("expected provisional value for intent with ts %s, found %s", + meta.Timestamp, i.UnsafeKey().Timestamp) + } + i.Next() continue } @@ -231,8 +258,15 @@ func (i *CatchUpIterator) CatchUpScan( // Skip all the way to the next key. i.NextKey() } else { - // Move to the next version of this key. - i.Next() + // Move to the next version of this key (there may not be one, in which + // case it will move to the next key). + if withDiff { + // Need to see the next version even if it is older than the time + // bounds. + i.NextIgnoringTime() + } else { + i.Next() + } } } diff --git a/pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go b/pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go index 467fc6395675..78c4843f4427 100644 --- a/pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go +++ b/pkg/kv/kvserver/rangefeed/catchup_scan_bench_test.go @@ -132,8 +132,7 @@ func BenchmarkCatchUpScan(b *testing.B) { b.Run(name, func(b *testing.B) { for _, useTBI := range []bool{true, false} { b.Run(fmt.Sprintf("useTBI=%v", useTBI), func(b *testing.B) { - // TODO(ssd): withDiff isn't currently supported by the TBI optimization. - for _, withDiff := range []bool{false} { + for _, withDiff := range []bool{true, false} { b.Run(fmt.Sprintf("withDiff=%v", withDiff), func(b *testing.B) { for _, tsExcludePercent := range []float64{0.0, 0.50, 0.75, 0.95, 0.99} { wallTime := int64((5 * (float64(numKeys)*tsExcludePercent + 1))) diff --git a/pkg/kv/kvserver/rangefeed/catchup_scan_test.go b/pkg/kv/kvserver/rangefeed/catchup_scan_test.go new file mode 100644 index 000000000000..62bc973d3c0b --- /dev/null +++ b/pkg/kv/kvserver/rangefeed/catchup_scan_test.go @@ -0,0 +1,136 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package rangefeed + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/storage/enginepb" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/stretchr/testify/require" +) + +func TestCatchupScan(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + var ( + testKey1 = roachpb.Key("/db1") + testKey2 = roachpb.Key("/db2") + + testValue1 = []byte("val1") + testValue2 = []byte("val2") + testValue3 = []byte("val3") + testValue4 = []byte("val4") + + ts1 = hlc.Timestamp{WallTime: 1, Logical: 0} + ts2 = hlc.Timestamp{WallTime: 2, Logical: 0} + ts3 = hlc.Timestamp{WallTime: 3, Logical: 0} + ts4 = hlc.Timestamp{WallTime: 4, Logical: 0} + ts5 = hlc.Timestamp{WallTime: 4, Logical: 0} + ) + + makeTxn := func(key roachpb.Key, val []byte, ts hlc.Timestamp, + ) (roachpb.Transaction, roachpb.Value) { + txnID := uuid.MakeV4() + txnMeta := enginepb.TxnMeta{ + Key: key, + ID: txnID, + Epoch: 1, + WriteTimestamp: ts, + } + return roachpb.Transaction{ + TxnMeta: txnMeta, + ReadTimestamp: ts, + }, roachpb.Value{ + RawBytes: val, + } + } + + makeKTV := func(key roachpb.Key, ts hlc.Timestamp, value []byte) storage.MVCCKeyValue { + return storage.MVCCKeyValue{Key: storage.MVCCKey{Key: key, Timestamp: ts}, Value: value} + } + // testKey1 has an intent and provisional value that will be skipped. Both + // testKey1 and testKey2 have a value that is older than what we need with + // the catchup scan, but will be read if a diff is desired. + kv1_1_1 := makeKTV(testKey1, ts1, testValue1) + kv1_2_2 := makeKTV(testKey1, ts2, testValue2) + kv1_3_3 := makeKTV(testKey1, ts3, testValue3) + kv1_4_4 := makeKTV(testKey1, ts4, testValue4) + txn, val := makeTxn(testKey1, testValue4, ts4) + kv2_1_1 := makeKTV(testKey2, ts1, testValue1) + kv2_2_2 := makeKTV(testKey2, ts2, testValue2) + kv2_5_3 := makeKTV(testKey2, ts5, testValue3) + + eng := storage.NewDefaultInMemForTesting() + defer eng.Close() + // Put with no intent. + for _, kv := range []storage.MVCCKeyValue{kv1_1_1, kv1_2_2, kv1_3_3, kv2_1_1, kv2_2_2, kv2_5_3} { + v := roachpb.Value{RawBytes: kv.Value} + if err := storage.MVCCPut(ctx, eng, nil, kv.Key.Key, kv.Key.Timestamp, v, nil); err != nil { + t.Fatal(err) + } + } + // Put with an intent. + if err := storage.MVCCPut(ctx, eng, nil, kv1_4_4.Key.Key, txn.ReadTimestamp, val, &txn); err != nil { + t.Fatal(err) + } + testutils.RunTrueAndFalse(t, "useTBI", func(t *testing.T, useTBI bool) { + testutils.RunTrueAndFalse(t, "withDiff", func(t *testing.T, withDiff bool) { + iter := NewCatchUpIterator(eng, &roachpb.RangeFeedRequest{ + Header: roachpb.Header{ + // Inclusive, so want everything >= ts2 + Timestamp: ts2, + }, + Span: roachpb.Span{ + EndKey: roachpb.KeyMax, + }, + WithDiff: withDiff, + }, useTBI, nil) + defer iter.Close() + var events []roachpb.RangeFeedValue + // ts1 here is exclusive, so we do not want the versions at ts1. + require.NoError(t, iter.CatchUpScan(storage.MakeMVCCMetadataKey(testKey1), + storage.MakeMVCCMetadataKey(roachpb.KeyMax), ts1, withDiff, + func(e *roachpb.RangeFeedEvent) error { + events = append(events, *e.Val) + return nil + })) + require.Equal(t, 4, len(events)) + checkEquality := func( + kv storage.MVCCKeyValue, prevKV storage.MVCCKeyValue, event roachpb.RangeFeedValue) { + require.Equal(t, string(kv.Key.Key), string(event.Key)) + require.Equal(t, kv.Key.Timestamp, event.Value.Timestamp) + require.Equal(t, string(kv.Value), string(event.Value.RawBytes)) + if withDiff { + // TODO(sumeer): uncomment after clarifying CatchUpScan behavior. + // require.Equal(t, prevKV.Key.Timestamp, event.PrevValue.Timestamp) + require.Equal(t, string(prevKV.Value), string(event.PrevValue.RawBytes)) + } else { + require.Equal(t, hlc.Timestamp{}, event.PrevValue.Timestamp) + require.Equal(t, 0, len(event.PrevValue.RawBytes)) + } + } + checkEquality(kv1_2_2, kv1_1_1, events[0]) + checkEquality(kv1_3_3, kv1_2_2, events[1]) + checkEquality(kv2_2_2, kv2_1_1, events[2]) + checkEquality(kv2_5_3, kv2_2_2, events[3]) + }) + }) +} diff --git a/pkg/kv/kvserver/rangefeed/registry_test.go b/pkg/kv/kvserver/rangefeed/registry_test.go index 38273abfd152..aed6bf38fc49 100644 --- a/pkg/kv/kvserver/rangefeed/registry_test.go +++ b/pkg/kv/kvserver/rangefeed/registry_test.go @@ -99,7 +99,9 @@ func makeCatchUpIteratorConstructor(iter storage.SimpleMVCCIterator) CatchUpIter if iter == nil { return nil } - return func() *CatchUpIterator { return &CatchUpIterator{SimpleMVCCIterator: iter} } + return func() *CatchUpIterator { + return &CatchUpIterator{simpleCatchupIter: simpleCatchupIterAdapter{iter}} + } } func newTestRegistration( diff --git a/pkg/storage/mvcc_incremental_iterator.go b/pkg/storage/mvcc_incremental_iterator.go index d7c00053de48..ed1c6673368d 100644 --- a/pkg/storage/mvcc_incremental_iterator.go +++ b/pkg/storage/mvcc_incremental_iterator.go @@ -41,20 +41,21 @@ import ( // CockroachDB uses that as a sentinel for key metadata anyway. // // Expected usage: -// iter := NewMVCCIncrementalIterator(e, IterOptions{ -// StartTime: startTime, -// EndTime: endTime, -// UpperBound: endKey, -// }) -// defer iter.Close() -// for iter.SeekGE(startKey); ; iter.Next() { -// ok, err := iter.Valid() -// if !ok { ... } -// [code using iter.Key() and iter.Value()] -// } -// if err := iter.Error(); err != nil { -// ... -// } +// +// iter := NewMVCCIncrementalIterator(e, IterOptions{ +// StartTime: startTime, +// EndTime: endTime, +// UpperBound: endKey, +// }) +// defer iter.Close() +// for iter.SeekGE(startKey); ; iter.Next() { +// ok, err := iter.Valid() +// if !ok { ... } +// [code using iter.Key() and iter.Value()] +// } +// if err := iter.Error(); err != nil { +// ... +// } // // Note regarding the correctness of the time-bound iterator optimization: // @@ -159,8 +160,8 @@ type MVCCIncrementalIterOptions struct { // specified reader and options. The timestamp hint range should not be more // restrictive than the start and end time range. // TODO(pbardea): Add validation here and in C++ implementation that the -// timestamp hints are not more restrictive than incremental iterator's -// (startTime, endTime] interval. +// timestamp hints are not more restrictive than incremental iterator's +// (startTime, endTime] interval. func NewMVCCIncrementalIterator( reader Reader, opts MVCCIncrementalIterOptions, ) *MVCCIncrementalIterator { @@ -272,8 +273,8 @@ func (i *MVCCIncrementalIterator) NextKey() { // maybeSkipKeys checks if any keys can be skipped by using a time-bound // iterator. If keys can be skipped, it will update the main iterator to point // to the earliest version of the next candidate key. -// It is expected that TBI is at a key <= main iterator key when calling -// maybeSkipKeys(). +// It is expected (but not required) that TBI is at a key <= main iterator key +// when calling maybeSkipKeys(). func (i *MVCCIncrementalIterator) maybeSkipKeys() { if i.timeBoundIter == nil { // If there is no time bound iterator, we cannot skip any keys. @@ -284,16 +285,15 @@ func (i *MVCCIncrementalIterator) maybeSkipKeys() { if iterKey.Compare(tbiKey) > 0 { // If the iterKey got ahead of the TBI key, advance the TBI Key. // - // The case where iterKey == tbiKey, after this call, is the fast-path is - // when the TBI and the main iterator are in lockstep. In this case, the - // main iterator was referencing the next key that would be visited by the - // TBI. This means that for the incremental iterator to perform a Next or - // NextKey will require only 1 extra NextKey invocation while they remain in - // lockstep. This could be common if most keys are modified or the - // modifications are clustered in keyspace. - // - // NB: The Seek() below is expensive, so we aim to avoid it if both - // iterators remain in lockstep as described above. + // We fast-path the case where the main iterator is referencing the next + // key that would be visited by the TBI. In that case, after the following + // NextKey call, we will have iterKey == tbiKey. This means that for the + // incremental iterator to perform a Next or NextKey will require only 1 + // extra NextKey invocation while they remain in lockstep. This case will + // be common if most keys are modified, or the modifications are clustered + // in keyspace, which makes the incremental iterator optimization + // ineffective. And so in this case we want to minimize the extra cost of + // using the incremental iterator, by avoiding a SeekGE. i.timeBoundIter.NextKey() if ok, err := i.timeBoundIter.Valid(); !ok { i.err = err @@ -325,7 +325,9 @@ func (i *MVCCIncrementalIterator) maybeSkipKeys() { // same as the main iterator, we may be able to skip over a large group // of keys. The main iterator is seeked to the TBI in hopes that many // keys were skipped. Note that a Seek is an order of magnitude more - // expensive than a Next call. + // expensive than a Next call, but the engine has low-level + // optimizations that attempt to make it cheaper if the seeked key is + // "nearby" (within the same sstable block). seekKey := MakeMVCCMetadataKey(tbiKey) i.iter.SeekGE(seekKey) if !i.checkValidAndSaveErr() { @@ -349,7 +351,7 @@ func (i *MVCCIncrementalIterator) initMetaAndCheckForIntentOrInlineError() error } // The key is a metakey (an intent or inline meta). If an inline meta, we - // will error below. If an intent meta, then this is used later to see if + // will handle below. If an intent meta, then this is used later to see if // the timestamp of this intent is within the incremental iterator's time // bounds. if i.err = protoutil.Unmarshal(i.iter.UnsafeValue(), &i.meta); i.err != nil { @@ -408,10 +410,12 @@ func (i *MVCCIncrementalIterator) initMetaAndCheckForIntentOrInlineError() error // It populates i.err with an error if either of the following was encountered: // // a) an inline value when the inline policy is -// MVCCIncrementalIterInlinePolicyError; or +// +// MVCCIncrementalIterInlinePolicyError; or // // b) an intent with a timestamp within the incremental iterator's bounds when -// the intent policy is MVCCIncrementalIterIntentPolicyError. +// +// the intent policy is MVCCIncrementalIterIntentPolicyError. func (i *MVCCIncrementalIterator) advance() { for { i.maybeSkipKeys() @@ -431,6 +435,8 @@ func (i *MVCCIncrementalIterator) advance() { return } + // INVARIANT: we have an intent or an MVCC value. + if i.meta.Txn != nil { switch i.intentPolicy { case MVCCIncrementalIterIntentPolicyEmit: @@ -452,7 +458,7 @@ func (i *MVCCIncrementalIterator) advance() { // Note that MVCC keys are sorted by key, then by _descending_ timestamp // order with the exception of the metakey (timestamp 0) being sorted - // first. See mvcc.h for more information. + // first. metaTimestamp := i.meta.Timestamp.ToTimestamp() if i.endTime.Less(metaTimestamp) { i.iter.Next() @@ -528,7 +534,7 @@ func (i *MVCCIncrementalIterator) NextIgnoringTime() { continue } - // We have a valid KV or an intent to emit. + // We have a valid KV or an intent or an inline value to emit. return } } diff --git a/pkg/storage/mvcc_incremental_iterator_test.go b/pkg/storage/mvcc_incremental_iterator_test.go index dccee47d1c7c..d05051236f1c 100644 --- a/pkg/storage/mvcc_incremental_iterator_test.go +++ b/pkg/storage/mvcc_incremental_iterator_test.go @@ -519,6 +519,7 @@ func TestMVCCIncrementalIteratorInlinePolicy(t *testing.T) { keyMax = roachpb.KeyMax testKey1 = roachpb.Key("/db1") testKey2 = roachpb.Key("/db2") + testKey3 = roachpb.Key("/db3") testValue1 = []byte("val1") testValue2 = []byte("val2") @@ -537,11 +538,12 @@ func TestMVCCIncrementalIteratorInlinePolicy(t *testing.T) { inline1_1_1 := makeKVT(testKey1, testValue1, hlc.Timestamp{}) kv2_1_1 := makeKVT(testKey2, testValue1, ts1) kv2_2_2 := makeKVT(testKey2, testValue2, ts2) + inline3_2_1 := makeKVT(testKey3, testValue2, hlc.Timestamp{}) for _, engineImpl := range mvccEngineImpls { e := engineImpl.create() defer e.Close() - for _, kv := range []MVCCKeyValue{inline1_1_1, kv2_1_1, kv2_2_2} { + for _, kv := range []MVCCKeyValue{inline1_1_1, kv2_1_1, kv2_2_2, inline3_2_1} { v := roachpb.Value{RawBytes: kv.Value} if err := MVCCPut(ctx, e, nil, kv.Key.Key, kv.Key.Timestamp, v, nil); err != nil { t.Fatal(err) @@ -572,8 +574,33 @@ func TestMVCCIncrementalIteratorInlinePolicy(t *testing.T) { expectKeyValue(t, iter, kv2_2_2) iter.Next() expectKeyValue(t, iter, kv2_1_1) + iter.Next() + expectInlineKeyValue(t, iter, inline3_2_1) + iter.SeekGE(MakeMVCCMetadataKey(testKey2)) + expectKeyValue(t, iter, kv2_2_2) + iter.NextIgnoringTime() + expectKeyValue(t, iter, kv2_1_1) + iter.NextIgnoringTime() + expectInlineKeyValue(t, iter, inline3_2_1) }) + t.Run("PolicyError returns error on NextIgnoringTime if inline value is found", + func(t *testing.T) { + iter := NewMVCCIncrementalIterator(e, MVCCIncrementalIterOptions{ + EndKey: keyMax, + StartTime: tsMin, + EndTime: tsMax, + InlinePolicy: MVCCIncrementalIterInlinePolicyError, + }) + defer iter.Close() + iter.SeekGE(MakeMVCCMetadataKey(testKey2)) + expectKeyValue(t, iter, kv2_2_2) + iter.NextIgnoringTime() + expectKeyValue(t, iter, kv2_1_1) + iter.NextIgnoringTime() + _, err := iter.Valid() + assert.EqualError(t, err, "unexpected inline value found: \"/db3\"") + }) }) } } @@ -657,6 +684,16 @@ func TestMVCCIncrementalIteratorIntentPolicy(t *testing.T) { } _, err := iter.Valid() assert.EqualError(t, err, intentErr.Error()) + + iter.SeekGE(MakeMVCCMetadataKey(testKey1)) + _, err = iter.Valid() + require.NoError(t, err) + for ; ; iter.NextIgnoringTime() { + if ok, err := iter.Valid(); !ok { + assert.EqualError(t, err, intentErr.Error()) + break + } + } }) t.Run("PolicyError ignores intents outside of time range", func(t *testing.T) { iter := NewMVCCIncrementalIterator(e, MVCCIncrementalIterOptions{ @@ -679,17 +716,21 @@ func TestMVCCIncrementalIteratorIntentPolicy(t *testing.T) { EndTime: tsMax, IntentPolicy: MVCCIncrementalIterIntentPolicyEmit, }) - iter.SeekGE(MakeMVCCMetadataKey(testKey1)) - for _, kv := range []MVCCKeyValue{kv1_3_3, kv1_2_2, kv1_1_1} { - expectKeyValue(t, iter, kv) - iter.Next() + defer iter.Close() + testIterWithNextFunc := func(nextFunc func()) { + iter.SeekGE(MakeMVCCMetadataKey(testKey1)) + for _, kv := range []MVCCKeyValue{kv1_3_3, kv1_2_2, kv1_1_1} { + expectKeyValue(t, iter, kv) + nextFunc() + } + expectIntent(t, iter, intent2_2_2) + nextFunc() + expectKeyValue(t, iter, kv2_2_2) + nextFunc() + expectKeyValue(t, iter, kv2_1_1) } - expectIntent(t, iter, intent2_2_2) - iter.Next() - expectKeyValue(t, iter, kv2_2_2) - iter.Next() - expectKeyValue(t, iter, kv2_1_1) - + testIterWithNextFunc(iter.Next) + testIterWithNextFunc(iter.NextIgnoringTime) }) t.Run("PolicyEmit ignores intents outside of time range", func(t *testing.T) { iter := NewMVCCIncrementalIterator(e, MVCCIncrementalIterOptions{