diff --git a/pkg/kv/db.go b/pkg/kv/db.go
index 9de451e26cbe..ea5126dab707 100644
--- a/pkg/kv/db.go
+++ b/pkg/kv/db.go
@@ -287,6 +287,11 @@ func (db *DB) Clock() *hlc.Clock {
 	return db.clock
 }
 
+// Context returns the DB's DBContext.
+func (db *DB) Context() DBContext {
+	return db.ctx
+}
+
 // NewDB returns a new DB.
 func NewDB(
 	actx log.AmbientContext, factory TxnSenderFactory, clock *hlc.Clock, stopper *stop.Stopper,
diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
index a1ecdf73e78c..265ef32f1913 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
@@ -41,6 +41,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/cockroach/pkg/util/metric"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/errors"
 	"github.com/stretchr/testify/require"
@@ -2058,10 +2059,12 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 		refreshSpansCondenseFilter func() bool
 		priorReads                 bool
 		tsLeaked                   bool
-		// If both of these are false, no retries.
-		txnCoordRetry bool
-		clientRetry   bool
-		expFailure    string // regexp pattern to match on error if not empty
+		// Testing expectations.
+		expClientRefresh               bool   // pre-emptive or reactive client refresh
+		expClientAutoRetryAfterRefresh bool   // auto-retries of batches after client refresh
+		expClientRestart               bool   // client side restarts
+		expOnePhaseCommit              bool   // 1PC commits
+		expFailure                     string // regexp pattern to match on error, if not empty
 	}{
 		{
 			name: "forwarded timestamp with get and put",
@@ -2073,6 +2076,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				return txn.Put(ctx, "a", "put") // put to advance txn ts
 			},
 			// No retry, preemptive refresh before commit.
+			expClientRefresh:               true,
+			expClientAutoRetryAfterRefresh: false,
 		},
 		{
 			name: "forwarded timestamp with get and put after timestamp leaked",
@@ -2083,8 +2088,9 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 			retryable: func(ctx context.Context, txn *kv.Txn) error {
 				return txn.Put(ctx, "a", "put") // put to advance txn ts
 			},
-			tsLeaked:    true,
-			clientRetry: true,
+			tsLeaked: true,
+			// Cannot refresh, so must restart the transaction.
+			expClientRestart: true,
 		},
 		{
 			name: "forwarded timestamp with get and initput",
@@ -2095,6 +2101,9 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 			retryable: func(ctx context.Context, txn *kv.Txn) error {
 				return txn.InitPut(ctx, "a", "put", false /* failOnTombstones */) // put to advance txn ts
 			},
+			// No retry, preemptive (no-op) refresh before commit.
+			expClientRefresh:               true,
+			expClientAutoRetryAfterRefresh: false,
 		},
 		{
 			name: "forwarded timestamp with get and cput",
@@ -2105,6 +2114,9 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 			retryable: func(ctx context.Context, txn *kv.Txn) error {
 				return txn.CPut(ctx, "a", "cput", kvclientutils.StrToCPutExistingValue("put")) // cput to advance txn ts, set update span
 			},
+			// No retry, preemptive (no-op) refresh before commit.
+			expClientRefresh:               true,
+			expClientAutoRetryAfterRefresh: false,
 		},
 		{
 			name: "forwarded timestamp with get and cput after timestamp leaked",
@@ -2118,8 +2130,9 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 			retryable: func(ctx context.Context, txn *kv.Txn) error {
 				return txn.CPut(ctx, "a", "cput", kvclientutils.StrToCPutExistingValue("put")) // cput to advance txn ts, set update span
 			},
-			tsLeaked:    true,
-			clientRetry: true,
+			tsLeaked: true,
+			// Cannot refresh, so must restart the transaction.
+			expClientRestart: true,
 		},
 		{
 			name: "forwarded timestamp with scan and cput",
@@ -2130,6 +2143,9 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 			retryable: func(ctx context.Context, txn *kv.Txn) error {
 				return txn.CPut(ctx, "ab", "cput", nil) // cput advances, sets update span
 			},
+			// No retry, preemptive (no-op) refresh before commit.
+			expClientRefresh:               true,
+			expClientAutoRetryAfterRefresh: false,
 		},
 		{
 			name: "forwarded timestamp with delete range",
@@ -2142,6 +2158,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				return err
 			},
 			// No retry, preemptive refresh before commit.
+			expClientRefresh:               true,
+			expClientAutoRetryAfterRefresh: false,
 		},
 		{
 			name: "forwarded timestamp with put in batch commit",
@@ -2155,6 +2173,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				return txn.CommitInBatch(ctx, b)
 			},
 			// No retries, server-side refresh, 1pc commit.
+			expOnePhaseCommit: true,
 		},
 		{
 			name: "forwarded timestamp with cput in batch commit",
@@ -2171,6 +2190,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				return txn.CommitInBatch(ctx, b)
 			},
 			// No retries, server-side refresh, 1pc commit.
+			expOnePhaseCommit: true,
 		},
 		{
 			name: "forwarded timestamp with get before commit",
@@ -2187,6 +2207,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				return err
 			},
 			// No retry, preemptive refresh before get.
+			expClientRefresh:               true,
+			expClientAutoRetryAfterRefresh: false,
 		},
 		{
 			name: "forwarded timestamp with scan before commit",
@@ -2203,6 +2225,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				return err
 			},
 			// No retry, preemptive refresh before scan.
+			expClientRefresh:               true,
+			expClientAutoRetryAfterRefresh: false,
 		},
 		{
 			name: "forwarded timestamp with get in batch commit",
@@ -2220,6 +2244,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				return txn.CommitInBatch(ctx, b)
 			},
 			// No retry, preemptive refresh before commit.
+			expClientRefresh:               true,
+			expClientAutoRetryAfterRefresh: false,
 		},
 		{
 			name: "forwarded timestamp with scan in batch commit",
@@ -2237,6 +2263,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				return txn.CommitInBatch(ctx, b)
 			},
 			// No retry, preemptive refresh before commit.
+			expClientRefresh:               true,
+			expClientAutoRetryAfterRefresh: false,
 		},
 		{
 			name: "forwarded timestamp with put and get in batch commit",
@@ -2251,7 +2279,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				return txn.CommitInBatch(ctx, b)
 			},
 			// Read-only request (Get) prevents server-side refresh.
-			txnCoordRetry: true,
+			expClientRefresh:               true,
+			expClientAutoRetryAfterRefresh: true,
 		},
 		{
 			name: "forwarded timestamp with put and scan in batch commit",
@@ -2266,7 +2295,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				return txn.CommitInBatch(ctx, b)
 			},
 			// Read-only request (Scan) prevents server-side refresh.
-			txnCoordRetry: true,
+			expClientRefresh:               true,
+			expClientAutoRetryAfterRefresh: true,
 		},
 		{
 			// If we've exhausted the limit for tracking refresh spans but we
@@ -2295,7 +2325,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 			},
 			filter: newUncertaintyFilter(roachpb.Key("a")),
 			// We expect the request to succeed after a server-side retry.
-			txnCoordRetry: false,
+			expClientAutoRetryAfterRefresh: false,
 		},
 		{
 			// Even if accounting for the refresh spans would have exhausted the
@@ -2331,6 +2361,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				return txn.CommitInBatch(ctx, b)
 			},
 			// No retry, preemptive refresh before commit.
+			expClientRefresh:               true,
+			expClientAutoRetryAfterRefresh: false,
 		},
 		{
 			// Even if accounting for the refresh spans would have exhausted the
@@ -2367,6 +2399,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				return txn.CommitInBatch(ctx, b)
 			},
 			// No retry, preemptive refresh before commit.
+			expClientRefresh:               true,
+			expClientAutoRetryAfterRefresh: false,
 		},
 		{
 			name: "write too old with put",
@@ -2385,8 +2419,9 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 			retryable: func(ctx context.Context, txn *kv.Txn) error {
 				return txn.Put(ctx, "a", "put")
 			},
-			priorReads:    true,
-			txnCoordRetry: true,
+			priorReads:                     true,
+			expClientRefresh:               true,
+			expClientAutoRetryAfterRefresh: true,
 		},
 		{
 			name: "write too old with put after timestamp leaked",
@@ -2396,21 +2431,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 			retryable: func(ctx context.Context, txn *kv.Txn) error {
 				return txn.Put(ctx, "a", "put")
 			},
-			tsLeaked:    true,
-			clientRetry: true,
-		},
-		{
-			name: "write too old with get in the clear",
-			afterTxnStart: func(ctx context.Context, db *kv.DB) error {
-				return db.Put(ctx, "a", "put")
-			},
-			retryable: func(ctx context.Context, txn *kv.Txn) error {
-				if _, err := txn.Get(ctx, "b"); err != nil {
-					return err
-				}
-				return txn.Put(ctx, "a", "put")
-			},
-			txnCoordRetry: true,
+			tsLeaked:         true,
+			expClientRestart: true,
 		},
 		{
 			name: "write too old with get conflict",
@@ -2423,7 +2445,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				}
 				return txn.Put(ctx, "a", "put")
 			},
-			clientRetry: true,
+			expClientRestart: true,
 		},
 		{
 			name: "write too old with multiple puts to same key",
@@ -2451,7 +2473,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				// out-of-band Put's value would be missed (see #23032).
 				return txn.Put(ctx, "a", "txn-value2")
 			},
-			clientRetry: true, // expect a client-side retry as refresh should fail
+			expClientRestart: true, // expect a client-side retry as refresh should fail
 		},
 		{
 			name: "write too old with cput matching newer value",
@@ -2464,8 +2486,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 			retryable: func(ctx context.Context, txn *kv.Txn) error {
 				return txn.CPut(ctx, "a", "cput", kvclientutils.StrToCPutExistingValue("put"))
 			},
-			txnCoordRetry: false,              // fails on first attempt at cput
-			expFailure:    "unexpected value", // the failure we get is a condition failed error
+			expClientAutoRetryAfterRefresh: false,              // fails on first attempt at cput
+			expFailure:                     "unexpected value", // the failure we get is a condition failed error
 		},
 		{
 			name: "write too old with cput matching older value",
@@ -2478,8 +2500,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 			retryable: func(ctx context.Context, txn *kv.Txn) error {
 				return txn.CPut(ctx, "a", "cput", kvclientutils.StrToCPutExistingValue("value"))
 			},
-			txnCoordRetry: false,              // non-matching value means we fail txn coord retry
-			expFailure:    "unexpected value", // the failure we get is a condition failed error
+			expClientAutoRetryAfterRefresh: false,              // non-matching value means we fail txn coord retry
+			expFailure:                     "unexpected value", // the failure we get is a condition failed error
 		},
 		{
 			name: "write too old with cput matching older and newer values",
@@ -2504,8 +2526,9 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 			retryable: func(ctx context.Context, txn *kv.Txn) error {
 				return txn.CPut(ctx, "a", "cput", kvclientutils.StrToCPutExistingValue("value"))
 			},
-			priorReads:    true,
-			txnCoordRetry: true,
+			priorReads:                     true,
+			expClientRefresh:               true,
+			expClientAutoRetryAfterRefresh: true,
 		},
 		{
 			name: "write too old with increment",
@@ -2548,8 +2571,9 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				}
 				return nil
 			},
-			priorReads:    true,
-			txnCoordRetry: true,
+			priorReads:                     true,
+			expClientRefresh:               true,
+			expClientAutoRetryAfterRefresh: true,
 		},
 		{
 			name: "write too old with initput",
@@ -2568,8 +2592,9 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 			retryable: func(ctx context.Context, txn *kv.Txn) error {
 				return txn.InitPut(ctx, "iput", "put", false)
 			},
-			priorReads:    true,
-			txnCoordRetry: true, // fails on first attempt at cput with write too old
+			priorReads:                     true,
+			expClientRefresh:               true,
+			expClientAutoRetryAfterRefresh: true, // fails on first attempt at cput with write too old
 			// Succeeds on second attempt.
 		},
 		{
@@ -2597,7 +2622,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 			},
 			priorReads: true,
 			// Expect a transaction coord retry, which should succeed.
-			txnCoordRetry: true,
+			expClientRefresh:               true,
+			expClientAutoRetryAfterRefresh: true,
 		},
 		{
 			name: "write too old with initput matching older value",
@@ -2610,8 +2636,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 			retryable: func(ctx context.Context, txn *kv.Txn) error {
 				return txn.InitPut(ctx, "iput", "put1", false)
 			},
-			txnCoordRetry: false,              // non-matching value means we fail txn coord retry
-			expFailure:    "unexpected value", // the failure we get is a condition failed error
+			expClientAutoRetryAfterRefresh: false,              // non-matching value means we fail txn coord retry
+			expFailure:                     "unexpected value", // the failure we get is a condition failed error
 		},
 		{
 			name: "write too old with initput matching newer value",
@@ -2653,8 +2679,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 			retryable: func(ctx context.Context, txn *kv.Txn) error {
 				return txn.InitPut(ctx, "iput", "put", true)
 			},
-			txnCoordRetry: false,              // non-matching value means we fail txn coord retry
-			expFailure:    "unexpected value", // condition failed error when failing on tombstones
+			expClientAutoRetryAfterRefresh: false,              // non-matching value means we fail txn coord retry
+			expFailure:                     "unexpected value", // condition failed error when failing on tombstones
 		},
 		{
 			name: "write too old with locking read",
@@ -2665,6 +2691,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				_, err := txn.ScanForUpdate(ctx, "a", "a\x00", 0)
 				return err
 			},
+			expOnePhaseCommit: true,
 		},
 		{
 			name: "write too old with locking read after prior read",
@@ -2675,8 +2702,10 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				_, err := txn.ScanForUpdate(ctx, "a", "a\x00", 0)
 				return err
 			},
-			priorReads:    true,
-			txnCoordRetry: true,
+			priorReads:                     true,
+			expClientRefresh:               true,
+			expClientAutoRetryAfterRefresh: true,
+			expOnePhaseCommit:              true,
 		},
 		{
 			name: "write too old with multi-range locking read (err on first range)",
@@ -2687,7 +2716,9 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				_, err := txn.ScanForUpdate(ctx, "a", "c", 0)
 				return err
 			},
-			txnCoordRetry: true,
+			expClientRefresh:               true,
+			expClientAutoRetryAfterRefresh: true,
+			expOnePhaseCommit:              true,
 		},
 		{
 			name: "write too old with multi-range locking read (err on second range)",
@@ -2698,7 +2729,9 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				_, err := txn.ScanForUpdate(ctx, "a", "c", 0)
 				return err
 			},
-			txnCoordRetry: true,
+			expClientRefresh:               true,
+			expClientAutoRetryAfterRefresh: true,
+			expOnePhaseCommit:              true,
 		},
 		{
 			name: "write too old with multi-range batch of locking reads",
@@ -2711,7 +2744,9 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				b.ScanForUpdate("b", "b\x00")
 				return txn.Run(ctx, b)
 			},
-			txnCoordRetry: true,
+			expClientRefresh:               true,
+			expClientAutoRetryAfterRefresh: true,
+			expOnePhaseCommit:              true,
 		},
 		{
 			name: "write too old with delete range after prior read on other key",
@@ -2729,7 +2764,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				_, err := txn.DelRange(ctx, "a", "b", false /* returnKeys */)
 				return err
 			},
-			txnCoordRetry: true, // can refresh
+			expClientRefresh:               true,
+			expClientAutoRetryAfterRefresh: true, // can refresh
 		},
 		{
 			name: "write too old with delete range after prior read on same key",
@@ -2747,7 +2783,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				_, err := txn.DelRange(ctx, "a", "b", false /* returnKeys */)
 				return err
 			},
-			clientRetry: true, // can't refresh
+			expClientRestart: true, // can't refresh
 		},
 		{
 			// This test sends a 1PC batch with Put+EndTxn.
@@ -2763,6 +2799,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				return txn.CommitInBatch(ctx, b) // will be a 1PC, won't get auto retry
 			},
 			// No retries, server-side refresh, 1pc commit.
+			expOnePhaseCommit: true,
 		},
 		{
 			// This test is like the previous one in that the commit batch succeeds at
@@ -2787,7 +2824,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				return txn.CommitInBatch(ctx, b)
 			},
 			// The request will succeed after a server-side refresh.
-			txnCoordRetry: false,
+			expClientAutoRetryAfterRefresh: false,
 		},
 		{
 			name: "write too old with cput in batch commit",
@@ -2807,6 +2844,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 			// WriteTooOldError, and then once at the pushed timestamp. The
 			// server-side retry is enabled by the fact that there have not been any
 			// previous reads and so the transaction can commit at a pushed timestamp.
+			expOnePhaseCommit: true,
 		},
 		{
 			// This test is like the previous one, except the 1PC batch cannot commit
@@ -2837,7 +2875,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				b.Put("c", "put")
 				return txn.CommitInBatch(ctx, b)
 			},
-			txnCoordRetry: true,
+			expClientRefresh:               true,
+			expClientAutoRetryAfterRefresh: true,
 		},
 		{
 			name: "multi-range batch with forwarded timestamp and cput",
@@ -2873,7 +2912,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				b.Put("c", "put")
 				return txn.CommitInBatch(ctx, b) // both puts will succeed, et will retry from get
 			},
-			txnCoordRetry: true,
+			expClientRefresh:               true,
+			expClientAutoRetryAfterRefresh: true,
 		},
 		{
 			name: "multi-range batch with forwarded timestamp and cput and delete range",
@@ -2890,7 +2930,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				b.CPut("c", "cput", kvclientutils.StrToCPutExistingValue("value"))
 				return txn.CommitInBatch(ctx, b) // both puts will succeed, et will retry
 			},
-			txnCoordRetry: true,
+			expClientRefresh:               true,
+			expClientAutoRetryAfterRefresh: true,
 		},
 		{
 			name: "multi-range batch with write too old",
@@ -2903,7 +2944,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				b.Put("c", "put")
 				return txn.CommitInBatch(ctx, b) // put to c will return WriteTooOldError
 			},
-			txnCoordRetry: true,
+			expClientRefresh:               true,
+			expClientAutoRetryAfterRefresh: true,
 		},
 		{
 			name: "multi-range batch with write too old and failed cput",
@@ -2919,8 +2961,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				b.Put("c", "put")
 				return txn.CommitInBatch(ctx, b)
 			},
-			txnCoordRetry: false,              // non-matching value means we fail txn coord retry
-			expFailure:    "unexpected value", // the failure we get is a condition failed error
+			expClientAutoRetryAfterRefresh: false,              // non-matching value means we fail txn coord retry
+			expFailure:                     "unexpected value", // the failure we get is a condition failed error
 		},
 		{
 			name: "multi-range batch with write too old and successful cput",
@@ -2937,7 +2979,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				return txn.CommitInBatch(ctx, b)
 			},
 			// We expect the request to succeed after a server-side retry.
-			txnCoordRetry: false,
+			expClientAutoRetryAfterRefresh: false,
 		},
 		{
 			// This test checks the behavior of batches that were split by the
@@ -2975,7 +3017,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				}
 				return txn.Commit(ctx)
 			},
-			txnCoordRetry: true,
+			expClientRefresh:               true,
+			expClientAutoRetryAfterRefresh: true,
 		},
 		{
 			name: "cput within uncertainty interval",
@@ -2987,7 +3030,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 			},
 			filter: newUncertaintyFilter(roachpb.Key("a")),
 			// We expect the request to succeed after a server-side retry.
-			txnCoordRetry: false,
+			expClientAutoRetryAfterRefresh: false,
 		},
 		{
 			name: "cput within uncertainty interval after timestamp leaked",
@@ -2997,9 +3040,9 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 			retryable: func(ctx context.Context, txn *kv.Txn) error {
 				return txn.CPut(ctx, "a", "cput", kvclientutils.StrToCPutExistingValue("value"))
 			},
-			filter:      newUncertaintyFilter(roachpb.Key("a")),
-			clientRetry: true,
-			tsLeaked:    true,
+			filter:           newUncertaintyFilter(roachpb.Key("a")),
+			expClientRestart: true,
+			tsLeaked:         true,
 		},
 		{
 			name: "reads within uncertainty interval",
@@ -3018,8 +3061,9 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				}
 				return txn.CPut(ctx, "a", "cput", kvclientutils.StrToCPutExistingValue("value"))
 			},
-			filter:        newUncertaintyFilter(roachpb.Key("ac")),
-			txnCoordRetry: true,
+			filter:                         newUncertaintyFilter(roachpb.Key("ac")),
+			expClientRefresh:               true,
+			expClientAutoRetryAfterRefresh: true,
 		},
 		{
 			name: "reads within uncertainty interval and violating concurrent put",
@@ -3041,8 +3085,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				}
 				return nil
 			},
-			filter:      newUncertaintyFilter(roachpb.Key("ac")),
-			clientRetry: true, // note this txn is read-only but still restarts
+			filter:           newUncertaintyFilter(roachpb.Key("ac")),
+			expClientRestart: true, // note this txn is read-only but still restarts
 		},
 		{
 			name: "multi-range batch with uncertainty interval error",
@@ -3057,8 +3101,9 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				b.CPut("c", "cput", kvclientutils.StrToCPutExistingValue("value"))
 				return txn.CommitInBatch(ctx, b)
 			},
-			filter:        newUncertaintyFilter(roachpb.Key("c")),
-			txnCoordRetry: true,
+			filter:                         newUncertaintyFilter(roachpb.Key("c")),
+			expClientRefresh:               true,
+			expClientAutoRetryAfterRefresh: true,
 		},
 		{
 			name: "multi-range batch with uncertainty interval error and get conflict",
@@ -3079,8 +3124,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				b.CPut("a", "cput", kvclientutils.StrToCPutExistingValue("value"))
 				return txn.CommitInBatch(ctx, b)
 			},
-			filter:      newUncertaintyFilter(roachpb.Key("a")),
-			clientRetry: true, // will fail because of conflict on refresh span for the Get
+			filter:           newUncertaintyFilter(roachpb.Key("a")),
+			expClientRestart: true, // will fail because of conflict on refresh span for the Get
 		},
 		{
 			name: "multi-range batch with uncertainty interval error and mixed success",
@@ -3095,7 +3140,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 			},
 			filter: newUncertaintyFilter(roachpb.Key("c")),
 			// Expect a transaction coord retry, which should succeed.
-			txnCoordRetry: true,
+			expClientRefresh:               true,
+			expClientAutoRetryAfterRefresh: true,
 		},
 		{
 			name: "multi-range scan with uncertainty interval error",
@@ -3105,7 +3151,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 			},
 			filter: newUncertaintyFilter(roachpb.Key("c")),
 			// Expect a transaction coord retry, which should succeed.
-			txnCoordRetry: true,
+			expClientRefresh:               true,
+			expClientAutoRetryAfterRefresh: true,
 		},
 		{
 			name: "multi-range delete range with uncertainty interval error",
@@ -3115,7 +3162,8 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 			},
 			filter: newUncertaintyFilter(roachpb.Key("c")),
 			// Expect a transaction coord retry, which should succeed.
-			txnCoordRetry: true,
+			expClientRefresh:               true,
+			expClientAutoRetryAfterRefresh: true,
 		},
 		{
 			name: "missing pipelined write caught on chain",
@@ -3142,7 +3190,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				return err
 			},
 			// The missing intent write results in a RETRY_ASYNC_WRITE_FAILURE error.
-			clientRetry: true,
+			expClientRestart: true,
 		},
 		{
 			name: "missing pipelined write caught on commit",
@@ -3167,7 +3215,7 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				return nil // commit
 			},
 			// The missing intent write results in a RETRY_ASYNC_WRITE_FAILURE error.
-			clientRetry: true,
+			expClientRestart: true,
 		},
 	}
 
@@ -3179,40 +3227,49 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 				}
 			}
 
+			filterFn.Store((func(kvserverbase.FilterArgs) *kvpb.Error)(nil))
 			if tc.filter != nil {
 				filterFn.Store(tc.filter)
-				defer filterFn.Store((func(kvserverbase.FilterArgs) *kvpb.Error)(nil))
 			}
+			refreshSpansCondenseFilter.Store((func() bool)(nil))
 			if tc.refreshSpansCondenseFilter != nil {
 				refreshSpansCondenseFilter.Store(tc.refreshSpansCondenseFilter)
-				defer refreshSpansCondenseFilter.Store((func() bool)(nil))
 			}
 
-			var metrics kvcoord.TxnMetrics
-			var lastAutoRetries int64
-			var hadClientRetry bool
-			epoch := 0
-			if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
-				if tc.priorReads {
-					_, err := txn.Get(ctx, "prior read")
-					if err != nil {
-						t.Fatalf("unexpected error during prior read: %v", err)
-					}
+			// Construct a new DB with a fresh set of TxnMetrics. This allows the test
+			// to precisely assert on the metrics without having to worry about other
+			// transactions in the system affecting them.
+			metrics := kvcoord.MakeTxnMetrics(metric.TestSampleInterval)
+			tcsFactoryCfg := kvcoord.TxnCoordSenderFactoryConfig{
+				AmbientCtx:   s.AmbientCtx(),
+				Settings:     s.ClusterSettings(),
+				Clock:        s.Clock(),
+				Stopper:      s.Stopper(),
+				Metrics:      metrics,
+				TestingKnobs: *s.TestingKnobs().KVClient.(*kvcoord.ClientTestingKnobs),
+			}
+			distSender := s.DistSenderI().(*kvcoord.DistSender)
+			tcsFactory := kvcoord.NewTxnCoordSenderFactory(tcsFactoryCfg, distSender)
+			testDB := kv.NewDBWithContext(s.AmbientCtx(), tcsFactory, s.Clock(), db.Context())
+
+			err := testDB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
+				if txn.Epoch() > 0 {
+					// We expected a new epoch and got it; return success.
+					return nil
 				}
+
 				if tc.tsLeaked {
 					// Read the commit timestamp so the expectation is that
 					// this transaction cannot be restarted internally.
 					_ = txn.CommitTimestamp()
 				}
-				if epoch > 0 {
-					if !tc.clientRetry {
-						t.Fatal("expected txn coord sender to retry, but got client-side retry")
+
+				if tc.priorReads {
+					_, err := txn.Get(ctx, "prior read")
+					if err != nil {
+						t.Fatalf("unexpected error during prior read: %v", err)
 					}
-					hadClientRetry = true
-					// We expected a new epoch and got it; return success.
-					return nil
 				}
-				defer func() { epoch++ }()
 
 				if tc.afterTxnStart != nil {
 					if err := tc.afterTxnStart(ctx, db); err != nil {
@@ -3220,34 +3277,22 @@ func TestTxnCoordSenderRetries(t *testing.T) {
 					}
 				}
 
-				metrics = txn.Sender().(*kvcoord.TxnCoordSender).TxnCoordSenderFactory.Metrics()
-				lastAutoRetries = metrics.RefreshAutoRetries.Count()
-
 				return tc.retryable(ctx, txn)
-			}); err != nil {
-				if len(tc.expFailure) == 0 || !testutils.IsError(err, tc.expFailure) {
-					t.Fatal(err)
-				}
+			})
+
+			// Verify success or failure.
+			if len(tc.expFailure) == 0 {
+				require.NoError(t, err)
 			} else {
-				if len(tc.expFailure) > 0 {
-					t.Errorf("expected failure %q", tc.expFailure)
-				}
-			}
-			// Verify auto retry metric. Because there's a chance that splits
-			// from the cluster setup are still ongoing and can experience
-			// their own retries, this might increase by more than one, so we
-			// can only check here that it's >= 1.
-			autoRetries := metrics.RefreshAutoRetries.Count() - lastAutoRetries
-			if tc.txnCoordRetry && autoRetries == 0 {
-				t.Errorf("expected [at least] one txn coord sender auto retry; got %d", autoRetries)
-			} else if !tc.txnCoordRetry && autoRetries != 0 {
-				t.Errorf("expected no txn coord sender auto retries; got %d", autoRetries)
-			}
-			if tc.clientRetry && !hadClientRetry {
-				t.Errorf("expected but did not experience client retry")
-			} else if !tc.clientRetry && hadClientRetry {
-				t.Errorf("did not expect but experienced client retry")
+				require.Error(t, err)
+				require.Regexp(t, tc.expFailure, err)
 			}
+
+			// Verify metrics.
+			require.Equal(t, tc.expClientRefresh, metrics.RefreshSuccess.Count() != 0, "TxnMetrics.RefreshSuccess")
+			require.Equal(t, tc.expClientAutoRetryAfterRefresh, metrics.RefreshAutoRetries.Count() != 0, "TxnMetrics.RefreshAutoRetries")
+			require.Equal(t, tc.expClientRestart, metrics.Restarts.TotalSum() != 0, "TxnMetrics.Restarts")
+			require.Equal(t, tc.expOnePhaseCommit, metrics.Commits1PC.Count() != 0, "TxnMetrics.Commits1PC")
 		})
 	}
 }