From d00178bff817e24522186f1eea068666919113a0 Mon Sep 17 00:00:00 2001
From: sumeerbhola
Date: Tue, 19 Sep 2023 15:01:50 -0400
Subject: [PATCH] kvserver,storage: ingest small snapshot as writes

Small snapshots cause LSM overload by resulting in many tiny memtable
flushes, which result in a high sub-level count that must then be
compensated for by running many inefficient compactions from L0 to
Lbase. Despite some compaction scoring changes, we have not been able
to fully eliminate the impact of this on foreground traffic, as
discussed in
https://github.com/cockroachdb/pebble/issues/2832#issuecomment-1722599653.

Fixes #109808

Epic: none

Release note (ops change): The cluster setting
kv.snapshot.ingest_as_write_threshold controls the size threshold below
which snapshots are converted to regular writes. It defaults to 100KiB.
---
 pkg/kv/kvserver/replica_raftstorage.go | 119 ++++++++++++++++++-------
 pkg/kv/kvserver/store_snapshot.go      |   9 +-
 pkg/storage/engine.go                  |  17 +++-
 pkg/storage/pebble.go                  |  89 ++++++++++++++++++
 pkg/storage/pebble_test.go             | 114 ++++++++++++++++++++++-
 pkg/storage/sst.go                     |   7 ++
 6 files changed, 316 insertions(+), 39 deletions(-)

diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go
index 4a05088e7707..0aac16f31df7 100644
--- a/pkg/kv/kvserver/replica_raftstorage.go
+++ b/pkg/kv/kvserver/replica_raftstorage.go
@@ -24,6 +24,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/storage"
 	"github.com/cockroachdb/cockroach/pkg/util"
@@ -40,6 +41,19 @@ import (
 	"go.etcd.io/raft/v3/raftpb"
 )
 
+var snapshotIngestAsWriteThreshold = settings.RegisterByteSizeSetting(
+	settings.SystemOnly,
+	"kv.snapshot.ingest_as_write_threshold",
+	"size below which a range snapshot ingestion will be performed as a normal write",
+	func() int64 {
+		return int64(util.ConstantWithMetamorphicTestChoice(
+			"kv.snapshot.ingest_as_write_threshold",
+			100<<10, /* default value is 100KiB */
+			1<<30,   /* 1GiB causes everything to be a normal write */
+			0,       /* 0B causes everything to be an ingest */
+		).(int))
+	}())
+
 // replicaRaftStorage implements the raft.Storage interface.
 type replicaRaftStorage Replica
 
@@ -337,14 +351,21 @@ type IncomingSnapshot struct {
 	SSTStorageScratch *SSTSnapshotStorageScratch
 	FromReplica       roachpb.ReplicaDescriptor
 	// The descriptor in the snapshot, never nil.
-	Desc     *roachpb.RangeDescriptor
-	DataSize int64
+	Desc *roachpb.RangeDescriptor
+	// Size of the key-value pairs.
+	DataSize int64
+	// Size of the ssts containing these key-value pairs.
+	SSTSize    int64
 	SharedSize int64
 	placeholder      *ReplicaPlaceholder
 	raftAppliedIndex kvpb.RaftIndex      // logging only
 	msgAppRespCh     chan raftpb.Message // receives MsgAppResp if/when snap is applied
 	sharedSSTs       []pebble.SharedSSTMeta
 	doExcise         bool
+	// clearedSpans represents the key spans in the existing store that will be
+	// cleared by doing the Ingest*. This is tracked so that we can convert the
+	// ssts into a WriteBatch if the total size of the ssts is small.
+	clearedSpans []roachpb.Span
 }
 
 func (s IncomingSnapshot) String() string {
@@ -534,6 +555,7 @@ func (r *Replica) applySnapshot(
 		ingestion time.Time
 	}
 	log.KvDistribution.Infof(ctx, "applying %s", inSnap)
+	appliedAsWrite := false
 	defer func(start time.Time) {
 		var logDetails redact.StringBuilder
 		logDetails.Printf("total=%0.0fms", timeutil.Since(start).Seconds()*1000)
@@ -550,21 +572,25 @@ func (r *Replica) applySnapshot(
 		}
 		logDetails.Printf(" ingestion=%d@%0.0fms", len(inSnap.SSTStorageScratch.SSTs()),
 			stats.ingestion.Sub(stats.subsumedReplicas).Seconds()*1000)
-		log.Infof(ctx, "applied %s (%s)", inSnap, logDetails)
+		var appliedAsWriteStr string
+		if appliedAsWrite {
+			appliedAsWriteStr = "as write "
+		}
+		log.Infof(ctx, "applied %s %s(%s)", inSnap, appliedAsWriteStr, logDetails)
 	}(timeutil.Now())
 
-	unreplicatedSSTFile, nonempty, err := writeUnreplicatedSST(
+	clearedSpans := inSnap.clearedSpans
+	unreplicatedSSTFile, clearedSpan, err := writeUnreplicatedSST(
 		ctx, r.ID(), r.ClusterSettings(), nonemptySnap.Metadata, hs,
 		&r.raftMu.stateLoader.StateLoader,
 	)
 	if err != nil {
 		return err
 	}
-	if nonempty {
-		// TODO(itsbilal): Write to SST directly in unreplicatedSST rather than
-		// buffering in a MemObject first.
-		if err := inSnap.SSTStorageScratch.WriteSST(ctx, unreplicatedSSTFile.Data()); err != nil {
-			return err
-		}
+	clearedSpans = append(clearedSpans, clearedSpan)
+	// TODO(itsbilal): Write to SST directly in unreplicatedSST rather than
+	// buffering in a MemObject first.
+	if err := inSnap.SSTStorageScratch.WriteSST(ctx, unreplicatedSSTFile.Data()); err != nil {
+		return err
 	}
 	// Update Raft entries.
 	r.store.raftEntryCache.Drop(r.RangeID)
@@ -596,13 +622,15 @@ func (r *Replica) applySnapshot(
 	// problematic, as it would prevent this store from ever having a new replica
 	// of the removed range. In this case, however, it's copacetic, as subsumed
 	// ranges _can't_ have new replicas.
-	if err := clearSubsumedReplicaDiskData(
+	clearedSubsumedSpans, err := clearSubsumedReplicaDiskData(
 		// TODO(sep-raft-log): needs access to both engines.
 		ctx, r.store.ClusterSettings(), r.store.TODOEngine(), inSnap.SSTStorageScratch.WriteSST,
 		desc, subsumedDescs, mergedTombstoneReplicaID,
-	); err != nil {
+	)
+	if err != nil {
 		return err
 	}
+	clearedSpans = append(clearedSpans, clearedSubsumedSpans...)
 	stats.subsumedReplicas = timeutil.Now()
 
 	// Ingest all SSTs atomically.
@@ -612,6 +640,7 @@ func (r *Replica) applySnapshot(
 		}
 	}
 	var ingestStats pebble.IngestOperationStats
+	var writeBytes uint64
 	// TODO: separate ingestions for log and statemachine engine. See:
 	//
 	// https://github.com/cockroachdb/cockroach/issues/93251
@@ -622,13 +651,30 @@ func (r *Replica) applySnapshot(
 			return errors.Wrapf(err, "while ingesting %s and excising %s-%s",
 				inSnap.SSTStorageScratch.SSTs(), exciseSpan.Key, exciseSpan.EndKey)
 		}
 	} else {
-		if ingestStats, err =
-			r.store.TODOEngine().IngestLocalFilesWithStats(ctx, inSnap.SSTStorageScratch.SSTs()); err != nil {
-			return errors.Wrapf(err, "while ingesting %s", inSnap.SSTStorageScratch.SSTs())
+		if inSnap.SSTSize > snapshotIngestAsWriteThreshold.Get(&r.ClusterSettings().SV) {
+			if ingestStats, err =
+				r.store.TODOEngine().IngestLocalFilesWithStats(ctx, inSnap.SSTStorageScratch.SSTs()); err != nil {
+				return errors.Wrapf(err, "while ingesting %s", inSnap.SSTStorageScratch.SSTs())
+			}
+		} else {
+			appliedAsWrite = true
+			err := r.store.TODOEngine().ConvertFilesToBatchAndCommit(
+				ctx, inSnap.SSTStorageScratch.SSTs(), clearedSpans)
+			if err != nil {
+				return errors.Wrapf(err, "while applying as batch %s", inSnap.SSTStorageScratch.SSTs())
+			}
+			// Admission control wants the writeBytes to be roughly equivalent to
+			// the bytes in the SST when these writes are eventually flushed. We use
+			// the SST size of the incoming snapshot as that approximation. We've
+			// written additional SSTs to clear some data earlier in this method,
+			// but we ignore those since the bulk of the data is in the incoming
+			// snapshot.
+			writeBytes = uint64(inSnap.SSTSize)
 		}
 	}
 	if r.store.cfg.KVAdmissionController != nil {
-		r.store.cfg.KVAdmissionController.SnapshotIngestedOrWritten(r.store.StoreID(), ingestStats, 0)
+		r.store.cfg.KVAdmissionController.SnapshotIngestedOrWritten(
+			r.store.StoreID(), ingestStats, writeBytes)
 	}
 	stats.ingestion = timeutil.Now()
 
@@ -779,9 +825,6 @@ func (r *Replica) applySnapshot(
 // covers the RangeID-unreplicated keyspace. A range tombstone is
 // laid down and the Raft state provided by the arguments is overlaid
 // onto it.
-//
-// TODO(sep-raft-log): when is `nonempty` ever false? We always
-// perform a number of writes to this SST.
 func writeUnreplicatedSST(
 	ctx context.Context,
 	id storage.FullReplicaID,
@@ -789,7 +832,7 @@ func writeUnreplicatedSST(
 	meta raftpb.SnapshotMetadata,
 	hs raftpb.HardState,
 	sl *logstore.StateLoader,
-) (_ *storage.MemObject, nonempty bool, _ error) {
+) (_ *storage.MemObject, clearedSpan roachpb.Span, _ error) {
 	unreplicatedSSTFile := &storage.MemObject{}
 	unreplicatedSST := storage.MakeIngestionSSTWriter(
 		ctx, st, unreplicatedSSTFile,
@@ -803,21 +846,22 @@ func writeUnreplicatedSST(
 	unreplicatedPrefixKey := keys.MakeRangeIDUnreplicatedPrefix(id.RangeID)
 	unreplicatedStart := unreplicatedPrefixKey
 	unreplicatedEnd := unreplicatedPrefixKey.PrefixEnd()
+	clearedSpan = roachpb.Span{Key: unreplicatedStart, EndKey: unreplicatedEnd}
 	if err := unreplicatedSST.ClearRawRange(
 		unreplicatedStart, unreplicatedEnd, true /* pointKeys */, false, /* rangeKeys */
 	); err != nil {
-		return nil, false, errors.Wrapf(err, "error clearing range of unreplicated SST writer")
+		return nil, roachpb.Span{}, errors.Wrapf(err, "error clearing range of unreplicated SST writer")
 	}
 
 	// Update HardState.
 	if err := sl.SetHardState(ctx, &unreplicatedSST, hs); err != nil {
-		return nil, false, errors.Wrapf(err, "unable to write HardState to unreplicated SST writer")
+		return nil, roachpb.Span{}, errors.Wrapf(err, "unable to write HardState to unreplicated SST writer")
 	}
 	// We've cleared all the raft state above, so we are forced to write the
 	// RaftReplicaID again here.
 	if err := sl.SetRaftReplicaID(
 		ctx, &unreplicatedSST, id.ReplicaID); err != nil {
-		return nil, false, errors.Wrapf(err, "unable to write RaftReplicaID to unreplicated SST writer")
+		return nil, roachpb.Span{}, errors.Wrapf(err, "unable to write RaftReplicaID to unreplicated SST writer")
 	}
 
 	if err := sl.SetRaftTruncatedState(
@@ -827,13 +871,13 @@ func writeUnreplicatedSST(
 			Term:  kvpb.RaftTerm(meta.Term),
 		},
 	); err != nil {
-		return nil, false, errors.Wrapf(err, "unable to write TruncatedState to unreplicated SST writer")
+		return nil, roachpb.Span{}, errors.Wrapf(err, "unable to write TruncatedState to unreplicated SST writer")
 	}
 
 	if err := unreplicatedSST.Finish(); err != nil {
-		return nil, false, err
+		return nil, roachpb.Span{}, err
 	}
-	return unreplicatedSSTFile, unreplicatedSST.DataSize > 0, nil
+	return unreplicatedSSTFile, clearedSpan, nil
 }
 
 // clearSubsumedReplicaDiskData clears the on disk data of the subsumed
@@ -851,7 +895,7 @@ func clearSubsumedReplicaDiskData(
 	desc *roachpb.RangeDescriptor,
 	subsumedDescs []*roachpb.RangeDescriptor,
 	subsumedNextReplicaID roachpb.ReplicaID,
-) error {
+) (clearedSpans []roachpb.Span, _ error) {
 	// NB: we don't clear RangeID local key spans here. That happens
 	// via the call to preDestroyRaftMuLocked.
 	getKeySpans := func(d *roachpb.RangeDescriptor) []roachpb.Span {
@@ -876,18 +920,23 @@ func clearSubsumedReplicaDiskData(
 			ClearUnreplicatedByRangeID: true,
 			MustUseClearRange:          true,
 		}
+		subsumedClearedSpans := rditer.Select(subDesc.RangeID, rditer.SelectOpts{
+			ReplicatedByRangeID:   opts.ClearReplicatedByRangeID,
+			UnreplicatedByRangeID: opts.ClearUnreplicatedByRangeID,
+		})
+		clearedSpans = append(clearedSpans, subsumedClearedSpans...)
 		if err := kvstorage.DestroyReplica(ctx, subDesc.RangeID, reader, &subsumedReplSST, subsumedNextReplicaID, opts); err != nil {
 			subsumedReplSST.Close()
-			return err
+			return nil, err
 		}
 		if err := subsumedReplSST.Finish(); err != nil {
-			return err
+			return nil, err
 		}
 		if subsumedReplSST.DataSize > 0 {
 			// TODO(itsbilal): Write to SST directly in subsumedReplSST rather than
 			// buffering in a MemObject first.
 			if err := writeSST(ctx, subsumedReplSSTFile.Data()); err != nil {
-				return err
+				return nil, err
 			}
 		}
 
@@ -933,16 +982,18 @@ func clearSubsumedReplicaDiskData(
 			kvstorage.ClearRangeThresholdRangeKeys,
 		); err != nil {
 			subsumedReplSST.Close()
-			return err
+			return nil, err
 		}
+		clearedSpans = append(clearedSpans,
+			roachpb.Span{Key: keySpans[i].EndKey, EndKey: totalKeySpans[i].EndKey})
 		if err := subsumedReplSST.Finish(); err != nil {
-			return err
+			return nil, err
 		}
 		if subsumedReplSST.DataSize > 0 {
 			// TODO(itsbilal): Write to SST directly in subsumedReplSST rather than
 			// buffering in a MemObject first.
 			if err := writeSST(ctx, subsumedReplSSTFile.Data()); err != nil {
-				return err
+				return nil, err
 			}
 		}
 	}
@@ -957,7 +1008,7 @@ func clearSubsumedReplicaDiskData(
 				keySpans[i], totalKeySpans[i])
 		}
 	}
-	return nil
+	return clearedSpans, nil
 }
 
 // clearSubsumedReplicaInMemoryData clears the in-memory data of the subsumed
diff --git a/pkg/kv/kvserver/store_snapshot.go b/pkg/kv/kvserver/store_snapshot.go
index 0fd62b969c53..ac5c74bb82c4 100644
--- a/pkg/kv/kvserver/store_snapshot.go
+++ b/pkg/kv/kvserver/store_snapshot.go
@@ -141,8 +141,11 @@ type multiSSTWriter struct {
 	// The approximate size of the SST chunk to buffer in memory on the receiver
 	// before flushing to disk.
 	sstChunkSize int64
-	// The total size of SST data. Updated on SST finalization.
+	// The total size of the key and value pairs (not the total size of the
+	// SSTs). Updated on SST finalization.
 	dataSize int64
+	// The total size of the SSTs.
+	sstSize int64
 	// if skipRangeDelForLastSpan is true, the last span is not ClearRanged in the
 	// same sstable. We rely on the caller to take care of clearing this span
 	// through a different process (eg. IngestAndExcise on pebble).
@@ -198,6 +201,7 @@ func (msstw *multiSSTWriter) finalizeSST(ctx context.Context) error {
 		return errors.Wrap(err, "failed to finish sst")
 	}
 	msstw.dataSize += msstw.currSST.DataSize
+	msstw.sstSize += int64(msstw.currSST.Meta.Size)
 	msstw.currSpan++
 	msstw.currSST.Close()
 	return nil
@@ -654,6 +658,7 @@ func (kvSS *kvBatchSnapshotStrategy) Receive(
 	// the data.
 	timingTag.start("sst")
 	dataSize, err := msstw.Finish(ctx)
+	sstSize := msstw.sstSize
 	if err != nil {
 		return noSnap, errors.Wrapf(err, "finishing sst for raft snapshot")
 	}
@@ -677,11 +682,13 @@ func (kvSS *kvBatchSnapshotStrategy) Receive(
 		FromReplica:       header.RaftMessageRequest.FromReplica,
 		Desc:              header.State.Desc,
 		DataSize:          dataSize,
+		SSTSize:           sstSize,
 		SharedSize:        sharedSize,
 		raftAppliedIndex:  header.State.RaftAppliedIndex,
 		msgAppRespCh:      make(chan raftpb.Message, 1),
 		sharedSSTs:        sharedSSTs,
 		doExcise:          doExcise,
+		clearedSpans:      keyRanges,
 	}
 
 	timingTag.stop("totalTime")
diff --git a/pkg/storage/engine.go b/pkg/storage/engine.go
index c166bf0ecbe3..dfb4ff91f97a 100644
--- a/pkg/storage/engine.go
+++ b/pkg/storage/engine.go
@@ -1020,6 +1020,7 @@ type Engine interface {
 	// files. These files can be referred to by multiple stores, but are not
 	// modified or deleted by the Engine doing the ingestion.
 	IngestExternalFiles(ctx context.Context, external []pebble.ExternalFile) (pebble.IngestOperationStats, error)
+
 	// PreIngestDelay offers an engine the chance to backpressure ingestions.
 	// When called, it may choose to block if the engine determines that it is in
 	// or approaching a state where further ingestions may risk its health.
@@ -1028,7 +1029,21 @@ type Engine interface {
 	// counts for the given key span, along with how many of those bytes are on
 	// remote, as well as specifically external remote, storage.
 	ApproximateDiskBytes(from, to roachpb.Key) (total, remote, external uint64, _ error)
-
+	// ConvertFilesToBatchAndCommit converts local files with the given paths to
+	// a WriteBatch and commits the batch with sync=true. The files represented
+	// in paths must not be overlapping -- this is the same contract as
+	// IngestLocalFiles*. Additionally, clearedSpans represents the spans which
+	// must be deleted before writing the data contained in these paths.
+	//
+	// This method is expected to be used instead of IngestLocalFiles* or
+	// IngestAndExciseFiles when the sum of the file sizes is small.
+	//
+	// TODO(sumeer): support this as an alternative to IngestAndExciseFiles.
+	// This should be easy since we use NewSSTEngineIterator to read the ssts,
+	// which supports multiple levels.
+	ConvertFilesToBatchAndCommit(
+		ctx context.Context, paths []string, clearedSpans []roachpb.Span,
+	) error
 	// CompactRange ensures that the specified range of key value pairs is
 	// optimized for space efficiency.
 	CompactRange(start, end roachpb.Key) error
diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go
index 2e4d1be8c05d..9d3569da3905 100644
--- a/pkg/storage/pebble.go
+++ b/pkg/storage/pebble.go
@@ -2402,6 +2402,95 @@ func (p *Pebble) BufferedSize() int {
 	return 0
 }
 
+// ConvertFilesToBatchAndCommit implements the Engine interface.
+func (p *Pebble) ConvertFilesToBatchAndCommit(
+	_ context.Context, paths []string, clearedSpans []roachpb.Span,
+) error {
+	files := make([]sstable.ReadableFile, len(paths))
+	closeFiles := func() {
+		for i := range files {
+			if files[i] != nil {
+				files[i].Close()
+			}
+		}
+	}
+	for i, fileName := range paths {
+		f, err := p.FS.Open(fileName)
+		if err != nil {
+			closeFiles()
+			return err
+		}
+		files[i] = f
+	}
+	iter, err := NewSSTEngineIterator(
+		[][]sstable.ReadableFile{files},
+		IterOptions{
+			KeyTypes:   IterKeyTypePointsAndRanges,
+			LowerBound: roachpb.KeyMin,
+			UpperBound: roachpb.KeyMax,
+		}, true)
+	if err != nil {
+		// TODO(sumeer): we don't call closeFiles() since in the error case some
+		// of the files may be closed. See the code in
+		// https://github.com/cockroachdb/pebble/blob/master/external_iterator.go#L104-L113
+		// which closes the opened readers. At this point in the code we don't
+		// know which files are already closed. The callee needs to be fixed to
+		// not close any of the files or close all the files in the error case.
+		// The natural behavior would be to not close any file. Fix this in
+		// Pebble, and then adjust the code here if needed.
+		return err
+	}
+	defer iter.Close()
+
+	batch := p.NewWriteBatch()
+	for i := range clearedSpans {
+		err :=
+			batch.ClearRawRange(clearedSpans[i].Key, clearedSpans[i].EndKey, true, true)
+		if err != nil {
+			return err
+		}
+	}
+	valid, err := iter.SeekEngineKeyGE(EngineKey{Key: roachpb.KeyMin})
+	for valid {
+		hasPoint, hasRange := iter.HasPointAndRange()
+		if hasPoint {
+			var k EngineKey
+			if k, err = iter.UnsafeEngineKey(); err != nil {
+				break
+			}
+			var v []byte
+			if v, err = iter.UnsafeValue(); err != nil {
+				break
+			}
+			if err = batch.PutEngineKey(k, v); err != nil {
+				break
+			}
+		}
+		if hasRange && iter.RangeKeyChanged() {
+			var rangeBounds roachpb.Span
+			if rangeBounds, err = iter.EngineRangeBounds(); err != nil {
+				break
+			}
+			rangeKeys := iter.EngineRangeKeys()
+			for i := range rangeKeys {
+				if err = batch.PutEngineRangeKey(rangeBounds.Key, rangeBounds.EndKey, rangeKeys[i].Version,
+					rangeKeys[i].Value); err != nil {
+					break
+				}
+			}
+			if err != nil {
+				break
+			}
+		}
+		valid, err = iter.NextEngineKey()
+	}
+	if err != nil {
+		batch.Close()
+		return err
+	}
+	return batch.Commit(true)
+}
+
 type pebbleReadOnly struct {
 	parent *Pebble
 	// The iterator reuse optimization in pebbleReadOnly is for servicing a
diff --git a/pkg/storage/pebble_test.go b/pkg/storage/pebble_test.go
index 1edebb750039..e871033c0457 100644
--- a/pkg/storage/pebble_test.go
+++ b/pkg/storage/pebble_test.go
@@ -37,6 +37,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/pebble"
+	"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
 	"github.com/cockroachdb/pebble/vfs"
 	"github.com/stretchr/testify/require"
 )
@@ -1382,7 +1383,7 @@ func TestApproximateDiskBytes(t *testing.T) {
 	require.NoError(t, err)
 	defer p.Close()
 
-	key := func(i int) roachpb.Key {
+	keyFunc := func(i int) roachpb.Key {
 		return keys.SystemSQLCodec.TablePrefix(uint32(i))
 	}
 
@@ -1390,7 +1391,7 @@ func TestApproximateDiskBytes(t *testing.T) {
 	b := p.NewWriteBatch()
 	for i := 0; i < 1000; i++ {
 		require.NoError(t, b.PutMVCC(
-			MVCCKey{Key: key(i), Timestamp: hlc.Timestamp{WallTime: int64(i + 1)}},
+			MVCCKey{Key: keyFunc(i), Timestamp: hlc.Timestamp{WallTime: int64(i + 1)}},
 			MVCCValue{Value: roachpb.Value{RawBytes: randutil.RandBytes(rng, 100)}},
 		))
 	}
@@ -1406,9 +1407,116 @@ func TestApproximateDiskBytes(t *testing.T) {
 	all := approxBytes(roachpb.Span{Key: roachpb.KeyMin, EndKey: roachpb.KeyMax})
 	for i := 0; i < 1000; i++ {
-		s := roachpb.Span{Key: key(i), EndKey: key(i + 1)}
+		s := roachpb.Span{Key: keyFunc(i), EndKey: keyFunc(i + 1)}
 		if v := approxBytes(s); v >= all {
 			t.Errorf("ApproximateDiskBytes(%q) = %d >= entire DB size %d", s, v, all)
 		}
 	}
 }
+
+func TestConvertFilesToBatchAndCommit(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	ctx := context.Background()
+	st := cluster.MakeTestingClusterSettings()
+
+	const ingestEngine = 0
+	const batchEngine = 1
+	var engs [batchEngine + 1]Engine
+	mem := vfs.NewMem()
+	for i := range engs {
+		engs[i] = InMemFromFS(ctx, mem, fmt.Sprintf("eng-%d", i), st)
+		defer engs[i].Close()
+	}
+	// Populate points that will have MVCC value and an intent.
+	points := []testValue{
+		intent(key(1), "value1", ts(1000)),
+		value(key(2), "value2", ts(1000)),
+		intent(key(2), "value3", ts(2000)),
+		intent(key(5), "value4", ts(1500)),
+		intent(key(6), "value4", ts(2500)),
+	}
+	for i := range engs {
+		// Put points
+		require.NoError(t, fillInData(ctx, engs[i], points))
+		// Put wide range keys that will be partially cleared.
+		require.NoError(t, engs[i].PutMVCCRangeKey(MVCCRangeKey{
+			StartKey:  key(1),
+			EndKey:    key(7),
+			Timestamp: ts(2300),
+		}, MVCCValue{}))
+		require.NoError(t, engs[i].PutMVCCRangeKey(MVCCRangeKey{
+			StartKey:  key(0),
+			EndKey:    key(8),
+			Timestamp: ts(2700),
+		}, MVCCValue{}))
+	}
+	// Ingest into [2, 6) with 2 files.
+	fileName1 := "file1"
+	f1, err := mem.Create(fileName1)
+	require.NoError(t, err)
+	w1 := MakeIngestionSSTWriter(ctx, st, objstorageprovider.NewFileWritable(f1))
+	startKey := key(2)
+	endKey := key(6)
+	lkStart, _ := keys.LockTableSingleKey(startKey, nil)
+	lkEnd, _ := keys.LockTableSingleKey(endKey, nil)
+	require.NoError(t, w1.ClearRawRange(lkStart, lkEnd, true, true))
+	// Not a real lock table key, since lacks a version, but it doesn't matter
+	// for this test. We can't use MVCCPut since the intent and the
+	// corresponding provisional value belong in different ssts.
+	lk3, _ := keys.LockTableSingleKey(key(3), nil)
+	require.NoError(t, w1.PutEngineKey(EngineKey{Key: lk3}, []byte("")))
+	require.NoError(t, w1.Finish())
+	w1.Close()
+
+	fileName2 := "file2"
+	f2, err := mem.Create(fileName2)
+	require.NoError(t, err)
+	w2 := MakeIngestionSSTWriter(ctx, st, objstorageprovider.NewFileWritable(f2))
+	require.NoError(t, w2.ClearRawRange(startKey, endKey, true, true))
+	val := roachpb.MakeValueFromString("value5")
+	val.InitChecksum(key(3))
+	require.NoError(t, w2.PutMVCC(
+		MVCCKey{Key: key(3), Timestamp: ts(2800)}, MVCCValue{Value: val}))
+	require.NoError(t, w2.Finish())
+	w2.Close()
+
+	require.NoError(t, engs[batchEngine].ConvertFilesToBatchAndCommit(
+		ctx, []string{fileName1, fileName2}, []roachpb.Span{
+			{Key: lkStart, EndKey: lkEnd}, {Key: startKey, EndKey: endKey},
+		}))
+	require.NoError(t, engs[ingestEngine].IngestLocalFiles(ctx, []string{fileName1, fileName2}))
+	outputState := func(eng Engine) []string {
+		it, err := eng.NewEngineIterator(IterOptions{
+			UpperBound: roachpb.KeyMax,
+			KeyTypes:   IterKeyTypePointsAndRanges,
+		})
+		require.NoError(t, err)
+		defer it.Close()
+		var state []string
+		valid, err := it.SeekEngineKeyGE(EngineKey{Key: roachpb.KeyMin})
+		for valid {
+			hasPoint, hasRange := it.HasPointAndRange()
+			if hasPoint {
+				k, err := it.EngineKey()
+				require.NoError(t, err)
+				v, err := it.UnsafeValue()
+				require.NoError(t, err)
+				state = append(state, fmt.Sprintf("point %s = %x\n", k, v))
+			}
+			if hasRange {
+				k, err := it.EngineRangeBounds()
+				require.NoError(t, err)
+				v := it.EngineRangeKeys()
+				for i := range v {
+					state = append(state, fmt.Sprintf(
+						"range [%s,%s) = %x/%x\n", k.Key, k.EndKey, v[i].Value, v[i].Version))
+				}
+			}
+			valid, err = it.NextEngineKey()
+		}
+		require.NoError(t, err)
+		return state
+	}
+	require.Equal(t, outputState(engs[ingestEngine]), outputState(engs[batchEngine]))
+}
diff --git a/pkg/storage/sst.go b/pkg/storage/sst.go
index a42bdca40eee..affc2376a853 100644
--- a/pkg/storage/sst.go
+++ b/pkg/storage/sst.go
@@ -53,6 +53,13 @@ func NewSSTIterator(
 	return newPebbleSSTIterator(files, opts, forwardOnly)
 }
 
+// NewSSTEngineIterator is like NewSSTIterator, but returns an EngineIterator.
+func NewSSTEngineIterator(
+	files [][]sstable.ReadableFile, opts IterOptions, forwardOnly bool,
+) (EngineIterator, error) {
+	return newPebbleSSTIterator(files, opts, forwardOnly)
+}
+
 // NewMemSSTIterator returns an MVCCIterator for the provided SST data,
 // similarly to NewSSTIterator().
 func NewMemSSTIterator(sst []byte, verify bool, opts IterOptions) (MVCCIterator, error) {
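
The core idea of the change, stripped of CockroachDB plumbing: when the incoming
snapshot's SSTs are collectively small, replaying their contents as a normal
synced write batch keeps them out of the ingest path, which would otherwise
force a memtable flush and pile up L0 sub-levels. The sketch below illustrates
that decision and conversion loop under simplified, hypothetical interfaces --
span, batch, engine, and applySnapshotSSTs are stand-ins invented for
illustration, not CockroachDB's actual types (the real code paths are
IngestLocalFilesWithStats and ConvertFilesToBatchAndCommit in the diff above),
and range keys are omitted for brevity:

package main

import "fmt"

// span is a half-open key interval [key, endKey); a stand-in for roachpb.Span.
type span struct{ key, endKey string }

// batch is a hypothetical atomic write batch; a stand-in for the engine's
// WriteBatch used by ConvertFilesToBatchAndCommit.
type batch interface {
	clearRange(start, end string) error // range delete, like ClearRawRange
	put(key, value string) error        // point write, like PutEngineKey
	commitSync() error                  // commit with sync, like Commit(true)
}

// engine is a hypothetical subset of the storage engine API.
type engine interface {
	ingestSSTs(paths []string) error // link files into the LSM
	newBatch() batch
	// scanSSTs invokes fn for every point key in the merged view of the
	// files, like iterating with NewSSTEngineIterator.
	scanSSTs(paths []string, fn func(key, value string) error) error
}

// applySnapshotSSTs mirrors the decision in applySnapshot: ingest large
// snapshots, but replay small ones as a normal synced batch so the data
// flows through the memtable instead of forcing a flush and adding L0
// sub-levels. It returns the byte count reported to admission control.
func applySnapshotSSTs(
	eng engine, paths []string, clearedSpans []span, sstSize, threshold int64,
) (writeBytes uint64, err error) {
	if sstSize > threshold {
		return 0, eng.ingestSSTs(paths)
	}
	b := eng.newBatch()
	// A batch does not implicitly replace the keyspace beneath it the way
	// an ingested sstable's range deletes do, so clear those spans first.
	for _, sp := range clearedSpans {
		if err := b.clearRange(sp.key, sp.endKey); err != nil {
			return 0, err
		}
	}
	// Replay every key-value pair from the files into the batch.
	if err := eng.scanSSTs(paths, b.put); err != nil {
		return 0, err
	}
	// The batch's contents will eventually be flushed as roughly sstSize
	// bytes, which is the approximation handed to admission control.
	return uint64(sstSize), b.commitSync()
}

func main() { fmt.Println("sketch only; see applySnapshotSSTs") }

Note the ordering inside the batch: the cleared spans are range-deleted before
the replayed puts, mirroring how ConvertFilesToBatchAndCommit applies
ClearRawRange for each of clearedSpans before copying the SST contents, and
the whole batch commits atomically with sync, preserving the crash
consistency that the ingestion path would have provided.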
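A second detail worth pulling out is the metamorphic default of the new
cluster setting. The registration in the first hunk picks 100KiB in ordinary
builds, but under metamorphic testing randomly substitutes 1GiB (every
snapshot becomes a normal write) or 0 (every snapshot stays an ingest), so
both paths get exercised by the test suite. Below is a toy stand-in for that
helper, assuming the usual metamorphic-constant behavior of
util.ConstantWithMetamorphicTestChoice; pickConstant is hypothetical and not
CockroachDB's actual util API:

package main

import (
	"fmt"
	"math/rand"
)

// pickConstant returns the production value in normal builds; under a
// metamorphic test build it picks randomly among all candidates so that
// tests exercise every code path gated on the constant.
func pickConstant(metamorphicBuild bool, production int64, alternates ...int64) int64 {
	if !metamorphicBuild {
		return production
	}
	candidates := append([]int64{production}, alternates...)
	return candidates[rand.Intn(len(candidates))]
}

func main() {
	// Mirrors the patch's choices: 100KiB default, 1GiB (everything becomes
	// a normal write), 0 (everything stays an ingest).
	threshold := pickConstant(true, 100<<10, 1<<30, 0)
	fmt.Println("kv.snapshot.ingest_as_write_threshold =", threshold)
}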