From 4fc6ccd93ae2c31fbdd9302a536f459d9ee385ec Mon Sep 17 00:00:00 2001 From: Bilal Akhtar Date: Fri, 3 Nov 2023 16:20:46 -0400 Subject: [PATCH] metamorphic: add IngestAndExcise, make EFOS determinitic This change updates the metamorphic tests to support a new useExcise testOption and a new IngestAndExcise operation, which calls db.IngestAndExcise() if useExcise is true, or calls it with an sstable containing DeleteRange/RangeKeyDelete for the same span if useExcise is false. Furthermore, a new private option is added, efosAlwaysCreatesIterators. If the test option for useExcise (or the previously added useSharedReplicate) is enabled, one of these two things happen to guarantee EFOS determinism around excises: 1) If efosAlwaysCreatesIterators is true, the behaviour of EFOS is changed to always hold onto a readState and never return errors from NewIter. 2) If efosAlwaysCreatesIterators is false, regular EFOS behaviour is retained, but a flush is completed after every EFOS instantiation to guarantee that it becomes excise-immune right at instantiation. Note that regular snapshots are never created when useExcise or useSharedReplicate is true. Fixes #2885. --- db.go | 23 +++- internal/metamorphic/meta_test.go | 2 + metamorphic/config.go | 2 + metamorphic/generator.go | 60 +++++---- metamorphic/key_manager.go | 6 + metamorphic/ops.go | 209 +++++++++++++++++++++++++----- metamorphic/options.go | 37 ++++++ metamorphic/parser.go | 5 + options.go | 12 ++ snapshot.go | 101 ++++++++++++--- tool/manifest.go | 6 + 11 files changed, 380 insertions(+), 83 deletions(-) diff --git a/db.go b/db.go index ea23aaaea9..4c88395ecd 100644 --- a/db.go +++ b/db.go @@ -1001,9 +1001,12 @@ var iterAllocPool = sync.Pool{ // and the specified seqNum will be used as the snapshot seqNum. // - EFOS in file-only state: Only `seqNum` and `vers` are set. All the // relevant SSTs are referenced by the *version. +// - EFOS that has been excised but is in alwaysCreateIters mode (tests only). +// Only `seqNum` and `readState` are set. type snapshotIterOpts struct { - seqNum uint64 - vers *version + seqNum uint64 + vers *version + readState *readState } type batchIterOpts struct { @@ -1057,8 +1060,13 @@ func (d *DB) newIter( // files in the associated version from being deleted if there is a current // compaction. The readState is unref'd by Iterator.Close(). if internalOpts.snapshot.vers == nil { - // NB: loadReadState() calls readState.ref(). - readState = d.loadReadState() + if internalOpts.snapshot.readState != nil { + readState = internalOpts.snapshot.readState + readState.ref() + } else { + // NB: loadReadState() calls readState.ref(). + readState = d.loadReadState() + } } else { // vers != nil internalOpts.snapshot.vers.Ref() @@ -1289,7 +1297,12 @@ func (d *DB) newInternalIter( // compaction. The readState is unref'd by Iterator.Close(). var readState *readState if sOpts.vers == nil { - readState = d.loadReadState() + if sOpts.readState != nil { + readState = sOpts.readState + readState.ref() + } else { + readState = d.loadReadState() + } } if sOpts.vers != nil { sOpts.vers.Ref() diff --git a/internal/metamorphic/meta_test.go b/internal/metamorphic/meta_test.go index 02e9beb8d6..7fd5baab96 100644 --- a/internal/metamorphic/meta_test.go +++ b/internal/metamorphic/meta_test.go @@ -76,6 +76,7 @@ func TestMetaTwoInstance(t *testing.T) { case runOnceFlags.Compare != "": runDirs := strings.Split(runOnceFlags.Compare, ",") onceOpts := runOnceFlags.MakeRunOnceOptions() + onceOpts = append(onceOpts, metamorphic.MultiInstance(2)) metamorphic.Compare(t, runOnceFlags.Dir, runOnceFlags.Seed, runDirs, onceOpts...) case runOnceFlags.RunDir != "": @@ -83,6 +84,7 @@ func TestMetaTwoInstance(t *testing.T) { // runOptions() below) or the user specified it manually in order to re-run // a test. onceOpts := runOnceFlags.MakeRunOnceOptions() + onceOpts = append(onceOpts, metamorphic.MultiInstance(2)) metamorphic.RunOnce(t, runOnceFlags.RunDir, runOnceFlags.Seed, filepath.Join(runOnceFlags.RunDir, "history"), onceOpts...) default: diff --git a/metamorphic/config.go b/metamorphic/config.go index 891c56568a..8047448bd9 100644 --- a/metamorphic/config.go +++ b/metamorphic/config.go @@ -45,6 +45,7 @@ const ( writerDelete writerDeleteRange writerIngest + writerIngestAndExcise writerMerge writerRangeKeyDelete writerRangeKeySet @@ -154,6 +155,7 @@ func defaultConfig() config { writerDelete: 100, writerDeleteRange: 50, writerIngest: 100, + writerIngestAndExcise: 50, writerMerge: 100, writerRangeKeySet: 10, writerRangeKeyUnset: 10, diff --git a/metamorphic/generator.go b/metamorphic/generator.go index 8e7e6d45b3..5845c9b934 100644 --- a/metamorphic/generator.go +++ b/metamorphic/generator.go @@ -171,6 +171,7 @@ func generate(rng *rand.Rand, count uint64, cfg config, km *keyManager) []op { writerDelete: g.writerDelete, writerDeleteRange: g.writerDeleteRange, writerIngest: g.writerIngest, + writerIngestAndExcise: g.writerIngestAndExcise, writerMerge: g.writerMerge, writerRangeKeyDelete: g.writerRangeKeyDelete, writerRangeKeySet: g.writerRangeKeySet, @@ -1174,7 +1175,7 @@ func (g *generator) replicate() { var startKey, endKey []byte startKey = g.randKeyToRead(0.001) // 0.1% new keys endKey = g.randKeyToRead(0.001) // 0.1% new keys - for g.cmp(startKey, endKey) == 0 { + for g.equal(startKey, endKey) { endKey = g.randKeyToRead(0.01) // 1% new keys } if g.cmp(startKey, endKey) > 0 { @@ -1228,28 +1229,15 @@ func (g *generator) newSnapshot() { snapID: snapID, } - // With 75% probability, impose bounds on the keys that may be read with the - // snapshot. Setting bounds allows some runs of the metamorphic test to use - // a EventuallyFileOnlySnapshot instead of a Snapshot, testing equivalence - // between the two for reads within those bounds. - // - // If we're in multi-instance mode, we must always create bounds, as we will - // always create EventuallyFileOnlySnapshots to allow commands that use excises - // (eg. replicateOp) to work. - if g.rng.Float64() < 0.75 || g.dbs.Len() > 1 { - s.bounds = g.generateDisjointKeyRanges( - g.rng.Intn(5) + 1, /* between 1-5 */ - ) - g.snapshotBounds[snapID] = s.bounds - } + // Impose bounds on the keys that may be read with the snapshot. Setting bounds + // allows some runs of the metamorphic test to use a EventuallyFileOnlySnapshot + // instead of a Snapshot, testing equivalence between the two for reads within + // those bounds. + s.bounds = g.generateDisjointKeyRanges( + g.rng.Intn(5) + 1, /* between 1-5 */ + ) + g.snapshotBounds[snapID] = s.bounds g.add(s) - if g.dbs.Len() > 1 { - // Do a flush after each EFOS, if we're in multi-instance mode. This limits - // the testing area of EFOS, but allows them to be used alongside operations - // that do an excise (eg. replicateOp). This will be revisited when - // https://github.com/cockroachdb/pebble/issues/2885 is implemented. - g.add(&flushOp{dbID}) - } } func (g *generator) snapshotClose() { @@ -1454,6 +1442,34 @@ func (g *generator) writerIngest() { }) } +func (g *generator) writerIngestAndExcise() { + if len(g.liveBatches) == 0 { + return + } + + dbID := g.dbs.rand(g.rng) + batchID := g.liveBatches.rand(g.rng) + g.removeBatchFromGenerator(batchID) + + start := g.randKeyToWrite(0.001) + end := g.randKeyToWrite(0.001) + for g.equal(start, end) { + end = g.randKeyToWrite(0.001) + } + if g.cmp(start, end) > 0 { + start, end = end, start + } + derivedDBID := g.objDB[batchID] + + g.add(&ingestAndExciseOp{ + dbID: dbID, + batchID: batchID, + derivedDBID: derivedDBID, + exciseStart: start, + exciseEnd: end, + }) +} + func (g *generator) writerMerge() { if len(g.liveWriters) == 0 { return diff --git a/metamorphic/key_manager.go b/metamorphic/key_manager.go index 5943aa3042..8c1e1ce682 100644 --- a/metamorphic/key_manager.go +++ b/metamorphic/key_manager.go @@ -406,6 +406,10 @@ func (k *keyManager) update(o op) { k.mergeKeysInto(batchID, s.dbID) } k.checkForDelOrSingleDelTransitionInDB(s.dbID) + case *ingestAndExciseOp: + // Merge all keys in the batch with the keys in the DB. + k.mergeKeysInto(s.batchID, s.dbID) + k.checkForDelOrSingleDelTransitionInDB(s.dbID) case *applyOp: // Merge the keys from this writer into the parent writer. k.mergeKeysInto(s.batchID, s.writerID) @@ -517,6 +521,8 @@ func opWrittenKeys(untypedOp op) [][]byte { case *flushOp: case *getOp: case *ingestOp: + case *ingestAndExciseOp: + return [][]byte{t.exciseStart, t.exciseEnd} case *initOp: case *iterFirstOp: case *iterLastOp: diff --git a/metamorphic/ops.go b/metamorphic/ops.go index 3743b8a08e..f01135c01c 100644 --- a/metamorphic/ops.go +++ b/metamorphic/ops.go @@ -518,13 +518,15 @@ func (o *ingestOp) run(t *test, h historyRecorder) { h.Recordf("%s // %v", o, err) } -func (o *ingestOp) build(t *test, h historyRecorder, b *pebble.Batch, i int) (string, error) { - path := t.opts.FS.PathJoin(t.tmpDir, fmt.Sprintf("ext%d-%d", o.dbID.slot(), i)) +func buildForIngest( + t *test, dbID objID, h historyRecorder, b *pebble.Batch, i int, +) (string, *sstable.WriterMetadata, error) { + path := t.opts.FS.PathJoin(t.tmpDir, fmt.Sprintf("ext%d-%d", dbID.slot(), i)) f, err := t.opts.FS.Create(path) if err != nil { - return "", err + return "", nil, err } - db := t.getDB(o.dbID) + db := t.getDB(dbID) iter, rangeDelIter, rangeKeyIter := private.BatchSort(b) defer closeIters(iter, rangeDelIter, rangeKeyIter) @@ -550,16 +552,16 @@ func (o *ingestOp) build(t *test, h historyRecorder, b *pebble.Batch, i int) (st // It's possible that we wrote the key on a batch from a db that supported // DeleteSized, but are now ingesting into a db that does not. Detect // this case and translate the key to an InternalKeyKindDelete. - if key.Kind() == pebble.InternalKeyKindDeleteSized && !t.isFMV(o.dbID, pebble.FormatDeleteSizedAndObsolete) { + if key.Kind() == pebble.InternalKeyKindDeleteSized && !t.isFMV(dbID, pebble.FormatDeleteSizedAndObsolete) { value = pebble.LazyValue{} key.SetKind(pebble.InternalKeyKindDelete) } if err := w.Add(*key, value.InPlaceValue()); err != nil { - return "", err + return "", nil, err } } if err := iter.Close(); err != nil { - return "", err + return "", nil, err } iter = nil @@ -569,11 +571,11 @@ func (o *ingestOp) build(t *test, h historyRecorder, b *pebble.Batch, i int) (st // NB: We don't have to copy the key or value since we're reading from a // batch which doesn't do prefix compression. if err := w.DeleteRange(t.Start, t.End); err != nil { - return "", err + return "", nil, err } } if err := rangeDelIter.Close(); err != nil { - return "", err + return "", nil, err } rangeDelIter = nil } @@ -596,29 +598,35 @@ func (o *ingestOp) build(t *test, h historyRecorder, b *pebble.Batch, i int) (st } err = rangekey.Coalesce(t.opts.Comparer.Compare, equal, span.Keys, &collapsed.Keys) if err != nil { - return "", err + return "", nil, err } for i := range collapsed.Keys { collapsed.Keys[i].Trailer = base.MakeTrailer(0, collapsed.Keys[i].Kind()) } keyspan.SortKeysByTrailer(&collapsed.Keys) if err := rangekey.Encode(&collapsed, w.AddRangeKey); err != nil { - return "", err + return "", nil, err } } if err := rangeKeyIter.Error(); err != nil { - return "", err + return "", nil, err } if err := rangeKeyIter.Close(); err != nil { - return "", err + return "", nil, err } rangeKeyIter = nil } if err := w.Close(); err != nil { - return "", err + return "", nil, err } - return path, nil + meta, err := w.Metadata() + return path, meta, err +} + +func (o *ingestOp) build(t *test, h historyRecorder, b *pebble.Batch, i int) (string, error) { + path, _, err := buildForIngest(t, o.dbID, h, b, i) + return path, err } func (o *ingestOp) receiver() objID { return o.dbID } @@ -786,6 +794,84 @@ func (o *ingestOp) String() string { return buf.String() } +type ingestAndExciseOp struct { + dbID objID + batchID objID + derivedDBID objID + exciseStart, exciseEnd []byte +} + +func (o *ingestAndExciseOp) run(t *test, h historyRecorder) { + var err error + b := t.getBatch(o.batchID) + t.clearObj(o.batchID) + if t.testOpts.Opts.Comparer.Compare(o.exciseEnd, o.exciseStart) <= 0 { + panic("non-well-formed excise span") + } + if b.Empty() { + // No-op. + h.Recordf("%s // %v", o, err) + return + } + path, writerMeta, err2 := o.build(t, h, b, 0 /* i */) + if err2 != nil { + h.Recordf("Build(%s) // %v", o.batchID, err2) + return + } + err = firstError(err, err2) + err = firstError(err, b.Close()) + + if writerMeta.Properties.NumEntries == 0 { + // No-op. + h.Recordf("%s // %v", o, err) + return + } + db := t.getDB(o.dbID) + if !t.testOpts.useExcise { + // Do a rangedel and rangekeydel before the ingestion. This mimics the + // behaviour of an excise. + err = firstError(err, db.DeleteRange(o.exciseStart, o.exciseEnd, t.writeOpts)) + err = firstError(err, db.RangeKeyDelete(o.exciseStart, o.exciseEnd, t.writeOpts)) + } + + if t.testOpts.useExcise { + err = firstError(err, withRetries(func() error { + _, err := t.getDB(o.dbID).IngestAndExcise([]string{path}, nil /* sharedSSTs */, pebble.KeyRange{ + Start: o.exciseStart, + End: o.exciseEnd, + }) + return err + })) + } else { + err = firstError(err, withRetries(func() error { + return t.getDB(o.dbID).Ingest([]string{path}) + })) + } + + h.Recordf("%s // %v", o, err) +} + +func (o *ingestAndExciseOp) build( + t *test, h historyRecorder, b *pebble.Batch, i int, +) (string, *sstable.WriterMetadata, error) { + return buildForIngest(t, o.dbID, h, b, i) +} + +func (o *ingestAndExciseOp) receiver() objID { return o.dbID } +func (o *ingestAndExciseOp) syncObjs() objIDSlice { + // Ingest should not be concurrent with mutating the batches that will be + // ingested as sstables. + objs := []objID{o.batchID} + if o.derivedDBID != o.dbID { + objs = append(objs, o.derivedDBID) + } + return objs +} + +func (o *ingestAndExciseOp) String() string { + return fmt.Sprintf("%s.IngestAndExcise(%s, %q, %q)", o.dbID, o.batchID, o.exciseStart, o.exciseEnd) +} + // getOp models a Reader.Get operation. type getOp struct { readerID objID @@ -1353,10 +1439,27 @@ type newSnapshotOp struct { } func (o *newSnapshotOp) run(t *test, h historyRecorder) { + bounds := o.bounds + if len(bounds) == 0 { + panic("bounds unexpectedly unset for newSnapshotOp") + } // Fibonacci hash https://probablydance.com/2018/06/16/fibonacci-hashing-the-optimization-that-the-world-forgot-or-a-better-alternative-to-integer-modulo/ - if len(t.dbs) > 1 || (len(o.bounds) > 0 && ((11400714819323198485*uint64(t.idx)*t.testOpts.seedEFOS)>>63) == 1) { - s := t.getDB(o.dbID).NewEventuallyFileOnlySnapshot(o.bounds) + createEfos := ((11400714819323198485 * uint64(t.idx) * t.testOpts.seedEFOS) >> 63) == 1 + // If either of these options is true, an EFOS _must_ be created, regardless + // of what the fibonacci hash returned. + excisePossible := t.testOpts.useSharedReplicate || t.testOpts.useExcise + if createEfos || excisePossible { + s := t.getDB(o.dbID).NewEventuallyFileOnlySnapshot(bounds) t.setSnapshot(o.snapID, s) + // If the EFOS isn't guaranteed to always create iterators, we must force + // a flush on this DB so it transitions this EFOS into a file-only snapshot. + if excisePossible && !t.testOpts.efosAlwaysCreatesIters { + err := t.getDB(o.dbID).Flush() + if err != nil { + h.Recordf("%s // %v", o, err) + panic(errors.Wrap(err, "newSnapshotOp")) + } + } } else { s := t.getDB(o.dbID).NewSnapshot() t.setSnapshot(o.snapID, s) @@ -1456,14 +1559,11 @@ func (r *replicateOp) runSharedReplicate( }, func(start, end []byte, keys []keyspan.Key) error { s := keyspan.Span{ - Start: start, - End: end, - Keys: keys, - KeysOrder: 0, + Start: start, + End: end, + Keys: keys, } - return rangekey.Encode(&s, func(k base.InternalKey, v []byte) error { - return w.AddRangeKey(base.MakeInternalKey(k.UserKey, 0, k.Kind()), v) - }) + return rangekey.Encode(&s, w.AddRangeKey) }, func(sst *pebble.SharedSSTMeta) error { sharedSSTs = append(sharedSSTs, *sst) @@ -1475,6 +1575,31 @@ func (r *replicateOp) runSharedReplicate( return } + err = w.Close() + if err != nil { + h.Recordf("%s // %v", r, err) + return + } + meta, err := w.Metadata() + if err != nil { + h.Recordf("%s // %v", r, err) + return + } + if len(sharedSSTs) == 0 && meta.Properties.NumEntries == 0 { + // IngestAndExcise below will be a no-op. We should do a + // DeleteRange+RangeKeyDel to mimic the behaviour of the non-shared-replicate + // case. + // + // TODO(bilal): Remove this when we support excises with no matching ingests. + if err := dest.RangeKeyDelete(r.start, r.end, t.writeOpts); err != nil { + h.Recordf("%s // %v", r, err) + return + } + err := dest.DeleteRange(r.start, r.end, t.writeOpts) + h.Recordf("%s // %v", r, err) + return + } + _, err = dest.IngestAndExcise([]string{sstPath}, sharedSSTs, pebble.KeyRange{Start: r.start, End: r.end}) h.Recordf("%s // %v", r, err) } @@ -1501,6 +1626,15 @@ func (r *replicateOp) run(t *test, h historyRecorder) { return } + // First, do a RangeKeyDelete and DeleteRange on the whole span. + if err := dest.RangeKeyDelete(r.start, r.end, t.writeOpts); err != nil { + h.Recordf("%s // %v", r, err) + return + } + if err := dest.DeleteRange(r.start, r.end, t.writeOpts); err != nil { + h.Recordf("%s // %v", r, err) + return + } iter, err := source.NewIter(&pebble.IterOptions{ LowerBound: r.start, UpperBound: r.end, @@ -1511,16 +1645,7 @@ func (r *replicateOp) run(t *test, h historyRecorder) { } defer iter.Close() - // Write rangedels and rangekeydels for the range. This mimics the Excise - // that runSharedReplicate would do. - if err := w.DeleteRange(r.start, r.end); err != nil { - panic(err) - } - if err := w.RangeKeyDelete(r.start, r.end); err != nil { - panic(err) - } - - for ok := iter.SeekGE(r.start); ok && iter.Error() != nil; ok = iter.Next() { + for ok := iter.SeekGE(r.start); ok && iter.Error() == nil; ok = iter.Next() { hasPoint, hasRange := iter.HasPointAndRange() if hasPoint { val, err := iter.ValueAndErr() @@ -1534,13 +1659,25 @@ func (r *replicateOp) run(t *test, h historyRecorder) { if hasRange && iter.RangeKeyChanged() { rangeKeys := iter.RangeKeys() rkStart, rkEnd := iter.RangeBounds() + + span := &keyspan.Span{Start: rkStart, End: rkEnd, Keys: make([]keyspan.Key, len(rangeKeys))} for i := range rangeKeys { - if err := w.RangeKeySet(rkStart, rkEnd, rangeKeys[i].Suffix, rangeKeys[i].Value); err != nil { - panic(err) + span.Keys[i] = keyspan.Key{ + Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeKeySet), + Suffix: rangeKeys[i].Suffix, + Value: rangeKeys[i].Value, } } + keyspan.SortKeysByTrailer(&span.Keys) + if err := rangekey.Encode(span, w.AddRangeKey); err != nil { + panic(err) + } } } + if err := iter.Error(); err != nil { + h.Recordf("%s // %v", r, err) + return + } if err := w.Close(); err != nil { panic(err) } diff --git a/metamorphic/options.go b/metamorphic/options.go index b288c44068..394d7daf94 100644 --- a/metamorphic/options.go +++ b/metamorphic/options.go @@ -124,6 +124,16 @@ func parseOptions( return true } return true + case "TestOptions.use_shared_replicate": + opts.useSharedReplicate = true + return true + case "TestOptions.use_excise": + opts.useExcise = true + return true + case "TestOptions.efos_always_creates_iterators": + opts.efosAlwaysCreatesIters = true + opts.Opts.TestingAlwaysCreateEFOSIterators(true /* value */) + return true default: if customOptionParsers == nil { return false @@ -191,6 +201,15 @@ func optionsToString(opts *TestOptions) string { if opts.ingestSplit { fmt.Fprintf(&buf, " ingest_split=%v\n", opts.ingestSplit) } + if opts.useSharedReplicate { + fmt.Fprintf(&buf, " use_shared_replicate=%v\n", opts.useSharedReplicate) + } + if opts.useExcise { + fmt.Fprintf(&buf, " use_excise=%v\n", opts.useExcise) + } + if opts.efosAlwaysCreatesIters { + fmt.Fprintf(&buf, " efos_always_creates_iterators=%v\n", opts.efosAlwaysCreatesIters) + } for _, customOpt := range opts.CustomOpts { fmt.Fprintf(&buf, " %s=%s\n", customOpt.Name(), customOpt.Value()) } @@ -267,6 +286,16 @@ type TestOptions struct { // Enables ingest splits. Saved here for serialization as Options does not // serialize this. ingestSplit bool + // Enables operations that do excises. Note that a false value for this does + // not guarantee the lack of excises, as useSharedReplicate can also cause + // excises. However !useExcise && !useSharedReplicate can be used to guarantee + // lack of excises. + useExcise bool + // Enables EFOS to always create iterators, even if a conflicting excise + // happens. Used to guarantee EFOS determinism when conflicting excises are + // in play. If false, EFOS determinism is maintained by having the DB do a + // flush after every new EFOS. + efosAlwaysCreatesIters bool } // CustomOption defines a custom option that configures the behavior of an @@ -571,6 +600,14 @@ func randomOptions( testOpts.seedEFOS = rng.Uint64() testOpts.ingestSplit = rng.Intn(2) == 0 opts.Experimental.IngestSplit = func() bool { return testOpts.ingestSplit } + testOpts.useExcise = rng.Intn(2) == 0 + if testOpts.useExcise || testOpts.useSharedReplicate { + testOpts.efosAlwaysCreatesIters = rng.Intn(2) == 0 + opts.TestingAlwaysCreateEFOSIterators(testOpts.efosAlwaysCreatesIters) + if testOpts.Opts.FormatMajorVersion < pebble.FormatVirtualSSTables { + testOpts.Opts.FormatMajorVersion = pebble.FormatVirtualSSTables + } + } testOpts.Opts.EnsureDefaults() return testOpts } diff --git a/metamorphic/parser.go b/metamorphic/parser.go index dcc19e2929..7c456f19d1 100644 --- a/metamorphic/parser.go +++ b/metamorphic/parser.go @@ -69,6 +69,8 @@ func opArgs(op op) (receiverID *objID, targetID *objID, args []interface{}) { return &t.readerID, nil, []interface{}{&t.key} case *ingestOp: return &t.dbID, nil, []interface{}{&t.batchIDs} + case *ingestAndExciseOp: + return &t.dbID, nil, []interface{}{&t.batchID, &t.exciseStart, &t.exciseEnd} case *initOp: return nil, nil, []interface{}{&t.dbSlots, &t.batchSlots, &t.iterSlots, &t.snapshotSlots} case *iterLastOp: @@ -132,6 +134,7 @@ var methods = map[string]*methodInfo{ "Flush": makeMethod(flushOp{}, dbTag), "Get": makeMethod(getOp{}, dbTag, batchTag, snapTag), "Ingest": makeMethod(ingestOp{}, dbTag), + "IngestAndExcise": makeMethod(ingestAndExciseOp{}, dbTag), "Init": makeMethod(initOp{}, dbTag), "Last": makeMethod(iterLastOp{}, iterTag), "Merge": makeMethod(mergeOp{}, dbTag, batchTag), @@ -588,6 +591,8 @@ func computeDerivedFields(ops []op) { for i := range v.batchIDs { v.derivedDBIDs[i] = objToDB[v.batchIDs[i]] } + case *ingestAndExciseOp: + v.derivedDBID = objToDB[v.batchID] case *deleteOp: derivedDBID := v.writerID if v.writerID.tag() != dbTag { diff --git a/options.go b/options.go index 8a2c609ef0..244b101249 100644 --- a/options.go +++ b/options.go @@ -973,6 +973,11 @@ type Options struct { // against the FS are made after the DB is closed, the FS may leak a // goroutine indefinitely. fsCloser io.Closer + + // efosAlwaysCreatesIterators is set by some tests to force + // EventuallyFileOnlySnapshots to always create iterators, even after a + // conflicting excise. + efosAlwaysCreatesIterators bool } } @@ -1146,6 +1151,13 @@ func (o *Options) AddEventListener(l EventListener) { o.EventListener = &l } +// TestingAlwaysCreateEFOSIterators is used to toggle a private option for +// having EventuallyFileOnlySnapshots always create iterators. Meant to only +// be used in tests. +func (o *Options) TestingAlwaysCreateEFOSIterators(value bool) { + o.private.efosAlwaysCreatesIterators = value +} + func (o *Options) equal() Equal { if o.Comparer.Equal == nil { return bytes.Equal diff --git a/snapshot.go b/snapshot.go index 5477b54717..cdbf028d19 100644 --- a/snapshot.go +++ b/snapshot.go @@ -250,6 +250,10 @@ type EventuallyFileOnlySnapshot struct { snap *Snapshot // The wrapped version reference, if a file-only snapshot. vers *version + + // The readState corresponding to when this EFOS was created. Only set + // if alwaysCreateIters is true. + rs *readState } // Key ranges to watch for an excise on. @@ -262,6 +266,12 @@ type EventuallyFileOnlySnapshot struct { db *DB seqNum uint64 + // If true, this EventuallyFileOnlySnapshot will always generate iterators that + // retain snapshot semantics, by holding onto the readState if a conflicting + // excise were to happen. Only used in some tests to enforce deterministic + // behaviour around excises. + alwaysCreateIters bool + closed chan struct{} } @@ -282,10 +292,14 @@ func (d *DB) makeEventuallyFileOnlySnapshot( } } es := &EventuallyFileOnlySnapshot{ - db: d, - seqNum: seqNum, - protectedRanges: keyRanges, - closed: make(chan struct{}), + db: d, + seqNum: seqNum, + protectedRanges: keyRanges, + closed: make(chan struct{}), + alwaysCreateIters: d.opts.private.efosAlwaysCreatesIterators, + } + if es.alwaysCreateIters { + es.mu.rs = d.loadReadState() } if isFileOnly { es.mu.vers = d.mu.versions.currentVersion() @@ -429,6 +443,9 @@ func (es *EventuallyFileOnlySnapshot) Close() error { if es.mu.vers != nil { es.mu.vers.UnrefLocked() } + if es.mu.rs != nil { + es.mu.rs.unrefLocked() + } return nil } @@ -464,6 +481,31 @@ func (es *EventuallyFileOnlySnapshot) NewIter(o *IterOptions) (*Iterator, error) return es.NewIterWithContext(context.Background(), o) } +func (es *EventuallyFileOnlySnapshot) newAlwaysCreateIterWithContext( + ctx context.Context, o *IterOptions, +) (*Iterator, error) { + // Grab the db mutex. This avoids races down below, where we could get + // excised between the es.excised.Load() call, and the newIter call. + es.db.mu.Lock() + defer es.db.mu.Unlock() + es.mu.Lock() + defer es.mu.Unlock() + if es.mu.vers != nil { + sOpts := snapshotIterOpts{seqNum: es.seqNum, vers: es.mu.vers} + return es.db.newIter(ctx, nil /* batch */, newIterOpts{snapshot: sOpts}, o), nil + } + + sOpts := snapshotIterOpts{seqNum: es.seqNum} + if es.excised.Load() { + if es.mu.rs == nil { + return nil, errors.AssertionFailedf("unexpected nil readState in EFOS' alwaysCreateIters mode") + } + sOpts.readState = es.mu.rs + } + iter := es.db.newIter(ctx, nil /* batch */, newIterOpts{snapshot: sOpts}, o) + return iter, nil +} + // NewIterWithContext is like NewIter, and additionally accepts a context for // tracing. func (es *EventuallyFileOnlySnapshot) NewIterWithContext( @@ -475,6 +517,9 @@ func (es *EventuallyFileOnlySnapshot) NewIterWithContext( default: } + if es.alwaysCreateIters { + return es.newAlwaysCreateIterWithContext(ctx, o) + } es.mu.Lock() defer es.mu.Unlock() if es.mu.vers != nil { @@ -482,14 +527,14 @@ func (es *EventuallyFileOnlySnapshot) NewIterWithContext( return es.db.newIter(ctx, nil /* batch */, newIterOpts{snapshot: sOpts}, o), nil } + sOpts := snapshotIterOpts{seqNum: es.seqNum} if es.excised.Load() { return nil, ErrSnapshotExcised } - sOpts := snapshotIterOpts{seqNum: es.seqNum} iter := es.db.newIter(ctx, nil /* batch */, newIterOpts{snapshot: sOpts}, o) // If excised is true, then keys relevant to the snapshot might not be - // present in the readState being used by the iterator. Error out. + // present in the readState being used by the iterator. if es.excised.Load() { iter.Close() return nil, ErrSnapshotExcised @@ -515,22 +560,10 @@ func (es *EventuallyFileOnlySnapshot) ScanInternal( if es.db == nil { panic(ErrClosed) } - if es.excised.Load() { + if es.excised.Load() && !es.alwaysCreateIters { return ErrSnapshotExcised } var sOpts snapshotIterOpts - es.mu.Lock() - if es.mu.vers != nil { - sOpts = snapshotIterOpts{ - seqNum: es.seqNum, - vers: es.mu.vers, - } - } else { - sOpts = snapshotIterOpts{ - seqNum: es.seqNum, - } - } - es.mu.Unlock() opts := &scanInternalOptions{ CategoryAndQoS: categoryAndQoS, IterOptions: IterOptions{ @@ -544,15 +577,43 @@ func (es *EventuallyFileOnlySnapshot) ScanInternal( visitSharedFile: visitSharedFile, skipSharedLevels: visitSharedFile != nil, } + if es.alwaysCreateIters { + // Grab the db mutex. This avoids races down below as it prevents excises + // from taking effect until the iterator is instantiated. + es.db.mu.Lock() + } + es.mu.Lock() + if es.mu.vers != nil { + sOpts = snapshotIterOpts{ + seqNum: es.seqNum, + vers: es.mu.vers, + } + } else { + if es.excised.Load() && es.alwaysCreateIters { + sOpts = snapshotIterOpts{ + readState: es.mu.rs, + seqNum: es.seqNum, + } + } else { + sOpts = snapshotIterOpts{ + seqNum: es.seqNum, + } + } + } + es.mu.Unlock() iter, err := es.db.newInternalIter(ctx, sOpts, opts) if err != nil { return err } defer iter.close() + if es.alwaysCreateIters { + // See the similar conditional above where we grab this mutex. + es.db.mu.Unlock() + } // If excised is true, then keys relevant to the snapshot might not be // present in the readState being used by the iterator. Error out. - if es.excised.Load() { + if es.excised.Load() && !es.alwaysCreateIters { return ErrSnapshotExcised } diff --git a/tool/manifest.go b/tool/manifest.go index fe210f5cd0..3cd9fa8cef 100644 --- a/tool/manifest.go +++ b/tool/manifest.go @@ -111,6 +111,9 @@ func (m *manifestT) printLevels(cmp base.Compare, stdout io.Writer, v *manifest. fmt.Fprintf(stdout, " %s:%d", f.FileNum, f.Size) formatSeqNumRange(stdout, f.SmallestSeqNum, f.LargestSeqNum) formatKeyRange(stdout, m.fmtKey, &f.Smallest, &f.Largest) + if f.Virtual { + fmt.Fprintf(stdout, "(virtual:backingNum=%s)", f.FileBacking.DiskFileNum) + } fmt.Fprintf(stdout, "\n") }) } @@ -125,6 +128,9 @@ func (m *manifestT) printLevels(cmp base.Compare, stdout io.Writer, v *manifest. fmt.Fprintf(stdout, " %s:%d", f.FileNum, f.Size) formatSeqNumRange(stdout, f.SmallestSeqNum, f.LargestSeqNum) formatKeyRange(stdout, m.fmtKey, &f.Smallest, &f.Largest) + if f.Virtual { + fmt.Fprintf(stdout, "(virtual:backingNum=%s)", f.FileBacking.DiskFileNum) + } fmt.Fprintf(stdout, "\n") } }