diff --git a/pkg/kv/kvclient/kvcoord/txn_correctness_test.go b/pkg/kv/kvclient/kvcoord/txn_correctness_test.go
index 2b807d9639b8..0896d0e0e676 100644
--- a/pkg/kv/kvclient/kvcoord/txn_correctness_test.go
+++ b/pkg/kv/kvclient/kvcoord/txn_correctness_test.go
@@ -26,6 +26,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
 	"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
@@ -35,6 +36,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/errors"
+	"github.com/stretchr/testify/require"
 )
 
 type retryError struct {
@@ -317,6 +319,89 @@ func parseHistories(histories []string, t *testing.T) [][]*cmd {
 	return results
 }
 
+// Easily accessible slices of transaction isolation variations.
+var (
+	allLevels               = []isolation.Level{isolation.Serializable, isolation.Snapshot, isolation.ReadCommitted}
+	serializableAndSnapshot = []isolation.Level{isolation.Serializable, isolation.Snapshot}
+	onlySerializable        = []isolation.Level{isolation.Serializable}
+)
+
+// enumerateIsolations returns a slice enumerating all combinations of
+// isolation types across the transactions. The inner slice describes
+// the isolation level for each transaction. The outer slice contains
+// each possible combination of such transaction isolations.
+func enumerateIsolations(numTxns int, isoLevels []isolation.Level) [][]isolation.Level {
+	// Use a count from 0 to pow(# isolations, numTxns)-1 and examine
+	// n-ary digits to get all possible combinations of txn isolations.
+	n := len(isoLevels)
+	var result [][]isolation.Level
+	for i := 0; i < int(math.Pow(float64(n), float64(numTxns))); i++ {
+		desc := make([]isolation.Level, numTxns)
+		val := i
+		for j := 0; j < numTxns; j++ {
+			desc[j] = isoLevels[val%n]
+			val /= n
+		}
+		result = append(result, desc)
+	}
+	return result
+}
+
+func TestEnumerateIsolations(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	SSI := isolation.Serializable
+	SI := isolation.Snapshot
+	RC := isolation.ReadCommitted
+
+	expAll := [][]isolation.Level{
+		{SSI, SSI, SSI},
+		{SI, SSI, SSI},
+		{RC, SSI, SSI},
+		{SSI, SI, SSI},
+		{SI, SI, SSI},
+		{RC, SI, SSI},
+		{SSI, RC, SSI},
+		{SI, RC, SSI},
+		{RC, RC, SSI},
+		{SSI, SSI, SI},
+		{SI, SSI, SI},
+		{RC, SSI, SI},
+		{SSI, SI, SI},
+		{SI, SI, SI},
+		{RC, SI, SI},
+		{SSI, RC, SI},
+		{SI, RC, SI},
+		{RC, RC, SI},
+		{SSI, SSI, RC},
+		{SI, SSI, RC},
+		{RC, SSI, RC},
+		{SSI, SI, RC},
+		{SI, SI, RC},
+		{RC, SI, RC},
+		{SSI, RC, RC},
+		{SI, RC, RC},
+		{RC, RC, RC},
+	}
+	require.Equal(t, expAll, enumerateIsolations(3, allLevels))
+
+	expSSIAndSI := [][]isolation.Level{
+		{SSI, SSI, SSI},
+		{SI, SSI, SSI},
+		{SSI, SI, SSI},
+		{SI, SI, SSI},
+		{SSI, SSI, SI},
+		{SI, SSI, SI},
+		{SSI, SI, SI},
+		{SI, SI, SI},
+	}
+	require.Equal(t, expSSIAndSI, enumerateIsolations(3, serializableAndSnapshot))
+
+	expSSI := [][]isolation.Level{
+		{SSI, SSI, SSI},
+	}
+	require.Equal(t, expSSI, enumerateIsolations(3, onlySerializable))
+}
+
 // enumeratePriorities returns a slice enumerating all combinations of
 // priorities across the transactions. The inner slice describes the
 // priority for each transaction. The outer slice contains each possible
@@ -587,17 +672,20 @@ func newHistoryVerifier(
 	}
 }
 
-func (hv *historyVerifier) run(db *kv.DB, t *testing.T) {
+func (hv *historyVerifier) run(isoLevels []isolation.Level, db *kv.DB, t *testing.T) {
 	log.Infof(context.Background(), "verifying all possible histories for the %q anomaly", hv.name)
+	enumIso := enumerateIsolations(len(hv.txns), isoLevels)
 	enumPri := enumeratePriorities(len(hv.txns), []enginepb.TxnPriority{1, enginepb.MaxTxnPriority})
 	enumHis := enumerateHistories(hv.txns, hv.equal)
-	for _, p := range enumPri {
-		for _, h := range enumHis {
-			hv.retriedTxns = map[int]struct{}{} // always reset the retried txns set
-			if err := hv.runHistoryWithRetry(p, h, db, t); err != nil {
-				t.Errorf("expected success, experienced %s", err)
-				return
+	for _, i := range enumIso {
+		for _, p := range enumPri {
+			for _, h := range enumHis {
+				hv.retriedTxns = map[int]struct{}{} // always reset the retried txns set
+				if err := hv.runHistoryWithRetry(i, p, h, db, t); err != nil {
+					t.Errorf("expected success, experienced %s", err)
+					return
+				}
 			}
 		}
 	}
@@ -611,9 +699,13 @@ func (hv *historyVerifier) run(db *kv.DB, t *testing.T) {
 //
 // This process continues recursively if there are further retries.
 func (hv *historyVerifier) runHistoryWithRetry(
-	priorities []enginepb.TxnPriority, cmds []*cmd, db *kv.DB, t *testing.T,
+	isoLevels []isolation.Level,
+	priorities []enginepb.TxnPriority,
+	cmds []*cmd,
+	db *kv.DB,
+	t *testing.T,
 ) error {
-	if err := hv.runHistory(priorities, cmds, db, t); err != nil {
+	if err := hv.runHistory(isoLevels, priorities, cmds, db, t); err != nil {
 		if log.V(1) {
 			log.Infof(context.Background(), "got an error running history %s: %s", historyString(cmds), err)
 		}
@@ -636,7 +728,7 @@
 			if log.V(1) {
 				log.Infof(context.Background(), "after retry, running alternate history %d of %d", i, len(enumHis))
 			}
-			if err := hv.runHistoryWithRetry(priorities, h, db, t); err != nil {
+			if err := hv.runHistoryWithRetry(isoLevels, priorities, h, db, t); err != nil {
 				return err
 			}
 		}
@@ -645,7 +737,11 @@
 }
 
 func (hv *historyVerifier) runHistory(
-	priorities []enginepb.TxnPriority, cmds []*cmd, db *kv.DB, t *testing.T,
+	isoLevels []isolation.Level,
+	priorities []enginepb.TxnPriority,
+	cmds []*cmd,
+	db *kv.DB,
+	t *testing.T,
 ) error {
 	hv.idx++
 	if t.Failed() {
@@ -661,7 +757,7 @@
 	plannedStr := historyString(cmds)
 
 	if log.V(1) {
-		log.Infof(context.Background(), "pri=%d history=%s", priorities, plannedStr)
+		log.Infof(context.Background(), "iso=%s pri=%d history=%s", isoLevels, priorities, plannedStr)
 	}
 
 	hv.mu.actual = []string{}
@@ -681,7 +777,7 @@
 
 	for i, txnCmds := range txnMap {
 		go func(i int, txnCmds []*cmd) {
-			if err := hv.runTxn(i, priorities[i], txnCmds, db, t); err != nil {
+			if err := hv.runTxn(i, isoLevels[i], priorities[i], txnCmds, db, t); err != nil {
 				if re := (*retryError)(nil); !errors.As(err, &re) {
 					reportErr := errors.Wrapf(err, "(%s): unexpected failure", cmds)
 					select {
@@ -723,12 +819,12 @@
 	err = hv.verify.checkFn(verifyEnv)
 	if err == nil {
 		if log.V(1) {
-			log.Infof(context.Background(), "PASSED: pri=%d, history=%q", priorities, actualStr)
+			log.Infof(context.Background(), "PASSED: iso=%s pri=%d, history=%q", isoLevels, priorities, actualStr)
 		}
 	}
 	if err != nil {
-		t.Errorf("%d: pri=%d, history=%q: actual=%q, verify=%q: %s",
-			hv.idx, priorities, plannedStr, actualStr, verifyStr, err)
+		t.Errorf("%d: iso=%s, pri=%d, history=%q: actual=%q, verify=%q: %s",
+			hv.idx, isoLevels, priorities, plannedStr, actualStr, verifyStr, err)
 	}
 	return err
 }
@@ -756,7 +852,12 @@ func (hv *historyVerifier) runCmds(
 }
 
 func (hv *historyVerifier) runTxn(
-	txnIdx int, priority enginepb.TxnPriority, cmds []*cmd, db *kv.DB, t *testing.T,
+	txnIdx int,
+	isoLevel isolation.Level,
+	priority enginepb.TxnPriority,
+	cmds []*cmd,
+	db *kv.DB,
+	t *testing.T,
 ) error {
 	var retry int
 	txnName := fmt.Sprintf("txn %d", txnIdx+1)
@@ -784,6 +885,9 @@
 		}
 		txn.SetDebugName(txnName)
 
+		if err := txn.SetIsoLevel(isoLevel); err != nil {
+			return err
+		}
 		txn.TestingSetPriority(priority)
 
 		env := map[string]int64{}
@@ -821,7 +925,9 @@ func (hv *historyVerifier) runCmd(
 
 // checkConcurrency creates a history verifier, starts a new database
 // and runs the verifier.
-func checkConcurrency(name string, txns []string, verify *verifier, t *testing.T) {
+func checkConcurrency(
+	name string, isoLevels []isolation.Level, txns []string, verify *verifier, t *testing.T,
+) {
 	verifier := newHistoryVerifier(name, txns, verify, t)
 	s := &localtestcluster.LocalTestCluster{
 		StoreTestingKnobs: &kvserver.StoreTestingKnobs{
@@ -840,7 +946,7 @@
 	}
 	s.Start(t, testutils.NewNodeTestBaseContext(), kvcoord.InitFactoryForLocalTestCluster)
 	defer s.Stop()
-	verifier.run(s.DB, t)
+	verifier.run(isoLevels, s.DB, t)
 }
 
 // The following tests for concurrency anomalies include documentation
@@ -866,8 +972,8 @@ func checkConcurrency(name string, txns []string, verify *verifier, t *testing.T
 // In.m(x) - increment from txn "n" ("m"th retry) of key "x"
 // Cn.m - commit of txn "n" ("m"th retry)
 
-// TestTxnDBReadSkewAnomaly verifies that transactions are not
-// subject to the read skew anomaly, an example of a database
+// TestTxnDBReadSkewAnomaly verifies that neither SI nor SSI isolation
+// is subject to the read skew anomaly, an example of a database
 // constraint violation known as inconsistent analysis (see
 // http://research.microsoft.com/pubs/69541/tr-95-51.pdf). This anomaly
 // is prevented by REPEATABLE_READ.
@@ -896,11 +1002,11 @@ func TestTxnDBReadSkewAnomaly(t *testing.T) {
 			return nil
 		},
 	}
-	checkConcurrency("read skew", []string{txn1, txn2}, verify, t)
+	checkConcurrency("read skew", serializableAndSnapshot, []string{txn1, txn2}, verify, t)
 }
 
-// TestTxnDBLostUpdateAnomaly verifies that transactions are not
-// subject to the lost update anomaly. This anomaly is prevented
+// TestTxnDBLostUpdateAnomaly verifies that neither SI nor SSI isolation
+// is subject to the lost update anomaly. This anomaly is prevented
 // in most cases by using the READ_COMMITTED ANSI isolation level.
 // However, only REPEATABLE_READ fully protects against it.
 //
@@ -930,11 +1036,11 @@ func TestTxnDBLostUpdateAnomaly(t *testing.T) {
 			return nil
 		},
 	}
-	checkConcurrency("lost update", []string{txn, txn}, verify, t)
+	checkConcurrency("lost update", serializableAndSnapshot, []string{txn, txn}, verify, t)
 }
 
-// TestTxnDBLostDeleteAnomaly verifies that transactions are not
-// subject to the lost delete anomaly. See #6240.
+// TestTxnDBLostDeleteAnomaly verifies that neither SI nor SSI
+// isolation is subject to the lost delete anomaly. See #6240.
 //
 // With lost delete, the two deletions from txn2 are interleaved
 // with a read and write from txn1, allowing txn1 to read a pre-
@@ -964,23 +1070,20 @@ func TestTxnDBLostDeleteAnomaly(t *testing.T) {
 			return nil
 		},
 	}
-	checkConcurrency("lost update (delete)", []string{txn1, txn2}, verify, t)
+	checkConcurrency("lost update (delete)", serializableAndSnapshot, []string{txn1, txn2}, verify, t)
 }
 
-// TestTxnDBLostDeleteRangeAnomaly verifies that transactions are not
+// TestTxnDBLostDeleteRangeAnomaly verifies that SSI isolation is not
 // subject to the lost delete range anomaly. See #6240.
 //
 // With lost delete range, the delete range for keys B-C leave no
 // deletion tombstones (as there are an infinite number of keys in the
 // range [B,C)). Without deletion tombstones, the anomaly manifests in
 // snapshot mode when txn1 pushes txn2 to commit at a higher timestamp
-// and then txn1 writes B and commits an an earlier timestamp. The
+// and then txn1 writes B and commits at an earlier timestamp. The
 // delete range request therefore committed but failed to delete the
 // value written to key B.
 //
-// Note that the snapshot isolation level is no longer supported. This
-// test is retained for good measure.
-//
 // Lost delete range would typically fail with a history such as:
 //
 // D2(A) DR2(B-C) R1(A) C2 W1(B,A) C1
@@ -1001,11 +1104,11 @@ func TestTxnDBLostDeleteRangeAnomaly(t *testing.T) {
 			return nil
 		},
 	}
-	checkConcurrency("lost update (range delete)", []string{txn1, txn2}, verify, t)
+	checkConcurrency("lost update (range delete)", onlySerializable, []string{txn1, txn2}, verify, t)
 }
 
-// TestTxnDBPhantomReadAnomaly verifies that transactions are not subject
-// to the phantom reads anomaly. This anomaly is prevented by
+// TestTxnDBPhantomReadAnomaly verifies that neither SI nor SSI isolation
+// is subject to the phantom reads anomaly. This anomaly is prevented by
 // the SQL ANSI SERIALIZABLE isolation level, though it's also prevented
 // by snapshot isolation (i.e. Oracle's traditional "serializable").
 //
@@ -1031,11 +1134,11 @@ func TestTxnDBPhantomReadAnomaly(t *testing.T) {
 			return nil
 		},
 	}
-	checkConcurrency("phantom read", []string{txn1, txn2}, verify, t)
+	checkConcurrency("phantom read", serializableAndSnapshot, []string{txn1, txn2}, verify, t)
 }
 
-// TestTxnDBPhantomDeleteAnomaly verifies that transactions are not
-// subject to the phantom deletion anomaly; this is
+// TestTxnDBPhantomDeleteAnomaly verifies that neither SI nor SSI
+// isolation is subject to the phantom deletion anomaly; this is
 // similar to phantom reads, but verifies the delete range
 // functionality causes read/write conflicts.
 //
@@ -1056,18 +1159,43 @@ func TestTxnDBPhantomDeleteAnomaly(t *testing.T) {
 			return nil
 		},
 	}
-	checkConcurrency("phantom delete", []string{txn1, txn2}, verify, t)
+	checkConcurrency("phantom delete", serializableAndSnapshot, []string{txn1, txn2}, verify, t)
+}
+
+func runWriteSkewTest(t *testing.T, iso isolation.Level) {
+	checks := make(map[isolation.Level]func(map[string]int64) error)
+	checks[isolation.Serializable] = func(env map[string]int64) error {
+		if !((env["A"] == 1 && env["B"] == 2) || (env["A"] == 2 && env["B"] == 1)) {
+			return errors.Errorf("expected either A=1, B=2 -or- A=2, B=1, but have A=%d, B=%d", env["A"], env["B"])
+		}
+		return nil
+	}
+	checks[isolation.Snapshot] = func(env map[string]int64) error {
+		// TODO(nvanbenschoten): re-enable this exception when we permit write skew
+		// under Snapshot isolation.
+		//if env["A"] == 1 && env["B"] == 1 {
+		//	return nil
+		//}
+		return checks[isolation.Serializable](env)
+	}
+
+	txn1 := "SC(A-C) W(A,A+B+1) C"
+	txn2 := "SC(A-C) W(B,A+B+1) C"
+	verify := &verifier{
+		history: "R(A) R(B)",
+		checkFn: checks[iso],
+	}
+	checkConcurrency("write skew", []isolation.Level{iso}, []string{txn1, txn2}, verify, t)
 }
 
-// TestTxnDBWriteSkewAnomaly verifies that transactions are not
-// subject to the write skew anomaly. Write skew is only possible
-// at the weaker, snapshot isolation level, which is no longer
-// supported.
+// TestTxnDBWriteSkewAnomaly verifies that SI, but not SSI, is subject
+// to the write skew anomaly. The write skew anomaly demonstrates that
+// snapshot isolation is not serializable in practice.
 //
 // With write skew, two transactions both read values from A and B
 // respectively, but each writes to either A or B only. Thus there are
 // no write/write conflicts but a cycle of dependencies which result in
-// "skew".
+// "skew". Only serializable isolation prevents this anomaly.
 //
 // Write skew would typically fail with a history such as:
 //
@@ -1083,16 +1211,6 @@ func TestTxnDBPhantomDeleteAnomaly(t *testing.T) {
 func TestTxnDBWriteSkewAnomaly(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
-	txn1 := "SC(A-C) W(A,A+B+1) C"
-	txn2 := "SC(A-C) W(B,A+B+1) C"
-	verify := &verifier{
-		history: "R(A) R(B)",
-		checkFn: func(env map[string]int64) error {
-			if !((env["A"] == 1 && env["B"] == 2) || (env["A"] == 2 && env["B"] == 1)) {
-				return errors.Errorf("expected either A=1, B=2 -or- A=2, B=1, but have A=%d, B=%d", env["A"], env["B"])
-			}
-			return nil
-		},
-	}
-	checkConcurrency("write skew", []string{txn1, txn2}, verify, t)
+	runWriteSkewTest(t, isolation.Serializable)
+	runWriteSkewTest(t, isolation.Snapshot)
 }
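The counting trick in enumerateIsolations generalizes to any slot/choice enumeration: each combination is a numTxns-digit number in base len(isoLevels), read off least-significant digit first, which is why the first transaction's level varies fastest in the expected values of TestEnumerateIsolations. A minimal standalone sketch of the same technique; the names enumerateCombos and choices are illustrative and not part of this patch:

package main

import "fmt"

// enumerateCombos returns every way to assign one of the given choices to
// each of numSlots slots, mirroring the base-n counting loop used by
// enumerateIsolations in the patch above.
func enumerateCombos(numSlots int, choices []string) [][]string {
	n := len(choices)
	total := 1
	for i := 0; i < numSlots; i++ {
		total *= n // integer n^numSlots, equivalent to the math.Pow call
	}
	var result [][]string
	for i := 0; i < total; i++ {
		combo := make([]string, numSlots)
		val := i
		for j := 0; j < numSlots; j++ {
			combo[j] = choices[val%n] // j-th base-n digit of i
			val /= n
		}
		result = append(result, combo)
	}
	return result
}

func main() {
	// Two slots over three choices yields 9 combinations; the first slot
	// varies fastest, matching the ordering asserted in TestEnumerateIsolations.
	for _, c := range enumerateCombos(2, []string{"SSI", "SI", "RC"}) {
		fmt.Println(c)
	}
}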
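Similarly, the serializable check installed by runWriteSkewTest can be exercised outside the cluster. A self-contained toy model, not the CockroachDB API, of the snapshot-isolation interleaving the comments describe, where both transactions read from the same snapshot before writing disjoint keys:

package main

import "fmt"

func main() {
	env := map[string]int64{"A": 0, "B": 0}
	// Under SI, both txns may read from the same snapshot (A=0, B=0).
	snap1 := map[string]int64{"A": env["A"], "B": env["B"]}
	snap2 := map[string]int64{"A": env["A"], "B": env["B"]}
	// txn1: W(A,A+B+1); txn2: W(B,A+B+1). The writes are disjoint, so no
	// write/write conflict blocks either commit.
	env["A"] = snap1["A"] + snap1["B"] + 1 // txn1 commits A=1
	env["B"] = snap2["A"] + snap2["B"] + 1 // txn2 commits B=1
	// Any serial order yields A=1,B=2 or A=2,B=1; SI permits A=1,B=1,
	// which is exactly the outcome checks[isolation.Serializable] rejects.
	ok := (env["A"] == 1 && env["B"] == 2) || (env["A"] == 2 && env["B"] == 1)
	fmt.Printf("A=%d B=%d serializable=%t\n", env["A"], env["B"], ok)
}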