Skip to content

Commit

Permalink
kv: prevent STAGING -> PENDING transition during high-priority push
Browse files Browse the repository at this point in the history
Fixes #61992.
Fixes #62064.

This commit fixes a bug uncovered recently (for less than obvious
reasons) in cdc roachtests where a STAGING transaction could have its
transaction record moved back to a PENDING state without changing epochs
but after its timestamp was bumped. This could result in concurrent
transaction recovery attempts returning `programming error: cannot
recover PENDING transaction in same epoch` errors, because such a state
transition was not expected to be possible by transaction recovery.
However, as we found in #61992, this has actually been possible since
01bc20e.

This commit fixes the bug by detecting cases where a pusher knows of a
failed parallel commit and selectively upgrading PUSH_TIMESTAMP push
attempts to PUSH_ABORTs. This has no effect on pushes that fail with a
TransactionPushError. Such pushes will still wait on the pushee to retry
its commit and eventually commit or abort. It also has no effect on
expired pushees, as they would have been aborted anyway. This only
impacts pushes which would have succeeded due to priority mismatches. In
these cases, the push acts the same as a short-circuited transaction
recovery process, because the transaction recovery procedure always
finalizes target transactions, even if initiated by a PUSH_TIMESTAMP.

This seems very rare in practice, as it requires a few specific
interactions to line up just right, including:
- a STAGING transaction that has one of its in-flight intent writes bumped
- a rangefeed processor listening to that intent write
- a separate request that conflicts with a different intent
- a STAGING transaction which expires to allow transaction recovery
- a rangefeed processor push between the time of the request push and the request recovery

Still, this fix well contained, so I think we should backport it to all
of the release branches. However, since this issue does seem rare and
also can not cause corruption or atomicity violations, I wanted to be
conservative with the backport, so I'm going to let this bake on master
+ release-21.1 for a few weeks before merging the backport.

Release notes (bug fix): an improper interaction between conflicting
transactions which could result in spurious `cannot recover PENDING
transaction in same epoch` errors was fixed.
  • Loading branch information
nvanbenschoten committed Mar 29, 2021
1 parent 0e70529 commit 90fd58a
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 10 deletions.
33 changes: 27 additions & 6 deletions pkg/kv/kvserver/batcheval/cmd_push_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,20 +185,21 @@ func PushTxn(

// If we're trying to move the timestamp forward, and it's already
// far enough forward, return success.
if args.PushType == roachpb.PUSH_TIMESTAMP && args.PushTo.LessEq(reply.PusheeTxn.WriteTimestamp) {
pushType := args.PushType
if pushType == roachpb.PUSH_TIMESTAMP && args.PushTo.LessEq(reply.PusheeTxn.WriteTimestamp) {
// Trivial noop.
return result.Result{}, nil
}

// The pusher might be aware of a newer version of the pushee.
increasedEpochOrTimestamp := false
var knownHigherTimestamp, knownHigherEpoch bool
if reply.PusheeTxn.WriteTimestamp.Less(args.PusheeTxn.WriteTimestamp) {
reply.PusheeTxn.WriteTimestamp = args.PusheeTxn.WriteTimestamp
increasedEpochOrTimestamp = true
knownHigherTimestamp = true
}
if reply.PusheeTxn.Epoch < args.PusheeTxn.Epoch {
reply.PusheeTxn.Epoch = args.PusheeTxn.Epoch
increasedEpochOrTimestamp = true
knownHigherEpoch = true
}
reply.PusheeTxn.UpgradePriority(args.PusheeTxn.Priority)

Expand All @@ -208,12 +209,32 @@ func PushTxn(
// a higher epoch than the parallel commit attempt, it should not consider
// the pushee to be performing a parallel commit. Its commit status is not
// indeterminate.
if increasedEpochOrTimestamp && reply.PusheeTxn.Status == roachpb.STAGING {
if (knownHigherTimestamp || knownHigherEpoch) && reply.PusheeTxn.Status == roachpb.STAGING {
reply.PusheeTxn.Status = roachpb.PENDING
reply.PusheeTxn.InFlightWrites = nil
// If the pusher is aware that the pushee's currently recorded attempt
// at a parallel commit failed but the transaction's epoch has not yet
// been incremented, upgrade PUSH_TIMESTAMPs to PUSH_ABORTs. We don't
// want to move the transaction back to PENDING in the same epoch, as
// this is not (currently) allowed by the recovery protocol. We also
// don't want to move the transaction to a new timestamp while retaining
// the STAGING status, as this could allow the transaction to enter an
// implicit commit state without its knowledge, leading to atomicity
// violations.
//
// This has no effect on pushes that fail with a TransactionPushError.
// Such pushes will still wait on the pushee to retry its commit and
// eventually commit or abort. It also has no effect on expired pushees,
// as they would have been aborted anyway. This only impacts pushes
// which would have succeeded due to priority mismatches. In these
// cases, the push acts the same as a short-circuited transaction
// recovery process, because the transaction recovery procedure always
// finalizes target transactions, even if initiated by a PUSH_TIMESTAMP.
if !knownHigherEpoch && pushType == roachpb.PUSH_TIMESTAMP {
pushType = roachpb.PUSH_ABORT
}
}

pushType := args.PushType
var pusherWins bool
var reason string

Expand Down
26 changes: 24 additions & 2 deletions pkg/kv/kvserver/batcheval/cmd_recover_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,16 +174,38 @@ func TestRecoverTxnRecordChanged(t *testing.T) {
}(),
},
{
name: "transaction restart after write prevented",
name: "transaction restart (pending) after write prevented",
implicitlyCommitted: false,
changedTxn: func() roachpb.Transaction {
txnCopy := txn
txnCopy.BumpEpoch()
txnCopy.Status = roachpb.PENDING
return txnCopy
}(),
},
{
name: "transaction timestamp increase after write prevented",
name: "transaction restart (staging) after write prevented",
implicitlyCommitted: false,
changedTxn: func() roachpb.Transaction {
txnCopy := txn
txnCopy.BumpEpoch()
return txnCopy
}(),
},
{
name: "transaction timestamp increase (pending) after write prevented",
implicitlyCommitted: false,
expError: "cannot recover PENDING transaction in same epoch",
changedTxn: func() roachpb.Transaction {
txnCopy := txn
txnCopy.Status = roachpb.PENDING
txnCopy.InFlightWrites = nil
txnCopy.WriteTimestamp = txnCopy.WriteTimestamp.Add(1, 0)
return txnCopy
}(),
},
{
name: "transaction timestamp increase (staging) after write prevented",
implicitlyCommitted: false,
changedTxn: func() roachpb.Transaction {
txnCopy := txn
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11367,8 +11367,8 @@ func TestTxnRecordLifecycleTransitions(t *testing.T) {
return sendWrappedWithErr(roachpb.Header{}, &pt)
},
expTxn: func(txn *roachpb.Transaction, pushTs hlc.Timestamp) roachpb.TransactionRecord {
record := txn.AsRecord()
record.WriteTimestamp.Forward(pushTs)
record := txnWithStatus(roachpb.ABORTED)(txn, pushTs)
record.WriteTimestamp = record.WriteTimestamp.Add(0, 1)
record.Priority = pusher.Priority - 1
return record
},
Expand Down
130 changes: 130 additions & 0 deletions pkg/kv/kvserver/txn_recovery_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -197,6 +198,135 @@ func TestTxnRecoveryFromStaging(t *testing.T) {
}
}

// TestTxnRecoveryFromStagingWithHighPriority tests the transaction recovery
// process initiated by a high-priority operation which encounters a staging
// transaction. The test contains a subtest for each of the combinations of the
// following boolean options:
//
// - pushAbort: configures whether or not the high-priority operation is a
// read (false) or a write (true), which dictates the kind of push
// operation dispatched against the staging transaction.
//
// - newEpoch: configures whether or not the staging transaction wrote the
// intent which the high-priority operation conflicts with at a higher
// epoch than it is staged at. If true, the staging transaction is not
// implicitly committed.
//
// - newTimestamp: configures whether or not the staging transaction wrote the
// intent which the high-priority operation conflicts with at a higher
// timestamp than it is staged at. If true, the staging transaction is not
// implicitly committed.
//
func TestTxnRecoveryFromStagingWithHighPriority(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

run := func(t *testing.T, pushAbort, newEpoch, newTimestamp bool) {
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
manual := hlc.NewManualClock(123)
cfg := TestStoreConfig(hlc.NewClock(manual.UnixNano, time.Nanosecond))
store := createTestStoreWithConfig(t, stopper, testStoreOpts{createSystemRanges: true}, &cfg)

// Create a transaction that will get stuck performing a parallel
// commit.
keyA, keyB := roachpb.Key("a"), roachpb.Key("b")
txn := newTransaction("txn", keyA, 1, store.Clock())

// Issue two writes, which will be considered in-flight at the time of
// the transaction's EndTxn request.
keyAVal := []byte("value")
pArgs := putArgs(keyA, keyAVal)
pArgs.Sequence = 1
h := roachpb.Header{Txn: txn}
_, pErr := kv.SendWrappedWith(ctx, store.TestSender(), h, &pArgs)
require.Nil(t, pErr)

// The second write may or may not be bumped.
pArgs = putArgs(keyB, []byte("value2"))
pArgs.Sequence = 2
h2 := roachpb.Header{Txn: txn.Clone()}
if newEpoch {
h2.Txn.BumpEpoch()
}
if newTimestamp {
manual.Increment(100)
h2.Txn.WriteTimestamp = store.Clock().Now()
}
_, pErr = kv.SendWrappedWith(ctx, store.TestSender(), h2, &pArgs)
require.Nil(t, pErr)

// Issue a parallel commit, which will put the transaction into a
// STAGING state. Include both writes as the EndTxn's in-flight writes.
et, etH := endTxnArgs(txn, true)
et.InFlightWrites = []roachpb.SequencedWrite{
{Key: keyA, Sequence: 1},
{Key: keyB, Sequence: 2},
}
etReply, pErr := kv.SendWrappedWith(ctx, store.TestSender(), etH, &et)
require.Nil(t, pErr)
require.Equal(t, roachpb.STAGING, etReply.Header().Txn.Status)

// Issue a conflicting, high-priority operation.
var conflictArgs roachpb.Request
if pushAbort {
pArgs = putArgs(keyB, []byte("value3"))
conflictArgs = &pArgs
} else {
gArgs := getArgs(keyB)
conflictArgs = &gArgs
}
manual.Increment(100)
conflictH := roachpb.Header{
UserPriority: roachpb.MaxUserPriority,
Timestamp: store.Clock().Now(),
}
_, pErr = kv.SendWrappedWith(ctx, store.TestSender(), conflictH, conflictArgs)
require.Nil(t, pErr)

// Query the transaction and verify that it has the right state.
qtArgs := queryTxnArgs(txn.TxnMeta, false /* waitForUpdate */)
qtReply, pErr := kv.SendWrapped(ctx, store.TestSender(), &qtArgs)
require.Nil(t, pErr)
qtTxn := qtReply.(*roachpb.QueryTxnResponse).QueriedTxn

if !newEpoch && !newTimestamp {
// The transaction was implicitly committed at its initial epoch and
// timestamp.
require.Equal(t, roachpb.COMMITTED, qtTxn.Status)
require.Equal(t, txn.Epoch, qtTxn.Epoch)
require.Equal(t, txn.WriteTimestamp, qtTxn.WriteTimestamp)
} else if newEpoch {
// The transaction is aborted if that's what the high-priority
// request wants. Otherwise, the transaction's record is bumped to
// the new epoch pulled from its intent and pushed above the
// high-priority request's timestamp.
if pushAbort {
require.Equal(t, roachpb.ABORTED, qtTxn.Status)
} else /* pushTimestamp */ {
require.Equal(t, roachpb.PENDING, qtTxn.Status)
require.Equal(t, txn.Epoch+1, qtTxn.Epoch)
require.Equal(t, conflictH.Timestamp.Next(), qtTxn.WriteTimestamp)
}
} else /* if newTimestamp */ {
// The transaction is aborted, even if the high-priority request
// only needed it to be pushed to a higher timestamp. This is
// because we don't allow a STAGING transaction record to move back
// to PENDING in the same epoch.
require.Equal(t, roachpb.ABORTED, qtTxn.Status)
}
}

testutils.RunTrueAndFalse(t, "push_abort", func(t *testing.T, pushAbort bool) {
testutils.RunTrueAndFalse(t, "new_epoch", func(t *testing.T, newEpoch bool) {
testutils.RunTrueAndFalse(t, "new_timestamp", func(t *testing.T, newTimestamp bool) {
run(t, pushAbort, newEpoch, newTimestamp)
})
})
})
}

// TestTxnClearRangeIntents tests whether a ClearRange call blindly removes
// write intents. This can cause it to remove an intent from an implicitly
// committed STAGING txn. When txn recovery kicks in, it will fail to find the
Expand Down

0 comments on commit 90fd58a

Please sign in to comment.