Skip to content

Commit

Permalink
kv: disallow 1PC evaluation when heartbeating and txn record present
Browse files Browse the repository at this point in the history
Fixes cockroachdb#53403.
Fixes cockroachdb#53518.
Fixes cockroachdb#53772
Fixes cockroachdb#54094.

This commit disables one-phase commit evaluation for transactions with
existing PENDING transaction records. As we saw in cockroachdb#53518 (comment),
failing to do so can lead to a transaction that commits on the one-phase
commit fast-path but still has a PENDING transaction record. It doesn't
seem like this can actually cause serious correctness issues today other
than in the presence of replays because a 1PC transaction does not have
remote intents (by definition). However, it was creating the appearance
of abandoned transaction records in `kv/contention/nodes=4` and causing
that test to fail.

This commit needs a backport to release-20.2 and release-20.1. This was
not an issue before release-20.1 because before then, we would never
begin a transaction's heartbeat loop for 1PC transactions. This changed
in v20.1 because of unreplicated locks. We allow transactions that
acquire unreplicated locks to still hit the one-phase commit fast-path,
but we also need to start heartbeating once a transaction has acquired
any locks so that it doesn't get aborted by conflicting transactions.

In the vast majority of these cases, the heartbeat loop will never
actually fire (for any txn that takes less than 1s), so with this
change, we'll still be able to perform a 1PC evaluation. However, this
is adding in a disk read for those cases, which is a little
disappointing but doesn't seem easy to avoid without disabling the
heartbeat loop before issuing the 1PC batch (another alternative, happy
to discuss). The upside of this is that we now have enough information
on the server to avoid a bit of work for 1PC txns that have not
previously acquired locks (see TODO in evaluate1PC).
  • Loading branch information
nvanbenschoten committed Sep 18, 2020
1 parent 7509e36 commit 4b18d34
Show file tree
Hide file tree
Showing 9 changed files with 896 additions and 666 deletions.
41 changes: 35 additions & 6 deletions c-deps/libroach/protos/roachpb/api.pb.cc

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions c-deps/libroach/protos/roachpb/api.pb.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 12 additions & 4 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_heartbeater.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ func (h *txnHeartbeater) init(
func (h *txnHeartbeater) SendLocked(
ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
etArg, hasET := ba.GetArg(roachpb.EndTxn)
firstLockingIndex, pErr := firstLockingIndex(&ba)
if pErr != nil {
return nil, pErr
Expand All @@ -149,17 +150,23 @@ func (h *txnHeartbeater) SendLocked(
ba.Txn.Key = anchor
}

// Start the heartbeat loop if it has not already started.
// Start the heartbeat loop if it has not already started and this batch
// is not intending to commit/abort the transaction.
if !h.mu.loopStarted {
_, haveEndTxn := ba.GetArg(roachpb.EndTxn)
if !haveEndTxn {
if !hasET {
if err := h.startHeartbeatLoopLocked(ctx); err != nil {
return nil, roachpb.NewError(err)
}
}
}
}

// Set the EndTxn request's TxnHeartbeating flag, if necessary.
if hasET {
et := etArg.(*roachpb.EndTxnRequest)
et.TxnHeartbeating = h.mu.loopStarted
}

// Forward the batch through the wrapped lockedSender.
return h.wrapped.SendLocked(ctx, ba)
}
Expand Down Expand Up @@ -375,7 +382,8 @@ func (h *txnHeartbeater) abortTxnAsyncLocked(ctx context.Context) {
Commit: false,
// Resolved intents should maintain an abort span entry to prevent
// concurrent requests from failing to notice the transaction was aborted.
Poison: true,
Poison: true,
TxnHeartbeating: true,
})

log.VEventf(ctx, 2, "async abort for txn: %s", txn)
Expand Down
39 changes: 37 additions & 2 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_heartbeater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func TestTxnHeartbeaterLoopStartedOnFirstLock(t *testing.T) {
testutils.RunTrueAndFalse(t, "write", func(t *testing.T, write bool) {
ctx := context.Background()
txn := makeTxnProto()
th, _, _ := makeMockTxnHeartbeater(&txn)
th, mockSender, _ := makeMockTxnHeartbeater(&txn)
defer th.stopper.Stop(ctx)

// Read-only requests don't start the heartbeat loop.
Expand Down Expand Up @@ -185,6 +185,27 @@ func TestTxnHeartbeaterLoopStartedOnFirstLock(t *testing.T) {
require.True(t, th.heartbeatLoopRunningLocked())
th.mu.Unlock()

// The interceptor indicates whether the heartbeat loop is
// running on EndTxn requests.
ba.Requests = nil
ba.Add(&roachpb.EndTxnRequest{Commit: true})

mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
require.Len(t, ba.Requests, 1)
require.IsType(t, &roachpb.EndTxnRequest{}, ba.Requests[0].GetInner())

etReq := ba.Requests[0].GetInner().(*roachpb.EndTxnRequest)
require.True(t, etReq.TxnHeartbeating)

br = ba.CreateReply()
br.Txn = ba.Txn
br.Txn.Status = roachpb.COMMITTED
return br, nil
})
br, pErr = th.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)

// Closing the interceptor stops the heartbeat loop.
th.mu.Lock()
th.closeLocked()
Expand All @@ -201,7 +222,7 @@ func TestTxnHeartbeaterLoopNotStartedFor1PC(t *testing.T) {
defer log.Scope(t).Close(t)
ctx := context.Background()
txn := makeTxnProto()
th, _, _ := makeMockTxnHeartbeater(&txn)
th, mockSender, _ := makeMockTxnHeartbeater(&txn)
defer th.stopper.Stop(ctx)

keyA := roachpb.Key("a")
Expand All @@ -210,6 +231,19 @@ func TestTxnHeartbeaterLoopNotStartedFor1PC(t *testing.T) {
ba.Add(&roachpb.PutRequest{RequestHeader: roachpb.RequestHeader{Key: keyA}})
ba.Add(&roachpb.EndTxnRequest{Commit: true})

mockSender.MockSend(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
require.Len(t, ba.Requests, 2)
require.IsType(t, &roachpb.PutRequest{}, ba.Requests[0].GetInner())
require.IsType(t, &roachpb.EndTxnRequest{}, ba.Requests[1].GetInner())

etReq := ba.Requests[1].GetInner().(*roachpb.EndTxnRequest)
require.False(t, etReq.TxnHeartbeating)

br := ba.CreateReply()
br.Txn = ba.Txn
br.Txn.Status = roachpb.COMMITTED
return br, nil
})
br, pErr := th.SendLocked(ctx, ba)
require.Nil(t, pErr)
require.NotNil(t, br)
Expand Down Expand Up @@ -360,6 +394,7 @@ func TestTxnHeartbeaterAsyncAbort(t *testing.T) {
require.Nil(t, etReq.Key) // set in txnCommitter
require.False(t, etReq.Commit)
require.True(t, etReq.Poison)
require.True(t, etReq.TxnHeartbeating)

br = ba.CreateReply()
br.Txn = ba.Txn
Expand Down
16 changes: 16 additions & 0 deletions pkg/kv/kvserver/batcheval/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ import (
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -191,3 +193,17 @@ func SynthesizeTxnFromMeta(rec EvalContext, txn enginepb.TxnMeta) roachpb.Transa
}
return synth.AsTransaction()
}

// HasTxnRecord returns whether the provided transaction has a transaction
// record. The provided reader must come from the leaseholder of the transaction
// record's Range.
func HasTxnRecord(
ctx context.Context, reader storage.Reader, txn *roachpb.Transaction,
) (bool, error) {
key := keys.TransactionKey(txn.Key, txn.ID)
val, _, err := storage.MVCCGet(ctx, reader, key, hlc.Timestamp{}, storage.MVCCGetOptions{})
if err != nil {
return false, err
}
return val != nil, nil
}
Loading

0 comments on commit 4b18d34

Please sign in to comment.