diff --git a/pkg/ccl/backupccl/restore_data_processor.go b/pkg/ccl/backupccl/restore_data_processor.go index d95ef1b9bb38..6d04f77801ee 100644 --- a/pkg/ccl/backupccl/restore_data_processor.go +++ b/pkg/ccl/backupccl/restore_data_processor.go @@ -268,7 +268,7 @@ func inputReader( type mergedSST struct { entry execinfrapb.RestoreSpanEntry - iter storage.SimpleMVCCIterator + iter *storage.ReadAsOfIterator cleanup func() } @@ -300,8 +300,10 @@ func (rd *restoreDataProcessor) openSSTs( // channel. sendIters := func(itersToSend []storage.SimpleMVCCIterator, dirsToSend []cloud.ExternalStorage) error { multiIter := storage.MakeMultiIterator(itersToSend) + readAsOfIter := storage.MakeReadAsOfIterator(multiIter, rd.spec.RestoreTime) cleanup := func() { + readAsOfIter.Close() multiIter.Close() for _, iter := range itersToSend { iter.Close() @@ -316,7 +318,7 @@ func (rd *restoreDataProcessor) openSSTs( mSST := mergedSST{ entry: entry, - iter: multiIter, + iter: readAsOfIter, cleanup: cleanup, } @@ -403,6 +405,7 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry( entry := sst.entry iter := sst.iter + defer sst.cleanup() writeAtBatchTS := restoreAtNow.Get(&evalCtx.Settings.SV) @@ -449,38 +452,20 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry( startKeyMVCC, endKeyMVCC := storage.MVCCKey{Key: entry.Span.Key}, storage.MVCCKey{Key: entry.Span.EndKey} - for iter.SeekGE(startKeyMVCC); ; { + for iter.SeekGE(startKeyMVCC); ; iter.NextKey() { ok, err := iter.Valid() if err != nil { return summary, err } - if !ok { - break - } - - if !rd.spec.RestoreTime.IsEmpty() { - // TODO(dan): If we have to skip past a lot of versions to find the - // latest one before args.EndTime, then this could be slow. - if rd.spec.RestoreTime.Less(iter.UnsafeKey().Timestamp) { - iter.Next() - continue - } - } if !ok || !iter.UnsafeKey().Less(endKeyMVCC) { break } - if len(iter.UnsafeValue()) == 0 { - // Value is deleted. - iter.NextKey() - continue - } keyScratch = append(keyScratch[:0], iter.UnsafeKey().Key...) valueScratch = append(valueScratch[:0], iter.UnsafeValue()...) key := storage.MVCCKey{Key: keyScratch, Timestamp: iter.UnsafeKey().Timestamp} value := roachpb.Value{RawBytes: valueScratch} - iter.NextKey() key.Key, ok, err = kr.RewriteKey(key.Key) if err != nil { diff --git a/pkg/storage/BUILD.bazel b/pkg/storage/BUILD.bazel index a92b50a04867..05f6783b463e 100644 --- a/pkg/storage/BUILD.bazel +++ b/pkg/storage/BUILD.bazel @@ -28,6 +28,7 @@ go_library( "pebble_iterator.go", "pebble_merge.go", "pebble_mvcc_scanner.go", + "read_as_of_iterator.go", "replicas_storage.go", "resource_limiter.go", "row_counter.go", diff --git a/pkg/storage/read_as_of_iterator.go b/pkg/storage/read_as_of_iterator.go new file mode 100644 index 000000000000..dbaa69923d71 --- /dev/null +++ b/pkg/storage/read_as_of_iterator.go @@ -0,0 +1,121 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package storage + +import "github.com/cockroachdb/cockroach/pkg/util/hlc" + +// ReadAsOfIterator wraps an iterator that implements the SimpleMVCCIterator +// such that all MVCC keys returned are not delete tombstones and will have +// timestamps less than the endtime timestamp, unless that key is invalid. +type ReadAsOfIterator struct { + iter SimpleMVCCIterator + + // endtime is the latest timestamp an MVCC key will have that the + // ReadAsOfIterator will return + endTime hlc.Timestamp + + // valid is the latest boolean value returned by Valid() + valid bool +} + +var _ SimpleMVCCIterator = &ReadAsOfIterator{} + +// Close no-ops. The caller is responsible for closing the underlying iterator. +func (f *ReadAsOfIterator) Close() { +} + +// SeekGE advances the iterator to the first key in the engine which is >= the +// provided key and less than f.endtime. +// +// If Valid() is false after SeekGE() returns, the iterator may be at an invalid +// key with a timestamp greater than or equal to f.endtime. +func (f *ReadAsOfIterator) SeekGE(key MVCCKey) { + f.iter.SeekGE(key) + f.advance() +} + +// Valid must be called after any call to Seek(), Next(), or similar methods. It +// returns (true, nil) if the iterator points to a valid key (it is undefined to +// call UnsafeKey(), UnsafeValue(), or similar methods unless Valid() has +// returned (true, nil)). It returns (false, nil) if the iterator has moved past +// the end of the valid range, or (false, err) if an error has occurred. Valid() +// will never return true with a non-nil error. +func (f *ReadAsOfIterator) Valid() (bool, error) { + var err error + f.valid, err = f.iter.Valid() + return f.valid, err +} + +// Next advances the iterator to the next valid key/value in the iteration that +// has a timestamp less than f.endtime. If Valid() is false after NextKey() +// returns, the iterator may be at an invalid key with a timestamp greater than +// or equal to f.endtime. +func (f *ReadAsOfIterator) Next() { + f.iter.Next() + f.advance() +} + +// NextKey advances the iterator to the next valid MVCC key that has a timestamp +// less than f.endtime. If Valid() is false after NextKey() returns, the +// iterator may be at an invalid key with a timestamp greater than or equal to +// f.endtime. +// +// This operation is distinct from Next which advances to the next version of +// the current key or the next key if the iterator is currently located at the +// last version for a key. +func (f *ReadAsOfIterator) NextKey() { + f.iter.NextKey() + f.advance() +} + +// UnsafeKey returns the current key, but the memory is invalidated on the next +// call to {NextKey,Seek}. +func (f *ReadAsOfIterator) UnsafeKey() MVCCKey { + return f.iter.UnsafeKey() +} + +// UnsafeValue returns the current value as a byte slice, but the memory is +// invalidated on the next call to {NextKey,Seek}. +func (f *ReadAsOfIterator) UnsafeValue() []byte { + return f.iter.UnsafeValue() +} + +// advance checks that the current key is valid, and if it is, advances past +// keys with timestamps later than the f.endTime and keys whose value has +// been deleted. +func (f *ReadAsOfIterator) advance() { + if _, _ = f.Valid(); !f.valid { + return + } + if !f.endTime.IsEmpty() { + for f.endTime.Less(f.iter.UnsafeKey().Timestamp) { + f.iter.Next() + if _, _ = f.Valid(); !f.valid { + return + } + } + } + for len(f.iter.UnsafeValue()) == 0 { + f.iter.NextKey() + if _, _ = f.Valid(); !f.valid { + return + } + } +} + +// MakeReadAsOfIterator constructs a ReadAsOfIterator which wraps a +// SimpleMVCCIterator such that all MVCC keys returned will not be tombstones +// and will have timestamps less than the endtime timestamp, unless that key is +// invalid. The caller is responsible for closing the underlying +// SimpleMVCCIterator. +func MakeReadAsOfIterator(iter SimpleMVCCIterator, endtime hlc.Timestamp) *ReadAsOfIterator { + return &ReadAsOfIterator{iter: iter, endTime: endtime} +} diff --git a/pkg/storage/sst_iterator_test.go b/pkg/storage/sst_iterator_test.go index 69870d293c10..83b17fa20127 100644 --- a/pkg/storage/sst_iterator_test.go +++ b/pkg/storage/sst_iterator_test.go @@ -83,7 +83,8 @@ func TestSSTIterator(t *testing.T) { sst := MakeIngestionSSTWriter(ctx, st, sstFile) defer sst.Close() var allKVs []MVCCKeyValue - for i := 0; i < 10; i++ { + maxWallTime := 10 + for i := 0; i < maxWallTime; i++ { kv := MVCCKeyValue{ Key: MVCCKey{ Key: []byte{'A' + byte(i)}, @@ -129,4 +130,26 @@ func TestSSTIterator(t *testing.T) { defer iter.Close() runTestSSTIterator(t, iter, allKVs) }) + t.Run("AsOf", func(t *testing.T) { + iter, err := NewMemSSTIterator(sstFile.Data(), false) + if err != nil { + t.Fatalf("%+v", err) + } + defer iter.Close() + asOfTimes := []hlc.Timestamp{ + {WallTime: int64(maxWallTime / 2)}, + {WallTime: int64(maxWallTime)}, + {}} + for _, asOf := range asOfTimes { + var asOfKVs []MVCCKeyValue + for _, kv := range allKVs { + if !asOf.IsEmpty() && asOf.Less(kv.Key.Timestamp) { + continue + } + asOfKVs = append(asOfKVs, kv) + } + asOfIter := MakeReadAsOfIterator(iter, asOf) + runTestSSTIterator(t, asOfIter, asOfKVs) + } + }) }