Skip to content

Commit

Permalink
kv: enable no-op preemptive refreshes under under weak isolation levels
Browse files Browse the repository at this point in the history
In 33fda25, we disabled preemptive refreshes for weak isolation transactions.
This commit refines that logic by enabling preemptive refreshes for these
isolation levels if the refresh is free (as opposed to inevitable). As the test
added in the previous commit shows, this can avoid client-side retries.

Epic: None
Release note: None
  • Loading branch information
nvanbenschoten committed Apr 30, 2023
1 parent 0c2ab63 commit e663100
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 31 deletions.
17 changes: 7 additions & 10 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,21 +426,18 @@ func (sr *txnSpanRefresher) maybeRefreshPreemptively(
return ba, nil
}

// If the transaction can tolerate write skew, no preemptive refresh is
// necessary, even if its write timestamp has been bumped. Transactions run at
// weak isolation levels may refresh in response to WriteTooOld errors or
// ReadWithinUncertaintyInterval errors returned by requests, but they do not
// need to refresh preemptively ahead of an EndTxn request.
if ba.Txn.IsoLevel.ToleratesWriteSkew() {
return ba, nil
}

// If true, tryRefreshTxnSpans will trivially succeed.
refreshFree := ba.CanForwardReadTimestamp

// If true, this batch is guaranteed to fail without a refresh.
args, hasET := ba.GetArg(kvpb.EndTxn)
refreshInevitable := hasET && args.(*kvpb.EndTxnRequest).Commit
refreshInevitable := hasET && args.(*kvpb.EndTxnRequest).Commit &&
// If the transaction can tolerate write skew, no preemptive refresh is
// necessary, even if its write timestamp has been bumped. Transactions run
// at weak isolation levels may refresh in response to WriteTooOld errors or
// ReadWithinUncertaintyInterval errors returned by requests, but they do
// not need to refresh preemptively ahead of an EndTxn request.
!ba.Txn.IsoLevel.ToleratesWriteSkew()

// If neither condition is true, defer the refresh.
if !refreshFree && !refreshInevitable {
Expand Down
70 changes: 49 additions & 21 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,27 +542,45 @@ func TestTxnSpanRefresherPreemptiveRefresh(t *testing.T) {
}

// TestTxnSpanRefresherPreemptiveRefreshIsoLevel tests that the txnSpanRefresher
// only performed preemptive client-side refreshes of Serializable transactions.
// only performed preemptive client-side refreshes of Serializable transactions,
// except when the preemptive refresh is free.
func TestTxnSpanRefresherPreemptiveRefreshIsoLevel(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

tests := []struct {
isoLevel isolation.Level
expRefresh bool
isoLevel isolation.Level
noRefreshSpans bool
expRefresh bool
}{
{isolation.Serializable, true},
{isolation.Snapshot, false},
{isolation.ReadCommitted, false},
{isolation.Serializable, false, true},
{isolation.Serializable, true, true},
{isolation.Snapshot, false, false},
{isolation.Snapshot, true, true},
{isolation.ReadCommitted, false, false},
{isolation.ReadCommitted, true, true},
}
for _, tt := range tests {
t.Run(tt.isoLevel.String(), func(t *testing.T) {
name := fmt.Sprintf("iso=%s,noRefreshSpans=%t", tt.isoLevel, tt.noRefreshSpans)
t.Run(name, func(t *testing.T) {
ctx := context.Background()
tsr, mockSender := makeMockTxnSpanRefresher()

txn := makeTxnProto()
txn.IsoLevel = tt.isoLevel

// Add refresh spans, if necessary.
if !tt.noRefreshSpans {
ba := &kvpb.BatchRequest{}
ba.Header = kvpb.Header{Txn: &txn}
ba.Add(&kvpb.GetRequest{RequestHeader: kvpb.RequestHeader{Key: txn.Key}})

br, pErr := tsr.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)
}
require.Equal(t, tt.noRefreshSpans, tsr.refreshFootprint.empty())

// Push the txn.
txn.WriteTimestamp = txn.WriteTimestamp.Add(1, 0)
origReadTs := txn.ReadTimestamp
Expand All @@ -574,21 +592,30 @@ func TestTxnSpanRefresherPreemptiveRefreshIsoLevel(t *testing.T) {
etArgs := kvpb.EndTxnRequest{Commit: true}
ba.Add(&etArgs)

var sawRefreshReq bool
expRefreshReq := tt.expRefresh && !tt.noRefreshSpans
mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
require.Len(t, ba.Requests, 1)
require.True(t, ba.CanForwardReadTimestamp)
require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[0].GetInner())

if tt.expRefresh {
// The transaction should be refreshed.
require.NotEqual(t, origReadTs, ba.Txn.ReadTimestamp)
require.Equal(t, pushedWriteTs, ba.Txn.ReadTimestamp)
require.Equal(t, pushedWriteTs, ba.Txn.WriteTimestamp)
} else {
// The transaction should not be refreshed.
require.Equal(t, origReadTs, ba.Txn.ReadTimestamp)
require.NotEqual(t, pushedWriteTs, ba.Txn.ReadTimestamp)
require.Equal(t, pushedWriteTs, ba.Txn.WriteTimestamp)
require.Equal(t, tt.noRefreshSpans, ba.CanForwardReadTimestamp)
req := ba.Requests[0].GetInner()
switch req.(type) {
case *kvpb.RefreshRequest:
require.True(t, expRefreshReq)
sawRefreshReq = true
case *kvpb.EndTxnRequest:
if tt.expRefresh {
// The transaction should be refreshed.
require.NotEqual(t, origReadTs, ba.Txn.ReadTimestamp)
require.Equal(t, pushedWriteTs, ba.Txn.ReadTimestamp)
require.Equal(t, pushedWriteTs, ba.Txn.WriteTimestamp)
} else {
// The transaction should not be refreshed.
require.Equal(t, origReadTs, ba.Txn.ReadTimestamp)
require.NotEqual(t, pushedWriteTs, ba.Txn.ReadTimestamp)
require.Equal(t, pushedWriteTs, ba.Txn.WriteTimestamp)
}
default:
t.Fatalf("unexpected request: %T", req)
}

br := ba.CreateReply()
Expand All @@ -599,6 +626,7 @@ func TestTxnSpanRefresherPreemptiveRefreshIsoLevel(t *testing.T) {
br, pErr := tsr.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)
require.Equal(t, expRefreshReq, sawRefreshReq)

expRefreshSuccess := int64(0)
if tt.expRefresh {
Expand All @@ -608,7 +636,7 @@ func TestTxnSpanRefresherPreemptiveRefreshIsoLevel(t *testing.T) {
require.Equal(t, int64(0), tsr.metrics.ClientRefreshFail.Count())
require.Equal(t, int64(0), tsr.metrics.ClientRefreshAutoRetries.Count())
require.Equal(t, int64(0), tsr.metrics.ServerRefreshSuccess.Count())
require.True(t, tsr.refreshFootprint.empty())
require.Equal(t, tt.noRefreshSpans, tsr.refreshFootprint.empty())
require.False(t, tsr.refreshInvalid)
})
}
Expand Down

0 comments on commit e663100

Please sign in to comment.