Skip to content

Commit

Permalink
kvnemesis: add support for savepoints
Browse files Browse the repository at this point in the history
This patch adds support for generating and validating savepoints in
kvnemesis.

Fixes: cockroachdb#97444

Release note: None
  • Loading branch information
miraradeva committed Jan 2, 2024
1 parent 13ce29e commit e52608e
Show file tree
Hide file tree
Showing 30 changed files with 868 additions and 35 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvnemesis/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ go_library(
"@com_github_cockroachdb_pebble//:pebble",
"@com_github_cockroachdb_pebble//vfs",
"@org_golang_google_protobuf//proto",
"@org_golang_x_exp//slices",
],
)

Expand Down
60 changes: 57 additions & 3 deletions pkg/kv/kvnemesis/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) {
*DeleteRangeOperation,
*DeleteRangeUsingTombstoneOperation,
*AddSSTableOperation:
applyClientOp(ctx, db, op, false)
applyClientOp(ctx, db, op, false /* inTxn */, nil /* spIDToToken */)
case *SplitOperation:
err := db.AdminSplit(ctx, o.Key, hlc.MaxTimestamp)
o.Result = resultInit(ctx, err)
Expand Down Expand Up @@ -169,6 +169,9 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) {
retryOnAbort.Next()
}
savedTxn = txn
// A map of a savepoint id to the corresponding savepoint token that was
// created after applying the savepoint op.
spIDToToken := make(map[int]kv.SavepointToken)
// First error. Because we need to mark everything that
// we didn't "reach" due to a prior error with errOmitted,
// we *don't* return eagerly on this but save it to the end.
Expand All @@ -191,7 +194,7 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) {
continue
}

applyClientOp(ctx, txn, op, true)
applyClientOp(ctx, txn, op, true /* inTxn */, &spIDToToken)
// The KV api disallows use of a txn after an operation on it errors.
if r := op.Result(); r.Type == ResultType_Error {
err = errors.DecodeError(ctx, *r.Err)
Expand Down Expand Up @@ -234,6 +237,8 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) {
o.Txn = savedTxn.TestingCloneTxn()
o.Result.OptionalTimestamp = o.Txn.WriteTimestamp
}
case *SavepointCreateOperation, *SavepointReleaseOperation, *SavepointRollbackOperation:
panic(errors.AssertionFailedf(`can't apply a savepoint operation %v outside of a ClosureTxnOperation`, o))
default:
panic(errors.AssertionFailedf(`unknown operation type: %T %v`, o, o))
}
Expand Down Expand Up @@ -286,7 +291,13 @@ func batchRun(
return ts, nil
}

func applyClientOp(ctx context.Context, db clientI, op *Operation, inTxn bool) {
func applyClientOp(
ctx context.Context,
db clientI,
op *Operation,
inTxn bool,
spIDToToken *map[int]kv.SavepointToken,
) {
switch o := op.GetValue().(type) {
case *GetOperation:
res, ts, err := dbRunWithResultAndTimestamp(ctx, db, func(b *kv.Batch) {
Expand Down Expand Up @@ -438,6 +449,49 @@ func applyClientOp(ctx context.Context, db clientI, op *Operation, inTxn bool) {
case *BatchOperation:
b := &kv.Batch{}
applyBatchOp(ctx, b, db.Run, o)
case *SavepointCreateOperation:
txn, ok := db.(*kv.Txn) // savepoints are only allowed with transactions
if !ok {
panic(errors.AssertionFailedf(`non-txn interface attempted to create a savepoint %v`, o))
}
spt, err := txn.CreateSavepoint(ctx)
o.Result = resultInit(ctx, err)
if err != nil {
return
}
// Map the savepoint id to the newly created savepoint token.
if _, ok := (*spIDToToken)[int(o.ID)]; ok {
panic(errors.AssertionFailedf("applying a savepoint create op: ID %d already exists", o.ID))
}
(*spIDToToken)[int(o.ID)] = spt
case *SavepointReleaseOperation:
txn, ok := db.(*kv.Txn) // savepoints are only allowed with transactions
if !ok {
panic(errors.AssertionFailedf(`non-txn interface attempted to release a savepoint %v`, o))
}
spt, ok := (*spIDToToken)[int(o.ID)]
if !ok {
panic(errors.AssertionFailedf("applying a savepoint release op: ID %d does not exist", o.ID))
}
err := txn.ReleaseSavepoint(ctx, spt)
o.Result = resultInit(ctx, err)
if err != nil {
return
}
case *SavepointRollbackOperation:
txn, ok := db.(*kv.Txn) // savepoints are only allowed with transactions
if !ok {
panic(errors.AssertionFailedf(`non-txn interface attempted to rollback a savepoint %v`, o))
}
spt, ok := (*spIDToToken)[int(o.ID)]
if !ok {
panic(errors.AssertionFailedf("applying a savepoint rollback op: ID %d does not exist", o.ID))
}
err := txn.RollbackToSavepoint(ctx, spt)
o.Result = resultInit(ctx, err)
if err != nil {
return
}
default:
panic(errors.AssertionFailedf(`unknown batch operation type: %T %v`, o, o))
}
Expand Down
18 changes: 18 additions & 0 deletions pkg/kv/kvnemesis/applier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,24 @@ func TestApplier(t *testing.T) {
{
"change-replicas", step(changeReplicas(k1, kvpb.ReplicationChange{ChangeType: roachpb.ADD_VOTER, Target: roachpb.ReplicationTarget{NodeID: 1, StoreID: 1}})),
},
{
"txn-ssi-savepoint", step(closureTxn(ClosureTxnType_Commit, isolation.Serializable, put(k5, 0), createSavepoint(1), put(k5, 2), createSavepoint(3), get(k5))),
},
{
"txn-si-savepoint", step(closureTxn(ClosureTxnType_Commit, isolation.Snapshot, put(k5, 0), createSavepoint(1), put(k5, 2), createSavepoint(3), get(k5))),
},
{
"txn-ssi-release-savepoint", step(closureTxn(ClosureTxnType_Commit, isolation.Serializable, put(k5, 0), createSavepoint(1), put(k5, 2), createSavepoint(3), get(k5), releaseSavepoint(1), get(k5))),
},
{
"txn-si-release-savepoint", step(closureTxn(ClosureTxnType_Commit, isolation.Snapshot, put(k5, 0), createSavepoint(1), put(k5, 2), createSavepoint(3), get(k5), releaseSavepoint(1), get(k5))),
},
{
"txn-ssi-rollback-savepoint", step(closureTxn(ClosureTxnType_Commit, isolation.Serializable, put(k5, 0), createSavepoint(1), put(k5, 2), createSavepoint(3), get(k5), rollbackSavepoint(1), get(k5))),
},
{
"txn-si-rollback-savepoint", step(closureTxn(ClosureTxnType_Commit, isolation.Snapshot, put(k5, 0), createSavepoint(1), put(k5, 2), createSavepoint(3), get(k5), rollbackSavepoint(1), get(k5))),
},
}

w := echotest.NewWalker(t, datapathutils.TestDataPath(t, t.Name()))
Expand Down
142 changes: 131 additions & 11 deletions pkg/kv/kvnemesis/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"golang.org/x/exp/slices"
)

// GeneratorConfig contains all the tunable knobs necessary to run a Generator.
Expand Down Expand Up @@ -95,6 +96,7 @@ type ClosureTxnConfig struct {
// When CommitInBatch is selected, CommitBatchOps controls the composition of
// the kv.Batch used.
CommitBatchOps ClientOperationConfig
SavepointOps SavepointConfig
}

// ClientOperationConfig configures the relative probabilities of the
Expand Down Expand Up @@ -321,6 +323,15 @@ type ChangeZoneConfig struct {
ToggleGlobalReads int
}

// SavepointConfig holds one integer knob per kind of savepoint operation.
// Each knob is passed to addOpGen together with the matching savepoint op
// generator (see registerSavepointOps).
// NOTE(review): presumably each value is a relative selection weight, like the
// knobs of the other *Config types -- confirm against addOpGen.
type SavepointConfig struct {
// SavepointCreate is an operation that creates a new savepoint with a given id.
SavepointCreate int
// SavepointRelease is an operation that releases a savepoint with a given id.
SavepointRelease int
// SavepointRollback is an operation that rolls back a savepoint with a given id.
SavepointRollback int
}

// newAllOperationsConfig returns a GeneratorConfig that exercises *all*
// options. You probably want NewDefaultConfig. Most of the time, these will be
// the same, but having both allows us to merge code for operations that do not
Expand Down Expand Up @@ -378,6 +389,12 @@ func newAllOperationsConfig() GeneratorConfig {
Batch: 4,
Ops: clientOpConfig,
}
// SavepointConfig is only relevant in ClosureTxnConfig.
savepointConfig := SavepointConfig{
SavepointCreate: 1,
SavepointRelease: 1,
SavepointRollback: 1,
}
return GeneratorConfig{Ops: OperationConfig{
DB: clientOpConfig,
Batch: batchOpConfig,
Expand All @@ -394,6 +411,7 @@ func newAllOperationsConfig() GeneratorConfig {
TxnClientOps: clientOpConfig,
TxnBatchOps: batchOpConfig,
CommitBatchOps: clientOpConfig,
SavepointOps: savepointConfig,
},
Split: SplitConfig{
SplitNew: 1,
Expand Down Expand Up @@ -1351,23 +1369,23 @@ func (g *generator) registerClosureTxnOps(allowed *[]opGen, c *ClosureTxnConfig)
const Commit, Rollback = ClosureTxnType_Commit, ClosureTxnType_Rollback
const SSI, SI, RC = isolation.Serializable, isolation.Snapshot, isolation.ReadCommitted
addOpGen(allowed,
makeClosureTxn(Commit, SSI, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/), c.CommitSerializable)
makeClosureTxn(Commit, SSI, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.CommitSerializable)
addOpGen(allowed,
makeClosureTxn(Commit, SI, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/), c.CommitSnapshot)
makeClosureTxn(Commit, SI, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.CommitSnapshot)
addOpGen(allowed,
makeClosureTxn(Commit, RC, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/), c.CommitReadCommitted)
makeClosureTxn(Commit, RC, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.CommitReadCommitted)
addOpGen(allowed,
makeClosureTxn(Rollback, SSI, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/), c.RollbackSerializable)
makeClosureTxn(Rollback, SSI, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.RollbackSerializable)
addOpGen(allowed,
makeClosureTxn(Rollback, SI, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/), c.RollbackSnapshot)
makeClosureTxn(Rollback, SI, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.RollbackSnapshot)
addOpGen(allowed,
makeClosureTxn(Rollback, RC, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/), c.RollbackReadCommitted)
makeClosureTxn(Rollback, RC, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/, &c.SavepointOps), c.RollbackReadCommitted)
addOpGen(allowed,
makeClosureTxn(Commit, SSI, &c.TxnClientOps, &c.TxnBatchOps, &c.CommitBatchOps), c.CommitSerializableInBatch)
makeClosureTxn(Commit, SSI, &c.TxnClientOps, &c.TxnBatchOps, &c.CommitBatchOps, &c.SavepointOps), c.CommitSerializableInBatch)
addOpGen(allowed,
makeClosureTxn(Commit, SI, &c.TxnClientOps, &c.TxnBatchOps, &c.CommitBatchOps), c.CommitSnapshotInBatch)
makeClosureTxn(Commit, SI, &c.TxnClientOps, &c.TxnBatchOps, &c.CommitBatchOps, &c.SavepointOps), c.CommitSnapshotInBatch)
addOpGen(allowed,
makeClosureTxn(Commit, RC, &c.TxnClientOps, &c.TxnBatchOps, &c.CommitBatchOps), c.CommitReadCommittedInBatch)
makeClosureTxn(Commit, RC, &c.TxnClientOps, &c.TxnBatchOps, &c.CommitBatchOps, &c.SavepointOps), c.CommitReadCommittedInBatch)
}

func makeClosureTxn(
Expand All @@ -1376,16 +1394,33 @@ func makeClosureTxn(
txnClientOps *ClientOperationConfig,
txnBatchOps *BatchOperationConfig,
commitInBatch *ClientOperationConfig,
savepointOps *SavepointConfig,
) opGenFunc {
return func(g *generator, rng *rand.Rand) Operation {
// All allowed non-savepoint ops. These don't change as we iteratively
// select ops in the loop below.
var allowed []opGen
g.registerClientOps(&allowed, txnClientOps)
g.registerBatchOps(&allowed, txnBatchOps)
const maxOps = 5
const maxOps = 20
numOps := rng.Intn(maxOps + 1)
ops := make([]Operation, numOps)
// Stack of savepoint indexes/ids.
// The last element of the slice is the top of the stack.
var spIDs []int
for i := range ops {
ops[i] = g.selectOp(rng, allowed)
// In each iteration, we start with the allowed non-savepoint ops,
// and we add the valid savepoint ops in registerSavepointOps.
allowedIncludingSavepointOps := allowed
// Savepoints are registered on each iteration of the loop (unlike other
// ops that are registered once at the start) because depending on what
// savepoint ops are randomly selected in selectOp below, the set of
// allowed savepoint ops changes. See registerSavepointOps.
g.registerSavepointOps(&allowedIncludingSavepointOps, savepointOps, spIDs, i)
ops[i] = g.selectOp(rng, allowedIncludingSavepointOps)
// Now that a random op is selected, we may need to update the stack of
// existing savepoints. See maybeUpdateSavepoints.
maybeUpdateSavepoints(&spIDs, ops[i])
}
op := closureTxn(txnType, iso, ops...)
if commitInBatch != nil {
Expand All @@ -1398,6 +1433,79 @@ func makeClosureTxn(
}
}

// registerSavepointOps appends to allowed every savepoint op generator that is
// valid at position idx of the transaction being generated. existingSp is
// taken to be the current stack of savepoints, so a release or a rollback is
// only ever offered for an id that is present in that stack; a savepoint that
// was never created, or that has already been released or rolled back, cannot
// be targeted.
func (g *generator) registerSavepointOps(
	allowed *[]opGen, s *SavepointConfig, existingSp []int, idx int,
) {
	// Creating a savepoint is always valid. The index of the op within the txn
	// doubles as the savepoint id.
	addOpGen(allowed, makeSavepointCreate(idx), s.SavepointCreate)
	// Each savepoint on the stack may be released or rolled back.
	for i := 0; i < len(existingSp); i++ {
		spID := existingSp[i]
		addOpGen(allowed, makeSavepointRelease(spID), s.SavepointRelease)
		addOpGen(allowed, makeSavepointRollback(spID), s.SavepointRollback)
	}
}

// makeSavepointCreate returns an op generator that ignores both of its
// arguments and yields a savepoint-create op for the given id on every call.
func makeSavepointCreate(id int) opGenFunc {
	var gen opGenFunc = func(*generator, *rand.Rand) Operation {
		return createSavepoint(id)
	}
	return gen
}

// makeSavepointRelease returns an op generator that ignores both of its
// arguments and yields a savepoint-release op for the given id on every call.
func makeSavepointRelease(id int) opGenFunc {
	var gen opGenFunc = func(*generator, *rand.Rand) Operation {
		return releaseSavepoint(id)
	}
	return gen
}

// makeSavepointRollback returns an op generator that ignores both of its
// arguments and yields a savepoint-rollback op for the given id on every call.
func makeSavepointRollback(id int) opGenFunc {
	var gen opGenFunc = func(*generator, *rand.Rand) Operation {
		return rollbackSavepoint(id)
	}
	return gen
}

// maybeUpdateSavepoints keeps the stack of savepoints behind existingSp in
// step with prevOp, the op that was just selected:
//   - a SavepointCreateOperation pushes its id onto the stack;
//   - a SavepointReleaseOperation or SavepointRollbackOperation removes its id
//     and every id above it from the stack;
//   - any other op leaves the stack untouched.
//
// It panics with an assertion failure if a created id is already on the stack,
// or if a released or rolled back id is not on it.
func maybeUpdateSavepoints(existingSp *[]int, prevOp Operation) {
	switch sp := prevOp.GetValue().(type) {
	case *SavepointCreateOperation:
		// A newly created savepoint goes on top of the stack; its id must not
		// be in use already.
		id := int(sp.ID)
		if pos := slices.Index(*existingSp, id); pos >= 0 {
			panic(errors.AssertionFailedf(`generating a savepoint create op: ID %d already exists`, id))
		}
		*existingSp = append(*existingSp, id)
	case *SavepointReleaseOperation:
		// Releasing a savepoint also releases all savepoints above it on the
		// stack. E.g. with [1, 2, 3] on the stack, releasing savepoint 2 takes
		// savepoint 3 with it and leaves [1].
		id := int(sp.ID)
		pos := slices.Index(*existingSp, id)
		if pos < 0 {
			panic(errors.AssertionFailedf(`generating a savepoint release op: ID %d does not exist`, id))
		}
		*existingSp = (*existingSp)[:pos]
	case *SavepointRollbackOperation:
		// Rolling back a savepoint also rolls back all savepoints above it on
		// the stack. E.g. with [1, 2, 3] on the stack, rolling back savepoint 2
		// takes savepoint 3 with it and leaves [1].
		id := int(sp.ID)
		pos := slices.Index(*existingSp, id)
		if pos < 0 {
			panic(errors.AssertionFailedf(`generating a savepoint rollback op: ID %d does not exist`, id))
		}
		*existingSp = (*existingSp)[:pos]
	}
	// Any other op is not a savepoint operation and needs no adjustment.
}

// fk stands for "from key", i.e. decode the uint64 the key represents.
// Panics on error.
func fk(k string) uint64 {
Expand Down Expand Up @@ -1731,3 +1839,15 @@ func addSSTable(
AsWrites: asWrites,
}}
}

// createSavepoint builds an Operation wrapping a SavepointCreateOperation
// whose ID is id converted to int32.
func createSavepoint(id int) Operation {
	sp := &SavepointCreateOperation{ID: int32(id)}
	return Operation{SavepointCreate: sp}
}

// releaseSavepoint builds an Operation wrapping a SavepointReleaseOperation
// whose ID is id converted to int32.
func releaseSavepoint(id int) Operation {
	sp := &SavepointReleaseOperation{ID: int32(id)}
	return Operation{SavepointRelease: sp}
}

// rollbackSavepoint builds an Operation wrapping a SavepointRollbackOperation
// whose ID is id converted to int32.
func rollbackSavepoint(id int) Operation {
	sp := &SavepointRollbackOperation{ID: int32(id)}
	return Operation{SavepointRollback: sp}
}
Loading

0 comments on commit e52608e

Please sign in to comment.