From ef7732270b1903272c8c7848a5ddba6d3e49fe34 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Tue, 9 May 2023 11:51:45 -0400 Subject: [PATCH] kv: don't refresh on WriteTooOld flag with non-PENDING status This commit adjusts the `txnSpanRefresher` to not refresh a transaction's read timestamp when the transaction has its WriteTooOld flag set but it is not PENDING. This is precautionary. It's not clear that this is possible to hit today, but it would be a serous problem if it was, as it could allow us to swallow a commit and then consider a committed transaction to be pending/aborted. It will also become possible if we move the `txnSpanRefresher` below the `txnCommitter` in the interceptor stack, which I plan to do as part of supporting Snapshot isolation. Epic: None Release note: None --- .../kvcoord/txn_interceptor_span_refresher.go | 8 +++ .../txn_interceptor_span_refresher_test.go | 57 ++++++++++++++++++- 2 files changed, 62 insertions(+), 3 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go index 979c7bc2351d..3ed00cc02bd6 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go @@ -232,6 +232,14 @@ func (sr *txnSpanRefresher) sendLockedWithRefreshAttempts( } if pErr == nil && br.Txn.WriteTooOld { + // If the transaction is no longer pending, terminate the WriteTooOld flag + // without hitting the logic below. It's not clear that this can happen in + // practice, but it's better to be safe. + if br.Txn.Status != roachpb.PENDING { + br.Txn = br.Txn.Clone() + br.Txn.WriteTooOld = false + return br, nil + } // If we got a response with the WriteTooOld flag set, then we pretend that // we got a WriteTooOldError, which will cause us to attempt to refresh and // propagate the error if we failed. When it can, the server prefers to diff --git a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go index f9e49bdaaefc..7b9b5bf3f8d5 100644 --- a/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go @@ -129,6 +129,7 @@ func TestTxnSpanRefresherRefreshesTransactions(t *testing.T) { pErr func() *kvpb.Error expRefresh bool expRefreshTS hlc.Timestamp + expErr bool }{ { pErr: func() *kvpb.Error { @@ -180,12 +181,14 @@ func TestTxnSpanRefresherRefreshesTransactions(t *testing.T) { return kvpb.NewErrorf("no refresh") }, expRefresh: false, + expErr: true, }, { - name: "write_too_old flag", + name: "write_too_old flag (pending)", onFirstSend: func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { br := ba.CreateReply() br.Txn = ba.Txn.Clone() + br.Txn.Status = roachpb.PENDING br.Txn.WriteTooOld = true br.Txn.WriteTimestamp = txn.WriteTimestamp.Add(20, 1) return br, nil @@ -193,6 +196,45 @@ func TestTxnSpanRefresherRefreshesTransactions(t *testing.T) { expRefresh: true, expRefreshTS: txn.WriteTimestamp.Add(20, 1), // Same as br.Txn.WriteTimestamp. }, + { + name: "write_too_old flag (staging)", + onFirstSend: func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { + br := ba.CreateReply() + br.Txn = ba.Txn.Clone() + br.Txn.Status = roachpb.STAGING + br.Txn.WriteTooOld = true + br.Txn.WriteTimestamp = txn.WriteTimestamp.Add(20, 1) + return br, nil + }, + expRefresh: false, + expErr: false, + }, + { + name: "write_too_old flag (committed)", + onFirstSend: func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { + br := ba.CreateReply() + br.Txn = ba.Txn.Clone() + br.Txn.Status = roachpb.COMMITTED + br.Txn.WriteTooOld = true + br.Txn.WriteTimestamp = txn.WriteTimestamp.Add(20, 1) + return br, nil + }, + expRefresh: false, + expErr: false, + }, + { + name: "write_too_old flag (aborted)", + onFirstSend: func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) { + br := ba.CreateReply() + br.Txn = ba.Txn.Clone() + br.Txn.Status = roachpb.ABORTED + br.Txn.WriteTooOld = true + br.Txn.WriteTimestamp = txn.WriteTimestamp.Add(20, 1) + return br, nil + }, + expRefresh: false, + expErr: false, + }, } for _, tc := range cases { name := tc.name @@ -279,6 +321,8 @@ func TestTxnSpanRefresherRefreshesTransactions(t *testing.T) { if tc.expRefresh { require.Nil(t, pErr) require.NotNil(t, br) + require.NotNil(t, br.Txn) + require.False(t, br.Txn.WriteTooOld) require.Equal(t, tc.expRefreshTS, br.Txn.WriteTimestamp) require.Equal(t, tc.expRefreshTS, br.Txn.ReadTimestamp) require.Equal(t, tc.expRefreshTS, tsr.refreshedTimestamp) @@ -287,8 +331,15 @@ func TestTxnSpanRefresherRefreshesTransactions(t *testing.T) { require.Equal(t, int64(1), tsr.metrics.ClientRefreshAutoRetries.Count()) require.Equal(t, int64(0), tsr.metrics.ServerRefreshSuccess.Count()) } else { - require.Nil(t, br) - require.NotNil(t, pErr) + if tc.expErr { + require.Nil(t, br) + require.NotNil(t, pErr) + } else { + require.Nil(t, pErr) + require.NotNil(t, br) + require.NotNil(t, br.Txn) + require.False(t, br.Txn.WriteTooOld) + } require.Zero(t, tsr.refreshedTimestamp) require.Equal(t, int64(0), tsr.metrics.ClientRefreshSuccess.Count()) require.Equal(t, int64(0), tsr.metrics.ClientRefreshFail.Count())