From 667e9d5daadb5f523d6e28a3afe0502c0c2e100d Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Fri, 21 Apr 2023 18:34:37 -0400 Subject: [PATCH 1/3] kv: use local TxnMetrics in TestTxnCoordSenderRetries This commit updates TestTxnCoordSenderRetries to use a local kv.DB with a local TxnMetrics for its test transaction. This allows the test to precisely assert on the metrics without having to worry about other transactions in the system affecting them. Epic: None Release note: None --- pkg/kv/db.go | 5 ++ .../kvcoord/dist_sender_server_test.go | 84 +++++++++---------- 2 files changed, 45 insertions(+), 44 deletions(-) diff --git a/pkg/kv/db.go b/pkg/kv/db.go index 9de451e26cbe..ea5126dab707 100644 --- a/pkg/kv/db.go +++ b/pkg/kv/db.go @@ -287,6 +287,11 @@ func (db *DB) Clock() *hlc.Clock { return db.clock } +// Context returns the DB's DBContext. +func (db *DB) Context() DBContext { + return db.ctx +} + // NewDB returns a new DB. func NewDB( actx log.AmbientContext, factory TxnSenderFactory, clock *hlc.Clock, stopper *stop.Stopper, diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go index a1ecdf73e78c..6a9238d6d7d5 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go @@ -41,6 +41,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" @@ -3179,40 +3180,49 @@ func TestTxnCoordSenderRetries(t *testing.T) { } } + filterFn.Store((func(kvserverbase.FilterArgs) *kvpb.Error)(nil)) if tc.filter != nil { filterFn.Store(tc.filter) - defer filterFn.Store((func(kvserverbase.FilterArgs) *kvpb.Error)(nil)) } + refreshSpansCondenseFilter.Store((func() bool)(nil)) if tc.refreshSpansCondenseFilter != nil { refreshSpansCondenseFilter.Store(tc.refreshSpansCondenseFilter) - defer refreshSpansCondenseFilter.Store((func() bool)(nil)) } - var metrics kvcoord.TxnMetrics - var lastAutoRetries int64 - var hadClientRetry bool - epoch := 0 - if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - if tc.priorReads { - _, err := txn.Get(ctx, "prior read") - if err != nil { - t.Fatalf("unexpected error during prior read: %v", err) - } + // Construct a new DB with a fresh set of TxnMetrics. This allows the test + // to precisely assert on the metrics without having to worry about other + // transactions in the system affecting them. + metrics := kvcoord.MakeTxnMetrics(metric.TestSampleInterval) + tcsFactoryCfg := kvcoord.TxnCoordSenderFactoryConfig{ + AmbientCtx: s.AmbientCtx(), + Settings: s.ClusterSettings(), + Clock: s.Clock(), + Stopper: s.Stopper(), + Metrics: metrics, + TestingKnobs: *s.TestingKnobs().KVClient.(*kvcoord.ClientTestingKnobs), + } + distSender := s.DistSenderI().(*kvcoord.DistSender) + tcsFactory := kvcoord.NewTxnCoordSenderFactory(tcsFactoryCfg, distSender) + testDB := kv.NewDBWithContext(s.AmbientCtx(), tcsFactory, s.Clock(), db.Context()) + + err := testDB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + if txn.Epoch() > 0 { + // We expected a new epoch and got it; return success. + return nil } + if tc.tsLeaked { // Read the commit timestamp so the expectation is that // this transaction cannot be restarted internally. _ = txn.CommitTimestamp() } - if epoch > 0 { - if !tc.clientRetry { - t.Fatal("expected txn coord sender to retry, but got client-side retry") + + if tc.priorReads { + _, err := txn.Get(ctx, "prior read") + if err != nil { + t.Fatalf("unexpected error during prior read: %v", err) } - hadClientRetry = true - // We expected a new epoch and got it; return success. - return nil } - defer func() { epoch++ }() if tc.afterTxnStart != nil { if err := tc.afterTxnStart(ctx, db); err != nil { @@ -3220,34 +3230,20 @@ func TestTxnCoordSenderRetries(t *testing.T) { } } - metrics = txn.Sender().(*kvcoord.TxnCoordSender).TxnCoordSenderFactory.Metrics() - lastAutoRetries = metrics.RefreshAutoRetries.Count() - return tc.retryable(ctx, txn) - }); err != nil { - if len(tc.expFailure) == 0 || !testutils.IsError(err, tc.expFailure) { - t.Fatal(err) - } + }) + + // Verify success or failure. + if len(tc.expFailure) == 0 { + require.NoError(t, err) } else { - if len(tc.expFailure) > 0 { - t.Errorf("expected failure %q", tc.expFailure) - } - } - // Verify auto retry metric. Because there's a chance that splits - // from the cluster setup are still ongoing and can experience - // their own retries, this might increase by more than one, so we - // can only check here that it's >= 1. - autoRetries := metrics.RefreshAutoRetries.Count() - lastAutoRetries - if tc.txnCoordRetry && autoRetries == 0 { - t.Errorf("expected [at least] one txn coord sender auto retry; got %d", autoRetries) - } else if !tc.txnCoordRetry && autoRetries != 0 { - t.Errorf("expected no txn coord sender auto retries; got %d", autoRetries) - } - if tc.clientRetry && !hadClientRetry { - t.Errorf("expected but did not experience client retry") - } else if !tc.clientRetry && hadClientRetry { - t.Errorf("did not expect but experienced client retry") + require.Error(t, err) + require.Regexp(t, tc.expFailure, err) } + + // Verify metrics. + require.Equal(t, tc.txnCoordRetry, metrics.RefreshAutoRetries.Count() != 0) + require.Equal(t, tc.clientRetry, metrics.Restarts.TotalSum() != 0) }) } } From 68607244d063cf92e2f878dd3b47e2f489f18104 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Fri, 21 Apr 2023 19:03:49 -0400 Subject: [PATCH 2/3] kv: add preemptive refreshes and 1PCs to TestTxnCoordSenderRetries This commit adds more testing conditions to TestTxnCoordSenderRetries. Epic: None Release note: None --- .../kvcoord/dist_sender_server_test.go | 199 ++++++++++++------ 1 file changed, 131 insertions(+), 68 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go index 6a9238d6d7d5..a8cb2bcd44d3 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go @@ -2059,10 +2059,12 @@ func TestTxnCoordSenderRetries(t *testing.T) { refreshSpansCondenseFilter func() bool priorReads bool tsLeaked bool - // If both of these are false, no retries. - txnCoordRetry bool - clientRetry bool - expFailure string // regexp pattern to match on error if not empty + // Testing expectations. + expClientRefresh bool // pre-emptive or reactive client refresh + expClientAutoRetryAfterRefresh bool // auto-retries of batches after client refresh + expClientRestart bool // client side restarts + expOnePhaseCommit bool // 1PC commits + expFailure string // regexp pattern to match on error, if not empty }{ { name: "forwarded timestamp with get and put", @@ -2074,6 +2076,8 @@ func TestTxnCoordSenderRetries(t *testing.T) { return txn.Put(ctx, "a", "put") // put to advance txn ts }, // No retry, preemptive refresh before commit. + expClientRefresh: true, + expClientAutoRetryAfterRefresh: false, }, { name: "forwarded timestamp with get and put after timestamp leaked", @@ -2084,8 +2088,9 @@ func TestTxnCoordSenderRetries(t *testing.T) { retryable: func(ctx context.Context, txn *kv.Txn) error { return txn.Put(ctx, "a", "put") // put to advance txn ts }, - tsLeaked: true, - clientRetry: true, + tsLeaked: true, + // Cannot refresh, so must restart the transaction. + expClientRestart: true, }, { name: "forwarded timestamp with get and initput", @@ -2096,6 +2101,9 @@ func TestTxnCoordSenderRetries(t *testing.T) { retryable: func(ctx context.Context, txn *kv.Txn) error { return txn.InitPut(ctx, "a", "put", false /* failOnTombstones */) // put to advance txn ts }, + // No retry, preemptive (no-op) refresh before commit. + expClientRefresh: true, + expClientAutoRetryAfterRefresh: false, }, { name: "forwarded timestamp with get and cput", @@ -2106,6 +2114,9 @@ func TestTxnCoordSenderRetries(t *testing.T) { retryable: func(ctx context.Context, txn *kv.Txn) error { return txn.CPut(ctx, "a", "cput", kvclientutils.StrToCPutExistingValue("put")) // cput to advance txn ts, set update span }, + // No retry, preemptive (no-op) refresh before commit. + expClientRefresh: true, + expClientAutoRetryAfterRefresh: false, }, { name: "forwarded timestamp with get and cput after timestamp leaked", @@ -2119,8 +2130,9 @@ func TestTxnCoordSenderRetries(t *testing.T) { retryable: func(ctx context.Context, txn *kv.Txn) error { return txn.CPut(ctx, "a", "cput", kvclientutils.StrToCPutExistingValue("put")) // cput to advance txn ts, set update span }, - tsLeaked: true, - clientRetry: true, + tsLeaked: true, + // Cannot refresh, so must restart the transaction. + expClientRestart: true, }, { name: "forwarded timestamp with scan and cput", @@ -2131,6 +2143,9 @@ func TestTxnCoordSenderRetries(t *testing.T) { retryable: func(ctx context.Context, txn *kv.Txn) error { return txn.CPut(ctx, "ab", "cput", nil) // cput advances, sets update span }, + // No retry, preemptive (no-op) refresh before commit. + expClientRefresh: true, + expClientAutoRetryAfterRefresh: false, }, { name: "forwarded timestamp with delete range", @@ -2143,6 +2158,8 @@ func TestTxnCoordSenderRetries(t *testing.T) { return err }, // No retry, preemptive refresh before commit. + expClientRefresh: true, + expClientAutoRetryAfterRefresh: false, }, { name: "forwarded timestamp with put in batch commit", @@ -2156,6 +2173,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { return txn.CommitInBatch(ctx, b) }, // No retries, server-side refresh, 1pc commit. + expOnePhaseCommit: true, }, { name: "forwarded timestamp with cput in batch commit", @@ -2172,6 +2190,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { return txn.CommitInBatch(ctx, b) }, // No retries, server-side refresh, 1pc commit. + expOnePhaseCommit: true, }, { name: "forwarded timestamp with get before commit", @@ -2188,6 +2207,8 @@ func TestTxnCoordSenderRetries(t *testing.T) { return err }, // No retry, preemptive refresh before get. + expClientRefresh: true, + expClientAutoRetryAfterRefresh: false, }, { name: "forwarded timestamp with scan before commit", @@ -2204,6 +2225,8 @@ func TestTxnCoordSenderRetries(t *testing.T) { return err }, // No retry, preemptive refresh before scan. + expClientRefresh: true, + expClientAutoRetryAfterRefresh: false, }, { name: "forwarded timestamp with get in batch commit", @@ -2221,6 +2244,8 @@ func TestTxnCoordSenderRetries(t *testing.T) { return txn.CommitInBatch(ctx, b) }, // No retry, preemptive refresh before commit. + expClientRefresh: true, + expClientAutoRetryAfterRefresh: false, }, { name: "forwarded timestamp with scan in batch commit", @@ -2238,6 +2263,8 @@ func TestTxnCoordSenderRetries(t *testing.T) { return txn.CommitInBatch(ctx, b) }, // No retry, preemptive refresh before commit. + expClientRefresh: true, + expClientAutoRetryAfterRefresh: false, }, { name: "forwarded timestamp with put and get in batch commit", @@ -2252,7 +2279,8 @@ func TestTxnCoordSenderRetries(t *testing.T) { return txn.CommitInBatch(ctx, b) }, // Read-only request (Get) prevents server-side refresh. - txnCoordRetry: true, + expClientRefresh: true, + expClientAutoRetryAfterRefresh: true, }, { name: "forwarded timestamp with put and scan in batch commit", @@ -2267,7 +2295,8 @@ func TestTxnCoordSenderRetries(t *testing.T) { return txn.CommitInBatch(ctx, b) }, // Read-only request (Scan) prevents server-side refresh. - txnCoordRetry: true, + expClientRefresh: true, + expClientAutoRetryAfterRefresh: true, }, { // If we've exhausted the limit for tracking refresh spans but we @@ -2296,7 +2325,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { }, filter: newUncertaintyFilter(roachpb.Key("a")), // We expect the request to succeed after a server-side retry. - txnCoordRetry: false, + expClientAutoRetryAfterRefresh: false, }, { // Even if accounting for the refresh spans would have exhausted the @@ -2332,6 +2361,8 @@ func TestTxnCoordSenderRetries(t *testing.T) { return txn.CommitInBatch(ctx, b) }, // No retry, preemptive refresh before commit. + expClientRefresh: true, + expClientAutoRetryAfterRefresh: false, }, { // Even if accounting for the refresh spans would have exhausted the @@ -2368,6 +2399,8 @@ func TestTxnCoordSenderRetries(t *testing.T) { return txn.CommitInBatch(ctx, b) }, // No retry, preemptive refresh before commit. + expClientRefresh: true, + expClientAutoRetryAfterRefresh: false, }, { name: "write too old with put", @@ -2386,8 +2419,9 @@ func TestTxnCoordSenderRetries(t *testing.T) { retryable: func(ctx context.Context, txn *kv.Txn) error { return txn.Put(ctx, "a", "put") }, - priorReads: true, - txnCoordRetry: true, + priorReads: true, + expClientRefresh: true, + expClientAutoRetryAfterRefresh: true, }, { name: "write too old with put after timestamp leaked", @@ -2397,8 +2431,8 @@ func TestTxnCoordSenderRetries(t *testing.T) { retryable: func(ctx context.Context, txn *kv.Txn) error { return txn.Put(ctx, "a", "put") }, - tsLeaked: true, - clientRetry: true, + tsLeaked: true, + expClientRestart: true, }, { name: "write too old with get in the clear", @@ -2411,7 +2445,8 @@ func TestTxnCoordSenderRetries(t *testing.T) { } return txn.Put(ctx, "a", "put") }, - txnCoordRetry: true, + expClientRefresh: true, + expClientAutoRetryAfterRefresh: true, }, { name: "write too old with get conflict", @@ -2424,7 +2459,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { } return txn.Put(ctx, "a", "put") }, - clientRetry: true, + expClientRestart: true, }, { name: "write too old with multiple puts to same key", @@ -2452,7 +2487,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { // out-of-band Put's value would be missed (see #23032). return txn.Put(ctx, "a", "txn-value2") }, - clientRetry: true, // expect a client-side retry as refresh should fail + expClientRestart: true, // expect a client-side retry as refresh should fail }, { name: "write too old with cput matching newer value", @@ -2465,8 +2500,8 @@ func TestTxnCoordSenderRetries(t *testing.T) { retryable: func(ctx context.Context, txn *kv.Txn) error { return txn.CPut(ctx, "a", "cput", kvclientutils.StrToCPutExistingValue("put")) }, - txnCoordRetry: false, // fails on first attempt at cput - expFailure: "unexpected value", // the failure we get is a condition failed error + expClientAutoRetryAfterRefresh: false, // fails on first attempt at cput + expFailure: "unexpected value", // the failure we get is a condition failed error }, { name: "write too old with cput matching older value", @@ -2479,8 +2514,8 @@ func TestTxnCoordSenderRetries(t *testing.T) { retryable: func(ctx context.Context, txn *kv.Txn) error { return txn.CPut(ctx, "a", "cput", kvclientutils.StrToCPutExistingValue("value")) }, - txnCoordRetry: false, // non-matching value means we fail txn coord retry - expFailure: "unexpected value", // the failure we get is a condition failed error + expClientAutoRetryAfterRefresh: false, // non-matching value means we fail txn coord retry + expFailure: "unexpected value", // the failure we get is a condition failed error }, { name: "write too old with cput matching older and newer values", @@ -2505,8 +2540,9 @@ func TestTxnCoordSenderRetries(t *testing.T) { retryable: func(ctx context.Context, txn *kv.Txn) error { return txn.CPut(ctx, "a", "cput", kvclientutils.StrToCPutExistingValue("value")) }, - priorReads: true, - txnCoordRetry: true, + priorReads: true, + expClientRefresh: true, + expClientAutoRetryAfterRefresh: true, }, { name: "write too old with increment", @@ -2549,8 +2585,9 @@ func TestTxnCoordSenderRetries(t *testing.T) { } return nil }, - priorReads: true, - txnCoordRetry: true, + priorReads: true, + expClientRefresh: true, + expClientAutoRetryAfterRefresh: true, }, { name: "write too old with initput", @@ -2569,8 +2606,9 @@ func TestTxnCoordSenderRetries(t *testing.T) { retryable: func(ctx context.Context, txn *kv.Txn) error { return txn.InitPut(ctx, "iput", "put", false) }, - priorReads: true, - txnCoordRetry: true, // fails on first attempt at cput with write too old + priorReads: true, + expClientRefresh: true, + expClientAutoRetryAfterRefresh: true, // fails on first attempt at cput with write too old // Succeeds on second attempt. }, { @@ -2598,7 +2636,8 @@ func TestTxnCoordSenderRetries(t *testing.T) { }, priorReads: true, // Expect a transaction coord retry, which should succeed. - txnCoordRetry: true, + expClientRefresh: true, + expClientAutoRetryAfterRefresh: true, }, { name: "write too old with initput matching older value", @@ -2611,8 +2650,8 @@ func TestTxnCoordSenderRetries(t *testing.T) { retryable: func(ctx context.Context, txn *kv.Txn) error { return txn.InitPut(ctx, "iput", "put1", false) }, - txnCoordRetry: false, // non-matching value means we fail txn coord retry - expFailure: "unexpected value", // the failure we get is a condition failed error + expClientAutoRetryAfterRefresh: false, // non-matching value means we fail txn coord retry + expFailure: "unexpected value", // the failure we get is a condition failed error }, { name: "write too old with initput matching newer value", @@ -2654,8 +2693,8 @@ func TestTxnCoordSenderRetries(t *testing.T) { retryable: func(ctx context.Context, txn *kv.Txn) error { return txn.InitPut(ctx, "iput", "put", true) }, - txnCoordRetry: false, // non-matching value means we fail txn coord retry - expFailure: "unexpected value", // condition failed error when failing on tombstones + expClientAutoRetryAfterRefresh: false, // non-matching value means we fail txn coord retry + expFailure: "unexpected value", // condition failed error when failing on tombstones }, { name: "write too old with locking read", @@ -2666,6 +2705,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { _, err := txn.ScanForUpdate(ctx, "a", "a\x00", 0) return err }, + expOnePhaseCommit: true, }, { name: "write too old with locking read after prior read", @@ -2676,8 +2716,10 @@ func TestTxnCoordSenderRetries(t *testing.T) { _, err := txn.ScanForUpdate(ctx, "a", "a\x00", 0) return err }, - priorReads: true, - txnCoordRetry: true, + priorReads: true, + expClientRefresh: true, + expClientAutoRetryAfterRefresh: true, + expOnePhaseCommit: true, }, { name: "write too old with multi-range locking read (err on first range)", @@ -2688,7 +2730,9 @@ func TestTxnCoordSenderRetries(t *testing.T) { _, err := txn.ScanForUpdate(ctx, "a", "c", 0) return err }, - txnCoordRetry: true, + expClientRefresh: true, + expClientAutoRetryAfterRefresh: true, + expOnePhaseCommit: true, }, { name: "write too old with multi-range locking read (err on second range)", @@ -2699,7 +2743,9 @@ func TestTxnCoordSenderRetries(t *testing.T) { _, err := txn.ScanForUpdate(ctx, "a", "c", 0) return err }, - txnCoordRetry: true, + expClientRefresh: true, + expClientAutoRetryAfterRefresh: true, + expOnePhaseCommit: true, }, { name: "write too old with multi-range batch of locking reads", @@ -2712,7 +2758,9 @@ func TestTxnCoordSenderRetries(t *testing.T) { b.ScanForUpdate("b", "b\x00") return txn.Run(ctx, b) }, - txnCoordRetry: true, + expClientRefresh: true, + expClientAutoRetryAfterRefresh: true, + expOnePhaseCommit: true, }, { name: "write too old with delete range after prior read on other key", @@ -2730,7 +2778,8 @@ func TestTxnCoordSenderRetries(t *testing.T) { _, err := txn.DelRange(ctx, "a", "b", false /* returnKeys */) return err }, - txnCoordRetry: true, // can refresh + expClientRefresh: true, + expClientAutoRetryAfterRefresh: true, // can refresh }, { name: "write too old with delete range after prior read on same key", @@ -2748,7 +2797,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { _, err := txn.DelRange(ctx, "a", "b", false /* returnKeys */) return err }, - clientRetry: true, // can't refresh + expClientRestart: true, // can't refresh }, { // This test sends a 1PC batch with Put+EndTxn. @@ -2764,6 +2813,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { return txn.CommitInBatch(ctx, b) // will be a 1PC, won't get auto retry }, // No retries, server-side refresh, 1pc commit. + expOnePhaseCommit: true, }, { // This test is like the previous one in that the commit batch succeeds at @@ -2788,7 +2838,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { return txn.CommitInBatch(ctx, b) }, // The request will succeed after a server-side refresh. - txnCoordRetry: false, + expClientAutoRetryAfterRefresh: false, }, { name: "write too old with cput in batch commit", @@ -2808,6 +2858,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { // WriteTooOldError, and then once at the pushed timestamp. The // server-side retry is enabled by the fact that there have not been any // previous reads and so the transaction can commit at a pushed timestamp. + expOnePhaseCommit: true, }, { // This test is like the previous one, except the 1PC batch cannot commit @@ -2838,7 +2889,8 @@ func TestTxnCoordSenderRetries(t *testing.T) { b.Put("c", "put") return txn.CommitInBatch(ctx, b) }, - txnCoordRetry: true, + expClientRefresh: true, + expClientAutoRetryAfterRefresh: true, }, { name: "multi-range batch with forwarded timestamp and cput", @@ -2874,7 +2926,8 @@ func TestTxnCoordSenderRetries(t *testing.T) { b.Put("c", "put") return txn.CommitInBatch(ctx, b) // both puts will succeed, et will retry from get }, - txnCoordRetry: true, + expClientRefresh: true, + expClientAutoRetryAfterRefresh: true, }, { name: "multi-range batch with forwarded timestamp and cput and delete range", @@ -2891,7 +2944,8 @@ func TestTxnCoordSenderRetries(t *testing.T) { b.CPut("c", "cput", kvclientutils.StrToCPutExistingValue("value")) return txn.CommitInBatch(ctx, b) // both puts will succeed, et will retry }, - txnCoordRetry: true, + expClientRefresh: true, + expClientAutoRetryAfterRefresh: true, }, { name: "multi-range batch with write too old", @@ -2904,7 +2958,8 @@ func TestTxnCoordSenderRetries(t *testing.T) { b.Put("c", "put") return txn.CommitInBatch(ctx, b) // put to c will return WriteTooOldError }, - txnCoordRetry: true, + expClientRefresh: true, + expClientAutoRetryAfterRefresh: true, }, { name: "multi-range batch with write too old and failed cput", @@ -2920,8 +2975,8 @@ func TestTxnCoordSenderRetries(t *testing.T) { b.Put("c", "put") return txn.CommitInBatch(ctx, b) }, - txnCoordRetry: false, // non-matching value means we fail txn coord retry - expFailure: "unexpected value", // the failure we get is a condition failed error + expClientAutoRetryAfterRefresh: false, // non-matching value means we fail txn coord retry + expFailure: "unexpected value", // the failure we get is a condition failed error }, { name: "multi-range batch with write too old and successful cput", @@ -2938,7 +2993,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { return txn.CommitInBatch(ctx, b) }, // We expect the request to succeed after a server-side retry. - txnCoordRetry: false, + expClientAutoRetryAfterRefresh: false, }, { // This test checks the behavior of batches that were split by the @@ -2976,7 +3031,8 @@ func TestTxnCoordSenderRetries(t *testing.T) { } return txn.Commit(ctx) }, - txnCoordRetry: true, + expClientRefresh: true, + expClientAutoRetryAfterRefresh: true, }, { name: "cput within uncertainty interval", @@ -2988,7 +3044,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { }, filter: newUncertaintyFilter(roachpb.Key("a")), // We expect the request to succeed after a server-side retry. - txnCoordRetry: false, + expClientAutoRetryAfterRefresh: false, }, { name: "cput within uncertainty interval after timestamp leaked", @@ -2998,9 +3054,9 @@ func TestTxnCoordSenderRetries(t *testing.T) { retryable: func(ctx context.Context, txn *kv.Txn) error { return txn.CPut(ctx, "a", "cput", kvclientutils.StrToCPutExistingValue("value")) }, - filter: newUncertaintyFilter(roachpb.Key("a")), - clientRetry: true, - tsLeaked: true, + filter: newUncertaintyFilter(roachpb.Key("a")), + expClientRestart: true, + tsLeaked: true, }, { name: "reads within uncertainty interval", @@ -3019,8 +3075,9 @@ func TestTxnCoordSenderRetries(t *testing.T) { } return txn.CPut(ctx, "a", "cput", kvclientutils.StrToCPutExistingValue("value")) }, - filter: newUncertaintyFilter(roachpb.Key("ac")), - txnCoordRetry: true, + filter: newUncertaintyFilter(roachpb.Key("ac")), + expClientRefresh: true, + expClientAutoRetryAfterRefresh: true, }, { name: "reads within uncertainty interval and violating concurrent put", @@ -3042,8 +3099,8 @@ func TestTxnCoordSenderRetries(t *testing.T) { } return nil }, - filter: newUncertaintyFilter(roachpb.Key("ac")), - clientRetry: true, // note this txn is read-only but still restarts + filter: newUncertaintyFilter(roachpb.Key("ac")), + expClientRestart: true, // note this txn is read-only but still restarts }, { name: "multi-range batch with uncertainty interval error", @@ -3058,8 +3115,9 @@ func TestTxnCoordSenderRetries(t *testing.T) { b.CPut("c", "cput", kvclientutils.StrToCPutExistingValue("value")) return txn.CommitInBatch(ctx, b) }, - filter: newUncertaintyFilter(roachpb.Key("c")), - txnCoordRetry: true, + filter: newUncertaintyFilter(roachpb.Key("c")), + expClientRefresh: true, + expClientAutoRetryAfterRefresh: true, }, { name: "multi-range batch with uncertainty interval error and get conflict", @@ -3080,8 +3138,8 @@ func TestTxnCoordSenderRetries(t *testing.T) { b.CPut("a", "cput", kvclientutils.StrToCPutExistingValue("value")) return txn.CommitInBatch(ctx, b) }, - filter: newUncertaintyFilter(roachpb.Key("a")), - clientRetry: true, // will fail because of conflict on refresh span for the Get + filter: newUncertaintyFilter(roachpb.Key("a")), + expClientRestart: true, // will fail because of conflict on refresh span for the Get }, { name: "multi-range batch with uncertainty interval error and mixed success", @@ -3096,7 +3154,8 @@ func TestTxnCoordSenderRetries(t *testing.T) { }, filter: newUncertaintyFilter(roachpb.Key("c")), // Expect a transaction coord retry, which should succeed. - txnCoordRetry: true, + expClientRefresh: true, + expClientAutoRetryAfterRefresh: true, }, { name: "multi-range scan with uncertainty interval error", @@ -3106,7 +3165,8 @@ func TestTxnCoordSenderRetries(t *testing.T) { }, filter: newUncertaintyFilter(roachpb.Key("c")), // Expect a transaction coord retry, which should succeed. - txnCoordRetry: true, + expClientRefresh: true, + expClientAutoRetryAfterRefresh: true, }, { name: "multi-range delete range with uncertainty interval error", @@ -3116,7 +3176,8 @@ func TestTxnCoordSenderRetries(t *testing.T) { }, filter: newUncertaintyFilter(roachpb.Key("c")), // Expect a transaction coord retry, which should succeed. - txnCoordRetry: true, + expClientRefresh: true, + expClientAutoRetryAfterRefresh: true, }, { name: "missing pipelined write caught on chain", @@ -3143,7 +3204,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { return err }, // The missing intent write results in a RETRY_ASYNC_WRITE_FAILURE error. - clientRetry: true, + expClientRestart: true, }, { name: "missing pipelined write caught on commit", @@ -3168,7 +3229,7 @@ func TestTxnCoordSenderRetries(t *testing.T) { return nil // commit }, // The missing intent write results in a RETRY_ASYNC_WRITE_FAILURE error. - clientRetry: true, + expClientRestart: true, }, } @@ -3242,8 +3303,10 @@ func TestTxnCoordSenderRetries(t *testing.T) { } // Verify metrics. - require.Equal(t, tc.txnCoordRetry, metrics.RefreshAutoRetries.Count() != 0) - require.Equal(t, tc.clientRetry, metrics.Restarts.TotalSum() != 0) + require.Equal(t, tc.expClientRefresh, metrics.RefreshSuccess.Count() != 0, "TxnMetrics.RefreshSuccess") + require.Equal(t, tc.expClientAutoRetryAfterRefresh, metrics.RefreshAutoRetries.Count() != 0, "TxnMetrics.RefreshAutoRetries") + require.Equal(t, tc.expClientRestart, metrics.Restarts.TotalSum() != 0, "TxnMetrics.Restarts") + require.Equal(t, tc.expOnePhaseCommit, metrics.Commits1PC.Count() != 0, "TxnMetrics.Commits1PC") }) } } From c59d66a986cdaf89bd98309c65f02026f2b0e296 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Sat, 29 Apr 2023 21:55:45 -0500 Subject: [PATCH 3/3] kv: remove redundant test case in TestTxnCoordSenderRetries This was the same as the "write too old with put after prior read" case. Epic: None Release note: None --- pkg/kv/kvclient/kvcoord/dist_sender_server_test.go | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go index a8cb2bcd44d3..265ef32f1913 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go @@ -2434,20 +2434,6 @@ func TestTxnCoordSenderRetries(t *testing.T) { tsLeaked: true, expClientRestart: true, }, - { - name: "write too old with get in the clear", - afterTxnStart: func(ctx context.Context, db *kv.DB) error { - return db.Put(ctx, "a", "put") - }, - retryable: func(ctx context.Context, txn *kv.Txn) error { - if _, err := txn.Get(ctx, "b"); err != nil { - return err - } - return txn.Put(ctx, "a", "put") - }, - expClientRefresh: true, - expClientAutoRetryAfterRefresh: true, - }, { name: "write too old with get conflict", afterTxnStart: func(ctx context.Context, db *kv.DB) error {