diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go index d6a87f7e9ae0..32c645f7bff7 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go @@ -2184,10 +2184,14 @@ func TestTxnCoordSenderRetries(t *testing.T) { _, err := txn.DelRange(ctx, "a", "b", false /* returnKeys */) return err }, - // No retry, preemptive refresh before commit. - allIsoLevels: &expect{ - expClientRefreshSuccess: true, - expClientAutoRetryAfterRefresh: false, + perIsoLevel: map[isolation.Level]*expect{ + // No retry, preemptive refresh before commit. + isolation.Serializable: { + expClientRefreshSuccess: true, + expClientAutoRetryAfterRefresh: false, + }, + // No refresh, no retry. + isolation.Snapshot: {}, }, }, { @@ -2321,14 +2325,14 @@ func TestTxnCoordSenderRetries(t *testing.T) { b.Put("a", "put") // advance timestamp return txn.CommitInBatch(ctx, b) }, - // Read-only request (Get) prevents server-side refresh. - // TODO(nvanbenschoten): This is written like this to exercise the - // perIsoLevel mechanism. perIsoLevel: map[isolation.Level]*expect{ + // Read-only request (Get) prevents server-side refresh. isolation.Serializable: { expClientRefreshSuccess: true, expClientAutoRetryAfterRefresh: true, }, + // No refresh, no retry. + isolation.Snapshot: {}, }, }, { @@ -2343,10 +2347,14 @@ func TestTxnCoordSenderRetries(t *testing.T) { b.Put("a", "put") // advance timestamp return txn.CommitInBatch(ctx, b) }, - // Read-only request (Scan) prevents server-side refresh. - allIsoLevels: &expect{ - expClientRefreshSuccess: true, - expClientAutoRetryAfterRefresh: true, + perIsoLevel: map[isolation.Level]*expect{ + // Read-only request (Scan) prevents server-side refresh. + isolation.Serializable: { + expClientRefreshSuccess: true, + expClientAutoRetryAfterRefresh: true, + }, + // No refresh, no retry. + isolation.Snapshot: {}, }, }, { @@ -2454,10 +2462,14 @@ func TestTxnCoordSenderRetries(t *testing.T) { } return txn.CommitInBatch(ctx, b) }, - // No retry, preemptive refresh before commit. - allIsoLevels: &expect{ - expClientRefreshSuccess: true, - expClientAutoRetryAfterRefresh: false, + perIsoLevel: map[isolation.Level]*expect{ + // No retry, preemptive refresh before commit. + isolation.Serializable: { + expClientRefreshSuccess: true, + expClientAutoRetryAfterRefresh: false, + }, + // No refresh, no retry. + isolation.Snapshot: {}, }, }, { @@ -3111,11 +3123,16 @@ func TestTxnCoordSenderRetries(t *testing.T) { b := txn.NewBatch() b.CPut("a", "cput", kvclientutils.StrToCPutExistingValue("value")) b.Put("c", "put") - return txn.CommitInBatch(ctx, b) // both puts will succeed, et will retry from get + return txn.CommitInBatch(ctx, b) }, - allIsoLevels: &expect{ - expClientRefreshSuccess: true, - expClientAutoRetryAfterRefresh: true, + perIsoLevel: map[isolation.Level]*expect{ + // Both writes will succeed, EndTxn will retry. + isolation.Serializable: { + expClientRefreshSuccess: true, + expClientAutoRetryAfterRefresh: true, + }, + // No refresh, no retry. + isolation.Snapshot: {}, }, }, { @@ -3131,11 +3148,16 @@ func TestTxnCoordSenderRetries(t *testing.T) { b := txn.NewBatch() b.DelRange("a", "b", false /* returnKeys */) b.CPut("c", "cput", kvclientutils.StrToCPutExistingValue("value")) - return txn.CommitInBatch(ctx, b) // both puts will succeed, et will retry + return txn.CommitInBatch(ctx, b) }, - allIsoLevels: &expect{ - expClientRefreshSuccess: true, - expClientAutoRetryAfterRefresh: true, + perIsoLevel: map[isolation.Level]*expect{ + // Both writes will succeed, EndTxn will retry. + isolation.Serializable: { + expClientRefreshSuccess: true, + expClientAutoRetryAfterRefresh: true, + }, + // No refresh, no retry. + isolation.Snapshot: {}, }, }, { @@ -3634,8 +3656,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - // TODO(nvanbenschoten): test Snapshot isolation. - testLevels := []isolation.Level{isolation.Serializable} + testLevels := []isolation.Level{isolation.Serializable, isolation.Snapshot} for _, iso := range testLevels { t.Run(iso.String(), func(t *testing.T) { run(t, tc, iso) diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go index 2367dec634b3..00aa4b72bf7e 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go @@ -1091,6 +1091,24 @@ func (tc *TxnCoordSender) CommitTimestamp() hlc.Timestamp { tc.mu.Lock() defer tc.mu.Unlock() txn := &tc.mu.txn + if txn.Status == roachpb.COMMITTED { + return txn.ReadTimestamp + } + // If the transaction is not yet committed, configure the CommitTimestampFixed + // flag to ensure that the transaction's commit timestamp is not pushed before + // it commits. + // + // This operates by disabling the transaction refresh mechanism. For isolation + // levels that can tolerate write skew, this is not enough to prevent the + // transaction from committing with a later timestamp. In fact, it's not even + // clear what timestamp to consider the "commit timestamp" for these + // transactions. For this reason, we currently disable the CommitTimestamp + // method for these isolation levels. + // TODO(nvanbenschoten): figure out something better to do here. At least + // return an error. Tracked in #103245. + if txn.IsoLevel.ToleratesWriteSkew() { + panic("unsupported") + } tc.mu.txn.CommitTimestampFixed = true return txn.ReadTimestamp } diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go index 0ded319bb857..68e4a1cd4675 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go @@ -149,7 +149,7 @@ func TestTxnCoordSenderKeyRanges(t *testing.T) { } } -// Test that the theartbeat loop detects aborted transactions and stops. +// Test that the heartbeat loop detects aborted transactions and stops. func TestTxnCoordSenderHeartbeat(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/kv/kvclient/kvcoord/txn_correctness_test.go b/pkg/kv/kvclient/kvcoord/txn_correctness_test.go index e03d767720e4..5a8abb96cf15 100644 --- a/pkg/kv/kvclient/kvcoord/txn_correctness_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_correctness_test.go @@ -1284,11 +1284,9 @@ func runWriteSkewTest(t *testing.T, iso isolation.Level) { return nil } checks[isolation.Snapshot] = func(env map[string]int64) error { - // TODO(nvanbenschoten): re-enable this exception when we permit write skew - // under Snapshot isolation. - //if env["A"] == 1 && env["B"] == 1 { - // return nil - //} + if env["A"] == 1 && env["B"] == 1 { + return nil + } return checks[isolation.Serializable](env) } diff --git a/pkg/kv/kvclient/kvcoord/txn_test.go b/pkg/kv/kvclient/kvcoord/txn_test.go index 3032b67b37aa..083076d55353 100644 --- a/pkg/kv/kvclient/kvcoord/txn_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_test.go @@ -129,7 +129,7 @@ func BenchmarkSingleRoundtripWithLatency(b *testing.B) { } } -// TestLostIncrement verifies that Increment with any isolation level is not +// TestTxnLostIncrement verifies that Increment with any isolation level is not // susceptible to the lost update anomaly between the value that the increment // reads and the value that it writes. In other words, the increment is atomic, // regardless of isolation level. @@ -143,7 +143,7 @@ func BenchmarkSingleRoundtripWithLatency(b *testing.B) { // increment. Demonstrate that doing so allows for increment to applied to a // newer value than that returned by the get, but that the increment is still // atomic. -func TestLostIncrement(t *testing.T) { +func TestTxnLostIncrement(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -214,7 +214,7 @@ func TestLostIncrement(t *testing.T) { } } -// TestLostUpdate verifies that transactions are not susceptible to the +// TestTxnLostUpdate verifies that transactions are not susceptible to the // lost update anomaly, regardless of isolation level. // // The transaction history looks as follows: @@ -224,7 +224,7 @@ func TestLostIncrement(t *testing.T) { // TODO(nvanbenschoten): once we address #100133, update this test to advance // the read snapshot for ReadCommitted transactions between the read and the // write. Demonstrate that doing so allows for a lost update. -func TestLostUpdate(t *testing.T) { +func TestTxnLostUpdate(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -303,6 +303,66 @@ func TestLostUpdate(t *testing.T) { } } +// TestTxnWeakIsolationLevelsTolerateWriteSkew verifies that transactions run +// under weak isolation levels (snapshot and read committed) can tolerate their +// write timestamp being skewed from their read timestamp, while transaction run +// under strong isolation levels (serializable) cannot. +func TestTxnWeakIsolationLevelsTolerateWriteSkew(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + run := func(isoLevel isolation.Level) { + s := createTestDB(t) + defer s.Stop() + ctx := context.Background() + + // Begin the test's transaction. + txn1 := s.DB.NewTxn(ctx, "txn1") + require.NoError(t, txn1.SetIsoLevel(isoLevel)) + + // Read from key "a" in txn1 and then write to key "a" in txn2. This + // establishes an anti-dependency from txn1 to txn2, meaning that txn1's + // read snapshot must be ordered before txn2's commit. In practice, this + // prevents txn1 from refreshing. + { + res, err := txn1.Get(ctx, "a") + require.NoError(t, err) + require.False(t, res.Exists()) + + txn2 := s.DB.NewTxn(ctx, "txn2") + require.NoError(t, txn2.Put(ctx, "a", "value")) + require.NoError(t, txn2.Commit(ctx)) + } + + // Now read from key "b" in a txn3 before writing to key "b" in txn1. This + // establishes an anti-dependency from txn3 to txn1, meaning that txn3's + // read snapshot must be ordered before txn1's commit. In practice, this + // pushes txn1's write timestamp forward through the timestamp cache. + { + txn3 := s.DB.NewTxn(ctx, "txn3") + res, err := txn3.Get(ctx, "b") + require.NoError(t, err) + require.False(t, res.Exists()) + require.NoError(t, txn3.Commit(ctx)) + + require.NoError(t, txn1.Put(ctx, "b", "value")) + } + + // Finally, try to commit. This should succeed for isolation levels that + // allow for write skew. It should fail for isolation levels that do not. + err := txn1.Commit(ctx) + if isoLevel.ToleratesWriteSkew() { + require.NoError(t, err) + } else { + require.Error(t, err) + require.IsType(t, &kvpb.TransactionRetryWithProtoRefreshError{}, err) + } + } + for _, isoLevel := range isolation.Levels() { + t.Run(isoLevel.String(), func(t *testing.T) { run(isoLevel) }) + } +} + // TestPriorityRatchetOnAbortOrPush verifies that the priority of // a transaction is ratcheted by successive aborts or pushes. In // particular, we want to ensure ratcheted priorities when the txn diff --git a/pkg/kv/kvnemesis/validator.go b/pkg/kv/kvnemesis/validator.go index 631753f78570..85230ba15e22 100644 --- a/pkg/kv/kvnemesis/validator.go +++ b/pkg/kv/kvnemesis/validator.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvnemesis/kvnemesisutil" kvpb "github.com/cockroachdb/cockroach/pkg/kv/kvpb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" @@ -710,7 +711,15 @@ func (v *validator) processOp(op Operation) { for _, op := range ops { v.processOp(op) } + prevFailures := v.failures + // TODO(nvanbenschoten): add isolation level to the atomicType string: + // atomicTxnType := fmt.Sprintf(`%s txn`, strings.ToLower(t.IsoLevel.String())) v.checkAtomic(`txn`, t.Result) + if t.IsoLevel == isolation.Snapshot { + // TODO(nvanbenschoten): for now, we run snapshot transactions in the mix + // but don't validate their results. Doing so is non-trivial. See #100169. + v.failures = prevFailures + } case *SplitOperation: execTimestampStrictlyOptional = true v.failIfError(op, t.Result) // splits should never return *any* error diff --git a/pkg/kv/kvserver/batcheval/cmd_end_transaction.go b/pkg/kv/kvserver/batcheval/cmd_end_transaction.go index 9d7d7078f9c6..bbffc398b4da 100644 --- a/pkg/kv/kvserver/batcheval/cmd_end_transaction.go +++ b/pkg/kv/kvserver/batcheval/cmd_end_transaction.go @@ -502,7 +502,7 @@ func IsEndTxnTriggeringRetryError( // update anomalies. return true, kvpb.RETRY_WRITE_TOO_OLD, "" } - if txn.WriteTimestamp != txn.ReadTimestamp { + if !txn.IsoLevel.ToleratesWriteSkew() && txn.WriteTimestamp != txn.ReadTimestamp { // Return a transaction retry error if the commit timestamp isn't equal to // the txn timestamp. return true, kvpb.RETRY_SERIALIZABLE, "" diff --git a/pkg/kv/kvserver/batcheval/cmd_end_transaction_test.go b/pkg/kv/kvserver/batcheval/cmd_end_transaction_test.go index d748a4550a56..d77670954cea 100644 --- a/pkg/kv/kvserver/batcheval/cmd_end_transaction_test.go +++ b/pkg/kv/kvserver/batcheval/cmd_end_transaction_test.go @@ -79,16 +79,16 @@ func TestIsEndTxnTriggeringRetryError(t *testing.T) { {isolation.Serializable, true, true, true, true, kvpb.RETRY_WRITE_TOO_OLD}, {isolation.Snapshot, false, false, false, false, 0}, {isolation.Snapshot, false, false, true, true, kvpb.RETRY_COMMIT_DEADLINE_EXCEEDED}, - {isolation.Snapshot, false, true, false, true, kvpb.RETRY_SERIALIZABLE}, - {isolation.Snapshot, false, true, true, true, kvpb.RETRY_SERIALIZABLE}, + {isolation.Snapshot, false, true, false, false, 0}, + {isolation.Snapshot, false, true, true, true, kvpb.RETRY_COMMIT_DEADLINE_EXCEEDED}, {isolation.Snapshot, true, false, false, true, kvpb.RETRY_WRITE_TOO_OLD}, {isolation.Snapshot, true, false, true, true, kvpb.RETRY_WRITE_TOO_OLD}, {isolation.Snapshot, true, true, false, true, kvpb.RETRY_WRITE_TOO_OLD}, {isolation.Snapshot, true, true, true, true, kvpb.RETRY_WRITE_TOO_OLD}, {isolation.ReadCommitted, false, false, false, false, 0}, {isolation.ReadCommitted, false, false, true, true, kvpb.RETRY_COMMIT_DEADLINE_EXCEEDED}, - {isolation.ReadCommitted, false, true, false, true, kvpb.RETRY_SERIALIZABLE}, - {isolation.ReadCommitted, false, true, true, true, kvpb.RETRY_SERIALIZABLE}, + {isolation.ReadCommitted, false, true, false, false, 0}, + {isolation.ReadCommitted, false, true, true, true, kvpb.RETRY_COMMIT_DEADLINE_EXCEEDED}, {isolation.ReadCommitted, true, false, false, true, kvpb.RETRY_WRITE_TOO_OLD}, {isolation.ReadCommitted, true, false, true, true, kvpb.RETRY_WRITE_TOO_OLD}, {isolation.ReadCommitted, true, true, false, true, kvpb.RETRY_WRITE_TOO_OLD}, diff --git a/pkg/kv/kvserver/replica_write.go b/pkg/kv/kvserver/replica_write.go index ace73130dfa3..c5157b437bd3 100644 --- a/pkg/kv/kvserver/replica_write.go +++ b/pkg/kv/kvserver/replica_write.go @@ -353,9 +353,13 @@ func (r *Replica) canAttempt1PCEvaluation( return false } - if ba.Timestamp != ba.Txn.WriteTimestamp { - log.Fatalf(ctx, "unexpected 1PC execution with diverged timestamp. %s != %s", - ba.Timestamp, ba.Txn.WriteTimestamp) + // isOnePhaseCommit ensured that the transaction has a non-skewed read/write + // timestamp, even for isolation levels that can commit with such skew. Sanity + // check that this timestamp is equal to the batch timestamp. + if ba.Timestamp != ba.Txn.ReadTimestamp || ba.Timestamp != ba.Txn.WriteTimestamp { + log.Fatalf(ctx, "unexpected 1PC execution with diverged read or write timestamps; "+ + "ba.Timestamp: %s, ba.Txn.ReadTimestamp: %s, ba.Txn.WriteTimestamp: %s", + ba.Timestamp, ba.Txn.ReadTimestamp, ba.Txn.WriteTimestamp) } // The EndTxn checks whether the txn record can be created and, if so, at what @@ -399,9 +403,10 @@ func (r *Replica) evaluateWriteBatch( ) (storage.Batch, enginepb.MVCCStats, *kvpb.BatchResponse, result.Result, *kvpb.Error) { log.Event(ctx, "executing read-write batch") - // If the transaction has been pushed but it can commit at the higher + // If the transaction has been pushed but it can be forwarded to the higher // timestamp, let's evaluate the batch at the bumped timestamp. This will - // allow it commit, and also it'll allow us to attempt the 1PC code path. + // allow serializable transactions to commit. It will also allow transactions + // with any isolation level to attempt the 1PC code path. maybeBumpReadTimestampToWriteTimestamp(ctx, ba, g) // Attempt 1PC execution, if applicable. If not transactional or there are @@ -773,16 +778,30 @@ func (r *Replica) newBatchedEngine( // isOnePhaseCommit returns true iff the BatchRequest contains all writes in the // transaction and ends with an EndTxn. One phase commits are disallowed if any // of the following conditions are true: -// (1) the transaction has already been flagged with a write too old error -// (2) the transaction's commit timestamp has been forwarded -// (3) the transaction exceeded its deadline -// (4) the transaction is not in its first epoch and the EndTxn request does -// -// not require one phase commit. +// 1. the transaction's commit timestamp has been forwarded. Note that this +// prevents one phase commit even for isolation levels that can otherwise +// tolerate write skew. +// 2. the transaction is failing a commit condition and must retry. This +// condition is isolation level dependent. +// 3. the transaction is not in its first epoch and the EndTxn request does +// not require one phase commit. func isOnePhaseCommit(ba *kvpb.BatchRequest) bool { if ba.Txn == nil { return false } + if ba.Txn.ReadTimestamp != ba.Txn.WriteTimestamp { + // If the transaction's read and write timestamp are skewed, one phase + // commit is not allowed. This is true even for isolation levels that can + // otherwise tolerate write skew. This is because the one phase commit + // evaluation logic operates using a non-transactional batch which does not + // know how to evaluate with reads and writes at different timestamps. Even + // for write-only batches, the non-transactional path would be unable to + // detect write-write version conflicts between the transaction's read and + // write timestamps. + // + // NOTE: ba.Timestamp == ba.Txn.ReadTimestamp + return false + } if !ba.IsCompleteTransaction() { return false } diff --git a/pkg/sql/internal_test.go b/pkg/sql/internal_test.go index 3912ab4aef4e..f6f1d8502237 100644 --- a/pkg/sql/internal_test.go +++ b/pkg/sql/internal_test.go @@ -528,10 +528,8 @@ func TestInternalExecutorPushDetectionInTxn(t *testing.T) { refreshable bool exp bool }{ - {serializable: false, pushed: false, refreshable: false, exp: false}, - {serializable: false, pushed: false, refreshable: true, exp: false}, - {serializable: false, pushed: true, refreshable: false, exp: false}, - {serializable: false, pushed: true, refreshable: true, exp: false}, + {serializable: false, pushed: false, exp: false}, + {serializable: false, pushed: true, exp: false}, {serializable: true, pushed: false, refreshable: false, exp: false}, {serializable: true, pushed: false, refreshable: true, exp: false}, {serializable: true, pushed: true, refreshable: false, exp: true}, @@ -561,7 +559,7 @@ func TestInternalExecutorPushDetectionInTxn(t *testing.T) { require.NoError(t, txn.Put(ctx, keyA, "x")) require.NotEqual(t, txn.ReadTimestamp(), txn.ProvisionalCommitTimestamp(), "expect txn wts to be pushed") } - if !tt.refreshable { + if tt.serializable && !tt.refreshable { // Fix the txn's timestamp to prevent refreshes. txn.CommitTimestamp() }