Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
102260: kv: teach kvnemesis about isolation levels and snapshot isolation r=arulajmani a=nvanbenschoten

Informs cockroachdb#100169.

This commit teaches kvnemesis about transaction isolation levels, and specifically to generate snapshot isolation transaction closures. The commit is strictly plumbing because isolation level is currently a no-op. Once snapshot isolation is implemented, the validator logic in kvnemesis will need to change.

Release note: None

Co-authored-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
craig[bot] and nvanbenschoten committed Apr 27, 2023
2 parents 5b44c4f + 85ff796 commit e875499
Show file tree
Hide file tree
Showing 111 changed files with 336 additions and 125 deletions.
4 changes: 4 additions & 0 deletions pkg/kv/kvnemesis/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ go_library(
"//pkg/kv/kvnemesis/kvnemesisutil",
"//pkg/kv/kvpb",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/concurrency/isolation",
"//pkg/kv/kvserver/liveness",
"//pkg/roachpb",
"//pkg/settings/cluster",
Expand Down Expand Up @@ -77,6 +78,7 @@ go_test(
"//pkg/kv/kvnemesis/kvnemesisutil",
"//pkg/kv/kvpb",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/concurrency/isolation",
"//pkg/roachpb",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
Expand Down Expand Up @@ -111,6 +113,7 @@ proto_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/kv/kvpb:kvpb_proto",
"//pkg/kv/kvserver/concurrency/isolation:isolation_proto",
"//pkg/roachpb:roachpb_proto",
"//pkg/util/hlc:hlc_proto",
"@com_github_cockroachdb_errors//errorspb:errorspb_proto",
Expand All @@ -126,6 +129,7 @@ go_proto_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/kv/kvpb",
"//pkg/kv/kvserver/concurrency/isolation",
"//pkg/roachpb",
"//pkg/util/hlc",
"@com_github_cockroachdb_errors//errorspb",
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/kvnemesis/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ func applyOp(ctx context.Context, env *Env, db *kv.DB, op *Operation) {
})
var savedTxn *kv.Txn
txnErr := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := txn.SetIsoLevel(o.IsoLevel); err != nil {
panic(err)
}
if savedTxn != nil && txn.TestingCloneTxn().Epoch == 0 {
// If the txn's current epoch is 0 and we've run at least one prior
// iteration, we were just aborted.
Expand Down
25 changes: 19 additions & 6 deletions pkg/kv/kvnemesis/applier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/config/zonepb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
Expand Down Expand Up @@ -104,7 +105,10 @@ func TestApplier(t *testing.T) {
"delrange", step(delRange(k1, k3, 6)),
},
{
"txn-delrange", step(closureTxn(ClosureTxnType_Commit, delRange(k2, k4, 1))),
"txn-ssi-delrange", step(closureTxn(ClosureTxnType_Commit, isolation.Serializable, delRange(k2, k4, 1))),
},
{
"txn-si-delrange", step(closureTxn(ClosureTxnType_Commit, isolation.Snapshot, delRange(k2, k4, 1))),
},
{
"get-err", step(get(k1)),
Expand All @@ -128,7 +132,10 @@ func TestApplier(t *testing.T) {
"delrange-err", step(delRange(k2, k3, 12)),
},
{
"txn-err", step(closureTxn(ClosureTxnType_Commit, delRange(k2, k4, 1))),
"txn-ssi-err", step(closureTxn(ClosureTxnType_Commit, isolation.Serializable, delRange(k2, k4, 1))),
},
{
"txn-si-err", step(closureTxn(ClosureTxnType_Commit, isolation.Snapshot, delRange(k2, k4, 1))),
},
{
"batch-mixed", step(batch(put(k2, 2), get(k1), del(k2, 1), del(k3, 1), scan(k1, k3), reverseScanForUpdate(k1, k5))),
Expand All @@ -137,16 +144,22 @@ func TestApplier(t *testing.T) {
"batch-mixed-err", step(batch(put(k2, 2), getForUpdate(k1), scanForUpdate(k1, k3), reverseScan(k1, k3))),
},
{
"txn-commit-mixed", step(closureTxn(ClosureTxnType_Commit, put(k5, 5), batch(put(k6, 6), delRange(k3, k5, 1)))),
"txn-ssi-commit-mixed", step(closureTxn(ClosureTxnType_Commit, isolation.Serializable, put(k5, 5), batch(put(k6, 6), delRange(k3, k5, 1)))),
},
{
"txn-si-commit-mixed", step(closureTxn(ClosureTxnType_Commit, isolation.Snapshot, put(k5, 5), batch(put(k6, 6), delRange(k3, k5, 1)))),
},
{
"txn-ssi-commit-batch", step(closureTxnCommitInBatch(isolation.Serializable, opSlice(get(k1), put(k6, 6)), put(k5, 5))),
},
{
"txn-commit-batch", step(closureTxnCommitInBatch(opSlice(get(k1), put(k6, 6)), put(k5, 5))),
"txn-si-commit-batch", step(closureTxnCommitInBatch(isolation.Snapshot, opSlice(get(k1), put(k6, 6)), put(k5, 5))),
},
{
"txn-rollback", step(closureTxn(ClosureTxnType_Rollback, put(k5, 5))),
"txn-ssi-rollback", step(closureTxn(ClosureTxnType_Rollback, isolation.Serializable, put(k5, 5))),
},
{
"txn-error", step(closureTxn(ClosureTxnType_Rollback, put(k5, 5))),
"txn-si-rollback", step(closureTxn(ClosureTxnType_Rollback, isolation.Snapshot, put(k5, 5))),
},
{
"split", step(split(k2)),
Expand Down
73 changes: 50 additions & 23 deletions pkg/kv/kvnemesis/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvnemesis/kvnemesisutil"
kvpb "github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap"
Expand Down Expand Up @@ -63,17 +64,25 @@ type OperationConfig struct {
// composition of the operations in the txn is controlled by TxnClientOps and
// TxnBatchOps
type ClosureTxnConfig struct {
// CommitSerializable is a serializable transaction that commits normally.
CommitSerializable int
// CommitSnapshot is a snapshot transaction that commits normally.
CommitSnapshot int
// RollbackSerializable is a serializable transaction that encounters an error
// at the end and has to roll back.
RollbackSerializable int
// RollbackSnapshot is a snapshot transaction that encounters an error at the
// end and has to roll back.
RollbackSnapshot int
// CommitSerializableInBatch is a serializable transaction that commits via
// the CommitInBatchMethod. This is an important part of the 1pc txn fastpath.
CommitSerializableInBatch int
// CommitSnapshotInBatch is a snapshot transaction that commits via the
// CommitInBatchMethod. This is an important part of the 1pc txn fastpath.
CommitSnapshotInBatch int

TxnClientOps ClientOperationConfig
TxnBatchOps BatchOperationConfig

// Commit is a transaction that commits normally.
Commit int
// Rollback is a transaction that encounters an error at the end and has to
// roll back.
Rollback int
// CommitInBatch is a transaction that commits via the CommitInBatchMethod.
// This is an important part of the 1pc txn fastpath.
CommitInBatch int
// When CommitInBatch is selected, CommitBatchOps controls the composition of
// the kv.Batch used.
CommitBatchOps ClientOperationConfig
Expand Down Expand Up @@ -208,12 +217,15 @@ func newAllOperationsConfig() GeneratorConfig {
DB: clientOpConfig,
Batch: batchOpConfig,
ClosureTxn: ClosureTxnConfig{
Commit: 5,
Rollback: 5,
CommitInBatch: 5,
TxnClientOps: clientOpConfig,
TxnBatchOps: batchOpConfig,
CommitBatchOps: clientOpConfig,
CommitSerializable: 3,
CommitSnapshot: 3,
RollbackSerializable: 3,
RollbackSnapshot: 3,
CommitSerializableInBatch: 3,
CommitSnapshotInBatch: 3,
TxnClientOps: clientOpConfig,
TxnBatchOps: batchOpConfig,
CommitBatchOps: clientOpConfig,
},
Split: SplitConfig{
SplitNew: 1,
Expand Down Expand Up @@ -849,16 +861,25 @@ func makeRandBatch(c *ClientOperationConfig) opGenFunc {
}

func (g *generator) registerClosureTxnOps(allowed *[]opGen, c *ClosureTxnConfig) {
const Commit, Rollback = ClosureTxnType_Commit, ClosureTxnType_Rollback
const SSI, SI = isolation.Serializable, isolation.Snapshot
addOpGen(allowed,
makeClosureTxn(Commit, SSI, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/), c.CommitSerializable)
addOpGen(allowed,
makeClosureTxn(Commit, SI, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/), c.CommitSnapshotInBatch)
addOpGen(allowed,
makeClosureTxn(ClosureTxnType_Commit, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/), c.Commit)
makeClosureTxn(Rollback, SSI, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/), c.RollbackSerializable)
addOpGen(allowed,
makeClosureTxn(ClosureTxnType_Rollback, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/), c.Rollback)
makeClosureTxn(Rollback, SI, &c.TxnClientOps, &c.TxnBatchOps, nil /* commitInBatch*/), c.RollbackSnapshot)
addOpGen(allowed,
makeClosureTxn(ClosureTxnType_Commit, &c.TxnClientOps, &c.TxnBatchOps, &c.CommitBatchOps), c.CommitInBatch)
makeClosureTxn(Commit, SSI, &c.TxnClientOps, &c.TxnBatchOps, &c.CommitBatchOps), c.CommitSerializableInBatch)
addOpGen(allowed,
makeClosureTxn(Commit, SI, &c.TxnClientOps, &c.TxnBatchOps, &c.CommitBatchOps), c.CommitSnapshotInBatch)
}

func makeClosureTxn(
txnType ClosureTxnType,
iso isolation.Level,
txnClientOps *ClientOperationConfig,
txnBatchOps *BatchOperationConfig,
commitInBatch *ClientOperationConfig,
Expand All @@ -872,7 +893,7 @@ func makeClosureTxn(
for i := range ops {
ops[i] = g.selectOp(rng, allowed)
}
op := closureTxn(txnType, ops...)
op := closureTxn(txnType, iso, ops...)
if commitInBatch != nil {
if txnType != ClosureTxnType_Commit {
panic(errors.AssertionFailedf(`CommitInBatch must commit got: %s`, txnType))
Expand Down Expand Up @@ -1012,12 +1033,18 @@ func opSlice(ops ...Operation) []Operation {
return ops
}

func closureTxn(typ ClosureTxnType, ops ...Operation) Operation {
return Operation{ClosureTxn: &ClosureTxnOperation{Ops: ops, Type: typ}}
func closureTxn(typ ClosureTxnType, iso isolation.Level, ops ...Operation) Operation {
return Operation{ClosureTxn: &ClosureTxnOperation{Ops: ops, Type: typ, IsoLevel: iso}}
}

func closureTxnSSI(typ ClosureTxnType, ops ...Operation) Operation {
return closureTxn(typ, isolation.Serializable, ops...)
}

func closureTxnCommitInBatch(commitInBatch []Operation, ops ...Operation) Operation {
o := closureTxn(ClosureTxnType_Commit, ops...)
func closureTxnCommitInBatch(
iso isolation.Level, commitInBatch []Operation, ops ...Operation,
) Operation {
o := closureTxn(ClosureTxnType_Commit, iso, ops...)
if len(commitInBatch) > 0 {
o.ClosureTxn.CommitInBatch = &BatchOperation{Ops: commitInBatch}
}
Expand Down
28 changes: 25 additions & 3 deletions pkg/kv/kvnemesis/generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"

"github.com/cockroachdb/cockroach/pkg/kv/kvnemesis/kvnemesisutil"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils/datapathutils"
"github.com/cockroachdb/cockroach/pkg/testutils/echotest"
Expand Down Expand Up @@ -172,12 +173,33 @@ func TestRandStep(t *testing.T) {
case *ClosureTxnOperation:
countClientOps(&counts.ClosureTxn.TxnClientOps, &counts.ClosureTxn.TxnBatchOps, o.Ops...)
if o.CommitInBatch != nil {
counts.ClosureTxn.CommitInBatch++
switch o.IsoLevel {
case isolation.Serializable:
counts.ClosureTxn.CommitSerializableInBatch++
case isolation.Snapshot:
counts.ClosureTxn.CommitSnapshotInBatch++
default:
t.Fatalf("unexpected isolation level %s", o.IsoLevel)
}
countClientOps(&counts.ClosureTxn.CommitBatchOps, nil, o.CommitInBatch.Ops...)
} else if o.Type == ClosureTxnType_Commit {
counts.ClosureTxn.Commit++
switch o.IsoLevel {
case isolation.Serializable:
counts.ClosureTxn.CommitSerializable++
case isolation.Snapshot:
counts.ClosureTxn.CommitSnapshot++
default:
t.Fatalf("unexpected isolation level %s", o.IsoLevel)
}
} else if o.Type == ClosureTxnType_Rollback {
counts.ClosureTxn.Rollback++
switch o.IsoLevel {
case isolation.Serializable:
counts.ClosureTxn.RollbackSerializable++
case isolation.Snapshot:
counts.ClosureTxn.RollbackSnapshot++
default:
t.Fatalf("unexpected isolation level %s", o.IsoLevel)
}
}
case *SplitOperation:
if _, ok := splits[string(o.Key)]; ok {
Expand Down
4 changes: 4 additions & 0 deletions pkg/kv/kvnemesis/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ func (op Operation) format(w *strings.Builder, fctx formatCtx) {
newFctx.receiver = txnName
w.WriteString(fctx.receiver)
fmt.Fprintf(w, `.Txn(ctx, func(ctx context.Context, %s *kv.Txn) error {`, txnName)
w.WriteString("\n")
w.WriteString(newFctx.indent)
w.WriteString(newFctx.receiver)
fmt.Fprintf(w, `.SetIsoLevel(isolation.%s)`, o.IsoLevel)
formatOps(w, newFctx, o.Ops)
if o.CommitInBatch != nil {
newFctx.receiver = `b`
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvnemesis/operations.proto
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ option go_package = "github.com/cockroachdb/cockroach/pkg/kv/kvnemesis";
import "errorspb/errors.proto";
import "gogoproto/gogo.proto";
import "kv/kvpb/api.proto";
import "kv/kvserver/concurrency/isolation/levels.proto";
import "roachpb/data.proto";
import "util/hlc/timestamp.proto";

Expand All @@ -34,6 +35,7 @@ message ClosureTxnOperation {
repeated Operation ops = 2 [(gogoproto.nullable) = false];
BatchOperation commit_in_batch = 3;
ClosureTxnType type = 4;
cockroach.kv.kvserver.concurrency.isolation.Level iso_level = 7;
Result result = 5 [(gogoproto.nullable) = false];
roachpb.Transaction txn = 6;
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/kv/kvnemesis/operations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"strings"
"testing"

"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
Expand Down Expand Up @@ -77,6 +78,7 @@ func TestOperationsFormat(t *testing.T) {
{
step: step(
closureTxn(ClosureTxnType_Commit,
isolation.Serializable,
batch(get(k7), get(k8), del(k9, 1)),
delRange(k10, k11, 2),
put(k11, 3),
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvnemesis/testdata/TestApplier/del-err
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
echo
----
db0.Del(ctx, tk(2) /* @s1 */) // context canceled
db1.Del(ctx, tk(2) /* @s1 */) // context canceled
2 changes: 1 addition & 1 deletion pkg/kv/kvnemesis/testdata/TestApplier/delrange-err
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
echo
----
db1.DelRange(ctx, tk(2), tk(3), true /* @s12 */) // context canceled
db0.DelRange(ctx, tk(2), tk(3), true /* @s12 */) // context canceled
2 changes: 1 addition & 1 deletion pkg/kv/kvnemesis/testdata/TestApplier/get-err
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
echo
----
db1.Get(ctx, tk(1)) // context canceled
db0.Get(ctx, tk(1)) // context canceled
2 changes: 1 addition & 1 deletion pkg/kv/kvnemesis/testdata/TestApplier/put-err
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
echo
----
db0.Put(ctx, tk(1), sv(1)) // context canceled
db1.Put(ctx, tk(1), sv(1)) // context canceled
2 changes: 1 addition & 1 deletion pkg/kv/kvnemesis/testdata/TestApplier/rscan-err
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
echo
----
db0.ReverseScan(ctx, tk(1), tk(3), 0) // context canceled
db1.ReverseScan(ctx, tk(1), tk(3), 0) // context canceled
2 changes: 1 addition & 1 deletion pkg/kv/kvnemesis/testdata/TestApplier/rscan-for-update-err
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
echo
----
db1.ReverseScanForUpdate(ctx, tk(1), tk(3), 0) // context canceled
db0.ReverseScanForUpdate(ctx, tk(1), tk(3), 0) // context canceled
2 changes: 1 addition & 1 deletion pkg/kv/kvnemesis/testdata/TestApplier/scan-for-update-err
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
echo
----
db1.ScanForUpdate(ctx, tk(1), tk(3), 0) // context canceled
db0.ScanForUpdate(ctx, tk(1), tk(3), 0) // context canceled
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
echo
----
db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
txn.SetIsoLevel(isolation.Snapshot)
txn.Put(ctx, tk(5), sv(5)) // @<ts> <nil>
b := &kv.Batch{}
b.Get(tk(1)) // (<nil>, <nil>)
Expand Down
14 changes: 14 additions & 0 deletions pkg/kv/kvnemesis/testdata/TestApplier/txn-si-commit-mixed
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
echo
----
db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
txn.SetIsoLevel(isolation.Snapshot)
txn.Put(ctx, tk(5), sv(5)) // @<ts> <nil>
{
b := &kv.Batch{}
b.Put(tk(6), sv(6)) // <nil>
b.DelRange(tk(3), tk(5), true /* @s1 */) // <nil>
txn.Run(ctx, b) // @<ts> <nil>
}
return nil
}) // @<ts> <nil>
// ^-- txnpb:<txn>
8 changes: 8 additions & 0 deletions pkg/kv/kvnemesis/testdata/TestApplier/txn-si-delrange
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
echo
----
db1.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
txn.SetIsoLevel(isolation.Snapshot)
txn.DelRange(ctx, tk(2), tk(4), true /* @s1 */) // @<ts> <nil>
return nil
}) // @<ts> <nil>
// ^-- txnpb:<txn>
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
echo
----
db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
txn.SetIsoLevel(isolation.Snapshot)
txn.DelRange(ctx, tk(2), tk(4), true /* @s1 */)
return nil
}) // context canceled
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
echo
----
db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
txn.SetIsoLevel(isolation.Snapshot)
txn.Put(ctx, tk(5), sv(5)) // @<ts> <nil>
return errors.New("rollback")
}) // rollback
Loading

0 comments on commit e875499

Please sign in to comment.