diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
index 32c645f7bff7..bdc23647bd64 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
@@ -2087,10 +2087,19 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 			retryable: func(ctx context.Context, txn *kv.Txn) error {
 				return txn.Put(ctx, "a", "put") // put to advance txn ts
 			},
-			// No retry, preemptive refresh before commit.
-			allIsoLevels: &expect{
-				expClientRefreshSuccess:        true,
-				expClientAutoRetryAfterRefresh: false,
+			perIsoLevel: map[isolation.Level]*expect{
+				// No retry, preemptive refresh before commit.
+				isolation.Serializable: {
+					expClientRefreshSuccess:        true,
+					expClientAutoRetryAfterRefresh: false,
+				},
+				// No retry, preemptive refresh before commit.
+				isolation.Snapshot: {
+					expClientRefreshSuccess:        true,
+					expClientAutoRetryAfterRefresh: false,
+				},
+				// No refresh, no retry. New read snapshot established before commit.
+				isolation.ReadCommitted: {},
 			},
 		},
 		{
@@ -2117,10 +2126,19 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 			retryable: func(ctx context.Context, txn *kv.Txn) error {
 				return txn.InitPut(ctx, "a", "put", false /* failOnTombstones */) // put to advance txn ts
 			},
-			// No retry, preemptive (no-op) refresh before commit.
-			allIsoLevels: &expect{
-				expClientRefreshSuccess:        true,
-				expClientAutoRetryAfterRefresh: false,
+			perIsoLevel: map[isolation.Level]*expect{
+				// No retry, preemptive (no-op) refresh before commit.
+				isolation.Serializable: {
+					expClientRefreshSuccess:        true,
+					expClientAutoRetryAfterRefresh: false,
+				},
+				// No retry, preemptive (no-op) refresh before commit.
+				isolation.Snapshot: {
+					expClientRefreshSuccess:        true,
+					expClientAutoRetryAfterRefresh: false,
+				},
+				// No refresh, no retry. New read snapshot established before commit.
+				isolation.ReadCommitted: {},
 			},
 		},
 		{
@@ -2135,10 +2153,19 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 			retryable: func(ctx context.Context, txn *kv.Txn) error {
 				return txn.CPut(ctx, "a", "cput", kvclientutils.StrToCPutExistingValue("put")) // cput to advance txn ts, set update span
 			},
-			// No retry, preemptive (no-op) refresh before commit.
-			allIsoLevels: &expect{
-				expClientRefreshSuccess:        true,
-				expClientAutoRetryAfterRefresh: false,
+			perIsoLevel: map[isolation.Level]*expect{
+				// No retry, preemptive (no-op) refresh before commit.
+				isolation.Serializable: {
+					expClientRefreshSuccess:        true,
+					expClientAutoRetryAfterRefresh: false,
+				},
+				// No retry, preemptive (no-op) refresh before commit.
+				isolation.Snapshot: {
+					expClientRefreshSuccess:        true,
+					expClientAutoRetryAfterRefresh: false,
+				},
+				// No refresh, no retry. New read snapshot established before commit.
+				isolation.ReadCommitted: {},
 			},
 		},
 		{
@@ -2168,10 +2195,19 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 			retryable: func(ctx context.Context, txn *kv.Txn) error {
 				return txn.CPut(ctx, "ab", "cput", nil) // cput advances, sets update span
 			},
-			// No retry, preemptive (no-op) refresh before commit.
-			allIsoLevels: &expect{
-				expClientRefreshSuccess:        true,
-				expClientAutoRetryAfterRefresh: false,
+			perIsoLevel: map[isolation.Level]*expect{
+				// No retry, preemptive (no-op) refresh before commit.
+				isolation.Serializable: {
+					expClientRefreshSuccess:        true,
+					expClientAutoRetryAfterRefresh: false,
+				},
+				// No retry, preemptive (no-op) refresh before commit.
+				isolation.Snapshot: {
+					expClientRefreshSuccess:        true,
+					expClientAutoRetryAfterRefresh: false,
+				},
+				// No refresh, no retry. New read snapshot established before commit.
+				isolation.ReadCommitted: {},
 			},
 		},
 		{
@@ -2191,7 +2227,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 					expClientAutoRetryAfterRefresh: false,
 				},
 				// No refresh, no retry.
-				isolation.Snapshot: {},
+				isolation.Snapshot:      {},
+				isolation.ReadCommitted: {},
 			},
 		},
 		{
@@ -2242,13 +2279,25 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				if err := txn.Put(ctx, "a", "put"); err != nil {
 					return err
 				}
+				if err := txn.Step(ctx); err != nil {
+					return err
+				}
 				_, err := txn.Get(ctx, "a2")
 				return err
 			},
-			// No retry, preemptive refresh before get.
-			allIsoLevels: &expect{
-				expClientRefreshSuccess:        true,
-				expClientAutoRetryAfterRefresh: false,
+			perIsoLevel: map[isolation.Level]*expect{
+				// No retry, preemptive refresh before get.
+				isolation.Serializable: {
+					expClientRefreshSuccess:        true,
+					expClientAutoRetryAfterRefresh: false,
+				},
+				// No retry, preemptive refresh before get.
+				isolation.Snapshot: {
+					expClientRefreshSuccess:        true,
+					expClientAutoRetryAfterRefresh: false,
+				},
+				// No refresh, no retry. New read snapshot established before get.
+				isolation.ReadCommitted: {},
 			},
 		},
 		{
@@ -2262,13 +2311,25 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				if err := txn.Put(ctx, "a", "put"); err != nil {
 					return err
 				}
+				if err := txn.Step(ctx); err != nil {
+					return err
+				}
 				_, err := txn.Scan(ctx, "a2", "a3", 0)
 				return err
 			},
-			// No retry, preemptive refresh before scan.
-			allIsoLevels: &expect{
-				expClientRefreshSuccess:        true,
-				expClientAutoRetryAfterRefresh: false,
+			perIsoLevel: map[isolation.Level]*expect{
+				// No retry, preemptive refresh before scan.
+				isolation.Serializable: {
+					expClientRefreshSuccess:        true,
+					expClientAutoRetryAfterRefresh: false,
+				},
+				// No retry, preemptive refresh before scan.
+				isolation.Snapshot: {
+					expClientRefreshSuccess:        true,
+					expClientAutoRetryAfterRefresh: false,
+				},
+				// No refresh, no retry. New read snapshot established before scan.
+				isolation.ReadCommitted: {},
 			},
 		},
 		{
@@ -2282,14 +2343,26 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				if err := txn.Put(ctx, "a", "put"); err != nil {
 					return err
 				}
+				if err := txn.Step(ctx); err != nil {
+					return err
+				}
 				b := txn.NewBatch()
 				b.Get("a2")
 				return txn.CommitInBatch(ctx, b)
 			},
-			// No retry, preemptive refresh before commit.
-			allIsoLevels: &expect{
-				expClientRefreshSuccess:        true,
-				expClientAutoRetryAfterRefresh: false,
+			perIsoLevel: map[isolation.Level]*expect{
+				// No retry, preemptive refresh before commit.
+				isolation.Serializable: {
+					expClientRefreshSuccess:        true,
+					expClientAutoRetryAfterRefresh: false,
+				},
+				// No retry, preemptive refresh before commit.
+				isolation.Snapshot: {
+					expClientRefreshSuccess:        true,
+					expClientAutoRetryAfterRefresh: false,
+				},
+				// No refresh, no retry. New read snapshot established before commit.
+				isolation.ReadCommitted: {},
 			},
 		},
 		{
@@ -2303,14 +2376,26 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				if err := txn.Put(ctx, "a", "put"); err != nil {
 					return err
 				}
+				if err := txn.Step(ctx); err != nil {
+					return err
+				}
 				b := txn.NewBatch()
 				b.Scan("a2", "a3")
 				return txn.CommitInBatch(ctx, b)
 			},
-			// No retry, preemptive refresh before commit.
-			allIsoLevels: &expect{
-				expClientRefreshSuccess:        true,
-				expClientAutoRetryAfterRefresh: false,
+			perIsoLevel: map[isolation.Level]*expect{
+				// No retry, preemptive refresh before commit.
+				isolation.Serializable: {
+					expClientRefreshSuccess:        true,
+					expClientAutoRetryAfterRefresh: false,
+				},
+				// No retry, preemptive refresh before commit.
+				isolation.Snapshot: {
+					expClientRefreshSuccess:        true,
+					expClientAutoRetryAfterRefresh: false,
+				},
+				// No refresh, no retry. New read snapshot established before commit.
+				isolation.ReadCommitted: {},
 			},
 		},
 		{
@@ -2332,7 +2417,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 					expClientAutoRetryAfterRefresh: true,
 				},
 				// No refresh, no retry.
-				isolation.Snapshot: {},
+				isolation.Snapshot:      {},
+				isolation.ReadCommitted: {},
 			},
 		},
 		{
@@ -2354,7 +2440,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 					expClientAutoRetryAfterRefresh: true,
 				},
 				// No refresh, no retry.
-				isolation.Snapshot: {},
+				isolation.Snapshot:      {},
+				isolation.ReadCommitted: {},
 			},
 		},
 		{
@@ -2407,6 +2494,9 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				if err := txn.Put(ctx, "a", "put"); err != nil {
 					return err
 				}
+				if err := txn.Step(ctx); err != nil {
+					return err
+				}
 				// Make the final batch large enough such that if we accounted
 				// for all of its spans then we would exceed the limit on
 				// refresh spans. This is not an issue because we never need to
@@ -2422,10 +2512,19 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				}
 				return txn.CommitInBatch(ctx, b)
 			},
-			// No retry, preemptive refresh before commit.
-			allIsoLevels: &expect{
-				expClientRefreshSuccess:        true,
-				expClientAutoRetryAfterRefresh: false,
+			perIsoLevel: map[isolation.Level]*expect{
+				// No retry, preemptive refresh before commit.
+				isolation.Serializable: {
+					expClientRefreshSuccess:        true,
+					expClientAutoRetryAfterRefresh: false,
+				},
+				// No retry, preemptive refresh before commit.
+				isolation.Snapshot: {
+					expClientRefreshSuccess:        true,
+					expClientAutoRetryAfterRefresh: false,
+				},
+				// No refresh, no retry. New read snapshot established before commit.
+				isolation.ReadCommitted: {},
 			},
 		},
 		{
@@ -2447,6 +2546,9 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				if _, err := txn.DelRange(ctx, "a", "b", false /* returnKeys */); err != nil {
 					return err
 				}
+				if err := txn.Step(ctx); err != nil {
+					return err
+				}
 				// Make the final batch large enough such that if we accounted
 				// for all of its spans then we would exceed the limit on
 				// refresh spans. This is not an issue because we never need to
@@ -2469,7 +2571,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 					expClientAutoRetryAfterRefresh: false,
 				},
 				// No refresh, no retry.
-				isolation.Snapshot: {},
+				isolation.Snapshot:      {},
+				isolation.ReadCommitted: {},
 			},
 		},
 		{
@@ -2493,9 +2596,22 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				return txn.Put(ctx, "a", "put")
 			},
 			priorReads: true,
-			allIsoLevels: &expect{
-				expClientRefreshSuccess:        true,
-				expClientAutoRetryAfterRefresh: true,
+			perIsoLevel: map[isolation.Level]*expect{
+				// Client-side refresh of prior reads after write-write conflict.
+				isolation.Serializable: {
+					expClientRefreshSuccess:        true,
+					expClientAutoRetryAfterRefresh: true,
+				},
+				// Client-side refresh of prior reads after write-write conflict.
+				isolation.Snapshot: {
+					expClientRefreshSuccess:        true,
+					expClientAutoRetryAfterRefresh: true,
+				},
+				// Server-side refresh after write-write conflict. Prior reads performed
+				// in earlier batches (from earlier read snapshots) are not refreshed.
+				isolation.ReadCommitted: {
+					expServerRefresh: true,
+				},
 			},
 		},
 		{
@@ -2543,20 +2659,35 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				if err := txn.Put(ctx, "a", "put"); err != nil {
 					return err
 				}
+				if err := txn.Step(ctx); err != nil {
+					return err
+				}
 				// Get from "b" to establish a read span. It is important that we
 				// perform a preemptive refresh before this read, otherwise the refresh
 				// would fail.
 				if _, err := txn.Get(ctx, "b"); err != nil {
 					return err
 				}
+				if err := txn.Step(ctx); err != nil {
+					return err
+				}
 				// Now, Put to "b", which would have thrown a write-too-old error had
 				// the transaction not preemptively refreshed before the Get.
 				return txn.Put(ctx, "b", "put")
 			},
-			// No retry, preemptive refresh before Get.
-			allIsoLevels: &expect{
-				expClientRefreshSuccess:        true,
-				expClientAutoRetryAfterRefresh: false,
+			perIsoLevel: map[isolation.Level]*expect{
+				// No retry, preemptive refresh before Get.
+				isolation.Serializable: {
+					expClientRefreshSuccess:        true,
+					expClientAutoRetryAfterRefresh: false,
+				},
+				// No retry, preemptive refresh before Get.
+				isolation.Snapshot: {
+					expClientRefreshSuccess:        true,
+					expClientAutoRetryAfterRefresh: false,
+				},
+				// No refresh, no retry. New read snapshot established before Get.
+				isolation.ReadCommitted: {},
 			},
 		},
 		{
@@ -2566,6 +2697,9 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 			},
 			retryable: func(ctx context.Context, txn *kv.Txn) error {
 				// Get so we must refresh when txn timestamp moves forward.
+				// Note that we don't step the transaction between this read and the
+				// subsequent write, so the write-write conflict causes a client-side
+				// refresh even under Read Committed.
 				if _, err := txn.Get(ctx, "a"); err != nil {
 					return err
 				}
@@ -2578,6 +2712,9 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				if err := txn.Put(ctx, "a", "txn-value1"); err != nil {
 					return err
 				}
+				if err := txn.Step(ctx); err != nil {
+					return err
+				}
 				// Write again to make sure the timestamp of the second intent
 				// is correctly set to the txn's advanced timestamp. There was
 				// previously a bug where the txn's original timestamp would be used
@@ -2650,9 +2787,22 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				return txn.CPut(ctx, "a", "cput", kvclientutils.StrToCPutExistingValue("value"))
 			},
 			priorReads: true,
-			allIsoLevels: &expect{
-				expClientRefreshSuccess:        true,
-				expClientAutoRetryAfterRefresh: true,
+			perIsoLevel: map[isolation.Level]*expect{
+				// Client-side refresh of prior reads after write-write conflict.
+				isolation.Serializable: {
+					expClientRefreshSuccess:        true,
+					expClientAutoRetryAfterRefresh: true,
+				},
+				// Client-side refresh of prior reads after write-write conflict.
+				isolation.Snapshot: {
+					expClientRefreshSuccess:        true,
+					expClientAutoRetryAfterRefresh: true,
+				},
+				// Server-side refresh after write-write conflict. Prior reads performed
+				// in earlier batches (from earlier read snapshots) are not refreshed.
+				isolation.ReadCommitted: {
+					expServerRefresh: true,
+				},
 			},
 		},
 		{
@@ -2700,9 +2850,22 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				return nil
 			},
 			priorReads: true,
-			allIsoLevels: &expect{
-				expClientRefreshSuccess:        true,
-				expClientAutoRetryAfterRefresh: true,
+			perIsoLevel: map[isolation.Level]*expect{
+				// Client-side refresh of prior reads after write-write conflict.
+				isolation.Serializable: {
+					expClientRefreshSuccess:        true,
+					expClientAutoRetryAfterRefresh: true,
+				},
+				// Client-side refresh of prior reads after write-write conflict.
+				isolation.Snapshot: {
+					expClientRefreshSuccess:        true,
+					expClientAutoRetryAfterRefresh: true,
+				},
+				// Server-side refresh after write-write conflict. Prior reads performed
+				// in earlier batches (from earlier read snapshots) are not refreshed.
+				isolation.ReadCommitted: {
+					expServerRefresh: true,
+				},
 			},
 		},
 		{
@@ -2726,10 +2889,22 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				return txn.InitPut(ctx, "iput", "put", false)
 			},
 			priorReads: true,
-			allIsoLevels: &expect{
-				expClientRefreshSuccess:        true,
-				expClientAutoRetryAfterRefresh: true, // fails on first attempt at cput with write too old
-				// Succeeds on second attempt.
+			perIsoLevel: map[isolation.Level]*expect{
+				// Client-side refresh of prior reads after write-write conflict.
+				isolation.Serializable: {
+					expClientRefreshSuccess:        true,
+					expClientAutoRetryAfterRefresh: true,
+				},
+				// Client-side refresh of prior reads after write-write conflict.
+				isolation.Snapshot: {
+					expClientRefreshSuccess:        true,
+					expClientAutoRetryAfterRefresh: true,
+				},
+				// Server-side refresh after write-write conflict. Prior reads performed
+				// in earlier batches (from earlier read snapshots) are not refreshed.
+				isolation.ReadCommitted: {
+					expServerRefresh: true,
+				},
 			},
 		},
 		{
@@ -2759,10 +2934,22 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				return txn.InitPut(ctx, "iput", "put", false)
 			},
 			priorReads: true,
-			// Expect a transaction coord retry, which should succeed.
-			allIsoLevels: &expect{
-				expClientRefreshSuccess:        true,
-				expClientAutoRetryAfterRefresh: true,
+			perIsoLevel: map[isolation.Level]*expect{
+				// Client-side refresh of prior reads after write-write conflict.
+				isolation.Serializable: {
+					expClientRefreshSuccess:        true,
+					expClientAutoRetryAfterRefresh: true,
+				},
+				// Client-side refresh of prior reads after write-write conflict.
+				isolation.Snapshot: {
+					expClientRefreshSuccess:        true,
+					expClientAutoRetryAfterRefresh: true,
+				},
+				// Server-side refresh after write-write conflict. Prior reads performed
+				// in earlier batches (from earlier read snapshots) are not refreshed.
+				isolation.ReadCommitted: {
+					expServerRefresh: true,
+				},
 			},
 		},
 		{
@@ -2857,10 +3044,25 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				return err
 			},
 			priorReads: true,
-			allIsoLevels: &expect{
-				expClientRefreshSuccess:        true,
-				expClientAutoRetryAfterRefresh: true,
-				expOnePhaseCommit:              true,
+			perIsoLevel: map[isolation.Level]*expect{
+				// Client-side refresh of prior reads after write-write conflict.
+				isolation.Serializable: {
+					expClientRefreshSuccess:        true,
+					expClientAutoRetryAfterRefresh: true,
+					expOnePhaseCommit:              true,
+				},
+				// Client-side refresh of prior reads after write-write conflict.
+				isolation.Snapshot: {
+					expClientRefreshSuccess:        true,
+					expClientAutoRetryAfterRefresh: true,
+					expOnePhaseCommit:              true,
+				},
+				// Server-side refresh after write-write conflict. Prior reads performed
+				// in earlier batches (from earlier read snapshots) are not refreshed.
+				isolation.ReadCommitted: {
+					expServerRefresh:  true,
+					expOnePhaseCommit: true,
+				},
 			},
 		},
 		{
@@ -2923,6 +3125,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				if _, err := txn.Get(ctx, "c"); err != nil {
 					return err
 				}
+				// NOTE: don't Step to preserve write-write conflict under Read Committed.
 				_, err := txn.DelRange(ctx, "a", "b", false /* returnKeys */)
 				return err
 			},
@@ -2944,6 +3147,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				if _, err := txn.Get(ctx, "a"); err != nil {
 					return err
 				}
+				// NOTE: don't Step to preserve write-write conflict under Read Committed.
 				_, err := txn.DelRange(ctx, "a", "b", false /* returnKeys */)
 				return err
 			},
@@ -2989,6 +3193,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				if err := txn.Put(ctx, "another", "another put"); err != nil {
 					return err
 				}
+				// NOTE: don't Step to preserve write-write conflict under Read Committed.
 				b := txn.NewBatch()
 				b.Put("a", "final value")
 				return txn.CommitInBatch(ctx, b)
@@ -3132,7 +3337,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 					expClientAutoRetryAfterRefresh: true,
 				},
 				// No refresh, no retry.
-				isolation.Snapshot: {},
+				isolation.Snapshot:      {},
+				isolation.ReadCommitted: {},
 			},
 		},
 		{
@@ -3157,7 +3363,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 					expClientAutoRetryAfterRefresh: true,
 				},
 				// No refresh, no retry.
-				isolation.Snapshot: {},
+				isolation.Snapshot:      {},
+				isolation.ReadCommitted: {},
 			},
 		},
 		{
@@ -3216,13 +3423,36 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				return txn.CommitInBatch(ctx, b)
 			},
 			priorReads: true,
-			// The Put to "a" will fail, failing the parallel commit with an error and
-			// forcing a client-side refresh and auto-retry of the full batch.
-			allIsoLevels: &expect{
-				expServerRefresh:               false,
-				expClientRefreshSuccess:        true,
-				expClientAutoRetryAfterRefresh: true,
-				expParallelCommitAutoRetry:     false,
+			perIsoLevel: map[isolation.Level]*expect{
+				// The Put to "a" will fail, failing the parallel commit with an error and
+				// forcing a client-side refresh and auto-retry of the full batch.
+				isolation.Serializable: {
+					expServerRefresh:               false,
+					expClientRefreshSuccess:        true,
+					expClientAutoRetryAfterRefresh: true,
+					expParallelCommitAutoRetry:     false,
+				},
+				// The Put to "a" will fail, failing the parallel commit with an error and
+				// forcing a client-side refresh and auto-retry of the full batch.
+				isolation.Snapshot: {
+					expServerRefresh:               false,
+					expClientRefreshSuccess:        true,
+					expClientAutoRetryAfterRefresh: true,
+					expParallelCommitAutoRetry:     false,
+				},
+				// The Put to "a" and EndTxn will succeed after a server-side refresh.
+				// This will instruct the txn to stage at the post-refresh timestamp,
+				// qualifying for the implicit commit condition and avoiding a client-side
+				// refresh.
+				//
+				// Prior reads performed in earlier batches (from earlier read snapshots)
+				// are not refreshed.
+				isolation.ReadCommitted: {
+					expServerRefresh:               true,
+					expClientRefreshSuccess:        false,
+					expClientAutoRetryAfterRefresh: false,
+					expParallelCommitAutoRetry:     false,
+				},
 			},
 		},
 		{
@@ -3237,13 +3467,36 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				return txn.CommitInBatch(ctx, b)
 			},
 			priorReads: true,
-			// The Put to "c" will fail, failing the parallel commit with an error and
-			// forcing a client-side refresh and auto-retry of the full batch.
-			allIsoLevels: &expect{
-				expServerRefresh:               false,
-				expClientRefreshSuccess:        true,
-				expClientAutoRetryAfterRefresh: true,
-				expParallelCommitAutoRetry:     false,
+			perIsoLevel: map[isolation.Level]*expect{
+				// The Put to "c" will fail, failing the parallel commit with an error and
+				// forcing a client-side refresh and auto-retry of the full batch.
+				isolation.Serializable: {
+					expServerRefresh:               false,
+					expClientRefreshSuccess:        true,
+					expClientAutoRetryAfterRefresh: true,
+					expParallelCommitAutoRetry:     false,
+				},
+				// The Put to "c" will fail, failing the parallel commit with an error and
+				// forcing a client-side refresh and auto-retry of the full batch.
+				isolation.Snapshot: {
+					expServerRefresh:               false,
+					expClientRefreshSuccess:        true,
+					expClientAutoRetryAfterRefresh: true,
+					expParallelCommitAutoRetry:     false,
+				},
+				// The Put to "c" will succeed after a server-side refresh. However, the
+				// txn has already staged on the other range at the pre-refresh timestamp.
+				// As a result, it does not qualify for the implicit commit condition and
+				// requires a parallel commit auto-retry.
+				//
+				// Prior reads performed in earlier batches (from earlier read snapshots)
+				// are not refreshed.
+				isolation.ReadCommitted: {
+					expServerRefresh:               true,
+					expClientRefreshSuccess:        false,
+					expClientAutoRetryAfterRefresh: false,
+					expParallelCommitAutoRetry:     true,
+				},
 			},
 		},
 		{
@@ -3319,6 +3572,9 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				} else if !bytes.Equal(b, []byte("newval")) {
 					return fmt.Errorf("expected \"newval\", got: %v", b)
 				}
+				if err := txn.Step(ctx); err != nil {
+					return err
+				}
 				return txn.Commit(ctx)
 			},
 			allIsoLevels: &expect{
@@ -3413,6 +3669,9 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				if err := txn.Put(ctx, "a", "put"); err != nil {
 					return err
 				}
+				if err := txn.Step(ctx); err != nil {
+					return err
+				}
 				b := txn.NewBatch()
 				b.CPut("c", "cput", kvclientutils.StrToCPutExistingValue("value"))
 				return txn.CommitInBatch(ctx, b)
@@ -3614,6 +3873,12 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 			return err
 		}
 
+		// Configure stepping to make the Read Committed tests more interesting.
+		// If we didn't do this, the transaction would never have its timestamp
+		// pushed, because it would always capture a new read snapshot after the
+		// afterTxnStart functions run.
+		txn.ConfigureStepping(ctx, kv.SteppingEnabled)
+
 		if tc.tsLeaked {
 			if iso != isolation.Serializable {
 				skip.IgnoreLint(t, "fixed commit timestamp unsupported")
@@ -3626,6 +3891,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 		if tc.priorReads {
 			_, err := txn.Get(ctx, "prior read")
 			require.NoError(t, err, "prior read")
+			require.NoError(t, txn.Step(ctx))
 		}
 
 		if tc.afterTxnStart != nil {
@@ -3633,7 +3899,16 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 			require.NoError(t, err, "afterTxnStart")
 		}
 
-		return tc.retryable(ctx, txn)
+		if err := tc.retryable(ctx, txn); err != nil {
+			return err
+		}
+
+		// If the transaction is still open, step one more time before the commit.
+		if txn.IsOpen() {
+			require.NoError(t, txn.Step(ctx))
+		}
+
+		return nil
 	})
 
 	// Verify success or failure.
@@ -3656,10 +3931,9 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 	}
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
-			testLevels := []isolation.Level{isolation.Serializable, isolation.Snapshot}
-			for _, iso := range testLevels {
-				t.Run(iso.String(), func(t *testing.T) {
-					run(t, tc, iso)
+			for _, isoLevel := range isolation.Levels() {
+				t.Run(isoLevel.String(), func(t *testing.T) {
+					run(t, tc, isoLevel)
 				})
 			}
 		})
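The test changes above combine two ingredients: `perIsoLevel` expectation maps and explicit statement boundaries via `txn.Step`. A minimal sketch of the client-side pattern the new "write, Step, read" cases exercise, assuming the package paths used elsewhere in this patch (`db` is an illustrative `*kv.DB` handle, error handling condensed):

```go
import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
)

// putThenReadAcrossStep is a hypothetical helper mirroring the updated test
// cases: a Read Committed transaction that opts in to manual stepping so each
// Step establishes a new read snapshot.
func putThenReadAcrossStep(ctx context.Context, db *kv.DB) error {
	return db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
		if err := txn.SetIsoLevel(isolation.ReadCommitted); err != nil {
			return err
		}
		// Opt in to manual statement boundaries, as run() now does for every case.
		txn.ConfigureStepping(ctx, kv.SteppingEnabled)

		if err := txn.Put(ctx, "a", "put"); err != nil {
			return err
		}
		// Capture a new read snapshot. Under Read Committed this is why the
		// test now expects no refresh: the Get below reads from a snapshot
		// taken after the Put advanced the transaction's timestamp.
		if err := txn.Step(ctx); err != nil {
			return err
		}
		_, err := txn.Get(ctx, "a2")
		return err
	})
}
```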
diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go
index c711a0315a9c..bb654ca077ac 100644
--- a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go
+++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go
@@ -521,6 +521,10 @@ func (tc *TxnCoordSender) Send(
 		return nil, nil
 	}
 
+	if tc.mu.txn.IsoLevel.PerStatementReadSnapshot() {
+		tc.maybeAutoStepReadTimestampLocked()
+	}
+
 	// Clone the Txn's Proto so that future modifications can be made without
 	// worrying about synchronization.
 	ba.Txn = tc.mu.txn.Clone()
@@ -1353,8 +1357,16 @@ func (tc *TxnCoordSender) PrepareRetryableError(
 }
 
 // Step is part of the TxnSender interface.
 func (tc *TxnCoordSender) Step(ctx context.Context) error {
+	// TODO(nvanbenschoten): it should be possible to make this assertion, but
+	// the API is currently misused by the connExecutor. See #86162.
+	//if tc.typ != kv.RootTxn {
+	//	return errors.AssertionFailedf("cannot step in non-root txn")
+	//}
 	tc.mu.Lock()
 	defer tc.mu.Unlock()
+	if tc.mu.txn.IsoLevel.PerStatementReadSnapshot() {
+		tc.manualStepReadTimestampLocked()
+	}
 	return tc.interceptorAlloc.txnSeqNumAllocator.manualStepReadSeqLocked(ctx)
 }
 
@@ -1391,6 +1403,83 @@ func (tc *TxnCoordSender) GetSteppingMode(ctx context.Context) (curMode kv.SteppingMode) {
 	return tc.interceptorAlloc.txnSeqNumAllocator.steppingMode
 }
 
+// manualStepReadTimestampLocked advances the transaction's read timestamp to a
+// timestamp taken from the local clock and resets refresh span tracking.
+//
+//gcassert:inline
+func (tc *TxnCoordSender) manualStepReadTimestampLocked() {
+	tc.stepReadTimestampLocked()
+}
+
+// maybeAutoStepReadTimestampLocked advances the transaction's read timestamp to
+// a timestamp taken from the local clock and resets refresh span tracking, if
+// manual stepping is disabled and the transaction is expected to automatically
+// capture a new read snapshot on each batch.
+//
+//gcassert:inline
+func (tc *TxnCoordSender) maybeAutoStepReadTimestampLocked() {
+	if tc.typ != kv.RootTxn {
+		return // only root transactions auto-step
+	}
+	if tc.interceptorAlloc.txnSeqNumAllocator.steppingMode == kv.SteppingEnabled {
+		return // only manual stepping allowed
+	}
+	tc.stepReadTimestampLocked()
+}
+
+// stepReadTimestampLocked advances the transaction's read timestamp to a
+// timestamp taken from the local clock and resets refresh span tracking.
+//
+// Doing so establishes a new external "read snapshot" from which all future
+// reads in the transaction will operate. This read snapshot will be at least as
+// recent as the previous read snapshot, and will typically be more recent (i.e.
+// it will never regress). Consistency with prior reads in the transaction is
+// not maintained, so reads of previously read keys may not be "repeatable" and
+// may observe "phantoms". On the other hand, by not maintaining consistency
+// between read snapshots, isolation-related retries (write-write conflicts) and
+// consistency-related retries (uncertainty errors) have a higher chance of
+// being refreshed away (client-side or server-side) without need for client
+// intervention (i.e. without requiring a statement-level retry).
+//
+// Note that the transaction's uncertainty interval is not reset by this method
+// to now()+max_offset, even though doing so would be necessary to strictly
+// guarantee real-time ordering between the commit of a writer transaction and
+// the subsequent establishment of a new read snapshot in a reader transaction.
+// By not resetting the uncertainty interval, we allow for the possibility that
+// a reader transaction may establish a new read snapshot after the writer has
+// committed (on a node with a fast clock) and yet not observe that writer's
+// writes.
+//
+// This decision not to reset the transaction's uncertainty interval is
+// discussed in the Read Committed RFC (section "Read Uncertainty Intervals"):
+//
+// > Read Committed transactions have the option to provide the same "no stale
+// > reads" guarantee at the level of each individual statement. Doing so would
+// > require transactions to reset their `GlobalUncertaintyLimit` and
+// > `ObservedTimestamps` on each statement boundary, setting their
+// > `GlobalUncertaintyLimit` to `hlc.Now() + hlc.MaxOffset()` and clearing all
+// > `ObservedTimestamps`.
+// >
+// > We propose that Read Committed transactions do not do this. The cost of
+// > resetting a transaction's uncertainty interval on each statement boundary is
+// > likely greater than the benefit. Doing so increases the chance that
+// > individual statements retry due to `ReadWithinUncertaintyInterval` errors. In
+// > the worst case, each statement will need to traverse (through retries) an
+// > entire uncertainty interval before converging to a "certain" read snapshot.
+// > While these retries will be scoped to a single statement and should not
+// > escape to the client, they do still have a latency cost.
+// >
+// > We make this decision because we do not expect that applications rely on
+// > strong consistency guarantees between the commit of one transaction and the
+// > start of an individual statement within another in-progress transaction. To
+// > rely on such guarantees would require complex and surprising application-side
+// > synchronization.
+func (tc *TxnCoordSender) stepReadTimestampLocked() {
+	now := tc.clock.Now()
+	tc.mu.txn.BumpReadTimestamp(now)
+	tc.interceptorAlloc.txnSpanRefresher.resetRefreshSpansLocked()
+}
+
 // DeferCommitWait is part of the TxnSender interface.
 func (tc *TxnCoordSender) DeferCommitWait(ctx context.Context) func(context.Context) error {
 	tc.mu.Lock()
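The gate in `maybeAutoStepReadTimestampLocked` composes three conditions before stepping on a batch send. A condensed, standalone restatement of that policy (the type and field names here are illustrative, not the real `TxnCoordSender` internals):

```go
// steppingPolicy captures the two pieces of transaction state consulted by
// maybeAutoStepReadTimestampLocked.
type steppingPolicy struct {
	isRootTxn      bool // leaf transactions never step themselves
	manualStepping bool // kv.SteppingEnabled: only explicit Step calls advance
}

// shouldAutoStep reports whether a batch send should capture a fresh read
// snapshot on its own: the isolation level must want per-statement snapshots
// (Read Committed), the txn must be a root txn, and manual stepping must be
// disabled.
func (p steppingPolicy) shouldAutoStep(perStatementReadSnapshot bool) bool {
	return perStatementReadSnapshot && p.isRootTxn && !p.manualStepping
}
```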
diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go
index 2d655c9c8147..64e19117c1b3 100644
--- a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go
+++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go
@@ -629,6 +629,16 @@ func (sr *txnSpanRefresher) appendRefreshSpans(
 	})
 }
 
+// resetRefreshSpansLocked clears the txnSpanRefresher's refresh span set and
+// marks the empty set as valid. This is used when a transaction is establishing
+// a new read snapshot and no longer needs to maintain consistency with previous
+// reads.
+func (sr *txnSpanRefresher) resetRefreshSpansLocked() {
+	sr.refreshFootprint.clear()
+	sr.refreshInvalid = false
+	sr.refreshedTimestamp.Reset()
+}
+
 // canForwardReadTimestampWithoutRefresh returns whether the transaction can
 // forward its read timestamp after refreshing all the reads that has performed
 // to this point. This requires that the transaction's timestamp has not leaked.
@@ -758,13 +768,14 @@ func (sr *txnSpanRefresher) importLeafFinalState(
 
 // epochBumpedLocked implements the txnInterceptor interface.
 func (sr *txnSpanRefresher) epochBumpedLocked() {
-	sr.refreshFootprint.clear()
-	sr.refreshInvalid = false
-	sr.refreshedTimestamp.Reset()
+	sr.resetRefreshSpansLocked()
 }
 
 // createSavepointLocked is part of the txnInterceptor interface.
 func (sr *txnSpanRefresher) createSavepointLocked(ctx context.Context, s *savepoint) {
+	// TODO(nvanbenschoten): make sure this works correctly with ReadCommitted.
+	// The refresh spans should either be empty when captured into a savepoint or
+	// should be cleared when the savepoint is rolled back to.
 	s.refreshSpans = make([]roachpb.Span, len(sr.refreshFootprint.asSlice()))
 	copy(s.refreshSpans, sr.refreshFootprint.asSlice())
 	s.refreshInvalid = sr.refreshInvalid
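`resetRefreshSpansLocked` gives the coordinator a way to drop its read-refresh obligations at a snapshot boundary. A toy model of the bookkeeping involved, under the assumption that a plain slice stands in for the real condensable span set:

```go
import (
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// refreshState is a simplified stand-in for the txnSpanRefresher fields that
// resetRefreshSpansLocked clears.
type refreshState struct {
	spans       []roachpb.Span // reads that a refresh must prove unchanged
	invalid     bool           // set once the span set overflowed and was dropped
	refreshedTo hlc.Timestamp  // timestamp the spans have been refreshed to
}

// reset forgets all prior reads. After a new read snapshot is established,
// consistency with those reads is intentionally not maintained, so there is
// nothing left to refresh and the empty set is again valid.
func (rs *refreshState) reset() {
	rs.spans = nil
	rs.invalid = false
	rs.refreshedTo = hlc.Timestamp{}
}
```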
diff --git a/pkg/kv/kvclient/kvcoord/txn_test.go b/pkg/kv/kvclient/kvcoord/txn_test.go
index 2190ca84b7b4..7724b1f5d307 100644
--- a/pkg/kv/kvclient/kvcoord/txn_test.go
+++ b/pkg/kv/kvclient/kvcoord/txn_test.go
@@ -134,15 +134,13 @@ func BenchmarkSingleRoundtripWithLatency(b *testing.B) {
 // reads and the value that it writes. In other words, the increment is atomic,
 // regardless of isolation level.
 //
-// The transaction history looks as follows:
+// The transaction history looks as follows for Snapshot and Serializable:
 //
-//	R1(A) W2(A,+1) W1(A,+1) [write-write restart] R1(A) W1(A,+1) C1
+//	R1(A) W2(A,+1) C2 W1(A,+1) [write-write restart] R1(A) W1(A,+1) C1
 //
-// TODO(nvanbenschoten): once we address #100133, update this test to advance
-// the read snapshot for ReadCommitted transactions between the read and the
-// increment. Demonstrate that doing so allows for increment to applied to a
-// newer value than that returned by the get, but that the increment is still
-// atomic.
+// The transaction history looks as follows for Read Committed:
+//
+//	R1(A) W2(A,+1) C2 W1(A,+1) C1
 func TestTxnLostIncrement(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
@@ -190,14 +188,19 @@ func TestTxnLostIncrement(t *testing.T) {
 		ir := b.Results[0].Rows[0]
 
 		// During the first attempt, this should encounter a write-write conflict
-		// and force a transaction retry.
-		if epoch == 0 {
+		// and force a transaction retry for Snapshot and Serializable isolation
+		// transactions. For ReadCommitted transactions which allow each batch to
+		// operate using a different read snapshot, the increment will be applied
+		// to a newer version of the key than that returned by the get, but the
+		// increment itself will still be atomic.
+		if epoch == 0 && isoLevel != isolation.ReadCommitted {
 			require.Error(t, err)
 			require.Regexp(t, "TransactionRetryWithProtoRefreshError: .*WriteTooOldError", err)
 			return err
 		}
 
-		// During the second attempt, this should succeed.
+		// During the second attempt (or first for Read Committed), this should
+		// succeed.
 		require.NoError(t, err)
 		require.Equal(t, int64(2), ir.ValueInt())
 		return nil
@@ -215,15 +218,17 @@ func TestTxnLostIncrement(t *testing.T) {
 }
 
 // TestTxnLostUpdate verifies that transactions are not susceptible to the
-// lost update anomaly, regardless of isolation level.
+// lost update anomaly if they run at Snapshot isolation or stronger, but
+// are susceptible to the lost update anomaly if they run at Read Committed
+// isolation.
+//
+// The transaction history looks as follows for Snapshot and Serializable:
 //
-// The transaction history looks as follows:
+//	R1(A) W2(A,"hi") C2 W1(A,"oops!") [write-write restart] R1(A) W1(A,"correct") C1
 //
-//	R1(A) W2(A,"hi") W1(A,"oops!") C1 [write-write restart] R1(A) W1(A,"correct") C1
+// The transaction history looks as follows for Read Committed:
 //
-// TODO(nvanbenschoten): once we address #100133, update this test to advance
-// the read snapshot for ReadCommitted transactions between the read and the
-// write. Demonstrate that doing so allows for a lost update.
+//	R1(A) W2(A,"hi") C2 W1(A,"oops!") C1
 func TestTxnLostUpdate(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
@@ -274,24 +279,33 @@ func TestTxnLostUpdate(t *testing.T) {
 		}
 
 		// During the first attempt, this should encounter a write-write conflict
-		// and force a transaction retry.
-		if epoch == 0 {
+		// and force a transaction retry for Snapshot and Serializable isolation
+		// transactions. For ReadCommitted transactions which allow each batch to
+		// operate using a different read snapshot, the write will succeed.
+		if epoch == 0 && isoLevel != isolation.ReadCommitted {
 			require.Error(t, err)
 			require.Regexp(t, "TransactionRetryWithProtoRefreshError: .*WriteTooOldError", err)
 			return err
 		}
 
-		// During the second attempt, this should succeed.
+		// During the second attempt (or first for Read Committed), this should
+		// succeed.
 		require.NoError(t, err)
 		return nil
 	})
 	require.NoError(t, err)
 
 	// Verify final value.
+	var expVal string
+	if isoLevel != isolation.ReadCommitted {
+		expVal = "correct"
+	} else {
+		expVal = "oops!"
+	}
 	gr, err := s.DB.Get(ctx, key)
 	require.NoError(t, err)
 	require.True(t, gr.Exists())
-	require.Equal(t, []byte("correct"), gr.ValueBytes())
+	require.Equal(t, []byte(expVal), gr.ValueBytes())
 }
 
@@ -351,7 +365,7 @@ func TestTxnWeakIsolationLevelsTolerateWriteSkew(t *testing.T) {
 	// Finally, try to commit. This should succeed for isolation levels that
 	// allow for write skew. It should fail for isolation levels that do not.
 	err := txn1.Commit(ctx)
-	if isoLevel.ToleratesWriteSkew() {
+	if isoLevel != isolation.Serializable {
 		require.NoError(t, err)
 	} else {
 		require.Error(t, err)
@@ -364,6 +378,94 @@ func TestTxnWeakIsolationLevelsTolerateWriteSkew(t *testing.T) {
 		})
 	}
 }
 
+// TestTxnReadCommittedPerStatementReadSnapshot verifies that transactions run
+// under the read committed isolation level observe a new read snapshot on each
+// statement (or kv batch), while transactions run under stronger isolation
+// levels (snapshot and serializable) observe a single read snapshot for their
+// entire duration.
+func TestTxnReadCommittedPerStatementReadSnapshot(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	run := func(isoLevel isolation.Level, mode kv.SteppingMode, step bool, expObserveExternalWrites bool) {
+		s := createTestDB(t)
+		defer s.Stop()
+		ctx := context.Background()
+		key := roachpb.Key("a")
+
+		incrementKey := func() {
+			_, err := s.DB.Inc(ctx, key, 1)
+			require.NoError(t, err)
+		}
+		incrementKey()
+
+		// Begin the test's transaction.
+		txn1 := s.DB.NewTxn(ctx, "txn1")
+		require.NoError(t, txn1.SetIsoLevel(isoLevel))
+		txn1.ConfigureStepping(ctx, mode)
+
+		// In a loop, increment the key outside the transaction, then read it in the
+		// transaction. If stepping is enabled, step the transaction before each read.
+		var readVals []int64
+		for i := 0; i < 3; i++ {
+			incrementKey()
+
+			if step {
+				require.NoError(t, txn1.Step(ctx))
+			}
+
+			// Read the key twice in the same batch, to demonstrate that regardless of
+			// isolation level or stepping mode, a single batch observes a single read
+			// snapshot.
+			b := txn1.NewBatch()
+			b.Get(key)
+			b.Scan(key, key.Next())
+			require.NoError(t, txn1.Run(ctx, b))
+			require.Equal(t, 2, len(b.Results))
+			require.Equal(t, 1, len(b.Results[0].Rows))
+			require.Equal(t, 1, len(b.Results[1].Rows))
+			require.Equal(t, b.Results[0].Rows[0], b.Results[1].Rows[0])
+			readVals = append(readVals, b.Results[0].Rows[0].ValueInt())
+		}
+
+		// Commit the transaction.
+		require.NoError(t, txn1.Commit(ctx))
+
+		// Verify that the transaction read the correct values.
+		var expVals []int64
+		if expObserveExternalWrites {
+			expVals = []int64{2, 3, 4}
+		} else {
+			expVals = []int64{1, 1, 1}
+		}
+		require.Equal(t, expVals, readVals)
+	}
+
+	for _, isoLevel := range isolation.Levels() {
+		t.Run(isoLevel.String(), func(t *testing.T) {
+			testutils.RunTrueAndFalse(t, "steppingMode", func(t *testing.T, modeBool bool) {
+				mode := kv.SteppingMode(modeBool)
+				if mode == kv.SteppingEnabled {
+					// If stepping is enabled, run a variant of the test where the
+					// transaction is stepped between reads and a variant of the test
+					// where it is not.
+					testutils.RunTrueAndFalse(t, "step", func(t *testing.T, step bool) {
+						// Expect a new read snapshot on each kv operation if the
+						// transaction is read committed and is manually stepped.
+						expObserveExternalWrites := isoLevel == isolation.ReadCommitted && step
+						run(isoLevel, mode, step, expObserveExternalWrites)
+					})
+				} else {
+					// Expect a new read snapshot on each kv operation if the
+					// transaction is read committed.
+					expObserveExternalWrites := isoLevel == isolation.ReadCommitted
+					run(isoLevel, mode, false, expObserveExternalWrites)
+				}
+			})
+		})
+	}
+}
+
 // TestTxnWriteReadConflict verifies that write-read conflicts are non-blocking
 // to the reader, except when both the writer and reader are both serializable
 // transactions. In that case, the reader will block until the writer completes.
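The rewritten `TestTxnLostUpdate` makes the isolation-level trade-off concrete: Snapshot and Serializable pay a restart to avoid the lost update, while Read Committed accepts it. The expected outcome matrix, restated as a hypothetical helper over the same `isolation` package:

```go
import "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"

// expectedFinalValue summarizes what the lost-update test asserts for each
// isolation level after txn1 races with a committed external write.
func expectedFinalValue(isoLevel isolation.Level) string {
	if isoLevel == isolation.ReadCommitted {
		// Each batch reads from a fresh snapshot, so txn1's write succeeds
		// without a restart and the concurrent update is lost.
		return "oops!"
	}
	// Snapshot and Serializable hit a write-write restart, re-read the key,
	// and write a value derived from the latest state.
	return "correct"
}
```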
diff --git a/pkg/kv/sender.go b/pkg/kv/sender.go
index b4b5e2d8ddb5..da7faa8469fb 100644
--- a/pkg/kv/sender.go
+++ b/pkg/kv/sender.go
@@ -273,16 +273,23 @@ type TxnSender interface {
 	// GetLeafTxnInitialState() instead when creating leaf transactions.
 	TestingCloneTxn() *roachpb.Transaction
 
-	// Step creates a sequencing point in the current transaction. A
-	// sequencing point establishes a snapshot baseline for subsequent
-	// read-only operations: until the next sequencing point, read-only
-	// operations observe the data at the time the snapshot was
-	// established and ignore writes performed since.
+	// Step creates an internal sequencing point in the current transaction. An
+	// internal sequencing point establishes a snapshot baseline for subsequent
+	// read-only operations of the transaction's own writes: until the next
+	// sequencing point, read-only operations observe the transaction's writes at
+	// the time the snapshot was established and ignore writes performed by the
+	// transaction since.
+	//
+	// Additionally, for Read Committed transactions, Step also advances the
+	// transaction's external read snapshot (i.e. ReadTimestamp) to a timestamp
+	// captured from the local HLC clock. This ensures that subsequent read-only
+	// operations observe the writes of other transactions that were committed
+	// before the time the new snapshot was established. For more detail on the
+	// interaction between transaction isolation levels and Step, see
+	// (isolation.Level).PerStatementReadSnapshot.
 	//
 	// Step() can only be called after stepping mode has been enabled
 	// using ConfigureStepping(SteppingEnabled).
-	//
-	// The method is idempotent.
 	Step(context.Context) error
 
 	// GetReadSeqNum gets the read sequence point for the current transaction.
@@ -293,10 +300,10 @@ type TxnSender interface {
 
 	// ConfigureStepping sets the sequencing point behavior.
 	//
-	// Note that a Sender is initially in the non-stepping mode,
-	// i.e. uses reads-own-writes by default. This makes the step
-	// behavior opt-in and backward-compatible with existing code which
-	// does not need it.
+	// Note that a Sender is initially in the non-stepping mode, i.e. by default,
+	// it uses reads-own-writes and, under Read Committed, establishes a new read
+	// snapshot per batch. This makes the step behavior opt-in and
+	// backward-compatible with existing code which does not need it.
 	//
 	// Calling ConfigureStepping(SteppingEnabled) when the stepping mode
 	// is currently disabled implies calling Step(), for convenience.
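Taken together, the `Step` and `ConfigureStepping` contracts above mean that a Read Committed transaction in the default (non-stepping) mode sees a fresh snapshot per batch, while reads within one batch stay mutually consistent. A sketch under those assumptions (`db` and `key` are illustrative fixture names):

```go
import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// perBatchSnapshot demonstrates the documented default: with stepping left
// disabled, every batch auto-captures a new external read snapshot, but the
// two reads inside one batch observe the same snapshot.
func perBatchSnapshot(ctx context.Context, db *kv.DB, key roachpb.Key) error {
	txn := db.NewTxn(ctx, "example")
	if err := txn.SetIsoLevel(isolation.ReadCommitted); err != nil {
		return err
	}
	b := txn.NewBatch()
	b.Get(key)              // both requests in this batch share one snapshot,
	b.Scan(key, key.Next()) // so they return identical values for key
	if err := txn.Run(ctx, b); err != nil {
		return err
	}
	// A later batch may observe newer writes committed by other transactions.
	return txn.Commit(ctx)
}
```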