Skip to content

Commit

Permalink
kv: don't update transaction proto directly from heartbeat loop
Browse files Browse the repository at this point in the history
Fixes cockroachdb#39652.
Fixes cockroachdb#39661.
Fixes cockroachdb#35144.

This commit fixes the referenced issues by eliminating the practice of updating
the transaction coordinator's proto directly from its heartbeat loop. This was
problematic because of the race described in
https://github.com/cockroachdb/cockroach/blob/dc3686f79b3750500efaff7092c81a3e5ce6d02c/pkg/kv/txn_interceptor_heartbeater.go#L357-L364.
The heartbeat loop doesn't know if it's racing with an EndTransaction request
and it could incorrectly interpret a missing transaction record if it is. The
safest thing to do is to limit the path in which it informs the TxnCoordSender
of updates.

This limits the responsibility of the heartbeat loop. Its job is now
only to:
1. update the transaction record to maintain liveness
2. eagerly clean up a transaction if it is found to be aborted
3. inform the transaction coordinator about an aborted transaction
   record IF the transaction coordinator is continuing to send requests
   through the interceptor.

Notably, the heartbeat loop no longer blindly updates the transaction
coordinator's transaction proto. There wasn't a strong reason for it to be able
to do so, especially now that we no longer push transactions or ratchet their
priority frequently. Moreover, even if those were still frequent occurrences,
updating the proto from the heartbeat loop prevented usual restart handling from
being used. For instance, doing so might prevent us from refreshing the
transaction. All in all, allowing this didn't seem worth the complexity.

This commit also includes some cleanup. For instance, it removes a confusing
dependency where the txnHeartbeater called back into the TxnCoordSender. It also
addresses a longstanding TODO to actually unit test the txnHeartbeater.

Release note: None
  • Loading branch information
nvanbenschoten committed Aug 21, 2019
1 parent 19d6e22 commit 4fc8816
Show file tree
Hide file tree
Showing 9 changed files with 572 additions and 196 deletions.
80 changes: 54 additions & 26 deletions pkg/kv/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ const (
opTxnCoordSender = "txn coordinator send"
)

// txnState represents states relating to whether Begin/EndTxn requests need to
// be sent.
// txnState represents states relating to whether an EndTransaction request
// needs to be sent.
//go:generate stringer -type=txnState
type txnState int

Expand Down Expand Up @@ -484,14 +484,13 @@ func (tcf *TxnCoordSenderFactory) TransactionalSender(
if typ == client.RootTxn {
tcs.interceptorAlloc.txnHeartbeater.init(
tcf.AmbientContext,
&tcs.mu.Mutex,
&tcs.mu.txn,
tcs.stopper,
tcs.clock,
&tcs.metrics,
tcs.heartbeatInterval,
&tcs.interceptorAlloc.txnLockGatekeeper,
&tcs.metrics,
tcs.stopper,
tcs.cleanupTxnLocked,
&tcs.mu.Mutex,
&tcs.mu.txn,
)
tcs.interceptorAlloc.txnCommitter = txnCommitter{
st: tcf.st,
Expand Down Expand Up @@ -609,12 +608,17 @@ func (tc *TxnCoordSender) GetMeta(
for _, reqInt := range tc.interceptorStack {
reqInt.populateMetaLocked(&meta)
}
if opt == client.OnlyPending && meta.Txn.Status != roachpb.PENDING {
switch opt {
case client.AnyTxnStatus:
// Nothing to check.
case client.OnlyPending:
// Check the coordinator's proto status.
rejectErr := tc.maybeRejectClientLocked(ctx, nil /* ba */)
if rejectErr == nil {
log.Fatal(ctx, "expected non-nil rejectErr")
if rejectErr != nil {
return roachpb.TxnCoordMeta{}, rejectErr.GoError()
}
return roachpb.TxnCoordMeta{}, rejectErr.GoError()
default:
panic("unreachable")
}
return meta, nil
}
Expand Down Expand Up @@ -863,44 +867,60 @@ func (tc *TxnCoordSender) maybeSleepForLinearizable(
func (tc *TxnCoordSender) maybeRejectClientLocked(
ctx context.Context, ba *roachpb.BatchRequest,
) *roachpb.Error {
if singleRollback := ba != nil &&
ba.IsSingleEndTransactionRequest() &&
!ba.Requests[0].GetInner().(*roachpb.EndTransactionRequest).Commit; singleRollback {
if ba != nil && ba.IsSingleAbortTransactionRequest() {
// As a special case, we allow rollbacks to be sent at any time. Any
// rollback attempt moves the TxnCoordSender state to txnFinalized, but higher
// layers are free to retry rollbacks if they want (and they do, for
// example, when the context was canceled while txn.Rollback() was running).
return nil
}

if tc.mu.txnState == txnFinalized {
// Check the transaction coordinator state.
switch tc.mu.txnState {
case txnPending:
// All good.
case txnError:
return tc.mu.storedErr
case txnFinalized:
msg := fmt.Sprintf("client already committed or rolled back the transaction. "+
"Trying to execute: %s", ba)
stack := string(debug.Stack())
log.Errorf(ctx, "%s. stack:\n%s", msg, stack)
return roachpb.NewErrorWithTxn(roachpb.NewTransactionStatusError(msg), &tc.mu.txn)
}
if tc.mu.txnState == txnError {
return tc.mu.storedErr
}
if tc.mu.txn.Status == roachpb.ABORTED {

// Check the transaction proto state, along with any finalized transaction
// status observed by the transaction heartbeat loop.
protoStatus := tc.mu.txn.Status
hbObservedStatus := tc.interceptorAlloc.txnHeartbeater.mu.finalObservedStatus
switch {
case protoStatus == roachpb.ABORTED:
// The transaction was rolled back synchronously.
fallthrough
case protoStatus != roachpb.COMMITTED && hbObservedStatus == roachpb.ABORTED:
// The transaction heartbeat observed an aborted transaction record and
// this was not due to a synchronous transaction commit and transaction
// record garbage collection.
// See the comment on txnHeartbeater.mu.finalizedStatus for more details.
abortedErr := roachpb.NewErrorWithTxn(
roachpb.NewTransactionAbortedError(roachpb.ABORT_REASON_CLIENT_REJECT), &tc.mu.txn)
if tc.typ == client.LeafTxn {
// Leaf txns return raw retriable errors (which get handled by the
// root) rather than TransactionRetryWithProtoRefreshError.
return abortedErr
}
// Root txns handle retriable errors.
newTxn := roachpb.PrepareTransactionForRetry(
ctx, abortedErr,
// priority is not used for aborted errors
roachpb.NormalUserPriority,
tc.clock)
ctx, abortedErr, roachpb.NormalUserPriority, tc.clock)
return roachpb.NewError(roachpb.NewTransactionRetryWithProtoRefreshError(
abortedErr.Message, tc.mu.txn.ID, newTxn))
}
if tc.mu.txn.Status != roachpb.PENDING {
return roachpb.NewErrorf("(see issue #37866) unexpected txn state: %s", tc.mu.txn)
case protoStatus != roachpb.PENDING || hbObservedStatus != roachpb.PENDING:
// The transaction proto is in an unexpected state.
return roachpb.NewErrorf(
"(see issue #37866) unexpected txn state: %s; heartbeat observed status: %s",
tc.mu.txn, hbObservedStatus)
default:
// All good.
}
return nil
}
Expand Down Expand Up @@ -1004,6 +1024,7 @@ func (tc *TxnCoordSender) handleRetryableErrLocked(
// Abort the old txn. The client is not supposed to use use this
// TxnCoordSender any more.
tc.interceptorAlloc.txnHeartbeater.abortTxnAsyncLocked(ctx)
tc.cleanupTxnLocked(ctx)
return retErr
}

Expand Down Expand Up @@ -1239,3 +1260,10 @@ func (tc *TxnCoordSender) SerializeTxn() *roachpb.Transaction {
defer tc.mu.Unlock()
return tc.mu.txn.Clone()
}

// IsTracking returns true if the heartbeat loop is running.
func (tc *TxnCoordSender) IsTracking() bool {
tc.mu.Lock()
defer tc.mu.Unlock()
return tc.interceptorAlloc.txnHeartbeater.heartbeatLoopRunningLocked()
}
4 changes: 2 additions & 2 deletions pkg/kv/txn_coord_sender_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ func TestHeartbeatFindsOutAboutAbortedTransaction(t *testing.T) {

// Now wait until the heartbeat loop notices that the transaction is aborted.
testutils.SucceedsSoon(t, func() error {
if txn.GetTxnCoordMeta(ctx).Txn.Status != roachpb.ABORTED {
return fmt.Errorf("txn not aborted yet")
if txn.Sender().(*kv.TxnCoordSender).IsTracking() {
return fmt.Errorf("txn heartbeat loop running")
}
return nil
})
Expand Down
35 changes: 14 additions & 21 deletions pkg/kv/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,6 @@ func makeTS(walltime int64, logical int32) hlc.Timestamp {
}
}

// isTracking returns true if the heartbeat loop is running.
func (tc *TxnCoordSender) isTracking() bool {
tc.mu.Lock()
defer tc.mu.Unlock()
return tc.interceptorAlloc.txnHeartbeater.mu.txnEnd != nil
}

// TestTxnCoordSenderBeginTransaction verifies that a command sent with a
// not-nil Txn with empty ID gets a new transaction initialized.
func TestTxnCoordSenderBeginTransaction(t *testing.T) {
Expand Down Expand Up @@ -379,7 +372,7 @@ func TestTxnCoordSenderHeartbeat(t *testing.T) {
// This relies on the heartbeat loop stopping once it figures out that the txn
// has been aborted.
testutils.SucceedsSoon(t, func() error {
if tc.isTracking() {
if tc.IsTracking() {
return fmt.Errorf("transaction is not aborted")
}
return nil
Expand Down Expand Up @@ -419,7 +412,7 @@ func getTxn(ctx context.Context, txn *client.Txn) (*roachpb.Transaction, *roachp
func verifyCleanup(key roachpb.Key, eng engine.Engine, t *testing.T, coords ...*TxnCoordSender) {
testutils.SucceedsSoon(t, func() error {
for _, coord := range coords {
if coord.isTracking() {
if coord.IsTracking() {
return fmt.Errorf("expected no heartbeat")
}
}
Expand Down Expand Up @@ -657,7 +650,7 @@ func TestTxnCoordSenderGCWithAmbiguousResultErr(t *testing.T) {

testutils.SucceedsSoon(t, func() error {
// Locking the TxnCoordSender to prevent a data race.
if tc.isTracking() {
if tc.IsTracking() {
return errors.Errorf("expected garbage collection")
}
return nil
Expand Down Expand Up @@ -1165,7 +1158,7 @@ func TestTxnRestartCount(t *testing.T) {
// Wait for heartbeat to start.
tc := txn.Sender().(*TxnCoordSender)
testutils.SucceedsSoon(t, func() error {
if !tc.isTracking() {
if !tc.IsTracking() {
return errors.New("expected heartbeat to start")
}
return nil
Expand Down Expand Up @@ -1394,7 +1387,7 @@ func TestRollbackErrorStopsHeartbeat(t *testing.T) {
); pErr != nil {
t.Fatal(pErr)
}
if !txn.Sender().(*TxnCoordSender).isTracking() {
if !txn.Sender().(*TxnCoordSender).IsTracking() {
t.Fatalf("expected TxnCoordSender to be tracking after the write")
}

Expand All @@ -1406,20 +1399,20 @@ func TestRollbackErrorStopsHeartbeat(t *testing.T) {
}

testutils.SucceedsSoon(t, func() error {
if txn.Sender().(*TxnCoordSender).isTracking() {
if txn.Sender().(*TxnCoordSender).IsTracking() {
return fmt.Errorf("still tracking")
}
return nil
})
}

// Test that intent tracking behaves correctly for
// transactions that attempt to run a batch containing both a BeginTransaction
// and an EndTransaction. Since in case of an error it's not easy to determine
// whether any intents have been laid down (i.e. in case the batch was split by
// the DistSender and then there was mixed success for the sub-batches, or in
// case a retriable error is returned), the test verifies that all possible
// intents are properly tracked and attached to a subsequent EndTransaction.
// Test that intent tracking behaves correctly for transactions that attempt to
// run a batch containing both a BeginTransaction and an EndTransaction. Since
// in case of an error it's not easy to determine whether any intents have been
// laid down (i.e. in case the batch was split by the DistSender and then there
// was mixed success for the sub-batches, or in case a retriable error is
// returned), the test verifies that all possible intents are properly tracked
// and attached to a subsequent EndTransaction.
func TestOnePCErrorTracking(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
Expand Down Expand Up @@ -1492,7 +1485,7 @@ func TestOnePCErrorTracking(t *testing.T) {

// As always, check that the rollback we just sent stops the heartbeat loop.
testutils.SucceedsSoon(t, func() error {
if txn.Sender().(*TxnCoordSender).isTracking() {
if txn.Sender().(*TxnCoordSender).IsTracking() {
return fmt.Errorf("still tracking")
}
return nil
Expand Down
Loading

0 comments on commit 4fc8816

Please sign in to comment.