From f8e5a8a5f1b9941aa16385b90dac0de058da3671 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Mon, 5 Jun 2023 16:15:50 -0400 Subject: [PATCH] kv: establish new read snapshot on statement/batch boundaries under RC Closes #100133. This commit teaches Read Committed transactions to establish a new external "read snapshot" on each SQL statement or KV batch boundary. The per-statement/batch read snapshot logic is integrated into the transaction `Stepping` infrastructure in an analogous manner to how the transaction's internal `readSeq` is advanced. For transactions with stepping enabled (e.g. SQL transactions), the `Step` API now advances the transaction's external read snapshot (i.e. `ReadTimestamp`) to a timestamp captured from the local HLC clock. This ensures that subsequent read-only operations observe the writes of other transactions that were committed before the time the statement began. For transactions with stepping disabled (e.g. raw KV transactions), each batch implicitly advances the read snapshot. This ensures that the batch observes the writes of other transactions that were committed before the batch was issued. This read snapshot will be at least as recent as the previous read snapshot, and will typically be more recent (i.e. it will never regress). Consistency with prior reads in the transaction is not maintained, so reads of previously read keys may not be "repeatable" and may observe "phantoms". On the other hand, by not maintaining consistency between read snapshots, isolation-related retries (write-write conflicts) and consistency-related retries (uncertainty errors) have a higher chance of being refreshed away (client-side or server-side) without need for client intervention (i.e. without requiring a statement-level retry). This benefit can be seen in `TestTxnCoordSenderRetries`. As described in the Read Committed RFC, the transaction's uncertainty interval is not reset when establishing a new read snapshot. See section "Read Uncertainty Intervals" of the RFC for the rationale behind this decision. Release note: None --- .../kvcoord/dist_sender_server_test.go | 446 ++++++++++++++---- pkg/kv/kvclient/kvcoord/txn_coord_sender.go | 89 ++++ .../kvcoord/txn_interceptor_span_refresher.go | 17 +- pkg/kv/kvclient/kvcoord/txn_test.go | 144 +++++- pkg/kv/sender.go | 29 +- 5 files changed, 604 insertions(+), 121 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go index 32c645f7bff7..bdc23647bd64 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go @@ -2087,10 +2087,19 @@ func TestTxnCoordSenderRetries(t *testing.T) { retryable: func(ctx context.Context, txn *kv.Txn) error { return txn.Put(ctx, "a", "put") // put to advance txn ts }, - // No retry, preemptive refresh before commit. - allIsoLevels: &expect{ - expClientRefreshSuccess: true, - expClientAutoRetryAfterRefresh: false, + perIsoLevel: map[isolation.Level]*expect{ + // No retry, preemptive refresh before commit. + isolation.Serializable: { + expClientRefreshSuccess: true, + expClientAutoRetryAfterRefresh: false, + }, + // No retry, preemptive refresh before commit. + isolation.Snapshot: { + expClientRefreshSuccess: true, + expClientAutoRetryAfterRefresh: false, + }, + // No refresh, no retry. New read snapshot established before commit. 
+ isolation.ReadCommitted: {}, }, }, { @@ -2117,10 +2126,19 @@ func TestTxnCoordSenderRetries(t *testing.T) { retryable: func(ctx context.Context, txn *kv.Txn) error { return txn.InitPut(ctx, "a", "put", false /* failOnTombstones */) // put to advance txn ts }, - // No retry, preemptive (no-op) refresh before commit. - allIsoLevels: &expect{ - expClientRefreshSuccess: true, - expClientAutoRetryAfterRefresh: false, + perIsoLevel: map[isolation.Level]*expect{ + // No retry, preemptive (no-op) refresh before commit. + isolation.Serializable: { + expClientRefreshSuccess: true, + expClientAutoRetryAfterRefresh: false, + }, + // No retry, preemptive (no-op) refresh before commit. + isolation.Snapshot: { + expClientRefreshSuccess: true, + expClientAutoRetryAfterRefresh: false, + }, + // No refresh, no retry. New read snapshot established before commit. + isolation.ReadCommitted: {}, }, }, { @@ -2135,10 +2153,19 @@ func TestTxnCoordSenderRetries(t *testing.T) { retryable: func(ctx context.Context, txn *kv.Txn) error { return txn.CPut(ctx, "a", "cput", kvclientutils.StrToCPutExistingValue("put")) // cput to advance txn ts, set update span }, - // No retry, preemptive (no-op) refresh before commit. - allIsoLevels: &expect{ - expClientRefreshSuccess: true, - expClientAutoRetryAfterRefresh: false, + perIsoLevel: map[isolation.Level]*expect{ + // No retry, preemptive (no-op) refresh before commit. + isolation.Serializable: { + expClientRefreshSuccess: true, + expClientAutoRetryAfterRefresh: false, + }, + // No retry, preemptive (no-op) refresh before commit. + isolation.Snapshot: { + expClientRefreshSuccess: true, + expClientAutoRetryAfterRefresh: false, + }, + // No refresh, no retry. New read snapshot established before commit. + isolation.ReadCommitted: {}, }, }, { @@ -2168,10 +2195,19 @@ func TestTxnCoordSenderRetries(t *testing.T) { retryable: func(ctx context.Context, txn *kv.Txn) error { return txn.CPut(ctx, "ab", "cput", nil) // cput advances, sets update span }, - // No retry, preemptive (no-op) refresh before commit. - allIsoLevels: &expect{ - expClientRefreshSuccess: true, - expClientAutoRetryAfterRefresh: false, + perIsoLevel: map[isolation.Level]*expect{ + // No retry, preemptive (no-op) refresh before commit. + isolation.Serializable: { + expClientRefreshSuccess: true, + expClientAutoRetryAfterRefresh: false, + }, + // No retry, preemptive (no-op) refresh before commit. + isolation.Snapshot: { + expClientRefreshSuccess: true, + expClientAutoRetryAfterRefresh: false, + }, + // No refresh, no retry. New read snapshot established before commit. + isolation.ReadCommitted: {}, }, }, { @@ -2191,7 +2227,8 @@ func TestTxnCoordSenderRetries(t *testing.T) { expClientAutoRetryAfterRefresh: false, }, // No refresh, no retry. - isolation.Snapshot: {}, + isolation.Snapshot: {}, + isolation.ReadCommitted: {}, }, }, { @@ -2242,13 +2279,25 @@ func TestTxnCoordSenderRetries(t *testing.T) { if err := txn.Put(ctx, "a", "put"); err != nil { return err } + if err := txn.Step(ctx); err != nil { + return err + } _, err := txn.Get(ctx, "a2") return err }, - // No retry, preemptive refresh before get. - allIsoLevels: &expect{ - expClientRefreshSuccess: true, - expClientAutoRetryAfterRefresh: false, + perIsoLevel: map[isolation.Level]*expect{ + // No retry, preemptive refresh before get. + isolation.Serializable: { + expClientRefreshSuccess: true, + expClientAutoRetryAfterRefresh: false, + }, + // No retry, preemptive refresh before get. 
+ isolation.Snapshot: { + expClientRefreshSuccess: true, + expClientAutoRetryAfterRefresh: false, + }, + // No refresh, no retry. New read snapshot established before get. + isolation.ReadCommitted: {}, }, }, { @@ -2262,13 +2311,25 @@ func TestTxnCoordSenderRetries(t *testing.T) { if err := txn.Put(ctx, "a", "put"); err != nil { return err } + if err := txn.Step(ctx); err != nil { + return err + } _, err := txn.Scan(ctx, "a2", "a3", 0) return err }, - // No retry, preemptive refresh before scan. - allIsoLevels: &expect{ - expClientRefreshSuccess: true, - expClientAutoRetryAfterRefresh: false, + perIsoLevel: map[isolation.Level]*expect{ + // No retry, preemptive refresh before scan. + isolation.Serializable: { + expClientRefreshSuccess: true, + expClientAutoRetryAfterRefresh: false, + }, + // No retry, preemptive refresh before scan. + isolation.Snapshot: { + expClientRefreshSuccess: true, + expClientAutoRetryAfterRefresh: false, + }, + // No refresh, no retry. New read snapshot established before scan. + isolation.ReadCommitted: {}, }, }, { @@ -2282,14 +2343,26 @@ func TestTxnCoordSenderRetries(t *testing.T) { if err := txn.Put(ctx, "a", "put"); err != nil { return err } + if err := txn.Step(ctx); err != nil { + return err + } b := txn.NewBatch() b.Get("a2") return txn.CommitInBatch(ctx, b) }, - // No retry, preemptive refresh before commit. - allIsoLevels: &expect{ - expClientRefreshSuccess: true, - expClientAutoRetryAfterRefresh: false, + perIsoLevel: map[isolation.Level]*expect{ + // No retry, preemptive refresh before commit. + isolation.Serializable: { + expClientRefreshSuccess: true, + expClientAutoRetryAfterRefresh: false, + }, + // No retry, preemptive refresh before commit. + isolation.Snapshot: { + expClientRefreshSuccess: true, + expClientAutoRetryAfterRefresh: false, + }, + // No refresh, no retry. New read snapshot established before commit. + isolation.ReadCommitted: {}, }, }, { @@ -2303,14 +2376,26 @@ func TestTxnCoordSenderRetries(t *testing.T) { if err := txn.Put(ctx, "a", "put"); err != nil { return err } + if err := txn.Step(ctx); err != nil { + return err + } b := txn.NewBatch() b.Scan("a2", "a3") return txn.CommitInBatch(ctx, b) }, - // No retry, preemptive refresh before commit. - allIsoLevels: &expect{ - expClientRefreshSuccess: true, - expClientAutoRetryAfterRefresh: false, + perIsoLevel: map[isolation.Level]*expect{ + // No retry, preemptive refresh before commit. + isolation.Serializable: { + expClientRefreshSuccess: true, + expClientAutoRetryAfterRefresh: false, + }, + // No retry, preemptive refresh before commit. + isolation.Snapshot: { + expClientRefreshSuccess: true, + expClientAutoRetryAfterRefresh: false, + }, + // No refresh, no retry. New read snapshot established before commit. + isolation.ReadCommitted: {}, }, }, { @@ -2332,7 +2417,8 @@ func TestTxnCoordSenderRetries(t *testing.T) { expClientAutoRetryAfterRefresh: true, }, // No refresh, no retry. - isolation.Snapshot: {}, + isolation.Snapshot: {}, + isolation.ReadCommitted: {}, }, }, { @@ -2354,7 +2440,8 @@ func TestTxnCoordSenderRetries(t *testing.T) { expClientAutoRetryAfterRefresh: true, }, // No refresh, no retry. 
- isolation.Snapshot: {}, + isolation.Snapshot: {}, + isolation.ReadCommitted: {}, }, }, { @@ -2407,6 +2494,9 @@ func TestTxnCoordSenderRetries(t *testing.T) { if err := txn.Put(ctx, "a", "put"); err != nil { return err } + if err := txn.Step(ctx); err != nil { + return err + } // Make the final batch large enough such that if we accounted // for all of its spans then we would exceed the limit on // refresh spans. This is not an issue because we never need to @@ -2422,10 +2512,19 @@ func TestTxnCoordSenderRetries(t *testing.T) { } return txn.CommitInBatch(ctx, b) }, - // No retry, preemptive refresh before commit. - allIsoLevels: &expect{ - expClientRefreshSuccess: true, - expClientAutoRetryAfterRefresh: false, + perIsoLevel: map[isolation.Level]*expect{ + // No retry, preemptive refresh before commit. + isolation.Serializable: { + expClientRefreshSuccess: true, + expClientAutoRetryAfterRefresh: false, + }, + // No retry, preemptive refresh before commit. + isolation.Snapshot: { + expClientRefreshSuccess: true, + expClientAutoRetryAfterRefresh: false, + }, + // No refresh, no retry. New read snapshot established before commit. + isolation.ReadCommitted: {}, }, }, { @@ -2447,6 +2546,9 @@ func TestTxnCoordSenderRetries(t *testing.T) { if _, err := txn.DelRange(ctx, "a", "b", false /* returnKeys */); err != nil { return err } + if err := txn.Step(ctx); err != nil { + return err + } // Make the final batch large enough such that if we accounted // for all of its spans then we would exceed the limit on // refresh spans. This is not an issue because we never need to @@ -2469,7 +2571,8 @@ func TestTxnCoordSenderRetries(t *testing.T) { expClientAutoRetryAfterRefresh: false, }, // No refresh, no retry. - isolation.Snapshot: {}, + isolation.Snapshot: {}, + isolation.ReadCommitted: {}, }, }, { @@ -2493,9 +2596,22 @@ func TestTxnCoordSenderRetries(t *testing.T) { return txn.Put(ctx, "a", "put") }, priorReads: true, - allIsoLevels: &expect{ - expClientRefreshSuccess: true, - expClientAutoRetryAfterRefresh: true, + perIsoLevel: map[isolation.Level]*expect{ + // Client-side refresh of prior reads after write-write conflict. + isolation.Serializable: { + expClientRefreshSuccess: true, + expClientAutoRetryAfterRefresh: true, + }, + // Client-side refresh of prior reads after write-write conflict. + isolation.Snapshot: { + expClientRefreshSuccess: true, + expClientAutoRetryAfterRefresh: true, + }, + // Server-side refresh after write-write conflict. Prior reads performed + // in earlier batches (from earlier read snapshots) are not refreshed. + isolation.ReadCommitted: { + expServerRefresh: true, + }, }, }, { @@ -2543,20 +2659,35 @@ func TestTxnCoordSenderRetries(t *testing.T) { if err := txn.Put(ctx, "a", "put"); err != nil { return err } + if err := txn.Step(ctx); err != nil { + return err + } // Get from "b" to establish a read span. It is important that we // perform a preemptive refresh before this read, otherwise the refresh // would fail. if _, err := txn.Get(ctx, "b"); err != nil { return err } + if err := txn.Step(ctx); err != nil { + return err + } // Now, Put to "b", which would have thrown a write-too-old error had // the transaction not preemptively refreshed before the Get. return txn.Put(ctx, "b", "put") }, - // No retry, preemptive refresh before Get. - allIsoLevels: &expect{ - expClientRefreshSuccess: true, - expClientAutoRetryAfterRefresh: false, + perIsoLevel: map[isolation.Level]*expect{ + // No retry, preemptive refresh before Get. 
+ isolation.Serializable: { + expClientRefreshSuccess: true, + expClientAutoRetryAfterRefresh: false, + }, + // No retry, preemptive refresh before Get. + isolation.Snapshot: { + expClientRefreshSuccess: true, + expClientAutoRetryAfterRefresh: false, + }, + // No refresh, no retry. New read snapshot established before Get. + isolation.ReadCommitted: {}, }, }, { @@ -2566,6 +2697,9 @@ func TestTxnCoordSenderRetries(t *testing.T) { }, retryable: func(ctx context.Context, txn *kv.Txn) error { // Get so we must refresh when txn timestamp moves forward. + // Note that we don't step the transaction between this read and the + // subsequent write, so the write-write conflict causes a client-side + // refresh even under Read Committed. if _, err := txn.Get(ctx, "a"); err != nil { return err } @@ -2578,6 +2712,9 @@ func TestTxnCoordSenderRetries(t *testing.T) { if err := txn.Put(ctx, "a", "txn-value1"); err != nil { return err } + if err := txn.Step(ctx); err != nil { + return err + } // Write again to make sure the timestamp of the second intent // is correctly set to the txn's advanced timestamp. There was // previously a bug where the txn's original timestamp would be used @@ -2650,9 +2787,22 @@ func TestTxnCoordSenderRetries(t *testing.T) { return txn.CPut(ctx, "a", "cput", kvclientutils.StrToCPutExistingValue("value")) }, priorReads: true, - allIsoLevels: &expect{ - expClientRefreshSuccess: true, - expClientAutoRetryAfterRefresh: true, + perIsoLevel: map[isolation.Level]*expect{ + // Client-side refresh of prior reads after write-write conflict. + isolation.Serializable: { + expClientRefreshSuccess: true, + expClientAutoRetryAfterRefresh: true, + }, + // Client-side refresh of prior reads after write-write conflict. + isolation.Snapshot: { + expClientRefreshSuccess: true, + expClientAutoRetryAfterRefresh: true, + }, + // Server-side refresh after write-write conflict. Prior reads performed + // in earlier batches (from earlier read snapshots) are not refreshed. + isolation.ReadCommitted: { + expServerRefresh: true, + }, }, }, { @@ -2700,9 +2850,22 @@ func TestTxnCoordSenderRetries(t *testing.T) { return nil }, priorReads: true, - allIsoLevels: &expect{ - expClientRefreshSuccess: true, - expClientAutoRetryAfterRefresh: true, + perIsoLevel: map[isolation.Level]*expect{ + // Client-side refresh of prior reads after write-write conflict. + isolation.Serializable: { + expClientRefreshSuccess: true, + expClientAutoRetryAfterRefresh: true, + }, + // Client-side refresh of prior reads after write-write conflict. + isolation.Snapshot: { + expClientRefreshSuccess: true, + expClientAutoRetryAfterRefresh: true, + }, + // Server-side refresh after write-write conflict. Prior reads performed + // in earlier batches (from earlier read snapshots) are not refreshed. + isolation.ReadCommitted: { + expServerRefresh: true, + }, }, }, { @@ -2726,10 +2889,22 @@ func TestTxnCoordSenderRetries(t *testing.T) { return txn.InitPut(ctx, "iput", "put", false) }, priorReads: true, - allIsoLevels: &expect{ - expClientRefreshSuccess: true, - expClientAutoRetryAfterRefresh: true, // fails on first attempt at cput with write too old - // Succeeds on second attempt. + perIsoLevel: map[isolation.Level]*expect{ + // Client-side refresh of prior reads after write-write conflict. + isolation.Serializable: { + expClientRefreshSuccess: true, + expClientAutoRetryAfterRefresh: true, + }, + // Client-side refresh of prior reads after write-write conflict. 
+ isolation.Snapshot: { + expClientRefreshSuccess: true, + expClientAutoRetryAfterRefresh: true, + }, + // Server-side refresh after write-write conflict. Prior reads performed + // in earlier batches (from earlier read snapshots) are not refreshed. + isolation.ReadCommitted: { + expServerRefresh: true, + }, }, }, { @@ -2759,10 +2934,22 @@ func TestTxnCoordSenderRetries(t *testing.T) { return txn.InitPut(ctx, "iput", "put", false) }, priorReads: true, - // Expect a transaction coord retry, which should succeed. - allIsoLevels: &expect{ - expClientRefreshSuccess: true, - expClientAutoRetryAfterRefresh: true, + perIsoLevel: map[isolation.Level]*expect{ + // Client-side refresh of prior reads after write-write conflict. + isolation.Serializable: { + expClientRefreshSuccess: true, + expClientAutoRetryAfterRefresh: true, + }, + // Client-side refresh of prior reads after write-write conflict. + isolation.Snapshot: { + expClientRefreshSuccess: true, + expClientAutoRetryAfterRefresh: true, + }, + // Server-side refresh after write-write conflict. Prior reads performed + // in earlier batches (from earlier read snapshots) are not refreshed. + isolation.ReadCommitted: { + expServerRefresh: true, + }, }, }, { @@ -2857,10 +3044,25 @@ func TestTxnCoordSenderRetries(t *testing.T) { return err }, priorReads: true, - allIsoLevels: &expect{ - expClientRefreshSuccess: true, - expClientAutoRetryAfterRefresh: true, - expOnePhaseCommit: true, + perIsoLevel: map[isolation.Level]*expect{ + // Client-side refresh of prior reads after write-write conflict. + isolation.Serializable: { + expClientRefreshSuccess: true, + expClientAutoRetryAfterRefresh: true, + expOnePhaseCommit: true, + }, + // Client-side refresh of prior reads after write-write conflict. + isolation.Snapshot: { + expClientRefreshSuccess: true, + expClientAutoRetryAfterRefresh: true, + expOnePhaseCommit: true, + }, + // Server-side refresh after write-write conflict. Prior reads performed + // in earlier batches (from earlier read snapshots) are not refreshed. + isolation.ReadCommitted: { + expServerRefresh: true, + expOnePhaseCommit: true, + }, }, }, { @@ -2923,6 +3125,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { if _, err := txn.Get(ctx, "c"); err != nil { return err } + // NOTE: don't Step to preserve write-write conflict under Read Committed. _, err := txn.DelRange(ctx, "a", "b", false /* returnKeys */) return err }, @@ -2944,6 +3147,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { if _, err := txn.Get(ctx, "a"); err != nil { return err } + // NOTE: don't Step to preserve write-write conflict under Read Committed. _, err := txn.DelRange(ctx, "a", "b", false /* returnKeys */) return err }, @@ -2989,6 +3193,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { if err := txn.Put(ctx, "another", "another put"); err != nil { return err } + // NOTE: don't Step to preserve write-write conflict under Read Committed. b := txn.NewBatch() b.Put("a", "final value") return txn.CommitInBatch(ctx, b) @@ -3132,7 +3337,8 @@ func TestTxnCoordSenderRetries(t *testing.T) { expClientAutoRetryAfterRefresh: true, }, // No refresh, no retry. - isolation.Snapshot: {}, + isolation.Snapshot: {}, + isolation.ReadCommitted: {}, }, }, { @@ -3157,7 +3363,8 @@ func TestTxnCoordSenderRetries(t *testing.T) { expClientAutoRetryAfterRefresh: true, }, // No refresh, no retry. 
- isolation.Snapshot: {}, + isolation.Snapshot: {}, + isolation.ReadCommitted: {}, }, }, { @@ -3216,13 +3423,36 @@ func TestTxnCoordSenderRetries(t *testing.T) { return txn.CommitInBatch(ctx, b) }, priorReads: true, - // The Put to "a" will fail, failing the parallel commit with an error and - // forcing a client-side refresh and auto-retry of the full batch. - allIsoLevels: &expect{ - expServerRefresh: false, - expClientRefreshSuccess: true, - expClientAutoRetryAfterRefresh: true, - expParallelCommitAutoRetry: false, + perIsoLevel: map[isolation.Level]*expect{ + // The Put to "a" will fail, failing the parallel commit with an error and + // forcing a client-side refresh and auto-retry of the full batch. + isolation.Serializable: { + expServerRefresh: false, + expClientRefreshSuccess: true, + expClientAutoRetryAfterRefresh: true, + expParallelCommitAutoRetry: false, + }, + // The Put to "a" will fail, failing the parallel commit with an error and + // forcing a client-side refresh and auto-retry of the full batch. + isolation.Snapshot: { + expServerRefresh: false, + expClientRefreshSuccess: true, + expClientAutoRetryAfterRefresh: true, + expParallelCommitAutoRetry: false, + }, + // The Put to "a" and EndTxn will succeed after a server-side refresh. + // This will instruct the txn to stage at the post-refresh timestamp, + // qualifying for the implicit commit condition and avoiding a client-side + // refresh. + // + // Prior reads performed in earlier batches (from earlier read snapshots) + // are not refreshed. + isolation.ReadCommitted: { + expServerRefresh: true, + expClientRefreshSuccess: false, + expClientAutoRetryAfterRefresh: false, + expParallelCommitAutoRetry: false, + }, }, }, { @@ -3237,13 +3467,36 @@ func TestTxnCoordSenderRetries(t *testing.T) { return txn.CommitInBatch(ctx, b) }, priorReads: true, - // The Put to "c" will fail, failing the parallel commit with an error and - // forcing a client-side refresh and auto-retry of the full batch. - allIsoLevels: &expect{ - expServerRefresh: false, - expClientRefreshSuccess: true, - expClientAutoRetryAfterRefresh: true, - expParallelCommitAutoRetry: false, + perIsoLevel: map[isolation.Level]*expect{ + // The Put to "c" will fail, failing the parallel commit with an error and + // forcing a client-side refresh and auto-retry of the full batch. + isolation.Serializable: { + expServerRefresh: false, + expClientRefreshSuccess: true, + expClientAutoRetryAfterRefresh: true, + expParallelCommitAutoRetry: false, + }, + // The Put to "c" will fail, failing the parallel commit with an error and + // forcing a client-side refresh and auto-retry of the full batch. + isolation.Snapshot: { + expServerRefresh: false, + expClientRefreshSuccess: true, + expClientAutoRetryAfterRefresh: true, + expParallelCommitAutoRetry: false, + }, + // The Put to "c" will succeed after a server-side refresh. However, the + // txn has already staged on the other range at the pre-refresh timestamp. + // As a result, it does not qualify for the implicit commit condition and + // requires a parallel commit auto-retry. + // + // Prior reads performed in earlier batches (from earlier read snapshots) + // are not refreshed. 
+ isolation.ReadCommitted: { + expServerRefresh: true, + expClientRefreshSuccess: false, + expClientAutoRetryAfterRefresh: false, + expParallelCommitAutoRetry: true, + }, }, }, { @@ -3319,6 +3572,9 @@ func TestTxnCoordSenderRetries(t *testing.T) { } else if !bytes.Equal(b, []byte("newval")) { return fmt.Errorf("expected \"newval\", got: %v", b) } + if err := txn.Step(ctx); err != nil { + return err + } return txn.Commit(ctx) }, allIsoLevels: &expect{ @@ -3413,6 +3669,9 @@ func TestTxnCoordSenderRetries(t *testing.T) { if err := txn.Put(ctx, "a", "put"); err != nil { return err } + if err := txn.Step(ctx); err != nil { + return err + } b := txn.NewBatch() b.CPut("c", "cput", kvclientutils.StrToCPutExistingValue("value")) return txn.CommitInBatch(ctx, b) @@ -3614,6 +3873,12 @@ func TestTxnCoordSenderRetries(t *testing.T) { return err } + // Configure stepping to make the Read Committed tests more interesting. + // If we didn't do this, the transaction would never have its timestamp + // pushed, because it would always capture a new read snapshot after the + // afterTxnStart functions run. + txn.ConfigureStepping(ctx, kv.SteppingEnabled) + if tc.tsLeaked { if iso != isolation.Serializable { skip.IgnoreLint(t, "fixed commit timestamp unsupported") @@ -3626,6 +3891,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { if tc.priorReads { _, err := txn.Get(ctx, "prior read") require.NoError(t, err, "prior read") + require.NoError(t, txn.Step(ctx)) } if tc.afterTxnStart != nil { @@ -3633,7 +3899,16 @@ func TestTxnCoordSenderRetries(t *testing.T) { require.NoError(t, err, "afterTxnStart") } - return tc.retryable(ctx, txn) + if err := tc.retryable(ctx, txn); err != nil { + return err + } + + // If the transaction is still open, step one more time before the commit. + if txn.IsOpen() { + require.NoError(t, txn.Step(ctx)) + } + + return nil }) // Verify success or failure. @@ -3656,10 +3931,9 @@ func TestTxnCoordSenderRetries(t *testing.T) { } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - testLevels := []isolation.Level{isolation.Serializable, isolation.Snapshot} - for _, iso := range testLevels { - t.Run(iso.String(), func(t *testing.T) { - run(t, tc, iso) + for _, isoLevel := range isolation.Levels() { + t.Run(isoLevel.String(), func(t *testing.T) { + run(t, tc, isoLevel) }) } }) diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go index c711a0315a9c..bb654ca077ac 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go @@ -521,6 +521,10 @@ func (tc *TxnCoordSender) Send( return nil, nil } + if tc.mu.txn.IsoLevel.PerStatementReadSnapshot() { + tc.maybeAutoStepReadTimestampLocked() + } + // Clone the Txn's Proto so that future modifications can be made without // worrying about synchronization. ba.Txn = tc.mu.txn.Clone() @@ -1353,8 +1357,16 @@ func (tc *TxnCoordSender) PrepareRetryableError( // Step is part of the TxnSender interface. func (tc *TxnCoordSender) Step(ctx context.Context) error { + // TODO(nvanbenschoten): it should be possible to make this assertion, but + // the API is currently misused by the connExecutor. See #86162. 
+ //if tc.typ != kv.RootTxn { + // return errors.AssertionFailedf("cannot step in non-root txn") + //} tc.mu.Lock() defer tc.mu.Unlock() + if tc.mu.txn.IsoLevel.PerStatementReadSnapshot() { + tc.manualStepReadTimestampLocked() + } return tc.interceptorAlloc.txnSeqNumAllocator.manualStepReadSeqLocked(ctx) } @@ -1391,6 +1403,83 @@ func (tc *TxnCoordSender) GetSteppingMode(ctx context.Context) (curMode kv.Stepp return tc.interceptorAlloc.txnSeqNumAllocator.steppingMode } +// manualStepReadTimestampLocked advances the transaction's read timestamp to a +// timestamp taken from the local clock and resets refresh span tracking. +// +//gcassert:inline +func (tc *TxnCoordSender) manualStepReadTimestampLocked() { + tc.stepReadTimestampLocked() +} + +// maybeAutoStepReadTimestampLocked advances the transaction's read timestamp to +// a timestamp taken from the local clock and resets refresh span tracking, if +// manual stepping is disabled and the transaction is expected to automatically +// capture a new read snapshot on each batch. +// +//gcassert:inline +func (tc *TxnCoordSender) maybeAutoStepReadTimestampLocked() { + if tc.typ != kv.RootTxn { + return // only root transactions auto-step + } + if tc.interceptorAlloc.txnSeqNumAllocator.steppingMode == kv.SteppingEnabled { + return // only manual stepping allowed + } + tc.stepReadTimestampLocked() +} + +// stepReadTimestampLocked advances the transaction's read timestamp to a +// timestamp taken from the local clock and resets refresh span tracking. +// +// Doing so establishes a new external "read snapshot" from which all future +// reads in the transaction will operate. This read snapshot will be at least as +// recent as the previous read snapshot, and will typically be more recent (i.e. +// it will never regress). Consistency with prior reads in the transaction is +// not maintained, so reads of previously read keys may not be "repeatable" and +// may observe "phantoms". On the other hand, by not maintaining consistency +// between read snapshots, isolation-related retries (write-write conflicts) and +// consistency-related retries (uncertainty errors) have a higher chance of +// being refreshed away (client-side or server-side) without need for client +// intervention (i.e. without requiring a statement-level retry). +// +// Note that the transaction's uncertainty interval is not reset by this method +// to now()+max_offset, even though doing so would be necessary to strictly +// guarantee real-time ordering between the commit of a writer transaction and +// the subsequent establishment of a new read snapshot in a reader transaction. +// By not resetting the uncertainty interval, we allow for the possibility that +// a reader transaction may establish a new read snapshot after the writer has +// committed (on a node with a fast clock) and yet not observe that writer's +// writes. +// +// This decision not to reset the transaction's uncertainty interval is +// discussed in the Read Committed RFC (section "Read Uncertainty Intervals"): +// +// > Read Committed transactions have the option to provide the same "no stale +// > reads" guarantee at the level of each individual statement. Doing so would +// > require transactions to reset their `GlobalUncertaintyLimit` and +// > `ObservedTimestamps` on each statement boundary, setting their +// > `GlobalUncertaintyLimit` to `hlc.Now() + hlc.MaxOffset()` and clearing all +// > `ObservedTimestamps`. +// > +// > We propose that Read Committed transactions do not do this. 
The cost of
+// > resetting a transaction's uncertainty interval on each statement boundary is
+// > likely greater than the benefit. Doing so increases the chance that
+// > individual statements retry due to `ReadWithinUncertaintyInterval` errors. In
+// > the worst case, each statement will need to traverse (through retries) an
+// > entire uncertainty interval before converging to a "certain" read snapshot.
+// > While these retries will be scoped to a single statement and should not
+// > escape to the client, they do still have a latency cost.
+// >
+// > We make this decision because we do not expect that applications rely on
+// > strong consistency guarantees between the commit of one transaction and the
+// > start of an individual statement within another in-progress transaction. To
+// > rely on such guarantees would require complex and surprising application-side
+// > synchronization.
+func (tc *TxnCoordSender) stepReadTimestampLocked() {
+	now := tc.clock.Now()
+	tc.mu.txn.BumpReadTimestamp(now)
+	tc.interceptorAlloc.txnSpanRefresher.resetRefreshSpansLocked()
+}
+
 // DeferCommitWait is part of the TxnSender interface.
 func (tc *TxnCoordSender) DeferCommitWait(ctx context.Context) func(context.Context) error {
 	tc.mu.Lock()
diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go
index 2d655c9c8147..64e19117c1b3 100644
--- a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go
+++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go
@@ -629,6 +629,16 @@ func (sr *txnSpanRefresher) appendRefreshSpans(
 	})
 }
 
+// resetRefreshSpansLocked clears the txnSpanRefresher's refresh span set and
+// marks the empty set as valid. This is used when a transaction is establishing
+// a new read snapshot and no longer needs to maintain consistency with previous
+// reads.
+func (sr *txnSpanRefresher) resetRefreshSpansLocked() {
+	sr.refreshFootprint.clear()
+	sr.refreshInvalid = false
+	sr.refreshedTimestamp.Reset()
+}
+
 // canForwardReadTimestampWithoutRefresh returns whether the transaction can
 // forward its read timestamp without needing to refresh any of the reads that
 // it has performed to this point. This requires that the transaction's
 // timestamp has not leaked.
@@ -758,13 +768,14 @@ func (sr *txnSpanRefresher) importLeafFinalState(
 
 // epochBumpedLocked implements the txnInterceptor interface.
 func (sr *txnSpanRefresher) epochBumpedLocked() {
-	sr.refreshFootprint.clear()
-	sr.refreshInvalid = false
-	sr.refreshedTimestamp.Reset()
+	sr.resetRefreshSpansLocked()
 }
 
 // createSavepointLocked is part of the txnInterceptor interface.
 func (sr *txnSpanRefresher) createSavepointLocked(ctx context.Context, s *savepoint) {
+	// TODO(nvanbenschoten): make sure this works correctly with ReadCommitted.
+	// The refresh spans should either be empty when captured into a savepoint or
+	// should be cleared when the savepoint is rolled back to.
 	s.refreshSpans = make([]roachpb.Span, len(sr.refreshFootprint.asSlice()))
 	copy(s.refreshSpans, sr.refreshFootprint.asSlice())
 	s.refreshInvalid = sr.refreshInvalid
diff --git a/pkg/kv/kvclient/kvcoord/txn_test.go b/pkg/kv/kvclient/kvcoord/txn_test.go
index 2190ca84b7b4..7724b1f5d307 100644
--- a/pkg/kv/kvclient/kvcoord/txn_test.go
+++ b/pkg/kv/kvclient/kvcoord/txn_test.go
@@ -134,15 +134,13 @@ func BenchmarkSingleRoundtripWithLatency(b *testing.B) {
 // reads and the value that it writes. In other words, the increment is atomic,
 // regardless of isolation level.
// -// The transaction history looks as follows: +// The transaction history looks as follows for Snapshot and Serializable: // -// R1(A) W2(A,+1) W1(A,+1) [write-write restart] R1(A) W1(A,+1) C1 +// R1(A) W2(A,+1) C2 W1(A,+1) [write-write restart] R1(A) W1(A,+1) C1 // -// TODO(nvanbenschoten): once we address #100133, update this test to advance -// the read snapshot for ReadCommitted transactions between the read and the -// increment. Demonstrate that doing so allows for increment to applied to a -// newer value than that returned by the get, but that the increment is still -// atomic. +// The transaction history looks as follows for Read Committed: +// +// R1(A) W2(A,+1) C2 W1(A,+1) C1 func TestTxnLostIncrement(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -190,14 +188,19 @@ func TestTxnLostIncrement(t *testing.T) { ir := b.Results[0].Rows[0] // During the first attempt, this should encounter a write-write conflict - // and force a transaction retry. - if epoch == 0 { + // and force a transaction retry for Snapshot and Serializable isolation + // transactions. For ReadCommitted transactions which allow each batch to + // operate using a different read snapshot, the increment will be applied + // to a newer version of the key than that returned by the get, but the + // increment itself will still be atomic. + if epoch == 0 && isoLevel != isolation.ReadCommitted { require.Error(t, err) require.Regexp(t, "TransactionRetryWithProtoRefreshError: .*WriteTooOldError", err) return err } - // During the second attempt, this should succeed. + // During the second attempt (or first for Read Committed), this should + // succeed. require.NoError(t, err) require.Equal(t, int64(2), ir.ValueInt()) return nil @@ -215,15 +218,17 @@ func TestTxnLostIncrement(t *testing.T) { } // TestTxnLostUpdate verifies that transactions are not susceptible to the -// lost update anomaly, regardless of isolation level. +// lost update anomaly if they run at Snapshot isolation or stronger, but +// are susceptible to the lost update anomaly if they run at Read Committed +// isolation. +// +// The transaction history looks as follows for Snapshot and Serializable: // -// The transaction history looks as follows: +// R1(A) W2(A,"hi") C2 W1(A,"oops!") [write-write restart] R1(A) W1(A,"correct") C1 // -// R1(A) W2(A,"hi") W1(A,"oops!") C1 [write-write restart] R1(A) W1(A,"correct") C1 +// The transaction history looks as follows for Read Committed: // -// TODO(nvanbenschoten): once we address #100133, update this test to advance -// the read snapshot for ReadCommitted transactions between the read and the -// write. Demonstrate that doing so allows for a lost update. +// R1(A) W2(A,"hi") C2 W1(A,"oops!") C1 func TestTxnLostUpdate(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -274,24 +279,33 @@ func TestTxnLostUpdate(t *testing.T) { } // During the first attempt, this should encounter a write-write conflict - // and force a transaction retry. - if epoch == 0 { + // and force a transaction retry for Snapshot and Serializable isolation + // transactions. For ReadCommitted transactions which allow each batch to + // operate using a different read snapshot, the write will succeed. + if epoch == 0 && isoLevel != isolation.ReadCommitted { require.Error(t, err) require.Regexp(t, "TransactionRetryWithProtoRefreshError: .*WriteTooOldError", err) return err } - // During the second attempt, this should succeed. 
+ // During the second attempt (or first for Read Committed), this should + // succeed. require.NoError(t, err) return nil }) require.NoError(t, err) // Verify final value. + var expVal string + if isoLevel != isolation.ReadCommitted { + expVal = "correct" + } else { + expVal = "oops!" + } gr, err := s.DB.Get(ctx, key) require.NoError(t, err) require.True(t, gr.Exists()) - require.Equal(t, []byte("correct"), gr.ValueBytes()) + require.Equal(t, []byte(expVal), gr.ValueBytes()) } for _, isoLevel := range isolation.Levels() { @@ -351,7 +365,7 @@ func TestTxnWeakIsolationLevelsTolerateWriteSkew(t *testing.T) { // Finally, try to commit. This should succeed for isolation levels that // allow for write skew. It should fail for isolation levels that do not. err := txn1.Commit(ctx) - if isoLevel.ToleratesWriteSkew() { + if isoLevel != isolation.Serializable { require.NoError(t, err) } else { require.Error(t, err) @@ -364,6 +378,94 @@ func TestTxnWeakIsolationLevelsTolerateWriteSkew(t *testing.T) { } } +// TestTxnReadCommittedPerStatementReadSnapshot verifies that transactions run +// under the read committed isolation level observe a new read snapshot on each +// statement (or kv batch), while transactions run under stronger isolation +// levels (snapshot and serializable) observe a single read snapshot for their +// entire duration. +func TestTxnReadCommittedPerStatementReadSnapshot(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + run := func(isoLevel isolation.Level, mode kv.SteppingMode, step bool, expObserveExternalWrites bool) { + s := createTestDB(t) + defer s.Stop() + ctx := context.Background() + key := roachpb.Key("a") + + incrementKey := func() { + _, err := s.DB.Inc(ctx, key, 1) + require.NoError(t, err) + } + incrementKey() + + // Begin the test's transaction. + txn1 := s.DB.NewTxn(ctx, "txn1") + require.NoError(t, txn1.SetIsoLevel(isoLevel)) + txn1.ConfigureStepping(ctx, mode) + + // In a loop, increment the key outside the transaction, then read it in the + // transaction. If stepping is enabled, step the transaction before each read. + var readVals []int64 + for i := 0; i < 3; i++ { + incrementKey() + + if step { + require.NoError(t, txn1.Step(ctx)) + } + + // Read the key twice in the same batch, to demonstrate that regardless of + // isolation level or stepping mode, a single batch observes a single read + // snapshot. + b := txn1.NewBatch() + b.Get(key) + b.Scan(key, key.Next()) + require.NoError(t, txn1.Run(ctx, b)) + require.Equal(t, 2, len(b.Results)) + require.Equal(t, 1, len(b.Results[0].Rows)) + require.Equal(t, 1, len(b.Results[1].Rows)) + require.Equal(t, b.Results[0].Rows[0], b.Results[1].Rows[0]) + readVals = append(readVals, b.Results[0].Rows[0].ValueInt()) + } + + // Commit the transaction. + require.NoError(t, txn1.Commit(ctx)) + + // Verify that the transaction read the correct values. + var expVals []int64 + if expObserveExternalWrites { + expVals = []int64{2, 3, 4} + } else { + expVals = []int64{1, 1, 1} + } + require.Equal(t, expVals, readVals) + } + + for _, isoLevel := range isolation.Levels() { + t.Run(isoLevel.String(), func(t *testing.T) { + testutils.RunTrueAndFalse(t, "steppingMode", func(t *testing.T, modeBool bool) { + mode := kv.SteppingMode(modeBool) + if mode == kv.SteppingEnabled { + // If stepping is enabled, run a variant of the test where the + // transaction is stepped between reads and a variant of the test + // where it is not. 
+ testutils.RunTrueAndFalse(t, "step", func(t *testing.T, step bool) { + // Expect a new read snapshot on each kv operation if the + // transaction is read committed and is manually stepped. + expObserveExternalWrites := isoLevel == isolation.ReadCommitted && step + run(isoLevel, mode, step, expObserveExternalWrites) + }) + } else { + // Expect a new read snapshot on each kv operation if the + // transaction is read committed. + expObserveExternalWrites := isoLevel == isolation.ReadCommitted + run(isoLevel, mode, false, expObserveExternalWrites) + } + }) + }) + } +} + // TestTxnWriteReadConflict verifies that write-read conflicts are non-blocking // to the reader, except when both the writer and reader are both serializable // transactions. In that case, the reader will block until the writer completes. diff --git a/pkg/kv/sender.go b/pkg/kv/sender.go index b4b5e2d8ddb5..da7faa8469fb 100644 --- a/pkg/kv/sender.go +++ b/pkg/kv/sender.go @@ -273,16 +273,23 @@ type TxnSender interface { // GetLeafTxnInitialState() instead when creating leaf transactions. TestingCloneTxn() *roachpb.Transaction - // Step creates a sequencing point in the current transaction. A - // sequencing point establishes a snapshot baseline for subsequent - // read-only operations: until the next sequencing point, read-only - // operations observe the data at the time the snapshot was - // established and ignore writes performed since. + // Step creates an internal sequencing point in the current transaction. An + // internal sequencing point establishes a snapshot baseline for subsequent + // read-only operations of the transaction's own writes: until the next + // sequencing point, read-only operations observe the transaction's writes at + // the time the snapshot was established and ignore writes performed by the + // transaction since. + // + // Additionally, for Read Committed transactions, Step also advances the + // transaction's external read snapshot (i.e. ReadTimestamp) to a timestamp + // captured from the local HLC clock. This ensures that subsequent read-only + // operations observe the writes of other transactions that were committed + // before the time the new snapshot was established. For more detail on the + // interaction between transaction isolation levels and Step, see + // (isolation.Level).PerStatementReadSnapshot. // // Step() can only be called after stepping mode has been enabled // using ConfigureStepping(SteppingEnabled). - // - // The method is idempotent. Step(context.Context) error // GetReadSeqNum gets the read sequence point for the current transaction. @@ -293,10 +300,10 @@ type TxnSender interface { // ConfigureStepping sets the sequencing point behavior. // - // Note that a Sender is initially in the non-stepping mode, - // i.e. uses reads-own-writes by default. This makes the step - // behavior opt-in and backward-compatible with existing code which - // does not need it. + // Note that a Sender is initially in the non-stepping mode, i.e. by default, + // it uses reads-own-writes and, under Read Committed, establishes a new read + // snapshot per batch. This makes the step behavior opt-in and + // backward-compatible with existing code which does not need it. // // Calling ConfigureStepping(SteppingEnabled) when the stepping mode // is currently disabled implies calling Step(), for convenience.
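
A short usage sketch (not part of the patch): the following is distilled from the tests above and shows how a raw KV client would drive per-statement read snapshots under Read Committed via the `Step` API. The package name, function name `readCommittedExample`, and keys are illustrative only; `db` is assumed to be a `*kv.DB`, and the import paths follow the package locations shown in the diff.

package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/isolation"
)

// readCommittedExample issues two "statements" against the same transaction,
// stepping between them so that the second statement reads from a new
// external read snapshot.
func readCommittedExample(ctx context.Context, db *kv.DB) error {
	txn := db.NewTxn(ctx, "rc-example")
	if err := txn.SetIsoLevel(isolation.ReadCommitted); err != nil {
		return err
	}
	// With stepping enabled, the external read snapshot advances only on
	// Step. With stepping disabled (the default for raw KV transactions),
	// each batch would implicitly capture a new read snapshot instead.
	txn.ConfigureStepping(ctx, kv.SteppingEnabled)

	// Statement 1: both gets observe the same read snapshot.
	if _, err := txn.Get(ctx, "k1"); err != nil {
		return err
	}
	if _, err := txn.Get(ctx, "k2"); err != nil {
		return err
	}

	// Statement boundary: advance the internal read seqnum and, because
	// Read Committed has a per-statement read snapshot, capture a new
	// external read snapshot from the local HLC clock.
	if err := txn.Step(ctx); err != nil {
		return err
	}

	// Statement 2: may observe writes committed by other transactions since
	// statement 1 began, without requiring a refresh of the k1/k2 reads.
	if _, err := txn.Get(ctx, "k1"); err != nil {
		return err
	}
	return txn.Commit(ctx)
}

As the tests above demonstrate, a write-write conflict encountered after the Step can then be resolved with a server-side refresh at the new read snapshot rather than surfacing as a client-visible retry.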