diff --git a/batch.go b/batch.go index 556874e2eb..1c95d4ef97 100644 --- a/batch.go +++ b/batch.go @@ -552,6 +552,12 @@ func (b *Batch) refreshMemTableSize() error { } // This key kind doesn't contribute to the memtable size. continue + case InternalKeyKindExcise: + if b.minimumFormatMajorVersion < FormatFlushableIngestExcises { + b.minimumFormatMajorVersion = FormatFlushableIngestExcises + } + // This key kind doesn't contribute to the memtable size. + continue default: // Note In some circumstances this might be temporary memory // corruption that can be recovered by discarding the batch and @@ -608,7 +614,7 @@ func (b *Batch) Apply(batch *Batch, _ *WriteOptions) error { b.countRangeDels++ case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete: b.countRangeKeys++ - case InternalKeyKindIngestSST: + case InternalKeyKindIngestSST, InternalKeyKindExcise: panic("pebble: invalid key kind for batch") case InternalKeyKindLogData: // LogData does not contribute to memtable size. @@ -1193,6 +1199,28 @@ func (b *Batch) ingestSST(fileNum base.FileNum) { b.minimumFormatMajorVersion = FormatFlushableIngest } +// Excise adds the excise span for a flushable ingest containing an excise. The data +// will only be written to the WAL (not added to memtables or sstables). +func (b *Batch) excise(start, end []byte) { + if b.Empty() { + b.ingestedSSTBatch = true + } else if !b.ingestedSSTBatch { + // Batch contains other key kinds. + panic("pebble: invalid call to excise") + } + + origMemTableSize := b.memTableSize + b.prepareDeferredKeyValueRecord(len(start), len(end), InternalKeyKindExcise) + copy(b.deferredOp.Key, start) + copy(b.deferredOp.Value, end) + // Since excise writes only to the WAL and does not affect the memtable, + // we restore b.memTableSize to its original value. Note that Batch.count + // is not reset because for the InternalKeyKindIngestSST/Excise the count + // is the number of sstable paths which have been added to the batch. 
+ b.memTableSize = origMemTableSize + b.minimumFormatMajorVersion = FormatFlushableIngestExcises +} + // Empty returns true if the batch is empty, and false otherwise. func (b *Batch) Empty() bool { return batchrepr.IsEmpty(b.data) diff --git a/batchrepr/reader.go b/batchrepr/reader.go index 652e94b8b5..cceb2be397 100644 --- a/batchrepr/reader.go +++ b/batchrepr/reader.go @@ -97,7 +97,7 @@ func (r *Reader) Next() (kind base.InternalKeyKind, ukey []byte, value []byte, o switch kind { case base.InternalKeyKindSet, base.InternalKeyKindMerge, base.InternalKeyKindRangeDelete, base.InternalKeyKindRangeKeySet, base.InternalKeyKindRangeKeyUnset, base.InternalKeyKindRangeKeyDelete, - base.InternalKeyKindDeleteSized: + base.InternalKeyKindDeleteSized, base.InternalKeyKindExcise: *r, value, ok = DecodeStr(*r) if !ok { return 0, nil, nil, false, errors.Wrapf(ErrInvalidBatch, "decoding %s value", kind) diff --git a/data_test.go b/data_test.go index 3679a39dea..453a0e6948 100644 --- a/data_test.go +++ b/data_test.go @@ -1331,7 +1331,6 @@ func (d *DB) waitTableStats() { func runIngestAndExciseCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error { var exciseSpan KeyRange - var sstContainsExciseTombstone bool paths := make([]string, 0, len(td.CmdArgs)) for i, arg := range td.CmdArgs { switch td.CmdArgs[i].Key { @@ -1345,14 +1344,14 @@ func runIngestAndExciseCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error { } exciseSpan.Start = []byte(fields[0]) exciseSpan.End = []byte(fields[1]) - case "contains-excise-tombstone": - sstContainsExciseTombstone = true + case "no-wait": + // Handled by callers. 
default: paths = append(paths, arg.String()) } } - if _, err := d.IngestAndExcise(context.Background(), paths, nil /* shared */, nil /* external */, exciseSpan, sstContainsExciseTombstone); err != nil { + if _, err := d.IngestAndExcise(context.Background(), paths, nil /* shared */, nil /* external */, exciseSpan); err != nil { return err } return nil @@ -1361,6 +1360,10 @@ func runIngestAndExciseCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error { func runIngestCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error { paths := make([]string, 0, len(td.CmdArgs)) for _, arg := range td.CmdArgs { + if arg.Key == "no-wait" { + // Handled by callers. + continue + } paths = append(paths, arg.String()) } diff --git a/flushable.go b/flushable.go index c014bb9b11..41e58ae44b 100644 --- a/flushable.go +++ b/flushable.go @@ -164,6 +164,7 @@ type ingestedFlushable struct { // exciseSpan is populated if an excise operation should be performed during // flush. exciseSpan KeyRange + seqNum base.SeqNum } func newIngestedFlushable( @@ -172,6 +173,7 @@ func newIngestedFlushable( newIters tableNewIters, newRangeKeyIters keyspanimpl.TableNewSpanIter, exciseSpan KeyRange, + seqNum base.SeqNum, ) *ingestedFlushable { if invariants.Enabled { for i := 1; i < len(files); i++ { @@ -200,6 +202,7 @@ func newIngestedFlushable( slice: manifest.NewLevelSliceKeySorted(comparer.Compare, files), hasRangeKeys: hasRangeKeys, exciseSpan: exciseSpan, + seqNum: seqNum, } return ret @@ -245,30 +248,69 @@ func (s *ingestedFlushable) constructRangeDelIter( // TODO(sumeer): *IterOptions are being ignored, so the index block load for // the point iterator in constructRangeDeIter is not tracked. 
func (s *ingestedFlushable) newRangeDelIter(_ *IterOptions) keyspan.FragmentIterator { - return keyspanimpl.NewLevelIter( + liter := keyspanimpl.NewLevelIter( context.TODO(), keyspan.SpanIterOptions{}, s.comparer.Compare, s.constructRangeDelIter, s.slice.Iter(), manifest.FlushableIngestsLayer(), manifest.KeyTypePoint, ) + if !s.exciseSpan.Valid() { + return liter + } + // We have an excise span to weave into the rangedel iterators. + // + // TODO(bilal): should this be pooled? + miter := &keyspanimpl.MergingIter{} + rdel := keyspan.Span{ + Start: s.exciseSpan.Start, + End: s.exciseSpan.End, + Keys: []keyspan.Key{{Trailer: base.MakeTrailer(s.seqNum, base.InternalKeyKindRangeDelete)}}, + } + rdelIter := keyspan.NewIter(s.comparer.Compare, []keyspan.Span{rdel}) + miter.Init(s.comparer, keyspan.NoopTransform, new(keyspanimpl.MergingBuffers), liter, rdelIter) + return miter } // newRangeKeyIter is part of the flushable interface. func (s *ingestedFlushable) newRangeKeyIter(o *IterOptions) keyspan.FragmentIterator { - if !s.containsRangeKeys() { - return nil + var rkeydelIter keyspan.FragmentIterator + if s.exciseSpan.Valid() { + // We have an excise span to weave into the rangekey iterators. + rkeydel := keyspan.Span{ + Start: s.exciseSpan.Start, + End: s.exciseSpan.End, + Keys: []keyspan.Key{{Trailer: base.MakeTrailer(s.seqNum, base.InternalKeyKindRangeKeyDelete)}}, + } + rkeydelIter = keyspan.NewIter(s.comparer.Compare, []keyspan.Span{rkeydel}) + } + + if !s.hasRangeKeys { + if rkeydelIter == nil { + // NB: we have to return the nil literal as opposed to the nil + // value of rkeydelIter, otherwise callers of this function will + // have the return value fail == nil checks. + return nil + } + return rkeydelIter } - return keyspanimpl.NewLevelIter( + // TODO(bilal): should this be pooled? 
+ miter := &keyspanimpl.MergingIter{} + liter := keyspanimpl.NewLevelIter( context.TODO(), keyspan.SpanIterOptions{}, s.comparer.Compare, s.newRangeKeyIters, s.slice.Iter(), manifest.FlushableIngestsLayer(), manifest.KeyTypeRange, ) + miter.Init(s.comparer, keyspan.NoopTransform, new(keyspanimpl.MergingBuffers), liter) + if rkeydelIter != nil { + miter.AddLevel(rkeydelIter) + } + return miter } // containsRangeKeys is part of the flushable interface. func (s *ingestedFlushable) containsRangeKeys() bool { - return s.hasRangeKeys + return s.hasRangeKeys || s.exciseSpan.Valid() } // inuseBytes is part of the flushable interface. @@ -329,6 +371,10 @@ func (s *ingestedFlushable) anyFileOverlaps(bounds base.UserKeyBounds) bool { // checks above. return true } + if s.exciseSpan.Valid() { + uk := s.exciseSpan.UserKeyBounds() + return uk.Overlaps(s.comparer.Compare, &bounds) + } return false } diff --git a/flushable_test.go b/flushable_test.go index f8ee20580b..3be5fd90ba 100644 --- a/flushable_test.go +++ b/flushable_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "strings" "testing" "github.com/cockroachdb/datadriven" @@ -47,7 +48,7 @@ func TestIngestedSSTFlushableAPI(t *testing.T) { } reset() - loadFileMeta := func(paths []string) []*fileMetadata { + loadFileMeta := func(paths []string, exciseSpan KeyRange, seqNum base.SeqNum) []*fileMetadata { d.mu.Lock() pendingOutputs := make([]base.FileNum, len(paths)) for i := range paths { @@ -60,11 +61,17 @@ func TestIngestedSSTFlushableAPI(t *testing.T) { // not actually ingesting a file. 
lr, err := ingestLoad(context.Background(), d.opts, d.FormatMajorVersion(), paths, nil, nil, d.cacheID, pendingOutputs) if err != nil { - panic(err) + t.Fatal(err) } meta := make([]*fileMetadata, len(lr.local)) + if exciseSpan.Valid() { + seqNum++ + } for i := range meta { meta[i] = lr.local[i].fileMetadata + if err := setSeqNumInMetadata(meta[i], seqNum+base.SeqNum(i), d.cmp, d.opts.Comparer.FormatKey); err != nil { + t.Fatal(err) + } } if len(meta) == 0 { // All of the sstables to be ingested were empty. Nothing to do. @@ -77,8 +84,8 @@ func TestIngestedSSTFlushableAPI(t *testing.T) { } // Verify the sstables do not overlap. - if err := ingestSortAndVerify(d.cmp, lr, KeyRange{}); err != nil { - panic("unsorted sstables") + if err := ingestSortAndVerify(d.cmp, lr, exciseSpan); err != nil { + t.Fatal(err) } // Hard link the sstables into the DB directory. Since the sstables aren't @@ -87,7 +94,7 @@ func TestIngestedSSTFlushableAPI(t *testing.T) { // fall back to copying, and if that fails we undo our work and return an // error. if err := ingestLinkLocal(context.Background(), jobID, d.opts, d.objProvider, lr.local); err != nil { - panic("couldn't hard link sstables") + t.Fatal(err) } // Fsync the directory we added the tables to. We need to do this at some @@ -95,12 +102,13 @@ func TestIngestedSSTFlushableAPI(t *testing.T) { // can have the tables referenced in the MANIFEST, but not present in the // directory. if err := d.dataDir.Sync(); err != nil { - panic("Couldn't sync data directory") + t.Fatal(err) } return meta } + var seqNum uint64 datadriven.RunTest(t, "testdata/ingested_flushable_api", func(t *testing.T, td *datadriven.TestData) string { switch td.Cmd { case "reset": @@ -114,12 +122,26 @@ func TestIngestedSSTFlushableAPI(t *testing.T) { case "flushable": // Creates an ingestedFlushable over the input files. 
paths := make([]string, 0, len(td.CmdArgs)) + var exciseSpan KeyRange + startSeqNum := base.SeqNum(seqNum) for _, arg := range td.CmdArgs { - paths = append(paths, arg.String()) + switch arg.Key { + case "excise": + parts := strings.Split(arg.Vals[0], "-") + if len(parts) != 2 { + return fmt.Sprintf("invalid excise range: %s", arg.Vals[0]) + } + exciseSpan.Start = []byte(parts[0]) + exciseSpan.End = []byte(parts[1]) + seqNum++ + default: + paths = append(paths, arg.String()) + seqNum++ + } } - meta := loadFileMeta(paths) - flushable = newIngestedFlushable(meta, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter, KeyRange{}) + meta := loadFileMeta(paths, exciseSpan, startSeqNum) + flushable = newIngestedFlushable(meta, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter, exciseSpan, base.SeqNum(startSeqNum)) return "" case "iter": iter := flushable.newIter(nil) diff --git a/format_major_version.go b/format_major_version.go index 90ef9f0b6a..dd62f32ff4 100644 --- a/format_major_version.go +++ b/format_major_version.go @@ -185,6 +185,12 @@ const ( // fields in the Manifest and thus requires a format major version. FormatSyntheticPrefixSuffix + // FormatFlushableIngestExcises is a format major version that adds support for + // having excises unconditionally being written as flushable ingestions. This + // is implemented through adding a new key kind that can go in the same batches + // as flushable ingested sstables. 
+ FormatFlushableIngestExcises + // TODO(msbutler): add major version for synthetic suffixes // -- Add new versions here -- @@ -222,7 +228,8 @@ func (v FormatMajorVersion) MaxTableFormat() sstable.TableFormat { switch v { case FormatDefault, FormatFlushableIngest, FormatPrePebblev1MarkedCompacted: return sstable.TableFormatPebblev3 - case FormatDeleteSizedAndObsolete, FormatVirtualSSTables, FormatSyntheticPrefixSuffix: + case FormatDeleteSizedAndObsolete, FormatVirtualSSTables, FormatSyntheticPrefixSuffix, + FormatFlushableIngestExcises: return sstable.TableFormatPebblev4 default: panic(fmt.Sprintf("pebble: unsupported format major version: %s", v)) @@ -234,7 +241,8 @@ func (v FormatMajorVersion) MaxTableFormat() sstable.TableFormat { func (v FormatMajorVersion) MinTableFormat() sstable.TableFormat { switch v { case FormatDefault, FormatFlushableIngest, FormatPrePebblev1MarkedCompacted, - FormatDeleteSizedAndObsolete, FormatVirtualSSTables, FormatSyntheticPrefixSuffix: + FormatDeleteSizedAndObsolete, FormatVirtualSSTables, FormatSyntheticPrefixSuffix, + FormatFlushableIngestExcises: return sstable.TableFormatPebblev1 default: panic(fmt.Sprintf("pebble: unsupported format major version: %s", v)) @@ -271,6 +279,9 @@ var formatMajorVersionMigrations = map[FormatMajorVersion]func(*DB) error{ FormatSyntheticPrefixSuffix: func(d *DB) error { return d.finalizeFormatVersUpgrade(FormatSyntheticPrefixSuffix) }, + FormatFlushableIngestExcises: func(d *DB) error { + return d.finalizeFormatVersUpgrade(FormatFlushableIngestExcises) + }, } const formatVersionMarkerName = `format-version` diff --git a/format_major_version_test.go b/format_major_version_test.go index 3cf01976c4..364f36197c 100644 --- a/format_major_version_test.go +++ b/format_major_version_test.go @@ -24,11 +24,12 @@ func TestFormatMajorVersionStableValues(t *testing.T) { require.Equal(t, FormatDeleteSizedAndObsolete, FormatMajorVersion(15)) require.Equal(t, FormatVirtualSSTables, FormatMajorVersion(16)) 
require.Equal(t, FormatSyntheticPrefixSuffix, FormatMajorVersion(17)) + require.Equal(t, FormatFlushableIngestExcises, FormatMajorVersion(18)) // When we add a new version, we should add a check for the new version in // addition to updating these expected values. - require.Equal(t, FormatNewest, FormatMajorVersion(17)) - require.Equal(t, internalFormatNewest, FormatMajorVersion(17)) + require.Equal(t, FormatNewest, FormatMajorVersion(18)) + require.Equal(t, internalFormatNewest, FormatMajorVersion(18)) } func TestFormatMajorVersion_MigrationDefined(t *testing.T) { @@ -53,6 +54,8 @@ func TestRatchetFormat(t *testing.T) { require.Equal(t, FormatVirtualSSTables, d.FormatMajorVersion()) require.NoError(t, d.RatchetFormatMajorVersion(FormatSyntheticPrefixSuffix)) require.Equal(t, FormatSyntheticPrefixSuffix, d.FormatMajorVersion()) + require.NoError(t, d.RatchetFormatMajorVersion(FormatFlushableIngestExcises)) + require.Equal(t, FormatFlushableIngestExcises, d.FormatMajorVersion()) require.NoError(t, d.Close()) @@ -203,6 +206,7 @@ func TestFormatMajorVersions_TableFormat(t *testing.T) { FormatDeleteSizedAndObsolete: {sstable.TableFormatPebblev1, sstable.TableFormatPebblev4}, FormatVirtualSSTables: {sstable.TableFormatPebblev1, sstable.TableFormatPebblev4}, FormatSyntheticPrefixSuffix: {sstable.TableFormatPebblev1, sstable.TableFormatPebblev4}, + FormatFlushableIngestExcises: {sstable.TableFormatPebblev1, sstable.TableFormatPebblev4}, } // Valid versions. diff --git a/ingest.go b/ingest.go index f43c1a8e53..fdfe93117d 100644 --- a/ingest.go +++ b/ingest.go @@ -141,7 +141,7 @@ func ingestSynthesizeShared( // a.RANGEDEL.100, with a.RANGEDEL.100 being the smallest key. To create a // correct bound, we just use the maximum key kind (which sorts first). // Similarly, we use the smallest key kind for the largest key. 
- smallestPointKey := base.MakeInternalKey(sm.SmallestPointKey.UserKey, 0, base.InternalKeyKindMax) + smallestPointKey := base.MakeInternalKey(sm.SmallestPointKey.UserKey, 0, base.InternalKeyKindMaxForSstable) largestPointKey := base.MakeInternalKey(sm.LargestPointKey.UserKey, 0, 0) if sm.LargestPointKey.IsExclusiveSentinel() { largestPointKey = base.MakeRangeDeleteSentinelKey(sm.LargestPointKey.UserKey) @@ -217,12 +217,12 @@ func ingestLoad1External( if e.EndKeyIsInclusive { meta.ExtendPointKeyBounds( opts.Comparer.Compare, - base.MakeInternalKey(smallestCopy, 0, InternalKeyKindMax), + base.MakeInternalKey(smallestCopy, 0, base.InternalKeyKindMaxForSstable), base.MakeInternalKey(largestCopy, 0, 0)) } else { meta.ExtendPointKeyBounds( opts.Comparer.Compare, - base.MakeInternalKey(smallestCopy, 0, InternalKeyKindMax), + base.MakeInternalKey(smallestCopy, 0, base.InternalKeyKindMaxForSstable), base.MakeRangeDeleteSentinelKey(largestCopy)) } } @@ -1038,7 +1038,7 @@ func (d *DB) Ingest(ctx context.Context, paths []string) error { if d.opts.ReadOnly { return ErrReadOnly } - _, err := d.ingest(ctx, paths, nil /* shared */, KeyRange{}, false, nil /* external */) + _, err := d.ingest(ctx, paths, nil /* shared */, KeyRange{}, nil /* external */) return err } @@ -1126,7 +1126,7 @@ func (d *DB) IngestWithStats(ctx context.Context, paths []string) (IngestOperati if d.opts.ReadOnly { return IngestOperationStats{}, ErrReadOnly } - return d.ingest(ctx, paths, nil, KeyRange{}, false, nil) + return d.ingest(ctx, paths, nil, KeyRange{}, nil) } // IngestExternalFiles does the same as IngestWithStats, and additionally @@ -1146,7 +1146,7 @@ func (d *DB) IngestExternalFiles( if d.opts.Experimental.RemoteStorage == nil { return IngestOperationStats{}, errors.New("pebble: cannot ingest external files without shared storage configured") } - return d.ingest(ctx, nil, nil, KeyRange{}, false, external) + return d.ingest(ctx, nil, nil, KeyRange{}, external) } // IngestAndExcise does the same 
as IngestWithStats, and additionally accepts a @@ -1165,7 +1165,6 @@ func (d *DB) IngestAndExcise( shared []SharedSSTMeta, external []ExternalFile, exciseSpan KeyRange, - sstsContainExciseTombstone bool, ) (IngestOperationStats, error) { if err := d.closed.Load(); err != nil { panic(err) @@ -1188,13 +1187,24 @@ func (d *DB) IngestAndExcise( v, FormatMinForSharedObjects, ) } - return d.ingest(ctx, paths, shared, exciseSpan, sstsContainExciseTombstone, external) + return d.ingest(ctx, paths, shared, exciseSpan, external) } // Both DB.mu and commitPipeline.mu must be held while this is called. func (d *DB) newIngestedFlushableEntry( meta []*fileMetadata, seqNum base.SeqNum, logNum base.DiskFileNum, exciseSpan KeyRange, ) (*flushableEntry, error) { + // If there's an excise being done atomically with the same ingest, we + // assign the lowest sequence number in the set of sequence numbers for this + // ingestion to the excise. Note that we've already allocated fileCount+1 + // sequence numbers in this case. + // + // This mimics the behaviour in the non-flushable ingest case (see the callsite + // for ingestUpdateSeqNum). + fileSeqNumStart := seqNum + if exciseSpan.Valid() { + fileSeqNumStart = seqNum + 1 // the first seqNum is reserved for the excise. + } // Update the sequence number for all of the sstables in the // metadata. Writing the metadata to the manifest when the // version edit is applied is the mechanism that persists the @@ -1204,12 +1214,12 @@ func (d *DB) newIngestedFlushableEntry( // time, then we'll lose the ingest sequence number information. But this // information will also be reconstructed on node restart. 
for i, m := range meta { - if err := setSeqNumInMetadata(m, seqNum+base.SeqNum(i), d.cmp, d.opts.Comparer.FormatKey); err != nil { + if err := setSeqNumInMetadata(m, fileSeqNumStart+base.SeqNum(i), d.cmp, d.opts.Comparer.FormatKey); err != nil { return nil, err } } - f := newIngestedFlushable(meta, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter, exciseSpan) + f := newIngestedFlushable(meta, d.opts.Comparer, d.newIters, d.tableNewRangeKeyIter, exciseSpan, seqNum) // NB: The logNum/seqNum are the WAL number which we're writing this entry // to and the sequence number within the WAL which we'll write this entry @@ -1243,6 +1253,9 @@ func (d *DB) handleIngestAsFlushable( meta []*fileMetadata, seqNum base.SeqNum, exciseSpan KeyRange, ) error { b := d.NewBatch() + if exciseSpan.Valid() { + b.excise(exciseSpan.Start, exciseSpan.End) + } for _, m := range meta { b.ingestSST(m.FileNum) } @@ -1272,6 +1285,11 @@ func (d *DB) handleIngestAsFlushable( d.mu.Lock() } + // The excise span is going to outlive this ingestion call. Copy it. 
+ exciseSpan = KeyRange{ + Start: slices.Clone(exciseSpan.Start), + End: slices.Clone(exciseSpan.End), + } entry, err := d.newIngestedFlushableEntry(meta, seqNum, logNum, exciseSpan) if err != nil { return err @@ -1314,7 +1332,6 @@ func (d *DB) ingest( paths []string, shared []SharedSSTMeta, exciseSpan KeyRange, - sstsContainExciseTombstone bool, external []ExternalFile, ) (IngestOperationStats, error) { if len(shared) > 0 && d.opts.Experimental.RemoteStorage == nil { @@ -1524,17 +1541,16 @@ func (d *DB) ingest( hasRemoteFiles := len(shared) > 0 || len(external) > 0 canIngestFlushable := d.FormatMajorVersion() >= FormatFlushableIngest && (len(d.mu.mem.queue) < d.opts.MemTableStopWritesThreshold) && - !d.opts.Experimental.DisableIngestAsFlushable() && !hasRemoteFiles + !d.opts.Experimental.DisableIngestAsFlushable() && !hasRemoteFiles && + (!exciseSpan.Valid() || d.FormatMajorVersion() >= FormatFlushableIngestExcises) - if !canIngestFlushable || (exciseSpan.Valid() && !sstsContainExciseTombstone) { + if !canIngestFlushable { // We're not able to ingest as a flushable, // so we must synchronously flush. // - // TODO(bilal): Currently, if any of the files being ingested are shared or - // there's an excise span present, we cannot use flushable ingests and need - // to wait synchronously. Either remove this caveat by fleshing out - // flushable ingest logic to also account for these cases, or remove this - // comment. Tracking issue: https://github.com/cockroachdb/pebble/issues/2676 + // TODO(bilal): Currently, if any of the files being ingested are shared, + // we cannot use flushable ingests and need + // to wait synchronously. 
if mem.flushable == d.mu.mem.mutable { err = d.makeRoomForWrite(nil) } diff --git a/ingest_test.go b/ingest_test.go index de39271742..b80112a415 100644 --- a/ingest_test.go +++ b/ingest_test.go @@ -431,7 +431,7 @@ func TestOverlappingIngestedSSTs(t *testing.T) { ) cache := NewCache(0) defer func() { - if !closed { + if d != nil && !closed { require.NoError(t, d.Close()) } cache.Unref() @@ -443,6 +443,7 @@ func TestOverlappingIngestedSSTs(t *testing.T) { reset := func(strictMem bool) { if d != nil && !closed { require.NoError(t, d.Close()) + d = nil } blockFlush = false @@ -710,9 +711,19 @@ func TestExcise(t *testing.T) { case "ingest": flushed = false + noWait := false + for i := range td.CmdArgs { + switch td.CmdArgs[i].Key { + case "no-wait": + noWait = true + } + } if err := runIngestCmd(td, d, mem); err != nil { return err.Error() } + if noWait { + return "" + } // Wait for a possible flush. d.mu.Lock() for d.mu.compact.flushing { @@ -726,12 +737,26 @@ func TestExcise(t *testing.T) { case "ingest-and-excise": flushed = false - d.mu.Lock() - prevFlushableIngests := d.mu.versions.metrics.Flush.AsIngestCount - d.mu.Unlock() + noWait := false + for i := range td.CmdArgs { + switch td.CmdArgs[i].Key { + case "no-wait": + noWait = true + } + } + var prevFlushableIngests uint64 + if !noWait { + d.mu.Lock() + prevFlushableIngests = d.mu.versions.metrics.Flush.AsIngestCount + d.mu.Unlock() + } + if err := runIngestAndExciseCmd(td, d, mem); err != nil { return err.Error() } + if noWait { + return "" + } // Wait for a possible flush. 
d.mu.Lock() for d.mu.compact.flushing { @@ -1127,7 +1152,7 @@ func testIngestSharedImpl( require.NoError(t, err) require.NoError(t, w.Close()) - _, err = to.IngestAndExcise(context.Background(), []string{sstPath}, sharedSSTs, nil /* external */, KeyRange{Start: startKey, End: endKey}, false) + _, err = to.IngestAndExcise(context.Background(), []string{sstPath}, sharedSSTs, nil /* external */, KeyRange{Start: startKey, End: endKey}) require.NoError(t, err) return fmt.Sprintf("replicated %d shared SSTs", len(sharedSSTs)) @@ -1358,7 +1383,7 @@ func TestSimpleIngestShared(t *testing.T) { } _, err = d.IngestAndExcise( context.Background(), []string{}, []SharedSSTMeta{sharedSSTMeta}, nil, /* external */ - KeyRange{Start: []byte("d"), End: []byte("ee")}, false) + KeyRange{Start: []byte("d"), End: []byte("ee")}) require.NoError(t, err) // TODO(bilal): Once reading of shared sstables is in, verify that the values @@ -1628,7 +1653,7 @@ func TestConcurrentExcise(t *testing.T) { require.NoError(t, err) require.NoError(t, w.Close()) - _, err = to.IngestAndExcise(context.Background(), []string{sstPath}, sharedSSTs, nil, KeyRange{Start: startKey, End: endKey}, false) + _, err = to.IngestAndExcise(context.Background(), []string{sstPath}, sharedSSTs, nil, KeyRange{Start: startKey, End: endKey}) require.NoError(t, err) return fmt.Sprintf("replicated %d shared SSTs", len(sharedSSTs)) @@ -2063,7 +2088,7 @@ func TestIngestExternal(t *testing.T) { ) require.NoError(t, err) require.NoError(t, w.Close()) - _, err = to.IngestAndExcise(context.Background(), []string{sstPath}, nil /* shared */, externalFiles, KeyRange{Start: startKey, End: endKey}, false) + _, err = to.IngestAndExcise(context.Background(), []string{sstPath}, nil /* shared */, externalFiles, KeyRange{Start: startKey, End: endKey}) require.NoError(t, err) return fmt.Sprintf("replicated %d external SSTs", len(externalFiles)) diff --git a/internal.go b/internal.go index 3e32434717..d6997c6bce 100644 --- a/internal.go +++ 
b/internal.go @@ -29,6 +29,7 @@ const ( InternalKeyKindRangeKeyMax = base.InternalKeyKindRangeKeyMax InternalKeyKindIngestSST = base.InternalKeyKindIngestSST InternalKeyKindDeleteSized = base.InternalKeyKindDeleteSized + InternalKeyKindExcise = base.InternalKeyKindExcise InternalKeyKindInvalid = base.InternalKeyKindInvalid ) diff --git a/internal/base/internal.go b/internal/base/internal.go index e364508ba0..76b426d8ce 100644 --- a/internal/base/internal.go +++ b/internal/base/internal.go @@ -126,8 +126,8 @@ const ( // InternalKeyKindIngestSST is used to distinguish a batch that corresponds to // the WAL entry for ingested sstables that are added to the flushable - // queue. This InternalKeyKind cannot appear, amongst other key kinds in a - // batch, or in an sstable. + // queue. This InternalKeyKind cannot appear amongst other key kinds in a + // batch (with the exception of alongside InternalKeyKindExcise), or in an sstable. InternalKeyKindIngestSST InternalKeyKind = 22 // InternalKeyKindDeleteSized keys behave identically to @@ -137,6 +137,14 @@ const ( // heuristics, but is not required to be accurate for correctness. InternalKeyKindDeleteSized InternalKeyKind = 23 + // InternalKeyKindExcise is used to persist the Excise part of an IngestAndExcise + // to a WAL. An Excise is similar to a RangeDel+RangeKeyDel combined, in that it + // deletes all point and range keys in a given key range while also immediately + // truncating sstables to exclude this key span. This InternalKeyKind cannot + // appear amongst other key kinds in a batch (with the exception of alongside + // InternalKeyKindIngestSST), or in an sstable. + InternalKeyKindExcise InternalKeyKind = 24 + // This maximum value isn't part of the file format. Future extensions may // increase this value. // @@ -146,7 +154,13 @@ const ( // which sorts 'less than or equal to' any other valid internalKeyKind, when // searching for any kind of internal key formed by a certain user key and // seqNum. 
- InternalKeyKindMax InternalKeyKind = 23 + InternalKeyKindMax InternalKeyKind = 24 + + // InternalKeyKindMaxForSstable is the largest valid key kind that can exist + // in an sstable. This should usually equal InternalKeyKindMax, except + // if the current InternalKeyKindMax is a kind that is never added to an + // sstable or memtable (eg. InternalKeyKindExcise). + InternalKeyKindMaxForSstable InternalKeyKind = InternalKeyKindDeleteSized // Internal to the sstable format. Not exposed by any sstable iterator. // Declared here to prevent definition of valid key kinds that set this bit. @@ -191,6 +205,7 @@ var internalKeyKindNames = []string{ InternalKeyKindRangeKeyDelete: "RANGEKEYDEL", InternalKeyKindIngestSST: "INGESTSST", InternalKeyKindDeleteSized: "DELSIZED", + InternalKeyKindExcise: "EXCISE", InternalKeyKindInvalid: "INVALID", } @@ -290,6 +305,7 @@ var kindsMap = map[string]InternalKeyKind{ "RANGEKEYDEL": InternalKeyKindRangeKeyDelete, "INGESTSST": InternalKeyKindIngestSST, "DELSIZED": InternalKeyKindDeleteSized, + "EXCISE": InternalKeyKindExcise, } // ParseInternalKey parses the string representation of an internal key. The diff --git a/internal/base/internal_test.go b/internal/base/internal_test.go index d06b789741..626a7ec86f 100644 --- a/internal/base/internal_test.go +++ b/internal/base/internal_test.go @@ -41,7 +41,7 @@ func TestInvalidInternalKey(t *testing.T) { "\x01\x02\x03\x04\x05\x06\x07", "foo", "foo\x08\x07\x06\x05\x04\x03\x02", - "foo\x18\x07\x06\x05\x04\x03\x02\x01", + "foo\x19\x07\x06\x05\x04\x03\x02\x01", } for _, tc := range testCases { k := DecodeInternalKey([]byte(tc)) diff --git a/mem_table.go b/mem_table.go index d2a08421ca..277a83537d 100644 --- a/mem_table.go +++ b/mem_table.go @@ -230,8 +230,8 @@ func (m *memTable) apply(batch *Batch, seqNum base.SeqNum) error { // Don't increment seqNum for LogData, since these are not applied // to the memtable. 
seqNum-- - case InternalKeyKindIngestSST: - panic("pebble: cannot apply ingested sstable key kind to memtable") + case InternalKeyKindIngestSST, InternalKeyKindExcise: + panic("pebble: cannot apply ingested sstable or excise kind keys to memtable") default: err = ins.Add(&m.skl, ikey, value) } diff --git a/metamorphic/generator.go b/metamorphic/generator.go index a582935622..cb40aab21f 100644 --- a/metamorphic/generator.go +++ b/metamorphic/generator.go @@ -1300,12 +1300,11 @@ func (g *generator) writerIngestAndExcise() { } g.add(&ingestAndExciseOp{ - dbID: dbID, - batchID: batchID, - derivedDBID: derivedDBID, - exciseStart: start, - exciseEnd: end, - sstContainsExciseTombstone: g.rng.Intn(2) == 0, + dbID: dbID, + batchID: batchID, + derivedDBID: derivedDBID, + exciseStart: start, + exciseEnd: end, }) } diff --git a/metamorphic/ops.go b/metamorphic/ops.go index 1033738945..7c5ff02f02 100644 --- a/metamorphic/ops.go +++ b/metamorphic/ops.go @@ -879,11 +879,10 @@ func (o *ingestOp) keys() []*[]byte { return nil } func (o *ingestOp) diagramKeyRanges() []pebble.KeyRange { return nil } type ingestAndExciseOp struct { - dbID objID - batchID objID - derivedDBID objID - exciseStart, exciseEnd []byte - sstContainsExciseTombstone bool + dbID objID + batchID objID + derivedDBID objID + exciseStart, exciseEnd []byte } func (o *ingestAndExciseOp) run(t *Test, h historyRecorder) { @@ -899,13 +898,6 @@ func (o *ingestAndExciseOp) run(t *Test, h historyRecorder) { return } - if o.sstContainsExciseTombstone { - // Add a rangedel and rangekeydel to the batch. This ensures it'll end up - // inside the sstable. Note that all entries in the sstable will have the - // same sequence number, so the ordering within the batch doesn't matter. 
- err = firstError(err, b.DeleteRange(o.exciseStart, o.exciseEnd, t.writeOpts)) - err = firstError(err, b.RangeKeyDelete(o.exciseStart, o.exciseEnd, t.writeOpts)) - } path, writerMeta, err2 := buildForIngest(t, o.dbID, b, 0 /* i */) if err2 != nil { h.Recordf("Build(%s) // %v", o.batchID, err2) @@ -923,7 +915,7 @@ func (o *ingestAndExciseOp) run(t *Test, h historyRecorder) { _, err := db.IngestAndExcise(context.Background(), []string{path}, nil /* shared */, nil /* external */, pebble.KeyRange{ Start: o.exciseStart, End: o.exciseEnd, - }, o.sstContainsExciseTombstone) + }) return err })) } else { @@ -956,7 +948,7 @@ func (o *ingestAndExciseOp) syncObjs() objIDSlice { } func (o *ingestAndExciseOp) String() string { - return fmt.Sprintf("%s.IngestAndExcise(%s, %q, %q, %t /* sstContainsExciseTombstone */)", o.dbID, o.batchID, o.exciseStart, o.exciseEnd, o.sstContainsExciseTombstone) + return fmt.Sprintf("%s.IngestAndExcise(%s, %q, %q)", o.dbID, o.batchID, o.exciseStart, o.exciseEnd) } func (o *ingestAndExciseOp) keys() []*[]byte { @@ -1976,7 +1968,7 @@ func (r *replicateOp) runSharedReplicate( return } - _, err = dest.IngestAndExcise(context.Background(), []string{sstPath}, sharedSSTs, nil /* external */, pebble.KeyRange{Start: r.start, End: r.end}, false) + _, err = dest.IngestAndExcise(context.Background(), []string{sstPath}, sharedSSTs, nil /* external */, pebble.KeyRange{Start: r.start, End: r.end}) h.Recordf("%s // %v", r, err) } @@ -2039,7 +2031,7 @@ func (r *replicateOp) runExternalReplicate( return } - _, err = dest.IngestAndExcise(context.Background(), []string{sstPath}, nil, externalSSTs /* external */, pebble.KeyRange{Start: r.start, End: r.end}, false /* sstContainsExciseTombstone */) + _, err = dest.IngestAndExcise(context.Background(), []string{sstPath}, nil, externalSSTs /* external */, pebble.KeyRange{Start: r.start, End: r.end}) h.Recordf("%s // %v", r, err) } diff --git a/metamorphic/parser.go b/metamorphic/parser.go index 833c3b4534..46924a7787 
100644 --- a/metamorphic/parser.go +++ b/metamorphic/parser.go @@ -76,7 +76,7 @@ func opArgs(op op) (receiverID *objID, targetID *objID, args []interface{}) { case *ingestOp: return &t.dbID, nil, []interface{}{&t.batchIDs} case *ingestAndExciseOp: - return &t.dbID, nil, []interface{}{&t.batchID, &t.exciseStart, &t.exciseEnd, &t.sstContainsExciseTombstone} + return &t.dbID, nil, []interface{}{&t.batchID, &t.exciseStart, &t.exciseEnd} case *ingestExternalFilesOp: return &t.dbID, nil, []interface{}{&t.objs} case *initOp: diff --git a/open.go b/open.go index 7bf96b1ad1..031fd92d13 100644 --- a/open.go +++ b/open.go @@ -502,6 +502,9 @@ func Open(dirname string, opts *Options) (db *DB, err error) { // 20.1 do not guarantee that closed WALs end cleanly. But the earliest // compatible Pebble format is newer and guarantees a clean EOF. strictWALTail := i < len(replayWALs)-1 + // NB: replayWAL can individually apply version edits, in which case it'll + // zero-out ve internally before reusing it. We need to empty out toFlush + // if it did that. flush, maxSeqNum, err := d.replayWAL(jobID, &ve, lf, strictWALTail) if err != nil { return nil, err @@ -511,6 +514,12 @@ func Open(dirname string, opts *Options) (db *DB, err error) { d.mu.versions.logSeqNum.Store(maxSeqNum) } } + if d.mu.mem.mutable == nil && !d.opts.ReadOnly { + // Recreate the mutable memtable if replayWAL got rid of it. 
+ var entry *flushableEntry + d.mu.mem.mutable, entry = d.newMemTable(d.mu.versions.getNextDiskFileNum(), d.mu.versions.logSeqNum.Load(), 0 /* minSize */) + d.mu.mem.queue = append(d.mu.mem.queue, entry) + } d.mu.versions.visibleSeqNum.Store(d.mu.versions.logSeqNum.Load()) if !d.opts.ReadOnly { @@ -851,11 +860,28 @@ func (d *DB) replayWAL( if err != nil { return err } - newVE, _, err := d.runCompaction(jobID, c) + var newVE *versionEdit + if c.kind == compactionKindIngestedFlushable { + d.mu.versions.logLock() + newVE, err = d.runIngestFlush(c) + d.mu.versions.logUnlock() + } else { + newVE, _, err = d.runCompaction(jobID, c) + } if err != nil { return errors.Wrapf(err, "running compaction during WAL replay") } ve.NewFiles = append(ve.NewFiles, newVE.NewFiles...) + if newVE.DeletedFiles != nil { + if ve.DeletedFiles == nil { + ve.DeletedFiles = make(map[manifest.DeletedFileEntry]*manifest.FileMetadata) + } + for i := range newVE.DeletedFiles { + ve.DeletedFiles[i] = newVE.DeletedFiles[i] + } + } + ve.CreatedBackingTables = append(ve.CreatedBackingTables, newVE.CreatedBackingTables...) + ve.RemovedBackingTables = append(ve.RemovedBackingTables, newVE.RemovedBackingTables...) return nil } defer func() { @@ -905,31 +931,101 @@ func (d *DB) replayWAL( batchesReplayed++ { br := b.Reader() - if kind, encodedFileNum, _, ok, err := br.Next(); err != nil { + if kind, encodedFileNum, val, ok, err := br.Next(); err != nil { return nil, 0, err - } else if ok && kind == InternalKeyKindIngestSST { + } else if ok && (kind == InternalKeyKindIngestSST || kind == InternalKeyKindExcise) { + // We're in the flushable ingests (+ possibly excises) case. + // + // Ingests require an up-to-date view of the LSM to determine the target + // level of ingested sstables, and to accurately compute excises. Outside + // of this case, this function avoids calling logAndApply after every flushable + // as regular (memtable) flushes only add files to L0 and don't need an + // up-to-date Version. 
+ // + // If we have an in-progress versionEdit, we need to apply it before we proceed + // with the flushable ingest. + if mem == nil && d.mu.mem.mutable != nil { + mem = d.mu.mem.mutable + entry = d.mu.mem.queue[len(d.mu.mem.queue)-1] + d.mu.mem.mutable = nil + d.mu.mem.queue = d.mu.mem.queue[:len(d.mu.mem.queue)-1] + } + if mem != nil { + mem.writerUnref() + } + flushMem() + // mem is nil here. + if !d.opts.ReadOnly && len(toFlush) > 0 { + if err := updateVE(); err != nil { + return nil, 0, err + } + } + if d.mu.versions.logSeqNum.Load() < maxSeqNum { + d.mu.versions.logSeqNum.Store(maxSeqNum) + } + d.mu.versions.visibleSeqNum.Store(d.mu.versions.logSeqNum.Load()) + if !d.opts.ReadOnly { + if len(ve.NewFiles) > 0 { + // Create an empty .log file. + newLogNum := d.mu.versions.getNextDiskFileNum() + ve.MinUnflushedLogNum = newLogNum + + // Create the manifest with the updated MinUnflushedLogNum before + // creating the new log file. If we created the log file first, a + // crash before the manifest is synced could leave two WALs with + // unclean tails. + d.mu.versions.logLock() + if err := d.mu.versions.logAndApply(jobID, ve, newFileMetrics(ve.NewFiles), false /* forceRotation */, func() []compactionInfo { + return nil + }); err != nil { + return nil, 0, err + } + } + for _, entry := range toFlush { + entry.readerUnrefLocked(true) + } + // We've already released the references to these memtables, so we clear + // them out of toFlush so the caller doesn't double-unref them. Plus we + // don't want to double-flush already flushed memtables in updateVE(). 
+ toFlush = toFlush[:0] + *ve = versionEdit{} + } fileNums := make([]base.DiskFileNum, 0, b.Count()) + var exciseSpan KeyRange addFileNum := func(encodedFileNum []byte) { fileNum, n := binary.Uvarint(encodedFileNum) if n <= 0 { - panic("pebble: ingest sstable file num is invalid.") + panic("pebble: ingest sstable file num is invalid") } fileNums = append(fileNums, base.DiskFileNum(fileNum)) } - addFileNum(encodedFileNum) + if kind == base.InternalKeyKindExcise { + exciseSpan.Start = slices.Clone(encodedFileNum) + exciseSpan.End = slices.Clone(val) + } else { + addFileNum(encodedFileNum) + } for i := 1; i < int(b.Count()); i++ { - kind, encodedFileNum, _, ok, err := br.Next() + kind, key, val, ok, err := br.Next() if err != nil { return nil, 0, err } - if kind != InternalKeyKindIngestSST { - panic("pebble: invalid batch key kind.") + if kind != InternalKeyKindIngestSST && kind != InternalKeyKindExcise { + panic("pebble: invalid batch key kind") } if !ok { - panic("pebble: invalid batch count.") + panic("pebble: invalid batch count") } - addFileNum(encodedFileNum) + if kind == base.InternalKeyKindExcise { + if exciseSpan.Valid() { + panic("pebble: multiple excise spans in a single batch") + } + exciseSpan.Start = slices.Clone(key) + exciseSpan.End = slices.Clone(val) + continue + } + addFileNum(key) } if _, _, _, ok, err := br.Next(); err != nil { @@ -969,11 +1065,15 @@ func (d *DB) replayWAL( } } - if uint32(len(meta)) != b.Count() { + numFiles := len(meta) + if exciseSpan.Valid() { + numFiles++ + } + if uint32(numFiles) != b.Count() { panic("pebble: couldn't load all files in WAL entry.") } - entry, err = d.newIngestedFlushableEntry(meta, seqNum, base.DiskFileNum(ll.Num), KeyRange{}) + entry, err = d.newIngestedFlushableEntry(meta, seqNum, base.DiskFileNum(ll.Num), exciseSpan) if err != nil { return nil, 0, err } @@ -991,35 +1091,8 @@ func (d *DB) replayWAL( d.mu.mem.mutable = nil } else { toFlush = append(toFlush, entry) - // During WAL replay, the lsm only has 
L0, hence, the - // baseLevel is 1. For the sake of simplicity, we place the - // ingested files in L0 here, instead of finding their - // target levels. This is a simplification for the sake of - // simpler code. It is expected that WAL replay should be - // rare, and that flushables of type ingestedFlushable - // should also be rare. So, placing the ingested files in L0 - // is alright. - // - // TODO(bananabrick): Maybe refactor this function to allow - // us to easily place ingested files in levels as low as - // possible during WAL replay. It would require breaking up - // the application of ve to the manifest into chunks and is - // not pretty w/o a refactor to this function and how it's - // used. - c, err := newFlush( - d.opts, d.mu.versions.currentVersion(), - 1, /* base level */ - []*flushableEntry{entry}, - d.timeNow(), - ) - if err != nil { - return nil, 0, err - } - for _, file := range c.flushing[0].flushable.(*ingestedFlushable).files { - ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: 0, Meta: file.FileMetadata}) - } } - return toFlush, maxSeqNum, nil + break } } diff --git a/open_test.go b/open_test.go index fff9024b4d..9fe40f67c6 100644 --- a/open_test.go +++ b/open_test.go @@ -331,7 +331,7 @@ func TestNewDBFilenames(t *testing.T) { "LOCK", "MANIFEST-000001", "OPTIONS-000003", - "marker.format-version.000004.017", + "marker.format-version.000005.018", "marker.manifest.000001.MANIFEST-000001", }, } diff --git a/testdata/checkpoint b/testdata/checkpoint index c323f8c0cd..f923019515 100644 --- a/testdata/checkpoint +++ b/testdata/checkpoint @@ -36,6 +36,10 @@ create: db/marker.format-version.000004.017 close: db/marker.format-version.000004.017 remove: db/marker.format-version.000003.016 sync: db +create: db/marker.format-version.000005.018 +close: db/marker.format-version.000005.018 +remove: db/marker.format-version.000004.017 +sync: db create: db/temporary.000003.dbtmp sync: db/temporary.000003.dbtmp close: db/temporary.000003.dbtmp @@ -98,9 
+102,9 @@ close: . open-dir: checkpoints/checkpoint1 link: db/OPTIONS-000003 -> checkpoints/checkpoint1/OPTIONS-000003 open-dir: checkpoints/checkpoint1 -create: checkpoints/checkpoint1/marker.format-version.000001.017 -sync-data: checkpoints/checkpoint1/marker.format-version.000001.017 -close: checkpoints/checkpoint1/marker.format-version.000001.017 +create: checkpoints/checkpoint1/marker.format-version.000001.018 +sync-data: checkpoints/checkpoint1/marker.format-version.000001.018 +close: checkpoints/checkpoint1/marker.format-version.000001.018 sync: checkpoints/checkpoint1 close: checkpoints/checkpoint1 link: db/000005.sst -> checkpoints/checkpoint1/000005.sst @@ -138,9 +142,9 @@ close: checkpoints open-dir: checkpoints/checkpoint2 link: db/OPTIONS-000003 -> checkpoints/checkpoint2/OPTIONS-000003 open-dir: checkpoints/checkpoint2 -create: checkpoints/checkpoint2/marker.format-version.000001.017 -sync-data: checkpoints/checkpoint2/marker.format-version.000001.017 -close: checkpoints/checkpoint2/marker.format-version.000001.017 +create: checkpoints/checkpoint2/marker.format-version.000001.018 +sync-data: checkpoints/checkpoint2/marker.format-version.000001.018 +close: checkpoints/checkpoint2/marker.format-version.000001.018 sync: checkpoints/checkpoint2 close: checkpoints/checkpoint2 link: db/000007.sst -> checkpoints/checkpoint2/000007.sst @@ -173,9 +177,9 @@ close: checkpoints open-dir: checkpoints/checkpoint3 link: db/OPTIONS-000003 -> checkpoints/checkpoint3/OPTIONS-000003 open-dir: checkpoints/checkpoint3 -create: checkpoints/checkpoint3/marker.format-version.000001.017 -sync-data: checkpoints/checkpoint3/marker.format-version.000001.017 -close: checkpoints/checkpoint3/marker.format-version.000001.017 +create: checkpoints/checkpoint3/marker.format-version.000001.018 +sync-data: checkpoints/checkpoint3/marker.format-version.000001.018 +close: checkpoints/checkpoint3/marker.format-version.000001.018 sync: checkpoints/checkpoint3 close: checkpoints/checkpoint3 
link: db/000005.sst -> checkpoints/checkpoint3/000005.sst @@ -259,7 +263,7 @@ list db LOCK MANIFEST-000001 OPTIONS-000003 -marker.format-version.000004.017 +marker.format-version.000005.018 marker.manifest.000001.MANIFEST-000001 list checkpoints/checkpoint1 @@ -269,7 +273,7 @@ list checkpoints/checkpoint1 000007.sst MANIFEST-000001 OPTIONS-000003 -marker.format-version.000001.017 +marker.format-version.000001.018 marker.manifest.000001.MANIFEST-000001 open checkpoints/checkpoint1 readonly @@ -336,7 +340,7 @@ list checkpoints/checkpoint2 000007.sst MANIFEST-000001 OPTIONS-000003 -marker.format-version.000001.017 +marker.format-version.000001.018 marker.manifest.000001.MANIFEST-000001 open checkpoints/checkpoint2 readonly @@ -378,7 +382,7 @@ list checkpoints/checkpoint3 000007.sst MANIFEST-000001 OPTIONS-000003 -marker.format-version.000001.017 +marker.format-version.000001.018 marker.manifest.000001.MANIFEST-000001 open checkpoints/checkpoint3 readonly @@ -492,9 +496,9 @@ close: checkpoints open-dir: checkpoints/checkpoint4 link: db/OPTIONS-000003 -> checkpoints/checkpoint4/OPTIONS-000003 open-dir: checkpoints/checkpoint4 -create: checkpoints/checkpoint4/marker.format-version.000001.017 -sync-data: checkpoints/checkpoint4/marker.format-version.000001.017 -close: checkpoints/checkpoint4/marker.format-version.000001.017 +create: checkpoints/checkpoint4/marker.format-version.000001.018 +sync-data: checkpoints/checkpoint4/marker.format-version.000001.018 +close: checkpoints/checkpoint4/marker.format-version.000001.018 sync: checkpoints/checkpoint4 close: checkpoints/checkpoint4 link: db/000010.sst -> checkpoints/checkpoint4/000010.sst @@ -582,7 +586,7 @@ list db LOCK MANIFEST-000001 OPTIONS-000003 -marker.format-version.000004.017 +marker.format-version.000005.018 marker.manifest.000001.MANIFEST-000001 @@ -597,9 +601,9 @@ close: checkpoints open-dir: checkpoints/checkpoint5 link: db/OPTIONS-000003 -> checkpoints/checkpoint5/OPTIONS-000003 open-dir: 
checkpoints/checkpoint5 -create: checkpoints/checkpoint5/marker.format-version.000001.017 -sync-data: checkpoints/checkpoint5/marker.format-version.000001.017 -close: checkpoints/checkpoint5/marker.format-version.000001.017 +create: checkpoints/checkpoint5/marker.format-version.000001.018 +sync-data: checkpoints/checkpoint5/marker.format-version.000001.018 +close: checkpoints/checkpoint5/marker.format-version.000001.018 sync: checkpoints/checkpoint5 close: checkpoints/checkpoint5 link: db/000010.sst -> checkpoints/checkpoint5/000010.sst @@ -695,9 +699,9 @@ close: checkpoints open-dir: checkpoints/checkpoint6 link: db/OPTIONS-000003 -> checkpoints/checkpoint6/OPTIONS-000003 open-dir: checkpoints/checkpoint6 -create: checkpoints/checkpoint6/marker.format-version.000001.017 -sync-data: checkpoints/checkpoint6/marker.format-version.000001.017 -close: checkpoints/checkpoint6/marker.format-version.000001.017 +create: checkpoints/checkpoint6/marker.format-version.000001.018 +sync-data: checkpoints/checkpoint6/marker.format-version.000001.018 +close: checkpoints/checkpoint6/marker.format-version.000001.018 sync: checkpoints/checkpoint6 close: checkpoints/checkpoint6 link: db/000011.sst -> checkpoints/checkpoint6/000011.sst diff --git a/testdata/checkpoint_shared b/testdata/checkpoint_shared index fd264a5e7f..6aff7d5ce3 100644 --- a/testdata/checkpoint_shared +++ b/testdata/checkpoint_shared @@ -24,6 +24,10 @@ sync: db create: db/marker.format-version.000001.017 close: db/marker.format-version.000001.017 sync: db +create: db/marker.format-version.000002.018 +close: db/marker.format-version.000002.018 +remove: db/marker.format-version.000001.017 +sync: db create: db/temporary.000003.dbtmp sync: db/temporary.000003.dbtmp close: db/temporary.000003.dbtmp @@ -86,9 +90,9 @@ close: . 
open-dir: checkpoints/checkpoint1 link: db/OPTIONS-000003 -> checkpoints/checkpoint1/OPTIONS-000003 open-dir: checkpoints/checkpoint1 -create: checkpoints/checkpoint1/marker.format-version.000001.017 -sync-data: checkpoints/checkpoint1/marker.format-version.000001.017 -close: checkpoints/checkpoint1/marker.format-version.000001.017 +create: checkpoints/checkpoint1/marker.format-version.000001.018 +sync-data: checkpoints/checkpoint1/marker.format-version.000001.018 +close: checkpoints/checkpoint1/marker.format-version.000001.018 sync: checkpoints/checkpoint1 close: checkpoints/checkpoint1 open: db/MANIFEST-000001 (options: *vfs.sequentialReadsOption) @@ -135,9 +139,9 @@ close: checkpoints open-dir: checkpoints/checkpoint2 link: db/OPTIONS-000003 -> checkpoints/checkpoint2/OPTIONS-000003 open-dir: checkpoints/checkpoint2 -create: checkpoints/checkpoint2/marker.format-version.000001.017 -sync-data: checkpoints/checkpoint2/marker.format-version.000001.017 -close: checkpoints/checkpoint2/marker.format-version.000001.017 +create: checkpoints/checkpoint2/marker.format-version.000001.018 +sync-data: checkpoints/checkpoint2/marker.format-version.000001.018 +close: checkpoints/checkpoint2/marker.format-version.000001.018 sync: checkpoints/checkpoint2 close: checkpoints/checkpoint2 open: db/MANIFEST-000001 (options: *vfs.sequentialReadsOption) @@ -180,9 +184,9 @@ close: checkpoints open-dir: checkpoints/checkpoint3 link: db/OPTIONS-000003 -> checkpoints/checkpoint3/OPTIONS-000003 open-dir: checkpoints/checkpoint3 -create: checkpoints/checkpoint3/marker.format-version.000001.017 -sync-data: checkpoints/checkpoint3/marker.format-version.000001.017 -close: checkpoints/checkpoint3/marker.format-version.000001.017 +create: checkpoints/checkpoint3/marker.format-version.000001.018 +sync-data: checkpoints/checkpoint3/marker.format-version.000001.018 +close: checkpoints/checkpoint3/marker.format-version.000001.018 sync: checkpoints/checkpoint3 close: checkpoints/checkpoint3 open: 
db/MANIFEST-000001 (options: *vfs.sequentialReadsOption) @@ -239,7 +243,7 @@ LOCK MANIFEST-000001 OPTIONS-000003 REMOTE-OBJ-CATALOG-000001 -marker.format-version.000001.017 +marker.format-version.000002.018 marker.manifest.000001.MANIFEST-000001 marker.remote-obj-catalog.000001.REMOTE-OBJ-CATALOG-000001 @@ -249,7 +253,7 @@ list checkpoints/checkpoint1 MANIFEST-000001 OPTIONS-000003 REMOTE-OBJ-CATALOG-000001 -marker.format-version.000001.017 +marker.format-version.000001.018 marker.manifest.000001.MANIFEST-000001 marker.remote-obj-catalog.000001.REMOTE-OBJ-CATALOG-000001 @@ -301,7 +305,7 @@ list checkpoints/checkpoint2 MANIFEST-000001 OPTIONS-000003 REMOTE-OBJ-CATALOG-000001 -marker.format-version.000001.017 +marker.format-version.000001.018 marker.manifest.000001.MANIFEST-000001 marker.remote-obj-catalog.000001.REMOTE-OBJ-CATALOG-000001 diff --git a/testdata/concurrent_excise b/testdata/concurrent_excise index 270aaae5c8..281767fe90 100644 --- a/testdata/concurrent_excise +++ b/testdata/concurrent_excise @@ -130,7 +130,7 @@ switch 1 ---- ok -# The below file-only snapshot should be errored out by the concurrent excise. +# The below file-only snapshot should not error out, despite a concurrent excise. batch set d something @@ -237,7 +237,7 @@ L6: build ext5 set bb something set b something -del b-c +del-range b c ---- lsm @@ -257,6 +257,19 @@ batch set bb new ---- +iter +first +next +next +next +next +---- +a: (new, .) +bb: (new, .) +d: (new, .) +e: (new, .) +. + lsm ---- L0.0: @@ -266,9 +279,19 @@ L6: 000005:[a#10,SET-a#10,SET] 000006:[e#11,SET-e#11,SET] -ingest-and-excise ext5 excise=b-c contains-excise-tombstone +ingest-and-excise ext5 excise=b-c +---- + +iter +first +next +next +next ---- -flushable ingest +a: (new, .) +b: (something, .) +bb: (something, .) +d: (new, .) 
lsm ---- @@ -277,7 +300,7 @@ L0.0: 000010:[d#13,SET-e#14,SET] L6: 000005:[a#10,SET-a#10,SET] - 000012:[b#16,SET-bb#16,SET] + 000012:[b#17,RANGEDEL-c#inf,RANGEDEL] 000006:[e#11,SET-e#11,SET] unblock c1 @@ -288,7 +311,20 @@ compact a-z ---- ok +iter +first +next +next +next +next +---- +a: (new, .) +b: (something, .) +bb: (something, .) +d: (new, .) +e: (new, .) + lsm ---- L6: - 000016:[a#0,SET-e#0,SET] + 000015:[a#0,SET-e#0,SET] diff --git a/testdata/determinism b/testdata/determinism index f7a30ce981..e1026ba04b 100644 --- a/testdata/determinism +++ b/testdata/determinism @@ -58,11 +58,11 @@ set alpha 5 ---- 7:batch -ingest-and-excise contains-excise-tombstone excise=a-b ext-ab +ingest-and-excise excise=a-b ext-ab ---- 8:ingest-and-excise -ingest-and-excise contains-excise-tombstone excise=b-c ext-bc +ingest-and-excise excise=b-c ext-bc ---- 9:ingest-and-excise diff --git a/testdata/event_listener b/testdata/event_listener index ec7d1f325c..b5ca6ef57a 100644 --- a/testdata/event_listener +++ b/testdata/event_listener @@ -47,6 +47,11 @@ close: db/marker.format-version.000004.017 remove: db/marker.format-version.000003.016 sync: db upgraded to format version: 017 +create: db/marker.format-version.000005.018 +close: db/marker.format-version.000005.018 +remove: db/marker.format-version.000004.017 +sync: db +upgraded to format version: 018 create: db/temporary.000003.dbtmp sync: db/temporary.000003.dbtmp close: db/temporary.000003.dbtmp @@ -354,9 +359,9 @@ close: . 
open-dir: checkpoint link: db/OPTIONS-000003 -> checkpoint/OPTIONS-000003 open-dir: checkpoint -create: checkpoint/marker.format-version.000001.017 -sync-data: checkpoint/marker.format-version.000001.017 -close: checkpoint/marker.format-version.000001.017 +create: checkpoint/marker.format-version.000001.018 +sync-data: checkpoint/marker.format-version.000001.018 +close: checkpoint/marker.format-version.000001.018 sync: checkpoint close: checkpoint link: db/000013.sst -> checkpoint/000013.sst diff --git a/testdata/excise b/testdata/excise index b6492ee658..ec3e0dff5d 100644 --- a/testdata/excise +++ b/testdata/excise @@ -542,8 +542,7 @@ c: (something, .) . -# Test to verify that IngestAndExcise now uses flushableIngest when directed to -# do so using a flag that is passed down by the caller. +# Test to verify that IngestAndExcise now uses flushableIngest. reset ---- @@ -625,18 +624,18 @@ set f something del b-e ---- -ingest-and-excise ext5 excise=b-e contains-excise-tombstone +ingest-and-excise ext5 excise=b-e ---- -flushable ingest +memtable flushed lsm ---- L0.0: - 000010:[b#17,SET-f#17,SET] - 000013:[x#15,SET-x#15,SET] + 000010:[b#18,SET-f#18,SET] + 000012:[x#15,SET-x#15,SET] L6: - 000014(000009):[a#0,SET-a#0,SET] - 000015(000009):[g#0,SET-g#0,SET] + 000013(000009):[a#0,SET-a#0,SET] + 000014(000009):[g#0,SET-g#0,SET] iter lower=c upper=e last @@ -667,7 +666,7 @@ compact a z lsm ---- L6: - 000018:[a#0,SET-x#0,SET] + 000017:[a#0,SET-x#0,SET] batch commit ---- @@ -683,17 +682,17 @@ set f somethingElse del b-e ---- -ingest-and-excise ext5 ext6 excise=b-e contains-excise-tombstone +ingest-and-excise ext5 ext6 excise=b-e ---- lsm ---- L0.0: - 000019:[a#22,SET-a#22,SET] - 000020:[b#23,SET-f#23,SET] + 000018:[a#22,SET-a#22,SET] + 000019:[b#23,SET-f#23,SET] L6: - 000021(000018):[a#0,SET-aa#0,SET] - 000022(000018):[f#0,SET-x#0,SET] + 000020(000017):[a#0,SET-aa#0,SET] + 000021(000017):[f#0,SET-x#0,SET] iter lower=a upper=f first @@ -709,3 +708,74 @@ b: (somethingElse, .) 
c: (somethingElse, .) . . + +# Two overlapping ingestions wait on one another even if +# the overlap is only on the excise span. + +reset +---- + +batch +set a foo +set b bar +set bb neverseen +set c baz +---- + +build ext7 +set b foo +set c bar +---- + +ingest-and-excise ext7 excise=b-g no-wait +---- + +build ext8 +set d gee +set e fee +---- + +ingest ext8 no-wait +---- + +iter +first +next +next +next +next +next +---- +a: (foo, .) +b: (foo, .) +c: (bar, .) +d: (gee, .) +e: (fee, .) +. + +flush +---- + +lsm +---- +L0.0: + 000007(000006):[a#10,SET-a#10,SET] +L6: + 000004:[b#15,SET-c#15,SET] + 000008:[d#16,SET-e#16,SET] + + +iter +first +next +next +next +next +next +---- +a: (foo, .) +b: (foo, .) +c: (bar, .) +d: (gee, .) +e: (fee, .) +. diff --git a/testdata/flushable_ingest b/testdata/flushable_ingest index c5806832d0..8d19b96ffe 100644 --- a/testdata/flushable_ingest +++ b/testdata/flushable_ingest @@ -60,7 +60,7 @@ LOCK MANIFEST-000001 OPTIONS-000003 ext -marker.format-version.000004.017 +marker.format-version.000005.018 marker.manifest.000001.MANIFEST-000001 # Test basic WAL replay @@ -81,7 +81,7 @@ LOCK MANIFEST-000001 OPTIONS-000003 ext -marker.format-version.000004.017 +marker.format-version.000005.018 marker.manifest.000001.MANIFEST-000001 open @@ -94,6 +94,7 @@ L0.1: 000004:[a#11,SET-a#11,SET] L0.0: 000009:[a#10,SET-a#10,SET] +L6: 000005:[b#12,SET-b#12,SET] 000006:[d#13,SET-d#13,SET] @@ -389,7 +390,7 @@ LOCK MANIFEST-000001 OPTIONS-000003 ext -marker.format-version.000004.017 +marker.format-version.000005.018 marker.manifest.000001.MANIFEST-000001 close @@ -409,7 +410,7 @@ LOCK MANIFEST-000001 OPTIONS-000003 ext -marker.format-version.000004.017 +marker.format-version.000005.018 marker.manifest.000001.MANIFEST-000001 open @@ -422,9 +423,10 @@ L0.1: 000004:[a#11,SET-a#11,SET] L0.0: 000009:[a#10,SET-a#10,SET] + 000012:[f#14,SET-f#14,SET] +L6: 000005:[b#12,SET-b#12,SET] 000006:[d#13,SET-d#13,SET] - 000010:[f#14,SET-f#14,SET] # Check if the new mutable 
memtable is using a new log file, and that the # previous log files have been deleted appropriately after the flush. @@ -434,15 +436,15 @@ ls 000005.sst 000006.sst 000009.sst -000010.sst -000011.log +000012.sst +000014.log LOCK MANIFEST-000001 -MANIFEST-000012 -OPTIONS-000013 +MANIFEST-000011 +OPTIONS-000015 ext -marker.format-version.000004.017 -marker.manifest.000002.MANIFEST-000012 +marker.format-version.000005.018 +marker.manifest.000002.MANIFEST-000011 # Make sure that the new mutable memtable can accept writes. batch @@ -584,7 +586,7 @@ LOCK MANIFEST-000001 OPTIONS-000003 ext -marker.format-version.000004.017 +marker.format-version.000005.018 marker.manifest.000001.MANIFEST-000001 close @@ -603,7 +605,7 @@ MANIFEST-000001 OPTIONS-000003 ext ext1 -marker.format-version.000004.017 +marker.format-version.000005.018 marker.manifest.000001.MANIFEST-000001 open diff --git a/testdata/ingested_flushable_api b/testdata/ingested_flushable_api index 5d36168efe..a54cca56b4 100644 --- a/testdata/ingested_flushable_api +++ b/testdata/ingested_flushable_api @@ -40,7 +40,7 @@ iter rangekeyIter ---- -d-g:{(#0,RANGEKEYSET,1,val1)} +d-g:{(#1,RANGEKEYSET,1,val1)} containsRangeKey ---- @@ -65,8 +65,8 @@ iter rangedelIter ---- -a-j:{(#0,RANGEDEL)} -o-z:{(#0,RANGEDEL)} +a-j:{(#2,RANGEDEL)} +o-z:{(#2,RANGEDEL)} rangekeyIter ---- @@ -93,15 +93,73 @@ flushable ext4 iter ---- -k#0,SET +k#3,SET rangekeyIter ---- -y-z:{(#0,RANGEKEYSET,1,val1)} +y-z:{(#3,RANGEKEYSET,1,val1)} rangedelIter ---- -a-j:{(#0,RANGEDEL)} +a-j:{(#3,RANGEDEL)} + +readyForFlush +---- +true + +containsRangeKey +---- +true + +build ext5 +set a aa +set k kk +range-key-set y z 1 val1 +---- + +flushable ext5 excise=a-g +---- + +iter +---- +a#5,SET +k#5,SET + +rangekeyIter +---- +a-g:{(#4,RANGEKEYDEL)} +y-z:{(#5,RANGEKEYSET,1,val1)} + +rangedelIter +---- +a-g:{(#4,RANGEDEL)} + +readyForFlush +---- +true + +containsRangeKey +---- +true + +build ext6 +range-key-set a c 1 val1 +---- + +flushable ext6 excise=d-g +---- + +iter 
+---- + +rangekeyIter +---- +a-c:{(#7,RANGEKEYSET,1,val1)} +d-g:{(#6,RANGEKEYDEL)} + +rangedelIter +---- +d-g:{(#6,RANGEDEL)} readyForFlush ---- diff --git a/tool/db.go b/tool/db.go index 51396f52d9..2f7a2bd5bd 100644 --- a/tool/db.go +++ b/tool/db.go @@ -625,7 +625,7 @@ func (d *dbT) runExcise(cmd *cobra.Command, args []string) { return } - _, err = db.IngestAndExcise(context.Background(), []string{path}, nil, nil, span, true /* sstsContainExciseTombstone */) + _, err = db.IngestAndExcise(context.Background(), []string{path}, nil, nil, span) if err != nil { fmt.Fprintf(stderr, "Error excising: %s\n", err) return diff --git a/tool/wal.go b/tool/wal.go index 27a279b850..4d0ffde5ab 100644 --- a/tool/wal.go +++ b/tool/wal.go @@ -271,6 +271,8 @@ func (w *walT) dumpBatch( case base.InternalKeyKindIngestSST: fileNum, _ := binary.Uvarint(ukey) fmt.Fprintf(stdout, "%s", base.FileNum(fileNum)) + case base.InternalKeyKindExcise: + fmt.Fprintf(stdout, "%s,%s", w.fmtKey.fn(ukey), w.fmtKey.fn(value)) case base.InternalKeyKindSingleDelete: fmt.Fprintf(stdout, "%s", w.fmtKey.fn(ukey)) case base.InternalKeyKindSetWithDelete: