Skip to content

Commit

Permalink
kv, storage: HeartbeatTxnReq skips checking the abort span
Browse files Browse the repository at this point in the history
Before this patch, the behavior of the heartbeat request was
inconsistent when it found an aborted txn record:
It either returned a successful response, with an aborted txn proto in
it (as suggested by the docs) or it would return TransactionAbortedError
if the abort span had been populated. No more; stick to the former.
This also fixes the problem that we were not handling the first case
correctly in the TxnCoordSender - we were not stopping the hb loop.

Release note: None
  • Loading branch information
andreimatei committed Jul 23, 2018
1 parent 54a811d commit 39f6950
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 86 deletions.
43 changes: 21 additions & 22 deletions pkg/kv/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -878,6 +878,7 @@ func (tc *TxnCoordSender) heartbeat(ctx context.Context) bool {
log.VEvent(ctx, 2, "heartbeat")
br, pErr := tc.wrapped.Send(ctx, ba)

var respTxn *roachpb.Transaction
if pErr != nil {
log.VEventf(ctx, 2, "heartbeat failed: %s", pErr)

Expand All @@ -890,32 +891,30 @@ func (tc *TxnCoordSender) heartbeat(ctx context.Context) bool {
return true
}

tc.mu.Lock()
// If the error contains updated txn info, ingest it. For example, we might
// find out this way that the transaction has been Aborted in the meantime
// (e.g. someone else pushed it), in which case it's up to us to clean up.
if errTxn := pErr.GetTxn(); errTxn != nil {
tc.mu.txn.Update(errTxn)
}
status := tc.mu.txn.Status
if status == roachpb.ABORTED {
log.VEventf(ctx, 1, "Heartbeat detected aborted txn. Cleaning up.")
tc.abortTxnAsyncLocked()
if pErr.GetTxn() != nil {
// It is not expected for a 2.1 node to return an error with a transaction
// in it. For one, heartbeats are not supposed to return
// TransactionAbortedErrors.
// TODO(andrei): Remove this in 2.2.
respTxn = pErr.GetTxn()
} else {
return true
}
tc.mu.Unlock()

return status == roachpb.PENDING
} else {
respTxn = br.Responses[0].GetInner().(*roachpb.HeartbeatTxnResponse).Txn
}
txn.Update(br.Responses[0].GetInner().(*roachpb.HeartbeatTxnResponse).Txn)

// Give the news to the txn in the txns map. This will update long-running
// transactions (which may find out that they have to restart in that way),
// but in particular makes sure that they notice when they've been aborted
// (in which case we'll give them an error on their next request).
// Update our txn. In particular, we need to make sure that the client will
// notice when the txn has been aborted (in which case we'll give them an
// error on their next request).
tc.mu.Lock()
tc.mu.txn.Update(&txn)
tc.mu.Unlock()

defer tc.mu.Unlock()
tc.mu.txn.Update(respTxn)
if tc.mu.txn.Status != roachpb.PENDING {
log.VEventf(ctx, 1, "Heartbeat detected aborted txn. Cleaning up.")
tc.abortTxnAsyncLocked()
return false
}
return true
}

Expand Down
129 changes: 74 additions & 55 deletions pkg/kv/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,74 +394,93 @@ func TestTxnCoordSenderCondenseIntentSpans(t *testing.T) {
}
}

// TestTxnCoordSenderHeartbeat verifies periodic heartbeat of the
// transaction record.
// Test that the theartbeat loop detects aborted transactions and stops.
func TestTxnCoordSenderHeartbeat(t *testing.T) {
defer leaktest.AfterTest(t)()
s := createTestDB(t)
defer s.Stop()
ctx := context.Background()

initialTxn := client.NewTxn(s.DB, 0 /* gatewayNodeID */, client.RootTxn)
tc := initialTxn.Sender().(*TxnCoordSender)
defer teardownHeartbeat(tc)
// Set heartbeat interval to 1ms for testing.
tc.TxnCoordSenderFactory.heartbeatInterval = 1 * time.Millisecond

if err := initialTxn.Put(context.TODO(), roachpb.Key("a"), []byte("value")); err != nil {
keyA := roachpb.Key("a")
keyC := roachpb.Key("c")
splitKey := roachpb.Key("b")
if err := s.DB.AdminSplit(ctx, splitKey /* spanKey */, splitKey /* splitKey */); err != nil {
t.Fatal(err)
}

// Verify 3 heartbeats.
var heartbeatTS hlc.Timestamp
for i := 0; i < 3; i++ {
testutils.SucceedsSoon(t, func() error {
txn, pErr := getTxn(context.Background(), initialTxn)
if pErr != nil {
t.Fatal(pErr)
// We're going to test twice. In both cases the heartbeat is supposed to
// notice that its transaction is aborted, but:
// - once the abort span is populated on the txn's range.
// - once the abort span is not populated.
// The two conditions are created by either clearing an intent from the txn's
// range or not (i.e. clearing an intent from another range).
// The difference is supposed to be immaterial for the heartbeat loop (that's
// what we're testing). As of June 2018, HeartbeatTxnRequests don't check the
// abort span.
for _, pusherKey := range []roachpb.Key{keyA, keyC} {
t.Run(fmt.Sprintf("pusher:%s", pusherKey), func(t *testing.T) {
initialTxn := client.NewTxn(s.DB, 0 /* gatewayNodeID */, client.RootTxn)
tc := initialTxn.Sender().(*TxnCoordSender)
tc.TxnCoordSenderFactory.heartbeatInterval = 1 * time.Millisecond

if err := initialTxn.Put(ctx, keyA, []byte("value")); err != nil {
t.Fatal(err)
}
// Advance clock by 1ns.
// Locking the TxnCoordSender to prevent a data race.
tc.mu.Lock()
s.Manual.Increment(1)
tc.mu.Unlock()
if lastActive := txn.LastActive(); heartbeatTS.Less(lastActive) {
heartbeatTS = lastActive
return nil
if err := initialTxn.Put(ctx, keyC, []byte("value")); err != nil {
t.Fatal(err)
}
return errors.Errorf("expected heartbeat")
})
}

// Sneakily send an ABORT right to DistSender (bypassing TxnCoordSender).
{
var ba roachpb.BatchRequest
ba.Add(&roachpb.EndTransactionRequest{
RequestHeader: roachpb.RequestHeader{Key: initialTxn.Proto().Key},
Commit: false,
Poison: true,
})
ba.Txn = initialTxn.Proto()
if _, pErr := tc.TxnCoordSenderFactory.wrapped.Send(context.Background(), ba); pErr != nil {
t.Fatal(pErr)
}
}
// Verify 3 heartbeats.
var heartbeatTS hlc.Timestamp
for i := 0; i < 3; i++ {
testutils.SucceedsSoon(t, func() error {
txn, pErr := getTxn(ctx, initialTxn)
if pErr != nil {
t.Fatal(pErr)
}
// Advance clock by 1ns.
// Locking the TxnCoordSender to prevent a data race.
tc.mu.Lock()
s.Manual.Increment(1)
tc.mu.Unlock()
if lastActive := txn.LastActive(); heartbeatTS.Less(lastActive) {
heartbeatTS = lastActive
return nil
}
return errors.Errorf("expected heartbeat")
})
}

// Verify that the abort is discovered and the heartbeat discontinued.
// This relies on the heartbeat loop stopping once it figures out that the txn
// has been aborted.
testutils.SucceedsSoon(t, func() error {
tc.mu.Lock()
done := tc.mu.txnEnd == nil
tc.mu.Unlock()
if !done {
return fmt.Errorf("transaction is not aborted")
}
return nil
})
// Push our txn with another high-priority txn.
{
if err := s.DB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
if err := txn.SetUserPriority(roachpb.MaxUserPriority); err != nil {
return err
}
return txn.Put(ctx, pusherKey, []byte("pusher val"))
}); err != nil {
t.Fatal(err)
}
}

// Trying to do something else should give us a TransactionAbortedError.
_, err := initialTxn.Get(context.TODO(), "a")
assertTransactionAbortedError(t, err)
// Verify that the abort is discovered and the heartbeat discontinued.
// This relies on the heartbeat loop stopping once it figures out that the txn
// has been aborted.
testutils.SucceedsSoon(t, func() error {
tc.mu.Lock()
done := tc.mu.txnEnd == nil
tc.mu.Unlock()
if !done {
return fmt.Errorf("transaction is not aborted")
}
return nil
})

// Trying to do something else should give us a TransactionAbortedError.
_, err := initialTxn.Get(ctx, "a")
assertTransactionAbortedError(t, err)
})
}
}

// getTxn fetches the requested key and returns the transaction info.
Expand Down
10 changes: 10 additions & 0 deletions pkg/roachpb/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,16 @@ func (ba *BatchRequest) IsSingleQueryTxnRequest() bool {
return false
}

// IsSingleHeartbeatTxnRequest returns true iff the batch contains a single
// request, and that request is a HeartbeatTxn.
func (ba *BatchRequest) IsSingleHeartbeatTxnRequest() bool {
if ba.IsSingleRequest() {
_, ok := ba.Requests[0].GetInner().(*HeartbeatTxnRequest)
return ok
}
return false
}

// IsSingleEndTransactionRequest returns true iff the batch contains a single
// request, and that request is an EndTransactionRequest.
func (ba *BatchRequest) IsSingleEndTransactionRequest() bool {
Expand Down
6 changes: 0 additions & 6 deletions pkg/storage/batcheval/cmd_heartbeat_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,6 @@ func declareKeysHeartbeatTransaction(
desc roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet,
) {
declareKeysWriteTransaction(desc, header, req, spans)
if header.Txn != nil {
header.Txn.AssertInitialized(context.TODO())
spans.Add(spanset.SpanReadOnly, roachpb.Span{
Key: keys.AbortSpanKey(header.RangeID, header.Txn.ID),
})
}
}

// HeartbeatTxn updates the transaction status and heartbeat
Expand Down
7 changes: 5 additions & 2 deletions pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -5896,12 +5896,15 @@ func evaluateBatch(
// transaction field cleared by this point so we do not execute
// this check in that case.
if ba.IsTransactionWrite() || ba.Txn.Writing {
// If the request is asking to abort the transaction, then don't check the
// We don't check the abort span for a couple of special requests:
// - if the request is asking to abort the transaction, then don't check the
// AbortSpan; we don't want the request to be rejected if the transaction
// has already been aborted.
// - heartbeats don't check the abort span. If the txn is aborted, they'll
// return an aborted proto in their otherwise successful response.
singleAbort := ba.IsSingleEndTransactionRequest() &&
!ba.Requests[0].GetInner().(*roachpb.EndTransactionRequest).Commit
if !singleAbort {
if !singleAbort && !ba.IsSingleHeartbeatTxnRequest() {
if pErr := checkIfTxnAborted(ctx, rec, batch, *ba.Txn); pErr != nil {
return nil, result.Result{}, pErr
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2663,8 +2663,8 @@ func TestReplicaCommandQueueCancellationLocal(t *testing.T) {
})
t.Run("CancelEndTxn", func(t *testing.T) {
instrs := []cancelInstr{
{reqOverride: heartbeatBa, expErr: "txn record not found"},
{reqOverride: endTxnBa, expErr: "txn record not found"},
{reqOverride: heartbeatBa, expErr: "txn record not found"},
{reqOverride: pushBa},
{reqOverride: heartbeatBa, expErr: "txn record not found"},
{reqOverride: resolveIntentBa},
Expand Down

0 comments on commit 39f6950

Please sign in to comment.