From e5926f976e36b3b9b04c91186c496130eb42ecf8 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Fri, 19 Feb 2021 19:14:26 -0500 Subject: [PATCH] kv/kvclient: fix ManualRefresh error handling Fixes #60760. Fallout from #60567. The refreshed BatchRequest was nil on the error path, which was resulting in a nil-pointer exception. This commit fixes this by passing the original BatchRequest to updateStateLocked, like the TxnCoordSender normally does. --- pkg/kv/kvclient/kvcoord/txn_coord_sender.go | 10 ++- .../kvclient/kvcoord/txn_coord_sender_test.go | 77 +++++++++++++++---- 2 files changed, 69 insertions(+), 18 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go index b69e7e59ae4c..1deaa7727f6f 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender.go @@ -1158,16 +1158,20 @@ func (tc *TxnCoordSender) ManualRefresh(ctx context.Context) error { // Hijack the pre-emptive refresh code path to perform the refresh but // provide the force flag to ensure that the refresh occurs unconditionally. + // We provide an empty BatchRequest - maybeRefreshPreemptivelyLocked just + // needs the transaction proto. The function then returns a BatchRequest + // with the updated transaction proto. We use this updated proto to call + // into updateStateLocked directly. var ba roachpb.BatchRequest ba.Txn = tc.mu.txn.Clone() const force = true - ba, pErr := tc.interceptorAlloc.txnSpanRefresher.maybeRefreshPreemptivelyLocked(ctx, ba, force) + refreshedBa, pErr := tc.interceptorAlloc.txnSpanRefresher.maybeRefreshPreemptivelyLocked(ctx, ba, force) if pErr != nil { pErr = tc.updateStateLocked(ctx, ba, nil, pErr) } else { var br roachpb.BatchResponse - br.Txn = ba.Txn - pErr = tc.updateStateLocked(ctx, ba, &br, pErr) + br.Txn = refreshedBa.Txn + pErr = tc.updateStateLocked(ctx, ba, &br, nil) } return pErr.GoError() } diff --git a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go index 5b8a70ccfdb7..a5492b1914b5 100644 --- a/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go @@ -2460,10 +2460,9 @@ func TestTxnManualRefresh(t *testing.T) { r := <-reqCh _, ok := r.ba.GetArg(roachpb.Get) require.True(t, ok) - var br roachpb.BatchResponse + br := r.ba.CreateReply() br.Txn = r.ba.Txn - br.Add(&roachpb.GetResponse{}) - r.respCh <- resp{br: &br} + r.respCh <- resp{br: br} } require.NoError(t, <-errCh) @@ -2474,7 +2473,7 @@ func TestTxnManualRefresh(t *testing.T) { }, }, { - name: "refresh occurs due to read", + name: "refresh occurs successfully due to read", run: func( ctx context.Context, t *testing.T, db *kv.DB, clock *hlc.ManualClock, reqCh <-chan req, @@ -2489,10 +2488,9 @@ func TestTxnManualRefresh(t *testing.T) { r := <-reqCh _, ok := r.ba.GetArg(roachpb.Get) require.True(t, ok) - var br roachpb.BatchResponse + br := r.ba.CreateReply() br.Txn = r.ba.Txn - br.Add(&roachpb.GetResponse{}) - r.respCh <- resp{br: &br} + r.respCh <- resp{br: br} } require.NoError(t, <-errCh) @@ -2503,14 +2501,12 @@ func TestTxnManualRefresh(t *testing.T) { r := <-reqCh _, ok := r.ba.GetArg(roachpb.Put) require.True(t, ok) - var br roachpb.BatchResponse + br := r.ba.CreateReply() br.Txn = r.ba.Txn.Clone() // Push the WriteTimestamp simulating an interaction with the // timestamp cache. - br.Txn.WriteTimestamp = - br.Txn.WriteTimestamp.Add(time.Millisecond.Nanoseconds(), 0) - br.Add(&roachpb.PutResponse{}) - r.respCh <- resp{br: &br} + br.Txn.WriteTimestamp = db.Clock().Now() + r.respCh <- resp{br: br} } require.NoError(t, <-errCh) @@ -2521,10 +2517,9 @@ func TestTxnManualRefresh(t *testing.T) { r := <-reqCh _, ok := r.ba.GetArg(roachpb.Refresh) require.True(t, ok) - var br roachpb.BatchResponse + br := r.ba.CreateReply() br.Txn = r.ba.Txn.Clone() - br.Add(&roachpb.RefreshResponse{}) - r.respCh <- resp{br: &br} + r.respCh <- resp{br: br} } require.NoError(t, <-errCh) @@ -2533,6 +2528,58 @@ func TestTxnManualRefresh(t *testing.T) { require.NoError(t, txn.ManualRefresh(ctx)) }, }, + { + name: "refresh occurs unsuccessfully due to read", + run: func( + ctx context.Context, t *testing.T, db *kv.DB, + clock *hlc.ManualClock, reqCh <-chan req, + ) { + txn := db.NewTxn(ctx, "test") + errCh := make(chan error) + go func() { + _, err := txn.Get(ctx, "foo") + errCh <- err + }() + { + r := <-reqCh + _, ok := r.ba.GetArg(roachpb.Get) + require.True(t, ok) + br := r.ba.CreateReply() + br.Txn = r.ba.Txn + r.respCh <- resp{br: br} + } + require.NoError(t, <-errCh) + + go func() { + errCh <- txn.Put(ctx, "bar", "baz") + }() + { + r := <-reqCh + _, ok := r.ba.GetArg(roachpb.Put) + require.True(t, ok) + br := r.ba.CreateReply() + br.Txn = r.ba.Txn.Clone() + // Push the WriteTimestamp simulating an interaction with the + // timestamp cache. + br.Txn.WriteTimestamp = db.Clock().Now() + r.respCh <- resp{br: br} + } + require.NoError(t, <-errCh) + + go func() { + errCh <- txn.ManualRefresh(ctx) + }() + { + r := <-reqCh + _, ok := r.ba.GetArg(roachpb.Refresh) + require.True(t, ok) + // Rejects the refresh due to a conflicting write. + pErr := roachpb.NewErrorf("encountered recently written key") + r.respCh <- resp{pErr: pErr} + } + require.Regexp(t, `TransactionRetryError: retry txn \(RETRY_SERIALIZABLE - failed preemptive refresh\)`, <-errCh) + }, + }, } run := func(t *testing.T, tc testCase) { stopper := stop.NewStopper()