Skip to content

Commit

Permalink
[DNM] kv: don't let pusher win when pushing STAGING txn with equal pr…
Browse files Browse the repository at this point in the history
…iority

DNM: needs more testing.

Fixes cockroachdb#105330.
Fixes cockroachdb#101721.

This commit updates the transaction push logic to stop pushes from completing
successfully when the pushee is STAGING and the pusher has equal priority. Prior
to this change, the pusher would win in these cases when using a PUSH_TIMESTAMP
if at least one of the two transactions involved used a weak isolation level.

This had two undesirable effects:
- if the pushee was still possibly committable and requiring recovery
  (`!(knownHigherTimestamp || knownHigherEpoch)` in the code) then the pusher
  would immediately begin parallel commit recovery, attempting to disrupt the
  commit and abort the pushee. This is undesirable because the pushee may still
  be in progress and in cases of equal priority, we'd like to wait for the
  parallel commit to complete before kicking off recovery, deferring recovery to
  only the cases where the pushee/committers's heartbeat has expired.
- if the pushee was known too be uncommittable (`knownHigherTimestamp || knownHigherEpoch`
  in the code), then txn recovery was not kicked off but the pusher still could
  not perform the PUSH_TIMESTAMP (see e40c1b4), so it would return a
  `TransactionPushError`. This confused logic in `handleTransactionPushError`,
  allowing the error to escape to the client.

This commit resolves both issues by considering the pushee's transaction status
in `txnwait.ShouldPushImmediately`.

Release note: None
  • Loading branch information
nvanbenschoten committed Jul 31, 2023
1 parent 409fdeb commit 3d49079
Show file tree
Hide file tree
Showing 7 changed files with 144 additions and 76 deletions.
10 changes: 0 additions & 10 deletions pkg/kv/kvnemesis/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1232,16 +1232,6 @@ func (v *validator) failIfError(
) (ambiguous, hasError bool) {
exceptions = append(exceptions[:len(exceptions):len(exceptions)], func(err error) bool {
return errors.Is(err, errInjected)
}, func(err error) bool {
// Work-around for [1].
//
// TODO(arul): find out why we (as of [2]) sometimes leaking
// *TransactionPushError (wrapped in `UnhandledRetryableError`) from
// `db.Get`, `db.Scan`, etc.
//
// [1]: https://github.com/cockroachdb/cockroach/issues/105330
// [2]: https://github.com/cockroachdb/cockroach/pull/97779
return errors.HasType(err, (*kvpb.UnhandledRetryableError)(nil))
})
switch r.Type {
case ResultType_Unknown:
Expand Down
65 changes: 33 additions & 32 deletions pkg/kv/kvserver/batcheval/cmd_push_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,38 +213,9 @@ func PushTxn(
}
reply.PusheeTxn.UpgradePriority(args.PusheeTxn.Priority)

// If the pusher is aware that the pushee's currently recorded attempt at a
// parallel commit failed, either because it found intents at a higher
// timestamp than the parallel commit attempt or because it found intents at
// a higher epoch than the parallel commit attempt, it should not consider
// the pushee to be performing a parallel commit. Its commit status is not
// indeterminate.
if (knownHigherTimestamp || knownHigherEpoch) && reply.PusheeTxn.Status == roachpb.STAGING {
reply.PusheeTxn.Status = roachpb.PENDING
reply.PusheeTxn.InFlightWrites = nil
// If the pusher is aware that the pushee's currently recorded attempt
// at a parallel commit failed, upgrade PUSH_TIMESTAMPs to PUSH_ABORTs.
// We don't want to move the transaction back to PENDING, as this is not
// (currently) allowed by the recovery protocol. We also don't want to
// move the transaction to a new timestamp while retaining the STAGING
// status, as this could allow the transaction to enter an implicit
// commit state without its knowledge, leading to atomicity violations.
//
// This has no effect on pushes that fail with a TransactionPushError.
// Such pushes will still wait on the pushee to retry its commit and
// eventually commit or abort. It also has no effect on expired pushees,
// as they would have been aborted anyway. This only impacts pushes
// which would have succeeded due to priority mismatches. In these
// cases, the push acts the same as a short-circuited transaction
// recovery process, because the transaction recovery procedure always
// finalizes target transactions, even if initiated by a PUSH_TIMESTAMP.
if pushType == kvpb.PUSH_TIMESTAMP {
pushType = kvpb.PUSH_ABORT
}
}

pusherIso, pusheeIso := args.PusherTxn.IsoLevel, reply.PusheeTxn.IsoLevel
pusherPri, pusheePri := args.PusherTxn.Priority, reply.PusheeTxn.Priority
pusheeStatus := reply.PusheeTxn.Status
var pusherWins bool
var reason string
switch {
Expand All @@ -258,7 +229,7 @@ func PushTxn(
// If just attempting to cleanup old or already-committed txns,
// pusher always fails.
pusherWins = false
case txnwait.CanPushWithPriority(pushType, pusherIso, pusheeIso, pusherPri, pusheePri):
case txnwait.CanPushWithPriority(pushType, pusherIso, pusheeIso, pusherPri, pusheePri, pusheeStatus):
reason = "pusher has priority"
pusherWins = true
case args.Force:
Expand All @@ -282,8 +253,16 @@ func PushTxn(
// If the pushed transaction is in the staging state, we can't change its
// record without first going through the transaction recovery process and
// attempting to finalize it.
pusheeStaging := pusheeStatus == roachpb.STAGING
// However, if the pusher is aware that the pushee's currently recorded
// attempt at a parallel commit failed, either because it found intents at a
// higher timestamp than the parallel commit attempt or because it found
// intents at a higher epoch than the parallel commit attempt, it should not
// consider the pushee to be performing a parallel commit. Its commit status
// is not indeterminate.
pusheeStagingFailed := pusheeStaging && (knownHigherTimestamp || knownHigherEpoch)
recoverOnFailedPush := cArgs.EvalCtx.EvalKnobs().RecoverIndeterminateCommitsOnFailedPushes
if reply.PusheeTxn.Status == roachpb.STAGING && (pusherWins || recoverOnFailedPush) {
if pusheeStaging && !pusheeStagingFailed && (pusherWins || recoverOnFailedPush) {
err := kvpb.NewIndeterminateCommitError(reply.PusheeTxn)
log.VEventf(ctx, 1, "%v", err)
return result.Result{}, err
Expand All @@ -298,6 +277,26 @@ func PushTxn(
// Upgrade priority of pushed transaction to one less than pusher's.
reply.PusheeTxn.UpgradePriority(args.PusherTxn.Priority - 1)

// If the pusher is aware that the pushee's currently recorded attempt at a
// parallel commit failed, upgrade PUSH_TIMESTAMPs to PUSH_ABORTs. We don't
// want to move the transaction back to PENDING, as this is not (currently)
// allowed by the recovery protocol. We also don't want to move the
// transaction to a new timestamp while retaining the STAGING status, as this
// could allow the transaction to enter an implicit commit state without its
// knowledge, leading to atomicity violations.
//
// This has no effect on pushes that fail with a TransactionPushError. Such
// pushes will still wait on the pushee to retry its commit and eventually
// commit or abort. It also has no effect on expired pushees, as they would
// have been aborted anyway. This only impacts pushes which would have
// succeeded due to priority mismatches. In these cases, the push acts the
// same as a short-circuited transaction recovery process, because the
// transaction recovery procedure always finalizes target transactions, even
// if initiated by a PUSH_TIMESTAMP.
if pusheeStaging /* => pusheeStagingFailed */ && pushType == kvpb.PUSH_TIMESTAMP {
pushType = kvpb.PUSH_ABORT
}

// Determine what to do with the pushee, based on the push type.
switch pushType {
case kvpb.PUSH_ABORT:
Expand All @@ -306,6 +305,8 @@ func PushTxn(
// Forward the timestamp to accommodate AbortSpan GC. See method comment for
// details.
reply.PusheeTxn.WriteTimestamp.Forward(reply.PusheeTxn.LastActive())
// If the transaction was previously staging, clear its in-flight writes.
reply.PusheeTxn.InFlightWrites = nil
// If the transaction record was already present, persist the updates to it.
// If not, then we don't want to create it. This could allow for finalized
// transactions to be revived. Instead, we obey the invariant that only the
Expand Down
5 changes: 3 additions & 2 deletions pkg/kv/kvserver/concurrency/concurrency_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,18 +747,19 @@ func (c *cluster) PushTransaction(
pusheeTxn, pusheeRecordSig := pusheeRecord.asTxn()
pusheeIso := pusheeTxn.IsoLevel
pusheePri := pusheeTxn.Priority
pusheeStatus := pusheeTxn.Status
// NOTE: this logic is adapted from cmd_push_txn.go.
var pusherWins bool
switch {
case pusheeTxn.Status.IsFinalized():
case pusheeStatus.IsFinalized():
// Already finalized.
return pusheeTxn, nil
case pushType == kvpb.PUSH_TIMESTAMP && pushTo.LessEq(pusheeTxn.WriteTimestamp):
// Already pushed.
return pusheeTxn, nil
case pushType == kvpb.PUSH_TOUCH:
pusherWins = false
case txnwait.CanPushWithPriority(pushType, pusherIso, pusheeIso, pusherPri, pusheePri):
case txnwait.CanPushWithPriority(pushType, pusherIso, pusheeIso, pusherPri, pusheePri, pusheeStatus):
pusherWins = true
default:
pusherWins = false
Expand Down
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/concurrency/lock_table_waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -1298,7 +1298,11 @@ func canPushWithPriority(req Request, s waitingState) bool {
}
pusheeIso = s.txn.IsoLevel
pusheePri = s.txn.Priority
return txnwait.CanPushWithPriority(pushType, pusherIso, pusheeIso, pusherPri, pusheePri)
// We assume that the pushee is in the PENDING state when deciding whether
// to push. A push may determine that the pushee is STAGING or has already
// been finalized.
pusheeStatus := roachpb.PENDING
return txnwait.CanPushWithPriority(pushType, pusherIso, pusheeIso, pusherPri, pusheePri, pusheeStatus)
}

func logResolveIntent(ctx context.Context, intent roachpb.LockUpdate) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_send.go
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,7 @@ func (r *Replica) handleTransactionPushError(
dontRetry := r.store.cfg.TestingKnobs.DontRetryPushTxnFailures
if !dontRetry && ba.IsSinglePushTxnRequest() {
pushReq := ba.Requests[0].GetInner().(*kvpb.PushTxnRequest)
dontRetry = txnwait.ShouldPushImmediately(pushReq)
dontRetry = txnwait.ShouldPushImmediately(pushReq, t.PusheeTxn.Status)
}
if dontRetry {
return g, pErr
Expand Down
23 changes: 17 additions & 6 deletions pkg/kv/kvserver/txnwait/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,23 +70,26 @@ func TestingOverrideTxnLivenessThreshold(t time.Duration) func() {
// proceed without queueing. This is true for pushes which are neither
// ABORT nor TIMESTAMP, but also for ABORT and TIMESTAMP pushes where
// the pushee has min priority or pusher has max priority.
func ShouldPushImmediately(req *kvpb.PushTxnRequest) bool {
func ShouldPushImmediately(req *kvpb.PushTxnRequest, pusheeStatus roachpb.TransactionStatus) bool {
if req.Force {
return true
}
return CanPushWithPriority(
req.PushType,
req.PusherTxn.IsoLevel, req.PusheeTxn.IsoLevel,
req.PusherTxn.Priority, req.PusheeTxn.Priority,
pusheeStatus,
)
}

// CanPushWithPriority returns true if the pusher can perform the specified push
// type on the pushee, based on the two txns' isolation levels and priorities.
// type on the pushee, based on the two txns' isolation levels, their priorities,
// and the pushee's status.
func CanPushWithPriority(
pushType kvpb.PushTxnType,
pusherIso, pusheeIso isolation.Level,
pusherPri, pusheePri enginepb.TxnPriority,
pusheeStatus roachpb.TransactionStatus,
) bool {
// Normalize the transaction priorities so that normal user priorities are
// considered equal for the purposes of pushing.
Expand All @@ -103,6 +106,15 @@ func CanPushWithPriority(
case kvpb.PUSH_ABORT:
return pusherPri > pusheePri
case kvpb.PUSH_TIMESTAMP:
// If the pushee transaction is STAGING, only let the PUSH_TIMESTAMP through
// to disrupt the transaction commit if the pusher has a higher priority. If
// the priorities are equal, the PUSH_TIMESTAMP should wait for the commit
// to complete.
if pusheeStatus == roachpb.STAGING {
return pusherPri > pusheePri
}
// Otherwise, the pushee has not yet started committing...

// If the pushee transaction tolerates write skew, the PUSH_TIMESTAMP is
// harmless, so let it through.
return pusheeIso.ToleratesWriteSkew() ||
Expand Down Expand Up @@ -476,10 +488,6 @@ func (q *Queue) releaseWaitingQueriesLocked(ctx context.Context, txnID uuid.UUID
func (q *Queue) MaybeWaitForPush(
ctx context.Context, req *kvpb.PushTxnRequest,
) (*kvpb.PushTxnResponse, *kvpb.Error) {
if ShouldPushImmediately(req) {
return nil, nil
}

q.mu.Lock()
// If the txn wait queue is not enabled or if the request is not
// contained within the replica, do nothing. The request can fall
Expand All @@ -501,6 +509,9 @@ func (q *Queue) MaybeWaitForPush(
if txn := pending.getTxn(); isPushed(req, txn) {
q.mu.Unlock()
return createPushTxnResponse(txn), nil
} else if ShouldPushImmediately(req, txn.Status) {
q.mu.Unlock()
return nil, nil
}

push := &waitingPush{
Expand Down
109 changes: 85 additions & 24 deletions pkg/kv/kvserver/txnwait/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ func TestShouldPushImmediately(t *testing.T) {
Priority: test.pusheePri,
},
}
shouldPush := ShouldPushImmediately(&req)
pusheeStatus := roachpb.PENDING
shouldPush := ShouldPushImmediately(&req, pusheeStatus)
require.Equal(t, test.shouldPush, shouldPush)
})
}
Expand All @@ -179,6 +180,7 @@ func testCanPushWithPriorityPushAbort(t *testing.T) {
max := enginepb.MaxTxnPriority
mid1 := enginepb.TxnPriority(1)
mid2 := enginepb.TxnPriority(2)
statuses := []roachpb.TransactionStatus{roachpb.PENDING, roachpb.STAGING}
testCases := []struct {
pusherPri enginepb.TxnPriority
pusheePri enginepb.TxnPriority
Expand All @@ -201,22 +203,31 @@ func testCanPushWithPriorityPushAbort(t *testing.T) {
{max, mid2, true},
{max, max, false},
}
// NOTE: the behavior of PUSH_ABORT pushes is agnostic to isolation levels.
for _, pusherIso := range isolation.Levels() {
for _, pusheeIso := range isolation.Levels() {
for _, test := range testCases {
name := fmt.Sprintf("pusherIso=%s/pusheeIso=%s/pusherPri=%d/pusheePri=%d",
pusherIso, pusheeIso, test.pusherPri, test.pusheePri)
t.Run(name, func(t *testing.T) {
canPush := CanPushWithPriority(kvpb.PUSH_ABORT, pusherIso, pusheeIso, test.pusherPri, test.pusheePri)
require.Equal(t, test.exp, canPush)
})
// NOTE: the behavior of PUSH_ABORT pushes is agnostic to pushee status.
for _, pusheeStatus := range statuses {
// NOTE: the behavior of PUSH_ABORT pushes is agnostic to isolation levels.
for _, pusherIso := range isolation.Levels() {
for _, pusheeIso := range isolation.Levels() {
for _, test := range testCases {
name := fmt.Sprintf("pusheeStatus=%s/pusherIso=%s/pusheeIso=%s/pusherPri=%d/pusheePri=%d",
pusheeStatus, pusherIso, pusheeIso, test.pusherPri, test.pusheePri)
t.Run(name, func(t *testing.T) {
canPush := CanPushWithPriority(
kvpb.PUSH_ABORT, pusherIso, pusheeIso, test.pusherPri, test.pusheePri, pusheeStatus)
require.Equal(t, test.exp, canPush)
})
}
}
}
}
}

func testCanPushWithPriorityPushTimestamp(t *testing.T) {
t.Run("pusheeStatus="+roachpb.PENDING.String(), testCanPushWithPriorityPushTimestampPusheePending)
t.Run("pusheeStatus="+roachpb.STAGING.String(), testCanPushWithPriorityPushTimestampPusheeStaging)
}

func testCanPushWithPriorityPushTimestampPusheePending(t *testing.T) {
SSI := isolation.Serializable
SI := isolation.Snapshot
RC := isolation.ReadCommitted
Expand Down Expand Up @@ -389,30 +400,80 @@ func testCanPushWithPriorityPushTimestamp(t *testing.T) {
name := fmt.Sprintf("pusherIso=%s/pusheeIso=%s/pusherPri=%d/pusheePri=%d",
test.pusherIso, test.pusheeIso, test.pusherPri, test.pusheePri)
t.Run(name, func(t *testing.T) {
canPush := CanPushWithPriority(kvpb.PUSH_TIMESTAMP, test.pusherIso, test.pusheeIso, test.pusherPri, test.pusheePri)
canPush := CanPushWithPriority(
kvpb.PUSH_TIMESTAMP, test.pusherIso, test.pusheeIso, test.pusherPri, test.pusheePri, roachpb.PENDING)
require.Equal(t, test.exp, canPush)
})
}
}

func testCanPushWithPriorityPushTouch(t *testing.T) {
func testCanPushWithPriorityPushTimestampPusheeStaging(t *testing.T) {
min := enginepb.MinTxnPriority
max := enginepb.MaxTxnPriority
mid1 := enginepb.TxnPriority(1)
mid2 := enginepb.TxnPriority(2)
priorities := []enginepb.TxnPriority{min, mid1, mid2, max}
// NOTE: the behavior of PUSH_TOUCH pushes is agnostic to isolation levels.
testCases := []struct {
pusherPri enginepb.TxnPriority
pusheePri enginepb.TxnPriority
exp bool
}{
{min, min, false},
{min, mid1, false},
{min, mid2, false},
{min, max, false},
{mid1, min, true},
{mid1, mid1, false},
{mid1, mid2, false},
{mid1, max, false},
{mid2, min, true},
{mid2, mid1, false},
{mid2, mid2, false},
{mid2, max, false},
{max, min, true},
{max, mid1, true},
{max, mid2, true},
{max, max, false},
}
// NOTE: the behavior of PUSH_TIMESTAMP pushes is agnostic to isolation levels
// when the pushee transaction is STAGING.
for _, pusherIso := range isolation.Levels() {
for _, pusheeIso := range isolation.Levels() {
// NOTE: the behavior of PUSH_TOUCH pushes is agnostic to txn priorities.
for _, pusherPri := range priorities {
for _, pusheePri := range priorities {
name := fmt.Sprintf("pusherIso=%s/pusheeIso=%s/pusherPri=%d/pusheePri=%d",
pusherIso, pusheeIso, pusherPri, pusheePri)
t.Run(name, func(t *testing.T) {
canPush := CanPushWithPriority(kvpb.PUSH_TOUCH, pusherIso, pusheeIso, pusherPri, pusheePri)
require.True(t, canPush)
})
for _, test := range testCases {
name := fmt.Sprintf("pusherIso=%s/pusheeIso=%s/pusherPri=%d/pusheePri=%d",
pusherIso, pusheeIso, test.pusherPri, test.pusheePri)
t.Run(name, func(t *testing.T) {
canPush := CanPushWithPriority(
kvpb.PUSH_TIMESTAMP, pusherIso, pusheeIso, test.pusherPri, test.pusheePri, roachpb.STAGING)
require.Equal(t, test.exp, canPush)
})
}
}
}
}

func testCanPushWithPriorityPushTouch(t *testing.T) {
min := enginepb.MinTxnPriority
max := enginepb.MaxTxnPriority
mid1 := enginepb.TxnPriority(1)
mid2 := enginepb.TxnPriority(2)
priorities := []enginepb.TxnPriority{min, mid1, mid2, max}
statuses := []roachpb.TransactionStatus{roachpb.PENDING, roachpb.STAGING}
// NOTE: the behavior of PUSH_TOUCH pushes is agnostic to pushee status.
for _, pusheeStatus := range statuses {
// NOTE: the behavior of PUSH_TOUCH pushes is agnostic to isolation levels.
for _, pusherIso := range isolation.Levels() {
for _, pusheeIso := range isolation.Levels() {
// NOTE: the behavior of PUSH_TOUCH pushes is agnostic to txn priorities.
for _, pusherPri := range priorities {
for _, pusheePri := range priorities {
name := fmt.Sprintf("pusheeStatus=%s/pusherIso=%s/pusheeIso=%s/pusherPri=%d/pusheePri=%d",
pusheeStatus, pusherIso, pusheeIso, pusherPri, pusheePri)
t.Run(name, func(t *testing.T) {
canPush := CanPushWithPriority(
kvpb.PUSH_TOUCH, pusherIso, pusheeIso, pusherPri, pusheePri, pusheeStatus)
require.True(t, canPush)
})
}
}
}
}
Expand Down

0 comments on commit 3d49079

Please sign in to comment.