Skip to content

Commit

Permalink
kvcoord: prevent concurrent EndTxn requests
Browse files Browse the repository at this point in the history
`TxnCoordSender` generally operates synchronously (i.e. the client waits
for the previous response before sending the next request). However, the
`txnHeartbeater` sends asynchronous `EndTxn(commit=false)` rollbacks
when it discovers an aborted transaction record. Unfortunately, some code
assumes synchrony, which caused race conditions with txn rollbacks.

In particular, the `txnPipeliner` attaches lock spans and in-flight
writes to the `EndTxn` request for e.g. intent cleanup, but it only
records this information when it receives responses. Thus, if an
`EndTxn(commit=false)` is sent concurrently with a write request, the
lock spans and in-flight writes of that write request will not get
attached to the `EndTxn` request and the intents will not get cleaned
up.

This patch makes the `txnHeartbeater` wait for any in-flight requests to
complete before sending asynchronous rollbacks, and collapses incoming
client rollbacks with in-flight async rollbacks.

Release note (bug fix): Fixed a race condition where transaction cleanup
would fail to take into account ongoing writes and clean up their
intents.
  • Loading branch information
erikgrinaker committed May 31, 2021
1 parent 587aaed commit 834f0aa
Show file tree
Hide file tree
Showing 5 changed files with 651 additions and 108 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvclient/kvcoord/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ go_library(
"//pkg/settings/cluster",
"//pkg/storage/enginepb",
"//pkg/util",
"//pkg/util/contextutil",
"//pkg/util/ctxgroup",
"//pkg/util/errorutil/unimplemented",
"//pkg/util/grpcutil",
Expand Down
160 changes: 142 additions & 18 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_heartbeater.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,21 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
)

// abortTxnAsyncTimeout is the context timeout for abortTxnAsyncLocked()
// rollbacks. If the intent resolver has spare async task capacity, this timeout
// only needs to be long enough for the EndTxn request to make it through Raft,
// but if the cleanup task is synchronous (to backpressure clients) then cleanup
// will be abandoned when the timeout expires. We generally want to clean up if
// possible, but not at any cost, so we set it high at 1 minute.
const abortTxnAsyncTimeout = time.Minute

// txnHeartbeater is a txnInterceptor in charge of a transaction's heartbeat
// loop. Transaction coordinators heartbeat their transaction record
// periodically to indicate the liveness of their transaction. Other actors like
Expand Down Expand Up @@ -104,9 +113,37 @@ type txnHeartbeater struct {
// future requests sent though it (which indicates that the heartbeat
// loop did not race with an EndTxn request).
finalObservedStatus roachpb.TransactionStatus

// ifReqs tracks the number of in-flight requests. This is expected to
// be either 0 or 1, but we let the txnLockGatekeeper enforce that.
//
// This is used to make sure we don't send EndTxn(commit=false) from
// abortTxnAsyncLocked() concurrently with another in-flight request.
// The TxnCoordSender assumes synchronous operation; in particular,
// the txnPipeliner must update its lock spans with pending responses
// before attaching the final lock spans to the EndTxn request.
ifReqs uint8

// abortTxnAsyncPending, if true, signals that an abortTxnAsyncLocked()
// call is waiting for in-flight requests to complete. Once the last
// request returns (setting ifReqs=0), it calls abortTxnAsyncLocked().
abortTxnAsyncPending bool

// abortTxnAsyncResultC is non-nil when an abortTxnAsyncLocked()
// rollback is in-flight. If a client rollback arrives concurrently, it
// will wait for the result on this channel, collapsing the requests to
// prevent concurrent rollbacks. Only EndTxn(commit=false) requests can
// arrive during rollback, the TxnCoordSender blocks any others due to
// finalObservedStatus.
abortTxnAsyncResultC chan abortTxnAsyncResult
}
}

type abortTxnAsyncResult struct {
br *roachpb.BatchResponse
pErr *roachpb.Error
}

// init initializes the txnHeartbeater. This method exists instead of a
// constructor because txnHeartbeaters live in a pool in the TxnCoordSender.
func (h *txnHeartbeater) init(
Expand Down Expand Up @@ -165,10 +202,46 @@ func (h *txnHeartbeater) SendLocked(
if hasET {
et := etArg.(*roachpb.EndTxnRequest)
et.TxnHeartbeating = h.mu.loopStarted

if !et.Commit {
// If an abortTxnAsyncLocked() rollback is in flight, we'll wait for
// its result here to avoid sending a concurrent rollback.
// Otherwise, txnLockGatekeeper would error since it does not allow
// concurrent requests (to enforce a synchronous client protocol).
if resultC := h.mu.abortTxnAsyncResultC; resultC != nil {
// We have to unlock the mutex while waiting, to allow the
// txnLockGatekeeper to acquire the mutex when receiving the
// async abort response. Once we receive our copy of the
// response, we re-acquire the lock to return it to the client.
h.mu.Unlock()
defer h.mu.Lock()
select {
case res := <-resultC:
return res.br, res.pErr
case <-ctx.Done():
return nil, roachpb.NewError(ctx.Err())
}
}
}
}

// Forward the batch through the wrapped lockedSender.
return h.wrapped.SendLocked(ctx, ba)
// Forward the batch through the wrapped lockedSender, recording the
// in-flight request to coordinate with abortTxnAsyncLocked(). Recall that
// the mutex is unlocked for the duration of the SendLocked() call.
h.mu.ifReqs++
br, pErr := h.wrapped.SendLocked(ctx, ba)
h.mu.ifReqs--

// If an abortTxnAsyncLocked() call is waiting for this in-flight
// request to complete, call it. At this point, finalObservedStatus has
// already been set, so we don't have to worry about additional incoming
// requests (except rollbacks) -- the TxnCoordSender will block them.
if h.mu.abortTxnAsyncPending && h.mu.ifReqs == 0 {
h.abortTxnAsyncLocked(ctx)
h.mu.abortTxnAsyncPending = false
}

return br, pErr
}

// setWrapped is part of the txnInterceptor interface.
Expand Down Expand Up @@ -321,7 +394,7 @@ func (h *txnHeartbeater) heartbeat(ctx context.Context) bool {

var respTxn *roachpb.Transaction
if pErr != nil {
log.VEventf(ctx, 2, "heartbeat failed: %s", pErr)
log.VEventf(ctx, 2, "heartbeat failed for %s: %s", h.mu.txn, pErr)

// We need to be prepared here to handle the case of a
// TransactionAbortedError with no transaction proto in it.
Expand All @@ -337,6 +410,7 @@ func (h *txnHeartbeater) heartbeat(ctx context.Context) bool {
// its commit or an ambiguous one and we have nothing to offer that
// provides more clarity. We do however prevent it from running more
// requests in case it isn't aware that the transaction is over.
log.VEventf(ctx, 1, "Heartbeat detected aborted txn, cleaning up for %s", h.mu.txn)
h.abortTxnAsyncLocked(ctx)
h.mu.finalObservedStatus = roachpb.ABORTED
return false
Expand All @@ -356,6 +430,7 @@ func (h *txnHeartbeater) heartbeat(ctx context.Context) bool {
case roachpb.ABORTED:
// Roll back the transaction record to clean up intents and
// then shut down the heartbeat loop.
log.VEventf(ctx, 1, "Heartbeat detected aborted txn, cleaning up for %s", h.mu.txn)
h.abortTxnAsyncLocked(ctx)
}
h.mu.finalObservedStatus = respTxn.Status
Expand All @@ -364,15 +439,19 @@ func (h *txnHeartbeater) heartbeat(ctx context.Context) bool {
return true
}

// abortTxnAsyncLocked send an EndTxn(commmit=false) asynchronously.
// abortTxnAsyncLocked sends an EndTxn(commmit=false) asynchronously.
// The purpose of the async cleanup is to resolve transaction intents as soon
// as possible when a transaction coordinator observes an ABORTED transaction.
func (h *txnHeartbeater) abortTxnAsyncLocked(ctx context.Context) {
log.VEventf(ctx, 1, "Heartbeat detected aborted txn. Cleaning up.")

// NB: We use context.Background() here because we don't want a canceled
// context to interrupt the aborting.
ctx = h.AnnotateCtx(context.Background())
// If a request is in flight, we must wait for it to complete first such
// that txnPipeliner can record its lock spans and attach them to the EndTxn
// request we'll send.
if h.mu.ifReqs > 0 {
h.mu.abortTxnAsyncPending = true
log.VEventf(ctx, 2, "async abort waiting for in-flight request for txn %s", h.mu.txn)
return
}

// Construct a batch with an EndTxn request.
txn := h.mu.txn.Clone()
Expand All @@ -386,17 +465,62 @@ func (h *txnHeartbeater) abortTxnAsyncLocked(ctx context.Context) {
TxnHeartbeating: true,
})

const taskName = "txnHeartbeater: aborting txn"
log.VEventf(ctx, 2, "async abort for txn: %s", txn)
if err := h.stopper.RunAsyncTask(
ctx, "txnHeartbeater: aborting txn", func(ctx context.Context) {
// Send the abort request through the interceptor stack. This is
// important because we need the txnPipeliner to append lock spans
// to the EndTxn request.
h.mu.Lock()
defer h.mu.Unlock()
_, pErr := h.wrapped.SendLocked(ctx, ba)
if pErr != nil {
log.VErrEventf(ctx, 1, "async abort failed for %s: %s ", txn, pErr)
if err := h.stopper.RunAsyncTask(h.AnnotateCtx(context.Background()), taskName,
func(ctx context.Context) {
if err := contextutil.RunWithTimeout(ctx, taskName, abortTxnAsyncTimeout,
func(ctx context.Context) error {
h.mu.Lock()
defer h.mu.Unlock()

// If we find an abortTxnAsyncResultC, that means an async
// rollback request is already in flight, so there's no
// point in us running another. This can happen because the
// TxnCoordSender also calls abortTxnAsyncLocked()
// independently of the heartbeat loop.
if h.mu.abortTxnAsyncResultC != nil {
log.VEventf(ctx, 2,
"skipping async abort due to concurrent async abort for %s", txn)
return nil
}

// TxnCoordSender allows EndTxn(commit=false) through even
// after we set finalObservedStatus, and that request can
// race with us for the mutex. Thus, if we find an in-flight
// request here, after checking ifReqs=0 before being spawned,
// we deduce that it must have been a rollback and there's no
// point in sending another rollback.
if h.mu.ifReqs > 0 {
log.VEventf(ctx, 2,
"skipping async abort due to client rollback for %s", txn)
return nil
}

// Set up a result channel to signal to an incoming client
// rollback that an async rollback is already in progress,
// and pass it the result. The buffer allows storing the
// result even when no client rollback arrives. Recall that
// the SendLocked() call below releases the mutex while
// running, allowing concurrent incoming requests.
h.mu.abortTxnAsyncResultC = make(chan abortTxnAsyncResult, 1)

// Send the abort request through the interceptor stack. This is
// important because we need the txnPipeliner to append lock spans
// to the EndTxn request.
br, pErr := h.wrapped.SendLocked(ctx, ba)
if pErr != nil {
log.VErrEventf(ctx, 1, "async abort failed for %s: %s ", txn, pErr)
}

// Pass the result to a waiting client rollback, if any, and
// remove the channel since we're no longer in flight.
h.mu.abortTxnAsyncResultC <- abortTxnAsyncResult{br: br, pErr: pErr}
h.mu.abortTxnAsyncResultC = nil
return nil
},
); err != nil {
log.VEventf(ctx, 1, "async abort failed for %s: %s", txn, err)
}
},
); err != nil {
Expand Down
Loading

0 comments on commit 834f0aa

Please sign in to comment.