Skip to content

Commit

Permalink
metamorphic: add IngestAndExcise, make EFOS determinitic
Browse files Browse the repository at this point in the history
This change updates the metamorphic tests to support a new
useExcise testOption and a new IngestAndExcise operation, which
calls db.IngestAndExcise() if useExcise is true, or calls it
with an sstable containing DeleteRange/RangeKeyDelete for the
same span if useExcise is false.

Furthermore, a new private option is added, efosAlwaysCreatesIterators.
If the test option for useExcise (or the previously added useSharedReplicate)
is enabled, one of these two things happen to guarantee EFOS determinism
around excises:

1) If efosAlwaysCreatesIterators is true, the behaviour of EFOS
is changed to always hold onto a readState and never return
errors from NewIter.
2) If efosAlwaysCreatesIterators is false, regular EFOS behaviour
is retained, but a flush is completed after every EFOS instantiation
to guarantee that it becomes excise-immune right at instantiation.

Note that regular snapshots are never created when useExcise
or useSharedReplicate is true.

Fixes #2885.
  • Loading branch information
itsbilal committed Nov 8, 2023
1 parent 369800c commit 263b28f
Show file tree
Hide file tree
Showing 8 changed files with 304 additions and 41 deletions.
21 changes: 16 additions & 5 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -995,9 +995,12 @@ var iterAllocPool = sync.Pool{
// and the specified seqNum will be used as the snapshot seqNum.
// - EFOS in file-only state: Only `seqNum` and `vers` are set. All the
// relevant SSTs are referenced by the *version.
// - EFOS that has been excised but is in alwaysCreateIters mode (tests only).
// Only `seqNum` and `readState` are set.
type snapshotIterOpts struct {
seqNum uint64
vers *version
seqNum uint64
vers *version
readState *readState
}

type batchIterOpts struct {
Expand Down Expand Up @@ -1051,8 +1054,12 @@ func (d *DB) newIter(
// files in the associated version from being deleted if there is a current
// compaction. The readState is unref'd by Iterator.Close().
if internalOpts.snapshot.vers == nil {
// NB: loadReadState() calls readState.ref().
readState = d.loadReadState()
if internalOpts.snapshot.readState != nil {
readState = internalOpts.snapshot.readState
} else {
// NB: loadReadState() calls readState.ref().
readState = d.loadReadState()
}
} else {
// vers != nil
internalOpts.snapshot.vers.Ref()
Expand Down Expand Up @@ -1276,7 +1283,11 @@ func (d *DB) newInternalIter(sOpts snapshotIterOpts, o *scanInternalOptions) *sc
// compaction. The readState is unref'd by Iterator.Close().
var readState *readState
if sOpts.vers == nil {
readState = d.loadReadState()
if sOpts.readState != nil {
readState = sOpts.readState
} else {
readState = d.loadReadState()
}
}
if sOpts.vers != nil {
sOpts.vers.Ref()
Expand Down
2 changes: 2 additions & 0 deletions metamorphic/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ const (
writerDelete
writerDeleteRange
writerIngest
writerIngestAndExcise
writerMerge
writerRangeKeyDelete
writerRangeKeySet
Expand Down Expand Up @@ -152,6 +153,7 @@ func defaultConfig() config {
writerDelete: 100,
writerDeleteRange: 50,
writerIngest: 100,
writerIngestAndExcise: 50,
writerMerge: 100,
writerRangeKeySet: 10,
writerRangeKeyUnset: 10,
Expand Down
54 changes: 33 additions & 21 deletions metamorphic/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ func generate(rng *rand.Rand, count uint64, cfg config, km *keyManager) []op {
writerDelete: g.writerDelete,
writerDeleteRange: g.writerDeleteRange,
writerIngest: g.writerIngest,
writerIngestAndExcise: g.writerIngestAndExcise,
writerMerge: g.writerMerge,
writerRangeKeyDelete: g.writerRangeKeyDelete,
writerRangeKeySet: g.writerRangeKeySet,
Expand Down Expand Up @@ -1220,28 +1221,15 @@ func (g *generator) newSnapshot() {
snapID: snapID,
}

// With 75% probability, impose bounds on the keys that may be read with the
// snapshot. Setting bounds allows some runs of the metamorphic test to use
// a EventuallyFileOnlySnapshot instead of a Snapshot, testing equivalence
// between the two for reads within those bounds.
//
// If we're in multi-instance mode, we must always create bounds, as we will
// always create EventuallyFileOnlySnapshots to allow commands that use excises
// (eg. replicateOp) to work.
if g.rng.Float64() < 0.75 || g.dbs.Len() > 1 {
s.bounds = g.generateDisjointKeyRanges(
g.rng.Intn(5) + 1, /* between 1-5 */
)
g.snapshotBounds[snapID] = s.bounds
}
// Impose bounds on the keys that may be read with the snapshot. Setting bounds
// allows some runs of the metamorphic test to use a EventuallyFileOnlySnapshot
// instead of a Snapshot, testing equivalence between the two for reads within
// those bounds.
s.bounds = g.generateDisjointKeyRanges(
g.rng.Intn(5) + 1, /* between 1-5 */
)
g.snapshotBounds[snapID] = s.bounds
g.add(s)
if g.dbs.Len() > 1 {
// Do a flush after each EFOS, if we're in multi-instance mode. This limits
// the testing area of EFOS, but allows them to be used alongside operations
// that do an excise (eg. replicateOp). This will be revisited when
// https://github.com/cockroachdb/pebble/issues/2885 is implemented.
g.add(&flushOp{dbID})
}
}

func (g *generator) snapshotClose() {
Expand Down Expand Up @@ -1446,6 +1434,30 @@ func (g *generator) writerIngest() {
})
}

func (g *generator) writerIngestAndExcise() {
if len(g.liveBatches) == 0 {
return
}

dbID := g.dbs.rand(g.rng)
// Ingest between 1 and 3 batches.
batchID := g.liveBatches.rand(g.rng)
g.removeBatchFromGenerator(batchID)

start := g.randKeyToWrite(0.001)
end := g.randKeyToWrite(0.001)
if g.cmp(start, end) > 0 {
start, end = end, start
}

g.add(&ingestAndExciseOp{
dbID: dbID,
batchID: batchID,
exciseStart: start,
exciseEnd: end,
})
}

func (g *generator) writerMerge() {
if len(g.liveWriters) == 0 {
return
Expand Down
164 changes: 160 additions & 4 deletions metamorphic/ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,6 +700,131 @@ func (o *ingestOp) String() string {
return buf.String()
}

type ingestAndExciseOp struct {
dbID objID
batchID objID
exciseStart, exciseEnd []byte
}

func (o *ingestAndExciseOp) run(t *test, h historyRecorder) {
var err error
b := t.getBatch(o.batchID)
t.clearObj(o.batchID)
if b.Empty() {
h.Recordf("%s // %v", o, err)
return
}
if !t.testOpts.useExcise {
// Throw in a rangedel into the batch. This will be
// fragmented / deduplicated by the batch iterator.
err = firstError(err, b.DeleteRange(o.exciseStart, o.exciseEnd, t.writeOpts))
}
path, err2 := o.build(t, h, b, 0 /* i */)
if err2 != nil {
h.Recordf("Build(%s) // %v", o.batchID, err2)
}
err = firstError(err, err2)
err = firstError(err, b.Close())

if t.testOpts.useExcise {
err = firstError(err, withRetries(func() error {
_, err := t.getDB(o.dbID).IngestAndExcise([]string{path}, nil /* sharedSSTs */, pebble.KeyRange{
Start: o.exciseStart,
End: o.exciseEnd,
})
return err
}))
} else {
err = firstError(err, withRetries(func() error {
return t.getDB(o.dbID).Ingest([]string{path})
}))
}

h.Recordf("%s // %v", o, err)
}

func (o *ingestAndExciseOp) build(t *test, h historyRecorder, b *pebble.Batch, i int) (string, error) {
path := t.opts.FS.PathJoin(t.tmpDir, fmt.Sprintf("ext%d-%d", o.dbID.slot(), i))
f, err := t.opts.FS.Create(path)
if err != nil {
return "", err
}
db := t.getDB(o.dbID)

iter, rangeDelIter, rangeKeyIter := private.BatchSort(b)
defer closeIters(iter, rangeDelIter, rangeKeyIter)

equal := t.opts.Comparer.Equal
tableFormat := db.FormatMajorVersion().MaxTableFormat()
w := sstable.NewWriter(
objstorageprovider.NewFileWritable(f),
t.opts.MakeWriterOptions(0, tableFormat),
)

var lastUserKey []byte
for key, value := iter.First(); key != nil; key, value = iter.Next() {
// Ignore duplicate keys.
if equal(lastUserKey, key.UserKey) {
continue
}
// NB: We don't have to copy the key or value since we're reading from a
// batch which doesn't do prefix compression.
lastUserKey = key.UserKey

key.SetSeqNum(base.SeqNumZero)
// It's possible that we wrote the key on a batch from a db that supported
// DeleteSized, but are now ingesting into a db that does not. Detect
// this case and translate the key to an InternalKeyKindDelete.
if key.Kind() == pebble.InternalKeyKindDeleteSized && !t.isFMV(o.dbID, pebble.FormatDeleteSizedAndObsolete) {
value = pebble.LazyValue{}
key.SetKind(pebble.InternalKeyKindDelete)
}
if err := w.Add(*key, value.InPlaceValue()); err != nil {
return "", err
}
}
if err := iter.Close(); err != nil {
return "", err
}
iter = nil

if rangeDelIter != nil {
// NB: The range tombstones have already been fragmented by the Batch.
for t := rangeDelIter.First(); t != nil; t = rangeDelIter.Next() {
// NB: We don't have to copy the key or value since we're reading from a
// batch which doesn't do prefix compression.
if err := w.DeleteRange(t.Start, t.End); err != nil {
return "", err
}
}
if err := rangeDelIter.Close(); err != nil {
return "", err
}
rangeDelIter = nil
}
if !t.testOpts.useExcise {
if err := w.RangeKeyDelete(o.exciseStart, o.exciseEnd); err != nil {
return "", err
}
}

if err := w.Close(); err != nil {
return "", err
}
return path, nil
}

func (o *ingestAndExciseOp) receiver() objID { return o.dbID }
func (o *ingestAndExciseOp) syncObjs() objIDSlice {
// Ingest should not be concurrent with mutating the batches that will be
// ingested as sstables.
return []objID{o.batchID}
}

func (o *ingestAndExciseOp) String() string {
return fmt.Sprintf("%s.IngestAndExcise(%s, %q, %q)", o.dbID, o.batchID, o.exciseStart, o.exciseEnd)
}

// getOp models a Reader.Get operation.
type getOp struct {
readerID objID
Expand Down Expand Up @@ -1237,10 +1362,28 @@ type newSnapshotOp struct {
}

func (o *newSnapshotOp) run(t *test, h historyRecorder) {
bounds := o.bounds
if len(bounds) == 0 {
panic("bounds unexpectedly unset for newSnapshotOp")
}
// Fibonacci hash https://probablydance.com/2018/06/16/fibonacci-hashing-the-optimization-that-the-world-forgot-or-a-better-alternative-to-integer-modulo/
if len(t.dbs) > 1 || (len(o.bounds) > 0 && ((11400714819323198485*uint64(t.idx)*t.testOpts.seedEFOS)>>63) == 1) {
s := t.getDB(o.dbID).NewEventuallyFileOnlySnapshot(o.bounds)
createEfos := ((11400714819323198485 * uint64(t.idx) * t.testOpts.seedEFOS) >> 63) == 1
// If either of these options is true, an EFOS _must_ be created, regardless
// of what the fibonacci hash returned.
excisePossible := t.testOpts.useSharedReplicate || t.testOpts.useExcise
if createEfos || excisePossible {
s := t.getDB(o.dbID).NewEventuallyFileOnlySnapshot(bounds)
t.setSnapshot(o.snapID, s)
// If the EFOS isn't guaranteed to always create iterators, we must force
// a flush on this DB so it transitions this EFOS into a file-only snapshot.
if excisePossible && !t.testOpts.efosAlwaysCreatesIters {
err := t.getDB(o.dbID).Flush()
if err != nil {
h.Recordf("%s // %v", o, err)
h.history.err.Store(errors.Wrap(err, "newSnapshotOp"))
return
}
}
} else {
s := t.getDB(o.dbID).NewSnapshot()
t.setSnapshot(o.snapID, s)
Expand Down Expand Up @@ -1359,6 +1502,12 @@ func (r *replicateOp) runSharedReplicate(
return
}

err = w.Close()
if err != nil {
h.Recordf("%s // %v", r, err)
return
}

_, err = dest.IngestAndExcise([]string{sstPath}, sharedSSTs, pebble.KeyRange{Start: r.start, End: r.end})
h.Recordf("%s // %v", r, err)
}
Expand Down Expand Up @@ -1418,11 +1567,18 @@ func (r *replicateOp) run(t *test, h historyRecorder) {
if hasRange && iter.RangeKeyChanged() {
rangeKeys := iter.RangeKeys()
rkStart, rkEnd := iter.RangeBounds()

span := &keyspan.Span{Start: rkStart, End: rkEnd, Keys: make([]keyspan.Key, len(rangeKeys))}
for i := range rangeKeys {
if err := w.RangeKeySet(rkStart, rkEnd, rangeKeys[i].Suffix, rangeKeys[i].Value); err != nil {
panic(err)
span.Keys[i] = keyspan.Key{
Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeKeySet),
Suffix: rangeKeys[i].Suffix,
Value: rangeKeys[i].Value,
}
}
if err := rangekey.Encode(span, w.AddRangeKey); err != nil {
panic(err)
}
}
}
if err := w.Close(); err != nil {
Expand Down
37 changes: 37 additions & 0 deletions metamorphic/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,16 @@ func parseOptions(
return true
}
return true
case "TestOptions.use_shared_replicate":
opts.useSharedReplicate = true
return true
case "TestOptions.use_excise":
opts.useExcise = true
return true
case "TestOptions.efos_always_creates_iterators":
opts.efosAlwaysCreatesIters = true
opts.Opts.TestingAlwaysCreateEFOSIterators(true /* value */)
return true
default:
if customOptionParsers == nil {
return false
Expand Down Expand Up @@ -191,6 +201,15 @@ func optionsToString(opts *TestOptions) string {
if opts.ingestSplit {
fmt.Fprintf(&buf, " ingest_split=%v\n", opts.ingestSplit)
}
if opts.useSharedReplicate {
fmt.Fprintf(&buf, " use_shared_replicate=%v\n", opts.useSharedReplicate)
}
if opts.useExcise {
fmt.Fprintf(&buf, " use_excise=%v\n", opts.useExcise)
}
if opts.efosAlwaysCreatesIters {
fmt.Fprintf(&buf, " efos_always_creates_iterators=%v\n", opts.efosAlwaysCreatesIters)
}
for _, customOpt := range opts.CustomOpts {
fmt.Fprintf(&buf, " %s=%s\n", customOpt.Name(), customOpt.Value())
}
Expand Down Expand Up @@ -267,6 +286,16 @@ type TestOptions struct {
// Enables ingest splits. Saved here for serialization as Options does not
// serialize this.
ingestSplit bool
// Enables operations that do excises. Note that a false value for this does
// not guarantee the lack of excises, as useSharedReplicate can also cause
// excises. However !useExcise && !useSharedReplicate can be used to guarantee
// lack of excises.
useExcise bool
// Enables EFOS to always create iterators, even if a conflicting excise
// happens. Used to guarantee EFOS determinism when conflicting excises are
// in play. If false, EFOS determinism is maintained by having the DB do a
// flush after every new EFOS.
efosAlwaysCreatesIters bool
}

// CustomOption defines a custom option that configures the behavior of an
Expand Down Expand Up @@ -571,6 +600,14 @@ func randomOptions(
testOpts.seedEFOS = rng.Uint64()
testOpts.ingestSplit = rng.Intn(2) == 0
opts.Experimental.IngestSplit = func() bool { return testOpts.ingestSplit }
testOpts.useExcise = rng.Intn(2) == 0
if testOpts.useExcise || testOpts.useSharedReplicate {
testOpts.efosAlwaysCreatesIters = rng.Intn(2) == 0
opts.TestingAlwaysCreateEFOSIterators(testOpts.efosAlwaysCreatesIters)
if testOpts.Opts.FormatMajorVersion < pebble.FormatVirtualSSTables {
testOpts.Opts.FormatMajorVersion = pebble.FormatVirtualSSTables
}
}
testOpts.Opts.EnsureDefaults()
return testOpts
}
Expand Down
Loading

0 comments on commit 263b28f

Please sign in to comment.