Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv: don't refresh on WriteTooOld flag with non-PENDING status #102960

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,14 @@ func (sr *txnSpanRefresher) sendLockedWithRefreshAttempts(
}

if pErr == nil && br.Txn.WriteTooOld {
// If the transaction is no longer pending, terminate the WriteTooOld flag
// without hitting the logic below. It's not clear that this can happen in
// practice, but it's better to be safe.
if br.Txn.Status != roachpb.PENDING {
br.Txn = br.Txn.Clone()
br.Txn.WriteTooOld = false
return br, nil
}
// If we got a response with the WriteTooOld flag set, then we pretend that
// we got a WriteTooOldError, which will cause us to attempt to refresh and
// propagate the error if we failed. When it can, the server prefers to
Expand Down
57 changes: 54 additions & 3 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ func TestTxnSpanRefresherRefreshesTransactions(t *testing.T) {
pErr func() *kvpb.Error
expRefresh bool
expRefreshTS hlc.Timestamp
expErr bool
}{
{
pErr: func() *kvpb.Error {
Expand Down Expand Up @@ -180,19 +181,60 @@ func TestTxnSpanRefresherRefreshesTransactions(t *testing.T) {
return kvpb.NewErrorf("no refresh")
},
expRefresh: false,
expErr: true,
},
{
name: "write_too_old flag",
name: "write_too_old flag (pending)",
onFirstSend: func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
br := ba.CreateReply()
br.Txn = ba.Txn.Clone()
br.Txn.Status = roachpb.PENDING
br.Txn.WriteTooOld = true
br.Txn.WriteTimestamp = txn.WriteTimestamp.Add(20, 1)
return br, nil
},
expRefresh: true,
expRefreshTS: txn.WriteTimestamp.Add(20, 1), // Same as br.Txn.WriteTimestamp.
},
{
name: "write_too_old flag (staging)",
onFirstSend: func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
br := ba.CreateReply()
br.Txn = ba.Txn.Clone()
br.Txn.Status = roachpb.STAGING
br.Txn.WriteTooOld = true
br.Txn.WriteTimestamp = txn.WriteTimestamp.Add(20, 1)
return br, nil
},
expRefresh: false,
expErr: false,
},
{
name: "write_too_old flag (committed)",
onFirstSend: func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
br := ba.CreateReply()
br.Txn = ba.Txn.Clone()
br.Txn.Status = roachpb.COMMITTED
br.Txn.WriteTooOld = true
br.Txn.WriteTimestamp = txn.WriteTimestamp.Add(20, 1)
return br, nil
},
expRefresh: false,
expErr: false,
},
{
name: "write_too_old flag (aborted)",
onFirstSend: func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
br := ba.CreateReply()
br.Txn = ba.Txn.Clone()
br.Txn.Status = roachpb.ABORTED
br.Txn.WriteTooOld = true
br.Txn.WriteTimestamp = txn.WriteTimestamp.Add(20, 1)
return br, nil
},
expRefresh: false,
expErr: false,
},
}
for _, tc := range cases {
name := tc.name
Expand Down Expand Up @@ -279,6 +321,8 @@ func TestTxnSpanRefresherRefreshesTransactions(t *testing.T) {
if tc.expRefresh {
require.Nil(t, pErr)
require.NotNil(t, br)
require.NotNil(t, br.Txn)
require.False(t, br.Txn.WriteTooOld)
require.Equal(t, tc.expRefreshTS, br.Txn.WriteTimestamp)
require.Equal(t, tc.expRefreshTS, br.Txn.ReadTimestamp)
require.Equal(t, tc.expRefreshTS, tsr.refreshedTimestamp)
Expand All @@ -287,8 +331,15 @@ func TestTxnSpanRefresherRefreshesTransactions(t *testing.T) {
require.Equal(t, int64(1), tsr.metrics.ClientRefreshAutoRetries.Count())
require.Equal(t, int64(0), tsr.metrics.ServerRefreshSuccess.Count())
} else {
require.Nil(t, br)
require.NotNil(t, pErr)
if tc.expErr {
require.Nil(t, br)
require.NotNil(t, pErr)
} else {
require.Nil(t, pErr)
require.NotNil(t, br)
require.NotNil(t, br.Txn)
require.False(t, br.Txn.WriteTooOld)
}
require.Zero(t, tsr.refreshedTimestamp)
require.Equal(t, int64(0), tsr.metrics.ClientRefreshSuccess.Count())
require.Equal(t, int64(0), tsr.metrics.ClientRefreshFail.Count())
Expand Down