Skip to content

Commit

Permalink
kv/kvclient: fix ManualRefresh error handling
Browse files Browse the repository at this point in the history
Fixes cockroachdb#60760.
Fallout from cockroachdb#60567.

The refreshed BatchRequest was nil on the error path, which was
resulting in a nil-pointer exception. This commit fixes this by
passing the original BatchRequest to updateStateLocked, like the
TxnCoordSender normally does.
  • Loading branch information
nvanbenschoten committed Feb 20, 2021
1 parent 557346d commit e5926f9
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 18 deletions.
10 changes: 7 additions & 3 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1158,16 +1158,20 @@ func (tc *TxnCoordSender) ManualRefresh(ctx context.Context) error {

// Hijack the pre-emptive refresh code path to perform the refresh but
// provide the force flag to ensure that the refresh occurs unconditionally.
// We provide an empty BatchRequest - maybeRefreshPreemptivelyLocked just
// needs the transaction proto. The function then returns a BatchRequest
// with the updated transaction proto. We use this updated proto to call
// into updateStateLocked directly.
var ba roachpb.BatchRequest
ba.Txn = tc.mu.txn.Clone()
const force = true
ba, pErr := tc.interceptorAlloc.txnSpanRefresher.maybeRefreshPreemptivelyLocked(ctx, ba, force)
refreshedBa, pErr := tc.interceptorAlloc.txnSpanRefresher.maybeRefreshPreemptivelyLocked(ctx, ba, force)
if pErr != nil {
pErr = tc.updateStateLocked(ctx, ba, nil, pErr)
} else {
var br roachpb.BatchResponse
br.Txn = ba.Txn
pErr = tc.updateStateLocked(ctx, ba, &br, pErr)
br.Txn = refreshedBa.Txn
pErr = tc.updateStateLocked(ctx, ba, &br, nil)
}
return pErr.GoError()
}
77 changes: 62 additions & 15 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2460,10 +2460,9 @@ func TestTxnManualRefresh(t *testing.T) {
r := <-reqCh
_, ok := r.ba.GetArg(roachpb.Get)
require.True(t, ok)
var br roachpb.BatchResponse
br := r.ba.CreateReply()
br.Txn = r.ba.Txn
br.Add(&roachpb.GetResponse{})
r.respCh <- resp{br: &br}
r.respCh <- resp{br: br}
}
require.NoError(t, <-errCh)

Expand All @@ -2474,7 +2473,7 @@ func TestTxnManualRefresh(t *testing.T) {
},
},
{
name: "refresh occurs due to read",
name: "refresh occurs successfully due to read",
run: func(
ctx context.Context, t *testing.T, db *kv.DB,
clock *hlc.ManualClock, reqCh <-chan req,
Expand All @@ -2489,10 +2488,9 @@ func TestTxnManualRefresh(t *testing.T) {
r := <-reqCh
_, ok := r.ba.GetArg(roachpb.Get)
require.True(t, ok)
var br roachpb.BatchResponse
br := r.ba.CreateReply()
br.Txn = r.ba.Txn
br.Add(&roachpb.GetResponse{})
r.respCh <- resp{br: &br}
r.respCh <- resp{br: br}
}
require.NoError(t, <-errCh)

Expand All @@ -2503,14 +2501,12 @@ func TestTxnManualRefresh(t *testing.T) {
r := <-reqCh
_, ok := r.ba.GetArg(roachpb.Put)
require.True(t, ok)
var br roachpb.BatchResponse
br := r.ba.CreateReply()
br.Txn = r.ba.Txn.Clone()
// Push the WriteTimestamp simulating an interaction with the
// timestamp cache.
br.Txn.WriteTimestamp =
br.Txn.WriteTimestamp.Add(time.Millisecond.Nanoseconds(), 0)
br.Add(&roachpb.PutResponse{})
r.respCh <- resp{br: &br}
br.Txn.WriteTimestamp = db.Clock().Now()
r.respCh <- resp{br: br}
}
require.NoError(t, <-errCh)

Expand All @@ -2521,10 +2517,9 @@ func TestTxnManualRefresh(t *testing.T) {
r := <-reqCh
_, ok := r.ba.GetArg(roachpb.Refresh)
require.True(t, ok)
var br roachpb.BatchResponse
br := r.ba.CreateReply()
br.Txn = r.ba.Txn.Clone()
br.Add(&roachpb.RefreshResponse{})
r.respCh <- resp{br: &br}
r.respCh <- resp{br: br}
}
require.NoError(t, <-errCh)

Expand All @@ -2533,6 +2528,58 @@ func TestTxnManualRefresh(t *testing.T) {
require.NoError(t, txn.ManualRefresh(ctx))
},
},
{
name: "refresh occurs unsuccessfully due to read",
run: func(
ctx context.Context, t *testing.T, db *kv.DB,
clock *hlc.ManualClock, reqCh <-chan req,
) {
txn := db.NewTxn(ctx, "test")
errCh := make(chan error)
go func() {
_, err := txn.Get(ctx, "foo")
errCh <- err
}()
{
r := <-reqCh
_, ok := r.ba.GetArg(roachpb.Get)
require.True(t, ok)
br := r.ba.CreateReply()
br.Txn = r.ba.Txn
r.respCh <- resp{br: br}
}
require.NoError(t, <-errCh)

go func() {
errCh <- txn.Put(ctx, "bar", "baz")
}()
{
r := <-reqCh
_, ok := r.ba.GetArg(roachpb.Put)
require.True(t, ok)
br := r.ba.CreateReply()
br.Txn = r.ba.Txn.Clone()
// Push the WriteTimestamp simulating an interaction with the
// timestamp cache.
br.Txn.WriteTimestamp = db.Clock().Now()
r.respCh <- resp{br: br}
}
require.NoError(t, <-errCh)

go func() {
errCh <- txn.ManualRefresh(ctx)
}()
{
r := <-reqCh
_, ok := r.ba.GetArg(roachpb.Refresh)
require.True(t, ok)
// Rejects the refresh due to a conflicting write.
pErr := roachpb.NewErrorf("encountered recently written key")
r.respCh <- resp{pErr: pErr}
}
require.Regexp(t, `TransactionRetryError: retry txn \(RETRY_SERIALIZABLE - failed preemptive refresh\)`, <-errCh)
},
},
}
run := func(t *testing.T, tc testCase) {
stopper := stop.NewStopper()
Expand Down

0 comments on commit e5926f9

Please sign in to comment.