Skip to content

Commit

Permalink
Merge #102960
Browse files Browse the repository at this point in the history
102960: kv: don't refresh on WriteTooOld flag with non-PENDING status r=arulajmani a=nvanbenschoten

This commit adjusts the `txnSpanRefresher` to not refresh a transaction's read timestamp when the transaction has its WriteTooOld flag set but it is not PENDING. This is precautionary. It's not clear that this is possible to hit today, but it would be a serious problem if it were, as it could allow us to swallow a commit and then consider a committed transaction to be pending/aborted. It will also become possible if we move the `txnSpanRefresher` below the `txnCommitter` in the interceptor stack, which I plan to do as part of supporting Snapshot isolation.

Epic: None
Release note: None

Co-authored-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
craig[bot] and nvanbenschoten committed May 11, 2023
2 parents 79a236b + ef77322 commit 0c00e43
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 3 deletions.
8 changes: 8 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,14 @@ func (sr *txnSpanRefresher) sendLockedWithRefreshAttempts(
}

if pErr == nil && br.Txn.WriteTooOld {
// If the transaction is no longer pending, terminate the WriteTooOld flag
// without hitting the logic below. It's not clear that this can happen in
// practice, but it's better to be safe.
if br.Txn.Status != roachpb.PENDING {
br.Txn = br.Txn.Clone()
br.Txn.WriteTooOld = false
return br, nil
}
// If we got a response with the WriteTooOld flag set, then we pretend that
// we got a WriteTooOldError, which will cause us to attempt to refresh and
// propagate the error if we failed. When it can, the server prefers to
Expand Down
57 changes: 54 additions & 3 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ func TestTxnSpanRefresherRefreshesTransactions(t *testing.T) {
pErr func() *kvpb.Error
expRefresh bool
expRefreshTS hlc.Timestamp
expErr bool
}{
{
pErr: func() *kvpb.Error {
Expand Down Expand Up @@ -180,19 +181,60 @@ func TestTxnSpanRefresherRefreshesTransactions(t *testing.T) {
return kvpb.NewErrorf("no refresh")
},
expRefresh: false,
expErr: true,
},
{
name: "write_too_old flag",
name: "write_too_old flag (pending)",
onFirstSend: func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
br := ba.CreateReply()
br.Txn = ba.Txn.Clone()
br.Txn.Status = roachpb.PENDING
br.Txn.WriteTooOld = true
br.Txn.WriteTimestamp = txn.WriteTimestamp.Add(20, 1)
return br, nil
},
expRefresh: true,
expRefreshTS: txn.WriteTimestamp.Add(20, 1), // Same as br.Txn.WriteTimestamp.
},
{
name: "write_too_old flag (staging)",
onFirstSend: func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
br := ba.CreateReply()
br.Txn = ba.Txn.Clone()
br.Txn.Status = roachpb.STAGING
br.Txn.WriteTooOld = true
br.Txn.WriteTimestamp = txn.WriteTimestamp.Add(20, 1)
return br, nil
},
expRefresh: false,
expErr: false,
},
{
name: "write_too_old flag (committed)",
onFirstSend: func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
br := ba.CreateReply()
br.Txn = ba.Txn.Clone()
br.Txn.Status = roachpb.COMMITTED
br.Txn.WriteTooOld = true
br.Txn.WriteTimestamp = txn.WriteTimestamp.Add(20, 1)
return br, nil
},
expRefresh: false,
expErr: false,
},
{
name: "write_too_old flag (aborted)",
onFirstSend: func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
br := ba.CreateReply()
br.Txn = ba.Txn.Clone()
br.Txn.Status = roachpb.ABORTED
br.Txn.WriteTooOld = true
br.Txn.WriteTimestamp = txn.WriteTimestamp.Add(20, 1)
return br, nil
},
expRefresh: false,
expErr: false,
},
}
for _, tc := range cases {
name := tc.name
Expand Down Expand Up @@ -279,6 +321,8 @@ func TestTxnSpanRefresherRefreshesTransactions(t *testing.T) {
if tc.expRefresh {
require.Nil(t, pErr)
require.NotNil(t, br)
require.NotNil(t, br.Txn)
require.False(t, br.Txn.WriteTooOld)
require.Equal(t, tc.expRefreshTS, br.Txn.WriteTimestamp)
require.Equal(t, tc.expRefreshTS, br.Txn.ReadTimestamp)
require.Equal(t, tc.expRefreshTS, tsr.refreshedTimestamp)
Expand All @@ -287,8 +331,15 @@ func TestTxnSpanRefresherRefreshesTransactions(t *testing.T) {
require.Equal(t, int64(1), tsr.metrics.ClientRefreshAutoRetries.Count())
require.Equal(t, int64(0), tsr.metrics.ServerRefreshSuccess.Count())
} else {
require.Nil(t, br)
require.NotNil(t, pErr)
if tc.expErr {
require.Nil(t, br)
require.NotNil(t, pErr)
} else {
require.Nil(t, pErr)
require.NotNil(t, br)
require.NotNil(t, br.Txn)
require.False(t, br.Txn.WriteTooOld)
}
require.Zero(t, tsr.refreshedTimestamp)
require.Equal(t, int64(0), tsr.metrics.ClientRefreshSuccess.Count())
require.Equal(t, int64(0), tsr.metrics.ClientRefreshFail.Count())
Expand Down

0 comments on commit 0c00e43

Please sign in to comment.