diff --git a/pkg/kv/dist_sender_server_test.go b/pkg/kv/dist_sender_server_test.go index c329ad99d9d9..21de65a9dd12 100644 --- a/pkg/kv/dist_sender_server_test.go +++ b/pkg/kv/dist_sender_server_test.go @@ -1871,7 +1871,6 @@ func TestTxnCoordSenderHeartbeatFailurePostSplit(t *testing.T) { // but still fail in others, depending on different conditions. func TestTxnCoordSenderRetries(t *testing.T) { defer leaktest.AfterTest(t)() - t.Skip("26434") var filterFn atomic.Value var storeKnobs storage.StoreTestingKnobs diff --git a/pkg/kv/txn_coord_sender.go b/pkg/kv/txn_coord_sender.go index 35ce8e15224f..c0649a6df1c8 100644 --- a/pkg/kv/txn_coord_sender.go +++ b/pkg/kv/txn_coord_sender.go @@ -979,6 +979,7 @@ func (tc *TxnCoordSender) heartbeatLoop(ctx context.Context) { tc.mu.txnEnd = nil } duration, restarts, status := tc.finalTxnStatsLocked() + tc.mu.tracking = false tc.mu.Unlock() tc.updateStats(duration, restarts, status, false) }() @@ -1215,6 +1216,13 @@ func (tc *TxnCoordSender) StartTracking(ctx context.Context) error { return nil } +// IsTracking returns true if the heartbeat loop is running. +func (tc *TxnCoordSender) IsTracking() bool { + tc.mu.Lock() + defer tc.mu.Unlock() + return tc.mu.tracking +} + // updateState updates the transaction state in both the success and // error cases, applying those updates to the corresponding txnMeta // object when adequate. It also updates retryable errors with the @@ -1384,6 +1392,17 @@ func (tc *TxnCoordSender) updateState( tc.mu.meta.Txn.Update(&newTxn) tc.mu.lastUpdateNanos = tc.clock.PhysicalNow() + if pErr != nil { + // On rollback error, stop the heartbeat loop. No more requests can come + // after a rollback, and there's nobody else to stop the heartbeat loop. + // The rollback success, like the commit success, is handled similarly + // below. + et, isEnding := ba.GetArg(roachpb.EndTransaction) + if isEnding && !et.(*roachpb.EndTransactionRequest).Commit { + tc.cleanupTxnLocked(ctx, aborted) + } + } + return pErr } diff --git a/pkg/kv/txn_coord_sender_test.go b/pkg/kv/txn_coord_sender_test.go index c3a5d18f3a3f..6c121bc2c28c 100644 --- a/pkg/kv/txn_coord_sender_test.go +++ b/pkg/kv/txn_coord_sender_test.go @@ -1850,3 +1850,88 @@ func TestAbortTransactionOnCommitErrors(t *testing.T) { }) } } + +type mockSender struct { + matchers []matcher +} + +var _ client.Sender = &mockSender{} + +type matcher func(roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) + +func (s *mockSender) match(m matcher) { + s.matchers = append(s.matchers, m) +} + +func (s *mockSender) Send( + _ context.Context, ba roachpb.BatchRequest, +) (*roachpb.BatchResponse, *roachpb.Error) { + for _, m := range s.matchers { + br, pErr := m(ba) + if br != nil || pErr != nil { + return br, pErr + } + } + // If none of the matchers triggered, just create an empty reply. + return ba.CreateReply(), nil +} + +// Test that a rollback sent to the TxnCoordSender stops the heartbeat loop even +// if it encounters an error. As of June 2018, there's a separate code path for +// handling errors on rollback in this regard. +func TestRollbackErrorStopsHeartbeat(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond) + ambient := log.AmbientContext{Tracer: tracing.NewTracer()} + sender := &mockSender{} + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + factory := NewTxnCoordSenderFactory( + ambient, + cluster.MakeTestingClusterSettings(), + sender, + clock, + false, /* linearizable */ + stopper, + MakeTxnMetrics(metric.TestSampleInterval), + ) + db := client.NewDB(factory, clock) + + sender.match(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { + if _, ok := ba.GetArg(roachpb.EndTransaction); !ok { + return nil, nil + } + return nil, roachpb.NewErrorf("injected err") + }) + + txn := client.NewTxn(db, roachpb.NodeID(1), client.RootTxn) + txnHeader := roachpb.Header{ + Txn: txn.Proto(), + } + if _, pErr := client.SendWrappedWith( + ctx, txn, txnHeader, &roachpb.PutRequest{ + RequestHeader: roachpb.RequestHeader{ + Key: roachpb.Key("a"), + }, + }, + ); pErr != nil { + t.Fatal(pErr) + } + if !txn.Sender().(*TxnCoordSender).IsTracking() { + t.Fatalf("expected TxnCoordSender to be tracking after the write") + } + + if _, pErr := client.SendWrappedWith( + ctx, txn, txnHeader, + &roachpb.EndTransactionRequest{Commit: false}); !testutils.IsPError(pErr, "injected err") { + t.Fatal(pErr) + } + + testutils.SucceedsSoon(t, func() error { + if txn.Sender().(*TxnCoordSender).IsTracking() { + return fmt.Errorf("still tracking") + } + return nil + }) +}