Skip to content

Commit

Permalink
kv: don't update transaction proto directly from heartbeat loop
Browse files Browse the repository at this point in the history
Fixes cockroachdb#39652.
Fixes cockroachdb#39661.
Fixes cockroachdb#35144.

This commit fixes the referenced issues by eliminating the practice of updating
the transaction coordinator's proto directly from its heartbeat loop. This was
problematic because of the race described in
https://github.com/cockroachdb/cockroach/blob/dc3686f79b3750500efaff7092c81a3e5ce6d02c/pkg/kv/txn_interceptor_heartbeater.go#L357-L364.
The heartbeat loop doesn't know if it's racing with an EndTransaction request
and it could incorrectly interpret a missing transaction record if it is. The
safest thing to do is to limit the path in which it informs the TxnCoordSender
of updates.

This limits the responsibility of the heartbeat loop. Its job is now
only to:
1. update the transaction record to maintain liveness
2. eagerly clean up a transaction if it is found to be aborted
3. inform the transaction coordinator about an aborted transaction
   record IF the transaction coordinator is continuing to send requests
   through the interceptor.

Notably, the heartbeat loop no longer blindly updates the transaction
coordinator's transaction proto. There wasn't a strong reason for it to be able
to do so, especially now that we no longer push transactions or ratchet their
priority frequently. Moreover, even if those were still frequent occurrences,
updating the proto from the heartbeat loop prevented usual restart handling from
being used. For instance, doing so might prevent us from refreshing the
transaction. All in all, allowing this didn't seem worth the complexity.

This commit also includes some cleanup. For instance, it removes a confusing
dependency where the txnHeartbeater called back into the TxnCoordSender. It also
addresses a longstanding TODO to actually unit test the txnHeartbeater.

Release note: None
  • Loading branch information
nvanbenschoten committed Aug 16, 2019
1 parent 5beb210 commit 59e34a3
Show file tree
Hide file tree
Showing 9 changed files with 535 additions and 168 deletions.
21 changes: 13 additions & 8 deletions pkg/kv/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,14 +484,13 @@ func (tcf *TxnCoordSenderFactory) TransactionalSender(
if typ == client.RootTxn {
tcs.interceptorAlloc.txnHeartbeater.init(
tcf.AmbientContext,
&tcs.mu.Mutex,
&tcs.mu.txn,
tcs.stopper,
tcs.clock,
&tcs.metrics,
tcs.heartbeatInterval,
&tcs.interceptorAlloc.txnLockGatekeeper,
&tcs.metrics,
tcs.stopper,
tcs.cleanupTxnLocked,
&tcs.mu.Mutex,
&tcs.mu.txn,
)
tcs.interceptorAlloc.txnCommitter = txnCommitter{
st: tcf.st,
Expand Down Expand Up @@ -863,9 +862,7 @@ func (tc *TxnCoordSender) maybeSleepForLinearizable(
func (tc *TxnCoordSender) maybeRejectClientLocked(
ctx context.Context, ba *roachpb.BatchRequest,
) *roachpb.Error {
if singleRollback := ba != nil &&
ba.IsSingleEndTransactionRequest() &&
!ba.Requests[0].GetInner().(*roachpb.EndTransactionRequest).Commit; singleRollback {
if ba.IsSingleAbortTransactionRequest() {
// As a special case, we allow rollbacks to be sent at any time. Any
// rollback attempt moves the TxnCoordSender state to txnFinalized, but higher
// layers are free to retry rollbacks if they want (and they do, for
Expand Down Expand Up @@ -1004,6 +1001,7 @@ func (tc *TxnCoordSender) handleRetryableErrLocked(
// Abort the old txn. The client is not supposed to use this
// TxnCoordSender any more.
tc.interceptorAlloc.txnHeartbeater.abortTxnAsyncLocked(ctx)
tc.cleanupTxnLocked(ctx)
return retErr
}

Expand Down Expand Up @@ -1239,3 +1237,10 @@ func (tc *TxnCoordSender) SerializeTxn() *roachpb.Transaction {
defer tc.mu.Unlock()
return tc.mu.txn.Clone()
}

// IsTracking reports whether this TxnCoordSender's heartbeat loop is
// currently running. Exported for use by tests in other packages.
func (tc *TxnCoordSender) IsTracking() bool {
	tc.mu.Lock()
	defer tc.mu.Unlock()
	running := tc.interceptorAlloc.txnHeartbeater.heartbeatLoopRunningLocked()
	return running
}
4 changes: 2 additions & 2 deletions pkg/kv/txn_coord_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ func TestHeartbeatFindsOutAboutAbortedTransaction(t *testing.T) {

// Now wait until the heartbeat loop notices that the transaction is aborted.
testutils.SucceedsSoon(t, func() error {
if txn.GetTxnCoordMeta(ctx).Txn.Status != roachpb.ABORTED {
return fmt.Errorf("txn not aborted yet")
if txn.Sender().(*kv.TxnCoordSender).IsTracking() {
return fmt.Errorf("txn heartbeat loop running")
}
return nil
})
Expand Down
35 changes: 14 additions & 21 deletions pkg/kv/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,6 @@ func makeTS(walltime int64, logical int32) hlc.Timestamp {
}
}

// isTracking reports whether the heartbeat loop for this transaction
// coordinator is still active (i.e. its stop channel has been set up and
// not yet cleared).
func (tc *TxnCoordSender) isTracking() bool {
	tc.mu.Lock()
	defer tc.mu.Unlock()
	heartbeatRunning := tc.interceptorAlloc.txnHeartbeater.mu.txnEnd != nil
	return heartbeatRunning
}

// TestTxnCoordSenderBeginTransaction verifies that a command sent with a
// not-nil Txn with empty ID gets a new transaction initialized.
func TestTxnCoordSenderBeginTransaction(t *testing.T) {
Expand Down Expand Up @@ -374,7 +367,7 @@ func TestTxnCoordSenderHeartbeat(t *testing.T) {
// This relies on the heartbeat loop stopping once it figures out that the txn
// has been aborted.
testutils.SucceedsSoon(t, func() error {
if tc.isTracking() {
if tc.IsTracking() {
return fmt.Errorf("transaction is not aborted")
}
return nil
Expand Down Expand Up @@ -414,7 +407,7 @@ func getTxn(ctx context.Context, txn *client.Txn) (*roachpb.Transaction, *roachp
func verifyCleanup(key roachpb.Key, eng engine.Engine, t *testing.T, coords ...*TxnCoordSender) {
testutils.SucceedsSoon(t, func() error {
for _, coord := range coords {
if coord.isTracking() {
if coord.IsTracking() {
return fmt.Errorf("expected no heartbeat")
}
}
Expand Down Expand Up @@ -652,7 +645,7 @@ func TestTxnCoordSenderGCWithAmbiguousResultErr(t *testing.T) {

testutils.SucceedsSoon(t, func() error {
// Locking the TxnCoordSender to prevent a data race.
if tc.isTracking() {
if tc.IsTracking() {
return errors.Errorf("expected garbage collection")
}
return nil
Expand Down Expand Up @@ -1160,7 +1153,7 @@ func TestTxnRestartCount(t *testing.T) {
// Wait for heartbeat to start.
tc := txn.Sender().(*TxnCoordSender)
testutils.SucceedsSoon(t, func() error {
if !tc.isTracking() {
if !tc.IsTracking() {
return errors.New("expected heartbeat to start")
}
return nil
Expand Down Expand Up @@ -1389,7 +1382,7 @@ func TestRollbackErrorStopsHeartbeat(t *testing.T) {
); pErr != nil {
t.Fatal(pErr)
}
if !txn.Sender().(*TxnCoordSender).isTracking() {
if !txn.Sender().(*TxnCoordSender).IsTracking() {
t.Fatalf("expected TxnCoordSender to be tracking after the write")
}

Expand All @@ -1401,20 +1394,20 @@ func TestRollbackErrorStopsHeartbeat(t *testing.T) {
}

testutils.SucceedsSoon(t, func() error {
if txn.Sender().(*TxnCoordSender).isTracking() {
if txn.Sender().(*TxnCoordSender).IsTracking() {
return fmt.Errorf("still tracking")
}
return nil
})
}

// Test that intent tracking behaves correctly for
// transactions that attempt to run a batch containing both a BeginTransaction
// and an EndTransaction. Since in case of an error it's not easy to determine
// whether any intents have been laid down (i.e. in case the batch was split by
// the DistSender and then there was mixed success for the sub-batches, or in
// case a retriable error is returned), the test verifies that all possible
// intents are properly tracked and attached to a subsequent EndTransaction.
// Test that intent tracking behaves correctly for transactions that attempt to
// run a batch containing both a BeginTransaction and an EndTransaction. Since
// in case of an error it's not easy to determine whether any intents have been
// laid down (i.e. in case the batch was split by the DistSender and then there
// was mixed success for the sub-batches, or in case a retriable error is
// returned), the test verifies that all possible intents are properly tracked
// and attached to a subsequent EndTransaction.
func TestOnePCErrorTracking(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
Expand Down Expand Up @@ -1487,7 +1480,7 @@ func TestOnePCErrorTracking(t *testing.T) {

// As always, check that the rollback we just sent stops the heartbeat loop.
testutils.SucceedsSoon(t, func() error {
if txn.Sender().(*TxnCoordSender).isTracking() {
if txn.Sender().(*TxnCoordSender).IsTracking() {
return fmt.Errorf("still tracking")
}
return nil
Expand Down
Loading

0 comments on commit 59e34a3

Please sign in to comment.