diff --git a/pkg/kv/kvnemesis/BUILD.bazel b/pkg/kv/kvnemesis/BUILD.bazel index 2a315e896139..62b240b19401 100644 --- a/pkg/kv/kvnemesis/BUILD.bazel +++ b/pkg/kv/kvnemesis/BUILD.bazel @@ -52,6 +52,7 @@ go_library( "@com_github_cockroachdb_pebble//:pebble", "@com_github_cockroachdb_pebble//vfs", "@org_golang_google_protobuf//proto", + "@org_golang_x_exp//slices", ], ) diff --git a/pkg/kv/kvnemesis/applier.go b/pkg/kv/kvnemesis/applier.go index 0b85f3dd78b6..b140b885fe02 100644 --- a/pkg/kv/kvnemesis/applier.go +++ b/pkg/kv/kvnemesis/applier.go @@ -120,7 +120,7 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) { *DeleteRangeOperation, *DeleteRangeUsingTombstoneOperation, *AddSSTableOperation: - applyClientOp(ctx, db, op, false) + applyClientOp(ctx, db, op, false, nil) case *SplitOperation: err := db.AdminSplit(ctx, o.Key, hlc.MaxTimestamp) o.Result = resultInit(ctx, err) @@ -155,6 +155,7 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) { retryOnAbort.Next() } savedTxn = txn + savepoints := make(map[int]kv.SavepointToken) // First error. Because we need to mark everything that // we didn't "reach" due to a prior error with errOmitted, // we *don't* return eagerly on this but save it to the end. @@ -177,7 +178,7 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) { continue } - applyClientOp(ctx, txn, op, true) + applyClientOp(ctx, txn, op, true, &savepoints) // The KV api disallows use of a txn after an operation on it errors. if r := op.Result(); r.Type == ResultType_Error { err = errors.DecodeError(ctx, *r.Err) @@ -269,7 +270,9 @@ func batchRun( return ts, nil } -func applyClientOp(ctx context.Context, db clientI, op *Operation, inTxn bool) { +func applyClientOp( + ctx context.Context, db clientI, op *Operation, inTxn bool, savepoints *map[int]kv.SavepointToken, +) { switch o := op.GetValue().(type) { case *GetOperation: fn := (*kv.Batch).Get @@ -403,6 +406,48 @@ func applyClientOp(ctx context.Context, db clientI, op *Operation, inTxn bool) { case *BatchOperation: b := &kv.Batch{} applyBatchOp(ctx, b, db.Run, o) + case *SavepointOperation: + // db is a Txn because savepoints are allowed only within transactions. + txn, ok := db.(*kv.Txn) + if !ok { + panic(errors.AssertionFailedf(`non-txn interface attempted to create a savepoint %v`, o)) + } + spt, err := txn.CreateSavepoint(ctx) + o.Result = resultInit(ctx, err) + if err != nil { + return + } + (*savepoints)[int(o.SeqNum)] = spt + case *SavepointReleaseOperation: + // db is a Txn because savepoints are allowed only within transactions. + txn, ok := db.(*kv.Txn) + if !ok { + panic(errors.AssertionFailedf(`non-txn interface attempted to release a savepoint %v`, o)) + } + spt, ok := (*savepoints)[int(o.SeqNum)] + if !ok { + panic(errors.AssertionFailedf("savepoint sequence number %d does not exist", o.SeqNum)) + } + err := txn.ReleaseSavepoint(ctx, spt) + o.Result = resultInit(ctx, err) + if err != nil { + return + } + case *SavepointRollbackOperation: + // db is a Txn because savepoints are allowed only within transactions. + txn, ok := db.(*kv.Txn) + if !ok { + panic(errors.AssertionFailedf(`non-txn interface attempted to rollback a savepoint %v`, o)) + } + spt, ok := (*savepoints)[int(o.SeqNum)] + if !ok { + panic(errors.AssertionFailedf("savepoint sequence number %d does not exist", o.SeqNum)) + } + err := txn.RollbackToSavepoint(ctx, spt) + o.Result = resultInit(ctx, err) + if err != nil { + return + } default: panic(errors.AssertionFailedf(`unknown batch operation type: %T %v`, o, o)) } diff --git a/pkg/kv/kvnemesis/applier_test.go b/pkg/kv/kvnemesis/applier_test.go index 05331c3bc334..3d7efb6f85d0 100644 --- a/pkg/kv/kvnemesis/applier_test.go +++ b/pkg/kv/kvnemesis/applier_test.go @@ -217,6 +217,24 @@ func TestApplier(t *testing.T) { { "change-replicas", step(changeReplicas(k1, kvpb.ReplicationChange{ChangeType: roachpb.ADD_VOTER, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}})), }, + { + "txn-ssi-savepoint", step(closureTxn(ClosureTxnType_Commit, isolation.Serializable, put(k5, 0), createSavepoint(1), put(k5, 2), createSavepoint(3), get(k5))), + }, + { + "txn-si-savepoint", step(closureTxn(ClosureTxnType_Commit, isolation.Snapshot, put(k5, 0), createSavepoint(1), put(k5, 2), createSavepoint(3), get(k5))), + }, + { + "txn-ssi-release-savepoint", step(closureTxn(ClosureTxnType_Commit, isolation.Serializable, put(k5, 0), createSavepoint(1), put(k5, 2), createSavepoint(3), get(k5), releaseSavepoint(1), get(k5))), + }, + { + "txn-si-release-savepoint", step(closureTxn(ClosureTxnType_Commit, isolation.Snapshot, put(k5, 0), createSavepoint(1), put(k5, 2), createSavepoint(3), get(k5), releaseSavepoint(1), get(k5))), + }, + { + "txn-ssi-rollback-savepoint", step(closureTxn(ClosureTxnType_Commit, isolation.Serializable, put(k5, 0), createSavepoint(1), put(k5, 2), createSavepoint(3), get(k5), rollbackSavepoint(1), get(k5))), + }, + { + "txn-si-rollback-savepoint", step(closureTxn(ClosureTxnType_Commit, isolation.Snapshot, put(k5, 0), createSavepoint(1), put(k5, 2), createSavepoint(3), get(k5), rollbackSavepoint(1), get(k5))), + }, } w := echotest.NewWalker(t, datapathutils.TestDataPath(t, t.Name())) diff --git a/pkg/kv/kvnemesis/generator.go b/pkg/kv/kvnemesis/generator.go index dc6f4d97610e..14ef9017b595 100644 --- a/pkg/kv/kvnemesis/generator.go +++ b/pkg/kv/kvnemesis/generator.go @@ -32,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" + "golang.org/x/exp/slices" ) // GeneratorConfig contains all the tunable knobs necessary to run a Generator. @@ -95,6 +96,7 @@ type ClosureTxnConfig struct { // When CommitInBatch is selected, CommitBatchOps controls the composition of // the kv.Batch used. CommitBatchOps ClientOperationConfig + SavepointOps SavepointConfig } // ClientOperationConfig configures the relative probabilities of the @@ -221,6 +223,15 @@ type ChangeZoneConfig struct { ToggleGlobalReads int } +type SavepointConfig struct { + // Savepoint is an operation that creates a new savepoint with a given name. + Savepoint int + // SavepointRelease is an operation that releases a savepoint with a given name. + SavepointRelease int + // SavepointRollback is an operation that rolls back a savepoint with a given name. + SavepointRollback int +} + // newAllOperationsConfig returns a GeneratorConfig that exercises *all* // options. You probably want NewDefaultConfig. Most of the time, these will be // the same, but having both allows us to merge code for operations that do not @@ -253,6 +264,11 @@ func newAllOperationsConfig() GeneratorConfig { Batch: 4, Ops: clientOpConfig, } + savepointConfig := SavepointConfig{ + Savepoint: 1, + SavepointRelease: 1, + SavepointRollback: 1, + } return GeneratorConfig{Ops: OperationConfig{ DB: clientOpConfig, Batch: batchOpConfig, @@ -269,6 +285,7 @@ func newAllOperationsConfig() GeneratorConfig { TxnClientOps: clientOpConfig, TxnBatchOps: batchOpConfig, CommitBatchOps: clientOpConfig, + SavepointOps: savepointConfig, }, Split: SplitConfig{ SplitNew: 1, @@ -974,23 +991,23 @@ func (g *generator) registerClosureTxnOps(allowed *[]opGen, c *ClosureTxnConfig) const Commit, Rollback = ClosureTxnType_Commit, ClosureTxnType_Rollback const SSI, SI, RC = isolation.Serializable, isolation.Snapshot, isolation.ReadCommitted addOpGen(allowed, - makeClosureTxn(Commit, SSI, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/), c.CommitSerializable) + makeClosureTxn(Commit, SSI, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.CommitSerializable) addOpGen(allowed, - makeClosureTxn(Commit, SI, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/), c.CommitSnapshot) + makeClosureTxn(Commit, SI, &c.TxnClientOps, &c.TxnBatchOps, nil, &c.SavepointOps /* commitInBatch*/), c.CommitSnapshot) addOpGen(allowed, - makeClosureTxn(Commit, RC, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/), c.CommitReadCommitted) + makeClosureTxn(Commit, RC, &c.TxnClientOps, &c.TxnBatchOps, nil, &c.SavepointOps /* commitInBatch*/), c.CommitReadCommitted) addOpGen(allowed, - makeClosureTxn(Rollback, SSI, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/), c.RollbackSerializable) + makeClosureTxn(Rollback, SSI, &c.TxnClientOps, &c.TxnBatchOps, nil, &c.SavepointOps /* commitInBatch*/), c.RollbackSerializable) addOpGen(allowed, - makeClosureTxn(Rollback, SI, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/), c.RollbackSnapshot) + makeClosureTxn(Rollback, SI, &c.TxnClientOps, &c.TxnBatchOps, nil, &c.SavepointOps /* commitInBatch*/), c.RollbackSnapshot) addOpGen(allowed, - makeClosureTxn(Rollback, RC, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/), c.RollbackReadCommitted) + makeClosureTxn(Rollback, RC, &c.TxnClientOps, &c.TxnBatchOps, nil, &c.SavepointOps /* commitInBatch*/), c.RollbackReadCommitted) addOpGen(allowed, - makeClosureTxn(Commit, SSI, &c.TxnClientOps, &c.TxnBatchOps, &c.CommitBatchOps), c.CommitSerializableInBatch) + makeClosureTxn(Commit, SSI, &c.TxnClientOps, &c.TxnBatchOps, &c.CommitBatchOps, &c.SavepointOps), c.CommitSerializableInBatch) addOpGen(allowed, - makeClosureTxn(Commit, SI, &c.TxnClientOps, &c.TxnBatchOps, &c.CommitBatchOps), c.CommitSnapshotInBatch) + makeClosureTxn(Commit, SI, &c.TxnClientOps, &c.TxnBatchOps, &c.CommitBatchOps, &c.SavepointOps), c.CommitSnapshotInBatch) addOpGen(allowed, - makeClosureTxn(Commit, RC, &c.TxnClientOps, &c.TxnBatchOps, &c.CommitBatchOps), c.CommitReadCommittedInBatch) + makeClosureTxn(Commit, RC, &c.TxnClientOps, &c.TxnBatchOps, &c.CommitBatchOps, &c.SavepointOps), c.CommitReadCommittedInBatch) } func makeClosureTxn( @@ -999,15 +1016,20 @@ func makeClosureTxn( txnClientOps *ClientOperationConfig, txnBatchOps *BatchOperationConfig, commitInBatch *ClientOperationConfig, + savepointOps *SavepointConfig, ) opGenFunc { return func(g *generator, rng *rand.Rand) Operation { var allowed []opGen g.registerClientOps(&allowed, txnClientOps) g.registerBatchOps(&allowed, txnBatchOps) - numOps := rng.Intn(4) + numOps := rng.Intn(10) ops := make([]Operation, numOps) + var savepoints []int for i := range ops { - ops[i] = g.selectOp(rng, allowed) + allowedSp := allowed + g.registerSavepointOps(&allowedSp, savepointOps, savepoints, i) + ops[i] = g.selectOp(rng, allowedSp) + updateSavepoints(&savepoints, ops[i]) } op := closureTxn(txnType, iso, ops...) if commitInBatch != nil { @@ -1020,6 +1042,56 @@ func makeClosureTxn( } } +func (g *generator) registerSavepointOps( + allowed *[]opGen, s *SavepointConfig, existingSp []int, seqNum int, +) { + addOpGen(allowed, makeSavepoint(seqNum), s.Savepoint) + for _, sn := range existingSp { + addOpGen(allowed, makeRollbackSavepoint(sn), s.SavepointRollback) + addOpGen(allowed, makeReleaseSavepoint(sn), s.SavepointRelease) + } +} + +func makeSavepoint(seqNum int) opGenFunc { + return func(_ *generator, _ *rand.Rand) Operation { + return createSavepoint(seqNum) + } +} + +func makeReleaseSavepoint(seqNum int) opGenFunc { + return func(_ *generator, _ *rand.Rand) Operation { + return releaseSavepoint(seqNum) + } +} + +func makeRollbackSavepoint(seqNum int) opGenFunc { + return func(_ *generator, _ *rand.Rand) Operation { + return rollbackSavepoint(seqNum) + } +} + +// Based on the previously selected op, updateSavepoints modifies the slice of +// existing savepoints to either: (1) add a new savepoint if the previous op was +// SavepointOperation, or (2) go back and remove a suffix of savepoints if the +// previous operation was SavepointReleaseOperation or SavepointRollbackOperation. +// The savepoints slice can be thought of as a stack where the end of the slice +// is the top of the stack. +func updateSavepoints(savepoints *[]int, prevOp Operation) { + switch op := prevOp.GetValue().(type) { + case *SavepointOperation: + *savepoints = append(*savepoints, int(op.SeqNum)) + case *SavepointReleaseOperation: + index := slices.Index(*savepoints, int(op.SeqNum)) + *savepoints = (*savepoints)[:index] + case *SavepointRollbackOperation: + index := slices.Index(*savepoints, int(op.SeqNum)) + *savepoints = (*savepoints)[:index] + default: + // prevOp is not a savepoint operation. + return + } +} + // fk stands for "from key", i.e. decode the uint64 the key represents. // Panics on error. func fk(k string) uint64 { @@ -1269,3 +1341,15 @@ func addSSTable( AsWrites: asWrites, }} } + +func createSavepoint(seqNum int) Operation { + return Operation{Savepoint: &SavepointOperation{SeqNum: int32(seqNum)}} +} + +func releaseSavepoint(seqNum int) Operation { + return Operation{SavepointRelease: &SavepointReleaseOperation{SeqNum: int32(seqNum)}} +} + +func rollbackSavepoint(seqNum int) Operation { + return Operation{SavepointRollback: &SavepointRollbackOperation{SeqNum: int32(seqNum)}} +} diff --git a/pkg/kv/kvnemesis/generator_test.go b/pkg/kv/kvnemesis/generator_test.go index f23dde12715e..92ba06c61c63 100644 --- a/pkg/kv/kvnemesis/generator_test.go +++ b/pkg/kv/kvnemesis/generator_test.go @@ -171,12 +171,27 @@ func TestRandStep(t *testing.T) { case *BatchOperation: batch.Batch++ countClientOps(&batch.Ops, nil, o.Ops...) + case *SavepointOperation, *SavepointRollbackOperation, *SavepointReleaseOperation: + // We'll count these separately. default: t.Fatalf("%T", o) } } } + countSavepointOps := func(savepoint *SavepointConfig, ops ...Operation) { + for _, op := range ops { + switch op.GetValue().(type) { + case *SavepointOperation: + savepoint.Savepoint++ + case *SavepointReleaseOperation: + savepoint.SavepointRelease++ + case *SavepointRollbackOperation: + savepoint.SavepointRollback++ + } + } + } + counts := OperationConfig{} for { step := g.RandStep(rng) @@ -192,6 +207,7 @@ func TestRandStep(t *testing.T) { countClientOps(&counts.DB, &counts.Batch, step.Op) case *ClosureTxnOperation: countClientOps(&counts.ClosureTxn.TxnClientOps, &counts.ClosureTxn.TxnBatchOps, o.Ops...) + countSavepointOps(&counts.ClosureTxn.SavepointOps, o.Ops...) if o.CommitInBatch != nil { switch o.IsoLevel { case isolation.Serializable: @@ -400,3 +416,67 @@ func TestRandDelRangeUsingTombstone(t *testing.T) { echotest.Require(t, buf.String(), datapathutils.TestDataPath(t, t.Name()+".txt")) } + +func TestUpdateSavepoints(t *testing.T) { + defer leaktest.AfterTest(t)() + + tests := []struct { + name string + savepoints []int + prevOp Operation + expectedSp []int + }{ + { + name: "no savepoints (nil)", + savepoints: nil, + prevOp: get(k1), + expectedSp: nil, + }, + { + name: "no savepoints (empty)", + savepoints: []int{}, + prevOp: get(k1), + expectedSp: []int{}, + }, + { + name: "prevOp is not a savepoint", + savepoints: []int{0}, + prevOp: get(k1), + expectedSp: []int{0}, + }, + { + name: "prevOp is a savepoint create", + savepoints: nil, + prevOp: createSavepoint(2), + expectedSp: []int{2}, + }, + { + name: "prevOp is a savepoint release", + savepoints: []int{1}, + prevOp: releaseSavepoint(1), + expectedSp: []int{}, + }, + { + name: "prevOp is a savepoint rollback", + savepoints: []int{1}, + prevOp: rollbackSavepoint(1), + expectedSp: []int{}, + }, + { + name: "nested rollbacks", + savepoints: []int{1, 2, 3, 4}, + prevOp: rollbackSavepoint(2), + expectedSp: []int{1}, + }, + { + name: "nested releases", + savepoints: []int{1, 2, 3, 4}, + prevOp: releaseSavepoint(2), + expectedSp: []int{1}, + }, + } + for _, test := range tests { + updateSavepoints(&test.savepoints, test.prevOp) + require.Equal(t, test.expectedSp, test.savepoints) + } +} diff --git a/pkg/kv/kvnemesis/operations.go b/pkg/kv/kvnemesis/operations.go index a44a908c1bd6..27a66510b7a6 100644 --- a/pkg/kv/kvnemesis/operations.go +++ b/pkg/kv/kvnemesis/operations.go @@ -55,6 +55,12 @@ func (op Operation) Result() *Result { return &o.Result case *ClosureTxnOperation: return &o.Result + case *SavepointOperation: + return &o.Result + case *SavepointReleaseOperation: + return &o.Result + case *SavepointRollbackOperation: + return &o.Result default: panic(errors.AssertionFailedf(`unknown operation: %T %v`, o, o)) } @@ -194,6 +200,12 @@ func (op Operation) format(w *strings.Builder, fctx formatCtx) { if o.Txn != nil { fmt.Fprintf(w, "\n%s// ^-- txnpb:(%s)", fctx.indent, o.Txn) } + case *SavepointOperation: + o.format(w, fctx) + case *SavepointReleaseOperation: + o.format(w, fctx) + case *SavepointRollbackOperation: + o.format(w, fctx) default: fmt.Fprintf(w, "%v", op.GetValue()) } @@ -365,6 +377,21 @@ func (op ChangeZoneOperation) format(w *strings.Builder, fctx formatCtx) { op.Result.format(w) } +func (op SavepointOperation) format(w *strings.Builder, fctx formatCtx) { + fmt.Fprintf(w, `%s.CreateSavepoint(ctx, %d)`, fctx.receiver, int(op.SeqNum)) + op.Result.format(w) +} + +func (op SavepointReleaseOperation) format(w *strings.Builder, fctx formatCtx) { + fmt.Fprintf(w, `%s.ReleaseSavepoint(ctx, %d)`, fctx.receiver, int(op.SeqNum)) + op.Result.format(w) +} + +func (op SavepointRollbackOperation) format(w *strings.Builder, fctx formatCtx) { + fmt.Fprintf(w, `%s.RollbackSavepoint(ctx, %d)`, fctx.receiver, int(op.SeqNum)) + op.Result.format(w) +} + func (r Result) format(w *strings.Builder) { if r.Type == ResultType_Unknown { return diff --git a/pkg/kv/kvnemesis/operations.proto b/pkg/kv/kvnemesis/operations.proto index aab6a736b24a..ff966d301b32 100644 --- a/pkg/kv/kvnemesis/operations.proto +++ b/pkg/kv/kvnemesis/operations.proto @@ -122,6 +122,21 @@ message ChangeZoneOperation { Result result = 2 [(gogoproto.nullable) = false]; } +message SavepointOperation { + int32 seq_num = 1; + Result result = 2 [(gogoproto.nullable) = false]; +} + +message SavepointReleaseOperation { + int32 seq_num = 1; + Result result = 2 [(gogoproto.nullable) = false]; +} + +message SavepointRollbackOperation { + int32 seq_num = 1; + Result result = 2 [(gogoproto.nullable) = false]; +} + message Operation { option (gogoproto.goproto_stringer) = false; option (gogoproto.onlyone) = true; @@ -146,6 +161,9 @@ message Operation { TransferLeaseOperation transfer_lease = 16; ChangeZoneOperation change_zone = 17; AddSSTableOperation add_sstable = 18 [(gogoproto.customname) = "AddSSTable"]; + SavepointOperation savepoint = 19; + SavepointReleaseOperation savepoint_release = 20; + SavepointRollbackOperation savepoint_rollback = 21; } enum ResultType { diff --git a/pkg/kv/kvnemesis/operations_test.go b/pkg/kv/kvnemesis/operations_test.go index 5c24e2034477..1968dcbc94b3 100644 --- a/pkg/kv/kvnemesis/operations_test.go +++ b/pkg/kv/kvnemesis/operations_test.go @@ -87,6 +87,15 @@ func TestOperationsFormat(t *testing.T) { { step: step(addSSTable(sstFile.Data(), sstSpan, sstTS, sstValueHeader.KVNemesisSeq.Get(), true)), }, + { + step: step( + closureTxn(ClosureTxnType_Commit, + isolation.Serializable, + createSavepoint(0), put(k9, 3), releaseSavepoint(0), + get(k8), + createSavepoint(4), del(k9, 1), rollbackSavepoint(4), + )), + }, } w := echotest.NewWalker(t, datapathutils.TestDataPath(t, t.Name())) diff --git a/pkg/kv/kvnemesis/testdata/TestApplier/txn-si-release-savepoint b/pkg/kv/kvnemesis/testdata/TestApplier/txn-si-release-savepoint new file mode 100644 index 000000000000..98ba0a0de7a7 --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestApplier/txn-si-release-savepoint @@ -0,0 +1,14 @@ +echo +---- +db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetIsoLevel(isolation.Snapshot) + txn.Put(ctx, tk(5), sv(0)) // @ + txn.CreateSavepoint(ctx, 1) // + txn.Put(ctx, tk(5), sv(2)) // @ + txn.CreateSavepoint(ctx, 3) // + txn.Get(ctx, tk(5)) // @ (v2, ) + txn.ReleaseSavepoint(ctx, 1) // + txn.Get(ctx, tk(5)) // @ (v2, ) + return nil +}) // @ +// ^-- txnpb: diff --git a/pkg/kv/kvnemesis/testdata/TestApplier/txn-si-rollback-savepoint b/pkg/kv/kvnemesis/testdata/TestApplier/txn-si-rollback-savepoint new file mode 100644 index 000000000000..135d3c9f22df --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestApplier/txn-si-rollback-savepoint @@ -0,0 +1,14 @@ +echo +---- +db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetIsoLevel(isolation.Snapshot) + txn.Put(ctx, tk(5), sv(0)) // @ + txn.CreateSavepoint(ctx, 1) // + txn.Put(ctx, tk(5), sv(2)) // @ + txn.CreateSavepoint(ctx, 3) // + txn.Get(ctx, tk(5)) // @ (v2, ) + txn.RollbackSavepoint(ctx, 1) // + txn.Get(ctx, tk(5)) // @ (v0, ) + return nil +}) // @ +// ^-- txnpb: diff --git a/pkg/kv/kvnemesis/testdata/TestApplier/txn-si-savepoint b/pkg/kv/kvnemesis/testdata/TestApplier/txn-si-savepoint new file mode 100644 index 000000000000..631107464d00 --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestApplier/txn-si-savepoint @@ -0,0 +1,12 @@ +echo +---- +db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetIsoLevel(isolation.Snapshot) + txn.Put(ctx, tk(5), sv(0)) // @ + txn.CreateSavepoint(ctx, 1) // + txn.Put(ctx, tk(5), sv(2)) // @ + txn.CreateSavepoint(ctx, 3) // + txn.Get(ctx, tk(5)) // @ (v2, ) + return nil +}) // @ +// ^-- txnpb: diff --git a/pkg/kv/kvnemesis/testdata/TestApplier/txn-ssi-release-savepoint b/pkg/kv/kvnemesis/testdata/TestApplier/txn-ssi-release-savepoint new file mode 100644 index 000000000000..6f6bd2f6b195 --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestApplier/txn-ssi-release-savepoint @@ -0,0 +1,14 @@ +echo +---- +db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetIsoLevel(isolation.Serializable) + txn.Put(ctx, tk(5), sv(0)) // @ + txn.CreateSavepoint(ctx, 1) // + txn.Put(ctx, tk(5), sv(2)) // @ + txn.CreateSavepoint(ctx, 3) // + txn.Get(ctx, tk(5)) // @ (v2, ) + txn.ReleaseSavepoint(ctx, 1) // + txn.Get(ctx, tk(5)) // @ (v2, ) + return nil +}) // @ +// ^-- txnpb: diff --git a/pkg/kv/kvnemesis/testdata/TestApplier/txn-ssi-rollback-savepoint b/pkg/kv/kvnemesis/testdata/TestApplier/txn-ssi-rollback-savepoint new file mode 100644 index 000000000000..23e06d365c7f --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestApplier/txn-ssi-rollback-savepoint @@ -0,0 +1,14 @@ +echo +---- +db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetIsoLevel(isolation.Serializable) + txn.Put(ctx, tk(5), sv(0)) // @ + txn.CreateSavepoint(ctx, 1) // + txn.Put(ctx, tk(5), sv(2)) // @ + txn.CreateSavepoint(ctx, 3) // + txn.Get(ctx, tk(5)) // @ (v2, ) + txn.RollbackSavepoint(ctx, 1) // + txn.Get(ctx, tk(5)) // @ (v0, ) + return nil +}) // @ +// ^-- txnpb: diff --git a/pkg/kv/kvnemesis/testdata/TestApplier/txn-ssi-savepoint b/pkg/kv/kvnemesis/testdata/TestApplier/txn-ssi-savepoint new file mode 100644 index 000000000000..f5d56c4da873 --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestApplier/txn-ssi-savepoint @@ -0,0 +1,12 @@ +echo +---- +db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetIsoLevel(isolation.Serializable) + txn.Put(ctx, tk(5), sv(0)) // @ + txn.CreateSavepoint(ctx, 1) // + txn.Put(ctx, tk(5), sv(2)) // @ + txn.CreateSavepoint(ctx, 3) // + txn.Get(ctx, tk(5)) // @ (v2, ) + return nil +}) // @ +// ^-- txnpb: diff --git a/pkg/kv/kvnemesis/testdata/TestOperationsFormat/5 b/pkg/kv/kvnemesis/testdata/TestOperationsFormat/5 new file mode 100644 index 000000000000..712a19c13f35 --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestOperationsFormat/5 @@ -0,0 +1,13 @@ +echo +---- +···db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { +··· txn.SetIsoLevel(isolation.Serializable) +··· txn.CreateSavepoint(ctx, 0) +··· txn.Put(ctx, tk(9), sv(3)) +··· txn.ReleaseSavepoint(ctx, 0) +··· txn.Get(ctx, tk(8)) +··· txn.CreateSavepoint(ctx, 4) +··· txn.Del(ctx, tk(9) /* @s1 */) +··· txn.RollbackSavepoint(ctx, 4) +··· return nil +···}) diff --git a/pkg/kv/kvnemesis/testdata/TestValidate/nested_savepoint_release b/pkg/kv/kvnemesis/testdata/TestValidate/nested_savepoint_release new file mode 100644 index 000000000000..36f49513668c --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestValidate/nested_savepoint_release @@ -0,0 +1,14 @@ +echo +---- +db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetIsoLevel(isolation.Serializable) + txn.Put(ctx, tk(1), sv(1)) // + txn.CreateSavepoint(ctx, 2) // + txn.Put(ctx, tk(1), sv(2)) // + txn.CreateSavepoint(ctx, 4) // + txn.Put(ctx, tk(1), sv(3)) // + txn.ReleaseSavepoint(ctx, 2) // + txn.Get(ctx, tk(1)) // (v3, ) + return nil +}) // @0.000000001,0 +/Table/100/"0000000000000001"/0.000000001,0 @ s3 v3 diff --git a/pkg/kv/kvnemesis/testdata/TestValidate/one_deleterange_after_write_missing_write b/pkg/kv/kvnemesis/testdata/TestValidate/one_deleterange_after_write_missing_write index d33dc3f2e3f1..429162ec2cf2 100644 --- a/pkg/kv/kvnemesis/testdata/TestValidate/one_deleterange_after_write_missing_write +++ b/pkg/kv/kvnemesis/testdata/TestValidate/one_deleterange_after_write_missing_write @@ -7,4 +7,4 @@ db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { return nil }) // @0.000000001,0 /Table/100/"0000000000000001"/0.000000001,0 @ s1 v1 -committed serializable txn missing write at seq s2: [dr.d]/Table/100/"0000000000000001":missing->@s2 [dr.s]/Table/100/"000000000000000{1"-3"}:{gap:[, 0.000000001,0)}->[] +committed serializable txn missing write at seq s2: [dr.d]/Table/100/"0000000000000001":missing->@s2 [dr.s]/Table/100/"000000000000000{1"-3"}:{gap:[, 0.000000001,0),[0.000000001,0, )}->[] diff --git a/pkg/kv/kvnemesis/testdata/TestValidate/one_deleterange_before_write_returning_wrong_value b/pkg/kv/kvnemesis/testdata/TestValidate/one_deleterange_before_write_returning_wrong_value index b2bfe819e6dc..7d3fe5c76362 100644 --- a/pkg/kv/kvnemesis/testdata/TestValidate/one_deleterange_before_write_returning_wrong_value +++ b/pkg/kv/kvnemesis/testdata/TestValidate/one_deleterange_before_write_returning_wrong_value @@ -3,4 +3,4 @@ echo db0.DelRange(ctx, tk(1), tk(3), true /* @s1 */) // @0.000000001,0 (/Table/100/"0000000000000001", ) db0.Put(ctx, tk(1), sv(2)) // @0.000000002,0 /Table/100/"0000000000000001"/0.000000002,0 @ s2 v2 -committed deleteRange missing write at seq s1: [dr.d]/Table/100/"0000000000000001":missing->@s1 [dr.s]/Table/100/"000000000000000{1"-3"}:{gap:[, 0.000000002,0)}->[] +committed deleteRange missing write at seq s1: [dr.d]/Table/100/"0000000000000001":missing->@s1 [dr.s]/Table/100/"000000000000000{1"-3"}:{gap:[, 0.000000001,0),[0.000000001,0, 0.000000002,0)}->[] diff --git a/pkg/kv/kvnemesis/testdata/TestValidate/one_savepoint_and_one_put b/pkg/kv/kvnemesis/testdata/TestValidate/one_savepoint_and_one_put new file mode 100644 index 000000000000..cc0cb21fc03d --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestValidate/one_savepoint_and_one_put @@ -0,0 +1,9 @@ +echo +---- +db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetIsoLevel(isolation.Serializable) + txn.Put(ctx, tk(1), sv(1)) // + txn.CreateSavepoint(ctx, 2) // + return nil +}) // @0.000000001,0 +/Table/100/"0000000000000001"/0.000000001,0 @ s1 v1 diff --git a/pkg/kv/kvnemesis/testdata/TestValidate/one_savepoint_and_release b/pkg/kv/kvnemesis/testdata/TestValidate/one_savepoint_and_release new file mode 100644 index 000000000000..354c0ca5d377 --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestValidate/one_savepoint_and_release @@ -0,0 +1,12 @@ +echo +---- +db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetIsoLevel(isolation.Serializable) + txn.Put(ctx, tk(1), sv(1)) // + txn.CreateSavepoint(ctx, 2) // + txn.Put(ctx, tk(1), sv(2)) // + txn.ReleaseSavepoint(ctx, 2) // + txn.Get(ctx, tk(1)) // (v2, ) + return nil +}) // @0.000000001,0 +/Table/100/"0000000000000001"/0.000000001,0 @ s2 v2 diff --git a/pkg/kv/kvnemesis/testdata/TestValidate/one_savepoint_and_rollback b/pkg/kv/kvnemesis/testdata/TestValidate/one_savepoint_and_rollback new file mode 100644 index 000000000000..bd32c14f5663 --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestValidate/one_savepoint_and_rollback @@ -0,0 +1,13 @@ +echo +---- +db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetIsoLevel(isolation.Serializable) + txn.Put(ctx, tk(1), sv(1)) // + txn.CreateSavepoint(ctx, 2) // + txn.Put(ctx, tk(1), sv(2)) // + txn.Get(ctx, tk(1)) // (v2, ) + txn.RollbackSavepoint(ctx, 2) // + txn.Get(ctx, tk(1)) // (v1, ) + return nil +}) // @0.000000001,0 +/Table/100/"0000000000000001"/0.000000001,0 @ s1 v1 diff --git a/pkg/kv/kvnemesis/testdata/TestValidate/savepoint_release_and_rollback b/pkg/kv/kvnemesis/testdata/TestValidate/savepoint_release_and_rollback new file mode 100644 index 000000000000..c81738d82100 --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestValidate/savepoint_release_and_rollback @@ -0,0 +1,17 @@ +echo +---- +db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetIsoLevel(isolation.Serializable) + txn.Put(ctx, tk(1), sv(1)) // + txn.CreateSavepoint(ctx, 2) // + txn.Put(ctx, tk(1), sv(2)) // + txn.Get(ctx, tk(1)) // (v2, ) + txn.CreateSavepoint(ctx, 5) // + txn.Put(ctx, tk(1), sv(3)) // + txn.Get(ctx, tk(1)) // (v3, ) + txn.ReleaseSavepoint(ctx, 5) // + txn.RollbackSavepoint(ctx, 2) // + txn.Get(ctx, tk(1)) // (v1, ) + return nil +}) // @0.000000001,0 +/Table/100/"0000000000000001"/0.000000001,0 @ s1 v1 diff --git a/pkg/kv/kvnemesis/testdata/TestValidate/savepoint_with_no_last_write b/pkg/kv/kvnemesis/testdata/TestValidate/savepoint_with_no_last_write new file mode 100644 index 000000000000..a43c9fcb1d54 --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestValidate/savepoint_with_no_last_write @@ -0,0 +1,10 @@ +echo +---- +db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetIsoLevel(isolation.Serializable) + txn.CreateSavepoint(ctx, 0) // + txn.Put(ctx, tk(1), sv(1)) // + txn.Get(ctx, tk(1)) // (v1, ) + txn.RollbackSavepoint(ctx, 0) // + return nil +}) // @0.000000001,0 diff --git a/pkg/kv/kvnemesis/testdata/TestValidate/savepoint_with_no_last_write_and_existing_write b/pkg/kv/kvnemesis/testdata/TestValidate/savepoint_with_no_last_write_and_existing_write new file mode 100644 index 000000000000..7e0db02d643d --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestValidate/savepoint_with_no_last_write_and_existing_write @@ -0,0 +1,13 @@ +echo +---- +db0.Put(ctx, tk(1), sv(1)) // @0.000000001,0 +db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetIsoLevel(isolation.Serializable) + txn.CreateSavepoint(ctx, 1) // + txn.Put(ctx, tk(1), sv(2)) // + txn.Get(ctx, tk(1)) // (v2, ) + txn.RollbackSavepoint(ctx, 1) // + txn.Get(ctx, tk(1)) // (v1, ) + return nil +}) // @0.000000002,0 +/Table/100/"0000000000000001"/0.000000001,0 @ s1 v1 diff --git a/pkg/kv/kvnemesis/testdata/TestValidate/two_closed_nested_savepoint_rollbacks b/pkg/kv/kvnemesis/testdata/TestValidate/two_closed_nested_savepoint_rollbacks new file mode 100644 index 000000000000..6b7f7b0e3959 --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestValidate/two_closed_nested_savepoint_rollbacks @@ -0,0 +1,17 @@ +echo +---- +db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetIsoLevel(isolation.Serializable) + txn.Put(ctx, tk(1), sv(1)) // + txn.CreateSavepoint(ctx, 2) // + txn.Put(ctx, tk(1), sv(2)) // + txn.Get(ctx, tk(1)) // (v2, ) + txn.CreateSavepoint(ctx, 5) // + txn.Put(ctx, tk(1), sv(3)) // + txn.Get(ctx, tk(1)) // (v3, ) + txn.RollbackSavepoint(ctx, 5) // + txn.RollbackSavepoint(ctx, 2) // + txn.Get(ctx, tk(1)) // (v1, ) + return nil +}) // @0.000000001,0 +/Table/100/"0000000000000001"/0.000000001,0 @ s1 v1 diff --git a/pkg/kv/kvnemesis/testdata/TestValidate/two_nested_savepoint_rollbacks b/pkg/kv/kvnemesis/testdata/TestValidate/two_nested_savepoint_rollbacks new file mode 100644 index 000000000000..7f7ac960b781 --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestValidate/two_nested_savepoint_rollbacks @@ -0,0 +1,16 @@ +echo +---- +db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetIsoLevel(isolation.Serializable) + txn.Put(ctx, tk(1), sv(1)) // + txn.CreateSavepoint(ctx, 2) // + txn.Put(ctx, tk(1), sv(2)) // + txn.Get(ctx, tk(1)) // (v2, ) + txn.CreateSavepoint(ctx, 5) // + txn.Put(ctx, tk(1), sv(3)) // + txn.Get(ctx, tk(1)) // (v3, ) + txn.RollbackSavepoint(ctx, 2) // + txn.Get(ctx, tk(1)) // (v1, ) + return nil +}) // @0.000000001,0 +/Table/100/"0000000000000001"/0.000000001,0 @ s1 v1 diff --git a/pkg/kv/kvnemesis/testdata/TestValidate/two_non-nested_savepoints_rollbacks b/pkg/kv/kvnemesis/testdata/TestValidate/two_non-nested_savepoints_rollbacks new file mode 100644 index 000000000000..ec3ab0b268a2 --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestValidate/two_non-nested_savepoints_rollbacks @@ -0,0 +1,17 @@ +echo +---- +db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetIsoLevel(isolation.Serializable) + txn.CreateSavepoint(ctx, 0) // + txn.Put(ctx, tk(1), sv(1)) // + txn.Get(ctx, tk(1)) // (v1, ) + txn.RollbackSavepoint(ctx, 0) // + txn.Put(ctx, tk(1), sv(2)) // + txn.CreateSavepoint(ctx, 5) // + txn.Put(ctx, tk(1), sv(3)) // + txn.Get(ctx, tk(1)) // (v3, ) + txn.RollbackSavepoint(ctx, 5) // + txn.Get(ctx, tk(1)) // (v2, ) + return nil +}) // @0.000000001,0 +/Table/100/"0000000000000001"/0.000000001,0 @ s2 v2 diff --git a/pkg/kv/kvnemesis/validator.go b/pkg/kv/kvnemesis/validator.go index 71886956077b..47055ff625bf 100644 --- a/pkg/kv/kvnemesis/validator.go +++ b/pkg/kv/kvnemesis/validator.go @@ -13,7 +13,6 @@ package kvnemesis import ( "context" "fmt" - "reflect" "regexp" "sort" "strings" @@ -245,6 +244,19 @@ type observedScan struct { func (*observedScan) observedMarker() {} +const ( + create = iota + release + rollback +) + +type observedSavepoint struct { + SeqNum int + Type int +} + +func (*observedSavepoint) observedMarker() {} + type validator struct { kvs *Engine @@ -789,6 +801,24 @@ func (v *validator) processOp(op Operation) { case *ChangeZoneOperation: execTimestampStrictlyOptional = true v.failIfError(op, t.Result) // fail on all errors + case *SavepointOperation: + sp := &observedSavepoint{SeqNum: int(t.SeqNum), Type: create} + v.curObservations = append(v.curObservations, sp) + // Don't fail on all errors because savepoints can be labeled with + // errOmitted if a previous op in the txn failed. + v.checkError(op, t.Result) + case *SavepointReleaseOperation: + sp := &observedSavepoint{SeqNum: int(t.SeqNum), Type: release} + v.curObservations = append(v.curObservations, sp) + // Don't fail on all errors because savepoints can be labeled with + // errOmitted if a previous op in the txn failed. + v.checkError(op, t.Result) + case *SavepointRollbackOperation: + sp := &observedSavepoint{SeqNum: int(t.SeqNum), Type: rollback} + v.curObservations = append(v.curObservations, sp) + // Don't fail on all errors because savepoints can be labeled with + // errOmitted if a previous op in the txn failed. + v.checkError(op, t.Result) default: panic(errors.AssertionFailedf(`unknown operation type: %T %v`, t, t)) } @@ -926,6 +956,7 @@ func (v *validator) checkAtomicCommitted( // unit tested. lastWritesByIdx := map[int]struct{}{} var lastWrites roachpb.SpanGroup + var rollbackSp *observedSavepoint = nil for idx := len(txnObservations) - 1; idx >= 0; idx-- { observation := txnObservations[idx] switch o := observation.(type) { @@ -957,19 +988,10 @@ func (v *validator) checkAtomicCommitted( { var g roachpb.SpanGroup g.Add(lastWrites.Slice()...) - lastWrite = !g.Sub(sp) // if subtracting did nothing, it's a most recent write - if !lastWrite { - // Otherwise, add it back in, which should restore the old set. If it - // didn't, there was partial overlap, which shouldn't be possible. - g.Add(sp) - } - if then, now := lastWrites.Slice(), g.Slice(); !reflect.DeepEqual(then, now) { - v.failures = append(v.failures, - errors.AssertionFailedf("%s has write %q partially overlapping %+v; subtracting and re-adding gave %+v", atomicType, sp, then, now)) - return - } + // If subtracting did nothing and there are no active rollbacks, it's a + // most recent write. + lastWrite = !g.Sub(sp) && rollbackSp == nil } - if lastWrite { lastWritesByIdx[idx] = struct{}{} lastWrites.Add(sp) @@ -999,9 +1021,32 @@ func (v *validator) checkAtomicCommitted( panic(err) } } + case *observedSavepoint: + // We are iterating over the observations in reverse, so we encounter a + // savepoint rollback before the corresponding savepoint create. Assuming + // a rollback is preceded by a matching create, to identify the last write + // we need to make sure that any open rollback is closed by a matching create. + switch o.Type { + case create: + if rollbackSp != nil && rollbackSp.SeqNum == o.SeqNum { + rollbackSp = nil + } + case release: + // Savepoint releases don't affect the last write. + case rollback: + // If we've already observed a rollback, that already observed rollback + // will subsume rolling the current one back. + if rollbackSp == nil { + rollbackSp = o + } + } } } + // A map from a savepoint seq num to a batch that represents the state of the + // txn at the time of the savepoint creation. + savepointBatches := make(map[int]*pebble.Batch) + // Iterate through the observations, building up the snapshot visible at each // point in the atomic unit and filling in the valid read times (validating // them later, in a separate loop, for better errors). We also check that only @@ -1018,14 +1063,15 @@ func (v *validator) checkAtomicCommitted( // writeTS was populated above as the unique timestamp at which the writes became // visible. We know the operation had writes (we're looking at one now) and so // this operation has either materialized or is covered by a later one that did, - // and so we must have a timestamp here. We defer the failure to the next for - // loop, as we will have filled in the read timestamps at that time. + // and so we must have a timestamp here. + // The only exception is when all writes were rolled back by a savepoint + // rollback. In that case, we still need a writeTS to ensure reads within the txn + // can see those writes; the execTimestamp serves that purpose. if writeTS.IsEmpty() { - continue + writeTS = execTimestamp } _, isLastWrite := lastWritesByIdx[idx] - if !isLastWrite && o.Timestamp.IsSet() { failure = `committed txn overwritten key had write` break @@ -1067,11 +1113,60 @@ func (v *validator) checkAtomicCommitted( failure = `scan result not ordered correctly` } o.Valid = validScanTime(batch, o.Span, o.KVs, o.SkipLocked /* missingKeysValid */) + case *observedSavepoint: + switch o.Type { + case create: + // Clone the existing batch by creating a new batch and applying the + // existing batch state to it. + newBatch := v.kvs.kvs.NewIndexedBatch() + _ = newBatch.Apply(batch, nil) + savepointBatches[o.SeqNum] = newBatch + case release: + _, ok := savepointBatches[o.SeqNum] + if !ok { + panic(errors.AssertionFailedf("savepoint (release) seq num %d does not exist", o.SeqNum)) + } + // Iterate over the stored batches and remove from the map any batches + // with a seq num larger than or equal to the savepoint being released. + // We know these are nested savepoints that also need to be released. + // All these batches should also be closed + for sn, b := range savepointBatches { + if sn >= o.SeqNum { + _ = b.Close() + delete(savepointBatches, sn) + } + } + case rollback: + _, ok := savepointBatches[o.SeqNum] + if !ok { + panic(errors.AssertionFailedf("savepoint (rollback) seq num %d does not exist", o.SeqNum)) + } + batch = savepointBatches[o.SeqNum] + // Iterate over the stored batches and remove from the map any batches + // with a seq num larger than or equal to the savepoint being rolled + // back. We know these are nested savepoints that also need to be rolled + // back. All these batches should also be closed, except the one being + // rolled back because it is now the new current batch. + for sn, b := range savepointBatches { + if sn >= o.SeqNum { + if sn != o.SeqNum { + _ = b.Close() + } + delete(savepointBatches, sn) + } + } + } default: panic(errors.AssertionFailedf(`unknown observedOp: %T %s`, observation, observation)) } } + // Close any remaining batches in the map. These correspond to savepoints that + // were created but never rolled back or released. + for _, spBatch := range savepointBatches { + _ = spBatch.Close() + } + validPossibilities := disjointTimeSpans{{Start: hlc.MinTimestamp, End: hlc.MaxTimestamp}} for idx, observation := range txnObservations { if failure != `` { @@ -1093,6 +1188,9 @@ func (v *validator) checkAtomicCommitted( opValid = o.ValidTimes case *observedScan: opValid = o.Valid.Combined() + case *observedSavepoint: + // A savepoint is always valid. + opValid = disjointTimeSpans{{Start: hlc.MinTimestamp, End: hlc.MaxTimestamp}} default: panic(errors.AssertionFailedf(`unknown observedOp: %T %s`, observation, observation)) } @@ -1172,6 +1270,7 @@ func (v *validator) checkAtomicUncommitted(atomicType string, txnObservations [] case *observedScan: // TODO(dan): Figure out what we can assert about reads in an uncommitted // transaction. + case *observedSavepoint: default: panic(errors.AssertionFailedf(`unknown observedOp: %T %s`, observed, observed)) } @@ -1560,6 +1659,14 @@ func printObserved(observedOps ...observedOp) string { } fmt.Fprintf(&buf, "[%s]%s:%s->[%s]", opCode, o.Span, o.Valid, kvs.String()) + case *observedSavepoint: + opCode := "sp" + if o.Type == release { + opCode += "(release)" + } else if o.Type == rollback { + opCode += "(rollback)" + } + fmt.Fprintf(&buf, "[%s]seqNum:%d", opCode, o.SeqNum) default: panic(errors.AssertionFailedf(`unknown observedOp: %T %s`, observed, observed)) } diff --git a/pkg/kv/kvnemesis/validator_test.go b/pkg/kv/kvnemesis/validator_test.go index 014f54e7bbd7..e032a518b863 100644 --- a/pkg/kv/kvnemesis/validator_test.go +++ b/pkg/kv/kvnemesis/validator_test.go @@ -1995,6 +1995,155 @@ func TestValidate(t *testing.T) { }, kvs: kvs(kv(k1, t1, s1), kv(k2, t2, s2)), }, + { + name: "one savepoint and one put", + steps: []Step{ + step(withResultTS(closureTxnSSI(ClosureTxnType_Commit, + withResult(put(k1, s1)), + withResult(createSavepoint(2)), + ), t1)), + }, + kvs: kvs(kv(k1, t1, s1)), + }, + { + name: "one savepoint and release", + steps: []Step{ + step(withResultTS(closureTxnSSI(ClosureTxnType_Commit, + withResult(put(k1, s1)), + withResult(createSavepoint(2)), + withResult(put(k1, s2)), + withResult(releaseSavepoint(2)), + withReadResult(get(k1), v2), + ), t1)), + }, + kvs: kvs(kv(k1, t1, s2)), + }, + { + name: "nested savepoint release", + steps: []Step{ + step(withResultTS(closureTxnSSI(ClosureTxnType_Commit, + withResult(put(k1, s1)), + withResult(createSavepoint(2)), + withResult(put(k1, s2)), + withResult(createSavepoint(4)), + withResult(put(k1, s3)), + withResult(releaseSavepoint(2)), + withReadResult(get(k1), v3), + ), t1)), + }, + kvs: kvs(kv(k1, t1, s3)), + }, + { + name: "one savepoint and rollback", + steps: []Step{ + step(withResultTS(closureTxnSSI(ClosureTxnType_Commit, + withResult(put(k1, s1)), + withResult(createSavepoint(2)), + withResult(put(k1, s2)), + withReadResult(get(k1), v2), + withResult(rollbackSavepoint(2)), + withReadResult(get(k1), v1), + ), t1)), + }, + kvs: kvs(kv(k1, t1, s1)), + }, + { + name: "two nested savepoint rollbacks", + steps: []Step{ + step(withResultTS(closureTxnSSI(ClosureTxnType_Commit, + withResult(put(k1, s1)), + withResult(createSavepoint(2)), + withResult(put(k1, s2)), + withReadResult(get(k1), v2), + withResult(createSavepoint(5)), + withResult(put(k1, s3)), + withReadResult(get(k1), v3), + withResult(rollbackSavepoint(2)), + withReadResult(get(k1), v1), + ), t1)), + }, + kvs: kvs(kv(k1, t1, s1)), + }, + { + name: "two closed nested savepoint rollbacks", + steps: []Step{ + step(withResultTS(closureTxnSSI(ClosureTxnType_Commit, + withResult(put(k1, s1)), + withResult(createSavepoint(2)), + withResult(put(k1, s2)), + withReadResult(get(k1), v2), + withResult(createSavepoint(5)), + withResult(put(k1, s3)), + withReadResult(get(k1), v3), + withResult(rollbackSavepoint(5)), + withResult(rollbackSavepoint(2)), + withReadResult(get(k1), v1), + ), t1)), + }, + kvs: kvs(kv(k1, t1, s1)), + }, + { + name: "two non-nested savepoints rollbacks", + steps: []Step{ + step(withResultTS(closureTxnSSI(ClosureTxnType_Commit, + withResult(createSavepoint(0)), + withResult(put(k1, s1)), + withReadResult(get(k1), v1), + withResult(rollbackSavepoint(0)), + withResult(put(k1, s2)), + withResult(createSavepoint(5)), + withResult(put(k1, s3)), + withReadResult(get(k1), v3), + withResult(rollbackSavepoint(5)), + withReadResult(get(k1), v2), + ), t1)), + }, + kvs: kvs(kv(k1, t1, s2)), + }, + { + name: "savepoint release and rollback", + steps: []Step{ + step(withResultTS(closureTxnSSI(ClosureTxnType_Commit, + withResult(put(k1, s1)), + withResult(createSavepoint(2)), + withResult(put(k1, s2)), + withReadResult(get(k1), v2), + withResult(createSavepoint(5)), + withResult(put(k1, s3)), + withReadResult(get(k1), v3), + withResult(releaseSavepoint(5)), + withResult(rollbackSavepoint(2)), + withReadResult(get(k1), v1), + ), t1)), + }, + kvs: kvs(kv(k1, t1, s1)), + }, + { + name: "savepoint with no last write", + steps: []Step{ + step(withResultTS(closureTxnSSI(ClosureTxnType_Commit, + withResult(createSavepoint(0)), + withResult(put(k1, s1)), + withReadResult(get(k1), v1), + withResult(rollbackSavepoint(0)), + ), t1)), + }, + kvs: nil, + }, + { + name: "savepoint with no last write and existing write", + steps: []Step{ + step(withResultTS(put(k1, s1), t1)), + step(withResultTS(closureTxnSSI(ClosureTxnType_Commit, + withResult(createSavepoint(1)), + withResult(put(k1, s2)), + withReadResult(get(k1), v2), + withResult(rollbackSavepoint(1)), + withReadResult(get(k1), v1), + ), t2)), + }, + kvs: kvs(kv(k1, t1, s1)), + }, } w := echotest.NewWalker(t, datapathutils.TestDataPath(t, t.Name()))