From 55accddf88382d511aba2203604b44510a5ece5c Mon Sep 17 00:00:00 2001 From: Mira Radeva Date: Wed, 16 Aug 2023 14:05:51 -0400 Subject: [PATCH] kvnemesis: add support for savepoints This patch adds support for generating and validating savepoints in kvnemesis. Fixes: #97444 Release note: None --- pkg/kv/kvnemesis/BUILD.bazel | 1 + pkg/kv/kvnemesis/applier.go | 60 ++++++- pkg/kv/kvnemesis/applier_test.go | 18 +++ pkg/kv/kvnemesis/generator.go | 142 +++++++++++++++-- pkg/kv/kvnemesis/generator_test.go | 103 ++++++++++++ pkg/kv/kvnemesis/kvnemesis_test.go | 12 +- pkg/kv/kvnemesis/operations.go | 27 ++++ pkg/kv/kvnemesis/operations.proto | 18 +++ pkg/kv/kvnemesis/operations_test.go | 9 ++ .../TestApplier/txn-si-release-savepoint | 14 ++ .../TestApplier/txn-si-rollback-savepoint | 14 ++ .../testdata/TestApplier/txn-si-savepoint | 12 ++ .../TestApplier/txn-ssi-release-savepoint | 14 ++ .../TestApplier/txn-ssi-rollback-savepoint | 14 ++ .../testdata/TestApplier/txn-ssi-savepoint | 12 ++ .../kvnemesis/testdata/TestOperationsFormat/5 | 13 ++ .../TestValidate/nested_savepoint_release | 14 ++ .../one_deleterange_after_write_missing_write | 2 +- ...terange_before_write_returning_wrong_value | 2 +- .../TestValidate/one_savepoint_and_one_put | 9 ++ .../TestValidate/one_savepoint_and_release | 12 ++ .../TestValidate/one_savepoint_and_rollback | 13 ++ .../savepoint_release_and_rollback | 17 ++ .../TestValidate/savepoint_with_no_last_write | 10 ++ ...oint_with_no_last_write_and_existing_write | 13 ++ .../two_closed_nested_savepoint_rollbacks | 17 ++ .../two_nested_savepoint_rollbacks | 16 ++ .../two_non-nested_savepoints_rollbacks | 17 ++ pkg/kv/kvnemesis/validator.go | 133 ++++++++++++++-- pkg/kv/kvnemesis/validator_test.go | 149 ++++++++++++++++++ 30 files changed, 872 insertions(+), 35 deletions(-) create mode 100644 pkg/kv/kvnemesis/testdata/TestApplier/txn-si-release-savepoint create mode 100644 pkg/kv/kvnemesis/testdata/TestApplier/txn-si-rollback-savepoint create mode 100644 pkg/kv/kvnemesis/testdata/TestApplier/txn-si-savepoint create mode 100644 pkg/kv/kvnemesis/testdata/TestApplier/txn-ssi-release-savepoint create mode 100644 pkg/kv/kvnemesis/testdata/TestApplier/txn-ssi-rollback-savepoint create mode 100644 pkg/kv/kvnemesis/testdata/TestApplier/txn-ssi-savepoint create mode 100644 pkg/kv/kvnemesis/testdata/TestOperationsFormat/5 create mode 100644 pkg/kv/kvnemesis/testdata/TestValidate/nested_savepoint_release create mode 100644 pkg/kv/kvnemesis/testdata/TestValidate/one_savepoint_and_one_put create mode 100644 pkg/kv/kvnemesis/testdata/TestValidate/one_savepoint_and_release create mode 100644 pkg/kv/kvnemesis/testdata/TestValidate/one_savepoint_and_rollback create mode 100644 pkg/kv/kvnemesis/testdata/TestValidate/savepoint_release_and_rollback create mode 100644 pkg/kv/kvnemesis/testdata/TestValidate/savepoint_with_no_last_write create mode 100644 pkg/kv/kvnemesis/testdata/TestValidate/savepoint_with_no_last_write_and_existing_write create mode 100644 pkg/kv/kvnemesis/testdata/TestValidate/two_closed_nested_savepoint_rollbacks create mode 100644 pkg/kv/kvnemesis/testdata/TestValidate/two_nested_savepoint_rollbacks create mode 100644 pkg/kv/kvnemesis/testdata/TestValidate/two_non-nested_savepoints_rollbacks diff --git a/pkg/kv/kvnemesis/BUILD.bazel b/pkg/kv/kvnemesis/BUILD.bazel index 9920f2d759e6..82ead6f1e8f1 100644 --- a/pkg/kv/kvnemesis/BUILD.bazel +++ b/pkg/kv/kvnemesis/BUILD.bazel @@ -54,6 +54,7 @@ go_library( "@com_github_cockroachdb_pebble//:pebble", "@com_github_cockroachdb_pebble//vfs", "@org_golang_google_protobuf//proto", + "@org_golang_x_exp//slices", ], ) diff --git a/pkg/kv/kvnemesis/applier.go b/pkg/kv/kvnemesis/applier.go index 6e6902d67f82..1ce6913333e1 100644 --- a/pkg/kv/kvnemesis/applier.go +++ b/pkg/kv/kvnemesis/applier.go @@ -134,7 +134,7 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) { *DeleteRangeOperation, *DeleteRangeUsingTombstoneOperation, *AddSSTableOperation: - applyClientOp(ctx, db, op, false) + applyClientOp(ctx, db, op, false /* inTxn */, nil /* spIDToToken */) case *SplitOperation: err := db.AdminSplit(ctx, o.Key, hlc.MaxTimestamp) o.Result = resultInit(ctx, err) @@ -169,6 +169,9 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) { retryOnAbort.Next() } savedTxn = txn + // A map of a savepoint id to the corresponding savepoint token that was + // created after applying the savepoint op. + spIDToToken := make(map[int]kv.SavepointToken) // First error. Because we need to mark everything that // we didn't "reach" due to a prior error with errOmitted, // we *don't* return eagerly on this but save it to the end. @@ -191,7 +194,7 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) { continue } - applyClientOp(ctx, txn, op, true) + applyClientOp(ctx, txn, op, true /* inTxn */, &spIDToToken) // The KV api disallows use of a txn after an operation on it errors. if r := op.Result(); r.Type == ResultType_Error { err = errors.DecodeError(ctx, *r.Err) @@ -234,6 +237,8 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) { o.Txn = savedTxn.TestingCloneTxn() o.Result.OptionalTimestamp = o.Txn.WriteTimestamp } + case *SavepointCreateOperation, *SavepointReleaseOperation, *SavepointRollbackOperation: + panic(errors.AssertionFailedf(`can't apply a savepoint operation %v outside of a ClosureTxnOperation`, o)) default: panic(errors.AssertionFailedf(`unknown operation type: %T %v`, o, o)) } @@ -286,7 +291,13 @@ func batchRun( return ts, nil } -func applyClientOp(ctx context.Context, db clientI, op *Operation, inTxn bool) { +func applyClientOp( + ctx context.Context, + db clientI, + op *Operation, + inTxn bool, + spIDToToken *map[int]kv.SavepointToken, +) { switch o := op.GetValue().(type) { case *GetOperation: res, ts, err := dbRunWithResultAndTimestamp(ctx, db, func(b *kv.Batch) { @@ -438,6 +449,49 @@ func applyClientOp(ctx context.Context, db clientI, op *Operation, inTxn bool) { case *BatchOperation: b := &kv.Batch{} applyBatchOp(ctx, b, db.Run, o) + case *SavepointCreateOperation: + txn, ok := db.(*kv.Txn) // savepoints are only allowed with transactions + if !ok { + panic(errors.AssertionFailedf(`non-txn interface attempted to create a savepoint %v`, o)) + } + spt, err := txn.CreateSavepoint(ctx) + o.Result = resultInit(ctx, err) + if err != nil { + return + } + // Map the savepoint id to the newly created savepoint token. + if _, ok := (*spIDToToken)[int(o.ID)]; ok { + panic(errors.AssertionFailedf("applying a savepoint create op: ID %d already exists", o.ID)) + } + (*spIDToToken)[int(o.ID)] = spt + case *SavepointReleaseOperation: + txn, ok := db.(*kv.Txn) // savepoints are only allowed with transactions + if !ok { + panic(errors.AssertionFailedf(`non-txn interface attempted to release a savepoint %v`, o)) + } + spt, ok := (*spIDToToken)[int(o.ID)] + if !ok { + panic(errors.AssertionFailedf("applying a savepoint release op: ID %d does not exist", o.ID)) + } + err := txn.ReleaseSavepoint(ctx, spt) + o.Result = resultInit(ctx, err) + if err != nil { + return + } + case *SavepointRollbackOperation: + txn, ok := db.(*kv.Txn) // savepoints are only allowed with transactions + if !ok { + panic(errors.AssertionFailedf(`non-txn interface attempted to rollback a savepoint %v`, o)) + } + spt, ok := (*spIDToToken)[int(o.ID)] + if !ok { + panic(errors.AssertionFailedf("applying a savepoint rollback op: ID %d does not exist", o.ID)) + } + err := txn.RollbackToSavepoint(ctx, spt) + o.Result = resultInit(ctx, err) + if err != nil { + return + } default: panic(errors.AssertionFailedf(`unknown batch operation type: %T %v`, o, o)) } diff --git a/pkg/kv/kvnemesis/applier_test.go b/pkg/kv/kvnemesis/applier_test.go index 9ec13509d1f7..5f5f8a5ae96b 100644 --- a/pkg/kv/kvnemesis/applier_test.go +++ b/pkg/kv/kvnemesis/applier_test.go @@ -311,6 +311,24 @@ func TestApplier(t *testing.T) { { "change-replicas", step(changeReplicas(k1, kvpb.ReplicationChange{ChangeType: roachpb.ADD_VOTER, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}})), }, + { + "txn-ssi-savepoint", step(closureTxn(ClosureTxnType_Commit, isolation.Serializable, put(k5, 0), createSavepoint(1), put(k5, 2), createSavepoint(3), get(k5))), + }, + { + "txn-si-savepoint", step(closureTxn(ClosureTxnType_Commit, isolation.Snapshot, put(k5, 0), createSavepoint(1), put(k5, 2), createSavepoint(3), get(k5))), + }, + { + "txn-ssi-release-savepoint", step(closureTxn(ClosureTxnType_Commit, isolation.Serializable, put(k5, 0), createSavepoint(1), put(k5, 2), createSavepoint(3), get(k5), releaseSavepoint(1), get(k5))), + }, + { + "txn-si-release-savepoint", step(closureTxn(ClosureTxnType_Commit, isolation.Snapshot, put(k5, 0), createSavepoint(1), put(k5, 2), createSavepoint(3), get(k5), releaseSavepoint(1), get(k5))), + }, + { + "txn-ssi-rollback-savepoint", step(closureTxn(ClosureTxnType_Commit, isolation.Serializable, put(k5, 0), createSavepoint(1), put(k5, 2), createSavepoint(3), get(k5), rollbackSavepoint(1), get(k5))), + }, + { + "txn-si-rollback-savepoint", step(closureTxn(ClosureTxnType_Commit, isolation.Snapshot, put(k5, 0), createSavepoint(1), put(k5, 2), createSavepoint(3), get(k5), rollbackSavepoint(1), get(k5))), + }, } w := echotest.NewWalker(t, datapathutils.TestDataPath(t, t.Name())) diff --git a/pkg/kv/kvnemesis/generator.go b/pkg/kv/kvnemesis/generator.go index b16e8879bb6c..b690f5e51c7c 100644 --- a/pkg/kv/kvnemesis/generator.go +++ b/pkg/kv/kvnemesis/generator.go @@ -32,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" + "golang.org/x/exp/slices" ) // GeneratorConfig contains all the tunable knobs necessary to run a Generator. @@ -95,6 +96,7 @@ type ClosureTxnConfig struct { // When CommitInBatch is selected, CommitBatchOps controls the composition of // the kv.Batch used. CommitBatchOps ClientOperationConfig + SavepointOps SavepointConfig } // ClientOperationConfig configures the relative probabilities of the @@ -321,6 +323,15 @@ type ChangeZoneConfig struct { ToggleGlobalReads int } +type SavepointConfig struct { + // SavepointCreate is an operation that creates a new savepoint with a given id. + SavepointCreate int + // SavepointRelease is an operation that releases a savepoint with a given id. + SavepointRelease int + // SavepointRollback is an operation that rolls back a savepoint with a given id. + SavepointRollback int +} + // newAllOperationsConfig returns a GeneratorConfig that exercises *all* // options. You probably want NewDefaultConfig. Most of the time, these will be // the same, but having both allows us to merge code for operations that do not @@ -378,6 +389,12 @@ func newAllOperationsConfig() GeneratorConfig { Batch: 4, Ops: clientOpConfig, } + // SavepointConfig is only relevant in ClosureTxnConfig. + savepointConfig := SavepointConfig{ + SavepointCreate: 1, + SavepointRelease: 1, + SavepointRollback: 1, + } return GeneratorConfig{Ops: OperationConfig{ DB: clientOpConfig, Batch: batchOpConfig, @@ -394,6 +411,7 @@ func newAllOperationsConfig() GeneratorConfig { TxnClientOps: clientOpConfig, TxnBatchOps: batchOpConfig, CommitBatchOps: clientOpConfig, + SavepointOps: savepointConfig, }, Split: SplitConfig{ SplitNew: 1, @@ -1351,23 +1369,23 @@ func (g *generator) registerClosureTxnOps(allowed *[]opGen, c *ClosureTxnConfig) const Commit, Rollback = ClosureTxnType_Commit, ClosureTxnType_Rollback const SSI, SI, RC = isolation.Serializable, isolation.Snapshot, isolation.ReadCommitted addOpGen(allowed, - makeClosureTxn(Commit, SSI, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/), c.CommitSerializable) + makeClosureTxn(Commit, SSI, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.CommitSerializable) addOpGen(allowed, - makeClosureTxn(Commit, SI, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/), c.CommitSnapshot) + makeClosureTxn(Commit, SI, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.CommitSnapshot) addOpGen(allowed, - makeClosureTxn(Commit, RC, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/), c.CommitReadCommitted) + makeClosureTxn(Commit, RC, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.CommitReadCommitted) addOpGen(allowed, - makeClosureTxn(Rollback, SSI, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/), c.RollbackSerializable) + makeClosureTxn(Rollback, SSI, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.RollbackSerializable) addOpGen(allowed, - makeClosureTxn(Rollback, SI, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/), c.RollbackSnapshot) + makeClosureTxn(Rollback, SI, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.RollbackSnapshot) addOpGen(allowed, - makeClosureTxn(Rollback, RC, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/), c.RollbackReadCommitted) + makeClosureTxn(Rollback, RC, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.RollbackReadCommitted) addOpGen(allowed, - makeClosureTxn(Commit, SSI, &c.TxnClientOps, &c.TxnBatchOps, &c.CommitBatchOps), c.CommitSerializableInBatch) + makeClosureTxn(Commit, SSI, &c.TxnClientOps, &c.TxnBatchOps, &c.CommitBatchOps, &c.SavepointOps), c.CommitSerializableInBatch) addOpGen(allowed, - makeClosureTxn(Commit, SI, &c.TxnClientOps, &c.TxnBatchOps, &c.CommitBatchOps), c.CommitSnapshotInBatch) + makeClosureTxn(Commit, SI, &c.TxnClientOps, &c.TxnBatchOps, &c.CommitBatchOps, &c.SavepointOps), c.CommitSnapshotInBatch) addOpGen(allowed, - makeClosureTxn(Commit, RC, &c.TxnClientOps, &c.TxnBatchOps, &c.CommitBatchOps), c.CommitReadCommittedInBatch) + makeClosureTxn(Commit, RC, &c.TxnClientOps, &c.TxnBatchOps, &c.CommitBatchOps, &c.SavepointOps), c.CommitReadCommittedInBatch) } func makeClosureTxn( @@ -1376,16 +1394,33 @@ func makeClosureTxn( txnClientOps *ClientOperationConfig, txnBatchOps *BatchOperationConfig, commitInBatch *ClientOperationConfig, + savepointOps *SavepointConfig, ) opGenFunc { return func(g *generator, rng *rand.Rand) Operation { + // All allowed non-savepoint ops. These don't change as we iteratively + // select ops in the loop below. var allowed []opGen g.registerClientOps(&allowed, txnClientOps) g.registerBatchOps(&allowed, txnBatchOps) - const maxOps = 5 + const maxOps = 20 numOps := rng.Intn(maxOps + 1) ops := make([]Operation, numOps) + // Stack of savepoint indexes/ids. + // The last element of the slice is the top of the stack. + var spIDs []int for i := range ops { - ops[i] = g.selectOp(rng, allowed) + // In each iteration, we start with the allowed non-savepoint ops, + // and we add the valid savepoint ops in registerSavepointOps. + allowedIncludingSavepointOps := allowed + // Savepoints are registered on each iteration of the loop (unlike other + // ops that are registered once at the start) because depending on what + // savepoint ops are randomly selected in selectOp below, the set of + // allowed savepoint ops changes. See registerSavepointOps. + g.registerSavepointOps(&allowedIncludingSavepointOps, savepointOps, spIDs, i) + ops[i] = g.selectOp(rng, allowedIncludingSavepointOps) + // Now that a random op is selected, we may need to update the stack of + // existing savepoints. See maybeUpdateSavepoints. + maybeUpdateSavepoints(&spIDs, ops[i]) } op := closureTxn(txnType, iso, ops...) if commitInBatch != nil { @@ -1398,6 +1433,79 @@ func makeClosureTxn( } } +// registerSavepointOps assumes existingSp is the current stack of savepoints +// and uses it to register only valid savepoint ops. I.e. releasing or rolling +// back a savepoint that hasn't been created or has already been released or +// rolled back is not allowed. +func (g *generator) registerSavepointOps( + allowed *[]opGen, s *SavepointConfig, existingSp []int, idx int, +) { + // A savepoint creation is always a valid. The index of the op in the txn is + // used as its id. + addOpGen(allowed, makeSavepointCreate(idx), s.SavepointCreate) + // For each existing savepoint, a rollback and a release op are valid. + for _, id := range existingSp { + addOpGen(allowed, makeSavepointRelease(id), s.SavepointRelease) + addOpGen(allowed, makeSavepointRollback(id), s.SavepointRollback) + } +} + +func makeSavepointCreate(id int) opGenFunc { + return func(_ *generator, _ *rand.Rand) Operation { + return createSavepoint(id) + } +} + +func makeSavepointRelease(id int) opGenFunc { + return func(_ *generator, _ *rand.Rand) Operation { + return releaseSavepoint(id) + } +} + +func makeSavepointRollback(id int) opGenFunc { + return func(_ *generator, _ *rand.Rand) Operation { + return rollbackSavepoint(id) + } +} + +// maybeUpdateSavepoints modifies the slice of existing savepoints based on the +// previously selected op to either: (1) add a new savepoint if the previous op +// was SavepointCreateOperation, or (2) remove a suffix of savepoints if the +// previous op was SavepointReleaseOperation or SavepointRollbackOperation. +func maybeUpdateSavepoints(existingSp *[]int, prevOp Operation) { + switch op := prevOp.GetValue().(type) { + case *SavepointCreateOperation: + // If the previous selected op was a savepoint creation, add it to the stack + // of existing savepoints. + if slices.Index(*existingSp, int(op.ID)) != -1 { + panic(errors.AssertionFailedf(`generating a savepoint create op: ID %d already exists`, int(op.ID))) + } + *existingSp = append(*existingSp, int(op.ID)) + case *SavepointReleaseOperation: + // If the previous selected op was a savepoint release, remove it from the + // stack of existing savepoints, together with all other savepoint above it + // on the stack. E.g. if the existing savepoints are [1, 2, 3], releasing + // savepoint 2 means savepoint 3 also needs to be released. + index := slices.Index(*existingSp, int(op.ID)) + if index == -1 { + panic(errors.AssertionFailedf(`generating a savepoint release op: ID %d does not exist`, int(op.ID))) + } + *existingSp = (*existingSp)[:index] + case *SavepointRollbackOperation: + // If the previous selected op was a savepoint rollback, remove it from the + // stack of existing savepoints, together with all other savepoint above it + // on the stack. E.g. if the existing savepoints are [1, 2, 3], rolling back + // savepoint 2 means savepoint 3 also needs to be rolled back. + index := slices.Index(*existingSp, int(op.ID)) + if index == -1 { + panic(errors.AssertionFailedf(`generating a savepoint rollback op: ID %d does not exist`, int(op.ID))) + } + *existingSp = (*existingSp)[:index] + default: + // prevOp is not a savepoint operation, no need to adjust existingSp. + } +} + // fk stands for "from key", i.e. decode the uint64 the key represents. // Panics on error. func fk(k string) uint64 { @@ -1731,3 +1839,15 @@ func addSSTable( AsWrites: asWrites, }} } + +func createSavepoint(id int) Operation { + return Operation{SavepointCreate: &SavepointCreateOperation{ID: int32(id)}} +} + +func releaseSavepoint(id int) Operation { + return Operation{SavepointRelease: &SavepointReleaseOperation{ID: int32(id)}} +} + +func rollbackSavepoint(id int) Operation { + return Operation{SavepointRollback: &SavepointRollbackOperation{ID: int32(id)}} +} diff --git a/pkg/kv/kvnemesis/generator_test.go b/pkg/kv/kvnemesis/generator_test.go index 2dfefae87c08..a96eed0860d9 100644 --- a/pkg/kv/kvnemesis/generator_test.go +++ b/pkg/kv/kvnemesis/generator_test.go @@ -253,12 +253,27 @@ func TestRandStep(t *testing.T) { case *BatchOperation: batch.Batch++ countClientOps(&batch.Ops, nil, o.Ops...) + case *SavepointCreateOperation, *SavepointRollbackOperation, *SavepointReleaseOperation: + // We'll count these separately. default: t.Fatalf("%T", o) } } } + countSavepointOps := func(savepoint *SavepointConfig, ops ...Operation) { + for _, op := range ops { + switch op.GetValue().(type) { + case *SavepointCreateOperation: + savepoint.SavepointCreate++ + case *SavepointReleaseOperation: + savepoint.SavepointRelease++ + case *SavepointRollbackOperation: + savepoint.SavepointRollback++ + } + } + } + counts := OperationConfig{} for { step := g.RandStep(rng) @@ -274,6 +289,7 @@ func TestRandStep(t *testing.T) { countClientOps(&counts.DB, &counts.Batch, step.Op) case *ClosureTxnOperation: countClientOps(&counts.ClosureTxn.TxnClientOps, &counts.ClosureTxn.TxnBatchOps, o.Ops...) + countSavepointOps(&counts.ClosureTxn.SavepointOps, o.Ops...) if o.CommitInBatch != nil { switch o.IsoLevel { case isolation.Serializable: @@ -482,3 +498,90 @@ func TestRandDelRangeUsingTombstone(t *testing.T) { echotest.Require(t, buf.String(), datapathutils.TestDataPath(t, t.Name()+".txt")) } + +func TestUpdateSavepoints(t *testing.T) { + defer leaktest.AfterTest(t)() + + tests := []struct { + name string + savepoints []int + prevOp Operation + expectedSp []int + expectedErr string + }{ + { + name: "no savepoints (nil)", + savepoints: nil, + prevOp: get(k1), + expectedSp: nil, + }, + { + name: "no savepoints (empty)", + savepoints: []int{}, + prevOp: get(k1), + expectedSp: []int{}, + }, + { + name: "prevOp is not a savepoint", + savepoints: []int{0}, + prevOp: get(k1), + expectedSp: []int{0}, + }, + { + name: "prevOp is a savepoint create", + savepoints: nil, + prevOp: createSavepoint(2), + expectedSp: []int{2}, + }, + { + name: "prevOp is a savepoint release", + savepoints: []int{1}, + prevOp: releaseSavepoint(1), + expectedSp: []int{}, + }, + { + name: "prevOp is a savepoint rollback", + savepoints: []int{1}, + prevOp: rollbackSavepoint(1), + expectedSp: []int{}, + }, + { + name: "nested rollbacks", + savepoints: []int{1, 2, 3, 4}, + prevOp: rollbackSavepoint(2), + expectedSp: []int{1}, + }, + { + name: "nested releases", + savepoints: []int{1, 2, 3, 4}, + prevOp: releaseSavepoint(2), + expectedSp: []int{1}, + }, + { + name: "re-create existing savepoint", + savepoints: []int{1, 2, 3, 4}, + prevOp: createSavepoint(1), + expectedErr: "generating a savepoint create op: ID 1 already exists", + }, + { + name: "release non-existent savepoint", + savepoints: []int{1, 2, 3, 4}, + prevOp: releaseSavepoint(5), + expectedErr: "generating a savepoint release op: ID 5 does not exist", + }, + { + name: "roll back non-existent savepoint", + savepoints: []int{1, 2, 3, 4}, + prevOp: rollbackSavepoint(5), + expectedErr: "generating a savepoint rollback op: ID 5 does not exist", + }, + } + for _, test := range tests { + if test.expectedErr != "" { + require.PanicsWithError(t, test.expectedErr, func() { maybeUpdateSavepoints(&test.savepoints, test.prevOp) }) + } else { + maybeUpdateSavepoints(&test.savepoints, test.prevOp) + require.Equal(t, test.expectedSp, test.savepoints) + } + } +} diff --git a/pkg/kv/kvnemesis/kvnemesis_test.go b/pkg/kv/kvnemesis/kvnemesis_test.go index 8d20fa88d928..4a7799be674b 100644 --- a/pkg/kv/kvnemesis/kvnemesis_test.go +++ b/pkg/kv/kvnemesis/kvnemesis_test.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" @@ -44,7 +45,9 @@ import ( var defaultNumSteps = envutil.EnvOrDefaultInt("COCKROACH_KVNEMESIS_STEPS", 100) -func (cfg kvnemesisTestCfg) testClusterArgs(tr *SeqTracker) base.TestClusterArgs { +func (cfg kvnemesisTestCfg) testClusterArgs( + ctx context.Context, tr *SeqTracker, +) base.TestClusterArgs { storeKnobs := &kvserver.StoreTestingKnobs{ // Drop the clock MaxOffset to reduce commit-wait time for // transactions that write to global_read ranges. @@ -156,6 +159,10 @@ func (cfg kvnemesisTestCfg) testClusterArgs(tr *SeqTracker) base.TestClusterArgs } } + settings := cluster.MakeTestingClusterSettings() + // TODO(mira): Remove this cluster setting once the default is set to true. + kvcoord.KeepRefreshSpansOnSavepointRollback.Override(ctx, &settings.SV, true) + return base.TestClusterArgs{ ServerArgs: base.TestServerArgs{ Knobs: base.TestingKnobs{ @@ -179,6 +186,7 @@ func (cfg kvnemesisTestCfg) testClusterArgs(tr *SeqTracker) base.TestClusterArgs }, }, }, + Settings: settings, }, } } @@ -321,7 +329,7 @@ func testKVNemesisImpl(t *testing.T, cfg kvnemesisTestCfg) { // 4 nodes so we have somewhere to move 3x replicated ranges to. ctx := context.Background() tr := &SeqTracker{} - tc := testcluster.StartTestCluster(t, cfg.numNodes, cfg.testClusterArgs(tr)) + tc := testcluster.StartTestCluster(t, cfg.numNodes, cfg.testClusterArgs(ctx, tr)) defer tc.Stopper().Stop(ctx) dbs, sqlDBs := make([]*kv.DB, cfg.numNodes), make([]*gosql.DB, cfg.numNodes) for i := 0; i < cfg.numNodes; i++ { diff --git a/pkg/kv/kvnemesis/operations.go b/pkg/kv/kvnemesis/operations.go index 036a59cbc86a..1ed357a36564 100644 --- a/pkg/kv/kvnemesis/operations.go +++ b/pkg/kv/kvnemesis/operations.go @@ -55,6 +55,12 @@ func (op Operation) Result() *Result { return &o.Result case *ClosureTxnOperation: return &o.Result + case *SavepointCreateOperation: + return &o.Result + case *SavepointReleaseOperation: + return &o.Result + case *SavepointRollbackOperation: + return &o.Result default: panic(errors.AssertionFailedf(`unknown operation: %T %v`, o, o)) } @@ -194,6 +200,12 @@ func (op Operation) format(w *strings.Builder, fctx formatCtx) { if o.Txn != nil { fmt.Fprintf(w, "\n%s// ^-- txnpb:(%s)", fctx.indent, o.Txn) } + case *SavepointCreateOperation: + o.format(w, fctx) + case *SavepointReleaseOperation: + o.format(w, fctx) + case *SavepointRollbackOperation: + o.format(w, fctx) default: fmt.Fprintf(w, "%v", op.GetValue()) } @@ -377,6 +389,21 @@ func (op ChangeZoneOperation) format(w *strings.Builder, fctx formatCtx) { op.Result.format(w) } +func (op SavepointCreateOperation) format(w *strings.Builder, fctx formatCtx) { + fmt.Fprintf(w, `%s.CreateSavepoint(ctx, %d)`, fctx.receiver, int(op.ID)) + op.Result.format(w) +} + +func (op SavepointReleaseOperation) format(w *strings.Builder, fctx formatCtx) { + fmt.Fprintf(w, `%s.ReleaseSavepoint(ctx, %d)`, fctx.receiver, int(op.ID)) + op.Result.format(w) +} + +func (op SavepointRollbackOperation) format(w *strings.Builder, fctx formatCtx) { + fmt.Fprintf(w, `%s.RollbackSavepoint(ctx, %d)`, fctx.receiver, int(op.ID)) + op.Result.format(w) +} + func (r Result) format(w *strings.Builder) { if r.Type == ResultType_Unknown { return diff --git a/pkg/kv/kvnemesis/operations.proto b/pkg/kv/kvnemesis/operations.proto index bb33a83627cf..cd980b3bdfdb 100644 --- a/pkg/kv/kvnemesis/operations.proto +++ b/pkg/kv/kvnemesis/operations.proto @@ -126,6 +126,21 @@ message ChangeZoneOperation { Result result = 2 [(gogoproto.nullable) = false]; } +message SavepointCreateOperation { + int32 id = 1 [(gogoproto.customname) = "ID"]; + Result result = 2 [(gogoproto.nullable) = false]; +} + +message SavepointReleaseOperation { + int32 id = 1 [(gogoproto.customname) = "ID"]; + Result result = 2 [(gogoproto.nullable) = false]; +} + +message SavepointRollbackOperation { + int32 id = 1 [(gogoproto.customname) = "ID"]; + Result result = 2 [(gogoproto.nullable) = false]; +} + message Operation { option (gogoproto.goproto_stringer) = false; option (gogoproto.onlyone) = true; @@ -150,6 +165,9 @@ message Operation { TransferLeaseOperation transfer_lease = 16; ChangeZoneOperation change_zone = 17; AddSSTableOperation add_sstable = 18 [(gogoproto.customname) = "AddSSTable"]; + SavepointCreateOperation savepoint_create = 19; + SavepointReleaseOperation savepoint_release = 20; + SavepointRollbackOperation savepoint_rollback = 21; } enum ResultType { diff --git a/pkg/kv/kvnemesis/operations_test.go b/pkg/kv/kvnemesis/operations_test.go index 5c24e2034477..1968dcbc94b3 100644 --- a/pkg/kv/kvnemesis/operations_test.go +++ b/pkg/kv/kvnemesis/operations_test.go @@ -87,6 +87,15 @@ func TestOperationsFormat(t *testing.T) { { step: step(addSSTable(sstFile.Data(), sstSpan, sstTS, sstValueHeader.KVNemesisSeq.Get(), true)), }, + { + step: step( + closureTxn(ClosureTxnType_Commit, + isolation.Serializable, + createSavepoint(0), put(k9, 3), releaseSavepoint(0), + get(k8), + createSavepoint(4), del(k9, 1), rollbackSavepoint(4), + )), + }, } w := echotest.NewWalker(t, datapathutils.TestDataPath(t, t.Name())) diff --git a/pkg/kv/kvnemesis/testdata/TestApplier/txn-si-release-savepoint b/pkg/kv/kvnemesis/testdata/TestApplier/txn-si-release-savepoint new file mode 100644 index 000000000000..98ba0a0de7a7 --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestApplier/txn-si-release-savepoint @@ -0,0 +1,14 @@ +echo +---- +db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetIsoLevel(isolation.Snapshot) + txn.Put(ctx, tk(5), sv(0)) // @ + txn.CreateSavepoint(ctx, 1) // + txn.Put(ctx, tk(5), sv(2)) // @ + txn.CreateSavepoint(ctx, 3) // + txn.Get(ctx, tk(5)) // @ (v2, ) + txn.ReleaseSavepoint(ctx, 1) // + txn.Get(ctx, tk(5)) // @ (v2, ) + return nil +}) // @ +// ^-- txnpb: diff --git a/pkg/kv/kvnemesis/testdata/TestApplier/txn-si-rollback-savepoint b/pkg/kv/kvnemesis/testdata/TestApplier/txn-si-rollback-savepoint new file mode 100644 index 000000000000..135d3c9f22df --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestApplier/txn-si-rollback-savepoint @@ -0,0 +1,14 @@ +echo +---- +db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetIsoLevel(isolation.Snapshot) + txn.Put(ctx, tk(5), sv(0)) // @ + txn.CreateSavepoint(ctx, 1) // + txn.Put(ctx, tk(5), sv(2)) // @ + txn.CreateSavepoint(ctx, 3) // + txn.Get(ctx, tk(5)) // @ (v2, ) + txn.RollbackSavepoint(ctx, 1) // + txn.Get(ctx, tk(5)) // @ (v0, ) + return nil +}) // @ +// ^-- txnpb: diff --git a/pkg/kv/kvnemesis/testdata/TestApplier/txn-si-savepoint b/pkg/kv/kvnemesis/testdata/TestApplier/txn-si-savepoint new file mode 100644 index 000000000000..631107464d00 --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestApplier/txn-si-savepoint @@ -0,0 +1,12 @@ +echo +---- +db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetIsoLevel(isolation.Snapshot) + txn.Put(ctx, tk(5), sv(0)) // @ + txn.CreateSavepoint(ctx, 1) // + txn.Put(ctx, tk(5), sv(2)) // @ + txn.CreateSavepoint(ctx, 3) // + txn.Get(ctx, tk(5)) // @ (v2, ) + return nil +}) // @ +// ^-- txnpb: diff --git a/pkg/kv/kvnemesis/testdata/TestApplier/txn-ssi-release-savepoint b/pkg/kv/kvnemesis/testdata/TestApplier/txn-ssi-release-savepoint new file mode 100644 index 000000000000..6f6bd2f6b195 --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestApplier/txn-ssi-release-savepoint @@ -0,0 +1,14 @@ +echo +---- +db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetIsoLevel(isolation.Serializable) + txn.Put(ctx, tk(5), sv(0)) // @ + txn.CreateSavepoint(ctx, 1) // + txn.Put(ctx, tk(5), sv(2)) // @ + txn.CreateSavepoint(ctx, 3) // + txn.Get(ctx, tk(5)) // @ (v2, ) + txn.ReleaseSavepoint(ctx, 1) // + txn.Get(ctx, tk(5)) // @ (v2, ) + return nil +}) // @ +// ^-- txnpb: diff --git a/pkg/kv/kvnemesis/testdata/TestApplier/txn-ssi-rollback-savepoint b/pkg/kv/kvnemesis/testdata/TestApplier/txn-ssi-rollback-savepoint new file mode 100644 index 000000000000..23e06d365c7f --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestApplier/txn-ssi-rollback-savepoint @@ -0,0 +1,14 @@ +echo +---- +db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetIsoLevel(isolation.Serializable) + txn.Put(ctx, tk(5), sv(0)) // @ + txn.CreateSavepoint(ctx, 1) // + txn.Put(ctx, tk(5), sv(2)) // @ + txn.CreateSavepoint(ctx, 3) // + txn.Get(ctx, tk(5)) // @ (v2, ) + txn.RollbackSavepoint(ctx, 1) // + txn.Get(ctx, tk(5)) // @ (v0, ) + return nil +}) // @ +// ^-- txnpb: diff --git a/pkg/kv/kvnemesis/testdata/TestApplier/txn-ssi-savepoint b/pkg/kv/kvnemesis/testdata/TestApplier/txn-ssi-savepoint new file mode 100644 index 000000000000..f5d56c4da873 --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestApplier/txn-ssi-savepoint @@ -0,0 +1,12 @@ +echo +---- +db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetIsoLevel(isolation.Serializable) + txn.Put(ctx, tk(5), sv(0)) // @ + txn.CreateSavepoint(ctx, 1) // + txn.Put(ctx, tk(5), sv(2)) // @ + txn.CreateSavepoint(ctx, 3) // + txn.Get(ctx, tk(5)) // @ (v2, ) + return nil +}) // @ +// ^-- txnpb: diff --git a/pkg/kv/kvnemesis/testdata/TestOperationsFormat/5 b/pkg/kv/kvnemesis/testdata/TestOperationsFormat/5 new file mode 100644 index 000000000000..712a19c13f35 --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestOperationsFormat/5 @@ -0,0 +1,13 @@ +echo +---- +···db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { +··· txn.SetIsoLevel(isolation.Serializable) +··· txn.CreateSavepoint(ctx, 0) +··· txn.Put(ctx, tk(9), sv(3)) +··· txn.ReleaseSavepoint(ctx, 0) +··· txn.Get(ctx, tk(8)) +··· txn.CreateSavepoint(ctx, 4) +··· txn.Del(ctx, tk(9) /* @s1 */) +··· txn.RollbackSavepoint(ctx, 4) +··· return nil +···}) diff --git a/pkg/kv/kvnemesis/testdata/TestValidate/nested_savepoint_release b/pkg/kv/kvnemesis/testdata/TestValidate/nested_savepoint_release new file mode 100644 index 000000000000..36f49513668c --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestValidate/nested_savepoint_release @@ -0,0 +1,14 @@ +echo +---- +db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetIsoLevel(isolation.Serializable) + txn.Put(ctx, tk(1), sv(1)) // + txn.CreateSavepoint(ctx, 2) // + txn.Put(ctx, tk(1), sv(2)) // + txn.CreateSavepoint(ctx, 4) // + txn.Put(ctx, tk(1), sv(3)) // + txn.ReleaseSavepoint(ctx, 2) // + txn.Get(ctx, tk(1)) // (v3, ) + return nil +}) // @0.000000001,0 +/Table/100/"0000000000000001"/0.000000001,0 @ s3 v3 diff --git a/pkg/kv/kvnemesis/testdata/TestValidate/one_deleterange_after_write_missing_write b/pkg/kv/kvnemesis/testdata/TestValidate/one_deleterange_after_write_missing_write index d33dc3f2e3f1..429162ec2cf2 100644 --- a/pkg/kv/kvnemesis/testdata/TestValidate/one_deleterange_after_write_missing_write +++ b/pkg/kv/kvnemesis/testdata/TestValidate/one_deleterange_after_write_missing_write @@ -7,4 +7,4 @@ db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { return nil }) // @0.000000001,0 /Table/100/"0000000000000001"/0.000000001,0 @ s1 v1 -committed serializable txn missing write at seq s2: [dr.d]/Table/100/"0000000000000001":missing->@s2 [dr.s]/Table/100/"000000000000000{1"-3"}:{gap:[, 0.000000001,0)}->[] +committed serializable txn missing write at seq s2: [dr.d]/Table/100/"0000000000000001":missing->@s2 [dr.s]/Table/100/"000000000000000{1"-3"}:{gap:[, 0.000000001,0),[0.000000001,0, )}->[] diff --git a/pkg/kv/kvnemesis/testdata/TestValidate/one_deleterange_before_write_returning_wrong_value b/pkg/kv/kvnemesis/testdata/TestValidate/one_deleterange_before_write_returning_wrong_value index b2bfe819e6dc..7d3fe5c76362 100644 --- a/pkg/kv/kvnemesis/testdata/TestValidate/one_deleterange_before_write_returning_wrong_value +++ b/pkg/kv/kvnemesis/testdata/TestValidate/one_deleterange_before_write_returning_wrong_value @@ -3,4 +3,4 @@ echo db0.DelRange(ctx, tk(1), tk(3), true /* @s1 */) // @0.000000001,0 (/Table/100/"0000000000000001", ) db0.Put(ctx, tk(1), sv(2)) // @0.000000002,0 /Table/100/"0000000000000001"/0.000000002,0 @ s2 v2 -committed deleteRange missing write at seq s1: [dr.d]/Table/100/"0000000000000001":missing->@s1 [dr.s]/Table/100/"000000000000000{1"-3"}:{gap:[, 0.000000002,0)}->[] +committed deleteRange missing write at seq s1: [dr.d]/Table/100/"0000000000000001":missing->@s1 [dr.s]/Table/100/"000000000000000{1"-3"}:{gap:[, 0.000000001,0),[0.000000001,0, 0.000000002,0)}->[] diff --git a/pkg/kv/kvnemesis/testdata/TestValidate/one_savepoint_and_one_put b/pkg/kv/kvnemesis/testdata/TestValidate/one_savepoint_and_one_put new file mode 100644 index 000000000000..cc0cb21fc03d --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestValidate/one_savepoint_and_one_put @@ -0,0 +1,9 @@ +echo +---- +db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetIsoLevel(isolation.Serializable) + txn.Put(ctx, tk(1), sv(1)) // + txn.CreateSavepoint(ctx, 2) // + return nil +}) // @0.000000001,0 +/Table/100/"0000000000000001"/0.000000001,0 @ s1 v1 diff --git a/pkg/kv/kvnemesis/testdata/TestValidate/one_savepoint_and_release b/pkg/kv/kvnemesis/testdata/TestValidate/one_savepoint_and_release new file mode 100644 index 000000000000..354c0ca5d377 --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestValidate/one_savepoint_and_release @@ -0,0 +1,12 @@ +echo +---- +db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetIsoLevel(isolation.Serializable) + txn.Put(ctx, tk(1), sv(1)) // + txn.CreateSavepoint(ctx, 2) // + txn.Put(ctx, tk(1), sv(2)) // + txn.ReleaseSavepoint(ctx, 2) // + txn.Get(ctx, tk(1)) // (v2, ) + return nil +}) // @0.000000001,0 +/Table/100/"0000000000000001"/0.000000001,0 @ s2 v2 diff --git a/pkg/kv/kvnemesis/testdata/TestValidate/one_savepoint_and_rollback b/pkg/kv/kvnemesis/testdata/TestValidate/one_savepoint_and_rollback new file mode 100644 index 000000000000..bd32c14f5663 --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestValidate/one_savepoint_and_rollback @@ -0,0 +1,13 @@ +echo +---- +db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetIsoLevel(isolation.Serializable) + txn.Put(ctx, tk(1), sv(1)) // + txn.CreateSavepoint(ctx, 2) // + txn.Put(ctx, tk(1), sv(2)) // + txn.Get(ctx, tk(1)) // (v2, ) + txn.RollbackSavepoint(ctx, 2) // + txn.Get(ctx, tk(1)) // (v1, ) + return nil +}) // @0.000000001,0 +/Table/100/"0000000000000001"/0.000000001,0 @ s1 v1 diff --git a/pkg/kv/kvnemesis/testdata/TestValidate/savepoint_release_and_rollback b/pkg/kv/kvnemesis/testdata/TestValidate/savepoint_release_and_rollback new file mode 100644 index 000000000000..c81738d82100 --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestValidate/savepoint_release_and_rollback @@ -0,0 +1,17 @@ +echo +---- +db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetIsoLevel(isolation.Serializable) + txn.Put(ctx, tk(1), sv(1)) // + txn.CreateSavepoint(ctx, 2) // + txn.Put(ctx, tk(1), sv(2)) // + txn.Get(ctx, tk(1)) // (v2, ) + txn.CreateSavepoint(ctx, 5) // + txn.Put(ctx, tk(1), sv(3)) // + txn.Get(ctx, tk(1)) // (v3, ) + txn.ReleaseSavepoint(ctx, 5) // + txn.RollbackSavepoint(ctx, 2) // + txn.Get(ctx, tk(1)) // (v1, ) + return nil +}) // @0.000000001,0 +/Table/100/"0000000000000001"/0.000000001,0 @ s1 v1 diff --git a/pkg/kv/kvnemesis/testdata/TestValidate/savepoint_with_no_last_write b/pkg/kv/kvnemesis/testdata/TestValidate/savepoint_with_no_last_write new file mode 100644 index 000000000000..a43c9fcb1d54 --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestValidate/savepoint_with_no_last_write @@ -0,0 +1,10 @@ +echo +---- +db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetIsoLevel(isolation.Serializable) + txn.CreateSavepoint(ctx, 0) // + txn.Put(ctx, tk(1), sv(1)) // + txn.Get(ctx, tk(1)) // (v1, ) + txn.RollbackSavepoint(ctx, 0) // + return nil +}) // @0.000000001,0 diff --git a/pkg/kv/kvnemesis/testdata/TestValidate/savepoint_with_no_last_write_and_existing_write b/pkg/kv/kvnemesis/testdata/TestValidate/savepoint_with_no_last_write_and_existing_write new file mode 100644 index 000000000000..7e0db02d643d --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestValidate/savepoint_with_no_last_write_and_existing_write @@ -0,0 +1,13 @@ +echo +---- +db0.Put(ctx, tk(1), sv(1)) // @0.000000001,0 +db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetIsoLevel(isolation.Serializable) + txn.CreateSavepoint(ctx, 1) // + txn.Put(ctx, tk(1), sv(2)) // + txn.Get(ctx, tk(1)) // (v2, ) + txn.RollbackSavepoint(ctx, 1) // + txn.Get(ctx, tk(1)) // (v1, ) + return nil +}) // @0.000000002,0 +/Table/100/"0000000000000001"/0.000000001,0 @ s1 v1 diff --git a/pkg/kv/kvnemesis/testdata/TestValidate/two_closed_nested_savepoint_rollbacks b/pkg/kv/kvnemesis/testdata/TestValidate/two_closed_nested_savepoint_rollbacks new file mode 100644 index 000000000000..6b7f7b0e3959 --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestValidate/two_closed_nested_savepoint_rollbacks @@ -0,0 +1,17 @@ +echo +---- +db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetIsoLevel(isolation.Serializable) + txn.Put(ctx, tk(1), sv(1)) // + txn.CreateSavepoint(ctx, 2) // + txn.Put(ctx, tk(1), sv(2)) // + txn.Get(ctx, tk(1)) // (v2, ) + txn.CreateSavepoint(ctx, 5) // + txn.Put(ctx, tk(1), sv(3)) // + txn.Get(ctx, tk(1)) // (v3, ) + txn.RollbackSavepoint(ctx, 5) // + txn.RollbackSavepoint(ctx, 2) // + txn.Get(ctx, tk(1)) // (v1, ) + return nil +}) // @0.000000001,0 +/Table/100/"0000000000000001"/0.000000001,0 @ s1 v1 diff --git a/pkg/kv/kvnemesis/testdata/TestValidate/two_nested_savepoint_rollbacks b/pkg/kv/kvnemesis/testdata/TestValidate/two_nested_savepoint_rollbacks new file mode 100644 index 000000000000..7f7ac960b781 --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestValidate/two_nested_savepoint_rollbacks @@ -0,0 +1,16 @@ +echo +---- +db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetIsoLevel(isolation.Serializable) + txn.Put(ctx, tk(1), sv(1)) // + txn.CreateSavepoint(ctx, 2) // + txn.Put(ctx, tk(1), sv(2)) // + txn.Get(ctx, tk(1)) // (v2, ) + txn.CreateSavepoint(ctx, 5) // + txn.Put(ctx, tk(1), sv(3)) // + txn.Get(ctx, tk(1)) // (v3, ) + txn.RollbackSavepoint(ctx, 2) // + txn.Get(ctx, tk(1)) // (v1, ) + return nil +}) // @0.000000001,0 +/Table/100/"0000000000000001"/0.000000001,0 @ s1 v1 diff --git a/pkg/kv/kvnemesis/testdata/TestValidate/two_non-nested_savepoints_rollbacks b/pkg/kv/kvnemesis/testdata/TestValidate/two_non-nested_savepoints_rollbacks new file mode 100644 index 000000000000..ec3ab0b268a2 --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestValidate/two_non-nested_savepoints_rollbacks @@ -0,0 +1,17 @@ +echo +---- +db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetIsoLevel(isolation.Serializable) + txn.CreateSavepoint(ctx, 0) // + txn.Put(ctx, tk(1), sv(1)) // + txn.Get(ctx, tk(1)) // (v1, ) + txn.RollbackSavepoint(ctx, 0) // + txn.Put(ctx, tk(1), sv(2)) // + txn.CreateSavepoint(ctx, 5) // + txn.Put(ctx, tk(1), sv(3)) // + txn.Get(ctx, tk(1)) // (v3, ) + txn.RollbackSavepoint(ctx, 5) // + txn.Get(ctx, tk(1)) // (v2, ) + return nil +}) // @0.000000001,0 +/Table/100/"0000000000000001"/0.000000001,0 @ s2 v2 diff --git a/pkg/kv/kvnemesis/validator.go b/pkg/kv/kvnemesis/validator.go index ab101a207e21..8563c8488207 100644 --- a/pkg/kv/kvnemesis/validator.go +++ b/pkg/kv/kvnemesis/validator.go @@ -13,7 +13,6 @@ package kvnemesis import ( "context" "fmt" - "reflect" "regexp" "sort" "strings" @@ -244,6 +243,21 @@ type observedScan struct { func (*observedScan) observedMarker() {} +type savepointType int + +const ( + create savepointType = iota + release + rollback +) + +type observedSavepoint struct { + ID int + Type savepointType +} + +func (*observedSavepoint) observedMarker() {} + type validator struct { kvs *Engine @@ -853,6 +867,24 @@ func (v *validator) processOp(op Operation) { case *ChangeZoneOperation: execTimestampStrictlyOptional = true v.failIfError(op, t.Result) // fail on all errors + case *SavepointCreateOperation: + sp := &observedSavepoint{ID: int(t.ID), Type: create} + v.curObservations = append(v.curObservations, sp) + // Don't fail on all errors because savepoints can be labeled with + // errOmitted if a previous op in the txn failed. + v.checkError(op, t.Result) + case *SavepointReleaseOperation: + sp := &observedSavepoint{ID: int(t.ID), Type: release} + v.curObservations = append(v.curObservations, sp) + // Don't fail on all errors because savepoints can be labeled with + // errOmitted if a previous op in the txn failed. + v.checkError(op, t.Result) + case *SavepointRollbackOperation: + sp := &observedSavepoint{ID: int(t.ID), Type: rollback} + v.curObservations = append(v.curObservations, sp) + // Don't fail on all errors because savepoints can be labeled with + // errOmitted if a previous op in the txn failed. + v.checkError(op, t.Result) default: panic(errors.AssertionFailedf(`unknown operation type: %T %v`, t, t)) } @@ -991,6 +1023,18 @@ func (v *validator) checkAtomicCommitted( // unit tested. lastWritesByIdx := map[int]struct{}{} var lastWrites roachpb.SpanGroup + // We will iterate over the observations in reverse order to find the last + // write; so we will encounter a savepoint rollback before the corresponding + // savepoint create. Assuming a rollback is preceded by a matching create + // (guaranteed by the generator), to identify the last write we need to ignore + // all writes that occur between a savepoint rollback and a corresponding + // savepoint create. + // rollbackSp keeps track of the current active rollback. + // rollbackSp = nil if no savepoint rollback has been encountered yet or any + // encountered rollback has been matched by a rollback create. + // rollbackSp = observedSavepoint{...} when the observedSavepoint object + // contains a rollback for which we haven't encountered a matching create yet. + var rollbackSp *observedSavepoint = nil for idx := len(txnObservations) - 1; idx >= 0; idx-- { observation := txnObservations[idx] switch o := observation.(type) { @@ -1022,19 +1066,10 @@ func (v *validator) checkAtomicCommitted( { var g roachpb.SpanGroup g.Add(lastWrites.Slice()...) - lastWrite = !g.Sub(sp) // if subtracting did nothing, it's a most recent write - if !lastWrite { - // Otherwise, add it back in, which should restore the old set. If it - // didn't, there was partial overlap, which shouldn't be possible. - g.Add(sp) - } - if then, now := lastWrites.Slice(), g.Slice(); !reflect.DeepEqual(then, now) { - v.failures = append(v.failures, - errors.AssertionFailedf("%s has write %q partially overlapping %+v; subtracting and re-adding gave %+v", atomicType, sp, then, now)) - return - } + // If subtracting did nothing and there are no active rollbacks, it's a + // most recent write. + lastWrite = !g.Sub(sp) && rollbackSp == nil } - if lastWrite { lastWritesByIdx[idx] = struct{}{} lastWrites.Add(sp) @@ -1066,9 +1101,33 @@ func (v *validator) checkAtomicCommitted( panic(err) } } + case *observedSavepoint: + + switch o.Type { + case create: + // Set rollbackSp to nil if this savepoint create matches the rolled + // back one recorded in rollbackSp. + if rollbackSp != nil && rollbackSp.ID == o.ID { + rollbackSp = nil + } + case release: + // Savepoint releases don't affect the last write. + case rollback: + // Update rollbackSp only if there is no active rollback (i.e. a + // rollback for which we haven't found a matching create). Otherwise, + // the active rollback recorded in rollbackSp subsumes the one we're + // seeing now. + if rollbackSp == nil { + rollbackSp = o + } + } } } + // A map from a savepoint id to a batch that represents the state of the + // txn at the time of the savepoint creation. + spIDToBatch := make(map[int]*pebble.Batch) + // Iterate through the observations, building up the snapshot visible at each // point in the atomic unit and filling in the valid read times (validating // them later, in a separate loop, for better errors). We also check that only @@ -1085,14 +1144,15 @@ func (v *validator) checkAtomicCommitted( // writeTS was populated above as the unique timestamp at which the writes became // visible. We know the operation had writes (we're looking at one now) and so // this operation has either materialized or is covered by a later one that did, - // and so we must have a timestamp here. We defer the failure to the next for - // loop, as we will have filled in the read timestamps at that time. + // and so we must have a timestamp here. + // The only exception is when all writes were rolled back by a savepoint + // rollback. In that case, we still need a writeTS to ensure reads within the txn + // can see those writes; the execTimestamp serves that purpose. if writeTS.IsEmpty() { - continue + writeTS = execTimestamp } _, isLastWrite := lastWritesByIdx[idx] - if !isLastWrite && o.Timestamp.IsSet() { failure = `committed txn overwritten key had write` break @@ -1136,11 +1196,36 @@ func (v *validator) checkAtomicCommitted( failure = `scan result not ordered correctly` } o.Valid = validScanTime(batch, o.Span, o.KVs, o.SkipLocked /* missingKeysValid */) + case *observedSavepoint: + switch o.Type { + case create: + // Clone the existing batch by creating a new batch and applying the + // existing batch state to it. + newBatch := v.kvs.kvs.NewIndexedBatch() + _ = newBatch.Apply(batch, nil) + if _, ok := spIDToBatch[o.ID]; ok { + panic(errors.AssertionFailedf("validating a savepoint create op: ID %d already exists", o.ID)) + } + spIDToBatch[o.ID] = newBatch + case release: + // Savepoint releases don't affect the validation. + case rollback: + if _, ok := spIDToBatch[o.ID]; !ok { + panic(errors.AssertionFailedf("validating a savepoint rollback op: ID %d does not exist", o.ID)) + } + // The new batch to use for validation. + batch = spIDToBatch[o.ID] + } default: panic(errors.AssertionFailedf(`unknown observedOp: %T %s`, observation, observation)) } } + // Close all batches in spIDToBatch. + for _, spBatch := range spIDToBatch { + _ = spBatch.Close() + } + validPossibilities := disjointTimeSpans{{Start: hlc.MinTimestamp, End: hlc.MaxTimestamp}} for idx, observation := range txnObservations { if failure != `` { @@ -1162,6 +1247,9 @@ func (v *validator) checkAtomicCommitted( opValid = o.ValidTimes case *observedScan: opValid = o.Valid.Combined() + case *observedSavepoint: + // A savepoint is always valid. + opValid = disjointTimeSpans{{Start: hlc.MinTimestamp, End: hlc.MaxTimestamp}} default: panic(errors.AssertionFailedf(`unknown observedOp: %T %s`, observation, observation)) } @@ -1241,6 +1329,7 @@ func (v *validator) checkAtomicUncommitted(atomicType string, txnObservations [] case *observedScan: // TODO(dan): Figure out what we can assert about reads in an uncommitted // transaction. + case *observedSavepoint: default: panic(errors.AssertionFailedf(`unknown observedOp: %T %s`, observed, observed)) } @@ -1633,6 +1722,16 @@ func printObserved(observedOps ...observedOp) string { } fmt.Fprintf(&buf, "[%s]%s:%s->[%s]", opCode, o.Span, o.Valid, kvs.String()) + case *observedSavepoint: + opCode := "sp" + if o.Type == create { + opCode += "(create)" + } else if o.Type == release { + opCode += "(release)" + } else if o.Type == rollback { + opCode += "(rollback)" + } + fmt.Fprintf(&buf, "[%s]id:%d", opCode, o.ID) default: panic(errors.AssertionFailedf(`unknown observedOp: %T %s`, observed, observed)) } diff --git a/pkg/kv/kvnemesis/validator_test.go b/pkg/kv/kvnemesis/validator_test.go index 9e22178afaf8..f0100c6c7ede 100644 --- a/pkg/kv/kvnemesis/validator_test.go +++ b/pkg/kv/kvnemesis/validator_test.go @@ -2198,6 +2198,155 @@ func TestValidate(t *testing.T) { }, kvs: kvs(kv(k1, t1, s1), kv(k1, t3, s4), kv(k2, t2, s2), kv(k3, t1, s3), kv(k3, t3, s4)), }, + { + name: "one savepoint and one put", + steps: []Step{ + step(withResultTS(closureTxnSSI(ClosureTxnType_Commit, + withResult(put(k1, s1)), + withResult(createSavepoint(2)), + ), t1)), + }, + kvs: kvs(kv(k1, t1, s1)), + }, + { + name: "one savepoint and release", + steps: []Step{ + step(withResultTS(closureTxnSSI(ClosureTxnType_Commit, + withResult(put(k1, s1)), + withResult(createSavepoint(2)), + withResult(put(k1, s2)), + withResult(releaseSavepoint(2)), + withReadResult(get(k1), v2), + ), t1)), + }, + kvs: kvs(kv(k1, t1, s2)), + }, + { + name: "nested savepoint release", + steps: []Step{ + step(withResultTS(closureTxnSSI(ClosureTxnType_Commit, + withResult(put(k1, s1)), + withResult(createSavepoint(2)), + withResult(put(k1, s2)), + withResult(createSavepoint(4)), + withResult(put(k1, s3)), + withResult(releaseSavepoint(2)), + withReadResult(get(k1), v3), + ), t1)), + }, + kvs: kvs(kv(k1, t1, s3)), + }, + { + name: "one savepoint and rollback", + steps: []Step{ + step(withResultTS(closureTxnSSI(ClosureTxnType_Commit, + withResult(put(k1, s1)), + withResult(createSavepoint(2)), + withResult(put(k1, s2)), + withReadResult(get(k1), v2), + withResult(rollbackSavepoint(2)), + withReadResult(get(k1), v1), + ), t1)), + }, + kvs: kvs(kv(k1, t1, s1)), + }, + { + name: "two nested savepoint rollbacks", + steps: []Step{ + step(withResultTS(closureTxnSSI(ClosureTxnType_Commit, + withResult(put(k1, s1)), + withResult(createSavepoint(2)), + withResult(put(k1, s2)), + withReadResult(get(k1), v2), + withResult(createSavepoint(5)), + withResult(put(k1, s3)), + withReadResult(get(k1), v3), + withResult(rollbackSavepoint(2)), + withReadResult(get(k1), v1), + ), t1)), + }, + kvs: kvs(kv(k1, t1, s1)), + }, + { + name: "two closed nested savepoint rollbacks", + steps: []Step{ + step(withResultTS(closureTxnSSI(ClosureTxnType_Commit, + withResult(put(k1, s1)), + withResult(createSavepoint(2)), + withResult(put(k1, s2)), + withReadResult(get(k1), v2), + withResult(createSavepoint(5)), + withResult(put(k1, s3)), + withReadResult(get(k1), v3), + withResult(rollbackSavepoint(5)), + withResult(rollbackSavepoint(2)), + withReadResult(get(k1), v1), + ), t1)), + }, + kvs: kvs(kv(k1, t1, s1)), + }, + { + name: "two non-nested savepoints rollbacks", + steps: []Step{ + step(withResultTS(closureTxnSSI(ClosureTxnType_Commit, + withResult(createSavepoint(0)), + withResult(put(k1, s1)), + withReadResult(get(k1), v1), + withResult(rollbackSavepoint(0)), + withResult(put(k1, s2)), + withResult(createSavepoint(5)), + withResult(put(k1, s3)), + withReadResult(get(k1), v3), + withResult(rollbackSavepoint(5)), + withReadResult(get(k1), v2), + ), t1)), + }, + kvs: kvs(kv(k1, t1, s2)), + }, + { + name: "savepoint release and rollback", + steps: []Step{ + step(withResultTS(closureTxnSSI(ClosureTxnType_Commit, + withResult(put(k1, s1)), + withResult(createSavepoint(2)), + withResult(put(k1, s2)), + withReadResult(get(k1), v2), + withResult(createSavepoint(5)), + withResult(put(k1, s3)), + withReadResult(get(k1), v3), + withResult(releaseSavepoint(5)), + withResult(rollbackSavepoint(2)), + withReadResult(get(k1), v1), + ), t1)), + }, + kvs: kvs(kv(k1, t1, s1)), + }, + { + name: "savepoint with no last write", + steps: []Step{ + step(withResultTS(closureTxnSSI(ClosureTxnType_Commit, + withResult(createSavepoint(0)), + withResult(put(k1, s1)), + withReadResult(get(k1), v1), + withResult(rollbackSavepoint(0)), + ), t1)), + }, + kvs: nil, + }, + { + name: "savepoint with no last write and existing write", + steps: []Step{ + step(withResultTS(put(k1, s1), t1)), + step(withResultTS(closureTxnSSI(ClosureTxnType_Commit, + withResult(createSavepoint(1)), + withResult(put(k1, s2)), + withReadResult(get(k1), v2), + withResult(rollbackSavepoint(1)), + withReadResult(get(k1), v1), + ), t2)), + }, + kvs: kvs(kv(k1, t1, s1)), + }, } w := echotest.NewWalker(t, datapathutils.TestDataPath(t, t.Name()))