From 0c2ab639645ff24234e3e709c76f5501a5b26235 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Thu, 27 Apr 2023 19:17:56 -0400 Subject: [PATCH 1/2] kv: add test case where no-op preemptive refresh avoids retry This commit adds a new test variant to `TestTxnCoordSenderRetries` where a no-op preemptive refresh avoids a client-side transaction retry by refreshing before the transaction establishes read spans (which would fail a refresh). The test currently fails for Snapshot isolation transactions, which never perform preemptive refreshes. Epic: None Release note: None --- .../kvcoord/dist_sender_server_test.go | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go index 03b9fd2b83f0..6c0a6230cd28 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_server_test.go @@ -2452,6 +2452,36 @@ func TestTxnCoordSenderRetries(t *testing.T) { }, expClientRestart: true, }, + { + name: "write too old with get conflict after forwarded timestamp", + afterTxnStart: func(ctx context.Context, db *kv.DB) error { + otherTxn := db.NewTxn(ctx, "afterTxnStart") + b := otherTxn.NewBatch() + // Set ts cache on "a". + b.Get("a") + // Create write-write conflict on "b". + b.Put("b", "put") + return otherTxn.CommitInBatch(ctx, b) + }, + retryable: func(ctx context.Context, txn *kv.Txn) error { + // Put to "a" to advance the txn timestamp. + if err := txn.Put(ctx, "a", "put"); err != nil { + return err + } + // Get from "b" to establish a read span. It is important that we + // perform a preemptive refresh before this read, otherwise the refresh + // would fail. + if _, err := txn.Get(ctx, "b"); err != nil { + return err + } + // Now, Put to "b", which would have thrown a write-too-old error had + // the transaction not preemptively refreshed before the Get. + return txn.Put(ctx, "b", "put") + }, + // No retry, preemptive refresh before Get. + expClientRefresh: true, + expClientAutoRetryAfterRefresh: false, + }, { name: "write too old with multiple puts to same key", beforeTxnStart: func(ctx context.Context, db *kv.DB) error { From e663100011f91bd7ad1e44a4dfc3395cfc6ad979 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Thu, 27 Apr 2023 19:36:42 -0400 Subject: [PATCH 2/2] kv: enable no-op preemptive refreshes under under weak isolation levels In 33fda25c, we disabled preemptive refreshes for weak isolation transactions. This commit refines that logic by enabling preemptive refreshes for these isolation levels if the refresh is free (as opposed to inevitable). As the test added in the previous commit shows, this can avoid client-side retries. Epic: None Release note: None --- .../kvcoord/txn_interceptor_span_refresher.go | 17 ++--- .../txn_interceptor_span_refresher_test.go | 70 +++++++++++++------ 2 files changed, 56 insertions(+), 31 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go index bc56da8a996a..979c7bc2351d 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go @@ -426,21 +426,18 @@ func (sr *txnSpanRefresher) maybeRefreshPreemptively( return ba, nil } - // If the transaction can tolerate write skew, no preemptive refresh is - // necessary, even if its write timestamp has been bumped. Transactions run at - // weak isolation levels may refresh in response to WriteTooOld errors or - // ReadWithinUncertaintyInterval errors returned by requests, but they do not - // need to refresh preemptively ahead of an EndTxn request. - if ba.Txn.IsoLevel.ToleratesWriteSkew() { - return ba, nil - } - // If true, tryRefreshTxnSpans will trivially succeed. refreshFree := ba.CanForwardReadTimestamp // If true, this batch is guaranteed to fail without a refresh. args, hasET := ba.GetArg(kvpb.EndTxn) - refreshInevitable := hasET && args.(*kvpb.EndTxnRequest).Commit + refreshInevitable := hasET && args.(*kvpb.EndTxnRequest).Commit && + // If the transaction can tolerate write skew, no preemptive refresh is + // necessary, even if its write timestamp has been bumped. Transactions run + // at weak isolation levels may refresh in response to WriteTooOld errors or + // ReadWithinUncertaintyInterval errors returned by requests, but they do + // not need to refresh preemptively ahead of an EndTxn request. + !ba.Txn.IsoLevel.ToleratesWriteSkew() // If neither condition is true, defer the refresh. if !refreshFree && !refreshInevitable { diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go index ea807e383260..f9e49bdaaefc 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go @@ -542,27 +542,45 @@ func TestTxnSpanRefresherPreemptiveRefresh(t *testing.T) { } // TestTxnSpanRefresherPreemptiveRefreshIsoLevel tests that the txnSpanRefresher -// only performed preemptive client-side refreshes of Serializable transactions. +// only performed preemptive client-side refreshes of Serializable transactions, +// except when the preemptive refresh is free. func TestTxnSpanRefresherPreemptiveRefreshIsoLevel(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) tests := []struct { - isoLevel isolation.Level - expRefresh bool + isoLevel isolation.Level + noRefreshSpans bool + expRefresh bool }{ - {isolation.Serializable, true}, - {isolation.Snapshot, false}, - {isolation.ReadCommitted, false}, + {isolation.Serializable, false, true}, + {isolation.Serializable, true, true}, + {isolation.Snapshot, false, false}, + {isolation.Snapshot, true, true}, + {isolation.ReadCommitted, false, false}, + {isolation.ReadCommitted, true, true}, } for _, tt := range tests { - t.Run(tt.isoLevel.String(), func(t *testing.T) { + name := fmt.Sprintf("iso=%s,noRefreshSpans=%t", tt.isoLevel, tt.noRefreshSpans) + t.Run(name, func(t *testing.T) { ctx := context.Background() tsr, mockSender := makeMockTxnSpanRefresher() txn := makeTxnProto() txn.IsoLevel = tt.isoLevel + // Add refresh spans, if necessary. + if !tt.noRefreshSpans { + ba := &kvpb.BatchRequest{} + ba.Header = kvpb.Header{Txn: &txn} + ba.Add(&kvpb.GetRequest{RequestHeader: kvpb.RequestHeader{Key: txn.Key}}) + + br, pErr := tsr.SendLocked(ctx, ba) + require.Nil(t, pErr) + require.NotNil(t, br) + } + require.Equal(t, tt.noRefreshSpans, tsr.refreshFootprint.empty()) + // Push the txn. txn.WriteTimestamp = txn.WriteTimestamp.Add(1, 0) origReadTs := txn.ReadTimestamp @@ -574,21 +592,30 @@ func TestTxnSpanRefresherPreemptiveRefreshIsoLevel(t *testing.T) { etArgs := kvpb.EndTxnRequest{Commit: true} ba.Add(&etArgs) + var sawRefreshReq bool + expRefreshReq := tt.expRefresh && !tt.noRefreshSpans mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { require.Len(t, ba.Requests, 1) - require.True(t, ba.CanForwardReadTimestamp) - require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[0].GetInner()) - - if tt.expRefresh { - // The transaction should be refreshed. - require.NotEqual(t, origReadTs, ba.Txn.ReadTimestamp) - require.Equal(t, pushedWriteTs, ba.Txn.ReadTimestamp) - require.Equal(t, pushedWriteTs, ba.Txn.WriteTimestamp) - } else { - // The transaction should not be refreshed. - require.Equal(t, origReadTs, ba.Txn.ReadTimestamp) - require.NotEqual(t, pushedWriteTs, ba.Txn.ReadTimestamp) - require.Equal(t, pushedWriteTs, ba.Txn.WriteTimestamp) + require.Equal(t, tt.noRefreshSpans, ba.CanForwardReadTimestamp) + req := ba.Requests[0].GetInner() + switch req.(type) { + case *kvpb.RefreshRequest: + require.True(t, expRefreshReq) + sawRefreshReq = true + case *kvpb.EndTxnRequest: + if tt.expRefresh { + // The transaction should be refreshed. + require.NotEqual(t, origReadTs, ba.Txn.ReadTimestamp) + require.Equal(t, pushedWriteTs, ba.Txn.ReadTimestamp) + require.Equal(t, pushedWriteTs, ba.Txn.WriteTimestamp) + } else { + // The transaction should not be refreshed. + require.Equal(t, origReadTs, ba.Txn.ReadTimestamp) + require.NotEqual(t, pushedWriteTs, ba.Txn.ReadTimestamp) + require.Equal(t, pushedWriteTs, ba.Txn.WriteTimestamp) + } + default: + t.Fatalf("unexpected request: %T", req) } br := ba.CreateReply() @@ -599,6 +626,7 @@ func TestTxnSpanRefresherPreemptiveRefreshIsoLevel(t *testing.T) { br, pErr := tsr.SendLocked(ctx, ba) require.Nil(t, pErr) require.NotNil(t, br) + require.Equal(t, expRefreshReq, sawRefreshReq) expRefreshSuccess := int64(0) if tt.expRefresh { @@ -608,7 +636,7 @@ func TestTxnSpanRefresherPreemptiveRefreshIsoLevel(t *testing.T) { require.Equal(t, int64(0), tsr.metrics.ClientRefreshFail.Count()) require.Equal(t, int64(0), tsr.metrics.ClientRefreshAutoRetries.Count()) require.Equal(t, int64(0), tsr.metrics.ServerRefreshSuccess.Count()) - require.True(t, tsr.refreshFootprint.empty()) + require.Equal(t, tt.noRefreshSpans, tsr.refreshFootprint.empty()) require.False(t, tsr.refreshInvalid) }) }