From 67b388b8bdee15d43917cbef244368f9cdf0696b Mon Sep 17 00:00:00 2001 From: Bilal Akhtar Date: Tue, 20 Aug 2024 10:40:36 -0400 Subject: [PATCH] db: allow excises to unconditionally be flushable ingests Previously, we'd only allow IngestAndExcise to become flushable ingests if we had a guarantee from the caller that the ingestion contained rangedel/rangekeydel overlapping the excise span. This change reworks flushable ingests to return rangedels/rangekeydels from their own iterators, even if one does not exist in the sstables themselves. This, plus the update to the WAL to be able to persist the excise span, allows for IngestAndExcise to allow flushable ingests in more cases than before. Also resolves TODOs in replayWAL() around properly sequencing flushable ingests so their sstables can go in levels lower than L0. This sequencing was necessary to make excises work correctly; as excises require an up-to-date view of the version to work correctly. Fixes #2676. --- batch.go | 30 +++++- batchrepr/reader.go | 2 +- data_test.go | 12 ++- flushable.go | 56 +++++++++++- flushable_test.go | 40 ++++++-- format_major_version.go | 15 ++- format_major_version_test.go | 8 +- ingest.go | 52 +++++++---- ingest_test.go | 41 +++++++-- internal.go | 1 + internal/base/internal.go | 22 ++++- internal/base/internal_test.go | 2 +- mem_table.go | 4 +- metamorphic/generator.go | 11 +-- metamorphic/ops.go | 24 ++--- metamorphic/parser.go | 2 +- open.go | 157 ++++++++++++++++++++++++-------- open_test.go | 2 +- testdata/checkpoint | 50 +++++----- testdata/checkpoint_shared | 28 +++--- testdata/concurrent_excise | 48 ++++++++-- testdata/determinism | 4 +- testdata/event_listener | 11 ++- testdata/excise | 98 +++++++++++++++++--- testdata/flushable_ingest | 28 +++--- testdata/ingested_flushable_api | 70 ++++++++++++-- tool/db.go | 2 +- tool/wal.go | 2 + 28 files changed, 622 insertions(+), 200 deletions(-) diff --git a/batch.go b/batch.go index 556874e2eb2..1c95d4ef976 100644 --- a/batch.go +++ b/batch.go @@ -552,6 +552,12 @@ func (b *Batch) refreshMemTableSize() error { } // This key kind doesn't contribute to the memtable size. continue + case InternalKeyKindExcise: + if b.minimumFormatMajorVersion < FormatFlushableIngestExcises { + b.minimumFormatMajorVersion = FormatFlushableIngestExcises + } + // This key kind doesn't contribute to the memtable size. + continue default: // Note In some circumstances this might be temporary memory // corruption that can be recovered by discarding the batch and @@ -608,7 +614,7 @@ func (b *Batch) Apply(batch *Batch, _ *WriteOptions) error { b.countRangeDels++ case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete: b.countRangeKeys++ - case InternalKeyKindIngestSST: + case InternalKeyKindIngestSST, InternalKeyKindExcise: panic("pebble: invalid key kind for batch") case InternalKeyKindLogData: // LogData does not contribute to memtable size. @@ -1193,6 +1199,28 @@ func (b *Batch) ingestSST(fileNum base.FileNum) { b.minimumFormatMajorVersion = FormatFlushableIngest } +// Excise adds the excise span for a flushable ingest containing an excise. The data +// will only be written to the WAL (not added to memtables or sstables). +func (b *Batch) excise(start, end []byte) { + if b.Empty() { + b.ingestedSSTBatch = true + } else if !b.ingestedSSTBatch { + // Batch contains other key kinds. 
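+		// An excise may only share a batch with flushable-ingest entries
+		// (InternalKeyKindIngestSST). Both kinds are written only to the WAL
+		// and are never applied to a memtable, so mixing them with regular
+		// writes is a programming error.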
+ panic("pebble: invalid call to excise") + } + + origMemTableSize := b.memTableSize + b.prepareDeferredKeyValueRecord(len(start), len(end), InternalKeyKindExcise) + copy(b.deferredOp.Key, start) + copy(b.deferredOp.Value, end) + // Since excise writes only to the WAL and does not affect the memtable, + // we restore b.memTableSize to its original value. Note that Batch.count + // is not reset because for the InternalKeyKindIngestSST/Excise the count + // is the number of sstable paths which have been added to the batch. + b.memTableSize = origMemTableSize + b.minimumFormatMajorVersion = FormatFlushableIngestExcises +} + // Empty returns true if the batch is empty, and false otherwise. func (b *Batch) Empty() bool { return batchrepr.IsEmpty(b.data) diff --git a/batchrepr/reader.go b/batchrepr/reader.go index 652e94b8b5c..cceb2be3971 100644 --- a/batchrepr/reader.go +++ b/batchrepr/reader.go @@ -97,7 +97,7 @@ func (r *Reader) Next() (kind base.InternalKeyKind, ukey []byte, value []byte, o switch kind { case base.InternalKeyKindSet, base.InternalKeyKindMerge, base.InternalKeyKindRangeDelete, base.InternalKeyKindRangeKeySet, base.InternalKeyKindRangeKeyUnset, base.InternalKeyKindRangeKeyDelete, - base.InternalKeyKindDeleteSized: + base.InternalKeyKindDeleteSized, base.InternalKeyKindExcise: *r, value, ok = DecodeStr(*r) if !ok { return 0, nil, nil, false, errors.Wrapf(ErrInvalidBatch, "decoding %s value", kind) diff --git a/data_test.go b/data_test.go index 3679a39deaf..6dc73054475 100644 --- a/data_test.go +++ b/data_test.go @@ -1331,7 +1331,6 @@ func (d *DB) waitTableStats() { func runIngestAndExciseCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error { var exciseSpan KeyRange - var sstContainsExciseTombstone bool paths := make([]string, 0, len(td.CmdArgs)) for i, arg := range td.CmdArgs { switch td.CmdArgs[i].Key { @@ -1345,14 +1344,15 @@ func runIngestAndExciseCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error { } exciseSpan.Start = []byte(fields[0]) exciseSpan.End = []byte(fields[1]) - case "contains-excise-tombstone": - sstContainsExciseTombstone = true + case "no-wait": + // Handled by callers. + break default: paths = append(paths, arg.String()) } } - if _, err := d.IngestAndExcise(context.Background(), paths, nil /* shared */, nil /* external */, exciseSpan, sstContainsExciseTombstone); err != nil { + if _, err := d.IngestAndExcise(context.Background(), paths, nil /* shared */, nil /* external */, exciseSpan); err != nil { return err } return nil @@ -1361,6 +1361,10 @@ func runIngestAndExciseCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error { func runIngestCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error { paths := make([]string, 0, len(td.CmdArgs)) for _, arg := range td.CmdArgs { + if arg.Key == "no-wait" { + // Handled by callers. + continue + } paths = append(paths, arg.String()) } diff --git a/flushable.go b/flushable.go index c014bb9b117..41e58ae44b6 100644 --- a/flushable.go +++ b/flushable.go @@ -164,6 +164,7 @@ type ingestedFlushable struct { // exciseSpan is populated if an excise operation should be performed during // flush. 
exciseSpan KeyRange + seqNum base.SeqNum } func newIngestedFlushable( @@ -172,6 +173,7 @@ func newIngestedFlushable( newIters tableNewIters, newRangeKeyIters keyspanimpl.TableNewSpanIter, exciseSpan KeyRange, + seqNum base.SeqNum, ) *ingestedFlushable { if invariants.Enabled { for i := 1; i < len(files); i++ { @@ -200,6 +202,7 @@ func newIngestedFlushable( slice: manifest.NewLevelSliceKeySorted(comparer.Compare, files), hasRangeKeys: hasRangeKeys, exciseSpan: exciseSpan, + seqNum: seqNum, } return ret @@ -245,30 +248,69 @@ func (s *ingestedFlushable) constructRangeDelIter( // TODO(sumeer): *IterOptions are being ignored, so the index block load for // the point iterator in constructRangeDeIter is not tracked. func (s *ingestedFlushable) newRangeDelIter(_ *IterOptions) keyspan.FragmentIterator { - return keyspanimpl.NewLevelIter( + liter := keyspanimpl.NewLevelIter( context.TODO(), keyspan.SpanIterOptions{}, s.comparer.Compare, s.constructRangeDelIter, s.slice.Iter(), manifest.FlushableIngestsLayer(), manifest.KeyTypePoint, ) + if !s.exciseSpan.Valid() { + return liter + } + // We have an excise span to weave into the rangedel iterators. + // + // TODO(bilal): should this be pooled? + miter := &keyspanimpl.MergingIter{} + rdel := keyspan.Span{ + Start: s.exciseSpan.Start, + End: s.exciseSpan.End, + Keys: []keyspan.Key{{Trailer: base.MakeTrailer(s.seqNum, base.InternalKeyKindRangeDelete)}}, + } + rdelIter := keyspan.NewIter(s.comparer.Compare, []keyspan.Span{rdel}) + miter.Init(s.comparer, keyspan.NoopTransform, new(keyspanimpl.MergingBuffers), liter, rdelIter) + return miter } // newRangeKeyIter is part of the flushable interface. func (s *ingestedFlushable) newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator { - if !s.containsRangeKeys() { - return nil + var rkeydelIter keyspan.FragmentIterator + if s.exciseSpan.Valid() { + // We have an excise span to weave into the rangekey iterators. + rkeydel := keyspan.Span{ + Start: s.exciseSpan.Start, + End: s.exciseSpan.End, + Keys: []keyspan.Key{{Trailer: base.MakeTrailer(s.seqNum, base.InternalKeyKindRangeKeyDelete)}}, + } + rkeydelIter = keyspan.NewIter(s.comparer.Compare, []keyspan.Span{rkeydel}) + } + + if !s.hasRangeKeys { + if rkeydelIter == nil { + // NB: we have to return the nil literal as opposed to the nil + // value of rkeydelIter, otherwise callers of this function will + // have the return value fail == nil checks. + return nil + } + return rkeydelIter } - return keyspanimpl.NewLevelIter( + // TODO(bilal): should this be pooled? + miter := &keyspanimpl.MergingIter{} + liter := keyspanimpl.NewLevelIter( context.TODO(), keyspan.SpanIterOptions{}, s.comparer.Compare, s.newRangeKeyIters, s.slice.Iter(), manifest.FlushableIngestsLayer(), manifest.KeyTypeRange, ) + miter.Init(s.comparer, keyspan.NoopTransform, new(keyspanimpl.MergingBuffers), liter) + if rkeydelIter != nil { + miter.AddLevel(rkeydelIter) + } + return miter } // containsRangeKeys is part of the flushable interface. func (s *ingestedFlushable) containsRangeKeys() bool { - return s.hasRangeKeys + return s.hasRangeKeys || s.exciseSpan.Valid() } // inuseBytes is part of the flushable interface. @@ -329,6 +371,10 @@ func (s *ingestedFlushable) anyFileOverlaps(bounds base.UserKeyBounds) bool { // checks above. 
return true } + if s.exciseSpan.Valid() { + uk := s.exciseSpan.UserKeyBounds() + return uk.Overlaps(s.comparer.Compare, &bounds) + } return false } diff --git a/flushable_test.go b/flushable_test.go index f8ee20580b4..3be5fd90bad 100644 --- a/flushable_test.go +++ b/flushable_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "strings" "testing" "github.com/cockroachdb/datadriven" @@ -47,7 +48,7 @@ func TestIngestedSSTFlushableAPI(t *testing.T) { } reset() - loadFileMeta := func(paths []string) []*fileMetadata { + loadFileMeta := func(paths []string, exciseSpan KeyRange, seqNum base.SeqNum) []*fileMetadata { d.mu.Lock() pendingOutputs := make([]base.FileNum, len(paths)) for i := range paths { @@ -60,11 +61,17 @@ func TestIngestedSSTFlushableAPI(t *testing.T) { // not actually ingesting a file. lr, err := ingestLoad(context.Background(), d.opts, d.FormatMajorVersion(), paths, nil, nil, d.cacheID, pendingOutputs) if err != nil { - panic(err) + t.Fatal(err) } meta := make([]*fileMetadata, len(lr.local)) + if exciseSpan.Valid() { + seqNum++ + } for i := range meta { meta[i] = lr.local[i].fileMetadata + if err := setSeqNumInMetadata(meta[i], seqNum+base.SeqNum(i), d.cmp, d.opts.Comparer.FormatKey); err != nil { + t.Fatal(err) + } } if len(meta) == 0 { // All of the sstables to be ingested were empty. Nothing to do. @@ -77,8 +84,8 @@ func TestIngestedSSTFlushableAPI(t *testing.T) { } // Verify the sstables do not overlap. - if err := ingestSortAndVerify(d.cmp, lr, KeyRange{}); err != nil { - panic("unsorted sstables") + if err := ingestSortAndVerify(d.cmp, lr, exciseSpan); err != nil { + t.Fatal(err) } // Hard link the sstables into the DB directory. Since the sstables aren't @@ -87,7 +94,7 @@ func TestIngestedSSTFlushableAPI(t *testing.T) { // fall back to copying, and if that fails we undo our work and return an // error. if err := ingestLinkLocal(context.Background(), jobID, d.opts, d.objProvider, lr.local); err != nil { - panic("couldn't hard link sstables") + t.Fatal(err) } // Fsync the directory we added the tables to. We need to do this at some @@ -95,12 +102,13 @@ func TestIngestedSSTFlushableAPI(t *testing.T) { // can have the tables referenced in the MANIFEST, but not present in the // directory. if err := d.dataDir.Sync(); err != nil { - panic("Couldn't sync data directory") + t.Fatal(err) } return meta } + var seqNum uint64 datadriven.RunTest(t, "testdata/ingested_flushable_api", func(t *testing.T, td *datadriven.TestData) string { switch td.Cmd { case "reset": @@ -114,12 +122,26 @@ func TestIngestedSSTFlushableAPI(t *testing.T) { case "flushable": // Creates an ingestedFlushable over the input files. 
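+			// An optional excise=<start>-<end> argument attaches an excise
+			// span to the flushable, mirroring an ingestion that was committed
+			// together with an excise.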
paths := make([]string, 0, len(td.CmdArgs)) + var exciseSpan KeyRange + startSeqNum := base.SeqNum(seqNum) for _, arg := range td.CmdArgs { - paths = append(paths, arg.String()) + switch arg.Key { + case "excise": + parts := strings.Split(arg.Vals[0], "-") + if len(parts) != 2 { + return fmt.Sprintf("invalid excise range: %s", arg.Vals[0]) + } + exciseSpan.Start = []byte(parts[0]) + exciseSpan.End = []byte(parts[1]) + seqNum++ + default: + paths = append(paths, arg.String()) + seqNum++ + } } - meta := loadFileMeta(paths) - flushable = newIngestedFlushable(meta, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter, KeyRange{}) + meta := loadFileMeta(paths, exciseSpan, startSeqNum) + flushable = newIngestedFlushable(meta, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter, exciseSpan, base.SeqNum(startSeqNum)) return "" case "iter": iter := flushable.newIter(nil) diff --git a/format_major_version.go b/format_major_version.go index 90ef9f0b6a2..dd62f32ff4c 100644 --- a/format_major_version.go +++ b/format_major_version.go @@ -185,6 +185,12 @@ const ( // fields in the Manifest and thus requires a format major version. FormatSyntheticPrefixSuffix + // FormatFlushableIngestExcises is a format major version that adds support for + // having excises unconditionally being written as flushable ingestions. This + // is implemented through adding a new key kind that can go in the same batches + // as flushable ingested sstables. + FormatFlushableIngestExcises + // TODO(msbutler): add major version for synthetic suffixes // -- Add new versions here -- @@ -222,7 +228,8 @@ func (v FormatMajorVersion) MaxTableFormat() sstable.TableFormat { switch v { case FormatDefault, FormatFlushableIngest, FormatPrePebblev1MarkedCompacted: return sstable.TableFormatPebblev3 - case FormatDeleteSizedAndObsolete, FormatVirtualSSTables, FormatSyntheticPrefixSuffix: + case FormatDeleteSizedAndObsolete, FormatVirtualSSTables, FormatSyntheticPrefixSuffix, + FormatFlushableIngestExcises: return sstable.TableFormatPebblev4 default: panic(fmt.Sprintf("pebble: unsupported format major version: %s", v)) @@ -234,7 +241,8 @@ func (v FormatMajorVersion) MaxTableFormat() sstable.TableFormat { func (v FormatMajorVersion) MinTableFormat() sstable.TableFormat { switch v { case FormatDefault, FormatFlushableIngest, FormatPrePebblev1MarkedCompacted, - FormatDeleteSizedAndObsolete, FormatVirtualSSTables, FormatSyntheticPrefixSuffix: + FormatDeleteSizedAndObsolete, FormatVirtualSSTables, FormatSyntheticPrefixSuffix, + FormatFlushableIngestExcises: return sstable.TableFormatPebblev1 default: panic(fmt.Sprintf("pebble: unsupported format major version: %s", v)) @@ -271,6 +279,9 @@ var formatMajorVersionMigrations = map[FormatMajorVersion]func(*DB) error{ FormatSyntheticPrefixSuffix: func(d *DB) error { return d.finalizeFormatVersUpgrade(FormatSyntheticPrefixSuffix) }, + FormatFlushableIngestExcises: func(d *DB) error { + return d.finalizeFormatVersUpgrade(FormatFlushableIngestExcises) + }, } const formatVersionMarkerName = `format-version` diff --git a/format_major_version_test.go b/format_major_version_test.go index 3cf01976c48..364f36197c9 100644 --- a/format_major_version_test.go +++ b/format_major_version_test.go @@ -24,11 +24,12 @@ func TestFormatMajorVersionStableValues(t *testing.T) { require.Equal(t, FormatDeleteSizedAndObsolete, FormatMajorVersion(15)) require.Equal(t, FormatVirtualSSTables, FormatMajorVersion(16)) require.Equal(t, FormatSyntheticPrefixSuffix, FormatMajorVersion(17)) + require.Equal(t, FormatFlushableIngestExcises, 
FormatMajorVersion(18)) // When we add a new version, we should add a check for the new version in // addition to updating these expected values. - require.Equal(t, FormatNewest, FormatMajorVersion(17)) - require.Equal(t, internalFormatNewest, FormatMajorVersion(17)) + require.Equal(t, FormatNewest, FormatMajorVersion(18)) + require.Equal(t, internalFormatNewest, FormatMajorVersion(18)) } func TestFormatMajorVersion_MigrationDefined(t *testing.T) { @@ -53,6 +54,8 @@ func TestRatchetFormat(t *testing.T) { require.Equal(t, FormatVirtualSSTables, d.FormatMajorVersion()) require.NoError(t, d.RatchetFormatMajorVersion(FormatSyntheticPrefixSuffix)) require.Equal(t, FormatSyntheticPrefixSuffix, d.FormatMajorVersion()) + require.NoError(t, d.RatchetFormatMajorVersion(FormatFlushableIngestExcises)) + require.Equal(t, FormatFlushableIngestExcises, d.FormatMajorVersion()) require.NoError(t, d.Close()) @@ -203,6 +206,7 @@ func TestFormatMajorVersions_TableFormat(t *testing.T) { FormatDeleteSizedAndObsolete: {sstable.TableFormatPebblev1, sstable.TableFormatPebblev4}, FormatVirtualSSTables: {sstable.TableFormatPebblev1, sstable.TableFormatPebblev4}, FormatSyntheticPrefixSuffix: {sstable.TableFormatPebblev1, sstable.TableFormatPebblev4}, + FormatFlushableIngestExcises: {sstable.TableFormatPebblev1, sstable.TableFormatPebblev4}, } // Valid versions. diff --git a/ingest.go b/ingest.go index 7332aac6b63..5e6020ae0f3 100644 --- a/ingest.go +++ b/ingest.go @@ -140,7 +140,7 @@ func ingestSynthesizeShared( // a.RANGEDEL.100, with a.RANGEDEL.100 being the smallest key. To create a // correct bound, we just use the maximum key kind (which sorts first). // Similarly, we use the smallest key kind for the largest key. - smallestPointKey := base.MakeInternalKey(sm.SmallestPointKey.UserKey, 0, base.InternalKeyKindMax) + smallestPointKey := base.MakeInternalKey(sm.SmallestPointKey.UserKey, 0, base.InternalKeyKindMaxForSstable) largestPointKey := base.MakeInternalKey(sm.LargestPointKey.UserKey, 0, 0) if sm.LargestPointKey.IsExclusiveSentinel() { largestPointKey = base.MakeRangeDeleteSentinelKey(sm.LargestPointKey.UserKey) @@ -216,12 +216,12 @@ func ingestLoad1External( if e.EndKeyIsInclusive { meta.ExtendPointKeyBounds( opts.Comparer.Compare, - base.MakeInternalKey(smallestCopy, 0, InternalKeyKindMax), + base.MakeInternalKey(smallestCopy, 0, base.InternalKeyKindMaxForSstable), base.MakeInternalKey(largestCopy, 0, 0)) } else { meta.ExtendPointKeyBounds( opts.Comparer.Compare, - base.MakeInternalKey(smallestCopy, 0, InternalKeyKindMax), + base.MakeInternalKey(smallestCopy, 0, base.InternalKeyKindMaxForSstable), base.MakeRangeDeleteSentinelKey(largestCopy)) } } @@ -1037,7 +1037,7 @@ func (d *DB) Ingest(ctx context.Context, paths []string) error { if d.opts.ReadOnly { return ErrReadOnly } - _, err := d.ingest(ctx, paths, nil /* shared */, KeyRange{}, false, nil /* external */) + _, err := d.ingest(ctx, paths, nil /* shared */, KeyRange{}, nil /* external */) return err } @@ -1125,7 +1125,7 @@ func (d *DB) IngestWithStats(ctx context.Context, paths []string) (IngestOperati if d.opts.ReadOnly { return IngestOperationStats{}, ErrReadOnly } - return d.ingest(ctx, paths, nil, KeyRange{}, false, nil) + return d.ingest(ctx, paths, nil, KeyRange{}, nil) } // IngestExternalFiles does the same as IngestWithStats, and additionally @@ -1145,7 +1145,7 @@ func (d *DB) IngestExternalFiles( if d.opts.Experimental.RemoteStorage == nil { return IngestOperationStats{}, errors.New("pebble: cannot ingest external files without shared storage 
configured") } - return d.ingest(ctx, nil, nil, KeyRange{}, false, external) + return d.ingest(ctx, nil, nil, KeyRange{}, external) } // IngestAndExcise does the same as IngestWithStats, and additionally accepts a @@ -1164,7 +1164,6 @@ func (d *DB) IngestAndExcise( shared []SharedSSTMeta, external []ExternalFile, exciseSpan KeyRange, - sstsContainExciseTombstone bool, ) (IngestOperationStats, error) { if err := d.closed.Load(); err != nil { panic(err) @@ -1187,13 +1186,24 @@ func (d *DB) IngestAndExcise( v, FormatMinForSharedObjects, ) } - return d.ingest(ctx, paths, shared, exciseSpan, sstsContainExciseTombstone, external) + return d.ingest(ctx, paths, shared, exciseSpan, external) } // Both DB.mu and commitPipeline.mu must be held while this is called. func (d *DB) newIngestedFlushableEntry( meta []*fileMetadata, seqNum base.SeqNum, logNum base.DiskFileNum, exciseSpan KeyRange, ) (*flushableEntry, error) { + // If there's an excise being done atomically with the same ingest, we + // assign the lowest sequence number in the set of sequence numbers for this + // ingestion to the excise. Note that we've already allocated fileCount+1 + // sequence numbers in this case. + // + // This mimics the behaviour in the non-flushable ingest case (see the callsite + // for ingestUpdateSeqNum). + fileSeqNumStart := seqNum + if exciseSpan.Valid() { + fileSeqNumStart = seqNum + 1 // the first seqNum is reserved for the excise. + } // Update the sequence number for all of the sstables in the // metadata. Writing the metadata to the manifest when the // version edit is applied is the mechanism that persists the @@ -1203,12 +1213,12 @@ func (d *DB) newIngestedFlushableEntry( // time, then we'll lose the ingest sequence number information. But this // information will also be reconstructed on node restart. for i, m := range meta { - if err := setSeqNumInMetadata(m, seqNum+base.SeqNum(i), d.cmp, d.opts.Comparer.FormatKey); err != nil { + if err := setSeqNumInMetadata(m, fileSeqNumStart+base.SeqNum(i), d.cmp, d.opts.Comparer.FormatKey); err != nil { return nil, err } } - f := newIngestedFlushable(meta, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter, exciseSpan) + f := newIngestedFlushable(meta, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter, exciseSpan, seqNum) // NB: The logNum/seqNum are the WAL number which we're writing this entry // to and the sequence number within the WAL which we'll write this entry @@ -1242,6 +1252,9 @@ func (d *DB) handleIngestAsFlushable( meta []*fileMetadata, seqNum base.SeqNum, exciseSpan KeyRange, ) error { b := d.NewBatch() + if exciseSpan.Valid() { + b.excise(exciseSpan.Start, exciseSpan.End) + } for _, m := range meta { b.ingestSST(m.FileNum) } @@ -1271,6 +1284,11 @@ func (d *DB) handleIngestAsFlushable( d.mu.Lock() } + // The excise span is going to outlive this ingestion call. Copy it. 
+ exciseSpan = KeyRange{ + Start: append([]byte(nil), exciseSpan.Start...), + End: append([]byte(nil), exciseSpan.End...), + } entry, err := d.newIngestedFlushableEntry(meta, seqNum, logNum, exciseSpan) if err != nil { return err @@ -1313,7 +1331,6 @@ func (d *DB) ingest( paths []string, shared []SharedSSTMeta, exciseSpan KeyRange, - sstsContainExciseTombstone bool, external []ExternalFile, ) (IngestOperationStats, error) { if len(shared) > 0 && d.opts.Experimental.RemoteStorage == nil { @@ -1523,17 +1540,16 @@ func (d *DB) ingest( hasRemoteFiles := len(shared) > 0 || len(external) > 0 canIngestFlushable := d.FormatMajorVersion() >= FormatFlushableIngest && (len(d.mu.mem.queue) < d.opts.MemTableStopWritesThreshold) && - !d.opts.Experimental.DisableIngestAsFlushable() && !hasRemoteFiles + !d.opts.Experimental.DisableIngestAsFlushable() && !hasRemoteFiles && + (!exciseSpan.Valid() || d.FormatMajorVersion() >= FormatFlushableIngestExcises) - if !canIngestFlushable || (exciseSpan.Valid() && !sstsContainExciseTombstone) { + if !canIngestFlushable { // We're not able to ingest as a flushable, // so we must synchronously flush. // - // TODO(bilal): Currently, if any of the files being ingested are shared or - // there's an excise span present, we cannot use flushable ingests and need - // to wait synchronously. Either remove this caveat by fleshing out - // flushable ingest logic to also account for these cases, or remove this - // comment. Tracking issue: https://github.com/cockroachdb/pebble/issues/2676 + // TODO(bilal): Currently, if any of the files being ingested are shared, + // we cannot use flushable ingests and need + // to wait synchronously. if mem.flushable == d.mu.mem.mutable { err = d.makeRoomForWrite(nil) } diff --git a/ingest_test.go b/ingest_test.go index 8752a04a237..32f916c06bc 100644 --- a/ingest_test.go +++ b/ingest_test.go @@ -431,7 +431,7 @@ func TestOverlappingIngestedSSTs(t *testing.T) { ) cache := NewCache(0) defer func() { - if !closed { + if d != nil && !closed { require.NoError(t, d.Close()) } cache.Unref() @@ -443,6 +443,7 @@ func TestOverlappingIngestedSSTs(t *testing.T) { reset := func(strictMem bool) { if d != nil && !closed { require.NoError(t, d.Close()) + d = nil } blockFlush = false @@ -710,9 +711,19 @@ func TestExcise(t *testing.T) { case "ingest": flushed = false + noWait := false + for i := range td.CmdArgs { + switch td.CmdArgs[i].Key { + case "no-wait": + noWait = true + } + } if err := runIngestCmd(td, d, mem); err != nil { return err.Error() } + if noWait { + return "" + } // Wait for a possible flush. d.mu.Lock() for d.mu.compact.flushing { @@ -726,12 +737,26 @@ func TestExcise(t *testing.T) { case "ingest-and-excise": flushed = false - d.mu.Lock() - prevFlushableIngests := d.mu.versions.metrics.Flush.AsIngestCount - d.mu.Unlock() + noWait := false + for i := range td.CmdArgs { + switch td.CmdArgs[i].Key { + case "no-wait": + noWait = true + } + } + var prevFlushableIngests uint64 + if !noWait { + d.mu.Lock() + prevFlushableIngests = d.mu.versions.metrics.Flush.AsIngestCount + d.mu.Unlock() + } + if err := runIngestAndExciseCmd(td, d, mem); err != nil { return err.Error() } + if noWait { + return "" + } // Wait for a possible flush. 
d.mu.Lock() for d.mu.compact.flushing { @@ -1127,7 +1152,7 @@ func testIngestSharedImpl( require.NoError(t, err) require.NoError(t, w.Close()) - _, err = to.IngestAndExcise(context.Background(), []string{sstPath}, sharedSSTs, nil /* external */, KeyRange{Start: startKey, End: endKey}, false) + _, err = to.IngestAndExcise(context.Background(), []string{sstPath}, sharedSSTs, nil /* external */, KeyRange{Start: startKey, End: endKey}) require.NoError(t, err) return fmt.Sprintf("replicated %d shared SSTs", len(sharedSSTs)) @@ -1358,7 +1383,7 @@ func TestSimpleIngestShared(t *testing.T) { } _, err = d.IngestAndExcise( context.Background(), []string{}, []SharedSSTMeta{sharedSSTMeta}, nil, /* external */ - KeyRange{Start: []byte("d"), End: []byte("ee")}, false) + KeyRange{Start: []byte("d"), End: []byte("ee")}) require.NoError(t, err) // TODO(bilal): Once reading of shared sstables is in, verify that the values @@ -1628,7 +1653,7 @@ func TestConcurrentExcise(t *testing.T) { require.NoError(t, err) require.NoError(t, w.Close()) - _, err = to.IngestAndExcise(context.Background(), []string{sstPath}, sharedSSTs, nil, KeyRange{Start: startKey, End: endKey}, false) + _, err = to.IngestAndExcise(context.Background(), []string{sstPath}, sharedSSTs, nil, KeyRange{Start: startKey, End: endKey}) require.NoError(t, err) return fmt.Sprintf("replicated %d shared SSTs", len(sharedSSTs)) @@ -2063,7 +2088,7 @@ func TestIngestExternal(t *testing.T) { ) require.NoError(t, err) require.NoError(t, w.Close()) - _, err = to.IngestAndExcise(context.Background(), []string{sstPath}, nil /* shared */, externalFiles, KeyRange{Start: startKey, End: endKey}, false) + _, err = to.IngestAndExcise(context.Background(), []string{sstPath}, nil /* shared */, externalFiles, KeyRange{Start: startKey, End: endKey}) require.NoError(t, err) return fmt.Sprintf("replicated %d external SSTs", len(externalFiles)) diff --git a/internal.go b/internal.go index 3e324347179..d6997c6bce6 100644 --- a/internal.go +++ b/internal.go @@ -29,6 +29,7 @@ const ( InternalKeyKindRangeKeyMax = base.InternalKeyKindRangeKeyMax InternalKeyKindIngestSST = base.InternalKeyKindIngestSST InternalKeyKindDeleteSized = base.InternalKeyKindDeleteSized + InternalKeyKindExcise = base.InternalKeyKindExcise InternalKeyKindInvalid = base.InternalKeyKindInvalid ) diff --git a/internal/base/internal.go b/internal/base/internal.go index e364508ba02..76b426d8ce3 100644 --- a/internal/base/internal.go +++ b/internal/base/internal.go @@ -126,8 +126,8 @@ const ( // InternalKeyKindIngestSST is used to distinguish a batch that corresponds to // the WAL entry for ingested sstables that are added to the flushable - // queue. This InternalKeyKind cannot appear, amongst other key kinds in a - // batch, or in an sstable. + // queue. This InternalKeyKind cannot appear amongst other key kinds in a + // batch (with the exception of alongside InternalKeyKindExcise), or in an sstable. InternalKeyKindIngestSST InternalKeyKind = 22 // InternalKeyKindDeleteSized keys behave identically to @@ -137,6 +137,14 @@ const ( // heuristics, but is not required to be accurate for correctness. InternalKeyKindDeleteSized InternalKeyKind = 23 + // InternalKeyKindExcise is used to persist the Excise part of an IngestAndExcise + // to a WAL. An Excise is similar to a RangeDel+RangeKeyDel combined, in that it + // deletes all point and range keys in a given key range while also immediately + // truncating sstables to exclude this key span. 
This InternalKeyKind cannot + // appear amongst other key kinds in a batch (with the exception of alongside + // InternalKeyKindIngestSST), or in an sstable. + InternalKeyKindExcise InternalKeyKind = 24 + // This maximum value isn't part of the file format. Future extensions may // increase this value. // @@ -146,7 +154,13 @@ const ( // which sorts 'less than or equal to' any other valid internalKeyKind, when // searching for any kind of internal key formed by a certain user key and // seqNum. - InternalKeyKindMax InternalKeyKind = 23 + InternalKeyKindMax InternalKeyKind = 24 + + // InternalKeyKindMaxForSstable is the largest valid key kind that can exist + // in an sstable. This should usually equal InternalKeyKindMax, except + // if the current InternalKeyKindMax is a kind that is never added to an + // sstable or memtable (eg. InternalKeyKindExcise). + InternalKeyKindMaxForSstable InternalKeyKind = InternalKeyKindDeleteSized // Internal to the sstable format. Not exposed by any sstable iterator. // Declared here to prevent definition of valid key kinds that set this bit. @@ -191,6 +205,7 @@ var internalKeyKindNames = []string{ InternalKeyKindRangeKeyDelete: "RANGEKEYDEL", InternalKeyKindIngestSST: "INGESTSST", InternalKeyKindDeleteSized: "DELSIZED", + InternalKeyKindExcise: "EXCISE", InternalKeyKindInvalid: "INVALID", } @@ -290,6 +305,7 @@ var kindsMap = map[string]InternalKeyKind{ "RANGEKEYDEL": InternalKeyKindRangeKeyDelete, "INGESTSST": InternalKeyKindIngestSST, "DELSIZED": InternalKeyKindDeleteSized, + "EXCISE": InternalKeyKindExcise, } // ParseInternalKey parses the string representation of an internal key. The diff --git a/internal/base/internal_test.go b/internal/base/internal_test.go index d06b7897414..626a7ec86fe 100644 --- a/internal/base/internal_test.go +++ b/internal/base/internal_test.go @@ -41,7 +41,7 @@ func TestInvalidInternalKey(t *testing.T) { "\x01\x02\x03\x04\x05\x06\x07", "foo", "foo\x08\x07\x06\x05\x04\x03\x02", - "foo\x18\x07\x06\x05\x04\x03\x02\x01", + "foo\x19\x07\x06\x05\x04\x03\x02\x01", } for _, tc := range testCases { k := DecodeInternalKey([]byte(tc)) diff --git a/mem_table.go b/mem_table.go index 5b035b6fc3f..cadfb51d414 100644 --- a/mem_table.go +++ b/mem_table.go @@ -230,8 +230,8 @@ func (m *memTable) apply(batch *Batch, seqNum base.SeqNum) error { // Don't increment seqNum for LogData, since these are not applied // to the memtable. 
seqNum-- - case InternalKeyKindIngestSST: - panic("pebble: cannot apply ingested sstable key kind to memtable") + case InternalKeyKindIngestSST, InternalKeyKindExcise: + panic("pebble: cannot apply ingested sstable or excise kind keys to memtable") default: err = ins.Add(&m.skl, ikey, value) } diff --git a/metamorphic/generator.go b/metamorphic/generator.go index a5829356225..cb40aab21f5 100644 --- a/metamorphic/generator.go +++ b/metamorphic/generator.go @@ -1300,12 +1300,11 @@ func (g *generator) writerIngestAndExcise() { } g.add(&ingestAndExciseOp{ - dbID: dbID, - batchID: batchID, - derivedDBID: derivedDBID, - exciseStart: start, - exciseEnd: end, - sstContainsExciseTombstone: g.rng.Intn(2) == 0, + dbID: dbID, + batchID: batchID, + derivedDBID: derivedDBID, + exciseStart: start, + exciseEnd: end, }) } diff --git a/metamorphic/ops.go b/metamorphic/ops.go index 10337389459..7c5ff02f02a 100644 --- a/metamorphic/ops.go +++ b/metamorphic/ops.go @@ -879,11 +879,10 @@ func (o *ingestOp) keys() []*[]byte { return nil } func (o *ingestOp) diagramKeyRanges() []pebble.KeyRange { return nil } type ingestAndExciseOp struct { - dbID objID - batchID objID - derivedDBID objID - exciseStart, exciseEnd []byte - sstContainsExciseTombstone bool + dbID objID + batchID objID + derivedDBID objID + exciseStart, exciseEnd []byte } func (o *ingestAndExciseOp) run(t *Test, h historyRecorder) { @@ -899,13 +898,6 @@ func (o *ingestAndExciseOp) run(t *Test, h historyRecorder) { return } - if o.sstContainsExciseTombstone { - // Add a rangedel and rangekeydel to the batch. This ensures it'll end up - // inside the sstable. Note that all entries in the sstable will have the - // same sequence number, so the ordering within the batch doesn't matter. - err = firstError(err, b.DeleteRange(o.exciseStart, o.exciseEnd, t.writeOpts)) - err = firstError(err, b.RangeKeyDelete(o.exciseStart, o.exciseEnd, t.writeOpts)) - } path, writerMeta, err2 := buildForIngest(t, o.dbID, b, 0 /* i */) if err2 != nil { h.Recordf("Build(%s) // %v", o.batchID, err2) @@ -923,7 +915,7 @@ func (o *ingestAndExciseOp) run(t *Test, h historyRecorder) { _, err := db.IngestAndExcise(context.Background(), []string{path}, nil /* shared */, nil /* external */, pebble.KeyRange{ Start: o.exciseStart, End: o.exciseEnd, - }, o.sstContainsExciseTombstone) + }) return err })) } else { @@ -956,7 +948,7 @@ func (o *ingestAndExciseOp) syncObjs() objIDSlice { } func (o *ingestAndExciseOp) String() string { - return fmt.Sprintf("%s.IngestAndExcise(%s, %q, %q, %t /* sstContainsExciseTombstone */)", o.dbID, o.batchID, o.exciseStart, o.exciseEnd, o.sstContainsExciseTombstone) + return fmt.Sprintf("%s.IngestAndExcise(%s, %q, %q)", o.dbID, o.batchID, o.exciseStart, o.exciseEnd) } func (o *ingestAndExciseOp) keys() []*[]byte { @@ -1976,7 +1968,7 @@ func (r *replicateOp) runSharedReplicate( return } - _, err = dest.IngestAndExcise(context.Background(), []string{sstPath}, sharedSSTs, nil /* external */, pebble.KeyRange{Start: r.start, End: r.end}, false) + _, err = dest.IngestAndExcise(context.Background(), []string{sstPath}, sharedSSTs, nil /* external */, pebble.KeyRange{Start: r.start, End: r.end}) h.Recordf("%s // %v", r, err) } @@ -2039,7 +2031,7 @@ func (r *replicateOp) runExternalReplicate( return } - _, err = dest.IngestAndExcise(context.Background(), []string{sstPath}, nil, externalSSTs /* external */, pebble.KeyRange{Start: r.start, End: r.end}, false /* sstContainsExciseTombstone */) + _, err = dest.IngestAndExcise(context.Background(), []string{sstPath}, nil, 
externalSSTs /* external */, pebble.KeyRange{Start: r.start, End: r.end}) h.Recordf("%s // %v", r, err) } diff --git a/metamorphic/parser.go b/metamorphic/parser.go index 833c3b45347..46924a77876 100644 --- a/metamorphic/parser.go +++ b/metamorphic/parser.go @@ -76,7 +76,7 @@ func opArgs(op op) (receiverID *objID, targetID *objID, args []interface{}) { case *ingestOp: return &t.dbID, nil, []interface{}{&t.batchIDs} case *ingestAndExciseOp: - return &t.dbID, nil, []interface{}{&t.batchID, &t.exciseStart, &t.exciseEnd, &t.sstContainsExciseTombstone} + return &t.dbID, nil, []interface{}{&t.batchID, &t.exciseStart, &t.exciseEnd} case *ingestExternalFilesOp: return &t.dbID, nil, []interface{}{&t.objs} case *initOp: diff --git a/open.go b/open.go index 72de7a0c6f6..17847a783da 100644 --- a/open.go +++ b/open.go @@ -502,6 +502,9 @@ func Open(dirname string, opts *Options) (db *DB, err error) { // 20.1 do not guarantee that closed WALs end cleanly. But the earliest // compatible Pebble format is newer and guarantees a clean EOF. strictWALTail := i < len(replayWALs)-1 + // NB: replayWAL can individually apply version edits, in which case it'll + // zero-out ve internally before reusing it. We need to empty out toFlush + // if it did that. flush, maxSeqNum, err := d.replayWAL(jobID, &ve, lf, strictWALTail) if err != nil { return nil, err @@ -511,6 +514,12 @@ func Open(dirname string, opts *Options) (db *DB, err error) { d.mu.versions.logSeqNum.Store(maxSeqNum) } } + if d.mu.mem.mutable == nil && !d.opts.ReadOnly { + // Recreate the mutable memtable if replayWAL got rid of it. + var entry *flushableEntry + d.mu.mem.mutable, entry = d.newMemTable(d.mu.versions.getNextDiskFileNum(), d.mu.versions.logSeqNum.Load(), 0 /* minSize */) + d.mu.mem.queue = append(d.mu.mem.queue, entry) + } d.mu.versions.visibleSeqNum.Store(d.mu.versions.logSeqNum.Load()) if !d.opts.ReadOnly { @@ -851,11 +860,32 @@ func (d *DB) replayWAL( if err != nil { return err } - newVE, _, err := d.runCompaction(jobID, c) + var newVE *versionEdit + if c.kind == compactionKindIngestedFlushable { + d.mu.versions.logLock() + newVE, err = d.runIngestFlush(c) + d.mu.versions.logUnlock() + } else { + newVE, _, err = d.runCompaction(jobID, c) + } if err != nil { return errors.Wrapf(err, "running compaction during WAL replay") } ve.NewFiles = append(ve.NewFiles, newVE.NewFiles...) + if newVE.DeletedFiles != nil { + if ve.DeletedFiles == nil { + ve.DeletedFiles = make(map[manifest.DeletedFileEntry]*manifest.FileMetadata) + } + for i := range newVE.DeletedFiles { + ve.DeletedFiles[i] = newVE.DeletedFiles[i] + } + } + if newVE.CreatedBackingTables != nil { + ve.CreatedBackingTables = append(ve.CreatedBackingTables, newVE.CreatedBackingTables...) + } + if newVE.RemovedBackingTables != nil { + ve.RemovedBackingTables = append(ve.RemovedBackingTables, newVE.RemovedBackingTables...) + } return nil } defer func() { @@ -905,31 +935,101 @@ func (d *DB) replayWAL( batchesReplayed++ { br := b.Reader() - if kind, encodedFileNum, _, ok, err := br.Next(); err != nil { + if kind, encodedFileNum, val, ok, err := br.Next(); err != nil { return nil, 0, err - } else if ok && kind == InternalKeyKindIngestSST { + } else if ok && (kind == InternalKeyKindIngestSST || kind == InternalKeyKindExcise) { + // We're in the flushable ingests (+ possibly excises) case. + // + // Ingests require an up-to-date view of the LSM to determine the target + // level of ingested sstables, and to accurately compute excises. 
Outside + // of this case, this function avoids calling logAndApply after every flushable + // as regular (memtable) flushes only add files to L0 and don't need an + // up-to-date Version. + // + // If we have an in-progress versionEdit, we need to apply it before we proceed + // with the flushable ingest. + if mem == nil && d.mu.mem.mutable != nil { + mem = d.mu.mem.mutable + entry = d.mu.mem.queue[len(d.mu.mem.queue)-1] + d.mu.mem.mutable = nil + d.mu.mem.queue = d.mu.mem.queue[:len(d.mu.mem.queue)-1] + } + if mem != nil { + mem.writerUnref() + } + flushMem() + // mem is nil here. + if !d.opts.ReadOnly && len(toFlush) > 0 { + if err := updateVE(); err != nil { + return nil, 0, err + } + } + if d.mu.versions.logSeqNum.Load() < maxSeqNum { + d.mu.versions.logSeqNum.Store(maxSeqNum) + } + d.mu.versions.visibleSeqNum.Store(d.mu.versions.logSeqNum.Load()) + if !d.opts.ReadOnly { + if len(ve.NewFiles) > 0 { + // Create an empty .log file. + newLogNum := d.mu.versions.getNextDiskFileNum() + ve.MinUnflushedLogNum = newLogNum + + // Create the manifest with the updated MinUnflushedLogNum before + // creating the new log file. If we created the log file first, a + // crash before the manifest is synced could leave two WALs with + // unclean tails. + d.mu.versions.logLock() + if err := d.mu.versions.logAndApply(jobID, ve, newFileMetrics(ve.NewFiles), false /* forceRotation */, func() []compactionInfo { + return nil + }); err != nil { + return nil, 0, err + } + } + for _, entry := range toFlush { + entry.readerUnrefLocked(true) + } + // We've already released the references to these memtables, so we clear + // them out of toFlush so the caller doesn't double-unref them. Plus we + // don't want to double-flush already flushed memtables in updateVE(). + toFlush = toFlush[:0] + *ve = versionEdit{} + } fileNums := make([]base.DiskFileNum, 0, b.Count()) + var exciseSpan KeyRange addFileNum := func(encodedFileNum []byte) { fileNum, n := binary.Uvarint(encodedFileNum) if n <= 0 { - panic("pebble: ingest sstable file num is invalid.") + panic("pebble: ingest sstable file num is invalid") } fileNums = append(fileNums, base.DiskFileNum(fileNum)) } - addFileNum(encodedFileNum) + if kind == base.InternalKeyKindExcise { + exciseSpan.Start = append([]byte(nil), encodedFileNum...) + exciseSpan.End = append([]byte(nil), val...) + } else { + addFileNum(encodedFileNum) + } for i := 1; i < int(b.Count()); i++ { - kind, encodedFileNum, _, ok, err := br.Next() + kind, key, val, ok, err := br.Next() if err != nil { return nil, 0, err } - if kind != InternalKeyKindIngestSST { - panic("pebble: invalid batch key kind.") + if kind != InternalKeyKindIngestSST && kind != InternalKeyKindExcise { + panic("pebble: invalid batch key kind") } if !ok { - panic("pebble: invalid batch count.") + panic("pebble: invalid batch count") } - addFileNum(encodedFileNum) + if kind == base.InternalKeyKindExcise { + if exciseSpan.Valid() { + panic("pebble: multiple excise spans in a single batch") + } + exciseSpan.Start = append([]byte(nil), key...) + exciseSpan.End = append([]byte(nil), val...) 
+ continue + } + addFileNum(key) } if _, _, _, ok, err := br.Next(); err != nil { @@ -969,11 +1069,15 @@ func (d *DB) replayWAL( } } - if uint32(len(meta)) != b.Count() { + numFiles := len(meta) + if exciseSpan.Valid() { + numFiles++ + } + if uint32(numFiles) != b.Count() { panic("pebble: couldn't load all files in WAL entry.") } - entry, err = d.newIngestedFlushableEntry(meta, seqNum, base.DiskFileNum(ll.Num), KeyRange{}) + entry, err = d.newIngestedFlushableEntry(meta, seqNum, base.DiskFileNum(ll.Num), exciseSpan) if err != nil { return nil, 0, err } @@ -991,35 +1095,8 @@ func (d *DB) replayWAL( d.mu.mem.mutable = nil } else { toFlush = append(toFlush, entry) - // During WAL replay, the lsm only has L0, hence, the - // baseLevel is 1. For the sake of simplicity, we place the - // ingested files in L0 here, instead of finding their - // target levels. This is a simplification for the sake of - // simpler code. It is expected that WAL replay should be - // rare, and that flushables of type ingestedFlushable - // should also be rare. So, placing the ingested files in L0 - // is alright. - // - // TODO(bananabrick): Maybe refactor this function to allow - // us to easily place ingested files in levels as low as - // possible during WAL replay. It would require breaking up - // the application of ve to the manifest into chunks and is - // not pretty w/o a refactor to this function and how it's - // used. - c, err := newFlush( - d.opts, d.mu.versions.currentVersion(), - 1, /* base level */ - []*flushableEntry{entry}, - d.timeNow(), - ) - if err != nil { - return nil, 0, err - } - for _, file := range c.flushing[0].flushable.(*ingestedFlushable).files { - ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: 0, Meta: file.FileMetadata}) - } } - return toFlush, maxSeqNum, nil + break } } diff --git a/open_test.go b/open_test.go index fff9024b4d9..9fe40f67c6b 100644 --- a/open_test.go +++ b/open_test.go @@ -331,7 +331,7 @@ func TestNewDBFilenames(t *testing.T) { "LOCK", "MANIFEST-000001", "OPTIONS-000003", - "marker.format-version.000004.017", + "marker.format-version.000005.018", "marker.manifest.000001.MANIFEST-000001", }, } diff --git a/testdata/checkpoint b/testdata/checkpoint index c323f8c0cd9..f9230195151 100644 --- a/testdata/checkpoint +++ b/testdata/checkpoint @@ -36,6 +36,10 @@ create: db/marker.format-version.000004.017 close: db/marker.format-version.000004.017 remove: db/marker.format-version.000003.016 sync: db +create: db/marker.format-version.000005.018 +close: db/marker.format-version.000005.018 +remove: db/marker.format-version.000004.017 +sync: db create: db/temporary.000003.dbtmp sync: db/temporary.000003.dbtmp close: db/temporary.000003.dbtmp @@ -98,9 +102,9 @@ close: . 
open-dir: checkpoints/checkpoint1 link: db/OPTIONS-000003 -> checkpoints/checkpoint1/OPTIONS-000003 open-dir: checkpoints/checkpoint1 -create: checkpoints/checkpoint1/marker.format-version.000001.017 -sync-data: checkpoints/checkpoint1/marker.format-version.000001.017 -close: checkpoints/checkpoint1/marker.format-version.000001.017 +create: checkpoints/checkpoint1/marker.format-version.000001.018 +sync-data: checkpoints/checkpoint1/marker.format-version.000001.018 +close: checkpoints/checkpoint1/marker.format-version.000001.018 sync: checkpoints/checkpoint1 close: checkpoints/checkpoint1 link: db/000005.sst -> checkpoints/checkpoint1/000005.sst @@ -138,9 +142,9 @@ close: checkpoints open-dir: checkpoints/checkpoint2 link: db/OPTIONS-000003 -> checkpoints/checkpoint2/OPTIONS-000003 open-dir: checkpoints/checkpoint2 -create: checkpoints/checkpoint2/marker.format-version.000001.017 -sync-data: checkpoints/checkpoint2/marker.format-version.000001.017 -close: checkpoints/checkpoint2/marker.format-version.000001.017 +create: checkpoints/checkpoint2/marker.format-version.000001.018 +sync-data: checkpoints/checkpoint2/marker.format-version.000001.018 +close: checkpoints/checkpoint2/marker.format-version.000001.018 sync: checkpoints/checkpoint2 close: checkpoints/checkpoint2 link: db/000007.sst -> checkpoints/checkpoint2/000007.sst @@ -173,9 +177,9 @@ close: checkpoints open-dir: checkpoints/checkpoint3 link: db/OPTIONS-000003 -> checkpoints/checkpoint3/OPTIONS-000003 open-dir: checkpoints/checkpoint3 -create: checkpoints/checkpoint3/marker.format-version.000001.017 -sync-data: checkpoints/checkpoint3/marker.format-version.000001.017 -close: checkpoints/checkpoint3/marker.format-version.000001.017 +create: checkpoints/checkpoint3/marker.format-version.000001.018 +sync-data: checkpoints/checkpoint3/marker.format-version.000001.018 +close: checkpoints/checkpoint3/marker.format-version.000001.018 sync: checkpoints/checkpoint3 close: checkpoints/checkpoint3 link: db/000005.sst -> checkpoints/checkpoint3/000005.sst @@ -259,7 +263,7 @@ list db LOCK MANIFEST-000001 OPTIONS-000003 -marker.format-version.000004.017 +marker.format-version.000005.018 marker.manifest.000001.MANIFEST-000001 list checkpoints/checkpoint1 @@ -269,7 +273,7 @@ list checkpoints/checkpoint1 000007.sst MANIFEST-000001 OPTIONS-000003 -marker.format-version.000001.017 +marker.format-version.000001.018 marker.manifest.000001.MANIFEST-000001 open checkpoints/checkpoint1 readonly @@ -336,7 +340,7 @@ list checkpoints/checkpoint2 000007.sst MANIFEST-000001 OPTIONS-000003 -marker.format-version.000001.017 +marker.format-version.000001.018 marker.manifest.000001.MANIFEST-000001 open checkpoints/checkpoint2 readonly @@ -378,7 +382,7 @@ list checkpoints/checkpoint3 000007.sst MANIFEST-000001 OPTIONS-000003 -marker.format-version.000001.017 +marker.format-version.000001.018 marker.manifest.000001.MANIFEST-000001 open checkpoints/checkpoint3 readonly @@ -492,9 +496,9 @@ close: checkpoints open-dir: checkpoints/checkpoint4 link: db/OPTIONS-000003 -> checkpoints/checkpoint4/OPTIONS-000003 open-dir: checkpoints/checkpoint4 -create: checkpoints/checkpoint4/marker.format-version.000001.017 -sync-data: checkpoints/checkpoint4/marker.format-version.000001.017 -close: checkpoints/checkpoint4/marker.format-version.000001.017 +create: checkpoints/checkpoint4/marker.format-version.000001.018 +sync-data: checkpoints/checkpoint4/marker.format-version.000001.018 +close: checkpoints/checkpoint4/marker.format-version.000001.018 sync: checkpoints/checkpoint4 close: 
checkpoints/checkpoint4 link: db/000010.sst -> checkpoints/checkpoint4/000010.sst @@ -582,7 +586,7 @@ list db LOCK MANIFEST-000001 OPTIONS-000003 -marker.format-version.000004.017 +marker.format-version.000005.018 marker.manifest.000001.MANIFEST-000001 @@ -597,9 +601,9 @@ close: checkpoints open-dir: checkpoints/checkpoint5 link: db/OPTIONS-000003 -> checkpoints/checkpoint5/OPTIONS-000003 open-dir: checkpoints/checkpoint5 -create: checkpoints/checkpoint5/marker.format-version.000001.017 -sync-data: checkpoints/checkpoint5/marker.format-version.000001.017 -close: checkpoints/checkpoint5/marker.format-version.000001.017 +create: checkpoints/checkpoint5/marker.format-version.000001.018 +sync-data: checkpoints/checkpoint5/marker.format-version.000001.018 +close: checkpoints/checkpoint5/marker.format-version.000001.018 sync: checkpoints/checkpoint5 close: checkpoints/checkpoint5 link: db/000010.sst -> checkpoints/checkpoint5/000010.sst @@ -695,9 +699,9 @@ close: checkpoints open-dir: checkpoints/checkpoint6 link: db/OPTIONS-000003 -> checkpoints/checkpoint6/OPTIONS-000003 open-dir: checkpoints/checkpoint6 -create: checkpoints/checkpoint6/marker.format-version.000001.017 -sync-data: checkpoints/checkpoint6/marker.format-version.000001.017 -close: checkpoints/checkpoint6/marker.format-version.000001.017 +create: checkpoints/checkpoint6/marker.format-version.000001.018 +sync-data: checkpoints/checkpoint6/marker.format-version.000001.018 +close: checkpoints/checkpoint6/marker.format-version.000001.018 sync: checkpoints/checkpoint6 close: checkpoints/checkpoint6 link: db/000011.sst -> checkpoints/checkpoint6/000011.sst diff --git a/testdata/checkpoint_shared b/testdata/checkpoint_shared index fd264a5e7fc..6aff7d5ce3c 100644 --- a/testdata/checkpoint_shared +++ b/testdata/checkpoint_shared @@ -24,6 +24,10 @@ sync: db create: db/marker.format-version.000001.017 close: db/marker.format-version.000001.017 sync: db +create: db/marker.format-version.000002.018 +close: db/marker.format-version.000002.018 +remove: db/marker.format-version.000001.017 +sync: db create: db/temporary.000003.dbtmp sync: db/temporary.000003.dbtmp close: db/temporary.000003.dbtmp @@ -86,9 +90,9 @@ close: . 
open-dir: checkpoints/checkpoint1 link: db/OPTIONS-000003 -> checkpoints/checkpoint1/OPTIONS-000003 open-dir: checkpoints/checkpoint1 -create: checkpoints/checkpoint1/marker.format-version.000001.017 -sync-data: checkpoints/checkpoint1/marker.format-version.000001.017 -close: checkpoints/checkpoint1/marker.format-version.000001.017 +create: checkpoints/checkpoint1/marker.format-version.000001.018 +sync-data: checkpoints/checkpoint1/marker.format-version.000001.018 +close: checkpoints/checkpoint1/marker.format-version.000001.018 sync: checkpoints/checkpoint1 close: checkpoints/checkpoint1 open: db/MANIFEST-000001 (options: *vfs.sequentialReadsOption) @@ -135,9 +139,9 @@ close: checkpoints open-dir: checkpoints/checkpoint2 link: db/OPTIONS-000003 -> checkpoints/checkpoint2/OPTIONS-000003 open-dir: checkpoints/checkpoint2 -create: checkpoints/checkpoint2/marker.format-version.000001.017 -sync-data: checkpoints/checkpoint2/marker.format-version.000001.017 -close: checkpoints/checkpoint2/marker.format-version.000001.017 +create: checkpoints/checkpoint2/marker.format-version.000001.018 +sync-data: checkpoints/checkpoint2/marker.format-version.000001.018 +close: checkpoints/checkpoint2/marker.format-version.000001.018 sync: checkpoints/checkpoint2 close: checkpoints/checkpoint2 open: db/MANIFEST-000001 (options: *vfs.sequentialReadsOption) @@ -180,9 +184,9 @@ close: checkpoints open-dir: checkpoints/checkpoint3 link: db/OPTIONS-000003 -> checkpoints/checkpoint3/OPTIONS-000003 open-dir: checkpoints/checkpoint3 -create: checkpoints/checkpoint3/marker.format-version.000001.017 -sync-data: checkpoints/checkpoint3/marker.format-version.000001.017 -close: checkpoints/checkpoint3/marker.format-version.000001.017 +create: checkpoints/checkpoint3/marker.format-version.000001.018 +sync-data: checkpoints/checkpoint3/marker.format-version.000001.018 +close: checkpoints/checkpoint3/marker.format-version.000001.018 sync: checkpoints/checkpoint3 close: checkpoints/checkpoint3 open: db/MANIFEST-000001 (options: *vfs.sequentialReadsOption) @@ -239,7 +243,7 @@ LOCK MANIFEST-000001 OPTIONS-000003 REMOTE-OBJ-CATALOG-000001 -marker.format-version.000001.017 +marker.format-version.000002.018 marker.manifest.000001.MANIFEST-000001 marker.remote-obj-catalog.000001.REMOTE-OBJ-CATALOG-000001 @@ -249,7 +253,7 @@ list checkpoints/checkpoint1 MANIFEST-000001 OPTIONS-000003 REMOTE-OBJ-CATALOG-000001 -marker.format-version.000001.017 +marker.format-version.000001.018 marker.manifest.000001.MANIFEST-000001 marker.remote-obj-catalog.000001.REMOTE-OBJ-CATALOG-000001 @@ -301,7 +305,7 @@ list checkpoints/checkpoint2 MANIFEST-000001 OPTIONS-000003 REMOTE-OBJ-CATALOG-000001 -marker.format-version.000001.017 +marker.format-version.000001.018 marker.manifest.000001.MANIFEST-000001 marker.remote-obj-catalog.000001.REMOTE-OBJ-CATALOG-000001 diff --git a/testdata/concurrent_excise b/testdata/concurrent_excise index 270aaae5c89..281767fe90f 100644 --- a/testdata/concurrent_excise +++ b/testdata/concurrent_excise @@ -130,7 +130,7 @@ switch 1 ---- ok -# The below file-only snapshot should be errored out by the concurrent excise. +# The below file-only snapshot should not error out, despite a concurrent excise. batch set d something @@ -237,7 +237,7 @@ L6: build ext5 set bb something set b something -del b-c +del-range b c ---- lsm @@ -257,6 +257,19 @@ batch set bb new ---- +iter +first +next +next +next +next +---- +a: (new, .) +bb: (new, .) +d: (new, .) +e: (new, .) +. 
+ lsm ---- L0.0: @@ -266,9 +279,19 @@ L6: 000005:[a#10,SET-a#10,SET] 000006:[e#11,SET-e#11,SET] -ingest-and-excise ext5 excise=b-c contains-excise-tombstone +ingest-and-excise ext5 excise=b-c +---- + +iter +first +next +next +next ---- -flushable ingest +a: (new, .) +b: (something, .) +bb: (something, .) +d: (new, .) lsm ---- @@ -277,7 +300,7 @@ L0.0: 000010:[d#13,SET-e#14,SET] L6: 000005:[a#10,SET-a#10,SET] - 000012:[b#16,SET-bb#16,SET] + 000012:[b#17,RANGEDEL-c#inf,RANGEDEL] 000006:[e#11,SET-e#11,SET] unblock c1 @@ -288,7 +311,20 @@ compact a-z ---- ok +iter +first +next +next +next +next +---- +a: (new, .) +b: (something, .) +bb: (something, .) +d: (new, .) +e: (new, .) + lsm ---- L6: - 000016:[a#0,SET-e#0,SET] + 000015:[a#0,SET-e#0,SET] diff --git a/testdata/determinism b/testdata/determinism index f7a30ce981e..e1026ba04bd 100644 --- a/testdata/determinism +++ b/testdata/determinism @@ -58,11 +58,11 @@ set alpha 5 ---- 7:batch -ingest-and-excise contains-excise-tombstone excise=a-b ext-ab +ingest-and-excise excise=a-b ext-ab ---- 8:ingest-and-excise -ingest-and-excise contains-excise-tombstone excise=b-c ext-bc +ingest-and-excise excise=b-c ext-bc ---- 9:ingest-and-excise diff --git a/testdata/event_listener b/testdata/event_listener index 982bd9ce6ba..5d32f9a2d10 100644 --- a/testdata/event_listener +++ b/testdata/event_listener @@ -47,6 +47,11 @@ close: db/marker.format-version.000004.017 remove: db/marker.format-version.000003.016 sync: db upgraded to format version: 017 +create: db/marker.format-version.000005.018 +close: db/marker.format-version.000005.018 +remove: db/marker.format-version.000004.017 +sync: db +upgraded to format version: 018 create: db/temporary.000003.dbtmp sync: db/temporary.000003.dbtmp close: db/temporary.000003.dbtmp @@ -352,9 +357,9 @@ close: . open-dir: checkpoint link: db/OPTIONS-000003 -> checkpoint/OPTIONS-000003 open-dir: checkpoint -create: checkpoint/marker.format-version.000001.017 -sync-data: checkpoint/marker.format-version.000001.017 -close: checkpoint/marker.format-version.000001.017 +create: checkpoint/marker.format-version.000001.018 +sync-data: checkpoint/marker.format-version.000001.018 +close: checkpoint/marker.format-version.000001.018 sync: checkpoint close: checkpoint link: db/000013.sst -> checkpoint/000013.sst diff --git a/testdata/excise b/testdata/excise index b6492ee658e..ec3e0dff5d2 100644 --- a/testdata/excise +++ b/testdata/excise @@ -542,8 +542,7 @@ c: (something, .) . -# Test to verify that IngestAndExcise now uses flushableIngest when directed to -# do so using a flag that is passed down by the caller. +# Test to verify that IngestAndExcise now uses flushableIngest. 
reset ---- @@ -625,18 +624,18 @@ set f something del b-e ---- -ingest-and-excise ext5 excise=b-e contains-excise-tombstone +ingest-and-excise ext5 excise=b-e ---- -flushable ingest +memtable flushed lsm ---- L0.0: - 000010:[b#17,SET-f#17,SET] - 000013:[x#15,SET-x#15,SET] + 000010:[b#18,SET-f#18,SET] + 000012:[x#15,SET-x#15,SET] L6: - 000014(000009):[a#0,SET-a#0,SET] - 000015(000009):[g#0,SET-g#0,SET] + 000013(000009):[a#0,SET-a#0,SET] + 000014(000009):[g#0,SET-g#0,SET] iter lower=c upper=e last @@ -667,7 +666,7 @@ compact a z lsm ---- L6: - 000018:[a#0,SET-x#0,SET] + 000017:[a#0,SET-x#0,SET] batch commit ---- @@ -683,17 +682,17 @@ set f somethingElse del b-e ---- -ingest-and-excise ext5 ext6 excise=b-e contains-excise-tombstone +ingest-and-excise ext5 ext6 excise=b-e ---- lsm ---- L0.0: - 000019:[a#22,SET-a#22,SET] - 000020:[b#23,SET-f#23,SET] + 000018:[a#22,SET-a#22,SET] + 000019:[b#23,SET-f#23,SET] L6: - 000021(000018):[a#0,SET-aa#0,SET] - 000022(000018):[f#0,SET-x#0,SET] + 000020(000017):[a#0,SET-aa#0,SET] + 000021(000017):[f#0,SET-x#0,SET] iter lower=a upper=f first @@ -709,3 +708,74 @@ b: (somethingElse, .) c: (somethingElse, .) . . + +# Two ovelrapping ingestions wait on one another even if +# the overlap is only on the excise span. + +reset +---- + +batch +set a foo +set b bar +set bb neverseen +set c baz +---- + +build ext7 +set b foo +set c bar +---- + +ingest-and-excise ext7 excise=b-g no-wait +---- + +build ext8 +set d gee +set e fee +---- + +ingest ext8 no-wait +---- + +iter +first +next +next +next +next +next +---- +a: (foo, .) +b: (foo, .) +c: (bar, .) +d: (gee, .) +e: (fee, .) +. + +flush +---- + +lsm +---- +L0.0: + 000007(000006):[a#10,SET-a#10,SET] +L6: + 000004:[b#15,SET-c#15,SET] + 000008:[d#16,SET-e#16,SET] + + +iter +first +next +next +next +next +next +---- +a: (foo, .) +b: (foo, .) +c: (bar, .) +d: (gee, .) +e: (fee, .) +. diff --git a/testdata/flushable_ingest b/testdata/flushable_ingest index c5806832d06..8d19b96ffec 100644 --- a/testdata/flushable_ingest +++ b/testdata/flushable_ingest @@ -60,7 +60,7 @@ LOCK MANIFEST-000001 OPTIONS-000003 ext -marker.format-version.000004.017 +marker.format-version.000005.018 marker.manifest.000001.MANIFEST-000001 # Test basic WAL replay @@ -81,7 +81,7 @@ LOCK MANIFEST-000001 OPTIONS-000003 ext -marker.format-version.000004.017 +marker.format-version.000005.018 marker.manifest.000001.MANIFEST-000001 open @@ -94,6 +94,7 @@ L0.1: 000004:[a#11,SET-a#11,SET] L0.0: 000009:[a#10,SET-a#10,SET] +L6: 000005:[b#12,SET-b#12,SET] 000006:[d#13,SET-d#13,SET] @@ -389,7 +390,7 @@ LOCK MANIFEST-000001 OPTIONS-000003 ext -marker.format-version.000004.017 +marker.format-version.000005.018 marker.manifest.000001.MANIFEST-000001 close @@ -409,7 +410,7 @@ LOCK MANIFEST-000001 OPTIONS-000003 ext -marker.format-version.000004.017 +marker.format-version.000005.018 marker.manifest.000001.MANIFEST-000001 open @@ -422,9 +423,10 @@ L0.1: 000004:[a#11,SET-a#11,SET] L0.0: 000009:[a#10,SET-a#10,SET] + 000012:[f#14,SET-f#14,SET] +L6: 000005:[b#12,SET-b#12,SET] 000006:[d#13,SET-d#13,SET] - 000010:[f#14,SET-f#14,SET] # Check if the new mutable memtable is using a new log file, and that the # previous log files have been deleted appropriately after the flush. 
@@ -434,15 +436,15 @@ ls 000005.sst 000006.sst 000009.sst -000010.sst -000011.log +000012.sst +000014.log LOCK MANIFEST-000001 -MANIFEST-000012 -OPTIONS-000013 +MANIFEST-000011 +OPTIONS-000015 ext -marker.format-version.000004.017 -marker.manifest.000002.MANIFEST-000012 +marker.format-version.000005.018 +marker.manifest.000002.MANIFEST-000011 # Make sure that the new mutable memtable can accept writes. batch @@ -584,7 +586,7 @@ LOCK MANIFEST-000001 OPTIONS-000003 ext -marker.format-version.000004.017 +marker.format-version.000005.018 marker.manifest.000001.MANIFEST-000001 close @@ -603,7 +605,7 @@ MANIFEST-000001 OPTIONS-000003 ext ext1 -marker.format-version.000004.017 +marker.format-version.000005.018 marker.manifest.000001.MANIFEST-000001 open diff --git a/testdata/ingested_flushable_api b/testdata/ingested_flushable_api index 5d36168efe5..a54cca56b47 100644 --- a/testdata/ingested_flushable_api +++ b/testdata/ingested_flushable_api @@ -40,7 +40,7 @@ iter rangekeyIter ---- -d-g:{(#0,RANGEKEYSET,1,val1)} +d-g:{(#1,RANGEKEYSET,1,val1)} containsRangeKey ---- @@ -65,8 +65,8 @@ iter rangedelIter ---- -a-j:{(#0,RANGEDEL)} -o-z:{(#0,RANGEDEL)} +a-j:{(#2,RANGEDEL)} +o-z:{(#2,RANGEDEL)} rangekeyIter ---- @@ -93,15 +93,73 @@ flushable ext4 iter ---- -k#0,SET +k#3,SET rangekeyIter ---- -y-z:{(#0,RANGEKEYSET,1,val1)} +y-z:{(#3,RANGEKEYSET,1,val1)} rangedelIter ---- -a-j:{(#0,RANGEDEL)} +a-j:{(#3,RANGEDEL)} + +readyForFlush +---- +true + +containsRangeKey +---- +true + +build ext5 +set a aa +set k kk +range-key-set y z 1 val1 +---- + +flushable ext5 excise=a-g +---- + +iter +---- +a#5,SET +k#5,SET + +rangekeyIter +---- +a-g:{(#4,RANGEKEYDEL)} +y-z:{(#5,RANGEKEYSET,1,val1)} + +rangedelIter +---- +a-g:{(#4,RANGEDEL)} + +readyForFlush +---- +true + +containsRangeKey +---- +true + +build ext6 +range-key-set a c 1 val1 +---- + +flushable ext6 excise=d-g +---- + +iter +---- + +rangekeyIter +---- +a-c:{(#7,RANGEKEYSET,1,val1)} +d-g:{(#6,RANGEKEYDEL)} + +rangedelIter +---- +d-g:{(#6,RANGEDEL)} readyForFlush ---- diff --git a/tool/db.go b/tool/db.go index 51396f52d99..2f7a2bd5bd8 100644 --- a/tool/db.go +++ b/tool/db.go @@ -625,7 +625,7 @@ func (d *dbT) runExcise(cmd *cobra.Command, args []string) { return } - _, err = db.IngestAndExcise(context.Background(), []string{path}, nil, nil, span, true /* sstsContainExciseTombstone */) + _, err = db.IngestAndExcise(context.Background(), []string{path}, nil, nil, span) if err != nil { fmt.Fprintf(stderr, "Error excising: %s\n", err) return diff --git a/tool/wal.go b/tool/wal.go index 27a279b8504..4d0ffde5abd 100644 --- a/tool/wal.go +++ b/tool/wal.go @@ -271,6 +271,8 @@ func (w *walT) dumpBatch( case base.InternalKeyKindIngestSST: fileNum, _ := binary.Uvarint(ukey) fmt.Fprintf(stdout, "%s", base.FileNum(fileNum)) + case base.InternalKeyKindExcise: + fmt.Fprintf(stdout, "%s,%s", w.fmtKey.fn(ukey), w.fmtKey.fn(value)) case base.InternalKeyKindSingleDelete: fmt.Fprintf(stdout, "%s", w.fmtKey.fn(ukey)) case base.InternalKeyKindSetWithDelete: