From 4c72ea53003dd8d7cd36a1961047358b05e10e0f Mon Sep 17 00:00:00 2001 From: Bilal Akhtar Date: Sun, 2 Feb 2020 16:08:10 -0500 Subject: [PATCH] engine/metamorphic: Add engine restarts, compare across runs` This PR builds on top of #44458 (all commits before the last one are from that PR). It adds two things: one, it ensures that whenever successive engine types are tested, it does a comparison for matching outputs. This will ensure CI will fail if any change down the line causes an observed difference in behaviour between rocksdb and pebble. It also adds more MVCC operations that would be useful to test, such as MVCCFindSplitKey, MVCCDeleteRange, MVCCClearTimeRange, MVCCConditionalPut, etc. Furthermore, it adds a new kind of operation: restart. Restarts are special in that they're added after the specified n number of runs have happened; so a test run with 3 specified engine types (so 2 restarts), and n = 10000 will have 3 * 10000 = 30000 operations. A restart closes all open objects, then closes the engine, and starts up the next engine in the specified engine sequence. This, combined with the aforementioned checking functionality across different engine runs, lets us test for bidirectional compatibility across different engines. Fixes #43762 . Release note: None. --- pkg/storage/engine/metamorphic/generator.go | 120 +++++++- pkg/storage/engine/metamorphic/meta_test.go | 250 ++++++++++----- pkg/storage/engine/metamorphic/operands.go | 29 +- pkg/storage/engine/metamorphic/operations.go | 304 +++++++++++++++---- 4 files changed, 562 insertions(+), 141 deletions(-) diff --git a/pkg/storage/engine/metamorphic/generator.go b/pkg/storage/engine/metamorphic/generator.go index 9311e2e2637d..6c70fa762695 100644 --- a/pkg/storage/engine/metamorphic/generator.go +++ b/pkg/storage/engine/metamorphic/generator.go @@ -19,13 +19,58 @@ import ( "strings" "testing" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/pebble" ) const zipfMax uint64 = 100000 +func makeStorageConfig(path string) base.StorageConfig { + return base.StorageConfig{ + Dir: path, + Settings: cluster.MakeTestingClusterSettings(), + } +} + +func createTestRocksDBEngine(path string) (engine.Engine, error) { + cache := engine.NewRocksDBCache(1 << 20) + defer cache.Release() + cfg := engine.RocksDBConfig{ + StorageConfig: makeStorageConfig(path), + ReadOnly: false, + } + + return engine.NewRocksDB(cfg, cache) +} + +func createTestPebbleEngine(path string) (engine.Engine, error) { + pebbleConfig := engine.PebbleConfig{ + StorageConfig: makeStorageConfig(path), + Opts: engine.DefaultPebbleOptions(), + } + pebbleConfig.Opts.Cache = pebble.NewCache(1 << 20) + + return engine.NewPebble(context.Background(), pebbleConfig) +} + +type engineImpl struct { + name string + create func(path string) (engine.Engine, error) +} + +var _ fmt.Stringer = &engineImpl{} + +func (e *engineImpl) String() string { + return e.name +} + +var engineImplRocksDB = engineImpl{"rocksdb", createTestRocksDBEngine} +var engineImplPebble = engineImpl{"pebble", createTestPebbleEngine} + // Object to store info corresponding to one metamorphic test run. Responsible // for generating and executing operations. type metaTestRunner struct { @@ -34,6 +79,10 @@ type metaTestRunner struct { t *testing.T rng *rand.Rand seed int64 + path string + engineImpls []engineImpl + curEngine int + restarts bool engine engine.Engine tsGenerator tsGenerator managers map[operandType]operandManager @@ -46,6 +95,13 @@ func (m *metaTestRunner) init() { // test runs should guarantee the same operations being generated. m.rng = rand.New(rand.NewSource(m.seed)) m.tsGenerator.init(m.rng) + m.curEngine = 0 + + var err error + m.engine, err = m.engineImpls[0].create(m.path) + if err != nil { + m.t.Fatal(err) + } m.managers = map[operandType]operandManager{ operandTransaction: &txnManager{ @@ -57,7 +113,7 @@ func (m *metaTestRunner) init() { }, operandReadWriter: &readWriterManager{ rng: m.rng, - eng: m.engine, + m: m, batchToIDMap: make(map[engine.Batch]int), }, operandMVCCKey: &keyManager{ @@ -89,6 +145,10 @@ func (m *metaTestRunner) init() { // Run this function in a defer to ensure any Fatals on m.t do not cause panics // due to leaked iterators. func (m *metaTestRunner) closeAll() { + if m.engine == nil { + // Engine already closed; possibly running in a defer after a panic. + return + } // Close all open objects. This should let the engine close cleanly. closingOrder := []operandType{ operandIterator, @@ -98,13 +158,14 @@ func (m *metaTestRunner) closeAll() { for _, operandType := range closingOrder { m.managers[operandType].closeAll() } + m.engine.Close() + m.engine = nil } // generateAndRun generates n operations using a TPCC-style deck shuffle with // weighted probabilities of each operation appearing. func (m *metaTestRunner) generateAndRun(n int) { deck := newDeck(m.rng, m.weights...) - for i := 0; i < n; i++ { op := &operations[deck.Int()] @@ -112,23 +173,63 @@ func (m *metaTestRunner) generateAndRun(n int) { } } +// Closes the current engine and starts another one up, with the same path. +// Returns the engine transition that +func (m *metaTestRunner) restart() (string, string) { + m.closeAll() + oldEngineName := m.engineImpls[m.curEngine].name + // TODO(itsbilal): Select engines at random instead of cycling through them. + m.curEngine++ + if m.curEngine >= len(m.engineImpls) { + // If we're restarting more times than the number of engine implementations + // specified, loop back around to the first engine type specified. + m.curEngine = 0 + } + + var err error + m.engine, err = m.engineImpls[m.curEngine].create(m.path) + if err != nil { + m.t.Fatal(err) + } + return oldEngineName, m.engineImpls[m.curEngine].name +} + func (m *metaTestRunner) parseFileAndRun(f io.Reader) { reader := bufio.NewReader(f) lineCount := 0 for { var argList []operand var opName, argListString, expectedOutput string + var firstByte byte var err error lineCount++ - // TODO(itsbilal): Implement the ability to skip comments. + // Read the first byte to check if this line is a comment. + firstByte, err = reader.ReadByte() + if err != nil { + if err == io.EOF { + return + } + m.t.Fatal(err) + } + if firstByte == '#' { + // Advance to the end of the line and continue. + if _, err := reader.ReadString('\n'); err != nil { + if err == io.EOF { + return + } + m.t.Fatal(err) + } + continue + } + if opName, err = reader.ReadString('('); err != nil { if err == io.EOF { return } m.t.Fatal(err) } - opName = opName[:len(opName)-1] + opName = string(firstByte) + opName[:len(opName)-1] if argListString, err = reader.ReadString(')'); err != nil { m.t.Fatal(err) @@ -173,6 +274,11 @@ func (m *metaTestRunner) parseFileAndRun(f io.Reader) { }) if strings.Compare(strings.TrimSpace(expectedOutput), strings.TrimSpace(actualOutput)) != 0 { + // Error messages can sometimes mismatch. If both outputs contain "error", + // consider this a pass. + if strings.Contains(expectedOutput, "error") && strings.Contains(actualOutput, "error") { + continue + } m.t.Fatalf("mismatching output at line %d: expected %s, got %s", lineCount, expectedOutput, actualOutput) } } @@ -236,6 +342,12 @@ func (m *metaTestRunner) printOp(op *mvccOp, argStrings []string, output string) fmt.Fprintf(m.w, ") -> %s\n", output) } +// printComment prints a comment line into the output file. Supports single-line +// comments only. +func (m *metaTestRunner) printComment(comment string) { + fmt.Fprintf(m.w, "# %s\n", comment) +} + // Monotonically increasing timestamp generator. type tsGenerator struct { lastTS hlc.Timestamp diff --git a/pkg/storage/engine/metamorphic/meta_test.go b/pkg/storage/engine/metamorphic/meta_test.go index 3a868f6712ac..034b05b66e7a 100644 --- a/pkg/storage/engine/metamorphic/meta_test.go +++ b/pkg/storage/engine/metamorphic/meta_test.go @@ -18,102 +18,142 @@ import ( "math/rand" "os" "path/filepath" + "strings" "testing" - "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/settings/cluster" - "github.com/cockroachdb/cockroach/pkg/storage/engine" - "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/leaktest" ) -func makeStorageConfig(path string) base.StorageConfig { - return base.StorageConfig{ - Attrs: roachpb.Attributes{}, - Dir: path, - MustExist: false, - MaxSize: 0, - Settings: cluster.MakeTestingClusterSettings(), - UseFileRegistry: false, - ExtraOptions: nil, - } -} +var ( + keep = flag.Bool("keep", false, "keep temp directories after test") + check = flag.String("check", "", "run operations in specified file and check output for equality") + seed = flag.Int64("seed", 456, "specify seed to use for random number generator") + opCount = flag.Int("operations", 1000, "number of MVCC operations to generate and run") +) -// createTestRocksDBEngine returns a new in-memory RocksDB engine with 1MB of -// storage capacity. -func createTestRocksDBEngine(path string) (engine.Engine, error) { - return engine.NewEngine(enginepb.EngineTypeRocksDB, 1<<20, makeStorageConfig(path)) +type testRun struct { + ctx context.Context + t *testing.T + seed int64 + checkFile string + restarts bool + engineSequences [][]engineImpl } -// createTestPebbleEngine returns a new in-memory Pebble storage engine. -func createTestPebbleEngine(path string) (engine.Engine, error) { - return engine.NewEngine(enginepb.EngineTypePebble, 1<<20, makeStorageConfig(path)) +type testRunForEngines struct { + ctx context.Context + t *testing.T + seed int64 + restarts bool + checkFile io.Reader + outputFile io.Writer + engineSequence []engineImpl } -var mvccEngineImpls = []struct { - name string - create func(path string) (engine.Engine, error) -}{ - {"rocksdb", createTestRocksDBEngine}, - {"pebble", createTestPebbleEngine}, +func runMetaTestForEngines(run testRunForEngines) { + tempDir, cleanup := testutils.TempDir(run.t) + defer func() { + if !*keep { + cleanup() + } + }() + + testRunner := metaTestRunner{ + ctx: run.ctx, + t: run.t, + w: run.outputFile, + seed: run.seed, + restarts: run.restarts, + engineImpls: run.engineSequence, + path: filepath.Join(tempDir, "store"), + } + fmt.Printf("store path = %s\n", testRunner.path) + + testRunner.init() + defer testRunner.closeAll() + if run.checkFile != nil { + testRunner.parseFileAndRun(run.checkFile) + } else { + testRunner.generateAndRun(*opCount) + } } -var ( - keep = flag.Bool("keep", false, "keep temp directories after test") - check = flag.String("check", "", "run operations in specified file and check output for equality") - seed = flag.Int64("seed", 456, "specify seed to use for random number generator") -) - -func runMetaTest(ctx context.Context, t *testing.T, seed int64, checkFile io.Reader) { - for _, engineImpl := range mvccEngineImpls { - t.Run(engineImpl.name, func(t *testing.T) { - tempDir, cleanup := testutils.TempDir(t) +func runMetaTest(run testRun) { + t := run.t + outerTempDir, cleanup := testutils.TempDir(run.t) + defer func() { + if !*keep { + cleanup() + } + }() + + // The test run with the first engine sequence writes its output to this file. + // All subsequent engine sequence runs compare their output against this file. + firstRunOutput := filepath.Join(outerTempDir, "output.meta") + firstRunExecuted := false + fmt.Printf("first run output file: %s\n", firstRunOutput) + + for _, engineSequence := range run.engineSequences { + var engineNames []string + for _, engineImpl := range engineSequence { + engineNames = append(engineNames, engineImpl.name) + } + + t.Run(strings.Join(engineNames, ","), func(t *testing.T) { + innerTempDir, cleanup := testutils.TempDir(t) defer func() { if !*keep { cleanup() } }() - eng, err := engineImpl.create(filepath.Join(tempDir, engineImpl.name)) - if err != nil { - t.Fatal(err) + // If this is not the first sequence run and a "check" file was not passed + // in, use the first run's output file as the check file. + var checkFileReader io.ReadCloser + if run.checkFile == "" && firstRunExecuted { + run.checkFile = firstRunOutput + } + if run.checkFile != "" { + var err error + checkFileReader, err = os.Open(run.checkFile) + if err != nil { + t.Fatal(err) + } + defer checkFileReader.Close() } - defer eng.Close() - - outputFilePath := filepath.Join(tempDir, fmt.Sprintf("%s.meta", engineImpl.name)) - fmt.Printf("output file path: %s\n", outputFilePath) - outputFile, err := os.Create(outputFilePath) + var outputFileWriter io.WriteCloser + outputFile := firstRunOutput + if firstRunExecuted { + outputFile = filepath.Join(innerTempDir, "output.meta") + } + var err error + outputFileWriter, err = os.Create(outputFile) if err != nil { t.Fatal(err) } - defer outputFile.Close() - - testRunner := metaTestRunner{ - ctx: ctx, - t: t, - w: outputFile, - seed: seed, - engine: eng, - } - - testRunner.init() - defer testRunner.closeAll() - if checkFile != nil { - testRunner.parseFileAndRun(checkFile) - } else { - // TODO(itsbilal): Make this configurable. - testRunner.generateAndRun(10000) + defer outputFileWriter.Close() + fmt.Printf("check file = %s\noutput file = %s\n", run.checkFile, outputFile) + engineRun := testRunForEngines{ + ctx: run.ctx, + t: t, + seed: run.seed, + restarts: run.restarts, + checkFile: checkFileReader, + outputFile: outputFileWriter, + engineSequence: engineSequence, } + runMetaTestForEngines(engineRun) + firstRunExecuted = true }) } } -// TestMeta runs the MVCC Metamorphic test suite. -func TestMeta(t *testing.T) { +// TestRocksPebbleEquivalence runs the MVCC Metamorphic test suite, and checks +// for matching outputs by the test suite between RocksDB and Pebble. +func TestRocksPebbleEquivalence(t *testing.T) { defer leaktest.AfterTest(t) ctx := context.Background() if util.RaceEnabled { @@ -124,23 +164,77 @@ func TestMeta(t *testing.T) { // Have one fixed seed, one user-specified seed, and one random seed. seeds := []int64{123, *seed, rand.Int63()} - if *check != "" { - t.Run("check", func(t *testing.T) { - if _, err := os.Stat(*check); os.IsNotExist(err) { - t.Fatal(err) - } - checkFile, err := os.Open(*check) - if err != nil { - t.Fatal(err) + for _, seed := range seeds { + t.Run(fmt.Sprintf("seed=%d", seed), func(t *testing.T) { + run := testRun{ + ctx: ctx, + t: t, + seed: seed, + restarts: false, + engineSequences: [][]engineImpl{ + {engineImplRocksDB}, + {engineImplPebble}, + }, } - defer checkFile.Close() - - runMetaTest(ctx, t, 0, checkFile) + runMetaTest(run) }) } +} + +// TestRocksPebbleRestarts runs the MVCC Metamorphic test suite with restarts +// enabled, and ensures that the output remains the same across different +// engine sequences with restarts in between. +func TestRocksPebbleRestarts(t *testing.T) { + defer leaktest.AfterTest(t) + ctx := context.Background() + if util.RaceEnabled { + // This test times out with the race detector enabled. + return + } + + // Have one fixed seed, one user-specified seed, and one random seed. + seeds := []int64{123, *seed, rand.Int63()} + for _, seed := range seeds { t.Run(fmt.Sprintf("seed=%d", seed), func(t *testing.T) { - runMetaTest(ctx, t, seed, nil) + run := testRun{ + ctx: ctx, + t: t, + seed: seed, + restarts: true, + engineSequences: [][]engineImpl{ + {engineImplRocksDB}, + {engineImplPebble}, + {engineImplRocksDB, engineImplPebble}, + }, + } + runMetaTest(run) }) } } + +// TestRocksPebbleCheck checks whether the output file specified with --check has +// matching behavior across rocks/pebble. +func TestRocksPebbleCheck(t *testing.T) { + defer leaktest.AfterTest(t) + ctx := context.Background() + + if *check != "" { + if _, err := os.Stat(*check); os.IsNotExist(err) { + t.Fatal(err) + } + + run := testRun{ + ctx: ctx, + t: t, + checkFile: *check, + restarts: true, + engineSequences: [][]engineImpl{ + {engineImplRocksDB}, + {engineImplPebble}, + {engineImplRocksDB, engineImplPebble}, + }, + } + runMetaTest(run) + } +} diff --git a/pkg/storage/engine/metamorphic/operands.go b/pkg/storage/engine/metamorphic/operands.go index 86ec6725d6e2..6746efdcb2e7 100644 --- a/pkg/storage/engine/metamorphic/operands.go +++ b/pkg/storage/engine/metamorphic/operands.go @@ -259,10 +259,24 @@ func (t *txnManager) clearBatch(batch engine.Batch) { } } +func (t *txnManager) trackWriteOnBatch(w engine.Writer, txn *roachpb.Transaction) { + if batch, ok := w.(engine.Batch); ok { + openBatches, ok := t.openBatches[txn] + if !ok { + t.openBatches[txn] = make(map[engine.Batch]struct{}) + openBatches = t.openBatches[txn] + } + openBatches[batch] = struct{}{} + } +} + func (t *txnManager) closeAll() { for _, txn := range t.liveTxns { t.close(txn) } + t.liveTxns = nil + t.txnIDMap = make(map[string]*roachpb.Transaction) + t.openBatches = make(map[*roachpb.Transaction]map[engine.Batch]struct{}) } func (t *txnManager) toString(op operand) string { @@ -353,7 +367,7 @@ func (t *testRunnerManager) get() operand { type readWriterManager struct { rng *rand.Rand - eng engine.Engine + m *metaTestRunner liveBatches []engine.Batch batchToIDMap map[engine.Batch]int batchCounter int @@ -364,14 +378,14 @@ var _ operandManager = &readWriterManager{} func (w *readWriterManager) get() operand { // 25% chance of returning the engine, even if there are live batches. if len(w.liveBatches) == 0 || w.rng.Float64() < 0.25 { - return w.eng + return w.m.engine } return w.liveBatches[w.rng.Intn(len(w.liveBatches))] } func (w *readWriterManager) open() engine.ReadWriter { - batch := w.eng.NewBatch() + batch := w.m.engine.NewBatch() w.batchCounter++ w.liveBatches = append(w.liveBatches, batch) w.batchToIDMap[batch] = w.batchCounter @@ -388,7 +402,7 @@ func (w *readWriterManager) count() int { func (w *readWriterManager) close(op operand) { // No-op if engine. - if op == w.eng { + if op == w.m.engine { return } @@ -413,7 +427,7 @@ func (w *readWriterManager) closeAll() { } func (w *readWriterManager) toString(op operand) string { - if w.eng == op { + if w.m.engine == op { return "engine" } return fmt.Sprintf("batch%d", w.batchToIDMap[op.(engine.Batch)]) @@ -421,7 +435,7 @@ func (w *readWriterManager) toString(op operand) string { func (w *readWriterManager) parse(input string) operand { if input == "engine" { - return w.eng + return w.m.engine } var id int @@ -512,6 +526,9 @@ func (i *iteratorManager) closeAll() { for iter := range i.iterToInfo { iter.Close() } + i.liveIters = nil + i.iterToInfo = make(map[engine.Iterator]iteratorInfo) + i.readerToIter = make(map[engine.Reader][]engine.Iterator) } func (i *iteratorManager) toString(op operand) string { diff --git a/pkg/storage/engine/metamorphic/operations.go b/pkg/storage/engine/metamorphic/operations.go index d80b0eaa548c..fcea123fa19f 100644 --- a/pkg/storage/engine/metamorphic/operations.go +++ b/pkg/storage/engine/metamorphic/operations.go @@ -14,7 +14,11 @@ import ( "context" "fmt" "math" + "os" + "path/filepath" + "sort" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -132,14 +136,10 @@ func printIterState(iter engine.Iterator) string { // List of operations, where each operation is defined as one instance of mvccOp. // // TODO(itsbilal): Add more missing MVCC operations, such as: -// - MVCCConditionalPut // - MVCCBlindPut // - MVCCMerge -// - MVCCClearTimeRange // - MVCCIncrement // - MVCCResolveWriteIntent in the aborted case -// - MVCCFindSplitKey -// - ingestions. // - and any others that would be important to test. var operations = []mvccOp{ { @@ -166,7 +166,7 @@ var operations = []mvccOp{ operandMVCCKey, operandPastTS, }, - weight: 10, + weight: 100, }, { name: "mvcc_get", @@ -189,7 +189,7 @@ var operations = []mvccOp{ operandMVCCKey, operandTransaction, }, - weight: 10, + weight: 100, }, { name: "mvcc_put", @@ -209,20 +209,73 @@ var operations = []mvccOp{ txn.IntentSpans = append(txn.IntentSpans, roachpb.Span{ Key: key.Key, }) - // If this write happened on a batch, track that in the txn manager so - // that the batch is committed before the transaction is aborted or - // committed. Note that this append happens without checking if this - // batch is already in the slice; readers of this slice need to be aware - // of duplicates. - if batch, ok := writer.(engine.Batch); ok { - txnManager := m.managers[operandTransaction].(*txnManager) - openBatches, ok := txnManager.openBatches[txn] - if !ok { - txnManager.openBatches[txn] = make(map[engine.Batch]struct{}) - openBatches = txnManager.openBatches[txn] - } - openBatches[batch] = struct{}{} + // Track this write in the txn manager. This ensures the batch will be + // committed before the transaction is committed + m.managers[operandTransaction].(*txnManager).trackWriteOnBatch(writer, txn) + return "ok" + }, + operands: []operandType{ + operandReadWriter, + operandMVCCKey, + operandValue, + operandTransaction, + }, + weight: 500, + }, + { + name: "mvcc_conditional_put", + run: func(ctx context.Context, m *metaTestRunner, args ...operand) string { + writer := args[0].(engine.ReadWriter) + key := args[1].(engine.MVCCKey) + value := roachpb.MakeValueFromBytes(args[2].([]byte)) + expVal := roachpb.MakeValueFromBytes(args[3].([]byte)) + txn := args[4].(*roachpb.Transaction) + txn.Sequence++ + + err := engine.MVCCConditionalPut(ctx, writer, nil, key.Key, txn.WriteTimestamp, value, &expVal, true, txn) + if err != nil { + return fmt.Sprintf("error: %s", err) + } + + // Update the txn's intent spans to account for this intent being written. + txn.IntentSpans = append(txn.IntentSpans, roachpb.Span{ + Key: key.Key, + }) + // Track this write in the txn manager. This ensures the batch will be + // committed before the transaction is committed + m.managers[operandTransaction].(*txnManager).trackWriteOnBatch(writer, txn) + return "ok" + }, + operands: []operandType{ + operandReadWriter, + operandMVCCKey, + operandValue, + operandValue, + operandTransaction, + }, + weight: 50, + }, + { + name: "mvcc_init_put", + run: func(ctx context.Context, m *metaTestRunner, args ...operand) string { + writer := args[0].(engine.ReadWriter) + key := args[1].(engine.MVCCKey) + value := roachpb.MakeValueFromBytes(args[2].([]byte)) + txn := args[3].(*roachpb.Transaction) + txn.Sequence++ + + err := engine.MVCCInitPut(ctx, writer, nil, key.Key, txn.WriteTimestamp, value, false, txn) + if err != nil { + return fmt.Sprintf("error: %s", err) } + + // Update the txn's intent spans to account for this intent being written. + txn.IntentSpans = append(txn.IntentSpans, roachpb.Span{ + Key: key.Key, + }) + // Track this write in the txn manager. This ensures the batch will be + // committed before the transaction is committed + m.managers[operandTransaction].(*txnManager).trackWriteOnBatch(writer, txn) return "ok" }, operands: []operandType{ @@ -231,7 +284,81 @@ var operations = []mvccOp{ operandValue, operandTransaction, }, - weight: 30, + weight: 50, + }, + { + name: "mvcc_delete_range", + run: func(ctx context.Context, m *metaTestRunner, args ...operand) string { + writer := args[0].(engine.ReadWriter) + key := args[1].(engine.MVCCKey).Key + endKey := args[2].(engine.MVCCKey).Key + txn := args[3].(*roachpb.Transaction) + txn.Sequence++ + + if endKey.Compare(key) < 0 { + key, endKey = endKey, key + } + + keys, _, _, err := engine.MVCCDeleteRange(ctx, writer, nil, key, endKey, 0, txn.WriteTimestamp, txn, true) + if err != nil { + return fmt.Sprintf("error: %s", err) + } + + // Update the txn's intent spans to account for this intent being written. + for _, key := range keys { + txn.IntentSpans = append(txn.IntentSpans, roachpb.Span{ + Key: key, + }) + } + // Track this write in the txn manager. This ensures the batch will be + // committed before the transaction is committed + m.managers[operandTransaction].(*txnManager).trackWriteOnBatch(writer, txn) + return fmt.Sprintf("keys = %v", keys) + }, + dependentOps: func(m *metaTestRunner, args ...operand) (results []opRun) { + return closeItersOnBatch(m, args[0].(engine.Reader)) + }, + operands: []operandType{ + operandReadWriter, + operandMVCCKey, + operandMVCCKey, + operandTransaction, + }, + weight: 20, + }, + { + name: "mvcc_clear_time_range", + run: func(ctx context.Context, m *metaTestRunner, args ...operand) string { + writer := args[0].(engine.ReadWriter) + key := args[1].(engine.MVCCKey).Key + endKey := args[2].(engine.MVCCKey).Key + startTime := args[3].(hlc.Timestamp) + endTime := args[3].(hlc.Timestamp) + + if endKey.Compare(key) < 0 { + key, endKey = endKey, key + } + if endTime.Less(startTime) { + startTime, endTime = endTime, startTime + } + + span, err := engine.MVCCClearTimeRange(ctx, writer, nil, key, endKey, startTime, endTime, math.MaxInt64) + if err != nil { + return fmt.Sprintf("error: %s", err) + } + return fmt.Sprintf("ok, span = %v", span) + }, + dependentOps: func(m *metaTestRunner, args ...operand) (results []opRun) { + return closeItersOnBatch(m, args[0].(engine.Reader)) + }, + operands: []operandType{ + operandReadWriter, + operandMVCCKey, + operandMVCCKey, + operandPastTS, + operandPastTS, + }, + weight: 20, }, { name: "mvcc_delete", @@ -250,18 +377,9 @@ var operations = []mvccOp{ txn.IntentSpans = append(txn.IntentSpans, roachpb.Span{ Key: key.Key, }) - // If this write happened on a batch, track that in the txn manager so - // that the batch is committed before the transaction is aborted or - // committed. - if batch, ok := writer.(engine.Batch); ok { - txnManager := m.managers[operandTransaction].(*txnManager) - openBatches, ok := txnManager.openBatches[txn] - if !ok { - txnManager.openBatches[txn] = make(map[engine.Batch]struct{}) - openBatches = txnManager.openBatches[txn] - } - openBatches[batch] = struct{}{} - } + // Track this write in the txn manager. This ensures the batch will be + // committed before the transaction is committed + m.managers[operandTransaction].(*txnManager).trackWriteOnBatch(writer, txn) return "ok" }, operands: []operandType{ @@ -269,7 +387,27 @@ var operations = []mvccOp{ operandMVCCKey, operandTransaction, }, - weight: 10, + weight: 100, + }, + { + name: "mvcc_find_split_key", + run: func(ctx context.Context, m *metaTestRunner, args ...operand) string { + key, _ := keys.Addr(args[0].(engine.MVCCKey).Key) + endKey, _ := keys.Addr(args[1].(engine.MVCCKey).Key) + splitSize := int64(1024) + + splitKey, err := engine.MVCCFindSplitKey(ctx, m.engine, key, endKey, splitSize) + if err != nil { + return fmt.Sprintf("error: %s", err) + } + + return fmt.Sprintf("ok, splitSize = %d, splitKey = %v", splitSize, splitKey) + }, + operands: []operandType{ + operandMVCCKey, + operandMVCCKey, + }, + weight: 20, }, { name: "mvcc_scan", @@ -281,7 +419,7 @@ var operations = []mvccOp{ operandMVCCKey, operandTransaction, }, - weight: 10, + weight: 100, }, { name: "mvcc_inconsistent_scan", @@ -293,7 +431,7 @@ var operations = []mvccOp{ operandMVCCKey, operandPastTS, }, - weight: 10, + weight: 100, }, { name: "mvcc_reverse_scan", @@ -305,7 +443,7 @@ var operations = []mvccOp{ operandMVCCKey, operandTransaction, }, - weight: 10, + weight: 100, }, { name: "txn_open", @@ -314,7 +452,7 @@ var operations = []mvccOp{ return m.managers[operandTransaction].toString(txn) }, operands: []operandType{}, - weight: 4, + weight: 40, }, { name: "txn_commit", @@ -343,7 +481,7 @@ var operations = []mvccOp{ operands: []operandType{ operandTransaction, }, - weight: 10, + weight: 100, }, { name: "batch_open", @@ -352,7 +490,7 @@ var operations = []mvccOp{ return m.managers[operandReadWriter].toString(batch) }, operands: []operandType{}, - weight: 4, + weight: 40, }, { name: "batch_commit", @@ -374,7 +512,7 @@ var operations = []mvccOp{ operands: []operandType{ operandReadWriter, }, - weight: 10, + weight: 100, }, { name: "iterator_open", @@ -408,7 +546,7 @@ var operations = []mvccOp{ operandMVCCKey, operandMVCCKey, }, - weight: 2, + weight: 20, }, { name: "iterator_close", @@ -421,7 +559,7 @@ var operations = []mvccOp{ operands: []operandType{ operandIterator, }, - weight: 5, + weight: 50, }, { name: "iterator_seekge", @@ -446,7 +584,7 @@ var operations = []mvccOp{ operandIterator, operandMVCCKey, }, - weight: 5, + weight: 50, }, { name: "iterator_seeklt", @@ -467,7 +605,7 @@ var operations = []mvccOp{ operandIterator, operandMVCCKey, }, - weight: 5, + weight: 50, }, { name: "iterator_next", @@ -488,7 +626,7 @@ var operations = []mvccOp{ operands: []operandType{ operandIterator, }, - weight: 10, + weight: 100, }, { name: "iterator_nextkey", @@ -509,7 +647,7 @@ var operations = []mvccOp{ operands: []operandType{ operandIterator, }, - weight: 10, + weight: 100, }, { name: "iterator_prev", @@ -533,7 +671,7 @@ var operations = []mvccOp{ operands: []operandType{ operandIterator, }, - weight: 10, + weight: 100, }, { // Note that this is not an MVCC* operation; unlike MVCC{Put,Get,Scan}, etc, @@ -541,9 +679,9 @@ var operations = []mvccOp{ // behavior. name: "delete_range", run: func(ctx context.Context, m *metaTestRunner, args ...operand) string { - key := args[0].(engine.MVCCKey) - endKey := args[1].(engine.MVCCKey) - if endKey.Less(key) { + key := args[0].(engine.MVCCKey).Key + endKey := args[1].(engine.MVCCKey).Key + if endKey.Compare(key) < 0 { key, endKey = endKey, key } else if endKey.Equal(key) { // Range tombstones where start = end can exhibit different behavior on @@ -552,7 +690,9 @@ var operations = []mvccOp{ // standardize behavior. endKey = endKey.Next() } - err := m.engine.ClearRange(key, endKey) + // All ClearRange calls in Cockroach usually happen with metadata keys, so + // mimic the same behavior here. + err := m.engine.ClearRange(engine.MakeMVCCMetadataKey(key), engine.MakeMVCCMetadataKey(endKey)) if err != nil { return fmt.Sprintf("error: %s", err.Error()) } @@ -562,7 +702,7 @@ var operations = []mvccOp{ operandMVCCKey, operandMVCCKey, }, - weight: 2, + weight: 20, }, { name: "compact", @@ -582,6 +722,64 @@ var operations = []mvccOp{ operandMVCCKey, operandMVCCKey, }, - weight: 2, + weight: 10, + }, + { + name: "ingest", + run: func(ctx context.Context, m *metaTestRunner, args ...operand) string { + sstPath := filepath.Join(m.path, "ingest.sst") + f, err := os.Create(sstPath) + if err != nil { + return fmt.Sprintf("error = %s", err.Error()) + } + defer f.Close() + + var keys []engine.MVCCKey + for _, arg := range args { + keys = append(keys, arg.(engine.MVCCKey)) + } + // SST Writer expects keys in sorted order, so sort them first. + sort.Slice(keys, func(i, j int) bool { + return keys[i].Less(keys[j]) + }) + + sstWriter := engine.MakeIngestionSSTWriter(f) + for _, key := range keys { + _ = sstWriter.Put(key, []byte("ingested")) + } + if err := sstWriter.Finish(); err != nil { + return fmt.Sprintf("error = %s", err.Error()) + } + sstWriter.Close() + + if err := m.engine.IngestExternalFiles(ctx, []string{sstPath}); err != nil { + return fmt.Sprintf("error = %s", err.Error()) + } + + return "ok" + }, + operands: []operandType{ + operandMVCCKey, + operandMVCCKey, + operandMVCCKey, + operandMVCCKey, + operandMVCCKey, + }, + weight: 10, + }, + { + name: "restart", + run: func(ctx context.Context, m *metaTestRunner, args ...operand) string { + if !m.restarts { + m.printComment("no-op due to restarts being disabled") + return "ok" + } + + oldEngineName, newEngineName := m.restart() + m.printComment(fmt.Sprintf("restarting: %s -> %s", oldEngineName, newEngineName)) + return "ok" + }, + operands: nil, + weight: 4, }, }