From f9e9bf3fa762e4d628c80404f56d4d244a33b829 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Thu, 20 Apr 2023 18:45:17 -0400 Subject: [PATCH] kv: revive isolation level exploration in kv correctness tests Informs #100131. This commit revives the isolation level exploration in kv correctness tests. These were removed in 39ba88b7. Very little of the code here is new, beyond extending this code for Read Committed. I have confirmed that `TestTxnDBWriteSkewAnomaly` fails if it does not expect Snapshot isolation to permit write skew and I update `IsEndTxnTriggeringRetryError` to ignore skew under Snapshot isolation. Release note: None --- .../kvclient/kvcoord/txn_correctness_test.go | 232 +++++++++++++----- 1 file changed, 175 insertions(+), 57 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/txn_correctness_test.go b/pkg/kv/kvclient/kvcoord/txn_correctness_test.go index 2b807d9639b8..0896d0e0e676 100644 --- a/pkg/kv/kvclient/kvcoord/txn_correctness_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_correctness_test.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils" @@ -35,6 +36,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" ) type retryError struct { @@ -317,6 +319,89 @@ func parseHistories(histories []string, t *testing.T) [][]*cmd { return results } +// Easily accessible slices of transaction isolation variations. +var ( + allLevels = []isolation.Level{isolation.Serializable, isolation.Snapshot, isolation.ReadCommitted} + serializableAndSnapshot = []isolation.Level{isolation.Serializable, isolation.Snapshot} + onlySerializable = []isolation.Level{isolation.Serializable} +) + +// enumerateIsolations returns a slice enumerating all combinations of +// isolation types across the transactions. The inner slice describes +// the isolation level for each transaction. The outer slice contains +// each possible combination of such transaction isolations. +func enumerateIsolations(numTxns int, isoLevels []isolation.Level) [][]isolation.Level { + // Use a count from 0 to pow(# isolations, numTxns)-1 and examine + // n-ary digits to get all possible combinations of txn isolations. + n := len(isoLevels) + var result [][]isolation.Level + for i := 0; i < int(math.Pow(float64(n), float64(numTxns))); i++ { + desc := make([]isolation.Level, numTxns) + val := i + for j := 0; j < numTxns; j++ { + desc[j] = isoLevels[val%n] + val /= n + } + result = append(result, desc) + } + return result +} + +func TestEnumerateIsolations(t *testing.T) { + defer leaktest.AfterTest(t)() + SSI := isolation.Serializable + SI := isolation.Snapshot + RC := isolation.ReadCommitted + + expAll := [][]isolation.Level{ + {SSI, SSI, SSI}, + {SI, SSI, SSI}, + {RC, SSI, SSI}, + {SSI, SI, SSI}, + {SI, SI, SSI}, + {RC, SI, SSI}, + {SSI, RC, SSI}, + {SI, RC, SSI}, + {RC, RC, SSI}, + {SSI, SSI, SI}, + {SI, SSI, SI}, + {RC, SSI, SI}, + {SSI, SI, SI}, + {SI, SI, SI}, + {RC, SI, SI}, + {SSI, RC, SI}, + {SI, RC, SI}, + {RC, RC, SI}, + {SSI, SSI, RC}, + {SI, SSI, RC}, + {RC, SSI, RC}, + {SSI, SI, RC}, + {SI, SI, RC}, + {RC, SI, RC}, + {SSI, RC, RC}, + {SI, RC, RC}, + {RC, RC, RC}, + } + require.Equal(t, expAll, enumerateIsolations(3, allLevels)) + + expSSIAndSI := [][]isolation.Level{ + {SSI, SSI, SSI}, + {SI, SSI, SSI}, + {SSI, SI, SSI}, + {SI, SI, SSI}, + {SSI, SSI, SI}, + {SI, SSI, SI}, + {SSI, SI, SI}, + {SI, SI, SI}, + } + require.Equal(t, expSSIAndSI, enumerateIsolations(3, serializableAndSnapshot)) + + expSSI := [][]isolation.Level{ + {SSI, SSI, SSI}, + } + require.Equal(t, expSSI, enumerateIsolations(3, onlySerializable)) +} + // enumeratePriorities returns a slice enumerating all combinations of // priorities across the transactions. The inner slice describes the // priority for each transaction. The outer slice contains each possible @@ -587,17 +672,20 @@ func newHistoryVerifier( } } -func (hv *historyVerifier) run(db *kv.DB, t *testing.T) { +func (hv *historyVerifier) run(isoLevels []isolation.Level, db *kv.DB, t *testing.T) { log.Infof(context.Background(), "verifying all possible histories for the %q anomaly", hv.name) + enumIso := enumerateIsolations(len(hv.txns), isoLevels) enumPri := enumeratePriorities(len(hv.txns), []enginepb.TxnPriority{1, enginepb.MaxTxnPriority}) enumHis := enumerateHistories(hv.txns, hv.equal) - for _, p := range enumPri { - for _, h := range enumHis { - hv.retriedTxns = map[int]struct{}{} // always reset the retried txns set - if err := hv.runHistoryWithRetry(p, h, db, t); err != nil { - t.Errorf("expected success, experienced %s", err) - return + for _, i := range enumIso { + for _, p := range enumPri { + for _, h := range enumHis { + hv.retriedTxns = map[int]struct{}{} // always reset the retried txns set + if err := hv.runHistoryWithRetry(i, p, h, db, t); err != nil { + t.Errorf("expected success, experienced %s", err) + return + } } } } @@ -611,9 +699,13 @@ func (hv *historyVerifier) run(db *kv.DB, t *testing.T) { // // This process continues recursively if there are further retries. func (hv *historyVerifier) runHistoryWithRetry( - priorities []enginepb.TxnPriority, cmds []*cmd, db *kv.DB, t *testing.T, + isoLevels []isolation.Level, + priorities []enginepb.TxnPriority, + cmds []*cmd, + db *kv.DB, + t *testing.T, ) error { - if err := hv.runHistory(priorities, cmds, db, t); err != nil { + if err := hv.runHistory(isoLevels, priorities, cmds, db, t); err != nil { if log.V(1) { log.Infof(context.Background(), "got an error running history %s: %s", historyString(cmds), err) } @@ -636,7 +728,7 @@ func (hv *historyVerifier) runHistoryWithRetry( if log.V(1) { log.Infof(context.Background(), "after retry, running alternate history %d of %d", i, len(enumHis)) } - if err := hv.runHistoryWithRetry(priorities, h, db, t); err != nil { + if err := hv.runHistoryWithRetry(isoLevels, priorities, h, db, t); err != nil { return err } } @@ -645,7 +737,11 @@ func (hv *historyVerifier) runHistoryWithRetry( } func (hv *historyVerifier) runHistory( - priorities []enginepb.TxnPriority, cmds []*cmd, db *kv.DB, t *testing.T, + isoLevels []isolation.Level, + priorities []enginepb.TxnPriority, + cmds []*cmd, + db *kv.DB, + t *testing.T, ) error { hv.idx++ if t.Failed() { @@ -661,7 +757,7 @@ func (hv *historyVerifier) runHistory( plannedStr := historyString(cmds) if log.V(1) { - log.Infof(context.Background(), "pri=%d history=%s", priorities, plannedStr) + log.Infof(context.Background(), "iso=%s pri=%d history=%s", isoLevels, priorities, plannedStr) } hv.mu.actual = []string{} @@ -681,7 +777,7 @@ func (hv *historyVerifier) runHistory( for i, txnCmds := range txnMap { go func(i int, txnCmds []*cmd) { - if err := hv.runTxn(i, priorities[i], txnCmds, db, t); err != nil { + if err := hv.runTxn(i, isoLevels[i], priorities[i], txnCmds, db, t); err != nil { if re := (*retryError)(nil); !errors.As(err, &re) { reportErr := errors.Wrapf(err, "(%s): unexpected failure", cmds) select { @@ -723,12 +819,12 @@ func (hv *historyVerifier) runHistory( err = hv.verify.checkFn(verifyEnv) if err == nil { if log.V(1) { - log.Infof(context.Background(), "PASSED: pri=%d, history=%q", priorities, actualStr) + log.Infof(context.Background(), "PASSED: iso=%s pri=%d, history=%q", isoLevels, priorities, actualStr) } } if err != nil { - t.Errorf("%d: pri=%d, history=%q: actual=%q, verify=%q: %s", - hv.idx, priorities, plannedStr, actualStr, verifyStr, err) + t.Errorf("%d: iso=%s, pri=%d, history=%q: actual=%q, verify=%q: %s", + hv.idx, isoLevels, priorities, plannedStr, actualStr, verifyStr, err) } return err } @@ -756,7 +852,12 @@ func (hv *historyVerifier) runCmds( } func (hv *historyVerifier) runTxn( - txnIdx int, priority enginepb.TxnPriority, cmds []*cmd, db *kv.DB, t *testing.T, + txnIdx int, + isoLevel isolation.Level, + priority enginepb.TxnPriority, + cmds []*cmd, + db *kv.DB, + t *testing.T, ) error { var retry int txnName := fmt.Sprintf("txn %d", txnIdx+1) @@ -784,6 +885,9 @@ func (hv *historyVerifier) runTxn( } txn.SetDebugName(txnName) + if err := txn.SetIsoLevel(isoLevel); err != nil { + return err + } txn.TestingSetPriority(priority) env := map[string]int64{} @@ -821,7 +925,9 @@ func (hv *historyVerifier) runCmd( // checkConcurrency creates a history verifier, starts a new database // and runs the verifier. -func checkConcurrency(name string, txns []string, verify *verifier, t *testing.T) { +func checkConcurrency( + name string, isoLevels []isolation.Level, txns []string, verify *verifier, t *testing.T, +) { verifier := newHistoryVerifier(name, txns, verify, t) s := &localtestcluster.LocalTestCluster{ StoreTestingKnobs: &kvserver.StoreTestingKnobs{ @@ -840,7 +946,7 @@ func checkConcurrency(name string, txns []string, verify *verifier, t *testing.T } s.Start(t, testutils.NewNodeTestBaseContext(), kvcoord.InitFactoryForLocalTestCluster) defer s.Stop() - verifier.run(s.DB, t) + verifier.run(isoLevels, s.DB, t) } // The following tests for concurrency anomalies include documentation @@ -866,8 +972,8 @@ func checkConcurrency(name string, txns []string, verify *verifier, t *testing.T // In.m(x) - increment from txn "n" ("m"th retry) of key "x" // Cn.m - commit of txn "n" ("m"th retry) -// TestTxnDBReadSkewAnomaly verifies that transactions are not -// subject to the read skew anomaly, an example of a database +// TestTxnDBReadSkewAnomaly verifies that neither SI nor SSI isolation +// are subject to the read skew anomaly, an example of a database // constraint violation known as inconsistent analysis (see // http://research.microsoft.com/pubs/69541/tr-95-51.pdf). This anomaly // is prevented by REPEATABLE_READ. @@ -896,11 +1002,11 @@ func TestTxnDBReadSkewAnomaly(t *testing.T) { return nil }, } - checkConcurrency("read skew", []string{txn1, txn2}, verify, t) + checkConcurrency("read skew", serializableAndSnapshot, []string{txn1, txn2}, verify, t) } -// TestTxnDBLostUpdateAnomaly verifies that transactions are not -// subject to the lost update anomaly. This anomaly is prevented +// TestTxnDBLostUpdateAnomaly verifies that neither SI nor SSI isolation +// are subject to the lost update anomaly. This anomaly is prevented // in most cases by using the READ_COMMITTED ANSI isolation level. // However, only REPEATABLE_READ fully protects against it. // @@ -930,11 +1036,11 @@ func TestTxnDBLostUpdateAnomaly(t *testing.T) { return nil }, } - checkConcurrency("lost update", []string{txn, txn}, verify, t) + checkConcurrency("lost update", serializableAndSnapshot, []string{txn, txn}, verify, t) } -// TestTxnDBLostDeleteAnomaly verifies that transactions are not -// subject to the lost delete anomaly. See #6240. +// TestTxnDBLostDeleteAnomaly verifies that neither SI nor SSI +// isolation are subject to the lost delete anomaly. See #6240. // // With lost delete, the two deletions from txn2 are interleaved // with a read and write from txn1, allowing txn1 to read a pre- @@ -964,23 +1070,20 @@ func TestTxnDBLostDeleteAnomaly(t *testing.T) { return nil }, } - checkConcurrency("lost update (delete)", []string{txn1, txn2}, verify, t) + checkConcurrency("lost update (delete)", serializableAndSnapshot, []string{txn1, txn2}, verify, t) } -// TestTxnDBLostDeleteRangeAnomaly verifies that transactions are not +// TestTxnDBLostDeleteRangeAnomaly verifies that SSI isolation is not // subject to the lost delete range anomaly. See #6240. // // With lost delete range, the delete range for keys B-C leave no // deletion tombstones (as there are an infinite number of keys in the // range [B,C)). Without deletion tombstones, the anomaly manifests in // snapshot mode when txn1 pushes txn2 to commit at a higher timestamp -// and then txn1 writes B and commits an an earlier timestamp. The +// and then txn1 writes B and commits at an earlier timestamp. The // delete range request therefore committed but failed to delete the // value written to key B. // -// Note that the snapshot isolation level is no longer supported. This -// test is retained for good measure. -// // Lost delete range would typically fail with a history such as: // // D2(A) DR2(B-C) R1(A) C2 W1(B,A) C1 @@ -1001,11 +1104,11 @@ func TestTxnDBLostDeleteRangeAnomaly(t *testing.T) { return nil }, } - checkConcurrency("lost update (range delete)", []string{txn1, txn2}, verify, t) + checkConcurrency("lost update (range delete)", onlySerializable, []string{txn1, txn2}, verify, t) } -// TestTxnDBPhantomReadAnomaly verifies that transactions are not subject -// to the phantom reads anomaly. This anomaly is prevented by +// TestTxnDBPhantomReadAnomaly verifies that neither SI nor SSI isolation +// are subject to the phantom reads anomaly. This anomaly is prevented by // the SQL ANSI SERIALIZABLE isolation level, though it's also prevented // by snapshot isolation (i.e. Oracle's traditional "serializable"). // @@ -1031,11 +1134,11 @@ func TestTxnDBPhantomReadAnomaly(t *testing.T) { return nil }, } - checkConcurrency("phantom read", []string{txn1, txn2}, verify, t) + checkConcurrency("phantom read", serializableAndSnapshot, []string{txn1, txn2}, verify, t) } -// TestTxnDBPhantomDeleteAnomaly verifies that transactions are not -// subject to the phantom deletion anomaly; this is +// TestTxnDBPhantomDeleteAnomaly verifies that neither SI nor SSI +// isolation are subject to the phantom deletion anomaly; this is // similar to phantom reads, but verifies the delete range // functionality causes read/write conflicts. // @@ -1056,18 +1159,43 @@ func TestTxnDBPhantomDeleteAnomaly(t *testing.T) { return nil }, } - checkConcurrency("phantom delete", []string{txn1, txn2}, verify, t) + checkConcurrency("phantom delete", serializableAndSnapshot, []string{txn1, txn2}, verify, t) +} + +func runWriteSkewTest(t *testing.T, iso isolation.Level) { + checks := make(map[isolation.Level]func(map[string]int64) error) + checks[isolation.Serializable] = func(env map[string]int64) error { + if !((env["A"] == 1 && env["B"] == 2) || (env["A"] == 2 && env["B"] == 1)) { + return errors.Errorf("expected either A=1, B=2 -or- A=2, B=1, but have A=%d, B=%d", env["A"], env["B"]) + } + return nil + } + checks[isolation.Snapshot] = func(env map[string]int64) error { + // TODO(nvanbenschoten): re-enable this exception when we permit write skew + // under Snapshot isolation. + //if env["A"] == 1 && env["B"] == 1 { + // return nil + //} + return checks[isolation.Serializable](env) + } + + txn1 := "SC(A-C) W(A,A+B+1) C" + txn2 := "SC(A-C) W(B,A+B+1) C" + verify := &verifier{ + history: "R(A) R(B)", + checkFn: checks[iso], + } + checkConcurrency("write skew", []isolation.Level{iso}, []string{txn1, txn2}, verify, t) } -// TestTxnDBWriteSkewAnomaly verifies that transactions are not -// subject to the write skew anomaly. Write skew is only possible -// at the weaker, snapshot isolation level, which is no longer -// supported. +// TestTxnDBWriteSkewAnomaly verifies that SI suffers from the write +// skew anomaly but not SSI. The write skew anomaly is a condition which +// illustrates that snapshot isolation is not serializable in practice. // // With write skew, two transactions both read values from A and B // respectively, but each writes to either A or B only. Thus there are // no write/write conflicts but a cycle of dependencies which result in -// "skew". +// "skew". Only serializable isolation prevents this anomaly. // // Write skew would typically fail with a history such as: // @@ -1083,16 +1211,6 @@ func TestTxnDBPhantomDeleteAnomaly(t *testing.T) { func TestTxnDBWriteSkewAnomaly(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - txn1 := "SC(A-C) W(A,A+B+1) C" - txn2 := "SC(A-C) W(B,A+B+1) C" - verify := &verifier{ - history: "R(A) R(B)", - checkFn: func(env map[string]int64) error { - if !((env["A"] == 1 && env["B"] == 2) || (env["A"] == 2 && env["B"] == 1)) { - return errors.Errorf("expected either A=1, B=2 -or- A=2, B=1, but have A=%d, B=%d", env["A"], env["B"]) - } - return nil - }, - } - checkConcurrency("write skew", []string{txn1, txn2}, verify, t) + runWriteSkewTest(t, isolation.Serializable) + runWriteSkewTest(t, isolation.Snapshot) }