Skip to content

Commit

Permalink
kvnemesis: Add support for savepoints
Browse files Browse the repository at this point in the history
This patch adds basic support for savepoints in kvnemesis. It does not
handle validating retries using initial savepoints.

Fixes: cockroachdb#97444
Release note: None
  • Loading branch information
miraradeva committed Sep 13, 2023
1 parent 40dd180 commit 60d0f10
Show file tree
Hide file tree
Showing 29 changed files with 802 additions and 33 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvnemesis/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ go_library(
"@com_github_cockroachdb_pebble//:pebble",
"@com_github_cockroachdb_pebble//vfs",
"@org_golang_google_protobuf//proto",
"@org_golang_x_exp//slices",
],
)

Expand Down
51 changes: 48 additions & 3 deletions pkg/kv/kvnemesis/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) {
*DeleteRangeOperation,
*DeleteRangeUsingTombstoneOperation,
*AddSSTableOperation:
applyClientOp(ctx, db, op, false)
applyClientOp(ctx, db, op, false, nil)
case *SplitOperation:
err := db.AdminSplit(ctx, o.Key, hlc.MaxTimestamp)
o.Result = resultInit(ctx, err)
Expand Down Expand Up @@ -155,6 +155,7 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) {
retryOnAbort.Next()
}
savedTxn = txn
savepoints := make(map[int]kv.SavepointToken)
// First error. Because we need to mark everything that
// we didn't "reach" due to a prior error with errOmitted,
// we *don't* return eagerly on this but save it to the end.
Expand All @@ -177,7 +178,7 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) {
continue
}

applyClientOp(ctx, txn, op, true)
applyClientOp(ctx, txn, op, true, &savepoints)
// The KV api disallows use of a txn after an operation on it errors.
if r := op.Result(); r.Type == ResultType_Error {
err = errors.DecodeError(ctx, *r.Err)
Expand Down Expand Up @@ -269,7 +270,9 @@ func batchRun(
return ts, nil
}

func applyClientOp(ctx context.Context, db clientI, op *Operation, inTxn bool) {
func applyClientOp(
ctx context.Context, db clientI, op *Operation, inTxn bool, savepoints *map[int]kv.SavepointToken,
) {
switch o := op.GetValue().(type) {
case *GetOperation:
fn := (*kv.Batch).Get
Expand Down Expand Up @@ -403,6 +406,48 @@ func applyClientOp(ctx context.Context, db clientI, op *Operation, inTxn bool) {
case *BatchOperation:
b := &kv.Batch{}
applyBatchOp(ctx, b, db.Run, o)
case *SavepointOperation:
// db is a Txn because savepoints are allowed only within transactions.
txn, ok := db.(*kv.Txn)
if !ok {
panic(errors.AssertionFailedf(`non-txn interface attempted to create a savepoint %v`, o))
}
spt, err := txn.CreateSavepoint(ctx)
o.Result = resultInit(ctx, err)
if err != nil {
return
}
(*savepoints)[int(o.SeqNum)] = spt
case *SavepointReleaseOperation:
// db is a Txn because savepoints are allowed only within transactions.
txn, ok := db.(*kv.Txn)
if !ok {
panic(errors.AssertionFailedf(`non-txn interface attempted to release a savepoint %v`, o))
}
spt, ok := (*savepoints)[int(o.SeqNum)]
if !ok {
panic(errors.AssertionFailedf("savepoint sequence number %d does not exist", o.SeqNum))
}
err := txn.ReleaseSavepoint(ctx, spt)
o.Result = resultInit(ctx, err)
if err != nil {
return
}
case *SavepointRollbackOperation:
// db is a Txn because savepoints are allowed only within transactions.
txn, ok := db.(*kv.Txn)
if !ok {
panic(errors.AssertionFailedf(`non-txn interface attempted to rollback a savepoint %v`, o))
}
spt, ok := (*savepoints)[int(o.SeqNum)]
if !ok {
panic(errors.AssertionFailedf("savepoint sequence number %d does not exist", o.SeqNum))
}
err := txn.RollbackToSavepoint(ctx, spt)
o.Result = resultInit(ctx, err)
if err != nil {
return
}
default:
panic(errors.AssertionFailedf(`unknown batch operation type: %T %v`, o, o))
}
Expand Down
18 changes: 18 additions & 0 deletions pkg/kv/kvnemesis/applier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,24 @@ func TestApplier(t *testing.T) {
{
"change-replicas", step(changeReplicas(k1, kvpb.ReplicationChange{ChangeType: roachpb.ADD_VOTER, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}})),
},
{
"txn-ssi-savepoint", step(closureTxn(ClosureTxnType_Commit, isolation.Serializable, put(k5, 0), createSavepoint(1), put(k5, 2), createSavepoint(3), get(k5))),
},
{
"txn-si-savepoint", step(closureTxn(ClosureTxnType_Commit, isolation.Snapshot, put(k5, 0), createSavepoint(1), put(k5, 2), createSavepoint(3), get(k5))),
},
{
"txn-ssi-release-savepoint", step(closureTxn(ClosureTxnType_Commit, isolation.Serializable, put(k5, 0), createSavepoint(1), put(k5, 2), createSavepoint(3), get(k5), releaseSavepoint(1), get(k5))),
},
{
"txn-si-release-savepoint", step(closureTxn(ClosureTxnType_Commit, isolation.Snapshot, put(k5, 0), createSavepoint(1), put(k5, 2), createSavepoint(3), get(k5), releaseSavepoint(1), get(k5))),
},
{
"txn-ssi-rollback-savepoint", step(closureTxn(ClosureTxnType_Commit, isolation.Serializable, put(k5, 0), createSavepoint(1), put(k5, 2), createSavepoint(3), get(k5), rollbackSavepoint(1), get(k5))),
},
{
"txn-si-rollback-savepoint", step(closureTxn(ClosureTxnType_Commit, isolation.Snapshot, put(k5, 0), createSavepoint(1), put(k5, 2), createSavepoint(3), get(k5), rollbackSavepoint(1), get(k5))),
},
}

w := echotest.NewWalker(t, datapathutils.TestDataPath(t, t.Name()))
Expand Down
106 changes: 95 additions & 11 deletions pkg/kv/kvnemesis/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"golang.org/x/exp/slices"
)

// GeneratorConfig contains all the tunable knobs necessary to run a Generator.
Expand Down Expand Up @@ -95,6 +96,7 @@ type ClosureTxnConfig struct {
// When CommitInBatch is selected, CommitBatchOps controls the composition of
// the kv.Batch used.
CommitBatchOps ClientOperationConfig
SavepointOps SavepointConfig
}

// ClientOperationConfig configures the relative probabilities of the
Expand Down Expand Up @@ -221,6 +223,15 @@ type ChangeZoneConfig struct {
ToggleGlobalReads int
}

type SavepointConfig struct {
// Savepoint is an operation that creates a new savepoint with a given name.
Savepoint int
// SavepointRelease is an operation that releases a savepoint with a given name.
SavepointRelease int
// SavepointRollback is an operation that rolls back a savepoint with a given name.
SavepointRollback int
}

// newAllOperationsConfig returns a GeneratorConfig that exercises *all*
// options. You probably want NewDefaultConfig. Most of the time, these will be
// the same, but having both allows us to merge code for operations that do not
Expand Down Expand Up @@ -253,6 +264,11 @@ func newAllOperationsConfig() GeneratorConfig {
Batch: 4,
Ops: clientOpConfig,
}
savepointConfig := SavepointConfig{
Savepoint: 1,
SavepointRelease: 1,
SavepointRollback: 1,
}
return GeneratorConfig{Ops: OperationConfig{
DB: clientOpConfig,
Batch: batchOpConfig,
Expand All @@ -269,6 +285,7 @@ func newAllOperationsConfig() GeneratorConfig {
TxnClientOps: clientOpConfig,
TxnBatchOps: batchOpConfig,
CommitBatchOps: clientOpConfig,
SavepointOps: savepointConfig,
},
Split: SplitConfig{
SplitNew: 1,
Expand Down Expand Up @@ -974,23 +991,23 @@ func (g *generator) registerClosureTxnOps(allowed *[]opGen, c *ClosureTxnConfig)
const Commit, Rollback = ClosureTxnType_Commit, ClosureTxnType_Rollback
const SSI, SI, RC = isolation.Serializable, isolation.Snapshot, isolation.ReadCommitted
addOpGen(allowed,
makeClosureTxn(Commit, SSI, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/), c.CommitSerializable)
makeClosureTxn(Commit, SSI, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.CommitSerializable)
addOpGen(allowed,
makeClosureTxn(Commit, SI, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/), c.CommitSnapshot)
makeClosureTxn(Commit, SI, &c.TxnClientOps, &c.TxnBatchOps, nil, &c.SavepointOps /* commitInBatch*/), c.CommitSnapshot)
addOpGen(allowed,
makeClosureTxn(Commit, RC, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/), c.CommitReadCommitted)
makeClosureTxn(Commit, RC, &c.TxnClientOps, &c.TxnBatchOps, nil, &c.SavepointOps /* commitInBatch*/), c.CommitReadCommitted)
addOpGen(allowed,
makeClosureTxn(Rollback, SSI, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/), c.RollbackSerializable)
makeClosureTxn(Rollback, SSI, &c.TxnClientOps, &c.TxnBatchOps, nil, &c.SavepointOps /* commitInBatch*/), c.RollbackSerializable)
addOpGen(allowed,
makeClosureTxn(Rollback, SI, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/), c.RollbackSnapshot)
makeClosureTxn(Rollback, SI, &c.TxnClientOps, &c.TxnBatchOps, nil, &c.SavepointOps /* commitInBatch*/), c.RollbackSnapshot)
addOpGen(allowed,
makeClosureTxn(Rollback, RC, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/), c.RollbackReadCommitted)
makeClosureTxn(Rollback, RC, &c.TxnClientOps, &c.TxnBatchOps, nil, &c.SavepointOps /* commitInBatch*/), c.RollbackReadCommitted)
addOpGen(allowed,
makeClosureTxn(Commit, SSI, &c.TxnClientOps, &c.TxnBatchOps, &c.CommitBatchOps), c.CommitSerializableInBatch)
makeClosureTxn(Commit, SSI, &c.TxnClientOps, &c.TxnBatchOps, &c.CommitBatchOps, &c.SavepointOps), c.CommitSerializableInBatch)
addOpGen(allowed,
makeClosureTxn(Commit, SI, &c.TxnClientOps, &c.TxnBatchOps, &c.CommitBatchOps), c.CommitSnapshotInBatch)
makeClosureTxn(Commit, SI, &c.TxnClientOps, &c.TxnBatchOps, &c.CommitBatchOps, &c.SavepointOps), c.CommitSnapshotInBatch)
addOpGen(allowed,
makeClosureTxn(Commit, RC, &c.TxnClientOps, &c.TxnBatchOps, &c.CommitBatchOps), c.CommitReadCommittedInBatch)
makeClosureTxn(Commit, RC, &c.TxnClientOps, &c.TxnBatchOps, &c.CommitBatchOps, &c.SavepointOps), c.CommitReadCommittedInBatch)
}

func makeClosureTxn(
Expand All @@ -999,15 +1016,20 @@ func makeClosureTxn(
txnClientOps *ClientOperationConfig,
txnBatchOps *BatchOperationConfig,
commitInBatch *ClientOperationConfig,
savepointOps *SavepointConfig,
) opGenFunc {
return func(g *generator, rng *rand.Rand) Operation {
var allowed []opGen
g.registerClientOps(&allowed, txnClientOps)
g.registerBatchOps(&allowed, txnBatchOps)
numOps := rng.Intn(4)
numOps := rng.Intn(10)
ops := make([]Operation, numOps)
var savepoints []int
for i := range ops {
ops[i] = g.selectOp(rng, allowed)
allowedSp := allowed
g.registerSavepointOps(&allowedSp, savepointOps, savepoints, i)
ops[i] = g.selectOp(rng, allowedSp)
updateSavepoints(&savepoints, ops[i])
}
op := closureTxn(txnType, iso, ops...)
if commitInBatch != nil {
Expand All @@ -1020,6 +1042,56 @@ func makeClosureTxn(
}
}

func (g *generator) registerSavepointOps(
allowed *[]opGen, s *SavepointConfig, existingSp []int, seqNum int,
) {
addOpGen(allowed, makeSavepoint(seqNum), s.Savepoint)
for _, sn := range existingSp {
addOpGen(allowed, makeRollbackSavepoint(sn), s.SavepointRollback)
addOpGen(allowed, makeReleaseSavepoint(sn), s.SavepointRelease)
}
}

func makeSavepoint(seqNum int) opGenFunc {
return func(_ *generator, _ *rand.Rand) Operation {
return createSavepoint(seqNum)
}
}

func makeReleaseSavepoint(seqNum int) opGenFunc {
return func(_ *generator, _ *rand.Rand) Operation {
return releaseSavepoint(seqNum)
}
}

func makeRollbackSavepoint(seqNum int) opGenFunc {
return func(_ *generator, _ *rand.Rand) Operation {
return rollbackSavepoint(seqNum)
}
}

// Based on the previously selected op, updateSavepoints modifies the slice of
// existing savepoints to either: (1) add a new savepoint if the previous op was
// SavepointOperation, or (2) go back and remove a suffix of savepoints if the
// previous operation was SavepointReleaseOperation or SavepointRollbackOperation.
// The savepoints slice can be thought of as a stack where the end of the slice
// is the top of the stack.
func updateSavepoints(savepoints *[]int, prevOp Operation) {
switch op := prevOp.GetValue().(type) {
case *SavepointOperation:
*savepoints = append(*savepoints, int(op.SeqNum))
case *SavepointReleaseOperation:
index := slices.Index(*savepoints, int(op.SeqNum))
*savepoints = (*savepoints)[:index]
case *SavepointRollbackOperation:
index := slices.Index(*savepoints, int(op.SeqNum))
*savepoints = (*savepoints)[:index]
default:
// prevOp is not a savepoint operation.
return
}
}

// fk stands for "from key", i.e. decode the uint64 the key represents.
// Panics on error.
func fk(k string) uint64 {
Expand Down Expand Up @@ -1269,3 +1341,15 @@ func addSSTable(
AsWrites: asWrites,
}}
}

func createSavepoint(seqNum int) Operation {
return Operation{Savepoint: &SavepointOperation{SeqNum: int32(seqNum)}}
}

func releaseSavepoint(seqNum int) Operation {
return Operation{SavepointRelease: &SavepointReleaseOperation{SeqNum: int32(seqNum)}}
}

func rollbackSavepoint(seqNum int) Operation {
return Operation{SavepointRollback: &SavepointRollbackOperation{SeqNum: int32(seqNum)}}
}
Loading

0 comments on commit 60d0f10

Please sign in to comment.