Skip to content

Commit

Permalink
metamorphic: add IngestAndExcise, make EFOS determinitic
Browse files Browse the repository at this point in the history
This change updates the metamorphic tests to support a new
useExcise testOption and a new IngestAndExcise operation, which
calls db.IngestAndExcise() if useExcise is true, or calls it
with an sstable containing DeleteRange/RangeKeyDelete for the
same span if useExcise is false.

Furthermore, a new private option is added, efosAlwaysCreatesIterators.
If the test option for useExcise (or the previously added useSharedReplicate)
is enabled, one of these two things happen to guarantee EFOS determinism
around excises:

1) If efosAlwaysCreatesIterators is true, the behaviour of EFOS
is changed to always hold onto a readState and never return
errors from NewIter.
2) If efosAlwaysCreatesIterators is false, regular EFOS behaviour
is retained, but a flush is completed after every EFOS instantiation
to guarantee that it becomes excise-immune right at instantiation.

Note that regular snapshots are never created when useExcise
or useSharedReplicate is true.

Fixes cockroachdb#2885.
  • Loading branch information
itsbilal committed Nov 10, 2023
1 parent dfe1b00 commit 1657dc5
Show file tree
Hide file tree
Showing 8 changed files with 275 additions and 55 deletions.
21 changes: 16 additions & 5 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -995,9 +995,12 @@ var iterAllocPool = sync.Pool{
// and the specified seqNum will be used as the snapshot seqNum.
// - EFOS in file-only state: Only `seqNum` and `vers` are set. All the
// relevant SSTs are referenced by the *version.
// - EFOS that has been excised but is in alwaysCreateIters mode (tests only).
// Only `seqNum` and `readState` are set.
type snapshotIterOpts struct {
seqNum uint64
vers *version
seqNum uint64
vers *version
readState *readState
}

type batchIterOpts struct {
Expand Down Expand Up @@ -1051,8 +1054,12 @@ func (d *DB) newIter(
// files in the associated version from being deleted if there is a current
// compaction. The readState is unref'd by Iterator.Close().
if internalOpts.snapshot.vers == nil {
// NB: loadReadState() calls readState.ref().
readState = d.loadReadState()
if internalOpts.snapshot.readState != nil {
readState = internalOpts.snapshot.readState
} else {
// NB: loadReadState() calls readState.ref().
readState = d.loadReadState()
}
} else {
// vers != nil
internalOpts.snapshot.vers.Ref()
Expand Down Expand Up @@ -1276,7 +1283,11 @@ func (d *DB) newInternalIter(sOpts snapshotIterOpts, o *scanInternalOptions) *sc
// compaction. The readState is unref'd by Iterator.Close().
var readState *readState
if sOpts.vers == nil {
readState = d.loadReadState()
if sOpts.readState != nil {
readState = sOpts.readState
} else {
readState = d.loadReadState()
}
}
if sOpts.vers != nil {
sOpts.vers.Ref()
Expand Down
2 changes: 2 additions & 0 deletions metamorphic/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ const (
writerDelete
writerDeleteRange
writerIngest
writerIngestAndExcise
writerMerge
writerRangeKeyDelete
writerRangeKeySet
Expand Down Expand Up @@ -152,6 +153,7 @@ func defaultConfig() config {
writerDelete: 100,
writerDeleteRange: 50,
writerIngest: 100,
writerIngestAndExcise: 50,
writerMerge: 100,
writerRangeKeySet: 10,
writerRangeKeyUnset: 10,
Expand Down
54 changes: 33 additions & 21 deletions metamorphic/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ func generate(rng *rand.Rand, count uint64, cfg config, km *keyManager) []op {
writerDelete: g.writerDelete,
writerDeleteRange: g.writerDeleteRange,
writerIngest: g.writerIngest,
writerIngestAndExcise: g.writerIngestAndExcise,
writerMerge: g.writerMerge,
writerRangeKeyDelete: g.writerRangeKeyDelete,
writerRangeKeySet: g.writerRangeKeySet,
Expand Down Expand Up @@ -1220,28 +1221,15 @@ func (g *generator) newSnapshot() {
snapID: snapID,
}

// With 75% probability, impose bounds on the keys that may be read with the
// snapshot. Setting bounds allows some runs of the metamorphic test to use
// a EventuallyFileOnlySnapshot instead of a Snapshot, testing equivalence
// between the two for reads within those bounds.
//
// If we're in multi-instance mode, we must always create bounds, as we will
// always create EventuallyFileOnlySnapshots to allow commands that use excises
// (eg. replicateOp) to work.
if g.rng.Float64() < 0.75 || g.dbs.Len() > 1 {
s.bounds = g.generateDisjointKeyRanges(
g.rng.Intn(5) + 1, /* between 1-5 */
)
g.snapshotBounds[snapID] = s.bounds
}
// Impose bounds on the keys that may be read with the snapshot. Setting bounds
// allows some runs of the metamorphic test to use a EventuallyFileOnlySnapshot
// instead of a Snapshot, testing equivalence between the two for reads within
// those bounds.
s.bounds = g.generateDisjointKeyRanges(
g.rng.Intn(5) + 1, /* between 1-5 */
)
g.snapshotBounds[snapID] = s.bounds
g.add(s)
if g.dbs.Len() > 1 {
// Do a flush after each EFOS, if we're in multi-instance mode. This limits
// the testing area of EFOS, but allows them to be used alongside operations
// that do an excise (eg. replicateOp). This will be revisited when
// https://github.com/cockroachdb/pebble/issues/2885 is implemented.
g.add(&flushOp{dbID})
}
}

func (g *generator) snapshotClose() {
Expand Down Expand Up @@ -1446,6 +1434,30 @@ func (g *generator) writerIngest() {
})
}

func (g *generator) writerIngestAndExcise() {
if len(g.liveBatches) == 0 {
return
}

dbID := g.dbs.rand(g.rng)
// Ingest between 1 and 3 batches.
batchID := g.liveBatches.rand(g.rng)
g.removeBatchFromGenerator(batchID)

start := g.randKeyToWrite(0.001)
end := g.randKeyToWrite(0.001)
if g.cmp(start, end) > 0 {
start, end = end, start
}

g.add(&ingestAndExciseOp{
dbID: dbID,
batchID: batchID,
exciseStart: start,
exciseEnd: end,
})
}

func (g *generator) writerMerge() {
if len(g.liveWriters) == 0 {
return
Expand Down
149 changes: 131 additions & 18 deletions metamorphic/ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,13 +518,15 @@ func (o *ingestOp) run(t *test, h historyRecorder) {
h.Recordf("%s // %v", o, err)
}

func (o *ingestOp) build(t *test, h historyRecorder, b *pebble.Batch, i int) (string, error) {
path := t.opts.FS.PathJoin(t.tmpDir, fmt.Sprintf("ext%d-%d", o.dbID.slot(), i))
func buildForIngest(
t *test, dbID objID, h historyRecorder, b *pebble.Batch, i int,
) (string, error) {
path := t.opts.FS.PathJoin(t.tmpDir, fmt.Sprintf("ext%d-%d", dbID.slot(), i))
f, err := t.opts.FS.Create(path)
if err != nil {
return "", err
}
db := t.getDB(o.dbID)
db := t.getDB(dbID)

iter, rangeDelIter, rangeKeyIter := private.BatchSort(b)
defer closeIters(iter, rangeDelIter, rangeKeyIter)
Expand All @@ -550,7 +552,7 @@ func (o *ingestOp) build(t *test, h historyRecorder, b *pebble.Batch, i int) (st
// It's possible that we wrote the key on a batch from a db that supported
// DeleteSized, but are now ingesting into a db that does not. Detect
// this case and translate the key to an InternalKeyKindDelete.
if key.Kind() == pebble.InternalKeyKindDeleteSized && !t.isFMV(o.dbID, pebble.FormatDeleteSizedAndObsolete) {
if key.Kind() == pebble.InternalKeyKindDeleteSized && !t.isFMV(dbID, pebble.FormatDeleteSizedAndObsolete) {
value = pebble.LazyValue{}
key.SetKind(pebble.InternalKeyKindDelete)
}
Expand Down Expand Up @@ -610,6 +612,10 @@ func (o *ingestOp) build(t *test, h historyRecorder, b *pebble.Batch, i int) (st
return path, nil
}

func (o *ingestOp) build(t *test, h historyRecorder, b *pebble.Batch, i int) (string, error) {
return buildForIngest(t, o.dbID, h, b, i)
}

func (o *ingestOp) receiver() objID { return o.dbID }
func (o *ingestOp) syncObjs() objIDSlice {
// Ingest should not be concurrent with mutating the batches that will be
Expand Down Expand Up @@ -754,6 +760,68 @@ func (o *ingestOp) String() string {
return buf.String()
}

type ingestAndExciseOp struct {
dbID objID
batchID objID
exciseStart, exciseEnd []byte
}

func (o *ingestAndExciseOp) run(t *test, h historyRecorder) {
var err error
b := t.getBatch(o.batchID)
t.clearObj(o.batchID)
if b.Empty() || t.testOpts.Opts.Comparer.Compare(o.exciseEnd, o.exciseStart) <= 0 {
// No-op.
h.Recordf("%s // %v", o, err)
return
}
if !t.testOpts.useExcise {
// Throw in a rangedel and rangekeydel into the batch. This will be
// fragmented / deduplicated by the batch iterator.
err = firstError(err, b.DeleteRange(o.exciseStart, o.exciseEnd, t.writeOpts))
err = firstError(err, b.RangeKeyDelete(o.exciseStart, o.exciseEnd, t.writeOpts))
}
path, err2 := o.build(t, h, b, 0 /* i */)
if err2 != nil {
h.Recordf("Build(%s) // %v", o.batchID, err2)
}
err = firstError(err, err2)
err = firstError(err, b.Close())

if t.testOpts.useExcise {
err = firstError(err, withRetries(func() error {
_, err := t.getDB(o.dbID).IngestAndExcise([]string{path}, nil /* sharedSSTs */, pebble.KeyRange{
Start: o.exciseStart,
End: o.exciseEnd,
})
return err
}))
} else {
err = firstError(err, withRetries(func() error {
return t.getDB(o.dbID).Ingest([]string{path})
}))
}

h.Recordf("%s // %v", o, err)
}

func (o *ingestAndExciseOp) build(
t *test, h historyRecorder, b *pebble.Batch, i int,
) (string, error) {
return buildForIngest(t, o.dbID, h, b, i)
}

func (o *ingestAndExciseOp) receiver() objID { return o.dbID }
func (o *ingestAndExciseOp) syncObjs() objIDSlice {
// Ingest should not be concurrent with mutating the batches that will be
// ingested as sstables.
return []objID{o.batchID}
}

func (o *ingestAndExciseOp) String() string {
return fmt.Sprintf("%s.IngestAndExcise(%s, %q, %q)", o.dbID, o.batchID, o.exciseStart, o.exciseEnd)
}

// getOp models a Reader.Get operation.
type getOp struct {
readerID objID
Expand Down Expand Up @@ -1291,10 +1359,28 @@ type newSnapshotOp struct {
}

func (o *newSnapshotOp) run(t *test, h historyRecorder) {
bounds := o.bounds
if len(bounds) == 0 {
panic("bounds unexpectedly unset for newSnapshotOp")
}
// Fibonacci hash https://probablydance.com/2018/06/16/fibonacci-hashing-the-optimization-that-the-world-forgot-or-a-better-alternative-to-integer-modulo/
if len(t.dbs) > 1 || (len(o.bounds) > 0 && ((11400714819323198485*uint64(t.idx)*t.testOpts.seedEFOS)>>63) == 1) {
s := t.getDB(o.dbID).NewEventuallyFileOnlySnapshot(o.bounds)
createEfos := ((11400714819323198485 * uint64(t.idx) * t.testOpts.seedEFOS) >> 63) == 1
// If either of these options is true, an EFOS _must_ be created, regardless
// of what the fibonacci hash returned.
excisePossible := t.testOpts.useSharedReplicate || t.testOpts.useExcise
if createEfos || excisePossible {
s := t.getDB(o.dbID).NewEventuallyFileOnlySnapshot(bounds)
t.setSnapshot(o.snapID, s)
// If the EFOS isn't guaranteed to always create iterators, we must force
// a flush on this DB so it transitions this EFOS into a file-only snapshot.
if excisePossible && !t.testOpts.efosAlwaysCreatesIters {
err := t.getDB(o.dbID).Flush()
if err != nil {
h.Recordf("%s // %v", o, err)
h.history.err.Store(errors.Wrap(err, "newSnapshotOp"))
return
}
}
} else {
s := t.getDB(o.dbID).NewSnapshot()
t.setSnapshot(o.snapID, s)
Expand Down Expand Up @@ -1396,12 +1482,18 @@ func (r *replicateOp) runSharedReplicate(
s := keyspan.Span{
Start: start,
End: end,
Keys: keys,
Keys: make([]keyspan.Key, len(keys)),
KeysOrder: 0,
}
return rangekey.Encode(&s, func(k base.InternalKey, v []byte) error {
return w.AddRangeKey(base.MakeInternalKey(k.UserKey, 0, k.Kind()), v)
})
for i := range keys {
s.Keys[i] = keyspan.Key{
Trailer: base.MakeTrailer(0, keys[i].Kind()),
Suffix: keys[i].Suffix,
Value: keys[i].Value,
}
}
keyspan.SortKeysByTrailer(&s.Keys)
return rangekey.Encode(&s, w.AddRangeKey)
},
func(sst *pebble.SharedSSTMeta) error {
sharedSSTs = append(sharedSSTs, *sst)
Expand All @@ -1413,6 +1505,12 @@ func (r *replicateOp) runSharedReplicate(
return
}

err = w.Close()
if err != nil {
h.Recordf("%s // %v", r, err)
return
}

_, err = dest.IngestAndExcise([]string{sstPath}, sharedSSTs, pebble.KeyRange{Start: r.start, End: r.end})
h.Recordf("%s // %v", r, err)
}
Expand All @@ -1439,6 +1537,12 @@ func (r *replicateOp) run(t *test, h historyRecorder) {
return
}

// First, do a RangeKeyDelete on the whole span. Adding one to the sstable
// is more challenging as it needs to be fragmented.
if err := dest.RangeKeyDelete(r.start, r.end, t.writeOpts); err != nil {
h.Recordf("%s // %v", r, err)
return
}
iter, err := source.NewIter(&pebble.IterOptions{
LowerBound: r.start,
UpperBound: r.end,
Expand All @@ -1449,16 +1553,13 @@ func (r *replicateOp) run(t *test, h historyRecorder) {
}
defer iter.Close()

// Write rangedels and rangekeydels for the range. This mimics the Excise
// that runSharedReplicate would do.
// Write a rangedel for the range. This, plus the earlier RangeKeyDelete,
// mimics the Excise that runSharedReplicate would do.
if err := w.DeleteRange(r.start, r.end); err != nil {
panic(err)
}
if err := w.RangeKeyDelete(r.start, r.end); err != nil {
panic(err)
}

for ok := iter.SeekGE(r.start); ok && iter.Error() != nil; ok = iter.Next() {
for ok := iter.SeekGE(r.start); ok && iter.Error() == nil; ok = iter.Next() {
hasPoint, hasRange := iter.HasPointAndRange()
if hasPoint {
val, err := iter.ValueAndErr()
Expand All @@ -1472,13 +1573,25 @@ func (r *replicateOp) run(t *test, h historyRecorder) {
if hasRange && iter.RangeKeyChanged() {
rangeKeys := iter.RangeKeys()
rkStart, rkEnd := iter.RangeBounds()

span := &keyspan.Span{Start: rkStart, End: rkEnd, Keys: make([]keyspan.Key, len(rangeKeys))}
for i := range rangeKeys {
if err := w.RangeKeySet(rkStart, rkEnd, rangeKeys[i].Suffix, rangeKeys[i].Value); err != nil {
panic(err)
span.Keys[i] = keyspan.Key{
Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeKeySet),
Suffix: rangeKeys[i].Suffix,
Value: rangeKeys[i].Value,
}
}
keyspan.SortKeysByTrailer(&span.Keys)
if err := rangekey.Encode(span, w.AddRangeKey); err != nil {
panic(err)
}
}
}
if err := iter.Error(); err != nil {
h.Recordf("%s // %v", r, err)
return
}
if err := w.Close(); err != nil {
panic(err)
}
Expand Down
Loading

0 comments on commit 1657dc5

Please sign in to comment.