Skip to content

Commit

Permalink
*: add error return values to New{MVCC,Engine}Iterator
Browse files Browse the repository at this point in the history
Now that the iterator creation return paths can return
errors from Pebble, thread that through the interface
and all callers. Mechanical but very large and widespread
change.

Epic: none

Release note: None
  • Loading branch information
itsbilal committed Aug 21, 2023
1 parent 00a4d0c commit be542c7
Show file tree
Hide file tree
Showing 72 changed files with 1,061 additions and 331 deletions.
3 changes: 2 additions & 1 deletion pkg/ccl/backupccl/full_cluster_backup_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,9 +223,10 @@ CREATE TABLE data2.foo (a int);
store := tcRestore.GetFirstStoreFromServer(t, 0)
startKey := keys.SystemSQLCodec.TablePrefix(uint32(id))
endKey := startKey.PrefixEnd()
it := store.TODOEngine().NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{
it, err := store.TODOEngine().NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{
UpperBound: endKey,
})
require.NoError(t, err)
defer it.Close()
it.SeekGE(storage.MVCCKey{Key: startKey})
hasKey, err := it.Valid()
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/backupccl/restore_data_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ func slurpSSTablesLatestKey(
}

var kvs []storage.MVCCKeyValue
it := batch.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{UpperBound: roachpb.KeyMax})
it, err := batch.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{UpperBound: roachpb.KeyMax})
require.NoError(t, err)
defer it.Close()
for it.SeekGE(start); ; it.NextKey() {
if ok, err := it.Valid(); err != nil {
Expand Down
11 changes: 7 additions & 4 deletions pkg/ccl/storageccl/engineccl/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func loadTestData(
func runIterate(
b *testing.B,
loadFactor float32,
makeIterator func(storage.Engine, hlc.Timestamp, hlc.Timestamp) storage.MVCCIterator,
makeIterator func(storage.Engine, hlc.Timestamp, hlc.Timestamp) (storage.MVCCIterator, error),
) {
const numKeys = 100000
const numBatches = 100
Expand All @@ -149,7 +149,10 @@ func runIterate(
if endTime.IsEmpty() {
endTime = endTime.Next()
}
it := makeIterator(eng, startTime, endTime)
it, err := makeIterator(eng, startTime, endTime)
if err != nil {
b.Fatal(err)
}
defer it.Close()
for it.SeekGE(storage.MVCCKey{Key: keys.LocalMax}); ; it.Next() {
if ok, err := it.Valid(); !ok {
Expand All @@ -172,12 +175,12 @@ func BenchmarkTimeBoundIterate(b *testing.B) {
for _, loadFactor := range []float32{1.0, 0.5, 0.1, 0.05, 0.0} {
b.Run(fmt.Sprintf("LoadFactor=%.2f", loadFactor), func(b *testing.B) {
b.Run("NormalIterator", func(b *testing.B) {
runIterate(b, loadFactor, func(e storage.Engine, _, _ hlc.Timestamp) storage.MVCCIterator {
runIterate(b, loadFactor, func(e storage.Engine, _, _ hlc.Timestamp) (storage.MVCCIterator, error) {
return e.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{UpperBound: roachpb.KeyMax})
})
})
b.Run("TimeBoundIterator", func(b *testing.B) {
runIterate(b, loadFactor, func(e storage.Engine, startTime, endTime hlc.Timestamp) storage.MVCCIterator {
runIterate(b, loadFactor, func(e storage.Engine, startTime, endTime hlc.Timestamp) (storage.MVCCIterator, error) {
return e.NewMVCCIterator(storage.MVCCKeyIterKind, storage.IterOptions{
MinTimestampHint: startTime,
MaxTimestampHint: endTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,14 +297,16 @@ func assertExactlyEqualKVs(
) hlc.Timestamp {
// Iterate over the store.
store := tc.GetFirstStoreFromServer(t, 0)
it := store.TODOEngine().NewMVCCIterator(storage.MVCCKeyIterKind, storage.IterOptions{
it, err := store.TODOEngine().NewMVCCIterator(storage.MVCCKeyIterKind, storage.IterOptions{
LowerBound: tenantPrefix,
UpperBound: tenantPrefix.PrefixEnd(),
})
if err != nil {
t.Fatal(err)
}
defer it.Close()
var prevKey roachpb.Key
var valueTimestampTuples []roachpb.KeyValue
var err error
var maxKVTimestampSeen hlc.Timestamp
var matchingKVs int
for it.SeekGE(storage.MVCCKey{}); ; it.Next() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -544,10 +544,13 @@ func assertEqualKVs(
// Iterate over the store.
store, err := srv.GetStores().(*kvserver.Stores).GetStore(srv.GetFirstStoreID())
require.NoError(t, err)
it := store.TODOEngine().NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{
it, err := store.TODOEngine().NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{
LowerBound: targetSpan.Key,
UpperBound: targetSpan.EndKey,
})
if err != nil {
t.Fatal(err)
}
defer it.Close()
var prevKey roachpb.Key
var valueTimestampTuples []roachpb.KeyValue
Expand Down
5 changes: 4 additions & 1 deletion pkg/cli/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -1272,10 +1272,13 @@ func runDebugIntentCount(cmd *cobra.Command, args []string) error {
}
})

iter := db.NewEngineIterator(storage.IterOptions{
iter, err := db.NewEngineIterator(storage.IterOptions{
LowerBound: keys.LockTableSingleKeyStart,
UpperBound: keys.LockTableSingleKeyEnd,
})
if err != nil {
return err
}
defer iter.Close()
seekKey := storage.EngineKey{Key: keys.LockTableSingleKeyStart}

Expand Down
11 changes: 9 additions & 2 deletions pkg/kv/kvnemesis/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@ func (e *Engine) Get(key roachpb.Key, ts hlc.Timestamp) roachpb.Value {
Suffix: storage.EncodeMVCCTimestampSuffix(ts),
},
}
iter := e.kvs.NewIter(&opts)
iter, err := e.kvs.NewIter(&opts)
if err != nil {
panic(err)
}
defer func() { _ = iter.Close() }()
iter.SeekGE(storage.EncodeMVCCKey(storage.MVCCKey{Key: key, Timestamp: ts}))
for iter.Valid() {
Expand Down Expand Up @@ -126,7 +129,11 @@ func (e *Engine) DeleteRange(from, to roachpb.Key, ts hlc.Timestamp, val []byte)
func (e *Engine) Iterate(
fn func(key, endKey roachpb.Key, ts hlc.Timestamp, value []byte, err error),
) {
iter := e.kvs.NewIter(&pebble.IterOptions{KeyTypes: pebble.IterKeyTypePointsAndRanges})
iter, err := e.kvs.NewIter(&pebble.IterOptions{KeyTypes: pebble.IterKeyTypePointsAndRanges})
if err != nil {
fn(nil, nil, hlc.Timestamp{}, nil, err)
return
}
defer func() { _ = iter.Close() }()
for iter.First(); iter.Valid(); iter.Next() {
hasPoint, _ := iter.HasPointAndRange()
Expand Down
10 changes: 8 additions & 2 deletions pkg/kv/kvnemesis/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1308,11 +1308,14 @@ func validReadTimes(b *pebble.Batch, key roachpb.Key, value []byte) disjointTime
var hist []storage.MVCCValue
lowerBound := storage.EncodeMVCCKey(storage.MVCCKey{Key: key})
upperBound := storage.EncodeMVCCKey(storage.MVCCKey{Key: key.Next()})
iter := b.NewIter(&pebble.IterOptions{
iter, err := b.NewIter(&pebble.IterOptions{
KeyTypes: storage.IterKeyTypePointsAndRanges,
LowerBound: lowerBound,
UpperBound: upperBound,
})
if err != nil {
panic(err)
}
defer func() { _ = iter.Close() }()

iter.SeekGE(lowerBound)
Expand Down Expand Up @@ -1436,7 +1439,10 @@ func validScanTime(b *pebble.Batch, span roachpb.Span, kvs []roachpb.KeyValue) m
// Note that this iterator ignores MVCC range deletions. We use this iterator
// only to *discover* point keys; we then invoke validReadTimes for each of
// them which *does* take into account MVCC range deletions.
iter := b.NewIter(nil)
iter, err := b.NewIter(nil)
if err != nil {
panic(err)
}
defer func() { _ = iter.Close() }()

iter.SeekGE(storage.EncodeMVCCKey(storage.MVCCKey{Key: span.Key}))
Expand Down
37 changes: 30 additions & 7 deletions pkg/kv/kvserver/batch_spanset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,10 @@ func TestSpanSetBatchBoundaries(t *testing.T) {
})

t.Run("forward scans", func(t *testing.T) {
iter := batch.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{UpperBound: roachpb.KeyMax})
iter, err := batch.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{UpperBound: roachpb.KeyMax})
if err != nil {
t.Fatal(err)
}
defer iter.Close()

// MVCCIterators check boundaries on seek and next/prev
Expand Down Expand Up @@ -199,7 +202,12 @@ func TestSpanSetBatchBoundaries(t *testing.T) {
}

t.Run("reverse scans", func(t *testing.T) {
iter := spanset.NewIterator(eng.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{UpperBound: roachpb.KeyMax}), &ss)
innerIter, err := eng.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{UpperBound: roachpb.KeyMax})
if err != nil {
t.Fatal(err)
}
iter := spanset.NewIterator(innerIter, &ss)

defer iter.Close()
iter.SeekLT(outsideKey4)
if _, err := iter.Valid(); !isReadSpanErr(err) {
Expand Down Expand Up @@ -383,7 +391,10 @@ func TestSpanSetIteratorTimestamps(t *testing.T) {

func() {
// When accessing at t=1, we're able to read through latches declared at t=1 and t=2.
iter := batchAt1.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{UpperBound: roachpb.KeyMax})
iter, err := batchAt1.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{UpperBound: roachpb.KeyMax})
if err != nil {
t.Fatal(err)
}
defer iter.Close()

iter.SeekGE(k1)
Expand All @@ -405,7 +416,10 @@ func TestSpanSetIteratorTimestamps(t *testing.T) {

func() {
// When accessing at t=2, we're only able to read through the latch declared at t=2.
iter := batchAt2.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{UpperBound: roachpb.KeyMax})
iter, err := batchAt2.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{UpperBound: roachpb.KeyMax})
if err != nil {
t.Fatal(err)
}
defer iter.Close()

iter.SeekGE(k1)
Expand All @@ -425,7 +439,10 @@ func TestSpanSetIteratorTimestamps(t *testing.T) {
for _, batch := range []storage.Batch{batchAt3, batchNonMVCC} {
// When accessing at t=3, we're unable to read through any of the declared latches.
// Same is true when accessing without a timestamp.
iter := batch.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{UpperBound: roachpb.KeyMax})
iter, err := batch.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{UpperBound: roachpb.KeyMax})
if err != nil {
t.Fatal(err)
}
defer iter.Close()

iter.SeekGE(k1)
Expand All @@ -442,7 +459,10 @@ func TestSpanSetIteratorTimestamps(t *testing.T) {
// The behavior is the same as above for an EngineIterator.
func() {
// When accessing at t=1, we're able to read through latches declared at t=1 and t=2.
iter := batchAt1.NewEngineIterator(storage.IterOptions{UpperBound: roachpb.KeyMax})
iter, err := batchAt1.NewEngineIterator(storage.IterOptions{UpperBound: roachpb.KeyMax})
if err != nil {
t.Fatal(err)
}
defer iter.Close()

if ok, err := iter.SeekEngineKeyGE(k1e); !ok {
Expand Down Expand Up @@ -470,7 +490,10 @@ func TestSpanSetIteratorTimestamps(t *testing.T) {

func() {
// When accessing at t=2, we're only able to read through the latch declared at t=2.
iter := batchAt2.NewEngineIterator(storage.IterOptions{UpperBound: roachpb.KeyMax})
iter, err := batchAt2.NewEngineIterator(storage.IterOptions{UpperBound: roachpb.KeyMax})
if err != nil {
t.Fatal(err)
}
defer iter.Close()

if ok, _ := iter.SeekEngineKeyGE(k1e); ok {
Expand Down
5 changes: 4 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_add_sstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,13 +404,16 @@ func EvalAddSSTable(
// addition, and instead just use this key-only iterator. If a caller actually
// needs to know what data is there, it must issue its own real Scan.
if args.ReturnFollowingLikelyNonEmptySpanStart {
existingIter := spanset.DisableReaderAssertions(readWriter).NewMVCCIterator(
existingIter, err := spanset.DisableReaderAssertions(readWriter).NewMVCCIterator(
storage.MVCCKeyIterKind, // don't care if it is committed or not, just that it isn't empty.
storage.IterOptions{
KeyTypes: storage.IterKeyTypePointsAndRanges,
UpperBound: reply.RangeSpan.EndKey,
},
)
if err != nil {
return result.Result{}, errors.Wrap(err, "error when creating iterator for non-empty span")
}
defer existingIter.Close()
existingIter.SeekGE(end)
if ok, err := existingIter.Valid(); err != nil {
Expand Down
5 changes: 4 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_clear_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,11 +213,14 @@ func computeStatsDelta(
if !entireRange {
leftPeekBound, rightPeekBound := rangeTombstonePeekBounds(
from, to, desc.StartKey.AsRawKey(), desc.EndKey.AsRawKey())
rkIter := readWriter.NewMVCCIterator(storage.MVCCKeyIterKind, storage.IterOptions{
rkIter, err := readWriter.NewMVCCIterator(storage.MVCCKeyIterKind, storage.IterOptions{
KeyTypes: storage.IterKeyTypeRangesOnly,
LowerBound: leftPeekBound,
UpperBound: rightPeekBound,
})
if err != nil {
return enginepb.MVCCStats{}, err
}
defer rkIter.Close()

if cmp, lhs, err := storage.PeekRangeKeysLeft(rkIter, from); err != nil {
Expand Down
5 changes: 4 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_clear_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,14 @@ func TestCmdClearRange(t *testing.T) {
require.Equal(t, tc.expClearIter, batch.clearIterCount == 1)

// Ensure that the data is gone.
iter := eng.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{
iter, err := eng.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{
KeyTypes: storage.IterKeyTypePointsAndRanges,
LowerBound: startKey,
UpperBound: endKey,
})
if err != nil {
t.Fatal(err)
}
defer iter.Close()
iter.SeekGE(storage.MVCCKey{Key: keys.LocalMax})
ok, err := iter.Valid()
Expand Down
10 changes: 8 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_delete_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,11 +308,14 @@ func TestDeleteRangeTombstone(t *testing.T) {
// operated on. The command should not have written an actual rangekey!
func checkPredicateDeleteRange(t *testing.T, engine storage.Reader, rKeyInfo storage.MVCCRangeKey) {

iter := engine.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{
iter, err := engine.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{
KeyTypes: storage.IterKeyTypePointsAndRanges,
LowerBound: rKeyInfo.StartKey,
UpperBound: rKeyInfo.EndKey,
})
if err != nil {
t.Fatal(err)
}
defer iter.Close()

for iter.SeekGE(storage.MVCCKey{Key: rKeyInfo.StartKey}); ; iter.NextKey() {
Expand Down Expand Up @@ -345,11 +348,14 @@ func checkDeleteRangeTombstone(
written bool,
now hlc.ClockTimestamp,
) {
iter := engine.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{
iter, err := engine.NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{
KeyTypes: storage.IterKeyTypeRangesOnly,
LowerBound: rangeKey.StartKey,
UpperBound: rangeKey.EndKey,
})
if err != nil {
t.Fatal(err)
}
defer iter.Close()
iter.SeekGE(storage.MVCCKey{Key: rangeKey.StartKey})

Expand Down
5 changes: 4 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_end_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -1410,11 +1410,14 @@ func computeSplitRangeKeyStatsDelta(
splitKey.Prevish(roachpb.PrevishKeyLength), splitKey.Next(),
lhs.StartKey.AsRawKey(), rhs.EndKey.AsRawKey())

iter := r.NewMVCCIterator(storage.MVCCKeyIterKind, storage.IterOptions{
iter, err := r.NewMVCCIterator(storage.MVCCKeyIterKind, storage.IterOptions{
KeyTypes: storage.IterKeyTypeRangesOnly,
LowerBound: leftPeekBound,
UpperBound: rightPeekBound,
})
if err != nil {
return ms, err
}
defer iter.Close()

if cmp, rangeKeys, err := storage.PeekRangeKeysRight(iter, splitKey); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ func exportUsingGoIterator(
return nil, nil
}

iter := storage.NewMVCCIncrementalIterator(reader, storage.MVCCIncrementalIterOptions{
iter := storage.NewMVCCIncrementalIterator(reader.(storage.ReaderWithMustIterators), storage.MVCCIncrementalIterOptions{
EndKey: endKey,
StartTime: startTime,
EndTime: endTime,
Expand Down
5 changes: 4 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_query_resolved_timestamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,10 @@ func computeMinIntentTimestamp(
) (hlc.Timestamp, []roachpb.Intent, error) {
ltStart, _ := keys.LockTableSingleKey(span.Key, nil)
ltEnd, _ := keys.LockTableSingleKey(span.EndKey, nil)
iter := reader.NewEngineIterator(storage.IterOptions{LowerBound: ltStart, UpperBound: ltEnd})
iter, err := reader.NewEngineIterator(storage.IterOptions{LowerBound: ltStart, UpperBound: ltEnd})
if err != nil {
return hlc.Timestamp{}, nil, err
}
defer iter.Close()

var meta enginepb.MVCCMetadata
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/batcheval/cmd_refresh_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func refreshRange(
// Construct an incremental iterator with the desired time bounds. Incremental
// iterators will emit MVCC tombstones by default and will emit intents when
// configured to do so (see IntentPolicy).
iter := storage.NewMVCCIncrementalIterator(reader, storage.MVCCIncrementalIterOptions{
iter := storage.NewMVCCIncrementalIterator(reader.(storage.ReaderWithMustIterators), storage.MVCCIncrementalIterOptions{
KeyTypes: storage.IterKeyTypePointsAndRanges,
StartKey: span.Key,
EndKey: span.EndKey,
Expand Down
5 changes: 4 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_revert_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,16 @@ func isEmptyKeyTimeRange(
// may not be in the time range but the fact the TBI found any key indicates
// that there is *a* key in the SST that is in the time range. Thus we should
// proceed to iteration that actually checks timestamps on each key.
iter := readWriter.NewMVCCIterator(storage.MVCCKeyIterKind, storage.IterOptions{
iter, err := readWriter.NewMVCCIterator(storage.MVCCKeyIterKind, storage.IterOptions{
KeyTypes: storage.IterKeyTypePointsAndRanges,
LowerBound: from,
UpperBound: to,
MinTimestampHint: since.Next(), // make exclusive
MaxTimestampHint: until,
})
if err != nil {
return false, err
}
defer iter.Close()
iter.SeekGE(storage.MVCCKey{Key: from})
ok, err := iter.Valid()
Expand Down
Loading

0 comments on commit be542c7

Please sign in to comment.