Skip to content

Commit

Permalink
kv: stop the heartbeat loop on rollback errors
Browse files Browse the repository at this point in the history
A txn's heartbeat loop is generally stopped when, upon a successful
request, the response's txn is no longer PENDING. This was insufficient;
the loop should always be closed after an EndTransaction(commit=false),
regardless of whether it results in success or error.
The heartbeat loop happens to be currently closed by the context
cancelation that the db.Txn() API performs, but that is going away.

This fixes cockroachdb#26434 - TestTxnCoordSenderRetries had become flaky after cockroachdb#25541
because cockroachdb#25441 caused EndTransactions to be sent in some situations
where they weren't before. What's going on in that test is that a
subtest was leaking a heartbeat loop that was stopped after the subtest
finished; the EndTxn sent when the loop was finally being stopped was
interfering with a CommandFilter installed by a different subtest.
Before cockroachdb#25441, the first subtest was waiting for the heartbeat loop to
be done because of its own CommandFilter. However, with cockroachdb#25441, the
first subtest's CommandFilter was being satisfied by a different, newly
introduced EndTransaction.

Release note: None
  • Loading branch information
andreimatei committed Jun 7, 2018
1 parent 80ab908 commit c9ae2d7
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 1 deletion.
1 change: 0 additions & 1 deletion pkg/kv/dist_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1871,7 +1871,6 @@ func TestTxnCoordSenderHeartbeatFailurePostSplit(t *testing.T) {
// but still fail in others, depending on different conditions.
func TestTxnCoordSenderRetries(t *testing.T) {
defer leaktest.AfterTest(t)()
t.Skip("26434")

var filterFn atomic.Value
var storeKnobs storage.StoreTestingKnobs
Expand Down
20 changes: 20 additions & 0 deletions pkg/kv/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,18 @@ func (tc *TxnCoordSender) Send(

if pErr = tc.updateState(ctx, startNS, ba, br, pErr); pErr != nil {
log.VEventf(ctx, 2, "error: %s", pErr)

// On rollback error, stop the heartbeat loop. No more requests can come
// after a rollback, and there's nobody else to stop the heartbeat loop.
// The rollback success, like the commit success, is handled similarly
// below.
et, isEnding := ba.GetArg(roachpb.EndTransaction)
if isEnding && !et.(*roachpb.EndTransactionRequest).Commit {
tc.mu.Lock()
tc.cleanupTxnLocked(ctx, aborted)
tc.mu.Unlock()
}

return nil, pErr
}
}
Expand Down Expand Up @@ -979,6 +991,7 @@ func (tc *TxnCoordSender) heartbeatLoop(ctx context.Context) {
tc.mu.txnEnd = nil
}
duration, restarts, status := tc.finalTxnStatsLocked()
tc.mu.tracking = false
tc.mu.Unlock()
tc.updateStats(duration, restarts, status, false)
}()
Expand Down Expand Up @@ -1215,6 +1228,13 @@ func (tc *TxnCoordSender) StartTracking(ctx context.Context) error {
return nil
}

// IsTracking reports whether the heartbeat loop is running. The tracking
// flag is read while holding tc.mu, so it is safe to call concurrently with
// the code paths that update the flag under the same mutex.
func (tc *TxnCoordSender) IsTracking() bool {
	tc.mu.Lock()
	tracking := tc.mu.tracking
	tc.mu.Unlock()
	return tracking
}

// updateState updates the transaction state in both the success and
// error cases, applying those updates to the corresponding txnMeta
// object when adequate. It also updates retryable errors with the
Expand Down
85 changes: 85 additions & 0 deletions pkg/kv/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1850,3 +1850,88 @@ func TestAbortTransactionOnCommitErrors(t *testing.T) {
})
}
}

// mockSender is a test sender whose replies are scripted through a list of
// matcher functions. Matchers are registered with match() and consulted by
// Send() in registration order.
type mockSender struct {
	// matchers holds the registered matchers, oldest first.
	matchers []matcher
}

// Compile-time check that *mockSender satisfies client.Sender.
var _ client.Sender = (*mockSender)(nil)

// matcher inspects a batch request and optionally produces the reply for it.
// Returning (nil, nil) signals that the matcher does not apply to the batch,
// in which case mockSender.Send moves on to the next registered matcher.
type matcher func(roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error)

// match registers m with the sender. Matchers are appended, so earlier
// registrations are consulted first when a batch is sent.
func (s *mockSender) match(m matcher) {
	s.matchers = append(s.matchers, m)
}

// Send runs the batch through the registered matchers in order and returns
// the result of the first one that yields a non-nil response or a non-nil
// error. When every matcher declines, an empty reply created from the batch
// is returned with no error.
func (s *mockSender) Send(
	_ context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
	for i := range s.matchers {
		reply, pErr := s.matchers[i](ba)
		if reply == nil && pErr == nil {
			// This matcher declined the batch; try the next one.
			continue
		}
		return reply, pErr
	}
	// No matcher claimed the batch: fall back to an empty reply.
	return ba.CreateReply(), nil
}

// Test that a rollback sent to the TxnCoordSender stops the heartbeat loop even
// if it encounters an error. As of June 2018, there's a separate code path for
// handling errors on rollback in this regard.
//
// The test wires a TxnCoordSender on top of a mockSender that fails every
// batch containing an EndTransaction with "injected err" and answers all
// other batches with an empty reply. It then:
//  1. performs a write and checks that the coordinator reports tracking;
//  2. sends EndTransaction(Commit: false) and checks that the injected error
//     is surfaced;
//  3. waits for the coordinator to report that it is no longer tracking.
func TestRollbackErrorStopsHeartbeat(t *testing.T) {
	defer leaktest.AfterTest(t)()
	ctx := context.Background()
	clock := hlc.NewClock(hlc.UnixNano, time.Nanosecond)
	ambient := log.AmbientContext{Tracer: tracing.NewTracer()}
	sender := &mockSender{}
	stopper := stop.NewStopper()
	defer stopper.Stop(ctx)
	factory := NewTxnCoordSenderFactory(
		ambient,
		cluster.MakeTestingClusterSettings(),
		sender,
		clock,
		false, /* linearizable */
		stopper,
		MakeTxnMetrics(metric.TestSampleInterval),
	)
	db := client.NewDB(factory, clock)

	// Fail every batch that carries an EndTransaction; let everything else
	// fall through to the mock's default empty reply.
	sender.match(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
		if _, ok := ba.GetArg(roachpb.EndTransaction); !ok {
			return nil, nil
		}
		return nil, roachpb.NewErrorf("injected err")
	})

	txn := client.NewTxn(db, roachpb.NodeID(1), client.RootTxn)
	txnHeader := roachpb.Header{
		Txn: txn.Proto(),
	}

	// isTracking re-fetches the sender on every call, mirroring what a
	// caller inspecting the txn would observe at that moment.
	isTracking := func() bool {
		return txn.Sender().(*TxnCoordSender).IsTracking()
	}

	if _, pErr := client.SendWrappedWith(
		ctx, txn, txnHeader, &roachpb.PutRequest{
			RequestHeader: roachpb.RequestHeader{
				Key: roachpb.Key("a"),
			},
		},
	); pErr != nil {
		t.Fatal(pErr)
	}
	if !isTracking() {
		t.Fatalf("expected TxnCoordSender to be tracking after the write")
	}

	// The rollback must surface the injected error. Report what was expected
	// explicitly: if the rollback unexpectedly succeeds, pErr is nil and a
	// bare t.Fatal(pErr) would only print "<nil>".
	_, pErr := client.SendWrappedWith(
		ctx, txn, txnHeader,
		&roachpb.EndTransactionRequest{Commit: false})
	if !testutils.IsPError(pErr, "injected err") {
		t.Fatalf("expected rollback to fail with %q, got: %v", "injected err", pErr)
	}

	// Poll rather than assert immediately, as the original test does.
	testutils.SucceedsSoon(t, func() error {
		if isTracking() {
			return fmt.Errorf("still tracking")
		}
		return nil
	})
}

0 comments on commit c9ae2d7

Please sign in to comment.