From 7ff1c793a62e644413484424e6d6c97e6c19d70b Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Sat, 7 Oct 2023 02:54:20 -0400 Subject: [PATCH] kvnemesis: validate writes and locking reads under weak isolation txns Fixes #100169. This commit updates kvnemesis to validate the atomicity of writes and locking reads under weak isolation transactions, leaving only non-locking reads unvalidated. This strengthens the validation of weak isolation transactions, which were previously in the mix but ignored during validation. In the future, we could explore adding some validation for non-locking reads as well, although this will be both more complex and isolation-level dependent (snapshot and read committed behave differently). This enhancement serves an important role in testing the correctness of replicated locking reads under weak isolation transactions, both in that they are never lost, unlike unreplicated locks, and in that they enforce isolation even after being released, by bumping the timestamp cache on release to the commit timestamp (7702ff8c and 0e6de2b5). If I remove either of those two commits, kvnemesis now fails. While here, the commit also bumps the maximum number of operations per transaction from 3 to 5. This creates more interesting transactions. 
Release note: None --- pkg/kv/kvnemesis/generator.go | 3 +- ...saction_with_atomic_locking_replicated_get | 13 ++ ...mic_locking_replicated_get_and_missing_key | 13 ++ ...action_with_atomic_locking_replicated_scan | 15 ++ ...ic_locking_replicated_scan_and_missing_key | 15 ++ ...k_isolation_transaction_with_atomic_writes | 10 ++ ...on_transaction_with_non-atomic_deleterange | 15 ++ ...ion_with_non-atomic_locking_replicated_get | 14 ++ ...on_with_non-atomic_locking_replicated_scan | 16 ++ ...n_with_non-atomic_locking_unreplicated_get | 13 ++ ..._with_non-atomic_locking_unreplicated_scan | 15 ++ ...ransaction_with_non-atomic_non-locking_get | 13 ++ ...ansaction_with_non-atomic_non-locking_scan | 15 ++ ...olation_transaction_with_non-atomic_writes | 11 ++ pkg/kv/kvnemesis/validator.go | 141 ++++++++++----- pkg/kv/kvnemesis/validator_test.go | 162 ++++++++++++++++++ 16 files changed, 444 insertions(+), 40 deletions(-) create mode 100644 pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_atomic_locking_replicated_get create mode 100644 pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_atomic_locking_replicated_get_and_missing_key create mode 100644 pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_atomic_locking_replicated_scan create mode 100644 pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_atomic_locking_replicated_scan_and_missing_key create mode 100644 pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_atomic_writes create mode 100644 pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_non-atomic_deleterange create mode 100644 pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_non-atomic_locking_replicated_get create mode 100644 pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_non-atomic_locking_replicated_scan create mode 100644 
pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_non-atomic_locking_unreplicated_get create mode 100644 pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_non-atomic_locking_unreplicated_scan create mode 100644 pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_non-atomic_non-locking_get create mode 100644 pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_non-atomic_non-locking_scan create mode 100644 pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_non-atomic_writes diff --git a/pkg/kv/kvnemesis/generator.go b/pkg/kv/kvnemesis/generator.go index 6287d89f2498..06f060e178da 100644 --- a/pkg/kv/kvnemesis/generator.go +++ b/pkg/kv/kvnemesis/generator.go @@ -1371,7 +1371,8 @@ func makeClosureTxn( var allowed []opGen g.registerClientOps(&allowed, txnClientOps) g.registerBatchOps(&allowed, txnBatchOps) - numOps := rng.Intn(4) + const maxOps = 5 + numOps := rng.Intn(maxOps + 1) ops := make([]Operation, numOps) for i := range ops { ops[i] = g.selectOp(rng, allowed) diff --git a/pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_atomic_locking_replicated_get b/pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_atomic_locking_replicated_get new file mode 100644 index 000000000000..32d74f591e7f --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_atomic_locking_replicated_get @@ -0,0 +1,13 @@ +echo +---- +db0.Put(ctx, tk(1), sv(1)) // @0.000000001,0 +db0.Put(ctx, tk(1), sv(2)) // @0.000000002,0 +db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetIsoLevel(isolation.Snapshot) + txn.GetForShareGuaranteedDurability(ctx, tk(1)) // (v2, ) + txn.Put(ctx, tk(3), sv(3)) // + return nil +}) // @0.000000003,0 +/Table/100/"0000000000000001"/0.000000001,0 @ s1 v1 +/Table/100/"0000000000000001"/0.000000002,0 @ s2 v2 +/Table/100/"0000000000000003"/0.000000003,0 @ s3 v3 diff --git 
a/pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_atomic_locking_replicated_get_and_missing_key b/pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_atomic_locking_replicated_get_and_missing_key new file mode 100644 index 000000000000..264ed15d002d --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_atomic_locking_replicated_get_and_missing_key @@ -0,0 +1,13 @@ +echo +---- +db0.Put(ctx, tk(1), sv(1)) // @0.000000001,0 +db0.Put(ctx, tk(1), sv(2)) // @0.000000002,0 +db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetIsoLevel(isolation.Snapshot) + txn.GetForShareGuaranteedDurability(ctx, tk(1)) // (, ) + txn.Put(ctx, tk(3), sv(3)) // + return nil +}) // @0.000000003,0 +/Table/100/"0000000000000001"/0.000000001,0 @ s1 v1 +/Table/100/"0000000000000001"/0.000000002,0 @ s2 v2 +/Table/100/"0000000000000003"/0.000000003,0 @ s3 v3 diff --git a/pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_atomic_locking_replicated_scan b/pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_atomic_locking_replicated_scan new file mode 100644 index 000000000000..b4e7c88fba57 --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_atomic_locking_replicated_scan @@ -0,0 +1,15 @@ +echo +---- +db0.Put(ctx, tk(1), sv(1)) // @0.000000001,0 +db0.Put(ctx, tk(1), sv(2)) // @0.000000002,0 +db0.Put(ctx, tk(2), sv(3)) // @0.000000001,0 +db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetIsoLevel(isolation.Snapshot) + txn.ScanForShareGuaranteedDurability(ctx, tk(1), tk(3), 0) // (/Table/100/"0000000000000001":v2, /Table/100/"0000000000000002":v3, ) + txn.Put(ctx, tk(3), sv(4)) // + return nil +}) // @0.000000003,0 +/Table/100/"0000000000000001"/0.000000001,0 @ s1 v1 +/Table/100/"0000000000000001"/0.000000002,0 @ s2 v2 +/Table/100/"0000000000000002"/0.000000001,0 @ s3 v3 +/Table/100/"0000000000000003"/0.000000003,0 
@ s4 v4 diff --git a/pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_atomic_locking_replicated_scan_and_missing_key b/pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_atomic_locking_replicated_scan_and_missing_key new file mode 100644 index 000000000000..76c6a4ccee68 --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_atomic_locking_replicated_scan_and_missing_key @@ -0,0 +1,15 @@ +echo +---- +db0.Put(ctx, tk(1), sv(1)) // @0.000000001,0 +db0.Put(ctx, tk(1), sv(2)) // @0.000000002,0 +db0.Put(ctx, tk(2), sv(3)) // @0.000000001,0 +db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetIsoLevel(isolation.Snapshot) + txn.ScanForShareGuaranteedDurability(ctx, tk(1), tk(3), 0) // (/Table/100/"0000000000000001":v2, ) + txn.Put(ctx, tk(3), sv(4)) // + return nil +}) // @0.000000003,0 +/Table/100/"0000000000000001"/0.000000001,0 @ s1 v1 +/Table/100/"0000000000000001"/0.000000002,0 @ s2 v2 +/Table/100/"0000000000000002"/0.000000001,0 @ s3 v3 +/Table/100/"0000000000000003"/0.000000003,0 @ s4 v4 diff --git a/pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_atomic_writes b/pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_atomic_writes new file mode 100644 index 000000000000..3015716ad924 --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_atomic_writes @@ -0,0 +1,10 @@ +echo +---- +db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetIsoLevel(isolation.Snapshot) + txn.Put(ctx, tk(1), sv(1)) // + txn.Put(ctx, tk(2), sv(2)) // + return nil +}) // @0.000000002,0 +/Table/100/"0000000000000001"/0.000000002,0 @ s1 v1 +/Table/100/"0000000000000002"/0.000000002,0 @ s2 v2 diff --git a/pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_non-atomic_deleterange b/pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_non-atomic_deleterange new file mode 100644 
index 000000000000..1facb7067097 --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_non-atomic_deleterange @@ -0,0 +1,15 @@ +echo +---- +db0.Put(ctx, tk(1), sv(1)) // @0.000000001,0 +db0.Put(ctx, tk(2), sv(2)) // @0.000000002,0 +db0.Put(ctx, tk(3), sv(3)) // @0.000000001,0 +db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetIsoLevel(isolation.Snapshot) + txn.DelRange(ctx, tk(1), tk(4), true /* @s4 */) // (/Table/100/"0000000000000001", /Table/100/"0000000000000003", ) + return nil +}) // @0.000000003,0 +/Table/100/"0000000000000001"/0.000000001,0 @ s1 v1 +/Table/100/"0000000000000001"/0.000000003,0 @ s4 v4 +/Table/100/"0000000000000002"/0.000000002,0 @ s2 v2 +/Table/100/"0000000000000003"/0.000000001,0 @ s3 v3 +/Table/100/"0000000000000003"/0.000000003,0 @ s4 v4 diff --git a/pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_non-atomic_locking_replicated_get b/pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_non-atomic_locking_replicated_get new file mode 100644 index 000000000000..5fa88da4b81e --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_non-atomic_locking_replicated_get @@ -0,0 +1,14 @@ +echo +---- +db0.Put(ctx, tk(1), sv(1)) // @0.000000001,0 +db0.Put(ctx, tk(1), sv(2)) // @0.000000002,0 +db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetIsoLevel(isolation.Snapshot) + txn.GetForShareGuaranteedDurability(ctx, tk(1)) // (v1, ) + txn.Put(ctx, tk(3), sv(3)) // + return nil +}) // @0.000000003,0 +/Table/100/"0000000000000001"/0.000000001,0 @ s1 v1 +/Table/100/"0000000000000001"/0.000000002,0 @ s2 v2 +/Table/100/"0000000000000003"/0.000000003,0 @ s3 v3 +committed snapshot txn non-atomic timestamps: [r]/Table/100/"0000000000000001":[0.000000001,0, 0.000000002,0)->v1 [w]/Table/100/"0000000000000003":0.000000003,0->v3@s3 diff --git 
a/pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_non-atomic_locking_replicated_scan b/pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_non-atomic_locking_replicated_scan new file mode 100644 index 000000000000..9cd087be30c4 --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_non-atomic_locking_replicated_scan @@ -0,0 +1,16 @@ +echo +---- +db0.Put(ctx, tk(1), sv(1)) // @0.000000001,0 +db0.Put(ctx, tk(1), sv(2)) // @0.000000002,0 +db0.Put(ctx, tk(2), sv(3)) // @0.000000001,0 +db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetIsoLevel(isolation.Snapshot) + txn.ScanForShareGuaranteedDurability(ctx, tk(1), tk(3), 0) // (/Table/100/"0000000000000001":v1, /Table/100/"0000000000000002":v3, ) + txn.Put(ctx, tk(3), sv(4)) // + return nil +}) // @0.000000003,0 +/Table/100/"0000000000000001"/0.000000001,0 @ s1 v1 +/Table/100/"0000000000000001"/0.000000002,0 @ s2 v2 +/Table/100/"0000000000000002"/0.000000001,0 @ s3 v3 +/Table/100/"0000000000000003"/0.000000003,0 @ s4 v4 +committed snapshot txn non-atomic timestamps: [r]/Table/100/"0000000000000001":[0.000000001,0, 0.000000002,0)->v1 [r]/Table/100/"0000000000000002":[0.000000001,0, )->v3 [w]/Table/100/"0000000000000003":0.000000003,0->v4@s4 diff --git a/pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_non-atomic_locking_unreplicated_get b/pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_non-atomic_locking_unreplicated_get new file mode 100644 index 000000000000..fb3b5b35af4f --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_non-atomic_locking_unreplicated_get @@ -0,0 +1,13 @@ +echo +---- +db0.Put(ctx, tk(1), sv(1)) // @0.000000001,0 +db0.Put(ctx, tk(1), sv(2)) // @0.000000002,0 +db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetIsoLevel(isolation.Snapshot) + txn.GetForShare(ctx, tk(1)) // (v1, ) + txn.Put(ctx, tk(3), sv(3)) 
// + return nil +}) // @0.000000003,0 +/Table/100/"0000000000000001"/0.000000001,0 @ s1 v1 +/Table/100/"0000000000000001"/0.000000002,0 @ s2 v2 +/Table/100/"0000000000000003"/0.000000003,0 @ s3 v3 diff --git a/pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_non-atomic_locking_unreplicated_scan b/pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_non-atomic_locking_unreplicated_scan new file mode 100644 index 000000000000..c963d6516eac --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_non-atomic_locking_unreplicated_scan @@ -0,0 +1,15 @@ +echo +---- +db0.Put(ctx, tk(1), sv(1)) // @0.000000001,0 +db0.Put(ctx, tk(1), sv(2)) // @0.000000002,0 +db0.Put(ctx, tk(2), sv(3)) // @0.000000001,0 +db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetIsoLevel(isolation.Snapshot) + txn.ScanForShare(ctx, tk(1), tk(3), 0) // (/Table/100/"0000000000000001":v1, /Table/100/"0000000000000002":v3, ) + txn.Put(ctx, tk(3), sv(4)) // + return nil +}) // @0.000000003,0 +/Table/100/"0000000000000001"/0.000000001,0 @ s1 v1 +/Table/100/"0000000000000001"/0.000000002,0 @ s2 v2 +/Table/100/"0000000000000002"/0.000000001,0 @ s3 v3 +/Table/100/"0000000000000003"/0.000000003,0 @ s4 v4 diff --git a/pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_non-atomic_non-locking_get b/pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_non-atomic_non-locking_get new file mode 100644 index 000000000000..7d1dadedad06 --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_non-atomic_non-locking_get @@ -0,0 +1,13 @@ +echo +---- +db0.Put(ctx, tk(1), sv(1)) // @0.000000001,0 +db0.Put(ctx, tk(1), sv(2)) // @0.000000002,0 +db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetIsoLevel(isolation.Snapshot) + txn.Get(ctx, tk(1)) // (v1, ) + txn.Put(ctx, tk(3), sv(3)) // + return nil +}) // @0.000000003,0 
+/Table/100/"0000000000000001"/0.000000001,0 @ s1 v1 +/Table/100/"0000000000000001"/0.000000002,0 @ s2 v2 +/Table/100/"0000000000000003"/0.000000003,0 @ s3 v3 diff --git a/pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_non-atomic_non-locking_scan b/pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_non-atomic_non-locking_scan new file mode 100644 index 000000000000..d6515a383526 --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_non-atomic_non-locking_scan @@ -0,0 +1,15 @@ +echo +---- +db0.Put(ctx, tk(1), sv(1)) // @0.000000001,0 +db0.Put(ctx, tk(1), sv(2)) // @0.000000002,0 +db0.Put(ctx, tk(2), sv(3)) // @0.000000001,0 +db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetIsoLevel(isolation.Snapshot) + txn.Scan(ctx, tk(1), tk(3), 0) // (/Table/100/"0000000000000001":v1, /Table/100/"0000000000000002":v3, ) + txn.Put(ctx, tk(3), sv(4)) // + return nil +}) // @0.000000003,0 +/Table/100/"0000000000000001"/0.000000001,0 @ s1 v1 +/Table/100/"0000000000000001"/0.000000002,0 @ s2 v2 +/Table/100/"0000000000000002"/0.000000001,0 @ s3 v3 +/Table/100/"0000000000000003"/0.000000003,0 @ s4 v4 diff --git a/pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_non-atomic_writes b/pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_non-atomic_writes new file mode 100644 index 000000000000..87249bbcf6d0 --- /dev/null +++ b/pkg/kv/kvnemesis/testdata/TestValidate/weak_isolation_transaction_with_non-atomic_writes @@ -0,0 +1,11 @@ +echo +---- +db0.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + txn.SetIsoLevel(isolation.Snapshot) + txn.Put(ctx, tk(1), sv(1)) // + txn.Put(ctx, tk(2), sv(2)) // + return nil +}) // @0.000000002,0 +/Table/100/"0000000000000001"/0.000000001,0 @ s1 v1 +/Table/100/"0000000000000002"/0.000000002,0 @ s2 v2 +committed snapshot txn non-atomic timestamps: [w]/Table/100/"0000000000000001":0.000000001,0->v1@s1 
[w]/Table/100/"0000000000000002":0.000000002,0->v2@s2 diff --git a/pkg/kv/kvnemesis/validator.go b/pkg/kv/kvnemesis/validator.go index af619f2f6104..37fa1ddebf79 100644 --- a/pkg/kv/kvnemesis/validator.go +++ b/pkg/kv/kvnemesis/validator.go @@ -22,7 +22,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvnemesis/kvnemesisutil" kvpb "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" @@ -251,8 +250,9 @@ type validator struct { // Observations for the current atomic unit. This is reset between units, in // checkAtomic, which then calls processOp (which might recurse owing to the // existence of txn closures, batches, etc). - curObservations []observedOp - buffering bufferingType + curObservations []observedOp + observationFilter observationFilter + buffering bufferingType // NB: The Generator carefully ensures that each value written is unique // globally over a run, so there's a 1:1 relationship between a value that was @@ -346,6 +346,18 @@ func (v *validator) tryConsumeRangedWrite( return consumed, len(consumed) > 0 } +// observationFilter describes which observations should be included in the +// validator's observations. +type observationFilter int + +const ( + // observeAll includes all observations. + observeAll observationFilter = iota + // observeLocking includes only observations for operations that acquire locks + // (i.e. writes and locking reads). 
+ observeLocking +) + type bufferingType byte const ( @@ -399,7 +411,23 @@ func (v *validator) processOp(op Operation) { SkipLocked: t.SkipLocked, Value: roachpb.Value{RawBytes: t.Result.Value}, } - v.curObservations = append(v.curObservations, read) + var observe bool + switch v.observationFilter { + case observeAll: + observe = true + case observeLocking: + // NOTE: even if t.ForUpdate || t.ForShare, we only consider the read to + // be locking if it has a guaranteed durability. Furthermore, we only + // consider the read as an observation if it found and returned a value, + // otherwise no lock would have been acquired on the non-existent key. + // Gets do not acquire gap locks. + observe = t.GuaranteedDurability && read.Value.IsPresent() + default: + panic("unexpected") + } + if observe { + v.curObservations = append(v.curObservations, read) + } if v.buffering == bufferingSingle { v.checkAtomic(`get`, t.Result) @@ -469,14 +497,22 @@ func (v *validator) processOp(op Operation) { } v.curObservations = append(v.curObservations, deleteOps...) // The span ought to be empty right after the DeleteRange. - v.curObservations = append(v.curObservations, &observedScan{ - Span: roachpb.Span{ - Key: t.Key, - EndKey: t.EndKey, - }, - IsDeleteRange: true, // just for printing - KVs: nil, - }) + // + // However, we do not add this observation if the observation filter is + // observeLocking because the DeleteRange's read is not locking. This means + // that for isolation levels that permit write skew, the DeleteRange does + // not prevent new keys from being inserted in the deletion span between the + // transaction's read and write timestamps. 
+ if v.observationFilter != observeLocking { + v.curObservations = append(v.curObservations, &observedScan{ + Span: roachpb.Span{ + Key: t.Key, + EndKey: t.EndKey, + }, + IsDeleteRange: true, // just for printing + KVs: nil, + }) + } if v.buffering == bufferingSingle { v.checkAtomic(`deleteRange`, t.Result) @@ -547,12 +583,17 @@ func (v *validator) processOp(op Operation) { // The span ought to be empty right after the DeleteRange, even if parts of // the DeleteRange that didn't materialize due to a shadowing operation. - v.curObservations = append(v.curObservations, &observedScan{ - Span: roachpb.Span{ - Key: t.Key, - EndKey: t.EndKey, - }, - }) + // + // See above for why we do not add this observation if the observation + // filter is observeLocking. + if v.observationFilter != observeLocking { + v.curObservations = append(v.curObservations, &observedScan{ + Span: roachpb.Span{ + Key: t.Key, + EndKey: t.EndKey, + }, + }) + } if v.buffering == bufferingSingle { v.checkAtomic(`deleteRangeUsingTombstone`, t.Result) @@ -656,22 +697,43 @@ func (v *validator) processOp(op Operation) { if _, isErr := v.checkError(op, t.Result); isErr { break } - scan := &observedScan{ - Span: roachpb.Span{ - Key: t.Key, - EndKey: t.EndKey, - }, - Reverse: t.Reverse, - SkipLocked: t.SkipLocked, - KVs: make([]roachpb.KeyValue, len(t.Result.Values)), - } - for i, kv := range t.Result.Values { - scan.KVs[i] = roachpb.KeyValue{ - Key: kv.Key, - Value: roachpb.Value{RawBytes: kv.Value}, + switch v.observationFilter { + case observeAll: + scan := &observedScan{ + Span: roachpb.Span{ + Key: t.Key, + EndKey: t.EndKey, + }, + Reverse: t.Reverse, + SkipLocked: t.SkipLocked, + KVs: make([]roachpb.KeyValue, len(t.Result.Values)), } + for i, kv := range t.Result.Values { + scan.KVs[i] = roachpb.KeyValue{ + Key: kv.Key, + Value: roachpb.Value{RawBytes: kv.Value}, + } + } + v.curObservations = append(v.curObservations, scan) + case observeLocking: + // If we are only observing locking operations then 
we only want to + // consider the scan to be locking if it has a guaranteed durability. + // Furthermore, we only consider the individual keys that were returned to + // be locked, not the entire span that was scanned. Scans do not acquire + // gap locks. + if t.GuaranteedDurability { + for _, kv := range t.Result.Values { + read := &observedRead{ + Key: kv.Key, + SkipLocked: t.SkipLocked, + Value: roachpb.Value{RawBytes: kv.Value}, + } + v.curObservations = append(v.curObservations, read) + } + } + default: + panic("unexpected") } - v.curObservations = append(v.curObservations, scan) if v.buffering == bufferingSingle { atomicScanType := `scan` @@ -714,19 +776,19 @@ func (v *validator) processOp(op Operation) { if t.CommitInBatch != nil { ops = append(ops, t.CommitInBatch.Ops...) } + if t.IsoLevel.ToleratesWriteSkew() { + // If the transaction ran under an isolation level that permits write skew + // then we only validate the atomicity of locking operations (writes and + // locking reads). Non-locking reads may be inconsistent with the commit + // timestamp of the transaction. + v.observationFilter = observeLocking + } v.buffering = bufferingBatchOrTxn for _, op := range ops { v.processOp(op) } - prevFailures := v.failures atomicTxnType := fmt.Sprintf(`%s txn`, t.IsoLevel.StringLower()) v.checkAtomic(atomicTxnType, t.Result) - if t.IsoLevel != isolation.Serializable { - // TODO(nvanbenschoten): for now, we run snapshot and read committed - // transactions in the mix but don't validate their results. Doing so - // is non-trivial. 
See #100169 and #100170 - v.failures = prevFailures - } case *SplitOperation: execTimestampStrictlyOptional = true v.failIfError(op, t.Result) // splits should never return *any* error @@ -809,6 +871,7 @@ func (v *validator) processOp(op Operation) { func (v *validator) checkAtomic(atomicType string, result Result) { observations := v.curObservations v.curObservations = nil + v.observationFilter = observeAll v.buffering = bufferingSingle // Only known-uncommitted results may come without a timestamp. Whenever we diff --git a/pkg/kv/kvnemesis/validator_test.go b/pkg/kv/kvnemesis/validator_test.go index a709af6dfca0..9e22178afaf8 100644 --- a/pkg/kv/kvnemesis/validator_test.go +++ b/pkg/kv/kvnemesis/validator_test.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvnemesis/kvnemesisutil" kvpb "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage" @@ -79,6 +80,10 @@ func withReadResultTS(op Operation, value string, ts int) Operation { return op } +func withScanResult(op Operation, kvs ...KeyValue) Operation { + return withScanResultTS(op, 0, kvs...) 
+} + func withScanResultTS(op Operation, ts int, kvs ...KeyValue) Operation { op = withTimestamp(withResult(op), ts) scan := op.GetValue().(*ScanOperation) @@ -2036,6 +2041,163 @@ func TestValidate(t *testing.T) { }, kvs: kvs(kv(k1, t1, s1), kv(k2, t2, s2)), }, + { + name: "weak isolation transaction with non-atomic writes", + steps: []Step{ + step(withResultTS(closureTxn(ClosureTxnType_Commit, isolation.Snapshot, + withResultOK(put(k1, s1)), + withResultOK(put(k2, s2)), + ), t2)), + }, + kvs: kvs(kv(k1, t1, s1), kv(k2, t2, s2)), + }, + { + name: "weak isolation transaction with atomic writes", + steps: []Step{ + step(withResultTS(closureTxn(ClosureTxnType_Commit, isolation.Snapshot, + withResultOK(put(k1, s1)), + withResultOK(put(k2, s2)), + ), t2)), + }, + kvs: kvs(kv(k1, t2, s1), kv(k2, t2, s2)), // difference: t2 instead of t1 + }, + { + name: "weak isolation transaction with non-atomic non-locking get", + steps: []Step{ + step(withResultTS(put(k1, s1), t1)), + step(withResultTS(put(k1, s2), t2)), + step(withResultTS(closureTxn(ClosureTxnType_Commit, isolation.Snapshot, + withReadResult(get(k1), v1), + withResult(put(k3, s3)), + ), t3)), + }, + kvs: kvs(kv(k1, t1, s1), kv(k1, t2, s2), kv(k3, t3, s3)), + }, + { + name: "weak isolation transaction with non-atomic locking (unreplicated) get", + steps: []Step{ + step(withResultTS(put(k1, s1), t1)), + step(withResultTS(put(k1, s2), t2)), + step(withResultTS(closureTxn(ClosureTxnType_Commit, isolation.Snapshot, + withReadResult(getForShare(k1), v1), + withResult(put(k3, s3)), + ), t3)), + }, + kvs: kvs(kv(k1, t1, s1), kv(k1, t2, s2), kv(k3, t3, s3)), + }, + { + name: "weak isolation transaction with non-atomic locking (replicated) get", + steps: []Step{ + step(withResultTS(put(k1, s1), t1)), + step(withResultTS(put(k1, s2), t2)), + step(withResultTS(closureTxn(ClosureTxnType_Commit, isolation.Snapshot, + withReadResult(getForShareGuaranteedDurability(k1), v1), + withResult(put(k3, s3)), + ), t3)), + }, + kvs: 
kvs(kv(k1, t1, s1), kv(k1, t2, s2), kv(k3, t3, s3)), + }, + { + name: "weak isolation transaction with atomic locking (replicated) get", + steps: []Step{ + step(withResultTS(put(k1, s1), t1)), + step(withResultTS(put(k1, s2), t2)), + step(withResultTS(closureTxn(ClosureTxnType_Commit, isolation.Snapshot, + withReadResult(getForShareGuaranteedDurability(k1), v2), // difference: v2 instead of v1 + withResult(put(k3, s3)), + ), t3)), + }, + kvs: kvs(kv(k1, t1, s1), kv(k1, t2, s2), kv(k3, t3, s3)), + }, + { + name: "weak isolation transaction with atomic locking (replicated) get and missing key", + steps: []Step{ + step(withResultTS(put(k1, s1), t1)), + step(withResultTS(put(k1, s2), t2)), + step(withResultTS(closureTxn(ClosureTxnType_Commit, isolation.Snapshot, + withReadResult(getForShareGuaranteedDurability(k1), ``), // difference: no value instead of v1 + withResult(put(k3, s3)), + ), t3)), + }, + kvs: kvs(kv(k1, t1, s1), kv(k1, t2, s2), kv(k3, t3, s3)), + }, + { + name: "weak isolation transaction with non-atomic non-locking scan", + steps: []Step{ + step(withResultTS(put(k1, s1), t1)), + step(withResultTS(put(k1, s2), t2)), + step(withResultTS(put(k2, s3), t1)), + step(withResultTS(closureTxn(ClosureTxnType_Commit, isolation.Snapshot, + withScanResult(scan(k1, k3), scanKV(k1, v1), scanKV(k2, v3)), + withResult(put(k3, s4)), + ), t3)), + }, + kvs: kvs(kv(k1, t1, s1), kv(k1, t2, s2), kv(k2, t1, s3), kv(k3, t3, s4)), + }, + { + name: "weak isolation transaction with non-atomic locking (unreplicated) scan", + steps: []Step{ + step(withResultTS(put(k1, s1), t1)), + step(withResultTS(put(k1, s2), t2)), + step(withResultTS(put(k2, s3), t1)), + step(withResultTS(closureTxn(ClosureTxnType_Commit, isolation.Snapshot, + withScanResult(scanForShare(k1, k3), scanKV(k1, v1), scanKV(k2, v3)), + withResult(put(k3, s4)), + ), t3)), + }, + kvs: kvs(kv(k1, t1, s1), kv(k1, t2, s2), kv(k2, t1, s3), kv(k3, t3, s4)), + }, + { + name: "weak isolation transaction with non-atomic locking 
(replicated) scan", + steps: []Step{ + step(withResultTS(put(k1, s1), t1)), + step(withResultTS(put(k1, s2), t2)), + step(withResultTS(put(k2, s3), t1)), + step(withResultTS(closureTxn(ClosureTxnType_Commit, isolation.Snapshot, + withScanResult(scanForShareGuaranteedDurability(k1, k3), scanKV(k1, v1), scanKV(k2, v3)), + withResult(put(k3, s4)), + ), t3)), + }, + kvs: kvs(kv(k1, t1, s1), kv(k1, t2, s2), kv(k2, t1, s3), kv(k3, t3, s4)), + }, + { + name: "weak isolation transaction with atomic locking (replicated) scan", + steps: []Step{ + step(withResultTS(put(k1, s1), t1)), + step(withResultTS(put(k1, s2), t2)), + step(withResultTS(put(k2, s3), t1)), + step(withResultTS(closureTxn(ClosureTxnType_Commit, isolation.Snapshot, + withScanResult(scanForShareGuaranteedDurability(k1, k3), scanKV(k1, v2), scanKV(k2, v3)), // difference: v2 instead of v1 + withResult(put(k3, s4)), + ), t3)), + }, + kvs: kvs(kv(k1, t1, s1), kv(k1, t2, s2), kv(k2, t1, s3), kv(k3, t3, s4)), + }, + { + name: "weak isolation transaction with atomic locking (replicated) scan and missing key", + steps: []Step{ + step(withResultTS(put(k1, s1), t1)), + step(withResultTS(put(k1, s2), t2)), + step(withResultTS(put(k2, s3), t1)), + step(withResultTS(closureTxn(ClosureTxnType_Commit, isolation.Snapshot, + withScanResult(scanForShareGuaranteedDurability(k1, k3), scanKV(k1, v2)), // difference: k2 not returned + withResult(put(k3, s4)), + ), t3)), + }, + kvs: kvs(kv(k1, t1, s1), kv(k1, t2, s2), kv(k2, t1, s3), kv(k3, t3, s4)), + }, + { + name: "weak isolation transaction with non-atomic deleterange", + steps: []Step{ + step(withResultTS(put(k1, s1), t1)), + step(withResultTS(put(k2, s2), t2)), + step(withResultTS(put(k3, s3), t1)), + step(withResultTS(closureTxn(ClosureTxnType_Commit, isolation.Snapshot, + withDeleteRangeResult(delRange(k1, k4, s4), noTS, roachpb.Key(k1), roachpb.Key(k3)), + ), t3)), + }, + kvs: kvs(kv(k1, t1, s1), kv(k1, t3, s4), kv(k2, t2, s2), kv(k3, t1, s3), kv(k3, t3, s4)), + }, } w := 
echotest.NewWalker(t, datapathutils.TestDataPath(t, t.Name()))