Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
102504: kv: enable no-op preemptive refreshes under under weak isolation levels r=nvanbenschoten a=nvanbenschoten

This PR adds a new test variant to `TestTxnCoordSenderRetries` where a no-op preemptive refresh avoids a client-side transaction retry by refreshing before the transaction establishes read spans (which would fail a refresh). The test initially fails for Snapshot isolation transactions (not tested in CI), which never perform preemptive refreshes.

In 33fda25, we disabled preemptive refreshes for weak isolation transactions. After adding the test, this PR refines that logic by enabling preemptive refreshes for these isolation levels if the refresh is free (as opposed to inevitable). As the test shows, this can avoid client-side retries, even in Snapshot transactions.

Epic: None
Release note: None

Co-authored-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
craig[bot] and nvanbenschoten committed May 1, 2023
2 parents c6d76de + e663100 commit 2924cc5
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 31 deletions.
30 changes: 30 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2452,6 +2452,36 @@ func TestTxnCoordSenderRetries(t *testing.T) {
},
expClientRestart: true,
},
{
name: "write too old with get conflict after forwarded timestamp",
afterTxnStart: func(ctx context.Context, db *kv.DB) error {
otherTxn := db.NewTxn(ctx, "afterTxnStart")
b := otherTxn.NewBatch()
// Set ts cache on "a".
b.Get("a")
// Create write-write conflict on "b".
b.Put("b", "put")
return otherTxn.CommitInBatch(ctx, b)
},
retryable: func(ctx context.Context, txn *kv.Txn) error {
// Put to "a" to advance the txn timestamp.
if err := txn.Put(ctx, "a", "put"); err != nil {
return err
}
// Get from "b" to establish a read span. It is important that we
// perform a preemptive refresh before this read, otherwise the refresh
// would fail.
if _, err := txn.Get(ctx, "b"); err != nil {
return err
}
// Now, Put to "b", which would have thrown a write-too-old error had
// the transaction not preemptively refreshed before the Get.
return txn.Put(ctx, "b", "put")
},
// No retry, preemptive refresh before Get.
expClientRefresh: true,
expClientAutoRetryAfterRefresh: false,
},
{
name: "write too old with multiple puts to same key",
beforeTxnStart: func(ctx context.Context, db *kv.DB) error {
Expand Down
17 changes: 7 additions & 10 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,21 +426,18 @@ func (sr *txnSpanRefresher) maybeRefreshPreemptively(
return ba, nil
}

// If the transaction can tolerate write skew, no preemptive refresh is
// necessary, even if its write timestamp has been bumped. Transactions run at
// weak isolation levels may refresh in response to WriteTooOld errors or
// ReadWithinUncertaintyInterval errors returned by requests, but they do not
// need to refresh preemptively ahead of an EndTxn request.
if ba.Txn.IsoLevel.ToleratesWriteSkew() {
return ba, nil
}

// If true, tryRefreshTxnSpans will trivially succeed.
refreshFree := ba.CanForwardReadTimestamp

// If true, this batch is guaranteed to fail without a refresh.
args, hasET := ba.GetArg(kvpb.EndTxn)
refreshInevitable := hasET && args.(*kvpb.EndTxnRequest).Commit
refreshInevitable := hasET && args.(*kvpb.EndTxnRequest).Commit &&
// If the transaction can tolerate write skew, no preemptive refresh is
// necessary, even if its write timestamp has been bumped. Transactions run
// at weak isolation levels may refresh in response to WriteTooOld errors or
// ReadWithinUncertaintyInterval errors returned by requests, but they do
// not need to refresh preemptively ahead of an EndTxn request.
!ba.Txn.IsoLevel.ToleratesWriteSkew()

// If neither condition is true, defer the refresh.
if !refreshFree && !refreshInevitable {
Expand Down
70 changes: 49 additions & 21 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,27 +542,45 @@ func TestTxnSpanRefresherPreemptiveRefresh(t *testing.T) {
}

// TestTxnSpanRefresherPreemptiveRefreshIsoLevel tests that the txnSpanRefresher
// only performed preemptive client-side refreshes of Serializable transactions.
// only performed preemptive client-side refreshes of Serializable transactions,
// except when the preemptive refresh is free.
func TestTxnSpanRefresherPreemptiveRefreshIsoLevel(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

tests := []struct {
isoLevel isolation.Level
expRefresh bool
isoLevel isolation.Level
noRefreshSpans bool
expRefresh bool
}{
{isolation.Serializable, true},
{isolation.Snapshot, false},
{isolation.ReadCommitted, false},
{isolation.Serializable, false, true},
{isolation.Serializable, true, true},
{isolation.Snapshot, false, false},
{isolation.Snapshot, true, true},
{isolation.ReadCommitted, false, false},
{isolation.ReadCommitted, true, true},
}
for _, tt := range tests {
t.Run(tt.isoLevel.String(), func(t *testing.T) {
name := fmt.Sprintf("iso=%s,noRefreshSpans=%t", tt.isoLevel, tt.noRefreshSpans)
t.Run(name, func(t *testing.T) {
ctx := context.Background()
tsr, mockSender := makeMockTxnSpanRefresher()

txn := makeTxnProto()
txn.IsoLevel = tt.isoLevel

// Add refresh spans, if necessary.
if !tt.noRefreshSpans {
ba := &kvpb.BatchRequest{}
ba.Header = kvpb.Header{Txn: &txn}
ba.Add(&kvpb.GetRequest{RequestHeader: kvpb.RequestHeader{Key: txn.Key}})

br, pErr := tsr.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)
}
require.Equal(t, tt.noRefreshSpans, tsr.refreshFootprint.empty())

// Push the txn.
txn.WriteTimestamp = txn.WriteTimestamp.Add(1, 0)
origReadTs := txn.ReadTimestamp
Expand All @@ -574,21 +592,30 @@ func TestTxnSpanRefresherPreemptiveRefreshIsoLevel(t *testing.T) {
etArgs := kvpb.EndTxnRequest{Commit: true}
ba.Add(&etArgs)

var sawRefreshReq bool
expRefreshReq := tt.expRefresh && !tt.noRefreshSpans
mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
require.Len(t, ba.Requests, 1)
require.True(t, ba.CanForwardReadTimestamp)
require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[0].GetInner())

if tt.expRefresh {
// The transaction should be refreshed.
require.NotEqual(t, origReadTs, ba.Txn.ReadTimestamp)
require.Equal(t, pushedWriteTs, ba.Txn.ReadTimestamp)
require.Equal(t, pushedWriteTs, ba.Txn.WriteTimestamp)
} else {
// The transaction should not be refreshed.
require.Equal(t, origReadTs, ba.Txn.ReadTimestamp)
require.NotEqual(t, pushedWriteTs, ba.Txn.ReadTimestamp)
require.Equal(t, pushedWriteTs, ba.Txn.WriteTimestamp)
require.Equal(t, tt.noRefreshSpans, ba.CanForwardReadTimestamp)
req := ba.Requests[0].GetInner()
switch req.(type) {
case *kvpb.RefreshRequest:
require.True(t, expRefreshReq)
sawRefreshReq = true
case *kvpb.EndTxnRequest:
if tt.expRefresh {
// The transaction should be refreshed.
require.NotEqual(t, origReadTs, ba.Txn.ReadTimestamp)
require.Equal(t, pushedWriteTs, ba.Txn.ReadTimestamp)
require.Equal(t, pushedWriteTs, ba.Txn.WriteTimestamp)
} else {
// The transaction should not be refreshed.
require.Equal(t, origReadTs, ba.Txn.ReadTimestamp)
require.NotEqual(t, pushedWriteTs, ba.Txn.ReadTimestamp)
require.Equal(t, pushedWriteTs, ba.Txn.WriteTimestamp)
}
default:
t.Fatalf("unexpected request: %T", req)
}

br := ba.CreateReply()
Expand All @@ -599,6 +626,7 @@ func TestTxnSpanRefresherPreemptiveRefreshIsoLevel(t *testing.T) {
br, pErr := tsr.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)
require.Equal(t, expRefreshReq, sawRefreshReq)

expRefreshSuccess := int64(0)
if tt.expRefresh {
Expand All @@ -608,7 +636,7 @@ func TestTxnSpanRefresherPreemptiveRefreshIsoLevel(t *testing.T) {
require.Equal(t, int64(0), tsr.metrics.ClientRefreshFail.Count())
require.Equal(t, int64(0), tsr.metrics.ClientRefreshAutoRetries.Count())
require.Equal(t, int64(0), tsr.metrics.ServerRefreshSuccess.Count())
require.True(t, tsr.refreshFootprint.empty())
require.Equal(t, tt.noRefreshSpans, tsr.refreshFootprint.empty())
require.False(t, tsr.refreshInvalid)
})
}
Expand Down

0 comments on commit 2924cc5

Please sign in to comment.