release-21.1: kvcoord: prevent concurrent EndTxn requests #65863

Closed

1 change: 1 addition & 0 deletions pkg/kv/kvclient/kvcoord/BUILD.bazel
@@ -47,6 +47,7 @@ go_library(
"//pkg/settings/cluster",
"//pkg/storage/enginepb",
"//pkg/util",
"//pkg/util/contextutil",
"//pkg/util/ctxgroup",
"//pkg/util/errorutil/unimplemented",
"//pkg/util/grpcutil",
160 changes: 142 additions & 18 deletions pkg/kv/kvclient/kvcoord/txn_interceptor_heartbeater.go
@@ -16,12 +16,21 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
)

// abortTxnAsyncTimeout is the context timeout for abortTxnAsyncLocked()
// rollbacks. If the intent resolver has spare async task capacity, this timeout
// only needs to be long enough for the EndTxn request to make it through Raft,
// but if the cleanup task is synchronous (to backpressure clients) then cleanup
// will be abandoned when the timeout expires. We generally want to clean up if
// possible, but not at any cost, so we set it high at 1 minute.
const abortTxnAsyncTimeout = time.Minute
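
To make the timeout semantics above concrete, here is a small stdlib-only sketch of the same idea: cleanup work gets at most abortTxnAsyncTimeout to finish and is abandoned afterwards. `cleanupIntents` is a hypothetical stand-in for pushing the EndTxn through Raft; the real code further down uses contextutil.RunWithTimeout rather than bare context.WithTimeout.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

const abortTxnAsyncTimeout = time.Minute

// cleanupIntents stands in for sending the EndTxn(commit=false) request; it
// either finishes or gives up when the bounding context expires.
func cleanupIntents(ctx context.Context) error {
	select {
	case <-time.After(10 * time.Millisecond): // pretend the EndTxn got through Raft
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), abortTxnAsyncTimeout)
	defer cancel()
	if err := cleanupIntents(ctx); err != nil {
		fmt.Println("abandoning cleanup:", err)
		return
	}
	fmt.Println("cleanup done")
}
```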

// txnHeartbeater is a txnInterceptor in charge of a transaction's heartbeat
// loop. Transaction coordinators heartbeat their transaction record
// periodically to indicate the liveness of their transaction. Other actors like
@@ -104,9 +113,37 @@ type txnHeartbeater struct {
// future requests sent though it (which indicates that the heartbeat
// loop did not race with an EndTxn request).
finalObservedStatus roachpb.TransactionStatus

// ifReqs tracks the number of in-flight requests. This is expected to
// be either 0 or 1, but we let the txnLockGatekeeper enforce that.
//
// This is used to make sure we don't send EndTxn(commit=false) from
// abortTxnAsyncLocked() concurrently with another in-flight request.
// The TxnCoordSender assumes synchronous operation; in particular,
// the txnPipeliner must update its lock spans with pending responses
// before attaching the final lock spans to the EndTxn request.
ifReqs uint8

// abortTxnAsyncPending, if true, signals that an abortTxnAsyncLocked()
// call is waiting for in-flight requests to complete. Once the last
// request returns (setting ifReqs=0), it calls abortTxnAsyncLocked().
abortTxnAsyncPending bool

// abortTxnAsyncResultC is non-nil when an abortTxnAsyncLocked()
// rollback is in-flight. If a client rollback arrives concurrently, it
// will wait for the result on this channel, collapsing the requests to
// prevent concurrent rollbacks. Only EndTxn(commit=false) requests can
// arrive during rollback; the TxnCoordSender blocks any others due to
// finalObservedStatus.
abortTxnAsyncResultC chan abortTxnAsyncResult
}
}

type abortTxnAsyncResult struct {
br *roachpb.BatchResponse
pErr *roachpb.Error
}
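
To illustrate how these fields fit together, here is a minimal, self-contained sketch of the ifReqs/abortTxnAsyncPending handshake described above. `sender`, `send`, and `abortLocked` are illustrative names, not the kvcoord types, and the result-channel half of the mechanism is shown separately further down.

```go
package main

import (
	"fmt"
	"sync"
)

type result struct {
	br  string // stands in for *roachpb.BatchResponse
	err error
}

type sender struct {
	mu struct {
		sync.Mutex
		ifReqs       uint8 // in-flight requests; expected to be 0 or 1
		abortPending bool  // an abort is queued behind the in-flight request
	}
}

// send mirrors SendLocked: it records the in-flight request and, once the
// request returns, runs a deferred abort if one queued up in the meantime.
func (s *sender) send(req string) result {
	s.mu.Lock()
	defer s.mu.Unlock()

	// In the real interceptor the mutex is released for the duration of the
	// wrapped SendLocked call; that detail is elided here.
	s.mu.ifReqs++
	res := result{br: "response to " + req}
	s.mu.ifReqs--

	// If an abort queued up while this request was in flight, run it now.
	if s.mu.abortPending && s.mu.ifReqs == 0 {
		s.abortLocked()
		s.mu.abortPending = false
	}
	return res
}

// abortLocked mirrors abortTxnAsyncLocked: defer the abort while a request is
// in flight, otherwise issue the rollback immediately.
func (s *sender) abortLocked() {
	if s.mu.ifReqs > 0 {
		s.mu.abortPending = true
		return
	}
	fmt.Println("issuing EndTxn(commit=false)")
}

func main() {
	var s sender

	// No request in flight: the abort is issued immediately.
	s.mu.Lock()
	s.abortLocked()
	s.mu.Unlock()

	fmt.Println(s.send("Put").br)
}
```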

// init initializes the txnHeartbeater. This method exists instead of a
// constructor because txnHeartbeaters live in a pool in the TxnCoordSender.
func (h *txnHeartbeater) init(
@@ -165,10 +202,46 @@ func (h *txnHeartbeater) SendLocked(
if hasET {
et := etArg.(*roachpb.EndTxnRequest)
et.TxnHeartbeating = h.mu.loopStarted

if !et.Commit {
// If an abortTxnAsyncLocked() rollback is in flight, we'll wait for
// its result here to avoid sending a concurrent rollback.
// Otherwise, txnLockGatekeeper would error since it does not allow
// concurrent requests (to enforce a synchronous client protocol).
if resultC := h.mu.abortTxnAsyncResultC; resultC != nil {
// We have to unlock the mutex while waiting, to allow the
// txnLockGatekeeper to acquire the mutex when receiving the
// async abort response. Once we receive our copy of the
// response, we re-acquire the lock to return it to the client.
h.mu.Unlock()
defer h.mu.Lock()
select {
case res := <-resultC:
return res.br, res.pErr
case <-ctx.Done():
return nil, roachpb.NewError(ctx.Err())
}
}
}
}

// Forward the batch through the wrapped lockedSender.
return h.wrapped.SendLocked(ctx, ba)
// Forward the batch through the wrapped lockedSender, recording the
// in-flight request to coordinate with abortTxnAsyncLocked(). Recall that
// the mutex is unlocked for the duration of the SendLocked() call.
h.mu.ifReqs++
br, pErr := h.wrapped.SendLocked(ctx, ba)
h.mu.ifReqs--

// If an abortTxnAsyncLocked() call is waiting for this in-flight
// request to complete, call it. At this point, finalObservedStatus has
// already been set, so we don't have to worry about additional incoming
// requests (except rollbacks) -- the TxnCoordSender will block them.
if h.mu.abortTxnAsyncPending && h.mu.ifReqs == 0 {
h.abortTxnAsyncLocked(ctx)
h.mu.abortTxnAsyncPending = false
}

return br, pErr
}
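
The unlock-wait-relock dance in the rollback branch above can be shown in isolation. This is a simplified sketch with hypothetical names (`coordinator`, `rollbackLocked`), not the actual interceptor types: if an async rollback already owns the result channel, a client rollback waits for that result (or context cancellation) instead of issuing a second EndTxn.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

type rollbackResult struct {
	resp string
	err  error
}

type coordinator struct {
	mu      sync.Mutex
	resultC chan rollbackResult // non-nil while an async rollback is in flight
}

// rollbackLocked is called with mu held, mirroring SendLocked above.
func (c *coordinator) rollbackLocked(ctx context.Context) (string, error) {
	if resultC := c.resultC; resultC != nil {
		// Unlock while waiting so the async rollback can re-acquire the mutex
		// and publish its result; re-lock before returning to the caller.
		c.mu.Unlock()
		defer c.mu.Lock()
		select {
		case res := <-resultC:
			return res.resp, res.err
		case <-ctx.Done():
			return "", ctx.Err()
		}
	}
	// Otherwise this rollback proceeds on its own (elided).
	return "sent own rollback", nil
}

func main() {
	c := &coordinator{resultC: make(chan rollbackResult, 1)}
	c.resultC <- rollbackResult{resp: "async rollback response"}

	c.mu.Lock()
	resp, err := c.rollbackLocked(context.Background())
	c.mu.Unlock()
	fmt.Println(resp, err)
}
```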

// setWrapped is part of the txnInterceptor interface.
@@ -321,7 +394,7 @@ func (h *txnHeartbeater) heartbeat(ctx context.Context) bool {

var respTxn *roachpb.Transaction
if pErr != nil {
log.VEventf(ctx, 2, "heartbeat failed: %s", pErr)
log.VEventf(ctx, 2, "heartbeat failed for %s: %s", h.mu.txn, pErr)

// We need to be prepared here to handle the case of a
// TransactionAbortedError with no transaction proto in it.
@@ -337,6 +410,7 @@ func (h *txnHeartbeater) heartbeat(ctx context.Context) bool {
// its commit or an ambiguous one and we have nothing to offer that
// provides more clarity. We do however prevent it from running more
// requests in case it isn't aware that the transaction is over.
log.VEventf(ctx, 1, "Heartbeat detected aborted txn, cleaning up for %s", h.mu.txn)
h.abortTxnAsyncLocked(ctx)
h.mu.finalObservedStatus = roachpb.ABORTED
return false
@@ -356,6 +430,7 @@ func (h *txnHeartbeater) heartbeat(ctx context.Context) bool {
case roachpb.ABORTED:
// Roll back the transaction record to clean up intents and
// then shut down the heartbeat loop.
log.VEventf(ctx, 1, "Heartbeat detected aborted txn, cleaning up for %s", h.mu.txn)
h.abortTxnAsyncLocked(ctx)
}
h.mu.finalObservedStatus = respTxn.Status
@@ -364,15 +439,19 @@ func (h *txnHeartbeater) heartbeat(ctx context.Context) bool {
return true
}

// abortTxnAsyncLocked send an EndTxn(commit=false) asynchronously.
// abortTxnAsyncLocked sends an EndTxn(commit=false) asynchronously.
// The purpose of the async cleanup is to resolve transaction intents as soon
// as possible when a transaction coordinator observes an ABORTED transaction.
func (h *txnHeartbeater) abortTxnAsyncLocked(ctx context.Context) {
log.VEventf(ctx, 1, "Heartbeat detected aborted txn. Cleaning up.")

// NB: We use context.Background() here because we don't want a canceled
// context to interrupt the aborting.
ctx = h.AnnotateCtx(context.Background())
// If a request is in flight, we must wait for it to complete first such
// that txnPipeliner can record its lock spans and attach them to the EndTxn
// request we'll send.
if h.mu.ifReqs > 0 {
h.mu.abortTxnAsyncPending = true
log.VEventf(ctx, 2, "async abort waiting for in-flight request for txn %s", h.mu.txn)
return
}

// Construct a batch with an EndTxn request.
txn := h.mu.txn.Clone()
@@ -386,17 +465,62 @@ func (h *txnHeartbeater) abortTxnAsyncLocked(ctx context.Context) {
TxnHeartbeating: true,
})

const taskName = "txnHeartbeater: aborting txn"
log.VEventf(ctx, 2, "async abort for txn: %s", txn)
if err := h.stopper.RunAsyncTask(
ctx, "txnHeartbeater: aborting txn", func(ctx context.Context) {
// Send the abort request through the interceptor stack. This is
// important because we need the txnPipeliner to append lock spans
// to the EndTxn request.
h.mu.Lock()
defer h.mu.Unlock()
_, pErr := h.wrapped.SendLocked(ctx, ba)
if pErr != nil {
log.VErrEventf(ctx, 1, "async abort failed for %s: %s ", txn, pErr)
if err := h.stopper.RunAsyncTask(h.AnnotateCtx(context.Background()), taskName,
func(ctx context.Context) {
if err := contextutil.RunWithTimeout(ctx, taskName, abortTxnAsyncTimeout,
func(ctx context.Context) error {
h.mu.Lock()
defer h.mu.Unlock()

// If we find an abortTxnAsyncResultC, that means an async
// rollback request is already in flight, so there's no
// point in us running another. This can happen because the
// TxnCoordSender also calls abortTxnAsyncLocked()
// independently of the heartbeat loop.
if h.mu.abortTxnAsyncResultC != nil {
log.VEventf(ctx, 2,
"skipping async abort due to concurrent async abort for %s", txn)
return nil
}

// TxnCoordSender allows EndTxn(commit=false) through even
// after we set finalObservedStatus, and that request can
// race with us for the mutex. Thus, if we find an in-flight
// request here, after checking ifReqs=0 before being spawned,
// we deduce that it must have been a rollback and there's no
// point in sending another rollback.
if h.mu.ifReqs > 0 {
log.VEventf(ctx, 2,
"skipping async abort due to client rollback for %s", txn)
return nil
}

// Set up a result channel to signal to an incoming client
// rollback that an async rollback is already in progress,
// and pass it the result. The buffer allows storing the
// result even when no client rollback arrives. Recall that
// the SendLocked() call below releases the mutex while
// running, allowing concurrent incoming requests.
h.mu.abortTxnAsyncResultC = make(chan abortTxnAsyncResult, 1)

// Send the abort request through the interceptor stack. This is
// important because we need the txnPipeliner to append lock spans
// to the EndTxn request.
br, pErr := h.wrapped.SendLocked(ctx, ba)
if pErr != nil {
log.VErrEventf(ctx, 1, "async abort failed for %s: %s ", txn, pErr)
}

// Pass the result to a waiting client rollback, if any, and
// remove the channel since we're no longer in flight.
h.mu.abortTxnAsyncResultC <- abortTxnAsyncResult{br: br, pErr: pErr}
h.mu.abortTxnAsyncResultC = nil
return nil
},
); err != nil {
log.VEventf(ctx, 1, "async abort failed for %s: %s", txn, err)
}
},
); err != nil {
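
Finally, the buffered abortTxnAsyncResultC channel used by the async abort above boils down to the following pattern. The sketch uses stand-in names and a plain goroutine instead of stopper.RunAsyncTask: the buffer of one lets the async task deposit its result whether or not a client rollback is waiting, and a concurrent rollback simply receives that result instead of issuing its own.

```go
package main

import (
	"fmt"
	"sync"
)

type abortResult struct {
	resp string
	err  error
}

func main() {
	// Buffer of 1: the async abort can always deposit its result, even when no
	// client rollback ever shows up to read it.
	resultC := make(chan abortResult, 1)

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		// The async abort finishes (here instantly) and publishes its outcome.
		resultC <- abortResult{resp: "EndTxn(commit=false) response"}
	}()

	// A concurrent client rollback collapses onto the same outcome instead of
	// sending a second rollback of its own.
	res := <-resultC
	fmt.Println(res.resp, res.err)
	wg.Wait()
}
```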